From ed95665310dbbffa3b3a13e01b0cf5d42b813ed8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 22:07:10 +0000 Subject: [PATCH 1/2] Initial plan From db60a3f29f168d043468e1ffd0b5f2ebd8f8ef26 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 22:40:26 +0000 Subject: [PATCH 2/2] Fix signal fan-out to running sub-orchestrations and add race e2e coverage Agent-Logs-Url: https://github.com/microsoft/pg_durable/sessions/01258fe7-d415-4f63-95f4-9493c797fa20 Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- src/client.rs | 51 ++++++++++++++++++++ tests/e2e/sql/23_signal_in_race.sql | 72 +++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 tests/e2e/sql/23_signal_in_race.sql diff --git a/src/client.rs b/src/client.rs index af781970..61ff293f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -94,6 +94,42 @@ fn get_duroxide_client() -> Result<&'static Client, String> { }) } +async fn list_running_descendants(client: &Client, root_instance_id: &str) -> Vec { + let tree = match client.get_instance_tree(root_instance_id).await { + Ok(tree) => tree, + Err(e) => { + warning!( + "pg_durable: failed to inspect instance tree for signal fan-out (root={}): {:?}", + root_instance_id, + e + ); + return vec![]; + } + }; + + let mut descendants = Vec::new(); + for child_instance_id in tree.all_ids { + if child_instance_id == root_instance_id { + continue; + } + + match client.get_instance_info(&child_instance_id).await { + Ok(info) if info.status.eq_ignore_ascii_case("running") => { + descendants.push(child_instance_id); + } + Ok(_) => {} + Err(e) => { + warning!( + "pg_durable: failed to inspect child instance status for signal fan-out (child={}): {:?}", + child_instance_id, e + ); + } + } + } + + descendants +} + /// Start a durable function via the shared PostgreSQL store. pub fn start_durable_function( function_name: &str, @@ -141,6 +177,21 @@ pub fn raise_external_event(instance_id: &str, event_name: &str, data: &str) -> .raise_event(instance_id, event_name, data) .await .map_err(|e| format!("Failed to raise event: {e:?}"))?; + + for child_instance_id in list_running_descendants(client, instance_id).await { + if let Err(e) = client + .raise_event(&child_instance_id, event_name, data) + .await + { + warning!( + "pg_durable: failed to fan out signal '{}' to child instance {}: {:?}", + event_name, + child_instance_id, + e + ); + } + } + Ok(()) }) } diff --git a/tests/e2e/sql/23_signal_in_race.sql b/tests/e2e/sql/23_signal_in_race.sql new file mode 100644 index 00000000..b0670d82 --- /dev/null +++ b/tests/e2e/sql/23_signal_in_race.sql @@ -0,0 +1,72 @@ +-- Tests: df.signal propagation into wait_for_signal inside df.race branch +SET SESSION AUTHORIZATION df_e2e_user; + +DROP TABLE IF EXISTS test_signal_race_log; +CREATE TABLE test_signal_race_log ( + id SERIAL PRIMARY KEY, + branch TEXT NOT NULL, + data JSONB, + created_at TIMESTAMPTZ DEFAULT now() +); + +CREATE TEMP TABLE _test_signal_race_state (instance_id TEXT); + +INSERT INTO _test_signal_race_state +SELECT df.start( + df.race( + df.seq( + df.wait_for_signal('approve', 12) |=> 'sig', + 'INSERT INTO test_signal_race_log (branch, data) VALUES (''signal'', $sig::jsonb)' + ), + df.seq( + df.sleep(8), + 'INSERT INTO test_signal_race_log (branch, data) VALUES (''sleep'', ''{}''::jsonb)' + ) + ), + 'test-signal-in-race' +); + +SELECT pg_sleep(2); + +DO $$ +DECLARE + v_instance_id TEXT; +BEGIN + SELECT instance_id INTO v_instance_id FROM _test_signal_race_state; + PERFORM df.signal(v_instance_id, 'approve', '{"approved": true}'); +END $$; + +DO $$ +DECLARE + v_instance_id TEXT; + v_status TEXT; +BEGIN + SELECT instance_id INTO v_instance_id FROM _test_signal_race_state; + SELECT df.wait_for_completion(v_instance_id, 25) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [signal-in-race]: expected completed, got %', v_status; + END IF; + + IF NOT EXISTS ( + SELECT 1 + FROM test_signal_race_log + WHERE branch = 'signal' + AND (data->>'timed_out')::boolean = false + AND (data->'data'->>'approved')::boolean = true + ) THEN + RAISE EXCEPTION 'TEST FAILED [signal-in-race]: signal branch did not receive approve event'; + END IF; + + IF EXISTS (SELECT 1 FROM test_signal_race_log WHERE branch = 'sleep') THEN + RAISE EXCEPTION 'TEST FAILED [signal-in-race]: sleep branch unexpectedly won race'; + END IF; + + RAISE NOTICE 'TEST PASSED: signal_in_race'; +END $$; + +DROP TABLE _test_signal_race_state; +DROP TABLE test_signal_race_log; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED' AS result;