Skip to content

Commit

Permalink
fix: can cancel only if can disable schedule + stop worker on force c…
Browse files Browse the repository at this point in the history
…ancel + soft cancel job parent on job cancel (#3670)

* fix: can cancel only if can disable schedule + stop worker on force cancel + cancel job parents

* fix: sqlx build

* fix: flow reschedule check + soft cancel only direct parent

* fix: sqlx prepare

* fix: only soft cancel parent if force cancel and job is a flow step
  • Loading branch information
HugoCasa committed May 7, 2024
1 parent cd1711c commit 010662d
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 45 deletions.
2 changes: 2 additions & 0 deletions backend/windmill-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub enum Error {
JsonErr(serde_json::Value),
#[error("{0}")]
OpenAIError(String),
#[error("{0}")]
AlreadyCompleted(String),
}

impl Error {
Expand Down
122 changes: 93 additions & 29 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,32 +133,24 @@ pub struct CanceledBy {
pub reason: Option<String>,
}

#[async_recursion]
pub async fn cancel_job<'c: 'async_recursion>(
pub async fn cancel_single_job<'c>(
username: &str,
reason: Option<String>,
id: Uuid,
job_running: &QueuedJob,
w_id: &str,
mut tx: Transaction<'c, Postgres>,
db: &Pool<Postgres>,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
force_cancel: bool,
) -> error::Result<(Transaction<'c, Postgres>, Option<Uuid>)> {
let job_running = get_queued_job_tx(id, &w_id, &mut tx).await?;

if job_running.is_none() {
return Ok((tx, None));
}
let job_running = job_running.unwrap();

if ((job_running.running || job_running.root_job.is_some()) || (job_running.is_flow()))
&& !force_cancel
{
let id = sqlx::query_scalar!(
"UPDATE queue SET canceled = true, canceled_by = $1, canceled_reason = $2, scheduled_for = now(), suspend = 0 WHERE id = $3 AND workspace_id = $4 RETURNING id",
username,
reason,
id,
job_running.id,
w_id
)
.fetch_optional(&mut *tx)
Expand All @@ -172,15 +164,15 @@ pub async fn cancel_job<'c: 'async_recursion>(
.unwrap_or_else(|| "unexplicited reasons".to_string());
let e = serde_json::json!({"message": format!("Job canceled: {reason} by {username}"), "name": "Canceled", "reason": reason, "canceler": username});
append_logs(
id,
job_running.id,
w_id.to_string(),
format!("canceled by {username}: (force cancel: {force_cancel})"),
db,
)
.await;
let add_job = add_completed_job_error(
&db,
&job_running,
job_running,
job_running.mem_peak.unwrap_or(0),
Some(CanceledBy { username: Some(username.to_string()), reason: Some(reason) }),
e,
Expand All @@ -189,16 +181,73 @@ pub async fn cancel_job<'c: 'async_recursion>(
false,
)
.await;

if let Err(e) = add_job {
tracing::error!("Failed to add canceled job: {}", e);
}
}
if let Some(mut rsmq) = rsmq.clone() {
rsmq.change_message_visibility(&job_running.tag, &id.to_string(), 0)
rsmq.change_message_visibility(&job_running.tag, &job_running.id.to_string(), 0)
.await
.map_err(|e| anyhow::anyhow!(e))?;
}

Ok((tx, Some(job_running.id)))
}

pub async fn cancel_job<'c>(
username: &str,
reason: Option<String>,
id: Uuid,
w_id: &str,
mut tx: Transaction<'c, Postgres>,
db: &Pool<Postgres>,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
force_cancel: bool,
) -> error::Result<(Transaction<'c, Postgres>, Option<Uuid>)> {
let job = get_queued_job_tx(id, &w_id, &mut tx).await?;

if job.is_none() {
return Ok((tx, None));
}

let job = job.unwrap();

let (ntx, _) = cancel_single_job(
username,
reason.clone(),
&job,
w_id,
tx,
db,
rsmq.clone(),
force_cancel,
)
.await?;
tx = ntx;

// soft cancel parent if force cancel and job is a flow step
if force_cancel && job.is_flow_step {
if let Some(parent_job_id) = job.parent_job {
let job = get_queued_job_tx(parent_job_id, &w_id, &mut tx).await?;
if let Some(job) = job {
let (ntx, _) = cancel_single_job(
username,
reason.clone(),
&job,
w_id,
tx,
db,
rsmq.clone(),
false,
)
.await?;
tx = ntx;
}
}
}

// cancel children
let mut jobs = vec![id];
let mut jobs_to_cancel = vec![];
while !jobs.is_empty() {
Expand All @@ -213,19 +262,23 @@ pub async fn cancel_job<'c: 'async_recursion>(
jobs.extend(new_jobs.clone());
jobs_to_cancel.extend(new_jobs);
}
for job in jobs_to_cancel {
let (ntx, _) = cancel_job(
username,
reason.clone(),
job,
w_id,
tx,
db,
rsmq.clone(),
force_cancel,
)
.await?;
tx = ntx;
for job_id in jobs_to_cancel {
let job = get_queued_job_tx(job_id, &w_id, &mut tx).await?;

if let Some(job) = job {
let (ntx, _) = cancel_single_job(
username,
reason.clone(),
&job,
w_id,
tx,
db,
rsmq.clone(),
force_cancel,
)
.await?;
tx = ntx;
}
}
Ok((tx, Some(id)))
}
Expand Down Expand Up @@ -633,8 +686,19 @@ pub async fn add_completed_job<
if let Some(schedule) = schedule {
skip_downstream_error_handlers = schedule.ws_error_handler_muted;

if !queued_job.is_flow() {
// script only
// script or flow that failed on start and might not have been rescheduled
let schedule_next_tick = !queued_job.is_flow()
|| {
let flow_status = queued_job.parse_flow_status();
flow_status.is_some_and(|fs| {
fs.step == 0
&& fs.modules.get(0).is_some_and(|m| {
matches!(m, FlowStatusModule::WaitingForPriorSteps { .. }) || matches!(m, FlowStatusModule::Failure { job, ..} if job == &Uuid::nil())
})
})
};

if schedule_next_tick {
if let Err(err) = handle_maybe_scheduled_job(
rsmq.clone(),
db,
Expand Down
41 changes: 32 additions & 9 deletions backend/windmill-worker/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ pub async fn run_future_with_polling_update_job_poller<Fut, T>(
result_f: Fut,
worker_name: &str,
w_id: &str,
) -> anyhow::Result<T>
) -> error::Result<T>
where
Fut: Future<Output = anyhow::Result<T>>,
{
Expand Down Expand Up @@ -514,12 +514,22 @@ where
tracing::error!("Query timeout: {}", e);
Error::ExecutionErr(format!("Query timeout after (>{}s)", timeout_ms/1000))
})?,
_ = update_job, if job_id != Uuid::nil() => Err(Error::ExecutionErr("Job cancelled".to_string())).map_err(to_anyhow)?,
ex = update_job, if job_id != Uuid::nil() => {
match ex {
UpdateJobPollingExit::Done => Err(Error::ExecutionErr("Job cancelled".to_string())).map_err(to_anyhow)?,
UpdateJobPollingExit::AlreadyCompleted => Err(Error::AlreadyCompleted("Job already completed".to_string())).map_err(to_anyhow)?,
}
}
}?;
drop(tx);
Ok(rows)
}

/// Reason why `update_job_poller` stopped polling the `queue` row of a job.
///
/// Callers racing the poller inside a `select!` use this value to tell a
/// cancellation apart from a job that was completed while it was still running.
pub enum UpdateJobPollingExit {
/// Polling ended without finding the job missing from `queue`.
/// Returned immediately for a nil job id, and after the polling loop exits
/// (e.g. once the row is flagged `canceled`). The callers visible in this
/// change treat it as a cancellation ("Job cancelled" / `KillReason::Cancelled`).
Done,
/// The periodic `UPDATE queue ... RETURNING ...` ping matched no row.
/// The poller takes this to mean the job has already been moved out of the
/// queue into the completed jobs, so there is nothing left to cancel or report.
AlreadyCompleted,
}

pub async fn update_job_poller<F, Fut>(
job_id: Uuid,
db: &DB,
Expand All @@ -529,13 +539,14 @@ pub async fn update_job_poller<F, Fut>(
worker_name: &str,
w_id: &str,
mut rx: broadcast::Receiver<()>,
) where
) -> UpdateJobPollingExit
where
F: Fn() -> Fut,
Fut: Future<Output = i32>,
{
let update_job_interval = Duration::from_millis(500);
if job_id == Uuid::nil() {
return;
return UpdateJobPollingExit::Done;
}
let db = db.clone();

Expand Down Expand Up @@ -593,28 +604,36 @@ pub async fn update_job_poller<F, Fut>(
}
}

let (canceled, canceled_by, canceled_reason) = sqlx::query_as::<_, (bool, Option<String>, Option<String>)>("UPDATE queue SET mem_peak = $1, last_ping = now() WHERE id = $2 RETURNING canceled, canceled_by, canceled_reason")
let (canceled, canceled_by, canceled_reason, already_completed) = sqlx::query_as::<_, (bool, Option<String>, Option<String>, bool)>("UPDATE queue SET mem_peak = $1, last_ping = now() WHERE id = $2 RETURNING canceled, canceled_by, canceled_reason, false")
.bind(*mem_peak)
.bind(job_id)
.fetch_optional(&db)
.await
.unwrap_or_else(|e| {
tracing::error!(%e, "error updating job {job_id}: {e}");
Some((false, None, None))
Some((false, None, None, false))
})
.unwrap_or((false, None, None));
.unwrap_or_else(|| {
// if the job is not in queue, it can only be in the completed_job so it is already complete
(false, None, None, true)
});
if already_completed {
return UpdateJobPollingExit::AlreadyCompleted
}
if canceled {
canceled_by_ref.replace(CanceledBy {
username: canceled_by.clone(),
reason: canceled_reason.clone(),
});
break;
break
}
}
},
);
}
tracing::info!("job {job_id} finished");

UpdateJobPollingExit::Done
}

