From 632a61a82bda45a83cc95d8e17f44d3e3f016695 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 4 Dec 2025 14:31:58 +0100 Subject: [PATCH] remove p_mode param from ensure_flow_compiled, auto-detect via is_local() --- .../0100_function_ensure_flow_compiled.sql | 34 ++++++------ pkgs/core/src/database-types.ts | 2 +- ..._temp_ensure_flow_compiled_auto_detect.sql | 53 +++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../auto_recompiles_when_local.test.sql | 44 +++++++++++++++ ...=> fails_mismatch_when_not_local.test.sql} | 23 ++++---- .../recompiles_in_development_mode.test.sql | 42 --------------- .../scripts/concatenate-migrations.sh | 5 ++ pkgs/edge-worker/src/core/Queries.ts | 6 +-- .../src/flow/FlowWorkerLifecycle.ts | 9 +--- pkgs/edge-worker/src/flow/createFlowWorker.ts | 1 - .../flow/compilationAtStartup.test.ts | 7 +-- .../FlowWorkerLifecycle.compilation.test.ts | 3 +- .../FlowWorkerLifecycle.deprecation.test.ts | 3 +- 14 files changed, 145 insertions(+), 90 deletions(-) create mode 100644 pkgs/core/supabase/migrations/20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql create mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql rename pkgs/core/supabase/tests/ensure_flow_compiled/{fails_on_production_mismatch.test.sql => fails_mismatch_when_not_local.test.sql} (64%) delete mode 100644 pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql diff --git a/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql index 6aaf49a79..dbb0c7b9b 100644 --- a/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql +++ b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql @@ -1,10 +1,9 @@ -- Ensure a flow is compiled in the database --- Handles both development (auto-recompile) and production (fail on mismatch) modes +-- Auto-detects environment via is_local(): local -> auto-recompile, production -> fail on mismatch -- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] } create or replace function pgflow.ensure_flow_compiled( - p_flow_slug text, - p_shape jsonb, - p_mode text default 'production' -- 'development' | 'production' + flow_slug text, + shape jsonb ) returns jsonb language plpgsql @@ -16,43 +15,46 @@ DECLARE v_flow_exists boolean; v_db_shape jsonb; v_differences text[]; + v_is_local boolean; BEGIN -- Generate lock key from flow_slug (deterministic hash) - v_lock_key := hashtext(p_flow_slug); + v_lock_key := hashtext(ensure_flow_compiled.flow_slug); -- Acquire transaction-level advisory lock -- Serializes concurrent compilation attempts for same flow PERFORM pg_advisory_xact_lock(1, v_lock_key); -- 1. Check if flow exists - SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) + SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug) INTO v_flow_exists; - -- 2. If flow missing: compile (both modes) + -- 2. If flow missing: compile (both environments) IF NOT v_flow_exists THEN - PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape); RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb); END IF; -- 3. Get current shape from DB - v_db_shape := pgflow._get_flow_shape(p_flow_slug); + v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug); -- 4. Compare shapes - v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape); + v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape); -- 5. If shapes match: return verified IF array_length(v_differences, 1) IS NULL THEN RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb); END IF; - -- 6. Shapes differ - handle by mode - IF p_mode = 'development' THEN - -- Recompile in dev mode: full deletion + fresh compile - PERFORM pgflow.delete_flow_and_data(p_flow_slug); - PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + -- 6. Shapes differ - auto-detect environment via is_local() + v_is_local := pgflow.is_local(); + + IF v_is_local THEN + -- Recompile in local/dev: full deletion + fresh compile + PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug); + PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape); RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences)); ELSE - -- Fail in production mode + -- Fail in production RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences)); END IF; END; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 028d23bf5..41cbe281c 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -449,7 +449,7 @@ export type Database = { Returns: undefined } ensure_flow_compiled: { - Args: { p_flow_slug: string; p_mode?: string; p_shape: Json } + Args: { flow_slug: string; shape: Json } Returns: Json } fail_task: { diff --git a/pkgs/core/supabase/migrations/20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql b/pkgs/core/supabase/migrations/20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql new file mode 100644 index 000000000..42735d132 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql @@ -0,0 +1,53 @@ +-- Create "ensure_flow_compiled" function +CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("flow_slug" text, "shape" jsonb) RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_lock_key int; + v_flow_exists boolean; + v_db_shape jsonb; + v_differences text[]; + v_is_local boolean; +BEGIN + -- Generate lock key from flow_slug (deterministic hash) + v_lock_key := hashtext(ensure_flow_compiled.flow_slug); + + -- Acquire transaction-level advisory lock + -- Serializes concurrent compilation attempts for same flow + PERFORM pg_advisory_xact_lock(1, v_lock_key); + + -- 1. Check if flow exists + SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug) + INTO v_flow_exists; + + -- 2. If flow missing: compile (both environments) + IF NOT v_flow_exists THEN + PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape); + RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb); + END IF; + + -- 3. Get current shape from DB + v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug); + + -- 4. Compare shapes + v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape); + + -- 5. If shapes match: return verified + IF array_length(v_differences, 1) IS NULL THEN + RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb); + END IF; + + -- 6. Shapes differ - auto-detect environment via is_local() + v_is_local := pgflow.is_local(); + + IF v_is_local THEN + -- Recompile in local/dev: full deletion + fresh compile + PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug); + PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape); + RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences)); + ELSE + -- Fail in production + RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences)); + END IF; +END; +$$; +-- Drop "ensure_flow_compiled" function +DROP FUNCTION "pgflow"."ensure_flow_compiled" (text, jsonb, text); diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 2467005c7..5d2947840 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:+t6j1CJcFBfV6SRbAz/7/mVgZeS9vmzWWC+b1/Mp3GA= +h1:moHUa6OxOaPfYpNNrLRhkw13YzwraGNDxMnA9aarP30= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -13,3 +13,4 @@ h1:+t6j1CJcFBfV6SRbAz/7/mVgZeS9vmzWWC+b1/Mp3GA= 20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg= 20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM= 20251204115929_pgflow_temp_is_local.sql h1:arP+PC2OYI9ktAFx0+G9/w8zsaq/AbFWjLgK6YDujvQ= +20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:9FsEd0iyaIv9X1alACBQCzyebt2/+m1rgL4ozXJWBcA= diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql new file mode 100644 index 000000000..e354d4741 --- /dev/null +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql @@ -0,0 +1,44 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Simulate local environment +select set_config('app.settings.jwt_secret', 'super-secret-jwt-token-with-at-least-32-characters-long', true); + +-- Setup: Create flow with different shape +select pgflow.create_flow('local_flow'); +select pgflow.add_step('local_flow', 'old_step'); + +-- Test: Different shape should auto-recompile when is_local()=true (no p_mode param) +select is( + ( + select result->>'status' + from pgflow.ensure_flow_compiled( + 'local_flow', + '{ + "steps": [ + {"slug": "new_step", "stepType": "single", "dependencies": []} + ] + }'::jsonb + ) as result + ), + 'recompiled', + 'Should auto-recompile when is_local()=true' +); + +-- Verify old step is gone +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'old_step'), + 0, + 'Old step should be deleted' +); + +-- Verify new step exists +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'new_step'), + 1, + 'New step should be created' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/fails_mismatch_when_not_local.test.sql similarity index 64% rename from pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql rename to pkgs/core/supabase/tests/ensure_flow_compiled/fails_mismatch_when_not_local.test.sql index cc461010e..90953a596 100644 --- a/pkgs/core/supabase/tests/ensure_flow_compiled/fails_on_production_mismatch.test.sql +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/fails_mismatch_when_not_local.test.sql @@ -2,26 +2,28 @@ begin; select plan(3); select pgflow_tests.reset_db(); +-- Setup: Simulate production environment (not local) +select set_config('app.settings.jwt_secret', 'production-jwt-secret-that-differs-from-local', true); + -- Setup: Create flow with different shape -select pgflow.create_flow('prod_flow'); -select pgflow.add_step('prod_flow', 'old_step'); +select pgflow.create_flow('prod_flow_auto'); +select pgflow.add_step('prod_flow_auto', 'old_step'); --- Test: Different shape in production mode should return mismatch +-- Test: Different shape should return mismatch when is_local()=false (no p_mode param) select is( ( select result->>'status' from pgflow.ensure_flow_compiled( - 'prod_flow', + 'prod_flow_auto', '{ "steps": [ {"slug": "new_step", "stepType": "single", "dependencies": []} ] - }'::jsonb, - 'production' + }'::jsonb ) as result ), 'mismatch', - 'Should return mismatch status in production mode' + 'Should return mismatch status when is_local()=false' ); -- Verify differences are returned @@ -29,13 +31,12 @@ select ok( ( select jsonb_array_length(result->'differences') > 0 from pgflow.ensure_flow_compiled( - 'prod_flow', + 'prod_flow_auto', '{ "steps": [ {"slug": "new_step", "stepType": "single", "dependencies": []} ] - }'::jsonb, - 'production' + }'::jsonb ) as result ), 'Should return differences for production mismatch' @@ -43,7 +44,7 @@ select ok( -- Verify database was NOT modified select is( - (select step_slug from pgflow.steps where flow_slug = 'prod_flow'), + (select step_slug from pgflow.steps where flow_slug = 'prod_flow_auto'), 'old_step', 'Database should not be modified on production mismatch' ); diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql deleted file mode 100644 index edda9e999..000000000 --- a/pkgs/core/supabase/tests/ensure_flow_compiled/recompiles_in_development_mode.test.sql +++ /dev/null @@ -1,42 +0,0 @@ -begin; -select plan(3); -select pgflow_tests.reset_db(); - --- Setup: Create flow with different shape -select pgflow.create_flow('dev_flow'); -select pgflow.add_step('dev_flow', 'old_step'); - --- Test: Different shape in development mode should recompile -select is( - ( - select result->>'status' - from pgflow.ensure_flow_compiled( - 'dev_flow', - '{ - "steps": [ - {"slug": "new_step", "stepType": "single", "dependencies": []} - ] - }'::jsonb, - 'development' - ) as result - ), - 'recompiled', - 'Should return recompiled status in development mode' -); - --- Verify old step is gone -select is( - (select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'old_step'), - 0, - 'Old step should be deleted' -); - --- Verify new step exists -select is( - (select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'new_step'), - 1, - 'New step should be created' -); - -select finish(); -rollback; diff --git a/pkgs/edge-worker/scripts/concatenate-migrations.sh b/pkgs/edge-worker/scripts/concatenate-migrations.sh index b78520034..663de01c5 100755 --- a/pkgs/edge-worker/scripts/concatenate-migrations.sh +++ b/pkgs/edge-worker/scripts/concatenate-migrations.sh @@ -34,3 +34,8 @@ echo "-- From file: seed.sql" >> "$target_file" cat "../core/supabase/seed.sql" >> "$target_file" echo "" >> "$target_file" echo "" >> "$target_file" + +# Configure local JWT secret for is_local() detection in tests +echo "-- Configure local JWT secret for is_local() detection" >> "$target_file" +echo "ALTER DATABASE postgres SET app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';" >> "$target_file" +echo "" >> "$target_file" diff --git a/pkgs/edge-worker/src/core/Queries.ts b/pkgs/edge-worker/src/core/Queries.ts index 0bf3bf71a..895f90074 100644 --- a/pkgs/edge-worker/src/core/Queries.ts +++ b/pkgs/edge-worker/src/core/Queries.ts @@ -54,8 +54,7 @@ export class Queries { async ensureFlowCompiled( flowSlug: string, - shape: FlowShape, - mode: 'development' | 'production' + shape: FlowShape ): Promise { // SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers, // arrays, and plain objects), but TypeScript can't prove this because FlowShape @@ -69,8 +68,7 @@ export class Queries { const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>` SELECT pgflow.ensure_flow_compiled( ${flowSlug}, - ${shapeJson}::jsonb, - ${mode} + ${shapeJson}::jsonb ) as result `; return result.result; diff --git a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts index 5fe42d975..3f52e5b25 100644 --- a/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts +++ b/pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts @@ -8,7 +8,6 @@ import { FlowShapeMismatchError } from './errors.js'; export interface FlowLifecycleConfig { heartbeatInterval?: number; - isLocalEnvironment?: boolean; ensureCompiledOnStartup?: boolean; } @@ -25,7 +24,6 @@ export class FlowWorkerLifecycle implements ILifecycle { private _workerId?: string; private heartbeatInterval: number; private lastHeartbeat = 0; - private isLocalEnvironment: boolean; private ensureCompiledOnStartup: boolean; constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) { @@ -34,7 +32,6 @@ export class FlowWorkerLifecycle implements ILifecycle { this.logger = logger; this.workerState = new WorkerState(logger); this.heartbeatInterval = config?.heartbeatInterval ?? 5000; - this.isLocalEnvironment = config?.isLocalEnvironment ?? false; this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true; } @@ -61,15 +58,13 @@ export class FlowWorkerLifecycle implements ILifecycle { } private async ensureFlowCompiled(): Promise { - const mode = this.isLocalEnvironment ? 'development' : 'production'; - this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`); + this.logger.info(`Compiling flow '${this.flow.slug}'...`); const shape = extractFlowShape(this.flow); const result = await this.queries.ensureFlowCompiled( this.flow.slug, - shape, - mode + shape ); if (result.status === 'mismatch') { diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 396a7d174..80225aac9 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -91,7 +91,6 @@ export function createFlowWorker ARRAY['double']::text[])`; - // Use isLocal: false to simulate production mode + // Override JWT secret to simulate production mode (is_local() returns false) + await sql`SET app.settings.jwt_secret = 'production-secret'`; + const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false); const worker = createFlowWorker( @@ -273,8 +275,7 @@ Deno.test( connections.map((conn) => conn`SELECT pgflow.ensure_flow_compiled( ${flowSlug}, - ${conn.json(shape)}, - 'production' + ${conn.json(shape)} ) as result` ) ); diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts index f00a506a3..2b6d2b711 100644 --- a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.compilation.test.ts @@ -32,8 +32,7 @@ class MockQueries extends Queries { override ensureFlowCompiled( _flowSlug: string, - _shape: FlowShape, - _mode: 'development' | 'production' + _shape: FlowShape ): Promise { this.ensureFlowCompiledCallCount++; return Promise.resolve({ status: 'verified', differences: [] }); diff --git a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts index f2304dd4f..6c198d1f4 100644 --- a/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts +++ b/pkgs/edge-worker/tests/unit/FlowWorkerLifecycle.deprecation.test.ts @@ -47,8 +47,7 @@ class MockQueries extends Queries { override ensureFlowCompiled( _flowSlug: string, - _shape: FlowShape, - _mode: 'development' | 'production' + _shape: FlowShape ): Promise { return Promise.resolve({ status: 'verified', differences: [] }); }