diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index ebb46a24..e048a6fe 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -113,17 +113,23 @@ func (verifier *Verifier) insertRecheckDocs( verifier.mux.RLock() defer verifier.mux.RUnlock() - dbNames, collNames, rawDocIDs, dataSizes := deduplicateRechecks( + start := time.Now() + dbNames, collNames, documentIDs, dataSizes = deduplicateRechecks( dbNames, collNames, documentIDs, dataSizes, ) + fmt.Printf("----- deduplicate time: %s\n", time.Since(start)) generation, _ := verifier.getGenerationWhileLocked() eg, groupCtx := contextplus.ErrGroup(ctx) + // MongoDB’s Go driver starts failing requests if we try to exceed + // its connection pool’s size. To avoid that, we limit our concurrency. + eg.SetLimit(100) + genCollection := verifier.getRecheckQueueCollection(generation) sendRechecks := func(rechecks []bson.Raw) { @@ -175,7 +181,7 @@ func (verifier *Verifier) insertRecheckDocs( PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: dbName, SrcCollectionName: collNames[i], - DocumentID: rawDocIDs[i], + DocumentID: documentIDs[i], Rand: rand.Int32(), }, DataSize: dataSizes[i], @@ -225,6 +231,31 @@ func deduplicateRechecks( documentIDs []bson.RawValue, dataSizes []int, ) ([]string, []string, []bson.RawValue, []int) { + + /* + for i := len(dbNames) - 1; i >= 0; i-- { + for j := i - 1; j >= 0; j-- { + if dbNames[i] != dbNames[j] { + continue + } + + if collNames[i] != collNames[j] { + continue + } + + if !documentIDs[i].Equal(documentIDs[j]) { + continue + } + + dbNames = slices.Delete(dbNames, i, 1+i) + collNames = slices.Delete(collNames, i, 1+i) + documentIDs = slices.Delete(documentIDs, i, 1+i) + dataSizes = slices.Delete(dataSizes, i, 1+i) + break + } + } + */ + dedupeMap := map[string]map[string]map[string]int{} uniqueElems := 0 diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index e6b5ff81..eabc8c1d 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -253,7 +253,7 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() { verifier.SetNumWorkers(10) ctx := suite.Context() - docsCount := 20_000_000 + docsCount := 12_000_000 suite.T().Logf("Inserting %d rechecks …", docsCount)