From 91f00bda16d1550b81403bbb6ef9062644a90e03 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 12:55:10 -0700 Subject: [PATCH 1/6] fix(tables): retry transient DB/Redis failures in cell execution and surface error causes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workflow-group-cell runs intermittently failed on trivial DB reads/writes under heavy fan-out, stranding cells in `running`. Investigation showed the PlanetScale and ElastiCache backends were healthy at the time — the failures are transient connection-level faults that the cell (maxAttempts: 1) had no tolerance for, and the real cause was never logged (Drizzle wraps it as "Failed query: ..." and the driver cause lives in error.cause). Resilience: - Add retryTransient (lib/table/retry-transient.ts): retries only transient infra errors (reuses isRetryableInfrastructureError; adds an ioredis command-timeout match) with jittered backoff, then rethrows. Fail-fast for everything else. - Wrap the cell's getTableById/getRowById reads, the terminal write (cell-write updateRow — idempotent via the executionId guard), and the Redis cascade-lock acquire. Diagnostics: - Add describeError (lib/core/errors/retryable-infrastructure.ts): walks the .cause chain and always returns the underlying driver cause (code/errno/ syscall + causeChain), including for unclassified errors like AbortError. - Log `cause` + a `retryable` flag (and aborted/timedOut in the cell's main catch) across the cell + finalization error paths, mirroring the existing schedule-execution pattern. Logging-only; no behavior change. This lets the next recurrence reveal the real cause and whether the retry applies. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../background/workflow-column-execution.ts | 41 +++++++-- .../errors/retryable-infrastructure.test.ts | 64 ++++++++++++++ .../core/errors/retryable-infrastructure.ts | 46 ++++++++++ .../sim/lib/logs/execution/logging-session.ts | 16 +++- apps/sim/lib/table/cascade-lock.ts | 7 +- apps/sim/lib/table/cell-write.ts | 34 +++++--- apps/sim/lib/table/retry-transient.test.ts | 83 +++++++++++++++++++ apps/sim/lib/table/retry-transient.ts | 73 ++++++++++++++++ .../workflows/executor/pause-persistence.ts | 8 ++ 9 files changed, 349 insertions(+), 23 deletions(-) create mode 100644 apps/sim/lib/core/errors/retryable-infrastructure.test.ts create mode 100644 apps/sim/lib/table/retry-transient.test.ts create mode 100644 apps/sim/lib/table/retry-transient.ts diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 53c337842fa..338cfa57b2f 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -7,12 +7,14 @@ import { generateId } from '@sim/utils/id' import { backoffWithJitter } from '@sim/utils/retry' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' +import { describeError } from '@/lib/core/errors/retryable-infrastructure' import { createTimeoutAbortController } from '@/lib/core/execution-limits' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' import { preprocessExecution } from '@/lib/execution/preprocessing' import { withCascadeLock } from '@/lib/table/cascade-lock' import { isExecCancelled } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' +import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient' import type { RowData, RowExecutionMetadata, @@ -67,9 +69,15 @@ export async function executeWorkflowGroupCellJob( // marked, so stop re-driving this row. if (outcome.result === 'blocked') break if (signal?.aborted) break - const freshTable = await getTableById(tableId) + const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), { + signal, + }) if (!freshTable) break - const freshRow = await getRowById(tableId, rowId, workspaceId) + const freshRow = await retryTransient( + 'cascade getRowById', + () => getRowById(tableId, rowId, workspaceId), + { signal } + ) if (!freshRow) break const next = pickNextEligibleGroupForRow(freshTable, freshRow) if (!next) break @@ -113,7 +121,9 @@ export async function runRowCascadeLoop( while (true) { if (signal?.aborted) break - const freshTable = await getTableById(tableId) + const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), { + signal, + }) if (!freshTable) { logger.warn(`Table ${tableId} vanished mid-cascade`) break @@ -142,7 +152,11 @@ export async function runRowCascadeLoop( // would re-pick the still-pending queued marker and spin. if (result === 'blocked') return 'blocked' - const freshRow = await getRowById(tableId, rowId, workspaceId) + const freshRow = await retryTransient( + 'cascade getRowById', + () => getRowById(tableId, rowId, workspaceId), + { signal } + ) if (!freshRow) break const next = pickNextEligibleGroupForRow(freshTable, freshRow, currentGroupId) if (!next) break @@ -597,8 +611,8 @@ async function runWorkflowAndWriteTerminal( }) .catch((err) => { logger.warn( - `Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId}):`, - err + `Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId})`, + { cause: describeError(err), retryable: isRetryableCellError(err) } ) }) } @@ -720,7 +734,14 @@ async function runWorkflowAndWriteTerminal( const message = toError(err).message logger.error( `Workflow group cell execution failed (table=${tableId} row=${rowId} group=${groupId})`, - { error: message, executionId } + { + error: message, + executionId, + cause: describeError(err), + retryable: isRetryableCellError(err), + aborted: abortSignal.aborted, + timedOut: timeoutController.isTimedOut(), + } ) terminalWritten = true await writeChain.catch(() => {}) @@ -735,7 +756,11 @@ async function runWorkflowAndWriteTerminal( blockErrors, }) } catch (writeErr) { - logger.error('Also failed to write error state', { error: toError(writeErr).message }) + logger.error('Also failed to write error state', { + error: toError(writeErr).message, + cause: describeError(writeErr), + retryable: isRetryableCellError(writeErr), + }) } return 'error' } diff --git a/apps/sim/lib/core/errors/retryable-infrastructure.test.ts b/apps/sim/lib/core/errors/retryable-infrastructure.test.ts new file mode 100644 index 00000000000..f1e4b482b13 --- /dev/null +++ b/apps/sim/lib/core/errors/retryable-infrastructure.test.ts @@ -0,0 +1,64 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { + describeError, + isRetryableInfrastructureError, +} from '@/lib/core/errors/retryable-infrastructure' + +describe('describeError', () => { + it('reports name and message for a plain error, omitting causeChain', () => { + const described = describeError(new Error('boom')) + expect(described).toEqual({ name: 'Error', message: 'boom' }) + expect(described.causeChain).toBeUndefined() + }) + + it('surfaces the deepest cause for a wrapped driver error', () => { + const driver = Object.assign(new Error('read ECONNRESET'), { + code: 'ECONNRESET', + errno: 'ECONNRESET', + syscall: 'read', + }) + const wrapped = new Error('Failed query: select ...', { cause: driver }) + + const described = describeError(wrapped) + expect(described.name).toBe('Error') + expect(described.message).toBe('read ECONNRESET') + expect(described.code).toBe('ECONNRESET') + expect(described.errno).toBe('ECONNRESET') + expect(described.syscall).toBe('read') + expect(described.causeChain).toEqual([ + 'Error: Failed query: select ...', + 'Error: read ECONNRESET', + ]) + }) + + it('always returns the cause for unclassified errors (AbortError)', () => { + const aborted = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }) + const described = describeError(aborted) + + expect(described.name).toBe('AbortError') + expect(described.message).toBe('The operation was aborted') + // The retryable classifier skips it entirely — describeError still surfaces it. + expect(isRetryableInfrastructureError(aborted)).toBe(false) + }) + + it('falls back to a populated description for non-Error input without throwing', () => { + expect(describeError('just a string')).toEqual({ name: 'Error', message: 'just a string' }) + expect(() => describeError({ weird: true })).not.toThrow() + }) + + it('stops walking the cause chain at depth 10 and does not loop on a cycle', () => { + const a = new Error('a') + const b = new Error('b') + ;(a as Error & { cause?: unknown }).cause = b + ;(b as Error & { cause?: unknown }).cause = a + + let described: ReturnType | undefined + expect(() => { + described = describeError(a) + }).not.toThrow() + expect(described?.causeChain?.length).toBeLessThanOrEqual(10) + }) +}) diff --git a/apps/sim/lib/core/errors/retryable-infrastructure.ts b/apps/sim/lib/core/errors/retryable-infrastructure.ts index 4eed69c6425..c686c9cdac5 100644 --- a/apps/sim/lib/core/errors/retryable-infrastructure.ts +++ b/apps/sim/lib/core/errors/retryable-infrastructure.ts @@ -1,3 +1,5 @@ +import { toError } from '@sim/utils/errors' + const RETRYABLE_DB_ERROR_CODES = new Set([ '08000', '08001', @@ -76,3 +78,47 @@ export function describeRetryableInfrastructureError( export function isRetryableInfrastructureError(error: unknown): boolean { return Boolean(describeRetryableInfrastructureError(error)) } + +export interface DescribedError { + name: string + message: string + code?: string + errno?: string + syscall?: string + /** `"Name: message"` per link in the `.cause` chain, outermost first. Present only when the chain has more than one link. */ + causeChain?: string[] +} + +/** + * Always-on diagnostic view of an error and its `.cause` chain. + * + * Unlike {@link describeRetryableInfrastructureError} — which returns + * `undefined` for errors outside its retryable allowlist — this returns the + * underlying cause for ANY error, including `AbortError` and otherwise + * unclassified causes. Reports the fields of the DEEPEST `.cause` link, because + * a wrapped driver error (e.g. Drizzle's `"Failed query: ..."` wrapping an + * `ECONNRESET`) carries the real reason there, not on the outer wrapper. + * + * `@sim/logger` does not serialize the non-enumerable `Error.prototype.cause`, + * so callers must pass the result as an explicit structured log field rather + * than relying on the logger to expand a raw error. + */ +export function describeError(error: unknown): DescribedError { + const chain = getErrorChain(error) + if (chain.length === 0) { + const normalized = toError(error) + return { name: normalized.name, message: normalized.message } + } + const deepest = chain[chain.length - 1] + const code = typeof deepest.code === 'string' ? deepest.code : undefined + const errno = typeof deepest.errno === 'string' ? deepest.errno : undefined + const syscall = typeof deepest.syscall === 'string' ? deepest.syscall : undefined + return { + name: deepest.name, + message: deepest.message, + ...(code ? { code } : {}), + ...(errno ? { errno } : {}), + ...(syscall ? { syscall } : {}), + ...(chain.length > 1 ? { causeChain: chain.map((e) => `${e.name}: ${e.message}`) } : {}), + } +} diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 09bfd2348ca..b4d9daa9b40 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -3,6 +3,10 @@ import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { and, eq, sql } from 'drizzle-orm' +import { + describeError, + isRetryableInfrastructureError, +} from '@/lib/core/errors/retryable-infrastructure' import { executionLogger } from '@/lib/logs/execution/logger' import { calculateCostSummary, @@ -177,6 +181,8 @@ export class LoggingSession { } catch (error) { logger.error(`Failed to persist last started block for execution ${this.executionId}:`, { error: toError(error).message, + cause: describeError(error), + retryable: isRetryableInfrastructureError(error), }) } } @@ -193,6 +199,8 @@ export class LoggingSession { } catch (error) { logger.error(`Failed to persist last completed block for execution ${this.executionId}:`, { error: toError(error).message, + cause: describeError(error), + retryable: isRetryableInfrastructureError(error), }) } } @@ -411,6 +419,8 @@ export class LoggingSession { executionId: this.executionId, error: toError(error).message, stack: error instanceof Error ? error.stack : undefined, + cause: describeError(error), + retryable: isRetryableInfrastructureError(error), }) throw error } @@ -1057,7 +1067,11 @@ export class LoggingSession { this.completionAttemptFailed = true logger.error( `[${this.requestId || 'unknown'}] Cost-only fallback also failed for execution ${this.executionId}:`, - { error: toError(fallbackError).message } + { + error: toError(fallbackError).message, + cause: describeError(fallbackError), + retryable: isRetryableInfrastructureError(fallbackError), + } ) } } diff --git a/apps/sim/lib/table/cascade-lock.ts b/apps/sim/lib/table/cascade-lock.ts index cfdb4702c5f..4bdbf6a367d 100644 --- a/apps/sim/lib/table/cascade-lock.ts +++ b/apps/sim/lib/table/cascade-lock.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis' +import { retryTransient } from '@/lib/table/retry-transient' const logger = createLogger('TableCascadeLock') @@ -40,7 +41,11 @@ export async function withCascadeLock( fn: () => Promise ): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> { const key = cascadeLockKey(tableId, rowId) - const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS) + // A timed-out/dropped Redis command here throws before the cell is picked up; + // retry so a transient Redis blip doesn't fail the run outright. + const acquired = await retryTransient('cascade acquireLock', () => + acquireLock(key, ownerId, LOCK_TTL_SECONDS) + ) if (!acquired) return { status: 'contended' } const heartbeat = setInterval(() => { diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index c5caf6dc2dc..45410856baf 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -12,6 +12,7 @@ import { createLogger } from '@sim/logger' import { isExecCancelled } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' +import { retryTransient } from '@/lib/table/retry-transient' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' const logger = createLogger('WorkflowCellWrite') @@ -46,12 +47,14 @@ export async function writeWorkflowGroupState( const requestId = ctx.requestId ?? `wfgrp-${executionId}` const { getTableById, getRowById, updateRow } = await import('@/lib/table/service') - const table = await getTableById(tableId) + const table = await retryTransient('cell-write getTableById', () => getTableById(tableId)) if (!table) { logger.warn(`Table ${tableId} vanished before group state write`) return 'wrote' } - const row = await getRowById(tableId, rowId, workspaceId) + const row = await retryTransient('cell-write getRowById', () => + getRowById(tableId, rowId, workspaceId) + ) if (!row) { logger.warn(`Row ${rowId} vanished before group state write`) return 'wrote' @@ -99,17 +102,22 @@ export async function writeWorkflowGroupState( // task writes (running/completed/error) get the SQL guard so an in-flight // partial can't clobber a stop click or a newer run that already committed. const cancellationGuard = bypassStaleWorker ? undefined : { groupId, executionId } - const result = await updateRow( - { - tableId, - rowId, - data: payload.dataPatch ?? {}, - workspaceId, - executionsPatch: { [groupId]: payload.executionState }, - cancellationGuard, - }, - table, - requestId + // The executionId/cancellation guard makes this write idempotent — a retry + // after a dropped connection re-applies the same terminal state, so retrying + // is safe and is what stops a transient blip from stranding the cell. + const result = await retryTransient('cell-write updateRow', () => + updateRow( + { + tableId, + rowId, + data: payload.dataPatch ?? {}, + workspaceId, + executionsPatch: { [groupId]: payload.executionState }, + cancellationGuard, + }, + table, + requestId + ) ) if (result === null) { logger.info( diff --git a/apps/sim/lib/table/retry-transient.test.ts b/apps/sim/lib/table/retry-transient.test.ts new file mode 100644 index 00000000000..73b813a3dc8 --- /dev/null +++ b/apps/sim/lib/table/retry-transient.test.ts @@ -0,0 +1,83 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockSleep } = vi.hoisted(() => ({ mockSleep: vi.fn().mockResolvedValue(undefined) })) + +vi.mock('@sim/utils/helpers', () => ({ sleep: mockSleep })) + +import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient' + +function connError(): Error { + return Object.assign(new Error('Failed query: select ...'), { code: 'ECONNRESET' }) +} + +function redisTimeout(): Error { + return new Error('Command timed out') +} + +describe('isRetryableCellError', () => { + it('classifies dropped Postgres connections (network errno) as retryable', () => { + expect(isRetryableCellError(connError())).toBe(true) + }) + + it('classifies ioredis command timeouts as retryable', () => { + expect(isRetryableCellError(redisTimeout())).toBe(true) + expect(isRetryableCellError(new Error('Connection is closed'))).toBe(true) + }) + + it('does not retry application/logic errors', () => { + expect(isRetryableCellError(new Error('row not found'))).toBe(false) + }) +}) + +describe('retryTransient', () => { + beforeEach(() => { + vi.clearAllMocks() + mockSleep.mockResolvedValue(undefined) + }) + + it('returns the result without retrying on success', async () => { + const fn = vi.fn().mockResolvedValue('ok') + await expect(retryTransient('t', fn)).resolves.toBe('ok') + expect(fn).toHaveBeenCalledTimes(1) + expect(mockSleep).not.toHaveBeenCalled() + }) + + it('retries a transient failure then succeeds', async () => { + const fn = vi + .fn() + .mockRejectedValueOnce(connError()) + .mockRejectedValueOnce(redisTimeout()) + .mockResolvedValue('recovered') + await expect(retryTransient('t', fn)).resolves.toBe('recovered') + expect(fn).toHaveBeenCalledTimes(3) + expect(mockSleep).toHaveBeenCalledTimes(2) + }) + + it('rethrows a non-transient error immediately without retrying', async () => { + const fn = vi.fn().mockRejectedValue(new Error('row not found')) + await expect(retryTransient('t', fn)).rejects.toThrow('row not found') + expect(fn).toHaveBeenCalledTimes(1) + expect(mockSleep).not.toHaveBeenCalled() + }) + + it('rethrows after exhausting maxAttempts on a persistent transient error', async () => { + const fn = vi.fn().mockRejectedValue(connError()) + await expect(retryTransient('t', fn, { maxAttempts: 3 })).rejects.toThrow('Failed query') + expect(fn).toHaveBeenCalledTimes(3) + expect(mockSleep).toHaveBeenCalledTimes(2) + }) + + it('does not retry once the signal is aborted', async () => { + const controller = new AbortController() + controller.abort() + const fn = vi.fn().mockRejectedValue(connError()) + await expect(retryTransient('t', fn, { signal: controller.signal })).rejects.toThrow( + 'Failed query' + ) + expect(fn).toHaveBeenCalledTimes(1) + expect(mockSleep).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/table/retry-transient.ts b/apps/sim/lib/table/retry-transient.ts new file mode 100644 index 00000000000..3544f08eb40 --- /dev/null +++ b/apps/sim/lib/table/retry-transient.ts @@ -0,0 +1,73 @@ +import { createLogger } from '@sim/logger' +import { getErrorMessage } from '@sim/utils/errors' +import { sleep } from '@sim/utils/helpers' +import { backoffWithJitter } from '@sim/utils/retry' +import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' + +const logger = createLogger('TableRetryTransient') + +/** Cell-task DB/Redis round-trips are short and idempotent reads/guarded + * writes, so a handful of fast retries comfortably outlasts a transient + * connection drop without risking duplicate side effects. */ +const DEFAULT_MAX_ATTEMPTS = 4 + +/** + * ioredis surfaces command timeouts and severed connections as plain `Error`s + * with no `code`/`errno`, so the SQLSTATE/errno-based + * {@link isRetryableInfrastructureError} classifier misses them. Match those by + * message instead — these are the Redis-side equivalents of a dropped socket. + */ +function isRetryableRedisError(error: unknown): boolean { + return /Command timed out|Connection is closed|Stream isn't writeable|Connection is in subscriber mode/i.test( + getErrorMessage(error) + ) +} + +/** + * True when `error` is a transient infrastructure failure worth retrying — a + * dropped Postgres connection (08xxx / network errno, via the shared + * classifier) or a timed-out/closed Redis command. + */ +export function isRetryableCellError(error: unknown): boolean { + return isRetryableInfrastructureError(error) || isRetryableRedisError(error) +} + +interface RetryTransientOptions { + maxAttempts?: number + /** Abort between attempts (e.g. trigger.dev cancellation). Aborting rethrows + * the last error rather than waiting out another backoff. */ + signal?: AbortSignal +} + +/** + * Runs `fn`, retrying only on transient infrastructure errors with jittered + * backoff. Non-transient errors rethrow immediately; transient errors rethrow + * once `maxAttempts` is exhausted — this is resilience, not error suppression. + * + * The cell task runs under trigger.dev `maxAttempts: 1`, so without this a + * single dropped DB/Redis connection mid-cascade kills the run and strands the + * cell in `running`. The backends answer these queries in sub-milliseconds once + * a fresh connection is established, so a short backoff reliably recovers. + */ +export async function retryTransient( + label: string, + fn: () => Promise, + options: RetryTransientOptions = {} +): Promise { + const maxAttempts = options.maxAttempts ?? DEFAULT_MAX_ATTEMPTS + for (let attempt = 1; ; attempt++) { + try { + return await fn() + } catch (error) { + if (options.signal?.aborted || attempt >= maxAttempts || !isRetryableCellError(error)) { + throw error + } + const waitMs = backoffWithJitter(attempt, null, { baseMs: 250, maxMs: 5_000 }) + logger.warn( + `Transient failure in ${label} (attempt ${attempt}/${maxAttempts}); retrying in ${Math.round(waitMs)}ms`, + { error: getErrorMessage(error) } + ) + await sleep(waitMs) + } + } +} diff --git a/apps/sim/lib/workflows/executor/pause-persistence.ts b/apps/sim/lib/workflows/executor/pause-persistence.ts index 2080668cccd..10a840539ba 100644 --- a/apps/sim/lib/workflows/executor/pause-persistence.ts +++ b/apps/sim/lib/workflows/executor/pause-persistence.ts @@ -1,5 +1,9 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' +import { + describeError, + isRetryableInfrastructureError, +} from '@/lib/core/errors/retryable-infrastructure' import type { LoggingSession } from '@/lib/logs/execution/logging-session' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import type { ExecutionResult } from '@/executor/types' @@ -46,6 +50,8 @@ export async function handlePostExecutionPauseState({ logger.error('Failed to persist pause result', { executionId, error: toError(pauseError).message, + cause: describeError(pauseError), + retryable: isRetryableInfrastructureError(pauseError), }) await loggingSession.markAsFailed( `Failed to persist pause state: ${toError(pauseError).message}` @@ -59,6 +65,8 @@ export async function handlePostExecutionPauseState({ logger.error('Failed to process queued resumes', { executionId, error: toError(resumeError).message, + cause: describeError(resumeError), + retryable: isRetryableInfrastructureError(resumeError), }) } } From 88121547a8e5c60e56577ae4dc4a27a6987e7d9c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 13:15:29 -0700 Subject: [PATCH 2/6] fix(tables): address review feedback on cell retry resilience MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - retryTransient: re-check the abort signal after the backoff sleep so a cancellation during sleep stops the next attempt (don't run/return work for an already-cancelled task). - isRetryableRedisError: walk the .cause chain (mirroring the infra classifier) so wrapped Redis timeouts are recognized; drop "Connection is in subscriber mode" — that's a connection-state programming error, not a transient drop, and would just fail identically every retry. - cascade-lock: stop wrapping acquireLock in retryTransient. acquireLock is a non-idempotent SET NX, so retrying after a timed-out-but-applied first SET returns false (key already ours) and yields a false `contended` that skips the cascade. A transient Redis blip here just fails the run before pickup (no stranded cell); the dispatcher re-drives it. - Tests: cause-chain Redis match, subscriber-mode exclusion, abort-during-sleep. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/sim/lib/table/cascade-lock.ts | 12 +++++------ apps/sim/lib/table/retry-transient.test.ts | 23 ++++++++++++++++++++++ apps/sim/lib/table/retry-transient.ts | 22 +++++++++++++++++---- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/apps/sim/lib/table/cascade-lock.ts b/apps/sim/lib/table/cascade-lock.ts index 4bdbf6a367d..d3d79e9f19c 100644 --- a/apps/sim/lib/table/cascade-lock.ts +++ b/apps/sim/lib/table/cascade-lock.ts @@ -1,7 +1,6 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis' -import { retryTransient } from '@/lib/table/retry-transient' const logger = createLogger('TableCascadeLock') @@ -41,11 +40,12 @@ export async function withCascadeLock( fn: () => Promise ): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> { const key = cascadeLockKey(tableId, rowId) - // A timed-out/dropped Redis command here throws before the cell is picked up; - // retry so a transient Redis blip doesn't fail the run outright. - const acquired = await retryTransient('cascade acquireLock', () => - acquireLock(key, ownerId, LOCK_TTL_SECONDS) - ) + // NOT wrapped in retryTransient: acquireLock is a non-idempotent `SET NX`, so + // a retry after a timed-out-but-applied first SET would see the key already + // present and return false — a false `contended` that skips the cascade. A + // transient Redis blip here just fails the run before pickup (no stranded + // cell); the dispatcher/re-drive recovers it. + const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS) if (!acquired) return { status: 'contended' } const heartbeat = setInterval(() => { diff --git a/apps/sim/lib/table/retry-transient.test.ts b/apps/sim/lib/table/retry-transient.test.ts index 73b813a3dc8..a0568f9dd4b 100644 --- a/apps/sim/lib/table/retry-transient.test.ts +++ b/apps/sim/lib/table/retry-transient.test.ts @@ -27,6 +27,14 @@ describe('isRetryableCellError', () => { expect(isRetryableCellError(new Error('Connection is closed'))).toBe(true) }) + it('matches a Redis timeout wrapped in a cause chain', () => { + expect(isRetryableCellError(new Error('outer wrapper', { cause: redisTimeout() }))).toBe(true) + }) + + it('does not retry Redis connection-state programming errors', () => { + expect(isRetryableCellError(new Error('Connection is in subscriber mode'))).toBe(false) + }) + it('does not retry application/logic errors', () => { expect(isRetryableCellError(new Error('row not found'))).toBe(false) }) @@ -80,4 +88,19 @@ describe('retryTransient', () => { expect(fn).toHaveBeenCalledTimes(1) expect(mockSleep).not.toHaveBeenCalled() }) + + it('stops retrying if the signal aborts during backoff sleep', async () => { + const controller = new AbortController() + // Cancellation fires mid-backoff, after the first failure but before the + // next attempt would run. + mockSleep.mockImplementationOnce(async () => { + controller.abort() + }) + const fn = vi.fn().mockRejectedValue(connError()) + await expect(retryTransient('t', fn, { signal: controller.signal })).rejects.toThrow( + 'Failed query' + ) + expect(fn).toHaveBeenCalledTimes(1) + expect(mockSleep).toHaveBeenCalledTimes(1) + }) }) diff --git a/apps/sim/lib/table/retry-transient.ts b/apps/sim/lib/table/retry-transient.ts index 3544f08eb40..c7127d92c5f 100644 --- a/apps/sim/lib/table/retry-transient.ts +++ b/apps/sim/lib/table/retry-transient.ts @@ -11,16 +11,26 @@ const logger = createLogger('TableRetryTransient') * connection drop without risking duplicate side effects. */ const DEFAULT_MAX_ATTEMPTS = 4 +/** Redis-side equivalents of a dropped socket. Excludes connection-STATE + * programming errors (e.g. "Connection is in subscriber mode"), which are + * misconfigurations that would just fail identically on every retry. */ +const RETRYABLE_REDIS_MESSAGE = /Command timed out|Connection is closed|Stream isn't writeable/i + /** * ioredis surfaces command timeouts and severed connections as plain `Error`s * with no `code`/`errno`, so the SQLSTATE/errno-based * {@link isRetryableInfrastructureError} classifier misses them. Match those by - * message instead — these are the Redis-side equivalents of a dropped socket. + * message instead, walking the `.cause` chain (depth-bounded) so a wrapped + * Redis failure is still recognized — mirroring how the infra classifier walks + * causes. */ function isRetryableRedisError(error: unknown): boolean { - return /Command timed out|Connection is closed|Stream isn't writeable|Connection is in subscriber mode/i.test( - getErrorMessage(error) - ) + let current: unknown = error + for (let depth = 0; depth < 10 && current instanceof Error; depth++) { + if (RETRYABLE_REDIS_MESSAGE.test(current.message)) return true + current = current.cause + } + return false } /** @@ -68,6 +78,10 @@ export async function retryTransient( { error: getErrorMessage(error) } ) await sleep(waitMs) + // Re-check after backoff: if cancellation fired during the sleep, don't + // run another attempt (and don't return a success from work that started + // after the task was already cancelled). + if (options.signal?.aborted) throw error } } } From e0882cd860f53776d4b49feba88573586987ae22 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 13:26:13 -0700 Subject: [PATCH 3/6] fix(tables): drop out-of-scope abort/timeout fields from cell catch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main catch logged `aborted`/`timedOut` from `abortSignal`/`timeoutController`, but those are declared inside the outer try block (the inner try around executeWorkflow is try/finally, so this catch belongs to the outer try) and are not in scope in the catch — `next build`'s type-check failed with "Cannot find name 'abortSignal'". Local incremental `tsc --noEmit` had skipped the file and falsely passed; the Cursor/Greptile reviewers flagged this correctly. Removed the two fields. Abort/timeout is still surfaced via `cause: describeError(err)` (an aborted run shows `name: 'AbortError'` / the timeout message), so no diagnostic signal is lost. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/sim/background/workflow-column-execution.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 338cfa57b2f..7b720f49ad6 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -739,8 +739,6 @@ async function runWorkflowAndWriteTerminal( executionId, cause: describeError(err), retryable: isRetryableCellError(err), - aborted: abortSignal.aborted, - timedOut: timeoutController.isTimedOut(), } ) terminalWritten = true From 67ceef4465a98bae1bbb0a0a7024604b0a6f5857 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 14:11:20 -0700 Subject: [PATCH 4/6] refactor(tables): drop in-process retry, keep cause diagnostics only In-process retry is the wrong layer for this path: the cell task is maxAttempts:1 by design, retrying on a possibly-degraded worker may not help, and it masks the very transient-failure signal we're trying to capture before we understand the root cause. Removed retryTransient entirely (file + all wrapping in cell-write, the cascade reads, and the lock acquire) and kept only the diagnostic logging. - Deleted lib/table/retry-transient.ts (+ test); cell-write and the cascade reads call getTableById/getRowById/updateRow directly again, fail-fast. - Kept describeError + `cause`/`retryable` fields across the cell + finalization catch blocks; the cell-path `retryable` flag now sources from isRetryableInfrastructureError (the canonical classifier) for consistency. Diagnostics-first: surface the real driver cause on the next recurrence, then decide the actual fix (e.g. task-level maxAttempts, or addressing the worker- side cause) from evidence rather than a speculative in-process retry. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../background/workflow-column-execution.ts | 32 ++---- apps/sim/lib/table/cascade-lock.ts | 5 - apps/sim/lib/table/cell-write.ts | 34 +++--- apps/sim/lib/table/retry-transient.test.ts | 106 ------------------ apps/sim/lib/table/retry-transient.ts | 87 -------------- 5 files changed, 24 insertions(+), 240 deletions(-) delete mode 100644 apps/sim/lib/table/retry-transient.test.ts delete mode 100644 apps/sim/lib/table/retry-transient.ts diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 7b720f49ad6..8708e034a8f 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -7,14 +7,16 @@ import { generateId } from '@sim/utils/id' import { backoffWithJitter } from '@sim/utils/retry' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' -import { describeError } from '@/lib/core/errors/retryable-infrastructure' +import { + describeError, + isRetryableInfrastructureError, +} from '@/lib/core/errors/retryable-infrastructure' import { createTimeoutAbortController } from '@/lib/core/execution-limits' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' import { preprocessExecution } from '@/lib/execution/preprocessing' import { withCascadeLock } from '@/lib/table/cascade-lock' import { isExecCancelled } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' -import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient' import type { RowData, RowExecutionMetadata, @@ -69,15 +71,9 @@ export async function executeWorkflowGroupCellJob( // marked, so stop re-driving this row. if (outcome.result === 'blocked') break if (signal?.aborted) break - const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), { - signal, - }) + const freshTable = await getTableById(tableId) if (!freshTable) break - const freshRow = await retryTransient( - 'cascade getRowById', - () => getRowById(tableId, rowId, workspaceId), - { signal } - ) + const freshRow = await getRowById(tableId, rowId, workspaceId) if (!freshRow) break const next = pickNextEligibleGroupForRow(freshTable, freshRow) if (!next) break @@ -121,9 +117,7 @@ export async function runRowCascadeLoop( while (true) { if (signal?.aborted) break - const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), { - signal, - }) + const freshTable = await getTableById(tableId) if (!freshTable) { logger.warn(`Table ${tableId} vanished mid-cascade`) break @@ -152,11 +146,7 @@ export async function runRowCascadeLoop( // would re-pick the still-pending queued marker and spin. if (result === 'blocked') return 'blocked' - const freshRow = await retryTransient( - 'cascade getRowById', - () => getRowById(tableId, rowId, workspaceId), - { signal } - ) + const freshRow = await getRowById(tableId, rowId, workspaceId) if (!freshRow) break const next = pickNextEligibleGroupForRow(freshTable, freshRow, currentGroupId) if (!next) break @@ -612,7 +602,7 @@ async function runWorkflowAndWriteTerminal( .catch((err) => { logger.warn( `Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId})`, - { cause: describeError(err), retryable: isRetryableCellError(err) } + { cause: describeError(err), retryable: isRetryableInfrastructureError(err) } ) }) } @@ -738,7 +728,7 @@ async function runWorkflowAndWriteTerminal( error: message, executionId, cause: describeError(err), - retryable: isRetryableCellError(err), + retryable: isRetryableInfrastructureError(err), } ) terminalWritten = true @@ -757,7 +747,7 @@ async function runWorkflowAndWriteTerminal( logger.error('Also failed to write error state', { error: toError(writeErr).message, cause: describeError(writeErr), - retryable: isRetryableCellError(writeErr), + retryable: isRetryableInfrastructureError(writeErr), }) } return 'error' diff --git a/apps/sim/lib/table/cascade-lock.ts b/apps/sim/lib/table/cascade-lock.ts index d3d79e9f19c..cfdb4702c5f 100644 --- a/apps/sim/lib/table/cascade-lock.ts +++ b/apps/sim/lib/table/cascade-lock.ts @@ -40,11 +40,6 @@ export async function withCascadeLock( fn: () => Promise ): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> { const key = cascadeLockKey(tableId, rowId) - // NOT wrapped in retryTransient: acquireLock is a non-idempotent `SET NX`, so - // a retry after a timed-out-but-applied first SET would see the key already - // present and return false — a false `contended` that skips the cascade. A - // transient Redis blip here just fails the run before pickup (no stranded - // cell); the dispatcher/re-drive recovers it. const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS) if (!acquired) return { status: 'contended' } diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 45410856baf..c5caf6dc2dc 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -12,7 +12,6 @@ import { createLogger } from '@sim/logger' import { isExecCancelled } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' -import { retryTransient } from '@/lib/table/retry-transient' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' const logger = createLogger('WorkflowCellWrite') @@ -47,14 +46,12 @@ export async function writeWorkflowGroupState( const requestId = ctx.requestId ?? `wfgrp-${executionId}` const { getTableById, getRowById, updateRow } = await import('@/lib/table/service') - const table = await retryTransient('cell-write getTableById', () => getTableById(tableId)) + const table = await getTableById(tableId) if (!table) { logger.warn(`Table ${tableId} vanished before group state write`) return 'wrote' } - const row = await retryTransient('cell-write getRowById', () => - getRowById(tableId, rowId, workspaceId) - ) + const row = await getRowById(tableId, rowId, workspaceId) if (!row) { logger.warn(`Row ${rowId} vanished before group state write`) return 'wrote' @@ -102,22 +99,17 @@ export async function writeWorkflowGroupState( // task writes (running/completed/error) get the SQL guard so an in-flight // partial can't clobber a stop click or a newer run that already committed. const cancellationGuard = bypassStaleWorker ? undefined : { groupId, executionId } - // The executionId/cancellation guard makes this write idempotent — a retry - // after a dropped connection re-applies the same terminal state, so retrying - // is safe and is what stops a transient blip from stranding the cell. - const result = await retryTransient('cell-write updateRow', () => - updateRow( - { - tableId, - rowId, - data: payload.dataPatch ?? {}, - workspaceId, - executionsPatch: { [groupId]: payload.executionState }, - cancellationGuard, - }, - table, - requestId - ) + const result = await updateRow( + { + tableId, + rowId, + data: payload.dataPatch ?? {}, + workspaceId, + executionsPatch: { [groupId]: payload.executionState }, + cancellationGuard, + }, + table, + requestId ) if (result === null) { logger.info( diff --git a/apps/sim/lib/table/retry-transient.test.ts b/apps/sim/lib/table/retry-transient.test.ts deleted file mode 100644 index a0568f9dd4b..00000000000 --- a/apps/sim/lib/table/retry-transient.test.ts +++ /dev/null @@ -1,106 +0,0 @@ -/** - * @vitest-environment node - */ -import { beforeEach, describe, expect, it, vi } from 'vitest' - -const { mockSleep } = vi.hoisted(() => ({ mockSleep: vi.fn().mockResolvedValue(undefined) })) - -vi.mock('@sim/utils/helpers', () => ({ sleep: mockSleep })) - -import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient' - -function connError(): Error { - return Object.assign(new Error('Failed query: select ...'), { code: 'ECONNRESET' }) -} - -function redisTimeout(): Error { - return new Error('Command timed out') -} - -describe('isRetryableCellError', () => { - it('classifies dropped Postgres connections (network errno) as retryable', () => { - expect(isRetryableCellError(connError())).toBe(true) - }) - - it('classifies ioredis command timeouts as retryable', () => { - expect(isRetryableCellError(redisTimeout())).toBe(true) - expect(isRetryableCellError(new Error('Connection is closed'))).toBe(true) - }) - - it('matches a Redis timeout wrapped in a cause chain', () => { - expect(isRetryableCellError(new Error('outer wrapper', { cause: redisTimeout() }))).toBe(true) - }) - - it('does not retry Redis connection-state programming errors', () => { - expect(isRetryableCellError(new Error('Connection is in subscriber mode'))).toBe(false) - }) - - it('does not retry application/logic errors', () => { - expect(isRetryableCellError(new Error('row not found'))).toBe(false) - }) -}) - -describe('retryTransient', () => { - beforeEach(() => { - vi.clearAllMocks() - mockSleep.mockResolvedValue(undefined) - }) - - it('returns the result without retrying on success', async () => { - const fn = vi.fn().mockResolvedValue('ok') - await expect(retryTransient('t', fn)).resolves.toBe('ok') - expect(fn).toHaveBeenCalledTimes(1) - expect(mockSleep).not.toHaveBeenCalled() - }) - - it('retries a transient failure then succeeds', async () => { - const fn = vi - .fn() - .mockRejectedValueOnce(connError()) - .mockRejectedValueOnce(redisTimeout()) - .mockResolvedValue('recovered') - await expect(retryTransient('t', fn)).resolves.toBe('recovered') - expect(fn).toHaveBeenCalledTimes(3) - expect(mockSleep).toHaveBeenCalledTimes(2) - }) - - it('rethrows a non-transient error immediately without retrying', async () => { - const fn = vi.fn().mockRejectedValue(new Error('row not found')) - await expect(retryTransient('t', fn)).rejects.toThrow('row not found') - expect(fn).toHaveBeenCalledTimes(1) - expect(mockSleep).not.toHaveBeenCalled() - }) - - it('rethrows after exhausting maxAttempts on a persistent transient error', async () => { - const fn = vi.fn().mockRejectedValue(connError()) - await expect(retryTransient('t', fn, { maxAttempts: 3 })).rejects.toThrow('Failed query') - expect(fn).toHaveBeenCalledTimes(3) - expect(mockSleep).toHaveBeenCalledTimes(2) - }) - - it('does not retry once the signal is aborted', async () => { - const controller = new AbortController() - controller.abort() - const fn = vi.fn().mockRejectedValue(connError()) - await expect(retryTransient('t', fn, { signal: controller.signal })).rejects.toThrow( - 'Failed query' - ) - expect(fn).toHaveBeenCalledTimes(1) - expect(mockSleep).not.toHaveBeenCalled() - }) - - it('stops retrying if the signal aborts during backoff sleep', async () => { - const controller = new AbortController() - // Cancellation fires mid-backoff, after the first failure but before the - // next attempt would run. - mockSleep.mockImplementationOnce(async () => { - controller.abort() - }) - const fn = vi.fn().mockRejectedValue(connError()) - await expect(retryTransient('t', fn, { signal: controller.signal })).rejects.toThrow( - 'Failed query' - ) - expect(fn).toHaveBeenCalledTimes(1) - expect(mockSleep).toHaveBeenCalledTimes(1) - }) -}) diff --git a/apps/sim/lib/table/retry-transient.ts b/apps/sim/lib/table/retry-transient.ts deleted file mode 100644 index c7127d92c5f..00000000000 --- a/apps/sim/lib/table/retry-transient.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { createLogger } from '@sim/logger' -import { getErrorMessage } from '@sim/utils/errors' -import { sleep } from '@sim/utils/helpers' -import { backoffWithJitter } from '@sim/utils/retry' -import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' - -const logger = createLogger('TableRetryTransient') - -/** Cell-task DB/Redis round-trips are short and idempotent reads/guarded - * writes, so a handful of fast retries comfortably outlasts a transient - * connection drop without risking duplicate side effects. */ -const DEFAULT_MAX_ATTEMPTS = 4 - -/** Redis-side equivalents of a dropped socket. Excludes connection-STATE - * programming errors (e.g. "Connection is in subscriber mode"), which are - * misconfigurations that would just fail identically on every retry. */ -const RETRYABLE_REDIS_MESSAGE = /Command timed out|Connection is closed|Stream isn't writeable/i - -/** - * ioredis surfaces command timeouts and severed connections as plain `Error`s - * with no `code`/`errno`, so the SQLSTATE/errno-based - * {@link isRetryableInfrastructureError} classifier misses them. Match those by - * message instead, walking the `.cause` chain (depth-bounded) so a wrapped - * Redis failure is still recognized — mirroring how the infra classifier walks - * causes. - */ -function isRetryableRedisError(error: unknown): boolean { - let current: unknown = error - for (let depth = 0; depth < 10 && current instanceof Error; depth++) { - if (RETRYABLE_REDIS_MESSAGE.test(current.message)) return true - current = current.cause - } - return false -} - -/** - * True when `error` is a transient infrastructure failure worth retrying — a - * dropped Postgres connection (08xxx / network errno, via the shared - * classifier) or a timed-out/closed Redis command. - */ -export function isRetryableCellError(error: unknown): boolean { - return isRetryableInfrastructureError(error) || isRetryableRedisError(error) -} - -interface RetryTransientOptions { - maxAttempts?: number - /** Abort between attempts (e.g. trigger.dev cancellation). Aborting rethrows - * the last error rather than waiting out another backoff. */ - signal?: AbortSignal -} - -/** - * Runs `fn`, retrying only on transient infrastructure errors with jittered - * backoff. Non-transient errors rethrow immediately; transient errors rethrow - * once `maxAttempts` is exhausted — this is resilience, not error suppression. - * - * The cell task runs under trigger.dev `maxAttempts: 1`, so without this a - * single dropped DB/Redis connection mid-cascade kills the run and strands the - * cell in `running`. The backends answer these queries in sub-milliseconds once - * a fresh connection is established, so a short backoff reliably recovers. - */ -export async function retryTransient( - label: string, - fn: () => Promise, - options: RetryTransientOptions = {} -): Promise { - const maxAttempts = options.maxAttempts ?? DEFAULT_MAX_ATTEMPTS - for (let attempt = 1; ; attempt++) { - try { - return await fn() - } catch (error) { - if (options.signal?.aborted || attempt >= maxAttempts || !isRetryableCellError(error)) { - throw error - } - const waitMs = backoffWithJitter(attempt, null, { baseMs: 250, maxMs: 5_000 }) - logger.warn( - `Transient failure in ${label} (attempt ${attempt}/${maxAttempts}); retrying in ${Math.round(waitMs)}ms`, - { error: getErrorMessage(error) } - ) - await sleep(waitMs) - // Re-check after backoff: if cancellation fired during the sleep, don't - // run another attempt (and don't return a success from work that started - // after the task was already cancelled). - if (options.signal?.aborted) throw error - } - } -} From 6dfbea89dfdf4aba93439f146325c70eedb823ed Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 14:34:13 -0700 Subject: [PATCH 5/6] fix(schedules): log error cause on scheduled-execution failure paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scheduled-job failure paths logged the raw error (.message/stack only) — its `.cause` (the real driver error behind a Drizzle "Failed query: ..." wrapper) was never recorded, and the classified-only `describeRetryableInfrastructureError` returns undefined for unrecognized errors. A real failed run (same incident window as the cell failures) failed in `applyScheduleUpdate` with exactly this unrecorded cause. Added `cause: describeError(error)` (always-on, walks the cause chain) to the applyScheduleUpdate catch, the early-failure catch, and the unhandled-error catch — passed as a second arg so the existing message+stack still emit. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/sim/background/schedule-execution.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index 4c6b1a9cba8..75c9a3ccf3d 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -16,6 +16,7 @@ import { and, eq, isNull, type SQL, sql } from 'drizzle-orm' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { + describeError, describeRetryableInfrastructureError, isRetryableInfrastructureError, } from '@/lib/core/errors/retryable-infrastructure' @@ -156,7 +157,7 @@ async function applyScheduleUpdate( return updatedRows.length > 0 } catch (error) { - logger.error(`[${requestId}] ${context}`, error) + logger.error(`[${requestId}] ${context}`, error, { cause: describeError(error) }) throw error } } @@ -530,7 +531,13 @@ async function runWorkflowExecution({ } } - logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error) + logger.error( + `[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, + error, + { + cause: describeError(error), + } + ) if (wasExecutionFinalizedByCore(error, executionId)) { throw error @@ -950,7 +957,9 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) { return } - logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error) + logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error, { + cause: describeError(error), + }) await releaseClaim( now, `Failed to release schedule ${payload.scheduleId} after unhandled error` From 332223baf5f0d3239acd3ca9c7f2b2eba81e7640 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 3 Jun 2026 14:51:11 -0700 Subject: [PATCH 6/6] refactor(errors): move describeError to @sim/utils/errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `describeError` is a general-purpose error/cause-chain helper — it didn't belong in `lib/core/errors/retryable-infrastructure.ts` (that module is specifically about classifying retryable infra errors, and the name read wrong for a generic diagnostic). Moved it to `@sim/utils/errors` alongside `toError`/ `getErrorMessage`/`getPostgresErrorCode`, with its own cycle-safe cause walk. - Added describeError + DescribedError + tests to packages/utils/src/errors.ts. - Reverted the describeError addition from retryable-infrastructure.ts (it keeps only isRetryableInfrastructureError / describeRetryableInfrastructureError, which are accurately named and still used by the schedule retry path). - Re-pointed all consumers (cell, logging-session, pause-persistence, schedule) to import describeError from @sim/utils/errors. The `retryable` classification flag still sources from isRetryableInfrastructureError where used. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/sim/background/schedule-execution.ts | 3 +- .../background/workflow-column-execution.ts | 7 +- .../errors/retryable-infrastructure.test.ts | 64 ------------------- .../core/errors/retryable-infrastructure.ts | 46 ------------- .../sim/lib/logs/execution/logging-session.ts | 7 +- .../workflows/executor/pause-persistence.ts | 7 +- packages/utils/src/errors.test.ts | 53 ++++++++++++++- packages/utils/src/errors.ts | 54 ++++++++++++++++ 8 files changed, 113 insertions(+), 128 deletions(-) delete mode 100644 apps/sim/lib/core/errors/retryable-infrastructure.test.ts diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index 75c9a3ccf3d..a786cefd14e 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -7,7 +7,7 @@ import { workflowSchedule, } from '@sim/db' import { createLogger, runWithRequestContext } from '@sim/logger' -import { toError } from '@sim/utils/errors' +import { describeError, toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { backoffWithJitter } from '@sim/utils/retry' import { task } from '@trigger.dev/sdk' @@ -16,7 +16,6 @@ import { and, eq, isNull, type SQL, sql } from 'drizzle-orm' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { - describeError, describeRetryableInfrastructureError, isRetryableInfrastructureError, } from '@/lib/core/errors/retryable-infrastructure' diff --git a/apps/sim/background/workflow-column-execution.ts b/apps/sim/background/workflow-column-execution.ts index 8708e034a8f..9f617cd5144 100644 --- a/apps/sim/background/workflow-column-execution.ts +++ b/apps/sim/background/workflow-column-execution.ts @@ -1,16 +1,13 @@ import { db } from '@sim/db' import { workflow as workflowTable } from '@sim/db/schema' import { createLogger, runWithRequestContext } from '@sim/logger' -import { toError } from '@sim/utils/errors' +import { describeError, toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { generateId } from '@sim/utils/id' import { backoffWithJitter } from '@sim/utils/retry' import { task } from '@trigger.dev/sdk' import { eq } from 'drizzle-orm' -import { - describeError, - isRetryableInfrastructureError, -} from '@/lib/core/errors/retryable-infrastructure' +import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import { createTimeoutAbortController } from '@/lib/core/execution-limits' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' import { preprocessExecution } from '@/lib/execution/preprocessing' diff --git a/apps/sim/lib/core/errors/retryable-infrastructure.test.ts b/apps/sim/lib/core/errors/retryable-infrastructure.test.ts deleted file mode 100644 index f1e4b482b13..00000000000 --- a/apps/sim/lib/core/errors/retryable-infrastructure.test.ts +++ /dev/null @@ -1,64 +0,0 @@ -/** - * @vitest-environment node - */ -import { describe, expect, it } from 'vitest' -import { - describeError, - isRetryableInfrastructureError, -} from '@/lib/core/errors/retryable-infrastructure' - -describe('describeError', () => { - it('reports name and message for a plain error, omitting causeChain', () => { - const described = describeError(new Error('boom')) - expect(described).toEqual({ name: 'Error', message: 'boom' }) - expect(described.causeChain).toBeUndefined() - }) - - it('surfaces the deepest cause for a wrapped driver error', () => { - const driver = Object.assign(new Error('read ECONNRESET'), { - code: 'ECONNRESET', - errno: 'ECONNRESET', - syscall: 'read', - }) - const wrapped = new Error('Failed query: select ...', { cause: driver }) - - const described = describeError(wrapped) - expect(described.name).toBe('Error') - expect(described.message).toBe('read ECONNRESET') - expect(described.code).toBe('ECONNRESET') - expect(described.errno).toBe('ECONNRESET') - expect(described.syscall).toBe('read') - expect(described.causeChain).toEqual([ - 'Error: Failed query: select ...', - 'Error: read ECONNRESET', - ]) - }) - - it('always returns the cause for unclassified errors (AbortError)', () => { - const aborted = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }) - const described = describeError(aborted) - - expect(described.name).toBe('AbortError') - expect(described.message).toBe('The operation was aborted') - // The retryable classifier skips it entirely — describeError still surfaces it. - expect(isRetryableInfrastructureError(aborted)).toBe(false) - }) - - it('falls back to a populated description for non-Error input without throwing', () => { - expect(describeError('just a string')).toEqual({ name: 'Error', message: 'just a string' }) - expect(() => describeError({ weird: true })).not.toThrow() - }) - - it('stops walking the cause chain at depth 10 and does not loop on a cycle', () => { - const a = new Error('a') - const b = new Error('b') - ;(a as Error & { cause?: unknown }).cause = b - ;(b as Error & { cause?: unknown }).cause = a - - let described: ReturnType | undefined - expect(() => { - described = describeError(a) - }).not.toThrow() - expect(described?.causeChain?.length).toBeLessThanOrEqual(10) - }) -}) diff --git a/apps/sim/lib/core/errors/retryable-infrastructure.ts b/apps/sim/lib/core/errors/retryable-infrastructure.ts index c686c9cdac5..4eed69c6425 100644 --- a/apps/sim/lib/core/errors/retryable-infrastructure.ts +++ b/apps/sim/lib/core/errors/retryable-infrastructure.ts @@ -1,5 +1,3 @@ -import { toError } from '@sim/utils/errors' - const RETRYABLE_DB_ERROR_CODES = new Set([ '08000', '08001', @@ -78,47 +76,3 @@ export function describeRetryableInfrastructureError( export function isRetryableInfrastructureError(error: unknown): boolean { return Boolean(describeRetryableInfrastructureError(error)) } - -export interface DescribedError { - name: string - message: string - code?: string - errno?: string - syscall?: string - /** `"Name: message"` per link in the `.cause` chain, outermost first. Present only when the chain has more than one link. */ - causeChain?: string[] -} - -/** - * Always-on diagnostic view of an error and its `.cause` chain. - * - * Unlike {@link describeRetryableInfrastructureError} — which returns - * `undefined` for errors outside its retryable allowlist — this returns the - * underlying cause for ANY error, including `AbortError` and otherwise - * unclassified causes. Reports the fields of the DEEPEST `.cause` link, because - * a wrapped driver error (e.g. Drizzle's `"Failed query: ..."` wrapping an - * `ECONNRESET`) carries the real reason there, not on the outer wrapper. - * - * `@sim/logger` does not serialize the non-enumerable `Error.prototype.cause`, - * so callers must pass the result as an explicit structured log field rather - * than relying on the logger to expand a raw error. - */ -export function describeError(error: unknown): DescribedError { - const chain = getErrorChain(error) - if (chain.length === 0) { - const normalized = toError(error) - return { name: normalized.name, message: normalized.message } - } - const deepest = chain[chain.length - 1] - const code = typeof deepest.code === 'string' ? deepest.code : undefined - const errno = typeof deepest.errno === 'string' ? deepest.errno : undefined - const syscall = typeof deepest.syscall === 'string' ? deepest.syscall : undefined - return { - name: deepest.name, - message: deepest.message, - ...(code ? { code } : {}), - ...(errno ? { errno } : {}), - ...(syscall ? { syscall } : {}), - ...(chain.length > 1 ? { causeChain: chain.map((e) => `${e.name}: ${e.message}`) } : {}), - } -} diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index b4d9daa9b40..a0fd011dc7d 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -1,12 +1,9 @@ import { db } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { toError } from '@sim/utils/errors' +import { describeError, toError } from '@sim/utils/errors' import { and, eq, sql } from 'drizzle-orm' -import { - describeError, - isRetryableInfrastructureError, -} from '@/lib/core/errors/retryable-infrastructure' +import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import { executionLogger } from '@/lib/logs/execution/logger' import { calculateCostSummary, diff --git a/apps/sim/lib/workflows/executor/pause-persistence.ts b/apps/sim/lib/workflows/executor/pause-persistence.ts index 10a840539ba..954329ad4fa 100644 --- a/apps/sim/lib/workflows/executor/pause-persistence.ts +++ b/apps/sim/lib/workflows/executor/pause-persistence.ts @@ -1,9 +1,6 @@ import { createLogger } from '@sim/logger' -import { toError } from '@sim/utils/errors' -import { - describeError, - isRetryableInfrastructureError, -} from '@/lib/core/errors/retryable-infrastructure' +import { describeError, toError } from '@sim/utils/errors' +import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import type { LoggingSession } from '@/lib/logs/execution/logging-session' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import type { ExecutionResult } from '@/executor/types' diff --git a/packages/utils/src/errors.test.ts b/packages/utils/src/errors.test.ts index fa11dc191f9..272c85e53a4 100644 --- a/packages/utils/src/errors.test.ts +++ b/packages/utils/src/errors.test.ts @@ -2,7 +2,7 @@ * @vitest-environment node */ import { describe, expect, it } from 'vitest' -import { getPostgresErrorCode, toError } from './errors.js' +import { describeError, getPostgresErrorCode, toError } from './errors.js' describe('toError', () => { it('returns the same Error when given an Error', () => { @@ -76,3 +76,54 @@ describe('getPostgresErrorCode', () => { expect(getPostgresErrorCode(err1)).toBeUndefined() }) }) + +describe('describeError', () => { + it('reports name and message for a plain error, omitting causeChain', () => { + const described = describeError(new Error('boom')) + expect(described).toEqual({ name: 'Error', message: 'boom' }) + expect(described.causeChain).toBeUndefined() + }) + + it('surfaces the deepest cause for a wrapped driver error', () => { + const driver = Object.assign(new Error('read ECONNRESET'), { + code: 'ECONNRESET', + errno: 'ECONNRESET', + syscall: 'read', + }) + const wrapped = new Error('Failed query: select ...', { cause: driver }) + const described = describeError(wrapped) + expect(described.message).toBe('read ECONNRESET') + expect(described.code).toBe('ECONNRESET') + expect(described.errno).toBe('ECONNRESET') + expect(described.syscall).toBe('read') + expect(described.causeChain).toEqual([ + 'Error: Failed query: select ...', + 'Error: read ECONNRESET', + ]) + }) + + it('always returns the cause for unclassified errors (AbortError)', () => { + const aborted = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }) + expect(describeError(aborted)).toEqual({ + name: 'AbortError', + message: 'The operation was aborted', + }) + }) + + it('falls back to a populated description for non-Error input without throwing', () => { + expect(describeError('just a string')).toEqual({ name: 'Error', message: 'just a string' }) + expect(() => describeError({ weird: true })).not.toThrow() + }) + + it('stops at depth 10 and does not loop on a cyclic cause', () => { + const a = new Error('a') + const b = new Error('b') + ;(a as { cause?: unknown }).cause = b + ;(b as { cause?: unknown }).cause = a + let described: ReturnType | undefined + expect(() => { + described = describeError(a) + }).not.toThrow() + expect(described?.causeChain?.length).toBeLessThanOrEqual(10) + }) +}) diff --git a/packages/utils/src/errors.ts b/packages/utils/src/errors.ts index 48fcee083c3..dc21d57b995 100644 --- a/packages/utils/src/errors.ts +++ b/packages/utils/src/errors.ts @@ -39,6 +39,60 @@ export function getPostgresConstraintName(error: unknown): string | undefined { return readPgErrorField(error, 'constraint_name') ?? readPgErrorField(error, 'constraint') } +export interface DescribedError { + name: string + message: string + code?: string + errno?: string + syscall?: string + /** `"Name: message"` per link in the `.cause` chain, outermost first. Present only when the chain has more than one link. */ + causeChain?: string[] +} + +/** + * Always-on diagnostic view of an error and its `.cause` chain. + * + * Reports the fields of the DEEPEST `.cause` link, because a wrapped driver + * error (e.g. Drizzle's `"Failed query: ..."` wrapping an `ECONNRESET`) carries + * the real reason there, not on the outer wrapper. Always returns a populated + * object — including for non-`Error` throws and unclassified errors like + * `AbortError`. Cycle-safe and depth-bounded. + * + * Loggers do not serialize the non-enumerable `Error.prototype.cause`, so pass + * the result as an explicit structured field rather than the raw error. + */ +export function describeError(error: unknown): DescribedError { + const chain: Error[] = [] + const seen = new Set() + let current: unknown = error + while (current instanceof Error && !seen.has(current) && chain.length < 10) { + seen.add(current) + chain.push(current) + current = current.cause + } + + if (chain.length === 0) { + const normalized = toError(error) + return { name: normalized.name, message: normalized.message } + } + + const deepest = chain[chain.length - 1] as Error & Record + const asString = (value: unknown): string | undefined => + typeof value === 'string' ? value : undefined + const code = asString(deepest.code) + const errno = asString(deepest.errno) + const syscall = asString(deepest.syscall) + + return { + name: deepest.name, + message: deepest.message, + ...(code ? { code } : {}), + ...(errno ? { errno } : {}), + ...(syscall ? { syscall } : {}), + ...(chain.length > 1 ? { causeChain: chain.map((e) => `${e.name}: ${e.message}`) } : {}), + } +} + function readPgErrorField(error: unknown, field: string): string | undefined { const seen = new Set() let current: unknown = error