fix(pool): ignore spurious wakeups when waiting for a connection
fixes #622
abonander authored and mehcode committed Oct 13, 2020
1 parent 7a70717 commit fa7981f
Showing 3 changed files with 145 additions and 10 deletions.
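
Background for the diff below: the old wait path in sqlx-core/src/pool/inner.rs pushed its `Waker` once and then returned `Ready` on any later poll, i.e. it assumed every wakeup meant a connection had been released. Executors are allowed to poll a future spuriously, so that assumption does not hold; the fix records an explicit `woken` flag on a shared `Waiter` and only reports ready once the pool has actually set it. A minimal, self-contained sketch of the two shapes, polled by hand with the `futures` crate; illustrative only, not sqlx code:

// Illustrative only: compare "second poll means ready" (old) with an explicit
// woken flag (new). Assumes the `futures` crate; none of this is sqlx code.
use futures::future::poll_fn;
use futures::task::noop_waker;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // Old shape: the first poll registers (a flag stands in for the waker push
    // here); any later poll is treated as "a connection was released",
    // even a spurious wakeup.
    let mut pushed = false;
    let mut old = Box::pin(poll_fn(move |_cx| {
        if !pushed {
            pushed = true;
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }));
    assert!(old.as_mut().poll(&mut cx).is_pending());
    assert!(old.as_mut().poll(&mut cx).is_ready()); // wrongly "ready" with no connection

    // New shape: only return Ready once the pool has explicitly flagged us,
    // which is what `Waiter::wake()` does in the real fix.
    let woken = Arc::new(AtomicBool::new(false));
    let flag = woken.clone();
    let mut fixed = Box::pin(poll_fn(move |_cx| {
        if flag.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }));
    assert!(fixed.as_mut().poll(&mut cx).is_pending());
    assert!(fixed.as_mut().poll(&mut cx).is_pending()); // spurious wakeup is ignored
    woken.store(true, Ordering::Release); // the pool's wake path sets this
    assert!(fixed.as_mut().poll(&mut cx).is_ready());
}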
47 changes: 37 additions & 10 deletions sqlx-core/src/pool/inner.rs
@@ -13,12 +13,13 @@ use std::mem;
 use std::ptr;
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::Arc;
+use std::task::Context;
 use std::time::Instant;
 
 pub(crate) struct SharedPool<DB: Database> {
     pub(super) connect_options: <DB::Connection as Connection>::Options,
     pub(super) idle_conns: ArrayQueue<Idle<DB>>,
-    waiters: SegQueue<Waker>,
+    waiters: SegQueue<Arc<Waiter>>,
     pub(super) size: AtomicU32,
     is_closed: AtomicBool,
     pub(super) options: PoolOptions<DB>,
@@ -122,19 +123,22 @@ impl<DB: Database> SharedPool<DB> {
             return Err(Error::PoolClosed);
         }
 
-        let mut waker_pushed = false;
+        let mut waiter = None;
 
         timeout(
             deadline_as_timeout::<DB>(deadline)?,
             // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
-            future::poll_fn(|ctx| -> Poll<()> {
-                if !waker_pushed {
-                    // only push the waker once
-                    self.waiters.push(ctx.waker().to_owned());
-                    waker_pushed = true;
-                    Poll::Pending
-                } else {
+            future::poll_fn(|cx| -> Poll<()> {
+                let waiter = waiter.get_or_insert_with(|| {
+                    let waiter = Waiter::new(cx);
+                    self.waiters.push(waiter.clone());
+                    waiter
+                });
+
+                if waiter.is_woken() {
                     Poll::Ready(())
+                } else {
+                    Poll::Pending
                 }
             }),
         )
@@ -346,7 +350,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
 /// (where the pool thinks it has more connections than it does).
 pub(in crate::pool) struct DecrementSizeGuard<'a> {
     size: &'a AtomicU32,
-    waiters: &'a SegQueue<Waker>,
+    waiters: &'a SegQueue<Arc<Waiter>>,
     dropped: bool,
 }

@@ -379,3 +383,26 @@ impl Drop for DecrementSizeGuard<'_> {
         }
     }
 }
+
+struct Waiter {
+    woken: AtomicBool,
+    waker: Waker,
+}
+
+impl Waiter {
+    fn new(cx: &mut Context<'_>) -> Arc<Self> {
+        Arc::new(Self {
+            woken: AtomicBool::new(false),
+            waker: cx.waker().clone(),
+        })
+    }
+
+    fn wake(&self) {
+        self.woken.store(true, Ordering::Release);
+        self.waker.wake_by_ref();
+    }
+
+    fn is_woken(&self) -> bool {
+        self.woken.load(Ordering::Acquire)
+    }
+}
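
The hunks above only show the waiting side; the place where the pool actually wakes a waiter (when a connection is returned or a slot frees up) is collapsed in this view. Presumably it pops an `Arc<Waiter>` from `waiters` and calls `wake()` on it. The sketch below is an assumption about that shape, not code from this commit: `SharedPool`'s fields and the `wake_one` name are invented here, and it assumes `crossbeam-queue` 0.3 (where `SegQueue::pop` returns `Option`) plus `futures` for a no-op `Waker` in the demo `main`.

// Hypothetical sketch of the wake side; not taken from this diff.
use crossbeam_queue::SegQueue; // assumed dependency: crossbeam-queue = "0.3"
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Waker;

struct Waiter {
    woken: AtomicBool,
    waker: Waker,
}

impl Waiter {
    fn wake(&self) {
        // Set the flag before waking so the woken task's next poll sees it.
        self.woken.store(true, Ordering::Release);
        self.waker.wake_by_ref();
    }
}

struct SharedPool {
    waiters: SegQueue<Arc<Waiter>>,
}

impl SharedPool {
    // Hypothetically called when a connection is released or a slot opens up.
    fn wake_one(&self) {
        if let Some(waiter) = self.waiters.pop() {
            waiter.wake();
        }
    }
}

fn main() {
    let pool = SharedPool { waiters: SegQueue::new() };
    let waiter = Arc::new(Waiter {
        woken: AtomicBool::new(false),
        waker: futures::task::noop_waker(), // assumed dependency: futures = "0.3"
    });
    pool.waiters.push(waiter.clone());
    pool.wake_one();
    assert!(waiter.woken.load(Ordering::Acquire));
}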
54 changes: 54 additions & 0 deletions tests/mysql/mysql.rs
@@ -333,3 +333,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {

     Ok(())
 }
+
+// repro is more reliable with the basic scheduler used by `#[tokio::test]`
+#[cfg(feature = "runtime-tokio")]
+#[tokio::test]
+async fn test_issue_622() -> anyhow::Result<()> {
+    use std::time::Instant;
+
+    setup_if_needed();
+
+    let pool = MySqlPoolOptions::new()
+        .max_connections(1) // also fails with higher counts, e.g. 5
+        .connect(&std::env::var("DATABASE_URL").unwrap())
+        .await?;
+
+    println!("pool state: {:?}", pool);
+
+    let mut handles = vec![];
+
+    // given repro spawned 100 tasks but I found it reliably reproduced with 3
+    for i in 0..3 {
+        let pool = pool.clone();
+
+        handles.push(sqlx_rt::spawn(async move {
+            {
+                let mut conn = pool.acquire().await.unwrap();
+
+                let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
+
+                // conn gets dropped here and should be returned to the pool
+            }
+
+            // (do some other work here without holding on to a connection)
+            // this actually fixes the issue, depending on the timeout used
+            // sqlx_rt::sleep(Duration::from_millis(500)).await;
+
+            {
+                let start = Instant::now();
+                match pool.acquire().await {
+                    Ok(conn) => {
+                        println!("{} acquire took {:?}", i, start.elapsed());
+                        drop(conn);
+                    }
+                    Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
+                }
+            }
+
+            Result::<(), anyhow::Error>::Ok(())
+        }));
+    }
+
+    futures::future::try_join_all(handles).await?;
+
+    Ok(())
+}
54 changes: 54 additions & 0 deletions tests/postgres/postgres.rs
@@ -709,3 +709,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {

     Ok(())
 }
+
+// repro is more reliable with the basic scheduler used by `#[tokio::test]`
+#[cfg(feature = "runtime-tokio")]
+#[tokio::test]
+async fn test_issue_622() -> anyhow::Result<()> {
+    use std::time::Instant;
+
+    setup_if_needed();
+
+    let pool = PgPoolOptions::new()
+        .max_connections(1) // also fails with higher counts, e.g. 5
+        .connect(&std::env::var("DATABASE_URL").unwrap())
+        .await?;
+
+    println!("pool state: {:?}", pool);
+
+    let mut handles = vec![];
+
+    // given repro spawned 100 tasks but I found it reliably reproduced with 3
+    for i in 0..3 {
+        let pool = pool.clone();
+
+        handles.push(sqlx_rt::spawn(async move {
+            {
+                let mut conn = pool.acquire().await.unwrap();
+
+                let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
+
+                // conn gets dropped here and should be returned to the pool
+            }
+
+            // (do some other work here without holding on to a connection)
+            // this actually fixes the issue, depending on the timeout used
+            // sqlx_rt::sleep(Duration::from_millis(500)).await;
+
+            {
+                let start = Instant::now();
+                match pool.acquire().await {
+                    Ok(conn) => {
+                        println!("{} acquire took {:?}", i, start.elapsed());
+                        drop(conn);
+                    }
+                    Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
+                }
+            }
+
+            Result::<(), anyhow::Error>::Ok(())
+        }));
+    }
+
+    futures::future::try_join_all(handles).await?;
+
+    Ok(())
+}
