Added a new layering violation checker, easily maintainable via YAML.
Hooked in, but not yet activated.  This eases tweaking things and
incrementally creating a sensible config.  After that, the previous
hard-to-maintain layering checker can be removed.

Change-Id: I1a3e5b71aa9e11e7cad62d4265df74f9939a99d4
spt29 committed Sep 14, 2022
# pickle collected data for later comparisons. Not used in our CI and makes runs faster
# Be a little bit more mypy-friendly.

# layering-definition=.layering.yaml
#!/usr/bin/env python3
# Copyright (C) 2022 tribe29 GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.

# A pylint checker for Checkmk layering conventions. The basic idea is very simple: We partition
# qualified names into "virtual packages" (see PackageFor protocol below) and check if imports
# between these virtual packages are explicitly allowed (see IsPackageRelationshipOK protocol
# below). If not, we report a violation. There are two exceptions to this rule: Importing from the
# standard library and importing from the module's own package are always allowed, obviously. This
# is effectively what we would get if we had real separate packages with their own files etc.

# Test with:
# PYLINT_ARGS="--load-plugins=tests.testlib.pylint_checker_layering_violation --disable=all --enable=layering-violation" time make -C tests test-pylint
# or
# PYTHONPATH=. pipenv run python3 -m pylint --load-plugins=tests.testlib.pylint_checker_layering_violation --disable=all --enable=layering-violation cmk/{bi,ec,core_helpers,fields,notification_plugins,snmplib,utils}

from __future__ import annotations

from collections.abc import Collection, Container, Hashable, Iterable, Mapping, Sequence, Set
from pathlib import Path
from typing import Protocol, TypeVar

import jsonschema # type: ignore[import]
import yaml
from astroid import nodes # type: ignore[import]
from pylint.checkers import BaseChecker # type: ignore[import]
from pylint.lint import PyLinter # type: ignore[import]

# our main "business logic", the heart of our import checking logic

class PackageFor(Protocol):
    """Callable that maps a qualified name to the name of its "virtual package"."""

    def __call__(self, name: str) -> str:
        """Return the name of the virtual package that the qualified name ``name`` belongs to."""
        ...

class IsPackageRelationshipOK(Protocol):
    """Callable that decides if one "virtual package" may import from another one."""

    def __call__(self, *, importing_package: str, imported_package: str) -> bool:
        """Return True if ``importing_package`` is allowed to import from ``imported_package``."""
        ...

class IsImportOK:
    """Decides if a single import is allowed.

    Both the importing module and the imported name are first mapped to their virtual packages
    via ``package_for``; the resulting pair of packages is then passed to
    ``is_package_relationship_ok``.
    """

    def __init__(
        self, package_for: PackageFor, is_package_relationship_ok: IsPackageRelationshipOK
    ) -> None:
        self._package_for = package_for
        self._is_package_relationship_ok = is_package_relationship_ok

    def __call__(self, *, importing_module: str, imported_name: str) -> bool:
        """Return True if ``importing_module`` may import ``imported_name``.

        Any exception raised by the two injected callables is propagated unchanged.
        """
        return self._is_package_relationship_ok(
            importing_package=self._package_for(importing_module),
            imported_package=self._package_for(imported_name),
        )

# the hook into pylint's AST traversal

def register(linter: PyLinter) -> None:
    """Plugin entry point: pylint calls this for every module given via --load-plugins."""
    linter.register_checker(LayerViolationChecker(linter))

