Skip to content

Commit

Permalink
Change algorithm for independence contract
Browse files Browse the repository at this point in the history
  • Loading branch information
seddonym committed Oct 4, 2022
1 parent 4076d62 commit 2acbff4
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 140 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ latest
* Remove upper bounds on dependencies. This allows usage of Grimp 2.0, which should significantly speed up checking of
layers contracts.
* Add --verbose flag to lint-imports command.
* Improve algorithm for independence contracts, in the following ways:
- It is significantly faster.
- As with layers contracts, reports of illegal indirect imports reports now include multiple start
and end points (if they exist).
- Illegal indirect imports that are via other modules listed in the contract are no longer listed.

1.3.0 (2022-08-22)
------------------
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def read(*names, **kwargs):
"Topic :: Utilities",
],
python_requires=">=3.7",
install_requires=["click>=6", "grimp>=1.3", "typing-extensions>=3.10.0.0"],
install_requires=["click>=6", "grimp>=2.0", "typing-extensions>=3.10.0.0"],
extras_require={"toml": ["toml"]},
entry_points={
"console_scripts": ["lint-imports = importlinter.cli:lint_imports_command"]
Expand Down
107 changes: 106 additions & 1 deletion src/importlinter/contracts/_common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from typing import List, Optional, Tuple
"""
Helper functions used by more than one contract type.
Code in here should not be relied upon as a public API; if you're
relying on it for a custom contract type, be aware things may change
without warning.
"""

from typing import List, Optional, Tuple, Union, cast

from typing_extensions import TypedDict

from importlinter.application import output
from importlinter.domain.imports import Module
from importlinter.domain.ports.graph import ImportGraph


class Link(TypedDict):
Expand Down Expand Up @@ -31,6 +41,101 @@ def render_chain_data(chain_data: DetailedChain) -> None:
_render_direct_import(main_chain[-1], extra_lasts=chain_data["extra_lasts"])


def find_segments(
graph: ImportGraph, reference_graph: ImportGraph, importer: Module, imported: Module
) -> List[Chain]:
"""
Return list of headless and tailless chains.
Two graphs are passed in: the first is mutated, the second is used purely as a reference to
look up import details which are otherwise removed during mutation.
"""
segments = []
for chain in _pop_shortest_chains(graph, importer=importer.name, imported=imported.name):
if len(chain) == 2:
raise ValueError("Direct chain found - these should have been removed.")
segment: List[Link] = []
for importer_in_chain, imported_in_chain in [
(chain[i], chain[i + 1]) for i in range(len(chain) - 1)
]:
import_details = reference_graph.get_import_details(
importer=importer_in_chain, imported=imported_in_chain
)
line_numbers = tuple(sorted(set(cast(int, j["line_number"]) for j in import_details)))
segment.append(
{
"importer": importer_in_chain,
"imported": imported_in_chain,
"line_numbers": line_numbers,
}
)
segments.append(segment)
return segments


def segments_to_collapsed_chains(
graph: ImportGraph, segments: List[Chain], importer: Module, imported: Module
) -> List[DetailedChain]:
collapsed_chains: List[DetailedChain] = []
for segment in segments:
head_imports: List[Link] = []
imported_module = segment[0]["imported"]
candidate_modules = sorted(graph.find_modules_that_directly_import(imported_module))
for module in [
m
for m in candidate_modules
if Module(m) == importer or Module(m).is_descendant_of(importer)
]:
import_details_list = graph.get_import_details(
importer=module, imported=imported_module
)
line_numbers = tuple(
sorted(set(cast(int, j["line_number"]) for j in import_details_list))
)
head_imports.append(
{"importer": module, "imported": imported_module, "line_numbers": line_numbers}
)

tail_imports: List[Link] = []
importer_module = segment[-1]["importer"]
candidate_modules = sorted(graph.find_modules_directly_imported_by(importer_module))
for module in [
m
for m in candidate_modules
if Module(m) == imported or Module(m).is_descendant_of(imported)
]:
import_details_list = graph.get_import_details(
importer=importer_module, imported=module
)
line_numbers = tuple(
sorted(set(cast(int, j["line_number"]) for j in import_details_list))
)
tail_imports.append(
{"importer": importer_module, "imported": module, "line_numbers": line_numbers}
)

collapsed_chains.append(
{
"chain": [head_imports[0]] + segment[1:-1] + [tail_imports[0]],
"extra_firsts": head_imports[1:],
"extra_lasts": tail_imports[1:],
}
)

return collapsed_chains


def _pop_shortest_chains(graph: ImportGraph, importer: str, imported: str):
chain: Union[Optional[Tuple[str, ...]], bool] = True
while chain:
chain = graph.find_shortest_chain(importer, imported)
if chain:
# Remove chain of imports from graph.
for index in range(len(chain) - 1):
graph.remove_import(importer=chain[index], imported=chain[index + 1])
yield chain


def _render_direct_import(
direct_import,
first_line: bool = False,
Expand Down
148 changes: 137 additions & 11 deletions src/importlinter/contracts/independence.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from itertools import permutations
from typing import List, cast

Expand All @@ -11,7 +12,13 @@
from importlinter.domain.imports import Module
from importlinter.domain.ports.graph import ImportGraph

from ._common import DetailedChain, Link, render_chain_data
from ._common import (
DetailedChain,
Link,
find_segments,
render_chain_data,
segments_to_collapsed_chains,
)


class _SubpackageChainData(TypedDict):
Expand Down Expand Up @@ -45,7 +52,8 @@ class IndependenceContract(Contract):
unmatched_ignore_imports_alerting = fields.EnumField(AlertLevel, default=AlertLevel.ERROR)

def check(self, graph: ImportGraph, verbose: bool) -> ContractCheck:
invalid_chains = []
invalid_chains: List[_SubpackageChainData] = []
modules = cast(List[Module], self.modules)

warnings = contract_utils.remove_ignored_imports(
graph=graph,
Expand All @@ -55,21 +63,65 @@ def check(self, graph: ImportGraph, verbose: bool) -> ContractCheck:

self._check_all_modules_exist_in_graph(graph)

for subpackage_1, subpackage_2 in permutations(self.modules, r=2): # type: ignore
temp_graph = copy.deepcopy(graph)
# First pass: direct imports.
for subpackage_1, subpackage_2 in permutations(modules, r=2): # type: ignore
output.verbose_print(
verbose,
"Searching for import chains from " f"{subpackage_1} to {subpackage_2}...",
"Searching for direct imports from " f"{subpackage_1} to {subpackage_2}...",
)
with settings.TIMER as timer:
subpackage_chain_data = self._build_subpackage_chain_data(
upstream_module=subpackage_2,
downstream_module=subpackage_1,
graph=graph,
direct_chains = self._pop_direct_imports(
importer_package=subpackage_1,
imported_package=subpackage_2,
graph=temp_graph,
)
if subpackage_chain_data["chains"]:
invalid_chains.append(subpackage_chain_data)
if direct_chains:
invalid_chains.append(
{
"upstream_module": subpackage_2.name,
"downstream_module": subpackage_1.name,
"chains": direct_chains,
}
)
if verbose:
chain_count = len(direct_chains)
pluralized = "s" if chain_count != 1 else ""
output.print(
f"Found {chain_count} illegal chain{pluralized} "
f"in {timer.duration_in_s}s.",
)

# Second pass: indirect imports.
self._squash_modules(graph=temp_graph, modules_to_squash=modules)
for subpackage_1, subpackage_2 in permutations(modules, r=2): # type: ignore
output.verbose_print(
verbose,
"Searching for indirect imports from " f"{subpackage_1} to {subpackage_2}...",
)
with settings.TIMER as timer:
other_independent_packages = [
m for m in modules if m not in (subpackage_1, subpackage_2)
]
trimmed_graph = self._make_graph_with_packages_removed(
temp_graph, packages_to_remove=other_independent_packages
)
indirect_chains = self._get_indirect_collapsed_chains(
trimmed_graph=trimmed_graph,
reference_graph=graph,
importer_package=subpackage_1,
imported_package=subpackage_2,
)
if indirect_chains:
invalid_chains.append(
{
"upstream_module": subpackage_2.name,
"downstream_module": subpackage_1.name,
"chains": indirect_chains,
}
)
if verbose:
chain_count = len(subpackage_chain_data["chains"])
chain_count = len(indirect_chains)
pluralized = "s" if chain_count != 1 else ""
output.print(
f"Found {chain_count} illegal chain{pluralized} "
Expand Down Expand Up @@ -102,6 +154,80 @@ def _check_all_modules_exist_in_graph(self, graph: ImportGraph) -> None:
if module.name not in graph.modules:
raise ValueError(f"Module '{module.name}' does not exist.")

def _squash_modules(self, graph: ImportGraph, modules_to_squash: List[Module]) -> None:
for module in modules_to_squash:
graph.squash_module(module.name)

def _make_graph_with_packages_removed(
self, graph: ImportGraph, packages_to_remove: List[Module]
) -> ImportGraph:
"""
Assumes the packages are squashed.
"""
new_graph = copy.deepcopy(graph)
for package in packages_to_remove:
new_graph.remove_module(package.name)
return new_graph

def _get_indirect_collapsed_chains(
self,
trimmed_graph: ImportGraph,
reference_graph: ImportGraph,
importer_package: Module,
imported_package: Module,
) -> List[DetailedChain]:
"""
Return chains from the importer to the imported package.
Assumes the packages are both squashed.
"""
segments = find_segments(
trimmed_graph,
reference_graph=reference_graph,
importer=importer_package,
imported=imported_package,
)
return segments_to_collapsed_chains(
reference_graph, segments, importer=importer_package, imported=imported_package
)

def _pop_direct_imports(
self,
importer_package: Module,
imported_package: Module,
graph: ImportGraph,
) -> List[DetailedChain]:
"""
Remove and return direct imports from the importer to the imported package.
"""
direct_imports: List[DetailedChain] = []
importer_modules = {importer_package.name} | graph.find_descendants(importer_package.name)
imported_modules = {imported_package.name} | graph.find_descendants(imported_package.name)

for importer_module in importer_modules:
for imported_module in imported_modules:
import_details = graph.get_import_details(
importer=importer_module, imported=imported_module
)
if import_details:
line_numbers = tuple(cast(int, i["line_number"]) for i in import_details)
direct_imports.append(
{
"chain": [
{
"importer": cast(str, import_details[0]["importer"]),
"imported": cast(str, import_details[0]["imported"]),
"line_numbers": line_numbers,
}
],
"extra_firsts": [],
"extra_lasts": [],
}
)
graph.remove_import(importer=importer_module, imported=imported_module)

return direct_imports

def _build_subpackage_chain_data(
self, upstream_module: Module, downstream_module: Module, graph: ImportGraph
) -> _SubpackageChainData:
Expand Down

0 comments on commit 2acbff4

Please sign in to comment.