diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 3c71cef5..e03a0c39 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -15,13 +15,16 @@ import ( "golang.org/x/exp/constraints" ) +const fauxDocSizeForDeleteEvents = 1024 + // ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'. type ParsedEvent struct { - ID interface{} `bson:"_id"` - OpType string `bson:"operationType"` - Ns *Namespace `bson:"ns,omitempty"` - DocKey DocKey `bson:"documentKey,omitempty"` - ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"` + ID interface{} `bson:"_id"` + OpType string `bson:"operationType"` + Ns *Namespace `bson:"ns,omitempty"` + DocKey DocKey `bson:"documentKey,omitempty"` + FullDocument bson.Raw `bson:"fullDocument,omitempty"` + ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"` } func (pe *ParsedEvent) String() string { @@ -79,12 +82,14 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] collNames[i] = changeEvent.Ns.Coll docIDs[i] = changeEvent.DocKey.ID - // We don't know the document sizes for documents for all change events, - // so just be conservative and assume they are maximum size. - // - // Note that this prevents us from being able to report a meaningful - // total data size for noninitial generations in the log. - dataSizes[i] = maxBSONObjSize + if changeEvent.FullDocument == nil { + // This happens for deletes and for some updates. + // The document is probably, but not necessarily, deleted. + dataSizes[i] = fauxDocSizeForDeleteEvents + } else { + // This happens for inserts, replaces, and most updates. + dataSizes[i] = len(changeEvent.FullDocument) + } default: return UnknownEventError{Event: &changeEvent} } @@ -97,6 +102,13 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) } +// GetChangeStreamFilter returns an aggregation pipeline that filters +// namespaces as per configuration. +// +// NB: Ideally we could make the change stream give $bsonSize(fullDocument) +// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still +// want to verify migrations from 4.2. fullDocument is unlikely to be a +// bottleneck anyway. func (verifier *Verifier) GetChangeStreamFilter() []bson.D { if len(verifier.srcNamespaces) == 0 { return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}} @@ -236,7 +248,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // StartChangeStream starts the change stream. func (verifier *Verifier) StartChangeStream(ctx context.Context) error { pipeline := verifier.GetChangeStreamFilter() - opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) + opts := options.ChangeStream(). + SetMaxAwaitTime(1 * time.Second). + SetFullDocument(options.UpdateLookup) savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) if err != nil { diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index fe4ff0ed..735bcf7f 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -4,6 +4,7 @@ import ( "context" "strings" + "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/types" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" @@ -48,6 +49,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() { DB: "the", Coll: "namespace", }, + FullDocument: testutil.MustMarshal(bson.D{{"foo", 1}}), } err := verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) @@ -63,7 +65,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() { CollectionName: "namespace", DocumentID: "theDocID", }, - DataSize: maxBSONObjSize, + DataSize: len(event.FullDocument), }, }, recheckDocs,