Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ARCHITECTURE_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ npx pgflow compile path/to/flow.ts
```typescript
import { createFlowWorker } from '@pgflow/edge-worker';
import { createClient } from '@supabase/supabase-js';
import MyFlow from './_flows/my_flow.ts';
import { MyFlow } from '../../flows/my_flow.ts';

// Create Supabase client
const supabase = createClient(
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"netlify-cli": "^22.1.3",
"nx": "21.2.1",
"prettier": "^2.6.2",
"supabase": "^2.34.3",
"tslib": "^2.3.0",
"typescript": "5.8.3",
"typescript-eslint": "8.34.1",
Expand Down
2 changes: 1 addition & 1 deletion pkgs/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ The installer will:
Convert a TypeScript flow definition into a SQL migration:

```bash
npx pgflow@latest compile supabase/functions/_flows/my_flow.ts
npx pgflow@latest compile my_flow
```

Options:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ describe('createEdgeFunction', () => {
expect(fs.existsSync(indexPath)).toBe(true);
expect(fs.existsSync(denoJsonPath)).toBe(true);

// Verify index.ts content (inline flow registration, no flows.ts)
// Verify index.ts content (namespace import from flows directory)
const indexContent = fs.readFileSync(indexPath, 'utf8');
expect(indexContent).toContain("import { ControlPlane } from '@pgflow/edge-worker'");
expect(indexContent).toContain('ControlPlane.serve([');
expect(indexContent).toContain('// Import your flows here');
expect(indexContent).toContain("import * as flows from '../../flows/index.ts'");
expect(indexContent).toContain('ControlPlane.serve(flows)');

// Verify deno.json content
const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
Expand Down
150 changes: 150 additions & 0 deletions pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import fs from 'fs';
import path from 'path';
import os from 'os';
import { createFlowsDirectory } from '../../../src/commands/install/create-flows-directory';

describe('createFlowsDirectory', () => {
let tempDir: string;
let supabasePath: string;
let flowsDir: string;

beforeEach(() => {
// Create a temporary directory for testing
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pgflow-test-'));
supabasePath = path.join(tempDir, 'supabase');
flowsDir = path.join(supabasePath, 'flows');
});

afterEach(() => {
// Clean up the temporary directory
fs.rmSync(tempDir, { recursive: true, force: true });
});

it('should create both files when none exist', async () => {
const result = await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

// Should return true because files were created
expect(result).toBe(true);

// Verify directory was created
expect(fs.existsSync(flowsDir)).toBe(true);

// Verify all files exist
const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');

expect(fs.existsSync(indexPath)).toBe(true);
expect(fs.existsSync(exampleFlowPath)).toBe(true);
});

it('should create index.ts with barrel export pattern', async () => {
await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

const indexPath = path.join(flowsDir, 'index.ts');
const indexContent = fs.readFileSync(indexPath, 'utf8');

// Should have export for ExampleFlow
expect(indexContent).toContain("export { ExampleFlow } from './example_flow.ts'");
// Should have documenting comment
expect(indexContent).toContain('Re-export all flows');
});

it('should create example_flow.ts with named export', async () => {
await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowContent = fs.readFileSync(exampleFlowPath, 'utf8');

// Should use named export (not default)
expect(exampleFlowContent).toContain('export const ExampleFlow');
// Should import Flow from @pgflow/dsl
expect(exampleFlowContent).toContain("import { Flow } from '@pgflow/dsl'");
// Should have correct slug
expect(exampleFlowContent).toContain("slug: 'example_flow'");
// Should have input type
expect(exampleFlowContent).toContain('type Input');
// Should have at least one step
expect(exampleFlowContent).toContain('.step(');
});

it('should not create files when they already exist', async () => {
// Pre-create the directory and files
fs.mkdirSync(flowsDir, { recursive: true });

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');

fs.writeFileSync(indexPath, '// existing content');
fs.writeFileSync(exampleFlowPath, '// existing content');

const result = await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

// Should return false because no changes were needed
expect(result).toBe(false);

// Verify files still exist with original content
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
expect(fs.readFileSync(exampleFlowPath, 'utf8')).toBe('// existing content');
});

it('should create only missing files when some already exist', async () => {
// Pre-create the directory and one file
fs.mkdirSync(flowsDir, { recursive: true });

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');

// Only create index.ts
fs.writeFileSync(indexPath, '// existing content');

const result = await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

// Should return true because example_flow.ts was created
expect(result).toBe(true);

// Verify index.ts was not modified
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');

// Verify example_flow.ts was created
expect(fs.existsSync(exampleFlowPath)).toBe(true);

const exampleContent = fs.readFileSync(exampleFlowPath, 'utf8');
expect(exampleContent).toContain('export const ExampleFlow');
});

it('should create parent directories if they do not exist', async () => {
// Don't create anything - let the function create it all
expect(fs.existsSync(supabasePath)).toBe(false);

const result = await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

expect(result).toBe(true);

// Verify all parent directories were created
expect(fs.existsSync(supabasePath)).toBe(true);
expect(fs.existsSync(flowsDir)).toBe(true);

// Verify files exist
expect(fs.existsSync(path.join(flowsDir, 'index.ts'))).toBe(true);
expect(fs.existsSync(path.join(flowsDir, 'example_flow.ts'))).toBe(true);
});
});
8 changes: 2 additions & 6 deletions pkgs/cli/src/commands/install/create-edge-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ import chalk from 'chalk';
import { getVersion } from '../../utils/get-version.js';

const INDEX_TS_TEMPLATE = `import { ControlPlane } from '@pgflow/edge-worker';
// Import your flows here:
// import { MyFlow } from '../_flows/my_flow.ts';
import * as flows from '../../flows/index.ts';

