diff --git a/README.md b/README.md index 4501212..72d9d72 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,9 @@ the rest of the operating system. - **Complete Results**: Captures standard output, standard error, and return values - **Asynchronous Support**: Runs async code properly - **Error Handling**: Provides detailed error reports for debugging +- **Timeouts**: Supports execution time limits to prevent long-running code. Default is 60s, can be customised via CLI +- **File Output**: Can output and return files & images. Useful for things like generating graphs. + - **Important:** Disabled by default for backwards compatibility! _(This code was previously part of [Pydantic AI](https://github.com/pydantic/pydantic-ai) but was moved to a separate repo to make it easier to maintain.)_ @@ -34,7 +37,7 @@ To use this server, you must have both Python and [Deno](https://deno.com/) inst The server can be run with `deno` installed using `uvx`: ```bash -uvx mcp-run-python [-h] [--version] [--port PORT] [--deps DEPS] {stdio,streamable-http,example} +uvx mcp-run-python [-h] [--version] [--port PORT] [--deps DEPS] [--enable-file-outputs] {stdio,streamable-http,example} ``` where: @@ -49,6 +52,13 @@ where: - `example` will run a minimal Python script using `numpy`, useful for checking that the package is working, for the code to run successfully, you'll need to install `numpy` using `uvx mcp-run-python --deps numpy example` +--- + +For all available options, +```bash +uvx mcp-run-python --help +``` + ## Usage with Pydantic AI Then you can use `mcp-run-python` with Pydantic AI: @@ -105,7 +115,7 @@ logfire.instrument_pydantic_ai() async def main(): - async with async_prepare_deno_env('stdio') as deno_env: + async with async_prepare_deno_env('stdio', enable_file_outputs=True) as deno_env: server = MCPServerStdio('deno', args=deno_env.args, cwd=deno_env.cwd, timeout=10) agent = Agent('claude-3-5-haiku-latest', toolsets=[server]) async with agent: @@ -165,3 +175,10 @@ edit the 
filesystem. * `deno` is then run with read-only permissions to the `node_modules` directory to run untrusted code. Dependencies must be provided when initializing the server so they can be installed in the first step. + +## File Outputs + +`mcp_run_python` supports outputting files using [embedded resources](https://modelcontextprotocol.io/specification/2025-06-18/server/tools#embedded-resources). +This can be very useful when having the LLM do complex calculation to create some csv file, or when the code it writes generates binary blobs like images - for example when interacting with matplotlib. + +To preserve backwards compatibility, this is an **opt-in** feature and needs to be enabled. Pass `--enable-file-outputs` to your run command to enable this. diff --git a/examples/direct.py b/examples/direct.py index 2f9819c..a1d3f28 100644 --- a/examples/direct.py +++ b/examples/direct.py @@ -12,7 +12,7 @@ async def main(): - async with async_prepare_deno_env('stdio', dependencies=['numpy']) as deno_env: + async with async_prepare_deno_env('stdio', dependencies=['numpy'], enable_file_outputs=True) as deno_env: server_params = StdioServerParameters(command='deno', args=deno_env.args, cwd=deno_env.cwd) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: diff --git a/mcp_run_python/_cli.py b/mcp_run_python/_cli.py index 89258c5..03fea22 100644 --- a/mcp_run_python/_cli.py +++ b/mcp_run_python/_cli.py @@ -27,6 +27,25 @@ def cli_logic(args_list: Sequence[str] | None = None) -> int: '--disable-networking', action='store_true', help='Disable networking during execution of python code' ) parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') + parser.add_argument('--enable-file-outputs', action='store_true', help='Enable file output functionality') + parser.add_argument( + '--pyodide-max-workers', + help='How many pyodide workers should be spawned at a time max. 
This is the amount of concurrent function executions you can have. Default: 10', + default=10, + type=int, + ) + parser.add_argument( + '--pyodide-code-run-timeout-sec', + help='How long the code execution is allowed to last. Default: 60 seconds', + default=60, + type=int, + ) + parser.add_argument( + '--pyodide-worker-wait-timeout-sec', + help='How long pyodide should wait for a free worker. Default: 60 seconds', + default=60, + type=int, + ) parser.add_argument('--version', action='store_true', help='Show version and exit') parser.add_argument( 'mode', @@ -53,6 +72,10 @@ def cli_logic(args_list: Sequence[str] | None = None) -> int: http_port=args.port, dependencies=deps, deps_log_handler=deps_log_handler, + enable_file_outputs=args.enable_file_outputs, + pyodide_max_workers=args.pyodide_max_workers, + pyodide_worker_wait_timeout_sec=args.pyodide_worker_wait_timeout_sec, + pyodide_code_run_timeout_sec=args.pyodide_code_run_timeout_sec, ) return return_code else: diff --git a/mcp_run_python/code_sandbox.py b/mcp_run_python/code_sandbox.py index f552861..19a7d89 100644 --- a/mcp_run_python/code_sandbox.py +++ b/mcp_run_python/code_sandbox.py @@ -56,6 +56,10 @@ async def code_sandbox( dependencies: list[str] | None = None, log_handler: LogHandler | None = None, allow_networking: bool = True, + enable_file_outputs: bool = False, + pyodide_max_workers: int = 10, + pyodide_worker_wait_timeout_sec: int = 60, + pyodide_code_run_timeout_sec: int = 60, ) -> AsyncIterator['CodeSandbox']: """Create a secure sandbox. @@ -64,6 +68,10 @@ log_handler: A callback function to handle print statements when code is running. deps_log_handler: A callback function to run on log statements during initial install of dependencies. allow_networking: Whether to allow networking or not while executing python code. 
+ enable_file_outputs: Whether to enable output files + pyodide_max_workers: How many pyodide workers to max use at the same time + pyodide_worker_wait_timeout_sec: How long to wait for a free pyodide worker in seconds. + pyodide_code_run_timeout_sec: How long to wait for pyodide code to run in seconds. """ async with async_prepare_deno_env( 'stdio', @@ -71,6 +79,10 @@ async def code_sandbox( deps_log_handler=log_handler, return_mode='json', allow_networking=allow_networking, + enable_file_outputs=enable_file_outputs, + pyodide_max_workers=pyodide_max_workers, + pyodide_code_run_timeout_sec=pyodide_code_run_timeout_sec, + pyodide_worker_wait_timeout_sec=pyodide_worker_wait_timeout_sec, ) as deno_env: server_params = StdioServerParameters(command='deno', args=deno_env.args, cwd=deno_env.cwd) diff --git a/mcp_run_python/deno/deno.jsonc b/mcp_run_python/deno/deno.jsonc index 57866be..58e4378 100644 --- a/mcp_run_python/deno/deno.jsonc +++ b/mcp_run_python/deno/deno.jsonc @@ -17,7 +17,9 @@ "imports": { "@modelcontextprotocol/sdk": "npm:@modelcontextprotocol/sdk@^1.17.5", "@std/cli": "jsr:@std/cli@^1.0.15", + "@std/encoding": "jsr:@std/encoding@^1.0.10", "@std/path": "jsr:@std/path@^1.0.8", + "mime-types": "npm:mime-types@^3.0.1", "pyodide": "npm:pyodide@0.28.2", "zod": "npm:zod@^3.24.4" }, diff --git a/mcp_run_python/deno/deno.lock b/mcp_run_python/deno/deno.lock index 4138faa..2ab1021 100644 --- a/mcp_run_python/deno/deno.lock +++ b/mcp_run_python/deno/deno.lock @@ -2,14 +2,30 @@ "version": "5", "specifiers": { "jsr:@std/cli@^1.0.15": "1.0.20", + "jsr:@std/encoding@^1.0.10": "1.0.10", + "jsr:@std/internal@^1.0.10": "1.0.12", + "jsr:@std/path@^1.0.8": "1.1.2", "npm:@modelcontextprotocol/sdk@^1.17.5": "1.17.5_express@5.1.0_zod@3.25.76", "npm:@types/node@22.12.0": "22.12.0", + "npm:mime-types@^3.0.1": "3.0.1", "npm:pyodide@0.28.2": "0.28.2", "npm:zod@^3.24.4": "3.25.76" }, "jsr": { "@std/cli@1.0.20": { "integrity": 
"a8c384a2c98cec6ec6a2055c273a916e2772485eb784af0db004c5ab8ba52333" + }, + "@std/encoding@1.0.10": { + "integrity": "8783c6384a2d13abd5e9e87a7ae0520a30e9f56aeeaa3bdf910a3eaaf5c811a1" + }, + "@std/internal@1.0.12": { + "integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027" + }, + "@std/path@1.1.2": { + "integrity": "c0b13b97dfe06546d5e16bf3966b1cadf92e1cc83e56ba5476ad8b498d9e3038", + "dependencies": [ + "jsr:@std/internal" + ] } }, "npm": { @@ -527,8 +543,10 @@ "workspace": { "dependencies": [ "jsr:@std/cli@^1.0.15", + "jsr:@std/encoding@^1.0.10", "jsr:@std/path@^1.0.8", "npm:@modelcontextprotocol/sdk@^1.17.5", + "npm:mime-types@^3.0.1", "npm:pyodide@0.28.2", "npm:zod@^3.24.4" ] diff --git a/mcp_run_python/deno/src/main.ts b/mcp_run_python/deno/src/main.ts index d00d549..5b49031 100644 --- a/mcp_run_python/deno/src/main.ts +++ b/mcp_run_python/deno/src/main.ts @@ -20,17 +20,49 @@ const VERSION = '0.0.13' export async function main() { const { args } = Deno const flags = parseArgs(Deno.args, { - string: ['deps', 'return-mode', 'port'], - default: { port: '3001', 'return-mode': 'xml' }, + string: [ + 'deps', + 'return-mode', + 'port', + 'pyodide-max-workers', + 'pyodide-code-run-timeout-sec', + 'pyodide-worker-wait-timeout-sec', + ], + boolean: ['enable-file-outputs'], + default: { + port: '3001', + 'return-mode': 'xml', + 'enable-file-outputs': false, + 'pyodide-max-workers': '10', + 'pyodide-code-run-timeout-sec': '60', + 'pyodide-worker-wait-timeout-sec': '60', + }, }) + + console.debug(flags) const deps = flags.deps?.split(',') ?? 
[] if (args.length >= 1) { if (args[0] === 'stdio') { - await runStdio(deps, flags['return-mode']) + await runStdio( + deps, + flags['return-mode'], + flags['enable-file-outputs'], + parseInt(flags['pyodide-max-workers']), + parseInt(flags['pyodide-code-run-timeout-sec']), + parseInt(flags['pyodide-worker-wait-timeout-sec']), + ) return } else if (args[0] === 'streamable_http') { const port = parseInt(flags.port) - runStreamableHttp(port, deps, flags['return-mode']) + runStreamableHttp( + port, + deps, + flags['return-mode'], + flags['enable-file-outputs'], + parseInt(flags['pyodide-max-workers']), + parseInt(flags['pyodide-code-run-timeout-sec']), + parseInt(flags['pyodide-worker-wait-timeout-sec']), + ) return } else if (args[0] === 'example') { await example(deps) @@ -57,7 +89,14 @@ options: /* * Create an MCP server with the `run_python_code` tool registered. */ -function createServer(deps: string[], returnMode: string): McpServer { +function createServer( + deps: string[], + returnMode: string, + enableFileOutputs: boolean, + pyodideMaxWorkers: number, + pyodideCodeRunTimeoutSec: number, + pyodideWorkerWaitTimeoutSec: number, +): McpServer { const runCode = new RunCode() const server = new McpServer( { @@ -72,12 +111,17 @@ function createServer(deps: string[], returnMode: string): McpServer { }, ) - const toolDescription = `Tool to execute Python code and return stdout, stderr, and return value. + let toolDescription = `Tool to execute Python code and return stdout, stderr, and return value. -The code may be async, and the value on the last line will be returned as the return value. - -The code will be executed with Python 3.13. +### Guidelines +- The code may be async, and the value on the last line will be returned as the return value. +- The code will be executed with Python 3.13 using pyodide - so adapt your code if needed. +- Your code must be executed within a timeout. You have ${pyodideCodeRunTimeoutSec} seconds before the run is canceled. 
+- You have these python packages installed: \`${deps}\` ` + if (enableFileOutputs) { + toolDescription += '- To output files or images, save them in the "/output_files" folder.\n' + } let setLogLevel: LoggingLevel = 'emergency' @@ -110,11 +154,31 @@ The code will be executed with Python 3.13. { name: 'main.py', content: python_code }, global_variables, returnMode !== 'xml', + enableFileOutputs, + pyodideMaxWorkers, + pyodideCodeRunTimeoutSec, + pyodideWorkerWaitTimeoutSec, ) await Promise.all(logPromises) - return { - content: [{ type: 'text', text: returnMode === 'xml' ? asXml(result) : asJson(result) }], + const mcpResponse: any[] = [] + mcpResponse.push({ type: 'text', text: returnMode === 'xml' ? asXml(result) : asJson(result) }) + + // Add the Resources - if there are any + if (result.status === 'success') { + for (const entry of result.embeddedResources) { + mcpResponse.push({ + type: 'resource', + resource: { + uri: 'file://_', // not providing a file url, as enabling resources is optional according to MCP spec: https://modelcontextprotocol.io/specification/2025-06-18/server/tools#embedded-resources + name: entry.name, + mimeType: entry.mimeType, + blob: entry.blob, + }, + }) + } } + + return { content: mcpResponse } }, ) return server @@ -167,14 +231,30 @@ function httpSetJsonResponse(res: http.ServerResponse, status: number, text: str /* * Run the MCP server using the Streamable HTTP transport */ -function runStreamableHttp(port: number, deps: string[], returnMode: string) { +function runStreamableHttp( + port: number, + deps: string[], + returnMode: string, + enableFileOutputs: boolean, + pyodideMaxWorkers: number, + pyodideCodeRunTimeoutSec: number, + pyodideWorkerWaitTimeoutSec: number, +) { // https://github.com/modelcontextprotocol/typescript-sdk?tab=readme-ov-file#with-session-management - const mcpServer = createServer(deps, returnMode) + const mcpServer = createServer( + deps, + returnMode, + enableFileOutputs, + pyodideMaxWorkers, + 
pyodideCodeRunTimeoutSec, + pyodideWorkerWaitTimeoutSec, + ) const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {} const server = http.createServer(async (req, res) => { const url = httpGetUrl(req) let pathMatch = false + function match(method: string, path: string): boolean { if (url.pathname === path) { pathMatch = true @@ -252,8 +332,22 @@ function runStreamableHttp(port: number, deps: string[], returnMode: string) { /* * Run the MCP server using the Stdio transport. */ -async function runStdio(deps: string[], returnMode: string) { - const mcpServer = createServer(deps, returnMode) +async function runStdio( + deps: string[], + returnMode: string, + enableFileOutputs: boolean, + pyodideMaxWorkers: number, + pyodideCodeRunTimeoutSec: number, + pyodideWorkerWaitTimeoutSec: number, +) { + const mcpServer = createServer( + deps, + returnMode, + enableFileOutputs, + pyodideMaxWorkers, + pyodideCodeRunTimeoutSec, + pyodideWorkerWaitTimeoutSec, + ) const transport = new StdioServerTransport() await mcpServer.connect(transport) } diff --git a/mcp_run_python/deno/src/runCode.ts b/mcp_run_python/deno/src/runCode.ts index 1be8681..8d2bc13 100644 --- a/mcp_run_python/deno/src/runCode.ts +++ b/mcp_run_python/deno/src/runCode.ts @@ -1,6 +1,9 @@ // deno-lint-ignore-file no-explicit-any import { loadPyodide, type PyodideInterface } from 'pyodide' import { preparePythonCode } from './prepareEnvCode.ts' +import { randomBytes } from 'node:crypto' +import mime from 'mime-types' +import { encodeBase64 } from '@std/encoding/base64' import type { LoggingLevel } from '@modelcontextprotocol/sdk/types.js' export interface CodeFile { @@ -13,86 +16,171 @@ interface PrepResult { preparePyEnv: PreparePyEnv sys: any prepareStatus: PrepareSuccess | PrepareError + output: string[] } -export class RunCode { - private output: string[] = [] - private pyodide?: PyodideInterface - private preparePyEnv?: PreparePyEnv - private prepPromise?: Promise +interface PyodideWorker 
extends PrepResult { + id: number + pyodideInterruptBuffer: Uint8Array + inUse: boolean +} - async run( +/* + * Class that instantiates pyodide and keeps multiple instances + * There need to be multiple instances, as file system mounting to a standard directory (which is needed for the LLM), cannot be easily done without mixing files + * Now, every process has their own file system & they get looped around + */ +class PyodideAccess { + // context manager to get a pyodide instance (with timeout & max members) + public async withPyodideInstance( dependencies: string[], log: (level: LoggingLevel, data: string) => void, - file?: CodeFile, - globals?: Record, - alwaysReturnJson: boolean = false, - ): Promise { - let pyodide: PyodideInterface - let sys: any - let prepareStatus: PrepareSuccess | PrepareError | undefined - let preparePyEnv: PreparePyEnv - if (this.pyodide && this.preparePyEnv) { - pyodide = this.pyodide - preparePyEnv = this.preparePyEnv - sys = pyodide.pyimport('sys') + maximumInstances: number, + pyodideWorkerWaitTimeoutSec: number, + fn: (w: PyodideWorker) => Promise, + ): Promise { + const w = await this.getPyodideInstance(dependencies, log, maximumInstances, pyodideWorkerWaitTimeoutSec) + try { + return await fn(w) + } finally { + this.releasePyodideInstance(w.id) + } + } + + private pyodideInstances: { [workerId: number]: PyodideWorker } = {} + private nextWorkerId = 1 + private creatingCount = 0 + + // after code is run, this releases the worker again to the pool for other codes to run + private releasePyodideInstance(workerId: number): void { + const worker = this.pyodideInstances[workerId] + if (!worker) { + throw new Error(`Trying to release unknown pyodide worker ${workerId}`) + } + + // clear interrupt buffer in case it was used & clear output + worker.pyodideInterruptBuffer[0] = 0 + worker.output.length = 0 + + // if sb is waiting, take the first worker from queue & keep status as "inUse" + const waiter = this.waitQueue.shift() + if (waiter) { 
+ clearTimeout(waiter.timer) + worker.inUse = true + waiter.resolve(worker) } else { - if (!this.prepPromise) { - this.prepPromise = this.prepEnv(dependencies, log) - } - // TODO is this safe if the promise has already been accessed? it seems to work fine - const prep = await this.prepPromise - pyodide = prep.pyodide - preparePyEnv = prep.preparePyEnv - sys = prep.sys - prepareStatus = prep.prepareStatus + worker.inUse = false } + } - if (prepareStatus && prepareStatus.kind == 'error') { - return { - status: 'install-error', - output: this.takeOutput(sys), - error: prepareStatus.message, - } - } else if (file) { + // main logic of getting a pyodide instance. Will reuse if possible, otherwise create (up to limit) + private async getPyodideInstance( + dependencies: string[], + log: (level: LoggingLevel, data: string) => void, + maximumInstances: number, + pyodideWorkerWaitTimeoutSec: number, + ): Promise { + // 1) if possible, take a free - already initialised - worker + const free = this.tryAcquireFree() + if (free) return free + + // 2) if none is free, check that we are not over capacity already + const currentCount = Object.keys(this.pyodideInstances).length + if (currentCount + this.creatingCount < maximumInstances) { + this.creatingCount++ try { - const rawValue = await pyodide.runPythonAsync(file.content, { - globals: pyodide.toPy({ ...(globals || {}), __name__: '__main__' }), - filename: file.name, - }) - return { - status: 'success', - output: this.takeOutput(sys), - returnValueJson: preparePyEnv.dump_json(rawValue, alwaysReturnJson), - } + const id = this.nextWorkerId++ + const worker = await this.createPyodideWorker(id, dependencies, log) + + // cool, created a new one so let's use that one + worker.inUse = true + this.pyodideInstances[id] = worker + return worker } catch (err) { - return { - status: 'run-error', - output: this.takeOutput(sys), - error: formatError(err), + // Need to make sure the creation gets reduced again, so simply re-throwing + throw 
err + } finally { + this.creatingCount-- + } + } + + // 3) we have the maximum worker, poll periodically until timeout for a free one + return await new Promise((resolve, reject) => { + const start = Date.now() + const poll = () => { + const free = this.tryAcquireFree() + if (free) { + clearTimeout(timer) + resolve(free) + return + } + if (Date.now() - start >= pyodideWorkerWaitTimeoutSec * 1000) { + reject(new Error('Timeout: no free Pyodide worker')) + return } + timer = setTimeout(poll, 1000) } - } else { - return { - status: 'success', - output: this.takeOutput(sys), - returnValueJson: null, + let timer = setTimeout(poll, 1000) + this.waitQueue.push({ resolve, reject, timer }) + }) + } + + private waitQueue: { + resolve: (w: PyodideWorker) => void + reject: (e: unknown) => void + timer: ReturnType + }[] = [] + + private tryAcquireFree(): PyodideWorker | undefined { + for (const w of Object.values(this.pyodideInstances)) { + if (!w.inUse) { + w.inUse = true + return w } } + return undefined + } + + // this creates the pyodide worker from scratch + private async createPyodideWorker( + id: number, + dependencies: string[], + log: (level: LoggingLevel, data: string) => void, + ): Promise { + const prepPromise = this.prepEnv(dependencies, log) + const prep = await prepPromise + + // setup the interrupt buffer to be able to cancel the task + const interruptBuffer = new Uint8Array(new SharedArrayBuffer(1)) + prep.pyodide.setInterruptBuffer(interruptBuffer) + + return { + id, + pyodide: prep.pyodide, + pyodideInterruptBuffer: interruptBuffer, + sys: prep.sys, + prepareStatus: prep.prepareStatus, + preparePyEnv: prep.preparePyEnv, + output: prep.output, + inUse: false, + } } - async prepEnv( + // load pyodide and install dependencies + private async prepEnv( dependencies: string[], log: (level: LoggingLevel, data: string) => void, ): Promise { + const output: string[] = [] + const pyodide = await loadPyodide({ stdout: (msg) => { log('info', msg) - this.output.push(msg) 
+ output.push(msg) }, stderr: (msg) => { log('warning', msg) - this.output.push(msg) + output.push(msg) }, }) @@ -104,7 +192,7 @@ export class RunCode { messageCallback: (msg: string) => log('debug', msg), errorCallback: (msg: string) => { log('error', msg) - this.output.push(`install error: ${msg}`) + output.push(`install error: ${msg}`) }, ...options, }) @@ -128,27 +216,167 @@ export class RunCode { preparePyEnv, sys, prepareStatus, + output, } } +} + +export class RunCode { + private pyodideAccess: PyodideAccess = new PyodideAccess() + + async run( + dependencies: string[], + log: (level: LoggingLevel, data: string) => void, + file?: CodeFile, + globals?: Record, + alwaysReturnJson: boolean = false, + enableFileOutputs: boolean = false, + pyodideMaxWorkers: number = 10, + pyodideCodeRunTimeoutSec: number = 60, + pyodideWorkerWaitTimeoutSec: number = 60, + ): Promise { + // get a pyodide instance for this job + try { + return await this.pyodideAccess.withPyodideInstance( + dependencies, + log, + pyodideMaxWorkers, + pyodideWorkerWaitTimeoutSec, + async (pyodideWorker) => { + if (pyodideWorker.prepareStatus && pyodideWorker.prepareStatus.kind == 'error') { + return { + status: 'install-error', + output: this.takeOutput(pyodideWorker), + error: pyodideWorker.prepareStatus.message, + } + } else if (file) { + try { + // defaults in case file output is not enabled + let folderPath = '' + let files: Resource[] = [] - private takeOutput(sys: any): string[] { - sys.stdout.flush() - sys.stderr.flush() - const output = this.output - this.output = [] - return output + if (enableFileOutputs) { + // make the temp file system for pyodide to use + const folderName = randomBytes(20).toString('hex').slice(0, 20) + folderPath = `./output_files/${folderName}` + await Deno.mkdir(folderPath, { recursive: true }) + pyodideWorker.pyodide.mountNodeFS('/output_files', folderPath) + } + + // run the code with pyodide including a timeout + let timeoutId: any + const rawValue = await 
Promise.race([ + pyodideWorker.pyodide.runPythonAsync(file.content, { + globals: pyodideWorker.pyodide.toPy({ ...(globals || {}), __name__: '__main__' }), + filename: file.name, + }), + new Promise((_, reject) => { + timeoutId = setTimeout(() => { + // after the time passes signal SIGINT to stop execution + // 2 stands for SIGINT + pyodideWorker.pyodideInterruptBuffer[0] = 2 + reject(new Error(`Timeout exceeded for python execution (${pyodideCodeRunTimeoutSec} sec)`)) + }, pyodideCodeRunTimeoutSec * 1000) + }), + ]) + clearTimeout(timeoutId) + + if (enableFileOutputs) { + // check files that got saved + files = await this.readAndDeleteFiles(folderPath) + pyodideWorker.pyodide.FS.unmount('/output_files') + } + + return { + status: 'success', + output: this.takeOutput(pyodideWorker), + returnValueJson: pyodideWorker.preparePyEnv.dump_json(rawValue, alwaysReturnJson), + embeddedResources: files, + } + } catch (err) { + try { + pyodideWorker.pyodide.FS.unmount('/output_files') + } catch (_) { + // we need to make sure unmount is attempted, but ignore errors here + } + + console.log(err) + return { + status: 'run-error', + output: this.takeOutput(pyodideWorker), + error: formatError(err), + } + } + } else { + return { + status: 'success', + output: this.takeOutput(pyodideWorker), + returnValueJson: null, + embeddedResources: [], + } + } + }, + ) + } catch (err) { + console.error(err) + return { + status: 'fatal-runtime-error', + output: [], + error: formatError(err), + } + } + } + + async readAndDeleteFiles(folderPath: string): Promise { + const results: Resource[] = [] + for await (const file of Deno.readDir(folderPath)) { + // Skip directories + if (!file.isFile) continue + + const fileName = file.name + const filePath = `${folderPath}/${fileName}` + const mimeType = mime.lookup(fileName) + const fileData = await Deno.readFile(filePath) + + // Convert binary to Base64 + const base64Encoded = encodeBase64(fileData) + + results.push({ + name: fileName, + mimeType: 
mimeType, + blob: base64Encoded, + }) + } + + // Now delete the file folder - otherwise they add up :) + await Deno.remove(folderPath, { recursive: true }) + + return results + } + + private takeOutput(pyodideWorker: PyodideWorker): string[] { + pyodideWorker.sys.stdout.flush() + pyodideWorker.sys.stderr.flush() + return [...pyodideWorker.output] } } +interface Resource { + name: string + mimeType: string + blob: string +} + interface RunSuccess { status: 'success' // we could record stdout and stderr separately, but I suspect simplicity is more important output: string[] returnValueJson: string | null + embeddedResources: Resource[] } interface RunError { - status: 'install-error' | 'run-error' + status: 'install-error' | 'run-error' | 'fatal-runtime-error' output: string[] error: string } @@ -209,10 +437,12 @@ interface PrepareSuccess { kind: 'success' dependencies?: string[] } + interface PrepareError { kind: 'error' message: string } + interface PreparePyEnv { prepare_env: (files: CodeFile[]) => Promise dump_json: (value: any, always_return_json: boolean) => string | null diff --git a/mcp_run_python/main.py b/mcp_run_python/main.py index 34d177d..9fb5440 100644 --- a/mcp_run_python/main.py +++ b/mcp_run_python/main.py @@ -26,6 +26,10 @@ def run_mcp_server( return_mode: Literal['json', 'xml'] = 'xml', deps_log_handler: LogHandler | None = None, allow_networking: bool = True, + enable_file_outputs: bool = False, + pyodide_max_workers: int = 10, + pyodide_worker_wait_timeout_sec: int = 60, + pyodide_code_run_timeout_sec: int = 60, ) -> int: """Install dependencies then run the mcp-run-python server. @@ -36,6 +40,10 @@ def run_mcp_server( return_mode: The mode to return tool results in. deps_log_handler: Optional function to receive logs emitted while installing dependencies. allow_networking: Whether to allow networking when running provided python code. 
+ enable_file_outputs: Whether to enable output files + pyodide_max_workers: How many pyodide workers to max use at the same time + pyodide_code_run_timeout_sec: How long to wait for pyodide code to run in seconds. + pyodide_worker_wait_timeout_sec: How long to wait for a free pyodide worker in seconds. """ with prepare_deno_env( mode, http_port=http_port, dependencies=dependencies, return_mode=return_mode, deps_log_handler=deps_log_handler, allow_networking=allow_networking, + enable_file_outputs=enable_file_outputs, + pyodide_max_workers=pyodide_max_workers, + pyodide_worker_wait_timeout_sec=pyodide_worker_wait_timeout_sec, + pyodide_code_run_timeout_sec=pyodide_code_run_timeout_sec, ) as env: + logger.info(f'Running with file output support {"enabled" if enable_file_outputs else "disabled"}.') if mode == 'streamable_http': - logger.info('Running mcp-run-python via %s on port %d...', mode, http_port) + logger.info('Running mcp-run-python via %s on port %s...', mode, str(http_port)) else: logger.info('Running mcp-run-python via %s...', mode) @@ -74,6 +87,10 @@ def prepare_deno_env( return_mode: Literal['json', 'xml'] = 'xml', deps_log_handler: LogHandler | None = None, allow_networking: bool = True, + enable_file_outputs: bool = False, + pyodide_max_workers: int = 10, + pyodide_worker_wait_timeout_sec: int = 60, + pyodide_code_run_timeout_sec: int = 60, ) -> Iterator[DenoEnv]: """Prepare the deno environment for running the mcp-run-python server with Deno. @@ -89,6 +106,10 @@ deps_log_handler: Optional function to receive logs emitted while installing dependencies. allow_networking: Whether the prepared DenoEnv should allow networking when running code. Note that we always allow networking during environment initialization to install dependencies. 
+ enable_file_outputs: Whether to enable output files + pyodide_max_workers: How many pyodide workers to max use at the same time + pyodide_code_run_timeout_sec: How long to wait for pyodide code to run in seconds. + pyodide_worker_wait_timeout_sec: How long to wait for a free pyodide worker in seconds. Returns: Yields the deno environment details. @@ -98,6 +119,7 @@ def prepare_deno_env( src = Path(__file__).parent / 'deno' logger.debug('Copying from %s to %s...', src, cwd) shutil.copytree(src, cwd) + (cwd / 'output_files').mkdir() logger.info('Installing dependencies %s...', dependencies) args = 'deno', *_deno_install_args(dependencies) @@ -121,6 +143,10 @@ def prepare_deno_env( dependencies=dependencies, return_mode=return_mode, allow_networking=allow_networking, + enable_file_outputs=enable_file_outputs, + pyodide_max_workers=pyodide_max_workers, + pyodide_worker_wait_timeout_sec=pyodide_worker_wait_timeout_sec, + pyodide_code_run_timeout_sec=pyodide_code_run_timeout_sec, ) yield DenoEnv(cwd, args) @@ -137,6 +163,10 @@ async def async_prepare_deno_env( return_mode: Literal['json', 'xml'] = 'xml', deps_log_handler: LogHandler | None = None, allow_networking: bool = True, + enable_file_outputs: bool = False, + pyodide_max_workers: int = 10, + pyodide_worker_wait_timeout_sec: int = 60, + pyodide_code_run_timeout_sec: int = 60, ) -> AsyncIterator[DenoEnv]: """Async variant of `prepare_deno_env`.""" ct = await _asyncify( @@ -147,6 +177,10 @@ async def async_prepare_deno_env( return_mode=return_mode, deps_log_handler=deps_log_handler, allow_networking=allow_networking, + enable_file_outputs=enable_file_outputs, + pyodide_max_workers=pyodide_max_workers, + pyodide_worker_wait_timeout_sec=pyodide_worker_wait_timeout_sec, + pyodide_code_run_timeout_sec=pyodide_code_run_timeout_sec, ) try: yield await _asyncify(ct.__enter__) @@ -176,17 +210,27 @@ def _deno_run_args( dependencies: list[str] | None = None, return_mode: Literal['json', 'xml'] = 'xml', allow_networking: bool 
= True, + enable_file_outputs: bool = False, + pyodide_max_workers: int = 10, + pyodide_worker_wait_timeout_sec: int = 60, + pyodide_code_run_timeout_sec: int = 60, ) -> list[str]: args = ['run'] if allow_networking: args += ['--allow-net'] args += [ - '--allow-read=./node_modules', + '--allow-read=./node_modules,./output_files', + '--allow-write=./output_files', '--node-modules-dir=auto', 'src/main.ts', mode, f'--return-mode={return_mode}', + f'--pyodide-max-workers={pyodide_max_workers}', + f'--pyodide-worker-wait-timeout-sec={pyodide_worker_wait_timeout_sec}', + f'--pyodide-code-run-timeout-sec={pyodide_code_run_timeout_sec}', ] + if enable_file_outputs: + args += ['--enable-file-outputs'] if dependencies is not None: args.append(f'--deps={",".join(dependencies)}') if http_port is not None: diff --git a/tests/test_mcp_servers.py b/tests/test_mcp_servers.py index 889b903..46c60e5 100644 --- a/tests/test_mcp_servers.py +++ b/tests/test_mcp_servers.py @@ -3,9 +3,10 @@ import asyncio import re import subprocess +import time from collections.abc import AsyncIterator, Callable from contextlib import AbstractAsyncContextManager, asynccontextmanager -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import pytest from httpx import AsyncClient, HTTPError @@ -13,6 +14,7 @@ from mcp import ClientSession, StdioServerParameters, types from mcp.client.stdio import stdio_client from mcp.client.streamable_http import streamablehttp_client +from mcp.types import BlobResourceContents, EmbeddedResource from mcp_run_python import async_prepare_deno_env @@ -27,9 +29,11 @@ def fixture_run_mcp_session( request: pytest.FixtureRequest, ) -> Callable[[list[str]], AbstractAsyncContextManager[ClientSession]]: @asynccontextmanager - async def run_mcp(deps: list[str]) -> AsyncIterator[ClientSession]: + async def run_mcp(deps: list[str], enable_file_outputs: bool = True) -> AsyncIterator[ClientSession]: if request.param == 'stdio': - async with 
async_prepare_deno_env('stdio', dependencies=deps) as env: + async with async_prepare_deno_env( + 'stdio', dependencies=deps, enable_file_outputs=enable_file_outputs + ) as env: server_params = StdioServerParameters(command='deno', args=env.args, cwd=env.cwd) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: @@ -37,8 +41,10 @@ async def run_mcp(deps: list[str]) -> AsyncIterator[ClientSession]: else: assert request.param == 'streamable_http', request.param port = 3101 - async with async_prepare_deno_env('streamable_http', http_port=port, dependencies=deps) as env: - p = subprocess.Popen(['deno', *env.args], cwd=env.cwd) + async with async_prepare_deno_env( + 'streamable_http', http_port=port, dependencies=deps, enable_file_outputs=enable_file_outputs + ) as env: + p = subprocess.Popen(['deno', *env.args], cwd=env.cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) try: url = f'http://localhost:{port}/mcp' await wait_for_server(url, 8) @@ -192,6 +198,56 @@ async def test_run_python_code( assert content.text == expected_output +@pytest.mark.parametrize( + 'deps,code,expected_output,expected_resources', + [ + pytest.param( + [], + [ + 'from pathlib import Path', + 'Path("/output_files/hello.txt").write_text("hello world!")', + ], + snapshot("""\ +success + +12 +\ +"""), + [ + EmbeddedResource( + type='resource', + resource=BlobResourceContents( + uri='file://_', + mimeType='text/plain', + name='hello.txt', # pyright: ignore[reportCallIssue] + blob='aGVsbG8gd29ybGQh', + ), + ) + ], + id='hello-world-file', + ), + ], +) +async def test_run_python_code_with_output_resource( + run_mcp_session: Callable[[list[str]], AbstractAsyncContextManager[ClientSession]], + deps: list[str], + code: list[str], + expected_output: str, + expected_resources: list[EmbeddedResource], +) -> None: + async with run_mcp_session(deps) as mcp_session: + await mcp_session.initialize() + result = await 
mcp_session.call_tool('run_python_code', {'python_code': '\n'.join(code)}) + assert len(result.content) >= 2 + text_content = result.content[0] + resource_content = result.content[1:] + assert isinstance(text_content, types.TextContent) + assert text_content.text == expected_output + assert len(resource_content) == len(expected_resources) + for got, expected in zip(resource_content, expected_resources): + assert got == expected + + async def test_install_run_python_code() -> None: logs: list[str] = [] @@ -226,3 +282,179 @@ def logging_callback(level: str, message: str) -> None: \ """ ) + + +@pytest.mark.parametrize('enable_file_outputs', [pytest.param(True), pytest.param(False)]) +@pytest.mark.parametrize( + 'code_list,multiplicator,max_time_needed', + [ + pytest.param( + [ + """ + import time + time.sleep(5) + x=11 + x + """, + """ + import asyncio + await asyncio.sleep(5) + x=11 + x + """, + ], + 10, + 40, + ), + pytest.param( + [ + """ + x=11 + x + """, + ], + 500, + 40, + ), + ], +) +async def test_run_parallel_python_code( + run_mcp_session: Callable[[list[str], bool], AbstractAsyncContextManager[ClientSession]], + enable_file_outputs: bool, + code_list: list[str], + multiplicator: int, + max_time_needed: int, +) -> None: + # Run this a couple times (10) in parallel + # As we have 10 pyodide workers by default, this should finish in under the needed time if you add the tasks itself (first initialisation takes a bit - especially for 10 workers) + code_list = code_list * multiplicator + + concurrency_limiter = asyncio.Semaphore(50) + + async def run_wrapper(code: str): + # limit concurrency to avoid overwhelming the server with 500 tasks at once :D + async with concurrency_limiter: + return await mcp_session.call_tool('run_python_code', {'python_code': code}) + + async with run_mcp_session([], enable_file_outputs) as mcp_session: + await mcp_session.initialize() + + start = time.perf_counter() + + tasks: set[Any] = set() + for code in code_list: + 
tasks.add(run_wrapper(code)) + + # await the tasks + results: list[types.CallToolResult] = await asyncio.gather(*tasks) + + # check parallelism + end = time.perf_counter() + run_time = end - start + assert run_time < max_time_needed + assert run_time > 5 + + # check that all outputs are fine too + for result in results: + assert len(result.content) == 1 + content = result.content[0] + + assert isinstance(content, types.TextContent) + assert ( + content.text.strip() + == """success + +11 +""".strip() + ) + + +async def test_run_parallel_python_code_with_files( + run_mcp_session: Callable[[list[str], bool], AbstractAsyncContextManager[ClientSession]], +) -> None: + """Check that the file system works between runs and keeps files to their runs""" + code_list = [ + """ + import time + from pathlib import Path + for i in range(5): + Path(f"/output_files/run1_file{i}.txt").write_text("hi") + time.sleep(1) + """, + """ + import time + from pathlib import Path + for i in range(5): + time.sleep(1) + Path(f"/output_files/run2_file{i}.txt").write_text("hi") + """, + ] + + async with run_mcp_session([], True) as mcp_session: + await mcp_session.initialize() + + start = time.perf_counter() + + tasks: set[Any] = set() + for code in code_list: + tasks.add(mcp_session.call_tool('run_python_code', {'python_code': code})) + + # await the tasks + results: list[types.CallToolResult] = await asyncio.gather(*tasks) + + # check parallelism + end = time.perf_counter() + run_time = end - start + assert run_time < 10 + assert run_time > 5 + + # check that all outputs are fine too + for result in results: + assert len(result.content) == 6 + + run_ids: set[str] = set() + for content in result.content: + match content: + case types.EmbeddedResource(): + # save the run id from the text file name - to make sure its all the same + run_ids.add(content.resource.name.split('_')[0]) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportAttributeAccessIssue] + assert 
content.resource.blob == 'aGk=' # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue] + case types.TextContent(): + assert content.text.strip() == 'success' + case _: + raise AssertionError('Unexpected content type') + assert len(run_ids) == 1 + + +async def test_run_python_code_timeout( + run_mcp_session: Callable[[list[str], bool], AbstractAsyncContextManager[ClientSession]], +) -> None: + """Check that the timeout of the run command works (60s)""" + code = """ + import time + time.sleep(90) + """ + + async with run_mcp_session([], True) as mcp_session: + await mcp_session.initialize() + + start = time.perf_counter() + + result = await mcp_session.call_tool('run_python_code', {'python_code': code}) + + # check parallelism + end = time.perf_counter() + run_time = end - start + assert run_time > 60 + assert run_time < 65 + + assert len(result.content) == 1 + content = result.content[0] + assert isinstance(content, types.TextContent) + assert ( + content.text.strip() + == """run-error + +Error: Timeout exceeded for python execution (60 sec) +""".strip() + )