Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/automation-run-history-retention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@objectstack/service-automation': minor
---

Automation run observability follow-ups (#2585): retention for `sys_automation_run` run history, and durable single-run detail.

**Retention (closes the unbounded-growth risk introduced by #2581).** Terminal run-history rows are now bounded by default, following the ADR-0057 posture:

- A write-time per-flow cap keeps the newest 100 terminal runs per flow (`runHistoryMaxPerFlow`, `0` disables).
- A default-on periodic sweep deletes terminal rows older than 30 days (`runHistoryRetentionDays`, `0` disables; `runHistorySweepMs` tunes the interval, default 1 h).
- Suspended (`paused`) rows are live resumable state and are never pruned.

**Durable single-run detail.** `AutomationEngine.getRun(runId)` now falls back to the durable history row when the run is no longer in the in-memory buffer (e.g. after a restart), and terminal rows persist a bounded per-node step log (`steps_json`: newest 200 steps, stacks stripped, 64 KB cap) — so "open a past failed run and see which node blew up" survives a restart. New `SuspendedRunStore.loadTerminal(runId)` backs this; `RunRecord` gains `finishedAt` and `steps`.
62 changes: 59 additions & 3 deletions packages/services/service-automation/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ export interface ConnectorDescriptor {
*/
export const DEFAULT_MAX_EXECUTION_LOG_SIZE = 1000;

/**
 * Upper bound on the number of step-log entries persisted with a terminal
 * run-history row (#2585). Only the tail of the step log is kept — the last
 * steps are the ones that carry the failure — so durable single-run detail
 * stays meaningful without letting a pathological loop-heavy run write an
 * unbounded `steps_json` column.
 */
export const MAX_PERSISTED_HISTORY_STEPS = 200;

/** Construction options for {@link AutomationEngine}. */
export interface AutomationEngineOptions {
/**
Expand Down Expand Up @@ -364,12 +372,20 @@ export interface RunRecord {
status: 'completed' | 'failed';
startedAt: string;
startTime?: number;
/** When the run reached its terminal state. */
finishedAt?: string;
durationMs?: number;
/** Failure reason for a `failed` run — what a designer needs to fix it. */
error?: string;
nodeId?: string;
organizationId?: string | null;
userId?: string | null;
/**
* Bounded per-node step log (see {@link AutomationEngine.compactStepsForHistory}),
* so "which node blew up?" survives a restart. Optional — history rows
* written before this field existed have none.
*/
steps?: StepLogEntry[];
}

export interface SuspendedRunStore {
Expand All @@ -391,6 +407,12 @@ export interface SuspendedRunStore {
recordTerminal?(record: RunRecord): Promise<void>;
/** Newest terminal run-history records for a flow (for the Runs tab). */
listHistory?(flowName: string, limit: number): Promise<RunRecord[]>;
/**
* Load one terminal run-history record by its raw `runId`, or `null` when
* none is stored. Backs {@link AutomationEngine.getRun}'s durable fallback
* so "open a past failed run" works after a restart.
*/
loadTerminal?(runId: string): Promise<RunRecord | null>;
}

export class AutomationEngine implements IAutomationService {
Expand Down Expand Up @@ -997,23 +1019,40 @@ export class AutomationEngine implements IAutomationService {
}

/**
 * Rehydrate a durable {@link RunRecord} into an {@link ExecutionLogEntry}
 * for the Runs surfaces.
 *
 * `steps` carries the bounded step log persisted with the record; rows
 * written before step persistence existed have none, so it falls back to an
 * empty array. The trigger `type` is rehydrated as an empty string and
 * `finishedAt` is surfaced as `completedAt`.
 *
 * @param r - Terminal run-history record to convert.
 * @returns A log entry mirroring the record's id, flow, status, timing,
 *   error and steps.
 */
private runRecordToLogEntry(r: RunRecord): ExecutionLogEntry {
  return {
    id: r.runId,
    flowName: r.flowName,
    flowVersion: r.flowVersion,
    status: r.status, // 'completed' | 'failed' — both valid ExecutionLog statuses
    startedAt: r.startedAt,
    completedAt: r.finishedAt,
    durationMs: r.durationMs,
    trigger: { type: '', userId: r.userId ?? undefined },
    // Single `steps` key: a duplicate property in an object literal is a
    // compile error, and the persisted log is the value that must win.
    steps: r.steps ?? [],
    error: r.error,
  };
}

/**
 * Look up a single run by id.
 *
 * The in-memory execution log is consulted first. When the run is not there
 * (e.g. after a restart or ring-buffer eviction) and the attached store
 * implements `loadTerminal`, the durable terminal history row is used
 * instead, so "what happened, at which node?" can still be answered.
 *
 * The durable lookup is best-effort: a store failure is logged as a warning
 * and degrades to "not found" rather than throwing.
 *
 * @param runId - Raw run id to look up.
 * @returns The matching log entry, or `null` when neither the in-memory log
 *   nor the durable store has it.
 */
async getRun(runId: string): Promise<ExecutionLogEntry | null> {
  const inMem = this.executionLogs.find((l) => l.id === runId);
  if (inMem) return inMem;
  // Durable fallback — must stay reachable, so there is no unconditional
  // return above this point.
  if (this.store?.loadTerminal) {
    try {
      const rec = await this.store.loadTerminal(runId);
      if (rec) return this.runRecordToLogEntry(rec);
    } catch (err) {
      // Non-Error throwables are stringified so the log never reads "undefined".
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(
        `[Automation] durable run lookup failed for '${runId}': ${reason}`,
      );
    }
  }
  return null;
}

/**
Expand Down Expand Up @@ -1691,15 +1730,19 @@ export class AutomationEngine implements IAutomationService {
entry.status === 'cancelled' ||
entry.status === 'timed_out';
if (terminal && this.store?.recordTerminal) {
const lastStep = entry.steps[entry.steps.length - 1];
const record: RunRecord = {
runId: entry.id,
flowName: entry.flowName,
flowVersion: entry.flowVersion,
status: entry.status === 'completed' ? 'completed' : 'failed',
startedAt: entry.startedAt,
finishedAt: entry.completedAt,
durationMs: entry.durationMs,
error: entry.error,
userId: entry.trigger?.userId,
nodeId: lastStep?.nodeId,
steps: this.compactStepsForHistory(entry.steps),
};
void this.store.recordTerminal(record).catch((err) => {
this.logger.warn(
Expand All @@ -1709,6 +1752,19 @@ export class AutomationEngine implements IAutomationService {
}
}

/**
 * Produce the bounded step log stored with a durable history row (#2585).
 *
 * Two reductions are applied:
 * - only the newest {@link MAX_PERSISTED_HISTORY_STEPS} steps are kept, since
 *   the tail of the log is where a failure shows up;
 * - any step error that carries a `stack` is rewritten to just its
 *   `code` / `message` pair — the designer-facing "why" — because stacks
 *   bloat rows without aiding the Runs surface.
 *
 * Steps without an error stack are passed through unchanged (same object).
 *
 * @param steps - Full step log of the finished run.
 * @returns The compacted tail of the step log.
 */
private compactStepsForHistory(steps: StepLogEntry[]): StepLogEntry[] {
  const tail = steps.slice(-MAX_PERSISTED_HISTORY_STEPS);
  return tail.map((step) => {
    if (!step.error?.stack) return step;
    const { code, message } = step.error;
    return { ...step, error: { code, message } };
  });
}

/**
* Validate each node's `type` against the live action registry (ADR-0018).
* A type is known if it is structural (start/end), has a registered
Expand Down
7 changes: 5 additions & 2 deletions packages/services/service-automation/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

// Core engine
export { AutomationEngine, DEFAULT_MAX_EXECUTION_LOG_SIZE } from './engine.js';
export { AutomationEngine, DEFAULT_MAX_EXECUTION_LOG_SIZE, MAX_PERSISTED_HISTORY_STEPS } from './engine.js';
export type {
AutomationEngineOptions,
NodeExecutor,
Expand All @@ -15,6 +15,7 @@ export type {
ConnectorActionDescriptor,
SuspendedRun,
SuspendedRunStore,
RunRecord,
StepLogEntry,
} from './engine.js';

Expand All @@ -23,8 +24,10 @@ export type {
export {
InMemorySuspendedRunStore,
ObjectStoreSuspendedRunStore,
DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW,
DEFAULT_RUN_HISTORY_RETENTION_DAYS,
} from './suspended-run-store.js';
export type { SuspendedRunStoreEngine } from './suspended-run-store.js';
export type { SuspendedRunStoreEngine, ObjectStoreSuspendedRunStoreOptions } from './suspended-run-store.js';

// The sys_automation_run object backing the durable store — registered by
// AutomationServicePlugin and exported for hosts wiring a custom store.
Expand Down
68 changes: 66 additions & 2 deletions packages/services/service-automation/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import type { IJobService } from '@objectstack/spec/contracts';
import { AutomationEngine } from './engine.js';
import { installBuiltinNodes, rearmSuspendedWaitTimers } from './builtin/index.js';
import { SysAutomationRun } from './sys-automation-run.object.js';
import { ObjectStoreSuspendedRunStore, type SuspendedRunStoreEngine } from './suspended-run-store.js';
import {
ObjectStoreSuspendedRunStore,
DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW,
DEFAULT_RUN_HISTORY_RETENTION_DAYS,
type SuspendedRunStoreEngine,
} from './suspended-run-store.js';

/**
* Configuration options for the AutomationServicePlugin.
Expand All @@ -31,6 +36,26 @@ export interface AutomationServicePluginOptions {
* to {@link DEFAULT_MAX_EXECUTION_LOG_SIZE} (1000).
*/
maxLogSize?: number;
/**
* Retention window in days for durable terminal run history in
* `sys_automation_run` (#2585; ADR-0057 posture — platform self-telemetry
* must be bounded). When > 0, a periodic sweep deletes terminal
* (completed / failed) history rows older than the window; suspended
* (`paused`) rows are live resumable state and are never pruned.
* **Default-on** at {@link DEFAULT_RUN_HISTORY_RETENTION_DAYS} (30). Set
* to `0` to disable age pruning (history kept until the per-flow cap).
*/
runHistoryRetentionDays?: number;
/**
* Per-flow cap on terminal run-history rows, enforced at write time (the
* "or 100 runs/flow, whichever first" half of the #2585 retention
* contract). Defaults to {@link DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW} (100);
* `0` disables the cap.
*/
runHistoryMaxPerFlow?: number;
/** Run-history retention sweep interval in ms (default 1 hour). Only used
* when `runHistoryRetentionDays` > 0. */
runHistorySweepMs?: number;
}

/**
Expand Down Expand Up @@ -72,6 +97,8 @@ export class AutomationServicePlugin implements Plugin {

private engine?: AutomationEngine;
private readonly options: AutomationServicePluginOptions;
/** Periodic run-history retention sweep (#2585); cleared on destroy. */
private retentionTimer?: ReturnType<typeof setInterval>;
/**
* Flow names this plugin has registered into the engine from the
* artifact / ObjectQL registry, tracked so a `metadata:reloaded` re-sync
Expand Down Expand Up @@ -155,14 +182,47 @@ export class AutomationServicePlugin implements Plugin {
try { dataEngine = ctx.getService<SuspendedRunStoreEngine>('objectql'); }
catch { try { dataEngine = ctx.getService<SuspendedRunStoreEngine>('data'); } catch { /* none */ } }
if (dataEngine && typeof dataEngine.find === 'function' && typeof dataEngine.insert === 'function') {
durableStore = new ObjectStoreSuspendedRunStore(dataEngine, ctx.logger);
durableStore = new ObjectStoreSuspendedRunStore(dataEngine, ctx.logger, {
maxTerminalRunsPerFlow:
this.options.runHistoryMaxPerFlow ?? DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW,
});
this.engine.setSuspendedRunStore(durableStore);
ctx.logger.info('[Automation] Suspended-run persistence enabled (sys_automation_run)');
} else {
ctx.logger.info('[Automation] No ObjectQL engine — suspended runs kept in-memory only');
}
}

// Run-history age retention (#2585, ADR-0057 posture): default-on sweep
// so `sys_automation_run` terminal history can't grow without bound.
// Runs once at kernel:ready then on a low-frequency interval; the timer
// is unref'd so it never keeps the process alive. Mirrors the
// service-messaging notification retention sweep.
const retentionDays = this.options.runHistoryRetentionDays ?? DEFAULT_RUN_HISTORY_RETENTION_DAYS;
if (durableStore && retentionDays > 0 && typeof ctx.hook === 'function') {
const store = durableStore;
const sweepMs = this.options.runHistorySweepMs ?? 3_600_000;
ctx.hook('kernel:ready', async () => {
const sweep = () => {
void store.pruneHistory(retentionDays).then((deleted) => {
if (deleted === undefined || deleted > 0) {
ctx.logger.info(
`[Automation] run-history retention: pruned ${deleted ?? '?'} terminal run(s) older than ${retentionDays}d`,
);
}
}).catch((err) =>
ctx.logger.warn(`[Automation] run-history retention sweep failed: ${(err as Error)?.message ?? err}`),
);
};
sweep();
this.retentionTimer = setInterval(sweep, sweepMs);
this.retentionTimer.unref?.();
ctx.logger.info(
`[Automation] run-history retention on (terminal runs > ${retentionDays}d pruned every ${Math.round(sweepMs / 1000)}s; cap ${this.options.runHistoryMaxPerFlow ?? DEFAULT_MAX_TERMINAL_RUNS_PER_FLOW}/flow at write)`,
);
});
}

// #1870 — bridge `script`-node function calls to the host function
// registry. ObjectQL holds the name→handler map populated from
// `bundle.functions` / `defineStack({ functions })` (the same registry
Expand Down Expand Up @@ -422,6 +482,10 @@ export class AutomationServicePlugin implements Plugin {
}

/**
 * Tear the plugin down: stop the periodic run-history retention sweep if one
 * was started, then release the engine reference.
 */
async destroy(): Promise<void> {
  const timer = this.retentionTimer;
  if (timer) {
    clearInterval(timer);
    this.retentionTimer = undefined;
  }
  this.engine = undefined;
}
}
50 changes: 50 additions & 0 deletions packages/services/service-automation/src/run-history.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,56 @@ describe('automation run history (durable observability)', () => {
expect(runs[0].id).toBeTruthy();
});

