Commit d568a81
Merge branch 'master' into request-source
you06 committed Jun 21, 2022
2 parents c04f4ef + 98a4e27 commit d568a81
Showing 5 changed files with 127 additions and 42 deletions.
integration_tests/2pc_test.go (3 changes: 2 additions, 1 deletion)
@@ -82,9 +82,11 @@ type testCommitterSuite struct {
func (s *testCommitterSuite) SetupSuite() {
atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
}

func (s *testCommitterSuite) TearDownSuite() {
s.Nil(failpoint.Disable("tikvclient/injectLiveness"))
atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
}

@@ -98,7 +100,6 @@ func (s *testCommitterSuite) SetupTest() {
store, err := tikv.NewKVStore("mocktikv-store", pdCli, spkv, client)
store.EnableTxnLocalLatches(8096)
s.Require().Nil(err)

s.store = tikv.StoreProbe{KVStore: store}
}

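The two lines added to SetupSuite and TearDownSuite pin the tikvclient/injectLiveness failpoint so every liveness probe during the suite reports "reachable". Below is a minimal standalone sketch of that enable/evaluate/disable cycle; it assumes the github.com/pingcap/failpoint Enable/Eval/Disable API, and the path example/injectLiveness plus the printed output are hypothetical, not the suite's real wiring.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Enable the failpoint so that evaluating it yields the string "reachable",
	// mirroring the failpoint.Enable call in SetupSuite above.
	if err := failpoint.Enable("example/injectLiveness", `return("reachable")`); err != nil {
		panic(err)
	}
	// Always disable it again, mirroring TearDownSuite.
	defer func() { _ = failpoint.Disable("example/injectLiveness") }()

	// Production code evaluates the failpoint and, when it is active, short-circuits
	// the real health check (compare requestLiveness in region_cache.go below).
	if val, err := failpoint.Eval("example/injectLiveness"); err == nil {
		fmt.Println("injected liveness:", val)
	}
}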
internal/locate/region_cache.go (59 changes: 39 additions, 20 deletions)
@@ -381,7 +381,10 @@ type RegionCache struct {
stores []*Store
}
notifyCheckCh chan struct{}
closeCh chan struct{}

// Context for background jobs
ctx context.Context
cancelFunc context.CancelFunc

testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -402,7 +405,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.tiflashMPPStoreMu.needReload = true
c.tiflashMPPStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.closeCh = make(chan struct{})
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
interval := config.GetGlobalConfig().StoresRefreshInterval
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
@@ -423,7 +426,7 @@ func (c *RegionCache) clear() {

// Close releases region cache's resource.
func (c *RegionCache) Close() {
close(c.closeCh)
c.cancelFunc()
}

// asyncCheckAndResolveLoop with
@@ -434,7 +437,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
for {
needCheckStores = needCheckStores[:0]
select {
case <-c.closeCh:
case <-c.ctx.Done():
return
case <-c.notifyCheckCh:
c.checkAndResolve(needCheckStores, func(s *Store) bool {
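The closeCh channel is replaced by a context created with context.WithCancel: Close now calls cancelFunc, and every background loop selects on ctx.Done(). A small sketch of the same shutdown pattern, using a hypothetical cache type rather than RegionCache itself:

package main

import (
	"context"
	"fmt"
	"time"
)

type cache struct {
	ctx        context.Context
	cancelFunc context.CancelFunc
}

func newCache() *cache {
	c := &cache{}
	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
	go c.backgroundLoop(50 * time.Millisecond)
	return c
}

// backgroundLoop exits as soon as the shared context is cancelled, so every
// background job observes Close without needing its own channel.
func (c *cache) backgroundLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-c.ctx.Done():
			fmt.Println("background loop stopped")
			return
		case <-ticker.C:
			// periodic work, e.g. resolving stores that need a check
		}
	}
}

// Close releases the cache's resources by cancelling the shared context.
func (c *cache) Close() { c.cancelFunc() }

func main() {
	c := newCache()
	time.Sleep(120 * time.Millisecond)
	c.Close()
	time.Sleep(20 * time.Millisecond) // give the goroutine time to print
}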
@@ -613,7 +616,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
proxyAddr string
)
if c.enableForwarding && isLeaderReq {
if atomic.LoadInt32(&store.unreachable) == 0 {
if store.getLivenessState() == reachable {
regionStore.unsetProxyStoreIfNeeded(cachedRegion)
} else {
proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
@@ -1627,7 +1630,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
}

func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || atomic.LoadInt32(&store.unreachable) == 0 {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || store.getLivenessState() == reachable {
return
}

@@ -1657,7 +1660,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
}
storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index))
// Skip unreachable stores.
if atomic.LoadInt32(&store.unreachable) != 0 {
if store.getLivenessState() == unreachable {
continue
}

@@ -2159,7 +2162,7 @@ type Store struct {
// whether the store is unreachable due to some reason, and therefore requests to the store need to be
// forwarded by other stores. This is also the flag that a checkUntilHealth goroutine is running for this store.
// This mechanism is currently only applicable to TiKV stores.
unreachable int32
livenessState uint32
unreachableSince time.Time
}

@@ -2362,12 +2365,19 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r
return res
}

// getLivenessState gets the cached liveness state of the store.
// When it's not reachable, a goroutine will update the state in the background.
// To get the accurate liveness state, use checkLiveness instead.
func (s *Store) getLivenessState() livenessState {
return livenessState(atomic.LoadUint32(&s.livenessState))
}

type livenessState uint32

var (
livenessSf singleflight.Group
// storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance.
storeLivenessTimeout time.Duration
storeLivenessTimeout = time.Second
)

// SetStoreLivenessTimeout sets storeLivenessTimeout to t.
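The livenessSf singleflight.Group shown as context above is there to collapse concurrent liveness probes for the same store into a single request. A sketch of that deduplication with golang.org/x/sync/singleflight; probeStore and its timings are hypothetical stand-ins for the real status RPC:

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/singleflight"
)

var livenessSf singleflight.Group

// probeStore pretends to be an expensive health check; concurrent callers for the
// same addr share a single execution and all receive its result.
func probeStore(addr string) string {
	v, _, shared := livenessSf.Do(addr, func() (interface{}, error) {
		time.Sleep(100 * time.Millisecond) // stand-in for the real status RPC
		return "reachable", nil
	})
	fmt.Println("shared with another caller:", shared)
	return v.(string)
}

func main() {
	go probeStore("127.0.0.1:20160")
	time.Sleep(10 * time.Millisecond) // let the first probe start so the second joins it
	fmt.Println(probeStore("127.0.0.1:20160"))
}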
@@ -2381,12 +2391,12 @@ func GetStoreLivenessTimeout() time.Duration {
}

const (
unknown livenessState = iota
reachable
reachable livenessState = iota
unreachable
unknown
)

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store",
@@ -2395,24 +2405,21 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache) {
}

// It may be already started by another thread.
if atomic.CompareAndSwapInt32(&s.unreachable, 0, 1) {
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
s.unreachableSince = time.Now()
go s.checkUntilHealth(c)
}
}

func (s *Store) checkUntilHealth(c *RegionCache) {
defer atomic.CompareAndSwapInt32(&s.unreachable, 1, 0)
defer atomic.StoreUint32(&s.livenessState, uint32(reachable))

ticker := time.NewTicker(time.Second)
lastCheckPDTime := time.Now()

// TODO(MyonKeminta): Set a more proper ctx here so that it can be interrupted immediately when the RegionCache is
// shutdown.
ctx := context.Background()
for {
select {
case <-c.closeCh:
case <-c.ctx.Done():
return
case <-ticker.C:
if time.Since(lastCheckPDTime) > time.Second*30 {
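The Store field changes from an unreachable int32 flag to a livenessState kept in a uint32, read with atomic.LoadUint32 and claimed with CompareAndSwapUint32 so only one health-check goroutine is started per store. A compact sketch of that pattern with a hypothetical store type:

package main

import (
	"fmt"
	"sync/atomic"
)

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
	unknown
)

type store struct {
	livenessState uint32
}

func (s *store) getLivenessState() livenessState {
	return livenessState(atomic.LoadUint32(&s.livenessState))
}

// markNotReachable flips the state only if it is currently reachable and reports
// whether this caller won the race, mirroring the CAS in startHealthCheckLoopIfNeeded.
func (s *store) markNotReachable(l livenessState) bool {
	return atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(l))
}

func main() {
	s := &store{}
	fmt.Println(s.markNotReachable(unreachable)) // true: this caller starts the health-check loop
	fmt.Println(s.markNotReachable(unknown))     // false: a loop is already running
	fmt.Println(s.getLivenessState() == unreachable)
	// When the store recovers, the loop stores reachable again, as in checkUntilHealth.
	atomic.StoreUint32(&s.livenessState, uint32(reachable))
	fmt.Println(s.getLivenessState() == reachable)
}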
@@ -2427,18 +2434,30 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
}
}

bo := retry.NewNoopBackoff(ctx)
bo := retry.NewNoopBackoff(c.ctx)
l := s.requestLiveness(bo, c)
if l == reachable {
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))

return
}
atomic.StoreUint32(&s.livenessState, uint32(l))
}
}
}

func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
switch val.(string) {
case "unreachable":
return unreachable
case "reachable":
return reachable
case "unknown":
return unknown
}
}
if c != nil && c.testingKnobs.mockRequestLiveness != nil {
return c.testingKnobs.mockRequestLiveness(s, bo)
}
@@ -2511,7 +2530,7 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
}

status := resp.GetStatus()
if status == healthpb.HealthCheckResponse_UNKNOWN {
if status == healthpb.HealthCheckResponse_UNKNOWN || status == healthpb.HealthCheckResponse_SERVICE_UNKNOWN {
logutil.BgLogger().Info("[health check] check health returns unknown", zap.String("store", addr))
l = unknown
return
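invokeKVStatusAPI above now also maps SERVICE_UNKNOWN to the unknown liveness state. A rough sketch of such a probe against the standard gRPC health service follows; the address, dial options, and trimmed-down error paths are assumptions, not the client's actual implementation:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
	unknown
)

// checkStoreHealth calls the gRPC health service once and maps the result to a
// liveness state, roughly mirroring invokeKVStatusAPI.
func checkStoreHealth(addr string, timeout time.Duration) livenessState {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return unreachable
	}
	defer conn.Close()

	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
	if err != nil {
		return unreachable
	}
	switch resp.GetStatus() {
	case healthpb.HealthCheckResponse_SERVING:
		return reachable
	// Both UNKNOWN and SERVICE_UNKNOWN are treated as unknown, as in the change above.
	case healthpb.HealthCheckResponse_UNKNOWN, healthpb.HealthCheckResponse_SERVICE_UNKNOWN:
		return unknown
	default:
		return unreachable
	}
}

func main() {
	fmt.Println(checkStoreHealth("127.0.0.1:20160", time.Second))
}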
internal/locate/region_request.go (32 changes: 23 additions, 9 deletions)
@@ -322,7 +322,16 @@ type accessKnownLeader struct {

func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
leader := selector.replicas[state.leaderIdx]
if leader.isExhausted(maxReplicaAttempt) {
liveness := leader.store.getLivenessState()
if liveness == unreachable && selector.regionCache.enableForwarding {
selector.state = &tryNewProxy{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
}
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be woken up to re-elect a leader until a follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader, to avoid unnecessary timeouts.
if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
return nil, stateChanged{}
}
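The new accessKnownLeader.next prefers a proxy when forwarding is enabled and the leader is unreachable, and otherwise falls back to followers whenever the leader is not known to be reachable or has exhausted its retry budget, so requests stop timing out against the dead leader of a hibernated region. That decision, condensed into a standalone sketch with hypothetical names:

package main

import "fmt"

type livenessState uint32

const (
	reachable livenessState = iota
	unreachable
	unknown
)

// pickNextState condenses the branches of accessKnownLeader.next above.
func pickNextState(leaderLiveness livenessState, leaderExhausted, enableForwarding bool) string {
	if leaderLiveness == unreachable && enableForwarding {
		return "tryNewProxy"
	}
	if leaderLiveness != reachable || leaderExhausted {
		return "tryFollower"
	}
	return "accessKnownLeader"
}

func main() {
	fmt.Println(pickNextState(unreachable, false, true)) // tryNewProxy
	fmt.Println(pickNextState(unknown, false, false))    // tryFollower
	fmt.Println(pickNextState(reachable, true, false))   // tryFollower
	fmt.Println(pickNextState(reachable, false, false))  // stay on the leader
}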
@@ -332,7 +341,8 @@

func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
liveness := selector.checkLiveness(bo, selector.targetReplica())
if liveness != reachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
// Only enable forwarding when unreachable to avoid using proxy to access a TiKV that cannot serve.
if liveness == unreachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding {
selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
return
}
@@ -407,7 +417,7 @@ type accessByKnownProxy struct {

func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
leader := selector.replicas[state.leaderIdx]
if atomic.LoadInt32(&leader.store.unreachable) == 0 {
if leader.store.getLivenessState() == reachable {
selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
@@ -442,7 +452,7 @@ type tryNewProxy struct {

func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
leader := selector.replicas[state.leaderIdx]
if atomic.LoadInt32(&leader.store.unreachable) == 0 {
if leader.store.getLivenessState() == reachable {
selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx}
return nil, stateChanged{}
@@ -770,11 +780,8 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
// We only check health in loop if forwarding is enabled now.
// The restriction might be relaxed if necessary, but the implementation
// may be checked carefully again.
if liveness != reachable && s.regionCache.enableForwarding {
store.startHealthCheckLoopIfNeeded(s.regionCache)
if liveness != reachable {
store.startHealthCheckLoopIfNeeded(s.regionCache, liveness)
}
return liveness
}
@@ -815,6 +822,13 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
}
for i, replica := range s.replicas {
if isSamePeer(replica.peer, leader) {
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be woken up to re-elect a leader until a follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader, to avoid unnecessary timeouts.
if replica.store.getLivenessState() != reachable {
return
}
if replica.isExhausted(maxReplicaAttempt) {
// Give the replica one more chance and because each follower is tried only once,
// it won't result in infinite retry.
(The remaining 2 changed files are not shown.)
