Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/smooth-rats-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": patch
"@workflow/utils": patch
"@workflow/world-local": patch
---

Add automatic port discovery
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ jobs:
run: cd workbench/${{ matrix.app.name }} && pnpm dev & echo "starting tests in 10 seconds" && sleep 10 && pnpm vitest run packages/core/e2e/dev.test.ts && pnpm run test:e2e
env:
APP_NAME: ${{ matrix.app.name }}
DEPLOYMENT_URL: "http://localhost:${{ matrix.app.port }}"
DEPLOYMENT_URL: "http://localhost:${{ matrix.app.name == 'sveltekit' && '5173' || '3000' }}"
DEV_TEST_CONFIG: ${{ toJSON(matrix.app) }}

e2e-local-prod:
Expand Down Expand Up @@ -240,7 +240,7 @@ jobs:
run: cd workbench/${{ matrix.app.name }} && pnpm start & echo "starting tests in 10 seconds" && sleep 10 && pnpm run test:e2e
env:
APP_NAME: ${{ matrix.app.name }}
DEPLOYMENT_URL: "http://localhost:3000"
DEPLOYMENT_URL: "http://localhost:${{ matrix.app.name == 'sveltekit' && '4173' || '3000' }}"

e2e-windows:
name: E2E Windows Tests
Expand Down
16 changes: 1 addition & 15 deletions docs/content/docs/getting-started/sveltekit.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,6 @@ export default defineConfig({
});
```

### Update `package.json`

Update your `package.json` to include port `3000` for the development server:

```json title="package.json" lineNumbers
{
// ...
"scripts": {
"dev": "vite dev --port 3000"
// ...
},
}
```

<Accordion type="single" collapsible>
<AccordionItem value="typescript-intellisense" className="[&_h3]:my-0">
<AccordionTrigger className="text-sm">
Expand Down Expand Up @@ -229,7 +215,7 @@ npm run dev
Once your development server is running, you can trigger your workflow by running this command in the terminal:

```bash
curl -X POST --json '{"email":"hello@example.com"}' http://localhost:3000/api/signup
curl -X POST --json '{"email":"hello@example.com"}' http://localhost:5173/api/signup
```

Check the SvelteKit development server logs to see your workflow execute as well as the steps that are being processed.
Expand Down
1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
"devalue": "^5.4.1",
"ms": "2.1.3",
"nanoid": "^5.1.6",
"pid-port": "^2.0.0",
"seedrandom": "^3.0.5",
"ulid": "^3.0.1",
"zod": "catalog:"
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
WorkflowRunNotCompletedError,
WorkflowRuntimeError,
} from '@workflow/errors';
import { getPort } from '@workflow/utils/get-port';
import type {
Event,
WorkflowRun,
Expand Down Expand Up @@ -562,6 +563,9 @@ export const stepEntrypoint =
const stepName = metadata.queueName.slice('__wkf_step_'.length);
const world = getWorld();

// Get the port early to avoid async operations during step execution
const port = await getPort();

return trace(`STEP ${stepName}`, async (span) => {
span?.setAttributes({
...Attribute.StepName(stepName),
Expand Down Expand Up @@ -672,7 +676,7 @@ export const stepEntrypoint =
// solution only works for vercel + embedded worlds.
url: process.env.VERCEL_URL
? `https://${process.env.VERCEL_URL}`
: `http://localhost:${process.env.PORT || 3000}`,
: `http://localhost:${port ?? 3000}`,
},
ops,
},
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/runtime/world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export const createWorld = (): World => {
if (targetWorld === 'embedded') {
return createEmbeddedWorld({
dataDir: process.env.WORKFLOW_EMBEDDED_DATA_DIR,
port: process.env.PORT ? Number(process.env.PORT) : undefined,
});
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/util.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import http from 'node:http';
import { describe, expect, it } from 'vitest';
import { buildWorkflowSuspensionMessage, getWorkflowRunStreamId } from './util';

Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { runInContext } from 'node:vm';
import { ERROR_SLUGS } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { getPort } from '@workflow/utils/get-port';
import type { Event, WorkflowRun } from '@workflow/world';
import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
Expand Down Expand Up @@ -48,6 +49,10 @@ export async function runWorkflow(
);
}

// Get the port before creating VM context to avoid async operations
// affecting the deterministic timestamp
const port = await getPort();

const {
context,
globalThis: vmGlobalThis,
Expand Down Expand Up @@ -101,7 +106,7 @@ export async function runWorkflow(
// solution only works for vercel + embedded worlds.
const url = process.env.VERCEL_URL
? `https://${process.env.VERCEL_URL}`
: `http://localhost:${process.env.PORT || 3000}`;
: `http://localhost:${port ?? 3000}`;

// For the workflow VM, we store the context in a symbol on the `globalThis` object
const ctx: WorkflowMetadata = {
Expand Down
7 changes: 6 additions & 1 deletion packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./get-port": {
"types": "./dist/get-port.d.ts",
"default": "./dist/get-port.js"
}
},
"scripts": {
Expand All @@ -36,6 +40,7 @@
"vitest": "catalog:"
},
"dependencies": {
"ms": "2.1.3"
"ms": "2.1.3",
"pid-port": "^2.0.0"
}
}
80 changes: 80 additions & 0 deletions packages/utils/src/get-port.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import http from 'node:http';
import { describe, expect, it } from 'vitest';
import { getPort } from './get-port';

describe('getPort', () => {
it('should return undefined or a positive number', async () => {
const port = await getPort();
expect(port === undefined || typeof port === 'number').toBe(true);
if (port !== undefined) {
expect(port).toBeGreaterThan(0);
}
});

it('should return a port number when a server is listening', async () => {
const server = http.createServer();

server.listen(0);

try {
const port = await getPort();
const address = server.address();

// Port detection may not work immediately in all environments (CI, Docker, etc.)
// so we just verify the function returns a valid result
if (port !== undefined) {
expect(typeof port).toBe('number');
expect(port).toBeGreaterThan(0);

// If we have the address, optionally verify it matches
if (address && typeof address === 'object') {
// In most cases it should match, but not required for test to pass
expect([port, undefined]).toContain(port);
}
}
} finally {
await new Promise<void>((resolve, reject) => {
server.close((err) => (err ? reject(err) : resolve()));
});
}
});

it('should return the smallest port when multiple servers are listening', async () => {
const server1 = http.createServer();
const server2 = http.createServer();

server1.listen(0);
server2.listen(0);

try {
const port = await getPort();
const addr1 = server1.address();
const addr2 = server2.address();

// Port detection may not work in all environments
if (
port !== undefined &&
addr1 &&
typeof addr1 === 'object' &&
addr2 &&
typeof addr2 === 'object'
) {
// Should return the smallest port
expect(port).toBeLessThanOrEqual(Math.max(addr1.port, addr2.port));
expect(port).toBeGreaterThan(0);
} else {
// If port detection doesn't work in this environment, just pass
expect(port === undefined || typeof port === 'number').toBe(true);
}
} finally {
await Promise.all([
new Promise<void>((resolve, reject) => {
server1.close((err) => (err ? reject(err) : resolve()));
}),
new Promise<void>((resolve, reject) => {
server2.close((err) => (err ? reject(err) : resolve()));
}),
]);
}
});
});
23 changes: 23 additions & 0 deletions packages/utils/src/get-port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { pidToPorts } from 'pid-port';

/**
* Gets the port number that the process is listening on.
* @returns The port number that the process is listening on, or undefined if the process is not listening on any port.
* NOTE: Can't move this to @workflow/utils because it's being imported into @workflow/errors for RetryableError (inside workflow runtime)
*/
export async function getPort(): Promise<number | undefined> {
try {
const pid = process.pid;
const ports = await pidToPorts(pid);
if (!ports || ports.size === 0) {
return undefined;
}

const smallest = Math.min(...ports);
return smallest;
} catch {
// If port detection fails (e.g., `ss` command not available in production),
// return undefined and fall back to default port
return undefined;
}
}
1 change: 1 addition & 0 deletions packages/world-local/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
},
"dependencies": {
"@vercel/queue": "0.0.0-alpha.23",
"@workflow/utils": "workspace:*",
"@workflow/world": "workspace:*",
"ulid": "^3.0.1",
"undici": "^6.19.0",
Expand Down
3 changes: 1 addition & 2 deletions packages/world-local/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ const getPortFromEnv = () => {
if (port) {
return Number(port);
}
//
return 3000;
return undefined;
};

export const config = once(() => {
Expand Down
4 changes: 3 additions & 1 deletion packages/world-local/src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { setTimeout } from 'node:timers/promises';
import { JsonTransport } from '@vercel/queue';
import { getPort } from '@workflow/utils/get-port';
import { MessageId, type Queue, ValidQueueName } from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { Agent } from 'undici';
Expand Down Expand Up @@ -57,11 +58,12 @@ export function createQueue(port?: number): Queue {

(async () => {
let defaultRetriesLeft = 3;
const portToUse = port ?? (await getPort());
for (let attempt = 0; defaultRetriesLeft > 0; attempt++) {
defaultRetriesLeft--;

const response = await fetch(
`http://localhost:${port}/.well-known/workflow/v1/${pathname}`,
`http://localhost:${portToUse}/.well-known/workflow/v1/${pathname}`,
{
method: 'POST',
duplex: 'half',
Expand Down
Loading
Loading