Skip to content

Support keepalive interval and retries. #944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ use tokio_postgres::{Error, Socket};
/// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
Expand Down Expand Up @@ -279,6 +283,33 @@ impl Config {
self.config.get_keepalives_idle()
}

/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive struct’s keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.config.keepalives_interval(keepalives_interval);
self
}

/// Gets the time interval between TCP keepalive probes.
pub fn get_keepalives_interval(&self) -> Option<Duration> {
self.config.get_keepalives_interval()
}

/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.config.keepalives_retries(keepalives_retries);
self
}

/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
pub fn get_keepalives_retries(&self) -> Option<u32> {
self.config.get_keepalives_retries()
}

/// Sets the requirements of the session.
///
/// This can be used to connect to the primary server in a clustered database rather than one of the read-only
Expand Down
3 changes: 1 addition & 2 deletions tokio-postgres/src/cancel_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ where
&config.host,
config.port,
config.connect_timeout,
config.keepalives,
config.keepalives_idle,
config.keepalive.as_ref(),
)
.await?;

Expand Down
5 changes: 3 additions & 2 deletions tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::config::Host;
use crate::config::SslMode;
use crate::connection::{Request, RequestMessages};
use crate::copy_out::CopyOutStream;
#[cfg(feature = "runtime")]
use crate::keepalive::KeepaliveConfig;
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -154,8 +156,7 @@ pub(crate) struct SocketConfig {
pub host: Host,
pub port: u16,
pub connect_timeout: Option<Duration>,
pub keepalives: bool,
pub keepalives_idle: Duration,
pub keepalive: Option<KeepaliveConfig>,
}

/// An asynchronous PostgreSQL client.
Expand Down
63 changes: 58 additions & 5 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#[cfg(feature = "runtime")]
use crate::connect::connect;
use crate::connect_raw::connect_raw;
use crate::keepalive::KeepaliveConfig;
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect;
Expand Down Expand Up @@ -99,6 +100,10 @@ pub enum Host {
/// This option is ignored when connecting with Unix sockets. Defaults to on.
/// * `keepalives_idle` - The number of seconds of inactivity after which a keepalive message is sent to the server.
/// This option is ignored when connecting with Unix sockets. Defaults to 2 hours.
/// * `keepalives_interval` - The time interval between TCP keepalive probes.
/// This option is ignored when connecting with Unix sockets.
/// * `keepalives_retries` - The maximum number of TCP keepalive probes that will be sent before dropping a connection.
/// This option is ignored when connecting with Unix sockets.
/// * `target_session_attrs` - Specifies requirements of the session. If set to `read-write`, the client will check that
/// the `transaction_read_write` session parameter is set to `on`. This can be used to connect to the primary server
/// in a database cluster as opposed to the secondary read-only mirrors. Defaults to `all`.
Expand Down Expand Up @@ -156,7 +161,7 @@ pub struct Config {
pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>,
pub(crate) keepalives: bool,
pub(crate) keepalives_idle: Duration,
pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
}
Expand All @@ -170,6 +175,11 @@ impl Default for Config {
impl Config {
/// Creates a new configuration.
pub fn new() -> Config {
let keepalive_config = KeepaliveConfig {
idle: Duration::from_secs(2 * 60 * 60),
interval: None,
retries: None,
};
Config {
user: None,
password: None,
Expand All @@ -181,7 +191,7 @@ impl Config {
port: vec![],
connect_timeout: None,
keepalives: true,
keepalives_idle: Duration::from_secs(2 * 60 * 60),
keepalive_config,
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
}
Expand Down Expand Up @@ -347,14 +357,41 @@ impl Config {
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled. Defaults to 2 hours.
pub fn keepalives_idle(&mut self, keepalives_idle: Duration) -> &mut Config {
self.keepalives_idle = keepalives_idle;
self.keepalive_config.idle = keepalives_idle;
self
}

/// Gets the configured amount of idle time before a keepalive packet will
/// be sent on the connection.
pub fn get_keepalives_idle(&self) -> Duration {
self.keepalives_idle
self.keepalive_config.idle
}

/// Sets the time interval between TCP keepalive probes.
/// On Windows, this sets the value of the tcp_keepalive struct’s keepaliveinterval field.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_interval(&mut self, keepalives_interval: Duration) -> &mut Config {
self.keepalive_config.interval = Some(keepalives_interval);
self
}

/// Gets the time interval between TCP keepalive probes.
pub fn get_keepalives_interval(&self) -> Option<Duration> {
self.keepalive_config.interval
}

/// Sets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
///
/// This is ignored for Unix domain sockets, or if the `keepalives` option is disabled.
pub fn keepalives_retries(&mut self, keepalives_retries: u32) -> &mut Config {
self.keepalive_config.retries = Some(keepalives_retries);
self
}

/// Gets the maximum number of TCP keepalive probes that will be sent before dropping a connection.
pub fn get_keepalives_retries(&self) -> Option<u32> {
self.keepalive_config.retries
}

/// Sets the requirements of the session.
Expand Down Expand Up @@ -451,6 +488,20 @@ impl Config {
self.keepalives_idle(Duration::from_secs(keepalives_idle as u64));
}
}
"keepalives_interval" => {
let keepalives_interval = value.parse::<i64>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_interval")))
})?;
if keepalives_interval > 0 {
self.keepalives_interval(Duration::from_secs(keepalives_interval as u64));
}
}
"keepalives_retries" => {
let keepalives_retries = value.parse::<u32>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("keepalives_retries")))
})?;
self.keepalives_retries(keepalives_retries);
}
"target_session_attrs" => {
let target_session_attrs = match value {
"any" => TargetSessionAttrs::Any,
Expand Down Expand Up @@ -545,7 +596,9 @@ impl fmt::Debug for Config {
.field("port", &self.port)
.field("connect_timeout", &self.connect_timeout)
.field("keepalives", &self.keepalives)
.field("keepalives_idle", &self.keepalives_idle)
.field("keepalives_idle", &self.keepalive_config.idle)
.field("keepalives_interval", &self.keepalive_config.interval)
.field("keepalives_retries", &self.keepalive_config.retries)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.finish()
Expand Down
14 changes: 10 additions & 4 deletions tokio-postgres/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ where
host,
port,
config.connect_timeout,
config.keepalives,
config.keepalives_idle,
if config.keepalives {
Some(&config.keepalive_config)
} else {
None
},
)
.await?;
let (mut client, mut connection) = connect_raw(socket, tls, config).await?;
Expand Down Expand Up @@ -115,8 +118,11 @@ where
host: host.clone(),
port,
connect_timeout: config.connect_timeout,
keepalives: config.keepalives,
keepalives_idle: config.keepalives_idle,
keepalive: if config.keepalives {
Some(config.keepalive_config.clone())
} else {
None
},
});

