Skip to content
Draft
5 changes: 5 additions & 0 deletions .changeset/young-crabs-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Fix premature workflow suspension while replay is still propagating hydrated step results across the VM boundary.
40 changes: 40 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2312,4 +2312,44 @@ describe('e2e', () => {
expect(returnValue).toBe(133);
}
);

/**
* Regression test for the scheduleWhenIdle premature-suspension bug.
*
* Stresses replay with setup steps followed by a moderately wide outer Promise.all
* where each item advances through multiple sequential waves. A few
* phase-one steps lag 2-3s behind the rest of the batch, while fast items
* continue through later waves.
*
* Expected (after fix): status === 'completed', completed === 12.
* Before fix: run can fail with WorkflowRuntimeError "Unconsumed event in
* event log" for one of the next-wave steps because
* scheduleWhenIdle fires WorkflowSuspension in the gap between fast
* hydrations completing and the next useStep callback registering.
*
* PR with full context: https://github.com/vercel/workflow/pull/1961/changes#top
*/
test.skipIf(
process.env.APP_NAME !== 'nextjs-turbopack' ||
!process.env.WORKFLOW_VERCEL_ENV
)(
'scheduleWhenIdle - concurrent multi-wave workflow completes without unconsumed event error',
{ timeout: 600_000 },
async () => {
const run = await start(
await getWorkflowMetadata(
deploymentUrl,
'workflows/96_many_steps.ts',
'concurrentMultiWaveWorkflow'
),
[]
);

const returnValue = await run.returnValue;
expect(returnValue).toEqual({ totalItems: 12, completed: 12 });

const { json } = await cliInspectJson(`runs ${run.runId}`);
expect(json.status).toBe('completed');
}
);
});
22 changes: 18 additions & 4 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { EventsConsumer } from './events-consumer.js';
import type { WorkflowOrchestratorContext } from './private.js';
import { isVmIdle, type WorkflowOrchestratorContext } from './private.js';
import { dehydrateStepReturnValue } from './serialization.js';
import { createUseStep } from './step.js';
import { createContext } from './vm/index.js';
Expand All @@ -28,23 +28,37 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
});
const ulid = monotonicFactory(() => context.globalThis.Math.random());
const workflowStartedAt = context.globalThis.Date.now();
return {
const promiseQueueHolder = { current: Promise.resolve() };
const ctx: WorkflowOrchestratorContext = {
runId: 'wrun_test',
encryptionKey: undefined,
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events, {
onUnconsumedEvent: () => {},
getPromiseQueue: () => Promise.resolve(),
getPromiseQueue: () => promiseQueueHolder.current,
isVmIdle: () => isVmIdle(ctx),
onceVmIdle: (callback) => {
ctx.vmIdleObservers.add(callback);
return () => ctx.vmIdleObservers.delete(callback);
},
}),
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
),
onWorkflowError: vi.fn(),
promiseQueue: Promise.resolve(),
get promiseQueue() {
return promiseQueueHolder.current;
},
set promiseQueue(value: Promise<void>) {
promiseQueueHolder.current = value;
},
pendingDeliveries: 0,
pendingVmWork: 0,
vmIdleObservers: new Set<() => void>(),
};
return ctx;
}

describe('async deserialization ordering', () => {
Expand Down
Loading
Loading