Skip to content

Commit

Permalink
Let bar plugin thoroughly search for Python and mpm
Browse files Browse the repository at this point in the history
  • Loading branch information
kdeldycke committed Feb 28, 2024
1 parent 34dc92e commit 2daaed5
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 70 deletions.
194 changes: 137 additions & 57 deletions meta_package_manager/bar_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@
import re
import sys
from configparser import RawConfigParser
from enum import Enum
from functools import cached_property
from operator import methodcaller
from operator import itemgetter, methodcaller
from pathlib import Path
from shutil import which
from subprocess import run
from textwrap import dedent
from typing import Generator

python_min_version = (3, 8, 0)
PYTHON_MIN_VERSION = (3, 8, 0)
"""Minimal requirement is aligned to mpm."""

MPM_MIN_VERSION = (5, 0, 0)
"""Mpm v5.0.0 was the first version taking care of the complete layout rendering."""


def v_to_str(version_tuple: tuple[int, ...] | None) -> str:
"""Transforms into a string a tuple of integers representing a version."""
Expand All @@ -76,12 +82,8 @@ def v_to_str(version_tuple: tuple[int, ...] | None) -> str:
return ".".join(map(str, version_tuple))


if sys.version_info < python_min_version:
msg = (
f"Bar plugin invoked with Python {sys.version}, but requires "
f"Python >= {v_to_str(python_min_version)}"
)
raise SystemError(msg)
Venv = Enum("Venv", ["PIPENV", "POETRY", "VIRTUALENV"])
"""Type of virtualenv we are capable of detecting."""


class MPMPlugin:
Expand All @@ -95,9 +97,6 @@ class MPMPlugin:
and `SwiftBar dialect <https://github.com/swiftbar/SwiftBar#plugin-api>`_.
"""

mpm_min_version = (5, 0, 0)
"""Mpm v5.0.0 was the first version taking care of the complete layout rendering."""

@staticmethod
def getenv_str(var, default: str | None = None) -> str | None:
"""Utility to get environment variables.
Expand Down Expand Up @@ -170,53 +169,124 @@ def is_swiftbar(self) -> bool:
return self.getenv_bool("SWIFTBAR")

