From ea13b41f3d3d7b82ce465dcf6883598478505521 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Tue, 11 Nov 2025 16:48:44 +0100 Subject: [PATCH 01/16] Extract QueueOptions type Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world/src/queue.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 0ad9edccd..383355bbc 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -43,6 +43,11 @@ export const QueuePayloadSchema = z.union([ ]); export type QueuePayload = z.infer; +export interface QueueOptions { + deploymentId?: string; + idempotencyKey?: string; +} + export interface Queue { getDeploymentId(): Promise; @@ -56,10 +61,7 @@ export interface Queue { queue( queueName: ValidQueueName, message: QueuePayload, - opts?: { - deploymentId?: string; - idempotencyKey?: string; - } + opts?: QueueOptions ): Promise<{ messageId: MessageId }>; /** From fb3e7f2fd80ec3a1ecde8b0ce17a94fb3908a03c Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Wed, 12 Nov 2025 11:49:33 +0100 Subject: [PATCH 02/16] Export raw entrypoints Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/builders/src/base-builder.ts | 8 +++++--- packages/sveltekit/src/builder.ts | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 9715d5e20..5024e91cb 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -320,11 +320,13 @@ export abstract class BaseBuilder { const entryContent = ` // Built in steps + import { stepEntrypoint } from 'workflow/runtime'; import '${builtInSteps}'; // User steps ${imports} // API entrypoint - export { stepEntrypoint as POST } from 'workflow/runtime';`; + export 
const __wkf_entrypoint = stepEntrypoint; + export const POST = stepEntrypoint;`; // Bundle with esbuild and our custom SWC plugin const esbuildCtx = await esbuild.context({ @@ -549,8 +551,8 @@ export abstract class BaseBuilder { import { workflowEntrypoint } from 'workflow/runtime'; const workflowCode = \`${workflowBundleCode.replace(/[\\`$]/g, '\\$&')}\`; - -export const POST = workflowEntrypoint(workflowCode);`; +export const __wkf_entrypoint = workflowEntrypoint(workflowCode); +export const POST = __wkf_entrypoint`; // we skip the final bundling step for Next.js so it can bundle itself if (!bundleFinalOutput) { diff --git a/packages/sveltekit/src/builder.ts b/packages/sveltekit/src/builder.ts index e26b568c5..6f02dec07 100644 --- a/packages/sveltekit/src/builder.ts +++ b/packages/sveltekit/src/builder.ts @@ -92,7 +92,7 @@ export class SvelteKitBuilder extends BaseBuilder { // Replace the default export with SvelteKit-compatible handler stepsRouteContent = stepsRouteContent.replace( - /export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m, + /export\s*const\s*POST\s*=\s*stepEntrypoint\s*;$/m, `${SVELTEKIT_REQUEST_CONVERTER} export const POST = async ({request}) => { const normalRequest = await convertSvelteKitRequest(request); @@ -133,11 +133,11 @@ export const POST = async ({request}) => { // Replace the default export with SvelteKit-compatible handler workflowsRouteContent = workflowsRouteContent.replace( - /export const POST = workflowEntrypoint\(workflowCode\);?$/m, + /export\s*const\s*POST\s*=\s*__wkf_entrypoint\s*;?$/m, `${SVELTEKIT_REQUEST_CONVERTER} export const POST = async ({request}) => { const normalRequest = await convertSvelteKitRequest(request); - return workflowEntrypoint(workflowCode)(normalRequest); + return __wkf_entrypoint(normalRequest); }` ); await writeFile(workflowsRouteFile, workflowsRouteContent); From 83acc5800b7d0bb4b6ec9748ffe59d1e23cf6341 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: 
Wed, 12 Nov 2025 16:28:37 +0100 Subject: [PATCH 03/16] Queue Drivers + Direct wf/steps proxy Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/config.ts | 8 +- packages/world-postgres/src/index.ts | 12 +- .../src/queue-drivers/pg-boss.ts | 86 ++++++++++ .../src/{boss.ts => queue-drivers/types.ts} | 25 ++- .../src/queue-drivers/wkf-proxy.ts | 29 ++++ packages/world-postgres/src/queue.ts | 162 +++++++++--------- 6 files changed, 229 insertions(+), 93 deletions(-) create mode 100644 packages/world-postgres/src/queue-drivers/pg-boss.ts rename packages/world-postgres/src/{boss.ts => queue-drivers/types.ts} (55%) create mode 100644 packages/world-postgres/src/queue-drivers/wkf-proxy.ts diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index 369784f0e..3b3304c79 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -1,5 +1,11 @@ -export interface PostgresWorldConfig { +import type { QueueDriver } from './queue-drivers/types.js'; + +export interface QueueConfig { connectionString: string; jobPrefix?: string; queueConcurrency?: number; } + +export type PostgresWorldConfig = QueueConfig & { + queueFactory: (config: QueueConfig) => QueueDriver; +}; diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 2efce00c7..b2b7f2d9f 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -1,9 +1,9 @@ import type { Storage, World } from '@workflow/world'; -import PgBoss from 'pg-boss'; import createPostgres from 'postgres'; import type { PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; import { createQueue } from './queue.js'; +import { createPgBossQueue } from './queue-drivers/pg-boss.js'; import { createEventsStorage, createHooksStorage, @@ -30,14 +30,14 @@ export function createWorld( queueConcurrency: 
parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || 10, + queueFactory: (config) => createPgBossQueue(config), } ): World & { start(): Promise } { - const boss = new PgBoss({ - connectionString: config.connectionString, - }); + const queueDriver = config.queueFactory(config); const postgres = createPostgres(config.connectionString); const drizzle = createClient(postgres); - const queue = createQueue(boss, config); + + const queue = createQueue(queueDriver); const storage = createStorage(drizzle); const streamer = createStreamer(postgres, drizzle); @@ -46,7 +46,7 @@ export function createWorld( ...streamer, ...queue, async start() { - await queue.start(); + await queueDriver.start(); }, }; } diff --git a/packages/world-postgres/src/queue-drivers/pg-boss.ts b/packages/world-postgres/src/queue-drivers/pg-boss.ts new file mode 100644 index 000000000..31eee598b --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/pg-boss.ts @@ -0,0 +1,86 @@ +import PgBoss from 'pg-boss'; +import type { QueueConfig } from '../config.js'; +import { MessageData } from './types.js'; +import { proxyStep, proxyWorkflow } from './wkf-proxy.js'; + +export function createPgBossQueue(config: QueueConfig) { + let startPromise: Promise | null = null; + const boss = new PgBoss(config.connectionString); + + const stepQueueName = 'workflow_steps'; + const workflowQueueName = 'workflow_flows'; + + const ensureStarted = async () => { + if (!startPromise) { + startPromise = boss.start().then(() => { + return Promise.all([ + boss.createQueue(workflowQueueName), + boss.createQueue(stepQueueName), + ]); + }); + } + + await startPromise; + }; + + return { + pushStep: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(stepQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? 
message.messageId, + retryLimit: 3, + }); + }, + + pushFlow: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(workflowQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + }, + + start: async () => { + await ensureStarted(); + + const workflowHandler = async ([job]: PgBoss.Job[]) => { + const message = MessageData.parse(job.data); + + console.log(`[${job.id}] running: ${message.queueName}`); + + try { + await proxyWorkflow(message); + } catch (error) { + console.error( + `[${job.id}] Error handling workflow: ${message.queueName}`, + error + ); + throw error; + } + }; + + const stepHandler = async ([job]: PgBoss.Job[]) => { + const message = MessageData.parse(job.data); + + console.log(`[${job.id}] running: ${message.queueName}`); + + try { + await proxyStep(message); + } catch (error) { + console.error( + `[${job.id}] Error handling step: ${message.queueName}`, + error + ); + throw error; + } + }; + + for (let i = 0; i < (config.queueConcurrency || 10); i++) { + await boss.work(workflowQueueName, workflowHandler); + await boss.work(stepQueueName, stepHandler); + } + }, + }; +} diff --git a/packages/world-postgres/src/boss.ts b/packages/world-postgres/src/queue-drivers/types.ts similarity index 55% rename from packages/world-postgres/src/boss.ts rename to packages/world-postgres/src/queue-drivers/types.ts index b0af95454..51d04b94b 100644 --- a/packages/world-postgres/src/boss.ts +++ b/packages/world-postgres/src/queue-drivers/types.ts @@ -1,21 +1,30 @@ import { MessageId } from '@workflow/world'; import * as z from 'zod'; -import { Base64Buffer } from './zod.js'; +import { Base64Buffer } from '../zod.js'; /** -/* pgboss is using JSON under the hood, so we need to base64 encode -/* the body to ensure binary safety -/* maybe later we can have a `blobs` table for larger payloads - **/ +/* Most queues are using JSON under the hood, so we need to base64 +/* encode 
the body to ensure binary safety maybe later we can +/* have a `blobs` table for larger payloads +**/ export const MessageData = z.object({ - attempt: z.number().describe('The attempt number of the message'), - messageId: MessageId.describe('The unique ID of the message'), - idempotencyKey: z.string().optional(), id: z .string() .describe( "The ID of the sub-queue. For workflows, it's the workflow name. For steps, it's the step name." ), + + idempotencyKey: z.string().optional(), + queueName: z.string().describe('The name of the queue'), data: Base64Buffer.describe('The message that was sent'), + messageId: MessageId.describe('The unique ID of the message'), + attempt: z.number().describe('The attempt number of the message'), }); + export type MessageData = z.infer; + +export interface QueueDriver { + pushStep: (message: MessageData) => Promise; + pushFlow: (message: MessageData) => Promise; + start: () => Promise; +} diff --git a/packages/world-postgres/src/queue-drivers/wkf-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-proxy.ts new file mode 100644 index 000000000..b495790d9 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/wkf-proxy.ts @@ -0,0 +1,29 @@ +import { createRequire } from 'node:module'; +import Path from 'node:path'; +import { MessageData } from './types.js'; + +const require = createRequire(Path.join(process.cwd(), 'index.js')); + +export async function proxyWorkflow(message: MessageData) { + const workflows = require(process.env.WORKFLOW_POSTGRES_FLOWS!); + + const request = new Request('https://world-postgres.local/wkf-direct-call', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(MessageData.encode(message)), + }); + + return (await workflows.__wkf_entrypoint(request)) as Promise; +} + +export async function proxyStep(message: MessageData) { + const steps = require(process.env.WORKFLOW_POSTGRES_STEPS!); + + const request = new 
Request('https://world-postgres.local/wkf-direct-call', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(MessageData.encode(message)), + }); + + return (await steps.__wkf_entrypoint(request)) as Promise; +} diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 5aded60f0..79c09c16a 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -1,4 +1,4 @@ -import * as Stream from 'node:stream'; +import Stream from 'node:stream'; import { JsonTransport } from '@vercel/queue'; import { MessageId, @@ -7,129 +7,135 @@ import { type QueuePrefix, type ValidQueueName, } from '@workflow/world'; -import { createEmbeddedWorld } from '@workflow/world-local'; -import type PgBoss from 'pg-boss'; import { monotonicFactory } from 'ulid'; -import { MessageData } from './boss.js'; -import type { PostgresWorldConfig } from './config.js'; +import { MessageData, type QueueDriver } from './queue-drivers/types.js'; + +const QUEUE_MAX_VISIBILITY = + parseInt(process.env.WORKFLOW_POSTGRES_QUEUE_MAX_VISIBILITY ?? '0', 10) || + Infinity; /** * The Postgres queue works by creating two job types in pg-boss: * - `workflow` for workflow jobs * - `step` for step jobs * - * When a message is queued, it is sent to pg-boss with the appropriate job type. + * When a message is queued, it is prepared and sent to the queue driver with. * When a job is processed, it is deserialized and then re-queued into the _embedded world_, showing that * we can reuse the embedded world, mix and match worlds to build * hybrid architectures, and even migrate between worlds. */ -export function createQueue( - boss: PgBoss, - config: PostgresWorldConfig -): Queue & { start(): Promise } { - const port = process.env.PORT ? 
Number(process.env.PORT) : undefined; - const embeddedWorld = createEmbeddedWorld({ dataDir: undefined, port }); - +export function createQueue(queueImplementation: QueueDriver): Queue { const transport = new JsonTransport(); const generateMessageId = monotonicFactory(); - const prefix = config.jobPrefix || 'workflow_'; - const Queues = { - __wkf_workflow_: `${prefix}flows`, - __wkf_step_: `${prefix}steps`, - } as const satisfies Record; - - const createQueueHandler = embeddedWorld.createQueueHandler; - const getDeploymentId: Queue['getDeploymentId'] = async () => { return 'postgres'; }; - const createdQueues = new Map>(); - - function createQueue(name: string) { - let createdQueue = createdQueues.get(name); - if (!createdQueue) { - createdQueue = boss.createQueue(name); - createdQueues.set(name, createdQueue); - } - return createdQueue; - } - const queue: Queue['queue'] = async (queue, message, opts) => { - await boss.start(); - const [prefix, queueId] = parseQueueName(queue); - const jobName = Queues[prefix]; - await createQueue(jobName); const body = transport.serialize(message); + const [prefix, queueId] = parseQueueName(queue); const messageId = MessageId.parse(`msg_${generateMessageId()}`); - await boss.send({ - name: jobName, - options: { - singletonKey: opts?.idempotencyKey ?? 
messageId, - retryLimit: 3, - }, - data: MessageData.encode({ - id: queueId, - data: body, - attempt: 1, - messageId, - idempotencyKey: opts?.idempotencyKey, - }), - }); + + const payload = { + id: queueId, + data: body, + attempt: 1, + messageId, + idempotencyKey: opts?.idempotencyKey, + queueName: `${prefix}${queueId}`, + }; + + switch (prefix) { + case '__wkf_step_': + await queueImplementation.pushStep(payload); + break; + + case '__wkf_workflow_': + await queueImplementation.pushFlow(payload); + break; + } + return { messageId }; }; - async function setupListener(queue: QueuePrefix, jobName: string) { - await createQueue(jobName); - await Promise.all( - Array.from({ length: config.queueConcurrency || 10 }, async () => { - await boss.work(jobName, work); - }) - ); - - async function work([job]: PgBoss.Job[]) { - const messageData = MessageData.parse(job.data); + const createQueueHandler: Queue['createQueueHandler'] = ( + _prefix, + handler + ) => { + return async (req) => { + const reqBody = await req.json(); + const messageData = MessageData.parse(reqBody); const bodyStream = Stream.Readable.toWeb( Stream.Readable.from([messageData.data]) ); + const body = await transport.deserialize( bodyStream as ReadableStream ); + const message = QueuePayloadSchema.parse(body); - const queueName = `${queue}${messageData.id}` as const; - await embeddedWorld.queue(queueName, message, { - idempotencyKey: messageData.idempotencyKey, - }); - } - } - async function setupListeners() { - for (const [prefix, jobName] of Object.entries(Queues) as [ - QueuePrefix, - string, - ][]) { - await setupListener(prefix, jobName); - } - } + if (!isValidQueueName(messageData.queueName)) { + return Response.json( + { error: `Invalid queue name: ${messageData.queueName}` }, + { status: 400 } + ); + } + + try { + const result = await handler(message, { + attempt: messageData.attempt, + queueName: messageData.queueName, + messageId: messageData.messageId, + }); + + let timeoutSeconds: number | 
null = null; + if (typeof result?.timeoutSeconds === 'number') { + timeoutSeconds = Math.min( + result.timeoutSeconds, + QUEUE_MAX_VISIBILITY + ); + } + + if (timeoutSeconds) { + return Response.json({ timeoutSeconds }, { status: 503 }); + } + + return Response.json({ ok: true }); + } catch (error) { + return Response.json(String(error), { status: 500 }); + } + }; + }; return { createQueueHandler, getDeploymentId, queue, - async start() { - boss = await boss.start(); - await setupListeners(); - }, }; } const parseQueueName = (name: ValidQueueName): [QueuePrefix, string] => { const prefixes: QueuePrefix[] = ['__wkf_step_', '__wkf_workflow_']; + for (const prefix of prefixes) { if (name.startsWith(prefix)) { return [prefix, name.slice(prefix.length)]; } } + throw new Error(`Invalid queue name: ${name}`); }; + +function isValidQueueName(name: string): name is ValidQueueName { + const prefixes: QueuePrefix[] = ['__wkf_step_', '__wkf_workflow_']; + + for (const prefix of prefixes) { + if (name.startsWith(prefix)) { + return true; + } + } + + return false; +} From ea714f46c81cbadff418a9b9f7ad2be7eaf5b9ae Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:23:24 +0100 Subject: [PATCH 04/16] Generate latest migrations Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/DEV_NOTES.md | 7 - packages/world-postgres/README.md | 4 +- .../drizzle/migrations/0001_handy_jocasta.sql | 2 + .../migrations/meta/0001_snapshot.json | 506 ++++++++++++++++++ .../src/drizzle/migrations/meta/_journal.json | 7 + 5 files changed, 517 insertions(+), 9 deletions(-) delete mode 100644 packages/world-postgres/DEV_NOTES.md create mode 100644 packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql create mode 100644 packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json diff --git a/packages/world-postgres/DEV_NOTES.md 
b/packages/world-postgres/DEV_NOTES.md deleted file mode 100644 index 5efa73261..000000000 --- a/packages/world-postgres/DEV_NOTES.md +++ /dev/null @@ -1,7 +0,0 @@ -# Generate migrations - -The migrations are generated and managed by drizzle. When you perform schema changes you have to generate new migrations using the following command: - -``` -pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations -``` diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index efdbee755..875c2cbce 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -128,8 +128,8 @@ For local development, you can use the included Docker Compose configuration: docker-compose up -d # Create and run migrations -pnpm drizzle-kit generate -pnpm drizzle-kit migrate +pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations +pnpm bin/setup.js # Set environment variables for local development export WORKFLOW_POSTGRES_URL="postgres://world:world@localhost:5432/world" diff --git a/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql b/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql new file mode 100644 index 000000000..38b4631eb --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0001_handy_jocasta.sql @@ -0,0 +1,2 @@ +ALTER TABLE "workflow"."workflow_runs" DROP COLUMN "error_code";--> statement-breakpoint +ALTER TABLE "workflow"."workflow_steps" DROP COLUMN "error_code"; \ No newline at end of file diff --git a/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json b/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json new file mode 100644 index 000000000..c75386cf7 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/meta/0001_snapshot.json @@ -0,0 +1,506 @@ +{ + "id": "a1cbf36d-8801-4509-80c4-8bab67191377", + "prevId": 
"fbf17a09-4b7e-4939-9ee9-89ada6e197b1", + "version": "7", + "dialect": "postgresql", + "tables": { + "workflow.workflow_events": { + "name": "workflow_events", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "type": { + "name": "type", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "correlation_id": { + "name": "correlation_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "payload": { + "name": "payload", + "type": "jsonb", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_events_run_id_index": { + "name": "workflow_events_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_events_correlation_id_index": { + "name": "workflow_events_correlation_id_index", + "columns": [ + { + "expression": "correlation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_hooks": { + "name": "workflow_hooks", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "hook_id": { + "name": "hook_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "token": { + "name": "token", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + 
"owner_id": { + "name": "owner_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "project_id": { + "name": "project_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "environment": { + "name": "environment", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "metadata": { + "name": "metadata", + "type": "jsonb", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_hooks_run_id_index": { + "name": "workflow_hooks_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_hooks_token_index": { + "name": "workflow_hooks_token_index", + "columns": [ + { + "expression": "token", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_runs": { + "name": "workflow_runs", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "output": { + "name": "output", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "deployment_id": { + "name": "deployment_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "execution_context": { + "name": "execution_context", + "type": "jsonb", + "primaryKey": 
false, + "notNull": false + }, + "input": { + "name": "input", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_runs_name_index": { + "name": "workflow_runs_name_index", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_runs_status_index": { + "name": "workflow_runs_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_steps": { + "name": "workflow_steps", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "step_id": { + "name": "step_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "step_name": { + "name": "step_name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "step_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", 
+ "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "output": { + "name": "output", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "attempt": { + "name": "attempt", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "retry_after": { + "name": "retry_after", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_steps_run_id_index": { + "name": "workflow_steps_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_steps_status_index": { + "name": "workflow_steps_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_stream_chunks": { + "name": "workflow_stream_chunks", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "stream_id": { + "name": "stream_id", + "type": "varchar", + "primaryKey": false, + "notNull": 
true + }, + "data": { + "name": "data", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "eof": { + "name": "eof", + "type": "boolean", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "workflow_stream_chunks_stream_id_id_pk": { + "name": "workflow_stream_chunks_stream_id_id_pk", + "columns": ["stream_id", "id"] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.step_status": { + "name": "step_status", + "schema": "public", + "values": ["pending", "running", "completed", "failed", "cancelled"] + }, + "public.status": { + "name": "status", + "schema": "public", + "values": [ + "pending", + "running", + "completed", + "failed", + "paused", + "cancelled" + ] + } + }, + "schemas": { + "workflow": "workflow" + }, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json index c5099ff85..2eebb1f60 100644 --- a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json +++ b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1762873019948, "tag": "0000_cultured_the_anarchist", "breakpoints": true + }, + { + "idx": 1, + "version": "7", + "when": 1762968119388, + "tag": "0001_handy_jocasta", + "breakpoints": true } ] } From 855988af3a489e6df5e2373e8925054f21cbfe31 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 13 Nov 2025 00:30:21 +0100 Subject: [PATCH 05/16] Multiple proxies Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- 
packages/world-postgres/src/config.ts | 10 +--- packages/world-postgres/src/index.ts | 54 ++++++++++------- .../{pg-boss.ts => pgboss-base.ts} | 56 ++++++++++++++++-- .../queue-drivers/pgboss-function-proxy.ts | 35 +++++++++++ .../src/queue-drivers/pgboss-http-proxy.ts | 35 +++++++++++ .../world-postgres/src/queue-drivers/types.ts | 8 +-- .../src/queue-drivers/wkf-function-proxy.ts | 57 ++++++++++++++++++ .../src/queue-drivers/wkf-http-proxy.ts | 53 +++++++++++++++++ .../src/queue-drivers/wkf-proxy.ts | 29 --------- packages/world-postgres/src/queue.ts | 59 ++++++++++++------- 10 files changed, 310 insertions(+), 86 deletions(-) rename packages/world-postgres/src/queue-drivers/{pg-boss.ts => pgboss-base.ts} (55%) create mode 100644 packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts create mode 100644 packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts create mode 100644 packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts create mode 100644 packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts delete mode 100644 packages/world-postgres/src/queue-drivers/wkf-proxy.ts diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index 3b3304c79..b0b6291ed 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -1,11 +1,7 @@ import type { QueueDriver } from './queue-drivers/types.js'; -export interface QueueConfig { +export type PostgresWorldConfig = { + securityToken: string; connectionString: string; - jobPrefix?: string; - queueConcurrency?: number; -} - -export type PostgresWorldConfig = QueueConfig & { - queueFactory: (config: QueueConfig) => QueueDriver; + queueFactory: () => QueueDriver; }; diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index b2b7f2d9f..f841c64e6 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -3,7 +3,7 @@ import createPostgres from 
'postgres'; import type { PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; import { createQueue } from './queue.js'; -import { createPgBossQueue } from './queue-drivers/pg-boss.js'; +import { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js'; import { createEventsStorage, createHooksStorage, @@ -12,33 +12,21 @@ import { } from './storage.js'; import { createStreamer } from './streamer.js'; -function createStorage(drizzle: Drizzle): Storage { - return { - runs: createRunsStorage(drizzle), - events: createEventsStorage(drizzle), - hooks: createHooksStorage(drizzle), - steps: createStepsStorage(drizzle), - }; -} +export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world'; export function createWorld( config: PostgresWorldConfig = { - connectionString: - process.env.WORKFLOW_POSTGRES_URL || - 'postgres://world:world@localhost:5432/world', - jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX, - queueConcurrency: - parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || - 10, - queueFactory: (config) => createPgBossQueue(config), + connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, + securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'secret', + queueFactory: defaultQueueFactory, } ): World & { start(): Promise } { - const queueDriver = config.queueFactory(config); + const queueDriver = config.queueFactory(); const postgres = createPostgres(config.connectionString); const drizzle = createClient(postgres); - const queue = createQueue(queueDriver); const storage = createStorage(drizzle); + const queue = createQueue(queueDriver, config.securityToken); const streamer = createStreamer(postgres, drizzle); return { @@ -51,6 +39,32 @@ export function createWorld( }; } -// Re-export schema for users who want to extend or inspect the database schema +function createStorage(drizzle: Drizzle): Storage { + return { + runs: 
createRunsStorage(drizzle), + events: createEventsStorage(drizzle), + hooks: createHooksStorage(drizzle), + steps: createStepsStorage(drizzle), + }; +} + +function defaultQueueFactory() { + return createPgBossHttpProxy({ + securityToken: + process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe', + baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL, + connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, + queueConcurrency: + parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || + 10, + }); +} + export type { PostgresWorldConfig } from './config.js'; +// Re-export schema for users who want to extend or inspect the database schema export * from './drizzle/schema.js'; +export type { PgBossFunctionProxyConfig } from './queue-drivers/pgboss-function-proxy.js'; +export { createPgBossFunctionProxy } from './queue-drivers/pgboss-function-proxy.js'; +export type { PgBossHttpProxyConfig } from './queue-drivers/pgboss-http-proxy.js'; +// Re-export queue drivers for custom configurations +export { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js'; diff --git a/packages/world-postgres/src/queue-drivers/pg-boss.ts b/packages/world-postgres/src/queue-drivers/pgboss-base.ts similarity index 55% rename from packages/world-postgres/src/queue-drivers/pg-boss.ts rename to packages/world-postgres/src/queue-drivers/pgboss-base.ts index 31eee598b..9ffe00975 100644 --- a/packages/world-postgres/src/queue-drivers/pg-boss.ts +++ b/packages/world-postgres/src/queue-drivers/pgboss-base.ts @@ -1,9 +1,24 @@ import PgBoss from 'pg-boss'; -import type { QueueConfig } from '../config.js'; -import { MessageData } from './types.js'; -import { proxyStep, proxyWorkflow } from './wkf-proxy.js'; +import { MessageData, type QueueDriver } from './types.js'; -export function createPgBossQueue(config: QueueConfig) { +export interface ProxyFunctions { + proxyWorkflow: (message: MessageData) => Promise; + proxyStep: (message: MessageData) => 
Promise; +} + +/** + * Base QueueDriver implementation using pg-boss for job management. + * Accepts a proxy implementation that handles the actual workflow/step execution. + * This eliminates code duplication between HTTP and function-based proxies. + */ +export function createPgBossQueue( + config: { + connectionString: string; + jobPrefix?: string; + queueConcurrency?: number; + }, + proxy: ProxyFunctions +): QueueDriver { let startPromise: Promise | null = null; const boss = new PgBoss(config.connectionString); @@ -51,7 +66,22 @@ export function createPgBossQueue(config: QueueConfig) { console.log(`[${job.id}] running: ${message.queueName}`); try { - await proxyWorkflow(message); + const response = await proxy.proxyWorkflow(message); + + // TODO: Properly handle sleep + if (response.status === 503) { + const body = (await response.json()) as { + timeoutSeconds?: number; + }; + if (body.timeoutSeconds) { + throw new Error(`Retry after ${body.timeoutSeconds}s`); + } + } + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Workflow failed: ${text}`); + } } catch (error) { console.error( `[${job.id}] Error handling workflow: ${message.queueName}`, @@ -67,7 +97,21 @@ export function createPgBossQueue(config: QueueConfig) { console.log(`[${job.id}] running: ${message.queueName}`); try { - await proxyStep(message); + const response = await proxy.proxyStep(message); + + if (response.status === 503) { + const body = (await response.json()) as { + timeoutSeconds?: number; + }; + if (body.timeoutSeconds) { + throw new Error(`Retry after ${body.timeoutSeconds}s`); + } + } + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Step failed: ${text}`); + } } catch (error) { console.error( `[${job.id}] Error handling step: ${message.queueName}`, diff --git a/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts b/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts new file mode 100644 index 
000000000..ebe10b0d5 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts @@ -0,0 +1,35 @@ +import { createPgBossQueue } from './pgboss-base.js'; +import type { QueueDriver } from './types.js'; +import { createFunctionProxy } from './wkf-function-proxy.js'; + +export interface PgBossFunctionProxyConfig { + connectionString: string; + securityToken: string; + jobPrefix?: string; + queueConcurrency?: number; + workflowEntrypoint: (request: Request) => Promise; + stepEntrypoint: (request: Request) => Promise; +} + +/** + * QueueDriver implementation using pg-boss for job management and direct function calls for execution. + * Workers call entrypoint functions directly in-process without HTTP overhead. + */ +export function createPgBossFunctionProxy( + config: PgBossFunctionProxyConfig +): QueueDriver { + const proxy = createFunctionProxy({ + securityToken: config.securityToken, + workflowEntrypoint: config.workflowEntrypoint, + stepEntrypoint: config.stepEntrypoint, + }); + + return createPgBossQueue( + { + connectionString: config.connectionString, + jobPrefix: config.jobPrefix, + queueConcurrency: config.queueConcurrency, + }, + proxy + ); +} diff --git a/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts b/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts new file mode 100644 index 000000000..ab9bfb454 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts @@ -0,0 +1,35 @@ +import { createPgBossQueue } from './pgboss-base.js'; +import type { QueueDriver } from './types.js'; +import { createHttpProxy } from './wkf-http-proxy.js'; + +export interface PgBossHttpProxyConfig { + connectionString: string; + securityToken: string; + jobPrefix?: string; + queueConcurrency?: number; + baseUrl?: string; + port?: number; +} + +/** + * QueueDriver implementation using pg-boss for job management and HTTP for execution. 
+ * Workers make HTTP calls to the Next.js app's .well-known/workflow/v1/* endpoints. + */ +export function createPgBossHttpProxy( + config: PgBossHttpProxyConfig +): QueueDriver { + const proxy = createHttpProxy({ + securityToken: config.securityToken, + baseUrl: config.baseUrl, + port: config.port, + }); + + return createPgBossQueue( + { + connectionString: config.connectionString, + jobPrefix: config.jobPrefix, + queueConcurrency: config.queueConcurrency, + }, + proxy + ); +} diff --git a/packages/world-postgres/src/queue-drivers/types.ts b/packages/world-postgres/src/queue-drivers/types.ts index 51d04b94b..7adf9ba80 100644 --- a/packages/world-postgres/src/queue-drivers/types.ts +++ b/packages/world-postgres/src/queue-drivers/types.ts @@ -3,10 +3,10 @@ import * as z from 'zod'; import { Base64Buffer } from '../zod.js'; /** -/* Most queues are using JSON under the hood, so we need to base64 -/* encode the body to ensure binary safety maybe later we can -/* have a `blobs` table for larger payloads -**/ + * Most queues are using JSON under the hood, so we need to base64 + * encode the body to ensure binary safety maybe later we can + * have a `blobs` table for larger payloads + */ export const MessageData = z.object({ id: z .string() diff --git a/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts new file mode 100644 index 000000000..72d674b99 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts @@ -0,0 +1,57 @@ +import { MessageData } from './types.js'; + +export interface FunctionProxyConfig { + securityToken: string; + stepEntrypoint: (request: Request) => Promise; + workflowEntrypoint: (request: Request) => Promise; +} + +export interface FunctionProxyFunctions { + proxyStep: (message: MessageData) => Promise; + proxyWorkflow: (message: MessageData) => Promise; +} + +/** + * Creates function-based proxy functions that call workflow/step entrypoints 
directly. + * Workers call entrypoint functions in-process without HTTP overhead. + */ +export function createFunctionProxy( + config: FunctionProxyConfig +): FunctionProxyFunctions { + const createHeaders = () => { + const headers: Record = { + 'Content-Type': 'application/json', + 'X-Workflow-Secret': config.securityToken, + }; + + return headers; + }; + + return { + proxyWorkflow: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + { + method: 'POST', + headers: createHeaders(), + body: JSON.stringify(MessageData.encode(message)), + } + ); + + return config.workflowEntrypoint(request); + }, + + proxyStep: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + { + method: 'POST', + headers: createHeaders(), + body: JSON.stringify(MessageData.encode(message)), + } + ); + + return config.stepEntrypoint(request); + }, + }; +} diff --git a/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts new file mode 100644 index 000000000..a095cfcaa --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts @@ -0,0 +1,53 @@ +import { MessageData } from './types.js'; + +export interface HttpProxyConfig { + port?: number; + baseUrl?: string; + securityToken: string; +} + +export interface HttpProxyFunctions { + proxyWorkflow: (message: MessageData) => Promise; + proxyStep: (message: MessageData) => Promise; +} + +/** + * Creates HTTP-based proxy functions that call the Next.js app's workflow endpoints. + * Workers communicate with the app via HTTP fetch to .well-known/workflow/v1/* endpoints. 
+ */ +export function createHttpProxy(config: HttpProxyConfig): HttpProxyFunctions { + const resolveBaseUrl = (): string => { + if (config.baseUrl) return config.baseUrl; + if (config.port) return `http://localhost:${config.port}`; + return 'http://localhost:3000'; + }; + + const createHeaders = () => { + const headers: Record = { + 'Content-Type': 'application/json', + 'X-Workflow-Secret': config.securityToken, + }; + + return headers; + }; + + const baseUrl = resolveBaseUrl(); + + return { + proxyWorkflow: async (message: MessageData): Promise => { + return fetch(`${baseUrl}/.well-known/workflow/v1/flow`, { + method: 'POST', + headers: createHeaders(), + body: JSON.stringify(MessageData.encode(message)), + }); + }, + + proxyStep: async (message: MessageData): Promise => { + return fetch(`${baseUrl}/.well-known/workflow/v1/step`, { + method: 'POST', + headers: createHeaders(), + body: JSON.stringify(MessageData.encode(message)), + }); + }, + }; +} diff --git a/packages/world-postgres/src/queue-drivers/wkf-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-proxy.ts deleted file mode 100644 index b495790d9..000000000 --- a/packages/world-postgres/src/queue-drivers/wkf-proxy.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { createRequire } from 'node:module'; -import Path from 'node:path'; -import { MessageData } from './types.js'; - -const require = createRequire(Path.join(process.cwd(), 'index.js')); - -export async function proxyWorkflow(message: MessageData) { - const workflows = require(process.env.WORKFLOW_POSTGRES_FLOWS!); - - const request = new Request('https://world-postgres.local/wkf-direct-call', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(MessageData.encode(message)), - }); - - return (await workflows.__wkf_entrypoint(request)) as Promise; -} - -export async function proxyStep(message: MessageData) { - const steps = require(process.env.WORKFLOW_POSTGRES_STEPS!); - - const request = new 
Request('https://world-postgres.local/wkf-direct-call', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(MessageData.encode(message)), - }); - - return (await steps.__wkf_entrypoint(request)) as Promise; -} diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 79c09c16a..5e1ff10ac 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -3,6 +3,7 @@ import { JsonTransport } from '@vercel/queue'; import { MessageId, type Queue, + type QueuePayload, QueuePayloadSchema, type QueuePrefix, type ValidQueueName, @@ -10,6 +11,8 @@ import { import { monotonicFactory } from 'ulid'; import { MessageData, type QueueDriver } from './queue-drivers/types.js'; +const transport = new JsonTransport(); + const QUEUE_MAX_VISIBILITY = parseInt(process.env.WORKFLOW_POSTGRES_QUEUE_MAX_VISIBILITY ?? '0', 10) || Infinity; @@ -24,8 +27,10 @@ const QUEUE_MAX_VISIBILITY = * we can reuse the embedded world, mix and match worlds to build * hybrid architectures, and even migrate between worlds. 
*/ -export function createQueue(queueImplementation: QueueDriver): Queue { - const transport = new JsonTransport(); +export function createQueue( + queueDriver: QueueDriver, + securityToken: string +): Queue { const generateMessageId = monotonicFactory(); const getDeploymentId: Queue['getDeploymentId'] = async () => { @@ -48,11 +53,11 @@ export function createQueue(queueImplementation: QueueDriver): Queue { switch (prefix) { case '__wkf_step_': - await queueImplementation.pushStep(payload); + await queueDriver.pushStep(payload); break; case '__wkf_workflow_': - await queueImplementation.pushFlow(payload); + await queueDriver.pushFlow(payload); break; } @@ -64,30 +69,28 @@ export function createQueue(queueImplementation: QueueDriver): Queue { handler ) => { return async (req) => { - const reqBody = await req.json(); - const messageData = MessageData.parse(reqBody); - const bodyStream = Stream.Readable.toWeb( - Stream.Readable.from([messageData.data]) - ); - - const body = await transport.deserialize( - bodyStream as ReadableStream - ); + const secret = req.headers.get('X-Workflow-Secret'); + const [message, payload] = await parse(req); - const message = QueuePayloadSchema.parse(body); + if (!secret || securityToken !== secret) { + return Response.json( + { error: 'Unauthorized: Invalid or missing secret key' }, + { status: 401 } + ); + } - if (!isValidQueueName(messageData.queueName)) { + if (!isValidQueueName(message.queueName)) { return Response.json( - { error: `Invalid queue name: ${messageData.queueName}` }, + { error: `Invalid queue name: ${message.queueName}` }, { status: 400 } ); } try { - const result = await handler(message, { - attempt: messageData.attempt, - queueName: messageData.queueName, - messageId: messageData.messageId, + const result = await handler(payload, { + attempt: message.attempt, + queueName: message.queueName, + messageId: message.messageId, }); let timeoutSeconds: number | null = null; @@ -139,3 +142,19 @@ function 
isValidQueueName(name: string): name is ValidQueueName { return false; } + +async function parse(req: Request): Promise<[MessageData, QueuePayload]> { + const reqBody = await req.json(); + const messageData = MessageData.parse(reqBody); + const bodyStream = Stream.Readable.toWeb( + Stream.Readable.from([messageData.data]) + ); + + const body = await transport.deserialize( + bodyStream as ReadableStream + ); + + const payload = QueuePayloadSchema.parse(body); + + return [messageData, payload]; +} From a876f17cc7beb814e323d51b84501958e39ae57a Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:04:10 +0100 Subject: [PATCH 06/16] Code organisation Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/index.ts | 20 +-- .../src/proxies/function-proxy.ts | 29 ++++ .../world-postgres/src/proxies/http-proxy.ts | 33 +++++ packages/world-postgres/src/proxies/types.ts | 6 + packages/world-postgres/src/proxies/utils.ts | 15 ++ .../src/queue-drivers/factories.ts | 56 ++++++++ .../src/queue-drivers/pgboss-base.ts | 130 ------------------ .../queue-drivers/pgboss-function-proxy.ts | 35 ----- .../src/queue-drivers/pgboss-http-proxy.ts | 35 ----- .../src/queue-drivers/pgboss.ts | 100 ++++++++++++++ .../src/queue-drivers/wkf-function-proxy.ts | 57 -------- .../src/queue-drivers/wkf-http-proxy.ts | 53 ------- packages/world-postgres/src/queue.ts | 11 +- 13 files changed, 252 insertions(+), 328 deletions(-) create mode 100644 packages/world-postgres/src/proxies/function-proxy.ts create mode 100644 packages/world-postgres/src/proxies/http-proxy.ts create mode 100644 packages/world-postgres/src/proxies/types.ts create mode 100644 packages/world-postgres/src/proxies/utils.ts create mode 100644 packages/world-postgres/src/queue-drivers/factories.ts delete mode 100644 packages/world-postgres/src/queue-drivers/pgboss-base.ts delete mode 100644 
packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts delete mode 100644 packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts create mode 100644 packages/world-postgres/src/queue-drivers/pgboss.ts delete mode 100644 packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts delete mode 100644 packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index f841c64e6..b09f38e39 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -2,8 +2,14 @@ import type { Storage, World } from '@workflow/world'; import createPostgres from 'postgres'; import type { PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; +import { createFunctionProxy } from './proxies/function-proxy.js'; +import { createHttpProxy } from './proxies/http-proxy.js'; import { createQueue } from './queue.js'; -import { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js'; +import { + createPgBossFunctionProxy, + createPgBossHttpProxy, +} from './queue-drivers/factories.js'; +import { createPgBossQueue } from './queue-drivers/pgboss.js'; import { createEventsStorage, createHooksStorage, @@ -50,10 +56,10 @@ function createStorage(drizzle: Drizzle): Storage { function defaultQueueFactory() { return createPgBossHttpProxy({ - securityToken: - process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe', baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL, connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, + securityToken: + process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe', queueConcurrency: parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || 10, @@ -63,8 +69,6 @@ function defaultQueueFactory() { export type { PostgresWorldConfig } from './config.js'; // Re-export schema for users who want to extend or inspect the database 
schema export * from './drizzle/schema.js'; -export type { PgBossFunctionProxyConfig } from './queue-drivers/pgboss-function-proxy.js'; -export { createPgBossFunctionProxy } from './queue-drivers/pgboss-function-proxy.js'; -export type { PgBossHttpProxyConfig } from './queue-drivers/pgboss-http-proxy.js'; -// Re-export queue drivers for custom configurations -export { createPgBossHttpProxy } from './queue-drivers/pgboss-http-proxy.js'; + +export { createFunctionProxy, createHttpProxy }; +export { createPgBossQueue, createPgBossFunctionProxy, createPgBossHttpProxy }; diff --git a/packages/world-postgres/src/proxies/function-proxy.ts b/packages/world-postgres/src/proxies/function-proxy.ts new file mode 100644 index 000000000..9a6705bdc --- /dev/null +++ b/packages/world-postgres/src/proxies/function-proxy.ts @@ -0,0 +1,29 @@ +import type { MessageData } from '../queue-drivers/types.js'; +import type { WkfProxy } from './types.js'; +import { prepareRequestParams } from './utils.js'; + +export function createFunctionProxy(opts: { + securityToken: string; + stepEntrypoint: (request: Request) => Promise; + workflowEntrypoint: (request: Request) => Promise; +}): WkfProxy { + return { + proxyWorkflow: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + prepareRequestParams(message, opts.securityToken) + ); + + return opts.workflowEntrypoint(request); + }, + + proxyStep: async (message: MessageData): Promise => { + const request = new Request( + 'https://world-postgres.local/wkf-direct-call', + prepareRequestParams(message, opts.securityToken) + ); + + return opts.stepEntrypoint(request); + }, + }; +} diff --git a/packages/world-postgres/src/proxies/http-proxy.ts b/packages/world-postgres/src/proxies/http-proxy.ts new file mode 100644 index 000000000..7bb9b5440 --- /dev/null +++ b/packages/world-postgres/src/proxies/http-proxy.ts @@ -0,0 +1,33 @@ +import type { MessageData } from 
'../queue-drivers/types.js'; +import type { WkfProxy } from './types.js'; +import { prepareRequestParams } from './utils.js'; + +export function createHttpProxy(opts: { + port?: number; + baseUrl?: string; + securityToken: string; +}): WkfProxy { + const resolveBaseUrl = (): string => { + if (opts.baseUrl) return opts.baseUrl; + if (opts.port) return `http://localhost:${opts.port}`; + return 'http://localhost:3000'; + }; + + const baseUrl = resolveBaseUrl(); + + return { + proxyWorkflow: async (message: MessageData): Promise => { + return fetch( + `${baseUrl}/.well-known/workflow/v1/flow`, + prepareRequestParams(message, opts.securityToken) + ); + }, + + proxyStep: async (message: MessageData): Promise => { + return fetch( + `${baseUrl}/.well-known/workflow/v1/step`, + prepareRequestParams(message, opts.securityToken) + ); + }, + }; +} diff --git a/packages/world-postgres/src/proxies/types.ts b/packages/world-postgres/src/proxies/types.ts new file mode 100644 index 000000000..faca2b9ca --- /dev/null +++ b/packages/world-postgres/src/proxies/types.ts @@ -0,0 +1,6 @@ +import type { MessageData } from '../queue-drivers/types.js'; + +export interface WkfProxy { + proxyWorkflow: (message: MessageData) => Promise; + proxyStep: (message: MessageData) => Promise; +} diff --git a/packages/world-postgres/src/proxies/utils.ts b/packages/world-postgres/src/proxies/utils.ts new file mode 100644 index 000000000..18563abca --- /dev/null +++ b/packages/world-postgres/src/proxies/utils.ts @@ -0,0 +1,15 @@ +import { MessageData } from '../queue-drivers/types.js'; + +export const prepareRequestParams = ( + message: MessageData, + securityToken: string +) => { + return { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Workflow-Secret': securityToken, + }, + body: JSON.stringify(MessageData.encode(message)), + }; +}; diff --git a/packages/world-postgres/src/queue-drivers/factories.ts b/packages/world-postgres/src/queue-drivers/factories.ts new file mode 100644 
index 000000000..d027651ed --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/factories.ts @@ -0,0 +1,56 @@ +import { createFunctionProxy } from '../proxies/function-proxy.js'; +import { createHttpProxy } from '../proxies/http-proxy.js'; +import { createPgBossQueue } from './pgboss.js'; +import type { QueueDriver } from './types.js'; + +/** + * QueueDriver implementation using pg-boss for job management + * and direct function calls for execution. + */ +export function createPgBossFunctionProxy(opts: { + jobPrefix?: string; + securityToken: string; + connectionString: string; + queueConcurrency?: number; + stepEntrypoint: (request: Request) => Promise; + workflowEntrypoint: (request: Request) => Promise; +}): QueueDriver { + return createPgBossQueue( + { + jobPrefix: opts.jobPrefix, + connectionString: opts.connectionString, + queueConcurrency: opts.queueConcurrency, + }, + createFunctionProxy({ + securityToken: opts.securityToken, + stepEntrypoint: opts.stepEntrypoint, + workflowEntrypoint: opts.workflowEntrypoint, + }) + ); +} + +/** + * QueueDriver implementation using pg-boss for job management + * and HTTP for execution. 
+ */ +export function createPgBossHttpProxy(config: { + port?: number; + baseUrl?: string; + jobPrefix?: string; + securityToken: string; + connectionString: string; + queueConcurrency?: number; +}): QueueDriver { + return createPgBossQueue( + { + jobPrefix: config.jobPrefix, + connectionString: config.connectionString, + queueConcurrency: config.queueConcurrency, + }, + createHttpProxy({ + port: config.port, + baseUrl: config.baseUrl, + securityToken: config.securityToken, + }) + ); +} diff --git a/packages/world-postgres/src/queue-drivers/pgboss-base.ts b/packages/world-postgres/src/queue-drivers/pgboss-base.ts deleted file mode 100644 index 9ffe00975..000000000 --- a/packages/world-postgres/src/queue-drivers/pgboss-base.ts +++ /dev/null @@ -1,130 +0,0 @@ -import PgBoss from 'pg-boss'; -import { MessageData, type QueueDriver } from './types.js'; - -export interface ProxyFunctions { - proxyWorkflow: (message: MessageData) => Promise; - proxyStep: (message: MessageData) => Promise; -} - -/** - * Base QueueDriver implementation using pg-boss for job management. - * Accepts a proxy implementation that handles the actual workflow/step execution. - * This eliminates code duplication between HTTP and function-based proxies. 
- */ -export function createPgBossQueue( - config: { - connectionString: string; - jobPrefix?: string; - queueConcurrency?: number; - }, - proxy: ProxyFunctions -): QueueDriver { - let startPromise: Promise | null = null; - const boss = new PgBoss(config.connectionString); - - const stepQueueName = 'workflow_steps'; - const workflowQueueName = 'workflow_flows'; - - const ensureStarted = async () => { - if (!startPromise) { - startPromise = boss.start().then(() => { - return Promise.all([ - boss.createQueue(workflowQueueName), - boss.createQueue(stepQueueName), - ]); - }); - } - - await startPromise; - }; - - return { - pushStep: async (message: MessageData) => { - await ensureStarted(); - - await boss.send(stepQueueName, MessageData.encode(message), { - singletonKey: message?.idempotencyKey ?? message.messageId, - retryLimit: 3, - }); - }, - - pushFlow: async (message: MessageData) => { - await ensureStarted(); - - await boss.send(workflowQueueName, MessageData.encode(message), { - singletonKey: message?.idempotencyKey ?? 
message.messageId, - retryLimit: 3, - }); - }, - - start: async () => { - await ensureStarted(); - - const workflowHandler = async ([job]: PgBoss.Job[]) => { - const message = MessageData.parse(job.data); - - console.log(`[${job.id}] running: ${message.queueName}`); - - try { - const response = await proxy.proxyWorkflow(message); - - // TODO: Properly handle sleep - if (response.status === 503) { - const body = (await response.json()) as { - timeoutSeconds?: number; - }; - if (body.timeoutSeconds) { - throw new Error(`Retry after ${body.timeoutSeconds}s`); - } - } - - if (!response.ok) { - const text = await response.text(); - throw new Error(`Workflow failed: ${text}`); - } - } catch (error) { - console.error( - `[${job.id}] Error handling workflow: ${message.queueName}`, - error - ); - throw error; - } - }; - - const stepHandler = async ([job]: PgBoss.Job[]) => { - const message = MessageData.parse(job.data); - - console.log(`[${job.id}] running: ${message.queueName}`); - - try { - const response = await proxy.proxyStep(message); - - if (response.status === 503) { - const body = (await response.json()) as { - timeoutSeconds?: number; - }; - if (body.timeoutSeconds) { - throw new Error(`Retry after ${body.timeoutSeconds}s`); - } - } - - if (!response.ok) { - const text = await response.text(); - throw new Error(`Step failed: ${text}`); - } - } catch (error) { - console.error( - `[${job.id}] Error handling step: ${message.queueName}`, - error - ); - throw error; - } - }; - - for (let i = 0; i < (config.queueConcurrency || 10); i++) { - await boss.work(workflowQueueName, workflowHandler); - await boss.work(stepQueueName, stepHandler); - } - }, - }; -} diff --git a/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts b/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts deleted file mode 100644 index ebe10b0d5..000000000 --- a/packages/world-postgres/src/queue-drivers/pgboss-function-proxy.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { 
createPgBossQueue } from './pgboss-base.js'; -import type { QueueDriver } from './types.js'; -import { createFunctionProxy } from './wkf-function-proxy.js'; - -export interface PgBossFunctionProxyConfig { - connectionString: string; - securityToken: string; - jobPrefix?: string; - queueConcurrency?: number; - workflowEntrypoint: (request: Request) => Promise; - stepEntrypoint: (request: Request) => Promise; -} - -/** - * QueueDriver implementation using pg-boss for job management and direct function calls for execution. - * Workers call entrypoint functions directly in-process without HTTP overhead. - */ -export function createPgBossFunctionProxy( - config: PgBossFunctionProxyConfig -): QueueDriver { - const proxy = createFunctionProxy({ - securityToken: config.securityToken, - workflowEntrypoint: config.workflowEntrypoint, - stepEntrypoint: config.stepEntrypoint, - }); - - return createPgBossQueue( - { - connectionString: config.connectionString, - jobPrefix: config.jobPrefix, - queueConcurrency: config.queueConcurrency, - }, - proxy - ); -} diff --git a/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts b/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts deleted file mode 100644 index ab9bfb454..000000000 --- a/packages/world-postgres/src/queue-drivers/pgboss-http-proxy.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { createPgBossQueue } from './pgboss-base.js'; -import type { QueueDriver } from './types.js'; -import { createHttpProxy } from './wkf-http-proxy.js'; - -export interface PgBossHttpProxyConfig { - connectionString: string; - securityToken: string; - jobPrefix?: string; - queueConcurrency?: number; - baseUrl?: string; - port?: number; -} - -/** - * QueueDriver implementation using pg-boss for job management and HTTP for execution. - * Workers make HTTP calls to the Next.js app's .well-known/workflow/v1/* endpoints. 
- */ -export function createPgBossHttpProxy( - config: PgBossHttpProxyConfig -): QueueDriver { - const proxy = createHttpProxy({ - securityToken: config.securityToken, - baseUrl: config.baseUrl, - port: config.port, - }); - - return createPgBossQueue( - { - connectionString: config.connectionString, - jobPrefix: config.jobPrefix, - queueConcurrency: config.queueConcurrency, - }, - proxy - ); -} diff --git a/packages/world-postgres/src/queue-drivers/pgboss.ts b/packages/world-postgres/src/queue-drivers/pgboss.ts new file mode 100644 index 000000000..46cab0146 --- /dev/null +++ b/packages/world-postgres/src/queue-drivers/pgboss.ts @@ -0,0 +1,100 @@ +import PgBoss from 'pg-boss'; +import type { WkfProxy } from '../proxies/types.js'; +import { MessageData, type QueueDriver } from './types.js'; + +/** + * Base QueueDriver implementation using pg-boss for job management. + * Takes in a proxy that will handle the actual step/flow execution. + */ +export function createPgBossQueue( + config: { + jobPrefix?: string; + connectionString: string; + queueConcurrency?: number; + }, + proxy: WkfProxy +): QueueDriver { + let startPromise: Promise | null = null; + const boss = new PgBoss(config.connectionString); + + const stepQueueName = 'workflow_steps'; + const workflowQueueName = 'workflow_flows'; + + const ensureStarted = async () => { + if (!startPromise) { + startPromise = boss.start().then(() => { + return Promise.all([ + boss.createQueue(workflowQueueName), + boss.createQueue(stepQueueName), + ]); + }); + } + + await startPromise; + }; + + return { + pushStep: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(stepQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + }, + + pushFlow: async (message: MessageData) => { + await ensureStarted(); + + await boss.send(workflowQueueName, MessageData.encode(message), { + singletonKey: message?.idempotencyKey ?? 
message.messageId, + retryLimit: 3, + }); + }, + + start: async () => { + await ensureStarted(); + + const stepWorker = createWorker(proxy.proxyStep); + const workflowWorker = createWorker(proxy.proxyWorkflow); + + for (let i = 0; i < (config.queueConcurrency || 10); i++) { + await boss.work(workflowQueueName, workflowWorker); + await boss.work(stepQueueName, stepWorker); + } + }, + }; +} + +function createWorker(proxy: WkfProxy[keyof WkfProxy]) { + return async ([job]: PgBoss.Job[]) => { + const message = MessageData.parse(job.data); + + console.log(`[${job.id}] running: ${message.queueName}`); + + try { + const response = await proxy(message); + + // TODO: Properly handle 503 + if (response.status === 503) { + const body = (await response.json()) as { + timeoutSeconds?: number; + }; + if (body.timeoutSeconds) { + throw new Error(`Retry after ${body.timeoutSeconds}s`); + } + } + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Step failed: ${text}`); + } + } catch (error) { + console.error( + `[${job.id}] Error handling step: ${message.queueName}`, + error + ); + throw error; + } + }; +} diff --git a/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts deleted file mode 100644 index 72d674b99..000000000 --- a/packages/world-postgres/src/queue-drivers/wkf-function-proxy.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { MessageData } from './types.js'; - -export interface FunctionProxyConfig { - securityToken: string; - stepEntrypoint: (request: Request) => Promise; - workflowEntrypoint: (request: Request) => Promise; -} - -export interface FunctionProxyFunctions { - proxyStep: (message: MessageData) => Promise; - proxyWorkflow: (message: MessageData) => Promise; -} - -/** - * Creates function-based proxy functions that call workflow/step entrypoints directly. - * Workers call entrypoint functions in-process without HTTP overhead. 
- */ -export function createFunctionProxy( - config: FunctionProxyConfig -): FunctionProxyFunctions { - const createHeaders = () => { - const headers: Record = { - 'Content-Type': 'application/json', - 'X-Workflow-Secret': config.securityToken, - }; - - return headers; - }; - - return { - proxyWorkflow: async (message: MessageData): Promise => { - const request = new Request( - 'https://world-postgres.local/wkf-direct-call', - { - method: 'POST', - headers: createHeaders(), - body: JSON.stringify(MessageData.encode(message)), - } - ); - - return config.workflowEntrypoint(request); - }, - - proxyStep: async (message: MessageData): Promise => { - const request = new Request( - 'https://world-postgres.local/wkf-direct-call', - { - method: 'POST', - headers: createHeaders(), - body: JSON.stringify(MessageData.encode(message)), - } - ); - - return config.stepEntrypoint(request); - }, - }; -} diff --git a/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts b/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts deleted file mode 100644 index a095cfcaa..000000000 --- a/packages/world-postgres/src/queue-drivers/wkf-http-proxy.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { MessageData } from './types.js'; - -export interface HttpProxyConfig { - port?: number; - baseUrl?: string; - securityToken: string; -} - -export interface HttpProxyFunctions { - proxyWorkflow: (message: MessageData) => Promise; - proxyStep: (message: MessageData) => Promise; -} - -/** - * Creates HTTP-based proxy functions that call the Next.js app's workflow endpoints. - * Workers communicate with the app via HTTP fetch to .well-known/workflow/v1/* endpoints. 
- */ -export function createHttpProxy(config: HttpProxyConfig): HttpProxyFunctions { - const resolveBaseUrl = (): string => { - if (config.baseUrl) return config.baseUrl; - if (config.port) return `http://localhost:${config.port}`; - return 'http://localhost:3000'; - }; - - const createHeaders = () => { - const headers: Record = { - 'Content-Type': 'application/json', - 'X-Workflow-Secret': config.securityToken, - }; - - return headers; - }; - - const baseUrl = resolveBaseUrl(); - - return { - proxyWorkflow: async (message: MessageData): Promise => { - return fetch(`${baseUrl}/.well-known/workflow/v1/flow`, { - method: 'POST', - headers: createHeaders(), - body: JSON.stringify(MessageData.encode(message)), - }); - }, - - proxyStep: async (message: MessageData): Promise => { - return fetch(`${baseUrl}/.well-known/workflow/v1/step`, { - method: 'POST', - headers: createHeaders(), - body: JSON.stringify(MessageData.encode(message)), - }); - }, - }; -} diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 5e1ff10ac..85d38ef1d 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -17,16 +17,6 @@ const QUEUE_MAX_VISIBILITY = parseInt(process.env.WORKFLOW_POSTGRES_QUEUE_MAX_VISIBILITY ?? '0', 10) || Infinity; -/** - * The Postgres queue works by creating two job types in pg-boss: - * - `workflow` for workflow jobs - * - `step` for step jobs - * - * When a message is queued, it is prepared and sent to the queue driver with. - * When a job is processed, it is deserialized and then re-queued into the _embedded world_, showing that - * we can reuse the embedded world, mix and match worlds to build - * hybrid architectures, and even migrate between worlds. - */ export function createQueue( queueDriver: QueueDriver, securityToken: string @@ -93,6 +83,7 @@ export function createQueue( messageId: message.messageId, }); + // TODO: Understand what's this? 
let timeoutSeconds: number | null = null; if (typeof result?.timeoutSeconds === 'number') { timeoutSeconds = Math.min( From 1bfab80da74c9ff2db528adad61e32c03981343a Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:15:03 +0100 Subject: [PATCH 07/16] Use queue prefix Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/index.ts | 13 +++++++++---- .../world-postgres/src/queue-drivers/factories.ts | 4 ++-- packages/world-postgres/src/queue-drivers/pgboss.ts | 5 +++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index b09f38e39..3a4f2a066 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -6,8 +6,8 @@ import { createFunctionProxy } from './proxies/function-proxy.js'; import { createHttpProxy } from './proxies/http-proxy.js'; import { createQueue } from './queue.js'; import { - createPgBossFunctionProxy, - createPgBossHttpProxy, + createPgBossFunctionProxyQueue, + createPgBossHttpProxyQueue, } from './queue-drivers/factories.js'; import { createPgBossQueue } from './queue-drivers/pgboss.js'; import { @@ -55,11 +55,12 @@ function createStorage(drizzle: Drizzle): Storage { } function defaultQueueFactory() { - return createPgBossHttpProxy({ + return createPgBossHttpProxyQueue({ baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL, connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe', + jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX, queueConcurrency: parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || 10, @@ -71,4 +72,8 @@ export type { PostgresWorldConfig } from './config.js'; export * from './drizzle/schema.js'; export { createFunctionProxy, createHttpProxy }; -export { createPgBossQueue, 
createPgBossFunctionProxy, createPgBossHttpProxy }; +export { + createPgBossQueue, + createPgBossFunctionProxyQueue as createPgBossFunctionProxy, + createPgBossHttpProxyQueue as createPgBossHttpProxy, +}; diff --git a/packages/world-postgres/src/queue-drivers/factories.ts b/packages/world-postgres/src/queue-drivers/factories.ts index d027651ed..21e3f0ae1 100644 --- a/packages/world-postgres/src/queue-drivers/factories.ts +++ b/packages/world-postgres/src/queue-drivers/factories.ts @@ -7,7 +7,7 @@ import type { QueueDriver } from './types.js'; * QueueDriver implementation using pg-boss for job management * and direct function calls for execution. */ -export function createPgBossFunctionProxy(opts: { +export function createPgBossFunctionProxyQueue(opts: { jobPrefix?: string; securityToken: string; connectionString: string; @@ -33,7 +33,7 @@ export function createPgBossFunctionProxy(opts: { * QueueDriver implementation using pg-boss for job management * and HTTP for execution. */ -export function createPgBossHttpProxy(config: { +export function createPgBossHttpProxyQueue(config: { port?: number; baseUrl?: string; jobPrefix?: string; diff --git a/packages/world-postgres/src/queue-drivers/pgboss.ts b/packages/world-postgres/src/queue-drivers/pgboss.ts index 46cab0146..40d1bf6c2 100644 --- a/packages/world-postgres/src/queue-drivers/pgboss.ts +++ b/packages/world-postgres/src/queue-drivers/pgboss.ts @@ -17,8 +17,9 @@ export function createPgBossQueue( let startPromise: Promise | null = null; const boss = new PgBoss(config.connectionString); - const stepQueueName = 'workflow_steps'; - const workflowQueueName = 'workflow_flows'; + const prefix = config.jobPrefix || 'workflow_'; + const stepQueueName = `${prefix}steps`; + const workflowQueueName = `${prefix}flows`; const ensureStarted = async () => { if (!startPromise) { From f2a49f8bb463477364ecc16b2482368364b2b9e9 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 13 Nov 
2025 12:23:45 +0100 Subject: [PATCH 08/16] Config Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/cli.ts | 3 +- packages/world-postgres/src/config.ts | 59 +++++++++++++++++-- packages/world-postgres/src/index.ts | 34 ++++------- .../src/queue-drivers/factories.ts | 59 ++++++++++++++----- .../src/queue-drivers/pgboss.ts | 15 +++-- 5 files changed, 119 insertions(+), 51 deletions(-) diff --git a/packages/world-postgres/src/cli.ts b/packages/world-postgres/src/cli.ts index 627ac4cd1..209dd466c 100644 --- a/packages/world-postgres/src/cli.ts +++ b/packages/world-postgres/src/cli.ts @@ -4,6 +4,7 @@ import { config } from 'dotenv'; import { drizzle } from 'drizzle-orm/postgres-js'; import { migrate } from 'drizzle-orm/postgres-js/migrator'; import postgres from 'postgres'; +import { DEFAULT_PG_URL } from './config.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -14,7 +15,7 @@ async function setupDatabase() { const connectionString = process.env.WORKFLOW_POSTGRES_URL || process.env.DATABASE_URL || - 'postgres://world:world@localhost:5432/world'; + DEFAULT_PG_URL; console.log('🔧 Setting up database schema...'); console.log( diff --git a/packages/world-postgres/src/config.ts b/packages/world-postgres/src/config.ts index b0b6291ed..f27c8698d 100644 --- a/packages/world-postgres/src/config.ts +++ b/packages/world-postgres/src/config.ts @@ -1,7 +1,58 @@ import type { QueueDriver } from './queue-drivers/types.js'; -export type PostgresWorldConfig = { - securityToken: string; - connectionString: string; - queueFactory: () => QueueDriver; +export type BaseWorldConfig = { + connectionString?: string; + securityToken?: string; }; + +export type PostgresWorldConfig = BaseWorldConfig & { + queueFactory?: () => QueueDriver; +}; + +export type ResolvedBaseWorldConfig = Required; + +export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world'; +export const DEFAULT_SECURITY_TOKEN = 'secret'; 
+export const DEFAULT_JOB_PREFIX = 'workflow_'; +export const DEFAULT_QUEUE_CONCURRENCY = 10; + +let worldConfig: ResolvedBaseWorldConfig | null = null; + +export function loadWorldConfig( + config: BaseWorldConfig = {} +): ResolvedBaseWorldConfig { + worldConfig = { + connectionString: + config.connectionString ?? + process.env.WORKFLOW_POSTGRES_URL ?? + process.env.DATABASE_URL ?? + DEFAULT_PG_URL, + + securityToken: + config.securityToken ?? + process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN ?? + DEFAULT_SECURITY_TOKEN, + }; + + return worldConfig; +} + +export function getWorldConfig(): ResolvedBaseWorldConfig { + if (!worldConfig) { + throw new Error( + 'World config not loaded. Call createWorld() or loadWorldConfig().' + ); + } + + return worldConfig; +} + +export function getQueueConfig() { + return { + jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX ?? DEFAULT_JOB_PREFIX, + queueConcurrency: + (process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY + ? parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY, 10) + : undefined) ?? 
DEFAULT_QUEUE_CONCURRENCY, + }; +} diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 3a4f2a066..4afaf427f 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -1,6 +1,6 @@ import type { Storage, World } from '@workflow/world'; import createPostgres from 'postgres'; -import type { PostgresWorldConfig } from './config.js'; +import { loadWorldConfig, type PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; import { createFunctionProxy } from './proxies/function-proxy.js'; import { createHttpProxy } from './proxies/http-proxy.js'; @@ -18,16 +18,15 @@ import { } from './storage.js'; import { createStreamer } from './streamer.js'; -export const DEFAULT_PG_URL = 'postgres://world:world@localhost:5432/world'; - export function createWorld( - config: PostgresWorldConfig = { - connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, - securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'secret', - queueFactory: defaultQueueFactory, - } + opts: PostgresWorldConfig = {} ): World & { start(): Promise } { - const queueDriver = config.queueFactory(); + const config = loadWorldConfig(opts); + + const queueDriver = opts.queueFactory + ? 
opts.queueFactory() + : createPgBossHttpProxyQueue(); + const postgres = createPostgres(config.connectionString); const drizzle = createClient(postgres); @@ -54,19 +53,6 @@ function createStorage(drizzle: Drizzle): Storage { }; } -function defaultQueueFactory() { - return createPgBossHttpProxyQueue({ - baseUrl: process.env.WORKFLOW_POSTGRES_BASE_URL, - connectionString: process.env.WORKFLOW_POSTGRES_URL || DEFAULT_PG_URL, - securityToken: - process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN || 'this-is-not-safe', - jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX, - queueConcurrency: - parseInt(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY || '10', 10) || - 10, - }); -} - export type { PostgresWorldConfig } from './config.js'; // Re-export schema for users who want to extend or inspect the database schema export * from './drizzle/schema.js'; @@ -74,6 +60,6 @@ export * from './drizzle/schema.js'; export { createFunctionProxy, createHttpProxy }; export { createPgBossQueue, - createPgBossFunctionProxyQueue as createPgBossFunctionProxy, - createPgBossHttpProxyQueue as createPgBossHttpProxy, + createPgBossFunctionProxyQueue, + createPgBossHttpProxyQueue, }; diff --git a/packages/world-postgres/src/queue-drivers/factories.ts b/packages/world-postgres/src/queue-drivers/factories.ts index 21e3f0ae1..df314e157 100644 --- a/packages/world-postgres/src/queue-drivers/factories.ts +++ b/packages/world-postgres/src/queue-drivers/factories.ts @@ -1,3 +1,4 @@ +import { getQueueConfig, getWorldConfig } from '../config.js'; import { createFunctionProxy } from '../proxies/function-proxy.js'; import { createHttpProxy } from '../proxies/http-proxy.js'; import { createPgBossQueue } from './pgboss.js'; @@ -9,20 +10,30 @@ import type { QueueDriver } from './types.js'; */ export function createPgBossFunctionProxyQueue(opts: { jobPrefix?: string; - securityToken: string; - connectionString: string; + securityToken?: string; + connectionString?: string; queueConcurrency?: number; 
stepEntrypoint: (request: Request) => Promise; workflowEntrypoint: (request: Request) => Promise; }): QueueDriver { + const worldDefaults = getWorldConfig(); + const queueDefaults = getQueueConfig(); + + const config = { + connectionString: opts.connectionString ?? worldDefaults.connectionString, + securityToken: opts.securityToken ?? worldDefaults.securityToken, + jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix, + queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency, + }; + return createPgBossQueue( { - jobPrefix: opts.jobPrefix, - connectionString: opts.connectionString, - queueConcurrency: opts.queueConcurrency, + jobPrefix: config.jobPrefix, + connectionString: config.connectionString, + queueConcurrency: config.queueConcurrency, }, createFunctionProxy({ - securityToken: opts.securityToken, + securityToken: config.securityToken, stepEntrypoint: opts.stepEntrypoint, workflowEntrypoint: opts.workflowEntrypoint, }) @@ -33,14 +44,34 @@ export function createPgBossFunctionProxyQueue(opts: { * QueueDriver implementation using pg-boss for job management * and HTTP for execution. */ -export function createPgBossHttpProxyQueue(config: { - port?: number; - baseUrl?: string; - jobPrefix?: string; - securityToken: string; - connectionString: string; - queueConcurrency?: number; -}): QueueDriver { +export function createPgBossHttpProxyQueue( + opts: { + port?: number; + baseUrl?: string; + jobPrefix?: string; + securityToken?: string; + connectionString?: string; + queueConcurrency?: number; + } = {} +): QueueDriver { + const worldDefaults = getWorldConfig(); + const queueDefaults = getQueueConfig(); + + const config = { + connectionString: opts.connectionString ?? worldDefaults.connectionString, + securityToken: opts.securityToken ?? worldDefaults.securityToken, + jobPrefix: opts.jobPrefix ?? queueDefaults.jobPrefix, + queueConcurrency: opts.queueConcurrency ?? queueDefaults.queueConcurrency, + + port: + opts.port ?? 
+ (process.env.WORKFLOW_POSTGRES_APP_PORT + ? parseInt(process.env.WORKFLOW_POSTGRES_APP_PORT, 10) + : undefined), + + baseUrl: opts.baseUrl ?? process.env.WORKFLOW_POSTGRES_APP_URL, + }; + return createPgBossQueue( { jobPrefix: config.jobPrefix, diff --git a/packages/world-postgres/src/queue-drivers/pgboss.ts b/packages/world-postgres/src/queue-drivers/pgboss.ts index 40d1bf6c2..3e720aba0 100644 --- a/packages/world-postgres/src/queue-drivers/pgboss.ts +++ b/packages/world-postgres/src/queue-drivers/pgboss.ts @@ -7,19 +7,18 @@ import { MessageData, type QueueDriver } from './types.js'; * Takes in a proxy that will handle the actual step/flow execution. */ export function createPgBossQueue( - config: { - jobPrefix?: string; + opts: { + jobPrefix: string; connectionString: string; - queueConcurrency?: number; + queueConcurrency: number; }, proxy: WkfProxy ): QueueDriver { let startPromise: Promise | null = null; - const boss = new PgBoss(config.connectionString); + const boss = new PgBoss(opts.connectionString); - const prefix = config.jobPrefix || 'workflow_'; - const stepQueueName = `${prefix}steps`; - const workflowQueueName = `${prefix}flows`; + const stepQueueName = `${opts.jobPrefix}steps`; + const workflowQueueName = `${opts.jobPrefix}flows`; const ensureStarted = async () => { if (!startPromise) { @@ -59,7 +58,7 @@ export function createPgBossQueue( const stepWorker = createWorker(proxy.proxyStep); const workflowWorker = createWorker(proxy.proxyWorkflow); - for (let i = 0; i < (config.queueConcurrency || 10); i++) { + for (let i = 0; i < opts.queueConcurrency; i++) { await boss.work(workflowQueueName, workflowWorker); await boss.work(stepQueueName, stepWorker); } From 99326165e726e6f474ab2e0341e1087a9235bb9a Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:15:24 +0100 Subject: [PATCH 09/16] Updated README Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- 
packages/world-postgres/README.md | 369 ++++++++++++++++++++++++++---- 1 file changed, 328 insertions(+), 41 deletions(-) diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index 875c2cbce..5234c195c 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -1,6 +1,6 @@ # @workflow/world-postgres -An embedded worker/workflow system backed by PostgreSQL for multi-host self-hosted solutions. This is a reference implementation - a production-ready solution might run workers in separate processes with a more robust queuing system. +A PostgreSQL-backed workflow runtime for durable, long-running workflows in JavaScript/TypeScript. Designed for multi-host self-hosted solutions with flexible queue and execution strategies. ## Installation @@ -12,61 +12,259 @@ pnpm add @workflow/world-postgres yarn add @workflow/world-postgres ``` -## Usage +## Quick Start (Next.js) -### Basic Setup +Get started in seconds by adding this to your Next.js project: -The postgres world can be configured by setting the `WORKFLOW_TARGET_WORLD` environment variable to the package name: +```typescript +// instrumentation.ts +export async function register() { + if (process.env.NEXT_RUNTIME !== 'edge') { + const { createWorld } = await import("@workflow/world-postgres"); + + const world = createWorld(); + await world.start(); + } +} +``` + +Set these environment variables: ```bash export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" +export WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +export WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +export WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" ``` -### Configuration +**⚠️ IMPORTANT**: Always set a strong `WORKFLOW_POSTGRES_SECURITY_TOKEN` in production. This token authenticates queue workers when they call your workflow endpoints and prevents unauthorized access. + +That's it! 
The world will automatically use pg-boss for queuing and HTTP proxy for executing workflows. + +## Architecture + +This package provides a layered architecture with three key components: + +- **Storage**: Persists workflow state in PostgreSQL (runs, events, steps, hooks, streaming chunks). All tables are isolated in their own PostgreSQL schema. +- **Queue Driver**: Manages job queuing and worker orchestration (default: pg-boss, or bring your own) +- **Proxy Strategy**: Determines how jobs are executed (HTTP calls or direct function invocation) + +## Execution Patterns + +The package supports flexible execution patterns based on two dimensions: + +### Queue Strategy +- **Built-in pg-boss** (default): Reliable PostgreSQL-backed job queue +- **Custom queue**: Implement your own queue system (Redis, SQS, RabbitMQ, etc.) + +### Proxy Strategy +- **HTTP Proxy**: Workers call workflow endpoints over HTTP (`/.well-known/workflow/v1/flow` and `/.well-known/workflow/v1/step`) +- **Function Proxy**: Workers invoke workflow/step functions directly in-process + +### Execution Environment +- **Same Process**: Workers run alongside your application (e.g., in Next.js `instrumentation.ts`) +- **Separate Process**: Dedicated worker process(es) for better isolation and scaling +- **Serverless**: Receive messages from your queue and call a proxy to execute workflows -Configure the PostgreSQL world using environment variables: +## Configuration Patterns +### Pattern 1: pg-boss + HTTP Proxy (Default) + +The simplest setup - workers make HTTP calls to your application: + +```typescript +import { createWorld } from "@workflow/world-postgres"; + +const world = createWorld(); +await world.start(); +``` + +**Required Environment Variables:** ```bash -# Required: PostgreSQL connection string -export WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" 
+WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" +``` -# Optional: Job prefix for queue operations -export WORKFLOW_POSTGRES_JOB_PREFIX="myapp" +**Optional Environment Variables:** +```bash +WORKFLOW_POSTGRES_JOB_PREFIX="myapp_" +WORKFLOW_POSTGRES_WORKER_CONCURRENCY="10" +``` -# Optional: Worker concurrency (default: 10) -export WORKFLOW_POSTGRES_WORKER_CONCURRENCY="10" +**Programmatic Configuration:** +```typescript +const world = createWorld({ + connectionString: "postgres://...", + securityToken: "your-secret-token", +}); ``` -### Programmatic Usage +### Pattern 2: pg-boss + Function Proxy + +Workers call workflow functions directly in the same process - better performance, simpler deployment: + +```typescript +import { createWorld, createPgBossFunctionProxyQueue } from "@workflow/world-postgres"; + +// Import entrypoints from your Next.js API routes +import { __wkf_entrypoint as workflowEntrypoint } from './app/.well-known/workflow/v1/flow/route'; +import { __wkf_entrypoint as stepEntrypoint } from './app/.well-known/workflow/v1/step/route'; + +const world = createWorld({ + queueFactory: () => + createPgBossFunctionProxyQueue({ + stepEntrypoint, + workflowEntrypoint, + }), +}); + +await world.start(); +``` + +**Required Environment Variables:** +```bash +WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +``` -You can also create a PostgreSQL world directly in your code: +**Optional:** +All configuration can be passed programmatically: + +```typescript +createPgBossFunctionProxyQueue({ + stepEntrypoint, + workflowEntrypoint, + connectionString: "postgres://...", + securityToken: "your-secret-token", + jobPrefix: "myapp_", + queueConcurrency: 10, +}) +``` + +### Pattern 3: Custom Queue Driver + +Implement your own queue system for maximum flexibility: ```typescript import { createWorld } from 
"@workflow/world-postgres"; +import type { QueueDriver, MessageData } from "@workflow/world-postgres/queue-drivers/types"; + +const myCustomQueue: QueueDriver = { + pushStep: async (message: MessageData) => { + // Push step execution message to your queue + await myQueue.push('steps', message); + }, + + pushFlow: async (message: MessageData) => { + // Push workflow execution message to your queue + await myQueue.push('workflows', message); + }, + + start: async () => { + // Start consuming from your queue and execute via proxy + const proxy = createHttpProxy({ + baseUrl: 'http://localhost:3000', + securityToken: process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN!, + }); + + await myQueue.consume('steps', async (message) => { + await proxy.proxyStep(message); + }); + + await myQueue.consume('workflows', async (message) => { + await proxy.proxyWorkflow(message); + }); + }, +}; const world = createWorld({ - connectionString: "postgres://username:password@localhost:5432/database", - jobPrefix: "myapp", // optional - queueConcurrency: 10, // optional + queueFactory: () => myCustomQueue, }); + +await world.start(); ``` -## Configuration Options +You can use the helper proxies: +- `createHttpProxy({ baseUrl, securityToken })` - for HTTP execution +- `createFunctionProxy({ stepEntrypoint, workflowEntrypoint, securityToken })` - for in-process execution -| Option | Type | Default | Description | -| ------------------ | -------- | -------------------------------------------------------------------------------------- | ----------------------------------- | -| `connectionString` | `string` | `process.env.WORKFLOW_POSTGRES_URL` or `'postgres://world:world@localhost:5432/world'` | PostgreSQL connection string | -| `jobPrefix` | `string` | `process.env.WORKFLOW_POSTGRES_JOB_PREFIX` | Optional prefix for queue job names | -| `queueConcurrency` | `number` | `10` | Number of concurrent queue workers | +See `src/queue-drivers/types.ts` for the full `QueueDriver` interface and `MessageData` 
structure. + +## Execution Environment Examples + +### Same Process (Next.js instrumentation.ts) + +Run workers in the same process as your Next.js application: + +```typescript +// instrumentation.ts +export async function register() { + if (process.env.NEXT_RUNTIME !== 'edge') { + const { createWorld, createPgBossFunctionProxyQueue } = + await import("@workflow/world-postgres"); + + const { __wkf_entrypoint: workflowEntrypoint } = + await import('./app/.well-known/workflow/v1/flow/route'); + const { __wkf_entrypoint: stepEntrypoint } = + await import('./app/.well-known/workflow/v1/step/route'); + + const world = createWorld({ + queueFactory: () => + createPgBossFunctionProxyQueue({ + stepEntrypoint, + workflowEntrypoint, + }), + }); + + await world.start(); + } +} +``` -## Environment Variables +### Separate Worker Process -| Variable | Description | Default | -| -------------------------------------- | ------------------------------------------------------------ | ----------------------------------------------- | -| `WORKFLOW_TARGET_WORLD` | Set to `"@workflow/world-postgres"` to use this world | - | -| `WORKFLOW_POSTGRES_URL` | PostgreSQL connection string | `'postgres://world:world@localhost:5432/world'` | -| `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | - | -| `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` | +Run workers in a dedicated process for better isolation: + +```typescript +// worker.ts +import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres"; + +const world = createWorld({ + queueFactory: () => + createPgBossHttpProxyQueue({ + baseUrl: "http://localhost:3000", // Your app URL + }), +}); + +await world.start(); +``` + +Then run: `node worker.ts` + +### Serverless + +In a serverless environment, receive messages from your queue and execute them via proxy: + +```typescript +// queue-handler.ts +import { createHttpProxy } from "@workflow/world-postgres"; +import type { MessageData 
} from "@workflow/world-postgres/queue-drivers/types"; + +const proxy = createHttpProxy({ + baseUrl: process.env.APP_URL, + securityToken: process.env.SECURITY_TOKEN, +}); + +export async function handleQueueMessage(message: MessageData) { + // Determine if it's a step or workflow + if (message.queueName.includes('step')) { + await proxy.proxyStep(message); + } else { + await proxy.proxyWorkflow(message); + } +} +``` ## Database Setup @@ -93,7 +291,7 @@ The CLI automatically loads `.env` files and will use the connection string from ### Database Schema -The setup creates the following tables: +All workflow data is stored in its own PostgreSQL schema, keeping it isolated from your application data. The setup creates the following tables: - `workflow_runs` - Stores workflow execution runs - `workflow_events` - Stores workflow events @@ -109,15 +307,103 @@ import { runs, events, steps, hooks, streams } from '@workflow/world-postgres'; import * as schema from '@workflow/world-postgres/schema'; ``` -Make sure your PostgreSQL database is accessible and the user has sufficient permissions to create tables and manage jobs. +Make sure your PostgreSQL database is accessible and the user has sufficient permissions to create schemas, tables, and manage jobs. 
+ +## Environment Variables Reference + +| Variable | Description | Default | Required For | +| -------------------------------------- | -------------------------------------------- | ----------------------------------------------- | -------------------------- | +| `WORKFLOW_TARGET_WORLD` | Package name to use as workflow world | - | All patterns | +| `WORKFLOW_POSTGRES_URL` | PostgreSQL connection string | `postgres://world:world@localhost:5432/world` | All patterns | +| `WORKFLOW_POSTGRES_SECURITY_TOKEN` | Security token for queue worker auth | `secret` | **Required in production** | +| `WORKFLOW_POSTGRES_JOB_PREFIX` | Prefix for queue job names | `workflow_` | Optional | +| `WORKFLOW_POSTGRES_WORKER_CONCURRENCY` | Number of concurrent workers | `10` | Optional | +| `WORKFLOW_POSTGRES_APP_URL` | Base URL for HTTP proxy | - | Pattern 1 (HTTP proxy) | +| `WORKFLOW_POSTGRES_APP_PORT` | Port for HTTP proxy (if URL not provided) | `3000` | Pattern 1 (HTTP proxy) | + +All environment variables can be overridden by passing configuration programmatically to `createWorld()` or the queue factory functions. ## Features -- **Durable Storage**: Stores workflow runs, events, steps, hooks, and webhooks in PostgreSQL -- **Queue Processing**: Uses pg-boss for reliable job queue processing +- **Durable Storage**: Stores workflow runs, events, steps, hooks, and webhooks in PostgreSQL with schema isolation +- **Flexible Queue System**: Use built-in pg-boss or integrate any queue system (Redis, SQS, RabbitMQ, etc.) 
+- **Multiple Execution Strategies**: HTTP proxy for distributed systems, function proxy for co-located workers - **Streaming**: Real-time event streaming capabilities - **Health Checks**: Built-in connection health monitoring - **Configurable Concurrency**: Adjustable worker concurrency for queue processing +- **Type-Safe**: Full TypeScript support with exported types + +## API Reference + +### `createWorld(options)` + +Creates a workflow world instance with PostgreSQL storage. + +**Options:** +- `connectionString?: string` - PostgreSQL connection string (default: `process.env.WORKFLOW_POSTGRES_URL`) +- `securityToken?: string` - Token for authenticating queue workers (default: `process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN`) +- `queueFactory?: () => QueueDriver` - Factory function to create queue driver (default: `createPgBossHttpProxyQueue()`) + +**Returns:** World instance with `start()` method + +### Built-in Queue Factories + +#### `createPgBossHttpProxyQueue(options)` + +Creates a pg-boss queue driver with HTTP proxy execution. + +**Options:** +- `connectionString?: string` +- `securityToken?: string` +- `jobPrefix?: string` +- `queueConcurrency?: number` +- `port?: number` +- `baseUrl?: string` + +#### `createPgBossFunctionProxyQueue(options)` + +Creates a pg-boss queue driver with direct function call execution. + +**Options:** +- `stepEntrypoint: (request: Request) => Promise` - Required +- `workflowEntrypoint: (request: Request) => Promise` - Required +- `connectionString?: string` +- `securityToken?: string` +- `jobPrefix?: string` +- `queueConcurrency?: number` + +### Proxy Helpers + +#### `createHttpProxy(options)` + +Creates an HTTP proxy for executing workflows via HTTP calls. 
+ +**Options:** +- `baseUrl?: string` - Base URL of your application +- `port?: number` - Port (if baseUrl not provided) +- `securityToken: string` - Security token for authentication + +#### `createFunctionProxy(options)` + +Creates a function proxy for executing workflows via direct function calls. + +**Options:** +- `stepEntrypoint: (request: Request) => Promise` - Required +- `workflowEntrypoint: (request: Request) => Promise` - Required +- `securityToken: string` - Security token for authentication + +## TypeScript Support + +All public APIs are fully typed. Import types from the package: + +```typescript +import type { + QueueDriver, + MessageData, + WkfProxy, + PostgresWorldConfig +} from "@workflow/world-postgres"; +``` ## Development @@ -127,19 +413,20 @@ For local development, you can use the included Docker Compose configuration: # Start PostgreSQL database docker-compose up -d -# Create and run migrations -pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations -pnpm bin/setup.js +# Run database setup +pnpm exec workflow-postgres-setup # Set environment variables for local development export WORKFLOW_POSTGRES_URL="postgres://world:world@localhost:5432/world" export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" ``` -## World Selection - -To use the PostgreSQL world, set the `WORKFLOW_TARGET_WORLD` environment variable to the package name: +### Creating Migrations ```bash -export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" +pnpm drizzle-kit generate --dialect=postgresql --schema=./src/drizzle/schema.ts --out src/drizzle/migrations ``` + +## License + +See [LICENSE.md](./LICENSE.md) From 334da22fc4a1f2a3342a8c3f4be1c8b579f90e12 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Tue, 18 Nov 2025 00:24:42 +0100 Subject: [PATCH 10/16] Rework README Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- 
packages/world-postgres/HOW_IT_WORKS.md | 50 ------ packages/world-postgres/README.md | 225 +++++------------------- 2 files changed, 42 insertions(+), 233 deletions(-) delete mode 100644 packages/world-postgres/HOW_IT_WORKS.md diff --git a/packages/world-postgres/HOW_IT_WORKS.md b/packages/world-postgres/HOW_IT_WORKS.md deleted file mode 100644 index a75c62dee..000000000 --- a/packages/world-postgres/HOW_IT_WORKS.md +++ /dev/null @@ -1,50 +0,0 @@ -# How PostgreSQL World Works - -This document explains the architecture and components of the PostgreSQL world implementation for workflow management. - -This implementation is using [Drizzle Schema](./src/drizzle/schema.ts) that can be pushed or migrated into your PostgreSQL schema and backed by Postgres.js. - -If you want to use any other ORM, query builder or underlying database client, you should be able to fork this implementation and replace the Drizzle parts with your own. - -## Job Queue System - -```mermaid -graph LR - Client --> PG[pg-boss queue] - PG --> Worker[Embedded Worker] - Worker --> HTTP[HTTP fetch] - HTTP --> EW[Embedded World] - - PG -.-> F["${prefix}flows
(workflows)"] - PG -.-> S["${prefix}steps
(steps)"] -``` - -Jobs include retry logic (3 attempts), idempotency keys, and configurable worker concurrency (default: 10). - -## Streaming - -Real-time data streaming via **PostgreSQL LISTEN/NOTIFY**: - -- Stream chunks stored in `workflow_stream_chunks` table -- `pg_notify` triggers sent on writes to `workflow_event_chunk` topic -- Subscribers receive notifications and fetch chunk data -- ULID-based ordering ensures correct sequence -- Single connection for listening to notifications, with an in-process EventEmitter for distributing events to multiple subscribers - -## Setup - -Call `world.start()` to initialize pg-boss workers. When `.start()` is called, workers begin listening to pg-boss queues. When a job arrives, workers make HTTP fetch calls to the embedded world endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) to execute the actual workflow logic. - -In **Next.js**, the `world.setup()` function needs to be added to `instrumentation.ts|js` to ensure workers start before request handling: - -```ts -// instrumentation.ts - -if (process.env.NEXT_RUNTIME !== "edge") { - import("workflow/api").then(async ({ getWorld }) => { - // start listening to the jobs. - await getWorld().start?.(); - }); -} -``` - diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index 5234c195c..4a7132304 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -1,6 +1,6 @@ # @workflow/world-postgres -A PostgreSQL-backed workflow runtime for durable, long-running workflows in JavaScript/TypeScript. Designed for multi-host self-hosted solutions with flexible queue and execution strategies. 
+An embedded worker/workflow system backed by PostgreSQL for multi-host self-hosted solutions ## Installation @@ -12,52 +12,54 @@ pnpm add @workflow/world-postgres yarn add @workflow/world-postgres ``` -## Quick Start (Next.js) +## Usage -Get started in seconds by adding this to your Next.js project: +### Basic Setup -```typescript -// instrumentation.ts -export async function register() { - if (process.env.NEXT_RUNTIME !== 'edge') { - const { createWorld } = await import("@workflow/world-postgres"); +The postgres world can be configured by setting the WORKFLOW_TARGET_WORLD environment variable to the package name. - const world = createWorld(); - await world.start(); - } -} +```bash +WORKFLOW_TARGET_WORLD="@workflow/world-postgres" ``` -Set these environment variables: +### Configuration + +Configure the PostgreSQL world using environment variables: ```bash -export WORKFLOW_TARGET_WORLD="@workflow/world-postgres" -export WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" -export WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" -export WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" +WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" +WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" +WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" ``` -**⚠️ IMPORTANT**: Always set a strong `WORKFLOW_POSTGRES_SECURITY_TOKEN` in production. This token authenticates queue workers when they call your workflow endpoints and prevents unauthorized access. +### Programmatic Usage -That's it! The world will automatically use pg-boss for queuing and HTTP proxy for executing workflows. 
+You can also create a PostgreSQL world directly in your code: -## Architecture +```typescript
import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres"; -This package provides a layered architecture with three key components: +const world = createWorld({ + connectionString: "postgres://username:password@localhost:5432/database", + securityToken: "your-secret-token-here", + queueFactory: createPgBossHttpProxyQueue({ + jobPrefix: "my-app", + queueConcurrency: 10, + }) +}); +``` -- **Storage**: Persists workflow state in PostgreSQL (runs, events, steps, hooks, streaming chunks). All tables are isolated in their own PostgreSQL schema. -- **Queue Driver**: Manages job queuing and worker orchestration (default: pg-boss, or bring your own) -- **Proxy Strategy**: Determines how jobs are executed (HTTP calls or direct function invocation) +**⚠️ IMPORTANT**: Always set a strong `WORKFLOW_POSTGRES_SECURITY_TOKEN` in production. This token authenticates queue workers when they call your workflow endpoints and prevents unauthorized access. -## Execution Patterns +## Architecture -The package supports flexible execution patterns based on two dimensions: +The package supports flexible queues and execution patterns, letting you choose how jobs are queued and where the steps and workflows will be executed. ### Queue Strategy - **Built-in pg-boss** (default): Reliable PostgreSQL-backed job queue - **Custom queue**: Implement your own queue system (Redis, SQS, RabbitMQ, etc.) 
-### Proxy Strategy +### Execution Proxy Strategy - **HTTP Proxy**: Workers call workflow endpoints over HTTP (`/.well-known/workflow/v1/flow` and `/.well-known/workflow/v1/step`) - **Function Proxy**: Workers invoke workflow/step functions directly in-process @@ -66,11 +68,11 @@ The package supports flexible execution patterns based on two dimensions: - **Separate Process**: Dedicated worker process(es) for better isolation and scaling - **Serverless**: Receive messages from your queue and call a proxy to execute workflows -## Configuration Patterns +## Advanced Usage -### Pattern 1: pg-boss + HTTP Proxy (Default) +### pg-boss + HTTP Proxy (Default) -The simplest setup - workers make HTTP calls to your application: +The simplest setup - jobs are queued using pg-boss and workers make HTTP calls to your application: ```typescript import { createWorld } from "@workflow/world-postgres"; @@ -100,14 +102,15 @@ const world = createWorld({ }); ``` -### Pattern 2: pg-boss + Function Proxy +### pg-boss + Function Proxy -Workers call workflow functions directly in the same process - better performance, simpler deployment: +Jobs are queued using pg-boss and workers directly call workflow functions in the same process: ```typescript +const { setWorld } = await import('workflow/runtime'); import { createWorld, createPgBossFunctionProxyQueue } from "@workflow/world-postgres"; -// Import entrypoints from your Next.js API routes +// Import entrypoints from your framework API routes import { __wkf_entrypoint as workflowEntrypoint } from './app/.well-known/workflow/v1/flow/route'; import { __wkf_entrypoint as stepEntrypoint } from './app/.well-known/workflow/v1/step/route'; @@ -119,34 +122,17 @@ const world = createWorld({ }), }); -await world.start(); -``` +setWorld(world); -**Required Environment Variables:** -```bash -WORKFLOW_POSTGRES_URL="postgres://username:password@localhost:5432/database" -WORKFLOW_POSTGRES_SECURITY_TOKEN="your-secret-token-here" -``` - -**Optional:** -All 
configuration can be passed programmatically: - -```typescript -createPgBossFunctionProxyQueue({ - stepEntrypoint, - workflowEntrypoint, - connectionString: "postgres://...", - securityToken: "your-secret-token", - jobPrefix: "myapp_", - queueConcurrency: 10, -}) +await world.start(); ``` -### Pattern 3: Custom Queue Driver +### Custom Queue Driver + HTTP Proxy Implement your own queue system for maximum flexibility: ```typescript +const { setWorld } = await import('workflow/runtime'); import { createWorld } from "@workflow/world-postgres"; import type { QueueDriver, MessageData } from "@workflow/world-postgres/queue-drivers/types"; @@ -182,67 +168,12 @@ const world = createWorld({ queueFactory: () => myCustomQueue, }); -await world.start(); -``` - -You can use the helper proxies: -- `createHttpProxy({ baseUrl, securityToken })` - for HTTP execution -- `createFunctionProxy({ stepEntrypoint, workflowEntrypoint, securityToken })` - for in-process execution - -See `src/queue-drivers/types.ts` for the full `QueueDriver` interface and `MessageData` structure. 
- -## Execution Environment Examples - -### Same Process (Next.js instrumentation.ts) - -Run workers in the same process as your Next.js application: - -```typescript -// instrumentation.ts -export async function register() { - if (process.env.NEXT_RUNTIME !== 'edge') { - const { createWorld, createPgBossFunctionProxyQueue } = - await import("@workflow/world-postgres"); - - const { __wkf_entrypoint: workflowEntrypoint } = - await import('./app/.well-known/workflow/v1/flow/route'); - const { __wkf_entrypoint: stepEntrypoint } = - await import('./app/.well-known/workflow/v1/step/route'); - - const world = createWorld({ - queueFactory: () => - createPgBossFunctionProxyQueue({ - stepEntrypoint, - workflowEntrypoint, - }), - }); - - await world.start(); - } -} -``` - -### Separate Worker Process - -Run workers in a dedicated process for better isolation: - -```typescript -// worker.ts -import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres"; - -const world = createWorld({ - queueFactory: () => - createPgBossHttpProxyQueue({ - baseUrl: "http://localhost:3000", // Your app URL - }), -}); +setWorld(world); await world.start(); ``` -Then run: `node worker.ts` - -### Serverless +### Serverless execution In a serverless environment, receive messages from your queue and execute them via proxy: @@ -333,78 +264,6 @@ All environment variables can be overridden by passing configuration programmati - **Configurable Concurrency**: Adjustable worker concurrency for queue processing - **Type-Safe**: Full TypeScript support with exported types -## API Reference - -### `createWorld(options)` - -Creates a workflow world instance with PostgreSQL storage. 
- -**Options:** -- `connectionString?: string` - PostgreSQL connection string (default: `process.env.WORKFLOW_POSTGRES_URL`) -- `securityToken?: string` - Token for authenticating queue workers (default: `process.env.WORKFLOW_POSTGRES_SECURITY_TOKEN`) -- `queueFactory?: () => QueueDriver` - Factory function to create queue driver (default: `createPgBossHttpProxyQueue()`) - -**Returns:** World instance with `start()` method - -### Built-in Queue Factories - -#### `createPgBossHttpProxyQueue(options)` - -Creates a pg-boss queue driver with HTTP proxy execution. - -**Options:** -- `connectionString?: string` -- `securityToken?: string` -- `jobPrefix?: string` -- `queueConcurrency?: number` -- `port?: number` -- `baseUrl?: string` - -#### `createPgBossFunctionProxyQueue(options)` - -Creates a pg-boss queue driver with direct function call execution. - -**Options:** -- `stepEntrypoint: (request: Request) => Promise` - Required -- `workflowEntrypoint: (request: Request) => Promise` - Required -- `connectionString?: string` -- `securityToken?: string` -- `jobPrefix?: string` -- `queueConcurrency?: number` - -### Proxy Helpers - -#### `createHttpProxy(options)` - -Creates an HTTP proxy for executing workflows via HTTP calls. - -**Options:** -- `baseUrl?: string` - Base URL of your application -- `port?: number` - Port (if baseUrl not provided) -- `securityToken: string` - Security token for authentication - -#### `createFunctionProxy(options)` - -Creates a function proxy for executing workflows via direct function calls. - -**Options:** -- `stepEntrypoint: (request: Request) => Promise` - Required -- `workflowEntrypoint: (request: Request) => Promise` - Required -- `securityToken: string` - Security token for authentication - -## TypeScript Support - -All public APIs are fully typed. 
Import types from the package: - -```typescript -import type { - QueueDriver, - MessageData, - WkfProxy, - PostgresWorldConfig -} from "@workflow/world-postgres"; -``` - ## Development For local development, you can use the included Docker Compose configuration: From a349d1f25c0985bed0bf436458243a0b7f0820e0 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Tue, 18 Nov 2025 00:27:20 +0100 Subject: [PATCH 11/16] Changeset Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- .changeset/kind-suns-wonder.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/kind-suns-wonder.md diff --git a/.changeset/kind-suns-wonder.md b/.changeset/kind-suns-wonder.md new file mode 100644 index 000000000..04d953bcd --- /dev/null +++ b/.changeset/kind-suns-wonder.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/sveltekit": patch +"@workflow/builders": patch +--- + +- export stepEntrypoint and workflowEntrypoint from build +- add abstract queue driver to world postgres +- add execution strategy to world postgres From 1ebb038dbd9f5d94f7d8ff458939f1f58549fff3 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Thu, 20 Nov 2025 23:17:20 +0100 Subject: [PATCH 12/16] Store metadata Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/storage.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index bfe9726a4..20aaf57a4 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -469,6 +469,7 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { runId, hookId: data.hookId, token: data.token, + metadata: data.metadata as any, ownerId: '', // TODO: get from context projectId: '', // TODO: get from context environment: '', // TODO: get from 
context From dfe0f42e9b4c9cf5b4020f490d6eac22f4771b40 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Fri, 21 Nov 2025 00:15:04 +0100 Subject: [PATCH 13/16] Store data as bytea to support null bytes Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- .../migrations/0002_striped_lifeguard.sql | 7 + .../migrations/meta/0002_snapshot.json | 506 ++++++++++++++++++ .../src/drizzle/migrations/meta/_journal.json | 7 + packages/world-postgres/src/drizzle/schema.ts | 24 +- packages/world-postgres/src/storage.ts | 114 +++- 5 files changed, 629 insertions(+), 29 deletions(-) create mode 100644 packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql create mode 100644 packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json diff --git a/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql b/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql new file mode 100644 index 000000000..4a1af6570 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0002_striped_lifeguard.sql @@ -0,0 +1,7 @@ +-- Convert JSONB to bytea by serializing to text and encoding as UTF-8 +ALTER TABLE "workflow"."workflow_events" ALTER COLUMN "payload" SET DATA TYPE bytea USING convert_to("payload"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_hooks" ALTER COLUMN "metadata" SET DATA TYPE bytea USING convert_to("metadata"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_runs" ALTER COLUMN "output" SET DATA TYPE bytea USING convert_to("output"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_runs" ALTER COLUMN "input" SET DATA TYPE bytea USING convert_to("input"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE "workflow"."workflow_steps" ALTER COLUMN "input" SET DATA TYPE bytea USING convert_to("input"::text, 'UTF8');--> statement-breakpoint +ALTER TABLE 
"workflow"."workflow_steps" ALTER COLUMN "output" SET DATA TYPE bytea USING convert_to("output"::text, 'UTF8'); \ No newline at end of file diff --git a/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json b/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json new file mode 100644 index 000000000..ea249d2a4 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/meta/0002_snapshot.json @@ -0,0 +1,506 @@ +{ + "id": "ccda4a09-c27c-4386-a806-d0520bbbcac6", + "prevId": "a1cbf36d-8801-4509-80c4-8bab67191377", + "version": "7", + "dialect": "postgresql", + "tables": { + "workflow.workflow_events": { + "name": "workflow_events", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "type": { + "name": "type", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "correlation_id": { + "name": "correlation_id", + "type": "varchar", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "payload": { + "name": "payload", + "type": "bytea", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_events_run_id_index": { + "name": "workflow_events_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_events_correlation_id_index": { + "name": "workflow_events_correlation_id_index", + "columns": [ + { + "expression": "correlation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + 
"compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_hooks": { + "name": "workflow_hooks", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "hook_id": { + "name": "hook_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "token": { + "name": "token", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "owner_id": { + "name": "owner_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "project_id": { + "name": "project_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "environment": { + "name": "environment", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "metadata": { + "name": "metadata", + "type": "bytea", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_hooks_run_id_index": { + "name": "workflow_hooks_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_hooks_token_index": { + "name": "workflow_hooks_token_index", + "columns": [ + { + "expression": "token", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_runs": { + "name": "workflow_runs", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + 
"output": { + "name": "output", + "type": "bytea", + "primaryKey": false, + "notNull": false + }, + "deployment_id": { + "name": "deployment_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "execution_context": { + "name": "execution_context", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "input": { + "name": "input", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_runs_name_index": { + "name": "workflow_runs_name_index", + "columns": [ + { + "expression": "name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_runs_status_index": { + "name": "workflow_runs_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_steps": { + "name": 
"workflow_steps", + "schema": "workflow", + "columns": { + "run_id": { + "name": "run_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "step_id": { + "name": "step_id", + "type": "varchar", + "primaryKey": true, + "notNull": true + }, + "step_name": { + "name": "step_name", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "step_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "input": { + "name": "input", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "output": { + "name": "output", + "type": "bytea", + "primaryKey": false, + "notNull": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "attempt": { + "name": "attempt", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "retry_after": { + "name": "retry_after", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": { + "workflow_steps_run_id_index": { + "name": "workflow_steps_run_id_index", + "columns": [ + { + "expression": "run_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "workflow_steps_status_index": { + "name": "workflow_steps_status_index", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": 
false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "workflow.workflow_stream_chunks": { + "name": "workflow_stream_chunks", + "schema": "workflow", + "columns": { + "id": { + "name": "id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "stream_id": { + "name": "stream_id", + "type": "varchar", + "primaryKey": false, + "notNull": true + }, + "data": { + "name": "data", + "type": "bytea", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "eof": { + "name": "eof", + "type": "boolean", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "workflow_stream_chunks_stream_id_id_pk": { + "name": "workflow_stream_chunks_stream_id_id_pk", + "columns": ["stream_id", "id"] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.step_status": { + "name": "step_status", + "schema": "public", + "values": ["pending", "running", "completed", "failed", "cancelled"] + }, + "public.status": { + "name": "status", + "schema": "public", + "values": [ + "pending", + "running", + "completed", + "failed", + "paused", + "cancelled" + ] + } + }, + "schemas": { + "workflow": "workflow" + }, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json index 2eebb1f60..2eb16aa7e 100644 --- a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json +++ 
b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json @@ -15,6 +15,13 @@ "when": 1762968119388, "tag": "0001_handy_jocasta", "breakpoints": true + }, + { + "idx": 2, + "version": "7", + "when": 1763679632642, + "tag": "0002_striped_lifeguard", + "breakpoints": true } ] } diff --git a/packages/world-postgres/src/drizzle/schema.ts b/packages/world-postgres/src/drizzle/schema.ts index 11347be28..d43da02cf 100644 --- a/packages/world-postgres/src/drizzle/schema.ts +++ b/packages/world-postgres/src/drizzle/schema.ts @@ -49,18 +49,24 @@ type DrizzlishOfType = { */ export type SerializedContent = any[]; +const bytea = customType<{ data: Buffer; notNull: false; default: false }>({ + dataType() { + return 'bytea'; + }, +}); + export const schema = pgSchema('workflow'); export const runs = schema.table( 'workflow_runs', { runId: varchar('id').primaryKey(), - output: jsonb('output').$type(), + output: bytea('output'), deploymentId: varchar('deployment_id').notNull(), status: workflowRunStatus('status').notNull(), workflowName: varchar('name').notNull(), executionContext: jsonb('execution_context').$type>(), - input: jsonb('input').$type().notNull(), + input: bytea('input').notNull(), error: text('error'), createdAt: timestamp('created_at').defaultNow().notNull(), updatedAt: timestamp('updated_at') @@ -84,7 +90,7 @@ export const events = schema.table( correlationId: varchar('correlation_id'), createdAt: timestamp('created_at').defaultNow().notNull(), runId: varchar('run_id').notNull(), - eventData: jsonb('payload'), + eventData: bytea('payload'), } satisfies DrizzlishOfType, (tb) => ({ runFk: index().on(tb.runId), @@ -99,8 +105,8 @@ export const steps = schema.table( stepId: varchar('step_id').primaryKey(), stepName: varchar('step_name').notNull(), status: stepStatus('status').notNull(), - input: jsonb('input').$type().notNull(), - output: jsonb('output').$type(), + input: bytea('input').notNull(), + output: bytea('output'), error: text('error'), attempt: 
integer('attempt').notNull(), startedAt: timestamp('started_at'), @@ -128,7 +134,7 @@ export const hooks = schema.table( projectId: varchar('project_id').notNull(), environment: varchar('environment').notNull(), createdAt: timestamp('created_at').defaultNow().notNull(), - metadata: jsonb('metadata').$type(), + metadata: bytea('metadata'), } satisfies DrizzlishOfType, (tb) => ({ runFk: index().on(tb.runId), @@ -136,12 +142,6 @@ export const hooks = schema.table( }) ); -const bytea = customType<{ data: Buffer; notNull: false; default: false }>({ - dataType() { - return 'bytea'; - }, -}); - export const streams = schema.table( 'workflow_stream_chunks', { diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 20aaf57a4..7fa438ee6 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -51,10 +51,17 @@ function serializeRunError(data: UpdateWorkflowRunRequest): any { * - If errorStack/errorCode exist (legacy) → combine into StructuredError */ function deserializeRunError(run: any): WorkflowRun { - const { error, errorStack, errorCode, ...rest } = run; + const { error, errorStack, errorCode, input, output, ...rest } = run; + + const deserializedInput = fromBuffer(input); + const deserializedOutput = fromBuffer(output); if (!error && !errorStack && !errorCode) { - return run as WorkflowRun; + return { + ...run, + input: deserializedInput, + output: deserializedOutput, + } as WorkflowRun; } // Try to parse as structured error JSON @@ -64,6 +71,8 @@ function deserializeRunError(run: any): WorkflowRun { if (typeof parsed === 'object' && parsed.message !== undefined) { return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: parsed.message, stack: parsed.stack, @@ -79,6 +88,8 @@ function deserializeRunError(run: any): WorkflowRun { // Backwards compatibility: handle legacy separate fields or plain string error return { ...rest, + input: 
deserializedInput, + output: deserializedOutput, error: { message: error || '', stack: errorStack, @@ -110,10 +121,17 @@ function serializeStepError(data: UpdateStepRequest): any { * Deserialize error JSON string (or legacy flat fields) into a StructuredError object for steps */ function deserializeStepError(step: any): Step { - const { error, ...rest } = step; + const { error, input, output, ...rest } = step; + + const deserializedInput = fromBuffer(input); + const deserializedOutput = fromBuffer(output); if (!error) { - return step as Step; + return { + ...step, + input: deserializedInput, + output: deserializedOutput, + } as Step; } // Try to parse as structured error JSON @@ -123,6 +141,8 @@ function deserializeStepError(step: any): Step { if (typeof parsed === 'object' && parsed.message !== undefined) { return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: parsed.message, stack: parsed.stack, @@ -138,6 +158,8 @@ function deserializeStepError(step: any): Step { // Backwards compatibility: handle legacy separate fields or plain string error return { ...rest, + input: deserializedInput, + output: deserializedOutput, error: { message: error || '', }, @@ -267,11 +289,18 @@ export function createRunsStorage(drizzle: Drizzle): Storage['runs'] { }, async create(data) { const runId = `wrun_${ulid()}`; + const inputBuffer = toBuffer(data.input); + if (!inputBuffer) { + throw new WorkflowAPIError(`Invalid input data`, { + status: 400, + }); + } + const [value] = await drizzle .insert(runs) .values({ runId, - input: data.input, + input: inputBuffer, executionContext: data.executionContext as Record< string, unknown @@ -306,7 +335,7 @@ export function createRunsStorage(drizzle: Drizzle): Storage['runs'] { const updates: Partial = { ...serialized, - output: data.output as SerializedContent, + output: toBuffer(data.output), }; // Only set startedAt the first time transitioning to 'running' @@ -360,7 +389,7 @@ export function 
createEventsStorage(drizzle: Drizzle): Storage['events'] { eventId, correlationId: data.correlationId, eventType: data.eventType, - eventData: 'eventData' in data ? data.eventData : undefined, + eventData: 'eventData' in data ? toBuffer(data.eventData) : undefined, }) .returning({ createdAt: events.createdAt }); if (!value) { @@ -399,7 +428,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = EventSchema.parse(compact(v)); + const deserialized = deserializeEvent(compact(v)); + const parsed = EventSchema.parse(deserialized); return filterEventData(parsed, resolveData); }), cursor: values.at(-1)?.eventId ?? null, @@ -432,7 +462,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = EventSchema.parse(compact(v)); + const deserialized = deserializeEvent(compact(v)); + const parsed = EventSchema.parse(deserialized); return filterEventData(parsed, resolveData); }), cursor: values.at(-1)?.eventId ?? null, @@ -458,7 +489,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { .from(hooks) .where(eq(hooks.hookId, hookId)) .limit(1); - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 
'all'; return filterHookData(parsed, resolveData); }, @@ -469,7 +501,7 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { runId, hookId: data.hookId, token: data.token, - metadata: data.metadata as any, + metadata: toBuffer(data.metadata), ownerId: '', // TODO: get from context projectId: '', // TODO: get from context environment: '', // TODO: get from context @@ -481,7 +513,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 409, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -492,7 +525,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 404, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 'all'; return filterHookData(parsed, resolveData); }, @@ -516,7 +550,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { const resolveData = params?.resolveData ?? 'all'; return { data: values.map((v) => { - const parsed = HookSchema.parse(compact(v)); + const deserialized = deserializeHook(compact(v)); + const parsed = HookSchema.parse(deserialized); return filterHookData(parsed, resolveData); }), cursor: values.at(-1)?.hookId ?? null, @@ -533,7 +568,8 @@ export function createHooksStorage(drizzle: Drizzle): Storage['hooks'] { status: 404, }); } - const parsed = HookSchema.parse(compact(value)); + const deserialized = deserializeHook(compact(value)); + const parsed = HookSchema.parse(deserialized); const resolveData = params?.resolveData ?? 
'all'; return filterHookData(parsed, resolveData); }, @@ -545,13 +581,20 @@ export function createStepsStorage(drizzle: Drizzle): Storage['steps'] { return { async create(runId, data) { + const inputBuffer = toBuffer(data.input); + if (!inputBuffer) { + throw new WorkflowAPIError(`Invalid input data`, { + status: 400, + }); + } + const [value] = await drizzle .insert(steps) .values({ runId, stepId: data.stepId, stepName: data.stepName, - input: data.input as SerializedContent, + input: inputBuffer, status: 'pending', attempt: 1, }) @@ -607,7 +650,7 @@ export function createStepsStorage(drizzle: Drizzle): Storage['steps'] { const updates: Partial = { ...serialized, - output: data.output as SerializedContent, + output: toBuffer(data.output), }; const now = new Date(); // Only set startedAt the first time the step transitions to 'running' @@ -691,6 +734,22 @@ function filterHookData(hook: Hook, resolveData: ResolveData): Hook { return hook; } +function deserializeEvent(event: any): Event { + const { eventData, ...rest } = event; + return { + ...rest, + eventData: eventData ? fromBuffer(eventData) : undefined, + } as Event; +} + +function deserializeHook(hook: any): Hook { + const { metadata, ...rest } = hook; + return { + ...rest, + metadata: metadata ? fromBuffer(metadata) : undefined, + } as Hook; +} + function filterEventData(event: Event, resolveData: ResolveData): Event { if (resolveData === 'none' && 'eventData' in event) { const { eventData: _, ...rest } = event; @@ -699,3 +758,24 @@ function filterEventData(event: Event, resolveData: ResolveData): Event { } return event; } + +/** + * Convert SerializedContent (array) to Buffer for BYTEA storage. + * PostgreSQL BYTEA natively supports null bytes, unlike JSONB. 
+ */ +function toBuffer( + data: SerializedContent | unknown | undefined | null +): Buffer | undefined { + if (data === undefined || data === null) return undefined; + return Buffer.from(JSON.stringify(data), 'utf8'); +} + +/** + * Convert Buffer from BYTEA storage back to SerializedContent (array). + */ +function fromBuffer( + buffer: Buffer | undefined | null +): SerializedContent | undefined { + if (!buffer) return undefined; + return JSON.parse(buffer.toString('utf8')); +} From 29d029d9c9a16a9890be3b77017abebe81c98953 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Fri, 21 Nov 2025 00:35:43 +0100 Subject: [PATCH 14/16] Respect sleep Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/README.md | 2 +- .../src/queue-drivers/pgboss.ts | 32 ++++++++++++++----- packages/world-postgres/src/queue.ts | 1 - 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index 4a7132304..1e06649ed 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -42,7 +42,7 @@ import { createWorld, createPgBossQueue } from "@workflow/world-postgres"; const world = createWorld({ connectionString: "postgres://username:password@localhost:5432/database", securityToken: "your-secret-token-here", - queueFactorya: createPgBossHttpProxyQueue({ + queueFactory: createPgBossHttpProxyQueue({ jobPrefix: "my-app", queueConcurrency: 10, }) diff --git a/packages/world-postgres/src/queue-drivers/pgboss.ts b/packages/world-postgres/src/queue-drivers/pgboss.ts index 3e720aba0..ff7fb4014 100644 --- a/packages/world-postgres/src/queue-drivers/pgboss.ts +++ b/packages/world-postgres/src/queue-drivers/pgboss.ts @@ -55,8 +55,12 @@ export function createPgBossQueue( start: async () => { await ensureStarted(); - const stepWorker = createWorker(proxy.proxyStep); - const workflowWorker = 
createWorker(proxy.proxyWorkflow); + const stepWorker = createWorker(boss, stepQueueName, proxy.proxyStep); + const workflowWorker = createWorker( + boss, + workflowQueueName, + proxy.proxyWorkflow + ); for (let i = 0; i < opts.queueConcurrency; i++) { await boss.work(workflowQueueName, workflowWorker); @@ -66,7 +70,11 @@ export function createPgBossQueue( }; } -function createWorker(proxy: WkfProxy[keyof WkfProxy]) { +function createWorker( + boss: PgBoss, + queueName: string, + proxy: WkfProxy[keyof WkfProxy] +) { return async ([job]: PgBoss.Job[]) => { const message = MessageData.parse(job.data); @@ -75,13 +83,21 @@ function createWorker(proxy: WkfProxy[keyof WkfProxy]) { try { const response = await proxy(message); - // TODO: Properly handle 503 if (response.status === 503) { - const body = (await response.json()) as { - timeoutSeconds?: number; - }; + const body = (await response.json()) as { timeoutSeconds?: number }; + if (body.timeoutSeconds) { - throw new Error(`Retry after ${body.timeoutSeconds}s`); + await boss.send(queueName, job.data, { + startAfter: new Date(Date.now() + body.timeoutSeconds * 1000), + singletonKey: message?.idempotencyKey ?? message.messageId, + retryLimit: 3, + }); + + console.log( + `[${job.id}] requeued: ${message.queueName} for ${body.timeoutSeconds}s` + ); + + return; } } diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 85d38ef1d..55376abe9 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -83,7 +83,6 @@ export function createQueue( messageId: message.messageId, }); - // TODO: Understand what's this? 
let timeoutSeconds: number | null = null; if (typeof result?.timeoutSeconds === 'number') { timeoutSeconds = Math.min( From a27e86fd072fde4db6cda0eb2ef2c9bfc7063e20 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Fri, 21 Nov 2025 01:14:58 +0100 Subject: [PATCH 15/16] Queue with 0 attempts Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/src/queue.ts | 2 +- packages/world-postgres/src/storage.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 55376abe9..1396342af 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -35,7 +35,7 @@ export function createQueue( const payload = { id: queueId, data: body, - attempt: 1, + attempt: 0, messageId, idempotencyKey: opts?.idempotencyKey, queueName: `${prefix}${queueId}`, diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 7fa438ee6..0b89607fd 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -596,7 +596,7 @@ export function createStepsStorage(drizzle: Drizzle): Storage['steps'] { stepName: data.stepName, input: inputBuffer, status: 'pending', - attempt: 1, + attempt: 0, }) .onConflictDoNothing() .returning(); From 376b4bd02f79383381d9557a660f16eeb92d2307 Mon Sep 17 00:00:00 2001 From: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> Date: Fri, 21 Nov 2025 01:19:04 +0100 Subject: [PATCH 16/16] Typo Signed-off-by: paulhenri-l <25308170+paulhenri-l@users.noreply.github.com> --- packages/world-postgres/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/world-postgres/README.md b/packages/world-postgres/README.md index 1e06649ed..42027d2d5 100644 --- a/packages/world-postgres/README.md +++ b/packages/world-postgres/README.md @@ -37,7 +37,7 @@ 
WORKFLOW_POSTGRES_APP_URL="http://localhost:3000" You can also create a PostgreSQL world directly in your code: ```typescript -import { createWorld, createPgBossQueue } from "@workflow/world-postgres"; +import { createWorld, createPgBossHttpProxyQueue } from "@workflow/world-postgres"; const world = createWorld({ connectionString: "postgres://username:password@localhost:5432/database",