From e6a96d40573ecf3131aa0332cf477ebb5357f5e0 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 28 Dec 2022 11:19:46 +0800 Subject: [PATCH] add resource group name --- go.mod | 2 +- go.sum | 4 ++-- txnkv/transaction/2pc.go | 2 ++ txnkv/transaction/cleanup.go | 1 + txnkv/transaction/commit.go | 1 + txnkv/transaction/pessimistic.go | 1 + txnkv/transaction/prewrite.go | 1 + txnkv/transaction/txn.go | 8 ++++++++ txnkv/txnsnapshot/scan.go | 24 ++++++++++++---------- txnkv/txnsnapshot/snapshot.go | 35 +++++++++++++++++++++----------- 10 files changed, 53 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 89517c4ac..8acb61cd9 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a + github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index 358481c90..64412a57b 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw= -github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a h1:4TiR0nGKmLmu2kTC22jOhUIplmv4GGUrUD9D2DTMms0= +github.com/pingcap/kvproto v0.0.0-20221227030452-22819f5b377a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index e15926d3b..f9eff8924 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -133,6 +133,7 @@ type twoPhaseCommitter struct { detail unsafe.Pointer txnSize int hasNoNeedCommitKeys bool + resourceGroupName string primaryKey []byte forUpdateTS uint64 @@ -705,6 +706,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { c.syncLog = txn.syncLog c.resourceGroupTag = txn.resourceGroupTag c.resourceGroupTagger = txn.resourceGroupTagger + c.resourceGroupName = txn.resourceGroupName c.setDetail(commitDetail) return nil diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index bf53ee1b3..63b84ffc1 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -68,6 +68,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer ResourceGroupTag: c.resourceGroupTag, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), + ResourceGroupName: c.resourceGroupName, }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(req) diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 5dfff0e03..15b618729 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -78,6 +78,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, TxnSource: c.txnSource, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), + ResourceGroupName: c.resourceGroupName, }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(req) diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index e0eb669a1..8b2a2ea33 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -129,6 +129,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * ResourceGroupTag: action.LockCtx.ResourceGroupTag, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), + ResourceGroupName: c.resourceGroupName, }) if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil { req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest)) diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 8b992cf08..b74e06fe6 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -185,6 +185,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u TxnSource: c.txnSource, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), + ResourceGroupName: c.resourceGroupName, }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(r) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index a8bb345db..ae312229d 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -160,6 +160,8 @@ type KVTxn struct { interceptor interceptor.RPCInterceptor assertionLevel kvrpcpb.AssertionLevel *util.RequestSource + // resourceGroupName is the name of tenent resource group. + resourceGroupName string aggressiveLockingContext *aggressiveLockingContext aggressiveLockingDirty bool @@ -298,6 +300,12 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { txn.GetSnapshot().SetResourceGroupTagger(tagger) } +// SetResourceGroupName set resource group name for both read and write. +func (txn *KVTxn) SetResourceGroupName(name string) { + txn.resourceGroupName = name + txn.GetSnapshot().SetResourceGroupName(name) +} + // SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot. // interceptor.RPCInterceptor will be executed before each RPC request is initiated. // Note that SetRPCInterceptor will replace the previously set interceptor. diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 2e24d52fc..a55a417b2 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -226,11 +226,12 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { } sreq := &kvrpcpb.ScanRequest{ Context: &kvrpcpb.Context{ - Priority: s.snapshot.priority.ToPB(), - NotFillCache: s.snapshot.notFillCache, - IsolationLevel: s.snapshot.isolationLevel.ToPB(), - ResourceGroupTag: s.snapshot.mu.resourceGroupTag, - RequestSource: s.snapshot.GetRequestSource(), + Priority: s.snapshot.priority.ToPB(), + NotFillCache: s.snapshot.notFillCache, + IsolationLevel: s.snapshot.isolationLevel.ToPB(), + ResourceGroupTag: s.snapshot.mu.resourceGroupTag, + RequestSource: s.snapshot.GetRequestSource(), + ResourceGroupName: s.snapshot.mu.resourceGroupName, }, StartKey: s.nextStartKey, EndKey: reqEndKey, @@ -246,12 +247,13 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { } s.snapshot.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{ - Priority: s.snapshot.priority.ToPB(), - NotFillCache: s.snapshot.notFillCache, - TaskId: s.snapshot.mu.taskID, - ResourceGroupTag: s.snapshot.mu.resourceGroupTag, - IsolationLevel: s.snapshot.isolationLevel.ToPB(), - RequestSource: s.snapshot.GetRequestSource(), + Priority: s.snapshot.priority.ToPB(), + NotFillCache: s.snapshot.notFillCache, + TaskId: s.snapshot.mu.taskID, + ResourceGroupTag: s.snapshot.mu.resourceGroupTag, + IsolationLevel: s.snapshot.isolationLevel.ToPB(), + RequestSource: s.snapshot.GetRequestSource(), + ResourceGroupName: s.snapshot.mu.resourceGroupName, }) if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil { s.snapshot.mu.resourceGroupTagger(req) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 8ba0888d4..f263b3a3a 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -146,6 +146,8 @@ type KVSnapshot struct { resourceGroupTagger tikvrpc.ResourceGroupTagger // interceptor is used to decorate the RPC request logic related to the snapshot. interceptor interceptor.RPCInterceptor + // resourceGroupName is used to bind the request to specified resource group. + resourceGroupName string } sampleStep uint32 *util.RequestSource @@ -375,12 +377,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, Keys: pending, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ - Priority: s.priority.ToPB(), - NotFillCache: s.notFillCache, - TaskId: s.mu.taskID, - ResourceGroupTag: s.mu.resourceGroupTag, - IsolationLevel: s.isolationLevel.ToPB(), - RequestSource: s.GetRequestSource(), + Priority: s.priority.ToPB(), + NotFillCache: s.notFillCache, + TaskId: s.mu.taskID, + ResourceGroupTag: s.mu.resourceGroupTag, + IsolationLevel: s.isolationLevel.ToPB(), + RequestSource: s.GetRequestSource(), + ResourceGroupName: s.mu.resourceGroupName, }) if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil { s.mu.resourceGroupTagger(req) @@ -578,12 +581,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] Key: k, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{ - Priority: s.priority.ToPB(), - NotFillCache: s.notFillCache, - TaskId: s.mu.taskID, - ResourceGroupTag: s.mu.resourceGroupTag, - IsolationLevel: s.isolationLevel.ToPB(), - RequestSource: s.GetRequestSource(), + Priority: s.priority.ToPB(), + NotFillCache: s.notFillCache, + TaskId: s.mu.taskID, + ResourceGroupTag: s.mu.resourceGroupTag, + IsolationLevel: s.isolationLevel.ToPB(), + RequestSource: s.GetRequestSource(), + ResourceGroupName: s.mu.resourceGroupName, }) if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil { s.mu.resourceGroupTagger(req) @@ -852,6 +856,13 @@ func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) { s.mu.interceptor = interceptor.ChainRPCInterceptors(s.mu.interceptor, it) } +// SetResourceGroupName set resource group name of the kv request. +func (s *KVSnapshot) SetResourceGroupName(name string) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.resourceGroupName = name +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt))