Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
if err != nil {
return err
}

err = verifier.doInMetaTransaction(
ctx,
func(ctx context.Context, sCtx mongo.SessionContext) error {
return verifier.ResetInProgressTasks(sCtx)
},
)
if err != nil {
return errors.Wrap(err, "failed to reset any in-progress tasks")
}

verifier.logger.Debug().Msg("Starting Check")

verifier.phase = Check
Expand Down
161 changes: 161 additions & 0 deletions internal/verifier/reset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package verifier

import (
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

var defaultTaskUpdate = bson.M{
"$set": bson.M{"status": verificationTaskAdded},
"$unset": bson.M{"begin_time": 1},
}

func (verifier *Verifier) ResetInProgressTasks(ctx mongo.SessionContext) error {
didReset, err := verifier.handleIncompletePrimary(ctx)

if err == nil {
if didReset {
return nil
}

err = verifier.resetCollectionTasksIfNeeded(ctx)
}

if err == nil {
err = verifier.resetPartitionTasksIfNeeded(ctx)
}

return err
}

func (verifier *Verifier) handleIncompletePrimary(ctx mongo.SessionContext) (bool, error) {
taskColl := verifier.verificationTaskCollection()

cursor, err := taskColl.Find(
ctx,
bson.M{
"type": verificationTaskPrimary,
"status": bson.M{"$ne": verificationTaskCompleted},
},
)
if err != nil {
return false, errors.Wrapf(err, "failed to fetch incomplete %#q task", verificationTaskPrimary)
}

var incompletePrimaries []VerificationTask
err = cursor.All(ctx, &incompletePrimaries)
if err != nil {
return false, errors.Wrapf(err, "failed to read incomplete %#q task", verificationTaskPrimary)
}

switch len(incompletePrimaries) {
case 0:
// Nothing to do.
case 1:
// Invariant: task status should be “added”.
if incompletePrimaries[0].Status != verificationTaskAdded {
verifier.logger.Panic().
Interface("task", incompletePrimaries[0]).
Msg("Primary task status has invalid state.")
}

verifier.logger.Info().
Msg("Previous verifier run left primary task incomplete. Deleting non-primary tasks.")

deleted, err := taskColl.DeleteMany(
ctx,
bson.M{
"type": bson.M{
"$ne": verificationTaskPrimary,
},
},
)
if err != nil {
return false, errors.Wrapf(err, "failed to delete non-%#q tasks", verificationTaskPrimary)
}

verifier.logger.Info().
Int64("deletedTasksCount", deleted.DeletedCount).
Msg("Found and deleted non-primary tasks.")

return true, nil
default:
verifier.logger.Panic().
Interface("tasks", incompletePrimaries).
Msg("Found multiple incomplete primary tasks; there should only be 1.")
}

return false, nil
}

func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext) error {
taskColl := verifier.verificationTaskCollection()

cursor, err := taskColl.Find(
ctx,
bson.M{
"type": verificationTaskVerifyCollection,
"status": verificationTaskProcessing,
},
)
if err != nil {
return errors.Wrapf(err, "failed to find incomplete %#q tasks", verificationTaskVerifyCollection)
}
var incompleteCollTasks []VerificationTask
err = cursor.All(ctx, &incompleteCollTasks)
if err != nil {
return errors.Wrapf(err, "failed to read incomplete %#q tasks", verificationTaskVerifyCollection)
}

if len(incompleteCollTasks) > 0 {
verifier.logger.Info().
Int("count", len(incompleteCollTasks)).
Msg("Previous verifier run left collection-level verification task(s) pending. Resetting.")
}

for _, task := range incompleteCollTasks {
_, err := taskColl.DeleteMany(
ctx,
bson.M{
"type": verificationTaskVerifyDocuments,
"query_filter.namespace": task.QueryFilter.Namespace,
},
)
if err != nil {
return errors.Wrapf(err, "failed to delete namespace %#q's %#q tasks", task.QueryFilter.Namespace, verificationTaskVerifyDocuments)
}

_, err = taskColl.UpdateOne(
ctx,
bson.M{
"type": verificationTaskVerifyCollection,
"query_filter.namespace": task.QueryFilter.Namespace,
},
defaultTaskUpdate,
)
if err != nil {
return errors.Wrapf(err, "failed to reset namespace %#q's %#q task", task.QueryFilter.Namespace, verificationTaskVerifyCollection)
}
}

return nil
}

func (verifier *Verifier) resetPartitionTasksIfNeeded(ctx mongo.SessionContext) error {
taskColl := verifier.verificationTaskCollection()

_, err := taskColl.UpdateMany(
ctx,
bson.M{
"type": verificationTaskVerifyDocuments,
"status": verificationTaskProcessing,
},
defaultTaskUpdate,
)
if err != nil {
return errors.Wrapf(err, "failed to reset in-progress %#q tasks", verificationTaskVerifyDocuments)
}

return nil
}
188 changes: 188 additions & 0 deletions internal/verifier/reset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package verifier

