Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 52 additions & 111 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"iter"
"time"

"github.com/10gen/migration-verifier/chanutil"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mmongo/cursor"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/v2/bson"
Expand All @@ -32,11 +30,6 @@ const (
docKeyInHashedCompare = "k"
)

type seqWithTs struct {
seq iter.Seq2[bson.Raw, error]
ts bson.Timestamp
}

type docWithTs struct {
doc bson.Raw
ts bson.Timestamp
Expand All @@ -52,7 +45,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
types.ByteCount,
error,
) {
var srcChannel, dstChannel <-chan seqWithTs
var srcChannel, dstChannel <-chan docWithTs
var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error

results := []VerificationResult{}
Expand Down Expand Up @@ -107,7 +100,7 @@ func (verifier *Verifier) compareDocsFromChannels(
workerNum int,
fi *retry.FuncInfo,
task *VerificationTask,
srcChannel, dstChannel <-chan seqWithTs,
srcChannel, dstChannel <-chan docWithTs,
) (
[]VerificationResult,
types.DocumentCount,
Expand Down Expand Up @@ -211,7 +204,7 @@ func (verifier *Verifier) compareDocsFromChannels(
for !srcClosed || !dstClosed {
simpleTimerReset(readTimer, readTimeout)

var srcDocsWithTs, dstDocsWithTs seqWithTs
var srcDocWithTs, dstDocWithTs docWithTs

eg, egCtx := contextplus.ErrGroup(ctx)

Expand All @@ -226,13 +219,21 @@ func (verifier *Verifier) compareDocsFromChannels(
"failed to read from source after %s",
readTimeout,
)
case srcDocsWithTs, alive = <-srcChannel:
case srcDocWithTs, alive = <-srcChannel:
if !alive {
srcClosed = true
break
}

fi.NoteSuccess("received document from source")

srcDocCount++
srcByteCount += types.ByteCount(len(srcDocWithTs.doc))
verifier.workerTracker.SetSrcCounts(
workerNum,
srcDocCount,
srcByteCount,
)
}

return nil
Expand All @@ -250,7 +251,7 @@ func (verifier *Verifier) compareDocsFromChannels(
"failed to read from destination after %s",
readTimeout,
)
case dstDocsWithTs, alive = <-dstChannel:
case dstDocWithTs, alive = <-dstChannel:
if !alive {
dstClosed = true
break
Expand All @@ -270,72 +271,32 @@ func (verifier *Verifier) compareDocsFromChannels(
)
}

if srcDocsWithTs.seq != nil {
for doc, err := range srcDocsWithTs.seq {
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"reading batch of docs from source (task: %s)",
task.PrimaryKey,
)
}
if srcDocWithTs.doc != nil {
err := handleNewDoc(srcDocWithTs, true)

srcDocCount++
srcByteCount += types.ByteCount(len(doc))
verifier.workerTracker.SetSrcCounts(
workerNum,
srcDocCount,
srcByteCount,
)
if err != nil {

err := handleNewDoc(
docWithTs{
doc: doc,
ts: srcDocsWithTs.ts,
},
true,
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
srcDocWithTs.doc.Lookup("_id"),
)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
doc.Lookup("_id"),
)
}
}

}

if dstDocsWithTs.seq != nil {
for doc, err := range dstDocsWithTs.seq {
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"reading batch of docs from destination (task: %s)",
task.PrimaryKey,
)
}
if dstDocWithTs.doc != nil {
err := handleNewDoc(dstDocWithTs, false)

err := handleNewDoc(
docWithTs{
doc: doc,
ts: dstDocsWithTs.ts,
},
false,
if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
dstDocWithTs.doc.Lookup("_id"),
)

if err != nil {
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
namespace,
task.PrimaryKey,
doc.Lookup("_id"),
)
}
}
}
}
Expand Down Expand Up @@ -466,13 +427,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) {
func (verifier *Verifier) getFetcherChannelsAndCallbacks(
task *VerificationTask,
) (
<-chan seqWithTs,
<-chan seqWithTs,
<-chan docWithTs,
<-chan docWithTs,
func(context.Context, *retry.FuncInfo) error,
func(context.Context, *retry.FuncInfo) error,
) {
srcChannel := make(chan seqWithTs)
dstChannel := make(chan seqWithTs)
srcChannel := make(chan docWithTs)
dstChannel := make(chan docWithTs)

readSrcCallback := func(ctx context.Context, state *retry.FuncInfo) error {
// We open a session here so that we can read the session’s cluster
Expand Down Expand Up @@ -549,44 +510,38 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
}

func iterateCursorToChannel(
ctx context.Context,
sctx context.Context,
state *retry.FuncInfo,
myCursor *cursor.BatchCursor,
writer chan<- seqWithTs,
cursor *mongo.Cursor,
writer chan<- docWithTs,
) error {
defer close(writer)

for {
seq := myCursor.GetCurrentBatchIterator()
sess := mongo.SessionFromContext(sctx)

for cursor.Next(sctx) {
state.NoteSuccess("received a document")

ct, err := myCursor.GetClusterTime()
clusterTime, err := util.GetClusterTimeFromSession(sess)
if err != nil {
return errors.Wrap(err, "reading cluster time from batch")
return errors.Wrap(err, "reading cluster time from session")
}

err = chanutil.WriteWithDoneCheck(
ctx,
sctx,
writer,
seqWithTs{
seq: seq,
ts: ct,
docWithTs{
doc: slices.Clone(cursor.Current),
ts: clusterTime,
},
)

if err != nil {
return errors.Wrapf(err, "sending iterator to compare thread")
}

if myCursor.IsFinished() {
return nil
}

if err := myCursor.GetNext(ctx); err != nil {
return errors.Wrap(err, "failed to iterate cursor")
return errors.Wrapf(err, "sending document to compare thread")
}
}

return errors.Wrap(cursor.Err(), "failed to iterate cursor")
}

func getMapKey(docKeyValues []bson.RawValue) string {
Expand All @@ -600,13 +555,8 @@ func getMapKey(docKeyValues []bson.RawValue) string {
return keyBuffer.String()
}

func (verifier *Verifier) getDocumentsCursor(
ctx context.Context,
collection *mongo.Collection,
clusterInfo *util.ClusterInfo,
startAtTs *bson.Timestamp,
task *VerificationTask,
) (*cursor.BatchCursor, error) {
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
var findOptions bson.D
runCommandOptions := options.RunCmd()
var andPredicates bson.A
Expand Down Expand Up @@ -723,16 +673,7 @@ func (verifier *Verifier) getDocumentsCursor(
}
}

c, err := cursor.New(
collection.Database(),
collection.Database().RunCommand(ctx, cmd, runCommandOptions),
)

if err == nil {
c.SetSession(mongo.SessionFromContext(ctx))
}

return c, err
return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions)
}

func transformPipelineForToHashedIndexKey(
Expand Down
13 changes: 5 additions & 8 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mseq"
"github.com/10gen/migration-verifier/mslices"
"github.com/cespare/permute/v2"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -1151,15 +1150,13 @@ func TestVerifierCompareDocs(t *testing.T) {

namespace := "testdb.testns"

makeDocChannel := func(docs []bson.D) <-chan seqWithTs {
theChan := make(chan seqWithTs, len(docs))
makeDocChannel := func(docs []bson.D) <-chan docWithTs {
theChan := make(chan docWithTs, len(docs))

for d, doc := range docs {
theChan <- seqWithTs{
seq: mseq.FromSliceWithNilErr(
mslices.Of(testutil.MustMarshal(doc)),
),
ts: bson.Timestamp{1, uint32(d)},
theChan <- docWithTs{
doc: testutil.MustMarshal(doc),
ts: bson.Timestamp{1, uint32(d)},
}
}

Expand Down
Loading