Skip to content

Commit

Permalink
Online DDL: support migration cut-over backoff and forced cut-over (v…
Browse files Browse the repository at this point in the history
…itessio#14546)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
2 people authored and ejortegau committed Dec 13, 2023
1 parent 764d70c commit bebddba
Show file tree
Hide file tree
Showing 33 changed files with 12,832 additions and 10,484 deletions.
33 changes: 33 additions & 0 deletions go/cmd/vtctldclient/command/onlineddl.go
Expand Up @@ -102,6 +102,14 @@ var (
Args: cobra.ExactArgs(2),
RunE: commandOnlineDDLUnthrottle,
}
OnlineDDLForceCutOver = &cobra.Command{
Use: "force-cutover <keyspace> <uuid|all>",
Short: "Mark a given schema migration, or all pending migrations, for forced cut over.",
Example: "OnlineDDL force-cutover test_keyspace 82fa54ac_e83e_11ea_96b7_f875a4d24e90",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(2),
RunE: commandOnlineDDLForceCutOver,
}
OnlineDDLShow = &cobra.Command{
Use: "show",
Short: "Display information about online DDL operations.",
Expand Down Expand Up @@ -184,6 +192,30 @@ func commandOnlineDDLCleanup(cmd *cobra.Command, args []string) error {
return nil
}

func commandOnlineDDLForceCutOver(cmd *cobra.Command, args []string) error {
keyspace, uuid, err := analyzeOnlineDDLCommandWithUuidOrAllArgument(cmd)
if err != nil {
return err
}
cli.FinishedParsing(cmd)

resp, err := client.ForceCutOverSchemaMigration(commandCtx, &vtctldatapb.ForceCutOverSchemaMigrationRequest{
Keyspace: keyspace,
Uuid: uuid,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)
return nil
}

func commandOnlineDDLComplete(cmd *cobra.Command, args []string) error {
keyspace, uuid, err := analyzeOnlineDDLCommandWithUuidOrAllArgument(cmd)
if err != nil {
Expand Down Expand Up @@ -393,6 +425,7 @@ func init() {
OnlineDDL.AddCommand(OnlineDDLRetry)
OnlineDDL.AddCommand(OnlineDDLThrottle)
OnlineDDL.AddCommand(OnlineDDLUnthrottle)
OnlineDDL.AddCommand(OnlineDDLForceCutOver)

OnlineDDLShow.Flags().BoolVar(&onlineDDLShowArgs.JSON, "json", false, "Output JSON instead of human-readable table.")
OnlineDDLShow.Flags().StringVar(&onlineDDLShowArgs.OrderStr, "order", "asc", "Sort the results by `id` property of the Schema migration.")
Expand Down
1 change: 1 addition & 0 deletions go/mysql/flavor.go
Expand Up @@ -56,6 +56,7 @@ const (
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability
)

const (
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/flavor_mysql.go
Expand Up @@ -423,6 +423,8 @@ func (mysqlFlavor80) supportsCapability(serverVersion string, capability FlavorC
return ServerVersionAtLeast(serverVersion, 8, 0, 21)
case CheckConstraintsCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 16)
case PerformanceSchemaDataLocksTableCapability:
return true, nil
default:
return false, nil
}
Expand Down
10 changes: 10 additions & 0 deletions go/mysql/flavor_test.go
Expand Up @@ -170,6 +170,16 @@ func TestGetFlavor(t *testing.T) {
capability: CheckConstraintsCapability,
isCapable: true,
},
{
version: "5.7.38",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.20",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: true,
},
}
for _, tc := range testcases {
name := fmt.Sprintf("%s %v", tc.version, tc.capability)
Expand Down
21 changes: 11 additions & 10 deletions go/test/endtoend/cluster/vttablet_process.go
Expand Up @@ -449,11 +449,7 @@ func (vttablet *VttabletProcess) CreateDB(keyspace string) error {

// QueryTablet lets you execute a query in this tablet and get the result
func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useDb bool) (*sqltypes.Result, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
conn, err := vttablet.conn(&dbParams)
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return nil, err
}
Expand All @@ -464,11 +460,7 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
// QueryTabletMultiple lets you execute multiple queries -- without any
// results -- against the tablet.
func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
conn, err := vttablet.conn(&dbParams)
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return err
}
Expand All @@ -484,6 +476,15 @@ func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace
return nil
}

// TabletConn opens a MySQL connection on this tablet
func (vttablet *VttabletProcess) TabletConn(keyspace string, useDb bool) (*mysql.Conn, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return vttablet.conn(&dbParams)
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
157 changes: 157 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Expand Up @@ -200,6 +200,29 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) {
}
}

func waitForMessage(t *testing.T, uuid string, messageSubstring string) {
ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row.AsString("message", "")
if strings.Contains(message, messageSubstring) {
return
}
}
select {
case <-ticker.C:
case <-ctx.Done():
}
require.NoError(t, ctx.Err())
}
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -366,6 +389,9 @@ func testScheduler(t *testing.T) {
alterNonexistent = `
ALTER TABLE nonexistent FORCE
`
populateT1Statement = `
insert into t1_test values (1, 'new_row')
`
)

testReadTimestamp := func(t *testing.T, uuid string, timestampColumn string) (timestamp string) {
Expand Down Expand Up @@ -490,6 +516,109 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
})

t.Run("Postpone completion ALTER", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
t.Run("check postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(1), postponeCompletion)
}
})
t.Run("complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("check no postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(0), postponeCompletion)
}
})
})

