Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
50eaa28
Support passing step function references across serialization layer
TooTallNate Nov 11, 2025
7b8fbeb
tests
TooTallNate Nov 11, 2025
476147a
.
TooTallNate Nov 11, 2025
530e477
.
TooTallNate Nov 11, 2025
c4cd86b
Use nate's devalue fork
TooTallNate Nov 12, 2025
7c4e850
Fix build
TooTallNate Nov 12, 2025
52d1206
Changeset
TooTallNate Nov 12, 2025
aabc1a1
Merge branch 'main' of github.com:vercel/workflow into update/seriali…
TooTallNate Nov 12, 2025
42f7adc
.
TooTallNate Nov 12, 2025
e7a491d
Update packages/core/src/serialization.ts
TooTallNate Nov 12, 2025
3ea314d
Merge branch 'main' of github.com:vercel/workflow into update/seriali…
TooTallNate Nov 12, 2025
10b77c4
Use latest devalue
TooTallNate Nov 12, 2025
1480889
Isolate test
TooTallNate Nov 12, 2025
0da9cc2
Merge branch 'main' of github.com:vercel/workflow into update/seriali…
TooTallNate Nov 12, 2025
c60de66
Add step name transform to arrow function steps
TooTallNate Nov 12, 2025
57e4899
.
TooTallNate Nov 12, 2025
921ae65
update tsconfig
ijjk Nov 12, 2025
344439c
update
ijjk Nov 12, 2025
de91783
update
ijjk Nov 12, 2025
c655f2b
fix
ijjk Nov 12, 2025
a5f85e4
fix
ijjk Nov 12, 2025
a16ee81
debug stuff
ijjk Nov 12, 2025
8a91661
logs
ijjk Nov 12, 2025
c3674cd
Merge branch 'main' of github.com:vercel/workflow into update/seriali…
TooTallNate Nov 12, 2025
881a9e3
Merge branch 'update/serialize-step-function-reference' of github.com…
TooTallNate Nov 12, 2025
edf679a
.
TooTallNate Nov 12, 2025
456fa95
peter
TooTallNate Nov 13, 2025
35384f6
Fix test
TooTallNate Nov 13, 2025
bc59732
Merge branch 'main' of github.com:vercel/workflow into update/seriali…
TooTallNate Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tidy-states-see.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/swc-plugin": patch
"@workflow/core": patch
---

Support serializing step function references
32 changes: 32 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
workflowFile: 'workflows/98_duplicate_case.ts',
workflowFn: 'addTenWorkflow',
},
])('addTenWorkflow', { timeout: 60_000 }, async (workflow) => {

Check failure on line 79 in packages/core/e2e/e2e.test.ts

View workflow job for this annotation

GitHub Actions / E2E Local Dev Tests (nuxt - stable)

packages/core/e2e/e2e.test.ts > e2e > addTenWorkflow

Error: Test timed out in 60000ms. If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout". ❯ packages/core/e2e/e2e.test.ts:79:4
const run = await triggerWorkflow(workflow, [123]);
const returnValue = await getWorkflowReturnValue(run.runId);
expect(returnValue).toBe(133);
Expand Down Expand Up @@ -687,4 +687,36 @@
expect(run2Data.status).toBe('completed');
}
);

test(
'stepFunctionPassingWorkflow - step function references can be passed as arguments',
{ timeout: 60_000 },
async () => {
// This workflow passes a step function reference to another step
// The receiving step calls the passed function and returns the result
const run = await triggerWorkflow('stepFunctionPassingWorkflow', []);
const returnValue = await getWorkflowReturnValue(run.runId);

// doubleNumber(10) = 20, then multiply by 2 = 40
expect(returnValue).toBe(40);

// Verify the run completed successfully
const { json: runData } = await cliInspectJson(
`runs ${run.runId} --withData`
);
expect(runData.status).toBe('completed');
expect(runData.output).toBe(40);

// Verify that exactly 2 steps were executed:
// 1. stepWithStepFunctionArg(doubleNumber)
// (doubleNumber(10) is run inside the stepWithStepFunctionArg step)
const { json: eventsData } = await cliInspectJson(
`events --run ${run.runId} --json`
);
const stepCompletedEvents = eventsData.filter(
(event) => event.eventType === 'step_completed'
);
expect(stepCompletedEvents).toHaveLength(1);
}
);
});
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"@workflow/world-local": "workspace:*",
"@workflow/world-vercel": "workspace:*",
"debug": "4.4.3",
"devalue": "5.4.1",
"devalue": "5.5.0",
"ms": "2.1.3",
"nanoid": "5.1.6",
"seedrandom": "3.0.5",
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime/world.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { createRequire } from 'node:module';
import Path from 'node:path';
import { join } from 'node:path';
import type { World } from '@workflow/world';
import { createEmbeddedWorld } from '@workflow/world-local';
import { createVercelWorld } from '@workflow/world-vercel';

