From 0bdfc0876f3de57092ac11e6d9d5b559c67ef590 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 30 Nov 2025 02:26:29 +0100 Subject: [PATCH] feat(core): add flow creation and deletion functions --- .../0100_function_create_flow_from_shape.sql | 36 +++++++++++++++ .../0100_function_delete_flow_and_data.sql | 22 +++++++++ pkgs/core/src/database-types.ts | 8 ++++ ...2043_pgflow_temp_compilation_utilities.sql | 42 +++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../basic_compile.test.sql | 46 +++++++++++++++++++ .../map_step_compile.test.sql | 36 +++++++++++++++ .../deletes_flow_definition.test.sql | 34 ++++++++++++++ .../deletes_runtime_data.test.sql | 40 ++++++++++++++++ .../maintenance/delete_flow_and_data.test.sql | 3 +- 10 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 pkgs/core/schemas/0100_function_create_flow_from_shape.sql create mode 100644 pkgs/core/schemas/0100_function_delete_flow_and_data.sql create mode 100644 pkgs/core/supabase/migrations/20251130012043_pgflow_temp_compilation_utilities.sql create mode 100644 pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql create mode 100644 pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql create mode 100644 pkgs/core/supabase/tests/delete_flow_and_data/deletes_flow_definition.test.sql create mode 100644 pkgs/core/supabase/tests/delete_flow_and_data/deletes_runtime_data.test.sql diff --git a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql new file mode 100644 index 000000000..4f71c14d9 --- /dev/null +++ b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql @@ -0,0 +1,36 @@ +-- Compile a flow from a JSONB shape +-- Creates the flow and all its steps using existing create_flow/add_step functions +create or replace function pgflow._create_flow_from_shape( + p_flow_slug text, + p_shape jsonb +) +returns void +language plpgsql +volatile +set search_path to '' +as $$ +DECLARE + v_step jsonb; + v_deps text[]; +BEGIN + -- Create the flow with defaults + PERFORM pgflow.create_flow(p_flow_slug); + + -- Iterate over steps in order and add each one + FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') + LOOP + -- Convert dependencies jsonb array to text array + SELECT COALESCE(array_agg(dep), '{}') + INTO v_deps + FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; + + -- Add the step + PERFORM pgflow.add_step( + flow_slug => p_flow_slug, + step_slug => v_step->>'slug', + deps_slugs => v_deps, + step_type => v_step->>'stepType' + ); + END LOOP; +END; +$$; diff --git a/pkgs/core/schemas/0100_function_delete_flow_and_data.sql b/pkgs/core/schemas/0100_function_delete_flow_and_data.sql new file mode 100644 index 000000000..7cd14e110 --- /dev/null +++ b/pkgs/core/schemas/0100_function_delete_flow_and_data.sql @@ -0,0 +1,22 @@ +-- Deletes a flow and all its associated data +-- WARNING: This is destructive - deletes flow definition AND all runtime data +-- Used by ensure_flow_compiled for development mode recompilation +create or replace function pgflow.delete_flow_and_data(p_flow_slug text) +returns void +language plpgsql +volatile +set search_path to '' +as $$ +BEGIN + -- Drop queue and archive table (pgmq) + PERFORM pgmq.drop_queue(p_flow_slug); + + -- Delete all associated data in the correct order (respecting FK constraints) + DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug; + DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug; + DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug; + DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug; + DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug; + DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug; +END; +$$; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index c2bf0e5dc..41c461803 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -350,6 +350,10 @@ export type Database = { Args: { p_db: Json; p_local: Json } Returns: string[] } + _create_flow_from_shape: { + Args: { p_flow_slug: string; p_shape: Json } + Returns: undefined + } _get_flow_shape: { Args: { p_flow_slug: string }; Returns: Json } add_step: { Args: { @@ -440,6 +444,10 @@ export type Database = { isSetofReturn: false } } + delete_flow_and_data: { + Args: { p_flow_slug: string } + Returns: undefined + } fail_task: { Args: { error_message: string diff --git a/pkgs/core/supabase/migrations/20251130012043_pgflow_temp_compilation_utilities.sql b/pkgs/core/supabase/migrations/20251130012043_pgflow_temp_compilation_utilities.sql new file mode 100644 index 000000000..9814d7378 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251130012043_pgflow_temp_compilation_utilities.sql @@ -0,0 +1,42 @@ +-- Create "_create_flow_from_shape" function +CREATE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_step jsonb; + v_deps text[]; +BEGIN + -- Create the flow with defaults + PERFORM pgflow.create_flow(p_flow_slug); + + -- Iterate over steps in order and add each one + FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') + LOOP + -- Convert dependencies jsonb array to text array + SELECT COALESCE(array_agg(dep), '{}') + INTO v_deps + FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; + + -- Add the step + PERFORM pgflow.add_step( + flow_slug => p_flow_slug, + step_slug => v_step->>'slug', + deps_slugs => v_deps, + step_type => v_step->>'stepType' + ); + END LOOP; +END; +$$; +-- Create "delete_flow_and_data" function +CREATE FUNCTION "pgflow"."delete_flow_and_data" ("p_flow_slug" text) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +BEGIN + -- Drop queue and archive table (pgmq) + PERFORM pgmq.drop_queue(p_flow_slug); + + -- Delete all associated data in the correct order (respecting FK constraints) + DELETE FROM pgflow.step_tasks AS task WHERE task.flow_slug = p_flow_slug; + DELETE FROM pgflow.step_states AS state WHERE state.flow_slug = p_flow_slug; + DELETE FROM pgflow.runs AS run WHERE run.flow_slug = p_flow_slug; + DELETE FROM pgflow.deps AS dep WHERE dep.flow_slug = p_flow_slug; + DELETE FROM pgflow.steps AS step WHERE step.flow_slug = p_flow_slug; + DELETE FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug; +END; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 6b10371b4..363cd58f0 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc= +h1:JnAOhrfq6ppZAuSX1Q14eD0gt5obztnyVM2MToozmvA= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -12,3 +12,4 @@ h1:GJGcig/hHnOUiqhMaJfe/JiwaD1pl1vGFaG0SxeAmRc= 20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4= 20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg= 20251130011221_pgflow_temp_shape_utilities.sql h1:KzcP/xJjwfQ7BTbxdgaBzkfPztQcoUwuAmnZTBVqoIE= +20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y= diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql new file mode 100644 index 000000000..ee059957b --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql @@ -0,0 +1,46 @@ +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Test: Compile a simple sequential flow from shape +select pgflow._create_flow_from_shape( + 'test_flow', + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": []}, + {"slug": "second", "stepType": "single", "dependencies": ["first"]}, + {"slug": "third", "stepType": "single", "dependencies": ["second"]} + ] + }'::jsonb +); + +-- Verify flow was created +select is( + (select count(*)::int from pgflow.flows where flow_slug = 'test_flow'), + 1, + 'Flow should be created' +); + +-- Verify steps were created +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'test_flow'), + 3, + 'All 3 steps should be created' +); + +-- Verify step order matches (step_index) +select results_eq( + $$ SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_flow' ORDER BY step_index $$, + $$ VALUES ('first'), ('second'), ('third') $$, + 'Steps should be in correct order' +); + +-- Verify dependencies were created +select results_eq( + $$ SELECT dep_slug, step_slug FROM pgflow.deps WHERE flow_slug = 'test_flow' ORDER BY step_slug $$, + $$ VALUES ('first', 'second'), ('second', 'third') $$, + 'Dependencies should be created correctly' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql new file mode 100644 index 000000000..a0e26bb41 --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql @@ -0,0 +1,36 @@ +begin; +select plan(2); +select pgflow_tests.reset_db(); + +-- Test: Compile flow with map step +select pgflow._create_flow_from_shape( + 'map_flow', + '{ + "steps": [ + {"slug": "root_map", "stepType": "map", "dependencies": []}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + ] + }'::jsonb +); + +-- Verify map step was created with correct type +select is( + (select step_type from pgflow.steps where flow_slug = 'map_flow' and step_slug = 'root_map'), + 'map', + 'Map step should have step_type = map' +); + +-- Verify shape round-trips correctly +select is( + pgflow._get_flow_shape('map_flow'), + '{ + "steps": [ + {"slug": "root_map", "stepType": "map", "dependencies": []}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + ] + }'::jsonb, + 'Shape should round-trip correctly' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/delete_flow_and_data/deletes_flow_definition.test.sql b/pkgs/core/supabase/tests/delete_flow_and_data/deletes_flow_definition.test.sql new file mode 100644 index 000000000..02f124a94 --- /dev/null +++ b/pkgs/core/supabase/tests/delete_flow_and_data/deletes_flow_definition.test.sql @@ -0,0 +1,34 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Create a flow with steps +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'first'); +select pgflow.add_step('test_flow', 'second', array['first']); + +-- Verify setup +select is( + (select count(*)::int from pgflow.flows where flow_slug = 'test_flow'), + 1, + 'Flow should exist before deletion' +); + +-- Test: Delete the flow +select pgflow.delete_flow_and_data('test_flow'); + +-- Verify deletion +select is( + (select count(*)::int from pgflow.flows where flow_slug = 'test_flow'), + 0, + 'Flow should be deleted' +); + +select is( + (select count(*)::int from pgflow.steps where flow_slug = 'test_flow'), + 0, + 'Steps should be deleted' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/delete_flow_and_data/deletes_runtime_data.test.sql b/pkgs/core/supabase/tests/delete_flow_and_data/deletes_runtime_data.test.sql new file mode 100644 index 000000000..795ae4e95 --- /dev/null +++ b/pkgs/core/supabase/tests/delete_flow_and_data/deletes_runtime_data.test.sql @@ -0,0 +1,40 @@ +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Setup: Create flow, start it, create runtime data +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'first'); +select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb); + +-- Verify runtime data exists +select is( + (select count(*)::int from pgflow.runs where flow_slug = 'test_flow'), + 1, + 'Run should exist before deletion' +); + +select is( + (select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'), + 1, + 'Step state should exist before deletion' +); + +-- Test: Delete the flow +select pgflow.delete_flow_and_data('test_flow'); + +-- Verify all runtime data deleted +select is( + (select count(*)::int from pgflow.runs where flow_slug = 'test_flow'), + 0, + 'Runs should be deleted' +); + +select is( + (select count(*)::int from pgflow.step_states where flow_slug = 'test_flow'), + 0, + 'Step states should be deleted' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/maintenance/delete_flow_and_data.test.sql b/pkgs/core/supabase/tests/maintenance/delete_flow_and_data.test.sql index 2bbb0d8dd..0363d1a4c 100644 --- a/pkgs/core/supabase/tests/maintenance/delete_flow_and_data.test.sql +++ b/pkgs/core/supabase/tests/maintenance/delete_flow_and_data.test.sql @@ -2,8 +2,7 @@ begin; select plan(9); select pgflow_tests.reset_db(); --- Load the delete_flow_and_data function -\i _shared/delete_flow_and_data.sql.raw +-- Note: delete_flow_and_data is now part of core schema (no longer needs to be loaded) -- Create test flow with steps and dependencies select pgflow.create_flow('test_flow_to_delete', max_attempts => 0);