diff --git a/pd-client/leader_change_test.go b/pd-client/leader_change_test.go
index 838b3630262..99f2be8f6cf 100644
--- a/pd-client/leader_change_test.go
+++ b/pd-client/leader_change_test.go
@@ -14,12 +14,15 @@
 package pd
 
 import (
+	"path/filepath"
+	"strconv"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/pd/server"
 	"github.com/pingcap/pd/server/api"
+	"golang.org/x/net/context"
 )
 
 var _ = Suite(&testLeaderChangeSuite{})
@@ -34,12 +37,12 @@ func mustGetEtcdClient(c *C, svrs map[string]*server.Server) *clientv3.Client {
 	return nil
 }
 
-func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
-	cfgs := server.NewTestMultiConfig(3)
+func (s *testLeaderChangeSuite) prepareClusterN(c *C, n int) (svrs map[string]*server.Server, endpoints []string, closeFunc func()) {
+	cfgs := server.NewTestMultiConfig(n)
 
-	ch := make(chan *server.Server, 3)
+	ch := make(chan *server.Server, n)
 
-	for i := 0; i < 3; i++ {
+	for i := 0; i < n; i++ {
 		cfg := cfgs[i]
 
 		go func() {
@@ -50,13 +53,13 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
 		}()
 	}
 
-	svrs := make(map[string]*server.Server, 3)
-	for i := 0; i < 3; i++ {
+	svrs = make(map[string]*server.Server, n)
+	for i := 0; i < n; i++ {
 		svr := <-ch
 		svrs[svr.GetAddr()] = svr
 	}
 
-	endpoints := make([]string, 0, 3)
+	endpoints = make([]string, 0, n)
 	for _, svr := range svrs {
 		go svr.Run()
 		endpoints = append(endpoints, svr.GetEndpoints()...)
@@ -64,21 +67,28 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
 	mustWaitLeader(c, svrs)
 
-	defer func() {
+	closeFunc = func() {
 		for _, svr := range svrs {
 			svr.Close()
 		}
 
 		for _, cfg := range cfgs {
 			cleanServer(cfg)
 		}
-	}()
+	}
+	return
+}
+
+func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
+	svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
+	defer closeFunc()
 
 	cli, err := NewClient(endpoints)
 	c.Assert(err, IsNil)
 	defer cli.Close()
 
-	p1, l1, err := cli.GetTS()
+	physical, logical, err := cli.GetTS()
 	c.Assert(err, IsNil)
+	lastTS := s.makeTS(physical, logical)
 
 	leader, err := getLeader(endpoints)
 	c.Assert(err, IsNil)
@@ -101,9 +111,10 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
 	c.Assert(changed, IsTrue)
 
 	for i := 0; i < 20; i++ {
-		p2, l2, err := cli.GetTS()
+		physical, logical, err := cli.GetTS()
 		if err == nil {
-			c.Assert(p1<<18+l1, Less, p2<<18+l2)
+			ts := s.makeTS(physical, logical)
+			c.Assert(lastTS, Less, ts)
 			return
 		}
 		time.Sleep(500 * time.Millisecond)
@@ -111,6 +122,56 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
 	c.Error("failed getTS from new leader after 10 seconds")
 }
 
+func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) {
+	servers, endpoints, closeFunc := s.prepareClusterN(c, 2)
+	defer closeFunc()
+
+	cli, err := NewClient(endpoints)
+	c.Assert(err, IsNil)
+	defer cli.Close()
+
+	quit := make(chan struct{})
+	physical, logical, err := cli.GetTS()
+	c.Assert(err, IsNil)
+	lastTS := s.makeTS(physical, logical)
+	go func() {
+		for {
+			select {
+			case <-quit:
+				return
+			default:
+			}
+
+			physical, logical, err1 := cli.GetTS()
+			if err1 == nil {
+				ts := s.makeTS(physical, logical)
+				c.Assert(lastTS, Less, ts)
+				lastTS = ts
+			}
+			time.Sleep(time.Millisecond)
+		}
+	}()
+
+	etcdCli, err := clientv3.New(clientv3.Config{
+		Endpoints:   endpoints,
+		DialTimeout: time.Second,
+	})
+	c.Assert(err, IsNil)
+	leaderPath := filepath.Join("/pd", strconv.FormatUint(cli.GetClusterID(), 10), "leader")
+	for i := 0; i < 10; i++ {
+		mustWaitLeader(c, servers)
+		_, err = etcdCli.Delete(context.TODO(), leaderPath)
+		c.Assert(err, IsNil)
+		// Sleep to make sure all servers are notified and start campaigning.
+		time.Sleep(time.Second)
+	}
+	close(quit)
+}
+
+func (s *testLeaderChangeSuite) makeTS(physical, logical int64) uint64 {
+	return uint64(physical<<18 + logical)
+}
+
 func mustConnectLeader(c *C, urls []string, leaderAddr string) {
 	connCh := make(chan *conn)
 	go func() {
diff --git a/server/cluster.go b/server/cluster.go
index 2d6abe49950..caf7aaa39fd 100644
--- a/server/cluster.go
+++ b/server/cluster.go
@@ -153,6 +153,12 @@ func (s *Server) createRaftCluster() error {
 	return s.cluster.start()
 }
 
+func (s *Server) stopRaftCluster() {
+	// Reset connections and cluster.
+	s.closeAllConnections()
+	s.cluster.stop()
+}
+
 func makeStoreKey(clusterRootPath string, storeID uint64) string {
 	return strings.Join([]string{clusterRootPath, "s", fmt.Sprintf("%020d", storeID)}, "/")
 }
diff --git a/server/leader.go b/server/leader.go
index c94b507ae61..1c64a6c3866 100644
--- a/server/leader.go
+++ b/server/leader.go
@@ -42,10 +42,6 @@ func (s *Server) enableLeader(b bool) {
 	}
 
 	atomic.StoreInt64(&s.isLeaderValue, value)
-
-	// Reset connections and cluster.
-	s.closeAllConnections()
-	s.cluster.stop()
 }
 
 func (s *Server) getLeaderPath() string {
@@ -177,19 +173,24 @@ func (s *Server) campaignLeader() error {
 	}
 	log.Debugf("campaign leader ok %s", s.Name())
 
-	s.enableLeader(true)
-	defer s.enableLeader(false)
 
 	// Try to create raft cluster.
 	err = s.createRaftCluster()
 	if err != nil {
 		return errors.Trace(err)
 	}
+	defer s.stopRaftCluster()
 
 	log.Debug("sync timestamp for tso")
 	if err = s.syncTimestamp(); err != nil {
 		return errors.Trace(err)
 	}
+	defer s.ts.Store(&atomicObject{
+		physical: zeroTime,
+	})
+
+	s.enableLeader(true)
+	defer s.enableLeader(false)
 
 	log.Infof("PD cluster leader %s is ready to serve", s.Name())
diff --git a/server/tso.go b/server/tso.go
index a600f272d23..e0c63484e9b 100644
--- a/server/tso.go
+++ b/server/tso.go
@@ -151,7 +151,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 	var resp pdpb.Timestamp
 	for i := 0; i < maxRetryCount; i++ {
 		current, ok := s.ts.Load().(*atomicObject)
-		if !ok {
+		if !ok || current.physical == zeroTime {
 			log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i)
 			time.Sleep(200 * time.Millisecond)
 			continue
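
Aside on the timestamp packing the new tests rely on: a TSO timestamp is packed into a single uint64 as physical<<18 + logical (see makeTS above), so plain integer comparison orders timestamps by physical time first and by the logical counter within the same physical tick. A minimal self-contained sketch of that packing and its inverse; composeTS and decomposeTS are illustrative names, not part of this patch:

package main

import "fmt"

// logicalBits mirrors the shift used by makeTS in the test above:
// the low 18 bits hold the logical counter, the rest the physical part.
const logicalBits = 18

// composeTS packs a (physical, logical) pair into one uint64 with the
// same arithmetic as the test's makeTS helper.
func composeTS(physical, logical int64) uint64 {
	return uint64(physical<<logicalBits + logical)
}

// decomposeTS splits a packed timestamp back into its two parts.
func decomposeTS(ts uint64) (physical, logical int64) {
	return int64(ts >> logicalBits), int64(ts & (1<<logicalBits - 1))
}

func main() {
	a := composeTS(100, 5)
	b := composeTS(101, 0)
	fmt.Println(a < b) // true: a later physical time always dominates
	physical, logical := decomposeTS(a)
	fmt.Println(physical, logical) // 100 5
}

The zeroTime guard added in server/tso.go complements this invariant: after a leader transfer, getRespTS keeps retrying until syncTimestamp has run on the new leader, so packed timestamps stay strictly increasing across leaders, which is exactly what TestLeaderTransfer asserts.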