Skip to content
38 changes: 32 additions & 6 deletions internal/verifier/change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ const (
type changeReader interface {
getWhichCluster() whichCluster
getReadChannel() <-chan changeEventBatch
getStartTimestamp() option.Option[bson.Timestamp]
getStartTimestamp() bson.Timestamp
getLastSeenClusterTime() option.Option[bson.Timestamp]
getEventsPerSecond() option.Option[float64]
getLag() option.Option[time.Duration]
getBufferSaturation() float64
Expand All @@ -48,9 +49,8 @@ type changeReader interface {
type ChangeReaderCommon struct {
readerType whichCluster

lastChangeEventTime *bson.Timestamp
logger *logger.Logger
namespaces []string
logger *logger.Logger
namespaces []string

metaDB *mongo.Database
watcherClient *mongo.Client
Expand All @@ -62,6 +62,8 @@ type ChangeReaderCommon struct {
changeEventBatchChan chan changeEventBatch
writesOffTs *util.Eventual[bson.Timestamp]

lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]

startAtTs *bson.Timestamp

lag *msync.TypedAtomic[option.Option[time.Duration]]
Expand All @@ -70,12 +72,32 @@ type ChangeReaderCommon struct {
onDDLEvent ddlEventHandling
}

// newChangeReaderCommon returns the shared state for a change reader
// that watches the given cluster. DDL events are allowed only on the
// destination; on the source the zero value leaves DDL handling at its
// default.
func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
	var ddlHandling ddlEventHandling = ""
	if clusterName == dst {
		ddlHandling = onDDLEventAllow
	}

	common := ChangeReaderCommon{
		readerType:           clusterName,
		changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
		writesOffTs:          util.NewEventual[bson.Timestamp](),
		lag:                  msync.NewTypedAtomic(option.None[time.Duration]()),
		lastChangeEventTime:  msync.NewTypedAtomic(option.None[bson.Timestamp]()),
		batchSizeHistory:     history.New[int](time.Minute),
		onDDLEvent:           ddlHandling,
	}

	return common
}

// getWhichCluster reports which cluster (source or destination) this
// reader was created to watch.
func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
	return rc.readerType
}

func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
return option.FromPointer(rc.startAtTs)
// getStartTimestamp returns the reader's start timestamp.
//
// It panics if the start timestamp has not been set yet (i.e., the
// reader has not started), since calling this earlier is a programmer
// error rather than a runtime condition to handle.
func (rc *ChangeReaderCommon) getStartTimestamp() bson.Timestamp {
	if rc.startAtTs == nil {
		// A clear precondition-violation message beats the original
		// "no start timestamp yet?!?" when this shows up in a stack trace.
		panic("getStartTimestamp called before the reader's start timestamp was set")
	}

	return *rc.startAtTs
}

func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) {
Expand All @@ -90,6 +112,10 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
return rc.changeEventBatchChan
}

// getLastSeenClusterTime returns the cluster time of the most recent
// change event this reader has observed, or None if no event has been
// seen yet.
func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timestamp] {
	return rc.lastChangeEventTime.Load()
}

// getBufferSaturation returns the reader’s internal buffer’s saturation level
// as a fraction. If saturation rises, that means we’re reading events faster
// than we can persist them.
Expand Down
43 changes: 30 additions & 13 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ type ChangeStreamReader struct {

var _ changeReader = &ChangeStreamReader{}

// newChangeStreamReader builds a ChangeStreamReader that watches the
// given namespaces on the given cluster via the given client.
//
// clusterInfo describes the watched cluster; the reader persists its
// metadata to the verifier's metadata database.
func (v *Verifier) newChangeStreamReader(
	namespaces []string,
	cluster whichCluster,
	client *mongo.Client,
	clusterInfo util.ClusterInfo,
) *ChangeStreamReader {
	// newChangeReaderCommon already records the cluster (readerType),
	// so we only fill in the reader-specific fields here. (The original
	// redundantly re-assigned readerType.)
	common := newChangeReaderCommon(cluster)
	common.namespaces = namespaces
	common.watcherClient = client
	common.clusterInfo = clusterInfo

	common.logger = v.logger
	common.metaDB = v.metaClient.Database(v.metaDBName)

	common.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken

	return &ChangeStreamReader{ChangeReaderCommon: common}
}

// GetChangeStreamFilter returns an aggregation pipeline that filters
// namespaces as per configuration.
//
Expand Down Expand Up @@ -193,11 +215,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead])
}