it('getRun falls back to durable history after a restart, WITH step detail (#2585)', async () => {
  const store = new InMemorySuspendedRunStore();
  const engineA = new AutomationEngine(silent, store);
  engineA.registerNodeExecutor({
    type: 'boom',
    async execute() { throw new Error('kaboom'); },
  } as never);
  engineA.registerFlow('bad_flow', failingFlow('bad_flow') as never);
  await engineA.execute('bad_flow', { event: 'test' } as AutomationContext);
  await flush();

  // Simulate a restart: a fresh engine with empty in-memory logs. Before
  // #2585 its getRun returned null even though the history row existed.
  const engineB = new AutomationEngine(silent, store);
  const [listed] = await engineB.listRuns('bad_flow', { limit: 1 });
  // Fail with a clear assertion (not a TypeError on `listed.id`) if the
  // terminal history row was never recorded.
  expect(listed).toBeDefined();
  const run = await engineB.getRun(listed.id);
  expect(run).not.toBeNull();
  expect(run!.status).toBe('failed');
  expect(run!.error ?? '').toMatch(/kaboom/);
  // Durable single-run detail: the persisted step log names the node that
  // blew up (stacks are stripped — code/message is the designer-facing why).
  const failedStep = run!.steps.find((s) => s.status === 'failure');
  expect(failedStep?.nodeId).toBe('boom');
  expect(failedStep?.error?.message).toMatch(/kaboom/);
  expect(failedStep?.error?.stack).toBeUndefined();
});

