diff --git a/go/test/endtoend/onlineddl/onlineddl_test.go b/go/test/endtoend/onlineddl/onlineddl_test.go index bc80ee466b3..a961a19209f 100644 --- a/go/test/endtoend/onlineddl/onlineddl_test.go +++ b/go/test/endtoend/onlineddl/onlineddl_test.go @@ -48,33 +48,41 @@ var ( ddlStrategyUnchanged = "-" createTable = ` CREATE TABLE %s ( - id BIGINT(20) not NULL, - msg varchar(64), - PRIMARY KEY (id) + id bigint(20) NOT NULL, + msg varchar(64), + PRIMARY KEY (id) ) ENGINE=InnoDB;` // To verify non online-DDL behavior alterTableNormalStatement = ` ALTER TABLE %s - ADD COLUMN non_online INT UNSIGNED NOT NULL` + ADD COLUMN non_online int UNSIGNED NOT NULL` // A trivial statement which must succeed and does not change the schema alterTableTrivialStatement = ` ALTER TABLE %s - ENGINE=InnoDB` + ENGINE=InnoDB` // The following statement is valid alterTableSuccessfulStatement = ` ALTER TABLE %s - MODIFY id BIGINT UNSIGNED NOT NULL, - ADD COLUMN ghost_col INT NOT NULL, - ADD INDEX idx_msg(msg)` + MODIFY id bigint UNSIGNED NOT NULL, + ADD COLUMN ghost_col int NOT NULL, + ADD INDEX idx_msg(msg)` // The following statement will fail because gh-ost requires some shared unique key alterTableFailedStatement = ` ALTER TABLE %s - DROP PRIMARY KEY, - DROP COLUMN ghost_col` + DROP PRIMARY KEY, + DROP COLUMN ghost_col` // We will run this query with "gh-ost --max-load=Threads_running=1" alterTableThrottlingStatement = ` ALTER TABLE %s - DROP COLUMN ghost_col` + DROP COLUMN ghost_col` + onlineDDLCreateTableStatement = ` + CREATE TABLE %s ( + id bigint NOT NULL, + online_ddl_create_col INT NOT NULL, + PRIMARY KEY (id) + ) ENGINE=InnoDB;` + onlineDDLDropTableStatement = ` + DROP TABLE %s` ) func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp { @@ -156,33 +164,45 @@ func TestSchemaChange(t *testing.T) { assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards)) testWithInitialSchema(t) { - _ = testAlterTable(t, alterTableNormalStatement, string(schema.DDLStrategyNormal), "vtctl", "non_online") + _ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyNormal), "vtctl", "non_online") } { - uuid := testAlterTable(t, alterTableSuccessfulStatement, ddlStrategyUnchanged, "vtgate", "ghost_col") + uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, ddlStrategyUnchanged, "vtgate", "ghost_col") checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) checkCancelMigration(t, uuid, false) checkRetryMigration(t, uuid, false) } { - uuid := testAlterTable(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col") + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col") checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) checkCancelMigration(t, uuid, false) checkRetryMigration(t, uuid, false) } { - uuid := testAlterTable(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col") + uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col") checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning) checkCancelMigration(t, uuid, true) time.Sleep(2 * time.Second) checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) } { - uuid := testAlterTable(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col") + uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col") checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) checkCancelMigration(t, uuid, false) checkRetryMigration(t, uuid, true) } + { + uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "") + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkCancelMigration(t, uuid, false) + checkRetryMigration(t, uuid, false) + } + { + uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col") + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkCancelMigration(t, uuid, false) + checkRetryMigration(t, uuid, false) + } } func testWithInitialSchema(t *testing.T) { @@ -198,8 +218,8 @@ func testWithInitialSchema(t *testing.T) { checkTables(t, totalTableCount) } -// testAlterTable runs an online DDL, ALTER statement -func testAlterTable(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectColumn string) (uuid string) { +// testOnlineDDLStatement runs an online DDL, ALTER statement +func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectColumn string) (uuid string) { tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3) sqlQuery := fmt.Sprintf(alterStatement, tableName) if executeStrategy == "vtgate" { @@ -224,7 +244,9 @@ func testAlterTable(t *testing.T, alterStatement string, ddlStrategy string, exe time.Sleep(time.Second * 20) } - checkMigratedTable(t, tableName, expectColumn) + if expectColumn != "" { + checkMigratedTable(t, tableName, expectColumn) + } return uuid } diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 5e3f54bbb89..793027c1922 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -157,9 +157,6 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(ddl sqlparser.DDLStatement) (isOnl if ddl == nil { return false, strategy, options } - if ddl.GetAction() != sqlparser.AlterDDLAction { - return false, strategy, options - } strategy, options, _ = schema.ParseDDLStrategy(exec.ddlStrategy) if strategy != schema.DDLStrategyNormal { return true, strategy, options @@ -260,7 +257,14 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute tableName := "" switch ddl := stat.(type) { case sqlparser.DDLStatement: - tableName = ddl.GetTable().Name.String() + switch ddl.GetAction() { + case sqlparser.DropDDLAction: + // TODO (shlomi): break into distinct per-table DROP statements; on a future PR where + // we implement lazy DROP TABLE on Online DDL + tableName = ddl.GetFromTables()[0].Name.String() + default: + tableName = ddl.GetTable().Name.String() + } isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl) } exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy) diff --git a/go/vt/schemamanager/tablet_executor_test.go b/go/vt/schemamanager/tablet_executor_test.go index af0e2924634..7caae418247 100644 --- a/go/vt/schemamanager/tablet_executor_test.go +++ b/go/vt/schemamanager/tablet_executor_test.go @@ -236,7 +236,8 @@ func TestIsOnlineSchemaDDL(t *testing.T) { { query: "CREATE TABLE t(id int)", ddlStrategy: "gh-ost", - isOnlineDDL: false, + isOnlineDDL: true, + strategy: schema.DDLStrategyGhost, }, { query: "ALTER TABLE t ADD COLUMN i INT", diff --git a/go/vt/vtctld/api.go b/go/vt/vtctld/api.go index 01306867f68..6b5570dbf67 100644 --- a/go/vt/vtctld/api.go +++ b/go/vt/vtctld/api.go @@ -615,6 +615,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re req := struct { Keyspace, SQL string ReplicaTimeoutSeconds int + DDLStrategy string `json:"ddl_strategy,omitempty"` }{} if err := unmarshalRequest(r, &req); err != nil { return fmt.Errorf("can't unmarshal request: %v", err) @@ -635,6 +636,10 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re requestContext := fmt.Sprintf("vtctld/api:%s", apiCallUUID) executor := schemamanager.NewTabletExecutor(requestContext, wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second) + if err := executor.SetDDLStrategy(req.DDLStrategy); err != nil { + return fmt.Errorf("error setting DDL strategy: %v", err) + } + return schemamanager.Run(ctx, schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor) }) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 99d2691e049..08d450f6cff 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -94,10 +94,12 @@ var emptyResult = &sqltypes.Result{ var ghostOverridePath = flag.String("gh-ost-path", "", "override default gh-ost binary full path") var ptOSCOverridePath = flag.String("pt-osc-path", "", "override default pt-online-schema-change binary full path") var migrationCheckInterval = flag.Duration("migration_check_interval", 1*time.Minute, "Interval between migration checks") +var migrationNextCheckInterval = 5 * time.Second const ( - maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters - staleMigrationMinutes = 10 + maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters + staleMigrationMinutes = 10 + progressPctFull float64 = 100.0 ) var ( @@ -118,10 +120,11 @@ type Executor struct { shard string dbName string - initMutex sync.Mutex - migrationMutex sync.Mutex - migrationRunning int64 - lastMigrationUUID string + initMutex sync.Mutex + migrationMutex sync.Mutex + migrationRunning int64 + lastMigrationUUID string + tickReentranceFlag int64 ticks *timer.Timer isOpen bool @@ -222,6 +225,7 @@ func (e *Executor) Open() error { } e.pool.Open(e.env.Config().DB.AppWithDB(), e.env.Config().DB.DbaWithDB(), e.env.Config().DB.AppDebugWithDB()) e.ticks.Start(e.onMigrationCheckTick) + e.triggerNextCheckInterval() if _, err := sqlparser.QueryMatchesTemplates("select 1 from dual", vexecUpdateTemplates); err != nil { // this validates vexecUpdateTemplates @@ -246,6 +250,11 @@ func (e *Executor) Close() { e.isOpen = false } +// triggerNextCheckInterval the next tick sooner than normal +func (e *Executor) triggerNextCheckInterval() { + e.ticks.TriggerAfter(migrationNextCheckInterval) +} + func (e *Executor) ghostPanicFlagFileName(uuid string) string { return path.Join(os.TempDir(), fmt.Sprintf("ghost.%s.panic.flag", uuid)) } @@ -350,6 +359,25 @@ func (e *Executor) tableExists(ctx context.Context, tableName string) (bool, err return (row != nil), nil } +// executeDirectly runs a DDL query directly on the backend MySQL server +func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL) error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB()) + if err != nil { + return err + } + defer conn.Close() + + if _, err := conn.ExecuteFetch(onlineDDL.SQL, 0, false); err != nil { + return err + } + _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + + return nil +} + // ExecuteWithGhost validates and runs a gh-ost process. // Validation included testing the backend MySQL server and the gh-ost binary itself // Execution runs first a dry run, then an actual migration @@ -931,6 +959,53 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error { return err } +func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { + failMigration := func(err error) error { + _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) + return err + } + + stmt, err := sqlparser.Parse(onlineDDL.SQL) + if err != nil { + return failMigration(fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", onlineDDL.SQL, err)) + } + switch stmt := stmt.(type) { + case sqlparser.DDLStatement: + switch stmt.GetAction() { + case sqlparser.CreateDDLAction, sqlparser.DropDDLAction: + go func() { + if err := e.executeDirectly(ctx, onlineDDL); err != nil { + failMigration(err) + } + }() + case sqlparser.AlterDDLAction: + switch onlineDDL.Strategy { + case schema.DDLStrategyGhost: + go func() { + if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { + failMigration(err) + } + }() + case schema.DDLStrategyPTOSC: + go func() { + if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { + failMigration(err) + } + }() + default: + { + return failMigration(fmt.Errorf("Unsupported strategy: %+v", onlineDDL.Strategy)) + } + } + } + default: + { + return failMigration(fmt.Errorf("Unsupported query type: %+v", onlineDDL.SQL)) + } + } + return nil +} + func (e *Executor) runNextMigration(ctx context.Context) error { e.migrationMutex.Lock() defer e.migrationMutex.Unlock() @@ -956,25 +1031,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error { Options: row["options"].ToString(), Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), } - switch onlineDDL.Strategy { - case schema.DDLStrategyGhost: - go func() { - if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { - _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) - } - }() - case schema.DDLStrategyPTOSC: - go func() { - if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { - _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) - } - }() - default: - { - _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) - return fmt.Errorf("Unsupported strategy: %+v", onlineDDL.Strategy) - } - } + e.executeMigration(ctx, onlineDDL) // the query should only ever return a single row at the most // but let's make it also explicit here that we only run a single migration if i == 0 { @@ -1195,6 +1252,20 @@ func (e *Executor) gcArtifacts(ctx context.Context) error { // onMigrationCheckTick runs all migrations life cycle func (e *Executor) onMigrationCheckTick() { + // This function can be called by multiple triggers. First, there's the normal ticker. + // Then, any time a migration completes, we set a timer to trigger this function. + // also, any time a new INSERT arrives, we set a timer to trigger this function. + // Some of these may be correlated. To avoid spamming of this function we: + // - ensure the function is non-reentrant, using tickReentranceFlag + // - clean up tickReentranceFlag 1 second after function completes; this throttles calls to + // this function at no more than 1/sec rate. + if atomic.CompareAndSwapInt64(&e.tickReentranceFlag, 0, 1) { + defer time.AfterFunc(time.Second, func() { atomic.StoreInt64(&e.tickReentranceFlag, 0) }) + } else { + // An instance of this function is already running + return + } + if e.tabletTypeFunc() != topodatapb.TabletType_MASTER { return } @@ -1202,6 +1273,7 @@ func (e *Executor) onMigrationCheckTick() { log.Errorf("Executor.onMigrationCheckTick(): empty keyspace") return } + ctx := context.Background() if err := e.initSchema(ctx); err != nil { log.Error(err) @@ -1363,15 +1435,8 @@ func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result return result, err } -// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks. -func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) { - status := schema.OnlineDDLStatus(statusParam) - dryRun := (dryrunParam == "true") - var progressPct float64 - if pct, err := strconv.ParseFloat(progressParam, 32); err == nil { - progressPct = pct - } - +// onSchemaMigrationStatus is called when a status is set/changed for a running migration +func (e *Executor) onSchemaMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus, dryRun bool, progressPct float64) (err error) { if dryRun && status != schema.OnlineDDLStatusFailed { // We don't consider dry-run reports unless there's a failure return nil @@ -1379,37 +1444,57 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statu switch status { case schema.OnlineDDLStatusReady: { - err = e.updateMigrationTimestamp(ctx, "ready_timestamp", uuidParam) + err = e.updateMigrationTimestamp(ctx, "ready_timestamp", uuid) } case schema.OnlineDDLStatusRunning: { - _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) - err = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuidParam) + _ = e.updateMigrationStartedTimestamp(ctx, uuid) + err = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) } case schema.OnlineDDLStatusComplete: { - _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) - err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuidParam) + progressPct = progressPctFull + _ = e.updateMigrationStartedTimestamp(ctx, uuid) + err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuid) } case schema.OnlineDDLStatusFailed: { - _ = e.updateMigrationStartedTimestamp(ctx, uuidParam) - err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuidParam) + _ = e.updateMigrationStartedTimestamp(ctx, uuid) + err = e.updateMigrationTimestamp(ctx, "completed_timestamp", uuid) } } if err != nil { return err } - if err = e.updateMigrationStatus(ctx, uuidParam, status); err != nil { + if err = e.updateMigrationStatus(ctx, uuid, status); err != nil { return err } - if err = e.updateMigrationProgress(ctx, uuidParam, progressPct); err != nil { + if err = e.updateMigrationProgress(ctx, uuid, progressPct); err != nil { return err } + if !dryRun { + switch status { + case schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed: + e.triggerNextCheckInterval() + } + } + return nil } +// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks. +func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) { + status := schema.OnlineDDLStatus(statusParam) + dryRun := (dryrunParam == "true") + var progressPct float64 + if pct, err := strconv.ParseFloat(progressParam, 32); err == nil { + progressPct = pct + } + + return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct) +} + // VExec is called by a VExec invocation func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *querypb.QueryResult, err error) { response := func(result *sqltypes.Result, err error) (*querypb.QueryResult, error) { @@ -1437,6 +1522,7 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp vx.ReplaceInsertColumnVal("shard", vx.ToStringVal(e.shard)) vx.ReplaceInsertColumnVal("mysql_schema", vx.ToStringVal(e.dbName)) vx.AddOrReplaceInsertColumnVal("tablet", vx.ToStringVal(e.TabletAliasString())) + e.triggerNextCheckInterval() return response(e.execQuery(ctx, vx.Query)) case *sqlparser.Update: match, err := sqlparser.QueryMatchesTemplates(vx.Query, vexecUpdateTemplates)