Skip to content

Commit

Permalink
Merge pull request #6901 from planetscale/online-ddl-schema-updates
Browse files Browse the repository at this point in the history
Online DDL: followups in multiple trajectories
  • Loading branch information
shlomi-noach committed Nov 8, 2020
2 parents d524269 + 2984d16 commit c02c53a
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 50 deletions.
2 changes: 2 additions & 0 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type OnlineDDL struct {
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
}

// FromJSON creates an OnlineDDL from json
Expand Down
111 changes: 93 additions & 18 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -110,6 +111,7 @@ type Executor struct {
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
tabletAlias *topodatapb.TabletAlias

keyspace string
shard string
Expand Down Expand Up @@ -143,9 +145,10 @@ func PTOSCFileName() (fileName string, isOverride bool) {
}

// NewExecutor creates a new gh-ost executor.
func NewExecutor(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Executor {
func NewExecutor(env tabletenv.Env, tabletAlias topodatapb.TabletAlias, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType) *Executor {
return &Executor{
env: env,
env: env,
tabletAlias: &tabletAlias,

pool: connpool.NewPool(env, "ExecutorPool", tabletenv.ConnPoolConfig{
Size: 1,
Expand All @@ -168,6 +171,11 @@ func (e *Executor) execQuery(ctx context.Context, query string) (result *sqltype
return conn.Exec(ctx, query, math.MaxInt32, true)
}

// TabletAliasString returns tablet alias as string (duh)
func (e *Executor) TabletAliasString() string {
return topoproto.TabletAliasString(e.tabletAlias)
}

func (e *Executor) initSchema(ctx context.Context) error {
e.initMutex.Lock()
defer e.initMutex.Unlock()
Expand Down Expand Up @@ -403,7 +411,7 @@ export ONLINE_DDL_PASSWORD
}
onHookContent := func(status schema.OnlineDDLStatus) string {
return fmt.Sprintf(`#!/bin/bash
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"
curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"
`, *servenv.Port, onlineDDL.UUID, string(status))
}
if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning)); err != nil {
Expand Down Expand Up @@ -486,7 +494,7 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr
fmt.Sprintf("--serve-socket-file=%s", serveSocketFile),
fmt.Sprintf("--hooks-path=%s", tempDir),
fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?p=low`, *servenv.Port),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=online-ddl:gh-ost:%s&p=low`, *servenv.Port, onlineDDL.UUID),
fmt.Sprintf(`--database=%s`, e.dbName),
fmt.Sprintf(`--table=%s`, onlineDDL.Table),
fmt.Sprintf(`--alter=%s`, alterOptions),
Expand Down Expand Up @@ -621,7 +629,7 @@ export MYSQL_PWD
my ($self, %args) = @_;
return sub {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?p=low")) {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app=online-ddl:pt-osc:{{MIGRATION_UUID}}&p=low")) {
# Got HTTP 200 OK, means throttler is happy
return 0;
} else {
Expand Down Expand Up @@ -785,14 +793,16 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
return nil, ErrMigrationNotFound
}
onlineDDL = &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: sqlparser.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: sqlparser.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
}
return onlineDDL, nil
}
Expand Down Expand Up @@ -1042,7 +1052,6 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
return countRunnning, err
}
for _, row := range r.Named().Rows {
// A pt-osc UUID is found which claims to be 'running'. Is it?
uuid := row["migration_uuid"].ToString()
// Since pt-osc doesn't have a "liveness" plugin entry point, we do it externally:
// if the process is alive, we update the `liveness_timestamp` for this migration.
Expand Down Expand Up @@ -1086,6 +1095,12 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
return err
}
}
if onlineDDL.TabletAlias != e.TabletAliasString() {
// This means another tablet started the migration, and the migration has failed due to the tablet failure (e.g. master failover)
if err := e.updateTabletFailure(ctx, onlineDDL.UUID); err != nil {
return err
}
}
if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed); err != nil {
return err
}
Expand All @@ -1094,6 +1109,13 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error {
return nil
}

