diff --git a/pkgs/core/schemas/0057_function_track_worker_function.sql b/pkgs/core/schemas/0057_function_track_worker_function.sql new file mode 100644 index 000000000..286dfb390 --- /dev/null +++ b/pkgs/core/schemas/0057_function_track_worker_function.sql @@ -0,0 +1,21 @@ +-- Track Worker Function +-- Registers an edge function for monitoring by ensure_workers() cron + +drop function if exists pgflow.track_worker_function(text); + +create or replace function pgflow.track_worker_function( + function_name text +) returns void +language sql +as $$ + insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at) + values (track_worker_function.function_name, clock_timestamp(), clock_timestamp()) + on conflict (function_name) + do update set + updated_at = clock_timestamp(), + last_invoked_at = clock_timestamp(); +$$; + +comment on function pgflow.track_worker_function(text) is +'Registers an edge function for monitoring. Called by workers on startup. +Sets last_invoked_at to prevent cron from pinging during startup (debounce).'; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 182ef7819..569165a96 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -578,6 +578,10 @@ export type Database = { isSetofReturn: true } } + track_worker_function: { + Args: { function_name: string } + Returns: undefined + } } Enums: { [_ in never]: never diff --git a/pkgs/core/supabase/migrations/20251204164612_pgflow_temp_track_worker_function.sql b/pkgs/core/supabase/migrations/20251204164612_pgflow_temp_track_worker_function.sql new file mode 100644 index 000000000..65f83d666 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251204164612_pgflow_temp_track_worker_function.sql @@ -0,0 +1,12 @@ +-- Create "track_worker_function" function +CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$ +insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at) + values (track_worker_function.function_name, clock_timestamp(), clock_timestamp()) + on conflict (function_name) + do update set + updated_at = clock_timestamp(), + last_invoked_at = clock_timestamp(); +$$; +-- Set comment to function: "track_worker_function" +COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup. +Sets last_invoked_at to prevent cron from pinging during startup (debounce).'; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 2c753e87f..b6b9434ed 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A= +h1:eMTY2CZPFm2BibVAHjtIMhCWIFcmTcqQq9VRXfs1/1A= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -19,3 +19,4 @@ h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A= 20251204115929_pgflow_temp_is_local.sql h1:pjOFO6k8FCmbxp6S7U3fPImsqW81WwdLwq/UZK74BG4= 20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs= 20251204145037_pgflow_temp_worker_functions_schema.sql h1:5DJJEP0jcg7yapTe7t6FX2ypZIj92lGvJ12AX8g5fz4= +20251204164612_pgflow_temp_track_worker_function.sql h1:SC8Z4Un37A2XVaIIvyZJ0eQ/7r0JENSi6RcBx4GaUCE= diff --git a/pkgs/core/supabase/tests/track_worker_function/basic.test.sql b/pkgs/core/supabase/tests/track_worker_function/basic.test.sql new file mode 100644 index 000000000..239074f51 --- /dev/null +++ b/pkgs/core/supabase/tests/track_worker_function/basic.test.sql @@ -0,0 +1,75 @@ +begin; +select plan(9); +select pgflow_tests.reset_db(); + +-- TEST: Inserts new worker function when it does not exist +select pgflow.track_worker_function('my-edge-function'); +select is( + (select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'), + 1::bigint, + 'track_worker_function() inserts new worker function' +); + +-- TEST: New worker function has enabled=true by default +select is( + (select enabled from pgflow.worker_functions where function_name = 'my-edge-function'), + true, + 'New worker function has enabled=true by default' +); + +-- TEST: New worker function has default heartbeat_timeout_seconds +select is( + (select heartbeat_timeout_seconds from pgflow.worker_functions where function_name = 'my-edge-function'), + 6, + 'New worker function has heartbeat_timeout_seconds=6 by default' +); + +-- TEST: New worker function has last_invoked_at set (debounce protection) +select isnt( + (select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'), + null::timestamptz, + 'New worker function has last_invoked_at set on insert (debounce protection)' +); + +-- TEST: last_invoked_at is set to approximately now +select ok( + (select last_invoked_at >= now() - interval '1 second' + from pgflow.worker_functions + where function_name = 'my-edge-function'), + 'last_invoked_at is set to approximately now on insert' +); + +-- TEST: Upsert updates updated_at on conflict +-- First, get the original updated_at timestamp +select pg_sleep(0.01); -- Small delay to ensure timestamp difference +select pgflow.track_worker_function('my-edge-function'); +select ok( + (select updated_at > created_at from pgflow.worker_functions where function_name = 'my-edge-function'), + 'Upsert updates updated_at timestamp on conflict' +); + +-- TEST: Upsert updates last_invoked_at on conflict +select ok( + (select last_invoked_at >= now() - interval '1 second' + from pgflow.worker_functions + where function_name = 'my-edge-function'), + 'Upsert updates last_invoked_at on conflict (refreshes debounce)' +); + +-- TEST: Upsert does not duplicate rows +select is( + (select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'), + 1::bigint, + 'Upsert does not create duplicate rows' +); + +-- TEST: Can track multiple different functions +select pgflow.track_worker_function('another-function'); +select is( + (select count(*) from pgflow.worker_functions), + 2::bigint, + 'Can track multiple different worker functions' +); + +select finish(); +rollback;