Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/loops/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,20 @@ export { reportLoopUsage, type UsageSink } from './report-usage'
export type { RunLoopOptions } from './run-loop'
export { createSandboxForSpec, runLoop } from './run-loop'
export { type AcquireOptions, acquireSandbox } from './sandbox-acquire'
export {
type CriuCapableClient,
probeSandboxCapabilities,
type SandboxCapabilities,
} from './sandbox-capabilities'
export { extractLlmCallEvent, mapSandboxEvent } from './sandbox-events'
export {
type CheckpointCapableBox,
createSandboxLineage,
type ForkCapableBox,
type SandboxLineage,
type SandboxLineageHandle,
type SessionCapableBox,
} from './sandbox-lineage'
export type {
AgentRunSpec,
DefaultVerdict,
Expand All @@ -82,6 +95,7 @@ export type {
LoopIterationDispatchPayload,
LoopIterationEndedPayload,
LoopIterationStartedPayload,
LoopLineageOptions,
LoopPlanDescription,
LoopPlanPayload,
LoopResult,
Expand Down
265 changes: 259 additions & 6 deletions src/loops/run-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,19 @@ import type { SandboxEvent, SandboxInstance } from '@tangle-network/sandbox'
import { ValidationError } from '../errors'
import { acquireSandbox } from './sandbox-acquire'
import { buildBackendOptions } from './sandbox-backend'
import { probeSandboxCapabilities } from './sandbox-capabilities'
import { extractLlmCallEvent } from './sandbox-events'
import {
createSandboxLineage,
type SandboxLineage,
type SandboxLineageHandle,
} from './sandbox-lineage'
import type {
AgentRunSpec,
Driver,
ExecCtx,
Iteration,
LoopLineageOptions,
LoopResult,
LoopSandboxClient,
LoopSandboxPlacement,
Expand Down Expand Up @@ -108,6 +115,18 @@ export interface RunLoopOptions<Task, Output, Decision> {
* a planner sees an arbitrary branch's filesystem — pair it with refine.
*/
onWorkerBox?: (box: SandboxInstance | undefined) => void
/**
* Opt-in box-lineage controls. Default OFF — unset means every iteration
* acquires a fresh box, streams once, and tears it down (today's behavior,
* byte-identical). With `sessionContinuity` on, a refine round continues the
* parent iteration's session on its live box; with `forkFanout` on (and a
* fork-capable platform), a fanout round forks the parent's checkpoint so the
* branches share a context prefix. The lineage owns every box it starts or
* forks and tears them all down at loop end — so these paths are mutually
* exclusive with `onWorkerBox`, which claims the same box-ownership channel.
* @experimental
*/
lineage?: LoopLineageOptions
}

/** @experimental */
Expand Down Expand Up @@ -142,6 +161,12 @@ export async function runLoop<Task, Output, Decision>(
}
: undefined

// Opt-in box lineage: when either flag is set, a backend-blind lineage owns
// box+session handles so a refine continues the parent session and a fanout
// forks the parent checkpoint. Both flags off ⇒ lineage stays undefined and
// the per-iteration acquire/stream/teardown path is byte-identical to today.
const lineageState = await setUpLineage(options, maxConcurrency)

await emitTrace(options.ctx.traceEmitter, {
kind: 'loop.started',
runId,
Expand Down Expand Up @@ -213,6 +238,13 @@ export async function runLoop<Task, Output, Decision>(
})
}

// Decide how this round acquires its sandbox streams. Without lineage it's
// a fresh box per iteration (today's path). With lineage it may continue
// the parent session (refine) or fork the parent checkpoint (fanout).
const lineagePlan = lineageState
? planLineageRound(lineageState, specs, slice, parentIndex, controller.signal)
: undefined

await runBatch({
slice,
baseIndex,
Expand All @@ -228,6 +260,8 @@ export async function runLoop<Task, Output, Decision>(
roundIndex,
parentIndex,
collectBox,
lineagePlan,
lineageState,
})

if (controller.signal.aborted) throwAbort()
Expand All @@ -244,6 +278,10 @@ export async function runLoop<Task, Output, Decision>(
if (isTerminalDecision(decision)) {
return await finalizeAndEmitEnded(options, decision, iterations, loopStart, now, runId)
}
// The loop continues: free any lineage boxes no future round can descend
// from, so the live-box set tracks the active frontier instead of growing
// with every round. No-op unless pruning is provably safe (see canPrune).
if (lineageState) await pruneLineage(lineageState, iterations)
}

// Either the cap was reached without a terminal decision, or plan() returned
Expand All @@ -259,9 +297,184 @@ export async function runLoop<Task, Output, Decision>(
ownedBoxes.map((b) => destroySandboxSafe(b, options.ctx.traceEmitter, runId, now)),
)
if (options.onWorkerBox) options.onWorkerBox(undefined)
// The lineage owns every box it started or forked across all rounds; it tears
// them down at loop end (kept alive between rounds so a later round can
// continue/fork them).
if (lineageState) await lineageState.lineage.teardown()
}
}

/**
* Per-loop lineage state: the backend-blind lineage, the caller's opt-in flags,
* and the live handle each iteration recorded when it acquired its stream, so a
* later round can continue or fork from it. `undefined` ⇒ no lineage; the kernel
* uses the fresh-box path.
*/
interface LineageState {
/**
* The lineage built by `createSandboxLineage` in `setUpLineage`. Rounds start,
* continue, fork, and prune boxes through it, and `runLoop` calls its
* `teardown()` once the loop is done.
*/
lineage: SandboxLineage
/**
* The caller's `RunLoopOptions.lineage` value. `planLineageRound` reads its
* `sessionContinuity` and `forkFanout` flags to pick a round's strategy.
*/
options: LoopLineageOptions
/** iteration index → its live box+session handle (kept alive across rounds). */
handles: Map<number, SandboxLineageHandle>
/**
* Whether the kernel may free non-frontier boxes after each round. Safe only
* when the driver never authors its own branch point (`describePlan` absent),
* so the kernel-inferred `branchPoint` — which moves monotonically toward
* higher-scoring iterations — is the only descent source. A driver that
* declares `parentIndex` may descend from any prior iteration, so no box can
* be freed before loop end.
*/
canPrune: boolean
}

/**
 * Creates the per-run lineage state, or returns `undefined` when lineage is off.
 *
 * Lineage is considered off when `options.lineage` is unset or when neither
 * `sessionContinuity` nor `forkFanout` is truthy. That check runs first, so an
 * all-flags-off `lineage` object never conflicts with `onWorkerBox`.
 *
 * When lineage is on, the sandbox client's capabilities are probed once and
 * handed to `createSandboxLineage` together with the concurrency cap.
 *
 * @param options - The `runLoop` options; reads `lineage`, `onWorkerBox`,
 *   `ctx.sandboxClient`, and `driver.describePlan`.
 * @param maxConcurrency - Forwarded to the lineage as its `maxConcurrency`.
 * @returns The lineage state with an empty handle map, or `undefined`.
 * @throws ValidationError when lineage is on and `onWorkerBox` is also set:
 *   both claim ownership of worker boxes, so honoring either one silently
 *   would leak or double-free.
 */
async function setUpLineage<Task, Output, Decision>(
  options: RunLoopOptions<Task, Output, Decision>,
  maxConcurrency: number,
): Promise<LineageState | undefined> {
  const requested = options.lineage
  if (!requested) return undefined

  const anyFlagOn = Boolean(requested.sessionContinuity || requested.forkFanout)
  if (!anyFlagOn) return undefined

  if (options.onWorkerBox) {
    throw new ValidationError(
      'runLoop: `lineage` and `onWorkerBox` both own worker boxes — pass only one',
    )
  }

  const capabilities = await probeSandboxCapabilities(options.ctx.sandboxClient)
  const lineage = createSandboxLineage(options.ctx.sandboxClient, capabilities, {
    maxConcurrency,
  })
  const handles = new Map<number, SandboxLineageHandle>()
  // Pruning is only allowed when the driver does not supply `describePlan`.
  const driverDescribesPlan = typeof options.driver.describePlan === 'function'

  return { lineage, options: requested, handles, canPrune: !driverDescribesPlan }
}

/**
* One iteration's sandbox-stream source for a lineage round. The kernel awaits
* `acquire()` inside the concurrency-bounded batch (so a fork's per-branch
* `streamPrompt` and a continue's same-box stream are both rate-limited and
* abort-checked like a fresh create). Returns the live event stream plus the
* handle to record for the NEXT round to descend from.
*/
interface LineageStreamSource {
/**
* Resolves with the event stream to consume and the handle for this iteration.
* `executeIteration` stores the handle under the iteration's index and treats
* `handle.box` as lineage-owned, skipping its per-iteration teardown.
*/
acquire(): Promise<{ events: AsyncIterable<SandboxEvent>; handle: SandboxLineageHandle }>
}

/**
* The per-round lineage plan: a stream source per slice offset, or `undefined`
* for offsets with no lineage source (defensive — never expected).
* `executeIteration` looks its entry up at `item.index - baseIndex`; an
* `undefined` entry makes it fall back to `createSandboxForSpec`.
*/
type LineageRoundPlan = (LineageStreamSource | undefined)[]

/**
 * Decide, for one round, how each iteration acquires its sandbox stream:
 *  - refine (1 task) + `sessionContinuity` + a live parent handle ⇒ continue
 *    the parent session on its box, threading the same handle forward.
 *  - fanout (N tasks) + `forkFanout` + a live parent handle ⇒ call
 *    `lineage.fork` once for the whole round and hand each offset its branch.
 *  - otherwise (round 0, no recorded parent handle, or the relevant flag off)
 *    ⇒ `lineage.start` per iteration, so the box is lineage-owned and a handle
 *    is recorded for a later round to descend from.
 *
 * Nothing touches the lineage at plan time: every strategy defers its lineage
 * call to `acquire()`, which the kernel awaits inside the batch.
 *
 * @param state - Lineage state; reads `lineage`, `options`, and `handles`.
 * @param specs - Run specs; offset `i` uses `specs[i % specs.length]`.
 * @param slice - The tasks of this round, one per offset.
 * @param parentIndex - Iteration index this round descends from, if any.
 * @param signal - Abort signal forwarded to every lineage call.
 * @returns One stream source per slice offset.
 * @throws ValidationError synchronously on the fork path when no spec exists
 *   for an offset (prompts and specs are resolved eagerly there). On the other
 *   paths the same error surfaces from `acquire()`.
 */
function planLineageRound<Task>(
  state: LineageState,
  specs: AgentRunSpec<Task>[],
  slice: Task[],
  parentIndex: number | undefined,
  signal: AbortSignal,
): LineageRoundPlan {
  type Branch = { handle: SandboxLineageHandle; events: AsyncIterable<SandboxEvent> }

  const lineage = state.lineage
  const parent = parentIndex !== undefined ? state.handles.get(parentIndex) : undefined

  // Single spec lookup shared by the prompt and spec accessors, so the
  // round-robin rule and the missing-spec error live in one place.
  const specFor = (offset: number): AgentRunSpec<Task> => {
    const spec = specs[offset % specs.length]
    if (!spec) throw new ValidationError('runLoop: no AgentRunSpec available for lineage iteration')
    return spec
  }
  const promptFor = (offset: number): string =>
    specFor(offset).taskToPrompt(slice[offset] as Task)
  // The lineage is task-type-blind, so it receives specs with the task erased.
  const specAt = (offset: number): AgentRunSpec<unknown> =>
    specFor(offset) as AgentRunSpec<unknown>

  // Continue the parent session: a single-task round descending from a live
  // handle, with the flag on. Reuses the parent's box + session id.
  if (slice.length === 1 && parent && state.options.sessionContinuity) {
    return [
      {
        async acquire() {
          const events = await lineage.continue(parent, promptFor(0), signal)
          // Continuation threads the SAME handle forward — later rounds keep
          // descending from this box's evolving session.
          return { events, handle: parent }
        },
      },
    ]
  }

  // Fork the parent checkpoint: a multi-task round descending from a live
  // handle, with the flag on.
  if (slice.length > 1 && parent && state.options.forkFanout) {
    const prompts = slice.map((_, offset) => promptFor(offset))
    const childSpecs = slice.map((_, offset) => specAt(offset))
    // Memoized fork promise. It is `undefined` until the first `acquire()`;
    // every offset then awaits the same promise, so `lineage.fork` runs exactly
    // once per round (and a rejected fork is shared by all branches).
    let forked: Promise<Branch[]> | undefined
    const ensureForked = (): Promise<Branch[]> => {
      const pending = forked ?? lineage.fork(parent, prompts, childSpecs, signal)
      forked = pending
      return pending
    }
    return slice.map((_, offset) => ({
      async acquire() {
        const branches = await ensureForked()
        const branch = branches[offset]
        if (!branch)
          throw new ValidationError('runLoop: lineage fork produced no branch for offset')
        return branch
      },
    }))
  }

  // Fresh through the lineage (round 0, no parent, or the relevant flag off):
  // start an owned box per iteration and record a handle for later descent.
  return slice.map((_, offset) => ({
    async acquire() {
      return lineage.start(specAt(offset), promptFor(offset), signal)
    },
  }))
}

/**
 * Frees lineage boxes that no later round can descend from.
 *
 * Does nothing when:
 *  - `state.canPrune` is false (the driver authors its own branch point, so
 *    any prior iteration may still be a parent);
 *  - `branchPoint(iterations)` yields no index; or
 *  - the branch point has no recorded handle (its acquire failed), in which
 *    case every box is conservatively kept.
 *
 * Otherwise the lineage is asked to prune everything except the branch
 * point's handle. Every handle entry whose box is not the kept box is then
 * dropped, so the map never offers a freed box to a later round. Entries that
 * share the kept box (a refine chain) remain.
 *
 * @param state - Lineage state; `handles` is mutated in place.
 * @param iterations - Iterations so far, used to locate the branch point.
 */
async function pruneLineage<Task, Output>(
  state: LineageState,
  iterations: ReadonlyArray<Iteration<Task, Output>>,
): Promise<void> {
  if (!state.canPrune) return

  const frontierIndex = branchPoint(iterations)
  if (frontierIndex === undefined) return

  const survivor = state.handles.get(frontierIndex)
  if (!survivor) return

  await state.lineage.prune([survivor])

  // Snapshot the doomed indices first, then delete them.
  const freedIndices = [...state.handles]
    .filter(([, handle]) => handle.box !== survivor.box)
    .map(([index]) => index)
  freedIndices.forEach((index) => {
    state.handles.delete(index)
  })
}

interface RunBatchArgs<Task, Output> {
slice: Task[]
baseIndex: number
Expand All @@ -284,6 +497,16 @@ interface RunBatchArgs<Task, Output> {
* default per-iteration teardown.
*/
collectBox?: (box: SandboxInstance) => void
/**
* Lineage mode: per-offset stream sources for this round. When set, an
* iteration acquires its sandbox stream through the lineage (continue / fork /
* fresh) instead of `createSandboxForSpec`, and the lineage — not the
* iteration — owns box teardown (deferred to loop end).
*/
lineagePlan?: LineageRoundPlan
/** The loop's lineage state; iterations record their handle here for the next
* round to descend from. Set iff `lineagePlan` is. */
lineageState?: LineageState
}

async function runBatch<Task, Output>(args: RunBatchArgs<Task, Output>) {
Expand Down Expand Up @@ -349,8 +572,27 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
})

let box: SandboxInstance | undefined
// Lineage-owned boxes are torn down by the lineage at loop end, not here. The
// flag tracks whether THIS iteration's box came from the lineage so the
// teardown branch below skips it.
let lineageOwned = false
try {
box = await createSandboxForSpec(args.ctx.sandboxClient, spec, args.signal)
// Stream source: the lineage (continue / fork / fresh) when this round runs
// under lineage, else a fresh box + a single `streamPrompt` (today's path,
// byte-identical when no lineage). The lineage path supplies a session id on
// the stream; the fresh path passes none — preserving N-independent-boxes.
let stream: AsyncIterable<SandboxEvent>
const source = args.lineagePlan?.[args.item.index - args.baseIndex]
if (source) {
const acquired = await source.acquire()
box = acquired.handle.box
lineageOwned = true
args.lineageState?.handles.set(args.item.index, acquired.handle)
stream = acquired.events
} else {
box = await createSandboxForSpec(args.ctx.sandboxClient, spec, args.signal)
stream = box.streamPrompt(spec.taskToPrompt(args.item.task), { signal: args.signal })
}
const placement = describeSandboxPlacement(args.ctx.sandboxClient, box)
await emitTrace(args.ctx.traceEmitter, {
kind: 'loop.iteration.dispatch',
Expand All @@ -367,9 +609,8 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
parentIndex: args.parentIndex,
},
})
const message = spec.taskToPrompt(args.item.task)
const events: SandboxEvent[] = []
for await (const event of box.streamPrompt(message, { signal: args.signal })) {
for await (const event of stream) {
events.push(event)
const llmCall = extractLlmCallEvent(event, slot.agentRunName)
if (llmCall) {
Expand Down Expand Up @@ -413,9 +654,16 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
})
// The loop owns the per-shot box lifecycle. Default: tear it down now so
// sandboxes don't leak. Same-sandbox mode: hand it to the kernel to keep
// alive for the planner (it tears these down at loop end instead).
if (args.collectBox && box) args.collectBox(box)
else await destroySandboxSafe(box, args.ctx.traceEmitter, args.runId, args.now)
// alive for the planner. Lineage mode: the lineage owns the box and keeps it
// alive across rounds (a later round may continue/fork it), tearing it down
// at loop end — so skip per-iteration teardown here.
if (lineageOwned) {
// no-op: lineage.teardown() reaps this box at loop end
} else if (args.collectBox && box) {
args.collectBox(box)
} else {
await destroySandboxSafe(box, args.ctx.traceEmitter, args.runId, args.now)
}
}
// An abort caught above is NOT a soft per-iteration failure — it must
// short-circuit the batch, not degrade to a recorded empty iteration. The
Expand All @@ -424,6 +672,11 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
if (slot.error) throw slot.error
throwAbort()
}
// A structural lineage error (a dropped session, a fork-capability contract
// violation, a missing spec) is likewise not a soft worker failure: it
// invalidates the run's continuity/branching guarantee, so propagate it
// instead of degrading to a recorded empty iteration the driver might ignore.
if (slot.error instanceof ValidationError) throw slot.error
}

function isAbortError(err: unknown): boolean {
Expand Down
Loading
Loading