diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fc6d22d..bbd1973 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -172,6 +172,25 @@ jobs: PYTHON_MANAGER_CONFIG: .\test-config.json PYMANAGER_DEBUG: true + - name: 'Validate entrypoint script' + run: | + $env:PYTHON_MANAGER_CONFIG = (gi $env:PYTHON_MANAGER_CONFIG).FullName + cd .\test_installs\_bin + del pip* -Verbose + pymanager install --refresh + dir pip* + Get-Item .\pip.exe + Get-Item .\pip.exe.__target__ + Get-Content .\pip.exe.__target__ + Get-Item .\pip.exe.__script__.py + Get-Content .\pip.exe.__script__.py + .\pip.exe --version + env: + PYTHON_MANAGER_INCLUDE_UNMANAGED: false + PYTHON_MANAGER_CONFIG: .\test-config.json + PYMANAGER_DEBUG: true + shell: powershell + - name: 'Offline bundle download and install' run: | pymanager list --online 3 3-32 3-64 3-arm64 diff --git a/src/manage/aliasutils.py b/src/manage/aliasutils.py new file mode 100644 index 0000000..e76e777 --- /dev/null +++ b/src/manage/aliasutils.py @@ -0,0 +1,371 @@ +import os + +from .exceptions import FilesInUseError, NoLauncherTemplateError +from .fsutils import atomic_unlink, ensure_tree, unlink +from .logging import LOGGER +from .pathutils import Path +from .tagutils import install_matches_any + +_EXE = ".exe".casefold() + +DEFAULT_SITE_DIRS = ["Lib\\site-packages", "Scripts"] + +SCRIPT_CODE = """import sys + +# Clear sys.path[0] if it contains this script. +# Be careful to use the most compatible Python code possible. +try: + if sys.path[0]: + if sys.argv[0].startswith(sys.path[0]): + sys.path[0] = "" + else: + open(sys.path[0] + "/" + sys.argv[0], "rb").close() + sys.path[0] = "" +except OSError: + pass +except AttributeError: + pass +except IndexError: + pass + +# Replace argv[0] with our executable instead of the script name. +try: + if sys.argv[0][-14:].upper() == ".__SCRIPT__.PY": + sys.argv[0] = sys.argv[0][:-14] + sys.orig_argv[0] = sys.argv[0] +except AttributeError: + pass +except IndexError: + pass + +from {mod} import {func} +sys.exit({func}()) +""" + + +class AliasInfo: + def __init__(self, **kwargs): + self.install = kwargs.get("install") + self.name = kwargs.get("name") + self.windowed = kwargs.get("windowed", 0) + self.target = kwargs.get("target") + self.mod = kwargs.get("mod") + self.func = kwargs.get("func") + + def replace(self, **kwargs): + return AliasInfo(**{ + "install": self.install, + "name": self.name, + "windowed": self.windowed, + "target": self.target, + "mod": self.mod, + "func": self.func, + **kwargs, + }) + + @property + def script_code(self): + if self.mod and self.func: + if not all(s.isidentifier() for s in self.mod.split(".")): + LOGGER.warn("Alias %s has an entrypoint with invalid module " + "%r.", self.name, self.mod) + return None + if not all(s.isidentifier() for s in self.func.split(".")): + LOGGER.warn("Alias %s has an entrypoint with invalid function " + "%r.", self.name, self.func) + return None + return SCRIPT_CODE.format(mod=self.mod, func=self.func) + + +def _if_exists(launcher, plat): + suffix = "." + launcher.suffix.lstrip(".") + plat_launcher = launcher.parent / f"{launcher.stem}{plat}{suffix}" + if plat_launcher.is_file(): + return plat_launcher + return launcher + + +def _create_alias(cmd, *, name, target, plat=None, windowed=0, script_code=None, _link=os.link): + p = cmd.global_dir / name + if not p.match("*.exe"): + p = p.with_name(p.name + ".exe") + if not isinstance(target, Path): + target = Path(target) + ensure_tree(p) + launcher = cmd.launcher_exe + if windowed: + launcher = cmd.launcherw_exe or launcher + + if plat: + LOGGER.debug("Checking for launcher for platform -%s", plat) + launcher = _if_exists(launcher, f"-{plat}") + if not launcher.is_file(): + LOGGER.debug("Checking for launcher for default platform %s", cmd.default_platform) + launcher = _if_exists(launcher, cmd.default_platform) + if not launcher.is_file(): + LOGGER.debug("Checking for launcher for -64") + launcher = _if_exists(launcher, "-64") + LOGGER.debug("Create %s linking to %s using %s", name, target, launcher) + if not launcher or not launcher.is_file(): + raise NoLauncherTemplateError() + + try: + launcher_bytes = launcher.read_bytes() + except OSError: + warnings_shown = cmd.scratch.setdefault("aliasutils.create_alias.warnings_shown", set()) + if str(launcher) not in warnings_shown: + LOGGER.warn("Failed to read launcher template at %s.", launcher) + warnings_shown.add(str(launcher)) + LOGGER.debug("Failed to read %s", launcher, exc_info=True) + return + + existing_bytes = b'' + try: + with open(p, 'rb') as f: + existing_bytes = f.read(len(launcher_bytes) + 1) + except FileNotFoundError: + pass + except OSError: + LOGGER.debug("Failed to read existing alias launcher.") + + launcher_remap = cmd.scratch.setdefault("aliasutils.create_alias.launcher_remap", {}) + if existing_bytes == launcher_bytes: + # Valid existing launcher, so save its path in case we need it later + # for a hard link. + launcher_remap.setdefault(launcher.name, p) + else: + # First try and create a hard link + unlink(p) + try: + _link(launcher, p) + LOGGER.debug("Created %s as hard link to %s", p.name, launcher.name) + except OSError as ex: + if ex.winerror != 17: + # Report errors other than cross-drive links + LOGGER.debug("Failed to create hard link for command.", exc_info=True) + launcher2 = launcher_remap.get(launcher.name) + if launcher2: + try: + _link(launcher2, p) + LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name) + except FileNotFoundError: + raise + except OSError: + LOGGER.debug("Failed to create hard link to fallback launcher") + launcher2 = None + if not launcher2: + try: + p.write_bytes(launcher_bytes) + LOGGER.debug("Created %s as copy of %s", p.name, launcher.name) + launcher_remap[launcher.name] = p + except OSError: + LOGGER.error("Failed to create global command %s.", name) + LOGGER.debug("TRACEBACK", exc_info=True) + + p_target = p.with_name(p.name + ".__target__") + do_update = True + try: + do_update = not target.match(p_target.read_text(encoding="utf-8")) + except FileNotFoundError: + pass + except (OSError, UnicodeDecodeError): + LOGGER.debug("Failed to read existing target path.", exc_info=True) + + if do_update: + p_target.write_text(str(target), encoding="utf-8") + + p_script = p.with_name(p.name + ".__script__.py") + if script_code: + do_update = True + try: + do_update = p_script.read_text(encoding="utf-8") != script_code + except FileNotFoundError: + pass + except (OSError, UnicodeDecodeError): + LOGGER.debug("Failed to read existing script file.", exc_info=True) + if do_update: + p_script.write_text(script_code, encoding="utf-8") + else: + try: + unlink(p_script) + except OSError: + LOGGER.error("Failed to clean up existing alias. Re-run with -v " + "or check the install log for details.") + LOGGER.info("Failed to remove %s.", p_script) + LOGGER.debug("TRACEBACK", exc_info=True) + + +def _parse_entrypoint_line(line): + line = line.partition("#")[0] + name, sep, rest = line.partition("=") + name = name.strip() + if name and name[0].isalnum() and sep and rest: + mod, sep, rest = rest.partition(":") + mod = mod.strip() + if mod and sep and rest: + func, sep, extra = rest.partition("[") + func = func.strip() + if func: + return name, mod, func + return None, None, None + + +def _readlines(path): + try: + f = open(path, "r", encoding="utf-8", errors="strict") + except OSError: + LOGGER.debug("Failed to read %s", path, exc_info=True) + return + + with f: + try: + while True: + yield next(f) + except StopIteration: + return + except UnicodeDecodeError: + LOGGER.debug("Failed to decode contents of %s", path, exc_info=True) + return + + +def _scan_one(install, root): + # Scan d for dist-info directories with entry_points.txt + dist_info = [d for d in root.glob("*.dist-info") if d.is_dir()] + entrypoints = [f for f in [d / "entry_points.txt" for d in dist_info] if f.is_file()] + if len(entrypoints): + LOGGER.debug("Found %i entry_points.txt files in %i dist-info in %s", + len(entrypoints), len(dist_info), root) + + # Filter down to [console_scripts] and [gui_scripts] + for ep in entrypoints: + alias = None + for line in _readlines(ep): + if line.strip() == "[console_scripts]": + alias = dict(windowed=0) + elif line.strip() == "[gui_scripts]": + alias = dict(windowed=1) + elif line.lstrip().startswith("["): + alias = None + elif alias is not None: + name, mod, func = _parse_entrypoint_line(line) + if name and mod and func: + yield AliasInfo(install=install, name=name, + mod=mod, func=func, **alias) + + +def _scan(install, prefix, dirs): + for dirname in dirs or (): + root = prefix / dirname + yield from _scan_one(install, root) + + +def calculate_aliases(cmd, install, *, _scan=_scan): + LOGGER.debug("Calculating aliases for %s", install["id"]) + + prefix = install["prefix"] + + default_alias = None + default_alias_w = None + + for a in install.get("alias", ()): + target = prefix / a["target"] + if not target.is_file(): + LOGGER.warn("Skipping alias '%s' because target '%s' does not exist", + a["name"], a["target"]) + continue + ai = AliasInfo(install=install, **a) + yield ai + if a.get("windowed") and not default_alias_w: + default_alias_w = ai + if not default_alias: + default_alias = ai + + if not default_alias_w: + default_alias_w = default_alias + + if install.get("default"): + if default_alias: + yield default_alias.replace(name="python") + if default_alias_w: + yield default_alias_w.replace(name="pythonw", windowed=1) + + site_dirs = DEFAULT_SITE_DIRS + for s in install.get("shortcuts", ()): + if s.get("kind") == "site-dirs": + site_dirs = s.get("dirs", ()) + break + + for ai in _scan(install, prefix, site_dirs): + if ai.windowed and default_alias_w: + yield ai.replace(target=default_alias_w.target) + elif not ai.windowed and default_alias: + yield ai.replace(target=default_alias.target) + + +def create_aliases(cmd, aliases, *, _create_alias=_create_alias): + if not cmd.global_dir: + return + + written = set() + + LOGGER.debug("Creating aliases") + + for alias in aliases: + if not alias.name: + LOGGER.debug("Invalid alias info provided with no name.") + continue + + n = alias.name.casefold().removesuffix(_EXE) + if n in written: + # We've already written this alias, so skip it. + continue + written.add(n) + + if not alias.target: + LOGGER.debug("No suitable alias found for %s. Skipping", alias.name) + continue + + target = alias.install["prefix"] / alias.target + try: + _create_alias( + cmd, + name=alias.name, + plat=alias.install.get("tag", "").rpartition("-")[2], + target=target, + script_code=alias.script_code, + windowed=alias.windowed, + ) + except NoLauncherTemplateError: + if install_matches_any(alias.install, getattr(cmd, "tags", None)): + LOGGER.warn("Skipping %s alias because " + "the launcher template was not found.", alias.name) + else: + LOGGER.debug("Skipping %s alias because " + "the launcher template was not found.", alias.name) + + + +def cleanup_aliases(cmd, *, preserve, _unlink_many=atomic_unlink): + if not cmd.global_dir or not cmd.global_dir.is_dir(): + return + + LOGGER.debug("Cleaning up aliases") + expected = set() + for alias in preserve: + if alias.name: + n = alias.name.casefold().removesuffix(_EXE) + _EXE + expected.add(n) + + LOGGER.debug("Retaining %d aliases", len(expected)) + for alias in cmd.global_dir.glob("*.exe"): + if alias.name.casefold() in expected: + continue + target = alias.with_name(alias.name + ".__target__") + script = alias.with_name(alias.name + ".__script__.py") + LOGGER.debug("Unlink %s", alias) + try: + _unlink_many([alias, target, script]) + except (OSError, FilesInUseError): + LOGGER.warn("Failed to remove %s. Ensure it is not in use and run " + "py install --refresh to try again.", alias.name) + LOGGER.debug("TRACEBACK", exc_info=True) diff --git a/src/manage/arputils.py b/src/manage/arputils.py index 63ade07..6e98b28 100644 --- a/src/manage/arputils.py +++ b/src/manage/arputils.py @@ -25,7 +25,7 @@ def _self_cmd(): if not appdata: appdata = os.path.expanduser(r"~\AppData\Local") apps = Path(appdata) / r"Microsoft\WindowsApps" - LOGGER.debug("Searching %s for pymanager.exe", apps) + LOGGER.debug("Searching %s for pymanager.exe for ARP entries", apps) for d in apps.iterdir(): if not d.match("PythonSoftwareFoundation.PythonManager_*"): continue diff --git a/src/manage/commands.py b/src/manage/commands.py index 0ffb8f5..f589bf6 100644 --- a/src/manage/commands.py +++ b/src/manage/commands.py @@ -39,6 +39,10 @@ WELCOME = f"""!B!Python install manager was successfully updated to {__version__}.!W! + +This update adds global shortcuts for installed scripts such as !G!pip.exe!W!. +Use !G!py install --refresh!W! to update all shortcuts. +!Y!This will be needed after installing new scripts, as it is not run automatically.!W! """ # The 'py help' or 'pymanager help' output is constructed by these default docs, diff --git a/src/manage/exceptions.py b/src/manage/exceptions.py index d7892ad..50cd2cf 100644 --- a/src/manage/exceptions.py +++ b/src/manage/exceptions.py @@ -75,3 +75,8 @@ def __init__(self): class FilesInUseError(Exception): def __init__(self, files): self.files = files + + +class NoLauncherTemplateError(Exception): + def __init__(self): + super().__init__("No suitable launcher template was found.") diff --git a/src/manage/install_command.py b/src/manage/install_command.py index ac67e61..6d82d1c 100644 --- a/src/manage/install_command.py +++ b/src/manage/install_command.py @@ -215,173 +215,93 @@ def _calc(prefix, filename, calculate_dest=calculate_dest): LOGGER.debug("Attempted to overwrite: %s", dest) -def _if_exists(launcher, plat): - suffix = "." + launcher.suffix.lstrip(".") - plat_launcher = launcher.parent / f"{launcher.stem}{plat}{suffix}" - if plat_launcher.is_file(): - return plat_launcher - return launcher - - -def _write_alias(cmd, install, alias, target, _link=os.link): - p = (cmd.global_dir / alias["name"]) - target = Path(target) - ensure_tree(p) - launcher = cmd.launcher_exe - if alias.get("windowed"): - launcher = cmd.launcherw_exe or launcher - - plat = install["tag"].rpartition("-")[-1] - if plat: - LOGGER.debug("Checking for launcher for platform -%s", plat) - launcher = _if_exists(launcher, f"-{plat}") - if not launcher.is_file(): - LOGGER.debug("Checking for launcher for default platform %s", cmd.default_platform) - launcher = _if_exists(launcher, cmd.default_platform) - if not launcher.is_file(): - LOGGER.debug("Checking for launcher for -64") - launcher = _if_exists(launcher, "-64") - LOGGER.debug("Create %s linking to %s using %s", alias["name"], target, launcher) - if not launcher or not launcher.is_file(): - if install_matches_any(install, getattr(cmd, "tags", None)): - LOGGER.warn("Skipping %s alias because the launcher template was not found.", alias["name"]) - else: - LOGGER.debug("Skipping %s alias because the launcher template was not found.", alias["name"]) - return - +def _create_shortcut_pep514(cmd, install, shortcut): try: - launcher_bytes = launcher.read_bytes() - except OSError: - warnings_shown = cmd.scratch.setdefault("install_command._write_alias.warnings_shown", set()) - if str(launcher) not in warnings_shown: - LOGGER.warn("Failed to read launcher template at %s.", launcher) - warnings_shown.add(str(launcher)) - LOGGER.debug("Failed to read %s", launcher, exc_info=True) + from .pep514utils import update_registry + root = cmd.pep514_root + except (ImportError, AttributeError): + LOGGER.debug("Skipping PEP 514 creation.", exc_info=True) return - - existing_bytes = b'' - try: - with open(p, 'rb') as f: - existing_bytes = f.read(len(launcher_bytes) + 1) - except FileNotFoundError: - pass - except OSError: - LOGGER.debug("Failed to read existing alias launcher.") - - launcher_remap = cmd.scratch.setdefault("install_command._write_alias.launcher_remap", {}) - - if existing_bytes == launcher_bytes: - # Valid existing launcher, so save its path in case we need it later - # for a hard link. - launcher_remap.setdefault(launcher.name, p) - else: - # First try and create a hard link - unlink(p) - try: - _link(launcher, p) - LOGGER.debug("Created %s as hard link to %s", p.name, launcher.name) - except OSError as ex: - if ex.winerror != 17: - # Report errors other than cross-drive links - LOGGER.debug("Failed to create hard link for command.", exc_info=True) - launcher2 = launcher_remap.get(launcher.name) - if launcher2: - try: - _link(launcher2, p) - LOGGER.debug("Created %s as hard link to %s", p.name, launcher2.name) - except FileNotFoundError: - raise - except OSError: - LOGGER.debug("Failed to create hard link to fallback launcher") - launcher2 = None - if not launcher2: - try: - p.write_bytes(launcher_bytes) - LOGGER.debug("Created %s as copy of %s", p.name, launcher.name) - launcher_remap[launcher.name] = p - except OSError: - LOGGER.error("Failed to create global command %s.", alias["name"]) - LOGGER.debug(exc_info=True) - - p_target = p.with_name(p.name + ".__target__") - try: - if target.match(p_target.read_text(encoding="utf-8")): - return - except FileNotFoundError: - pass - except (OSError, UnicodeDecodeError): - LOGGER.debug("Failed to read existing target path.", exc_info=True) - - p_target.write_text(str(target), encoding="utf-8") - - -def _create_shortcut_pep514(cmd, install, shortcut): - from .pep514utils import update_registry - update_registry(cmd.pep514_root, install, shortcut, cmd.tags) + update_registry(root, install, shortcut, cmd.tags) def _cleanup_shortcut_pep514(cmd, install_shortcut_pairs): - from .pep514utils import cleanup_registry - cleanup_registry(cmd.pep514_root, {s["Key"] for i, s in install_shortcut_pairs}, cmd.tags) + try: + from .pep514utils import cleanup_registry + root = cmd.pep514_root + except (ImportError, AttributeError): + LOGGER.debug("Skipping PEP 514 cleanup.", exc_info=True) + return + cleanup_registry(root, {s["Key"] for i, s in install_shortcut_pairs}, getattr(cmd, "tags", None)) def _create_start_shortcut(cmd, install, shortcut): - from .startutils import create_one - create_one(cmd.start_folder, install, shortcut, cmd.tags) + try: + from .startutils import create_one + root = cmd.start_folder + except (ImportError, AttributeError): + LOGGER.debug("Skipping Start shortcut creation.", exc_info=True) + return + create_one(root, install, shortcut, cmd.tags) def _cleanup_start_shortcut(cmd, install_shortcut_pairs): - from .startutils import cleanup - cleanup(cmd.start_folder, [s for i, s in install_shortcut_pairs], cmd.tags) + try: + from .startutils import cleanup + root = cmd.start_folder + except (ImportError, AttributeError): + LOGGER.debug("Skipping Start shortcut cleanup.", exc_info=True) + return + cleanup(root, [s for i, s in install_shortcut_pairs], getattr(cmd, "tags", None)) def _create_arp_entry(cmd, install, shortcut): # ARP = Add/Remove Programs - from .arputils import create_one - create_one(install, shortcut, cmd.tags) + try: + from .arputils import create_one + except ImportError: + LOGGER.debug("Skipping ARP entry creation.", exc_info=True) + return + create_one(install, shortcut, getattr(cmd, "tags", None)) def _cleanup_arp_entries(cmd, install_shortcut_pairs): - from .arputils import cleanup - cleanup([i for i, s in install_shortcut_pairs], cmd.tags) + try: + from .arputils import cleanup + except ImportError: + LOGGER.debug("Skipping ARP entry cleanup.", exc_info=True) + return + cleanup([i for i, s in install_shortcut_pairs], getattr(cmd, "tags", None)) SHORTCUT_HANDLERS = { "pep514": (_create_shortcut_pep514, _cleanup_shortcut_pep514), "start": (_create_start_shortcut, _cleanup_start_shortcut), "uninstall": (_create_arp_entry, _cleanup_arp_entries), + # We want to catch these, but not handle them as regular shortcuts. + "site-dirs": (None, None), } -def update_all_shortcuts(cmd): +def update_all_shortcuts(cmd, *, _aliasutils=None): LOGGER.debug("Updating global shortcuts") - alias_written = set() + installs = cmd.get_installs() shortcut_written = {} - for i in cmd.get_installs(): - if cmd.global_dir: - aliases = i.get("alias", ()) - # Generate a python.exe for the default runtime in case the user - # later disables/removes the global python.exe command. - if i.get("default"): - aliases = list(i.get("alias", ())) - alias_1 = [a for a in aliases if not a.get("windowed")] - alias_2 = [a for a in aliases if a.get("windowed")] - if alias_1: - aliases.append({**alias_1[0], "name": "python.exe"}) - if alias_2: - aliases.append({**alias_2[0], "name": "pythonw.exe"}) - - for a in aliases: - if a["name"].casefold() in alias_written: - continue - target = i["prefix"] / a["target"] - if not target.is_file(): - LOGGER.warn("Skipping alias '%s' because target '%s' does not exist", a["name"], a["target"]) - continue - _write_alias(cmd, i, a, target) - alias_written.add(a["name"].casefold()) + if cmd.global_dir: + if not _aliasutils: + from . import aliasutils as _aliasutils + aliases = [] + for i in installs: + try: + aliases.extend(_aliasutils.calculate_aliases(cmd, i)) + except LookupError: + LOGGER.warn("Failed to process aliases for %s.", i.get("display-name", i["id"])) + LOGGER.debug("TRACEBACK", exc_info=True) + _aliasutils.create_aliases(cmd, aliases) + _aliasutils.cleanup_aliases(cmd, preserve=aliases) + for i in installs: for s in i.get("shortcuts", ()): if cmd.enable_shortcut_kinds and s["kind"] not in cmd.enable_shortcut_kinds: continue @@ -393,21 +313,13 @@ def update_all_shortcuts(cmd): LOGGER.warn("Skipping invalid shortcut for '%s'", i["id"]) LOGGER.debug("shortcut: %s", s) else: - create(cmd, i, s) - shortcut_written.setdefault(s["kind"], []).append((i, s)) - - if cmd.global_dir and cmd.global_dir.is_dir() and cmd.launcher_exe: - for target in cmd.global_dir.glob("*.exe.__target__"): - alias = target.with_suffix("") - if alias.name.casefold() not in alias_written: - LOGGER.debug("Unlink %s", alias) - unlink(alias, f"Attempting to remove {alias} is taking some time. " + - "Ensure it is not is use, and please continue to wait " + - "or press Ctrl+C to abort.") - target.unlink() + if create: + create(cmd, i, s) + shortcut_written.setdefault(s["kind"], []).append((i, s)) for k, (_, cleanup) in SHORTCUT_HANDLERS.items(): - cleanup(cmd, shortcut_written.get(k, [])) + if cleanup: + cleanup(cmd, shortcut_written.get(k, [])) def print_cli_shortcuts(cmd): @@ -522,15 +434,7 @@ def _download_one(cmd, source, install, download_dir, *, must_copy=False): return package -def _should_preserve_on_upgrade(cmd, root, path): - if path.match("site-packages"): - return True - if path.parent == root and path.match("Scripts"): - return True - return False - - -def _preserve_site(cmd, root): +def _preserve_site(cmd, root, install): if not root.is_dir(): return None if not cmd.preserve_site_on_upgrade: @@ -542,39 +446,48 @@ def _preserve_site(cmd, root): if cmd.repair: LOGGER.verbose("Not preserving site directory because of --repair") return None + state = [] i = 0 - dirs = [root] + + from .aliasutils import DEFAULT_SITE_DIRS + site_dirs = DEFAULT_SITE_DIRS + for s in install.get("shortcuts", ()): + if s.get("kind") == "site-dirs": + site_dirs = s.get("dirs", ()) + break + target_root = root.with_name(f"_{root.name}") target_root.mkdir(parents=True, exist_ok=True) - while dirs: - if _should_preserve_on_upgrade(cmd, root, dirs[0]): - while True: - target = target_root / str(i) - i += 1 - try: - unlink(target) - break - except FileNotFoundError: - break - except OSError: - LOGGER.verbose("Failed to remove %s.", target) - try: - LOGGER.info("Preserving %s during update.", dirs[0].relative_to(root)) - except ValueError: - # Just in case a directory goes weird, so we don't break - LOGGER.verbose(exc_info=True) - LOGGER.verbose("Moving %s to %s", dirs[0], target) + + for dirname in site_dirs: + d = root / dirname + if not d.is_dir(): + continue + + while True: + target = target_root / str(i) + i += 1 try: - dirs[0].rename(target) + unlink(target) + break + except FileNotFoundError: + break except OSError: - LOGGER.warn("Failed to preserve %s during update.", dirs[0]) - LOGGER.verbose("TRACEBACK", exc_info=True) - else: - state.append((dirs[0], target)) + LOGGER.verbose("Failed to remove %s.", target) + try: + LOGGER.info("Preserving %s during update.", d.relative_to(root)) + except ValueError: + # Just in case a directory goes weird, so we don't break + LOGGER.verbose("Error information:", exc_info=True) + LOGGER.verbose("Moving %s to %s", d, target) + try: + d.rename(target) + except OSError: + LOGGER.warn("Failed to preserve %s during update.", d) + LOGGER.verbose("Error information:", exc_info=True) else: - dirs.extend(d for d in dirs[0].iterdir() if d.is_dir()) - dirs.pop(0) + state.append((d, target)) # Append None, target_root last to clean up after restore is done state.append((None, target_root)) return state @@ -634,7 +547,7 @@ def _install_one(cmd, source, install, *, target=None): dest = target or (cmd.install_dir / install["id"]) - preserved_site = _preserve_site(cmd, dest) + preserved_site = _preserve_site(cmd, dest, install) LOGGER.verbose("Extracting %s to %s", package, dest) if not cmd.repair: diff --git a/src/manage/pathutils.py b/src/manage/pathutils.py index 8e75d8e..4576573 100644 --- a/src/manage/pathutils.py +++ b/src/manage/pathutils.py @@ -44,11 +44,17 @@ def __bool__(self): @property def stem(self): - return self.name.rpartition(".")[0] + stem, dot, suffix = self.name.rpartition(".") + if not dot: + return suffix + return stem @property def suffix(self): - return self.name.rpartition(".")[2] + stem, dot, suffix = self.name.rpartition(".") + if not dot: + return "" + return dot + suffix @property def parent(self): diff --git a/src/manage/startutils.py b/src/manage/startutils.py index 4a45e7c..76349c2 100644 --- a/src/manage/startutils.py +++ b/src/manage/startutils.py @@ -160,7 +160,10 @@ def cleanup(root, preserve, warn_for=[]): LOGGER.debug("Cleaning up Start menu shortcuts") for item in keep: - LOGGER.debug("Except: %s", item) + try: + LOGGER.debug("Except: %s", item.relative_to(root)) + except ValueError: + LOGGER.debug("Except: %s", item) for entry in root.iterdir(): _cleanup(entry, keep) diff --git a/src/pymanager/_launch.cpp b/src/pymanager/_launch.cpp index 0286093..e4de999 100644 --- a/src/pymanager/_launch.cpp +++ b/src/pymanager/_launch.cpp @@ -34,49 +34,48 @@ dup_handle(HANDLE input, HANDLE *output) int -launch(const wchar_t *executable, const wchar_t *insert_args, int skip_argc, DWORD *exitCode) -{ +launch( + const wchar_t *executable, + const wchar_t *orig_cmd_line, + const wchar_t *insert_args, + int skip_argc, + DWORD *exit_code +) { HANDLE job; JOBOBJECT_EXTENDED_LIMIT_INFORMATION info; DWORD info_len; STARTUPINFOW si; PROCESS_INFORMATION pi; int lastError = 0; - const wchar_t *arg_space = L" "; - LPCWSTR origCmdLine = GetCommandLineW(); - const wchar_t *cmdLine = NULL; + const wchar_t *cmd_line = NULL; - if (insert_args == NULL) { - insert_args = L""; + if (orig_cmd_line[0] == L'"') { + cmd_line = wcschr(orig_cmd_line + 1, L'"'); + } else { + cmd_line = wcschr(orig_cmd_line, L' '); } - size_t n = wcslen(executable) + wcslen(origCmdLine) + wcslen(insert_args) + 5; - wchar_t *newCmdLine = (wchar_t *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, n * sizeof(wchar_t)); - if (!newCmdLine) { + size_t n = wcslen(executable) + wcslen(orig_cmd_line) + (insert_args ? wcslen(insert_args) : 0) + 6; + wchar_t *new_cmd_line = (wchar_t *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, n * sizeof(wchar_t)); + if (!new_cmd_line) { lastError = GetLastError(); goto exit; } - if (origCmdLine[0] == L'"') { - cmdLine = wcschr(origCmdLine + 1, L'"'); - } else { - cmdLine = wcschr(origCmdLine, L' '); - } - - while (skip_argc-- > 0) { + // Skip any requested args, deliberately leaving any trailing spaces + // (we'll skip one later on and add our own space, and preserve multiple) + while (cmd_line && *cmd_line && skip_argc-- > 0) { wchar_t c; - while (*++cmdLine && *cmdLine == L' ') { } - while (*++cmdLine && *cmdLine != L' ') { } + while (*++cmd_line && *cmd_line == L' ') { } + while (*++cmd_line && *cmd_line != L' ') { } } - if (!insert_args || !*insert_args) { - arg_space = L""; - } - if (cmdLine && *cmdLine) { - swprintf_s(newCmdLine, n + 1, L"\"%s\"%s%s %s", executable, arg_space, insert_args, cmdLine + 1); - } else { - swprintf_s(newCmdLine, n + 1, L"\"%s\"%s%s", executable, arg_space, insert_args); - } + swprintf_s(new_cmd_line, n, L"\"%s\"%s%s%s%s", + executable, + (insert_args && *insert_args) ? L" ": L"", + (insert_args && *insert_args) ? insert_args : L"", + (cmd_line && *cmd_line) ? L" " : L"", + (cmd_line && *cmd_line) ? cmd_line + 1 : L""); #if defined(_WINDOWS) /* @@ -123,7 +122,7 @@ launch(const wchar_t *executable, const wchar_t *insert_args, int skip_argc, DWO } si.dwFlags |= STARTF_USESTDHANDLES; - if (!CreateProcessW(executable, newCmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) { + if (!CreateProcessW(executable, new_cmd_line, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi)) { lastError = GetLastError(); goto exit; } @@ -131,12 +130,12 @@ launch(const wchar_t *executable, const wchar_t *insert_args, int skip_argc, DWO AssignProcessToJobObject(job, pi.hProcess); CloseHandle(pi.hThread); WaitForSingleObjectEx(pi.hProcess, INFINITE, FALSE); - if (!GetExitCodeProcess(pi.hProcess, exitCode)) { + if (!GetExitCodeProcess(pi.hProcess, exit_code)) { lastError = GetLastError(); } exit: - if (newCmdLine) { - HeapFree(GetProcessHeap(), 0, newCmdLine); + if (new_cmd_line) { + HeapFree(GetProcessHeap(), 0, new_cmd_line); } return lastError ? HRESULT_FROM_WIN32(lastError) : 0; } diff --git a/src/pymanager/_launch.h b/src/pymanager/_launch.h index 721a06a..1f51746 100644 --- a/src/pymanager/_launch.h +++ b/src/pymanager/_launch.h @@ -1 +1,7 @@ -int launch(const wchar_t *executable, const wchar_t *insert_args, int skip_argc, DWORD *exitCode); +int launch( + const wchar_t *executable, + const wchar_t *orig_cmd_line, + const wchar_t *insert_args, + int skip_argc, + DWORD *exit_code +); diff --git a/src/pymanager/launcher.cpp b/src/pymanager/launcher.cpp index c456c8f..79e1063 100644 --- a/src/pymanager/launcher.cpp +++ b/src/pymanager/launcher.cpp @@ -142,6 +142,64 @@ get_executable(wchar_t *executable, unsigned int bufferSize) } +int +get_script(wchar_t **result_path) +{ + HANDLE ph = GetProcessHeap(); + wchar_t *path = NULL; + DWORD path_len = 0; + DWORD len = 0; + int error = 0; + const wchar_t *SUFFIX = L".__script__.py"; + + // Get our path in a dynamic buffer with enough space to add SUFFIX + while (len >= path_len) { + if (path) { + HeapFree(ph, 0, path); + } + path_len += 260; + + path = (wchar_t *)HeapAlloc(ph, HEAP_ZERO_MEMORY, sizeof(wchar_t) * path_len); + if (!path) { + return HRESULT_FROM_WIN32(GetLastError()); + } + + len = GetModuleFileNameW(NULL, path, path_len - wcslen(SUFFIX)); + if (len == 0) { + error = GetLastError(); + HeapFree(ph, 0, path); + return HRESULT_FROM_WIN32(error); + } + } + + wcscpy_s(&path[len], path_len - len, SUFFIX); + + // Check that we have a script file. FindFirstFile should be fastest. + WIN32_FIND_DATAW fd; + HANDLE fh = FindFirstFileW(path, &fd); + if (fh == INVALID_HANDLE_VALUE) { + error = GetLastError(); + HeapFree(ph, 0, path); + switch (error) { + case ERROR_INVALID_FUNCTION: + case ERROR_FILE_NOT_FOUND: + case ERROR_PATH_NOT_FOUND: + // This is the typical exit for normal launches. We ought to be nice + // and fast up until this point, but can be slower through every + // other path. + return 0; + default: + return HRESULT_FROM_WIN32(error); + } + } + FindClose(fh); + + // Deliberately letting our memory leak - it'll be cleaned up when the + // process ends, and this is not a loop. + *result_path = path; + return 0; +} + int try_load_python3_dll(const wchar_t *executable, unsigned int bufferSize, void **mainFunction) @@ -209,16 +267,36 @@ wmain(int argc, wchar_t **argv) { int exit_code; wchar_t executable[MAXLEN]; + wchar_t *script = NULL; int err = get_executable(executable, MAXLEN); if (err) { return print_error(err, L"Failed to get target path"); } + err = get_script(&script); + if (err) { + return print_error(err, L"Failed to get script path"); + } + void *main_func = NULL; err = try_load_python3_dll(executable, MAXLEN, (void **)&main_func); switch (err) { case 0: + if (script) { + wchar_t **argv2 = (wchar_t **)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, + (argc + 1) * sizeof(wchar_t *)); + if (!argv2) { + return HRESULT_FROM_WIN32(GetLastError()); + } + argv2[0] = argv[0]; + argv2[1] = script; + for (int i = 1; i < argc; ++i) { + argv2[i + 1] = argv[i]; + } + argv = argv2; + argc += 1; + } err = launch_by_dll(main_func, executable, argc, argv, &exit_code); if (!err) { return exit_code; @@ -248,7 +326,8 @@ wmain(int argc, wchar_t **argv) break; } - err = launch(executable, NULL, 0, (DWORD *)&exit_code); + err = launch(executable, GetCommandLineW(), script, 0, (DWORD *)&exit_code); + if (!err) { return exit_code; } diff --git a/src/pymanager/main.cpp b/src/pymanager/main.cpp index 95512bc..0094df1 100644 --- a/src/pymanager/main.cpp +++ b/src/pymanager/main.cpp @@ -630,7 +630,7 @@ wmain(int argc, wchar_t **argv) } #endif - err = launch(executable.c_str(), args.c_str(), skip_argc, &exitCode); + err = launch(executable.c_str(), GetCommandLineW(), args.c_str(), skip_argc, &exitCode); // TODO: Consider sharing print_error() with launcher.cpp // This will ensure error messages are aligned whether we're launching diff --git a/tests/conftest.py b/tests/conftest.py index 6faf08e..3941f01 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -155,6 +155,7 @@ def __init__(self, global_dir, installs=[]): self.installs = list(installs) self.shebang_can_run_anything = True self.shebang_can_run_anything_silently = False + self.scratch = {} def get_installs(self, *, include_unmanaged=True, set_default=True): return self.installs diff --git a/tests/test_alias.py b/tests/test_alias.py new file mode 100644 index 0000000..27da874 --- /dev/null +++ b/tests/test_alias.py @@ -0,0 +1,327 @@ +import pytest +import secrets + +from manage import aliasutils as AU +from manage.exceptions import NoLauncherTemplateError + + +@pytest.fixture +def alias_checker(tmp_path): + with AliasChecker(tmp_path) as checker: + yield checker + + +class AliasChecker: + class Cmd: + global_dir = "out" + launcher_exe = "launcher.txt" + launcherw_exe = "launcherw.txt" + default_platform = "-64" + + def __init__(self, platform=None): + self.scratch = {} + if platform: + self.default_platform = platform + + + def __init__(self, tmp_path): + self.Cmd.global_dir = tmp_path / "out" + self.Cmd.launcher_exe = tmp_path / "launcher.txt" + self.Cmd.launcherw_exe = tmp_path / "launcherw.txt" + self._expect_target = "target-" + secrets.token_hex(32) + self._expect = { + "-32": "-32-" + secrets.token_hex(32), + "-64": "-64-" + secrets.token_hex(32), + "-arm64": "-arm64-" + secrets.token_hex(32), + "w-32": "w-32-" + secrets.token_hex(32), + "w-64": "w-64-" + secrets.token_hex(32), + "w-arm64": "w-arm64-" + secrets.token_hex(32), + } + for k, v in self._expect.items(): + (tmp_path / f"launcher{k}.txt").write_text(v) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + pass + + def check(self, cmd, tag, name, expect, windowed=0, script_code=None): + AU._create_alias( + cmd, + name=name, + plat=tag.rpartition("-")[2], + target=self._expect_target, + script_code=script_code, + windowed=windowed, + ) + print(*cmd.global_dir.glob("*"), sep="\n") + assert (cmd.global_dir / f"{name}.exe").is_file() + assert (cmd.global_dir / f"{name}.exe.__target__").is_file() + assert (cmd.global_dir / f"{name}.exe").read_text() == expect + assert (cmd.global_dir / f"{name}.exe.__target__").read_text() == self._expect_target + if script_code: + assert (cmd.global_dir / f"{name}.exe.__script__.py").is_file() + assert (cmd.global_dir / f"{name}.exe.__script__.py").read_text() == script_code + + def check_32(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["-32"]) + + def check_w32(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["w-32"], windowed=1) + + def check_64(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["-64"]) + + def check_w64(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["w-64"], windowed=1) + + def check_arm64(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["-arm64"]) + + def check_warm64(self, cmd, tag, name): + self.check(cmd, tag, name, self._expect["w-arm64"], windowed=1) + + def check_script(self, cmd, tag, name, windowed=0): + self.check(cmd, tag, name, self._expect["w-32" if windowed else "-32"], + windowed=windowed, script_code=secrets.token_hex(128)) + + +def test_write_alias_tag_with_platform(alias_checker): + alias_checker.check_32(alias_checker.Cmd(), "1.0-32", "testA") + alias_checker.check_w32(alias_checker.Cmd(), "1.0-32", "testB") + alias_checker.check_64(alias_checker.Cmd(), "1.0-64", "testC") + alias_checker.check_w64(alias_checker.Cmd(), "1.0-64", "testD") + alias_checker.check_arm64(alias_checker.Cmd(), "1.0-arm64", "testE") + alias_checker.check_warm64(alias_checker.Cmd(), "1.0-arm64", "testF") + + +def test_write_alias_default_platform(alias_checker): + alias_checker.check_32(alias_checker.Cmd("-32"), "1.0", "testA") + alias_checker.check_w32(alias_checker.Cmd("-32"), "1.0", "testB") + alias_checker.check_64(alias_checker.Cmd(), "1.0", "testC") + alias_checker.check_w64(alias_checker.Cmd(), "1.0", "testD") + alias_checker.check_arm64(alias_checker.Cmd("-arm64"), "1.0", "testE") + alias_checker.check_warm64(alias_checker.Cmd("-arm64"), "1.0", "testF") + + +def test_write_alias_fallback_platform(alias_checker): + alias_checker.check_64(alias_checker.Cmd("-spam"), "1.0", "testA") + alias_checker.check_w64(alias_checker.Cmd("-spam"), "1.0", "testB") + + +def test_write_script_alias(alias_checker): + alias_checker.check_script(alias_checker.Cmd(), "1.0-32", "testA", windowed=0) + alias_checker.check_script(alias_checker.Cmd(), "1.0-32", "testB", windowed=1) + alias_checker.check_script(alias_checker.Cmd(), "1.0-32", "testA", windowed=0) + + +def test_write_alias_launcher_missing(fake_config, assert_log, tmp_path): + fake_config.launcher_exe = tmp_path / "non-existent.exe" + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + with pytest.raises(NoLauncherTemplateError): + AU._create_alias( + fake_config, + name="test.exe", + plat="-64", + target=tmp_path / "target.exe", + ) + assert_log( + "Checking for launcher.*", + "Checking for launcher.*", + "Checking for launcher.*", + "Create %s linking to %s", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unreadable(fake_config, assert_log, tmp_path): + class FakeLauncherPath: + stem = "test" + suffix = ".exe" + parent = tmp_path + + @staticmethod + def is_file(): + return True + + @staticmethod + def read_bytes(): + raise OSError("no reading for the test") + + fake_config.scratch = {} + fake_config.launcher_exe = FakeLauncherPath + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + AU._create_alias( + fake_config, + name="test.exe", + target=tmp_path / "target.exe", + ) + assert_log( + "Create %s linking to %s", + "Failed to read launcher template at %s\\.", + "Failed to read %s", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unlinkable(fake_config, assert_log, tmp_path): + def fake_link(x, y): + raise OSError("Error for testing") + + fake_config.scratch = {} + fake_config.launcher_exe = tmp_path / "launcher.txt" + fake_config.launcher_exe.write_bytes(b'Arbitrary contents') + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + AU._create_alias( + fake_config, + name="test.exe", + target=tmp_path / "target.exe", + _link=fake_link + ) + assert_log( + "Create %s linking to %s", + "Failed to create hard link.+", + "Created %s as copy of %s", + assert_log.end_of_log(), + ) + + +def test_write_alias_launcher_unlinkable_remap(fake_config, assert_log, tmp_path): + # This is for the fairly expected case of the PyManager install being on one + # drive, but the global commands directory being on another. In this + # situation, we can't hard link directly into the app files, and will need + # to copy. But we only need to copy once, so if a launcher_remap has been + # set (in the current process), then we have an available copy already and + # can link to that. + + def fake_link(x, y): + if x.match("launcher.txt"): + raise OSError(17, "Error for testing") + + fake_config.scratch = { + "aliasutils.create_alias.launcher_remap": {"launcher.txt": tmp_path / "actual_launcher.txt"}, + } + fake_config.launcher_exe = tmp_path / "launcher.txt" + fake_config.launcher_exe.write_bytes(b'Arbitrary contents') + (tmp_path / "actual_launcher.txt").write_bytes(b'Arbitrary contents') + fake_config.default_platform = '-32' + fake_config.global_dir = tmp_path / "bin" + AU._create_alias( + fake_config, + name="test.exe", + target=tmp_path / "target.exe", + _link=fake_link + ) + assert_log( + "Create %s linking to %s", + "Failed to create hard link.+", + ("Created %s as hard link to %s", ("test.exe", "actual_launcher.txt")), + assert_log.end_of_log(), + ) + + +def test_parse_entrypoint_line(): + for line, expect in [ + ("", (None, None, None)), + ("# comment", (None, None, None)), + ("name-only", (None, None, None)), + ("name=value", (None, None, None)), + ("name=mod:func", ("name", "mod", "func")), + ("name=mod:func#comment", ("name", "mod", "func")), + (" name = mod : func ", ("name", "mod", "func")), + ("name=mod:func[extra]", ("name", "mod", "func")), + ("name=mod:func [extra]", ("name", "mod", "func")), + ]: + assert expect == AU._parse_entrypoint_line(line) + + +def test_scan_entrypoints(fake_config, tmp_path): + root = tmp_path / "test_install" + site = root / "site-packages" + A = site / "A.dist-info" + A.mkdir(parents=True, exist_ok=True) + (root / "target.exe").write_bytes(b"") + (A / "entry_points.txt").write_text("""[console_scripts] +a = a:main + +[gui_scripts] +aw = a:main +""") + + install = dict( + prefix=root, + id="test", + default=1, + alias=[dict(name="target", target="target.exe")], + shortcuts=[dict(kind="site-dirs", dirs=["site-packages"])], + ) + + actual = list(AU.calculate_aliases(fake_config, install)) + + assert ["target", "python", "pythonw", "a", "aw"] == [a.name for a in actual] + assert [0, 0, 1, 0, 1] == [a.windowed for a in actual] + assert [None, None, None, "a", "a"] == [a.mod for a in actual] + assert [None, None, None, "main", "main"] == [a.func for a in actual] + + +def test_create_aliases(fake_config, tmp_path): + target = tmp_path / "target.exe" + target.write_bytes(b"") + + created = [] + # Full arguments copied from source to ensure callers only pass valid args + def _on_create(cmd, *, name, target, plat=None, windowed=0, script_code=None): + created.append((name, windowed, script_code)) + + aliases = [ + AU.AliasInfo(install=dict(prefix=tmp_path), name="a", target=target), + AU.AliasInfo(install=dict(prefix=tmp_path), name="a.exe", target=target), + AU.AliasInfo(install=dict(prefix=tmp_path), name="aw", windowed=1, target=target), + ] + + AU.create_aliases(fake_config, aliases, _create_alias=_on_create) + print(created) + + assert ["a", "aw"] == [a[0] for a in created] + assert [0, 1] == [a[1] for a in created] + assert [None, None] == [a[2] for a in created] + + +def test_cleanup_aliases(fake_config, tmp_path): + target = tmp_path / "target.exe" + target.write_bytes(b"") + + aliases = [ + AU.AliasInfo(install=dict(prefix=tmp_path), name="A", target=target), + AU.AliasInfo(install=dict(prefix=tmp_path), name="B.exe", target=target), + ] + + root = fake_config.global_dir + root.mkdir(parents=True, exist_ok=True) + files = ["A.exe", "A.exe.__target__", + "B.exe", "B.exe.__script__.py", "B.exe.__target__", + "C.exe", "C.exe.__script__.py", "C.exe.__target__"] + for f in files: + (root / f).write_bytes(b"") + + # Ensure the expect files get requested to be unlinked + class Unlinker(list): + def __call__(self, names): + self.extend(names) + + unlinked = Unlinker() + AU.cleanup_aliases(fake_config, preserve=aliases, _unlink_many=unlinked) + assert set(f.name for f in unlinked) == set(["C.exe", "C.exe.__script__.py", "C.exe.__target__"]) + + # Ensure we don't break if unlinking fails + def unlink2(names): + raise PermissionError("Simulated error") + AU.cleanup_aliases(fake_config, preserve=aliases, _unlink_many=unlink2) + + # Ensure the actual unlink works + AU.cleanup_aliases(fake_config, preserve=aliases) + assert set(f.name for f in root.glob("*")) == set(files[:-3]) diff --git a/tests/test_install_command.py b/tests/test_install_command.py index ec48de2..4b6c09d 100644 --- a/tests/test_install_command.py +++ b/tests/test_install_command.py @@ -8,258 +8,6 @@ from manage import installs -@pytest.fixture -def alias_checker(tmp_path): - with AliasChecker(tmp_path) as checker: - yield checker - - -class AliasChecker: - class Cmd: - global_dir = "out" - launcher_exe = "launcher.txt" - launcherw_exe = "launcherw.txt" - default_platform = "-64" - - def __init__(self, platform=None): - self.scratch = {} - if platform: - self.default_platform = platform - - - def __init__(self, tmp_path): - self.Cmd.global_dir = tmp_path / "out" - self.Cmd.launcher_exe = tmp_path / "launcher.txt" - self.Cmd.launcherw_exe = tmp_path / "launcherw.txt" - self._expect_target = "target-" + secrets.token_hex(32) - self._expect = { - "-32": "-32-" + secrets.token_hex(32), - "-64": "-64-" + secrets.token_hex(32), - "-arm64": "-arm64-" + secrets.token_hex(32), - "w-32": "w-32-" + secrets.token_hex(32), - "w-64": "w-64-" + secrets.token_hex(32), - "w-arm64": "w-arm64-" + secrets.token_hex(32), - } - for k, v in self._expect.items(): - (tmp_path / f"launcher{k}.txt").write_text(v) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - pass - - def check(self, cmd, tag, name, expect, windowed=0): - IC._write_alias( - cmd, - {"tag": tag}, - {"name": f"{name}.txt", "windowed": windowed}, - self._expect_target, - ) - print(*cmd.global_dir.glob("*"), sep="\n") - assert (cmd.global_dir / f"{name}.txt").is_file() - assert (cmd.global_dir / f"{name}.txt.__target__").is_file() - assert (cmd.global_dir / f"{name}.txt").read_text() == expect - assert (cmd.global_dir / f"{name}.txt.__target__").read_text() == self._expect_target - - def check_32(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["-32"]) - - def check_w32(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["w-32"], windowed=1) - - def check_64(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["-64"]) - - def check_w64(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["w-64"], windowed=1) - - def check_arm64(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["-arm64"]) - - def check_warm64(self, cmd, tag, name): - self.check(cmd, tag, name, self._expect["w-arm64"], windowed=1) - - -def test_write_alias_tag_with_platform(alias_checker): - alias_checker.check_32(alias_checker.Cmd(), "1.0-32", "testA") - alias_checker.check_w32(alias_checker.Cmd(), "1.0-32", "testB") - alias_checker.check_64(alias_checker.Cmd(), "1.0-64", "testC") - alias_checker.check_w64(alias_checker.Cmd(), "1.0-64", "testD") - alias_checker.check_arm64(alias_checker.Cmd(), "1.0-arm64", "testE") - alias_checker.check_warm64(alias_checker.Cmd(), "1.0-arm64", "testF") - - -def test_write_alias_default_platform(alias_checker): - alias_checker.check_32(alias_checker.Cmd("-32"), "1.0", "testA") - alias_checker.check_w32(alias_checker.Cmd("-32"), "1.0", "testB") - alias_checker.check_64(alias_checker.Cmd(), "1.0", "testC") - alias_checker.check_w64(alias_checker.Cmd(), "1.0", "testD") - alias_checker.check_arm64(alias_checker.Cmd("-arm64"), "1.0", "testE") - alias_checker.check_warm64(alias_checker.Cmd("-arm64"), "1.0", "testF") - - -def test_write_alias_fallback_platform(alias_checker): - alias_checker.check_64(alias_checker.Cmd("-spam"), "1.0", "testA") - alias_checker.check_w64(alias_checker.Cmd("-spam"), "1.0", "testB") - - -def test_write_alias_launcher_missing(fake_config, assert_log, tmp_path): - fake_config.launcher_exe = tmp_path / "non-existent.exe" - fake_config.default_platform = '-32' - fake_config.global_dir = tmp_path / "bin" - IC._write_alias( - fake_config, - {"tag": "test"}, - {"name": "test.exe"}, - tmp_path / "target.exe", - ) - assert_log( - "Checking for launcher.*", - "Checking for launcher.*", - "Checking for launcher.*", - "Create %s linking to %s", - "Skipping %s alias because the launcher template was not found.", - assert_log.end_of_log(), - ) - - -def test_write_alias_launcher_unreadable(fake_config, assert_log, tmp_path): - class FakeLauncherPath: - stem = "test" - suffix = ".exe" - parent = tmp_path - - @staticmethod - def is_file(): - return True - - @staticmethod - def read_bytes(): - raise OSError("no reading for the test") - - fake_config.scratch = {} - fake_config.launcher_exe = FakeLauncherPath - fake_config.default_platform = '-32' - fake_config.global_dir = tmp_path / "bin" - IC._write_alias( - fake_config, - {"tag": "test"}, - {"name": "test.exe"}, - tmp_path / "target.exe", - ) - assert_log( - "Checking for launcher.*", - "Create %s linking to %s", - "Failed to read launcher template at %s\\.", - "Failed to read %s", - assert_log.end_of_log(), - ) - - -def test_write_alias_launcher_unlinkable(fake_config, assert_log, tmp_path): - def fake_link(x, y): - raise OSError("Error for testing") - - fake_config.scratch = {} - fake_config.launcher_exe = tmp_path / "launcher.txt" - fake_config.launcher_exe.write_bytes(b'Arbitrary contents') - fake_config.default_platform = '-32' - fake_config.global_dir = tmp_path / "bin" - IC._write_alias( - fake_config, - {"tag": "test"}, - {"name": "test.exe"}, - tmp_path / "target.exe", - _link=fake_link - ) - assert_log( - "Checking for launcher.*", - "Create %s linking to %s", - "Failed to create hard link.+", - "Created %s as copy of %s", - assert_log.end_of_log(), - ) - - -def test_write_alias_launcher_unlinkable_remap(fake_config, assert_log, tmp_path): - # This is for the fairly expected case of the PyManager install being on one - # drive, but the global commands directory being on another. In this - # situation, we can't hard link directly into the app files, and will need - # to copy. But we only need to copy once, so if a launcher_remap has been - # set (in the current process), then we have an available copy already and - # can link to that. - - def fake_link(x, y): - if x.match("launcher.txt"): - raise OSError(17, "Error for testing") - - fake_config.scratch = { - "install_command._write_alias.launcher_remap": {"launcher.txt": tmp_path / "actual_launcher.txt"}, - } - fake_config.launcher_exe = tmp_path / "launcher.txt" - fake_config.launcher_exe.write_bytes(b'Arbitrary contents') - (tmp_path / "actual_launcher.txt").write_bytes(b'Arbitrary contents') - fake_config.default_platform = '-32' - fake_config.global_dir = tmp_path / "bin" - IC._write_alias( - fake_config, - {"tag": "test"}, - {"name": "test.exe"}, - tmp_path / "target.exe", - _link=fake_link - ) - assert_log( - "Checking for launcher.*", - "Create %s linking to %s", - "Failed to create hard link.+", - ("Created %s as hard link to %s", ("test.exe", "actual_launcher.txt")), - assert_log.end_of_log(), - ) - - -@pytest.mark.parametrize("default", [1, 0]) -def test_write_alias_default(alias_checker, monkeypatch, tmp_path, default): - prefix = Path(tmp_path) / "runtime" - - class Cmd: - global_dir = Path(tmp_path) / "bin" - launcher_exe = None - scratch = {} - def get_installs(self): - return [ - { - "alias": [ - {"name": "python3.exe", "target": "p.exe"}, - {"name": "pythonw3.exe", "target": "pw.exe", "windowed": 1}, - ], - "default": default, - "prefix": prefix, - } - ] - - prefix.mkdir(exist_ok=True, parents=True) - (prefix / "p.exe").write_bytes(b"") - (prefix / "pw.exe").write_bytes(b"") - - written = [] - def write_alias(*a): - written.append(a) - - monkeypatch.setattr(IC, "_write_alias", write_alias) - monkeypatch.setattr(IC, "SHORTCUT_HANDLERS", {}) - - IC.update_all_shortcuts(Cmd()) - - if default: - # Main test: python.exe and pythonw.exe are added in automatically - assert sorted(w[2]["name"] for w in written) == ["python.exe", "python3.exe", "pythonw.exe", "pythonw3.exe"] - else: - assert sorted(w[2]["name"] for w in written) == ["python3.exe", "pythonw3.exe"] - # Ensure we still only have the two targets - assert set(w[3].name for w in written) == {"p.exe", "pw.exe"} - - def test_print_cli_shortcuts(patched_installs, assert_log, monkeypatch, tmp_path): class Cmd: scratch = {} @@ -355,6 +103,7 @@ def test_preserve_site(tmp_path): preserved = tmp_path / "_root" site = root / "site-packages" not_site = root / "site-not-packages" + not_site.mkdir(parents=True, exist_ok=True) A = site / "A" B = site / "B.txt" C = site / "C.txt" @@ -367,22 +116,28 @@ class Cmd: force = False repair = False - state = IC._preserve_site(Cmd, root) + install = { + "shortcuts": [ + {"kind": "site-dirs", "dirs": ["site-packages"]}, + ], + } + + state = IC._preserve_site(Cmd, root, install) assert not state assert not preserved.exists() Cmd.preserve_site_on_upgrade = True Cmd.force = True - state = IC._preserve_site(Cmd, root) + state = IC._preserve_site(Cmd, root, install) assert not state assert not preserved.exists() Cmd.force = False Cmd.repair = True - state = IC._preserve_site(Cmd, root) + state = IC._preserve_site(Cmd, root, install) assert not state assert not preserved.exists() Cmd.repair = False - state = IC._preserve_site(Cmd, root) + state = IC._preserve_site(Cmd, root, install) assert state == [(site, preserved / "0"), (None, preserved)] assert preserved.is_dir() @@ -395,7 +150,7 @@ class Cmd: assert b"original" == C.read_bytes() assert not preserved.exists() - state = IC._preserve_site(Cmd, root) + state = IC._preserve_site(Cmd, root, install) assert state == [(site, preserved / "0"), (None, preserved)] assert not C.exists() @@ -407,3 +162,55 @@ class Cmd: assert C.is_file() assert b"updated" == C.read_bytes() assert not preserved.exists() + + +@pytest.mark.parametrize("default", [1, 0]) +def test_write_alias_default(monkeypatch, tmp_path, default): + prefix = Path(tmp_path) / "runtime" + + class Cmd: + global_dir = Path(tmp_path) / "bin" + launcher_exe = None + scratch = {} + enable_shortcut_kinds = disable_shortcut_kinds = None + def get_installs(self): + return [ + { + "id": "test", + "alias": [ + {"name": "python3.exe", "target": "p.exe"}, + {"name": "pythonw3.exe", "target": "pw.exe", "windowed": 1}, + ], + "default": default, + "prefix": prefix, + } + ] + + prefix.mkdir(exist_ok=True, parents=True) + (prefix / "p.exe").write_bytes(b"") + (prefix / "pw.exe").write_bytes(b"") + + + created = [] + + class AliasUtils: + import manage.aliasutils as AU + calculate_aliases = staticmethod(AU.calculate_aliases) + + @staticmethod + def create_aliases(cmd, aliases): + created.extend(aliases) + + @staticmethod + def cleanup_aliases(cmd, preserve): + pass + + IC.update_all_shortcuts(Cmd(), _aliasutils=AliasUtils) + + if default: + # Main test: python.exe and pythonw.exe are added in automatically + assert sorted(a.name for a in created) == ["python", "python3.exe", "pythonw", "pythonw3.exe"] + else: + assert sorted(a.name for a in created) == ["python3.exe", "pythonw3.exe"] + # Ensure we still only have the two targets + assert set(a.target for a in created) == {"p.exe", "pw.exe"} diff --git a/tests/test_pathutils.py b/tests/test_pathutils.py index 5c6f62e..54ebc41 100644 --- a/tests/test_pathutils.py +++ b/tests/test_pathutils.py @@ -15,3 +15,18 @@ def test_path_match(): assert not p.match("example*") assert not p.match("example*.com") assert not p.match("*ple*") + + +def test_path_stem(): + p = Path("python3.12.exe") + assert p.stem == "python3.12" + assert p.suffix == ".exe" + p = Path("python3.12") + assert p.stem == "python3" + assert p.suffix == ".12" + p = Path("python3") + assert p.stem == "python3" + assert p.suffix == "" + p = Path(".exe") + assert p.stem == "" + assert p.suffix == ".exe"