diff --git a/app/features/agents/agents/base.py b/app/features/agents/agents/base.py index f9f4e1a0..90c433b6 100644 --- a/app/features/agents/agents/base.py +++ b/app/features/agents/agents/base.py @@ -7,10 +7,12 @@ import functools import inspect +import json import os from collections.abc import Awaitable, Callable -from typing import Any +from typing import Any, cast +import httpx import structlog from pydantic_ai import ModelRetry from pydantic_ai.models import Model @@ -62,6 +64,71 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> ToolReturnT: return wrapper +def _coerce_null_message_content(body: bytes) -> bytes | None: + """Coerce ``messages[*].content: null`` -> ``""`` in a chat-request body. + + Ollama's OpenAI-compatible ``/v1/chat/completions`` rejects any message + whose ``content`` is JSON ``null`` and which carries no ``tool_calls`` with + ``400 invalid message content type: `` — stricter than the real OpenAI + API, which tolerates it. A weak local model can emit a degenerate empty + assistant turn (no text, no tool call); PydanticAI serialises it as + ``content: null`` and then *replays* that message on its validation-retry, + so every retry 400s and the whole run dies with a ``FallbackExceptionGroup``. + Coercing ``null`` -> ``""`` keeps the message OpenAI-spec-valid and lets the + retry loop proceed. + + Args: + body: The raw outgoing request body bytes. + + Returns: + Re-serialised body bytes when a null ``content`` was rewritten, or + ``None`` when nothing changed (the common case) so the caller can + forward the original request untouched. + """ + try: + parsed = json.loads(body) + except (ValueError, TypeError): + return None + if not isinstance(parsed, dict): + return None + payload = cast("dict[str, Any]", parsed) + messages = payload.get("messages") + if not isinstance(messages, list): + return None + message_list: list[Any] = messages + changed = False + for message in message_list: + if isinstance(message, dict) and "content" in message and message["content"] is None: + message["content"] = "" + changed = True + if not changed: + return None + return json.dumps(payload).encode("utf-8") + + +class _OllamaNullContentTransport(httpx.AsyncHTTPTransport): + """httpx transport that null-content-sanitises outgoing Ollama requests. + + See :func:`_coerce_null_message_content` for the Ollama-compat defect this + works around. Applied to the ``OllamaProvider``'s HTTP client so the fix + covers both the streaming and non-streaming agent paths. + """ + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + sanitized = _coerce_null_message_content(request.content) + if sanitized is not None: + headers = dict(request.headers) + headers.pop("content-length", None) # httpx recomputes from the new body + request = httpx.Request( + request.method, + request.url, + headers=headers, + content=sanitized, + extensions=request.extensions, + ) + return await super().handle_async_request(request) + + def build_agent_model(identifier: str) -> str | Model: """Build the PydanticAI ``model`` argument for an agent identifier. @@ -85,7 +152,17 @@ def build_agent_model(identifier: str) -> str | Model: model_name = identifier.split(":", 1)[1] # CRITICAL: Ollama's OpenAI-compatible base ends in /v1. base_url = settings.ollama_base_url.rstrip("/") + "/v1" - return OpenAIChatModel(model_name, provider=OllamaProvider(base_url=base_url)) + # The null-content sanitiser lives on the HTTP client (see + # _OllamaNullContentTransport). A generous read timeout is required because + # local generation on an 8B model routinely exceeds httpx's 5s default. + http_client = httpx.AsyncClient( + transport=_OllamaNullContentTransport(), + timeout=httpx.Timeout(600.0, connect=10.0), + ) + return OpenAIChatModel( + model_name, + provider=OllamaProvider(base_url=base_url, http_client=http_client), + ) def reset_agent_caches() -> None: diff --git a/app/features/agents/service.py b/app/features/agents/service.py index 20625e01..8009ea08 100644 --- a/app/features/agents/service.py +++ b/app/features/agents/service.py @@ -290,8 +290,23 @@ async def chat( error=str(e), error_type=type(e).__name__, ) - session.last_activity = datetime.now(UTC) + misbehavior_now = datetime.now(UTC) + session.last_activity = misbehavior_now + # A gated tool may have fired (and recorded a valid approval request) + # before the model misbehaved — surface the Approve card rather than + # discarding it behind the generic error (#344). + salvaged = self._salvage_pending_action(session, deps, misbehavior_now) await db.flush() + if salvaged is not None: + return ChatResponse( + session_id=session_id, + message=( + "I've prepared an action that needs your approval before " + "I can proceed. Please review the pending request." + ), + pending_approval=True, + pending_action=salvaged, + ) return ChatResponse( session_id=session_id, message=( @@ -650,6 +665,35 @@ async def stream_chat( error=str(e), error_type=type(e).__name__, ) + misbehavior_now = datetime.now(UTC) + session.last_activity = misbehavior_now + # A gated tool may have fired (and recorded a valid approval request) + # before the model misbehaved — surface the Approve card rather than + # discarding it behind the generic error (#344). + salvaged = self._salvage_pending_action(session, deps, misbehavior_now) + await db.flush() + if salvaged is not None: + yield StreamEvent( + event_type="approval_required", + data={ + "action": salvaged, + "message": "Human approval required before proceeding.", + }, + timestamp=misbehavior_now, + ) + yield StreamEvent( + event_type="complete", + data={ + "message": ( + "I've prepared an action that needs your approval before I can proceed." + ), + "tokens_used": 0, + "tool_calls_count": deps.tool_call_count, + "pending_approval": True, + }, + timestamp=misbehavior_now, + ) + return yield StreamEvent( event_type="error", data={ @@ -660,7 +704,7 @@ async def stream_chat( "error_type": "model_behavior_error", "recoverable": True, }, - timestamp=datetime.now(UTC), + timestamp=misbehavior_now, ) return @@ -858,6 +902,45 @@ def _deserialize_messages( ) return [] + def _salvage_pending_action( + self, + session: AgentSession, + deps: AgentDeps, + now: datetime, + ) -> PendingAction | None: + """Persist a gated tool's approval request captured before a misbehaving run. + + A gated tool sets ``deps.pending_action`` the moment it fires (#336), but + it does not halt the run. A weak model can ramble past the gate and + exhaust its retry budget, so ``agent.run()`` raises + ``UnexpectedModelBehavior`` BEFORE returning and the normal post-run + approval-surfacing path never executes. The gate did fire and the + captured arguments are valid, so surface the approval card instead of + discarding it behind a generic error (issue #344). + + Args: + session: The agent session to mutate. + deps: The agent deps that a gated tool may have written to. + now: Timestamp for created_at / expires_at. + + Returns: + The formatted :class:`PendingAction` when a gated tool recorded a + request, else ``None`` (the genuine "invalid tool call" case). + """ + if not deps.pending_action: + return None + action_type = str(deps.pending_action.get("action_type", "unknown")) + return self._record_pending_action( + session, + action_type=action_type, + arguments=deps.pending_action.get("arguments") or {}, + description=str( + deps.pending_action.get("description") + or f"Agent requested approval for {action_type}" + ), + now=now, + ) + def _record_pending_action( self, session: AgentSession, diff --git a/app/features/agents/tests/test_base.py b/app/features/agents/tests/test_base.py index ddfcd02b..c7d8d29a 100644 --- a/app/features/agents/tests/test_base.py +++ b/app/features/agents/tests/test_base.py @@ -1,10 +1,12 @@ """Unit tests for agent base helpers (Ollama-aware model factory).""" +import json import re from collections.abc import Iterator from typing import Any, cast from unittest.mock import AsyncMock +import httpx import pytest from pydantic_ai import ModelRetry from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart @@ -15,6 +17,8 @@ from app.core.config import get_settings from app.features.agents.agents.base import ( TOOL_USAGE_INSTRUCTIONS, + _coerce_null_message_content, + _OllamaNullContentTransport, build_agent_model, build_agent_model_with_fallback, get_agent_retries, @@ -322,3 +326,134 @@ def respond(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: assert captured["output_tools"] == [] assert isinstance(result.output, RAGAnswer) assert result.output.confidence == "high" + + +class TestOllamaNullContentSanitizer: + """The Ollama HTTP client must convert ``content: null`` -> ``""`` (#344). + + Ollama's OpenAI-compatible ``/v1/chat/completions`` rejects any message + whose ``content`` is JSON ``null`` and carries no ``tool_calls`` with + ``400 invalid message content type: ``. PydanticAI emits that shape for + a degenerate empty assistant turn and then replays it on retry, so without + this coercion every retry 400s and the run dies with ``FallbackExceptionGroup``. + """ + + def test_coerce_rewrites_null_content_to_empty_string(self) -> None: + body = json.dumps( + { + "model": "qwen3:8b", + "messages": [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": None}, + ], + } + ).encode("utf-8") + + out = _coerce_null_message_content(body) + + assert out is not None + payload = json.loads(out) + assert payload["messages"][1]["content"] == "" + # Untouched fields survive the round-trip. + assert payload["messages"][0]["content"] == "hi" + assert payload["model"] == "qwen3:8b" + + def test_coerce_rewrites_null_content_even_with_tool_calls(self) -> None: + body = json.dumps( + { + "messages": [ + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "c1", + "type": "function", + "function": {"name": "x", "arguments": "{}"}, + } + ], + } + ] + } + ).encode("utf-8") + + out = _coerce_null_message_content(body) + + assert out is not None + payload = json.loads(out) + assert payload["messages"][0]["content"] == "" + assert payload["messages"][0]["tool_calls"][0]["id"] == "c1" + + def test_coerce_is_noop_when_no_null_content(self) -> None: + body = json.dumps({"messages": [{"role": "user", "content": "hi"}]}).encode("utf-8") + + assert _coerce_null_message_content(body) is None + + def test_coerce_ignores_missing_content_key(self) -> None: + # A message with no ``content`` key at all must not be rewritten — only + # an explicit JSON null is the Ollama-rejected shape. + body = json.dumps({"messages": [{"role": "assistant", "tool_calls": []}]}).encode("utf-8") + + assert _coerce_null_message_content(body) is None + + def test_coerce_handles_non_json_body(self) -> None: + assert _coerce_null_message_content(b"not json at all") is None + + def test_coerce_handles_non_dict_payload(self) -> None: + assert _coerce_null_message_content(b"[1, 2, 3]") is None + + @pytest.mark.asyncio + async def test_transport_sanitizes_outgoing_request( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The transport rewrites the body and fixes Content-Length before send.""" + captured: dict[str, bytes] = {} + + async def fake_send( + _self: httpx.AsyncHTTPTransport, request: httpx.Request + ) -> httpx.Response: + captured["body"] = request.content + captured["content_length"] = request.headers["content-length"].encode() + return httpx.Response(200, json={"ok": True}) + + monkeypatch.setattr(httpx.AsyncHTTPTransport, "handle_async_request", fake_send) + + transport = _OllamaNullContentTransport() + body = json.dumps({"messages": [{"role": "assistant", "content": None}]}).encode("utf-8") + request = httpx.Request("POST", "http://ollama/v1/chat/completions", content=body) + + await transport.handle_async_request(request) + + sent = json.loads(captured["body"]) + assert sent["messages"][0]["content"] == "" + # Content-Length must match the rewritten body, not the original. + assert int(captured["content_length"]) == len(captured["body"]) + + @pytest.mark.asyncio + async def test_transport_passthrough_when_nothing_to_sanitize( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + captured: dict[str, bytes] = {} + + async def fake_send( + _self: httpx.AsyncHTTPTransport, request: httpx.Request + ) -> httpx.Response: + captured["body"] = request.content + return httpx.Response(200, json={"ok": True}) + + monkeypatch.setattr(httpx.AsyncHTTPTransport, "handle_async_request", fake_send) + + transport = _OllamaNullContentTransport() + body = json.dumps({"messages": [{"role": "user", "content": "hi"}]}).encode("utf-8") + request = httpx.Request("POST", "http://ollama/v1/chat/completions", content=body) + + await transport.handle_async_request(request) + + # Forwarded unchanged. + assert json.loads(captured["body"])["messages"][0]["content"] == "hi" + + def test_build_agent_model_returns_openai_chat_model_for_ollama(self) -> None: + # The Ollama branch must hand back a configured OpenAIChatModel (whose + # HTTP client carries the sanitizing transport), not the bare identifier. + model = build_agent_model("ollama:qwen3:8b") + assert isinstance(model, OpenAIChatModel) diff --git a/app/features/agents/tests/test_service.py b/app/features/agents/tests/test_service.py index 74709be2..888260ec 100644 --- a/app/features/agents/tests/test_service.py +++ b/app/features/agents/tests/test_service.py @@ -434,6 +434,99 @@ async def __aexit__(self, *exc: object) -> bool: assert events[0].data["error_type"] == "model_behavior_error" assert "exceeded max retries" not in events[0].data["error"] + @pytest.mark.asyncio + async def test_chat_surfaces_pending_action_on_model_misbehavior( + self, + sample_active_session: AgentSession, + ) -> None: + """A gated tool that fired before the model misbehaved must surface the + Approve card, not the generic error (#344). + + A gated tool records ``deps.pending_action`` the moment it fires, but a + weak model can ramble past the gate and exhaust its retry budget, so + ``agent.run`` raises ``UnexpectedModelBehavior`` before returning. The + captured approval is valid and must not be discarded. + """ + service = AgentService() + mock_db = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = sample_active_session + mock_db.execute.return_value = mock_result + + def _fire_gate_then_misbehave(*_args: Any, **kwargs: Any) -> None: + deps: AgentDeps = kwargs["deps"] + deps.set_pending_action( + "create_alias", + {"alias_name": "champion", "run_id": "1" * 32}, + "Create alias champion", + ) + raise UnexpectedModelBehavior("Exceeded maximum output retries (3)") + + mock_agent = MagicMock() + mock_agent.run = AsyncMock(side_effect=_fire_gate_then_misbehave) + + with patch.object(service, "_get_agent", return_value=mock_agent): + response = await service.chat( + db=mock_db, + session_id=sample_active_session.session_id, + message="Create alias champion. Call tool_create_alias now.", + ) + + assert response.pending_approval is True + assert response.pending_action is not None + assert response.pending_action.action_type == "create_alias" + assert response.pending_action.arguments["alias_name"] == "champion" + assert "invalid tool call" not in response.message + # Session flipped so POST /approve can find the action. + assert sample_active_session.status == SessionStatus.AWAITING_APPROVAL.value + assert sample_active_session.pending_action is not None + + @pytest.mark.asyncio + async def test_stream_chat_surfaces_approval_on_model_misbehavior( + self, + sample_active_session: AgentSession, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """The streaming path must emit ``approval_required`` (not ``error``) + when a gated tool fired before the model misbehaved (#344).""" + service = AgentService() + # Pin ollama so stream_chat uses the non-streaming run() path (#342) — + # the real-world scenario where this surfaced. + monkeypatch.setattr(service.settings, "agent_default_model", "ollama:qwen3:8b") + mock_db = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = sample_active_session + mock_db.execute.return_value = mock_result + + def _fire_gate_then_misbehave(*_args: Any, **kwargs: Any) -> None: + deps: AgentDeps = kwargs["deps"] + deps.set_pending_action( + "create_alias", + {"alias_name": "champion", "run_id": "1" * 32}, + "Create alias champion", + ) + raise UnexpectedModelBehavior("Exceeded maximum output retries (3)") + + mock_agent = MagicMock() + mock_agent.run = AsyncMock(side_effect=_fire_gate_then_misbehave) + + with patch.object(service, "_get_agent", return_value=mock_agent): + events = [ + event + async for event in service.stream_chat( + db=mock_db, + session_id=sample_active_session.session_id, + message="Create alias champion. Call tool_create_alias now.", + ) + ] + + event_types = [event.event_type for event in events] + assert "approval_required" in event_types + assert "error" not in event_types + approval = next(e for e in events if e.event_type == "approval_required") + assert approval.data["action"].action_type == "create_alias" + assert sample_active_session.status == SessionStatus.AWAITING_APPROVAL.value + @pytest.mark.asyncio async def test_stream_chat_runs_tools_sequentially( self, diff --git a/frontend/src/lib/approval-report.test.ts b/frontend/src/lib/approval-report.test.ts new file mode 100644 index 00000000..f94f04d9 --- /dev/null +++ b/frontend/src/lib/approval-report.test.ts @@ -0,0 +1,67 @@ +import { describe, it, expect } from 'vitest' +import { formatApprovalReport } from './approval-report' +import type { ApprovalResponse } from '@/types/api' + +describe('formatApprovalReport', () => { + it('reports a successful execution', () => { + const res: ApprovalResponse = { + action_id: 'a1', + approved: true, + status: 'executed', + result: { alias_name: 'champion' }, + } + const msg = formatApprovalReport('create_alias', res) + expect(msg).toContain('✅') + expect(msg).toContain('create_alias') + expect(msg).toContain('executed successfully') + }) + + it('reports an approved-but-failed execution with the error cause', () => { + // The backend marks a failed execution `rejected` with the cause in result.error. + const res: ApprovalResponse = { + action_id: 'a2', + approved: true, + status: 'rejected', + result: { error: 'Run not found: 3c5d', error_type: 'ValueError' }, + } + const msg = formatApprovalReport('create_alias', res) + expect(msg).toContain('❌') + expect(msg).toContain('could not be executed') + expect(msg).toContain('Run not found: 3c5d') + }) + + it('reports an operator rejection (no execution)', () => { + const res: ApprovalResponse = { + action_id: 'a3', + approved: false, + status: 'rejected', + result: null, + } + const msg = formatApprovalReport('archive_run', res) + expect(msg).toContain('🚫') + expect(msg).toContain('Rejected') + expect(msg).toContain('No action was taken') + }) + + it('reports an expired approval', () => { + const res: ApprovalResponse = { + action_id: 'a4', + approved: true, + status: 'expired', + result: null, + } + const msg = formatApprovalReport('save_scenario', res) + expect(msg).toContain('⏰') + expect(msg).toContain('expired') + }) + + it('does not throw on a non-object result', () => { + const res: ApprovalResponse = { + action_id: 'a5', + approved: true, + status: 'executed', + result: 'ok', + } + expect(() => formatApprovalReport('create_alias', res)).not.toThrow() + }) +}) diff --git a/frontend/src/lib/approval-report.ts b/frontend/src/lib/approval-report.ts new file mode 100644 index 00000000..f041157e --- /dev/null +++ b/frontend/src/lib/approval-report.ts @@ -0,0 +1,44 @@ +import type { ApprovalResponse } from '@/types/api' + +/** + * Build a human-readable chat report for an approved/rejected agent action. + * + * The backend's `POST /approve` returns an {@link ApprovalResponse} for every + * outcome, but the chat UI previously discarded it — so a click produced no + * visible result ("nothing returned"). This formats a one-line report for ALL + * outcomes so the operator always sees what happened: + * + * - `executed` → the action ran successfully. + * - approved but `rejected` + error → the action was approved but execution + * failed (the backend marks a failed execution `rejected` and puts the cause + * in `result.error`). + * - `rejected` (not approved) → the operator rejected the action. + * - `expired` → the approval window lapsed before it ran. + * + * @param actionLabel - The gated action name (e.g. `create_alias`). + * @param res - The approval response from the backend. + * @returns A markdown-ish one-line report for the chat transcript. + */ +export function formatApprovalReport(actionLabel: string, res: ApprovalResponse): string { + const result = + res.result && typeof res.result === 'object' + ? (res.result as Record) + : undefined + const errorDetail = + result && 'error' in result ? String(result.error) : undefined + + if (res.status === 'executed') { + return `✅ Approved — \`${actionLabel}\` executed successfully.` + } + if (res.approved && errorDetail) { + return `❌ Approved, but \`${actionLabel}\` could not be executed: ${errorDetail}` + } + if (!res.approved) { + return `🚫 Rejected \`${actionLabel}\`. No action was taken.` + } + if (res.status === 'expired') { + return `⏰ The \`${actionLabel}\` approval expired before it could run.` + } + // Defensive fallback: approved, not executed, no error detail. + return `\`${actionLabel}\` finished with status: ${res.status}.` +} diff --git a/frontend/src/pages/chat.tsx b/frontend/src/pages/chat.tsx index cc22a9d5..6bbaaeb6 100644 --- a/frontend/src/pages/chat.tsx +++ b/frontend/src/pages/chat.tsx @@ -16,8 +16,15 @@ import { SelectValue, } from '@/components/ui/select' import { api } from '@/lib/api' +import { formatApprovalReport } from '@/lib/approval-report' import { WS_URL, ROUTES } from '@/lib/constants' -import type { ChatMessage as ChatMessageType, AgentStreamEvent, AgentType, AgentSession } from '@/types/api' +import type { + ChatMessage as ChatMessageType, + AgentStreamEvent, + AgentType, + AgentSession, + ApprovalResponse, +} from '@/types/api' export default function ChatPage() { const [sessionId, setSessionId] = useState(null) @@ -142,38 +149,42 @@ export default function ChatPage() { send({ session_id: sessionId, message: content }) } - const handleApprove = async () => { - if (!sessionId || !pendingAction?.actionId) return - setIsApproving(true) - try { - await api(`/agents/sessions/${sessionId}/approve`, { - method: 'POST', - body: { action_id: pendingAction.actionId, approved: true }, - }) - setPendingAction(null) - } catch (error) { - console.error('Failed to approve:', error) - } finally { - setIsApproving(false) - } + const appendAssistantMessage = (content: string) => { + setMessages((prev) => [ + ...prev, + { role: 'assistant', content, timestamp: new Date().toISOString() }, + ]) } - const handleReject = async () => { + // Approve or reject a pending action, then ALWAYS surface the execution + // report — for every outcome (executed / failed / rejected / expired). The + // handlers previously discarded the /approve response, so a click left the + // user with no feedback ("nothing returned"). + const decideAction = async (approved: boolean) => { if (!sessionId || !pendingAction?.actionId) return + const actionLabel = pendingAction.action setIsApproving(true) try { - await api(`/agents/sessions/${sessionId}/approve`, { + const res = await api(`/agents/sessions/${sessionId}/approve`, { method: 'POST', - body: { action_id: pendingAction.actionId, approved: false }, + body: { action_id: pendingAction.actionId, approved }, }) setPendingAction(null) + appendAssistantMessage(formatApprovalReport(actionLabel, res)) } catch (error) { - console.error('Failed to reject:', error) + console.error(approved ? 'Failed to approve:' : 'Failed to reject:', error) + setPendingAction(null) + const verb = approved ? 'approve' : 'reject' + const detail = error instanceof Error ? error.message : 'request failed' + appendAssistantMessage(`Error: could not ${verb} \`${actionLabel}\` — ${detail}`) } finally { setIsApproving(false) } } + const handleApprove = () => decideAction(true) + const handleReject = () => decideAction(false) + const handleNewSession = () => { setSessionId(null) setMessages([]) diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index 3c62f684..df2289f4 100644 --- a/frontend/src/types/api.ts +++ b/frontend/src/types/api.ts @@ -624,6 +624,15 @@ export interface ChatMessage { timestamp: string } +/** Response from POST /agents/sessions/{id}/approve (mirrors backend ApprovalResponse). */ +export interface ApprovalResponse { + action_id: string + approved: boolean + /** Execution result on success, or `{ error, error_type }` when execution failed. */ + result?: unknown + status: 'executed' | 'rejected' | 'expired' +} + export interface ToolCall { tool_name: string arguments: Record