Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b289788
metrics: refine stream-level metrics
zyguan Mar 31, 2026
4b701c5
metrics: refine batch request stage latency metrics
zyguan Mar 31, 2026
9e42b52
track tail latency for canceled entries
zyguan Apr 1, 2026
79310ef
send conn-idx info to tikv via metadata
zyguan Apr 1, 2026
5a166bb
normalize sentNS for two corner cases
zyguan Apr 1, 2026
ee5c9ac
address comments
zyguan Apr 1, 2026
623a855
simplify pending requests logging
zyguan Apr 2, 2026
6be8adc
some minor updates
zyguan Apr 2, 2026
6901600
Merge remote-tracking branch 'origin/master' into dev/refine-batch-cl…
zyguan Apr 2, 2026
47db828
address https://github.com/tikv/client-go/pull/1931#discussion_r30312…
zyguan Apr 3, 2026
41b999e
address https://github.com/tikv/client-go/pull/1931#discussion_r30313…
zyguan Apr 3, 2026
dec57c4
address https://github.com/tikv/client-go/pull/1931/changes#r3031337817
zyguan Apr 3, 2026
d672c2d
Merge remote-tracking branch 'origin/master' into dev/refine-batch-cl…
zyguan Apr 7, 2026
533df7e
client: track batch request progress with shared batch state
zyguan Apr 8, 2026
923d850
update go.mod and go.sum of submodules
zyguan Apr 8, 2026
655cf44
fix data race on batchState
zyguan Apr 8, 2026
7c9fec0
add request-id based mechanism for detecting arrival status
zyguan Apr 9, 2026
a2caf22
Merge remote-tracking branch 'origin/master' into dev/refine-batch-cl…
zyguan Apr 9, 2026
b29f87d
address comments
zyguan Apr 9, 2026
e82b78d
simplify the request stage model
zyguan Apr 10, 2026
528da9e
Merge remote-tracking branch 'origin/master' into dev/refine-batch-cl…
zyguan Apr 15, 2026
b0f69b3
address comment
zyguan Apr 15, 2026
f44bd00
log more in batch-recv-loop
zyguan Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/gcworker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/rawkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module txnkv
go 1.25.8

require (
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53
github.com/tikv/client-go/v2 v2.0.0
)

Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e // indirect
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXH
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e h1:KzYXzOwFIg7iLAPwTZUxKwRSS+4lFGewvjuxE2ZA4sU=
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 h1:wjhJRzyeRKpJqMg6XmqQ7cJdhEhE5mSoCh94rWdTVOk=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -115,8 +115,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20260226073533-5885ceca6b4f h1:hFnht5CcP3ssOxp74f00WcitbXs6lNUIBm0mzdojs24=
github.com/tikv/pd/client v0.0.0-20260226073533-5885ceca6b4f/go.mod h1:tloQKNFqF/T5JfLx4bloR8MKPZyp7y6cF3guAlls88M=
github.com/tikv/pd/client v0.0.0-20260401072359-048f0d8f6f71 h1:5hCQ6J2fwUpYqIgQGR625bW98wvYS9FUpTiVszIbVSg=
github.com/tikv/pd/client v0.0.0-20260401072359-048f0d8f6f71/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20260310054046-9c8b3586e4b2
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53
github.com/pingcap/tidb v1.1.0-beta.0.20260317213042-b1525070ca3e
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1440,8 +1440,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e h1:KzYXzOwFIg7iLAPwTZUxKwRSS+4lFGewvjuxE2ZA4sU=
github.com/pingcap/kvproto v0.0.0-20260331120830-0d407c8b3f6e/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 h1:wjhJRzyeRKpJqMg6XmqQ7cJdhEhE5mSoCh94rWdTVOk=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
Expand Down
103 changes: 101 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ const (
// forwardMetadataKey is the key of gRPC metadata which represents a forwarded request.
const forwardMetadataKey = "tikv-forwarded-host"

// batchConnIdxMetadataKey is the key of gRPC metadata which represents the connection index in the batch client pool.
const batchConnIdxMetadataKey = "tikv-batch-conn-index"

// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
Expand Down Expand Up @@ -339,9 +342,10 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R

start := time.Now()
bypass := resourcecontrol.MakeRequestInfo(req).Bypass()
storeMetrics := connPool.getStoreMetrics(req.Context.GetPeer().GetStoreId())
defer func() {
elapsed := time.Since(start)
connPool.updateRPCMetrics(req, resp, elapsed)
storeMetrics.updateRPCMetrics(req, resp, elapsed)
if resp != nil && !bypass {
readRPCCount, writeRPCCount := completedTiKVRUV2RPCCount(req)
switch resp.Resp.(type) {
Expand All @@ -365,7 +369,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
defer trace.StartRegion(ctx, req.Type.String()).End()
return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connPool.batchConn, batchReq, timeout, pri))
return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connPool.batchConn, storeMetrics.batchReqStage, batchReq, timeout, pri))
}
}