if changeEvents[eventsRead].ClusterTime != nil &&
(csr.lastChangeEventTime == nil ||
csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) {

csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime
eventTime := changeEvents[eventsRead].ClusterTime
if eventTime != nil && csr.lastChangeEventTime.Load().OrZero().Before(*eventTime) {
csr.lastChangeEventTime.Store(option.Some(*eventTime))
latestEvent = option.Some(changeEvents[eventsRead])
}

Expand Down Expand Up @@ -230,9 +250,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
events: changeEvents,

resumeToken: cs.ResumeToken(),

// NB: We know by now that OperationTime is non-nil.
clusterTime: *sess.OperationTime(),
}:
}

Expand Down Expand Up @@ -314,19 +331,19 @@ func (csr *ChangeStreamReader) iterateChangeStream(

if gotwritesOffTimestamp {
csr.running = false
if csr.lastChangeEventTime != nil {
csr.startAtTs = csr.lastChangeEventTime
if ts, has := csr.lastChangeEventTime.Load().Get(); has {
csr.startAtTs = &ts
}

break
}
}

infoLog := csr.logger.Info()
if csr.lastChangeEventTime == nil {
infoLog = infoLog.Str("lastEventTime", "none")
if ts, has := csr.lastChangeEventTime.Load().Get(); has {
infoLog = infoLog.Any("lastEventTime", ts)
} else {
infoLog = infoLog.Any("lastEventTime", *csr.lastChangeEventTime)
infoLog = infoLog.Str("lastEventTime", "none")
}

infoLog.
Expand Down
18 changes: 6 additions & 12 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {

suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)

startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get()

suite.Require().True(hasStartAtTs)
startAtTs := verifier2.srcChangeReader.getStartTimestamp()

suite.Assert().False(
startAtTs.After(newTime),
Expand Down Expand Up @@ -631,14 +629,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {

eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
suite.Require().True(hasStartAtTs, "startAtTs should be set")
startAtTs := verifier.srcChangeReader.getStartTimestamp()

verifier.srcChangeReader.setWritesOff(insertTs)

suite.Require().NoError(eg.Wait())

startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet()
startAtTs2 := verifier.srcChangeReader.getStartTimestamp()

suite.Require().False(
startAtTs2.Before(startAtTs),
Expand All @@ -663,8 +660,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
suite.Require().NotNil(origSessionTime)
eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
suite.Require().True(hasStartAtTs, "startAtTs should be set")
startAtTs := verifier.srcChangeReader.getStartTimestamp()

// srcStartAtTs derives from the change stream’s resume token, which can
// postdate our session time but should not precede it.
Expand Down Expand Up @@ -697,8 +693,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {

suite.Require().NoError(eg.Wait())

startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get()
suite.Require().True(hasStartAtTs, "startAtTs should be set")
startAtTs = verifier.srcChangeReader.getStartTimestamp()

suite.Assert().Equal(
*postEventsSessionTime,
Expand All @@ -720,8 +715,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
suite.Require().NotNil(origStartTs)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
suite.Require().True(hasStartAtTs, "startAtTs should be set")
startAtTs := verifier.srcChangeReader.getStartTimestamp()

suite.Require().NotNil(startAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0)
Expand Down
104 changes: 48 additions & 56 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ import (
"time"

"github.com/10gen/migration-verifier/contextplus"
"github.com/10gen/migration-verifier/history"
"github.com/10gen/migration-verifier/internal/logger"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
mapset "github.com/deckarep/golang-set/v2"
"github.com/goaux/timer"
"github.com/pkg/errors"
Expand Down Expand Up @@ -255,28 +251,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
verifier.phase = Idle
}()

changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx)
for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
if changeReader.isRunning() {
verifier.logger.Debug().Msgf("Check: %s already running.", changeReader)
} else {
verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader)

err = changeReader.start(groupCtx, changeReaderGroup)
if err != nil {
return errors.Wrapf(err, "failed to start %s", changeReader)
}
changeReaderGroup.Go(func() error {
return verifier.RunChangeEventPersistor(groupCtx, changeReader)
})
}
if err := verifier.startChangeHandling(ctx); err != nil {
return err
}

changeHandlingErr := verifier.changeHandlingErr
go func() {
changeHandlingErr.Set(changeReaderGroup.Wait())
}()

// Log the verification status when initially booting up so it's easy to see the current state
verificationStatus, err := verifier.GetVerificationStatus(ctx)
if err != nil {
Expand Down Expand Up @@ -409,6 +387,38 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
}
}

// startChangeHandling starts the goroutines that read changes
// from the source & destination and that persist those changes
// to the metadata.
//
// As part of this, it sets the change readers’ start timestamps.
// (It blocks until those are set.)
func (verifier *Verifier) startChangeHandling(ctx context.Context) error {
	changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx)
	for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
		if changeReader.isRunning() {
			// Already started (e.g., on a prior call); don't start twice.
			verifier.logger.Debug().Msgf("Check: %s already running.", changeReader)
		} else {
			verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader)

			err := changeReader.start(groupCtx, changeReaderGroup)
			if err != nil {
				return errors.Wrapf(err, "failed to start %s", changeReader)
			}
			// Persist this reader's events alongside the reader itself so
			// that either one's failure cancels the whole group.
			changeReaderGroup.Go(func() error {
				return verifier.RunChangeEventPersistor(groupCtx, changeReader)
			})
		}
	}

	// Copy the field to a local before launching the goroutine,
	// presumably so the closure captures the eventual-value holder
	// rather than the verifier — NOTE(review): confirm intent.
	changeHandlingErr := verifier.changeHandlingErr
	go func() {
		// Publish the group's terminal error (or nil) once all change
		// readers and persistors have finished.
		changeHandlingErr.Set(changeReaderGroup.Wait())
	}()

	return nil
}

func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error {
// We want to check all user collections on both source and dest.
srcNamespaces, err := ListAllUserNamespaces(ctx, verifier.logger, verifier.srcClient, verifier.metaDBName)
Expand Down Expand Up @@ -600,36 +610,18 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
}
}

