From 4e5e79016f144f92fce6d05b449dc1b025c42020 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 3 Nov 2025 19:53:56 -0500 Subject: [PATCH 01/16] dedupe --- internal/verifier/change_stream.go | 6 +++++- internal/verifier/recheck.go | 26 +++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index d4330077..42cf2d18 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -261,7 +261,11 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch Stringer("lag", lag). Msg("Persisting rechecks for change events.") - return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) + return verifier.insertRecheckDocs( + ctx, + lo.Ternary(eventOrigin == src, recheckCauseSource, recheckCauseDestination), + dbNames, collNames, docIDs, dataSizes, + ) } // GetChangeStreamFilter returns an aggregation pipeline that filters diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 127eae89..a998d8f9 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -35,6 +35,7 @@ type RecheckPrimaryKey struct { SrcDatabaseName string `bson:"db"` SrcCollectionName string `bson:"coll"` DocumentID bson.RawValue `bson:"docID"` + Cause int32 // to prevent dupe-key proliferation } var _ bson.Marshaler = &RecheckPrimaryKey{} @@ -47,6 +48,7 @@ func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) { Type: bsoncore.Type(rk.DocumentID.Type), Data: rk.DocumentID.Value, }). + AppendInt32("cause", rk.Cause). Build(), nil } @@ -88,6 +90,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( return verifier.insertRecheckDocs( ctx, + recheckCauseMismatch, dbNames, collNames, documentIDs, @@ -95,8 +98,15 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( ) } +const ( + recheckCauseMismatch = 0 + recheckCauseSource = 1 + recheckCauseDestination = 2 +) + func (verifier *Verifier) insertRecheckDocs( ctx context.Context, + cause int32, dbNames []string, collNames []string, documentIDs []bson.RawValue, @@ -168,6 +178,7 @@ func (verifier *Verifier) insertRecheckDocs( SrcDatabaseName: dbName, SrcCollectionName: collNames[i], DocumentID: rawDocIDs[i], + Cause: cause, }, DataSize: dataSizes[i], } @@ -364,7 +375,11 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e cursor, err := recheckColl.Find( ctx, bson.D{}, - options.Find().SetSort(bson.D{{"_id", 1}}), + options.Find(). + SetSort(bson.D{{"_id", 1}}). + SetProjection(bson.D{ + {"_id.cause", 0}, + }), ) if err != nil { return err @@ -403,6 +418,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e return nil } + var lastIDRaw bson.RawValue + // We group these here using a sort rather than using aggregate because aggregate is // subject to a 16MB limit on group size. for cursor.Next(ctx) { @@ -439,8 +456,15 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e idsSizer = util.BSONArraySizer{} dataSizeAccum = 0 idAccum = idAccum[:0] + lastIDRaw = bson.RawValue{} } + if idRaw.Equal(lastIDRaw) { + continue + } + + lastIDRaw = idRaw + idsSizer.Add(idRaw) dataSizeAccum += int64(doc.DataSize) idAccum = append(idAccum, doc.PrimaryKey.DocumentID) From 4867147711fd1990187f292b80c17b0b22662d07 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 3 Nov 2025 19:54:30 -0500 Subject: [PATCH 02/16] i32 --- internal/verifier/change_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 42cf2d18..181aaf4f 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -263,7 +263,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch return verifier.insertRecheckDocs( ctx, - lo.Ternary(eventOrigin == src, recheckCauseSource, recheckCauseDestination), + lo.Ternary[int32](eventOrigin == src, recheckCauseSource, recheckCauseDestination), dbNames, collNames, docIDs, dataSizes, ) } From b46272c9042936371c95ac561ccaea78630cfbd3 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 3 Nov 2025 20:02:47 -0500 Subject: [PATCH 03/16] move --- internal/verifier/recheck.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index a998d8f9..98af6284 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -22,6 +22,10 @@ const ( recheckBatchCountLimit = 1000 recheckQueueCollectionNameBase = "recheckQueue" + + recheckCauseMismatch = 0 + recheckCauseSource = 1 + recheckCauseDestination = 2 ) // RecheckPrimaryKey stores the implicit type of recheck to perform @@ -98,12 +102,6 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( ) } -const ( - recheckCauseMismatch = 0 - recheckCauseSource = 1 - recheckCauseDestination = 2 -) - func (verifier *Verifier) insertRecheckDocs( ctx context.Context, cause int32, From d11bb041e648681c8102e1957652a60e11660877 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 3 Nov 2025 20:06:24 -0500 Subject: [PATCH 04/16] comments --- internal/verifier/recheck.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 98af6284..a421aaab 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -457,6 +457,11 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e lastIDRaw = bson.RawValue{} } + // We’re iterating the rechecks in order such that, if the same doc + // gets enqueued from multiple sources, we’ll see those records + // consecutively. We can deduplicate here, then, by checking to see if + // the doc ID has changed. (NB: At this point we know the namespace + // has *not* changed because we just checked for that.) if idRaw.Equal(lastIDRaw) { continue } From f5daf2314cec4d986b99d1b150432a0299232319 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 01:46:51 -0500 Subject: [PATCH 05/16] fix compile --- internal/verifier/recheck_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index f172bd71..063481b6 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -499,5 +499,12 @@ func insertRecheckDocs( }, ) - return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes) + return verifier.insertRecheckDocs( + ctx, + recheckCauseMismatch, + dbNames, + collNames, + rawIDs, + dataSizes, + ) } From 7ac0bea3119a493c5407323761a001fd12c8fcaa Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 10:36:09 -0500 Subject: [PATCH 06/16] double rechecks --- internal/verifier/change_stream_test.go | 4 ++++ internal/verifier/recheck_test.go | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index bbc1dafe..93b9efda 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -306,6 +306,10 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { "the verifier should enqueue a recheck", ) + require.Len(suite.T(), recheckDocs, 1) + + delete(recheckDocs[0], "cause") + suite.Assert().Equal( bson.D{ {"db", suite.DBNameForTest()}, diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 063481b6..deeb1950 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -12,6 +12,7 @@ import ( "github.com/10gen/migration-verifier/mslices" "github.com/rs/zerolog" "github.com/samber/lo" + "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo/options" ) @@ -67,6 +68,11 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { suite.Require().NoError(err) recheckDocs = suite.fetchRecheckDocs(ctx, verifier) + + require.Len(suite.T(), recheckDocs, 1) + + recheckDocs[0].PrimaryKey.Cause = 0 + suite.Assert().Equal( []RecheckDoc{ { @@ -199,7 +205,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() { ) recheckDocs := suite.fetchVerifierRechecks(ctx, verifier2) - suite.Require().Len(recheckDocs, 2, "expect # of rechecks: %+v", recheckDocs) + suite.Require().Len(recheckDocs, 4, "expect # of rechecks: %+v", recheckDocs) } func (suite *IntegrationTestSuite) TestDuplicateRecheck() { From 8c0fea628c045345fe893ceec5440f7ff472a93f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 10:46:12 -0500 Subject: [PATCH 07/16] =?UTF-8?q?fix=20tests=20again=20=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/verifier/change_stream_test.go | 6 ++++++ internal/verifier/recheck_test.go | 15 +++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 93b9efda..b54e0fa9 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -308,6 +308,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { require.Len(suite.T(), recheckDocs, 1) + recheckDocs[0]["_id"] = lo.Filter( + recheckDocs[0]["_id"].(bson.D), + func(el bson.E, _ int) bool { + return el.Key != "cause" + }, + ) delete(recheckDocs[0], "cause") suite.Assert().Equal( diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index deeb1950..5a8956c4 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -69,17 +69,24 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { recheckDocs = suite.fetchRecheckDocs(ctx, verifier) - require.Len(suite.T(), recheckDocs, 1) + require.Len(suite.T(), recheckDocs, 2) - recheckDocs[0].PrimaryKey.Cause = 0 - - suite.Assert().Equal( + suite.Assert().ElementsMatch( []RecheckDoc{ { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "the", SrcCollectionName: "namespace", DocumentID: mbson.ToRawValue("theDocID"), + Cause: recheckCauseMismatch, + }, + }, + { + PrimaryKey: RecheckPrimaryKey{ + SrcDatabaseName: "the", + SrcCollectionName: "namespace", + DocumentID: mbson.ToRawValue("theDocID"), + Cause: recheckCauseSource, }, }, }, From 63c6bdf27163a20a571d7ce0c81377561ffece6c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 11:09:58 -0500 Subject: [PATCH 08/16] save --- internal/verifier/change_stream_test.go | 16 +++++++++++++++- internal/verifier/recheck_test.go | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index b54e0fa9..72c5145a 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -345,7 +345,21 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve recheckDocs := []bson.M{} recheckColl := verifier.getRecheckQueueCollection(verifier.generation) - cursor, err := recheckColl.Find(ctx, bson.D{}) + cursor, err := recheckColl.Aggregate( + ctx, + mongo.Pipeline{ + {{"$addFields", bson.D{ + {"_id.cause", "$$REMOVE"}, + }}}, + {{"$group", bson.D{ + {"_id", "$_id"}, + {"doc", bson.D{{"$first", "$$ROOT"}}}, + }}}, + {{"$replaceRoot", bson.D{ + {"newRoot", "$doc"}, + }}}, + }, + ) if !errors.Is(err, mongo.ErrNoDocuments) { suite.Require().NoError(err) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 5a8956c4..acd3532a 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -212,7 +212,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() { ) recheckDocs := suite.fetchVerifierRechecks(ctx, verifier2) - suite.Require().Len(recheckDocs, 4, "expect # of rechecks: %+v", recheckDocs) + suite.Require().Len(recheckDocs, 2, "expect # of rechecks: %+v", recheckDocs) } func (suite *IntegrationTestSuite) TestDuplicateRecheck() { From 26bc48f4bef9be91d8729b624d4115d15fc43424 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 11:11:07 -0500 Subject: [PATCH 09/16] revert --- internal/verifier/recheck_test.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index acd3532a..33a12553 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -12,7 +12,6 @@ import ( "github.com/10gen/migration-verifier/mslices" "github.com/rs/zerolog" "github.com/samber/lo" - "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo/options" ) @@ -69,24 +68,13 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { recheckDocs = suite.fetchRecheckDocs(ctx, verifier) - require.Len(suite.T(), recheckDocs, 2) - - suite.Assert().ElementsMatch( + suite.Assert().Equal( []RecheckDoc{ { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "the", SrcCollectionName: "namespace", DocumentID: mbson.ToRawValue("theDocID"), - Cause: recheckCauseMismatch, - }, - }, - { - PrimaryKey: RecheckPrimaryKey{ - SrcDatabaseName: "the", - SrcCollectionName: "namespace", - DocumentID: mbson.ToRawValue("theDocID"), - Cause: recheckCauseSource, }, }, }, From 162a74b4b7858d9b7b06f368e1df49179f6533d8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 11:23:44 -0500 Subject: [PATCH 10/16] dedupe --- internal/verifier/recheck_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 33a12553..e0d78551 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" ) @@ -86,11 +87,23 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc { metaColl := verifier.getRecheckQueueCollection(verifier.generation) - cursor, err := metaColl.Find( + cursor, err := metaColl.Aggregate( ctx, - bson.D{}, - options.Find().SetProjection(bson.D{{"dataSize", 0}}), + mongo.Pipeline{ + {{"$addFields", bson.D{ + {"_id.cause", "$$REMOVE"}, + {"dataSize", "$$REMOVE"}, + }}}, + {{"$group", bson.D{ + {"_id", "$_id"}, + {"doc", bson.D{{"$first", "$$ROOT"}}}, + }}}, + {{"$replaceRoot", bson.D{ + {"newRoot", "$doc"}, + }}}, + }, ) + suite.Require().NoError(err, "find recheck docs") var results []RecheckDoc From b203cbaec56070a137d6020746f34d1645e4bf5e Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 12:44:35 -0500 Subject: [PATCH 11/16] dedupe in aggregation --- internal/verifier/recheck.go | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index a421aaab..5ac492e6 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -370,14 +370,19 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e // The sort here is important because the recheck _id is an embedded // document that includes the namespace. Thus, all rechecks for a given // namespace will be consecutive in this query’s result. - cursor, err := recheckColl.Find( + cursor, err := recheckColl.Aggregate( ctx, - bson.D{}, - options.Find(). - SetSort(bson.D{{"_id", 1}}). - SetProjection(bson.D{ - {"_id.cause", 0}, - }), + mongo.Pipeline{ + {{"$sort", bson.D{{"_id", 1}}}}, + {{"$addFields", bson.D{{"_id.cause", 0}}}}, + {{"$group", bson.D{ + {"_id", "$_id"}, + {"doc", bson.D{{"$first", "$$ROOT"}}}, + }}}, + {{"$replaceRoot", bson.D{ + {"newRoot", "$doc"}, + }}}, + }, ) if err != nil { return err @@ -416,8 +421,6 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e return nil } - var lastIDRaw bson.RawValue - // We group these here using a sort rather than using aggregate because aggregate is // subject to a 16MB limit on group size. for cursor.Next(ctx) { @@ -454,20 +457,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e idsSizer = util.BSONArraySizer{} dataSizeAccum = 0 idAccum = idAccum[:0] - lastIDRaw = bson.RawValue{} - } - - // We’re iterating the rechecks in order such that, if the same doc - // gets enqueued from multiple sources, we’ll see those records - // consecutively. We can deduplicate here, then, by checking to see if - // the doc ID has changed. (NB: At this point we know the namespace - // has *not* changed because we just checked for that.) - if idRaw.Equal(lastIDRaw) { - continue } - lastIDRaw = idRaw - idsSizer.Add(idRaw) dataSizeAccum += int64(doc.DataSize) idAccum = append(idAccum, doc.PrimaryKey.DocumentID) From bef5374a8d631090c5b10b624b7b069dee0769e1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 13:34:22 -0500 Subject: [PATCH 12/16] note a start time --- internal/verifier/recheck.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 5ac492e6..364b17d3 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "time" "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/reportutils" @@ -353,6 +354,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e Int("rechecksCount", int(rechecksCount)). Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.") + startTime := time.Now() + // We generate one recheck task per collection, unless // 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding // the 16MB BSON limit) @@ -479,6 +482,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e Int("generation", 1+prevGeneration). Int64("totalDocs", int64(totalDocs)). Str("totalData", reportutils.FmtBytes(totalRecheckData)). + Stringer("timeElapsed", time.Since(startTime)). Msg("Scheduled documents for recheck in the new generation.") } From a6a01d2167d679f8894f735f421289bfa0227003 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 13:47:48 -0500 Subject: [PATCH 13/16] Revert "dedupe in aggregation" This reverts commit b203cbaec56070a137d6020746f34d1645e4bf5e. --- internal/verifier/recheck.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 364b17d3..27f93322 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -373,19 +373,14 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e // The sort here is important because the recheck _id is an embedded // document that includes the namespace. Thus, all rechecks for a given // namespace will be consecutive in this query’s result. - cursor, err := recheckColl.Aggregate( + cursor, err := recheckColl.Find( ctx, - mongo.Pipeline{ - {{"$sort", bson.D{{"_id", 1}}}}, - {{"$addFields", bson.D{{"_id.cause", 0}}}}, - {{"$group", bson.D{ - {"_id", "$_id"}, - {"doc", bson.D{{"$first", "$$ROOT"}}}, - }}}, - {{"$replaceRoot", bson.D{ - {"newRoot", "$doc"}, - }}}, - }, + bson.D{}, + options.Find(). + SetSort(bson.D{{"_id", 1}}). + SetProjection(bson.D{ + {"_id.cause", 0}, + }), ) if err != nil { return err @@ -424,6 +419,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e return nil } + var lastIDRaw bson.RawValue + // We group these here using a sort rather than using aggregate because aggregate is // subject to a 16MB limit on group size. for cursor.Next(ctx) { @@ -460,8 +457,20 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e idsSizer = util.BSONArraySizer{} dataSizeAccum = 0 idAccum = idAccum[:0] + lastIDRaw = bson.RawValue{} + } + + // We’re iterating the rechecks in order such that, if the same doc + // gets enqueued from multiple sources, we’ll see those records + // consecutively. We can deduplicate here, then, by checking to see if + // the doc ID has changed. (NB: At this point we know the namespace + // has *not* changed because we just checked for that.) + if idRaw.Equal(lastIDRaw) { + continue } + lastIDRaw = idRaw + idsSizer.Add(idRaw) dataSizeAccum += int64(doc.DataSize) idAccum = append(idAccum, doc.PrimaryKey.DocumentID) From 99246c9a3be14c6162e8f75515c2ec9d169fe4a4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 13:50:57 -0500 Subject: [PATCH 14/16] types --- internal/verifier/change_stream.go | 2 +- internal/verifier/recheck.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 181aaf4f..42cf2d18 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -263,7 +263,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch return verifier.insertRecheckDocs( ctx, - lo.Ternary[int32](eventOrigin == src, recheckCauseSource, recheckCauseDestination), + lo.Ternary(eventOrigin == src, recheckCauseSource, recheckCauseDestination), dbNames, collNames, docIDs, dataSizes, ) } diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 27f93322..5e11d8cd 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -24,9 +24,9 @@ const ( recheckQueueCollectionNameBase = "recheckQueue" - recheckCauseMismatch = 0 - recheckCauseSource = 1 - recheckCauseDestination = 2 + recheckCauseMismatch int32 = 0 + recheckCauseSource int32 = 1 + recheckCauseDestination int32 = 2 ) // RecheckPrimaryKey stores the implicit type of recheck to perform From 5c55dfe69366f4ce3f83884f3822f0455f20df26 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 14:04:29 -0500 Subject: [PATCH 15/16] rand instead of cause --- internal/verifier/change_stream_test.go | 6 ++--- internal/verifier/recheck.go | 33 ++++++++++++------------- internal/verifier/recheck_test.go | 12 ++------- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 72c5145a..afa6c439 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -311,10 +311,10 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { recheckDocs[0]["_id"] = lo.Filter( recheckDocs[0]["_id"].(bson.D), func(el bson.E, _ int) bool { - return el.Key != "cause" + return el.Key != "rand" }, ) - delete(recheckDocs[0], "cause") + delete(recheckDocs[0], "rand") suite.Assert().Equal( bson.D{ @@ -349,7 +349,7 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve ctx, mongo.Pipeline{ {{"$addFields", bson.D{ - {"_id.cause", "$$REMOVE"}, + {"_id.rand", "$$REMOVE"}, }}}, {{"$group", bson.D{ {"_id", "$_id"}, diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 5e11d8cd..ebb46a24 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "math/rand/v2" "time" "github.com/10gen/migration-verifier/contextplus" @@ -23,10 +24,6 @@ const ( recheckBatchCountLimit = 1000 recheckQueueCollectionNameBase = "recheckQueue" - - recheckCauseMismatch int32 = 0 - recheckCauseSource int32 = 1 - recheckCauseDestination int32 = 2 ) // RecheckPrimaryKey stores the implicit type of recheck to perform @@ -40,7 +37,17 @@ type RecheckPrimaryKey struct { SrcDatabaseName string `bson:"db"` SrcCollectionName string `bson:"coll"` DocumentID bson.RawValue `bson:"docID"` - Cause int32 // to prevent dupe-key proliferation + + // Rand is here to allow “duplicate” entries. We do this because, with + // multiple change streams returning the same events, we expect duplicate + // key errors to be frequent. The server is quite slow in handling such + // errors, though. To avoid that, while still allowing the _id index to + // facilitate easy sorting of the duplicates, we set this field to a + // random value on each entry. + // + // This also avoids duplicate-key slowness where the source workload + // involves frequent writes to a small number of documents. + Rand int32 } var _ bson.Marshaler = &RecheckPrimaryKey{} @@ -53,7 +60,7 @@ func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) { Type: bsoncore.Type(rk.DocumentID.Type), Data: rk.DocumentID.Value, }). - AppendInt32("cause", rk.Cause). + AppendInt32("rand", rk.Rand). Build(), nil } @@ -93,19 +100,11 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( Int("count", len(documentIDs)). Msg("Persisting rechecks for mismatched or missing documents.") - return verifier.insertRecheckDocs( - ctx, - recheckCauseMismatch, - dbNames, - collNames, - documentIDs, - dataSizes, - ) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes) } func (verifier *Verifier) insertRecheckDocs( ctx context.Context, - cause int32, dbNames []string, collNames []string, documentIDs []bson.RawValue, @@ -177,7 +176,7 @@ func (verifier *Verifier) insertRecheckDocs( SrcDatabaseName: dbName, SrcCollectionName: collNames[i], DocumentID: rawDocIDs[i], - Cause: cause, + Rand: rand.Int32(), }, DataSize: dataSizes[i], } @@ -379,7 +378,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e options.Find(). SetSort(bson.D{{"_id", 1}}). SetProjection(bson.D{ - {"_id.cause", 0}, + {"_id.rand", 0}, }), ) if err != nil { diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index e0d78551..e6b5ff81 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -68,7 +68,6 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { suite.Require().NoError(err) recheckDocs = suite.fetchRecheckDocs(ctx, verifier) - suite.Assert().Equal( []RecheckDoc{ { @@ -91,7 +90,7 @@ func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifie ctx, mongo.Pipeline{ {{"$addFields", bson.D{ - {"_id.cause", "$$REMOVE"}, + {"_id.rand", "$$REMOVE"}, {"dataSize", "$$REMOVE"}, }}}, {{"$group", bson.D{ @@ -513,12 +512,5 @@ func insertRecheckDocs( }, ) - return verifier.insertRecheckDocs( - ctx, - recheckCauseMismatch, - dbNames, - collNames, - rawIDs, - dataSizes, - ) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes) } From 0dff2ec1ba3af68ec6c5dcf6c055c10a49aaa9a4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 14:05:35 -0500 Subject: [PATCH 16/16] revert --- internal/verifier/change_stream.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 42cf2d18..d4330077 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -261,11 +261,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch Stringer("lag", lag). Msg("Persisting rechecks for change events.") - return verifier.insertRecheckDocs( - ctx, - lo.Ternary(eventOrigin == src, recheckCauseSource, recheckCauseDestination), - dbNames, collNames, docIDs, dataSizes, - ) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) } // GetChangeStreamFilter returns an aggregation pipeline that filters