Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff tablet selection: pick non-serving tablets in Reshard workflows #14413

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 45 additions & 34 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -137,6 +138,7 @@ type TabletPicker struct {
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
options TabletPickerOptions
}

// NewTabletPicker returns a TabletPicker.
Expand Down Expand Up @@ -232,6 +234,7 @@ func NewTabletPicker(
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
options: options,
}

for _, ignoreTablet := range ignoreTablets {
Expand Down Expand Up @@ -292,6 +295,40 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

func (tp *TabletPicker) sortCandidates(ctx context.Context, candidates []*topo.TabletInfo) []*topo.TabletInfo {
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
return candidates
}

// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
Expand All @@ -305,36 +342,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
candidates = tp.sortCandidates(ctx, candidates)
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
Expand All @@ -349,7 +357,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
log.Infof("Tablet picker found a healthy tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}
Expand Down Expand Up @@ -443,7 +451,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
71 changes: 71 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
contextTimeout = 5 * time.Second
numTestIterations = 50
)

func TestPickPrimary(t *testing.T) {
defer utils.EnsureNoLeaks(t)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
Expand Down Expand Up @@ -591,6 +596,72 @@ func TestPickFallbackType(t *testing.T) {
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

// TestPickNonServingTablets validates that non serving tablets are included when the
// IncludeNonServingTablets option is set. Unhealthy tablets should not be picked, irrespective of this option.
func TestPickNonServingTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)

cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,primary"
options := TabletPickerOptions{}
te := newPickerTestEnv(t, ctx, cells)

// Tablet should be selected as it is healthy and serving.
primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(ctx, te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

// Tablet should be selected because the IncludeNonServingTablets option is set and it is healthy.
replicaTablet2 := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, localCell, false, true)
defer deleteTablet(t, te, replicaTablet2)

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, contextTimeout)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
// IncludeNonServingTablets is false: only the healthy serving tablet should be picked.
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)

options.IncludeNonServingTablets = true
tp, err = NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx3, cancel3 := context.WithTimeout(ctx, contextTimeout)
defer cancel3()
var picked1, picked2, picked3 bool
// IncludeNonServingTablets is true: both the healthy tablets should be picked even though one is not serving.
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx3)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
picked1 = true
}
if proto.Equal(tablet, replicaTablet) {
picked2 = true
}
if proto.Equal(tablet, replicaTablet2) {
picked3 = true
}
}
assert.True(t, picked1)
assert.False(t, picked2)
assert.True(t, picked3)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type controller struct {
id int64 // id from row in _vt.vdiff
uuid string
workflow string
workflowType binlogdatapb.VReplicationWorkflowType
cancel context.CancelFunc
dbClientFactory func() binlogplayer.DBClient
ts *topo.Server
Expand Down Expand Up @@ -227,6 +228,12 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
ct.sourceKeyspace = bls.Keyspace
ct.filter = bls.Filter
}

workflowType, err := row["workflow_type"].ToInt64()
if err != nil {
return err
}
ct.workflowType = binlogdatapb.VReplicationWorkflowType(workflowType)
}

if err := ct.validate(); err != nil {
Expand Down
22 changes: 17 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,13 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
}
sourceTopoServer = extTS
}
tabletPickerOptions := discovery.TabletPickerOptions{}
wg.Add(1)
go func() {
defer wg.Done()
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
sourceTablet, err := td.pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.sourceKeyspace,
source.shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions)
if err != nil {
return err
}
Expand All @@ -245,8 +247,15 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if td.wd.ct.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
// For resharding, the target shards could be non-serving if traffic has already been switched once.
// When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched
// it is set to false for the shards we are switching from. We don't have a way to know if we have
// switched or not, so we just include non-serving tablets for all reshards.
tabletPickerOptions.IncludeNonServingTablets = true
}
targetTablet, targetErr = td.pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions)
if targetErr != nil {
return
}
Expand All @@ -263,8 +272,11 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace,
shard, tabletTypes string, options discovery.TabletPickerOptions) (*topodatapb.Tablet, error) {

tp, err := discovery.NewTabletPicker(ctx, ts, cells, td.wd.ct.vde.thisTablet.Alias.Cell, keyspace,
shard, tabletTypes, options)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {

log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
log.Errorf("INTERNAL: unable to setState() in controller. Attempting to set error text: [%v]; setState() error is: %v", err, errSetState)
log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err)
return err // yes, err and not errSetState.
}
log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
return nil // this will cause vreplicate to quit the workflow
}
return err
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
if ts.ExternalTopo() != nil {
sourceTopo = ts.ExternalTopo()
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell,
df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return err
}
Expand All @@ -827,8 +828,18 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
wg.Add(1)
go func() {
defer wg.Done()
includeNonServingTablets := false
if df.ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
// For resharding, the target shards could be non-serving if traffic has already been switched once.
// When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched
// it is set to false for the shards we are switching from. We don't have a way to know if we have
// switched or not, so we just include non-serving tablets for all reshards.
includeNonServingTablets = true
}
err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error {
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell,
df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr,
discovery.TabletPickerOptions{IncludeNonServingTablets: includeNonServingTablets})
if err != nil {
return err
}
Expand Down
Loading