Skip to content

Commit

Permalink
add resource group name
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Dec 28, 2022
1 parent 018c59d commit e6a96d4
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0=
github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 2 additions & 0 deletions txnkv/transaction/2pc.go
Expand Up @@ -133,6 +133,7 @@ type twoPhaseCommitter struct {
detail unsafe.Pointer
txnSize int
hasNoNeedCommitKeys bool
resourceGroupName string

primaryKey []byte
forUpdateTS uint64
Expand Down Expand Up @@ -705,6 +706,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.resourceGroupTagger = txn.resourceGroupTagger
c.resourceGroupName = txn.resourceGroupName
c.setDetail(commitDetail)

return nil
Expand Down
1 change: 1 addition & 0 deletions txnkv/transaction/cleanup.go
Expand Up @@ -68,6 +68,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
ResourceGroupTag: c.resourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
Expand Down
1 change: 1 addition & 0 deletions txnkv/transaction/commit.go
Expand Up @@ -78,6 +78,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
Expand Down
1 change: 1 addition & 0 deletions txnkv/transaction/pessimistic.go
Expand Up @@ -129,6 +129,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
Expand Down
1 change: 1 addition & 0 deletions txnkv/transaction/prewrite.go
Expand Up @@ -185,6 +185,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
TxnSource: c.txnSource,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(r)
Expand Down
8 changes: 8 additions & 0 deletions txnkv/transaction/txn.go
Expand Up @@ -160,6 +160,8 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
// resourceGroupName is the name of tenent resource group.
resourceGroupName string

aggressiveLockingContext *aggressiveLockingContext
aggressiveLockingDirty bool
Expand Down Expand Up @@ -298,6 +300,12 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) {
txn.GetSnapshot().SetResourceGroupTagger(tagger)
}

// SetResourceGroupName set resource group name for both read and write.
func (txn *KVTxn) SetResourceGroupName(name string) {
txn.resourceGroupName = name
txn.GetSnapshot().SetResourceGroupName(name)
}

// SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot.
// interceptor.RPCInterceptor will be executed before each RPC request is initiated.
// Note that SetRPCInterceptor will replace the previously set interceptor.
Expand Down
24 changes: 13 additions & 11 deletions txnkv/txnsnapshot/scan.go
Expand Up @@ -226,11 +226,12 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
sreq := &kvrpcpb.ScanRequest{
Context: &kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
},
StartKey: s.nextStartKey,
EndKey: reqEndKey,
Expand All @@ -246,12 +247,13 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
s.snapshot.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.mu.resourceGroupTag,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
RequestSource: s.snapshot.GetRequestSource(),
ResourceGroupName: s.snapshot.mu.resourceGroupName,
})
if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil {
s.snapshot.mu.resourceGroupTagger(req)
Expand Down
35 changes: 23 additions & 12 deletions txnkv/txnsnapshot/snapshot.go
Expand Up @@ -146,6 +146,8 @@ type KVSnapshot struct {
resourceGroupTagger tikvrpc.ResourceGroupTagger
// interceptor is used to decorate the RPC request logic related to the snapshot.
interceptor interceptor.RPCInterceptor
// resourceGroupName is used to bind the request to specified resource group.
resourceGroupName string
}
sampleStep uint32
*util.RequestSource
Expand Down Expand Up @@ -375,12 +377,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
Keys: pending,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
Expand Down Expand Up @@ -578,12 +581,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
Key: k,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.mu.resourceGroupTag,
IsolationLevel: s.isolationLevel.ToPB(),
RequestSource: s.GetRequestSource(),
ResourceGroupName: s.mu.resourceGroupName,
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
Expand Down Expand Up @@ -852,6 +856,13 @@ func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) {
s.mu.interceptor = interceptor.ChainRPCInterceptors(s.mu.interceptor, it)
}

// SetResourceGroupName set resource group name of the kv request.
func (s *KVSnapshot) SetResourceGroupName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.resourceGroupName = name
}

// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
func (s *KVSnapshot) SnapCacheHitCount() int {
return int(atomic.LoadInt64(&s.mu.hitCnt))
Expand Down

0 comments on commit e6a96d4

Please sign in to comment.