diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 218ca9cbd6966..d61685a7f8a71 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -16,6 +16,8 @@ package kv import ( "context" "sync" + + "github.com/pingcap/tidb/store/tikv" ) // InjectionConfig is used for fault injections for KV components. @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithOption creates an injected Transaction with given option. -func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *InjectedStore) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 33b6535214b2c..4979dbf4268cd 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -35,7 +36,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 5d85261bc2111..9e41832678294 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -154,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/kv/kv.go b/kv/kv.go index 20b0fc84b7144..471dfe09a110b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/memory" @@ -339,59 +340,13 @@ type Driver interface { Open(path string) (Storage, error) } -// TransactionOption indicates the option when beginning a transaction -// `TxnScope` must be set for each object -// Every other fields are optional, but currently at most one of them can be set -type TransactionOption struct { - TxnScope string - StartTS *uint64 - PrevSec *uint64 - MinStartTS *uint64 - MaxPrevSec *uint64 -} - -// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used -func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} -} - -// SetMaxPrevSec set maxPrevSec -func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { - to.MaxPrevSec = &maxPrevSec - return to -} - -// SetMinStartTS set minStartTS -func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { - to.MinStartTS = &minStartTS - return to -} - -// SetStartTs set startTS -func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { - to.StartTS = &startTS - return to -} - -// SetPrevSec set prevSec -func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { - to.PrevSec = &prevSec - return to -} - -// SetTxnScope set txnScope -func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { - to.TxnScope = txnScope - return to -} - // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { // Begin a global transaction Begin() (Transaction, error) // Begin a transaction with given option - BeginWithOption(option TransactionOption) (Transaction, error) + BeginWithOption(option tikv.StartTSOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/session/session.go b/session/session.go index efd6706c4ffb3..c9b3f8f7a8abd 100644 --- a/session/session.go +++ b/session/session.go @@ -1975,7 +1975,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2768,7 +2768,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2801,22 +2801,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } diff --git a/session/txn.go b/session/txn.go index 133cafb976aae..294725f8efaa0 100644 --- a/session/txn.go +++ b/session/txn.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" @@ -436,14 +437,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index cc0f217280f31..2d93b7eda4abb 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithOption begins a transaction with given option -func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *tikvStore) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, derr.ToTiDBErr(err) diff --git a/store/helper/helper.go b/store/helper/helper.go index 49aa7cf2107e0..8eb9b9d7db828 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,7 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) + BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 6221ef855707d..7d78d1a9b7418 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{}, } // BeginWithOption begins a transaction with given option -func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return newTiKVTxn(s.KVStore.BeginWithOption(option)) } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index edaef3b4744d7..622313f382abd 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -189,11 +188,11 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithOption(tidbkv.DefaultTransactionOption()) + return s.BeginWithOption(DefaultStartTSOption()) } -// BeginWithOption begins a transaction with the given TransactionOption -func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) { +// BeginWithOption begins a transaction with the given StartTSOption +func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) { return newTiKVTxnWithOptions(s, options) } @@ -389,6 +388,7 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 { return safeTS.(uint64) } +// setSafeTS sets safeTs for store storeID, export for testing func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index 1a8dc5062218d..3e80e6310fe4b 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error { return saveSafePoint(s.GetSafePointKV(), v) } +// SetRegionCacheStore is used to set a store in region cache, for testing only +func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { + s.regionCache.storeMu.Lock() + defer s.regionCache.storeMu.Unlock() + s.regionCache.storeMu.stores[id] = &Store{ + storeID: id, + storeType: storeType, + state: state, + labels: labels, + } +} + +// SetSafeTS is used to set safeTS for the store with `storeID` +func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) { + s.setSafeTS(storeID, safeTS) +} + // TxnProbe wraps a txn and exports internal states for testing purpose. type TxnProbe struct { *KVTxn diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 5589752043b2b..43b682160c514 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - tidbkv "github.com/pingcap/tidb/kv" drivertxn "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) + txn1, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(committer.GetStartTS() + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.GetCtx(), []byte("x")) c.Assert(tikverr.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) + txn2, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.GetCtx(), []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go similarity index 54% rename from store/tikv/extract_start_ts_test.go rename to store/tikv/tests/extract_start_ts_test.go index b392ca365cde8..566211006b66c 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -11,20 +11,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) type extractStartTsSuite struct { - store *KVStore + store *tikv.KVStore } var _ = SerialSuites(&extractStartTsSuite{}) @@ -33,31 +33,24 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { client, pdClient, cluster, err := unistore.New("") c.Assert(err, IsNil) unistore.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - store.regionCache.storeMu.stores[2] = &Store{ - storeID: 2, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: oracle.LocalTxnScope, - }, + probe := tikv.StoreProbe{KVStore: store} + probe.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: oracle.LocalTxnScope, }, - } - store.regionCache.storeMu.stores[3] = &Store{ - storeID: 3, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{{ - Key: DCLabelKey, + }) + probe.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, Value: "Some Random Label", - }}, - } - store.setSafeTS(2, 102) - store.setSafeTS(3, 101) - s.store = store + }, + }) + probe.SetSafeTS(2, 102) + probe.SetSafeTS(3, 101) + s.store = probe.KVStore } func (s *extractStartTsSuite) TestExtractStartTs(c *C) { @@ -69,26 +62,26 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ // StartTS setted - {100, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, expected) } @@ -97,18 +90,19 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.setSafeTS(2, 0x8000000000000002) - s.store.setSafeTS(3, 0x8000000000000001) + probe := tikv.StoreProbe{KVStore: s.store} + probe.SetSafeTS(2, 0x8000000000000002) + probe.SetSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ - {0x8000000000000001, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, cs.expectedTS) } } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index beeeafe66a063..d228b834e00dc 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" tikverr "github.com/pingcap/tidb/store/tikv/error" tikv "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -55,6 +54,52 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } +// StartTSOption indicates the option when beginning a transaction +// `TxnScope` must be set for each object +// Every other fields are optional, but currently at most one of them can be set +type StartTSOption struct { + TxnScope string + StartTS *uint64 + PrevSec *uint64 + MinStartTS *uint64 + MaxPrevSec *uint64 +} + +// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used +func DefaultStartTSOption() StartTSOption { + return StartTSOption{TxnScope: oracle.GlobalTxnScope} +} + +// SetMaxPrevSec returns a new StartTSOption with MaxPrevSec set to maxPrevSec +func (to StartTSOption) SetMaxPrevSec(maxPrevSec uint64) StartTSOption { + to.MaxPrevSec = &maxPrevSec + return to +} + +// SetMinStartTS returns a new StartTSOption with MinStartTS set to minStartTS +func (to StartTSOption) SetMinStartTS(minStartTS uint64) StartTSOption { + to.MinStartTS = &minStartTS + return to +} + +// SetStartTs returns a new StartTSOption with StartTS set to startTS +func (to StartTSOption) SetStartTs(startTS uint64) StartTSOption { + to.StartTS = &startTS + return to +} + +// SetPrevSec returns a new StartTSOption with PrevSec set to prevSec +func (to StartTSOption) SetPrevSec(prevSec uint64) StartTSOption { + to.PrevSec = &prevSec + return to +} + +// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope +func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption { + to.TxnScope = txnScope + return to +} + // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { snapshot *KVSnapshot @@ -90,23 +135,24 @@ type KVTxn struct { kvFilter KVFilter } -func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) { +// ExtractStartTs use `option` to get the proper startTS for a transaction +func ExtractStartTs(store *KVStore, option StartTSOption) (uint64, error) { var startTs uint64 var err error - if options.StartTS != nil { - startTs = *options.StartTS - } else if options.PrevSec != nil { + if option.StartTS != nil { + startTs = *option.StartTS + } else if option.PrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getStalenessTimestamp(bo, options.TxnScope, *options.PrevSec) - } else if options.MinStartTS != nil { + startTs, err = store.getStalenessTimestamp(bo, option.TxnScope, *option.PrevSec) + } else if option.MinStartTS != nil { stores := make([]*Store, 0) allStores := store.regionCache.getStoresByType(tikvrpc.TiKV) - if options.TxnScope != oracle.GlobalTxnScope { + if option.TxnScope != oracle.GlobalTxnScope { for _, store := range allStores { if store.IsLabelsMatch([]*metapb.StoreLabel{ { Key: DCLabelKey, - Value: options.TxnScope, + Value: option.TxnScope, }, }) { stores = append(stores, store) @@ -116,32 +162,32 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error stores = allStores } safeTS := store.getMinSafeTSByStores(stores) - startTs = *options.MinStartTS + startTs = *option.MinStartTS // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. if startTs < safeTS { startTs = safeTS } - } else if options.MaxPrevSec != nil { + } else if option.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - minStartTS, err := store.getStalenessTimestamp(bo, options.TxnScope, *options.MaxPrevSec) + minStartTS, err := store.getStalenessTimestamp(bo, option.TxnScope, *option.MaxPrevSec) if err != nil { return 0, errors.Trace(err) } - options.MinStartTS = &minStartTS - return extractStartTs(store, options) + option.MinStartTS = &minStartTS + return ExtractStartTs(store, option) } else { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getTimestampWithRetry(bo, options.TxnScope) + startTs, err = store.getTimestampWithRetry(bo, option.TxnScope) } return startTs, err } -func newTiKVTxnWithOptions(store *KVStore, options kv.TransactionOption) (*KVTxn, error) { +func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } - startTs, err := extractStartTs(store, options) + startTs, err := ExtractStartTs(store, options) if err != nil { return nil, errors.Trace(err) } diff --git a/util/mock/context.go b/util/mock/context.go index d6a5f1d913902..6df2c9c10d356 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" @@ -204,7 +205,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 7c86de4b3cb72..3adba59e115e5 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -38,7 +39,7 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } // BeginWithOption implements kv.Storage interface. -func (s *Store) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *Store) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return s.Begin() }