diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index f7ab8e97..eb0cb6f0 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -25,7 +25,7 @@ type ParsedEvent struct { } func (pe *ParsedEvent) String() string { - return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID) + return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime) } // DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about @@ -44,7 +44,7 @@ type UnknownEventError struct { } func (uee UnknownEventError) Error() string { - return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType) + return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event) } // HandleChangeStreamEvent performs the necessary work for change stream events that occur during @@ -83,8 +83,6 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D { } func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { - var changeEvent ParsedEvent - var lastPersistedTime time.Time persistResumeTokenIfNeeded := func() error { @@ -103,6 +101,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha readOneChangeEvent := func() (bool, error) { gotEvent := cs.TryNext(ctx) if gotEvent { + var changeEvent ParsedEvent if err := cs.Decode(&changeEvent); err != nil { return false, errors.Wrap(err, "failed to decode change event") } @@ -129,13 +128,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // source writes are ended. This means we should exit rather than continue // reading the change stream since there should be no more events. case <-verifier.changeStreamEnderChan: - var gotEvent bool + verifier.logger.Debug(). 
+ Msg("Change stream thread received shutdown request.") changeStreamEnded = true // Read all change events until the source reports no events. // (i.e., the `getMore` call returns empty) for { + var gotEvent bool gotEvent, err = readOneChangeEvent() if !gotEvent || err != nil { @@ -143,10 +144,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } } - if err != nil { - break - } - default: _, err = readOneChangeEvent() } @@ -156,7 +153,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } if err != nil && !errors.Is(err, context.Canceled) { + verifier.logger.Debug(). + Err(err). + Msg("Sending change stream error.") + verifier.changeStreamErrChan <- err + + if !changeStreamEnded { + break + } } if changeStreamEnded { @@ -169,11 +174,18 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // since we have started Recheck, we must signal that we have // finished the change stream changes so that Recheck can continue. verifier.changeStreamDoneChan <- struct{}{} - // since the changeStream is exhausted, we now return - verifier.logger.Debug().Msg("Change stream is done") - return + break } } + + infoLog := verifier.logger.Info() + if verifier.lastChangeEventTime == nil { + infoLog = infoLog.Str("changeStreamStopTime", "none") + } else { + infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime) + } + + infoLog.Msg("Change stream is done.") } // StartChangeStream starts the change stream. diff --git a/internal/verifier/check.go b/internal/verifier/check.go index fa5b2fd0..9f90f9fa 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -51,8 +51,12 @@ func (verifier *Verifier) waitForChangeStream() error { verifier.changeStreamEnderChan <- struct{}{} select { case err := <-verifier.changeStreamErrChan: + verifier.logger.Warn().Err(err). 
+ Msg("Received error from change stream.") return err case <-verifier.changeStreamDoneChan: + verifier.logger.Debug(). + Msg("Received completion signal from change stream.") break } } @@ -188,16 +192,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any return err } // we will only coordinate when the number of channels is exactly 2. - // * Channel 0 signals a generation is done - // * Channel 1 signals to check to continue the next generation + // * Channel 0 informs the test of a generation boundary. + // * Block until the test (via channel 1) tells us to do the + // next generation. if len(testChan) == 2 { + + verifier.logger.Debug(). + Msg("Telling test about generation boundary.") testChan[0] <- struct{}{} + + verifier.logger.Debug(). + Msg("Awaiting test's signal to continue.") <-testChan[1] + + verifier.logger.Debug(). + Msg("Received test's signal. Continuing.") } time.Sleep(verifier.generationPauseDelayMillis * time.Millisecond) verifier.mux.Lock() if verifier.lastGeneration { verifier.mux.Unlock() + + verifier.logger.Debug(). + Int("generation", verifier.generation). + Msg("Final generation done.") + return nil } // TODO: wait here until writesOff is hit or enough time has passed, so we don't spin diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 5aaa6503..5a9417f6 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1567,6 +1567,10 @@ func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() { // Turn writes off. verifier.WritesOff(ctx) + + // Tell CheckDriver to do one more pass. This should terminate the change stream. + checkContinueChan <- struct{}{} + <-checkDoneChan } func (suite *MultiDataVersionTestSuite) TestPartitionWithFilter() {