From 480189f23d3d2078f32123077bc27651ccd068b3 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 18 Nov 2025 17:16:54 -0800 Subject: [PATCH] fix: parallelize publish signal awaits --- engine/packages/gasoline/src/db/kv/mod.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 474e271100..5245bfc224 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -142,13 +142,17 @@ impl DatabaseKv { ); let workflow_name_key = keys::workflow::NameKey::new(workflow_id); + let wake_signal_key = + keys::workflow::WakeSignalKey::new(workflow_id, signal_name.to_string()); + + let (workflow_name_entry, wake_signal_entry) = tokio::try_join!( + tx.get(&self.subspace.pack(&workflow_name_key), Serializable), + tx.get(&self.subspace.pack(&wake_signal_key), Serializable), + )?; // TODO: This does not check if the workflow is silenced // Check if the workflow exists - let Some(workflow_name_entry) = tx - .get(&self.subspace.pack(&workflow_name_key), Serializable) - .await? - else { + let Some(workflow_name_entry) = workflow_name_entry else { return Err(WorkflowError::WorkflowNotFound.into()); }; @@ -200,15 +204,8 @@ impl DatabaseKv { &workflow_id_key.serialize(workflow_id)?, ); - let wake_signal_key = - keys::workflow::WakeSignalKey::new(workflow_id, signal_name.to_string()); - // If the workflow currently has a wake signal key for this signal, wake it - if tx - .get(&self.subspace.pack(&wake_signal_key), Serializable) - .await? - .is_some() - { + if wake_signal_entry.is_some() { let mut wake_condition_key = keys::wake::WorkflowWakeConditionKey::new( workflow_name, workflow_id,