ControlPlane.serve([
// Add your flows here:
// MyFlow,
]);
ControlPlane.serve(flows);
`;

const DENO_JSON_TEMPLATE = (version: string) => `{
Expand Down
91 changes: 91 additions & 0 deletions pkgs/cli/src/commands/install/create-flows-directory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import fs from 'fs';
import path from 'path';
import { log, confirm } from '@clack/prompts';
import chalk from 'chalk';

const INDEX_TS_TEMPLATE = `// Re-export all flows from this directory
// Example: export { MyFlow } from './my_flow.ts';

export { ExampleFlow } from './example_flow.ts';
`;

const EXAMPLE_FLOW_TEMPLATE = `import { Flow } from '@pgflow/dsl';

type Input = { name: string };

export const ExampleFlow = new Flow<Input>({ slug: 'example_flow' })
.step({ slug: 'greet' }, (input) => \`Hello, \${input.run.name}!\`);
`;

export async function createFlowsDirectory({
supabasePath,
autoConfirm = false,
}: {
supabasePath: string;
autoConfirm?: boolean;
}): Promise<boolean> {
const flowsDir = path.join(supabasePath, 'flows');

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');

// Relative paths for display
const relativeFlowsDir = 'supabase/flows';
const relativeIndexPath = `${relativeFlowsDir}/index.ts`;
const relativeExampleFlowPath = `${relativeFlowsDir}/example_flow.ts`;

// Check what needs to be created
const filesToCreate: Array<{ path: string; relativePath: string }> = [];

if (!fs.existsSync(indexPath)) {
filesToCreate.push({ path: indexPath, relativePath: relativeIndexPath });
}

if (!fs.existsSync(exampleFlowPath)) {
filesToCreate.push({ path: exampleFlowPath, relativePath: relativeExampleFlowPath });
}

// If all files exist, return success
if (filesToCreate.length === 0) {
log.success('Flows directory already up to date');
return false;
}

// Show preview and ask for confirmation only when not auto-confirming
if (!autoConfirm) {
const summaryMsg = [
`Create ${chalk.cyan('flows/')} ${chalk.dim('(flow definitions directory)')}:`,
'',
...filesToCreate.map((file) => ` ${chalk.bold(path.basename(file.relativePath))}`),
].join('\n');

log.info(summaryMsg);

const confirmResult = await confirm({
message: `Create flows/?`,
});

if (confirmResult !== true) {
log.warn('Flows directory installation skipped');
return false;
}
}

// Create the directory if it doesn't exist
if (!fs.existsSync(flowsDir)) {
fs.mkdirSync(flowsDir, { recursive: true });
}

// Create files
if (filesToCreate.some((f) => f.path === indexPath)) {
fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE);
}

if (filesToCreate.some((f) => f.path === exampleFlowPath)) {
fs.writeFileSync(exampleFlowPath, EXAMPLE_FLOW_TEMPLATE);
}

log.success('Flows directory created');

return true;
}
9 changes: 8 additions & 1 deletion pkgs/cli/src/commands/install/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { copyMigrations } from './copy-migrations.js';
import { updateConfigToml } from './update-config-toml.js';
import { updateEnvFile } from './update-env-file.js';
import { createEdgeFunction } from './create-edge-function.js';
import { createFlowsDirectory } from './create-flows-directory.js';
import { supabasePathPrompt } from './supabase-path-prompt.js';

