Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: "actions/checkout@v2"
- uses: "actions/setup-python@v2"
- uses: "actions/checkout@v6"
- uses: "actions/setup-python@v6"
with:
python-version: "${{ matrix.python-version }}"
- name: "Install dependencies"
Expand Down
4 changes: 3 additions & 1 deletion importscan/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .scan import scan # noqa
from .scan import scan

__all__ = ("scan",)
84 changes: 62 additions & 22 deletions importscan/scan.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
from pkgutil import iter_modules
from __future__ import annotations

import sys
from pkgutil import iter_modules
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable
from importlib.abc import Loader
from types import ModuleType
from typing_extensions import TypeIs
from importscan.types import IgnoreModule, ModuleInfo, StrOrBytesPath

def scan(package, ignore=None, handle_error=None):

def scan(
package: ModuleType,
ignore: Iterable[IgnoreModule] | IgnoreModule | None = None,
handle_error: Callable[[str, Exception], object] | None = None,
) -> None:
"""Scan a package by importing it.

A framework can provide registration decorators: a decorator that
Expand Down Expand Up @@ -99,22 +113,39 @@ def handle_error(name, e):
is_ignored=is_ignored,
handle_error=handle_error,
):
try:
loader = importer.find_spec(modname).loader
except AttributeError:
# zipimport.zipimporter doesn't have find_spec
loader = importer.find_module(modname)
# FIXME: Add support for MetaPathFinder? But how would that work?
# What path do we pass in to get the correct result?
# We probably would need to remember the value of path we passed
# into iter_modules for submodules/subpackages. There's also
# the additional issue that not all finders will implement the
# non-standard iter_modules method, but without it there's
# no way to list all of the modules. Also since walk_packages
# already imports all of the packages, why are we importing
# them again here? Shouldn't we only import modules here?
# Also why do we do only use `import_module` here, but not
# in `walk_packages`? Doesn't that mean that the additional
# check in `import_module` doesn't do anything for packages?
loader = importer.find_spec(modname).loader # type: ignore
assert loader is not None

try:
import_module(modname, loader, handle_error)
finally:
if hasattr(loader, "file") and hasattr(loader.file, "close"):
loader.file.close()


def import_module(modname, loader, handle_error):
if hasattr(loader, "file") and hasattr(
loader.file, # pyright: ignore[reportAttributeAccessIssue]
"close",
):
loader.file.close() # pyright: ignore[reportAttributeAccessIssue]


def import_module(
modname: str,
loader: Loader,
handle_error: Callable[[str, Exception], object] | None,
) -> None:
get_filename = getattr(loader, "get_filename", None)
if get_filename is None:
get_filename = loader._get_filename
get_filename = loader._get_filename # type: ignore[attr-defined]
try:
fn = get_filename(modname)
except TypeError:
Expand All @@ -135,10 +166,13 @@ def import_module(modname, loader, handle_error):
raise


def get_is_ignored(package, ignore):
def get_is_ignored(
package: ModuleType, ignore: Iterable[IgnoreModule] | IgnoreModule | None
) -> Callable[[str], bool]:

pkg_name = package.__name__

def is_nonstr_iter(v):
def is_nonstr_iter(v: object) -> TypeIs[Iterable[IgnoreModule]]:
if isinstance(v, str): # pragma: no cover
return False
return hasattr(v, "__iter__")
Expand All @@ -157,23 +191,28 @@ def is_nonstr_iter(v):
# functions, e.g. re.compile('pattern').search
callable_ignores = [ign for ign in ignore if callable(ign)]

def is_ignored(fullname):
def is_ignored(fullname: str) -> bool:
for ign in rel_ignores:
if fullname.startswith(pkg_name + ign):
return True
for ign in abs_ignores:
# non-leading-dotted name absolute object name
if fullname.startswith(ign):
return True
for ign in callable_ignores:
if ign(fullname):
for ign_fn in callable_ignores:
if ign_fn(fullname):
return True
return False

return is_ignored


def walk_packages(path=None, prefix="", is_ignored=None, handle_error=None):
def walk_packages(
path: Iterable[StrOrBytesPath] | None = None,
prefix: str = "",
is_ignored: Callable[[str], bool] | None = None,
handle_error: Callable[[str, Exception], object] | None = None,
) -> Generator[ModuleInfo]:
"""Yields (module_finder, name, ispkg) for all modules recursively
on path, or, if path is ``None``, all accessible modules.

Expand Down Expand Up @@ -205,10 +244,11 @@ def walk_packages(path=None, prefix="", is_ignored=None, handle_error=None):

