Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkgs/core/schemas/0057_function_track_worker_function.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Track Worker Function
-- Registers an edge function for monitoring by ensure_workers() cron

drop function if exists pgflow.track_worker_function(text);

create or replace function pgflow.track_worker_function(
function_name text
) returns void
language sql
as $$
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
on conflict (function_name)
do update set
updated_at = clock_timestamp(),
last_invoked_at = clock_timestamp();
$$;

comment on function pgflow.track_worker_function(text) is
'Registers an edge function for monitoring. Called by workers on startup.
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';
4 changes: 4 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ export type Database = {
isSetofReturn: true
}
}
track_worker_function: {
Args: { function_name: string }
Returns: undefined
}
}
Enums: {
[_ in never]: never
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Create "track_worker_function" function
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
on conflict (function_name)
do update set
updated_at = clock_timestamp(),
last_invoked_at = clock_timestamp();
$$;
-- Set comment to function: "track_worker_function"
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
h1:eMTY2CZPFm2BibVAHjtIMhCWIFcmTcqQq9VRXfs1/1A=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -19,3 +19,4 @@ h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
20251204115929_pgflow_temp_is_local.sql h1:pjOFO6k8FCmbxp6S7U3fPImsqW81WwdLwq/UZK74BG4=
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs=
20251204145037_pgflow_temp_worker_functions_schema.sql h1:5DJJEP0jcg7yapTe7t6FX2ypZIj92lGvJ12AX8g5fz4=
20251204164612_pgflow_temp_track_worker_function.sql h1:SC8Z4Un37A2XVaIIvyZJ0eQ/7r0JENSi6RcBx4GaUCE=
75 changes: 75 additions & 0 deletions pkgs/core/supabase/tests/track_worker_function/basic.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
begin;
select plan(9);
select pgflow_tests.reset_db();

-- TEST: Inserts new worker function when it does not exist
select pgflow.track_worker_function('my-edge-function');
select is(
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
1::bigint,
'track_worker_function() inserts new worker function'
);

-- TEST: New worker function has enabled=true by default
select is(
(select enabled from pgflow.worker_functions where function_name = 'my-edge-function'),
true,
'New worker function has enabled=true by default'
);

-- TEST: New worker function has default heartbeat_timeout_seconds
select is(
(select heartbeat_timeout_seconds from pgflow.worker_functions where function_name = 'my-edge-function'),
6,
'New worker function has heartbeat_timeout_seconds=6 by default'
);

-- TEST: New worker function has last_invoked_at set (debounce protection)
select isnt(
(select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'),
null::timestamptz,
'New worker function has last_invoked_at set on insert (debounce protection)'
);

-- TEST: last_invoked_at is set to approximately now
select ok(
(select last_invoked_at >= now() - interval '1 second'
from pgflow.worker_functions
where function_name = 'my-edge-function'),
'last_invoked_at is set to approximately now on insert'
);

-- TEST: Upsert updates updated_at on conflict
-- First, get the original updated_at timestamp
select pg_sleep(0.01); -- Small delay to ensure timestamp difference
select pgflow.track_worker_function('my-edge-function');
select ok(
(select updated_at > created_at from pgflow.worker_functions where function_name = 'my-edge-function'),
'Upsert updates updated_at timestamp on conflict'
);

-- TEST: Upsert updates last_invoked_at on conflict
select ok(
(select last_invoked_at >= now() - interval '1 second'
from pgflow.worker_functions
where function_name = 'my-edge-function'),
'Upsert updates last_invoked_at on conflict (refreshes debounce)'
);

-- TEST: Upsert does not duplicate rows
select is(
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
1::bigint,
'Upsert does not create duplicate rows'
);

-- TEST: Can track multiple different functions
select pgflow.track_worker_function('another-function');
select is(
(select count(*) from pgflow.worker_functions),
2::bigint,
'Can track multiple different worker functions'
);

select finish();
rollback;
Loading