Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: resolve ReplicaReadType dependencies #24653

Merged
merged 4 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -324,7 +323,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: tikvstore.ReplicaReadLeader,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -400,7 +399,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: tikvstore.ReplicaReadLeader,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -447,7 +446,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: tikvstore.ReplicaReadLeader,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -494,7 +493,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: tikvstore.ReplicaReadLeader,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -577,10 +576,10 @@ func (s *testSuite) TestRequestBuilder6(c *C) {
}

func (s *testSuite) TestRequestBuilder7(c *C) {
for _, replicaRead := range []tikvstore.ReplicaReadType{
tikvstore.ReplicaReadLeader,
tikvstore.ReplicaReadFollower,
tikvstore.ReplicaReadMixed,
for _, replicaRead := range []kv.ReplicaReadType{
kv.ReplicaReadLeader,
kv.ReplicaReadFollower,
kv.ReplicaReadMixed,
} {
vars := variable.NewSessionVars()
vars.SetReplicaRead(replicaRead)
Expand Down
5 changes: 2 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1322,7 +1321,7 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
Expand All @@ -1345,7 +1344,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.IsolationLevel, kv.RC)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}

rander := rand.New(rand.NewSource(e.randSeed))
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -38,7 +39,6 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().SetReplicaRead(tikvstore.ReplicaReadFollower)
ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower)
tk.MustExec("analyze table t")
}

Expand Down
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ type Request struct {
// call would not corresponds to a whole region result.
Streaming bool
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead tikvstore.ReplicaReadType
ReplicaRead ReplicaReadType
// StoreType represents this request is sent to the which type of store.
StoreType StoreType
// Cacheable is true if the request can be cached. Currently only deterministic DAG requests can be cached.
Expand Down
17 changes: 17 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,20 @@ const (
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
)

// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte

const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
)

