Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
9031c02
Core: require specifying runId when writing to stream
VaguelySerious Nov 5, 2025
69ae717
Changeset
VaguelySerious Nov 5, 2025
f9be1b6
Fix streamer URL
VaguelySerious Nov 5, 2025
69ba8e6
Add "use client" directive on top-level exports (node16 nextjs support)
VaguelySerious Nov 5, 2025
1399d4b
Revert "Add "use client" directive on top-level exports (node16 nextj…
VaguelySerious Nov 5, 2025
e4b21ff
Add "use client" directive on top-level exports (node16 nextjs support)
VaguelySerious Nov 5, 2025
d6e92a2
Merge branch 'peter/compat-nextjs-prev' into peter/stream-run-id
VaguelySerious Nov 5, 2025
091a24b
Fix: Stream operations during workflow argument serialization use a p…
vercel[bot] Nov 5, 2025
8f8ea13
Fix via Vade
vercel[bot] Nov 5, 2025
035876e
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 6, 2025
684cb73
runId only required for writing to streams
VaguelySerious Nov 6, 2025
7e6d081
Fixes
VaguelySerious Nov 6, 2025
fb783b9
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 6, 2025
7d07afc
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 7, 2025
a67edb5
Fix unit tests
VaguelySerious Nov 7, 2025
7935f4e
Fix unit tests
VaguelySerious Nov 7, 2025
09aeb0b
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 7, 2025
1a38950
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 7, 2025
c59cca3
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 7, 2025
205c22f
Might as well try
VaguelySerious Nov 7, 2025
2d4dbdc
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 8, 2025
6708473
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 10, 2025
f6ca5ae
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 10, 2025
0fe0303
Forward compat
VaguelySerious Nov 10, 2025
6edcd56
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 11, 2025
fd4449c
test
VaguelySerious Nov 11, 2025
c58e431
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 11, 2025
de32083
Revert "test"
VaguelySerious Nov 11, 2025
58e4247
Fix
VaguelySerious Nov 11, 2025
79ea6d5
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 12, 2025
5b83acc
Fix test
VaguelySerious Nov 12, 2025
598ee12
Revert "Fix"
VaguelySerious Nov 12, 2025
bcaf3ba
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 14, 2025
e3adee9
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 14, 2025
cd4faeb
Fix start serialization
VaguelySerious Nov 14, 2025
a498eae
Lazy resolve runId instead
VaguelySerious Nov 15, 2025
96a048f
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 15, 2025
ed18118
Fix build
VaguelySerious Nov 15, 2025
43d7e7b
Fix build
VaguelySerious Nov 15, 2025
4b43d39
Fix e2e test
VaguelySerious Nov 15, 2025
d05c589
Fix
VaguelySerious Nov 15, 2025
2684525
Merge branch 'main' into peter/stream-run-id
VaguelySerious Nov 16, 2025
844d4d9
Isolate changes
VaguelySerious Nov 16, 2025
b1ce7f5
Fix closeStream
VaguelySerious Nov 16, 2025
9a89a0f
Undo promise naming
VaguelySerious Nov 16, 2025
57d10e4
Add test for promise in local-world
VaguelySerious Nov 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/purple-pianos-stare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/web-shared": patch
"@workflow/world": patch
"@workflow/core": patch
---

Require specifying runId when writing to stream
14 changes: 13 additions & 1 deletion packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { withResolvers } from '@workflow/utils';
import { assert, describe, expect, test } from 'vitest';
import { dehydrateWorkflowArguments } from '../src/serialization';
import { cliInspectJson, isLocalDeployment } from './utils';
Expand All @@ -21,18 +22,29 @@

url.searchParams.set('workflowFile', workflowFile);
url.searchParams.set('workflowFn', workflowFn);

const ops: Promise<void>[] = [];
const { promise: runIdPromise, resolve: resolveRunId } =
withResolvers<string>();
const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise);

