Contributor

@imor commented Jul 9, 2025

This PR:

  • Fixes a bug in which table states were updated before we got an ack back from the destination. This could have led to data corruption if the process crashed after updating the table state but before sending the batch.
  • No longer batches the ReplicationMessage<LogicalReplicationMessage> messages into a vec that was later copied to another vec. This reduces the number of allocations. Allocations can be reduced further by sending a slice to the destination instead of a vec, but that's not yet handled.
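The ordering described in the first bullet can be sketched as follows. This is a minimal, synchronous illustration and not the code in this PR: `Destination`, `MemoryDestination`, `TableState`, and `flush_then_update` are hypothetical names, events are plain strings, and the destination takes a slice (the follow-up idea from the second bullet) purely to keep the sketch small.

```rust
use std::collections::HashMap;

/// Hypothetical table states, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableState {
    Syncing,
    Ready,
}

/// Hypothetical destination. `write_batch` returns `Ok(())` only once the
/// destination has acknowledged the whole batch.
pub trait Destination {
    fn write_batch(&mut self, batch: &[String]) -> Result<(), String>;
}

/// In-memory destination used to exercise the sketch (an assumption, not a real sink).
pub struct MemoryDestination {
    pub written: Vec<String>,
    pub fail: bool,
}

impl Destination for MemoryDestination {
    fn write_batch(&mut self, batch: &[String]) -> Result<(), String> {
        if self.fail {
            return Err("destination unavailable".to_string());
        }
        self.written.extend_from_slice(batch);
        Ok(())
    }
}

/// Sends the pending batch first and records table-state changes only after
/// the destination has acked it.
pub fn flush_then_update<D: Destination>(
    destination: &mut D,
    batch: &mut Vec<String>,
    pending: &mut Vec<(u32, TableState)>,
    table_states: &mut HashMap<u32, TableState>,
) -> Result<(), String> {
    // If this fails (or the process dies here), no table state has changed yet.
    destination.write_batch(batch.as_slice())?;
    batch.clear();

    // Reached only after the ack, so state never runs ahead of delivered data.
    for (table_id, state) in pending.drain(..) {
        table_states.insert(table_id, state);
    }
    Ok(())
}
```

With this ordering, a failure before the ack leaves both the batch and the recorded table states untouched, so a restart sees a state consistent with what the destination has actually received.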

I also wanted to create a stream that encapsulates the whole message filtering logic and produces LogicalReplicationMessages, and then batch the output of this stream, but this became complicated because the stream needs to call async methods. This will be done in a separate PR later on.

@imor force-pushed the raminder/etl-148-process_syncing_tables-in-apply-loop-should-be-called-on branch from 2f20960 to 3c12160 July 10, 2025 11:59
@imor changed the title from "Raminder/etl 148 process syncing tables in apply loop should be called on" to "Improve message sending behaviour" Jul 11, 2025
@imor changed the title from "Improve message sending behaviour" to "fix: potential data corruption bug and improve message sending behaviour" Jul 11, 2025
@imor marked this pull request as ready for review July 11, 2025 11:16
@imor requested a review from a team as a code owner July 11, 2025 11:16
Contributor

@iambriccardo left a comment

Left some comments

Comment on lines +445 to +447
end_loop |= !hook
    .process_syncing_tables(state.next_status_update.flush_lsn, true)
    .await?;
Contributor

With the process syncing here, the system becomes much harder to reason about, but I am failing to see any alternative that would work well with batching.

Contributor Author

Yes, it has become harder to see the logic, but as you said I can't think of a real alternative either.

Comment on lines +419 to +420
if let Some(table_id) = skip_table {
    end_loop |= !hook.skip_table(table_id).await?;
Contributor

Shouldn't this be outside of the sending of the batch? As far as I know we need to skip the table as soon as we get the event.

Contributor Author

This will be called as soon as the relation event indicates a schema change. In that case end_batch is set to EndBatch::Exclusive, so the end_batch.is_some() part of the condition if time_to_send_batch || state.events_batch.len() >= max_batch_size || end_batch.is_some() is true.

Again, it is important to call this only after the batch is acked: if we set the table state to skipped before that, we risk missing events.
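To make the condition above easier to follow, here is a small standalone sketch of it. Only the three-part condition and `EndBatch::Exclusive` come from this thread. The `Inclusive` variant, the `Step` enum, and the function names are assumptions made up for illustration, and the real loop is async and does more than this.

```rust
/// How a batch boundary relates to the event that triggered it.
/// Only `Exclusive` is named in this thread; `Inclusive` is assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndBatch {
    Inclusive,
    Exclusive,
}

/// The flush condition quoted above, as a pure function.
pub fn should_send_batch(
    time_to_send_batch: bool,
    batch_len: usize,
    max_batch_size: usize,
    end_batch: Option<EndBatch>,
) -> bool {
    time_to_send_batch || batch_len >= max_batch_size || end_batch.is_some()
}

/// Hypothetical description of what one loop iteration would do, in order.
#[derive(Debug, PartialEq, Eq)]
pub enum Step {
    SendBatchAndAwaitAck,
    SkipTable(u32),
}

/// Plans one iteration: the table is skipped only inside the flush branch,
/// and only after the batch has been sent and acked.
pub fn plan_steps(
    time_to_send_batch: bool,
    batch_len: usize,
    max_batch_size: usize,
    end_batch: Option<EndBatch>,
    skip_table: Option<u32>,
) -> Vec<Step> {
    let mut steps = Vec::new();
    if should_send_batch(time_to_send_batch, batch_len, max_batch_size, end_batch) {
        steps.push(Step::SendBatchAndAwaitAck);
        if let Some(table_id) = skip_table {
            steps.push(Step::SkipTable(table_id));
        }
    }
    steps
}
```

A schema-change event sets `end_batch` to `Some(EndBatch::Exclusive)`, so the batch is flushed immediately even when it is small and the timer has not fired, and the skip follows the ack within the same iteration.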

Contributor

Oh yeah, nvm, I missed the last condition.

@imor requested a review from iambriccardo July 11, 2025 13:37
@imor merged commit cd8b282 into main Jul 11, 2025
3 checks passed
@imor deleted the raminder/etl-148-process_syncing_tables-in-apply-loop-should-be-called-on branch July 11, 2025 14:54