Skip to content

Commit

Permalink
Address review comments: replace -start_stopped by -auto_start
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 9, 2021
1 parent 52c965f commit b50442d
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 45 deletions.
14 changes: 7 additions & 7 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1884,7 +1884,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets")

startStopped := subFlags.Bool("start_stopped", false, "Do not start streams: streams start off in the Stopped state")
autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
Expand All @@ -1900,7 +1900,7 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
source := strings.Split(subFlags.Arg(1), ",")
target := strings.Split(subFlags.Arg(2), ",")
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells,
*tabletTypes, *startStopped, *stopAfterCopy)
*tabletTypes, *autoStart, *stopAfterCopy)
}

func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand All @@ -1916,7 +1916,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")

startStopped := subFlags.Bool("start_stopped", false, "Do not start streams: streams start off in the Stopped state")
autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
Expand All @@ -1942,7 +1942,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
target := subFlags.Arg(1)
tableSpecs := subFlags.Arg(2)
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables,
*excludes, *startStopped, *stopAfterCopy)
*excludes, *autoStart, *stopAfterCopy)
}

// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
Expand All @@ -1969,7 +1969,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")

startStopped := subFlags.Bool("start_stopped", false, "Do not start streams: streams start off in the Stopped state")
autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

// MoveTables-only params
Expand Down Expand Up @@ -2009,7 +2009,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
TargetKeyspace: target,
Workflow: workflow,
DryRun: *dryRun,
StartStopped: *startStopped,
AutoStart: *autoStart,
StopAfterCopy: *stopAfterCopy,
}

Expand Down Expand Up @@ -2170,7 +2170,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if err != nil {
return err
}
if *startStopped {
if !*autoStart {
wr.Logger().Printf("Workflow has been created in Stopped state\n")
break
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (

// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypes string, allTables bool, excludeTables string, startStopped, stopAfterCopy bool) error {
cell, tabletTypes string, allTables bool, excludeTables string, autoRun, stopAfterCopy bool) error {
//FIXME validate tableSpecs, allTables, excludeTables
var tables []string
var err error
Expand Down Expand Up @@ -223,10 +223,10 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
workflow, targetKeyspace)
return fmt.Errorf(msg)
}
if !startStopped {
if autoRun {
return mz.startStreams(ctx)
}
wr.Logger().Infof("Streams will not be started since -start_stopped is specified")
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")

return nil
}
Expand Down
16 changes: 8 additions & 8 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestMigrateTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", false, false)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -103,11 +103,11 @@ func TestMissingTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", false, false)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", false, false)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", false, false)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, false, false)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false)
require.NoError(t, err)
require.EqualValues(t, tcase.want, targetTables(env))
})
Expand Down Expand Up @@ -195,9 +195,9 @@ func TestMoveTablesStopFlags(t *testing.T) {

env.tmc.expectVRQuery(200, insert, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
// -start_stopped is tested by NOT expecting the update query which sets state to RUNNING
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", true, true)
"", false, "", false, true)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
Expand All @@ -223,7 +223,7 @@ func TestMigrateVSchema(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", false, false)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type refStream struct {

// Reshard initiates a resharding workflow.
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string,
skipSchemaCopy bool, cell, tabletTypes string, startStopped, stopAfterCopy bool) error {
skipSchemaCopy bool, cell, tabletTypes string, autoRun, stopAfterCopy bool) error {
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}
Expand All @@ -81,12 +81,12 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
return vterrors.Wrap(err, "createStreams")
}

if !startStopped {
if autoRun {
if err := rs.startStreams(ctx); err != nil {
return vterrors.Wrap(err, "startStreams")
}
} else {
wr.Logger().Infof("Streams will not be started since -start_stopped is specified")
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")
}
return nil
}
Expand Down

0 comments on commit b50442d

Please sign in to comment.