Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#5551
Browse files Browse the repository at this point in the history
close tikv#5207

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB authored and ti-chi-bot committed Oct 10, 2022
1 parent 90fe355 commit 9baa5bb
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 20 deletions.
50 changes: 30 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,13 @@ const (
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
<<<<<<< HEAD
retryInterval = 1 * time.Second
maxRetryTimes = 5
=======
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
Expand Down Expand Up @@ -694,12 +699,11 @@ func (c *client) handleDispatcher(
dc string,
tbc *tsoBatchController) {
var (
retryTimeConsuming time.Duration
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
Expand Down Expand Up @@ -756,6 +760,7 @@ func (c *client) handleDispatcher(
}

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
tsoBatchLoop:
for {
select {
Expand All @@ -778,6 +783,7 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
for {
Expand All @@ -788,24 +794,22 @@ tsoBatchLoop:
// Check stream and retry if necessary.
if stream == nil {
log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
if retryTimeConsuming >= c.option.timeout {
err = errs.ErrClientCreateTSOStream.FastGenByArgs("retry timeout")
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
retryTimeConsuming = 0
continue tsoBatchLoop
if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
}
select {
case <-dispatcherCtx.Done():
return
case <-time.After(time.Second):
retryTimeConsuming += time.Second
continue
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
}
}
retryTimeConsuming = 0
select {
case <-streamCtx.Done():
log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr))
Expand Down Expand Up @@ -899,15 +903,17 @@ type connectionContext struct {
cancel context.CancelFunc
}

func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
// updateConnectionCtxs refreshes the TSO connection contexts for the given
// dc-location and reports whether the refresh succeeded.
func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Pick the connection strategy: the normal leader connection by default
// (affected by the `enableForwarding` option), or the follower proxy when
// it is allowed for this dc-location.
connect := c.tryConnect
if c.allowTSOFollowerProxy(dc) {
connect = c.tryConnectWithProxy
}
err := connect(updaterCtx, dc, connectionCtxs)
if err == nil {
return true
}
log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}

// tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
Expand All @@ -923,6 +929,8 @@ func (c *client) tryConnect(
networkErrNum uint64
err error
stream pdpb.PD_TsoClient
url string
cc *grpc.ClientConn
)
updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
Expand All @@ -938,9 +946,11 @@ func (c *client) tryConnect(
return true
})
}
cc, url := c.getAllocatorClientConnByDCLocation(dc)
// retry several times before falling back to the follower when the network problem happens

for i := 0; i < maxRetryTimes; i++ {
c.ScheduleCheckLeader()
cc, url = c.getAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
failpoint.Inject("unreachableNetwork", func() {
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
MismatchLeaderErr = "mismatch leader id"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
70 changes: 70 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,77 @@ func (s *clientTestSuite) TestTSOFollowerProxy(c *C) {
wg.Wait()
}

<<<<<<< HEAD
func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
=======
// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
// For two leader-failure scenarios it records the wall-clock time of the last
// failed GetTS call (maxUnavailableTime) and the time the new leader became
// ready (leaderReadyTime), then asserts the client recovered within roughly
// one second of the leader being ready.
func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Three-member cluster so a leader can be stopped while quorum survives.
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)

var wg sync.WaitGroup
// Written by the two goroutines below; read only after wg.Wait(), which
// provides the necessary happens-before ordering.
var maxUnavailableTime, leaderReadyTime time.Time
// getTsoFunc issues tsoRequestRound GetTS calls, recording the time of each
// failure (the last write wins, so maxUnavailableTime ends up being the
// moment of the final failed request) and checking TSO monotonicity on
// every success.
getTsoFunc := func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
var physical, logical int64
var ts uint64
// NOTE(review): this assigns the function-scope `err` declared above
// rather than a loop-local one; harmless here since the goroutines are
// serialized by wg.Wait(), but a shadowed `err :=` would be safer.
physical, logical, err = cli.GetTS(context.Background())
ts = tsoutil.ComposeTS(physical, logical)
if err != nil {
maxUnavailableTime = time.Now()
continue
}
re.NoError(err)
// NOTE(review): require assertions call FailNow, which is only valid
// from the test goroutine — confirm whether this should use assert or
// report failures back to the main goroutine instead.
re.Less(lastTS, ts)
lastTS = ts
}
}

// Scenario 1: graceful leader stop (resign pd leader or stop pd leader).
// The stopped server is restarted afterwards so the cluster is intact for
// the second scenario.
// test resign pd leader or stop pd leader
wg.Add(1 + 1)
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
leader.Stop()
cluster.WaitLeader()
leaderReadyTime = time.Now()
cluster.RunServers([]*tests.TestServer{leader})
}()
wg.Wait()
// The last TSO failure must occur no later than ~1s after the new leader
// was ready.
re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())

// Scenario 2: abrupt failure (kill pd leader pod / leader network
// unreachable), simulated via the client-side unreachableNetwork failpoint.
// test kill pd leader pod or network of leader is unreachable
wg.Add(1 + 1)
// Reset the timestamps so the second scenario is measured independently.
maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
leader.Stop()
cluster.WaitLeader()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
leaderReadyTime = time.Now()
}()
wg.Wait()
re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())
}

func TestGlobalAndLocalTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
>>>>>>> d50e5fe43 (client: fix Stream timeout logic (#5551))
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
Expand Down

0 comments on commit 9baa5bb

Please sign in to comment.