Skip to content

Commit

Permalink
feat(db): Instrument DB connection lifecycle (#1027)
Browse files Browse the repository at this point in the history
## What ❔

Instruments DB connection lifecycle (acquiring connection call sites,
long-living connections). Traces connections for test pools and allows
to configure test pool size.

## Why ❔

- Can provide more insights as to which components utilize connections
suboptimally.
- Simplifies testing / DevEx in general.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Feb 8, 2024
1 parent d1e4774 commit 636fcfd
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 139 deletions.
19 changes: 0 additions & 19 deletions core/lib/dal/src/connection/holder.rs

This file was deleted.

167 changes: 117 additions & 50 deletions core/lib/dal/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use std::{env, fmt, time::Duration};
use std::{env, fmt, future::Future, panic::Location, sync::Arc, time::Duration};

use anyhow::Context as _;
use sqlx::{
pool::PoolConnection,
postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres},
};

use crate::{metrics::CONNECTION_METRICS, StorageProcessor};
pub use self::processor::StorageProcessor;
use self::processor::{StorageProcessorTags, TracedConnections};
use crate::metrics::CONNECTION_METRICS;

pub mod holder;
mod processor;

/// Builder for [`ConnectionPool`]s.
#[derive(Clone)]
pub struct ConnectionPoolBuilder {
database_url: String,
max_size: u32,
acquire_timeout: Duration,
statement_timeout: Option<Duration>,
}

Expand All @@ -24,12 +27,20 @@ impl fmt::Debug for ConnectionPoolBuilder {
formatter
.debug_struct("ConnectionPoolBuilder")
.field("max_size", &self.max_size)
.field("acquire_timeout", &self.acquire_timeout)
.field("statement_timeout", &self.statement_timeout)
.finish()
}
}

