Description
Check Existing Issues
- I have searched for any existing and/or related issues.
- I have searched for any existing and/or related discussions.
- I have also searched in the CLOSED issues AND CLOSED discussions and found no related items (your issue might already be addressed on the development branch!).
- I am using the latest version of Open WebUI.
Installation Method
Git Clone
Open WebUI Version
v0.6.34
Ollama Version (if applicable)
No response
Operating System
Ubuntu 24.04
Browser (if applicable)
No response
Confirmation
- I have read and followed all instructions in README.md.
- I am using the latest version of both Open WebUI and Ollama.
- I have included the browser console logs.
- I have included the Docker container logs.
- I have provided every relevant configuration, setting, and environment variable used in my setup.
- I have clearly listed every relevant configuration, custom setting, environment variable, and command-line option that influences my setup (such as Docker Compose overrides, .env values, browser settings, authentication configurations, etc).
- I have documented step-by-step reproduction instructions that are precise, sequential, and leave nothing to interpretation. My steps:
- Start with the initial platform/version/OS and dependencies used,
- Specify exact install/launch/configure commands,
- List URLs visited, user input (incl. example values/emails/passwords if needed),
- Describe all options and toggles enabled or changed,
- Include any files or environmental changes,
- Identify the expected and actual result at each stage,
- Ensure any reasonably skilled user can follow and hit the same issue.
Expected Behavior
When a tool declares a specific version of a required package, that version is the one available to the tool.
Actual Behavior
When different tools (belonging to the same or different users) require different versions of the same package, a tool does not always get the version it requires.
When Open WebUI starts, it collects all requirements from active functions and admin tools into a single comma-separated string, then installs them all at once with a single pip install command.
For example, if:
User A's tool specifies requirements: pandas==1.5.0
Your tool specifies requirements: pandas==2.0.0
The system concatenates these into: "pandas==1.5.0, pandas==2.0.0" and runs: pip install pandas==1.5.0 pandas==2.0.0
However, the installation order is non-deterministic: it depends on the order in which functions/tools are retrieved from the database.
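Before pip ever runs, a clash like the one above can be detected from the combined string alone. The following is a minimal sketch of such a pre-check; `find_conflicting_pins` is a hypothetical helper (not part of Open WebUI), and it only understands plain `name==version` pins, so ranges (`>=`), extras (`pandas[excel]==...`) and markers are ignored rather than compared.

```python
import re
from collections import defaultdict

# Hypothetical helper, not part of Open WebUI. Matches only plain
# "name==version" pins; anything else in the string is skipped.
_PIN = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)\s*==\s*([^\s,;]+)\s*$")


def find_conflicting_pins(requirements: str) -> dict[str, set[str]]:
    """Return packages pinned to more than one exact version."""
    pins: dict[str, set[str]] = defaultdict(set)
    for req in requirements.split(","):
        match = _PIN.match(req)
        if match:
            # Normalise names roughly as PEP 503 does ("Foo_Bar" -> "foo-bar").
            name = re.sub(r"[-_.]+", "-", match.group(1)).lower()
            pins[name].add(match.group(2))
    return {name: versions for name, versions in pins.items() if len(versions) > 1}


combined = "pandas==1.5.0, pandas==2.0.0, requests"
print(find_conflicting_pins(combined))
```

A check like this can only report the conflict; it cannot make both pins coexist in one environment, which is why per-tool isolation is discussed next.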
Steps to Reproduce
Define and activate different tools that require different versions of the same package.
Logs & Screenshots
So far I have not run into this problem myself; I am opening this issue because it was raised on Discord by "giada sp" (https://discord.com/channels/1170866489302188073/1210732510250541067/1439950854466568273 and the following messages).
Additional Information
There are several ways to solve this: using OpenAPI tool servers or Pipelines, isolating each tool in its own Docker container, giving each tool its own virtual environment, etc.
Since a virtual environment per tool seems to be the most robust integration, I outline a possible implementation of it. Each tool gets its own virtual environment, and a shared pool keeps one long-running Python worker process per tool, which avoids spawning a new subprocess on every tool invocation:
- Add ToolWorker and ToolWorkerPool classes to backend/open_webui/utils/plugin.py
import asyncio
import json
import subprocess
import venv
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict

# CACHE_DIR (open_webui.env) and log are assumed to be available at module
# level in plugin.py; import them there if they are not.


@dataclass
class ToolWorker:
    """Represents a persistent worker process for a tool."""
    tool_id: str
    venv_dir: Path
    process: asyncio.subprocess.Process
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter
    # Serializes request/response pairs: the stdin/stdout channel cannot
    # tell the replies of concurrent calls apart.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


class ToolWorkerPool:
    """Manages a pool of persistent tool worker processes."""

    def __init__(self):
        self.workers: Dict[str, ToolWorker] = {}
        self.lock = asyncio.Lock()

    async def get_or_create_worker(
        self,
        tool_id: str,
        content: str,
        requirements: str
    ) -> ToolWorker:
        """Get existing worker or create new one."""
        async with self.lock:
            if tool_id in self.workers:
                # Check if worker is still alive
                if self.workers[tool_id].process.returncode is None:
                    return self.workers[tool_id]
                else:
                    # Worker died, remove it
                    del self.workers[tool_id]
            # Create new worker
            worker = await self._create_worker(tool_id, content, requirements)
            self.workers[tool_id] = worker
            return worker

    async def _create_worker(
        self,
        tool_id: str,
        content: str,
        requirements: str
    ) -> ToolWorker:
        """Create a new persistent worker process."""
        venv_dir = CACHE_DIR / "tool_venvs" / tool_id
        # POSIX venv layout (bin/); on Windows it would be Scripts\ instead
        python_executable = venv_dir / "bin" / "python"

        # venv creation and pip are blocking, so run them in a thread to
        # keep the event loop responsive.
        if not venv_dir.exists():
            log.info(f"Creating venv for tool {tool_id}")
            await asyncio.to_thread(venv.create, venv_dir, with_pip=True)

        # Install requirements (pip is quick when they are already satisfied)
        if requirements:
            req_list = [req.strip() for req in requirements.split(",") if req.strip()]
            await asyncio.to_thread(
                subprocess.check_call,
                [str(python_executable), "-m", "pip", "install"] + req_list,
            )

        # Create worker script that runs as a persistent server.
        # The tool source is embedded with !r so quotes or backslashes in it
        # cannot break the generated script.
        worker_script = venv_dir / f"worker_{tool_id}.py"
        worker_script.write_text(f"""
import sys
import json
import types
import traceback
import asyncio
import inspect

# Keep the real stdout for protocol replies; anything the tool prints goes
# to stderr so it cannot corrupt the JSON-lines channel.
protocol_out = sys.stdout
sys.stdout = sys.stderr

# Load the tool module once at startup
module = types.ModuleType('tool_{tool_id}')
exec({content!r}, module.__dict__)
tool_instance = module.Tools()

# Main loop: one JSON request per line in, one JSON response per line out
while True:
    try:
        line = sys.stdin.readline()
        if not line:
            break
        command = json.loads(line)
        method_name = command['method']
        args = command['args']
        # Async tool methods are run to completion inside the worker
        method = getattr(tool_instance, method_name)
        if inspect.iscoroutinefunction(method):
            result = asyncio.run(method(**args))
        else:
            result = method(**args)
        response = {{"status": "success", "result": result}}
        print(json.dumps(response), file=protocol_out, flush=True)
    except Exception as e:
        response = {{
            "status": "error",
            "error": str(e),
            "traceback": traceback.format_exc()
        }}
        print(json.dumps(response), file=protocol_out, flush=True)
""")

        # Start the worker process. stderr is inherited rather than piped,
        # because a PIPE that nobody reads can fill up and block the worker.
        # The raised line limit lets large JSON results through readline().
        process = await asyncio.create_subprocess_exec(
            str(python_executable),
            str(worker_script),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=None,
            limit=16 * 1024 * 1024,
        )

        return ToolWorker(
            tool_id=tool_id,
            venv_dir=venv_dir,
            process=process,
            reader=process.stdout,
            writer=process.stdin,
        )

    async def execute_tool_method(
        self,
        tool_id: str,
        content: str,
        requirements: str,
        method_name: str,
        args: dict,
        timeout: float = 30.0
    ) -> Any:
        """Execute a tool method using the worker pool.

        args must be JSON-serializable: callables such as event emitters
        cannot cross the process boundary.
        """
        worker = await self.get_or_create_worker(tool_id, content, requirements)
        command = {
            "method": method_name,
            "args": args
        }

        # Only one request/response exchange per worker at a time
        async with worker.lock:
            worker.writer.write((json.dumps(command) + "\n").encode())
            await worker.writer.drain()
            # Read response with timeout
            try:
                response_line = await asyncio.wait_for(
                    worker.reader.readline(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Kill the worker and remove it from the pool (unless it was
                # already replaced by a newer worker)
                worker.process.kill()
                await worker.process.wait()
                async with self.lock:
                    if self.workers.get(tool_id) is worker:
                        del self.workers[tool_id]
                raise Exception(f"Tool execution timed out after {timeout}s")

        if not response_line:
            # EOF: the worker exited (for example, the tool failed to load)
            raise Exception(f"Tool worker for {tool_id} exited unexpectedly")
        response = json.loads(response_line.decode())
        if response["status"] == "success":
            return response["result"]
        raise Exception(
            f"Tool execution failed: {response['error']}\n"
            f"{response.get('traceback', '')}"
        )

    async def shutdown(self):
        """Shutdown all worker processes."""
        async with self.lock:
            for worker in self.workers.values():
                # terminate() raises ProcessLookupError if already exited
                if worker.process.returncode is None:
                    worker.process.terminate()
                await worker.process.wait()
            self.workers.clear()


# Global pool instance
_tool_worker_pool = None


def get_tool_worker_pool() -> ToolWorkerPool:
    """Get or create the global tool worker pool."""
    global _tool_worker_pool
    if _tool_worker_pool is None:
        _tool_worker_pool = ToolWorkerPool()
    return _tool_worker_pool
- Modify load_tool_module_by_id() to use the pool (in backend/open_webui/utils/plugin.py)
async def load_tool_module_by_id(tool_id, content=None):
    """Modified to use the worker pool.

    This is now a coroutine, so existing callers must await it.
    """
    if content is None:
        tool = Tools.get_tool_by_id(tool_id)
        if not tool:
            raise Exception(f"Toolkit not found: {tool_id}")
        content = tool.content
        content = replace_imports(content)
        Tools.update_tool_by_id(tool_id, {"content": content})

    frontmatter = extract_frontmatter(content)
    requirements = frontmatter.get("requirements", "")

    # Return an async proxy that forwards method calls to the worker pool
    class AsyncToolProxy:
        def __init__(self, tool_id, content, requirements):
            self.tool_id = tool_id
            self.content = content
            self.requirements = requirements
            self.pool = get_tool_worker_pool()

        def __getattr__(self, method_name):
            # Only called for attributes not found normally. Private and
            # dunder lookups (copy, pickle, hasattr probes) must not become
            # remote tool calls.
            if method_name.startswith("_"):
                raise AttributeError(method_name)

            async def method_wrapper(**kwargs):
                return await self.pool.execute_tool_method(
                    self.tool_id,
                    self.content,
                    self.requirements,
                    method_name,
                    kwargs
                )

            return method_wrapper

    return AsyncToolProxy(tool_id, content, requirements), frontmatter
- Add lifecycle management in lifespan() (in backend/open_webui/main.py)
# Requires: from open_webui.utils.plugin import get_tool_worker_pool
@asynccontextmanager
async def lifespan(app: FastAPI):
    # ... existing startup code ...

    # Initialize worker pool (workers themselves start lazily on first use)
    app.state.tool_worker_pool = get_tool_worker_pool()

    yield

    # Shutdown worker pool
    await app.state.tool_worker_pool.shutdown()
    # ... existing shutdown code ...
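The proxy relies on `__getattr__` to turn any attribute lookup into an awaitable call on the pool. The self-contained sketch below isolates that dispatch pattern; `FakePool` is a hypothetical stand-in that echoes the request instead of talking to a worker process, and the tool name `weather` and method `get_weather` are made up for illustration.

```python
import asyncio
from typing import Any


class FakePool:
    """Hypothetical stand-in for ToolWorkerPool: echoes the request."""

    async def execute_tool_method(self, tool_id, content, requirements, method_name, args):
        return {"tool": tool_id, "method": method_name, "args": args}


class AsyncToolProxy:
    def __init__(self, tool_id: str, content: str, requirements: str, pool: Any):
        self.tool_id = tool_id
        self.content = content
        self.requirements = requirements
        self.pool = pool

    def __getattr__(self, method_name: str):
        # Reached only when normal lookup fails, so tool_id/pool resolve as
        # usual. Private and dunder names are refused instead of forwarded.
        if method_name.startswith("_"):
            raise AttributeError(method_name)

        async def method_wrapper(**kwargs):
            return await self.pool.execute_tool_method(
                self.tool_id, self.content, self.requirements, method_name, kwargs
            )

        return method_wrapper


async def main():
    proxy = AsyncToolProxy("weather", "<tool source>", "", FakePool())
    return await proxy.get_weather(city="Paris")


print(asyncio.run(main()))
# {'tool': 'weather', 'method': 'get_weather', 'args': {'city': 'Paris'}}
```

Because every method becomes a coroutine, call sites that currently invoke tool methods synchronously would need to `await` them.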
This design (one virtual environment and one long-running worker process per tool, managed by a shared pool) balances isolation against performance. It is more complex than spawning a fresh subprocess for each call, but it should be considerably faster, because interpreter start-up and the tool's imports are paid once per worker rather than on every invocation. Workers are created lazily (on first use) and persist for the lifetime of the application.
This is only a sketch; a production version would also need:
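The core idea of a persistent worker can be tried without Open WebUI. In this minimal sketch, one child process loads a tiny `Tools` class once and then serves several JSON-lines requests over stdin/stdout. The `add` method is made up, and `sys.executable` stands in for the tool's `<venv>/bin/python`, so no virtual environment is created here.

```python
import asyncio
import json
import sys

# Worker source: load a toy "tool" once, then answer one JSON request per line.
WORKER_SOURCE = r'''
import json, sys

class Tools:
    def add(self, a, b):
        return a + b

tools = Tools()
while True:
    line = sys.stdin.readline()
    if not line:
        break
    request = json.loads(line)
    result = getattr(tools, request["method"])(**request["args"])
    print(json.dumps({"status": "success", "result": result}), flush=True)
'''


async def main() -> list:
    # sys.executable stands in for the per-tool venv interpreter.
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", WORKER_SOURCE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    results = []
    # All three calls reuse the same process, so start-up is paid only once.
    for a, b in [(1, 2), (10, 20), (100, 200)]:
        request = {"method": "add", "args": {"a": a, "b": b}}
        process.stdin.write((json.dumps(request) + "\n").encode())
        await process.stdin.drain()
        response = json.loads(await process.stdout.readline())
        results.append(response["result"])
    process.stdin.close()  # EOF ends the worker's loop
    await process.wait()
    return results


print(asyncio.run(main()))  # [3, 30, 300]
```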
- Worker health checks and automatic restart.
- Configurable pool size limits.
- Metrics/monitoring for worker performance.
- Graceful degradation if workers fail.
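For the pool size limit, one option is least-recently-used eviction. The sketch below shows only the bookkeeping; `BoundedWorkerRegistry` and its `on_evict` callback are hypothetical names. In the pool above, `on_evict` would presumably terminate the evicted worker's process, and the registry would be used while holding `ToolWorkerPool.lock`.

```python
from collections import OrderedDict
from typing import Callable, Generic, Optional, TypeVar

W = TypeVar("W")


class BoundedWorkerRegistry(Generic[W]):
    """Keeps at most max_workers entries, evicting the least recently used."""

    def __init__(self, max_workers: int, on_evict: Callable[[str, W], None]):
        if max_workers < 1:
            raise ValueError("max_workers must be at least 1")
        self.max_workers = max_workers
        self.on_evict = on_evict  # e.g. terminate the evicted worker's process
        self._workers: "OrderedDict[str, W]" = OrderedDict()

    def get(self, tool_id: str) -> Optional[W]:
        worker = self._workers.get(tool_id)
        if worker is not None:
            self._workers.move_to_end(tool_id)  # mark as most recently used
        return worker

    def put(self, tool_id: str, worker: W) -> None:
        self._workers[tool_id] = worker
        self._workers.move_to_end(tool_id)
        # Evict from the least recently used end until within the limit
        while len(self._workers) > self.max_workers:
            old_id, old_worker = self._workers.popitem(last=False)
            self.on_evict(old_id, old_worker)


evicted = []
registry = BoundedWorkerRegistry(max_workers=2, on_evict=lambda tid, w: evicted.append(tid))
registry.put("a", "worker-a")
registry.put("b", "worker-b")
registry.get("a")               # "a" becomes the most recently used
registry.put("c", "worker-c")   # over the limit: "b" is evicted
print(evicted)  # ['b']
```

An evicted tool is not lost: its venv stays on disk, so the next call simply starts a new worker, paying only the process start-up cost again.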