From e00a6f614f9a86ac9a90fb0d035fdd1e6ba99b81 Mon Sep 17 00:00:00 2001 From: deepthi Date: Wed, 1 Jul 2020 16:41:00 -0700 Subject: [PATCH 1/9] failing unit test for multi cell Signed-off-by: deepthi --- go/vt/discovery/tablet_picker_test.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 5ff24df64c7..501035ddbda 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -48,6 +48,25 @@ func TestPickSimple(t *testing.T) { } } +func TestPickFromOtherCell(t *testing.T) { + te := newPickerTestEnv(t) + te.cell = "otherCell" + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + defer deleteTablet(te, want) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, "cell,otherCell", te.keyspace, te.shard, "replica", 1*time.Second, 1*time.Second, 1*time.Minute) + require.NoError(t, err) + defer tp.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if !proto.Equal(want, tablet) { + t.Errorf("Pick: %v, want %v", tablet, want) + } +} + func TestPickFromTwoHealthy(t *testing.T) { te := newPickerTestEnv(t) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) @@ -127,7 +146,7 @@ func newPickerTestEnv(t *testing.T) *pickerTestEnv { keyspace: "ks", shard: "0", cell: "cell", - topoServ: memorytopo.NewServer("cell"), + topoServ: memorytopo.NewServer("cell", "otherCell"), } err := te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{}) require.NoError(t, err) From b0e1c8c6b7f06aa0b0dfb3796035e08b5035d8b3 Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 6 Jul 2020 15:54:02 -0700 Subject: [PATCH 2/9] implement multi cell tablet picker Signed-off-by: deepthi --- go/vt/discovery/healthcheck_test.go | 11 +++ .../legacy_healthcheck_flaky_test.go | 12 --- go/vt/discovery/tablet_picker.go | 87 ++++++++++++------- go/vt/discovery/tablet_picker_test.go | 37 ++------ .../tabletmanager/vreplication/controller.go | 14 ++- go/vt/wrangler/vdiff.go | 6 +- 6 files changed, 79 insertions(+), 88 deletions(-) diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 71f890254b6..c01df132f6b 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -862,6 +862,17 @@ func checkErrorCounter(keyspace, shard string, tabletType topodatapb.TabletType, return nil } +func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.StreamHealthResponse) *fakeConn { + key := TabletToMapKey(tablet) + conn := &fakeConn{ + QueryService: fakes.ErrorQueryService, + tablet: tablet, + fixedResult: fixedResult, + } + connMap[key] = conn + return conn +} + var mustMatch = utils.MustMatchFn( []interface{}{ // types with unexported fields TabletHealth{}, diff --git a/go/vt/discovery/legacy_healthcheck_flaky_test.go b/go/vt/discovery/legacy_healthcheck_flaky_test.go index 7f4beebef80..733b8392fcc 100644 --- a/go/vt/discovery/legacy_healthcheck_flaky_test.go +++ b/go/vt/discovery/legacy_healthcheck_flaky_test.go @@ -33,7 +33,6 @@ import ( "vitess.io/vitess/go/vt/status" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconn" querypb "vitess.io/vitess/go/vt/proto/query" @@ -660,17 +659,6 @@ func (l *listener) StatsUpdate(ts *LegacyTabletStats) { l.output <- ts } -func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.StreamHealthResponse) *fakeConn { - key := TabletToMapKey(tablet) - conn := &fakeConn{ - QueryService: fakes.ErrorQueryService, - tablet: tablet, - fixedResult: fixedResult, - } - connMap[key] = conn - return conn -} - func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { key := TabletToMapKey(tablet) if qs, ok := connMap[key]; ok { diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 2bda68fec42..8e5c86c6d54 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -19,8 +19,15 @@ package discovery import ( "fmt" "math/rand" + "strings" "time" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + + "vitess.io/vitess/go/vt/vttablet/tabletconn" + + "vitess.io/vitess/go/vt/log" + "golang.org/x/net/context" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" @@ -31,64 +38,80 @@ import ( // TabletPicker gives a simplified API for picking tablets. type TabletPicker struct { ts *topo.Server - cell string + cells []string keyspace string shard string tabletTypes []topodatapb.TabletType - - healthCheck LegacyHealthCheck - watcher *LegacyTopologyWatcher - statsCache *LegacyTabletStatsCache } // NewTabletPicker returns a TabletPicker. -func NewTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypesStr string, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout time.Duration) (*TabletPicker, error) { +func NewTabletPicker(ts *topo.Server, cell, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr) if err != nil { return nil, fmt.Errorf("failed to parse list of tablet types: %v", tabletTypesStr) } - // These have to be initialized in the following sequence (watcher must be last). - healthCheck := NewLegacyHealthCheck(healthcheckRetryDelay, healthcheckTimeout) - statsCache := NewLegacyTabletStatsCache(healthCheck, ts, cell) - watcher := NewLegacyShardReplicationWatcher(ctx, ts, healthCheck, cell, keyspace, shard, healthcheckTopologyRefresh, DefaultTopoReadConcurrency) + cells := strings.Split(cell, ",") return &TabletPicker{ ts: ts, - cell: cell, + cells: cells, keyspace: keyspace, shard: shard, tabletTypes: tabletTypes, - healthCheck: healthCheck, - watcher: watcher, - statsCache: statsCache, }, nil } // PickForStreaming picks all healthy tablets including the non-serving ones. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - // wait for any of required the tablets (useful for the first run at least, fast for next runs) - if err := tp.statsCache.WaitByFilter(ctx, tp.keyspace, tp.shard, tp.tabletTypes, RemoveUnhealthyTablets); err != nil { - return nil, vterrors.Wrapf(err, "error waiting for tablets for %v %v %v", tp.cell, tp.keyspace, tp.shard) + // TODO: parse tp.cell and call this for one cell at a time? + candidates := tp.getAllTablets(ctx) + if len(candidates) == 0 { + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for %v %v %v", tp.cells, tp.keyspace, tp.shard) } - - // Refilter the tablets list based on the same criteria. - var addrs []LegacyTabletStats - for _, tabletType := range tp.tabletTypes { - list := RemoveUnhealthyTablets(tp.statsCache.GetTabletStats(tp.keyspace, tp.shard, tabletType)) - addrs = append(addrs, list...) - } - if len(addrs) > 0 { - return addrs[rand.Intn(len(addrs))].Tablet, nil + for { + idx := rand.Intn(len(candidates)) + alias := candidates[idx] + // get tablet + ti, err := tp.ts.GetTablet(ctx, alias) + if err != nil { + log.Warningf("unable to get tablet for alias %v", alias) + candidates = append(candidates[:idx], candidates[idx+1:]...) + if len(candidates) == 0 { + break + } + continue + } + // try to connect to tablet + conn, err := tabletconn.GetDialer()(ti.Tablet, true) + if err != nil { + log.Warningf("unable to connect to tablet for alias %v", alias) + candidates = append(candidates[:idx], candidates[idx+1:]...) + if len(candidates) == 0 { + break + } + continue + } + _ = conn.Close(ctx) + return ti.Tablet, nil } - // Unreachable. - return nil, fmt.Errorf("can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes) } -// Close shuts down TabletPicker. -func (tp *TabletPicker) Close() { - tp.watcher.Stop() - tp.healthCheck.Close() +func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { + result := make([]*topodatapb.TabletAlias, 0) + for _, cell := range tp.cells { + sri, err := tp.ts.GetShardReplication(ctx, cell, tp.keyspace, tp.shard) + if err != nil { + log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard) + continue + } + + for _, node := range sri.Nodes { + result = append(result, node.TabletAlias) + } + } + return result } func init() { diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 501035ddbda..f7042859e43 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -37,9 +37,8 @@ func TestPickSimple(t *testing.T) { want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) defer deleteTablet(te, want) - tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica", 1*time.Second, 1*time.Second, 1*time.Minute) + tp, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica") require.NoError(t, err) - defer tp.Close() tablet, err := tp.PickForStreaming(context.Background()) require.NoError(t, err) @@ -54,9 +53,8 @@ func TestPickFromOtherCell(t *testing.T) { want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) defer deleteTablet(te, want) - tp, err := NewTabletPicker(context.Background(), te.topoServ, "cell,otherCell", te.keyspace, te.shard, "replica", 1*time.Second, 1*time.Second, 1*time.Minute) + tp, err := NewTabletPicker(te.topoServ, "cell,otherCell", te.keyspace, te.shard, "replica") require.NoError(t, err) - defer tp.Close() ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() @@ -74,9 +72,8 @@ func TestPickFromTwoHealthy(t *testing.T) { want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, true, true) defer deleteTablet(te, want2) - tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute) + tp, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) - defer tp.Close() // In 20 attempts, both tablet types must be picked at least once. var picked1, picked2 bool @@ -94,39 +91,15 @@ func TestPickFromTwoHealthy(t *testing.T) { assert.True(t, picked2) } -func TestPickFromSomeUnhealthy(t *testing.T) { - te := newPickerTestEnv(t) - defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false)) - want := addTablet(te, 101, topodatapb.TabletType_RDONLY, false, true) - defer deleteTablet(te, want) - - tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute) - require.NoError(t, err) - defer tp.Close() - - tablet, err := tp.PickForStreaming(context.Background()) - require.NoError(t, err) - if !proto.Equal(tablet, want) { - t.Errorf("Pick:\n%v, want\n%v", tablet, want) - } -} - func TestPickError(t *testing.T) { te := newPickerTestEnv(t) defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false)) - _, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "badtype", 1*time.Second, 1*time.Second, 1*time.Minute) + _, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "badtype") assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute) + _, err = NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) - defer tp.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - _, err = tp.PickForStreaming(ctx) - require.Error(t, err) - assert.Contains(t, err.Error(), "error waiting for tablets") } type pickerTestEnv struct { diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7d5acf97cbf..6b28641d082 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -40,10 +40,11 @@ import ( ) var ( - healthcheckTopologyRefresh = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology") - healthcheckRetryDelay = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay") - healthcheckTimeout = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay") - retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection") + // deprecated flags (7.0) + _ = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology") + _ = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "healthcheck retry delay") + _ = flag.Duration("vreplication_healthcheck_timeout", 1*time.Minute, "healthcheck retry delay") + retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay before retrying a failed binlog connection") ) // controller is created by Engine. Members are initialized upfront. @@ -112,7 +113,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if v := params["tablet_types"]; v != "" { tabletTypesStr = v } - tp, err := discovery.NewTabletPicker(ctx, ts, cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, *healthcheckTopologyRefresh, *healthcheckRetryDelay, *healthcheckTimeout) + tp, err := discovery.NewTabletPicker(ts, cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) if err != nil { return nil, err } @@ -130,9 +131,6 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor func (ct *controller) run(ctx context.Context) { defer func() { log.Infof("stream %v: stopped", ct.id) - if ct.tabletPicker != nil { - ct.tabletPicker.Close() - } close(ct.done) }() diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 7a635533891..ff3b8a39e4f 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -429,11 +429,10 @@ func (df *vdiff) selectTablets(ctx context.Context, healthcheckTopologyRefresh, go func() { defer wg.Done() err1 = df.forAll(df.sources, func(shard string, source *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.wr.ts, df.sourceCell, df.ts.sourceKeyspace, shard, df.tabletTypesStr, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout) + tp, err := discovery.NewTabletPicker(df.ts.wr.ts, df.sourceCell, df.ts.sourceKeyspace, shard, df.tabletTypesStr) if err != nil { return err } - defer tp.Close() tablet, err := tp.PickForStreaming(ctx) if err != nil { @@ -448,11 +447,10 @@ func (df *vdiff) selectTablets(ctx context.Context, healthcheckTopologyRefresh, go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.wr.ts, df.targetCell, df.ts.targetKeyspace, shard, df.tabletTypesStr, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout) + tp, err := discovery.NewTabletPicker(df.ts.wr.ts, df.targetCell, df.ts.targetKeyspace, shard, df.tabletTypesStr) if err != nil { return err } - defer tp.Close() tablet, err := tp.PickForStreaming(ctx) if err != nil { From 4361c56c4902267d0768d17b3788ea6d2e113c9b Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 13 Jul 2020 18:34:33 -0700 Subject: [PATCH 3/9] tablet_picker: now works with cell aliases Signed-off-by: deepthi --- go/vt/discovery/tablet_picker.go | 25 ++++--- go/vt/discovery/tablet_picker_test.go | 66 ++++++++++++------- go/vt/topo/cells_aliases.go | 22 +++++++ go/vt/vtctl/vtctl.go | 3 +- .../tabletmanager/vreplication/controller.go | 4 +- go/vt/wrangler/vdiff.go | 10 +-- go/vt/wrangler/vdiff_test.go | 18 ++--- 7 files changed, 100 insertions(+), 48 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 8e5c86c6d54..a92670b123f 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,11 +17,11 @@ limitations under the License. package discovery import ( - "fmt" "math/rand" - "strings" "time" + "vitess.io/vitess/go/vt/topo/topoproto" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -31,7 +31,6 @@ import ( "golang.org/x/net/context" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" ) @@ -45,14 +44,11 @@ type TabletPicker struct { } // NewTabletPicker returns a TabletPicker. -func NewTabletPicker(ts *topo.Server, cell, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { +func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr) if err != nil { - return nil, fmt.Errorf("failed to parse list of tablet types: %v", tabletTypesStr) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr) } - - cells := strings.Split(cell, ",") - return &TabletPicker{ ts: ts, cells: cells, @@ -64,7 +60,6 @@ func NewTabletPicker(ts *topo.Server, cell, keyspace, shard, tabletTypesStr stri // PickForStreaming picks all healthy tablets including the non-serving ones. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - // TODO: parse tp.cell and call this for one cell at a time? candidates := tp.getAllTablets(ctx) if len(candidates) == 0 { return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for %v %v %v", tp.cells, tp.keyspace, tp.shard) @@ -100,7 +95,19 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { result := make([]*topodatapb.TabletAlias, 0) + actualCells := make([]string, 0) for _, cell := range tp.cells { + // check if cell is actually an alias + // non-blocking read so that this is fast + alias, err := tp.ts.GetCellsAlias(ctx, cell, false) + if err != nil { + // either cellAlias doesn't exist or it isn't a cell alias at all. In that case assume it is a cell + actualCells = append(actualCells, cell) + } else { + actualCells = append(actualCells, alias.Cells...) + } + } + for _, cell := range actualCells { sri, err := tp.ts.GetShardReplication(ctx, cell, tp.keyspace, tp.shard) if err != nil { log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index f7042859e43..aab74a31877 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -33,11 +33,11 @@ import ( ) func TestPickSimple(t *testing.T) { - te := newPickerTestEnv(t) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + te := newPickerTestEnv(t, []string{"cell"}) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(te, want) - tp, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) tablet, err := tp.PickForStreaming(context.Background()) @@ -48,12 +48,11 @@ func TestPickSimple(t *testing.T) { } func TestPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t) - te.cell = "otherCell" - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) defer deleteTablet(te, want) - tp, err := NewTabletPicker(te.topoServ, "cell,otherCell", te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) @@ -66,13 +65,13 @@ func TestPickFromOtherCell(t *testing.T) { } func TestPickFromTwoHealthy(t *testing.T) { - te := newPickerTestEnv(t) - want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true) + te := newPickerTestEnv(t, []string{"cell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(te, want1) - want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, true, true) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) defer deleteTablet(te, want2) - tp, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) // In 20 attempts, both tablet types must be picked at least once. @@ -91,14 +90,32 @@ func TestPickFromTwoHealthy(t *testing.T) { assert.True(t, picked2) } +func TestPickUsingCellAlias(t *testing.T) { + // test env puts all cells into an alias called "cella" + te := newPickerTestEnv(t, []string{"cell"}) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want) + + tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if !proto.Equal(want, tablet) { + t.Errorf("Pick: %v, want %v", tablet, want) + } +} + func TestPickError(t *testing.T) { - te := newPickerTestEnv(t) - defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false)) + te := newPickerTestEnv(t, []string{"cell"}) + defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", false, false)) - _, err := NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "badtype") + _, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype") assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - _, err = NewTabletPicker(te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly") + _, err = NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) } @@ -106,32 +123,37 @@ type pickerTestEnv struct { t *testing.T keyspace string shard string - cell string + cells []string topoServ *topo.Server } -func newPickerTestEnv(t *testing.T) *pickerTestEnv { +func newPickerTestEnv(t *testing.T, cells []string) *pickerTestEnv { ctx := context.Background() te := &pickerTestEnv{ t: t, keyspace: "ks", shard: "0", - cell: "cell", - topoServ: memorytopo.NewServer("cell", "otherCell"), + cells: cells, + topoServ: memorytopo.NewServer(cells...), } - err := te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{}) + // create cell alias + err := te.topoServ.CreateCellsAlias(ctx, "cella", &topodatapb.CellsAlias{ + Cells: cells, + }) + require.NoError(t, err) + err = te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{}) require.NoError(t, err) err = te.topoServ.CreateShard(ctx, te.keyspace, te.shard) require.NoError(t, err) return te } -func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, serving, healthy bool) *topodatapb.Tablet { +func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet { tablet := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ - Cell: te.cell, + Cell: cell, Uid: uint32(id), }, Keyspace: te.keyspace, diff --git a/go/vt/topo/cells_aliases.go b/go/vt/topo/cells_aliases.go index c751bd1da84..5efe690df1b 100644 --- a/go/vt/topo/cells_aliases.go +++ b/go/vt/topo/cells_aliases.go @@ -74,6 +74,28 @@ func (ts *Server) GetCellsAliases(ctx context.Context, strongRead bool) (ret map } } +// GetCellsAlias returns the CellsAlias that matches the given name. +func (ts *Server) GetCellsAlias(ctx context.Context, name string, strongRead bool) (*topodatapb.CellsAlias, error) { + conn := ts.globalCell + if !strongRead { + conn = ts.globalReadOnlyCell + } + + aliasPath := pathForCellsAlias(name) + contents, _, err := conn.Get(ctx, aliasPath) + if err != nil { + return nil, err + } + + // Unpack the contents. + cellsAlias := &topodatapb.CellsAlias{} + if err := proto.Unmarshal(contents, cellsAlias); err != nil { + return nil, err + } + + return cellsAlias, nil +} + // DeleteCellsAlias deletes the specified CellsAlias func (ts *Server) DeleteCellsAlias(ctx context.Context, alias string) error { ts.clearCellAliasesCache() diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index eb103f4db9d..00b10134feb 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1961,8 +1961,7 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla return err } - _, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, - *HealthCheckTopologyRefresh, *HealthcheckRetryDelay, *HealthCheckTimeout, *format) + _, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *format) return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 6b28641d082..6e54330d1f3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "strconv" + "strings" "time" "vitess.io/vitess/go/vt/discovery" @@ -113,7 +114,8 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if v := params["tablet_types"]; v != "" { tabletTypesStr = v } - tp, err := discovery.NewTabletPicker(ts, cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) + cells := strings.Split(cell, ",") + tp, err := discovery.NewTabletPicker(ts, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) if err != nil { return nil, err } diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index ff3b8a39e4f..850799bc572 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -110,7 +110,7 @@ type shardStreamer struct { // VDiff reports differences between the sources and targets of a vreplication workflow. func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceCell, targetCell, tabletTypesStr string, - filteredReplicationWaitTime, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout time.Duration, + filteredReplicationWaitTime time.Duration, format string) (map[string]*DiffReport, error) { // Assign defaults to sourceCell and targetCell if not specified. if sourceCell == "" && targetCell == "" { @@ -176,7 +176,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflow, sourceC if err = df.buildVDiffPlan(ctx, oneFilter, schm); err != nil { return nil, vterrors.Wrap(err, "buildVDiffPlan") } - if err := df.selectTablets(ctx, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout); err != nil { + if err := df.selectTablets(ctx); err != nil { return nil, vterrors.Wrap(err, "selectTablets") } defer func(ctx context.Context) { @@ -420,7 +420,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []int) *e } // selectTablets selects the tablets that will be used for the diff. -func (df *vdiff) selectTablets(ctx context.Context, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout time.Duration) error { +func (df *vdiff) selectTablets(ctx context.Context) error { var wg sync.WaitGroup var err1, err2 error @@ -429,7 +429,7 @@ func (df *vdiff) selectTablets(ctx context.Context, healthcheckTopologyRefresh, go func() { defer wg.Done() err1 = df.forAll(df.sources, func(shard string, source *shardStreamer) error { - tp, err := discovery.NewTabletPicker(df.ts.wr.ts, df.sourceCell, df.ts.sourceKeyspace, shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(df.ts.wr.ts, []string{df.sourceCell}, df.ts.sourceKeyspace, shard, df.tabletTypesStr) if err != nil { return err } @@ -447,7 +447,7 @@ func (df *vdiff) selectTablets(ctx context.Context, healthcheckTopologyRefresh, go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(df.ts.wr.ts, df.targetCell, df.ts.targetKeyspace, shard, df.tabletTypesStr) + tp, err := discovery.NewTabletPicker(df.ts.wr.ts, []string{df.targetCell}, df.ts.targetKeyspace, shard, df.tabletTypesStr) if err != nil { return err } diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index c81d3e6a742..146a398d2b6 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -557,7 +557,7 @@ func TestVDiffUnsharded(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, tcase.source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, tcase.target) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "") require.NoError(t, err) assert.Equal(t, tcase.dr, dr["t1"], tcase.id) } @@ -619,7 +619,7 @@ func TestVDiffSharded(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "") require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 3, @@ -685,7 +685,7 @@ func TestVDiffAggregates(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "") require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 5, @@ -749,7 +749,7 @@ func TestVDiffPKWeightString(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "") require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 4, @@ -813,7 +813,7 @@ func TestVDiffNoPKWeightString(t *testing.T) { ), ) - dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + dr, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 30*time.Second, "") require.NoError(t, err) wantdr := &DiffReport{ ProcessedRows: 4, @@ -851,11 +851,11 @@ func TestVDiffDefaults(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) - _, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + _, err := env.wr.VDiff(context.Background(), "target", env.workflow, "", "", "replica", 30*time.Second, "") require.NoError(t, err) - _, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, "", env.cell, "replica", 30*time.Second, "") require.NoError(t, err) - _, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + _, err = env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, "", "replica", 30*time.Second, "") require.NoError(t, err) } @@ -888,6 +888,6 @@ func TestVDiffReplicationWait(t *testing.T) { env.tablets[101].setResults("select c1, c2 from t1 order by c1 asc", vdiffSourceGtid, source) env.tablets[201].setResults("select c1, c2 from t1 order by c1 asc", vdiffTargetMasterPosition, target) - _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, 1*time.Second, 1*time.Second, 1*time.Minute, "") + _, err := env.wr.VDiff(context.Background(), "target", env.workflow, env.cell, env.cell, "replica", 0*time.Second, "") require.EqualError(t, err, "startQueryStreams(sources): WaitForPosition for tablet cell-0000000101: context deadline exceeded") } From 88a7cbf3c6c7740dac20cf5b6a55abfc9031339b Mon Sep 17 00:00:00 2001 From: deepthi Date: Tue, 14 Jul 2020 08:38:19 -0700 Subject: [PATCH 4/9] tablet_picker: must respect provided tablet_types Signed-off-by: deepthi --- go/vt/discovery/tablet_picker.go | 13 +++++++++++++ go/vt/discovery/tablet_picker_test.go | 19 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index a92670b123f..56e9fa51a6d 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -77,6 +77,11 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } + if !tabletTypeIn(ti.Tablet.Type, tp.tabletTypes) { + // tablet is not of one of the desired types + continue + } + // try to connect to tablet conn, err := tabletconn.GetDialer()(ti.Tablet, true) if err != nil { @@ -93,6 +98,14 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes) } +func tabletTypeIn(tabletType topodatapb.TabletType, tabletTypes []topodatapb.TabletType) bool { + for _, t := range tabletTypes { + if tabletType == t { + return true + } + } + return false +} func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { result := make([]*topodatapb.TabletAlias, 0) actualCells := make([]string, 0) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index aab74a31877..9b2c73acac4 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -90,6 +90,25 @@ func TestPickFromTwoHealthy(t *testing.T) { assert.True(t, picked2) } +func TestPickRespectsTabletType(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want) + dont := addTablet(te, 101, topodatapb.TabletType_MASTER, "cell", true, true) + defer deleteTablet(te, dont) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + require.NoError(t, err) + + // In 20 attempts, master tablet must be never picked + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(context.Background()) + require.NoError(t, err) + require.NotNil(t, tablet) + require.True(t, proto.Equal(tablet, want), "picked wrong tablet type") + } +} + func TestPickUsingCellAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell"}) From 5abf76444c7c45e39c66bd6c7bd7c279a2bade92 Mon Sep 17 00:00:00 2001 From: deepthi Date: Tue, 14 Jul 2020 14:07:12 -0700 Subject: [PATCH 5/9] tablet_picker: unit tests Signed-off-by: deepthi --- go/vt/discovery/tablet_picker_test.go | 181 ++++++++++++++++++++++---- 1 file changed, 155 insertions(+), 26 deletions(-) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 9b2c73acac4..7f5d17db93e 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -42,26 +42,7 @@ func TestPickSimple(t *testing.T) { tablet, err := tp.PickForStreaming(context.Background()) require.NoError(t, err) - if !proto.Equal(want, tablet) { - t.Errorf("Pick: %v, want %v", tablet, want) - } -} - -func TestPickFromOtherCell(t *testing.T) { - te := newPickerTestEnv(t, []string{"cell", "otherCell"}) - want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(te, want) - - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - tablet, err := tp.PickForStreaming(ctx) - require.NoError(t, err) - if !proto.Equal(want, tablet) { - t.Errorf("Pick: %v, want %v", tablet, want) - } + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } func TestPickFromTwoHealthy(t *testing.T) { @@ -109,22 +90,167 @@ func TestPickRespectsTabletType(t *testing.T) { } } -func TestPickUsingCellAlias(t *testing.T) { - // test env puts all cells into an alias called "cella" - te := newPickerTestEnv(t, []string{"cell"}) +func TestPickMultiCell(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(te, want) - tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) +} + +func TestPickFromOtherCell(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(te, want) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() tablet, err := tp.PickForStreaming(ctx) require.NoError(t, err) - if !proto.Equal(want, tablet) { - t.Errorf("Pick: %v, want %v", tablet, want) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) +} + +func TestDontPickFromOtherCell(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(te, want2) + + tp, err := NewTabletPicker(te.topoServ, []string{"cell"}, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // In 20 attempts, only want1 must be picked because TabletPicker.cells = "cell" + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } } + assert.True(t, picked1) + assert.False(t, picked2) +} + +func TestPickMultiCellTwoTablets(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(te, want2) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // In 20 attempts, both tablet types must be picked at least once. + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + } + assert.True(t, picked1) + assert.True(t, picked2) +} + +func TestPickMultiCellTwoTabletTypes(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "otherCell", true, true) + defer deleteTablet(te, want2) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // In 20 attempts, both tablet types must be picked at least once. + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + } + assert.True(t, picked1) + assert.True(t, picked2) +} + +func TestPickUsingCellAlias(t *testing.T) { + // test env puts all cells into an alias called "cella" + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want1) + + tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel1() + tablet, err := tp.PickForStreaming(ctx1) + require.NoError(t, err) + assert.True(t, proto.Equal(want1, tablet), "Pick: %v, want %v", tablet, want1) + + // create a tablet in the other cell, it should be picked + deleteTablet(te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) + defer deleteTablet(te, want2) + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() + tablet, err = tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(want2, tablet), "Pick: %v, want %v", tablet, want2) + + // addTablet again and test that both are picked at least once + want1 = addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + ctx3, cancel3 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel3() + + // In 20 attempts, both tablet types must be picked at least once. + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(ctx3) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + } + assert.True(t, picked1) + assert.True(t, picked2) } func TestPickError(t *testing.T) { @@ -205,6 +331,9 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) { + if tablet == nil { + return + } //log error if err := te.topoServ.DeleteTablet(context.Background(), tablet.Alias); err != nil { log.Errorf("failed to DeleteTablet with alias : %v", err) From 1e459fd3f15c05377ab78d041a3f9dba5535e540 Mon Sep 17 00:00:00 2001 From: deepthi Date: Wed, 15 Jul 2020 16:02:01 -0700 Subject: [PATCH 6/9] vreplication multi-cell endtoend test Signed-off-by: deepthi --- go/test/endtoend/vreplication/cluster.go | 184 ++++++++++-------- .../vreplication/vreplication_test.go | 51 ++++- .../tabletmanager/vreplication/controller.go | 2 + 3 files changed, 145 insertions(+), 92 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index 30ad922e341..fb45df780c6 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -7,10 +7,11 @@ import ( "os" "os/exec" "strings" - _ "strings" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -91,7 +92,7 @@ func NewVitessCluster(name string) (cluster *VitessCluster, err error) { } // InitCluster creates the global processes needed for a cluster -func InitCluster(t *testing.T, cellName string) *VitessCluster { +func InitCluster(t *testing.T, cellNames []string) *VitessCluster { initGlobals() vc, _ := NewVitessCluster("Vdemo") assert.NotNil(t, vc) @@ -101,20 +102,25 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster { assert.Nil(t, topo.Setup("etcd2", nil)) topo.ManageTopoDir("mkdir", "/vitess/global") vc.Topo = topo - topo.ManageTopoDir("mkdir", "/vitess/"+cellName) + for _, cellName := range cellNames { + topo.ManageTopoDir("mkdir", "/vitess/"+cellName) + } vtctld := cluster.VtctldProcessInstance(globalConfig.vtctldPort, globalConfig.vtctldGrpcPort, globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir) vc.Vtctld = vtctld assert.NotNil(t, vc.Vtctld) - vc.Vtctld.Setup(cellName) + // use first cell as `-cell` and all cells as `-cells_to_watch` + vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ",")) vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname) assert.NotNil(t, vc.Vtctl) - vc.Vtctl.AddCellInfo(cellName) - cell, err := vc.AddCell(t, cellName) - assert.Nil(t, err) - assert.NotNil(t, cell) + for _, cellName := range cellNames { + vc.Vtctl.AddCellInfo(cellName) + cell, err := vc.AddCell(t, cellName) + assert.Nil(t, err) + assert.NotNil(t, cell) + } vc.VtctlClient = cluster.VtctlClientProcessInstance(globalConfig.hostname, vc.Vtctld.GrpcPort, globalConfig.tmpDir) assert.NotNil(t, vc.VtctlClient) @@ -123,7 +129,7 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster { } // AddKeyspace creates a keyspace with specified shard keys and number of replica/read-only tablets -func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) { +func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) { keyspace := &Keyspace{ Name: ksName, Shards: make(map[string]*Shard), @@ -132,10 +138,16 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh if err := vc.Vtctl.CreateKeyspace(keyspace.Name); err != nil { t.Fatalf(err.Error()) } - cell.Keyspaces[ksName] = keyspace - if err := vc.AddShards(t, cell, keyspace, shards, numReplicas, numRdonly, tabletIDBase); err != nil { - t.Fatalf(err.Error()) + cellsToWatch := "" + for i, cell := range cells { + if i > 0 { + cellsToWatch = cellsToWatch + "," + } + cell.Keyspaces[ksName] = keyspace + cellsToWatch = cellsToWatch + cell.Name } + require.NoError(t, vc.AddShards(t, cells, keyspace, shards, numReplicas, numRdonly, tabletIDBase)) + if schema != "" { if err := vc.VtctlClient.ApplySchema(ksName, schema); err != nil { t.Fatalf(err.Error()) @@ -148,9 +160,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh } } keyspace.VSchema = vschema - if len(cell.Vtgates) == 0 { - fmt.Println("Starting vtgate") - vc.StartVtgate(t, cell) + for _, cell := range cells { + if len(cell.Vtgates) == 0 { + fmt.Println("Starting vtgate") + vc.StartVtgate(t, cell, cellsToWatch) + } } _ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName) return keyspace, nil @@ -194,87 +208,92 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace, } // AddShards creates shards given list of comma-separated keys with specified tablets in each shard -func (vc *VitessCluster) AddShards(t *testing.T, cell *Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error { +func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error { arrNames := strings.Split(names, ",") fmt.Printf("Addshards got %d shards with %+v\n", len(arrNames), arrNames) isSharded := len(arrNames) > 1 + masterTabletUID := 0 for ind, shardName := range arrNames { - if _, ok := keyspace.Shards[shardName]; ok { - fmt.Printf("Shard %s already exists, not adding\n", shardName) - continue - } tabletID := tabletIDBase + ind*100 tabletIndex := 0 - dbProcesses := make([]*exec.Cmd, 0) - tablets := make([]*Tablet, 0) - - fmt.Printf("Adding Shard %s\n", shardName) - if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil { - t.Fatalf("CreateShard command failed with %+v\n", err) - } - shard := &Shard{Name: shardName, IsSharded: isSharded, Tablets: make(map[string]*Tablet, 1)} - fmt.Println("Adding Master tablet") - master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex) - if err != nil { - t.Fatalf(err.Error()) - } - assert.NotNil(t, master) - tabletIndex++ - master.Vttablet.VreplicationTabletType = "MASTER" - tablets = append(tablets, master) - dbProcesses = append(dbProcesses, proc) - for i := 0; i < numReplicas; i++ { - fmt.Println("Adding Replica tablet") - tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex) - if err != nil { - t.Fatalf(err.Error()) + if _, ok := keyspace.Shards[shardName]; ok { + fmt.Printf("Shard %s already exists, not adding\n", shardName) + } else { + fmt.Printf("Adding Shard %s\n", shardName) + if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil { + t.Fatalf("CreateShard command failed with %+v\n", err) } - assert.NotNil(t, tablet) - tabletIndex++ - tablets = append(tablets, tablet) - dbProcesses = append(dbProcesses, proc) + keyspace.Shards[shardName] = shard } - for i := 0; i < numRdonly; i++ { - fmt.Println("Adding RdOnly tablet") - tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex) - if err != nil { - t.Fatalf(err.Error()) + for i, cell := range cells { + dbProcesses := make([]*exec.Cmd, 0) + tablets := make([]*Tablet, 0) + if i == 0 { + // only add master tablet for first cell, so first time CreateShard is called + fmt.Println("Adding Master tablet") + master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex) + if err != nil { + t.Fatalf(err.Error()) + } + assert.NotNil(t, master) + tabletIndex++ + master.Vttablet.VreplicationTabletType = "MASTER" + tablets = append(tablets, master) + dbProcesses = append(dbProcesses, proc) + masterTabletUID = master.Vttablet.TabletUID } - assert.NotNil(t, tablet) - tabletIndex++ - tablets = append(tablets, tablet) - dbProcesses = append(dbProcesses, proc) - } - keyspace.Shards[shardName] = shard - for ind, proc := range dbProcesses { - fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name) - if err := proc.Wait(); err != nil { - t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet) + for i := 0; i < numReplicas; i++ { + fmt.Println("Adding Replica tablet") + tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex) + if err != nil { + t.Fatalf(err.Error()) + } + assert.NotNil(t, tablet) + tabletIndex++ + tablets = append(tablets, tablet) + dbProcesses = append(dbProcesses, proc) } - } - for ind, tablet := range tablets { - fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name) - if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), - keyspace.Name, false); err != nil { - t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet) + for i := 0; i < numRdonly; i++ { + fmt.Println("Adding RdOnly tablet") + tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex) + if err != nil { + t.Fatalf(err.Error()) + } + assert.NotNil(t, tablet) + tabletIndex++ + tablets = append(tablets, tablet) + dbProcesses = append(dbProcesses, proc) } - fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name) - if err := tablet.Vttablet.Setup(); err != nil { - t.Fatalf(err.Error()) + + for ind, proc := range dbProcesses { + fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name) + if err := proc.Wait(); err != nil { + t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet) + } + } + for ind, tablet := range tablets { + fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name) + if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), + keyspace.Name, false); err != nil { + t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet) + } + fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name) + if err := tablet.Vttablet.Setup(); err != nil { + t.Fatalf(err.Error()) + } } } - fmt.Printf("InitShardMaster for %d\n", master.Vttablet.TabletUID) - err = vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cell.Name, master.Vttablet.TabletUID) - if err != nil { - t.Fatal(err.Error()) - } + require.NotEqual(t, 0, masterTabletUID, "Should have created a master tablet") + fmt.Printf("InitShardMaster for %d\n", masterTabletUID) + require.NoError(t, vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cells[0].Name, masterTabletUID)) fmt.Printf("Finished creating shard %s\n", shard.Name) } return nil } +// DeleteShard deletes a shard func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName string, shardName string) { shard := vc.Cells[cellName].Keyspaces[ksName].Shards[shardName] assert.NotNil(t, shard) @@ -291,13 +310,13 @@ func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName strin } // StartVtgate starts a vtgate process -func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell) { +func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell, cellsToWatch string) { vtgate := cluster.VtgateProcessInstance( globalConfig.vtgatePort, globalConfig.vtgateGrpcPort, globalConfig.vtgateMySQLPort, cell.Name, - cell.Name, + cellsToWatch, globalConfig.hostname, globalConfig.tabletTypes, globalConfig.topoPort, @@ -405,12 +424,13 @@ func (vc *VitessCluster) execTabletQuery(vttablet *cluster.VttabletProcess, quer Uname: "vt_dba", } ctx := context.Background() - if conn, err := mysql.Connect(ctx, &vtParams); err != nil { + var conn *mysql.Conn + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { return nil, err - } else { - qr, err := conn.ExecuteFetch(query, 1000, true) - return qr, err } + qr, err := conn.ExecuteFetch(query, 1000, true) + return qr, err } func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName string, tabletType string) map[string]*cluster.VttabletProcess { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 7d657f6d9f6..c879dd1a67c 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" @@ -47,13 +49,13 @@ func init() { func TestBasicVreplicationWorkflow(t *testing.T) { cellName := "zone1" - vc = InitCluster(t, cellName) + vc = InitCluster(t, []string{cellName}) assert.NotNil(t, vc) defer vc.TearDown() cell = vc.Cells[cellName] - vc.AddKeyspace(t, cell, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) + vc.AddKeyspace(t, []*Cell{cell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) vtgate = cell.Vtgates[0] assert.NotNil(t, vtgate) vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) @@ -62,7 +64,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) - shardCustomer(t, true) + shardCustomer(t, true, cell, cell) shardOrders(t) shardMerchant(t) @@ -83,6 +85,33 @@ func TestBasicVreplicationWorkflow(t *testing.T) { expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1) } +func TestMultiCellVreplicationWorkflow(t *testing.T) { + cells := []string{"zone1", "zone2"} + + vc = InitCluster(t, cells) + assert.NotNil(t, vc) + + defer vc.TearDown() + + cell1 := vc.Cells["zone1"] + cell2 := vc.Cells["zone2"] + vc.AddKeyspace(t, []*Cell{cell1, cell2}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) + + vtgate = cell1.Vtgates[0] + assert.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) + + vtgateConn = getConnection(t, globalConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t) + insertInitialData(t) + shardCustomer(t, true, cell1, cell2) + // TODO: Test resharding once -cells option is added to Reshard command + //insertMoreCustomers(t, 16) + //reshardCustomer2to4Split(t) + //expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) +} + func insertInitialData(t *testing.T) { fmt.Printf("Inserting initial data\n") lines, _ := ioutil.ReadFile("unsharded_init_data.sql") @@ -110,8 +139,12 @@ func insertMoreCustomers(t *testing.T, numCustomers int) { execVtgateQuery(t, vtgateConn, "customer", sql) } -func shardCustomer(t *testing.T, testReverse bool) { - if _, err := vc.AddKeyspace(t, cell, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { +func shardCustomer(t *testing.T, testReverse bool, cell, sourceCell *Cell) { + cells := []*Cell{cell} + if cell != sourceCell { + cells = append(cells, sourceCell) + } + if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { t.Fatal(err) } if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "-80"), 1); err != nil { @@ -121,7 +154,7 @@ func shardCustomer(t *testing.T, testReverse bool) { t.Fatal(err) } - if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+cell.Name, "-workflow=p2c", + if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+sourceCell.Name, "-workflow=p2c", "-tablet_types="+"replica,rdonly", "product", "customer", "customer"); err != nil { t.Fatalf("MoveTables command failed with %+v\n", err) } @@ -349,9 +382,7 @@ func reshardCustomer3to1Merge(t *testing.T) { //to unsharded func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string) { ksWorkflow := ksName + "." + workflow keyspace := vc.Cells[cell.Name].Keyspaces[ksName] - if err := vc.AddShards(t, cell, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase); err != nil { - t.Fatalf(err.Error()) - } + require.NoError(t, vc.AddShards(t, []*Cell{cell}, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase)) arrShardNames := strings.Split(targetShards, ",") for _, shardName := range arrShardNames { @@ -444,7 +475,7 @@ func shardOrders(t *testing.T) { } func shardMerchant(t *testing.T) { - if _, err := vc.AddKeyspace(t, cell, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { + if _, err := vc.AddKeyspace(t, []*Cell{cell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { t.Fatal(err) } if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "-80"), 1); err != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 6e54330d1f3..cb0e722c299 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -83,6 +83,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor blpStats: blpStats, done: make(chan struct{}), } + log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id id, err := strconv.Atoi(params["id"]) @@ -114,6 +115,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if v := params["tablet_types"]; v != "" { tabletTypesStr = v } + log.Infof("creating tablet picker for source keyspace/shard %v/%v with cell: %v and tabletTypes: %v", ct.source.Keyspace, ct.source.Shard, cell, tabletTypesStr) cells := strings.Split(cell, ",") tp, err := discovery.NewTabletPicker(ts, cells, ct.source.Keyspace, ct.source.Shard, tabletTypesStr) if err != nil { From b9a4ea60f0324beba2e5e760ed9105c11fe8db99 Mon Sep 17 00:00:00 2001 From: deepthi Date: Wed, 15 Jul 2020 18:03:58 -0700 Subject: [PATCH 7/9] vreplication cell-alias endtoend test Signed-off-by: deepthi --- .../vreplication/vreplication_test.go | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c879dd1a67c..47493e99785 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -64,7 +64,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) - shardCustomer(t, true, cell, cell) + shardCustomer(t, true, []*Cell{cell}, cellName) shardOrders(t) shardMerchant(t) @@ -105,7 +105,38 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t) insertInitialData(t) - shardCustomer(t, true, cell1, cell2) + shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name) + // TODO: Test resharding once -cells option is added to Reshard command + //insertMoreCustomers(t, 16) + //reshardCustomer2to4Split(t) + //expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) +} + +func TestCellAliasVreplicationWorkflow(t *testing.T) { + cells := []string{"zone1", "zone2"} + + vc = InitCluster(t, cells) + assert.NotNil(t, vc) + + defer vc.TearDown() + + cell1 := vc.Cells["zone1"] + cell2 := vc.Cells["zone2"] + vc.AddKeyspace(t, []*Cell{cell1, cell2}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100) + + // Add cell alias containing only zone2 + result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "-cells", "zone2", "alias") + require.NoError(t, err, "command failed with output: %v", result) + + vtgate = cell1.Vtgates[0] + assert.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "product", "0"), 1) + + vtgateConn = getConnection(t, globalConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t) + insertInitialData(t) + shardCustomer(t, true, []*Cell{cell1, cell2}, "alias") // TODO: Test resharding once -cells option is added to Reshard command //insertMoreCustomers(t, 16) //reshardCustomer2to4Split(t) @@ -139,11 +170,7 @@ func insertMoreCustomers(t *testing.T, numCustomers int) { execVtgateQuery(t, vtgateConn, "customer", sql) } -func shardCustomer(t *testing.T, testReverse bool, cell, sourceCell *Cell) { - cells := []*Cell{cell} - if cell != sourceCell { - cells = append(cells, sourceCell) - } +func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) { if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { t.Fatal(err) } @@ -154,11 +181,14 @@ func shardCustomer(t *testing.T, testReverse bool, cell, sourceCell *Cell) { t.Fatal(err) } - if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+sourceCell.Name, "-workflow=p2c", + if err := vc.VtctlClient.ExecuteCommand("MoveTables", "-cell="+sourceCellOrAlias, "-workflow=p2c", "-tablet_types="+"replica,rdonly", "product", "customer", "customer"); err != nil { t.Fatalf("MoveTables command failed with %+v\n", err) } + // Assume we are operating on first cell + cell := cells[0] + customerTab1 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["-80"].Tablets["zone1-200"].Vttablet customerTab2 := vc.Cells[cell.Name].Keyspaces["customer"].Shards["80-"].Tablets["zone1-300"].Vttablet From 166607d65aa45a1c52b6d170ca18b9859e54b989 Mon Sep 17 00:00:00 2001 From: deepthi Date: Wed, 15 Jul 2020 18:40:13 -0700 Subject: [PATCH 8/9] tablet_picker: special handling for master Signed-off-by: deepthi --- go/vt/discovery/tablet_picker.go | 16 +++++++++++++++- go/vt/discovery/tablet_picker_test.go | 22 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 56e9fa51a6d..cd89c9dc222 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -65,7 +65,11 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for %v %v %v", tp.cells, tp.keyspace, tp.shard) } for { - idx := rand.Intn(len(candidates)) + idx := 0 + // if there is only one candidate we use that, otherwise we find one randomly + if len(candidates) > 1 { + idx = rand.Intn(len(candidates)) + } alias := candidates[idx] // get tablet ti, err := tp.ts.GetTablet(ctx, alias) @@ -107,7 +111,17 @@ func tabletTypeIn(tabletType topodatapb.TabletType, tabletTypes []topodatapb.Tab return false } func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { + // Special handling for MASTER tablet type + // Since there is only one master, we ignore cell and find the master result := make([]*topodatapb.TabletAlias, 0) + if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_MASTER { + si, err := tp.ts.GetShard(ctx, tp.keyspace, tp.shard) + if err != nil { + return result + } + result = append(result, si.MasterAlias) + return result + } actualCells := make([]string, 0) for _, cell := range tp.cells { // check if cell is actually an alias diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 7f5d17db93e..554310d87da 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -105,6 +105,28 @@ func TestPickMultiCell(t *testing.T) { assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) } +func TestPickMaster(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell", "otherCell"}) + want := addTablet(te, 100, topodatapb.TabletType_MASTER, "cell", true, true) + defer deleteTablet(te, want) + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.MasterAlias = want.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(te.topoServ, []string{"otherCell"}, te.keyspace, te.shard, "master") + require.NoError(t, err) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) +} + func TestPickFromOtherCell(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) From 2d9c8e3105fcfa753aab7133a90e2f70c53ece6e Mon Sep 17 00:00:00 2001 From: deepthi Date: Fri, 17 Jul 2020 10:02:32 -0700 Subject: [PATCH 9/9] use existing func, add more unit tests to cover all errors Signed-off-by: deepthi --- go/vt/discovery/tablet_picker.go | 18 +++++---------- go/vt/discovery/tablet_picker_test.go | 33 +++++++++++++++------------ 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index cd89c9dc222..6b760c42f02 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -58,11 +58,13 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp }, nil } -// PickForStreaming picks all healthy tablets including the non-serving ones. +// PickForStreaming picks an available tablet +// All tablets that belong to tp.cells are evaluated and one is +// chosen at random func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { candidates := tp.getAllTablets(ctx) if len(candidates) == 0 { - return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for %v %v %v", tp.cells, tp.keyspace, tp.shard) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for cells:%v, keyspace/shard:%v/%v, tablet types:%v", tp.cells, tp.keyspace, tp.shard, tp.tabletTypes) } for { idx := 0 @@ -81,7 +83,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - if !tabletTypeIn(ti.Tablet.Type, tp.tabletTypes) { + if !topoproto.IsTypeInList(ti.Tablet.Type, tp.tabletTypes) { // tablet is not of one of the desired types continue } @@ -99,17 +101,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table _ = conn.Close(ctx) return ti.Tablet, nil } - return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes) + return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for keyspace/shard:%v/%v tablet types:%v", tp.keyspace, tp.shard, tp.tabletTypes) } -func tabletTypeIn(tabletType topodatapb.TabletType, tabletTypes []topodatapb.TabletType) bool { - for _, t := range tabletTypes { - if tabletType == t { - return true - } - } - return false -} func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { // Special handling for MASTER tablet type // Since there is only one master, we ignore cell and find the master diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 554310d87da..76b30703545 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -277,13 +277,18 @@ func TestPickUsingCellAlias(t *testing.T) { func TestPickError(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) - defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", false, false)) - _, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype") assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - _, err = NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "no tablets available for cells:[cell], keyspace/shard:ks/0, tablet types:[REPLICA RDONLY]") + defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_REPLICA, "cell", false, false)) + _, err = tp.PickForStreaming(ctx) + require.EqualError(t, err, "can't find any healthy source tablet for keyspace/shard:ks/0 tablet types:[REPLICA RDONLY]") } type pickerTestEnv struct { @@ -334,19 +339,17 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) - var herr string - if !healthy { - herr = "err" + if healthy { + _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, + }) } - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: &querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: herr}, - }) return tablet }