# NOTE: The first paragraph of the class documentation string is shown in pylint's --help output as
# a heading for the defined options.
class LayerViolationChecker(BaseChecker):
    """Checkmk layering conventions"""

    # NOTE(review): The closing brackets of msgs/options, the option name, the early return and the
    # add_message() call were missing from this block and have been restored from the visible
    # fragments. Please double-check against the upstream file.

    name = "layering_violation"  # name of the section in the config
    msgs = {
        "C8411": (  # message id; Why did we choose this number?
            "import of %r not allowed in module %r",  # template of displayed message
            "layering-violation",  # message symbol
            "Used when an import is found which violates the Checkmk layering conventions.",  # message description
        ),
    }
    options = (
        (
            # Accessible as linter.config.layering_definition, see open() below.
            "layering-definition",
            {
                "default": "",
                "type": "path",
                "metavar": "<path to YAML file>",
                "help": "A path to a YAML file describing the layering conventions in Checkmk,"
                " consisting of a description of the virtual packages, the allowed package"
                " relationships and some expected package import cycles.",
            },
        ),
    )

    def __init__(self, linter: PyLinter | None = None) -> None:
        super().__init__(linter)
        # The config file and commandline arguments have not been processed yet, so linter.config is
        # not yet complete. We need to delay any configuration processing to open().
        self._is_import_ok: IsImportOK | None = None
        self._linter = linter

    def open(self) -> None:
        """Load the layering definition, if one is configured. Without it, nothing is checked."""
        if self._linter and (filename := self._linter.config.layering_definition):
            self._is_import_ok = load_layering_configuration(Path(filename))

    def visit_import(self, node: nodes.Import) -> None:
        """Check an ``import a.b, c`` statement."""
        importing_module = extract_importing_module(node)
        imported_names = extract_imported_names(node)
        self._check_imports(node, importing_module, imported_names)

    def visit_importfrom(self, node: nodes.ImportFrom) -> None:
        """Check a ``from a.b import c, d`` statement, using the qualified names a.b.c and a.b.d."""
        importing_module = extract_importing_module(node)
        imported_module = extract_imported_module(node)
        imported_names = [imported_module + "." + name for name in extract_imported_names(node)]
        self._check_imports(node, importing_module, imported_names)

    def _check_imports(
        self, node: nodes.NodeNG, importing_module: str, imported_names: Sequence[str]
    ) -> None:
        """Emit one layering-violation message per imported name which is not allowed."""
        if self._is_import_ok is None:
            return  # no layering definition configured, so there is nothing to check
        for imported_name in imported_names:
            if not self._is_import_ok(
                importing_module=importing_module, imported_name=imported_name
            ):
                self.add_message(
                    "layering-violation",
                    node=node,
                    args=(imported_name, importing_module),
                )

# YAML helpers

def load_layering_configuration(path: Path) -> IsImportOK:
    """Read the YAML layering definition at ``path`` and build the import checker from it.

    The file is parsed, validated against LAYERING_DEFINITION_SCHEMA and then handed to
    PackageMapper and RelationChecker. Errors from opening or parsing the file, schema violations
    and the ValueErrors raised by the two constructors are propagated to the caller.
    """
    # NOTE: yaml.safe_load is guaranteed to return mappings in insertion order, and we depend on
    # this! This is a tiny bit of a hack, we could use a list of pairs instead, but that would make
    # the YAML quite a bit uglier.
    with path.open(encoding="utf-8") as stream:
        # UniqueKeyLoader is derived from yaml.SafeLoader, so this is as safe as yaml.safe_load.
        layering_definition = yaml.load(stream, UniqueKeyLoader)
    # We validate the layering definition here syntactically, a more thorough semantic validation
    # will be done later in the constructors of PackageMapper and RelationChecker.
    jsonschema.validate(
        instance=layering_definition, schema=yaml.safe_load(LAYERING_DEFINITION_SCHEMA)
    )
    package_mapper = PackageMapper(layering_definition["package-definitions"].items())
    relation_checker = RelationChecker(
        layering_definition["allowed-package-relationships"],
        package_mapper.defined_package_names(),
        layering_definition["known-package-cycles"],
    )
    return IsImportOK(package_mapper.package_for, relation_checker.is_package_relationship_ok)

# PyYAML doesn't check for duplicate mapping keys, although it really should, see
# https://github.com/yaml/pyyaml/issues/165 for a discussion and the workaround below.
class UniqueKeyLoader(yaml.SafeLoader):  # pylint: disable=too-many-ancestors
    """A yaml.SafeLoader which rejects mappings containing the same key more than once."""

    def construct_mapping(self, node: yaml.Node, deep: bool = False) -> dict:
        """Construct the mapping for ``node`` like the base class, but fail on duplicate keys.

        Raises:
            yaml.MarkedYAMLError: if a key occurs more than once in the mapping, with marks
                pointing at the mapping and at the offending key.
        """
        keys_seen = set()
        for key_node, _value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            if key in keys_seen:
                raise yaml.MarkedYAMLError(
                    "while constructing a mapping",
                    node.start_mark,
                    "found duplicate key",
                    key_node.start_mark,
                )
            keys_seen.add(key)
        # The actual construction is left to the base class, we only did the uniqueness check.
        return super().construct_mapping(node, deep)

