From 0b84bb0503d49ff1d7a8ca54e1beb3fd8776bc58 Mon Sep 17 00:00:00 2001 From: jumski <9126+jumski@users.noreply.github.com> Date: Thu, 4 Dec 2025 11:09:29 +0000 Subject: [PATCH] add ensureCompiledOnStartup config flag to opt-out of auto-compilation (#499) ### TL;DR Add automatic flow compilation at worker startup with the ability to opt out. ### What changed? - Added a new `ensureCompiledOnStartup` configuration option to `FlowWorkerConfig` (defaults to `true`) - When enabled (default), workers call `pgflow.ensure_flow_compiled()` at startup to verify flows are up to date - In development mode, mismatched flows are automatically recompiled - In production mode, mismatches cause errors - Added comprehensive tests for the new configuration option ### How to test? 1. Create a flow worker with default settings to see automatic compilation: ```typescript const worker = createFlowWorker(MyFlow, { /* config */ }); ``` 2. Opt out of automatic compilation: ```typescript const worker = createFlowWorker(MyFlow, { ensureCompiledOnStartup: false, // other config }); ``` 3. Run tests to verify behavior: ``` deno test pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts deno test pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts ``` ### Why make this change? This change improves the developer experience by ensuring flows are properly compiled at worker startup. It helps catch flow definition mismatches early, automatically recompiling in development environments while failing fast in production. The opt-out option provides flexibility for environments where flows are pre-compiled via CLI or other means. 
--- .changeset/auto-compile-on-startup.md | 7 + .../edge-worker/src/core/workerConfigTypes.ts | 10 +- .../src/flow/FlowWorkerLifecycle.ts | 9 +- pkgs/edge-worker/src/flow/createFlowWorker.ts | 5 +- .../flow/compilationAtStartup.test.ts | 135 ++++++++++++++ .../FlowWorkerLifecycle.compilation.test.ts | 169 ++++++++++++++++++ 6 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 .changeset/auto-compile-on-startup.md create mode 100644 pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts diff --git a/.changeset/auto-compile-on-startup.md b/.changeset/auto-compile-on-startup.md new file mode 100644 index 000000000..942361f7d --- /dev/null +++ b/.changeset/auto-compile-on-startup.md @@ -0,0 +1,7 @@ +--- +'@pgflow/core': patch +'@pgflow/dsl': patch +'@pgflow/edge-worker': patch +--- + +Add automatic flow compilation at worker startup. Workers now call ensure_flow_compiled to verify flows are up-to-date. In development, mismatched flows are recompiled automatically. In production, mismatches cause errors. Use ensureCompiledOnStartup: false to opt-out. 
diff --git a/pkgs/edge-worker/src/core/workerConfigTypes.ts b/pkgs/edge-worker/src/core/workerConfigTypes.ts index aba0c884f..08d123b35 100644 --- a/pkgs/edge-worker/src/core/workerConfigTypes.ts +++ b/pkgs/edge-worker/src/core/workerConfigTypes.ts @@ -144,6 +144,14 @@ export type ResolvedQueueWorkerConfig = Required> & { +export type ResolvedFlowWorkerConfig = Required> & { connectionString: string | undefined; env: Record; }; \ No newline at end of file diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 3b3d09c52..5fe42d975 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -9,6 +9,7 @@ import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; isLocalEnvironment?: boolean; + ensureCompiledOnStartup?: boolean; } /** @@ -25,6 +26,7 @@ export class FlowWorkerLifecycle implements ILifecycle { private heartbeatInterval: number; private lastHeartbeat = 0; private isLocalEnvironment: boolean; + private ensureCompiledOnStartup: boolean; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { this.queries = queries; @@ -33,6 +35,7 @@ export class FlowWorkerLifecycle implements ILifecycle { this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 5000; this.isLocalEnvironment = config?.isLocalEnvironment ?? false; + this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? 
true; } async acknowledgeStart(workerBootstrap: WorkerBootstrap): Promise { @@ -42,7 +45,11 @@ export class FlowWorkerLifecycle implements ILifecycle { this._workerId = workerBootstrap.workerId; // Compile/verify flow as part of Starting (before registering worker) - await this.ensureFlowCompiled(); + if (this.ensureCompiledOnStartup) { + await this.ensureFlowCompiled(); + } else { + this.logger.info(`Skipping compilation check for flow '${this.flow.slug}' (ensureCompiledOnStartup=false)`); + } // Only register worker after successful compilation this.workerRow = await this.queries.onWorkerStarted({ diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 7b6a5b7ac..396a7d174 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -90,7 +90,10 @@ export function createFlowWorker { + await sql`select pgflow_tests.reset_db();`; + + // Verify flow does NOT exist + const [flowBefore] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + + // Create worker with ensureCompiledOnStartup: false + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: false, // SKIP compilation + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 'test_compilation', + workerId: crypto.randomUUID(), + }); + await delay(100); + + // Flow should NOT have been created (compilation was skipped) + const [flowAfter] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped'); + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'compiles 
flow when ensureCompiledOnStartup is explicitly true', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + // Verify flow does NOT exist + const [flowBefore] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + + // Create worker with ensureCompiledOnStartup: true (explicit) + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: true, // EXPLICIT true + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 'test_compilation', + workerId: crypto.randomUUID(), + }); + await delay(100); + + // Flow SHOULD have been created + const [flowAfter] = await sql` + SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' + `; + assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when ensureCompiledOnStartup is true'); + } finally { + await worker.stop(); + } + }) +); + +Deno.test( + 'worker still registers and polls when ensureCompiledOnStartup is false', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + // Pre-compile the flow manually (simulating pre-compiled via CLI) + await sql`SELECT pgflow.create_flow('test_compilation_flow')`; + await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`; + + const workerId = crypto.randomUUID(); + + // Create worker with ensureCompiledOnStartup: false + const worker = createFlowWorker( + TestCompilationFlow, + { + sql, + ensureCompiledOnStartup: false, // Skip compilation check + maxConcurrent: 1, + batchSize: 10, + maxPollSeconds: 1, + pollIntervalMs: 200, + }, + createLogger, + createPlatformAdapterWithLocalEnv(sql, false) + ); + + try { + worker.startOnlyOnce({ + edgeFunctionName: 
'test_compilation', + workerId, + }); + await delay(100); + + // Worker should have registered (check workers table for this specific worker) + const workers = await sql` + SELECT * FROM pgflow.workers WHERE worker_id = ${workerId} + `; + assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation'); + assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue'); + } finally { + await worker.stop(); + } + }) +); diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts new file mode 100644 index 000000000..f00a506a3 --- /dev/null +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts @@ -0,0 +1,169 @@ +import { assertEquals } from '@std/assert'; +import { FlowWorkerLifecycle } from '../../src/flow/FlowWorkerLifecycle.ts'; +import { Queries, type EnsureFlowCompiledResult } from '../../src/core/Queries.ts'; +import type { WorkerRow } from '../../src/core/types.ts'; +import { Flow, type FlowShape } from '@pgflow/dsl'; +import type { Logger } from '../../src/platform/types.ts'; +import type { postgres } from '../sql.ts'; + +// Mock Queries +class MockQueries extends Queries { + public ensureFlowCompiledCallCount = 0; + + constructor() { + // Pass null as sql since we'll override all methods + super(null as unknown as postgres.Sql); + } + + override onWorkerStarted(params: { workerId: string; edgeFunctionName: string; queueName: string }): Promise { + return Promise.resolve({ + worker_id: params.workerId, + queue_name: params.queueName, + function_name: params.edgeFunctionName, + started_at: new Date().toISOString(), + deprecated_at: null, + last_heartbeat_at: new Date().toISOString(), + }); + } + + override sendHeartbeat(_workerRow: WorkerRow): Promise<{ is_deprecated: boolean }> { + return Promise.resolve({ is_deprecated: false }); + } + + override ensureFlowCompiled( + 
_flowSlug: string, + _shape: FlowShape, + _mode: 'development' | 'production' + ): Promise { + this.ensureFlowCompiledCallCount++; + return Promise.resolve({ status: 'verified', differences: [] }); + } +} + +// Real Flow for testing - using the DSL to create a valid flow +const TestFlow = new Flow<{ value: number }>({ slug: 'test_flow' }) + .step({ slug: 'step1' }, (input) => input.run.value); + +const createMockFlow = () => TestFlow; + +const createLogger = (): Logger => ({ + debug: () => {}, + info: () => {}, + error: () => {}, + warn: () => {}, +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled when ensureCompiledOnStartup is true', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, { + ensureCompiledOnStartup: true + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called once'); +}); + +Deno.test('FlowWorkerLifecycle - skips ensureFlowCompiled when ensureCompiledOnStartup is false', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, { + ensureCompiledOnStartup: false + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 0, 'ensureFlowCompiled should NOT be called'); +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (no config)', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger); + + await 
lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called by default'); +}); + +Deno.test('FlowWorkerLifecycle - calls ensureFlowCompiled by default (empty config)', async () => { + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + const logger = createLogger(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, logger, {}); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + assertEquals(mockQueries.ensureFlowCompiledCallCount, 1, 'ensureFlowCompiled should be called with empty config'); +}); + +Deno.test('FlowWorkerLifecycle - logs skip message when ensureCompiledOnStartup is false', async () => { + const logs: string[] = []; + const testLogger: Logger = { + debug: () => {}, + info: (msg: string) => logs.push(msg), + error: () => {}, + warn: () => {}, + }; + + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + + const lifecycle = new FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, { + ensureCompiledOnStartup: false + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + const skipLog = logs.find(log => log.includes('Skipping compilation')); + assertEquals(skipLog !== undefined, true, 'Should log skip message'); + assertEquals(skipLog?.includes('ensureCompiledOnStartup=false'), true, 'Skip message should mention the config flag'); +}); + +Deno.test('FlowWorkerLifecycle - does not log skip message when ensureCompiledOnStartup is true', async () => { + const logs: string[] = []; + const testLogger: Logger = { + debug: () => {}, + info: (msg: string) => logs.push(msg), + error: () => {}, + warn: () => {}, + }; + + const mockQueries = new MockQueries(); + const mockFlow = createMockFlow(); + + const lifecycle = new 
FlowWorkerLifecycle(mockQueries, mockFlow, testLogger, { + ensureCompiledOnStartup: true + }); + + await lifecycle.acknowledgeStart({ + workerId: 'test-worker-id', + edgeFunctionName: 'test-function', + }); + + const skipLog = logs.find(log => log.includes('Skipping compilation')); + assertEquals(skipLog, undefined, 'Should NOT log skip message when compilation is enabled'); +});