diff --git a/.claude/skills/pgtap-testing/SKILL.md b/.claude/skills/pgtap-testing/SKILL.md index 04b4ab67d..825757595 100644 --- a/.claude/skills/pgtap-testing/SKILL.md +++ b/.claude/skills/pgtap-testing/SKILL.md @@ -1,6 +1,6 @@ --- name: pgtap-testing -description: Guide pgTAP test writing in pgflow. Use when user asks to write pgTAP test, add test for feature, test SQL function, or asks how to test database scenarios. Provides test patterns and helper functions. +description: Guide pgTAP test writing in pgflow. Use when user asks to create tests, write tests, add tests, create test files, fix tests, improve tests, add missing tests, create realtime tests, write database tests, test SQL functions, test broadcast events, test realtime events, add test coverage, create step tests, create run tests, test pgflow functions, or asks how to test database scenarios. Provides test patterns, helper functions, and realtime event testing examples. Use for any pgTAP test creation or modification. --- # pgTAP Testing Guide diff --git a/pkgs/client/__tests__/FlowRun.test.ts b/pkgs/client/__tests__/FlowRun.test.ts index cb3ffd479..8f0728ffa 100644 --- a/pkgs/client/__tests__/FlowRun.test.ts +++ b/pkgs/client/__tests__/FlowRun.test.ts @@ -50,23 +50,26 @@ describe('FlowRun', () => { }); const startedEvent = createRunStartedEvent({ run_id: RUN_ID }); - + // Set up event tracking const allTracker = createEventTracker(); const startedTracker = createEventTracker(); run.on('*', allTracker.callback); run.on('started', startedTracker.callback); - + // Attempt to update state (should be rejected due to same status) const result = run.updateState(toTypedRunEvent(startedEvent)); - + // Verify state update was rejected expect(result).toBe(false); - - // Verify no events were emitted - expect(allTracker.events).toHaveLength(0); - expect(startedTracker.events).toHaveLength(0); - + + // Verify no events were emitted using comprehensive matchers + expect(allTracker).toHaveReceivedTotalEvents(0); + expect(startedTracker).toNotHaveReceivedEvent('run:started'); + expect(allTracker).toNotHaveReceivedEvent('run:started'); + expect(allTracker).toNotHaveReceivedEvent('run:completed'); + expect(allTracker).toNotHaveReceivedEvent('run:failed'); + // Check state remains unchanged expect(run.status).toBe(FlowRunStatus.Started); expect(run.remaining_steps).toBe(2); @@ -83,6 +86,12 @@ describe('FlowRun', () => { output: { result: 'success' }, }); + // Set up event tracking before state update + const allTracker = createEventTracker(); + const completedTracker = createEventTracker(); + run.on('*', allTracker.callback); + run.on('completed', completedTracker.callback); + // Update state with completed event const result = run.updateState(toTypedRunEvent(completedEvent)); @@ -94,11 +103,23 @@ describe('FlowRun', () => { expect(run.completed_at).toBeInstanceOf(Date); expect(run.output).toEqual({ result: 'success' }); - // Verify callbacks were called - const tracker = createEventTracker(); - run.on('completed', tracker.callback); - run.updateState(toTypedRunEvent(completedEvent)); // Will be rejected as already completed - expect(tracker.events).toHaveLength(0); + // Verify events were emitted with comprehensive matchers + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(completedTracker).toHaveReceivedEvent('run:completed'); + expect(allTracker).toHaveReceivedEvent('run:completed', { + run_id: RUN_ID, + status: FlowRunStatus.Completed, + output: { result: 'success' }, + }); + expect(allTracker).toNotHaveReceivedEvent('run:failed'); + expect(allTracker).toNotHaveReceivedEvent('run:started'); + + // Verify attempt to update again is rejected + const secondTracker = createEventTracker(); + run.on('*', secondTracker.callback); + const secondResult = run.updateState(toTypedRunEvent(completedEvent)); + expect(secondResult).toBe(false); + expect(secondTracker).toHaveReceivedTotalEvents(0); }); test('handles failed event correctly', () => { @@ -112,6 +133,12 @@ describe('FlowRun', () => { error_message: 'Something went wrong', }); + // Set up event tracking before state update + const allTracker = createEventTracker(); + const failedTracker = createEventTracker(); + run.on('*', allTracker.callback); + run.on('failed', failedTracker.callback); + // Update state with failed event const result = run.updateState(toTypedRunEvent(failedEvent)); @@ -124,6 +151,20 @@ describe('FlowRun', () => { expect(run.error_message).toBe('Something went wrong'); expect(run.error).toBeInstanceOf(Error); expect(run.error?.message).toBe('Something went wrong'); + + // Verify events were emitted with comprehensive matchers and payload validation + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(failedTracker).toHaveReceivedEvent('run:failed'); + expect(allTracker).toHaveReceivedEvent('run:failed', { + run_id: RUN_ID, + status: FlowRunStatus.Failed, + error_message: 'Something went wrong', + }); + expect(allTracker).toNotHaveReceivedEvent('run:completed'); + expect(allTracker).toNotHaveReceivedEvent('run:started'); + + // Note: Broadcast events don't include error as Error instances + // They only have error_message as strings (already verified above) }); test('handles events with missing fields gracefully', () => { @@ -282,6 +323,160 @@ describe('FlowRun', () => { }); }); + describe('Run Event Lifecycles', () => { + test('happy path: started → completed with full event sequence', () => { + const run = createFlowRun({ + run_id: RUN_ID, + remaining_steps: 1, + }); + + const tracker = createEventTracker(); + run.on('*', tracker.callback); + + // Simulate completed event (started already happened during creation) + const completedEvent = createRunCompletedEvent({ + run_id: RUN_ID, + output: { final: 'result' }, + }); + run.updateState(toTypedRunEvent(completedEvent)); + + // Verify event sequence and counts + expect(tracker).toHaveReceivedTotalEvents(1); + expect(tracker).toHaveReceivedEventCount('run:completed', 1); + expect(tracker).toNotHaveReceivedEvent('run:failed'); + + // Verify payload completeness + expect(tracker).toHaveReceivedEvent('run:completed', { + run_id: RUN_ID, + status: FlowRunStatus.Completed, + output: { final: 'result' }, + }); + }); + + test('failure path: started → failed with no completed event', () => { + const run = createFlowRun({ + run_id: RUN_ID, + remaining_steps: 1, + }); + + const tracker = createEventTracker(); + run.on('*', tracker.callback); + + // Simulate failed event + const failedEvent = createRunFailedEvent({ + run_id: RUN_ID, + error_message: 'Task execution failed', + }); + run.updateState(toTypedRunEvent(failedEvent)); + + // Verify event sequence - should have failed but NOT completed + expect(tracker).toHaveReceivedTotalEvents(1); + expect(tracker).toHaveReceivedEventCount('run:failed', 1); + expect(tracker).toNotHaveReceivedEvent('run:completed'); + + // Verify error payload + expect(tracker).toHaveReceivedEvent('run:failed', { + run_id: RUN_ID, + status: FlowRunStatus.Failed, + error_message: 'Task execution failed', + }); + }); + + test('event ordering: terminal events come after started', () => { + // Create run in created state to test full lifecycle + const run = new FlowRun({ + run_id: RUN_ID, + flow_slug: FLOW_SLUG, + status: FlowRunStatus.Started, + input: { foo: 'bar' } as any, + output: null, + error: null, + error_message: null, + started_at: new Date(), + completed_at: null, + failed_at: null, + remaining_steps: 1, + }); + + const tracker = createEventTracker(); + run.on('*', tracker.callback); + + // Emit a started event first + const startedEvent = createRunStartedEvent({ run_id: RUN_ID }); + run.updateState(toTypedRunEvent(startedEvent)); // Will be rejected as already started + + // Then completed + const completedEvent = createRunCompletedEvent({ run_id: RUN_ID }); + run.updateState(toTypedRunEvent(completedEvent)); + + // We should only have completed (started was rejected) + expect(tracker).toHaveReceivedTotalEvents(1); + expect(tracker).toHaveReceivedEvent('run:completed'); + }); + + test('comprehensive payload validation for completed events', () => { + const run = createFlowRun({ + run_id: RUN_ID, + remaining_steps: 1, + }); + + const tracker = createEventTracker(); + run.on('*', tracker.callback); + + const output = { + data: [1, 2, 3], + metadata: { count: 3, processed: true }, + }; + + const completedEvent = createRunCompletedEvent({ + run_id: RUN_ID, + output, + }); + run.updateState(toTypedRunEvent(completedEvent)); + + // Use matchers for comprehensive payload validation + expect(tracker).toHaveReceivedEventCount('run:completed', 1); + expect(tracker).toHaveReceivedEvent('run:completed', { + run_id: RUN_ID, + status: FlowRunStatus.Completed, + output, + // Note: completed events don't include error, error_message fields + }); + + // Note: Broadcast events have timestamps as ISO strings, not Date objects + // The event tracker stores them as-is (strings) + }); + + test('comprehensive payload validation for failed events', () => { + const run = createFlowRun({ + run_id: RUN_ID, + remaining_steps: 1, + }); + + const tracker = createEventTracker(); + run.on('*', tracker.callback); + + const errorMessage = 'Network timeout after 30s'; + const failedEvent = createRunFailedEvent({ + run_id: RUN_ID, + error_message: errorMessage, + }); + run.updateState(toTypedRunEvent(failedEvent)); + + // Use matchers for comprehensive payload validation + expect(tracker).toHaveReceivedEventCount('run:failed', 1); + expect(tracker).toHaveReceivedEvent('run:failed', { + run_id: RUN_ID, + status: FlowRunStatus.Failed, + error_message: errorMessage, + // Note: failed events don't include output field + }); + + // Note: Broadcast events don't include error as Error instances + // They only have error_message (string) and failed_at (ISO string), not Date objects + }); + }); + describe('foreign-run events protection', () => { test('ignores events for different run IDs', () => { const run = createFlowRun({ run_id: RUN_ID }); @@ -485,6 +680,122 @@ describe('FlowRun', () => { expect(result).toBe(run); expect(run.status).toBe(FlowRunStatus.Completed); }); + + test('resolves when target status Failed is reached', async () => { + const run = createFlowRun({ run_id: RUN_ID }); + const failedEvent = createRunFailedEvent({ + run_id: RUN_ID, + error_message: 'Task execution failed', + }); + + // Create a promise that should resolve when the status is updated to Failed + const waitPromise = run.waitForStatus(FlowRunStatus.Failed); + + // Update the status after a delay + setTimeout(() => { + run.updateState(toTypedRunEvent(failedEvent)); + }, 1000); + + // Advance timers to trigger the update + await advanceTimersAndFlush(1000); + + // Wait for the promise to resolve + const result = await waitPromise; + expect(result).toBe(run); + expect(run.status).toBe(FlowRunStatus.Failed); + expect(run.error_message).toBe('Task execution failed'); + }); + + test('waitForStatus(Failed) times out if failure does not occur', async () => { + const run = new FlowRun({ + run_id: RUN_ID, + flow_slug: FLOW_SLUG, + status: FlowRunStatus.Started, + input: { foo: 'bar' } as any, + output: null, + error: null, + error_message: null, + started_at: new Date(), + completed_at: null, + failed_at: null, + remaining_steps: 1, + }); + + // Should timeout after 5000ms if run never fails + const waitPromise = run.waitForStatus(FlowRunStatus.Failed, { + timeoutMs: 5000, + }); + + // Immediately add catch handler to avoid unhandled rejection + const expectPromise = expect(waitPromise).rejects.toThrow(/Timeout waiting for run/); + + // Advance timers past the timeout + await advanceTimersAndFlush(5001); + + // Wait for the expectation to complete + await expectPromise; + }); + + test('waitForStatus(Failed) can be aborted with AbortSignal', async () => { + const run = new FlowRun({ + run_id: RUN_ID, + flow_slug: FLOW_SLUG, + status: FlowRunStatus.Started, + input: { foo: 'bar' } as any, + output: null, + error: null, + error_message: null, + started_at: new Date(), + completed_at: null, + failed_at: null, + remaining_steps: 1, + }); + + // Create an abort controller + const controller = new AbortController(); + + // Create a promise that should be aborted + const waitPromise = run.waitForStatus(FlowRunStatus.Failed, { + signal: controller.signal, + }); + + // Immediately add catch handler to avoid unhandled rejection + const expectPromise = expect(waitPromise).rejects.toThrow(/Aborted waiting for run/); + + // Abort the operation + setTimeout(() => { + controller.abort(); + }, 1000); + + // Advance timers to trigger the abort + await advanceTimersAndFlush(1000); + + // Wait for the expectation to complete + await expectPromise; + }); + + test('waitForStatus(Failed) resolves if already in Failed status', async () => { + const run = new FlowRun({ + run_id: RUN_ID, + flow_slug: FLOW_SLUG, + status: FlowRunStatus.Failed, + input: { foo: 'bar' } as any, + output: null, + error: new Error('Already failed'), + error_message: 'Already failed', + started_at: new Date(), + completed_at: null, + failed_at: new Date(), + remaining_steps: 1, + }); + + // Should resolve immediately since already in Failed status + const waitPromise = run.waitForStatus(FlowRunStatus.Failed); + const result = await waitPromise; + + expect(result).toBe(run); + expect(run.status).toBe(FlowRunStatus.Failed); + }); }); describe('step caching', () => { diff --git a/pkgs/client/__tests__/FlowStep.test.ts b/pkgs/client/__tests__/FlowStep.test.ts index 02d7e793a..83b6a1991 100644 --- a/pkgs/client/__tests__/FlowStep.test.ts +++ b/pkgs/client/__tests__/FlowStep.test.ts @@ -46,6 +46,12 @@ describe('FlowStep', () => { step_slug: STEP_SLUG, }); + // Set up event tracking + const allTracker = createEventTracker(); + const startedTracker = createEventTracker(); + step.on('*', allTracker.callback); + step.on('started', startedTracker.callback); + // Update state and verify const result = step.updateState(startedEvent); expect(result).toBe(true); @@ -53,6 +59,17 @@ describe('FlowStep', () => { // Check state was updated correctly expect(step.status).toBe(FlowStepStatus.Started); expect(step.started_at).toBeInstanceOf(Date); + + // Verify events were emitted with comprehensive matchers + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(startedTracker).toHaveReceivedEvent('step:started'); + expect(allTracker).toHaveReceivedEvent('step:started', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Started, + }); + expect(allTracker).toNotHaveReceivedEvent('step:completed'); + expect(allTracker).toNotHaveReceivedEvent('step:failed'); }); test('handles completed event correctly', () => { @@ -69,6 +86,12 @@ describe('FlowStep', () => { output: { step_result: 'success' }, }); + // Set up event tracking + const allTracker = createEventTracker(); + const completedTracker = createEventTracker(); + step.on('*', allTracker.callback); + step.on('completed', completedTracker.callback); + // Update state and verify const result = step.updateState(completedEvent); expect(result).toBe(true); @@ -77,6 +100,18 @@ describe('FlowStep', () => { expect(step.status).toBe(FlowStepStatus.Completed); expect(step.completed_at).toBeInstanceOf(Date); expect(step.output).toEqual({ step_result: 'success' }); + + // Verify events were emitted with comprehensive matchers and payload validation + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(completedTracker).toHaveReceivedEvent('step:completed'); + expect(allTracker).toHaveReceivedEvent('step:completed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Completed, + output: { step_result: 'success' }, + }); + expect(allTracker).toNotHaveReceivedEvent('step:failed'); + expect(allTracker).toNotHaveReceivedEvent('step:started'); }); test('handles failed event correctly', () => { @@ -93,6 +128,12 @@ describe('FlowStep', () => { error_message: 'Step failed', }); + // Set up event tracking + const allTracker = createEventTracker(); + const failedTracker = createEventTracker(); + step.on('*', allTracker.callback); + step.on('failed', failedTracker.callback); + // Update state and verify const result = step.updateState(failedEvent); expect(result).toBe(true); @@ -103,6 +144,21 @@ describe('FlowStep', () => { expect(step.error_message).toBe('Step failed'); expect(step.error).toBeInstanceOf(Error); expect(step.error?.message).toBe('Step failed'); + + // Verify events were emitted with comprehensive matchers and payload validation + expect(allTracker).toHaveReceivedTotalEvents(1); + expect(failedTracker).toHaveReceivedEvent('step:failed'); + expect(allTracker).toHaveReceivedEvent('step:failed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Failed, + error_message: 'Step failed', + }); + expect(allTracker).toNotHaveReceivedEvent('step:completed'); + expect(allTracker).toNotHaveReceivedEvent('step:started'); + + // Note: Broadcast events don't include error as Error instances + // They only have error_message as strings (already verified above) }); test('handles events with missing fields gracefully', () => { @@ -269,6 +325,222 @@ describe('FlowStep', () => { }); }); + describe('Step Event Lifecycles', () => { + test('normal step: started → completed with full event sequence', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Simulate started event + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + step.updateState(startedEvent); + + // Simulate completed event + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output: { result: 'done' }, + }); + step.updateState(completedEvent); + + // Verify exact event sequence + expect(tracker).toHaveReceivedTotalEvents(2); + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed'); + expect(tracker).toNotHaveReceivedEvent('step:failed'); + + // Verify payload completeness + expect(tracker).toHaveReceivedEvent('step:started', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Started, + }); + expect(tracker).toHaveReceivedEvent('step:completed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Completed, + output: { result: 'done' }, + }); + }); + + test('normal step: started → failed with full event sequence', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Simulate started event + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + step.updateState(startedEvent); + + // Simulate failed event + const failedEvent = createStepFailedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + error_message: 'Handler threw exception', + }); + step.updateState(failedEvent); + + // Verify exact event sequence + expect(tracker).toHaveReceivedTotalEvents(2); + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:failed']); + expect(tracker).toHaveReceivedInOrder('step:started', 'step:failed'); + expect(tracker).toNotHaveReceivedEvent('step:completed'); + + // Verify error payload + expect(tracker).toHaveReceivedEvent('step:failed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Failed, + error_message: 'Handler threw exception', + }); + }); + + test('empty map step: completed ONLY (no started event)', () => { + // Empty maps skip started and go straight to completed + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Simulate completed event without started (empty map behavior) + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output: [], // Empty array output for empty map + }); + step.updateState(completedEvent); + + // Verify NO started event, only completed + expect(tracker).toHaveReceivedTotalEvents(1); + expect(tracker).toNotHaveReceivedEvent('step:started'); + expect(tracker).toHaveReceivedEvent('step:completed'); + expect(tracker).toHaveReceivedEventCount('step:completed', 1); + + // Verify empty array output + expect(tracker).toHaveReceivedEvent('step:completed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Completed, + output: [], + }); + }); + + test('comprehensive payload validation for started events', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + step.updateState(startedEvent); + + // Use matchers for comprehensive payload validation + expect(tracker).toHaveReceivedEventCount('step:started', 1); + expect(tracker).toHaveReceivedEvent('step:started', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Started, + // Note: started events don't include completed_at, failed_at, output, error fields + }); + + // Note: Broadcast events have timestamps as ISO strings, not Date objects + }); + + test('comprehensive payload validation for completed events', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Started, + started_at: new Date(), + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + const output = { + items: [1, 2, 3], + total: 3, + metadata: { processed: true }, + }; + + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output, + }); + step.updateState(completedEvent); + + // Use matchers for comprehensive payload validation + expect(tracker).toHaveReceivedEventCount('step:completed', 1); + expect(tracker).toHaveReceivedEvent('step:completed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Completed, + output, + // Note: completed events don't include error, error_message fields + }); + + // Note: Broadcast events have timestamps as ISO strings, not Date objects + }); + + test('comprehensive payload validation for failed events', () => { + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Started, + started_at: new Date(), + }); + + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + const errorMessage = 'Timeout: handler exceeded 30s limit'; + const failedEvent = createStepFailedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + error_message: errorMessage, + }); + step.updateState(failedEvent); + + // Use matchers for comprehensive payload validation + expect(tracker).toHaveReceivedEventCount('step:failed', 1); + expect(tracker).toHaveReceivedEvent('step:failed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Failed, + error_message: errorMessage, + // Note: failed events don't include output field + }); + + // Note: Broadcast events don't include error as Error instances + // They only have error_message (string) and failed_at (ISO string), not Date objects + }); + }); + test('ignores events for different step slugs', () => { const step = new FlowStep({ run_id: RUN_ID, @@ -497,5 +769,94 @@ describe('FlowStep', () => { const failedResult = await failedPromise; expect(failedResult.status).toBe(FlowStepStatus.Failed); }); + + test('EDGE CASE: waitForStatus(Started) times out for empty map steps', async () => { + // Empty map steps skip 'started' and go straight to 'completed' + // This test documents that waiting for 'started' will timeout + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + // Start waiting for 'started' status + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 2000 }); + const expectPromise = expect(waitPromise).rejects.toThrow(/Timeout waiting for step/); + + // Simulate empty map: goes directly to completed WITHOUT started event + setTimeout(() => { + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output: [], // Empty array indicates empty map + }); + step.updateState(completedEvent); + }, 500); + + await advanceTimersAndFlush(500); + + // Step should be completed now + expect(step.status).toBe(FlowStepStatus.Completed); + + // But waitForStatus(Started) should still timeout + await advanceTimersAndFlush(2000); + await expectPromise; + }); + + test('EDGE CASE: waitForStatus(Completed) succeeds immediately for empty maps', async () => { + // Empty map steps can go straight from Created to Completed + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + // Start waiting for completed + const waitPromise = step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + // Simulate empty map: goes directly to completed + setTimeout(() => { + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output: [], + }); + step.updateState(completedEvent); + }, 100); + + await advanceTimersAndFlush(100); + + // Should resolve successfully without ever seeing 'started' + const result = await waitPromise; + expect(result.status).toBe(FlowStepStatus.Completed); + expect(result.output).toEqual([]); + }); + + test('EDGE CASE: normal step waitForStatus(Started) works as expected', async () => { + // This test confirms normal steps DO send started events + const step = createFlowStep({ + run_id: RUN_ID, + step_slug: STEP_SLUG as any, + status: FlowStepStatus.Created, + }); + + // Normal steps should successfully wait for 'started' + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 }); + + // Simulate normal step: sends started event + setTimeout(() => { + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + step.updateState(startedEvent); + }, 100); + + await advanceTimersAndFlush(100); + + // Should resolve successfully + const result = await waitPromise; + expect(result.status).toBe(FlowStepStatus.Started); + }); }); }); \ No newline at end of file diff --git a/pkgs/client/__tests__/concurrent-operations.test.ts b/pkgs/client/__tests__/concurrent-operations.test.ts index 513801d74..474902f33 100644 --- a/pkgs/client/__tests__/concurrent-operations.test.ts +++ b/pkgs/client/__tests__/concurrent-operations.test.ts @@ -10,6 +10,7 @@ import { emitBroadcastEvent, mockSequentialUuids, setupConcurrentOperations, + createEventTracker, } from './helpers/test-utils'; import { createStepStartedEvent, @@ -151,23 +152,37 @@ describe('Concurrent Operations', () => { const run = await pgflowClient.getRun(RUN_ID); if (!run) throw new Error('Run not found'); - // Track events on the run - const runEvents: string[] = []; - run.on('*', (event) => runEvents.push(event.event_type)); + // Track events on the run with event tracker + const runTracker = createEventTracker(); + run.on('*', runTracker.callback); // Emit completed event - const completedEvent = createRunCompletedEvent({ run_id: RUN_ID }); + const output = { final: 'result' }; + const completedEvent = createRunCompletedEvent({ + run_id: RUN_ID, + output, + }); emitBroadcastEvent(mocks, 'run:completed', completedEvent); - // Verify event was received - expect(runEvents).toEqual(['run:completed']); + // Verify event was received with comprehensive matchers + expect(runTracker).toHaveReceivedTotalEvents(1); + expect(runTracker).toHaveReceivedEventCount('run:completed', 1); + expect(runTracker).toHaveReceivedEvent('run:completed', { + run_id: RUN_ID, + status: FlowRunStatus.Completed, + output, + }); + expect(runTracker).toNotHaveReceivedEvent('run:failed'); + expect(runTracker).toNotHaveReceivedEvent('run:started'); + + // Verify run state was updated expect(run.status).toBe(FlowRunStatus.Completed); + expect(run.output).toEqual(output); }); it('forwards step events through the client', async () => { const { client, mocks } = createMockClient(); - const pgflowClient = new PgflowClient(client); // Set up a run @@ -184,26 +199,47 @@ describe('Concurrent Operations', () => { const step = run.step(STEP_SLUG as any); - // Track events on the step - const stepEvents: string[] = []; - step.on('*', (event) => stepEvents.push(event.event_type)); + // Track events on the step with event tracker + const stepTracker = createEventTracker(); + step.on('*', stepTracker.callback); // Get the broadcast handler and emit events - const startedEvent = createStepStartedEvent({ run_id: RUN_ID }); - const completedEvent = createStepCompletedEvent({ run_id: RUN_ID }); - + const startedEvent = createStepStartedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + }); + const output = { step_result: 'success' }; + const completedEvent = createStepCompletedEvent({ + run_id: RUN_ID, + step_slug: STEP_SLUG, + output, + }); + emitBroadcastEvent(mocks, 'step:started', startedEvent); emitBroadcastEvent(mocks, 'step:completed', completedEvent); - // Verify events were received (step was already started, so only completed event is processed) - expect(stepEvents).toEqual(['step:completed']); + // Verify events were received with comprehensive matchers + // Note: step was already in 'started' state from stepStatesSample, + // so the started event is rejected and only completed is processed + expect(stepTracker).toHaveReceivedTotalEvents(1); + expect(stepTracker).toHaveReceivedEventCount('step:completed', 1); + expect(stepTracker).toHaveReceivedEvent('step:completed', { + run_id: RUN_ID, + step_slug: STEP_SLUG, + status: FlowStepStatus.Completed, + output, + }); + expect(stepTracker).toNotHaveReceivedEvent('step:started'); + expect(stepTracker).toNotHaveReceivedEvent('step:failed'); + + // Verify step state was updated expect(step.status).toBe(FlowStepStatus.Completed); + expect(step.output).toEqual(output); }); it('ignores events with wrong run_id', async () => { const { client, mocks } = createMockClient(); - const pgflowClient = new PgflowClient(client); // Set up a run @@ -218,17 +254,27 @@ describe('Concurrent Operations', () => { const run = await pgflowClient.getRun(RUN_ID); if (!run) throw new Error('Run not found'); - // Track events - const events: string[] = []; - run.on('*', (event) => events.push(event.event_type)); + // Track events with event tracker + const runTracker = createEventTracker(); + run.on('*', runTracker.callback); // Emit event with different run_id - const wrongRunEvent = createRunCompletedEvent({ run_id: 'different-id' }); + const wrongRunId = 'different-id-12345'; + const wrongRunEvent = createRunCompletedEvent({ + run_id: wrongRunId, + output: { should: 'be ignored' }, + }); emitBroadcastEvent(mocks, 'run:completed', wrongRunEvent); - // Should not receive event - expect(events).toEqual([]); + // Verify no events were received using comprehensive matchers + expect(runTracker).toHaveReceivedTotalEvents(0); + expect(runTracker).toNotHaveReceivedEvent('run:completed'); + expect(runTracker).toNotHaveReceivedEvent('run:failed'); + expect(runTracker).toNotHaveReceivedEvent('run:started'); + + // Verify run state remains unchanged expect(run.status).toBe(FlowRunStatus.Started); + expect(run.output).toBeNull(); }); }); diff --git a/pkgs/client/__tests__/event-matchers.test.ts b/pkgs/client/__tests__/event-matchers.test.ts new file mode 100644 index 000000000..4d6700da0 --- /dev/null +++ b/pkgs/client/__tests__/event-matchers.test.ts @@ -0,0 +1,358 @@ +import { describe, test, expect } from 'vitest'; +import { createEventTracker } from './helpers/test-utils'; +import type { BroadcastRunEvent, BroadcastStepEvent } from '../src/lib/types'; + +/** + * Tests demonstrating the new event matcher patterns + * + * This file serves as both tests and documentation for how to use + * the custom event matchers for comprehensive event testing. + */ +describe('Event Matchers (Examples)', () => { + describe('toHaveReceivedEvent', () => { + test('asserts event type was received', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: { foo: 'bar' }, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + // Assert event was received + expect(tracker).toHaveReceivedEvent('run:started'); + }); + + test('asserts event with matching payload', () => { + const tracker = createEventTracker(); + const testInput = { foo: 'bar' }; + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: testInput, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + // Assert event with specific payload fields + expect(tracker).toHaveReceivedEvent('run:started', { + run_id: 'test-run', + flow_slug: 'test-flow', + input: testInput, + }); + }); + + test('can be negated', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: {}, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + // Assert event was NOT received using standard Vitest negation + expect(tracker).not.toHaveReceivedEvent('run:failed'); + }); + }); + + describe('toHaveReceivedEventSequence', () => { + test('asserts exact event sequence', () => { + const tracker = createEventTracker(); + + // Emit events in sequence + tracker.callback({ + event_type: 'step:started', + run_id: 'test-run', + step_slug: 'step1', + status: 'started', + started_at: new Date().toISOString(), + }); + + tracker.callback({ + event_type: 'step:completed', + run_id: 'test-run', + step_slug: 'step1', + status: 'completed', + output: { result: 'success' }, + completed_at: new Date().toISOString(), + }); + + // Assert exact sequence + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + }); + + test('fails if sequence is wrong', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'step:completed', + run_id: 'test-run', + step_slug: 'step1', + status: 'completed', + output: {}, + completed_at: new Date().toISOString(), + }); + + tracker.callback({ + event_type: 'step:started', + run_id: 'test-run', + step_slug: 'step1', + status: 'started', + started_at: new Date().toISOString(), + }); + + // This will fail because order is wrong + expect(() => { + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + }).toThrow(); + }); + }); + + describe('toHaveReceivedEventSubsequence', () => { + test('allows gaps in sequence', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: {}, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 2, + }); + + // Some intermediate events... + tracker.callback({ + event_type: 'run:progress' as any, + run_id: 'test-run', + remaining_steps: 1, + } as any); + + tracker.callback({ + event_type: 'run:completed', + run_id: 'test-run', + flow_slug: 'test-flow', + status: 'completed', + output: {}, + completed_at: new Date().toISOString(), + remaining_steps: 0, + }); + + // Assert subsequence (gaps allowed) + expect(tracker).toHaveReceivedEventSubsequence(['run:started', 'run:completed']); + }); + }); + + describe('toHaveReceivedAtLeast', () => { + test('asserts minimum event count', () => { + const tracker = createEventTracker(); + + // Emit multiple step events + for (let i = 0; i < 3; i++) { + tracker.callback({ + event_type: 'step:completed', + run_id: 'test-run', + step_slug: `step${i}`, + status: 'completed', + output: {}, + completed_at: new Date().toISOString(), + }); + } + + // Assert at least 3 events + expect(tracker).toHaveReceivedAtLeast('step:completed', 3); + // Also works with lower numbers + expect(tracker).toHaveReceivedAtLeast('step:completed', 1); + }); + }); + + describe('toHaveReceivedEventCount', () => { + test('asserts exact event count', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'step:started', + run_id: 'test-run', + step_slug: 'step1', + status: 'started', + started_at: new Date().toISOString(), + }); + + tracker.callback({ + event_type: 'step:started', + run_id: 'test-run', + step_slug: 'step2', + status: 'started', + started_at: new Date().toISOString(), + }); + + // Assert exactly 2 events + expect(tracker).toHaveReceivedEventCount('step:started', 2); + }); + }); + + describe('toHaveReceivedInOrder', () => { + test('asserts relative ordering', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: {}, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + tracker.callback({ + event_type: 'run:completed', + run_id: 'test-run', + flow_slug: 'test-flow', + status: 'completed', + output: {}, + completed_at: new Date().toISOString(), + remaining_steps: 0, + }); + + // Assert started comes before completed + expect(tracker).toHaveReceivedInOrder('run:started', 'run:completed'); + }); + }); + + describe('toNotHaveReceivedEvent', () => { + test('asserts event was not received', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: {}, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + // Assert failed event was not received + expect(tracker).toNotHaveReceivedEvent('run:failed'); + }); + }); + + describe('toHaveReceivedTotalEvents', () => { + test('asserts total event count', () => { + const tracker = createEventTracker(); + + tracker.callback({ + event_type: 'run:started', + run_id: 'test-run', + flow_slug: 'test-flow', + input: {}, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + tracker.callback({ + event_type: 'run:completed', + run_id: 'test-run', + flow_slug: 'test-flow', + status: 'completed', + output: {}, + completed_at: new Date().toISOString(), + remaining_steps: 0, + }); + + // Assert exactly 2 total events + expect(tracker).toHaveReceivedTotalEvents(2); + }); + }); + + describe('Real-world pattern: Complete flow lifecycle', () => { + test('validates complete event sequence with payloads', () => { + const tracker = createEventTracker(); + const runId = 'test-run-123'; + const flowSlug = 'my-flow'; + const input = { user_id: '456' }; + const output = { result: 'success' }; + + // Simulate run lifecycle + tracker.callback({ + event_type: 'run:started', + run_id: runId, + flow_slug: flowSlug, + input, + status: 'started', + started_at: new Date().toISOString(), + remaining_steps: 1, + }); + + tracker.callback({ + event_type: 'run:completed', + run_id: runId, + flow_slug: flowSlug, + status: 'completed', + output, + completed_at: new Date().toISOString(), + remaining_steps: 0, + }); + + // Comprehensive assertions + expect(tracker).toHaveReceivedTotalEvents(2); + expect(tracker).toHaveReceivedEventSequence(['run:started', 'run:completed']); + expect(tracker).toHaveReceivedEvent('run:started', { run_id: runId, input }); + expect(tracker).toHaveReceivedEvent('run:completed', { run_id: runId, output }); + expect(tracker).toNotHaveReceivedEvent('run:failed'); + expect(tracker).toHaveReceivedInOrder('run:started', 'run:completed'); + }); + }); + + describe('Combining with tracker query methods', () => { + test('can mix matchers with manual queries for complex assertions', () => { + const tracker = createEventTracker(); + + // Emit multiple events + tracker.callback({ + event_type: 'step:started', + run_id: 'test-run', + step_slug: 'step1', + status: 'started', + started_at: new Date().toISOString(), + }); + + tracker.callback({ + event_type: 'step:completed', + run_id: 'test-run', + step_slug: 'step1', + status: 'completed', + output: { items: 5 }, + completed_at: new Date().toISOString(), + }); + + // Use matchers for standard assertions + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + + // Use query methods for complex logic + const completedEvents = tracker.findByType('step:completed'); + expect(completedEvents).toHaveLength(1); + expect(completedEvents[0].output).toEqual({ items: 5 }); + + // Can also use query methods in custom assertions + const firstEvent = tracker.getFirstByType('step:started'); + expect(firstEvent?.step_slug).toBe('step1'); + }); + }); +}); diff --git a/pkgs/client/__tests__/helpers/event-matchers.ts b/pkgs/client/__tests__/helpers/event-matchers.ts new file mode 100644 index 000000000..f2683e1d7 --- /dev/null +++ b/pkgs/client/__tests__/helpers/event-matchers.ts @@ -0,0 +1,333 @@ +import { expect } from 'vitest'; +import type { EventTracker } from './test-utils'; + +/** + * Custom Vitest matchers for event assertions + * + * These matchers provide clean, standard Vitest-style assertions for event testing. + * They separate concerns: trackers collect data, matchers make assertions. + * + * Usage: + * expect(tracker).toHaveReceivedEvent('run:started', { run_id: RUN_ID }); + * expect(tracker).toHaveReceivedEventSequence(['run:started', 'run:completed']); + * expect(tracker).not.toHaveReceivedEvent('run:failed'); + */ + +interface MatcherResult { + pass: boolean; + message: () => string; + actual?: unknown; + expected?: unknown; +} + +export const eventMatchers = { + /** + * Assert that tracker received a specific event type, optionally with matching payload + */ + toHaveReceivedEvent( + tracker: EventTracker, + eventType: string, + payload?: Partial + ): MatcherResult { + const events = tracker.findByType(eventType); + const event = events[0]; + + if (!event) { + return { + pass: false, + message: () => { + const summary = tracker.getSummary(); + return [ + `Expected to receive event "${eventType}" but did not`, + '', + 'Received events:', + ...Object.entries(summary.breakdown).map( + ([type, count]) => ` - ${type}: ${count}x` + ), + ].join('\n'); + }, + }; + } + + if (payload) { + // Check if payload matches + const mismatches: string[] = []; + for (const [key, expectedValue] of Object.entries(payload)) { + const actualValue = event[key as keyof T]; + if (JSON.stringify(actualValue) !== JSON.stringify(expectedValue)) { + mismatches.push( + ` - ${key}: expected ${JSON.stringify(expectedValue)}, got ${JSON.stringify(actualValue)}` + ); + } + } + + if (mismatches.length > 0) { + return { + pass: false, + message: () => + [ + `Event "${eventType}" payload does not match expected:`, + '', + ...mismatches, + '', + 'Full event:', + JSON.stringify(event, null, 2), + ].join('\n'), + }; + } + } + + return { + pass: true, + message: () => `Expected not to receive event "${eventType}"${payload ? ' with matching payload' : ''}`, + }; + }, + + /** + * Assert exact event sequence (no gaps, exact order) + */ + toHaveReceivedEventSequence( + tracker: EventTracker, + expectedTypes: string[] + ): MatcherResult { + const actual = tracker.getSequence(); + const pass = JSON.stringify(actual) === JSON.stringify(expectedTypes); + + return { + pass, + message: () => + pass + ? `Expected not to receive sequence:\n [${expectedTypes.join(', ')}]` + : [ + 'Expected event sequence:', + ` [${expectedTypes.join(', ')}]`, + '', + 'But received:', + ` [${actual.join(', ')}]`, + '', + 'Diff:', + ...expectedTypes.map((expected, i) => { + const actualType = actual[i]; + if (actualType === expected) { + return ` ✓ [${i}] ${expected}`; + } else { + return ` ✗ [${i}] expected "${expected}", got "${actualType || 'nothing'}"`; + } + }), + ].join('\n'), + }; + }, + + /** + * Assert subsequence exists (order matters, but gaps allowed) + */ + toHaveReceivedEventSubsequence( + tracker: EventTracker, + expectedTypes: string[] + ): MatcherResult { + const sequence = tracker.getSequence(); + let matchIndex = 0; + const matchedIndices: number[] = []; + + for (let i = 0; i < sequence.length; i++) { + if (sequence[i] === expectedTypes[matchIndex]) { + matchedIndices.push(i); + matchIndex++; + if (matchIndex === expectedTypes.length) break; + } + } + + const pass = matchIndex === expectedTypes.length; + + return { + pass, + message: () => + pass + ? `Expected not to receive subsequence:\n [${expectedTypes.join(', ')}]` + : [ + 'Expected event subsequence:', + ` [${expectedTypes.join(', ')}]`, + '', + 'In sequence:', + ` [${sequence.join(', ')}]`, + '', + `Matched ${matchIndex} of ${expectedTypes.length} events`, + matchIndex > 0 ? `Matched at indices: [${matchedIndices.join(', ')}]` : '', + ] + .filter(Boolean) + .join('\n'), + }; + }, + + /** + * Assert minimum count of events by type + */ + toHaveReceivedAtLeast( + tracker: EventTracker, + eventType: string, + minCount: number + ): MatcherResult { + const count = tracker.countByType(eventType); + const pass = count >= minCount; + + return { + pass, + message: () => + pass + ? `Expected fewer than ${minCount} "${eventType}" events, but got ${count}` + : `Expected at least ${minCount} "${eventType}" events, but got ${count}`, + actual: count, + expected: minCount, + }; + }, + + /** + * Assert exact count of events by type + */ + toHaveReceivedEventCount( + tracker: EventTracker, + eventType: string, + expectedCount: number + ): MatcherResult { + const count = tracker.countByType(eventType); + const pass = count === expectedCount; + + return { + pass, + message: () => + pass + ? `Expected not to receive exactly ${expectedCount} "${eventType}" events` + : `Expected ${expectedCount} "${eventType}" events, but got ${count}`, + actual: count, + expected: expectedCount, + }; + }, + + /** + * Assert relative ordering of two event types + */ + toHaveReceivedInOrder( + tracker: EventTracker, + earlierType: string, + laterType: string + ): MatcherResult { + const sequence = tracker.getSequence(); + const earlierIndex = sequence.indexOf(earlierType); + const laterIndex = sequence.indexOf(laterType); + + if (earlierIndex === -1) { + return { + pass: false, + message: () => + `Expected to find "${earlierType}" event but it was not received\n\nSequence: [${sequence.join(', ')}]`, + }; + } + + if (laterIndex === -1) { + return { + pass: false, + message: () => + `Expected to find "${laterType}" event but it was not received\n\nSequence: [${sequence.join(', ')}]`, + }; + } + + const pass = earlierIndex < laterIndex; + + return { + pass, + message: () => + pass + ? `Expected "${earlierType}" not to come before "${laterType}"` + : [ + `Expected "${earlierType}" to come before "${laterType}"`, + '', + 'Sequence:', + ` [${sequence.join(', ')}]`, + '', + `"${earlierType}" at index ${earlierIndex}`, + `"${laterType}" at index ${laterIndex}`, + ].join('\n'), + }; + }, + + /** + * Assert no events of a specific type were received + */ + toNotHaveReceivedEvent( + tracker: EventTracker, + eventType: string + ): MatcherResult { + const count = tracker.countByType(eventType); + const pass = count === 0; + + return { + pass, + message: () => + pass + ? `Expected to receive "${eventType}" event but it was not received` + : `Expected not to receive "${eventType}" event, but received ${count} time(s)`, + actual: count, + expected: 0, + }; + }, + + /** + * Assert total number of events received + */ + toHaveReceivedTotalEvents( + tracker: EventTracker, + expectedTotal: number + ): MatcherResult { + const actual = tracker.events.length; + const pass = actual === expectedTotal; + + return { + pass, + message: () => { + const summary = tracker.getSummary(); + return pass + ? `Expected not to receive exactly ${expectedTotal} total events` + : [ + `Expected ${expectedTotal} total events, but got ${actual}`, + '', + 'Breakdown:', + ...Object.entries(summary.breakdown).map( + ([type, count]) => ` - ${type}: ${count}x` + ), + ].join('\n'); + }, + actual, + expected: expectedTotal, + }; + }, +}; + +// Extend Vitest's expect type +declare module 'vitest' { + interface Assertion { + toHaveReceivedEvent(eventType: string, payload?: Partial): T; + toHaveReceivedEventSequence(types: string[]): T; + toHaveReceivedEventSubsequence(types: string[]): T; + toHaveReceivedAtLeast(eventType: string, minCount: number): T; + toHaveReceivedEventCount(eventType: string, expectedCount: number): T; + toHaveReceivedInOrder(earlierType: string, laterType: string): T; + toNotHaveReceivedEvent(eventType: string): T; + toHaveReceivedTotalEvents(expectedTotal: number): T; + } + + interface AsymmetricMatchersContaining { + toHaveReceivedEvent(eventType: string, payload?: Partial): any; + toHaveReceivedEventSequence(types: string[]): any; + toHaveReceivedEventSubsequence(types: string[]): any; + toHaveReceivedAtLeast(eventType: string, minCount: number): any; + toHaveReceivedEventCount(eventType: string, expectedCount: number): any; + toHaveReceivedInOrder(earlierType: string, laterType: string): any; + toNotHaveReceivedEvent(eventType: string): any; + toHaveReceivedTotalEvents(expectedTotal: number): any; + } +} + +// Register matchers with Vitest +export function registerEventMatchers() { + expect.extend(eventMatchers); +} diff --git a/pkgs/client/__tests__/helpers/test-utils.ts b/pkgs/client/__tests__/helpers/test-utils.ts index bd5ee1f01..e5fa02a9a 100644 --- a/pkgs/client/__tests__/helpers/test-utils.ts +++ b/pkgs/client/__tests__/helpers/test-utils.ts @@ -164,6 +164,9 @@ export function mockRpcCalls( /** * Helper to track events emitted on a callback + * + * This tracker focuses on data collection and queries only. + * For assertions, use the custom matchers (toHaveReceivedEvent, etc.) */ export function createEventTracker() { const events: T[] = []; @@ -172,17 +175,67 @@ export function createEventTracker() { }); return { + // Core callback, events, + + // Query methods - return data for further use getEventTypes: () => events.map((e) => e.event_type), getLastEvent: () => events[events.length - 1], + findByType: (type: string) => events.filter((e) => e.event_type === type), + countByType: (type: string) => events.filter((e) => e.event_type === type).length, + hasEventType: (type: string) => events.some((e) => e.event_type === type), + getFirstByType: (type: string) => events.find((e) => e.event_type === type), + getSequence: () => events.map((e) => e.event_type), + + findByPayload: (matcher: Partial) => + events.filter((e) => + Object.entries(matcher).every(([key, value]) => e[key as keyof T] === value) + ), + + getEventsBetween: (startType: string, endType: string) => { + const startIndex = events.findIndex((e) => e.event_type === startType); + const endIndex = events.findIndex((e) => e.event_type === endType); + + if (startIndex === -1 || endIndex === -1) { + return []; + } + + return events.slice(startIndex + 1, endIndex); + }, + + // Utility clear: () => { events.length = 0; callback.mockClear(); }, + + // Debug helpers + debug: () => { + console.log('=== Event Tracker Debug ==='); + console.log(`Total events: ${events.length}`); + events.forEach((event, index) => { + console.log(`[${index}] ${event.event_type}:`, event); + }); + }, + + getSummary: () => { + const typeCounts = events.reduce((acc, e) => { + acc[e.event_type] = (acc[e.event_type] || 0) + 1; + return acc; + }, {} as Record); + + return { + total: events.length, + types: Object.keys(typeCounts).length, + breakdown: typeCounts, + }; + }, }; } +export type EventTracker = ReturnType>; + /** * Advances timers and flushes all pending microtasks */ diff --git a/pkgs/client/__tests__/integration/input-validation.test.ts b/pkgs/client/__tests__/integration/input-validation.test.ts new file mode 100644 index 000000000..32846a165 --- /dev/null +++ b/pkgs/client/__tests__/integration/input-validation.test.ts @@ -0,0 +1,333 @@ +import { describe, it, expect } from 'vitest'; +import { withPgNoTransaction } from '../helpers/db.js'; +import { createTestSupabaseClient } from '../helpers/setup.js'; +import { createTestFlow } from '../helpers/fixtures.js'; +import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; +import { PgflowClient } from '../../src/lib/PgflowClient.js'; +import { FlowRunStatus, FlowStepStatus } from '../../src/lib/types.js'; +import { cleanupFlow } from '../helpers/cleanup.js'; +import { PgflowSqlClient } from '@pgflow/core'; +import { readAndStart } from '../helpers/polling.js'; +import { createEventTracker } from '../helpers/test-utils.js'; + +describe('Input Validation', () => { + it( + 'rejects non-array input for root map steps before creating run', + withPgNoTransaction(async (sql) => { + // Setup: Create flow with root map step + const testFlow = createTestFlow('validation_root_map'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'map_step', + ARRAY[]::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Act & Assert: Should throw validation error + await expect( + pgflowClient.startFlow(testFlow.slug, { items: [] }) // Object, not array + ).rejects.toThrow(/has root map steps but input is not an array/); + + // Verify no run was created (validation failed before run creation) + const runs = await sql` + SELECT * FROM pgflow.runs WHERE flow_slug = ${testFlow.slug} + `; + expect(runs.length).toBe(0); + + // Verify no step states were created + const stepStates = await sql` + SELECT * FROM pgflow.step_states WHERE flow_slug = ${testFlow.slug} + `; + expect(stepStates.length).toBe(0); + + await supabaseClient.removeAllChannels(); + }), + 10000 + ); + + it( + 'accepts empty array input for root map steps', + withPgNoTransaction(async (sql) => { + // Setup: Create flow with root map step + const testFlow = createTestFlow('validation_empty_array'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'map_step', + ARRAY[]::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Act: Should succeed with empty array + const run = await pgflowClient.startFlow(testFlow.slug, []); // ✓ Valid array + + // Assert: Run was created successfully + expect(run.run_id).toBeDefined(); + expect(run.flow_slug).toBe(testFlow.slug); + + // Verify run exists in database + const runs = await sql` + SELECT * FROM pgflow.runs WHERE run_id = ${run.run_id} + `; + expect(runs.length).toBe(1); + + await supabaseClient.removeAllChannels(); + }), + 10000 + ); + + it( + 'accepts non-empty array input for root map steps', + withPgNoTransaction(async (sql) => { + // Setup: Create flow with root map step + const testFlow = createTestFlow('validation_nonempty_array'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'map_step', + ARRAY[]::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Act: Should succeed with array of items + const input = [{ id: 1 }, { id: 2 }, { id: 3 }]; + const run = await pgflowClient.startFlow(testFlow.slug, input); + + // Assert: Run was created successfully + expect(run.run_id).toBeDefined(); + expect(run.flow_slug).toBe(testFlow.slug); + expect(run.input).toEqual(input); + + // Verify run exists in database with correct input + const runs = await sql` + SELECT * FROM pgflow.runs WHERE run_id = ${run.run_id} + `; + expect(runs.length).toBe(1); + expect(runs[0].input).toEqual(input); + + await supabaseClient.removeAllChannels(); + }), + 10000 + ); + + it( + 'accepts object input for flows without root map steps', + withPgNoTransaction(async (sql) => { + // Setup: Create flow with regular (non-map) root step + const testFlow = createTestFlow('validation_single_step'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'regular_step')`; // Single step + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Act: Should succeed with object input (no root map validation) + const input = { items: [], foo: 'bar' }; + const run = await pgflowClient.startFlow(testFlow.slug, input); + + // Assert: Run was created successfully + expect(run.run_id).toBeDefined(); + expect(run.flow_slug).toBe(testFlow.slug); + expect(run.input).toEqual(input); + + // Verify run exists in database + const runs = await sql` + SELECT * FROM pgflow.runs WHERE run_id = ${run.run_id} + `; + expect(runs.length).toBe(1); + + await supabaseClient.removeAllChannels(); + }), + 10000 + ); + + // ========================================================================= + // Dependent Map Type Validation Tests + // ========================================================================= + + it( + 'CRITICAL: broadcasts step:failed and run:failed when dependency produces non-array for dependent map', + withPgNoTransaction(async (sql) => { + // This test verifies that type violations for dependent maps + // broadcast failure events (currently they do NOT - this is a bug) + + // Setup: Create flow with single step -> dependent map step + const testFlow = createTestFlow('dep_map_type_err'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'producer')`; // Single step + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'consumer_map', + ARRAY['producer']::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; // Dependent map step + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test' }); + + // Track events + const runTracker = createEventTracker(); + const stepTracker = createEventTracker(); + run.on('*', runTracker.callback); + run.step('producer').on('*', stepTracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Execute the producer step + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Complete with INVALID output (object instead of array) + const invalidOutput = { items: [1, 2, 3] }; // Object, not array! + await sqlClient.completeTask(tasks[0], invalidOutput); + + // Wait for events to propagate + await new Promise(resolve => setTimeout(resolve, 500)); + + // CRITICAL ASSERTIONS: Verify failure events WERE broadcast + // These will FAIL until complete_task.sql is fixed to send events! + + // Should receive step:failed event for the producer + expect(stepTracker).toHaveReceivedEvent('step:failed', { + run_id: run.run_id, + step_slug: 'producer', + status: FlowStepStatus.Failed, + }); + + // Should receive run:failed event + expect(runTracker).toHaveReceivedEvent('run:failed', { + run_id: run.run_id, + flow_slug: testFlow.slug, + status: FlowRunStatus.Failed, + }); + + // Verify database state (this should pass - state is updated correctly) + const runState = await sql` + SELECT status, failed_at FROM pgflow.runs WHERE run_id = ${run.run_id} + `; + expect(runState[0].status).toBe('failed'); + expect(runState[0].failed_at).not.toBeNull(); + + const stepState = await sql` + SELECT status, failed_at, error_message + FROM pgflow.step_states + WHERE run_id = ${run.run_id} AND step_slug = 'producer' + `; + expect(stepState[0].status).toBe('failed'); + expect(stepState[0].failed_at).not.toBeNull(); + expect(stepState[0].error_message).toMatch(/TYPE_VIOLATION/); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'accepts array output from dependency for dependent map step', + withPgNoTransaction(async (sql) => { + // Setup: Create flow with single step -> dependent map step + const testFlow = createTestFlow('dep_map_ok'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'producer')`; + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'consumer_map', + ARRAY['producer']::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test' }); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Execute the producer step + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Complete with VALID output (array) + const validOutput = [{ id: 1 }, { id: 2 }, { id: 3 }]; + await sqlClient.completeTask(tasks[0], validOutput); + + // Wait for step to complete + await run.step('producer').waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + // Verify step completed successfully (not failed) + const stepState = await sql` + SELECT status, failed_at, error_message + FROM pgflow.step_states + WHERE run_id = ${run.run_id} AND step_slug = 'producer' + `; + expect(stepState[0].status).toBe('completed'); + expect(stepState[0].failed_at).toBeNull(); + expect(stepState[0].error_message).toBeNull(); + + // Verify dependent map step has correct initial_tasks + const consumerMapState = await sql` + SELECT status, initial_tasks + FROM pgflow.step_states + WHERE run_id = ${run.run_id} AND step_slug = 'consumer_map' + `; + expect(consumerMapState[0].initial_tasks).toBe(3); // Array length + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); +}); diff --git a/pkgs/client/__tests__/integration/real-flow-execution.test.ts b/pkgs/client/__tests__/integration/real-flow-execution.test.ts index 6d40098d7..7a650248c 100644 --- a/pkgs/client/__tests__/integration/real-flow-execution.test.ts +++ b/pkgs/client/__tests__/integration/real-flow-execution.test.ts @@ -8,6 +8,7 @@ import { FlowRunStatus, FlowStepStatus } from '../../src/lib/types.js'; import { PgflowSqlClient } from '@pgflow/core'; import { readAndStart } from '../helpers/polling.js'; import { cleanupFlow } from '../helpers/cleanup.js'; +import { createEventTracker } from '../helpers/test-utils.js'; describe('Real Flow Execution', () => { it( @@ -83,10 +84,10 @@ describe('Real Flow Execution', () => { withPgNoTransaction(async (sql) => { // Create test flow and step const testFlow = createTestFlow('event_flow'); - + // Clean up flow data to ensure clean state await cleanupFlow(sql, testFlow.slug); - + // Grant minimal permissions for PgflowClient API access await grantMinimalPgflowPermissions(sql); await sql`SELECT pgflow.create_flow(${testFlow.slug})`; @@ -102,13 +103,13 @@ describe('Real Flow Execution', () => { const input = { foo: 'bar' }; const run = await pgflowClient.startFlow(testFlow.slug, input); - // Track events without logging - let runEventCount = 0; - let stepEventCount = 0; - - run.on('*', () => { runEventCount++; }); + // Track events with EventTracker + const runTracker = createEventTracker(); + const stepTracker = createEventTracker(); + + run.on('*', runTracker.callback); const step = run.step('event_step'); - step.on('*', () => { stepEventCount++; }); + step.on('*', stepTracker.callback); // Give realtime subscription time to establish await new Promise(resolve => setTimeout(resolve, 100)); @@ -121,12 +122,187 @@ describe('Real Flow Execution', () => { await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 15000 }); await run.waitForStatus(FlowRunStatus.Completed, { timeoutMs: 15000 }); - // Verify events were received - expect(runEventCount).toBeGreaterThan(0); - expect(stepEventCount).toBeGreaterThan(0); + // Verify run events with payload matching + expect(runTracker).toHaveReceivedEvent('run:completed', { + run_id: run.run_id, + flow_slug: testFlow.slug, + status: FlowRunStatus.Completed, + }); + + // Verify step events with payload matching + expect(stepTracker).toHaveReceivedEvent('step:completed', { + run_id: run.run_id, + step_slug: 'event_step', + status: FlowStepStatus.Completed, + output: { hello: 'world' }, + }); + + // Verify we received exactly the expected events + expect(runTracker).toHaveReceivedEventCount('run:completed', 1); + expect(stepTracker).toHaveReceivedEventCount('step:completed', 1); await supabaseClient.removeAllChannels(); }), 10000 ); + + it( + 'CRITICAL: broadcasts step:started events (CTE optimization bug check)', + withPgNoTransaction(async (sql) => { + // This test specifically verifies that step:started events ARE broadcast + // It SHOULD FAIL until the CTE optimization bug is fixed in start_ready_steps() + + const testFlow = createTestFlow('started_event_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'test_step')`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('test_step'); + + // Track ALL step events with event matchers + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Execute the step - this calls start_ready_steps() which should broadcast step:started + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + + // Wait a moment for broadcast to propagate + await new Promise(resolve => setTimeout(resolve, 500)); + + // Complete the task + await sqlClient.completeTask(tasks[0], { result: 'done' }); + await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + // CRITICAL ASSERTIONS: Verify step:started WAS broadcast + // These will FAIL with the current CTE optimization bug! + expect(tracker).toHaveReceivedEvent('step:started', { + run_id: run.run_id, + step_slug: 'test_step', + status: FlowStepStatus.Started, + }); + + // Verify proper event sequence + expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']); + expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed'); + + // Verify both events were received + expect(tracker).toHaveReceivedEventCount('step:started', 1); + expect(tracker).toHaveReceivedEventCount('step:completed', 1); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'empty map steps: skip step:started and go straight to step:completed', + withPgNoTransaction(async (sql) => { + // This test verifies the EXPECTED behavior for empty map steps + // They should NOT send step:started, only step:completed + + const testFlow = createTestFlow('empty_map_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + // Create a map step (will complete immediately with empty array input) + await sql`SELECT pgflow.add_step( + ${testFlow.slug}, + 'empty_map_step', + ARRAY[]::text[], + NULL, + NULL, + NULL, + NULL, + 'map' + )`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + // Start flow with empty array directly (root map steps expect array input) + const run = await pgflowClient.startFlow(testFlow.slug, []); + const step = run.step('empty_map_step'); + + // Track events + const tracker = createEventTracker(); + step.on('*', tracker.callback); + + // Give realtime time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Wait for step to complete (should happen immediately) + await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + // Verify NO step:started event (expected for empty maps) + expect(tracker).toNotHaveReceivedEvent('step:started'); + + // Verify step:completed was sent + expect(tracker).toHaveReceivedEvent('step:completed', { + run_id: run.run_id, + step_slug: 'empty_map_step', + status: FlowStepStatus.Completed, + }); + + // Verify only 1 event total + expect(tracker).toHaveReceivedTotalEvents(1); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'waitForStatus(Started): waits for step to reach Started status', + withPgNoTransaction(async (sql) => { + // This test verifies that waitForStatus works correctly for Started status + // Note: Root steps (no dependencies) are started immediately by start_flow() + + const testFlow = createTestFlow('wait_started_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'test_step')`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); + const step = run.step('test_step'); + + // Root steps are started immediately - verify step is in Started status + expect(step.status).toBe(FlowStepStatus.Started); + expect(step.started_at).toBeDefined(); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // waitForStatus should resolve immediately since step is already Started + const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 }); + const result = await waitPromise; + expect(result).toBe(step); + expect(step.status).toBe(FlowStepStatus.Started); + + // Poll for task to ensure step execution is progressing + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Complete the task for cleanup + await sqlClient.completeTask(tasks[0], { result: 'done' }); + await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); }); diff --git a/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts index 2229404ab..720577a78 100644 --- a/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts +++ b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts @@ -1,92 +1,82 @@ import { describe, it, expect } from 'vitest'; import { withPgNoTransaction } from '../../helpers/db.js'; import { createTestSupabaseClient } from '../../helpers/setup.js'; +import { createTestFlow } from '../../helpers/fixtures.js'; import { grantMinimalPgflowPermissions } from '../../helpers/permissions.js'; +import { cleanupFlow } from '../../helpers/cleanup.js'; import { PgflowClient } from '../../../src/lib/PgflowClient.js'; +import { PgflowSqlClient } from '@pgflow/core'; +import { readAndStart } from '../../helpers/polling.js'; +import { createEventTracker } from '../../helpers/test-utils.js'; +import { FlowStepStatus, FlowRunStatus } from '../../../src/lib/types.js'; describe('Step Failed Event Broadcasting', () => { it( 'should broadcast step:failed event when a step fails permanently', withPgNoTransaction(async (sql) => { - // Grant minimal permissions + // Setup test flow + const testFlow = createTestFlow('step_failed_bug_test'); + await cleanupFlow(sql, testFlow.slug); await grantMinimalPgflowPermissions(sql); - // Create test flow with max_attempts = 1 to fail immediately - await sql`SELECT pgflow.create_flow('step_failed_bug_test', max_attempts => 1)`; - await sql`SELECT pgflow.add_step('step_failed_bug_test', 'failing_step')`; + // Create flow with max_attempts = 1 to fail immediately on first failure + await sql`SELECT pgflow.create_flow(${testFlow.slug}, max_attempts => 1)`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'failing_step')`; // Create clients const supabaseClient = createTestSupabaseClient(); const pgflowClient = new PgflowClient(supabaseClient); + const sqlClient = new PgflowSqlClient(sql); // Start the flow - const [{ run_id: runId }] = await sql` - SELECT * FROM pgflow.start_flow('step_failed_bug_test', '{}'::jsonb) - `; - - // Start the step - await sql`SELECT pgflow.start_ready_steps(${runId})`; - - // Simulate worker processing: update task to 'started' status - await sql` - UPDATE pgflow.step_tasks - SET status = 'started', - started_at = now(), - attempts_count = 1 - WHERE run_id = ${runId} - AND step_slug = 'failing_step' - AND task_index = 0 - `; - - // Fail the task - this triggers the bug - await sql` - SELECT pgflow.fail_task( - ${runId}, - 'failing_step', - 0, - 'Step failed to demonstrate CTE optimization bug' - ) - `; - - // Check database state - const [runState] = await sql` - SELECT status FROM pgflow.runs WHERE run_id = ${runId} - `; - const [stepState] = await sql` - SELECT status FROM pgflow.step_states - WHERE run_id = ${runId} AND step_slug = 'failing_step' - `; - - // Check broadcast events - const messages = await sql` - SELECT payload->>'event_type' as event_type - FROM realtime.messages - WHERE topic = ${'pgflow:run:' + runId} - ORDER BY inserted_at - `; - - const eventTypes = messages.map(m => m.event_type); - - console.log('\n=== Step Failed Event Bug Test Results ==='); - console.log('Database State:'); - console.log(' Run status:', runState.status); - console.log(' Step status:', stepState.status); - console.log('\nBroadcast Events:'); - console.log(' Event types:', eventTypes); - console.log('\nBug Analysis:'); - console.log(' run:failed event broadcast?', eventTypes.includes('run:failed') ? 'YES ✓' : 'NO ✗'); - console.log(' step:failed event broadcast?', eventTypes.includes('step:failed') ? 'YES ✓' : 'NO ✗'); - + const run = await pgflowClient.startFlow(testFlow.slug, {}); + const step = run.step('failing_step'); + + // Track events with event matchers + const stepTracker = createEventTracker(); + const runTracker = createEventTracker(); + step.on('*', stepTracker.callback); + run.on('*', runTracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Poll and start the task (uses pgmq.read_with_poll and pgflow.start_tasks internally) + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Fail the task using the proper API (uses pgflow.fail_task) + await sqlClient.failTask( + tasks[0], + 'Step failed to demonstrate CTE optimization bug' + ); + + // Wait for step to reach failed status + await step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 5000 }); + await run.waitForStatus(FlowRunStatus.Failed, { timeoutMs: 5000 }); + // Verify database state is correct - expect(runState.status).toBe('failed'); - expect(stepState.status).toBe('failed'); - - // Verify run:failed was broadcast (this works) - expect(eventTypes).toContain('run:failed'); - - // Verify step:failed event is broadcast - // Regression: This fails due to PostgreSQL optimizing away the CTE that sends the event - expect(eventTypes).toContain('step:failed'); - }) + expect(step.status).toBe(FlowStepStatus.Failed); + expect(run.status).toBe(FlowRunStatus.Failed); + expect(step.error_message).toBe('Step failed to demonstrate CTE optimization bug'); + + // Verify step:failed event was broadcast using event matchers + // Regression: This would fail if the CTE optimization bug existed + expect(stepTracker).toHaveReceivedEvent('step:failed', { + run_id: run.run_id, + step_slug: 'failing_step', + status: FlowStepStatus.Failed, + error_message: 'Step failed to demonstrate CTE optimization bug', + }); + + // Verify run:failed event was broadcast + expect(runTracker).toHaveReceivedEvent('run:failed', { + run_id: run.run_id, + status: FlowRunStatus.Failed, + }); + + await supabaseClient.removeAllChannels(); + }), + 15000 ); }); \ No newline at end of file diff --git a/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts b/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts new file mode 100644 index 000000000..d04fbf789 --- /dev/null +++ b/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts @@ -0,0 +1,218 @@ +import { describe, it, expect } from 'vitest'; +import { withPgNoTransaction } from '../helpers/db.js'; +import { createTestSupabaseClient } from '../helpers/setup.js'; +import { createTestFlow } from '../helpers/fixtures.js'; +import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; +import { PgflowClient } from '../../src/lib/PgflowClient.js'; +import { FlowRunStatus, FlowStepStatus } from '../../src/lib/types.js'; +import { PgflowSqlClient } from '@pgflow/core'; +import { readAndStart } from '../helpers/polling.js'; +import { cleanupFlow } from '../helpers/cleanup.js'; +import { createEventTracker } from '../helpers/test-utils.js'; + +describe('waitForStatus - Failure Scenarios', () => { + it( + 'step.waitForStatus(Failed): waits for step to reach Failed status', + withPgNoTransaction(async (sql) => { + // Test that waitForStatus correctly waits for a step to fail + // IMPORTANT: max_attempts = 1 ensures immediate failure without retries + + const testFlow = createTestFlow('wait_failed_step_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug}, max_attempts => 1)`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'failing_step')`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'will-fail' }); + const step = run.step('failing_step'); + + // Track events to verify broadcasting + const stepTracker = createEventTracker(); + step.on('*', stepTracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Start waiting for Failed status (before the step actually fails) + const waitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 10000 }); + + // Execute the step and then fail it + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Fail the task + const errorMessage = 'Step execution failed intentionally'; + await sqlClient.failTask(tasks[0], errorMessage); + + // Wait for the step to reach Failed status + const result = await waitPromise; + expect(result).toBe(step); + expect(step.status).toBe(FlowStepStatus.Failed); + expect(step.error_message).toBe(errorMessage); + expect(step.failed_at).toBeDefined(); + + // Verify step:failed event was broadcast + expect(stepTracker).toHaveReceivedEvent('step:failed', { + run_id: run.run_id, + step_slug: 'failing_step', + status: FlowStepStatus.Failed, + error_message: errorMessage, + }); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'run.waitForStatus(Failed): waits for run to reach Failed status', + withPgNoTransaction(async (sql) => { + // Test that waitForStatus correctly waits for a run to fail + // IMPORTANT: max_attempts = 1 ensures immediate failure without retries + + const testFlow = createTestFlow('wait_failed_run_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug}, max_attempts => 1)`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'failing_step')`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'run-will-fail' }); + + // Track events to verify broadcasting + const runTracker = createEventTracker(); + run.on('*', runTracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Start waiting for Failed status (before the run actually fails) + const waitPromise = run.waitForStatus(FlowRunStatus.Failed, { timeoutMs: 10000 }); + + // Execute the step and then fail it (which will fail the run) + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + expect(tasks).toHaveLength(1); + + // Fail the task + const errorMessage = 'Run execution failed intentionally'; + await sqlClient.failTask(tasks[0], errorMessage); + + // Wait for the run to reach Failed status + const result = await waitPromise; + expect(result).toBe(run); + expect(run.status).toBe(FlowRunStatus.Failed); + expect(run.error_message).toBeDefined(); + expect(run.failed_at).toBeDefined(); + + // Verify run:failed event was broadcast + expect(runTracker).toHaveReceivedEvent('run:failed', { + run_id: run.run_id, + status: FlowRunStatus.Failed, + }); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'waitForStatus timeout: handles timeout when failure does not occur', + withPgNoTransaction(async (sql) => { + // Test that waitForStatus times out correctly when the expected failure never happens + + const testFlow = createTestFlow('timeout_test_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug})`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'normal_step')`; + + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'timeout' }); + const step = run.step('normal_step'); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Wait for Failed status with a short timeout (step will complete normally, not fail) + const waitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 2000 }); + + // Start the task but complete it successfully instead of failing + const sqlClient = new PgflowSqlClient(sql); + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + await sqlClient.completeTask(tasks[0], { result: 'success' }); + + // Wait for completion to ensure the step doesn't fail + await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 }); + + // The waitForStatus(Failed) should timeout since the step completed successfully + await expect(waitPromise).rejects.toThrow(/Timeout waiting for step/); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); + + it( + 'multiple failures: handles step failure followed by run failure', + withPgNoTransaction(async (sql) => { + // Test the complete failure lifecycle: step fails -> run fails + // IMPORTANT: max_attempts = 1 ensures immediate failure without retries + + const testFlow = createTestFlow('multi_failure_flow'); + await cleanupFlow(sql, testFlow.slug); + await grantMinimalPgflowPermissions(sql); + await sql`SELECT pgflow.create_flow(${testFlow.slug}, max_attempts => 1)`; + await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_one')`; + + const sqlClient = new PgflowSqlClient(sql); + const supabaseClient = createTestSupabaseClient(); + const pgflowClient = new PgflowClient(supabaseClient); + + const run = await pgflowClient.startFlow(testFlow.slug, { test: 'multi-fail' }); + const step = run.step('step_one'); + + // Track events for both run and step + const runTracker = createEventTracker(); + const stepTracker = createEventTracker(); + run.on('*', runTracker.callback); + step.on('*', stepTracker.callback); + + // Give realtime subscription time to establish + await new Promise(resolve => setTimeout(resolve, 100)); + + // Wait for both step and run to fail + const stepWaitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 10000 }); + const runWaitPromise = run.waitForStatus(FlowRunStatus.Failed, { timeoutMs: 10000 }); + + // Execute and fail the task + const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); + const errorMessage = 'Cascading failure test'; + await sqlClient.failTask(tasks[0], errorMessage); + + // Wait for both to fail + await Promise.all([stepWaitPromise, runWaitPromise]); + + // Verify both reached Failed status + expect(step.status).toBe(FlowStepStatus.Failed); + expect(run.status).toBe(FlowRunStatus.Failed); + + // Verify run:failed event was broadcast + expect(runTracker).toHaveReceivedEvent('run:failed'); + + // Verify step:failed event was broadcast (on step tracker, not run tracker) + expect(stepTracker).toHaveReceivedEvent('step:failed'); + + await supabaseClient.removeAllChannels(); + }), + 15000 + ); +}); diff --git a/pkgs/client/__tests__/setup.ts b/pkgs/client/__tests__/setup.ts new file mode 100644 index 000000000..b91ae9809 --- /dev/null +++ b/pkgs/client/__tests__/setup.ts @@ -0,0 +1,9 @@ +/** + * Vitest setup file - runs before all tests + * + * Registers custom matchers and performs any global test configuration + */ +import { registerEventMatchers } from './helpers/event-matchers'; + +// Register custom event matchers with Vitest +registerEventMatchers(); diff --git a/pkgs/client/vite.config.ts b/pkgs/client/vite.config.ts index db6c92020..22b7eb088 100644 --- a/pkgs/client/vite.config.ts +++ b/pkgs/client/vite.config.ts @@ -50,6 +50,7 @@ export default defineConfig({ include: [ '__tests__/**/*.{test,spec}.{js,mjs,cjs,ts,mts,cts,jsx,tsx}' ], + setupFiles: ['__tests__/setup.ts'], reporters: ['default'], coverage: { reportsDirectory: '../../coverage/pkgs/client', diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade-tests.md b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade-tests.md new file mode 100644 index 000000000..0cc9cc2a8 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade-tests.md @@ -0,0 +1,284 @@ +# Test Plan: cascade_complete_taskless_steps Event Broadcasting + +## Problem + +The `cascade_complete_taskless_steps()` function completes steps silently without broadcasting `step:completed` events. This causes: +- Empty map steps to complete without client notification +- Cascaded step completions to be invisible to realtime clients +- Test failures in `@pkgs/client` for empty map step event handling + +## Required Tests + +All tests should verify that `step:completed` events are broadcast via `realtime.send()` for steps completed by cascade. + +### 1. Empty Root Map Steps + +**Scenario:** Flow with root map step receives empty array input + +**Setup:** +```sql +-- Create flow with single root map step +SELECT pgflow.create_flow('empty_root_map'); +SELECT pgflow.add_step('empty_root_map', 'process', deps := '{}', step_type := 'map'); +``` + +**Test:** +```sql +-- Start flow with empty array +SELECT pgflow.start_flow('empty_root_map', '[]'::jsonb); + +-- Verify step completed +SELECT status FROM pgflow.step_states +WHERE step_slug = 'process'; +-- Expected: 'completed' + +-- Verify step:completed event was broadcast +SELECT * FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' + AND payload->>'step_slug' = 'process'; +-- Expected: 1 row with status='completed', output='[]' +``` + +**Current Status:** ❌ FAILING - Step completes but no event broadcast + +--- + +### 2. Dependent Map with Empty Array Propagation + +**Scenario:** Map step completes with 0 tasks, dependent map inherits empty array and cascades + +**Setup:** +```sql +-- Create flow: root_map -> dependent_map +SELECT pgflow.create_flow('cascade_map_chain'); +SELECT pgflow.add_step('cascade_map_chain', 'root_map', deps := '{}', step_type := 'map'); +SELECT pgflow.add_step('cascade_map_chain', 'dependent_map', deps := '{"root_map"}', step_type := 'map'); +``` + +**Test:** +```sql +-- Start flow with empty array +SELECT pgflow.start_flow('cascade_map_chain', '[]'::jsonb); + +-- Verify both steps completed +SELECT step_slug, status, initial_tasks FROM pgflow.step_states +WHERE flow_slug = 'cascade_map_chain' +ORDER BY step_slug; +-- Expected: +-- root_map: completed, initial_tasks=0 +-- dependent_map: completed, initial_tasks=0 + +-- Verify both step:completed events were broadcast +SELECT payload->>'step_slug' as step_slug, payload->>'status' as status +FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' +ORDER BY payload->>'step_slug'; +-- Expected: 2 rows, both with status='completed' +``` + +**Current Status:** ❌ FAILING - Both steps complete but no events broadcast + +--- + +### 3. Single Step Cascade Chain + +**Scenario:** Multiple single steps in a chain, each with 1 task + +**Setup:** +```sql +-- Create flow: A -> B -> C (all single steps) +SELECT pgflow.create_flow('single_cascade'); +SELECT pgflow.add_step('single_cascade', 'step_a'); +SELECT pgflow.add_step('single_cascade', 'step_b', deps := '{"step_a"}'); +SELECT pgflow.add_step('single_cascade', 'step_c', deps := '{"step_b"}'); +``` + +**Test:** +```sql +-- Start flow +SELECT pgflow.start_flow('single_cascade', '{}'::jsonb); + +-- Complete step_a task +-- This should trigger cascade for step_b and step_c +SELECT pgflow.complete_task(run_id, 'step_a', 0, '{"result": "a"}'::jsonb) +FROM pgflow.runs WHERE flow_slug = 'single_cascade'; + +-- Verify step_a broadcasts event (from complete_task, not cascade) +SELECT COUNT(*) FROM pgflow_realtime.messages +WHERE payload->>'step_slug' = 'step_a' + AND payload->>'event_type' = 'step:completed'; +-- Expected: 1 + +-- Complete step_b task +-- This should trigger cascade for step_c +SELECT pgflow.complete_task(run_id, 'step_b', 0, '{"result": "b"}'::jsonb) +FROM pgflow.runs WHERE flow_slug = 'single_cascade'; + +-- Complete step_c task +SELECT pgflow.complete_task(run_id, 'step_c', 0, '{"result": "c"}'::jsonb) +FROM pgflow.runs WHERE flow_slug = 'single_cascade'; + +-- Verify all three step:completed events were broadcast +SELECT payload->>'step_slug' as step_slug +FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' +ORDER BY id; +-- Expected: step_a, step_b, step_c +``` + +**Note:** This test verifies normal task completion broadcasts work (baseline) + +--- + +### 4. Mixed Chain with Cascade + +**Scenario:** Single step -> empty map -> dependent empty map + +**Setup:** +```sql +-- Create flow: producer -> map1 -> map2 +SELECT pgflow.create_flow('mixed_cascade'); +SELECT pgflow.add_step('mixed_cascade', 'producer'); +SELECT pgflow.add_step('mixed_cascade', 'map1', deps := '{"producer"}', step_type := 'map'); +SELECT pgflow.add_step('mixed_cascade', 'map2', deps := '{"map1"}', step_type := 'map'); +``` + +**Test:** +```sql +-- Start flow +SELECT pgflow.start_flow('mixed_cascade', '{}'::jsonb); + +-- Complete producer task with empty array output +SELECT pgflow.complete_task(run_id, 'producer', 0, '[]'::jsonb) +FROM pgflow.runs WHERE flow_slug = 'mixed_cascade'; + +-- Verify map1 and map2 cascade-completed +SELECT step_slug, status, initial_tasks FROM pgflow.step_states +WHERE flow_slug = 'mixed_cascade' +ORDER BY step_slug; +-- Expected: +-- producer: completed, initial_tasks=1 +-- map1: completed, initial_tasks=0 +-- map2: completed, initial_tasks=0 + +-- Verify all three step:completed events were broadcast +SELECT payload->>'step_slug' as step_slug, payload->>'output' as output +FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' +ORDER BY id; +-- Expected: +-- producer: {"output": []} +-- map1: {"output": []} +-- map2: {"output": []} +``` + +**Current Status:** ❌ FAILING - map1 and map2 complete but no events broadcast + +--- + +### 5. Multiple Iterations Cascade + +**Scenario:** Deep chain requiring multiple cascade iterations + +**Setup:** +```sql +-- Create flow: root_map -> m1 -> m2 -> m3 -> m4 (all maps) +SELECT pgflow.create_flow('deep_cascade'); +SELECT pgflow.add_step('deep_cascade', 'root_map', deps := '{}', step_type := 'map'); +SELECT pgflow.add_step('deep_cascade', 'm1', deps := '{"root_map"}', step_type := 'map'); +SELECT pgflow.add_step('deep_cascade', 'm2', deps := '{"m1"}', step_type := 'map'); +SELECT pgflow.add_step('deep_cascade', 'm3', deps := '{"m2"}', step_type := 'map'); +SELECT pgflow.add_step('deep_cascade', 'm4', deps := '{"m3"}', step_type := 'map'); +``` + +**Test:** +```sql +-- Start flow with empty array +SELECT pgflow.start_flow('deep_cascade', '[]'::jsonb); + +-- Verify all steps completed +SELECT COUNT(*) FROM pgflow.step_states +WHERE flow_slug = 'deep_cascade' AND status = 'completed'; +-- Expected: 5 + +-- Verify all step:completed events were broadcast (one per step) +SELECT COUNT(*) FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' + AND payload->>'flow_slug' = 'deep_cascade'; +-- Expected: 5 + +-- Verify events broadcast in correct order +SELECT payload->>'step_slug' as step_slug +FROM pgflow_realtime.messages +WHERE payload->>'event_type' = 'step:completed' +ORDER BY id; +-- Expected: root_map, m1, m2, m3, m4 +``` + +**Current Status:** ❌ FAILING - All steps complete but no events broadcast + +--- + +### 6. Event Payload Verification + +**Scenario:** Verify event payload contains all required fields + +**Setup:** +```sql +SELECT pgflow.create_flow('payload_check'); +SELECT pgflow.add_step('payload_check', 'empty_map', deps := '{}', step_type := 'map'); +``` + +**Test:** +```sql +-- Start flow +SELECT pgflow.start_flow('payload_check', '[]'::jsonb); + +-- Verify event payload structure +SELECT + payload->>'event_type' as event_type, + payload->>'run_id' IS NOT NULL as has_run_id, + payload->>'step_slug' as step_slug, + payload->>'status' as status, + payload->>'started_at' IS NOT NULL as has_started_at, + payload->>'completed_at' IS NOT NULL as has_completed_at, + payload->>'output' as output +FROM pgflow_realtime.messages +WHERE payload->>'step_slug' = 'empty_map'; +-- Expected: +-- event_type: 'step:completed' +-- has_run_id: true +-- step_slug: 'empty_map' +-- status: 'completed' +-- has_started_at: true +-- has_completed_at: true +-- output: '[]' +``` + +**Current Status:** ❌ FAILING - No event to verify + +--- + +## Implementation Notes + +1. **Realtime Message Capture:** Tests need to query the realtime messages table to verify broadcasts. The table name may vary based on schema setup. + +2. **Event Ordering:** For cascade chains, events should be broadcast in topological order (iteration order). + +3. **Output Aggregation:** For map steps, the `output` field should be an aggregated array of all task outputs (empty array for 0 tasks). + +4. **Timing:** Events should be broadcast within the same transaction as the step completion to ensure consistency. + +## Success Criteria + +- ✅ All empty map steps broadcast `step:completed` events +- ✅ All cascade-completed steps broadcast events +- ✅ Event payloads contain correct data (status, timestamps, output) +- ✅ Events broadcast in correct order (topological) +- ✅ Client integration tests pass for empty map steps + +## Related Files + +- Implementation: `pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql` +- Client tests: `pkgs/client/__tests__/integration/real-flow-execution.test.ts` +- Related issue: CTE optimization bug preventing `step:started` broadcasts diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql new file mode 100644 index 000000000..935bec970 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql @@ -0,0 +1,78 @@ +begin; +select plan(8); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow with single empty map +select pgflow_tests.reset_db(); +select pgflow.create_flow('payload_check'); +select pgflow.add_step('payload_check', 'empty_map', step_type => 'map'); + +-- Start flow with empty array +with flow as ( + select * from pgflow.start_flow('payload_check', '[]'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify event_type field +select is( + (select payload->>'event_type' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'step:completed', + 'Event payload should have event_type = "step:completed"' +); + +-- Test 2: Verify run_id field exists and matches +select is( + (select payload->>'run_id' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + (select run_id::text from run_ids), + 'Event payload should have correct run_id' +); + +-- Test 3: Verify step_slug field +select is( + (select payload->>'step_slug' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'empty_map', + 'Event payload should have correct step_slug' +); + +-- Test 4: Verify status field +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'completed', + 'Event payload should have status = "completed"' +); + +-- Test 5: Verify started_at field exists and is a valid timestamp +select ok( + (select (payload->>'started_at')::timestamptz is not null + from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'Event payload should have a valid started_at timestamp' +); + +-- Test 6: Verify completed_at field exists and is a valid timestamp +select ok( + (select (payload->>'completed_at')::timestamptz is not null + from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'Event payload should have a valid completed_at timestamp' +); + +-- Test 7: Verify output field has empty array +select is( + (select payload->'output'::text from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + '[]', + 'Event payload should have output = []' +); + +-- Test 8: Verify event name formatting (step::completed) +select is( + (select event from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'empty_map')), + 'step:empty_map:completed', + 'Event should have correct event name format (step::completed)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql new file mode 100644 index 000000000..a4f6dba5a --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql @@ -0,0 +1,63 @@ +begin; +select plan(4); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow: root_map -> m1 -> m2 -> m3 -> m4 (all maps) +select pgflow_tests.reset_db(); +select pgflow.create_flow('deep_cascade'); +select pgflow.add_step('deep_cascade', 'root_map', step_type => 'map'); +select pgflow.add_step('deep_cascade', 'm1', deps_slugs => ARRAY['root_map'], step_type => 'map'); +select pgflow.add_step('deep_cascade', 'm2', deps_slugs => ARRAY['m1'], step_type => 'map'); +select pgflow.add_step('deep_cascade', 'm3', deps_slugs => ARRAY['m2'], step_type => 'map'); +select pgflow.add_step('deep_cascade', 'm4', deps_slugs => ARRAY['m3'], step_type => 'map'); + +-- Start flow with empty array (should cascade-complete all 5 steps) +with flow as ( + select * from pgflow.start_flow('deep_cascade', '[]'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify all steps completed +select is( + (select count(*) from pgflow.step_states where status = 'completed'), + 5::bigint, + 'All 5 map steps should be completed via cascade' +); + +-- Test 2: Verify all step:completed events were broadcast (one per step) +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->>'run_id' = (select run_id::text from run_ids)), + 5::bigint, + 'Should broadcast 5 step:completed events (one for each step)' +); + +-- Test 3: Verify all events have status=completed +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->>'status' = 'completed' + and payload->>'run_id' = (select run_id::text from run_ids)), + 5::bigint, + 'All step:completed events should have status "completed"' +); + +-- Test 4: Verify events broadcast in correct topological order +select results_eq( + $$ SELECT payload->>'step_slug' + FROM realtime.messages + WHERE payload->>'event_type' = 'step:completed' + AND payload->>'run_id' = (SELECT run_id::text FROM run_ids) + ORDER BY id $$, + $$ VALUES ('root_map'), ('m1'), ('m2'), ('m3'), ('m4') $$, + 'Events should be broadcast in topological order: root_map, m1, m2, m3, m4' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql new file mode 100644 index 000000000..24940e3f7 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql @@ -0,0 +1,72 @@ +begin; +select plan(6); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow: root_map -> dependent_map +select pgflow_tests.reset_db(); +select pgflow.create_flow('cascade_map_chain'); +select pgflow.add_step('cascade_map_chain', 'root_map', step_type => 'map'); +select pgflow.add_step('cascade_map_chain', 'dependent_map', deps_slugs => ARRAY['root_map'], step_type => 'map'); + +-- Start flow with empty array (should cascade-complete both maps) +with flow as ( + select * from pgflow.start_flow('cascade_map_chain', '[]'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify both steps completed +select is( + (select count(*) from pgflow.step_states where status = 'completed'), + 2::bigint, + 'Both root_map and dependent_map should be completed' +); + +-- Test 2: Verify both have initial_tasks=0 +select results_eq( + $$ SELECT step_slug, initial_tasks FROM pgflow.step_states ORDER BY step_slug $$, + $$ VALUES ('dependent_map', 0), ('root_map', 0) $$, + 'Both maps should have initial_tasks=0' +); + +-- Test 3: Verify root_map step:completed event was broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'root_map'), + 1::int, + 'Root map should broadcast step:completed event' +); + +-- Test 4: Verify dependent_map step:completed event was broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'dependent_map'), + 1::int, + 'Dependent map should broadcast step:completed event via cascade' +); + +-- Test 5: Verify both events have status=completed +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->>'status' = 'completed' + and payload->>'run_id' = (select run_id::text from run_ids)), + 2::bigint, + 'Both step:completed events should have status "completed"' +); + +-- Test 6: Verify events broadcast in correct order (root_map first, then dependent_map) +select results_eq( + $$ SELECT payload->>'step_slug' + FROM realtime.messages + WHERE payload->>'event_type' = 'step:completed' + AND payload->>'run_id' = (SELECT run_id::text FROM run_ids) + ORDER BY id $$, + $$ VALUES ('root_map'), ('dependent_map') $$, + 'Events should be broadcast in topological order: root_map, then dependent_map' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql new file mode 100644 index 000000000..f011c4e67 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql @@ -0,0 +1,50 @@ +begin; +select plan(4); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow with single root map step +select pgflow_tests.reset_db(); +select pgflow.create_flow('empty_root_map'); +select pgflow.add_step('empty_root_map', 'process', step_type => 'map'); + +-- Start flow with empty array (should cascade-complete the map step) +with flow as ( + select * from pgflow.start_flow('empty_root_map', '[]'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify step completed in database +select is( + (select status from pgflow.step_states where step_slug = 'process'), + 'completed', + 'Empty root map step should be completed' +); + +-- Test 2: Verify step:completed event was broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'process'), + 1::int, + 'Empty root map should broadcast step:completed event via cascade' +); + +-- Test 3: Verify event has correct status +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'process')), + 'completed', + 'Cascade event should have status "completed"' +); + +-- Test 4: Verify event has empty array output +select is( + (select payload->'output'::text from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'process')), + '[]', + 'Cascade event should have empty array output' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql new file mode 100644 index 000000000..c4dcfceb8 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql @@ -0,0 +1,89 @@ +begin; +select plan(7); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow: producer -> map1 -> map2 +select pgflow_tests.reset_db(); +select pgflow.create_flow('mixed_cascade'); +select pgflow.add_step('mixed_cascade', 'producer'); +select pgflow.add_step('mixed_cascade', 'map1', deps_slugs => ARRAY['producer'], step_type => 'map'); +select pgflow.add_step('mixed_cascade', 'map2', deps_slugs => ARRAY['map1'], step_type => 'map'); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('mixed_cascade', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start and complete producer task with empty array output (triggers cascade for map1 and map2) +select pgflow_tests.read_and_start('mixed_cascade', 1, 1); +select pgflow.complete_task( + (select run_id from run_ids), + 'producer', + 0, + '[]'::jsonb +); + +-- Test 1: Verify all steps completed +select results_eq( + $$ SELECT step_slug, status FROM pgflow.step_states ORDER BY step_slug $$, + $$ VALUES ('map1', 'completed'), ('map2', 'completed'), ('producer', 'completed') $$, + 'All three steps should be completed' +); + +-- Test 2: Verify initial_tasks counts +select results_eq( + $$ SELECT step_slug, initial_tasks FROM pgflow.step_states ORDER BY step_slug $$, + $$ VALUES ('map1', 0), ('map2', 0), ('producer', 1) $$, + 'Maps should have initial_tasks=0, producer should have initial_tasks=1' +); + +-- Test 3: Verify producer event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'producer'), + 1::int, + 'Producer should broadcast step:completed event (from complete_task)' +); + +-- Test 4: Verify map1 event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'map1'), + 1::int, + 'Map1 should broadcast step:completed event (from cascade)' +); + +-- Test 5: Verify map2 event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'map2'), + 1::int, + 'Map2 should broadcast step:completed event (from cascade)' +); + +-- Test 6: Verify all events have empty array output +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->'output'::text = '[]' + and payload->>'run_id' = (select run_id::text from run_ids)), + 3::bigint, + 'All three step:completed events should have empty array output' +); + +-- Test 7: Verify events broadcast in correct order +select results_eq( + $$ SELECT payload->>'step_slug' + FROM realtime.messages + WHERE payload->>'event_type' = 'step:completed' + AND payload->>'run_id' = (SELECT run_id::text FROM run_ids) + ORDER BY id $$, + $$ VALUES ('producer'), ('map1'), ('map2') $$, + 'Events should be broadcast in topological order: producer, map1, map2' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql new file mode 100644 index 000000000..880b981fc --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql @@ -0,0 +1,83 @@ +begin; +select plan(4); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow: A -> B -> C (all single steps) +select pgflow_tests.reset_db(); +select pgflow.create_flow('single_cascade'); +select pgflow.add_step('single_cascade', 'step_a'); +select pgflow.add_step('single_cascade', 'step_b', deps_slugs => ARRAY['step_a']); +select pgflow.add_step('single_cascade', 'step_c', deps_slugs => ARRAY['step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('single_cascade', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start and complete step_a task +select pgflow_tests.read_and_start('single_cascade', 1, 1); +select pgflow.complete_task( + (select run_id from run_ids), + 'step_a', + 0, + '{"result": "a"}'::jsonb +); + +-- Start and complete step_b task +select pgflow_tests.read_and_start('single_cascade', 1, 1); +select pgflow.complete_task( + (select run_id from run_ids), + 'step_b', + 0, + '{"result": "b"}'::jsonb +); + +-- Start and complete step_c task +select pgflow_tests.read_and_start('single_cascade', 1, 1); +select pgflow.complete_task( + (select run_id from run_ids), + 'step_c', + 0, + '{"result": "c"}'::jsonb +); + +-- Test 1: Verify step_a event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'step_a'), + 1::int, + 'Step A should broadcast step:completed event (from complete_task)' +); + +-- Test 2: Verify step_b event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'step_b'), + 1::int, + 'Step B should broadcast step:completed event (from complete_task)' +); + +-- Test 3: Verify step_c event broadcast +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'step_c'), + 1::int, + 'Step C should broadcast step:completed event (from complete_task)' +); + +-- Test 4: Verify all three events broadcast in correct order +select results_eq( + $$ SELECT payload->>'step_slug' + FROM realtime.messages + WHERE payload->>'event_type' = 'step:completed' + AND payload->>'run_id' = (SELECT run_id::text FROM run_ids) + ORDER BY id $$, + $$ VALUES ('step_a'), ('step_b'), ('step_c') $$, + 'Events should be broadcast in completion order: step_a, step_b, step_c' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql b/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql new file mode 100644 index 000000000..7c5b47d2c --- /dev/null +++ b/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql @@ -0,0 +1,79 @@ +begin; +select plan(8); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database and create flow with root map step +select pgflow_tests.reset_db(); +select pgflow.create_flow('empty_map_test'); +select pgflow.add_step('empty_map_test', 'root_map', step_type => 'map'); + +-- Start flow with empty array (should complete map immediately) +with flow as ( + select * from pgflow.start_flow('empty_map_test', '[]'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify step:completed event for empty map +select is( + pgflow_tests.count_realtime_events('step:completed', (select run_id from run_ids), 'root_map'), + 1::int, + 'Empty map step should send step:completed event immediately' +); + +-- Test 2: Verify output is empty array +select is( + (select payload->'output'::text from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'root_map')), + '[]', + 'Empty map completion should have empty array output' +); + +-- Test 3: Verify status is completed +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'root_map')), + 'completed', + 'Empty map should have status "completed"' +); + +-- Test 4: Verify started_at and completed_at both exist +select ok( + (select (payload->>'started_at')::timestamptz is not null + AND (payload->>'completed_at')::timestamptz is not null + from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'root_map')), + 'Empty map should have both started_at and completed_at timestamps' +); + +-- Test 5: Verify started_at and completed_at are nearly identical (instant completion) +select ok( + (select extract(epoch from (payload->>'completed_at')::timestamptz - (payload->>'started_at')::timestamptz) < 0.1 + from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'root_map')), + 'Empty map should complete instantly (< 100ms difference between started_at and completed_at)' +); + +-- Test 6: Verify event name formatting +select is( + (select event from pgflow_tests.get_realtime_message('step:completed', (select run_id from run_ids), 'root_map')), + 'step:root_map:completed', + 'The step:completed event should have the correct event name (step::completed)' +); + +-- Test 7: Verify run also completes +select is( + pgflow_tests.count_realtime_events('run:completed', (select run_id from run_ids)), + 1::int, + 'Run with only empty map should also complete and send run:completed event' +); + +-- Test 8: Verify NO step:started event (goes straight to completed) +select is( + pgflow_tests.count_realtime_events('step:started', (select run_id from run_ids), 'root_map'), + 0::int, + 'Empty map should NOT send step:started event (instant completion means it never starts in the traditional sense)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql b/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql index 6bf1fa172..273d83f48 100644 --- a/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql @@ -1,5 +1,5 @@ begin; -select plan(5); +select plan(10); -- Ensure partition exists for realtime.messages select pgflow_tests.create_realtime_partition(); @@ -61,6 +61,41 @@ select is( 'pgflow.fail_task should NOT send any run:completed events when a run fails' ); +-- Test 6: Verify one step:failed event exists +select is( + pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'first'), + 1::int, + 'pgflow.fail_task should send a step:failed event when a step fails permanently' +); + +-- Test 7: Verify step:failed event status +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'first')), + 'failed', + 'The step:failed event should have status "failed"' +); + +-- Test 8: Verify step:failed event contains error message +select is( + (select payload->>'error_message' from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'first')), + 'Test failure message', + 'The step:failed event should contain the error message' +); + +-- Test 9: Verify failed_at timestamp exists +select ok( + (select (payload->>'failed_at')::timestamptz is not null + from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'first')), + 'The step:failed event should include a failed_at timestamp' +); + +-- Test 10: Verify event name formatting +select is( + (select event from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'first')), + 'step:first:failed', + 'The step:failed event should have the correct event name (step::failed)' +); + -- Clean up drop table if exists run_ids; drop table if exists failed_tasks; diff --git a/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql b/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql index b02040f8b..6c9447bce 100644 --- a/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql @@ -1,7 +1,5 @@ begin; --- Note: Since the start_ready_steps function doesn't actually send the step:started events yet, --- we'll test the overall flow to ensure appropriate events are still sent -select plan(1); +select plan(7); -- Ensure partition exists for realtime.messages select pgflow_tests.create_realtime_partition(); @@ -16,28 +14,57 @@ with flow as ( ) select run_id into temporary run_ids from flow; --- Poll for the first task and complete it --- This should trigger start_ready_steps internally for the 'second' step -with task as ( - select * from pgflow_tests.read_and_start('sequential', 1, 1) -) -select pgflow.complete_task( - (select run_id from task), - (select step_slug from task), - 0, - '{"result": "success"}'::jsonb -) into temporary completed_tasks; - --- Since we know that step:started events aren't currently implemented, just verify --- that at least the run:started event was sent +-- Test 1: Verify one step:started event exists for the root step +select is( + pgflow_tests.count_realtime_events('step:started', (select run_id from run_ids), 'first'), + 1::int, + 'pgflow.start_ready_steps should send exactly one step:started event for root step' +); + +-- Test 2: Verify step_slug in event payload +select is( + (select payload->>'step_slug' from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + 'first', + 'The step:started event should contain the correct step_slug' +); + +-- Test 3: Verify status in event payload +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + 'started', + 'The step:started event should have status "started"' +); + +-- Test 4: Verify started_at timestamp exists and is valid select ok( - pgflow_tests.count_realtime_events('run:started', (select run_id from run_ids)) = 1, - 'The system should send a run:started event' + (select (payload->>'started_at')::timestamptz is not null + from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + 'The step:started event should include a started_at timestamp' +); + +-- Test 5: Verify remaining_tasks in payload +select ok( + (select (payload->>'remaining_tasks')::int > 0 + from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + 'The step:started event should include remaining_tasks count' +); + +-- Test 6: Verify event name formatting +select is( + (select event from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + 'step:first:started', + 'The step:started event should have the correct event name (step::started)' +); + +-- Test 7: Verify topic formatting +select is( + (select topic from pgflow_tests.get_realtime_message('step:started', (select run_id from run_ids), 'first')), + concat('pgflow:run:', (select run_id from run_ids)), + 'The step:started event should have the correct topic (pgflow:run:)' ); -- Clean up drop table if exists run_ids; -drop table if exists completed_tasks; select finish(); -rollback; \ No newline at end of file +rollback; diff --git a/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql b/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql new file mode 100644 index 000000000..3767753cf --- /dev/null +++ b/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql @@ -0,0 +1,116 @@ +begin; +select plan(8); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +-- Reset database +select pgflow_tests.reset_db(); + +-- ========================================================================= +-- Setup: Create flow with single step -> dependent map step +-- ========================================================================= +select pgflow.create_flow('type_violation_test'); +select pgflow.add_step('type_violation_test', 'producer'); -- Single step +select pgflow.add_step( + 'type_violation_test', + 'consumer_map', + ARRAY['producer']::text[], + NULL, -- opt_max_attempts + NULL, -- opt_base_delay + NULL, -- opt_timeout + NULL, -- opt_start_delay + 'map' -- step_type +); + +-- Start the flow and capture the run_id +with flow as ( + select * from pgflow.start_flow('type_violation_test', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Poll for the producer task +with task as ( + select * from pgflow_tests.read_and_start('type_violation_test', 1, 1) +) +select * into temporary producer_tasks from task; + +-- Complete the producer with INVALID output (object instead of array for dependent map) +-- This should trigger a type violation failure +select pgflow.complete_task( + (select run_id from producer_tasks), + (select step_slug from producer_tasks), + 0, + '{"items": [1, 2, 3]}'::jsonb -- Object, not array! +) into temporary completed_tasks; + +-- ========================================================================= +-- CRITICAL TESTS: Verify failure events ARE broadcast +-- These will FAIL until complete_task.sql is fixed! +-- ========================================================================= + +-- Test 1: Verify run status is failed in database (this should pass) +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'failed', + '[Database State] Run should be marked as failed due to type violation' +); + +-- Test 2: Verify producer step status is failed in database (this should pass) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'producer'), + 'failed', + '[Database State] Producer step should be marked as failed due to type violation' +); + +-- Test 3: Verify error message contains TYPE_VIOLATION (this should pass) +select ok( + (select error_message from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'producer') + LIKE '%TYPE_VIOLATION%', + '[Database State] Step error message should indicate TYPE_VIOLATION' +); + +-- Test 4: CRITICAL - Verify step:failed event was broadcast (WILL FAIL - BUG) +select is( + pgflow_tests.count_realtime_events('step:failed', (select run_id from run_ids), 'producer'), + 1::int, + '[MISSING EVENT] pgflow.complete_task should send step:failed event for type violation' +); + +-- Test 5: CRITICAL - Verify run:failed event was broadcast (WILL FAIL - BUG) +select is( + pgflow_tests.count_realtime_events('run:failed', (select run_id from run_ids)), + 1::int, + '[MISSING EVENT] pgflow.complete_task should send run:failed event for type violation' +); + +-- Test 6: Verify step:failed event contains correct status (WILL FAIL - no event) +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'producer')), + 'failed', + '[Event Payload] step:failed event should have status "failed"' +); + +-- Test 7: Verify step:failed event contains error message (WILL FAIL - no event) +select ok( + (select payload->>'error_message' from pgflow_tests.get_realtime_message('step:failed', (select run_id from run_ids), 'producer')) + LIKE '%TYPE_VIOLATION%', + '[Event Payload] step:failed event should include TYPE_VIOLATION error message' +); + +-- Test 8: Verify run:failed event contains correct status (WILL FAIL - no event) +select is( + (select payload->>'status' from pgflow_tests.get_realtime_message('run:failed', (select run_id from run_ids))), + 'failed', + '[Event Payload] run:failed event should have status "failed"' +); + +-- Clean up +drop table if exists run_ids; +drop table if exists producer_tasks; +drop table if exists completed_tasks; + +select finish(); +rollback;