diff --git a/.changeset/auto-compile-on-startup.md b/.changeset/auto-compile-on-startup.md new file mode 100644 index 000000000..942361f7d --- /dev/null +++ b/.changeset/auto-compile-on-startup.md @@ -0,0 +1,7 @@ +--- +'@pgflow/core': patch +'@pgflow/dsl': patch +'@pgflow/edge-worker': patch +--- + +Add automatic flow compilation at worker startup. Workers now call ensure_flow_compiled to verify flows are up-to-date. In development, mismatched flows are recompiled automatically. In production, mismatches cause errors. Use ensureCompiledOnStartup: false to opt-out. diff --git a/pkgs/edge-worker/src/core/workerConfigTypes.ts b/pkgs/edge-worker/src/core/workerConfigTypes.ts index aba0c884f..08d123b35 100644 --- a/pkgs/edge-worker/src/core/workerConfigTypes.ts +++ b/pkgs/edge-worker/src/core/workerConfigTypes.ts @@ -144,6 +144,14 @@ export type ResolvedQueueWorkerConfig = Required> & { +export type ResolvedFlowWorkerConfig = Required> & { connectionString: string | undefined; env: Record; }; \ No newline at end of file diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 3b3d09c52..5fe42d975 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -9,6 +9,7 @@ import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; isLocalEnvironment?: boolean; + ensureCompiledOnStartup?: boolean; } /** @@ -25,6 +26,7 @@ export class FlowWorkerLifecycle implements ILifecycle { private heartbeatInterval: number; private lastHeartbeat = 0; private isLocalEnvironment: boolean; + private ensureCompiledOnStartup: boolean; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { this.queries = queries; @@ -33,6 +35,7 @@ export class FlowWorkerLifecycle implements ILifecycle { this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 5000; this.isLocalEnvironment = config?.isLocalEnvironment ?? false; + this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true; } async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise { @@ -42,7 +45,11 @@ export class FlowWorkerLifecycle implements ILifecycle { this._workerId = workerBootstrap.workerId; // Compile/verify flow as part of Starting (before registering worker) - await this.ensureFlowCompiled(); + if (this.ensureCompiledOnStartup) { + await this.ensureFlowCompiled(); + } else { + this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`); + } // Only register worker after successful compilation this.workerRow = await this.queries.onWorkerStarted({ diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 7b6a5b7ac..396a7d174 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -90,7 +90,10 @@ export function createFlowWorker { + await sql`select pgflow_tests.reset_db();`; + + // Verify flow does NOT exist + const [flowBefore] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + + // Create worker with ensureCompiledOnStartup: false + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: false, // SKIP compilation + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 'test_compilation', + workerId: crypto.randomUUID(), + }); + await delay(100); + + // Flow should NOT have been created (compilation was skipped) + const [flowAfter] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped'); + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'compiles flow when ensureCompiledOnStartup is explicitly true', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + // Verify flow does NOT exist + const [flowBefore] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + + // Create worker with ensureCompiledOnStartup: true (explicit) + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: true, // EXPLICIT true + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 'test_compilation', + workerId: crypto.randomUUID(), + }); + await delay(100); + + // Flow SHOULD have been created + const [flowAfter] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when ensureCompiledOnStartup is true'); + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'worker still registers and polls when ensureCompiledOnStartup is false', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + // Pre-compile the flow manually (simulating pre-compiled via CLI) + await sql`SELECT pgflow.create_flow('test_compilation_flow')`; + await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`; + + const workerId = crypto.randomUUID(); + + // Create worker with ensureCompiledOnStartup: false + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: false, // Skip compilation check + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 'test_compilation', + workerId, + }); + await delay(100); + + // Worker should have registered (check workers table for this specific worker) + const workers = await sql` + SELECT * FROM pgflow.workers WHERE worker_id = ${workerId} + `; + assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation'); + assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue'); + } finally { + await worker.stop(); + } + }) +); diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts new file mode 100644 index 000000000..f00a506a3 --- /dev/null +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts @@ -0,0 +1,169 @@ +import { assertEquals } from '@std/assert'; +import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts'; +import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts'; +import type { WorkerRow } from '../../src/core/types.ts'; +import { Flow, type FlowShape } from '@pgflow/dsl'; +import type { Logger } from '../../src/platform/types.ts'; +import type { postgres } from '../sql.ts'; + +// Mock Queries +class MockQueries extends Queries { + public ensureFlowCompiledCallCount = 0; + + constructor() { + // Pass null as sql since we'll override all methods + super(null as unknown as postgres.Sql); + } + + override onWorkerStarted(params: { workerId: string; edgeFunctionName: string; queueName: string }): Promise { + return Promise.resolve({ + worker_id: params.workerId, + queue_name: params.queueName, + function_name: params.edgeFunctionName, + started_at: new Date().toISOString(), + deprecated_at: null, + last_heartbeat_at: new Date().toISOString(), + }); + } + + override sendHeartbeat(_workerRow: WorkerRow): Promise<{ is_deprecated: boolean }> { + return Promise.resolve({ is_deprecated: false }); + } + + override ensureFlowCompiled( + _flowSlug: string, + _shape: FlowShape, + _mode: 'development' | 'production' + ): Promise { + this.ensureFlowCompiledCallCount++; + return Promise.resolve({ status: 'verified', differences: [] }); + } +} + +// Real Flow for testing - using the DSL to create a valid flow +const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' }) + .step({ slug: 'step1' }, (input) => input.run.value); + +const createMockFlow = () => TestFlow; + +const createLogger = (): Logger => ({ + debug: () => {}, + info: () => {}, + error: () => {}, + warn: () => {}, +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled when ensureCompiledOnStartup is true', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, { + ensureCompiledOnStartup: true + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called once'); +}); + +Deno.test('FlowWorkerLifecycle - skips ensureFlowCompiled when ensureCompiledOnStartup is false', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, { + ensureCompiledOnStartup: false + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 0, 'ensureFlowCompiled should NOT be called'); +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (no config)', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called by default'); +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (empty config)', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {}); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called with empty config'); +}); + +Deno.test('FlowWorkerLifecycle - logs skip message when ensureCompiledOnStartup is false', async () => { + const logs: string[] = []; + const testLogger: Logger = { + debug: () => {}, + info: (msg: string) => logs.push(msg), + error: () => {}, + warn: () => {}, + }; + + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, { + ensureCompiledOnStartup: false + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + const skipLog = logs.find(log => log.includes('Skipping compilation')); + assertEquals(skipLog !== undefined, true, 'Should log skip message'); + assertEquals(skipLog?.includes('ensureCompiledOnStartup=false'), true, 'Skip message should mention the config flag'); +}); + +Deno.test('FlowWorkerLifecycle - does not log skip message when ensureCompiledOnStartup is true', async () => { + const logs: string[] = []; + const testLogger: Logger = { + debug: () => {}, + info: (msg: string) => logs.push(msg), + error: () => {}, + warn: () => {}, + }; + + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, { + ensureCompiledOnStartup: true + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + const skipLog = logs.find(log => log.includes('Skipping compilation')); + assertEquals(skipLog, undefined, 'Should NOT log skip message when compilation is enabled'); +});