Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkgs/core/schemas/0100_function_ensure_flow_compiled.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ volatile
set search_path to ''
as $$
DECLARE
v_lock_key int;
v_flow_exists boolean;
v_db_shape jsonb;
v_differences text[];
BEGIN
-- Generate lock key from flow_slug (deterministic hash)
v_lock_key := hashtext(p_flow_slug);

-- Acquire transaction-level advisory lock
-- Serializes concurrent compilation attempts for same flow
PERFORM pg_advisory_xact_lock(1, v_lock_key);

-- 1. Check if flow exists
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
INTO v_flow_exists;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- Modify "ensure_flow_compiled" function
--
-- Idempotently reconciles the stored flow definition with the shape supplied
-- by the caller, serializing concurrent attempts via a transaction-scoped
-- advisory lock. Returns jsonb: {status, differences} where status is one of
--   'compiled'   - flow did not exist and was created from p_shape
--   'verified'   - flow exists and its shape matches p_shape
--   'recompiled' - shapes differed; flow was dropped and recreated (dev mode)
--   'mismatch'   - shapes differed; nothing changed (production mode)
--
-- Parameters:
--   p_flow_slug - unique identifier of the flow; also seeds the lock key
--   p_shape     - desired flow shape (steps, dependencies, options) as jsonb
--   p_mode      - 'development' recompiles on mismatch; anything else behaves
--                 as 'production'. NOTE(review): unrecognized modes silently
--                 fall through to the production branch - confirm whether an
--                 invalid p_mode should raise instead.
CREATE OR REPLACE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
-- Advisory-lock key derived from the slug (int4 via hashtext)
v_lock_key int;
v_flow_exists boolean;
-- Shape currently persisted in pgflow.flows/steps, as reported by _get_flow_shape
v_db_shape jsonb;
-- Human-readable shape differences; NULL/empty when shapes match
v_differences text[];
BEGIN
-- Generate lock key from flow_slug (deterministic hash)
-- A hashtext collision between two slugs only causes extra serialization,
-- never incorrect results, since the lock merely orders transactions.
v_lock_key := hashtext(p_flow_slug);

-- Acquire transaction-level advisory lock
-- Serializes concurrent compilation attempts for same flow
-- (namespace 1 reserved for flow compilation; released at transaction end)
PERFORM pg_advisory_xact_lock(1, v_lock_key);

-- 1. Check if flow exists
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
INTO v_flow_exists;

-- 2. If flow missing: compile (both modes)
-- Only one concurrent caller reaches this branch thanks to the lock above;
-- the rest observe the freshly created flow and return 'verified'.
IF NOT v_flow_exists THEN
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
END IF;

-- 3. Get current shape from DB
v_db_shape := pgflow._get_flow_shape(p_flow_slug);

-- 4. Compare shapes
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);

-- 5. If shapes match: return verified
-- array_length returns NULL for an empty (zero-length) array, so this
-- IS NULL check means "no differences found".
IF array_length(v_differences, 1) IS NULL THEN
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
END IF;

-- 6. Shapes differ - handle by mode
IF p_mode = 'development' THEN
-- Recompile in dev mode: full deletion + fresh compile
-- NOTE(review): delete_flow_and_data drops run data as well as the
-- definition, per its name - acceptable in development only.
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
ELSE
-- Fail in production mode
-- The mismatch is reported to the caller; no exception is raised here.
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
END IF;
END;
$$;
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
h1:K23lj19qKEUldXvUo2XV8Dg1GIY994aZgpvg8XS0TRE=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -15,3 +15,4 @@ h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=
20251201105311_pgflow_temp_advisory_lock_for_compilation.sql h1:OmRtiaPYjPuq9P87Px2PH06gdKhHZ0Ro6GfjjS0G+Rs=
65 changes: 23 additions & 42 deletions pkgs/edge-worker/deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { delay } from '@std/async';
import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts';
import { createTestPlatformAdapter } from '../_helpers.ts';
import type { postgres } from '../../sql.ts';
import postgresLib from 'postgres';
import { integrationConfig } from '../../config.ts';

// Define a minimal test flow
const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilation_flow' })
Expand Down Expand Up @@ -245,3 +247,63 @@ Deno.test(
}
})
);

Deno.test(
  'handles concurrent compilation with advisory locks (stress test)',
  withPgNoTransaction(async (sql) => {
    await sql`select pgflow_tests.reset_db();`;

    // 50 independent connections so the calls truly race in the database.
    const CONCURRENT = 50; // 50 separate connections
    const flowSlug = `concurrent_test_${Date.now()}`;
    const shape = {
      steps: [{ slug: 'step1', stepType: 'single', dependencies: [] }],
    };

    // Open one dedicated connection per caller (critical for true concurrency).
    const connections = await Promise.all(
      Array.from({ length: CONCURRENT }, () =>
        postgresLib(integrationConfig.dbUrl, { prepare: false })
      )
    );

    try {
      // Launch every compilation at once, one per connection.
      // Note: Must use conn.json() for proper jsonb parameter passing.
      const pending = connections.map(
        (conn) =>
          conn`SELECT pgflow.ensure_flow_compiled(
          ${flowSlug},
          ${conn.json(shape)},
          'production'
        ) as result`
      );
      const results = await Promise.all(pending);

      // Tally outcomes by status.
      let compiledCount = 0;
      let verifiedCount = 0;
      for (const rows of results) {
        const status = rows[0].result.status;
        if (status === 'compiled') compiledCount += 1;
        else if (status === 'verified') verifiedCount += 1;
      }

      // The advisory lock must let exactly one caller compile; all others verify.
      assertEquals(compiledCount, 1, 'Exactly 1 should compile');
      assertEquals(verifiedCount, CONCURRENT - 1, 'Rest should verify');

      // The database must hold exactly one flow row and one step row.
      const [flowCount] = await sql`
        SELECT COUNT(*)::int as count FROM pgflow.flows
        WHERE flow_slug = ${flowSlug}
      `;
      const [stepCount] = await sql`
        SELECT COUNT(*)::int as count FROM pgflow.steps
        WHERE flow_slug = ${flowSlug}
      `;

      assertEquals(flowCount.count, 1, 'Exactly 1 flow should exist');
      assertEquals(stepCount.count, 1, 'Exactly 1 step should exist');
    } finally {
      // Cleanup
      await sql`SELECT pgflow.delete_flow_and_data(${flowSlug})`.catch(() => {});
      await Promise.all(connections.map((c) => c.end()));
    }
  })
);
Loading