diff --git a/lib/chirp-workflow/core/src/builder/common/message.rs b/lib/chirp-workflow/core/src/builder/common/message.rs index 3134908464..4837eab3c3 100644 --- a/lib/chirp-workflow/core/src/builder/common/message.rs +++ b/lib/chirp-workflow/core/src/builder/common/message.rs @@ -1,6 +1,6 @@ use std::fmt::Display; -use global_error::{GlobalError, GlobalResult}; +use global_error::GlobalResult; use serde::Serialize; use crate::{builder::BuilderError, ctx::MessageCtx, message::Message}; @@ -10,7 +10,7 @@ pub struct MessageBuilder<'a, M: Message> { body: M, tags: serde_json::Map<String, serde_json::Value>, wait: bool, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<'a, M: Message> MessageBuilder<'a, M> { @@ -66,7 +66,7 @@ impl<'a, M: Message> MessageBuilder<'a, M> { pub async fn send(self) -> GlobalResult<()> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } tracing::info!(msg_name=%M::NAME, tags=?self.tags, "dispatching message"); diff --git a/lib/chirp-workflow/core/src/builder/common/signal.rs b/lib/chirp-workflow/core/src/builder/common/signal.rs index f112754dca..a0e4733d9c 100644 --- a/lib/chirp-workflow/core/src/builder/common/signal.rs +++ b/lib/chirp-workflow/core/src/builder/common/signal.rs @@ -12,7 +12,7 @@ pub struct SignalBuilder<T: Signal + Serialize> { body: T, to_workflow_id: Option<Uuid>, tags: serde_json::Map<String, serde_json::Value>, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<T: Signal + Serialize> SignalBuilder<T> { @@ -69,7 +69,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> { pub async fn send(self) -> GlobalResult<Uuid> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } let signal_id = Uuid::new_v4(); diff --git a/lib/chirp-workflow/core/src/builder/common/workflow.rs b/lib/chirp-workflow/core/src/builder/common/workflow.rs index ffb55655d2..4d207471d6 100644 --- a/lib/chirp-workflow/core/src/builder/common/workflow.rs +++ b/lib/chirp-workflow/core/src/builder/common/workflow.rs @@ -17,7 +17,7 @@ pub struct WorkflowBuilder<I: WorkflowInput> { ray_id: Uuid, input: I, tags: serde_json::Map<String, serde_json::Value>, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<I: WorkflowInput> WorkflowBuilder<I> where @@ -66,7 +66,7 @@ where pub async fn dispatch(self) -> GlobalResult<Uuid> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } let workflow_name = I::Workflow::NAME; diff --git a/lib/chirp-workflow/core/src/builder/mod.rs b/lib/chirp-workflow/core/src/builder/mod.rs index 0509add1c7..13137f00c5 100644 --- a/lib/chirp-workflow/core/src/builder/mod.rs +++ b/lib/chirp-workflow/core/src/builder/mod.rs @@ -11,4 +11,9 @@ pub(crate) enum BuilderError { NoWorkflowIdOrTags, #[error("cannot dispatch a workflow/signal from an operation within a workflow execution.
trigger it from the workflow's body")] CannotDispatchFromOpInWorkflow, + #[error("using tags on a sub workflow ({0}) with `.output()` is not supported")] + TagsOnSubWorkflowOutputNotSupported(&'static str), + + #[error("serde: {0}")] + Serde(#[from] serde_json::Error), } diff --git a/lib/chirp-workflow/core/src/builder/workflow/message.rs b/lib/chirp-workflow/core/src/builder/workflow/message.rs index 5351723138..a5e17ce434 100644 --- a/lib/chirp-workflow/core/src/builder/workflow/message.rs +++ b/lib/chirp-workflow/core/src/builder/workflow/message.rs @@ -12,7 +12,7 @@ pub struct MessageBuilder<'a, M: Message> { body: M, tags: serde_json::Map<String, serde_json::Value>, wait: bool, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<'a, M: Message> MessageBuilder<'a, M> { @@ -68,7 +68,7 @@ impl<'a, M: Message> MessageBuilder<'a, M> { pub async fn send(self) -> GlobalResult<()> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } let event = self.ctx.current_history_event(); diff --git a/lib/chirp-workflow/core/src/builder/workflow/signal.rs b/lib/chirp-workflow/core/src/builder/workflow/signal.rs index c06489e262..c2d7706067 100644 --- a/lib/chirp-workflow/core/src/builder/workflow/signal.rs +++ b/lib/chirp-workflow/core/src/builder/workflow/signal.rs @@ -13,7 +13,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> { body: T, to_workflow_id: Option<Uuid>, tags: serde_json::Map<String, serde_json::Value>, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { @@ -69,7 +69,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { pub async fn send(self) -> GlobalResult<Uuid> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } let event = self.ctx.current_history_event(); diff --git a/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs b/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs index c0cfe46f3f..b4422fa67b 100644 --- a/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs +++ b/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs @@ -7,7 +7,8 @@ use uuid::Uuid; use crate::{ builder::BuilderError, ctx::WorkflowCtx, - error::WorkflowError, + error::{WorkflowError, WorkflowResult}, + event::Event, workflow::{Workflow, WorkflowInput}, }; @@ -15,7 +16,7 @@ pub struct SubWorkflowBuilder<'a, I: WorkflowInput> { ctx: &'a mut WorkflowCtx, input: I, tags: serde_json::Map<String, serde_json::Value>, - error: Option<GlobalError>, + error: Option<BuilderError>, } impl<'a, I: WorkflowInput> SubWorkflowBuilder<'a, I> where @@ -63,67 +64,32 @@ where pub async fn dispatch(self) -> GlobalResult<Uuid> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } - let sub_workflow_name = I::Workflow::NAME; - let sub_workflow_id = Uuid::new_v4(); - - let no_tags = self.tags.is_empty(); - let tags = serde_json::Value::Object(self.tags); - let tags = if no_tags { None } else { Some(&tags) }; - - tracing::info!( - name=%self.ctx.name(), - id=%self.ctx.workflow_id(), - %sub_workflow_name, - %sub_workflow_id, - ?tags, - input=?self.input, - "dispatching sub workflow" - ); - - // Serialize input - let input_val = serde_json::to_value(&self.input) - .map_err(WorkflowError::SerializeWorkflowOutput) - .map_err(GlobalError::raw)?; + let tags = if self.tags.is_empty() { + None + } else { + Some(serde_json::Value::Object(self.tags)) + }; - self.ctx - .db() - .dispatch_sub_workflow( - self.ctx.ray_id(), - self.ctx.workflow_id(), - self.ctx.full_location().as_ref(), - sub_workflow_id, - &sub_workflow_name, - tags, - input_val, - self.ctx.loop_location(), - ) + Self::dispatch_workflow_inner(self.ctx,
self.input, tags) .await - .map_err(GlobalError::raw)?; - - tracing::info!( - name=%self.ctx.name(), - id=%self.ctx.workflow_id(), - %sub_workflow_name, - ?sub_workflow_id, - "sub workflow dispatched" - ); - - Ok(sub_workflow_id) + .map_err(GlobalError::raw) } pub async fn output( self, ) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output> { if let Some(err) = self.error { - return Err(err); + return Err(err.into()); } - let no_tags = self.tags.is_empty(); - let tags = serde_json::Value::Object(self.tags); - let tags = if no_tags { None } else { Some(&tags) }; + if !self.tags.is_empty() { + return Err( + BuilderError::TagsOnSubWorkflowOutputNotSupported(I::Workflow::NAME).into(), + ); + } // Lookup workflow let Ok(workflow) = self.ctx.registry().get_workflow(I::Workflow::NAME) else { @@ -134,11 +100,19 @@ where "sub workflow not found in current registry", ); + let tags = if self.tags.is_empty() { + None + } else { + Some(serde_json::Value::Object(self.tags)) + }; + // TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub // worker in the registry, this will diverge in history because it will try to run the sub worker // in-process during the replay // If the workflow isn't in the current registry, dispatch the workflow instead - let sub_workflow_id = self.ctx.dispatch_workflow_inner(tags, self.input).await?; + let sub_workflow_id = Self::dispatch_workflow_inner(self.ctx, self.input, tags) + .await + .map_err(GlobalError::raw)?; let output = self .ctx .wait_for_workflow::<I::Workflow>(sub_workflow_id) @@ -168,4 +142,93 @@ where Ok(output) } + + async fn dispatch_workflow_inner( + ctx: &mut WorkflowCtx, + input: I, + tags: Option<serde_json::Value>, + ) -> WorkflowResult<Uuid> + where + I: WorkflowInput, + <I as WorkflowInput>::Workflow: Workflow<Input = I>, + { + let event = ctx.current_history_event(); + + // Signal received before + let id = if let Some(event) = event { + // Validate history is consistent + let Event::SubWorkflow(sub_workflow) = event else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {event} at {}, found sub workflow {}", + ctx.loc(), + I::Workflow::NAME + ))); + }; + + if sub_workflow.name != I::Workflow::NAME { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {event} at {}, found sub_workflow {}", + ctx.loc(), + I::Workflow::NAME + ))); + } + + tracing::debug!( + name=%ctx.name(), + id=%ctx.workflow_id(), + sub_workflow_name=%sub_workflow.name, + sub_workflow_id=%sub_workflow.sub_workflow_id, + "replaying workflow dispatch" + ); + + sub_workflow.sub_workflow_id + } + // Dispatch new workflow + else { + let sub_workflow_name = I::Workflow::NAME; + let sub_workflow_id = Uuid::new_v4(); + + tracing::info!( + name=%ctx.name(), + id=%ctx.workflow_id(), + %sub_workflow_name, + %sub_workflow_id, + ?tags, + ?input, + "dispatching sub workflow" + ); + + // Serialize input + let input_val = + serde_json::to_value(input).map_err(WorkflowError::SerializeWorkflowOutput)?; + + ctx.db() + .dispatch_sub_workflow( + ctx.ray_id(), + ctx.workflow_id(), + ctx.full_location().as_ref(), + sub_workflow_id, + &sub_workflow_name, + tags.as_ref(), + input_val, + ctx.loop_location(), + ) + .await?; + + tracing::info!( + name=%ctx.name(), + id=%ctx.workflow_id(), + %sub_workflow_name, + ?sub_workflow_id, + "sub workflow dispatched" + ); + + sub_workflow_id + }; + + // Move to next event + ctx.inc_location(); + + Ok(id) + } } diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 639d5e41d3..e990e244d4 100644 ---
a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -395,100 +395,6 @@ impl WorkflowCtx { } impl WorkflowCtx { - /// Used internally to dispatch a workflow. Use `WorkflowCtx::workflow` instead. - pub(crate) async fn dispatch_workflow_inner<I>( - &mut self, - tags: Option<&serde_json::Value>, - input: I, - ) -> GlobalResult<Uuid> - where - I: WorkflowInput, - <I as WorkflowInput>::Workflow: Workflow<Input = I>, - { - let event = self.current_history_event(); - - // Signal received before - let id = if let Some(event) = event { - // Validate history is consistent - let Event::SubWorkflow(sub_workflow) = event else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found sub workflow {}", - self.loc(), - I::Workflow::NAME - ))) - .map_err(GlobalError::raw); - }; - - if sub_workflow.name != I::Workflow::NAME { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found sub_workflow {}", - self.loc(), - I::Workflow::NAME - ))) - .map_err(GlobalError::raw); - } - - tracing::debug!( - name=%self.name, - id=%self.workflow_id, - sub_workflow_name=%sub_workflow.name, - sub_workflow_id=%sub_workflow.sub_workflow_id, - "replaying workflow dispatch" - ); - - sub_workflow.sub_workflow_id - } - // Dispatch new workflow - else { - let sub_workflow_name = I::Workflow::NAME; - let sub_workflow_id = Uuid::new_v4(); - - tracing::info!( - name=%self.name, - id=%self.workflow_id, - %sub_workflow_name, - %sub_workflow_id, - ?tags, - ?input, - "dispatching sub workflow" - ); - - // Serialize input - let input_val = serde_json::to_value(input) - .map_err(WorkflowError::SerializeWorkflowOutput) - .map_err(GlobalError::raw)?; - - self.db - .dispatch_sub_workflow( - self.ray_id, - self.workflow_id, - self.full_location().as_ref(), - sub_workflow_id, - &sub_workflow_name, - tags, - input_val, - self.loop_location(), - ) - .await - .map_err(GlobalError::raw)?; - - tracing::info!( - name=%self.name, - id=%self.workflow_id, - %sub_workflow_name, - ?sub_workflow_id, - "sub workflow dispatched" - ); - - sub_workflow_id - }; - - // Move to next event - self.inc_location(); - - Ok(id) - } - /// Wait for another workflow's response. If no response was found after polling the database, this /// workflow will go to sleep until the sub workflow completes. pub async fn wait_for_workflow<W: Workflow>( diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index 27d863c012..7184d4e285 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -19,7 +19,7 @@ use crate::{ }; /// Max amount of workflows pulled from the database with each call to `pull_workflows`. -const MAX_PULLED_WORKFLOWS: i64 = 10; +const MAX_PULLED_WORKFLOWS: i64 = 50; // Base retry for query retry backoff const QUERY_RETRY_MS: usize = 750; /// Maximum times a query run by this database adapter is retried.
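For context on the call-site changes that follow: the old helpers took tags as an ad-hoc JSON blob up front, while the new builders attach tags key-by-key and defer builder misuse to a BuilderError surfaced at send time. A minimal before/after sketch of the shape applied throughout the rest of this diff (`MySignal` is a hypothetical signal type; error handling is elided):

// Before: tags passed as a JSON blob alongside the signal body
ctx.tagged_signal(
    &json!({ "server_id": server_id }),
    MySignal { /* ... */ },
)
.await?;

// After: tags attached on the builder, validated when send() runs
ctx.signal(MySignal { /* ... */ })
    .tag("server_id", server_id)
    .send()
    .await?;

Workflows follow the same shape with `ctx.workflow(input).tag(...).dispatch()`, and sub workflows awaited in place use `.output()`, which replaces the old `.run()`.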
diff --git a/svc/api/admin/src/route/clusters/datacenters.rs b/svc/api/admin/src/route/clusters/datacenters.rs index 6a2ee2824a..f02db51172 100644 --- a/svc/api/admin/src/route/clusters/datacenters.rs +++ b/svc/api/admin/src/route/clusters/datacenters.rs @@ -95,25 +95,22 @@ pub async fn create( })) .await?; - ctx.tagged_signal( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::DatacenterCreate { - datacenter_id, - name_id: body.name_id.clone(), - display_name: body.display_name.clone(), - - provider: body.provider.api_into(), - provider_datacenter_id: body.provider_datacenter_id.clone(), - provider_api_token: None, - - pools, - - build_delivery_method: body.build_delivery_method.api_into(), - prebakes_enabled: body.prebakes_enabled, - }, - ) + ctx.signal(cluster::workflows::cluster::DatacenterCreate { + datacenter_id, + name_id: body.name_id.clone(), + display_name: body.display_name.clone(), + + provider: body.provider.api_into(), + provider_datacenter_id: body.provider_datacenter_id.clone(), + provider_api_token: None, + + pools, + + build_delivery_method: body.build_delivery_method.api_into(), + prebakes_enabled: body.prebakes_enabled, + }) + .tag("cluster_id", cluster_id) + .send() .await?; sub.next().await?; @@ -142,20 +139,17 @@ pub async fn update( bail_with!(CLUSTER_DATACENTER_NOT_IN_CLUSTER); } - ctx.tagged_signal( - &json!({ - "datacenter_id": datacenter_id, - }), - cluster::workflows::datacenter::Update { - pools: body - .pools - .iter() - .cloned() - .map(ApiInto::api_into) - .collect::<Vec<_>>(), - prebakes_enabled: body.prebakes_enabled, - }, - ) + ctx.signal(cluster::workflows::datacenter::Update { + pools: body + .pools + .iter() + .cloned() + .map(ApiInto::api_into) + .collect::<Vec<_>>(), + prebakes_enabled: body.prebakes_enabled, + }) + .tag("datacenter_id", datacenter_id) + .send() .await?; Ok(json!({})) diff --git a/svc/api/admin/src/route/clusters/mod.rs b/svc/api/admin/src/route/clusters/mod.rs index 87560a7dfa..0932c586cc 100644 --- a/svc/api/admin/src/route/clusters/mod.rs +++ b/svc/api/admin/src/route/clusters/mod.rs @@ -34,21 +34,19 @@ pub async fn create( ) -> GlobalResult { let cluster_id = Uuid::new_v4(); - let tags = json!({ "cluster_id": cluster_id, }); let mut sub = ctx - .subscribe::(&tags) + .subscribe::(&json!({ + "cluster_id": cluster_id, + })) .await?; - ctx.dispatch_tagged_workflow( - &tags, - cluster::workflows::cluster::Input { - cluster_id, - owner_team_id: body.owner_team_id, - name_id: body.name_id, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + owner_team_id: body.owner_team_id, + name_id: body.name_id, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await?; sub.next().await?; diff --git a/svc/api/servers/src/route/servers.rs b/svc/api/servers/src/route/servers.rs index 33e5014865..b5740eb863 100644 --- a/svc/api/servers/src/route/servers.rs +++ b/svc/api/servers/src/route/servers.rs @@ -83,62 +83,59 @@ pub async fn create( })) .await?; - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Input { - server_id, - env_id, - cluster_id, - datacenter_id: body.datacenter, - tags, - resources: (*body.resources).api_into(), - kill_timeout_ms: body - .lifecycle - .as_ref() - .and_then(|x| x.kill_timeout) - .unwrap_or_default(), - image_id: body.runtime.build, - args: body.runtime.arguments.unwrap_or_default(), - network_mode: body.network.mode.unwrap_or_default().api_into(), - environment: body.runtime.environment.unwrap_or_default(), - network_ports:
unwrap!(body - .network - .ports - .into_iter() - .map(|(s, p)| Ok(( - s, - ds::workflows::server::Port { - internal_port: p.internal_port, - routing: if let Some(routing) = p.routing { - match *routing { - models::ServersPortRouting { - game_guard: Some(_), - host: None, - } => ds::types::Routing::GameGuard { - protocol: p.protocol.api_into(), - }, - models::ServersPortRouting { - game_guard: None, - host: Some(_), - } => ds::types::Routing::Host { - protocol: p.protocol.api_try_into()?, - }, - models::ServersPortRouting { .. } => { - bail_with!(SERVERS_MUST_SPECIFY_ROUTING_TYPE) - } - } - } else { - ds::types::Routing::GameGuard { + ctx.workflow(ds::workflows::server::Input { + server_id, + env_id, + cluster_id, + datacenter_id: body.datacenter, + tags, + resources: (*body.resources).api_into(), + kill_timeout_ms: body + .lifecycle + .as_ref() + .and_then(|x| x.kill_timeout) + .unwrap_or_default(), + image_id: body.runtime.build, + args: body.runtime.arguments.unwrap_or_default(), + network_mode: body.network.mode.unwrap_or_default().api_into(), + environment: body.runtime.environment.unwrap_or_default(), + network_ports: unwrap!(body + .network + .ports + .into_iter() + .map(|(s, p)| Ok(( + s, + ds::workflows::server::Port { + internal_port: p.internal_port, + routing: if let Some(routing) = p.routing { + match *routing { + models::ServersPortRouting { + game_guard: Some(_), + host: None, + } => ds::types::Routing::GameGuard { protocol: p.protocol.api_into(), + }, + models::ServersPortRouting { + game_guard: None, + host: Some(_), + } => ds::types::Routing::Host { + protocol: p.protocol.api_try_into()?, + }, + models::ServersPortRouting { .. } => { + bail_with!(SERVERS_MUST_SPECIFY_ROUTING_TYPE) + } } + } else { + ds::types::Routing::GameGuard { + protocol: p.protocol.api_into(), + } } - ))) - .collect::<GlobalResult<HashMap<_, _>>>()), - }, - ) + } + ))) + .collect::<GlobalResult<HashMap<_, _>>>()), + }) + .tag("server_id", server_id) + .dispatch() .await?; tokio::select!
{ @@ -199,14 +196,11 @@ pub async fn destroy( })) .await?; - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Destroy { - override_kill_timeout_ms: query.override_kill_timeout, - }, - ) + ctx.signal(ds::workflows::server::Destroy { + override_kill_timeout_ms: query.override_kill_timeout, + }) + .tag("server_id", server_id) + .send() .await?; sub.next().await?; diff --git a/svc/pkg/cluster/src/ops/server/destroy_with_filter.rs b/svc/pkg/cluster/src/ops/server/destroy_with_filter.rs index e34f3119a5..cfbd6a75ae 100644 --- a/svc/pkg/cluster/src/ops/server/destroy_with_filter.rs +++ b/svc/pkg/cluster/src/ops/server/destroy_with_filter.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use chirp_workflow::prelude::*; -use serde_json::json; use crate::types::Filter; @@ -45,13 +44,10 @@ pub async fn cluster_server_destroy_with_filter( // Destroy servers for server_id in server_ids { - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - crate::workflows::server::Destroy {}, - ) - .await?; + ctx.signal(crate::workflows::server::Destroy {}) + .tag("server_id", server_id) + .send() + .await?; } // Trigger scale event @@ -61,13 +57,10 @@ pub async fn cluster_server_destroy_with_filter( .map(|x| x.datacenter_id) .collect::<HashSet<_>>(); for dc_id in dc_ids { - ctx.tagged_signal( - &json!({ - "datacenter_id": dc_id, - }), - crate::workflows::datacenter::Scale {}, - ) - .await?; + ctx.signal(crate::workflows::datacenter::Scale {}) + .tag("datacenter_id", dc_id) + .send() + .await?; } Ok(Output {}) diff --git a/svc/pkg/cluster/src/ops/server/taint_with_filter.rs b/svc/pkg/cluster/src/ops/server/taint_with_filter.rs index 802ac01142..a3d8292be0 100.644 --- a/svc/pkg/cluster/src/ops/server/taint_with_filter.rs +++ b/svc/pkg/cluster/src/ops/server/taint_with_filter.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use chirp_workflow::prelude::*; -use serde_json::json; use crate::types::Filter; @@ -45,13 +44,10 @@ pub async fn cluster_server_taint_with_filter( // Taint servers for server_id in server_ids { - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - crate::workflows::server::Taint {}, - ) - .await?; + ctx.signal(crate::workflows::server::Taint {}) + .tag("server_id", server_id) + .send() + .await?; } // Trigger scale event @@ -61,13 +57,10 @@ pub async fn cluster_server_taint_with_filter( .map(|x| x.datacenter_id) .collect::<HashSet<_>>(); for dc_id in dc_ids { - ctx.tagged_signal( - &json!({ - "datacenter_id": dc_id, - }), - crate::workflows::datacenter::Scale {}, - ) - .await?; + ctx.signal(crate::workflows::datacenter::Scale {}) + .tag("datacenter_id", dc_id) + .send() + .await?; } Ok(Output {}) diff --git a/svc/pkg/cluster/src/workflows/datacenter/mod.rs b/svc/pkg/cluster/src/workflows/datacenter/mod.rs index c9e11a9b6a..66b4250e42 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/mod.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/mod.rs @@ -50,7 +50,7 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> datacenter_id: input.datacenter_id, renew: false, }) - .run() + .output() .await?; ctx.msg(CreateComplete {}) @@ -62,7 +62,7 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> ctx.workflow(scale::Input { datacenter_id: input.datacenter_id, }) - .run() + .output() .await?; ctx.repeat(|ctx| { @@ -79,10 +79,14 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) -> .await?; // Scale -
ctx.workflow(scale::Input { datacenter_id }).run().await?; + ctx.workflow(scale::Input { datacenter_id }) + .output() + .await?; + } Main::Scale(_) => { - ctx.workflow(scale::Input { datacenter_id }).run().await?; + ctx.workflow(scale::Input { datacenter_id }) + .output() + .await?; } Main::ServerCreate(sig) => { ctx.workflow(crate::workflows::server::Input { diff --git a/svc/pkg/cluster/src/workflows/prebake.rs b/svc/pkg/cluster/src/workflows/prebake.rs index 560a436286..8dd429af18 100644 --- a/svc/pkg/cluster/src/workflows/prebake.rs +++ b/svc/pkg/cluster/src/workflows/prebake.rs @@ -59,7 +59,7 @@ pub async fn cluster_prebake(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu pool_type: input.pool_type.clone(), initialize_immediately: false, }) - .run() + .output() .await?; // Create image diff --git a/svc/pkg/cluster/src/workflows/server/mod.rs b/svc/pkg/cluster/src/workflows/server/mod.rs index 63f5378f8d..6c86b6c5a5 100644 --- a/svc/pkg/cluster/src/workflows/server/mod.rs +++ b/svc/pkg/cluster/src/workflows/server/mod.rs @@ -159,7 +159,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob pool_type: input.pool_type.clone(), initialize_immediately: true, }) - .run() + .output() .await; // If the server failed all attempts to install, clean it up @@ -188,7 +188,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob ctx.workflow(dns_create::Input { server_id: input.server_id, }) - .run() + .output() .await?; } @@ -223,14 +223,14 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob ctx.workflow(dns_create::Input { server_id: input.server_id, }) - .run() + .output() .await?; } Main::DnsDelete(_) => { ctx.workflow(dns_delete::Input { server_id: input.server_id, }) - .run() + .output() .await?; } Main::NomadRegistered(sig) => { @@ -256,7 +256,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob server_id: input.server_id, pool_type: input.pool_type.clone(), }) - .run() + .output() .await?; } Main::Undrain(_) => { @@ -265,7 +265,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob server_id: input.server_id, pool_type: input.pool_type.clone(), }) - .run() + .output() .await?; } Main::Taint(_) => {} // Only for state @@ -664,7 +664,7 @@ async fn cleanup( ctx.workflow(dns_delete::Input { server_id: input.server_id, }) - .run() + .output() .await?; } } diff --git a/svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs b/svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs index 407759e18c..89fae18b9f 100644 --- a/svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs +++ b/svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs @@ -1,7 +1,6 @@ use chirp_workflow::prelude::*; use cluster::types::TlsState; -use serde_json::json; // How much time before the cert expires to renew it const EXPIRE_PADDING: i64 = util::duration::days(30); @@ -38,13 +37,10 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { .collect::<Vec<_>>(); for datacenter_id in updated_datacenter_ids { - ctx.tagged_signal( - &json!({ - "datacenter_id": datacenter_id, - }), - cluster::workflows::datacenter::TlsRenew {}, - ) - .await?; + ctx.signal(cluster::workflows::datacenter::TlsRenew {}) + .tag("datacenter_id", datacenter_id) + .send() + .await?; } Ok(()) diff --git a/svc/pkg/cluster/standalone/default-update/src/lib.rs b/svc/pkg/cluster/standalone/default-update/src/lib.rs index ecbf2e5605..b609d9e066 100644 --- a/svc/pkg/cluster/standalone/default-update/src/lib.rs +++
b/svc/pkg/cluster/standalone/default-update/src/lib.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use chirp_workflow::prelude::*; use serde::Deserialize; -use serde_json::json; #[derive(Deserialize)] struct Cluster { @@ -142,13 +141,11 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> { if cluster_res.clusters.is_empty() { tracing::warn!("creating default cluster"); - ctx.dispatch_tagged_workflow(&json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::Input { + ctx.workflow(cluster::workflows::cluster::Input { cluster_id, name_id: config.name_id.clone(), owner_team_id: None, - }).await?; + }).tag("cluster_id", cluster_id).dispatch().await?; } for existing_datacenter in &datacenters_res.datacenters { @@ -199,18 +196,14 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> { }) .collect::<Vec<_>>(); - ctx.tagged_signal(&json!({ - "datacenter_id": datacenter.datacenter_id, - }), cluster::workflows::datacenter::Update { + ctx.signal(cluster::workflows::datacenter::Update { pools: new_pools, prebakes_enabled: Some(datacenter.prebakes_enabled), - }).await?; + }).tag("datacenter_id", datacenter.datacenter_id).send().await?; } // Create new datacenter else { - ctx.tagged_signal(&json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::DatacenterCreate { + ctx.signal(cluster::workflows::cluster::DatacenterCreate { datacenter_id: datacenter.datacenter_id, name_id, display_name: datacenter.display_name, @@ -232,7 +225,7 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> { build_delivery_method: datacenter.build_delivery_method.into(), prebakes_enabled: datacenter.prebakes_enabled, - }).await?; + }).tag("cluster_id", cluster_id).send().await?; } } diff --git a/svc/pkg/cluster/standalone/gc/src/lib.rs b/svc/pkg/cluster/standalone/gc/src/lib.rs index 37e9c93436..d62f1e83b9 100644 --- a/svc/pkg/cluster/standalone/gc/src/lib.rs +++ b/svc/pkg/cluster/standalone/gc/src/lib.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use chirp_workflow::prelude::*; use cluster::types::PoolType; use futures_util::FutureExt; -use serde_json::json; #[derive(sqlx::FromRow)] struct ServerRow { @@ -122,13 +121,10 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<() // Scale for datacenter_id in datacenter_ids { - ctx.tagged_signal( - &json!({ - "datacenter_id": datacenter_id, - }), - cluster::workflows::datacenter::Scale {}, - ) - .await?; + ctx.signal(cluster::workflows::datacenter::Scale {}) + .tag("datacenter_id", datacenter_id) + .send() + .await?; } Ok(()) diff --git a/svc/pkg/cluster/standalone/gc/tests/integration.rs b/svc/pkg/cluster/standalone/gc/tests/integration.rs index b2ac0c23e3..553c4889c3 100644 --- a/svc/pkg/cluster/standalone/gc/tests/integration.rs +++ b/svc/pkg/cluster/standalone/gc/tests/integration.rs @@ -65,14 +65,11 @@ async fn basic() { .await .unwrap(); - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - cluster::workflows::server::Drain {}, - ) - .await - .unwrap(); + ctx.signal(cluster::workflows::server::Drain {}) + .tag("server_id", server_id) + .send() + .await + .unwrap(); // Run GC let ts = util::timestamp::now() + DRAIN_TIMEOUT + 1; @@ -104,14 +101,11 @@ async fn basic() { } // Clean up afterwards so we don't litter - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - cluster::workflows::server::Destroy {}, - ) - .await - .unwrap(); + ctx.signal(cluster::workflows::server::Destroy {}) + .tag("server_id", server_id) + .send() + .await + .unwrap(); }
async fn setup(ctx: &TestCtx) -> (Uuid, Uuid) { @@ -132,16 +126,13 @@ async fn setup(ctx: &TestCtx) -> (Uuid, Uuid) { }]; let provider = Provider::Linode; - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); @@ -151,25 +142,22 @@ async fn setup(ctx: &TestCtx) -> (Uuid, Uuid) { })) .await .unwrap(); - ctx.tagged_signal( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::DatacenterCreate { - datacenter_id, - name_id: util::faker::ident(), - display_name: util::faker::ident(), + ctx.signal(cluster::workflows::cluster::DatacenterCreate { + datacenter_id, + name_id: util::faker::ident(), + display_name: util::faker::ident(), - provider: provider.clone(), - provider_datacenter_id: "us-southeast".to_string(), - provider_api_token: None, + provider: provider.clone(), + provider_datacenter_id: "us-southeast".to_string(), + provider_api_token: None, - pools: pools.clone(), + pools: pools.clone(), - build_delivery_method: BuildDeliveryMethod::TrafficServer, - prebakes_enabled: false, - }, - ) + build_delivery_method: BuildDeliveryMethod::TrafficServer, + prebakes_enabled: false, + }) + .tag("cluster_id", cluster_id) + .send() .await .unwrap(); @@ -197,16 +185,13 @@ async fn setup(ctx: &TestCtx) -> (Uuid, Uuid) { .await .unwrap(); - ctx.tagged_signal( - &json!({ - "datacenter_id": datacenter_id, - }), - cluster::workflows::datacenter::ServerCreate { - server_id, - pool_type: pool_type.clone(), - tags: vec!["test".to_string()], - }, - ) + ctx.signal(cluster::workflows::datacenter::ServerCreate { + server_id, + pool_type: pool_type.clone(), + tags: vec!["test".to_string()], + }) + .tag("datacenter_id", datacenter_id) + .send() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/common.rs b/svc/pkg/cluster/tests/common.rs index 07e57a5b04..65c055ed93 100644 --- a/svc/pkg/cluster/tests/common.rs +++ b/svc/pkg/cluster/tests/common.rs @@ -34,16 +34,13 @@ pub async fn setup(ctx: &TestCtx, opts: Setup) -> SetupRes { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": opts.cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id: opts.cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id: opts.cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", opts.cluster_id) + .dispatch() .await .unwrap(); @@ -56,25 +53,22 @@ pub async fn setup(ctx: &TestCtx, opts: Setup) -> SetupRes { .await .unwrap(); - ctx.tagged_signal( - &json!({ - "cluster_id": opts.cluster_id, - }), - cluster::workflows::cluster::DatacenterCreate { - datacenter_id: opts.datacenter_id, - name_id: util::faker::ident(), - display_name: util::faker::ident(), + ctx.signal(cluster::workflows::cluster::DatacenterCreate { + datacenter_id: opts.datacenter_id, + name_id: util::faker::ident(), + display_name: util::faker::ident(), - provider: provider.clone(), - provider_datacenter_id: "us-southeast".to_string(), - provider_api_token: None, + provider: provider.clone(), + provider_datacenter_id: "us-southeast".to_string(), + provider_api_token: None, - pools: pools.clone(), + pools: pools.clone(), - build_delivery_method: 
cluster::types::BuildDeliveryMethod::TrafficServer, - prebakes_enabled: false, - }, - ) + build_delivery_method: cluster::types::BuildDeliveryMethod::TrafficServer, + prebakes_enabled: false, + }) + .tag("cluster_id", opts.cluster_id) + .send() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/create.rs b/svc/pkg/cluster/tests/create.rs index 950a76510f..d6e9e57cf3 100644 --- a/svc/pkg/cluster/tests/create.rs +++ b/svc/pkg/cluster/tests/create.rs @@ -13,16 +13,13 @@ async fn create(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: Some(owner_team_id), - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: Some(owner_team_id), + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/datacenter_create.rs b/svc/pkg/cluster/tests/datacenter_create.rs index 292eca6d1c..85114b3d70 100644 --- a/svc/pkg/cluster/tests/datacenter_create.rs +++ b/svc/pkg/cluster/tests/datacenter_create.rs @@ -13,40 +13,34 @@ async fn datacenter_create(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); sub.next().await.unwrap(); - ctx.tagged_signal( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::DatacenterCreate { - datacenter_id, - name_id: util::faker::ident(), - display_name: util::faker::ident(), + ctx.signal(cluster::workflows::cluster::DatacenterCreate { + datacenter_id, + name_id: util::faker::ident(), + display_name: util::faker::ident(), - provider: cluster::types::Provider::Linode, - provider_datacenter_id: "us-southeast".to_string(), - provider_api_token: None, + provider: cluster::types::Provider::Linode, + provider_datacenter_id: "us-southeast".to_string(), + provider_api_token: None, - pools: Vec::new(), + pools: Vec::new(), - build_delivery_method: cluster::types::BuildDeliveryMethod::TrafficServer, - prebakes_enabled: false, - }, - ) + build_delivery_method: cluster::types::BuildDeliveryMethod::TrafficServer, + prebakes_enabled: false, + }) + .tag("cluster_id", cluster_id) + .send() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/get.rs b/svc/pkg/cluster/tests/get.rs index 4a5b85d349..98262afe2c 100644 --- a/svc/pkg/cluster/tests/get.rs +++ b/svc/pkg/cluster/tests/get.rs @@ -12,16 +12,13 @@ async fn get(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/get_for_game.rs b/svc/pkg/cluster/tests/get_for_game.rs index e2dc0150f0..e93d61e57c 100644 --- a/svc/pkg/cluster/tests/get_for_game.rs +++ b/svc/pkg/cluster/tests/get_for_game.rs @@ -13,29 +13,23 @@ async fn get_for_game(ctx: TestCtx) { .await .unwrap(); - 
ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); sub.next().await.unwrap(); - ctx.tagged_signal( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::GameLink { game_id }, - ) - .await - .unwrap(); + ctx.signal(cluster::workflows::cluster::GameLink { game_id }) + .tag("cluster_id", cluster_id) + .send() + .await + .unwrap(); let games_res = ctx .op(cluster::ops::get_for_game::Input { diff --git a/svc/pkg/cluster/tests/list.rs b/svc/pkg/cluster/tests/list.rs index 26488afecb..daf3c670bc 100644 --- a/svc/pkg/cluster/tests/list.rs +++ b/svc/pkg/cluster/tests/list.rs @@ -12,16 +12,13 @@ async fn list_single_cluster(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "cluster_id": cluster_id, - }), - cluster::workflows::cluster::Input { - cluster_id, - name_id: util::faker::ident(), - owner_team_id: None, - }, - ) + ctx.workflow(cluster::workflows::cluster::Input { + cluster_id, + name_id: util::faker::ident(), + owner_team_id: None, + }) + .tag("cluster_id", cluster_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/cluster/tests/server_provision.rs b/svc/pkg/cluster/tests/server_provision.rs index d302c01579..ba7ed0a93c 100644 --- a/svc/pkg/cluster/tests/server_provision.rs +++ b/svc/pkg/cluster/tests/server_provision.rs @@ -1,5 +1,4 @@ use chirp_workflow::prelude::*; -use serde_json::json; mod common; use common::{setup, Setup}; @@ -26,16 +25,13 @@ async fn server_provision(ctx: TestCtx) { ) .await; - ctx.tagged_signal( - &json!({ - "datacenter_id": datacenter_id, - }), - cluster::workflows::datacenter::ServerCreate { - server_id, - pool_type: dc.pools.first().unwrap().pool_type.clone(), - tags: vec!["test".to_string()], - }, - ) + ctx.signal(cluster::workflows::datacenter::ServerCreate { + server_id, + pool_type: dc.pools.first().unwrap().pool_type.clone(), + tags: vec!["test".to_string()], + }) + .tag("datacenter_id", datacenter_id) + .send() .await .unwrap(); @@ -65,12 +61,9 @@ async fn server_provision(ctx: TestCtx) { } // Clean up afterwards so we don't litter - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - cluster::workflows::server::Destroy {}, - ) - .await - .unwrap(); + ctx.signal(cluster::workflows::server::Destroy {}) + .tag("server_id", server_id) + .send() + .await + .unwrap(); } diff --git a/svc/pkg/ds/src/workflows/server/destroy.rs b/svc/pkg/ds/src/workflows/server/destroy.rs index a1b5e94983..961307b304 100644 --- a/svc/pkg/ds/src/workflows/server/destroy.rs +++ b/svc/pkg/ds/src/workflows/server/destroy.rs @@ -1,6 +1,5 @@ use chirp_workflow::prelude::*; use futures_util::FutureExt; -use serde_json::json; use crate::util::{signal_allocation, NOMAD_CONFIG}; diff --git a/svc/pkg/ds/src/workflows/server/mod.rs b/svc/pkg/ds/src/workflows/server/mod.rs index 152fd9e3f3..5708224c54 100644 --- a/svc/pkg/ds/src/workflows/server/mod.rs +++ b/svc/pkg/ds/src/workflows/server/mod.rs @@ -176,7 +176,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> server_id: input.server_id, override_kill_timeout_ms, }) - .run() + .output() .await?; Ok(()) diff --git a/svc/pkg/ds/tests/common.rs b/svc/pkg/ds/tests/common.rs index 6f21742f91..6890dc1319 100644 --- 
a/svc/pkg/ds/tests/common.rs +++ b/svc/pkg/ds/tests/common.rs @@ -111,28 +111,25 @@ impl Setup { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Input { - server_id, - env_id: self.env_id, - cluster_id: self.cluster_id, - datacenter_id: self.datacenter_id, - resources: ds::types::ServerResources { - cpu_millicores: 100, - memory_mib: 200, - }, - kill_timeout_ms: 0, - tags: HashMap::new(), - args: Vec::new(), - environment: env, - image_id: self.image_id, - network_mode, - network_ports: ports, + ctx.workflow(ds::workflows::server::Input { + server_id, + env_id: self.env_id, + cluster_id: self.cluster_id, + datacenter_id: self.datacenter_id, + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, }, - ) + kill_timeout_ms: 0, + tags: HashMap::new(), + args: Vec::new(), + environment: env, + image_id: self.image_id, + network_mode, + network_ports: ports, + }) + .tag("server_id", server_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/ds/tests/print_test_data.rs b/svc/pkg/ds/tests/print_test_data.rs index f176795ab5..3f5ce661e5 100644 --- a/svc/pkg/ds/tests/print_test_data.rs +++ b/svc/pkg/ds/tests/print_test_data.rs @@ -165,30 +165,27 @@ async fn print_test_data(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Input { - server_id, - env_id: *env_id, - cluster_id, - datacenter_id: faker_region.region_id.unwrap().as_uuid(), - resources: ds::types::ServerResources { - cpu_millicores: 100, - memory_mib: 200, - }, - kill_timeout_ms: 0, - tags: vec![(String::from("test"), String::from("123"))] - .into_iter() - .collect(), - args: Vec::new(), - environment: env, - image_id: build_res.build_id.unwrap().as_uuid(), - network_mode: types::NetworkMode::Bridge, - network_ports: ports, + ctx.workflow(ds::workflows::server::Input { + server_id, + env_id: *env_id, + cluster_id, + datacenter_id: faker_region.region_id.unwrap().as_uuid(), + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, }, - ) + kill_timeout_ms: 0, + tags: vec![(String::from("test"), String::from("123"))] + .into_iter() + .collect(), + args: Vec::new(), + environment: env, + image_id: build_res.build_id.unwrap().as_uuid(), + network_mode: types::NetworkMode::Bridge, + network_ports: ports, + }) + .tag("server_id", server_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/ds/tests/server_create.rs b/svc/pkg/ds/tests/server_create.rs index 8fdc528040..3c6f583853 100644 --- a/svc/pkg/ds/tests/server_create.rs +++ b/svc/pkg/ds/tests/server_create.rs @@ -65,30 +65,27 @@ async fn server_create(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Input { - server_id, - env_id: *env_id, - cluster_id, - datacenter_id: faker_region.region_id.unwrap().as_uuid(), - resources: ds::types::ServerResources { - cpu_millicores: 100, - memory_mib: 200, - }, - kill_timeout_ms: 0, - tags: vec![(String::from("test"), String::from("123"))] - .into_iter() - .collect(), - args: Vec::new(), - environment: env, - image_id: build_res.build_id.unwrap().as_uuid(), - network_mode: types::NetworkMode::Bridge, - network_ports: ports, + ctx.workflow(ds::workflows::server::Input { + server_id, + env_id: *env_id, + cluster_id, + datacenter_id: faker_region.region_id.unwrap().as_uuid(), + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, }, 
- ) + kill_timeout_ms: 0, + tags: vec![(String::from("test"), String::from("123"))] + .into_iter() + .collect(), + args: Vec::new(), + environment: env, + image_id: build_res.build_id.unwrap().as_uuid(), + network_mode: types::NetworkMode::Bridge, + network_ports: ports, + }) + .tag("server_id", server_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/ds/tests/server_get.rs b/svc/pkg/ds/tests/server_get.rs index a19ad2dd8a..a5aa78c623 100644 --- a/svc/pkg/ds/tests/server_get.rs +++ b/svc/pkg/ds/tests/server_get.rs @@ -65,28 +65,25 @@ async fn server_get(ctx: TestCtx) { .await .unwrap(); - ctx.dispatch_tagged_workflow( - &json!({ - "server_id": server_id, - }), - ds::workflows::server::Input { - server_id, - env_id: *env_id, - cluster_id, - datacenter_id: faker_region.region_id.unwrap().as_uuid(), - resources: ds::types::ServerResources { - cpu_millicores: 100, - memory_mib: 200, - }, - kill_timeout_ms: 0, - tags: HashMap::new(), - args: Vec::new(), - environment: env, - image_id: build_res.build_id.unwrap().as_uuid(), - network_mode: types::NetworkMode::Bridge, - network_ports: ports, + ctx.workflow(ds::workflows::server::Input { + server_id, + env_id: *env_id, + cluster_id, + datacenter_id: faker_region.region_id.unwrap().as_uuid(), + resources: ds::types::ServerResources { + cpu_millicores: 100, + memory_mib: 200, }, - ) + kill_timeout_ms: 0, + tags: HashMap::new(), + args: Vec::new(), + environment: env, + image_id: build_res.build_id.unwrap().as_uuid(), + network_mode: types::NetworkMode::Bridge, + network_ports: ports, + }) + .tag("server_id", server_id) + .dispatch() .await .unwrap(); diff --git a/svc/pkg/game/ops/create/src/lib.rs b/svc/pkg/game/ops/create/src/lib.rs index d2153613df..24070572ed 100644 --- a/svc/pkg/game/ops/create/src/lib.rs +++ b/svc/pkg/game/ops/create/src/lib.rs @@ -66,15 +66,13 @@ async fn handle( .await?; if let Some(cluster_id) = ctx.cluster_id { - chirp_workflow::compat::tagged_signal( + let sig = chirp_workflow::compat::signal( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id.as_uuid(), - }), cluster::workflows::cluster::GameLink { game_id }, ) - .await - .unwrap(); + .await?; + + sig.tag("cluster_id", cluster_id.as_uuid()).send().await?; } msg!([ctx] game::msg::create_complete(game_id) { diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index 563882d0e5..402a1b2695 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -71,7 +71,7 @@ pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult linode_id: cleanup_ctx.linode_id, firewall_id: cleanup_ctx.firewall_id, }) - .run() + .output() .await?; Ok(()) diff --git a/svc/pkg/linode/standalone/gc/src/lib.rs b/svc/pkg/linode/standalone/gc/src/lib.rs index 8f922079fb..308f95f7db 100644 --- a/svc/pkg/linode/standalone/gc/src/lib.rs +++ b/svc/pkg/linode/standalone/gc/src/lib.rs @@ -106,12 +106,11 @@ async fn run_for_linode_account( let ctx = ctx.clone(); async move { - ctx.tagged_signal( - &json!({ - "image_id": &image_id, - }), - linode::workflows::image::CreateComplete { image_id }, - ) + ctx.signal(linode::workflows::image::CreateComplete { + image_id: image_id.clone(), + }) + .tag("image_id", image_id) + .send() .await } }) @@ -146,13 +145,10 @@ async fn delete_expired_images( let ctx = ctx.clone(); async move { - ctx.tagged_signal( - &json!({ - "image_id": img.id, - }), - linode::workflows::image::Destroy {}, - ) - .await + ctx.signal(linode::workflows::image::Destroy {}) + 
.tag("image_id", &img.id) + .send() + .await } }) .buffer_unordered(8) diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/node_registration.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/node_registration.rs index fc723c0848..d10bd31fe3 100644 --- a/svc/pkg/nomad/standalone/monitor/src/monitors/node_registration.rs +++ b/svc/pkg/nomad/standalone/monitor/src/monitors/node_registration.rs @@ -1,6 +1,5 @@ use chirp_workflow::prelude::*; use serde::Deserialize; -use serde_json::json; #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "PascalCase")] @@ -16,14 +15,11 @@ pub async fn handle( let meta = unwrap_ref!(node.meta, "no metadata on node"); let server_id = util::uuid::parse(unwrap!(meta.get("server-id"), "no server-id in metadata"))?; - ctx.tagged_signal( - &json!({ - "server_id": server_id, - }), - cluster::workflows::server::NomadRegistered { - node_id: node_id.to_owned(), - }, - ) + ctx.signal(cluster::workflows::server::NomadRegistered { + node_id: node_id.to_owned(), + }) + .tag("server_id", server_id) + .send() .await?; Ok(()) diff --git a/svc/pkg/region/ops/list-for-game/tests/integration.rs b/svc/pkg/region/ops/list-for-game/tests/integration.rs index b68a0106a0..f352726ab0 100644 --- a/svc/pkg/region/ops/list-for-game/tests/integration.rs +++ b/svc/pkg/region/ops/list-for-game/tests/integration.rs @@ -6,14 +6,15 @@ async fn empty(ctx: TestCtx) { let (cluster_id, datacenter_id) = create_dc(&ctx).await; let game_id = Uuid::new_v4(); - chirp_workflow::compat::tagged_signal( + chirp_workflow::compat::signal( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::GameLink { game_id }, ) .await + .unwrap() + .tag("cluster_id", cluster_id) + .send() + .await .unwrap(); let regions_res = op!([ctx] region_list_for_game { @@ -34,11 +35,8 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid) { let datacenter_id = Uuid::new_v4(); let cluster_id = Uuid::new_v4(); - chirp_workflow::compat::dispatch_tagged_workflow( + chirp_workflow::compat::workflow( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::Input { cluster_id, name_id: util::faker::ident(), @@ -46,6 +44,10 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid) { }, ) .await + .unwrap() + .tag("cluster_id", cluster_id) + .dispatch() + .await .unwrap(); let mut create_sub = @@ -57,11 +59,8 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid) { ) .await .unwrap(); - chirp_workflow::compat::tagged_signal( + chirp_workflow::compat::signal( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::DatacenterCreate { datacenter_id, name_id: util::faker::ident(), @@ -78,6 +77,10 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid) { }, ) .await + .unwrap() + .tag("cluster_id", cluster_id) + .send() + .await .unwrap(); create_sub.next().await.unwrap(); diff --git a/svc/pkg/region/ops/resolve-for-game/tests/integration.rs b/svc/pkg/region/ops/resolve-for-game/tests/integration.rs index 7cd96af21b..a26272c134 100644 --- a/svc/pkg/region/ops/resolve-for-game/tests/integration.rs +++ b/svc/pkg/region/ops/resolve-for-game/tests/integration.rs @@ -7,16 +7,15 @@ async fn empty(ctx: TestCtx) { let (cluster_id, datacenter_id, dc_name_id) = create_dc(&ctx).await; let game_id = Uuid::new_v4(); - chirp_workflow::compat::tagged_signal( + let sig = chirp_workflow::compat::signal( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::GameLink { game_id }, ) .await .unwrap(); + sig.tag("cluster_id", 
cluster_id).send().await.unwrap(); + let regions_res = op!([ctx] region_resolve_for_game { game_id: Some(game_id.into()), name_ids: vec![dc_name_id, "foo".to_string()], @@ -43,11 +42,8 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid, String) { let dc_name_id = util::faker::ident(); let cluster_id = Uuid::new_v4(); - chirp_workflow::compat::dispatch_tagged_workflow( + chirp_workflow::compat::workflow( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::Input { cluster_id, name_id: util::faker::ident(), @@ -55,6 +51,10 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid, String) { }, ) .await + .unwrap() + .tag("cluster_id", cluster_id) + .dispatch() + .await .unwrap(); let mut create_sub = @@ -66,11 +66,8 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid, String) { ) .await .unwrap(); - chirp_workflow::compat::tagged_signal( + chirp_workflow::compat::signal( ctx.op_ctx(), - &json!({ - "cluster_id": cluster_id, - }), cluster::workflows::cluster::DatacenterCreate { datacenter_id, name_id: dc_name_id.clone(), @@ -87,6 +84,10 @@ async fn create_dc(ctx: &TestCtx) -> (Uuid, Uuid, String) { }, ) .await + .unwrap() + .tag("cluster_id", cluster_id) + .send() + .await .unwrap(); create_sub.next().await.unwrap(); diff --git a/svc/pkg/telemetry/standalone/beacon/src/lib.rs b/svc/pkg/telemetry/standalone/beacon/src/lib.rs index dc0167acbb..2f631e8b19 100644 --- a/svc/pkg/telemetry/standalone/beacon/src/lib.rs +++ b/svc/pkg/telemetry/standalone/beacon/src/lib.rs @@ -141,10 +141,10 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { ); let mut event = async_posthog::Event::new("cluster_beacon", &distinct_id); - event.insert_prop("$groups", &json!({ "cluster": util::env::cluster_id() }))?; + event.insert_prop("$groups", json!({ "cluster": util::env::cluster_id() }))?; event.insert_prop( "$set", - &json!({ + json!({ "ns_id": util::env::namespace(), "cluster_id": util::env::cluster_id(), }), @@ -156,7 +156,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { event.insert_prop("$group_key", util::env::cluster_id())?; event.insert_prop( "$group_set", - &json!({ + json!({ "name": util::env::namespace(), }), )?; @@ -177,7 +177,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { let mut event = async_posthog::Event::new("team_beacon", &distinct_id); event.insert_prop( "$groups", - &json!({ + json!({ "cluster": util::env::cluster_id(), "team": team_id, }), @@ -200,7 +200,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { event.insert_prop("$group_key", team_id)?; event.insert_prop( "$group_set", - &json!({ + json!({ "display_name": team.display_name, "create_ts": team.create_ts, }), @@ -217,7 +217,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { let mut event = async_posthog::Event::new("game_beacon", &distinct_id); event.insert_prop( "$groups", - &json!({ + json!({ "cluster": util::env::cluster_id(), "team": team_id, "game": game_id, @@ -242,7 +242,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { event.insert_prop("$group_key", game_id)?; event.insert_prop( "$group_set", - &json!({ + json!({ "name_id": game.name_id, "display_name": game.display_name, "create_ts": game.create_ts, @@ -305,7 +305,7 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> { let mut event = async_posthog::Event::new("namespace_beacon", &distinct_id); event.insert_prop( "$groups", - &json!({ + json!({ "cluster": util::env::cluster_id(), "team": team_id, "game": game_id, @@ -334,7 +334,7 @@ pub async fn run_from_env(ts: i64) -> 
GlobalResult<()> { event.insert_prop("$group_key", game_id)?; event.insert_prop( "$group_set", - &json!({ + json!({ "name_id": ns.name_id, "display_name": ns.display_name, "create_ts": ns.create_ts,
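Operations and tests that only hold an operation context go through the compat layer instead, which returns the same builder asynchronously; this is the pattern used in `game/ops/create` and the region integration tests above. A minimal sketch of that two-step flow (`MySignal` is again a hypothetical signal type):

// compat::signal builds a signal builder from an OperationCtx
let sig = chirp_workflow::compat::signal(
    ctx.op_ctx(),
    MySignal { /* ... */ },
)
.await?;

// Tagging and sending then look the same as the workflow-ctx version
sig.tag("cluster_id", cluster_id).send().await?;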