@iambriccardo iambriccardo commented Nov 25, 2025

This PR fixes a connection-handling issue: we did not handle the None case from the replication stream. As a result, the pipeline stalled whenever the connection was not closed gracefully, because the library returns None, rather than an error, when the connection is closed:

impl Stream for Responses {
    type Item = Result<Message, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match ready!((*self).poll_next(cx)) {
            Err(err) if err.is_closed() => Poll::Ready(None),
            msg => Poll::Ready(Some(msg)),
        }
    }
}

With this change, the system handles both cases: when Postgres sends a message over the connection (e.g., an error) and when the connection is actually closed.

Both scenarios were reproduced locally:

  • pg_terminate_backend: terminates the stream via an error (was already handled).
  • docker kill: abrupt termination that simply closes the connection (was not handled before and now is).
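
The two termination paths above can be sketched with a synchronous stand-in. This is a minimal model and not the code from this PR: a std `Iterator` replaces the async stream, and `StreamError`, `LoopExit`, and `drain` are hypothetical names introduced only for illustration.

```rust
/// Hypothetical error type: Postgres reported a failure over a live connection.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamError {
    Server(String),
}

/// Hypothetical reasons for the consumer loop to stop.
#[derive(Debug, PartialEq)]
pub enum LoopExit {
    /// The stream yielded an error item (the pg_terminate_backend path).
    ServerError(String),
    /// The stream ended with `None` (the docker kill path).
    ConnectionClosed,
}

/// Drains the stream and reports how many messages were processed and why
/// the loop stopped. Both the error item and the end of the stream are
/// handled explicitly, so neither can leave the loop waiting forever.
pub fn drain<I>(mut stream: I) -> (usize, LoopExit)
where
    I: Iterator<Item = Result<String, StreamError>>,
{
    let mut processed = 0;
    loop {
        match stream.next() {
            // A regular replication message.
            Some(Ok(_message)) => processed += 1,
            // The connection was alive long enough to deliver an error.
            Some(Err(StreamError::Server(reason))) => {
                return (processed, LoopExit::ServerError(reason));
            }
            // The connection was closed, so the stream simply ended.
            None => return (processed, LoopExit::ConnectionClosed),
        }
    }
}
```

Feeding `drain` a sequence that just ends models the abrupt close, while a sequence whose last item is an `Err` models the server-side termination. In both cases the caller gets an explicit exit reason to act on.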

coveralls commented Nov 25, 2025

Pull Request Test Coverage Report for Build 19697831375

Details

  • 6 of 17 (35.29%) changed or added relevant lines in 2 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-0.04%) to 82.356%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
| --- | --- | --- | --- |
| etl/src/replication/client.rs | 3 | 7 | 42.86% |
| etl/src/replication/apply.rs | 3 | 10 | 30.0% |

| Totals | Coverage Status |
| --- | --- |
| Change from base Build 19671716937 | -0.04% |
| Covered Lines | 16421 |
| Relevant Lines | 19939 |

💛 - Coveralls

Pull Request Test Coverage Report for Build 19676588258

Details

  • 16 of 22 (72.73%) changed or added relevant lines in 2 files are covered.
  • 4 unchanged lines in 1 file lost coverage.
  • Overall coverage decreased (-0.003%) to 82.395%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
| --- | --- | --- | --- |
| etl/src/replication/client.rs | 15 | 21 | 71.43% |

| Files with Coverage Reduction | New Missed Lines | % |
| --- | --- | --- |
| etl/src/concurrency/stream.rs | 4 | 82.22% |

| Totals | Coverage Status |
| --- | --- |
| Change from base Build 19671716937 | -0.003% |
| Covered Lines | 16432 |
| Relevant Lines | 19943 |

// PRIORITY 2: Process incoming replication messages from Postgres.
// This is the primary data flow, converts replication protocol messages
// into typed events and accumulates them into batches for efficient processing.
Some(message) = logical_replication_stream.next() => {
This was a bug in our first implementation. After digging into the code, it turns out that when the channel is closed, the stream returns None instead of an error. Because this arm only matches Some(message), the None case was never handled, which caused the pipeline to stall in some cases.
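
The difference between the old and the new arm can be modeled without an async runtime. The sketch below is an assumption-laden illustration and not the pipeline's code: the commented line looks like a `tokio::select!` arm, where a result that fails the `Some(..)` pattern disables that arm while the other arms keep the loop running. Here a bounded `for` loop stands in for those other arms, and `Outcome`, `run_ignoring_none`, and `run_handling_none` are hypothetical names.

```rust
/// Hypothetical result of running the modeled loop.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The loop noticed the end of the stream and exited on its own.
    StreamEnded { processed: usize },
    /// The loop only stopped because the iteration budget ran out.
    BudgetExhausted { processed: usize, idle_iterations: usize },
}

/// Models the old arm: only `Some(message)` is matched, so the end of the
/// stream goes unnoticed and the loop keeps spinning on its other work.
pub fn run_ignoring_none<I>(mut stream: I, budget: usize) -> Outcome
where
    I: Iterator<Item = &'static str>,
{
    let mut processed = 0;
    let mut idle_iterations = 0;
    for _ in 0..budget {
        if let Some(_message) = stream.next() {
            processed += 1;
        } else {
            // Stand-in for the other arms (timers, shutdown checks) firing
            // while no replication data will ever arrive again.
            idle_iterations += 1;
        }
    }
    Outcome::BudgetExhausted { processed, idle_iterations }
}

/// Models the fixed arm: `None` is matched explicitly and ends the loop.
pub fn run_handling_none<I>(mut stream: I, budget: usize) -> Outcome
where
    I: Iterator<Item = &'static str>,
{
    let mut processed = 0;
    for _ in 0..budget {
        match stream.next() {
            Some(_message) => processed += 1,
            None => return Outcome::StreamEnded { processed },
        }
    }
    Outcome::BudgetExhausted { processed, idle_iterations: 0 }
}
```

With two messages and a budget of ten iterations, the first function spends the remaining eight iterations idle, which mirrors the stall, while the second exits as soon as the stream ends.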

@iambriccardo iambriccardo marked this pull request as ready for review November 26, 2025 08:47
@iambriccardo iambriccardo requested a review from a team as a code owner November 26, 2025 08:47
@iambriccardo iambriccardo merged commit de2b738 into main Nov 26, 2025
11 checks passed
@iambriccardo iambriccardo deleted the fix-connection branch November 26, 2025 08:58