From f1674d2b93489be4ef17cc23a5aefe4cf7b5dc02 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 16:13:45 -0400 Subject: [PATCH 01/17] Rewrite compare to use batch cursor --- internal/verifier/compare.go | 163 +++++++++++++++++-------- mmongo/cursor/cursor.go | 223 +++++++++++++++++++++++++++++++++++ 2 files changed, 335 insertions(+), 51 deletions(-) create mode 100644 mmongo/cursor/cursor.go diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 8455f26b..fd7014d5 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "iter" "time" "github.com/10gen/migration-verifier/chanutil" @@ -12,6 +13,7 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mmongo/cursor" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -31,6 +33,11 @@ const ( docKeyInHashedCompare = "k" ) +type seqWithTs struct { + seq iter.Seq2[bson.Raw, error] + ts primitive.Timestamp +} + type docWithTs struct { doc bson.Raw ts primitive.Timestamp @@ -46,7 +53,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( types.ByteCount, error, ) { - var srcChannel, dstChannel <-chan docWithTs + var srcChannel, dstChannel <-chan seqWithTs var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error results := []VerificationResult{} @@ -101,7 +108,7 @@ func (verifier *Verifier) compareDocsFromChannels( workerNum int, fi *retry.FuncInfo, task *VerificationTask, - srcChannel, dstChannel <-chan docWithTs, + srcChannel, dstChannel <-chan seqWithTs, ) ( []VerificationResult, types.DocumentCount, @@ -205,7 +212,7 @@ func (verifier *Verifier) compareDocsFromChannels( for !srcClosed || !dstClosed { simpleTimerReset(readTimer, readTimeout) - var srcDocWithTs, dstDocWithTs docWithTs + var srcDocsWithTs, dstDocsWithTs seqWithTs eg, egCtx := contextplus.ErrGroup(ctx) @@ -220,21 +227,13 @@ func (verifier *Verifier) compareDocsFromChannels( "failed to read from source after %s", readTimeout, ) - case srcDocWithTs, alive = <-srcChannel: + case srcDocsWithTs, alive = <-srcChannel: if !alive { srcClosed = true break } fi.NoteSuccess("received document from source") - - srcDocCount++ - srcByteCount += types.ByteCount(len(srcDocWithTs.doc)) - verifier.workerTracker.SetSrcCounts( - workerNum, - srcDocCount, - srcByteCount, - ) } return nil @@ -252,7 +251,7 @@ func (verifier *Verifier) compareDocsFromChannels( "failed to read from destination after %s", readTimeout, ) - case dstDocWithTs, alive = <-dstChannel: + case dstDocsWithTs, alive = <-dstChannel: if !alive { dstClosed = true break @@ -272,32 +271,72 @@ func (verifier *Verifier) compareDocsFromChannels( ) } - if srcDocWithTs.doc != nil { - err := handleNewDoc(srcDocWithTs, true) + if srcDocsWithTs.seq != nil { + for doc, err := range srcDocsWithTs.seq { + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "reading batch of docs from source (task: %s)", + task.PrimaryKey, + ) + } - if err != nil { + srcDocCount++ + srcByteCount += types.ByteCount(len(doc)) + verifier.workerTracker.SetSrcCounts( + workerNum, + srcDocCount, + srcByteCount, + ) - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle %#q's source doc (task: %s) with ID %v", - namespace, - task.PrimaryKey, - srcDocWithTs.doc.Lookup("_id"), + err := handleNewDoc( + 
docWithTs{ + doc: doc, + ts: srcDocsWithTs.ts, + }, + true, ) + + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's source doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + doc.Lookup("_id"), + ) + } } + } - if dstDocWithTs.doc != nil { - err := handleNewDoc(dstDocWithTs, false) + if dstDocsWithTs.seq != nil { + for doc, err := range dstDocsWithTs.seq { + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "reading batch of docs from destination (task: %s)", + task.PrimaryKey, + ) + } - if err != nil { - return nil, 0, 0, errors.Wrapf( - err, - "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v", - namespace, - task.PrimaryKey, - dstDocWithTs.doc.Lookup("_id"), + err := handleNewDoc( + docWithTs{ + doc: doc, + ts: dstDocsWithTs.ts, + }, + false, ) + + if err != nil { + return nil, 0, 0, errors.Wrapf( + err, + "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v", + namespace, + task.PrimaryKey, + doc.Lookup("_id"), + ) + } } } } @@ -428,13 +467,13 @@ func simpleTimerReset(t *time.Timer, dur time.Duration) { func (verifier *Verifier) getFetcherChannelsAndCallbacks( task *VerificationTask, ) ( - <-chan docWithTs, - <-chan docWithTs, + <-chan seqWithTs, + <-chan seqWithTs, func(context.Context, *retry.FuncInfo) error, func(context.Context, *retry.FuncInfo) error, ) { - srcChannel := make(chan docWithTs) - dstChannel := make(chan docWithTs) + srcChannel := make(chan seqWithTs) + dstChannel := make(chan seqWithTs) readSrcCallback := func(ctx context.Context, state *retry.FuncInfo) error { // We open a session here so that we can read the session’s cluster @@ -511,36 +550,44 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( } func iterateCursorToChannel( - sctx mongo.SessionContext, + ctx context.Context, state *retry.FuncInfo, - cursor *mongo.Cursor, - writer chan<- docWithTs, + myCursor *cursor.Cursor, + writer chan<- seqWithTs, ) error { defer close(writer) - for cursor.Next(sctx) { + for { + seq := myCursor.GetCurrentBatch() + state.NoteSuccess("received a document") - clusterTime, err := util.GetClusterTimeFromSession(sctx) + ct, err := myCursor.GetClusterTime() if err != nil { - return errors.Wrap(err, "reading cluster time from session") + return errors.Wrap(err, "reading cluster time from batch") } err = chanutil.WriteWithDoneCheck( - sctx, + ctx, writer, - docWithTs{ - doc: slices.Clone(cursor.Current), - ts: clusterTime, + seqWithTs{ + seq: seq, + ts: ct, }, ) if err != nil { - return errors.Wrapf(err, "sending document to compare thread") + return errors.Wrapf(err, "sending iterator to compare thread") } - } - return errors.Wrap(cursor.Err(), "failed to iterate cursor") + if myCursor.IsFinished() { + return nil + } + + if err := myCursor.GetNext(ctx); err != nil { + return errors.Wrap(err, "failed to iterate cursor") + } + } } func getMapKey(docKeyValues []bson.RawValue) string { @@ -554,8 +601,13 @@ func getMapKey(docKeyValues []bson.RawValue) string { return keyBuffer.String() } -func (verifier *Verifier) getDocumentsCursor(ctx mongo.SessionContext, collection *mongo.Collection, clusterInfo *util.ClusterInfo, - startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { +func (verifier *Verifier) getDocumentsCursor( + ctx mongo.SessionContext, + collection *mongo.Collection, + clusterInfo *util.ClusterInfo, + startAtTs *primitive.Timestamp, + task *VerificationTask, +) (*cursor.Cursor, error) { var findOptions bson.D 
runCommandOptions := options.RunCmd() var andPredicates bson.A @@ -672,7 +724,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx mongo.SessionContext, collectio } } - return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions) + c, err := cursor.New( + collection.Database(), + collection.Database().RunCommand(ctx, cmd, runCommandOptions), + ) + + if err == nil { + c.SetSession(mongo.SessionFromContext(ctx)) + } + + return c, err } func transformPipelineForToHashedIndexKey( diff --git a/mmongo/cursor/cursor.go b/mmongo/cursor/cursor.go new file mode 100644 index 00000000..a3d67145 --- /dev/null +++ b/mmongo/cursor/cursor.go @@ -0,0 +1,223 @@ +// Package cursor exposes a cursor implementation that facilitates easy +// batch reads as well as reading of custom cursor properties like +// resume tokens. +package cursor + +import ( + "context" + "fmt" + "io" + "iter" + "strings" + "time" + + "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +var ( + clusterTimePath = mslices.Of("$clusterTime", "clusterTime") +) + +// Cursor is like mongo.Cursor, but it exposes documents per batch rather than +// a per-document reader. It also exposes cursor metadata, which facilitates +// things like resumable $natural scans. +type Cursor struct { + sess mongo.Session + maxAwaitTime option.Option[time.Duration] + id int64 + ns string + db *mongo.Database + rawResp bson.Raw + curBatch bson.Raw // an undecoded array + //cursorExtra ExtraMap +} + +// ExtraMap represents “extra” data points in cursor metadata. +type ExtraMap = map[string]bson.RawValue + +// GetCurrentBatch returns an iterator over the Cursor’s current batch. +func (c *Cursor) GetCurrentBatch() iter.Seq2[bson.Raw, error] { + batch := c.curBatch + + // NB: This MUST NOT close around c (the receiver), or else there can be + // a race condition between this callback and GetNext(). + return func(yield func(bson.Raw, error) bool) { + iterator := &bsoncore.DocumentSequence{ + Style: bsoncore.ArrayStyle, + Data: batch, + } + + for { + doc, err := iterator.Next() + if errors.Is(err, io.EOF) { + return + } + + if !yield(bson.Raw(doc), err) { + return + } + } + } +} + +// GetClusterTime returns the server response’s cluster time. +func (c *Cursor) GetClusterTime() (primitive.Timestamp, error) { + ctRV, err := c.rawResp.LookupErr(clusterTimePath...) + + if err != nil { + return primitive.Timestamp{}, errors.Wrapf( + err, + "extracting %#q from server response", + clusterTimePath, + ) + } + + ts, err := mbson.CastRawValue[primitive.Timestamp](ctRV) + if err != nil { + return primitive.Timestamp{}, errors.Wrapf( + err, + "parsing server response’s %#q", + clusterTimePath, + ) + } + + return ts, nil +} + +// IsFinished indicates whether the present batch is the final one. +func (c *Cursor) IsFinished() bool { + return c.id == 0 +} + +// GetNext fetches the next batch of responses from the server and caches it +// for access via GetCurrentBatch(). +// +// extraPieces are things you want to add to the underlying `getMore` +// server call, such as `batchSize`. 
+func (c *Cursor) GetNext(ctx context.Context, extraPieces ...bson.E) error { + if c.IsFinished() { + panic("internal error: cursor already finished!") + } + + nsDB, nsColl, found := strings.Cut(c.ns, ".") + if !found { + panic("Malformed namespace from cursor (expect a dot): " + c.ns) + } + if nsDB != c.db.Name() { + panic(fmt.Sprintf("db from cursor (%s) mismatches db struct (%s)", nsDB, c.db.Name())) + } + + cmd := bson.D{ + {"getMore", c.id}, + {"collection", nsColl}, + } + + if awaitTime, has := c.maxAwaitTime.Get(); has { + cmd = append(cmd, bson.E{"maxTimeMS", awaitTime.Milliseconds()}) + } + + cmd = append(cmd, extraPieces...) + + if c.sess != nil { + ctx = mongo.NewSessionContext(ctx, c.sess) + } + resp := c.db.RunCommand(ctx, cmd) + + raw, err := resp.Raw() + if err != nil { + return fmt.Errorf("iterating %#q’s cursor: %w", c.ns, err) + } + + //c.curBatch = extractBatch(raw, "nextBatch") baseResp.Cursor.NextBatch + c.curBatch = lo.Must(raw.LookupErr("cursor", "nextBatch")).Array() + c.rawResp = raw + //c.cursorExtra = baseResp.Cursor.Extra + c.id = lo.Must(raw.LookupErr("cursor", "id")).AsInt64() //baseResp.Cursor.ID + + return nil +} + +type cursorResponse struct { + ID int64 + Ns string + + // These are both BSON arrays. We use bson.Raw here to delay parsing + // and avoid allocating a large slice. + FirstBatch bson.Raw + NextBatch bson.Raw +} + +type baseResponse struct { + Cursor cursorResponse +} + +// New creates a Cursor from the response of a cursor-returning command +// like `find` or `bulkWrite`. +// +// Use this control (rather than the Go driver’s cursor implementation) +// to extract parts of the cursor responses that the driver’s API doesn’t +// expose. This is useful, e.g., to do a resumable $natural scan by +// extracting resume tokens from `find` responses. +// +// See NewWithSession() as well. +func New( + db *mongo.Database, + resp *mongo.SingleResult, +) (*Cursor, error) { + raw, err := resp.Raw() + if err != nil { + return nil, errors.Wrapf(err, "cursor open failed") + } + + baseResp := baseResponse{} + + err = bson.Unmarshal(raw, &baseResp) + if err != nil { + return nil, errors.Wrapf(err, "failed to decode cursor-open response to %T", baseResp) + } + + return &Cursor{ + db: db, + id: baseResp.Cursor.ID, + ns: baseResp.Cursor.Ns, + rawResp: raw, + curBatch: baseResp.Cursor.FirstBatch, + }, nil +} + +func (c *Cursor) SetSession(sess mongo.Session) { + c.sess = sess +} + +func (c *Cursor) SetMaxAwaitTime(d time.Duration) { + c.maxAwaitTime = option.Some(d) +} + +// GetResumeToken is a convenience function that extracts the +// post-batch resume token from the cursor. 
+func GetResumeToken(c *Cursor) (bson.Raw, error) { + var resumeToken bson.Raw + + tokenRV, err := c.rawResp.LookupErr("cursor", "postBatchResumeToken") + if err != nil { + return nil, errors.Wrapf(err, "extracting change stream’s resume token") + } + + resumeToken, err = mbson.CastRawValue[bson.Raw](tokenRV) + if err != nil { + return nil, errors.Wrap( + err, + "parsing change stream’s resume token", + ) + } + + return resumeToken, nil +} From ae55af21b7a6a9b8cc41b70b88da9885a302edcd Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 16:15:58 -0400 Subject: [PATCH 02/17] fix testing --- internal/verifier/migration_verifier_test.go | 13 ++++++---- mseq/mseq.go | 26 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 mseq/mseq.go diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 86b5d883..cd8630ce 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -26,6 +26,7 @@ import ( "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mseq" "github.com/10gen/migration-verifier/mslices" "github.com/cespare/permute/v2" "github.com/rs/zerolog" @@ -1146,13 +1147,15 @@ func TestVerifierCompareDocs(t *testing.T) { namespace := "testdb.testns" - makeDocChannel := func(docs []bson.D) <-chan docWithTs { - theChan := make(chan docWithTs, len(docs)) + makeDocChannel := func(docs []bson.D) <-chan seqWithTs { + theChan := make(chan seqWithTs, len(docs)) for d, doc := range docs { - theChan <- docWithTs{ - doc: testutil.MustMarshal(doc), - ts: primitive.Timestamp{1, uint32(d)}, + theChan <- seqWithTs{ + seq: mseq.FromSliceWithErr( + mslices.Of(testutil.MustMarshal(doc)), + ), + ts: primitive.Timestamp{1, uint32(d)}, } } diff --git a/mseq/mseq.go b/mseq/mseq.go new file mode 100644 index 00000000..3df55d25 --- /dev/null +++ b/mseq/mseq.go @@ -0,0 +1,26 @@ +package mseq + +import "iter" + +func FromSlice[T any](s []T) iter.Seq[T] { + return func(yield func(T) bool) { + for _, v := range s { + if !yield(v) { + return + } + } + } +} + +// FromSliceWithErr is like FromSlice but returns a Seq2 +// whose second return is always a nil error. This is useful +// in testing. 
+func FromSliceWithErr[T any](s []T) iter.Seq2[T, error] { + return func(yield func(T, error) bool) { + for _, v := range s { + if !yield(v, nil) { + return + } + } + } +} From fd568f0f4e6de42af9d1af8da21bc6e98c46e280 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 16:36:25 -0400 Subject: [PATCH 03/17] tweaks --- internal/verifier/compare.go | 6 ++--- mmongo/cursor/{cursor.go => batch.go} | 36 ++++++++++++++++----------- 2 files changed, 25 insertions(+), 17 deletions(-) rename mmongo/cursor/{cursor.go => batch.go} (81%) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index fd7014d5..080da9c5 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -552,13 +552,13 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( func iterateCursorToChannel( ctx context.Context, state *retry.FuncInfo, - myCursor *cursor.Cursor, + myCursor *cursor.BatchCursor, writer chan<- seqWithTs, ) error { defer close(writer) for { - seq := myCursor.GetCurrentBatch() + seq := myCursor.GetCurrentBatchIterator() state.NoteSuccess("received a document") @@ -607,7 +607,7 @@ func (verifier *Verifier) getDocumentsCursor( clusterInfo *util.ClusterInfo, startAtTs *primitive.Timestamp, task *VerificationTask, -) (*cursor.Cursor, error) { +) (*cursor.BatchCursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() var andPredicates bson.A diff --git a/mmongo/cursor/cursor.go b/mmongo/cursor/batch.go similarity index 81% rename from mmongo/cursor/cursor.go rename to mmongo/cursor/batch.go index a3d67145..1cc569f7 100644 --- a/mmongo/cursor/cursor.go +++ b/mmongo/cursor/batch.go @@ -26,10 +26,10 @@ var ( clusterTimePath = mslices.Of("$clusterTime", "clusterTime") ) -// Cursor is like mongo.Cursor, but it exposes documents per batch rather than +// BatchCursor is like mongo.Cursor, but it exposes documents per batch rather than // a per-document reader. It also exposes cursor metadata, which facilitates // things like resumable $natural scans. -type Cursor struct { +type BatchCursor struct { sess mongo.Session maxAwaitTime option.Option[time.Duration] id int64 @@ -40,11 +40,15 @@ type Cursor struct { //cursorExtra ExtraMap } -// ExtraMap represents “extra” data points in cursor metadata. -type ExtraMap = map[string]bson.RawValue +// GetCurrentBatchIterator returns an iterator over the Cursor’s current batch. +func (c *BatchCursor) GetCurrentBatchIterator() iter.Seq2[bson.Raw, error] { + // NB: Use of iter.Seq2 to return an error is a bit controversial. + // The pattern is used here in order to minimize the odds that a caller + // would overlook the need to check the error, which seems more probable + // with various other patterns. + // + // See “https://sinclairtarget.com/blog/2025/07/error-handling-with-iterators-in-go/”. -// GetCurrentBatch returns an iterator over the Cursor’s current batch. -func (c *Cursor) GetCurrentBatch() iter.Seq2[bson.Raw, error] { batch := c.curBatch // NB: This MUST NOT close around c (the receiver), or else there can be @@ -64,12 +68,16 @@ func (c *Cursor) GetCurrentBatch() iter.Seq2[bson.Raw, error] { if !yield(bson.Raw(doc), err) { return } + + if err != nil { + panic(fmt.Sprintf("Iteration must stop after error (%v)", err)) + } } } } // GetClusterTime returns the server response’s cluster time. -func (c *Cursor) GetClusterTime() (primitive.Timestamp, error) { +func (c *BatchCursor) GetClusterTime() (primitive.Timestamp, error) { ctRV, err := c.rawResp.LookupErr(clusterTimePath...) 
if err != nil { @@ -93,7 +101,7 @@ func (c *Cursor) GetClusterTime() (primitive.Timestamp, error) { } // IsFinished indicates whether the present batch is the final one. -func (c *Cursor) IsFinished() bool { +func (c *BatchCursor) IsFinished() bool { return c.id == 0 } @@ -102,7 +110,7 @@ func (c *Cursor) IsFinished() bool { // // extraPieces are things you want to add to the underlying `getMore` // server call, such as `batchSize`. -func (c *Cursor) GetNext(ctx context.Context, extraPieces ...bson.E) error { +func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error { if c.IsFinished() { panic("internal error: cursor already finished!") } @@ -171,7 +179,7 @@ type baseResponse struct { func New( db *mongo.Database, resp *mongo.SingleResult, -) (*Cursor, error) { +) (*BatchCursor, error) { raw, err := resp.Raw() if err != nil { return nil, errors.Wrapf(err, "cursor open failed") @@ -184,7 +192,7 @@ func New( return nil, errors.Wrapf(err, "failed to decode cursor-open response to %T", baseResp) } - return &Cursor{ + return &BatchCursor{ db: db, id: baseResp.Cursor.ID, ns: baseResp.Cursor.Ns, @@ -193,17 +201,17 @@ func New( }, nil } -func (c *Cursor) SetSession(sess mongo.Session) { +func (c *BatchCursor) SetSession(sess mongo.Session) { c.sess = sess } -func (c *Cursor) SetMaxAwaitTime(d time.Duration) { +func (c *BatchCursor) SetMaxAwaitTime(d time.Duration) { c.maxAwaitTime = option.Some(d) } // GetResumeToken is a convenience function that extracts the // post-batch resume token from the cursor. -func GetResumeToken(c *Cursor) (bson.Raw, error) { +func GetResumeToken(c *BatchCursor) (bson.Raw, error) { var resumeToken bson.Raw tokenRV, err := c.rawResp.LookupErr("cursor", "postBatchResumeToken") From e6f34e23d13d8cbd4d52a743aff0b2caa00b656e Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 16:39:48 -0400 Subject: [PATCH 04/17] doc --- mmongo/cursor/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 1cc569f7..4739eef7 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -40,7 +40,7 @@ type BatchCursor struct { //cursorExtra ExtraMap } -// GetCurrentBatchIterator returns an iterator over the Cursor’s current batch. +// GetCurrentBatchIterator returns an iterator over the BatchCursor’s current batch. func (c *BatchCursor) GetCurrentBatchIterator() iter.Seq2[bson.Raw, error] { // NB: Use of iter.Seq2 to return an error is a bit controversial. // The pattern is used here in order to minimize the odds that a caller From e7841f4789dd8159e1a194e7c0e0da011f6ee473 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 16:43:11 -0400 Subject: [PATCH 05/17] more commenst --- mmongo/cursor/batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 4739eef7..f1e94e1e 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -41,6 +41,8 @@ type BatchCursor struct { } // GetCurrentBatchIterator returns an iterator over the BatchCursor’s current batch. +// Note that the iteratees are NOT copied; the expectation is that each batch +// will be iterated exactly once. (Nothing *requires* that, of course.) func (c *BatchCursor) GetCurrentBatchIterator() iter.Seq2[bson.Raw, error] { // NB: Use of iter.Seq2 to return an error is a bit controversial. 
// The pattern is used here in order to minimize the odds that a caller From dc918025650cc89b7de85eec5e96dfac7af388f8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 29 Oct 2025 20:53:32 -0400 Subject: [PATCH 06/17] comment --- mmongo/cursor/batch.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index f1e94e1e..cbfd3a93 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -27,8 +27,9 @@ var ( ) // BatchCursor is like mongo.Cursor, but it exposes documents per batch rather than -// a per-document reader. It also exposes cursor metadata, which facilitates -// things like resumable $natural scans. +// a per-document reader. This facilitates more efficient memory usage +// because there is no need to clone each document individually, as +// mongo.Cursor requires. type BatchCursor struct { sess mongo.Session maxAwaitTime option.Option[time.Duration] From 945ada34fa58f4c4818c095a9363f70b7679c5b3 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 30 Oct 2025 20:33:09 -0400 Subject: [PATCH 07/17] gotta use raw after all --- mmongo/cursor/batch.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 863002a2..e05b93ed 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -149,7 +149,6 @@ func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error return fmt.Errorf("iterating %#q’s cursor: %w", c.ns, err) } - //c.curBatch = extractBatch(raw, "nextBatch") baseResp.Cursor.NextBatch c.curBatch = lo.Must(raw.LookupErr("cursor", "nextBatch")).Array() c.rawResp = raw //c.cursorExtra = baseResp.Cursor.Extra @@ -164,8 +163,8 @@ type cursorResponse struct { // These are both BSON arrays. We use bson.Raw here to delay parsing // and avoid allocating a large slice. 
- FirstBatch bson.RawArray - NextBatch bson.RawArray + FirstBatch bson.Raw + NextBatch bson.Raw } type baseResponse struct { @@ -202,7 +201,7 @@ func New( id: baseResp.Cursor.ID, ns: baseResp.Cursor.Ns, rawResp: raw, - curBatch: baseResp.Cursor.FirstBatch, + curBatch: bson.RawArray(baseResp.Cursor.FirstBatch), }, nil } From 1e8c16faa2530deac5564892e95d549d42555585 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 30 Oct 2025 20:56:29 -0400 Subject: [PATCH 08/17] no Must --- mmongo/cursor/batch.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index e05b93ed..3b900507 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -15,7 +15,6 @@ import ( "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" - "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" @@ -149,10 +148,28 @@ func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error return fmt.Errorf("iterating %#q’s cursor: %w", c.ns, err) } - c.curBatch = lo.Must(raw.LookupErr("cursor", "nextBatch")).Array() + nextBatch, err := raw.LookupErr("cursor", "nextBatch") + if err != nil { + return errors.Wrap(err, "extracting nextBatch") + } + + var ok bool + c.curBatch, ok = nextBatch.ArrayOK() + if !ok { + return fmt.Errorf("nextBatch should be BSON %s but found %s", bson.TypeArray, nextBatch.Type) + } + c.rawResp = raw - //c.cursorExtra = baseResp.Cursor.Extra - c.id = lo.Must(raw.LookupErr("cursor", "id")).AsInt64() //baseResp.Cursor.ID + + cursorID, err := raw.LookupErr("cursor", "id") + if err != nil { + return errors.Wrap(err, "extracting cursor ID") + } + + c.id, ok = cursorID.AsInt64OK() + if !ok { + return fmt.Errorf("cursor.id should be numeric but found BSON %s", cursorID.Type) + } return nil } From d39afb78b4985f4168661442f37e58920bcee567 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 10:43:57 -0400 Subject: [PATCH 09/17] save thus far --- internal/verifier/change_stream.go | 351 ++++++++++--------- internal/verifier/migration_verifier_test.go | 6 +- internal/verifier/recheck_test.go | 2 +- 3 files changed, 188 insertions(+), 171 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 64af7838..e7409c60 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -5,20 +5,17 @@ import ( "fmt" "time" - "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/mmongo/cursor" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" - clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" "github.com/rs/zerolog" - "github.com/samber/lo" "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -56,7 +53,7 @@ func (uee UnknownEventError) Error() string { } type changeEventBatch struct { - events []ParsedEvent + events []*ParsedEvent clusterTime bson.Timestamp } @@ -80,41 +77,42 @@ type 
ChangeStreamReader struct { startAtTs *bson.Timestamp - lag *msync.TypedAtomic[option.Option[time.Duration]] - batchSizeHistory *history.History[int] + lag *msync.TypedAtomic[option.Option[time.Duration]] onDDLEvent ddlEventHandling } func (verifier *Verifier) initializeChangeStreamReaders() { - srcReader := &ChangeStreamReader{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - } - verifier.srcChangeStreamReader = srcReader - - dstReader := &ChangeStreamReader{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - } - verifier.dstChangeStreamReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.readerError = util.NewEventual[error]() - csr.handlerError = util.NewEventual[error]() - csr.doneChan = make(chan struct{}) - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) + verifier.srcChangeStreamReader = &ChangeStreamReader{ + readerType: src, + logger: verifier.logger, + namespaces: verifier.srcNamespaces, + metaDB: verifier.metaClient.Database(verifier.metaDBName), + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + changeStreamRunning: false, + changeEventBatchChan: make(chan changeEventBatch), + writesOffTs: util.NewEventual[bson.Timestamp](), + readerError: util.NewEventual[error](), + handlerError: util.NewEventual[error](), + doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + } + verifier.dstChangeStreamReader = &ChangeStreamReader{ + readerType: dst, + logger: verifier.logger, + namespaces: verifier.dstNamespaces, + metaDB: verifier.metaClient.Database(verifier.metaDBName), + watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + changeStreamRunning: false, + changeEventBatchChan: make(chan changeEventBatch), + writesOffTs: util.NewEventual[bson.Timestamp](), + readerError: util.NewEventual[error](), + handlerError: util.NewEventual[error](), + doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + onDDLEvent: onDDLEventAllow, } } @@ -237,7 +235,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch dataSizes[i] = len(changeEvent.FullDocument) } - if err := eventRecorder.AddEvent(&changeEvent); err != nil { + if err := eventRecorder.AddEvent(changeEvent); err != nil { return errors.Wrapf( err, "failed to augment stats with %s change event (%+v)", @@ -349,50 +347,27 @@ func (csr *ChangeStreamReader) hasBsonSize() bool { func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, - sess *mongo.Session, + csCursor *cursor.BatchCursor, ) error { - eventsRead := 0 - var changeEvents []ParsedEvent - - latestEvent := option.None[ParsedEvent]() - + changeEvents := make([]*ParsedEvent, 0, 10_000) // TODO var batchTotalBytes int - for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + var latestEvent option.Option[ParsedEvent] - if cs.Err() != nil { - return 
errors.Wrap(cs.Err(), "change stream iteration failed") - } - - if !gotEvent { - break + for rawEvent, err := range csCursor.GetCurrentBatchIterator() { + if err != nil { + return errors.Wrapf(err, "reading batch of events") } - if changeEvents == nil { - batchSize := cs.RemainingBatchLength() + 1 - - ri.NoteSuccess("received a batch of %d change event(s)", batchSize) + var newEvent ParsedEvent - changeEvents = make([]ParsedEvent, batchSize) + if err := bson.Unmarshal(rawEvent, &newEvent); err != nil { + return errors.Wrapf(err, "parsing change event") } - batchTotalBytes += len(cs.Current) - - if err := cs.Decode(&changeEvents[eventsRead]); err != nil { - return errors.Wrapf(err, "failed to decode change event to %T", changeEvents[eventsRead]) - } + batchTotalBytes += len(rawEvent) + changeEvents = append(changeEvents, &newEvent) - // This only logs in tests. - csr.logger.Trace(). - Stringer("changeStream", csr). - Any("event", changeEvents[eventsRead]). - Int("eventsPreviouslyReadInBatch", eventsRead). - Int("batchEvents", len(changeEvents)). - Int("batchBytes", batchTotalBytes). - Msg("Received a change event.") - - opType := changeEvents[eventsRead].OpType + opType := newEvent.OpType if !supportedEventOpTypes.Contains(opType) { // We expect certain DDL events on the destination as part of @@ -403,7 +378,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( if csr.onDDLEvent == onDDLEventAllow { csr.logger.Info(). Stringer("changeStream", csr). - Stringer("event", cs.Current). + Stringer("event", rawEvent). Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") // Discard this event, then keep reading. @@ -411,30 +386,37 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( continue } else { - return UnknownEventError{Event: clone.Clone(cs.Current)} + return UnknownEventError{Event: rawEvent} } } // This shouldn’t happen, but just in case: - if changeEvents[eventsRead].Ns == nil { - return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead]) + if newEvent.Ns == nil { + return errors.Errorf("Change event lacks a namespace: %+v", rawEvent) } - if changeEvents[eventsRead].ClusterTime != nil && + if newEvent.ClusterTime != nil && (csr.lastChangeEventTime == nil || - csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) { + csr.lastChangeEventTime.Before(*newEvent.ClusterTime)) { - csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime - latestEvent = option.Some(changeEvents[eventsRead]) + csr.lastChangeEventTime = newEvent.ClusterTime + latestEvent = option.Some(newEvent) } + } - eventsRead++ + clusterTS, err := csCursor.GetClusterTime() + if err != nil { + return errors.Wrapf(err, "extracting cluster time from server response") } - var tokenTs bson.Timestamp - tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) + resumeToken, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrapf(err, "extracting resume token") + } + + tokenTS, err := extractTimestampFromResumeToken(resumeToken) if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) + lagSecs := int64(clusterTS.T) - int64(tokenTS.T) csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) } else { csr.logger.Warn(). 
@@ -442,14 +424,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) } - if eventsRead == 0 { + if len(changeEvents) == 0 { ri.NoteSuccess("received an empty change stream response") return nil } - csr.batchSizeHistory.Add(eventsRead) - if event, has := latestEvent.Get(); has { csr.logger.Trace(). Stringer("changeStreamReader", csr). @@ -468,7 +448,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( events: changeEvents, // NB: We know by now that OperationTime is non-nil. - clusterTime: *sess.OperationTime(), + clusterTime: clusterTS, }: } @@ -487,8 +467,7 @@ func (csr *ChangeStreamReader) wrapHandlerErrorForReader() error { func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, - sess *mongo.Session, + csCursor *cursor.BatchCursor, ) error { var lastPersistedTime time.Time @@ -497,7 +476,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( return nil } - err := csr.persistChangeStreamResumeToken(ctx, cs) + err := csr.persistChangeStreamResumeToken(ctx, csCursor) if err == nil { lastPersistedTime = time.Now() } @@ -541,32 +520,44 @@ func (csr *ChangeStreamReader) iterateChangeStream( // Read change events until the stream reaches the writesOffTs. // (i.e., the `getMore` call returns empty) for { - var curTs bson.Timestamp - curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, csCursor) + if err != nil { + return err + } + + rt, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrap(err, "extracting resume token") + } + + var curTS bson.Timestamp + curTS, err = extractTimestampFromResumeToken(rt) if err != nil { - return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + return errors.Wrap(err, "extracting timestamp from change stream's resume token") } // writesOffTs never refers to a real event, // so we can stop once curTs >= writesOffTs. - if !curTs.Before(writesOffTs) { + if !curTS.Before(writesOffTs) { csr.logger.Debug(). - Any("currentTimestamp", curTs). + Any("resumeTokenTimestamp", curTS). Any("writesOffTimestamp", writesOffTs). Msgf("%s has reached the writesOff timestamp. Shutting down.", csr) break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) - - if err != nil { - return err + if err := csCursor.GetNext(ctx); err != nil { + return errors.Wrap(err, "reading change stream") } } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, csCursor) + + if err := csCursor.GetNext(ctx); err != nil { + return errors.Wrapf(err, "reading %s", csr) + } if err == nil { err = persistResumeTokenIfNeeded() @@ -605,23 +596,25 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, *mongo.Session, bson.Timestamp, error) { - pipeline := csr.GetChangeStreamFilter() - opts := options.ChangeStream(). 
- SetMaxAwaitTime(maxChangeStreamAwaitTime) +) (*cursor.BatchCursor, bson.Timestamp, error) { + + changeStreamStage := bson.D{ + {"allChangesForCluster", true}, + } if csr.clusterInfo.VersionArray[0] >= 6 { - opts = opts.SetCustomPipeline( - bson.M{ - "showSystemEvents": true, - "showExpandedEvents": true, - }, + changeStreamStage = append( + changeStreamStage, + bson.D{ + {"showSystemEvents", true}, + {"showExpandedEvents", true}, + }..., ) } savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() @@ -641,50 +634,96 @@ func (csr *ChangeStreamReader) createChangeStream( logEvent.Msg("Starting change stream from persisted resume token.") - opts = opts.SetStartAfter(savedResumeToken) + changeStreamStage = append( + changeStreamStage, + bson.D{ + {"startAfter", savedResumeToken}, + }..., + ) } else { csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } sess, err := csr.watcherClient.StartSession() if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") } - sctx := mongo.NewSessionContext(ctx, sess) - changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") + + aggregateCmd := bson.D{ + {"aggregate", 1}, + {"cursor", bson.D{}}, + {"pipeline", append( + mongo.Pipeline{ + {{"$changeStream", changeStreamStage}}, + }, + csr.GetChangeStreamFilter()..., + )}, } - err = csr.persistChangeStreamResumeToken(ctx, changeStream) + sctx := mongo.NewSessionContext(ctx, sess) + adminDB := sess.Client().Database("admin") + result := adminDB.RunCommand(sctx, aggregateCmd) + myCursor, err := cursor.New(adminDB, result) + if err != nil { - return nil, nil, bson.Timestamp{}, err + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") } - startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + if savedResumeToken == nil { + err = csr.persistChangeStreamResumeToken(ctx, myCursor) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "persisting initial resume token") + } } - // With sharded clusters the resume token might lead the cluster time - // by 1 increment. In that case we need the actual cluster time; - // otherwise we will get errors. - clusterTime, err := util.GetClusterTimeFromSession(sess) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + myCursor.SetSession(sess) + myCursor.SetMaxAwaitTime(maxChangeStreamAwaitTime) + + var startTS bson.Timestamp + for firstEvent, err := range myCursor.GetCurrentBatchIterator() { + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "reading first event") + } + + // If there is no `startAfter`, then the change stream’s first response + // should have no events. If that invariant breaks then we will have + // just persisted a resume token that exceeds events we have yet to + // process. 
+ if savedResumeToken == nil { + panic(fmt.Sprintf("plain change stream first response should be empty; instead got: %v", firstEvent)) + } + + ct, err := firstEvent.LookupErr("clusterTime") + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "extracting first event’s cluster time") + } + + if err := mbson.UnmarshalRawValue(ct, &startTS); err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "parsing first event’s cluster time") + } + + break } - csr.logger.Debug(). - Any("resumeTokenTimestamp", startTs). - Any("clusterTime", clusterTime). - Stringer("changeStreamReader", csr). - Msg("Using earlier time as start timestamp.") + if savedResumeToken == nil { + resumeToken, err := cursor.GetResumeToken(myCursor) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap( + err, + "extracting change stream’s resume token", + ) + } - if startTs.After(clusterTime) { - startTs = clusterTime + startTS, err = extractTimestampFromResumeToken(resumeToken) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap( + err, + "extracting timestamp from change stream’s resume token", + ) + } } - return changeStream, sess, startTs, nil + return myCursor, startTS, nil } // StartChangeStream starts the change stream. @@ -712,7 +751,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { err := retryer.WithCallback( func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) + csCursor, startTs, err := csr.createChangeStream(ctx) if err != nil { logEvent := csr.logger.Debug(). Err(err). @@ -730,8 +769,6 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { return err } - defer changeStream.Close(ctx) - logEvent := csr.logger.Debug(). Stringer("changeStreamReader", csr). Any("startTimestamp", startTs) @@ -746,7 +783,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { logEvent.Msg("Retried change stream open succeeded.") } - return csr.iterateChangeStream(ctx, ri, changeStream, sess) + return csr.iterateChangeStream(ctx, ri, csCursor) }, "running %s", csr, ).Run(ctx, csr.logger) @@ -758,12 +795,12 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { result := <-initialCreateResultChan - startTs, err := result.Get() + startTS, err := result.Get() if err != nil { return err } - csr.startAtTs = &startTs + csr.startAtTs = &startTS csr.changeStreamRunning = true @@ -774,29 +811,6 @@ func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { return csr.lag.Load() } -func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] { - logs := csr.batchSizeHistory.Get() - lastLog, hasLogs := lo.Last(logs) - - if hasLogs && lastLog.At != logs[0].At { - span := lastLog.At.Sub(logs[0].At) - - // Each log contains a time and a # of events that happened since - // the prior log. Thus, each log’s Datum is a count of events that - // happened before the timestamp. Since we want the # of events that - // happened between the first & last times, we only want events *after* - // the first time. Thus, we skip the first log entry here. - totalEvents := 0 - for _, log := range logs[1:] { - totalEvents += log.Datum - } - - return option.Some(util.DivideToF64(totalEvents, span.Seconds())) - } - - return option.None[float64]() -} - func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Any("timestamp", ts). 
@@ -837,11 +851,14 @@ func (csr *ChangeStreamReader) resumeTokenDocID() string { } } -func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error { - token := cs.ResumeToken() +func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, csCursor *cursor.BatchCursor) error { + token, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrap(err, "reading cursor’s resume token") + } coll := csr.getChangeStreamMetadataCollection() - _, err := coll.ReplaceOne( + _, err = coll.ReplaceOne( ctx, bson.D{{"_id", csr.resumeTokenDocID()}}, token, diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index de89a602..c5d0c8ce 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -684,7 +684,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err := verifier.HandleChangeStreamEvents( ctx, changeEventBatch{ - events: []ParsedEvent{{ + events: []*ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll2"}, DocID: mbson.ToRawValue("heyhey"), @@ -700,7 +700,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err = verifier.HandleChangeStreamEvents( ctx, changeEventBatch{ - events: []ParsedEvent{{ + events: []*ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll1"}, DocID: mbson.ToRawValue("hoohoo"), @@ -972,7 +972,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { } batch := changeEventBatch{ - events: mslices.Of(event), + events: mslices.Of(&event), } err = verifier.HandleChangeStreamEvents(ctx, batch, src) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index f172bd71..7266bd36 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -61,7 +61,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { err := verifier.HandleChangeStreamEvents( ctx, - changeEventBatch{events: mslices.Of(event)}, + changeEventBatch{events: mslices.Of(&event)}, src, ) suite.Require().NoError(err) From 78caf674c978a73d783fc900af4c052ea045a4f4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 10:48:25 -0400 Subject: [PATCH 10/17] fix history --- internal/verifier/change_stream.go | 89 +++++++++++++++++++----------- 1 file changed, 58 insertions(+), 31 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e7409c60..c7d19c08 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -5,17 +5,20 @@ import ( "fmt" "time" + "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mmongo/cursor" + "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/samber/lo" "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -77,42 +80,41 @@ type ChangeStreamReader struct { startAtTs *bson.Timestamp - lag 
*msync.TypedAtomic[option.Option[time.Duration]] + lag *msync.TypedAtomic[option.Option[time.Duration]] + batchSizeHistory *history.History[int] onDDLEvent ddlEventHandling } func (verifier *Verifier) initializeChangeStreamReaders() { - verifier.srcChangeStreamReader = &ChangeStreamReader{ - readerType: src, - logger: verifier.logger, - namespaces: verifier.srcNamespaces, - metaDB: verifier.metaClient.Database(verifier.metaDBName), - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - changeStreamRunning: false, - changeEventBatchChan: make(chan changeEventBatch), - writesOffTs: util.NewEventual[bson.Timestamp](), - readerError: util.NewEventual[error](), - handlerError: util.NewEventual[error](), - doneChan: make(chan struct{}), - lag: msync.NewTypedAtomic(option.None[time.Duration]()), - } - verifier.dstChangeStreamReader = &ChangeStreamReader{ - readerType: dst, - logger: verifier.logger, - namespaces: verifier.dstNamespaces, - metaDB: verifier.metaClient.Database(verifier.metaDBName), - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - changeStreamRunning: false, - changeEventBatchChan: make(chan changeEventBatch), - writesOffTs: util.NewEventual[bson.Timestamp](), - readerError: util.NewEventual[error](), - handlerError: util.NewEventual[error](), - doneChan: make(chan struct{}), - lag: msync.NewTypedAtomic(option.None[time.Duration]()), - onDDLEvent: onDDLEventAllow, + srcReader := &ChangeStreamReader{ + readerType: src, + namespaces: verifier.srcNamespaces, + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + } + verifier.srcChangeStreamReader = srcReader + + dstReader := &ChangeStreamReader{ + readerType: dst, + namespaces: verifier.dstNamespaces, + watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + onDDLEvent: onDDLEventAllow, + } + verifier.dstChangeStreamReader = dstReader + + // Common elements in both readers: + for _, csr := range mslices.Of(srcReader, dstReader) { + csr.logger = verifier.logger + csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) + csr.changeEventBatchChan = make(chan changeEventBatch) + csr.writesOffTs = util.NewEventual[bson.Timestamp]() + csr.readerError = util.NewEventual[error]() + csr.handlerError = util.NewEventual[error]() + csr.doneChan = make(chan struct{}) + csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) + csr.batchSizeHistory = history.New[int](time.Minute) } } @@ -430,6 +432,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } + csr.batchSizeHistory.Add(len(changeEvents)) + if event, has := latestEvent.Get(); has { csr.logger.Trace(). Stringer("changeStreamReader", csr). @@ -811,6 +815,29 @@ func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { return csr.lag.Load() } +func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] { + logs := csr.batchSizeHistory.Get() + lastLog, hasLogs := lo.Last(logs) + + if hasLogs && lastLog.At != logs[0].At { + span := lastLog.At.Sub(logs[0].At) + + // Each log contains a time and a # of events that happened since + // the prior log. Thus, each log’s Datum is a count of events that + // happened before the timestamp. Since we want the # of events that + // happened between the first & last times, we only want events *after* + // the first time. Thus, we skip the first log entry here. 
+ totalEvents := 0 + for _, log := range logs[1:] { + totalEvents += log.Datum + } + + return option.Some(util.DivideToF64(totalEvents, span.Seconds())) + } + + return option.None[float64]() +} + func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Any("timestamp", ts). From dfcdc05b4436ef7167956cd16411d3cb6866d543 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 10:59:47 -0400 Subject: [PATCH 11/17] comments & rename --- internal/verifier/migration_verifier_test.go | 2 +- mseq/mseq.go | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index de89a602..b4e35638 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1156,7 +1156,7 @@ func TestVerifierCompareDocs(t *testing.T) { for d, doc := range docs { theChan <- seqWithTs{ - seq: mseq.FromSliceWithErr( + seq: mseq.FromSliceWithNilErr( mslices.Of(testutil.MustMarshal(doc)), ), ts: bson.Timestamp{1, uint32(d)}, diff --git a/mseq/mseq.go b/mseq/mseq.go index 3df55d25..c4b41ec5 100644 --- a/mseq/mseq.go +++ b/mseq/mseq.go @@ -2,6 +2,9 @@ package mseq import "iter" +// FromSlice returns an iterator over a slice. +// +// NB: See slices.Collect for the opposite operation. func FromSlice[T any](s []T) iter.Seq[T] { return func(yield func(T) bool) { for _, v := range s { @@ -12,10 +15,10 @@ func FromSlice[T any](s []T) iter.Seq[T] { } } -// FromSliceWithErr is like FromSlice but returns a Seq2 +// FromSliceWithNilErr is like FromSlice but returns a Seq2 // whose second return is always a nil error. This is useful // in testing. -func FromSliceWithErr[T any](s []T) iter.Seq2[T, error] { +func FromSliceWithNilErr[T any](s []T) iter.Seq2[T, error] { return func(yield func(T, error) bool) { for _, v := range s { if !yield(v, nil) { From c076f2f4cbde75012efd691f8d3ee952ec5dd3e5 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 11:22:49 -0400 Subject: [PATCH 12/17] get count --- internal/verifier/change_stream.go | 13 +++++++++---- internal/verifier/migration_verifier_test.go | 6 +++--- internal/verifier/recheck_test.go | 2 +- mmongo/cursor/batch.go | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index c7d19c08..e34359b4 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -56,7 +56,7 @@ func (uee UnknownEventError) Error() string { } type changeEventBatch struct { - events []*ParsedEvent + events []ParsedEvent clusterTime bson.Timestamp } @@ -237,7 +237,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch dataSizes[i] = len(changeEvent.FullDocument) } - if err := eventRecorder.AddEvent(changeEvent); err != nil { + if err := eventRecorder.AddEvent(&changeEvent); err != nil { return errors.Wrapf( err, "failed to augment stats with %s change event (%+v)", @@ -351,7 +351,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ri *retry.FuncInfo, csCursor *cursor.BatchCursor, ) error { - changeEvents := make([]*ParsedEvent, 0, 10_000) // TODO + batchLen, err := csCursor.GetCurrentBatchLength() + if err != nil { + return err + } + + changeEvents := make([]ParsedEvent, 0, batchLen) var batchTotalBytes int var latestEvent option.Option[ParsedEvent] @@ -367,7 +372,7 @@ func (csr *ChangeStreamReader) 
readAndHandleOneChangeEventBatch( } batchTotalBytes += len(rawEvent) - changeEvents = append(changeEvents, &newEvent) + changeEvents = append(changeEvents, newEvent) opType := newEvent.OpType if !supportedEventOpTypes.Contains(opType) { diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 6469385f..b4e35638 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -684,7 +684,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err := verifier.HandleChangeStreamEvents( ctx, changeEventBatch{ - events: []*ParsedEvent{{ + events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll2"}, DocID: mbson.ToRawValue("heyhey"), @@ -700,7 +700,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err = verifier.HandleChangeStreamEvents( ctx, changeEventBatch{ - events: []*ParsedEvent{{ + events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll1"}, DocID: mbson.ToRawValue("hoohoo"), @@ -972,7 +972,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { } batch := changeEventBatch{ - events: mslices.Of(&event), + events: mslices.Of(event), } err = verifier.HandleChangeStreamEvents(ctx, batch, src) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 7266bd36..f172bd71 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -61,7 +61,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { err := verifier.HandleChangeStreamEvents( ctx, - changeEventBatch{events: mslices.Of(&event)}, + changeEventBatch{events: mslices.Of(event)}, src, ) suite.Require().NoError(err) diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 3b900507..2dd0d998 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -38,6 +38,21 @@ type BatchCursor struct { curBatch bson.RawArray } +// GetCurrentBatchLength returns the number of documents in the current batch. +func (c *BatchCursor) GetCurrentBatchLength() (int, error) { + count := 0 + + for _, err := range c.GetCurrentBatchIterator() { + if err != nil { + return 0, errors.Wrap(err, "counting documents in batch") + } + + count++ + } + + return count, nil +} + // GetCurrentBatchIterator returns an iterator over the BatchCursor’s current batch. // Note that the iteratees are NOT copied; the expectation is that each batch // will be iterated exactly once. (Nothing *requires* that, of course.) From 2e21fbf3f3de8fde97a1dae72cafd592a75c852a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 12:13:13 -0400 Subject: [PATCH 13/17] undo needless --- internal/verifier/change_stream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e34359b4..51a62083 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -539,17 +539,17 @@ func (csr *ChangeStreamReader) iterateChangeStream( return errors.Wrap(err, "extracting resume token") } - var curTS bson.Timestamp - curTS, err = extractTimestampFromResumeToken(rt) + var curTs bson.Timestamp + curTs, err = extractTimestampFromResumeToken(rt) if err != nil { return errors.Wrap(err, "extracting timestamp from change stream's resume token") } // writesOffTs never refers to a real event, // so we can stop once curTs >= writesOffTs. 
- if !curTS.Before(writesOffTs) { + if !curTs.Before(writesOffTs) { csr.logger.Debug(). - Any("resumeTokenTimestamp", curTS). + Any("resumeTokenTimestamp", curTs). Any("writesOffTimestamp", writesOffTs). Msgf("%s has reached the writesOff timestamp. Shutting down.", csr) From e9bb968983b10483bb01f236a5eede8b77d8ca51 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 12:15:31 -0400 Subject: [PATCH 14/17] undo spelling --- internal/verifier/change_stream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 51a62083..a681ae84 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -688,7 +688,7 @@ func (csr *ChangeStreamReader) createChangeStream( myCursor.SetSession(sess) myCursor.SetMaxAwaitTime(maxChangeStreamAwaitTime) - var startTS bson.Timestamp + var startTs bson.Timestamp for firstEvent, err := range myCursor.GetCurrentBatchIterator() { if err != nil { return nil, bson.Timestamp{}, errors.Wrap(err, "reading first event") @@ -707,7 +707,7 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, bson.Timestamp{}, errors.Wrap(err, "extracting first event’s cluster time") } - if err := mbson.UnmarshalRawValue(ct, &startTS); err != nil { + if err := mbson.UnmarshalRawValue(ct, &startTs); err != nil { return nil, bson.Timestamp{}, errors.Wrap(err, "parsing first event’s cluster time") } @@ -723,7 +723,7 @@ func (csr *ChangeStreamReader) createChangeStream( ) } - startTS, err = extractTimestampFromResumeToken(resumeToken) + startTs, err = extractTimestampFromResumeToken(resumeToken) if err != nil { return nil, bson.Timestamp{}, errors.Wrap( err, @@ -732,7 +732,7 @@ func (csr *ChangeStreamReader) createChangeStream( } } - return myCursor, startTS, nil + return myCursor, startTs, nil } // StartChangeStream starts the change stream. 
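
Patch 12 above replaces the hard-coded 10,000-element allocation with a count taken from the batch itself: GetCurrentBatchLength walks the batch iterator once, and readAndHandleOneChangeEventBatch then pre-sizes changeEvents from that count. The sketch below shows the same count-then-collect shape over an iter.Seq2; countSeq2 and the toy batch are illustrative stand-ins, not code from this repository.

// Illustrative only: countSeq2 and the toy batch below are stand-ins for
// GetCurrentBatchLength and a real cursor batch.
package main

import (
	"fmt"
	"iter"
)

// countSeq2 walks a Seq2 once, stopping at the first error, and
// reports how many elements it yielded.
func countSeq2[T any](seq iter.Seq2[T, error]) (int, error) {
	n := 0
	for _, err := range seq {
		if err != nil {
			return 0, err
		}
		n++
	}
	return n, nil
}

func main() {
	// A toy stand-in for one cursor batch of raw documents.
	var batch iter.Seq2[string, error] = func(yield func(string, error) bool) {
		for _, doc := range []string{"a", "b", "c"} {
			if !yield(doc, nil) {
				return
			}
		}
	}

	n, err := countSeq2(batch)
	if err != nil {
		panic(err)
	}

	// Pre-size the destination, then collect: the same shape as
	// make([]ParsedEvent, 0, batchLen) followed by append in patch 12.
	events := make([]string, 0, n)
	for doc, err := range batch {
		if err != nil {
			panic(err)
		}
		events = append(events, doc)
	}

	fmt.Println(n, events) // 3 [a b c]
}

Note that this shape walks the sequence twice, so it assumes the batch can be re-iterated, which a materialized bson.RawArray batch allows even though the BatchCursor doc comment expects a single pass.
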
From c5462200c3a6877feb514155574286de5d452ddc Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 15:20:39 -0400 Subject: [PATCH 15/17] fix read preference --- internal/verifier/change_stream.go | 16 ++++++++++++---- internal/verifier/compare.go | 1 + mmongo/cursor/batch.go | 27 +++++++++++++++++++-------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index a681ae84..ea3e611a 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -23,6 +23,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" "golang.org/x/exp/slices" ) @@ -67,9 +68,10 @@ type ChangeStreamReader struct { logger *logger.Logger namespaces []string - metaDB *mongo.Database - watcherClient *mongo.Client - clusterInfo util.ClusterInfo + metaDB *mongo.Database + watcherClient *mongo.Client + readPreference *readpref.ReadPref + clusterInfo util.ClusterInfo changeStreamRunning bool changeEventBatchChan chan changeEventBatch @@ -108,6 +110,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { for _, csr := range mslices.Of(srcReader, dstReader) { csr.logger = verifier.logger csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) + csr.readPreference = verifier.readPreference csr.changeEventBatchChan = make(chan changeEventBatch) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() @@ -671,7 +674,11 @@ func (csr *ChangeStreamReader) createChangeStream( sctx := mongo.NewSessionContext(ctx, sess) adminDB := sess.Client().Database("admin") - result := adminDB.RunCommand(sctx, aggregateCmd) + result := adminDB.RunCommand( + sctx, + aggregateCmd, + options.RunCmd().SetReadPreference(csr.readPreference), + ) myCursor, err := cursor.New(adminDB, result) if err != nil { @@ -687,6 +694,7 @@ func (csr *ChangeStreamReader) createChangeStream( myCursor.SetSession(sess) myCursor.SetMaxAwaitTime(maxChangeStreamAwaitTime) + myCursor.SetReadPreference(csr.readPreference) var startTs bson.Timestamp for firstEvent, err := range myCursor.GetCurrentBatchIterator() { diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 8c97d43b..f220f40a 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -730,6 +730,7 @@ func (verifier *Verifier) getDocumentsCursor( if err == nil { c.SetSession(mongo.SessionFromContext(ctx)) + c.SetReadPreference(verifier.readPreference) } return c, err diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 2dd0d998..1c1f18d8 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -17,6 +17,8 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" ) @@ -29,13 +31,14 @@ var ( // because there is no need to clone each document individually, as // mongo.Cursor requires. 
type BatchCursor struct { - sess *mongo.Session - maxAwaitTime option.Option[time.Duration] - id int64 - ns string - db *mongo.Database - rawResp bson.Raw - curBatch bson.RawArray + sess *mongo.Session + readPreference *readpref.ReadPref + maxAwaitTime option.Option[time.Duration] + id int64 + ns string + db *mongo.Database + rawResp bson.Raw + curBatch bson.RawArray } // GetCurrentBatchLength returns the number of documents in the current batch. @@ -156,7 +159,11 @@ func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error if c.sess != nil { ctx = mongo.NewSessionContext(ctx, c.sess) } - resp := c.db.RunCommand(ctx, cmd) + resp := c.db.RunCommand( + ctx, + cmd, + options.RunCmd().SetReadPreference(c.readPreference), + ) raw, err := resp.Raw() if err != nil { @@ -241,6 +248,10 @@ func (c *BatchCursor) SetSession(sess *mongo.Session) { c.sess = sess } +func (c *BatchCursor) SetReadPreference(rp *readpref.ReadPref) { + c.readPreference = rp +} + func (c *BatchCursor) SetMaxAwaitTime(d time.Duration) { c.maxAwaitTime = option.Some(d) } From 6499383095f0d2acb64a44425738f192d6e92685 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 15:32:46 -0400 Subject: [PATCH 16/17] set read pref at client level --- internal/verifier/uri.go | 7 +++++-- main/migration_verifier.go | 11 ++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/verifier/uri.go b/internal/verifier/uri.go index c5ed2cb8..dfef812d 100644 --- a/internal/verifier/uri.go +++ b/internal/verifier/uri.go @@ -11,7 +11,8 @@ import ( ) func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error { - opts := verifier.getClientOpts(uri) + opts := verifier.getClientOpts(uri). + SetReadPreference(verifier.readPreference) var err error verifier.srcClient, err = mongo.Connect(opts) if err != nil { @@ -90,7 +91,9 @@ func isVersionSupported(version []int) bool { } func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error { - opts := verifier.getClientOpts(uri) + opts := verifier.getClientOpts(uri). + SetReadPreference(verifier.readPreference) + var err error verifier.dstClient, err = mongo.Connect(opts) if err != nil { diff --git a/main/migration_verifier.go b/main/migration_verifier.go index aa1c8a65..35b3fd4a 100644 --- a/main/migration_verifier.go +++ b/main/migration_verifier.go @@ -276,8 +276,13 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err Int("processID", os.Getpid()). 
Msg("migration-verifier started.") + err := v.SetReadPreference(cCtx.String(readPreference)) + if err != nil { + return nil, err + } + srcConnStr := cCtx.String(srcURI) - _, srcConnStr, err := mmongo.MaybeAddDirectConnection(srcConnStr) + _, srcConnStr, err = mmongo.MaybeAddDirectConnection(srcConnStr) if err != nil { return nil, errors.Wrap(err, "parsing source connection string") } @@ -350,10 +355,6 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err } v.SetDocCompareMethod(docCompareMethod) - err = v.SetReadPreference(cCtx.String(readPreference)) - if err != nil { - return nil, err - } v.SetFailureDisplaySize(cCtx.Int64(failureDisplaySize)) return v, nil } From c62e625b64234a13bde888962f80c63fb8d55aae Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 31 Oct 2025 16:33:48 -0400 Subject: [PATCH 17/17] label --- internal/verifier/change_stream.go | 11 ++++++++++- internal/verifier/check.go | 13 ++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index ea3e611a..d7efdb97 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "runtime/pprof" "time" "github.com/10gen/migration-verifier/history" @@ -800,7 +801,15 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { logEvent.Msg("Retried change stream open succeeded.") } - return csr.iterateChangeStream(ctx, ri, csCursor) + pprof.Do( + ctx, + pprof.Labels("component", "iterate-change-stream"), + func(ctx context.Context) { + err = csr.iterateChangeStream(ctx, ri, csCursor) + }, + ) + + return err }, "running %s", csr, ).Run(ctx, csr.logger) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index bd64874f..76a5dca2 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "runtime/pprof" "time" "github.com/10gen/migration-verifier/contextplus" @@ -273,7 +274,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh return errors.Wrapf(err, "failed to start %s", csReader) } ceHandlerGroup.Go(func() error { - return verifier.RunChangeEventHandler(groupCtx, csReader) + var err error + + pprof.Do( + groupCtx, + pprof.Labels("component", "change-event-handler"), + func(ctx context.Context) { + err = verifier.RunChangeEventHandler(groupCtx, csReader) + }, + ) + + return err }) } }