// IsFollowerRead checks if follower is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}
3 changes: 1 addition & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -533,7 +532,7 @@ func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHin
warns = append(warns, warn)
}
stmtHints.HasReplicaReadHint = true
stmtHints.ReplicaRead = byte(tikvstore.ReplicaReadFollower)
stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower)
}
// Handle MAX_EXECUTION_TIME
if maxExecutionTimeCnt != 0 {
Expand Down
5 changes: 2 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -1891,7 +1890,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
s.txn.SetVars(s.sessionVars.KVVars)
if s.sessionVars.GetReplicaRead().IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
return &s.txn, nil
Expand Down Expand Up @@ -1955,7 +1954,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, tikvstore.ReplicaReadFollower)
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
Expand Down
13 changes: 6 additions & 7 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
Expand Down Expand Up @@ -3064,11 +3063,11 @@ func (s *testSessionSuite2) TestReplicaRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, tikvstore.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, tikvstore.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, tikvstore.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite3) TestIsolationRead(c *C) {
Expand Down Expand Up @@ -3153,12 +3152,12 @@ func (s *testSessionSuite2) TestStmtHints(c *C) {
c.Assert(tk.Se.GetSessionVars().GetEnableCascadesPlanner(), IsTrue)

// Test READ_CONSISTENT_REPLICA hint
tk.Se.GetSessionVars().SetReplicaRead(tikvstore.ReplicaReadLeader)
tk.Se.GetSessionVars().SetReplicaRead(kv.ReplicaReadLeader)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, tikvstore.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA(), READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, tikvstore.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}

func (s *testSessionSuite3) TestPessimisticLockOnPartition(c *C) {
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ type SessionVars struct {
enableIndexMerge bool

// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead tikvstore.ReplicaReadType
replicaRead kv.ReplicaReadType

// IsolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
IsolationReadEngines map[kv.StoreType]struct{}
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func NewSessionVars() *SessionVars {
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
enableIndexMerge: false,
EnableNoopFuncs: DefTiDBEnableNoopFuncs,
replicaRead: tikvstore.ReplicaReadLeader,
replicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
EvolvePlanBaselines: DefTiDBEvolvePlanBaselines,
Expand Down Expand Up @@ -1179,15 +1179,15 @@ func (s *SessionVars) SetEnableIndexMerge(val bool) {
}

// GetReplicaRead get ReplicaRead from sql hints and SessionVars.replicaRead.
func (s *SessionVars) GetReplicaRead() tikvstore.ReplicaReadType {
func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType {
if s.StmtCtx.HasReplicaReadHint {
return tikvstore.ReplicaReadType(s.StmtCtx.ReplicaRead)
return kv.ReplicaReadType(s.StmtCtx.ReplicaRead)
}
return s.replicaRead
}

// SetReplicaRead set SessionVars.replicaRead.
func (s *SessionVars) SetReplicaRead(val tikvstore.ReplicaReadType) {
func (s *SessionVars) SetReplicaRead(val kv.ReplicaReadType) {
s.replicaRead = val
}

Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,11 +1246,11 @@ var defaultSysVars = []*SysVar{
}},
{Scope: ScopeSession, Name: TiDBReplicaRead, Value: "leader", Type: TypeEnum, PossibleValues: []string{"leader", "follower", "leader-and-follower"}, SetSession: func(s *SessionVars, val string) error {
if strings.EqualFold(val, "follower") {
s.SetReplicaRead(tikvstore.ReplicaReadFollower)
s.SetReplicaRead(kv.ReplicaReadFollower)
} else if strings.EqualFold(val, "leader-and-follower") {
s.SetReplicaRead(tikvstore.ReplicaReadMixed)
s.SetReplicaRead(kv.ReplicaReadMixed)
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.SetReplicaRead(tikvstore.ReplicaReadLeader)
s.SetReplicaRead(kv.ReplicaReadLeader)
}
return nil
}},
Expand Down
8 changes: 4 additions & 4 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/testleak"
)

Expand Down Expand Up @@ -431,19 +431,19 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
val, err = GetSessionOrGlobalSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "follower")
c.Assert(v.GetReplicaRead(), Equals, tikvstore.ReplicaReadFollower)
c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadFollower)
err = SetSessionSystemVar(v, TiDBReplicaRead, "leader")
c.Assert(err, IsNil)
val, err = GetSessionOrGlobalSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader")
c.Assert(v.GetReplicaRead(), Equals, tikvstore.ReplicaReadLeader)
c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadLeader)
err = SetSessionSystemVar(v, TiDBReplicaRead, "leader-and-follower")
c.Assert(err, IsNil)
val, err = GetSessionOrGlobalSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader-and-follower")
c.Assert(v.GetReplicaRead(), Equals, tikvstore.ReplicaReadMixed)
c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadMixed)

err = SetSessionSystemVar(v, TiDBEnableStmtSummary, "ON")
c.Assert(err, IsNil)
Expand Down
3 changes: 2 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/driver/options"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand Down Expand Up @@ -697,7 +698,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}
}

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, &worker.replicaReadSeed, kvrpcpb.Context{
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: priorityToPB(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
Expand Down
32 changes: 32 additions & 0 deletions store/driver/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"github.com/pingcap/tidb/kv"
storekv "github.com/pingcap/tidb/store/tikv/kv"
)

// GetTiKVReplicaReadType maps kv.ReplicaReadType to tikv/kv.ReplicaReadType.
func GetTiKVReplicaReadType(t kv.ReplicaReadType) storekv.ReplicaReadType {
switch t {
case kv.ReplicaReadLeader:
return storekv.ReplicaReadLeader
case kv.ReplicaReadFollower:
return storekv.ReplicaReadFollower
case kv.ReplicaReadMixed:
return storekv.ReplicaReadMixed
}
return 0
}
5 changes: 3 additions & 2 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/driver/options"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
)

type tikvSnapshot struct {
Expand Down Expand Up @@ -76,7 +76,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
case kv.SnapshotTS:
s.KVSnapshot.SetSnapshotTS(val.(uint64))
case kv.ReplicaRead:
s.KVSnapshot.SetReplicaRead(val.(tikvstore.ReplicaReadType))
t := options.GetTiKVReplicaReadType(val.(kv.ReplicaReadType))
s.KVSnapshot.SetReplicaRead(t)
case kv.SampleStep:
s.KVSnapshot.SetSampleStep(val.(uint32))
case kv.TaskID:
Expand Down
Loading