From 7bb30ca314e1a1ce93c912bb6dd0cb2b14033193 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 15 Nov 2024 12:35:45 -0500 Subject: [PATCH 1/3] Cap recheck size. --- internal/reportutils/reportutils.go | 13 +++++++--- internal/verifier/recheck.go | 38 +++++++++++++++++++++++------ 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/internal/reportutils/reportutils.go b/internal/reportutils/reportutils.go index acb603b9..cb7b348d 100644 --- a/internal/reportutils/reportutils.go +++ b/internal/reportutils/reportutils.go @@ -15,10 +15,11 @@ import ( const decimalPrecision = 2 -// This could include signed ints, but we have no need for now. -// The bigger requirement is that it exclude uint8. +// num16Plus is like realNum, but it excludes 8-bit int/uint. type num16Plus interface { - constraints.Float | ~uint | ~uint16 | ~uint32 | ~uint64 + constraints.Float | + ~uint | ~uint16 | ~uint32 | ~uint64 | + ~int | ~int16 | ~int32 | ~int64 } type realNum interface { @@ -178,3 +179,9 @@ func FindBestUnit[T num16Plus](count T) DataUnit { return biggestUnit } + +// FmtBytes is a convenience that combines BytesToUnit with FindBestUnit. +// Use it to format a single count of bytes. +func FmtBytes[T num16Plus](count T) string { + return BytesToUnit(count, FindBestUnit(count)) +} diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 5626e714..df13d8a7 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/pkg/errors" @@ -194,19 +195,40 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int { func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { prevGeneration := verifier.getPreviousGenerationWhileLocked() - verifier.logger.Debug().Msgf("Creating recheck tasks from generation %d’s %s documents", prevGeneration, recheckQueue) + recheckColl := verifier.verificationDatabase().Collection(recheckQueue) + estimatedRechecks, err := recheckColl.EstimatedDocumentCount(ctx) + if err != nil { + return errors.Wrapf(err, + "failed to retrieve %#q’s estimated document count", + verifier.verificationDatabase().Name()+"."+recheckQueue, + ) + } + + verifier.logger.Debug(). + Int("enqueuedInGeneration", prevGeneration). + Int("estimatedRechecks", int(estimatedRechecks)). + Msgf("Creating recheck tasks from enqueued rechecks.") // We generate one recheck task per collection, unless // 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding // the 16MB BSON limit) // 2) The size of the data would exceed our desired partition size. This limits memory use // during the recheck phase. + // 3) The number of documents exceeds $rechecksCount/$numWorkers. We do + // this to prevent one thread from doing all of the rechecks. + prevDBName, prevCollName := "", "" var idAccum []interface{} var idLenAccum int var dataSizeAccum int64 const maxIdsSize = 12 * 1024 * 1024 - cursor, err := verifier.verificationDatabase().Collection(recheckQueue).Find( + maxDocsPerTask := estimatedRechecks / int64(verifier.numWorkers) + + if maxDocsPerTask < int64(verifier.numWorkers) { + maxDocsPerTask = int64(verifier.numWorkers) + } + + cursor, err := recheckColl.Find( ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}})) if err != nil { return err @@ -227,10 +249,9 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { idRaw := cursor.Current.Lookup("_id", "docID") idLen := len(idRaw.Value) - verifier.logger.Debug().Msgf("Found persisted recheck doc for %s.%s", doc.PrimaryKey.DatabaseName, doc.PrimaryKey.CollectionName) - if doc.PrimaryKey.DatabaseName != prevDBName || doc.PrimaryKey.CollectionName != prevCollName || + int64(len(idAccum)) > maxDocsPerTask || idLenAccum >= maxIdsSize || dataSizeAccum >= verifier.partitionSizeInBytes { namespace := prevDBName + "." + prevCollName @@ -239,10 +260,11 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { if err != nil { return err } - verifier.logger.Debug().Msgf( - "Created ID verification task for namespace %s with %d ids, "+ - "%d id bytes and %d data bytes", - namespace, len(idAccum), idLenAccum, dataSizeAccum) + verifier.logger.Debug(). + Str("namespace", namespace). + Int("numDocuments", len(idAccum)). + Str("dataSize", reportutils.FmtBytes(dataSizeAccum)). + Msg("Created document recheck task.") } prevDBName = doc.PrimaryKey.DatabaseName prevCollName = doc.PrimaryKey.CollectionName From 1fcb8a811995e92ef41d98623e42a73e779a1d09 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 15 Nov 2024 15:22:27 -0500 Subject: [PATCH 2/3] rework --- internal/verifier/recheck.go | 61 ++++++++++++++++---------- internal/verifier/verification_task.go | 2 +- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index df13d8a7..1e46bf72 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -196,18 +196,21 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { prevGeneration := verifier.getPreviousGenerationWhileLocked() recheckColl := verifier.verificationDatabase().Collection(recheckQueue) - estimatedRechecks, err := recheckColl.EstimatedDocumentCount(ctx) + rechecksCount, err := recheckColl.CountDocuments( + ctx, + bson.D{{"_id.generation", prevGeneration}}, + ) if err != nil { return errors.Wrapf(err, - "failed to retrieve %#q’s estimated document count", - verifier.verificationDatabase().Name()+"."+recheckQueue, + "failed to count generation %d’s rechecks", + prevGeneration, ) } verifier.logger.Debug(). - Int("enqueuedInGeneration", prevGeneration). - Int("estimatedRechecks", int(estimatedRechecks)). - Msgf("Creating recheck tasks from enqueued rechecks.") + Int("priorGeneration", prevGeneration). + Int("rechecksCount", int(rechecksCount)). + Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.") // We generate one recheck task per collection, unless // 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding @@ -222,7 +225,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { var idLenAccum int var dataSizeAccum int64 const maxIdsSize = 12 * 1024 * 1024 - maxDocsPerTask := estimatedRechecks / int64(verifier.numWorkers) + maxDocsPerTask := rechecksCount / int64(verifier.numWorkers) if maxDocsPerTask < int64(verifier.numWorkers) { maxDocsPerTask = int64(verifier.numWorkers) @@ -234,13 +237,27 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { return err } defer cursor.Close(ctx) - // We group these here using a sort rather than using aggregate because aggregate is - // subject to a 16MB limit on group size. - for cursor.Next(ctx) { - err := cursor.Err() + + createRecheckTask := func() error { + namespace := prevDBName + "." + prevCollName + + err := verifier.InsertDocumentRecheckTask(idAccum, types.ByteCount(dataSizeAccum), namespace) if err != nil { return err } + + verifier.logger.Debug(). + Str("namespace", namespace). + Int("numDocuments", len(idAccum)). + Str("dataSize", reportutils.FmtBytes(dataSizeAccum)). + Msg("Created document recheck task.") + + return nil + } + + // We group these here using a sort rather than using aggregate because aggregate is + // subject to a 16MB limit on group size. + for cursor.Next(ctx) { var doc RecheckDoc err = cursor.Decode(&doc) if err != nil { @@ -254,17 +271,12 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { int64(len(idAccum)) > maxDocsPerTask || idLenAccum >= maxIdsSize || dataSizeAccum >= verifier.partitionSizeInBytes { - namespace := prevDBName + "." + prevCollName + if len(idAccum) > 0 { - err := verifier.InsertFailedIdsVerificationTask(idAccum, types.ByteCount(dataSizeAccum), namespace) + err := createRecheckTask() if err != nil { return err } - verifier.logger.Debug(). - Str("namespace", namespace). - Int("numDocuments", len(idAccum)). - Str("dataSize", reportutils.FmtBytes(dataSizeAccum)). - Msg("Created document recheck task.") } prevDBName = doc.PrimaryKey.DatabaseName prevCollName = doc.PrimaryKey.CollectionName @@ -276,16 +288,17 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { dataSizeAccum += int64(doc.DataSize) idAccum = append(idAccum, doc.PrimaryKey.DocumentID) } + + err = cursor.Err() + if err != nil { + return err + } + if len(idAccum) > 0 { - namespace := prevDBName + "." + prevCollName - err := verifier.InsertFailedIdsVerificationTask(idAccum, types.ByteCount(dataSizeAccum), namespace) + err := createRecheckTask() if err != nil { return err } - verifier.logger.Debug().Msgf( - "Created ID verification task for namespace %s with %d ids, "+ - "%d id bytes and %d data bytes", - namespace, len(idAccum), idLenAccum, dataSizeAccum) } return nil } diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index b02dbe8a..dc1f4af9 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -153,7 +153,7 @@ func (verifier *Verifier) InsertPartitionVerificationTask(partition *partitions. return &verificationTask, err } -func (verifier *Verifier) InsertFailedIdsVerificationTask(ids []interface{}, dataSize types.ByteCount, srcNamespace string) error { +func (verifier *Verifier) InsertDocumentRecheckTask(ids []interface{}, dataSize types.ByteCount, srcNamespace string) error { dstNamespace := srcNamespace if len(verifier.nsMap) != 0 { var ok bool From 40f7e0747e25bcfd8968c9cc4f43bf5a8aff35b1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 15 Nov 2024 15:39:38 -0500 Subject: [PATCH 3/3] more comments & tweaks --- internal/verifier/recheck.go | 76 ++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 1e46bf72..0f7ad651 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -19,11 +19,16 @@ const ( recheckQueue = "recheckQueue" maxBSONObjSize = 16 * 1024 * 1024 recheckInserterThreadsSoftMax = 100 + maxIdsPerRecheckTask = 12 * 1024 * 1024 ) // RecheckPrimaryKey stores the implicit type of recheck to perform // Currently, we only handle document mismatches/change stream updates, // so DatabaseName, CollectionName, and DocumentID must always be specified. +// +// NB: Order is important here so that, within a given generation, +// sorting by _id will guarantee that all rechecks for a given +// namespace appear consecutively. type RecheckPrimaryKey struct { Generation int `bson:"generation"` DatabaseName string `bson:"db"` @@ -195,11 +200,14 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int { func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { prevGeneration := verifier.getPreviousGenerationWhileLocked() + findFilter := bson.D{{"_id.generation", prevGeneration}} + + verifier.logger.Debug(). + Int("priorGeneration", prevGeneration). + Msgf("Counting prior generation’s enqueued rechecks.") + recheckColl := verifier.verificationDatabase().Collection(recheckQueue) - rechecksCount, err := recheckColl.CountDocuments( - ctx, - bson.D{{"_id.generation", prevGeneration}}, - ) + rechecksCount, err := recheckColl.CountDocuments(ctx, findFilter) if err != nil { return errors.Wrapf(err, "failed to count generation %d’s rechecks", @@ -220,30 +228,49 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { // 3) The number of documents exceeds $rechecksCount/$numWorkers. We do // this to prevent one thread from doing all of the rechecks. - prevDBName, prevCollName := "", "" + var prevDBName, prevCollName string var idAccum []interface{} var idLenAccum int var dataSizeAccum int64 - const maxIdsSize = 12 * 1024 * 1024 + maxDocsPerTask := rechecksCount / int64(verifier.numWorkers) if maxDocsPerTask < int64(verifier.numWorkers) { maxDocsPerTask = int64(verifier.numWorkers) } + // The sort here is important because the recheck _id is an embedded + // document that includes the namespace. Thus, all rechecks for a given + // namespace will be consecutive in this query’s result. cursor, err := recheckColl.Find( - ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}})) + ctx, + findFilter, + options.Find().SetSort(bson.D{{"_id", 1}}), + ) if err != nil { return err } defer cursor.Close(ctx) - createRecheckTask := func() error { + persistBufferedRechecks := func() error { + if len(idAccum) == 0 { + return nil + } + namespace := prevDBName + "." + prevCollName - err := verifier.InsertDocumentRecheckTask(idAccum, types.ByteCount(dataSizeAccum), namespace) + err := verifier.InsertDocumentRecheckTask( + idAccum, + types.ByteCount(dataSizeAccum), + namespace, + ) if err != nil { - return err + return errors.Wrapf( + err, + "failed to create a %d-document recheck task for collection %#q", + len(idAccum), + namespace, + ) } verifier.logger.Debug(). @@ -263,27 +290,34 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { if err != nil { return err } + idRaw := cursor.Current.Lookup("_id", "docID") idLen := len(idRaw.Value) + // We persist rechecks if any of these happen: + // - the namespace has changed + // - we’ve reached the per-task recheck maximum + // - the buffered document IDs’ size exceeds the per-task maximum + // - the buffered documents exceed the partition size + // if doc.PrimaryKey.DatabaseName != prevDBName || doc.PrimaryKey.CollectionName != prevCollName || int64(len(idAccum)) > maxDocsPerTask || - idLenAccum >= maxIdsSize || + idLenAccum >= maxIdsPerRecheckTask || dataSizeAccum >= verifier.partitionSizeInBytes { - if len(idAccum) > 0 { - err := createRecheckTask() - if err != nil { - return err - } + err := persistBufferedRechecks() + if err != nil { + return err } + prevDBName = doc.PrimaryKey.DatabaseName prevCollName = doc.PrimaryKey.CollectionName idLenAccum = 0 dataSizeAccum = 0 - idAccum = []interface{}{} + idAccum = idAccum[:0] } + idLenAccum += idLen dataSizeAccum += int64(doc.DataSize) idAccum = append(idAccum, doc.PrimaryKey.DocumentID) @@ -294,11 +328,5 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { return err } - if len(idAccum) > 0 { - err := createRecheckTask() - if err != nil { - return err - } - } - return nil + return persistBufferedRechecks() }