Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unwind safety #70

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 83 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use std::error;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

Expand All @@ -62,7 +63,7 @@ mod config;
mod test;

/// A trait which provides connection-specific functionality.
pub trait ManageConnection: Send + Sync + 'static {
pub trait ManageConnection: Sized + Send + Sync + 'static {
/// The connection type this manager deals with.
type Connection: Send + 'static;

Expand All @@ -88,6 +89,14 @@ pub trait ManageConnection: Send + Sync + 'static {
/// has disconnected. Implementations that do not support this kind of
/// fast health check may simply return `false`.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;

/// A hook that is called before returning a connection to the pool
///
/// This hook can be overidden to clean up a connection's state, or to
/// remove a connection from the pool if it was dropped during a panic.
/// This function will always be called from the same thread the pooled
/// connection was dropped from.
fn before_return(&self, _conn: &mut PooledConnection<Self>) {}
}

/// A trait which handles errors reported by the `ManageConnection`.
Expand Down Expand Up @@ -151,6 +160,7 @@ impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
struct Conn<C> {
conn: C,
birth: Instant,
should_be_returned: bool,
}

struct IdleConn<C> {
Expand All @@ -175,19 +185,38 @@ where
cond: Condvar,
}

fn drop_conns<M>(
// Condvar isn't UnwindSafe, but our pool is. The only thing you could do
// with it which causes problems in the face of unwinding is check out a
// connection. We assume that any `ManageConnection` which is unwind safe
// will override `before_return` to restore broken invariants, or remove
// connections from the pool, so we can safely implement `UnwindSafe`
impl<M: ManageConnection + UnwindSafe> UnwindSafe for SharedPool<M> {}
impl<M: ManageConnection + RefUnwindSafe> RefUnwindSafe for SharedPool<M> {}

fn forget_conns<M>(
shared: &Arc<SharedPool<M>>,
mut internals: MutexGuard<PoolInternals<M::Connection>>,
conns: Vec<M::Connection>,
conns: &mut [Conn<M::Connection>],
) where
M: ManageConnection,
{
internals.num_conns -= conns.len() as u32;
establish_idle_connections(shared, &mut internals);
drop(internals); // make sure we run connection destructors without this locked
for conn in conns {
conn.should_be_returned = false;
}
}

fn drop_conns<M>(
shared: &Arc<SharedPool<M>>,
internals: MutexGuard<PoolInternals<M::Connection>>,
mut conns: Vec<Conn<M::Connection>>,
) where
M: ManageConnection,
{
forget_conns(shared, internals, &mut conns);
for conn in conns {
shared.config.connection_customizer.on_release(conn);
shared.config.connection_customizer.on_release(conn.conn);
}
}

Expand Down Expand Up @@ -242,6 +271,7 @@ where
conn: Conn {
conn: conn,
birth: now,
should_be_returned: true,
},
idle_start: now,
};
Expand Down Expand Up @@ -286,7 +316,7 @@ where
reap |= now - conn.conn.birth >= lifetime;
}
if reap {
to_drop.push(conn.conn.conn);
to_drop.push(conn.conn);
} else {
internals.conns.push(conn);
}
Expand Down Expand Up @@ -433,7 +463,7 @@ where
// FIXME we shouldn't have to lock, unlock, and relock here
internals = self.0.internals.lock();
internals.last_error = Some(msg);
drop_conns(&self.0, internals, vec![conn.conn.conn]);
drop_conns(&self.0, internals, vec![conn.conn]);
internals = self.0.internals.lock();
continue;
}
Expand All @@ -449,13 +479,21 @@ where
}
}

