Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions dspy/primitives/python_interpreter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import logging
import os
import subprocess
from os import PathLike
from types import TracebackType
from typing import Any

logger = logging.getLogger(__name__)

class InterpreterError(RuntimeError):
pass
Expand Down Expand Up @@ -37,8 +39,9 @@ def __init__(
"""
Args:
deno_command: command list to launch Deno.
enable_read_paths: Files or directories to allow reading from in the sandbox.
enable_write_paths: Files or directories to allow writing to in the sandbox.
enable_read_paths: Files or directories to allow reading from in the sandbox.
enable_write_paths: Files or directories to allow writing to in the sandbox.
All write paths will also be able to be read from for mounting.
enable_env_vars: Environment variable names to allow in the sandbox.
enable_network_access: Domains or IPs to allow network access in the sandbox.
sync_files: If set, syncs changes within the sandbox back to original files after execution.
Expand All @@ -56,7 +59,22 @@ def __init__(
if deno_command:
self.deno_command = list(deno_command)
else:
args = ["deno", "run", "--allow-read"]
args = ["deno", "run"]

# Allow reading runner.js and explicitly enabled paths
allowed_read_paths = [self._get_runner_path()]

# Also allow reading Deno's cache directory so Pyodide can load its files
deno_dir = self._get_deno_dir()
if deno_dir:
allowed_read_paths.append(deno_dir)

if self.enable_read_paths:
allowed_read_paths.extend(str(p) for p in self.enable_read_paths)
if self.enable_write_paths:
allowed_read_paths.extend(str(p) for p in self.enable_write_paths)
args.append(f"--allow-read={','.join(allowed_read_paths)}")

self._env_arg = ""
if self.enable_env_vars:
user_vars = [str(v).strip() for v in self.enable_env_vars]
Expand All @@ -77,6 +95,36 @@ def __init__(
self.deno_process = None
self._mounted_files = False

_deno_dir_cache = None

@classmethod
def _get_deno_dir(cls) -> str | None:
if cls._deno_dir_cache:
return cls._deno_dir_cache

if "DENO_DIR" in os.environ:
cls._deno_dir_cache = os.environ["DENO_DIR"]
return cls._deno_dir_cache

try:
# Attempt to find deno in path or use just "deno"
# We can't easily know which 'deno' will be used if not absolute, but 'deno' is a safe bet
result = subprocess.run(
["deno", "info", "--json"],
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
info = json.loads(result.stdout)
cls._deno_dir_cache = info.get("denoDir")
return cls._deno_dir_cache
except Exception:
logger.warning("Unable to find the Deno cache dir.")
pass

return None

def _get_runner_path(self) -> str:
current_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(current_dir, "runner.js")
Expand Down
33 changes: 33 additions & 0 deletions tests/primitives/test_python_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,36 @@ def test_enable_net_flag():
)
result = interpreter.execute(code)
assert int(result) == 200, "Network access is permitted with enable_network_access"


def test_interpreter_security_filesystem_access(tmp_path):
"""
Verify that the interpreter cannot read arbitrary files from the host system
unless explicitly allowed.
"""
# 1. Create a "secret" file on the host
secret_file = tmp_path / "secret.txt"
secret_content = "This is a secret content"
secret_file.write_text(secret_content)
secret_path_str = str(secret_file.absolute())

# 2. Attempt to read the file WITHOUT permission
malicious_code = f"""
import js
try:
content = js.Deno.readTextFileSync('{secret_path_str}')
print(content)
except Exception as e:
print(f"Error: {{e}}")
"""

with PythonInterpreter() as interpreter:
output = interpreter(malicious_code)
assert "Requires read access" in output
assert secret_content not in output

# 3. Attempt to read the file WITH permission
with PythonInterpreter(enable_read_paths=[secret_path_str]) as interpreter:
output = interpreter(malicious_code)
assert secret_content in output