diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 312273e0c84..997ab222255 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "strings" + "sync" "testing" "time" @@ -65,6 +66,7 @@ func TestSchemaVersioning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + wg := sync.WaitGroup{} tsv.EnableHistorian(true) tsv.SetTracking(true) time.Sleep(100 * time.Millisecond) // wait for _vt tables to be created @@ -155,7 +157,9 @@ func TestSchemaVersioning(t *testing.T) { } return nil } + wg.Add(1) go func() { + defer wg.Done() defer close(eventCh) req := &binlogdatapb.VStreamRequest{Target: target, Position: "current", TableLastPKs: nil, Filter: filter} if err := tsv.VStream(ctx, req, send); err != nil { @@ -186,6 +190,7 @@ func TestSchemaVersioning(t *testing.T) { } runCases(ctx, t, cases, eventCh) cancel() + wg.Wait() log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n") ctx, cancel = context.WithCancel(context.Background()) @@ -214,7 +219,9 @@ func TestSchemaVersioning(t *testing.T) { } return nil } + wg.Add(1) go func() { + defer wg.Done() defer close(eventCh) req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter} if err := tsv.VStream(ctx, req, send); err != nil { @@ -257,6 +264,7 @@ func TestSchemaVersioning(t *testing.T) { expectLogs(ctx, t, "Past stream", eventCh, output) cancel() + wg.Wait() log.Infof("\n\n\n=============================================== PAST EVENTS WITHOUT TRACK VERSIONS START HERE ======================\n\n\n") tsv.EnableHistorian(false) @@ -286,7 +294,9 @@ func TestSchemaVersioning(t *testing.T) { } return nil } + wg.Add(1) go func() { + defer wg.Done() defer close(eventCh) req := &binlogdatapb.VStreamRequest{Target: target, Position: startPos, TableLastPKs: nil, Filter: filter} if err := tsv.VStream(ctx, req, send); err != nil { @@ -331,6 +341,7 @@ func TestSchemaVersioning(t *testing.T) { expectLogs(ctx, t, "Past stream", eventCh, output) cancel() + wg.Wait() client := framework.NewClient() client.Execute("drop table vitess_version", nil)