Expand Down Expand Up @@ -776,6 +780,29 @@ type storeMetrics struct {
rpcSrcLatSum sync.Map
rpcNetLatExternal prometheus.Observer
rpcNetLatInternal prometheus.Observer
batchReqStage *batchRequestStageMetrics
}

type batchRequestStageMetrics struct {
batchWaitOK prometheus.Observer
batchWaitTimeout prometheus.Observer
batchWaitCanceled prometheus.Observer
batchWaitFailed prometheus.Observer
batchWaitClosed prometheus.Observer

sendWaitOK prometheus.Observer
sendWaitTimeout prometheus.Observer
sendWaitCanceled prometheus.Observer
sendWaitFailed prometheus.Observer
sendWaitClosed prometheus.Observer

recvWaitOK prometheus.Observer
recvWaitTimeout prometheus.Observer
recvWaitCanceled prometheus.Observer
recvWaitFailed prometheus.Observer
recvWaitClosed prometheus.Observer

doneOK prometheus.Observer
}

func newStoreMetrics(storeID uint64) *storeMetrics {
Expand All @@ -785,10 +812,82 @@ func newStoreMetrics(storeID uint64) *storeMetrics {
rpcLatHist: deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: store})),
rpcNetLatExternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "false"),
rpcNetLatInternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "true"),
batchReqStage: &batchRequestStageMetrics{
batchWaitOK: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageBatchWait), string(batchRequestOutcomeOK)),
batchWaitTimeout: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageBatchWait), string(batchRequestOutcomeTimeout)),
batchWaitCanceled: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageBatchWait), string(batchRequestOutcomeCanceled)),
batchWaitFailed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageBatchWait), string(batchRequestOutcomeFailed)),
batchWaitClosed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageBatchWait), string(batchRequestOutcomeClosed)),
sendWaitOK: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageSendWait), string(batchRequestOutcomeOK)),
sendWaitTimeout: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageSendWait), string(batchRequestOutcomeTimeout)),
sendWaitCanceled: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageSendWait), string(batchRequestOutcomeCanceled)),
sendWaitFailed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageSendWait), string(batchRequestOutcomeFailed)),
sendWaitClosed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageSendWait), string(batchRequestOutcomeClosed)),
recvWaitOK: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageRecvWait), string(batchRequestOutcomeOK)),
recvWaitTimeout: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageRecvWait), string(batchRequestOutcomeTimeout)),
recvWaitCanceled: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageRecvWait), string(batchRequestOutcomeCanceled)),
recvWaitFailed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageRecvWait), string(batchRequestOutcomeFailed)),
recvWaitClosed: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageRecvWait), string(batchRequestOutcomeClosed)),
doneOK: metrics.TiKVBatchRequestStageDuration.WithLabelValues(store, string(batchRequestStageDone), string(batchRequestOutcomeOK)),
},
}
return m
}

