Skip to content

Commit

Permalink
Backport the stale read optimizations to 6.5 (#818)
Browse files Browse the repository at this point in the history
* add metrics for stale-read traffic (#776)

Signed-off-by: you06 <you1474600@gmail.com>

* Remove stale-read flag after resolving lock (#792)

* remove stale-read when key-is-locked

Signed-off-by: you06 <you1474600@gmail.com>

* disable follower read for batch get

Signed-off-by: you06 <you1474600@gmail.com>

* the stale-read flag may already set to false

Signed-off-by: you06 <you1474600@gmail.com>

* fix batchget

Signed-off-by: you06 <you1474600@gmail.com>

* Reset busy-threshold when stale read fallback to leader read

Signed-off-by: you06 <you1474600@gmail.com>

* address comment

Signed-off-by: you06 <you1474600@gmail.com>

* trigger CI

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 committed May 29, 2023
1 parent 80181bc commit a78518e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 1 deletion.
70 changes: 70 additions & 0 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,12 +988,23 @@ func (s *RegionRequestSender) SendReqCtx(
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(tryTimes))
}
}()

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
}

for {
if tryTimes > 0 {
req.IsRetryRequest = true
if tryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes))
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
Expand Down Expand Up @@ -1071,6 +1082,9 @@ func (s *RegionRequestSender) SendReqCtx(
s.replicaSelector.onSendSuccess()
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
}
return resp, rpcCtx, nil
}
}
Expand Down Expand Up @@ -1642,3 +1656,59 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
// Because caller may need to re-split the request.
return false, nil
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
size := 0
switch s.tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
case tikvrpc.CmdScan:
size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
}
}
11 changes: 11 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
)

// Label constants.
Expand All @@ -116,6 +117,7 @@ const (
LblToStore = "to_store"
LblStaleRead = "stale_read"
LblSource = "source"
LblDirection = "direction"
)

func initMetrics(namespace, subsystem string) {
Expand Down Expand Up @@ -589,6 +591,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
}, []string{LblResult, LblDirection})

initShortcuts()
}

Expand Down Expand Up @@ -659,6 +669,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
10 changes: 10 additions & 0 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ var (
PrewriteAssertionUsageCounterExist prometheus.Counter
PrewriteAssertionUsageCounterNotExist prometheus.Counter
PrewriteAssertionUsageCounterUnknown prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
)

func initShortcuts() {
Expand Down Expand Up @@ -235,4 +240,9 @@ func initShortcuts() {
PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
}
6 changes: 6 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ func (req *Request) EnableStaleRead() {
req.ReplicaRead = false
}

// DisableStaleReadMeetLock is called when stale-read fallbacks to leader read after meeting key-is-locked error.
func (req *Request) DisableStaleReadMeetLock() {
req.StaleRead = false
req.ReplicaReadType = kv.ReplicaReadLeader
}

// IsGlobalStaleRead checks if the request is a global stale read request.
func (req *Request) IsGlobalStaleRead() bool {
return req.ReadReplicaScope == oracle.GlobalTxnScope &&
Expand Down
10 changes: 9 additions & 1 deletion txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mergeRegionRequestStats(cli.Stats)
}()
}
isStaleness := s.mu.isStaleness
s.mu.RUnlock()

pending := batch.keys
Expand All @@ -386,7 +387,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mu.resourceGroupTagger(req)
}
scope := s.mu.readReplicaScope
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
replicaAdjuster := s.mu.replicaReadAdjuster
s.mu.RUnlock()
Expand Down Expand Up @@ -479,6 +479,10 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
// we need to read from leader after resolving the lock.
if isStaleness {
isStaleness = false
}
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: s.version,
Locks: locks,
Expand Down Expand Up @@ -656,6 +660,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
return nil, err
}
if firstLock == nil {
// we need to read from leader after resolving the lock.
if isStaleness {
req.DisableStaleReadMeetLock()
}
firstLock = lock
} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
// If it is an autocommit point get, it needs to be blocked only
Expand Down

0 comments on commit a78518e

Please sign in to comment.