36 changes: 36 additions & 0 deletions pkgs/core/schemas/0100_function_create_flow_from_shape.sql
@@ -0,0 +1,36 @@
-- Compile a flow from a JSONB shape
-- Creates the flow and all its steps using existing create_flow/add_step functions
create or replace function pgflow._create_flow_from_shape(
p_flow_slug text,
p_shape jsonb
)
returns void
language plpgsql
volatile
set search_path to ''
as $$
DECLARE
v_step jsonb;
v_deps text[];
BEGIN
-- Create the flow with defaults
PERFORM pgflow.create_flow(p_flow_slug);

-- Iterate over steps in order and add each one
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
LOOP
-- Convert dependencies jsonb array to text array
SELECT COALESCE(array_agg(dep), '{}')
INTO v_deps
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;

-- Add the step
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step->>'slug',
deps_slugs => v_deps,
step_type => v_step->>'stepType'
);
END LOOP;
END;
$$;
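For illustration, compiling a two-step shape through this function is roughly equivalent to issuing the underlying calls by hand (the flow and step slugs below are made up, not part of the diff):

-- Illustrative expansion of a two-step shape
select pgflow.create_flow('example_flow');
select pgflow.add_step(
  flow_slug => 'example_flow',
  step_slug => 'extract',
  deps_slugs => '{}'::text[],
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'example_flow',
  step_slug => 'transform',
  deps_slugs => array['extract'],
  step_type => 'single'
);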
22 changes: 22 additions & 0 deletions pkgs/core/schemas/0100_function_delete_flow_and_data.sql
@@ -0,0 +1,22 @@
-- Deletes a flow and all its associated data
-- WARNING: This is destructive - deletes flow definition AND all runtime data
-- Used by ensure_flow_compiled for development mode recompilation
create or replace function pgflow.delete_flow_and_data(p_flow_slug text)
returns void
language plpgsql
volatile
set search_path to ''
as $$
BEGIN
-- Drop queue and archive table (pgmq)
PERFORM pgmq.drop_queue(p_flow_slug);

-- Delete all associated data in the correct order (respecting FK constraints)
DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug;
DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug;
DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug;
DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug;
DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug;
DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug;
END;
$$;
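As a sketch of the development-mode recompilation path the comment refers to (ensure_flow_compiled itself is not part of this diff), the two new functions would be combined roughly like this, with an illustrative slug and shape:

-- Illustrative recompile sequence: wipe the old definition and data, then recompile from a shape
select pgflow.delete_flow_and_data('example_flow');
select pgflow._create_flow_from_shape(
  'example_flow',
  '{"steps": [{"slug": "only_step", "stepType": "single", "dependencies": []}]}'::jsonb
);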
8 changes: 8 additions & 0 deletions pkgs/core/src/database-types.ts
@@ -350,6 +350,10 @@ export type Database = {
Args: { p_db: Json; p_local: Json }
Returns: string[]
}
_create_flow_from_shape: {
Args: { p_flow_slug: string; p_shape: Json }
Returns: undefined
}
_get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json }
add_step: {
Args: {
@@ -440,6 +444,10 @@ export type Database = {
isSetofReturn: false
}
}
delete_flow_and_data: {
Args: { p_flow_slug: string }
Returns: undefined
}
fail_task: {
Args: {
error_message: string
42 changes: 42 additions & 0 deletions pkgs/core/supabase/migrations/20251130012043_pgflow_temp_compilation_utilities.sql
@@ -0,0 +1,42 @@
-- Create "_create_flow_from_shape" function
CREATE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_step jsonb;
v_deps text[];
BEGIN
-- Create the flow with defaults
PERFORM pgflow.create_flow(p_flow_slug);

-- Iterate over steps in order and add each one
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
LOOP
-- Convert dependencies jsonb array to text array
SELECT COALESCE(array_agg(dep), '{}')
INTO v_deps
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;

-- Add the step
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step->>'slug',
deps_slugs => v_deps,
step_type => v_step->>'stepType'
);
END LOOP;
END;
$$;
-- Create "delete_flow_and_data" function
CREATE FUNCTION "pgflow"."delete_flow_and_data" ("p_flow_slug" text) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
BEGIN
-- Drop queue and archive table (pgmq)
PERFORM pgmq.drop_queue(p_flow_slug);

-- Delete all associated data in the correct order (respecting FK constraints)
DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug;
DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug;
DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug;
DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug;
DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug;
DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug;
END;
$$;
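A quick sanity check after applying this migration is a standard catalog query (not part of the diff):

-- Both new functions should show up in the pgflow schema
select p.proname
from pg_proc p
join pg_namespace n on n.oid = p.pronamespace
where n.nspname = 'pgflow'
  and p.proname in ('_create_flow_from_shape', 'delete_flow_and_data');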
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -12,3 +12,4 @@ h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc=
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE=
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
@@ -0,0 +1,46 @@
begin;
select plan(4);
select pgflow_tests.reset_db();

-- Test: Compile a simple sequential flow from shape
select pgflow._create_flow_from_shape(
'test_flow',
'{
"steps": [
{"slug": "first", "stepType": "single", "dependencies": []},
{"slug": "second", "stepType": "single", "dependencies": ["first"]},
{"slug": "third", "stepType": "single", "dependencies": ["second"]}
]
}'::jsonb
);

-- Verify flow was created
select is(
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
1,
'Flow should be created'
);

-- Verify steps were created
select is(
(select count(*)::int from pgflow.steps where flow_slug = 'test_flow'),
3,
'All 3 steps should be created'
);

-- Verify step order matches (step_index)
select results_eq(
$$ SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_flow' ORDER BY step_index $$,
$$ VALUES ('first'), ('second'), ('third') $$,
'Steps should be in correct order'
);

-- Verify dependencies were created
select results_eq(
$$ SELECT dep_slug, step_slug FROM pgflow.deps WHERE flow_slug = 'test_flow' ORDER BY step_slug $$,
$$ VALUES ('first', 'second'), ('second', 'third') $$,
'Dependencies should be created correctly'
);

select finish();
rollback;
@@ -0,0 +1,36 @@
begin;
select plan(2);
select pgflow_tests.reset_db();

-- Test: Compile flow with map step
select pgflow._create_flow_from_shape(
'map_flow',
'{
"steps": [
{"slug": "root_map", "stepType": "map", "dependencies": []},
{"slug": "process", "stepType": "single", "dependencies": ["root_map"]}
]
}'::jsonb
);

-- Verify map step was created with correct type
select is(
(select step_type from pgflow.steps where flow_slug = 'map_flow' and step_slug = 'root_map'),
'map',
'Map step should have step_type = map'
);

-- Verify shape round-trips correctly
select is(
pgflow._get_flow_shape('map_flow'),
'{
"steps": [
{"slug": "root_map", "stepType": "map", "dependencies": []},
{"slug": "process", "stepType": "single", "dependencies": ["root_map"]}
]
}'::jsonb,
'Shape should round-trip correctly'
);

select finish();
rollback;
@@ -0,0 +1,34 @@
begin;
select plan(3);
select pgflow_tests.reset_db();

-- Setup: Create a flow with steps
select pgflow.create_flow('test_flow');
select pgflow.add_step('test_flow', 'first');
select pgflow.add_step('test_flow', 'second', array['first']);

-- Verify setup
select is(
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
1,
'Flow should exist before deletion'
);

-- Test: Delete the flow
select pgflow.delete_flow_and_data('test_flow');

-- Verify deletion
select is(
(select count(*)::int from pgflow.flows where flow_slug = 'test_flow'),
0,
'Flow should be deleted'
);

select is(
(select count(*)::int from pgflow.steps where flow_slug = 'test_flow'),
0,
'Steps should be deleted'
);

select finish();
rollback;
@@ -0,0 +1,40 @@
begin;
select plan(4);
select pgflow_tests.reset_db();

-- Setup: Create flow, start it, create runtime data
select pgflow.create_flow('test_flow');
select pgflow.add_step('test_flow', 'first');
select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb);

-- Verify runtime data exists
select is(
(select count(*)::int from pgflow.runs where flow_slug = 'test_flow'),
1,
'Run should exist before deletion'
);

select is(
(select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'),
1,
'Step state should exist before deletion'
);

-- Test: Delete the flow
select pgflow.delete_flow_and_data('test_flow');

-- Verify all runtime data deleted
select is(
(select count(*)::int from pgflow.runs where flow_slug = 'test_flow'),
0,
'Runs should be deleted'
);

select is(
(select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'),
0,
'Step states should be deleted'
);

select finish();
rollback;
@@ -2,8 +2,7 @@ begin;
select plan(9);
select pgflow_tests.reset_db();

-- Load the delete_flow_and_data function
\i _shared/delete_flow_and_data.sql.raw
-- Note: delete_flow_and_data is now part of the core schema and no longer needs to be loaded here

-- Create test flow with steps and dependencies
select pgflow.create_flow('test_flow_to_delete', max_attempts => 0);