const require = createRequire(Path.join(process.cwd(), 'index.js'));
const require = createRequire(join(process.cwd(), 'index.js'));

let worldCache: World | undefined;
let stubbedWorldCache: World | undefined;
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ export type Serializable =
| Uint8ClampedArray
| Uint16Array
| Uint32Array
| WritableStream<Uint8Array>;
| WritableStream<Uint8Array>
| ((...args: Serializable[]) => Promise<Serializable>); // Step function
132 changes: 131 additions & 1 deletion packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { runInContext } from 'node:vm';
import type { WorkflowRuntimeError } from '@workflow/errors';
import { describe, expect, it } from 'vitest';
import { getStepFunction, registerStepFunction } from './private.js';
import {
dehydrateStepArguments,
dehydrateStepReturnValue,
dehydrateWorkflowArguments,
dehydrateWorkflowReturnValue,
getCommonRevivers,
getStreamType,
getWorkflowReducers,
hydrateWorkflowArguments,
} from './serialization.js';
import { STREAM_NAME_SYMBOL } from './symbols.js';
import { STEP_FUNCTION_NAME_SYMBOL, STREAM_NAME_SYMBOL } from './symbols.js';
import { createContext } from './vm/index.js';

describe('getStreamType', () => {
Expand Down Expand Up @@ -783,3 +786,130 @@ describe('step return value', () => {
);
});
});

describe('step function serialization', () => {
const { globalThis: vmGlobalThis } = createContext({
seed: 'test',
fixedTimestamp: 1714857600000,
});

it('should detect step function by checking for STEP_FUNCTION_NAME_SYMBOL', () => {
const stepName = 'myStep';
const stepFn = async (x: number) => x * 2;

// Attach the symbol like useStep() does
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});

// Verify the symbol is attached correctly
expect((stepFn as any)[STEP_FUNCTION_NAME_SYMBOL]).toBe(stepName);
});

it('should not have STEP_FUNCTION_NAME_SYMBOL on regular functions', () => {
const regularFn = async (x: number) => x * 2;

// Regular functions should not have the symbol
expect((regularFn as any)[STEP_FUNCTION_NAME_SYMBOL]).toBeUndefined();
});

it('should lookup registered step function by name', () => {
const stepName = 'myRegisteredStep';
const stepFn = async (x: number) => x * 2;

// Register the step function
registerStepFunction(stepName, stepFn);

// Should be retrievable by name
const retrieved = getStepFunction(stepName);
expect(retrieved).toBe(stepFn);
});

it('should return undefined for non-existent registered step function', () => {
const retrieved = getStepFunction('nonExistentStep');
expect(retrieved).toBeUndefined();
});

it('should deserialize step function name through reviver', () => {
const stepName = 'testStep';
const stepFn = async () => 42;

// Register the step function
registerStepFunction(stepName, stepFn);

// Get the reviver and test it directly
const revivers = getCommonRevivers(vmGlobalThis);
const result = revivers.StepFunction(stepName);

expect(result).toBe(stepFn);
});

it('should throw error when reviver cannot find registered step function', () => {
const revivers = getCommonRevivers(vmGlobalThis);

let err: Error | undefined;
try {
revivers.StepFunction('nonExistentStep');
} catch (err_) {
err = err_ as Error;
}

expect(err).toBeDefined();
expect(err?.message).toContain('Step function "nonExistentStep" not found');
expect(err?.message).toContain('Make sure the step function is registered');
});

it('should dehydrate step function passed as argument to a step', () => {
const stepName = 'step//workflows/test.ts//myStep';
const stepFn = async (x: number) => x * 2;

// Register the step function
registerStepFunction(stepName, stepFn);

// Attach the symbol to the function (like the SWC compiler would)
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});

// Simulate passing a step function as an argument within a workflow
// When calling a step from within a workflow context
const args = [stepFn, 42];

// This should serialize the step function by its name using the reducer
const dehydrated = dehydrateStepArguments(args, globalThis);

