Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add support for execute prepared statement to staleread.Processor #32941

Merged
merged 29 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
21abdcb
*: Provide staleReadProcessor to process stale read
lcwangchao Mar 1, 2022
0134b55
update
lcwangchao Mar 1, 2022
0c52cb1
build
lcwangchao Mar 1, 2022
970b651
update
lcwangchao Mar 1, 2022
76c2a42
fix
lcwangchao Mar 1, 2022
4ff98bf
fix
lcwangchao Mar 1, 2022
c911be8
update
lcwangchao Mar 1, 2022
7b52912
add tests
lcwangchao Mar 2, 2022
5428abf
Merge branch 'master' into stalereadprocessor
lcwangchao Mar 2, 2022
7fbd50c
update
lcwangchao Mar 3, 2022
5a12637
Merge branch 'master' into stalereadprocessor
lcwangchao Mar 3, 2022
f77e8bb
execute
lcwangchao Mar 3, 2022
7b44a26
comments
lcwangchao Mar 3, 2022
f9de325
Merge branch 'master' into stalereadprocessor
lcwangchao Mar 3, 2022
f6d73e6
Merge branch 'stalereadprocessor' into stalereadprocessorexecute
lcwangchao Mar 3, 2022
617d9ab
update
lcwangchao Mar 3, 2022
8dbfa19
update
lcwangchao Mar 4, 2022
327b1fd
Merge branch 'master' into stalereadprocessor
lcwangchao Mar 4, 2022
adfe268
address comments
lcwangchao Mar 8, 2022
dde201b
Merge branch 'stalereadprocessor' into stalereadprocessorexecute
lcwangchao Mar 8, 2022
b1454ca
Merge branch 'master' into stalereadprocessorexecute
lcwangchao Mar 8, 2022
a93c279
update
lcwangchao Mar 8, 2022
4c11140
add tests
lcwangchao Mar 9, 2022
f795ca8
update
lcwangchao Mar 9, 2022
e5b8d50
update
lcwangchao Mar 9, 2022
544caf6
Merge branch 'master' into stalereadprocessorexecute
lcwangchao Apr 7, 2022
1232422
update
lcwangchao Apr 7, 2022
725b319
Merge branch 'master' into stalereadprocessorexecute
lcwangchao Apr 7, 2022
d3e0e8a
Merge branch 'master' into stalereadprocessorexecute
ti-chi-bot Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 28 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,12 +719,6 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
b.snapshotTS = v.SnapshotTS
b.isStaleness = v.IsStaleness
b.readReplicaScope = v.ReadReplicaScope
if b.snapshotTS != 0 {
b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
}
e := &ExecuteExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
Expand All @@ -735,6 +729,34 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
plan: v.Plan,
outputNames: v.OutputNames(),
}

failpoint.Inject("assertStaleReadValuesSameWithExecuteAndBuilder", func() {
// This fail point is used to assert the behavior after refactoring is exactly the same with the previous implement.
// Some variables in `plannercore.Execute` is deprecated and only be used for asserting now.
if b.snapshotTS != v.SnapshotTS {
panic(fmt.Sprintf("%d != %d", b.snapshotTS, v.SnapshotTS))
}

if b.isStaleness != v.IsStaleness {
panic(fmt.Sprintf("%v != %v", b.isStaleness, v.IsStaleness))
}

if b.readReplicaScope != v.ReadReplicaScope {
panic(fmt.Sprintf("%s != %s", b.readReplicaScope, v.ReadReplicaScope))
}

if v.SnapshotTS != 0 {
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
if err != nil {
panic(err)
}

if b.is.SchemaMetaVersion() != is.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", b.is.SchemaMetaVersion(), is.SchemaMetaVersion()))
}
}
})

failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
Expand Down
22 changes: 13 additions & 9 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
ID uint32, is infoschema.InfoSchema, snapshotTS uint64, args []types.Datum) (*ExecStmt, bool, bool, error) {
ID uint32, is infoschema.InfoSchema, snapshotTS uint64, replicaReadScope string, args []types.Datum) (*ExecStmt, bool, bool, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
Expand All @@ -356,6 +356,8 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
if err := ResetContextOfStmt(sctx, execStmt); err != nil {
return nil, false, false, err
}
isStaleness := snapshotTS != 0
sctx.GetSessionVars().StmtCtx.IsStaleness = isStaleness
execStmt.BinaryArgs = args
execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
Expand All @@ -368,14 +370,16 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
SnapshotTS: snapshotTS,
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execStmt,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
IsStaleness: isStaleness,
SnapshotTS: snapshotTS,
ReplicaReadScope: replicaReadScope,
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[ID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
Expand Down
3 changes: 2 additions & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/mysql"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -158,7 +159,7 @@ func TestPrepared(t *testing.T) {

// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), stmtID,
tk.Session().GetInfoSchema().(infoschema.InfoSchema), 0, []types.Datum{types.NewDatum(1)})
tk.Session().GetInfoSchema().(infoschema.InfoSchema), 0, kv.GlobalReplicaScope, []types.Datum{types.NewDatum(1)})
require.NoError(t, err)
require.Equal(t, query, stmt.OriginText())

Expand Down
56 changes: 56 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,18 @@ import (
"github.com/tikv/client-go/v2/oracle"
)

func enableStalereadCommonFailPoint(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return"))
return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan"))
}
}

