diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 64af7838..d7efdb97 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "runtime/pprof" "time" "github.com/10gen/migration-verifier/history" @@ -11,11 +12,11 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mmongo/cursor" "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" - clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/lo" @@ -23,6 +24,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" "golang.org/x/exp/slices" ) @@ -67,9 +69,10 @@ type ChangeStreamReader struct { logger *logger.Logger namespaces []string - metaDB *mongo.Database - watcherClient *mongo.Client - clusterInfo util.ClusterInfo + metaDB *mongo.Database + watcherClient *mongo.Client + readPreference *readpref.ReadPref + clusterInfo util.ClusterInfo changeStreamRunning bool changeEventBatchChan chan changeEventBatch @@ -108,6 +111,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { for _, csr := range mslices.Of(srcReader, dstReader) { csr.logger = verifier.logger csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) + csr.readPreference = verifier.readPreference csr.changeEventBatchChan = make(chan changeEventBatch) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() @@ -349,50 +353,32 @@ func (csr *ChangeStreamReader) hasBsonSize() bool { func (csr *ChangeStreamReader) 
readAndHandleOneChangeEventBatch( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, - sess *mongo.Session, + csCursor *cursor.BatchCursor, ) error { - eventsRead := 0 - var changeEvents []ParsedEvent - - latestEvent := option.None[ParsedEvent]() + batchLen, err := csCursor.GetCurrentBatchLength() + if err != nil { + return err + } + changeEvents := make([]ParsedEvent, 0, batchLen) var batchTotalBytes int - for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + var latestEvent option.Option[ParsedEvent] - if cs.Err() != nil { - return errors.Wrap(cs.Err(), "change stream iteration failed") - } - - if !gotEvent { - break + for rawEvent, err := range csCursor.GetCurrentBatchIterator() { + if err != nil { + return errors.Wrapf(err, "reading batch of events") } - if changeEvents == nil { - batchSize := cs.RemainingBatchLength() + 1 - - ri.NoteSuccess("received a batch of %d change event(s)", batchSize) + var newEvent ParsedEvent - changeEvents = make([]ParsedEvent, batchSize) + if err := bson.Unmarshal(rawEvent, &newEvent); err != nil { + return errors.Wrapf(err, "parsing change event") } - batchTotalBytes += len(cs.Current) + batchTotalBytes += len(rawEvent) + changeEvents = append(changeEvents, newEvent) - if err := cs.Decode(&changeEvents[eventsRead]); err != nil { - return errors.Wrapf(err, "failed to decode change event to %T", changeEvents[eventsRead]) - } - - // This only logs in tests. - csr.logger.Trace(). - Stringer("changeStream", csr). - Any("event", changeEvents[eventsRead]). - Int("eventsPreviouslyReadInBatch", eventsRead). - Int("batchEvents", len(changeEvents)). - Int("batchBytes", batchTotalBytes). 
- Msg("Received a change event.") - - opType := changeEvents[eventsRead].OpType + opType := newEvent.OpType if !supportedEventOpTypes.Contains(opType) { // We expect certain DDL events on the destination as part of @@ -403,7 +389,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( if csr.onDDLEvent == onDDLEventAllow { csr.logger.Info(). Stringer("changeStream", csr). - Stringer("event", cs.Current). + Stringer("event", rawEvent). Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") // Discard this event, then keep reading. @@ -411,30 +397,37 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( continue } else { - return UnknownEventError{Event: clone.Clone(cs.Current)} + return UnknownEventError{Event: rawEvent} } } // This shouldn’t happen, but just in case: - if changeEvents[eventsRead].Ns == nil { - return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead]) + if newEvent.Ns == nil { + return errors.Errorf("Change event lacks a namespace: %+v", rawEvent) } - if changeEvents[eventsRead].ClusterTime != nil && + if newEvent.ClusterTime != nil && (csr.lastChangeEventTime == nil || - csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) { + csr.lastChangeEventTime.Before(*newEvent.ClusterTime)) { - csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime - latestEvent = option.Some(changeEvents[eventsRead]) + csr.lastChangeEventTime = newEvent.ClusterTime + latestEvent = option.Some(newEvent) } + } - eventsRead++ + clusterTS, err := csCursor.GetClusterTime() + if err != nil { + return errors.Wrapf(err, "extracting cluster time from server response") } - var tokenTs bson.Timestamp - tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) + resumeToken, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrapf(err, "extracting resume token") + } + + tokenTS, err := 
extractTimestampFromResumeToken(resumeToken) if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) + lagSecs := int64(clusterTS.T) - int64(tokenTS.T) csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) } else { csr.logger.Warn(). @@ -442,13 +435,13 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) } - if eventsRead == 0 { + if len(changeEvents) == 0 { ri.NoteSuccess("received an empty change stream response") return nil } - csr.batchSizeHistory.Add(eventsRead) + csr.batchSizeHistory.Add(len(changeEvents)) if event, has := latestEvent.Get(); has { csr.logger.Trace(). @@ -468,7 +461,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( events: changeEvents, // NB: We know by now that OperationTime is non-nil. - clusterTime: *sess.OperationTime(), + clusterTime: clusterTS, }: } @@ -487,8 +480,7 @@ func (csr *ChangeStreamReader) wrapHandlerErrorForReader() error { func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, - sess *mongo.Session, + csCursor *cursor.BatchCursor, ) error { var lastPersistedTime time.Time @@ -497,7 +489,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( return nil } - err := csr.persistChangeStreamResumeToken(ctx, cs) + err := csr.persistChangeStreamResumeToken(ctx, csCursor) if err == nil { lastPersistedTime = time.Now() } @@ -541,32 +533,44 @@ func (csr *ChangeStreamReader) iterateChangeStream( // Read change events until the stream reaches the writesOffTs. 
// (i.e., the `getMore` call returns empty) for { + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, csCursor) + if err != nil { + return err + } + + rt, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrap(err, "extracting resume token") + } + var curTs bson.Timestamp - curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + curTs, err = extractTimestampFromResumeToken(rt) if err != nil { - return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + return errors.Wrap(err, "extracting timestamp from change stream's resume token") } // writesOffTs never refers to a real event, // so we can stop once curTs >= writesOffTs. if !curTs.Before(writesOffTs) { csr.logger.Debug(). - Any("currentTimestamp", curTs). + Any("resumeTokenTimestamp", curTs). Any("writesOffTimestamp", writesOffTs). Msgf("%s has reached the writesOff timestamp. Shutting down.", csr) break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) - - if err != nil { - return err + if err := csCursor.GetNext(ctx); err != nil { + return errors.Wrap(err, "reading change stream") } } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, csCursor) + + if err == nil { + err = errors.Wrapf(csCursor.GetNext(ctx), "reading %s", csr) + } if err == nil { err = persistResumeTokenIfNeeded() @@ -605,23 +609,25 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, *mongo.Session, bson.Timestamp, error) { - pipeline := csr.GetChangeStreamFilter() - opts := options.ChangeStream(). 
- SetMaxAwaitTime(maxChangeStreamAwaitTime) +) (*cursor.BatchCursor, bson.Timestamp, error) { + + changeStreamStage := bson.D{ + {"allChangesForCluster", true}, + } if csr.clusterInfo.VersionArray[0] >= 6 { - opts = opts.SetCustomPipeline( - bson.M{ - "showSystemEvents": true, - "showExpandedEvents": true, - }, + changeStreamStage = append( + changeStreamStage, + bson.D{ + {"showSystemEvents", true}, + {"showExpandedEvents", true}, + }..., ) } savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() @@ -641,50 +647,101 @@ func (csr *ChangeStreamReader) createChangeStream( logEvent.Msg("Starting change stream from persisted resume token.") - opts = opts.SetStartAfter(savedResumeToken) + changeStreamStage = append( + changeStreamStage, + bson.D{ + {"startAfter", savedResumeToken}, + }..., + ) } else { csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } sess, err := csr.watcherClient.StartSession() if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") } - sctx := mongo.NewSessionContext(ctx, sess) - changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") + + aggregateCmd := bson.D{ + {"aggregate", 1}, + {"cursor", bson.D{}}, + {"pipeline", append( + mongo.Pipeline{ + {{"$changeStream", changeStreamStage}}, + }, + csr.GetChangeStreamFilter()..., + )}, } - err = csr.persistChangeStreamResumeToken(ctx, changeStream) + sctx := mongo.NewSessionContext(ctx, sess) + adminDB := sess.Client().Database("admin") + result := 
adminDB.RunCommand( + sctx, + aggregateCmd, + options.RunCmd().SetReadPreference(csr.readPreference), + ) + myCursor, err := cursor.New(adminDB, result) + if err != nil { - return nil, nil, bson.Timestamp{}, err + return nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") } - startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + if savedResumeToken == nil { + err = csr.persistChangeStreamResumeToken(ctx, myCursor) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "persisting initial resume token") + } } - // With sharded clusters the resume token might lead the cluster time - // by 1 increment. In that case we need the actual cluster time; - // otherwise we will get errors. - clusterTime, err := util.GetClusterTimeFromSession(sess) - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + myCursor.SetSession(sess) + myCursor.SetMaxAwaitTime(maxChangeStreamAwaitTime) + myCursor.SetReadPreference(csr.readPreference) + + var startTs bson.Timestamp + for firstEvent, err := range myCursor.GetCurrentBatchIterator() { + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "reading first event") + } + + // If there is no `startAfter`, then the change stream’s first response + // should have no events. If that invariant breaks then we will have + // just persisted a resume token that exceeds events we have yet to + // process. 
+ if savedResumeToken == nil { + panic(fmt.Sprintf("plain change stream first response should be empty; instead got: %v", firstEvent)) + } + + ct, err := firstEvent.LookupErr("clusterTime") + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "extracting first event’s cluster time") + } + + if err := mbson.UnmarshalRawValue(ct, &startTs); err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "parsing first event’s cluster time") + } + + break } - csr.logger.Debug(). - Any("resumeTokenTimestamp", startTs). - Any("clusterTime", clusterTime). - Stringer("changeStreamReader", csr). - Msg("Using earlier time as start timestamp.") + if savedResumeToken == nil { + resumeToken, err := cursor.GetResumeToken(myCursor) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap( + err, + "extracting change stream’s resume token", + ) + } - if startTs.After(clusterTime) { - startTs = clusterTime + startTs, err = extractTimestampFromResumeToken(resumeToken) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap( + err, + "extracting timestamp from change stream’s resume token", + ) + } } - return changeStream, sess, startTs, nil + return myCursor, startTs, nil } // StartChangeStream starts the change stream. @@ -712,7 +769,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { err := retryer.WithCallback( func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) + csCursor, startTs, err := csr.createChangeStream(ctx) if err != nil { logEvent := csr.logger.Debug(). Err(err). @@ -730,8 +787,6 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { return err } - defer changeStream.Close(ctx) - logEvent := csr.logger.Debug(). Stringer("changeStreamReader", csr). 
Any("startTimestamp", startTs) @@ -746,7 +801,15 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { logEvent.Msg("Retried change stream open succeeded.") } - return csr.iterateChangeStream(ctx, ri, changeStream, sess) + pprof.Do( + ctx, + pprof.Labels("component", "iterate-change-stream"), + func(ctx context.Context) { + err = csr.iterateChangeStream(ctx, ri, csCursor) + }, + ) + + return err }, "running %s", csr, ).Run(ctx, csr.logger) @@ -758,12 +821,12 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { result := <-initialCreateResultChan - startTs, err := result.Get() + startTS, err := result.Get() if err != nil { return err } - csr.startAtTs = &startTs + csr.startAtTs = &startTS csr.changeStreamRunning = true @@ -837,11 +900,14 @@ func (csr *ChangeStreamReader) resumeTokenDocID() string { } } -func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error { - token := cs.ResumeToken() +func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, csCursor *cursor.BatchCursor) error { + token, err := cursor.GetResumeToken(csCursor) + if err != nil { + return errors.Wrap(err, "reading cursor’s resume token") + } coll := csr.getChangeStreamMetadataCollection() - _, err := coll.ReplaceOne( + _, err = coll.ReplaceOne( ctx, bson.D{{"_id", csr.resumeTokenDocID()}}, token, diff --git a/internal/verifier/check.go b/internal/verifier/check.go index bd64874f..76a5dca2 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "runtime/pprof" "time" "github.com/10gen/migration-verifier/contextplus" @@ -273,7 +274,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh return errors.Wrapf(err, "failed to start %s", csReader) } ceHandlerGroup.Go(func() error { - return verifier.RunChangeEventHandler(groupCtx, csReader) + var err error + + 
pprof.Do( + groupCtx, + pprof.Labels("component", "change-event-handler"), + func(ctx context.Context) { + err = verifier.RunChangeEventHandler(ctx, csReader) + }, + ) + + return err }) } } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 8c97d43b..f220f40a 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -730,6 +730,7 @@ func (verifier *Verifier) getDocumentsCursor( if err == nil { c.SetSession(mongo.SessionFromContext(ctx)) + c.SetReadPreference(verifier.readPreference) } return c, err diff --git a/internal/verifier/uri.go b/internal/verifier/uri.go index c5ed2cb8..dfef812d 100644 --- a/internal/verifier/uri.go +++ b/internal/verifier/uri.go @@ -11,7 +11,8 @@ import ( ) func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error { - opts := verifier.getClientOpts(uri) + opts := verifier.getClientOpts(uri). + SetReadPreference(verifier.readPreference) var err error verifier.srcClient, err = mongo.Connect(opts) if err != nil { @@ -90,7 +91,9 @@ func isVersionSupported(version []int) bool { } func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error { - opts := verifier.getClientOpts(uri) + opts := verifier.getClientOpts(uri). + SetReadPreference(verifier.readPreference) + var err error verifier.dstClient, err = mongo.Connect(opts) if err != nil { diff --git a/main/migration_verifier.go b/main/migration_verifier.go index aa1c8a65..35b3fd4a 100644 --- a/main/migration_verifier.go +++ b/main/migration_verifier.go @@ -276,8 +276,13 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err Int("processID", os.Getpid()). 
Msg("migration-verifier started.") + err := v.SetReadPreference(cCtx.String(readPreference)) + if err != nil { + return nil, err + } + srcConnStr := cCtx.String(srcURI) - _, srcConnStr, err := mmongo.MaybeAddDirectConnection(srcConnStr) + _, srcConnStr, err = mmongo.MaybeAddDirectConnection(srcConnStr) if err != nil { return nil, errors.Wrap(err, "parsing source connection string") } @@ -350,10 +355,6 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err } v.SetDocCompareMethod(docCompareMethod) - err = v.SetReadPreference(cCtx.String(readPreference)) - if err != nil { - return nil, err - } v.SetFailureDisplaySize(cCtx.Int64(failureDisplaySize)) return v, nil } diff --git a/mmongo/cursor/batch.go b/mmongo/cursor/batch.go index 3b900507..1c1f18d8 100644 --- a/mmongo/cursor/batch.go +++ b/mmongo/cursor/batch.go @@ -17,6 +17,8 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" ) @@ -29,13 +31,29 @@ var ( // because there is no need to clone each document individually, as // mongo.Cursor requires. type BatchCursor struct { - sess *mongo.Session - maxAwaitTime option.Option[time.Duration] - id int64 - ns string - db *mongo.Database - rawResp bson.Raw - curBatch bson.RawArray + sess *mongo.Session + readPreference *readpref.ReadPref + maxAwaitTime option.Option[time.Duration] + id int64 + ns string + db *mongo.Database + rawResp bson.Raw + curBatch bson.RawArray +} + +// GetCurrentBatchLength returns the number of documents in the current batch. 
+func (c *BatchCursor) GetCurrentBatchLength() (int, error) { + count := 0 + + for _, err := range c.GetCurrentBatchIterator() { + if err != nil { + return 0, errors.Wrap(err, "counting documents in batch") + } + + count++ + } + + return count, nil } // GetCurrentBatchIterator returns an iterator over the BatchCursor’s current batch. @@ -141,7 +159,11 @@ func (c *BatchCursor) GetNext(ctx context.Context, extraPieces ...bson.E) error if c.sess != nil { ctx = mongo.NewSessionContext(ctx, c.sess) } - resp := c.db.RunCommand(ctx, cmd) + resp := c.db.RunCommand( + ctx, + cmd, + options.RunCmd().SetReadPreference(c.readPreference), + ) raw, err := resp.Raw() if err != nil { @@ -226,6 +248,10 @@ func (c *BatchCursor) SetSession(sess *mongo.Session) { c.sess = sess } +func (c *BatchCursor) SetReadPreference(rp *readpref.ReadPref) { + c.readPreference = rp +} + func (c *BatchCursor) SetMaxAwaitTime(d time.Duration) { c.maxAwaitTime = option.Some(d) }