import (
"context"
"strings"

"github.com/10gen/migration-verifier/internal/partitions"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func (suite *MultiDataVersionTestSuite) TestResetPrimaryTask() {
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)

created, err := verifier.CheckIsPrimary()
suite.Require().NoError(err)
suite.Require().True(created)

_, err = verifier.InsertCollectionVerificationTask("foo.bar")
suite.Require().NoError(err)

ctx := context.Background()

err = verifier.doInMetaTransaction(
ctx,
func(_ context.Context, ctx mongo.SessionContext) error {
return verifier.ResetInProgressTasks(ctx)
},
)
suite.Require().NoError(err)

tasksColl := verifier.verificationTaskCollection()
cursor, err := tasksColl.Find(ctx, bson.M{})
suite.Require().NoError(err)
var taskDocs []bson.M
suite.Require().NoError(cursor.All(ctx, &taskDocs))

suite.Assert().Len(taskDocs, 1)
}

func (suite *MultiDataVersionTestSuite) TestResetNonPrimaryTasks() {
ctx := context.Background()

verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)

// Create a primary task, and set it to complete.
created, err := verifier.CheckIsPrimary()
suite.Require().NoError(err)
suite.Require().True(created)

suite.Require().NoError(verifier.UpdatePrimaryTaskComplete())

ns1 := "foo.bar"
ns2 := "qux.quux"

// Create a collection-verification task, and set it to processing.
collTask, err := verifier.InsertCollectionVerificationTask(ns1)
suite.Require().NoError(err)

collTask.Status = verificationTaskProcessing

suite.Require().NoError(
verifier.UpdateVerificationTask(collTask),
)

// Create three partition tasks with the same namespace as the
// collection-verification task (status=[added, processing, completed]),
// and another for a different namespace that’s completed.
for _, taskParts := range []struct {
Status verificationTaskStatus
Namespace string
}{
{verificationTaskAdded, ns1},
{verificationTaskProcessing, ns1},
{verificationTaskCompleted, ns1},
{verificationTaskAdded, ns2},
{verificationTaskProcessing, ns2},
{verificationTaskCompleted, ns2},
} {
task, err := verifier.InsertPartitionVerificationTask(
&partitions.Partition{
Ns: &partitions.Namespace{
DB: strings.Split(taskParts.Namespace, ".")[0],
Coll: strings.Split(taskParts.Namespace, ".")[1],
},
},
nil,
taskParts.Namespace,
)
suite.Require().NoError(err)

task.Status = taskParts.Status
suite.Require().NoError(
verifier.UpdateVerificationTask(task),
)
}

// Reset tasks
err = verifier.doInMetaTransaction(
ctx,
func(_ context.Context, ctx mongo.SessionContext) error {
return verifier.ResetInProgressTasks(ctx)
},
)
suite.Require().NoError(err)

// Contents should just be the primary task and
// the completed partition-level.
tasksColl := verifier.verificationTaskCollection()
cursor, err := tasksColl.Find(
ctx,
bson.M{},
options.Find().SetSort(bson.D{
{"type", 1},
{"query_filter.namespace", 1},
{"status", 1},
}),
)
suite.Require().NoError(err)
var taskDocs []VerificationTask
suite.Require().NoError(cursor.All(ctx, &taskDocs))

suite.Require().Len(taskDocs, 5)

// The tasks that should remain are:

// the primary (completed)
suite.Assert().Equal(
verificationTaskPrimary,
taskDocs[0].Type,
)

// the 2 ns2 partition tasks that weren’t completed (both “added”)
suite.Assert().Equal(
verificationTaskVerifyDocuments,
taskDocs[1].Type,
)
suite.Assert().Equal(
verificationTaskAdded,
taskDocs[1].Status,
)
suite.Assert().Equal(
ns2,
taskDocs[1].QueryFilter.Namespace,
)

suite.Assert().Equal(
verificationTaskVerifyDocuments,
taskDocs[2].Type,
)
suite.Assert().Equal(
verificationTaskAdded,
taskDocs[2].Status,
)
suite.Assert().Equal(
ns2,
taskDocs[2].QueryFilter.Namespace,
)

// the ns2 partition task that *was* completed
suite.Assert().Equal(
verificationTaskVerifyDocuments,
taskDocs[3].Type,
)
suite.Assert().Equal(
verificationTaskCompleted,
taskDocs[3].Status,
)
suite.Assert().Equal(
ns2,
taskDocs[3].QueryFilter.Namespace,
)

// ns1’s verify-collection task (added state)
suite.Assert().Equal(
verificationTaskVerifyCollection,
taskDocs[4].Type,
)
suite.Assert().Equal(
verificationTaskAdded,
taskDocs[4].Status,
)
suite.Assert().Equal(
ns1,
taskDocs[4].QueryFilter.Namespace,
)
}
Loading
Loading