Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Add a bunch of debug to the task-runner. (#2164)
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Aug 18, 2020
1 parent 5b4c1d2 commit bd147b7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
47 changes: 37 additions & 10 deletions iml-task-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ lazy_static! {

async fn available_workers(
conn: &mut PoolConnection<Postgres>,
active: Arc<Mutex<HashSet<i32>>>,
ids: Vec<i32>,
) -> Result<Vec<LustreClient>, error::ImlTaskRunnerError> {
let ids = active.lock().await;
let ids: Vec<i32> = ids.iter().copied().collect();

let clients = sqlx::query_as!(
LustreClient,
r#"
Expand Down Expand Up @@ -168,6 +165,13 @@ async fn send_work(

let mut trans = pg_pool.begin().await?;

tracing::debug!(
"Started transaction for {}, {}, {}",
fqdn,
fsname,
task.name
);

let rowlist = sqlx::query_as!(
FidTaskQueue,
r#"
Expand Down Expand Up @@ -301,6 +305,8 @@ async fn run_tasks(fqdn: &str, worker: &LustreClient, xs: Vec<Task>, pool: &PgPo
.inspect_err(|e| tracing::warn!("send_work({}) failed {:?}", task.name, e))
.await?;

tracing::debug!("send_work({}) completed, rc: {}", task.name, rc);

if rc < FID_LIMIT {
break;
}
Expand All @@ -324,6 +330,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
interval.tick().await;

tracing::debug!("Pool State: {:?}", pg_pool);

let mut conn = match pg_pool.try_acquire() {
Some(x) => x,
None => {
Expand All @@ -334,8 +342,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};

let workers = available_workers(&mut conn, Arc::clone(&active_clients)).await?;
drop(conn);
let workers = {
let xs = active_clients.lock().await;
let ids: Vec<i32> = xs.iter().copied().collect();

tracing::debug!("checking workers for ids: {:?}", ids);

let workers = available_workers(&mut conn, ids).await?;

tracing::debug!("got workers: {:?}", workers);

drop(conn);
workers
};

{
let mut x = active_clients.lock().await;
Expand All @@ -348,23 +367,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let xs = workers.into_iter().map(|worker| {
let pg_pool = pg_pool.clone();
let active_clients = Arc::clone(&active_clients);
let active_clients2 = Arc::clone(&active_clients);
let worker_id = worker.id;

async move {
let tasks = tasks_per_worker(&pg_pool, &worker).await?;
let fqdn = worker_fqdn(&pg_pool, &worker).await?;

tracing::debug!("Starting run tasks for {}", &fqdn);

run_tasks(&fqdn, &worker, tasks, &pg_pool).await;

tracing::debug!("Completed run tasks for {}", &fqdn);

Ok::<_, error::ImlTaskRunnerError>(())
}
.then(move |x| async move {
let mut c = active_clients2.lock().await;
tracing::debug!("Attempting to take lock for release");

c.remove(&worker_id);
{
let mut c = active_clients.lock().await;
tracing::debug!("Took lock for release");

tracing::debug!("Releasing Client {:?}. Active Clients {:?}", worker_id, c);
c.remove(&worker_id);

tracing::debug!("Released Client {:?}. Active Clients {:?}", worker_id, c);
}

x
})
Expand Down
2 changes: 1 addition & 1 deletion iml-wire-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,7 @@ pub struct Task {
pub running_on_id: Option<i32>,
}

#[derive(serde::Serialize, serde::Deserialize)]
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct LustreClient {
pub id: i32,
pub state_modified_at: DateTime<Utc>,
Expand Down

0 comments on commit bd147b7

Please sign in to comment.