Merge pull request #572 from pingcap/disksing/tso
tso: fix possible stale ts when leader changes twice.
IANTHEREAL committed Mar 20, 2017
2 parents 36d7b87 + 982dbf9 commit bdc3c2a
Showing 4 changed files with 87 additions and 19 deletions.
85 changes: 73 additions & 12 deletions pd-client/leader_change_test.go
@@ -14,12 +14,15 @@
package pd

import (
"path/filepath"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
. "github.com/pingcap/check"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/api"
"golang.org/x/net/context"
)

var _ = Suite(&testLeaderChangeSuite{})
@@ -34,12 +34,12 @@ func mustGetEtcdClient(c *C, svrs map[string]*server.Server) *clientv3.Client {
return nil
}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
cfgs := server.NewTestMultiConfig(3)
func (s *testLeaderChangeSuite) prepareClusterN(c *C, n int) (svrs map[string]*server.Server, endpoints []string, closeFunc func()) {
cfgs := server.NewTestMultiConfig(n)

ch := make(chan *server.Server, 3)
ch := make(chan *server.Server, n)

for i := 0; i < 3; i++ {
for i := 0; i < n; i++ {
cfg := cfgs[i]

go func() {
@@ -50,35 +53,42 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
}()
}

svrs := make(map[string]*server.Server, 3)
for i := 0; i < 3; i++ {
svrs = make(map[string]*server.Server, n)
for i := 0; i < n; i++ {
svr := <-ch
svrs[svr.GetAddr()] = svr
}

endpoints := make([]string, 0, 3)
endpoints = make([]string, 0, n)
for _, svr := range svrs {
go svr.Run()
endpoints = append(endpoints, svr.GetEndpoints()...)
}

mustWaitLeader(c, svrs)

defer func() {
closeFunc = func() {
for _, svr := range svrs {
svr.Close()
}
for _, cfg := range cfgs {
cleanServer(cfg)
}
}()
}
return
}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
defer closeFunc()

cli, err := NewClient(endpoints)
c.Assert(err, IsNil)
defer cli.Close()

p1, l1, err := cli.GetTS()
physical, logical, err := cli.GetTS()
c.Assert(err, IsNil)
lastTS := s.makeTS(physical, logical)

leader, err := getLeader(endpoints)
c.Assert(err, IsNil)
@@ -101,16 +111,67 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
c.Assert(changed, IsTrue)

for i := 0; i < 20; i++ {
p2, l2, err := cli.GetTS()
physical, logical, err := cli.GetTS()
if err == nil {
c.Assert(p1<<18+l1, Less, p2<<18+l2)
ts := s.makeTS(physical, logical)
c.Assert(lastTS, Less, ts)
return
}
time.Sleep(500 * time.Millisecond)
}
c.Error("failed getTS from new leader after 10 seconds")
}

func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) {
servers, endpoints, closeFunc := s.prepareClusterN(c, 2)
defer closeFunc()

cli, err := NewClient(endpoints)
c.Assert(err, IsNil)
defer cli.Close()

quit := make(chan struct{})
physical, logical, err := cli.GetTS()
c.Assert(err, IsNil)
lastTS := s.makeTS(physical, logical)
go func() {
for {
select {
case <-quit:
return
default:
}

physical, logical, err1 := cli.GetTS()
if err1 == nil {
ts := s.makeTS(physical, logical)
c.Assert(lastTS, Less, ts)
lastTS = ts
}
time.Sleep(time.Millisecond)
}
}()

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Second,
})
c.Assert(err, IsNil)
leaderPath := filepath.Join("/pd", strconv.FormatUint(cli.GetClusterID(), 10), "leader")
for i := 0; i < 10; i++ {
mustWaitLeader(c, servers)
_, err = etcdCli.Delete(context.TODO(), leaderPath)
c.Assert(err, IsNil)
// Sleep to make sure all servers are notified and start campaigning.
time.Sleep(time.Second)
}
close(quit)
}

func (s *testLeaderChangeSuite) makeTS(physical, logical int64) uint64 {
return uint64(physical<<18 + logical)
}

func mustConnectLeader(c *C, urls []string, leaderAddr string) {
connCh := make(chan *conn)
go func() {
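The new makeTS helper gives the tests a single comparable value for a TSO: the physical part goes in the high bits and the 18-bit logical counter in the low bits, replacing the old inline check p1<<18+l1 < p2<<18+l2. A minimal standalone sketch of that comparison (same helper body as in the test; the sample values are made up):

```go
package main

import "fmt"

// makeTS mirrors the test helper: physical time in the high bits,
// an 18-bit logical counter in the low bits.
func makeTS(physical, logical int64) uint64 {
    return uint64(physical<<18 + logical)
}

func main() {
    earlier := makeTS(100, 5)
    later := makeTS(100, 6)      // same physical tick, higher logical counter
    fmt.Println(later > earlier) // true: TSOs must stay strictly increasing
}
```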
6 changes: 6 additions & 0 deletions server/cluster.go
@@ -153,6 +153,12 @@ func (s *Server) createRaftCluster() error {
return s.cluster.start()
}

func (s *Server) stopRaftCluster() {
// Reset connections and cluster.
s.closeAllConnections()
s.cluster.stop()
}

func makeStoreKey(clusterRootPath string, storeID uint64) string {
return strings.Join([]string{clusterRootPath, "s", fmt.Sprintf("%020d", storeID)}, "/")
}
13 changes: 7 additions & 6 deletions server/leader.go
@@ -42,10 +42,6 @@ func (s *Server) enableLeader(b bool) {
}

atomic.StoreInt64(&s.isLeaderValue, value)

// Reset connections and cluster.
s.closeAllConnections()
s.cluster.stop()
}

func (s *Server) getLeaderPath() string {
@@ -177,19 +173,24 @@ func (s *Server) campaignLeader() error {
}

log.Debugf("campaign leader ok %s", s.Name())
s.enableLeader(true)
defer s.enableLeader(false)

// Try to create raft cluster.
err = s.createRaftCluster()
if err != nil {
return errors.Trace(err)
}
defer s.stopRaftCluster()

log.Debug("sync timestamp for tso")
if err = s.syncTimestamp(); err != nil {
return errors.Trace(err)
}
defer s.ts.Store(&atomicObject{
physical: zeroTime,
})

s.enableLeader(true)
defer s.enableLeader(false)

log.Infof("PD cluster leader %s is ready to serve", s.Name())

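The core of the fix is in campaignLeader: previously the server called s.enableLeader(true) as soon as the campaign succeeded, before the raft cluster was created or the timestamp synced, and nothing invalidated s.ts when the leader stepped down. Now the server only advertises itself as leader after syncTimestamp succeeds, and it registers deferred cleanup that resets s.ts to zeroTime on the way out. Since Go runs deferred calls in LIFO order, step-down first disables leadership, then invalidates the TSO state, then stops the raft cluster. A toy sketch of that ordering (the Println calls stand in for the real PD functions):

```go
package main

import "fmt"

// Deferred calls run in LIFO order, so registering cleanup in the order
// stopRaftCluster -> reset ts -> enableLeader(false) means step-down
// performs those actions in reverse.
func campaign() {
    defer fmt.Println("3. stopRaftCluster: close connections, stop the cluster")
    defer fmt.Println("2. reset s.ts to zeroTime so stale timestamps are refused")
    defer fmt.Println("1. enableLeader(false): stop serving as leader")

    fmt.Println("campaign won, raft cluster started, timestamp synced")
    fmt.Println("ready to serve")
}

func main() { campaign() }
```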
2 changes: 1 addition & 1 deletion server/tso.go
@@ -151,7 +151,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp
for i := 0; i < maxRetryCount; i++ {
current, ok := s.ts.Load().(*atomicObject)
if !ok {
if !ok || current.physical == zeroTime {
log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i)
time.Sleep(200 * time.Millisecond)
continue
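With that, the zeroTime value stored on step-down acts as a sentinel in getRespTS: a request that races with a leader change is retried (and eventually rejected) instead of being answered from a timestamp left over by a previous term. A standalone sketch of the guard, using simplified placeholder types rather than the real pdpb and atomicObject definitions:

```go
package main

import (
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

// atomicObject loosely mirrors the value stored in s.ts: a physical
// timestamp plus a logical counter (simplified types, not the PD ones).
type atomicObject struct {
    physical time.Time
    logical  int64
}

var zeroTime time.Time // the sentinel stored when the leader steps down

func getTS(ts *atomic.Value) (time.Time, int64, error) {
    const maxRetryCount = 3
    for i := 0; i < maxRetryCount; i++ {
        current, ok := ts.Load().(*atomicObject)
        if !ok || current.physical == zeroTime {
            // Not synced yet, or just invalidated by a leader change:
            // wait and retry instead of returning a stale timestamp.
            time.Sleep(200 * time.Millisecond)
            continue
        }
        return current.physical, atomic.AddInt64(&current.logical, 1), nil
    }
    return zeroTime, 0, errors.New("timestamp not synced")
}

func main() {
    var ts atomic.Value
    ts.Store(&atomicObject{physical: zeroTime}) // state right after step-down
    if _, _, err := getTS(&ts); err != nil {
        fmt.Println("request rejected:", err)
    }
}
```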
