From 8d8372d677b8805e0828e8093b4eab9861a27d37 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 6 Jun 2026 20:53:31 +0300 Subject: [PATCH] feat(runtime): emit sandbox run hook events --- bench/src/commit0-gate.mts | 31 ++++- bench/src/corpus.ts | 7 + bench/src/experiment.ts | 5 +- bench/src/runtime-hook-recorder.ts | 38 ++++++ bench/src/worker.ts | 16 ++- src/runtime/sandbox-run.ts | 211 +++++++++++++++++++++++++++-- tests/runtime/sandbox-run.test.ts | 106 +++++++++++++++ 7 files changed, 401 insertions(+), 13 deletions(-) create mode 100644 bench/src/runtime-hook-recorder.ts diff --git a/bench/src/commit0-gate.mts b/bench/src/commit0-gate.mts index 164db05..dae5913 100644 --- a/bench/src/commit0-gate.mts +++ b/bench/src/commit0-gate.mts @@ -37,6 +37,7 @@ import { Sandbox } from '@tangle-network/sandbox' import { createCommit0Adapter } from './benchmarks/commit0' import type { BenchTask } from './benchmarks/types' import { type AttemptRecord, appendRunRecord, type RunRecord } from './corpus' +import { type BenchRuntimeHookEvent, createRuntimeHookRecorder } from './runtime-hook-recorder' function must(name: string): string { const v = process.env[name] @@ -64,6 +65,7 @@ interface Shot { wallMs: number /** measured count of stream events from the rollout (0 if it errored before streaming) */ events: number + runtimeEvents?: BenchRuntimeHookEvent[] } /** Build the rollout prompt: clone the stub, implement the source, write the diff to @@ -149,8 +151,19 @@ async function runShot(task: BenchTask, attempt: number, cfg: ShotCfg): Promise< }, } let run: SandboxRun | undefined + const runtime = createRuntimeHookRecorder() try { - run = await openSandboxRun(client, { agentRun, signal: controller.signal }, commit0Deliverable) + run = await openSandboxRun( + client, + { + agentRun, + signal: controller.signal, + hooks: runtime.hooks, + runId: `commit0:${task.id}:${attempt}`, + scenarioId: task.id, + }, + commit0Deliverable, + ) const turn = await run.start(rolloutPrompt(meta)) const ok = turn.out.diff.trim().length > 0 return { @@ -159,12 +172,22 @@ async function runShot(task: BenchTask, attempt: number, cfg: ShotCfg): Promise< diff: turn.out.diff, ok, events: turn.events.length, + runtimeEvents: runtime.events, wallMs: Date.now() - startedAt, ...(ok ? {} : { detail: `empty patch${turn.readError ? ` (read failed: ${turn.readError.slice(0, 120)})` : ''}${turn.out.lastErr ? `; lastError=${turn.out.lastErr}` : ''}` }), } } catch (err) { const msg = err instanceof Error ? err.message : String(err) - return { task, attempt, diff: '', ok: false, events: 0, wallMs: Date.now() - startedAt, detail: `rollout error: ${msg.slice(0, 200)}` } + return { + task, + attempt, + diff: '', + ok: false, + events: 0, + runtimeEvents: runtime.events, + wallMs: Date.now() - startedAt, + detail: `rollout error: ${msg.slice(0, 200)}`, + } } finally { if (timer) clearTimeout(timer) if (run) await run.close() @@ -312,6 +335,9 @@ async function main(): Promise { for (const task of tasks) { let built = false const attempts: AttemptRecord[] = [] + const runtimeEvents = shots + .filter((x) => x.task.id === task.id) + .flatMap((x) => x.runtimeEvents ?? []) for (let i = 0; i < k; i += 1) { const s = shots.find((x) => x.task.id === task.id && x.attempt === i) let sc: { score: number; resolved: boolean } | undefined @@ -350,6 +376,7 @@ async function main(): Promise { resolved: attempts.some((a) => a.valid === true), attempts, infraError: false, + ...(runtimeEvents.length > 0 ? { runtimeEvents } : {}), } await appendRunRecord(corpusPath, record) // incremental: partial progress survives a crash } diff --git a/bench/src/corpus.ts b/bench/src/corpus.ts index fa76fd9..f7f9466 100644 --- a/bench/src/corpus.ts +++ b/bench/src/corpus.ts @@ -17,6 +17,7 @@ import { dirname } from 'node:path' import { hashContent, type RunSplitTag, validateRunRecord } from '@tangle-network/agent-eval' import type { CorpusRecord } from '@tangle-network/agent-eval/rl' import type { Iteration } from '@tangle-network/agent-runtime/loops' +import type { BenchRuntimeHookEvent } from './runtime-hook-recorder' /** One attempt within a condition-run: the prompt/steer sent, the output, the * verdict, the measured economics, and a bounded trace summary. @@ -67,6 +68,8 @@ export interface RunRecord { seed?: number splitTag?: RunSplitTag commitSha?: string + /** Passive runtime hook evidence captured during the run. Optional and bounded by producers. */ + runtimeEvents?: BenchRuntimeHookEvent[] } const TRACE_TAIL_MAX = 600 @@ -116,6 +119,7 @@ export function buildRunRecord(args: { seed?: number splitTag?: RunSplitTag commitSha?: string + runtimeEvents?: BenchRuntimeHookEvent[] }): RunRecord { const attempts = args.iterations.map(summarizeAttempt) return { @@ -131,6 +135,9 @@ export function buildRunRecord(args: { ...(args.seed !== undefined ? { seed: args.seed } : {}), ...(args.splitTag !== undefined ? { splitTag: args.splitTag } : {}), ...(args.commitSha !== undefined ? { commitSha: args.commitSha } : {}), + ...(args.runtimeEvents !== undefined && args.runtimeEvents.length > 0 + ? { runtimeEvents: args.runtimeEvents } + : {}), } } diff --git a/bench/src/experiment.ts b/bench/src/experiment.ts index f2557db..670866d 100644 --- a/bench/src/experiment.ts +++ b/bench/src/experiment.ts @@ -34,6 +34,7 @@ import { import type { BenchmarkAdapter, BenchTask } from './benchmarks/types' import { appendRunRecord, buildRunRecord } from './corpus' import { routerChatWithUsage } from './router-client' +import { createRuntimeHookRecorder } from './runtime-hook-recorder' import { runPool } from './run-pool' import { runSteeringExperiment } from './steering-experiment' @@ -292,13 +293,14 @@ export async function runExperiment(cfg: ExperimentConfig): Promise({ driver: createDynamicDriver({ planner, maxIterations: rounds }), agentRun: cfg.agentRun, output, validator, task: task.prompt, - ctx: { sandboxClient: cfg.sandboxClient }, + ctx: { sandboxClient: cfg.sandboxClient, hooks: runtime.hooks }, maxIterations: rounds, }) const iter0 = result.iterations[0] @@ -319,6 +321,7 @@ export async function runExperiment(cfg: ExperimentConfig): Promise console.error( diff --git a/bench/src/runtime-hook-recorder.ts b/bench/src/runtime-hook-recorder.ts new file mode 100644 index 0000000..0b48782 --- /dev/null +++ b/bench/src/runtime-hook-recorder.ts @@ -0,0 +1,38 @@ +export type BenchRuntimeHookPhase = 'before' | 'after' | 'error' | 'event' + +export interface BenchRuntimeHookEvent { + id: string + runId: string + scenarioId?: string + target: string + phase: BenchRuntimeHookPhase + timestamp: number + stepIndex?: number + parentId?: string + payload?: Payload + metadata?: Record +} + +export interface BenchRuntimeHooks { + onEvent?: ( + event: BenchRuntimeHookEvent, + context: { signal?: AbortSignal }, + ) => void | Promise +} + +export interface RuntimeHookRecorder { + readonly events: BenchRuntimeHookEvent[] + readonly hooks: BenchRuntimeHooks +} + +export function createRuntimeHookRecorder(): RuntimeHookRecorder { + const events: BenchRuntimeHookEvent[] = [] + return { + events, + hooks: { + onEvent: (event) => { + events.push(event) + }, + }, + } +} diff --git a/bench/src/worker.ts b/bench/src/worker.ts index 0d0246c..02c2710 100644 --- a/bench/src/worker.ts +++ b/bench/src/worker.ts @@ -13,6 +13,7 @@ import { } from '@tangle-network/agent-runtime/loops' import { Sandbox } from '@tangle-network/sandbox' import type { BenchTask } from './benchmarks/types' +import { type BenchRuntimeHookEvent, createRuntimeHookRecorder } from './runtime-hook-recorder' export interface WorkerConfig { sandboxBaseUrl: string @@ -28,6 +29,7 @@ export interface ShotResult { patch: string ok: boolean detail?: string + runtimeEvents?: BenchRuntimeHookEvent[] } const PATCH_PATH = '/tmp/solution.patch' @@ -97,7 +99,18 @@ export async function solveShot( }, }, } - const run = await openSandboxRun(client, { agentRun, signal: controller.signal }, swePatchDeliverable) + const runtime = createRuntimeHookRecorder() + const run = await openSandboxRun( + client, + { + agentRun, + signal: controller.signal, + hooks: runtime.hooks, + runId: `swe-bench:${task.id}`, + scenarioId: task.id, + }, + swePatchDeliverable, + ) try { const turn = await run.start(prompt) const empty = turn.out.patch.trim().length === 0 @@ -107,6 +120,7 @@ export async function solveShot( detail: empty ? `empty patch${turn.readError ? ` (patch read failed: ${turn.readError.slice(0, 120)})` : ''}${turn.out.lastErr ? `; lastError=${turn.out.lastErr}` : ''}` : undefined, + runtimeEvents: runtime.events, } } finally { if (timer) clearTimeout(timer) diff --git a/src/runtime/sandbox-run.ts b/src/runtime/sandbox-run.ts index 301595a..35fb9a2 100644 --- a/src/runtime/sandbox-run.ts +++ b/src/runtime/sandbox-run.ts @@ -32,10 +32,12 @@ */ import type { SandboxEvent, SandboxInstance } from '@tangle-network/sandbox' +import type { RuntimeHooks, RuntimeHookTarget } from '../runtime-hooks' +import { notifyRuntimeHookEvent } from '../runtime-hooks' import { probeSandboxCapabilities } from './sandbox-capabilities' import { createSandboxLineage, type SandboxLineageHandle } from './sandbox-lineage' import type { AgentRunSpec, LoopSandboxClient } from './types' -import { throwIfAborted } from './util' +import { randomSuffix, throwIfAborted } from './util' /** * @experimental @@ -78,6 +80,14 @@ export interface OpenSandboxRunOptions { /** Profile + sandbox env/overrides. `sandboxOverrides.backend.type` is the harness. */ agentRun: AgentRunSpec signal: AbortSignal + /** Optional execution-scoped observers. Hook failures never fail the run. */ + hooks?: RuntimeHooks + /** Stable run id for trace joins. Defaults to a short runtime-minted id. */ + runId?: string + /** Optional benchmark/scenario id carried into emitted hook events. */ + scenarioId?: string + /** Test seam for deterministic hook timestamps. Defaults to `Date.now`. */ + now?: () => number /** Bounds box-creation bursts inside lineage fanout. Default from lineage. */ maxConcurrency?: number } @@ -93,12 +103,76 @@ export async function openSandboxRun( options: OpenSandboxRunOptions, deliverable: Deliverable, ): Promise> { + const runId = options.runId ?? `sandbox-run-${randomSuffix()}` + const now = options.now ?? Date.now const capabilities = await probeSandboxCapabilities(client) const lineage = createSandboxLineage(client, capabilities, { ...(options.maxConcurrency !== undefined ? { maxConcurrency: options.maxConcurrency } : {}), }) let handle: SandboxLineageHandle | undefined let started = false + let runStartedAt: number | undefined + let failed = false + let turnCount = 0 + + function emit(event: { + target: RuntimeHookTarget + phase: 'before' | 'after' | 'error' + timestamp: number + stepIndex?: number + payload?: Record + }): void { + notifyRuntimeHookEvent( + options.hooks, + { + id: `${runId}:${event.target}:${event.phase}${ + event.stepIndex === undefined ? '' : `:${event.stepIndex}` + }`, + runId, + scenarioId: options.scenarioId, + target: event.target, + phase: event.phase, + timestamp: event.timestamp, + stepIndex: event.stepIndex, + payload: event.payload, + metadata: { producer: 'openSandboxRun' }, + }, + { signal: options.signal }, + ) + } + + const runPayload = (): Record => ({ + agentName: options.agentRun.name ?? options.agentRun.profile.name ?? 'agent', + profileName: options.agentRun.profile.name, + backendType: backendType(options.agentRun), + deliverableKind: deliverable.kind, + ...(deliverable.kind === 'artifact' ? { deliverablePath: deliverable.path } : {}), + ...(handle ? { sessionId: handle.sessionId, sandboxId: handle.box.id } : {}), + }) + + const turnPayload = ( + prompt: string, + turnKind: 'start' | 'resume', + startedAt: number, + result?: TurnResult, + error?: unknown, + ): Record => ({ + ...runPayload(), + turnKind, + promptChars: prompt.length, + promptHash: hashText(prompt), + ...(result !== undefined || error !== undefined + ? { durationMs: Math.max(0, now() - startedAt) } + : {}), + ...(result + ? { + eventCount: result.events.length, + eventTypes: eventTypeCounts(result.events), + ...(result.readError !== undefined ? { readError: result.readError } : {}), + } + : {}), + ...(error !== undefined ? { error: errorMessage(error) } : {}), + }) // `box` is passed in (not read from the closed-over `handle`) so the invariant // is type-level, not call-order discipline. @@ -141,22 +215,141 @@ export async function openSandboxRun( 'openSandboxRun: start() already called — use resume() to continue the session', ) started = true + runStartedAt = now() + emit({ + target: 'agent.run', + phase: 'before', + timestamp: runStartedAt, + payload: { ...runPayload(), turnCount: 0 }, + }) + const stepIndex = turnCount + const turnStartedAt = now() + emit({ + target: 'agent.turn', + phase: 'before', + timestamp: turnStartedAt, + stepIndex, + payload: turnPayload(prompt, 'start', turnStartedAt), + }) // lineage.start uses only spec.profile + sandboxOverrides (the prompt is passed // directly, not via taskToPrompt), so the task type is irrelevant here. - const r = await lineage.start( - options.agentRun as AgentRunSpec, - prompt, - options.signal, - ) - handle = r.handle - return settle(handle.box, r.events) + try { + const r = await lineage.start( + options.agentRun as AgentRunSpec, + prompt, + options.signal, + ) + handle = r.handle + const result = await settle(handle.box, r.events) + turnCount += 1 + emit({ + target: 'agent.turn', + phase: 'after', + timestamp: now(), + stepIndex, + payload: turnPayload(prompt, 'start', turnStartedAt, result), + }) + return result + } catch (error) { + failed = true + emit({ + target: 'agent.turn', + phase: 'error', + timestamp: now(), + stepIndex, + payload: turnPayload(prompt, 'start', turnStartedAt, undefined, error), + }) + emit({ + target: 'agent.run', + phase: 'error', + timestamp: now(), + payload: { ...runPayload(), turnCount, error: errorMessage(error) }, + }) + throw error + } }, async resume(prompt) { if (!handle) throw new Error('openSandboxRun: resume() called before start()') - return settle(handle.box, await lineage.continue(handle, prompt, options.signal)) + const stepIndex = turnCount + const turnStartedAt = now() + emit({ + target: 'agent.turn', + phase: 'before', + timestamp: turnStartedAt, + stepIndex, + payload: turnPayload(prompt, 'resume', turnStartedAt), + }) + try { + const result = await settle( + handle.box, + await lineage.continue(handle, prompt, options.signal), + ) + turnCount += 1 + emit({ + target: 'agent.turn', + phase: 'after', + timestamp: now(), + stepIndex, + payload: turnPayload(prompt, 'resume', turnStartedAt, result), + }) + return result + } catch (error) { + failed = true + emit({ + target: 'agent.turn', + phase: 'error', + timestamp: now(), + stepIndex, + payload: turnPayload(prompt, 'resume', turnStartedAt, undefined, error), + }) + emit({ + target: 'agent.run', + phase: 'error', + timestamp: now(), + payload: { ...runPayload(), turnCount, error: errorMessage(error) }, + }) + throw error + } }, async close() { await lineage.teardown() + if (runStartedAt !== undefined) { + emit({ + target: 'agent.run', + phase: 'after', + timestamp: now(), + payload: { + ...runPayload(), + turnCount, + status: failed ? 'error' : 'completed', + durationMs: Math.max(0, now() - runStartedAt), + }, + }) + } }, } } + +function backendType(spec: AgentRunSpec): unknown { + const backend = spec.sandboxOverrides?.backend as { type?: unknown } | undefined + return backend?.type +} + +function eventTypeCounts(events: SandboxEvent[]): Record { + const counts: Record = {} + for (const event of events) counts[event.type] = (counts[event.type] ?? 0) + 1 + return counts +} + +function hashText(value: string): string { + let hash = 2166136261 + for (let i = 0; i < value.length; i += 1) { + hash ^= value.charCodeAt(i) + hash = Math.imul(hash, 16777619) + } + return (hash >>> 0).toString(16).padStart(8, '0') +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} diff --git a/tests/runtime/sandbox-run.test.ts b/tests/runtime/sandbox-run.test.ts index c6bf8b9..d2648d3 100644 --- a/tests/runtime/sandbox-run.test.ts +++ b/tests/runtime/sandbox-run.test.ts @@ -1,6 +1,7 @@ import type { SandboxEvent, SandboxInstance } from '@tangle-network/sandbox' import { describe, expect, it } from 'vitest' import { type AgentRunSpec, type Deliverable, openSandboxRun } from '../../src/runtime' +import type { RuntimeHookEvent } from '../../src/runtime-hooks' interface FakeOpts { /** Resolves the artifact read; throw to exercise the `readError` path. Default: canned. */ @@ -79,6 +80,15 @@ function spec(name = 'w'): AgentRunSpec { return { profile: { name }, name, taskToPrompt: (t) => t } } +function backendSpec(name = 'w'): AgentRunSpec { + return { + profile: { name }, + name, + taskToPrompt: (t) => t, + sandboxOverrides: { backend: { type: 'opencode' } }, + } +} + const eventsDeliverable: Deliverable<{ text: string }> = { kind: 'events', fromEvents: (events) => ({ @@ -248,3 +258,99 @@ describe('openSandboxRun — abort-aware artifact read', () => { expect(readPaths).toHaveLength(0) }) }) + +describe('openSandboxRun — runtime hooks', () => { + it('emits passive run and turn lifecycle events for start, resume, and close', async () => { + const { client } = createFakeClient({ sessionLive: true }) + const events: RuntimeHookEvent[] = [] + let t = 1_000 + const run = await openSandboxRun( + client, + { + agentRun: backendSpec('coder'), + signal: new AbortController().signal, + hooks: { onEvent: (event) => void events.push(event) }, + runId: 'bench-run-1', + scenarioId: 'case-1', + now: () => t++, + }, + artifactDeliverable('solution.patch'), + ) + + await run.start('first turn') + await run.resume('second turn') + await run.close() + + expect( + events.map((event) => `${event.target}:${event.phase}:${event.stepIndex ?? '-'}`), + ).toEqual([ + 'agent.run:before:-', + 'agent.turn:before:0', + 'agent.turn:after:0', + 'agent.turn:before:1', + 'agent.turn:after:1', + 'agent.run:after:-', + ]) + expect(events.every((event) => event.runId === 'bench-run-1')).toBe(true) + expect(events.every((event) => event.scenarioId === 'case-1')).toBe(true) + expect(events.every((event) => event.metadata?.producer === 'openSandboxRun')).toBe(true) + expect(events[0]!.payload).toMatchObject({ + agentName: 'coder', + profileName: 'coder', + backendType: 'opencode', + deliverableKind: 'artifact', + deliverablePath: 'solution.patch', + turnCount: 0, + }) + expect(events[2]!.payload).toMatchObject({ + agentName: 'coder', + turnKind: 'start', + promptChars: 'first turn'.length, + eventCount: 1, + eventTypes: { result: 1 }, + sessionId: expect.any(String), + sandboxId: 'box-0', + }) + expect(events[4]!.payload).toMatchObject({ + turnKind: 'resume', + promptChars: 'second turn'.length, + eventCount: 1, + eventTypes: { result: 1 }, + sessionId: (events[2]!.payload as { sessionId: string }).sessionId, + sandboxId: 'box-0', + }) + expect(events[5]!.payload).toMatchObject({ + turnCount: 2, + sessionId: (events[2]!.payload as { sessionId: string }).sessionId, + sandboxId: 'box-0', + }) + }) + + it('keeps hook failures non-fatal and reports them through onHookError', async () => { + const { client } = createFakeClient() + const hookErrors: string[] = [] + const run = await openSandboxRun( + client, + { + agentRun: spec(), + signal: new AbortController().signal, + hooks: { + onEvent: () => { + throw new Error('hook down') + }, + onHookError: (error, context) => { + hookErrors.push(`${context.hook}:${context.target}:${context.phase}:${error.message}`) + }, + }, + runId: 'bench-run-2', + }, + eventsDeliverable, + ) + + const turn = await run.start('still runs') + + expect(turn.out).toEqual({ text: 'streamed' }) + expect(hookErrors).toContain('onEvent:agent.run:before:hook down') + expect(hookErrors).toContain('onEvent:agent.turn:after:hook down') + }) +})