Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 76 additions & 23 deletions api/services/extension_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import platform
import queue
import re
import subprocess
import sys
import threading
Expand All @@ -20,6 +21,10 @@
from typing import Callable, Optional

_RUNNER_PATH = Path(__file__).parent.parent / "runner.py"
_MISSING_MODULE_RE = re.compile(r"No module named ['\"]([^'\"]+)['\"]")
_AUTO_REPAIR_PACKAGE_MAP = {
"PIL": "Pillow",
}


def _venv_python(ext_dir: Path) -> Path:
Expand Down Expand Up @@ -91,35 +96,83 @@ def _start(self) -> None:
"Run the extension's setup.py first."
)

self._proc = subprocess.Popen(
[str(python), str(_RUNNER_PATH)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=self._build_env(),
)
for attempt in range(3):
self._queue = queue.Queue()
self._proc = subprocess.Popen(
[str(python), str(_RUNNER_PATH)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=self._build_env(),
)

# Background thread: read stdout → queue
reader = threading.Thread(target=self._read_loop, daemon=True)
reader.start()

# Background thread: read stdout → queue
reader = threading.Thread(target=self._read_loop, daemon=True)
reader.start()
# Background thread: forward stderr to our stderr
stderr_fwd = threading.Thread(target=self._stderr_loop, daemon=True)
stderr_fwd.start()

# Background thread: forward stderr to our stderr
stderr_fwd = threading.Thread(target=self._stderr_loop, daemon=True)
stderr_fwd.start()
# Wait for ready — runner sends params_schema in this message
msg = self._recv(timeout=None)
if msg.get("type") == "ready":
# Override params_schema with what the generator class actually declares
if msg.get("params_schema"):
self._params_schema = msg["params_schema"]

print(f"[ExtensionProcess] {self.MODEL_ID} subprocess started (pid {self._proc.pid})")
return

# Wait for ready — runner sends params_schema in this message
msg = self._recv(timeout=None)
if msg.get("type") != "ready":
self._proc.kill()
raise RuntimeError(f"[{self.MODEL_ID}] Expected 'ready', got: {msg}")
missing_module = self._extract_missing_module(msg)
package_name = self._resolve_auto_repair_package(missing_module) if missing_module else None
if package_name and attempt < 2:
self._install_missing_package(python, missing_module, package_name)
continue

# Override params_schema with what the generator class actually declares
if msg.get("params_schema"):
self._params_schema = msg["params_schema"]
raise RuntimeError(f"[{self.MODEL_ID}] Expected 'ready', got: {msg}")

print(f"[ExtensionProcess] {self.MODEL_ID} subprocess started (pid {self._proc.pid})")
def _extract_missing_module(self, msg: dict) -> Optional[str]:
"""Returns missing import name from a runner error payload, if present."""
blob = f"{msg.get('message', '')}\n{msg.get('traceback', '')}"
match = _MISSING_MODULE_RE.search(blob)
return match.group(1) if match else None

def _resolve_auto_repair_package(self, module_name: str) -> Optional[str]:
"""
Maps a missing import name to a pip package for safe auto-repair.

Important: do not guess package names for arbitrary missing modules,
because that can install wrong packages and break environments.
"""
if module_name in _AUTO_REPAIR_PACKAGE_MAP:
return _AUTO_REPAIR_PACKAGE_MAP[module_name]
root = module_name.split(".")[0]
return _AUTO_REPAIR_PACKAGE_MAP.get(root)

def _install_missing_package(self, python: Path, module_name: str, package_name: str) -> None:
"""Best-effort auto-repair for a known missing import in extension venv."""
print(
f"[ExtensionProcess] {self.MODEL_ID} missing module '{module_name}' "
f"-> installing '{package_name}'"
)
try:
subprocess.run(
[str(python), "-m", "pip", "install", package_name],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
except subprocess.CalledProcessError as exc:
details = (exc.stderr or exc.stdout or "").strip()
raise RuntimeError(
f"[{self.MODEL_ID}] Auto-repair failed while installing '{package_name}' "
f"for missing module '{module_name}'.\n{details[-2000:]}"
) from exc

def _read_loop(self) -> None:
"""Continuously reads stdout and pushes parsed JSON to the queue."""
Expand Down
Loading