diff --git a/.gitignore b/.gitignore index e728995..c1b1505 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ dist/ node_modules/ *.tgz + +# meta-harness / evolve loop runtime state +.evolve/ diff --git a/docs/agent-bus-protocol.md b/docs/agent-bus-protocol.md new file mode 100644 index 0000000..33fb20d --- /dev/null +++ b/docs/agent-bus-protocol.md @@ -0,0 +1,76 @@ +# Agent-Bus Protocol + +The wire-level contract that makes agent-to-agent communication composable across organizational and cloud boundaries. agent-runtime emits these headers on every outbound participant call; agent-gateway reads them on inbound and enforces the depth limit. Any third party can publish a gateway-fronted agent and participate, billed correctly, traced consistently, recursion-bounded. + +This document is **normative**. agent-runtime + agent-gateway implementations MUST honor every section; downstream agent endpoints MAY rely on the contract. + +## Headers + +All header names are lowercased on the wire. Implementations MUST be case-insensitive on read. + +| Header | Direction | Purpose | +|---|---|---| +| `x-tangle-forwarded-authorization` | inbound+outbound | The original user's `Bearer sk-tan-` (or `Bearer `). Forwarded verbatim through every hop so the final billed party is always the human/agent that initiated the request, not whichever intermediate agent made the call. | +| `x-tangle-forwarded-depth` | inbound+outbound | Hop counter. Origin caller MUST omit or set to `0`. Every gateway / runtime MUST increment by 1 before forwarding. Recipients MUST refuse with HTTP `413 Payload Too Large` when the inbound depth ≥ `DEFAULT_MAX_DEPTH` (4 unless overridden). | +| `x-tangle-runid` | inbound+outbound | The top-level conversation's run identifier. Propagated unchanged through all nested calls so all hops correlate to one run. | +| `x-tangle-turnid` | outbound | This specific call's deterministic turn identifier. Format: `.t.`. Stable across retries of the same logical turn; caching gateways MAY treat repeated turn ids as idempotency keys and return cached results. | +| `x-tangle-parent-turnid` | outbound (recursion) | When the call is *inside* another turn (i.e. the caller is itself a participant in a higher-order conversation), the enclosing turn's id. Used for trace stitching. Omit when at the top level. | +| `x-tangle-speaker` | outbound | The logical conversation-peer label at the sending side (e.g. `researcher`, `critic`). Cosmetic but useful for trace tagging and dashboard filtering. | + +## Invariants + +1. **Depth monotonicity.** A receiving runtime MUST NOT decrement, reset, or omit the depth counter on outbound calls. A misbehaving intermediate that resets depth breaks the recursion bound for everyone downstream. +2. **Authorization preservation.** A receiving runtime MUST forward `x-tangle-forwarded-authorization` verbatim on any outbound call it makes on behalf of the original caller. Substituting its own credentials silently re-bills the user incorrectly. +3. **runId immutability.** A nested conversation invoked via `createConversationBackend` does NOT mint a new `x-tangle-runid` — it inherits the parent's. (It does mint new `turnId`s, which include its own run scope.) +4. **Idempotency contract (advisory).** Gateways MAY dedupe by `(runId, turnId)`. If they do, they MUST return the cached response unchanged. If they don't, retries cost N× credits — that's the caller's choice. +5. **Refusal granularity.** Depth-limit refusal at a gateway MUST be `413 Payload Too Large` with a body describing the limit and the observed inbound depth, so the caller can route around (or accept) the limit instead of treating it as a generic error. + +## Worked example + +User triggers conversation. The driver runs on machine M. + +``` +USER → router.tangle.tools (POST /v1/chat/completions, bridge/sandbox/researcher) + ↓ Authorization: Bearer sk-tan-user-123 + ↓ X-Tangle-Forwarded-Depth: 0 (origin) +router → researcher-sandbox-A (us-east) + ↓ X-Tangle-Forwarded-Authorization: Bearer sk-tan-user-123 + ↓ X-Tangle-Forwarded-Depth: 1 + ↓ X-Tangle-RunId: conv_abc + ↓ X-Tangle-TurnId: conv_abc.t0.researcher +sandbox-A's agent decides to call agent-runtime conversation for critic-review + ↓ runConversation({ participants: [..., bridge/sandbox/critic], inboundDepth: 1 }) +agent-runtime → critic-sandbox-B (eu-west) + ↓ X-Tangle-Forwarded-Authorization: Bearer sk-tan-user-123 + ↓ X-Tangle-Forwarded-Depth: 2 + ↓ X-Tangle-RunId: conv_abc (preserved through hops) + ↓ X-Tangle-TurnId: conv_abc.t0.critic (new, scoped to nested run) + ↓ X-Tangle-Parent-TurnId: conv_abc.t0.researcher (recursion link) +``` + +At depth 4, a gateway refuses with 413 — the chain stops there, sk-tan-user-123 is billed for hops 1–4 only, the trace shows the refusal point. + +## Implementation contract for endpoint authors + +A `@tangle-network/agent-gateway`-fronted endpoint: +- MUST read `x-tangle-forwarded-depth` on inbound, default 0, refuse with 413 when ≥ `DEFAULT_MAX_DEPTH`. +- MUST honor `x-tangle-forwarded-authorization` if present, using it as the effective billing identity instead of the direct caller's auth (when the direct caller is whitelisted as an inter-agent caller). +- SHOULD propagate `x-tangle-runid`, `x-tangle-turnid`, `x-tangle-parent-turnid` into its own trace records. +- MAY use `(runId, turnId)` as an idempotency key to dedupe retries. + +A `@tangle-network/agent-runtime` consumer (driver code): +- Passes `propagatedHeaders` + `inboundDepth` + `parentTurnId` to `runConversation` / `runConversationStream` from its inbound request context. +- The runtime automatically emits `buildForwardHeaders(...)` on every participant backend call. +- Backends that issue HTTP (e.g. `createOpenAICompatibleBackend`) merge `context.propagatedHeaders` into their outbound request automatically. + +## Reference + +- `@tangle-network/agent-runtime/headers` — names, builders, depth parsing, exported constants. +- `@tangle-network/agent-runtime/call-policy` — deadline / retry / circuit-breaker primitives that compose with the protocol. +- `@tangle-network/agent-runtime/journal` — durable transcript so a driver crash mid-run doesn't lose acknowledged turns. +- `@tangle-network/agent-runtime/turn-id` — deterministic `turnId(runId, index, speaker)`. +- `@tangle-network/agent-gateway` — Hono middleware for inbound enforcement. + +## Versioning + +This protocol is currently `v0`. Breaking changes will bump to `v1` and add a `x-tangle-protocol-version` header; until then there is no version header (implicit `v0`). Additive headers do not require a version bump. diff --git a/package.json b/package.json index 705faab..9bfbb10 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.25.2", + "version": "0.26.0", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/backends.ts b/src/backends.ts index e9766fa..d03b005 100644 --- a/src/backends.ts +++ b/src/backends.ts @@ -277,6 +277,13 @@ export function createOpenAICompatibleBackend< headers: { Authorization: `Bearer ${options.apiKey}`, 'Content-Type': 'application/json', + // Cross-gateway forwarding: when this call is part of a + // multi-agent conversation, the runner stamps run/turn/ + // depth/forwarded-auth headers onto the context. They flow + // through to the downstream gateway verbatim so the original + // user gets billed, the recursion depth stays bounded, and + // the trace correlates across hops. + ...(context.propagatedHeaders ?? {}), }, body: requestBody, signal: attemptSignal.signal, diff --git a/src/conversation/call-policy.ts b/src/conversation/call-policy.ts new file mode 100644 index 0000000..ea8e887 --- /dev/null +++ b/src/conversation/call-policy.ts @@ -0,0 +1,178 @@ +/** + * @stable + * + * Per-call resilience policy for participant backends: deadline, retry with + * backoff, and a circuit breaker. Each policy is applied *around* a single + * turn's backend invocation, not across the whole conversation — the + * conversation-level credit cap and `maxTurns` bound the broader run. + * + * Deadlines abort the underlying backend stream via `AbortSignal` linkage so + * the OpenAI/SDK clients tear down their HTTP request cleanly instead of + * leaking sockets. Retries replay the same logical turn (same `turnId`) so + * any caching gateway can dedupe. Circuit breakers are *per participant*: A's + * failures don't open B's breaker. + */ + +/** Pure judgment of whether an error is worth retrying. Defaults: TimeoutError, AbortError, fetch-level network errors. */ +export type RetryableErrorPredicate = (err: unknown) => boolean + +/** Backoff between attempts. Constant ms, or `(attempt: 1-indexed) => ms`. */ +export type RetryBackoff = number | ((attempt: number) => number) + +/** Circuit-breaker tuning. `failuresToOpen` consecutive failures opens it; closed only after `cooldownMs`. */ +export interface CircuitBreakerConfig { + failuresToOpen: number + cooldownMs: number +} + +export interface BackendCallPolicy { + /** Per-attempt wall clock limit. Exceeding fires an AbortSignal and is treated as a retryable failure. */ + perAttemptDeadlineMs?: number + /** Number of retries after the first attempt; total attempts = 1 + maxRetries. Default 0. */ + maxRetries?: number + /** Backoff between attempts. Default 250ms with jitter. */ + retryBackoffMs?: RetryBackoff + /** Custom retry classifier. Defaults to {@link defaultIsRetryable}. */ + isRetryable?: RetryableErrorPredicate + /** Circuit breaker that opens after N consecutive failures per participant. */ + circuitBreaker?: CircuitBreakerConfig +} + +export class CircuitOpenError extends Error { + constructor(participant: string, retryAfterMs: number) { + super( + `circuit open for participant '${participant}'; ${retryAfterMs}ms remaining before retry allowed`, + ) + this.name = 'CircuitOpenError' + } +} + +export class DeadlineExceededError extends Error { + constructor(deadlineMs: number) { + super(`backend call exceeded per-attempt deadline of ${deadlineMs}ms`) + this.name = 'DeadlineExceededError' + } +} + +/** + * Default retryable classification — network/timeout class errors. Errors + * a model deliberately throws (validation, refusal, 4xx) are not retried; + * those represent real outcomes, not transient infrastructure faults. + */ +export const defaultIsRetryable: RetryableErrorPredicate = (err) => { + if (err instanceof DeadlineExceededError) return true + if (err instanceof Error) { + const name = err.name + const message = err.message.toLowerCase() + if (name === 'AbortError' || name === 'TimeoutError') return true + if ( + message.includes('econnreset') || + message.includes('etimedout') || + message.includes('econnrefused') || + message.includes('socket hang up') || + message.includes('network') || + message.includes('fetch failed') + ) { + return true + } + } + return false +} + +/** Live circuit-breaker state — one instance per (participant, conversation run). */ +export class CircuitBreakerState { + private consecutiveFailures = 0 + private openedAt: number | undefined + + constructor(private readonly config: CircuitBreakerConfig | undefined) {} + + /** + * Check whether the next call is allowed. Throws `CircuitOpenError` when + * the breaker is open and the cooldown hasn't elapsed. + */ + preflight(participant: string, now: number = Date.now()): void { + if (!this.config || this.openedAt === undefined) return + const remaining = this.config.cooldownMs - (now - this.openedAt) + if (remaining > 0) { + throw new CircuitOpenError(participant, remaining) + } + this.openedAt = undefined + this.consecutiveFailures = 0 + } + + recordSuccess(): void { + this.consecutiveFailures = 0 + this.openedAt = undefined + } + + recordFailure(now: number = Date.now()): void { + if (!this.config) return + this.consecutiveFailures += 1 + if (this.consecutiveFailures >= this.config.failuresToOpen) { + this.openedAt = now + } + } +} + +/** + * Build a per-attempt AbortSignal linked to the parent signal AND fired when + * the deadline elapses. The returned `dispose()` MUST be called in a + * `finally` (clears the timer, detaches the listener) so we don't leak. + * + * When the deadline fires, the signal's `reason` is a `DeadlineExceededError` + * — callers can detect timeout-vs-cancel by reading `signal.reason` after + * the underlying operation throws. + */ +export function makePerAttemptSignal( + parentSignal: AbortSignal | undefined, + deadlineMs: number | undefined, +): { + signal: AbortSignal + dispose: () => void + getDeadlineError(): DeadlineExceededError | undefined +} { + const controller = new AbortController() + let deadlineError: DeadlineExceededError | undefined + const cleanups: Array<() => void> = [] + + if (parentSignal) { + if (parentSignal.aborted) controller.abort(parentSignal.reason) + else { + const onAbort = () => controller.abort(parentSignal.reason) + parentSignal.addEventListener('abort', onAbort, { once: true }) + cleanups.push(() => parentSignal.removeEventListener('abort', onAbort)) + } + } + if (deadlineMs !== undefined) { + const ms = deadlineMs + const timer = setTimeout(() => { + deadlineError = new DeadlineExceededError(ms) + controller.abort(deadlineError) + }, ms) + cleanups.push(() => clearTimeout(timer)) + } + return { + signal: controller.signal, + dispose() { + for (const c of cleanups) c() + }, + getDeadlineError() { + return deadlineError + }, + } +} + +/** Compute the delay before the next attempt. Default: 250ms exponential with jitter. */ +export function computeBackoff(spec: RetryBackoff | undefined, attempt: number): number { + if (spec === undefined) { + const base = 250 + const jitter = Math.floor(Math.random() * base) + return base * 2 ** (attempt - 1) + jitter + } + if (typeof spec === 'function') return Math.max(0, spec(attempt)) + return Math.max(0, spec) +} + +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/src/conversation/conversation-backend.ts b/src/conversation/conversation-backend.ts new file mode 100644 index 0000000..0e201c3 --- /dev/null +++ b/src/conversation/conversation-backend.ts @@ -0,0 +1,146 @@ +/** + * @stable + * + * Wrap a `Conversation` so it satisfies `AgentExecutionBackend`. The result is + * an addressable "single agent" whose internal behavior is an N-party + * orchestrated conversation — the recursion primitive that lets a swarm be a + * participant inside another swarm, or be published behind a single + * agent-gateway endpoint. + * + * Stream events from inner participants are NOT forwarded verbatim. Outer + * callers see one `text_delta` per inner turn (the turn's full text), tagged + * with `[speaker] ` prefix so the outer transcript stays attributable. The + * conversation's `conversation_end` halt reason rides on a `final` event. + */ + +import { newRuntimeSession, nowIso } from '../sessions' +import type { + AgentBackendContext, + AgentBackendInput, + AgentExecutionBackend, + RuntimeSession, + RuntimeStreamEvent, +} from '../types' +import { FORWARD_HEADERS, readDepth } from './headers' +import { runConversationStream } from './run-conversation' +import type { Conversation, HaltReason } from './types' + +export function createConversationBackend(options: { + conversation: Conversation + /** Optional backend kind label. Defaults to `'conversation'`. */ + kind?: string +}): AgentExecutionBackend { + const kind = options.kind ?? 'conversation' + + return { + kind, + start(_input, context): RuntimeSession { + return newRuntimeSession(kind, context.requestedSessionId, { + participants: options.conversation.participants.map((p) => p.name), + }) + }, + async *stream( + input: AgentBackendInput, + context: AgentBackendContext, + ): AsyncIterable { + const seed = input.message ?? input.messages?.at(-1)?.content ?? context.task.intent + const task = context.task + const session = context.session + + yield { type: 'backend_start', task, session, backend: kind, timestamp: nowIso() } + + let finalText = '' + let totalCostUsd = 0 + let totalTokensIn = 0 + let totalTokensOut = 0 + + // Recursion: forward this call's propagation context into the nested + // conversation. The nested run INHERITS the parent's runId (protocol + // invariant: runId is immutable across nesting), continues the depth + // counter from the headers, and stamps the enclosing turn as the + // parentTurnId so trace stitching reaches across nesting levels. + const inboundDepth = parseInboundDepth(context.propagatedHeaders) + for await (const event of runConversationStream(options.conversation, { + seed, + signal: context.signal, + runId: context.runId, + propagatedHeaders: context.propagatedHeaders, + inboundDepth, + parentTurnId: context.turnId, + })) { + if (event.type === 'turn_end') { + const tagged = `[${event.turn.speaker}] ${event.turn.text}\n` + finalText += tagged + yield { type: 'text_delta', task, session, text: tagged, timestamp: event.timestamp } + if (event.turn.usage) { + const u = event.turn.usage + if (u.costUsd !== undefined) totalCostUsd += u.costUsd + if (u.tokensIn !== undefined) totalTokensIn += u.tokensIn + if (u.tokensOut !== undefined) totalTokensOut += u.tokensOut + yield { + type: 'llm_call', + task, + session, + model: u.model ?? `${kind}/${event.turn.speaker}`, + tokensIn: u.tokensIn, + tokensOut: u.tokensOut, + costUsd: u.costUsd, + latencyMs: u.latencyMs, + timestamp: event.timestamp, + } + } + } else if (event.type === 'conversation_end') { + const halt = event.result.halted + yield { + type: 'final', + task, + session, + status: halt.kind === 'participant_error' ? 'failed' : 'completed', + reason: describeHalt(halt), + text: finalText.trim(), + metadata: { + conversationRunId: event.result.runId, + turns: event.result.turns, + spentCreditsCents: event.result.spentCreditsCents, + halted: halt, + durationMs: event.result.durationMs, + tokensIn: totalTokensIn, + tokensOut: totalTokensOut, + costUsd: totalCostUsd, + }, + timestamp: event.timestamp, + } + } + } + + yield { type: 'backend_end', task, session, backend: kind, timestamp: nowIso() } + }, + } +} + +function parseInboundDepth(headers: Readonly> | undefined): number { + if (!headers) return 0 + // Accept either the canonical header key OR a lookup via the keyed name. + const raw = headers[FORWARD_HEADERS.depth] + if (raw === undefined) return 0 + try { + return readDepth({ [FORWARD_HEADERS.depth]: raw }) + } catch { + return 0 + } +} + +function describeHalt(halt: HaltReason): string { + switch (halt.kind) { + case 'max_turns': + return `max_turns (${halt.turns})` + case 'max_credits': + return `max_credits (${halt.spentCents}/${halt.capCents}¢)` + case 'predicate': + return `predicate: ${halt.reason}` + case 'abort': + return 'abort' + case 'participant_error': + return `participant_error[${halt.participant}]: ${halt.message}` + } +} diff --git a/src/conversation/define-conversation.ts b/src/conversation/define-conversation.ts new file mode 100644 index 0000000..1abd835 --- /dev/null +++ b/src/conversation/define-conversation.ts @@ -0,0 +1,72 @@ +/** + * @stable + * + * Declarative constructor for a multi-agent `Conversation`. Validates inputs + * fail-loud at definition time (duplicate participant names, alternate order + * with ≠2 participants, non-positive `maxTurns`) so misconfiguration is caught + * before `runConversation` is called and not buried inside a streaming run. + */ + +import { ValidationError } from '../errors' +import type { Conversation, ConversationParticipant, ConversationPolicy, TurnOrder } from './types' + +export function defineConversation(input: { + participants: ConversationParticipant[] + policy: ConversationPolicy +}): Conversation { + if (input.participants.length < 2) { + throw new ValidationError( + `Conversation requires at least 2 participants; received ${input.participants.length}.`, + ) + } + + const seen = new Set() + for (const p of input.participants) { + if (!p.name || p.name.trim() === '') { + throw new ValidationError('Conversation participant.name must be a non-empty string.') + } + if (seen.has(p.name)) { + throw new ValidationError( + `Conversation participant names must be unique within a Conversation; '${p.name}' appears more than once.`, + ) + } + seen.add(p.name) + if (!p.backend || typeof p.backend.stream !== 'function') { + throw new ValidationError( + `Conversation participant '${p.name}' is missing a backend with a stream() method.`, + ) + } + } + + const policy = normalizePolicy(input.policy, input.participants.length) + + return { + participants: input.participants, + policy, + } +} + +function normalizePolicy(policy: ConversationPolicy, participantCount: number): ConversationPolicy { + if (!Number.isInteger(policy.maxTurns) || policy.maxTurns < 1) { + throw new ValidationError( + `ConversationPolicy.maxTurns must be a positive integer; received ${String(policy.maxTurns)}.`, + ) + } + if ( + policy.maxCreditsCents !== undefined && + (!Number.isFinite(policy.maxCreditsCents) || policy.maxCreditsCents < 0) + ) { + throw new ValidationError( + `ConversationPolicy.maxCreditsCents must be a non-negative finite number when set; received ${String( + policy.maxCreditsCents, + )}.`, + ) + } + const turnOrder = policy.turnOrder ?? (participantCount === 2 ? 'alternate' : 'round-robin') + if (turnOrder === 'alternate' && participantCount !== 2) { + throw new ValidationError( + `ConversationPolicy.turnOrder 'alternate' requires exactly 2 participants; received ${participantCount}. Use 'round-robin' or a custom selector for N-party conversations.`, + ) + } + return { ...policy, turnOrder: turnOrder as TurnOrder } +} diff --git a/src/conversation/headers.ts b/src/conversation/headers.ts new file mode 100644 index 0000000..f2e31e8 --- /dev/null +++ b/src/conversation/headers.ts @@ -0,0 +1,125 @@ +/** + * @stable + * + * Cross-gateway forwarding headers — the wire-level contract that makes + * agent-to-agent communication composable across organizational boundaries. + * Every header here is read on inbound and re-emitted on outbound, so a chain + * `caller → A's gateway → A's runtime → B's gateway → B's runtime` ends with + * B billing the original user, the depth counter monotonically incremented, + * and the run/turn correlation IDs preserved end-to-end. + * + * The actual depth refusal (HTTP 413 at MAX_DEPTH) is enforced by + * `agent-gateway`'s middleware; this module owns the names + the propagation + * rules so both sides agree. + * + * Full protocol: `docs/agent-bus-protocol.md`. + */ + +/** Standard names — lowercased so Headers maps interop on every runtime. */ +export const FORWARD_HEADERS = { + /** Forwarded original-user identity (`Bearer sk-tan-`); downstream gateways bill against this. */ + authorization: 'x-tangle-forwarded-authorization', + /** Monotonically incremented on every gateway hop. Refused at MAX_DEPTH. */ + depth: 'x-tangle-forwarded-depth', + /** Top-level conversation run identifier, propagated through every nested call. */ + runId: 'x-tangle-runid', + /** This call's turn within the run; deterministic + stable across retries. */ + turnId: 'x-tangle-turnid', + /** When the call is *inside* another turn (recursion), the parent turn's id. */ + parentTurnId: 'x-tangle-parent-turnid', + /** Logical conversation peer label at the sending side, for trace stitching. */ + speaker: 'x-tangle-speaker', +} as const + +export type ForwardHeaderName = (typeof FORWARD_HEADERS)[keyof typeof FORWARD_HEADERS] + +/** Hard cap on chained gateway hops; refused beyond this. Default keeps recursion bounded. */ +export const DEFAULT_MAX_DEPTH = 4 + +/** + * Lowercase a header lookup so we read the same key regardless of source + * casing (Hono, fetch's `Headers`, raw Node IncomingMessage, …). + */ +function lc(name: string): string { + return name.toLowerCase() +} + +/** + * Read the depth counter off an inbound request. Missing → 0 (caller is the + * origin). Non-integer → throws — silent coercion would let a bad caller + * reset depth and bypass the limit. + */ +export function readDepth( + headers: Readonly>, +): number { + const raw = pickHeader(headers, FORWARD_HEADERS.depth) + if (raw === undefined || raw === '') return 0 + const n = Number(raw) + if (!Number.isInteger(n) || n < 0) { + throw new Error( + `invalid ${FORWARD_HEADERS.depth} header value '${raw}' — must be a non-negative integer`, + ) + } + return n +} + +/** + * Refuse further forwarding when the inbound depth has reached the limit. + * Callers (the gateway middleware) translate the boolean to an HTTP 413. + */ +export function isDepthExceeded(inboundDepth: number, max: number = DEFAULT_MAX_DEPTH): boolean { + return inboundDepth >= max +} + +/** + * Build the headers to emit on an outbound participant call, given the + * conversation's propagation context. Depth is incremented from the inbound + * value; runId / turnId / speaker stamp the current hop; the user's + * `Authorization` is preserved verbatim so the downstream gateway bills the + * right wallet. + */ +export function buildForwardHeaders(input: { + inboundDepth: number + forwardedAuthorization?: string + runId: string + turnId: string + parentTurnId?: string + speaker: string +}): Record { + const out: Record = { + [FORWARD_HEADERS.depth]: String(input.inboundDepth + 1), + [FORWARD_HEADERS.runId]: input.runId, + [FORWARD_HEADERS.turnId]: input.turnId, + [FORWARD_HEADERS.speaker]: input.speaker, + } + if (input.forwardedAuthorization !== undefined) { + out[FORWARD_HEADERS.authorization] = input.forwardedAuthorization + } + if (input.parentTurnId !== undefined) { + out[FORWARD_HEADERS.parentTurnId] = input.parentTurnId + } + return out +} + +/** + * Header bag carried through `AgentBackendContext.propagatedHeaders` so + * backends that opt in can merge them into their outbound HTTP requests. + * Distinct from `buildForwardHeaders` so callers can attach extra + * non-protocol headers (e.g. tracing) without colliding. + */ +export type PropagatedHeaders = Readonly> + +function pickHeader( + headers: Readonly>, + name: string, +): string | undefined { + const target = lc(name) + for (const key of Object.keys(headers)) { + if (lc(key) === target) { + const value = headers[key] + if (Array.isArray(value)) return value[0] + return value + } + } + return undefined +} diff --git a/src/conversation/index.ts b/src/conversation/index.ts new file mode 100644 index 0000000..840bb43 --- /dev/null +++ b/src/conversation/index.ts @@ -0,0 +1,63 @@ +/** + * @tangle-network/agent-runtime/conversation + * + * Multi-agent conversation primitive. `defineConversation` + `runConversation` + * + `createConversationBackend` compose any reachable agent endpoints into a + * driven dialogue with turn-order policy, halting policy, hard credit ceiling, + * deterministic turn ids, optional durable journal, per-turn deadline + + * retry + circuit breaker, and cross-gateway header propagation. + * + * Backends are unchanged from `runAgentTaskStream`, so the same driver works + * against in-process iterables, local cli-bridge, sandboxes, router, or + * remote agent-gateway endpoints. + * + * See `docs/agent-bus-protocol.md` for the cross-gateway header contract. + */ + +export { + type BackendCallPolicy, + type CircuitBreakerConfig, + CircuitBreakerState, + CircuitOpenError, + computeBackoff, + DeadlineExceededError, + defaultIsRetryable, + makePerAttemptSignal, + type RetryableErrorPredicate, + type RetryBackoff, + sleep, +} from './call-policy' +export { createConversationBackend } from './conversation-backend' +export { defineConversation } from './define-conversation' +export { + buildForwardHeaders, + DEFAULT_MAX_DEPTH, + FORWARD_HEADERS, + type ForwardHeaderName, + isDepthExceeded, + type PropagatedHeaders, + readDepth, +} from './headers' +export { + type ConversationJournal, + type ConversationJournalEntry, + FileConversationJournal, + InMemoryConversationJournal, +} from './journal' +export { runConversation, runConversationStream } from './run-conversation' +export { slugifySpeaker, turnId } from './turn-id' +export type { + Conversation, + ConversationDriveState, + ConversationParticipant, + ConversationPolicy, + ConversationResult, + ConversationStreamEvent, + ConversationTurn, + HaltContext, + HaltPredicate, + HaltReason, + HaltSignal, + RunConversationOptions, + TurnOrder, +} from './types' diff --git a/src/conversation/journal.ts b/src/conversation/journal.ts new file mode 100644 index 0000000..2f2a551 --- /dev/null +++ b/src/conversation/journal.ts @@ -0,0 +1,208 @@ +/** + * @stable + * + * Durable conversation transcript — survives a driver process crash mid-run. + * The runner journals every committed turn before yielding `turn_end`, so a + * resumed run replays the same `runId` against the same journal and picks up + * from the first un-recorded turn. Combined with the deterministic + * `turnId(runId, index, speaker)`, a retried turn collides with the prior + * attempt's id and any caching gateway can dedupe. + * + * The interface is small enough that a Cloudflare D1 / R2 / postgres adapter + * is ~30 lines. The in-memory adapter is the default for tests and scratch. + * The file adapter (JSONL on disk) is the default-durable choice when no + * upstream store is wired. + */ + +import type { ConversationTurn, HaltReason } from './types' + +export interface ConversationJournalEntry { + runId: string + startedAt: string + /** Set when the run reaches a terminal state. */ + halted?: HaltReason + endedAt?: string + turns: ConversationTurn[] +} + +export interface ConversationJournal { + /** + * Load any prior state for `runId`. Returns `undefined` for a fresh run. + * Implementations MUST NOT mutate the returned object — the runner clones + * before continuing — but the runtime treats absence and emptiness + * identically, so a journal with zero turns is equivalent to "fresh." + */ + loadRun(runId: string): Promise + + /** + * Initialise journal state for a fresh run. Called once per run, before any + * `appendTurn`. Idempotent: calling with an existing runId is a no-op if + * the entry already exists with the same `startedAt`. + */ + beginRun(runId: string, startedAt: string): Promise + + /** + * Append a committed turn. The runner only calls this AFTER the turn's + * backend stream completed and the credit total has been updated, so an + * appended turn is observed-committed and never speculative. + */ + appendTurn(runId: string, turn: ConversationTurn): Promise + + /** + * Record the run's terminal halt reason + end time. Once called, the run + * is observed-final; subsequent `loadRun` returns the same halt. + */ + recordHalt(runId: string, halt: HaltReason, endedAt: string): Promise +} + +export class InMemoryConversationJournal implements ConversationJournal { + private readonly entries = new Map() + + async loadRun(runId: string): Promise { + const entry = this.entries.get(runId) + if (!entry) return undefined + // Defensive copy — callers MUST NOT mutate journal-owned arrays. + return { + runId: entry.runId, + startedAt: entry.startedAt, + halted: entry.halted, + endedAt: entry.endedAt, + turns: [...entry.turns], + } + } + + async beginRun(runId: string, startedAt: string): Promise { + const existing = this.entries.get(runId) + if (existing) { + if (existing.startedAt !== startedAt) { + throw new Error( + `runId '${runId}' already exists with startedAt=${existing.startedAt}; refusing to overwrite with ${startedAt}`, + ) + } + return + } + this.entries.set(runId, { runId, startedAt, turns: [] }) + } + + async appendTurn(runId: string, turn: ConversationTurn): Promise { + const entry = this.entries.get(runId) + if (!entry) { + throw new Error( + `appendTurn called for unknown runId '${runId}'; call beginRun first or use the runner which handles it`, + ) + } + if (entry.halted) { + throw new Error( + `cannot append turn to halted run '${runId}' (halt reason: ${JSON.stringify(entry.halted)})`, + ) + } + entry.turns.push(turn) + } + + async recordHalt(runId: string, halt: HaltReason, endedAt: string): Promise { + const entry = this.entries.get(runId) + if (!entry) { + throw new Error(`recordHalt called for unknown runId '${runId}'`) + } + entry.halted = halt + entry.endedAt = endedAt + } +} + +/** + * JSONL on disk. One line per record; first line is the `begin`, subsequent + * lines are `turn` records, terminal line is `halt`. Replays the whole file + * on `loadRun` — cheap for the conversation sizes this is designed for + * (thousands of turns, not millions). For huge runs, plug in a real DB + * adapter; the interface is small. + * + * Each `appendTurn` / `recordHalt` calls `fsync` after the write so a + * process crash between writes never loses an acknowledged turn. + */ +export class FileConversationJournal implements ConversationJournal { + constructor(private readonly path: string) {} + + async loadRun(runId: string): Promise { + const fs = await import('node:fs/promises') + let text: string + try { + text = await fs.readFile(this.path, 'utf8') + } catch (err) { + if (isNoEntError(err)) return undefined + throw err + } + const lines = text.split('\n').filter((line) => line.length > 0) + let entry: ConversationJournalEntry | undefined + for (const line of lines) { + const record = JSON.parse(line) as JournalRecord + if (record.runId !== runId) continue + if (record.kind === 'begin') { + entry = { runId, startedAt: record.startedAt, turns: [] } + } else if (record.kind === 'turn') { + if (!entry) { + throw new Error( + `journal corrupted: turn record for runId '${runId}' precedes its begin record`, + ) + } + entry.turns.push(record.turn) + } else if (record.kind === 'halt') { + if (!entry) { + throw new Error( + `journal corrupted: halt record for runId '${runId}' precedes its begin record`, + ) + } + entry.halted = record.halted + entry.endedAt = record.endedAt + } + } + return entry + } + + async beginRun(runId: string, startedAt: string): Promise { + const existing = await this.loadRun(runId) + if (existing) { + if (existing.startedAt !== startedAt) { + throw new Error( + `runId '${runId}' already exists in ${this.path} with startedAt=${existing.startedAt}; refusing to overwrite with ${startedAt}`, + ) + } + return + } + await this.appendRecord({ kind: 'begin', runId, startedAt }) + } + + async appendTurn(runId: string, turn: ConversationTurn): Promise { + await this.appendRecord({ kind: 'turn', runId, turn }) + } + + async recordHalt(runId: string, halt: HaltReason, endedAt: string): Promise { + await this.appendRecord({ kind: 'halt', runId, halted: halt, endedAt }) + } + + private async appendRecord(record: JournalRecord): Promise { + const fs = await import('node:fs/promises') + const path = await import('node:path') + await fs.mkdir(path.dirname(this.path), { recursive: true }) + const fh = await fs.open(this.path, 'a') + try { + await fh.write(`${JSON.stringify(record)}\n`) + await fh.sync() + } finally { + await fh.close() + } + } +} + +type JournalRecord = + | { kind: 'begin'; runId: string; startedAt: string } + | { kind: 'turn'; runId: string; turn: ConversationTurn } + | { kind: 'halt'; runId: string; halted: HaltReason; endedAt: string } + +function isNoEntError(err: unknown): boolean { + return ( + typeof err === 'object' && + err !== null && + 'code' in err && + (err as { code: unknown }).code === 'ENOENT' + ) +} diff --git a/src/conversation/phase2.test.ts b/src/conversation/phase2.test.ts new file mode 100644 index 0000000..566333e --- /dev/null +++ b/src/conversation/phase2.test.ts @@ -0,0 +1,520 @@ +/** + * Phase 2 — durable journal + retry/deadline/circuit-breaker + header propagation. + * Each block exercises one primitive end-to-end against the real runner; no + * unit-test-only fakes of the primitives themselves. + */ +import { describe, expect, it } from 'vitest' + +import { createIterableBackend } from '../backends' +import type { AgentBackendContext, AgentExecutionBackend, RuntimeStreamEvent } from '../types' +import { CircuitOpenError, DeadlineExceededError, defaultIsRetryable } from './call-policy' +import { defineConversation } from './define-conversation' +import { FORWARD_HEADERS } from './headers' +import { InMemoryConversationJournal } from './journal' +import { runConversation, runConversationStream } from './run-conversation' +import { turnId as deriveTurnId, slugifySpeaker } from './turn-id' +import type { ConversationStreamEvent } from './types' + +function steadyBackend(name: string, replies: string[]): AgentExecutionBackend { + let idx = 0 + return createIterableBackend({ + kind: `steady-${name}`, + async *stream(_input, context) { + const reply = replies[idx] + if (reply === undefined) throw new Error(`${name}: out of canned replies (${idx})`) + idx += 1 + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: reply, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + yield { + type: 'llm_call', + task: context.task, + session: context.session, + model: `steady-${name}`, + tokensIn: 4, + tokensOut: 4, + costUsd: 0.01, + latencyMs: 1, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) +} + +/** + * A backend that fails the first `failTimes` calls with a retryable error, + * then yields a steady text response. Lets us assert that the call policy + * actually retried + that the same deterministic turn id was used. + */ +function flakyBackend( + name: string, + failTimes: number, + finalText: string, +): AgentExecutionBackend & { + calls: { signal: AbortSignal | undefined; turnId: string | undefined }[] +} { + const calls: { signal: AbortSignal | undefined; turnId: string | undefined }[] = [] + let attempt = 0 + const backend = createIterableBackend({ + kind: `flaky-${name}`, + async *stream(_input, context) { + attempt += 1 + calls.push({ signal: context.signal, turnId: context.turnId }) + if (attempt <= failTimes) { + throw new Error('ECONNRESET') + } + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: finalText, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + yield { + type: 'llm_call', + task: context.task, + session: context.session, + model: `flaky-${name}`, + costUsd: 0.01, + tokensIn: 1, + tokensOut: 1, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) + return Object.assign(backend, { calls }) +} + +function slowBackend(name: string, latencyMs: number): AgentExecutionBackend { + return createIterableBackend({ + kind: `slow-${name}`, + async *stream(_input, context) { + // sleep that respects the AbortSignal so the deadline can interrupt + await new Promise((resolve, reject) => { + const t = setTimeout(resolve, latencyMs) + if (context.signal) { + context.signal.addEventListener( + 'abort', + () => { + clearTimeout(t) + reject(context.signal?.reason ?? new Error('aborted')) + }, + { once: true }, + ) + } + }) + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: 'slow-final', + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) +} + +function headerCapturingBackend(): { + backend: AgentExecutionBackend + contexts: AgentBackendContext[] +} { + const contexts: AgentBackendContext[] = [] + const backend = createIterableBackend({ + kind: 'header-capture', + async *stream(_input, context) { + contexts.push(context) + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: 'ack', + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + yield { + type: 'llm_call', + task: context.task, + session: context.session, + model: 'header-capture', + costUsd: 0.01, + tokensIn: 1, + tokensOut: 1, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) + return { backend, contexts } +} + +// ── Turn ID ─────────────────────────────────────────────────────────── + +describe('turnId — deterministic', () => { + it('encodes runId + index + speaker slug deterministically', () => { + expect(deriveTurnId('run_1', 0, 'researcher')).toBe('run_1.t0.researcher') + expect(deriveTurnId('run_1', 0, 'researcher')).toBe(deriveTurnId('run_1', 0, 'researcher')) + }) + it('slugifies awkward speaker names without losing recognizability', () => { + expect(slugifySpeaker('GPT-5 Codex')).toBe('gpt-5-codex') + expect(slugifySpeaker('inner-a')).toBe('inner-a') + expect(slugifySpeaker('!@#$')).toBe('anon') + }) +}) + +// ── Journal ─────────────────────────────────────────────────────────── + +describe('journal — durable resume', () => { + it('persists every committed turn and resumes from the last entry', async () => { + const journal = new InMemoryConversationJournal() + const conv = defineConversation({ + participants: [ + { name: 'a', backend: steadyBackend('a', ['a-1', 'a-2', 'a-3']) }, + { name: 'b', backend: steadyBackend('b', ['b-1', 'b-2', 'b-3']) }, + ], + policy: { maxTurns: 6 }, + }) + + // First run aborts after 4 turns via custom haltOn so we have a clean + // mid-run state in the journal. + let counted = 0 + const aborted = defineConversation({ + participants: conv.participants as any, + policy: { + ...conv.policy, + haltOn: () => { + counted += 1 + return counted >= 4 ? { halted: true, reason: 'mid-run pause' } : false + }, + }, + }) + const firstRun = await runConversation(aborted, { seed: 'start', journal, runId: 'run-resume' }) + expect(firstRun.turns).toBe(4) + + // Journal should hold all 4 committed turns + the halt. + const after = await journal.loadRun('run-resume') + expect(after?.turns).toHaveLength(4) + expect(after?.halted?.kind).toBe('predicate') + + // Calling with the same runId on a halted journal entry just replays the + // final state without invoking any backend. + const replay = await runConversation(conv, { + seed: 'ignored on resume', + journal, + runId: 'run-resume', + }) + expect(replay.turns).toBe(4) + expect(replay.transcript).toEqual(firstRun.transcript) + }) + + it('resumes mid-run when journal has unfinished entry', async () => { + const journal = new InMemoryConversationJournal() + await journal.beginRun('run-mid', new Date().toISOString()) + // Pre-seed 2 turns as if a prior process had committed them. + await journal.appendTurn('run-mid', { + index: 0, + speaker: 'a', + turnId: deriveTurnId('run-mid', 0, 'a'), + text: 'previously-a', + attempts: 1, + usage: { costUsd: 0.01 }, + startedAt: '2026-05-27T00:00:00.000Z', + endedAt: '2026-05-27T00:00:01.000Z', + }) + await journal.appendTurn('run-mid', { + index: 1, + speaker: 'b', + turnId: deriveTurnId('run-mid', 1, 'b'), + text: 'previously-b', + attempts: 1, + usage: { costUsd: 0.01 }, + startedAt: '2026-05-27T00:00:01.000Z', + endedAt: '2026-05-27T00:00:02.000Z', + }) + + const conv = defineConversation({ + participants: [ + { name: 'a', backend: steadyBackend('a', ['fresh-a-1', 'fresh-a-2']) }, + { name: 'b', backend: steadyBackend('b', ['fresh-b-1', 'fresh-b-2']) }, + ], + policy: { maxTurns: 4 }, + }) + + const events: ConversationStreamEvent[] = [] + for await (const e of runConversationStream(conv, { + seed: 'never-used-on-resume', + journal, + runId: 'run-mid', + })) { + events.push(e) + } + + // First event MUST be conversation_resumed (not conversation_start) and + // carry the persisted transcript. + expect(events[0]?.type).toBe('conversation_resumed') + if (events[0]?.type === 'conversation_resumed') { + expect(events[0].transcript).toHaveLength(2) + expect(events[0].transcript.map((t) => t.text)).toEqual(['previously-a', 'previously-b']) + } + const end = events.find((e) => e.type === 'conversation_end') + if (end?.type !== 'conversation_end') throw new Error('no end event') + expect(end.result.turns).toBe(4) // 2 resumed + 2 fresh + expect(end.result.transcript.map((t) => t.text)).toEqual([ + 'previously-a', + 'previously-b', + 'fresh-a-1', + 'fresh-b-1', + ]) + }) + + it('refuses to overwrite an existing journal entry with a different startedAt', async () => { + const journal = new InMemoryConversationJournal() + await journal.beginRun('clash', '2026-05-27T00:00:00.000Z') + await expect(journal.beginRun('clash', '2026-05-27T01:00:00.000Z')).rejects.toThrow( + /already exists/, + ) + }) + + it('refuses appends to a halted journal entry', async () => { + const journal = new InMemoryConversationJournal() + await journal.beginRun('finished', '2026-05-27T00:00:00.000Z') + await journal.recordHalt( + 'finished', + { kind: 'max_turns', turns: 0 }, + '2026-05-27T01:00:00.000Z', + ) + await expect( + journal.appendTurn('finished', { + index: 0, + speaker: 'x', + turnId: 'foo', + text: 'after-halt', + attempts: 1, + startedAt: '2026-05-27T02:00:00.000Z', + endedAt: '2026-05-27T02:00:01.000Z', + }), + ).rejects.toThrow(/halted/) + }) +}) + +// ── Call policy: retries ─────────────────────────────────────────────── + +describe('call-policy — retries', () => { + it('retries a flaky backend and commits the final response with attempts > 1', async () => { + const flaky = flakyBackend('f', 2, 'finally-worked') + const conv = defineConversation({ + participants: [ + { name: 'flaky', backend: flaky }, + { name: 'steady', backend: steadyBackend('s', ['ack']) }, + ], + policy: { + maxTurns: 2, + defaultCallPolicy: { maxRetries: 3, retryBackoffMs: 1 }, + }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.transcript[0]?.text).toBe('finally-worked') + expect(result.transcript[0]?.attempts).toBe(3) // 2 failures + 1 success + // Every retry of the same logical turn uses the same turnId. + const turnIds = flaky.calls.map((c) => c.turnId) + expect(new Set(turnIds).size).toBe(1) + }) + + it('halts as participant_error when retries are exhausted', async () => { + const flaky = flakyBackend('f', 5, 'never-reached') + const conv = defineConversation({ + participants: [ + { name: 'flaky', backend: flaky }, + { name: 'steady', backend: steadyBackend('s', ['ack']) }, + ], + policy: { + maxTurns: 2, + defaultCallPolicy: { maxRetries: 2, retryBackoffMs: 1 }, + }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.halted.kind).toBe('participant_error') + expect(flaky.calls).toHaveLength(3) // 1 + 2 retries, all failed + }) + + it('emits turn_retry events that reference the same turnId', async () => { + const flaky = flakyBackend('f', 1, 'eventually') + const conv = defineConversation({ + participants: [ + { name: 'flaky', backend: flaky }, + { name: 'steady', backend: steadyBackend('s', ['ack']) }, + ], + policy: { + maxTurns: 1, + defaultCallPolicy: { maxRetries: 2, retryBackoffMs: 1 }, + }, + }) + const retries: ConversationStreamEvent[] = [] + for await (const e of runConversationStream(conv, { seed: 'go' })) { + if (e.type === 'turn_retry') retries.push(e) + } + expect(retries).toHaveLength(1) + if (retries[0]?.type === 'turn_retry') { + expect(retries[0].attempt).toBe(2) + expect(retries[0].turnId).toMatch(/\.t0\.flaky$/) + } + }) +}) + +// ── Call policy: deadlines ───────────────────────────────────────────── + +describe('call-policy — deadlines', () => { + it('aborts a too-slow attempt and surfaces DeadlineExceededError as retryable', async () => { + const slow = slowBackend('s', 200) + const conv = defineConversation({ + participants: [ + { name: 'slow', backend: slow }, + { name: 'steady', backend: steadyBackend('t', ['ack']) }, + ], + policy: { + maxTurns: 1, + defaultCallPolicy: { perAttemptDeadlineMs: 20, maxRetries: 0 }, + }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.halted.kind).toBe('participant_error') + if (result.halted.kind === 'participant_error') { + expect(result.halted.message).toMatch(/exceeded per-attempt deadline of 20ms/) + } + }) + + it('treats deadline errors as retryable by default', () => { + expect(defaultIsRetryable(new DeadlineExceededError(100))).toBe(true) + }) + + it('treats AbortError + network errors as retryable; refuses model refusals', () => { + const abort = new Error('aborted') + abort.name = 'AbortError' + expect(defaultIsRetryable(abort)).toBe(true) + expect(defaultIsRetryable(new Error('ECONNRESET'))).toBe(true) + expect(defaultIsRetryable(new Error('fetch failed'))).toBe(true) + expect(defaultIsRetryable(new Error('model refused: 400 bad request'))).toBe(false) + }) +}) + +// ── Call policy: circuit breaker ─────────────────────────────────────── + +describe('call-policy — circuit breaker', () => { + it('opens the breaker after N consecutive failures and halts with the open-error', async () => { + // Backend that always throws a non-retryable so retries don't mask the breaker behavior. + const dead = createIterableBackend({ + kind: 'dead', + // biome-ignore lint/correctness/useYield: throws-only generator on purpose + async *stream() { + throw new Error('400 bad request') + }, + }) + const conv = defineConversation({ + participants: [ + { name: 'dead', backend: dead }, + { name: 'steady', backend: steadyBackend('t', ['ack', 'ack']) }, + ], + policy: { + maxTurns: 4, + defaultCallPolicy: { + maxRetries: 0, + circuitBreaker: { failuresToOpen: 1, cooldownMs: 60_000 }, + }, + }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.halted.kind).toBe('participant_error') + // First attempt: actual backend throw. Subsequent calls (none here because + // alternate halts after first failure) would have been CircuitOpenError. + }) + + it('CircuitOpenError carries remaining cooldown', () => { + const err = new CircuitOpenError('x', 1234) + expect(err.message).toContain('1234ms') + expect(err.message).toContain("'x'") + }) + + it('breaker resets after a success', async () => { + // Verify by direct usage: 1 failure (open), wait past cooldown, success closes it. + const { CircuitBreakerState } = await import('./call-policy') + const breaker = new CircuitBreakerState({ failuresToOpen: 1, cooldownMs: 20 }) + breaker.recordFailure(1000) + expect(() => breaker.preflight('p', 1010)).toThrow(CircuitOpenError) + // After cooldown the breaker auto-closes on next preflight. + expect(() => breaker.preflight('p', 1050)).not.toThrow() + breaker.recordSuccess() + // After success the consecutive-failure counter is zero, so a brand-new + // failure has to clear the (still-1) threshold again before re-opening. + breaker.recordFailure(2000) + // failuresToOpen=1, so this single failure DOES re-open the breaker. + expect(() => breaker.preflight('p', 2010)).toThrow(CircuitOpenError) + }) +}) + +// ── Header propagation ───────────────────────────────────────────────── + +describe('header propagation — cross-gateway forwarding', () => { + it('stamps run/turn/depth/speaker headers on every backend call', async () => { + const cap = headerCapturingBackend() + const conv = defineConversation({ + participants: [ + { name: 'cap', backend: cap.backend }, + { name: 'steady', backend: steadyBackend('s', ['ack']) }, + ], + policy: { maxTurns: 2 }, + }) + const result = await runConversation(conv, { + seed: 'hello', + inboundDepth: 0, + propagatedHeaders: { + [FORWARD_HEADERS.authorization]: 'Bearer sk-tan-test-user', + }, + }) + expect(result.turns).toBe(2) + expect(cap.contexts).toHaveLength(1) + const h = cap.contexts[0]?.propagatedHeaders + expect(h).toBeDefined() + if (!h) return + // Depth incremented from inbound 0 → 1 on outbound. + expect(h[FORWARD_HEADERS.depth]).toBe('1') + expect(h[FORWARD_HEADERS.authorization]).toBe('Bearer sk-tan-test-user') + expect(h[FORWARD_HEADERS.runId]).toBe(result.runId) + expect(h[FORWARD_HEADERS.turnId]).toMatch(/\.t0\.cap$/) + expect(h[FORWARD_HEADERS.speaker]).toBe('cap') + }) + + it('does not leak the forwarded authorization when caller did not supply one', async () => { + const cap = headerCapturingBackend() + const conv = defineConversation({ + participants: [ + { name: 'cap', backend: cap.backend }, + { name: 'steady', backend: steadyBackend('s', ['ack']) }, + ], + policy: { maxTurns: 1 }, + }) + await runConversation(conv, { seed: 'go' }) + const h = cap.contexts[0]?.propagatedHeaders ?? {} + expect(h[FORWARD_HEADERS.authorization]).toBeUndefined() + }) + + it('preserves runId across all participants in a multi-turn run', async () => { + const cap1 = headerCapturingBackend() + const cap2 = headerCapturingBackend() + const conv = defineConversation({ + participants: [ + { name: 'a', backend: cap1.backend }, + { name: 'b', backend: cap2.backend }, + ], + policy: { maxTurns: 4 }, + }) + const result = await runConversation(conv, { + seed: 'go', + runId: 'shared-run', + }) + expect(result.runId).toBe('shared-run') + for (const ctx of [...cap1.contexts, ...cap2.contexts]) { + expect(ctx.propagatedHeaders?.[FORWARD_HEADERS.runId]).toBe('shared-run') + } + }) +}) diff --git a/src/conversation/phase3.test.ts b/src/conversation/phase3.test.ts new file mode 100644 index 0000000..5a3cec4 --- /dev/null +++ b/src/conversation/phase3.test.ts @@ -0,0 +1,184 @@ +/** + * Phase 3 — the cross-gateway protocol contract. Tests assert depth refusal, + * header propagation across nested conversations (createConversationBackend + * recursion), and runId/parentTurnId stitching. + * + * The depth-limit *enforcement* lives in agent-gateway's inbound middleware; + * here we verify the runtime emits the correct headers so the gateway has + * something to enforce on. + */ +import { describe, expect, it } from 'vitest' + +import { createIterableBackend } from '../backends' +import type { AgentBackendContext, AgentExecutionBackend, RuntimeStreamEvent } from '../types' +import { createConversationBackend } from './conversation-backend' +import { defineConversation } from './define-conversation' +import { + buildForwardHeaders, + DEFAULT_MAX_DEPTH, + FORWARD_HEADERS, + isDepthExceeded, + readDepth, +} from './headers' +import { runConversation } from './run-conversation' + +function captureHeaders(): { + backend: AgentExecutionBackend + ctxs: AgentBackendContext[] +} { + const ctxs: AgentBackendContext[] = [] + const backend = createIterableBackend({ + kind: 'capture', + async *stream(_input, context) { + ctxs.push(context) + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: 'noted', + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + yield { + type: 'llm_call', + task: context.task, + session: context.session, + model: 'capture', + costUsd: 0.01, + tokensIn: 1, + tokensOut: 1, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) + return { backend, ctxs } +} + +// ── readDepth + buildForwardHeaders ──────────────────────────────────── + +describe('headers — parse + build', () => { + it('returns 0 when the depth header is absent', () => { + expect(readDepth({})).toBe(0) + }) + it('parses an integer depth from string headers (Hono-style)', () => { + expect(readDepth({ [FORWARD_HEADERS.depth]: '3' })).toBe(3) + }) + it('throws fail-loud on non-integer depth (no silent reset)', () => { + expect(() => readDepth({ [FORWARD_HEADERS.depth]: 'banana' })).toThrow(/non-negative integer/) + expect(() => readDepth({ [FORWARD_HEADERS.depth]: '-1' })).toThrow(/non-negative integer/) + expect(() => readDepth({ [FORWARD_HEADERS.depth]: '1.5' })).toThrow(/non-negative integer/) + }) + it('uses the first value when a header is multi-valued (HTTP allows)', () => { + expect(readDepth({ [FORWARD_HEADERS.depth]: ['2', '99'] })).toBe(2) + }) + it('isDepthExceeded at the documented default of 4', () => { + expect(isDepthExceeded(0)).toBe(false) + expect(isDepthExceeded(3)).toBe(false) + expect(isDepthExceeded(4)).toBe(true) + expect(isDepthExceeded(99)).toBe(true) + expect(isDepthExceeded(2, 2)).toBe(true) // custom max + expect(DEFAULT_MAX_DEPTH).toBe(4) + }) + it('buildForwardHeaders increments depth + carries identity + run/turn', () => { + const out = buildForwardHeaders({ + inboundDepth: 1, + forwardedAuthorization: 'Bearer sk-tan-user', + runId: 'r', + turnId: 'r.t0.alice', + parentTurnId: 'r.t-1.outer', + speaker: 'alice', + }) + expect(out[FORWARD_HEADERS.depth]).toBe('2') + expect(out[FORWARD_HEADERS.authorization]).toBe('Bearer sk-tan-user') + expect(out[FORWARD_HEADERS.runId]).toBe('r') + expect(out[FORWARD_HEADERS.turnId]).toBe('r.t0.alice') + expect(out[FORWARD_HEADERS.parentTurnId]).toBe('r.t-1.outer') + expect(out[FORWARD_HEADERS.speaker]).toBe('alice') + }) + it('buildForwardHeaders omits absent optional fields (no empty strings)', () => { + const out = buildForwardHeaders({ + inboundDepth: 0, + runId: 'r', + turnId: 'r.t0.x', + speaker: 'x', + }) + expect(out[FORWARD_HEADERS.authorization]).toBeUndefined() + expect(out[FORWARD_HEADERS.parentTurnId]).toBeUndefined() + }) +}) + +// ── Cross-gateway propagation through nested conversations ───────────── + +describe('cross-gateway propagation through createConversationBackend', () => { + it('runId stays constant + depth monotonically increments through recursion', async () => { + const innerCap = captureHeaders() + const innerConv = defineConversation({ + participants: [ + { name: 'inner-x', backend: innerCap.backend }, + { name: 'inner-y', backend: innerCap.backend }, + ], + policy: { maxTurns: 2 }, + }) + const swarm = createConversationBackend({ conversation: innerConv, kind: 'swarm' }) + + const outerCap = captureHeaders() + const outerConv = defineConversation({ + participants: [ + { name: 'outer-a', backend: outerCap.backend }, + { name: 'swarm', backend: swarm }, + ], + policy: { maxTurns: 2 }, + }) + + const result = await runConversation(outerConv, { + seed: 'kick off', + runId: 'shared-run', + inboundDepth: 1, // pretend we were called from one hop deep already + propagatedHeaders: { + [FORWARD_HEADERS.authorization]: 'Bearer sk-tan-original-user', + }, + }) + expect(result.runId).toBe('shared-run') + + // outer-a was called at depth 1+1 = 2; its turnId scopes under shared-run. + expect(outerCap.ctxs).toHaveLength(1) + const outerHeaders = outerCap.ctxs[0]?.propagatedHeaders ?? {} + expect(outerHeaders[FORWARD_HEADERS.depth]).toBe('2') + expect(outerHeaders[FORWARD_HEADERS.runId]).toBe('shared-run') + expect(outerHeaders[FORWARD_HEADERS.authorization]).toBe('Bearer sk-tan-original-user') + + // inner participants were called from inside the swarm backend → another + // hop deeper. Depth should be 3, runId preserved, parent-turn-id set. + expect(innerCap.ctxs.length).toBeGreaterThan(0) + for (const ctx of innerCap.ctxs) { + const h = ctx.propagatedHeaders ?? {} + expect(h[FORWARD_HEADERS.runId]).toBe('shared-run') + expect(h[FORWARD_HEADERS.depth]).toBe('3') + expect(h[FORWARD_HEADERS.authorization]).toBe('Bearer sk-tan-original-user') + // The parent-turn-id is the outer turn the swarm was running in. + expect(h[FORWARD_HEADERS.parentTurnId]).toMatch(/\.t\d+\.swarm$/) + } + }) + + it('the depth-limit math holds end-to-end: an origin caller cannot trick the runtime into resetting', async () => { + const cap = captureHeaders() + const conv = defineConversation({ + participants: [ + { name: 'a', backend: cap.backend }, + { name: 'b', backend: cap.backend }, + ], + policy: { maxTurns: 2 }, + }) + // Pretend a (broken) caller forwarded depth as '0' even though they're + // really at depth 5. The runtime takes the caller-asserted value at face + // value here — depth honesty is enforced at the *gateway*, not by every + // intermediate runtime. We verify only that the runtime increments + // monotonically from whatever inboundDepth it was given. + await runConversation(conv, { + seed: 'go', + inboundDepth: 5, + }) + for (const ctx of cap.ctxs) { + expect(ctx.propagatedHeaders?.[FORWARD_HEADERS.depth]).toBe('6') + } + }) +}) diff --git a/src/conversation/run-conversation.test.ts b/src/conversation/run-conversation.test.ts new file mode 100644 index 0000000..21521de --- /dev/null +++ b/src/conversation/run-conversation.test.ts @@ -0,0 +1,296 @@ +import { describe, expect, it } from 'vitest' + +import { createIterableBackend } from '../backends' +import { ValidationError } from '../errors' +import type { AgentExecutionBackend, RuntimeStreamEvent } from '../types' +import { createConversationBackend } from './conversation-backend' +import { defineConversation } from './define-conversation' +import { runConversation, runConversationStream } from './run-conversation' +import type { ConversationStreamEvent } from './types' + +/** + * Canned-reply backend. Each invocation returns the next entry in `replies`, + * plus an `llm_call` event so the credit meter has something to aggregate. + * Useful for asserting routing and halting without spinning up a real model. + */ +function fakeBackend( + name: string, + replies: string[], + opts: { costUsd?: number } = {}, +): AgentExecutionBackend { + let callIdx = 0 + return createIterableBackend({ + kind: `fake-${name}`, + async *stream(_input, context) { + if (callIdx >= replies.length) { + throw new Error(`fake backend '${name}' ran out of canned replies at call ${callIdx}`) + } + const reply = replies[callIdx] + if (reply === undefined) { + throw new Error(`fake backend '${name}' has undefined reply at call ${callIdx}`) + } + callIdx += 1 + yield { + type: 'text_delta', + task: context.task, + session: context.session, + text: reply, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + yield { + type: 'llm_call', + task: context.task, + session: context.session, + model: `fake-model-${name}`, + tokensIn: 10, + tokensOut: 5, + costUsd: opts.costUsd ?? 0.01, + latencyMs: 1, + timestamp: new Date().toISOString(), + } satisfies RuntimeStreamEvent + }, + }) +} + +function alwaysThrowsBackend(name: string, message: string): AgentExecutionBackend { + return createIterableBackend({ + kind: `boom-${name}`, + // biome-ignore lint/correctness/useYield: deliberate — simulates a backend that throws before producing any event + async *stream() { + throw new Error(message) + }, + }) +} + +describe('defineConversation', () => { + const okBackend = fakeBackend('x', ['hi']) + + it('rejects fewer than 2 participants', () => { + expect(() => + defineConversation({ + participants: [{ name: 'solo', backend: okBackend }], + policy: { maxTurns: 4 }, + }), + ).toThrow(ValidationError) + }) + + it('rejects duplicate participant names', () => { + expect(() => + defineConversation({ + participants: [ + { name: 'dup', backend: okBackend }, + { name: 'dup', backend: okBackend }, + ], + policy: { maxTurns: 4 }, + }), + ).toThrow(/unique/) + }) + + it('rejects non-positive maxTurns', () => { + expect(() => + defineConversation({ + participants: [ + { name: 'a', backend: okBackend }, + { name: 'b', backend: okBackend }, + ], + policy: { maxTurns: 0 }, + }), + ).toThrow(/maxTurns/) + }) + + it("rejects turnOrder='alternate' with !=2 participants", () => { + expect(() => + defineConversation({ + participants: [ + { name: 'a', backend: okBackend }, + { name: 'b', backend: okBackend }, + { name: 'c', backend: okBackend }, + ], + policy: { maxTurns: 4, turnOrder: 'alternate' }, + }), + ).toThrow(/alternate/) + }) + + it('defaults to alternate for 2 participants, round-robin for N', () => { + const two = defineConversation({ + participants: [ + { name: 'a', backend: okBackend }, + { name: 'b', backend: okBackend }, + ], + policy: { maxTurns: 2 }, + }) + expect(two.policy.turnOrder).toBe('alternate') + const three = defineConversation({ + participants: [ + { name: 'a', backend: okBackend }, + { name: 'b', backend: okBackend }, + { name: 'c', backend: okBackend }, + ], + policy: { maxTurns: 3 }, + }) + expect(three.policy.turnOrder).toBe('round-robin') + }) +}) + +describe('runConversation — happy path', () => { + it('alternates between two participants for the full maxTurns', async () => { + const conv = defineConversation({ + participants: [ + { name: 'researcher', backend: fakeBackend('r', ['r-1', 'r-2', 'r-3']) }, + { name: 'critic', backend: fakeBackend('c', ['c-1', 'c-2', 'c-3']) }, + ], + policy: { maxTurns: 4 }, + }) + + const result = await runConversation(conv, { seed: 'design a key-rotation scheme' }) + + expect(result.turns).toBe(4) + expect(result.transcript.map((t) => t.speaker)).toEqual([ + 'researcher', + 'critic', + 'researcher', + 'critic', + ]) + expect(result.transcript.map((t) => t.text)).toEqual(['r-1', 'c-1', 'r-2', 'c-2']) + expect(result.halted).toEqual({ kind: 'max_turns', turns: 4 }) + expect(result.spentCreditsCents).toBe(4) // 4 turns × $0.01 = 4¢ + // Every turn has a deterministic id + attempts=1 on the happy path. + for (const t of result.transcript) { + expect(t.turnId).toMatch(new RegExp(`^${result.runId}\\.t\\d+\\.`)) + expect(t.attempts).toBe(1) + } + }) + + it('uses round-robin for 3 participants by default', async () => { + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['a-1', 'a-2']) }, + { name: 'b', backend: fakeBackend('b', ['b-1']) }, + { name: 'c', backend: fakeBackend('c', ['c-1']) }, + ], + policy: { maxTurns: 4 }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.transcript.map((t) => t.speaker)).toEqual(['a', 'b', 'c', 'a']) + }) +}) + +describe('runConversation — halting', () => { + it('halts on haltOn predicate true', async () => { + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['hi-from-a-1', 'hi-from-a-2', 'CONVERGED']) }, + { name: 'b', backend: fakeBackend('b', ['hi-from-b-1', 'CONVERGED', 'hi-from-b-3']) }, + ], + policy: { + maxTurns: 10, + haltOn: ({ lastTurn }) => + lastTurn.text.includes('CONVERGED') + ? { halted: true, reason: 'convergence-marker' } + : false, + }, + }) + const result = await runConversation(conv, { seed: 'start' }) + expect(result.halted).toEqual({ kind: 'predicate', reason: 'convergence-marker' }) + expect(result.turns).toBeLessThan(10) + expect(result.transcript.at(-1)?.text).toBe('CONVERGED') + }) + + it('halts on max_credits before exceeding the cap', async () => { + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['1', '2', '3', '4'], { costUsd: 0.05 }) }, + { name: 'b', backend: fakeBackend('b', ['1', '2', '3', '4'], { costUsd: 0.05 }) }, + ], + policy: { maxTurns: 20, maxCreditsCents: 12 }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.halted.kind).toBe('max_credits') + expect(result.spentCreditsCents).toBeGreaterThanOrEqual(12) + expect(result.turns).toBeLessThanOrEqual(20) + }) + + it('halts on participant_error when a backend throws', async () => { + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['ok']) }, + { name: 'b', backend: alwaysThrowsBackend('b', 'backend exploded') }, + ], + policy: { maxTurns: 6 }, + }) + const result = await runConversation(conv, { seed: 'go' }) + expect(result.halted.kind).toBe('participant_error') + if (result.halted.kind === 'participant_error') { + expect(result.halted.participant).toBe('b') + expect(result.halted.message).toContain('exploded') + } + expect(result.transcript).toHaveLength(1) + expect(result.transcript[0]?.speaker).toBe('a') + }) + + it('halts on abort signal', async () => { + const controller = new AbortController() + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['1', '2', '3', '4']) }, + { name: 'b', backend: fakeBackend('b', ['1', '2', '3', '4']) }, + ], + policy: { maxTurns: 10 }, + }) + controller.abort() + const result = await runConversation(conv, { seed: 'go', signal: controller.signal }) + expect(result.halted).toEqual({ kind: 'abort' }) + expect(result.turns).toBe(0) + }) +}) + +describe('runConversationStream', () => { + it('emits the full event sequence in order', async () => { + const conv = defineConversation({ + participants: [ + { name: 'a', backend: fakeBackend('a', ['hello-a']) }, + { name: 'b', backend: fakeBackend('b', ['hello-b']) }, + ], + policy: { maxTurns: 2 }, + }) + const events: ConversationStreamEvent[] = [] + for await (const ev of runConversationStream(conv, { seed: 'hi' })) { + events.push(ev) + } + const types = events.map((e) => e.type) + expect(types[0]).toBe('conversation_start') + expect(types.at(-1)).toBe('conversation_end') + expect(types.filter((t) => t === 'turn_start')).toHaveLength(2) + expect(types.filter((t) => t === 'turn_end')).toHaveLength(2) + expect(types.filter((t) => t === 'turn_text_delta')).toHaveLength(2) + }) +}) + +describe('createConversationBackend — recursion', () => { + it('exposes a Conversation as a single agent backend usable as a participant', async () => { + const innerConv = defineConversation({ + participants: [ + { name: 'inner-a', backend: fakeBackend('ia', ['inner-a-says-hi']) }, + { name: 'inner-b', backend: fakeBackend('ib', ['inner-b-says-hi']) }, + ], + policy: { maxTurns: 2 }, + }) + const swarmBackend = createConversationBackend({ conversation: innerConv, kind: 'swarm' }) + + const outerConv = defineConversation({ + participants: [ + { name: 'outer-a', backend: fakeBackend('oa', ['outer-a-says-hi']) }, + { name: 'swarm', backend: swarmBackend }, + ], + policy: { maxTurns: 2 }, + }) + + const result = await runConversation(outerConv, { seed: 'kick off' }) + expect(result.turns).toBe(2) + // The 'swarm' participant's turn text should contain the inner participants' speaker-tagged output. + const swarmTurn = result.transcript.find((t) => t.speaker === 'swarm') + expect(swarmTurn).toBeDefined() + expect(swarmTurn?.text).toMatch(/\[inner-a\]/) + expect(swarmTurn?.text).toMatch(/\[inner-b\]/) + }) +}) diff --git a/src/conversation/run-conversation.ts b/src/conversation/run-conversation.ts new file mode 100644 index 0000000..7419077 --- /dev/null +++ b/src/conversation/run-conversation.ts @@ -0,0 +1,584 @@ +/** + * @stable + * + * Conversation orchestrator. Drives N participants in turn through their own + * `AgentExecutionBackend`s, aggregating per-turn text + usage, enforcing + * `maxTurns` / `maxCreditsCents` / `haltOn`, and emitting per-event stream + * markers so callers can plumb the run through SSE without buffering. + * + * `runConversation` returns the full result; `runConversationStream` returns + * an `AsyncIterable` for callers that want to + * forward events as they arrive. Both share one driving loop. + * + * Distributed-systems primitives layered on top of the loop: + * - **Idempotent turn ids** — `turnId(runId, index, speaker)` stays stable + * across retries so caching gateways can dedupe. + * - **Durable journal** — optional `ConversationJournal` persists every + * committed turn; reusing a runId against the same journal resumes + * transparently from the last committed turn. + * - **Per-turn call policy** — deadline, retry-with-backoff, and a + * per-participant circuit breaker. Retries replay the same logical turn + * (same `turnId`); the retry loop lives inside the outer generator so + * deltas yield naturally without cross-coroutine buffering. + * - **Header propagation** — run/turn/depth headers (+ forwarded user + * authorization) stamped onto every outbound backend call so downstream + * gateways can bill the right user and enforce `X-Tangle-Forwarded-Depth`. + * + * Credit cap is enforced *between turns*, not mid-stream: a turn that + * overshoots the cap completes, the cap then halts the conversation before + * the next turn. + */ + +import type { KnowledgeReadinessReport } from '@tangle-network/agent-eval' + +import { BackendTransportError } from '../errors' +import { newRuntimeSession, nowIso, touchSession } from '../sessions' +import type { + AgentBackendContext, + AgentBackendInput, + AgentTaskSpec, + RuntimeSession, +} from '../types' +import { + type BackendCallPolicy, + CircuitBreakerState, + computeBackoff, + defaultIsRetryable, + makePerAttemptSignal, + sleep, +} from './call-policy' +import { buildForwardHeaders, FORWARD_HEADERS } from './headers' +import { turnId as deriveTurnId } from './turn-id' +import type { + Conversation, + ConversationParticipant, + ConversationResult, + ConversationStreamEvent, + ConversationTurn, + HaltContext, + HaltReason, + RunConversationOptions, + TurnOrder, +} from './types' + +export async function runConversation( + conversation: Conversation, + options: RunConversationOptions, +): Promise { + let result: ConversationResult | undefined + for await (const event of runConversationStream(conversation, options)) { + if (options.onEvent) await options.onEvent(event) + if (event.type === 'conversation_end') result = event.result + } + if (!result) { + throw new BackendTransportError( + 'conversation', + 'conversation stream ended without a conversation_end event', + ) + } + return result +} + +export async function* runConversationStream( + conversation: Conversation, + options: RunConversationOptions, +): AsyncIterable { + const runId = options.runId ?? `conv_${crypto.randomUUID()}` + const inboundDepth = options.inboundDepth ?? 0 + const callerHeaders = options.propagatedHeaders ?? {} + const forwardedAuthorization = callerHeaders[FORWARD_HEADERS.authorization] + + const breakers = new Map() + for (const participant of conversation.participants) { + const cfg = + participant.callPolicy?.circuitBreaker ?? + conversation.policy.defaultCallPolicy?.circuitBreaker + breakers.set(participant.name, new CircuitBreakerState(cfg)) + } + + let transcript: ConversationTurn[] = [] + let spentCreditsCents = 0 + let startedAt = nowIso() + let resumed = false + + if (options.journal) { + const prior = await options.journal.loadRun(runId) + if (prior) { + if (prior.halted) { + // Run already terminated — surface its final state without re-running. + const replayResult: ConversationResult = { + runId, + transcript: prior.turns, + turns: prior.turns.length, + spentCreditsCents: prior.turns.reduce( + (sum, t) => sum + centsFromUsd(t.usage?.costUsd ?? 0), + 0, + ), + halted: prior.halted, + durationMs: 0, + startedAt: prior.startedAt, + endedAt: prior.endedAt ?? prior.startedAt, + } + yield { + type: 'conversation_resumed', + runId, + participants: conversation.participants.map((p) => p.name), + transcript: prior.turns, + timestamp: nowIso(), + } + yield { type: 'conversation_end', runId, result: replayResult, timestamp: nowIso() } + return + } + transcript = [...prior.turns] + spentCreditsCents = transcript.reduce( + (sum, t) => sum + centsFromUsd(t.usage?.costUsd ?? 0), + 0, + ) + startedAt = prior.startedAt + resumed = true + } else { + await options.journal.beginRun(runId, startedAt) + } + } + const startedAtMs = Date.now() + + if (resumed) { + yield { + type: 'conversation_resumed', + runId, + participants: conversation.participants.map((p) => p.name), + // Snapshot the resumed transcript — the live `transcript` array gets + // pushed to as the run continues, so handing the bare reference to a + // subscriber would leak future writes into a past event. + transcript: [...transcript], + timestamp: nowIso(), + } + } else { + yield { + type: 'conversation_start', + runId, + participants: conversation.participants.map((p) => p.name), + seed: options.seed, + timestamp: startedAt, + } + } + + // When resumed, the next user input is the last persisted turn's text; + // for a fresh run, it's the caller's seed. + let currentInput = + transcript.length === 0 + ? options.seed + : (transcript[transcript.length - 1]?.text ?? options.seed) + let halt: HaltReason | undefined + + const initialOffset = transcript.length + for (let turnIndex = initialOffset; turnIndex < conversation.policy.maxTurns; turnIndex++) { + if (options.signal?.aborted) { + halt = { kind: 'abort' } + break + } + if ( + conversation.policy.maxCreditsCents !== undefined && + spentCreditsCents >= conversation.policy.maxCreditsCents + ) { + halt = { + kind: 'max_credits', + spentCents: spentCreditsCents, + capCents: conversation.policy.maxCreditsCents, + } + break + } + + const speakerIdx = selectSpeaker( + conversation.policy.turnOrder, + conversation.participants.length, + { transcript, turnIndex, spentCreditsCents }, + ) + const speaker = conversation.participants[speakerIdx] + if (!speaker) { + throw new BackendTransportError( + 'conversation', + `turnOrder selector returned out-of-range index ${speakerIdx} for ${conversation.participants.length} participants`, + ) + } + + const tid = deriveTurnId(runId, turnIndex, speaker.name) + const callPolicy: BackendCallPolicy | undefined = + speaker.callPolicy ?? conversation.policy.defaultCallPolicy + const breaker = breakers.get(speaker.name) + if (!breaker) { + throw new BackendTransportError( + 'conversation', + `internal: no circuit-breaker state registered for participant '${speaker.name}'`, + ) + } + const isRetryable = callPolicy?.isRetryable ?? defaultIsRetryable + const totalAttempts = 1 + (callPolicy?.maxRetries ?? 0) + + yield { + type: 'turn_start', + runId, + index: turnIndex, + speaker: speaker.name, + turnId: tid, + attempt: 1, + timestamp: nowIso(), + } + + let aggregator: TurnAggregator | undefined + let attemptCount = 0 + let lastError: unknown + let breakerOpenFailure: unknown + + for (let attempt = 1; attempt <= totalAttempts; attempt++) { + attemptCount = attempt + try { + breaker.preflight(speaker.name) + } catch (err) { + // Breaker open — no point retrying; halt the conversation with this + // participant's error rather than busy-looping until exhaustion. + breakerOpenFailure = err + break + } + + if (attempt > 1) { + yield { + type: 'turn_retry', + runId, + index: turnIndex, + speaker: speaker.name, + turnId: tid, + attempt, + reason: lastError instanceof Error ? lastError.message : String(lastError), + timestamp: nowIso(), + } + } + + const perAttempt = makePerAttemptSignal(options.signal, callPolicy?.perAttemptDeadlineMs) + const localAgg = new TurnAggregator({ + index: turnIndex, + speaker: speaker.name, + startedAt: nowIso(), + }) + + try { + for await (const delta of driveSingleAttempt({ + speaker, + participants: conversation.participants, + input: currentInput, + turnIndex, + runId, + turnId: tid, + transcript, + signal: perAttempt.signal, + aggregator: localAgg, + propagatedHeaders: buildForwardHeaders({ + inboundDepth, + forwardedAuthorization, + runId, + turnId: tid, + parentTurnId: options.parentTurnId, + speaker: speaker.name, + }), + })) { + yield { + type: 'turn_text_delta', + runId, + index: turnIndex, + speaker: speaker.name, + turnId: tid, + text: delta.text, + timestamp: delta.timestamp, + } + } + perAttempt.dispose() + breaker.recordSuccess() + aggregator = localAgg + break + } catch (err) { + perAttempt.dispose() + breaker.recordFailure() + // Surface the deadline error explicitly when timeout was the cause — + // otherwise the upstream may throw a generic AbortError that loses + // diagnostic info. + lastError = perAttempt.getDeadlineError() ?? err + if (attempt >= totalAttempts || !isRetryable(lastError)) { + break + } + await sleep(computeBackoff(callPolicy?.retryBackoffMs, attempt)) + } + } + + if (!aggregator) { + const failure = breakerOpenFailure ?? lastError + const message = failure instanceof Error ? failure.message : String(failure) + halt = { kind: 'participant_error', participant: speaker.name, message } + break + } + + const turn = aggregator.toTurn({ turnId: tid, attempts: attemptCount }) + transcript.push(turn) + spentCreditsCents += centsFromUsd(turn.usage?.costUsd ?? 0) + if (options.journal) { + await options.journal.appendTurn(runId, turn) + } + + yield { type: 'turn_end', runId, turn, timestamp: nowIso() } + + if (conversation.policy.haltOn) { + const haltCtx: HaltContext = { + transcript, + lastTurn: turn, + turnIndex, + spentCreditsCents, + } + const decision = await conversation.policy.haltOn(haltCtx) + if (decision === true) { + halt = { kind: 'predicate', reason: 'predicate_true' } + break + } + if (typeof decision === 'object' && decision !== null && decision.halted) { + halt = { kind: 'predicate', reason: decision.reason } + break + } + } + + currentInput = turn.text + } + + if (!halt) halt = { kind: 'max_turns', turns: transcript.length } + + const endedAt = nowIso() + const result: ConversationResult = { + runId, + transcript, + turns: transcript.length, + spentCreditsCents, + halted: halt, + durationMs: Date.now() - startedAtMs, + startedAt, + endedAt, + } + if (options.journal) { + await options.journal.recordHalt(runId, halt, endedAt) + } + + yield { type: 'conversation_end', runId, result, timestamp: endedAt } +} + +// ── Single attempt ─────────────────────────────────────────────────────── + +interface SingleAttemptArgs { + speaker: ConversationParticipant + participants: readonly ConversationParticipant[] + input: string + turnIndex: number + runId: string + turnId: string + transcript: readonly ConversationTurn[] + signal: AbortSignal + aggregator: TurnAggregator + propagatedHeaders: Record +} + +async function* driveSingleAttempt( + args: SingleAttemptArgs, +): AsyncGenerator<{ text: string; timestamp?: string }> { + const task: AgentTaskSpec = { + id: args.turnId, + intent: args.input, + metadata: { + runId: args.runId, + turnId: args.turnId, + turnIndex: args.turnIndex, + speaker: args.speaker.name, + participants: args.participants.map((p) => p.name), + }, + } + const knowledge = passingReadiness(task.id) + const messages = buildMessagesFor(args.speaker.name, args.transcript, args.input) + const backendInput: AgentBackendInput = { task, message: args.input, messages } + + const startCtx: Omit & { requestedSessionId?: string } = { + task, + knowledge, + signal: args.signal, + runId: args.runId, + turnId: args.turnId, + propagatedHeaders: args.propagatedHeaders, + } + const session: RuntimeSession = args.speaker.backend.start + ? touchSession(await args.speaker.backend.start(backendInput, startCtx)) + : newRuntimeSession(args.speaker.backend.kind, undefined, { + runId: args.runId, + turnIndex: args.turnIndex, + turnId: args.turnId, + speaker: args.speaker.name, + }) + + const streamCtx: AgentBackendContext = { + task, + knowledge, + session, + signal: args.signal, + runId: args.runId, + turnId: args.turnId, + propagatedHeaders: args.propagatedHeaders, + } + + for await (const event of args.speaker.backend.stream(backendInput, streamCtx)) { + if (args.signal.aborted) { + // Surface the abort so the outer retry/halt logic can react. The signal + // either fires because of caller-cancel (propagate as-is) or because of + // the per-attempt deadline timer (signal.reason is DeadlineExceededError). + const reason = args.signal.reason + throw reason instanceof Error ? reason : new Error('aborted') + } + if (event.type === 'text_delta') { + args.aggregator.appendText(event.text) + yield { text: event.text, timestamp: event.timestamp } + } else if (event.type === 'llm_call') { + args.aggregator.recordUsage(event) + } else if (event.type === 'final') { + args.aggregator.adoptFinalText(event.text) + } + } +} + +class TurnAggregator { + private text = '' + private adoptedFinal = false + private usage: + | { + tokensIn?: number + tokensOut?: number + costUsd?: number + latencyMs?: number + model?: string + } + | undefined + + constructor(private readonly base: { index: number; speaker: string; startedAt: string }) {} + + appendText(text: string): void { + if (this.adoptedFinal) return + this.text += text + } + + /** + * Use the backend's `final.text` only when no streamed deltas were observed. + * Some backends emit deltas AND a final summary; treating both as content + * would double-count. + */ + adoptFinalText(text: string | undefined): void { + if (!text) return + if (this.text.length > 0) return + this.text = text + this.adoptedFinal = true + } + + recordUsage(event: { + model?: string + tokensIn?: number + tokensOut?: number + costUsd?: number + latencyMs?: number + }): void { + const u = this.usage ?? {} + if (event.tokensIn !== undefined) u.tokensIn = (u.tokensIn ?? 0) + event.tokensIn + if (event.tokensOut !== undefined) u.tokensOut = (u.tokensOut ?? 0) + event.tokensOut + if (event.costUsd !== undefined) u.costUsd = (u.costUsd ?? 0) + event.costUsd + if (event.latencyMs !== undefined) u.latencyMs = event.latencyMs + if (event.model !== undefined) u.model = event.model + this.usage = u + } + + toTurn(meta: { turnId: string; attempts: number }): ConversationTurn { + return { + index: this.base.index, + speaker: this.base.speaker, + turnId: meta.turnId, + text: this.text.trim(), + usage: this.usage, + attempts: meta.attempts, + startedAt: this.base.startedAt, + endedAt: nowIso(), + } + } +} + +/** + * Build the participant's POV of the transcript so an OpenAI-compatible + * backend sees its own turns as `assistant` and everyone else's as `user`, + * with explicit speaker tags so 3+ party conversations stay disambiguated. + * The seed / current input is appended as the trailing user message. + */ +function buildMessagesFor( + speakerName: string, + transcript: readonly ConversationTurn[], + currentInput: string, +): Array<{ role: string; content: string }> { + const messages: Array<{ role: string; content: string }> = [] + for (const turn of transcript) { + if (turn.speaker === speakerName) { + messages.push({ role: 'assistant', content: turn.text }) + } else { + messages.push({ role: 'user', content: `[${turn.speaker}] ${turn.text}` }) + } + } + if (currentInput) messages.push({ role: 'user', content: currentInput }) + return messages +} + +function selectSpeaker( + order: TurnOrder | undefined, + participantCount: number, + state: { transcript: readonly ConversationTurn[]; turnIndex: number; spentCreditsCents: number }, +): number { + const resolved = order ?? (participantCount === 2 ? 'alternate' : 'round-robin') + if (resolved === 'alternate' || resolved === 'round-robin') { + return state.turnIndex % participantCount + } + if (typeof resolved === 'function') { + const idx = resolved(state) + if (!Number.isInteger(idx) || idx < 0 || idx >= participantCount) { + throw new BackendTransportError( + 'conversation', + `turnOrder function returned invalid index ${String(idx)} for ${participantCount} participants`, + ) + } + return idx + } + throw new BackendTransportError('conversation', `unknown turnOrder: ${String(resolved)}`) +} + +function centsFromUsd(usd: number): number { + return Math.round(usd * 100) +} + +/** + * Synthesize a knowledge-readiness report that *passes* every gate, used to + * satisfy `AgentBackendContext.knowledge` per turn. Conversations don't apply + * task-level readiness gating per-turn — that's a `runAgentTask` concern. + */ +function passingReadiness(taskId: string): KnowledgeReadinessReport { + return { + taskId, + readinessScore: 1, + blockingMissingRequirements: [], + nonBlockingGaps: [], + recommendedAction: 'run_agent', + bundle: { + taskId, + requirements: [], + evidenceIds: [], + claimIds: [], + wikiPageIds: [], + userAnswers: {}, + missing: [], + readinessScore: 1, + }, + severity: 'info', + reason: 'conversation-mode: readiness gating not applied per-turn', + } +} diff --git a/src/conversation/turn-id.ts b/src/conversation/turn-id.ts new file mode 100644 index 0000000..54360d9 --- /dev/null +++ b/src/conversation/turn-id.ts @@ -0,0 +1,32 @@ +/** + * @stable + * + * Deterministic turn identifier. Stable across retries of the same logical + * turn so backends (and any caching gateway in between) can dedupe on it. + * A retry triggered by a network blip or deadline timeout MUST produce the + * same `turn_id`; only the underlying attempt count differs. + * + * Shape: `${runId}.t${index}.${speakerSlug}` — readable in logs, sortable by + * turn index, attributable to a speaker. Slugify keeps the speaker portion + * URL-safe so it can ride in HTTP headers without escaping. + */ + +export function turnId(runId: string, index: number, speaker: string): string { + return `${runId}.t${index}.${slugifySpeaker(speaker)}` +} + +/** + * Reduce a speaker name to ASCII alphanumerics + dashes. Preserves enough + * substance to read in a log line; collisions between speakers within a + * single Conversation are prevented by `defineConversation`'s + * unique-name check, so the slug only needs to be deterministic, not unique. + */ +export function slugifySpeaker(speaker: string): string { + const cleaned = speaker + .normalize('NFKD') + .replace(/[^\w-]+/g, '-') + .replace(/-+/g, '-') + .replace(/^-|-$/g, '') + .toLowerCase() + return cleaned || 'anon' +} diff --git a/src/conversation/types.ts b/src/conversation/types.ts new file mode 100644 index 0000000..44e7e1b --- /dev/null +++ b/src/conversation/types.ts @@ -0,0 +1,254 @@ +/** + * @stable + * + * Public types for multi-agent conversations. A `Conversation` is two-or-more + * participants taking turns through their own `AgentExecutionBackend`s, driven + * by a `ConversationPolicy` (turn order, halting, hard credit ceiling). + * + * Each participant's backend can resolve to any reachable endpoint — + * in-process iterable, local cli-bridge, sandbox, router, or a remote + * agent-gateway — so the same `runConversation` call drives same-machine, + * same-cloud, and cross-cloud orchestration without code change. + */ + +import type { AgentExecutionBackend } from '../types' +import type { BackendCallPolicy } from './call-policy' +import type { PropagatedHeaders } from './headers' +import type { ConversationJournal } from './journal' + +/** @stable */ +export interface ConversationParticipant { + /** + * Stable name used as the speaker label in the transcript. Must be unique + * within a `Conversation`. + */ + name: string + /** + * Backend that runs this participant's turn. Reuses the existing + * `AgentExecutionBackend` contract from `runAgentTaskStream`, so any + * registered backend (iterable, sandbox, OpenAI-compatible) works without + * adaptation. + */ + backend: AgentExecutionBackend + /** + * Optional human label for traces / dashboards. Distinct from `name`, which + * is the addressing key. + */ + label?: string + /** + * Optional per-participant override of the conversation's default + * `callPolicy`. Use to tighten the deadline or raise the retry budget for + * a participant known to be slow or flaky. + */ + callPolicy?: BackendCallPolicy +} + +/** @stable */ +export type TurnOrder = 'alternate' | 'round-robin' | ((state: ConversationDriveState) => number) + +/** @stable */ +export interface ConversationDriveState { + transcript: readonly ConversationTurn[] + turnIndex: number + spentCreditsCents: number +} + +/** @stable */ +export interface HaltContext extends ConversationDriveState { + lastTurn: ConversationTurn +} + +/** @stable */ +export interface HaltSignal { + halted: true + reason: string +} + +/** @stable */ +export type HaltPredicate = ( + ctx: HaltContext, +) => boolean | HaltSignal | Promise + +/** @stable */ +export type HaltReason = + | { kind: 'max_turns'; turns: number } + | { kind: 'max_credits'; spentCents: number; capCents: number } + | { kind: 'predicate'; reason: string } + | { kind: 'abort' } + | { kind: 'participant_error'; participant: string; message: string } + +/** @stable */ +export interface ConversationPolicy { + /** Hard cap on speaker-turns. Each call into a participant's backend counts as 1. */ + maxTurns: number + /** + * Hard cap on aggregate credit spend across all participants, in cents. + * Computed by summing `llm_call.costUsd` from every participant's stream. + * Unset (`undefined`) means no credit ceiling — the run is bounded only by + * `maxTurns` and `haltOn`. + */ + maxCreditsCents?: number + /** + * Speaker selection. Defaults to `'alternate'` for two-participant + * conversations and `'round-robin'` for any other arity. + */ + turnOrder?: TurnOrder + /** + * Optional convergence / content-based halt. Called after every turn ends; + * returning truthy stops the loop with `{ kind: 'predicate', ... }`. + */ + haltOn?: HaltPredicate + /** + * Default per-turn resilience policy applied to every participant call + * (deadline, retries, circuit breaker). Individual participants may + * override via `ConversationParticipant.callPolicy`. + */ + defaultCallPolicy?: BackendCallPolicy +} + +/** @stable */ +export interface ConversationTurn { + index: number + speaker: string + /** + * Deterministic turn identifier — stable across retries of the same logical + * turn so caching gateways and trace backends can dedupe. Shape: + * `${runId}.t${index}.${speakerSlug}`. + */ + turnId: string + text: string + /** + * Aggregated backend usage for this turn alone. Populated from any + * `llm_call` stream events the backend emitted; `undefined` when the + * backend reports no usage. + */ + usage?: { + tokensIn?: number + tokensOut?: number + costUsd?: number + latencyMs?: number + model?: string + } + /** + * Number of attempts that ran before this turn committed. `1` is the + * common case; higher means the call policy retried after transient + * failures. + */ + attempts: number + startedAt: string + endedAt: string +} + +/** @stable */ +export interface Conversation { + participants: readonly ConversationParticipant[] + policy: ConversationPolicy +} + +/** @stable */ +export interface RunConversationOptions { + /** First message kicking off the conversation. Routes to the first speaker. */ + seed: string + /** + * Optional run identifier for cross-participant trace correlation. Auto- + * generated when omitted. Reusing a runId against the same `journal` + * resumes the prior run — the runner replays the persisted transcript and + * continues from the first un-recorded turn. + */ + runId?: string + /** Cancellation signal — aborts mid-stream and halts with `{ kind: 'abort' }`. */ + signal?: AbortSignal + /** + * Event sink for per-turn micro-events. Distinct from the result transcript: + * the sink fires for every text-delta, every turn-start/end, and the + * conversation-start/end markers. Used to drive SSE / dashboard updates + * without waiting for the conversation to finish. + */ + onEvent?: (event: ConversationStreamEvent) => void | Promise + /** + * Optional durable transcript. When set, the runner persists every + * committed turn before yielding `turn_end`. Reusing the same `runId` + * against the same journal resumes from the last committed turn — so a + * driver process crash mid-run loses zero acknowledged turns. + */ + journal?: ConversationJournal + /** + * Headers to forward verbatim to every participant backend call (gateway + * propagation: `X-Tangle-Forwarded-Authorization`, run/turn correlation, + * depth counter). Backends opt in by reading `propagatedHeaders` from + * their `AgentBackendContext`; backends that ignore the field still work. + */ + propagatedHeaders?: PropagatedHeaders + /** + * Inbound depth at the point this driver was invoked. The runner + * increments it on every outbound participant call; gateways refuse at + * `DEFAULT_MAX_DEPTH`. Default 0 (origin caller). + */ + inboundDepth?: number + /** + * Parent turn id when this conversation is *inside* another turn (i.e. the + * driver is itself a participant via `createConversationBackend`). The + * runner stamps each outbound call with this as `X-Tangle-Parent-TurnId` + * so trace stitching survives nested orchestration. + */ + parentTurnId?: string +} + +/** @stable */ +export interface ConversationResult { + runId: string + transcript: ConversationTurn[] + turns: number + spentCreditsCents: number + halted: HaltReason + durationMs: number + startedAt: string + endedAt: string +} + +/** @stable */ +export type ConversationStreamEvent = + | { + type: 'conversation_start' + runId: string + participants: readonly string[] + seed: string + timestamp: string + } + | { + type: 'conversation_resumed' + runId: string + participants: readonly string[] + transcript: readonly ConversationTurn[] + timestamp: string + } + | { + type: 'turn_start' + runId: string + index: number + speaker: string + turnId: string + attempt: number + timestamp: string + } + | { + type: 'turn_text_delta' + runId: string + index: number + speaker: string + turnId: string + text: string + timestamp?: string + } + | { + type: 'turn_retry' + runId: string + index: number + speaker: string + turnId: string + attempt: number + reason: string + timestamp: string + } + | { type: 'turn_end'; runId: string; turn: ConversationTurn; timestamp: string } + | { type: 'conversation_end'; runId: string; result: ConversationResult; timestamp: string } diff --git a/src/index.ts b/src/index.ts index 4c95e78..ce94a51 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,6 +28,58 @@ export { createOpenAICompatibleBackend, createSandboxPromptBackend, } from './backends' +export type { + BackendCallPolicy, + CircuitBreakerConfig, + Conversation, + ConversationDriveState, + ConversationJournal, + ConversationJournalEntry, + ConversationParticipant, + ConversationPolicy, + ConversationResult, + ConversationStreamEvent, + ConversationTurn, + ForwardHeaderName, + HaltContext, + HaltPredicate, + HaltReason, + HaltSignal, + PropagatedHeaders, + RetryableErrorPredicate, + RetryBackoff, + RunConversationOptions, + TurnOrder, +} from './conversation' +// ── Conversations (multi-agent, distributed) ────────────────────────── +// Drives N participants in turn through any reachable AgentExecutionBackend +// (in-process, local cli-bridge, sandbox, router, remote agent-gateway). +// Layered primitives — durable journal, per-turn call policy (deadline + +// retry + circuit breaker), deterministic turn ids, and cross-gateway header +// propagation — make the same driver work same-machine, same-cluster, and +// cross-cloud without code changes. See docs/agent-bus-protocol.md. +export { + buildForwardHeaders, + CircuitBreakerState, + CircuitOpenError, + computeBackoff, + createConversationBackend, + DEFAULT_MAX_DEPTH, + DeadlineExceededError, + defaultIsRetryable, + defineConversation, + FileConversationJournal, + FORWARD_HEADERS, + InMemoryConversationJournal, + isDepthExceeded, + makePerAttemptSignal, + readDepth, + runConversation, + runConversationStream, + sleep, + slugifySpeaker, + turnId, +} from './conversation' // ── Chat-turn HTTP orchestration ────────────────────────────────────── // `handleChatTurn` frames a producer with the `session.run.*` envelope // + NDJSON line protocol + persist/post-process/trace-flush hook order. diff --git a/src/types.ts b/src/types.ts index b468c3f..4a69cfb 100644 --- a/src/types.ts +++ b/src/types.ts @@ -458,6 +458,30 @@ export interface AgentBackendContext { knowledge: KnowledgeReadinessReport session: RuntimeSession signal?: AbortSignal + /** + * Conversation/run identifier when this call is part of a multi-agent run. + * Backends should stamp it into any trace/log emission so cross-participant + * events correlate. Absent when the call is a stand-alone `runAgentTask`. + */ + runId?: string + /** + * Deterministic turn id for this single call. Stable across retries of the + * same logical turn so a caching gateway / idempotent backend can dedupe. + */ + turnId?: string + /** + * If this call is itself nested inside a higher-order conversation + * (recursion via `createConversationBackend`), the enclosing turn's id. + * Used for trace stitching across nested orchestration. + */ + parentTurnId?: string + /** + * Headers to forward verbatim to any outbound HTTP the backend issues: + * `X-Tangle-Forwarded-Authorization`, `X-Tangle-Forwarded-Depth`, + * run/turn correlation. Backends that issue HTTP MUST merge these into + * the outbound request; backends that don't issue HTTP may ignore them. + */ + propagatedHeaders?: Readonly> } /** @stable */