From 6ed062fc899d8d31cb8f6f68b0ff7c8836a1c202 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 30 Nov 2025 11:53:21 +0100 Subject: [PATCH] feat(edge-worker): add ensure-compiled endpoint to ControlPlane --- pkgs/edge-worker/src/control-plane/server.ts | 170 ++++++++- .../tests/unit/control-plane/server.test.ts | 331 +++++++++++++++++- 2 files changed, 485 insertions(+), 16 deletions(-) diff --git a/pkgs/edge-worker/src/control-plane/server.ts b/pkgs/edge-worker/src/control-plane/server.ts index 3121f242b..9a4c10d0e 100644 --- a/pkgs/edge-worker/src/control-plane/server.ts +++ b/pkgs/edge-worker/src/control-plane/server.ts @@ -1,4 +1,4 @@ -import type { AnyFlow } from '@pgflow/dsl'; +import type { AnyFlow, FlowShape } from '@pgflow/dsl'; import { compileFlow } from '@pgflow/dsl'; /** @@ -17,6 +17,40 @@ export interface ErrorResponse { message: string; } +/** + * Response type for the /flows/:slug/ensure-compiled endpoint + */ +export interface EnsureCompiledResponse { + status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; + differences: string[]; +} + +/** + * Request body for the /flows/:slug/ensure-compiled endpoint + */ +export interface EnsureCompiledRequest { + shape: FlowShape; + mode: 'development' | 'production'; +} + +/** + * SQL function interface for database operations + * Compatible with the postgres library's tagged template interface + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +// deno-lint-ignore no-explicit-any +export type SqlFunction = (strings: TemplateStringsArray, ...values: any[]) => Promise; + +/** + * Options for configuring the ControlPlane handler + */ +export interface ControlPlaneOptions { + /** SQL function for database operations (required for ensure-compiled endpoint) */ + sql?: SqlFunction; + /** Secret key for authentication (required for ensure-compiled endpoint) */ + secretKey?: string; +} + /** * Input type for flow registration - accepts array or object (for namespace imports) */ @@ -54,13 +88,17 @@ function buildFlowRegistry(flows: AnyFlow[]): Map { /** * Creates a request handler for the ControlPlane HTTP API * @param flowsInput Array or object of flow definitions to register + * @param options Optional configuration for database and authentication * @returns Request handler function */ -export function createControlPlaneHandler(flowsInput: FlowInput) { +export function createControlPlaneHandler( + flowsInput: FlowInput, + options?: ControlPlaneOptions +) { const flows = normalizeFlowInput(flowsInput); const registry = buildFlowRegistry(flows); - return (req: Request): Response => { + return (req: Request): Response | Promise => { const url = new URL(req.url); // Supabase Edge Functions always include function name as first segment @@ -75,6 +113,15 @@ export function createControlPlaneHandler(flowsInput: FlowInput) { return handleGetFlow(registry, slug); } + // Handle POST /flows/:slug/ensure-compiled + const ensureCompiledMatch = pathname.match( + /^\/flows\/([a-zA-Z0-9_]+)\/ensure-compiled$/ + ); + if (ensureCompiledMatch && req.method === 'POST') { + const slug = ensureCompiledMatch[1]; + return handleEnsureCompiled(req, slug, options); + } + // 404 for unknown routes return jsonResponse( { @@ -146,3 +193,120 @@ function jsonResponse(data: unknown, status: number): Response { }, }); } + +/** + * Verifies authentication using apikey header + */ +function verifyAuth(request: Request, secretKey: string | undefined): boolean { + if (!secretKey) return false; + const apikey = request.headers.get('apikey'); + return apikey === secretKey; +} + +/** + * Validates the ensure-compiled request body + */ +function validateEnsureCompiledBody( + body: unknown +): { valid: true; data: EnsureCompiledRequest } | { valid: false; error: string } { + if (!body || typeof body !== 'object') { + return { valid: false, error: 'Request body must be an object' }; + } + + const { shape, mode } = body as Record; + + if (!shape || typeof shape !== 'object') { + return { valid: false, error: 'Missing or invalid shape in request body' }; + } + + if (mode !== 'development' && mode !== 'production') { + return { + valid: false, + error: "Invalid mode: must be 'development' or 'production'", + }; + } + + return { valid: true, data: { shape: shape as FlowShape, mode } }; +} + +/** + * Handles POST /flows/:slug/ensure-compiled requests + */ +async function handleEnsureCompiled( + request: Request, + flowSlug: string, + options?: ControlPlaneOptions +): Promise { + // Check if SQL is configured + if (!options?.sql) { + return jsonResponse( + { + error: 'Not Found', + message: 'ensure-compiled endpoint requires SQL configuration', + }, + 404 + ); + } + + // Verify authentication + if (!verifyAuth(request, options.secretKey)) { + return jsonResponse( + { + error: 'Unauthorized', + message: 'Invalid or missing apikey header', + }, + 401 + ); + } + + // Parse and validate request body + let body: unknown; + try { + body = await request.json(); + } catch { + return jsonResponse( + { + error: 'Bad Request', + message: 'Invalid JSON in request body', + }, + 400 + ); + } + + const validation = validateEnsureCompiledBody(body); + if (!validation.valid) { + return jsonResponse( + { + error: 'Bad Request', + message: validation.error, + }, + 400 + ); + } + + const { shape, mode } = validation.data; + + // Call SQL function + try { + const [result] = await options.sql` + SELECT pgflow.ensure_flow_compiled( + ${flowSlug}, + ${JSON.stringify(shape)}::jsonb, + ${mode} + ) as result + `; + + const response = result.result as EnsureCompiledResponse; + + return jsonResponse(response, response.status === 'mismatch' ? 409 : 200); + } catch (error) { + console.error('Error calling ensure_flow_compiled:', error); + return jsonResponse( + { + error: 'Database Error', + message: error instanceof Error ? error.message : 'Unknown error', + }, + 500 + ); + } +} diff --git a/pkgs/edge-worker/tests/unit/control-plane/server.test.ts b/pkgs/edge-worker/tests/unit/control-plane/server.test.ts index b61f1089f..44c1c4fa2 100644 --- a/pkgs/edge-worker/tests/unit/control-plane/server.test.ts +++ b/pkgs/edge-worker/tests/unit/control-plane/server.test.ts @@ -1,6 +1,50 @@ import { assertEquals, assertMatch } from '@std/assert'; -import { Flow, compileFlow } from '@pgflow/dsl'; -import { createControlPlaneHandler } from '../../../src/control-plane/server.ts'; +import { Flow, compileFlow, extractFlowShape } from '@pgflow/dsl'; +import type { FlowShape } from '@pgflow/dsl'; +import { + createControlPlaneHandler, + type ControlPlaneOptions, +} from '../../../src/control-plane/server.ts'; + +// Mock SQL function that simulates database responses +function createMockSql(response: { + status: 'compiled' | 'verified' | 'recompiled' | 'mismatch'; + differences: string[]; +}) { + return function mockSql( + _strings: TemplateStringsArray, + ..._values: unknown[] + ) { + // Return array with result object matching SQL query pattern + return Promise.resolve([{ result: response }]); + }; +} + +// Mock SQL that throws an error +function createErrorSql(errorMessage: string) { + return function mockSql() { + return Promise.reject(new Error(errorMessage)); + }; +} + +// Helper to create POST request with body +function createEnsureCompiledRequest( + slug: string, + body: { shape: FlowShape; mode: 'development' | 'production' }, + apikey?: string +): Request { + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (apikey) { + headers['apikey'] = apikey; + } + return new Request(`http://localhost/pgflow/flows/${slug}/ensure-compiled`, { + method: 'POST', + headers, + body: JSON.stringify(body), + }); +} // Test flows covering different DSL features const FlowWithSingleStep = new Flow({ slug: 'flow_single_step' }) @@ -72,7 +116,7 @@ Deno.test('ControlPlane Handler - GET /flows/:slug returns 404 for unknown flow' const handler = createControlPlaneHandler(ALL_TEST_FLOWS); const request = new Request('http://localhost/pgflow/flows/unknown_flow'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); @@ -84,7 +128,7 @@ Deno.test('ControlPlane Handler - returns 404 for invalid routes', async () => { const handler = createControlPlaneHandler(ALL_TEST_FLOWS); const request = new Request('http://localhost/pgflow/invalid/route'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); @@ -98,7 +142,7 @@ Deno.test('ControlPlane Handler - returns 404 for wrong HTTP method', async () = const request = new Request('http://localhost/pgflow/flows/flow_single_step', { method: 'POST', }); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); @@ -113,7 +157,7 @@ ALL_TEST_FLOWS.forEach((flow) => { const expectedSql = compileFlow(flow); const request = new Request(`http://localhost/pgflow/flows/${flow.slug}`); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 200); const data = await response.json(); @@ -136,7 +180,7 @@ Deno.test('ControlPlane Handler - GET /flows/:slug returns 500 on compilation er const handler = createControlPlaneHandler([brokenFlow]); const request = new Request('http://localhost/pgflow/flows/broken_flow'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 500); const data = await response.json(); @@ -155,7 +199,7 @@ Deno.test('ControlPlane Handler - accepts object of flows', async () => { const expectedSql = compileFlow(FlowWithSingleStep); const request = new Request('http://localhost/pgflow/flows/flow_single_step'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 200); const data = await response.json(); @@ -163,7 +207,7 @@ Deno.test('ControlPlane Handler - accepts object of flows', async () => { assertEquals(data.sql, expectedSql); }); -Deno.test('ControlPlane Handler - accepts namespace import style object', () => { +Deno.test('ControlPlane Handler - accepts namespace import style object', async () => { // Simulates: import * as flows from './flows/index.ts' const flowsNamespace = { FlowWithSingleStep, @@ -176,7 +220,7 @@ Deno.test('ControlPlane Handler - accepts namespace import style object', () => // All flows should be accessible for (const flow of [FlowWithSingleStep, FlowWithMultipleSteps, FlowWithParallelSteps]) { const request = new Request(`http://localhost/pgflow/flows/${flow.slug}`); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 200); } }); @@ -201,7 +245,7 @@ Deno.test('ControlPlane Handler - object input returns 404 for unknown flow', as const handler = createControlPlaneHandler({ FlowWithSingleStep }); const request = new Request('http://localhost/pgflow/flows/unknown_flow'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); @@ -212,7 +256,7 @@ Deno.test('ControlPlane Handler - empty object creates handler with no flows', a const handler = createControlPlaneHandler({}); const request = new Request('http://localhost/pgflow/flows/any_flow'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); @@ -223,9 +267,270 @@ Deno.test('ControlPlane Handler - empty array creates handler with no flows', as const handler = createControlPlaneHandler([]); const request = new Request('http://localhost/pgflow/flows/any_flow'); - const response = handler(request); + const response = await handler(request); assertEquals(response.status, 404); const data = await response.json(); assertEquals(data.error, 'Flow Not Found'); }); + +// ============================================================ +// Tests for POST /flows/:slug/ensure-compiled endpoint +// ============================================================ + +const TEST_SECRET_KEY = 'test-secret-key-12345'; + +Deno.test('ensure-compiled - returns 401 without apikey header', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' } + // No apikey + ); + const response = await handler(request); + + assertEquals(response.status, 401); + const data = await response.json(); + assertEquals(data.error, 'Unauthorized'); +}); + +Deno.test('ensure-compiled - returns 401 with wrong apikey', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + 'wrong-api-key' + ); + const response = await handler(request); + + assertEquals(response.status, 401); + const data = await response.json(); + assertEquals(data.error, 'Unauthorized'); +}); + +Deno.test('ensure-compiled - returns 200 with status compiled for new flow', async () => { + const mockSql = createMockSql({ status: 'compiled', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 200); + const data = await response.json(); + assertEquals(data.status, 'compiled'); + assertEquals(data.differences, []); +}); + +Deno.test('ensure-compiled - returns 200 with status verified for matching shape', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 200); + const data = await response.json(); + assertEquals(data.status, 'verified'); +}); + +Deno.test('ensure-compiled - returns 200 with status recompiled in development mode', async () => { + const mockSql = createMockSql({ + status: 'recompiled', + differences: ['Step count differs: 1 vs 2'], + }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'development' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 200); + const data = await response.json(); + assertEquals(data.status, 'recompiled'); + assertEquals(data.differences, ['Step count differs: 1 vs 2']); +}); + +Deno.test('ensure-compiled - returns 409 on shape mismatch in production mode', async () => { + const mockSql = createMockSql({ + status: 'mismatch', + differences: ['Step count differs: 1 vs 2'], + }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 409); + const data = await response.json(); + assertEquals(data.status, 'mismatch'); + assertEquals(data.differences, ['Step count differs: 1 vs 2']); +}); + +Deno.test('ensure-compiled - returns 500 on database error', async () => { + const mockSql = createErrorSql('Connection failed'); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 500); + const data = await response.json(); + assertEquals(data.error, 'Database Error'); + assertMatch(data.message, /Connection failed/); +}); + +Deno.test('ensure-compiled - returns 404 when SQL not configured', async () => { + // No sql option provided + const handler = createControlPlaneHandler(ALL_TEST_FLOWS); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = createEnsureCompiledRequest( + 'flow_single_step', + { shape, mode: 'production' }, + TEST_SECRET_KEY + ); + const response = await handler(request); + + assertEquals(response.status, 404); + const data = await response.json(); + assertEquals(data.error, 'Not Found'); +}); + +Deno.test('ensure-compiled - returns 400 for invalid JSON body', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const request = new Request( + 'http://localhost/pgflow/flows/flow_single_step/ensure-compiled', + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + apikey: TEST_SECRET_KEY, + }, + body: 'invalid json', + } + ); + const response = await handler(request); + + assertEquals(response.status, 400); + const data = await response.json(); + assertEquals(data.error, 'Bad Request'); +}); + +Deno.test('ensure-compiled - returns 400 for missing shape in body', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const request = new Request( + 'http://localhost/pgflow/flows/flow_single_step/ensure-compiled', + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + apikey: TEST_SECRET_KEY, + }, + body: JSON.stringify({ mode: 'production' }), // missing shape + } + ); + const response = await handler(request); + + assertEquals(response.status, 400); + const data = await response.json(); + assertEquals(data.error, 'Bad Request'); + assertMatch(data.message, /shape/); +}); + +Deno.test('ensure-compiled - returns 400 for invalid mode', async () => { + const mockSql = createMockSql({ status: 'verified', differences: [] }); + const options: ControlPlaneOptions = { + sql: mockSql, + secretKey: TEST_SECRET_KEY, + }; + const handler = createControlPlaneHandler(ALL_TEST_FLOWS, options); + + const shape = extractFlowShape(FlowWithSingleStep); + const request = new Request( + 'http://localhost/pgflow/flows/flow_single_step/ensure-compiled', + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + apikey: TEST_SECRET_KEY, + }, + body: JSON.stringify({ shape, mode: 'invalid' }), + } + ); + const response = await handler(request); + + assertEquals(response.status, 400); + const data = await response.json(); + assertEquals(data.error, 'Bad Request'); + assertMatch(data.message, /mode/); +});