func (m *batchRequestStageMetrics) observe(stage batchRequestStage, outcome batchRequestOutcome, duration time.Duration) {
// This method is called internally and the caller [visitBatchRequestObservations] is guaranteed to pass valid stage
// and outcome, so we don't need to handle the default case in the following switch statements.
var observer prometheus.Observer
switch stage {
case batchRequestStageBatchWait:
Comment thread
zyguan marked this conversation as resolved.
switch outcome {
case batchRequestOutcomeOK:
observer = m.batchWaitOK
case batchRequestOutcomeTimeout:
observer = m.batchWaitTimeout
case batchRequestOutcomeCanceled:
observer = m.batchWaitCanceled
case batchRequestOutcomeFailed:
observer = m.batchWaitFailed
case batchRequestOutcomeClosed:
observer = m.batchWaitClosed
}
case batchRequestStageSendWait:
switch outcome {
case batchRequestOutcomeOK:
observer = m.sendWaitOK
case batchRequestOutcomeTimeout:
observer = m.sendWaitTimeout
case batchRequestOutcomeCanceled:
observer = m.sendWaitCanceled
case batchRequestOutcomeFailed:
observer = m.sendWaitFailed
case batchRequestOutcomeClosed:
observer = m.sendWaitClosed
}
case batchRequestStageRecvWait:
switch outcome {
case batchRequestOutcomeOK:
observer = m.recvWaitOK
case batchRequestOutcomeTimeout:
observer = m.recvWaitTimeout
case batchRequestOutcomeCanceled:
observer = m.recvWaitCanceled
case batchRequestOutcomeFailed:
observer = m.recvWaitFailed
case batchRequestOutcomeClosed:
observer = m.recvWaitClosed
}
case batchRequestStageDone:
if outcome == batchRequestOutcomeOK {
observer = m.doneOK
}
}
if observer != nil {
observer.Observe(duration.Seconds())
}
}

func (m *storeMetrics) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
seconds := latency.Seconds()
stale := req.GetStaleRead()
Expand Down
36 changes: 16 additions & 20 deletions internal/client/client_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/internal/resourcecontrol"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
Expand Down Expand Up @@ -89,15 +88,17 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
}

var (
entry = &batchCommandsEntry{
ctx: ctx,
req: batchReq,
cb: cb,
forwardedHost: req.ForwardedHost,
canceled: 0,
err: nil,
pri: req.GetResourceControlContext().GetOverridePriority(),
start: time.Now(),
storeMetrics = connPool.getStoreMetrics(req.Context.GetPeer().GetStoreId())
entry = &batchCommandsEntry{
ctx: ctx,
req: batchReq,
cb: cb,
batchRequestMetrics: storeMetrics.batchReqStage,
forwardedHost: req.ForwardedHost,
canceled: 0,
err: nil,
pri: req.GetResourceControlContext().GetOverridePriority(),
reqArriveAt: time.Now(),
}
stop func() bool
)
Expand All @@ -108,19 +109,13 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
stop()
}

elapsed := time.Since(entry.start)
elapsed := time.Since(entry.reqArriveAt)

// batch client metrics
if sendLat := atomic.LoadInt64(&entry.sendLat); sendLat > 0 {
metrics.BatchRequestDurationSend.Observe(time.Duration(sendLat).Seconds())
}
if recvLat := atomic.LoadInt64(&entry.recvLat); recvLat > 0 {
metrics.BatchRequestDurationRecv.Observe(time.Duration(recvLat).Seconds())
}
metrics.BatchRequestDurationDone.Observe(elapsed.Seconds())
observeBatchRequestCompletion(entry, err)

// rpc metrics
connPool.updateRPCMetrics(req, resp, elapsed)
storeMetrics.updateRPCMetrics(req, resp, elapsed)
if resp != nil && !resourcecontrol.MakeRequestInfo(req).Bypass() {
readRPCCount, writeRPCCount := completedTiKVRUV2RPCCount(req)
config.UpdateTiKVRUV2FromExecDetailsV2(ctx, resp.GetExecDetailsV2(), readRPCCount, writeRPCCount)
Expand All @@ -130,7 +125,7 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
if spanRPC != nil {
if util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
si.addTo(spanRPC, entry.start)
si.addTo(spanRPC, entry.reqArriveAt)
}
}
spanRPC.Finish()
Expand All @@ -148,6 +143,7 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
stop = context.AfterFunc(ctx, func() {
logutil.Logger(ctx).Debug("async send request cancelled (context done)", zap.String("to", addr), zap.Error(ctx.Err()))
entry.error(ctx.Err())
atomic.StoreInt32(&entry.canceled, 1)
})

batchConn := connPool.batchConn
Expand Down
Loading
Loading