Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changeset/clear-signal-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
'@pgflow/client': patch
'@pgflow/core': patch
---

Fix missing realtime broadcasts for step:started and step:completed events

**Critical bug fix:** Clients were not receiving `step:started` events when steps transitioned to Started status, and `step:completed` events for empty map steps and cascade completions were also missing.

**Root cause:** The PostgreSQL query optimizer was eliminating CTEs containing `realtime.send()` calls because they were not referenced by subsequent operations or the final RETURN statement.

**Solution:** Moved `realtime.send()` calls directly into RETURNING clauses of UPDATE statements, ensuring they execute atomically with state changes and cannot be optimized away.

**Changes:**
- `start_ready_steps()`: Broadcasts `step:started` and `step:completed` events in RETURNING clauses
- `cascade_complete_taskless_steps()`: Broadcasts `step:completed` events atomically with cascade completion
- `complete_task()`: Adds PERFORM statements for `run:failed` and `step:failed` broadcasts
- Client: Adds `applySnapshot()` methods to FlowRun and FlowStep for proper initial state hydration without event emission
207 changes: 163 additions & 44 deletions pkgs/client/__tests__/integration/real-flow-execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ describe('Real Flow Execution', () => {
expect(run.flow_slug).toBe(testFlow.slug);

// Give realtime subscription time to establish properly
await new Promise(resolve => setTimeout(resolve, 2000));
await new Promise((resolve) => setTimeout(resolve, 2000));

// Poll for task
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);

expect(tasks).toHaveLength(1);
expect(tasks[0].input.run).toEqual(input);

Expand Down Expand Up @@ -112,7 +112,7 @@ describe('Real Flow Execution', () => {
step.on('*', stepTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 100));

// Poll and complete task
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
Expand Down Expand Up @@ -147,54 +147,120 @@ describe('Real Flow Execution', () => {
);

it(
'CRITICAL: broadcasts step:started events (CTE optimization bug check)',
'root steps: started immediately (verify via waitForStatus, not broadcasts)',
withPgNoTransaction(async (sql) => {
// This test specifically verifies that step:started events ARE broadcast
// It SHOULD FAIL until the CTE optimization bug is fixed in start_ready_steps()
// Root steps are started in the same transaction as start_flow()
// By the time startFlow() returns, they're already Started
// We can't observe these broadcasts - they happen before we can listen
// Instead, verify the state directly

const testFlow = createTestFlow('started_event_flow');
const testFlow = createTestFlow('root_started_flow');
await cleanupFlow(sql, testFlow.slug);
await grantMinimalPgflowPermissions(sql);
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'test_step')`;
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);

// Start flow - root step starts in this transaction
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
const step = run.step('test_step');
const step = run.step('root_step');

// Track ALL step events with event matchers
const tracker = createEventTracker();
step.on('*', tracker.callback);
// VERIFY: Step is already Started when startFlow() returns
expect(step.status).toBe(FlowStepStatus.Started);
expect(step.started_at).toBeDefined();

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));
// waitForStatus should return immediately (already Started)
await step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 1000 });

// Execute the step - this calls start_ready_steps() which should broadcast step:started
// Complete for cleanup
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);

// Wait a moment for broadcast to propagate
await new Promise(resolve => setTimeout(resolve, 500));

// Complete the task
await sqlClient.completeTask(tasks[0], { result: 'done' });
await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 });

// CRITICAL ASSERTIONS: Verify step:started WAS broadcast
// These will FAIL with the current CTE optimization bug!
await supabaseClient.removeAllChannels();
}),
15000
);

it(
'dependent steps: broadcasts step:started when they become ready',
withPgNoTransaction(async (sql) => {
// Dependent steps start AFTER their dependencies complete
// This happens AFTER startFlow() returns, so we CAN observe broadcasts
// This is the real test for step:started broadcasts!

const testFlow = createTestFlow('dependent_started_flow');
await cleanupFlow(sql, testFlow.slug);
await grantMinimalPgflowPermissions(sql);

await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'dependent_step', ARRAY['root_step'])`;

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);

// Start flow - only root_step starts
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
const rootStep = run.step('root_step');
const dependentStep = run.step('dependent_step');

// Root is started, dependent is still created (waiting for deps)
expect(rootStep.status).toBe(FlowStepStatus.Started);
expect(dependentStep.status).toBe(FlowStepStatus.Created);

// NOW set up event tracker (before completing root)
const tracker = createEventTracker();
dependentStep.on('*', tracker.callback);

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 100));

// Complete root step - this will trigger dependent_step to start
const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(rootTasks[0].step_slug).toBe('root_step');
await sqlClient.completeTask(rootTasks[0], { result: 'root done' });

// Wait for dependent to start
await dependentStep.waitForStatus(FlowStepStatus.Started, {
timeoutMs: 5000,
});

// VERIFY: We received step:started broadcast for dependent step
expect(tracker).toHaveReceivedEvent('step:started', {
run_id: run.run_id,
step_slug: 'test_step',
step_slug: 'dependent_step',
status: FlowStepStatus.Started,
});

// Verify proper event sequence
expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']);
expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed');
// Complete dependent step
const dependentTasks = await readAndStart(
sql,
sqlClient,
testFlow.slug,
1,
5
);
expect(dependentTasks[0].step_slug).toBe('dependent_step');
await sqlClient.completeTask(dependentTasks[0], {
result: 'dependent done',
});

// Wait for completion
await dependentStep.waitForStatus(FlowStepStatus.Completed, {
timeoutMs: 5000,
});

// Verify both events were received
// VERIFY: Proper event sequence
expect(tracker).toHaveReceivedEventSequence([
'step:started',
'step:completed',
]);
expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed');
expect(tracker).toHaveReceivedEventCount('step:started', 1);
expect(tracker).toHaveReceivedEventCount('step:completed', 1);

Expand All @@ -204,12 +270,13 @@ describe('Real Flow Execution', () => {
);

it(
'empty map steps: skip step:started and go straight to step:completed',
'empty map steps (root): completed immediately (verify via state)',
withPgNoTransaction(async (sql) => {
// This test verifies the EXPECTED behavior for empty map steps
// They should NOT send step:started, only step:completed
// Empty map steps with no tasks complete immediately
// Root empty maps complete in the start_flow transaction
// Can't observe broadcasts - verify state instead

const testFlow = createTestFlow('empty_map_flow');
const testFlow = createTestFlow('root_empty_map_flow');
await cleanupFlow(sql, testFlow.slug);
await grantMinimalPgflowPermissions(sql);

Expand All @@ -229,31 +296,81 @@ describe('Real Flow Execution', () => {
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);

// Start flow with empty array directly (root map steps expect array input)
// Start flow with empty array (root map steps expect array input)
const run = await pgflowClient.startFlow(testFlow.slug, []);
const step = run.step('empty_map_step');

// Track events
// VERIFY: Step is already Completed when startFlow() returns
expect(step.status).toBe(FlowStepStatus.Completed);
expect(step.completed_at).toBeDefined();

// Empty maps DO get started_at set (they transition through started briefly)
expect(step.started_at).toBeDefined();

await supabaseClient.removeAllChannels();
}),
15000
);

it(
'empty map steps (dependent): broadcasts step:completed when triggered',
withPgNoTransaction(async (sql) => {
// Dependent empty map steps complete AFTER their dependencies
// This happens AFTER startFlow() returns, so we CAN observe broadcasts
// They skip step:started and go directly to step:completed

const testFlow = createTestFlow('dependent_empty_map_flow');
await cleanupFlow(sql, testFlow.slug);
await grantMinimalPgflowPermissions(sql);

await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
// Dependent map step that will receive empty array from root
await sql`SELECT pgflow.add_step(
${testFlow.slug},
'dependent_empty_map',
ARRAY['root_step'],
NULL,
NULL,
NULL,
NULL,
'map'
)`;

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);

const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
const rootStep = run.step('root_step');
const emptyMapStep = run.step('dependent_empty_map');

// Set up tracker before completing root
const tracker = createEventTracker();
step.on('*', tracker.callback);
emptyMapStep.on('*', tracker.callback);

// Give realtime time to establish
await new Promise(resolve => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 100));

// Wait for step to complete (should happen immediately)
await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 });
// Complete root with empty array (single steps feeding map steps output arrays directly)
const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
await sqlClient.completeTask(rootTasks[0], []);

// Wait for dependent to complete (should happen immediately)
await emptyMapStep.waitForStatus(FlowStepStatus.Completed, {
timeoutMs: 5000,
});

// Verify NO step:started event (expected for empty maps)
// VERIFY: NO step:started (empty maps skip this)
expect(tracker).toNotHaveReceivedEvent('step:started');

// Verify step:completed was sent
// VERIFY: Received step:completed directly
expect(tracker).toHaveReceivedEvent('step:completed', {
run_id: run.run_id,
step_slug: 'empty_map_step',
step_slug: 'dependent_empty_map',
status: FlowStepStatus.Completed,
});

// Verify only 1 event total
// VERIFY: Only 1 event total (completed, no started)
expect(tracker).toHaveReceivedTotalEvents(1);

await supabaseClient.removeAllChannels();
Expand Down Expand Up @@ -285,10 +402,12 @@ describe('Real Flow Execution', () => {
expect(step.started_at).toBeDefined();

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));
await new Promise((resolve) => setTimeout(resolve, 100));

// waitForStatus should resolve immediately since step is already Started
const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 });
const waitPromise = step.waitForStatus(FlowStepStatus.Started, {
timeoutMs: 5000,
});
const result = await waitPromise;
expect(result).toBe(step);
expect(step.status).toBe(FlowStepStatus.Started);
Expand Down
24 changes: 22 additions & 2 deletions pkgs/client/src/lib/FlowRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,33 @@ export class FlowRun<TFlow extends AnyFlow>
});
}

