diff --git a/gpt_oss/responses_api/api_server.py b/gpt_oss/responses_api/api_server.py
index f3ba26ae..009fa8d8 100644
--- a/gpt_oss/responses_api/api_server.py
+++ b/gpt_oss/responses_api/api_server.py
@@ -131,6 +131,7 @@ def generate_response(
         dict[str, list[CodeInterpreterOutputLogs | CodeInterpreterOutputImage]]
     ] = None,
     reasoning_ids: Optional[list[str]] = None,
+    message_ids: Optional[list[str]] = None,
     treat_functions_python_as_builtin: bool = False,
 ) -> ResponseObject:
     output = []
@@ -157,6 +158,7 @@ def generate_response(
     browser_tool_index = 0
     python_tool_index = 0
     reasoning_ids_iter = iter(reasoning_ids or [])
+    message_ids_iter = iter(message_ids or [])
 
     for entry in entries:
         entry_dict = entry.to_dict()
@@ -296,8 +298,10 @@ def generate_response(
                     )
                 )
 
+            message_id = next(message_ids_iter, None)
             output.append(
                 Item(
+                    id=message_id,
                     type="message",
                     role="assistant",
                     content=content,
@@ -305,6 +309,11 @@ def generate_response(
                 )
             )
         elif entry_dict["channel"] == "analysis":
+            if entry_dict.get("recipient"):
+                continue
+            author_dict = entry_dict.get("author") or {}
+            if author_dict.get("role") and author_dict.get("role") != "assistant":
+                continue
             summary = []
             content = [
                 ReasoningTextContentItem(
@@ -374,6 +383,7 @@ def generate_response(
     )
 
 class StreamResponsesEvents:
+    BROWSER_RESERVED_FUNCTIONS = {"browser.search", "browser.open", "browser.find"}
     initial_tokens: list[int]
     tokens: list[int]
     output_tokens: list[int]
@@ -429,7 +439,48 @@ def __init__(
         ] = {}
         self.reasoning_item_ids: list[str] = []
         self.current_reasoning_item_id: Optional[str] = None
+        self.message_item_ids: list[str] = []
+        self.current_message_item_id: Optional[str] = None
         self.functions_python_as_builtin = functions_python_as_builtin
+        self.user_defined_function_names = {
+            name
+            for tool in (request_body.tools or [])
+            for name in [getattr(tool, "name", None)]
+            if getattr(tool, "type", None) == "function" and name
+        }
+
+    def _resolve_browser_recipient(
+        self, recipient: Optional[str]
+    ) -> tuple[Optional[str], bool]:
+        if not self.use_browser_tool or not recipient:
+            return (None, False)
+
+        if recipient.startswith("browser."):
+            return (recipient, False)
+
+        if recipient.startswith("functions."):
+            potential = recipient[len("functions.") :]
+            if (
+                potential in self.BROWSER_RESERVED_FUNCTIONS
+                and potential not in self.user_defined_function_names
+            ):
+                return (potential, True)
+
+        return (None, False)
+
+    def _ensure_message_item_id(self) -> str:
+        if self.current_message_item_id is None:
+            message_id = f"item_{uuid.uuid4().hex}"
+            self.current_message_item_id = message_id
+            self.message_item_ids.append(message_id)
+        return self.current_message_item_id
+
+    def _ensure_reasoning_item_id(self) -> str:
+        if self.current_reasoning_item_id is None:
+            reasoning_id = f"rs_{uuid.uuid4().hex}"
+            self.current_reasoning_item_id = reasoning_id
+            self.reasoning_item_ids.append(reasoning_id)
+        return self.current_reasoning_item_id
 
     def _send_event(self, event: ResponseEvent):
         event.sequence_number = self.sequence_number
@@ -455,6 +506,7 @@ async def run(self):
             python_call_ids=self.python_call_ids,
             python_call_outputs=getattr(self, "python_call_outputs", None),
             reasoning_ids=self.reasoning_item_ids,
+            message_ids=self.message_item_ids,
             treat_functions_python_as_builtin=self.functions_python_as_builtin,
         )
         initial_response.status = "in_progress"
@@ -508,8 +560,11 @@ async def run(self):
                 previous_item = self.parser.messages[-1]
                 if previous_item.recipient is not None:
                     recipient = previous_item.recipient
+                    browser_recipient, _ = self._resolve_browser_recipient(
+                        recipient
+                    )
                     if (
-                        not recipient.startswith("browser.")
+                        browser_recipient is None
                         and not (
                             recipient == "python"
                             or (
@@ -542,18 +597,23 @@ async def run(self):
                             ),
                         )
                     )
-                if previous_item.channel == "analysis":
-                    reasoning_id = self.current_reasoning_item_id
-                    if reasoning_id is None:
-                        reasoning_id = f"rs_{uuid.uuid4().hex}"
-                        self.reasoning_item_ids.append(reasoning_id)
-                        self.current_reasoning_item_id = reasoning_id
+                if (
+                    previous_item.channel == "analysis"
+                    and previous_item.recipient is None
+                ):
+                    reasoning_id = (
+                        self.current_reasoning_item_id
+                        if self.current_reasoning_item_id is not None
+                        else self._ensure_reasoning_item_id()
+                    )
+                    reasoning_text = previous_item.content[0].text
                     yield self._send_event(
                         ResponseReasoningTextDone(
                             type="response.reasoning_text.done",
                             output_index=current_output_index,
                             content_index=current_content_index,
-                            text=previous_item.content[0].text,
+                            item_id=reasoning_id,
+                            text=reasoning_text,
                         )
                     )
                     yield self._send_event(
@@ -561,9 +621,10 @@ async def run(self):
                             type="response.content_part.done",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=reasoning_id,
                             part=ReasoningTextContentItem(
                                 type="reasoning_text",
-                                text=previous_item.content[0].text,
+                                text=reasoning_text,
                             ),
                         )
                     )
@@ -578,7 +639,7 @@ async def run(self):
                             content=[
                                 ReasoningTextContentItem(
                                     type="reasoning_text",
-                                    text=previous_item.content[0].text,
+                                    text=reasoning_text,
                                 )
                             ],
                         ),
