3 changes: 2 additions & 1 deletion internal/reportutils/reportutils.go
@@ -183,5 +183,6 @@ func FindBestUnit[T num16Plus](count T) DataUnit {
// FmtBytes is a convenience that combines BytesToUnit with FindBestUnit.
// Use it to format a single count of bytes.
func FmtBytes[T num16Plus](count T) string {
return BytesToUnit(count, FindBestUnit(count))
unit := FindBestUnit(count)
return BytesToUnit(count, unit) + " " + string(unit)
}
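
Note on the FmtBytes change: the helper now appends the unit's name to the formatted figure instead of returning the bare number. A minimal runnable sketch of the before/after behavior — the simplified findBestUnit/bytesToUnit helpers and the "KiB"/"MiB" labels below are illustrative stand-ins, not the repo's actual generic implementations:

```go
package main

import "fmt"

// DataUnit stands in for the repo's unit type; the real generic
// FindBestUnit/BytesToUnit helpers and unit names are assumptions here.
type DataUnit string

const (
	KiB DataUnit = "KiB"
	MiB DataUnit = "MiB"
)

func findBestUnit(count int64) DataUnit {
	if count >= 1024*1024 {
		return MiB
	}
	return KiB
}

func bytesToUnit(count int64, unit DataUnit) string {
	div := int64(1024)
	if unit == MiB {
		div = 1024 * 1024
	}
	return fmt.Sprintf("%.2f", float64(count)/float64(div))
}

func main() {
	count := int64(1536)
	unit := findBestUnit(count)

	// Old behavior: bare number, e.g. "1.50".
	fmt.Println(bytesToUnit(count, unit))

	// New behavior: number plus unit label, e.g. "1.50 KiB".
	fmt.Println(bytesToUnit(count, unit) + " " + string(unit))
}
```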
11 changes: 5 additions & 6 deletions internal/verifier/check.go
@@ -146,7 +146,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
err = nil
}

if err != nil {
if err == nil {
verifier.logger.Debug().
Int("generation", generation).
Msgf("Check finished.")
@@ -432,11 +432,6 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
duration := verifier.workerSleepDelayMillis * time.Millisecond

if duration > 0 {
verifier.logger.Debug().
Int("workerNum", workerNum).
Stringer("duration", duration).
Msg("No tasks found. Sleeping.")

time.Sleep(duration)
}

@@ -453,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
switch task.Type {
case verificationTaskVerifyCollection:
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
verifier.workerTracker.Unset(workerNum)

if err != nil {
return err
}
Expand All @@ -464,6 +461,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
}
case verificationTaskVerifyDocuments:
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
verifier.workerTracker.Unset(workerNum)

if err != nil {
return err
}
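
Note on the workerTracker.Unset calls: both task-processing branches now clear the worker's tracker entry as soon as the handler returns, before the error check, so a failed task can't leave a stale entry behind. A sketch of that pattern with a hypothetical mutex-guarded tracker (the verifier's real workerTracker API does not appear in this diff):

```go
package main

import (
	"fmt"
	"sync"
)

// workerTracker is a hypothetical stand-in: it records which task each
// worker is currently processing, e.g. for progress reporting.
type workerTracker struct {
	mu    sync.Mutex
	tasks map[int]string
}

func newWorkerTracker() *workerTracker {
	return &workerTracker{tasks: map[int]string{}}
}

func (t *workerTracker) Set(workerNum int, task string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.tasks[workerNum] = task
}

// Unset clears the worker's entry; calling it right after the handler
// returns, regardless of error, keeps the tracker from reporting a task
// the worker is no longer running.
func (t *workerTracker) Unset(workerNum int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.tasks, workerNum)
}

// process stands in for ProcessVerifyTask / ProcessCollectionVerificationTask.
func process(task string) error { return nil }

func main() {
	tracker := newWorkerTracker()
	tracker.Set(1, "verifyDocuments")

	err := process("verifyDocuments")
	tracker.Unset(1) // unset before the error check, as in the diff

	if err != nil {
		fmt.Println("task failed:", err)
	}
}
```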
128 changes: 78 additions & 50 deletions internal/verifier/compare.go
@@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels(
error,
) {
results := []VerificationResult{}
var docCount types.DocumentCount
var byteCount types.ByteCount
var srcDocCount types.DocumentCount
var srcByteCount types.ByteCount

mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
mapKeyFieldNames[0] = "_id"
@@ -142,67 +142,95 @@
}
}()

// We always read src & dst back & forth. This ensures that, if one side
// We always read src & dst together. This ensures that, if one side
// lags the other significantly, we won’t keep caching the faster side’s
// documents and thus consume more & more memory.
for !srcClosed || !dstClosed {
simpleTimerReset(readTimer, readTimeout)

var srcDoc, dstDoc bson.Raw

eg, egCtx := errgroup.WithContext(ctx)

if !srcClosed {
simpleTimerReset(readTimer, readTimeout)

select {
case <-ctx.Done():
return nil, 0, 0, ctx.Err()
case <-readTimer.C:
return nil, 0, 0, errors.Errorf(
"failed to read from source after %s",
readTimeout,
)
case doc, alive := <-srcChannel:
if !alive {
srcClosed = true
break
eg.Go(func() error {
var alive bool
select {
case <-egCtx.Done():
return egCtx.Err()
case <-readTimer.C:
return errors.Errorf(
"failed to read from source after %s",
readTimeout,
)
case srcDoc, alive = <-srcChannel:
if !alive {
srcClosed = true
break
}

srcDocCount++
srcByteCount += types.ByteCount(len(srcDoc))
}
docCount++
byteCount += types.ByteCount(len(doc))

err := handleNewDoc(doc, true)
return nil
})
}

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle source doc with ID %v",
doc.Lookup("_id"),
if !dstClosed {
eg.Go(func() error {
var alive bool
select {
case <-egCtx.Done():
return egCtx.Err()
case <-readTimer.C:
return errors.Errorf(
"failed to read from destination after %s",
readTimeout,
)
case dstDoc, alive = <-dstChannel:
if !alive {
dstClosed = true
break
}
}
}

return nil
})
}

if !dstClosed {
simpleTimerReset(readTimer, readTimeout)

select {
case <-ctx.Done():
return nil, 0, 0, ctx.Err()
case <-readTimer.C:
return nil, 0, 0, errors.Errorf(
"failed to read from destination after %s",
readTimeout,
if err := eg.Wait(); err != nil {
return nil, 0, 0, errors.Wrap(
err,
"failed to read documents",
)
}

if srcDoc != nil {
err := handleNewDoc(srcDoc, true)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
srcDoc.Lookup("_id"),
)
case doc, alive := <-dstChannel:
if !alive {
dstClosed = true
break
}
}
}

err := handleNewDoc(doc, false)
if dstDoc != nil {
err := handleNewDoc(dstDoc, false)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle destination doc with ID %v",
doc.Lookup("_id"),
)
}
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
dstDoc.Lookup("_id"),
)
}
}
}
@@ -243,7 +271,7 @@ func (verifier *Verifier) compareDocsFromChannels(
)
}

return results, docCount, byteCount, nil
return results, srcDocCount, srcByteCount, nil
}

func simpleTimerReset(t *time.Timer, dur time.Duration) {
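
Note on the compareDocsFromChannels rewrite: instead of alternating blocking reads from source then destination, each loop iteration now launches one errgroup goroutine per side, so both channels are drained in lockstep and neither side's documents pile up in memory. A condensed, runnable sketch of the pattern, assuming golang.org/x/sync/errgroup; the names and string payloads are illustrative, not the verifier's:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// readPair reads at most one value from each channel per call, in parallel,
// so the faster side can never run far ahead of the slower one.
func readPair(
	ctx context.Context,
	srcCh, dstCh <-chan string,
	timeout time.Duration,
) (src, dst string, srcOK, dstOK bool, err error) {
	eg, egCtx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		select {
		case <-egCtx.Done():
			return egCtx.Err()
		case <-time.After(timeout):
			return fmt.Errorf("failed to read from source after %s", timeout)
		case src, srcOK = <-srcCh:
			return nil
		}
	})

	eg.Go(func() error {
		select {
		case <-egCtx.Done():
			return egCtx.Err()
		case <-time.After(timeout):
			return fmt.Errorf("failed to read from destination after %s", timeout)
		case dst, dstOK = <-dstCh:
			return nil
		}
	})

	// eg.Wait returns the first non-nil error; its context cancellation
	// unblocks the sibling goroutine, as with eg.Wait() in the diff.
	err = eg.Wait()
	return src, dst, srcOK, dstOK, err
}

func main() {
	srcCh := make(chan string, 1)
	dstCh := make(chan string, 1)
	srcCh <- "srcDoc"
	dstCh <- "dstDoc"

	src, dst, _, _, err := readPair(context.Background(), srcCh, dstCh, time.Second)
	fmt.Println(src, dst, err)
}
```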
49 changes: 28 additions & 21 deletions internal/verifier/migration_verifier.go
@@ -503,11 +503,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
}
}
findCmd := append(bson.D{{"find", collection.Name()}}, findOptions...)
verifier.logger.Debug().
Interface("task", task.PrimaryKey).
Str("findCmd", fmt.Sprintf("%s", findCmd)).
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
Msg("getDocuments findCmd.")

// Suppress this log for recheck tasks because the list of IDs can be
// quite long.
if len(task.Ids) == 0 {
verifier.logger.Debug().
Interface("task", task.PrimaryKey).
Str("findCmd", fmt.Sprintf("%s", findCmd)).
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
Msg("getDocuments findCmd.")
}

return collection.Database().RunCommandCursor(ctx, findCmd, runCommandOptions)
}
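
Note on the suppressed findCmd log: recheck tasks enumerate document IDs, and that list can be long enough to bloat a single debug line, hence the len(task.Ids) == 0 guard. An isolated sketch of the same guard with placeholder values (zerolog-style API, matching the logger calls in the diff):

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stderr)

	// Placeholders: a recheck task carries an explicit ID list; a scan
	// task does not.
	ids := []int{1, 2, 3}
	findCmd := `{"find": "coll"}`

	// Mirror of the diff's guard: only log the find command when the task
	// does not enumerate IDs, since recheck ID lists can be very long.
	if len(ids) == 0 {
		logger.Debug().
			Str("findCmd", findCmd).
			Msg("getDocuments findCmd.")
	}
}
```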
@@ -595,16 +600,6 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
Interface("task", task.PrimaryKey).
Msg("Processing document comparison task.")

defer func() {
elapsed := time.Since(start)

verifier.logger.Debug().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Stringer("timeElapsed", elapsed).
Msg("Finished document comparison task.")
}()

problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(
ctx,
task,
@@ -681,12 +676,24 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
}
}

return errors.Wrapf(
verifier.UpdateVerificationTask(ctx, task),
"failed to persist task %s's new status (%#q)",
task.PrimaryKey,
task.Status,
)
err = verifier.UpdateVerificationTask(ctx, task)

if err != nil {
return errors.Wrapf(
err,
"failed to persist task %s's new status (%#q)",
task.PrimaryKey,
task.Status,
)
}

verifier.logger.Debug().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Stringer("timeElapsed", time.Since(start)).
Msg("Finished document comparison task.")

return nil
}

func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) {
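
Note on the relocated "Finished" log in ProcessVerifyTask: it moves out of a defer to after UpdateVerificationTask, so the reported elapsed time includes persistence and the message is skipped when the status update fails. A small sketch of the new ordering, with placeholder names:

```go
package main

import (
	"fmt"
	"time"
)

// updateTask stands in for UpdateVerificationTask.
func updateTask() error { return nil }

// processTask mirrors the diff's new ordering: persist the task status
// first, then log completion, so the elapsed time covers persistence and
// a failed update skips the "Finished" message entirely.
func processTask() error {
	start := time.Now()

	// ... document comparison work would happen here ...

	if err := updateTask(); err != nil {
		return fmt.Errorf("failed to persist task status: %w", err)
	}

	fmt.Printf("Finished document comparison task (elapsed: %s)\n", time.Since(start))
	return nil
}

func main() {
	if err := processTask(); err != nil {
		fmt.Println(err)
	}
}
```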
4 changes: 2 additions & 2 deletions internal/verifier/migration_verifier_test.go
@@ -574,7 +574,7 @@ func TestVerifierCompareDocs(t *testing.T) {
{{"_id", id}, {"sharded", 123}},
},
compareFn: func(t *testing.T, mismatchedIds []VerificationResult) {
assert.Empty(t, mismatchedIds)
assert.Empty(t, mismatchedIds, "should be no problems")
},
},
}
@@ -1491,7 +1491,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
status := waitForTasks()
suite.Require().Greater(status.CompletedTasks, 1)
suite.Require().Greater(status.TotalTasks, 1)
suite.Require().Equal(status.FailedTasks, 0)
suite.Require().Zero(status.FailedTasks, "there should be no failed tasks")

// Insert another document that is not in the filter.
// This should trigger a recheck despite being outside the filter.