Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,17 @@ impl DatabaseKv {
);

let workflow_name_key = keys::workflow::NameKey::new(workflow_id);
let wake_signal_key =
keys::workflow::WakeSignalKey::new(workflow_id, signal_name.to_string());

let (workflow_name_entry, wake_signal_entry) = tokio::try_join!(
tx.get(&self.subspace.pack(&workflow_name_key), Serializable),
tx.get(&self.subspace.pack(&wake_signal_key), Serializable),
)?;

// TODO: This does not check if the workflow is silenced
// Check if the workflow exists
let Some(workflow_name_entry) = tx
.get(&self.subspace.pack(&workflow_name_key), Serializable)
.await?
else {
let Some(workflow_name_entry) = workflow_name_entry else {
return Err(WorkflowError::WorkflowNotFound.into());
};

Expand Down Expand Up @@ -200,15 +204,8 @@ impl DatabaseKv {
&workflow_id_key.serialize(workflow_id)?,
);

let wake_signal_key =
keys::workflow::WakeSignalKey::new(workflow_id, signal_name.to_string());

// If the workflow currently has a wake signal key for this signal, wake it
if tx
.get(&self.subspace.pack(&wake_signal_key), Serializable)
.await?
.is_some()
{
if wake_signal_entry.is_some() {
let mut wake_condition_key = keys::wake::WorkflowWakeConditionKey::new(
workflow_name,
workflow_id,
Expand Down
Loading