Skip to content

Commit

Permalink
fix issue where requests do not fail fast when there are no available connections
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed May 13, 2024
1 parent 6cb0704 commit a886b76
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
9 changes: 5 additions & 4 deletions config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ import (

// Default values for TiKV client configuration.
// NOTE(review): this view is a rendered diff, so the first three declarations
// below appear twice (pre-change lines followed by the post-change lines).
const (
// DefStoreLivenessTimeout is the default value for store liveness timeout.
DefStoreLivenessTimeout = "1s"
// DefGrpcInitialWindowSize is 1 << 27 bytes (128MiB).
DefGrpcInitialWindowSize = 1 << 27 // 128MiB
// DefGrpcInitialConnWindowSize is 1 << 27 bytes (128MiB).
DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB
DefStoreLivenessTimeout = "1s"
DefGrpcInitialWindowSize = 1 << 27 // 128MiB
DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB
// DefMaxConcurrencyRequestLimit is the default value assigned to
// TiKVClient.MaxConcurrencyRequestLimit by DefaultTiKVClient. The batch client
// compares the configured limit against this value to decide whether the
// MaxConcurrencyRequestLimit feature is enabled.
DefMaxConcurrencyRequestLimit = math.MaxInt64
)

// TiKVClient is the config for tikv client.
Expand Down Expand Up @@ -174,7 +175,7 @@ func DefaultTiKVClient() TiKVClient {
CoprReqTimeout: 60 * time.Second,

ResolveLockLiteThreshold: 16,
MaxConcurrencyRequestLimit: math.MaxInt64,
MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit,
EnableReplicaSelectorV2: true,
}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,12 @@ func (a *batchConn) getClientAndSend() {
if cli == nil {
logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
metrics.TiKVNoAvailableConnectionCounter.Inc()
if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit {
// Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions.
// TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout.
// This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time.
a.reqBuilder.cancel(errors.New("no available connections"))
}
return
}
defer cli.unlockForSend()
Expand Down
41 changes: 39 additions & 2 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,15 @@ func TestCancelTimeoutRetErr(t *testing.T) {
func TestSendWhenReconnect(t *testing.T) {
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
restoreFn := config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxConcurrencyRequestLimit = 10000
})

rpcClient := NewRPCClient()
defer rpcClient.Close()
defer func() {
rpcClient.Close()
restoreFn()
}()
addr := server.Addr()
conn, err := rpcClient.getConnArray(addr, true)
assert.Nil(t, err)
Expand All @@ -142,7 +148,7 @@ func TestSendWhenReconnect(t *testing.T) {

req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second)
assert.True(t, strings.Contains(err.Error(), "timeout"))
require.Regexp(t, "wait recvLoop timeout,timeout:5s, wait_duration:.* context deadline exceeded", err.Error())
server.Stop()
}

Expand Down Expand Up @@ -950,6 +956,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||
strings.Contains(err.Error(), "context deadline exceeded") ||
strings.Contains(err.Error(), "connect: connection refused") ||
strings.Contains(err.Error(), "no available connections") ||
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") {
continue
}
Expand Down Expand Up @@ -1010,3 +1017,33 @@ func TestErrConn(t *testing.T) {
assert.True(t, errors.As(err1, &errMsg))
assert.EqualError(t, err1, errMsg.Error())
}

// TestFastFailWhenNoAvailableConn checks that a batch request returns the
// "no available connections" error before its timeout elapses when every
// batch-commands client of the connection is locked for recreation.
//
// The fast-fail path is only taken while the global config keeps
// MaxConcurrencyRequestLimit at config.DefMaxConcurrencyRequestLimit, so this
// test relies on the default global configuration.
func TestFastFailWhenNoAvailableConn(t *testing.T) {
	server, port := mockserver.StartMockTikvService()
	require.True(t, port > 0)
	require.True(t, server.IsRunning())
	// Registered first so it runs last (after the client is closed) and still
	// runs when the Close assertion below aborts its own deferred closure.
	defer server.Stop()

	addr := server.Addr()
	client := NewRPCClient()
	defer func() {
		require.NoError(t, client.Close())
	}()

	req := &tikvpb.BatchCommandsRequest_Request{
		Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}},
	}
	conn, err := client.getConnArray(addr, true)
	// Stop right here on failure: conn is dereferenced immediately below.
	require.NoError(t, err)

	// Sanity check: a request succeeds while the connections are usable.
	_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second, 0)
	require.NoError(t, err)

	// Mock every client as being in the middle of a recreate, so none of them
	// can be picked for sending.
	for _, c := range conn.batchConn.batchCommandsClients {
		c.lockForRecreate()
	}

	timeout := time.Second
	start := time.Now()
	_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout, 0)
	require.EqualError(t, err, "no available connections")
	// The request must fail fast rather than wait for the full timeout.
	require.Less(t, time.Since(start), timeout)
}

0 comments on commit a886b76

Please sign in to comment.