Skip to content

Commit

Permalink
Have duplicate metadata and invalid reqs warnings honor --warn (#357)
Browse files Browse the repository at this point in the history
This resolves #355 by making changes and refactors to the warning logic.
It does so by introducing a module-level singleton "WarningPrinter"
object and refactors the code in such a way to integrate this object for
it to be used.
  • Loading branch information
kemzeb committed Apr 28, 2024
1 parent c325803 commit 4a5f90a
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 73 deletions.
24 changes: 17 additions & 7 deletions src/pipdeptree/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@
from pipdeptree._models import PackageDAG
from pipdeptree._render import render
from pipdeptree._validate import validate
from pipdeptree._warning import WarningPrinter, WarningType, get_warning_printer


def main(args: Sequence[str] | None = None) -> None | int:
"""CLI - The main function called as entry point."""
options = get_options(args)

# Warnings are only enabled when using text output.
is_text_output = not any([options.json, options.json_tree, options.output_format])
if not is_text_output:
options.warn = WarningType.SILENCE
warning_printer = get_warning_printer()
warning_printer.warning_type = options.warn

pkgs = get_installed_distributions(
interpreter=options.python, local_only=options.local_only, user_only=options.user_only
)
tree = PackageDAG.from_pkgs(pkgs)
is_text_output = not any([options.json, options.json_tree, options.output_format])

return_code = validate(options, is_text_output, tree)
validate(tree)

# Reverse the tree (if applicable) before filtering, thus ensuring, that the filter will be applied on ReverseTree
if options.reverse:
Expand All @@ -35,14 +42,17 @@ def main(args: Sequence[str] | None = None) -> None | int:
try:
tree = tree.filter_nodes(show_only, exclude)
except ValueError as e:
if options.warn in {"suppress", "fail"}:
print(e, file=sys.stderr) # noqa: T201
return_code |= 1 if options.warn == "fail" else 0
return return_code
if warning_printer.should_warn():
warning_printer.print_single_line(str(e))
return _determine_return_code(warning_printer)

render(options, tree)

return return_code
return _determine_return_code(warning_printer)


def _determine_return_code(warning_printer: WarningPrinter) -> int:
return 1 if warning_printer.has_warned_with_failure() else 0


if __name__ == "__main__":
Expand Down
81 changes: 73 additions & 8 deletions src/pipdeptree/_cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from __future__ import annotations

import enum
import sys
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from typing import TYPE_CHECKING, Sequence, cast
from argparse import Action, ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from typing import Any, Sequence, cast

from .version import __version__
from pipdeptree._warning import WarningType

if TYPE_CHECKING:
from typing import Literal
from .version import __version__


class Options(Namespace):
Expand All @@ -16,7 +16,7 @@ class Options(Namespace):
all: bool
local_only: bool
user_only: bool
warn: Literal["silence", "suppress", "fail"]
warn: WarningType
reverse: bool
packages: str
exclude: str
Expand All @@ -40,11 +40,11 @@ def build_parser() -> ArgumentParser:
parser.add_argument(
"-w",
"--warn",
action="store",
dest="warn",
type=WarningType,
nargs="?",
default="suppress",
choices=("silence", "suppress", "fail"),
action=EnumAction,
help=(
"warning control: suppress will show warnings but return 0 whether or not they are present; silence will "
"not show warnings at all and always return 0; fail will show warnings and return 1 if any are present"
Expand Down Expand Up @@ -154,6 +154,71 @@ def get_options(args: Sequence[str] | None) -> Options:
return cast(Options, parsed_args)


class EnumAction(Action):
"""
Generic action that exists to convert a string into a Enum value that is then added into a `Namespace` object.
This custom action exists because argparse doesn't have support for enums.
References
----------
- https://github.com/python/cpython/issues/69247#issuecomment-1308082792
- https://docs.python.org/3/library/argparse.html#action-classes
"""

def __init__( # noqa: PLR0913, PLR0917
self,
option_strings: list[str],
dest: str,
nargs: str | None = None,
const: Any | None = None,
default: Any | None = None,
type: Any | None = None, # noqa: A002
choices: Any | None = None,
required: bool = False, # noqa: FBT001, FBT002
help: str | None = None, # noqa: A002
metavar: str | None = None,
) -> None:
if not type or not issubclass(type, enum.Enum):
msg = "type must be a subclass of Enum"
raise TypeError(msg)
if not isinstance(default, str):
msg = "default must be defined with a string value"
raise TypeError(msg)

choices = tuple(e.name.lower() for e in type)
if default not in choices:
msg = "default value should be among the enum choices"
raise ValueError(msg)

super().__init__(
option_strings=option_strings,
dest=dest,
nargs=nargs,
const=const,
default=default,
type=None, # We return None here so that we default to str.
choices=choices,
required=required,
help=help,
metavar=metavar,
)

self._enum = type

def __call__(
self,
parser: ArgumentParser, # noqa: ARG002
namespace: Namespace,
value: Any,
option_string: str | None = None, # noqa: ARG002
) -> None:
value = value or self.default
value = next(e for e in self._enum if e.name.lower() == value)
setattr(namespace, self.dest, value)


__all__ = [
"Options",
"get_options",
Expand Down
22 changes: 15 additions & 7 deletions src/pipdeptree/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from packaging.utils import canonicalize_name

from pipdeptree._warning import get_warning_printer


def get_installed_distributions(
interpreter: str = str(sys.executable),
Expand Down Expand Up @@ -42,6 +44,8 @@ def get_installed_distributions(
else:
original_dists = distributions()

warning_printer = get_warning_printer()

# Since importlib.metadata.distributions() can return duplicate packages, we need to handle this. pip's approach is
# to keep track of each package metadata it finds, and if it encounters one again it will simply just ignore it. We
# take it one step further and warn the user that there are duplicate packages in their environment.
Expand All @@ -55,11 +59,17 @@ def get_installed_distributions(
seen_dists[normalized_name] = dist
dists.append(dist)
continue
already_seen_dists = first_seen_to_already_seen_dists_dict.setdefault(seen_dists[normalized_name], [])
already_seen_dists.append(dist)

if first_seen_to_already_seen_dists_dict:
render_duplicated_dist_metadata_text(first_seen_to_already_seen_dists_dict)
if warning_printer.should_warn():
already_seen_dists = first_seen_to_already_seen_dists_dict.setdefault(seen_dists[normalized_name], [])
already_seen_dists.append(dist)

should_print_warning = warning_printer.should_warn() and first_seen_to_already_seen_dists_dict
if should_print_warning:
warning_printer.print_multi_line(
"Duplicate package metadata found",
lambda: render_duplicated_dist_metadata_text(first_seen_to_already_seen_dists_dict),
ignore_fail=True,
)

return dists

Expand All @@ -77,7 +87,6 @@ def render_duplicated_dist_metadata_text(
dist_list = entries_to_pairs_dict.setdefault(entry, [])
dist_list.append((first_seen, dist))

print("Warning!!! Duplicate package metadata found:", file=sys.stderr) # noqa: T201
for entry, pairs in entries_to_pairs_dict.items():
print(f'"{entry}"', file=sys.stderr) # noqa: T201
for first_seen, dist in pairs:
Expand All @@ -88,7 +97,6 @@ def render_duplicated_dist_metadata_text(
),
file=sys.stderr,
)
print("-" * 72, file=sys.stderr) # noqa: T201


__all__ = [
Expand Down
20 changes: 12 additions & 8 deletions src/pipdeptree/_models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@
from importlib.metadata import Distribution


from .package import DistPackage, InvalidRequirementError, ReqPackage
from pipdeptree._warning import get_warning_printer

from .package import DistPackage, InvalidRequirementError, ReqPackage

def render_invalid_reqs_text_if_necessary(dist_name_to_invalid_reqs_dict: dict[str, list[str]]) -> None:
if not dist_name_to_invalid_reqs_dict:
return

print("Warning!!! Invalid requirement strings found for the following distributions:", file=sys.stderr) # noqa: T201
def render_invalid_reqs_text(dist_name_to_invalid_reqs_dict: dict[str, list[str]]) -> None:
for dist_name, invalid_reqs in dist_name_to_invalid_reqs_dict.items():
print(dist_name, file=sys.stderr) # noqa: T201

for invalid_req in invalid_reqs:
print(f' Skipping "{invalid_req}"', file=sys.stderr) # noqa: T201
print("-" * 72, file=sys.stderr) # noqa: T201


class PackageDAG(Mapping[DistPackage, List[ReqPackage]]):
Expand Down Expand Up @@ -53,6 +50,7 @@ class PackageDAG(Mapping[DistPackage, List[ReqPackage]]):

@classmethod
def from_pkgs(cls, pkgs: list[Distribution]) -> PackageDAG:
warning_printer = get_warning_printer()
dist_pkgs = [DistPackage(p) for p in pkgs]
idx = {p.key: p for p in dist_pkgs}
m: dict[DistPackage, list[ReqPackage]] = {}
Expand All @@ -65,7 +63,8 @@ def from_pkgs(cls, pkgs: list[Distribution]) -> PackageDAG:
req = next(requires_iterator)
except InvalidRequirementError as err:
# We can't work with invalid requirement strings. Let's warn the user about them.
dist_name_to_invalid_reqs_dict.setdefault(p.project_name, []).append(str(err))
if warning_printer.should_warn():
dist_name_to_invalid_reqs_dict.setdefault(p.project_name, []).append(str(err))
continue
except StopIteration:
break
Expand All @@ -78,7 +77,12 @@ def from_pkgs(cls, pkgs: list[Distribution]) -> PackageDAG:
reqs.append(pkg)
m[p] = reqs

render_invalid_reqs_text_if_necessary(dist_name_to_invalid_reqs_dict)
should_print_warning = warning_printer.should_warn() and dist_name_to_invalid_reqs_dict
if should_print_warning:
warning_printer.print_multi_line(
"Invalid requirement strings found for the following distributions",
lambda: render_invalid_reqs_text(dist_name_to_invalid_reqs_dict),
)

return cls(m)

Expand Down
64 changes: 29 additions & 35 deletions src/pipdeptree/_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,28 @@
from collections import defaultdict
from typing import TYPE_CHECKING

from pipdeptree._warning import get_warning_printer

if TYPE_CHECKING:
from pipdeptree._models.package import Package

from ._cli import Options
from ._models import DistPackage, PackageDAG, ReqPackage


def validate(args: Options, is_text_output: bool, tree: PackageDAG) -> int: # noqa: FBT001
def validate(tree: PackageDAG) -> None:
# Before any reversing or filtering, show warnings to console, about possibly conflicting or cyclic deps if found
# and warnings are enabled (i.e. only if output is to be printed to console)
if is_text_output and args.warn != "silence":
warning_printer = get_warning_printer()
if warning_printer.should_warn():
conflicts = conflicting_deps(tree)
if conflicts:
render_conflicts_text(conflicts)
print("-" * 72, file=sys.stderr) # noqa: T201
warning_printer.print_multi_line(
"Possibly conflicting dependencies found", lambda: render_conflicts_text(conflicts)
)

cycles = cyclic_deps(tree)
if cycles:
render_cycles_text(cycles)
print("-" * 72, file=sys.stderr) # noqa: T201

if args.warn == "fail" and (conflicts or cycles):
return 1
return 0
warning_printer.print_multi_line("Cyclic dependencies found", lambda: render_cycles_text(cycles))


def conflicting_deps(tree: PackageDAG) -> dict[DistPackage, list[ReqPackage]]:
Expand All @@ -50,16 +48,14 @@ def conflicting_deps(tree: PackageDAG) -> dict[DistPackage, list[ReqPackage]]:


def render_conflicts_text(conflicts: dict[DistPackage, list[ReqPackage]]) -> None:
if conflicts:
print("Warning!!! Possibly conflicting dependencies found:", file=sys.stderr) # noqa: T201
# Enforce alphabetical order when listing conflicts
pkgs = sorted(conflicts.keys())
for p in pkgs:
pkg = p.render_as_root(frozen=False)
print(f"* {pkg}", file=sys.stderr) # noqa: T201
for req in conflicts[p]:
req_str = req.render_as_branch(frozen=False)
print(f" - {req_str}", file=sys.stderr) # noqa: T201
# Enforce alphabetical order when listing conflicts
pkgs = sorted(conflicts.keys())
for p in pkgs:
pkg = p.render_as_root(frozen=False)
print(f"* {pkg}", file=sys.stderr) # noqa: T201
for req in conflicts[p]:
req_str = req.render_as_branch(frozen=False)
print(f" - {req_str}", file=sys.stderr) # noqa: T201


def cyclic_deps(tree: PackageDAG) -> list[list[Package]]:
Expand Down Expand Up @@ -104,20 +100,18 @@ def dfs(root: DistPackage, current: Package, visited: set[str], cdeps: list[Pack


def render_cycles_text(cycles: list[list[Package]]) -> None:
if cycles:
print("Warning!! Cyclic dependencies found:", file=sys.stderr) # noqa: T201
# List in alphabetical order the dependency that caused the cycle (i.e. the second-to-last Package element)
cycles = sorted(cycles, key=lambda c: c[len(c) - 2].key)
for cycle in cycles:
print("*", end=" ", file=sys.stderr) # noqa: T201

size = len(cycle) - 1
for idx, pkg in enumerate(cycle):
if idx == size:
print(f"{pkg.project_name}", end="", file=sys.stderr) # noqa: T201
else:
print(f"{pkg.project_name} =>", end=" ", file=sys.stderr) # noqa: T201
print(file=sys.stderr) # noqa: T201
# List in alphabetical order the dependency that caused the cycle (i.e. the second-to-last Package element)
cycles = sorted(cycles, key=lambda c: c[len(c) - 2].key)
for cycle in cycles:
print("*", end=" ", file=sys.stderr) # noqa: T201

size = len(cycle) - 1
for idx, pkg in enumerate(cycle):
if idx == size:
print(f"{pkg.project_name}", end="", file=sys.stderr) # noqa: T201
else:
print(f"{pkg.project_name} =>", end=" ", file=sys.stderr) # noqa: T201
print(file=sys.stderr) # noqa: T201


__all__ = [
Expand Down
Loading

0 comments on commit 4a5f90a

Please sign in to comment.