diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index de020478..3e786b38 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -150,6 +150,14 @@ func (verifier *Verifier) compareDocsFromChannels( byteCount += types.ByteCount(len(doc)) err = handleNewDoc(doc, true) + + if err != nil { + err = errors.Wrapf( + err, + "comparer thread failed to handle source doc with ID %v", + doc.Lookup("_id"), + ) + } } } @@ -164,10 +172,22 @@ func (verifier *Verifier) compareDocsFromChannels( } err = handleNewDoc(doc, false) + + if err != nil { + err = errors.Wrapf( + err, + "comparer thread failed to handle destination doc with ID %v", + doc.Lookup("_id"), + ) + } } } } + if err != nil { + return nil, 0, 0, errors.Wrap(err, "comparer thread failed") + } + // We got here because both srcChannel and dstChannel are closed, // which means we have processed all documents with the same mapKey // between source & destination. @@ -225,7 +245,15 @@ func (verifier *Verifier) getFetcherChannels( ) if err == nil { - err = iterateCursorToChannel(ctx, cursor, srcChannel) + err = errors.Wrap( + iterateCursorToChannel(ctx, cursor, srcChannel), + "failed to read source documents", + ) + } else { + err = errors.Wrap( + err, + "failed to find source documents", + ) } return err @@ -241,7 +269,15 @@ func (verifier *Verifier) getFetcherChannels( ) if err == nil { - err = iterateCursorToChannel(ctx, cursor, dstChannel) + err = errors.Wrap( + iterateCursorToChannel(ctx, cursor, dstChannel), + "failed to read destination documents", + ) + } else { + err = errors.Wrap( + err, + "failed to find destination documents", + ) } return err diff --git a/internal/verifier/unit_test_util.go b/internal/verifier/unit_test_util.go index 31db1674..806bc3ff 100644 --- a/internal/verifier/unit_test_util.go +++ b/internal/verifier/unit_test_util.go @@ -123,7 +123,7 @@ func (suite *WithMongodsTestSuite) startReplSets() { }, } err = directClient.Database("admin").RunCommand(ctx, command).Err() - suite.Require().NoError(err) + suite.Require().NoError(err, "should initiate replication") } }