diff --git a/go/vt/sidecardb/schema/vdiff/vdiff.sql b/go/vt/sidecardb/schema/vdiff/vdiff.sql index 5eae9270460..52392bde427 100644 --- a/go/vt/sidecardb/schema/vdiff/vdiff.sql +++ b/go/vt/sidecardb/schema/vdiff/vdiff.sql @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff `started_at` timestamp NULL DEFAULT NULL, `liveness_timestamp` timestamp NULL DEFAULT NULL, `completed_at` timestamp NULL DEFAULT NULL, - `last_error` varbinary(512) DEFAULT NULL, + `last_error` varbinary(1024) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uuid_idx` (`vdiff_uuid`), KEY `state` (`state`), diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index cf77502fb32..75b0e37d630 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -85,7 +85,7 @@ func TestEngineOpen(t *testing.T) { ), nil) // Now let's short circuit the vdiff as we know that the open has worked as expected. - shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) + shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient) vdenv.vde.Open(context.Background(), vdiffenv.vre) defer vdenv.vde.Close() @@ -132,7 +132,7 @@ func TestVDiff(t *testing.T) { ), fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName), ), nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) @@ -194,7 +194,7 @@ func TestVDiff(t *testing.T) { vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil) - vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil) vdenv.vde.mu.Lock() @@ -271,7 +271,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) { ), nil) // At this point we know that we kicked off the expected retry so we can short circit the vdiff. - shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) + shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient) expectedControllerCnt++ } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 9f69e9ed86d..d5e8c134814 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -197,7 +197,7 @@ func resetBinlogClient() { // has verified the necessary behavior. func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) { dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test")) - dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil) + dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil) dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index f9f48cc72e9..72da9f15ada 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -38,8 +38,9 @@ const ( IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where vd.id = %a` - // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1` - sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d" + // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`. + // It also truncates the error if needed to ensure that we can save the state when the error text is very long. + sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = '' where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'` sqlGetVReplicationEntry = "select * from _vt.vreplication %s" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e65a0bad253..d3761436285 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -34,11 +34,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -46,6 +41,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // how long to wait for background operations to complete @@ -72,6 +73,12 @@ type tableDiffer struct { sourceQuery string table *tabletmanagerdatapb.TableDefinition lastPK *querypb.QueryResult + + // wgShardStreamers is used, with a cancellable context, to wait for all shard streamers + // to finish after each diff is complete. + wgShardStreamers sync.WaitGroup + shardStreamsCtx context.Context + shardStreamsCancel context.CancelFunc } func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer { @@ -121,19 +128,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() + td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) + if err := td.selectTablets(ctx); err != nil { return err } if err := td.syncSourceStreams(ctx); err != nil { return err } - if err := td.startSourceDataStreams(ctx); err != nil { + if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { return err } if err := td.syncTargetStreams(ctx); err != nil { return err } - if err := td.startTargetDataStream(ctx); err != nil { + if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { return err } td.setupRowSorters() @@ -203,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { var ( wg sync.WaitGroup sourceErr, targetErr error - targetTablet *topodata.Tablet + targetTablet *topodatapb.Tablet ) // The cells from the vdiff record are a comma separated list. @@ -254,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } -func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) { tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err @@ -354,10 +363,12 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) { log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query) + td.wgShardStreamers.Add(1) defer func() { log.Infof("streamOneShard End on %s", participant.tablet.Alias.String()) close(participant.result) close(gtidch) + td.wgShardStreamers.Done() }() participant.err = func() error { conn, err := tabletconn.GetDialer()(participant.tablet, false) diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index e27d421d398..d7d2583a5d3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -134,6 +134,14 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa } func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error { + defer func() { + if td.shardStreamsCancel != nil { + td.shardStreamsCancel() + } + // Wait for all the shard streams to finish before returning. + td.wgShardStreamers.Wait() + }() + select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")