diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 9f90f9fa..5277ad7d 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -153,6 +153,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any if err != nil { return err } + + err = verifier.doInMetaTransaction( + ctx, + func(ctx context.Context, sCtx mongo.SessionContext) error { + return verifier.ResetInProgressTasks(sCtx) + }, + ) + if err != nil { + return errors.Wrap(err, "failed to reset any in-progress tasks") + } + verifier.logger.Debug().Msg("Starting Check") verifier.phase = Check diff --git a/internal/verifier/reset.go b/internal/verifier/reset.go new file mode 100644 index 00000000..b20267e5 --- /dev/null +++ b/internal/verifier/reset.go @@ -0,0 +1,161 @@ +package verifier + +import ( + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +var defaultTaskUpdate = bson.M{ + "$set": bson.M{"status": verificationTaskAdded}, + "$unset": bson.M{"begin_time": 1}, +} + +func (verifier *Verifier) ResetInProgressTasks(ctx mongo.SessionContext) error { + didReset, err := verifier.handleIncompletePrimary(ctx) + + if err == nil { + if didReset { + return nil + } + + err = verifier.resetCollectionTasksIfNeeded(ctx) + } + + if err == nil { + err = verifier.resetPartitionTasksIfNeeded(ctx) + } + + return err +} + +func (verifier *Verifier) handleIncompletePrimary(ctx mongo.SessionContext) (bool, error) { + taskColl := verifier.verificationTaskCollection() + + cursor, err := taskColl.Find( + ctx, + bson.M{ + "type": verificationTaskPrimary, + "status": bson.M{"$ne": verificationTaskCompleted}, + }, + ) + if err != nil { + return false, errors.Wrapf(err, "failed to fetch incomplete %#q task", verificationTaskPrimary) + } + + var incompletePrimaries []VerificationTask + err = cursor.All(ctx, &incompletePrimaries) + if err != nil { + return false, errors.Wrapf(err, "failed to read incomplete %#q task", verificationTaskPrimary) + } + + switch len(incompletePrimaries) { + case 0: + // Nothing to do. + case 1: + // Invariant: task status should be “added”. + if incompletePrimaries[0].Status != verificationTaskAdded { + verifier.logger.Panic(). + Interface("task", incompletePrimaries[0]). + Msg("Primary task status has invalid state.") + } + + verifier.logger.Info(). + Msg("Previous verifier run left primary task incomplete. Deleting non-primary tasks.") + + deleted, err := taskColl.DeleteMany( + ctx, + bson.M{ + "type": bson.M{ + "$ne": verificationTaskPrimary, + }, + }, + ) + if err != nil { + return false, errors.Wrapf(err, "failed to delete non-%#q tasks", verificationTaskPrimary) + } + + verifier.logger.Info(). + Int64("deletedTasksCount", deleted.DeletedCount). + Msg("Found and deleted non-primary tasks.") + + return true, nil + default: + verifier.logger.Panic(). + Interface("tasks", incompletePrimaries). + Msg("Found multiple incomplete primary tasks; there should only be 1.") + } + + return false, nil +} + +func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext) error { + taskColl := verifier.verificationTaskCollection() + + cursor, err := taskColl.Find( + ctx, + bson.M{ + "type": verificationTaskVerifyCollection, + "status": verificationTaskProcessing, + }, + ) + if err != nil { + return errors.Wrapf(err, "failed to find incomplete %#q tasks", verificationTaskVerifyCollection) + } + var incompleteCollTasks []VerificationTask + err = cursor.All(ctx, &incompleteCollTasks) + if err != nil { + return errors.Wrapf(err, "failed to read incomplete %#q tasks", verificationTaskVerifyCollection) + } + + if len(incompleteCollTasks) > 0 { + verifier.logger.Info(). + Int("count", len(incompleteCollTasks)). + Msg("Previous verifier run left collection-level verification task(s) pending. Resetting.") + } + + for _, task := range incompleteCollTasks { + _, err := taskColl.DeleteMany( + ctx, + bson.M{ + "type": verificationTaskVerifyDocuments, + "query_filter.namespace": task.QueryFilter.Namespace, + }, + ) + if err != nil { + return errors.Wrapf(err, "failed to delete namespace %#q's %#q tasks", task.QueryFilter.Namespace, verificationTaskVerifyDocuments) + } + + _, err = taskColl.UpdateOne( + ctx, + bson.M{ + "type": verificationTaskVerifyCollection, + "query_filter.namespace": task.QueryFilter.Namespace, + }, + defaultTaskUpdate, + ) + if err != nil { + return errors.Wrapf(err, "failed to reset namespace %#q's %#q task", task.QueryFilter.Namespace, verificationTaskVerifyCollection) + } + } + + return nil +} + +func (verifier *Verifier) resetPartitionTasksIfNeeded(ctx mongo.SessionContext) error { + taskColl := verifier.verificationTaskCollection() + + _, err := taskColl.UpdateMany( + ctx, + bson.M{ + "type": verificationTaskVerifyDocuments, + "status": verificationTaskProcessing, + }, + defaultTaskUpdate, + ) + if err != nil { + return errors.Wrapf(err, "failed to reset in-progress %#q tasks", verificationTaskVerifyDocuments) + } + + return nil +} diff --git a/internal/verifier/reset_test.go b/internal/verifier/reset_test.go new file mode 100644 index 00000000..f18fb461 --- /dev/null +++ b/internal/verifier/reset_test.go @@ -0,0 +1,188 @@ +package verifier + +import ( + "context" + "strings" + + "github.com/10gen/migration-verifier/internal/partitions" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (suite *MultiDataVersionTestSuite) TestResetPrimaryTask() { + verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) + + created, err := verifier.CheckIsPrimary() + suite.Require().NoError(err) + suite.Require().True(created) + + _, err = verifier.InsertCollectionVerificationTask("foo.bar") + suite.Require().NoError(err) + + ctx := context.Background() + + err = verifier.doInMetaTransaction( + ctx, + func(_ context.Context, ctx mongo.SessionContext) error { + return verifier.ResetInProgressTasks(ctx) + }, + ) + suite.Require().NoError(err) + + tasksColl := verifier.verificationTaskCollection() + cursor, err := tasksColl.Find(ctx, bson.M{}) + suite.Require().NoError(err) + var taskDocs []bson.M + suite.Require().NoError(cursor.All(ctx, &taskDocs)) + + suite.Assert().Len(taskDocs, 1) +} + +func (suite *MultiDataVersionTestSuite) TestResetNonPrimaryTasks() { + ctx := context.Background() + + verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) + + // Create a primary task, and set it to complete. + created, err := verifier.CheckIsPrimary() + suite.Require().NoError(err) + suite.Require().True(created) + + suite.Require().NoError(verifier.UpdatePrimaryTaskComplete()) + + ns1 := "foo.bar" + ns2 := "qux.quux" + + // Create a collection-verification task, and set it to processing. + collTask, err := verifier.InsertCollectionVerificationTask(ns1) + suite.Require().NoError(err) + + collTask.Status = verificationTaskProcessing + + suite.Require().NoError( + verifier.UpdateVerificationTask(collTask), + ) + + // Create three partition tasks with the same namespace as the + // collection-verification task (status=[added, processing, completed]), + // and another for a different namespace that’s completed. + for _, taskParts := range []struct { + Status verificationTaskStatus + Namespace string + }{ + {verificationTaskAdded, ns1}, + {verificationTaskProcessing, ns1}, + {verificationTaskCompleted, ns1}, + {verificationTaskAdded, ns2}, + {verificationTaskProcessing, ns2}, + {verificationTaskCompleted, ns2}, + } { + task, err := verifier.InsertPartitionVerificationTask( + &partitions.Partition{ + Ns: &partitions.Namespace{ + DB: strings.Split(taskParts.Namespace, ".")[0], + Coll: strings.Split(taskParts.Namespace, ".")[1], + }, + }, + nil, + taskParts.Namespace, + ) + suite.Require().NoError(err) + + task.Status = taskParts.Status + suite.Require().NoError( + verifier.UpdateVerificationTask(task), + ) + } + + // Reset tasks + err = verifier.doInMetaTransaction( + ctx, + func(_ context.Context, ctx mongo.SessionContext) error { + return verifier.ResetInProgressTasks(ctx) + }, + ) + suite.Require().NoError(err) + + // Contents should just be the primary task and + // the completed partition-level. + tasksColl := verifier.verificationTaskCollection() + cursor, err := tasksColl.Find( + ctx, + bson.M{}, + options.Find().SetSort(bson.D{ + {"type", 1}, + {"query_filter.namespace", 1}, + {"status", 1}, + }), + ) + suite.Require().NoError(err) + var taskDocs []VerificationTask + suite.Require().NoError(cursor.All(ctx, &taskDocs)) + + suite.Require().Len(taskDocs, 5) + + // The tasks that should remain are: + + // the primary (completed) + suite.Assert().Equal( + verificationTaskPrimary, + taskDocs[0].Type, + ) + + // the 2 ns2 partition tasks that weren’t completed (both “added”) + suite.Assert().Equal( + verificationTaskVerifyDocuments, + taskDocs[1].Type, + ) + suite.Assert().Equal( + verificationTaskAdded, + taskDocs[1].Status, + ) + suite.Assert().Equal( + ns2, + taskDocs[1].QueryFilter.Namespace, + ) + + suite.Assert().Equal( + verificationTaskVerifyDocuments, + taskDocs[2].Type, + ) + suite.Assert().Equal( + verificationTaskAdded, + taskDocs[2].Status, + ) + suite.Assert().Equal( + ns2, + taskDocs[2].QueryFilter.Namespace, + ) + + // the ns2 partition task that *was* completed + suite.Assert().Equal( + verificationTaskVerifyDocuments, + taskDocs[3].Type, + ) + suite.Assert().Equal( + verificationTaskCompleted, + taskDocs[3].Status, + ) + suite.Assert().Equal( + ns2, + taskDocs[3].QueryFilter.Namespace, + ) + + // ns1’s verify-collection task (added state) + suite.Assert().Equal( + verificationTaskVerifyCollection, + taskDocs[4].Type, + ) + suite.Assert().Equal( + verificationTaskAdded, + taskDocs[4].Status, + ) + suite.Assert().Equal( + ns1, + taskDocs[4].QueryFilter.Namespace, + ) +} diff --git a/internal/verifier/unit_test_util.go b/internal/verifier/unit_test_util.go index 5773cba2..93ecace3 100644 --- a/internal/verifier/unit_test_util.go +++ b/internal/verifier/unit_test_util.go @@ -91,7 +91,7 @@ func (suite *WithMongodsTestSuite) SetupSuite() { suite.Require().NoError(err) clientOpts = options.Client().ApplyURI("mongodb://localhost:" + suite.metaMongoInstance.port).SetAppName("Verifier Test Suite") suite.metaMongoClient, err = mongo.Connect(ctx, clientOpts) - suite.startReplSet() + suite.startReplSets() suite.Require().NoError(err) suite.initialDbNames = map[string]bool{} for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient, suite.metaMongoClient} { @@ -103,25 +103,28 @@ func (suite *WithMongodsTestSuite) SetupSuite() { } } -func (suite *WithMongodsTestSuite) startReplSet() { +func (suite *WithMongodsTestSuite) startReplSets() { ctx := context.Background() - clientOpts := options. - Client(). - ApplyURI("mongodb://localhost:" + suite.srcMongoInstance.port). - SetDirect(true). - SetAppName("Verifier Test Suite") - directClient, err := mongo.Connect(ctx, clientOpts) - suite.Require().NoError(err) - command := bson.M{ - "replSetInitiate": bson.M{ - "_id": replSet, - "members": bson.A{ - bson.M{"_id": 0, "host": "localhost:" + suite.srcMongoInstance.port}, + + for _, instance := range []MongoInstance{suite.srcMongoInstance, suite.metaMongoInstance} { + clientOpts := options. + Client(). + ApplyURI("mongodb://localhost:" + instance.port). + SetDirect(true). + SetAppName("Verifier Test Suite") + directClient, err := mongo.Connect(ctx, clientOpts) + suite.Require().NoError(err) + command := bson.M{ + "replSetInitiate": bson.M{ + "_id": replSet, + "members": bson.A{ + bson.M{"_id": 0, "host": "localhost:" + instance.port}, + }, }, - }, + } + err = directClient.Database("admin").RunCommand(ctx, command).Err() + suite.Require().NoError(err) } - err = directClient.Database("admin").RunCommand(ctx, command).Err() - suite.Require().NoError(err) } func (suite *WithMongodsTestSuite) TearDownSuite() { @@ -181,7 +184,7 @@ func startTestMongods(t *testing.T, srcMongoInstance *MongoInstance, dstMongoIns return err } - err = startOneMongod(t, metaMongoInstance) + err = startOneMongod(t, metaMongoInstance, "--replSet", replSet) if err != nil { return err } diff --git a/internal/verifier/util.go b/internal/verifier/util.go index a51fe84b..0ab905d7 100644 --- a/internal/verifier/util.go +++ b/internal/verifier/util.go @@ -1,6 +1,7 @@ package verifier import ( + "context" "fmt" "strings" @@ -89,3 +90,26 @@ type QueryFilter struct { Namespace string `bson:"namespace" json:"namespace"` To string `bson:"to,omitempty" json:"to,omitempty"` } + +func (verifier *Verifier) doInMetaTransaction( + ctx context.Context, + todo func(context.Context, mongo.SessionContext) error, +) error { + if mongo.SessionFromContext(ctx) != nil { + verifier.logger.Panic(). + Msg("Context indicates an active session when it should not.") + } + + session, err := verifier.metaClient.StartSession() + if err != nil { + return errors.Wrap(err, "failed to start metadata session") + } + + defer session.EndSession(ctx) + + _, err = session.WithTransaction(ctx, func(sctx mongo.SessionContext) (any, error) { + return nil, todo(ctx, sctx) + }) + + return err +}