Skip to content

Commit

Permalink
Unify error types
Browse files Browse the repository at this point in the history
  • Loading branch information
sfackler committed Nov 26, 2017
1 parent 7fb1825 commit 406472e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 63 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ Cargo.lock
.cargo/
.idea/
*.iml
.vscode/
16 changes: 6 additions & 10 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::time::Duration;
use std::marker::PhantomData;
use std::sync::Arc;

use {HandleError, LoggingErrorHandler, CustomizeConnection, NopConnectionCustomizer, InitializationError, ManageConnection, Pool};
use {CustomizeConnection, Error, HandleError, LoggingErrorHandler, ManageConnection,
NopConnectionCustomizer, Pool};

/// A builder for a connection pool.
pub struct Builder<M>
Expand All @@ -26,7 +27,7 @@ where

impl<M> fmt::Debug for Builder<M>
where
M: ManageConnection
M: ManageConnection,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Builder")
Expand Down Expand Up @@ -65,7 +66,7 @@ where

impl<M> Builder<M>
where
M: ManageConnection
M: ManageConnection,
{
/// Constructs a new `Builder`.
///
Expand Down Expand Up @@ -211,7 +212,7 @@ where
/// # Panics
///
/// Panics if `min_idle` is greater than `max_size`.
pub fn build(self, manager: M) -> Result<Pool<M>, InitializationError> {
pub fn build(self, manager: M) -> Result<Pool<M>, Error> {
let pool = self.build_unchecked(manager);
pool.wait_for_initialization()?;
Ok(pool)
Expand All @@ -235,12 +236,7 @@ where

let thread_pool = match self.thread_pool {
Some(thread_pool) => thread_pool,
None => {
Arc::new(ScheduledThreadPool::with_name(
"r2d2-worker-{}",
3,
))
}
None => Arc::new(ScheduledThreadPool::with_name("r2d2-worker-{}", 3)),
};

let config = Config {
Expand Down
64 changes: 19 additions & 45 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@
//! }
//! ```
#![warn(missing_docs)]
#![doc(html_root_url="https://docs.rs/r2d2/0.7")]
#![doc(html_root_url = "https://docs.rs/r2d2/0.7")]

extern crate antidote;
extern crate scheduled_thread_pool;
#[macro_use]
extern crate log;
extern crate scheduled_thread_pool;

use antidote::{Mutex, MutexGuard, Condvar};
use antidote::{Condvar, Mutex, MutexGuard};
use std::cmp;
use std::collections::VecDeque;
use std::error::Error;
use std::error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::mem;
Expand All @@ -68,7 +68,7 @@ pub trait ManageConnection: Send + Sync + 'static {
type Connection: Send + 'static;

/// The error type returned by `Connection`s.
type Error: Error + 'static;
type Error: error::Error + 'static;

/// Attempts to create a new connection.
fn connect(&self) -> Result<Self::Connection, Self::Error>;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct LoggingErrorHandler;

impl<E> HandleError<E> for LoggingErrorHandler
where
E: Error,
E: error::Error,
{
fn handle_error(&self, error: E) {
error!("{}", error);
Expand Down Expand Up @@ -200,9 +200,7 @@ fn establish_idle_connections<M>(
) where
M: ManageConnection,
{
let min = shared.config.min_idle.unwrap_or(
shared.config.max_size,
);
let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
let idle = internals.conns.len() as u32;
for _ in idle..min {
add_connection(shared, internals);
Expand Down Expand Up @@ -330,9 +328,7 @@ where
M: ManageConnection,
{
/// Creates a new connection pool with a default configuration.
pub fn new(
manager: M,
) -> Result<Pool<M>, InitializationError> {
pub fn new(manager: M) -> Result<Pool<M>, Error> {
Pool::builder().build(manager)
}

Expand Down Expand Up @@ -375,18 +371,16 @@ where
Pool(shared)
}

fn wait_for_initialization(&self) -> Result<(), InitializationError> {
fn wait_for_initialization(&self) -> Result<(), Error> {
let end = Instant::now() + self.0.config.connection_timeout;
let mut internals = self.0.internals.lock();

let initial_size = self.0.config.min_idle.unwrap_or(
self.0.config.max_size,
);
let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);

while internals.num_conns != initial_size {
let now = Instant::now();
if now >= end {
return Err(InitializationError(internals.last_error.take()));
return Err(Error(internals.last_error.take()));
}
internals = self.0.cond.wait_timeout(internals, end - now).0;
}
Expand All @@ -398,7 +392,7 @@ where
///
/// Waits for at most the configured connection timeout before returning an
/// error.
pub fn get(&self) -> Result<PooledConnection<M>, GetTimeout> {
pub fn get(&self) -> Result<PooledConnection<M>, Error> {
let end = Instant::now() + self.0.config.connection_timeout;
let mut internals = self.0.internals.lock();

Expand All @@ -414,7 +408,7 @@ where

let now = Instant::now();
if now >= end {
return Err(GetTimeout(internals.last_error.take()));
return Err(Error(internals.last_error.take()));
}
internals = self.0.cond.wait_timeout(internals, end - now).0;
}
Expand Down Expand Up @@ -519,43 +513,23 @@ where
}
}

/// An error returned by `Pool::new` if it fails to initialize connections.
#[derive(Debug)]
pub struct InitializationError(Option<String>);

impl fmt::Display for InitializationError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str(self.description())?;
if let Some(ref err) = self.0 {
write!(fmt, ": {}", err)?;
}
Ok(())
}
}

impl Error for InitializationError {
fn description(&self) -> &str {
"unable to initialize connections"
}
}

/// An error returned by `Pool::get` if it times out without retrieving a connection.
/// The error type returned by methods in this crate.
#[derive(Debug)]
pub struct GetTimeout(Option<String>);
pub struct Error(Option<String>);

impl fmt::Display for GetTimeout {
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str(self.description())?;
fmt.write_str(error::Error::description(self))?;
if let Some(ref err) = self.0 {
write!(fmt, ": {}", err)?;
}
Ok(())
}
}

impl Error for GetTimeout {
impl error::Error for Error {
fn description(&self) -> &str {
"timed out while waiting for a connection"
"timed out waiting for connection"
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use antidote::Mutex;
use std::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, AtomicUsize, ATOMIC_USIZE_INIT, AtomicIsize,
Ordering};
use std::sync::mpsc::{self, SyncSender, Receiver};
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering, ATOMIC_BOOL_INIT,
ATOMIC_USIZE_INIT};
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::time::Duration;
use std::{mem, thread, fmt, error};
use std::{error, fmt, mem, thread};

use {ManageConnection, CustomizeConnection, Pool};
use {CustomizeConnection, ManageConnection, Pool};

#[derive(Debug)]
pub struct Error;
Expand Down Expand Up @@ -161,7 +161,9 @@ fn test_issue_2_unlocked_during_is_valid() {
.unwrap();

let p2 = pool.clone();
let t = thread::spawn(move || { p2.get().ok().unwrap(); });
let t = thread::spawn(move || {
p2.get().ok().unwrap();
});

r1.recv().unwrap();
// get call by other task has triggered the health check
Expand Down Expand Up @@ -307,7 +309,6 @@ fn test_connection_customizer() {
assert!(DROPPED.load(Ordering::SeqCst));
}
assert!(RELEASED.load(Ordering::SeqCst));

}

#[test]
Expand Down Expand Up @@ -432,7 +433,11 @@ fn min_idle() {
}
}

let pool = Pool::builder().max_size(5).min_idle(Some(2)).build(Handler).unwrap();
let pool = Pool::builder()
.max_size(5)
.min_idle(Some(2))
.build(Handler)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert_eq!(2, pool.state().idle_connections);
assert_eq!(2, pool.state().connections);
Expand Down

0 comments on commit 406472e

Please sign in to comment.