Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 151 additions & 5 deletions config/src/shared/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,112 @@ use tokio_postgres::{Config as TokioPgConnectOptions, config::SslMode as TokioPg
use crate::SerializableSecretString;
use crate::shared::ValidationError;

/// PostgreSQL connection options for customizing server behavior.
///
/// These options are passed to PostgreSQL during connection establishment to configure
/// session-specific settings that affect how the server processes queries and data.
#[derive(Debug, Clone)]
pub struct PgConnectionOptions {
/// Sets the display format for date values.
pub datestyle: String,
/// Sets the display format for interval values.
pub intervalstyle: String,
/// Controls the number of digits displayed for floating-point values.
pub extra_float_digits: i32,
/// Sets the client-side character set encoding.
pub client_encoding: String,
/// Sets the time zone for displaying and interpreting time stamps.
pub timezone: String,
/// Aborts any statement that takes more than the specified number of milliseconds.
pub statement_timeout: u32,
/// Aborts any statement that waits longer than the specified milliseconds to acquire a lock.
pub lock_timeout: u32,
/// Terminates any session that has been idle within a transaction for longer than the specified milliseconds.
pub idle_in_transaction_session_timeout: u32,
/// Sets the application name to be reported in statistics views and logs for connection identification.
pub application_name: String,
}

impl Default for PgConnectionOptions {
/// Returns default configuration values optimized for ETL/replication workloads.
///
/// These defaults ensure consistent behavior across different PostgreSQL installations
/// and are specifically tuned for ETL systems that perform logical replication and
/// large data operations:
///
/// - `datestyle = "ISO"`: Provides consistent date formatting for reliable parsing
/// - `intervalstyle = "postgres"`: Uses standard PostgreSQL interval format
/// - `extra_float_digits = 3`: Ensures sufficient precision for numeric replication
/// - `client_encoding = "UTF8"`: Supports international character sets
/// - `timezone = "UTC"`: Eliminates timezone ambiguity in distributed ETL systems
/// - `statement_timeout = 0`: Disables the timeout, which allows large COPY operations to continue without being interrupted
/// - `lock_timeout = 30000` (30 seconds): Prevents indefinite blocking on table locks during replication
/// - `idle_in_transaction_session_timeout = 300000` (5 minutes): Cleans up abandoned transactions that could block VACUUM
/// - `application_name = "etl"`: Enables easy identification in monitoring and pg_stat_activity
fn default() -> Self {
Self {
datestyle: "ISO".to_string(),
intervalstyle: "postgres".to_string(),
extra_float_digits: 3,
client_encoding: "UTF8".to_string(),
timezone: "UTC".to_string(),
statement_timeout: 0,
lock_timeout: 30_000, // 30 seconds in milliseconds
idle_in_transaction_session_timeout: 300_000, // 5 minutes in milliseconds
application_name: "etl".to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does Postgres to Postgres replication use for the timeout settings? Ok to use etl as application name for now, but we should later on use separate names for table sync and apply workers, probably same or similar as slot names.

Copy link
Contributor Author

@iambriccardo iambriccardo Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely, I was considering using different names but yeah, let's do in it in a follow up, since it might add a lot of lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked the code and online, it seems like that the timeouts are set on a per subscription basis where the subscription contains the connection string. This means that no timeout is used by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should enable users to configure it in the future and keep it disabled for now, this way we can avoid possible problems related to timeouts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's disable statement_timeout and can keep the other timeouts at their current values in the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By disable I meant that statement_timeout is set to 0. If we omit it it will take the default value set for the role we use to connect which may or may not be 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

}
}
}

impl PgConnectionOptions {
/// Returns the options as a string suitable for tokio-postgres options parameter.
///
/// Returns a space-separated list of `-c key=value` pairs.
pub fn to_options_string(&self) -> String {
format!(
"-c datestyle={} -c intervalstyle={} -c extra_float_digits={} -c client_encoding={} -c timezone={} -c statement_timeout={} -c lock_timeout={} -c idle_in_transaction_session_timeout={} -c application_name={}",
self.datestyle,
self.intervalstyle,
self.extra_float_digits,
self.client_encoding,
self.timezone,
self.statement_timeout,
self.lock_timeout,
self.idle_in_transaction_session_timeout,
self.application_name
)
}

/// Returns the options as key-value pairs suitable for sqlx.
///
/// Returns a vector of (key, value) tuples.
pub fn to_key_value_pairs(&self) -> Vec<(String, String)> {
vec![
("datestyle".to_string(), self.datestyle.clone()),
("intervalstyle".to_string(), self.intervalstyle.clone()),
(
"extra_float_digits".to_string(),
self.extra_float_digits.to_string(),
),
("client_encoding".to_string(), self.client_encoding.clone()),
("timezone".to_string(), self.timezone.clone()),
(
"statement_timeout".to_string(),
self.statement_timeout.to_string(),
),
("lock_timeout".to_string(), self.lock_timeout.to_string()),
(
"idle_in_transaction_session_timeout".to_string(),
self.idle_in_transaction_session_timeout.to_string(),
),
(
"application_name".to_string(),
self.application_name.clone(),
),
]
}
}

/// Configuration for connecting to a Postgres database.
///
/// This struct holds all necessary connection parameters and settings.
Expand Down Expand Up @@ -80,18 +186,20 @@ impl IntoConnectOptions<SqlxConnectOptions> for PgConnectionConfig {
} else {
SqlxSslMode::Prefer
};
let options = SqlxConnectOptions::new_without_pgpass()
let default_pg_options = PgConnectionOptions::default();
let mut options = SqlxConnectOptions::new_without_pgpass()
.host(&self.host)
.username(&self.username)
.port(self.port)
.ssl_mode(ssl_mode)
.ssl_root_cert_from_pem(self.tls.trusted_root_certs.clone().into_bytes());
.ssl_root_cert_from_pem(self.tls.trusted_root_certs.clone().into_bytes())
.options(default_pg_options.to_key_value_pairs());

if let Some(password) = &self.password {
options.password(password.expose_secret())
} else {
options
options = options.password(password.expose_secret());
}

options
}

