diff --git a/internal/reportutils/reportutils.go b/internal/reportutils/reportutils.go index cb7b348d..e3e34d12 100644 --- a/internal/reportutils/reportutils.go +++ b/internal/reportutils/reportutils.go @@ -183,5 +183,6 @@ func FindBestUnit[T num16Plus](count T) DataUnit { // FmtBytes is a convenience that combines BytesToUnit with FindBestUnit. // Use it to format a single count of bytes. func FmtBytes[T num16Plus](count T) string { - return BytesToUnit(count, FindBestUnit(count)) + unit := FindBestUnit(count) + return BytesToUnit(count, unit) + " " + string(unit) } diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 425a90d2..d65fd955 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -146,7 +146,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { err = nil } - if err != nil { + if err == nil { verifier.logger.Debug(). Int("generation", generation). Msgf("Check finished.") @@ -432,11 +432,6 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { duration := verifier.workerSleepDelayMillis * time.Millisecond if duration > 0 { - verifier.logger.Debug(). - Int("workerNum", workerNum). - Stringer("duration", duration). - Msg("No tasks found. Sleeping.") - time.Sleep(duration) } @@ -453,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { switch task.Type { case verificationTaskVerifyCollection: err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task) + verifier.workerTracker.Unset(workerNum) + if err != nil { return err } @@ -464,6 +461,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } case verificationTaskVerifyDocuments: err := verifier.ProcessVerifyTask(ctx, workerNum, task) + verifier.workerTracker.Unset(workerNum) + if err != nil { return err } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index d6a50324..f4d852e8 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels( error, ) { results := []VerificationResult{} - var docCount types.DocumentCount - var byteCount types.ByteCount + var srcDocCount types.DocumentCount + var srcByteCount types.ByteCount mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys)) mapKeyFieldNames[0] = "_id" @@ -142,67 +142,95 @@ func (verifier *Verifier) compareDocsFromChannels( } }() - // We always read src & dst back & forth. This ensures that, if one side + // We always read src & dst together. This ensures that, if one side // lags the other significantly, we won’t keep caching the faster side’s // documents and thus consume more & more memory. 
for !srcClosed || !dstClosed { + simpleTimerReset(readTimer, readTimeout) + + var srcDoc, dstDoc bson.Raw + + eg, egCtx := errgroup.WithContext(ctx) + if !srcClosed { - simpleTimerReset(readTimer, readTimeout) - - select { - case <-ctx.Done(): - return nil, 0, 0, ctx.Err() - case <-readTimer.C: - return nil, 0, 0, errors.Errorf( - "failed to read from source after %s", - readTimeout, - ) - case doc, alive := <-srcChannel: - if !alive { - srcClosed = true - break + eg.Go(func() error { + var alive bool + select { + case <-egCtx.Done(): + return egCtx.Err() + case <-readTimer.C: + return errors.Errorf( + "failed to read from source after %s", + readTimeout, + ) + case srcDoc, alive = <-srcChannel: + if !alive { + srcClosed = true + break + } + + srcDocCount++ + srcByteCount += types.ByteCount(len(srcDoc)) } - docCount++ - byteCount += types.ByteCount(len(doc)) - err := handleNewDoc(doc, true) + return nil + }) + } - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle source doc with ID %v", - doc.Lookup("_id"), + if !dstClosed { + eg.Go(func() error { + var alive bool + select { + case <-egCtx.Done(): + return egCtx.Err() + case <-readTimer.C: + return errors.Errorf( + "failed to read from destination after %s", + readTimeout, ) + case dstDoc, alive = <-dstChannel: + if !alive { + dstClosed = true + break + } } - } + + return nil + }) } - if !dstClosed { - simpleTimerReset(readTimer, readTimeout) - - select { - case <-ctx.Done(): - return nil, 0, 0, ctx.Err() - case <-readTimer.C: - return nil, 0, 0, errors.Errorf( - "failed to read from destination after %s", - readTimeout, + if err := eg.Wait(); err != nil { + return nil, 0, 0, errors.Wrap( + err, + "failed to read documents", + ) + } + + if srcDoc != nil { + err := handleNewDoc(srcDoc, true) + + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's source doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + srcDoc.Lookup("_id"), ) - case doc, alive := <-dstChannel: - if !alive { - dstClosed = true - break - } + } + } - err := handleNewDoc(doc, false) + if dstDoc != nil { + err := handleNewDoc(dstDoc, false) - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle destination doc with ID %v", - doc.Lookup("_id"), - ) - } + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + dstDoc.Lookup("_id"), + ) } } } @@ -243,7 +271,7 @@ func (verifier *Verifier) compareDocsFromChannels( ) } - return results, docCount, byteCount, nil + return results, srcDocCount, srcByteCount, nil } func simpleTimerReset(t *time.Timer, dur time.Duration) { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 9b056a51..2fc8fe23 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -503,11 +503,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo } } findCmd := append(bson.D{{"find", collection.Name()}}, findOptions...) - verifier.logger.Debug(). - Interface("task", task.PrimaryKey). - Str("findCmd", fmt.Sprintf("%s", findCmd)). - Str("options", fmt.Sprintf("%v", *runCommandOptions)). - Msg("getDocuments findCmd.") + + // Suppress this log for recheck tasks because the list of IDs can be + // quite long. + if len(task.Ids) == 0 { + verifier.logger.Debug(). + Interface("task", task.PrimaryKey). 
+ Str("findCmd", fmt.Sprintf("%s", findCmd)). + Str("options", fmt.Sprintf("%v", *runCommandOptions)). + Msg("getDocuments findCmd.") + } return collection.Database().RunCommandCursor(ctx, findCmd, runCommandOptions) } @@ -595,16 +600,6 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, Interface("task", task.PrimaryKey). Msg("Processing document comparison task.") - defer func() { - elapsed := time.Since(start) - - verifier.logger.Debug(). - Int("workerNum", workerNum). - Interface("task", task.PrimaryKey). - Stringer("timeElapsed", elapsed). - Msg("Finished document comparison task.") - }() - problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments( ctx, task, @@ -681,12 +676,24 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, } } - return errors.Wrapf( - verifier.UpdateVerificationTask(ctx, task), - "failed to persist task %s's new status (%#q)", - task.PrimaryKey, - task.Status, - ) + err = verifier.UpdateVerificationTask(ctx, task) + + if err != nil { + return errors.Wrapf( + err, + "failed to persist task %s's new status (%#q)", + task.PrimaryKey, + task.Status, + ) + } + + verifier.logger.Debug(). + Int("workerNum", workerNum). + Interface("task", task.PrimaryKey). + Stringer("timeElapsed", time.Since(start)). + Msg("Finished document comparison task.") + + return nil } func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) { diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index a8fae37d..5c319434 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -574,7 +574,7 @@ func TestVerifierCompareDocs(t *testing.T) { {{"_id", id}, {"sharded", 123}}, }, compareFn: func(t *testing.T, mismatchedIds []VerificationResult) { - assert.Empty(t, mismatchedIds) + assert.Empty(t, mismatchedIds, "should be no problems") }, }, } @@ -1491,7 +1491,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { status := waitForTasks() suite.Require().Greater(status.CompletedTasks, 1) suite.Require().Greater(status.TotalTasks, 1) - suite.Require().Equal(status.FailedTasks, 0) + suite.Require().Zero(status.FailedTasks, "there should be no failed tasks") // Insert another document that is not in the filter. // This should trigger a recheck despite being outside the filter.