Backport some panic protection during ERS (#196)

This takes the core of the change from vitessio#7486 and backports it into 8.0.

Signed-off-by: Richard Bailey <rbailey@slack-corp.com>
setassociative committed Mar 9, 2021
1 parent f8a19a5 commit 220e4db
Showing 1 changed file: go/vt/wrangler/reparent.go (73 additions and 8 deletions).
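For context, the panic this commit guards against comes from dereferencing a StopReplicationStatus whose Before field was never populated, for example for a tablet that could not report its replication state during the emergency reparent. The following is a minimal, self-contained Go sketch of that failure mode and of the guard pattern being backported; the struct and function names here are stand-ins for illustration only, not the real replicationdatapb protobuf types or the Wrangler code in the diff below.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the replicationdatapb types; the real ones are protobuf messages.
type ReplicationStatus struct {
	IoThreadRunning  bool
	SqlThreadRunning bool
}

type StopReplicationStatus struct {
	Before *ReplicationStatus // nil if the tablet never reported a status
}

// Unguarded version: panics with a nil pointer dereference when Before is nil.
func replicaWasRunningUnsafe(s *StopReplicationStatus) bool {
	return s.Before.IoThreadRunning || s.Before.SqlThreadRunning
}

// Guarded version, mirroring the shape of the backported fix: return an error
// instead of panicking so the caller can record it and skip that tablet.
func replicaWasRunning(s *StopReplicationStatus) (bool, error) {
	if s == nil || s.Before == nil {
		return false, errors.New("could not determine Before state of StopReplicationStatus")
	}
	return s.Before.IoThreadRunning || s.Before.SqlThreadRunning, nil
}

func main() {
	status := &StopReplicationStatus{} // Before left nil, as for an unreachable tablet
	if running, err := replicaWasRunning(status); err != nil {
		fmt.Println("skipping tablet:", err)
	} else {
		fmt.Println("was running:", running)
	}
}

The actual change in the diff below applies the same guard inside replicaWasRunning and has handleReplica record the error and return early for that tablet instead of crashing the whole reparent.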
@@ -976,50 +976,108 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout)
defer replCancel()

+ replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background())
+ allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background())

// Reset replication on all replicas to point to the new master, and
// insert test row in the new master.
// Go through all the tablets:
// - new master: populate the reparent journal
// - everybody else: reparent to new master, wait for row
event.DispatchUpdate(ev, "reparenting all tablets")
now := time.Now().UnixNano()
- errChan = make(chan error)
+ replWg := sync.WaitGroup{}
+ // we will reuse the concurrency.AllErrorRecorder for the actual reparent
+ // starting here because we've validated above that there were zero errors
+ // up to this point

handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error {
wr.logger.Infof("populating reparent journal on new master %v", alias)
return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp)
}
handleReplica := func(alias string, tabletInfo *topo.TabletInfo) {
- var err error
- defer func() { errChan <- err }()
+ defer replWg.Done()

wr.logger.Infof("setting new master on replica %v", alias)
forceStart := false
if status, ok := statusMap[alias]; ok {
- forceStart = replicaWasRunning(status)
+ fs, err := replicaWasRunning(status)
+ if err != nil {
+ err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err)
+ rec.RecordError(err)
+
+ return
+ }
+
+ forceStart = fs
}
- err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
+ err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
if err != nil {
err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err)
+ rec.RecordError(err)
+ return
}
+
+ // Signal that at least one goroutine succeeded to SetMaster.
+ replSuccessCancel()
}

+ numReplicas := 0
+
for alias, tabletInfo := range tabletMap {
if alias == newMasterTabletAliasStr {
continue
} else if !ignoredTablets.Has(alias) {
+ replWg.Add(1)
+ numReplicas++
go handleReplica(alias, tabletInfo)
}
}

+ // Spin up a background goroutine to wait until all replica goroutines
+ // finished. Polling this way allows us to have promoteNewPrimary return
+ // success as soon as (a) the primary successfully populates its reparent
+ // journal and (b) at least one replica successfully begins replicating.
+ //
+ // If we were to follow the more common pattern of blocking on replWg.Wait()
+ // in the main body of promoteNewPrimary, we would be bound to the
+ // time of slowest replica, instead of the time of the fastest successful
+ // replica, and we want ERS to be fast.
+ go func() {
+ replWg.Wait()
+ allReplicasDoneCancel()
+ }()
+
masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr])
if masterErr != nil {
wr.logger.Warningf("master failed to PopulateReparentJournal")
replCancel()
return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr)
}

- return nil
+ select {
+ case <-replSuccessCtx.Done():
+ // At least one replica was able to SetMaster successfully
+ return nil
+ case <-allReplicasDoneCtx.Done():
+ // There are certain timing issues between replSuccessCtx.Done firing
+ // and allReplicasDoneCtx.Done firing, so we check again if truly all
+ // replicas failed (where `numReplicas` goroutines recorded an error) or
+ // one or more actually managed to succeed.
+ errCount := len(rec.Errors)
+
+ switch {
+ case errCount > numReplicas:
+ // Technically, rec.Errors should never be greater than numReplicas,
+ // but it's better to err on the side of caution here, but also
+ // we're going to be explicit that this is doubly unexpected.
+ return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error())
+ case errCount == numReplicas:
+ return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error())
+ default:
+ return nil
+ }
+ }
}

// waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In the case that N-1 tablets have responded,
@@ -1233,6 +1291,13 @@ func (wr *Wrangler) TabletExternallyReparented(ctx context.Context, newMasterAli
return nil
}

- func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) bool {
- return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning
+ // replicaWasRunning returns true if a StopReplicationStatus indicates that the
+ // replica had running replication threads before being stopped. It returns an
+ // error if the Before state of replication is nil.
+ func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) (bool, error) {
+ if stopReplicationStatus == nil || stopReplicationStatus.Before == nil {
+ return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopReplicationStatus)
+ }
+
+ return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning, nil
}
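The waiting strategy in the first hunk above (the replSuccessCtx / allReplicasDoneCtx pair, the replWg wait running in a background goroutine, and the final select) is a general "return on first success, but still report if everyone failed" pattern. Below is a minimal, self-contained sketch of that pattern with nothing from Vitess assumed: the work function, the items slice, and the plain error slice are simplified stand-ins for the SetMaster calls, the tablet map, and concurrency.AllErrorRecorder.

package main

import (
	"context"
	"fmt"
	"sync"
)

// firstSuccess fans work out to one goroutine per item and returns nil as soon
// as any single worker succeeds. Only if every worker fails does it return an
// error describing all of the failures.
func firstSuccess(ctx context.Context, items []string, work func(context.Context, string) error) error {
	// successCtx is canceled by the first goroutine that succeeds;
	// allDoneCtx is canceled once every goroutine has finished.
	successCtx, successCancel := context.WithCancel(context.Background())
	defer successCancel()
	allDoneCtx, allDoneCancel := context.WithCancel(context.Background())
	defer allDoneCancel()

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)

	for _, item := range items {
		wg.Add(1)
		go func(item string) {
			defer wg.Done()
			if err := work(ctx, item); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
				return
			}
			// Signal that at least one worker succeeded.
			successCancel()
		}(item)
	}

	// Wait for the stragglers in the background, so the caller is bound by the
	// fastest successful worker rather than the slowest one.
	go func() {
		wg.Wait()
		allDoneCancel()
	}()

	select {
	case <-successCtx.Done():
		// At least one worker succeeded.
		return nil
	case <-allDoneCtx.Done():
		// Both contexts may fire close together, so re-check whether truly
		// every worker recorded an error before declaring total failure.
		mu.Lock()
		defer mu.Unlock()
		if len(errs) >= len(items) {
			return fmt.Errorf("all %d workers failed: %v", len(items), errs)
		}
		return nil
	}
}

func main() {
	err := firstSuccess(context.Background(), []string{"a", "b", "c"}, func(_ context.Context, item string) error {
		if item == "b" {
			return nil // pretend only worker "b" succeeds
		}
		return fmt.Errorf("worker %s failed", item)
	})
	fmt.Println("result:", err) // result: <nil>
}

As in the diff, both contexts can already be Done by the time the select runs, which is why the all-done branch re-checks the error count before deciding whether everything failed.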
