diff --git a/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts b/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts
new file mode 100644
index 000000000..2bb486053
--- /dev/null
+++ b/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts
@@ -0,0 +1,192 @@
+import { describe, it, expect, beforeEach, afterEach } from 'vitest';
+import fs from 'fs';
+import path from 'path';
+import os from 'os';
+import { createExampleWorker } from '../../../src/commands/install/create-example-worker';
+import { getVersion } from '../../../src/utils/get-version';
+
+describe('createExampleWorker', () => {
+ let tempDir: string;
+ let supabasePath: string;
+ let workerDir: string;
+
+ beforeEach(() => {
+ // Create a temporary directory for testing
+ tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pgflow-test-'));
+ supabasePath = path.join(tempDir, 'supabase');
+ workerDir = path.join(supabasePath, 'functions', 'greet-user-worker');
+ });
+
+ afterEach(() => {
+ // Clean up the temporary directory
+ fs.rmSync(tempDir, { recursive: true, force: true });
+ });
+
+ it('should create both files when none exist', async () => {
+ const result = await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ // Should return true because files were created
+ expect(result).toBe(true);
+
+ // Verify directory was created
+ expect(fs.existsSync(workerDir)).toBe(true);
+
+ // Verify all files exist
+ const indexPath = path.join(workerDir, 'index.ts');
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+
+ expect(fs.existsSync(indexPath)).toBe(true);
+ expect(fs.existsSync(denoJsonPath)).toBe(true);
+ });
+
+ it('should create index.ts that imports GreetUser and starts EdgeWorker', async () => {
+ await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ const indexPath = path.join(workerDir, 'index.ts');
+ const indexContent = fs.readFileSync(indexPath, 'utf8');
+
+ // Should import EdgeWorker
+ expect(indexContent).toContain("import { EdgeWorker } from '@pgflow/edge-worker'");
+ // Should import GreetUser from flows directory
+ expect(indexContent).toContain("import { GreetUser } from '../../flows/greet-user.ts'");
+ // Should start EdgeWorker with GreetUser
+ expect(indexContent).toContain('EdgeWorker.start(GreetUser)');
+ });
+
+ it('should create deno.json with correct import mappings', async () => {
+ await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+ const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
+ const denoJson = JSON.parse(denoJsonContent);
+
+ // Verify imports exist
+ expect(denoJson.imports).toBeDefined();
+ expect(denoJson.imports['@pgflow/core']).toBeDefined();
+ expect(denoJson.imports['@pgflow/dsl']).toBeDefined();
+ expect(denoJson.imports['@pgflow/edge-worker']).toBeDefined();
+ });
+
+ it('should inject package version instead of @latest in deno.json', async () => {
+ await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+ const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
+ const denoJson = JSON.parse(denoJsonContent);
+
+ const version = getVersion();
+
+ // Verify version is not 'unknown'
+ expect(version).not.toBe('unknown');
+
+ // Verify that @latest is NOT used
+ expect(denoJsonContent).not.toContain('@latest');
+
+ // Verify that the actual version is used
+ expect(denoJson.imports['@pgflow/core']).toBe(`npm:@pgflow/core@${version}`);
+ expect(denoJson.imports['@pgflow/dsl']).toBe(`npm:@pgflow/dsl@${version}`);
+ expect(denoJson.imports['@pgflow/edge-worker']).toBe(`jsr:@pgflow/edge-worker@${version}`);
+ });
+
+ it('should not create files when they already exist', async () => {
+ // Pre-create the directory and files
+ fs.mkdirSync(workerDir, { recursive: true });
+
+ const indexPath = path.join(workerDir, 'index.ts');
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+
+ fs.writeFileSync(indexPath, '// existing content');
+ fs.writeFileSync(denoJsonPath, '// existing content');
+
+ const result = await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ // Should return false because no changes were needed
+ expect(result).toBe(false);
+
+ // Verify files still exist with original content
+ expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
+ expect(fs.readFileSync(denoJsonPath, 'utf8')).toBe('// existing content');
+ });
+
+ it('should create only missing files when some already exist', async () => {
+ // Pre-create the directory and one file
+ fs.mkdirSync(workerDir, { recursive: true });
+
+ const indexPath = path.join(workerDir, 'index.ts');
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+
+ // Only create index.ts
+ fs.writeFileSync(indexPath, '// existing content');
+
+ const result = await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ // Should return true because deno.json was created
+ expect(result).toBe(true);
+
+ // Verify index.ts was not modified
+ expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
+
+ // Verify deno.json was created
+ expect(fs.existsSync(denoJsonPath)).toBe(true);
+
+ const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
+ expect(denoJsonContent).toContain('"imports"');
+ });
+
+ it('should create parent directories if they do not exist', async () => {
+ // Don't create anything - let the function create it all
+ expect(fs.existsSync(supabasePath)).toBe(false);
+
+ const result = await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ expect(result).toBe(true);
+
+ // Verify all parent directories were created
+ expect(fs.existsSync(supabasePath)).toBe(true);
+ expect(fs.existsSync(path.join(supabasePath, 'functions'))).toBe(true);
+ expect(fs.existsSync(workerDir)).toBe(true);
+
+ // Verify files exist
+ expect(fs.existsSync(path.join(workerDir, 'index.ts'))).toBe(true);
+ expect(fs.existsSync(path.join(workerDir, 'deno.json'))).toBe(true);
+ });
+
+ it('should include subpath exports for Deno import mapping', async () => {
+ await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+ const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8');
+ const denoJson = JSON.parse(denoJsonContent);
+
+ const version = getVersion();
+
+ // Verify subpath exports include versions (needed for proper Deno import mapping)
+ expect(denoJson.imports['@pgflow/core/']).toBe(`npm:@pgflow/core@${version}/`);
+ expect(denoJson.imports['@pgflow/dsl/']).toBe(`npm:@pgflow/dsl@${version}/`);
+ expect(denoJson.imports['@pgflow/edge-worker/']).toBe(`jsr:@pgflow/edge-worker@${version}/`);
+ });
+});
diff --git a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts
index e7dd42444..4dc942afc 100644
--- a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts
+++ b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts
@@ -35,10 +35,10 @@ describe('createFlowsDirectory', () => {
// Verify all files exist
const indexPath = path.join(flowsDir, 'index.ts');
- const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
expect(fs.existsSync(indexPath)).toBe(true);
- expect(fs.existsSync(exampleFlowPath)).toBe(true);
+ expect(fs.existsSync(greetUserPath)).toBe(true);
});
it('should create index.ts with barrel export pattern', async () => {
@@ -50,31 +50,49 @@ describe('createFlowsDirectory', () => {
const indexPath = path.join(flowsDir, 'index.ts');
const indexContent = fs.readFileSync(indexPath, 'utf8');
- // Should have export for ExampleFlow
- expect(indexContent).toContain("export { ExampleFlow } from './example-flow.ts';");
+ // Should have export for GreetUser
+ expect(indexContent).toContain("export { GreetUser } from './greet-user.ts';");
// Should have documenting comment
expect(indexContent).toContain('Re-export all flows');
});
- it('should create example-flow.ts with named export', async () => {
+ it('should create greet-user.ts with named export', async () => {
await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});
- const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
- const exampleFlowContent = fs.readFileSync(exampleFlowPath, 'utf8');
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
+ const greetUserContent = fs.readFileSync(greetUserPath, 'utf8');
// Should use named export (not default)
- expect(exampleFlowContent).toContain('export const ExampleFlow');
+ expect(greetUserContent).toContain('export const GreetUser');
// Should import Flow from @pgflow/dsl
- expect(exampleFlowContent).toContain("import { Flow } from '@pgflow/dsl'");
+ expect(greetUserContent).toContain("import { Flow } from '@pgflow/dsl'");
// Should have correct slug
- expect(exampleFlowContent).toContain("slug: 'exampleFlow'");
- // Should have input type
- expect(exampleFlowContent).toContain('type Input');
- // Should have at least one step
- expect(exampleFlowContent).toContain('.step(');
+ expect(greetUserContent).toContain("slug: 'greetUser'");
+ // Should have input type with firstName and lastName
+ expect(greetUserContent).toContain('type Input');
+ expect(greetUserContent).toContain('firstName');
+ expect(greetUserContent).toContain('lastName');
+ });
+
+ it('should create greet-user.ts with two steps showing dependsOn', async () => {
+ await createFlowsDirectory({
+ supabasePath,
+ autoConfirm: true,
+ });
+
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
+ const greetUserContent = fs.readFileSync(greetUserPath, 'utf8');
+
+ // Should have two steps
+ expect(greetUserContent).toContain("slug: 'fullName'");
+ expect(greetUserContent).toContain("slug: 'greeting'");
+ // Second step should depend on first
+ expect(greetUserContent).toContain("dependsOn: ['fullName']");
+ // Second step should access result from first step
+ expect(greetUserContent).toContain('input.fullName');
});
it('should not create files when they already exist', async () => {
@@ -82,10 +100,10 @@ describe('createFlowsDirectory', () => {
fs.mkdirSync(flowsDir, { recursive: true });
const indexPath = path.join(flowsDir, 'index.ts');
- const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
fs.writeFileSync(indexPath, '// existing content');
- fs.writeFileSync(exampleFlowPath, '// existing content');
+ fs.writeFileSync(greetUserPath, '// existing content');
const result = await createFlowsDirectory({
supabasePath,
@@ -97,7 +115,7 @@ describe('createFlowsDirectory', () => {
// Verify files still exist with original content
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
- expect(fs.readFileSync(exampleFlowPath, 'utf8')).toBe('// existing content');
+ expect(fs.readFileSync(greetUserPath, 'utf8')).toBe('// existing content');
});
it('should create only missing files when some already exist', async () => {
@@ -105,7 +123,7 @@ describe('createFlowsDirectory', () => {
fs.mkdirSync(flowsDir, { recursive: true });
const indexPath = path.join(flowsDir, 'index.ts');
- const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
// Only create index.ts
fs.writeFileSync(indexPath, '// existing content');
@@ -115,17 +133,17 @@ describe('createFlowsDirectory', () => {
autoConfirm: true,
});
- // Should return true because example-flow.ts was created
+ // Should return true because greet-user.ts was created
expect(result).toBe(true);
// Verify index.ts was not modified
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');
- // Verify example-flow.ts was created
- expect(fs.existsSync(exampleFlowPath)).toBe(true);
+ // Verify greet-user.ts was created
+ expect(fs.existsSync(greetUserPath)).toBe(true);
- const exampleContent = fs.readFileSync(exampleFlowPath, 'utf8');
- expect(exampleContent).toContain('export const ExampleFlow');
+ const greetUserContent = fs.readFileSync(greetUserPath, 'utf8');
+ expect(greetUserContent).toContain('export const GreetUser');
});
it('should create parent directories if they do not exist', async () => {
@@ -145,6 +163,6 @@ describe('createFlowsDirectory', () => {
// Verify files exist
expect(fs.existsSync(path.join(flowsDir, 'index.ts'))).toBe(true);
- expect(fs.existsSync(path.join(flowsDir, 'example-flow.ts'))).toBe(true);
+ expect(fs.existsSync(path.join(flowsDir, 'greet-user.ts'))).toBe(true);
});
});
diff --git a/pkgs/cli/examples/async-function-hang.ts b/pkgs/cli/examples/async-function-hang.ts
deleted file mode 100644
index 33babbaaa..000000000
--- a/pkgs/cli/examples/async-function-hang.ts
+++ /dev/null
@@ -1,48 +0,0 @@
-import { Flow } from '@pgflow/dsl';
-
-type Input = {
- logo_url: string;
- company_slug: string;
- company_id: string;
-};
-
-// Dummy async task that simulates real async work
-async function processCompanyLogoTask(input: { logo_url: string; company_slug: string }) {
- // Simulate some async work with a promise
- await new Promise((resolve) => setTimeout(resolve, 100));
- return {
- file_path: `/uploads/${input.company_slug}/logo.png`,
- };
-}
-
-// Another dummy async task
-async function updateCompanyLogoUrlTask(input: { company_id: string; file_path: string }) {
- // Simulate some async work
- await new Promise((resolve) => setTimeout(resolve, 50));
- return {
- success: true,
- company_id: input.company_id,
- logo_url: input.file_path,
- };
-}
-
-export default new Flow({
- slug: 'upload_company_logo',
- maxAttempts: 3,
- timeout: 60,
- baseDelay: 2,
-})
- .step(
- { slug: 'process_company_logo' },
- async (input) => await processCompanyLogoTask({
- logo_url: input.run.logo_url,
- company_slug: input.run.company_slug,
- })
- )
- .step(
- { slug: 'update_company_logo_url', dependsOn: ['process_company_logo'] },
- async (input) => await updateCompanyLogoUrlTask({
- company_id: input.run.company_id,
- file_path: (input.process_company_logo as { file_path: string }).file_path,
- })
- );
diff --git a/pkgs/cli/src/commands/install/create-example-worker.ts b/pkgs/cli/src/commands/install/create-example-worker.ts
new file mode 100644
index 000000000..137ccc722
--- /dev/null
+++ b/pkgs/cli/src/commands/install/create-example-worker.ts
@@ -0,0 +1,99 @@
+import fs from 'fs';
+import path from 'path';
+import { log, confirm } from '@clack/prompts';
+import chalk from 'chalk';
+import { getVersion } from '../../utils/get-version.js';
+
+const INDEX_TS_TEMPLATE = `import { EdgeWorker } from '@pgflow/edge-worker';
+import { GreetUser } from '../../flows/greet-user.ts';
+
+EdgeWorker.start(GreetUser);
+`;
+
+const DENO_JSON_TEMPLATE = (version: string) => `{
+ "imports": {
+ "@pgflow/core": "npm:@pgflow/core@${version}",
+ "@pgflow/core/": "npm:@pgflow/core@${version}/",
+ "@pgflow/dsl": "npm:@pgflow/dsl@${version}",
+ "@pgflow/dsl/": "npm:@pgflow/dsl@${version}/",
+ "@pgflow/dsl/supabase": "npm:@pgflow/dsl@${version}/supabase",
+ "@pgflow/edge-worker": "jsr:@pgflow/edge-worker@${version}",
+ "@pgflow/edge-worker/": "jsr:@pgflow/edge-worker@${version}/",
+ "@pgflow/edge-worker/_internal": "jsr:@pgflow/edge-worker@${version}/_internal"
+ }
+}
+`;
+
+export async function createExampleWorker({
+ supabasePath,
+ autoConfirm = false,
+}: {
+ supabasePath: string;
+ autoConfirm?: boolean;
+}): Promise {
+ const functionsDir = path.join(supabasePath, 'functions');
+ const workerDir = path.join(functionsDir, 'greet-user-worker');
+
+ const indexPath = path.join(workerDir, 'index.ts');
+ const denoJsonPath = path.join(workerDir, 'deno.json');
+
+ // Relative paths for display
+ const relativeWorkerDir = 'supabase/functions/greet-user-worker';
+ const relativeIndexPath = `${relativeWorkerDir}/index.ts`;
+ const relativeDenoJsonPath = `${relativeWorkerDir}/deno.json`;
+
+ // Check what needs to be created
+ const filesToCreate: Array<{ path: string; relativePath: string }> = [];
+
+ if (!fs.existsSync(indexPath)) {
+ filesToCreate.push({ path: indexPath, relativePath: relativeIndexPath });
+ }
+
+ if (!fs.existsSync(denoJsonPath)) {
+ filesToCreate.push({ path: denoJsonPath, relativePath: relativeDenoJsonPath });
+ }
+
+ // If all files exist, return success
+ if (filesToCreate.length === 0) {
+ log.success('Example worker already up to date');
+ return false;
+ }
+
+ // Show preview and ask for confirmation only when not auto-confirming
+ if (!autoConfirm) {
+ const summaryMsg = [
+ `Create ${chalk.cyan('functions/greet-user-worker/')} ${chalk.dim('(example worker for GreetUser flow)')}:`,
+ '',
+ ...filesToCreate.map((file) => ` ${chalk.bold(path.basename(file.relativePath))}`),
+ ].join('\n');
+
+ log.info(summaryMsg);
+
+ const confirmResult = await confirm({
+ message: `Create functions/greet-user-worker/?`,
+ });
+
+ if (confirmResult !== true) {
+ log.warn('Example worker installation skipped');
+ return false;
+ }
+ }
+
+ // Create the directory if it doesn't exist
+ if (!fs.existsSync(workerDir)) {
+ fs.mkdirSync(workerDir, { recursive: true });
+ }
+
+ // Create files
+ if (filesToCreate.some((f) => f.path === indexPath)) {
+ fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE);
+ }
+
+ if (filesToCreate.some((f) => f.path === denoJsonPath)) {
+ fs.writeFileSync(denoJsonPath, DENO_JSON_TEMPLATE(getVersion()));
+ }
+
+ log.success('Example worker created');
+
+ return true;
+}
diff --git a/pkgs/cli/src/commands/install/create-flows-directory.ts b/pkgs/cli/src/commands/install/create-flows-directory.ts
index 32f6b80bd..0b2014451 100644
--- a/pkgs/cli/src/commands/install/create-flows-directory.ts
+++ b/pkgs/cli/src/commands/install/create-flows-directory.ts
@@ -6,15 +6,27 @@ import chalk from 'chalk';
const INDEX_TS_TEMPLATE = `// Re-export all flows from this directory
// Example: export { MyFlow } from './my-flow.ts';
-export { ExampleFlow } from './example-flow.ts';
+export { GreetUser } from './greet-user.ts';
`;
-const EXAMPLE_FLOW_TEMPLATE = `import { Flow } from '@pgflow/dsl';
-
-type Input = { name: string };
-
-export const ExampleFlow = new Flow({ slug: 'exampleFlow' })
- .step({ slug: 'greet' }, (input) => \`Hello, \${input.run.name}!\`);
+const GREET_USER_TEMPLATE = `import { Flow } from '@pgflow/dsl';
+
+type Input = {
+ firstName: string;
+ lastName: string;
+};
+
+export const GreetUser = new Flow({
+ slug: 'greetUser',
+})
+ .step(
+ { slug: 'fullName' },
+ (input) => \`\${input.run.firstName} \${input.run.lastName}\`
+ )
+ .step(
+ { slug: 'greeting', dependsOn: ['fullName'] },
+ (input) => \`Hello, \${input.fullName}!\`
+ );
`;
export async function createFlowsDirectory({
@@ -27,12 +39,12 @@ export async function createFlowsDirectory({
const flowsDir = path.join(supabasePath, 'flows');
const indexPath = path.join(flowsDir, 'index.ts');
- const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
+ const greetUserPath = path.join(flowsDir, 'greet-user.ts');
// Relative paths for display
const relativeFlowsDir = 'supabase/flows';
const relativeIndexPath = `${relativeFlowsDir}/index.ts`;
- const relativeExampleFlowPath = `${relativeFlowsDir}/example-flow.ts`;
+ const relativeGreetUserPath = `${relativeFlowsDir}/greet-user.ts`;
// Check what needs to be created
const filesToCreate: Array<{ path: string; relativePath: string }> = [];
@@ -41,8 +53,8 @@ export async function createFlowsDirectory({
filesToCreate.push({ path: indexPath, relativePath: relativeIndexPath });
}
- if (!fs.existsSync(exampleFlowPath)) {
- filesToCreate.push({ path: exampleFlowPath, relativePath: relativeExampleFlowPath });
+ if (!fs.existsSync(greetUserPath)) {
+ filesToCreate.push({ path: greetUserPath, relativePath: relativeGreetUserPath });
}
// If all files exist, return success
@@ -81,8 +93,8 @@ export async function createFlowsDirectory({
fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE);
}
- if (filesToCreate.some((f) => f.path === exampleFlowPath)) {
- fs.writeFileSync(exampleFlowPath, EXAMPLE_FLOW_TEMPLATE);
+ if (filesToCreate.some((f) => f.path === greetUserPath)) {
+ fs.writeFileSync(greetUserPath, GREET_USER_TEMPLATE);
}
log.success('Flows directory created');
diff --git a/pkgs/cli/src/commands/install/index.ts b/pkgs/cli/src/commands/install/index.ts
index 95fdbb1c2..c4fd61c1d 100644
--- a/pkgs/cli/src/commands/install/index.ts
+++ b/pkgs/cli/src/commands/install/index.ts
@@ -6,6 +6,7 @@ import { updateConfigToml } from './update-config-toml.js';
import { updateEnvFile } from './update-env-file.js';
import { createEdgeFunction } from './create-edge-function.js';
import { createFlowsDirectory } from './create-flows-directory.js';
+import { createExampleWorker } from './create-example-worker.js';
import { supabasePathPrompt } from './supabase-path-prompt.js';
export default (program: Command) => {
@@ -35,8 +36,9 @@ export default (program: Command) => {
'',
` • Update ${chalk.cyan('supabase/config.toml')} ${chalk.dim('(enable pooler, per_worker runtime)')}`,
` • Add pgflow migrations to ${chalk.cyan('supabase/migrations/')}`,
- ` • Create ${chalk.cyan('supabase/flows/')} ${chalk.dim('(flow definitions with example)')}`,
+ ` • Create ${chalk.cyan('supabase/flows/')} ${chalk.dim('(flow definitions with GreetUser example)')}`,
` • Create Control Plane in ${chalk.cyan('supabase/functions/pgflow/')}`,
+ ` • Create ${chalk.cyan('supabase/functions/greet-user-worker/')} ${chalk.dim('(example worker)')}`,
` • Configure ${chalk.cyan('supabase/functions/.env')}`,
'',
` ${chalk.green('✓ Safe to re-run - completed steps will be skipped')}`,
@@ -85,6 +87,11 @@ export default (program: Command) => {
autoConfirm: true,
});
+ const exampleWorker = await createExampleWorker({
+ supabasePath,
+ autoConfirm: true,
+ });
+
const envFile = await updateEnvFile({
supabasePath,
autoConfirm: true,
@@ -93,7 +100,7 @@ export default (program: Command) => {
// Step 4: Show completion message
const outroMessages: string[] = [];
- if (migrations || configUpdate || flowsDirectory || edgeFunction || envFile) {
+ if (migrations || configUpdate || flowsDirectory || edgeFunction || exampleWorker || envFile) {
outroMessages.push(chalk.green.bold('✓ Installation complete!'));
} else {
outroMessages.push(
@@ -122,7 +129,7 @@ export default (program: Command) => {
}
outroMessages.push(
- ` ${stepNumber}. Create your first flow: ${chalk.blue.underline('https://pgflow.dev/getting-started/create-first-flow/')}`
+ ` ${stepNumber}. Run the example: ${chalk.blue.underline('https://pgflow.dev/get-started/flows/quickstart/')}`
);
outro(outroMessages.join('\n'));
diff --git a/pkgs/website/astro.config.mjs b/pkgs/website/astro.config.mjs
index 034b0101d..2e71facc3 100644
--- a/pkgs/website/astro.config.mjs
+++ b/pkgs/website/astro.config.mjs
@@ -166,6 +166,7 @@ export default defineConfig({
],
promote: [
'get-started/installation',
+ 'get-started/flows/quickstart',
'get-started/flows/create-flow',
'get-started/flows/compile-flow',
'get-started/flows/run-flow',
diff --git a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
index 45adbebac..fcadb5fcf 100644
--- a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
+++ b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
@@ -1,93 +1,140 @@
---
-title: Create your first flow
-description: Learn how to define a flow using pgflow's TypeScript DSL
+title: Understanding Flows
+description: Learn how the scaffolded GreetUser flow works
sidebar:
order: 20
---
-import { Aside, Steps, Tabs, TabItem, CardGrid, LinkCard } from "@astrojs/starlight/components";
+import { Aside, CardGrid, LinkCard } from "@astrojs/starlight/components";
import { FileTree } from '@astrojs/starlight/components';
-Now that pgflow is installed, let's create a simple hello world flow that demonstrates the core concepts.
+The `pgflow install` command created an example flow that demonstrates core pgflow concepts. Let's understand how it works.
-Our flow will:
+
-1. Take a person's **first name** and **last name** as input
-2. Format a **full name**
-3. Create a personalized **greeting**
+## Project structure
-
+
+- supabase
+ - flows
+ - greet-user.ts (flow definition)
+ - index.ts (re-exports all flows)
+ - functions
+ - pgflow
+ - index.ts (Control Plane)
+ - greet-user-worker
+ - index.ts (Edge Worker)
+
-## Creating a Simple Greeting Flow
+## The GreetUser flow
-
+Open `supabase/flows/greet-user.ts`:
+```typescript title="supabase/flows/greet-user.ts"
+import { Flow } from '@pgflow/dsl';
-1. ### Set up your project structure
+type Input = {
+ firstName: string;
+ lastName: string;
+};
- After running `pgflow install`, you already have a `flows/` directory with an example. Let's create a new flow:
+export const GreetUser = new Flow({
+ slug: 'greetUser',
+})
+ .step(
+ { slug: 'fullName' },
+ (input) => `${input.run.firstName} ${input.run.lastName}`
+ )
+ .step(
+ { slug: 'greeting', dependsOn: ['fullName'] },
+ (input) => `Hello, ${input.fullName}!`
+ );
+```
-
- - supabase
- - flows
- - greet-user.ts (your flow definition)
- - index.ts (re-exports all flows)
- - functions
- - pgflow
- - index.ts (Control Plane)
- - deno.json
-
+Let's break down each part:
-2. ### Create the flow definition
+### Input type
- Create a file called `supabase/flows/greet-user.ts` with this content:
+```typescript
+type Input = {
+ firstName: string;
+ lastName: string;
+};
+```
- ```typescript title="supabase/flows/greet-user.ts"
- import { Flow } from '@pgflow/dsl';
+This defines what data the flow accepts when started. The input is always accessible via `input.run` in any step.
- type Input = {
- firstName: string;
- lastName: string;
- };
+### Flow constructor
- export const GreetUser = new Flow({
- slug: 'greetUser',
- })
- .step(
- { slug: 'fullName' },
- (input) => `${input.run.firstName} ${input.run.lastName}`
- )
- .step(
- { slug: 'greeting', dependsOn: ['fullName'] },
- (input) => `Hello, ${input.fullName}!`
- );
- ```
+```typescript
+export const GreetUser = new Flow({
+ slug: 'greetUser',
+})
+```
-3. ### Register the flow
+- **Named export**: Flows use named exports (not default), making them easy to import elsewhere
+- **Generic type**: `Flow` provides type safety for the input
+- **Slug**: A unique identifier used in the database and when starting flows
- Add your flow to `supabase/flows/index.ts`:
+### First step: fullName
- ```diff lang="typescript" title="supabase/flows/index.ts"
- // Re-export all flows from this directory
- // Example: export { MyFlow } from './my-flow.ts';
+```typescript
+.step(
+ { slug: 'fullName' },
+ (input) => `${input.run.firstName} ${input.run.lastName}`
+)
+```
- export { ExampleFlow } from './example-flow.ts';
- +export { GreetUser } from './greet-user.ts';
- ```
+- **No dependencies**: This step runs immediately when the flow starts
+- **`input.run`**: Contains the original flow input
+- **Return value**: The string becomes available to dependent steps
+
+### Second step: greeting
+
+```typescript
+.step(
+ { slug: 'greeting', dependsOn: ['fullName'] },
+ (input) => `Hello, ${input.fullName}!`
+)
+```
+
+- **`dependsOn`**: This step waits for `fullName` to complete
+- **`input.fullName`**: Access the result from the `fullName` step by its slug
+- **Final output**: When the last step completes, its result becomes the flow output
+
+## Flow registration
+
+The `supabase/flows/index.ts` file re-exports all flows:
+
+```typescript title="supabase/flows/index.ts"
+export { GreetUser } from './greet-user.ts';
+```
+
+The Control Plane automatically imports all exports from this file:
+
+```typescript title="supabase/functions/pgflow/index.ts"
+import { ControlPlane } from '@pgflow/edge-worker';
+import * as flows from '../../flows/index.ts';
+
+ControlPlane.serve(flows);
+```
- That's it! The Control Plane in `supabase/functions/pgflow/index.ts` automatically imports all flows from your `flows/` directory using namespace imports:
+## Try modifying the flow
- ```typescript title="supabase/functions/pgflow/index.ts"
- import { ControlPlane } from '@pgflow/edge-worker';
- import * as flows from '../../flows/index.ts';
+1. Add a third step that uses the greeting:
- ControlPlane.serve(flows);
+ ```typescript
+ .step(
+ { slug: 'shout', dependsOn: ['greeting'] },
+ (input) => input.greeting.toUpperCase()
+ )
```
-
+2. Recompile: `npx pgflow@latest compile greetUser`
+3. Apply migration: `npx supabase migrations up`
+4. Restart the worker and trigger a new run
:::note[Key Concepts]
- **input.run**: Contains the original flow input, accessible in every step
diff --git a/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx b/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx
new file mode 100644
index 000000000..6f7f08a65
--- /dev/null
+++ b/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx
@@ -0,0 +1,119 @@
+---
+title: Quickstart
+description: Run the example flow in 5 minutes
+sidebar:
+ order: 15
+---
+
+import { Aside, Steps, CardGrid, LinkCard } from "@astrojs/starlight/components";
+
+See pgflow in action using the GreetUser flow scaffolded during installation.
+
+
+
+
+
+1. ### Start edge functions
+
+ Open a terminal and start the edge functions server:
+
+ ```bash frame="none"
+ npx supabase functions serve
+ ```
+
+ Keep this terminal open.
+
+2. ### Compile the flow
+
+ In a **new terminal**, compile the GreetUser flow:
+
+ ```bash frame="none"
+ npx pgflow@latest compile greetUser
+ ```
+
+ You should see:
+
+ ```
+ Successfully compiled flow to SQL
+ Migration file created: supabase/migrations/..._create_greetUser_flow.sql
+ ```
+
+3. ### Apply the migration
+
+ Apply the migration to register the flow in your database:
+
+ ```bash frame="none"
+ npx supabase migrations up
+ ```
+
+4. ### Start the worker
+
+ Send an HTTP request to start the scaffolded worker:
+
+ ```bash frame="none"
+ curl http://localhost:54321/functions/v1/greet-user-worker
+ ```
+
+ You should see worker activity in the edge functions terminal.
+
+5. ### Trigger your first flow
+
+ Open Supabase Studio (http://localhost:54323), go to the **SQL Editor**, and run:
+
+ ```sql
+ SELECT * FROM pgflow.start_flow(
+ flow_slug => 'greetUser',
+ input => '{"firstName": "Alice", "lastName": "Smith"}'::jsonb
+ );
+ ```
+
+6. ### Check the result
+
+ After a moment, query the run status:
+
+ ```sql
+ SELECT * FROM pgflow.runs
+ WHERE flow_slug = 'greetUser'
+ ORDER BY started_at DESC
+ LIMIT 1;
+ ```
+
+ You should see `status: completed` with output:
+
+ ```json
+ {"greeting": "Hello, Alice Smith!"}
+ ```
+
+
+
+## What just happened?
+
+
+Expand for details
+
+1. **Compilation** converted your TypeScript flow to a SQL migration
+2. **Migration** registered the flow structure in Postgres
+3. **Worker** began polling for tasks on the `greetUser` queue
+4. **Steps executed** in dependency order: `fullName` ran first, then `greeting` used its output
+
+The GreetUser flow demonstrates the core pgflow pattern - steps with dependencies that pass data between them.
+
+
+## Next steps
+
+
+
+
+
diff --git a/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx
index 3de3ba5fa..30a88c27c 100644
--- a/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx
+++ b/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx
@@ -10,13 +10,14 @@ next:
import { Aside, Steps, Tabs, CardGrid, LinkCard, FileTree } from "@astrojs/starlight/components";
-Now that you've defined and compiled your flow, it's time to execute it!
+This guide explains how to create Edge Workers to run your flows. If you just want to run the scaffolded example, see the [Quickstart](/get-started/flows/quickstart/).
-In this guide, we'll set up an Edge Worker to process your flow tasks, trigger your first flow, and observe its execution.
+
@@ -24,24 +25,23 @@ Before starting, make sure you have completed:
1. ### Create a worker function
- Create a new Edge Function that will process tasks for your flow:
+ For each flow, create an Edge Function to process its tasks:
```bash frame="none"
- npx supabase functions new greet-user-worker
+ npx supabase functions new my-flow-worker
```
Replace contents of `index.ts` file with the following:
- ```typescript title="supabase/functions/greet-user-worker/index.ts"
+ ```typescript title="supabase/functions/my-flow-worker/index.ts"
import { EdgeWorker } from "@pgflow/edge-worker";
- import { GreetUser } from '../../flows/greet-user.ts';
+ import { MyFlow } from '../../flows/my-flow.ts';
- // Pass the flow definition to the Edge Worker
- EdgeWorker.start(GreetUser);
+ EdgeWorker.start(MyFlow);
```