Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkgs/cli/__tests__/commands/compile/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('fetchFlowSQL', () => {
status: 404,
json: async () => ({
error: 'Flow Not Found',
message: "Flow 'unknown_flow' not found. Did you add it to flows.ts?",
message: "Flow 'unknown_flow' not found. Did you add it to supabase/functions/pgflow/index.ts?",
}),
};

Expand All @@ -83,7 +83,7 @@ describe('fetchFlowSQL', () => {
).rejects.toThrow("Flow 'unknown_flow' not found");
await expect(
fetchFlowSQL('unknown_flow', 'http://127.0.0.1:50621/functions/v1/pgflow', 'test-publishable-key')
).rejects.toThrow('Add your flow to supabase/functions/pgflow/flows.ts');
).rejects.toThrow('Add your flow to supabase/functions/pgflow/index.ts');
});

it('should handle ECONNREFUSED with startup instructions', async () => {
Expand Down Expand Up @@ -167,7 +167,7 @@ describe('fetchFlowSQL', () => {
).rejects.toThrow("Flow 'unknown_flow' not found");
await expect(
fetchFlowSQL('unknown_flow', 'http://127.0.0.1:50621/functions/v1/pgflow', 'test-publishable-key')
).rejects.toThrow('Did you add it to flows.ts');
).rejects.toThrow('Did you add it to supabase/functions/pgflow/index.ts');
});

