Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions pkgs/core/schemas/0100_function_ensure_flow_compiled.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
-- Ensure a flow is compiled in the database
-- Handles both development (auto-recompile) and production (fail on mismatch) modes
-- Auto-detects environment via is_local(): local -> auto-recompile, production -> fail on mismatch
-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] }
create or replace function pgflow.ensure_flow_compiled(
p_flow_slug text,
p_shape jsonb,
p_mode text default 'production' -- 'development' | 'production'
flow_slug text,
shape jsonb
)
returns jsonb
language plpgsql
Expand All @@ -16,43 +15,46 @@ DECLARE
v_flow_exists boolean;
v_db_shape jsonb;
v_differences text[];
v_is_local boolean;
BEGIN
-- Generate lock key from flow_slug (deterministic hash)
v_lock_key := hashtext(p_flow_slug);
v_lock_key := hashtext(ensure_flow_compiled.flow_slug);

-- Acquire transaction-level advisory lock
-- Serializes concurrent compilation attempts for same flow
PERFORM pg_advisory_xact_lock(1, v_lock_key);

-- 1. Check if flow exists
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug)
INTO v_flow_exists;

-- 2. If flow missing: compile (both modes)
-- 2. If flow missing: compile (both environments)
IF NOT v_flow_exists THEN
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;

-- 3. Get current shape from DB
v_db_shape := pgflow._get_flow_shape(p_flow_slug);
v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);

-- 4. Compare shapes
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);
v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape);

-- 5. If shapes match: return verified
IF array_length(v_differences, 1) IS NULL THEN
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;

-- 6. Shapes differ - handle by mode
IF p_mode = 'development' THEN
-- Recompile in dev mode: full deletion + fresh compile
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
-- 6. Shapes differ - auto-detect environment via is_local()
v_is_local := pgflow.is_local();

IF v_is_local THEN
-- Recompile in local/dev: full deletion + fresh compile
PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
ELSE
-- Fail in production mode
-- Fail in production
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
END IF;
END;
Expand Down
2 changes: 1 addition & 1 deletion pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ export type Database = {
Returns: undefined
}
ensure_flow_compiled: {
Args: { p_flow_slug: string; p_mode?: string; p_shape: Json }
Args: { flow_slug: string; shape: Json }
Returns: Json
}
fail_task: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- Create "ensure_flow_compiled" function
--
-- Ensures the compiled representation of a flow in the database matches the
-- provided shape. Environment is auto-detected via pgflow.is_local():
--   * local/dev  -> shape mismatch triggers a destructive recompile
--   * production -> shape mismatch is reported but the database is untouched
-- Returns jsonb:
--   { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch',
--     differences: text[] (as a jsonb array; empty unless shapes differ) }
-- NOTE: SET search_path = '' forces all object references to be
-- schema-qualified; bare calls like hashtext() still resolve via pg_catalog.
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("flow_slug" text, "shape" jsonb) RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
  v_lock_key int;        -- advisory-lock key derived from flow_slug
  v_flow_exists boolean; -- whether the flow row already exists
  v_db_shape jsonb;      -- shape currently stored in the database
  v_differences text[];  -- human-readable diffs between given and stored shape
  v_is_local boolean;    -- result of pgflow.is_local() environment detection
BEGIN
  -- Generate lock key from flow_slug (deterministic hash)
  -- Parameters are function-name-qualified (ensure_flow_compiled.flow_slug)
  -- to disambiguate from same-named table columns referenced below.
  v_lock_key := hashtext(ensure_flow_compiled.flow_slug);

  -- Acquire transaction-level advisory lock
  -- Serializes concurrent compilation attempts for same flow
  -- (two-key form: namespace 1, per-flow hash; released at transaction end)
  PERFORM pg_advisory_xact_lock(1, v_lock_key);

  -- 1. Check if flow exists
  SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = ensure_flow_compiled.flow_slug)
  INTO v_flow_exists;

  -- 2. If flow missing: compile (both environments)
  IF NOT v_flow_exists THEN
    PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
    RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
  END IF;

  -- 3. Get current shape from DB
  v_db_shape := pgflow._get_flow_shape(ensure_flow_compiled.flow_slug);

  -- 4. Compare shapes
  v_differences := pgflow._compare_flow_shapes(ensure_flow_compiled.shape, v_db_shape);

  -- 5. If shapes match: return verified
  -- array_length(..., 1) IS NULL covers both NULL and empty arrays.
  IF array_length(v_differences, 1) IS NULL THEN
    RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
  END IF;

  -- 6. Shapes differ - auto-detect environment via is_local()
  v_is_local := pgflow.is_local();

  IF v_is_local THEN
    -- Recompile in local/dev: full deletion + fresh compile
    -- WARNING: delete_flow_and_data is destructive — it removes the flow's
    -- run data as well; acceptable only because this branch is local-only.
    PERFORM pgflow.delete_flow_and_data(ensure_flow_compiled.flow_slug);
    PERFORM pgflow._create_flow_from_shape(ensure_flow_compiled.flow_slug, ensure_flow_compiled.shape);
    RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
  ELSE
    -- Fail in production
    RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
  END IF;