@@ -605,11 +666,17 @@ async def run(self):
                         text=normalized_text,
                         annotations=annotations,
                     )
+                    message_id = (
+                        self.current_message_item_id
+                        if self.current_message_item_id is not None
+                        else self._ensure_message_item_id()
+                    )
                     yield self._send_event(
                         ResponseOutputTextDone(
                             type="response.output_text.done",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=message_id,
                             text=normalized_text,
                         )
                     )
@@ -618,6 +685,7 @@ async def run(self):
                             type="response.content_part.done",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=message_id,
                             part=text_content,
                         )
                     )
@@ -626,6 +694,7 @@ async def run(self):
                             type="response.output_item.done",
                             output_index=current_output_index,
                             item=Item(
+                                id=message_id,
                                 type="message",
                                 role="assistant",
                                 content=[text_content],
@@ -634,6 +703,7 @@ async def run(self):
                             ),
                         )
                     )
                     current_annotations = []
                     current_output_text_content = ""
+                    self.current_message_item_id = None
@@ -642,11 +712,17 @@ async def run(self):
             if (
                 self.parser.last_content_delta
                 and self.parser.current_channel == "final"
                 and self.parser.current_recipient is None
             ):
                 if not sent_output_item_added:
                     sent_output_item_added = True
+                    message_id = self._ensure_message_item_id()
                     yield self._send_event(
                         ResponseOutputItemAdded(
                             type="response.output_item.added",
                             output_index=current_output_index,
-                            item=Item(type="message", role="assistant", content=[]),
+                            item=Item(
+                                id=message_id,
+                                type="message",
+                                role="assistant",
+                                content=[],
+                            ),
                         )
                     )
@@ -654,6 +730,7 @@ async def run(self):
                     yield self._send_event(
                         ResponseContentPartAdded(
                             type="response.content_part.added",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=message_id,
                             part=TextContentItem(type="output_text", text=""),
                         )
                     )
