From ce697867b0b26dd375782b839384595022b27e2b Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 19 Mar 2024 23:37:38 +0900 Subject: [PATCH] refactor set option to detect race risk Signed-off-by: you06 commit intest package Signed-off-by: you06 update Signed-off-by: you06 --- txnkv/transaction/2pc.go | 18 ++- txnkv/transaction/txn.go | 288 +++++++++++++++++++++++++++++---------- util/intest/common.go | 19 +++ util/intest/intest.go | 20 +++ 4 files changed, 265 insertions(+), 80 deletions(-) create mode 100644 util/intest/common.go create mode 100644 util/intest/intest.go diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 824a45dec..ccf972f1e 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -468,16 +468,14 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) { // newTwoPhaseCommitter creates a twoPhaseCommitter. func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) { committer := &twoPhaseCommitter{ - store: txn.store, - txn: txn, - startTS: txn.StartTS(), - sessionID: sessionID, - regionTxnSize: map[uint64]int{}, - isPessimistic: txn.IsPessimistic(), - binlog: txn.binlog, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - resourceGroupName: txn.resourceGroupName, - } + store: txn.store, + txn: txn, + startTS: txn.StartTS(), + sessionID: sessionID, + regionTxnSize: map[uint64]int{}, + isPessimistic: txn.IsPessimistic(), + } + txn.commitActionContext.applyToCommitter(committer) return committer, nil } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 890a86d42..58f471d26 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -67,6 +67,7 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" + "github.com/tikv/client-go/v2/util/intest" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -133,32 +134,13 @@ type KVTxn struct { valid bool - // schemaVer is the infoSchema fetched at startTS. - schemaVer SchemaVer // commitCallback is called after current transaction gets committed commitCallback func(info string, err error) - binlog BinlogExecutor - schemaLeaseChecker SchemaLeaseChecker - syncLog bool - priority txnutil.Priority - isPessimistic bool - enableAsyncCommit bool - enable1PC bool - causalConsistency bool - scope string - kvFilter KVFilter - resourceGroupTag []byte - resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil - diskFullOpt kvrpcpb.DiskFullOpt - txnSource uint64 - commitTSUpperBoundCheck func(uint64) bool - // interceptor is used to decorate the RPC request logic related to the txn. - interceptor interceptor.RPCInterceptor - assertionLevel kvrpcpb.AssertionLevel - *util.RequestSource - // resourceGroupName is the name of tenant resource group. - resourceGroupName string + isPessimistic bool + enableAsyncCommit bool + enable1PC bool + *commitActionContext aggressiveLockingContext *aggressiveLockingContext aggressiveLockingDirty atomic.Bool @@ -179,12 +161,11 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, startTime: time.Now(), valid: true, vars: tikv.DefaultVars, - scope: options.TxnScope, enableAsyncCommit: cfg.EnableAsyncCommit, enable1PC: cfg.Enable1PC, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - RequestSource: snapshot.RequestSource, } + newTiKVTxn.commitActionContext = defaultCommitActionContext(newTiKVTxn) + newTiKVTxn.commitActionContext.setScope(options.TxnScope) if !options.PipelinedMemDB { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDBWithContext(), snapshot) return newTiKVTxn, nil @@ -270,12 +251,12 @@ func (txn *KVTxn) Delete(k []byte) error { // SetSchemaLeaseChecker sets a hook to check schema version. func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) { - txn.schemaLeaseChecker = checker + txn.commitActionContext.setSchemaLeaseChecker(checker) } // EnableForceSyncLog indicates tikv to always sync log for the transaction. func (txn *KVTxn) EnableForceSyncLog() { - txn.syncLog = true + txn.commitActionContext.setForceSyncLog(true) } // SetPessimistic indicates if the transaction should use pessimictic lock. @@ -289,59 +270,47 @@ func (txn *KVTxn) SetPessimistic(b bool) { // SetSchemaVer updates schema version to validate transaction. func (txn *KVTxn) SetSchemaVer(schemaVer SchemaVer) { txn.schemaVer = schemaVer + txn.commitActionContext.setSchemaVer(schemaVer) } // SetPriority sets the priority for both write and read. func (txn *KVTxn) SetPriority(pri txnutil.Priority) { - txn.priority = pri txn.GetSnapshot().SetPriority(pri) + txn.commitActionContext.setPriority(pri) } // SetResourceGroupTag sets the resource tag for both write and read. func (txn *KVTxn) SetResourceGroupTag(tag []byte) { - txn.resourceGroupTag = tag txn.GetSnapshot().SetResourceGroupTag(tag) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupTag = tag - } + txn.commitActionContext.setResourceGroupTag(tag) } // SetResourceGroupTagger sets the resource tagger for both write and read. // Before sending the request, if resourceGroupTag is not nil, use // resourceGroupTag directly, otherwise use resourceGroupTagger. func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { - txn.resourceGroupTagger = tagger txn.GetSnapshot().SetResourceGroupTagger(tagger) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupTagger = tagger - } + txn.commitActionContext.setResourceGroupTagger(tagger) } // SetResourceGroupName set resource group name for both read and write. func (txn *KVTxn) SetResourceGroupName(name string) { - txn.resourceGroupName = name txn.GetSnapshot().SetResourceGroupName(name) - if txn.committer != nil && txn.IsPipelined() { - txn.committer.resourceGroupName = name - } + txn.commitActionContext.setResourceGroupName(name) } // SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot. // interceptor.RPCInterceptor will be executed before each RPC request is initiated. // Note that SetRPCInterceptor will replace the previously set interceptor. func (txn *KVTxn) SetRPCInterceptor(it interceptor.RPCInterceptor) { - txn.interceptor = it txn.GetSnapshot().SetRPCInterceptor(it) + txn.commitActionContext.setRPCInterceptor(it) } // AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. func (txn *KVTxn) AddRPCInterceptor(it interceptor.RPCInterceptor) { - if txn.interceptor == nil { - txn.SetRPCInterceptor(it) - return - } - txn.interceptor = interceptor.ChainRPCInterceptors(txn.interceptor, it) txn.GetSnapshot().AddRPCInterceptor(it) + txn.commitActionContext.addRPCInterceptor(it) } // SetCommitCallback sets up a function that will be called when the transaction @@ -365,34 +334,34 @@ func (txn *KVTxn) SetEnable1PC(b bool) { // guarantee linearizability. Default value is false which means // linearizability is guaranteed. func (txn *KVTxn) SetCausalConsistency(b bool) { - txn.causalConsistency = b + txn.commitActionContext.setCausalConsistency(b) } // SetScope sets the geographical scope of the transaction. func (txn *KVTxn) SetScope(scope string) { - txn.scope = scope + txn.commitActionContext.setScope(scope) } // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { - txn.kvFilter = filter + txn.commitActionContext.setKVFilter(filter) } // SetCommitTSUpperBoundCheck provide a way to restrict the commit TS upper bound. // The 2PC processing will pass the commitTS for the checker function, if the function // returns false, the 2PC processing will abort. func (txn *KVTxn) SetCommitTSUpperBoundCheck(f func(commitTS uint64) bool) { - txn.commitTSUpperBoundCheck = f + txn.commitActionContext.setCommitTSUpperBoundCheck(f) } // SetDiskFullOpt sets whether current operation is allowed in each TiKV disk usage level. func (txn *KVTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) { - txn.diskFullOpt = level + txn.commitActionContext.setDiskFullOpt(level) } // SetTxnSource sets the source of the transaction. func (txn *KVTxn) SetTxnSource(txnSource uint64) { - txn.txnSource = txnSource + txn.commitActionContext.setTxnSource(txnSource) } // GetDiskFullOpt gets the options of current operation in each TiKV disk usage level. @@ -402,12 +371,12 @@ func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt { // ClearDiskFullOpt clears the options of current operation in each tikv disk usage level. func (txn *KVTxn) ClearDiskFullOpt() { - txn.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull + txn.commitActionContext.setDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) } // SetAssertionLevel sets how strict the assertions in the transaction should be. func (txn *KVTxn) SetAssertionLevel(assertionLevel kvrpcpb.AssertionLevel) { - txn.assertionLevel = assertionLevel + txn.commitActionContext.setAssertionLevel(assertionLevel) } // IsPessimistic returns true if it is pessimistic. @@ -458,6 +427,7 @@ func (txn *KVTxn) InitPipelinedMemDB() error { flushedKeys += memdb.Len() flushedSize += memdb.Size() }() + txn.commitActionContext.setRunning() logutil.BgLogger().Info("[pipelined dml] flush memdb to kv store", zap.Int("keys", memdb.Len()), zap.String("size", units.HumanSize(float64(memdb.Size()))), zap.Int("flushed keys", flushedKeys), zap.String("flushed size", units.HumanSize(float64(flushedSize)))) @@ -554,11 +524,6 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } return txn.committer.pipelinedFlushMutations(bo, mutations, generation) }) - txn.committer.priority = txn.priority.ToPB() - txn.committer.syncLog = txn.syncLog - txn.committer.resourceGroupTag = txn.resourceGroupTag - txn.committer.resourceGroupTagger = txn.resourceGroupTagger - txn.committer.resourceGroupName = txn.resourceGroupName txn.us = unionstore.NewUnionStore(pipelinedMemDB, txn.snapshot) return nil } @@ -638,8 +603,6 @@ func (txn *KVTxn) Commit(ctx context.Context) error { txn.committer = committer } - committer.SetDiskFullOpt(txn.diskFullOpt) - committer.SetTxnSource(txn.txnSource) txn.committer.forUpdateTSConstraints = txn.forUpdateTSChecks defer committer.ttlManager.close() @@ -716,7 +679,6 @@ func (txn *KVTxn) Commit(ctx context.Context) error { func (txn *KVTxn) close() { txn.valid = false - txn.ClearDiskFullOpt() } // Rollback undoes the transaction operations to KV store. @@ -766,6 +728,8 @@ func (txn *KVTxn) Rollback() error { txn.committer.ttlManager.close() } txn.close() + // forbid access to the committer after rollback. + txn.committer = nil logutil.BgLogger().Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS())) if txn.isInternal() { metrics.TxnCmdHistogramWithRollbackInternal.Observe(time.Since(start).Seconds()) @@ -1611,10 +1575,7 @@ func (txn *KVTxn) GetSnapshot() *txnsnapshot.KVSnapshot { // SetBinlogExecutor sets the method to perform binlong synchronization. func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) { - txn.binlog = binlog - if txn.committer != nil { - txn.committer.binlog = binlog - } + txn.commitActionContext.setBinlog(binlog) } // GetClusterID returns store's cluster id. @@ -1634,15 +1595,202 @@ func (txn *KVTxn) Mem() uint64 { // SetRequestSourceInternal sets the scope of the request source. func (txn *KVTxn) SetRequestSourceInternal(internal bool) { - txn.RequestSource.SetRequestSourceInternal(internal) + txn.commitActionContext.setRequestSourceInternal(internal) } // SetRequestSourceType sets the type of the request source. func (txn *KVTxn) SetRequestSourceType(tp string) { - txn.RequestSource.SetRequestSourceType(tp) + txn.commitActionContext.setRequestSourceType(tp) } // SetExplicitRequestSourceType sets the explicit type of the request source. func (txn *KVTxn) SetExplicitRequestSourceType(tp string) { - txn.RequestSource.SetExplicitRequestSourceType(tp) + txn.commitActionContext.setExplicitRequestSourceType(tp) +} + +// commitActionContext is the context will be read in committer. +// If the action is processing in background, like actionPipelinedFlush, we must ensure no write to the context before the action is done. +// the edit of committer must though this context, so that it's protected with action running status. +// the commitActionContext also guarantee the option consistency between the committer and the txn. +type commitActionContext struct { + running atomic.Bool + txn *KVTxn + + // schemaVer is the infoSchema fetched at startTS. + schemaVer SchemaVer + syncLog bool + priority txnutil.Priority + binlog BinlogExecutor + schemaLeaseChecker SchemaLeaseChecker + causalConsistency bool + scope string + kvFilter KVFilter + resourceGroupTag []byte + diskFullOpt kvrpcpb.DiskFullOpt + txnSource uint64 + commitTSUpperBoundCheck func(uint64) bool + assertionLevel kvrpcpb.AssertionLevel + *util.RequestSource + // resourceGroupName is the name of tenant resource group. + resourceGroupName string + // use this when resourceGroupTag is nil + resourceGroupTagger tikvrpc.ResourceGroupTagger + // interceptor is used to decorate the RPC request logic related to the txn. + interceptor interceptor.RPCInterceptor +} + +func defaultCommitActionContext(txn *KVTxn) *commitActionContext { + return &commitActionContext{ + running: atomic.Bool{}, + txn: txn, + syncLog: false, + priority: txnutil.PriorityNormal, + diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: txn.snapshot.RequestSource, + } +} + +func (ctx *commitActionContext) applyToCommitter(committer *twoPhaseCommitter) { + committer.syncLog = ctx.syncLog + committer.priority = ctx.priority.ToPB() + committer.binlog = ctx.binlog + committer.resourceGroupTag = ctx.resourceGroupTag + committer.resourceGroupTagger = ctx.resourceGroupTagger + committer.diskFullOpt = ctx.diskFullOpt + committer.txnSource = ctx.txnSource +} + +func (ctx *commitActionContext) setRunning() { + ctx.running.Store(true) +} + +func (ctx *commitActionContext) assertIdle() { + if !intest.InTest { + return + } + if ctx.running.Load() { + panic("commit action is running") + } +} + +func (ctx *commitActionContext) setSchemaVer(schemaVer SchemaVer) { + ctx.assertIdle() + ctx.schemaVer = schemaVer +} + +func (ctx *commitActionContext) setForceSyncLog(syncLog bool) { + ctx.assertIdle() + ctx.syncLog = syncLog + if ctx.txn.committer != nil { + ctx.txn.committer.syncLog = syncLog + } +} + +func (ctx *commitActionContext) setPriority(pri txnutil.Priority) { + ctx.assertIdle() + ctx.priority = pri + if ctx.txn.committer != nil { + ctx.txn.committer.priority = pri.ToPB() + } +} + +func (ctx *commitActionContext) setBinlog(binlog BinlogExecutor) { + ctx.assertIdle() + ctx.binlog = binlog + if ctx.txn.committer != nil { + ctx.txn.committer.binlog = binlog + } +} + +func (ctx *commitActionContext) setSchemaLeaseChecker(schemaLeaseChecker SchemaLeaseChecker) { + ctx.assertIdle() + ctx.schemaLeaseChecker = schemaLeaseChecker +} + +func (ctx *commitActionContext) setCausalConsistency(causalConsistency bool) { + ctx.assertIdle() + ctx.causalConsistency = causalConsistency +} + +func (ctx *commitActionContext) setScope(scope string) { + ctx.assertIdle() + ctx.scope = scope +} + +func (ctx *commitActionContext) setKVFilter(kvFilter KVFilter) { + ctx.assertIdle() + ctx.kvFilter = kvFilter +} + +func (ctx *commitActionContext) setResourceGroupTag(resourceTag []byte) { + ctx.assertIdle() + ctx.resourceGroupTag = resourceTag + if ctx.txn.committer != nil { + ctx.txn.committer.resourceGroupTag = resourceTag + } +} + +func (ctx *commitActionContext) setResourceGroupTagger(resourceTagger tikvrpc.ResourceGroupTagger) { + ctx.assertIdle() + ctx.resourceGroupTagger = resourceTagger + if ctx.txn.committer != nil { + ctx.txn.committer.resourceGroupTagger = resourceTagger + } +} + +func (ctx *commitActionContext) setDiskFullOpt(level kvrpcpb.DiskFullOpt) { + ctx.assertIdle() + ctx.diskFullOpt = level + if ctx.txn.committer != nil { + ctx.txn.committer.diskFullOpt = level + } +} + +func (ctx *commitActionContext) setTxnSource(txnSource uint64) { + ctx.assertIdle() + ctx.txnSource = txnSource + if ctx.txn.committer != nil { + ctx.txn.committer.txnSource = txnSource + } +} + +func (ctx *commitActionContext) setCommitTSUpperBoundCheck(check func(uint64) bool) { + ctx.assertIdle() + ctx.commitTSUpperBoundCheck = check +} + +func (ctx *commitActionContext) setAssertionLevel(level kvrpcpb.AssertionLevel) { + ctx.assertIdle() + ctx.assertionLevel = level +} + +func (ctx *commitActionContext) setRequestSourceInternal(internal bool) { + ctx.RequestSource.SetRequestSourceInternal(internal) +} + +func (ctx *commitActionContext) setRequestSourceType(tp string) { + ctx.RequestSource.SetRequestSourceType(tp) +} + +func (ctx *commitActionContext) setExplicitRequestSourceType(tp string) { + ctx.RequestSource.SetExplicitRequestSourceType(tp) +} + +func (ctx *commitActionContext) setResourceGroupName(name string) { + ctx.assertIdle() + ctx.resourceGroupName = name +} + +func (ctx *commitActionContext) setRPCInterceptor(it interceptor.RPCInterceptor) { + ctx.assertIdle() + ctx.interceptor = it +} + +func (ctx *commitActionContext) addRPCInterceptor(it interceptor.RPCInterceptor) { + ctx.assertIdle() + if ctx.interceptor == nil { + ctx.interceptor = it + } else { + ctx.interceptor = interceptor.ChainRPCInterceptors(ctx.interceptor, it) + } } diff --git a/util/intest/common.go b/util/intest/common.go new file mode 100644 index 000000000..4c122cbab --- /dev/null +++ b/util/intest/common.go @@ -0,0 +1,19 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !intest + +package intest + +const InTest = false diff --git a/util/intest/intest.go b/util/intest/intest.go new file mode 100644 index 000000000..8beb02ee1 --- /dev/null +++ b/util/intest/intest.go @@ -0,0 +1,20 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build intest + +package intest + +// InTest checks if the code is running in test. +const InTest = true