Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 102 additions & 6 deletions src/conductor/cli/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,18 +469,114 @@ def _report_install_failure(
console.print(r" [bold]Add-MpPreference -ExclusionPath \"$env:LOCALAPPDATA\uv\"[/bold]")


def _get_self_and_ancestor_pids() -> set[int]:
"""Return the PID of the current process and all of its ancestors.

On Windows the ``conductor.exe`` shim that launched the Python
interpreter is a *separate* process. Excluding only ``os.getpid()``
would still surface that shim as another running Conductor process
during ``conductor update``. Walking the full ancestor chain prevents
those false positives.

The walk is best-effort: any failure (missing tool, parse error,
timeout, permission denied) returns whatever PIDs were collected so
far, never raises. The current PID and its direct parent are always
included via the ``os`` module so the result is never empty.

Returns:
A set of PIDs to treat as "self" when scanning for other running
Conductor processes.
"""
pids: set[int] = {os.getpid()}
with contextlib.suppress(OSError):
pids.add(os.getppid())

parent_map = _build_parent_pid_map()
if not parent_map:
return pids

# Walk up from the current PID until we hit a missing entry or a
# cycle (defensive: PID reuse can theoretically create one).
cursor = os.getpid()
seen: set[int] = set()
while cursor in parent_map and cursor not in seen:
seen.add(cursor)
ppid = parent_map[cursor]
if ppid <= 0:
break
pids.add(ppid)
cursor = ppid

return pids


def _build_parent_pid_map() -> dict[int, int]:
"""Return a mapping of ``pid -> parent_pid`` for all running processes.

Uses ``wmic`` on Windows and ``ps`` elsewhere. Returns an empty dict
on any failure so callers can fall back gracefully.
"""
try:
if sys.platform == "win32":
proc = subprocess.run( # noqa: S603, S607
["wmic", "process", "get", "ProcessId,ParentProcessId", "/format:csv"],
capture_output=True,
text=True,
timeout=5,
)
if proc.returncode != 0:
return {}
mapping: dict[int, int] = {}
# CSV header is "Node,ParentProcessId,ProcessId" — we don't
# rely on column position; instead we look for the first two
# integer columns on each line.
for line in proc.stdout.splitlines():
parts = [p.strip() for p in line.split(",")]
ints = [int(p) for p in parts if p.isdigit()]
if len(ints) >= 2:
ppid_val, pid_val = ints[0], ints[1]
mapping[pid_val] = ppid_val
return mapping
proc = subprocess.run( # noqa: S603, S607
["ps", "-axo", "pid=,ppid="],
capture_output=True,
text=True,
timeout=5,
)
if proc.returncode != 0:
return {}
mapping = {}
for line in proc.stdout.splitlines():
tokens = line.split()
if len(tokens) < 2:
continue
try:
pid_val = int(tokens[0])
ppid_val = int(tokens[1])
except ValueError:
continue
mapping[pid_val] = ppid_val
return mapping
except (OSError, subprocess.TimeoutExpired):
return {}


def _find_running_conductor_processes() -> list[dict]:
"""Return a list of other running Conductor processes (excluding self).

Cross-platform: uses ``tasklist`` on Windows and ``ps`` elsewhere.
Each entry is ``{"pid": int, "cmd": str}``. The current process is
always excluded.
Each entry is ``{"pid": int, "cmd": str}``. The current process and its
ancestor chain are always excluded — on Windows the ``conductor.exe``
shim that launched the Python interpreter is a separate process from
the running Python, so excluding only ``os.getpid()`` would still
surface our own shim as a "running Conductor process".

Returns:
A list of dicts, one per detected Conductor process other than the
current one. Empty list on detection error or when none are found.
current one (and its ancestors). Empty list on detection error or
when none are found.
"""
self_pid = os.getpid()
excluded_pids = _get_self_and_ancestor_pids()
results: list[dict] = []

try:
Expand All @@ -507,7 +603,7 @@ def _find_running_conductor_processes() -> list[dict]:
pid = int(pid_str)
except ValueError:
continue
if pid == self_pid:
if pid in excluded_pids:
continue
results.append({"pid": pid, "cmd": image})
else:
Expand All @@ -529,7 +625,7 @@ def _find_running_conductor_processes() -> list[dict]:
pid = int(pid_str)
except ValueError:
continue
if pid == self_pid:
if pid in excluded_pids:
continue
# Match the conductor entrypoint script or any process whose
# command line invokes it. Avoid matching this module's own
Expand Down
109 changes: 109 additions & 0 deletions tests/test_cli/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,3 +849,112 @@ def test_hint_not_shown_when_not_tty(self, cache_dir: Path) -> None:
):
runner.invoke(app, ["validate", "--help"])
mock_hint.assert_not_called()