// retryTabletFailureMigrations looks for migrations failed by tablet failure (e.g. by failover)
// and retry them (put them back in the queue)
func (e *Executor) retryTabletFailureMigrations(ctx context.Context) error {
_, err := e.retryMigration(ctx, sqlWhereTabletFailure)
return err
}

// gcArtifacts garbage-collects migration artifacts from completed/failed migrations
func (e *Executor) gcArtifactTable(ctx context.Context, artifactTable string) error {
tableExists, err := e.tableExists(ctx, artifactTable)
Expand Down Expand Up @@ -1156,11 +1178,13 @@ func (e *Executor) onMigrationCheckTick() {
return
}
ctx := context.Background()

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return
}
if err := e.retryTabletFailureMigrations(ctx); err != nil {
log.Error(err)
}
if err := e.scheduleNextMigration(ctx); err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -1244,6 +1268,22 @@ func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts .
return err
}

// updateTabletFailure marks a given migration as "tablet_failed"
func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateTabletFailure, "_vt",
":migration_uuid",
)
bindVars := map[string]*querypb.BindVariable{
"migration_uuid": sqltypes.StringBindVariable(uuid),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
if err != nil {
return err
}
_, err = e.execQuery(ctx, bound)
return err
}

func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error {
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus, "_vt",
":migration_status",
Expand All @@ -1261,18 +1301,49 @@ func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, statu
return err
}

func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error {
if progress <= 0 {
// progress starts at 0, and can only increase.
// A value of "0" either means "This is the actual current progress" or "No information"
// In both cases there's nothing to update
return nil
}
parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationProgress, "_vt",
":migration_progress",
":migration_uuid",
)
bindVars := map[string]*querypb.BindVariable{
"migration_progress": sqltypes.Float64BindVariable(progress),
"migration_uuid": sqltypes.StringBindVariable(uuid),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
if err != nil {
return err
}
_, err = e.execQuery(ctx, bound)
return err
}

