Skip to content

Commit

Permalink
session: support leader-and-follower for tidb_replica_read (#14761)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Feb 14, 2020
1 parent ef5fffb commit f71425b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 3 deletions.
7 changes: 4 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ const (
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
// In some cases the default value is 0, which should be treated as `ReplicaReadLeader`.
return r != ReplicaReadLeader && r != 0
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.SetReplicaRead(kv.ReplicaReadFollower)
} else if strings.EqualFold(val, "leader-and-follower") {
s.SetReplicaRead(kv.ReplicaReadMixed)
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.SetReplicaRead(kv.ReplicaReadLeader)
}
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader-and-follower") {
return "leader-and-follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader")
c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadLeader)
SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader-and-follower"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader-and-follower")
c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadMixed)

SetSessionSystemVar(v, TiDBEnableStmtSummary, types.NewStringDatum("on"))
val, err = GetSessionSystemVar(v, TiDBEnableStmtSummary)
Expand Down
26 changes: 26 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ func (r *RegionStore) follower(seed uint32) int32 {
return r.workTiKVIdx
}

// return next leader or follower store's index
func (r *RegionStore) peer(seed uint32) int32 {
candidates := make([]int32, 0, len(r.stores))
for i := 0; i < len(r.stores); i++ {
if r.stores[i].storeType != kv.TiKV {
continue
}
if r.storeFails[i] != atomic.LoadUint32(&r.stores[i].fail) {
continue
}
candidates = append(candidates, int32(i))
}

if len(candidates) == 0 {
return r.workTiKVIdx
}
return candidates[int32(seed)%int32(len(candidates))]
}

// init initializes region after constructed.
func (r *Region) init(c *RegionCache) {
// region store pull used store from global store map
Expand Down Expand Up @@ -306,6 +325,8 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
switch replicaRead {
case kv.ReplicaReadFollower:
store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed)
case kv.ReplicaReadMixed:
store, peer, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed)
default:
store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore)
}
Expand Down Expand Up @@ -1055,6 +1076,11 @@ func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (*
return r.getStorePeer(rs, rs.follower(followerStoreSeed))
}

// AnyStorePeer returns a leader or follower store with the associated peer.
func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
return r.getStorePeer(rs, rs.peer(followerStoreSeed))
}

// RegionVerID is a unique ID that can identify a Region at a specific version.
type RegionVerID struct {
id uint64
Expand Down
62 changes: 62 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,41 @@ func (s *testRegionCacheSuite) TestFollowerReadFallback(c *C) {
c.Assert(ctx.Peer.Id, Equals, peer3)
}

func (s *testRegionCacheSuite) TestMixedReadFallback(c *C) {
// 3 nodes and no.1 is leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)

loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
c.Assert(len(ctx.Meta.Peers), Equals, 3)

// verify follower to be store1, store2 and store3
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 0)
c.Assert(err, IsNil)
c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1)

ctxFollower2, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 1)
c.Assert(err, IsNil)
c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2)

ctxFollower3, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 2)
c.Assert(err, IsNil)
c.Assert(ctxFollower3.Peer.Id, Equals, peer3)

// send fail on store2, next follower read is going to fallback to store3
s.cache.OnSendFail(s.bo, ctxFollower1, false, errors.New("test error"))
ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadMixed, 0)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
}

func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
Expand Down Expand Up @@ -971,6 +1006,33 @@ func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
c.Assert(followReqSeed, Equals, uint32(1))
}

func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)

reqSend := NewRegionRequestSender(s.cache, nil)

// follower read failed on store1
followReqSeed := uint32(0)
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadMixed, followReqSeed)
c.Assert(err, IsNil)
c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1)
c.Assert(ctxFollower1.Store.storeID, Equals, s.store1)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
c.Assert(followReqSeed, Equals, uint32(1))
}

func createSampleRegion(startKey, endKey []byte) *Region {
return &Region{
meta: &metapb.Region{
Expand Down

0 comments on commit f71425b

Please sign in to comment.