END;
$$;
-- Drop "ensure_flow_compiled" function
-- Removes the previous 3-argument overload (flow_slug, shape, mode); the
-- CREATE above adds a new 2-argument signature and does not replace it.
DROP FUNCTION "pgflow"."ensure_flow_compiled" (text, jsonb, text);
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:+t6j1CJcFBfV6SRbAz/7/mVgZeS9vmzWWC+b1/Mp3GA=
h1:moHUa6OxOaPfYpNNrLRhkw13YzwraGNDxMnA9aarP30=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -13,3 +13,4 @@ h1:+t6j1CJcFBfV6SRbAz/7/mVgZeS9vmzWWC+b1/Mp3GA=
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
20251204115929_pgflow_temp_is_local.sql h1:arP+PC2OYI9ktAFx0+G9/w8zsaq/AbFWjLgK6YDujvQ=
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:9FsEd0iyaIv9X1alACBQCzyebt2/+m1rgL4ozXJWBcA=
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- pgTAP test: ensure_flow_compiled auto-recompiles on shape mismatch when
-- pgflow.is_local() reports a local environment. Wrapped in a transaction
-- and rolled back, so no state leaks between test files.
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Simulate local environment
-- Third arg `true` makes the setting transaction-local, so it is discarded
-- at rollback. This secret value presumably matches what pgflow.is_local()
-- treats as the local default -- TODO confirm against the is_local() migration.
select set_config('app.settings.jwt_secret', 'super-secret-jwt-token-with-at-least-32-characters-long', true);

-- Setup: Create flow with different shape
select pgflow.create_flow('local_flow');
select pgflow.add_step('local_flow', 'old_step');

-- Test: Different shape should auto-recompile when is_local()=true (no p_mode param)
select is(
  (
    select result->>'status'
    from pgflow.ensure_flow_compiled(
      'local_flow',
      '{
        "steps": [
          {"slug": "new_step", "stepType": "single", "dependencies": []}
        ]
      }'::jsonb
    ) as result
  ),
  'recompiled',
  'Should auto-recompile when is_local()=true'
);

-- Verify old step is gone
-- (recompile path deletes the flow and rebuilds it from the new shape)
select is(
  (select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'old_step'),
  0,
  'Old step should be deleted'
);

