Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: move TxnScope into kv #24715

Merged
merged 6 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1343,7 +1342,7 @@ func getMaxTableHandle(ctx *testMaxTableRowIDContext, store kv.Storage) (kv.Hand
c := ctx.c
d := ctx.d
tbl := ctx.tbl
curVer, err := store.CurrentVersion(oracle.GlobalTxnScope)
curVer, err := store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
maxHandle, emptyTable, err := d.GetTableMaxHandle(curVer.Ver, tbl.(table.PhysicalTable))
c.Assert(err, IsNil)
Expand Down
3 changes: 1 addition & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -451,7 +450,7 @@ func doBatchInsert(s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint

// getNowTS gets the current timestamp, in TSO.
func getNowTSO(ctx sessionctx.Context) (uint64, error) {
currVer, err := ctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
currVer, err := ctx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
3 changes: 1 addition & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -534,7 +533,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior
}

func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
ver, err = store.CurrentVersion(oracle.GlobalTxnScope)
ver, err = store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return ver, errors.Trace(err)
} else if ver.Ver <= 0 {
Expand Down
9 changes: 4 additions & 5 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -236,7 +235,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.txnScope = sv.TxnCtx.TxnScope
builder.IsStaleness = sv.TxnCtx.IsStaleness
if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope {
if builder.IsStaleness && builder.txnScope != kv.GlobalTxnScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down Expand Up @@ -279,9 +278,9 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = oracle.GlobalTxnScope
builder.txnScope = kv.GlobalTxnScope
}
if builder.txnScope == oracle.GlobalTxnScope || builder.is == nil {
if builder.txnScope == kv.GlobalTxnScope || builder.is == nil {
return nil
}
visitPhysicalTableID := make(map[int64]struct{})
Expand Down Expand Up @@ -600,7 +599,7 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra

// VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation.
func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool {
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
if txnScope == "" || txnScope == kv.GlobalTxnScope {
return true
}
bundle, ok := is.BundleByName(placement.GroupID(physicalTableID))
Expand Down
3 changes: 1 addition & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -336,7 +335,7 @@ func (do *Domain) Reload() error {
defer do.m.Unlock()

startTime := time.Now()
ver, err := do.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := do.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (*testSuite) TestT(c *C) {

// for schemaValidator
schemaVer := dom.SchemaValidator.(*schemaValidator).LatestSchemaVersion()
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
ts := ver.Ver

Expand All @@ -360,7 +360,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(succ, Equals, ResultSucc)
time.Sleep(ddlLease)

ver, err = store.CurrentVersion(oracle.GlobalTxnScope)
ver, err = store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
ts = ver.Ver
_, succ = dom.SchemaValidator.Check(ts, schemaVer, nil)
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
pl := is.manager.ShowProcessList()

// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope)
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
return
Expand Down
3 changes: 1 addition & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -122,7 +121,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2693,11 +2693,11 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

curVer1, _ := s.store.CurrentVersion(oracle.GlobalTxnScope)
curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
snapshotTime := time.Now()
time.Sleep(time.Millisecond)
curVer2, _ := s.store.CurrentVersion(oracle.GlobalTxnScope)
curVer2, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
tk.MustExec("insert history_read values (2)")
tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2"))
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
Expand Down
5 changes: 2 additions & 3 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -153,7 +152,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down Expand Up @@ -392,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)

func (e *PointGetExecutor) verifyTxnScope() error {
txnScope := e.txn.GetOption(kv.TxnScope).(string)
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
if txnScope == "" || txnScope == kv.GlobalTxnScope {
return nil
}
var tblID int64
Expand Down
3 changes: 2 additions & 1 deletion executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -76,7 +77,7 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: oracle.GlobalTxnScope,
txnScope: kv.GlobalTxnScope,
zone: "",
},
}
Expand Down
3 changes: 1 addition & 2 deletions kv/fault_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

type testFaultInjectionSuite struct{}
Expand All @@ -36,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
storage := NewInjectedStore(newMockStorage(), &cfg)
txn, err := storage.Begin()
c.Assert(err, IsNil)
_, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0))
_, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(GlobalTxnScope).SetStartTs(0))
c.Assert(err, IsNil)
ver := Version{Ver: 1}
snap := storage.GetSnapshot(ver)
Expand Down
3 changes: 1 addition & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/oracle"
)

