From 7bb7dfe30fbad1385a135eb4741655c52a694c07 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 13:31:39 -0500 Subject: [PATCH 1/4] fix retry on doc-read failure --- internal/retry/retry.go | 4 ++++ internal/retry/retryer.go | 22 ++++++++++++++------ internal/verifier/compare.go | 39 ++++++++++++++++++++---------------- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 362d3f00..f0fed999 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -74,6 +74,10 @@ func (r *Retryer) runRetryLoop( sleepTime := minSleepTime for { + if beforeFunc, hasBefore := r.before.Get(); hasBefore { + beforeFunc() + } + eg, egCtx := errgroup.WithContext(ctx) for i, curFunc := range funcs { diff --git a/internal/retry/retryer.go b/internal/retry/retryer.go index 15ba15d9..c8165c82 100644 --- a/internal/retry/retryer.go +++ b/internal/retry/retryer.go @@ -2,24 +2,27 @@ package retry import ( "time" + + "github.com/10gen/migration-verifier/option" ) // Retryer handles retrying operations that fail because of network failures. type Retryer struct { retryLimit time.Duration retryRandomly bool + before option.Option[func()] additionalErrorCodes []int } // New returns a new retryer. -func New(retryLimit time.Duration) Retryer { +func New(retryLimit time.Duration) *Retryer { return NewWithRandomlyRetries(retryLimit, false) } // NewWithRandomlyRetries returns a new retryer, but allows the option of setting the // retryRandomly field. -func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) Retryer { - return Retryer{ +func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) *Retryer { + return &Retryer{ retryLimit: retryLimit, retryRandomly: retryRandomly, } @@ -29,9 +32,16 @@ func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) Retrye // this method. This allows for a single function to customize the codes it // wants to retry on. Note that if the Retryer already has additional custom // error codes set, these are _replaced_ when this method is called. -func (r Retryer) WithErrorCodes(codes ...int) Retryer { - r2 := r +func (r *Retryer) WithErrorCodes(codes ...int) *Retryer { + r2 := *r r2.additionalErrorCodes = codes - return r2 + return &r2 +} + +func (r *Retryer) WithBefore(todo func()) *Retryer { + r2 := *r + r2.before = option.Some(todo) + + return &r2 } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index e619255c..16ba9d95 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -25,7 +25,8 @@ func (verifier *Verifier) FetchAndCompareDocuments( types.ByteCount, error, ) { - srcChannel, dstChannel, readSrcCallback, readDstCallback := verifier.getFetcherChannelsAndCallbacks(task) + var srcChannel, dstChannel <-chan bson.Raw + var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error results := []VerificationResult{} var docCount types.DocumentCount @@ -33,23 +34,27 @@ func (verifier *Verifier) FetchAndCompareDocuments( retryer := retry.New(retry.DefaultDurationLimit) - err := retryer.Run( - givenCtx, - verifier.logger, - readSrcCallback, - readDstCallback, - func(ctx context.Context, _ *retry.FuncInfo) error { - var err error - results, docCount, byteCount, err = verifier.compareDocsFromChannels( - ctx, - task, - srcChannel, - dstChannel, - ) + err := retryer. + WithBefore(func() { + srcChannel, dstChannel, readSrcCallback, readDstCallback = verifier.getFetcherChannelsAndCallbacks(task) + }). + Run( + givenCtx, + verifier.logger, + readSrcCallback, + readDstCallback, + func(ctx context.Context, _ *retry.FuncInfo) error { + var err error + results, docCount, byteCount, err = verifier.compareDocsFromChannels( + ctx, + task, + srcChannel, + dstChannel, + ) - return err - }, - ) + return err + }, + ) return results, docCount, byteCount, err } From 006d3f9970fc20a1546c075bf05d983f79ba3a9f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 14:23:22 -0500 Subject: [PATCH 2/4] pointers --- internal/partitions/partitions.go | 6 +++--- internal/uuidutil/get_uuid.go | 4 ++-- internal/verifier/mongos_refresh.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/partitions/partitions.go b/internal/partitions/partitions.go index 278f875b..01419808 100644 --- a/internal/partitions/partitions.go +++ b/internal/partitions/partitions.go @@ -119,7 +119,7 @@ const ( func PartitionCollectionWithSize( ctx context.Context, uuidEntry *uuidutil.NamespaceAndUUID, - retryer retry.Retryer, + retryer *retry.Retryer, srcClient *mongo.Client, replicatorList []Replicator, subLogger *logger.Logger, @@ -137,7 +137,7 @@ func PartitionCollectionWithSize( partitions, docCount, byteCount, err := PartitionCollectionWithParameters( ctx, uuidEntry, - &retryer, + retryer, srcClient, replicatorList, defaultSampleRate, @@ -153,7 +153,7 @@ func PartitionCollectionWithSize( return PartitionCollectionWithParameters( ctx, uuidEntry, - &retryer, + retryer, srcClient, replicatorList, defaultSampleRate, diff --git a/internal/uuidutil/get_uuid.go b/internal/uuidutil/get_uuid.go index a8a7e576..86996933 100644 --- a/internal/uuidutil/get_uuid.go +++ b/internal/uuidutil/get_uuid.go @@ -27,7 +27,7 @@ type NamespaceAndUUID struct { CollName string } -func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, retryer retry.Retryer, db *mongo.Database, collName string) (*NamespaceAndUUID, error) { +func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*NamespaceAndUUID, error) { binaryUUID, uuidErr := GetCollectionUUID(ctx, logger, retryer, db, collName) if uuidErr != nil { return nil, uuidErr @@ -39,7 +39,7 @@ func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, r }, nil } -func GetCollectionUUID(ctx context.Context, logger *logger.Logger, retryer retry.Retryer, db *mongo.Database, collName string) (*primitive.Binary, error) { +func GetCollectionUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*primitive.Binary, error) { filter := bson.D{{"name", collName}} opts := options.ListCollections().SetNameOnly(false) diff --git a/internal/verifier/mongos_refresh.go b/internal/verifier/mongos_refresh.go index 5b12d6aa..ec8fa70f 100644 --- a/internal/verifier/mongos_refresh.go +++ b/internal/verifier/mongos_refresh.go @@ -137,7 +137,7 @@ func RefreshAllMongosInstances( func getAnyExistingShardConnectionStr( ctx context.Context, l *logger.Logger, - r retry.Retryer, + r *retry.Retryer, client *mongo.Client, ) (string, error) { res, err := runListShards(ctx, l, r, client) @@ -169,7 +169,7 @@ func getAnyExistingShardConnectionStr( func runListShards( ctx context.Context, l *logger.Logger, - r retry.Retryer, + r *retry.Retryer, client *mongo.Client, ) (*mongo.SingleResult, error) { var res *mongo.SingleResult From e21f98fe88af0f0ac28e451ef605662f1b48f066 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 15:25:49 -0500 Subject: [PATCH 3/4] panics & fix --- internal/retry/retry.go | 13 +++++++++++-- internal/verifier/compare.go | 8 ++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index f0fed999..832b4cf6 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -44,9 +44,9 @@ type RetryCallback = func(context.Context, *FuncInfo) error // This returns an error if the duration limit is reached, or if f() returns a // non-transient error. func (r *Retryer) Run( - ctx context.Context, logger *logger.Logger, f ...RetryCallback, + ctx context.Context, logger *logger.Logger, funcs ...RetryCallback, ) error { - return r.runRetryLoop(ctx, logger, f) + return r.runRetryLoop(ctx, logger, funcs) } // runRetryLoop contains the core logic for the retry loops. @@ -80,8 +80,17 @@ func (r *Retryer) runRetryLoop( eg, egCtx := errgroup.WithContext(ctx) for i, curFunc := range funcs { + if curFunc == nil { + panic("curFunc should be non-nil") + } eg.Go(func() error { + if curFunc == nil { + panic("curFunc should be non-nil") + } + if funcinfos[i] == nil { + panic(fmt.Sprintf("funcinfos[%d] should be non-nil", i)) + } err := curFunc(egCtx, funcinfos[i]) if err != nil { diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 16ba9d95..802d0bae 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -41,8 +41,12 @@ func (verifier *Verifier) FetchAndCompareDocuments( Run( givenCtx, verifier.logger, - readSrcCallback, - readDstCallback, + func(ctx context.Context, fi *retry.FuncInfo) error { + return readSrcCallback(ctx, fi) + }, + func(ctx context.Context, fi *retry.FuncInfo) error { + return readDstCallback(ctx, fi) + }, func(ctx context.Context, _ *retry.FuncInfo) error { var err error results, docCount, byteCount, err = verifier.compareDocsFromChannels( From 201ffe87e0e4c7a14ea0ca31609d4fd217427fce Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 19:29:36 -0500 Subject: [PATCH 4/4] move panics out --- internal/retry/retry.go | 9 +++------ internal/retry/retryer.go | 5 +++++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 832b4cf6..470da5b5 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -83,14 +83,11 @@ func (r *Retryer) runRetryLoop( if curFunc == nil { panic("curFunc should be non-nil") } + if funcinfos[i] == nil { + panic(fmt.Sprintf("funcinfos[%d] should be non-nil", i)) + } eg.Go(func() error { - if curFunc == nil { - panic("curFunc should be non-nil") - } - if funcinfos[i] == nil { - panic(fmt.Sprintf("funcinfos[%d] should be non-nil", i)) - } err := curFunc(egCtx, funcinfos[i]) if err != nil { diff --git a/internal/retry/retryer.go b/internal/retry/retryer.go index c8165c82..6c269113 100644 --- a/internal/retry/retryer.go +++ b/internal/retry/retryer.go @@ -39,6 +39,11 @@ func (r *Retryer) WithErrorCodes(codes ...int) *Retryer { return &r2 } +// WithBefore sets a callback that always runs before any retryer callback. +// +// This is useful if there are multiple callbacks and you need to reset some +// condition before each retryer iteration. (In the single-callback case it’s +// largely redundant.) func (r *Retryer) WithBefore(todo func()) *Retryer { r2 := *r r2.before = option.Some(todo)