From 7c547be5d3b485c745edf3a6b2f189a838e5a46f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 10:26:20 -0500 Subject: [PATCH 01/21] detect sharding mismatches --- internal/partitions/partition.go | 2 +- internal/partitions/partition_test.go | 14 +-- internal/retry/retry.go | 18 ++- internal/util/buildinfo.go | 31 ----- internal/util/clusterinfo.go | 78 ++++++++++++ internal/util/sharding.go | 66 ++++++++++ internal/verifier/change_stream.go | 2 +- internal/verifier/change_stream_test.go | 2 +- internal/verifier/compare.go | 4 +- internal/verifier/migration_verifier.go | 158 +++++++++++++++++------- 10 files changed, 286 insertions(+), 89 deletions(-) delete mode 100644 internal/util/buildinfo.go create mode 100644 internal/util/clusterinfo.go create mode 100644 internal/util/sharding.go diff --git a/internal/partitions/partition.go b/internal/partitions/partition.go index 024458e8..443b80f3 100644 --- a/internal/partitions/partition.go +++ b/internal/partitions/partition.go @@ -137,7 +137,7 @@ func (p *Partition) FindCmd( // (e.g. use the partitions on the source to read the destination for verification) // If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used. // filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option. 
-func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D { +func (p *Partition) GetFindOptions(buildInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D { if p == nil { if len(filterAndPredicates) > 0 { return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}} diff --git a/internal/partitions/partition_test.go b/internal/partitions/partition_test.go index 53d41a0a..5f847ba4 100644 --- a/internal/partitions/partition_test.go +++ b/internal/partitions/partition_test.go @@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() { suite.Require().Equal(expectedFilter, filter) // 6.0 - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilter, filter) // 5.3.0.9 - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilter, filter) // 7.1.3.5 - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilter, filter) // 4.4 (int64) - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilterWithTypeBracketing, filter) // 4.4 - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil) + findOptions = 
partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilterWithTypeBracketing, filter) // 4.2 - findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilterWithTypeBracketing, filter) // No version array -- assume old, require type bracketing. - findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil) + findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil) filter = getFilterFromFindOptions(findOptions) suite.Require().Equal(expectedFilterWithTypeBracketing, filter) } diff --git a/internal/retry/retry.go b/internal/retry/retry.go index bb2a55aa..b083fa09 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -11,7 +11,12 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -// RunForUUIDAndTransientErrors retries f() for the CollectionUUIDMismatch error and for transient errors. +// RunForUUIDAndTransientErrors should never be used. Its presence here +// is a vestige of the retryer's origin in mongosync. Since migration-verifier +// forbids DDL operations, a namespace should be just as stable as its UUID; thus, +// this function is needless complexity. +// +// This function retries f() for the CollectionUUIDMismatch error and for transient errors. // This should be used to run a driver operation that optionally specifies the `collectionUUID` parameter // for a collection that may have been: // @@ -34,7 +39,7 @@ import ( // f() is provided with a collection name string, which is the one that should be used in the body // of f() when a collection name is needed. The initial value of this string is expectedCollName. // -// RunForUUIDAndTransientErrors always returns the collection's current name. 
It returns +// This function always returns the collection's current name. It returns // an error if the duration limit is reached, or if f() returns a non-transient error. func (r *Retryer) RunForUUIDAndTransientErrors( ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error, @@ -42,7 +47,12 @@ func (r *Retryer) RunForUUIDAndTransientErrors( return r.runRetryLoop(ctx, logger, expectedCollName, f, true, true) } -// RunForUUIDErrorOnly retries f() for the CollectionUUIDMismatch error only. This should primarily +// RunForUUIDErrorOnly should never be used. Its presence here +// is a vestige of the retryer's origin in mongosync. Since migration-verifier +// forbids DDL operations, a namespace should be just as stable as its UUID; thus, +// this function is needless complexity. +// +// This function retries f() for the CollectionUUIDMismatch error only. This should primarily // be used to wrap a transaction callback containing an operation that specifies the `collectionUUID` // parameter for a collection that may have been: // @@ -57,7 +67,7 @@ func (r *Retryer) RunForUUIDAndTransientErrors( // f() is provided with a collection name string, which is the one that should be used in the body // of f() where a collection name is needed. The initial value of this string is expectedCollName. // -// RunForUUIDErrorOnly returns the collection's current name in all cases. +// This function returns the collection's current name in all cases. 
func (r *Retryer) RunForUUIDErrorOnly( ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error, ) (string, error) { diff --git a/internal/util/buildinfo.go b/internal/util/buildinfo.go deleted file mode 100644 index 2338e7dc..00000000 --- a/internal/util/buildinfo.go +++ /dev/null @@ -1,31 +0,0 @@ -package util - -import ( - "context" - - "github.com/10gen/migration-verifier/mbson" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" -) - -type BuildInfo struct { - VersionArray []int -} - -func GetBuildInfo(ctx context.Context, client *mongo.Client) (BuildInfo, error) { - commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}}) - - rawResp, err := commandResult.Raw() - if err != nil { - return BuildInfo{}, errors.Wrap(err, "failed to fetch build info") - } - - bi := BuildInfo{} - _, err = mbson.RawLookup(rawResp, &bi.VersionArray, "versionArray") - if err != nil { - return BuildInfo{}, errors.Wrap(err, "failed to decode build info version array") - } - - return bi, nil -} diff --git a/internal/util/clusterinfo.go b/internal/util/clusterinfo.go new file mode 100644 index 00000000..980a374c --- /dev/null +++ b/internal/util/clusterinfo.go @@ -0,0 +1,78 @@ +package util + +import ( + "context" + + "github.com/10gen/migration-verifier/mbson" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +type ClusterTopology string + +type ClusterInfo struct { + VersionArray []int + Topology ClusterTopology +} + +const ( + TopologySharded ClusterTopology = "sharded" + TopologyReplset ClusterTopology = "replset" +) + +func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) { + va, err := getVersionArray(ctx, client) + if err != nil { + return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array") + } + + topology, err := getTopology(ctx, client) + if 
err != nil { + return ClusterInfo{}, errors.Wrap(err, "failed to determine topology") + } + + return ClusterInfo{ + VersionArray: va, + Topology: topology, + }, nil +} + +func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) { + commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}}) + + rawResp, err := commandResult.Raw() + if err != nil { + return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo") + } + + var va []int + _, err = mbson.RawLookup(rawResp, &va, "versionArray") + if err != nil { + return nil, errors.Wrap(err, "failed to decode build info version array") + } + + return va, nil +} + +func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) { + resp := client.Database("admin").RunCommand( + ctx, + bson.D{{"hello", 1}}, + ) + + hello := struct { + Msg string + }{} + + if err := resp.Decode(&hello); err != nil { + return "", errors.Wrapf( + err, + "failed to decode %#q response", + "hello", + ) + } + + return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil +} diff --git a/internal/util/sharding.go b/internal/util/sharding.go new file mode 100644 index 00000000..3fbf55b0 --- /dev/null +++ b/internal/util/sharding.go @@ -0,0 +1,66 @@ +package util + +import ( + "context" + + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +const ( + configDBName = "config" + collsCollName = "collections" +) + +// GetShardKey returns the collection's shard key, or an empty option +// if the collection is unsharded. +func GetShardKey( + ctx context.Context, + coll *mongo.Collection, +) (option.Option[bson.Raw], error) { + namespace := coll.Database().Name() + "." + coll.Name() + + configCollectionsColl := coll.Database().Client(). + Database(configDBName). 
+ Collection(collsCollName) + + rawResult, err := configCollectionsColl.FindOne(ctx, bson.D{{"_id", namespace}}).Raw() + if errors.Is(err, mongo.ErrNoDocuments) { + return option.None[bson.Raw](), nil + } else if err != nil { + return option.None[bson.Raw](), errors.Wrapf( + err, + "failed to find sharding info for %#q", + namespace, + ) + } + + keyAsVal, err := rawResult.LookupErr("key") + if errors.Is(err, bsoncore.ErrElementNotFound) { + return option.None[bson.Raw](), nil + } else if err != nil { + return option.None[bson.Raw](), errors.Wrapf( + err, + "failed to find %#q in %#q's %#q entry", + "key", + namespace, + FullName(configCollectionsColl), + ) + } + + keyAsRaw, isDoc := keyAsVal.DocumentOK() + if !isDoc { + return option.None[bson.Raw](), errors.Errorf( + "%#q in %#q's %#q entry is of type %#q, not an object", + "key", + namespace, + FullName(configCollectionsColl), + keyAsVal.Type, + ) + } + + return option.Some(keyAsRaw), nil +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 483af491..fe1548d3 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream( SetMaxAwaitTime(1 * time.Second). 
SetFullDocument(options.UpdateLookup) - if verifier.srcBuildInfo.VersionArray[0] >= 6 { + if verifier.srcClusterInfo.VersionArray[0] >= 6 { opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true}) } diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index f4a6f42d..1b1aca0a 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) { func (suite *IntegrationTestSuite) TestCreateForbidden() { ctx := suite.Context() - buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient) + buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient) suite.Require().NoError(err) if buildInfo.VersionArray[0] < 6 { diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index d6a50324..911051ad 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -266,7 +266,7 @@ func (verifier *Verifier) getFetcherChannels( cursor, err := verifier.getDocumentsCursor( ctx, verifier.srcClientCollection(task), - verifier.srcBuildInfo, + verifier.srcClusterInfo, verifier.srcStartAtTs, task, ) @@ -290,7 +290,7 @@ func (verifier *Verifier) getFetcherChannels( cursor, err := verifier.getDocumentsCursor( ctx, verifier.dstClientCollection(task), - verifier.dstBuildInfo, + verifier.dstClusterInfo, nil, //startAtTs task, ) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 9b056a51..c0adbbd8 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -8,7 +8,6 @@ import ( "math/rand" _ "net/http/pprof" "os" - "sort" "strconv" "strings" "sync" @@ -35,7 +34,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/readconcern" "go.mongodb.org/mongo-driver/mongo/readpref" "go.mongodb.org/mongo-driver/mongo/writeconcern" - "golang.org/x/exp/maps" ) // ReadConcernSetting describes the 
verifier’s handling of read @@ -90,8 +88,8 @@ type Verifier struct { metaClient *mongo.Client srcClient *mongo.Client dstClient *mongo.Client - srcBuildInfo *util.BuildInfo - dstBuildInfo *util.BuildInfo + srcClusterInfo *util.ClusterInfo + dstClusterInfo *util.ClusterInfo numWorkers int failureDisplaySize int64 @@ -319,12 +317,12 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error { return errors.Wrapf(err, "failed to connect to source %#q", uri) } - buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient) + buildInfo, err := util.GetClusterInfo(ctx, verifier.srcClient) if err != nil { - return errors.Wrap(err, "failed to read source build info") + return errors.Wrap(err, "failed to read source cluster info") } - verifier.srcBuildInfo = &buildInfo + verifier.srcClusterInfo = &buildInfo return nil } @@ -336,12 +334,12 @@ func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error { return errors.Wrapf(err, "failed to connect to destination %#q", uri) } - buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient) + buildInfo, err := util.GetClusterInfo(ctx, verifier.dstClient) if err != nil { - return errors.Wrap(err, "failed to read destination build info") + return errors.Wrap(err, "failed to read destination cluster info") } - verifier.dstBuildInfo = &buildInfo + verifier.dstClusterInfo = &buildInfo return nil } @@ -473,7 +471,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A) return append(predicates, verifier.globalFilter) } -func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo, +func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.ClusterInfo, startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() @@ -720,42 +718,42 @@ func (verifier *Verifier) logChunkInfo(ctx 
context.Context, namespaceAndUUID *uu } } -func (verifier *Verifier) getShardingInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) ([]string, error) { - uuid := namespaceAndUUID.UUID - namespace := namespaceAndUUID.DBName + "." + namespaceAndUUID.CollName - configCollectionsColl := verifier.srcClientDatabase(configDBName).Collection(collsCollName) - cursor, err := configCollectionsColl.Find(ctx, bson.D{{"uuid", uuid}}) +func (verifier *Verifier) getShardKeyFields( + ctx context.Context, + namespaceAndUUID *uuidutil.NamespaceAndUUID, +) ([]string, error) { + coll := verifier.srcClient.Database(namespaceAndUUID.DBName). + Collection(namespaceAndUUID.CollName) + + shardKeyOpt, err := util.GetShardKey(ctx, coll) if err != nil { - return nil, fmt.Errorf("Failed to read sharding info for %s: %v", namespace, err) + return nil, errors.Wrapf( + err, + "failed to fetch %#q's shard key", + FullName(coll), + ) } - defer cursor.Close(ctx) - collectionSharded := false - - shardKeys := []string{} - for cursor.Next(ctx) { - collectionSharded = true - var result struct { - Key bson.M - } - - if err = cursor.Decode(&result); err != nil { - return nil, fmt.Errorf("Failed to decode sharding info for %s: %v", namespace, err) - } - - verifier.logger.Debug().Msgf("Collection %s is sharded with shard key %v", namespace, result.Key) - - shardKeys = maps.Keys(result.Key) - sort.Strings(shardKeys) - } - if err = cursor.Err(); err != nil { - verifier.logger.Error().Msgf("Error reading sharding info for %s: %v", namespace, err) + shardKeyRaw, isSharded := shardKeyOpt.Get() + if !isSharded { + return []string{}, nil } - if collectionSharded { - verifier.logChunkInfo(ctx, namespaceAndUUID) + + els, err := shardKeyRaw.Elements() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to parse %#q's shard key", + FullName(coll), + ) } - return shardKeys, nil + return lo.Map( + els, + func(el bson.RawElement, _ int) string { + return el.Key() + }, + ), nil } // 
partitionAndInspectNamespace does a few preliminary tasks for the @@ -771,7 +769,7 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name if err != nil { return nil, nil, 0, 0, err } - shardKeys, err := verifier.getShardingInfo(ctx, namespaceAndUUID) + shardKeys, err := verifier.getShardKeyFields(ctx, namespaceAndUUID) if err != nil { return nil, nil, 0, 0, err } @@ -1033,6 +1031,62 @@ func getIndexesMap( return specsMap, nil } +func (verifier *Verifier) verifyShardingIfNeeded( + ctx context.Context, + srcColl, dstColl *mongo.Collection, +) ([]VerificationResult, error) { + + // If one cluster is sharded and the other is unsharded then there's + // nothing to do here. + if verifier.srcClusterInfo.Topology != verifier.dstClusterInfo.Topology { + return nil, nil + } + + srcShardOpt, err := util.GetShardKey(ctx, srcColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's shard key on source", + FullName(srcColl), + ) + } + + dstShardOpt, err := util.GetShardKey(ctx, dstColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's shard key on destination", + FullName(dstColl), + ) + } + + srcKey, srcIsSharded := srcShardOpt.Get() + dstKey, dstIsSharded := dstShardOpt.Get() + + if !srcIsSharded && !dstIsSharded { + return nil, nil + } + + if srcIsSharded != dstIsSharded { + return []VerificationResult{{ + Field: "Shard Key", + Cluster: lo.Ternary(srcIsSharded, ClusterTarget, ClusterSource), + Details: Missing, + NameSpace: FullName(srcColl), + }}, nil + } + + if !bytes.Equal(srcKey, dstKey) { + return []VerificationResult{{ + Field: "Shard Key", + Details: fmt.Sprintf("%s: src=%v; dst=%v", Mismatch, srcKey, dstKey), + NameSpace: FullName(srcColl), + }}, nil + } + + return nil, nil +} + func (verifier *Verifier) verifyIndexes( ctx context.Context, srcColl, dstColl *mongo.Collection, @@ -1221,6 +1275,26 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( task.Status = 
verificationTaskMetadataMismatch } + shardingProblems, err := verifier.verifyShardingIfNeeded(ctx, srcColl, dstColl) + if err != nil { + return errors.Wrapf( + err, + "failed to compare namespace %#q's sharding", + srcNs, + ) + } + if len(shardingProblems) > 0 { + // don't insert a failed collection unless we did not insert one above + if len(specificationProblems)+len(indexProblems) == 0 { + err = insertFailedCollection() + if err != nil { + return err + } + } + task.FailedDocs = append(task.FailedDocs, shardingProblems...) + task.Status = verificationTaskMetadataMismatch + } + // We’ve confirmed that the collection metadata (including indices) // matches between soruce & destination. Now we can partition the collection. From 82effb4465e2a0b0f91d1a01bacf8a9078992da1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:19:00 -0500 Subject: [PATCH 02/21] save --- internal/util/askserver.go | 64 +++++ internal/verifier/check.go | 4 +- internal/verifier/integration_test_suite.go | 27 +- internal/verifier/metadata.go | 268 +++++++++++++++++ internal/verifier/metadata_test.go | 175 +++++++++++ internal/verifier/migration_verifier.go | 288 +------------------ internal/verifier/migration_verifier_test.go | 5 +- internal/verifier/summary.go | 10 +- mslices/slices.go | 11 + 9 files changed, 548 insertions(+), 304 deletions(-) create mode 100644 internal/util/askserver.go create mode 100644 internal/verifier/metadata.go create mode 100644 internal/verifier/metadata_test.go diff --git a/internal/util/askserver.go b/internal/util/askserver.go new file mode 100644 index 00000000..d2e3a278 --- /dev/null +++ b/internal/util/askserver.go @@ -0,0 +1,64 @@ +package util + +import ( + "context" + "fmt" + "slices" + + "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// ServerThinksTheseMatch runs an aggregation on the 
server that determines +// whether the server thinks a & b are equal. This allows you, e.g., to +// ignore BSON type differences for equivalent numbers. +func ServerThinksTheseMatch( + ctx context.Context, + client *mongo.Client, + a, b any, + tinker option.Option[mongo.Pipeline], +) (bool, error) { + fmt.Printf("----------- a: %+v\n\n", a) + fmt.Printf("----------- b: %+v\n\n", b) + + pipeline := mongo.Pipeline{ + {{"$documents", []bson.D{ + { + {"a", bson.D{{"$literal", a}}}, + {"b", bson.D{{"$literal", b}}}, + }, + }}}, + + // Now check to be sure that those specs match. + {{"$match", bson.D{ + {"$expr", bson.D{ + {"$eq", mslices.Of("$a", "$b")}, + }}, + }}}, + } + + if extra, hasExtra := tinker.Get(); hasExtra { + pipeline = slices.Insert( + pipeline, + 1, + extra..., + ) + } + + cursor, err := client.Database("admin").Aggregate(ctx, pipeline) + + if err == nil { + defer cursor.Close(ctx) + + if cursor.Next(ctx) { + return true, nil + } + + err = cursor.Err() + } + + return false, errors.Wrapf(err, "failed to ask server if a (%v) matches b (%v)", a, b) +} diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 425a90d2..b8a22ff5 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -20,7 +20,7 @@ const ( GenerationComplete GenerationStatus = "complete" ) -var failedStatus = mapset.NewSet( +var failedStatuses = mapset.NewSet( verificationTaskFailed, verificationTaskMetadataMismatch, ) @@ -406,7 +406,7 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, return FailedTasks, IncompleteTasks, err } for _, t := range allTasks { - if failedStatus.Contains(t.Status) { + if failedStatuses.Contains(t.Status) { FailedTasks = append(FailedTasks, t) } else if t.Status != verificationTaskCompleted { IncompleteTasks = append(IncompleteTasks, t) diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 33e3a336..67623b56 100644 --- 
a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -5,9 +5,9 @@ import ( "strings" "time" + "github.com/10gen/migration-verifier/internal/util" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" - "github.com/samber/lo" "github.com/stretchr/testify/suite" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -16,13 +16,8 @@ import ( "go.mongodb.org/mongo-driver/mongo/writeconcern" ) -type TestTopology string - const ( - metaDBName = "VERIFIER_TEST_META" - topologyEnvVar = "MVTEST_TOPOLOGY" - TopologyReplset TestTopology = "replset" - TopologySharded TestTopology = "sharded" + metaDBName = "VERIFIER_TEST_META" ) type IntegrationTestSuite struct { @@ -139,22 +134,14 @@ func (suite *IntegrationTestSuite) TearDownTest() { } } -func (suite *IntegrationTestSuite) GetSrcTopology() TestTopology { - hello := struct { - Msg string - }{} - - resp := suite.srcMongoClient.Database("admin").RunCommand( +func (suite *IntegrationTestSuite) GetSrcTopology() util.ClusterTopology { + clusterInfo, err := util.GetClusterInfo( suite.Context(), - bson.D{{"hello", 1}}, - ) - - suite.Require().NoError( - resp.Decode(&hello), - "should fetch & decode hello", + suite.srcMongoClient, ) + suite.Require().NoError(err, "should fetch src cluster info") - return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, "") + return clusterInfo.Topology } func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go new file mode 100644 index 00000000..03e5eb99 --- /dev/null +++ b/internal/verifier/metadata.go @@ -0,0 +1,268 @@ +package verifier + +import ( + "bytes" + "context" + "fmt" + + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// This is the Field for a 
VerificationResult for shard key mismatches. +const ShardKeyField = "Shard Key" + +// Returns a slice of VerificationResults with the differences, and a boolean indicating whether or +// not the collection data can be safely verified. +func (verifier *Verifier) compareCollectionSpecifications( + srcNs, dstNs string, + srcSpecOpt, dstSpecOpt option.Option[util.CollectionSpec], +) ([]VerificationResult, bool, error) { + srcSpec, hasSrcSpec := srcSpecOpt.Get() + dstSpec, hasDstSpec := dstSpecOpt.Get() + + if !hasSrcSpec { + return []VerificationResult{{ + NameSpace: srcNs, + Cluster: ClusterSource, + Details: Missing}}, false, nil + } + if !hasDstSpec { + return []VerificationResult{{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Details: Missing}}, false, nil + } + if srcSpec.Type != dstSpec.Type { + return []VerificationResult{{ + NameSpace: srcNs, + Cluster: ClusterTarget, + Field: "Type", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false, nil + // If the types differ, the rest is not important. 
+ } + var results []VerificationResult + if srcSpec.Info.ReadOnly != dstSpec.Info.ReadOnly { + results = append(results, VerificationResult{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Field: "ReadOnly", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Info.ReadOnly, dstSpec.Info.ReadOnly)}) + } + if !bytes.Equal(srcSpec.Options, dstSpec.Options) { + mismatchDetails, err := BsonUnorderedCompareRawDocumentWithDetails(srcSpec.Options, dstSpec.Options) + if err != nil { + return nil, false, errors.Wrapf( + err, + "failed to compare namespace %#q's specifications", + srcNs, + ) + } + if mismatchDetails == nil { + results = append(results, VerificationResult{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Field: "Options (Field Order Only)", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options)}) + } else { + results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, nil /* id */, "Options.")...) + } + } + + // Don't compare view data; they have no data of their own. + canCompareData := srcSpec.Type != "view" + // Do not compare data between capped and uncapped collections because the partitioning is different. + canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) + + return results, canCompareData, nil +} + +func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec, dstSpec bson.Raw) (bool, error) { + // If the byte buffers match, then we’re done. + if bytes.Equal(srcSpec, dstSpec) { + return true, nil + } + + var fieldsToRemove = []string{ + // v4.4 stopped adding “ns” to index fields. + "ns", + + // v4.2+ ignores this field. 
+ "background", + } + + return util.ServerThinksTheseMatch( + ctx, + verifier.metaClient, + srcSpec, + dstSpec, + option.Some(mongo.Pipeline{ + {{"$unset", lo.Reduce( + fieldsToRemove, + func(cur []string, field string, _ int) []string { + return append(cur, "a."+field, "b."+field) + }, + []string{}, + )}}, + }), + ) +} + +func (verifier *Verifier) verifyShardingIfNeeded( + ctx context.Context, + srcColl, dstColl *mongo.Collection, +) ([]VerificationResult, error) { + + // If one cluster is sharded and the other is unsharded then there's + // nothing to do here. + if verifier.srcClusterInfo.Topology != verifier.dstClusterInfo.Topology { + return nil, nil + } + + srcShardOpt, err := util.GetShardKey(ctx, srcColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's shard key on source", + FullName(srcColl), + ) + } + + dstShardOpt, err := util.GetShardKey(ctx, dstColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's shard key on destination", + FullName(dstColl), + ) + } + + srcKey, srcIsSharded := srcShardOpt.Get() + dstKey, dstIsSharded := dstShardOpt.Get() + + if !srcIsSharded && !dstIsSharded { + return nil, nil + } + + if srcIsSharded != dstIsSharded { + return []VerificationResult{{ + Field: ShardKeyField, + Cluster: lo.Ternary(srcIsSharded, ClusterTarget, ClusterSource), + Details: Missing, + NameSpace: FullName(srcColl), + }}, nil + } + + if bytes.Equal(srcKey, dstKey) { + return nil, nil + } + + areEqual, err := util.ServerThinksTheseMatch( + ctx, + verifier.metaClient, + srcKey, dstKey, + option.None[mongo.Pipeline](), + ) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to ask server if shard keys (src %v; dst: %v) match", + srcKey, + dstKey, + ) + } + + if !areEqual { + return []VerificationResult{{ + Field: ShardKeyField, + Details: fmt.Sprintf("%s: src=%v; dst=%v", Mismatch, srcKey, dstKey), + NameSpace: FullName(srcColl), + }}, nil + } + + return nil, nil +} + +func (verifier 
*Verifier) verifyIndexes( + ctx context.Context, + srcColl, dstColl *mongo.Collection, + srcIdIndexSpec, dstIdIndexSpec bson.Raw, +) ([]VerificationResult, error) { + + srcMap, err := getIndexesMap(ctx, srcColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's indexes on source", + FullName(srcColl), + ) + } + + if srcIdIndexSpec != nil { + srcMap["_id"] = srcIdIndexSpec + } + + dstMap, err := getIndexesMap(ctx, dstColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's indexes on destination", + FullName(dstColl), + ) + } + + if dstIdIndexSpec != nil { + dstMap["_id"] = dstIdIndexSpec + } + + var results []VerificationResult + srcMapUsed := map[string]bool{} + + for indexName, dstSpec := range dstMap { + srcSpec, exists := srcMap[indexName] + if exists { + srcMapUsed[indexName] = true + theyMatch, err := verifier.doIndexSpecsMatch(ctx, srcSpec, dstSpec) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to check whether %#q's source & destination %#q indexes match", + FullName(srcColl), + indexName, + ) + } + + if !theyMatch { + results = append(results, VerificationResult{ + NameSpace: FullName(dstColl), + Cluster: ClusterTarget, + ID: indexName, + Details: Mismatch + fmt.Sprintf(": src: %v, dst: %v", srcSpec, dstSpec), + }) + } + } else { + results = append(results, VerificationResult{ + ID: indexName, + Details: Missing, + Cluster: ClusterSource, + NameSpace: FullName(srcColl), + }) + } + } + + // Find any index specs which existed in the source cluster but not the target cluster. 
+ for indexName := range srcMap { + if !srcMapUsed[indexName] { + results = append(results, VerificationResult{ + ID: indexName, + Details: Missing, + Cluster: ClusterTarget, + NameSpace: FullName(dstColl)}) + } + } + return results, nil +} diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go new file mode 100644 index 00000000..e5f8fdac --- /dev/null +++ b/internal/verifier/metadata_test.go @@ -0,0 +1,175 @@ +package verifier + +import ( + "fmt" + + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mslices" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +func (suite *IntegrationTestSuite) TestShardingMismatch() { + ctx := suite.Context() + srcInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient) + suite.Require().NoError(err, "should fetch src cluster info") + + dstInfo, err := util.GetClusterInfo(ctx, suite.dstMongoClient) + suite.Require().NoError(err, "should fetch dst cluster info") + + dbname := suite.DBNameForTest() + + shardCollection := func(client *mongo.Client, collName string, key bson.D, label string) { + suite.Require().NoError( + client.Database("admin").RunCommand( + ctx, + bson.D{ + {"shardCollection", dbname + "." 
+ collName}, + {"key", key}, + }, + ).Err(), + fmt.Sprintf("%s: should create %#q", label, dbname+"."+collName), + ) + } + + collections := mslices.Of( + "idonly", + "numtype", + "id_and_foo", + "sharded_dst", + ) + + for _, collName := range collections { + for c, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { + suite.Require().NoError( + client.Database(dbname).CreateCollection( + ctx, + collName, + ), + "should create %#q on "+lo.Ternary(c == 0, "source", "destinatinon"), + collName, + ) + } + } + + if srcInfo.Topology == util.TopologySharded { + shardCollection( + suite.srcMongoClient, + "idonly", + bson.D{{"_id", 1}}, + "src", + ) + shardCollection( + suite.srcMongoClient, + "numtype", + bson.D{{"foo", 1}}, + "src", + ) + shardCollection( + suite.srcMongoClient, + "id_and_foo", + bson.D{{"_id", "hashed"}, {"foo", 1}}, + "src", + ) + } + + if dstInfo.Topology == util.TopologySharded { + shardCollection( + suite.dstMongoClient, + "idonly", + bson.D{{"_id", 1}}, + "dst", + ) + shardCollection( + suite.dstMongoClient, + "numtype", + bson.D{{"foo", float64(1)}}, + "dst", + ) + shardCollection( + suite.dstMongoClient, + "id_and_foo", + bson.D{{"foo", 1}, {"_id", "hashed"}}, + "dst", + ) + shardCollection( + suite.dstMongoClient, + "sharded_dst", + bson.D{{"foo", 1}, {"_id", "hashed"}}, + "dst", + ) + } + + verifier := suite.BuildVerifier() + + namespaces := lo.Map( + collections, + func(collName string, _ int) string { + return dbname + "." 
+ collName + }, + ) + verifier.SetSrcNamespaces(namespaces) + verifier.SetDstNamespaces(namespaces) + verifier.SetNamespaceMap() + + runner := RunVerifierCheck(ctx, suite.T(), verifier) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + cursor, err := verifier.verificationTaskCollection().Find( + ctx, + bson.M{ + "generation": 0, + "type": verificationTaskVerifyCollection, + }, + ) + suite.Require().NoError(err) + + var tasks []VerificationTask + suite.Require().NoError(cursor.All(ctx, &tasks)) + + suite.Require().Len(tasks, len(collections)) + + if srcInfo.Topology != dstInfo.Topology { + for _, task := range tasks { + suite.Assert().Equal( + verificationTaskCompleted, + task.Status, + "mismatched topologies, so task should have succeeded: %v", task, + ) + } + } else { + taskMap := mslices.ToMap( + tasks, + func(task VerificationTask) string { + return task.QueryFilter.Namespace + }, + ) + + suite.Assert().Equal( + verificationTaskCompleted, + taskMap[dbname+".idonly"].Status, + "full match", + ) + + suite.Assert().Equal( + verificationTaskCompleted, + taskMap[dbname+".numtype"].Status, + "number type differences are ignored", + ) + + suite.Assert().Equal( + verificationTaskMetadataMismatch, + taskMap[dbname+".id_and_foo"].Status, + "catch field order difference", + ) + + suite.Assert().Equal( + verificationTaskMetadataMismatch, + taskMap[dbname+".sharded_dst"].Status, + "catch dst-only sharded", + ) + + suite.T().Logf("tasks: %+v", tasks) + } +} diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index c0adbbd8..1263fe7d 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -22,8 +22,6 @@ import ( "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/internal/uuidutil" "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/option" 
"github.com/olekukonko/tablewriter" "github.com/pkg/errors" "github.com/samber/lo" @@ -819,137 +817,6 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name return partitionList, shardKeys, srcDocs, srcBytes, nil } -// Returns a slice of VerificationResults with the differences, and a boolean indicating whether or -// not the collection data can be safely verified. -func (verifier *Verifier) compareCollectionSpecifications( - srcNs, dstNs string, - srcSpecOpt, dstSpecOpt option.Option[util.CollectionSpec], -) ([]VerificationResult, bool, error) { - srcSpec, hasSrcSpec := srcSpecOpt.Get() - dstSpec, hasDstSpec := dstSpecOpt.Get() - - if !hasSrcSpec { - return []VerificationResult{{ - NameSpace: srcNs, - Cluster: ClusterSource, - Details: Missing}}, false, nil - } - if !hasDstSpec { - return []VerificationResult{{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Details: Missing}}, false, nil - } - if srcSpec.Type != dstSpec.Type { - return []VerificationResult{{ - NameSpace: srcNs, - Cluster: ClusterTarget, - Field: "Type", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false, nil - // If the types differ, the rest is not important. 
- } - var results []VerificationResult - if srcSpec.Info.ReadOnly != dstSpec.Info.ReadOnly { - results = append(results, VerificationResult{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Field: "ReadOnly", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Info.ReadOnly, dstSpec.Info.ReadOnly)}) - } - if !bytes.Equal(srcSpec.Options, dstSpec.Options) { - mismatchDetails, err := BsonUnorderedCompareRawDocumentWithDetails(srcSpec.Options, dstSpec.Options) - if err != nil { - return nil, false, errors.Wrapf( - err, - "failed to compare namespace %#q's specifications", - srcNs, - ) - } - if mismatchDetails == nil { - results = append(results, VerificationResult{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Field: "Options (Field Order Only)", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options)}) - } else { - results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, nil /* id */, "Options.")...) - } - } - - // Don't compare view data; they have no data of their own. - canCompareData := srcSpec.Type != "view" - // Do not compare data between capped and uncapped collections because the partitioning is different. - canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) - - return results, canCompareData, nil -} - -func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec bson.Raw, dstSpec bson.Raw) (bool, error) { - // If the byte buffers match, then we’re done. - if bytes.Equal(srcSpec, dstSpec) { - return true, nil - } - - var fieldsToRemove = []string{ - // v4.4 stopped adding “ns” to index fields. - "ns", - - // v4.2+ ignores this field. - "background", - } - - // Next check to see if the only differences are type differences. - // (We can safely use $documents here since this is against the metadata - // cluster, which we can require to be v5+.) 
- db := verifier.metaClient.Database(verifier.metaDBName) - cursor, err := db.Aggregate( - ctx, - mongo.Pipeline{ - {{"$documents", []bson.D{ - { - {"spec", bson.D{{"$literal", srcSpec}}}, - {"dstSpec", bson.D{{"$literal", dstSpec}}}, - }, - }}}, - - {{"$unset", lo.Reduce( - fieldsToRemove, - func(cur []string, field string, _ int) []string { - return append(cur, "spec."+field, "dstSpec."+field) - }, - []string{}, - )}}, - - // Now check to be sure that those specs match. - {{"$match", bson.D{ - {"$expr", bson.D{ - {"$eq", mslices.Of("$spec", "$dstSpec")}, - }}, - }}}, - }, - ) - - if err != nil { - return false, errors.Wrap(err, "failed to check index specification match in metadata") - } - var docs []bson.Raw - err = cursor.All(ctx, &docs) - if err != nil { - return false, errors.Wrap(err, "failed to parse index specification match’s result") - } - - count := len(docs) - - switch count { - case 0: - return false, nil - case 1: - return true, nil - } - - return false, errors.Errorf("weirdly received %d matching index docs (should be 0 or 1)", count) -} - func (verifier *Verifier) ProcessCollectionVerificationTask( ctx context.Context, workerNum int, @@ -1007,14 +874,6 @@ func getIndexesMap( var name string has, err := mbson.RawLookup(spec, &name, "name") - if !has { - return nil, errors.Errorf( - "%#q has an unnamed index (%+v)", - FullName(coll), - spec, - ) - } - if err != nil { return nil, errors.Wrapf( err, @@ -1025,148 +884,20 @@ func getIndexesMap( ) } + if !has { + return nil, errors.Errorf( + "%#q has an unnamed index (%+v)", + FullName(coll), + spec, + ) + } + specsMap[name] = spec } return specsMap, nil } -func (verifier *Verifier) verifyShardingIfNeeded( - ctx context.Context, - srcColl, dstColl *mongo.Collection, -) ([]VerificationResult, error) { - - // If one cluster is sharded and the other is unsharded then there's - // nothing to do here. 
- if verifier.srcClusterInfo.Topology != verifier.dstClusterInfo.Topology { - return nil, nil - } - - srcShardOpt, err := util.GetShardKey(ctx, srcColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's shard key on source", - FullName(srcColl), - ) - } - - dstShardOpt, err := util.GetShardKey(ctx, dstColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's shard key on destination", - FullName(dstColl), - ) - } - - srcKey, srcIsSharded := srcShardOpt.Get() - dstKey, dstIsSharded := dstShardOpt.Get() - - if !srcIsSharded && !dstIsSharded { - return nil, nil - } - - if srcIsSharded != dstIsSharded { - return []VerificationResult{{ - Field: "Shard Key", - Cluster: lo.Ternary(srcIsSharded, ClusterTarget, ClusterSource), - Details: Missing, - NameSpace: FullName(srcColl), - }}, nil - } - - if !bytes.Equal(srcKey, dstKey) { - return []VerificationResult{{ - Field: "Shard Key", - Details: fmt.Sprintf("%s: src=%v; dst=%v", Mismatch, srcKey, dstKey), - NameSpace: FullName(srcColl), - }}, nil - } - - return nil, nil -} - -func (verifier *Verifier) verifyIndexes( - ctx context.Context, - srcColl, dstColl *mongo.Collection, - srcIdIndexSpec, dstIdIndexSpec bson.Raw, -) ([]VerificationResult, error) { - - srcMap, err := getIndexesMap(ctx, srcColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's indexes on source", - FullName(srcColl), - ) - } - - if srcIdIndexSpec != nil { - srcMap["_id"] = srcIdIndexSpec - } - - dstMap, err := getIndexesMap(ctx, dstColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's indexes on destination", - FullName(dstColl), - ) - } - - if dstIdIndexSpec != nil { - dstMap["_id"] = dstIdIndexSpec - } - - var results []VerificationResult - srcMapUsed := map[string]bool{} - - for indexName, dstSpec := range dstMap { - srcSpec, exists := srcMap[indexName] - if exists { - srcMapUsed[indexName] = true - theyMatch, err := 
verifier.doIndexSpecsMatch(ctx, srcSpec, dstSpec) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to check whether %#q's source & desstination %#q indexes match", - FullName(srcColl), - indexName, - ) - } - - if !theyMatch { - results = append(results, VerificationResult{ - NameSpace: FullName(dstColl), - Cluster: ClusterTarget, - ID: indexName, - Details: Mismatch + fmt.Sprintf(": src: %v, dst: %v", srcSpec, dstSpec), - }) - } - } else { - results = append(results, VerificationResult{ - ID: indexName, - Details: Missing, - Cluster: ClusterSource, - NameSpace: FullName(srcColl), - }) - } - } - - // Find any index specs which existed in the source cluster but not the target cluster. - for indexName := range srcMap { - if !srcMapUsed[indexName] { - results = append(results, VerificationResult{ - ID: indexName, - Details: Missing, - Cluster: ClusterTarget, - NameSpace: FullName(dstColl)}) - } - } - return results, nil -} - func (verifier *Verifier) verifyMetadataAndPartitionCollection( ctx context.Context, workerNum int, @@ -1291,7 +1022,8 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( return err } } - task.FailedDocs = append(task.FailedDocs, indexProblems...) + + task.FailedDocs = append(task.FailedDocs, shardingProblems...) 
task.Status = verificationTaskMetadataMismatch } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index a8fae37d..b7f5fd2e 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -18,6 +18,7 @@ import ( "github.com/10gen/migration-verifier/internal/partitions" "github.com/10gen/migration-verifier/internal/testutil" + "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" "github.com/cespare/permute/v2" "github.com/rs/zerolog" @@ -1182,7 +1183,7 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { err = suite.dstMongoClient.Database("testDb4").CreateCollection(ctx, "testColl6") suite.Require().NoError(err) - if suite.GetSrcTopology() != TopologySharded { + if suite.GetSrcTopology() != util.TopologySharded { err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "testColl7") suite.Require().NoError(err) } @@ -1223,7 +1224,7 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { suite.ElementsMatch([]string{"testDb1.testColl1", "testDb1.testColl2", "testDb1.testView1"}, verifier.dstNamespaces) // Collections in admin, config, and local should not be found - if suite.GetSrcTopology() != TopologySharded { + if suite.GetSrcTopology() != util.TopologySharded { err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc") suite.Require().NoError(err) err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest") diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 4c86ae67..ce27760b 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -40,11 +40,17 @@ func (verifier *Verifier) reportCollectionMetadataMismatches(ctx context.Context if len(failedTasks) != 0 { table := tablewriter.NewWriter(strBuilder) - table.SetHeader([]string{"Index", "Cluster", "Type", "Field", "Namespace", "Details"}) + 
table.SetHeader([]string{"Index", "Cluster", "Field", "Namespace", "Details"}) for _, v := range failedTasks { for _, f := range v.FailedDocs { - table.Append([]string{fmt.Sprintf("%v", f.ID), fmt.Sprintf("%v", f.Cluster), fmt.Sprintf("%v", f.Field), fmt.Sprintf("%v", f.NameSpace), fmt.Sprintf("%v", f.Details)}) + table.Append([]string{ + fmt.Sprintf("%v", f.ID), + fmt.Sprintf("%v", f.Cluster), + fmt.Sprintf("%v", f.Field), + fmt.Sprintf("%v", f.NameSpace), + fmt.Sprintf("%v", f.Details), + }) } } strBuilder.WriteString("\nCollections/Indexes in failed or retry status:\n") diff --git a/mslices/slices.go b/mslices/slices.go index f9f0a25a..1ea106ce 100644 --- a/mslices/slices.go +++ b/mslices/slices.go @@ -9,3 +9,14 @@ package mslices func Of[T any](pieces ...T) []T { return append([]T{}, pieces...) } + +// ToMap outputs a map that “indexes” the given slice. +func ToMap[S ~[]E, E any, K comparable](s S, cb func(el E) K) map[K]E { + theMap := make(map[K]E, len(s)) + + for _, el := range s { + theMap[cb(el)] = el + } + + return theMap +} From cc5495cdd49bacc3cc02f8665b897eb95451f080 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:21:32 -0500 Subject: [PATCH 03/21] test rs-to-sc --- .github/workflows/all.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index 458e6abd..47b9de00 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -41,8 +41,14 @@ jobs: srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 + - name: replset-to-sharded + dstArgs: --sharded 2 + srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 + dstConnStr: mongodb://localhost:27030 + - name: sharded - args: --sharded 2 + srcArgs: --sharded 2 + dstArgs: --sharded 2 srcConnStr: mongodb://localhost:27020 dstConnStr: mongodb://localhost:27030 @@ -82,8 +88,8 @@ jobs: - name: 
Start clusters run: |- { - echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }} - echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }} + echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.srcArgs }} + echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.dstArgs }} echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1 } | parallel From 2219b6f7a761f900a96f4010f8dec86bc8b4c56a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:23:15 -0500 Subject: [PATCH 04/21] keep logging chunk info --- internal/verifier/migration_verifier.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 1263fe7d..8ad8bd72 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -62,9 +62,6 @@ const ( // is unworkable. 
ReadConcernIgnore ReadConcernSetting = "ignore" - configDBName = "config" - collsCollName = "collections" - DefaultFailureDisplaySize = 20 okSymbol = "\u2705" // white heavy check mark @@ -737,6 +734,8 @@ func (verifier *Verifier) getShardKeyFields( return []string{}, nil } + verifier.logChunkInfo(ctx, namespaceAndUUID) + els, err := shardKeyRaw.Elements() if err != nil { return nil, errors.Wrapf( From 5460ff039e776e43f4acb459e030323c17000399 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:27:04 -0500 Subject: [PATCH 05/21] dic --- internal/util/askserver.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/util/askserver.go b/internal/util/askserver.go index d2e3a278..72e3cd40 100644 --- a/internal/util/askserver.go +++ b/internal/util/askserver.go @@ -2,7 +2,6 @@ package util import ( "context" - "fmt" "slices" "github.com/10gen/migration-verifier/mslices" @@ -15,15 +14,15 @@ import ( // ServerThinksTheseMatch runs an aggregation on the server that determines // whether the server thinks a & b are equal. This allows you, e.g., to // ignore BSON type differences for equivalent numbers. +// +// tinker is an optional pipeline that operates on the documents in the +// pipeline (`a` and `b`, respectively) before they're compared. 
func ServerThinksTheseMatch( ctx context.Context, client *mongo.Client, a, b any, tinker option.Option[mongo.Pipeline], ) (bool, error) { - fmt.Printf("----------- a: %+v\n\n", a) - fmt.Printf("----------- b: %+v\n\n", b) - pipeline := mongo.Pipeline{ {{"$documents", []bson.D{ { From 1272f199bdb485a30bf12a9fc3c89c83e014d214 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:33:40 -0500 Subject: [PATCH 06/21] undo moved code --- internal/verifier/metadata.go | 177 ------------------------ internal/verifier/migration_verifier.go | 177 ++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 177 deletions(-) diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 03e5eb99..8ab66aee 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -9,109 +9,12 @@ import ( "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) // This is the Field for a VerificationResult for shard key mismatches. const ShardKeyField = "Shard Key" -// Returns a slice of VerificationResults with the differences, and a boolean indicating whether or -// not the collection data can be safely verified. 
-func (verifier *Verifier) compareCollectionSpecifications( - srcNs, dstNs string, - srcSpecOpt, dstSpecOpt option.Option[util.CollectionSpec], -) ([]VerificationResult, bool, error) { - srcSpec, hasSrcSpec := srcSpecOpt.Get() - dstSpec, hasDstSpec := dstSpecOpt.Get() - - if !hasSrcSpec { - return []VerificationResult{{ - NameSpace: srcNs, - Cluster: ClusterSource, - Details: Missing}}, false, nil - } - if !hasDstSpec { - return []VerificationResult{{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Details: Missing}}, false, nil - } - if srcSpec.Type != dstSpec.Type { - return []VerificationResult{{ - NameSpace: srcNs, - Cluster: ClusterTarget, - Field: "Type", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false, nil - // If the types differ, the rest is not important. - } - var results []VerificationResult - if srcSpec.Info.ReadOnly != dstSpec.Info.ReadOnly { - results = append(results, VerificationResult{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Field: "ReadOnly", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Info.ReadOnly, dstSpec.Info.ReadOnly)}) - } - if !bytes.Equal(srcSpec.Options, dstSpec.Options) { - mismatchDetails, err := BsonUnorderedCompareRawDocumentWithDetails(srcSpec.Options, dstSpec.Options) - if err != nil { - return nil, false, errors.Wrapf( - err, - "failed to compare namespace %#q's specifications", - srcNs, - ) - } - if mismatchDetails == nil { - results = append(results, VerificationResult{ - NameSpace: dstNs, - Cluster: ClusterTarget, - Field: "Options (Field Order Only)", - Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options)}) - } else { - results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, nil /* id */, "Options.")...) - } - } - - // Don't compare view data; they have no data of their own. 
- canCompareData := srcSpec.Type != "view" - // Do not compare data between capped and uncapped collections because the partitioning is different. - canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) - - return results, canCompareData, nil -} - -func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec, dstSpec bson.Raw) (bool, error) { - // If the byte buffers match, then we’re done. - if bytes.Equal(srcSpec, dstSpec) { - return true, nil - } - - var fieldsToRemove = []string{ - // v4.4 stopped adding “ns” to index fields. - "ns", - - // v4.2+ ignores this field. - "background", - } - - return util.ServerThinksTheseMatch( - ctx, - verifier.metaClient, - srcSpec, - dstSpec, - option.Some(mongo.Pipeline{ - {{"$unset", lo.Reduce( - fieldsToRemove, - func(cur []string, field string, _ int) []string { - return append(cur, "a."+field, "b."+field) - }, - []string{}, - )}}, - }), - ) -} - func (verifier *Verifier) verifyShardingIfNeeded( ctx context.Context, srcColl, dstColl *mongo.Collection, @@ -186,83 +89,3 @@ func (verifier *Verifier) verifyShardingIfNeeded( return nil, nil } - -func (verifier *Verifier) verifyIndexes( - ctx context.Context, - srcColl, dstColl *mongo.Collection, - srcIdIndexSpec, dstIdIndexSpec bson.Raw, -) ([]VerificationResult, error) { - - srcMap, err := getIndexesMap(ctx, srcColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's indexes on source", - FullName(srcColl), - ) - } - - if srcIdIndexSpec != nil { - srcMap["_id"] = srcIdIndexSpec - } - - dstMap, err := getIndexesMap(ctx, dstColl) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to fetch %#q's indexes on destination", - FullName(dstColl), - ) - } - - if dstIdIndexSpec != nil { - dstMap["_id"] = dstIdIndexSpec - } - - var results []VerificationResult - srcMapUsed := map[string]bool{} - - for indexName, dstSpec := range dstMap { - srcSpec, exists := srcMap[indexName] - if 
exists { - srcMapUsed[indexName] = true - theyMatch, err := verifier.doIndexSpecsMatch(ctx, srcSpec, dstSpec) - if err != nil { - return nil, errors.Wrapf( - err, - "failed to check whether %#q's source & desstination %#q indexes match", - FullName(srcColl), - indexName, - ) - } - - if !theyMatch { - results = append(results, VerificationResult{ - NameSpace: FullName(dstColl), - Cluster: ClusterTarget, - ID: indexName, - Details: Mismatch + fmt.Sprintf(": src: %v, dst: %v", srcSpec, dstSpec), - }) - } - } else { - results = append(results, VerificationResult{ - ID: indexName, - Details: Missing, - Cluster: ClusterSource, - NameSpace: FullName(srcColl), - }) - } - } - - // Find any index specs which existed in the source cluster but not the target cluster. - for indexName := range srcMap { - if !srcMapUsed[indexName] { - results = append(results, VerificationResult{ - ID: indexName, - Details: Missing, - Cluster: ClusterTarget, - NameSpace: FullName(dstColl)}) - } - } - return results, nil -} diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 8ad8bd72..90482d03 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -22,6 +22,7 @@ import ( "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/internal/uuidutil" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/option" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" "github.com/samber/lo" @@ -816,6 +817,102 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name return partitionList, shardKeys, srcDocs, srcBytes, nil } +// Returns a slice of VerificationResults with the differences, and a boolean indicating whether or +// not the collection data can be safely verified. 
+func (verifier *Verifier) compareCollectionSpecifications( + srcNs, dstNs string, + srcSpecOpt, dstSpecOpt option.Option[util.CollectionSpec], +) ([]VerificationResult, bool, error) { + srcSpec, hasSrcSpec := srcSpecOpt.Get() + dstSpec, hasDstSpec := dstSpecOpt.Get() + + if !hasSrcSpec { + return []VerificationResult{{ + NameSpace: srcNs, + Cluster: ClusterSource, + Details: Missing}}, false, nil + } + if !hasDstSpec { + return []VerificationResult{{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Details: Missing}}, false, nil + } + if srcSpec.Type != dstSpec.Type { + return []VerificationResult{{ + NameSpace: srcNs, + Cluster: ClusterTarget, + Field: "Type", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Type, dstSpec.Type)}}, false, nil + // If the types differ, the rest is not important. + } + var results []VerificationResult + if srcSpec.Info.ReadOnly != dstSpec.Info.ReadOnly { + results = append(results, VerificationResult{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Field: "ReadOnly", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Info.ReadOnly, dstSpec.Info.ReadOnly)}) + } + if !bytes.Equal(srcSpec.Options, dstSpec.Options) { + mismatchDetails, err := BsonUnorderedCompareRawDocumentWithDetails(srcSpec.Options, dstSpec.Options) + if err != nil { + return nil, false, errors.Wrapf( + err, + "failed to compare namespace %#q's specifications", + srcNs, + ) + } + if mismatchDetails == nil { + results = append(results, VerificationResult{ + NameSpace: dstNs, + Cluster: ClusterTarget, + Field: "Options (Field Order Only)", + Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options)}) + } else { + results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, nil /* id */, "Options.")...) + } + } + + // Don't compare view data; they have no data of their own. 
+ canCompareData := srcSpec.Type != "view" + // Do not compare data between capped and uncapped collections because the partitioning is different. + canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) + + return results, canCompareData, nil +} + +func (verifier *Verifier) doIndexSpecsMatch(ctx context.Context, srcSpec, dstSpec bson.Raw) (bool, error) { + // If the byte buffers match, then we’re done. + if bytes.Equal(srcSpec, dstSpec) { + return true, nil + } + + var fieldsToRemove = []string{ + // v4.4 stopped adding “ns” to index fields. + "ns", + + // v4.2+ ignores this field. + "background", + } + + return util.ServerThinksTheseMatch( + ctx, + verifier.metaClient, + srcSpec, + dstSpec, + option.Some(mongo.Pipeline{ + {{"$unset", lo.Reduce( + fieldsToRemove, + func(cur []string, field string, _ int) []string { + return append(cur, "a."+field, "b."+field) + }, + []string{}, + )}}, + }), + ) +} + func (verifier *Verifier) ProcessCollectionVerificationTask( ctx context.Context, workerNum int, @@ -897,6 +994,86 @@ func getIndexesMap( return specsMap, nil } +func (verifier *Verifier) verifyIndexes( + ctx context.Context, + srcColl, dstColl *mongo.Collection, + srcIdIndexSpec, dstIdIndexSpec bson.Raw, +) ([]VerificationResult, error) { + + srcMap, err := getIndexesMap(ctx, srcColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's indexes on source", + FullName(srcColl), + ) + } + + if srcIdIndexSpec != nil { + srcMap["_id"] = srcIdIndexSpec + } + + dstMap, err := getIndexesMap(ctx, dstColl) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to fetch %#q's indexes on destination", + FullName(dstColl), + ) + } + + if dstIdIndexSpec != nil { + dstMap["_id"] = dstIdIndexSpec + } + + var results []VerificationResult + srcMapUsed := map[string]bool{} + + for indexName, dstSpec := range dstMap { + srcSpec, exists := srcMap[indexName] + if exists { + srcMapUsed[indexName] = true + 
theyMatch, err := verifier.doIndexSpecsMatch(ctx, srcSpec, dstSpec) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to check whether %#q's source & desstination %#q indexes match", + FullName(srcColl), + indexName, + ) + } + + if !theyMatch { + results = append(results, VerificationResult{ + NameSpace: FullName(dstColl), + Cluster: ClusterTarget, + ID: indexName, + Details: Mismatch + fmt.Sprintf(": src: %v, dst: %v", srcSpec, dstSpec), + }) + } + } else { + results = append(results, VerificationResult{ + ID: indexName, + Details: Missing, + Cluster: ClusterSource, + NameSpace: FullName(srcColl), + }) + } + } + + // Find any index specs which existed in the source cluster but not the target cluster. + for indexName := range srcMap { + if !srcMapUsed[indexName] { + results = append(results, VerificationResult{ + ID: indexName, + Details: Missing, + Cluster: ClusterTarget, + NameSpace: FullName(dstColl)}) + } + } + return results, nil +} + func (verifier *Verifier) verifyMetadataAndPartitionCollection( ctx context.Context, workerNum int, From afa6f47ba97857e3cec23a650898a369b5ad6e26 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:35:50 -0500 Subject: [PATCH 07/21] undo line change --- internal/verifier/summary.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index ce27760b..0df302c5 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -44,13 +44,7 @@ func (verifier *Verifier) reportCollectionMetadataMismatches(ctx context.Context for _, v := range failedTasks { for _, f := range v.FailedDocs { - table.Append([]string{ - fmt.Sprintf("%v", f.ID), - fmt.Sprintf("%v", f.Cluster), - fmt.Sprintf("%v", f.Field), - fmt.Sprintf("%v", f.NameSpace), - fmt.Sprintf("%v", f.Details), - }) + table.Append([]string{fmt.Sprintf("%v", f.ID), fmt.Sprintf("%v", f.Cluster), fmt.Sprintf("%v", f.Field), fmt.Sprintf("%v", f.NameSpace), 
fmt.Sprintf("%v", f.Details)}) } } strBuilder.WriteString("\nCollections/Indexes in failed or retry status:\n") From 43db3b46dd0312c002ba1e60dbc4c324de9e403f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:54:08 -0500 Subject: [PATCH 08/21] only test if both are sharded --- internal/verifier/metadata.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 8ab66aee..9202a7f2 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -20,9 +20,11 @@ func (verifier *Verifier) verifyShardingIfNeeded( srcColl, dstColl *mongo.Collection, ) ([]VerificationResult, error) { - // If one cluster is sharded and the other is unsharded then there's - // nothing to do here. - if verifier.srcClusterInfo.Topology != verifier.dstClusterInfo.Topology { + // We only need to compare if both clusters are sharded + srcSharded := verifier.srcClusterInfo.Topology == util.TopologySharded + dstSharded := verifier.dstClusterInfo.Topology == util.TopologySharded + + if !srcSharded || !dstSharded { return nil, nil } From 129cd218b2cbbaf6fda7d7a049f42cd30b8eb89a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 12:57:57 -0500 Subject: [PATCH 09/21] enable sharding --- internal/verifier/metadata_test.go | 38 ++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index e5f8fdac..bd0fa0f1 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -29,7 +29,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { {"key", key}, }, ).Err(), - fmt.Sprintf("%s: should create %#q", label, dbname+"."+collName), + fmt.Sprintf("%s: should shard %#q", label, dbname+"."+collName), ) } @@ -40,8 +40,8 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { "sharded_dst", ) - for _, collName := range 
collections { - for c, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { + for c, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { + for _, collName := range collections { suite.Require().NoError( client.Database(dbname).CreateCollection( ctx, @@ -54,6 +54,13 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { } if srcInfo.Topology == util.TopologySharded { + suite.Require().NoError( + suite.srcMongoClient.Database("admin").RunCommand( + ctx, + bson.D{{"enableSharding", dbname}}, + ).Err(), + ) + shardCollection( suite.srcMongoClient, "idonly", @@ -75,6 +82,13 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { } if dstInfo.Topology == util.TopologySharded { + suite.Require().NoError( + suite.dstMongoClient.Database("admin").RunCommand( + ctx, + bson.D{{"enableSharding", dbname}}, + ).Err(), + ) + shardCollection( suite.dstMongoClient, "idonly", @@ -130,15 +144,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { suite.Require().Len(tasks, len(collections)) - if srcInfo.Topology != dstInfo.Topology { - for _, task := range tasks { - suite.Assert().Equal( - verificationTaskCompleted, - task.Status, - "mismatched topologies, so task should have succeeded: %v", task, - ) - } - } else { + if srcInfo.Topology == util.TopologySharded && dstInfo.Topology == util.TopologySharded { taskMap := mslices.ToMap( tasks, func(task VerificationTask) string { @@ -171,5 +177,13 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { ) suite.T().Logf("tasks: %+v", tasks) + } else { + for _, task := range tasks { + suite.Assert().Equal( + verificationTaskCompleted, + task.Status, + "mismatched topologies, so task should have succeeded: %v", task, + ) + } } } From 700b138f21595560ae5b9b070b16507b96c2daa7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 13:08:39 -0500 Subject: [PATCH 10/21] create indexes --- internal/verifier/metadata_test.go | 66 ++++++++++++++++++++++++++++-- 1 
file changed, 63 insertions(+), 3 deletions(-) diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index bd0fa0f1..2b8d12c1 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -76,9 +76,39 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { shardCollection( suite.srcMongoClient, "id_and_foo", - bson.D{{"_id", "hashed"}, {"foo", 1}}, + bson.D{{"_id", "1"}, {"foo", 1}}, "src", ) + } else { + suite.Require().NoError( + suite.srcMongoClient.Database(dbname).RunCommand( + ctx, + bson.D{ + {"createIndexes", "numtype"}, + {"indexes", []bson.D{ + { + {"name", "foo_1"}, + {"key", bson.D{{"foo", 1}}}, + }, + }}, + }, + ).Err(), + ) + + suite.Require().NoError( + suite.srcMongoClient.Database(dbname).RunCommand( + ctx, + bson.D{ + {"createIndexes", "id_and_foo"}, + {"indexes", []bson.D{ + { + {"name", "foo_1"}, + {"key", bson.D{{"_id", "1"}, {"foo", 1}}}, + }, + }}, + }, + ).Err(), + ) } if dstInfo.Topology == util.TopologySharded { @@ -104,15 +134,45 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { shardCollection( suite.dstMongoClient, "id_and_foo", - bson.D{{"foo", 1}, {"_id", "hashed"}}, + bson.D{{"foo", 1}, {"_id", "1"}}, "dst", ) shardCollection( suite.dstMongoClient, "sharded_dst", - bson.D{{"foo", 1}, {"_id", "hashed"}}, + bson.D{{"foo", 1}}, "dst", ) + } else { + suite.Require().NoError( + suite.dstMongoClient.Database(dbname).RunCommand( + ctx, + bson.D{ + {"createIndexes", "numtype"}, + {"indexes", []bson.D{ + { + {"name", "foo_1"}, + {"key", bson.D{{"foo", float64(1)}}}, + }, + }}, + }, + ).Err(), + ) + + suite.Require().NoError( + suite.dstMongoClient.Database(dbname).RunCommand( + ctx, + bson.D{ + {"createIndexes", "id_and_foo"}, + {"indexes", []bson.D{ + { + {"name", "foo_1"}, + {"key", bson.D{{"foo", 1}, {"_id", "1"}}}, + }, + }}, + }, + ).Err(), + ) } verifier := suite.BuildVerifier() From 1166dd2c3558f69e2ba6087e0322fd18cbab52f5 Mon Sep 17 00:00:00 2001 From: 
Felipe Gasper Date: Thu, 28 Nov 2024 13:09:38 -0500 Subject: [PATCH 11/21] create both orders of index --- internal/verifier/metadata_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index 2b8d12c1..89676472 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -101,6 +101,10 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { bson.D{ {"createIndexes", "id_and_foo"}, {"indexes", []bson.D{ + { + {"name", "foo_1"}, + {"key", bson.D{{"foo", 1}, {"_id", "1"}}}, + }, { {"name", "foo_1"}, {"key", bson.D{{"_id", "1"}, {"foo", 1}}}, @@ -169,6 +173,10 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { {"name", "foo_1"}, {"key", bson.D{{"foo", 1}, {"_id", "1"}}}, }, + { + {"name", "foo_1"}, + {"key", bson.D{{"_id", "1"}, {"foo", 1}}}, + }, }}, }, ).Err(), From 178451600b3fab9aec5f926270ee72981f6e574b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 13:17:55 -0500 Subject: [PATCH 12/21] fix string 1 --- internal/verifier/metadata_test.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index 89676472..086e8725 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -76,7 +76,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { shardCollection( suite.srcMongoClient, "id_and_foo", - bson.D{{"_id", "1"}, {"foo", 1}}, + bson.D{{"_id", 1}, {"foo", 1}}, "src", ) } else { @@ -102,12 +102,12 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { {"createIndexes", "id_and_foo"}, {"indexes", []bson.D{ { - {"name", "foo_1"}, - {"key", bson.D{{"foo", 1}, {"_id", "1"}}}, + {"name", "foo_1__id_1"}, + {"key", bson.D{{"foo", 1}, {"_id", 1}}}, }, { - {"name", "foo_1"}, - {"key", bson.D{{"_id", "1"}, {"foo", 1}}}, + {"name", "_id_1_foo_1"}, + {"key", 
bson.D{{"_id", 1}, {"foo", 1}}}, }, }}, }, @@ -138,7 +138,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { shardCollection( suite.dstMongoClient, "id_and_foo", - bson.D{{"foo", 1}, {"_id", "1"}}, + bson.D{{"foo", 1}, {"_id", 1}}, "dst", ) shardCollection( @@ -170,12 +170,12 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { {"createIndexes", "id_and_foo"}, {"indexes", []bson.D{ { - {"name", "foo_1"}, - {"key", bson.D{{"foo", 1}, {"_id", "1"}}}, + {"name", "foo_1__id_1"}, + {"key", bson.D{{"foo", 1}, {"_id", 1}}}, }, { - {"name", "foo_1"}, - {"key", bson.D{{"_id", "1"}, {"foo", 1}}}, + {"name", "_id_1_foo_1"}, + {"key", bson.D{{"_id", 1}, {"foo", 1}}}, }, }}, }, @@ -243,8 +243,6 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { taskMap[dbname+".sharded_dst"].Status, "catch dst-only sharded", ) - - suite.T().Logf("tasks: %+v", tasks) } else { for _, task := range tasks { suite.Assert().Equal( From 22cf1e9bcb49d6bb50b1441d4884f24a6d95d604 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 13:27:04 -0500 Subject: [PATCH 13/21] +v --- internal/verifier/metadata_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index 086e8725..5e6a55c5 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -248,7 +248,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { suite.Assert().Equal( verificationTaskCompleted, task.Status, - "mismatched topologies, so task should have succeeded: %v", task, + "mismatched topologies, so task should have succeeded: %+v", task, ) } } From 47e2ced7a2ec9220e26b90af817b1306fb32ec43 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 13:41:29 -0500 Subject: [PATCH 14/21] create indexes consistently --- internal/verifier/metadata_test.go | 83 ++++++++++++++---------------- 1 file changed, 39 insertions(+), 44 deletions(-) diff --git 
a/internal/verifier/metadata_test.go b/internal/verifier/metadata_test.go index 5e6a55c5..16d53573 100644 --- a/internal/verifier/metadata_test.go +++ b/internal/verifier/metadata_test.go @@ -8,6 +8,7 @@ import ( "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "golang.org/x/exp/maps" ) func (suite *IntegrationTestSuite) TestShardingMismatch() { @@ -33,15 +34,22 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { ) } - collections := mslices.Of( - "idonly", - "numtype", - "id_and_foo", - "sharded_dst", - ) + // Create indexes as needed to ensure that we only + // check for sharding mismatches. + allIndexes := map[string]map[string]bson.D{ + "numtype": { + "foo_1": {{"foo", 1}}, + }, + "id_and_foo": { + "foo_1__id_1": {{"foo", 1}, {"_id", 1}}, + "_id_1_foo_1": {{"_id", 1}, {"foo", 1}}, + }, + "idonly": {}, + "sharded_dst": {}, + } for c, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { - for _, collName := range collections { + for collName, indexMap := range allIndexes { suite.Require().NoError( client.Database(dbname).CreateCollection( ctx, @@ -50,6 +58,27 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { "should create %#q on "+lo.Ternary(c == 0, "source", "destinatinon"), collName, ) + + if len(indexMap) > 0 { + + suite.Require().NoError( + client.Database(dbname).RunCommand( + ctx, + bson.D{ + {"createIndexes", collName}, + {"indexes", lo.Map( + maps.Keys(indexMap), + func(idxName string, _ int) bson.D { + return bson.D{ + {"name", idxName}, + {"key", indexMap[idxName]}, + } + }, + )}, + }, + ).Err(), + ) + } } } @@ -79,40 +108,6 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { bson.D{{"_id", 1}, {"foo", 1}}, "src", ) - } else { - suite.Require().NoError( - suite.srcMongoClient.Database(dbname).RunCommand( - ctx, - bson.D{ - {"createIndexes", "numtype"}, - {"indexes", []bson.D{ - { - {"name", "foo_1"}, - {"key", bson.D{{"foo", 1}}}, - }, - }}, - }, - 
).Err(), - ) - - suite.Require().NoError( - suite.srcMongoClient.Database(dbname).RunCommand( - ctx, - bson.D{ - {"createIndexes", "id_and_foo"}, - {"indexes", []bson.D{ - { - {"name", "foo_1__id_1"}, - {"key", bson.D{{"foo", 1}, {"_id", 1}}}, - }, - { - {"name", "_id_1_foo_1"}, - {"key", bson.D{{"_id", 1}, {"foo", 1}}}, - }, - }}, - }, - ).Err(), - ) } if dstInfo.Topology == util.TopologySharded { @@ -144,7 +139,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { shardCollection( suite.dstMongoClient, "sharded_dst", - bson.D{{"foo", 1}}, + bson.D{{"_id", 1}}, "dst", ) } else { @@ -186,7 +181,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { verifier := suite.BuildVerifier() namespaces := lo.Map( - collections, + maps.Keys(allIndexes), func(collName string, _ int) string { return dbname + "." + collName }, @@ -210,7 +205,7 @@ func (suite *IntegrationTestSuite) TestShardingMismatch() { var tasks []VerificationTask suite.Require().NoError(cursor.All(ctx, &tasks)) - suite.Require().Len(tasks, len(collections)) + suite.Require().Len(tasks, len(allIndexes)) if srcInfo.Topology == util.TopologySharded && dstInfo.Topology == util.TopologySharded { taskMap := mslices.ToMap( From b4f45b34962ff9a06fbefd8a2f1e0eac2163c73c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 28 Nov 2024 14:04:11 -0500 Subject: [PATCH 15/21] account for rs->sc in list-namespaces test --- internal/verifier/integration_test_suite.go | 4 ++-- internal/verifier/migration_verifier_test.go | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 67623b56..4c6fc911 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -134,10 +134,10 @@ func (suite *IntegrationTestSuite) TearDownTest() { } } -func (suite *IntegrationTestSuite) GetSrcTopology() util.ClusterTopology { +func (suite *IntegrationTestSuite) 
GetTopology(client *mongo.Client) util.ClusterTopology { clusterInfo, err := util.GetClusterInfo( suite.Context(), - suite.srcMongoClient, + client, ) suite.Require().NoError(err, "should fetch src cluster info") diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index b7f5fd2e..f1c36abf 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1183,7 +1183,7 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { err = suite.dstMongoClient.Database("testDb4").CreateCollection(ctx, "testColl6") suite.Require().NoError(err) - if suite.GetSrcTopology() != util.TopologySharded { + if suite.GetTopology(suite.dstMongoClient) != util.TopologySharded { err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "testColl7") suite.Require().NoError(err) } @@ -1224,9 +1224,12 @@ func (suite *IntegrationTestSuite) TestVerifierNamespaceList() { suite.ElementsMatch([]string{"testDb1.testColl1", "testDb1.testColl2", "testDb1.testView1"}, verifier.dstNamespaces) // Collections in admin, config, and local should not be found - if suite.GetSrcTopology() != util.TopologySharded { + if suite.GetTopology(suite.srcMongoClient) != util.TopologySharded { err = suite.srcMongoClient.Database("local").CreateCollection(ctx, "islocalSrc") suite.Require().NoError(err) + } + + if suite.GetTopology(suite.dstMongoClient) != util.TopologySharded { err = suite.dstMongoClient.Database("local").CreateCollection(ctx, "islocalDest") suite.Require().NoError(err) } From 9c164b829d80d6a63a490cc8edc0d8e8ea220a3b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 09:52:57 -0500 Subject: [PATCH 16/21] fix --- internal/verifier/migration_verifier.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e84e6fe5..813f91a2 100644 --- a/internal/verifier/migration_verifier.go +++ 
b/internal/verifier/migration_verifier.go @@ -434,11 +434,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A) return append(predicates, verifier.globalFilter) } -<<<<<<< HEAD -func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.ClusterInfo, -======= func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, ->>>>>>> main startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() From 1b699f3347835a133c42dbb7266b93377b1a2cb1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 10:32:29 -0500 Subject: [PATCH 17/21] cluster --- internal/verifier/uri.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/uri.go b/internal/verifier/uri.go index 25470af4..abd71e18 100644 --- a/internal/verifier/uri.go +++ b/internal/verifier/uri.go @@ -19,7 +19,7 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error { clusterInfo, err := util.GetClusterInfo(ctx, verifier.srcClient) if err != nil { - return errors.Wrap(err, "failed to read source build info") + return errors.Wrap(err, "failed to read source cluster info") } verifier.srcClusterInfo = &clusterInfo From 10837f4363c31468ab1eee5877838b8d9f450fd3 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 10:53:50 -0500 Subject: [PATCH 18/21] undo retry.go changes --- internal/retry/retry.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index b083fa09..bb2a55aa 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -11,12 +11,7 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -// RunForUUIDAndTransientErrors should never be used. Its presence here -// is a vestige of the retryer's origin in mongosync. 
Since migration-verifier -// forbids DDL operations, a namespace should be just as stable as its UUID; thus, -// this function is needless complexity. -// -// This function retries f() for the CollectionUUIDMismatch error and for transient errors. +// RunForUUIDAndTransientErrors retries f() for the CollectionUUIDMismatch error and for transient errors. // This should be used to run a driver operation that optionally specifies the `collectionUUID` parameter // for a collection that may have been: // @@ -39,7 +34,7 @@ import ( // f() is provided with a collection name string, which is the one that should be used in the body // of f() when a collection name is needed. The initial value of this string is expectedCollName. // -// This function always returns the collection's current name. It returns +// RunForUUIDAndTransientErrors always returns the collection's current name. It returns // an error if the duration limit is reached, or if f() returns a non-transient error. func (r *Retryer) RunForUUIDAndTransientErrors( ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error, @@ -47,12 +42,7 @@ func (r *Retryer) RunForUUIDAndTransientErrors( return r.runRetryLoop(ctx, logger, expectedCollName, f, true, true) } -// RunForUUIDErrorOnly should never be used. Its presence here -// is a vestige of the retryer's origin in mongosync. Since migration-verifier -// forbids DDL operations, a namespace should be just as stable as its UUID; thus, -// this function is needless complexity. -// -// This function retries f() for the CollectionUUIDMismatch error only. This should primarily +// RunForUUIDErrorOnly retries f() for the CollectionUUIDMismatch error only. 
This should primarily // be used to wrap a transaction callback containing an operation that specifies the `collectionUUID` // parameter for a collection that may have been: // @@ -67,7 +57,7 @@ func (r *Retryer) RunForUUIDAndTransientErrors( // f() is provided with a collection name string, which is the one that should be used in the body // of f() where a collection name is needed. The initial value of this string is expectedCollName. // -// This function returns the collection's current name in all cases. +// RunForUUIDErrorOnly returns the collection's current name in all cases. func (r *Retryer) RunForUUIDErrorOnly( ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error, ) (string, error) { From 4a29f5a56fc43013cf33e366b038e032f8b90883 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 11:00:08 -0500 Subject: [PATCH 19/21] copy test over --- mslices/slices_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/mslices/slices_test.go b/mslices/slices_test.go index 4264d76f..64dee56f 100644 --- a/mslices/slices_test.go +++ b/mslices/slices_test.go @@ -25,3 +25,26 @@ func (s *mySuite) Test_Of() { s.Assert().Equal(1, b[0], "should copy slice") } + +func (s *mySuite) Test_ToMap() { + type stuff struct { + Name string + } + + mySlice := []stuff{{"foo"}, {"bar"}, {"bar"}} + + myMap := ToMap( + mySlice, + func(el stuff) string { + return el.Name + }, + ) + + s.Assert().Equal( + map[string]stuff{ + "foo": {"foo"}, + "bar": {"bar"}, + }, + myMap, + ) +} From dd5b4adf0c6905f02bbeee1e483d3befcd0ccd27 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 14:16:08 -0500 Subject: [PATCH 20/21] simplify --- internal/util/sharding.go | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/internal/util/sharding.go b/internal/util/sharding.go index 3fbf55b0..66e0f566 100644 --- a/internal/util/sharding.go +++ 
b/internal/util/sharding.go @@ -7,7 +7,6 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) const ( @@ -27,7 +26,14 @@ func GetShardKey( Database(configDBName). Collection(collsCollName) - rawResult, err := configCollectionsColl.FindOne(ctx, bson.D{{"_id", namespace}}).Raw() + decoded := struct { + Key option.Option[bson.Raw] + }{} + + err := configCollectionsColl. + FindOne(ctx, bson.D{{"_id", namespace}}). + Decode(&decoded) + if errors.Is(err, mongo.ErrNoDocuments) { return option.None[bson.Raw](), nil } else if err != nil { @@ -38,29 +44,11 @@ func GetShardKey( ) } - keyAsVal, err := rawResult.LookupErr("key") - if errors.Is(err, bsoncore.ErrElementNotFound) { - return option.None[bson.Raw](), nil - } else if err != nil { - return option.None[bson.Raw](), errors.Wrapf( - err, - "failed to find %#q in %#q's %#q entry", - "key", - namespace, - FullName(configCollectionsColl), - ) - } + key, hasKey := decoded.Key.Get() - keyAsRaw, isDoc := keyAsVal.DocumentOK() - if !isDoc { - return option.None[bson.Raw](), errors.Errorf( - "%#q in %#q's %#q entry is of type %#q, not an object", - "key", - namespace, - FullName(configCollectionsColl), - keyAsVal.Type, - ) + if !hasKey { + return option.None[bson.Raw](), nil } - return option.Some(keyAsRaw), nil + return option.Some(key), nil } From 7dba357ce1d3e2b114a5adb49095f8e65fc4d2ef Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 3 Dec 2024 14:32:13 -0500 Subject: [PATCH 21/21] add comment about the metadata cluster --- internal/util/askserver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/util/askserver.go b/internal/util/askserver.go index 72e3cd40..ba03aebe 100644 --- a/internal/util/askserver.go +++ b/internal/util/askserver.go @@ -17,6 +17,9 @@ import ( // // tinker is an optional pipeline that operates on the documents in the // pipeline (`a` and `b`, respectively) before they're 
compared. +// +// In migration-verifier the client is generally expected to be for +// the metadata cluster. func ServerThinksTheseMatch( ctx context.Context, client *mongo.Client,