diff --git a/README.md b/README.md index 2f08bfa..0f469c1 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,77 @@ If the folder has no `.devcontainer/devcontainer.json`, falls back to plain `cod Open `` (default: current directory) in VS Code via the configured devcontainer. Exit code is forwarded from the spawned editor. +### `dcode shell` + +Open an interactive shell inside the project's running devcontainer. + +```bash +dcode shell # current directory +dcode shell ./my-project # specific path +dcode shell --shell zsh # explicit shell executable (overrides settings) +``` + +Shell selection priority (highest first): + +1. `--shell` CLI flag (literal executable; no argument parsing) +2. Workspace `/.vscode/settings.json`: + `terminal.integrated.defaultProfile.linux` plus the matching + `terminal.integrated.profiles.linux` entry +3. `devcontainer.json` `customizations.vscode.settings` with the same keys +4. Host user-level VS Code settings, such as `~/Library/Application Support/Code/User/settings.json` + on macOS, `~/.config/Code/User/settings.json` on Linux, or Windows-side + settings via the WSL bridge +5. Container login shell from `getent passwd ` (`nologin` and `false` are rejected) +6. Fallback: `/bin/bash`, then `/bin/sh` + +`dcode shell` always reads the `.linux` terminal settings because devcontainers +run Linux, even on macOS and WSL hosts. Profile `args` and `env` are honored; +if a profile `path` is a list, the first entry is used. `${...}` substitution in +profile values is not resolved in this version, so those values are passed +through verbatim with a warning. + +SSH agent forwarding works automatically when VS Code is open and connected to +the devcontainer. `dcode shell` detects the VS Code relay socket at +`/tmp/vscode-ssh-auth-*.sock` and sets `SSH_AUTH_SOCK` on `docker exec`. If no +socket is found, it prints a hint to open the project in VS Code first. + +The shell runs as `remoteUser` from `devcontainer.json` when set, then +`containerUser`, otherwise the container image's `USER` applies. + +The working directory matches the URI logic: `/` +for worktrees, otherwise ``. The path is probed with `test -d`; +if it does not exist, `dcode shell` falls back to the base `` +with a warning, or omits `-w` entirely if that is missing too. + +Limitations: + +- **GPG agent forwarding is not yet supported.** Commit signing inside the shell + will not work unless you've configured your own GPG forwarding via + `containerEnv` and a bind mount. +- **`remoteEnv` is not applied.** The environment may differ from VS Code's + integrated terminal; a warning is printed when `remoteEnv` is present in + `devcontainer.json`. +- **Variable substitution** (`${env:VAR}`, `${localEnv:VAR}`) in terminal + profile values is not resolved. +- **Devcontainer config inheritance** (`extends`, image-label metadata, Docker + Compose service `user`) is not merged; only the raw `devcontainer.json` file + is read. For complex setups, shell selection may differ from VS Code's + resolved view. +- **Requires an interactive terminal.** `dcode shell` exits with an error when + stdin or stdout is not a TTY, such as in piped or scripted contexts. + +Common errors: + +- No `devcontainer.json`: exits non-zero and points you at `dcode doctor`. +- Container not running: no matching devcontainer was found; open the project in + VS Code first. +- Container stopped: run `dcode ` to start it. +- Multiple matching containers: clean up the duplicate containers listed in the + error. +- Docker not available: install/start Docker or Docker Desktop and try again. + +To open a folder literally named `shell`, run `dcode ./shell`. + ### `dcode doctor [path]` Diagnose the local environment for dcode and print a "what would `dcode ` do here?" @@ -71,10 +142,12 @@ Exit codes: ### Naming-collision workaround -`doctor` and `update` are subcommands, so `dcode doctor` and `dcode update` always -invoke them. To open a folder literally named `doctor` or `update`, prefix the path: +`shell`, `doctor`, and `update` are subcommands, so `dcode shell`, `dcode doctor`, +and `dcode update` always invoke them. To open a folder literally named `shell`, +`doctor`, or `update`, prefix the path: ```bash +dcode ./shell dcode ./doctor dcode "$(pwd)/update" ``` diff --git a/src/dcode/cli.py b/src/dcode/cli.py index 8f0f331..d6ff638 100644 --- a/src/dcode/cli.py +++ b/src/dcode/cli.py @@ -10,7 +10,7 @@ from dcode.doctor import run_doctor from dcode.update import run_update, run_update_check -_SUBCOMMANDS = ("doctor", "update") +_SUBCOMMANDS = ("doctor", "update", "shell") def _build_parser() -> argparse.ArgumentParser: @@ -19,9 +19,10 @@ def _build_parser() -> argparse.ArgumentParser: description=( "Open a folder in a VS Code devcontainer.\n" "\n" - "`dcode doctor` and `dcode update` always run their respective subcommands. " - "To open a folder literally named 'doctor' or 'update', " - "run `dcode ./doctor` or `dcode ./update`." + "`dcode doctor`, `dcode update`, and `dcode shell` always run their " + "respective subcommands. To open a folder literally named " + "'doctor', 'update', or 'shell', run `dcode ./doctor`, " + "`dcode ./update`, or `dcode ./shell`." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -57,6 +58,37 @@ def _build_parser() -> argparse.ArgumentParser: help="check for an available update without installing it", ) + p_shell = subparsers.add_parser( + "shell", + help="Open a shell in the project's running devcontainer", + description=( + "Open an interactive shell inside the running devcontainer for " + "the project at `path`. Mirrors VS Code's integrated terminal: " + "respects terminal profile settings (workspace > devcontainer > " + "user), forwards the SSH agent socket when available, runs as " + "`remoteUser`/`containerUser` from devcontainer.json. Requires " + "an interactive terminal. To open a folder literally named " + "'shell', use `dcode ./shell`." + ), + ) + p_shell.add_argument( + "shell_path", + nargs="?", + default=".", + metavar="path", + help="project folder (default: current directory)", + ) + p_shell.add_argument( + "--shell", + default=None, + dest="shell_override", + metavar="EXECUTABLE", + help=( + "literal shell executable to use (overrides VS Code settings); " + "no shell-style argument splitting" + ), + ) + parser.add_argument( "path", nargs="?", @@ -85,6 +117,7 @@ def _looks_like_subcommand(argv: list[str]) -> bool: def main() -> None: argv = sys.argv[1:] + parser: argparse.ArgumentParser | None = None if _looks_like_subcommand(argv): parser = _build_parser() args = parser.parse_args(argv) @@ -105,4 +138,17 @@ def main() -> None: sys.exit(run_update_check()) sys.exit(run_update()) + if args.command == "shell": + shell_override = args.shell_override + if shell_override is not None and ( + shell_override.strip() != shell_override or any(c.isspace() for c in shell_override) + ): + assert parser is not None + parser.error( + "--shell must be a single executable path or name (no arguments); " + "use VS Code terminal profile args for that" + ) + from dcode.shell import run_shell + sys.exit(run_shell(args.shell_path, insiders=args.insiders, shell_override=shell_override)) + run_dcode(args.path, insiders=args.insiders) diff --git a/src/dcode/shell.py b/src/dcode/shell.py new file mode 100644 index 0000000..27f25a3 --- /dev/null +++ b/src/dcode/shell.py @@ -0,0 +1,642 @@ +"""dcode shell: exec into a running devcontainer. + +Locates the container by the Docker labels that ``devcontainers/cli`` sets +(``devcontainer.local_folder`` and ``devcontainer.config_file``), resolves a +shell using VS Code's settings priority chain, forwards the SSH agent socket +when available, then ``os.execvp``s into ``docker exec -it [...] ``. +""" + +from __future__ import annotations + +import json +import os +import platform +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +import json5 + +from dcode.core import find_devcontainer, get_workspace_folder, resolve_worktree +from dcode.wsl import _wsl_to_windows_path, get_windows_vscode_settings_path, is_wsl + +_ContainerState = Literal[ + "running", "stopped", "missing", "ambiguous", "docker_unavailable" +] + + +@dataclass(frozen=True, slots=True) +class ContainerLookup: + """Result of looking up a devcontainer by its Docker labels.""" + + state: _ContainerState + id: str | None = None + ids: tuple[str, ...] = () + detail: str | None = None + + +@dataclass(frozen=True, slots=True) +class ResolvedShell: + """A shell resolved from a VS Code terminal profile.""" + + path: str + args: tuple[str, ...] = () + env: tuple[tuple[str, str], ...] = () + + +# --------------------------------------------------------------------------- +# JSONC loading +# --------------------------------------------------------------------------- + + +def _load_jsonc(path: Path) -> dict: + """Load a JSONC file as a dict. + + Returns ``{}`` for missing files (silently), or for malformed / non-dict + contents (with a stderr warning). + """ + if not path.is_file(): + return {} + try: + text = path.read_text() + except OSError as exc: + print(f"dcode: failed to read {path}: {exc}", file=sys.stderr) + return {} + if not text.strip(): + return {} + try: + parsed = json5.loads(text) + except ValueError as exc: + print(f"dcode: failed to parse {path}: {exc}", file=sys.stderr) + return {} + if not isinstance(parsed, dict): + print( + f"dcode: ignoring {path}: top-level value is not an object", + file=sys.stderr, + ) + return {} + return parsed + + +# --------------------------------------------------------------------------- +# Container discovery +# --------------------------------------------------------------------------- + + +def _docker_ps(filters: list[str], *, all_states: bool = False) -> tuple[int, str, str]: + """Run ``docker ps [-a] -q --filter ...`` and return (rc, stdout, stderr).""" + argv = ["docker", "ps"] + if all_states: + argv.append("-a") + argv.append("-q") + for f in filters: + argv.extend(["--filter", f]) + try: + proc = subprocess.run(argv, capture_output=True, text=True, check=False) + except FileNotFoundError as exc: + return (-1, "", str(exc)) + except OSError as exc: + return (-1, "", str(exc)) + return (proc.returncode, proc.stdout, proc.stderr) + + +def find_container(host_path: str, config_path: str) -> ContainerLookup: + """Locate the devcontainer for a project. + + Tries ``devcontainer.local_folder`` + ``devcontainer.config_file`` first; + falls back to the single-label query, then probes ``docker ps -a`` to + detect a stopped-but-existing container. Distinguishes running, stopped, + missing, ambiguous, and ``docker_unavailable`` states. + + On WSL, both ``host_path`` and ``config_path`` are converted to Windows + paths before being used as label values, since VS Code on Windows stores + the labels in Windows-path format. + """ + if is_wsl(): + host_label_value = _wsl_to_windows_path(host_path) + config_label_value = _wsl_to_windows_path(config_path) + else: + host_label_value = host_path + config_label_value = config_path + + two_label = [ + f"label=devcontainer.local_folder={host_label_value}", + f"label=devcontainer.config_file={config_label_value}", + ] + one_label = [f"label=devcontainer.local_folder={host_label_value}"] + + rc, out, err = _docker_ps(two_label) + if rc != 0: + return ContainerLookup(state="docker_unavailable", detail=err.strip() or None) + ids = out.split() + if len(ids) > 1: + return ContainerLookup(state="ambiguous", ids=tuple(ids)) + if len(ids) == 1: + return ContainerLookup(state="running", id=ids[0]) + + # Fallback: single-label lookup (legacy containers, moved configs). + rc, out, err = _docker_ps(one_label) + if rc != 0: + return ContainerLookup(state="docker_unavailable", detail=err.strip() or None) + ids = out.split() + if len(ids) > 1: + return ContainerLookup(state="ambiguous", ids=tuple(ids)) + if len(ids) == 1: + return ContainerLookup(state="running", id=ids[0]) + + # Nothing running — check for a stopped container. + rc, out, err = _docker_ps(one_label, all_states=True) + if rc != 0: + return ContainerLookup(state="docker_unavailable", detail=err.strip() or None) + ids = out.split() + if ids: + # Could be 1 or many; report state=stopped either way. + return ContainerLookup(state="stopped", id=ids[0], ids=tuple(ids)) + + return ContainerLookup(state="missing") + + +# --------------------------------------------------------------------------- +# User-level VS Code settings +# --------------------------------------------------------------------------- + + +def get_user_settings_path(insiders: bool) -> Path | None: + """Return the host user's VS Code ``settings.json`` path (or ``None``).""" + code_dir = "Code - Insiders" if insiders else "Code" + + if is_wsl(): + return get_windows_vscode_settings_path(insiders) + + system = platform.system() + if system == "Darwin": + base = Path.home() / "Library" / "Application Support" + return base / code_dir / "User" / "settings.json" + if system == "Linux": + base = Path(os.environ.get("XDG_CONFIG_HOME") or (Path.home() / ".config")) + return base / code_dir / "User" / "settings.json" + return None + + +# --------------------------------------------------------------------------- +# Profile resolution +# --------------------------------------------------------------------------- + + +def _extract_profiles_layer(settings: dict) -> tuple[str | None, dict]: + """Pull (defaultProfile.linux, profiles.linux) from a settings dict. + + Accepts both flat keys (``terminal.integrated.profiles.linux``) and the + nested form (``terminal: {integrated: {profiles: {linux: ...}}}``). + """ + default_name: str | None = None + profiles: dict = {} + + flat_default = settings.get("terminal.integrated.defaultProfile.linux") + if isinstance(flat_default, str): + default_name = flat_default + flat_profiles = settings.get("terminal.integrated.profiles.linux") + if isinstance(flat_profiles, dict): + profiles = flat_profiles + + nested = settings.get("terminal") + if isinstance(nested, dict): + integrated = nested.get("integrated") + if isinstance(integrated, dict): + n_default = integrated.get("defaultProfile") + if isinstance(n_default, dict): + v = n_default.get("linux") + if isinstance(v, str): + default_name = v + n_profiles = integrated.get("profiles") + if isinstance(n_profiles, dict): + v = n_profiles.get("linux") + if isinstance(v, dict): + # Nested form fully replaces the flat form for this layer. + profiles = v + return default_name, profiles + + +def _merge_profiles(layers: list[dict]) -> dict: + """Deep-merge ``profiles.linux`` dicts across layers (lower → higher). + + A ``null`` value at any layer DELETES that profile entry from the merged + dict (matches VS Code's documented mechanism for disabling built-ins). + """ + merged: dict = {} + for layer in layers: + for name, value in layer.items(): + if value is None: + merged.pop(name, None) + else: + merged[name] = value + return merged + + +def _profile_to_resolved( + profile: dict, + *, + warn_substitution: list[bool], +) -> ResolvedShell | None: + """Convert a profile dict to a ResolvedShell, or None if unusable.""" + raw_path = profile.get("path") + if isinstance(raw_path, list): + raw_path = raw_path[0] if raw_path else None + if not isinstance(raw_path, str) or not raw_path: + return None + + raw_args = profile.get("args") or [] + args: list[str] = [] + if isinstance(raw_args, list): + for a in raw_args: + if isinstance(a, str): + if "${" in a and not warn_substitution[0]: + print( + "dcode: terminal profile contains ${...} substitution; " + "passing through unchanged (variable substitution is not " + "yet implemented)", + file=sys.stderr, + ) + warn_substitution[0] = True + args.append(a) + + raw_env = profile.get("env") or {} + env_pairs: list[tuple[str, str]] = [] + if isinstance(raw_env, dict): + for k, v in raw_env.items(): + if not isinstance(k, str) or not isinstance(v, str): + continue + if "${" in v and not warn_substitution[0]: + print( + "dcode: terminal profile contains ${...} substitution; " + "passing through unchanged (variable substitution is not " + "yet implemented)", + file=sys.stderr, + ) + warn_substitution[0] = True + env_pairs.append((k, v)) + + return ResolvedShell(path=raw_path, args=tuple(args), env=tuple(env_pairs)) + + +def resolve_terminal_profile( + workspace_root: Path, + devcontainer_cfg: dict, + insiders: bool, +) -> ResolvedShell | None: + """Resolve a shell from VS Code settings, walking the priority chain. + + Order (lower → higher precedence): user → devcontainer remote → workspace. + The merged ``profiles.linux`` is then used to look up the highest-scope + ``defaultProfile.linux``. If that profile name is missing or unusable, + falls through to lower-scope default names. Returns ``None`` if no scope + yields a usable profile (caller should fall back to the login shell). + """ + # Load layers. User scope is lowest precedence, workspace is highest. + user_settings: dict = {} + user_path = get_user_settings_path(insiders) + if user_path is not None: + user_settings = _load_jsonc(user_path) + + dc_customizations = devcontainer_cfg.get("customizations") or {} + dc_vscode = ( + dc_customizations.get("vscode") if isinstance(dc_customizations, dict) else None + ) + dc_settings: dict = {} + if isinstance(dc_vscode, dict): + s = dc_vscode.get("settings") + if isinstance(s, dict): + dc_settings = s + + workspace_settings = _load_jsonc(workspace_root / ".vscode" / "settings.json") + + # Extract per-layer (defaultProfile, profiles) tuples. + layers_in_order = [user_settings, dc_settings, workspace_settings] + layer_data = [_extract_profiles_layer(s) for s in layers_in_order] + + merged_profiles = _merge_profiles([profiles for _, profiles in layer_data]) + + # Try defaultProfile.linux from highest scope down. + warn_state = [False] + for default_name, _ in reversed(layer_data): + if not default_name: + continue + profile = merged_profiles.get(default_name) + if not isinstance(profile, dict): + # Strict resolution: never treat a profile name as an executable. + continue + resolved = _profile_to_resolved(profile, warn_substitution=warn_state) + if resolved is not None: + return resolved + + return None + + +# --------------------------------------------------------------------------- +# Login-shell detection +# --------------------------------------------------------------------------- + + +def _docker_exec_capture(container_id: str, argv: list[str]) -> subprocess.CompletedProcess: + """Run ``docker exec `` and capture output.""" + full = ["docker", "exec", container_id, *argv] + return subprocess.run(full, capture_output=True, text=True, check=False) + + +def detect_login_shell(container_id: str, exec_user: str | None) -> str: + """Detect the login shell for ``exec_user`` inside the container. + + If ``exec_user`` is None, probes ``id -un`` to discover the effective + container user. Falls back through ``getent passwd`` → ``/bin/bash`` → + ``/bin/sh``. Rejects ``nologin``/``false`` shells. + """ + user = exec_user + if user is None: + proc = _docker_exec_capture(container_id, ["id", "-un"]) + if proc.returncode == 0: + user = proc.stdout.strip() or None + + if user: + proc = _docker_exec_capture(container_id, ["getent", "passwd", user]) + if proc.returncode == 0: + line = proc.stdout.strip().splitlines()[0] if proc.stdout.strip() else "" + fields = line.split(":") + if len(fields) >= 7: + shell_path = fields[6].strip() + if shell_path: + base = os.path.basename(shell_path) + if base not in ("nologin", "false"): + return shell_path + + # Fallback: probe /bin/bash, then /bin/sh. + for candidate in ("/bin/bash", "/bin/sh"): + proc = _docker_exec_capture(container_id, ["test", "-x", candidate]) + if proc.returncode == 0: + return candidate + + # Last-ditch: return /bin/sh and let docker exec fail with a clear error. + return "/bin/sh" + + +# --------------------------------------------------------------------------- +# SSH socket discovery +# --------------------------------------------------------------------------- + + +def find_ssh_socket(container_id: str) -> str | None: + """Find the SSH agent socket path inside the container, or None.""" + # Step 1: inspect Config.Env for SSH_AUTH_SOCK. + try: + proc = subprocess.run( + [ + "docker", + "inspect", + container_id, + "--format", + "{{json .Config.Env}}", + ], + capture_output=True, + text=True, + check=False, + ) + except OSError: + proc = None + + if proc is not None and proc.returncode == 0 and proc.stdout.strip(): + try: + env_list = json.loads(proc.stdout.strip()) + except ValueError: + env_list = None + if isinstance(env_list, list): + for entry in env_list: + if isinstance(entry, str) and entry.startswith("SSH_AUTH_SOCK="): + value = entry.split("=", 1)[1] + if value: + return value + + # Step 2: probe /tmp/vscode-ssh-auth-*.sock (newest first). + proc = _docker_exec_capture( + container_id, + ["sh", "-c", "ls -t /tmp/vscode-ssh-auth*.sock 2>/dev/null | head -1"], + ) + candidate = proc.stdout.strip() if proc.returncode == 0 else "" + if candidate: + check = _docker_exec_capture(container_id, ["test", "-S", candidate]) + if check.returncode == 0: + return candidate + + return None + + +# --------------------------------------------------------------------------- +# Working-directory probe +# --------------------------------------------------------------------------- + + +def probe_workdir(container_id: str, candidate: str, fallback: str) -> str | None: + """Return the best-existing working dir, or ``None`` if neither exists.""" + proc = _docker_exec_capture(container_id, ["test", "-d", candidate]) + if proc.returncode == 0: + return candidate + if fallback and fallback != candidate: + print( + f"dcode: working directory {candidate} not found in container; " + f"falling back to {fallback}", + file=sys.stderr, + ) + proc = _docker_exec_capture(container_id, ["test", "-d", fallback]) + if proc.returncode == 0: + return fallback + return None + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def _resolve_exec_user(devcontainer_cfg: dict) -> str | None: + for key in ("remoteUser", "containerUser"): + v = devcontainer_cfg.get(key) + if isinstance(v, str) and v.strip(): + return v.strip() + return None + + +def _prompt_start_stopped(container_id: str, host_path: str | Path) -> bool: + """Prompt to start a stopped container and run ``docker start`` if accepted.""" + sys.stderr.write( + f"dcode: devcontainer for {host_path} is stopped. Start it now? [Y/n] " + ) + sys.stderr.flush() + + answer = sys.stdin.readline().strip().lower() + if answer not in ("", "y", "yes"): + print("dcode: aborted", file=sys.stderr) + return False + + short_id = container_id[:12] + print(f"dcode: starting container {short_id}...", file=sys.stderr) + try: + proc = subprocess.run( + ["docker", "start", container_id], + capture_output=True, + text=True, + check=False, + ) + except (FileNotFoundError, OSError) as exc: + print(f"dcode: failed to start container {short_id}: {exc}", file=sys.stderr) + return False + + if proc.returncode != 0: + detail = (proc.stderr or "").strip() + if not detail: + detail = (proc.stdout or "").strip() or f"exit code {proc.returncode}" + print(f"dcode: failed to start container {short_id}: {detail}", file=sys.stderr) + return False + + print("dcode: container started", file=sys.stderr) + return True + + +def run_shell(path: str, *, insiders: bool, shell_override: str | None) -> int: + """Open an interactive shell in the running devcontainer for ``path``. + + Returns an exit code suitable for ``sys.exit``. On success, replaces the + process via ``os.execvp`` (the explicit ``return 0`` is only reachable + when ``execvp`` is mocked in tests). + """ + target = Path(path).resolve() + + worktree = resolve_worktree(target) + if worktree is not None: + main_repo, rel_path = worktree + else: + main_repo = target + rel_path = None + + devcontainer_path = find_devcontainer(main_repo) + if devcontainer_path is None: + print( + f"dcode: no devcontainer.json found for {main_repo}; " + f"run `dcode doctor` to diagnose", + file=sys.stderr, + ) + return 1 + + devcontainer_cfg = _load_jsonc(devcontainer_path) + workspace_folder = get_workspace_folder(devcontainer_path, main_repo) + + lookup = find_container(str(main_repo), str(devcontainer_path)) + if lookup.state in ("running", "stopped") and not ( + sys.stdin.isatty() and sys.stdout.isatty() + ): + if lookup.state == "stopped": + print( + f"dcode: devcontainer for {main_repo} exists but is stopped — " + "run interactively to be prompted to start it, or run " + f"`dcode {path}` first", + file=sys.stderr, + ) + else: + print( + "dcode: dcode shell requires an interactive terminal", + file=sys.stderr, + ) + return 1 + + if lookup.state == "stopped": + container_id = lookup.id + if container_id is None: # pragma: no cover - defensive + print("dcode: container lookup returned no id", file=sys.stderr) + return 1 + if not _prompt_start_stopped(container_id, main_repo): + return 1 + if lookup.state == "missing": + print( + f"dcode: no devcontainer found running for {main_repo} — " + f"open in VS Code first (`dcode {path}`)", + file=sys.stderr, + ) + return 1 + if lookup.state == "ambiguous": + ids = ", ".join(lookup.ids) + print( + f"dcode: multiple devcontainers match {main_repo}: {ids} — " + f"please remove duplicates with `docker rm`", + file=sys.stderr, + ) + return 1 + if lookup.state == "docker_unavailable": + detail = lookup.detail or "unknown error" + print( + f"dcode: docker CLI not available — is Docker Desktop running? " + f"({detail})", + file=sys.stderr, + ) + return 1 + + container_id = lookup.id + if container_id is None: # pragma: no cover - defensive + print("dcode: container lookup returned no id", file=sys.stderr) + return 1 + + exec_user = _resolve_exec_user(devcontainer_cfg) + + if "remoteEnv" in devcontainer_cfg: + print( + "dcode: devcontainer remoteEnv is not applied to this shell yet; " + "environment may differ from VS Code terminal", + file=sys.stderr, + ) + + # Resolve shell. + if shell_override: + resolved = ResolvedShell(path=shell_override) + else: + profile = resolve_terminal_profile(main_repo, devcontainer_cfg, insiders) + if profile is not None: + resolved = profile + else: + shell_path = detect_login_shell(container_id, exec_user) + resolved = ResolvedShell(path=shell_path) + + # SSH agent socket forwarding. + ssh_sock = find_ssh_socket(container_id) + if ssh_sock is None: + print( + "dcode: SSH agent socket not found in container — SSH key auth " + "may not work (open in VS Code to enable forwarding)", + file=sys.stderr, + ) + + # Working directory probe. + if rel_path is not None: + candidate_workdir = f"{workspace_folder}/{rel_path.as_posix()}" + else: + candidate_workdir = workspace_folder + workdir = probe_workdir(container_id, candidate_workdir, workspace_folder) + + # Build argv. + argv: list[str] = ["docker", "exec", "-it"] + if exec_user: + argv.extend(["-u", exec_user]) + if workdir: + argv.extend(["-w", workdir]) + if ssh_sock: + argv.extend(["-e", f"SSH_AUTH_SOCK={ssh_sock}"]) + for k, v in resolved.env: + argv.extend(["-e", f"{k}={v}"]) + argv.append(container_id) + argv.append(resolved.path) + argv.extend(resolved.args) + + try: + os.execvp("docker", argv) + except OSError as exc: + print(f"dcode: failed to exec docker: {exc}", file=sys.stderr) + return 127 + + return 0 # only reached when os.execvp is mocked in tests diff --git a/src/dcode/wsl.py b/src/dcode/wsl.py index 041d3e7..082297d 100644 --- a/src/dcode/wsl.py +++ b/src/dcode/wsl.py @@ -54,7 +54,7 @@ def build_uri_wsl(host_path: str, workspace_folder: str) -> str: return f"vscode-remote://dev-container+{hex_payload}{workspace_folder}" -def _get_windows_vscode_settings_path(insiders: bool = False) -> Path | None: +def get_windows_vscode_settings_path(insiders: bool = False) -> Path | None: """Find the Windows-side VS Code settings.json from WSL.""" try: result = subprocess.run( @@ -205,3 +205,7 @@ def _print_wsl_hint() -> None: ' "dev.containers.executeInWSLDistro": ""', file=sys.stderr, ) + + +# Backwards-compatible alias for the previously-private helper. +_get_windows_vscode_settings_path = get_windows_vscode_settings_path diff --git a/tests/test_cli.py b/tests/test_cli.py index 4e6fe25..f6bfde9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -123,3 +123,139 @@ def test_path_named_doctor_workaround(self, monkeypatch): cli.main() m_run.assert_called_once_with("./doctor", insiders=False) m_doc.assert_not_called() + + +class TestShellDispatch: + """Dispatch tests for the `dcode shell` subcommand. + + NOTE: `run_shell` is lazy-imported inside `cli.main()` via + `from dcode.shell import run_shell`, so it MUST be patched at + `dcode.shell.run_shell` rather than `dcode.cli.run_shell`. + """ + + def test_shell_no_args(self, monkeypatch): + monkeypatch.setattr("sys.argv", ["dcode", "shell"]) + with ( + patch("dcode.shell.run_shell", return_value=0) as m_run, + pytest.raises(SystemExit) as exc, + ): + cli.main() + assert exc.value.code == 0 + m_run.assert_called_once_with(".", insiders=False, shell_override=None) + + def test_shell_with_path(self, monkeypatch): + monkeypatch.setattr("sys.argv", ["dcode", "shell", "./project"]) + with ( + patch("dcode.shell.run_shell", return_value=0) as m_run, + pytest.raises(SystemExit), + ): + cli.main() + m_run.assert_called_once_with("./project", insiders=False, shell_override=None) + + def test_shell_with_shell_override(self, monkeypatch): + monkeypatch.setattr("sys.argv", ["dcode", "shell", "--shell", "zsh"]) + with ( + patch("dcode.shell.run_shell", return_value=0) as m_run, + pytest.raises(SystemExit), + ): + cli.main() + m_run.assert_called_once_with(".", insiders=False, shell_override="zsh") + + def test_shell_with_path_and_shell_override(self, monkeypatch): + monkeypatch.setattr( + "sys.argv", ["dcode", "shell", "./path", "--shell", "bash"] + ) + with ( + patch("dcode.shell.run_shell", return_value=0) as m_run, + pytest.raises(SystemExit), + ): + cli.main() + m_run.assert_called_once_with("./path", insiders=False, shell_override="bash") + + def test_insiders_flag_before_shell(self, monkeypatch): + monkeypatch.setattr("sys.argv", ["dcode", "-i", "shell"]) + with ( + patch("dcode.shell.run_shell", return_value=0) as m_run, + pytest.raises(SystemExit), + ): + cli.main() + m_run.assert_called_once_with(".", insiders=True, shell_override=None) + + def test_insiders_flag_after_shell_rejected(self, monkeypatch, capsys): + # The shell subparser does NOT redeclare -i, so argparse rejects + # `dcode shell -i` as an unrecognized argument (exit code 2). + monkeypatch.setattr("sys.argv", ["dcode", "shell", "-i"]) + with ( + patch("dcode.shell.run_shell") as m_run, + pytest.raises(SystemExit) as exc, + ): + cli.main() + assert exc.value.code == 2 + m_run.assert_not_called() + + def test_shell_override_with_internal_whitespace_rejected( + self, monkeypatch, capsys + ): + monkeypatch.setattr("sys.argv", ["dcode", "shell", "--shell", "bash -l"]) + with ( + patch("dcode.shell.run_shell") as m_run, + pytest.raises(SystemExit) as exc, + ): + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "single executable" in err or "whitespace" in err.lower() + m_run.assert_not_called() + + def test_shell_override_with_leading_whitespace_rejected( + self, monkeypatch, capsys + ): + monkeypatch.setattr("sys.argv", ["dcode", "shell", "--shell", " zsh"]) + with ( + patch("dcode.shell.run_shell") as m_run, + pytest.raises(SystemExit) as exc, + ): + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "single executable" in err or "whitespace" in err.lower() + m_run.assert_not_called() + + def test_path_named_shell_workaround(self, monkeypatch): + # `dcode ./shell` must open a folder literally named 'shell', + # not dispatch to the shell subcommand. + monkeypatch.setattr("sys.argv", ["dcode", "./shell"]) + with patch("dcode.cli.run_dcode") as m_run: + cli.main() + m_run.assert_called_once_with("./shell", insiders=False) + + def test_looks_like_subcommand_recognizes_shell(self): + assert cli._looks_like_subcommand(["shell"]) is True + assert cli._looks_like_subcommand(["./shell"]) is False + + def test_top_level_help_mentions_shell(self, monkeypatch, capsys): + monkeypatch.setattr("sys.argv", ["dcode", "--help"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 0 + out = capsys.readouterr().out + assert "shell" in out + assert "dcode ./shell" in out + + def test_shell_subcommand_help(self, monkeypatch, capsys): + monkeypatch.setattr("sys.argv", ["dcode", "shell", "--help"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 0 + out = capsys.readouterr().out + assert "--shell" in out + assert "devcontainer" in out.lower() + + def test_shell_exit_code_forwarded(self, monkeypatch): + monkeypatch.setattr("sys.argv", ["dcode", "shell"]) + with ( + patch("dcode.shell.run_shell", return_value=127), + pytest.raises(SystemExit) as exc, + ): + cli.main() + assert exc.value.code == 127 diff --git a/tests/test_shell.py b/tests/test_shell.py new file mode 100644 index 0000000..89eeb2e --- /dev/null +++ b/tests/test_shell.py @@ -0,0 +1,964 @@ +"""Tests for dcode.shell.""" + +from __future__ import annotations + +import io +import json +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +from conftest import _make_worktree + +from dcode.shell import ( + ContainerLookup, + ResolvedShell, + _load_jsonc, + detect_login_shell, + find_container, + find_ssh_socket, + get_user_settings_path, + probe_workdir, + resolve_terminal_profile, + run_shell, +) + + +def _completed(rc: int = 0, stdout: str = "", stderr: str = "") -> SimpleNamespace: + """Return a stand-in for ``subprocess.CompletedProcess`` with fixed fields.""" + return SimpleNamespace(returncode=rc, stdout=stdout, stderr=stderr) + + +class _TTYStringIO(io.StringIO): + def __init__(self, value: str = "", *, isatty: bool = True): + super().__init__(value) + self._isatty = isatty + + def isatty(self) -> bool: + return self._isatty + + +# --------------------------------------------------------------------------- +# _load_jsonc +# --------------------------------------------------------------------------- + + +class TestLoadJsonc: + def test_missing_file_returns_empty_dict_silently(self, tmp_path, capsys): + result = _load_jsonc(tmp_path / "nope.json") + assert result == {} + assert capsys.readouterr().err == "" + + def test_valid_jsonc_with_comments_and_trailing_commas(self, tmp_path): + f = tmp_path / "settings.json" + f.write_text('// hi\n{\n "a": 1,\n "b": [1, 2,],\n}\n') + assert _load_jsonc(f) == {"a": 1, "b": [1, 2]} + + def test_top_level_array_warns_and_returns_empty(self, tmp_path, capsys): + f = tmp_path / "settings.json" + f.write_text("[1, 2]") + result = _load_jsonc(f) + assert result == {} + err = capsys.readouterr().err + assert str(f) in err + + def test_malformed_json_warns_and_returns_empty(self, tmp_path, capsys): + f = tmp_path / "settings.json" + f.write_text("{this is not json") + result = _load_jsonc(f) + assert result == {} + err = capsys.readouterr().err + assert str(f) in err + + def test_top_level_scalar_warns_and_returns_empty(self, tmp_path, capsys): + f = tmp_path / "settings.json" + f.write_text("42") + result = _load_jsonc(f) + assert result == {} + err = capsys.readouterr().err + assert str(f) in err + + +# --------------------------------------------------------------------------- +# find_container +# --------------------------------------------------------------------------- + + +class TestFindContainer: + def _patch_run(self, results): + """Return a MagicMock that returns successive results from `results`.""" + m = MagicMock(side_effect=results) + return patch("dcode.shell.subprocess.run", m), m + + def test_two_label_hit_running(self): + results = [_completed(0, "abc123\n", "")] + ctx, m = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result == ContainerLookup(state="running", id="abc123") + assert m.call_count == 1 + # Two filters in argv: + argv = m.call_args_list[0].args[0] + assert argv.count("--filter") == 2 + + def test_single_label_fallback_uses_one_filter(self): + results = [_completed(0, "", ""), _completed(0, "deadbeef\n", "")] + ctx, m = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result.state == "running" + assert result.id == "deadbeef" + argv2 = m.call_args_list[1].args[0] + assert argv2.count("--filter") == 1 + + def test_stopped_container_via_dash_a(self): + results = [ + _completed(0, "", ""), + _completed(0, "", ""), + _completed(0, "stopped1\nstopped2\n", ""), + ] + ctx, m = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result.state == "stopped" + assert result.id == "stopped1" + assert result.ids == ("stopped1", "stopped2") + # Verify third call had -a: + argv3 = m.call_args_list[2].args[0] + assert "-a" in argv3 + + def test_missing_when_no_results_anywhere(self): + results = [_completed(0, "", "")] * 3 + ctx, _ = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result == ContainerLookup(state="missing") + + def test_ambiguous_when_two_label_returns_multiple(self): + results = [_completed(0, "id1\nid2\nid3\n", "")] + ctx, _ = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result.state == "ambiguous" + assert result.ids == ("id1", "id2", "id3") + + def test_docker_unavailable_when_file_not_found(self): + ctx, _ = self._patch_run([FileNotFoundError("docker not found")]) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result.state == "docker_unavailable" + assert result.detail and "docker" in result.detail.lower() + + def test_docker_unavailable_when_nonzero_returncode(self): + results = [_completed(1, "", "Cannot connect to the Docker daemon")] + ctx, _ = self._patch_run(results) + with patch("dcode.shell.is_wsl", return_value=False), ctx: + result = find_container("/host/proj", "/host/proj/.devcontainer/devcontainer.json") + assert result.state == "docker_unavailable" + assert result.detail and "Docker daemon" in result.detail + + def test_wsl_converts_both_paths_for_label_filters(self): + results = [_completed(0, "wid\n", "")] + ctx, m = self._patch_run(results) + with ( + patch("dcode.shell.is_wsl", return_value=True), + patch("dcode.shell._wsl_to_windows_path", side_effect=lambda p: f"WIN({p})"), + ctx, + ): + result = find_container("/h/proj", "/h/proj/.devcontainer/devcontainer.json") + assert result.state == "running" + argv = m.call_args_list[0].args[0] + joined = " ".join(argv) + assert "label=devcontainer.local_folder=WIN(/h/proj)" in joined + assert "label=devcontainer.config_file=WIN(/h/proj/.devcontainer/devcontainer.json)" in joined + + +# --------------------------------------------------------------------------- +# get_user_settings_path +# --------------------------------------------------------------------------- + + +class TestGetUserSettingsPath: + def test_macos_default(self, monkeypatch): + monkeypatch.setattr("dcode.shell.platform.system", lambda: "Darwin") + with patch("dcode.shell.is_wsl", return_value=False): + p = get_user_settings_path(insiders=False) + assert p == Path.home() / "Library" / "Application Support" / "Code" / "User" / "settings.json" + + def test_macos_insiders(self, monkeypatch): + monkeypatch.setattr("dcode.shell.platform.system", lambda: "Darwin") + with patch("dcode.shell.is_wsl", return_value=False): + p = get_user_settings_path(insiders=True) + assert p is not None + assert "Code - Insiders" in str(p) + + def test_linux_no_xdg(self, monkeypatch): + monkeypatch.setattr("dcode.shell.platform.system", lambda: "Linux") + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + with patch("dcode.shell.is_wsl", return_value=False): + p = get_user_settings_path(insiders=False) + assert p == Path.home() / ".config" / "Code" / "User" / "settings.json" + + def test_linux_with_xdg(self, monkeypatch): + monkeypatch.setattr("dcode.shell.platform.system", lambda: "Linux") + monkeypatch.setenv("XDG_CONFIG_HOME", "/custom") + with patch("dcode.shell.is_wsl", return_value=False): + p = get_user_settings_path(insiders=False) + assert p == Path("/custom") / "Code" / "User" / "settings.json" + + def test_wsl_delegates_to_windows_helper(self): + sentinel = Path("/mnt/c/Users/me/AppData/Roaming/Code/User/settings.json") + with ( + patch("dcode.shell.is_wsl", return_value=True), + patch("dcode.shell.get_windows_vscode_settings_path", return_value=sentinel) as m, + ): + p = get_user_settings_path(insiders=True) + assert p == sentinel + m.assert_called_once_with(True) + + def test_returns_path_even_when_not_existing(self, monkeypatch): + monkeypatch.setattr("dcode.shell.platform.system", lambda: "Linux") + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + with patch("dcode.shell.is_wsl", return_value=False): + p = get_user_settings_path(insiders=False) + # Path is returned regardless of existence. + assert p is not None + + +# --------------------------------------------------------------------------- +# resolve_terminal_profile +# --------------------------------------------------------------------------- + + +class TestResolveTerminalProfile: + def _setup(self, tmp_path, user, workspace): + """Write user + workspace settings; return main_repo path.""" + user_path = tmp_path / "user-settings.json" + user_path.write_text(json.dumps(user)) + main_repo = tmp_path / "proj" + (main_repo / ".vscode").mkdir(parents=True) + (main_repo / ".vscode" / "settings.json").write_text(json.dumps(workspace)) + return main_repo, user_path + + def test_workspace_beats_devcontainer_beats_user(self, tmp_path): + user = { + "terminal.integrated.defaultProfile.linux": "user-shell", + "terminal.integrated.profiles.linux": {"user-shell": {"path": "/u"}}, + } + dc_cfg = { + "customizations": { + "vscode": { + "settings": { + "terminal.integrated.defaultProfile.linux": "dc-shell", + "terminal.integrated.profiles.linux": {"dc-shell": {"path": "/d"}}, + } + } + } + } + workspace = { + "terminal.integrated.defaultProfile.linux": "ws-shell", + "terminal.integrated.profiles.linux": {"ws-shell": {"path": "/w"}}, + } + main_repo, user_path = self._setup(tmp_path, user, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, dc_cfg, insiders=False) + assert r == ResolvedShell(path="/w") + + def test_deep_merge_across_layers(self, tmp_path): + user = {"terminal.integrated.profiles.linux": {"alpha": {"path": "/a"}}} + dc_cfg = { + "customizations": { + "vscode": { + "settings": { + "terminal.integrated.profiles.linux": {"beta": {"path": "/b"}} + } + } + } + } + workspace = { + "terminal.integrated.defaultProfile.linux": "alpha", + "terminal.integrated.profiles.linux": {"gamma": {"path": "/g"}}, + } + main_repo, user_path = self._setup(tmp_path, user, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, dc_cfg, insiders=False) + # alpha was defined only in user layer; merge preserves it. + assert r == ResolvedShell(path="/a") + + def test_null_at_higher_layer_deletes_profile(self, tmp_path): + user = { + "terminal.integrated.defaultProfile.linux": "alpha", + "terminal.integrated.profiles.linux": {"alpha": {"path": "/a"}}, + } + workspace = {"terminal.integrated.profiles.linux": {"alpha": None}} + main_repo, user_path = self._setup(tmp_path, user, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is None + + def test_default_pointing_to_missing_profile_returns_none(self, tmp_path): + user = {} + workspace = {"terminal.integrated.defaultProfile.linux": "foo"} + main_repo, user_path = self._setup(tmp_path, user, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is None + + def test_default_pointing_to_null_profile_returns_none(self, tmp_path): + user = { + "terminal.integrated.defaultProfile.linux": "foo", + "terminal.integrated.profiles.linux": {"foo": None}, + } + workspace = {} + main_repo, user_path = self._setup(tmp_path, user, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is None + + def test_path_as_list_uses_first_entry(self, tmp_path): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": {"a": {"path": ["/first", "/second"]}}, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is not None + assert r.path == "/first" + + def test_bare_name_returned_as_is(self, tmp_path): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": {"a": {"path": "zsh"}}, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is not None + assert r.path == "zsh" + + def test_args_become_tuple(self, tmp_path): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": {"a": {"path": "/bin/zsh", "args": ["-l"]}}, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is not None + assert r.args == ("-l",) + + def test_env_becomes_tuple_of_tuples(self, tmp_path): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": { + "a": {"path": "/bin/zsh", "env": {"FOO": "bar"}} + }, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is not None + assert r.env == (("FOO", "bar"),) + + def test_substitution_warning_emitted_at_most_once(self, tmp_path, capsys): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": { + "a": { + "path": "/bin/zsh", + "args": ["${env:FOO}", "${env:BAR}"], + "env": {"X": "${env:Y}"}, + } + }, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is not None + # Substitution values passed through verbatim. + assert r.args == ("${env:FOO}", "${env:BAR}") + assert r.env == (("X", "${env:Y}"),) + err = capsys.readouterr().err + # Single warning line — count by line-prefix to avoid matching the word + # "substitution" twice within the message body itself. + assert err.count("dcode: terminal profile contains") == 1 + + def test_profile_without_path_returns_none(self, tmp_path): + workspace = { + "terminal.integrated.defaultProfile.linux": "a", + "terminal.integrated.profiles.linux": {"a": {"args": ["-l"]}}, + } + main_repo, user_path = self._setup(tmp_path, {}, workspace) + with patch("dcode.shell.get_user_settings_path", return_value=user_path): + r = resolve_terminal_profile(main_repo, {}, insiders=False) + assert r is None + + +# --------------------------------------------------------------------------- +# detect_login_shell +# --------------------------------------------------------------------------- + + +class TestDetectLoginShell: + def test_getent_returns_zsh(self): + results = [_completed(0, "node:x:1000:1000::/home/node:/bin/zsh\n", "")] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert detect_login_shell("cid", "node") == "/bin/zsh" + + def test_nologin_falls_through_to_bash(self): + results = [ + _completed(0, "svc:x:0:0::/:/usr/sbin/nologin\n", ""), + _completed(0, "", ""), # /bin/bash test -x + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert detect_login_shell("cid", "svc") == "/bin/bash" + + def test_false_shell_falls_through_to_bash(self): + results = [ + _completed(0, "svc:x:0:0::/:/bin/false\n", ""), + _completed(0, "", ""), + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert detect_login_shell("cid", "svc") == "/bin/bash" + + def test_getent_failure_uses_bash_when_available(self): + results = [ + _completed(2, "", "no such user"), + _completed(0, "", ""), # /bin/bash exists + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert detect_login_shell("cid", "ghost") == "/bin/bash" + + def test_no_bash_falls_through_to_sh(self): + results = [ + _completed(2, "", ""), # getent fails + _completed(1, "", ""), # /bin/bash test -x fails + _completed(0, "", ""), # /bin/sh test -x ok + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert detect_login_shell("cid", "x") == "/bin/sh" + + def test_exec_user_none_invokes_id_un_first(self): + results = [ + _completed(0, "vscode\n", ""), # id -un + _completed(0, "vscode:x:1000:1000::/home/vscode:/bin/zsh\n", ""), + ] + m = MagicMock(side_effect=results) + with patch("dcode.shell.subprocess.run", m): + assert detect_login_shell("cid", None) == "/bin/zsh" + # First call must be id -un: + assert m.call_args_list[0].args[0][-2:] == ["id", "-un"] + # Second call must include `getent passwd vscode`: + argv2 = m.call_args_list[1].args[0] + assert argv2[-3:] == ["getent", "passwd", "vscode"] + + +# --------------------------------------------------------------------------- +# find_ssh_socket +# --------------------------------------------------------------------------- + + +class TestFindSshSocket: + def test_found_via_inspect_env(self): + env_json = json.dumps(["FOO=bar", "SSH_AUTH_SOCK=/host/sock"]) + results = [_completed(0, env_json + "\n", "")] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") == "/host/sock" + + def test_inspect_empty_then_ls_single_path_with_socket(self): + results = [ + _completed(0, "[]\n", ""), + _completed(0, "/tmp/vscode-ssh-auth-1.sock\n", ""), # ls -t + _completed(0, "", ""), # test -S ok + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") == "/tmp/vscode-ssh-auth-1.sock" + + def test_ls_multiline_uses_first(self): + # ls -t output is already piped to head -1 on the container side, so + # only the first line is returned by stdout in practice. Simulate that: + results = [ + _completed(0, "[]\n", ""), + _completed(0, "/tmp/vscode-ssh-auth-newer.sock\n", ""), + _completed(0, "", ""), + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") == "/tmp/vscode-ssh-auth-newer.sock" + + def test_ls_empty_returns_none(self): + results = [ + _completed(0, "[]\n", ""), + _completed(0, "", ""), # nothing matched + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") is None + + def test_ls_path_but_not_socket_returns_none(self): + results = [ + _completed(0, "[]\n", ""), + _completed(0, "/tmp/vscode-ssh-auth-x.sock\n", ""), + _completed(1, "", ""), # test -S fails + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") is None + + def test_inspect_malformed_json_falls_through(self): + results = [ + _completed(0, "not json at all\n", ""), + _completed(0, "", ""), # ls produces nothing + ] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert find_ssh_socket("cid") is None + + +# --------------------------------------------------------------------------- +# probe_workdir +# --------------------------------------------------------------------------- + + +class TestProbeWorkdir: + def test_candidate_exists(self, capsys): + with patch("dcode.shell.subprocess.run", side_effect=[_completed(0, "", "")]): + assert probe_workdir("cid", "/workspaces/proj/sub", "/workspaces/proj") == "/workspaces/proj/sub" + assert capsys.readouterr().err == "" + + def test_candidate_missing_fallback_succeeds(self, capsys): + results = [_completed(1, "", ""), _completed(0, "", "")] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert probe_workdir("cid", "/workspaces/proj/sub", "/workspaces/proj") == "/workspaces/proj" + err = capsys.readouterr().err + assert "/workspaces/proj/sub" in err + + def test_both_fail_returns_none(self): + results = [_completed(1, "", ""), _completed(1, "", "")] + with patch("dcode.shell.subprocess.run", side_effect=results): + assert probe_workdir("cid", "/c", "/f") is None + + +# --------------------------------------------------------------------------- +# run_shell — orchestration helpers + tests +# --------------------------------------------------------------------------- + + +def _make_project(tmp_path: Path, devcontainer_text: str = "{}") -> Path: + main_repo = tmp_path / "proj" + (main_repo / ".devcontainer").mkdir(parents=True) + (main_repo / ".devcontainer" / "devcontainer.json").write_text(devcontainer_text) + return main_repo + + +class _RunShellHarness: + """Patches all subprocess-touching helpers to safe defaults for run_shell.""" + + def __init__( + self, + *, + container_id: str = "cid123", + ssh_sock: str | None = "/host/ssh.sock", + workdir: str | None = "/workspaces/proj", + profile: ResolvedShell | None = None, + login_shell: str = "/bin/bash", + isatty: bool = True, + execvp_side_effect=None, + ): + self.container_id = container_id + self.ssh_sock = ssh_sock + self.workdir = workdir + self.profile = profile + self.login_shell = login_shell + self.isatty = isatty + self.execvp = MagicMock(side_effect=execvp_side_effect) + + def __enter__(self): + self._patches = [ + patch( + "dcode.shell.find_container", + return_value=ContainerLookup(state="running", id=self.container_id), + ), + patch("dcode.shell.find_ssh_socket", return_value=self.ssh_sock), + patch("dcode.shell.probe_workdir", return_value=self.workdir), + patch("dcode.shell.resolve_terminal_profile", return_value=self.profile), + patch("dcode.shell.detect_login_shell", return_value=self.login_shell), + patch("dcode.shell.os.execvp", self.execvp), + patch("sys.stdin"), + patch("sys.stdout"), + ] + self._opened = [p.start() for p in self._patches] + # isatty configuration: + import sys as _sys + _sys.stdin.isatty = MagicMock(return_value=self.isatty) + _sys.stdout.isatty = MagicMock(return_value=self.isatty) + return self + + def __exit__(self, *exc): + for p in self._patches: + p.stop() + + +class TestRunShell: + def test_happy_path_argv(self, tmp_path): + proj = _make_project(tmp_path, '{"workspaceFolder": "/workspaces/proj"}') + with _RunShellHarness(workdir="/workspaces/proj") as h: + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc == 0 + h.execvp.assert_called_once() + argv = h.execvp.call_args.args[1] + assert argv[:3] == ["docker", "exec", "-it"] + assert argv[-2:] == ["cid123", "/bin/bash"] + + def test_remote_user_adds_u_flag(self, tmp_path): + proj = _make_project(tmp_path, '{"remoteUser": "node"}') + with _RunShellHarness() as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + i = argv.index("-u") + assert argv[i + 1] == "node" + + def test_container_user_used_when_no_remote_user(self, tmp_path): + proj = _make_project(tmp_path, '{"containerUser": "vscode"}') + with _RunShellHarness() as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert "vscode" in argv + + def test_no_user_when_neither_present(self, tmp_path): + proj = _make_project(tmp_path, "{}") + with _RunShellHarness() as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert "-u" not in argv + + def test_workdir_present(self, tmp_path): + proj = _make_project(tmp_path) + with _RunShellHarness(workdir="/workspaces/proj") as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + i = argv.index("-w") + assert argv[i + 1] == "/workspaces/proj" + + def test_no_workdir_flag_when_probe_returns_none(self, tmp_path): + proj = _make_project(tmp_path) + with _RunShellHarness(workdir=None) as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert "-w" not in argv + + def test_ssh_socket_forwarded(self, tmp_path): + proj = _make_project(tmp_path) + with _RunShellHarness(ssh_sock="/host/ssh.sock") as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert "SSH_AUTH_SOCK=/host/ssh.sock" in argv + + def test_no_ssh_socket_warns(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with _RunShellHarness(ssh_sock=None) as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert not any(a.startswith("SSH_AUTH_SOCK=") for a in argv) + err = capsys.readouterr().err + assert "VS Code" in err and "SSH" in err + + def test_profile_env_in_argv(self, tmp_path): + proj = _make_project(tmp_path) + prof = ResolvedShell(path="/bin/zsh", env=(("FOO", "bar"), ("BAZ", "qux"))) + with _RunShellHarness(profile=prof) as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert "FOO=bar" in argv + assert "BAZ=qux" in argv + + def test_profile_args_appended(self, tmp_path): + proj = _make_project(tmp_path) + prof = ResolvedShell(path="/bin/zsh", args=("-l", "-i")) + with _RunShellHarness(profile=prof) as h: + run_shell(str(proj), insiders=False, shell_override=None) + argv = h.execvp.call_args.args[1] + assert argv[-3:] == ["/bin/zsh", "-l", "-i"] + # container id immediately precedes shell path + assert argv[-4] == "cid123" + + def test_shell_override_uses_path_with_no_args_or_env(self, tmp_path): + proj = _make_project(tmp_path) + with _RunShellHarness() as h: + run_shell(str(proj), insiders=False, shell_override="/bin/fish") + argv = h.execvp.call_args.args[1] + assert argv[-1] == "/bin/fish" + # No extra env from a profile since override skips profile resolution. + # SSH/etc may still add -e SSH_AUTH_SOCK=...; that's fine. + + def test_execvp_oserror_returns_127(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with _RunShellHarness(execvp_side_effect=OSError("boom")) as h: + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc == 127 + assert "failed to exec docker" in capsys.readouterr().err + h.execvp.assert_called_once() + + def test_execvp_mocked_returns_zero(self, tmp_path): + proj = _make_project(tmp_path) + with _RunShellHarness(): + assert run_shell(str(proj), insiders=False, shell_override=None) == 0 + + def test_non_tty_returns_nonzero_and_no_execvp(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with _RunShellHarness(isatty=False) as h: + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + h.execvp.assert_not_called() + assert "interactive terminal" in capsys.readouterr().err + + def test_tty_check_after_container_lookup(self, tmp_path): + """find_container is called even when TTY check would fail.""" + proj = _make_project(tmp_path) + find_mock = MagicMock( + return_value=ContainerLookup(state="running", id="cid") + ) + with ( + patch("dcode.shell.find_container", find_mock), + patch("dcode.shell.find_ssh_socket", return_value=None), + patch("dcode.shell.probe_workdir", return_value=None), + patch("dcode.shell.resolve_terminal_profile", return_value=None), + patch("dcode.shell.detect_login_shell", return_value="/bin/sh"), + patch("dcode.shell.os.execvp"), + patch("sys.stdin") as stdin, + patch("sys.stdout") as stdout, + ): + stdin.isatty = MagicMock(return_value=False) + stdout.isatty = MagicMock(return_value=True) + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + find_mock.assert_called_once() + + def test_worktree_uses_main_repo_for_lookup(self, tmp_path): + main_repo, worktree = _make_worktree(tmp_path) + # devcontainer lives in main repo + (main_repo / ".devcontainer").mkdir() + (main_repo / ".devcontainer" / "devcontainer.json").write_text( + '{"workspaceFolder": "/workspaces/main-repo"}' + ) + # Target is the worktree root itself (it has the gitdir pointer file). + target = worktree + + find_mock = MagicMock( + return_value=ContainerLookup(state="running", id="cid") + ) + probe_mock = MagicMock(return_value="/workspaces/main-repo") + with ( + patch("dcode.shell.find_container", find_mock), + patch("dcode.shell.find_ssh_socket", return_value=None), + patch("dcode.shell.probe_workdir", probe_mock), + patch("dcode.shell.resolve_terminal_profile", return_value=None), + patch("dcode.shell.detect_login_shell", return_value="/bin/sh"), + patch("dcode.shell.os.execvp"), + patch("sys.stdin") as stdin, + patch("sys.stdout") as stdout, + ): + stdin.isatty = MagicMock(return_value=True) + stdout.isatty = MagicMock(return_value=True) + run_shell(str(target), insiders=False, shell_override=None) + + # find_container received the MAIN repo path, not the worktree. + host_arg = find_mock.call_args.args[0] + assert host_arg == str(main_repo.resolve()) + + # probe_workdir candidate is workspaceFolder / rel_path (URI-style). + candidate = probe_mock.call_args.args[1] + assert candidate == "/workspaces/main-repo/.worktrees/pr-34" + + +class TestRunShellStoppedPrompt: + def _run_stopped( + self, + tmp_path, + monkeypatch, + answer: str, + *, + isatty: bool = True, + start_rc: int = 0, + start_stderr: str = "", + ): + proj = _make_project(tmp_path) + stdout = _TTYStringIO(isatty=isatty) + monkeypatch.setattr("sys.stdin", _TTYStringIO(answer, isatty=isatty)) + monkeypatch.setattr("sys.stdout", stdout) + + start = MagicMock(return_value=_completed(start_rc, "abc123\n", start_stderr)) + execvp = MagicMock() + with ( + patch( + "dcode.shell.find_container", + return_value=ContainerLookup( + state="stopped", id="abc123", ids=("abc123",) + ), + ), + patch("dcode.shell.subprocess.run", start), + patch("dcode.shell.find_ssh_socket", return_value="/host/ssh.sock"), + patch("dcode.shell.probe_workdir", return_value="/workspaces/proj"), + patch("dcode.shell.resolve_terminal_profile", return_value=None), + patch("dcode.shell.detect_login_shell", return_value="/bin/bash"), + patch("dcode.shell.os.execvp", execvp), + ): + rc = run_shell(str(proj), insiders=False, shell_override=None) + + return SimpleNamespace(rc=rc, start=start, execvp=execvp, stdout=stdout) + + def test_y_starts_container_then_execs(self, tmp_path, monkeypatch, capsys): + result = self._run_stopped(tmp_path, monkeypatch, "y\n") + + assert result.rc == 0 + result.start.assert_called_once_with( + ["docker", "start", "abc123"], + capture_output=True, + text=True, + check=False, + ) + result.execvp.assert_called_once() + err = capsys.readouterr().err + assert "Start it now? [Y/n]" in err + assert "starting container abc123" in err + assert "container started" in err + + def test_enter_defaults_to_yes(self, tmp_path, monkeypatch): + result = self._run_stopped(tmp_path, monkeypatch, "\n") + + assert result.rc == 0 + result.start.assert_called_once() + result.execvp.assert_called_once() + + def test_yes_word_is_case_insensitive(self, tmp_path, monkeypatch): + result = self._run_stopped(tmp_path, monkeypatch, "YeS\n") + + assert result.rc == 0 + result.start.assert_called_once() + result.execvp.assert_called_once() + + def test_n_aborts_without_start_or_exec(self, tmp_path, monkeypatch, capsys): + result = self._run_stopped(tmp_path, monkeypatch, "n\n") + + assert result.rc != 0 + result.start.assert_not_called() + result.execvp.assert_not_called() + assert "aborted" in capsys.readouterr().err + + def test_no_aborts_without_start_or_exec(self, tmp_path, monkeypatch, capsys): + result = self._run_stopped(tmp_path, monkeypatch, "no\n") + + assert result.rc != 0 + result.start.assert_not_called() + result.execvp.assert_not_called() + assert "aborted" in capsys.readouterr().err + + def test_docker_start_failure_includes_stderr( + self, tmp_path, monkeypatch, capsys + ): + result = self._run_stopped( + tmp_path, + monkeypatch, + "y\n", + start_rc=1, + start_stderr="some docker error", + ) + + assert result.rc != 0 + result.start.assert_called_once() + result.execvp.assert_not_called() + err = capsys.readouterr().err + assert "failed to start container abc123" in err + assert "some docker error" in err + + def test_non_tty_stopped_does_not_prompt_or_start( + self, tmp_path, monkeypatch, capsys + ): + result = self._run_stopped(tmp_path, monkeypatch, "y\n", isatty=False) + + assert result.rc != 0 + result.start.assert_not_called() + result.execvp.assert_not_called() + err = capsys.readouterr().err + assert "run interactively to be prompted to start it" in err + assert "Start it now? [Y/n]" not in err + + def test_prompt_is_written_to_stderr_not_stdout( + self, tmp_path, monkeypatch, capsys + ): + result = self._run_stopped(tmp_path, monkeypatch, "y\n") + + err = capsys.readouterr().err + assert "Start it now? [Y/n]" in err + assert result.stdout.getvalue() == "" + + +# --------------------------------------------------------------------------- +# run_shell — error paths +# --------------------------------------------------------------------------- + + +class TestRunShellErrors: + def _run_with(self, lookup_state, *, detail=None, ids=()): + lookup = ContainerLookup( + state=lookup_state, + id=ids[0] if ids else None, + ids=ids, + detail=detail, + ) + return patch("dcode.shell.find_container", return_value=lookup) + + def test_missing_devcontainer(self, tmp_path, capsys): + # tmp_path has no .devcontainer at all + proj = tmp_path / "empty" + proj.mkdir() + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + assert "dcode doctor" in capsys.readouterr().err + + def test_state_missing_message(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with self._run_with("missing"): + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + err = capsys.readouterr().err + assert "no devcontainer found running" in err + + def test_state_stopped_non_tty_message(self, tmp_path, capsys, monkeypatch): + proj = _make_project(tmp_path) + monkeypatch.setattr("sys.stdin", _TTYStringIO(isatty=False)) + monkeypatch.setattr("sys.stdout", _TTYStringIO(isatty=False)) + with self._run_with("stopped", ids=("abc",)): + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + err = capsys.readouterr().err + assert "run interactively to be prompted to start it" in err + assert f"dcode {proj}" in err + + def test_state_ambiguous_lists_ids(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with self._run_with("ambiguous", ids=("id1", "id2")): + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + err = capsys.readouterr().err + assert "id1" in err and "id2" in err + + def test_state_docker_unavailable_includes_detail(self, tmp_path, capsys): + proj = _make_project(tmp_path) + with self._run_with("docker_unavailable", detail="cannot connect"): + rc = run_shell(str(proj), insiders=False, shell_override=None) + assert rc != 0 + err = capsys.readouterr().err + assert "Docker" in err + assert "cannot connect" in err + + def test_remote_env_warning_fires_when_present(self, tmp_path, capsys): + proj = _make_project(tmp_path, '{"remoteEnv": {}}') + with _RunShellHarness(): + run_shell(str(proj), insiders=False, shell_override=None) + err = capsys.readouterr().err + assert "remoteEnv" in err and "not applied" in err + + def test_remote_env_warning_silent_when_absent(self, tmp_path, capsys): + proj = _make_project(tmp_path, "{}") + with _RunShellHarness(): + run_shell(str(proj), insiders=False, shell_override=None) + err = capsys.readouterr().err + assert "remoteEnv" not in err