Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.28.0
github.com/samber/lo v1.47.0
github.com/samber/mo v1.13.0
github.com/stretchr/testify v1.8.0
github.com/urfave/cli v1.22.9
go.mongodb.org/mongo-driver v1.17.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/samber/mo v1.13.0 h1:LB1OwfJMju3a6FjghH+AIvzMG0ZPOzgTWj1qaHs1IQ4=
github.com/samber/mo v1.13.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
5 changes: 3 additions & 2 deletions internal/util/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
// `ErrorCode` newtype, but that requires a more invasive change to everything
// that uses error codes.
const (
LockFailed int = 107
SampleTooManyDuplicates int = 28799
LockFailed = 107
SampleTooManyDuplicates = 28799
CursorKilled = 237
)

//
Expand Down
137 changes: 99 additions & 38 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"time"

"github.com/10gen/migration-verifier/internal/keystring"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/util"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/samber/mo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
Expand Down Expand Up @@ -63,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
for i, changeEvent := range batch {
if changeEvent.ClusterTime != nil &&
(verifier.lastChangeEventTime == nil ||
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
verifier.lastChangeEventTime.Before(*changeEvent.ClusterTime)) {
verifier.lastChangeEventTime = changeEvent.ClusterTime
}
switch changeEvent.OpType {
Expand Down Expand Up @@ -136,6 +139,7 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
// shouldn’t really happen anyway by definition.
func (verifier *Verifier) readAndHandleOneChangeEventBatch(
ctx context.Context,
ri *retry.Info,
cs *mongo.ChangeStream,
) error {
eventsRead := 0
Expand Down Expand Up @@ -163,6 +167,8 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
eventsRead++
}

ri.IterationSuccess()

if eventsRead == 0 {
return nil
}
Expand All @@ -175,9 +181,11 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
return nil
}

func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
defer cs.Close(ctx)

func (verifier *Verifier) iterateChangeStream(
ctx context.Context,
ri *retry.Info,
cs *mongo.ChangeStream,
) error {
var lastPersistedTime time.Time

persistResumeTokenIfNeeded := func() error {
Expand All @@ -201,10 +209,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha

// If the context is canceled, return immmediately.
case <-ctx.Done():
verifier.logger.Debug().
Err(ctx.Err()).
Msg("Change stream quitting.")
return
return ctx.Err()

// If the changeStreamEnderChan has a message, the user has indicated that
// source writes are ended. This means we should exit rather than continue
Expand All @@ -222,11 +227,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
var curTs primitive.Timestamp
curTs, err = extractTimestampFromResumeToken(cs.ResumeToken())
if err != nil {
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
break
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
}

if curTs.After(writesOffTs) {
// writesOffTs never refers to a real event,
// so we can stop once curTs >= writesOffTs.
if !curTs.Before(writesOffTs) {
verifier.logger.Debug().
Interface("currentTimestamp", curTs).
Interface("writesOffTimestamp", writesOffTs).
Expand All @@ -235,30 +241,22 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
break
}

err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs)

if err != nil {
break
return err
}
}

default:
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs)

if err == nil {
err = persistResumeTokenIfNeeded()
}
}

if err != nil && !errors.Is(err, context.Canceled) {
verifier.logger.Debug().
Err(err).
Msg("Sending change stream error.")

verifier.changeStreamErrChan <- err

if !gotwritesOffTimestamp {
break
if err != nil {
return err
}
}

Expand All @@ -284,10 +282,13 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
}

infoLog.Msg("Change stream is done.")

return nil
}

// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
func (verifier *Verifier) createChangeStream(
ctx context.Context,
) (*mongo.ChangeStream, primitive.Timestamp, error) {
pipeline := verifier.GetChangeStreamFilter()
opts := options.ChangeStream().
SetMaxAwaitTime(1 * time.Second).
Expand All @@ -299,7 +300,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {

savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
if err != nil {
return errors.Wrap(err, "failed to load persisted change stream resume token")
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
}

csStartLogEvent := verifier.logger.Info()
Expand All @@ -326,40 +327,100 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {

sess, err := verifier.srcClient.StartSession()
if err != nil {
return errors.Wrap(err, "failed to start session")
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
}
sctx := mongo.NewSessionContext(ctx, sess)
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
if err != nil {
return errors.Wrap(err, "failed to open change stream")
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
}

err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
if err != nil {
return err
return nil, primitive.Timestamp{}, err
}

csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
if err != nil {
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
}

// With sharded clusters the resume token might lead the cluster time
// by 1 increment. In that case we need the actual cluster time;
// otherwise we will get errors.
clusterTime, err := getClusterTimeFromSession(sess)
if err != nil {
return errors.Wrap(err, "failed to read cluster time from session")
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
}

if startTs.After(clusterTime) {
startTs = clusterTime
}

verifier.srcStartAtTs = &csTimestamp
if csTimestamp.After(clusterTime) {
verifier.srcStartAtTs = &clusterTime
return srcChangeStream, startTs, nil
}

// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
// This channel holds the first change stream creation's result, whether
// success or failure. Rather than using a Result we could make separate
// Timestamp and error channels, but the single channel is cleaner since
// there's no chance of "nonsense" like both channels returning a payload.
initialCreateResultChan := make(chan mo.Result[primitive.Timestamp])

go func() {
retryer := retry.New(retry.DefaultDurationLimit)
retryer = retryer.WithErrorCodes(util.CursorKilled)

parentThreadWaiting := true

err := retryer.RunForTransientErrorsOnly(
ctx,
verifier.logger,
func(ri *retry.Info) error {
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
if err != nil {
if parentThreadWaiting {
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
return nil
}

return err
}

defer srcChangeStream.Close(ctx)

if parentThreadWaiting {
initialCreateResultChan <- mo.Ok(startTs)
close(initialCreateResultChan)
parentThreadWaiting = false
}

return verifier.iterateChangeStream(ctx, ri, srcChangeStream)
},
)

if err != nil {
// NB: This failure always happens after the initial change stream
// creation.
verifier.changeStreamErrChan <- err
close(verifier.changeStreamErrChan)
}
}()

result := <-initialCreateResultChan

startTs, err := result.Get()
if err != nil {
return err
}

verifier.srcStartAtTs = &startTs

verifier.mux.Lock()
verifier.changeStreamRunning = true
verifier.mux.Unlock()

go verifier.iterateChangeStream(ctx, srcChangeStream)

return nil
}

Expand Down
88 changes: 88 additions & 0 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
)

func TestChangeStreamFilter(t *testing.T) {
Expand Down Expand Up @@ -263,6 +265,92 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {

}

func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
ctx := suite.Context()

verifier := suite.BuildVerifier()

db := suite.srcMongoClient.Database(suite.DBNameForTest())
coll := db.Collection("mycoll")
suite.Require().NoError(
db.CreateCollection(ctx, coll.Name()),
)

// start verifier
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)

// wait for generation 0 to end
verifierRunner.AwaitGenerationEnd()

const mvName = "Migration Verifier"

// Kill verifier’s change stream.
cursor, err := suite.srcMongoClient.Database(
"admin",
options.Database().SetReadConcern(readconcern.Local()),
).Aggregate(
ctx,
mongo.Pipeline{
{
{"$currentOp", bson.D{
{"idleCursors", true},
}},
},
{
{"$match", bson.D{
{"clientMetadata.application.name", mvName},
{"command.collection", "$cmd.aggregate"},
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
bson.D{{"$type", "object"}},
},
}},
},
},
)
suite.Require().NoError(err)

var ops []bson.Raw
suite.Require().NoError(cursor.All(ctx, &ops))

for _, cursorRaw := range ops {
opId, err := cursorRaw.LookupErr("opid")
suite.Require().NoError(err, "should get opid from op")

suite.T().Logf("Killing change stream op %+v", opId)

suite.Require().NoError(
suite.srcMongoClient.Database("admin").RunCommand(
ctx,
bson.D{
{"killOp", 1},
{"op", opId},
},
).Err(),
)
}

_, err = coll.InsertOne(
ctx,
bson.D{{"_id", "after kill"}},
)
suite.Require().NoError(err)

suite.Require().NoError(verifier.WritesOff(ctx))

suite.Require().NoError(verifierRunner.Await())

failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
ctx,
verifier.verificationTaskCollection(),
verificationTaskVerifyDocuments,
verifier.generation,
)
suite.Require().NoError(err)

suite.Assert().Zero(incompleteTasks, "no incomplete tasks")
suite.Require().Len(failedTasks, 1, "expect one failed task")
}

func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
suite.testInsertsBeforeWritesOff(10_000)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
select {
case err := <-verifier.changeStreamErrChan:
cancel()
return err
return errors.Wrap(err, "change stream failed")
case <-ctx.Done():
cancel()
return nil
Expand Down
Loading