diff --git a/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql index cf62ae26b..6aaf49a79 100644 --- a/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql +++ b/pkgs/core/schemas/0100_function_ensure_flow_compiled.sql @@ -12,10 +12,18 @@ volatile set search_path to '' as $$ DECLARE + v_lock_key int; v_flow_exists boolean; v_db_shape jsonb; v_differences text[]; BEGIN + -- Generate lock key from flow_slug (deterministic hash) + v_lock_key := hashtext(p_flow_slug); + + -- Acquire transaction-level advisory lock + -- Serializes concurrent compilation attempts for same flow + PERFORM pg_advisory_xact_lock(1, v_lock_key); + -- 1. Check if flow exists SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) INTO v_flow_exists; diff --git a/pkgs/core/supabase/migrations/20251201105311_pgflow_temp_advisory_lock_for_compilation.sql b/pkgs/core/supabase/migrations/20251201105311_pgflow_temp_advisory_lock_for_compilation.sql new file mode 100644 index 000000000..3b8c30c10 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251201105311_pgflow_temp_advisory_lock_for_compilation.sql @@ -0,0 +1,48 @@ +-- Modify "ensure_flow_compiled" function +CREATE OR REPLACE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_lock_key int; + v_flow_exists boolean; + v_db_shape jsonb; + v_differences text[]; +BEGIN + -- Generate lock key from flow_slug (deterministic hash) + v_lock_key := hashtext(p_flow_slug); + + -- Acquire transaction-level advisory lock + -- Serializes concurrent compilation attempts for same flow + PERFORM pg_advisory_xact_lock(1, v_lock_key); + + -- 1. Check if flow exists + SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug) + INTO v_flow_exists; + + -- 2. If flow missing: compile (both modes) + IF NOT v_flow_exists THEN + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb); + END IF; + + -- 3. Get current shape from DB + v_db_shape := pgflow._get_flow_shape(p_flow_slug); + + -- 4. Compare shapes + v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape); + + -- 5. If shapes match: return verified + IF array_length(v_differences, 1) IS NULL THEN + RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb); + END IF; + + -- 6. Shapes differ - handle by mode + IF p_mode = 'development' THEN + -- Recompile in dev mode: full deletion + fresh compile + PERFORM pgflow.delete_flow_and_data(p_flow_slug); + PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape); + RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences)); + ELSE + -- Fail in production mode + RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences)); + END IF; +END; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index aa9bbe923..8aee34675 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY= +h1:K23lj19qKEUldXvUo2XV8Dg1GIY994aZgpvg8XS0TRE= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -15,3 +15,4 @@ h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY= 20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y= 20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko= 20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0= +20251201105311_pgflow_temp_advisory_lock_for_compilation.sql h1:OmRtiaPYjPuq9P87Px2PH06gdKhHZ0Ro6GfjjS0G+Rs= diff --git a/pkgs/edge-worker/deno.lock b/pkgs/edge-worker/deno.lock index d7f54bea1..01a1b0bf4 100644 --- a/pkgs/edge-worker/deno.lock +++ b/pkgs/edge-worker/deno.lock @@ -2,10 +2,8 @@ "version": "4", "specifiers": { "jsr:@deno-library/progress@*": "1.5.1", - "jsr:@henrygd/queue@^1.0.7": "1.0.7", - "jsr:@std/assert@*": "0.224.0", + "jsr:@henrygd/queue@^1.0.7": "1.2.0", "jsr:@std/assert@0.224": "0.224.0", - "jsr:@std/async@*": "0.224.2", "jsr:@std/async@0.224": "0.224.2", "jsr:@std/crypto@*": "1.0.5", "jsr:@std/fmt@0.224": "0.224.0", @@ -13,7 +11,7 @@ "jsr:@std/internal@0.224": "0.224.0", "jsr:@std/io@0.225": "0.225.0", "jsr:@std/io@0.225.0": "0.225.0", - "npm:@supabase/supabase-js@^2.39.0": "2.74.0", + "npm:@supabase/supabase-js@^2.39.0": "2.84.0", "npm:postgres@3.4.5": "3.4.5" }, "jsr": { @@ -24,8 +22,8 @@ "jsr:@std/io@0.225.0" ] }, - "@henrygd/queue@1.0.7": { - "integrity": "98cade132744bb420957c5413393f76eb8ba7261826f026c8a89a562b8fa2961" + "@henrygd/queue@1.2.0": { + "integrity": "3e6c968f9bd56e3e7dfbd9952d0b7173a4ecdb7d4df041b3a0f4dfc896efeede" }, "@std/assert@0.224.0": { "integrity": "8643233ec7aec38a940a8264a6e3eed9bfa44e7a71cc6b3c8874213ff401967f", @@ -57,51 +55,44 @@ } }, "npm": { - "@supabase/auth-js@2.74.0": { - "integrity": "sha512-EJYDxYhBCOS40VJvfQ5zSjo8Ku7JbTICLTcmXt4xHMQZt4IumpRfHg11exXI9uZ6G7fhsQlNgbzDhFN4Ni9NnA==", + "@supabase/auth-js@2.84.0": { + "integrity": "sha512-J6XKbqqg1HQPMfYkAT9BrC8anPpAiifl7qoVLsYhQq5B/dnu/lxab1pabnxtJEsvYG5rwI5HEVEGXMjoQ6Wz2Q==", "dependencies": [ - "@supabase/node-fetch" + "tslib" ] }, - "@supabase/functions-js@2.74.0": { - "integrity": "sha512-VqWYa981t7xtIFVf7LRb9meklHckbH/tqwaML5P3LgvlaZHpoSPjMCNLcquuLYiJLxnh2rio7IxLh+VlvRvSWw==", + "@supabase/functions-js@2.84.0": { + "integrity": "sha512-2oY5QBV4py/s64zMlhPEz+4RTdlwxzmfhM1k2xftD2v1DruRZKfoe7Yn9DCz1VondxX8evcvpc2udEIGzHI+VA==", "dependencies": [ - "@supabase/node-fetch" + "tslib" ] }, - "@supabase/node-fetch@2.6.15": { - "integrity": "sha512-1ibVeYUacxWYi9i0cf5efil6adJ9WRyZBLivgjs+AUpewx1F3xPi7gLgaASI2SmIQxPoCEjAsLAzKPgMJVgOUQ==", + "@supabase/postgrest-js@2.84.0": { + "integrity": "sha512-oplc/3jfJeVW4F0J8wqywHkjIZvOVHtqzF0RESijepDAv5Dn/LThlGW1ftysoP4+PXVIrnghAbzPHo88fNomPQ==", "dependencies": [ - "whatwg-url" + "tslib" ] }, - "@supabase/postgrest-js@2.74.0": { - "integrity": "sha512-9Ypa2eS0Ib/YQClE+BhDSjx7OKjYEF6LAGjTB8X4HucdboGEwR0LZKctNfw6V0PPIAVjjzZxIlNBXGv0ypIkHw==", + "@supabase/realtime-js@2.84.0": { + "integrity": "sha512-ThqjxiCwWiZAroHnYPmnNl6tZk6jxGcG2a7Hp/3kcolPcMj89kWjUTA3cHmhdIWYsP84fHp8MAQjYWMLf7HEUg==", "dependencies": [ - "@supabase/node-fetch" - ] - }, - "@supabase/realtime-js@2.74.0": { - "integrity": "sha512-K5VqpA4/7RO1u1nyD5ICFKzWKu58bIDcPxHY0aFA7MyWkFd0pzi/XYXeoSsAifnD9p72gPIpgxVXCQZKJg1ktQ==", - "dependencies": [ - "@supabase/node-fetch", "@types/phoenix", "@types/ws", + "tslib", "ws" ] }, - "@supabase/storage-js@2.74.0": { - "integrity": "sha512-o0cTQdMqHh4ERDLtjUp1/KGPbQoNwKRxUh6f8+KQyjC5DSmiw/r+jgFe/WHh067aW+WU8nA9Ytw9ag7OhzxEkQ==", + "@supabase/storage-js@2.84.0": { + "integrity": "sha512-vXvAJ1euCuhryOhC6j60dG8ky+lk0V06ubNo+CbhuoUv+sl39PyY0lc+k+qpQhTk/VcI6SiM0OECLN83+nyJ5A==", "dependencies": [ - "@supabase/node-fetch" + "tslib" ] }, - "@supabase/supabase-js@2.74.0": { - "integrity": "sha512-IEMM/V6gKdP+N/X31KDIczVzghDpiPWFGLNjS8Rus71KvV6y6ueLrrE/JGCHDrU+9pq5copF3iCa0YQh+9Lq9Q==", + "@supabase/supabase-js@2.84.0": { + "integrity": "sha512-byMqYBvb91sx2jcZsdp0qLpmd4Dioe80e4OU/UexXftCkpTcgrkoENXHf5dO8FCSai8SgNeq16BKg10QiDI6xg==", "dependencies": [ "@supabase/auth-js", "@supabase/functions-js", - "@supabase/node-fetch", "@supabase/postgrest-js", "@supabase/realtime-js", "@supabase/storage-js" @@ -125,22 +116,12 @@ "postgres@3.4.5": { "integrity": "sha512-cDWgoah1Gez9rN3H4165peY9qfpEo+SA61oQv65O3cRUE1pOEoJWwddwcqKE8XZYjbblOJlYDlLV4h67HrEVDg==" }, - "tr46@0.0.3": { - "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" + "tslib@2.8.1": { + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==" }, "undici-types@6.19.8": { "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==" }, - "webidl-conversions@3.0.1": { - "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==" - }, - "whatwg-url@5.0.0": { - "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==", - "dependencies": [ - "tr46", - "webidl-conversions" - ] - }, "ws@8.18.3": { "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==" } diff --git a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts index 54efe0a99..1858f3246 100644 --- a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts @@ -5,6 +5,8 @@ import { delay } from '@std/async'; import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts'; import { createTestPlatformAdapter } from '../_helpers.ts'; import type { postgres } from '../../sql.ts'; +import postgresLib from 'postgres'; +import { integrationConfig } from '../../config.ts'; // Define a minimal test flow const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilation_flow' }) @@ -245,3 +247,63 @@ Deno.test( } }) ); + +Deno.test( + 'handles concurrent compilation with advisory locks (stress test)', + withPgNoTransaction(async (sql) => { + await sql`select pgflow_tests.reset_db();`; + + const CONCURRENT = 50; // 50 separate connections + const flowSlug = `concurrent_test_${Date.now()}`; + const shape = { + steps: [{ slug: 'step1', stepType: 'single', dependencies: [] }], + }; + + // Create N SEPARATE connections (critical for true concurrency) + const connections = await Promise.all( + Array(CONCURRENT) + .fill(null) + .map(() => postgresLib(integrationConfig.dbUrl, { prepare: false })) + ); + + try { + // Fire all compilations simultaneously on separate connections + // Note: Must use conn.json() for proper jsonb parameter passing + const results = await Promise.all( + connections.map((conn) => + conn`SELECT pgflow.ensure_flow_compiled( + ${flowSlug}, + ${conn.json(shape)}, + 'production' + ) as result` + ) + ); + + // Parse results + const statuses = results.map((r) => r[0].result.status); + const compiled = statuses.filter((s) => s === 'compiled'); + const verified = statuses.filter((s) => s === 'verified'); + + // Assert: exactly 1 compiled, rest verified + assertEquals(compiled.length, 1, 'Exactly 1 should compile'); + assertEquals(verified.length, CONCURRENT - 1, 'Rest should verify'); + + // Assert: exactly 1 flow and 1 step in DB + const [flowCount] = await sql` + SELECT COUNT(*)::int as count FROM pgflow.flows + WHERE flow_slug = ${flowSlug} + `; + const [stepCount] = await sql` + SELECT COUNT(*)::int as count FROM pgflow.steps + WHERE flow_slug = ${flowSlug} + `; + + assertEquals(flowCount.count, 1, 'Exactly 1 flow should exist'); + assertEquals(stepCount.count, 1, 'Exactly 1 step should exist'); + } finally { + // Cleanup + await sql`SELECT pgflow.delete_flow_and_data(${flowSlug})`.catch(() => {}); + await Promise.all(connections.map((c) => c.end())); + } + }) +);