From 220e4db9c8ed53649a79cfb5914f414c6efcfb7b Mon Sep 17 00:00:00 2001
From: Richard
Date: Mon, 8 Mar 2021 18:01:58 -0800
Subject: [PATCH] Backport some panic protection during ERS (#196)

Backport some panic protection during ERS

This takes the core of the change from
https://github.com/vitessio/vitess/pull/7486 and backports it into 8.0.

Signed-off-by: Richard Bailey
---
 go/vt/wrangler/reparent.go | 81 ++++++++++++++++++++++++++++++++++----
 1 file changed, 73 insertions(+), 8 deletions(-)

diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go
index e9dc22a0eb5..72f98dcbe32 100644
--- a/go/vt/wrangler/reparent.go
+++ b/go/vt/wrangler/reparent.go
@@ -976,6 +976,9 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
 	replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout)
 	defer replCancel()
 
+	replSuccessCtx, replSuccessCancel := context.WithCancel(context.Background())
+	allReplicasDoneCtx, allReplicasDoneCancel := context.WithCancel(context.Background())
+
 	// Reset replication on all replicas to point to the new master, and
 	// insert test row in the new master.
 	// Go through all the tablets:
@@ -983,35 +986,68 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
 	// - everybody else: reparent to new master, wait for row
 	event.DispatchUpdate(ev, "reparenting all tablets")
 	now := time.Now().UnixNano()
-	errChan = make(chan error)
+	replWg := sync.WaitGroup{}
+	// we will reuse the concurrency.AllErrorRecorder for the actual reparent
+	// starting here because we've validated above that there were zero errors
+	// up to this point
 	handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error {
 		wr.logger.Infof("populating reparent journal on new master %v", alias)
 		return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp)
 	}
 	handleReplica := func(alias string, tabletInfo *topo.TabletInfo) {
-		var err error
-		defer func() { errChan <- err }()
+		defer replWg.Done()
 
 		wr.logger.Infof("setting new master on replica %v", alias)
 		forceStart := false
 		if status, ok := statusMap[alias]; ok {
-			forceStart = replicaWasRunning(status)
+			fs, err := replicaWasRunning(status)
+			if err != nil {
+				err = vterrors.Wrapf(err, "tablet %v could not determine StopReplicationStatus: %v", alias, err)
+				rec.RecordError(err)
+
+				return
+			}
+
+			forceStart = fs
 		}
 
-		err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
+		err := wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart)
 		if err != nil {
 			err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err)
+			rec.RecordError(err)
+
+			return
 		}
+
+		// Signal that at least one goroutine succeeded to SetMaster.
+		replSuccessCancel()
 	}
 
+	numReplicas := 0
+
 	for alias, tabletInfo := range tabletMap {
 		if alias == newMasterTabletAliasStr {
 			continue
 		} else if !ignoredTablets.Has(alias) {
+			replWg.Add(1)
+			numReplicas++
 			go handleReplica(alias, tabletInfo)
 		}
 	}
 
+	// Spin up a background goroutine to wait until all replica goroutines
+	// finished. Polling this way allows us to have promoteNewPrimary return
+	// success as soon as (a) the primary successfully populates its reparent
+	// journal and (b) at least one replica successfully begins replicating.
+	//
+	// If we were to follow the more common pattern of blocking on replWg.Wait()
+	// in the main body of promoteNewPrimary, we would be bound to the
+	// time of slowest replica, instead of the time of the fastest successful
+	// replica, and we want ERS to be fast.
+	go func() {
+		replWg.Wait()
+		allReplicasDoneCancel()
+	}()
+
 	masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr])
 	if masterErr != nil {
 		wr.logger.Warningf("master failed to PopulateReparentJournal")
@@ -1019,7 +1055,29 @@ func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events
 		return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr)
 	}
 
-	return nil
+	select {
+	case <-replSuccessCtx.Done():
+		// At least one replica was able to SetMaster successfully
+		return nil
+	case <-allReplicasDoneCtx.Done():
+		// There are certain timing issues between replSuccessCtx.Done firing
+		// and allReplicasDoneCtx.Done firing, so we check again if truly all
+		// replicas failed (where `numReplicas` goroutines recorded an error) or
+		// one or more actually managed to succeed.
+		errCount := len(rec.Errors)
+
+		switch {
+		case errCount > numReplicas:
+			// Technically, rec.Errors should never be greater than numReplicas,
+			// but it's better to err on the side of caution here, but also
+			// we're going to be explicit that this is doubly unexpected.
+			return vterrors.Wrapf(rec.Error(), "received more errors (= %d) than replicas (= %d), which should be impossible: %v", errCount, numReplicas, rec.Error())
+		case errCount == numReplicas:
+			return vterrors.Wrapf(rec.Error(), "%d replica(s) failed: %v", numReplicas, rec.Error())
+		default:
+			return nil
+		}
+	}
 }
 
 // waitOnNMinusOneTablets will wait until N-1 tablets have responded via a supplied error channel. In that case that N-1 tablets have responded,
@@ -1233,6 +1291,13 @@ func (wr *Wrangler) TabletExternallyReparented(ctx context.Context, newMasterAli
 	return nil
 }
 
-func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) bool {
-	return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning
+// replicaWasRunning returns true if a StopReplicationStatus indicates that the
+// replica had running replication threads before being stopped. It returns an
+// error if the Before state of replication is nil.
+func replicaWasRunning(stopReplicationStatus *replicationdatapb.StopReplicationStatus) (bool, error) {
+	if stopReplicationStatus == nil || stopReplicationStatus.Before == nil {
+		return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not determine Before state of StopReplicationStatus %v", stopReplicationStatus)
+	}
+
+	return stopReplicationStatus.Before.IoThreadRunning || stopReplicationStatus.Before.SqlThreadRunning, nil
 }
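Note: the concurrency pattern this patch relies on can be read in isolation in the sketch below. This is an illustrative, self-contained example and not code from the patch; the names firstSuccessCtx, allDoneCtx, doWork, and numWorkers are hypothetical stand-ins for replSuccessCtx, allReplicasDoneCtx, SetMaster, and numReplicas. One context is cancelled by the first goroutine that succeeds, a second is cancelled only after every goroutine has finished, and the caller selects on both so it can return as soon as either outcome is known.

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"sync"
    )

    // doWork stands in for SetMaster: even-numbered workers succeed,
    // odd-numbered workers fail.
    func doWork(id int) error {
    	if id%2 == 0 {
    		return nil
    	}
    	return errors.New("replica failed")
    }

    func main() {
    	// Cancelled by the first goroutine that succeeds.
    	firstSuccessCtx, firstSuccessCancel := context.WithCancel(context.Background())
    	// Cancelled once every goroutine has finished, success or failure.
    	allDoneCtx, allDoneCancel := context.WithCancel(context.Background())
    	defer firstSuccessCancel()
    	defer allDoneCancel()

    	var (
    		wg       sync.WaitGroup
    		mu       sync.Mutex
    		errCount int
    	)

    	numWorkers := 4
    	for i := 0; i < numWorkers; i++ {
    		wg.Add(1)
    		go func(id int) {
    			defer wg.Done()
    			if err := doWork(id); err != nil {
    				mu.Lock()
    				errCount++
    				mu.Unlock()
    				return
    			}
    			// Signal that at least one worker succeeded.
    			firstSuccessCancel()
    		}(i)
    	}

    	// Waiting in the background keeps the main path bound to the fastest
    	// success rather than the slowest worker.
    	go func() {
    		wg.Wait()
    		allDoneCancel()
    	}()

    	select {
    	case <-firstSuccessCtx.Done():
    		fmt.Println("at least one worker succeeded")
    	case <-allDoneCtx.Done():
    		// allDoneCancel runs only after wg.Wait returns, so errCount is
    		// stable here; re-check it because both contexts may fire close
    		// together.
    		if errCount == numWorkers {
    			fmt.Println("all workers failed")
    		} else {
    			fmt.Println("at least one worker succeeded")
    		}
    	}
    }

The select mirrors the patch's behaviour: returning on the first success keeps ERS bound to the fastest replica, while the all-done branch re-checks the error count before deciding whether every replica actually failed.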