From fc57eb07855429b3934fcc0ce68ff0361bfdb780 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 25 Nov 2025 17:24:15 +0100 Subject: [PATCH 1/6] feat(connection): Improve connection handling --- etl/src/replication/apply.rs | 5 ++++ etl/src/replication/client.rs | 48 ++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 1ddc87898..1f21df78f 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -42,6 +42,7 @@ use crate::{bail, etl_error}; /// If set too high, Postgres may timeout before the next status update is sent. const STATUS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); + /// Result type for the apply loop execution. /// /// [`ApplyLoopResult`] indicates the reason why the apply loop terminated, @@ -652,6 +653,9 @@ where // 2. Allows Postgres to clean up old WAL files based on our progress // 3. Provides a heartbeat mechanism to detect connection issues _ = tokio::time::sleep_until(state.next_status_update_deadline()) => { + // Check if the connection background task has terminated. + replication_client.check_connection_alive()?; + logical_replication_stream .as_mut() .get_inner() @@ -661,6 +665,7 @@ where false ) .await?; + state.mark_status_update_sent(); } } diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 7c8f76887..ce46ae1fc 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::num::NonZeroI32; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio_postgres::error::SqlState; @@ -27,20 +28,26 @@ use tracing::{Instrument, error, info, warn}; /// Spawns a background task to monitor a Postgres connection until it terminates. /// /// The task will log when the connection terminates, either successfully or with an error. -fn spawn_postgres_connection(connection: Connection) +/// When the connection terminates for any reason, the `connection_alive` flag is set to `false`. +fn spawn_postgres_connection( + connection: Connection, + connection_alive: Arc, +) where T: MakeTlsConnect, T::Stream: Send + 'static, { - // TODO: maybe return a handle for this task to keep track of it. let span = tracing::Span::current(); let task = async move { - if let Err(e) = connection.await { - error!("an error occurred during the Postgres connection: {}", e); - return; - } + let result = connection.await; + + // Mark the connection as dead regardless of success or error. + connection_alive.store(false, Ordering::SeqCst); - info!("postgres connection terminated successfully") + match result { + Err(err) => error!("an error occurred during the postgres connection: {}", err), + Ok(()) => info!("postgres connection terminated successfully"), + } } .instrument(span); @@ -178,6 +185,9 @@ struct PublicationFilter { pub struct PgReplicationClient { client: Arc, server_version: Option, + /// Flag indicating whether the underlying connection is still alive. + /// Set to `false` by the connection background task when the connection terminates. + connection_alive: Arc, } impl PgReplicationClient { @@ -205,13 +215,15 @@ impl PgReplicationClient { .parameter("server_version") .and_then(extract_server_version); - spawn_postgres_connection::(connection); + let connection_alive = Arc::new(AtomicBool::new(true)); + spawn_postgres_connection::(connection, connection_alive.clone()); info!("successfully connected to postgres without tls"); Ok(PgReplicationClient { client: Arc::new(client), server_version, + connection_alive, }) } @@ -242,16 +254,34 @@ impl PgReplicationClient { .parameter("server_version") .and_then(extract_server_version); - spawn_postgres_connection::(connection); + let connection_alive = Arc::new(AtomicBool::new(true)); + spawn_postgres_connection::(connection, connection_alive.clone()); info!("successfully connected to postgres with tls"); Ok(PgReplicationClient { client: Arc::new(client), server_version, + connection_alive, }) } + /// Checks if the underlying connection is still alive. + /// + /// Returns `Ok(())` if the connection is alive, or an error if the connection + /// has terminated. This is useful for detecting connection failures that might + /// not be immediately visible to the replication stream. + pub fn check_connection_alive(&self) -> EtlResult<()> { + if self.connection_alive.load(Ordering::SeqCst) { + Ok(()) + } else { + bail!( + ErrorKind::SourceConnectionFailed, + "PostgreSQL connection has been terminated" + ) + } + } + /// Creates a new logical replication slot with the specified name and a transaction which is set /// on the snapshot exported by the slot creation. pub async fn create_slot_with_transaction( From c9935c9dc570c20cee84ccb971ab372b11f1e76e Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 26 Nov 2025 09:17:43 +0100 Subject: [PATCH 2/6] Improve --- etl/src/replication/apply.rs | 56 +++++++++++++++++++++++------------ etl/src/replication/client.rs | 40 ++++++------------------- 2 files changed, 46 insertions(+), 50 deletions(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 1f21df78f..7fdcb931f 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -1,18 +1,3 @@ -use etl_config::shared::PipelineConfig; -use etl_postgres::replication::worker::WorkerType; -use etl_postgres::types::TableId; -use futures::StreamExt; -use metrics::histogram; -use postgres_replication::protocol; -use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::pin; -use tokio_postgres::types::PgLsn; -use tracing::{debug, info}; - use crate::concurrency::shutdown::ShutdownRx; use crate::concurrency::signal::SignalRx; use crate::concurrency::stream::{TimeoutStream, TimeoutStreamResult}; @@ -35,6 +20,21 @@ use crate::state::table::{RetryPolicy, TableReplicationError}; use crate::store::schema::SchemaStore; use crate::types::{Event, PipelineId}; use crate::{bail, etl_error}; +use etl_config::shared::PipelineConfig; +use etl_postgres::replication::worker::WorkerType; +use etl_postgres::types::TableId; +use futures::StreamExt; +use metrics::histogram; +use postgres_replication::protocol; +use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::pin; +use tokio_postgres::types::PgLsn; +use tracing::log::warn; +use tracing::{debug, info}; /// The minimum interval (in milliseconds) between consecutive status updates. /// @@ -42,7 +42,6 @@ use crate::{bail, etl_error}; /// If set too high, Postgres may timeout before the next status update is sent. const STATUS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); - /// Result type for the apply loop execution. /// /// [`ApplyLoopResult`] indicates the reason why the apply loop terminated, @@ -555,6 +554,18 @@ where // Main event processing loop - continues until shutdown or fatal error loop { + // If we detect that the connection is closed, we return an error to stop the loop. + if replication_client.is_closed() { + warn!( + "shutting down the apply loop because the connection to postgres has been terminated" + ); + + bail!( + ErrorKind::SourceConnectionFailed, + "PostgreSQL connection has been terminated" + ) + } + tokio::select! { // Use biased selection to prioritize shutdown signals over other operations // This ensures graceful shutdown takes precedence over event processing @@ -572,7 +583,7 @@ where // If we are not inside a transaction, we can cleanly stop streaming and return. if !state.handling_transaction() { - info!("shutting down apply worker while waiting for incoming events outside of a transaction"); + info!("shutting down apply loop while waiting for incoming events outside of a transaction"); return Ok(ApplyLoopResult::Paused); } @@ -653,8 +664,15 @@ where // 2. Allows Postgres to clean up old WAL files based on our progress // 3. Provides a heartbeat mechanism to detect connection issues _ = tokio::time::sleep_until(state.next_status_update_deadline()) => { - // Check if the connection background task has terminated. - replication_client.check_connection_alive()?; + // If we detect that the connection is closed, we return an error to stop the loop. + if replication_client.is_closed() { + warn!("shutting down the apply loop because the connection to postgres has been terminated"); + + bail!( + ErrorKind::SourceConnectionFailed, + "PostgreSQL connection has been terminated" + ) + } logical_replication_stream .as_mut() diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index ce46ae1fc..2f2256ced 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -14,8 +14,8 @@ use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::num::NonZeroI32; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; @@ -29,10 +29,7 @@ use tracing::{Instrument, error, info, warn}; /// /// The task will log when the connection terminates, either successfully or with an error. /// When the connection terminates for any reason, the `connection_alive` flag is set to `false`. -fn spawn_postgres_connection( - connection: Connection, - connection_alive: Arc, -) +fn spawn_postgres_connection(connection: Connection) where T: MakeTlsConnect, T::Stream: Send + 'static, @@ -41,9 +38,6 @@ where let task = async move { let result = connection.await; - // Mark the connection as dead regardless of success or error. - connection_alive.store(false, Ordering::SeqCst); - match result { Err(err) => error!("an error occurred during the postgres connection: {}", err), Ok(()) => info!("postgres connection terminated successfully"), @@ -51,6 +45,8 @@ where } .instrument(span); + // There is no need to track the connection task via the `JoinHandle` since the `Client` which + // returned the connection, will automatically terminate the connection when dropped. tokio::spawn(task); } @@ -185,9 +181,6 @@ struct PublicationFilter { pub struct PgReplicationClient { client: Arc, server_version: Option, - /// Flag indicating whether the underlying connection is still alive. - /// Set to `false` by the connection background task when the connection terminates. - connection_alive: Arc, } impl PgReplicationClient { @@ -215,15 +208,13 @@ impl PgReplicationClient { .parameter("server_version") .and_then(extract_server_version); - let connection_alive = Arc::new(AtomicBool::new(true)); - spawn_postgres_connection::(connection, connection_alive.clone()); + spawn_postgres_connection::(connection); info!("successfully connected to postgres without tls"); Ok(PgReplicationClient { client: Arc::new(client), server_version, - connection_alive, }) } @@ -254,32 +245,19 @@ impl PgReplicationClient { .parameter("server_version") .and_then(extract_server_version); - let connection_alive = Arc::new(AtomicBool::new(true)); - spawn_postgres_connection::(connection, connection_alive.clone()); + spawn_postgres_connection::(connection); info!("successfully connected to postgres with tls"); Ok(PgReplicationClient { client: Arc::new(client), server_version, - connection_alive, }) } - /// Checks if the underlying connection is still alive. - /// - /// Returns `Ok(())` if the connection is alive, or an error if the connection - /// has terminated. This is useful for detecting connection failures that might - /// not be immediately visible to the replication stream. - pub fn check_connection_alive(&self) -> EtlResult<()> { - if self.connection_alive.load(Ordering::SeqCst) { - Ok(()) - } else { - bail!( - ErrorKind::SourceConnectionFailed, - "PostgreSQL connection has been terminated" - ) - } + /// Checks if the underlying connection is closed. + pub fn is_closed(&self) -> bool { + self.client.is_closed() } /// Creates a new logical replication slot with the specified name and a transaction which is set From 0f6b846782a590faee5f0bef58116e31f84bd1ed Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 26 Nov 2025 09:34:25 +0100 Subject: [PATCH 3/6] Improve --- etl/src/replication/apply.rs | 43 ++++++++++++++++------------------- etl/src/replication/client.rs | 1 - 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 7fdcb931f..95971b011 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -554,18 +554,6 @@ where // Main event processing loop - continues until shutdown or fatal error loop { - // If we detect that the connection is closed, we return an error to stop the loop. - if replication_client.is_closed() { - warn!( - "shutting down the apply loop because the connection to postgres has been terminated" - ); - - bail!( - ErrorKind::SourceConnectionFailed, - "PostgreSQL connection has been terminated" - ) - } - tokio::select! { // Use biased selection to prioritize shutdown signals over other operations // This ensures graceful shutdown takes precedence over event processing @@ -595,7 +583,26 @@ where // PRIORITY 2: Process incoming replication messages from Postgres. // This is the primary data flow, converts replication protocol messages // into typed events and accumulates them into batches for efficient processing. - Some(message) = logical_replication_stream.next() => { + message = logical_replication_stream.next() => { + let Some(message) = message else { + // The stream returned None, which should never happen for logical replication + // since it runs indefinitely. This indicates either the connection was closed + // or something unexpected occurred. + if replication_client.is_closed() { + warn!("replication stream ended because the postgres connection was closed, the apply loop will terminate"); + bail!( + ErrorKind::SourceConnectionFailed, + "PostgreSQL connection has been closed" + ) + } else { + warn!("replication stream ended unexpectedly without the connection being closed, the apply loop will terminate"); + bail!( + ErrorKind::SourceConnectionFailed, + "Replication stream ended unexpectedly" + ) + } + }; + let action = handle_replication_message_with_timeout( &mut state, logical_replication_stream.as_mut(), @@ -664,16 +671,6 @@ where // 2. Allows Postgres to clean up old WAL files based on our progress // 3. Provides a heartbeat mechanism to detect connection issues _ = tokio::time::sleep_until(state.next_status_update_deadline()) => { - // If we detect that the connection is closed, we return an error to stop the loop. - if replication_client.is_closed() { - warn!("shutting down the apply loop because the connection to postgres has been terminated"); - - bail!( - ErrorKind::SourceConnectionFailed, - "PostgreSQL connection has been terminated" - ) - } - logical_replication_stream .as_mut() .get_inner() diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 2f2256ced..c8ba4714f 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -15,7 +15,6 @@ use std::fmt; use std::io::BufReader; use std::num::NonZeroI32; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; From 3aba5a6415e62e2e6404f0d35c0adbe7f3913b8d Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 26 Nov 2025 09:35:15 +0100 Subject: [PATCH 4/6] Improve --- etl/src/replication/apply.rs | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index 95971b011..fbd4b0983 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -1,3 +1,19 @@ +use etl_config::shared::PipelineConfig; +use etl_postgres::replication::worker::WorkerType; +use etl_postgres::types::TableId; +use futures::StreamExt; +use metrics::histogram; +use postgres_replication::protocol; +use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::pin; +use tokio_postgres::types::PgLsn; +use tracing::log::warn; +use tracing::{debug, info}; + use crate::concurrency::shutdown::ShutdownRx; use crate::concurrency::signal::SignalRx; use crate::concurrency::stream::{TimeoutStream, TimeoutStreamResult}; @@ -20,21 +36,6 @@ use crate::state::table::{RetryPolicy, TableReplicationError}; use crate::store::schema::SchemaStore; use crate::types::{Event, PipelineId}; use crate::{bail, etl_error}; -use etl_config::shared::PipelineConfig; -use etl_postgres::replication::worker::WorkerType; -use etl_postgres::types::TableId; -use futures::StreamExt; -use metrics::histogram; -use postgres_replication::protocol; -use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::pin; -use tokio_postgres::types::PgLsn; -use tracing::log::warn; -use tracing::{debug, info}; /// The minimum interval (in milliseconds) between consecutive status updates. /// From 6fd471c47997447700972f5b90bee7f2a18016c0 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 26 Nov 2025 09:42:43 +0100 Subject: [PATCH 5/6] Improve --- etl/src/replication/apply.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index fbd4b0983..2a085d774 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -591,15 +591,17 @@ where // or something unexpected occurred. if replication_client.is_closed() { warn!("replication stream ended because the postgres connection was closed, the apply loop will terminate"); + bail!( ErrorKind::SourceConnectionFailed, - "PostgreSQL connection has been closed" + "PostgreSQL connection has been closed during the apply loop" ) } else { warn!("replication stream ended unexpectedly without the connection being closed, the apply loop will terminate"); + bail!( ErrorKind::SourceConnectionFailed, - "Replication stream ended unexpectedly" + "Replication stream ended unexpectedly during the apply loop" ) } }; From 70926b8e671c9477aae6f728d052594e65cfa312 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 26 Nov 2025 09:50:26 +0100 Subject: [PATCH 6/6] Improve --- etl/src/replication/client.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index c8ba4714f..b291f6bbb 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -25,9 +25,6 @@ use tokio_postgres::{ use tracing::{Instrument, error, info, warn}; /// Spawns a background task to monitor a Postgres connection until it terminates. -/// -/// The task will log when the connection terminates, either successfully or with an error. -/// When the connection terminates for any reason, the `connection_alive` flag is set to `false`. fn spawn_postgres_connection(connection: Connection) where T: MakeTlsConnect, @@ -44,7 +41,7 @@ where } .instrument(span); - // There is no need to track the connection task via the `JoinHandle` since the `Client` which + // There is no need to track the connection task via the `JoinHandle` since the `Client`, which // returned the connection, will automatically terminate the connection when dropped. tokio::spawn(task); }