func (verifier *Verifier) initializeChangeReaders() {
srcReader := &ChangeStreamReader{
ChangeReaderCommon: ChangeReaderCommon{
readerType: src,
namespaces: verifier.srcNamespaces,
watcherClient: verifier.srcClient,
clusterInfo: *verifier.srcClusterInfo,
},
}
verifier.srcChangeReader = srcReader

dstReader := &ChangeStreamReader{
ChangeReaderCommon: ChangeReaderCommon{
readerType: dst,
namespaces: verifier.dstNamespaces,
watcherClient: verifier.dstClient,
clusterInfo: *verifier.dstClusterInfo,
onDDLEvent: onDDLEventAllow,
},
}
verifier.dstChangeReader = dstReader

// Common elements in both readers:
for _, csr := range mslices.Of(srcReader, dstReader) {
csr.logger = verifier.logger
csr.metaDB = verifier.metaClient.Database(verifier.metaDBName)
csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize)
csr.writesOffTs = util.NewEventual[bson.Timestamp]()
csr.lag = msync.NewTypedAtomic(option.None[time.Duration]())
csr.batchSizeHistory = history.New[int](time.Minute)
csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken
}
// initializeChangeReaders creates the source & destination change
// stream readers and stores them on the Verifier.
func (v *Verifier) initializeChangeReaders() {
	v.srcChangeReader = v.newChangeStreamReader(v.srcNamespaces, src, v.srcClient, *v.srcClusterInfo)
	v.dstChangeReader = v.newChangeStreamReader(v.dstNamespaces, dst, v.dstClient, *v.dstClusterInfo)
}
Loading