Skip to content

Commit

Permalink
partial backport VDiff tablet selection: pick non-serving tablets in …
Browse files Browse the repository at this point in the history
…Reshard workflows vitessio#14413

Signed-off-by: Vilius Okockis <vilius.okockis@vinted.com>
  • Loading branch information
DeathBorn committed Apr 23, 2024
1 parent 2ecbf0b commit 6efcd7b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 37 deletions.
81 changes: 46 additions & 35 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -140,6 +141,7 @@ type TabletPicker struct {
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
options TabletPickerOptions
}

// NewTabletPicker returns a TabletPicker.
Expand Down Expand Up @@ -232,6 +234,7 @@ func NewTabletPicker(
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
options: options,
}

for _, ignoreTablet := range ignoreTablets {
Expand Down Expand Up @@ -291,11 +294,45 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

func (tp *TabletPicker) sortCandidates(ctx context.Context, candidates []*topo.TabletInfo) []*topo.TabletInfo {
rand.Seed(time.Now().UnixNano())
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
return candidates
}

// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// Keep trying at intervals (tabletPickerRetryDelay) until a healthy
// serving tablet is found or the context is cancelled.
for {
Expand All @@ -305,36 +342,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
candidates = tp.sortCandidates(ctx, candidates)
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
Expand All @@ -349,7 +357,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
log.Infof("Tablet picker found a healthy tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}
Expand Down Expand Up @@ -442,7 +450,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
69 changes: 69 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (
"vitess.io/vitess/go/vt/topo/memorytopo"
)

const (
contextTimeout = 5 * time.Second
numTestIterations = 50
)

func TestPickPrimary(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want := addTablet(te, 100, topodatapb.TabletType_MASTER, "cell", true, true)
Expand Down Expand Up @@ -541,6 +546,70 @@ func TestPickFallbackType(t *testing.T) {
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

// TestPickNonServingTablets validates that non serving tablets are included when the
// IncludeNonServingTablets option is set. Unhealthy tablets should not be picked, irrespective of this option.
func TestPickNonServingTablets(t *testing.T) {
cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,master"
options := TabletPickerOptions{}
te := newPickerTestEnv(t, cells)

// Tablet should be selected as it is healthy and serving.
primaryTablet := addTablet(te, 100, topodatapb.TabletType_MASTER, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

// Tablet should be selected because the IncludeNonServingTablets option is set and it is healthy.
replicaTablet2 := addTablet(te, 300, topodatapb.TabletType_REPLICA, localCell, false, true)
defer deleteTablet(t, te, replicaTablet2)

var cancel context.CancelFunc
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.MasterAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, contextTimeout)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
// IncludeNonServingTablets is false: only the healthy serving tablet should be picked.
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)

options.IncludeNonServingTablets = true
tp, err = NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx3, cancel3 := context.WithTimeout(ctx, contextTimeout)
defer cancel3()
var picked1, picked2, picked3 bool
// IncludeNonServingTablets is true: both the healthy tablets should be picked even though one is not serving.
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx3)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
picked1 = true
}
if proto.Equal(tablet, replicaTablet) {
picked2 = true
}
if proto.Equal(tablet, replicaTablet2) {
picked3 = true
}
}
assert.True(t, picked1)
assert.False(t, picked2)
assert.True(t, picked3)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,8 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
if ts.ExternalTopo() != nil {
sourceTopo = ts.ExternalTopo()
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell,
df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return err
}
Expand All @@ -589,8 +590,18 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
wg.Add(1)
go func() {
defer wg.Done()
includeNonServingTablets := false
if df.ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
// For resharding, the target shards could be non-serving if traffic has already been switched once.
// When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched
// it is set to false for the shards we are switching from. We don't have a way to know if we have
// switched or not, so we just include non-serving tablets for all reshards.
includeNonServingTablets = true
}
err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error {
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell,
df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr,
discovery.TabletPickerOptions{IncludeNonServingTablets: includeNonServingTablets})
if err != nil {
return err
}
Expand Down

0 comments on commit 6efcd7b

Please sign in to comment.