diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go
index bbc1dafe..afa6c439 100644
--- a/internal/verifier/change_stream_test.go
+++ b/internal/verifier/change_stream_test.go
@@ -306,6 +306,16 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
 		"the verifier should enqueue a recheck",
 	)
 
+	require.Len(suite.T(), recheckDocs, 1)
+
+	recheckDocs[0]["_id"] = lo.Filter(
+		recheckDocs[0]["_id"].(bson.D),
+		func(el bson.E, _ int) bool {
+			return el.Key != "rand"
+		},
+	)
+	delete(recheckDocs[0], "rand")
+
 	suite.Assert().Equal(
 		bson.D{
 			{"db", suite.DBNameForTest()},
@@ -335,7 +345,21 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve
 	recheckDocs := []bson.M{}
 
 	recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
-	cursor, err := recheckColl.Find(ctx, bson.D{})
+	cursor, err := recheckColl.Aggregate(
+		ctx,
+		mongo.Pipeline{
+			{{"$addFields", bson.D{
+				{"_id.rand", "$$REMOVE"},
+			}}},
+			{{"$group", bson.D{
+				{"_id", "$_id"},
+				{"doc", bson.D{{"$first", "$$ROOT"}}},
+			}}},
+			{{"$replaceRoot", bson.D{
+				{"newRoot", "$doc"},
+			}}},
+		},
+	)
 
 	if !errors.Is(err, mongo.ErrNoDocuments) {
 		suite.Require().NoError(err)
diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go
index 127eae89..ebb46a24 100644
--- a/internal/verifier/recheck.go
+++ b/internal/verifier/recheck.go
@@ -3,6 +3,8 @@ package verifier
 import (
 	"context"
 	"fmt"
+	"math/rand/v2"
+	"time"
 
 	"github.com/10gen/migration-verifier/contextplus"
 	"github.com/10gen/migration-verifier/internal/reportutils"
@@ -35,6 +37,17 @@ type RecheckPrimaryKey struct {
 	SrcDatabaseName   string        `bson:"db"`
 	SrcCollectionName string        `bson:"coll"`
 	DocumentID        bson.RawValue `bson:"docID"`
+
+	// Rand is here to allow “duplicate” entries. We do this because, with
+	// multiple change streams returning the same events, we expect duplicate
+	// key errors to be frequent. The server is quite slow in handling such
+	// errors, though. To avoid that, while still allowing the _id index to
+	// facilitate easy sorting of the duplicates, we set this field to a
+	// random value on each entry.
+	//
+	// This also avoids duplicate-key slowness where the source workload
+	// involves frequent writes to a small number of documents.
+	Rand int32
 }
 
 var _ bson.Marshaler = &RecheckPrimaryKey{}
@@ -47,6 +60,7 @@ func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) {
 			Type: bsoncore.Type(rk.DocumentID.Type),
 			Data: rk.DocumentID.Value,
 		}).
+		AppendInt32("rand", rk.Rand).
 		Build(), nil
 }
 
@@ -86,13 +100,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 		Int("count", len(documentIDs)).
 		Msg("Persisting rechecks for mismatched or missing documents.")
 
-	return verifier.insertRecheckDocs(
-		ctx,
-		dbNames,
-		collNames,
-		documentIDs,
-		dataSizes,
-	)
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
 }
 
 func (verifier *Verifier) insertRecheckDocs(
@@ -168,6 +176,7 @@ func (verifier *Verifier) insertRecheckDocs(
 				SrcDatabaseName:   dbName,
 				SrcCollectionName: collNames[i],
 				DocumentID:        rawDocIDs[i],
+				Rand:              rand.Int32(),
 			},
 			DataSize: dataSizes[i],
 		}
@@ -344,6 +353,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 		Int("rechecksCount", int(rechecksCount)).
 		Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")
 
+	startTime := time.Now()
+
 	// We generate one recheck task per collection, unless
 	// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
 	//    the 16MB BSON limit)
@@ -364,7 +375,11 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 	cursor, err := recheckColl.Find(
 		ctx,
 		bson.D{},
-		options.Find().SetSort(bson.D{{"_id", 1}}),
+		options.Find().
+			SetSort(bson.D{{"_id", 1}}).
+			SetProjection(bson.D{
+				{"_id.rand", 0},
+			}),
 	)
 	if err != nil {
 		return err
@@ -403,6 +418,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 		return nil
 	}
 
+	var lastIDRaw bson.RawValue
+
 	// We group these here using a sort rather than using aggregate because aggregate is
 	// subject to a 16MB limit on group size.
 	for cursor.Next(ctx) {
@@ -439,8 +456,20 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 			idsSizer = util.BSONArraySizer{}
 			dataSizeAccum = 0
 			idAccum = idAccum[:0]
+			lastIDRaw = bson.RawValue{}
+		}
+
+		// We’re iterating the rechecks in order such that, if the same doc
+		// gets enqueued from multiple sources, we’ll see those records
+		// consecutively. We can deduplicate here, then, by checking to see if
+		// the doc ID has changed. (NB: At this point we know the namespace
+		// has *not* changed because we just checked for that.)
+		if idRaw.Equal(lastIDRaw) {
+			continue
 		}
 
+		lastIDRaw = idRaw
+
 		idsSizer.Add(idRaw)
 		dataSizeAccum += int64(doc.DataSize)
 		idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
@@ -461,6 +490,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 			Int("generation", 1+prevGeneration).
 			Int64("totalDocs", int64(totalDocs)).
 			Str("totalData", reportutils.FmtBytes(totalRecheckData)).
+			Stringer("timeElapsed", time.Since(startTime)).
 			Msg("Scheduled documents for recheck in the new generation.")
 	}
 
diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go
index f172bd71..e6b5ff81 100644
--- a/internal/verifier/recheck_test.go
+++ b/internal/verifier/recheck_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/samber/lo"
 	"go.mongodb.org/mongo-driver/v2/bson"
+	"go.mongodb.org/mongo-driver/v2/mongo"
 	"go.mongodb.org/mongo-driver/v2/mongo/options"
 )
 
@@ -85,11 +86,23 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
 func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc {
 	metaColl := verifier.getRecheckQueueCollection(verifier.generation)
 
-	cursor, err := metaColl.Find(
+	cursor, err := metaColl.Aggregate(
 		ctx,
-		bson.D{},
-		options.Find().SetProjection(bson.D{{"dataSize", 0}}),
+		mongo.Pipeline{
+			{{"$addFields", bson.D{
+				{"_id.rand", "$$REMOVE"},
+				{"dataSize", "$$REMOVE"},
+			}}},
+			{{"$group", bson.D{
+				{"_id", "$_id"},
+				{"doc", bson.D{{"$first", "$$ROOT"}}},
+			}}},
+			{{"$replaceRoot", bson.D{
+				{"newRoot", "$doc"},
+			}}},
+		},
 	)
+	suite.Require().NoError(err, "find recheck docs")
 
 	var results []RecheckDoc