[Incremental Reprocessing] [MongoDB] stream while snapshotting#641
Conversation
🦋 Changeset detectedLatest commit: 32a5292 The changes in this PR will be included in the next version bump. This PR includes changesets to release 11 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
@Rentacookie Could you check the CDCStream changes here please? See PR description for background on that. |
fd14b43 to
dfdc4a7
Compare
Rentacookie
left a comment
There was a problem hiding this comment.
This looks good to me. I like that most of the snapshotting logic has been split out of the ChangeStream, it makes it much easier to reason about.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: dfff28e672
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
stevensJourney
left a comment
There was a problem hiding this comment.
This looks good to me also. I really like the addition of hooks to the storage.
This is a re-implementation of #450.
This refactors MongoDB replication to:
If a table needs a re-snapshot (e.g. due to replica identity changes), this also happens concurrently with streaming (although it still blocks the next commit until it completed). Truncating tables still block the replication stream.
The change
Currently, the replication process is effectively linear / "single threaded". When new sync config is deployed, we create a new replication stream, which performs a snapshot on each table, then starts streaming. This has a couple of limitations:
The changes here are also part of the bigger project to implement differential sync config updates - only re-replicating for changed bucket definitions / sync stream definitions. Part of that requires switching to a single replication stream for all copies of sync config, and this builds the base to implement that.
Implementation notes
There is a specific rare but important edge case when we start doing snapshots while streaming: If we don't use soft deletes, as introduced in storage version 3 in #425, then we can miss deletes that were made during the initial snapshot. Due to this, the concurrent snapshots are disabled on the older storage versions. This also adds a test for that specific case. Due to the very specific timing required to reproduce the issue, this adds some storage hooks that allows executing logic before and after a flush. We can investigate using these same or similar hooks to simplify some other tests later, that currently rely on timing only.
The main implementation changes are to move snapshotting to a separate MongoSnapshotter class, using a separate "queue" that can operate concurrently with streaming replication. This does not yet allow using separate processes for snapshot versus streaming yet, or using multiple concurrent snapshotters. Support for those can be added in the future, but requires more careful checks on potential race conditions / consistency issues.
There are additional edge cases around table snapshots: We can now get
markSnapshotDone()at the same time that another snapshot is queued. To handle this, themarkSnapshotDone()now explicitly checks that there are no pending individual table snapshots. The oldmarkAllSnapshotDone()is still kept around for tests.The above has a specific effect on MSSQL: There were cases where a capture instance is not present, where the SourceTable was created but the snapshot never performed, which then failed the above check during
markSnapshotDone(). This is now changed to not persisted the SourceTable at all in that case.AI Usage & Implementation notes
Used Codex gpt-5.5 to assist with the implementation, manually guiding and reviewing the changes. At lot of the changes were ported directly from #450, but with additional changes to keep the old behavior for older storage versions, and to fix more edge cases.