# ---------------------------------------------------------------------------
# Tests: ancestor PID exclusion in _find_running_conductor_processes
# ---------------------------------------------------------------------------


class TestAncestorPidExclusion:
"""Verify that the current process and its ancestors are not flagged."""

def test_self_pid_in_excluded_set(self) -> None:
"""``os.getpid()`` is always excluded."""
import os

from conductor.cli.update import _get_self_and_ancestor_pids

assert os.getpid() in _get_self_and_ancestor_pids()

def test_parent_pid_in_excluded_set(self) -> None:
"""``os.getppid()`` is always excluded so the launching shim is ignored."""
import os

from conductor.cli.update import _get_self_and_ancestor_pids

assert os.getppid() in _get_self_and_ancestor_pids()

def test_ancestor_chain_excluded_via_parent_map(self) -> None:
"""When a parent map is available, the full ancestor chain is excluded."""
import os

from conductor.cli.update import _get_self_and_ancestor_pids

my_pid = os.getpid()
# Fabricated ancestor chain: my_pid -> 100 -> 200 -> 0
fake_map = {my_pid: 100, 100: 200, 200: 0}
with patch("conductor.cli.update._build_parent_pid_map", return_value=fake_map):
pids = _get_self_and_ancestor_pids()
assert my_pid in pids
assert 100 in pids
assert 200 in pids
# PPID 0 is the chain terminator and is not added.
assert 0 not in pids

def test_parent_map_failure_falls_back_to_getpid_and_getppid(self) -> None:
"""When the parent map cannot be built we still exclude self and direct parent."""
import os

from conductor.cli.update import _get_self_and_ancestor_pids

with patch("conductor.cli.update._build_parent_pid_map", return_value={}):
pids = _get_self_and_ancestor_pids()
assert os.getpid() in pids
assert os.getppid() in pids

def test_cycle_in_parent_map_terminates_walk(self) -> None:
"""A cycle in the parent map does not cause infinite recursion."""
import os

from conductor.cli.update import _get_self_and_ancestor_pids

my_pid = os.getpid()
# Cyclic chain my_pid -> 100 -> my_pid
fake_map = {my_pid: 100, 100: my_pid}
with patch("conductor.cli.update._build_parent_pid_map", return_value=fake_map):
pids = _get_self_and_ancestor_pids()
# Walk added 100 then stopped before re-adding my_pid.
assert 100 in pids
assert my_pid in pids

def test_find_running_excludes_ancestor(self) -> None:
"""Conductor shim listed in ``ps`` but in our ancestor chain is filtered out."""
import os
import sys

from conductor.cli.update import _find_running_conductor_processes

ancestor_pid = 99999
other_pid = 88888

# ps output containing both ancestor and an unrelated conductor proc.
if sys.platform == "win32":
mock_stdout = (
f'"conductor.exe","{ancestor_pid}","Console","1","12 K"\n'
f'"conductor.exe","{other_pid}","Console","1","12 K"\n'
)
cmd_match = ["tasklist"]
else:
mock_stdout = (
f"{ancestor_pid} /usr/local/bin/conductor run wf.yaml\n"
f"{other_pid} /usr/local/bin/conductor run other.yaml\n"
)
cmd_match = ["ps"]

from subprocess import CompletedProcess

completed = CompletedProcess(args=cmd_match, returncode=0, stdout=mock_stdout, stderr="")

with (
patch(
"conductor.cli.update._get_self_and_ancestor_pids",
return_value={os.getpid(), ancestor_pid},
),
patch("conductor.cli.update.subprocess.run", return_value=completed),
):
running = _find_running_conductor_processes()

pids = {p["pid"] for p in running}
assert ancestor_pid not in pids, "ancestor (own shim) should be filtered"
assert other_pid in pids, "unrelated conductor proc should be reported"
Loading