const res = await fetch(url, {
method: 'POST',
body: JSON.stringify(dehydrateWorkflowArguments(args, [], globalThis)),
body: JSON.stringify(dehydratedArgs),
});
if (!res.ok) {
throw new Error(

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > sleepingWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=sleepingWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=sleepingWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:312:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > webhookWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=webhookWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=webhookWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:212:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > hookWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=hookWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=hookWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:149:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > readableStreamWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=readableStreamWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=readableStreamWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:132:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseAnyWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseAnyWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseAnyWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:126:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseRaceWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseRaceWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseRaceWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:120:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > promiseAllWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseAllWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=promiseAllWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:114:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F98_duplicate_case.ts&workflowFn=addTenWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F98_duplicate_case.ts&workflowFn=addTenWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:92:17

Check failure on line 36 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

Error: Failed to trigger workflow: http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=addTenWorkflow 500: { "error": true, "url": "http://localhost:3000/api/trigger?workflowFile=workflows%2F99_e2e.ts&workflowFn=addTenWorkflow", "status": 500, "message": "Entrypoint is not defined", "stack": [ "Entrypoint is not defined", "at #generateError (/home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:658:11)", "at /home/runner/work/workflow/workflow/node_modules/.pnpm/nitro-nightly@3.0.1-20251031-202656-883af4f9_@netlify+blobs@9.1.2_@vercel+functions@3.1_a6d9319db1166ce0e8174b6f5f9420d7/node_modules/nitro-nightly/dist/_dev.mjs:516:43)" ] } ❯ triggerWorkflow packages/core/e2e/e2e.test.ts:36:11 ❯ packages/core/e2e/e2e.test.ts:92:17
`Failed to trigger workflow: ${res.url} ${
res.status
}: ${await res.text()}`
);
}
const run = await res.json();
resolveRunId(run.runId);

// Resolve and wait for any stream operations
await Promise.all(ops);

return run;
}

Expand Down Expand Up @@ -79,7 +91,7 @@
])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => {
const run = await triggerWorkflow(workflow, [123]);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe(133);

Check failure on line 94 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (hono - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

AssertionError: expected { error: true, …(4) } to be 133 // Object.is equality - Expected: 133 + Received: { "error": true, "message": "fetch failed", "stack": [ "fetch failed", "at node:internal/deps/undici/undici:14900:13", ], "status": 500, "url": "http://localhost:3000/api/trigger?runId=wrun_01KA78VH95NZ4R0ZHFFXSYDYPA", } ❯ packages/core/e2e/e2e.test.ts:94:25

const { json } = await cliInspectJson(`runs ${run.runId} --withData`);
expect(json).toMatchObject({
Expand Down Expand Up @@ -291,7 +303,7 @@
method: 'POST',
body: JSON.stringify({}),
});
expect(res.status).toBe(404);

Check failure on line 306 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nitro - stable)

packages/core/e2e/e2e.test.ts > e2e > webhook route with invalid token

AssertionError: expected 500 to be 404 // Object.is equality - Expected + Received - 404 + 500 ❯ packages/core/e2e/e2e.test.ts:306:24
const body = await res.text();
expect(body).toBe('');
});
Expand Down
36 changes: 29 additions & 7 deletions packages/core/src/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,21 @@ const streamPrintRevivers: Record<string, (value: any) => any> = {
};

const hydrateStepIO = <
T extends { stepId?: string; input?: any; output?: any },
T extends { stepId?: string; input?: any; output?: any; runId?: string },
>(
step: T
): T => {
return {
...step,
input:
step.input && Array.isArray(step.input) && step.input.length
? hydrateStepArguments(step.input, [], globalThis, streamPrintRevivers)
? hydrateStepArguments(
step.input,
[],
step.runId as string,
globalThis,
streamPrintRevivers
)
: step.input,
output: step.output
? hydrateStepReturnValue(step.output, globalThis, streamPrintRevivers)
Expand All @@ -79,14 +85,17 @@ const hydrateWorkflowIO = <
? hydrateWorkflowReturnValue(
workflow.output,
[],
workflow.runId as string,
globalThis,
streamPrintRevivers
)
: workflow.output,
};
};

