Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions internal/verifier/check.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -370,16 +370,30 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection,

func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.WaitGroup) {
defer wg.Done()
verifier.logger.Debug().Msgf("[Worker %d] Started", workerNum)

verifier.logger.Debug().
Int("workerNum", workerNum).
Msg("Worker started.")

defer verifier.logger.Debug().
Int("workerNum", workerNum).
Msg("Worker finished.")

for {
select {
case <-ctx.Done():
return
default:
task, err := verifier.FindNextVerifyTaskAndUpdate()
if errors.Is(err, mongo.ErrNoDocuments) {
verifier.logger.Debug().Msgf("[Worker %d] No tasks found, sleeping...", workerNum)
time.Sleep(verifier.workerSleepDelayMillis * time.Millisecond)
duration := verifier.workerSleepDelayMillis * time.Millisecond

verifier.logger.Debug().
Int("workerNum", workerNum).
Stringer("duration", duration).
Msg("No tasks found. Sleeping.")

time.Sleep(duration)
continue
} else if err != nil {
panic(err)
Expand Down
53 changes: 45 additions & 8 deletions internal/verifier/migration_verifier.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -459,7 +459,11 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
}
}
findCmd := append(bson.D{{"find", collection.Name()}}, findOptions...)
verifier.logger.Debug().Msgf("getDocuments findCmd: %s opts: %v", findCmd, *runCommandOptions)
verifier.logger.Debug().
Interface("task", task.PrimaryKey).
Str("findCmd", fmt.Sprintf("%s", findCmd)).
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
Msg("getDocuments findCmd.")

return collection.Database().RunCommandCursor(ctx, findCmd, runCommandOptions)
}
Expand Down Expand Up @@ -540,7 +544,22 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw
}

func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTask) {
verifier.logger.Debug().Msgf("[Worker %d] Processing verify task", workerNum)
start := time.Now()

verifier.logger.Debug().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Msg("Processing document comparison task.")

defer func() {
elapsed := time.Since(start)

verifier.logger.Debug().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Stringer("timeElapsed", elapsed).
Msg("Finished document comparison task.")
}()

problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(
context.Background(),
Expand All @@ -549,7 +568,11 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas

if err != nil {
task.Status = verificationTaskFailed
verifier.logger.Error().Msgf("[Worker %d] Error comparing docs: %+v", workerNum, err)
verifier.logger.Error().
Err(err).
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Msg("Failed to fetch and compare documents for document comparison task.")
} else {
task.SourceDocumentCount = docsCount
task.SourceByteCount = bytesCount
Expand All @@ -560,10 +583,15 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas
task.Status = verificationTaskFailed
// We know we won't change lastGeneration while verification tasks are running, so no mutex needed here.
if verifier.lastGeneration {
verifier.logger.Error().Msgf("[Worker %d] Verification Task %+v failed critical section and is a true error",
workerNum, task.PrimaryKey)
verifier.logger.Error().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Msg("Document comparison task failed critical section and is a true error.")
} else {
verifier.logger.Debug().Msgf("[Worker %d] Verification Task %+v failed, may pass next generation", workerNum, task.PrimaryKey)
verifier.logger.Debug().
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Msg("Document comparison task failed, but it may pass in the next generation.")

var mismatches []VerificationResult
var missingIds []interface{}
Expand Down Expand Up @@ -594,15 +622,24 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas
// mismatched & missing docs.
err := verifier.InsertFailedCompareRecheckDocs(task.QueryFilter.Namespace, idsToRecheck, dataSizes)
if err != nil {
verifier.logger.Error().Msgf("[Worker %d] Error inserting document mismatch into Recheck queue: %+v", workerNum, err)
verifier.logger.Error().
Err(err).
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Int("rechecksCount", len(idsToRecheck)).
Msg("Failed to enqueue rechecks after document mismatches.")
}
}
}
}

err = verifier.UpdateVerificationTask(task)
if err != nil {
verifier.logger.Error().Msgf("Failed updating verification status: %v", err)
verifier.logger.Error().
Err(err).
Int("workerNum", workerNum).
Interface("task", task.PrimaryKey).
Msg("Failed to update task status.")
}
}

Expand Down
Loading