From 88d27810e519c6288bfda7010354d122faf91c21 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 4 Jul 2026 17:51:49 +0000 Subject: [PATCH] feat(automation): run-history retention + durable single-run detail (#2585) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Item 1 — retention for sys_automation_run terminal history (closes the unbounded-growth risk #2581 introduced, ADR-0057 posture): - write-time per-flow cap in recordTerminal (default 100 newest terminal runs per flow; runHistoryMaxPerFlow, 0 disables) - default-on periodic age sweep pruning terminal rows older than 30 days (runHistoryRetentionDays / runHistorySweepMs), mirroring the service-messaging notification retention pattern - suspended (paused) rows are live resumable state and are never pruned - new {status, created_at} index for the sweep Item 3 — durable single-run detail: - SuspendedRunStore.loadTerminal(runId); AutomationEngine.getRun falls back to the durable history row after a restart / ring-buffer eviction - terminal rows persist a bounded step log (steps_json: newest 200 steps, stacks stripped, 64 KB byte cap) plus finished_at and the last node reached, so "which node blew up" survives a restart Co-Authored-By: Claude Fable 5 Claude-Session: https://claude.ai/code/session_01VfAct34NhDJCJWrF6zhN1N --- .../automation-run-history-retention.md | 13 ++ .../services/service-automation/src/engine.ts | 62 +++++- .../services/service-automation/src/index.ts | 7 +- .../services/service-automation/src/plugin.ts | 68 +++++- .../src/run-history.test.ts | 50 +++++ .../src/suspended-run-store.test.ts | 136 +++++++++++- .../src/suspended-run-store.ts | 193 +++++++++++++++++- .../src/sys-automation-run.object.ts | 17 +- 8 files changed, 525 insertions(+), 21 deletions(-) create mode 100644 .changeset/automation-run-history-retention.md diff --git a/.changeset/automation-run-history-retention.md b/.changeset/automation-run-history-retention.md new file mode 100644 
index 0000000000..b8fb7bbffd --- /dev/null +++ b/.changeset/automation-run-history-retention.md @@ -0,0 +1,13 @@ +--- +'@objectstack/service-automation': minor +--- + +Automation run observability follow-ups (#2585): retention for `sys_automation_run` run history, and durable single-run detail. + +**Retention (closes the unbounded-growth risk #2581 introduced).** Terminal run-history rows are now bounded by default, ADR-0057 posture: + +- A write-time per-flow cap keeps the newest 100 terminal runs per flow (`runHistoryMaxPerFlow`, `0` disables). +- A default-on periodic sweep deletes terminal rows older than 30 days (`runHistoryRetentionDays`, `0` disables; `runHistorySweepMs` tunes the interval, default 1 h). +- Suspended (`paused`) rows are live resumable state and are never pruned. + +**Durable single-run detail.** `AutomationEngine.getRun(runId)` now falls back to the durable history row when the run is no longer in the in-memory buffer (e.g. after a restart), and terminal rows persist a bounded per-node step log (`steps_json`: newest 200 steps, stacks stripped, 64 KB cap) — so "open a past failed run and see which node blew up" survives a restart. New `SuspendedRunStore.loadTerminal(runId)` backs this; `RunRecord` gains `finishedAt` and `steps`. diff --git a/packages/services/service-automation/src/engine.ts b/packages/services/service-automation/src/engine.ts index 5d7e52eaa6..a51ac72f83 100644 --- a/packages/services/service-automation/src/engine.ts +++ b/packages/services/service-automation/src/engine.ts @@ -242,6 +242,14 @@ export interface ConnectorDescriptor { */ export const DEFAULT_MAX_EXECUTION_LOG_SIZE = 1000; +/** + * Max steps persisted per terminal run-history row (#2585). The tail of the + * step log is kept — the last steps carry the failure — so durable single-run + * detail stays meaningful without letting a pathological loop-heavy run write + * an unbounded `steps_json` column. 
+ */ +export const MAX_PERSISTED_HISTORY_STEPS = 200; + /** Construction options for {@link AutomationEngine}. */ export interface AutomationEngineOptions { /** @@ -364,12 +372,20 @@ export interface RunRecord { status: 'completed' | 'failed'; startedAt: string; startTime?: number; + /** When the run reached its terminal state. */ + finishedAt?: string; durationMs?: number; /** Failure reason for a `failed` run — what a designer needs to fix it. */ error?: string; nodeId?: string; organizationId?: string | null; userId?: string | null; + /** + * Bounded per-node step log (see {@link AutomationEngine.compactStepsForHistory}), + * so "which node blew up?" survives a restart. Optional — history rows + * written before this field existed have none. + */ + steps?: StepLogEntry[]; } export interface SuspendedRunStore { @@ -391,6 +407,12 @@ export interface SuspendedRunStore { recordTerminal?(record: RunRecord): Promise; /** Newest terminal run-history records for a flow (for the Runs tab). */ listHistory?(flowName: string, limit: number): Promise; + /** + * Load one terminal run-history record by its raw `runId`, or `null` when + * none is stored. Backs {@link AutomationEngine.getRun}'s durable fallback + * so "open a past failed run" works after a restart. + */ + loadTerminal?(runId: string): Promise; } export class AutomationEngine implements IAutomationService { @@ -997,7 +1019,8 @@ export class AutomationEngine implements IAutomationService { } /** Rehydrate a durable {@link RunRecord} into an {@link ExecutionLogEntry} - * for the Runs list. Step-level detail isn't persisted in the summary. */ + * for the Runs surfaces. Steps carry the bounded persisted step log (rows + * written before step persistence have none). 
*/ private runRecordToLogEntry(r: RunRecord): ExecutionLogEntry { return { id: r.runId, @@ -1005,15 +1028,31 @@ export class AutomationEngine implements IAutomationService { flowVersion: r.flowVersion, status: r.status, // 'completed' | 'failed' — both valid ExecutionLog statuses startedAt: r.startedAt, + completedAt: r.finishedAt, durationMs: r.durationMs, trigger: { type: '', userId: r.userId ?? undefined }, - steps: [], + steps: r.steps ?? [], error: r.error, }; } async getRun(runId: string): Promise { - return this.executionLogs.find(l => l.id === runId) ?? null; + const inMem = this.executionLogs.find(l => l.id === runId); + if (inMem) return inMem; + // Durable fallback: after a restart (or ring-buffer eviction) the run's + // terminal history row still answers "what happened, at which node?". + // Best-effort — a store failure degrades to "not found", never throws. + if (this.store?.loadTerminal) { + try { + const rec = await this.store.loadTerminal(runId); + if (rec) return this.runRecordToLogEntry(rec); + } catch (err) { + this.logger.warn( + `[Automation] durable run lookup failed for '${runId}': ${(err as Error)?.message}`, + ); + } + } + return null; } /** @@ -1691,15 +1730,19 @@ export class AutomationEngine implements IAutomationService { entry.status === 'cancelled' || entry.status === 'timed_out'; if (terminal && this.store?.recordTerminal) { + const lastStep = entry.steps[entry.steps.length - 1]; const record: RunRecord = { runId: entry.id, flowName: entry.flowName, flowVersion: entry.flowVersion, status: entry.status === 'completed' ? 
'completed' : 'failed', startedAt: entry.startedAt, + finishedAt: entry.completedAt, durationMs: entry.durationMs, error: entry.error, userId: entry.trigger?.userId, + nodeId: lastStep?.nodeId, + steps: this.compactStepsForHistory(entry.steps), }; void this.store.recordTerminal(record).catch((err) => { this.logger.warn( @@ -1709,6 +1752,19 @@ export class AutomationEngine implements IAutomationService { } } + /** + * Compact a run's step log for durable history: keep the newest + * {@link MAX_PERSISTED_HISTORY_STEPS} steps (the tail carries the failure) + * and drop `error.stack` (the code/message pair is the designer-facing + * "why"; stacks bloat rows without aiding the Runs surface). Bounds the + * `steps_json` column so history rows stay cheap under retention (#2585). + */ + private compactStepsForHistory(steps: StepLogEntry[]): StepLogEntry[] { + return steps.slice(-MAX_PERSISTED_HISTORY_STEPS).map((s) => + s.error?.stack ? { ...s, error: { code: s.error.code, message: s.error.message } } : s, + ); + } + /** * Validate each node's `type` against the live action registry (ADR-0018). * A type is known if it is structural (start/end), has a registered diff --git a/packages/services/service-automation/src/index.ts b/packages/services/service-automation/src/index.ts index e83c9adc42..5322afdedc 100644 --- a/packages/services/service-automation/src/index.ts +++ b/packages/services/service-automation/src/index.ts @@ -1,7 +1,7 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. 
// Core engine -export { AutomationEngine, DEFAULT_MAX_EXECUTION_LOG_SIZE } from './engine.js'; +export { AutomationEngine, DEFAULT_MAX_EXECUTION_LOG_SIZE, MAX_PERSISTED_HISTORY_STEPS } from './engine.js'; export type { AutomationEngineOptions, NodeExecutor, @@ -15,6 +15,7 @@ export type { ConnectorActionDescriptor, SuspendedRun, SuspendedRunStore, + RunRecord, StepLogEntry, } from './engine.js'; @@ -23,8 +24,10 @@ export type { export { InMemorySuspendedRunStore, ObjectStoreSuspendedRunStore, + DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW, + DEFAULT_RUN_HISTORY_RETENTION_DAYS, } from './suspended-run-store.js'; -export type { SuspendedRunStoreEngine } from './suspended-run-store.js'; +export type { SuspendedRunStoreEngine, ObjectStoreSuspendedRunStoreOptions } from './suspended-run-store.js'; // The sys_automation_run object backing the durable store — registered by // AutomationServicePlugin and exported for hosts wiring a custom store. diff --git a/packages/services/service-automation/src/plugin.ts b/packages/services/service-automation/src/plugin.ts index bb8c9ae007..a0e83bc64c 100644 --- a/packages/services/service-automation/src/plugin.ts +++ b/packages/services/service-automation/src/plugin.ts @@ -5,7 +5,12 @@ import type { IJobService } from '@objectstack/spec/contracts'; import { AutomationEngine } from './engine.js'; import { installBuiltinNodes, rearmSuspendedWaitTimers } from './builtin/index.js'; import { SysAutomationRun } from './sys-automation-run.object.js'; -import { ObjectStoreSuspendedRunStore, type SuspendedRunStoreEngine } from './suspended-run-store.js'; +import { + ObjectStoreSuspendedRunStore, + DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW, + DEFAULT_RUN_HISTORY_RETENTION_DAYS, + type SuspendedRunStoreEngine, +} from './suspended-run-store.js'; /** * Configuration options for the AutomationServicePlugin. @@ -31,6 +36,26 @@ export interface AutomationServicePluginOptions { * to {@link DEFAULT_MAX_EXECUTION_LOG_SIZE} (1000). 
*/ maxLogSize?: number; + /** + * Retention window in days for durable terminal run history in + * `sys_automation_run` (#2585; ADR-0057 posture — platform self-telemetry + * must be bounded). When > 0, a periodic sweep deletes terminal + * (completed / failed) history rows older than the window; suspended + * (`paused`) rows are live resumable state and are never pruned. + * **Default-on** at {@link DEFAULT_RUN_HISTORY_RETENTION_DAYS} (30). Set + * to `0` to disable age pruning (history kept until the per-flow cap). + */ + runHistoryRetentionDays?: number; + /** + * Per-flow cap on terminal run-history rows, enforced at write time (the + * "or 100 runs/flow, whichever first" half of the #2585 retention + * contract). Defaults to {@link DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW} (100); + * `0` disables the cap. + */ + runHistoryMaxPerFlow?: number; + /** Run-history retention sweep interval in ms (default 1 hour). Only used + * when `runHistoryRetentionDays` > 0. */ + runHistorySweepMs?: number; } /** @@ -72,6 +97,8 @@ export class AutomationServicePlugin implements Plugin { private engine?: AutomationEngine; private readonly options: AutomationServicePluginOptions; + /** Periodic run-history retention sweep (#2585); cleared on destroy. */ + private retentionTimer?: ReturnType; /** * Flow names this plugin has registered into the engine from the * artifact / ObjectQL registry, tracked so a `metadata:reloaded` re-sync @@ -155,7 +182,10 @@ export class AutomationServicePlugin implements Plugin { try { dataEngine = ctx.getService('objectql'); } catch { try { dataEngine = ctx.getService('data'); } catch { /* none */ } } if (dataEngine && typeof dataEngine.find === 'function' && typeof dataEngine.insert === 'function') { - durableStore = new ObjectStoreSuspendedRunStore(dataEngine, ctx.logger); + durableStore = new ObjectStoreSuspendedRunStore(dataEngine, ctx.logger, { + maxTerminalRunsPerFlow: + this.options.runHistoryMaxPerFlow ?? 
DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW, + }); this.engine.setSuspendedRunStore(durableStore); ctx.logger.info('[Automation] Suspended-run persistence enabled (sys_automation_run)'); } else { @@ -163,6 +193,36 @@ export class AutomationServicePlugin implements Plugin { } } + // Run-history age retention (#2585, ADR-0057 posture): default-on sweep + // so `sys_automation_run` terminal history can't grow without bound. + // Runs once at kernel:ready then on a low-frequency interval; the timer + // is unref'd so it never keeps the process alive. Mirrors the + // service-messaging notification retention sweep. + const retentionDays = this.options.runHistoryRetentionDays ?? DEFAULT_RUN_HISTORY_RETENTION_DAYS; + if (durableStore && retentionDays > 0 && typeof ctx.hook === 'function') { + const store = durableStore; + const sweepMs = this.options.runHistorySweepMs ?? 3_600_000; + ctx.hook('kernel:ready', async () => { + const sweep = () => { + void store.pruneHistory(retentionDays).then((deleted) => { + if (deleted === undefined || deleted > 0) { + ctx.logger.info( + `[Automation] run-history retention: pruned ${deleted ?? '?'} terminal run(s) older than ${retentionDays}d`, + ); + } + }).catch((err) => + ctx.logger.warn(`[Automation] run-history retention sweep failed: ${(err as Error)?.message ?? err}`), + ); + }; + sweep(); + this.retentionTimer = setInterval(sweep, sweepMs); + this.retentionTimer.unref?.(); + ctx.logger.info( + `[Automation] run-history retention on (terminal runs > ${retentionDays}d pruned every ${Math.round(sweepMs / 1000)}s; cap ${this.options.runHistoryMaxPerFlow ?? DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW}/flow at write)`, + ); + }); + } + // #1870 — bridge `script`-node function calls to the host function // registry. 
ObjectQL holds the name→handler map populated from // `bundle.functions` / `defineStack({ functions })` (the same registry @@ -422,6 +482,10 @@ export class AutomationServicePlugin implements Plugin { } async destroy(): Promise { + if (this.retentionTimer) { + clearInterval(this.retentionTimer); + this.retentionTimer = undefined; + } this.engine = undefined; } } diff --git a/packages/services/service-automation/src/run-history.test.ts b/packages/services/service-automation/src/run-history.test.ts index 5959335aeb..3331d95510 100644 --- a/packages/services/service-automation/src/run-history.test.ts +++ b/packages/services/service-automation/src/run-history.test.ts @@ -86,6 +86,56 @@ describe('automation run history (durable observability)', () => { expect(runs[0].id).toBeTruthy(); }); + it('getRun falls back to durable history after a restart, WITH step detail (#2585)', async () => { + const store = new InMemorySuspendedRunStore(); + const engineA = new AutomationEngine(silent, store); + engineA.registerNodeExecutor({ + type: 'boom', + async execute() { throw new Error('kaboom'); }, + } as never); + engineA.registerFlow('bad_flow', failingFlow('bad_flow') as never); + await engineA.execute('bad_flow', { event: 'test' } as AutomationContext); + await flush(); + + // Simulate a restart: a fresh engine with empty in-memory logs. Before + // #2585 its getRun returned null even though the history row existed. + const engineB = new AutomationEngine(silent, store); + const [listed] = await engineB.listRuns('bad_flow', { limit: 1 }); + const run = await engineB.getRun(listed.id); + expect(run).not.toBeNull(); + expect(run!.status).toBe('failed'); + expect(run!.error ?? '').toMatch(/kaboom/); + // Durable single-run detail: the persisted step log names the node that + // blew up (stacks are stripped — code/message is the designer-facing why). 
+ const failedStep = run!.steps.find((s) => s.status === 'failure'); + expect(failedStep?.nodeId).toBe('boom'); + expect(failedStep?.error?.message).toMatch(/kaboom/); + expect(failedStep?.error?.stack).toBeUndefined(); + }); + + it('getRun returns null for an unknown run id even with a store attached', async () => { + const engine = new AutomationEngine(silent, new InMemorySuspendedRunStore()); + expect(await engine.getRun('run_nope')).toBeNull(); + }); + + it('caps terminal history per flow (retention stop-gap, #2585)', async () => { + const store = new InMemorySuspendedRunStore({ maxTerminalRunsPerFlow: 2 }); + const engine = new AutomationEngine(silent, store); + engine.registerFlow('busy', trivialFlow('busy') as never); + engine.registerFlow('other', trivialFlow('other') as never); + + for (let i = 0; i < 5; i++) { + await engine.execute('busy', { event: 'test' } as AutomationContext); + } + await engine.execute('other', { event: 'test' } as AutomationContext); + await flush(); + + // Only the newest 2 'busy' runs survive; the cap is per flow, so the + // single 'other' run is untouched. + expect(await store.listHistory('busy', 10)).toHaveLength(2); + expect(await store.listHistory('other', 10)).toHaveLength(1); + }); + it('a failing history store never breaks the run (best-effort isolation)', async () => { const store = new InMemorySuspendedRunStore(); // Override recordTerminal to reject — the run must still complete. 
diff --git a/packages/services/service-automation/src/suspended-run-store.test.ts b/packages/services/service-automation/src/suspended-run-store.test.ts index dd59d9d910..1518f49ce7 100644 --- a/packages/services/service-automation/src/suspended-run-store.test.ts +++ b/packages/services/service-automation/src/suspended-run-store.test.ts @@ -3,7 +3,7 @@ import { describe, it, expect } from 'vitest'; import { AutomationEngine } from './engine.js'; import { ObjectStoreSuspendedRunStore, type SuspendedRunStoreEngine } from './suspended-run-store.js'; -import type { SuspendedRun } from './engine.js'; +import type { RunRecord, SuspendedRun } from './engine.js'; function createTestLogger() { return { info: () => {}, warn: () => {}, error: () => {}, debug: () => {}, child: () => createTestLogger() } as any; @@ -17,8 +17,13 @@ function createTestLogger() { */ function createFakeEngine(): SuspendedRunStoreEngine & { rows: Map } { const rows = new Map(); + // Equality plus the `$lt` operator the retention sweep uses. const matches = (row: any, where: any) => - !where || Object.entries(where).every(([k, v]) => row[k] === v); + !where || Object.entries(where).every(([k, v]) => + v && typeof v === 'object' && '$lt' in (v as any) + ? 
row[k] < (v as any).$lt + : row[k] === v, + ); return { rows, async find(_object, options) { @@ -37,6 +42,11 @@ function createFakeEngine(): SuspendedRunStoreEngine & { rows: Map return rows.get(String(id)); }, async delete(_object, options) { + if (options?.multi) { + const doomed = [...rows.values()].filter(r => matches(r, options?.where)); + for (const r of doomed) rows.delete(String(r.id)); + return doomed.length; + } const id = options?.where?.id; rows.delete(String(id)); return true; @@ -153,3 +163,125 @@ describe('ObjectStoreSuspendedRunStore', () => { expect(finalRows[0].status).toBe('completed'); }); }); + +const terminalRecord = (n: number, overrides: Partial = {}): RunRecord => ({ + runId: `r${n}`, + flowName: 'busy_flow', + status: 'completed', + startedAt: `2026-01-01T00:00:${String(n).padStart(2, '0')}.000Z`, + durationMs: 5, + steps: [{ nodeId: 'start', nodeType: 'start', status: 'success', startedAt: '2026-01-01T00:00:00.000Z' }], + ...overrides, +}); + +describe('ObjectStoreSuspendedRunStore — run-history retention + durable detail (#2585)', () => { + it('persists terminal steps and round-trips them through loadTerminal', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger()); + const record = terminalRecord(1, { + status: 'failed', + error: 'kaboom', + finishedAt: '2026-01-01T00:01:00.000Z', + nodeId: 'boom', + steps: [ + { nodeId: 'start', nodeType: 'start', status: 'success', startedAt: '2026-01-01T00:00:01.000Z' }, + { nodeId: 'boom', nodeType: 'script', status: 'failure', startedAt: '2026-01-01T00:00:02.000Z', error: { code: 'X', message: 'kaboom' } }, + ], + }); + await store.recordTerminal(record); + + const row = engine.rows.get('run_r1'); + expect(typeof row.steps_json).toBe('string'); + expect(row.finished_at).toBe('2026-01-01T00:01:00.000Z'); + + const loaded = await store.loadTerminal('r1'); + expect(loaded).not.toBeNull(); + expect(loaded!.status).toBe('failed'); 
+ expect(loaded!.error).toBe('kaboom'); + expect(loaded!.finishedAt).toBe('2026-01-01T00:01:00.000Z'); + expect(loaded!.steps).toEqual(record.steps); + }); + + it('loadTerminal returns null for unknown ids and never matches a paused row', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger()); + await store.save(baseRun()); // paused row, id 'run_abc' + expect(await store.loadTerminal('nope')).toBeNull(); + // A raw runId of 'abc' would look up history id 'run_abc' — the paused + // row's id. It must NOT be served as terminal history. + expect(await store.loadTerminal('abc')).toBeNull(); + }); + + it('caps terminal history per flow at write time, leaving other flows and paused rows alone', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger(), { maxTerminalRunsPerFlow: 3 }); + await store.save(baseRun()); // a live pause must survive every prune + for (let n = 1; n <= 5; n++) await store.recordTerminal(terminalRecord(n)); + await store.recordTerminal(terminalRecord(9, { runId: 'other1', flowName: 'other_flow' })); + + const busy = await store.listHistory('busy_flow', 10); + expect(busy.map((r) => r.runId)).toEqual(['r5', 'r4', 'r3']); // newest 3 kept + expect(await store.listHistory('other_flow', 10)).toHaveLength(1); + expect(await store.load('run_abc')).not.toBeNull(); // pause untouched + }); + + it('a re-emitted terminal (upsert) does not trigger the overflow prune path', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger(), { maxTerminalRunsPerFlow: 3 }); + for (let n = 1; n <= 3; n++) await store.recordTerminal(terminalRecord(n)); + await store.recordTerminal(terminalRecord(2)); // update in place + expect(await store.listHistory('busy_flow', 10)).toHaveLength(3); + }); + + it('pruneHistory deletes terminal rows past the age window but never 
paused rows', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger()); + const now = Date.parse('2026-03-01T00:00:00.000Z'); + // Backdate created_at: two ancient terminals (one failed), one fresh. + await store.recordTerminal(terminalRecord(1)); + await store.recordTerminal(terminalRecord(2, { status: 'failed', error: 'x' })); + await store.recordTerminal(terminalRecord(3)); + engine.rows.get('run_r1').created_at = '2026-01-01T00:00:00.000Z'; + engine.rows.get('run_r2').created_at = '2026-01-02T00:00:00.000Z'; + engine.rows.get('run_r3').created_at = '2026-02-28T00:00:00.000Z'; + // An old suspended run — live resumable state, exempt from retention. + await store.save(baseRun()); + engine.rows.get('run_abc').created_at = '2025-06-01T00:00:00.000Z'; + + const deleted = await store.pruneHistory(30, now); + expect(deleted).toBe(2); + expect(await store.loadTerminal('r1')).toBeNull(); + expect(await store.loadTerminal('r2')).toBeNull(); + expect(await store.loadTerminal('r3')).not.toBeNull(); + expect(await store.load('run_abc')).not.toBeNull(); + }); + + it('pruneHistory is a no-op for a non-positive window', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger()); + await store.recordTerminal(terminalRecord(1)); + engine.rows.get('run_r1').created_at = '2020-01-01T00:00:00.000Z'; + expect(await store.pruneHistory(0)).toBe(0); + expect(await store.pruneHistory(-5)).toBe(0); + expect(await store.loadTerminal('r1')).not.toBeNull(); + }); + + it('bounds steps_json bytes by dropping the oldest steps first', async () => { + const engine = createFakeEngine(); + const store = new ObjectStoreSuspendedRunStore(engine, createTestLogger()); + const bigMessage = 'x'.repeat(30 * 1024); + await store.recordTerminal(terminalRecord(1, { + steps: [ + { nodeId: 'old1', nodeType: 'script', status: 'failure', startedAt: 't', error: { code: 'E', 
message: bigMessage } }, + { nodeId: 'old2', nodeType: 'script', status: 'failure', startedAt: 't', error: { code: 'E', message: bigMessage } }, + { nodeId: 'old3', nodeType: 'script', status: 'failure', startedAt: 't', error: { code: 'E', message: bigMessage } }, + { nodeId: 'last', nodeType: 'script', status: 'failure', startedAt: 't', error: { code: 'E', message: 'the real reason' } }, + ], + })); + const row = engine.rows.get('run_r1'); + expect(row.steps_json.length).toBeLessThanOrEqual(64 * 1024); + const kept = (await store.loadTerminal('r1'))!.steps!; + expect(kept.length).toBeLessThan(4); + expect(kept[kept.length - 1].nodeId).toBe('last'); // the tail survives + }); +}); diff --git a/packages/services/service-automation/src/suspended-run-store.ts b/packages/services/service-automation/src/suspended-run-store.ts index 50a0f5eca6..6fadbfc14b 100644 --- a/packages/services/service-automation/src/suspended-run-store.ts +++ b/packages/services/service-automation/src/suspended-run-store.ts @@ -26,6 +26,39 @@ const TABLE = 'sys_automation_run'; const HISTORY_PREFIX = 'run_'; const SYSTEM_CTX = { isSystem: true, roles: [], permissions: [] } as const; +/** + * Default per-flow cap on terminal run-history rows (#2585 retention stop-gap + * until the ADR-0057 lifecycle sweep covers `sys_automation_run`). A busy + * per-record-change flow otherwise persists one row per execution forever — + * exactly the unbounded self-telemetry growth ADR-0057 exists to prevent. + * 100 newest terminal runs per flow keeps the Runs surface useful while + * bounding the table. `0` disables the cap. + */ +export const DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW = 100; + +/** + * Default age-based retention window for terminal run-history rows, in days. + * Enforced by {@link ObjectStoreSuspendedRunStore.pruneHistory}, swept + * periodically by the service plugin. `0` disables age pruning. Suspended + * (`paused`) rows are live resumable state and are NEVER age-pruned. 
+ */ +export const DEFAULT_RUN_HISTORY_RETENTION_DAYS = 30; + +/** Max deletes one write-time overflow prune may issue — bounds the write + * amplification a single `recordTerminal` can incur on a legacy oversized + * table (the periodic age sweep handles bulk convergence). */ +const OVERFLOW_PRUNE_BATCH = 50; + +/** Byte cap for a terminal row's persisted `steps_json`. When over, the step + * tail is halved until it fits — the newest steps carry the failure. */ +const MAX_STEPS_JSON_BYTES = 64 * 1024; + +const TERMINAL_STATUSES = ['completed', 'failed'] as const; + +function isTerminalStatus(status: unknown): boolean { + return status === 'completed' || status === 'failed'; +} + /** Deep clone via JSON so a stored snapshot can't alias live engine state. */ function jsonClone(value: T): T { return JSON.parse(JSON.stringify(value)) as T; @@ -53,6 +86,12 @@ function parseJson(raw: unknown, fallback: T): T { export class InMemorySuspendedRunStore implements SuspendedRunStore { private readonly runs = new Map(); private readonly history = new Map(); + private readonly maxTerminalRunsPerFlow: number; + + constructor(options?: { maxTerminalRunsPerFlow?: number }) { + this.maxTerminalRunsPerFlow = + options?.maxTerminalRunsPerFlow ?? DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW; + } async save(run: SuspendedRun): Promise { this.runs.set(run.runId, jsonClone(run)); @@ -73,6 +112,16 @@ export class InMemorySuspendedRunStore implements SuspendedRunStore { async recordTerminal(record: RunRecord): Promise { this.history.set(record.runId, jsonClone(record)); + // Per-flow cap (#2585 retention): evict the oldest terminal runs beyond + // the cap, mirroring the DB-backed store's write-time prune. + if (this.maxTerminalRunsPerFlow > 0) { + const flowRuns = [...this.history.values()] + .filter((r) => r.flowName === record.flowName) + .sort((a, b) => (b.startedAt ?? '').localeCompare(a.startedAt ?? 
'')); + for (const evicted of flowRuns.slice(this.maxTerminalRunsPerFlow)) { + this.history.delete(evicted.runId); + } + } } async listHistory(flowName: string, limit: number): Promise { @@ -82,6 +131,11 @@ export class InMemorySuspendedRunStore implements SuspendedRunStore { .slice(0, limit) .map(jsonClone); } + + async loadTerminal(runId: string): Promise { + const record = this.history.get(runId); + return record ? jsonClone(record) : null; + } } /** @@ -101,21 +155,38 @@ interface MinimalLogger { debug?: Logger['debug']; } +/** Tuning knobs for the DB-backed store's run-history retention (#2585). */ +export interface ObjectStoreSuspendedRunStoreOptions { + /** + * Per-flow cap on terminal history rows, enforced at write time in + * {@link ObjectStoreSuspendedRunStore.recordTerminal}. Defaults to + * {@link DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW}; `0` disables the cap. + */ + maxTerminalRunsPerFlow?: number; +} + /** * Durable {@link SuspendedRunStore} backed by the `sys_automation_run` object. * * Persists the resumable run state (`variables` / `steps` / `context` / `screen`) - * JSON-serialized, so the engine's `Map`-based variable context round-trips. The - * row is keyed by `runId` and removed on terminal completion; only live pauses - * are stored. All access uses a system context — these are infrastructure rows, - * not tenant data subject to RLS (the tenant is captured in `organization_id` - * for scoping/observability). + * JSON-serialized, so the engine's `Map`-based variable context round-trips. A + * live pause is keyed by `runId` and removed on terminal completion; terminal + * runs are kept as `run_`-prefixed history rows (bounded by the per-flow cap + * and the age sweep, #2585). All access uses a system context — these are + * infrastructure rows, not tenant data subject to RLS (the tenant is captured + * in `organization_id` for scoping/observability). 
*/ export class ObjectStoreSuspendedRunStore implements SuspendedRunStore { + private readonly maxTerminalRunsPerFlow: number; + constructor( private readonly engine: SuspendedRunStoreEngine, private readonly logger?: MinimalLogger, - ) {} + options?: ObjectStoreSuspendedRunStoreOptions, + ) { + this.maxTerminalRunsPerFlow = + options?.maxTerminalRunsPerFlow ?? DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW; + } async save(run: SuspendedRun): Promise { const now = new Date().toISOString(); @@ -185,9 +256,10 @@ export class ObjectStoreSuspendedRunStore implements SuspendedRunStore { user_id: record.userId ?? null, started_at: record.startedAt, start_time: record.startTime ?? null, - finished_at: now, + finished_at: record.finishedAt ?? now, duration_ms: record.durationMs ?? null, error: record.error ?? null, + steps_json: serializeStepsBounded(record.steps), }; const existing = await this.engine.find(TABLE, { where: { id }, limit: 1, context: SYSTEM_CTX, @@ -196,9 +268,83 @@ export class ObjectStoreSuspendedRunStore implements SuspendedRunStore { await this.engine.update(TABLE, { ...row, updated_at: now }, { where: { id }, context: SYSTEM_CTX }); } else { await this.engine.insert(TABLE, { ...row, created_at: now, updated_at: now }, { context: SYSTEM_CTX }); + // Write-time retention (#2585): keep only the newest N terminal rows per + // flow. Best-effort — a prune failure never fails the history write. + try { + await this.pruneFlowOverflow(record.flowName); + } catch (err) { + this.logger?.warn?.( + `[automation] run-history overflow prune failed for '${record.flowName}': ${(err as Error)?.message}`, + ); + } + } + } + + /** + * Enforce the per-flow terminal-history cap: fetch the flow's rows, keep the + * newest {@link ObjectStoreSuspendedRunStoreOptions.maxTerminalRunsPerFlow} + * terminal ones, delete the overflow (bounded per call by + * {@link OVERFLOW_PRUNE_BATCH}). Paused rows are live resumable state and are + * never touched. 
Steady state deletes at most one row per terminal write. + */ + private async pruneFlowOverflow(flowName: string): Promise { + const max = this.maxTerminalRunsPerFlow; + if (!(max > 0) || typeof this.engine.delete !== 'function') return; + const rows = await this.engine.find(TABLE, { + where: { flow_name: flowName }, + limit: max * 2 + OVERFLOW_PRUNE_BATCH, + context: SYSTEM_CTX, + }); + const overflow = (Array.isArray(rows) ? rows : []) + .filter((r) => isTerminalStatus(r?.status)) + .sort((a, b) => String(b.started_at ?? '').localeCompare(String(a.started_at ?? ''))) + .slice(max, max + OVERFLOW_PRUNE_BATCH); + for (const row of overflow) { + await this.engine.delete(TABLE, { where: { id: row.id }, context: SYSTEM_CTX }); + } + if (overflow.length > 0) { + this.logger?.debug?.( + `[automation] run-history cap: pruned ${overflow.length} terminal run(s) of '${flowName}' beyond newest ${max}`, + ); } } + /** + * Age-based retention sweep (#2585, ADR-0057 posture): delete terminal + * history rows older than `retentionDays`. Two equality-filtered bulk + * deletes (one per terminal status) so `paused` rows — live resumable state — + * can never match. Returns the number of rows deleted when the driver + * reports it. No-op for a non-positive window or a delete-less engine. + */ + async pruneHistory(retentionDays: number, now: number = Date.now()): Promise { + if (!(retentionDays > 0) || typeof this.engine.delete !== 'function') return 0; + const cutoffIso = new Date(now - retentionDays * 86_400_000).toISOString(); + let total: number | undefined = 0; + for (const status of TERMINAL_STATUSES) { + // ISO-8601 comparand: `created_at` is a native timestamp column, which + // rejects a bare epoch-ms number on Postgres (see service-messaging's + // NotificationRetention for the prior art this mirrors). 
+ const res = await this.engine.delete(TABLE, { + where: { status, created_at: { $lt: cutoffIso } }, + multi: true, + context: SYSTEM_CTX, + }); + const n = countDeleted(res); + total = n === undefined || total === undefined ? undefined : total + n; + } + return total; + } + + /** Load one terminal history row by raw `runId` (durable `getRun` fallback). */ + async loadTerminal(runId: string): Promise { + const rows = await this.engine.find(TABLE, { + where: { id: HISTORY_PREFIX + runId }, limit: 1, context: SYSTEM_CTX, + }); + const row = Array.isArray(rows) ? rows[0] : null; + if (!row || !isTerminalStatus(row.status)) return null; + return this.deserializeTerminal(row); + } + /** Newest terminal (`completed` / `failed`) run-history rows for one flow. */ async listHistory(flowName: string, limit: number): Promise { // Fetch the flow's rows and filter terminal in memory — avoids depending on @@ -223,11 +369,13 @@ export class ObjectStoreSuspendedRunStore implements SuspendedRunStore { status: row.status === 'failed' ? 'failed' : 'completed', startedAt: row.started_at ?? row.created_at ?? '', startTime: typeof row.start_time === 'number' ? row.start_time : undefined, + finishedAt: row.finished_at ?? undefined, durationMs: typeof row.duration_ms === 'number' ? row.duration_ms : undefined, error: row.error ?? undefined, nodeId: row.node_id ?? undefined, organizationId: row.organization_id ?? null, userId: row.user_id ?? undefined, + steps: parseJson(row.steps_json, undefined), }; } @@ -271,3 +419,34 @@ export class ObjectStoreSuspendedRunStore implements SuspendedRunStore { }; } } + +/** + * JSON-encode a terminal run's step log under the {@link MAX_STEPS_JSON_BYTES} + * cap. The engine already bounds step COUNT (and strips stacks); this bounds + * BYTES — a few huge step errors can still blow up a row. When over, the step + * tail is halved until it fits (the newest steps carry the failure); an empty + * result stores `null`. 
+ */
+function serializeStepsBounded(steps: RunRecord['steps']): string | null {
+  let tail = steps ?? [];
+  while (tail.length > 0) {
+    const json = JSON.stringify(tail);
+    if (new TextEncoder().encode(json).byteLength <= MAX_STEPS_JSON_BYTES) return json; // UTF-8 bytes, not UTF-16 units
+    tail = tail.slice(Math.ceil(tail.length / 2));
+  }
+  return null;
+}
+
+/** Best-effort row-count extraction from a driver's delete result (mirrors
+ * service-messaging's retention sweeper). */
+function countDeleted(res: unknown): number | undefined {
+  if (typeof res === 'number') return res;
+  if (Array.isArray(res)) return res.length;
+  if (res && typeof res === 'object') {
+    const r = res as Record<string, unknown>;
+    for (const k of ['deletedCount', 'deleted', 'count', 'affected', 'affectedRows']) {
+      if (typeof r[k] === 'number') return r[k] as number;
+    }
+  }
+  return undefined;
+}
diff --git a/packages/services/service-automation/src/sys-automation-run.object.ts b/packages/services/service-automation/src/sys-automation-run.object.ts
index e4b59a86fc..08aced1afa 100644
--- a/packages/services/service-automation/src/sys-automation-run.object.ts
+++ b/packages/services/service-automation/src/sys-automation-run.object.ts
@@ -13,10 +13,15 @@ import { ObjectSchema, Field } from '@objectstack/spec/data';
  * the engine writes a row on suspend and deletes it on terminal completion, so a
  * cold-booted kernel can rehydrate and continue.
  *
- * Lifecycle: one row per *currently* suspended run. The row is removed when the
- * run resumes to completion or fails — only live pauses are stored. `id` is the
- * `runId`; `correlation` ties back to the pausing node's external state (e.g.
- * `sys_approval_request.id`, mirrored by `sys_approval_request.flow_run_id`).
+ * Lifecycle: one row per *currently* suspended run (`status: 'paused'`, id =
+ * raw `runId`, removed on terminal completion) plus bounded terminal history
+ * (`status: 'completed' | 'failed'`, id = `run_`-prefixed).
History rows are + * subject to retention (#2585, ADR-0057 posture): a write-time per-flow cap + * (default 100) plus a periodic age sweep (default 30 days) — see + * `ObjectStoreSuspendedRunStore` / `AutomationServicePluginOptions`. Paused + * rows are live resumable state and are never pruned. `correlation` ties back + * to the pausing node's external state (e.g. `sys_approval_request.id`, + * mirrored by `sys_approval_request.flow_run_id`). * * The resumable state (`variables` / `steps` / `context` / `screen`) is stored * JSON-serialized — the engine works with a `Map`, which round-trips through @@ -106,7 +111,7 @@ export const SysAutomationRun = ObjectSchema.create({ steps_json: Field.textarea({ label: 'Steps', required: false, - description: 'JSON snapshot of the executed step logs so far.', + description: 'JSON step log: for a paused run, the steps executed so far (resume state); for a terminal history row, the bounded per-node step log (durable run detail, #2585).', group: 'State', }), @@ -171,6 +176,8 @@ export const SysAutomationRun = ObjectSchema.create({ { fields: ['status', 'updated_at'] }, // Run-history reads for the Studio "Runs" tab: newest terminal runs per flow. { fields: ['flow_name', 'started_at'] }, + // Retention age sweep: delete terminal rows older than the window (#2585). + { fields: ['status', 'created_at'] }, // Look up a suspended run by the pausing node's correlation key. { fields: ['correlation'] }, ],