From 962cd078634f9777aadd49fd7b5b336272bea7ff Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 3 Nov 2025 14:09:09 +0100 Subject: [PATCH] refactor(schemas): remove broadcast empty completed and step started events for simplification - Eliminated broadcast_empty_completed CTE to streamline completed step event handling - Removed broadcast_events CTE for step:started events to improve query clarity - Replaced broadcast event logic with direct PERFORM statements for real-time event broadcasting - Enhanced real-time event broadcasting for step:started and step:completed events - Simplified event dispatching logic for better maintainability and performance --- .changeset/clear-signal-beam.md | 18 + .../integration/real-flow-execution.test.ts | 207 ++++-- pkgs/client/src/lib/FlowRun.ts | 24 +- pkgs/client/src/lib/FlowStep.ts | 22 +- pkgs/client/src/lib/PgflowClient.ts | 61 +- pkgs/core/project.json | 11 +- ...nction_cascade_complete_taskless_steps.sql | 48 +- .../schemas/0100_function_complete_task.sql | 44 +- .../0100_function_start_ready_steps.sql | 72 +- ...broadcast_order_and_timestamp_handling.sql | 622 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../broadcast_order.test.sql | 99 +++ .../deep_cascade_events.test.sql | 13 +- .../dependent_map_cascade_events.test.sql | 13 +- .../mixed_cascade_events.test.sql | 13 +- .../single_step_cascade_events.test.sql | 13 +- 16 files changed, 1098 insertions(+), 185 deletions(-) create mode 100644 .changeset/clear-signal-beam.md create mode 100644 pkgs/core/supabase/migrations/20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql create mode 100644 pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql diff --git a/.changeset/clear-signal-beam.md b/.changeset/clear-signal-beam.md new file mode 100644 index 000000000..85d6b7cec --- /dev/null +++ b/.changeset/clear-signal-beam.md @@ -0,0 +1,18 @@ +--- +'@pgflow/client': patch +'@pgflow/core': patch +--- + +Fix missing realtime broadcasts for step:started and step:completed events + +**Critical bug fix:** Clients were not receiving `step:started` events when steps transitioned to Started status, and `step:completed` events for empty map steps and cascade completions were also missing. + +**Root cause:** PostgreSQL query optimizer was eliminating CTEs containing `realtime.send()` calls because they were not referenced by subsequent operations or the final RETURN statement. + +**Solution:** Moved `realtime.send()` calls directly into RETURNING clauses of UPDATE statements, ensuring they execute atomically with state changes and cannot be optimized away. 
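+
+For illustration, a minimal sketch of the pattern (simplified from the actual functions; `:run_id` stands in for the function's run-id parameter and the payload is abbreviated):
+
+```sql
+UPDATE pgflow.step_states AS ss
+SET status = 'started', started_at = now()
+WHERE ss.run_id = :run_id AND ss.status = 'created' AND ss.remaining_deps = 0
+RETURNING
+  ss.*,
+  -- Evaluated for every updated row; unlike an unreferenced CTE,
+  -- an expression in RETURNING cannot be eliminated by the planner
+  realtime.send(
+    jsonb_build_object(
+      'event_type', 'step:started',
+      'run_id', ss.run_id,
+      'step_slug', ss.step_slug
+    ),
+    concat('step:', ss.step_slug, ':started'),
+    concat('pgflow:run:', ss.run_id),
+    false
+  ) AS _broadcast_result;
+```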
+ +**Changes:** +- `start_ready_steps()`: Broadcasts step:started and step:completed events in RETURNING clauses +- `cascade_complete_taskless_steps()`: Broadcasts step:completed events atomically with cascade completion +- `complete_task()`: Added PERFORM statements for run:failed and step:failed broadcasts +- Client: Added `applySnapshot()` methods to FlowRun and FlowStep for proper initial state hydration without event emission diff --git a/pkgs/client/__tests__/integration/real-flow-execution.test.ts b/pkgs/client/__tests__/integration/real-flow-execution.test.ts index 7a650248c..270eca16a 100644 --- a/pkgs/client/__tests__/integration/real-flow-execution.test.ts +++ b/pkgs/client/__tests__/integration/real-flow-execution.test.ts @@ -36,11 +36,11 @@ describe('Real Flow Execution', () => { expect(run.flow_slug).toBe(testFlow.slug); // Give realtime subscription time to establish properly - await new Promise(resolve => setTimeout(resolve, 2000)); + await new Promise((resolve) => setTimeout(resolve, 2000)); // Poll for task const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); - + expect(tasks).toHaveLength(1); expect(tasks[0].input.run).toEqual(input); @@ -112,7 +112,7 @@ describe('Real Flow Execution', () => { step.on('*', stepTracker.callback); // Give realtime subscription time to establish - await new Promise(resolve => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 100)); // Poll and complete task const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); @@ -147,54 +147,120 @@ describe('Real Flow Execution', () => { ); it( - 'CRITICAL: broadcasts step:started events (CTE optimization bug check)', + 'root steps: started immediately (verify via waitForStatus, not broadcasts)', withPgNoTransaction(async (sql) => { - // This test specifically verifies that step:started events ARE broadcast - // It SHOULD FAIL until the CTE optimization bug is fixed in start_ready_steps() + // Root steps are started in the same transaction as start_flow() + // By the time startFlow() returns, they're already Started + // We can't observe these broadcasts - they happen before we can listen + // Instead, verify the state directly - const testFlow = createTestFlow('started_event_flow'); + const testFlow = createTestFlow('root_started_flow'); await cleanupFlow(sql, testFlow.slug); await grantMinimalPgflowPermissions(sql); await sql`SELECT pgflow.create_flow(${testFlow.slug})`; - await sql`SELECT pgflow.add_step(${testFlow.slug}, 'test_step')`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`; const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); const pgflowClient = new PgflowClient(supabaseClient); + // Start flow - root step starts in this transaction const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); - const step = run.step('test_step'); + const step = run.step('root_step'); - // Track ALL step events with event matchers - const tracker = createEventTracker(); - step.on('*', tracker.callback); + // VERIFY: Step is already Started when startFlow() returns + expect(step.status).toBe(FlowStepStatus.Started); + expect(step.started_at).toBeDefined(); - // Give realtime subscription time to establish - await new Promise(resolve => setTimeout(resolve, 100)); + // waitForStatus should return immediately (already Started) + await step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 1000 }); - // Execute the step - this calls start_ready_steps() which should broadcast 
step:started + // Complete for cleanup const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); - - // Wait a moment for broadcast to propagate - await new Promise(resolve => setTimeout(resolve, 500)); - - // Complete the task await sqlClient.completeTask(tasks[0], { result: 'done' }); await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); - // CRITICAL ASSERTIONS: Verify step:started WAS broadcast - // These will FAIL with the current CTE optimization bug! + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'dependent steps: broadcasts step:started when they become ready', + withPgNoTransaction(async (sql) => { + // Dependent steps start AFTER their dependencies complete + // This happens AFTER startFlow() returns, so we CAN observe broadcasts + // This is the real test for step:started broadcasts! + + const testFlow = createTestFlow('dependent_started_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'dependent_step', ARRAY['root_step'])`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Start flow - only root_step starts + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const rootStep = run.step('root_step'); + const dependentStep = run.step('dependent_step'); + + // Root is started, dependent is still created (waiting for deps) + expect(rootStep.status).toBe(FlowStepStatus.Started); + expect(dependentStep.status).toBe(FlowStepStatus.Created); + + // NOW set up event tracker (before completing root) + const tracker = createEventTracker(); + dependentStep.on('*', tracker.callback); + + // Give realtime subscription time to establish + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Complete root step - this will trigger dependent_step to start + const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(rootTasks[0].step_slug).toBe('root_step'); + await sqlClient.completeTask(rootTasks[0], { result: 'root done' }); + + // Wait for dependent to start + await dependentStep.waitForStatus(FlowStepStatus.Started, { + timeoutMs: 5000, + }); + + // VERIFY: We received step:started broadcast for dependent step expect(tracker).toHaveReceivedEvent('step:started', { run_id: run.run_id, - step_slug: 'test_step', + step_slug: 'dependent_step', status: FlowStepStatus.Started, }); - // Verify proper event sequence - expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); - expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed'); + // Complete dependent step + const dependentTasks = await readAndStart( + sql, + sqlClient, + testFlow.slug, + 1, + 5 + ); + expect(dependentTasks[0].step_slug).toBe('dependent_step'); + await sqlClient.completeTask(dependentTasks[0], { + result: 'dependent done', + }); + + // Wait for completion + await dependentStep.waitForStatus(FlowStepStatus.Completed, { + timeoutMs: 5000, + }); - // Verify both events were received + // VERIFY: Proper event sequence + expect(tracker).toHaveReceivedEventSequence([ + 'step:started', + 'step:completed', + ]); + expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed'); expect(tracker).toHaveReceivedEventCount('step:started', 1); 
       expect(tracker).toHaveReceivedEventCount('step:completed', 1);
 
@@ -204,12 +270,13 @@ describe('Real Flow Execution', () => {
   );
 
   it(
-    'empty map steps: skip step:started and go straight to step:completed',
+    'empty map steps (root): completed immediately (verify via state)',
     withPgNoTransaction(async (sql) => {
-      // This test verifies the EXPECTED behavior for empty map steps
-      // They should NOT send step:started, only step:completed
+      // Empty map steps with no tasks complete immediately
+      // Root empty maps complete in the start_flow transaction
+      // Can't observe broadcasts - verify state instead
 
-      const testFlow = createTestFlow('empty_map_flow');
+      const testFlow = createTestFlow('root_empty_map_flow');
       await cleanupFlow(sql, testFlow.slug);
       await grantMinimalPgflowPermissions(sql);
 
@@ -229,31 +296,81 @@
       const supabaseClient = createTestSupabaseClient();
       const pgflowClient = new PgflowClient(supabaseClient);
 
-      // Start flow with empty array directly (root map steps expect array input)
+      // Start flow with empty array (root map steps expect array input)
       const run = await pgflowClient.startFlow(testFlow.slug, []);
       const step = run.step('empty_map_step');
 
-      // Track events
+      // VERIFY: Step is already Completed when startFlow() returns
+      expect(step.status).toBe(FlowStepStatus.Completed);
+      expect(step.completed_at).toBeDefined();
+
+      // Empty maps DO get started_at set (they transition through started briefly)
+      expect(step.started_at).toBeDefined();
+
+      await supabaseClient.removeAllChannels();
+    }),
+    15000
+  );
+
+  it(
+    'empty map steps (dependent): broadcasts step:completed when triggered',
+    withPgNoTransaction(async (sql) => {
+      // Dependent empty map steps complete AFTER their dependencies
+      // This happens AFTER startFlow() returns, so we CAN observe broadcasts
+      // They skip step:started and go directly to step:completed
+
+      const testFlow = createTestFlow('dependent_empty_map_flow');
+      await cleanupFlow(sql, testFlow.slug);
+      await grantMinimalPgflowPermissions(sql);
+
+      await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
+      await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
+      // Dependent map step that will receive empty array from root
+      await sql`SELECT pgflow.add_step(
+        ${testFlow.slug},
+        'dependent_empty_map',
+        ARRAY['root_step'],
+        NULL,
+        NULL,
+        NULL,
+        NULL,
+        'map'
+      )`;
+
+      const sqlClient = new PgflowSqlClient(sql);
+      const supabaseClient = createTestSupabaseClient();
+      const pgflowClient = new PgflowClient(supabaseClient);
+
+      const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
+      const rootStep = run.step('root_step');
+      const emptyMapStep = run.step('dependent_empty_map');
+
+      // Set up tracker before completing root
       const tracker = createEventTracker();
-      step.on('*', tracker.callback);
+      emptyMapStep.on('*', tracker.callback);
 
-      // Give realtime time to establish
-      await new Promise(resolve => setTimeout(resolve, 100));
+      await new Promise((resolve) => setTimeout(resolve, 100));
 
-      // Wait for step to complete (should happen immediately)
-      await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 });
+      // Complete root with an empty array (single steps feed their output arrays directly to dependent map steps)
+      const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
+      await sqlClient.completeTask(rootTasks[0], []);
+
+      // Wait for dependent to complete (should happen immediately)
+      await emptyMapStep.waitForStatus(FlowStepStatus.Completed, {
+        timeoutMs: 5000,
+      });
+
- // Verify NO step:started event (expected for empty maps) + // VERIFY: NO step:started (empty maps skip this) expect(tracker).toNotHaveReceivedEvent('step:started'); - // Verify step:completed was sent + // VERIFY: Received step:completed directly expect(tracker).toHaveReceivedEvent('step:completed', { run_id: run.run_id, - step_slug: 'empty_map_step', + step_slug: 'dependent_empty_map', status: FlowStepStatus.Completed, }); - // Verify only 1 event total + // VERIFY: Only 1 event total (completed, no started) expect(tracker).toHaveReceivedTotalEvents(1); await supabaseClient.removeAllChannels(); @@ -285,10 +402,12 @@ describe('Real Flow Execution', () => { expect(step.started_at).toBeDefined(); // Give realtime subscription time to establish - await new Promise(resolve => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 100)); // waitForStatus should resolve immediately since step is already Started - const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 }); + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { + timeoutMs: 5000, + }); const result = await waitPromise; expect(result).toBe(step); expect(step.status).toBe(FlowStepStatus.Started); diff --git a/pkgs/client/src/lib/FlowRun.ts b/pkgs/client/src/lib/FlowRun.ts index abd56fa41..891c75f96 100644 --- a/pkgs/client/src/lib/FlowRun.ts +++ b/pkgs/client/src/lib/FlowRun.ts @@ -264,13 +264,33 @@ export class FlowRun }); } + /** + * Apply state from database snapshot (no events emitted) + * Used when initializing state from start_flow_with_states() or get_run_with_states() + * + * @internal This method is only intended for use by PgflowClient. + * Applications should not call this directly. + */ + applySnapshot(row: import('@pgflow/core').RunRow): void { + // Direct state assignment from database row (no event conversion) + this.#state.status = row.status as FlowRunStatus; + this.#state.input = row.input as ExtractFlowInput; + this.#state.output = row.output as ExtractFlowOutput | null; + this.#state.started_at = row.started_at ? new Date(row.started_at) : null; + this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null; + this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null; + this.#state.remaining_steps = row.remaining_steps; + this.#state.error_message = null; // Database doesn't have error_message for runs + this.#state.error = null; + } + /** * Updates the run state based on an event - * + * * @internal This method is only intended for use by PgflowClient and tests. * Applications should not call this directly - state updates should come from * database events through the PgflowClient. - * + * * TODO: After v1.0, make this method private and refactor tests to use PgflowClient * with event emission instead of direct state manipulation. */ diff --git a/pkgs/client/src/lib/FlowStep.ts b/pkgs/client/src/lib/FlowStep.ts index 382a5aea7..717d62bd7 100644 --- a/pkgs/client/src/lib/FlowStep.ts +++ b/pkgs/client/src/lib/FlowStep.ts @@ -176,13 +176,31 @@ export class FlowStep< }); } + /** + * Apply state from database snapshot (no events emitted) + * Used when initializing state from start_flow_with_states() or get_run_with_states() + * + * @internal This method is only intended for use by PgflowClient. + * Applications should not call this directly. 
+ */ + applySnapshot(row: import('@pgflow/core').StepStateRow): void { + // Direct state assignment from database row (no event conversion) + this.#state.status = row.status as FlowStepStatus; + this.#state.started_at = row.started_at ? new Date(row.started_at) : null; + this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null; + this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null; + this.#state.error_message = row.error_message; + this.#state.error = row.error_message ? new Error(row.error_message) : null; + // Note: output is not stored in step_states table, remains null + } + /** * Updates the step state based on an event - * + * * @internal This method is only intended for use by FlowRun and tests. * Applications should not call this directly - state updates should come from * database events through the PgflowClient. - * + * * TODO: After v1.0, make this method private and refactor tests to use PgflowClient * with event emission instead of direct state manipulation. */ diff --git a/pkgs/client/src/lib/PgflowClient.ts b/pkgs/client/src/lib/PgflowClient.ts index 53edb61ca..7843fd45d 100644 --- a/pkgs/client/src/lib/PgflowClient.ts +++ b/pkgs/client/src/lib/PgflowClient.ts @@ -1,7 +1,6 @@ import { v4 as uuidv4 } from 'uuid'; import type { SupabaseClient } from '@supabase/supabase-js'; import type { AnyFlow, ExtractFlowInput } from '@pgflow/dsl'; -import type { RunRow } from '@pgflow/core'; import { FlowRunStatus } from './types.js'; import type { IFlowClient, @@ -13,7 +12,7 @@ import type { } from './types.js'; import { SupabaseBroadcastAdapter } from './SupabaseBroadcastAdapter.js'; import { FlowRun } from './FlowRun.js'; -import { toTypedRunEvent, toTypedStepEvent, runRowToTypedEvent, stepStateRowToTypedEvent } from './eventAdapters.js'; +import { toTypedRunEvent, toTypedStepEvent } from './eventAdapters.js'; /** * Client for interacting with pgflow @@ -107,15 +106,15 @@ export class PgflowClient implements IFlowClien throw error; } - // Update the run state with the complete initial state snapshot + // Apply the run state snapshot (no events) if (data.run) { - run.updateState(runRowToTypedEvent(data.run)); + run.applySnapshot(data.run); } - // Update step states from the initial snapshot + // Apply step state snapshots (no events) if (data.steps && Array.isArray(data.steps)) { for (const stepState of data.steps) { - run.step(stepState.step_slug).updateState(stepStateRowToTypedEvent(stepState)); + run.step(stepState.step_slug).applySnapshot(stepState); } } @@ -210,37 +209,37 @@ export class PgflowClient implements IFlowClien return null; } - // Create initial state for the flow run - // Use type assertion since RunRow doesn't include error_message field - const runData = run as unknown as (RunRow & { error_message?: string }); - // Validate required fields - if (!runData.run_id || !runData.flow_slug || !runData.status) { + if (!run.run_id || !run.flow_slug || !run.status) { throw new Error('Invalid run data: missing required fields'); } - + // Validate status is a valid FlowRunStatus const validStatuses = Object.values(FlowRunStatus); - if (!validStatuses.includes(runData.status as FlowRunStatus)) { - throw new Error(`Invalid run data: invalid status '${runData.status}'`); + if (!validStatuses.includes(run.status as FlowRunStatus)) { + throw new Error(`Invalid run data: invalid status '${run.status}'`); } - + + // Create flow run with minimal initial state const initialState: FlowRunState = { - run_id: runData.run_id, - flow_slug: 
runData.flow_slug, - status: runData.status as FlowRunStatus, - input: runData.input as ExtractFlowInput, - output: runData.output as any, - error: runData.error_message ? new Error(runData.error_message) : null, - error_message: runData.error_message || null, - started_at: runData.started_at ? new Date(runData.started_at) : null, - completed_at: runData.completed_at ? new Date(runData.completed_at) : null, - failed_at: runData.failed_at ? new Date(runData.failed_at) : null, - remaining_steps: runData.remaining_steps || 0, + run_id: run.run_id, + flow_slug: run.flow_slug, + status: run.status as FlowRunStatus, + input: run.input as ExtractFlowInput, + output: null, + error: null, + error_message: null, + started_at: null, + completed_at: null, + failed_at: null, + remaining_steps: 0, }; - + // Create the flow run instance const flowRun = new FlowRun(initialState); + + // Apply the complete state from database snapshot + flowRun.applySnapshot(run); // Store the run this.#runs.set(run_id, flowRun); @@ -248,16 +247,16 @@ export class PgflowClient implements IFlowClien // Set up subscription for run and step events await this.#realtimeAdapter.subscribeToRun(run_id); - // Initialize steps + // Initialize steps from snapshot if (steps && Array.isArray(steps)) { for (const stepState of steps) { // Validate step has required fields if (!stepState.step_slug || !stepState.status) { throw new Error('Invalid step data: missing required fields'); } - - // Convert database step state to typed event - flowRun.step(stepState.step_slug).updateState(stepStateRowToTypedEvent(stepState)); + + // Apply snapshot state directly (no events) + flowRun.step(stepState.step_slug).applySnapshot(stepState); } } diff --git a/pkgs/core/project.json b/pkgs/core/project.json index 8c20654dc..94fc16d10 100644 --- a/pkgs/core/project.json +++ b/pkgs/core/project.json @@ -12,12 +12,6 @@ "{projectRoot}/supabase/migrations/atlas.sum", "{projectRoot}/scripts/atlas-verify-schemas-synced" ], - "atlasVerificationCache": [ - "{projectRoot}/.nx-inputs/verify-schemas-synced.txt" - ], - "migrationVerificationCache": [ - "{projectRoot}/.nx-inputs/verify-migrations.txt" - ], "databaseTypes": ["{projectRoot}/src/database-types.ts"], "pgtapTests": [ "{projectRoot}/supabase/tests/**/*.sql", @@ -43,8 +37,7 @@ "inputs": [ "schemas", "migrations", - "atlasSetup", - "atlasVerificationCache" + "atlasSetup" ], "outputs": ["{projectRoot}/.nx-inputs/verify-schemas-synced.txt"], "options": { @@ -60,7 +53,7 @@ "verify-migrations": { "executor": "nx:run-commands", "dependsOn": ["verify-schemas-synced", "supabase:ensure-started"], - "inputs": ["migrations", "migrationVerificationCache"], + "inputs": ["migrations"], "outputs": ["{projectRoot}/.nx-inputs/verify-migrations.txt"], "options": { "cwd": "{projectRoot}", diff --git a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql index d46c40de3..baad3175c 100644 --- a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql +++ b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql @@ -22,7 +22,20 @@ BEGIN -- ========================================== -- COMPLETE READY TASKLESS STEPS -- ========================================== - WITH completed AS ( + WITH + -- ---------- Find steps to complete in topological order ---------- + steps_to_complete AS ( + SELECT ss.run_id, ss.step_slug + FROM pgflow.step_states ss + JOIN pgflow.steps s ON s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug 
+ WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.status = 'created' + AND ss.remaining_deps = 0 + AND ss.initial_tasks = 0 + -- Process in topological order to ensure proper cascade + ORDER BY s.step_index + ), + completed AS ( -- ---------- Complete taskless steps ---------- -- Steps with initial_tasks=0 and no remaining deps UPDATE pgflow.step_states ss @@ -30,15 +43,30 @@ BEGIN started_at = now(), completed_at = now(), remaining_tasks = 0 - FROM pgflow.steps s - WHERE ss.run_id = cascade_complete_taskless_steps.run_id - AND ss.flow_slug = s.flow_slug - AND ss.step_slug = s.step_slug - AND ss.status = 'created' - AND ss.remaining_deps = 0 - AND ss.initial_tasks = 0 - -- Process in topological order to ensure proper cascade - RETURNING ss.* + FROM steps_to_complete stc + WHERE ss.run_id = stc.run_id + AND ss.step_slug = stc.step_slug + RETURNING + ss.*, + -- Broadcast step:completed event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', ss.run_id, + 'step_slug', ss.step_slug, + 'status', 'completed', + 'started_at', ss.started_at, + 'completed_at', ss.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', ss.step_slug, ':completed'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result -- Prefix with _ to indicate internal use only ), -- ---------- Update dependent steps ---------- -- Propagate completion and empty arrays to dependents diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index 34f0fdc02..0db64cc64 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -66,6 +66,21 @@ IF v_dependent_map_slug IS NOT NULL THEN failed_at = now() WHERE pgflow.runs.run_id = complete_task.run_id; + -- Broadcast run:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', complete_task.run_id, + 'flow_slug', v_run_record.flow_slug, + 'status', 'failed', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', complete_task.run_id), + false + ); + -- Archive all active messages (both queued and started) to prevent orphaned messages PERFORM pgmq.archive( v_run_record.flow_slug, @@ -101,6 +116,25 @@ IF v_dependent_map_slug IS NOT NULL THEN WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + -- Broadcast step:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'failed', + 'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END, + 'failed_at', now() + ), + concat('step:', complete_task.step_slug, ':failed'), + concat('pgflow:run:', complete_task.run_id), + false + ); + -- Archive the current task's message (it was started, now failed) PERFORM pgmq.archive( v_run_record.flow_slug, @@ -231,10 
+265,8 @@ WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.st -- ---------- Handle step completion ---------- IF v_step_state.status = 'completed' THEN - -- Cascade complete any taskless steps that are now ready - PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); - - -- Broadcast step:completed event + -- Broadcast step:completed event FIRST (before cascade) + -- This ensures parent broadcasts before its dependent children -- For map steps, aggregate all task outputs; for single steps, use the task output PERFORM realtime.send( jsonb_build_object( @@ -262,6 +294,10 @@ IF v_step_state.status = 'completed' THEN concat('pgflow:run:', complete_task.run_id), false ); + + -- THEN cascade complete any taskless steps that are now ready + -- This ensures dependent children broadcast AFTER their parent + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); END IF; -- ---------- Archive completed task message ---------- diff --git a/pkgs/core/schemas/0100_function_start_ready_steps.sql b/pkgs/core/schemas/0100_function_start_ready_steps.sql index 3fd29ca1d..0c61ef564 100644 --- a/pkgs/core/schemas/0100_function_start_ready_steps.sql +++ b/pkgs/core/schemas/0100_function_start_ready_steps.sql @@ -39,28 +39,27 @@ completed_empty_steps AS ( FROM empty_map_steps WHERE pgflow.step_states.run_id = start_ready_steps.run_id AND pgflow.step_states.step_slug = empty_map_steps.step_slug - RETURNING pgflow.step_states.* -), --- ---------- Broadcast completion events ---------- -broadcast_empty_completed AS ( - SELECT + RETURNING + pgflow.step_states.*, + -- Broadcast step:completed event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner realtime.send( jsonb_build_object( 'event_type', 'step:completed', - 'run_id', completed_step.run_id, - 'step_slug', completed_step.step_slug, + 'run_id', pgflow.step_states.run_id, + 'step_slug', pgflow.step_states.step_slug, 'status', 'completed', - 'started_at', completed_step.started_at, - 'completed_at', completed_step.completed_at, + 'started_at', pgflow.step_states.started_at, + 'completed_at', pgflow.step_states.completed_at, 'remaining_tasks', 0, 'remaining_deps', 0, 'output', '[]'::jsonb ), - concat('step:', completed_step.step_slug, ':completed'), - concat('pgflow:run:', completed_step.run_id), + concat('step:', pgflow.step_states.step_slug, ':completed'), + concat('pgflow:run:', pgflow.step_states.run_id), false - ) - FROM completed_empty_steps AS completed_step + ) as _broadcast_completed -- Prefix with _ to indicate internal use only ), -- ========================================== @@ -94,7 +93,24 @@ started_step_states AS ( FROM ready_steps WHERE pgflow.step_states.run_id = start_ready_steps.run_id AND pgflow.step_states.step_slug = ready_steps.step_slug - RETURNING pgflow.step_states.* + RETURNING pgflow.step_states.*, + -- Broadcast step:started event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner + realtime.send( + jsonb_build_object( + 'event_type', 'step:started', + 'run_id', pgflow.step_states.run_id, + 'step_slug', pgflow.step_states.step_slug, + 'status', 'started', + 'started_at', pgflow.step_states.started_at, + 'remaining_tasks', pgflow.step_states.remaining_tasks, + 'remaining_deps', pgflow.step_states.remaining_deps + ), + concat('step:', pgflow.step_states.step_slug, ':started'), + 
concat('pgflow:run:', pgflow.step_states.run_id), + false + ) as _broadcast_result -- Prefix with _ to indicate internal use only ), -- ========================================== @@ -139,26 +155,6 @@ sent_messages AS ( CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) WHERE task_indices.idx_ord = msg_ids.msg_ord -), - --- ---------- Broadcast step:started events ---------- -broadcast_events AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:started', - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'status', 'started', - 'started_at', started_step.started_at, - 'remaining_tasks', started_step.remaining_tasks, - 'remaining_deps', started_step.remaining_deps - ), - concat('step:', started_step.step_slug, ':started'), - concat('pgflow:run:', started_step.run_id), - false - ) - FROM started_step_states AS started_step ) -- ========================================== @@ -173,5 +169,13 @@ SELECT sent_messages.msg_id FROM sent_messages; +-- ========================================== +-- BROADCAST REALTIME EVENTS +-- ========================================== +-- Note: Both step:completed events for empty maps and step:started events +-- are now broadcast atomically in their respective CTEs using RETURNING pattern. +-- This ensures correct ordering, prevents duplicate broadcasts, and guarantees +-- that events are sent for exactly the rows that were updated. + end; $$; diff --git a/pkgs/core/supabase/migrations/20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql b/pkgs/core/supabase/migrations/20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql new file mode 100644 index 000000000..c353ce96d --- /dev/null +++ b/pkgs/core/supabase/migrations/20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql @@ -0,0 +1,622 @@ +-- Modify "cascade_complete_taskless_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_total_completed int := 0; + v_iteration_completed int; + v_iterations int := 0; + v_max_iterations int := 50; +BEGIN + -- ========================================== + -- ITERATIVE CASCADE COMPLETION + -- ========================================== + -- Completes taskless steps in waves until none remain + LOOP + -- ---------- Safety check ---------- + v_iterations := v_iterations + 1; + IF v_iterations > v_max_iterations THEN + RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; + END IF; + + -- ========================================== + -- COMPLETE READY TASKLESS STEPS + -- ========================================== + WITH + -- ---------- Find steps to complete in topological order ---------- + steps_to_complete AS ( + SELECT ss.run_id, ss.step_slug + FROM pgflow.step_states ss + JOIN pgflow.steps s ON s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.status = 'created' + AND ss.remaining_deps = 0 + AND ss.initial_tasks = 0 + -- Process in topological order to ensure proper cascade + ORDER BY s.step_index + ), + completed AS ( + -- ---------- Complete taskless steps ---------- + -- Steps with initial_tasks=0 and no remaining deps + UPDATE pgflow.step_states ss + SET status = 'completed', + started_at = now(), + completed_at = now(), + 
remaining_tasks = 0 + FROM steps_to_complete stc + WHERE ss.run_id = stc.run_id + AND ss.step_slug = stc.step_slug + RETURNING + ss.*, + -- Broadcast step:completed event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', ss.run_id, + 'step_slug', ss.step_slug, + 'status', 'completed', + 'started_at', ss.started_at, + 'completed_at', ss.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', ss.step_slug, ':completed'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result -- Prefix with _ to indicate internal use only + ), + -- ---------- Update dependent steps ---------- + -- Propagate completion and empty arrays to dependents + dep_updates AS ( + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - dep_count.count, + -- If the dependent is a map step and its dependency completed with 0 tasks, + -- set its initial_tasks to 0 as well + initial_tasks = CASE + WHEN s.step_type = 'map' AND dep_count.has_zero_tasks + THEN 0 -- Empty array propagation + ELSE ss.initial_tasks -- Keep existing value (including NULL) + END + FROM ( + -- Aggregate dependency updates per dependent step + SELECT + d.flow_slug, + d.step_slug as dependent_slug, + COUNT(*) as count, + BOOL_OR(c.initial_tasks = 0) as has_zero_tasks + FROM completed c + JOIN pgflow.deps d ON d.flow_slug = c.flow_slug + AND d.dep_slug = c.step_slug + GROUP BY d.flow_slug, d.step_slug + ) dep_count, + pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = dep_count.flow_slug + AND ss.step_slug = dep_count.dependent_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug + ), + -- ---------- Update run counters ---------- + -- Only decrement remaining_steps; let maybe_complete_run handle finalization + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - c.completed_count + FROM (SELECT COUNT(*) AS completed_count FROM completed) c + WHERE r.run_id = cascade_complete_taskless_steps.run_id + AND c.completed_count > 0 + ) + -- ---------- Check iteration results ---------- + SELECT COUNT(*) INTO v_iteration_completed FROM completed; + + EXIT WHEN v_iteration_completed = 0; -- No more steps to complete + v_total_completed := v_total_completed + v_iteration_completed; + END LOOP; + + RETURN v_total_completed; +END; +$$; +-- Modify "start_ready_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +begin +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN + RETURN; +END IF; + +-- ========================================== +-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) +-- ========================================== +-- These complete immediately without spawning tasks +WITH empty_map_steps AS ( + SELECT step_state.* + FROM pgflow.step_states AS step_state + JOIN pgflow.steps AS step + ON step.flow_slug = step_state.flow_slug + AND step.step_slug = step_state.step_slug + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND 
step.step_type = 'map' + AND step_state.initial_tasks = 0 + ORDER BY step_state.step_slug + FOR UPDATE OF step_state +), +-- ---------- Complete empty map steps ---------- +completed_empty_steps AS ( + UPDATE pgflow.step_states + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM empty_map_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = empty_map_steps.step_slug + RETURNING + pgflow.step_states.*, + -- Broadcast step:completed event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', pgflow.step_states.run_id, + 'step_slug', pgflow.step_states.step_slug, + 'status', 'completed', + 'started_at', pgflow.step_states.started_at, + 'completed_at', pgflow.step_states.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', pgflow.step_states.step_slug, ':completed'), + concat('pgflow:run:', pgflow.step_states.run_id), + false + ) as _broadcast_completed -- Prefix with _ to indicate internal use only +), + +-- ========================================== +-- HANDLE NORMAL STEPS (initial_tasks > 0) +-- ========================================== +-- ---------- Find ready steps ---------- +-- Steps with no remaining deps and known task count +ready_steps AS ( + SELECT * + FROM pgflow.step_states AS step_state + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count + AND step_state.initial_tasks > 0 -- Don't start taskless steps + -- Exclude empty map steps already handled + AND NOT EXISTS ( + SELECT 1 FROM empty_map_steps + WHERE empty_map_steps.run_id = step_state.run_id + AND empty_map_steps.step_slug = step_state.step_slug + ) + ORDER BY step_state.step_slug + FOR UPDATE +), +-- ---------- Mark steps as started ---------- +started_step_states AS ( + UPDATE pgflow.step_states + SET status = 'started', + started_at = now(), + remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting + FROM ready_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = ready_steps.step_slug + RETURNING pgflow.step_states.*, + -- Broadcast step:started event atomically with the UPDATE + -- Using RETURNING ensures this executes during row processing + -- and cannot be optimized away by the query planner + realtime.send( + jsonb_build_object( + 'event_type', 'step:started', + 'run_id', pgflow.step_states.run_id, + 'step_slug', pgflow.step_states.step_slug, + 'status', 'started', + 'started_at', pgflow.step_states.started_at, + 'remaining_tasks', pgflow.step_states.remaining_tasks, + 'remaining_deps', pgflow.step_states.remaining_deps + ), + concat('step:', pgflow.step_states.step_slug, ':started'), + concat('pgflow:run:', pgflow.step_states.run_id), + false + ) as _broadcast_result -- Prefix with _ to indicate internal use only +), + +-- ========================================== +-- TASK GENERATION AND QUEUE MESSAGES +-- ========================================== +-- ---------- Generate tasks and batch messages ---------- +-- Single steps: 1 task (index 0) +-- Map steps: N tasks (indices 0..N-1) +message_batches AS ( + SELECT + started_step.flow_slug, + 
started_step.run_id, + started_step.step_slug, + COALESCE(step.opt_start_delay, 0) as delay, + array_agg( + jsonb_build_object( + 'flow_slug', started_step.flow_slug, + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'task_index', task_idx.task_index + ) ORDER BY task_idx.task_index + ) AS messages, + array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices + FROM started_step_states AS started_step + JOIN pgflow.steps AS step + ON step.flow_slug = started_step.flow_slug + AND step.step_slug = started_step.step_slug + -- Generate task indices from 0 to initial_tasks-1 + CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) + GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay +), +-- ---------- Send messages to queue ---------- +-- Uses batch sending for performance with large arrays +sent_messages AS ( + SELECT + mb.flow_slug, + mb.run_id, + mb.step_slug, + task_indices.task_index, + msg_ids.msg_id + FROM message_batches mb + CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) + CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) + WHERE task_indices.idx_ord = msg_ids.msg_ord +) + +-- ========================================== +-- RECORD TASKS IN DATABASE +-- ========================================== +INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) +SELECT + sent_messages.flow_slug, + sent_messages.run_id, + sent_messages.step_slug, + sent_messages.task_index, + sent_messages.msg_id +FROM sent_messages; + +-- ========================================== +-- BROADCAST REALTIME EVENTS +-- ========================================== +-- Note: Both step:completed events for empty maps and step:started events +-- are now broadcast atomically in their respective CTEs using RETURNING pattern. +-- This ensures correct ordering, prevents duplicate broadcasts, and guarantees +-- that events are sent for exactly the rows that were updated. 
+ +end; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record pgflow.step_states%ROWTYPE; +begin + +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = v_run_record.flow_slug + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +-- Handle type violation if detected +IF v_dependent_map_slug IS NOT NULL THEN + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Broadcast run:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', complete_task.run_id, + 'flow_slug', v_run_record.flow_slug, + 'status', 'failed', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- 
Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Broadcast step:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'failed', + 'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END, + 'failed_at', now() + ), + concat('step:', complete_task.step_slug, ':failed'), + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps 
deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering +), +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) + FOR UPDATE +), +-- ---------- Update child steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) + initial_tasks = CASE + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) + END + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Broadcast step:completed event FIRST (before cascade) + -- This ensures parent broadcasts before its dependent children + -- For 
map steps, aggregate all task outputs; for single steps, use the task output + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', CASE + WHEN (SELECT s.step_type FROM pgflow.steps s + WHERE s.flow_slug = v_step_state.flow_slug + AND s.step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed') + ELSE + -- Single step: use the individual task output + complete_task.output + END, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- THEN cascade complete any taskless steps that are now ready + -- This ensures dependent children broadcast AFTER their parent + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index a9cd0662c..d1dbb5669 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:oxuMOlzSnTahsfKHxdx7pYYNr4aK+j2BppGkrO23TtM= +h1:FClxPy988QMOIEYi66F38g1A8YpcTqps8VPEfuMG2Z4= 20250429164909_pgflow_initial.sql h1:pPSR5ej8e7VUxSam0jUyUWQguMenrmsd9gS2dpi9QHM= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:560T1AOw2XW+Hem7ZTt67Otl6PEdNwq4xFQVOnGY2Dk= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:7Ki2Jn9LhPAAGLs0ypwPfMn+9cPZ4ED9y23HWIa/O40= @@ -9,3 +9,4 @@ h1:oxuMOlzSnTahsfKHxdx7pYYNr4aK+j2BppGkrO23TtM= 20250707210212_pgflow_add_opt_start_delay.sql h1:hRR3baDRd0V41sVUq/l6z3fkAaNCSNLe8m0BvRIYpEU= 20250719205006_pgflow_worker_deprecation.sql h1:VIJnWcNC2tbbc5UV9kAnQP5SNlr7N9LLEUSB3/KWoU4= 20251006073122_pgflow_add_map_step_type.sql h1:glsi7ti7BWa7UwWZze8zDjlNZBNOv4+nPRKLYNRUOro= +20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:mP0YyOTmCmPIhvwC1d80RUs4syigRzYsmeeWi3i6piU= diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql new 
+
+-- ---------- Archive completed task message ----------
+-- Move message from active queue to archive table
+PERFORM (
+  WITH completed_tasks AS (
+    SELECT r.flow_slug, st.message_id
+    FROM pgflow.step_tasks st
+    JOIN pgflow.runs r ON st.run_id = r.run_id
+    WHERE st.run_id = complete_task.run_id
+      AND st.step_slug = complete_task.step_slug
+      AND st.task_index = complete_task.task_index
+      AND st.status = 'completed'
+  )
+  SELECT pgmq.archive(ct.flow_slug, ct.message_id)
+  FROM completed_tasks ct
+  WHERE EXISTS (SELECT 1 FROM completed_tasks)
+);
+
+-- ---------- Trigger next steps ----------
+-- Start any steps that are now ready (deps satisfied)
+PERFORM pgflow.start_ready_steps(complete_task.run_id);
+
+-- Check if the entire run is complete
+PERFORM pgflow.maybe_complete_run(complete_task.run_id);
+
+-- ---------- Return completed task ----------
+RETURN QUERY SELECT *
+FROM pgflow.step_tasks AS step_task
+WHERE step_task.run_id = complete_task.run_id
+  AND step_task.step_slug = complete_task.step_slug
+  AND step_task.task_index = complete_task.task_index;
+
+end;
+$$;
diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum
index a9cd0662c..d1dbb5669 100644
--- a/pkgs/core/supabase/migrations/atlas.sum
+++ b/pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:oxuMOlzSnTahsfKHxdx7pYYNr4aK+j2BppGkrO23TtM=
+h1:FClxPy988QMOIEYi66F38g1A8YpcTqps8VPEfuMG2Z4=
 20250429164909_pgflow_initial.sql h1:pPSR5ej8e7VUxSam0jUyUWQguMenrmsd9gS2dpi9QHM=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:560T1AOw2XW+Hem7ZTt67Otl6PEdNwq4xFQVOnGY2Dk=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:7Ki2Jn9LhPAAGLs0ypwPfMn+9cPZ4ED9y23HWIa/O40=
@@ -9,3 +9,4 @@ h1:oxuMOlzSnTahsfKHxdx7pYYNr4aK+j2BppGkrO23TtM=
 20250707210212_pgflow_add_opt_start_delay.sql h1:hRR3baDRd0V41sVUq/l6z3fkAaNCSNLe8m0BvRIYpEU=
 20250719205006_pgflow_worker_deprecation.sql h1:VIJnWcNC2tbbc5UV9kAnQP5SNlr7N9LLEUSB3/KWoU4=
 20251006073122_pgflow_add_map_step_type.sql h1:glsi7ti7BWa7UwWZze8zDjlNZBNOv4+nPRKLYNRUOro=
+20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:mP0YyOTmCmPIhvwC1d80RUs4syigRzYsmeeWi3i6piU=
diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql
new file mode 100644
index 000000000..16ec89236
--- /dev/null
+++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql
@@ -0,0 +1,99 @@
+begin;
+select plan(5);
+
+-- Ensure partition exists for realtime.messages
+select pgflow_tests.create_realtime_partition();
+
+-- Reset database and create flow:
+-- parent_step (single) -> taskless_child1 (map) -> taskless_child2 (map)
+-- Parent outputs empty array, triggering cascade completion of map children
+select pgflow_tests.reset_db();
+select pgflow.create_flow('order_test');
+select pgflow.add_step('order_test', 'parent_step', step_type => 'single');
+select pgflow.add_step('order_test', 'taskless_child1', deps_slugs => ARRAY['parent_step'], step_type => 'map');
+select pgflow.add_step('order_test', 'taskless_child2', deps_slugs => ARRAY['taskless_child1'], step_type => 'map');
+
+-- Start flow
+with flow as (
+  select * from pgflow.start_flow('order_test', '{}'::jsonb)
+)
+select run_id into temporary run_ids from flow;
+
+-- Start and complete parent_step task with EMPTY ARRAY output
+-- This will trigger cascade completion of both taskless map children
+select pgflow_tests.read_and_start('order_test', 1, 1);
+select pgflow.complete_task(
+  (select run_id from run_ids),
+  'parent_step',
+  0,
+  '[]'::jsonb  -- Empty array triggers cascade of dependent maps
+);
+
+-- Test 1: All three step:completed events should be broadcast
+select is(
+  (select count(*) from realtime.messages
+   where payload->>'event_type' = 'step:completed'
+     and payload->>'run_id' = (select run_id::text from run_ids)),
+  3::bigint,
+  'Should broadcast 3 step:completed events (parent + 2 cascade children)'
+);
+
+-- Test 2: Verify parent_step broadcast exists
+select is(
+  pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'parent_step'),
+  1::int,
+  'Parent step should broadcast step:completed'
+);
+
+-- Test 3: Verify taskless_child1 broadcast exists
+select is(
+  pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'taskless_child1'),
+  1::int,
+  'Taskless child 1 should broadcast step:completed'
+);
+
+-- Test 4: Verify taskless_child2 broadcast exists
+select is(
+  pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'taskless_child2'),
+  1::int,
+  'Taskless child 2 should broadcast step:completed'
+);
+
+-- Test 5: CRITICAL - Verify broadcast order respects dependency graph
+-- Parent MUST broadcast BEFORE its dependent children
+-- Use inserted_at timestamp for ordering (UUIDs don't have order)
+with ordered_events as (
+  select
+    inserted_at,
+    payload->>'step_slug' as step_slug,
+    row_number() over (order by inserted_at) as event_order
+  from realtime.messages
+  where payload->>'event_type' = 'step:completed'
+    and payload->>'run_id' = (select run_id::text from run_ids)
+),
+parent_event as (
+  select event_order as parent_order
+  from ordered_events
+  where step_slug = 'parent_step'
+),
+child1_event as (
+  select event_order as child1_order
+  from ordered_events
+  where step_slug = 'taskless_child1'
+),
+child2_event as (
+  select event_order as child2_order
+  from ordered_events
+  where step_slug = 'taskless_child2'
+)
+select ok(
+  (select parent_order from parent_event) < (select child1_order from child1_event)
+  AND (select child1_order from child1_event) < (select child2_order from child2_event),
+  'CRITICAL: Events must arrive in dependency order (parent -> child1 -> child2)'
+);
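+
+-- Note: row_number() ordered by inserted_at breaks ties arbitrarily, so this
+-- assertion assumes the three broadcasts carry distinct inserted_at values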
+
+-- Clean up
+drop table if exists run_ids;
+
+select finish();
+rollback;
diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql
index a4f6dba5a..04c5f3c64 100644
--- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql
+++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql
@@ -1,5 +1,5 @@
 begin;
-select plan(4);
+select plan(3);
 
 -- Ensure partition exists for realtime.messages
 select pgflow_tests.create_realtime_partition();
@@ -45,17 +45,6 @@ select is(
   'All step:completed events should have status "completed"'
 );
 
--- Test 4: Verify events broadcast in correct topological order
-select results_eq(
-  $$ SELECT payload->>'step_slug'
-     FROM realtime.messages
-     WHERE payload->>'event_type' = 'step:completed'
-       AND payload->>'run_id' = (SELECT run_id::text FROM run_ids)
-     ORDER BY id $$,
-  $$ VALUES ('root_map'), ('m1'), ('m2'), ('m3'), ('m4') $$,
-  'Events should be broadcast in topological order: root_map, m1, m2, m3, m4'
-);
-
 -- Clean up
 drop table if exists run_ids;
 
diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql
index 24940e3f7..8d5dc7540 100644
--- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql
+++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql
@@ -1,5 +1,5 @@
 begin;
-select plan(6);
+select plan(5);
 
 -- Ensure partition exists for realtime.messages
 select pgflow_tests.create_realtime_partition();
@@ -54,17 +54,6 @@ select is(
   'Both step:completed events should have status "completed"'
 );
 
--- Test 6: Verify events broadcast in correct order (root_map first, then dependent_map)
-select results_eq(
-  $$ SELECT payload->>'step_slug'
-     FROM realtime.messages
-     WHERE payload->>'event_type' = 'step:completed'
-       AND payload->>'run_id' = (SELECT run_id::text FROM run_ids)
-     ORDER BY id $$,
-  $$ VALUES ('root_map'), ('dependent_map') $$,
-  'Events should be broadcast in topological order: root_map, then dependent_map'
-);
-
 -- Clean up
 drop table if exists run_ids;
 
diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql
index c4dcfceb8..a189c7e78 100644
--- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql
+++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql
@@ -1,5 +1,5 @@
 begin;
-select plan(7);
+select plan(6);
 
 -- Ensure partition exists for realtime.messages
 select pgflow_tests.create_realtime_partition();
@@ -71,17 +71,6 @@ select is(
   'All three step:completed events should have empty array output'
 );
 
--- Test 7: Verify events broadcast in correct order
-select results_eq(
-  $$ SELECT payload->>'step_slug'
-     FROM realtime.messages
-     WHERE payload->>'event_type' = 'step:completed'
-       AND payload->>'run_id' = (SELECT run_id::text FROM run_ids)
-     ORDER BY id $$,
-  $$ VALUES ('producer'), ('map1'), ('map2') $$,
-  'Events should be broadcast in topological order: producer, map1, map2'
-);
-
 -- Clean up
 drop table if exists run_ids;
 
diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql
index 880b981fc..38e571436 100644
--- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql
+++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql
@@ -1,5 +1,5 @@
 begin;
-select plan(4);
+select plan(3);
 
 -- Ensure partition exists for realtime.messages
 select pgflow_tests.create_realtime_partition();
@@ -65,17 +65,6 @@ select is(
   'Step C should broadcast step:completed event (from complete_task)'
 );
 
--- Test 4: Verify all three events broadcast in correct order
-select results_eq(
-  $$ SELECT payload->>'step_slug'
-     FROM realtime.messages
-     WHERE payload->>'event_type' = 'step:completed'
-       AND payload->>'run_id' = (SELECT run_id::text FROM run_ids)
-     ORDER BY id $$,
-  $$ VALUES ('step_a'), ('step_b'), ('step_c') $$,
-  'Events should be broadcast in completion order: step_a, step_b, step_c'
-);
-
 -- Clean up
 drop table if exists run_ids;