diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 1ddc8789..2a085d77 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::pin; use tokio_postgres::types::PgLsn; +use tracing::log::warn; use tracing::{debug, info}; use crate::concurrency::shutdown::ShutdownRx; @@ -571,7 +572,7 @@ where // If we are not inside a transaction, we can cleanly stop streaming and return. if !state.handling_transaction() { - info!("shutting down apply worker while waiting for incoming events outside of a transaction"); + info!("shutting down apply loop while waiting for incoming events outside of a transaction"); return Ok(ApplyLoopResult::Paused); } @@ -583,7 +584,28 @@ where // PRIORITY 2: Process incoming replication messages from Postgres. // This is the primary data flow, converts replication protocol messages // into typed events and accumulates them into batches for efficient processing. - Some(message) = logical_replication_stream.next() => { + message = logical_replication_stream.next() => { + let Some(message) = message else { + // The stream returned None, which should never happen for logical replication + // since it runs indefinitely. This indicates either the connection was closed + // or something unexpected occurred. + if replication_client.is_closed() { + warn!("replication stream ended because the postgres connection was closed, the apply loop will terminate"); + + bail!( + ErrorKind::SourceConnectionFailed, + "PostgreSQL connection has been closed during the apply loop" + ) + } else { + warn!("replication stream ended unexpectedly without the connection being closed, the apply loop will terminate"); + + bail!( + ErrorKind::SourceConnectionFailed, + "Replication stream ended unexpectedly during the apply loop" + ) + } + }; + let action = handle_replication_message_with_timeout( &mut state, logical_replication_stream.as_mut(), @@ -661,6 +683,7 @@ where false ) .await?; + state.mark_status_update_sent(); } } diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 7c8f7688..b291f6bb 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -25,25 +25,24 @@ use tokio_postgres::{ use tracing::{Instrument, error, info, warn}; /// Spawns a background task to monitor a Postgres connection until it terminates. -/// -/// The task will log when the connection terminates, either successfully or with an error. fn spawn_postgres_connection(connection: Connection) where T: MakeTlsConnect, T::Stream: Send + 'static, { - // TODO: maybe return a handle for this task to keep track of it. let span = tracing::Span::current(); let task = async move { - if let Err(e) = connection.await { - error!("an error occurred during the Postgres connection: {}", e); - return; - } + let result = connection.await; - info!("postgres connection terminated successfully") + match result { + Err(err) => error!("an error occurred during the postgres connection: {}", err), + Ok(()) => info!("postgres connection terminated successfully"), + } } .instrument(span); + // There is no need to track the connection task via the `JoinHandle` since the `Client`, which + // returned the connection, will automatically terminate the connection when dropped. tokio::spawn(task); } @@ -252,6 +251,11 @@ impl PgReplicationClient { }) } + /// Checks if the underlying connection is closed. + pub fn is_closed(&self) -> bool { + self.client.is_closed() + } + /// Creates a new logical replication slot with the specified name and a transaction which is set /// on the snapshot exported by the slot creation. pub async fn create_slot_with_transaction(