Skip to content

Conversation

@iambriccardo
Copy link
Contributor

@iambriccardo iambriccardo commented Dec 9, 2025

This PR adds a validation step which makes sure that the table states are sound when the pipeline starts. This way we avoid inconsistent states and fail fast in case we can't guarantee consistency in the pipeline.

@iambriccardo iambriccardo marked this pull request as ready for review December 9, 2025 09:10
@iambriccardo iambriccardo requested a review from a team as a code owner December 9, 2025 09:10
@coveralls
Copy link

coveralls commented Dec 9, 2025

Pull Request Test Coverage Report for Build 20058228542

Details

  • 14 of 28 (50.0%) changed or added relevant lines in 1 file are covered.
  • 98 unchanged lines in 8 files lost coverage.
  • Overall coverage decreased (-0.03%) to 81.939%

Changes Missing Coverage Covered Lines Changed/Added Lines %
etl/src/workers/apply.rs 14 28 50.0%
Files with Coverage Reduction New Missed Lines %
etl/src/pipeline.rs 2 81.56%
etl/src/types/event.rs 2 37.84%
etl/src/workers/table_sync.rs 2 77.41%
etl-postgres/src/tokio/test_utils.rs 6 85.96%
etl/src/test_utils/event.rs 8 80.77%
etl/src/test_utils/notify.rs 8 87.1%
etl/src/test_utils/test_destination_wrapper.rs 14 73.61%
etl/src/replication/client.rs 56 79.57%
Totals Coverage Status
Change from base Build 19957625863: -0.03%
Covered Lines: 16555
Relevant Lines: 20204

💛 - Coveralls

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 196 to 203
let slot_name: String = EtlReplicationSlot::for_apply_worker(pipeline_id).try_into()?;

// TODO: validate that we only create the slot when we first start replication which
// means when all tables are in the Init state. In any other case we should raise an
// error because that means the apply slot was deleted and creating a fresh slot now
// could cause inconsistent data to be read.
// Addendum: this might be hard to detect in all cases. E.g. what if the apply worker
// starts bunch of table sync workers and before creating a slot the process crashes?
// In this case, the apply worker slot is missing not because someone deleted it but
// because it was never created in the first place. The answer here might be to create
// the apply worker slot as the first thing, before starting table sync workers.
let slot = replication_client.get_or_create_slot(&slot_name).await?;

// When creating a new apply worker slot, all tables must be in the `Init` state. If any table
// is not in Init state, it means the table was synchronized based on another apply worker
// lineage (different slot) which will break correctness.
if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
let table_states = store.get_table_replication_states().await?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Validate table states before creating apply slot

The new validation runs after get_or_create_slot has already created the apply worker slot. When the slot is missing but any table is not in Init state, this path creates the slot and then bail!s, leaving the freshly created slot behind. On the next restart the code will take the GetSlot branch and skip the validation entirely, allowing replication to start with tables in non-Init states—the inconsistent lineage this check was meant to block—and leaving an orphan slot in Postgres. The check needs to happen before slot creation or the slot must be cleaned up on error.

Useful? React with 👍 / 👎.

// is not in Init state, it means the table was synchronized based on another apply worker
// lineage (different slot) which will break correctness.
if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
let table_states = store.get_table_replication_states().await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Severity: HIGH

TOCTOU Race Condition: Validation only runs for newly created slots (line 205 checks CreateSlot variant). If the process crashes after slot creation but before validation cleanup (line 216), the slot persists. On restart, get_or_create_slot returns GetSlot variant, completely bypassing this validation block. This allows inconsistent replication state. The comment on line 216 explicitly warns about this issue. Fix: Validate table states BEFORE calling get_or_create_slot(), or validate for BOTH CreateSlot and GetSlot cases.
Helpful? Add 👍 / 👎

💡 Fix Suggestion

Suggestion: To fix the TOCTOU race condition, validate table states BEFORE calling get_or_create_slot() on line 200. Move the table state validation logic (lines 206-235) to occur before line 200, and fail fast if any tables are not in Init state. This ensures validation happens regardless of whether the slot already exists or is newly created. Alternatively, remove the 'if let CreateSlot' conditional check on line 205 and validate for both CreateSlot AND GetSlot cases, but note that this would still require deleting the slot if validation fails for GetSlot, which may have other implications.

@iambriccardo
Copy link
Contributor Author

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +205 to +206
if let GetOrCreateSlotResult::CreateSlot(_) = &slot {
let table_states = store.get_table_replication_states().await?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Clean up slot on validation failure

When get_or_create_slot returns CreateSlot, the code immediately calls get_table_replication_states() and will return the error if that fetch fails, but it never deletes the freshly created apply slot in that failure path. On restart the slot will already exist, so the Init-state validation is skipped and replication can proceed on tables that were already past Init, undermining the new consistency check. Consider dropping the slot whenever the validation step cannot complete so that the check always reruns.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants