Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fuzzy-boxes-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/utils": patch
---

fix(utils): detect linux ports via /proc
10 changes: 10 additions & 0 deletions .changeset/kind-suns-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@workflow/world-postgres": patch
"@workflow/sveltekit": patch
"@workflow/builders": patch
---

- export stepEntrypoint and workflowEntrypoint from build
- add abstract queue driver to world postgres
- add execution strategy to world postgres
- add Graphile Worker as alternative queue driver (set `WORKFLOW_QUEUE_DRIVER=graphile` to enable)
58 changes: 49 additions & 9 deletions .github/scripts/aggregate-benchmarks.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const [, , resultsDir = '.'] = process.argv;
const backendConfig = {
local: { emoji: '💻', label: 'Local' },
postgres: { emoji: '🐘', label: 'Postgres' },
'postgres-pgboss': { emoji: '🐘', label: 'Postgres (pg-boss)' },
'postgres-graphile': { emoji: '🐘', label: 'Postgres (graphile)' },
vercel: { emoji: '▲', label: 'Vercel' },
};

Expand Down Expand Up @@ -49,9 +51,35 @@ function findBenchmarkFiles(dir) {
// Parse filename to extract app and backend
function parseFilename(filename) {
// Format: bench-results-{app}-{backend}.json
const match = filename.match(/bench-results-(.+)-(\w+)\.json$/);
if (!match) return null;
return { app: match[1], backend: match[2] };
// Backend can be: local, postgres, postgres-pgboss, postgres-graphile, vercel
const knownBackends = [
Comment on lines +54 to +55
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actively making improvements to the benchmarking stuff so expect some merge conflicts on adding your own stuff to benchmark

'postgres-pgboss',
'postgres-graphile',
'postgres',
'local',
'vercel',
];

const baseName = filename
.replace(/\.json$/, '')
.replace(/^bench-results-/, '');
if (!baseName) return null;

// Try each known backend (longest first to match postgres-pgboss before postgres)
for (const backend of knownBackends) {
if (baseName.endsWith(`-${backend}`)) {
const app = baseName.slice(0, -(backend.length + 1));
return { app, backend };
}
}

// Fallback: last segment after hyphen
const lastHyphen = baseName.lastIndexOf('-');
if (lastHyphen === -1) return null;
return {
app: baseName.slice(0, lastHyphen),
backend: baseName.slice(lastHyphen + 1),
};
}

// Load timing data for a benchmark file
Expand Down Expand Up @@ -147,11 +175,20 @@ function getAppsAndBackends(data) {
}
}

// Sort: local, postgres, vercel for backends
const backendOrder = ['local', 'postgres', 'vercel'];
const sortedBackends = [...backends].sort(
(a, b) => backendOrder.indexOf(a) - backendOrder.indexOf(b)
);
// Sort: local, postgres variants, vercel for backends
const backendOrder = [
'local',
'postgres',
'postgres-pgboss',
'postgres-graphile',
'vercel',
];
const sortedBackends = [...backends].sort((a, b) => {
const aIdx = backendOrder.indexOf(a);
const bIdx = backendOrder.indexOf(b);
// Unknown backends go to the end
return (aIdx === -1 ? 999 : aIdx) - (bIdx === -1 ? 999 : bIdx);
});

// Sort apps alphabetically
const sortedApps = [...apps].sort();
Expand Down Expand Up @@ -348,7 +385,10 @@ function renderComparison(data) {
console.log('');
console.log('**Backends:**');
console.log('- 💻 Local: In-memory filesystem backend');
console.log('- 🐘 Postgres: PostgreSQL database backend');
console.log('- 🐘 Postgres (pg-boss): PostgreSQL with pg-boss queue driver');
console.log(
'- 🐘 Postgres (graphile): PostgreSQL with Graphile Worker queue driver'
);
console.log('- ▲ Vercel: Vercel production backend');
console.log('</details>');
}
Expand Down
16 changes: 9 additions & 7 deletions .github/workflows/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jobs:

# Phase 2b: Postgres benchmarks (with postgres service)
benchmark-postgres:
name: Benchmark Postgres (${{ matrix.app }})
name: Benchmark Postgres/${{ matrix.queue-driver }} (${{ matrix.app }})
runs-on: ubuntu-latest
needs: build
timeout-minutes: 30
Expand All @@ -148,6 +148,7 @@ jobs:
matrix:
# Note: Use actual directory names, not symlinks (nitro -> nitro-v3)
app: [nextjs-turbopack, nitro-v3, express]
queue-driver: [pgboss, graphile]

services:
postgres:
Expand All @@ -169,6 +170,7 @@ jobs:
TURBO_TEAM: ${{ vars.TURBO_TEAM }}
WORKFLOW_TARGET_WORLD: "@workflow/world-postgres"
WORKFLOW_POSTGRES_URL: "postgres://world:world@localhost:5432/world"
WORKFLOW_QUEUE_DRIVER: ${{ matrix.queue-driver }}

steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -207,22 +209,22 @@ jobs:
echo "Waiting for server to start..."
sleep 15
cd ../..
pnpm vitest bench packages/core/e2e/bench.bench.ts --run --outputJson=bench-results-${{ matrix.app }}-postgres.json
pnpm vitest bench packages/core/e2e/bench.bench.ts --run --outputJson=bench-results-${{ matrix.app }}-postgres-${{ matrix.queue-driver }}.json

- name: Render benchmark results
uses: ./.github/actions/render-benchmarks
with:
benchmark-file: bench-results-${{ matrix.app }}-postgres.json
benchmark-file: bench-results-${{ matrix.app }}-postgres-${{ matrix.queue-driver }}.json
app-name: ${{ matrix.app }}
backend: postgres
backend: postgres-${{ matrix.queue-driver }}

- name: Upload benchmark results
uses: actions/upload-artifact@v4
with:
name: bench-results-${{ matrix.app }}-postgres
name: bench-results-${{ matrix.app }}-postgres-${{ matrix.queue-driver }}
path: |
bench-results-${{ matrix.app }}-postgres.json
bench-timings-${{ matrix.app }}-postgres.json
bench-results-${{ matrix.app }}-postgres-${{ matrix.queue-driver }}.json
bench-timings-${{ matrix.app }}-postgres-${{ matrix.queue-driver }}.json

# Phase 2c: Vercel benchmarks (needs build artifacts for packages)
benchmark-vercel:
Expand Down
8 changes: 5 additions & 3 deletions packages/builders/src/base-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,13 @@ export abstract class BaseBuilder {

const entryContent = `
// Built in steps
import { stepEntrypoint } from 'workflow/runtime';
import '${builtInSteps}';
// User steps
${imports}
// API entrypoint
export { stepEntrypoint as POST } from 'workflow/runtime';`;
export const __wkf_entrypoint = stepEntrypoint;
export const POST = stepEntrypoint;`;

// Bundle with esbuild and our custom SWC plugin
const esbuildCtx = await esbuild.context({
Expand Down Expand Up @@ -549,8 +551,8 @@ export abstract class BaseBuilder {
import { workflowEntrypoint } from 'workflow/runtime';

const workflowCode = \`${workflowBundleCode.replace(/[\\`$]/g, '\\$&')}\`;

export const POST = workflowEntrypoint(workflowCode);`;
export const __wkf_entrypoint = workflowEntrypoint(workflowCode);
export const POST = __wkf_entrypoint`;

// we skip the final bundling step for Next.js so it can bundle itself
if (!bundleFinalOutput) {
Expand Down
6 changes: 3 additions & 3 deletions packages/sveltekit/src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class SvelteKitBuilder extends BaseBuilder {

// Replace the default export with SvelteKit-compatible handler
stepsRouteContent = stepsRouteContent.replace(
/export\s*\{\s*stepEntrypoint\s+as\s+POST\s*\}\s*;?$/m,
/export\s*const\s*POST\s*=\s*stepEntrypoint\s*;$/m,
`${SVELTEKIT_REQUEST_CONVERTER}
export const POST = async ({request}) => {
const normalRequest = await convertSvelteKitRequest(request);
Expand Down Expand Up @@ -133,11 +133,11 @@ export const POST = async ({request}) => {

// Replace the default export with SvelteKit-compatible handler
workflowsRouteContent = workflowsRouteContent.replace(
/export const POST = workflowEntrypoint\(workflowCode\);?$/m,
/export\s*const\s*POST\s*=\s*__wkf_entrypoint\s*;?$/m,
`${SVELTEKIT_REQUEST_CONVERTER}
export const POST = async ({request}) => {
const normalRequest = await convertSvelteKitRequest(request);
return workflowEntrypoint(workflowCode)(normalRequest);
return __wkf_entrypoint(normalRequest);
}`
);
await writeFile(workflowsRouteFile, workflowsRouteContent);
Expand Down
126 changes: 122 additions & 4 deletions packages/utils/src/get-port.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,118 @@
import { readdir, readFile, readlink } from 'node:fs/promises';
import { execa } from 'execa';

/**
* Parses a port string and returns it if valid (0-65535), otherwise undefined.
*/
function parsePort(value: string, radix = 10): number | undefined {
const port = parseInt(value, radix);
if (!Number.isNaN(port) && port >= 0 && port <= 65535) {
return port;
}
return undefined;
}

/**
* Gets listening ports for the current process on Linux by reading /proc filesystem.
* This approach requires no external commands and works on all Linux systems.
*/
async function getLinuxPort(pid: number): Promise<number | undefined> {
const listenState = '0A'; // TCP LISTEN state in /proc/net/tcp
const tcpFiles = ['/proc/net/tcp', '/proc/net/tcp6'] as const;

// Step 1: Get socket inodes from /proc/<pid>/fd/ in order
// We preserve order to maintain deterministic behavior (return first port)
// Use both array (for order) and Set (for O(1) lookup)
const socketInodes: string[] = [];
const socketInodesSet = new Set<string>();
const fdPath = `/proc/${pid}/fd`;

try {
const fds = await readdir(fdPath);
// Sort FDs numerically to ensure deterministic order (FDs are always numeric strings)
const sortedFds = fds.sort((a, b) => {
const numA = Number.parseInt(a, 10);
const numB = Number.parseInt(b, 10);
return numA - numB;
});

const results = await Promise.allSettled(
sortedFds.map(async (fd) => {
const link = await readlink(`${fdPath}/${fd}`);
// Socket links look like: socket:[12345]
const match = link.match(/^socket:\[(\d+)\]$/);
return match?.[1] ?? null;
})
);

for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
socketInodes.push(result.value);
socketInodesSet.add(result.value);
}
}
} catch {
// Process might not exist or no permission
return undefined;
}

if (socketInodes.length === 0) {
return undefined;
}

// Step 2: Read /proc/net/tcp and /proc/net/tcp6 to find listening sockets
// Format: sl local_address rem_address st ... inode
// local_address is hex IP:port, st=0A means LISTEN
// We iterate through socket inodes in order to maintain deterministic behavior
for (const tcpFile of tcpFiles) {
try {
const content = await readFile(tcpFile, 'utf8');
const lines = content.split('\n').slice(1); // Skip header

// Build a map of inode -> port for quick lookup
const inodeToPort = new Map<string, number>();
for (const line of lines) {
if (!line.trim()) continue; // Skip empty lines

const parts = line.trim().split(/\s+/);
if (parts.length < 10) continue;

const localAddr = parts[1]; // e.g., "00000000:0BB8" (0.0.0.0:3000)
const state = parts[3]; // "0A" = LISTEN
const inode = parts[9];

if (!localAddr || state !== listenState || !inode) continue;
if (!socketInodesSet.has(inode)) continue;

// Extract port from hex format (e.g., "0BB8" -> 3000)
const colonIndex = localAddr.indexOf(':');
if (colonIndex === -1) continue;

const portHex = localAddr.slice(colonIndex + 1);
if (!portHex) continue;

const port = parsePort(portHex, 16);
if (port !== undefined) {
inodeToPort.set(inode, port);
}
}

// Return the first port matching our socket inodes in order
for (const inode of socketInodes) {
const port = inodeToPort.get(inode);
if (port !== undefined) {
return port;
}
}
} catch {
// File might not exist (e.g., no IPv6 support) - continue to next file
continue;
}
}

return undefined;
}

/**
* Gets the port number that the process is listening on.
* @returns The port number that the process is listening on, or undefined if the process is not listening on any port.
Expand All @@ -11,7 +124,10 @@ export async function getPort(): Promise<number | undefined> {

try {
switch (platform) {
case 'linux':
case 'linux': {
port = await getLinuxPort(pid);
break;
}
case 'darwin': {
const lsofResult = await execa('lsof', [
'-a',
Expand All @@ -28,7 +144,7 @@ export async function getPort(): Promise<number | undefined> {
input: lsofResult.stdout,
}
);
port = parseInt(awkResult.stdout.trim(), 10);
port = parsePort(awkResult.stdout.trim());
break;
}

Expand All @@ -50,8 +166,10 @@ export async function getPort(): Promise<number | undefined> {
.trim()
.match(/^\s*TCP\s+(?:\[[\da-f:]+\]|[\d.]+):(\d+)\s+/i);
if (match) {
port = parseInt(match[1], 10);
break;
port = parsePort(match[1]);
if (port !== undefined) {
break;
}
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions packages/world-postgres/DEV_NOTES.md

This file was deleted.

Loading
Loading