From 313f9e6d3055315da7bdca979e2401de6ba1559f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 14 Nov 2024 13:51:54 -0500 Subject: [PATCH 1/2] save --- internal/verifier/compare.go | 93 ++++++++++++++++++++++------- internal/verifier/unit_test_util.go | 2 +- 2 files changed, 72 insertions(+), 23 deletions(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index de020478..3d2fb698 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -4,6 +4,7 @@ import ( "bytes" "context" + "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -21,31 +22,43 @@ func (verifier *Verifier) FetchAndCompareDocuments( types.ByteCount, error, ) { - // This function spawns three threads: one to read from the source, - // another to read from the destination, and a third one to receive the - // docs from the other 2 threads and compare them. It’s done this way, - // rather than fetch-everything-then-compare, to minimize memory usage. - errGroup, ctx := errgroup.WithContext(givenCtx) - - srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task) - - results := []VerificationResult{} + var results []VerificationResult var docCount types.DocumentCount var byteCount types.ByteCount - errGroup.Go(func() error { - var err error - results, docCount, byteCount, err = verifier.compareDocsFromChannels( - ctx, - task, - srcChannel, - dstChannel, - ) + retryer := retry.New(retry.DefaultDurationLimit) - return err - }) + err := retryer.RunForTransientErrorsOnly( + givenCtx, + verifier.logger, + func(_ *retry.Info) error { + results = []VerificationResult{} + docCount = 0 + byteCount = 0 + + // This function spawns three threads: one to read from the source, + // another to read from the destination, and a third one to receive the + // docs from the other 2 threads and compare them. It’s done this way, + // rather than fetch-everything-then-compare, to minimize memory usage. + errGroup, ctx := errgroup.WithContext(givenCtx) + + srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task) - err := errGroup.Wait() + errGroup.Go(func() error { + var err error + results, docCount, byteCount, err = verifier.compareDocsFromChannels( + ctx, + task, + srcChannel, + dstChannel, + ) + + return err + }) + + return errGroup.Wait() + }, + ) return results, docCount, byteCount, err } @@ -150,6 +163,14 @@ func (verifier *Verifier) compareDocsFromChannels( byteCount += types.ByteCount(len(doc)) err = handleNewDoc(doc, true) + + if err != nil { + err = errors.Wrapf( + err, + "comparer thread failed to handle source doc with ID %v", + doc.Lookup("_id"), + ) + } } } @@ -164,10 +185,22 @@ func (verifier *Verifier) compareDocsFromChannels( } err = handleNewDoc(doc, false) + + if err != nil { + err = errors.Wrapf( + err, + "comparer thread failed to handle destination doc with ID %v", + doc.Lookup("_id"), + ) + } } } } + if err != nil { + return nil, 0, 0, errors.Wrap(err, "comparer thread failed") + } + // We got here because both srcChannel and dstChannel are closed, // which means we have processed all documents with the same mapKey // between source & destination. @@ -225,7 +258,15 @@ func (verifier *Verifier) getFetcherChannels( ) if err == nil { - err = iterateCursorToChannel(ctx, cursor, srcChannel) + err = errors.Wrap( + iterateCursorToChannel(ctx, cursor, srcChannel), + "failed to read source documents", + ) + } else { + err = errors.Wrap( + err, + "failed to find source documents", + ) } return err @@ -241,7 +282,15 @@ func (verifier *Verifier) getFetcherChannels( ) if err == nil { - err = iterateCursorToChannel(ctx, cursor, dstChannel) + err = errors.Wrap( + iterateCursorToChannel(ctx, cursor, dstChannel), + "failed to read destination documents", + ) + } else { + err = errors.Wrap( + err, + "failed to find destination documents", + ) } return err diff --git a/internal/verifier/unit_test_util.go b/internal/verifier/unit_test_util.go index 31db1674..806bc3ff 100644 --- a/internal/verifier/unit_test_util.go +++ b/internal/verifier/unit_test_util.go @@ -123,7 +123,7 @@ func (suite *WithMongodsTestSuite) startReplSets() { }, } err = directClient.Database("admin").RunCommand(ctx, command).Err() - suite.Require().NoError(err) + suite.Require().NoError(err, "should initiate replication") } } From 5c035cf8ed0e04ea894e2329e74ae4f673987aab Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 14 Nov 2024 14:21:44 -0500 Subject: [PATCH 2/2] check for errors --- internal/verifier/compare.go | 57 ++++++++++++++---------------------- 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 3d2fb698..3e786b38 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -4,7 +4,6 @@ import ( "bytes" "context" - "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -22,43 +21,31 @@ func (verifier *Verifier) FetchAndCompareDocuments( types.ByteCount, error, ) { - var results []VerificationResult + // This function spawns three threads: one to read from the source, + // another to read from the destination, and a third one to receive the + // docs from the other 2 threads and compare them. It’s done this way, + // rather than fetch-everything-then-compare, to minimize memory usage. + errGroup, ctx := errgroup.WithContext(givenCtx) + + srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task) + + results := []VerificationResult{} var docCount types.DocumentCount var byteCount types.ByteCount - retryer := retry.New(retry.DefaultDurationLimit) - - err := retryer.RunForTransientErrorsOnly( - givenCtx, - verifier.logger, - func(_ *retry.Info) error { - results = []VerificationResult{} - docCount = 0 - byteCount = 0 - - // This function spawns three threads: one to read from the source, - // another to read from the destination, and a third one to receive the - // docs from the other 2 threads and compare them. It’s done this way, - // rather than fetch-everything-then-compare, to minimize memory usage. - errGroup, ctx := errgroup.WithContext(givenCtx) - - srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task) - - errGroup.Go(func() error { - var err error - results, docCount, byteCount, err = verifier.compareDocsFromChannels( - ctx, - task, - srcChannel, - dstChannel, - ) - - return err - }) - - return errGroup.Wait() - }, - ) + errGroup.Go(func() error { + var err error + results, docCount, byteCount, err = verifier.compareDocsFromChannels( + ctx, + task, + srcChannel, + dstChannel, + ) + + return err + }) + + err := errGroup.Wait() return results, docCount, byteCount, err }