"""

def seen(p, m={}):
def seen(p: str, m: set[str] = set()) -> bool:
if p in m: # pragma: no cover
return True
m[p] = True
m.add(p)
return False

# iter_modules is nonrecursive
for module_finder, name, ispkg in iter_modules(path, prefix):
Expand All @@ -235,6 +275,6 @@ def seen(p, m={}):
path = getattr(sys.modules[name], "__path__", None) or []

# don't traverse path items we've seen before
path = [p for p in path if not seen(p)]
path = [p for p in path if not seen(p)] # pyright: ignore

yield from walk_packages(path, name + ".", is_ignored, handle_error)
6 changes: 4 additions & 2 deletions importscan/tests/fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

calls = 0


def call():
def call() -> None:
global calls
calls += 1


def reset():
def reset() -> None:
global calls
calls = 0
65 changes: 37 additions & 28 deletions importscan/tests/test_importscan.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,93 @@
import sys
import re
import os
from __future__ import annotations

import contextlib
import os
import re
import sys
from typing import TYPE_CHECKING

import pytest
from pytest import raises
from importscan import scan
from . import fixtures
from pytest import raises

if TYPE_CHECKING:
from collections.abc import Generator

# note that due to the nature of imports, we need to have a unique fixture
# for each test


@contextlib.contextmanager
def with_entry_in_sys_path(entry):
def with_entry_in_sys_path(entry: str) -> Generator[None]:
"""Context manager that temporarily puts an entry at head of sys.path"""
sys.path.insert(0, entry)
yield
sys.path.remove(entry)


def zip_file_in_sys_path():
def zip_file_in_sys_path() -> contextlib.AbstractContextManager[None]:
"""Context manager that puts zipped.zip at head of sys.path"""
zip_pkg_path = os.path.join(
os.path.dirname(__file__), "fixtures", "zipped.zip"
)
return with_entry_in_sys_path(zip_pkg_path)


def setup_function(function):
@pytest.fixture(autouse=True, scope="function")
def reset_fixtures() -> None:
fixtures.reset()


def test_empty_package():
def test_empty_package() -> None:
from .fixtures import empty_package

scan(empty_package)

assert fixtures.calls == 1


def test_module():
def test_module() -> None:
from .fixtures import module

scan(module)

assert fixtures.calls == 1


def test_package():
def test_package() -> None:
from .fixtures import package

scan(package)

assert fixtures.calls == 1


def test_empty_subpackage():
def test_empty_subpackage() -> None:
from .fixtures import empty_subpackage

scan(empty_subpackage)

assert fixtures.calls == 1


def test_subpackage():
def test_subpackage() -> None:
from .fixtures import subpackage

scan(subpackage)

assert fixtures.calls == 1


def test_ignore_module_relative():
def test_ignore_module_relative() -> None:
from .fixtures import ignore_module

scan(ignore_module, ignore=[".module"])

assert fixtures.calls == 0


def test_ignore_module_absolute():
def test_ignore_module_absolute() -> None:
from .fixtures import ignore_module_absolute

scan(
Expand All @@ -89,57 +98,57 @@ def test_ignore_module_absolute():
assert fixtures.calls == 0


def test_ignore_module_function():
def test_ignore_module_function() -> None:
from .fixtures import ignore_module_function

scan(ignore_module_function, ignore=re.compile("module$").search)

assert fixtures.calls == 0


def test_ignore_subpackage_relative():
def test_ignore_subpackage_relative() -> None:
from .fixtures import ignore_subpackage

scan(ignore_subpackage, ignore=[".sub"])

assert fixtures.calls == 0


def test_ignore_subpackage_function():
def test_ignore_subpackage_function() -> None:
from .fixtures import ignore_subpackage_function

scan(ignore_subpackage_function, ignore=re.compile("sub$").search)

assert fixtures.calls == 0


def test_ignore_subpackage_module_relative():
def test_ignore_subpackage_module_relative() -> None:
from .fixtures import ignore_subpackage_module

scan(ignore_subpackage_module, ignore=[".sub.module"])

assert fixtures.calls == 0


def test_importerror():
def test_importerror() -> None:
from .fixtures import importerror

with raises(ImportError):
scan(importerror)


def test_attributeerror():
def test_attributeerror() -> None:
from .fixtures import attributeerror

with raises(AttributeError):
scan(attributeerror)


def test_importerror_handle_error():
def test_importerror_handle_error() -> None:
from .fixtures import importerror_handle_error

# skip import errors
def handle_error(name, e):
def handle_error(name: str, e: Exception) -> None:
if not isinstance(e, ImportError):
raise e

Expand All @@ -148,30 +157,30 @@ def handle_error(name, e):
assert fixtures.calls == 1


def test_attributeerror_not_handle_error():
def test_attributeerror_not_handle_error() -> None:
from .fixtures import attributeerror_not_handle_error

# skip import errors but not attribute errors
def handle_error(name, e):
def handle_error(name: str, e: Exception) -> None:
if not isinstance(e, ImportError):
raise e

with raises(AttributeError):
scan(attributeerror_not_handle_error, handle_error=handle_error)


def test_package_in_zipped():
def test_package_in_zipped() -> None:
with zip_file_in_sys_path():
import packageinzipped
import packageinzipped # type: ignore

scan(packageinzipped)

assert fixtures.calls == 1


def test_module_in_zipped():
def test_module_in_zipped() -> None:
with zip_file_in_sys_path():
import moduleinzipped
import moduleinzipped # type: ignore

scan(moduleinzipped)

Expand Down
Loading