diff --git a/config/client.go b/config/client.go index c9c549685..b65d55ebe 100644 --- a/config/client.go +++ b/config/client.go @@ -44,9 +44,10 @@ import ( const ( // DefStoreLivenessTimeout is the default value for store liveness timeout. - DefStoreLivenessTimeout = "1s" - DefGrpcInitialWindowSize = 1 << 27 // 128MiB - DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB + DefStoreLivenessTimeout = "1s" + DefGrpcInitialWindowSize = 1 << 27 // 128MiB + DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB + DefMaxConcurrencyRequestLimit = math.MaxInt64 ) // TiKVClient is the config for tikv client. @@ -174,7 +175,7 @@ func DefaultTiKVClient() TiKVClient { CoprReqTimeout: 60 * time.Second, ResolveLockLiteThreshold: 16, - MaxConcurrencyRequestLimit: math.MaxInt64, + MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit, EnableReplicaSelectorV2: true, } } diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 5ba07c25a..78ce1e51e 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -418,6 +418,12 @@ func (a *batchConn) getClientAndSend() { if cli == nil { logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons)) metrics.TiKVNoAvailableConnectionCounter.Inc() + if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit { + // Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions. + // TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout. + // This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time. + a.reqBuilder.cancel(errors.New("no available connections")) + } return } defer cli.unlockForSend() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 3436406ec..a1fe349cc 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -128,9 +128,15 @@ func TestCancelTimeoutRetErr(t *testing.T) { func TestSendWhenReconnect(t *testing.T) { server, port := mockserver.StartMockTikvService() require.True(t, port > 0) + restoreFn := config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxConcurrencyRequestLimit = 10000 + }) rpcClient := NewRPCClient() - defer rpcClient.Close() + defer func() { + rpcClient.Close() + restoreFn() + }() addr := server.Addr() conn, err := rpcClient.getConnArray(addr, true) assert.Nil(t, err) @@ -142,7 +148,7 @@ func TestSendWhenReconnect(t *testing.T) { req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) _, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second) - assert.True(t, strings.Contains(err.Error(), "timeout")) + require.Regexp(t, "wait recvLoop timeout,timeout:5s, wait_duration:.* context deadline exceeded", err.Error()) server.Stop() } @@ -950,6 +956,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) { err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" || strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "connect: connection refused") || + strings.Contains(err.Error(), "no available connections") || strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") { continue } @@ -1010,3 +1017,33 @@ func TestErrConn(t *testing.T) { assert.True(t, errors.As(err1, &errMsg)) assert.EqualError(t, err1, errMsg.Error()) } + +func TestFastFailWhenNoAvailableConn(t *testing.T) { + server, port := mockserver.StartMockTikvService() + require.True(t, port > 0) + require.True(t, server.IsRunning()) + addr := server.Addr() + client := NewRPCClient() + defer func() { + err := client.Close() + require.NoError(t, err) + server.Stop() + }() + + req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} + conn, err := client.getConnArray(addr, true) + assert.Nil(t, err) + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second, 0) + require.NoError(t, err) + + for _, c := range conn.batchConn.batchCommandsClients { + // mock all client a in recreate. + c.lockForRecreate() + } + start := time.Now() + timeout := time.Second + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout, 0) + require.Error(t, err) + require.Equal(t, "no available connections", err.Error()) + require.Less(t, time.Since(start), timeout) +}