Ok((client, connection))
Expand Down
8 changes: 4 additions & 4 deletions tokio-postgres/src/connect_socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::config::Host;
use crate::keepalive::KeepaliveConfig;
use crate::{Error, Socket};
use socket2::{SockRef, TcpKeepalive};
use std::future::Future;
Expand All @@ -13,8 +14,7 @@ pub(crate) async fn connect_socket(
host: &Host,
port: u16,
connect_timeout: Option<Duration>,
keepalives: bool,
keepalives_idle: Duration,
keepalive_config: Option<&KeepaliveConfig>,
) -> Result<Socket, Error> {
match host {
Host::Tcp(host) => {
Expand All @@ -35,9 +35,9 @@ pub(crate) async fn connect_socket(
};

stream.set_nodelay(true).map_err(Error::connect)?;
if keepalives {
if let Some(keepalive_config) = keepalive_config {
SockRef::from(&stream)
.set_tcp_keepalive(&TcpKeepalive::new().with_time(keepalives_idle))
.set_tcp_keepalive(&TcpKeepalive::from(keepalive_config))
.map_err(Error::connect)?;
}

Expand Down
27 changes: 27 additions & 0 deletions tokio-postgres/src/keepalive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use socket2::TcpKeepalive;
use std::time::Duration;

#[derive(Clone, PartialEq, Eq)]
pub(crate) struct KeepaliveConfig {
pub idle: Duration,
pub interval: Option<Duration>,
pub retries: Option<u32>,
}

impl From<&KeepaliveConfig> for TcpKeepalive {
fn from(keepalive_config: &KeepaliveConfig) -> Self {
let mut tcp_keepalive = Self::new().with_time(keepalive_config.idle);

#[cfg(not(any(target_os = "redox", target_os = "solaris")))]
if let Some(interval) = keepalive_config.interval {
tcp_keepalive = tcp_keepalive.with_interval(interval);
}

#[cfg(not(any(target_os = "redox", target_os = "solaris", target_os = "windows")))]
if let Some(retries) = keepalive_config.retries {
tcp_keepalive = tcp_keepalive.with_retries(retries);
}

tcp_keepalive
}
}
1 change: 1 addition & 0 deletions tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ mod copy_in;
mod copy_out;
pub mod error;
mod generic_client;
mod keepalive;
mod maybe_tls_stream;
mod portal;
mod prepare;
Expand Down
12 changes: 12 additions & 0 deletions tokio-postgres/tests/test/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ fn settings() {
);
}

#[test]
fn keepalive_settings() {
check(
"keepalives=1 keepalives_idle=15 keepalives_interval=5 keepalives_retries=9",
Config::new()
.keepalives(true)
.keepalives_idle(Duration::from_secs(15))
.keepalives_interval(Duration::from_secs(5))
.keepalives_retries(9),
);
}

#[test]
fn url() {
check("postgresql://", &Config::new());
Expand Down