fn with_db(&self) -> SqlxConnectOptions {
Expand All @@ -107,11 +215,13 @@ impl IntoConnectOptions<TokioPgConnectOptions> for PgConnectionConfig {
} else {
TokioPgSslMode::Prefer
};
let default_pg_options = PgConnectionOptions::default();
let mut config = TokioPgConnectOptions::new();
config
.host(self.host.clone())
.port(self.port)
.user(self.username.clone())
.options(default_pg_options.to_options_string())
//
// We set only ssl_mode from the tls config here and not trusted_root_certs
// because we are using rustls for tls connections and rust_postgres
Expand All @@ -136,3 +246,39 @@ impl IntoConnectOptions<TokioPgConnectOptions> for PgConnectionConfig {
options
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_options_string_format() {
let options = PgConnectionOptions::default();
let options_string = options.to_options_string();

assert_eq!(
options_string,
"-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3 -c client_encoding=UTF8 -c timezone=UTC -c statement_timeout=0 -c lock_timeout=30000 -c idle_in_transaction_session_timeout=300000 -c application_name=etl"
);
}

#[test]
fn test_key_value_pairs() {
let options = PgConnectionOptions::default();
let pairs = options.to_key_value_pairs();

assert_eq!(pairs.len(), 9);
assert!(pairs.contains(&("datestyle".to_string(), "ISO".to_string())));
assert!(pairs.contains(&("intervalstyle".to_string(), "postgres".to_string())));
assert!(pairs.contains(&("extra_float_digits".to_string(), "3".to_string())));
assert!(pairs.contains(&("client_encoding".to_string(), "UTF8".to_string())));
assert!(pairs.contains(&("timezone".to_string(), "UTC".to_string())));
assert!(pairs.contains(&("statement_timeout".to_string(), "0".to_string())));
assert!(pairs.contains(&("lock_timeout".to_string(), "30000".to_string())));
assert!(pairs.contains(&(
"idle_in_transaction_session_timeout".to_string(),
"300000".to_string()
)));
assert!(pairs.contains(&("application_name".to_string(), "etl".to_string())));
}
}