Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 2 additions & 166 deletions pkgs/edge-worker/src/control-plane/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { AnyFlow, FlowShape } from '@pgflow/dsl';
import type { AnyFlow } from '@pgflow/dsl';
import { compileFlow } from '@pgflow/dsl';
import { isLocalSupabase } from '../shared/localDetection.ts';

/**
* Response type for the /flows/:slug endpoint
Expand All @@ -18,38 +17,6 @@ export interface ErrorResponse {
message: string;
}

/**
* Response type for the /flows/:slug/ensure-compiled endpoint
*/
export interface EnsureCompiledResponse {
status: 'compiled' | 'verified' | 'recompiled' | 'mismatch';
differences: string[];
mode: 'development' | 'production';
}

/**
* Request body for the /flows/:slug/ensure-compiled endpoint
*/
export interface EnsureCompiledRequest {
shape: FlowShape;
}

/**
* SQL function interface for database operations
* Compatible with the postgres library's tagged template interface
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
// deno-lint-ignore no-explicit-any
export type SqlFunction = (strings: TemplateStringsArray, ...values: any[]) => Promise<any[]>;

/**
* Options for configuring the ControlPlane handler
*/
export interface ControlPlaneOptions {
/** SQL function for database operations (required for ensure-compiled endpoint) */
sql?: SqlFunction;
}

/**
* Input type for flow registration - accepts array or object (for namespace imports)
*/
Expand Down Expand Up @@ -87,13 +54,9 @@ function buildFlowRegistry(flows: AnyFlow[]): Map<string, AnyFlow> {
/**
* Creates a request handler for the ControlPlane HTTP API
* @param flowsInput Array or object of flow definitions to register
* @param options Optional configuration for database and authentication
* @returns Request handler function
*/
export function createControlPlaneHandler(
flowsInput: FlowInput,
options?: ControlPlaneOptions
) {
export function createControlPlaneHandler(flowsInput: FlowInput) {
const flows = normalizeFlowInput(flowsInput);
const registry = buildFlowRegistry(flows);

Expand All @@ -112,15 +75,6 @@ export function createControlPlaneHandler(
return handleGetFlow(registry, slug);
}

// Handle POST /flows/:slug/ensure-compiled
const ensureCompiledMatch = pathname.match(
/^\/flows\/([a-zA-Z0-9_]+)\/ensure-compiled$/
);
if (ensureCompiledMatch && req.method === 'POST') {
const slug = ensureCompiledMatch[1];
return handleEnsureCompiled(req, slug, options);
}

// 404 for unknown routes
return jsonResponse(
{
Expand Down Expand Up @@ -192,121 +146,3 @@ function jsonResponse(data: unknown, status: number): Response {
},
});
}

/**
* Verifies authentication using apikey header against SUPABASE_SERVICE_ROLE_KEY env var
*/
function verifyAuth(request: Request): boolean {
const serviceRoleKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY');
if (!serviceRoleKey) return false;
const apikey = request.headers.get('apikey');
return apikey === serviceRoleKey;
}

/**
* Validates the ensure-compiled request body
*/
function validateEnsureCompiledBody(
body: unknown
): { valid: true; data: EnsureCompiledRequest } | { valid: false; error: string } {
if (!body || typeof body !== 'object') {
return { valid: false, error: 'Request body must be an object' };
}

const { shape } = body as Record<string, unknown>;

if (!shape || typeof shape !== 'object') {
return { valid: false, error: 'Missing or invalid shape in request body' };
}

return { valid: true, data: { shape: shape as FlowShape } };
}

/**
* Handles POST /flows/:slug/ensure-compiled requests
*/
async function handleEnsureCompiled(
request: Request,
flowSlug: string,
options?: ControlPlaneOptions
): Promise<Response> {
// Check if SQL is configured
if (!options?.sql) {
return jsonResponse(
{
error: 'Not Found',
message: 'ensure-compiled endpoint requires SQL configuration',
},
404
);
}

// Verify authentication
if (!verifyAuth(request)) {
return jsonResponse(
{
error: 'Unauthorized',
message: 'Invalid or missing apikey header',
},
401
);
}

// Parse and validate request body
let body: unknown;
try {
body = await request.json();
} catch {
return jsonResponse(
{
error: 'Bad Request',
message: 'Invalid JSON in request body',
},
400
);
}

const validation = validateEnsureCompiledBody(body);
if (!validation.valid) {
return jsonResponse(
{
error: 'Bad Request',
message: validation.error,
},
400
);
}

const { shape } = validation.data;

// Auto-detect mode based on environment
const mode = isLocalSupabase() ? 'development' : 'production';

// Call SQL function
try {
const [result] = await options.sql`
SELECT pgflow.ensure_flow_compiled(
${flowSlug},
${JSON.stringify(shape)}::jsonb,
${mode}
) as result
`;

// Include detected mode in response for transparency
const response: EnsureCompiledResponse = {
...result.result,
mode,
};

return jsonResponse(response, response.status === 'mismatch' ? 409 : 200);
} catch (error) {
console.error('Error calling ensure_flow_compiled:', error);
return jsonResponse(
{
error: 'Database Error',
message: error instanceof Error ? error.message : 'Unknown error',
},
500
);
}
}
Loading
Loading