@@ -685,11 +762,13 @@ async def run(self):
                     for a in new_annotations:
                         current_annotations.append(a)
                         citation = UrlCitation(**a)
+                        message_id = self._ensure_message_item_id()
                         yield self._send_event(
                             ResponseOutputTextAnnotationAdded(
                                 type="response.output_text.annotation.added",
                                 output_index=current_output_index,
                                 content_index=current_content_index,
+                                item_id=message_id,
                                 annotation_index=len(current_annotations),
                                 annotation=citation,
                             )
                         )
@@ -699,11 +778,13 @@ async def run(self):
                         should_send_output_text_delta = False
 
                 if should_send_output_text_delta:
+                    message_id = self._ensure_message_item_id()
                     yield self._send_event(
                         ResponseOutputTextDelta(
                             type="response.output_text.delta",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=message_id,
                             delta=output_delta_buffer,
                         )
                     )
@@ -717,9 +798,7 @@ async def run(self):
                 ):
                     if not sent_output_item_added:
                         sent_output_item_added = True
-                        reasoning_id = f"rs_{uuid.uuid4().hex}"
-                        self.current_reasoning_item_id = reasoning_id
-                        self.reasoning_item_ids.append(reasoning_id)
+                        reasoning_id = self._ensure_reasoning_item_id()
                         yield self._send_event(
                             ResponseOutputItemAdded(
                                 type="response.output_item.added",
@@ -737,16 +816,19 @@ async def run(self):
                                 type="response.content_part.added",
                                 output_index=current_output_index,
                                 content_index=current_content_index,
+                                item_id=reasoning_id,
                                 part=ReasoningTextContentItem(
                                     type="reasoning_text", text=""
                                 ),
                             )
                         )
+                    reasoning_id = self._ensure_reasoning_item_id()
                     yield self._send_event(
                         ResponseReasoningTextDelta(
                             type="response.reasoning_text.delta",
                             output_index=current_output_index,
                             content_index=current_content_index,
+                            item_id=reasoning_id,
                             delta=self.parser.last_content_delta,
                         )
                     )
@@ -763,14 +845,20 @@ async def run(self):
                 if next_tok in encoding.stop_tokens_for_assistant_actions():
                     if len(self.parser.messages) > 0:
                         last_message = self.parser.messages[-1]
-                        if (
-                            self.use_browser_tool
-                            and last_message.recipient is not None
-                            and last_message.recipient.startswith("browser.")
-                        ):
-                            function_name = last_message.recipient[len("browser.") :]
+                        browser_recipient, is_browser_fallback = (
+                            self._resolve_browser_recipient(last_message.recipient)
+                        )
+                        if browser_recipient is not None and browser_tool is not None:
+                            message_for_browser = (
+                                last_message
+                                if not is_browser_fallback
+                                else last_message.with_recipient(browser_recipient)
+                            )
+                            function_name = browser_recipient[len("browser.") :]
                             action = None
-                            parsed_args = browser_tool.process_arguments(last_message)
+                            parsed_args = browser_tool.process_arguments(
+                                message_for_browser
+                            )
                             if function_name == "search":
                                 action = WebSearchActionSearch(
                                     type="search",
@@ -810,17 +898,19 @@ async def run(self):
                                     ),
                                 )
                             )
-                                yield self._send_event(
-                                    ResponseWebSearchCallInProgress(
-                                        type="response.web_search_call.in_progress",
-                                        output_index=current_output_index,
-                                        id=web_search_call_id,
-                                    )
+                            yield self._send_event(
+                                ResponseWebSearchCallInProgress(
+                                    type="response.web_search_call.in_progress",
+                                    output_index=current_output_index,
+                                    item_id=web_search_call_id,
                                 )
+                            )
 
                             async def run_tool():
                                 results = []
-                                async for msg in browser_tool.process(last_message):
+                                async for msg in browser_tool.process(
+                                    message_for_browser
+                                ):
                                     results.append(msg)
                                 return results
@@ -828,7 +918,7 @@ async def run_tool():
                                 ResponseWebSearchCallSearching(
                                     type="response.web_search_call.searching",
                                     output_index=current_output_index,
-                                    id=web_search_call_id,
+                                    item_id=web_search_call_id,
                                 )
                             )
                             result = await run_tool()
@@ -852,7 +942,7 @@ async def run_tool():
                                 ResponseWebSearchCallCompleted(
                                     type="response.web_search_call.completed",
                                     output_index=current_output_index,
-                                    id=web_search_call_id,
+                                    item_id=web_search_call_id,
                                 )
                             )
                             yield self._send_event(
@@ -1030,6 +1120,7 @@ async def run_python_tool():
                 python_call_ids=self.python_call_ids,
                 python_call_outputs=self.python_call_outputs,
                 reasoning_ids=self.reasoning_item_ids,
+                message_ids=self.message_item_ids,
                 treat_functions_python_as_builtin=self.functions_python_as_builtin,
             )
             if self.store_callback and self.request_body.store:
diff --git a/gpt_oss/responses_api/types.py b/gpt_oss/responses_api/types.py
index 6a5e04c0..2250aa28 100644
--- a/gpt_oss/responses_api/types.py
+++ b/gpt_oss/responses_api/types.py
@@ -43,6 +43,7 @@ class ReasoningItem(BaseModel):
 
 
 class Item(BaseModel):
+    id: Optional[str] = None
     type: Optional[Literal["message"]] = "message"
     role: Literal["user", "assistant", "system"]
     content: Union[list[TextContentItem], str]
diff --git a/gpt_oss/tools/python_docker/docker_tool.py b/gpt_oss/tools/python_docker/docker_tool.py
index 91704fa2..1e3535ac 100644
--- a/gpt_oss/tools/python_docker/docker_tool.py
+++ b/gpt_oss/tools/python_docker/docker_tool.py
@@ -1,11 +1,15 @@
 # Run this before running the tool:
 # $ docker image pull python:3.11
+import asyncio
+import contextlib
 import io
-import tarfile
-from typing import Any, AsyncIterator
-import tempfile
 import os
+import queue
 import subprocess
+import tarfile
+import tempfile
+from pathlib import Path
+from typing import Any, AsyncIterator
 
 import docker
 from openai_harmony import (
@@ -21,10 +25,17 @@
 
 _docker_client = None
 
-PYTHON_EXECUTION_BACKEND = "docker"
+VALID_EXECUTION_BACKENDS = {
+    "docker",
+    "dangerously_use_uv",
+    "dangerously_use_local_jupyter",
+}
+
+_default_backend = os.environ.get("PYTHON_EXECUTION_BACKEND", "docker")
+if _default_backend not in VALID_EXECUTION_BACKENDS:
+    _default_backend = "docker"
 
-if os.environ.get("PYTHON_EXECUTION_BACKEND") == "dangerously_use_uv":
-    PYTHON_EXECUTION_BACKEND = "dangerously_use_uv"
+PYTHON_EXECUTION_BACKEND = _default_backend
 
 
 def call_python_script(script: str) -> str:
@@ -87,13 +98,184 @@ def call_python_script_with_uv(script: str) -> str:
     )
 
 
+class LocalJupyterSession:
+    """Stateful helper that proxies execution through a local Jupyter kernel."""
+
+    def __init__(
+        self,
+        connection_file: str | None = None,
+        *,
+        timeout: float = 120.0,
+    ) -> None:
+        try:
+            from jupyter_client import BlockingKernelClient, KernelManager
+        except ImportError as exc:  # pragma: no cover - optional dependency
+            raise RuntimeError(
+                "The dangerously_use_local_jupyter backend requires the jupyter_client package to be installed."
+            ) from exc
+
+        self._default_timeout = timeout
+        self._owns_kernel = False
+        self._client: BlockingKernelClient
+        self._km: KernelManager | None = None
+
+        if connection_file:
+            connection_path = Path(connection_file).expanduser()
+            if not connection_path.exists():
+                raise FileNotFoundError(
+                    f"Cannot find Jupyter connection file at '{connection_path}'."
+                )
+            client = BlockingKernelClient()
+            client.load_connection_file(str(connection_path))
+            client.start_channels()
+            # Ensure the connection is ready before executing.
+            client.wait_for_ready(timeout=self._default_timeout)
+            self._client = client
+        else:
+            km = KernelManager()
+            km.start_kernel()
+            client = km.blocking_client()
+            client.start_channels()
+            client.wait_for_ready(timeout=self._default_timeout)
+            self._client = client
+            self._km = km
+            self._owns_kernel = True
+
+    def execute(self, code: str, *, timeout: float | None = None) -> str:
+        """Execute code in the kernel, returning combined stdout/stderr output."""
+
+        client = self._client
+        effective_timeout = timeout or self._default_timeout
+        msg_id = client.execute(
+            code,
+            store_history=True,
+            allow_stdin=False,
+            stop_on_error=False,
+        )
+
+        stdout_parts: list[str] = []
+        stderr_parts: list[str] = []
+
+        while True:
+            try:
+                msg = client.get_iopub_msg(timeout=effective_timeout)
+            except queue.Empty as exc:
+                raise TimeoutError("Timed out waiting for Jupyter kernel output.") from exc
+
+            if msg.get("parent_header", {}).get("msg_id") != msg_id:
+                continue
+
+            msg_type = msg.get("msg_type")
+            content = msg.get("content", {})
+
+            if msg_type == "stream":
+                text = content.get("text", "")
+                if content.get("name") == "stdout":
+                    stdout_parts.append(text)
+                else:
+                    stderr_parts.append(text)
+            elif msg_type == "error":
+                traceback_data = content.get("traceback")
+                if traceback_data:
+                    stderr_parts.append("\n".join(traceback_data))
+                else:
+                    ename = content.get("ename", "")
+                    evalue = content.get("evalue", "")
+                    stderr_parts.append(f"{ename}: {evalue}".strip())
+            elif msg_type in {"execute_result", "display_data"}:
+                data = content.get("data", {})
+                text = data.get("text/plain")
+                if text:
+                    stdout_parts.append(text if text.endswith("\n") else f"{text}\n")
+            elif msg_type == "status" and content.get("execution_state") == "idle":
+                break
+
+        # Drain the shell channel to capture final execution status.
+        while True:
+            try:
+                reply = client.get_shell_msg(timeout=effective_timeout)
+            except queue.Empty as exc:
+                raise TimeoutError(
+                    "Timed out waiting for Jupyter kernel execution reply."
+                ) from exc
+
+            if reply.get("parent_header", {}).get("msg_id") != msg_id:
+                continue
+
+            reply_content = reply.get("content", {})
+            if reply_content.get("status") == "error":
+                traceback_data = reply_content.get("traceback")
+                if traceback_data:
+                    stderr_parts.append("\n".join(traceback_data))
+                else:
+                    ename = reply_content.get("ename", "")
+                    evalue = reply_content.get("evalue", "")
+                    stderr_parts.append(f"{ename}: {evalue}".strip())
+            break
+
+        stdout = "".join(stdout_parts)
+        stderr = "".join(stderr_parts)
+
+        if stderr:
+            if stdout:
+                stdout = f"{stdout.rstrip()}\n{stderr}"
+            else:
+                stdout = stderr
+
+        if not stdout.strip():
+            stdout = (
+                "[WARN] No output available. Use print() to output anything to stdout to "
+                "receive the output"
+            )
+
+        return stdout
+
+    def close(self) -> None:
+        with contextlib.suppress(Exception):
+            self._client.stop_channels()
+
+        if self._owns_kernel and self._km is not None:
+            with contextlib.suppress(Exception):
+                self._km.shutdown_kernel(now=True)
+
+    def __del__(self) -> None:  # pragma: no cover - best-effort cleanup
+        self.close()
+
+
 class PythonTool(Tool):
     def __init__(
         self,
         name: str = "python",
+        *,
+        execution_backend: str | None = None,
+        local_jupyter_connection_file: str | None = None,
+        local_jupyter_timeout: float = 60.0,
     ):
         assert name == "python"
 
