Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions svc/pkg/cluster/src/workflows/datacenter/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,6 @@ async fn inner(
// Sort job servers by allocated memory
servers.sort_by_key(|server| memory_by_server.get(&server.server_id));

// TODO: remove
tracing::info!(server_ids=?servers.iter().map(|s| s.server_id).collect::<Vec<_>>(), ?memory_by_server, "server topo");

// TODO: RVT-3732 Sort gg and ats servers by cpu usage
// servers.sort_by_key

Expand Down
1 change: 1 addition & 0 deletions svc/pkg/job-run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub fn registry() -> WorkflowResult<Registry> {

let mut registry = Registry::new();
registry.register_workflow::<drain_all::Workflow>()?;
registry.register_workflow::<drain_all::Workflow2>()?;

Ok(registry)
}
21 changes: 10 additions & 11 deletions svc/pkg/job-run/src/workers/drain_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use proto::backend::pkg::*;

// Handles a `job_run::msg::drain_all` message by dispatching the drain-all
// workflow for the Nomad node named in the message.
//
// The message's `nomad_node_id` and `drain_timeout` are forwarded unchanged as
// the workflow's `Input2`. Any error from building or dispatching the workflow
// is propagated to the caller.
#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
    chirp_workflow::compat::workflow(
        ctx,
        crate::workflows::drain_all::Input2 {
            nomad_node_id: ctx.nomad_node_id.clone(),
            drain_timeout: ctx.drain_timeout,
        },
    )
    .await?
    .dispatch()
    .await?;

    Ok(())
}
6 changes: 1 addition & 5 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,7 @@ async fn update_db(
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill job"
);
tracing::warn!(?err, ?alloc_id, "error while trying to manually kill job");
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions svc/pkg/job-run/src/workflows/drain_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ pub async fn job_run_drain_all(ctx: &mut WorkflowCtx, input: &Input) -> GlobalRe
Ok(())
}

/// Input for the `Workflow2` variant of the drain-all workflow.
#[derive(Debug, Serialize, Deserialize)]
pub struct Input2 {
    /// ID of the Nomad node whose job runs should be drained.
    pub nomad_node_id: String,
    /// Total drain timeout; the workflow sleeps for this value minus
    /// `DRAIN_PADDING` before stopping job runs.
    /// NOTE(review): unit is presumably milliseconds -- confirm against `ctx.sleep`.
    pub drain_timeout: u64,
}

// Drain-all workflow (second version): snapshot the job runs on a Nomad node,
// wait out the drain timeout (less `DRAIN_PADDING`), then stop exactly the job
// runs that were snapshotted.
#[workflow(Workflow2)]
pub async fn job_run_drain_all2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> {
    // Take the snapshot up front: job runs that land on the node after this
    // point are intentionally left alone, since only the runs fetched here are
    // stopped below.
    let fetch_input = FetchJobRunsInput {
        nomad_node_id: input.nomad_node_id.clone(),
    };
    let job_runs = ctx.activity(fetch_input).await?;

    // Saturating subtraction keeps the wait at zero when the timeout is
    // shorter than the padding instead of underflowing.
    let wait = input.drain_timeout.saturating_sub(DRAIN_PADDING);
    ctx.sleep(wait).await?;

    let run_ids: Vec<_> = job_runs.iter().map(|run| run.run_id).collect();
    ctx.activity(StopJobRuns2Input { run_ids }).await?;

    Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct FetchJobRunsInput {
nomad_node_id: String,
Expand Down Expand Up @@ -95,6 +122,24 @@ async fn stop_job_runs(ctx: &ActivityCtx, input: &StopJobRunsInput) -> GlobalRes
Ok(())
}

/// Input for the `StopJobRuns2` activity.
#[derive(Debug, Serialize, Deserialize, Hash)]
struct StopJobRuns2Input {
    /// IDs of the job runs to publish a stop message for.
    run_ids: Vec<Uuid>,
}

// Publishes a `job_run::msg::stop` message for every run ID in the input, one
// at a time and in order. The first failed publish aborts the activity and is
// returned to the caller.
#[activity(StopJobRuns2)]
async fn stop_job_runs2(ctx: &ActivityCtx, input: &StopJobRuns2Input) -> GlobalResult<()> {
    for id in input.run_ids.iter() {
        // `skip_kill_alloc: false` is passed explicitly on every stop message.
        msg!([ctx] job_run::msg::stop(id) {
            run_id: Some((*id).into()),
            skip_kill_alloc: false,
        })
        .await?;
    }

    Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocsInput {
nomad_node_id: String,
Expand Down
29 changes: 2 additions & 27 deletions svc/pkg/mm/worker/src/workers/nomad_node_closed_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,9 @@ async fn worker(
}

pipe.query_async(&mut ctx.redis_mm().await?).await?;
} else {
let mut script = REDIS_SCRIPT.prepare_invoke();

script.arg(lobby_rows.len());

for lobby in lobby_rows {
script
.key(util_mm::key::lobby_config(lobby.lobby_id))
.key(util_mm::key::lobby_player_ids(lobby.lobby_id))
.key(util_mm::key::lobby_available_spots(
lobby.namespace_id,
datacenter_id,
lobby.lobby_group_id,
util_mm::JoinKind::Normal,
))
.key(util_mm::key::lobby_available_spots(
lobby.namespace_id,
datacenter_id,
lobby.lobby_group_id,
util_mm::JoinKind::Party,
))
.arg(lobby.lobby_id.to_string())
.arg(lobby.max_players_normal)
.arg(lobby.max_players_party);
}

script.invoke_async(&mut ctx.redis_mm().await?).await?;
}

// NOTE: Don't do anything on undrain

Ok(())
}