# We could move this schema into a separate file to be usable for IDEs.
# NOTE(review): The mapping keys of this schema were missing and have been restored from the
# remaining type/uniqueItems/required lines and from the keys read in
# load_layering_configuration(). Please double-check against the upstream file.
LAYERING_DEFINITION_SCHEMA = """
type: object
properties:
  # virtual package name -> list of qualified name prefixes belonging to it
  package-definitions:
    type: object
    additionalProperties:
      type: array
      items:
        type: string
      uniqueItems: true
  # importing virtual package -> list of virtual packages it may import from
  allowed-package-relationships:
    type: object
    additionalProperties:
      type: array
      items:
        type: string
      uniqueItems: true
  # list of expected cycles, each one given as a list of virtual package names
  known-package-cycles:
    type: array
    items:
      type: array
      items:
        type: string
      uniqueItems: true
    uniqueItems: true
required:
  - package-definitions
  - allowed-package-relationships
  - known-package-cycles
additionalProperties: false
"""

# AST helpers

def extract_importing_module(node: nodes.Import | nodes.ImportFrom) -> str:
    """Return the name of the module which contains the import statement ``node``."""
    return node.root().name

def extract_imported_module(node: nodes.ImportFrom) -> str:
    """Return the absolute name of the module which ``node`` imports from.

    Absolute imports yield ``node.modname`` unchanged. For relative imports the leading dots are
    resolved against the name of the module containing the import statement.
    """
    level: int | None = node.level  # number of dots in relative import, 0 (None?) for absolute
    modname: str = node.modname  # the module that is being imported from
    if not level:
        return modname
    root: nodes.Module = node.root()
    # The first dot refers to the package containing the importing module. If the importing module
    # is a package itself (root.package is set), that is its full name, otherwise we have to drop
    # the last name component. Every additional dot drops one more component.
    index = (None if level == 1 else -(level - 1)) if root.package else -level
    return ".".join(root.name.split(".")[:index] + ([modname] if modname else []))

def extract_imported_names(node: nodes.Import | nodes.ImportFrom) -> Sequence[str]:
    """Return the names imported by ``node`` in source order, ignoring any ``as`` aliases."""
    return [name for name, _alias in node.names]  # we don't care about any aliases

# mapping of qualified name prefixes to "virtual packages" via lists of prefixes

class PackageMapper:
    """Maps qualified names to "virtual packages" via lists of qualified name prefixes.

    The package definitions are consulted in the given order and the first package with a
    matching prefix wins, so more specific prefixes must be listed before more general ones.
    """

    def __init__(self, package_definitions: Iterable[tuple[str, Iterable[str]]]) -> None:
        """Store the (package name, prefixes) pairs and validate them.

        Raises:
            ValueError: if a prefix can never match because a prefix of an earlier package
                already covers it.
        """
        # The prefixes are iterated several times, so we materialize them to be safe against
        # one-shot iterables.
        self._package_definitions = [
            (package_name, tuple(prefixes)) for package_name, prefixes in package_definitions
        ]
        self._validate_no_prefix_shadowing()

    def _validate_no_prefix_shadowing(self) -> None:
        """Make sure that no prefix is covered by a prefix of a package defined earlier."""
        prefixes_seen: set[str] = set()
        for package_name, prefixes in self._package_definitions:
            for prefix in prefixes:
                for prefix_seen in prefixes_seen:
                    if self._is_prefix_of(prefix_seen, prefix):
                        raise ValueError(
                            f"module prefix {prefix!r} in package definition for {package_name!r} shadowed by {prefix_seen!r}"
                        )
            # Only prefixes of *earlier* packages can shadow, so we extend the set after the
            # package is done.
            prefixes_seen |= set(prefixes)

    def defined_package_names(self) -> Set[str]:
        """Return the names of all defined virtual packages."""
        return {package_name for package_name, _prefixes in self._package_definitions}

    def package_for(self, name: str) -> str:
        """Return the first virtual package which has a prefix matching the qualified name.

        Raises:
            ValueError: if no package definition matches ``name``.
        """
        for package_name, prefixes in self._package_definitions:
            if any(self._is_prefix_of(p, name) for p in prefixes):
                return package_name
        raise ValueError(f"undefined package name for {name!r}")

    @staticmethod
    def _is_prefix_of(module_prefix: str, name: str) -> bool:
        """Prefix test on whole name components: "a.b" is a prefix of "a.b.c", but not of "a.bc"."""
        return (name + ".").startswith(module_prefix + ".")

# check allowed imports between "virtual packages", using the given whitelist

