Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkgs/client/__tests__/e2e/concurrent-operations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FlowRunStatus, FlowStepStatus } from '../../src/lib/types.js';
import { PgflowSqlClient } from '@pgflow/core';
import { readAndStart } from '../helpers/polling.js';
import { cleanupFlow } from '../helpers/cleanup.js';
import { log } from '../helpers/debug.js';

describe('Concurrent Operations Tests', () => {
it(
Expand Down Expand Up @@ -36,15 +37,15 @@ describe('Concurrent Operations Tests', () => {
});

// Start flows sequentially to avoid overwhelming the system
console.log('=== Starting flows ===');
log('=== Starting flows ===');
const run1 = await pgflowClient.startFlow(flow1.slug, { data: 'flow1-data' });
const run2 = await pgflowClient.startFlow(flow2.slug, { data: 'flow2-data' });

expect(run1.flow_slug).toBe(flow1.slug);
expect(run2.flow_slug).toBe(flow2.slug);

// Get and complete tasks from both flows
console.log('=== Completing steps ===');
log('=== Completing steps ===');

const tasks1 = await readAndStart(sql, sqlClient, flow1.slug, 1, 5);
const tasks2 = await readAndStart(sql, sqlClient, flow2.slug, 1, 5);
Expand All @@ -69,16 +70,16 @@ describe('Concurrent Operations Tests', () => {
// Debug: Check database state directly
const dbState1 = await sql`SELECT status, output FROM pgflow.step_tasks WHERE run_id = ${run1.run_id}::uuid`;
const dbState2 = await sql`SELECT status, output FROM pgflow.step_tasks WHERE run_id = ${run2.run_id}::uuid`;
console.log('DB State 1:', dbState1);
console.log('DB State 2:', dbState2);
log('DB State 1:', dbState1);
log('DB State 2:', dbState2);

// Verify database shows completion (since realtime events may be delayed)
expect(dbState1[0].status).toBe('completed');
expect(dbState2[0].status).toBe('completed');
expect(dbState1[0].output).toEqual({ result: 'flow1-completed' });
expect(dbState2[0].output).toEqual({ result: 'flow2-completed' });

console.log('=== Concurrent flows completed successfully ===');
log('=== Concurrent flows completed successfully ===');
await supabaseClient.removeAllChannels();
}),
40000
Expand Down Expand Up @@ -164,9 +165,9 @@ describe('Concurrent Operations Tests', () => {
expect(observerRun2!.step('shared_step').output).toEqual(stepOutput);

// Log event counts for debugging (realtime delivery can be unreliable in tests)
console.log('Client 1 events:', client1Events.length);
console.log('Client 2 events:', client2Events.length);
console.log('Client 3 events:', client3Events.length);
log('Client 1 events:', client1Events.length);
log('Client 2 events:', client2Events.length);
log('Client 3 events:', client3Events.length);

// Don't assert on event count - final state verification is more important
// The test already verified all clients have correct final state above
Expand Down Expand Up @@ -251,7 +252,7 @@ describe('Concurrent Operations Tests', () => {
expect(dbState[0].output).toEqual(expectedOutput);
}

console.log('=== All concurrent instances completed without conflicts ===');
log('=== All concurrent instances completed without conflicts ===');
await supabaseClient.removeAllChannels();
}),
40000
Expand Down Expand Up @@ -332,7 +333,7 @@ describe('Concurrent Operations Tests', () => {
expect(dbStateA[0].output.result).toContain('flow-a');
expect(dbStateB[0].output.result).toContain('flow-b');

console.log('=== Flow isolation maintained successfully ===');
log('=== Flow isolation maintained successfully ===');
await supabaseClient.removeAllChannels();
}),
40000
Expand Down
11 changes: 6 additions & 5 deletions pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Flow } from '@pgflow/dsl';
import { compileFlow } from '@pgflow/dsl';
import { readAndStart } from '../helpers/polling.js';
import { cleanupFlow } from '../helpers/cleanup.js';
import { log } from '../helpers/debug.js';

describe('Full Stack DSL Integration', () => {
it(
Expand Down Expand Up @@ -41,7 +42,7 @@ describe('Full Stack DSL Integration', () => {

// 2. Compile to SQL
const flowSql = compileFlow(SimpleFlow);
console.log('Generated SQL statements:', flowSql);
log('Generated SQL statements:', flowSql);

// 3. Execute SQL to create flow definition
for (const statement of flowSql) {
Expand Down Expand Up @@ -90,7 +91,7 @@ describe('Full Stack DSL Integration', () => {
expect(run.input).toEqual(input);

// 7. Execute the complete flow lifecycle
console.log('=== Step 1: Completing fetch step ===');
log('=== Step 1: Completing fetch step ===');
let tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('fetch');
Expand All @@ -106,7 +107,7 @@ describe('Full Stack DSL Integration', () => {
expect(fetchStep.status).toBe(FlowStepStatus.Completed);
expect(fetchStep.output).toEqual(fetchOutput);

console.log('=== Step 2: Completing process step ===');
log('=== Step 2: Completing process step ===');
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
Expand All @@ -127,7 +128,7 @@ describe('Full Stack DSL Integration', () => {
expect(processStep.status).toBe(FlowStepStatus.Completed);
expect(processStep.output).toEqual(processOutput);

console.log('=== Step 3: Completing save step ===');
log('=== Step 3: Completing save step ===');
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('save');
Expand Down Expand Up @@ -157,7 +158,7 @@ describe('Full Stack DSL Integration', () => {
expect(run.status).toBe(FlowRunStatus.Completed);
expect(run.remaining_steps).toBe(0);

console.log('=== Full Stack DSL Test Completed Successfully ===');
log('=== Full Stack DSL Test Completed Successfully ===');
await supabaseClient.removeAllChannels();
}),
30000
Expand Down
29 changes: 15 additions & 14 deletions pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FlowRunStatus, FlowStepStatus } from '../../src/lib/types.js';
import { PgflowSqlClient } from '@pgflow/core';
import { readAndStart } from '../helpers/polling.js';
import { cleanupFlow } from '../helpers/cleanup.js';
import { log } from '../helpers/debug.js';

describe('Happy Path E2E Integration', () => {
it(
Expand Down Expand Up @@ -40,7 +41,7 @@ describe('Happy Path E2E Integration', () => {

// Listen to all events for verification
run.on('*', (event) => {
console.log('Run event received:', event);
log('Run event received:', event);
receivedRunEvents.push(event);
});

Expand All @@ -49,17 +50,17 @@ describe('Happy Path E2E Integration', () => {
const saveStep = run.step('save');

fetchStep.on('*', (event) => {
console.log('Fetch step event:', event);
log('Fetch step event:', event);
receivedStepEvents.push({ step: 'fetch', event });
});

processStep.on('*', (event) => {
console.log('Process step event:', event);
log('Process step event:', event);
receivedStepEvents.push({ step: 'process', event });
});

saveStep.on('*', (event) => {
console.log('Save step event:', event);
log('Save step event:', event);
receivedStepEvents.push({ step: 'save', event });
});

Expand All @@ -68,7 +69,7 @@ describe('Happy Path E2E Integration', () => {
expect(run.input).toEqual(input);

// Step 1: Complete fetch step
console.log('=== Step 1: Completing fetch step ===');
log('=== Step 1: Completing fetch step ===');
let tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('fetch');
Expand All @@ -83,7 +84,7 @@ describe('Happy Path E2E Integration', () => {
expect(fetchStep.output).toEqual(fetchOutput);

// Step 2: Complete process step (should now be available)
console.log('=== Step 2: Completing process step ===');
log('=== Step 2: Completing process step ===');
tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
Expand All @@ -103,28 +104,28 @@ describe('Happy Path E2E Integration', () => {
expect(processStep.output).toEqual(processOutput);

// Step 3: Complete save step (should now be available)
console.log('=== Step 3: Completing save step ===');
log('=== Step 3: Completing save step ===');

// Debug: Check dependencies in database
const deps = await sql`
SELECT * FROM pgflow.deps WHERE flow_slug = ${testFlow.slug}
ORDER BY dep_slug, step_slug
`;
console.log('Dependencies in database:', deps);
log('Dependencies in database:', deps);

// Debug: Check step_tasks status
const stepTasks = await sql`
SELECT step_slug, status, output FROM pgflow.step_tasks
WHERE run_id = ${run.run_id}::uuid
ORDER BY step_slug
`;
console.log('Step tasks status:', stepTasks);
log('Step tasks status:', stepTasks);

tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('save');

console.log('Save task input:', JSON.stringify(tasks[0].input, null, 2));
log('Save task input:', JSON.stringify(tasks[0].input, null, 2));

expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].input.process).toEqual(processOutput);
Expand All @@ -150,9 +151,9 @@ describe('Happy Path E2E Integration', () => {
expect(run.output).toEqual({ save: saveOutput }); // Final step output becomes run output

// Verify we received realtime events
console.log('=== Event Verification ===');
console.log('Total run events received:', receivedRunEvents.length);
console.log('Total step events received:', receivedStepEvents.length);
log('=== Event Verification ===');
log('Total run events received:', receivedRunEvents.length);
log('Total step events received:', receivedStepEvents.length);

// Should have received at least the completion event
// Note: run:started event may fire before listeners are established
Expand Down Expand Up @@ -183,7 +184,7 @@ describe('Happy Path E2E Integration', () => {
const remainingTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 1);
expect(remainingTasks).toHaveLength(0);

console.log('=== Happy Path E2E Test Completed Successfully ===');
log('=== Happy Path E2E Test Completed Successfully ===');
await supabaseClient.removeAllChannels();
}),
30000 // Allow extra time for 3-step completion
Expand Down
15 changes: 8 additions & 7 deletions pkgs/client/__tests__/e2e/network-resilience.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FlowStepStatus } from '../../src/lib/types.js';
import { PgflowSqlClient } from '@pgflow/core';
import { readAndStart } from '../helpers/polling.js';
import { cleanupFlow } from '../helpers/cleanup.js';
import { log } from '../helpers/debug.js';

describe('Network Resilience Tests', () => {
it(
Expand Down Expand Up @@ -61,7 +62,7 @@ describe('Network Resilience Tests', () => {
expect(stepOne.status).toBe(FlowStepStatus.Completed);

// Simulate network disconnection by unsubscribing from all channels
console.log('=== Simulating network disconnection ===');
log('=== Simulating network disconnection ===');
await supabaseClient.removeAllChannels();

// Wait a bit to ensure disconnection
Expand All @@ -76,7 +77,7 @@ describe('Network Resilience Tests', () => {
await sqlClient.completeTask(tasks[0], stepTwoOutput);

// Reconnect by creating a new run instance (simulates app restart/reconnection)
console.log('=== Simulating reconnection ===');
log('=== Simulating reconnection ===');
const reconnectedRun = await pgflowClient.getRun(run.run_id);
expect(reconnectedRun).toBeTruthy();

Expand Down Expand Up @@ -114,8 +115,8 @@ describe('Network Resilience Tests', () => {
expect(dbState[1].status).toBe('completed');
expect(dbState[1].output).toEqual(stepTwoOutput);

console.log('Events before disconnect:', eventsBeforeDisconnect.length);
console.log('Events after reconnect:', eventsAfterReconnect.length);
log('Events before disconnect:', eventsBeforeDisconnect.length);
log('Events after reconnect:', eventsAfterReconnect.length);

await supabaseClient.removeAllChannels();
}),
Expand Down Expand Up @@ -148,7 +149,7 @@ describe('Network Resilience Tests', () => {
if (channel) {
channel.subscribe((status: string) => {
connectionEvents.push(status);
console.log('Channel status changed:', status);
log('Channel status changed:', status);
});
}

Expand All @@ -165,7 +166,7 @@ describe('Network Resilience Tests', () => {
expect(step.status).toBe(FlowStepStatus.Completed);
expect(step.output).toEqual(stepOutput);

console.log('Connection events recorded:', connectionEvents);
log('Connection events recorded:', connectionEvents);

await supabaseClient.removeAllChannels();
}),
Expand Down Expand Up @@ -195,7 +196,7 @@ describe('Network Resilience Tests', () => {

// Simulate poor network by introducing delays and multiple reconnections
for (let i = 0; i < 3; i++) {
console.log(`=== Connection cycle ${i + 1} ===`);
log(`=== Connection cycle ${i + 1} ===`);

// Short connection period
await new Promise((resolve) => setTimeout(resolve, 100));
Expand Down
11 changes: 11 additions & 0 deletions pkgs/client/__tests__/helpers/debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Debug logging utility for e2e tests.
* Enable verbose output by setting DEBUG=1 or VERBOSE=1 environment variable.
*/
export const DEBUG = process.env.DEBUG === '1' || process.env.VERBOSE === '1';

export const log = (...args: unknown[]) => {
if (DEBUG) {
console.log(...args);
}
};
Loading