var _ = Suite(testMockSuite{})
Expand All @@ -29,7 +28,7 @@ func (s testMockSuite) TestInterface(c *C) {
storage := newMockStorage()
storage.GetClient()
storage.UUID()
version, err := storage.CurrentVersion(oracle.GlobalTxnScope)
version, err := storage.CurrentVersion(GlobalTxnScope)
c.Check(err, IsNil)
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
Expand Down
73 changes: 73 additions & 0 deletions kv/txn_scope_var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
)

// TxnScopeVar indicates the used txnScope for oracle
type TxnScopeVar struct {
// varValue indicates the value of @@txn_scope, which can only be `global` or `local`
varValue string
// txnScope indicates the value which the tidb-server holds to request tso to pd
txnScope string
}

// GetTxnScopeVar gets TxnScopeVar from config
func GetTxnScopeVar() TxnScopeVar {
isGlobal, location := config.GetTxnScopeFromConfig()
if isGlobal {
return NewGlobalTxnScopeVar()
}
return NewLocalTxnScopeVar(location)
}

// NewGlobalTxnScopeVar creates a Global TxnScopeVar
func NewGlobalTxnScopeVar() TxnScopeVar {
return newTxnScopeVar(GlobalTxnScope, GlobalTxnScope)
}

// NewLocalTxnScopeVar creates a Local TxnScopeVar with given real txnScope value.
func NewLocalTxnScopeVar(txnScope string) TxnScopeVar {
return newTxnScopeVar(LocalTxnScope, txnScope)
}

// GetVarValue returns the value of @@txn_scope which can only be `global` or `local`
func (t TxnScopeVar) GetVarValue() string {
return t.varValue
}

// GetTxnScope returns the value of the tidb-server holds to request tso to pd.
func (t TxnScopeVar) GetTxnScope() string {
return t.txnScope
}

func newTxnScopeVar(varValue string, txnScope string) TxnScopeVar {
return TxnScopeVar{
varValue: varValue,
txnScope: txnScope,
}
}

// Transaction scopes constants.
const (
// GlobalTxnScope is synced with PD's define of global scope.
// If we want to remove the dependency on store/tikv here, we need to map
// the two GlobalTxnScopes in the driver layer.
GlobalTxnScope = oracle.GlobalTxnScope
// LocalTxnScope indicates the transaction should use local ts.
LocalTxnScope = "local"
)
3 changes: 1 addition & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testleak"
. "github.com/pingcap/tidb/util/testutil"
)
Expand Down Expand Up @@ -291,7 +290,7 @@ func (s *testSuite) TestSnapshot(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

ver1, _ := store.CurrentVersion(oracle.GlobalTxnScope)
ver1, _ := store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
txn, _ = store.Begin()
m = meta.NewMeta(txn)
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/fastrand"
Expand Down Expand Up @@ -311,9 +311,9 @@ func setSSLVariable(ca, key, cert string) {
func setTxnScope() {
variable.SetSysVar("txn_scope", func() string {
if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal {
return oracle.GlobalTxnScope
return kv.GlobalTxnScope
}
return oracle.LocalTxnScope
return kv.LocalTxnScope
}())
}

Expand Down
4 changes: 2 additions & 2 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2029,14 +2029,14 @@ func (s *testPessimisticSuite) TestSelectForUpdateConflictRetry(c *C) {
tsCh := make(chan uint64)
go func() {
tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 1")
lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope})
c.Assert(err, IsNil)
tsCh <- lastTS
tk3.MustExec("commit")
tsCh <- lastTS
}()
// tk2LastTS should be its forUpdateTS
tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope})
c.Assert(err, IsNil)
tk2.MustExec("commit")

Expand Down
3 changes: 1 addition & 2 deletions session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -426,7 +425,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
}
c.Assert(err, IsNil)
}
curVer, err := se.store.CurrentVersion(oracle.GlobalTxnScope)
curVer, err := se.store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1)
mutationVals, err := txn.BatchGet(ctx, checkKeys)
Expand Down
Loading