From fa5afd267c95c5ca11d3b358ca0e3d43a37d4dbc Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 3 Nov 2025 20:36:46 -0500 Subject: [PATCH] Revert "REP-6766 Compare documents using a batch cursor (#148)" This reverts commit d98f02c5a865f46f74bcf304deaf14ef186825b6. --- internal/verifier/compare.go | 163 ++++-------- internal/verifier/migration_verifier_test.go | 13 +- mmongo/cursor/batch.go | 252 ------------------- mseq/mseq.go | 29 --- 4 files changed, 57 insertions(+), 400 deletions(-) delete mode 100644 mmongo/cursor/batch.go delete mode 100644 mseq/mseq.go diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 8c97d43b..7cfcfa7a 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "iter" "time" "github.com/10gen/migration-verifier/chanutil" @@ -13,7 +12,6 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" - "github.com/10gen/migration-verifier/mmongo/cursor" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/v2/bson" @@ -32,11 +30,6 @@ const ( docKeyInHashedCompare = "k" ) -type seqWithTs struct { - seq iter.Seq2[bson.Raw, error] - ts bson.Timestamp -} - type docWithTs struct { doc bson.Raw ts bson.Timestamp @@ -52,7 +45,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( types.ByteCount, error, ) { - var srcChannel, dstChannel <-chan seqWithTs + var srcChannel, dstChannel <-chan docWithTs var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error results := []VerificationResult{} @@ -107,7 +100,7 @@ func (verifier *Verifier) compareDocsFromChannels( workerNum int, fi *retry.FuncInfo, task *VerificationTask, - srcChannel, dstChannel <-chan seqWithTs, + srcChannel, dstChannel <-chan docWithTs, ) ( []VerificationResult, types.DocumentCount, @@ -211,7 +204,7 @@ func (verifier *Verifier) compareDocsFromChannels( for !srcClosed || !dstClosed { simpleTimerReset(readTimer, readTimeout) - var srcDocsWithTs, dstDocsWithTs seqWithTs + var srcDocWithTs, dstDocWithTs docWithTs eg, egCtx := contextplus.ErrGroup(ctx) @@ -226,13 +219,21 @@ func (verifier *Verifier) compareDocsFromChannels( "failed to read from source after %s", readTimeout, ) - case srcDocsWithTs, alive = <-srcChannel: + case srcDocWithTs, alive = <-srcChannel: if !alive { srcClosed = true break } fi.NoteSuccess("received document from source") + + srcDocCount++ + srcByteCount += types.ByteCount(len(srcDocWithTs.doc)) + verifier.workerTracker.SetSrcCounts( + workerNum, + srcDocCount, + srcByteCount, + ) } return nil @@ -250,7 +251,7 @@ func (verifier *Verifier) compareDocsFromChannels( "failed to read from destination after %s", readTimeout, ) - case dstDocsWithTs, alive = <-dstChannel: + case dstDocWithTs, alive = <-dstChannel: if !alive { dstClosed = true break @@ -270,72 +271,32 @@ func (verifier *Verifier) compareDocsFromChannels( ) } - if srcDocsWithTs.seq != nil { - for doc, err := range srcDocsWithTs.seq { - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "reading batch of docs from source (task: %s)", - task.PrimaryKey, - ) - } + if srcDocWithTs.doc != nil { + err := handleNewDoc(srcDocWithTs, true) - srcDocCount++ - srcByteCount += types.ByteCount(len(doc)) - verifier.workerTracker.SetSrcCounts( - workerNum, - srcDocCount, - srcByteCount, - ) + if err != nil { - err := handleNewDoc( - docWithTs{ - doc: doc, - ts: srcDocsWithTs.ts, - }, - true, + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's source doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + srcDocWithTs.doc.Lookup("_id"), ) - - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle %#q's source doc (task: %s) with ID %v", - namespace, - task.PrimaryKey, - doc.Lookup("_id"), - ) - } } - } - if dstDocsWithTs.seq != nil { - for doc, err := range dstDocsWithTs.seq { - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "reading batch of docs from destination (task: %s)", - task.PrimaryKey, - ) - } + if dstDocWithTs.doc != nil { + err := handleNewDoc(dstDocWithTs, false) - err := handleNewDoc( - docWithTs{ - doc: doc, - ts: dstDocsWithTs.ts, - }, - false, + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + dstDocWithTs.doc.Lookup("_id"), ) - - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v", - namespace, - task.PrimaryKey, - doc.Lookup("_id"), - ) - } } } } @@ -466,13 +427,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) { func (verifier *Verifier) getFetcherChannelsAndCallbacks( task *VerificationTask, ) ( - <-chan seqWithTs, - <-chan seqWithTs, + <-chan docWithTs, + <-chan docWithTs, func(context.Context, *retry.FuncInfo) error, func(context.Context, *retry.FuncInfo) error, ) { - srcChannel := make(chan seqWithTs) - dstChannel := make(chan seqWithTs) + srcChannel := make(chan docWithTs) + dstChannel := make(chan docWithTs) readSrcCallback := func(ctx context.Context, state *retry.FuncInfo) error { // We open a session here so that we can read the session’s cluster @@ -549,44 +510,38 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( } func iterateCursorToChannel( - ctx context.Context, + sctx context.Context, state *retry.FuncInfo, - myCursor *cursor.BatchCursor, - writer chan<- seqWithTs, + cursor *mongo.Cursor, + writer chan<- docWithTs, ) error { defer close(writer) - for { - seq := myCursor.GetCurrentBatchIterator() + sess := mongo.SessionFromContext(sctx) + for cursor.Next(sctx) { state.NoteSuccess("received a document") - ct, err := myCursor.GetClusterTime() + clusterTime, err := util.GetClusterTimeFromSession(sess) if err != nil { - return errors.Wrap(err, "reading cluster time from batch") + return errors.Wrap(err, "reading cluster time from session") } err = chanutil.WriteWithDoneCheck( - ctx, + sctx, writer, - seqWithTs{ - seq: seq, - ts: ct, + docWithTs{ + doc: slices.Clone(cursor.Current), + ts: clusterTime, }, ) if err != nil { - return errors.Wrapf(err, "sending iterator to compare thread") - } - - if myCursor.IsFinished() { - return nil - } - - if err := myCursor.GetNext(ctx); err != nil { - return errors.Wrap(err, "failed to iterate cursor") + return errors.Wrapf(err, "sending document to compare thread") } } + + return errors.Wrap(cursor.Err(), "failed to iterate cursor") } func getMapKey(docKeyValues []bson.RawValue) string { @@ -600,13 +555,8 @@ func getMapKey(docKeyValues []bson.RawValue) string { return keyBuffer.String() } -func (verifier *Verifier) getDocumentsCursor( - ctx context.Context, - collection *mongo.Collection, - clusterInfo *util.ClusterInfo, - startAtTs *bson.Timestamp, - task *VerificationTask, -) (*cursor.BatchCursor, error) { +func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, + startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() var andPredicates bson.A @@ -723,16 +673,7 @@ func (verifier *Verifier) getDocumentsCursor( } } - c, err := cursor.New( - collection.Database(), - collection.Database().RunCommand(ctx, cmd, runCommandOptions), - ) - - if err == nil { - c.SetSession(mongo.SessionFromContext(ctx)) - } - - return c, err + return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions) } func transformPipelineForToHashedIndexKey( diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index b4e35638..92ab6bf3 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -26,7 +26,6 @@ import ( "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mseq" "github.com/10gen/migration-verifier/mslices" "github.com/cespare/permute/v2" "github.com/rs/zerolog" @@ -1151,15 +1150,13 @@ func TestVerifierCompareDocs(t *testing.T) { namespace := "testdb.testns" - makeDocChannel := func(docs []bson.D) <-chan seqWithTs { - theChan := make(chan seqWithTs, len(docs)) + makeDocChannel := func(docs []bson.D) <-chan docWithTs { + theChan := make(chan docWithTs, len(docs)) for d, doc := range docs { - theChan <- seqWithTs{ - seq: mseq.FromSliceWithNilErr( - mslices.Of(testutil.MustMarshal(doc)), - ), - ts: bson.Timestamp{1, uint32(d)}, + theChan <- docWithTs{ + doc: testutil.MustMarshal(doc), + ts: bson.Timestamp{1, uint32(d)}, } } diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go deleted file mode 100644 index 3b900507..00000000 --- a/mmongo/cursor/batch.go +++ /dev/null @@ -1,252 +0,0 @@ -// Package cursor exposes a cursor implementation that facilitates easy -// batch reads as well as reading of custom cursor properties like -// resume tokens. -package cursor - -import ( - "context" - "fmt" - "io" - "iter" - "strings" - "time" - - "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/option" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/v2/bson" - "go.mongodb.org/mongo-driver/v2/mongo" - "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" -) - -var ( - clusterTimePath = mslices.Of("$clusterTime", "clusterTime") -) - -// BatchCursor is like mongo.Cursor, but it exposes documents per batch rather than -// a per-document reader. This facilitates more efficient memory usage -// because there is no need to clone each document individually, as -// mongo.Cursor requires. -type BatchCursor struct { - sess *mongo.Session - maxAwaitTime option.Option[time.Duration] - id int64 - ns string - db *mongo.Database - rawResp bson.Raw - curBatch bson.RawArray -} - -// GetCurrentBatchIterator returns an iterator over the BatchCursor’s current batch. -// Note that the iteratees are NOT copied; the expectation is that each batch -// will be iterated exactly once. (Nothing *requires* that, of course.) -func (c *BatchCursor) GetCurrentBatchIterator() iter.Seq2[bson.Raw, error] { - // NB: Use of iter.Seq2 to return an error is a bit controversial. - // The pattern is used here in order to minimize the odds that a caller - // would overlook the need to check the error, which seems more probable - // with various other patterns. - // - // See “https://sinclairtarget.com/blog/2025/07/error-handling-with-iterators-in-go/”. - - batch := c.curBatch - - // NB: This MUST NOT close around c (the receiver), or else there can be - // a race condition between this callback and GetNext(). - return func(yield func(bson.Raw, error) bool) { - iterator := &bsoncore.Iterator{ - List: bsoncore.Array(batch), - } - - for { - val, err := iterator.Next() - if errors.Is(err, io.EOF) { - return - } - - doc, ok := val.DocumentOK() - if !ok { - err = fmt.Errorf("expected BSON %s but found %s", bson.TypeEmbeddedDocument, val.Type) - } - - if !yield(bson.Raw(doc), err) { - return - } - - if err != nil { - panic(fmt.Sprintf("Iteration must stop after error (%v)", err)) - } - } - } -} - -// GetClusterTime returns the server response’s cluster time. -func (c *BatchCursor) GetClusterTime() (bson.Timestamp, error) { - ctRV, err := c.rawResp.LookupErr(clusterTimePath...) - - if err != nil { - return bson.Timestamp{}, errors.Wrapf( - err, - "extracting %#q from server response", - clusterTimePath, - ) - } - - ts, err := mbson.CastRawValue[bson.Timestamp](ctRV) - if err != nil { - return bson.Timestamp{}, errors.Wrapf( - err, - "parsing server response’s %#q", - clusterTimePath, - ) - } - - return ts, nil -} - -// IsFinished indicates whether the present batch is the final one. -func (c *BatchCursor) IsFinished() bool { - return c.id == 0 -} - -// GetNext fetches the next batch of responses from the server and caches it -// for access via GetCurrentBatch(). -// -// extraPieces are things you want to add to the underlying `getMore` -// server call, such as `batchSize`. -func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error { - if c.IsFinished() { - panic("internal error: cursor already finished!") - } - - nsDB, nsColl, found := strings.Cut(c.ns, ".") - if !found { - panic("Malformed namespace from cursor (expect a dot): " + c.ns) - } - if nsDB != c.db.Name() { - panic(fmt.Sprintf("db from cursor (%s) mismatches db struct (%s)", nsDB, c.db.Name())) - } - - cmd := bson.D{ - {"getMore", c.id}, - {"collection", nsColl}, - } - - if awaitTime, has := c.maxAwaitTime.Get(); has { - cmd = append(cmd, bson.E{"maxTimeMS", awaitTime.Milliseconds()}) - } - - cmd = append(cmd, extraPieces...) - - if c.sess != nil { - ctx = mongo.NewSessionContext(ctx, c.sess) - } - resp := c.db.RunCommand(ctx, cmd) - - raw, err := resp.Raw() - if err != nil { - return fmt.Errorf("iterating %#q’s cursor: %w", c.ns, err) - } - - nextBatch, err := raw.LookupErr("cursor", "nextBatch") - if err != nil { - return errors.Wrap(err, "extracting nextBatch") - } - - var ok bool - c.curBatch, ok = nextBatch.ArrayOK() - if !ok { - return fmt.Errorf("nextBatch should be BSON %s but found %s", bson.TypeArray, nextBatch.Type) - } - - c.rawResp = raw - - cursorID, err := raw.LookupErr("cursor", "id") - if err != nil { - return errors.Wrap(err, "extracting cursor ID") - } - - c.id, ok = cursorID.AsInt64OK() - if !ok { - return fmt.Errorf("cursor.id should be numeric but found BSON %s", cursorID.Type) - } - - return nil -} - -type cursorResponse struct { - ID int64 - Ns string - - // These are both BSON arrays. We use bson.Raw here to delay parsing - // and avoid allocating a large slice. - FirstBatch bson.Raw - NextBatch bson.Raw -} - -type baseResponse struct { - Cursor cursorResponse -} - -// New creates a Cursor from the response of a cursor-returning command -// like `find` or `bulkWrite`. -// -// Use this control (rather than the Go driver’s cursor implementation) -// to extract parts of the cursor responses that the driver’s API doesn’t -// expose. This is useful, e.g., to do a resumable $natural scan by -// extracting resume tokens from `find` responses. -// -// See NewWithSession() as well. -func New( - db *mongo.Database, - resp *mongo.SingleResult, -) (*BatchCursor, error) { - raw, err := resp.Raw() - if err != nil { - return nil, errors.Wrapf(err, "cursor open failed") - } - - baseResp := baseResponse{} - - err = bson.Unmarshal(raw, &baseResp) - if err != nil { - return nil, errors.Wrapf(err, "failed to decode cursor-open response to %T", baseResp) - } - - return &BatchCursor{ - db: db, - id: baseResp.Cursor.ID, - ns: baseResp.Cursor.Ns, - rawResp: raw, - curBatch: bson.RawArray(baseResp.Cursor.FirstBatch), - }, nil -} - -func (c *BatchCursor) SetSession(sess *mongo.Session) { - c.sess = sess -} - -func (c *BatchCursor) SetMaxAwaitTime(d time.Duration) { - c.maxAwaitTime = option.Some(d) -} - -// GetResumeToken is a convenience function that extracts the -// post-batch resume token from the cursor. -func GetResumeToken(c *BatchCursor) (bson.Raw, error) { - var resumeToken bson.Raw - - tokenRV, err := c.rawResp.LookupErr("cursor", "postBatchResumeToken") - if err != nil { - return nil, errors.Wrapf(err, "extracting change stream’s resume token") - } - - resumeToken, err = mbson.CastRawValue[bson.Raw](tokenRV) - if err != nil { - return nil, errors.Wrap( - err, - "parsing change stream’s resume token", - ) - } - - return resumeToken, nil -} diff --git a/mseq/mseq.go b/mseq/mseq.go deleted file mode 100644 index c4b41ec5..00000000 --- a/mseq/mseq.go +++ /dev/null @@ -1,29 +0,0 @@ -package mseq - -import "iter" - -// FromSlice returns an iterator over a slice. -// -// NB: See slices.Collect for the opposite operation. -func FromSlice[T any](s []T) iter.Seq[T] { - return func(yield func(T) bool) { - for _, v := range s { - if !yield(v) { - return - } - } - } -} - -// FromSliceWithNilErr is like FromSlice but returns a Seq2 -// whose second return is always a nil error. This is useful -// in testing. -func FromSliceWithNilErr[T any](s []T) iter.Seq2[T, error] { - return func(yield func(T, error) bool) { - for _, v := range s { - if !yield(v, nil) { - return - } - } - } -}