Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions etl/src/replication/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::pin;
use tokio_postgres::types::PgLsn;
use tracing::log::warn;
use tracing::{debug, info};

use crate::concurrency::shutdown::ShutdownRx;
Expand Down Expand Up @@ -571,7 +572,7 @@ where

// If we are not inside a transaction, we can cleanly stop streaming and return.
if !state.handling_transaction() {
info!("shutting down apply worker while waiting for incoming events outside of a transaction");
info!("shutting down apply loop while waiting for incoming events outside of a transaction");
return Ok(ApplyLoopResult::Paused);
}

Expand All @@ -583,7 +584,28 @@ where
// PRIORITY 2: Process incoming replication messages from Postgres.
// This is the primary data flow, converts replication protocol messages
// into typed events and accumulates them into batches for efficient processing.
Some(message) = logical_replication_stream.next() => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an error in our first implementation. After digging into the code, I found that when the channel is closed the stream returns `None` instead of an error, which caused the pipeline to stall in some cases.

message = logical_replication_stream.next() => {
let Some(message) = message else {
// The stream returned None, which should never happen for logical replication
// since it runs indefinitely. This indicates either the connection was closed
// or something unexpected occurred.
if replication_client.is_closed() {
warn!("replication stream ended because the postgres connection was closed, the apply loop will terminate");

bail!(
ErrorKind::SourceConnectionFailed,
"PostgreSQL connection has been closed during the apply loop"
)
} else {
warn!("replication stream ended unexpectedly without the connection being closed, the apply loop will terminate");

bail!(
ErrorKind::SourceConnectionFailed,
"Replication stream ended unexpectedly during the apply loop"
)
}
};

let action = handle_replication_message_with_timeout(
&mut state,
logical_replication_stream.as_mut(),
Expand Down Expand Up @@ -661,6 +683,7 @@ where
false
)
.await?;

state.mark_status_update_sent();
}
}
Expand Down
20 changes: 12 additions & 8 deletions etl/src/replication/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,24 @@ use tokio_postgres::{
use tracing::{Instrument, error, info, warn};

/// Spawns a background task that drives a Postgres connection until it terminates.
///
/// The connection future is awaited exactly once inside a task spawned on the Tokio runtime.
/// When it resolves, the outcome is logged: an error if the connection failed, or an
/// informational message if it terminated successfully. The task is instrumented with the span
/// that is current at the time this function is called.
///
/// `T` is the TLS connector type whose stream the connection runs over.
fn spawn_postgres_connection<T>(connection: Connection<Socket, T::Stream>)
where
    T: MakeTlsConnect<Socket>,
    T::Stream: Send + 'static,
{
    let span = tracing::Span::current();
    let task = async move {
        // `connection` is moved into this future and consumed by the single `.await` below.
        match connection.await {
            Err(err) => error!("an error occurred during the postgres connection: {}", err),
            Ok(()) => info!("postgres connection terminated successfully"),
        }
    }
    .instrument(span);

    // There is no need to track the connection task via the `JoinHandle`: the connection future
    // resolves on its own once the `Client` associated with it is dropped, which ends this task.
    tokio::spawn(task);
}

Expand Down Expand Up @@ -252,6 +251,11 @@ impl PgReplicationClient {
})
}

/// Checks if the underlying connection is closed.
///
/// This is a thin wrapper that forwards to `is_closed` on the wrapped `client` and returns its
/// answer unchanged.
///
/// Returns `true` when the wrapped client reports the connection as closed, `false` otherwise.
// NOTE(review): `self.client` is presumably a `tokio_postgres::Client`; confirm against the
// struct definition, which is not visible here.
pub fn is_closed(&self) -> bool {
self.client.is_closed()
}

/// Creates a new logical replication slot with the specified name and a transaction which is set
/// on the snapshot exported by the slot creation.
pub async fn create_slot_with_transaction(
Expand Down