Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pool): reimplement pool internals with futures-intrusive #1320

Merged
merged 1 commit into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ encoding_rs = { version = "0.8.23", optional = true }
either = "1.5.3"
futures-channel = { version = "0.3.5", default-features = false, features = ["sink", "alloc", "std"] }
futures-core = { version = "0.3.5", default-features = false }
futures-intrusive = "0.4.0"
futures-util = { version = "0.3.5", features = ["sink"] }
generic-array = { version = "0.14.4", default-features = false, optional = true }
hex = "0.4.2"
Expand Down
42 changes: 29 additions & 13 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::inner::{DecrementSizeGuard, SharedPool};
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use sqlx_rt::spawn;
use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;

use futures_intrusive::sync::SemaphoreReleaser;

use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, SharedPool};

/// A connection managed by a [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
Expand All @@ -28,8 +31,8 @@ pub(super) struct Idle<DB: Database> {

/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<'p, C> {
inner: C,
guard: DecrementSizeGuard<'p>,
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<'p>,
}

const DEREF_ERR: &str = "(bug) connection already released to pool";
Expand Down Expand Up @@ -71,7 +74,7 @@ impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
sqlx_rt::spawn(async move {
let mut floating = live.float(&pool);

// test the connection on-release to ensure it is still viable
Expand Down Expand Up @@ -102,7 +105,8 @@ impl<DB: Database> Live<DB> {
pub fn float(self, pool: &SharedPool<DB>) -> Floating<'_, Self> {
Floating {
inner: self,
guard: DecrementSizeGuard::new(pool),
// create a new guard from a previously leaked permit
guard: DecrementSizeGuard::new_permit(pool),
}
}

Expand Down Expand Up @@ -161,6 +165,11 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}
}

pub async fn close(self) -> Result<(), Error> {
// `guard` is dropped as intended
self.inner.raw.close().await
}

pub fn detach(self) -> DB::Connection {
self.inner.raw
}
Expand All @@ -174,10 +183,14 @@ impl<'s, DB: Database> Floating<'s, Live<DB>> {
}

impl<'s, DB: Database> Floating<'s, Idle<DB>> {
pub fn from_idle(idle: Idle<DB>, pool: &'s SharedPool<DB>) -> Self {
pub fn from_idle(
idle: Idle<DB>,
pool: &'s SharedPool<DB>,
permit: SemaphoreReleaser<'s>,
) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::new(pool),
guard: DecrementSizeGuard::from_permit(pool, permit),
}
}

Expand All @@ -192,9 +205,12 @@ impl<'s, DB: Database> Floating<'s, Idle<DB>> {
}
}

pub async fn close(self) -> Result<(), Error> {
pub async fn close(self) -> DecrementSizeGuard<'s> {
// `guard` is dropped as intended
self.inner.live.raw.close().await
if let Err(e) = self.inner.live.raw.close().await {
log::debug!("error occurred while closing the pool connection: {}", e);
}
self.guard
}
}

Expand Down
Loading