-- Verify new step exists
select is(
  (select count(*)::int from pgflow.steps where flow_slug = 'local_flow' and step_slug = 'new_step'),
  1,
  'New step should be created'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,49 @@ begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Simulate production environment (not local)
select set_config('app.settings.jwt_secret', 'production-jwt-secret-that-differs-from-local', true);

-- Setup: Create flow with different shape
select pgflow.create_flow('prod_flow');
select pgflow.add_step('prod_flow', 'old_step');
select pgflow.create_flow('prod_flow_auto');
select pgflow.add_step('prod_flow_auto', 'old_step');

-- Test: Different shape in production mode should return mismatch
-- Test: Different shape should return mismatch when is_local()=false (no p_mode param)
select is(
(
select result->>'status'
from pgflow.ensure_flow_compiled(
'prod_flow',
'prod_flow_auto',
'{
"steps": [
{"slug": "new_step", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'production'
}'::jsonb
) as result
),
'mismatch',
'Should return mismatch status in production mode'
'Should return mismatch status when is_local()=false'
);

-- Verify differences are returned
select ok(
(
select jsonb_array_length(result->'differences') > 0
from pgflow.ensure_flow_compiled(
'prod_flow',
'prod_flow_auto',
'{
"steps": [
{"slug": "new_step", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'production'
}'::jsonb
) as result
),
'Should return differences for production mismatch'
);

-- Verify database was NOT modified
select is(
(select step_slug from pgflow.steps where flow_slug = 'prod_flow'),
(select step_slug from pgflow.steps where flow_slug = 'prod_flow_auto'),
'old_step',
'Database should not be modified on production mismatch'
);
Expand Down

This file was deleted.

5 changes: 5 additions & 0 deletions pkgs/edge-worker/scripts/concatenate-migrations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ echo "-- From file: seed.sql" >> "$target_file"
cat "../core/supabase/seed.sql" >> "$target_file"
echo "" >> "$target_file"
echo "" >> "$target_file"

# Configure local JWT secret for is_local() detection in tests
echo "-- Configure local JWT secret for is_local() detection" >> "$target_file"
echo "ALTER DATABASE postgres SET app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';" >> "$target_file"
echo "" >> "$target_file"
6 changes: 2 additions & 4 deletions pkgs/edge-worker/src/core/Queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ export class Queries {

async ensureFlowCompiled(
flowSlug: string,
shape: FlowShape,
mode: 'development' | 'production'
shape: FlowShape
): Promise<EnsureFlowCompiledResult> {
// SAFETY: FlowShape is JSON-compatible by construction (only strings, numbers,
// arrays, and plain objects), but TypeScript can't prove this because FlowShape
Expand All @@ -69,8 +68,7 @@ export class Queries {
const [result] = await this.sql<{ result: EnsureFlowCompiledResult }[]>`
SELECT pgflow.ensure_flow_compiled(
${flowSlug},
${shapeJson}::jsonb,
${mode}
${shapeJson}::jsonb
) as result
`;
return result.result;
Expand Down
9 changes: 2 additions & 7 deletions pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { FlowShapeMismatchError } from './errors.js';

export interface FlowLifecycleConfig {
heartbeatInterval?: number;
isLocalEnvironment?: boolean;
ensureCompiledOnStartup?: boolean;
}

Expand All @@ -25,7 +24,6 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
private _workerId?: string;
private heartbeatInterval: number;
private lastHeartbeat = 0;
private isLocalEnvironment: boolean;
private ensureCompiledOnStartup: boolean;

constructor(queries: Queries, flow: TFlow, logger: Logger, config?: FlowLifecycleConfig) {
Expand All @@ -34,7 +32,6 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
this.logger = logger;
this.workerState = new WorkerState(logger);
this.heartbeatInterval = config?.heartbeatInterval ?? 5000;
this.isLocalEnvironment = config?.isLocalEnvironment ?? false;
this.ensureCompiledOnStartup = config?.ensureCompiledOnStartup ?? true;
}

Expand All @@ -61,15 +58,13 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
}

private async ensureFlowCompiled(): Promise<void> {
const mode = this.isLocalEnvironment ? 'development' : 'production';
this.logger.info(`Compiling flow '${this.flow.slug}' (mode: ${mode})...`);
this.logger.info(`Compiling flow '${this.flow.slug}'...`);

const shape = extractFlowShape(this.flow);

const result = await this.queries.ensureFlowCompiled(
this.flow.slug,
shape,
mode
shape
);

if (result.status === 'mismatch') {
Expand Down
1 change: 0 additions & 1 deletion pkgs/edge-worker/src/flow/createFlowWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ export function createFlowWorker<TFlow extends AnyFlow, TResources extends Recor
flow,
createLogger('FlowWorkerLifecycle'),
{
isLocalEnvironment: platformAdapter.isLocalEnvironment,
ensureCompiledOnStartup: config.ensureCompiledOnStartup ?? true
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ Deno.test(
await sql`SELECT pgflow.add_step('test_compilation_flow', 'double')`;
await sql`SELECT pgflow.add_step('test_compilation_flow', 'different_step', deps_slugs => ARRAY['double']::text[])`;

// Use isLocal: false to simulate production mode
// Override JWT secret to simulate production mode (is_local() returns false)
await sql`SET app.settings.jwt_secret = 'production-secret'`;

const platformAdapter = createPlatformAdapterWithLocalEnv(sql, false);

const worker = createFlowWorker(
Expand Down Expand Up @@ -273,8 +275,7 @@ Deno.test(
connections.map((conn) =>
conn`SELECT pgflow.ensure_flow_compiled(
${flowSlug},
${conn.json(shape)},
'production'
${conn.json(shape)}
) as result`
)
);
Expand Down
Loading
Loading