// Verify it dehydrated successfully
expect(dehydrated).toBeDefined();
expect(Array.isArray(dehydrated)).toBe(true);
// The dehydrated structure is the flattened format from devalue
// It should contain the step function serialized as its name
expect(dehydrated).toContain(stepName);
expect(dehydrated).toContain(42);
});

it('should serialize step function to name through reducer', () => {
const stepName = 'step//workflows/test.ts//anotherStep';
const stepFn = async () => 'result';

// Attach the symbol to the function (like the SWC compiler would)
Object.defineProperty(stepFn, STEP_FUNCTION_NAME_SYMBOL, {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});

// Get the reducer and verify it detects the step function
const reducer = getWorkflowReducers(globalThis).StepFunction;
const result = reducer(stepFn);

// Should return the step name
expect(result).toBe(stepName);
});
});
19 changes: 18 additions & 1 deletion packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { WorkflowRuntimeError } from '@workflow/errors';
import * as devalue from 'devalue';
import { getStepFunction } from './private.js';
import { getWorld } from './runtime/world.js';
import {
BODY_INIT_SYMBOL,
STEP_FUNCTION_NAME_SYMBOL,
STREAM_NAME_SYMBOL,
STREAM_TYPE_SYMBOL,
WEBHOOK_RESPONSE_WRITABLE,
Expand Down Expand Up @@ -169,6 +171,7 @@ export interface SerializableSpecial {
redirected: boolean;
};
Set: any[];
StepFunction: string; // step function name/ID
URL: string;
URLSearchParams: string;
Uint8Array: string; // base64 string
Expand Down Expand Up @@ -275,6 +278,11 @@ function getCommonReducers(global: Record<string, any> = globalThis) {
};
},
Set: (value) => value instanceof global.Set && Array.from(value),
StepFunction: (value) => {
if (typeof value !== 'function') return false;
const stepName = value[STEP_FUNCTION_NAME_SYMBOL];
return typeof stepName === 'string' ? stepName : false;
},
URL: (value) => value instanceof global.URL && value.href,
URLSearchParams: (value) => {
if (!(value instanceof global.URLSearchParams)) return false;
Expand Down Expand Up @@ -464,7 +472,7 @@ function getStepReducers(
};
}

function getCommonRevivers(global: Record<string, any> = globalThis) {
export function getCommonRevivers(global: Record<string, any> = globalThis) {
function reviveArrayBuffer(value: string) {
// Handle sentinel value for zero-length buffers
const base64 = value === '.' ? '' : value;
Expand Down Expand Up @@ -516,6 +524,15 @@ function getCommonRevivers(global: Record<string, any> = globalThis) {
Map: (value) => new global.Map(value),
RegExp: (value) => new global.RegExp(value.source, value.flags),
Set: (value) => new global.Set(value),
StepFunction: (value) => {
const stepFn = getStepFunction(value);
if (!stepFn) {
throw new Error(
`Step function "${value}" not found. Make sure the step function is registered.`
);
}
return stepFn;
},
URL: (value) => new global.URL(value),
URLSearchParams: (value) =>
new global.URLSearchParams(value === '.' ? '' : value),
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ export const BODY_INIT_SYMBOL = Symbol.for('BODY_INIT');
export const WEBHOOK_RESPONSE_WRITABLE = Symbol.for(
'WEBHOOK_RESPONSE_WRITABLE'
);
export const STEP_FUNCTION_NAME_SYMBOL = Symbol.for(
'WORKFLOW_STEP_FUNCTION_NAME'
);
3 changes: 2 additions & 1 deletion packages/nuxt/src/module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { defineNuxtModule } from '@nuxt/kit';
import type { NuxtModule } from '@nuxt/schema';
import type { ModuleOptions as NitroModuleOptions } from '@workflow/nitro';

// Module options TypeScript interface definition
Expand Down Expand Up @@ -30,4 +31,4 @@ export default defineNuxtModule<ModuleOptions>({
nuxt.options.nitro.modules.push('@workflow/nitro');
}
},
});
}) satisfies NuxtModule<ModuleOptions>;
8 changes: 6 additions & 2 deletions packages/nuxt/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"extends": "./.nuxt/tsconfig.json",
"exclude": ["dist", "node_modules"]
"extends": "@workflow/tsconfig/base.json",
"exclude": ["dist", "node_modules"],
"compilerOptions": {
"declaration": false,
"declarationMap": false
}
}
Loading
Loading