Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions cmd/claw-mcp-stdio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
defaultRestartBackoffMS = 1000
defaultRestartMaxMS = 15000
defaultMaxBodyBytes = 1 << 20
lateResponseTTL = 5 * time.Minute
)

type config struct {
Expand Down Expand Up @@ -59,6 +60,11 @@ type pendingCall struct {
ch chan []byte
}

type lateResponse struct {
reason string
at time.Time
}

type initCache struct {
response []byte
}
Expand All @@ -74,6 +80,7 @@ type stdioBridge struct {
available bool
generation int64
pending map[string]pendingCall
late map[string]lateResponse
sessions map[string]int64
initialized bool
init *initCache
Expand Down Expand Up @@ -248,6 +255,7 @@ func newStdioBridge(ctx context.Context, cfg config) *stdioBridge {
ctx: childCtx,
cancel: cancel,
pending: make(map[string]pendingCall),
late: make(map[string]lateResponse),
sessions: make(map[string]int64),
}
}
Expand Down Expand Up @@ -331,6 +339,7 @@ func (b *stdioBridge) markAvailable(stdin io.WriteCloser) int64 {
b.stdin = stdin
b.available = true
b.pending = make(map[string]pendingCall)
b.late = make(map[string]lateResponse)
b.sessions = make(map[string]int64)
b.initialized = false
b.init = nil
Expand All @@ -344,6 +353,7 @@ func (b *stdioBridge) markUnavailable(message string) {
pending.ch <- rpcErrorResponse(pending.originalID, -32000, message)
}
b.pending = make(map[string]pendingCall)
b.late = make(map[string]lateResponse)
b.sessions = make(map[string]int64)
b.initialized = false
b.init = nil
Expand Down Expand Up @@ -374,8 +384,16 @@ func (b *stdioBridge) readStdout(generation int64, r io.Reader) {
if ok {
delete(b.pending, key)
}
late, wasLate := b.late[key]
if wasLate {
delete(b.late, key)
}
b.mu.Unlock()
if !ok {
if wasLate {
fmt.Fprintf(os.Stderr, "claw-mcp-stdio received late response for canceled id %s (%s)\n", key, late.reason)
continue
}
fmt.Fprintf(os.Stderr, "claw-mcp-stdio ignored response for unknown id %s\n", key)
continue
}
Expand All @@ -397,6 +415,7 @@ func (b *stdioBridge) failGeneration(generation int64, message string) {
pending.ch <- rpcErrorResponse(pending.originalID, -32000, message)
}
b.pending = make(map[string]pendingCall)
b.late = make(map[string]lateResponse)
b.available = false
b.stdin = nil
b.sessions = make(map[string]int64)
Expand Down Expand Up @@ -576,10 +595,10 @@ func (b *stdioBridge) forwardRequest(ctx context.Context, body []byte, originalI
case resp := <-ch:
return resp, nil
case <-ctx.Done():
b.removePending(key)
b.removePending(key, "request context canceled")
return nil, ctx.Err()
case <-timer.C:
b.removePending(key)
b.removePending(key, "request timeout")
return nil, fmt.Errorf("stdio child request timed out after %s", timeout)
}
}
Expand Down Expand Up @@ -614,15 +633,35 @@ func (b *stdioBridge) writeToChild(body []byte, pending *pendingCall) (string, e
return key, nil
}

func (b *stdioBridge) removePending(key string) {
func (b *stdioBridge) removePending(key string, reason string) {
if key == "" {
return
}
b.mu.Lock()
delete(b.pending, key)
if _, ok := b.pending[key]; ok {
delete(b.pending, key)
b.rememberLateResponseLocked(key, reason)
}
b.mu.Unlock()
}

func (b *stdioBridge) rememberLateResponseLocked(key string, reason string) {
if b.late == nil {
b.late = make(map[string]lateResponse)
}
now := time.Now()
for existing, late := range b.late {
if now.Sub(late.at) > lateResponseTTL {
delete(b.late, existing)
}
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "request canceled"
}
b.late[key] = lateResponse{reason: reason, at: now}
}

func rewriteRequestID(body []byte, id json.RawMessage) ([]byte, error) {
var obj map[string]json.RawMessage
if err := json.Unmarshal(body, &obj); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions cmd/claw-mcp-stdio/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,29 @@ func TestBridgeRoutesConcurrentDuplicateClientIDs(t *testing.T) {
}
}

func TestBridgeLogsCanceledLateResponseSeparately(t *testing.T) {
bridge := newStdioBridge(testContext(t), config{})
bridge.mu.Lock()
bridge.generation = 1
bridge.pending["7"] = pendingCall{
originalID: json.RawMessage(`7`),
ch: make(chan []byte, 1),
}
bridge.mu.Unlock()

bridge.removePending("7", "request context canceled")
logged := captureStderr(t, func() {
bridge.readStdout(2, strings.NewReader(`{"jsonrpc":"2.0","id":7,"result":{"ok":true}}`+"\n"))
})

if !strings.Contains(logged, "late response for canceled id 7") || !strings.Contains(logged, "request context canceled") {
t.Fatalf("expected canceled late-response log, got %q", logged)
}
if strings.Contains(logged, "unknown id") {
t.Fatalf("late canceled response should not be logged as unknown: %q", logged)
}
}

func TestBridgeRejectsUnknownSession(t *testing.T) {
server, cleanup := newTestWrapper(t, "")
defer cleanup()
Expand Down Expand Up @@ -233,6 +256,27 @@ func jsonPath(data []byte, path string) any {
return current
}

func captureStderr(t *testing.T, fn func()) string {
t.Helper()
old := os.Stderr
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
os.Stderr = w
defer func() {
os.Stderr = old
}()
fn()
_ = w.Close()
raw, err := io.ReadAll(r)
if err != nil {
t.Fatal(err)
}
_ = r.Close()
return string(raw)
}

func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
Expand Down
1 change: 1 addition & 0 deletions cmd/claw/skill_data/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ When a service subscribes to tools via `x-claw.tools`, cllama performs bounded t

- Tools are injected into the LLM request as available tool definitions
- When the model calls a tool, cllama executes against the providing service. The execution path depends on the descriptor: HTTP-native services use the per-tool `http` metadata; MCP sidecars (descriptor declares a top-level `mcp` block, v0.11.0+) are reached via the Streamable HTTP `tools/call` endpoint with cached `initialize` sessions.
- Repeated identical managed tool calls in one mediated turn are not re-executed; cllama returns a structured `duplicate_tool_call` result and records duplicate metadata in `tool_trace`
- Tool results are fed back to the model for up to 8 rounds (configurable)
- `tool_trace` entries appear in session history for auditability
- Works with both OpenAI-compatible and Anthropic-format requests
Expand Down
3 changes: 3 additions & 0 deletions internal/audit/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type Event struct {
ToolName string `json:"tool_name,omitempty"`
ToolService string `json:"tool_service,omitempty"`
ToolRound *int `json:"tool_round,omitempty"`
ToolDuplicate bool `json:"tool_duplicate,omitempty"`
ToolDuplicateRound *int `json:"tool_duplicate_of_round,omitempty"`
ToolDuplicateCount *int `json:"tool_duplicate_count,omitempty"`
// provider_pool event fields
Provider string `json:"provider,omitempty"`
KeyID string `json:"key_id,omitempty"`
Expand Down
11 changes: 11 additions & 0 deletions internal/audit/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func NormalizeSessionHistoryLine(line []byte) ([]Event, error) {
value := roundNumber
event.ToolRound = &value
}
if duplicate, _ := boolField(callMap, "duplicate"); duplicate {
event.ToolDuplicate = true
}
if duplicateRound, ok := intField(callMap, "duplicate_of_round"); ok {
value := duplicateRound
event.ToolDuplicateRound = &value
}
if duplicateCount, ok := intField(callMap, "duplicate_count"); ok {
value := duplicateCount
event.ToolDuplicateCount = &value
}
if hasTotalRounds {
value := totalRounds
event.TotalRounds = &value
Expand Down
24 changes: 24 additions & 0 deletions internal/audit/normalize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,30 @@ func TestNormalizeSessionHistoryLineOpenAIManagedTool(t *testing.T) {
}
}

func TestNormalizeSessionHistoryLineManagedToolDuplicateMetadata(t *testing.T) {
line := `{"version":1,"id":"hist1_dup","status":"ok","ts":"2026-04-03T12:00:00Z","claw_id":"agent","path":"/v1/chat/completions","requested_model":"xai/grok","effective_provider":"xai","effective_model":"grok","status_code":200,"usage":{"total_rounds":3},"tool_trace":[{"round":2,"tool_calls":[{"name":"search.query","service":"search","status_code":409,"duplicate":true,"duplicate_of_round":1,"duplicate_count":2,"result":{"ok":false,"error":{"code":"duplicate_tool_call","message":"already ran"}}}]}]}`
events, err := NormalizeSessionHistoryLine([]byte(line))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(events) != 1 {
t.Fatalf("expected 1 tool event, got %+v", events)
}
event := events[0]
if !event.ToolDuplicate {
t.Fatalf("expected duplicate flag, got %+v", event)
}
if event.ToolDuplicateRound == nil || *event.ToolDuplicateRound != 1 {
t.Fatalf("expected duplicate source round 1, got %+v", event)
}
if event.ToolDuplicateCount == nil || *event.ToolDuplicateCount != 2 {
t.Fatalf("expected duplicate count 2, got %+v", event)
}
if event.Error != "already ran" {
t.Fatalf("expected extracted duplicate error message, got %+v", event)
}
}

func TestNormalizeSessionHistoryLineAnthropicManagedToolFailure(t *testing.T) {
line := `{"version":1,"id":"hist1_def","status":"error","ts":"2026-04-03T12:05:00Z","claw_id":"nano-bot","path":"/v1/messages","requested_model":"anthropic/claude-sonnet-4","effective_provider":"anthropic","effective_model":"anthropic/claude-sonnet-4","status_code":502,"response":{"format":"json","json":{"error":{"message":"tool result rejected"}}},"usage":{"prompt_tokens":140,"completion_tokens":30,"total_rounds":2},"tool_trace":[{"round":1,"tool_calls":[{"name":"trading-api.get_market_context","service":"trading-api","latency_ms":145,"status_code":504,"result":{"ok":false,"error":"backend timeout"}}]}]}`
events, err := NormalizeSessionHistoryLine([]byte(line))
Expand Down
2 changes: 2 additions & 0 deletions site/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ outline: deep

## Unreleased

- Fix: managed tool mediation now skips repeated identical managed tool calls within a single turn and feeds the model a structured `duplicate_tool_call` result instead of re-executing the sidecar ([#191](https://github.com/mostlydev/clawdapus/issues/191)). Session history records duplicate metadata in `tool_trace`, cllama logs a duplicate intervention with the tool name, and the stdio MCP wrapper now distinguishes late responses after cancellation or timeout from truly unknown JSON-RPC IDs.

## v0.13.3 <Badge type="tip" text="Latest" /> {#v0-13-3}

*2026-04-28*
Expand Down
4 changes: 3 additions & 1 deletion site/guide/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ When the LLM returns a response containing tool calls, cllama dispatches based o

Unknown managed tool names are rejected at validation time.

Within a single mediated turn, cllama also detects repeated managed tool calls with the same canonical tool name and arguments. The first call executes normally; later duplicates are skipped and returned to the model as a structured `duplicate_tool_call` tool result, with `tool_trace` metadata showing the original round and duplicate count. This keeps retry loops from repeatedly burning MCP sidecar time for the same query.

**Budget limits** (configurable in pod YAML, compiled into `tools.json`):

| Limit | Default |
Expand All @@ -257,7 +259,7 @@ Hidden tool rounds are preserved across turns. When the runner sends its next re

### Error Handling

Tool execution errors are fed back to the LLM as structured results inside the mediated loop — the LLM decides how to communicate the failure to the runner. If cllama itself encounters a fatal error (budget exhaustion, internal failure), it returns `502` to the runner.
Tool execution errors are fed back to the LLM as structured results inside the mediated loop — the LLM decides how to communicate the failure to the runner. Duplicate managed tool calls are treated as structured tool errors rather than sidecar executions. If cllama itself encounters a fatal error (budget exhaustion, internal failure), it returns `502` to the runner.

## Telemetry and Audit

Expand Down
1 change: 1 addition & 0 deletions skills/clawdapus/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ When a service subscribes to tools via `x-claw.tools`, cllama performs bounded t

- Tools are injected into the LLM request as available tool definitions
- When the model calls a tool, cllama executes against the providing service. The execution path depends on the descriptor: HTTP-native services use the per-tool `http` metadata; MCP sidecars (descriptor declares a top-level `mcp` block, v0.11.0+) are reached via the Streamable HTTP `tools/call` endpoint with cached `initialize` sessions.
- Repeated identical managed tool calls in one mediated turn are not re-executed; cllama returns a structured `duplicate_tool_call` result and records duplicate metadata in `tool_trace`
- Tool results are fed back to the model for up to 8 rounds (configurable)
- `tool_trace` entries appear in session history for auditability
- Works with both OpenAI-compatible and Anthropic-format requests
Expand Down
Loading