Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use chirp_workflow::prelude::*;

use crate::util::{signal_allocation, NOMAD_CONFIG, NOMAD_REGION};
use crate::util::{NOMAD_CONFIG, NOMAD_REGION};

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2);
Expand All @@ -15,6 +15,7 @@ pub struct Input {

#[workflow]
pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let job_id = unwrap_ref!(input.alloc.job_id);
let alloc_id = unwrap_ref!(input.alloc.ID);
let nomad_node_id = unwrap_ref!(input.alloc.node_id, "alloc has no node id");

Expand Down Expand Up @@ -64,8 +65,8 @@ pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) ->
.await?;

if db_res.kill_alloc {
ctx.activity(KillAllocInput {
alloc_id: alloc_id.clone(),
ctx.activity(DeleteJobInput {
job_id: job_id.clone(),
})
.await?;
}
Expand Down Expand Up @@ -227,37 +228,35 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
.unwrap_or_default();

if kill_alloc {
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing job");
}

Ok(UpdateDbOutput { kill_alloc })
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocInput {
alloc_id: String,
struct DeleteJobInput {
job_id: String,
}

#[activity(KillAlloc)]
async fn kill_alloc(ctx: &ActivityCtx, input: &KillAllocInput) -> GlobalResult<()> {
if let Err(err) = signal_allocation(
#[activity(DeleteJob)]
async fn kill_alloc(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<()> {
if let Err(err) = nomad_client::apis::jobs_api::delete_job(
&NOMAD_CONFIG,
&input.alloc_id,
None,
&input.job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client_old::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?input.alloc_id,
"error while trying to manually kill allocation"
?input.job_id,
"error while trying to manually kill job"
);
}

Expand Down
21 changes: 11 additions & 10 deletions svc/pkg/job-run/src/workers/drain_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use proto::backend::pkg::*;

#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
chirp_workflow::compat::workflow(
ctx,
crate::workflows::drain_all::Input {
nomad_node_id: ctx.nomad_node_id.clone(),
drain_timeout: ctx.drain_timeout,
},
)
.await?
.dispatch()
.await?;
// TODO: Disabled for now
// chirp_workflow::compat::workflow(
// ctx,
// crate::workflows::drain_all::Input {
// nomad_node_id: ctx.nomad_node_id.clone(),
// drain_timeout: ctx.drain_timeout,
// },
// )
// .await?
// .dispatch()
// .await?;

Ok(())
}
23 changes: 9 additions & 14 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;

use crate::{
util::{signal_allocation, NOMAD_REGION},
workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
};
use crate::{util::NOMAD_REGION, workers::NEW_NOMAD_CONFIG};

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down Expand Up @@ -320,26 +317,24 @@ async fn update_db(
.map(|id| id != &alloc_id)
.unwrap_or_default()
{
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing job");

if let Err(err) = signal_allocation(
&NOMAD_CONFIG,
&alloc_id,
None,
if let Err(err) = nomad_client_new::apis::jobs_api::delete_job(
&NEW_NOMAD_CONFIG,
&job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill allocation"
"error while trying to manually kill job"
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/linode/src/util/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub async fn wait_disk_ready(client: &Client, linode_id: u64, disk_id: u64) -> G
loop {
let res = client
.inner()
.get(&format!(
.get(format!(
"https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}"
))
.send()
Expand Down
8 changes: 4 additions & 4 deletions svc/pkg/linode/src/util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Client {
let res = self
.request(
self.inner
.get(&format!("https://api.linode.com/v4{endpoint}")),
.get(format!("https://api.linode.com/v4{endpoint}")),
None,
false,
)
Expand All @@ -132,7 +132,7 @@ impl Client {
pub async fn delete(&self, endpoint: &str) -> GlobalResult<()> {
self.request(
self.inner
.delete(&format!("https://api.linode.com/v4{endpoint}")),
.delete(format!("https://api.linode.com/v4{endpoint}")),
None,
true,
)
Expand All @@ -149,7 +149,7 @@ impl Client {
let res = self
.request(
self.inner
.post(&format!("https://api.linode.com/v4{endpoint}"))
.post(format!("https://api.linode.com/v4{endpoint}"))
.header("content-type", "application/json"),
Some(body),
false,
Expand All @@ -162,7 +162,7 @@ impl Client {
pub async fn post_no_res(&self, endpoint: &str, body: serde_json::Value) -> GlobalResult<()> {
self.request(
self.inner
.post(&format!("https://api.linode.com/v4{endpoint}"))
.post(format!("https://api.linode.com/v4{endpoint}"))
.header("content-type", "application/json"),
Some(body),
false,
Expand Down
1 change: 1 addition & 0 deletions svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ struct CreateInstanceOutput {
}

#[activity(CreateInstance)]
#[timeout = 120]
async fn create_instance(
ctx: &ActivityCtx,
input: &CreateInstanceInput,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use chirp_workflow::prelude::*;
use rivet_operation::prelude::proto::backend::pkg::nomad;
use serde::Deserialize;
use serde_json::json;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down