Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/runtime/world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const createWorld = (): World => {
return createEmbeddedWorld({
dataDir: process.env.WORKFLOW_EMBEDDED_DATA_DIR,
port: process.env.PORT ? Number(process.env.PORT) : undefined,
baseUrl: process.env.WORKFLOW_BASE_URL,
});
}

Expand Down
30 changes: 29 additions & 1 deletion packages/world-local/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@ const getDataDirFromEnv = () => {

export const DEFAULT_RESOLVE_DATA_OPTION = 'all';

export const normalizeBaseUrl = (
value: string,
sourceLabel = 'baseUrl' // NOTE: Should I keep this? This helps us throw errors like Invalid baseUrl: … or WORKFLOW_BASE_URL cannot be empty
) => {
const candidate = value.trim();
if (candidate.length === 0) {
throw new Error(`${sourceLabel} cannot be empty`);
}

try {
const url = new URL(candidate);
// Preserve any path segments (for reverse proxies) but trim trailing slashes to avoid duplicates.
return url.href.replace(/\/+$/, '');
} catch {
throw new Error(`Invalid ${sourceLabel}: ${value}`);
}
};

const getBaseUrlFromEnv = () => {
const baseUrl = process.env.WORKFLOW_BASE_URL;
if (!baseUrl) {
return null;
}

return normalizeBaseUrl(baseUrl, 'WORKFLOW_BASE_URL');
};

const getPortFromEnv = () => {
const port = process.env.PORT;
if (port) {
Expand All @@ -18,6 +45,7 @@ const getPortFromEnv = () => {
export const config = once(() => {
const dataDir = getDataDirFromEnv();
const port = getPortFromEnv();
const baseUrl = getBaseUrlFromEnv();

return { dataDir, port };
return { dataDir, port, baseUrl };
});
21 changes: 18 additions & 3 deletions packages/world-local/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { World } from '@workflow/world';
import { config } from './config.js';
import { config, normalizeBaseUrl } from './config.js';
import { createQueue } from './queue.js';
import { createStorage } from './storage.js';
import { createStreamer } from './streamer.js';
Expand All @@ -9,18 +9,33 @@ import { createStreamer } from './streamer.js';
*
* @param dataDir - The directory to use for storage. If not provided, the default data dir will be used.
* @param port - The port to use for the queue. If not provided, the default port will be used.
* @param baseUrl - The base URL to use for the queue. Takes priority over port if provided.
*/
export function createEmbeddedWorld({
dataDir,
port,
baseUrl,
Comment thread
vercel[bot] marked this conversation as resolved.
}: {
dataDir?: string;
port?: number;
baseUrl?: string;
}): World {
const dir = dataDir ?? config.value.dataDir;
const queuePort = port ?? config.value.port;

const overrideBaseUrl =
typeof baseUrl === 'string' ? normalizeBaseUrl(baseUrl, 'baseUrl') : null;

// TODO: confirm with Gal on priority. Current Priority - baseUrl parameter > config baseUrl > port parameter > config port > default 3000
const queueBaseUrl =
overrideBaseUrl ?? config.value.baseUrl ?? port ?? config.value.port;

const finalUrl =
typeof queueBaseUrl === 'string'
? queueBaseUrl
: `http://localhost:${queueBaseUrl}`;

return {
...createQueue(queuePort),
...createQueue(finalUrl),
...createStorage(dir),
...createStreamer(dir),
};
Expand Down
9 changes: 7 additions & 2 deletions packages/world-local/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ const httpAgent = new Agent({
headersTimeout: 0,
});

export function createQueue(port?: number): Queue {
export function createQueue(portOrBaseUrl?: number | string): Queue {
const transport = new JsonTransport();
const generateId = monotonicFactory();

const baseUrl =
typeof portOrBaseUrl === 'string'
? portOrBaseUrl
: `http://localhost:${portOrBaseUrl ?? 3000}`; // NOTE: nullish coalescing keeps port 0 working for ephemeral bindings.

/**
* holds inflight messages by idempotency key to ensure
* that we don't queue the same message multiple times
Expand Down Expand Up @@ -61,7 +66,7 @@ export function createQueue(port?: number): Queue {
defaultRetriesLeft--;

const response = await fetch(
`http://localhost:${port}/.well-known/workflow/v1/${pathname}`,
`${baseUrl}/.well-known/workflow/v1/${pathname}`,
{
method: 'POST',
duplex: 'half',
Expand Down
Loading