it('should construct correct URL with flow slug', async () => {
Expand Down
28 changes: 6 additions & 22 deletions pkgs/cli/__tests__/commands/install/create-edge-function.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('createEdgeFunction', () => {
fs.rmSync(tempDir, { recursive: true, force: true });
});

it('should create all three files when none exist', async () => {
it('should create both files when none exist', async () => {
const result = await createEdgeFunction({
supabasePath,
autoConfirm: true,
Expand All @@ -36,23 +36,16 @@ describe('createEdgeFunction', () => {

// Verify all files exist
const indexPath = path.join(pgflowFunctionDir, 'index.ts');
const flowsPath = path.join(pgflowFunctionDir, 'flows.ts');
const denoJsonPath = path.join(pgflowFunctionDir, 'deno.json');

expect(fs.existsSync(indexPath)).toBe(true);
expect(fs.existsSync(flowsPath)).toBe(true);
expect(fs.existsSync(denoJsonPath)).toBe(true);

// Verify index.ts content
// Verify index.ts content (inline flow registration, no flows.ts)
const indexContent = fs.readFileSync(indexPath, 'utf8');
expect(indexContent).toContain("import { ControlPlane } from '@pgflow/edge-worker'");
expect(indexContent).toContain("import { flows } from './flows.ts'");
expect(indexContent).toContain('ControlPlane.serve(flows)');

// Verify flows.ts content
const flowsContent = fs.readFileSync(flowsPath, 'utf8');
expect(flowsContent).toContain('export const flows = [');
expect(flowsContent).toContain('// Import your flows here');
expect(indexContent).toContain('ControlPlane.serve([');
expect(indexContent).toContain('// Import your flows here');

// Verify deno.json content
const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
Expand All @@ -66,11 +59,9 @@ describe('createEdgeFunction', () => {
fs.mkdirSync(pgflowFunctionDir, { recursive: true });

const indexPath = path.join(pgflowFunctionDir, 'index.ts');
const flowsPath = path.join(pgflowFunctionDir, 'flows.ts');
const denoJsonPath = path.join(pgflowFunctionDir, 'deno.json');

fs.writeFileSync(indexPath, '// existing content');
fs.writeFileSync(flowsPath, '// existing content');
fs.writeFileSync(denoJsonPath, '// existing content');

const result = await createEdgeFunction({
Expand All @@ -83,7 +74,6 @@ describe('createEdgeFunction', () => {

// Verify files still exist with original content
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
expect(fs.readFileSync(flowsPath, 'utf8')).toBe('// existing content');
expect(fs.readFileSync(denoJsonPath, 'utf8')).toBe('// existing content');
});

Expand All @@ -92,7 +82,6 @@ describe('createEdgeFunction', () => {
fs.mkdirSync(pgflowFunctionDir, { recursive: true });

const indexPath = path.join(pgflowFunctionDir, 'index.ts');
const flowsPath = path.join(pgflowFunctionDir, 'flows.ts');
const denoJsonPath = path.join(pgflowFunctionDir, 'deno.json');

// Only create index.ts
Expand All @@ -103,19 +92,15 @@ describe('createEdgeFunction', () => {
autoConfirm: true,
});

// Should return true because some files were created
// Should return true because deno.json was created
expect(result).toBe(true);

// Verify index.ts was not modified
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');

// Verify flows.ts and deno.json were created
expect(fs.existsSync(flowsPath)).toBe(true);
// Verify deno.json was created
expect(fs.existsSync(denoJsonPath)).toBe(true);

const flowsContent = fs.readFileSync(flowsPath, 'utf8');
expect(flowsContent).toContain('export const flows = [');

const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
expect(denoJsonContent).toContain('"imports"');
});
Expand All @@ -138,7 +123,6 @@ describe('createEdgeFunction', () => {

// Verify files exist
expect(fs.existsSync(path.join(pgflowFunctionDir, 'index.ts'))).toBe(true);
expect(fs.existsSync(path.join(pgflowFunctionDir, 'flows.ts'))).toBe(true);
expect(fs.existsSync(path.join(pgflowFunctionDir, 'deno.json'))).toBe(true);
});

Expand Down
9 changes: 2 additions & 7 deletions pkgs/cli/src/commands/compile/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ export async function fetchFlowSQL(
const errorData = await response.json();
throw new Error(
`Flow '${flowSlug}' not found.\n\n` +
`${errorData.message || 'Did you add it to flows.ts?'}\n\n` +
`${errorData.message || 'Did you add it to supabase/functions/pgflow/index.ts?'}\n\n` +
`Fix:\n` +
`1. Add your flow to supabase/functions/pgflow/flows.ts\n` +
`1. Add your flow to supabase/functions/pgflow/index.ts\n` +
`2. Restart edge functions: supabase functions serve`
);
}
Expand All @@ -45,11 +45,6 @@ export async function fetchFlowSQL(
return await response.json();
} catch (error) {
if (error instanceof Error) {
// Debug: show actual error and URL
console.error(`[DEBUG] Fetch failed for URL: ${url}`);
console.error(`[DEBUG] Error message: ${error.message}`);
console.error(`[DEBUG] Error cause:`, (error as any).cause);

// Check for connection refused errors
if (
error.message.includes('ECONNREFUSED') ||
Expand Down
23 changes: 4 additions & 19 deletions pkgs/cli/src/commands/install/create-edge-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@ import chalk from 'chalk';
import { getVersion } from '../../utils/get-version.js';

const INDEX_TS_TEMPLATE = `import { ControlPlane } from '@pgflow/edge-worker';
import { flows } from './flows.ts';

ControlPlane.serve(flows);
`;

const FLOWS_TS_TEMPLATE = `// Import your flows here
// Import your flows here:
// import { MyFlow } from '../_flows/my_flow.ts';

// Export flows array for ControlPlane
export const flows = [
ControlPlane.serve([
// Add your flows here:
// MyFlow,
];
]);
`;

const DENO_JSON_TEMPLATE = (version: string) => `{
Expand Down Expand Up @@ -44,7 +39,6 @@ export async function createEdgeFunction({
const pgflowFunctionDir = path.join(functionsDir, 'pgflow');

const indexPath = path.join(pgflowFunctionDir, 'index.ts');
const flowsPath = path.join(pgflowFunctionDir, 'flows.ts');
const denoJsonPath = path.join(pgflowFunctionDir, 'deno.json');

// Check what needs to be created
Expand All @@ -54,10 +48,6 @@ export async function createEdgeFunction({
filesToCreate.push({ path: indexPath, name: 'index.ts' });
}

if (!fs.existsSync(flowsPath)) {
filesToCreate.push({ path: flowsPath, name: 'flows.ts' });
}

if (!fs.existsSync(denoJsonPath)) {
filesToCreate.push({ path: denoJsonPath, name: 'deno.json' });
}
Expand All @@ -69,7 +59,6 @@ export async function createEdgeFunction({
const detailedMsg = [
'Existing files:',
` ${chalk.dim('•')} ${chalk.bold('supabase/functions/pgflow/index.ts')}`,
` ${chalk.dim('•')} ${chalk.bold('supabase/functions/pgflow/flows.ts')}`,
` ${chalk.dim('•')} ${chalk.bold('supabase/functions/pgflow/deno.json')}`,
].join('\n');

Expand Down Expand Up @@ -113,10 +102,6 @@ export async function createEdgeFunction({
fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE);
}

if (filesToCreate.some((f) => f.path === flowsPath)) {
fs.writeFileSync(flowsPath, FLOWS_TS_TEMPLATE);
}

if (filesToCreate.some((f) => f.path === denoJsonPath)) {
fs.writeFileSync(denoJsonPath, DENO_JSON_TEMPLATE(getVersion()));
}
Expand Down
5 changes: 0 additions & 5 deletions pkgs/cli/supabase/functions/pgflow/flows.ts

This file was deleted.

4 changes: 2 additions & 2 deletions pkgs/cli/supabase/functions/pgflow/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ControlPlane } from '@pgflow/edge-worker';
import { flows } from './flows.ts';
import { TestFlowE2E } from '../_flows/test_flow_e2e.ts';

ControlPlane.serve(flows);
ControlPlane.serve([TestFlowE2E]);
4 changes: 2 additions & 2 deletions pkgs/edge-worker/src/control-plane/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
* @example
* ```typescript
* import { ControlPlane } from '@pgflow/edge-worker';
* import { flows } from './flows.ts';
* import { MyFlow } from '../_flows/my_flow.ts';
*
* ControlPlane.serve(flows);
* ControlPlane.serve([MyFlow]);
* ```
*/

Expand Down
21 changes: 2 additions & 19 deletions pkgs/edge-worker/src/control-plane/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,15 @@ export function serveControlPlane(flows: AnyFlow[]): void {
/**
* Handles GET /flows/:slug requests
*/
function handleGetFlow(
registry: Map<string, AnyFlow>,
slug: string
): Response {
function handleGetFlow(registry: Map<string, AnyFlow>, slug: string): Response {
try {
const flow = registry.get(slug);

if (!flow) {
return jsonResponse(
{
error: 'Flow Not Found',
message: `Flow '${slug}' not found. Did you add it to flows.ts?`,
message: `Flow '${slug}' not found. Did you add it to supabase/functions/pgflow/index.ts?`,
},
404
);
Expand Down Expand Up @@ -134,17 +131,3 @@ function jsonResponse(data: unknown, status: number): Response {
},
});
}

/**
* ControlPlane class for serving flow compilation HTTP API
*/
export class ControlPlane {
/**
* Serves the ControlPlane HTTP API for flow compilation
* @param flows Array of flow definitions to register
*/
static serve(flows: AnyFlow[]): void {
const handler = createControlPlaneHandler(flows);
Deno.serve({}, handler);
}
}
8 changes: 8 additions & 0 deletions pkgs/website/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ export default defineConfig({
link: '/concepts/three-layer-architecture/',
},
{ label: 'Data model', link: '/concepts/data-model/' },
{
label: 'Compilation',
link: '/concepts/compilation/',
},
],
},
{
Expand Down Expand Up @@ -366,6 +370,10 @@ export default defineConfig({
},
{ label: 'Context API', link: '/reference/context/' },
{ label: 'Compile API', link: '/reference/compile-api/' },
{
label: 'ControlPlane API',
link: '/reference/control-plane-api/',
},
{
label: 'Manual installation',
link: '/reference/manual-installation/',
Expand Down
5 changes: 5 additions & 0 deletions pkgs/website/src/content/docs/build/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ Now that you've created your first flow, learn how to structure your code, integ
href="/build/process-arrays-in-parallel/"
description="Process arrays of data in parallel using map steps"
/>
<LinkCard
title="Validation steps"
href="/build/validation-steps/"
description="Use explicit validation steps to fail fast on invalid input"
/>
</CardGrid>

## Starting Flows
Expand Down
88 changes: 88 additions & 0 deletions pkgs/website/src/content/docs/concepts/compilation.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
title: Compilation
description: How pgflow compiles TypeScript flows to SQL via HTTP
sidebar:
order: 25
---

import { Aside } from "@astrojs/starlight/components";

pgflow compiles TypeScript flow definitions to SQL migrations via an HTTP-based architecture. This design eliminates the need for a local Deno installation and ensures compilation uses the same runtime as production.

## How It Works

When you run `pgflow compile greet_user`, the following happens:

```
┌─────────────┐ HTTP GET ┌─────────────────────┐
│ pgflow CLI │ ─────────────────>│ ControlPlane Edge │
│ │ │ Function │
│ │ │ │
│ │ SQL Array │ 1. Look up flow │
│ │ <─────────────────│ 2. Call compileFlow│
│ │ │ 3. Return SQL │
│ │ └─────────────────────┘
│ 4. Write │
│ migration │
└─────────────┘
```

1. **CLI sends request** - The compile command sends an HTTP GET request to:
`http://127.0.0.1:54321/functions/v1/pgflow/flows/{slug}`

2. **ControlPlane looks up flow** - The edge function has a registry of flows (from your `index.ts`). It finds the flow by slug.

3. **Compilation happens in Deno** - The ControlPlane calls `compileFlow()` from `@pgflow/dsl`, which extracts the flow structure and generates SQL.

4. **SQL returned to CLI** - The response contains an array of SQL statements.

5. **CLI writes migration** - The CLI joins the SQL and writes it to `supabase/migrations/{timestamp}_create_{slug}_flow.sql`.

## The ControlPlane Edge Function

The `pgflow` edge function is created during installation and serves as your project's ControlPlane:

```typescript title="supabase/functions/pgflow/index.ts"
import { ControlPlane } from '@pgflow/edge-worker';
import GreetUser from '../_flows/greet_user.ts';
import ProcessOrder from '../_flows/process_order.ts';

ControlPlane.serve([
GreetUser,
ProcessOrder,
]);
```

The ControlPlane:
- **Registers flows** by slug in an in-memory registry
- **Exposes** the `/flows/:slug` endpoint for compilation
- **Returns 404** if a flow slug is not found in the registry
- **Returns 500** with error details if compilation fails

## Why HTTP-Based Compilation?

This architecture provides several benefits:

**No local Deno required** - Users don't need Deno installed on their machine. The Supabase Edge Functions runtime handles everything.

**Same runtime as production** - Flows are compiled using the exact same Deno environment they'll run in, eliminating "works on my machine" issues.

**Consistent dependency resolution** - The `deno.json` import map in your edge function ensures consistent package versions.

**Simpler CLI** - The CLI is a lightweight Node.js package that makes HTTP requests, rather than needing to bundle the entire compilation infrastructure.

## Adding New Flows

To make a flow available for compilation:

1. Create the flow definition in `_flows/`
2. Import it in `supabase/functions/pgflow/index.ts`
3. Add it to the `ControlPlane.serve([...])` array

<Aside type="note">
When running `supabase functions serve`, code changes are detected automatically and the ControlPlane reloads with your new flows.
</Aside>

## API Reference

For detailed HTTP endpoint documentation, see [ControlPlane API Reference](/reference/control-plane-api/).
Loading