Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,42 @@ fn get_duroxide_client() -> Result<&'static Client, String> {
})
}

async fn list_running_descendants(client: &Client, root_instance_id: &str) -> Vec<String> {
let tree = match client.get_instance_tree(root_instance_id).await {
Ok(tree) => tree,
Err(e) => {
warning!(
"pg_durable: failed to inspect instance tree for signal fan-out (root={}): {:?}",
root_instance_id,
e
);
return vec![];
}
};

let mut descendants = Vec::new();
for child_instance_id in tree.all_ids {
if child_instance_id == root_instance_id {
continue;
}

match client.get_instance_info(&child_instance_id).await {
Ok(info) if info.status.eq_ignore_ascii_case("running") => {
descendants.push(child_instance_id);
}
Ok(_) => {}
Err(e) => {
warning!(
"pg_durable: failed to inspect child instance status for signal fan-out (child={}): {:?}",
child_instance_id, e
);
}
}
}

descendants
}

/// Start a durable function via the shared PostgreSQL store.
pub fn start_durable_function(
function_name: &str,
Expand Down Expand Up @@ -141,6 +177,21 @@ pub fn raise_external_event(instance_id: &str, event_name: &str, data: &str) ->
.raise_event(instance_id, event_name, data)
.await
.map_err(|e| format!("Failed to raise event: {e:?}"))?;

for child_instance_id in list_running_descendants(client, instance_id).await {
if let Err(e) = client
.raise_event(&child_instance_id, event_name, data)
.await
{
warning!(
"pg_durable: failed to fan out signal '{}' to child instance {}: {:?}",
event_name,
child_instance_id,
e
);
}
}

Ok(())
})
}
72 changes: 72 additions & 0 deletions tests/e2e/sql/23_signal_in_race.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- Tests: df.signal propagation into wait_for_signal inside df.race branch
SET SESSION AUTHORIZATION df_e2e_user;

DROP TABLE IF EXISTS test_signal_race_log;
CREATE TABLE test_signal_race_log (
id SERIAL PRIMARY KEY,
branch TEXT NOT NULL,
data JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TEMP TABLE _test_signal_race_state (instance_id TEXT);

INSERT INTO _test_signal_race_state
SELECT df.start(
df.race(
df.seq(
df.wait_for_signal('approve', 12) |=> 'sig',
'INSERT INTO test_signal_race_log (branch, data) VALUES (''signal'', $sig::jsonb)'
),
df.seq(
df.sleep(8),
'INSERT INTO test_signal_race_log (branch, data) VALUES (''sleep'', ''{}''::jsonb)'
)
),
'test-signal-in-race'
);

SELECT pg_sleep(2);

DO $$
DECLARE
v_instance_id TEXT;
BEGIN
SELECT instance_id INTO v_instance_id FROM _test_signal_race_state;
PERFORM df.signal(v_instance_id, 'approve', '{"approved": true}');
END $$;

DO $$
DECLARE
v_instance_id TEXT;
v_status TEXT;
BEGIN
SELECT instance_id INTO v_instance_id FROM _test_signal_race_state;
SELECT df.wait_for_completion(v_instance_id, 25) INTO v_status;

IF v_status != 'completed' THEN
RAISE EXCEPTION 'TEST FAILED [signal-in-race]: expected completed, got %', v_status;
END IF;

IF NOT EXISTS (
SELECT 1
FROM test_signal_race_log
WHERE branch = 'signal'
AND (data->>'timed_out')::boolean = false
AND (data->'data'->>'approved')::boolean = true
) THEN
RAISE EXCEPTION 'TEST FAILED [signal-in-race]: signal branch did not receive approve event';
END IF;

IF EXISTS (SELECT 1 FROM test_signal_race_log WHERE branch = 'sleep') THEN
RAISE EXCEPTION 'TEST FAILED [signal-in-race]: sleep branch unexpectedly won race';
END IF;

RAISE NOTICE 'TEST PASSED: signal_in_race';
END $$;

DROP TABLE _test_signal_race_state;
DROP TABLE test_signal_race_log;

RESET SESSION AUTHORIZATION;
SELECT 'TEST PASSED' AS result;
Loading