add resource group name in request context #650

Merged: 7 commits, Jan 9, 2023
2 changes: 1 addition & 1 deletion go.mod
@@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
4 changes: 2 additions & 2 deletions go.sum
@@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
2 changes: 1 addition & 1 deletion integration_tests/go.mod
@@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a
github.com/pingcap/tidb v1.1.0-beta.0.20221101102559-97add26c8f84
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0
4 changes: 2 additions & 2 deletions integration_tests/go.sum
@@ -408,8 +408,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
2 changes: 2 additions & 0 deletions txnkv/transaction/2pc.go
@@ -133,6 +133,7 @@ type twoPhaseCommitter struct {
detail unsafe.Pointer
txnSize int
hasNoNeedCommitKeys bool
resourceGroupName string

primaryKey []byte
forUpdateTS uint64
@@ -705,6 +706,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.resourceGroupTagger = txn.resourceGroupTagger
c.resourceGroupName = txn.resourceGroupName
c.setDetail(commitDetail)

return nil
1 change: 1 addition & 0 deletions txnkv/transaction/cleanup.go
@@ -68,6 +68,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
ResourceGroupTag: c.resourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
1 change: 1 addition & 0 deletions txnkv/transaction/commit.go
@@ -78,6 +78,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
1 change: 1 addition & 0 deletions txnkv/transaction/pessimistic.go
@@ -129,6 +129,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
1 change: 1 addition & 0 deletions txnkv/transaction/prewrite.go
@@ -185,6 +185,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(r)
8 changes: 8 additions & 0 deletions txnkv/transaction/txn.go
@@ -160,6 +160,8 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
// resourceGroupName is the name of the tenant resource group.
resourceGroupName string

aggressiveLockingContext *aggressiveLockingContext
aggressiveLockingDirty bool
@@ -298,6 +300,12 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) {
txn.GetSnapshot().SetResourceGroupTagger(tagger)
}

// SetResourceGroupName sets the resource group name for both reads and writes.
func (txn *KVTxn) SetResourceGroupName(name string) {
txn.resourceGroupName = name
txn.GetSnapshot().SetResourceGroupName(name)
}

// SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot.
// interceptor.RPCInterceptor will be executed before each RPC request is initiated.
// Note that SetRPCInterceptor will replace the previously set interceptor.
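For reference, here is a minimal sketch of how a caller could use this new API end to end. It is illustrative only: the PD address and the "analytics" group name are placeholders, while txnkv.NewClient, Begin, Set, and Commit are the pre-existing client-go entry points.

package main

import (
	"context"

	"github.com/tikv/client-go/v2/txnkv"
)

func main() {
	// Placeholder PD endpoint; replace with your cluster's address.
	client, err := txnkv.NewClient([]string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	txn, err := client.Begin()
	if err != nil {
		panic(err)
	}
	// Bind all reads and writes of this transaction to the "analytics"
	// resource group. The name is pushed down to the snapshot as well,
	// so both the 2PC requests and the reads carry ResourceGroupName in
	// their kvrpcpb.Context.
	txn.SetResourceGroupName("analytics")

	if err := txn.Set([]byte("k"), []byte("v")); err != nil {
		panic(err)
	}
	if err := txn.Commit(context.Background()); err != nil {
		panic(err)
	}
}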
56 changes: 35 additions & 21 deletions txnkv/txnlock/lock_resolver.go
@@ -136,13 +136,15 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }

// StatusCacheable checks whether the transaction status is certain. True will be
// returned if its status is certain:
//
// If the transaction is already committed, the result can be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
//   - if its primary lock is pessimistic too, the check txn status result should not be cached.
//   - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
//   - always cache the check txn status result.
//
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
func (s TxnStatus) StatusCacheable() bool {
if s.IsCommitted() {
@@ -285,7 +287,11 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}

req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos},
kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
},
)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort)
@@ -342,14 +348,14 @@ func (lr *LockResolver) ResolveLocksWithOpts(bo *retry.Backoffer, opts ResolveLo
}

// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
//  1. Use the `lockTTL` to pick up all expired locks. Only locks that are too
//     old are considered orphan locks and will be handled later. If all locks
//     are expired then all locks will be resolved, so the returned `ok` will be
//     true; otherwise the caller should sleep a while before retrying.
//  2. For each lock, query the primary key to get the commit status of the
//     txn that left the lock.
//  3. Send the `ResolveLock` cmd to the lock's region to resolve all locks
//     belonging to the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
opts := ResolveLocksOptions{
CallerStartTS: callerStartTS,
@@ -702,7 +708,8 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
ForceSyncCommit: forceSyncCommit,
ResolvingPessimisticLock: resolvingPessimisticLock,
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
@@ -845,7 +852,8 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
StartVersion: txnID,
}
req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
@@ -1000,7 +1008,9 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
lreq.CommitVersion = status.CommitTS()
}
lreq.Keys = keys
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, region, client.ReadTimeoutShort)
if err != nil {
@@ -1076,7 +1086,9 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
metrics.LockResolverCountWithResolveLockLite.Inc()
lreq.Keys = [][]byte{l.Key}
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
@@ -1130,7 +1142,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
ForUpdateTs: forUpdateTS,
Keys: [][]byte{l.Key},
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{
ResourceGroupName: util.ResourceGroupNameFromCtx(bo.GetCtx()),
})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
24 changes: 13 additions & 11 deletions txnkv/txnsnapshot/scan.go
@@ -226,11 +226,12 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
sreq := &kvrpcpb.ScanRequest{
Context: &kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
},
StartKey: s.nextStartKey,
EndKey: reqEndKey,
@@ -246,12 +247,13 @@
}
s.snapshot.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
})
if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
s.snapshot.mu.resourceGroupTagger(req)
35 changes: 23 additions & 12 deletions txnkv/txnsnapshot/snapshot.go
@@ -146,6 +146,8 @@ type KVSnapshot struct {
resourceGroupTagger tikvrpc.ResourceGroupTagger
// interceptor is used to decorate the RPC request logic related to the snapshot.
interceptor interceptor.RPCInterceptor
// resourceGroupName is used to bind the request to the specified resource group.
resourceGroupName string
}
sampleStep uint32
*util.RequestSource
@@ -375,12 +377,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
Keys: pending,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
@@ -578,12 +581,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
Key: k,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
@@ -852,6 +856,13 @@ func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) {
s.mu.interceptor = interceptor.ChainRPCInterceptors(s.mu.interceptor, it)
}

// SetResourceGroupName sets the resource group name of the kv request.
func (s *KVSnapshot) SetResourceGroupName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.resourceGroupName = name
}

// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
func (s *KVSnapshot) SnapCacheHitCount() int {
return int(atomic.LoadInt64(&s.mu.hitCnt))
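The snapshot-level setter supports the same attribution for point-in-time reads. A hedged sketch follows: the readWithGroup helper and the "reporting" group name are hypothetical, while GetSnapshot and Get are existing txnkv APIs, and ts would come from PD.

// readWithGroup is a hypothetical helper: it performs a snapshot read at
// timestamp ts and attributes it to the "reporting" resource group.
func readWithGroup(ctx context.Context, client *txnkv.Client, ts uint64, key []byte) ([]byte, error) {
	snapshot := client.GetSnapshot(ts)
	snapshot.SetResourceGroupName("reporting")
	return snapshot.Get(ctx, key)
}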
21 changes: 21 additions & 0 deletions util/request_source.go
@@ -80,3 +80,24 @@ func RequestSourceFromCtx(ctx context.Context) string {
}
return SourceUnknown
}

// resourceGroupNameKeyType is the context key type of the resource group name.
type resourceGroupNameKeyType struct{}

// resourceGroupNameKey is used as the key of the resource group name in context.
var resourceGroupNameKey = resourceGroupNameKeyType{}

// WithResouceGroupName returns a copy of the given context with an associated
// resource group name.
func WithResouceGroupName(ctx context.Context, groupName string) context.Context {
return context.WithValue(ctx, resourceGroupNameKey, groupName)
}

// ResourceGroupNameFromCtx extracts the resource group name from the passed
// context; an empty string is returned if the key is not set.
func ResourceGroupNameFromCtx(ctx context.Context) string {
if val := ctx.Value(resourceGroupNameKey); val != nil {
return val.(string)
}
return ""
}
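These context helpers serve request paths that have no transaction handle, such as the lock-resolver changes above, which read the name from the backoffer's context. A minimal, self-contained sketch of the round trip (the "background" group name is an arbitrary example):

package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/util"
)

func main() {
	// Attach a resource group name to a context; internally issued requests
	// that call ResourceGroupNameFromCtx (e.g. the ResolveLock and
	// PessimisticRollback requests above) will be attributed to this group.
	ctx := util.WithResouceGroupName(context.Background(), "background")
	fmt.Println(util.ResourceGroupNameFromCtx(ctx)) // prints "background"
}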