Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 111 additions & 52 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"iter"
"time"

"github.com/10gen/migration-verifier/chanutil"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mmongo/cursor"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/v2/bson"
Expand All @@ -30,6 +32,11 @@ const (
docKeyInHashedCompare = "k"
)

type seqWithTs struct {
seq iter.Seq2[bson.Raw, error]
ts bson.Timestamp
}

type docWithTs struct {
doc bson.Raw
ts bson.Timestamp
Expand All @@ -45,7 +52,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
types.ByteCount,
error,
) {
var srcChannel, dstChannel <-chan docWithTs
var srcChannel, dstChannel <-chan seqWithTs
var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error

results := []VerificationResult{}
Expand Down Expand Up @@ -100,7 +107,7 @@ func (verifier *Verifier) compareDocsFromChannels(
workerNum int,
fi *retry.FuncInfo,
task *VerificationTask,
srcChannel, dstChannel <-chan docWithTs,
srcChannel, dstChannel <-chan seqWithTs,
) (
[]VerificationResult,
types.DocumentCount,
Expand Down Expand Up @@ -204,7 +211,7 @@ func (verifier *Verifier) compareDocsFromChannels(
for !srcClosed || !dstClosed {
simpleTimerReset(readTimer, readTimeout)

var srcDocWithTs, dstDocWithTs docWithTs
var srcDocsWithTs, dstDocsWithTs seqWithTs

eg, egCtx := contextplus.ErrGroup(ctx)

Expand All @@ -219,21 +226,13 @@ func (verifier *Verifier) compareDocsFromChannels(
"failed to read from source after %s",
readTimeout,
)
case srcDocWithTs, alive = <-srcChannel:
case srcDocsWithTs, alive = <-srcChannel:
if !alive {
srcClosed = true
break
}

fi.NoteSuccess("received document from source")

srcDocCount++
srcByteCount += types.ByteCount(len(srcDocWithTs.doc))
verifier.workerTracker.SetSrcCounts(
workerNum,
srcDocCount,
srcByteCount,
)
}

return nil
Expand All @@ -251,7 +250,7 @@ func (verifier *Verifier) compareDocsFromChannels(
"failed to read from destination after %s",
readTimeout,
)
case dstDocWithTs, alive = <-dstChannel:
case dstDocsWithTs, alive = <-dstChannel:
if !alive {
dstClosed = true
break
Expand All @@ -271,32 +270,72 @@ func (verifier *Verifier) compareDocsFromChannels(
)
}

if srcDocWithTs.doc != nil {
err := handleNewDoc(srcDocWithTs, true)
if srcDocsWithTs.seq != nil {
for doc, err := range srcDocsWithTs.seq {
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"reading batch of docs from source (task: %s)",
task.PrimaryKey,
)
}

if err != nil {
srcDocCount++
srcByteCount += types.ByteCount(len(doc))
verifier.workerTracker.SetSrcCounts(
workerNum,
srcDocCount,
srcByteCount,
)

return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
srcDocWithTs.doc.Lookup("_id"),
err := handleNewDoc(
docWithTs{
doc: doc,
ts: srcDocsWithTs.ts,
},
true,
)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
doc.Lookup("_id"),
)
}
}

}

if dstDocWithTs.doc != nil {
err := handleNewDoc(dstDocWithTs, false)
if dstDocsWithTs.seq != nil {
for doc, err := range dstDocsWithTs.seq {
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"reading batch of docs from destination (task: %s)",
task.PrimaryKey,
)
}

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
dstDocWithTs.doc.Lookup("_id"),
err := handleNewDoc(
docWithTs{
doc: doc,
ts: dstDocsWithTs.ts,
},
false,
)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
doc.Lookup("_id"),
)
}
}
}
}
Expand Down Expand Up @@ -427,13 +466,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) {
func (verifier *Verifier) getFetcherChannelsAndCallbacks(
task *VerificationTask,
) (
<-chan docWithTs,
<-chan docWithTs,
<-chan seqWithTs,
<-chan seqWithTs,
func(context.Context, *retry.FuncInfo) error,
func(context.Context, *retry.FuncInfo) error,
) {
srcChannel := make(chan docWithTs)
dstChannel := make(chan docWithTs)
srcChannel := make(chan seqWithTs)
dstChannel := make(chan seqWithTs)

readSrcCallback := func(ctx context.Context, state *retry.FuncInfo) error {
// We open a session here so that we can read the session’s cluster
Expand Down Expand Up @@ -510,38 +549,44 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
}

func iterateCursorToChannel(
sctx context.Context,
ctx context.Context,
state *retry.FuncInfo,
cursor *mongo.Cursor,
writer chan<- docWithTs,
myCursor *cursor.BatchCursor,
writer chan<- seqWithTs,
) error {
defer close(writer)

sess := mongo.SessionFromContext(sctx)
for {
seq := myCursor.GetCurrentBatchIterator()

for cursor.Next(sctx) {
state.NoteSuccess("received a document")

clusterTime, err := util.GetClusterTimeFromSession(sess)
ct, err := myCursor.GetClusterTime()
if err != nil {
return errors.Wrap(err, "reading cluster time from session")
return errors.Wrap(err, "reading cluster time from batch")
}

err = chanutil.WriteWithDoneCheck(
sctx,
ctx,
writer,
docWithTs{
doc: slices.Clone(cursor.Current),
ts: clusterTime,
seqWithTs{
seq: seq,
ts: ct,
},
)

if err != nil {
return errors.Wrapf(err, "sending document to compare thread")
return errors.Wrapf(err, "sending iterator to compare thread")
}

if myCursor.IsFinished() {
return nil
}
}

return errors.Wrap(cursor.Err(), "failed to iterate cursor")
if err := myCursor.GetNext(ctx); err != nil {
return errors.Wrap(err, "failed to iterate cursor")
}
}
}

func getMapKey(docKeyValues []bson.RawValue) string {
Expand All @@ -555,8 +600,13 @@ func getMapKey(docKeyValues []bson.RawValue) string {
return keyBuffer.String()
}

func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
func (verifier *Verifier) getDocumentsCursor(
ctx context.Context,
collection *mongo.Collection,
clusterInfo *util.ClusterInfo,
startAtTs *bson.Timestamp,
task *VerificationTask,
) (*cursor.BatchCursor, error) {
var findOptions bson.D
runCommandOptions := options.RunCmd()
var andPredicates bson.A
Expand Down Expand Up @@ -673,7 +723,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
}
}

return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions)
c, err := cursor.New(
collection.Database(),
collection.Database().RunCommand(ctx, cmd, runCommandOptions),
)

if err == nil {
c.SetSession(mongo.SessionFromContext(ctx))
}
Comment on lines +731 to +733
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why SetSession is exported and called in the verifier code? Can we set the session in the cursor constructor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it that way for a bit, but then we’d need 2 constructors (one with a session & the other without).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m starting to wonder, though, if it should always require a session. The driver does some funny-business under the hood to ensure that cursors always send the same LSID as the initial request.


return c, err
}

func transformPipelineForToHashedIndexKey(
Expand Down
13 changes: 8 additions & 5 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mseq"
"github.com/10gen/migration-verifier/mslices"
"github.com/cespare/permute/v2"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -1150,13 +1151,15 @@ func TestVerifierCompareDocs(t *testing.T) {

namespace := "testdb.testns"

makeDocChannel := func(docs []bson.D) <-chan docWithTs {
theChan := make(chan docWithTs, len(docs))
makeDocChannel := func(docs []bson.D) <-chan seqWithTs {
theChan := make(chan seqWithTs, len(docs))

for d, doc := range docs {
theChan <- docWithTs{
doc: testutil.MustMarshal(doc),
ts: bson.Timestamp{1, uint32(d)},
theChan <- seqWithTs{
seq: mseq.FromSliceWithNilErr(
mslices.Of(testutil.MustMarshal(doc)),
),
ts: bson.Timestamp{1, uint32(d)},
}
}

Expand Down
Loading