Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveTables/Reshard: add flags to start workflows in a stopped state and to stop workflow once copy is completed #7449

Merged
merged 6 commits into from
Feb 11, 2021
Merged
25 changes: 23 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,10 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1895,7 +1899,8 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
}
source := strings.Split(subFlags.Arg(1), ",")
target := strings.Split(subFlags.Arg(2), ",")
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells, *tabletTypes)
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells,
*tabletTypes, *autoStart, *stopAfterCopy)
}

func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand All @@ -1911,6 +1916,9 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1933,7 +1941,8 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
source := subFlags.Arg(0)
target := subFlags.Arg(1)
tableSpecs := subFlags.Arg(2)
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables, *excludes)
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables,
*excludes, *autoStart, *stopAfterCopy)
}

// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
Expand All @@ -1960,12 +1969,17 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

// MoveTables-only params
sourceKeyspace := subFlags.String("source", "", "Source keyspace")
tables := subFlags.String("tables", "", "A table spec or a list of tables")
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")
renameTables := subFlags.Bool("rename_tables", false, "Rename tables instead of dropping them")

// Reshard-only params
sourceShards := subFlags.String("source_shards", "", "Source shards")
targetShards := subFlags.String("target_shards", "", "Target shards")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to target shards")
Expand Down Expand Up @@ -1995,6 +2009,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
TargetKeyspace: target,
Workflow: workflow,
DryRun: *dryRun,
AutoStart: *autoStart,
StopAfterCopy: *stopAfterCopy,
}

printDetails := func() error {
Expand Down Expand Up @@ -2154,7 +2170,12 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if err != nil {
return err
}
if !*autoStart {
wr.Logger().Printf("Workflow has been created in Stopped state\n")
break
}
wr.Logger().Printf("Waiting for workflow to start:\n")

type streamCount struct {
total, running int64
}
Expand Down
10 changes: 8 additions & 2 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (

// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypes string, allTables bool, excludeTables string) error {
cell, tabletTypes string, allTables bool, excludeTables string, autoRun, stopAfterCopy bool) error {
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
//FIXME validate tableSpecs, allTables, excludeTables
var tables []string
var err error
Expand Down Expand Up @@ -186,6 +186,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
TargetKeyspace: targetKeyspace,
Cell: cell,
TabletTypes: tabletTypes,
StopAfterCopy: stopAfterCopy,
}
for _, table := range tables {
buf := sqlparser.NewTrackedBuffer(nil)
Expand Down Expand Up @@ -222,7 +223,12 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
workflow, targetKeyspace)
return fmt.Errorf(msg)
}
return mz.startStreams(ctx)
if autoRun {
return mz.startStreams(ctx)
}
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")

return nil
}

func (wr *Wrangler) validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
Expand Down
46 changes: 38 additions & 8 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"sort"
"strings"
"testing"

"context"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -63,7 +62,7 @@ func TestMigrateTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -104,11 +103,11 @@ func TestMissingTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -164,7 +163,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false)
require.NoError(t, err)
require.EqualValues(t, tcase.want, targetTables(env))
})
Expand All @@ -173,6 +172,37 @@ func TestMoveTablesAllAndExclude(t *testing.T) {

}

func TestMoveTablesStopFlags(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}

ctx := context.Background()
var err error
t.Run("StopStartedAndStopAfterCopyFlags", func(t *testing.T) {
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
// insert expects flag stop_after_copy to be true
insert := `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values .*stop_after_copy:true.*`

env.tmc.expectVRQuery(200, insert, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
}

func TestMigrateVSchema(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
Expand All @@ -193,7 +223,7 @@ func TestMigrateVSchema(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down
22 changes: 15 additions & 7 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type resharder struct {
refStreams map[string]*refStream
cell string //single cell or cellsAlias or comma-separated list of cells/cellsAliases
tabletTypes string
stopAfterCopy bool
}

type refStream struct {
Expand All @@ -61,14 +62,16 @@ type refStream struct {
}

// Reshard initiates a resharding workflow.
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool, cell, tabletTypes string) error {
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string,
skipSchemaCopy bool, cell, tabletTypes string, autoRun, stopAfterCopy bool) error {
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}
rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes)
if err != nil {
return vterrors.Wrap(err, "buildResharder")
}
rs.stopAfterCopy = stopAfterCopy
if !skipSchemaCopy {
if err := rs.copySchema(ctx); err != nil {
return vterrors.Wrap(err, "copySchema")
Expand All @@ -77,10 +80,14 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
if err := rs.createStreams(ctx); err != nil {
return vterrors.Wrap(err, "createStreams")
}
if err := rs.startStreams(ctx); err != nil {
return vterrors.Wrap(err, "startStream")
}

if autoRun {
if err := rs.startStreams(ctx); err != nil {
return vterrors.Wrap(err, "startStreams")
}
} else {
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")
}
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -301,9 +308,10 @@ func (rs *resharder) createStreams(ctx context.Context) error {
}),
}
bls := &binlogdatapb.BinlogSource{
Keyspace: rs.keyspace,
Shard: source.ShardName(),
Filter: filter,
Keyspace: rs.keyspace,
Shard: source.ShardName(),
Filter: filter,
StopAfterCopy: rs.stopAfterCopy,
}
ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes)
}
Expand Down