Skip to content

Commit

Permalink
cluster: reset stores cache before loading cluster info (#4942) (#4957)
Browse files Browse the repository at this point in the history
close #4941, ref #4942

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-chi-bot and rleungx committed Sep 21, 2022
1 parent 8c4c7a9 commit c7dcc3c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 6 deletions.
7 changes: 4 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
return nil, nil
}

c.core.ResetStores()
start := time.Now()
if err := c.storage.LoadStores(c.core.PutStore); err != nil {
return nil, err
Expand Down Expand Up @@ -1034,10 +1035,10 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
return err
}

// buryStore marks a store as tombstone in cluster.
// BuryStore marks a store as tombstone in cluster.
// The store should be empty before calling this func
// State transition: Offline -> Tombstone.
func (c *RaftCluster) buryStore(storeID uint64) error {
func (c *RaftCluster) BuryStore(storeID uint64) error {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -1181,7 +1182,7 @@ func (c *RaftCluster) checkStores() {
// If the store is empty, it can be buried.
regionCount := c.core.GetStoreRegionCount(offlineStore.GetId())
if regionCount == 0 {
if err := c.buryStore(offlineStore.GetId()); err != nil {
if err := c.BuryStore(offlineStore.GetId()); err != nil {
log.Error("bury store failed",
zap.Stringer("store", offlineStore),
errs.ZapError(err))
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
for storeID := uint64(0); storeID <= 4; storeID++ {
store := cluster.GetStore(storeID)
if store == nil || store.IsUp() {
c.Assert(cluster.buryStore(storeID), NotNil)
c.Assert(cluster.BuryStore(storeID), NotNil)
} else {
c.Assert(cluster.buryStore(storeID), IsNil)
c.Assert(cluster.BuryStore(storeID), IsNil)
}
}
}
Expand All @@ -270,7 +270,7 @@ func (s *testClusterInfoSuite) TestReuseAddress(c *C) {
c.Assert(cluster.RemoveStore(3, true), IsNil)
// store 4: tombstone
c.Assert(cluster.RemoveStore(4, true), IsNil)
c.Assert(cluster.buryStore(4), IsNil)
c.Assert(cluster.BuryStore(4), IsNil)

for id := uint64(1); id <= 4; id++ {
storeInfo := cluster.GetStore(id)
Expand Down
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,13 @@ func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
bc.Lock()
defer bc.Unlock()
bc.Stores = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Lock()
Expand Down
57 changes: 57 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,3 +1126,60 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, NotNil)
}

// See https://github.com/tikv/pd/issues/4941
func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 2)
defer tc.Destroy()
c.Assert(err, IsNil)
err = tc.RunInitialServers()
c.Assert(err, IsNil)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(s.ctx, svr.GetClusterRootPath(), svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
c.Assert(storage.SaveMeta(meta), IsNil)
n := 4
stores := make([]*metapb.Store, 0, n)
for i := 1; i <= n; i++ {
store := &metapb.Store{Id: uint64(i), State: metapb.StoreState_Up}
stores = append(stores, store)
}

for _, store := range stores {
c.Assert(storage.SaveStore(store), IsNil)
}
rc, err = rc.LoadClusterInfo()
c.Assert(err, IsNil)
c.Assert(rc, NotNil)
// offline a store
c.Assert(rc.RemoveStore(1, false), IsNil)
c.Assert(rc.GetStore(1).GetState(), Equals, metapb.StoreState_Offline)

// transfer PD leader to another PD
tc.ResignLeader()
tc.WaitLeader()
leaderServer = tc.GetServer(tc.GetLeader())
svr1 := leaderServer.GetServer()
rc1 := svr1.GetRaftCluster()
c.Assert(err, IsNil)
c.Assert(rc1, NotNil)
// tombstone a store, and remove its record
c.Assert(rc1.BuryStore(1), IsNil)
c.Assert(rc1.RemoveTombStoneRecords(), IsNil)

// transfer PD leader back to the previous PD
tc.ResignLeader()
tc.WaitLeader()
leaderServer = tc.GetServer(tc.GetLeader())
svr = leaderServer.GetServer()
rc = svr.GetRaftCluster()
c.Assert(rc, NotNil)

// check store count
c.Assert(rc.GetConfig(), DeepEquals, meta)
c.Assert(rc.GetStoreCount(), Equals, 3)
}

0 comments on commit c7dcc3c

Please sign in to comment.