const hydrateEventData = <T extends { eventId?: string; eventData?: any }>(
const hydrateEventData = <
T extends { eventId?: string; eventData?: any; runId?: string },
>(
event: T
): T => {
if (!event.eventData) {
Expand All @@ -99,7 +108,13 @@ const hydrateEventData = <T extends { eventId?: string; eventData?: any }>(
eventData: Object.fromEntries(
Object.entries(event.eventData).map(([key, value]) => [
key,
hydrateStepArguments(value as any, [], globalThis),
hydrateStepArguments(
value as any,
[],
event.runId as string,
globalThis,
streamPrintRevivers
),
])
),
};
Expand All @@ -110,9 +125,16 @@ const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
): T => {
return {
...hook,
metadata: hook.metadata
? hydrateStepArguments(hook.metadata, [], globalThis)
: hook.metadata,
metadata:
hook.metadata && 'runId' in hook
? hydrateStepArguments(
hook.metadata,
[],
hook.runId as string,
globalThis,
streamPrintRevivers
)
: hook.metadata,
};
};

Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export class Run<TResult> {
): ReadableStream<R> {
const { ops = [], global = globalThis, startIndex, namespace } = options;
const name = getWorkflowRunStreamId(this.runId, namespace);
return getExternalRevivers(global, ops).ReadableStream({
return getExternalRevivers(global, ops, this.runId).ReadableStream({
name,
startIndex,
}) as ReadableStream<R>;
Expand All @@ -197,7 +197,7 @@ export class Run<TResult> {
const run = await this.world.runs.get(this.runId);

if (run.status === 'completed') {
return hydrateWorkflowReturnValue(run.output, [], globalThis);
return hydrateWorkflowReturnValue(run.output, [], this.runId);
}

if (run.status === 'cancelled') {
Expand Down Expand Up @@ -671,7 +671,7 @@ export const stepEntrypoint =
}
// Hydrate the step input arguments
const ops: Promise<void>[] = [];
const args = hydrateStepArguments(step.input, ops);
const args = hydrateStepArguments(step.input, ops, workflowRunId);

span?.setAttributes({
...Attribute.StepArgumentsCount(args.length),
Expand All @@ -698,7 +698,7 @@ export const stepEntrypoint =
() => stepFn(...args)
);

result = dehydrateStepReturnValue(result, ops);
result = dehydrateStepReturnValue(result, ops, workflowRunId);

waitUntil(Promise.all(ops));

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export async function getHookByToken(token: string): Promise<Hook> {
const world = getWorld();
const hook = await world.hooks.getByToken(token);
if (typeof hook.metadata !== 'undefined') {
hook.metadata = hydrateStepArguments(hook.metadata as any, [], globalThis);
hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId);
}
return hook;
}
Expand Down Expand Up @@ -79,7 +79,7 @@ export async function resumeHook<T = any>(
const dehydratedPayload = dehydrateStepReturnValue(
payload,
ops,
globalThis
hook.runId
);
waitUntil(Promise.all(ops));

Expand Down
13 changes: 12 additions & 1 deletion packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { waitUntil } from '@vercel/functions';
import { WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { Run } from '../runtime.js';
import type { Serializable, WorkflowInvokePayload } from '../schemas.js';
import { dehydrateWorkflowArguments } from '../serialization.js';
Expand Down Expand Up @@ -87,7 +88,14 @@ export async function start<TArgs extends unknown[], TResult>(
const world = getWorld();
const deploymentId = opts.deploymentId ?? (await world.getDeploymentId());
const ops: Promise<void>[] = [];
const workflowArguments = dehydrateWorkflowArguments(args, ops);
const { promise: runIdPromise, resolve: resolveRunId } =
withResolvers<string>();

const workflowArguments = dehydrateWorkflowArguments(
args,
ops,
runIdPromise
);
// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();

Expand All @@ -97,6 +105,9 @@ export async function start<TArgs extends unknown[], TResult>(
input: workflowArguments,
executionContext: { traceCarrier },
});

resolveRunId(runResponse.runId);

waitUntil(Promise.all(ops));

span?.setAttributes({
Expand Down
42 changes: 22 additions & 20 deletions packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
import { STREAM_NAME_SYMBOL } from './symbols.js';
import { createContext } from './vm/index.js';

const mockRunId = 'wrun_mockidnumber0001';

describe('getStreamType', () => {
it('should return `undefined` for a regular stream', () => {
const stream = new ReadableStream();
Expand All @@ -41,7 +43,7 @@ describe('workflow arguments', () => {

it('should work with Date', () => {
const date = new Date('2025-07-17T04:30:34.824Z');
const serialized = dehydrateWorkflowArguments(date, []);
const serialized = dehydrateWorkflowArguments(date, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -61,7 +63,7 @@ describe('workflow arguments', () => {

it('should work with invalid Date', () => {
const date = new Date('asdf');
const serialized = dehydrateWorkflowArguments(date, []);
const serialized = dehydrateWorkflowArguments(date, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -81,7 +83,7 @@ describe('workflow arguments', () => {

it('should work with BigInt', () => {
const bigInt = BigInt('9007199254740992');
const serialized = dehydrateWorkflowArguments(bigInt, []);
const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -99,7 +101,7 @@ describe('workflow arguments', () => {

it('should work with BigInt negative', () => {
const bigInt = BigInt('-12345678901234567890');
const serialized = dehydrateWorkflowArguments(bigInt, []);
const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -120,7 +122,7 @@ describe('workflow arguments', () => {
[2, 'foo'],
[6, 'bar'],
]);
const serialized = dehydrateWorkflowArguments(map, []);
const serialized = dehydrateWorkflowArguments(map, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand Down Expand Up @@ -154,7 +156,7 @@ describe('workflow arguments', () => {

it('should work with Set', () => {
const set = new Set([1, '2', true]);
const serialized = dehydrateWorkflowArguments(set, []);
const serialized = dehydrateWorkflowArguments(set, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -180,7 +182,7 @@ describe('workflow arguments', () => {

it('should work with WritableStream', () => {
const stream = new WritableStream();
const serialized = dehydrateWorkflowArguments(stream, []);
const serialized = dehydrateWorkflowArguments(stream, [], mockRunId);
const uuid = serialized[2];
expect(serialized).toMatchInlineSnapshot(`
[
Expand All @@ -205,7 +207,7 @@ describe('workflow arguments', () => {

it('should work with ReadableStream', () => {
const stream = new ReadableStream();
const serialized = dehydrateWorkflowArguments(stream, []);
const serialized = dehydrateWorkflowArguments(stream, [], mockRunId);
const uuid = serialized[2];
expect(serialized).toMatchInlineSnapshot(`
[
Expand Down Expand Up @@ -233,7 +235,7 @@ describe('workflow arguments', () => {
headers.set('foo', 'bar');
headers.append('set-cookie', 'a');
headers.append('set-cookie', 'b');
const serialized = dehydrateWorkflowArguments(headers, []);
const serialized = dehydrateWorkflowArguments(headers, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand Down Expand Up @@ -281,7 +283,7 @@ describe('workflow arguments', () => {
['set-cookie', 'b'],
]),
});
const serialized = dehydrateWorkflowArguments(response, []);
const serialized = dehydrateWorkflowArguments(response, [], mockRunId);
const bodyUuid = serialized[serialized.length - 3];
expect(serialized).toMatchInlineSnapshot(`
[
Expand Down Expand Up @@ -372,7 +374,7 @@ describe('workflow arguments', () => {
it('should work with URLSearchParams', () => {
const params = new URLSearchParams('a=1&b=2&a=3');

const serialized = dehydrateWorkflowArguments(params, []);
const serialized = dehydrateWorkflowArguments(params, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -399,7 +401,7 @@ describe('workflow arguments', () => {
it('should work with empty URLSearchParams', () => {
const params = new URLSearchParams();

const serialized = dehydrateWorkflowArguments(params, []);
const serialized = dehydrateWorkflowArguments(params, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -420,7 +422,7 @@ describe('workflow arguments', () => {
it('should work with empty ArrayBuffer', () => {
const buffer = new ArrayBuffer(0);

const serialized = dehydrateWorkflowArguments(buffer, []);
const serialized = dehydrateWorkflowArguments(buffer, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -440,7 +442,7 @@ describe('workflow arguments', () => {
it('should work with empty Uint8Array', () => {
const array = new Uint8Array(0);

const serialized = dehydrateWorkflowArguments(array, []);
const serialized = dehydrateWorkflowArguments(array, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -461,7 +463,7 @@ describe('workflow arguments', () => {
it('should work with empty Int32Array', () => {
const array = new Int32Array(0);

const serialized = dehydrateWorkflowArguments(array, []);
const serialized = dehydrateWorkflowArguments(array, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand All @@ -482,7 +484,7 @@ describe('workflow arguments', () => {
it('should work with empty Float64Array', () => {
const array = new Float64Array(0);

const serialized = dehydrateWorkflowArguments(array, []);
const serialized = dehydrateWorkflowArguments(array, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand Down Expand Up @@ -517,7 +519,7 @@ describe('workflow arguments', () => {
duplex: 'half',
} as RequestInit);

const serialized = dehydrateWorkflowArguments(request, []);
const serialized = dehydrateWorkflowArguments(request, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand Down Expand Up @@ -625,7 +627,7 @@ describe('workflow arguments', () => {
const responseWritable = new WritableStream();
request[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')] = responseWritable;

const serialized = dehydrateWorkflowArguments(request, []);
const serialized = dehydrateWorkflowArguments(request, [], mockRunId);
expect(serialized).toMatchInlineSnapshot(`
[
[
Expand Down Expand Up @@ -727,7 +729,7 @@ describe('workflow arguments', () => {
class Foo {}
let err: WorkflowRuntimeError | undefined;
try {
dehydrateWorkflowArguments(new Foo(), []);
dehydrateWorkflowArguments(new Foo(), [], mockRunId);
} catch (err_) {
err = err_ as WorkflowRuntimeError;
}
Expand Down Expand Up @@ -775,7 +777,7 @@ describe('step return value', () => {
class Foo {}
let err: WorkflowRuntimeError | undefined;
try {
dehydrateStepReturnValue(new Foo(), []);
dehydrateStepReturnValue(new Foo(), [], mockRunId);
} catch (err_) {
err = err_ as WorkflowRuntimeError;
}
Expand Down
Loading
Loading