56 changes: 29 additions & 27 deletions lib/bolt/core/src/context/service.rs
@@ -510,33 +510,35 @@ impl ServiceContextData {
);
}

let can_depend =
if self.is_monolith_worker() {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
} else if matches!(self.config().kind, ServiceKind::Api { .. }) {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
};
let can_depend = if self.is_monolith_worker() {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
} else if matches!(self.config().kind, ServiceKind::Api { .. }) {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::ApiRoutes { .. }
| ServiceKind::Consumer { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::Operation { .. }
| ServiceKind::Package { .. }
| ServiceKind::Consumer { .. }
)
};

if !can_depend {
panic!(
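For context, the `can_depend` blocks above are plain `matches!` allow-lists keyed on the dependency's kind, with API services additionally allowed to depend on `ApiRoutes`. A minimal standalone sketch of the same pattern, using a simplified unit-variant `ServiceKind` rather than the real bolt types:

```rust
// Sketch of the allow-list pattern above; the real ServiceKind variants carry
// struct data and there is an extra monolith-worker branch.
#[allow(dead_code)]
#[derive(Debug)]
enum ServiceKind {
    Database,
    Cache,
    Operation,
    Package,
    Consumer,
    Api,
    ApiRoutes,
}

fn can_depend(this: &ServiceKind, dep: &ServiceKind) -> bool {
    // API services may additionally depend on ApiRoutes; everything else
    // shares the same base allow-list.
    if matches!(this, ServiceKind::Api) {
        matches!(
            dep,
            ServiceKind::Database
                | ServiceKind::Cache
                | ServiceKind::Operation
                | ServiceKind::Package
                | ServiceKind::ApiRoutes
                | ServiceKind::Consumer
        )
    } else {
        matches!(
            dep,
            ServiceKind::Database
                | ServiceKind::Cache
                | ServiceKind::Operation
                | ServiceKind::Package
                | ServiceKind::Consumer
        )
    }
}

fn main() {
    assert!(can_depend(&ServiceKind::Api, &ServiceKind::ApiRoutes));
    assert!(!can_depend(&ServiceKind::Consumer, &ServiceKind::ApiRoutes));
}
```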
14 changes: 9 additions & 5 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
@@ -76,10 +76,16 @@ impl DatabasePgNats {
T: 'a,
{
let mut backoff = rivet_util::Backoff::new(3, None, QUERY_RETRY_MS, 50);
let mut i = 0;

for _ in 0..MAX_QUERY_RETRIES {
loop {
match cb().await {
Err(WorkflowError::Sqlx(err)) => {
i += 1;
if i > MAX_QUERY_RETRIES {
return Err(WorkflowError::MaxSqlRetries(err));
}

use sqlx::Error::*;
match &err {
// Retry transaction errors immediately
@@ -88,13 +94,13 @@ impl DatabasePgNats {
.message()
.contains("TransactionRetryWithProtoRefreshError")
{
tracing::info!(message=%db_err.message(), "transaction retry");
tracing::warn!(message=%db_err.message(), "transaction retry");
}
}
// Retry internal errors with a backoff
Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::info!(?err, "query retry");
tracing::warn!(?err, "query retry");
backoff.tick().await;
}
// Throw error
@@ -104,8 +110,6 @@ impl DatabasePgNats {
x => return x,
}
}

Err(WorkflowError::MaxSqlRetries)
}
}

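The retry rewrite above replaces the bounded `for` loop with a `loop` plus an explicit counter, so the final `sqlx` error can be handed to `MaxSqlRetries` instead of being dropped. A stripped-down sketch of that shape, with illustrative names only (the real code also distinguishes immediate transaction retries from backoff retries):

```rust
// Retry loop that preserves the last transient error, mirroring the shape of
// the pg_nats change; MAX_RETRIES and QueryError are illustrative stand-ins.
const MAX_RETRIES: usize = 4;

#[derive(Debug)]
enum QueryError {
    Transient(String),
    // Carries the last transient error, like MaxSqlRetries(sqlx::Error).
    MaxRetries(String),
}

async fn with_retries<F, Fut, T>(mut cb: F) -> Result<T, QueryError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, QueryError>>,
{
    let mut attempts = 0;
    loop {
        match cb().await {
            Err(QueryError::Transient(err)) => {
                attempts += 1;
                if attempts > MAX_RETRIES {
                    // Return the final error instead of discarding it.
                    return Err(QueryError::MaxRetries(err));
                }
                // A real implementation would back off here before retrying.
            }
            // Success or a non-retryable error is returned as-is.
            other => return other,
        }
    }
}
```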
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/error.rs
@@ -117,8 +117,8 @@ pub enum WorkflowError {
#[error("sql: {0}")]
Sqlx(sqlx::Error),

#[error("max sql retries")]
MaxSqlRetries,
#[error("max sql retries (last error: {0})")]
MaxSqlRetries(sqlx::Error),

#[error("pools: {0}")]
Pools(#[from] rivet_pools::Error),
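A self-contained illustration of what the new payload buys: `{0}` in the `thiserror` attribute pulls the preserved error into the `Display` output. `std::io::Error` stands in for `sqlx::Error` here:

```rust
// Minimal illustration of the MaxSqlRetries change, with std::io::Error
// standing in for sqlx::Error.
use thiserror::Error;

#[derive(Debug, Error)]
enum WorkflowError {
    #[error("max sql retries (last error: {0})")]
    MaxSqlRetries(std::io::Error),
}

fn main() {
    let err = WorkflowError::MaxSqlRetries(std::io::Error::new(
        std::io::ErrorKind::ConnectionReset,
        "connection reset by peer",
    ));
    // Prints: max sql retries (last error: connection reset by peer)
    println!("{err}");
}
```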
6 changes: 5 additions & 1 deletion svc/pkg/cluster/src/ops/server/prune_with_filter.rs
@@ -85,7 +85,11 @@ async fn run_for_linode_account(
tracing::info!("pruning {} servers", servers.len());

for server in servers {
let linode_id = unwrap_ref!(server.provider_server_id).parse()?;
let Some(linode_id) = &server.provider_server_id else {
tracing::warn!(server_id = ?server.server_id, "provider_server_id is none");
continue;
};
let linode_id = linode_id.parse()?;

tracing::info!("pruning {} (linode_id {})", server.server_id, linode_id);

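The `let … else` rewrite above downgrades a missing `provider_server_id` from a hard `unwrap_ref!` failure to a logged skip. A standalone sketch of the same control flow, with a simplified `Server` type:

```rust
// let-else skip pattern: log and continue instead of aborting the whole loop.
struct Server {
    server_id: u32,
    provider_server_id: Option<String>,
}

fn prune(servers: &[Server]) {
    for server in servers {
        // Servers without a provider id are skipped rather than failing the run.
        let Some(linode_id) = &server.provider_server_id else {
            eprintln!("server {} has no provider_server_id, skipping", server.server_id);
            continue;
        };
        println!("pruning server {} (linode_id {})", server.server_id, linode_id);
    }
}

fn main() {
    prune(&[
        Server { server_id: 1, provider_server_id: Some("123".into()) },
        Server { server_id: 2, provider_server_id: None },
    ]);
}
```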
5 changes: 1 addition & 4 deletions svc/pkg/cluster/src/workflows/datacenter/scale.rs
@@ -246,7 +246,7 @@ async fn inner(
.map(TryInto::try_into)
.collect::<GlobalResult<Vec<Server>>>()?;

// Sort job servers by memory usage
// Sort job servers by allocated memory
servers.sort_by_key(|server| memory_by_server.get(&server.server_id));

// TODO: RVT-3732 Sort gg and ats servers by cpu usage
@@ -388,7 +388,6 @@ async fn scale_down_job_servers(

let drain_candidates = nomad_servers
.iter()
.rev()
.take(drain_count)
.map(|server| server.server_id);

@@ -420,7 +419,6 @@ async fn scale_down_gg_servers<'a, I: Iterator<Item = &'a Server> + DoubleEndedI
tracing::info!(count=%drain_count, "draining gg servers");

let drain_candidates = installed_servers
.rev()
.take(drain_count)
.map(|server| server.server_id);

@@ -455,7 +453,6 @@ async fn scale_down_ats_servers<
tracing::info!(count=%drain_count, "draining ats servers");

let drain_candidates = installed_servers
.rev()
.take(drain_count)
.map(|server| server.server_id);

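Since the sort above is ascending, dropping `.rev()` means drain candidates are now taken from the front of the ordering, i.e. presumably the servers with the least allocated memory. A small sketch of that selection, with simplified stand-in types (the real code sorts `Server` structs by an `Option` lookup):

```rust
use std::collections::HashMap;

// Drain-candidate selection sketch: sort ascending by allocated memory, then
// take the first `drain_count` servers (no .rev(), so the least-allocated
// servers are chosen first).
fn drain_candidates(
    server_ids: &[u32],
    memory_by_server: &HashMap<u32, u64>,
    drain_count: usize,
) -> Vec<u32> {
    let mut servers = server_ids.to_vec();
    servers.sort_by_key(|id| memory_by_server.get(id).copied().unwrap_or(0));
    servers.into_iter().take(drain_count).collect()
}

fn main() {
    let memory = HashMap::from([(1, 500), (2, 100), (3, 300)]);
    // Server 2 has the least allocated memory, so it is drained first.
    assert_eq!(drain_candidates(&[1, 2, 3], &memory, 1), vec![2]);
}
```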
2 changes: 1 addition & 1 deletion svc/pkg/cluster/src/workflows/server/install/mod.rs
@@ -96,7 +96,7 @@ struct InstallOverSshInput {
}

#[activity(InstallOverSsh)]
#[timeout = 120]
#[timeout = 300]
#[max_retries = 10]
async fn install_over_ssh(ctx: &ActivityCtx, input: &InstallOverSshInput) -> GlobalResult<()> {
let public_ip = input.public_ip;
33 changes: 16 additions & 17 deletions svc/pkg/ds/src/workflows/server/nomad_alloc_plan.rs
@@ -2,7 +2,7 @@ use std::time::Duration;

use chirp_workflow::prelude::*;

use crate::util::{signal_allocation, NOMAD_CONFIG, NOMAD_REGION};
use crate::util::{NOMAD_CONFIG, NOMAD_REGION};

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2);
@@ -15,6 +15,7 @@ pub struct Input {

#[workflow]
pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let job_id = unwrap_ref!(input.alloc.job_id);
let alloc_id = unwrap_ref!(input.alloc.ID);
let nomad_node_id = unwrap_ref!(input.alloc.node_id, "alloc has no node id");

@@ -64,8 +65,8 @@ pub async fn ds_server_nomad_alloc_plan(ctx: &mut WorkflowCtx, input: &Input) ->
.await?;

if db_res.kill_alloc {
ctx.activity(KillAllocInput {
alloc_id: alloc_id.clone(),
ctx.activity(DeleteJobInput {
job_id: job_id.clone(),
})
.await?;
}
@@ -227,37 +228,35 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<Upd
.unwrap_or_default();

if kill_alloc {
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(server_id=%input.server_id, existing_alloc_id=?nomad_alloc_id, new_alloc_id=%input.alloc_id, "different allocation id given, killing job");
}

Ok(UpdateDbOutput { kill_alloc })
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocInput {
alloc_id: String,
struct DeleteJobInput {
job_id: String,
}

#[activity(KillAlloc)]
async fn kill_alloc(ctx: &ActivityCtx, input: &KillAllocInput) -> GlobalResult<()> {
if let Err(err) = signal_allocation(
#[activity(DeleteJob)]
async fn kill_alloc(ctx: &ActivityCtx, input: &DeleteJobInput) -> GlobalResult<()> {
if let Err(err) = nomad_client::apis::jobs_api::delete_job(
&NOMAD_CONFIG,
&input.alloc_id,
None,
&input.job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client_old::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?input.alloc_id,
"error while trying to manually kill allocation"
?input.job_id,
"error while trying to manually kill job"
);
}

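The activity above swaps per-allocation signaling for deleting the whole Nomad job, while keeping the best-effort error handling: a failed delete is logged, not propagated, so the workflow activity still succeeds. A generic sketch of that pattern; `delete_nomad_job` is a hypothetical stand-in for the `nomad_client::apis::jobs_api::delete_job` call shown in the diff:

```rust
// Best-effort cleanup sketch: the delete error is logged but not returned.
// delete_nomad_job is a hypothetical placeholder, not the real client API.
async fn cleanup_job(job_id: &str) {
    if let Err(err) = delete_nomad_job(job_id).await {
        tracing::warn!(?err, ?job_id, "error while trying to manually kill job");
    }
}

async fn delete_nomad_job(_job_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder for the actual Nomad API call.
    Ok(())
}
```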
1 change: 1 addition & 0 deletions svc/pkg/job-run/src/lib.rs
@@ -10,6 +10,7 @@ pub fn registry() -> WorkflowResult<Registry> {

let mut registry = Registry::new();
registry.register_workflow::<drain_all::Workflow>()?;
registry.register_workflow::<drain_all::Workflow2>()?;

Ok(registry)
}
2 changes: 1 addition & 1 deletion svc/pkg/job-run/src/workers/drain_all.rs
@@ -5,7 +5,7 @@ use proto::backend::pkg::*;
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
chirp_workflow::compat::workflow(
ctx,
crate::workflows::drain_all::Input {
crate::workflows::drain_all::Input2 {
nomad_node_id: ctx.nomad_node_id.clone(),
drain_timeout: ctx.drain_timeout,
},
27 changes: 9 additions & 18 deletions svc/pkg/job-run/src/workers/nomad_monitor_alloc_plan.rs
@@ -3,10 +3,7 @@ use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;

use crate::{
util::{signal_allocation, NOMAD_REGION},
workers::{NEW_NOMAD_CONFIG, NOMAD_CONFIG},
};
use crate::{util::NOMAD_REGION, workers::NEW_NOMAD_CONFIG};

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
@@ -320,27 +317,21 @@ async fn update_db(
.map(|id| id != &alloc_id)
.unwrap_or_default()
{
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing new allocation");
tracing::warn!(%run_id, existing_alloc_id=?run_row.alloc_id, new_alloc_id=%alloc_id, "different allocation id given, killing job");

if let Err(err) = signal_allocation(
&NOMAD_CONFIG,
&alloc_id,
None,
if let Err(err) = nomad_client_new::apis::jobs_api::delete_job(
&NEW_NOMAD_CONFIG,
&job_id,
Some(NOMAD_REGION),
None,
None,
Some(nomad_client::models::AllocSignalRequest {
task: None,
signal: Some("SIGKILL".to_string()),
}),
None,
Some(false),
None,
)
.await
{
tracing::warn!(
?err,
?alloc_id,
"error while trying to manually kill allocation"
);
tracing::warn!(?err, ?alloc_id, "error while trying to manually kill job");
}
}
}
45 changes: 45 additions & 0 deletions svc/pkg/job-run/src/workflows/drain_all.rs
@@ -44,6 +44,33 @@ pub async fn job_run_drain_all(ctx: &mut WorkflowCtx, input: &Input) -> GlobalRe
Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Input2 {
pub nomad_node_id: String,
pub drain_timeout: u64,
}

#[workflow(Workflow2)]
pub async fn job_run_drain_all2(ctx: &mut WorkflowCtx, input: &Input2) -> GlobalResult<()> {
// We fetch here so that when we kill allocs later, we don't refetch new job runs that might be on the
// nomad node. Only allocs fetched at this time will be killed.
let job_runs = ctx
.activity(FetchJobRunsInput {
nomad_node_id: input.nomad_node_id.clone(),
})
.await?;

ctx.sleep(input.drain_timeout.saturating_sub(DRAIN_PADDING))
.await?;

ctx.activity(StopJobRuns2Input {
run_ids: job_runs.iter().map(|jr| jr.run_id).collect(),
})
.await?;

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct FetchJobRunsInput {
nomad_node_id: String,
@@ -95,6 +122,24 @@ async fn stop_job_runs(ctx: &ActivityCtx, input: &StopJobRunsInput) -> GlobalRes
Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct StopJobRuns2Input {
run_ids: Vec<Uuid>,
}

#[activity(StopJobRuns2)]
async fn stop_job_runs2(ctx: &ActivityCtx, input: &StopJobRuns2Input) -> GlobalResult<()> {
for run_id in &input.run_ids {
msg!([ctx] job_run::msg::stop(run_id) {
run_id: Some((*run_id).into()),
skip_kill_alloc: false,
})
.await?;
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct KillAllocsInput {
nomad_node_id: String,
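One detail of the new `Workflow2` above: the job runs are captured before sleeping, and the sleep is the drain timeout minus a padding constant, clamped with `saturating_sub`. A tiny sketch of that computation, assuming the timeout is in milliseconds and using an illustrative value for `DRAIN_PADDING` (the real constant is presumably defined elsewhere in drain_all.rs):

```rust
// Drain sleep computation sketch; DRAIN_PADDING's value here is assumed for
// illustration only.
const DRAIN_PADDING: u64 = 30_000;

fn drain_sleep_ms(drain_timeout_ms: u64) -> u64 {
    // saturating_sub clamps to zero instead of underflowing when the
    // configured timeout is shorter than the padding.
    drain_timeout_ms.saturating_sub(DRAIN_PADDING)
}

fn main() {
    assert_eq!(drain_sleep_ms(300_000), 270_000);
    assert_eq!(drain_sleep_ms(10_000), 0);
}
```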
2 changes: 1 addition & 1 deletion svc/pkg/linode/src/util/api.rs
@@ -305,7 +305,7 @@ pub async fn wait_disk_ready(client: &Client, linode_id: u64, disk_id: u64) -> G
loop {
let res = client
.inner()
.get(&format!(
.get(format!(
"https://api.linode.com/v4/linode/instances/{linode_id}/disks/{disk_id}"
))
.send()