/**
 * Hydrates this run's state straight from a database row, emitting no events.
 * Used when the initial state comes from start_flow_with_states() or
 * get_run_with_states().
 *
 * @internal Intended for PgflowClient only; applications should not call it.
 *
 * @param row - The run row whose fields are copied into the local state.
 */
applySnapshot(row: import('@pgflow/core').RunRow): void {
  // Timestamps arrive as nullable values; a falsy value becomes null.
  const asDate = (raw: string | null | undefined): Date | null =>
    raw ? new Date(raw) : null;

  const {
    status,
    input,
    output,
    started_at,
    completed_at,
    failed_at,
    remaining_steps,
  } = row;
  const state = this.#state;

  // Plain field-by-field copy of the row; no event conversion happens here.
  state.status = status as FlowRunStatus;
  state.input = input as ExtractFlowInput<TFlow>;
  state.output = output as ExtractFlowOutput<TFlow> | null;
  state.started_at = asDate(started_at);
  state.completed_at = asDate(completed_at);
  state.failed_at = asDate(failed_at);
  state.remaining_steps = remaining_steps;

  // The database keeps no error_message for runs, so both error fields are cleared.
  state.error_message = null;
  state.error = null;
}

/**
* Updates the run state based on an event
*
*
* @internal This method is only intended for use by PgflowClient and tests.
* Applications should not call this directly - state updates should come from
* database events through the PgflowClient.
*
*
* TODO: After v1.0, make this method private and refactor tests to use PgflowClient
* with event emission instead of direct state manipulation.
*/
Expand Down
22 changes: 20 additions & 2 deletions pkgs/client/src/lib/FlowStep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,31 @@ export class FlowStep<
});
}

