Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 167 additions & 3 deletions pkgs/edge-worker/src/control-plane/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { AnyFlow } from '@pgflow/dsl';
import type { AnyFlow, FlowShape } from '@pgflow/dsl';
import { compileFlow } from '@pgflow/dsl';

/**
Expand All @@ -17,6 +17,40 @@ export interface ErrorResponse {
message: string;
}

/**
* Response type for the /flows/:slug/ensure-compiled endpoint
*/
export interface EnsureCompiledResponse {
status: 'compiled' | 'verified' | 'recompiled' | 'mismatch';
differences: string[];
}

/**
* Request body for the /flows/:slug/ensure-compiled endpoint
*/
export interface EnsureCompiledRequest {
shape: FlowShape;
mode: 'development' | 'production';
}

/**
* SQL function interface for database operations
* Compatible with the postgres library's tagged template interface
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
// deno-lint-ignore no-explicit-any
export type SqlFunction = (strings: TemplateStringsArray, ...values: any[]) => Promise<any[]>;

/**
* Options for configuring the ControlPlane handler
*/
export interface ControlPlaneOptions {
/** SQL function for database operations (required for ensure-compiled endpoint) */
sql?: SqlFunction;
/** Secret key for authentication (required for ensure-compiled endpoint) */
secretKey?: string;
}

/**
* Input type for flow registration - accepts array or object (for namespace imports)
*/
Expand Down Expand Up @@ -54,13 +88,17 @@ function buildFlowRegistry(flows: AnyFlow[]): Map<string, AnyFlow> {
/**
* Creates a request handler for the ControlPlane HTTP API
* @param flowsInput Array or object of flow definitions to register
* @param options Optional configuration for database and authentication
* @returns Request handler function
*/
export function createControlPlaneHandler(flowsInput: FlowInput) {
export function createControlPlaneHandler(
flowsInput: FlowInput,
options?: ControlPlaneOptions
) {
const flows = normalizeFlowInput(flowsInput);
const registry = buildFlowRegistry(flows);

return (req: Request): Response => {
return (req: Request): Response | Promise<Response> => {
const url = new URL(req.url);

// Supabase Edge Functions always include function name as first segment
Expand All @@ -75,6 +113,15 @@ export function createControlPlaneHandler(flowsInput: FlowInput) {
return handleGetFlow(registry, slug);
}

// Handle POST /flows/:slug/ensure-compiled
const ensureCompiledMatch = pathname.match(
/^\/flows\/([a-zA-Z0-9_]+)\/ensure-compiled$/
);
if (ensureCompiledMatch && req.method === 'POST') {
const slug = ensureCompiledMatch[1];
return handleEnsureCompiled(req, slug, options);
}

// 404 for unknown routes
return jsonResponse(
{
Expand Down Expand Up @@ -146,3 +193,120 @@ function jsonResponse(data: unknown, status: number): Response {
},
});
}

/**
* Verifies authentication using apikey header
*/
function verifyAuth(request: Request, secretKey: string | undefined): boolean {
if (!secretKey) return false;
const apikey = request.headers.get('apikey');
return apikey === secretKey;
}

/**
* Validates the ensure-compiled request body
*/
function validateEnsureCompiledBody(
body: unknown
): { valid: true; data: EnsureCompiledRequest } | { valid: false; error: string } {
if (!body || typeof body !== 'object') {
return { valid: false, error: 'Request body must be an object' };
}

const { shape, mode } = body as Record<string, unknown>;

if (!shape || typeof shape !== 'object') {
return { valid: false, error: 'Missing or invalid shape in request body' };
}

if (mode !== 'development' && mode !== 'production') {
return {
valid: false,
error: "Invalid mode: must be 'development' or 'production'",
};
}

return { valid: true, data: { shape: shape as FlowShape, mode } };
}

/**
* Handles POST /flows/:slug/ensure-compiled requests
*/
async function handleEnsureCompiled(
request: Request,
flowSlug: string,
options?: ControlPlaneOptions
): Promise<Response> {
// Check if SQL is configured
if (!options?.sql) {
return jsonResponse(
{
error: 'Not Found',
message: 'ensure-compiled endpoint requires SQL configuration',
},
404
);
}

// Verify authentication
if (!verifyAuth(request, options.secretKey)) {
return jsonResponse(
{
error: 'Unauthorized',
message: 'Invalid or missing apikey header',
},
401
);
}

// Parse and validate request body
let body: unknown;
try {
body = await request.json();
} catch {
return jsonResponse(
{
error: 'Bad Request',
message: 'Invalid JSON in request body',
},
400
);
}

const validation = validateEnsureCompiledBody(body);
if (!validation.valid) {
return jsonResponse(
{
error: 'Bad Request',
message: validation.error,
},
400
);
}

const { shape, mode } = validation.data;

// Call SQL function
try {
const [result] = await options.sql`
SELECT pgflow.ensure_flow_compiled(
${flowSlug},
${JSON.stringify(shape)}::jsonb,
${mode}
) as result
`;

const response = result.result as EnsureCompiledResponse;

return jsonResponse(response, response.status === 'mismatch' ? 409 : 200);
} catch (error) {
console.error('Error calling ensure_flow_compiled:', error);
return jsonResponse(
{
error: 'Database Error',
message: error instanceof Error ? error.message : 'Unknown error',
},
500
);
}
}
Loading
Loading