func TestExactStalenessTransaction(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()
testcases := []struct {
name string
preSQL string
Expand Down Expand Up @@ -94,6 +105,9 @@ func TestExactStalenessTransaction(t *testing.T) {
}

func TestSelectAsOf(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -249,6 +263,9 @@ func TestSelectAsOf(t *testing.T) {
}

func TestStaleReadKVRequest(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -344,6 +361,9 @@ func TestStaleReadKVRequest(t *testing.T) {
}

func TestStalenessAndHistoryRead(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -428,6 +448,9 @@ func TestStalenessAndHistoryRead(t *testing.T) {
}

func TestTimeBoundedStalenessTxn(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -524,6 +547,9 @@ func TestStalenessTransactionSchemaVer(t *testing.T) {
}

func TestSetTransactionReadOnlyAsOf(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04")
require.NoError(t, err)
store, clean := testkit.CreateMockStore(t)
Expand Down Expand Up @@ -591,6 +617,9 @@ func TestSetTransactionReadOnlyAsOf(t *testing.T) {
}

func TestValidateReadOnlyInStalenessTransaction(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

errMsg1 := ".*only support read-only statement during read-only staleness transactions.*"
errMsg2 := ".*select lock hasn't been supported in stale read yet.*"
testcases := []struct {
Expand Down Expand Up @@ -770,6 +799,9 @@ func TestValidateReadOnlyInStalenessTransaction(t *testing.T) {
}

func TestSpecialSQLInStalenessTxn(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -825,6 +857,9 @@ func TestSpecialSQLInStalenessTxn(t *testing.T) {
}

func TestAsOfTimestampCompatibility(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -882,6 +917,9 @@ func TestAsOfTimestampCompatibility(t *testing.T) {
}

func TestSetTransactionInfoSchema(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -923,6 +961,9 @@ func TestSetTransactionInfoSchema(t *testing.T) {
}

func TestStaleSelect(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1009,6 +1050,9 @@ func TestStaleReadFutureTime(t *testing.T) {
}

func TestStaleReadPrepare(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1065,6 +1109,9 @@ func TestStaleReadPrepare(t *testing.T) {
}

func TestStmtCtxStaleFlag(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1160,6 +1207,9 @@ func TestStmtCtxStaleFlag(t *testing.T) {
}

func TestStaleSessionQuery(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1200,6 +1250,9 @@ func TestStaleSessionQuery(t *testing.T) {
}

func TestStaleReadCompatibility(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1246,6 +1299,9 @@ func TestStaleReadCompatibility(t *testing.T) {
}

func TestStaleReadNoExtraTSORequest(t *testing.T) {
disableCommonFailPoint := enableStalereadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down
52 changes: 35 additions & 17 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -186,12 +187,15 @@ type Prepare struct {
type Execute struct {
baseSchemaProducer

Name string
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
SnapshotTS uint64
IsStaleness bool
Name string
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
// Deprecated: SnapshotTS now is only used for asserting after refactoring stale read, it will be removed later.
SnapshotTS uint64
// Deprecated: IsStaleness now is only used for asserting after refactoring stale read, it will be removed later.
IsStaleness bool
// Deprecated: ReadReplicaScope now is only used for asserting after refactoring stale read, it will be removed later.
ReadReplicaScope string
Stmt ast.StmtNode
StmtType string
Expand Down Expand Up @@ -266,17 +270,33 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
vars.PreparedParams = append(vars.PreparedParams, val)
}
}
snapshotTS, readReplicaScope, isStaleness, err := e.handleExecuteBuilderOption(sctx, preparedObj)

// Just setting `e.SnapshotTS`, `e.ReadReplicaScope` and `e.IsStaleness` with the return value of `handleExecuteBuilderOption`
// for asserting the stale read context after refactoring is exactly the same with the previous logic.
snapshotTS, readReplicaScope, isStaleness, err := handleExecuteBuilderOption(sctx, preparedObj)
if err != nil {
return err
}
if isStaleness {
is, err = domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
e.SnapshotTS = snapshotTS
e.ReadReplicaScope = readReplicaScope
e.IsStaleness = isStaleness

failpoint.Inject("assertStaleReadForOptimizePreparedPlan", func() {
if isStaleness != sctx.GetSessionVars().StmtCtx.IsStaleness {
panic(fmt.Sprintf("%v != %v", isStaleness, sctx.GetSessionVars().StmtCtx.IsStaleness))
}
sctx.GetSessionVars().StmtCtx.IsStaleness = true
}

if isStaleness {
is2, err := domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
panic(err)
}

if is.SchemaMetaVersion() != is2.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", is.SchemaMetaVersion(), is2.SchemaMetaVersion()))
}
}
})
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
Expand Down Expand Up @@ -311,14 +331,12 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
if err != nil {
return err
}
e.SnapshotTS = snapshotTS
e.ReadReplicaScope = readReplicaScope
e.IsStaleness = isStaleness
e.Stmt = prepared.Stmt
return nil
}

func (e *Execute) handleExecuteBuilderOption(sctx sessionctx.Context,
// Deprecated: it will be removed later. Now it is only used for asserting
func handleExecuteBuilderOption(sctx sessionctx.Context,
preparedObj *CachedPrepareStmt) (snapshotTS uint64, readReplicaScope string, isStaleness bool, err error) {
snapshotTS = 0
readReplicaScope = oracle.GlobalTxnScope
Expand Down