func (e *Executor) retryMigration(ctx context.Context, whereExpr string) (result *sqltypes.Result, err error) {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()
parsed := sqlparser.BuildParsedQuery(sqlRetryMigration, "_vt", whereExpr)
result, err = e.execQuery(ctx, parsed.Query)
parsed := sqlparser.BuildParsedQuery(sqlRetryMigration, "_vt", ":tablet", whereExpr)
bindVars := map[string]*querypb.BindVariable{
"tablet": sqltypes.StringBindVariable(e.TabletAliasString()),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
result, err = e.execQuery(ctx, bound)
return result, err
}

// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks.
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam string) (err error) {
func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statusParam, dryrunParam, progressParam string) (err error) {
status := schema.OnlineDDLStatus(statusParam)
dryRun := (dryrunParam == "true")
var progressPct float64
if pct, err := strconv.ParseFloat(progressParam, 32); err == nil {
progressPct = pct
}

if dryRun && status != schema.OnlineDDLStatusFailed {
// We don't consider dry-run reports unless there's a failure
Expand Down Expand Up @@ -1305,6 +1376,9 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, uuidParam, statu
if err = e.updateMigrationStatus(ctx, uuidParam, status); err != nil {
return err
}
if err = e.updateMigrationProgress(ctx, uuidParam, progressPct); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -1335,6 +1409,7 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
// We can fill them in.
vx.ReplaceInsertColumnVal("shard", vx.ToStringVal(e.shard))
vx.ReplaceInsertColumnVal("mysql_schema", vx.ToStringVal(e.dbName))
vx.AddOrReplaceInsertColumnVal("tablet", vx.ToStringVal(e.TabletAliasString()))
return response(e.execQuery(ctx, vx.Query))
case *sqlparser.Update:
match, err := sqlparser.QueryMatchesTemplates(vx.Query, vexecUpdateTemplates)
Expand Down
47 changes: 42 additions & 5 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ const (
artifacts varchar(1024) NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uuid_idx (migration_uuid),
KEY keyspace_shard_idx (keyspace,shard),
KEY keyspace_shard_idx (keyspace(64),shard(64)),
KEY status_idx (migration_status, liveness_timestamp),
KEY cleanup_status_idx (cleanup_timestamp, migration_status)
) engine=InnoDB DEFAULT CHARSET=utf8mb4`
alterSchemaMigrationsTableRetries = "ALTER TABLE %s.schema_migrations add column retries int unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTablet = "ALTER TABLE %s.schema_migrations add column tablet varchar(128) NOT NULL DEFAULT ''"
alterSchemaMigrationsTableArtifacts = "ALTER TABLE %s.schema_migrations modify artifacts TEXT NOT NULL"
alterSchemaMigrationsTableTabletFailure = "ALTER TABLE %s.schema_migrations add column tablet_failure tinyint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTabletFailureIndex = "ALTER TABLE %s.schema_migrations add KEY tablet_failure_idx (tablet_failure, migration_status, retries)"
alterSchemaMigrationsTableProgress = "ALTER TABLE %s.schema_migrations add column progress float NOT NULL DEFAULT 0"

sqlScheduleSingleMigration = `UPDATE %s.schema_migrations
SET
migration_status='ready',
Expand All @@ -65,6 +72,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationProgress = `UPDATE %s.schema_migrations
SET progress=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationStartedTimestamp = `UPDATE %s.schema_migrations
SET started_timestamp=IFNULL(started_timestamp, NOW())
WHERE
Expand All @@ -81,22 +93,36 @@ const (
migration_uuid=%a
`
sqlUpdateArtifacts = `UPDATE %s.schema_migrations
SET artifacts=%a
SET artifacts=concat(%a, ',', artifacts)
WHERE
migration_uuid=%a
`
sqlUpdateTabletFailure = `UPDATE %s.schema_migrations
SET tablet_failure=1
WHERE
migration_uuid=%a
`
sqlRetryMigration = `UPDATE %s.schema_migrations
SET
migration_status='queued',
tablet=%a,
retries=retries + 1,
tablet_failure=0,
ready_timestamp=NULL,
started_timestamp=NULL,
liveness_timestamp=NULL,
completed_timestamp=NULL
completed_timestamp=NULL,
cleanup_timestamp=NULL
WHERE
migration_status IN ('failed', 'cancelled')
AND (%s)
LIMIT 1
`
sqlWhereTabletFailure = `
tablet_failure=1
AND migration_status='failed'
AND retries=0
`
sqlSelectRunningMigrations = `SELECT
migration_uuid
FROM %s.schema_migrations
Expand Down Expand Up @@ -141,7 +167,9 @@ const (
liveness_timestamp,
completed_timestamp,
migration_status,
log_path
log_path,
retries,
tablet
FROM %s.schema_migrations
WHERE
migration_uuid=%a
Expand All @@ -161,7 +189,10 @@ const (
started_timestamp,
liveness_timestamp,
completed_timestamp,
migration_status
migration_status,
log_path,
retries,
tablet
FROM %s.schema_migrations
WHERE
migration_status='ready'
Expand Down Expand Up @@ -201,4 +232,10 @@ var (
var applyDDL = []string{
fmt.Sprintf(sqlCreateSidecarDB, "_vt"),
fmt.Sprintf(sqlCreateSchemaMigrationsTable, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableRetries, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTablet, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableArtifacts, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailure, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableTabletFailureIndex, "_vt"),
fmt.Sprintf(alterSchemaMigrationsTableProgress, "_vt"),
}
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
}
return tsv.sm.Target().TabletType
}
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, topoServer, tabletTypeFunc)
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tabletTypeFunc)
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tabletTypeFunc, tsv.lagThrottler)

Expand Down Expand Up @@ -1528,7 +1528,8 @@ func (tsv *TabletServer) registerTwopczHandler() {
func (tsv *TabletServer) registerMigrationStatusHandler() {
tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) {
ctx := tabletenv.LocalContext()
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, r.URL.Query().Get("uuid"), r.URL.Query().Get("status"), r.URL.Query().Get("dryrun")); err != nil {
query := r.URL.Query()
if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress")); err != nil {
http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError)
return
}
Expand Down
Loading

0 comments on commit c02c53a

Please sign in to comment.