Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/auto-compile-on-startup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@pgflow/core': patch
'@pgflow/dsl': patch
'@pgflow/edge-worker': patch
---

Add automatic flow compilation at worker startup. Workers now call ensure_flow_compiled to verify flows are up-to-date. In development, mismatched flows are recompiled automatically. In production, mismatches cause errors. Use ensureCompiledOnStartup: false to opt-out.
10 changes: 9 additions & 1 deletion pkgs/edge-worker/src/core/workerConfigTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ export type ResolvedQueueWorkerConfig = Required<Omit<QueueWorkerConfig, 'retryD
* Configuration for the flow worker with two-phase polling
*/
export type FlowWorkerConfig = {
/**
* Whether to verify/compile flow at worker startup.
* When true (default), worker calls pgflow.ensure_flow_compiled() before polling.
* Set to false to skip compilation check (useful if flows are pre-compiled via CLI).
* @default true
*/
ensureCompiledOnStartup?: boolean;

/**
* How many tasks are processed at the same time
* @default 10
Expand Down Expand Up @@ -201,7 +209,7 @@ export type FlowWorkerConfig = {
/**
* Resolved flow configuration with all defaults applied
*/
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env'>> & {
export type ResolvedFlowWorkerConfig = Required<Omit<FlowWorkerConfig, 'connectionString' | 'env' | 'ensureCompiledOnStartup'>> & {
connectionString: string | undefined;
env: Record<string, string | undefined>;
};
9 changes: 8 additions & 1 deletion pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { FlowShapeMismatchError } from './errors.js';
export interface FlowLifecycleConfig {
heartbeatInterval?: number;
isLocalEnvironment?: boolean;
ensureCompiledOnStartup?: boolean;
}

/**
Expand All @@ -25,6 +26,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
private heartbeatInterval: number;
private lastHeartbeat = 0;
private isLocalEnvironment: boolean;
private ensureCompiledOnStartup: boolean;

constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
this.queries = queries;
Expand All @@ -33,6 +35,7 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
this.workerState = new WorkerState(logger);
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
}

async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise<void> {
Expand All @@ -42,7 +45,11 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
this._workerId = workerBootstrap.workerId;

// Compile/verify flow as part of Starting (before registering worker)
await this.ensureFlowCompiled();
if (this.ensureCompiledOnStartup) {
await this.ensureFlowCompiled();
} else {
this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`);
}

// Only register worker after successful compilation
this.workerRow = await this.queries.onWorkerStarted({
Expand Down
5 changes: 4 additions & 1 deletion pkgs/edge-worker/src/flow/createFlowWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
queries,
flow,
createLogger('FlowWorkerLifecycle'),
{ isLocalEnvironment: platformAdapter.isLocalEnvironment }
{
isLocalEnvironment: platformAdapter.isLocalEnvironment,
ensureCompiledOnStartup: config.ensureCompiledOnStartup ?? true
}
);

// Create frozen worker config ONCE for reuse across all task executions
Expand Down
135 changes: 135 additions & 0 deletions pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,138 @@ Deno.test(
}
})
);

// Tests for ensureCompiledOnStartup config option

Deno.test(
'skips compilation when ensureCompiledOnStartup is false',
withPgNoTransaction(async (sql) => {
await sql`select pgflow_tests.reset_db();`;

// Verify flow does NOT exist
const [flowBefore] = await sql`
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
`;
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');

// Create worker with ensureCompiledOnStartup: false
const worker = createFlowWorker(
TestCompilationFlow,
{
sql,
ensureCompiledOnStartup: false, // SKIP compilation
maxConcurrent: 1,
batchSize: 10,
maxPollSeconds: 1,
pollIntervalMs: 200,
},
createLogger,
createPlatformAdapterWithLocalEnv(sql, false)
);

try {
worker.startOnlyOnce({
edgeFunctionName: 'test_compilation',
workerId: crypto.randomUUID(),
});
await delay(100);

// Flow should NOT have been created (compilation was skipped)
const [flowAfter] = await sql`
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
`;
assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped');
} finally {
await worker.stop();
}
})
);

Deno.test(
'compiles flow when ensureCompiledOnStartup is explicitly true',
withPgNoTransaction(async (sql) => {
await sql`select pgflow_tests.reset_db();`;

// Verify flow does NOT exist
const [flowBefore] = await sql`
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
`;
assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup');

// Create worker with ensureCompiledOnStartup: true (explicit)
const worker = createFlowWorker(
TestCompilationFlow,
{
sql,
ensureCompiledOnStartup: true, // EXPLICIT true
maxConcurrent: 1,
batchSize: 10,
maxPollSeconds: 1,
pollIntervalMs: 200,
},
createLogger,
createPlatformAdapterWithLocalEnv(sql, false)
);

try {
worker.startOnlyOnce({
edgeFunctionName: 'test_compilation',
workerId: crypto.randomUUID(),
});
await delay(100);

// Flow SHOULD have been created
const [flowAfter] = await sql`
SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow'
`;
assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when ensureCompiledOnStartup is true');
} finally {
await worker.stop();
}
})
);

Deno.test(
'worker still registers and polls when ensureCompiledOnStartup is false',
withPgNoTransaction(async (sql) => {
await sql`select pgflow_tests.reset_db();`;

// Pre-compile the flow manually (simulating pre-compiled via CLI)
await sql`SELECT pgflow.create_flow('test_compilation_flow')`;
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;

const workerId = crypto.randomUUID();

// Create worker with ensureCompiledOnStartup: false
const worker = createFlowWorker(
TestCompilationFlow,
{
sql,
ensureCompiledOnStartup: false, // Skip compilation check
maxConcurrent: 1,
batchSize: 10,
maxPollSeconds: 1,
pollIntervalMs: 200,
},
createLogger,
createPlatformAdapterWithLocalEnv(sql, false)
);

try {
worker.startOnlyOnce({
edgeFunctionName: 'test_compilation',
workerId,
});
await delay(100);

// Worker should have registered (check workers table for this specific worker)
const workers = await sql`
SELECT * FROM pgflow.workers WHERE worker_id = ${workerId}
`;
assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation');
assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue');
} finally {
await worker.stop();
}
})
);
169 changes: 169 additions & 0 deletions pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { assertEquals } from '@std/assert';
import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts';
import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts';
import type { WorkerRow } from '../../src/core/types.ts';
import { Flow, type FlowShape } from '@pgflow/dsl';
import type { Logger } from '../../src/platform/types.ts';
import type { postgres } from '../sql.ts';

// Mock Queries
class MockQueries extends Queries {
public ensureFlowCompiledCallCount = 0;

constructor() {
// Pass null as sql since we'll override all methods
super(null as unknown as postgres.Sql);
}

override onWorkerStarted(params: { workerId: string; edgeFunctionName: string; queueName: string }): Promise<WorkerRow> {
return Promise.resolve({
worker_id: params.workerId,
queue_name: params.queueName,
function_name: params.edgeFunctionName,
started_at: new Date().toISOString(),
deprecated_at: null,
last_heartbeat_at: new Date().toISOString(),
});
}

override sendHeartbeat(_workerRow: WorkerRow): Promise<{ is_deprecated: boolean }> {
return Promise.resolve({ is_deprecated: false });
}

override ensureFlowCompiled(
_flowSlug: string,
_shape: FlowShape,
_mode: 'development' | 'production'
): Promise<EnsureFlowCompiledResult> {
this.ensureFlowCompiledCallCount++;
return Promise.resolve({ status: 'verified', differences: [] });
}
}

// Real Flow for testing - using the DSL to create a valid flow
const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
.step({ slug: 'step1' }, (input) => input.run.value);

const createMockFlow = () => TestFlow;

const createLogger = (): Logger => ({
debug: () => {},
info: () => {},
error: () => {},
warn: () => {},
});

Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled when ensureCompiledOnStartup is true', async () => {
const mockQueries = new MockQueries();
const mockFlow = createMockFlow();
const logger = createLogger();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
ensureCompiledOnStartup: true
});

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called once');
});

Deno.test('FlowWorkerLifecycle - skips ensureFlowCompiled when ensureCompiledOnStartup is false', async () => {
const mockQueries = new MockQueries();
const mockFlow = createMockFlow();
const logger = createLogger();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {
ensureCompiledOnStartup: false
});

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

assertEquals(mockQueries.ensureFlowCompiledCallCount, 0, 'ensureFlowCompiled should NOT be called');
});

Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (no config)', async () => {
const mockQueries = new MockQueries();
const mockFlow = createMockFlow();
const logger = createLogger();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger);

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called by default');
});

Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (empty config)', async () => {
const mockQueries = new MockQueries();
const mockFlow = createMockFlow();
const logger = createLogger();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {});

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called with empty config');
});

Deno.test('FlowWorkerLifecycle - logs skip message when ensureCompiledOnStartup is false', async () => {
const logs: string[] = [];
const testLogger: Logger = {
debug: () => {},
info: (msg: string) => logs.push(msg),
error: () => {},
warn: () => {},
};

const mockQueries = new MockQueries();
const mockFlow = createMockFlow();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
ensureCompiledOnStartup: false
});

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

const skipLog = logs.find(log => log.includes('Skipping compilation'));
assertEquals(skipLog !== undefined, true, 'Should log skip message');
assertEquals(skipLog?.includes('ensureCompiledOnStartup=false'), true, 'Skip message should mention the config flag');
});

Deno.test('FlowWorkerLifecycle - does not log skip message when ensureCompiledOnStartup is true', async () => {
const logs: string[] = [];
const testLogger: Logger = {
debug: () => {},
info: (msg: string) => logs.push(msg),
error: () => {},
warn: () => {},
};

const mockQueries = new MockQueries();
const mockFlow = createMockFlow();

const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, {
ensureCompiledOnStartup: true
});

await lifecycle.acknowledgeStart({
workerId: 'test-worker-id',
edgeFunctionName: 'test-function',
});

const skipLog = logs.find(log => log.includes('Skipping compilation'));
assertEquals(skipLog, undefined, 'Should NOT log skip message when compilation is enabled');
});