Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,23 @@ func (verifier *Verifier) insertRecheckDocs(
verifier.mux.RLock()
defer verifier.mux.RUnlock()

dbNames, collNames, rawDocIDs, dataSizes := deduplicateRechecks(
start := time.Now()
dbNames, collNames, documentIDs, dataSizes = deduplicateRechecks(
dbNames,
collNames,
documentIDs,
dataSizes,
)
fmt.Printf("----- deduplicate time: %s\n", time.Since(start))

generation, _ := verifier.getGenerationWhileLocked()

eg, groupCtx := contextplus.ErrGroup(ctx)

// MongoDB’s Go driver starts failing requests if we try to exceed
// its connection pool’s size: without a limit here, inserts fail with
// errors about being unable to check out a connection within ~30s.
// To avoid “upsetting” the driver, we throttle our concurrency.
eg.SetLimit(100)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we setting the limit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, the driver fails requests with errors about not being able to get a thread within 30s or so. This way we throttle the requests to avoid “upsetting” the driver.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Would appreciate this information to be added to code comment.


genCollection := verifier.getRecheckQueueCollection(generation)

sendRechecks := func(rechecks []bson.Raw) {
Expand Down Expand Up @@ -175,7 +181,7 @@ func (verifier *Verifier) insertRecheckDocs(
PrimaryKey: RecheckPrimaryKey{
SrcDatabaseName: dbName,
SrcCollectionName: collNames[i],
DocumentID: rawDocIDs[i],
DocumentID: documentIDs[i],
Rand: rand.Int32(),
},
DataSize: dataSizes[i],
Expand Down Expand Up @@ -225,6 +231,31 @@ func deduplicateRechecks(
documentIDs []bson.RawValue,
dataSizes []int,
) ([]string, []string, []bson.RawValue, []int) {

/*
for i := len(dbNames) - 1; i >= 0; i-- {
for j := i - 1; j >= 0; j-- {
if dbNames[i] != dbNames[j] {
continue
}

if collNames[i] != collNames[j] {
continue
}

if !documentIDs[i].Equal(documentIDs[j]) {
continue
}

dbNames = slices.Delete(dbNames, i, 1+i)
collNames = slices.Delete(collNames, i, 1+i)
documentIDs = slices.Delete(documentIDs, i, 1+i)
dataSizes = slices.Delete(dataSizes, i, 1+i)
break
}
}
*/

dedupeMap := map[string]map[string]map[string]int{}

uniqueElems := 0
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() {
verifier.SetNumWorkers(10)
ctx := suite.Context()

docsCount := 20_000_000
docsCount := 12_000_000

suite.T().Logf("Inserting %d rechecks …", docsCount)

Expand Down