Merged
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
dist/
node_modules/
*.tgz

# meta-harness / evolve loop runtime state
.evolve/
76 changes: 76 additions & 0 deletions docs/agent-bus-protocol.md
@@ -0,0 +1,76 @@
# Agent-Bus Protocol

The wire-level contract that makes agent-to-agent communication composable across organizational and cloud boundaries. agent-runtime emits these headers on every outbound participant call; agent-gateway reads them on inbound and enforces the depth limit. Any third party can publish a gateway-fronted agent and participate, billed correctly, traced consistently, recursion-bounded.

This document is **normative**. agent-runtime + agent-gateway implementations MUST honor every section; downstream agent endpoints MAY rely on the contract.

## Headers

All header names are lowercased on the wire. Implementations MUST be case-insensitive on read.

| Header | Direction | Purpose |
|---|---|---|
| `x-tangle-forwarded-authorization` | inbound+outbound | The original user's `Bearer sk-tan-<user>` (or `Bearer <x402-token>`). Forwarded verbatim through every hop so the final billed party is always the human/agent that initiated the request, not whichever intermediate agent made the call. |
| `x-tangle-forwarded-depth` | inbound+outbound | Hop counter. Origin caller MUST omit or set to `0`. Every gateway / runtime MUST increment by 1 before forwarding. Recipients MUST refuse with HTTP `413 Payload Too Large` when the inbound depth ≥ `DEFAULT_MAX_DEPTH` (4 unless overridden). |
| `x-tangle-runid` | inbound+outbound | The top-level conversation's run identifier. Propagated unchanged through all nested calls so all hops correlate to one run. |
| `x-tangle-turnid` | outbound | This specific call's deterministic turn identifier. Format: `<runId>.t<index>.<speakerSlug>`. Stable across retries of the same logical turn; caching gateways MAY treat repeated turn ids as idempotency keys and return cached results. |
| `x-tangle-parent-turnid` | outbound (recursion) | When the call is *inside* another turn (i.e. the caller is itself a participant in a higher-order conversation), the enclosing turn's id. Used for trace stitching. Omit when at the top level. |
| `x-tangle-speaker` | outbound | The logical conversation-peer label at the sending side (e.g. `researcher`, `critic`). Cosmetic but useful for trace tagging and dashboard filtering. |
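
The header table can be read as a recipe for building the outbound header set on each hop. The following is a minimal sketch under stated assumptions: `ForwardContext` and `sketchForwardHeaders` are hypothetical names for illustration and are not the signature of the library's `buildForwardHeaders`.

```typescript
// Hypothetical input shape; field names are assumptions, not the library's API.
interface ForwardContext {
  forwardedAuthorization?: string // original user's credential, forwarded verbatim
  inboundDepth: number // depth observed on the inbound request (0 at the origin)
  runId: string // top-level run id, propagated unchanged
  turnId: string // this call's deterministic turn id
  parentTurnId?: string // only set when the call is nested inside another turn
  speaker?: string // cosmetic label for trace tagging
}

// Builds lowercased outbound headers for one hop, incrementing depth by 1.
function sketchForwardHeaders(ctx: ForwardContext): Record<string, string> {
  const headers: Record<string, string> = {
    'x-tangle-forwarded-depth': String(ctx.inboundDepth + 1),
    'x-tangle-runid': ctx.runId,
    'x-tangle-turnid': ctx.turnId,
  }
  if (ctx.forwardedAuthorization !== undefined) {
    headers['x-tangle-forwarded-authorization'] = ctx.forwardedAuthorization
  }
  if (ctx.parentTurnId !== undefined) {
    headers['x-tangle-parent-turnid'] = ctx.parentTurnId
  }
  if (ctx.speaker !== undefined) {
    headers['x-tangle-speaker'] = ctx.speaker
  }
  return headers
}
```

Optional headers are omitted rather than sent empty, which matches the table's "omit when at the top level" wording for `x-tangle-parent-turnid`.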

## Invariants

1. **Depth monotonicity.** A receiving runtime MUST NOT decrement, reset, or omit the depth counter on outbound calls. A misbehaving intermediate that resets depth breaks the recursion bound for everyone downstream.
2. **Authorization preservation.** A receiving runtime MUST forward `x-tangle-forwarded-authorization` verbatim on any outbound call it makes on behalf of the original caller. Substituting its own credentials silently re-bills the user incorrectly.
3. **runId immutability.** A nested conversation invoked via `createConversationBackend` does NOT mint a new `x-tangle-runid` — it inherits the parent's. (It does mint new `turnId`s, which include its own run scope.)
4. **Idempotency contract (advisory).** Gateways MAY dedupe by `(runId, turnId)`. If they do, they MUST return the cached response unchanged. If they don't, retries cost N× credits — that's the caller's choice.
5. **Refusal granularity.** Depth-limit refusal at a gateway MUST be `413 Payload Too Large` with a body describing the limit and the observed inbound depth, so the caller can route around (or accept) the limit instead of treating it as a generic error.
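
Invariant 5 and the depth rule from the Headers table can be sketched as a pure check that a gateway might run before handling a request. This is an illustration only: `checkDepth`, the `DepthRefusal` shape, and the body field names (`error`, `maxDepth`, `observedDepth`) are assumptions, and treating a malformed header as depth 0 is a simplification the document does not mandate.

```typescript
const DEFAULT_MAX_DEPTH = 4 // default stated in the Headers table

// Hypothetical refusal descriptor; field names are assumptions.
interface DepthRefusal {
  status: 413
  body: { error: string; maxDepth: number; observedDepth: number }
}

// Returns a refusal when the inbound depth is >= maxDepth, otherwise null.
function checkDepth(
  rawDepth: string | undefined,
  maxDepth: number = DEFAULT_MAX_DEPTH,
): DepthRefusal | null {
  const parsed = Number.parseInt(rawDepth ?? '0', 10)
  // Assumption: a missing or malformed header is treated as depth 0.
  const observedDepth = Number.isInteger(parsed) && parsed >= 0 ? parsed : 0
  if (observedDepth < maxDepth) return null
  return {
    status: 413,
    body: { error: 'forwarded depth limit reached', maxDepth, observedDepth },
  }
}
```

Returning both the limit and the observed depth in the body lets a caller distinguish a recursion-bound refusal from a generic failure, which is the point of the refusal-granularity invariant.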

## Worked example

User triggers conversation. The driver runs on machine M.

```
USER → router.tangle.tools (POST /v1/chat/completions, bridge/sandbox/researcher)
↓ Authorization: Bearer sk-tan-user-123
↓ X-Tangle-Forwarded-Depth: 0 (origin)
router → researcher-sandbox-A (us-east)
↓ X-Tangle-Forwarded-Authorization: Bearer sk-tan-user-123
↓ X-Tangle-Forwarded-Depth: 1
↓ X-Tangle-RunId: conv_abc
↓ X-Tangle-TurnId: conv_abc.t0.researcher
sandbox-A's agent decides to call agent-runtime conversation for critic-review
↓ runConversation({ participants: [..., bridge/sandbox/critic], inboundDepth: 1 })
agent-runtime → critic-sandbox-B (eu-west)
↓ X-Tangle-Forwarded-Authorization: Bearer sk-tan-user-123
↓ X-Tangle-Forwarded-Depth: 2
↓ X-Tangle-RunId: conv_abc (preserved through hops)
↓ X-Tangle-TurnId: conv_abc.t0.critic (new, scoped to nested run)
↓ X-Tangle-Parent-TurnId: conv_abc.t0.researcher (recursion link)
```

At depth 4, a gateway refuses with 413 — the chain stops there, sk-tan-user-123 is billed for hops 1–4 only, the trace shows the refusal point.
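
The turn identifiers in the trace follow the `<runId>.t<index>.<speakerSlug>` format from the Headers table. A minimal sketch of that format is below; the slug rule (lowercase, runs of non-alphanumeric characters collapsed to `-`) is an assumption the document does not specify, and `sketchTurnId` and `speakerSlug` are hypothetical names rather than the exports of `@tangle-network/agent-runtime/turn-id`.

```typescript
// Assumed slug rule: lowercase, non-alphanumeric runs become '-', edges trimmed.
function speakerSlug(speaker: string): string {
  return speaker
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
}

// Deterministic: the same (runId, index, speaker) always yields the same id,
// so a retry of the same logical turn reuses its identifier.
function sketchTurnId(runId: string, index: number, speaker: string): string {
  return `${runId}.t${index}.${speakerSlug(speaker)}`
}
```

With these assumptions, `sketchTurnId('conv_abc', 0, 'researcher')` yields `conv_abc.t0.researcher`, the value shown for the first hop in the trace.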

## Implementation contract for endpoint authors

A `@tangle-network/agent-gateway`-fronted endpoint:
- MUST read `x-tangle-forwarded-depth` on inbound, default 0, refuse with 413 when ≥ `DEFAULT_MAX_DEPTH`.
- MUST honor `x-tangle-forwarded-authorization` if present, using it as the effective billing identity instead of the direct caller's auth (when the direct caller is whitelisted as an inter-agent caller).
- SHOULD propagate `x-tangle-runid`, `x-tangle-turnid`, `x-tangle-parent-turnid` into its own trace records.
- MAY use `(runId, turnId)` as an idempotency key to dedupe retries.
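
The optional idempotency behavior in the last bullet can be sketched as an in-memory cache keyed by `(runId, turnId)`. This is an illustration under assumptions: `TurnCache` is a hypothetical class, it keeps entries forever (a real gateway would need eviction), and it chooses not to cache failures so that a retried turn can run again.

```typescript
class TurnCache<T> {
  private readonly entries = new Map<string, Promise<T>>()

  // Runs `produce` at most once per (runId, turnId); repeated calls for the
  // same key receive the same stored promise, so the response is unchanged.
  getOrRun(runId: string, turnId: string, produce: () => Promise<T>): Promise<T> {
    const key = JSON.stringify([runId, turnId])
    const existing = this.entries.get(key)
    if (existing !== undefined) return existing
    const pending = produce()
    this.entries.set(key, pending)
    // Design choice in this sketch: drop failed entries so a retry can re-run.
    pending.catch(() => {
      this.entries.delete(key)
    })
    return pending
  }
}
```

Storing the promise rather than the resolved value also deduplicates concurrent retries that arrive while the first attempt is still in flight.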

A `@tangle-network/agent-runtime` consumer (driver code):
- Passes `propagatedHeaders` + `inboundDepth` + `parentTurnId` to `runConversation` / `runConversationStream` from its inbound request context.
- The runtime automatically emits `buildForwardHeaders(...)` on every participant backend call.
- Backends that issue HTTP (e.g. `createOpenAICompatibleBackend`) merge `context.propagatedHeaders` into their outbound request automatically.
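
Before a driver can pass `propagatedHeaders`, `inboundDepth`, and `parentTurnId` to the runtime, it has to derive them from its inbound request. The sketch below shows one way that might look. `extractInboundContext` and `InboundContext` are hypothetical, the choice of which headers count as propagated is an assumption, and using the inbound `x-tangle-turnid` as the parent turn id is inferred from the worked example rather than stated by the source.

```typescript
// Hypothetical result shape mirroring the three option names in the list above.
interface InboundContext {
  inboundDepth: number
  parentTurnId: string | undefined
  propagatedHeaders: Record<string, string>
}

// Assumption: only these two headers are carried through unchanged.
const PROPAGATED_NAMES = ['x-tangle-forwarded-authorization', 'x-tangle-runid']

function extractInboundContext(
  inbound: Record<string, string | undefined>,
): InboundContext {
  // Header names are case-insensitive on read, so normalize keys first.
  const lower: Record<string, string | undefined> = {}
  for (const [key, value] of Object.entries(inbound)) {
    lower[key.toLowerCase()] = value
  }
  const parsed = Number.parseInt(lower['x-tangle-forwarded-depth'] ?? '0', 10)
  const propagatedHeaders: Record<string, string> = {}
  for (const name of PROPAGATED_NAMES) {
    const value = lower[name]
    if (value !== undefined) propagatedHeaders[name] = value
  }
  return {
    // Assumption: a missing or malformed depth is treated as 0.
    inboundDepth: Number.isInteger(parsed) && parsed >= 0 ? parsed : 0,
    // The enclosing turn is the one this request arrived under.
    parentTurnId: lower['x-tangle-turnid'],
    propagatedHeaders,
  }
}
```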

## Reference

- `@tangle-network/agent-runtime/headers` — names, builders, depth parsing, exported constants.
- `@tangle-network/agent-runtime/call-policy` — deadline / retry / circuit-breaker primitives that compose with the protocol.
- `@tangle-network/agent-runtime/journal` — durable transcript so a driver crash mid-run doesn't lose acknowledged turns.
- `@tangle-network/agent-runtime/turn-id` — deterministic `turnId(runId, index, speaker)`.
- `@tangle-network/agent-gateway` — Hono middleware for inbound enforcement.
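
To make the call-policy entry concrete, here is a minimal sketch of how a per-attempt deadline and a retry loop might fit together around one backend call. It is self-contained and does not use the package's exports: `callWithRetries` and `RetrySketchOptions` are hypothetical, the circuit breaker is left out for brevity, and the attempt function is assumed to honor the `AbortSignal` it receives.

```typescript
interface RetrySketchOptions {
  maxRetries: number // total attempts = 1 + maxRetries
  deadlineMs: number // per-attempt wall clock limit
  backoffMs: (attempt: number) => number // attempt is 1-indexed
  isRetryable: (err: unknown) => boolean
}

async function callWithRetries<T>(
  attempt: (signal: AbortSignal, attemptNumber: number) => Promise<T>,
  opts: RetrySketchOptions,
): Promise<T> {
  for (let n = 1; ; n++) {
    // Fresh controller per attempt so one timeout never poisons the next try.
    const controller = new AbortController()
    const timer = setTimeout(
      () => controller.abort(new Error(`deadline of ${opts.deadlineMs}ms exceeded`)),
      opts.deadlineMs,
    )
    let failure: unknown
    try {
      return await attempt(controller.signal, n)
    } catch (err) {
      failure = err
    } finally {
      clearTimeout(timer) // always clear, so no timer outlives the attempt
    }
    // Give up when retries are exhausted or the error is a real outcome.
    if (n > opts.maxRetries || !opts.isRetryable(failure)) throw failure
    await new Promise<void>((resolve) => setTimeout(resolve, opts.backoffMs(n)))
  }
}
```

Because every attempt would reuse the same `turnId`, a gateway that dedupes by `(runId, turnId)` can absorb the retries without extra cost to the caller.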

## Versioning

This protocol is currently `v0`. Breaking changes will bump to `v1` and add a `x-tangle-protocol-version` header; until then there is no version header (implicit `v0`). Additive headers do not require a version bump.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
 {
   "name": "@tangle-network/agent-runtime",
-  "version": "0.25.2",
+  "version": "0.26.0",
   "description": "Reusable runtime lifecycle for domain-specific agents.",
   "homepage": "https://github.com/tangle-network/agent-runtime#readme",
   "repository": {
7 changes: 7 additions & 0 deletions src/backends.ts
@@ -277,6 +277,13 @@ export function createOpenAICompatibleBackend<
headers: {
  Authorization: `Bearer ${options.apiKey}`,
  'Content-Type': 'application/json',
  // Cross-gateway forwarding: when this call is part of a
  // multi-agent conversation, the runner stamps run/turn/
  // depth/forwarded-auth headers onto the context. They flow
  // through to the downstream gateway verbatim so the original
  // user gets billed, the recursion depth stays bounded, and
  // the trace correlates across hops.
  ...(context.propagatedHeaders ?? {}),
},
body: requestBody,
signal: attemptSignal.signal,
178 changes: 178 additions & 0 deletions src/conversation/call-policy.ts
@@ -0,0 +1,178 @@
/**
 * @stable
 *
 * Per-call resilience policy for participant backends: deadline, retry with
 * backoff, and a circuit breaker. Each policy is applied *around* a single
 * turn's backend invocation, not across the whole conversation — the
 * conversation-level credit cap and `maxTurns` bound the broader run.
 *
 * Deadlines abort the underlying backend stream via `AbortSignal` linkage so
 * the OpenAI/SDK clients tear down their HTTP request cleanly instead of
 * leaking sockets. Retries replay the same logical turn (same `turnId`) so
 * any caching gateway can dedupe. Circuit breakers are *per participant*: A's
 * failures don't open B's breaker.
 */

/** Pure predicate deciding whether an error is worth retrying. Defaults: DeadlineExceededError, TimeoutError, AbortError, fetch-level network errors. */
export type RetryableErrorPredicate = (err: unknown) => boolean

/** Backoff between attempts. Constant ms, or `(attempt: 1-indexed) => ms`. */
export type RetryBackoff = number | ((attempt: number) => number)

/** Circuit-breaker tuning. `failuresToOpen` consecutive failures opens it; closed only after `cooldownMs`. */
export interface CircuitBreakerConfig {
  failuresToOpen: number
  cooldownMs: number
}

export interface BackendCallPolicy {
  /** Per-attempt wall clock limit. Exceeding fires an AbortSignal and is treated as a retryable failure. */
  perAttemptDeadlineMs?: number
  /** Number of retries after the first attempt; total attempts = 1 + maxRetries. Default 0. */
  maxRetries?: number
  /** Backoff between attempts. Default: exponential from a 250ms base, plus jitter (see `computeBackoff`). */
  retryBackoffMs?: RetryBackoff
  /** Custom retry classifier. Defaults to {@link defaultIsRetryable}. */
  isRetryable?: RetryableErrorPredicate
  /** Circuit breaker that opens after N consecutive failures per participant. */
  circuitBreaker?: CircuitBreakerConfig
}

export class CircuitOpenError extends Error {
  constructor(participant: string, retryAfterMs: number) {
    super(
      `circuit open for participant '${participant}'; ${retryAfterMs}ms remaining before retry allowed`,
    )
    this.name = 'CircuitOpenError'
  }
}

export class DeadlineExceededError extends Error {
  constructor(deadlineMs: number) {
    super(`backend call exceeded per-attempt deadline of ${deadlineMs}ms`)
    this.name = 'DeadlineExceededError'
  }
}

/**
 * Default retryable classification — network/timeout class errors. Errors
 * a model deliberately throws (validation, refusal, 4xx) are not retried;
 * those represent real outcomes, not transient infrastructure faults.
 */
export const defaultIsRetryable: RetryableErrorPredicate = (err) => {
  if (err instanceof DeadlineExceededError) return true
  if (err instanceof Error) {
    const name = err.name
    const message = err.message.toLowerCase()
    if (name === 'AbortError' || name === 'TimeoutError') return true
    if (
      message.includes('econnreset') ||
      message.includes('etimedout') ||
      message.includes('econnrefused') ||
      message.includes('socket hang up') ||
      message.includes('network') ||
      message.includes('fetch failed')
    ) {
      return true
    }
  }
  return false
}

/** Live circuit-breaker state — one instance per (participant, conversation run). */
export class CircuitBreakerState {
  private consecutiveFailures = 0
  private openedAt: number | undefined

  constructor(private readonly config: CircuitBreakerConfig | undefined) {}

  /**
   * Check whether the next call is allowed. Throws `CircuitOpenError` when
   * the breaker is open and the cooldown hasn't elapsed.
   */
  preflight(participant: string, now: number = Date.now()): void {
    if (!this.config || this.openedAt === undefined) return
    const remaining = this.config.cooldownMs - (now - this.openedAt)
    if (remaining > 0) {
      throw new CircuitOpenError(participant, remaining)
    }
    this.openedAt = undefined
    this.consecutiveFailures = 0
  }

  recordSuccess(): void {
    this.consecutiveFailures = 0
    this.openedAt = undefined
  }

  recordFailure(now: number = Date.now()): void {
    if (!this.config) return
    this.consecutiveFailures += 1
    if (this.consecutiveFailures >= this.config.failuresToOpen) {
      this.openedAt = now
    }
  }
}

/**
 * Build a per-attempt AbortSignal linked to the parent signal AND fired when
 * the deadline elapses. The returned `dispose()` MUST be called in a
 * `finally` (clears the timer, detaches the listener) so we don't leak.
 *
 * When the deadline fires, the signal's `reason` is a `DeadlineExceededError`
 * — callers can detect timeout-vs-cancel by reading `signal.reason` after
 * the underlying operation throws.
 */
export function makePerAttemptSignal(
  parentSignal: AbortSignal | undefined,
  deadlineMs: number | undefined,
): {
  signal: AbortSignal
  dispose: () => void
  getDeadlineError(): DeadlineExceededError | undefined
} {
  const controller = new AbortController()
  let deadlineError: DeadlineExceededError | undefined
  const cleanups: Array<() => void> = []

  if (parentSignal) {
    if (parentSignal.aborted) controller.abort(parentSignal.reason)
    else {
      const onAbort = () => controller.abort(parentSignal.reason)
      parentSignal.addEventListener('abort', onAbort, { once: true })
      cleanups.push(() => parentSignal.removeEventListener('abort', onAbort))
    }
  }
  if (deadlineMs !== undefined) {
    const ms = deadlineMs
    const timer = setTimeout(() => {
      deadlineError = new DeadlineExceededError(ms)
      controller.abort(deadlineError)
    }, ms)
    cleanups.push(() => clearTimeout(timer))
  }
  return {
    signal: controller.signal,
    dispose() {
      for (const c of cleanups) c()
    },
    getDeadlineError() {
      return deadlineError
    },
  }
}

/** Compute the delay before the next attempt. Default: 250ms exponential with jitter. */
export function computeBackoff(spec: RetryBackoff | undefined, attempt: number): number {
  if (spec === undefined) {
    const base = 250
    const jitter = Math.floor(Math.random() * base)
    return base * 2 ** (attempt - 1) + jitter
  }
  if (typeof spec === 'function') return Math.max(0, spec(attempt))
  return Math.max(0, spec)
}

export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}