Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Don't use try_acquire alongside spawned task
Browse files Browse the repository at this point in the history
We're hitting an issue that looks similar to
launchbadge/sqlx#622

I was able to work around it by disabling fairness, but we can just
avoid the lookup entirely if active workers is >= the pool size.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Aug 18, 2020
1 parent bd147b7 commit 642eaa3
Showing 1 changed file with 13 additions and 21 deletions.
34 changes: 13 additions & 21 deletions iml-task-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use iml_action_client::invoke_rust_agent;
use iml_manager_env::get_pool_limit;
use iml_postgres::{
get_db_pool,
sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres},
sqlx::{self, Done, Executor, PgPool},
};
use iml_tracing::tracing;
use iml_wire_types::{
Expand Down Expand Up @@ -37,7 +37,7 @@ lazy_static! {
}

async fn available_workers(
conn: &mut PoolConnection<Postgres>,
pool: &PgPool,
ids: Vec<i32>,
) -> Result<Vec<LustreClient>, error::ImlTaskRunnerError> {
let clients = sqlx::query_as!(
Expand All @@ -53,7 +53,7 @@ async fn available_workers(
&ids,
max(*POOL_LIMIT as i64 - ids.len() as i64, 0),
)
.fetch_all(conn)
.fetch_all(pool)
.await?;

Ok(clients)
Expand Down Expand Up @@ -332,29 +332,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

tracing::debug!("Pool State: {:?}", pg_pool);

let mut conn = match pg_pool.try_acquire() {
Some(x) => x,
None => {
tracing::info!(
"Could not acquire connection (pool full), will try again next tick"
);
continue;
}
};

let workers = {
let ids: Vec<i32> = {
let xs = active_clients.lock().await;
let ids: Vec<i32> = xs.iter().copied().collect();
xs.iter().copied().collect()
};

tracing::debug!("checking workers for ids: {:?}", ids);
if ids.len() as u32 >= *POOL_LIMIT {
tracing::info!("No more capacity to service tasks. Active workers: {:?}, Connection Limit: {}. Will try again next tick.", ids, *POOL_LIMIT);
continue;
}

let workers = available_workers(&mut conn, ids).await?;
tracing::debug!("checking workers for ids: {:?}", ids);

tracing::debug!("got workers: {:?}", workers);
let workers = available_workers(&pg_pool, ids).await?;

drop(conn);
workers
};
tracing::debug!("got workers: {:?}", workers);

{
let mut x = active_clients.lock().await;
Expand Down

0 comments on commit 642eaa3

Please sign in to comment.