Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pkgs/core/schemas/0100_function_ensure_flow_compiled.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- Ensure a flow is compiled in the database
-- Handles both development (auto-recompile) and production (fail on mismatch) modes
-- Returns: { status: 'compiled' | 'verified' | 'recompiled' | 'mismatch', differences: text[] }
create or replace function pgflow.ensure_flow_compiled(
p_flow_slug text,
p_shape jsonb,
p_mode text default 'production' -- 'development' | 'production'
)
returns jsonb
language plpgsql
volatile
set search_path to ''
as $$
DECLARE
v_flow_exists boolean;
v_db_shape jsonb;
v_differences text[];
BEGIN
-- 1. Check if flow exists
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
INTO v_flow_exists;

-- 2. If flow missing: compile (both modes)
IF NOT v_flow_exists THEN
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;

-- 3. Get current shape from DB
v_db_shape := pgflow._get_flow_shape(p_flow_slug);

-- 4. Compare shapes
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);

-- 5. If shapes match: return verified
IF array_length(v_differences, 1) IS NULL THEN
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;

-- 6. Shapes differ - handle by mode
IF p_mode = 'development' THEN
-- Recompile in dev mode: full deletion + fresh compile
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
ELSE
-- Fail in production mode
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
END IF;
END;
$$;
4 changes: 4 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ export type Database = {
Args: { p_flow_slug: string }
Returns: undefined
}
ensure_flow_compiled: {
Args: { p_flow_slug: string; p_mode?: string; p_shape: Json }
Returns: Json
}
fail_task: {
Args: {
error_message: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Create "ensure_flow_compiled" function
CREATE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_flow_exists boolean;
v_db_shape jsonb;
v_differences text[];
BEGIN
-- 1. Check if flow exists
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
INTO v_flow_exists;

-- 2. If flow missing: compile (both modes)
IF NOT v_flow_exists THEN
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;

-- 3. Get current shape from DB
v_db_shape := pgflow._get_flow_shape(p_flow_slug);

-- 4. Compare shapes
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);

-- 5. If shapes match: return verified
IF array_length(v_differences, 1) IS NULL THEN
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;

-- 6. Shapes differ - handle by mode
IF p_mode = 'development' THEN
-- Recompile in dev mode: full deletion + fresh compile
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
ELSE
-- Fail in production mode
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
END IF;
END;
$$;
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
h1:hsI4u+P4rwzRfYUEWgaWMGHQuhCCJcT12gVMHqHV830=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -13,3 +13,4 @@ h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Test: Missing flow should be compiled (default production mode)
select is(
(
select result->>'status'
from pgflow.ensure_flow_compiled(
'new_flow',
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []}
]
}'::jsonb
) as result
),
'compiled',
'Should return compiled status for missing flow'
);

-- Verify flow was actually created
select is(
(select count(*)::int from pgflow.flows where flow_slug = 'new_flow'),
1,
'Flow should be created in database'
);

select is(
(select count(*)::int from pgflow.steps where flow_slug = 'new_flow'),
1,
'Step should be created in database'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Create flow with different shape
select pgflow.create_flow('prod_flow');
select pgflow.add_step('prod_flow', 'old_step');

-- Test: Different shape in production mode should return mismatch
select is(
(
select result->>'status'
from pgflow.ensure_flow_compiled(
'prod_flow',
'{
"steps": [
{"slug": "new_step", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'production'
) as result
),
'mismatch',
'Should return mismatch status in production mode'
);

-- Verify differences are returned
select ok(
(
select jsonb_array_length(result->'differences') > 0
from pgflow.ensure_flow_compiled(
'prod_flow',
'{
"steps": [
{"slug": "new_step", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'production'
) as result
),
'Should return differences for production mismatch'
);

-- Verify database was NOT modified
select is(
(select step_slug from pgflow.steps where flow_slug = 'prod_flow'),
'old_step',
'Database should not be modified on production mismatch'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Create flow with different shape
select pgflow.create_flow('dev_flow');
select pgflow.add_step('dev_flow', 'old_step');

-- Test: Different shape in development mode should recompile
select is(
(
select result->>'status'
from pgflow.ensure_flow_compiled(
'dev_flow',
'{
"steps": [
{"slug": "new_step", "stepType": "single", "dependencies": []}
]
}'::jsonb,
'development'
) as result
),
'recompiled',
'Should return recompiled status in development mode'
);

-- Verify old step is gone
select is(
(select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'old_step'),
0,
'Old step should be deleted'
);

-- Verify new step exists
select is(
(select count(*)::int from pgflow.steps where flow_slug = 'dev_flow' and step_slug = 'new_step'),
1,
'New step should be created'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
begin;
select plan(2);
select pgflow_tests.reset_db();

-- Setup: Create flow first
select pgflow.create_flow('existing_flow');
select pgflow.add_step('existing_flow', 'first');
select pgflow.add_step('existing_flow', 'second', array['first']);

-- Test: Matching shape should return verified
select is(
(
select result->>'status'
from pgflow.ensure_flow_compiled(
'existing_flow',
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
]
}'::jsonb
) as result
),
'verified',
'Should return verified status for matching shape'
);

-- Verify differences array is empty
select is(
(
select jsonb_array_length(result->'differences')
from pgflow.ensure_flow_compiled(
'existing_flow',
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["first"]}
]
}'::jsonb
) as result
),
0,
'Differences should be empty for matching shape'
);

select finish();
rollback;
Loading