Skip to content

Commit

Permalink
Revert "*: make prepare stmt retriable when schema is out-dated. (#4669
Browse files Browse the repository at this point in the history
…)"

This reverts commit 3dfbf52.
  • Loading branch information
tiancaiamao committed Sep 28, 2017
1 parent f451688 commit 45bba87
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 26 deletions.
4 changes: 2 additions & 2 deletions ast/ast.go
Expand Up @@ -174,8 +174,8 @@ type Statement interface {
// Exec executes SQL and gets a Recordset.
Exec(ctx context.Context) (RecordSet, error)

// AstNode return the ast nodes for retry.
AstNode() StmtNode
// IsPrepared returns whether this statement is prepared statement.
IsPrepared() bool
}

// Visitor visits a Node.
Expand Down
17 changes: 9 additions & 8 deletions executor/adapter.go
Expand Up @@ -93,20 +93,20 @@ func (a *recordSet) Close() error {
type statement struct {
is infoschema.InfoSchema // The InfoSchema cannot change during execution, so we hold a reference to it.

ctx context.Context
text string
plan plan.Plan
startTime time.Time
expensive bool
astNode ast.StmtNode
ctx context.Context
text string
plan plan.Plan
startTime time.Time
isPreparedStmt bool
expensive bool
}

func (a *statement) OriginText() string {
return a.text
}

func (a *statement) AstNode() ast.StmtNode {
return a.astNode
func (a *statement) IsPrepared() bool {
return a.isPreparedStmt
}

// Exec implements the ast.Statement Exec interface.
Expand Down Expand Up @@ -248,6 +248,7 @@ func (a *statement) buildExecutor(ctx context.Context) (Executor, error) {
return nil, errors.Trace(err)
}
a.text = executorExec.Stmt.Text()
a.isPreparedStmt = true
a.plan = executorExec.Plan
e = executorExec.StmtExec
}
Expand Down
1 change: 0 additions & 1 deletion executor/compiler.go
Expand Up @@ -49,7 +49,6 @@ func (c *Compiler) Compile(ctx context.Context, node ast.StmtNode) (ast.Statemen
is: is,
plan: p,
text: node.Text(),
astNode: node,
expensive: isExpensive,
}
return sa, nil
Expand Down
11 changes: 6 additions & 5 deletions new_session_test.go
Expand Up @@ -899,11 +899,12 @@ func (s *testSchemaSuite) TestPrepareStmtCommitWhenSchemaChanged(c *C) {
tk1.MustExec("execute stmt using @a, @a")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk.MustExec("alter table t drop column b")
tk1.MustExec("execute stmt using @a, @a")
_, err := tk1.Exec("commit")
c.Assert(terror.ErrorEqual(err, executor.ErrWrongValueCountOnRow), IsTrue, Commentf("err %v", err))
// TODO: PrepareStmt should handle this.
//tk1.MustExec("begin")
//tk.MustExec("alter table t drop column b")
//tk1.MustExec("execute stmt using @a, @a")
//_, err := tk1.Exec("commit")
//c.Assert(terror.ErrorEqual(err, executor.ErrWrongValueCountOnRow), IsTrue, Commentf("err %v", err))
}

func (s *testSchemaSuite) TestCommitWhenSchemaChanged(c *C) {
Expand Down
29 changes: 19 additions & 10 deletions session.go
Expand Up @@ -403,8 +403,9 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
s.sessionVars.RetryInfo.ResetOffset()
for i, sr := range nh.history {
st := sr.st
txt := st.OriginText()
if infoSchemaChanged {
st, err = updateStatement(st, s)
st, err = updateStatement(st, s, txt)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -413,7 +414,6 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
if retryCnt == 0 {
// We do not have to log the query every time.
// We print the queries at the first try only.
txt := st.OriginText()
log.Warnf("[%d] Retry [%d] query [%d] %s", connID, retryCnt, i, sqlForLog(txt))
} else {
log.Warnf("[%d] Retry [%d] query [%d]", connID, retryCnt, i)
Expand Down Expand Up @@ -449,16 +449,25 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
return err
}

func updateStatement(st ast.Statement, s *session) (ast.Statement, error) {
func updateStatement(st ast.Statement, s *session, txt string) (ast.Statement, error) {
// statement maybe stale because of infoschema changed, this function will return the updated one.
// Rebuild plan if infoschema changed, reuse the statement otherwise.
resultSt, err := Compile(s, st.AstNode())
if err != nil {
// If a txn is inserting data when DDL is dropping column,
// it would fail to commit and retry, and run here then.
return resultSt, errors.Trace(err)
if st.IsPrepared() {
// TODO: Rebuild plan if infoschema changed, reuse the statement otherwise.
} else {
// Rebuild plan if infoschema changed, reuse the statement otherwise.
charset, collation := s.sessionVars.GetCharsetInfo()
stmt, err := s.parser.ParseOneStmt(txt, charset, collation)
if err != nil {
return st, errors.Trace(err)
}
st, err = Compile(s, stmt)
if err != nil {
// If a txn is inserting data when DDL is dropping column,
// it would fail to commit and retry, and run here then.
return st, errors.Trace(err)
}
}
return resultSt, nil
return st, nil
}

func sqlForLog(sql string) string {
Expand Down

0 comments on commit 45bba87

Please sign in to comment.