diff --git a/.changeset/evil-hounds-own.md b/.changeset/evil-hounds-own.md
new file mode 100644
index 000000000..98fa139cc
--- /dev/null
+++ b/.changeset/evil-hounds-own.md
@@ -0,0 +1,7 @@
+---
+'@pgflow/core': patch
+---
+
+Fix step:failed events not being broadcast when steps fail
+
+Fixed a bug where step:failed events were not broadcast to real-time subscribers when a step failed permanently. The CTE containing the realtime.send() call was never referenced by the outer statement, so PostgreSQL never executed it. The fix replaces the CTE with a direct PERFORM statement in the function body, ensuring the event is always sent when a step fails.
diff --git a/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts
new file mode 100644
index 000000000..2229404ab
--- /dev/null
+++ b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts
@@ -0,0 +1,92 @@
+import { describe, it, expect } from 'vitest';
+import { withPgNoTransaction } from '../../helpers/db.js';
+import { createTestSupabaseClient } from '../../helpers/setup.js';
+import { grantMinimalPgflowPermissions } from '../../helpers/permissions.js';
+import { PgflowClient } from '../../../src/lib/PgflowClient.js';
+
+describe('Step Failed Event Broadcasting', () => {
+  it(
+    'should broadcast step:failed event when a step fails permanently',
+    withPgNoTransaction(async (sql) => {
+      // Grant minimal permissions
+      await grantMinimalPgflowPermissions(sql);
+
+      // Create test flow with max_attempts = 1 to fail immediately
+      await sql`SELECT pgflow.create_flow('step_failed_bug_test', max_attempts => 1)`;
+      await sql`SELECT pgflow.add_step('step_failed_bug_test', 'failing_step')`;
+
+      // Create clients
+      const supabaseClient = createTestSupabaseClient();
+      const pgflowClient = new PgflowClient(supabaseClient);
+
+      // Start the flow
+      const [{ run_id: runId }] = await sql`
+        SELECT * FROM pgflow.start_flow('step_failed_bug_test', '{}'::jsonb)
+      `;
+
+      // Start the step
+      await sql`SELECT pgflow.start_ready_steps(${runId})`;
+
+      // Simulate worker processing: update task to 'started' status
+      await sql`
+        UPDATE pgflow.step_tasks
+        SET status = 'started',
+            started_at = now(),
+            attempts_count = 1
+        WHERE run_id = ${runId}
+          AND step_slug = 'failing_step'
+          AND task_index = 0
+      `;
+
+      // Fail the task - before the fix, this call never broadcast step:failed
+      await sql`
+        SELECT pgflow.fail_task(
+          ${runId},
+          'failing_step',
+          0,
+          'Step failed to demonstrate CTE optimization bug'
+        )
+      `;
+
+      // Check database state
+      const [runState] = await sql`
+        SELECT status FROM pgflow.runs WHERE run_id = ${runId}
+      `;
+      const [stepState] = await sql`
+        SELECT status FROM pgflow.step_states
+        WHERE run_id = ${runId} AND step_slug = 'failing_step'
+      `;
+
+      // Check broadcast events
+      const messages = await sql`
+        SELECT payload->>'event_type' as event_type
+        FROM realtime.messages
+        WHERE topic = ${'pgflow:run:' + runId}
+        ORDER BY inserted_at
+      `;
+
+      const eventTypes = messages.map(m => m.event_type);
+
+      console.log('\n=== Step Failed Event Bug Test Results ===');
+      console.log('Database State:');
+      console.log('  Run status:', runState.status);
+      console.log('  Step status:', stepState.status);
+      console.log('\nBroadcast Events:');
+      console.log('  Event types:', eventTypes);
+      console.log('\nBug Analysis:');
+      console.log('  run:failed event broadcast?', eventTypes.includes('run:failed') ? 'YES ✓' : 'NO ✗');
+      console.log('  step:failed event broadcast?', eventTypes.includes('step:failed') ? 'YES ✓' : 'NO ✗');
+
+      // Verify database state is correct
+      expect(runState.status).toBe('failed');
+      expect(stepState.status).toBe('failed');
+
+      // Verify run:failed was broadcast (unaffected by the bug)
+      expect(eventTypes).toContain('run:failed');
+
+      // Verify step:failed was broadcast
+      // Regression guard: before the fix this failed because PostgreSQL never executed the unreferenced CTE that sent the event
+      expect(eventTypes).toContain('step:failed');
+    })
+  );
+});
\ No newline at end of file
diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql
index d51f1968d..a3f328ab7 100644
--- a/pkgs/core/schemas/0100_function_fail_task.sql
+++ b/pkgs/core/schemas/0100_function_fail_task.sql
@@ -11,6 +11,7 @@ set search_path to ''
 as $$
 DECLARE
   v_run_failed boolean;
+  v_step_failed boolean;
 begin

 WITH run_lock AS (
@@ -79,27 +80,8 @@ maybe_fail_step AS (
   WHERE pgflow.step_states.run_id = fail_task.run_id
     AND pgflow.step_states.step_slug = fail_task.step_slug
   RETURNING pgflow.step_states.*
-),
--- Send broadcast event for step failed if necessary
-broadcast_step_failed AS (
-  SELECT
-    realtime.send(
-      jsonb_build_object(
-        'event_type', 'step:failed',
-        'run_id', fail_task.run_id,
-        'step_slug', fail_task.step_slug,
-        'status', 'failed',
-        'error_message', fail_task.error_message,
-        'failed_at', now()
-      ),
-      concat('step:', fail_task.step_slug, ':failed'),
-      concat('pgflow:run:', fail_task.run_id),
-      false
-    )
-  FROM maybe_fail_step
-  WHERE maybe_fail_step.status = 'failed'
 )
--- Only decrement remaining_steps, don't update status
+-- Update run status
 UPDATE pgflow.runs
 SET status = CASE
     WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
     ELSE status
   END,
@@ -112,6 +94,29 @@ SET status = CASE
 WHERE pgflow.runs.run_id = fail_task.run_id
 RETURNING (status = 'failed') INTO v_run_failed;

+-- Check if step failed by querying the step_states table
+SELECT (status = 'failed') INTO v_step_failed
+FROM pgflow.step_states
+WHERE pgflow.step_states.run_id = fail_task.run_id
+  AND pgflow.step_states.step_slug = fail_task.step_slug;
+
+-- Send broadcast event for step failure if the step was failed
+IF v_step_failed THEN
+  PERFORM realtime.send(
+    jsonb_build_object(
+      'event_type', 'step:failed',
+      'run_id', fail_task.run_id,
+      'step_slug', fail_task.step_slug,
+      'status', 'failed',
+      'error_message', fail_task.error_message,
+      'failed_at', now()
+    ),
+    concat('step:', fail_task.step_slug, ':failed'),
+    concat('pgflow:run:', fail_task.run_id),
+    false
+  );
+END IF;
+
 -- Send broadcast event for run failure if the run was failed
 IF v_run_failed THEN
   DECLARE
diff --git a/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql b/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql
new file mode 100644
index 000000000..9782f5393
--- /dev/null
+++ b/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql
@@ -0,0 +1,185 @@
+-- Modify "fail_task" function
+CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
+DECLARE
+  v_run_failed boolean;
+  v_step_failed boolean;
+begin
+
+WITH run_lock AS (
+  SELECT * FROM pgflow.runs
+  WHERE pgflow.runs.run_id = fail_task.run_id
+  FOR UPDATE
+),
+step_lock AS (
+  SELECT * FROM pgflow.step_states
+  WHERE pgflow.step_states.run_id = fail_task.run_id
+    AND pgflow.step_states.step_slug = fail_task.step_slug
+  FOR UPDATE
+),
+flow_info AS (
+  SELECT r.flow_slug
+  FROM pgflow.runs r
+  WHERE r.run_id = fail_task.run_id
+),
+config AS (
+  SELECT
+    COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
+    COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
+  FROM pgflow.steps s
+  JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
+  JOIN flow_info fi ON fi.flow_slug = s.flow_slug
+  WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
+),
+fail_or_retry_task as (
+  UPDATE pgflow.step_tasks as task
+  SET
+    status = CASE
+      WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
+      ELSE 'failed'
+    END,
+    failed_at = CASE
+      WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
+      ELSE NULL
+    END,
+    started_at = CASE
+      WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
+      ELSE task.started_at
+    END,
+    error_message = fail_task.error_message
+  WHERE task.run_id = fail_task.run_id
+    AND task.step_slug = fail_task.step_slug
+    AND task.task_index = fail_task.task_index
+    AND task.status = 'started'
+  RETURNING *
+),
+maybe_fail_step AS (
+  UPDATE pgflow.step_states
+  SET
+    status = CASE
+      WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
+      ELSE pgflow.step_states.status
+    END,
+    failed_at = CASE
+      WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
+      ELSE NULL
+    END,
+    error_message = CASE
+      WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
+      ELSE NULL
+    END
+  FROM fail_or_retry_task
+  WHERE pgflow.step_states.run_id = fail_task.run_id
+    AND pgflow.step_states.step_slug = fail_task.step_slug
+  RETURNING pgflow.step_states.*
+)
+-- Update run status
+UPDATE pgflow.runs
+SET status = CASE
+    WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
+    ELSE status
+  END,
+  failed_at = CASE
+    WHEN (select status from maybe_fail_step) = 'failed' THEN now()
+    ELSE NULL
+  END
+WHERE pgflow.runs.run_id = fail_task.run_id
+RETURNING (status = 'failed') INTO v_run_failed;
+
+-- Check if step failed by querying the step_states table
+SELECT (status = 'failed') INTO v_step_failed
+FROM pgflow.step_states
+WHERE pgflow.step_states.run_id = fail_task.run_id
+  AND pgflow.step_states.step_slug = fail_task.step_slug;
+
+-- Send broadcast event for step failure if the step was failed
+IF v_step_failed THEN
+  PERFORM realtime.send(
+    jsonb_build_object(
+      'event_type', 'step:failed',
+      'run_id', fail_task.run_id,
+      'step_slug', fail_task.step_slug,
+      'status', 'failed',
+      'error_message', fail_task.error_message,
+      'failed_at', now()
+    ),
+    concat('step:', fail_task.step_slug, ':failed'),
+    concat('pgflow:run:', fail_task.run_id),
+    false
+  );
+END IF;
+
+-- Send broadcast event for run failure if the run was failed
+IF v_run_failed THEN
+  DECLARE
+    v_flow_slug text;
+  BEGIN
+    SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;
+
+    PERFORM realtime.send(
+      jsonb_build_object(
+        'event_type', 'run:failed',
+        'run_id', fail_task.run_id,
+        'flow_slug', v_flow_slug,
+        'status', 'failed',
+        'error_message', fail_task.error_message,
+        'failed_at', now()
+      ),
+      'run:failed',
+      concat('pgflow:run:', fail_task.run_id),
+      false
+    );
+  END;
+END IF;
+
+-- For queued tasks: delay the message for retry with exponential backoff
+PERFORM (
+  WITH retry_config AS (
+    SELECT
+      COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
+    FROM pgflow.steps s
+    JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
+    JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
+    WHERE r.run_id = fail_task.run_id
+      AND s.step_slug = fail_task.step_slug
+  ),
+  queued_tasks AS (
+    SELECT
+      r.flow_slug,
+      st.message_id,
+      pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
+    FROM pgflow.step_tasks st
+    JOIN pgflow.runs r ON st.run_id = r.run_id
+    WHERE st.run_id = fail_task.run_id
+      AND st.step_slug = fail_task.step_slug
+      AND st.task_index = fail_task.task_index
+      AND st.status = 'queued'
+  )
+  SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
+  FROM queued_tasks qt
+  WHERE EXISTS (SELECT 1 FROM queued_tasks)
+);
+
+-- For failed tasks: archive the message
+PERFORM (
+  WITH failed_tasks AS (
+    SELECT r.flow_slug, st.message_id
+    FROM pgflow.step_tasks st
+    JOIN pgflow.runs r ON st.run_id = r.run_id
+    WHERE st.run_id = fail_task.run_id
+      AND st.step_slug = fail_task.step_slug
+      AND st.task_index = fail_task.task_index
+      AND st.status = 'failed'
+  )
+  SELECT pgmq.archive(ft.flow_slug, ft.message_id)
+  FROM failed_tasks ft
+  WHERE EXISTS (SELECT 1 FROM failed_tasks)
+);
+
+return query select *
+from pgflow.step_tasks st
+where st.run_id = fail_task.run_id
+  and st.step_slug = fail_task.step_slug
+  and st.task_index = fail_task.task_index;
+
+end;
+$$;
diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum
index 481cbe082..03b29f2d7 100644
--- a/pkgs/core/supabase/migrations/atlas.sum
+++ b/pkgs/core/supabase/migrations/atlas.sum
@@ -1,6 +1,7 @@
-h1:LTAAz5tGQFyM+VudEr21XqdHQidvAL1paItX5pE1tHY=
+h1:gcuV4nVlkChnjqJrnOMip8qP406j6/5daYRZi66QnQQ=
 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
 20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql h1:rqhO47GgL/M+Yq6vrNHB775wegnqs+snewqdtUs/E70=
 20250614124241_pgflow_add_realtime.sql h1:b461IlAMEyDrHCQ4JVN2RSbDOF0jdgCQCXe6qE1R3Rs=
+20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql h1:vNSoTWGWOJmbgDmK8Ii067N8bGgKKvfzKZSoAhKysp8=
diff --git a/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql
new file mode 100644
index 000000000..9e0eb0294
--- /dev/null
+++ b/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql
@@ -0,0 +1,61 @@
+begin;
+select plan(3);
+
+-- Ensure partition exists for realtime.messages
+select pgflow_tests.create_realtime_partition();
+
+-- Reset database and set up a flow with max_attempts = 1 to fail immediately
+select pgflow_tests.reset_db();
+select pgflow.create_flow('test_flow', max_attempts => 1);
+select pgflow.add_step('test_flow', 'failing_step');
+
+-- Start the flow and capture the run_id
+with flow as (
+  select * from pgflow.start_flow('test_flow', '{}'::jsonb)
+)
+select run_id into temporary run_ids from flow;
+
+-- Poll and fail the task using helper function
+-- Since max_attempts = 1, it will fail permanently
+select pgflow_tests.poll_and_fail('test_flow');
+
+-- Test 1: Verify step state is failed in database
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids)
+     and step_slug = 'failing_step'),
+  'failed',
+  'Step state should be marked as failed in the database'
+);
+
+-- Test 2: Verify run:failed event was sent
+select is(
+  pgflow_tests.count_realtime_events('run:failed', (select run_id from run_ids)),
+  1::int,
+  'run:failed event should be broadcast'
+);
+
+-- Test 3: Verify step:failed event was sent (regression guard: this failed before the fix)
+select is(
+  pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'failing_step'),
+  1::int,
+  'step:failed event should be broadcast when step fails permanently'
+);
+
+-- Additional debugging: show what events were actually sent
+select diag('Events in realtime.messages:');
+select diag(concat(' ', mt.event, ': ', mt.payload::text))
+from realtime.messages m
+cross join lateral (
+  select
+    m.payload->>'event_type' as event,
+    m.payload
+) mt
+where m.topic = concat('pgflow:run:', (select run_id from run_ids))
+order by m.inserted_at;
+
+-- Clean up
+drop table if exists run_ids;
+
+select finish();
+rollback;
\ No newline at end of file
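
Root cause, illustrated. The sketch below is not part of the patch; it is a minimal scratch-database demo of why the old broadcast_step_failed CTE never ran. PostgreSQL executes a SELECT-only CTE only to the extent that the outer statement reads its output, so a CTE that is never referenced is skipped entirely, while PERFORM in PL/pgSQL always evaluates its expression. The side_effects table and record_side_effect() function are hypothetical stand-ins for realtime.send().

-- Illustrative sketch only (assumes a throwaway database)
create table side_effects (note text);

create function record_side_effect(p_note text) returns void
  language sql volatile
  as $$ insert into side_effects values (p_note); $$;

-- Old shape: the CTE is never referenced by the outer statement,
-- so its volatile function call is never executed.
with broadcast as (
  select record_side_effect('from unreferenced CTE')
)
select 1;

-- New shape: PERFORM inside a PL/pgSQL block always runs the call.
do $$
begin
  perform record_side_effect('from PERFORM');
end;
$$;

-- Returns only 'from PERFORM'.
select note from side_effects;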