From e4fc88f028be5bd85840dcbe6b335ee753d1bc39 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Sat, 14 Feb 2026 14:15:26 -0600 Subject: [PATCH] Fix to prevent workflow & step retries repeating infinitely --- ARCHITECTURE.md | 15 +- packages/docs/docs/retries.mdx | 179 ++++++++------------- packages/openworkflow/CHANGELOG.md | 8 + packages/openworkflow/backend.testsuite.ts | 15 +- packages/openworkflow/client.test.ts | 8 +- packages/openworkflow/execution.ts | 10 +- packages/openworkflow/worker.test.ts | 132 +++++++++++++-- packages/openworkflow/worker.ts | 9 +- packages/openworkflow/workflow.test.ts | 17 ++ packages/openworkflow/workflow.ts | 7 +- 10 files changed, 252 insertions(+), 148 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 0bf75e07..af2631bd 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -234,12 +234,21 @@ execution, replay reaches the failed step and re-executes its function. ### 4.2. Workflow Failures & Retries If an error is unhandled by the workflow code, the entire workflow run fails. -The workflow run is rescheduled with backoff according to its **retry policy**. -By default, retries continue until canceled or until a configured deadline is -reached. If the run can no longer be retried (for example, because the next +Workflow-level retries are **disabled by default** (`maximumAttempts: 1`): an +unhandled error immediately marks the run as `failed`. To enable automatic +workflow-level retries, supply a `retryPolicy` when defining the workflow. +Set `maximumAttempts: 0` for unlimited retries. +If the run can no longer be retried (for example, because the next retry would exceed `deadlineAt` or `maximumAttempts` has been reached), its status is set to `failed` permanently. +When a worker claims a run but does not have the matching workflow definition +in its registry, this is treated as a deployment concern rather than an +application failure. The run is rescheduled with its own generous backoff +policy (5s initial, 5min cap, unlimited attempts) so it remains available +for a worker that does have the definition — for example during a rolling +deploy. + ### 4.3. Retry Policy OpenWorkflow uses the same `RetryPolicy` shape for two separate concerns: diff --git a/packages/docs/docs/retries.mdx b/packages/docs/docs/retries.mdx index e9b7bba5..99536838 100644 --- a/packages/docs/docs/retries.mdx +++ b/packages/docs/docs/retries.mdx @@ -3,28 +3,29 @@ title: Retries description: Automatic retry behavior for failed steps and workflows --- -In any application, things fail sometimes - a third-party API returns a -500 error, a database connection times out, or a network blip drops a request. -These are transient failures: they go away on their own if you try again. +In any app, things fail sometimes - a third-party API returns a 500, a database +connection times out, or a network blip drops a request. These are transient +failures that go away if you try again. -OpenWorkflow handles this automatically. When a step throws an error, the -workflow is rescheduled with an exponential backoff (increasing delays between -retries). Previously completed steps aren't re-run - only the failed step is -retried. +OpenWorkflow handles this automatically by retrying failed steps. When a step +throws, the workflow is rescheduled with an exponential backoff. Previously +completed steps aren't re-run, only the failed step re-executes. ## How Retries Work When a step throws an error: -1. The step attempt is marked as `failed` -2. The error is recorded in the database -3. 
The workflow is rescheduled with exponential backoff
-4. When the workflow resumes, it replays to the failed step
-5. The step function executes again (not the cached result)
+1. The step attempt is marked as `failed` and the error is recorded
+2. The workflow run is rescheduled with exponential backoff
+3. When the workflow resumes, it replays to the failed step
+4. The step function executes again
 
-## Automatic Retries in Steps
+Steps are attempted up to 10 times by default. If the step still fails after
+all attempts, the workflow is permanently marked as `failed`.
 
-Steps that throw are automatically retried:
+## Step Retries
+
+Steps that throw are retried automatically:
 
 ```ts
 await step.run({ name: "call-api" }, async () => {
@@ -39,19 +40,18 @@ await step.run({ name: "call-api" }, async () => {
 });
 ```
 
-Each retry:
-
-- Replays the workflow from the beginning
-- Returns cached results for completed steps
-- Re-executes the failed step
+### Step Retry Policy
 
-## Retry Policy
+Each step can define its own retry policy. If omitted, steps use these defaults:
 
-Both steps and workflows use the same retry policy shape. A retry policy
-controls exponential backoff — how long to wait between retries, how fast delays
-grow, and when to stop retrying.
+| Field                | Default  | Description                                                |
+| -------------------- | -------- | ---------------------------------------------------------- |
+| `initialInterval`    | `"1s"`   | Delay before the first retry                               |
+| `backoffCoefficient` | `2`      | Multiplier applied to each subsequent retry delay          |
+| `maximumInterval`    | `"100s"` | Upper bound for retry delay                                |
+| `maximumAttempts`    | `10`     | Total attempts including the initial one (`0` = unlimited) |
 
-With the defaults, retry delays look like this:
+With these defaults, retry delays look like this:
 
 | Attempt | Delay     |
 | ------- | --------- |
@@ -62,23 +62,7 @@ With the defaults, retry delays look like this:
 | 5       | ~8s       |
 | ...     | ...       |
 
-This prevents overwhelming external services during outages. Retries continue
-until canceled, until `deadlineAt` is reached (or the next retry would pass it),
-or until `maximumAttempts` is exhausted.
-
-Retry policies have the following fields:
-
-| Field                | Default    | Description                                          |
-| -------------------- | ---------- | ---------------------------------------------------- |
-| `initialInterval`    | `"1s"`     | Delay before the first retry after a failed attempt  |
-| `backoffCoefficient` | `2`        | Multiplier applied to each subsequent retry delay    |
-| `maximumInterval`    | `"100s"`   | Upper bound for retry delay                          |
-| `maximumAttempts`    | `Infinity` | Maximum attempts, including the initial one          |
-
-### Step Retry Policy
-
-Each `step.run(...)` can define its own retry policy. If you omit `retryPolicy`,
-OpenWorkflow uses the defaults shown above.
+Override the defaults per step:
 
 ```ts
 await step.run(
@@ -97,11 +81,16 @@ await step.run(
 );
 ```
 
-### Workflow Retry Policy
+Retries also stop early if the workflow has a `deadlineAt` and the next retry
+would exceed it.
+
+## Workflow Retries
 
-Workflow-level `retryPolicy` applies to non-step failures — for example, missing
-workflow definitions or errors thrown outside `step.run`. If you omit
-`retryPolicy` (or individual fields), OpenWorkflow uses the same defaults.
+Errors thrown outside of `step.run(...)` are workflow-level failures.
+**Workflow-level failures are not retried by default** — the workflow is
+marked as `failed`.
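+
+As a quick illustration (a minimal sketch; `validateInput` and `processInput`
+are hypothetical helpers, not part of OpenWorkflow), a throw in the workflow
+body fails the run on its first and only attempt:
+
+```ts
+defineWorkflow({ name: "workflow-level-failure" }, async ({ input, step }) => {
+  // Runs outside step.run(...), so a throw here is a workflow-level failure.
+  // Under the default policy (maximumAttempts: 1) the run is marked `failed`
+  // immediately and is never rescheduled.
+  validateInput(input);
+
+  return await step.run({ name: "process" }, async () => processInput(input));
+});
+```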
+ +To enable workflow-level retries, set a `retryPolicy` on the workflow spec: ```ts import { defineWorkflow } from "openworkflow"; @@ -122,23 +111,38 @@ defineWorkflow( ); ``` + + Step retries and workflow retries are independent. Step failures use the + step's own retry policy. The workflow retry policy only applies to errors + thrown outside steps. + + +## Missing Workflow Definitions + +If a worker claims a run but doesn't have the matching workflow registered, it +reschedules the run with exponential backoff (starting at 5s, capped at 5min). +This keeps the run alive during rolling deploys or multi-worker setups where the +right worker hasn't started yet. + +Once a worker with the correct definition comes online, it claims the run and +executes normally. + ## What Triggers a Retry Retries happen when: -- A step function throws an exception -- A step function returns a rejected promise -- The worker crashes during step execution +- A step function throws an error or returns a rejected promise +- A worker crashes during step execution (the step is re-executed on recovery) Retries do **not** happen for: -- Completed steps (they return cached results) +- Completed steps (cached results are returned) - Explicitly canceled workflows -- Workflows that complete successfully +- Workflow-level errors (unless a workflow `retryPolicy` is configured) ## Error Handling -You can catch and handle errors within your workflow: +You can catch step errors inside a workflow to run fallback logic: ```ts defineWorkflow({ name: "with-error-handling" }, async ({ input, step }) => { @@ -147,10 +151,7 @@ defineWorkflow({ name: "with-error-handling" }, async ({ input, step }) => { await externalApi.call(); }); } catch (error) { - // Log the error and continue with fallback - console.error("API call failed:", error); - - await step.run({ name: "fallback-operation" }, async () => { + await step.run({ name: "fallback" }, async () => { await fallbackApi.call(); }); } @@ -158,71 +159,21 @@ defineWorkflow({ name: "with-error-handling" }, async ({ input, step }) => { ``` - When you catch an error, the workflow continues normally. The step is still - marked as failed in the database, but the workflow doesn't retry from that - point. + When you catch an error the workflow continues normally. The step is still + recorded as failed, but no retry is triggered. -## Permanent Failures - -A workflow is marked as `failed` permanently when it can no longer be retried -(for example, because `deadlineAt` is reached, the next retry would exceed that -deadline, or `maximumAttempts` has been reached): - -- The error is stored in the workflow run record -- No more automatic retries occur -- You can view failed workflows in the dashboard -- Failed workflows can be manually retried or investigated - -## Transient vs. Permanent Errors - -Design your steps to distinguish between transient and permanent errors: +## Terminal Failures -```ts -await step.run({ name: "call-api" }, async () => { - const response = await fetch("https://api.example.com/data"); - - if (response.status === 503) { - // Transient - throw to trigger retry - throw new Error("Service temporarily unavailable"); - } - - if (response.status === 400) { - // Permanent - bad request won't succeed on retry - // Handle differently (return error result, cancel workflow, etc.) 
- return { success: false, error: "Invalid request" }; - } - - return await response.json(); -}); -``` - -## Best Practices - -### Use Meaningful Error Messages - -Include context in errors for debugging: - -```ts -await step.run({ name: "fetch-user" }, async () => { - const user = await db.users.findOne({ id: input.userId }); - - if (!user) { - throw new Error(`User not found: ${input.userId}`); - } - - return user; -}); -``` +A workflow is permanently marked `failed` when step retries are exhausted +(`maximumAttempts` reached) or `deadlineAt` expires. -## Monitoring Retries +Once terminal, no more automatic retries occur. You can inspect and manually +retry failed workflows from the [dashboard](/docs/dashboard). -Use the dashboard to monitor workflow health: +## Monitoring -- View failed workflow runs -- Inspect step attempt errors -- See retry history for a workflow -- Identify patterns in failures +Use the [dashboard](/docs/dashboard) to monitor retry health: ```bash npm diff --git a/packages/openworkflow/CHANGELOG.md b/packages/openworkflow/CHANGELOG.md index b2c3ec2c..91b4feff 100644 --- a/packages/openworkflow/CHANGELOG.md +++ b/packages/openworkflow/CHANGELOG.md @@ -1,5 +1,13 @@ # openworkflow +## Unreleased + +- Fix to prevent workflows retrying indefinitely on default policies +- Unbounded retries are still supported by setting `retryPolicy.maximumAttempts` + to `Infinity` or 0 +- Unregistered workflows are still rescheduled infinitely with backoff instead + of failing terminally so runs survive long rolling deploys + ## 0.7.0 - Add configurable workflow and step retry policies (#279, #294) diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index f29c4fdd..41dfd9bf 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -26,6 +26,11 @@ export interface TestBackendOptions { */ export function testBackend(options: TestBackendOptions): void { const { setup, teardown } = options; + const RESCHEDULING_RETRY_POLICY = { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 3, + } as const; + describe("Backend", () => { let backend: Backend; @@ -837,7 +842,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: claimed.id, workerId, error, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + retryPolicy: RESCHEDULING_RETRY_POLICY, }); // rescheduled, not permanently failed @@ -874,7 +879,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: claimed.id, workerId, error: { message: "first failure" }, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + retryPolicy: RESCHEDULING_RETRY_POLICY, }); expect(firstFailed.status).toBe("pending"); @@ -895,7 +900,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: claimed.id, workerId, error: { message: "second failure" }, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + retryPolicy: RESCHEDULING_RETRY_POLICY, }); expect(secondFailed.status).toBe("pending"); @@ -1435,7 +1440,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: created.id, workerId, error: { message: "test error" }, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + retryPolicy: RESCHEDULING_RETRY_POLICY, }); expect(failed.status).toBe("failed"); @@ -1473,7 +1478,7 @@ export function testBackend(options: TestBackendOptions): void { workflowRunId: created.id, workerId, error: { message: "test error" }, - retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + retryPolicy: 
RESCHEDULING_RETRY_POLICY, }); expect(failed.status).toBe("pending"); diff --git a/packages/openworkflow/client.test.ts b/packages/openworkflow/client.test.ts index 627ea908..0e154cb8 100644 --- a/packages/openworkflow/client.test.ts +++ b/packages/openworkflow/client.test.ts @@ -227,7 +227,7 @@ describe("OpenWorkflow", () => { expect(claimed).not.toBeNull(); if (!claimed) throw new Error("workflow run was not claimed"); - // mark as failed (should reschedule)) + // mark as failed (terminal by default) await backend.failWorkflowRun({ workflowRunId: claimed.id, workerId, @@ -235,11 +235,11 @@ describe("OpenWorkflow", () => { retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, }); - const rescheduled = await backend.getWorkflowRun({ + const failedRun = await backend.getWorkflowRun({ workflowRunId: claimed.id, }); - expect(rescheduled?.status).toBe("pending"); - expect(rescheduled?.error).toEqual({ message: "boom" }); + expect(failedRun?.status).toBe("failed"); + expect(failedRun?.error).toEqual({ message: "boom" }); }); test("creates workflow run with deadline", async () => { diff --git a/packages/openworkflow/execution.ts b/packages/openworkflow/execution.ts index 968d504a..18bac5ac 100644 --- a/packages/openworkflow/execution.ts +++ b/packages/openworkflow/execution.ts @@ -13,6 +13,7 @@ import { import type { WorkflowRun } from "./core/workflow.js"; import { computeFailedWorkflowRunUpdate, + DEFAULT_WORKFLOW_RETRY_POLICY, type RetryPolicy, } from "./workflow.js"; @@ -116,12 +117,7 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { initialInterval: "1s", backoffCoefficient: 2, maximumInterval: "100s", - maximumAttempts: Infinity, // unlimited -}; - -const TERMINAL_RETRY_POLICY: RetryPolicy = { - ...DEFAULT_STEP_RETRY_POLICY, - maximumAttempts: 0, + maximumAttempts: 10, }; /** @@ -417,7 +413,7 @@ export async function executeWorkflow( workflowRunId: workflowRun.id, workerId, error: serializedError, - retryPolicy: TERMINAL_RETRY_POLICY, + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, }); return; } diff --git a/packages/openworkflow/worker.test.ts b/packages/openworkflow/worker.test.ts index d7824b2a..51080264 100644 --- a/packages/openworkflow/worker.test.ts +++ b/packages/openworkflow/worker.test.ts @@ -76,7 +76,7 @@ describe("Worker", () => { expect(executionCount).toBe(1); }); - test("marks workflow for retry when definition is missing", async () => { + test("reschedules workflow when definition is missing", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -99,23 +99,31 @@ describe("Worker", () => { }); expect(updated?.status).toBe("pending"); - expect(updated?.error).toBeDefined(); + expect(updated?.error).toEqual({ + message: 'Workflow "missing" is not registered', + }); expect(updated?.availableAt).not.toBeNull(); }); - test("retries failed workflows automatically", async () => { + test("retries failed workflows when workflow retry policy allows it", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); let attemptCount = 0; - const workflow = client.defineWorkflow({ name: "retry-test" }, () => { - attemptCount++; - if (attemptCount < 2) { - throw new Error(`Attempt ${String(attemptCount)} failed`); - } - return { success: true, attempts: attemptCount }; - }); + const workflow = client.defineWorkflow( + { + name: "retry-test", + retryPolicy: { maximumAttempts: 2 }, + }, + () => { + attemptCount++; + if (attemptCount < 2) { + throw new Error(`Attempt ${String(attemptCount)} failed`); + } + return { 
success: true, attempts: attemptCount }; + }, + ); const worker = client.newWorker(); @@ -138,6 +146,33 @@ describe("Worker", () => { expect(result).toEqual({ success: true, attempts: 2 }); }); + test("fails non-step workflow errors terminally by default", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let attemptCount = 0; + const workflow = client.defineWorkflow( + { name: "default-workflow-retry-terminal" }, + () => { + attemptCount++; + throw new Error("always fails"); + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + await worker.tick(); + await sleep(100); + + expect(attemptCount).toBe(1); + const failed = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(failed?.status).toBe("failed"); + expect(failed?.availableAt).toBeNull(); + }); + test("tick is a no-op when no work is available", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -678,7 +713,8 @@ describe("Worker", () => { workflowRunId: handle.workflowRun.id, }); - expect(failed?.status).toBe("pending"); // should be retrying + expect(failed?.status).toBe("failed"); + expect(failed?.availableAt).toBeNull(); expect(failed?.error).toBeDefined(); expect(failed?.error?.message).toContain("Invalid duration format"); }); @@ -1129,7 +1165,7 @@ describe("Worker", () => { expect(resultV2).toBe("v2-result"); }); - test("worker fails workflow run when version is not registered", async () => { + test("worker reschedules workflow run when version is not registered", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -1161,6 +1197,7 @@ describe("Worker", () => { expect(updated?.error).toEqual({ message: 'Workflow "version-check" (version: v2) is not registered', }); + expect(updated?.availableAt).not.toBeNull(); }); test("unversioned workflow does not match versioned run", async () => { @@ -1195,6 +1232,7 @@ describe("Worker", () => { expect(updated?.error).toEqual({ message: 'Workflow "version-mismatch" (version: v1) is not registered', }); + expect(updated?.availableAt).not.toBeNull(); }); test("versioned workflow does not match unversioned run", async () => { @@ -1229,6 +1267,7 @@ describe("Worker", () => { expect(updated?.error).toEqual({ message: 'Workflow "version-required" is not registered', }); + expect(updated?.availableAt).not.toBeNull(); }); test("workflow receives run's version, not registered version", async () => { @@ -1424,6 +1463,75 @@ describe("Worker", () => { expect(result).toBe("ok"); }); + test("fails a step after the default maximum attempts (10)", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: "default-step-max-attempts" }, + async ({ step }) => { + await step.run({ name: "always-fails" }, () => { + throw new Error("boom"); + }); + return "unreachable"; + }, + ); + + const handle = await workflow.run(); + + const seedWorkerId = randomUUID(); + const seededRun = await backend.claimWorkflowRun({ + workerId: seedWorkerId, + leaseDurationMs: 1000, + }); + expect(seededRun?.id).toBe(handle.workflowRun.id); + if (!seededRun) throw new Error("Expected workflow run to be claimed"); + + for (let index = 0; index < 9; index++) { + const seededAttempt = await backend.createStepAttempt({ + workflowRunId: seededRun.id, + workerId: seedWorkerId, + stepName: "always-fails", + kind: 
"function", + config: {}, + context: null, + }); + + await backend.failStepAttempt({ + workflowRunId: seededRun.id, + stepAttemptId: seededAttempt.id, + workerId: seedWorkerId, + error: { message: `seeded failure ${String(index + 1)}` }, + }); + } + + await backend.rescheduleWorkflowRunAfterFailedStepAttempt({ + workflowRunId: seededRun.id, + workerId: seedWorkerId, + error: { message: "seeded step failures" }, + availableAt: new Date(Date.now() - 1000), + }); + + const worker = client.newWorker(); + await worker.tick(); + await sleep(100); + + const failedRun = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(failedRun?.status).toBe("failed"); + expect(failedRun?.availableAt).toBeNull(); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + }); + const failedAttempts = attempts.data.filter( + (attempt) => + attempt.stepName === "always-fails" && attempt.status === "failed", + ); + expect(failedAttempts).toHaveLength(10); + }); + test("uses step-level retry overrides and terminal step limits", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/worker.ts b/packages/openworkflow/worker.ts index 68ca2f8f..b5306620 100644 --- a/packages/openworkflow/worker.ts +++ b/packages/openworkflow/worker.ts @@ -18,6 +18,13 @@ const DEFAULT_POLL_JITTER_FACTOR_MIN = 0.5; const DEFAULT_POLL_JITTER_FACTOR_MAX = 1; const DEFAULT_CONCURRENCY = 1; +const MISSING_DEFINITION_RETRY_POLICY: RetryPolicy = { + initialInterval: "5s", + backoffCoefficient: 2, + maximumInterval: "5m", + maximumAttempts: 0, // unlimited – keep retrying until the right worker picks it up +}; + /** * Configures how a Worker polls the backend, leases workflow runs, and * registers workflows. 
@@ -165,7 +172,7 @@ export class Worker { error: { message: `Workflow "${workflowRun.workflowName}"${versionStr} is not registered`, }, - retryPolicy: resolveRetryPolicy(), + retryPolicy: MISSING_DEFINITION_RETRY_POLICY, }); return null; } diff --git a/packages/openworkflow/workflow.test.ts b/packages/openworkflow/workflow.test.ts index c7c14c45..391435c0 100644 --- a/packages/openworkflow/workflow.test.ts +++ b/packages/openworkflow/workflow.test.ts @@ -172,4 +172,21 @@ describe("computeFailedWorkflowRunUpdate", () => { expect(result.finishedAt).toBe(now); expect(result.error).toEqual({ message: "boom" }); }); + + test("retries indefinitely when maximumAttempts is 0", () => { + const unlimitedPolicy: RetryPolicy = { ...policy, maximumAttempts: 0 }; + const now = new Date("2026-01-01T00:00:00.000Z"); + + const result = computeFailedWorkflowRunUpdate( + unlimitedPolicy, + 100, + null, + { message: "boom" }, + now, + ); + + expect(result.status).toBe("pending"); + expect(result.availableAt).not.toBeNull(); + expect(result.finishedAt).toBeNull(); + }); }); diff --git a/packages/openworkflow/workflow.ts b/packages/openworkflow/workflow.ts index 1fc51dd0..56589cf6 100644 --- a/packages/openworkflow/workflow.ts +++ b/packages/openworkflow/workflow.ts @@ -135,7 +135,7 @@ export const DEFAULT_WORKFLOW_RETRY_POLICY: RetryPolicy = { initialInterval: "1s", backoffCoefficient: 2, maximumInterval: "100s", - maximumAttempts: Infinity, // unlimited + maximumAttempts: 1, }; /** @@ -164,7 +164,10 @@ export function computeFailedWorkflowRunUpdate( error: Readonly, now: Readonly, ): FailedWorkflowRunUpdate { - if (attempts >= retryPolicy.maximumAttempts) { + if ( + retryPolicy.maximumAttempts > 0 && // 0 = unlimited attempts + attempts >= retryPolicy.maximumAttempts + ) { return { status: "failed", availableAt: null,
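
Review note: the backoff arithmetic these policies imply, as a minimal sketch
(millisecond-valued fields and helper names are illustrative only; the real
`RetryPolicy` takes duration strings like `"1s"`, and deadline handling is
omitted here):

```ts
// Sketch of the documented retry schedule: the delay before the next retry
// is initialInterval * backoffCoefficient^(attempts - 1), capped at
// maximumInterval; retries stop once `attempts` reaches maximumAttempts,
// mirroring the guard in computeFailedWorkflowRunUpdate (0 = unlimited).
interface SketchRetryPolicy {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
  maximumAttempts: number; // total attempts including the first; 0 = unlimited
}

function nextRetryDelayMs(
  policy: SketchRetryPolicy,
  attempts: number, // attempts completed so far, including the initial one
): number | null {
  if (policy.maximumAttempts > 0 && attempts >= policy.maximumAttempts) {
    return null; // budget exhausted: the run is marked `failed` terminally
  }
  const raw =
    policy.initialIntervalMs * policy.backoffCoefficient ** (attempts - 1);
  return Math.min(raw, policy.maximumIntervalMs);
}

// Step defaults (1s, x2, 100s cap, 10 attempts): failures 1..4 wait
// 1s, 2s, 4s, 8s; failures 8 and 9 are capped at 100s; after the 10th
// failure nextRetryDelayMs returns null and the run fails terminally.
```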