From b09192d30d7bb3d8ac3b5ba1e9a23bbe38bfdd3d Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Mon, 14 Nov 2022 14:22:30 +0800 Subject: [PATCH] add `txn_source` field in txn Signed-off-by: xiongjiwei --- go.mod | 4 +--- go.sum | 5 +++-- txnkv/transaction/2pc.go | 8 +++++--- txnkv/transaction/commit.go | 2 +- txnkv/transaction/prewrite.go | 2 +- txnkv/transaction/txn.go | 8 ++++---- 6 files changed, 15 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index d922e8240..653a1b9da 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 + github.com/pingcap/kvproto v0.0.0-20221114031243-29a30c4ef9c5 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 @@ -58,5 +58,3 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect stathat.com/c/consistent v1.0.0 // indirect ) - -replace github.com/pingcap/kvproto => github.com/xiongjiwei/kvproto v0.0.0-20221108100240-247b5ba7ccb1 diff --git a/go.sum b/go.sum index 18d9f0260..ad9d22c96 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,9 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221114031243-29a30c4ef9c5 h1:buJ/WCoxGzznvYge7tY0e/tqSMntiZ7ztCWRnwy9Klc= +github.com/pingcap/kvproto v0.0.0-20221114031243-29a30c4ef9c5/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -202,8 +205,6 @@ github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= -github.com/xiongjiwei/kvproto v0.0.0-20221108100240-247b5ba7ccb1 h1:Q813eoYb31g/+rwCIAw1cfmZHf+ehFS7XRBfnZ7Ncg8= -github.com/xiongjiwei/kvproto v0.0.0-20221108100240-247b5ba7ccb1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index c332897de..bf4caa140 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -177,7 +177,9 @@ type twoPhaseCommitter struct { // allowed when tikv disk full happened. diskFullOpt kvrpcpb.DiskFullOpt - cdc uint8 + // txnSource is used to record the source of the transaction. + txnSource uint8 + // The total number of kv request after batch split. prewriteTotalReqNum int @@ -1061,8 +1063,8 @@ func (c *twoPhaseCommitter) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) { c.diskFullOpt = level } -func (c *twoPhaseCommitter) SetCDC(cdc uint8) { - c.cdc = cdc +func (c *twoPhaseCommitter) SetTxnSource(txnSource uint8) { + c.txnSource = txnSource } type ttlManagerState uint32 diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index f306820d4..796370097 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -75,7 +75,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt, - MvccMeta: uint32(c.cdc), + TxnSource: uint32(c.txnSource), MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), }) diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index a2f0e790d..7ca5c4476 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -182,7 +182,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt, - MvccMeta: uint32(c.cdc), + TxnSource: uint32(c.txnSource), MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), RequestSource: c.txn.GetRequestSource(), }) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index a7e108d3f..d753a6641 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -121,7 +121,7 @@ type KVTxn struct { resourceGroupTag []byte resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil diskFullOpt kvrpcpb.DiskFullOpt - cdc uint8 + txnSource uint8 commitTSUpperBoundCheck func(uint64) bool // interceptor is used to decorate the RPC request logic related to the txn. interceptor interceptor.RPCInterceptor @@ -327,8 +327,8 @@ func (txn *KVTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) { txn.diskFullOpt = level } -func (txn *KVTxn) SetCDC(cdc uint8) { - txn.cdc = cdc +func (txn *KVTxn) SetTxnSource(txnSource uint8) { + txn.txnSource = txnSource } // GetDiskFullOpt gets the options of current operation in each TiKV disk usage level. @@ -414,7 +414,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { } txn.committer.SetDiskFullOpt(txn.diskFullOpt) - txn.committer.SetCDC(txn.cdc) + txn.committer.SetTxnSource(txn.txnSource) defer committer.ttlManager.close()