Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: break dependency from store/tikv to tidb/kv cause by TransactionOption #24656

Merged
merged 11 commits into from
May 19, 2021
4 changes: 3 additions & 1 deletion kv/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package kv
import (
"context"
"sync"

"github.com/pingcap/tidb/store/tikv"
)

// InjectionConfig is used for fault injections for KV components.
Expand Down Expand Up @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) {
}

// BeginWithOption creates an injected Transaction with given option.
func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) {
func (s *InjectedStore) BeginWithOption(option tikv.TransactionOption) (Transaction, error) {
txn, err := s.Storage.BeginWithOption(option)
return &InjectedTransaction{
Transaction: txn,
Expand Down
3 changes: 2 additions & 1 deletion kv/fault_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

Expand All @@ -35,7 +36,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
storage := NewInjectedStore(newMockStorage(), &cfg)
txn, err := storage.Begin()
c.Assert(err, IsNil)
_, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0))
_, err = storage.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0))
c.Assert(err, IsNil)
ver := Version{Ver: 1}
snap := storage.GetSnapshot(ver)
Expand Down
3 changes: 2 additions & 1 deletion kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

Expand Down Expand Up @@ -154,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) {
return newMockTxn(), nil
}

func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) {
func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (Transaction, error) {
return newMockTxn(), nil
}

Expand Down
49 changes: 2 additions & 47 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -339,59 +340,13 @@ type Driver interface {
Open(path string) (Storage, error)
}

// TransactionOption indicates the option when beginning a transaction
// `TxnScope` must be set for each object
// Every other fields are optional, but currently at most one of them can be set
type TransactionOption struct {
TxnScope string
StartTS *uint64
PrevSec *uint64
MinStartTS *uint64
MaxPrevSec *uint64
}

// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used
func DefaultTransactionOption() TransactionOption {
return TransactionOption{TxnScope: oracle.GlobalTxnScope}
}

// SetMaxPrevSec set maxPrevSec
func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption {
to.MaxPrevSec = &maxPrevSec
return to
}

// SetMinStartTS set minStartTS
func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption {
to.MinStartTS = &minStartTS
return to
}

// SetStartTs set startTS
func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption {
to.StartTS = &startTS
return to
}

// SetPrevSec set prevSec
func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption {
to.PrevSec = &prevSec
return to
}

// SetTxnScope set txnScope
func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption {
to.TxnScope = txnScope
return to
}

// Storage defines the interface for storage.
// Isolation should be at least SI(SNAPSHOT ISOLATION)
type Storage interface {
// Begin a global transaction
Begin() (Transaction, error)
// Begin a transaction with given option
BeginWithOption(option TransactionOption) (Transaction, error)
BeginWithOption(option tikv.TransactionOption) (Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver Version) Snapshot
Expand Down
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1949,7 +1949,7 @@ func (s *session) NewTxn(ctx context.Context) error {
zap.String("txnScope", txnScope))
}

txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
if err != nil {
return err
}
Expand Down Expand Up @@ -2746,7 +2746,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
}

// no need to get txn from txnFutureCh since txn should init with startTs
txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
txn, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS))
if err != nil {
return err
}
Expand Down Expand Up @@ -2779,22 +2779,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc
txnScope := s.GetSessionVars().CheckAndGetTxnScope()
switch option.Mode {
case ast.TimestampBoundReadTimestamp:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS))
txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS))
if err != nil {
return err
}
case ast.TimestampBoundExactStaleness:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec))
if err != nil {
return err
}
case ast.TimestampBoundMaxStaleness:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec))
if err != nil {
return err
}
case ast.TimestampBoundMinReadTimestamp:
txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
txn, err = s.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS))
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -436,14 +437,14 @@ type txnFuture struct {
func (tf *txnFuture) wait() (kv.Transaction, error) {
startTS, err := tf.future.Wait()
if err == nil {
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS))
return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS))
} else if config.GetGlobalConfig().Store == "unistore" {
return nil, err
}

logutil.BgLogger().Warn("wait tso failed", zap.Error(err))
// It would retry get timestamp.
return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope))
return tf.store.BeginWithOption(tikv.DefaultTransactionOption().SetTxnScope(tf.txnScope))
}

func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
Expand Down
2 changes: 1 addition & 1 deletion store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) {
}

// BeginWithOption begins a transaction with given option
func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
func (s *tikvStore) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithOption(option)
if err != nil {
return nil, derr.ToTiDBErr(err)
Expand Down
2 changes: 1 addition & 1 deletion store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
// Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13.
type Storage interface {
Begin() (kv.Transaction, error)
BeginWithOption(option kv.TransactionOption) (kv.Transaction, error)
BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error)
GetSnapshot(ver kv.Version) kv.Snapshot
GetClient() kv.Client
GetMPPClient() kv.MPPClient
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mockstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{},
}

// BeginWithOption begins a transaction with given option
func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) {
func (s *mockStorage) BeginWithOption(option tikv.TransactionOption) (kv.Transaction, error) {
return newTiKVTxn(s.KVStore.BeginWithOption(option))
}

Expand Down
6 changes: 3 additions & 3 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
Expand Down Expand Up @@ -187,11 +186,11 @@ func (s *KVStore) runSafePointChecker() {

// Begin a global transaction.
func (s *KVStore) Begin() (*KVTxn, error) {
return s.BeginWithOption(tidbkv.DefaultTransactionOption())
return s.BeginWithOption(DefaultTransactionOption())
}

// BeginWithOption begins a transaction with the given TransactionOption
func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) {
func (s *KVStore) BeginWithOption(options TransactionOption) (*KVTxn, error) {
return newTiKVTxnWithOptions(s, options)
}

Expand Down Expand Up @@ -366,6 +365,7 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 {
return safeTS.(uint64)
}

// setSafeTS sets safeTs for store storeID, export for testing
func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
Expand Down Expand Up @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {
return saveSafePoint(s.GetSafePointKV(), v)
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
s.regionCache.storeMu.Lock()
defer s.regionCache.storeMu.Unlock()
s.regionCache.storeMu.stores[id] = &Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
}
}

// SetSafeTS is used to set safeTS for the store with `storeID`
func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) {
s.setSafeTS(storeID, safeTS)
}

// TxnProbe wraps a txn and exports internal states for testing purpose.
type TxnProbe struct {
*KVTxn
Expand Down
5 changes: 2 additions & 3 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
drivertxn "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
Expand Down Expand Up @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
// Use max.Uint64 to read the data and success.
// That means the final commitTS > startTS+2, it's not the one we provide.
// So we cover the rety commitTS logic.
txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2))
txn1, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2))
c.Assert(err, IsNil)
_, err = txn1.Get(bo.GetCtx(), []byte("x"))
c.Assert(tikverr.IsErrNotFound(err), IsTrue)

txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64))
txn2, err := s.store.BeginWithOption(tikv.DefaultTransactionOption().SetStartTs(math.MaxUint64))
c.Assert(err, IsNil)
val, err := txn2.Get(bo.GetCtx(), []byte("x"))
c.Assert(err, IsNil)
Expand Down
Loading