diff --git a/.changeset/purple-pianos-stare.md b/.changeset/purple-pianos-stare.md new file mode 100644 index 000000000..6a6a899c9 --- /dev/null +++ b/.changeset/purple-pianos-stare.md @@ -0,0 +1,10 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/web-shared": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Require specifying runId when writing to stream diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 5b6ce1aa6..27cfaeba3 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1,3 +1,4 @@ +import { withResolvers } from '@workflow/utils'; import { assert, describe, expect, test } from 'vitest'; import { dehydrateWorkflowArguments } from '../src/serialization'; import { cliInspectJson, isLocalDeployment } from './utils'; @@ -21,9 +22,15 @@ async function triggerWorkflow( url.searchParams.set('workflowFile', workflowFile); url.searchParams.set('workflowFn', workflowFn); + + const ops: Promise[] = []; + const { promise: runIdPromise, resolve: resolveRunId } = + withResolvers(); + const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise); + const res = await fetch(url, { method: 'POST', - body: JSON.stringify(dehydrateWorkflowArguments(args, [], globalThis)), + body: JSON.stringify(dehydratedArgs), }); if (!res.ok) { throw new Error( @@ -33,6 +40,11 @@ async function triggerWorkflow( ); } const run = await res.json(); + resolveRunId(run.runId); + + // Resolve and wait for any stream operations + await Promise.all(ops); + return run; } diff --git a/packages/core/src/observability.ts b/packages/core/src/observability.ts index feaed5f39..9422e6f1a 100644 --- a/packages/core/src/observability.ts +++ b/packages/core/src/observability.ts @@ -44,7 +44,7 @@ const streamPrintRevivers: Record any> = { }; const hydrateStepIO = < - T extends { stepId?: string; input?: any; output?: any }, + T extends { stepId?: string; input?: any; output?: any; runId?: string }, >( step: T ): T => { @@ -52,7 +52,13 @@ const hydrateStepIO = < ...step, input: step.input && Array.isArray(step.input) && step.input.length - ? hydrateStepArguments(step.input, [], globalThis, streamPrintRevivers) + ? hydrateStepArguments( + step.input, + [], + step.runId as string, + globalThis, + streamPrintRevivers + ) : step.input, output: step.output ? hydrateStepReturnValue(step.output, globalThis, streamPrintRevivers) @@ -79,6 +85,7 @@ const hydrateWorkflowIO = < ? hydrateWorkflowReturnValue( workflow.output, [], + workflow.runId as string, globalThis, streamPrintRevivers ) @@ -86,7 +93,9 @@ const hydrateWorkflowIO = < }; }; -const hydrateEventData = ( +const hydrateEventData = < + T extends { eventId?: string; eventData?: any; runId?: string }, +>( event: T ): T => { if (!event.eventData) { @@ -99,7 +108,13 @@ const hydrateEventData = ( eventData: Object.fromEntries( Object.entries(event.eventData).map(([key, value]) => [ key, - hydrateStepArguments(value as any, [], globalThis), + hydrateStepArguments( + value as any, + [], + event.runId as string, + globalThis, + streamPrintRevivers + ), ]) ), }; @@ -110,9 +125,16 @@ const hydrateHookMetadata = ( ): T => { return { ...hook, - metadata: hook.metadata - ? hydrateStepArguments(hook.metadata, [], globalThis) - : hook.metadata, + metadata: + hook.metadata && 'runId' in hook + ? hydrateStepArguments( + hook.metadata, + [], + hook.runId as string, + globalThis, + streamPrintRevivers + ) + : hook.metadata, }; }; diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 069accbb3..3a7c66fff 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -180,7 +180,7 @@ export class Run { ): ReadableStream { const { ops = [], global = globalThis, startIndex, namespace } = options; const name = getWorkflowRunStreamId(this.runId, namespace); - return getExternalRevivers(global, ops).ReadableStream({ + return getExternalRevivers(global, ops, this.runId).ReadableStream({ name, startIndex, }) as ReadableStream; @@ -197,7 +197,7 @@ export class Run { const run = await this.world.runs.get(this.runId); if (run.status === 'completed') { - return hydrateWorkflowReturnValue(run.output, [], globalThis); + return hydrateWorkflowReturnValue(run.output, [], this.runId); } if (run.status === 'cancelled') { @@ -671,7 +671,7 @@ export const stepEntrypoint = } // Hydrate the step input arguments const ops: Promise[] = []; - const args = hydrateStepArguments(step.input, ops); + const args = hydrateStepArguments(step.input, ops, workflowRunId); span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), @@ -698,7 +698,7 @@ export const stepEntrypoint = () => stepFn(...args) ); - result = dehydrateStepReturnValue(result, ops); + result = dehydrateStepReturnValue(result, ops, workflowRunId); waitUntil(Promise.all(ops)); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 23252d923..6659ceee1 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -23,7 +23,7 @@ export async function getHookByToken(token: string): Promise { const world = getWorld(); const hook = await world.hooks.getByToken(token); if (typeof hook.metadata !== 'undefined') { - hook.metadata = hydrateStepArguments(hook.metadata as any, [], globalThis); + hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId); } return hook; } @@ -79,7 +79,7 @@ export async function resumeHook( const dehydratedPayload = dehydrateStepReturnValue( payload, ops, - globalThis + hook.runId ); waitUntil(Promise.all(ops)); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index d144d2221..f57a2dc08 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,5 +1,6 @@ import { waitUntil } from '@vercel/functions'; import { WorkflowRuntimeError } from '@workflow/errors'; +import { withResolvers } from '@workflow/utils'; import { Run } from '../runtime.js'; import type { Serializable, WorkflowInvokePayload } from '../schemas.js'; import { dehydrateWorkflowArguments } from '../serialization.js'; @@ -87,7 +88,14 @@ export async function start( const world = getWorld(); const deploymentId = opts.deploymentId ?? (await world.getDeploymentId()); const ops: Promise[] = []; - const workflowArguments = dehydrateWorkflowArguments(args, ops); + const { promise: runIdPromise, resolve: resolveRunId } = + withResolvers(); + + const workflowArguments = dehydrateWorkflowArguments( + args, + ops, + runIdPromise + ); // Serialize current trace context to propagate across queue boundary const traceCarrier = await serializeTraceCarrier(); @@ -97,6 +105,9 @@ export async function start( input: workflowArguments, executionContext: { traceCarrier }, }); + + resolveRunId(runResponse.runId); + waitUntil(Promise.all(ops)); span?.setAttributes({ diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 23b311c65..36f11b479 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -15,6 +15,8 @@ import { import { STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; +const mockRunId = 'wrun_mockidnumber0001'; + describe('getStreamType', () => { it('should return `undefined` for a regular stream', () => { const stream = new ReadableStream(); @@ -41,7 +43,7 @@ describe('workflow arguments', () => { it('should work with Date', () => { const date = new Date('2025-07-17T04:30:34.824Z'); - const serialized = dehydrateWorkflowArguments(date, []); + const serialized = dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -61,7 +63,7 @@ describe('workflow arguments', () => { it('should work with invalid Date', () => { const date = new Date('asdf'); - const serialized = dehydrateWorkflowArguments(date, []); + const serialized = dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -81,7 +83,7 @@ describe('workflow arguments', () => { it('should work with BigInt', () => { const bigInt = BigInt('9007199254740992'); - const serialized = dehydrateWorkflowArguments(bigInt, []); + const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -99,7 +101,7 @@ describe('workflow arguments', () => { it('should work with BigInt negative', () => { const bigInt = BigInt('-12345678901234567890'); - const serialized = dehydrateWorkflowArguments(bigInt, []); + const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -120,7 +122,7 @@ describe('workflow arguments', () => { [2, 'foo'], [6, 'bar'], ]); - const serialized = dehydrateWorkflowArguments(map, []); + const serialized = dehydrateWorkflowArguments(map, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -154,7 +156,7 @@ describe('workflow arguments', () => { it('should work with Set', () => { const set = new Set([1, '2', true]); - const serialized = dehydrateWorkflowArguments(set, []); + const serialized = dehydrateWorkflowArguments(set, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -180,7 +182,7 @@ describe('workflow arguments', () => { it('should work with WritableStream', () => { const stream = new WritableStream(); - const serialized = dehydrateWorkflowArguments(stream, []); + const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); const uuid = serialized[2]; expect(serialized).toMatchInlineSnapshot(` [ @@ -205,7 +207,7 @@ describe('workflow arguments', () => { it('should work with ReadableStream', () => { const stream = new ReadableStream(); - const serialized = dehydrateWorkflowArguments(stream, []); + const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); const uuid = serialized[2]; expect(serialized).toMatchInlineSnapshot(` [ @@ -233,7 +235,7 @@ describe('workflow arguments', () => { headers.set('foo', 'bar'); headers.append('set-cookie', 'a'); headers.append('set-cookie', 'b'); - const serialized = dehydrateWorkflowArguments(headers, []); + const serialized = dehydrateWorkflowArguments(headers, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -281,7 +283,7 @@ describe('workflow arguments', () => { ['set-cookie', 'b'], ]), }); - const serialized = dehydrateWorkflowArguments(response, []); + const serialized = dehydrateWorkflowArguments(response, [], mockRunId); const bodyUuid = serialized[serialized.length - 3]; expect(serialized).toMatchInlineSnapshot(` [ @@ -372,7 +374,7 @@ describe('workflow arguments', () => { it('should work with URLSearchParams', () => { const params = new URLSearchParams('a=1&b=2&a=3'); - const serialized = dehydrateWorkflowArguments(params, []); + const serialized = dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -399,7 +401,7 @@ describe('workflow arguments', () => { it('should work with empty URLSearchParams', () => { const params = new URLSearchParams(); - const serialized = dehydrateWorkflowArguments(params, []); + const serialized = dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -420,7 +422,7 @@ describe('workflow arguments', () => { it('should work with empty ArrayBuffer', () => { const buffer = new ArrayBuffer(0); - const serialized = dehydrateWorkflowArguments(buffer, []); + const serialized = dehydrateWorkflowArguments(buffer, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -440,7 +442,7 @@ describe('workflow arguments', () => { it('should work with empty Uint8Array', () => { const array = new Uint8Array(0); - const serialized = dehydrateWorkflowArguments(array, []); + const serialized = dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -461,7 +463,7 @@ describe('workflow arguments', () => { it('should work with empty Int32Array', () => { const array = new Int32Array(0); - const serialized = dehydrateWorkflowArguments(array, []); + const serialized = dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -482,7 +484,7 @@ describe('workflow arguments', () => { it('should work with empty Float64Array', () => { const array = new Float64Array(0); - const serialized = dehydrateWorkflowArguments(array, []); + const serialized = dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -517,7 +519,7 @@ describe('workflow arguments', () => { duplex: 'half', } as RequestInit); - const serialized = dehydrateWorkflowArguments(request, []); + const serialized = dehydrateWorkflowArguments(request, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -625,7 +627,7 @@ describe('workflow arguments', () => { const responseWritable = new WritableStream(); request[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')] = responseWritable; - const serialized = dehydrateWorkflowArguments(request, []); + const serialized = dehydrateWorkflowArguments(request, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` [ [ @@ -727,7 +729,7 @@ describe('workflow arguments', () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateWorkflowArguments(new Foo(), []); + dehydrateWorkflowArguments(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -775,7 +777,7 @@ describe('step return value', () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateStepReturnValue(new Foo(), []); + dehydrateStepReturnValue(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index d5e32a346..57fa8d9ae 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -114,17 +114,28 @@ export class WorkflowServerReadableStream extends ReadableStream { } export class WorkflowServerWritableStream extends WritableStream { - constructor(name: string) { + constructor(name: string, runId: string | Promise) { + // runId can be a promise, because we need a runID to write to a stream, + // but at class instantiation time, we might not have a run ID yet. This + // mainly happens when calling start() for a workflow with already-serialized + // arguments. + if (typeof runId !== 'string' && !(runId instanceof Promise)) { + throw new Error( + `"runId" must be a string or a promise that resolves to a string, got "${typeof runId}"` + ); + } if (typeof name !== 'string' || name.length === 0) { throw new Error(`"name" is required, got "${name}"`); } const world = getWorld(); super({ async write(chunk) { - await world.writeToStream(name, chunk); + const _runId = await runId; + await world.writeToStream(name, _runId, chunk); }, async close() { - await world.closeStream(name); + const _runId = await runId; + await world.closeStream(name, _runId); }, }); } @@ -312,7 +323,8 @@ function getCommonReducers(global: Record = globalThis) { */ export function getExternalReducers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId: string | Promise ): Reducers { return { ...getCommonReducers(global), @@ -328,13 +340,15 @@ export function getExternalReducers( const name = global.crypto.randomUUID(); const type = getStreamType(value); - const writable = new WorkflowServerWritableStream(name); + const writable = new WorkflowServerWritableStream(name, runId); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { ops.push( value - .pipeThrough(getSerializeStream(getExternalReducers(global, ops))) + .pipeThrough( + getSerializeStream(getExternalReducers(global, ops, runId)) + ) .pipeTo(writable) ); } @@ -346,12 +360,12 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; + const name = global.crypto.randomUUID(); - ops.push( - new WorkflowServerReadableStream(name) - .pipeThrough(getDeserializeStream(getExternalRevivers(global, ops))) - .pipeTo(value) - ); + + const readable = new WorkflowServerReadableStream(name); + ops.push(readable.pipeTo(value)); + return { name }; }, }; @@ -409,11 +423,13 @@ export function getWorkflowReducers( * * @param global * @param ops + * @param runId * @returns */ function getStepReducers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId: string | Promise ): Reducers { return { ...getCommonReducers(global), @@ -433,16 +449,24 @@ function getStepReducers( let type = value[STREAM_TYPE_SYMBOL]; if (!name) { + if (!runId) { + throw new Error( + 'ReadableStream cannot be serialized without a valid runId' + ); + } + name = global.crypto.randomUUID(); type = getStreamType(value); - const writable = new WorkflowServerWritableStream(name); + const writable = new WorkflowServerWritableStream(name, runId); if (type === 'bytes') { ops.push(value.pipeTo(writable)); } else { ops.push( value - .pipeThrough(getSerializeStream(getStepReducers(global, ops))) + .pipeThrough( + getSerializeStream(getStepReducers(global, ops, runId)) + ) .pipeTo(writable) ); } @@ -458,10 +482,18 @@ function getStepReducers( let name = value[STREAM_NAME_SYMBOL]; if (!name) { + if (!runId) { + throw new Error( + 'WritableStream cannot be serialized without a valid runId' + ); + } + name = global.crypto.randomUUID(); ops.push( new WorkflowServerReadableStream(name) - .pipeThrough(getDeserializeStream(getStepRevivers(global, ops))) + .pipeThrough( + getDeserializeStream(getStepRevivers(global, ops, runId)) + ) .pipeTo(value) ); } @@ -560,10 +592,12 @@ export function getCommonRevivers(global: Record = globalThis) { * * @param global * @param ops + * @param runId */ export function getExternalRevivers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId: string | Promise ): Revivers { return { ...getCommonRevivers(global), @@ -603,16 +637,20 @@ export function getExternalRevivers( return readable; } else { const transform = getDeserializeStream( - getExternalRevivers(global, ops) + getExternalRevivers(global, ops, runId) ); ops.push(readable.pipeTo(transform.writable)); return transform.readable; } }, WritableStream: (value) => { - const serialize = getSerializeStream(getExternalReducers(global, ops)); + const serialize = getSerializeStream( + getExternalReducers(global, ops, runId) + ); ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(value.name)) + serialize.readable.pipeTo( + new WorkflowServerWritableStream(value.name, runId) + ) ); return serialize.writable; }, @@ -691,11 +729,13 @@ export function getWorkflowRevivers( * * @param global * @param ops + * @param runId * @returns */ function getStepRevivers( global: Record = globalThis, - ops: Promise[] + ops: Promise[], + runId: string | Promise ): Revivers { return { ...getCommonRevivers(global), @@ -740,15 +780,25 @@ function getStepRevivers( if (value.type === 'bytes') { return readable; } else { - const transform = getDeserializeStream(getStepRevivers(global, ops)); + const transform = getDeserializeStream( + getStepRevivers(global, ops, runId) + ); ops.push(readable.pipeTo(transform.writable)); return transform.readable; } }, WritableStream: (value) => { - const serialize = getSerializeStream(getStepReducers(global, ops)); + if (!runId) { + throw new Error( + 'WritableStream cannot be revived without a valid runId' + ); + } + + const serialize = getSerializeStream(getStepReducers(global, ops, runId)); ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(value.name)) + serialize.readable.pipeTo( + new WorkflowServerWritableStream(value.name, runId) + ) ); return serialize.writable; }, @@ -762,15 +812,20 @@ function getStepRevivers( * * @param value * @param global + * @param runId * @returns The dehydrated value, ready to be inserted into the database */ export function dehydrateWorkflowArguments( value: unknown, - ops: Promise[], + ops: Promise[], + runId: string | Promise, global: Record = globalThis ) { try { - const str = devalue.stringify(value, getExternalReducers(global, ops)); + const str = devalue.stringify( + value, + getExternalReducers(global, ops, runId) + ); return revive(str); } catch (error) { throw new WorkflowRuntimeError( @@ -830,17 +885,21 @@ export function dehydrateWorkflowReturnValue( * return value of a completed workflow run. * * @param value + * @param ops * @param global + * @param extraRevivers + * @param runId * @returns The hydrated return value, ready to be consumed by the client */ export function hydrateWorkflowReturnValue( value: Parameters[0], - ops: Promise[], + ops: Promise[], + runId: string | Promise, global: Record = globalThis, extraRevivers: Record any> = {} ) { const obj = devalue.unflatten(value, { - ...getExternalRevivers(global, ops), + ...getExternalRevivers(global, ops, runId), ...extraRevivers, }); return obj; @@ -875,17 +934,21 @@ export function dehydrateStepArguments( * from the database at the start of the step execution. * * @param value + * @param ops * @param global + * @param extraRevivers + * @param runId * @returns The hydrated value, ready to be consumed by the step user-code function */ export function hydrateStepArguments( value: Parameters[0], ops: Promise[], + runId: string | Promise, global: Record = globalThis, extraRevivers: Record any> = {} ) { const obj = devalue.unflatten(value, { - ...getStepRevivers(global, ops), + ...getStepRevivers(global, ops, runId), ...extraRevivers, }); return obj; @@ -897,16 +960,19 @@ export function hydrateStepArguments( * into a format that can be saved to the database. * * @param value + * @param ops * @param global + * @param runId * @returns The dehydrated value, ready to be inserted into the database */ export function dehydrateStepReturnValue( value: unknown, ops: Promise[], + runId: string | Promise, global: Record = globalThis ) { try { - const str = devalue.stringify(value, getStepReducers(global, ops)); + const str = devalue.stringify(value, getStepReducers(global, ops, runId)); return revive(str); } catch (error) { throw new WorkflowRuntimeError( @@ -922,6 +988,8 @@ export function dehydrateStepReturnValue( * * @param value * @param global + * @param extraRevivers + * @param runId * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ export function hydrateStepReturnValue( diff --git a/packages/core/src/step/context-storage.ts b/packages/core/src/step/context-storage.ts index 681fcbe41..4749f2ff1 100644 --- a/packages/core/src/step/context-storage.ts +++ b/packages/core/src/step/context-storage.ts @@ -5,5 +5,5 @@ import type { StepMetadata } from './get-step-metadata.js'; export const contextStorage = /* @__PURE__ */ new AsyncLocalStorage<{ stepMetadata: StepMetadata; workflowMetadata: WorkflowMetadata; - ops: Promise[]; + ops: Promise[]; }>(); diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index c51e0c0dc..3dc80b68c 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -1,7 +1,7 @@ import { - WorkflowServerWritableStream, - getSerializeStream, getExternalReducers, + getSerializeStream, + WorkflowServerWritableStream, } from '../serialization.js'; import { getWorkflowRunStreamId } from '../util.js'; import type { WorkflowWritableStreamOptions } from '../writable-stream.js'; @@ -30,20 +30,18 @@ export function getWritable( } const { namespace } = options; - const name = getWorkflowRunStreamId( - ctx.workflowMetadata.workflowRunId, - namespace - ); + const runId = ctx.workflowMetadata.workflowRunId; + const name = getWorkflowRunStreamId(runId, namespace); // Create a transform stream that serializes chunks and pipes to the workflow server const serialize = getSerializeStream( - getExternalReducers(globalThis, ctx.ops) + getExternalReducers(globalThis, ctx.ops, runId) ); // Pipe the serialized data to the workflow server stream // Register this async operation with the runtime's ops array so it's awaited via waitUntil ctx.ops.push( - serialize.readable.pipeTo(new WorkflowServerWritableStream(name)) + serialize.readable.pipeTo(new WorkflowServerWritableStream(name, runId)) ); // Return the writable side of the transform stream diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index a5bbf82f4..0ad2e1d86 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -9,6 +9,8 @@ import { serializeChunk, } from './streamer.js'; +const TEST_RUN_ID = 'wrun_test12345678901234'; + describe('streamer', () => { describe('serializeChunk and deserializeChunk', () => { it('should serialize and deserialize non-EOF chunks correctly', () => { @@ -100,9 +102,8 @@ describe('streamer', () => { const chunk = deserializeChunk( await fs.readFile(`${testDir}/streams/chunks/${file}`) ); - const time = decodeTime( - String(file.split('-').at(-1)).split('.')[0] - ); + const stream_id = String(file.split('-').at(-1)).split('.')[0]; + const time = decodeTime(stream_id); const timeDiff = time - lastTime; lastTime = time; @@ -131,8 +132,8 @@ describe('streamer', () => { const { testDir, streamer } = await setupStreamer(); const streamName = 'test-stream'; - await streamer.writeToStream(streamName, 'hello'); - await streamer.writeToStream(streamName, ' world'); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'hello'); + await streamer.writeToStream(streamName, TEST_RUN_ID, ' world'); // Verify chunks directory was created const chunksDir = path.join(testDir, 'streams', 'chunks'); @@ -149,8 +150,8 @@ describe('streamer', () => { const buffer1 = Buffer.from('chunk1'); const buffer2 = Buffer.from('chunk2'); - await streamer.writeToStream(streamName, buffer1); - await streamer.writeToStream(streamName, buffer2); + await streamer.writeToStream(streamName, TEST_RUN_ID, buffer1); + await streamer.writeToStream(streamName, TEST_RUN_ID, buffer2); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -164,7 +165,7 @@ describe('streamer', () => { const streamName = 'uint8-stream'; const uint8Array = new Uint8Array([1, 2, 3, 4]); - await streamer.writeToStream(streamName, uint8Array); + await streamer.writeToStream(streamName, TEST_RUN_ID, uint8Array); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -176,9 +177,9 @@ describe('streamer', () => { it('should handle multiple streams independently', async () => { const { testDir, streamer } = await setupStreamer(); - await streamer.writeToStream('stream1', 'data1'); - await streamer.writeToStream('stream2', 'data2'); - await streamer.writeToStream('stream1', 'data3'); + await streamer.writeToStream('stream1', TEST_RUN_ID, 'data1'); + await streamer.writeToStream('stream2', TEST_RUN_ID, 'data2'); + await streamer.writeToStream('stream1', TEST_RUN_ID, 'data3'); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -194,10 +195,9 @@ describe('streamer', () => { describe('closeStream', () => { it('should close an empty stream', async () => { const { testDir, streamer } = await setupStreamer(); - const streamName = 'empty-stream'; - await streamer.closeStream(streamName); + await streamer.closeStream(streamName, TEST_RUN_ID); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -208,12 +208,11 @@ describe('streamer', () => { it('should close a stream with existing chunks', async () => { const { testDir, streamer } = await setupStreamer(); - const streamName = 'existing-stream'; - await streamer.writeToStream(streamName, 'chunk1'); - await streamer.writeToStream(streamName, 'chunk2'); - await streamer.closeStream(streamName); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk1'); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'chunk2'); + await streamer.closeStream(streamName, TEST_RUN_ID); const chunksDir = path.join(testDir, 'streams', 'chunks'); const files = await fs.readdir(chunksDir); @@ -225,16 +224,15 @@ describe('streamer', () => { describe('readFromStream', () => { it('should read chunks from a completed stream', async () => { const { streamer } = await setupStreamer(); - const streamName = 'read-stream'; const chunk1 = 'hello '; const chunk2 = 'world'; - await streamer.writeToStream(streamName, chunk1); + await streamer.writeToStream(streamName, TEST_RUN_ID, chunk1); // Add a small delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, chunk2); - await streamer.closeStream(streamName); + await streamer.writeToStream(streamName, TEST_RUN_ID, chunk2); + await streamer.closeStream(streamName, TEST_RUN_ID); const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); @@ -256,16 +254,15 @@ describe('streamer', () => { it('should read binary data correctly', async () => { const { streamer } = await setupStreamer(); - const streamName = 'binary-stream'; const binaryData1 = new Uint8Array([1, 2, 3]); const binaryData2 = new Uint8Array([4, 5, 6]); - await streamer.writeToStream(streamName, binaryData1); + await streamer.writeToStream(streamName, TEST_RUN_ID, binaryData1); // Add delay to ensure different ULID timestamps await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, binaryData2); - await streamer.closeStream(streamName); + await streamer.writeToStream(streamName, TEST_RUN_ID, binaryData2); + await streamer.closeStream(streamName, TEST_RUN_ID); const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); @@ -295,16 +292,15 @@ describe('streamer', () => { it('should preserve chunk order based on ULID timestamps', async () => { const { streamer } = await setupStreamer(); - const streamName = 'ordered-stream'; // Write chunks with small delays to ensure different ULID timestamps - await streamer.writeToStream(streamName, '1'); + await streamer.writeToStream(streamName, TEST_RUN_ID, '1'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, '2'); + await streamer.writeToStream(streamName, TEST_RUN_ID, '2'); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, '3'); - await streamer.closeStream(streamName); + await streamer.writeToStream(streamName, TEST_RUN_ID, '3'); + await streamer.closeStream(streamName, TEST_RUN_ID); const stream = await streamer.readFromStream(streamName); const reader = stream.getReader(); @@ -327,18 +323,17 @@ describe('streamer', () => { describe('integration scenarios', () => { it('should handle complete write-close-read cycle', async () => { const { streamer } = await setupStreamer(); - const streamName = 'integration-stream'; // Write chunks with proper timing - await streamer.writeToStream(streamName, 'start '); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'start '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, 'middle '); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'middle '); await new Promise((resolve) => setTimeout(resolve, 2)); - await streamer.writeToStream(streamName, 'end'); + await streamer.writeToStream(streamName, TEST_RUN_ID, 'end'); // Close the stream - await streamer.closeStream(streamName); + await streamer.closeStream(streamName, TEST_RUN_ID); // Read complete stream const completeStream = await streamer.readFromStream(streamName); @@ -365,16 +360,16 @@ describe('streamer', () => { const streamName = `race-${iteration}`; // Write a few chunks to disk first - await streamer.writeToStream(streamName, '0\n'); - await streamer.writeToStream(streamName, '1\n'); + await streamer.writeToStream(streamName, TEST_RUN_ID, '0\n'); + await streamer.writeToStream(streamName, TEST_RUN_ID, '1\n'); // Start writing chunks in background IMMEDIATELY before reading const writeTask = (async () => { for (let i = 2; i < 10; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(streamName, TEST_RUN_ID, `${i}\n`); // No delay - fire them off as fast as possible to hit the race window } - await streamer.closeStream(streamName); + await streamer.closeStream(streamName, TEST_RUN_ID); })(); // Start reading - this triggers start() which should set up listeners @@ -430,7 +425,7 @@ describe('streamer', () => { // Write chunks 0-4 to disk for (let i = 0; i < 5; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(streamName, TEST_RUN_ID, `${i}\n`); await new Promise((resolve) => setTimeout(resolve, 2)); } @@ -452,16 +447,98 @@ describe('streamer', () => { // Immediately write more chunks (5-9) while disk reading might be in progress for (let i = 5; i < 10; i++) { - await streamer.writeToStream(streamName, `${i}\n`); + await streamer.writeToStream(streamName, TEST_RUN_ID, `${i}\n`); } - await streamer.closeStream(streamName); + await streamer.closeStream(streamName, TEST_RUN_ID); await readPromise; // Verify chunks are in exact chronological order (not just all present) const content = chunks.join(''); expect(content).toBe('0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'); }); + + it('should handle runId as a promise and flush correctly when promise resolves', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'promise-runid-test'; + + // Create a promise that we'll resolve later + let resolveRunId: (value: string) => void = () => {}; + const runIdPromise = new Promise((resolve) => { + resolveRunId = resolve; + }); + + // Write chunks with the promise (before it's resolved) + const writePromise1 = streamer.writeToStream( + streamName, + runIdPromise, + 'chunk1\n' + ); + const writePromise2 = streamer.writeToStream( + streamName, + runIdPromise, + 'chunk2\n' + ); + + // Verify that writes are pending (not yet flushed) + let writes1Complete = false; + let writes2Complete = false; + writePromise1.then(() => { + writes1Complete = true; + }); + writePromise2.then(() => { + writes2Complete = true; + }); + + // Give a small delay to ensure writes are initiated but blocked + await new Promise((resolve) => setTimeout(resolve, 10)); + + // At this point, writes should be pending + expect(writes1Complete).toBe(false); + expect(writes2Complete).toBe(false); + + // Now resolve the runId promise + resolveRunId(TEST_RUN_ID); + + // Wait for writes to complete + await writePromise1; + await writePromise2; + + expect(writes1Complete).toBe(true); + expect(writes2Complete).toBe(true); + + // Close the stream with another promise + let resolveCloseRunId: (value: string) => void = () => {}; + const closeRunIdPromise = new Promise((resolve) => { + resolveCloseRunId = resolve; + }); + + const closePromise = streamer.closeStream( + streamName, + closeRunIdPromise + ); + + // Resolve the close promise + resolveCloseRunId(TEST_RUN_ID); + await closePromise; + + // Now read and verify all chunks were written correctly + const stream = await streamer.readFromStream(streamName); + const reader = stream.getReader(); + const chunks: string[] = []; + + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + if (result.value) { + chunks.push(Buffer.from(result.value).toString()); + } + } + + const content = chunks.join(''); + expect(content).toBe('chunk1\nchunk2\n'); + }); }); }); }); diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index df234d553..9667b6afe 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -48,7 +48,14 @@ export function createStreamer(basedir: string): Streamer { }>(); return { - async writeToStream(name, chunk) { + async writeToStream( + name: string, + _runId: string | Promise, + chunk: string | Uint8Array + ) { + // Await runId if it's a promise to ensure proper flushing + await _runId; + const chunkId = `strm_${monotonicUlid()}`; if (typeof chunk === 'string') { @@ -83,7 +90,10 @@ export function createStreamer(basedir: string): Streamer { }); }, - async closeStream(name) { + async closeStream(name: string, _runId: string | Promise) { + // Await runId if it's a promise to ensure proper flushing + await _runId; + const chunkId = `strm_${monotonicUlid()}`; const chunkPath = path.join( basedir, @@ -100,7 +110,7 @@ export function createStreamer(basedir: string): Streamer { streamEmitter.emit(`close:${name}` as const, { streamName: name }); }, - async readFromStream(name, startIndex = 0) { + async readFromStream(name: string, startIndex = 0) { const chunksDir = path.join(basedir, 'streams', 'chunks'); let removeListeners = () => {}; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 36696f4f2..957df885d 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -88,7 +88,14 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { }); return { - async writeToStream(name, chunk) { + async writeToStream( + name: string, + _runId: string | Promise, + chunk: string | Uint8Array + ) { + // Await runId if it's a promise to ensure proper flushing + await _runId; + const chunkId = genChunkId(); await drizzle.insert(streams).values({ chunkId, @@ -106,7 +113,13 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ) ); }, - async closeStream(name: string): Promise { + async closeStream( + name: string, + _runId: string | Promise + ): Promise { + // Await runId if it's a promise to ensure proper flushing + await _runId; + const chunkId = genChunkId(); await drizzle.insert(streams).values({ chunkId, diff --git a/packages/world-testing/src/idempotency.mts b/packages/world-testing/src/idempotency.mts index 219996bb4..97219b33c 100644 --- a/packages/world-testing/src/idempotency.mts +++ b/packages/world-testing/src/idempotency.mts @@ -21,7 +21,7 @@ export function idempotency(world: string) { } ); - const output = await hydrateWorkflowReturnValue(run.output, [], globalThis); + const output = await hydrateWorkflowReturnValue(run.output, [], run.runId); expect(output).toEqual({ numbers: Array.from({ length: 110 }, () => expect.any(Number)), diff --git a/packages/world-vercel/src/runs.ts b/packages/world-vercel/src/runs.ts index 673079ec0..1a2647cac 100644 --- a/packages/world-vercel/src/runs.ts +++ b/packages/world-vercel/src/runs.ts @@ -50,6 +50,8 @@ const WorkflowRunWireWithRefsSchema = WorkflowRunWireBaseSchema.omit({ outputRef: z.any().optional(), input: z.array(z.any()).optional(), output: z.any().optional(), + blobStorageBytes: z.number().optional(), + streamStorageBytes: z.number().optional(), }); // Helper to filter run data based on resolveData setting diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index eae1fac89..bf8fe05d3 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -1,15 +1,31 @@ import type { Streamer } from '@workflow/world'; import { type APIConfig, getHttpConfig, type HttpConfig } from './utils.js'; -function getStreamUrl(name: string, httpConfig: HttpConfig) { +function getStreamUrl( + name: string, + runId: string | undefined, + httpConfig: HttpConfig +) { + if (runId) { + return new URL( + `${httpConfig.baseUrl}/v1/runs/${runId}/stream/${encodeURIComponent(name)}` + ); + } return new URL(`${httpConfig.baseUrl}/v1/stream/${encodeURIComponent(name)}`); } export function createStreamer(config?: APIConfig): Streamer { return { - async writeToStream(name, chunk) { + async writeToStream( + name: string, + runId: string | Promise, + chunk: string | Uint8Array + ) { + // Await runId if it's a promise to ensure proper flushing + const resolvedRunId = await runId; + const httpConfig = await getHttpConfig(config); - await fetch(getStreamUrl(name, httpConfig), { + await fetch(getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', body: chunk, headers: httpConfig.headers, @@ -17,18 +33,21 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, - async closeStream(name) { + async closeStream(name: string, runId: string | Promise) { + // Await runId if it's a promise to ensure proper flushing + const resolvedRunId = await runId; + const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); - await fetch(getStreamUrl(name, httpConfig), { + await fetch(getStreamUrl(name, resolvedRunId, httpConfig), { method: 'PUT', headers: httpConfig.headers, }); }, - async readFromStream(name, startIndex) { + async readFromStream(name: string, startIndex?: number) { const httpConfig = await getHttpConfig(config); - const url = getStreamUrl(name, httpConfig); + const url = getStreamUrl(name, undefined, httpConfig); if (typeof startIndex === 'number') { url.searchParams.set('startIndex', String(startIndex)); } diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 29f1827c7..1682d9df7 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -32,8 +32,12 @@ import type { } from './steps.js'; export interface Streamer { - writeToStream(name: string, chunk: string | Uint8Array): Promise; - closeStream(name: string): Promise; + writeToStream( + name: string, + runId: string | Promise, + chunk: string | Uint8Array + ): Promise; + closeStream(name: string, runId: string | Promise): Promise; readFromStream( name: string, startIndex?: number