Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 70 additions & 47 deletions pyisolate/runtime/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,8 @@ def _merge_allowed_imports(policy, allowed_imports: Optional[Iterable[str]]):
return imports
return None

def __init__(
def _init_config_wiring(
self,
name: str,
policy=None,
cpu_ms: Optional[int] = None,
mem_bytes: Optional[int] = None,
Expand All @@ -413,18 +412,11 @@ def __init__(
network_ops_max: Optional[int] = None,
output_bytes_max: Optional[int] = None,
child_work_max: Optional[int] = None,
allowed_imports: Optional[list[str]] = None,
on_violation: Optional[Callable[[str, Exception], None]] = None,
tracer: Optional["Tracer"] = None,
allowed_imports: Optional[Iterable[str]] = None,
numa_node: Optional[int] = None,
cgroup_path=None,
capabilities: Optional[dict[str, Any]] = None,
):
super().__init__(name=name, daemon=True)
self._logger = logging.getLogger(f"pyisolate.{name}")
self._inbox: "queue.Queue[Any]" = queue.Queue()
self._outbox: "queue.Queue[Any]" = queue.Queue()
self._stop_event = threading.Event()
) -> None:
self.policy = policy
self.cpu_quota_ms = cpu_ms
self.mem_quota_bytes = mem_bytes
Expand All @@ -434,28 +426,71 @@ def __init__(
self.output_bytes_max = output_bytes_max
self.child_work_max = child_work_max
self.allowed_imports = self._merge_allowed_imports(policy, allowed_imports)
self.numa_node = numa_node
self._bound_numa_node = None
self._cgroup_path = cgroup_path
self._capabilities = deserialize_capabilities(capabilities)

def _reset_runtime_state(self) -> None:
self._cpu_time = 0.0
self._mem_peak = 0
self.numa_node = numa_node
self._bound_numa_node: int | None = None
self._mem_base = 0
self._start_time: float | None = None
self._on_violation = on_violation
self._tracer = tracer or Tracer()
self._start_time = None
self._ops = 0
self._errors = 0
self._latency = {"0.5": 0, "1": 0, "5": 0, "10": 0, "inf": 0}
self._latency_sum = 0.0
self._cgroup_path = cgroup_path
self._trace_enabled = False
self._syscall_log: list[str] = []
self._capabilities = deserialize_capabilities(capabilities)
self._quarantine_reason: str | None = None
self.termination_reason: str | None = None
self._quarantine_reason = None
self.termination_reason = None
self._open_files = 0
self._network_ops = 0
self._output_bytes = 0
self._child_work = 0

def __init__(
self,
name: str,
policy=None,
cpu_ms: Optional[int] = None,
mem_bytes: Optional[int] = None,
wall_time_ms: Optional[int] = None,
open_files_max: Optional[int] = None,
network_ops_max: Optional[int] = None,
output_bytes_max: Optional[int] = None,
child_work_max: Optional[int] = None,
allowed_imports: Optional[list[str]] = None,
on_violation: Optional[Callable[[str, Exception], None]] = None,
tracer: Optional["Tracer"] = None,
numa_node: Optional[int] = None,
cgroup_path=None,
capabilities: Optional[dict[str, Any]] = None,
):
super().__init__(name=name, daemon=True)
self._logger = logging.getLogger(f"pyisolate.{name}")
self._inbox: "queue.Queue[Any]" = queue.Queue()
self._outbox: "queue.Queue[Any]" = queue.Queue()
self._stop_event = threading.Event()
self._on_violation = on_violation
self._tracer = tracer or Tracer()
self._init_config_wiring(
policy=policy,
cpu_ms=cpu_ms,
mem_bytes=mem_bytes,
wall_time_ms=wall_time_ms,
open_files_max=open_files_max,
network_ops_max=network_ops_max,
output_bytes_max=output_bytes_max,
child_work_max=child_work_max,
allowed_imports=allowed_imports,
numa_node=numa_node,
cgroup_path=cgroup_path,
capabilities=capabilities,
)
self._reset_runtime_state()
# Dedup set spans sandbox lifetimes for this thread; message IDs are monotonic
# and intentionally preserved across reset() to avoid stale replay collisions.
self._next_attach_msg_id = 1
self._seen_attach_msg_ids: set[int] = set()

Expand Down Expand Up @@ -619,33 +654,21 @@ def reset(
"""Reuse this thread for a new sandbox."""
old_path = getattr(self, "_cgroup_path", None)
self.name = name
self.policy = policy
self.cpu_quota_ms = cpu_ms
self.mem_quota_bytes = mem_bytes
self.wall_time_ms = wall_time_ms
self.open_files_max = open_files_max
self.network_ops_max = network_ops_max
self.output_bytes_max = output_bytes_max
self.child_work_max = child_work_max
self.numa_node = numa_node
self._bound_numa_node = None
self.allowed_imports = self._merge_allowed_imports(policy, allowed_imports)
self._cpu_time = 0.0
self._mem_peak = 0
self._ops = 0
self._errors = 0
self._latency = {"0.5": 0, "1": 0, "5": 0, "10": 0, "inf": 0}
self._latency_sum = 0.0
self._trace_enabled = False
self._syscall_log = []
self._start_time = None
self._cgroup_path = cgroup_path
self._capabilities = deserialize_capabilities(capabilities)
self.termination_reason = None
self._open_files = 0
self._network_ops = 0
self._output_bytes = 0
self._child_work = 0
self._init_config_wiring(
policy=policy,
cpu_ms=cpu_ms,
mem_bytes=mem_bytes,
wall_time_ms=wall_time_ms,
open_files_max=open_files_max,
network_ops_max=network_ops_max,
output_bytes_max=output_bytes_max,
child_work_max=child_work_max,
allowed_imports=allowed_imports,
numa_node=numa_node,
cgroup_path=cgroup_path,
capabilities=capabilities,
)
self._reset_runtime_state()
# Request the sandbox thread to (re)attach itself to the new cgroup.
# The attachment must happen from the sandbox thread's context.
msg_id = self._next_attach_msg_id
Expand Down
Loading