Simplify server pool #177

Merged
merged 3 commits on Sep 15, 2023
Changes from 1 commit
1 change: 0 additions & 1 deletion Cargo.toml
@@ -21,7 +21,6 @@ assert-json-diff = "2.0"
 colored = { version = "2.0", optional = true }
 futures = "0.3"
 hyper = { version = "0.14", features = ["full"] }
-lazy_static = "1.4"
 log = "0.4"
 rand = "0.8"
 regex = "1.7"
68 changes: 18 additions & 50 deletions src/server_pool.rs
@@ -1,16 +1,13 @@
 use crate::Server;
 use crate::{Error, ErrorKind};
-use lazy_static::lazy_static;
 use std::collections::VecDeque;
 use std::ops::{Deref, DerefMut, Drop};
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;
 use tokio::sync::{Semaphore, SemaphorePermit};
 
-const DEFAULT_POOL_SIZE: usize = 50;
-
-lazy_static! {
-    pub(crate) static ref SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
-}
+// macOS has small default ulimits
+const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };
Owner:
There are test_server_pool_async and test_server_pool tests that might fail on a Mac now => please update the tests to use the values conditionally as well.
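For illustration, a minimal sketch of what that could look like (only the test names and the 20/50 values come from this PR; the constant name and test body below are hypothetical):

// Hypothetical sketch: mirror the conditional pool size in the tests so the
// loop bound matches DEFAULT_POOL_SIZE on macOS and elsewhere.
const EXPECTED_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };

#[test]
fn test_server_pool() {
    let mut servers = vec![];
    for _ in 0..EXPECTED_POOL_SIZE {
        // assumes mockito's public Server::new(), which checks a server out of the pool
        servers.push(mockito::Server::new());
    }
}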

Contributor Author:
These tests happen to pass currently, because they have a bug — the servers vec is emptied on every iteration, so they never exhaust the pool.

But when these tests actually try to get the whole pool, they cause a deadlock whenever any other test runs in parallel that uses more than one server at a time.

Owner:
right, that vec was supposed to be outside the for loop 🤦
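For clarity, a rough sketch of the two shapes being discussed, reusing the hypothetical EXPECTED_POOL_SIZE from the sketch above (not copied from the repo):

// Buggy shape: `servers` is recreated inside the loop, so each guard is
// dropped (and recycled) at the end of every iteration and the pool is
// never exhausted.
for _ in 0..EXPECTED_POOL_SIZE {
    let mut servers = vec![];
    servers.push(mockito::Server::new());
}

// Intended shape: keep the vec outside the loop so all guards stay alive
// and the test really does check out the whole pool at once.
let mut servers = vec![];
for _ in 0..EXPECTED_POOL_SIZE {
    servers.push(mockito::Server::new());
}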

+pub(crate) static SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
 
 ///
 /// A handle around a pooled `Server` object which dereferences to `Server`.
@@ -46,75 +43,46 @@ impl DerefMut for ServerGuard {
 impl Drop for ServerGuard {
     fn drop(&mut self) {
         if let Some(server) = self.server.take() {
+            // the permit is still held when recycling,
+            // so the next acquire will already see the recycled server
             SERVER_POOL.recycle(server);
         }
     }
 }
 
 pub(crate) struct ServerPool {
-    max_size: usize,
-    created: Arc<Mutex<usize>>,
     semaphore: Semaphore,
-    state: Arc<Mutex<VecDeque<Server>>>,
+    free_list: Mutex<VecDeque<Server>>,
 }
 
 impl ServerPool {
-    fn new(max_size: usize) -> ServerPool {
-        let created = Arc::new(Mutex::new(0));
-        let semaphore = Semaphore::new(max_size);
-        let state = Arc::new(Mutex::new(VecDeque::new()));
+    const fn new(max_size: usize) -> ServerPool {
         ServerPool {
-            max_size,
-            created,
-            semaphore,
-            state,
+            semaphore: Semaphore::const_new(max_size),
+            free_list: Mutex::new(VecDeque::new()),
         }
     }
 
     pub(crate) async fn get_async(&'static self) -> Result<ServerGuard, Error> {
+        // number of active permits limits the number of servers created
         let permit = self
            .semaphore
            .acquire()
            .await
            .map_err(|err| Error::new_with_context(ErrorKind::Deadlock, err))?;
 
-        let should_create = {
-            let created_mutex = self.created.clone();
-            let mut created = created_mutex.lock().unwrap();
-            if *created < self.max_size {
-                *created += 1;
-                true
-            } else {
-                false
-            }
-        };
-
-        let server = {
-            if should_create {
-                Some(Server::try_new_with_port_async(0).await?)
-            } else {
-                None
-            }
+        // be careful not to lock locks in match - it extends scope of temporaries
+        let recycled = self.free_list.lock().unwrap().pop_front();
+        let server = match recycled {
+            Some(server) => server,
+            None => Server::try_new_with_port_async(0).await?,
         };
 
-        let state_mutex = self.state.clone();
-        let mut state = state_mutex.lock().unwrap();
-
-        if let Some(server) = server {
-            state.push_back(server);
-        }
-
-        if let Some(server) = state.pop_front() {
-            Ok(ServerGuard::new(server, permit))
-        } else {
-            Err(Error::new(ErrorKind::ServerBusy))
-        }
+        Ok(ServerGuard::new(server, permit))
     }
 
     fn recycle(&self, mut server: Server) {
         server.reset();
-        let state_mutex = self.state.clone();
-        let mut state = state_mutex.lock().unwrap();
-        state.push_back(server);
+        self.free_list.lock().unwrap().push_back(server);
     }
 }
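A note on the "be careful not to lock locks in match" comment in get_async above: in Rust, temporaries created in a match scrutinee live until the end of the whole match expression, so calling lock() directly inside the scrutinee would keep the MutexGuard alive across the arms (and, here, across the .await). Binding the result to `recycled` first drops the guard immediately. A minimal standalone sketch of the pitfall, not part of the PR:

use std::collections::VecDeque;
use std::sync::Mutex;

fn main() {
    let free_list: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());

    // Risky shape: the guard returned by lock() is a temporary in the match
    // scrutinee and stays alive until the end of the whole match expression,
    // so locking again in an arm would deadlock or panic:
    //
    // match free_list.lock().unwrap().pop_front() {
    //     Some(x) => println!("reused {x}"),
    //     None => free_list.lock().unwrap().push_back(0), // second lock!
    // }

    // Safe shape (what the PR does): bind the result first, which drops the
    // guard before the match body runs.
    let recycled = free_list.lock().unwrap().pop_front();
    match recycled {
        Some(x) => println!("reused {x}"),
        None => free_list.lock().unwrap().push_back(0),
    }
}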