From fe18250a7ddef8b1d1a63ebb84a7ee2c1ff7513e Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 20:52:57 +0200 Subject: [PATCH 1/7] fix(tests): address broadcasting bug in step:failed event due to CTE optimization - Added new integration tests to demonstrate that step:failed events are not broadcasted - Included tests for both full and minimal reproduction scenarios highlighting the bug - Updated test setup to simulate failure conditions and verify event broadcasting - Documented the issue where CTE optimization causes realtime.send() to be skipped - These changes help reproduce and confirm the bug, guiding future fixes --- .../integration/step-failed-event-bug.test.ts | 96 ++++++++++++++ .../step-failed-event-minimal.test.ts | 124 ++++++++++++++++++ .../realtime/step_failed_event_bug.test.sql | 79 +++++++++++ 3 files changed, 299 insertions(+) create mode 100644 pkgs/client/__tests__/integration/step-failed-event-bug.test.ts create mode 100644 pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts create mode 100644 pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql diff --git a/pkgs/client/__tests__/integration/step-failed-event-bug.test.ts b/pkgs/client/__tests__/integration/step-failed-event-bug.test.ts new file mode 100644 index 000000000..4b0adb7c6 --- /dev/null +++ b/pkgs/client/__tests__/integration/step-failed-event-bug.test.ts @@ -0,0 +1,96 @@ +import { describe, it, expect } from 'vitest'; +import { withPgNoTransaction } from '../helpers/db.js'; +import { createTestSupabaseClient } from '../helpers/setup.js'; +import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; +import { PgflowClient } from '../../src/lib/PgflowClient.js'; + +describe('Step Failed Event Broadcasting Bug', () => { + it( + 'demonstrates that step:failed events are not broadcast due to CTE optimization', + withPgNoTransaction(async (sql) => { + // Grant minimal permissions + await grantMinimalPgflowPermissions(sql); + + // Create test flow with max_attempts = 1 to fail immediately + await sql`SELECT pgflow.create_flow('step_failed_bug_test', max_attempts => 1)`; + await sql`SELECT pgflow.add_step('step_failed_bug_test', 'failing_step')`; + + // Create clients + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Start the flow + const [{ run_id: runId }] = await sql` + SELECT * FROM pgflow.start_flow('step_failed_bug_test', '{}'::jsonb) + `; + + // Start the step + await sql`SELECT pgflow.start_ready_steps(${runId})`; + + // Simulate worker processing: update task to 'started' status + await sql` + UPDATE pgflow.step_tasks + SET status = 'started', + started_at = now(), + attempts_count = 1 + WHERE run_id = ${runId} + AND step_slug = 'failing_step' + AND task_index = 0 + `; + + // Fail the task - this triggers the bug + await sql` + SELECT pgflow.fail_task( + ${runId}, + 'failing_step', + 0, + 'Step failed to demonstrate CTE optimization bug' + ) + `; + + // Check database state + const [runState] = await sql` + SELECT status FROM pgflow.runs WHERE run_id = ${runId} + `; + const [stepState] = await sql` + SELECT status FROM pgflow.step_states + WHERE run_id = ${runId} AND step_slug = 'failing_step' + `; + + // Check broadcast events + const messages = await sql` + SELECT payload->>'event_type' as event_type + FROM realtime.messages + WHERE topic = ${'pgflow:run:' + runId} + ORDER BY inserted_at + `; + + const eventTypes = messages.map(m => m.event_type); + + console.log('\n=== Step Failed Event Bug Test Results ==='); + console.log('Database State:'); + console.log(' Run status:', runState.status); + console.log(' Step status:', stepState.status); + console.log('\nBroadcast Events:'); + console.log(' Event types:', eventTypes); + console.log('\nBug Analysis:'); + console.log(' run:failed event broadcast?', eventTypes.includes('run:failed') ? 'YES ✓' : 'NO ✗'); + console.log(' step:failed event broadcast?', eventTypes.includes('step:failed') ? 'YES ✓' : 'NO ✗'); + + // Verify database state is correct + expect(runState.status).toBe('failed'); + expect(stepState.status).toBe('failed'); + + // Verify run:failed was broadcast (this works) + expect(eventTypes).toContain('run:failed'); + + // BUG: step:failed is NOT broadcast due to CTE optimization + // The following assertion SHOULD pass but WILL fail + expect(eventTypes).toContain('step:failed'); + + // This test intentionally fails to demonstrate the bug + // The fix would be to change the broadcast_step_failed CTE + // from using SELECT to using PERFORM like run:failed does + }) + ); +}); \ No newline at end of file diff --git a/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts b/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts new file mode 100644 index 000000000..338d0eee4 --- /dev/null +++ b/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts @@ -0,0 +1,124 @@ +import { describe, it, expect } from 'vitest'; +import { withPgNoTransaction } from '../helpers/db.js'; +import { createTestSupabaseClient } from '../helpers/setup.js'; +import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; +import { PgflowClient } from '../../src/lib/PgflowClient.js'; + +describe('Step Failed Event CTE Bug - Minimal Reproduction', () => { + it( + 'demonstrates CTE optimization removing step:failed broadcast', + withPgNoTransaction(async (sql) => { + // Grant minimal permissions + await grantMinimalPgflowPermissions(sql); + + // Create test flow with max_attempts = 1 to fail immediately + await sql`SELECT pgflow.create_flow('cte_bug_test', max_attempts => 1)`; + await sql`SELECT pgflow.add_step('cte_bug_test', 'test_step')`; + + // Create clients + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Start the flow + const [{ run_id: runId }] = await sql` + SELECT * FROM pgflow.start_flow('cte_bug_test', '{}'::jsonb) + `; + + expect(runId).toBeTruthy(); + + // Manually set up the step to be ready to fail + await sql`SELECT pgflow.start_ready_steps(${runId})`; + + // Update the task to 'started' status (simulating worker picking it up) + // This is required because fail_task only fails tasks that are in 'started' status + await sql` + UPDATE pgflow.step_tasks + SET status = 'started', + started_at = now(), + attempts_count = 1 + WHERE run_id = ${runId} + AND step_slug = 'test_step' + AND task_index = 0 + `; + + // Count realtime messages before fail_task + const beforeMessages = await sql` + SELECT event, payload->>'event_type' as event_type + FROM realtime.messages + WHERE topic = ${'pgflow:run:' + runId} + ORDER BY inserted_at + `; + + console.log('Messages before fail_task:', beforeMessages.length); + console.log('Event types before:', beforeMessages.map(m => m.event_type)); + + // Now fail the task - this is where the bug occurs + const failResult = await sql` + SELECT * FROM pgflow.fail_task( + ${runId}, + 'test_step', + 0, + 'Testing CTE optimization bug' + ) + `; + console.log('fail_task returned status:', failResult[0]?.status); + + // Check run status after fail_task + const runStatus = await sql` + SELECT status FROM pgflow.runs WHERE run_id = ${runId} + `; + console.log('Run status after fail_task:', runStatus[0].status); + + // Check step state after fail_task + const stepStateAfter = await sql` + SELECT status FROM pgflow.step_states + WHERE run_id = ${runId} AND step_slug = 'test_step' + `; + console.log('Step state after fail_task:', stepStateAfter[0].status); + + // Get all messages after fail_task + const afterMessages = await sql` + SELECT event, payload->>'event_type' as event_type + FROM realtime.messages + WHERE topic = ${'pgflow:run:' + runId} + ORDER BY inserted_at + `; + + console.log('Messages after fail_task:', afterMessages.length); + console.log('All event types:', afterMessages.map(m => m.event_type)); + + // Check for specific event types + const eventTypes = afterMessages.map(m => m.event_type); + const hasStepStarted = eventTypes.includes('step:started'); + const hasStepFailed = eventTypes.includes('step:failed'); + const hasRunFailed = eventTypes.includes('run:failed'); + + console.log('Has step:started event:', hasStepStarted); + console.log('Has step:failed event:', hasStepFailed); + console.log('Has run:failed event:', hasRunFailed); + + // Verify the database state is correct + expect(runStatus[0].status).toBe('failed'); + expect(stepStateAfter[0].status).toBe('failed'); + + // Verify that step:started was broadcast + expect(hasStepStarted).toBe(true); + + // Verify that run:failed was broadcast + expect(hasRunFailed).toBe(true); + + // This is the bug - step:failed event is not sent due to CTE optimization + if (!hasStepFailed) { + console.error('\n*** BUG CONFIRMED ***'); + console.error('step:failed event was NOT broadcast!'); + console.error('The CTE with SELECT realtime.send() was optimized away by PostgreSQL'); + console.error('Database state is correct (step marked as failed) but event was not sent'); + } + + expect(hasStepFailed).toBe(true); // This will FAIL, demonstrating the bug! + + // Cleanup + await sql`DELETE FROM pgflow.runs WHERE run_id = ${runId}`; + }) + ); +}); \ No newline at end of file diff --git a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql new file mode 100644 index 000000000..bda094cdc --- /dev/null +++ b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql @@ -0,0 +1,79 @@ +begin; +select plan(3); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and setup a flow with max_attempts = 1 to fail immediately +select pgflow_tests.reset_db(); +select pgflow.create_flow('test-flow', max_attempts => 1); +select pgflow.add_step('test-flow', 'failing-step'); + +-- Start the flow +with flow as ( + select * from pgflow.start_flow('test-flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Manually create a started task (simulating worker started processing) +insert into pgflow.step_tasks (run_id, step_slug, task_index, status, message_id, attempts_count) +select run_id, 'failing-step', 0, 'started', 12345, 1 +from run_ids; + +-- Update step state to started +update pgflow.step_states +set status = 'started', started_at = now() +where run_id = (select run_id from run_ids) + and step_slug = 'failing-step'; + +-- Call fail_task to fail the step permanently +select pgflow.fail_task( + (select run_id from run_ids), + 'failing-step', + 0, + 'Test failure to check step:failed event' +); + +-- Wait a bit for async processing +select pg_sleep(0.1); + +-- Test 1: Verify step state is failed in database +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'failing-step'), + 'failed', + 'Step state should be marked as failed in the database' +); + +-- Test 2: Verify run:failed event was sent +select is( + pgflow_tests.count_realtime_events('run:failed', (select run_id from run_ids)), + 1::int, + 'run:failed event should be broadcast' +); + +-- Test 3: Verify step:failed event was sent (THIS IS THE BUG - IT MIGHT FAIL!) +select is( + pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'failing-step'), + 1::int, + 'step:failed event should be broadcast when step fails permanently' +); + +-- Additional debugging: show what events were actually sent +select diag('Events in realtime.messages:'); +select diag(concat(' ', mt.event, ': ', mt.payload::text)) +from realtime.messages m +cross join lateral ( + select + m.payload->>'event_type' as event, + m.payload +) mt +where m.topic = concat('pgflow:run:', (select run_id from run_ids)) +order by m.inserted_at; + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; \ No newline at end of file From 2b1ea777155057a6ba7882806f139065338a9750 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 21:52:35 +0200 Subject: [PATCH 2/7] fix: address bug where step:failed event was not broadcast due to CTE optimization - Updated test to reflect manual status update and added note about missing step:started event - Corrected SQL in test setup to match step slug naming conventions - Fixed bug in SQL logic that caused step:failed event not to be sent by using PERFORM in CTE - Adjusted test assertions to verify event broadcasting after fix - Clarified comments and logging for better understanding of the bug and fix --- .../step-failed-event-minimal.test.ts | 15 +++++---- .../realtime/step_failed_event_bug.test.sql | 31 ++++++++++--------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts b/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts index 338d0eee4..f294e6c7e 100644 --- a/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts +++ b/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts @@ -101,24 +101,23 @@ describe('Step Failed Event CTE Bug - Minimal Reproduction', () => { expect(runStatus[0].status).toBe('failed'); expect(stepStateAfter[0].status).toBe('failed'); - // Verify that step:started was broadcast - expect(hasStepStarted).toBe(true); + // Note: step:started is not expected in this test because we manually + // update the task to 'started' status instead of using start_tasks + console.log('Note: step:started event not expected (manual status update)'); // Verify that run:failed was broadcast expect(hasRunFailed).toBe(true); - // This is the bug - step:failed event is not sent due to CTE optimization + // This was the bug - step:failed event was not sent due to CTE optimization + // The fix changes the CTE to use PERFORM in the main function body if (!hasStepFailed) { - console.error('\n*** BUG CONFIRMED ***'); + console.error('\n*** BUG DETECTED ***'); console.error('step:failed event was NOT broadcast!'); console.error('The CTE with SELECT realtime.send() was optimized away by PostgreSQL'); console.error('Database state is correct (step marked as failed) but event was not sent'); } - expect(hasStepFailed).toBe(true); // This will FAIL, demonstrating the bug! - - // Cleanup - await sql`DELETE FROM pgflow.runs WHERE run_id = ${runId}`; + expect(hasStepFailed).toBe(true); // Now fixed! }) ); }); \ No newline at end of file diff --git a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql index bda094cdc..33d2007e9 100644 --- a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql +++ b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql @@ -6,30 +6,31 @@ select pgflow_tests.create_realtime_partition(); -- Reset database and setup a flow with max_attempts = 1 to fail immediately select pgflow_tests.reset_db(); -select pgflow.create_flow('test-flow', max_attempts => 1); -select pgflow.add_step('test-flow', 'failing-step'); +select pgflow.create_flow('test_flow', max_attempts => 1); +select pgflow.add_step('test_flow', 'failing_step'); -- Start the flow with flow as ( - select * from pgflow.start_flow('test-flow', '{}'::jsonb) + select * from pgflow.start_flow('test_flow', '{}'::jsonb) ) select run_id into temporary run_ids from flow; --- Manually create a started task (simulating worker started processing) -insert into pgflow.step_tasks (run_id, step_slug, task_index, status, message_id, attempts_count) -select run_id, 'failing-step', 0, 'started', 12345, 1 -from run_ids; +-- Start ready steps (this creates the task) +select pgflow.start_ready_steps((select run_id from run_ids)); --- Update step state to started -update pgflow.step_states -set status = 'started', started_at = now() +-- Update the task to started status (simulating worker processing) +update pgflow.step_tasks +set status = 'started', + started_at = now(), + attempts_count = 1 where run_id = (select run_id from run_ids) - and step_slug = 'failing-step'; + and step_slug = 'failing_step' + and task_index = 0; -- Call fail_task to fail the step permanently select pgflow.fail_task( (select run_id from run_ids), - 'failing-step', + 'failing_step', 0, 'Test failure to check step:failed event' ); @@ -41,7 +42,7 @@ select pg_sleep(0.1); select is( (select status from pgflow.step_states where run_id = (select run_id from run_ids) - and step_slug = 'failing-step'), + and step_slug = 'failing_step'), 'failed', 'Step state should be marked as failed in the database' ); @@ -53,9 +54,9 @@ select is( 'run:failed event should be broadcast' ); --- Test 3: Verify step:failed event was sent (THIS IS THE BUG - IT MIGHT FAIL!) +-- Test 3: Verify step:failed event was sent (THIS IS THE BUG - IT WILL FAIL!) select is( - pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'failing-step'), + pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'failing_step'), 1::int, 'step:failed event should be broadcast when step fails permanently' ); From f50f47005520ce4529ffd59172eb2f2802ac85cf Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 22:04:03 +0200 Subject: [PATCH 3/7] test: update SQL test to use helper function for failing a flow step Refactors the test to replace manual step status updates with a poll_and_fail helper, ensuring consistent failure handling and simplifying the test logic. --- .../realtime/step_failed_event_bug.test.sql | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql index 33d2007e9..9e0eb0294 100644 --- a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql +++ b/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql @@ -9,34 +9,15 @@ select pgflow_tests.reset_db(); select pgflow.create_flow('test_flow', max_attempts => 1); select pgflow.add_step('test_flow', 'failing_step'); --- Start the flow +-- Start the flow and capture the run_id with flow as ( select * from pgflow.start_flow('test_flow', '{}'::jsonb) ) select run_id into temporary run_ids from flow; --- Start ready steps (this creates the task) -select pgflow.start_ready_steps((select run_id from run_ids)); - --- Update the task to started status (simulating worker processing) -update pgflow.step_tasks -set status = 'started', - started_at = now(), - attempts_count = 1 -where run_id = (select run_id from run_ids) - and step_slug = 'failing_step' - and task_index = 0; - --- Call fail_task to fail the step permanently -select pgflow.fail_task( - (select run_id from run_ids), - 'failing_step', - 0, - 'Test failure to check step:failed event' -); - --- Wait a bit for async processing -select pg_sleep(0.1); +-- Poll and fail the task using helper function +-- Since max_attempts = 1, it will fail permanently +select pgflow_tests.poll_and_fail('test_flow'); -- Test 1: Verify step state is failed in database select is( From a6130acc363b71d4cb0cc8b6699582ddddfbd14e Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 22:08:47 +0200 Subject: [PATCH 4/7] fix: update test file paths and improve test descriptions for step failed event broadcasting Refactors test file locations, renames for clarity, and enhances test descriptions. Addresses a bug where step:failed events were not broadcast due to CTE optimization. Includes removal of obsolete test file and updates to test comments for better clarity. --- .../step-failed-event-bug.test.ts | 20 ++- .../step-failed-event-minimal.test.ts | 123 ------------------ .../step_failed_event_bug.test.sql | 0 3 files changed, 8 insertions(+), 135 deletions(-) rename pkgs/client/__tests__/integration/{ => regressions}/step-failed-event-bug.test.ts (79%) delete mode 100644 pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts rename pkgs/core/supabase/tests/{realtime => regressions}/step_failed_event_bug.test.sql (100%) diff --git a/pkgs/client/__tests__/integration/step-failed-event-bug.test.ts b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts similarity index 79% rename from pkgs/client/__tests__/integration/step-failed-event-bug.test.ts rename to pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts index 4b0adb7c6..2229404ab 100644 --- a/pkgs/client/__tests__/integration/step-failed-event-bug.test.ts +++ b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts @@ -1,12 +1,12 @@ import { describe, it, expect } from 'vitest'; -import { withPgNoTransaction } from '../helpers/db.js'; -import { createTestSupabaseClient } from '../helpers/setup.js'; -import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; -import { PgflowClient } from '../../src/lib/PgflowClient.js'; +import { withPgNoTransaction } from '../../helpers/db.js'; +import { createTestSupabaseClient } from '../../helpers/setup.js'; +import { grantMinimalPgflowPermissions } from '../../helpers/permissions.js'; +import { PgflowClient } from '../../../src/lib/PgflowClient.js'; -describe('Step Failed Event Broadcasting Bug', () => { +describe('Step Failed Event Broadcasting', () => { it( - 'demonstrates that step:failed events are not broadcast due to CTE optimization', + 'should broadcast step:failed event when a step fails permanently', withPgNoTransaction(async (sql) => { // Grant minimal permissions await grantMinimalPgflowPermissions(sql); @@ -84,13 +84,9 @@ describe('Step Failed Event Broadcasting Bug', () => { // Verify run:failed was broadcast (this works) expect(eventTypes).toContain('run:failed'); - // BUG: step:failed is NOT broadcast due to CTE optimization - // The following assertion SHOULD pass but WILL fail + // Verify step:failed event is broadcast + // Regression: This fails due to PostgreSQL optimizing away the CTE that sends the event expect(eventTypes).toContain('step:failed'); - - // This test intentionally fails to demonstrate the bug - // The fix would be to change the broadcast_step_failed CTE - // from using SELECT to using PERFORM like run:failed does }) ); }); \ No newline at end of file diff --git a/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts b/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts deleted file mode 100644 index f294e6c7e..000000000 --- a/pkgs/client/__tests__/integration/step-failed-event-minimal.test.ts +++ /dev/null @@ -1,123 +0,0 @@ -import { describe, it, expect } from 'vitest'; -import { withPgNoTransaction } from '../helpers/db.js'; -import { createTestSupabaseClient } from '../helpers/setup.js'; -import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; -import { PgflowClient } from '../../src/lib/PgflowClient.js'; - -describe('Step Failed Event CTE Bug - Minimal Reproduction', () => { - it( - 'demonstrates CTE optimization removing step:failed broadcast', - withPgNoTransaction(async (sql) => { - // Grant minimal permissions - await grantMinimalPgflowPermissions(sql); - - // Create test flow with max_attempts = 1 to fail immediately - await sql`SELECT pgflow.create_flow('cte_bug_test', max_attempts => 1)`; - await sql`SELECT pgflow.add_step('cte_bug_test', 'test_step')`; - - // Create clients - const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); - - // Start the flow - const [{ run_id: runId }] = await sql` - SELECT * FROM pgflow.start_flow('cte_bug_test', '{}'::jsonb) - `; - - expect(runId).toBeTruthy(); - - // Manually set up the step to be ready to fail - await sql`SELECT pgflow.start_ready_steps(${runId})`; - - // Update the task to 'started' status (simulating worker picking it up) - // This is required because fail_task only fails tasks that are in 'started' status - await sql` - UPDATE pgflow.step_tasks - SET status = 'started', - started_at = now(), - attempts_count = 1 - WHERE run_id = ${runId} - AND step_slug = 'test_step' - AND task_index = 0 - `; - - // Count realtime messages before fail_task - const beforeMessages = await sql` - SELECT event, payload->>'event_type' as event_type - FROM realtime.messages - WHERE topic = ${'pgflow:run:' + runId} - ORDER BY inserted_at - `; - - console.log('Messages before fail_task:', beforeMessages.length); - console.log('Event types before:', beforeMessages.map(m => m.event_type)); - - // Now fail the task - this is where the bug occurs - const failResult = await sql` - SELECT * FROM pgflow.fail_task( - ${runId}, - 'test_step', - 0, - 'Testing CTE optimization bug' - ) - `; - console.log('fail_task returned status:', failResult[0]?.status); - - // Check run status after fail_task - const runStatus = await sql` - SELECT status FROM pgflow.runs WHERE run_id = ${runId} - `; - console.log('Run status after fail_task:', runStatus[0].status); - - // Check step state after fail_task - const stepStateAfter = await sql` - SELECT status FROM pgflow.step_states - WHERE run_id = ${runId} AND step_slug = 'test_step' - `; - console.log('Step state after fail_task:', stepStateAfter[0].status); - - // Get all messages after fail_task - const afterMessages = await sql` - SELECT event, payload->>'event_type' as event_type - FROM realtime.messages - WHERE topic = ${'pgflow:run:' + runId} - ORDER BY inserted_at - `; - - console.log('Messages after fail_task:', afterMessages.length); - console.log('All event types:', afterMessages.map(m => m.event_type)); - - // Check for specific event types - const eventTypes = afterMessages.map(m => m.event_type); - const hasStepStarted = eventTypes.includes('step:started'); - const hasStepFailed = eventTypes.includes('step:failed'); - const hasRunFailed = eventTypes.includes('run:failed'); - - console.log('Has step:started event:', hasStepStarted); - console.log('Has step:failed event:', hasStepFailed); - console.log('Has run:failed event:', hasRunFailed); - - // Verify the database state is correct - expect(runStatus[0].status).toBe('failed'); - expect(stepStateAfter[0].status).toBe('failed'); - - // Note: step:started is not expected in this test because we manually - // update the task to 'started' status instead of using start_tasks - console.log('Note: step:started event not expected (manual status update)'); - - // Verify that run:failed was broadcast - expect(hasRunFailed).toBe(true); - - // This was the bug - step:failed event was not sent due to CTE optimization - // The fix changes the CTE to use PERFORM in the main function body - if (!hasStepFailed) { - console.error('\n*** BUG DETECTED ***'); - console.error('step:failed event was NOT broadcast!'); - console.error('The CTE with SELECT realtime.send() was optimized away by PostgreSQL'); - console.error('Database state is correct (step marked as failed) but event was not sent'); - } - - expect(hasStepFailed).toBe(true); // Now fixed! - }) - ); -}); \ No newline at end of file diff --git a/pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql similarity index 100% rename from pkgs/core/supabase/tests/realtime/step_failed_event_bug.test.sql rename to pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql From bd4afa5d9213d435a376d0d6b11ef4e1be9e1fcc Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 22:11:38 +0200 Subject: [PATCH 5/7] fix(core): improve fail_task function to handle step and run failure events and messaging Enhances the fail_task procedure to accurately check step failure status, send appropriate realtime events for both step and run failures, and manage message retries and archiving for failed tasks. Also updates migration to include the new realtime event handling logic. --- pkgs/core/schemas/0100_function_fail_task.sql | 47 +++-- ...w_fix_fail_task_missing_realtime_event.sql | 185 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- 3 files changed, 213 insertions(+), 22 deletions(-) create mode 100644 pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index d51f1968d..5deccf621 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -11,6 +11,7 @@ set search_path to '' as $$ DECLARE v_run_failed boolean; + v_step_failed boolean; begin WITH run_lock AS ( @@ -79,27 +80,8 @@ maybe_fail_step AS ( WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* -), --- Send broadcast event for step failed if necessary -broadcast_step_failed AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:failed', - 'run_id', fail_task.run_id, - 'step_slug', fail_task.step_slug, - 'status', 'failed', - 'error_message', fail_task.error_message, - 'failed_at', now() - ), - concat('step:', fail_task.step_slug, ':failed'), - concat('pgflow:run:', fail_task.run_id), - false - ) - FROM maybe_fail_step - WHERE maybe_fail_step.status = 'failed' ) --- Only decrement remaining_steps, don't update status +-- Update run status UPDATE pgflow.runs SET status = CASE WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' @@ -112,6 +94,29 @@ SET status = CASE WHERE pgflow.runs.run_id = fail_task.run_id RETURNING (status = 'failed') INTO v_run_failed; +-- Check if step failed by querying the step_states table +SELECT (status = 'failed') INTO v_step_failed +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Send broadcast event for step failure if the step was failed +IF v_step_failed THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + concat('step:', fail_task.step_slug, ':failed'), + concat('pgflow:run:', fail_task.run_id), + false + ); +END IF; + -- Send broadcast event for run failure if the run was failed IF v_run_failed THEN DECLARE @@ -186,4 +191,4 @@ where st.run_id = fail_task.run_id and st.task_index = fail_task.task_index; end; -$$; +$$; \ No newline at end of file diff --git a/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql b/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql new file mode 100644 index 000000000..9782f5393 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql @@ -0,0 +1,185 @@ +-- Modify "fail_task" function +CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_run_failed boolean; + v_step_failed boolean; +begin + +WITH run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = fail_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + FOR UPDATE +), +flow_info AS ( + SELECT r.flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id +), +config AS ( + SELECT + COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts, + COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN flow_info fi ON fi.flow_slug = s.flow_slug + WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug +), +fail_or_retry_task as ( + UPDATE pgflow.step_tasks as task + SET + status = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued' + ELSE 'failed' + END, + failed_at = CASE + WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now() + ELSE NULL + END, + started_at = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL + ELSE task.started_at + END, + error_message = fail_task.error_message + WHERE task.run_id = fail_task.run_id + AND task.step_slug = fail_task.step_slug + AND task.task_index = fail_task.task_index + AND task.status = 'started' + RETURNING * +), +maybe_fail_step AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed' + ELSE pgflow.step_states.status + END, + failed_at = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now() + ELSE NULL + END, + error_message = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message + ELSE NULL + END + FROM fail_or_retry_task + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + RETURNING pgflow.step_states.* +) +-- Update run status +UPDATE pgflow.runs +SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END +WHERE pgflow.runs.run_id = fail_task.run_id +RETURNING (status = 'failed') INTO v_run_failed; + +-- Check if step failed by querying the step_states table +SELECT (status = 'failed') INTO v_step_failed +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Send broadcast event for step failure if the step was failed +IF v_step_failed THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + concat('step:', fail_task.step_slug, ':failed'), + concat('pgflow:run:', fail_task.run_id), + false + ); +END IF; + +-- Send broadcast event for run failure if the run was failed +IF v_run_failed THEN + DECLARE + v_flow_slug text; + BEGIN + SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id; + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', fail_task.run_id, + 'flow_slug', v_flow_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', fail_task.run_id), + false + ); + END; +END IF; + +-- For queued tasks: delay the message for retry with exponential backoff +PERFORM ( + WITH retry_config AS ( + SELECT + COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN pgflow.runs r ON r.flow_slug = f.flow_slug + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug + ), + queued_tasks AS ( + SELECT + r.flow_slug, + st.message_id, + pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'queued' + ) + SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay) + FROM queued_tasks qt + WHERE EXISTS (SELECT 1 FROM queued_tasks) +); + +-- For failed tasks: archive the message +PERFORM ( + WITH failed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'failed' + ) + SELECT pgmq.archive(ft.flow_slug, ft.message_id) + FROM failed_tasks ft + WHERE EXISTS (SELECT 1 FROM failed_tasks) +); + +return query select * +from pgflow.step_tasks st +where st.run_id = fail_task.run_id + and st.step_slug = fail_task.step_slug + and st.task_index = fail_task.task_index; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 481cbe082..03b29f2d7 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,6 +1,7 @@ -h1:LTAAz5tGQFyM+VudEr21XqdHQidvAL1paItX5pE1tHY= +h1:gcuV4nVlkChnjqJrnOMip8qP406j6/5daYRZi66QnQQ= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= 20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql h1:rqhO47GgL/M+Yq6vrNHB775wegnqs+snewqdtUs/E70= 20250614124241_pgflow_add_realtime.sql h1:b461IlAMEyDrHCQ4JVN2RSbDOF0jdgCQCXe6qE1R3Rs= +20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql h1:vNSoTWGWOJmbgDmK8Ii067N8bGgKKvfzKZSoAhKysp8= From ab17a0c59d5962c91bb0896e0c16c3d1878e4f9f Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 22:13:47 +0200 Subject: [PATCH 6/7] fix(core): ensure step:failed events are broadcast when steps fail - Replace CTE-based broadcast with direct PERFORM statement - Add regression tests to prevent future issues - Fix column ambiguity in fail_task function --- .changeset/evil-hounds-own.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/evil-hounds-own.md diff --git a/.changeset/evil-hounds-own.md b/.changeset/evil-hounds-own.md new file mode 100644 index 000000000..98fa139cc --- /dev/null +++ b/.changeset/evil-hounds-own.md @@ -0,0 +1,7 @@ +--- +'@pgflow/core': patch +--- + +Fix step:failed events not being broadcast when steps fail + +Fixed a bug where step:failed events were not being broadcast to real-time subscribers when a step failed permanently. The issue was caused by PostgreSQL optimizing away the CTE that contained the realtime.send() call. The fix replaces the CTE approach with a direct PERFORM statement in the function body, ensuring the event is always sent when a step fails. From 6a3cdf6f48f9f452ddda5766eb1dfe6216214848 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 19 Jun 2025 22:19:45 +0200 Subject: [PATCH 7/7] fix: correct SQL syntax by removing extraneous end statement in schema file --- pkgs/core/schemas/0100_function_fail_task.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index 5deccf621..a3f328ab7 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -191,4 +191,4 @@ where st.run_id = fail_task.run_id and st.task_index = fail_task.task_index; end; -$$; \ No newline at end of file +$$;