class RelationChecker:
    """Checks imports between "virtual packages" against a whitelist of allowed relationships."""

    def __init__(
        self,
        allowed_package_relationships: Mapping[str, Iterable[str]],
        defined_package_names: Set[str],
        known_package_cycles: Collection[Sequence[str]],
    ) -> None:
        """Store the whitelist and validate it.

        Args:
            allowed_package_relationships: maps an importing package to the packages it may
                import from.
            defined_package_names: all valid virtual package names.
            known_package_cycles: the cycles in the whitelist which are accepted.

        Raises:
            ValueError: if an undefined package name is used or if the whitelist contains a
                cycle which is not listed in ``known_package_cycles``.
        """
        self._allowed_package_relationships = allowed_package_relationships
        self._validate_only_defined_package_names_used(defined_package_names, known_package_cycles)
        self._validate_no_cycles(known_package_cycles)

    def _validate_only_defined_package_names_used(
        self,
        defined_package_names: Set[str],
        known_package_cycles: Collection[Sequence[str]],
    ) -> None:
        """Raise a ValueError for the first package name which is not defined."""

        def validate_defined(package_name: str, where: str) -> None:
            if package_name not in defined_package_names:
                raise ValueError(f"unknown package {package_name!r} in {where}")

        for importing_package, allowed_imports in self._allowed_package_relationships.items():
            validate_defined(importing_package, "allowed package relationships")
            for allowed_import in allowed_imports:
                validate_defined(allowed_import, f"allowed imports for {importing_package!r}")
        for cycle in known_package_cycles:
            for package_name in cycle:
                validate_defined(package_name, f"known package cycle {cycle!r}")

    def _validate_no_cycles(self, known_package_cycles: Container[Sequence[str]]) -> None:
        """Raise a ValueError if the whitelist contains cycles which are not explicitly known.

        NOTE: A known cycle has to list its packages exactly in the order in which tarjan()
        returns the strongly connected component, otherwise it is not recognized.
        """
        if cycles := [
            scc
            for scc in tarjan(self._allowed_package_relationships)
            if len(scc) > 1 and scc not in known_package_cycles
        ]:
            plural = "s" if len(cycles) > 1 else ""
            # Show every cycle in reversed order and repeat its starting package at the end.
            pretty_cycles = " and ".join(
                " => ".join(list(cycle[::-1]) + [cycle[-1]]) for cycle in cycles
            )
            raise ValueError(f"cycle{plural} in allowed package relationships: {pretty_cycles}")

    def is_package_relationship_ok(self, importing_package: str, imported_package: str) -> bool:
        """Return True if ``importing_package`` may import from ``imported_package``.

        Importing from "stdlib" and from the own package is always allowed. A package without an
        entry in the whitelist may not import from any other package.
        """
        return (
            imported_package in ("stdlib", importing_package)
            # .get(): a missing entry means "nothing allowed", it must not crash the whole run.
            or imported_package in self._allowed_package_relationships.get(importing_package, ())
        )

# Tarjan's algorithm for SCCs, everybody should write their own version of it at least once! ;-)

T = TypeVar("T", bound=Hashable)

def tarjan(graph: Mapping[T, Iterable[T]]) -> Sequence[Sequence[T]]:
    """Returns the strongly connected components of the graph g in topological order,
    see e.g. https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
    Simple recursive version, should be OK for our purposes.

    The graph maps each node to its successors. Nodes which only appear as successors are handled
    like nodes without outgoing edges. A component is emitted only after all components reachable
    from it have been emitted, so e.g. a chain a -> b -> c yields [[c], [b], [a]]. Within a
    component, the nodes are listed in the order in which they are popped from the stack."""
    node_stack: list[T] = []
    on_stack: set[T] = set()  # the same nodes as in node_stack, for a fast membership test
    index: dict[T, int] = {}  # discovery order of the nodes visited so far
    lowlink: dict[T, int] = {}  # smallest index known to be reachable from the node
    sccs: list[list[T]] = []

    def strong_connect(v: T) -> None:
        lowlink[v] = index[v] = len(index)
        node_stack.append(v)
        on_stack.add(v)
        for w in graph.get(v, ()):
            if w not in index:
                # w has not been visited yet, so recurse on it.
                strong_connect(w)
                lowlink[v] = min(lowlink[v], lowlink[w])
            elif w in on_stack:
                # w is on the stack and therefore in the current component.
                lowlink[v] = min(lowlink[v], index[w])
        if lowlink[v] == index[v]:
            # v is the root of a component, which consists of everything on the stack down to v.
            scc: list[T] = []
            while True:
                w = node_stack.pop()
                on_stack.remove(w)
                scc.append(w)
                if w == v:
                    break
            sccs.append(scc)

    for v in graph:
        if v not in index:
            strong_connect(v)
    return sccs