export default (program: Command) => {
Expand Down Expand Up @@ -34,6 +35,7 @@ export default (program: Command) => {
'',
` • Update ${chalk.cyan('supabase/config.toml')} ${chalk.dim('(enable pooler, per_worker runtime)')}`,
` • Add pgflow migrations to ${chalk.cyan('supabase/migrations/')}`,
` • Create ${chalk.cyan('supabase/flows/')} ${chalk.dim('(flow definitions with example)')}`,
` • Create Control Plane in ${chalk.cyan('supabase/functions/pgflow/')}`,
` • Configure ${chalk.cyan('supabase/functions/.env')}`,
'',
Expand Down Expand Up @@ -73,6 +75,11 @@ export default (program: Command) => {
autoConfirm: true,
});

const flowsDirectory = await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

const edgeFunction = await createEdgeFunction({
supabasePath,
autoConfirm: true,
Expand All @@ -86,7 +93,7 @@ export default (program: Command) => {
// Step 4: Show completion message
const outroMessages: string[] = [];

if (migrations || configUpdate || edgeFunction || envFile) {
if (migrations || configUpdate || flowsDirectory || edgeFunction || envFile) {
outroMessages.push(chalk.green.bold('✓ Installation complete!'));
} else {
outroMessages.push(
Expand Down
2 changes: 2 additions & 0 deletions pkgs/cli/supabase/flows/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Re-export all flows from this directory
export { TestFlowE2E } from './test_flow_e2e.ts';
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ export const TestFlowE2E = new Flow<{ value: string }>({
}).step({ slug: 'step1' }, async (input) => ({
result: `processed: ${input.run.value}`,
}));

export default TestFlowE2E;
4 changes: 2 additions & 2 deletions pkgs/cli/supabase/functions/pgflow/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ControlPlane } from '@pgflow/edge-worker';
import { TestFlowE2E } from '../_flows/test_flow_e2e.ts';
import * as flows from '../../flows/index.ts';

ControlPlane.serve([TestFlowE2E]);
ControlPlane.serve(flows);
14 changes: 12 additions & 2 deletions pkgs/edge-worker/src/control-plane/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@
*
* @example
* ```typescript
* // Using namespace import (recommended)
* import { ControlPlane } from '@pgflow/edge-worker';
* import { MyFlow } from '../_flows/my_flow.ts';
* import * as flows from '../../flows/index.ts';
*
* ControlPlane.serve(flows);
* ```
*
* @example
* ```typescript
* // Using array (legacy)
* import { ControlPlane } from '@pgflow/edge-worker';
* import { MyFlow } from '../../flows/my_flow.ts';
*
* ControlPlane.serve([MyFlow]);
* ```
Expand All @@ -21,7 +31,7 @@ import { serveControlPlane } from './server.js';
export const ControlPlane = {
/**
* Start the ControlPlane HTTP server
* @param flows Array of flow definitions to register
* @param flowsInput Array or object of flow definitions to register
*/
serve: serveControlPlane,
};
Loading