Skip to content

Commit

Permalink
Merge pull request #5456 from planetscale/ss-vrepl-workflow-name
Browse files Browse the repository at this point in the history
vrepl: use <keyspace.workflow>
  • Loading branch information
sougou committed Nov 24, 2019
2 parents 3b31583 + 771220e commit 7983603
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ var commands = []commandGroup{
"<from_keyspace> <to_keyspace> <tables>",
"Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"},
{"VDiff", commandVDiff,
"-workflow=<workflow> <target keyspace> [-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=REPLICA] [-filtered_replication_wait_time=30s]",
"[-source_cell=<cell>] [-target_cell=<cell>] [-tablet_types=replica] [-filtered_replication_wait_time=30s] <keyspace.workflow>",
"Perform a diff of all tables in the workflow"},
{"MigrateServedTypes", commandMigrateServedTypes,
"[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] <keyspace/shard> <served tablet type>",
Expand All @@ -323,10 +323,10 @@ var commands = []commandGroup{
"[-cells=c1,c2,...] [-reverse] <destination keyspace/shard> <served tablet type>",
"Makes the <destination keyspace/shard> serve the given type. This command also rebuilds the serving graph."},
{"MigrateReads", commandMigrateReads,
"[-cells=c1,c2,...] [-reverse] -workflow=workflow <target keyspace> <tablet type>",
"[-cells=c1,c2,...] [-reverse] -tablet_type={replica|rdonly} <keyspace.workflow>",
"Migrate read traffic for the specified workflow."},
{"MigrateWrites", commandMigrateWrites,
"[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] -workflow=workflow <target keyspace>",
"[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] <keyspace.workflow>",
"Migrate write traffic for the specified workflow."},
{"CancelResharding", commandCancelResharding,
"<keyspace/shard>",
Expand Down Expand Up @@ -1811,24 +1811,35 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl
}

func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
sourceCell := subFlags.String("source_cell", "", "The source cell to compare from")
targetCell := subFlags.String("target_cell", "", "The target cell to compare with")
tabletTypes := subFlags.String("tablet_types", "", "Tablet types for source and target")
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
if err := subFlags.Parse(args); err != nil {
return err
}

if subFlags.NArg() != 1 {
return fmt.Errorf("the <target keyspace> is required")
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}

targetKeyspace := subFlags.Arg(0)
_, err := wr.VDiff(ctx, targetKeyspace, *workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime,
_, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime,
*HealthCheckTopologyRefresh, *HealthcheckRetryDelay, *HealthCheckTimeout)
return err
}

func splitKeyspaceWorkflow(in string) (keyspace, workflow string, err error) {
splits := strings.Split(in, ".")
if len(splits) != 2 {
return "", "", fmt.Errorf("invalid format for <keyspace.workflow>: %s", in)
}
return splits[0], splits[1], nil
}

func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
Expand Down Expand Up @@ -1889,16 +1900,15 @@ func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFla
func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.")
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
tabletType := subFlags.String("tablet_type", "", "Tablet type (replica or rdonly)")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("the <target keyspace> and <tablet type> arguments are required for the MigrateReads command")
}

keyspace := subFlags.Arg(0)
servedType, err := parseTabletType(subFlags.Arg(2), []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY})
if *tabletType == "" {
return fmt.Errorf("-tablet_type must be specified")
}
servedType, err := parseTabletType(*tabletType, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY})
if err != nil {
return err
}
Expand All @@ -1910,29 +1920,34 @@ func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *f
if *reverse {
direction = wrangler.DirectionBackward
}
if *workflow == "" {
return fmt.Errorf("a -workflow=workflow argument is required")
if subFlags.NArg() != 1 {
return fmt.Errorf("<keyspace.workflow> is required")
}
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}
return wr.MigrateReads(ctx, keyspace, *workflow, servedType, cells, direction)

return wr.MigrateReads(ctx, keyspace, workflow, servedType, cells, direction)
}

func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
cancelMigrate := subFlags.Bool("cancel", false, "Cancel the failed migration and serve from source")
workflow := subFlags.String("workflow", "", "Specifies the workflow name")
if err := subFlags.Parse(args); err != nil {
return err
}

if subFlags.NArg() != 1 {
return fmt.Errorf("the <target keyspace> argument is required for the MigrateWrites command")
return fmt.Errorf("<keyspace.workflow> is required")
}

keyspace := subFlags.Arg(0)
if *workflow == "" {
return fmt.Errorf("a -workflow=workflow argument is required")
keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
return err
}
journalID, err := wr.MigrateWrites(ctx, keyspace, *workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication)

journalID, err := wr.MigrateWrites(ctx, keyspace, workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication)
if err != nil {
return err
}
Expand Down

0 comments on commit 7983603

Please sign in to comment.