From 98f0546b4cd22fcbf8d78b9f219aca87f8c91b2a Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 27 Nov 2025 22:53:47 +0100 Subject: [PATCH] scaffold GreetUser example flow and worker during install with quickstart docs --- .../install/create-example-worker.test.ts | 192 ++++++++++++++++++ .../install/create-flows-directory.test.ts | 66 +++--- pkgs/cli/examples/async-function-hang.ts | 48 ----- .../commands/install/create-example-worker.ts | 99 +++++++++ .../install/create-flows-directory.ts | 38 ++-- pkgs/cli/src/commands/install/index.ts | 13 +- pkgs/website/astro.config.mjs | 1 + .../docs/get-started/flows/create-flow.mdx | 167 +++++++++------ .../docs/get-started/flows/quickstart.mdx | 119 +++++++++++ .../docs/get-started/flows/run-flow.mdx | 26 +-- .../content/docs/get-started/installation.mdx | 13 +- 11 files changed, 616 insertions(+), 166 deletions(-) create mode 100644 pkgs/cli/__tests__/commands/install/create-example-worker.test.ts delete mode 100644 pkgs/cli/examples/async-function-hang.ts create mode 100644 pkgs/cli/src/commands/install/create-example-worker.ts create mode 100644 pkgs/website/src/content/docs/get-started/flows/quickstart.mdx diff --git a/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts b/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts new file mode 100644 index 000000000..2bb486053 --- /dev/null +++ b/pkgs/cli/__tests__/commands/install/create-example-worker.test.ts @@ -0,0 +1,192 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import fs from 'fs'; +import path from 'path'; +import os from 'os'; +import { createExampleWorker } from '../../../src/commands/install/create-example-worker'; +import { getVersion } from '../../../src/utils/get-version'; + +describe('createExampleWorker', () => { + let tempDir: string; + let supabasePath: string; + let workerDir: string; + + beforeEach(() => { + // Create a temporary directory for testing + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pgflow-test-')); + supabasePath = path.join(tempDir, 'supabase'); + workerDir = path.join(supabasePath, 'functions', 'greet-user-worker'); + }); + + afterEach(() => { + // Clean up the temporary directory + fs.rmSync(tempDir, { recursive: true, force: true }); + }); + + it('should create both files when none exist', async () => { + const result = await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + // Should return true because files were created + expect(result).toBe(true); + + // Verify directory was created + expect(fs.existsSync(workerDir)).toBe(true); + + // Verify all files exist + const indexPath = path.join(workerDir, 'index.ts'); + const denoJsonPath = path.join(workerDir, 'deno.json'); + + expect(fs.existsSync(indexPath)).toBe(true); + expect(fs.existsSync(denoJsonPath)).toBe(true); + }); + + it('should create index.ts that imports GreetUser and starts EdgeWorker', async () => { + await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + const indexPath = path.join(workerDir, 'index.ts'); + const indexContent = fs.readFileSync(indexPath, 'utf8'); + + // Should import EdgeWorker + expect(indexContent).toContain("import { EdgeWorker } from '@pgflow/edge-worker'"); + // Should import GreetUser from flows directory + expect(indexContent).toContain("import { GreetUser } from '../../flows/greet-user.ts'"); + // Should start EdgeWorker with GreetUser + expect(indexContent).toContain('EdgeWorker.start(GreetUser)'); + }); + + it('should create deno.json with correct import mappings', async () => { + await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + const denoJsonPath = path.join(workerDir, 'deno.json'); + const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8'); + const denoJson = JSON.parse(denoJsonContent); + + // Verify imports exist + expect(denoJson.imports).toBeDefined(); + expect(denoJson.imports['@pgflow/core']).toBeDefined(); + expect(denoJson.imports['@pgflow/dsl']).toBeDefined(); + expect(denoJson.imports['@pgflow/edge-worker']).toBeDefined(); + }); + + it('should inject package version instead of @latest in deno.json', async () => { + await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + const denoJsonPath = path.join(workerDir, 'deno.json'); + const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8'); + const denoJson = JSON.parse(denoJsonContent); + + const version = getVersion(); + + // Verify version is not 'unknown' + expect(version).not.toBe('unknown'); + + // Verify that @latest is NOT used + expect(denoJsonContent).not.toContain('@latest'); + + // Verify that the actual version is used + expect(denoJson.imports['@pgflow/core']).toBe(`npm:@pgflow/core@${version}`); + expect(denoJson.imports['@pgflow/dsl']).toBe(`npm:@pgflow/dsl@${version}`); + expect(denoJson.imports['@pgflow/edge-worker']).toBe(`jsr:@pgflow/edge-worker@${version}`); + }); + + it('should not create files when they already exist', async () => { + // Pre-create the directory and files + fs.mkdirSync(workerDir, { recursive: true }); + + const indexPath = path.join(workerDir, 'index.ts'); + const denoJsonPath = path.join(workerDir, 'deno.json'); + + fs.writeFileSync(indexPath, '// existing content'); + fs.writeFileSync(denoJsonPath, '// existing content'); + + const result = await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + // Should return false because no changes were needed + expect(result).toBe(false); + + // Verify files still exist with original content + expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content'); + expect(fs.readFileSync(denoJsonPath, 'utf8')).toBe('// existing content'); + }); + + it('should create only missing files when some already exist', async () => { + // Pre-create the directory and one file + fs.mkdirSync(workerDir, { recursive: true }); + + const indexPath = path.join(workerDir, 'index.ts'); + const denoJsonPath = path.join(workerDir, 'deno.json'); + + // Only create index.ts + fs.writeFileSync(indexPath, '// existing content'); + + const result = await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + // Should return true because deno.json was created + expect(result).toBe(true); + + // Verify index.ts was not modified + expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content'); + + // Verify deno.json was created + expect(fs.existsSync(denoJsonPath)).toBe(true); + + const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8'); + expect(denoJsonContent).toContain('"imports"'); + }); + + it('should create parent directories if they do not exist', async () => { + // Don't create anything - let the function create it all + expect(fs.existsSync(supabasePath)).toBe(false); + + const result = await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + expect(result).toBe(true); + + // Verify all parent directories were created + expect(fs.existsSync(supabasePath)).toBe(true); + expect(fs.existsSync(path.join(supabasePath, 'functions'))).toBe(true); + expect(fs.existsSync(workerDir)).toBe(true); + + // Verify files exist + expect(fs.existsSync(path.join(workerDir, 'index.ts'))).toBe(true); + expect(fs.existsSync(path.join(workerDir, 'deno.json'))).toBe(true); + }); + + it('should include subpath exports for Deno import mapping', async () => { + await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + + const denoJsonPath = path.join(workerDir, 'deno.json'); + const denoJsonContent = fs.readFileSync(denoJsonPath, 'utf8'); + const denoJson = JSON.parse(denoJsonContent); + + const version = getVersion(); + + // Verify subpath exports include versions (needed for proper Deno import mapping) + expect(denoJson.imports['@pgflow/core/']).toBe(`npm:@pgflow/core@${version}/`); + expect(denoJson.imports['@pgflow/dsl/']).toBe(`npm:@pgflow/dsl@${version}/`); + expect(denoJson.imports['@pgflow/edge-worker/']).toBe(`jsr:@pgflow/edge-worker@${version}/`); + }); +}); diff --git a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts index e7dd42444..4dc942afc 100644 --- a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts +++ b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts @@ -35,10 +35,10 @@ describe('createFlowsDirectory', () => { // Verify all files exist const indexPath = path.join(flowsDir, 'index.ts'); - const exampleFlowPath = path.join(flowsDir, 'example-flow.ts'); + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); expect(fs.existsSync(indexPath)).toBe(true); - expect(fs.existsSync(exampleFlowPath)).toBe(true); + expect(fs.existsSync(greetUserPath)).toBe(true); }); it('should create index.ts with barrel export pattern', async () => { @@ -50,31 +50,49 @@ describe('createFlowsDirectory', () => { const indexPath = path.join(flowsDir, 'index.ts'); const indexContent = fs.readFileSync(indexPath, 'utf8'); - // Should have export for ExampleFlow - expect(indexContent).toContain("export { ExampleFlow } from './example-flow.ts';"); + // Should have export for GreetUser + expect(indexContent).toContain("export { GreetUser } from './greet-user.ts';"); // Should have documenting comment expect(indexContent).toContain('Re-export all flows'); }); - it('should create example-flow.ts with named export', async () => { + it('should create greet-user.ts with named export', async () => { await createFlowsDirectory({ supabasePath, autoConfirm: true, }); - const exampleFlowPath = path.join(flowsDir, 'example-flow.ts'); - const exampleFlowContent = fs.readFileSync(exampleFlowPath, 'utf8'); + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); + const greetUserContent = fs.readFileSync(greetUserPath, 'utf8'); // Should use named export (not default) - expect(exampleFlowContent).toContain('export const ExampleFlow'); + expect(greetUserContent).toContain('export const GreetUser'); // Should import Flow from @pgflow/dsl - expect(exampleFlowContent).toContain("import { Flow } from '@pgflow/dsl'"); + expect(greetUserContent).toContain("import { Flow } from '@pgflow/dsl'"); // Should have correct slug - expect(exampleFlowContent).toContain("slug: 'exampleFlow'"); - // Should have input type - expect(exampleFlowContent).toContain('type Input'); - // Should have at least one step - expect(exampleFlowContent).toContain('.step('); + expect(greetUserContent).toContain("slug: 'greetUser'"); + // Should have input type with firstName and lastName + expect(greetUserContent).toContain('type Input'); + expect(greetUserContent).toContain('firstName'); + expect(greetUserContent).toContain('lastName'); + }); + + it('should create greet-user.ts with two steps showing dependsOn', async () => { + await createFlowsDirectory({ + supabasePath, + autoConfirm: true, + }); + + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); + const greetUserContent = fs.readFileSync(greetUserPath, 'utf8'); + + // Should have two steps + expect(greetUserContent).toContain("slug: 'fullName'"); + expect(greetUserContent).toContain("slug: 'greeting'"); + // Second step should depend on first + expect(greetUserContent).toContain("dependsOn: ['fullName']"); + // Second step should access result from first step + expect(greetUserContent).toContain('input.fullName'); }); it('should not create files when they already exist', async () => { @@ -82,10 +100,10 @@ describe('createFlowsDirectory', () => { fs.mkdirSync(flowsDir, { recursive: true }); const indexPath = path.join(flowsDir, 'index.ts'); - const exampleFlowPath = path.join(flowsDir, 'example-flow.ts'); + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); fs.writeFileSync(indexPath, '// existing content'); - fs.writeFileSync(exampleFlowPath, '// existing content'); + fs.writeFileSync(greetUserPath, '// existing content'); const result = await createFlowsDirectory({ supabasePath, @@ -97,7 +115,7 @@ describe('createFlowsDirectory', () => { // Verify files still exist with original content expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content'); - expect(fs.readFileSync(exampleFlowPath, 'utf8')).toBe('// existing content'); + expect(fs.readFileSync(greetUserPath, 'utf8')).toBe('// existing content'); }); it('should create only missing files when some already exist', async () => { @@ -105,7 +123,7 @@ describe('createFlowsDirectory', () => { fs.mkdirSync(flowsDir, { recursive: true }); const indexPath = path.join(flowsDir, 'index.ts'); - const exampleFlowPath = path.join(flowsDir, 'example-flow.ts'); + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); // Only create index.ts fs.writeFileSync(indexPath, '// existing content'); @@ -115,17 +133,17 @@ describe('createFlowsDirectory', () => { autoConfirm: true, }); - // Should return true because example-flow.ts was created + // Should return true because greet-user.ts was created expect(result).toBe(true); // Verify index.ts was not modified expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content'); - // Verify example-flow.ts was created - expect(fs.existsSync(exampleFlowPath)).toBe(true); + // Verify greet-user.ts was created + expect(fs.existsSync(greetUserPath)).toBe(true); - const exampleContent = fs.readFileSync(exampleFlowPath, 'utf8'); - expect(exampleContent).toContain('export const ExampleFlow'); + const greetUserContent = fs.readFileSync(greetUserPath, 'utf8'); + expect(greetUserContent).toContain('export const GreetUser'); }); it('should create parent directories if they do not exist', async () => { @@ -145,6 +163,6 @@ describe('createFlowsDirectory', () => { // Verify files exist expect(fs.existsSync(path.join(flowsDir, 'index.ts'))).toBe(true); - expect(fs.existsSync(path.join(flowsDir, 'example-flow.ts'))).toBe(true); + expect(fs.existsSync(path.join(flowsDir, 'greet-user.ts'))).toBe(true); }); }); diff --git a/pkgs/cli/examples/async-function-hang.ts b/pkgs/cli/examples/async-function-hang.ts deleted file mode 100644 index 33babbaaa..000000000 --- a/pkgs/cli/examples/async-function-hang.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Flow } from '@pgflow/dsl'; - -type Input = { - logo_url: string; - company_slug: string; - company_id: string; -}; - -// Dummy async task that simulates real async work -async function processCompanyLogoTask(input: { logo_url: string; company_slug: string }) { - // Simulate some async work with a promise - await new Promise((resolve) => setTimeout(resolve, 100)); - return { - file_path: `/uploads/${input.company_slug}/logo.png`, - }; -} - -// Another dummy async task -async function updateCompanyLogoUrlTask(input: { company_id: string; file_path: string }) { - // Simulate some async work - await new Promise((resolve) => setTimeout(resolve, 50)); - return { - success: true, - company_id: input.company_id, - logo_url: input.file_path, - }; -} - -export default new Flow({ - slug: 'upload_company_logo', - maxAttempts: 3, - timeout: 60, - baseDelay: 2, -}) - .step( - { slug: 'process_company_logo' }, - async (input) => await processCompanyLogoTask({ - logo_url: input.run.logo_url, - company_slug: input.run.company_slug, - }) - ) - .step( - { slug: 'update_company_logo_url', dependsOn: ['process_company_logo'] }, - async (input) => await updateCompanyLogoUrlTask({ - company_id: input.run.company_id, - file_path: (input.process_company_logo as { file_path: string }).file_path, - }) - ); diff --git a/pkgs/cli/src/commands/install/create-example-worker.ts b/pkgs/cli/src/commands/install/create-example-worker.ts new file mode 100644 index 000000000..137ccc722 --- /dev/null +++ b/pkgs/cli/src/commands/install/create-example-worker.ts @@ -0,0 +1,99 @@ +import fs from 'fs'; +import path from 'path'; +import { log, confirm } from '@clack/prompts'; +import chalk from 'chalk'; +import { getVersion } from '../../utils/get-version.js'; + +const INDEX_TS_TEMPLATE = `import { EdgeWorker } from '@pgflow/edge-worker'; +import { GreetUser } from '../../flows/greet-user.ts'; + +EdgeWorker.start(GreetUser); +`; + +const DENO_JSON_TEMPLATE = (version: string) => `{ + "imports": { + "@pgflow/core": "npm:@pgflow/core@${version}", + "@pgflow/core/": "npm:@pgflow/core@${version}/", + "@pgflow/dsl": "npm:@pgflow/dsl@${version}", + "@pgflow/dsl/": "npm:@pgflow/dsl@${version}/", + "@pgflow/dsl/supabase": "npm:@pgflow/dsl@${version}/supabase", + "@pgflow/edge-worker": "jsr:@pgflow/edge-worker@${version}", + "@pgflow/edge-worker/": "jsr:@pgflow/edge-worker@${version}/", + "@pgflow/edge-worker/_internal": "jsr:@pgflow/edge-worker@${version}/_internal" + } +} +`; + +export async function createExampleWorker({ + supabasePath, + autoConfirm = false, +}: { + supabasePath: string; + autoConfirm?: boolean; +}): Promise { + const functionsDir = path.join(supabasePath, 'functions'); + const workerDir = path.join(functionsDir, 'greet-user-worker'); + + const indexPath = path.join(workerDir, 'index.ts'); + const denoJsonPath = path.join(workerDir, 'deno.json'); + + // Relative paths for display + const relativeWorkerDir = 'supabase/functions/greet-user-worker'; + const relativeIndexPath = `${relativeWorkerDir}/index.ts`; + const relativeDenoJsonPath = `${relativeWorkerDir}/deno.json`; + + // Check what needs to be created + const filesToCreate: Array<{ path: string; relativePath: string }> = []; + + if (!fs.existsSync(indexPath)) { + filesToCreate.push({ path: indexPath, relativePath: relativeIndexPath }); + } + + if (!fs.existsSync(denoJsonPath)) { + filesToCreate.push({ path: denoJsonPath, relativePath: relativeDenoJsonPath }); + } + + // If all files exist, return success + if (filesToCreate.length === 0) { + log.success('Example worker already up to date'); + return false; + } + + // Show preview and ask for confirmation only when not auto-confirming + if (!autoConfirm) { + const summaryMsg = [ + `Create ${chalk.cyan('functions/greet-user-worker/')} ${chalk.dim('(example worker for GreetUser flow)')}:`, + '', + ...filesToCreate.map((file) => ` ${chalk.bold(path.basename(file.relativePath))}`), + ].join('\n'); + + log.info(summaryMsg); + + const confirmResult = await confirm({ + message: `Create functions/greet-user-worker/?`, + }); + + if (confirmResult !== true) { + log.warn('Example worker installation skipped'); + return false; + } + } + + // Create the directory if it doesn't exist + if (!fs.existsSync(workerDir)) { + fs.mkdirSync(workerDir, { recursive: true }); + } + + // Create files + if (filesToCreate.some((f) => f.path === indexPath)) { + fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE); + } + + if (filesToCreate.some((f) => f.path === denoJsonPath)) { + fs.writeFileSync(denoJsonPath, DENO_JSON_TEMPLATE(getVersion())); + } + + log.success('Example worker created'); + + return true; +} diff --git a/pkgs/cli/src/commands/install/create-flows-directory.ts b/pkgs/cli/src/commands/install/create-flows-directory.ts index 32f6b80bd..0b2014451 100644 --- a/pkgs/cli/src/commands/install/create-flows-directory.ts +++ b/pkgs/cli/src/commands/install/create-flows-directory.ts @@ -6,15 +6,27 @@ import chalk from 'chalk'; const INDEX_TS_TEMPLATE = `// Re-export all flows from this directory // Example: export { MyFlow } from './my-flow.ts'; -export { ExampleFlow } from './example-flow.ts'; +export { GreetUser } from './greet-user.ts'; `; -const EXAMPLE_FLOW_TEMPLATE = `import { Flow } from '@pgflow/dsl'; - -type Input = { name: string }; - -export const ExampleFlow = new Flow({ slug: 'exampleFlow' }) - .step({ slug: 'greet' }, (input) => \`Hello, \${input.run.name}!\`); +const GREET_USER_TEMPLATE = `import { Flow } from '@pgflow/dsl'; + +type Input = { + firstName: string; + lastName: string; +}; + +export const GreetUser = new Flow({ + slug: 'greetUser', +}) + .step( + { slug: 'fullName' }, + (input) => \`\${input.run.firstName} \${input.run.lastName}\` + ) + .step( + { slug: 'greeting', dependsOn: ['fullName'] }, + (input) => \`Hello, \${input.fullName}!\` + ); `; export async function createFlowsDirectory({ @@ -27,12 +39,12 @@ export async function createFlowsDirectory({ const flowsDir = path.join(supabasePath, 'flows'); const indexPath = path.join(flowsDir, 'index.ts'); - const exampleFlowPath = path.join(flowsDir, 'example-flow.ts'); + const greetUserPath = path.join(flowsDir, 'greet-user.ts'); // Relative paths for display const relativeFlowsDir = 'supabase/flows'; const relativeIndexPath = `${relativeFlowsDir}/index.ts`; - const relativeExampleFlowPath = `${relativeFlowsDir}/example-flow.ts`; + const relativeGreetUserPath = `${relativeFlowsDir}/greet-user.ts`; // Check what needs to be created const filesToCreate: Array<{ path: string; relativePath: string }> = []; @@ -41,8 +53,8 @@ export async function createFlowsDirectory({ filesToCreate.push({ path: indexPath, relativePath: relativeIndexPath }); } - if (!fs.existsSync(exampleFlowPath)) { - filesToCreate.push({ path: exampleFlowPath, relativePath: relativeExampleFlowPath }); + if (!fs.existsSync(greetUserPath)) { + filesToCreate.push({ path: greetUserPath, relativePath: relativeGreetUserPath }); } // If all files exist, return success @@ -81,8 +93,8 @@ export async function createFlowsDirectory({ fs.writeFileSync(indexPath, INDEX_TS_TEMPLATE); } - if (filesToCreate.some((f) => f.path === exampleFlowPath)) { - fs.writeFileSync(exampleFlowPath, EXAMPLE_FLOW_TEMPLATE); + if (filesToCreate.some((f) => f.path === greetUserPath)) { + fs.writeFileSync(greetUserPath, GREET_USER_TEMPLATE); } log.success('Flows directory created'); diff --git a/pkgs/cli/src/commands/install/index.ts b/pkgs/cli/src/commands/install/index.ts index 95fdbb1c2..c4fd61c1d 100644 --- a/pkgs/cli/src/commands/install/index.ts +++ b/pkgs/cli/src/commands/install/index.ts @@ -6,6 +6,7 @@ import { updateConfigToml } from './update-config-toml.js'; import { updateEnvFile } from './update-env-file.js'; import { createEdgeFunction } from './create-edge-function.js'; import { createFlowsDirectory } from './create-flows-directory.js'; +import { createExampleWorker } from './create-example-worker.js'; import { supabasePathPrompt } from './supabase-path-prompt.js'; export default (program: Command) => { @@ -35,8 +36,9 @@ export default (program: Command) => { '', ` • Update ${chalk.cyan('supabase/config.toml')} ${chalk.dim('(enable pooler, per_worker runtime)')}`, ` • Add pgflow migrations to ${chalk.cyan('supabase/migrations/')}`, - ` • Create ${chalk.cyan('supabase/flows/')} ${chalk.dim('(flow definitions with example)')}`, + ` • Create ${chalk.cyan('supabase/flows/')} ${chalk.dim('(flow definitions with GreetUser example)')}`, ` • Create Control Plane in ${chalk.cyan('supabase/functions/pgflow/')}`, + ` • Create ${chalk.cyan('supabase/functions/greet-user-worker/')} ${chalk.dim('(example worker)')}`, ` • Configure ${chalk.cyan('supabase/functions/.env')}`, '', ` ${chalk.green('✓ Safe to re-run - completed steps will be skipped')}`, @@ -85,6 +87,11 @@ export default (program: Command) => { autoConfirm: true, }); + const exampleWorker = await createExampleWorker({ + supabasePath, + autoConfirm: true, + }); + const envFile = await updateEnvFile({ supabasePath, autoConfirm: true, @@ -93,7 +100,7 @@ export default (program: Command) => { // Step 4: Show completion message const outroMessages: string[] = []; - if (migrations || configUpdate || flowsDirectory || edgeFunction || envFile) { + if (migrations || configUpdate || flowsDirectory || edgeFunction || exampleWorker || envFile) { outroMessages.push(chalk.green.bold('✓ Installation complete!')); } else { outroMessages.push( @@ -122,7 +129,7 @@ export default (program: Command) => { } outroMessages.push( - ` ${stepNumber}. Create your first flow: ${chalk.blue.underline('https://pgflow.dev/getting-started/create-first-flow/')}` + ` ${stepNumber}. Run the example: ${chalk.blue.underline('https://pgflow.dev/get-started/flows/quickstart/')}` ); outro(outroMessages.join('\n')); diff --git a/pkgs/website/astro.config.mjs b/pkgs/website/astro.config.mjs index 034b0101d..2e71facc3 100644 --- a/pkgs/website/astro.config.mjs +++ b/pkgs/website/astro.config.mjs @@ -166,6 +166,7 @@ export default defineConfig({ ], promote: [ 'get-started/installation', + 'get-started/flows/quickstart', 'get-started/flows/create-flow', 'get-started/flows/compile-flow', 'get-started/flows/run-flow', diff --git a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx index 45adbebac..fcadb5fcf 100644 --- a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx +++ b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx @@ -1,93 +1,140 @@ --- -title: Create your first flow -description: Learn how to define a flow using pgflow's TypeScript DSL +title: Understanding Flows +description: Learn how the scaffolded GreetUser flow works sidebar: order: 20 --- -import { Aside, Steps, Tabs, TabItem, CardGrid, LinkCard } from "@astrojs/starlight/components"; +import { Aside, CardGrid, LinkCard } from "@astrojs/starlight/components"; import { FileTree } from '@astrojs/starlight/components'; -Now that pgflow is installed, let's create a simple hello world flow that demonstrates the core concepts. +The `pgflow install` command created an example flow that demonstrates core pgflow concepts. Let's understand how it works. -Our flow will: + -1. Take a person's **first name** and **last name** as input -2. Format a **full name** -3. Create a personalized **greeting** +## Project structure - + +- supabase + - flows + - greet-user.ts (flow definition) + - index.ts (re-exports all flows) + - functions + - pgflow + - index.ts (Control Plane) + - greet-user-worker + - index.ts (Edge Worker) + -## Creating a Simple Greeting Flow +## The GreetUser flow - +Open `supabase/flows/greet-user.ts`: +```typescript title="supabase/flows/greet-user.ts" +import { Flow } from '@pgflow/dsl'; -1. ### Set up your project structure +type Input = { + firstName: string; + lastName: string; +}; - After running `pgflow install`, you already have a `flows/` directory with an example. Let's create a new flow: +export const GreetUser = new Flow({ + slug: 'greetUser', +}) + .step( + { slug: 'fullName' }, + (input) => `${input.run.firstName} ${input.run.lastName}` + ) + .step( + { slug: 'greeting', dependsOn: ['fullName'] }, + (input) => `Hello, ${input.fullName}!` + ); +``` - - - supabase - - flows - - greet-user.ts (your flow definition) - - index.ts (re-exports all flows) - - functions - - pgflow - - index.ts (Control Plane) - - deno.json - +Let's break down each part: -2. ### Create the flow definition +### Input type - Create a file called `supabase/flows/greet-user.ts` with this content: +```typescript +type Input = { + firstName: string; + lastName: string; +}; +``` - ```typescript title="supabase/flows/greet-user.ts" - import { Flow } from '@pgflow/dsl'; +This defines what data the flow accepts when started. The input is always accessible via `input.run` in any step. - type Input = { - firstName: string; - lastName: string; - }; +### Flow constructor - export const GreetUser = new Flow({ - slug: 'greetUser', - }) - .step( - { slug: 'fullName' }, - (input) => `${input.run.firstName} ${input.run.lastName}` - ) - .step( - { slug: 'greeting', dependsOn: ['fullName'] }, - (input) => `Hello, ${input.fullName}!` - ); - ``` +```typescript +export const GreetUser = new Flow({ + slug: 'greetUser', +}) +``` -3. ### Register the flow +- **Named export**: Flows use named exports (not default), making them easy to import elsewhere +- **Generic type**: `Flow` provides type safety for the input +- **Slug**: A unique identifier used in the database and when starting flows - Add your flow to `supabase/flows/index.ts`: +### First step: fullName - ```diff lang="typescript" title="supabase/flows/index.ts" - // Re-export all flows from this directory - // Example: export { MyFlow } from './my-flow.ts'; +```typescript +.step( + { slug: 'fullName' }, + (input) => `${input.run.firstName} ${input.run.lastName}` +) +``` - export { ExampleFlow } from './example-flow.ts'; - +export { GreetUser } from './greet-user.ts'; - ``` +- **No dependencies**: This step runs immediately when the flow starts +- **`input.run`**: Contains the original flow input +- **Return value**: The string becomes available to dependent steps + +### Second step: greeting + +```typescript +.step( + { slug: 'greeting', dependsOn: ['fullName'] }, + (input) => `Hello, ${input.fullName}!` +) +``` + +- **`dependsOn`**: This step waits for `fullName` to complete +- **`input.fullName`**: Access the result from the `fullName` step by its slug +- **Final output**: When the last step completes, its result becomes the flow output + +## Flow registration + +The `supabase/flows/index.ts` file re-exports all flows: + +```typescript title="supabase/flows/index.ts" +export { GreetUser } from './greet-user.ts'; +``` + +The Control Plane automatically imports all exports from this file: + +```typescript title="supabase/functions/pgflow/index.ts" +import { ControlPlane } from '@pgflow/edge-worker'; +import * as flows from '../../flows/index.ts'; + +ControlPlane.serve(flows); +``` - That's it! The Control Plane in `supabase/functions/pgflow/index.ts` automatically imports all flows from your `flows/` directory using namespace imports: +## Try modifying the flow - ```typescript title="supabase/functions/pgflow/index.ts" - import { ControlPlane } from '@pgflow/edge-worker'; - import * as flows from '../../flows/index.ts'; +1. Add a third step that uses the greeting: - ControlPlane.serve(flows); + ```typescript + .step( + { slug: 'shout', dependsOn: ['greeting'] }, + (input) => input.greeting.toUpperCase() + ) ``` - +2. Recompile: `npx pgflow@latest compile greetUser` +3. Apply migration: `npx supabase migrations up` +4. Restart the worker and trigger a new run :::note[Key Concepts] - **input.run**: Contains the original flow input, accessible in every step diff --git a/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx b/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx new file mode 100644 index 000000000..6f7f08a65 --- /dev/null +++ b/pkgs/website/src/content/docs/get-started/flows/quickstart.mdx @@ -0,0 +1,119 @@ +--- +title: Quickstart +description: Run the example flow in 5 minutes +sidebar: + order: 15 +--- + +import { Aside, Steps, CardGrid, LinkCard } from "@astrojs/starlight/components"; + +See pgflow in action using the GreetUser flow scaffolded during installation. + + + + + +1. ### Start edge functions + + Open a terminal and start the edge functions server: + + ```bash frame="none" + npx supabase functions serve + ``` + + Keep this terminal open. + +2. ### Compile the flow + + In a **new terminal**, compile the GreetUser flow: + + ```bash frame="none" + npx pgflow@latest compile greetUser + ``` + + You should see: + + ``` + Successfully compiled flow to SQL + Migration file created: supabase/migrations/..._create_greetUser_flow.sql + ``` + +3. ### Apply the migration + + Apply the migration to register the flow in your database: + + ```bash frame="none" + npx supabase migrations up + ``` + +4. ### Start the worker + + Send an HTTP request to start the scaffolded worker: + + ```bash frame="none" + curl http://localhost:54321/functions/v1/greet-user-worker + ``` + + You should see worker activity in the edge functions terminal. + +5. ### Trigger your first flow + + Open Supabase Studio (http://localhost:54323), go to the **SQL Editor**, and run: + + ```sql + SELECT * FROM pgflow.start_flow( + flow_slug => 'greetUser', + input => '{"firstName": "Alice", "lastName": "Smith"}'::jsonb + ); + ``` + +6. ### Check the result + + After a moment, query the run status: + + ```sql + SELECT * FROM pgflow.runs + WHERE flow_slug = 'greetUser' + ORDER BY started_at DESC + LIMIT 1; + ``` + + You should see `status: completed` with output: + + ```json + {"greeting": "Hello, Alice Smith!"} + ``` + + + +## What just happened? + +
+Expand for details + +1. **Compilation** converted your TypeScript flow to a SQL migration +2. **Migration** registered the flow structure in Postgres +3. **Worker** began polling for tasks on the `greetUser` queue +4. **Steps executed** in dependency order: `fullName` ran first, then `greeting` used its output + +The GreetUser flow demonstrates the core pgflow pattern - steps with dependencies that pass data between them. +
+ +## Next steps + + + + + diff --git a/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx index 3de3ba5fa..30a88c27c 100644 --- a/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx +++ b/pkgs/website/src/content/docs/get-started/flows/run-flow.mdx @@ -10,13 +10,14 @@ next: import { Aside, Steps, Tabs, CardGrid, LinkCard, FileTree } from "@astrojs/starlight/components"; -Now that you've defined and compiled your flow, it's time to execute it! +This guide explains how to create Edge Workers to run your flows. If you just want to run the scaffolded example, see the [Quickstart](/get-started/flows/quickstart/). -In this guide, we'll set up an Edge Worker to process your flow tasks, trigger your first flow, and observe its execution. + @@ -24,24 +25,23 @@ Before starting, make sure you have completed: 1. ### Create a worker function - Create a new Edge Function that will process tasks for your flow: + For each flow, create an Edge Function to process its tasks: ```bash frame="none" - npx supabase functions new greet-user-worker + npx supabase functions new my-flow-worker ``` Replace contents of `index.ts` file with the following: - ```typescript title="supabase/functions/greet-user-worker/index.ts" + ```typescript title="supabase/functions/my-flow-worker/index.ts" import { EdgeWorker } from "@pgflow/edge-worker"; - import { GreetUser } from '../../flows/greet-user.ts'; + import { MyFlow } from '../../flows/my-flow.ts'; - // Pass the flow definition to the Edge Worker - EdgeWorker.start(GreetUser); + EdgeWorker.start(MyFlow); ```