Skip to content

Commit

Permalink
VReplication: Improve MoveTables Create Error Handling (#13737)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 21, 2023
1 parent a43e283 commit 221e92d
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 81 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st

// WaitForAuthoritative waits for a table to become authoritative
func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error {
timeout := time.After(10 * time.Second)
timeout := time.After(60 * time.Second)
for {
select {
case <-timeout:
Expand Down
5 changes: 0 additions & 5 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr
if err != nil {
return err
}
if mz.isPartial {
if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil {
return err
}
}
if err := mz.deploySchema(); err != nil {
return err
}
Expand Down
102 changes: 68 additions & 34 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/sqlerror"

Expand Down Expand Up @@ -947,7 +948,7 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI
// MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface.
// It passes the embedded TabletRequest object to the given keyspace's
// target primary tablets that will be executing the workflow.
func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error) {
func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (res *vtctldatapb.WorkflowStatusResponse, err error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesCreate")
defer span.Finish()

Expand All @@ -964,7 +965,6 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
tables = req.IncludeTables
externalTopo *topo.Server
sourceTopo *topo.Server = s.ts
err error
)

// When the source is an external cluster mounted using the Mount command.
Expand All @@ -978,6 +978,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}

var vschema *vschemapb.Keyspace
var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create
vschema, err = s.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1020,43 +1021,14 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace)
if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return nil, err
}
}
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
ms := &vtctldatapb.MaterializeSettings{
Workflow: req.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES,
Expand Down Expand Up @@ -1101,6 +1073,68 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}

// If we get an error after this point, where the vreplication streams/records
// have been created, then we clean up the workflow's artifacts.
defer func() {
if err != nil {
ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow)
if cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
return
}
if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil {
err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr)
}
}
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing
// rules fails to save, we may generate duplicate table errors.
if mz.isPartial {
if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil {
return nil, err
}
}

rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}

if ms.SourceTimeZone != "" {
if err := mz.checkTZConversion(ctx, ms.SourceTimeZone); err != nil {
return nil, err
Expand Down
186 changes: 185 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"runtime/debug"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -30,6 +31,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/vindexes"

Expand Down Expand Up @@ -64,6 +66,8 @@ const (
stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1"
getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`"
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
)

var (
Expand Down Expand Up @@ -313,7 +317,7 @@ func TestMoveTables(t *testing.T) {
),
fmt.Sprintf("1|%s", bls),
), nil)
ftc.vrdbClient.ExpectRequest(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:200' where id=1`, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult(
Expand Down Expand Up @@ -568,3 +572,183 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
})
}
}

// TestFailedMoveTablesCreateCleanup tests that the workflow
// and its artifacts are cleaned up when the workflow creation
// fails -- specifically after the point where we have created
// the workflow streams.
func TestFailedMoveTablesCreateCleanup(t *testing.T) {
ctx := context.Background()
sourceKs := "sourceks"
sourceTabletUID := 200
shard := "0"
targetTabletUID := 300
targetKs := "targetks"
wf := "testwf"
table := defaultSchema.TableDefinitions[0].Name
invalidTimeZone := "NOPE"
bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}}",
sourceKs, shard, table, table)
tenv := newTestEnv(t, sourceKs, []string{shard})
defer tenv.close()
ws := workflow.NewServer(tenv.ts, tenv.tmc)

sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard)
defer tenv.deleteTablet(sourceTablet.tablet)
targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard)
defer tenv.deleteTablet(targetTablet.tablet)

tenv.mysqld.Schema = defaultSchema
tenv.mysqld.Schema.DatabaseSchema = tenv.dbName
tenv.mysqld.FetchSuperQueryMap = make(map[string]*sqltypes.Result)
tenv.mysqld.FetchSuperQueryMap[`select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where .*`] = sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"character_set_name|collation_name|column_name|data_type|column_type|extra",
"varchar|varchar|varchar|varchar|varchar|varchar",
),
"NULL|NULL|id|bigint|bigint|",
"NULL|NULL|c2|bigint|bigint|",
)

// Let's be sure that the routing rules are empty to start.
err := topotools.SaveRoutingRules(ctx, tenv.ts, nil)
require.NoError(t, err, "failed to save routing rules")

tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
),
)
targetTablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(
fmt.Sprintf("%s %s",
insertVReplicationPrefix,
fmt.Sprintf(`values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}} source_time_zone:\"%s\" target_time_zone:\"UTC\"', '', 0, 0, '%s', 'primary', now(), 0, 'Stopped', '%s', 1, 0, 0)`,
wf, sourceKs, shard, table, table, invalidTimeZone, strings.Join(tenv.cells, ","), tenv.dbName),
),
&sqltypes.Result{
RowsAffected: 1,
InsertID: 1,
},
nil,
)
targetTablet.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getVReplicationRecord,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source",
"int64|varchar",
),
fmt.Sprintf("1|%s", bls),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID),
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
"varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64",
),
fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"count(distinct table_name)",
"int64",
),
"1",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
"varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64",
),
fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"count(distinct table_name)",
"int64",
),
"1",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"@@binlog_row_image",
"varchar",
),
"FULL",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil)

tenv.tmc.setVReplicationExecResults(targetTablet.tablet,
fmt.Sprintf("select convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
fmt.Sprintf("convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone),
"datetime",
),
"NULL",
),
)

// We expect the workflow creation to fail due to the invalid time
// zone and thus the workflow iteslf to be cleaned up.
tenv.tmc.setVReplicationExecResults(sourceTablet.tablet,
fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)),
&sqltypes.Result{RowsAffected: 1},
)
tenv.tmc.setVReplicationExecResults(targetTablet.tablet,
fmt.Sprintf(deleteWorkflow, targetKs, wf),
&sqltypes.Result{RowsAffected: 1},
)

// Save the current target vschema.
vs, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")

_, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: wf,
SourceKeyspace: sourceKs,
TargetKeyspace: targetKs,
Cells: tenv.cells,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY},
IncludeTables: []string{table},
SourceTimeZone: invalidTimeZone,
})
require.ErrorContains(t, err, fmt.Sprintf("unable to perform time_zone conversions from %s to UTC", invalidTimeZone))

// Check that there are no orphaned routing rules.
rules, err := topotools.GetRoutingRules(ctx, tenv.ts)
require.NoError(t, err, "failed to get routing rules")
require.Equal(t, 0, len(rules), "expected no routing rules to be present")

// Check that our vschema changes were also rolled back.
vs2, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")
require.Equal(t, vs, vs2, "expected vschema to be unchanged")
}
Loading

0 comments on commit 221e92d

Please sign in to comment.