+        backend = execution_backend or PYTHON_EXECUTION_BACKEND
+        if backend not in VALID_EXECUTION_BACKENDS:
+            raise ValueError(
+                "execution_backend must be one of: "
+                + ", ".join(sorted(VALID_EXECUTION_BACKENDS))
+            )
+
+        self._execution_backend = backend
+        self._local_jupyter_connection_file = (
+            local_jupyter_connection_file
+            or os.environ.get("PYTHON_LOCAL_JUPYTER_CONNECTION_FILE")
+        )
+        self._local_jupyter_timeout = local_jupyter_timeout
+
+        self._jupyter_session: LocalJupyterSession | None = None
+        self._execution_lock: asyncio.Lock | None = None
+
+        if self._execution_backend == "dangerously_use_local_jupyter":
+            self._execution_lock = asyncio.Lock()
+            self._jupyter_session = LocalJupyterSession(
+                connection_file=self._local_jupyter_connection_file,
+                timeout=self._local_jupyter_timeout,
+            )
+
     @classmethod
     def get_tool_name(cls) -> str:
         return "python"
@@ -104,9 +286,17 @@ def name(self) -> str:
 
     @property
     def instruction(self) -> str:
-        return """
+        if self._execution_backend == "dangerously_use_local_jupyter":
+            return """
 Use this tool to execute Python code in your chain of thought. The code will not be shown to the user. This tool should be used for internal reasoning, but not for code that is intended to be visible to the user (e.g. when creating plots, tables, or files).
+When you send a message containing Python code to python, it will be executed in a stateful Jupyter notebook environment. python will respond with the output of the execution or time out after 120.0 seconds. Internet access for this session is UNKNOWN. Depends on the cluster.
+        """.strip()
+
+        return """
+Use this tool to execute STATELESS Python code in your chain of thought. The code will not be shown to the user. This tool should be used for internal reasoning, but not for code that is intended to be visible to the user (e.g. when creating plots, tables, or files). When you send a message containing python code to python, it will be executed in a stateless docker container, and the stdout of that process will be returned to you. You have to use print statements to access the output.
+
+IMPORTANT: Your python environment is not shared between calls. You will have to pass your entire code each time.
""".strip() @property @@ -147,12 +337,34 @@ def make_response( async def _process(self, message: Message) -> AsyncIterator[Message]: script = message.content[0].text channel = message.channel - if PYTHON_EXECUTION_BACKEND == "docker": + + if self._execution_backend == "docker": output = call_python_script(script) - elif PYTHON_EXECUTION_BACKEND == "dangerously_use_uv": + elif self._execution_backend == "dangerously_use_uv": output = call_python_script_with_uv(script) + elif self._execution_backend == "dangerously_use_local_jupyter": + assert self._jupyter_session is not None + lock = self._execution_lock + if lock is not None: + async with lock: + try: + output = self._jupyter_session.execute(script) + except TimeoutError as exc: + output = f"[ERROR] {exc}" + else: + try: + output = self._jupyter_session.execute(script) + except TimeoutError as exc: + output = f"[ERROR] {exc}" else: raise ValueError( - f"Invalid PYTHON_EXECUTION_BACKEND: {PYTHON_EXECUTION_BACKEND}" + f"Invalid PYTHON_EXECUTION_BACKEND: {self._execution_backend}" ) yield self._make_response(output, channel=channel) + + def close(self) -> None: + if self._jupyter_session is not None: + self._jupyter_session.close() + + def __del__(self) -> None: # pragma: no cover - best-effort cleanup + self.close() diff --git a/pyproject.toml b/pyproject.toml index da46bd9f..d2595a16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "uvicorn>=0.35.0", "requests>=2.31.0", "termcolor", + "jupyter-client>=8.6.3", ] readme = "README.md" requires-python = ">=3.12"