From 9fa48527d2d6939ce1c0ebf9dedfe62df6786779 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 17 Mar 2017 17:53:40 +0800 Subject: [PATCH 1/6] tso: fix possible stale ts when leader changes twice. --- server/leader.go | 18 ++++++++++++------ server/tso.go | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/server/leader.go b/server/leader.go index c94b507ae61..39ff04609a8 100644 --- a/server/leader.go +++ b/server/leader.go @@ -42,10 +42,6 @@ func (s *Server) enableLeader(b bool) { } atomic.StoreInt64(&s.isLeaderValue, value) - - // Reset connections and cluster. - s.closeAllConnections() - s.cluster.stop() } func (s *Server) getLeaderPath() string { @@ -177,8 +173,6 @@ func (s *Server) campaignLeader() error { } log.Debugf("campaign leader ok %s", s.Name()) - s.enableLeader(true) - defer s.enableLeader(false) // Try to create raft cluster. err = s.createRaftCluster() @@ -191,6 +185,18 @@ func (s *Server) campaignLeader() error { return errors.Trace(err) } + s.enableLeader(true) + defer func() { + s.enableLeader(false) + // Reset connections and cluster. + s.closeAllConnections() + s.cluster.stop() + // Clear ts. + s.ts.Store(&atomicObject{ + physical: zeroTime, + }) + }() + log.Infof("PD cluster leader %s is ready to serve", s.Name()) tsTicker := time.NewTicker(updateTimestampStep) diff --git a/server/tso.go b/server/tso.go index a600f272d23..e0c63484e9b 100644 --- a/server/tso.go +++ b/server/tso.go @@ -151,7 +151,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { var resp pdpb.Timestamp for i := 0; i < maxRetryCount; i++ { current, ok := s.ts.Load().(*atomicObject) - if !ok { + if !ok || current.physical == zeroTime { log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i) time.Sleep(200 * time.Millisecond) continue From 35c4dfa54e193a5e779d94e0ac9ae9a3aec70646 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 17 Mar 2017 18:48:34 +0800 Subject: [PATCH 2/6] address comment. --- server/cluster.go | 6 ++++++ server/leader.go | 15 +++++---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/server/cluster.go b/server/cluster.go index 2d6abe49950..caf7aaa39fd 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -153,6 +153,12 @@ func (s *Server) createRaftCluster() error { return s.cluster.start() } +func (s *Server) stopRaftCluster() { + // Reset connections and cluster. + s.closeAllConnections() + s.cluster.stop() +} + func makeStoreKey(clusterRootPath string, storeID uint64) string { return strings.Join([]string{clusterRootPath, "s", fmt.Sprintf("%020d", storeID)}, "/") } diff --git a/server/leader.go b/server/leader.go index 39ff04609a8..1c64a6c3866 100644 --- a/server/leader.go +++ b/server/leader.go @@ -179,23 +179,18 @@ func (s *Server) campaignLeader() error { if err != nil { return errors.Trace(err) } + defer s.stopRaftCluster() log.Debug("sync timestamp for tso") if err = s.syncTimestamp(); err != nil { return errors.Trace(err) } + defer s.ts.Store(&atomicObject{ + physical: zeroTime, + }) s.enableLeader(true) - defer func() { - s.enableLeader(false) - // Reset connections and cluster. - s.closeAllConnections() - s.cluster.stop() - // Clear ts. - s.ts.Store(&atomicObject{ - physical: zeroTime, - }) - }() + defer s.enableLeader(false) log.Infof("PD cluster leader %s is ready to serve", s.Name()) From 0246015c0dbf521753aa9253d7b41b655fe87643 Mon Sep 17 00:00:00 2001 From: disksing Date: Sat, 18 Mar 2017 23:47:06 +0800 Subject: [PATCH 3/6] add leader transfer test. --- pd-client/leader_change_test.go | 70 ++++++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 9 deletions(-) diff --git a/pd-client/leader_change_test.go b/pd-client/leader_change_test.go index 838b3630262..42ca1b8c1be 100644 --- a/pd-client/leader_change_test.go +++ b/pd-client/leader_change_test.go @@ -14,6 +14,9 @@ package pd import ( + "context" + "path/filepath" + "strconv" "time" "github.com/coreos/etcd/clientv3" @@ -34,12 +37,12 @@ func mustGetEtcdClient(c *C, svrs map[string]*server.Server) *clientv3.Client { return nil } -func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { - cfgs := server.NewTestMultiConfig(3) +func (s *testLeaderChangeSuite) prepareClusterN(c *C, n int) (svrs map[string]*server.Server, endpoints []string, closeFunc func()) { + cfgs := server.NewTestMultiConfig(n) - ch := make(chan *server.Server, 3) + ch := make(chan *server.Server, n) - for i := 0; i < 3; i++ { + for i := 0; i < n; i++ { cfg := cfgs[i] go func() { @@ -50,13 +53,13 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { }() } - svrs := make(map[string]*server.Server, 3) - for i := 0; i < 3; i++ { + svrs = make(map[string]*server.Server, n) + for i := 0; i < n; i++ { svr := <-ch svrs[svr.GetAddr()] = svr } - endpoints := make([]string, 0, 3) + endpoints = make([]string, 0, n) for _, svr := range svrs { go svr.Run() endpoints = append(endpoints, svr.GetEndpoints()...) @@ -64,14 +67,20 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { mustWaitLeader(c, svrs) - defer func() { + closeFunc = func() { for _, svr := range svrs { svr.Close() } for _, cfg := range cfgs { cleanServer(cfg) } - }() + } + return +} + +func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { + svrs, endpoints, closeFunc := s.prepareClusterN(c, 3) + defer closeFunc() cli, err := NewClient(endpoints) c.Assert(err, IsNil) @@ -111,6 +120,49 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { c.Error("failed getTS from new leader after 10 seconds") } +func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { + servers, endpoints, closeFunc := s.prepareClusterN(c, 3) + defer closeFunc() + + cli, err := NewClient(endpoints) + c.Assert(err, IsNil) + defer cli.Close() + + quit := make(chan struct{}) + lastPhysical, lastLogical, err := cli.GetTS() + c.Assert(err, IsNil) + go func() { + for { + select { + case <-quit: + return + default: + } + + physical, logical, err := cli.GetTS() + if err == nil { + c.Assert(lastPhysical<<18+lastLogical, Less, physical<<18+logical) + } + time.Sleep(time.Millisecond) + } + }() + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: time.Second, + }) + c.Assert(err, IsNil) + leaderPath := filepath.Join("/pd", strconv.FormatUint(cli.GetClusterID(), 10), "leader") + for i := 0; i < 10; i++ { + mustWaitLeader(c, servers) + _, err = etcdCli.Delete(context.TODO(), leaderPath) + c.Assert(err, IsNil) + // Sleep to make sure all servers are notified and starts campaign. + time.Sleep(time.Second) + } + close(quit) +} + func mustConnectLeader(c *C, urls []string, leaderAddr string) { connCh := make(chan *conn) go func() { From d8e6ae47ed8fb952d1d119f95ae921da3ae74614 Mon Sep 17 00:00:00 2001 From: disksing Date: Sun, 19 Mar 2017 00:04:41 +0800 Subject: [PATCH 4/6] fix CI. --- pd-client/leader_change_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pd-client/leader_change_test.go b/pd-client/leader_change_test.go index 42ca1b8c1be..f0b920cd4e7 100644 --- a/pd-client/leader_change_test.go +++ b/pd-client/leader_change_test.go @@ -139,8 +139,8 @@ func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { default: } - physical, logical, err := cli.GetTS() - if err == nil { + physical, logical, err1 := cli.GetTS() + if err1 == nil { c.Assert(lastPhysical<<18+lastLogical, Less, physical<<18+logical) } time.Sleep(time.Millisecond) From f434ab45a76ea26dadb1edc30169a814c4311643 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 20 Mar 2017 11:14:19 +0800 Subject: [PATCH 5/6] fix test. --- pd-client/leader_change_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pd-client/leader_change_test.go b/pd-client/leader_change_test.go index f0b920cd4e7..97a9c8b4090 100644 --- a/pd-client/leader_change_test.go +++ b/pd-client/leader_change_test.go @@ -14,7 +14,6 @@ package pd import ( - "context" "path/filepath" "strconv" "time" @@ -23,6 +22,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/api" + "golang.org/x/net/context" ) var _ = Suite(&testLeaderChangeSuite{}) @@ -121,7 +121,7 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { } func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { - servers, endpoints, closeFunc := s.prepareClusterN(c, 3) + servers, endpoints, closeFunc := s.prepareClusterN(c, 2) defer closeFunc() cli, err := NewClient(endpoints) @@ -142,6 +142,7 @@ func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { physical, logical, err1 := cli.GetTS() if err1 == nil { c.Assert(lastPhysical<<18+lastLogical, Less, physical<<18+logical) + lastPhysical, lastLogical = physical, logical } time.Sleep(time.Millisecond) } From 982dbf9f0078f6487d084947334f09fbcc6db5de Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 20 Mar 2017 14:51:45 +0800 Subject: [PATCH 6/6] extract makeTS(). --- pd-client/leader_change_test.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pd-client/leader_change_test.go b/pd-client/leader_change_test.go index 97a9c8b4090..99f2be8f6cf 100644 --- a/pd-client/leader_change_test.go +++ b/pd-client/leader_change_test.go @@ -86,8 +86,9 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { c.Assert(err, IsNil) defer cli.Close() - p1, l1, err := cli.GetTS() + physical, logical, err := cli.GetTS() c.Assert(err, IsNil) + lastTS := s.makeTS(physical, logical) leader, err := getLeader(endpoints) c.Assert(err, IsNil) @@ -110,9 +111,10 @@ func (s *testLeaderChangeSuite) TestLeaderChange(c *C) { c.Assert(changed, IsTrue) for i := 0; i < 20; i++ { - p2, l2, err := cli.GetTS() + physical, logical, err := cli.GetTS() if err == nil { - c.Assert(p1<<18+l1, Less, p2<<18+l2) + ts := s.makeTS(physical, logical) + c.Assert(lastTS, Less, ts) return } time.Sleep(500 * time.Millisecond) @@ -129,8 +131,9 @@ func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { defer cli.Close() quit := make(chan struct{}) - lastPhysical, lastLogical, err := cli.GetTS() + physical, logical, err := cli.GetTS() c.Assert(err, IsNil) + lastTS := s.makeTS(physical, logical) go func() { for { select { @@ -141,8 +144,9 @@ func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { physical, logical, err1 := cli.GetTS() if err1 == nil { - c.Assert(lastPhysical<<18+lastLogical, Less, physical<<18+logical) - lastPhysical, lastLogical = physical, logical + ts := s.makeTS(physical, logical) + c.Assert(lastTS, Less, ts) + lastTS = ts } time.Sleep(time.Millisecond) } @@ -164,6 +168,10 @@ func (s *testLeaderChangeSuite) TestLeaderTransfer(c *C) { close(quit) } +func (s *testLeaderChangeSuite) makeTS(physical, logical int64) uint64 { + return uint64(physical<<18 + logical) +} + func mustConnectLeader(c *C, urls []string, leaderAddr string) { connCh := make(chan *conn) go func() {