impl ConnectionPoolBuilder {
/// Sets the acquire timeout for a single connection attempt. There are multiple attempts (currently 3)
/// before `access_storage*` methods return an error.
pub fn set_acquire_timeout(&mut self, timeout: Duration) -> &mut Self {
self.acquire_timeout = timeout;
self
}

/// Sets the statement timeout for the pool. See [Postgres docs] for semantics.
/// If not specified, the statement timeout will not be set.
///
Expand All @@ -41,7 +52,9 @@ impl ConnectionPoolBuilder {

/// Builds a connection pool from this builder.
pub async fn build(&self) -> anyhow::Result<ConnectionPool> {
let options = PgPoolOptions::new().max_connections(self.max_size);
let options = PgPoolOptions::new()
.max_connections(self.max_size)
.acquire_timeout(self.acquire_timeout);
let mut connect_options: PgConnectOptions = self
.database_url
.parse()
Expand All @@ -54,16 +67,12 @@ impl ConnectionPoolBuilder {
.connect_with(connect_options)
.await
.context("Failed connecting to database")?;
tracing::info!(
"Created pool with {max_connections} max connections \
and {statement_timeout:?} statement timeout",
max_connections = self.max_size,
statement_timeout = self.statement_timeout
);
tracing::info!("Created DB pool with parameters {self:?}");
Ok(ConnectionPool {
database_url: self.database_url.clone(),
inner: pool,
max_size: self.max_size,
traced_connections: None,
})
}

Expand All @@ -77,24 +86,7 @@ impl ConnectionPoolBuilder {
}
}

#[derive(Clone)]
pub struct ConnectionPool {
pub(crate) inner: PgPool,
database_url: String,
max_size: u32,
}

impl fmt::Debug for ConnectionPool {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
// We don't print the `database_url`, as is may contain
// sensitive information (e.g. database password).
formatter
.debug_struct("ConnectionPool")
.field("max_size", &self.max_size)
.finish_non_exhaustive()
}
}

#[derive(Debug)]
pub struct TestTemplate(url::Url);

impl TestTemplate {
Expand All @@ -121,7 +113,7 @@ impl TestTemplate {
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

Expand All @@ -138,7 +130,7 @@ impl TestTemplate {
/// so that the db can be used as a template.
pub async fn freeze(pool: ConnectionPool) -> anyhow::Result<Self> {
use sqlx::Executor as _;
let mut conn = pool.acquire_connection_retried().await?;
let mut conn = pool.acquire_connection_retried(None).await?;
conn.execute(
"UPDATE pg_database SET datallowconn = false WHERE datname = current_database()",
)
Expand All @@ -157,7 +149,7 @@ impl TestTemplate {
/// whenever you write to the DBs, therefore making it as fast as an in-memory Postgres instance.
/// The database is not cleaned up automatically, but rather the whole Postgres
/// container is recreated whenever you call "zk test rust".
pub async fn create_db(&self) -> anyhow::Result<ConnectionPool> {
pub async fn create_db(&self, connections: u32) -> anyhow::Result<ConnectionPoolBuilder> {
use rand::Rng as _;
use sqlx::Executor as _;

Expand All @@ -170,24 +162,68 @@ impl TestTemplate {
.await
.context("CREATE DATABASE")?;

const TEST_MAX_CONNECTIONS: u32 = 50; // Expected to be enough for any unit test.
ConnectionPool::builder(self.url(&db_new).as_ref(), TEST_MAX_CONNECTIONS)
.build()
.await
.context("ConnectionPool::builder()")
Ok(ConnectionPool::builder(
self.url(&db_new).as_ref(),
connections,
))
}
}

#[derive(Clone)]
pub struct ConnectionPool {
pub(crate) inner: PgPool,
database_url: String,
max_size: u32,
traced_connections: Option<Arc<TracedConnections>>,
}

impl fmt::Debug for ConnectionPool {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
// We don't print the `database_url`, as is may contain
// sensitive information (e.g. database password).
formatter
.debug_struct("ConnectionPool")
.field("max_size", &self.max_size)
.finish_non_exhaustive()
}
}

impl ConnectionPool {
const TEST_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(1);

/// Creates a test pool with a reasonably large number of connections.
///
/// Test pools trace their active connections. If acquiring a connection fails (e.g., with a timeout),
/// the returned error will contain information on all active connections.
pub async fn test_pool() -> ConnectionPool {
TestTemplate::empty().unwrap().create_db().await.unwrap()
const DEFAULT_CONNECTIONS: u32 = 50; // Expected to be enough for any unit test.
Self::constrained_test_pool(DEFAULT_CONNECTIONS).await
}

/// Same as [`Self::test_pool()`], but with a configurable number of connections. This is useful to test
/// behavior of components that rely on singleton / constrained pools in production.
pub async fn constrained_test_pool(connections: u32) -> ConnectionPool {
assert!(connections > 0, "Number of connections must be positive");
let mut builder = TestTemplate::empty()
.expect("failed creating test template")
.create_db(connections)
.await
.expect("failed creating database for tests");
let mut pool = builder
.set_acquire_timeout(Self::TEST_ACQUIRE_TIMEOUT)
.build()
.await
.expect("cannot build connection pool");
pool.traced_connections = Some(Arc::default());
pool
}

/// Initializes a builder for connection pools.
pub fn builder(database_url: &str, max_pool_size: u32) -> ConnectionPoolBuilder {
ConnectionPoolBuilder {
database_url: database_url.to_string(),
max_size: max_pool_size,
acquire_timeout: Duration::from_secs(30), // Default value used by `sqlx`
statement_timeout: None,
}
}
Expand Down Expand Up @@ -218,34 +254,50 @@ impl ConnectionPool {
}

/// A version of `access_storage` that would also expose the duration of the connection
/// acquisition tagged to the `requester` name.
/// acquisition tagged to the `requester` name. It also tracks the caller location for the purposes
/// of logging (e.g., long-living connections) and debugging (when used with a test connection pool).
///
/// WARN: This method should not be used if it will result in too many time series (e.g.
/// from witness generators or provers), otherwise Prometheus won't be able to handle it.
pub async fn access_storage_tagged(
#[track_caller] // In order to use it, we have to de-sugar `async fn`
pub fn access_storage_tagged(
&self,
requester: &'static str,
) -> anyhow::Result<StorageProcessor<'_>> {
self.access_storage_inner(Some(requester)).await
) -> impl Future<Output = anyhow::Result<StorageProcessor<'_>>> + '_ {
let location = Location::caller();
async move {
let tags = StorageProcessorTags {
requester,
location,
};
self.access_storage_inner(Some(tags)).await
}
}

async fn access_storage_inner(
&self,
requester: Option<&'static str>,
tags: Option<StorageProcessorTags>,
) -> anyhow::Result<StorageProcessor<'_>> {
let acquire_latency = CONNECTION_METRICS.acquire.start();
let conn = self
.acquire_connection_retried()
.acquire_connection_retried(tags.as_ref())
.await
.context("acquire_connection_retried()")?;
let elapsed = acquire_latency.observe();
if let Some(requester) = requester {
CONNECTION_METRICS.acquire_tagged[&requester].observe(elapsed);
if let Some(tags) = &tags {
CONNECTION_METRICS.acquire_tagged[&tags.requester].observe(elapsed);
}
Ok(StorageProcessor::from_pool(conn))
Ok(StorageProcessor::from_pool(
conn,
tags,
self.traced_connections.as_deref(),
))
}

async fn acquire_connection_retried(&self) -> anyhow::Result<PoolConnection<Postgres>> {
async fn acquire_connection_retried(
&self,
tags: Option<&StorageProcessorTags>,
) -> anyhow::Result<PoolConnection<Postgres>> {
const DB_CONNECTION_RETRIES: u32 = 3;
const BACKOFF_INTERVAL: Duration = Duration::from_secs(1);

Expand All @@ -266,8 +318,12 @@ impl ConnectionPool {
};

Self::report_connection_error(&connection_err);
let tags_display: &(dyn fmt::Display + Send + Sync) = match tags {
Some(tags) => tags,
None => &"not tagged",
};
tracing::warn!(
"Failed to get connection to DB, backing off for {BACKOFF_INTERVAL:?}: {connection_err}"
"Failed to get connection to DB ({tags_display}), backing off for {BACKOFF_INTERVAL:?}: {connection_err}"
);
tokio::time::sleep(BACKOFF_INTERVAL).await;
}
Expand All @@ -277,7 +333,18 @@ impl ConnectionPool {
Ok(conn) => Ok(conn),
Err(err) => {
Self::report_connection_error(&err);
anyhow::bail!("Run out of retries getting a DB connection, last error: {err}");
let tags_display: &dyn fmt::Display = match tags {
Some(tags) => tags,
None => &"not tagged",
};
if let Some(traced_connections) = &self.traced_connections {
anyhow::bail!(
"Run out of retries getting a DB connection ({tags_display}), last error: {err}\n\
Active connections: {traced_connections:#?}"
);
} else {
anyhow::bail!("Run out of retries getting a DB connection ({tags_display}), last error: {err}");
}
}
}
}
Expand All @@ -297,7 +364,7 @@ mod tests {
async fn setting_statement_timeout() {
let db_url = TestTemplate::empty()
.unwrap()
.create_db()
.create_db(1)
.await
.unwrap()
.database_url;
Expand Down
Loading

0 comments on commit 636fcfd

Please sign in to comment.