pub enum CompactLogs {
Expand Down Expand Up @@ -897,6 +916,7 @@ pub async fn handle_child(
TooManyLogs,
Timeout,
Cancelled,
AlreadyCompleted,
}

let (timeout_duration, timeout_warn_msg) =
Expand All @@ -914,7 +934,10 @@ pub async fn handle_child(
result = child.wait() => return result.map(Ok),
Ok(()) = too_many_logs.changed() => KillReason::TooManyLogs,
_ = sleep(timeout_duration) => KillReason::Timeout,
_ = update_job, if job_id != Uuid::nil() => KillReason::Cancelled,
ex = update_job, if job_id != Uuid::nil() => match ex {
UpdateJobPollingExit::Done => KillReason::Cancelled,
UpdateJobPollingExit::AlreadyCompleted => KillReason::AlreadyCompleted,
},
};
tx.send(()).expect("rx should never be dropped");
drop(tx);
Expand Down
11 changes: 9 additions & 2 deletions backend/windmill-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ pub async fn run_worker<R: rsmq_async::RsmqConnection + Send + Sync + Clone + 's
let tag = job.tag.clone();

let arc_job = Arc::new(job);
if let Some(err) = handle_queued_job(
if let Err(err) = handle_queued_job(
arc_job.clone(),
db,
&authed_client,
Expand All @@ -1686,7 +1686,6 @@ pub async fn run_worker<R: rsmq_async::RsmqConnection + Send + Sync + Clone + 's
worker_code_execution_duration.clone(),
)
.await
.err()
{
let is_init_script = arc_job.tag.as_str() == INIT_SCRIPT_TAG;
handle_job_error(
Expand Down Expand Up @@ -2831,6 +2830,14 @@ async fn handle_queued_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
if job.as_ref().workspace_id == "" {
return Ok(());
}

if result
.as_ref()
.is_err_and(|err| matches!(err, &Error::AlreadyCompleted(_)))
{
return Ok(());
}

process_result(
job,
result,
Expand Down
4 changes: 1 addition & 3 deletions frontend/src/lib/components/FlowMetadata.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
export let job: Job
const SMALL_ICON_SIZE = 14
let scheduleEditor: ScheduleEditor
export let scheduleEditor: ScheduleEditor
</script>

<ScheduleEditor bind:this={scheduleEditor} />

<div
class="rounded-md p-3 bg-surface-secondary shadow-sm sm:text-sm md:text-base"
style="min-height: 150px;"
Expand Down

0 comments on commit 010662d

Please sign in to comment.