diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 04f1a2c9..4fd488e2 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -315,10 +315,10 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( Msg("Updated lastChangeEventTime.") } - var curTs primitive.Timestamp - curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) + var tokenTs primitive.Timestamp + tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) if err == nil { - lagSecs := curTs.T - sess.OperationTime().T + lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) } else { csr.logger.Warn(). diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6c1ded7e..f33977a5 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -303,6 +303,57 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve return recheckDocs } +func (suite *IntegrationTestSuite) TestChangeStreamLag() { + zerolog.SetGlobalLevel(zerolog.TraceLevel) + + ctx := suite.Context() + + db := suite.srcMongoClient. + Database(suite.DBNameForTest()) + + suite.Require().NoError( + db.CreateCollection(ctx, "mycoll"), + ) + + verifier := suite.BuildVerifier() + + verifier.SetSrcNamespaces([]string{db.Name() + ".mycoll"}) + verifier.SetDstNamespaces([]string{db.Name() + ".mycoll"}) + verifier.SetNamespaceMap() + + verifierRunner := RunVerifierCheck(ctx, suite.T(), verifier) + suite.Require().NoError( + verifierRunner.AwaitGenerationEnd(), + ) + + _, err := db.Collection("mycoll").InsertOne(ctx, bson.D{}) + suite.Require().NoError(err) + + // On sharded clusters sometimes the event hasn’t shown yet. + suite.Require().Eventually( + func() bool { + suite.Require().NoError( + verifierRunner.StartNextGeneration(), + ) + suite.Require().NoError( + verifierRunner.AwaitGenerationEnd(), + ) + + return verifier.srcChangeStreamReader.GetLag().IsSome() + }, + time.Minute, + 100*time.Millisecond, + ) + + // NB: The lag will include whatever time elapsed above before + // verifier read the event, so it can be several seconds. + suite.Assert().Less( + verifier.srcChangeStreamReader.GetLag().MustGet(), + 10*time.Minute, + "verifier lag is as expected", + ) +} + func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { zerolog.SetGlobalLevel(zerolog.TraceLevel)