Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session, util: update session to use new APIs #22652

Merged
merged 14 commits into from
Feb 19, 2021
144 changes: 72 additions & 72 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ var (

func checkBootstrapped(s Session) (bool, error) {
// Check if system db exists.
_, err := s.Execute(context.Background(), fmt.Sprintf("USE %s;", mysql.SystemDB))
_, err := s.ExecuteInternal(context.Background(), "USE %n", mysql.SystemDB)
if err != nil && infoschema.ErrDatabaseNotExists.NotEqual(err) {
logutil.BgLogger().Fatal("check bootstrap error",
zap.Error(err))
Expand All @@ -565,20 +565,21 @@ func checkBootstrapped(s Session) (bool, error) {
// getTiDBVar gets variable value from mysql.tidb table.
// Those variables are used by TiDB server.
func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
sql := fmt.Sprintf(`SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s"`,
mysql.SystemDB, mysql.TiDBTable, name)
ctx := context.Background()
rs, err := s.Execute(ctx, sql)
rs, err := s.ExecuteInternal(ctx, `SELECT HIGH_PRIORITY VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME= %?`,
mysql.SystemDB,
mysql.TiDBTable,
name,
)
if err != nil {
return "", true, errors.Trace(err)
}
if len(rs) != 1 {
if rs == nil {
return "", true, errors.New("Wrong number of Recordset")
}
r := rs[0]
defer terror.Call(r.Close)
req := r.NewChunk()
err = r.Next(ctx, req)
defer terror.Call(rs.Close)
req := rs.NewChunk()
err = rs.Next(ctx, req)
if err != nil || req.NumRows() == 0 {
return "", true, errors.Trace(err)
}
Expand All @@ -604,7 +605,7 @@ func upgrade(s Session) {
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")
_, err = s.ExecuteInternal(context.Background(), "COMMIT")

if err != nil {
sleepTime := 1 * time.Second
Expand Down Expand Up @@ -651,18 +652,15 @@ func upgradeToVer3(s Session, ver int64) {
return
}
// Version 3 fix tx_read_only variable value.
sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %s.%s SET variable_value = '0' WHERE variable_name = 'tx_read_only';",
mysql.SystemDB, mysql.GlobalVariablesTable)
mustExecute(s, sql)
mustExecute(s, "UPDATE HIGH_PRIORITY %n.%n SET variable_value = '0' WHERE variable_name = 'tx_read_only';", mysql.SystemDB, mysql.GlobalVariablesTable)
}

// upgradeToVer4 updates to version 4.
func upgradeToVer4(s Session, ver int64) {
if ver >= version4 {
return
}
sql := CreateStatsMetaTable
mustExecute(s, sql)
mustExecute(s, CreateStatsMetaTable)
}

func upgradeToVer5(s Session, ver int64) {
Expand Down Expand Up @@ -696,7 +694,7 @@ func upgradeToVer8(s Session, ver int64) {
return
}
// This is a dummy upgrade, it checks whether upgradeToVer7 success, if not, do it again.
if _, err := s.Execute(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil {
if _, err := s.ExecuteInternal(context.Background(), "SELECT HIGH_PRIORITY `Process_priv` FROM mysql.user LIMIT 0"); err == nil {
return
}
upgradeToVer7(s, ver)
Expand All @@ -712,7 +710,7 @@ func upgradeToVer9(s Session, ver int64) {
}

func doReentrantDDL(s Session, sql string, ignorableErrs ...error) {
_, err := s.Execute(context.Background(), sql)
_, err := s.ExecuteInternal(context.Background(), sql)
for _, ignorableErr := range ignorableErrs {
if terror.ErrorEqual(err, ignorableErr) {
return
Expand All @@ -738,7 +736,7 @@ func upgradeToVer11(s Session, ver int64) {
if ver >= version11 {
return
}
_, err := s.Execute(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`")
_, err := s.ExecuteInternal(context.Background(), "ALTER TABLE mysql.user ADD COLUMN `References_priv` ENUM('N','Y') CHARACTER SET utf8 NOT NULL DEFAULT 'N' AFTER `Grant_priv`")
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
return
Expand All @@ -753,21 +751,20 @@ func upgradeToVer12(s Session, ver int64) {
return
}
ctx := context.Background()
_, err := s.Execute(ctx, "BEGIN")
_, err := s.ExecuteInternal(ctx, "BEGIN")
terror.MustNil(err)
sql := "SELECT HIGH_PRIORITY user, host, password FROM mysql.user WHERE password != ''"
rs, err := s.Execute(ctx, sql)
rs, err := s.ExecuteInternal(ctx, sql)
if terror.ErrorEqual(err, core.ErrUnknownColumn) {
sql := "SELECT HIGH_PRIORITY user, host, authentication_string FROM mysql.user WHERE authentication_string != ''"
rs, err = s.Execute(ctx, sql)
rs, err = s.ExecuteInternal(ctx, sql)
}
terror.MustNil(err)
r := rs[0]
sqls := make([]string, 0, 1)
defer terror.Call(r.Close)
req := r.NewChunk()
defer terror.Call(rs.Close)
req := rs.NewChunk()
it := chunk.NewIterator4Chunk(req)
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
for err == nil && req.NumRows() != 0 {
for row := it.Begin(); row != it.End(); row = it.Next() {
user := row.GetString(0)
Expand All @@ -779,7 +776,7 @@ func upgradeToVer12(s Session, ver int64) {
updateSQL := fmt.Sprintf(`UPDATE HIGH_PRIORITY mysql.user SET password = "%s" WHERE user="%s" AND host="%s"`, newPass, user, host)
sqls = append(sqls, updateSQL)
}
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
}
terror.MustNil(err)

Expand Down Expand Up @@ -809,7 +806,7 @@ func upgradeToVer13(s Session, ver int64) {
}
ctx := context.Background()
for _, sql := range sqls {
_, err := s.Execute(ctx, sql)
_, err := s.ExecuteInternal(ctx, sql)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
continue
Expand Down Expand Up @@ -838,7 +835,7 @@ func upgradeToVer14(s Session, ver int64) {
}
ctx := context.Background()
for _, sql := range sqls {
_, err := s.Execute(ctx, sql)
_, err := s.ExecuteInternal(ctx, sql)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrColumnExists) {
continue
Expand All @@ -853,7 +850,7 @@ func upgradeToVer15(s Session, ver int64) {
return
}
var err error
_, err = s.Execute(context.Background(), CreateGCDeleteRangeTable)
_, err = s.ExecuteInternal(context.Background(), CreateGCDeleteRangeTable)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer15 error", zap.Error(err))
}
Expand Down Expand Up @@ -923,9 +920,13 @@ func upgradeToVer23(s Session, ver int64) {

// writeSystemTZ writes system timezone info into mysql.tidb
func writeSystemTZ(s Session) {
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%s", "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`,
mysql.SystemDB, mysql.TiDBTable, tidbSystemTZ, timeutil.InferSystemTZ(), timeutil.InferSystemTZ())
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Global System Timezone.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
mysql.SystemDB,
mysql.TiDBTable,
tidbSystemTZ,
timeutil.InferSystemTZ(),
timeutil.InferSystemTZ(),
)
}

// upgradeToVer24 initializes `System` timezone according to docs/design/2018-09-10-adding-tz-env.md
Expand Down Expand Up @@ -1054,7 +1055,7 @@ func upgradeToVer38(s Session, ver int64) {
return
}
var err error
_, err = s.Execute(context.Background(), CreateGlobalPrivTable)
_, err = s.ExecuteInternal(context.Background(), CreateGlobalPrivTable)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer38 error", zap.Error(err))
}
Expand All @@ -1066,9 +1067,9 @@ func writeNewCollationParameter(s Session, flag bool) {
if flag {
b = varTrue
}
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`,
mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbNewCollationEnabled, b, comment, b,
)
}

func upgradeToVer40(s Session, ver int64) {
Expand Down Expand Up @@ -1104,14 +1105,14 @@ func upgradeToVer42(s Session, ver int64) {

// Convert statement summary global variables to non-empty values.
func writeStmtSummaryVars(s Session) {
sql := fmt.Sprintf("UPDATE %s.%s SET variable_value='%%s' WHERE variable_name='%%s' AND variable_value=''", mysql.SystemDB, mysql.GlobalVariablesTable)
sql := "UPDATE %n.%n SET variable_value= %? WHERE variable_name= %? AND variable_value=''"
stmtSummaryConfig := config.GetGlobalConfig().StmtSummary
mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary))
mustExecute(s, fmt.Sprintf(sql, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery))
mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval))
mustExecute(s, fmt.Sprintf(sql, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize))
mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount))
mustExecute(s, fmt.Sprintf(sql, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength))
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.Enable), variable.TiDBEnableStmtSummary)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, variable.BoolToOnOff(stmtSummaryConfig.EnableInternalQuery), variable.TiDBStmtSummaryInternalQuery)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.RefreshInterval), variable.TiDBStmtSummaryRefreshInterval)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.Itoa(stmtSummaryConfig.HistorySize), variable.TiDBStmtSummaryHistorySize)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxStmtCount), 10), variable.TiDBStmtSummaryMaxStmtCount)
mustExecute(s, sql, mysql.SystemDB, mysql.GlobalVariablesTable, strconv.FormatUint(uint64(stmtSummaryConfig.MaxSQLLength), 10), variable.TiDBStmtSummaryMaxSQLLength)
}

func upgradeToVer43(s Session, ver int64) {
Expand Down Expand Up @@ -1219,13 +1220,12 @@ func upgradeToVer55(s Session, ver int64) {

selectSQL := "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(names, quoteCommaQuote) + "')"
ctx := context.Background()
rs, err := s.Execute(ctx, selectSQL)
rs, err := s.ExecuteInternal(ctx, selectSQL)
terror.MustNil(err)
r := rs[0]
defer terror.Call(r.Close)
req := r.NewChunk()
defer terror.Call(rs.Close)
req := rs.NewChunk()
it := chunk.NewIterator4Chunk(req)
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
for err == nil && req.NumRows() != 0 {
for row := it.Begin(); row != it.End(); row = it.Next() {
n := strings.ToLower(row.GetString(0))
Expand All @@ -1234,7 +1234,7 @@ func upgradeToVer55(s Session, ver int64) {
return
}
}
err = r.Next(ctx, req)
err = rs.Next(ctx, req)
}
terror.MustNil(err)

Expand Down Expand Up @@ -1270,9 +1270,9 @@ func initBindInfoTable(s Session) {
}

func insertBuiltinBindInfoRow(s Session) {
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`,
bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES (%?, %?, "mysql", %?, "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", %?)`,
bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin,
)
}

func upgradeToVer59(s Session, ver int64) {
Expand Down Expand Up @@ -1400,9 +1400,9 @@ func updateBindInfo(iter *chunk.Iterator4Chunk, p *parser.Parser, bindMap map[st

func writeMemoryQuotaQuery(s Session) {
comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x+"
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`,
mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbDefMemoryQuotaQuery, 32<<30, comment, 32<<30,
)
}

func upgradeToVer62(s Session, ver int64) {
Expand Down Expand Up @@ -1431,17 +1431,17 @@ func upgradeToVer64(s Session, ver int64) {

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%s', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%s'`,
mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
mysql.SystemDB, mysql.TiDBTable, tidbDefOOMAction, config.OOMActionLog, comment, config.OOMActionLog,
)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", "%d", "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%d"`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion,
)
}

// getBootstrapVersion gets bootstrap version from mysql.tidb table;
Expand All @@ -1461,7 +1461,7 @@ func doDDLWorks(s Session) {
// Create a test database.
mustExecute(s, "CREATE DATABASE IF NOT EXISTS test")
// Create system db.
mustExecute(s, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", mysql.SystemDB))
mustExecute(s, "CREATE DATABASE IF NOT EXISTS %n", mysql.SystemDB)
// Create user table.
mustExecute(s, CreateUserTable)
// Create privilege tables.
Expand Down Expand Up @@ -1507,6 +1507,7 @@ func doDDLWorks(s Session) {

// doDMLWorks executes DML statements in bootstrap stage.
// All the statements run in a single transaction.
// TODO: sanitize.
func doDMLWorks(s Session) {
mustExecute(s, "BEGIN")

Expand Down Expand Up @@ -1548,14 +1549,13 @@ func doDMLWorks(s Session) {
strings.Join(values, ", "))
mustExecute(s, sql)

sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%s", "Bootstrap flag. Do not delete.")
ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap flag. Do not delete.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE=%?`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, varTrue, varTrue,
)

sql = fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES("%s", "%d", "Bootstrap version. Do not delete.")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion)
mustExecute(s, sql)
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, "Bootstrap version. Do not delete.")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion,
)

writeSystemTZ(s)

Expand All @@ -1565,7 +1565,7 @@ func doDMLWorks(s Session) {

writeStmtSummaryVars(s)

_, err := s.Execute(context.Background(), "COMMIT")
_, err := s.ExecuteInternal(context.Background(), "COMMIT")
if err != nil {
sleepTime := 1 * time.Second
logutil.BgLogger().Info("doDMLWorks failed", zap.Error(err), zap.Duration("sleeping time", sleepTime))
Expand All @@ -1582,8 +1582,8 @@ func doDMLWorks(s Session) {
}
}

func mustExecute(s Session, sql string) {
_, err := s.ExecuteInternal(context.Background(), sql)
func mustExecute(s Session, sql string, args ...interface{}) {
_, err := s.ExecuteInternal(context.Background(), sql, args...)
if err != nil {
debug.PrintStack()
logutil.BgLogger().Fatal("mustExecute error", zap.Error(err))
Expand Down
Loading