Skip to content

Commit

Permalink
feat: Add set_connect_options method to Pool
Browse files Browse the repository at this point in the history
This allows external updates of the ConnectionOptions used when a new
connection needs to be opened for the pool.  The primary use case
is to support dynamically updated (read: rotated) credentials used
by systems like AWS RDS.
  • Loading branch information
moatra committed Sep 19, 2022
1 parent 1379eb6 commit 1b76a6b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
16 changes: 12 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::pool::options::PoolConnectionMetadata;
Expand All @@ -20,7 +20,7 @@ use futures_util::FutureExt;
use std::time::{Duration, Instant};

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) connect_options: RwLock<<DB::Connection as Connection>::Options>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
Expand All @@ -47,7 +47,7 @@ impl<DB: Database> PoolInner<DB> {
};

let pool = Self {
connect_options,
connect_options: RwLock::new(connect_options),
idle_conns: ArrayQueue::new(capacity),
semaphore: Semaphore::new(options.fair, semaphore_capacity),
size: AtomicU32::new(0),
Expand Down Expand Up @@ -292,9 +292,17 @@ impl<DB: Database> PoolInner<DB> {
loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;

// clone the connect options so they can be used without holding the RwLockReadGuard
// across an async await point
let connect_options = self
.connect_options
.read()
.expect("write-lock holder panicked")
.clone();

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
match sqlx_rt::timeout(timeout, connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
Expand Down
33 changes: 29 additions & 4 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -489,9 +490,29 @@ impl<DB: Database> Pool<DB> {
self.0.num_idle()
}

/// Get the connection options for this pool
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
&self.0.connect_options
/// Gets a clone of the connection options for this pool
pub fn connect_options(&self) -> <DB::Connection as Connection>::Options {
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.clone()
}

/// Updates the connection options this pool will use when opening any future connections. Any
/// existing open connection in the pool will be left as-is.
pub fn with_connect_options(
&self,
mut connect_options: <DB::Connection as Connection>::Options,
) {
// technically write() could also panic if the current thread already holds the lock,
// but because this method can't be re-entered by the same thread that shouldn't be a problem
let mut guard = self
.0
.connect_options
.write()
.expect("write-lock holder panicked");
std::mem::swap(guard.deref_mut(), &mut connect_options);
}

/// Get the options for this pool
Expand All @@ -514,7 +535,11 @@ impl Pool<Any> {
///
/// Determined by the connection URL.
pub fn any_kind(&self) -> AnyKind {
self.0.connect_options.kind()
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.kind()
}
}

Expand Down

0 comments on commit 1b76a6b

Please sign in to comment.