it('getRun returns null for an unknown run id even with a store attached', async () => {
  // A store is attached, but it holds no record for this id.
  const store = new InMemorySuspendedRunStore();
  const engine = new AutomationEngine(silent, store);
  const run = await engine.getRun('run_nope');
  expect(run).toBeNull();
});

it('caps terminal history per flow (retention stop-gap, #2585)', async () => {
  const store = new InMemorySuspendedRunStore({ maxTerminalRunsPerFlow: 2 });
  const engine = new AutomationEngine(silent, store);
  for (const name of ['busy', 'other']) {
    engine.registerFlow(name, trivialFlow(name) as never);
  }

  // Five runs of 'busy' (well over the cap of 2), then a single 'other' run.
  let remaining = 5;
  while (remaining-- > 0) {
    await engine.execute('busy', { event: 'test' } as AutomationContext);
  }
  await engine.execute('other', { event: 'test' } as AutomationContext);
  await flush();

  // The cap applies per flow: 'busy' is trimmed to its newest 2 runs while
  // the lone 'other' run is left alone.
  const busyHistory = await store.listHistory('busy', 10);
  expect(busyHistory).toHaveLength(2);
  const otherHistory = await store.listHistory('other', 10);
  expect(otherHistory).toHaveLength(1);
});

it('a failing history store never breaks the run (best-effort isolation)', async () => {
const store = new InMemorySuspendedRunStore();
// Override recordTerminal to reject — the run must still complete.
Expand Down
Loading