Skip to content

Commit bd3d75f

Browse files
committed
fix(job-run): delete second allocation immediately
1 parent f526f3a commit bd3d75f

File tree

5 files changed

+224
-158
lines changed

5 files changed

+224
-158
lines changed

svc/pkg/ds/src/workers/nomad_monitor_eval_update.rs

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -63,41 +63,42 @@ async fn worker(
6363
}
6464
};
6565

66-
// Fetch and update the run
67-
let server_row = sql_fetch_optional!(
68-
[ctx, ServerRow]
69-
"
70-
SELECT server_id, datacenter_id, nomad_eval_plan_ts
71-
UPDATE db_ds.server_nomad
72-
SET nomad_eval_plan_ts = $2
73-
WHERE
74-
nomad_dispatched_job_id = $1 AND
75-
run_meta_nomad.nomad_eval_plan_ts IS NULL
76-
",
77-
job_id,
78-
ctx.ts(),
79-
)
80-
.await?;
81-
82-
// Check if server found
83-
let Some(server_row) = server_row else {
84-
if ctx.req_dt() > util::duration::minutes(5) {
85-
tracing::error!("discarding stale message");
86-
return Ok(());
87-
} else {
88-
retry_bail!("server not found, may be race condition with insertion");
89-
}
90-
};
91-
let server_id = server_row.server_id;
66+
// TODO: Rewrite on workflows
9267

93-
if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
94-
tracing::info!(?eval_plan_ts, "eval already planned");
95-
return Ok(());
96-
}
68+
// // Fetch and update the run
69+
// let server_row = sql_fetch_optional!(
70+
// [ctx, ServerRow]
71+
// "
72+
// UPDATE db_ds.server_nomad
73+
// SET nomad_eval_plan_ts = $2
74+
// WHERE
75+
// nomad_dispatched_job_id = $1 AND
76+
// nomad_eval_plan_ts IS NULL
77+
// RETURNING server_id, datacenter_id, nomad_eval_plan_ts
78+
// ",
79+
// job_id,
80+
// ctx.ts(),
81+
// )
82+
// .await?;
83+
84+
// // Check if server found
85+
// let Some(server_row) = server_row else {
86+
// if ctx.req_dt() > util::duration::minutes(5) {
87+
// tracing::error!("discarding stale message");
88+
// return Ok(());
89+
// } else {
90+
// retry_bail!("server not found, may be race condition with insertion");
91+
// }
92+
// };
93+
// let server_id = server_row.server_id;
9794

98-
tracing::info!(%job_id, %server_id, ?eval_status, "updated server");
95+
// if let Some(eval_plan_ts) = server_row.nomad_eval_plan_ts {
96+
// tracing::info!(?eval_plan_ts, "eval already planned");
97+
// return Ok(());
98+
// }
99+
100+
// tracing::info!(%job_id, %server_id, ?eval_status, "updated server");
99101

100-
// TODO: Rewrite on workflows
101102
// match eval_status {
102103
// EvalStatus::Failed => {
103104
// tracing::info!(%server_id, "eval failed");

svc/pkg/job-run/src/util.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub const NOMAD_REGION: &str = "global";
2+
13
// Have to patch `nomad_client::apis::allocations_api::signal_allocation` because it uses `/allocation`
24
// instead of `/client/allocation`
35
pub async fn signal_allocation(

svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use proto::backend::{self, pkg::*};
33
use redis::AsyncCommands;
44
use serde::Deserialize;
55

6-
use crate::workers::NEW_NOMAD_CONFIG;
6+
use crate::{
7+
util::{signal_allocation, NOMAD_REGION},
8+
workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
9+
};
710

811
#[derive(Debug, Deserialize)]
912
#[serde(rename_all = "PascalCase")]
@@ -229,11 +232,12 @@ async fn update_db(
229232
let run_row = sql_fetch_optional!(
230233
[ctx, RunRow, @tx tx]
231234
"
232-
SELECT runs.run_id, runs.region_id, runs.stop_ts, run_meta_nomad.alloc_plan_ts
233-
FROM db_job_state.run_meta_nomad
234-
INNER JOIN db_job_state.runs ON runs.run_id = run_meta_nomad.run_id
235+
SELECT r.run_id, r.region_id, r.stop_ts, rn.alloc_plan_ts
236+
FROM db_job_state.run_meta_nomad AS rn
237+
INNER JOIN db_job_state.runs AS r
238+
ON r.run_id = rn.run_id
235239
WHERE dispatched_job_id = $1
236-
FOR UPDATE OF run_meta_nomad
240+
FOR UPDATE OF rn
237241
",
238242
&job_id,
239243
)
@@ -255,7 +259,13 @@ async fn update_db(
255259
[ctx, @tx tx]
256260
"
257261
UPDATE db_job_state.run_meta_nomad
258-
SET alloc_id = $2, alloc_plan_ts = $3, node_id = $4, node_name = $5, node_public_ipv4 = $6, node_vlan_ipv4 = $7
262+
SET
263+
alloc_id = $2,
264+
alloc_plan_ts = $3,
265+
node_id = $4,
266+
node_name = $5,
267+
node_public_ipv4 = $6,
268+
node_vlan_ipv4 = $7
259269
WHERE run_id = $1
260270
",
261271
run_row.run_id,
@@ -301,6 +311,29 @@ async fn update_db(
301311
)
302312
.await?;
303313
}
314+
} else {
315+
tracing::warn!(%run_id, %alloc_id, "run was already allocated before, killing new allocation");
316+
317+
if let Err(err) = signal_allocation(
318+
&NOMAD_CONFIG,
319+
&alloc_id,
320+
None,
321+
Some(NOMAD_REGION),
322+
None,
323+
None,
324+
Some(nomad_client::models::AllocSignalRequest {
325+
task: None,
326+
signal: Some("SIGKILL".to_string()),
327+
}),
328+
)
329+
.await
330+
{
331+
tracing::warn!(
332+
?err,
333+
?alloc_id,
334+
"error while trying to manually kill allocation"
335+
);
336+
}
304337
}
305338

306339
// Update the run ports

0 commit comments

Comments
 (0)