Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('createFlowsDirectory', () => {

// Verify all files exist
const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');

expect(fs.existsSync(indexPath)).toBe(true);
expect(fs.existsSync(exampleFlowPath)).toBe(true);
Expand All @@ -51,26 +51,26 @@ describe('createFlowsDirectory', () => {
const indexContent = fs.readFileSync(indexPath, 'utf8');

// Should have export for ExampleFlow
expect(indexContent).toContain("export { ExampleFlow } from './example_flow.ts'");
expect(indexContent).toContain("export { ExampleFlow } from './example-flow.ts';");
// Should have documenting comment
expect(indexContent).toContain('Re-export all flows');
});

it('should create example_flow.ts with named export', async () => {
it('should create example-flow.ts with named export', async () => {
await createFlowsDirectory({
supabasePath,
autoConfirm: true,
});

const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');
const exampleFlowContent = fs.readFileSync(exampleFlowPath, 'utf8');

// Should use named export (not default)
expect(exampleFlowContent).toContain('export const ExampleFlow');
// Should import Flow from @pgflow/dsl
expect(exampleFlowContent).toContain("import { Flow } from '@pgflow/dsl'");
// Should have correct slug
expect(exampleFlowContent).toContain("slug: 'example_flow'");
expect(exampleFlowContent).toContain("slug: 'exampleFlow'");
// Should have input type
expect(exampleFlowContent).toContain('type Input');
// Should have at least one step
Expand All @@ -82,7 +82,7 @@ describe('createFlowsDirectory', () => {
fs.mkdirSync(flowsDir, { recursive: true });

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');

fs.writeFileSync(indexPath, '// existing content');
fs.writeFileSync(exampleFlowPath, '// existing content');
Expand All @@ -105,7 +105,7 @@ describe('createFlowsDirectory', () => {
fs.mkdirSync(flowsDir, { recursive: true });

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');

// Only create index.ts
fs.writeFileSync(indexPath, '// existing content');
Expand All @@ -115,13 +115,13 @@ describe('createFlowsDirectory', () => {
autoConfirm: true,
});

// Should return true because example_flow.ts was created
// Should return true because example-flow.ts was created
expect(result).toBe(true);

// Verify index.ts was not modified
expect(fs.readFileSync(indexPath, 'utf8')).toBe('// existing content');

// Verify example_flow.ts was created
// Verify example-flow.ts was created
expect(fs.existsSync(exampleFlowPath)).toBe(true);

const exampleContent = fs.readFileSync(exampleFlowPath, 'utf8');
Expand All @@ -145,6 +145,6 @@ describe('createFlowsDirectory', () => {

// Verify files exist
expect(fs.existsSync(path.join(flowsDir, 'index.ts'))).toBe(true);
expect(fs.existsSync(path.join(flowsDir, 'example_flow.ts'))).toBe(true);
expect(fs.existsSync(path.join(flowsDir, 'example-flow.ts'))).toBe(true);
});
});
10 changes: 5 additions & 5 deletions pkgs/cli/src/commands/install/create-flows-directory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import { log, confirm } from '@clack/prompts';
import chalk from 'chalk';

const INDEX_TS_TEMPLATE = `// Re-export all flows from this directory
// Example: export { MyFlow } from './my_flow.ts';
// Example: export { MyFlow } from './my-flow.ts';

export { ExampleFlow } from './example_flow.ts';
export { ExampleFlow } from './example-flow.ts';
`;

const EXAMPLE_FLOW_TEMPLATE = `import { Flow } from '@pgflow/dsl';

type Input = { name: string };

export const ExampleFlow = new Flow<Input>({ slug: 'example_flow' })
export const ExampleFlow = new Flow<Input>({ slug: 'exampleFlow' })
.step({ slug: 'greet' }, (input) => \`Hello, \${input.run.name}!\`);
`;

Expand All @@ -27,12 +27,12 @@ export async function createFlowsDirectory({
const flowsDir = path.join(supabasePath, 'flows');

const indexPath = path.join(flowsDir, 'index.ts');
const exampleFlowPath = path.join(flowsDir, 'example_flow.ts');
const exampleFlowPath = path.join(flowsDir, 'example-flow.ts');

// Relative paths for display
const relativeFlowsDir = 'supabase/flows';
const relativeIndexPath = `${relativeFlowsDir}/index.ts`;
const relativeExampleFlowPath = `${relativeFlowsDir}/example_flow.ts`;
const relativeExampleFlowPath = `${relativeFlowsDir}/example-flow.ts`;

// Check what needs to be created
const filesToCreate: Array<{ path: string; relativePath: string }> = [];
Expand Down
2 changes: 1 addition & 1 deletion pkgs/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ When using with `@pgflow/dsl`, you get full type safety:
import { Flow } from '@pgflow/dsl';

// Define your flow
const AnalyzeWebsite = new Flow<{ url: string }>({ slug: 'analyze_website' })
const AnalyzeWebsite = new Flow<{ url: string }>({ slug: 'analyzeWebsite' })
.step({ slug: 'scrape' }, async (input) => ({ content: 'html...' }))
.step({ slug: 'analyze' }, async (input) => ({ sentiment: 0.8 }));

Expand Down
30 changes: 17 additions & 13 deletions pkgs/client/SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ pgflow ships with NO permissions. The SQL below is a **convenience snippet** tha

> [!CAUTION]
> This SQL grants BROAD permissions! After running this, ANY authenticated user can:
>
> - Start ANY flow
> - View ANY run (if they know the run_id)
> - See ALL flow definitions
>
>
> It is YOUR responsibility to:
>
> - Tailor these permissions to your specific needs
> - Implement Row Level Security policies
> - Add proper access controls
Expand All @@ -51,40 +53,42 @@ This is suitable for development and trusted environments only.
Since pgflow doesn't handle security yet, you might want to:

1. **Add Row Level Security**

The key to implementing RLS with pgflow is to include a `user_id` field in your flow's input object. This allows you to create policies that check if the current user matches the user who started the flow.

First, include user_id in your flow input type:

```typescript
import { Flow } from '@pgflow/dsl';

// Define input type with user_id
type MyFlowInput = {
user_id: string; // <<<<< Add this field
user_id: string; // <<<<< Add this field
data: string;
// ... other fields
};

export const MyFlow = new Flow<MyFlowInput>({
slug: 'my_secure_flow',
})
slug: 'mySecureFlow',
});
// ... rest of flow definition
```

Then create RLS policies and an index for performance:

```sql
-- Enable RLS on tables you want to protect
ALTER TABLE pgflow.runs ENABLE ROW LEVEL SECURITY;

-- Create index for better RLS performance
CREATE INDEX idx_runs_user_id ON pgflow.runs ((input->>'user_id'));

-- Create your own policies based on your needs
-- Example: Users can only see their own runs
CREATE POLICY "Users see own runs" ON pgflow.runs
FOR SELECT USING ((SELECT auth.uid())::text = input->>'user_id');
```

For more details about the pgflow schema and the `runs` table structure, see the [Schema Design section](../core/README.md#schema-design) in the core documentation.

2. **Track User Attribution**
Expand All @@ -93,4 +97,4 @@ Since pgflow doesn't handle security yet, you might want to:

## Questions?

If you have security concerns or suggestions, please share them in the [GitHub discussions](https://github.com/pgflow-dev/pgflow/discussions).
If you have security concerns or suggestions, please share them in the [GitHub discussions](https://github.com/pgflow-dev/pgflow/discussions).
24 changes: 12 additions & 12 deletions pkgs/dsl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Input = {

// Define a flow with steps and dependencies
export const AnalyzeWebsite = new Flow<Input>({
slug: 'analyze_website',
slug: 'analyzeWebsite',
maxAttempts: 3,
baseDelay: 5,
timeout: 10,
Expand Down Expand Up @@ -98,13 +98,13 @@ A semantic wrapper around `.step()` that provides type enforcement for steps tha
```typescript
// Fetch an array of items to be processed
.array(
{ slug: 'fetch_items' },
{ slug: 'fetchItems' },
async () => [1, 2, 3, 4, 5]
)

// With dependencies - combining data from multiple sources
.array(
{ slug: 'combine_results', dependsOn: ['source1', 'source2'] },
{ slug: 'combineResults', dependsOn: ['source1', 'source2'] },
async (input) => [...input.source1, ...input.source2]
)
```
Expand All @@ -131,15 +131,15 @@ Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The
```typescript
// ROOT MAP - No array: property means use flow input
// Flow input MUST be an array (e.g., ["hello", "world"])
new Flow<string[]>({ slug: 'process_strings' })
new Flow<string[]>({ slug: 'processStrings' })
.map(
{ slug: 'uppercase' }, // No array: property!
(item) => item.toUpperCase()
);
// Each string in the input array gets uppercased in parallel

// DEPENDENT MAP - array: property specifies the source step
new Flow<{}>({ slug: 'data_pipeline' })
new Flow<{}>({ slug: 'dataPipeline' })
.array({ slug: 'numbers' }, () => [1, 2, 3])
.map(
{ slug: 'double', array: 'numbers' }, // Processes 'numbers' output
Expand All @@ -166,7 +166,7 @@ The `.map()` method provides full TypeScript type inference for array elements:
```typescript
type User = { id: number; name: string };

new Flow<{}>({ slug: 'user_flow' })
new Flow<{}>({ slug: 'userFlow' })
.array({ slug: 'users' }, (): User[] => [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
Expand All @@ -181,7 +181,7 @@ new Flow<{}>({ slug: 'user_flow' })

```typescript
// Batch processing - process multiple items in parallel
new Flow<number[]>({ slug: 'batch_processor' })
new Flow<number[]>({ slug: 'batchProcessor' })
.map({ slug: 'validate' }, (item) => {
if (item < 0) throw new Error('Invalid item');
return item;
Expand All @@ -192,9 +192,9 @@ new Flow<number[]>({ slug: 'batch_processor' })
});

// Data transformation pipeline
new Flow<{}>({ slug: 'etl_pipeline' })
.step({ slug: 'fetch_urls' }, () => ['url1', 'url2', 'url3'])
.map({ slug: 'scrape', array: 'fetch_urls' }, async (url) => {
new Flow<{}>({ slug: 'etlPipeline' })
.step({ slug: 'fetchUrls' }, () => ['url1', 'url2', 'url3'])
.map({ slug: 'scrape', array: 'fetchUrls' }, async (url) => {
return await fetchContent(url);
})
.map({ slug: 'extract', array: 'scrape' }, (html) => {
Expand Down Expand Up @@ -271,7 +271,7 @@ To use Supabase resources, import the `Flow` class from the Supabase preset:
import { Flow } from '@pgflow/dsl/supabase';

const MyFlow = new Flow<{ userId: string }>({
slug: 'my_flow',
slug: 'myFlow',
}).step({ slug: 'process' }, async (input, context) => {
// TypeScript knows context includes Supabase resources
const { data } = await context.supabase
Expand Down Expand Up @@ -300,7 +300,7 @@ Configure flows and steps with runtime options:

```typescript
new Flow<Input>({
slug: 'my_flow', // Required: Unique flow identifier
slug: 'myFlow', // Required: Unique flow identifier
maxAttempts: 3, // Optional: Maximum retry attempts (default: 1)
baseDelay: 5, // Optional: Base delay in seconds for retries (default: 1)
timeout: 10, // Optional: Task timeout in seconds (default: 30)
Expand Down
4 changes: 2 additions & 2 deletions pkgs/edge-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import { Flow } from 'jsr:@pgflow/dsl/supabase';

// Define a flow using Supabase preset for Supabase resources
const AnalyzeWebsite = new Flow<{ url: string }>({
slug: 'analyze_website',
slug: 'analyzeWebsite',
})
.step({ slug: 'fetch' }, async (input, context) => {
// Access Supabase resources through context
Expand Down Expand Up @@ -172,7 +172,7 @@ When defining flows that use Supabase resources, import `Flow` from the Supabase
```typescript
import { Flow } from 'jsr:@pgflow/dsl/supabase';

const MyFlow = new Flow<InputType>({ slug: 'my_flow' }).step(
const MyFlow = new Flow<InputType>({ slug: 'myFlow' }).step(
{ slug: 'process' },
async (input, context) => {
// TypeScript knows context includes all Supabase resources
Expand Down
2 changes: 1 addition & 1 deletion pkgs/website/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ export default defineConfig({
label: 'Context object',
link: '/concepts/context-object/',
},
{ label: 'Naming steps', link: '/concepts/naming-steps/' },
{ label: 'Naming conventions', link: '/concepts/naming-conventions/' },
],
},
],
Expand Down
3 changes: 2 additions & 1 deletion pkgs/website/redirects.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export const redirects = {
'/how-to/manual-installation/': '/reference/manual-installation/',
'/how-to/manually-compile-flow/': '/reference/compile-api/',
'/how-to/monitor-flow-execution/': '/deploy/monitor-execution/',
'/how-to/naming-steps/': '/concepts/naming-steps/',
'/how-to/naming-steps/': '/concepts/naming-conventions/',
'/concepts/naming-steps/': '/concepts/naming-conventions/',
'/how-to/organize-flows-code/': '/build/organize-flow-code/',
'/how-to/prepare-db-string/': '/deploy/connection-string/',
'/how-to/prune-old-records/': '/deploy/prune-records/',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ To create task functions that can be used across multiple flows without tight co
}

// Flow uses handler to adapt context to task parameters
new Flow<{ userId: string }>({ slug: 'user_flow' })
new Flow<{ userId: string }>({ slug: 'userFlow' })
.step({ slug: 'profile' }, async (input) =>
await fetchUserProfile(input.run.userId)
)
Expand Down
16 changes: 8 additions & 8 deletions pkgs/website/src/content/docs/build/delaying-steps.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ Onboarding emails with different start delays (all are root steps with no depend

```typescript
new Flow({
slug: 'user_onboarding',
slug: 'userOnboarding',
maxAttempts: 3,
baseDelay: 1,
})
.step({
slug: 'send_welcome_email',
slug: 'sendWelcomeEmail',
// Executes immediately when flow starts
}, sendWelcomeHandler)
.step({
slug: 'send_day_3_tips',
slug: 'sendDay3Tips',
startDelay: 259200, // 3 days after flow starts
}, sendTipsHandler)
.step({
slug: 'send_week_review',
slug: 'sendWeekReview',
startDelay: 604800, // 7 days after flow starts
}, sendReviewHandler)
```
Expand All @@ -59,16 +59,16 @@ Wait period after a specific step completes:

```typescript
new Flow({
slug: 'trial_conversion',
slug: 'trialConversion',
maxAttempts: 3,
baseDelay: 1,
})
.step({
slug: 'provision_trial',
slug: 'provisionTrial',
}, provisionHandler)
.step({
slug: 'send_upgrade_reminder',
dependsOn: ['provision_trial'],
slug: 'sendUpgradeReminder',
dependsOn: ['provisionTrial'],
startDelay: 1209600, // 14 days after trial provisioning completes
}, reminderHandler)
```
Expand Down
2 changes: 1 addition & 1 deletion pkgs/website/src/content/docs/build/delete-flows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pgflow.delete_flow_and_data(flow_slug TEXT)
Example usage in local development:
```sql
-- Delete a specific flow and all its data
SELECT pgflow.delete_flow_and_data('analyze_website');
SELECT pgflow.delete_flow_and_data('analyzeWebsite');
```

This deletes the flow definition, all runs, queued messages, and task outputs for the specified flow.
Expand Down
Loading