/**
 * Hydrates this step's state straight from a database row, emitting no events.
 * Used when the initial state comes from start_flow_with_states() or
 * get_run_with_states().
 *
 * @internal Intended for PgflowClient only; applications should not call it.
 *
 * @param row - The step state row whose fields are copied into the local state.
 */
applySnapshot(row: import('@pgflow/core').StepStateRow): void {
  // Timestamps arrive as nullable values; a falsy value becomes null.
  const asDate = (raw: string | null | undefined): Date | null =>
    raw ? new Date(raw) : null;

  const { status, started_at, completed_at, failed_at, error_message } = row;
  const state = this.#state;

  // Plain field-by-field copy of the row; no event conversion happens here.
  state.status = status as FlowStepStatus;
  state.started_at = asDate(started_at);
  state.completed_at = asDate(completed_at);
  state.failed_at = asDate(failed_at);

  // Keep the raw message and wrap it in an Error only when one is present.
  state.error_message = error_message;
  state.error = error_message ? new Error(error_message) : null;

  // Output is not stored in the step_states table, so it is left untouched (stays null).
}

/**
* Updates the step state based on an event
*
*
* @internal This method is only intended for use by FlowRun and tests.
* Applications should not call this directly - state updates should come from
* database events through the PgflowClient.
*
*
* TODO: After v1.0, make this method private and refactor tests to use PgflowClient
* with event emission instead of direct state manipulation.
*/
Expand Down
Loading