From e6429f9383b6bd573d2a1b93a5a133a3d08b816d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 00:18:40 -0500 Subject: [PATCH 1/3] Remove dedupe logic --- internal/verifier/recheck.go | 82 +------------------------------ internal/verifier/recheck_test.go | 2 +- 2 files changed, 3 insertions(+), 81 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 127eae89..b1fd3d39 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -105,16 +105,10 @@ func (verifier *Verifier) insertRecheckDocs( verifier.mux.RLock() defer verifier.mux.RUnlock() - dbNames, collNames, rawDocIDs, dataSizes := deduplicateRechecks( - dbNames, - collNames, - documentIDs, - dataSizes, - ) - generation, _ := verifier.getGenerationWhileLocked() eg, groupCtx := contextplus.ErrGroup(ctx) + eg.SetLimit(100) genCollection := verifier.getRecheckQueueCollection(generation) @@ -167,7 +161,7 @@ func (verifier *Verifier) insertRecheckDocs( PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: dbName, SrcCollectionName: collNames[i], - DocumentID: rawDocIDs[i], + DocumentID: documentIDs[i], }, DataSize: dataSizes[i], } @@ -211,78 +205,6 @@ func (verifier *Verifier) insertRecheckDocs( return nil } -func deduplicateRechecks( - dbNames, collNames []string, - documentIDs []bson.RawValue, - dataSizes []int, -) ([]string, []string, []bson.RawValue, []int) { - dedupeMap := map[string]map[string]map[string]int{} - - uniqueElems := 0 - - for i, dbName := range dbNames { - collName := collNames[i] - docIDRaw := documentIDs[i] - dataSize := dataSizes[i] - - docIDBuf := make([]byte, 1+len(docIDRaw.Value)) - docIDBuf[0] = byte(docIDRaw.Type) - copy(docIDBuf[1:], docIDRaw.Value) - docIDStr := string(docIDBuf) - - if _, ok := dedupeMap[dbName]; !ok { - dedupeMap[dbName] = map[string]map[string]int{ - collName: { - docIDStr: dataSize, - }, - } - - uniqueElems++ - - continue - } - - if _, ok := dedupeMap[dbName][collName]; !ok { - dedupeMap[dbName][collName] = map[string]int{ - docIDStr: dataSize, - } - - uniqueElems++ - - continue - } - - if _, ok := dedupeMap[dbName][collName][docIDStr]; !ok { - dedupeMap[dbName][collName][docIDStr] = dataSize - uniqueElems++ - } - } - - dbNames = make([]string, 0, uniqueElems) - collNames = make([]string, 0, uniqueElems) - rawDocIDs := make([]bson.RawValue, 0, uniqueElems) - dataSizes = make([]int, 0, uniqueElems) - - for dbName, collMap := range dedupeMap { - for collName, docMap := range collMap { - for docIDStr, dataSize := range docMap { - dbNames = append(dbNames, dbName) - collNames = append(collNames, collName) - rawDocIDs = append( - rawDocIDs, - bson.RawValue{ - Type: bson.Type(docIDStr[0]), - Value: []byte(docIDStr[1:]), - }, - ) - dataSizes = append(dataSizes, dataSize) - } - } - } - - return dbNames, collNames, rawDocIDs, dataSizes -} - // DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck // documents from the verifier’s metadata. // diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index f172bd71..5b8ccc7b 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -240,7 +240,7 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() { verifier.SetNumWorkers(10) ctx := suite.Context() - docsCount := 20_000_000 + docsCount := 5_000_000 suite.T().Logf("Inserting %d rechecks …", docsCount) From 652d37acdbe8a36747cb79e3dbd760b6d7b2852f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 4 Nov 2025 00:31:20 -0500 Subject: [PATCH 2/3] maybe not --- internal/verifier/recheck.go | 107 ++++++++++++++++++++++++++++++ internal/verifier/recheck_test.go | 2 +- 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index b1fd3d39..83574560 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -3,6 +3,7 @@ package verifier import ( "context" "fmt" + "time" "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/reportutils" @@ -105,6 +106,15 @@ func (verifier *Verifier) insertRecheckDocs( verifier.mux.RLock() defer verifier.mux.RUnlock() + start := time.Now() + dbNames, collNames, documentIDs, dataSizes = deduplicateRechecks( + dbNames, + collNames, + documentIDs, + dataSizes, + ) + fmt.Printf("----- deduplicate time: %s\n", time.Since(start)) + generation, _ := verifier.getGenerationWhileLocked() eg, groupCtx := contextplus.ErrGroup(ctx) @@ -205,6 +215,103 @@ func (verifier *Verifier) insertRecheckDocs( return nil } +func deduplicateRechecks( + dbNames, collNames []string, + documentIDs []bson.RawValue, + dataSizes []int, +) ([]string, []string, []bson.RawValue, []int) { + + /* + for i := len(dbNames) - 1; i >= 0; i-- { + for j := i - 1; j >= 0; j-- { + if dbNames[i] != dbNames[j] { + continue + } + + if collNames[i] != collNames[j] { + continue + } + + if !documentIDs[i].Equal(documentIDs[j]) { + continue + } + + dbNames = slices.Delete(dbNames, i, 1+i) + collNames = slices.Delete(collNames, i, 1+i) + documentIDs = slices.Delete(documentIDs, i, 1+i) + dataSizes = slices.Delete(dataSizes, i, 1+i) + break + } + } + */ + + dedupeMap := map[string]map[string]map[string]int{} + + uniqueElems := 0 + + for i, dbName := range dbNames { + collName := collNames[i] + docIDRaw := documentIDs[i] + dataSize := dataSizes[i] + + docIDBuf := make([]byte, 1+len(docIDRaw.Value)) + docIDBuf[0] = byte(docIDRaw.Type) + copy(docIDBuf[1:], docIDRaw.Value) + docIDStr := string(docIDBuf) + + if _, ok := dedupeMap[dbName]; !ok { + dedupeMap[dbName] = map[string]map[string]int{ + collName: { + docIDStr: dataSize, + }, + } + + uniqueElems++ + + continue + } + + if _, ok := dedupeMap[dbName][collName]; !ok { + dedupeMap[dbName][collName] = map[string]int{ + docIDStr: dataSize, + } + + uniqueElems++ + + continue + } + + if _, ok := dedupeMap[dbName][collName][docIDStr]; !ok { + dedupeMap[dbName][collName][docIDStr] = dataSize + uniqueElems++ + } + } + + dbNames = make([]string, 0, uniqueElems) + collNames = make([]string, 0, uniqueElems) + rawDocIDs := make([]bson.RawValue, 0, uniqueElems) + dataSizes = make([]int, 0, uniqueElems) + + for dbName, collMap := range dedupeMap { + for collName, docMap := range collMap { + for docIDStr, dataSize := range docMap { + dbNames = append(dbNames, dbName) + collNames = append(collNames, collName) + rawDocIDs = append( + rawDocIDs, + bson.RawValue{ + Type: bson.Type(docIDStr[0]), + Value: []byte(docIDStr[1:]), + }, + ) + dataSizes = append(dataSizes, dataSize) + } + } + } + + return dbNames, collNames, rawDocIDs, dataSizes +} + // DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck // documents from the verifier’s metadata. // diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 5b8ccc7b..6dbb8cd0 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -240,7 +240,7 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() { verifier.SetNumWorkers(10) ctx := suite.Context() - docsCount := 5_000_000 + docsCount := 12_000_000 suite.T().Logf("Inserting %d rechecks …", docsCount) From 1ac81da80c202a5ddbbcf5e9309823977919a76b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 5 Nov 2025 11:16:01 -0500 Subject: [PATCH 3/3] comment --- internal/verifier/recheck.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 83574560..3f84c17a 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -118,6 +118,9 @@ func (verifier *Verifier) insertRecheckDocs( generation, _ := verifier.getGenerationWhileLocked() eg, groupCtx := contextplus.ErrGroup(ctx) + + // MongoDB’s Go driver starts failing requests if we try to exceed + // its connection pool’s size. To avoid that, we limit our concurrency. eg.SetLimit(100) genCollection := verifier.getRecheckQueueCollection(generation)