From bccce312dbb75bd8e49d40260e47a8d7d8b98bae Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:53:37 +0000 Subject: [PATCH] fix: loops and cache (#1010) ## Changes --- lib/chirp-workflow/core/src/compat.rs | 36 ++++++++------ lib/chirp-workflow/core/src/ctx/activity.rs | 4 +- lib/chirp-workflow/core/src/ctx/api.rs | 14 +++--- lib/chirp-workflow/core/src/ctx/operation.rs | 4 +- lib/chirp-workflow/core/src/ctx/standalone.rs | 14 +++--- lib/chirp-workflow/core/src/ctx/test.rs | 14 +++--- lib/chirp-workflow/core/src/ctx/workflow.rs | 49 +++++++++---------- lib/chirp-workflow/macros/src/lib.rs | 6 +-- svc/pkg/cluster/src/ops/datacenter/get.rs | 2 +- .../src/ops/datacenter/location_get.rs | 2 +- .../cluster/src/workflows/datacenter/mod.rs | 23 +++++++-- svc/pkg/cluster/src/workflows/server/mod.rs | 4 +- 12 files changed, 95 insertions(+), 77 deletions(-) diff --git a/lib/chirp-workflow/core/src/compat.rs b/lib/chirp-workflow/core/src/compat.rs index ccbf815822..002c0e8bb2 100644 --- a/lib/chirp-workflow/core/src/compat.rs +++ b/lib/chirp-workflow/core/src/compat.rs @@ -34,10 +34,10 @@ where bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body."); } - let name = I::Workflow::NAME; - let id = Uuid::new_v4(); + let workflow_name = I::Workflow::NAME; + let workflow_id = Uuid::new_v4(); - tracing::info!(%name, %id, ?input, "dispatching workflow"); + tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -46,13 +46,13 @@ where db_from_ctx(ctx) .await? - .dispatch_workflow(ctx.ray_id(), id, &name, None, input_val) + .dispatch_workflow(ctx.ray_id(), workflow_id, &workflow_name, None, input_val) .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(%workflow_name, ?workflow_id, "workflow dispatched"); - Ok(id) + Ok(workflow_id) } pub async fn dispatch_tagged_workflow( @@ -69,10 +69,10 @@ where bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body."); } - let name = I::Workflow::NAME; - let id = Uuid::new_v4(); + let workflow_name = I::Workflow::NAME; + let workflow_id = Uuid::new_v4(); - tracing::info!(%name, %id, ?input, "dispatching workflow"); + tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching tagged workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -81,13 +81,19 @@ where db_from_ctx(ctx) .await? - .dispatch_workflow(ctx.ray_id(), id, &name, Some(tags), input_val) + .dispatch_workflow( + ctx.ray_id(), + workflow_id, + &workflow_name, + Some(tags), + input_val, + ) .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(%workflow_name, ?workflow_id, "workflow tagged dispatched"); - Ok(id) + Ok(workflow_id) } /// Wait for a given workflow to complete. @@ -96,7 +102,7 @@ pub async fn wait_for_workflow( ctx: &rivet_operation::OperationContext, workflow_id: Uuid, ) -> GlobalResult { - tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow"); + tracing::info!(sub_workflow_name=W::NAME, sub_workflow_id=?workflow_id, "waiting for workflow"); tokio::time::timeout(WORKFLOW_TIMEOUT, async move { let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY); @@ -161,7 +167,7 @@ pub async fn signal( let signal_id = Uuid::new_v4(); - tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal"); + tracing::info!(signal_name=%I::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(input) @@ -188,7 +194,7 @@ pub async fn tagged_signal( let signal_id = Uuid::new_v4(); - tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(signal_name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(input) diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index 23749e1d66..ddb8b8020c 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -70,7 +70,7 @@ impl ActivityCtx { I: OperationInput, ::Operation: Operation, { - tracing::info!(?input, "operation call"); + tracing::info!(activity_name=%self.name, ?input, "operation call"); let ctx = OperationCtx::new( self.db.clone(), @@ -87,7 +87,7 @@ impl ActivityCtx { .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw); - tracing::info!(?res, "operation response"); + tracing::info!(activity_name=%self.name, ?res, "operation response"); res } diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs index 9cc7a62c44..33a1115b1a 100644 --- a/lib/chirp-workflow/core/src/ctx/api.rs +++ b/lib/chirp-workflow/core/src/ctx/api.rs @@ -79,7 +79,7 @@ impl ApiCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?input, "dispatching workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -91,7 +91,7 @@ impl ApiCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched"); Ok(id) } @@ -108,7 +108,7 @@ impl ApiCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -120,7 +120,7 @@ impl ApiCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "tagged workflow dispatched"); Ok(id) } @@ -131,7 +131,7 @@ impl ApiCtx { &self, workflow_id: Uuid, ) -> GlobalResult { - tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow"); + tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow"); tokio::time::timeout(WORKFLOW_TIMEOUT, async move { let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY); @@ -188,7 +188,7 @@ impl ApiCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); + tracing::info!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(input) @@ -210,7 +210,7 @@ impl ApiCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(input) diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index fd4437a263..55041efac0 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -97,7 +97,7 @@ impl OperationCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); + tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(input) @@ -119,7 +119,7 @@ impl OperationCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(input) diff --git a/lib/chirp-workflow/core/src/ctx/standalone.rs b/lib/chirp-workflow/core/src/ctx/standalone.rs index d670633322..dee5c021a3 100644 --- a/lib/chirp-workflow/core/src/ctx/standalone.rs +++ b/lib/chirp-workflow/core/src/ctx/standalone.rs @@ -78,7 +78,7 @@ impl StandaloneCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?input, "dispatching workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -90,7 +90,7 @@ impl StandaloneCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched"); Ok(id) } @@ -107,7 +107,7 @@ impl StandaloneCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -119,7 +119,7 @@ impl StandaloneCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched"); Ok(id) } @@ -130,7 +130,7 @@ impl StandaloneCtx { &self, workflow_id: Uuid, ) -> GlobalResult { - tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow"); + tracing::info!(workflow_name=%W::NAME, id=?workflow_id, "waiting for workflow"); tokio::time::timeout(WORKFLOW_TIMEOUT, async move { let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY); @@ -187,7 +187,7 @@ impl StandaloneCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); + tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(input) @@ -209,7 +209,7 @@ impl StandaloneCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(input) diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index 1a5867a563..7c4c027dc9 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -94,7 +94,7 @@ impl TestCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?input, "dispatching workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -106,7 +106,7 @@ impl TestCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched"); Ok(id) } @@ -123,7 +123,7 @@ impl TestCtx { let name = I::Workflow::NAME; let id = Uuid::new_v4(); - tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow"); + tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow"); // Serialize input let input_val = serde_json::to_value(input) @@ -135,7 +135,7 @@ impl TestCtx { .await .map_err(GlobalError::raw)?; - tracing::info!(%name, ?id, "workflow dispatched"); + tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched"); Ok(id) } @@ -144,7 +144,7 @@ impl TestCtx { &self, workflow_id: Uuid, ) -> GlobalResult { - tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow"); + tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow"); let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY); loop { @@ -198,7 +198,7 @@ impl TestCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); + tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(input) @@ -220,7 +220,7 @@ impl TestCtx { ) -> GlobalResult { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(input) diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 3bcb416e3e..c92a96f737 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -281,7 +281,7 @@ impl WorkflowCtx { activity_id: &ActivityId, create_ts: i64, ) -> WorkflowResult { - tracing::debug!(id=%self.workflow_id, activity_name=%A::NAME, "running activity"); + tracing::debug!(name=%self.name, id=%self.workflow_id, activity_name=%A::NAME, "running activity"); let ctx = ActivityCtx::new( self.workflow_id, @@ -422,7 +422,7 @@ impl WorkflowCtx { I: WorkflowInput, ::Workflow: Workflow, { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal received before let id = if let Some(event) = event { @@ -510,7 +510,7 @@ impl WorkflowCtx { &self, sub_workflow_id: Uuid, ) -> GlobalResult { - tracing::info!(name = W::NAME, ?sub_workflow_id, "waiting for workflow"); + tracing::info!(name=%self.name, id=%self.workflow_id, sub_workflow_name=%W::NAME, ?sub_workflow_id, "waiting for workflow"); let mut retries = 0; let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY); @@ -653,7 +653,7 @@ impl WorkflowCtx { { let activity_id = ActivityId::new::(&input); - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Activity was ran before let output = if let Some(event) = event { @@ -679,7 +679,7 @@ impl WorkflowCtx { // Activity succeeded if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? { - tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity"); + tracing::debug!(name=%self.name, id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity"); output } @@ -752,7 +752,7 @@ impl WorkflowCtx { workflow_id: Uuid, body: T, ) -> GlobalResult { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal sent before let signal_id = if let Some(event) = event { @@ -773,14 +773,14 @@ impl WorkflowCtx { .map_err(GlobalError::raw); } - tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying signal dispatch"); + tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying signal dispatch"); signal.signal_id } // Send signal else { let signal_id = Uuid::new_v4(); - tracing::info!(id=%self.workflow_id, signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal"); + tracing::info!(name=%self.name, id=%self.workflow_id, signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal"); // Serialize input let input_val = serde_json::to_value(&body) @@ -816,7 +816,7 @@ impl WorkflowCtx { tags: &serde_json::Value, body: T, ) -> GlobalResult { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal sent before let signal_id = if let Some(event) = event { @@ -837,7 +837,7 @@ impl WorkflowCtx { .map_err(GlobalError::raw); } - tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying tagged signal dispatch"); + tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying tagged signal dispatch"); signal.signal_id } @@ -845,7 +845,7 @@ impl WorkflowCtx { else { let signal_id = Uuid::new_v4(); - tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); + tracing::info!(name=%self.name, id=%self.workflow_id, signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal"); // Serialize input let input_val = serde_json::to_value(&body) @@ -878,7 +878,7 @@ impl WorkflowCtx { /// Listens for a signal for a short time before setting the workflow to sleep. Once the signal is /// received, the workflow will be woken up and continue. pub async fn listen(&mut self) -> GlobalResult { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal received before let signal = if let Some(event) = event { @@ -931,7 +931,7 @@ impl WorkflowCtx { &mut self, listener: &T, ) -> GlobalResult<::Output> { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal received before let signal = if let Some(event) = event { @@ -981,7 +981,7 @@ impl WorkflowCtx { /// Checks if the given signal exists in the database. pub async fn query_signal(&mut self) -> GlobalResult> { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Signal received before let signal = if let Some(event) = event { @@ -1018,7 +1018,7 @@ impl WorkflowCtx { where M: Message, { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Message sent before if let Some(event) = event { @@ -1039,11 +1039,11 @@ impl WorkflowCtx { .map_err(GlobalError::raw); } - tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch"); + tracing::debug!(name=%self.name, id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch"); } // Send message else { - tracing::info!(id=%self.workflow_id, msg_name=%M::NAME, ?tags, "dispatching message"); + tracing::info!(name=%self.name, id=%self.workflow_id, msg_name=%M::NAME, ?tags, "dispatching message"); // Serialize body let body_val = serde_json::to_value(&body) @@ -1077,7 +1077,7 @@ impl WorkflowCtx { where M: Message, { - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(self.location_idx); // Message sent before if let Some(event) = event { @@ -1098,11 +1098,11 @@ impl WorkflowCtx { .map_err(GlobalError::raw); } - tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch"); + tracing::debug!(name=%self.name, id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch"); } // Send message else { - tracing::info!(id=%self.workflow_id, msg_name=%M::NAME, ?tags, "dispatching message"); + tracing::info!(name=%self.name, id=%self.workflow_id, msg_name=%M::NAME, ?tags, "dispatching message"); // Serialize body let body_val = serde_json::to_value(&body) @@ -1139,10 +1139,11 @@ impl WorkflowCtx { F: for<'a> FnMut(&'a mut WorkflowCtx) -> AsyncResult<'a, Loop>, T: Serialize + DeserializeOwned, { + let event_location = self.location_idx; let loop_location = self.full_location(); let mut loop_branch = self.branch(); - let event = { self.relevant_history().nth(self.location_idx) }; + let event = self.relevant_history().nth(event_location); // Loop existed before let output = if let Some(event) = event { @@ -1175,8 +1176,6 @@ impl WorkflowCtx { tracing::info!(name=%self.name, id=%self.workflow_id, "running loop"); loop { - let iteration_idx = loop_branch.location_idx; - let mut iteration_branch = loop_branch.branch(); iteration_branch.loop_location = Some(loop_location.clone()); @@ -1186,7 +1185,7 @@ impl WorkflowCtx { .update_loop( self.workflow_id, loop_location.as_ref(), - iteration_idx, + loop_branch.location_idx, None, self.loop_location(), ) @@ -1201,7 +1200,7 @@ impl WorkflowCtx { .update_loop( self.workflow_id, loop_location.as_ref(), - iteration_idx, + loop_branch.location_idx, Some(output_val), self.loop_location(), ) diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index e2a886e10b..1bffe673c6 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -300,12 +300,12 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream { #serde_derive #item_struct - impl Signal for #struct_ident { + impl chirp_workflow::prelude::Signal for #struct_ident { const NAME: &'static str = #name; } #[async_trait::async_trait] - impl Listen for #struct_ident { + impl chirp_workflow::prelude::Listen for #struct_ident { async fn listen(ctx: &chirp_workflow::prelude::ListenCtx) -> chirp_workflow::prelude::WorkflowResult { let row = ctx.listen_any(&[::NAME]).await?; Self::parse(&row.signal_name, row.body) @@ -350,7 +350,7 @@ pub fn message(attr: TokenStream, item: TokenStream) -> TokenStream { #[derive(Debug)] #item_struct - impl Message for #struct_ident { + impl chirp_workflow::prelude::Message for #struct_ident { const NAME: &'static str = #name; const TAIL_TTL: std::time::Duration = std::time::Duration::from_secs(#tail_ttl); } diff --git a/svc/pkg/cluster/src/ops/datacenter/get.rs b/svc/pkg/cluster/src/ops/datacenter/get.rs index 21eec564f0..a0ed09280f 100644 --- a/svc/pkg/cluster/src/ops/datacenter/get.rs +++ b/svc/pkg/cluster/src/ops/datacenter/get.rs @@ -78,7 +78,7 @@ impl TryFrom for Datacenter { pub async fn cluster_datacenter_get(ctx: &OperationCtx, input: &Input) -> GlobalResult { let datacenters = ctx .cache() - .fetch_all_json("cluster.datacenters", input.datacenter_ids.clone(), { + .fetch_all_json("cluster.datacenters2", input.datacenter_ids.clone(), { let ctx = ctx.clone(); move |mut cache, datacenter_ids| { let ctx = ctx.clone(); diff --git a/svc/pkg/cluster/src/ops/datacenter/location_get.rs b/svc/pkg/cluster/src/ops/datacenter/location_get.rs index 6276839709..32d762ae16 100644 --- a/svc/pkg/cluster/src/ops/datacenter/location_get.rs +++ b/svc/pkg/cluster/src/ops/datacenter/location_get.rs @@ -37,7 +37,7 @@ pub async fn cluster_datacenter_location_get( let datacenters = ctx .cache() .fetch_all_json( - "cluster.datacenters.location", + "cluster.datacenters.location2", input.datacenter_ids.clone(), { let ctx = ctx.clone(); diff --git a/svc/pkg/cluster/src/workflows/datacenter/mod.rs b/svc/pkg/cluster/src/workflows/datacenter/mod.rs index c0a54a612f..3657c19fc5 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/mod.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/mod.rs @@ -1,5 +1,8 @@ +use std::convert::TryInto; + use chirp_workflow::prelude::*; use futures_util::FutureExt; +use rivet_operation::prelude::{proto::backend, Message}; use serde_json::json; pub mod scale; @@ -245,16 +248,26 @@ struct UpdateDbInput { #[activity(UpdateDb)] async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<()> { // Get current pools - let (pools,) = sql_fetch_one!( - [ctx, (sqlx::types::Json>,)] + let (pools, pools2) = sql_fetch_one!( + [ctx, (Vec, Option>>,)] " - SELECT pools2 FROM db_cluster.datacenters + SELECT pools, pools2 FROM db_cluster.datacenters WHERE datacenter_id = $1 ", input.datacenter_id, ) .await?; - let mut pools = pools.0; + // Handle backwards compatibility + let mut pools = if let Some(pools) = pools2 { + pools.0 + } else { + let proto = backend::cluster::Pools::decode(pools.as_slice())?.pools; + + proto + .into_iter() + .map(TryInto::try_into) + .collect::>>()? + }; for pool in &input.pools { let current_pool = unwrap!( @@ -297,7 +310,7 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<()> // Purge cache ctx.cache() - .purge("cluster.datacenters", [input.datacenter_id]) + .purge("cluster.datacenters2", [input.datacenter_id]) .await?; Ok(()) diff --git a/svc/pkg/cluster/src/workflows/server/mod.rs b/svc/pkg/cluster/src/workflows/server/mod.rs index 4d61ebe158..b90401fe81 100644 --- a/svc/pkg/cluster/src/workflows/server/mod.rs +++ b/svc/pkg/cluster/src/workflows/server/mod.rs @@ -718,7 +718,7 @@ impl CustomListener for State { undrain // if drain && !taint taint // if !taint dns create // if !dns && !drain && !taint - dns delete // if !(dns || drain && taint) + dns delete // if dns && (!drain || !taint) nomad registered // always nomad drain complete // if drain */ @@ -744,7 +744,7 @@ impl CustomListener for State { signals.push(DnsCreate::NAME); } - if !(self.has_dns || self.draining && self.is_tainted) { + if self.has_dns && (!self.draining || !self.is_tainted) { signals.push(DnsDelete::NAME); }