7 changes: 7 additions & 0 deletions .changeset/evil-hounds-own.md
@@ -0,0 +1,7 @@
---
'@pgflow/core': patch
---

Fix step:failed events not being broadcast when steps fail

Fixed a bug where step:failed events were not broadcast to realtime subscribers when a step failed permanently. The cause was that the realtime.send() call lived in a SELECT-only CTE whose output was never referenced by the outer UPDATE; PostgreSQL evaluates a SELECT in WITH only as far as the primary query demands its output, so the call was silently skipped. The fix replaces the CTE with a direct PERFORM statement in the function body, which is always executed, so the event is sent whenever a step fails permanently.
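
For illustration, a minimal sketch of the failing pattern versus the reliable one. The table `demo` and the use of `pg_notify()` as a stand-in side effect are hypothetical, not pgflow code:

```sql
-- Failing pattern: a SELECT-only CTE that the outer statement never references.
-- PostgreSQL evaluates a SELECT in WITH only as far as the primary query
-- demands its output, so the side-effect call may never run.
WITH broadcast AS (
  SELECT pg_notify('events', 'step:failed')   -- may be skipped entirely
)
UPDATE demo SET status = 'failed';            -- never reads from "broadcast"

-- Reliable pattern (the shape of the fix): PERFORM in a plpgsql body always
-- evaluates the expression and discards the result.
DO $$
BEGIN
  PERFORM pg_notify('events', 'step:failed');
END
$$;
```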
@@ -0,0 +1,92 @@
import { describe, it, expect } from 'vitest';
import { withPgNoTransaction } from '../../helpers/db.js';
import { createTestSupabaseClient } from '../../helpers/setup.js';
import { grantMinimalPgflowPermissions } from '../../helpers/permissions.js';
import { PgflowClient } from '../../../src/lib/PgflowClient.js';

describe('Step Failed Event Broadcasting', () => {
it(
'should broadcast step:failed event when a step fails permanently',
withPgNoTransaction(async (sql) => {
// Grant minimal permissions
await grantMinimalPgflowPermissions(sql);

// Create test flow with max_attempts = 1 to fail immediately
await sql`SELECT pgflow.create_flow('step_failed_bug_test', max_attempts => 1)`;
await sql`SELECT pgflow.add_step('step_failed_bug_test', 'failing_step')`;

// Create clients
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);

// Start the flow
const [{ run_id: runId }] = await sql`
SELECT * FROM pgflow.start_flow('step_failed_bug_test', '{}'::jsonb)
`;

// Start the step
await sql`SELECT pgflow.start_ready_steps(${runId})`;

// Simulate worker processing: update task to 'started' status
await sql`
UPDATE pgflow.step_tasks
SET status = 'started',
started_at = now(),
attempts_count = 1
WHERE run_id = ${runId}
AND step_slug = 'failing_step'
AND task_index = 0
`;

// Fail the task - this triggers the bug
await sql`
SELECT pgflow.fail_task(
${runId},
'failing_step',
0,
'Step failed to demonstrate CTE optimization bug'
)
`;

// Check database state
const [runState] = await sql`
SELECT status FROM pgflow.runs WHERE run_id = ${runId}
`;
const [stepState] = await sql`
SELECT status FROM pgflow.step_states
WHERE run_id = ${runId} AND step_slug = 'failing_step'
`;

// Check broadcast events
const messages = await sql`
SELECT payload->>'event_type' as event_type
FROM realtime.messages
WHERE topic = ${'pgflow:run:' + runId}
ORDER BY inserted_at
`;

const eventTypes = messages.map(m => m.event_type);

console.log('\n=== Step Failed Event Bug Test Results ===');
console.log('Database State:');
console.log(' Run status:', runState.status);
console.log(' Step status:', stepState.status);
console.log('\nBroadcast Events:');
console.log(' Event types:', eventTypes);
console.log('\nBug Analysis:');
console.log(' run:failed event broadcast?', eventTypes.includes('run:failed') ? 'YES ✓' : 'NO ✗');
console.log(' step:failed event broadcast?', eventTypes.includes('step:failed') ? 'YES ✓' : 'NO ✗');

// Verify database state is correct
expect(runState.status).toBe('failed');
expect(stepState.status).toBe('failed');

// Verify run:failed was broadcast (this works)
expect(eventTypes).toContain('run:failed');

// Verify step:failed event is broadcast
// Regression check: before the fix, this failed because PostgreSQL skipped the unreferenced CTE that sent the event
expect(eventTypes).toContain('step:failed');
})
);
});
45 changes: 25 additions & 20 deletions pkgs/core/schemas/0100_function_fail_task.sql
@@ -11,6 +11,7 @@ set search_path to ''
as $$
DECLARE
v_run_failed boolean;
v_step_failed boolean;
begin

WITH run_lock AS (
@@ -79,27 +80,8 @@ maybe_fail_step AS (
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
-- Send broadcast event for step failed if necessary
broadcast_step_failed AS (
SELECT
realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
)
FROM maybe_fail_step
WHERE maybe_fail_step.status = 'failed'
)
-- Only decrement remaining_steps, don't update status
-- Update run status
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
@@ -112,6 +94,29 @@ SET status = CASE
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
);
END IF;

-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
185 changes: 185 additions & 0 deletions pkgs/core/supabase/migrations/20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql
@@ -0,0 +1,185 @@
-- Modify "fail_task" function
CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_run_failed boolean;
v_step_failed boolean;
begin

WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
),
fail_or_retry_task as (
UPDATE pgflow.step_tasks as task
SET
status = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
ELSE 'failed'
END,
failed_at = CASE
WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
ELSE NULL
END,
started_at = CASE
WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
ELSE task.started_at
END,
error_message = fail_task.error_message
WHERE task.run_id = fail_task.run_id
AND task.step_slug = fail_task.step_slug
AND task.task_index = fail_task.task_index
AND task.status = 'started'
RETURNING *
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
ELSE NULL
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Update run status
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
concat('step:', fail_task.step_slug, ':failed'),
concat('pgflow:run:', fail_task.run_id),
false
);
END IF;

-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
v_flow_slug text;
BEGIN
SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', fail_task.run_id,
'flow_slug', v_flow_slug,
'status', 'failed',
'error_message', fail_task.error_message,
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', fail_task.run_id),
false
);
END;
END IF;

-- For queued tasks: delay the message for retry with exponential backoff
PERFORM (
WITH retry_config AS (
SELECT
COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug
),
queued_tasks AS (
SELECT
r.flow_slug,
st.message_id,
pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'queued'
)
SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
FROM queued_tasks qt
WHERE EXISTS (SELECT 1 FROM queued_tasks)
);

-- For failed tasks: archive the message
PERFORM (
WITH failed_tasks AS (
SELECT r.flow_slug, st.message_id
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.status = 'failed'
)
SELECT pgmq.archive(ft.flow_slug, ft.message_id)
FROM failed_tasks ft
WHERE EXISTS (SELECT 1 FROM failed_tasks)
);

return query select *
from pgflow.step_tasks st
where st.run_id = fail_task.run_id
and st.step_slug = fail_task.step_slug
and st.task_index = fail_task.task_index;

end;
$$;
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,6 +1,7 @@
h1:LTAAz5tGQFyM+VudEr21XqdHQidvAL1paItX5pE1tHY=
h1:gcuV4nVlkChnjqJrnOMip8qP406j6/5daYRZi66QnQQ=
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql h1:rqhO47GgL/M+Yq6vrNHB775wegnqs+snewqdtUs/E70=
20250614124241_pgflow_add_realtime.sql h1:b461IlAMEyDrHCQ4JVN2RSbDOF0jdgCQCXe6qE1R3Rs=
20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql h1:vNSoTWGWOJmbgDmK8Ii067N8bGgKKvfzKZSoAhKysp8=