Skip to content

Commit

Permalink
Simplify server pool
Browse files Browse the repository at this point in the history
  • Loading branch information
kornelski committed Sep 14, 2023
1 parent 99735e7 commit 3099e94
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 51 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ assert-json-diff = "2.0"
colored = { version = "2.0", optional = true }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
log = "0.4"
rand = "0.8"
regex = "1.7"
Expand Down
68 changes: 18 additions & 50 deletions src/server_pool.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use crate::Server;
use crate::{Error, ErrorKind};
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut, Drop};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tokio::sync::{Semaphore, SemaphorePermit};

const DEFAULT_POOL_SIZE: usize = 50;

lazy_static! {
pub(crate) static ref SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
}
// macOS has small default ulimits
const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };
pub(crate) static SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);

///
/// A handle around a pooled `Server` object which dereferences to `Server`.
Expand Down Expand Up @@ -46,75 +43,46 @@ impl DerefMut for ServerGuard {
impl Drop for ServerGuard {
fn drop(&mut self) {
if let Some(server) = self.server.take() {
// the permit is still held when recycling,
// so the next acquire will already see the recycled server
SERVER_POOL.recycle(server);
}
}
}

pub(crate) struct ServerPool {
max_size: usize,
created: Arc<Mutex<usize>>,
semaphore: Semaphore,
state: Arc<Mutex<VecDeque<Server>>>,
free_list: Mutex<VecDeque<Server>>,
}

impl ServerPool {
fn new(max_size: usize) -> ServerPool {
let created = Arc::new(Mutex::new(0));
let semaphore = Semaphore::new(max_size);
let state = Arc::new(Mutex::new(VecDeque::new()));
const fn new(max_size: usize) -> ServerPool {
ServerPool {
max_size,
created,
semaphore,
state,
semaphore: Semaphore::const_new(max_size),
free_list: Mutex::new(VecDeque::new()),
}
}

pub(crate) async fn get_async(&'static self) -> Result<ServerGuard, Error> {
// number of active permits limits the number of servers created
let permit = self
.semaphore
.acquire()
.await
.map_err(|err| Error::new_with_context(ErrorKind::Deadlock, err))?;

let should_create = {
let created_mutex = self.created.clone();
let mut created = created_mutex.lock().unwrap();
if *created < self.max_size {
*created += 1;
true
} else {
false
}
};

let server = {
if should_create {
Some(Server::try_new_with_port_async(0).await?)
} else {
None
}
// be careful not to lock locks in match - it extends scope of temporaries
let recycled = self.free_list.lock().unwrap().pop_front();
let server = match recycled {
Some(server) => server,
None => Server::try_new_with_port_async(0).await?,
};

let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();

if let Some(server) = server {
state.push_back(server);
}

if let Some(server) = state.pop_front() {
Ok(ServerGuard::new(server, permit))
} else {
Err(Error::new(ErrorKind::ServerBusy))
}
Ok(ServerGuard::new(server, permit))
}

fn recycle(&self, mut server: Server) {
server.reset();
let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();
state.push_back(server);
self.free_list.lock().unwrap().push_back(server);
}
}

0 comments on commit 3099e94

Please sign in to comment.