Skip to content

Commit

Permalink
VReplication: Improve workflow cancel/delete (#15977)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 12, 2024
1 parent 8cf7f31 commit 6d8bb74
Show file tree
Hide file tree
Showing 13 changed files with 1,452 additions and 227 deletions.
10 changes: 8 additions & 2 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type VttabletProcess struct {
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbName string
DbPassword string
DbPort int
DbFlavor string
Expand Down Expand Up @@ -148,6 +149,8 @@ func (vttablet *VttabletProcess) Setup() (err error) {
return
}

vttablet.DbName = "vt_" + vttablet.Keyspace

vttablet.exit = make(chan error)
go func() {
if vttablet.proc != nil {
Expand Down Expand Up @@ -442,8 +445,11 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro

// CreateDB creates the database for keyspace
func (vttablet *VttabletProcess) CreateDB(keyspace string) error {
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS vt_%s", keyspace), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace), keyspace, false)
if vttablet.DbName == "" {
vttablet.DbName = "vt_" + keyspace
}
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS %s", vttablet.DbName), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS %s", vttablet.DbName), keyspace, false)
return err
}

Expand Down
39 changes: 15 additions & 24 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout ti
select {
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v",
query, timeout, qr.Rows))
query, timeout, qr))
case <-ticker.C:
log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick)
}
Expand Down Expand Up @@ -147,19 +147,6 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri
return qr
}

func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result {
if strings.TrimSpace(query) == "" {
return nil
}
if database != "" {
execQuery(t, conn, "use `"+database+"`;")
}
execQuery(t, conn, "begin")
qr := execQueryWithRetry(t, conn, query, timeout)
execQuery(t, conn, "commit")
return qr
}

func checkHealth(t *testing.T, url string) bool {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -516,20 +503,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
require.NotEmpty(t, output)
gotDryRun := strings.Split(output, "\n")
require.True(t, len(gotDryRun) > 3)
startRow := 3
if strings.Contains(gotDryRun[0], "deprecated") {
var startRow int
if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient
startRow = 3
} else if strings.Contains(gotDryRun[0], "deprecated") {
startRow = 4
} else {
startRow = 2
}
gotDryRun = gotDryRun[startRow : len(gotDryRun)-1]
if len(want) != len(gotDryRun) {
t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
}
var match, fail bool
fail = false
for i, w := range want {
w = strings.TrimSpace(w)
g := strings.TrimSpace(gotDryRun[i])
if w[0] == '/' {
if len(w) > 0 && w[0] == '/' {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
Expand All @@ -538,11 +529,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

Expand Down Expand Up @@ -578,7 +569,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
t.Fatalf("%v %v", err, output)
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
Expand All @@ -602,8 +593,8 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
// confirmAllStreamsRunning confirms that all of the workflow's streams are
// in the running state.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
Expand Down Expand Up @@ -801,7 +792,7 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool

func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
query := fmt.Sprintf("select count(*) from %s", table)
qr := execVtgateQuery(t, vtgateConn, "", query)
qr := execQuery(t, vtgateConn, query)
numRows, _ := qr.Rows[0][0].ToInt()
return numRows
}
Expand Down
51 changes: 32 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")

expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
Expand Down Expand Up @@ -605,7 +606,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)

// Add cell alias containing only zone2
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
require.NoError(t, err, "command failed with output: %v", result)

verifyClusterHealth(t, vc)
Expand Down Expand Up @@ -722,10 +723,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
require.NoError(t, err)
defer dbc.Close()
if res := execQuery(t, dbc, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, dbc, tablet.DbName, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
dec80Replicated = true
}
}
Expand Down Expand Up @@ -833,7 +837,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, workflowType, ksWorkflow, true)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
Expand Down Expand Up @@ -942,7 +946,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
var err error

for _, shard := range strings.Split("-80,80-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
if err == nil {
t.Fatal("GetShard merchant:-80 failed")
}
Expand All @@ -951,7 +955,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
Expand Down Expand Up @@ -1400,7 +1404,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
waitDuration := 500 * time.Millisecond
duration := maxWait
for duration > 0 {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "show", "--workflow", workflow)
require.NoError(t, err)
lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")

Expand Down Expand Up @@ -1483,7 +1487,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
}

func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
require.NoError(t, err)
}

Expand All @@ -1494,19 +1498,24 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--tablet-types=rdonly,replica", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--dry-run")
if err == nil {
return
}
Expand All @@ -1532,11 +1541,13 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=rdonly")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica",
command, ksWorkflow)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=replica")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

Expand Down Expand Up @@ -1575,8 +1586,10 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run",
"SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
}
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,28 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for up to 30s",
"Create reverse replication workflow p2c_reverse",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Switch writes completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
"Start reverse replication streams on:",
" tablet 100 ",
"Mark vreplication streams frozen on:",
" Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer",
" Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer",
"Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]",
"Start reverse vreplication streams on: [tablet:100]",
"Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]",
"Unlock keyspace customer",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsSwitchWritesM2m3 = []string{
Expand Down
Loading

0 comments on commit 6d8bb74

Please sign in to comment.