fn put_back(&self, mut conn: Conn<M::Connection>) {
fn put_back(&self, conn: &mut PooledConnection<M>) {
self.0.manager.before_return(conn);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should only be called if the connection is not broken (e.g. before line 498) so that the caller does not need to check for a broken connection as well


let mut conn = match conn.conn.take() {
None => return,
Some(ref c) if !c.should_be_returned => return,
Some(c) => c,
};

// This is specified to be fast, but call it before locking anyways
let broken = self.0.manager.has_broken(&mut conn.conn);

let mut internals = self.0.internals.lock();
if broken {
drop_conns(&self.0, internals, vec![conn.conn]);
drop_conns(&self.0, internals, vec![conn]);
} else {
let conn = IdleConn {
conn: conn,
Expand All @@ -466,6 +504,13 @@ where
}
}

fn forget_conn(&self, conn: &mut Conn<M::Connection>) {
use std::slice;

let internals = self.0.internals.lock();
forget_conns(&self.0, internals, slice::from_mut(conn));
}

/// Returns information about the current state of the pool.
pub fn state(&self) -> State {
let internals = self.0.internals.lock();
Expand Down Expand Up @@ -554,6 +599,34 @@ where
conn: Option<Conn<M::Connection>>,
}

impl<M: ManageConnection> PooledConnection<M> {
/// Takes the inner connection, removing it from the pool
///
/// See [`PooledConnection::remove_from_pool`] for details
pub fn into_inner(mut self) -> M::Connection {
self.remove_from_pool();
self.conn
.take()
.unwrap()
.conn
}

/// Removes this connection from the pool
///
/// The connection can still be used afterwards, but it will never be
/// returned to the pool. If a connection customizer is set, its
/// `on_release` hook will not be called.
///
/// The pool will establish a new connection to take its place. Care must
/// be taken when using this method to ensure you are not establishing an
/// unbounded number of connections.
pub fn remove_from_pool(&mut self) {
if let Some(ref mut conn) = self.conn {
self.pool.forget_conn(conn);
}
}
}

impl<M> fmt::Debug for PooledConnection<M>
where
M: ManageConnection,
Expand All @@ -569,7 +642,7 @@ where
M: ManageConnection,
{
fn drop(&mut self) {
self.pool.put_back(self.conn.take().unwrap());
self.pool.clone().put_back(self)
}
}

Expand Down
79 changes: 78 additions & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use antidote::Mutex;
use std::panic::catch_unwind;
use std::sync::atomic::{
AtomicBool, AtomicIsize, AtomicUsize, Ordering, ATOMIC_BOOL_INIT, ATOMIC_USIZE_INIT,
};
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::time::Duration;
use std::{error, fmt, mem, thread};

use {CustomizeConnection, ManageConnection, Pool};
use {CustomizeConnection, ManageConnection, Pool, PooledConnection};

#[derive(Debug)]
pub struct Error;
Expand Down Expand Up @@ -545,3 +546,79 @@ fn conns_drop_on_pool_drop() {
}
panic!("timed out waiting for connections to drop");
}

#[test]
fn connections_can_be_configured_to_not_return_to_pool_on_drop() {
static DROPPED: AtomicUsize = ATOMIC_USIZE_INIT;

struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

struct Handler;

impl ManageConnection for Handler {
type Connection = Connection;
type Error = Error;

fn connect(&self) -> Result<Connection, Error> {
Ok(Connection)
}

fn is_valid(&self, _: &mut Connection) -> Result<(), Error> {
Ok(())
}

fn has_broken(&self, _: &mut Connection) -> bool {
false
}

fn before_return(&self, conn: &mut PooledConnection<Self>) {
use std::thread::panicking;
if panicking() {
conn.remove_from_pool();
}
}
}

let pool = Pool::builder()
.max_lifetime(Some(Duration::from_secs(1)))
.max_size(10)
.min_idle(Some(0))
.build(Handler)
.unwrap();

let _ = catch_unwind(|| {
let _conn = pool.get().unwrap();
panic!();
});

assert_eq!(DROPPED.load(Ordering::SeqCst), 1);
assert_eq!(pool.state().connections, 0);
assert_eq!(pool.state().idle_connections, 0);
}

#[test]
fn pooled_conn_into_inner_removes_from_pool() {
let pool = Pool::builder()
.connection_timeout(Duration::from_millis(50))
.max_size(2)
.build(OkManager)
.unwrap();

let conn1 = pool.get();
let conn2 = pool.get();
let conn3 = pool.get();

assert!(conn1.is_ok());
assert!(conn2.is_ok());
assert!(conn3.is_err());

conn1.unwrap().into_inner();

assert!(pool.get().is_ok());
}