@cached_property
def python_path(self) -> str:
"""Returns the system's Python binary path.
def all_pythons(self) -> list[Path]:
"""Search for any Python on the system.
Returns a generator of normalized and deduplicated ``Path`` to Python binaries.
This plugin being run from Python, we have the one called by Xbar/SwiftBar to
fallback to (i.e. ``sys.executable``). But before that, we attempt to locate it
by respecting the environment variables.
Filters out old Python interpreters.
We first try to locate Python by respecting the environment variables as-is,
i.e. as defined by the user. Then we return the Python interpreter used to
execute this script.
TODO: try to tweak the env vars to look for homebrew location etc?
"""
for bin_name in (sys.executable, "python", "python3"):
collected = []
seen = set()
for bin_name in ("python3", "python", sys.executable):
py_path = which(bin_name)
if py_path:
return py_path
raise FileNotFoundError("No Python binary found on the system.")
if not py_path:
continue

@cached_property
def mpm_exec(self) -> tuple[str, ...]:
"""Search for mpm execution alternatives, either direct ``mpm`` call or as an
executable Python module."""
# XXX Local debugging and development.
# return "poetry", "run", "mpm"
mpm_exec = which("mpm")
if mpm_exec:
return (mpm_exec,)
return self.python_path, "-m", "meta_package_manager"
normalized_path = os.path.normcase(Path(py_path).resolve())
if normalized_path in seen:
continue
seen.add(normalized_path)

process = run(
(str(normalized_path), "--version"),
capture_output=True,
encoding="utf-8",
)
version_string = process.stdout.split()[1]
python_version = tuple(map(int, version_string.split(".")))
# Is Python too old?
if python_version < PYTHON_MIN_VERSION:
continue

collected.append(normalized_path)

return collected

@staticmethod
def search_venv(folder: Path) -> tuple[Venv | None, tuple[str, ...] | None]:
"""Search for signs of a virtual env in the provided folder.
Returns the type of the detected venv and CLI arguments that can be used to run
a command from the virtualenv context.
Returns ``(None, None)`` if the folder is not a venv.
"""
if (folder / "Pipfile").is_file():
return Venv.PIPENV, (f"PIPENV_PIPFILE='{folder}'", "pipenv", "run", "mpm")

if (folder / "poetry.lock").is_file():
return Venv.POETRY, ("poetry", "run", "--directory", str(folder), "mpm")

if (folder / "requirements.txt").is_file() or (folder / "setup.py").is_file():
return Venv.VIRTUALENV, (
f"VIRTUAL_ENV='{folder}'",
"python",
"-m",
"meta_package_manager",
)

return None, None

def search_mpm(self) -> Generator[tuple[str, ...], None, None]:
"""Iterare over possible CLI commands to execute ``mpm``.
Should be able to produce the full spectrum of alternative commands we can use
to invoke ``mpm`` over different context.
"""
# Search for an mpm executable in the environment, be it a script or a binary.
mpm_bin = which("mpm")
if mpm_bin:
yield (mpm_bin,)

# Search for a meta_package_manager package installed in any Python found on
# the system.
for python_path in self.all_pythons:
yield python_path, "-m", "meta_package_manager"

# This script might be itself part of an mpm installation that was deployed in
# a virtualenv. So walk back the whole folder tree from here in search of a
# virtualenv.
for folder in Path(__file__).parents:
# Stop looking beyond Home.
if folder == Path.home():
continue

venv_type, run_args = self.search_venv(folder)
if not venv_type:
continue

yield run_args

def check_mpm(
self,
) -> tuple[bool, tuple[int, ...] | None, bool, str | Exception | None]:
self, mpm_cli_args: tuple[str, ...]
) -> tuple[bool, bool, tuple[int, ...] | None, str | Exception | None]:
"""Test-run mpm execution and extract its version."""
error: str | Exception | None = None
try:
process = run(
# Output a color-less version just in case the script is not run in a
# non-interactive shell, or Click/Click-Extra autodetection fails.
(*self.mpm_exec, "--no-color", "--version"),
(*mpm_cli_args, "--no-color", "--version"),
capture_output=True,
encoding="utf-8",
)
error = process.stderr
except FileNotFoundError as ex:
error = ex

installed = False
runnable = False
mpm_version = None
up_to_date = False
# Is mpm CLI installed on the system?
# Is mpm runnable as-is with provided CLI arguments?
if not process.returncode and not error:
installed = True
runnable = True
# This regular expression is designed to extract the version number,
# whether it is surrounded by ANSI color escape sequence or not.
match = re.compile(
Expand All @@ -236,10 +306,10 @@ def check_mpm(
version_string = match.groupdict()["version"]
mpm_version = tuple(map(int, version_string.split(".")))
# Is mpm too old?
if mpm_version >= self.mpm_min_version:
if mpm_version >= MPM_MIN_VERSION:
up_to_date = True

return installed, mpm_version, up_to_date, error
return runnable, up_to_date, mpm_version, error

@staticmethod
def pp(label: str, *args: str) -> None:
Expand Down Expand Up @@ -286,19 +356,34 @@ def print_error(self, message: str | Exception, submenu: str = "") -> None:
"symbolize=false" if self.is_swiftbar else "",
)

@cached_property
def ranked_mpm(self) -> list[tuple[str, ...]]:
"""Rank the best mpm candidates we can find on the system."""
# Collect all mpm.
all_mpm = (
(mpm_candidate, self.check_mpm(mpm_candidate))
for mpm_candidate in self.search_mpm()
)
# Sort them by runnability, then up-to-date status, then version number, and error.
return sorted(all_mpm, key=itemgetter(1), reverse=True)

@cached_property
def best_mpm(self) -> tuple[str, ...]:
return self.ranked_mpm[0]

def print_menu(self) -> None:
"""Print the main menu."""
mpm_installed, _, mpm_up_to_date, error = self.check_mpm()
if not mpm_installed or not mpm_up_to_date:
mpm_args, (runnable, up_to_date, _mpm_version, error) = self.best_mpm
if not runnable or not up_to_date:
self.print_error_header()
if error:
self.print_error(error)
print("---")
action_msg = "Install" if not mpm_installed else "Upgrade"
min_version_str = v_to_str(self.mpm_min_version)
action_msg = "Install" if not runnable else "Upgrade"
min_version_str = v_to_str(MPM_MIN_VERSION)
self.pp(
f"{action_msg} mpm >= v{min_version_str}",
f"shell={self.python_path}",
f"shell={self.all_pythons[0]}",
"param1=-m",
"param2=pip",
"param3=install",
Expand All @@ -316,15 +401,15 @@ def print_menu(self) -> None:
return

# Force a sync of all local package databases.
run((*self.mpm_exec, "--verbosity", "ERROR", "sync"))
run((*mpm_args, "--verbosity", "ERROR", "sync"))

# Fetch outdated packages from all package managers available on the system.
# We defer all rendering to mpm itself so it can compute more intricate layouts.
process = run(
# We silence all errors but the CRITICAL ones. All others will be captured
# by mpm in --plugin-output mode and rendered back into each manager
# section.
(*self.mpm_exec, "--verbosity", "CRITICAL", "outdated", "--plugin-output"),
(*mpm_args, "--verbosity", "CRITICAL", "outdated", "--plugin-output"),
capture_output=True,
encoding="utf-8",
)
Expand All @@ -344,25 +429,20 @@ def print_menu(self) -> None:
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--check-mpm",
"--search-mpm",
action="store_true",
help="Locate mpm on the system and check its version.",
help="Locate all mpm on the system and sort them by best candidates.",
)
args = parser.parse_args()

plugin = MPMPlugin()

if args.check_mpm:
mpm_installed, mpm_version, mpm_up_to_date, error = plugin.check_mpm()
if not mpm_installed:
raise FileNotFoundError(error)
if not mpm_up_to_date:
msg = (
f"{plugin.mpm_exec} is too old: "
f"{v_to_str(mpm_version)} < {v_to_str(plugin.mpm_min_version)}"
if args.search_mpm:
for mpm_args, (runnable, up_to_date, version, error) in plugin.ranked_mpm:
print(
f"{' '.join(mpm_args)} | runnable: {runnable} | up to date: {up_to_date}"
f" | version: {version} | error: {error}"
)
raise ValueError(msg)
print(f"{' '.join(plugin.mpm_exec)} v{v_to_str(mpm_version)}")

else:
plugin.print_menu()
7 changes: 5 additions & 2 deletions meta_package_manager/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class BarPluginRenderer(MPMPlugin):
dialect.
The minimal code to locate ``mpm``, then call it and print its output resides in the
plugin itself at :py:meth:`meta_package_manager.bar_plugin.MPMPlugin.mpm_exec`.
plugin itself at :py:meth:`meta_package_manager.bar_plugin.MPMPlugin.best_mpm`.
All other stuff, especially the rendering code, is managed here, to allow for more
complex layouts relying on external Python dependencies. This also limits the number
Expand Down Expand Up @@ -445,8 +445,11 @@ def add_upgrade_cli(self, outdated_data):
f"{theme.invoked_command(manager_id)} "
"does not implement upgrade_all_cli.",
)
mpm_args, (_runnable, _up_to_date, _mpm_version, _error) = (
self.best_mpm
)
upgrade_all_cli = (
*self.mpm_exec,
*mpm_args,
f"--{manager_id}",
"upgrade",
"--all",
Expand Down
15 changes: 4 additions & 11 deletions meta_package_manager/tests/test_bar_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from __future__ import annotations

import logging
import re
import subprocess
from collections import Counter
Expand Down Expand Up @@ -171,16 +170,12 @@ class TestBarPlugin:
def plugin_output_checks(self, checklist, extra_env=None):
"""Run the plugin script and check its output against the checklist."""
process = subprocess.run(
# Force the plugin to be called within Poetry venv to not have it seeking
# for macOS's default Python.
("poetry", "run", "python", bar_plugin.__file__),
bar_plugin.__file__,
capture_output=True,
encoding="utf-8",
env=env_copy(extra_env),
)

logging.info(process.stderr)

assert not process.stderr
assert process.returncode == 0

Expand Down Expand Up @@ -258,15 +253,13 @@ def test_plugin_shell_invocation(self, shell_args):
for the mpm executable and extract its version.
"""
process = subprocess.run(
_subcmd_args(shell_args, bar_plugin.__file__, "--check-mpm"),
_subcmd_args(shell_args, bar_plugin.__file__, "--search-mpm"),
capture_output=True,
encoding="utf-8",
)

logging.info(process.stderr)

assert not process.stderr
assert re.match(r"^.+ v\d+\.\d+\.\d+$", process.stdout)
assert process.stdout
assert process.returncode == 0

@shell_python_args
Expand All @@ -286,5 +279,5 @@ def test_python_shell_invocation(self, shell_args, python_args):
# We need to parse the version to account for alpha release,
# like Python `3.12.0a4`.
assert parse_version(process.stdout.split()[1]) >= parse_version(
".".join(str(i) for i in bar_plugin.python_min_version),
".".join(str(i) for i in bar_plugin.PYTHON_MIN_VERSION),
)

0 comments on commit 2daaed5

Please sign in to comment.