From 4eb2fbb363c8f9f9b9e215374633da1a2cd8db05 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 8 Nov 2024 09:06:31 -0500 Subject: [PATCH 1/4] Change stream error-handling fixes. * If an error happens in readOneChangeEvent, that error will now be reported via verifier.changeStreamErrChan. Previously the change stream would just stop. * The change stream will now stop after reporting an error. Previously it would keep iterating. --- internal/verifier/change_stream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index f7ab8e97..a85214ea 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -143,10 +143,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } } - if err != nil { - break - } - default: _, err = readOneChangeEvent() } @@ -157,6 +153,10 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha if err != nil && !errors.Is(err, context.Canceled) { verifier.changeStreamErrChan <- err + + if !changeStreamEnded { + break + } } if changeStreamEnded { From 1b05f65fad665dc1a29b6096099836c91d522022 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 8 Nov 2024 09:12:53 -0500 Subject: [PATCH 2/4] =?UTF-8?q?Fix=20TestVerifierWithFilter=E2=80=99s=20ch?= =?UTF-8?q?ange=20stream.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously this function’s verifier would on occasion fail because it would see the `drop` events from TearDownTest. This change causes that verifier’s change stream to end, which should prevent that failure. 
--- internal/verifier/migration_verifier_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 5aaa6503..5a9417f6 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1567,6 +1567,10 @@ func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() { // Turn writes off. verifier.WritesOff(ctx) + + // Tell CheckDriver to do one more pass. This should terminate the change stream. + checkContinueChan <- struct{}{} + <-checkDoneChan } func (suite *MultiDataVersionTestSuite) TestPartitionWithFilter() { From 5fa90b8ce52ce59b803da781c90c99376b866907 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 8 Nov 2024 09:23:36 -0500 Subject: [PATCH 3/4] Add more debug statements; move a couple variable declarations. These will hopefully aid in debugging. --- internal/verifier/change_stream.go | 15 ++++++++++----- internal/verifier/check.go | 23 +++++++++++++++++++++-- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index a85214ea..4738cf64 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -25,7 +25,7 @@ type ParsedEvent struct { } func (pe *ParsedEvent) String() string { - return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID) + return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime) } // DocKey is a deserialized form for the ChangeEvent documentKey field. 
We currently only care about @@ -44,7 +44,7 @@ type UnknownEventError struct { } func (uee UnknownEventError) Error() string { - return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType) + return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event) } // HandleChangeStreamEvent performs the necessary work for change stream events that occur during @@ -83,8 +83,6 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D { } func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { - var changeEvent ParsedEvent - var lastPersistedTime time.Time persistResumeTokenIfNeeded := func() error { @@ -103,6 +101,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha readOneChangeEvent := func() (bool, error) { gotEvent := cs.TryNext(ctx) if gotEvent { + var changeEvent ParsedEvent if err := cs.Decode(&changeEvent); err != nil { return false, errors.Wrap(err, "failed to decode change event") } @@ -129,13 +128,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // source writes are ended. This means we should exit rather than continue // reading the change stream since there should be no more events. case <-verifier.changeStreamEnderChan: - var gotEvent bool + verifier.logger.Debug(). + Msg("Change stream thread received shutdown request.") changeStreamEnded = true // Read all change events until the source reports no events. // (i.e., the `getMore` call returns empty) for { + var gotEvent bool gotEvent, err = readOneChangeEvent() if !gotEvent || err != nil { @@ -152,6 +153,10 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } if err != nil && !errors.Is(err, context.Canceled) { + verifier.logger.Debug(). + Err(err). 
+ Msg("Sending change stream error.") + verifier.changeStreamErrChan <- err if !changeStreamEnded { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index fa5b2fd0..9f90f9fa 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -51,8 +51,12 @@ func (verifier *Verifier) waitForChangeStream() error { verifier.changeStreamEnderChan <- struct{}{} select { case err := <-verifier.changeStreamErrChan: + verifier.logger.Warn().Err(err). + Msg("Received error from change stream.") return err case <-verifier.changeStreamDoneChan: + verifier.logger.Debug(). + Msg("Received completion signal from change stream.") break } } @@ -188,16 +192,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any return err } // we will only coordinate when the number of channels is exactly 2. - // * Channel 0 signals a generation is done - // * Channel 1 signals to check to continue the next generation + // * Channel 0 informs the test of a generation boundary. + // * Block until the test (via channel 1) tells us to do the + // next generation. if len(testChan) == 2 { + + verifier.logger.Debug(). + Msg("Telling test about generation boundary.") testChan[0] <- struct{}{} + + verifier.logger.Debug(). + Msg("Awaiting test's signal to continue.") <-testChan[1] + + verifier.logger.Debug(). + Msg("Received test's signal. Continuing.") } time.Sleep(verifier.generationPauseDelayMillis * time.Millisecond) verifier.mux.Lock() if verifier.lastGeneration { verifier.mux.Unlock() + + verifier.logger.Debug(). + Int("generation", verifier.generation). 
+ Msg("Final generation done.") + return nil } // TODO: wait here until writesOff is hit or enough time has passed, so we don't spin From 7205696dc1aa5ab93445a7d632d9cb88dd62600d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 8 Nov 2024 09:31:34 -0500 Subject: [PATCH 4/4] =?UTF-8?q?Log=20the=20change=20stream=E2=80=99s=20las?= =?UTF-8?q?t=20timestamp.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/verifier/change_stream.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4738cf64..eb0cb6f0 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -174,11 +174,18 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // since we have started Recheck, we must signal that we have // finished the change stream changes so that Recheck can continue. verifier.changeStreamDoneChan <- struct{}{} - // since the changeStream is exhausted, we now return - verifier.logger.Debug().Msg("Change stream is done") - return + break } } + + infoLog := verifier.logger.Info() + if verifier.lastChangeEventTime == nil { + infoLog = infoLog.Str("changeStreamStopTime", "none") + } else { + infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime) + } + + infoLog.Msg("Change stream is done.") } // StartChangeStream starts the change stream.