Skip to content
Merged
26 changes: 25 additions & 1 deletion internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,16 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
"the verifier should enqueue a recheck",
)

require.Len(suite.T(), recheckDocs, 1)

recheckDocs[0]["_id"] = lo.Filter(
recheckDocs[0]["_id"].(bson.D),
func(el bson.E, _ int) bool {
return el.Key != "rand"
},
)
delete(recheckDocs[0], "rand")

suite.Assert().Equal(
bson.D{
{"db", suite.DBNameForTest()},
Expand Down Expand Up @@ -335,7 +345,21 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve
recheckDocs := []bson.M{}

recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
cursor, err := recheckColl.Find(ctx, bson.D{})
cursor, err := recheckColl.Aggregate(
ctx,
mongo.Pipeline{
{{"$addFields", bson.D{
{"_id.rand", "$$REMOVE"},
}}},
{{"$group", bson.D{
{"_id", "$_id"},
{"doc", bson.D{{"$first", "$$ROOT"}}},
}}},
{{"$replaceRoot", bson.D{
{"newRoot", "$doc"},
}}},
},
)

if !errors.Is(err, mongo.ErrNoDocuments) {
suite.Require().NoError(err)
Expand Down
46 changes: 38 additions & 8 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package verifier
import (
"context"
"fmt"
"math/rand/v2"
"time"

"github.com/10gen/migration-verifier/contextplus"
"github.com/10gen/migration-verifier/internal/reportutils"
Expand Down Expand Up @@ -35,6 +37,17 @@ type RecheckPrimaryKey struct {
SrcDatabaseName string `bson:"db"`
SrcCollectionName string `bson:"coll"`
DocumentID bson.RawValue `bson:"docID"`

// Rand is here to allow “duplicate” entries. We do this because, with
// multiple change streams returning the same events, we expect duplicate
// key errors to be frequent. The server is quite slow in handling such
// errors, though. To avoid that, while still allowing the _id index to
// facilitate easy sorting of the duplicates, we set this field to a
// random value on each entry.
//
// This also avoids duplicate-key slowness where the source workload
// involves frequent writes to a small number of documents.
Rand int32
}

var _ bson.Marshaler = &RecheckPrimaryKey{}
Expand All @@ -47,6 +60,7 @@ func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) {
Type: bsoncore.Type(rk.DocumentID.Type),
Data: rk.DocumentID.Value,
}).
AppendInt32("rand", rk.Rand).
Build(), nil
}

Expand Down Expand Up @@ -86,13 +100,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
Int("count", len(documentIDs)).
Msg("Persisting rechecks for mismatched or missing documents.")

return verifier.insertRecheckDocs(
ctx,
dbNames,
collNames,
documentIDs,
dataSizes,
)
return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
}

func (verifier *Verifier) insertRecheckDocs(
Expand Down Expand Up @@ -168,6 +176,7 @@ func (verifier *Verifier) insertRecheckDocs(
SrcDatabaseName: dbName,
SrcCollectionName: collNames[i],
DocumentID: rawDocIDs[i],
Rand: rand.Int32(),
},
DataSize: dataSizes[i],
}
Expand Down Expand Up @@ -344,6 +353,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
Int("rechecksCount", int(rechecksCount)).
Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")

startTime := time.Now()

// We generate one recheck task per collection, unless
// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
// the 16MB BSON limit)
Expand All @@ -364,7 +375,11 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
cursor, err := recheckColl.Find(
ctx,
bson.D{},
options.Find().SetSort(bson.D{{"_id", 1}}),
options.Find().
SetSort(bson.D{{"_id", 1}}).
SetProjection(bson.D{
{"_id.rand", 0},
}),
)
if err != nil {
return err
Expand Down Expand Up @@ -403,6 +418,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
return nil
}

var lastIDRaw bson.RawValue

// We group these here using a sort rather than using aggregate because aggregate is
// subject to a 16MB limit on group size.
for cursor.Next(ctx) {
Expand Down Expand Up @@ -439,8 +456,20 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
idsSizer = util.BSONArraySizer{}
dataSizeAccum = 0
idAccum = idAccum[:0]
lastIDRaw = bson.RawValue{}
}

// We’re iterating the rechecks in order such that, if the same doc
// gets enqueued from multiple sources, we’ll see those records
// consecutively. We can deduplicate here, then, by checking to see if
// the doc ID has changed. (NB: At this point we know the namespace
// has *not* changed because we just checked for that.)
if idRaw.Equal(lastIDRaw) {
continue
}

lastIDRaw = idRaw

idsSizer.Add(idRaw)
dataSizeAccum += int64(doc.DataSize)
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
Expand All @@ -461,6 +490,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
Int("generation", 1+prevGeneration).
Int64("totalDocs", int64(totalDocs)).
Str("totalData", reportutils.FmtBytes(totalRecheckData)).
Stringer("timeElapsed", time.Since(startTime)).
Msg("Scheduled documents for recheck in the new generation.")
}

Expand Down
19 changes: 16 additions & 3 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)

Expand Down Expand Up @@ -85,11 +86,23 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc {
metaColl := verifier.getRecheckQueueCollection(verifier.generation)

cursor, err := metaColl.Find(
cursor, err := metaColl.Aggregate(
ctx,
bson.D{},
options.Find().SetProjection(bson.D{{"dataSize", 0}}),
mongo.Pipeline{
{{"$addFields", bson.D{
{"_id.rand", "$$REMOVE"},
{"dataSize", "$$REMOVE"},
}}},
{{"$group", bson.D{
{"_id", "$_id"},
{"doc", bson.D{{"$first", "$$ROOT"}}},
}}},
{{"$replaceRoot", bson.D{
{"newRoot", "$doc"},
}}},
},
)

suite.Require().NoError(err, "find recheck docs")

var results []RecheckDoc
Expand Down