From 8f34ee7115885b9ba1b459ce9930df7ca6581aac Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 18 May 2026 14:34:58 -0700 Subject: [PATCH 1/3] [core] Exclude inline step execution from replay timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v5 combined workflow+step handler wraps inline step bodies in the same setTimeout(..., REPLAY_TIMEOUT_MS) guard that previously only bounded the v4 'workflows' function's fast deterministic replay. As a result, any workflow with a single step exceeding 240s hard-fails with FatalError: Workflow replay exceeded maximum duration (240s) after 4 attempts — even though the step could legitimately run for the full function maxDuration (up to 800s on Pro Fluid). Replace the setTimeout guard with a per-invocation budget that only accumulates non-step time. pauseReplayBudget() / resumeReplayBudget() bracket each executeStep() call, and the loop checks the budget at iteration boundaries. The retry-then-fail semantics from #1567 are preserved for the pure-replay case, with one exception: this patch drops the `process.env.VERCEL_URL !== undefined` gate that used to wrap the guard, so the process.exit(1) path now also runs outside Vercel (PATCH 2/3 restores the gate). Also adds a WORKFLOW_REPLAY_TIMEOUT_MS env var override (clamped to 30s..780s) so operators can adjust the bare-replay ceiling without patching @workflow/core. Fixes #2009. 
--- .../replay-timeout-excludes-step-bodies.md | 5 + packages/core/src/describe-error.test.ts | 6 +- packages/core/src/describe-error.ts | 2 +- packages/core/src/runtime.ts | 254 +++++++++++------- packages/core/src/runtime/constants.test.ts | 82 ++++++ packages/core/src/runtime/constants.ts | 55 +++- 6 files changed, 301 insertions(+), 103 deletions(-) create mode 100644 .changeset/replay-timeout-excludes-step-bodies.md create mode 100644 packages/core/src/runtime/constants.test.ts diff --git a/.changeset/replay-timeout-excludes-step-bodies.md b/.changeset/replay-timeout-excludes-step-bodies.md new file mode 100644 index 0000000000..21f2ed3e21 --- /dev/null +++ b/.changeset/replay-timeout-excludes-step-bodies.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Stop counting inline step execution time toward the workflow replay timeout. The v5 combined workflow+step handler was wrapping inline step bodies in the 240s `REPLAY_TIMEOUT_MS` guard, causing legitimately long steps (e.g. model inference, long sleeps, long external API calls) to fail with `REPLAY_TIMEOUT` after 4 attempts. The replay budget now only covers replay/workflow-VM time between step boundaries, restoring v4 long-step semantics. Adds a `WORKFLOW_REPLAY_TIMEOUT_MS` env var override (clamped to 30s–780s). 
diff --git a/packages/core/src/describe-error.test.ts b/packages/core/src/describe-error.test.ts index 252566246c..6431a32a68 100644 --- a/packages/core/src/describe-error.test.ts +++ b/packages/core/src/describe-error.test.ts @@ -73,7 +73,7 @@ describe('describeError', () => { const result = describeError(undefined, RUN_ERROR_CODES.REPLAY_TIMEOUT); expect(result.attribution).toBe('sdk'); expect(result.errorCode).toBe(RUN_ERROR_CODES.REPLAY_TIMEOUT); - expect(result.hint).toContain('replay took too long'); + expect(result.hint).toContain('replay between step boundaries'); }); test('MAX_DELIVERIES_EXCEEDED via precomputed errorCode is attributed to the SDK', () => { @@ -141,7 +141,7 @@ describe('describeRunError', () => { errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, }); expect(result.attribution).toBe('sdk'); - expect(result.hint).toContain('replay took too long'); + expect(result.hint).toContain('replay between step boundaries'); }); test('MAX_DELIVERIES_EXCEEDED errorCode is attributed to the SDK', () => { @@ -233,7 +233,7 @@ describe('describeError — payload shape snapshots', () => { { "attribution": "sdk", "errorCode": "REPLAY_TIMEOUT", - "hint": "The workflow replay took too long. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work between step boundaries.", + "hint": "The workflow replay between step boundaries took too long. This bounds workflow-VM and event-log replay time only — step bodies (\`"use step"\` functions) are excluded. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work in workflow code outside of step bodies. 
Override the default budget via the WORKFLOW_REPLAY_TIMEOUT_MS env var if needed.", } `); }); diff --git a/packages/core/src/describe-error.ts b/packages/core/src/describe-error.ts index 420419a1ee..b045e73278 100644 --- a/packages/core/src/describe-error.ts +++ b/packages/core/src/describe-error.ts @@ -77,7 +77,7 @@ const CONTEXT_ERROR_HINT = const RUNTIME_ERROR_HINT = 'This is an internal workflow SDK error, not a bug in your code. If it keeps happening, please report it with the stack trace and the runId.'; const REPLAY_TIMEOUT_HINT = - 'The workflow replay took too long. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work between step boundaries.'; + 'The workflow replay between step boundaries took too long. This bounds workflow-VM and event-log replay time only — step bodies (`"use step"` functions) are excluded. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work in workflow code outside of step bodies. Override the default budget via the WORKFLOW_REPLAY_TIMEOUT_MS env var if needed.'; const MAX_DELIVERIES_HINT = 'The workflow queue exceeded its max-delivery budget. 
This usually indicates a persistent runtime failure — check the most recent stack traces for the underlying cause.'; diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index cae3cc5dae..57a83fa7dc 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -18,9 +18,9 @@ import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; import { + getReplayTimeoutMs, MAX_QUEUE_DELIVERIES, REPLAY_TIMEOUT_MAX_RETRIES, - REPLAY_TIMEOUT_MS, } from './runtime/constants.js'; import { getQueueOverhead, @@ -214,84 +214,123 @@ export function workflowEntrypoint( const spanLinks = await linkToCurrentContext(); - // --- Replay timeout guard --- - // If the replay takes longer than the timeout, fail the run and exit. - // This must be lower than the function's maxDuration to ensure - // the failure is recorded before the platform kills the function. - let replayTimeout: NodeJS.Timeout | undefined; - if (process.env.VERCEL_URL !== undefined) { - replayTimeout = setTimeout(async () => { - // Allow a few retries before permanently failing the run. - // On early attempts, just exit so the queue retries the message. - if (metadata.attempt <= REPLAY_TIMEOUT_MAX_RETRIES) { - runLogger.warn( - 'Workflow replay exceeded timeout but will be re-attempted (attempt < maxRetries)', - { - timeoutMs: REPLAY_TIMEOUT_MS, - attempt: metadata.attempt, - maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, - } - ); - process.exit(1); - } + // --- Replay budget bookkeeping --- + // The replay budget bounds the *non-step* portion of a single + // handler invocation: deterministic event-log replay, workflow-VM + // execution between step boundaries, suspension handling, queue + // round-trips, etc. 
Inline step bodies (`"use step"` functions + // invoked via `executeStep`) are intentionally excluded — they are + // bounded by the platform's function `maxDuration` and the + // `NO_INLINE_REPLAY_AFTER_MS` early-return guard below. + // + // We track elapsed time by recording when each non-step interval + // starts; before any `executeStep(...)` call we pause the budget + // (adding the elapsed delta to `replayElapsedMs`) and resume after. + // The budget is checked at loop boundaries — if exceeded, we hit + // the retry-then-fail path that was previously driven by a + // `setTimeout` wrapping the whole handler. + // + // Earlier versions (pre-#2009 fix) used a single `setTimeout` that + // also bounded step bodies, which broke any workflow with a single + // step longer than the budget. See packages/core/src/runtime/constants.ts + // for the env var override. + const replayTimeoutMs = getReplayTimeoutMs(); + let replayElapsedMs = 0; + let nonStepStart = Date.now(); - const replayTimeoutDescription = describeError( - undefined, - RUN_ERROR_CODES.REPLAY_TIMEOUT - ); - runLogger.error( - 'Workflow replay exceeded timeout and max retries exceeded. Failing the run', + // Accumulate the elapsed delta since the last checkpoint (handler + // entry or the most recent `resumeReplayBudget`) and reset the + // start marker. Call this immediately before any inline step body. + const pauseReplayBudget = (): void => { + replayElapsedMs += Date.now() - nonStepStart; + }; + + // Reset the non-step interval start marker. Call this immediately + // after a step body returns, before any further non-step work. + const resumeReplayBudget = (): void => { + nonStepStart = Date.now(); + }; + + // Returns true if the replay budget is exhausted. The caller is + // expected to call `handleReplayBudgetExhausted()` afterward and + // return from the handler. 
+ const isReplayBudgetExhausted = (): boolean => { + return Date.now() - nonStepStart + replayElapsedMs >= replayTimeoutMs; + }; + + // Fail the run (or retry, on early attempts) when the replay + // budget is exhausted. Matches the semantics of the old setTimeout + // guard: on attempts <= REPLAY_TIMEOUT_MAX_RETRIES it `process.exit(1)`s + // so the queue redelivers; on the next attempt it writes + // `run_failed` with `RUN_ERROR_CODES.REPLAY_TIMEOUT` then exits. + const handleReplayBudgetExhausted = async (): Promise => { + if (metadata.attempt <= REPLAY_TIMEOUT_MAX_RETRIES) { + runLogger.warn( + 'Workflow replay exceeded timeout but will be re-attempted (attempt < maxRetries)', { - timeoutMs: REPLAY_TIMEOUT_MS, + timeoutMs: replayTimeoutMs, attempt: metadata.attempt, maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, - errorCode: replayTimeoutDescription.errorCode, - errorAttribution: replayTimeoutDescription.attribution, } ); + process.exit(1); + } - try { - const world = await getWorld(); - const getEncryptionKey = memoizeEncryptionKey(world, runId); - const timeoutErr = new FatalError( - `Workflow replay exceeded maximum duration (${REPLAY_TIMEOUT_MS / 1000}s) after ${metadata.attempt} attempts` - ); - await world.events.create( - runId, - { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: await dehydrateRunError( - timeoutErr, - runId, - await getEncryptionKey() - ), - errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, - }, - }, - { requestId } - ); - } catch (err) { - // Best effort — process exits regardless. Surface why so - // operators can diagnose repeat timeouts against the backend. - runLogger.warn( - 'Unable to mark run as failed. The queue will continue to retry', - { - attempt: metadata.attempt, - errorName: err instanceof Error ? err.name : 'UnknownError', - errorMessage: - err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? 
err.stack : undefined, - } - ); + const replayTimeoutDescription = describeError( + undefined, + RUN_ERROR_CODES.REPLAY_TIMEOUT + ); + runLogger.error( + 'Workflow replay exceeded timeout and max retries exceeded. Failing the run', + { + timeoutMs: replayTimeoutMs, + attempt: metadata.attempt, + maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, + errorCode: replayTimeoutDescription.errorCode, + errorAttribution: replayTimeoutDescription.attribution, } - // Note that this also prevents the runtime from acking the queue message, - // so the queue will call back once, after which a 410 will get it to exit early. - process.exit(1); - }, REPLAY_TIMEOUT_MS); - replayTimeout.unref(); - } + ); + + try { + const world = await getWorld(); + const getEncryptionKey = memoizeEncryptionKey(world, runId); + const timeoutErr = new FatalError( + `Workflow replay exceeded maximum duration (${replayTimeoutMs / 1000}s) after ${metadata.attempt} attempts` + ); + await world.events.create( + runId, + { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: await dehydrateRunError( + timeoutErr, + runId, + await getEncryptionKey() + ), + errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, + }, + }, + { requestId } + ); + } catch (err) { + // Best effort — process exits regardless. Surface why so + // operators can diagnose repeat timeouts against the backend. + runLogger.warn( + 'Unable to mark run as failed. The queue will continue to retry', + { + attempt: metadata.attempt, + errorName: err instanceof Error ? err.name : 'UnknownError', + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + } + ); + } + // Note that this also prevents the runtime from acking the queue + // message, so the queue will call back once, after which a 410 + // will get it to exit early. 
+ process.exit(1); + }; return await withTraceContext(traceContext, async () => { return await withWorkflowBaggage( @@ -346,14 +385,24 @@ export function workflowEntrypoint( const bgStartedAt = bgRun.startedAt ? +bgRun.startedAt : Date.now(); - const stepResult = await executeStep({ - world, - workflowRunId: runId, - workflowName, - workflowStartedAt: bgStartedAt, - stepId: incomingStepId, - stepName: incomingStepName, - }); + // Pause the replay budget while the step body runs — + // step duration is bounded by the platform's function + // maxDuration, not by the replay timeout. See the + // budget bookkeeping helpers defined above. + pauseReplayBudget(); + let stepResult: Awaited>; + try { + stepResult = await executeStep({ + world, + workflowRunId: runId, + workflowName, + workflowStartedAt: bgStartedAt, + stepId: incomingStepId, + stepName: incomingStepName, + }); + } finally { + resumeReplayBudget(); + } if (stepResult.type === 'retry') { return { timeoutSeconds: stepResult.timeoutSeconds }; } @@ -593,6 +642,19 @@ export function workflowEntrypoint( while (true) { loopIteration++; + // Replay-budget check: bail out (retry or fail) if + // non-step time within this invocation has exceeded + // the configured budget. Step bodies are excluded + // because pauseReplayBudget/resumeReplayBudget bracket + // every `executeStep` call. + if (isReplayBudgetExhausted()) { + await handleReplayBudgetExhausted(); + // handleReplayBudgetExhausted always exits the + // process, but return for type-safety and to make + // the control flow explicit. + return; + } + // Check timeout before replay if ( Date.now() - invocationStartTime >= @@ -950,15 +1012,27 @@ export function workflowEntrypoint( return; } - // Execute inline step - const stepResult = await executeStep({ - world, - workflowRunId: runId, - workflowName, - workflowStartedAt, - stepId: inlineStep.correlationId, - stepName: inlineStep.stepName, - }); + // Execute inline step. 
Pause the replay budget + // for the duration of the step body — step duration + // is bounded by the platform's function maxDuration, + // not by the replay timeout. Without this the + // replay-budget check at the top of the next loop + // iteration would (incorrectly) charge the step + // body against the budget. + pauseReplayBudget(); + let stepResult: Awaited>; + try { + stepResult = await executeStep({ + world, + workflowRunId: runId, + workflowName, + workflowStartedAt, + stepId: inlineStep.correlationId, + stepName: inlineStep.stepName, + }); + } finally { + resumeReplayBudget(); + } if (stepResult.type === 'retry') { // Step needs retry — queue self with stepId for retry @@ -1136,10 +1210,6 @@ export function workflowEntrypoint( ); // End trace } ); // End withWorkflowBaggage - }).finally(() => { - if (replayTimeout) { - clearTimeout(replayTimeout); - } }); // End withTraceContext } ); diff --git a/packages/core/src/runtime/constants.test.ts b/packages/core/src/runtime/constants.test.ts new file mode 100644 index 0000000000..d99ab4d23f --- /dev/null +++ b/packages/core/src/runtime/constants.test.ts @@ -0,0 +1,82 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + getReplayTimeoutMs, + MAX_REPLAY_TIMEOUT_MS, + MIN_REPLAY_TIMEOUT_MS, + REPLAY_TIMEOUT_MS, +} from './constants.js'; + +describe('getReplayTimeoutMs', () => { + const originalEnv = process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + + beforeEach(() => { + delete process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + }); + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + } else { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = originalEnv; + } + }); + + it('returns the default when the env var is unset', () => { + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); + + it('returns the default when the env var is empty', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = ''; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); 
+ }); + + it('returns the default when the env var is non-numeric', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'not-a-number'; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); + + it('returns the default when the env var is zero', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '0'; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); + + it('returns the default when the env var is negative', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '-100'; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); + + it('clamps to MIN_REPLAY_TIMEOUT_MS when the env var is below the floor', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '5000'; + expect(getReplayTimeoutMs()).toBe(MIN_REPLAY_TIMEOUT_MS); + }); + + it('clamps to MAX_REPLAY_TIMEOUT_MS when the env var is above the ceiling', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '9999999'; + expect(getReplayTimeoutMs()).toBe(MAX_REPLAY_TIMEOUT_MS); + }); + + it('honors an in-range override', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '600000'; + expect(getReplayTimeoutMs()).toBe(600_000); + }); + + it('accepts the lower-bound value exactly', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = String(MIN_REPLAY_TIMEOUT_MS); + expect(getReplayTimeoutMs()).toBe(MIN_REPLAY_TIMEOUT_MS); + }); + + it('accepts the upper-bound value exactly', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = String(MAX_REPLAY_TIMEOUT_MS); + expect(getReplayTimeoutMs()).toBe(MAX_REPLAY_TIMEOUT_MS); + }); + + it('rejects Infinity and falls back to the default', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'Infinity'; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); + + it('rejects NaN and falls back to the default', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'NaN'; + expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + }); +}); diff --git a/packages/core/src/runtime/constants.ts b/packages/core/src/runtime/constants.ts index 59d8a20879..e2576e195f 100644 --- 
a/packages/core/src/runtime/constants.ts +++ b/packages/core/src/runtime/constants.ts @@ -12,15 +12,56 @@ // safely under the 24-hour message visibility limit. export const MAX_QUEUE_DELIVERIES = 48; -// Maximum time allowed for a single workflow replay execution (in ms). -// If a replay exceeds this duration, the process exits so the queue can retry. -// This must be lower than the function's maxDuration to ensure the -// timeout handler has time to post the run_failed event before the platform -// kills the function. -// Note that on hobby plan, the maxDuration is 60s, so this barrier will not be hit, -// and the queue will re-try until the visibility window expires. +/** + * Default maximum time allowed for the *replay* portion of a single workflow + * handler invocation (in ms). This budget only covers deterministic-replay + * and workflow-VM execution between step boundaries — inline step bodies + * (`"use step"` functions invoked via `executeStep`) do NOT count against + * it. Step bodies are bounded separately by the platform's function + * `maxDuration` (e.g. 800s on Vercel Pro Fluid) and `NO_INLINE_REPLAY_AFTER_MS`. + * + * If the non-step ("replay") time within a single invocation exceeds this + * budget, the handler exits so the queue can retry. After + * `REPLAY_TIMEOUT_MAX_RETRIES` exhausted attempts the run is failed with + * `RUN_ERROR_CODES.REPLAY_TIMEOUT`. + * + * Note that on Vercel Hobby, the platform `maxDuration` is 300s, so this + * budget will not be hit unless overridden lower; the queue will re-try + * until the visibility window expires. + * + * Override via the `WORKFLOW_REPLAY_TIMEOUT_MS` env var (clamped to + * `MIN_REPLAY_TIMEOUT_MS`..`MAX_REPLAY_TIMEOUT_MS`). + */ export const REPLAY_TIMEOUT_MS = 240_000; +/** Lower bound for the replay-timeout env var override. */ +export const MIN_REPLAY_TIMEOUT_MS = 30_000; + +/** + * Upper bound for the replay-timeout env var override. 
780s leaves ≥20s of + * headroom under Vercel Pro Fluid's 800s function ceiling so the handler + * can write `run_failed` before SIGTERM. + */ +export const MAX_REPLAY_TIMEOUT_MS = 780_000; + +/** + * Resolve the effective replay-timeout budget for the current process. + * + * Reads `process.env.WORKFLOW_REPLAY_TIMEOUT_MS` lazily so tests and + * deployments can override per invocation. Invalid / out-of-range values + * fall back to the default (no throw — the env var is an escape hatch, not + * a hard requirement). + */ +export function getReplayTimeoutMs(): number { + const raw = process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + if (!raw) return REPLAY_TIMEOUT_MS; + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed <= 0) return REPLAY_TIMEOUT_MS; + if (parsed < MIN_REPLAY_TIMEOUT_MS) return MIN_REPLAY_TIMEOUT_MS; + if (parsed > MAX_REPLAY_TIMEOUT_MS) return MAX_REPLAY_TIMEOUT_MS; + return parsed; +} + // Number of queue delivery attempts to allow before permanently failing a run // due to a replay timeout. On attempts 1 through this value, the timeout // handler exits without writing run_failed so the queue retries the message. From ece3bc5fc688fdaea617b19a7c6959c964f30dd5 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 18 May 2026 15:37:03 -0700 Subject: [PATCH 2/3] Address PR review feedback - Extract budget bookkeeping into ReplayBudget class (replay-budget.ts) with sentinel-protected idempotent pause()/resume() to avoid double-counting in future refactors that nest step execution - Restore VERCEL_URL gate around process.exit(1) so a long pure-replay in local dev/non-Vercel runtimes can't hard-kill the host process - Warn (once per distinct raw value) when WORKFLOW_REPLAY_TIMEOUT_MS is clamped or rejected, so misconfiguration is observable - Correct Hobby maxDuration comment (60s standard / 300s Fluid) - Document budget-check responsiveness trade-off vs. 
old setTimeout - Tighten describe-error test assertions to match the full new hint - Shorten changeset description - Add ReplayBudget unit tests (9) including 8-minute step regression - Add warn-once tests for getReplayTimeoutMs (extended) --- .../replay-timeout-excludes-step-bodies.md | 2 +- packages/core/src/describe-error.test.ts | 8 +- packages/core/src/runtime.ts | 175 ++++--------- packages/core/src/runtime/constants.test.ts | 47 +++- packages/core/src/runtime/constants.ts | 66 ++++- .../core/src/runtime/replay-budget.test.ts | 132 ++++++++++ packages/core/src/runtime/replay-budget.ts | 237 ++++++++++++++++++ 7 files changed, 517 insertions(+), 150 deletions(-) create mode 100644 packages/core/src/runtime/replay-budget.test.ts create mode 100644 packages/core/src/runtime/replay-budget.ts diff --git a/.changeset/replay-timeout-excludes-step-bodies.md b/.changeset/replay-timeout-excludes-step-bodies.md index 21f2ed3e21..7a439742e3 100644 --- a/.changeset/replay-timeout-excludes-step-bodies.md +++ b/.changeset/replay-timeout-excludes-step-bodies.md @@ -2,4 +2,4 @@ "@workflow/core": patch --- -Stop counting inline step execution time toward the workflow replay timeout. The v5 combined workflow+step handler was wrapping inline step bodies in the 240s `REPLAY_TIMEOUT_MS` guard, causing legitimately long steps (e.g. model inference, long sleeps, long external API calls) to fail with `REPLAY_TIMEOUT` after 4 attempts. The replay budget now only covers replay/workflow-VM time between step boundaries, restoring v4 long-step semantics. Adds a `WORKFLOW_REPLAY_TIMEOUT_MS` env var override (clamped to 30s–780s). +Exclude inline step execution from the workflow replay timeout. Long-running steps no longer hit `REPLAY_TIMEOUT` (fixes #2009). Adds `WORKFLOW_REPLAY_TIMEOUT_MS` env var override. 
diff --git a/packages/core/src/describe-error.test.ts b/packages/core/src/describe-error.test.ts index 6431a32a68..89f4025400 100644 --- a/packages/core/src/describe-error.test.ts +++ b/packages/core/src/describe-error.test.ts @@ -73,7 +73,9 @@ describe('describeError', () => { const result = describeError(undefined, RUN_ERROR_CODES.REPLAY_TIMEOUT); expect(result.attribution).toBe('sdk'); expect(result.errorCode).toBe(RUN_ERROR_CODES.REPLAY_TIMEOUT); - expect(result.hint).toContain('replay between step boundaries'); + expect(result.hint).toContain( + 'replay between step boundaries took too long' + ); }); test('MAX_DELIVERIES_EXCEEDED via precomputed errorCode is attributed to the SDK', () => { @@ -141,7 +143,9 @@ describe('describeRunError', () => { errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, }); expect(result.attribution).toBe('sdk'); - expect(result.hint).toContain('replay between step boundaries'); + expect(result.hint).toContain( + 'replay between step boundaries took too long' + ); }); test('MAX_DELIVERIES_EXCEEDED errorCode is attributed to the SDK', () => { diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 57a83fa7dc..6e146f0c71 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -17,11 +17,7 @@ import { classifyRunError } from './classify-error.js'; import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; -import { - getReplayTimeoutMs, - MAX_QUEUE_DELIVERIES, - REPLAY_TIMEOUT_MAX_RETRIES, -} from './runtime/constants.js'; +import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { getQueueOverhead, getWorkflowQueueName, @@ -32,6 +28,10 @@ import { queueMessage, withHealthCheck, } from './runtime/helpers.js'; +import { + handleReplayBudgetExhausted, + ReplayBudget, +} from './runtime/replay-budget.js'; import { executeStep } from './runtime/step-executor.js'; import { handleSuspension } from 
'./runtime/suspension-handler.js'; import { @@ -223,114 +223,22 @@ export function workflowEntrypoint( // bounded by the platform's function `maxDuration` and the // `NO_INLINE_REPLAY_AFTER_MS` early-return guard below. // - // We track elapsed time by recording when each non-step interval - // starts; before any `executeStep(...)` call we pause the budget - // (adding the elapsed delta to `replayElapsedMs`) and resume after. - // The budget is checked at loop boundaries — if exceeded, we hit - // the retry-then-fail path that was previously driven by a - // `setTimeout` wrapping the whole handler. + // The budget is checked at loop boundaries (top of each `while` + // iteration). Note this is *less responsive* than the old + // `setTimeout`-based approach: a single pathological `runWorkflow` + // call processing a huge event log can overshoot the budget by up + // to one iteration before bailing. In practice the headroom built + // into `MAX_REPLAY_TIMEOUT_MS` (and the platform `maxDuration` + // SIGTERM as ultimate backstop) gives us slack — the previous + // `setTimeout` approach also relied on the platform kill as the + // hard backstop. Do *not* "fix" this by adding a `setInterval`; + // it would risk the same bug we just removed (bounding step + // bodies). // - // Earlier versions (pre-#2009 fix) used a single `setTimeout` that - // also bounded step bodies, which broke any workflow with a single - // step longer than the budget. See packages/core/src/runtime/constants.ts - // for the env var override. - const replayTimeoutMs = getReplayTimeoutMs(); - let replayElapsedMs = 0; - let nonStepStart = Date.now(); - - // Accumulate the elapsed delta since the last checkpoint (handler - // entry or the most recent `resumeReplayBudget`) and reset the - // start marker. Call this immediately before any inline step body. - const pauseReplayBudget = (): void => { - replayElapsedMs += Date.now() - nonStepStart; - }; - - // Reset the non-step interval start marker. 
Call this immediately - // after a step body returns, before any further non-step work. - const resumeReplayBudget = (): void => { - nonStepStart = Date.now(); - }; - - // Returns true if the replay budget is exhausted. The caller is - // expected to call `handleReplayBudgetExhausted()` afterward and - // return from the handler. - const isReplayBudgetExhausted = (): boolean => { - return Date.now() - nonStepStart + replayElapsedMs >= replayTimeoutMs; - }; - - // Fail the run (or retry, on early attempts) when the replay - // budget is exhausted. Matches the semantics of the old setTimeout - // guard: on attempts <= REPLAY_TIMEOUT_MAX_RETRIES it `process.exit(1)`s - // so the queue redelivers; on the next attempt it writes - // `run_failed` with `RUN_ERROR_CODES.REPLAY_TIMEOUT` then exits. - const handleReplayBudgetExhausted = async (): Promise => { - if (metadata.attempt <= REPLAY_TIMEOUT_MAX_RETRIES) { - runLogger.warn( - 'Workflow replay exceeded timeout but will be re-attempted (attempt < maxRetries)', - { - timeoutMs: replayTimeoutMs, - attempt: metadata.attempt, - maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, - } - ); - process.exit(1); - } - - const replayTimeoutDescription = describeError( - undefined, - RUN_ERROR_CODES.REPLAY_TIMEOUT - ); - runLogger.error( - 'Workflow replay exceeded timeout and max retries exceeded. 
Failing the run', - { - timeoutMs: replayTimeoutMs, - attempt: metadata.attempt, - maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, - errorCode: replayTimeoutDescription.errorCode, - errorAttribution: replayTimeoutDescription.attribution, - } - ); - - try { - const world = await getWorld(); - const getEncryptionKey = memoizeEncryptionKey(world, runId); - const timeoutErr = new FatalError( - `Workflow replay exceeded maximum duration (${replayTimeoutMs / 1000}s) after ${metadata.attempt} attempts` - ); - await world.events.create( - runId, - { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: await dehydrateRunError( - timeoutErr, - runId, - await getEncryptionKey() - ), - errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, - }, - }, - { requestId } - ); - } catch (err) { - // Best effort — process exits regardless. Surface why so - // operators can diagnose repeat timeouts against the backend. - runLogger.warn( - 'Unable to mark run as failed. The queue will continue to retry', - { - attempt: metadata.attempt, - errorName: err instanceof Error ? err.name : 'UnknownError', - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - } - ); - } - // Note that this also prevents the runtime from acking the queue - // message, so the queue will call back once, after which a 410 - // will get it to exit early. - process.exit(1); - }; + // Earlier versions (pre-#2009 fix) used a single `setTimeout` + // that also bounded step bodies, which broke any workflow with a + // single step longer than the budget. + const replayBudget = new ReplayBudget(); return await withTraceContext(traceContext, async () => { return await withWorkflowBaggage( @@ -388,8 +296,8 @@ export function workflowEntrypoint( // Pause the replay budget while the step body runs — // step duration is bounded by the platform's function // maxDuration, not by the replay timeout. 
See the - // budget bookkeeping helpers defined above. - pauseReplayBudget(); + // ReplayBudget docs for the contract. + replayBudget.pause(); let stepResult: Awaited>; try { stepResult = await executeStep({ @@ -401,7 +309,7 @@ export function workflowEntrypoint( stepName: incomingStepName, }); } finally { - resumeReplayBudget(); + replayBudget.resume(); } if (stepResult.type === 'retry') { return { timeoutSeconds: stepResult.timeoutSeconds }; @@ -645,13 +553,20 @@ export function workflowEntrypoint( // Replay-budget check: bail out (retry or fail) if // non-step time within this invocation has exceeded // the configured budget. Step bodies are excluded - // because pauseReplayBudget/resumeReplayBudget bracket - // every `executeStep` call. - if (isReplayBudgetExhausted()) { - await handleReplayBudgetExhausted(); - // handleReplayBudgetExhausted always exits the - // process, but return for type-safety and to make - // the control flow explicit. + // because replayBudget.pause()/resume() bracket every + // `executeStep` call. + if (replayBudget.isExhausted()) { + await handleReplayBudgetExhausted({ + runId, + workflowName, + requestId, + attempt: metadata.attempt, + limitMs: replayBudget.configuredLimitMs, + }); + // On Vercel, handleReplayBudgetExhausted always + // exits the process. On local dev it returns; we + // fall through and the request ends normally + // (run_failed has been written best-effort). return; } @@ -1013,13 +928,13 @@ export function workflowEntrypoint( } // Execute inline step. Pause the replay budget - // for the duration of the step body — step duration - // is bounded by the platform's function maxDuration, - // not by the replay timeout. Without this the - // replay-budget check at the top of the next loop - // iteration would (incorrectly) charge the step - // body against the budget. 
- pauseReplayBudget(); + // for the duration of the step body — step + // duration is bounded by the platform's function + // maxDuration, not by the replay timeout. Without + // this the replay-budget check at the top of the + // next loop iteration would (incorrectly) charge + // the step body against the budget. + replayBudget.pause(); let stepResult: Awaited>; try { stepResult = await executeStep({ @@ -1031,7 +946,7 @@ export function workflowEntrypoint( stepName: inlineStep.stepName, }); } finally { - resumeReplayBudget(); + replayBudget.resume(); } if (stepResult.type === 'retry') { diff --git a/packages/core/src/runtime/constants.test.ts b/packages/core/src/runtime/constants.test.ts index d99ab4d23f..fe71939262 100644 --- a/packages/core/src/runtime/constants.test.ts +++ b/packages/core/src/runtime/constants.test.ts @@ -1,5 +1,7 @@ -import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { runtimeLogger } from '../logger.js'; import { + _resetReplayTimeoutWarnCacheForTests, getReplayTimeoutMs, MAX_REPLAY_TIMEOUT_MS, MIN_REPLAY_TIMEOUT_MS, @@ -8,9 +10,12 @@ import { describe('getReplayTimeoutMs', () => { const originalEnv = process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + let warnSpy: ReturnType; beforeEach(() => { delete process.env.WORKFLOW_REPLAY_TIMEOUT_MS; + _resetReplayTimeoutWarnCacheForTests(); + warnSpy = vi.spyOn(runtimeLogger, 'warn').mockImplementation(() => {}); }); afterEach(() => { @@ -19,64 +24,88 @@ describe('getReplayTimeoutMs', () => { } else { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = originalEnv; } + warnSpy.mockRestore(); }); it('returns the default when the env var is unset', () => { expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).not.toHaveBeenCalled(); }); it('returns the default when the env var is empty', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = ''; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + 
expect(warnSpy).not.toHaveBeenCalled(); }); - it('returns the default when the env var is non-numeric', () => { + it('returns the default and warns when the env var is non-numeric', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'not-a-number'; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy.mock.calls[0][0]).toContain('not a positive finite number'); }); - it('returns the default when the env var is zero', () => { + it('returns the default and warns when the env var is zero', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '0'; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); }); - it('returns the default when the env var is negative', () => { + it('returns the default and warns when the env var is negative', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '-100'; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); }); - it('clamps to MIN_REPLAY_TIMEOUT_MS when the env var is below the floor', () => { + it('clamps to MIN_REPLAY_TIMEOUT_MS and warns when below floor', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '5000'; expect(getReplayTimeoutMs()).toBe(MIN_REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy.mock.calls[0][0]).toContain('below minimum'); }); - it('clamps to MAX_REPLAY_TIMEOUT_MS when the env var is above the ceiling', () => { + it('clamps to MAX_REPLAY_TIMEOUT_MS and warns when above ceiling', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '9999999'; expect(getReplayTimeoutMs()).toBe(MAX_REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy.mock.calls[0][0]).toContain('above maximum'); }); - it('honors an in-range override', () => { + it('honors an in-range override without warning', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '600000'; expect(getReplayTimeoutMs()).toBe(600_000); + expect(warnSpy).not.toHaveBeenCalled(); 
}); - it('accepts the lower-bound value exactly', () => { + it('accepts the lower-bound value exactly without warning', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = String(MIN_REPLAY_TIMEOUT_MS); expect(getReplayTimeoutMs()).toBe(MIN_REPLAY_TIMEOUT_MS); + expect(warnSpy).not.toHaveBeenCalled(); }); - it('accepts the upper-bound value exactly', () => { + it('accepts the upper-bound value exactly without warning', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = String(MAX_REPLAY_TIMEOUT_MS); expect(getReplayTimeoutMs()).toBe(MAX_REPLAY_TIMEOUT_MS); + expect(warnSpy).not.toHaveBeenCalled(); }); it('rejects Infinity and falls back to the default', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'Infinity'; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); }); it('rejects NaN and falls back to the default', () => { process.env.WORKFLOW_REPLAY_TIMEOUT_MS = 'NaN'; expect(getReplayTimeoutMs()).toBe(REPLAY_TIMEOUT_MS); + expect(warnSpy).toHaveBeenCalledTimes(1); + }); + + it('only warns once per distinct raw env var value', () => { + process.env.WORKFLOW_REPLAY_TIMEOUT_MS = '5000'; + getReplayTimeoutMs(); + getReplayTimeoutMs(); + getReplayTimeoutMs(); + expect(warnSpy).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/core/src/runtime/constants.ts b/packages/core/src/runtime/constants.ts index e2576e195f..6c1c241b85 100644 --- a/packages/core/src/runtime/constants.ts +++ b/packages/core/src/runtime/constants.ts @@ -1,3 +1,5 @@ +import { runtimeLogger } from '../logger.js'; + // Maximum number of queue delivery attempts before the handler gives up and // gracefully fails the run/step. This must be bounded under the VQS message // max visibility window (24 hours) so that our handler-side failure path @@ -25,9 +27,11 @@ export const MAX_QUEUE_DELIVERIES = 48; * `REPLAY_TIMEOUT_MAX_RETRIES` exhausted attempts the run is failed with * `RUN_ERROR_CODES.REPLAY_TIMEOUT`. 
* - * Note that on Vercel Hobby, the platform `maxDuration` is 300s, so this - * budget will not be hit unless overridden lower; the queue will re-try - * until the visibility window expires. + * Note that on Vercel Hobby (standard functions), the platform `maxDuration` + * is 60s — well below this budget, so the platform SIGTERM will fire first + * and the queue will re-deliver until the visibility window expires. With + * Fluid Compute on Hobby the per-function ceiling rises to 300s, still + * under the default budget. * * Override via the `WORKFLOW_REPLAY_TIMEOUT_MS` env var (clamped to * `MIN_REPLAY_TIMEOUT_MS`..`MAX_REPLAY_TIMEOUT_MS`). @@ -44,24 +48,70 @@ export const MIN_REPLAY_TIMEOUT_MS = 30_000; */ export const MAX_REPLAY_TIMEOUT_MS = 780_000; +// Track which raw env var values we've already warned about so the warning +// log only fires once per process (the function may be called many times). +const warnedReplayTimeoutValues = new Set(); + +function warnOnce( + raw: string, + message: string, + data: Record +): void { + if (warnedReplayTimeoutValues.has(raw)) return; + warnedReplayTimeoutValues.add(raw); + runtimeLogger.warn(message, data); +} + /** * Resolve the effective replay-timeout budget for the current process. * * Reads `process.env.WORKFLOW_REPLAY_TIMEOUT_MS` lazily so tests and * deployments can override per invocation. Invalid / out-of-range values - * fall back to the default (no throw — the env var is an escape hatch, not - * a hard requirement). + * fall back to a safe value (no throw — the env var is an escape hatch, + * not a hard requirement) and emit a one-time warning so misconfiguration + * is observable. 
*/ export function getReplayTimeoutMs(): number { const raw = process.env.WORKFLOW_REPLAY_TIMEOUT_MS; if (!raw) return REPLAY_TIMEOUT_MS; const parsed = Number(raw); - if (!Number.isFinite(parsed) || parsed <= 0) return REPLAY_TIMEOUT_MS; - if (parsed < MIN_REPLAY_TIMEOUT_MS) return MIN_REPLAY_TIMEOUT_MS; - if (parsed > MAX_REPLAY_TIMEOUT_MS) return MAX_REPLAY_TIMEOUT_MS; + if (!Number.isFinite(parsed) || parsed <= 0) { + warnOnce( + raw, + 'Ignoring WORKFLOW_REPLAY_TIMEOUT_MS: not a positive finite number; using default', + { raw, defaultMs: REPLAY_TIMEOUT_MS } + ); + return REPLAY_TIMEOUT_MS; + } + if (parsed < MIN_REPLAY_TIMEOUT_MS) { + warnOnce(raw, 'WORKFLOW_REPLAY_TIMEOUT_MS below minimum; clamped', { + raw, + clampedMs: MIN_REPLAY_TIMEOUT_MS, + minMs: MIN_REPLAY_TIMEOUT_MS, + }); + return MIN_REPLAY_TIMEOUT_MS; + } + if (parsed > MAX_REPLAY_TIMEOUT_MS) { + warnOnce(raw, 'WORKFLOW_REPLAY_TIMEOUT_MS above maximum; clamped', { + raw, + clampedMs: MAX_REPLAY_TIMEOUT_MS, + maxMs: MAX_REPLAY_TIMEOUT_MS, + }); + return MAX_REPLAY_TIMEOUT_MS; + } return parsed; } +/** + * Reset the warn-once cache. Test-only — exported so unit tests can + * exercise the warn path repeatedly without sharing state. + * + * @internal + */ +export function _resetReplayTimeoutWarnCacheForTests(): void { + warnedReplayTimeoutValues.clear(); +} + // Number of queue delivery attempts to allow before permanently failing a run // due to a replay timeout. On attempts 1 through this value, the timeout // handler exits without writing run_failed so the queue retries the message. 
diff --git a/packages/core/src/runtime/replay-budget.test.ts b/packages/core/src/runtime/replay-budget.test.ts new file mode 100644 index 0000000000..41085affa2 --- /dev/null +++ b/packages/core/src/runtime/replay-budget.test.ts @@ -0,0 +1,132 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ReplayBudget } from './replay-budget.js'; + +describe('ReplayBudget', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('starts unpaused; elapsed time counts toward budget', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(500); + expect(budget.elapsed()).toBe(500); + expect(budget.isExhausted()).toBe(false); + vi.advanceTimersByTime(500); + expect(budget.elapsed()).toBe(1000); + expect(budget.isExhausted()).toBe(true); + }); + + it('pause() stops counting; resume() resumes', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(300); + expect(budget.elapsed()).toBe(300); + budget.pause(); + vi.advanceTimersByTime(10_000); + // Time during pause is not charged + expect(budget.elapsed()).toBe(300); + expect(budget.isExhausted()).toBe(false); + budget.resume(); + vi.advanceTimersByTime(700); + expect(budget.elapsed()).toBe(1000); + expect(budget.isExhausted()).toBe(true); + }); + + it('pause() is idempotent — calling twice does not double-count', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(400); + budget.pause(); + // Second pause is a no-op + budget.pause(); + vi.advanceTimersByTime(5_000); + budget.resume(); + vi.advanceTimersByTime(100); + expect(budget.elapsed()).toBe(500); + }); + + it('resume() is idempotent in the sense that back-to-back resumes do not skew elapsed', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(200); + budget.pause(); + vi.advanceTimersByTime(100); + budget.resume(); + budget.resume(); // no-op since no time has passed + vi.advanceTimersByTime(100); + 
expect(budget.elapsed()).toBe(300); + }); + + it('handles multiple pause/resume cycles (e.g. multiple inline steps)', () => { + const budget = new ReplayBudget(10_000); + + // Initial non-step interval + vi.advanceTimersByTime(100); + expect(budget.elapsed()).toBe(100); + + // Step 1 + budget.pause(); + vi.advanceTimersByTime(60_000); // long step + budget.resume(); + + // More non-step work + vi.advanceTimersByTime(200); + + // Step 2 + budget.pause(); + vi.advanceTimersByTime(30_000); + budget.resume(); + + // Final non-step work + vi.advanceTimersByTime(300); + + // 100 + 200 + 300 = 600ms charged, 90s of step time excluded + expect(budget.elapsed()).toBe(600); + expect(budget.isExhausted()).toBe(false); + }); + + it('configuredLimitMs returns the configured limit', () => { + const budget = new ReplayBudget(12_345); + expect(budget.configuredLimitMs).toBe(12_345); + }); + + it('isExhausted() reflects current state including the open interval', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(999); + expect(budget.isExhausted()).toBe(false); + vi.advanceTimersByTime(1); + expect(budget.isExhausted()).toBe(true); + }); + + it('isExhausted() does not advance while paused', () => { + const budget = new ReplayBudget(1000); + vi.advanceTimersByTime(500); + budget.pause(); + vi.advanceTimersByTime(60_000); // simulate very long step + expect(budget.isExhausted()).toBe(false); + budget.resume(); + vi.advanceTimersByTime(499); + expect(budget.isExhausted()).toBe(false); + vi.advanceTimersByTime(1); + expect(budget.isExhausted()).toBe(true); + }); + + it('regression: 8-minute step does not exhaust default budget', () => { + // Reproduces the user scenario from + // https://github.com/vercel/workflow/issues/2009 — an 8-minute step + // under the default 240s budget should not trip exhaustion because + // step time is excluded from the budget. 
+ const budget = new ReplayBudget(240_000); + // 100ms of non-step work (event load, replay setup) + vi.advanceTimersByTime(100); + // Step body: 8 minutes + budget.pause(); + vi.advanceTimersByTime(8 * 60 * 1000); + budget.resume(); + // A bit more non-step work (write result event) + vi.advanceTimersByTime(50); + expect(budget.isExhausted()).toBe(false); + expect(budget.elapsed()).toBe(150); + }); +}); diff --git a/packages/core/src/runtime/replay-budget.ts b/packages/core/src/runtime/replay-budget.ts new file mode 100644 index 0000000000..936da5661e --- /dev/null +++ b/packages/core/src/runtime/replay-budget.ts @@ -0,0 +1,237 @@ +import { FatalError, RUN_ERROR_CODES } from '@workflow/errors'; +import { SPEC_VERSION_CURRENT } from '@workflow/world'; +import { describeError } from '../describe-error.js'; +import { runtimeLogger } from '../logger.js'; +import { dehydrateRunError } from '../serialization.js'; +import { getReplayTimeoutMs, REPLAY_TIMEOUT_MAX_RETRIES } from './constants.js'; +import { memoizeEncryptionKey } from './helpers.js'; +import { getWorld } from './world.js'; + +/** + * Per-invocation accounting of the *non-step* portion of a workflow + * handler run: deterministic event-log replay, workflow-VM execution + * between step boundaries, suspension handling, queue round-trips, etc. + * Inline step bodies (`"use step"` functions invoked via `executeStep`) + * are intentionally excluded — they are bounded by the platform's + * function `maxDuration` and the `NO_INLINE_REPLAY_AFTER_MS` early-return + * guard. + * + * Usage: + * + * ```ts + * const budget = new ReplayBudget(); + * // …non-step work happens here, accumulates against the budget… + * budget.pause(); + * try { + * await executeStep(...); // not charged + * } finally { + * budget.resume(); + * } + * // back to charging + * if (budget.isExhausted()) { ... 
} + * ``` + * + * Implementation notes: + * + * - `pause()` and `resume()` are idempotent: calling `pause()` while + * already paused (or `resume()` while already resumed) is a no-op. + * This protects against double-counting in future refactors that nest + * step execution or take an early-return path between a `pause()` and + * the matching `resume()`. + * - `isExhausted()` is checked at loop boundaries by the caller — the + * budget itself does not arm any timers. This means an in-flight + * pathological `runWorkflow` call (e.g. a huge event-log replay) can + * overshoot the budget by up to one iteration's worth of work before + * the next check fires. In practice the 20s headroom built into + * `MAX_REPLAY_TIMEOUT_MS` (and the function `maxDuration` ceiling) + * gives us slack; the old `setTimeout`-based approach also ultimately + * relied on the platform SIGTERM as the hard backstop. + */ +export class ReplayBudget { + private readonly limitMs: number; + private elapsedMs = 0; + private intervalStart: number | null; + + constructor(limitMs: number = getReplayTimeoutMs()) { + this.limitMs = limitMs; + this.intervalStart = Date.now(); + } + + /** + * The configured replay-timeout limit, in ms. Useful for log messages. + */ + get configuredLimitMs(): number { + return this.limitMs; + } + + /** + * Total non-step time accumulated so far, in ms. Includes the + * currently-active interval if the budget is not paused. + */ + elapsed(): number { + const open = + this.intervalStart === null ? 0 : Date.now() - this.intervalStart; + return this.elapsedMs + open; + } + + /** + * Stop counting elapsed time toward the budget. Idempotent — safe to + * call multiple times in a row; subsequent calls are no-ops until + * `resume()` reopens an interval. + */ + pause(): void { + if (this.intervalStart === null) return; + this.elapsedMs += Date.now() - this.intervalStart; + this.intervalStart = null; + } + + /** + * Resume counting elapsed time toward the budget. 
Always re-anchors the + * interval start to `now()`: harmless for back-to-back calls, but a + * call made while an interval is already open discards that + * interval's elapsed time, so pair it with a preceding `pause()`. + */ + resume(): void { + this.intervalStart = Date.now(); + } + + /** + * True if the budget has been exhausted (`elapsed() >= limitMs`). + * Callers should invoke `handleReplayBudgetExhausted(...)` afterward and return + * from the handler. + */ + isExhausted(): boolean { + return this.elapsed() >= this.limitMs; + } +} + +/** + * Fail the run (or retry, on early attempts) when the replay budget is + * exhausted. On attempts <= `REPLAY_TIMEOUT_MAX_RETRIES` we exit the + * process so the queue redelivers; on the next attempt we write + * `run_failed` with `RUN_ERROR_CODES.REPLAY_TIMEOUT` then exit. + * + * Gated by `process.env.VERCEL_URL` to preserve prior behavior on local + * dev / non-Vercel runtimes (`pnpm dev` should not be hard-killed by a + * single pathological workflow). Outside Vercel the function logs and + * returns; the queue handler will then ack the message and the request + * completes normally. + */ +export async function handleReplayBudgetExhausted(args: { + runId: string; + workflowName: string; + requestId: string | undefined; + attempt: number; + limitMs: number; +}): Promise { + const { runId, workflowName, requestId, attempt, limitMs } = args; + const runLogger = runtimeLogger.forRun(runId, workflowName); + + // Outside Vercel we don't have a queue redelivery system to rely on, + // and calling `process.exit(1)` would kill the user's dev server. + // Surface the failure via the event log if we can, then return — + // the caller will fall through and the request finishes normally.
+ if (process.env.VERCEL_URL === undefined) { + runLogger.warn( + 'Workflow replay exceeded timeout (non-Vercel runtime: not exiting process)', + { timeoutMs: limitMs, attempt } + ); + try { + const world = await getWorld(); + const getEncryptionKey = memoizeEncryptionKey(world, runId); + const timeoutErr = new FatalError( + `Workflow replay exceeded maximum duration (${limitMs / 1000}s)` + ); + await world.events.create( + runId, + { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: await dehydrateRunError( + timeoutErr, + runId, + await getEncryptionKey() + ), + errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, + }, + }, + { requestId } + ); + } catch (err) { + runLogger.warn('Unable to mark run as failed (non-Vercel runtime)', { + attempt, + errorName: err instanceof Error ? err.name : 'UnknownError', + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + return; + } + + if (attempt <= REPLAY_TIMEOUT_MAX_RETRIES) { + runLogger.warn( + 'Workflow replay exceeded timeout but will be re-attempted (attempt < maxRetries)', + { + timeoutMs: limitMs, + attempt, + maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, + } + ); + process.exit(1); + } + + const replayTimeoutDescription = describeError( + undefined, + RUN_ERROR_CODES.REPLAY_TIMEOUT + ); + runLogger.error( + 'Workflow replay exceeded timeout and max retries exceeded. 
Failing the run', + { + timeoutMs: limitMs, + attempt, + maxRetries: REPLAY_TIMEOUT_MAX_RETRIES, + errorCode: replayTimeoutDescription.errorCode, + errorAttribution: replayTimeoutDescription.attribution, + } + ); + + try { + const world = await getWorld(); + const getEncryptionKey = memoizeEncryptionKey(world, runId); + const timeoutErr = new FatalError( + `Workflow replay exceeded maximum duration (${limitMs / 1000}s) after ${attempt} attempts` + ); + await world.events.create( + runId, + { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: await dehydrateRunError( + timeoutErr, + runId, + await getEncryptionKey() + ), + errorCode: RUN_ERROR_CODES.REPLAY_TIMEOUT, + }, + }, + { requestId } + ); + } catch (err) { + // Best effort — process exits regardless. Surface why so operators + // can diagnose repeat timeouts against the backend. + runLogger.warn( + 'Unable to mark run as failed. The queue will continue to retry', + { + attempt, + errorName: err instanceof Error ? err.name : 'UnknownError', + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + } + ); + } + // Note that this also prevents the runtime from acking the queue + // message, so the queue will call back once, after which a 410 will + // get it to exit early. + process.exit(1); +} From daae93946b32b54bf64fe0b633a893b67a4bd5cd Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 18 May 2026 17:11:17 -0700 Subject: [PATCH 3/3] Replace VERCEL_URL gate with World capability Per review feedback, gating runtime behavior on process.env.VERCEL_URL leaks deployment-environment concerns into @workflow/core. Replace the check with a new optional capability on the World interface: processExitTriggersQueueRedelivery (default false). 
- @workflow/world: declare the new optional capability on World - @workflow/world-vercel: set it to true (Vercel fails the function invocation on non-zero exit and VQS redelivers via fresh invocation) - @workflow/core: handleReplayBudgetExhausted reads world.processExitTriggersQueueRedelivery instead of process.env.VERCEL_URL; behavior is otherwise unchanged - Add 4 unit tests for handleReplayBudgetExhausted covering both branches (exit-for-redelivery and best-effort run_failed) --- .../replay-timeout-excludes-step-bodies.md | 4 +- .../core/src/runtime/replay-budget.test.ts | 126 +++++++++++++++++- packages/core/src/runtime/replay-budget.ts | 40 +++--- packages/world-vercel/src/index.ts | 6 + packages/world/src/interfaces.ts | 22 +++ 5 files changed, 179 insertions(+), 19 deletions(-) diff --git a/.changeset/replay-timeout-excludes-step-bodies.md b/.changeset/replay-timeout-excludes-step-bodies.md index 7a439742e3..32ad7208fe 100644 --- a/.changeset/replay-timeout-excludes-step-bodies.md +++ b/.changeset/replay-timeout-excludes-step-bodies.md @@ -1,5 +1,7 @@ --- "@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch --- -Exclude inline step execution from the workflow replay timeout. Long-running steps no longer hit `REPLAY_TIMEOUT` (fixes #2009). Adds `WORKFLOW_REPLAY_TIMEOUT_MS` env var override. +Exclude inline step execution from the workflow replay timeout. Long-running steps no longer hit `REPLAY_TIMEOUT` (fixes #2009). Adds a `WORKFLOW_REPLAY_TIMEOUT_MS` env var override and a new optional `World.processExitTriggersQueueRedelivery` capability used to gate the runtime's `process.exit(1)` failure path. 
diff --git a/packages/core/src/runtime/replay-budget.test.ts b/packages/core/src/runtime/replay-budget.test.ts index 41085affa2..7f0c9e96f1 100644 --- a/packages/core/src/runtime/replay-budget.test.ts +++ b/packages/core/src/runtime/replay-budget.test.ts @@ -1,5 +1,20 @@ +import type { World } from '@workflow/world'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { ReplayBudget } from './replay-budget.js'; +import { runtimeLogger } from '../logger.js'; +import { handleReplayBudgetExhausted, ReplayBudget } from './replay-budget.js'; +import { getWorld } from './world.js'; + +vi.mock('./world.js', () => ({ + getWorld: vi.fn(), +})); + +vi.mock('../serialization.js', () => ({ + dehydrateRunError: vi.fn(async () => new Uint8Array([1, 2, 3])), +})); + +vi.mock('./helpers.js', () => ({ + memoizeEncryptionKey: () => async () => undefined, +})); describe('ReplayBudget', () => { beforeEach(() => { @@ -130,3 +145,112 @@ describe('ReplayBudget', () => { expect(budget.elapsed()).toBe(150); }); }); + +describe('handleReplayBudgetExhausted', () => { + let mockEventsCreate: ReturnType; + let exitSpy: ReturnType; + + function makeMockWorld( + processExitTriggersQueueRedelivery: boolean | undefined + ): World { + return { + processExitTriggersQueueRedelivery, + events: { create: mockEventsCreate }, + } as unknown as World; + } + + beforeEach(() => { + mockEventsCreate = vi.fn().mockResolvedValue({}); + // `process.exit` would terminate vitest. Throw a sentinel instead so + // the test can observe the exit attempt without crashing. + exitSpy = vi.spyOn(process, 'exit').mockImplementation(((code?: number) => { + throw new Error(`__test_process_exit__:${code}`); + }) as never); + + // Silence the run-scoped logger; tests don't introspect its calls. 
+ const noopLogger = { + warn: vi.fn(), + error: vi.fn(), + info: vi.fn(), + debug: vi.fn(), + forRun: vi.fn(), + child: vi.fn(), + }; + noopLogger.forRun.mockReturnValue(noopLogger); + noopLogger.child.mockReturnValue(noopLogger); + vi.spyOn(runtimeLogger, 'forRun').mockReturnValue(noopLogger as never); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('does not call process.exit when World does not support exit-for-redelivery (in-process world)', async () => { + vi.mocked(getWorld).mockResolvedValue(makeMockWorld(false)); + + await handleReplayBudgetExhausted({ + runId: 'wrun_test', + workflowName: 'wf', + requestId: undefined, + attempt: 1, + limitMs: 30_000, + }); + + expect(exitSpy).not.toHaveBeenCalled(); + // run_failed event should be written best-effort + expect(mockEventsCreate).toHaveBeenCalledTimes(1); + expect(mockEventsCreate.mock.calls[0][1].eventType).toBe('run_failed'); + }); + + it('does not call process.exit when World omits the capability (default = false)', async () => { + vi.mocked(getWorld).mockResolvedValue(makeMockWorld(undefined)); + + await handleReplayBudgetExhausted({ + runId: 'wrun_test', + workflowName: 'wf', + requestId: undefined, + attempt: 5, + limitMs: 30_000, + }); + + expect(exitSpy).not.toHaveBeenCalled(); + }); + + it('exits without writing run_failed on early attempts when World supports exit-for-redelivery', async () => { + vi.mocked(getWorld).mockResolvedValue(makeMockWorld(true)); + + await expect( + handleReplayBudgetExhausted({ + runId: 'wrun_test', + workflowName: 'wf', + requestId: undefined, + attempt: 1, + limitMs: 240_000, + }) + ).rejects.toThrow('__test_process_exit__:1'); + + expect(exitSpy).toHaveBeenCalledWith(1); + expect(mockEventsCreate).not.toHaveBeenCalled(); + }); + + it('writes run_failed then exits on attempt > REPLAY_TIMEOUT_MAX_RETRIES (Vercel-style World)', async () => { + vi.mocked(getWorld).mockResolvedValue(makeMockWorld(true)); + + await expect( + handleReplayBudgetExhausted({ + 
runId: 'wrun_test', + workflowName: 'wf', + requestId: 'req_test', + attempt: 4, + limitMs: 240_000, + }) + ).rejects.toThrow('__test_process_exit__:1'); + + expect(exitSpy).toHaveBeenCalledWith(1); + expect(mockEventsCreate).toHaveBeenCalledTimes(1); + expect(mockEventsCreate.mock.calls[0][1].eventType).toBe('run_failed'); + expect(mockEventsCreate.mock.calls[0][1].eventData.errorCode).toBe( + 'REPLAY_TIMEOUT' + ); + }); +}); diff --git a/packages/core/src/runtime/replay-budget.ts b/packages/core/src/runtime/replay-budget.ts index 936da5661e..40aa97fbc1 100644 --- a/packages/core/src/runtime/replay-budget.ts +++ b/packages/core/src/runtime/replay-budget.ts @@ -107,15 +107,19 @@ export class ReplayBudget { /** * Fail the run (or retry, on early attempts) when the replay budget is - * exhausted. On attempts <= `REPLAY_TIMEOUT_MAX_RETRIES` we exit the - * process so the queue redelivers; on the next attempt we write - * `run_failed` with `RUN_ERROR_CODES.REPLAY_TIMEOUT` then exit. + * exhausted. The handling depends on whether the underlying World + * supports `process.exit(1)` as a queue redelivery signal (see + * `World.processExitTriggersQueueRedelivery`): * - * Gated by `process.env.VERCEL_URL` to preserve prior behavior on local - * dev / non-Vercel runtimes (`pnpm dev` should not be hard-killed by a - * single pathological workflow). Outside Vercel the function logs and - * returns; the queue handler will then ack the message and the request - * completes normally. + * - **Managed-platform Worlds** (`world-vercel`): on attempts <= + * `REPLAY_TIMEOUT_MAX_RETRIES` exit the process so the platform fails + * the invocation and the queue redelivers; on the next attempt write + * `run_failed` with `RUN_ERROR_CODES.REPLAY_TIMEOUT` and exit. + * + * - **In-process Worlds** (`world-local`, dev servers): calling + * `process.exit()` would terminate the host (e.g. `pnpm dev`), so + * instead log a warning, write `run_failed` best-effort, and return. 
+ * The framework completes the request normally. */ export async function handleReplayBudgetExhausted(args: { runId: string; @@ -127,17 +131,20 @@ export async function handleReplayBudgetExhausted(args: { const { runId, workflowName, requestId, attempt, limitMs } = args; const runLogger = runtimeLogger.forRun(runId, workflowName); - // Outside Vercel we don't have a queue redelivery system to rely on, - // and calling `process.exit(1)` would kill the user's dev server. - // Surface the failure via the event log if we can, then return — - // the caller will fall through and the request finishes normally. - if (process.env.VERCEL_URL === undefined) { + const world = await getWorld(); + const canExitForRedelivery = + world.processExitTriggersQueueRedelivery === true; + + // Worlds without managed-platform redelivery (e.g. world-local, custom + // in-process worlds) must not have us exit the process — that would + // kill the user's host (`pnpm dev`, CLI, etc.) without producing a + // retry. Surface the failure via the event log if we can, then return. + if (!canExitForRedelivery) { runLogger.warn( - 'Workflow replay exceeded timeout (non-Vercel runtime: not exiting process)', + 'Workflow replay exceeded timeout; current World does not support process exit for redelivery — failing the run and returning', { timeoutMs: limitMs, attempt } ); try { - const world = await getWorld(); const getEncryptionKey = memoizeEncryptionKey(world, runId); const timeoutErr = new FatalError( `Workflow replay exceeded maximum duration (${limitMs / 1000}s)` @@ -159,7 +166,7 @@ export async function handleReplayBudgetExhausted(args: { { requestId } ); } catch (err) { - runLogger.warn('Unable to mark run as failed (non-Vercel runtime)', { + runLogger.warn('Unable to mark run as failed', { attempt, errorName: err instanceof Error ? err.name : 'UnknownError', errorMessage: err instanceof Error ? 
err.message : String(err), @@ -196,7 +203,6 @@ export async function handleReplayBudgetExhausted(args: { ); try { - const world = await getWorld(); const getEncryptionKey = memoizeEncryptionKey(world, runId); const timeoutErr = new FatalError( `Workflow replay exceeded maximum duration (${limitMs / 1000}s) after ${attempt} attempts` diff --git a/packages/world-vercel/src/index.ts b/packages/world-vercel/src/index.ts index f15c480869..76182c7041 100644 --- a/packages/world-vercel/src/index.ts +++ b/packages/world-vercel/src/index.ts @@ -26,6 +26,12 @@ export function createVercelWorld(config?: APIConfig): World { return { specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + // On Vercel the platform fails the function invocation when the + // process exits non-zero, and VQS redelivers the queue message via a + // fresh invocation. The core runtime uses this to decide whether + // `process.exit(1)` is an acceptable response to an exhausted replay + // budget. + processExitTriggersQueueRedelivery: true, ...createQueue(config), ...createStorage(config), ...instrumentObject('world.streams', createStreamer(config)), diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index ce8a5d2b94..45d8143c4b 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -249,6 +249,28 @@ export interface World extends Queue, Streamer, Storage { */ specVersion?: number; + /** + * Whether calling `process.exit(1)` from a queue handler is observed by + * the World as a delivery failure that will be retried. + * + * Set to `true` for worlds running inside a managed serverless platform + * (e.g. `world-vercel`) where the platform fails the invocation when the + * function process exits non-zero, and the queue redelivers the message + * via a separate fresh invocation. + * + * Set to `false` (the default) for in-process worlds (e.g. 
`world-local`, + * dev servers) where calling `process.exit()` would terminate the host + * process — including the user's `pnpm dev` — without producing a + * redelivery. Such worlds should instead surface failures via the event + * log and return normally. + * + * The core runtime reads this when deciding how to handle an exhausted + * replay budget: when `true` it exits so the queue redelivers; when + * `false` it writes `run_failed` best-effort and returns. See + * `packages/core/src/runtime/replay-budget.ts`. + */ + processExitTriggersQueueRedelivery?: boolean; + /** * A function that will be called to start any background tasks needed by the World implementation. * For example, in the case of a queue backed World, this would start the queue processing.