forceCutoverCapable, err := capableOf(mysql.PerformanceSchemaDataLocksTableCapability) // 8.0
require.NoError(t, err)
if forceCutoverCapable {
t.Run("force_cutover", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*2)
defer cancel()

t.Run("populate t1_test", func(t *testing.T) {
onlineddl.VtgateExecQuery(t, &vtParams, populateT1Statement, "")
})
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
commitTransactionChan := make(chan any)
transactionErrorChan := make(chan error)
t.Run("locking table rows", func(t *testing.T) {
go runInTransaction(t, ctx, shards[0].Vttablets[0], "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
})
t.Run("check no force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(0), forceCutOver) // disabled
}
})
t.Run("attempt to complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
})
t.Run("cut-over fail due to timeout", func(t *testing.T) {
waitForMessage(t, t1uuid, "due to context deadline exceeded")
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
})
t.Run("force_cutover", func(t *testing.T) {
onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true)
})
t.Run("check force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(1), forceCutOver) // enabled
}
})
t.Run("expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("expect transaction failure", func(t *testing.T) {
select {
case commitTransactionChan <- true: //good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
// Transaction will now attempt to commit. But we expect our "force_cutover" to have terminated
// the transaction's connection.
select {
case err := <-transactionErrorChan:
assert.ErrorContains(t, err, "broken pipe")
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
})
})
}
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
Expand Down Expand Up @@ -2400,3 +2529,31 @@ func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName s
statement = queryResult.Rows[0][1].ToString()
return statement
}

func runInTransaction(t *testing.T, ctx context.Context, tablet *cluster.Vttablet, query string, commitTransactionChan chan any, transactionErrorChan chan error) error {
conn, err := tablet.VttabletProcess.TabletConn(keyspaceName, true)
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch("begin", 0, false)
require.NoError(t, err)

_, err = conn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

if commitTransactionChan != nil {
// Wait for instruction to commit
select {
case <-commitTransactionChan:
// good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
}

_, err = conn.ExecuteFetch("commit", 0, false)
if transactionErrorChan != nil {
transactionErrorChan <- err
}
return err
}
15 changes: 15 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Expand Up @@ -206,6 +206,21 @@ func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}
}

// CheckForceMigrationCutOver marks a migration for forced cut-over, and expects success by counting affected rows.
func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectPossible bool) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a force_cutover",
sqltypes.StringBindVariable(uuid),
)
require.NoError(t, err)
r := VtgateExecQuery(t, vtParams, query, "")

if expectPossible {
assert.Equal(t, len(shards), int(r.RowsAffected))
} else {
assert.Equal(t, int(0), int(r.RowsAffected))
}
}

// CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool {
query, err := sqlparser.ParseAndBind("show vitess_migrations like %a",
Expand Down

0 comments on commit bebddba

Please sign in to comment.