diff --git a/docs/command_line.md b/docs/command_line.md index 7f73047..16089a9 100644 --- a/docs/command_line.md +++ b/docs/command_line.md @@ -37,25 +37,30 @@ Usage: docstub run [OPTIONS] PACKAGE_PATH annotations or to override them. Options: - -o, --out-dir PATH Set output directory explicitly. Stubs will be directly - written into that directory while preserving the - directory structure under `PACKAGE_PATH`. Otherwise, - stubs are generated inplace. - --config PATH Set one or more configuration file(s) explicitly. - Otherwise, it will look for a `pyproject.toml` or - `docstub.toml` in the current directory. - --ignore GLOB Ignore files matching this glob-style pattern. Can be - used multiple times. - --group-errors Group identical errors together and list where they - occurred. Will delay showing errors until all files have - been processed. Otherwise, simply report errors as the - occur. - --allow-errors INT Allow this many or fewer errors. If docstub reports - more, exit with error code '1'. This is useful to adopt - docstub gradually. [default: 0; x>=0] - --no-cache Ignore pre-existing cache and don't create a new one. - -v, --verbose Print more details (repeatable). - -h, --help Show this message and exit. + -o, --out-dir PATH Set output directory explicitly. Stubs will be + directly written into that directory while preserving + the directory structure under `PACKAGE_PATH`. + Otherwise, stubs are generated inplace. + --config PATH Set one or more configuration file(s) explicitly. + Otherwise, it will look for a `pyproject.toml` or + `docstub.toml` in the current directory. + --ignore GLOB Ignore files matching this glob-style pattern. Can be + used multiple times. + --group-errors Group identical errors together and list where they + occurred. Will delay showing errors until all files + have been processed. Otherwise, simply report errors + as the occur. + --allow-errors INT Allow this many or fewer errors. If docstub reports + more, exit with error code '1'. This is useful to + adopt docstub gradually. [default: 0; x>=0] + -W, --fail-on-warning Return non-zero exit code when a warning is raised. + Will add to '--allow-errors'. + --no-cache Ignore pre-existing cache and don't create a new one. + -v, --verbose Print more details. Use once to show information + messages. Use '-vv' to print debug messages. + -q, --quiet Print less details. Use once to hide warnings. Use + '-qq' to completely silence output. + -h, --help Show this message and exit. ``` @@ -75,7 +80,10 @@ Usage: docstub clean [OPTIONS] one exists, remove it. Options: - -v, --verbose Print more details (repeatable). + -v, --verbose Print more details. Use once to show information messages. + Use '-vv' to print debug messages. + -q, --quiet Print less details. Use once to hide warnings. Use '-qq' to + completely silence output. -h, --help Show this message and exit. ``` diff --git a/src/docstub/_analysis.py b/src/docstub/_analysis.py index 090ba9b..8c71b60 100644 --- a/src/docstub/_analysis.py +++ b/src/docstub/_analysis.py @@ -14,7 +14,7 @@ from ._utils import accumulate_qualname, module_name_from_path, pyfile_checksum -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) def _shared_leading_qualname(*qualnames): @@ -104,6 +104,18 @@ def typeshed_Incomplete(cls): return import_ def format_import(self, relative_to=None): + """Format import as valid Python import statement. + + Parameters + ---------- + relative_to : str, optional + If a dot-delimited module name is given, format the import relative + to it. + + Returns + ------- + formatted : str + """ if self.implicit: msg = f"cannot import implicit object: {self.implicit!r}" raise RuntimeError(msg) @@ -265,20 +277,6 @@ def common_known_types(): return types -@dataclass(slots=True, kw_only=True) -class TypeCollectionResult: - types: dict[str, PyImport] - type_prefixes: dict[str, PyImport] - - @classmethod - def serialize(cls, result): - pass - - @classmethod - def deserialize(cls, result): - pass - - class TypeCollector(cst.CSTVisitor): """Collect types from a given Python file. @@ -296,7 +294,13 @@ class TypeCollector(cst.CSTVisitor): """ class ImportSerializer: - """Implements the `FuncSerializer` protocol to cache `TypeCollector.collect`.""" + """Implements the `FuncSerializer` protocol to cache `TypeCollector.collect`. + + Attributes + ---------- + suffix : ClassVar[str] + encoding : ClassVar[str] + """ suffix = ".json" encoding = "utf-8" @@ -524,7 +528,7 @@ def _resolve_nickname(self, name): resolved = name else: logger.warning( - "reached limit while resolving nicknames for %r in %s, using %r", + "Reached limit while resolving nicknames for %r in %s, using %r", original, self.current_file or "", resolved, @@ -572,7 +576,7 @@ def match(self, search): "%r (original %r) in %s matches multiple types %r, using %r", search, original_search, - self.current_file or "", + self.current_file or "", matches.keys(), shortest_key, ) diff --git a/src/docstub/_cache.py b/src/docstub/_cache.py index e7a9f31..77ff1f9 100644 --- a/src/docstub/_cache.py +++ b/src/docstub/_cache.py @@ -2,13 +2,13 @@ from functools import cached_property from typing import Any, Protocol -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) -CACHE_DIR_NAME = ".docstub_cache" +CACHE_DIR_NAME: str = ".docstub_cache" -CACHEDIR_TAG_CONTENT = """\ +CACHEDIR_TAG_CONTENT: str = """\ Signature: 8a477f597d28d172789f06886806bc55 # Mark this directory as a cache [1], created by docstub [2] # [1] https://bford.info/cachedir/ @@ -16,7 +16,7 @@ """ -GITHUB_IGNORE_CONTENT = """\ +GITHUB_IGNORE_CONTENT: str = """\ # Make git ignore this cache directory, created by docstub [1] # [1] https://github.com/scientific-python/docstub * @@ -172,7 +172,7 @@ def cache_dir(self): create_cache(self._cache_dir) if _directory_size(self._cache_dir) > 512 * 1024**2: - logger.warning("cache size at %r exceeds 512 MiB", self._cache_dir) + logger.warning("Cache size at %r exceeds 512 MiB", self._cache_dir) return self._cache_dir diff --git a/src/docstub/_cli.py b/src/docstub/_cli.py index b371204..c8ebd70 100644 --- a/src/docstub/_cli.py +++ b/src/docstub/_cli.py @@ -21,11 +21,11 @@ walk_source_and_targets, walk_source_package, ) +from ._report import setup_logging from ._stubs import Py2StubTransformer, try_format_stub -from ._utils import ErrorReporter, GroupedErrorReporter from ._version import __version__ -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) def _cache_dir_in_cwd(): @@ -55,39 +55,45 @@ def _load_configuration(config_paths=None): if config_paths: for path in config_paths: - logger.info("using %s", path) + logger.info("Using %s", path) add_config = Config.from_toml(path) config = config.merge(add_config) else: pyproject_toml = Path.cwd() / "pyproject.toml" if pyproject_toml.is_file(): - logger.info("using %s", pyproject_toml) + logger.info("Using %s", pyproject_toml) add_config = Config.from_toml(pyproject_toml) config = config.merge(add_config) docstub_toml = Path.cwd() / "docstub.toml" if docstub_toml.is_file(): - logger.info("using %s", docstub_toml) + logger.info("Using %s", docstub_toml) add_config = Config.from_toml(docstub_toml) config = config.merge(add_config) return config -def _setup_logging(*, verbose): - _VERBOSITY_LEVEL = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG} - verbose = min(2, max(0, verbose)) # Limit to range [0, 2] +def _calc_verbosity(*, verbose, quiet): + """Calculate the verbosity from the "--verbose" or "--quiet" flags. - format_ = "%(levelname)s: %(message)s" - if verbose >= 2: - format_ += " py_source=%(filename)s#L%(lineno)d::%(funcName)s" + Parameters + ---------- + verbose : {0, 1, 2} + quiet : {0, 1, 2} - logging.basicConfig( - level=_VERBOSITY_LEVEL[verbose], - format=format_, - stream=sys.stderr, - ) + Returns + ------- + verbosity : {-2, -1, 0, 1, 2} + """ + if verbose and quiet: + raise click.UsageError( + "Options '-v/--verbose' and '-q/--quiet' cannot be used together" + ) + verbose -= quiet + verbose = min(2, max(-2, verbose)) # Limit to range [-2, 2] + return verbose def _collect_type_info(root_path, *, ignore=(), cache=False): @@ -110,7 +116,6 @@ def _collect_type_info(root_path, *, ignore=(), cache=False): type_prefixes : dict[str, PyImport] """ types = common_known_types() - type_prefixes = {} if cache: collect = FileCache( @@ -121,29 +126,62 @@ def _collect_type_info(root_path, *, ignore=(), cache=False): else: collect = TypeCollector.collect + collected_types = {} + collected_type_prefixes = {} for source_path in walk_source_package(root_path, ignore=ignore): if cache: module = source_path.relative_to(root_path.parent) collect.sub_dir = f"{__version__}/{module}" types_in_file, prefixes_in_file = collect(source_path) - types.update(types_in_file) - type_prefixes.update(prefixes_in_file) + collected_types.update(types_in_file) + collected_type_prefixes.update(prefixes_in_file) logger.info( - "collected types in %s%s", + "Collected%s types in %s", + " cached" if cache and collect.cached_last_call else "", + source_path, + ) + logger.debug( + "%i types, %i type prefixes in %s", + len(types_in_file), + len(prefixes_in_file), source_path, - " (cached)" if cache and collect.cached_last_call else "", ) - return types, type_prefixes + logger.debug("Collected %i types", len(collected_types)) + logger.debug("Collected %i type prefixes", len(collected_type_prefixes)) + types |= collected_types + return types, collected_type_prefixes + + +def _format_unknown_names(unknown_names): + """Format unknown type names as a list for printing. + + Parameters + ---------- + unknown_names : Iterable[str] + + Returns + ------- + formatted : str + A multiline string. + """ + lines = [click.style(f"Unknown type names: {len(unknown_names)}", bold=True)] + counter = Counter(unknown_names) + sorted_item_counts = sorted(counter.items(), key=lambda x: x[1], reverse=True) + for item, count in sorted_item_counts: + lines.append(f" {item} (x{count})") + return "\n".join(lines) @contextmanager -def report_execution_time(): +def log_execution_time(): start = time.time() try: yield + except KeyboardInterrupt: + logger.critical("Interrupt!") finally: stop = time.time() total_seconds = stop - start @@ -157,13 +195,7 @@ def report_execution_time(): if hours: formated_duration = f"{hours} h {formated_duration}" - # FIXME A hack to ensure that closing message is printed last, instead - # it would be got to use the same mechanism for all output - for handler in logger.handlers: - handler.flush() - - click.echo() - click.echo(f"Finished in {formated_duration}") + logger.info("Finished in %s", formated_duration) # docstub: off @@ -222,16 +254,36 @@ def cli(): metavar="INT", help="Allow this many or fewer errors. " "If docstub reports more, exit with error code '1'. " - "This is useful to adopt docstub gradually.", + "This is useful to adopt docstub gradually. ", +) +@click.option( + "-W", + "--fail-on-warning", + is_flag=True, + help="Return non-zero exit code when a warning is raised. " + "Will add to '--allow-errors'.", ) @click.option( "--no-cache", is_flag=True, help="Ignore pre-existing cache and don't create a new one.", ) -@click.option("-v", "--verbose", count=True, help="Print more details (repeatable).") +@click.option( + "-v", + "--verbose", + count=True, + help="Print more details. Use once to show information messages. " + "Use '-vv' to print debug messages.", +) +@click.option( + "-q", + "--quiet", + count=True, + help="Print less details. Use once to hide warnings. " + "Use '-qq' to completely silence output.", +) @click.help_option("-h", "--help") -@report_execution_time() +@log_execution_time() def run( *, root_path, @@ -240,8 +292,10 @@ def run( ignore, group_errors, allow_errors, + fail_on_warning, no_cache, verbose, + quiet, ): """Generate Python stub files. @@ -258,13 +312,16 @@ def run( ignore : Sequence[str] group_errors : bool allow_errors : int + fail_on_warning : bool no_cache : bool verbose : int + quiet : int """ # Setup ------------------------------------------------------------------- - _setup_logging(verbose=verbose) + verbosity = _calc_verbosity(verbose=verbose, quiet=quiet) + error_handler = setup_logging(verbosity=verbosity, group_errors=group_errors) root_path = Path(root_path) if root_path.is_file(): @@ -296,11 +353,10 @@ def run( for prefix, module in config.type_prefixes.items() } - reporter = GroupedErrorReporter() if group_errors else ErrorReporter() matcher = TypeMatcher( types=types, type_prefixes=type_prefixes, type_nicknames=config.type_nicknames ) - stub_transformer = Py2StubTransformer(matcher=matcher, reporter=reporter) + stub_transformer = Py2StubTransformer(matcher=matcher) if not out_dir: if root_path.is_file(): @@ -316,67 +372,79 @@ def run( root_path, out_dir, ignore=config.ignore_files ): if source_path.suffix.lower() == ".pyi": - logger.debug("using existing stub file %s", source_path) + logger.debug("Using existing stub file %s", source_path) with source_path.open() as fo: stub_content = fo.read() else: with source_path.open() as fo: py_content = fo.read() - logger.debug("creating stub from %s", source_path) + logger.debug("Transforming %s", source_path) try: stub_content = stub_transformer.python_to_stub( py_content, module_path=source_path ) stub_content = f"{STUB_HEADER_COMMENT}\n\n{stub_content}" stub_content = try_format_stub(stub_content) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as e: - logger.exception("failed creating stub for %s:\n\n%s", source_path, e) + except Exception: + logger.exception("Failed creating stub for %s", source_path) continue stub_path.parent.mkdir(parents=True, exist_ok=True) with stub_path.open("w") as fo: - logger.info("wrote %s", stub_path) + logger.info("Wrote %s", stub_path) fo.write(stub_content) # Reporting -------------------------------------------------------------- if group_errors: - reporter.print_grouped() + error_handler.emit_grouped() + assert error_handler.group_errors is True + error_handler.group_errors = False # Report basic statistics successful_queries = matcher.successful_queries - click.secho(f"{successful_queries} matched annotations", fg="green") - + transformed_doctypes = stub_transformer.transformer.stats["transformed"] syntax_error_count = stub_transformer.transformer.stats["syntax_errors"] + unknown_type_names = matcher.unknown_qualnames + total_warnings = error_handler.warning_count + total_errors = error_handler.error_count + + logger.info("Recognized type names: %i", successful_queries) + logger.info("Transformed doctypes: %i", transformed_doctypes) + if total_warnings: + logger.warning("Warnings: %i", total_warnings) if syntax_error_count: - click.secho(f"{syntax_error_count} syntax errors", fg="red") - - unknown_qualnames = matcher.unknown_qualnames - if unknown_qualnames: - click.secho(f"{len(unknown_qualnames)} unknown type names", fg="red") - counter = Counter(unknown_qualnames) - sorted_item_counts = sorted(counter.items(), key=lambda x: x[1], reverse=True) - for item, count in sorted_item_counts: - click.echo(f" {item} (x{count})") - - total_errors = len(unknown_qualnames) + syntax_error_count - total_msg = f"{total_errors} total errors" - if allow_errors: - total_msg = f"{total_msg} (allowed {allow_errors})" - click.secho(total_msg, bold=True) - - if allow_errors < total_errors: - logger.debug("number of allowed errors %i was exceeded") + logger.warning("Syntax errors: %i", syntax_error_count) + if unknown_type_names: + logger.warning(_format_unknown_names(unknown_type_names)) + if total_errors: + logger.error("Total errors: %i", total_errors) + + total_fails = total_errors + if fail_on_warning: + total_fails += total_warnings + if allow_errors < total_fails: sys.exit(1) # docstub: off @cli.command() # docstub: on -@click.option("-v", "--verbose", count=True, help="Print more details (repeatable).") +@click.option( + "-v", + "--verbose", + count=True, + help="Print more details. Use once to show information messages. " + "Use '-vv' to print debug messages.", +) +@click.option( + "-q", + "--quiet", + count=True, + help="Print less details. Use once to hide warnings. " + "Use '-qq' to completely silence output.", +) @click.help_option("-h", "--help") -def clean(verbose): +def clean(verbose, quiet): """Clean the cache. Looks for a cache directory relative to the current working directory. @@ -386,8 +454,10 @@ def clean(verbose): Parameters ---------- verbose : int + quiet : int """ - _setup_logging(verbose=verbose) + verbosity = _calc_verbosity(verbose=verbose, quiet=quiet) + setup_logging(verbosity=verbosity, group_errors=False) path = _cache_dir_in_cwd() if path.exists(): @@ -404,6 +474,6 @@ def clean(verbose): sys.exit(1) else: shutil.rmtree(_cache_dir_in_cwd()) - logger.info("cleaned %s", path) + logger.info("Cleaned %s", path) else: - logger.info("no cache to clean") + logger.info("No cache to clean") diff --git a/src/docstub/_config.py b/src/docstub/_config.py index e187041..acd44b1 100644 --- a/src/docstub/_config.py +++ b/src/docstub/_config.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import ClassVar -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) @dataclasses.dataclass(frozen=True, slots=True, kw_only=True) @@ -35,7 +35,7 @@ def from_toml(cls, path): with open(path, "rb") as fp: raw = tomllib.load(fp) config = cls(**raw.get("tool", {}).get("docstub", {}), config_paths=(path,)) - logger.debug("created Config from %s", path) + logger.debug("Created `Config` from %s", path) return config def merge(self, other): @@ -58,7 +58,7 @@ def merge(self, other): ignore_files=self.ignore_files + other.ignore_files, config_paths=self.config_paths + other.config_paths, ) - logger.debug("merged Config from %s", new.config_paths) + logger.debug("Merged Config from %s", new.config_paths) return new def to_dict(self): @@ -74,6 +74,16 @@ def __repr__(self) -> str: @staticmethod def validate(mapping): + """Make sure that a valid Config can be created from `mapping`. + + Parameters + ---------- + mapping : Mapping + + Raises + ------ + TypeError + """ for field in ["types", "type_prefixes", "type_nicknames"]: table = mapping[field] if not isinstance(table, dict): diff --git a/src/docstub/_docstrings.py b/src/docstub/_docstrings.py index 5dc4ce2..20012c7 100644 --- a/src/docstub/_docstrings.py +++ b/src/docstub/_docstrings.py @@ -16,19 +16,20 @@ # types and imports. I think that could very well be done at a higher level, # e.g. in the stubs module. from ._analysis import PyImport, TypeMatcher -from ._utils import DocstubError, ErrorReporter, escape_qualname +from ._report import ContextReporter +from ._utils import DocstubError, escape_qualname -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) -here = Path(__file__).parent -grammar_path = here / "doctype.lark" +here: Path = Path(__file__).parent +grammar_path: Path = here / "doctype.lark" with grammar_path.open() as file: - _grammar = file.read() + _grammar: str = file.read() -_lark = lark.Lark(_grammar, propagate_positions=True, strict=True) +_lark: lark.Lark = lark.Lark(_grammar, propagate_positions=True, strict=True) def _find_one_token(tree, *, name): @@ -178,7 +179,7 @@ def _aggregate_annotations(*types): return values, imports -FallbackAnnotation = Annotation( +FallbackAnnotation: Annotation = Annotation( value="Incomplete", imports=frozenset([PyImport.typeshed_Incomplete()]) ) @@ -270,7 +271,10 @@ def __init__(self, *, matcher=None, **kwargs): super().__init__(**kwargs) - self.stats = {"syntax_errors": 0} + self.stats = { + "syntax_errors": 0, + "transformed": 0, + } def doctype_to_annotation(self, doctype): """Turn a type description in a docstring into a type annotation. @@ -296,6 +300,7 @@ def doctype_to_annotation(self, doctype): annotation = Annotation( value=value, imports=frozenset(self._collected_imports) ) + self.stats["transformed"] += 1 return annotation, self._unknown_qualnames except ( lark.exceptions.LexError, @@ -390,7 +395,7 @@ def natlang_literal(self, tree): if len(tree.children) == 1: logger.warning( - "natural language literal with one item `%s`, " + "Natural language literal with one item `%s`, " "consider using `%s` to improve readability", tree.children[0], out, @@ -458,20 +463,7 @@ def shape(self, tree): ------- out : lark.visitors._DiscardType """ - logger.debug("dropping shape information") - return lark.Discard - - def optional(self, tree): - """ - Parameters - ---------- - tree : lark.Tree - - Returns - ------- - out : lark.visitors._DiscardType - """ - logger.debug("dropping optional / default info") + logger.debug("Dropping shape information %r", tree) return lark.Discard def optional_info(self, tree): @@ -484,7 +476,7 @@ def optional_info(self, tree): ------- out : lark.visitors._DiscardType """ - logger.debug("dropping optional info") + # logger.debug("Dropping optional info %r", tree) return lark.Discard def __default__(self, data, children, meta): @@ -583,7 +575,7 @@ class DocstringAnnotations: ---------- docstring : str transformer : DoctypeTransformer - reporter : ~.ErrorReporter + reporter : ~.ContextReporter Examples -------- @@ -597,14 +589,6 @@ class DocstringAnnotations: >>> transformer = DoctypeTransformer() >>> annotations = DocstringAnnotations(docstring, transformer=transformer) >>> annotations.parameters.keys() - Invalid syntax in docstring type annotation - some invalid syntax - ^ - - Unknown name in doctype: 'unknown.symbol' - unknown.symbol - ^^^^^^^^^^^^^^ - dict_keys(['a', 'b', 'c']) """ @@ -614,15 +598,15 @@ def __init__(self, docstring, *, transformer, reporter=None): ---------- docstring : str transformer : DoctypeTransformer - reporter : ~.ErrorReporter, optional + reporter : ~.ContextReporter, optional """ self.docstring = docstring self.np_docstring = npds.NumpyDocString(docstring) self.transformer = transformer if reporter is None: - reporter = ErrorReporter(line=0) - self.reporter: ErrorReporter = reporter + reporter = ContextReporter(logger=logger, line=0) + self.reporter = reporter.copy_with(logger=logger) def _doctype_to_annotation(self, doctype, ds_line=0): """Convert a type description to a Python-ready type. @@ -647,13 +631,16 @@ def _doctype_to_annotation(self, doctype, ds_line=0): annotation, unknown_qualnames = self.transformer.doctype_to_annotation( doctype ) + reporter.debug( + "Transformed doctype", details=(" %s\n-> %s", doctype, annotation) + ) except (lark.exceptions.LexError, lark.exceptions.ParseError) as error: details = None if hasattr(error, "get_context"): details = error.get_context(doctype) details = details.replace("^", click.style("^", fg="red", bold=True)) - reporter.message( + reporter.error( "Invalid syntax in docstring type annotation", details=details ) return FallbackAnnotation @@ -661,7 +648,7 @@ def _doctype_to_annotation(self, doctype, ds_line=0): except lark.visitors.VisitError as e: tb = "\n".join(traceback.format_exception(e.orig_exc)) details = f"doctype: {doctype!r}\n\n{tb}" - reporter.message("unexpected error while parsing doctype", details=details) + reporter.error("Unexpected error while parsing doctype", details=details) return FallbackAnnotation else: @@ -669,7 +656,7 @@ def _doctype_to_annotation(self, doctype, ds_line=0): width = stop_col - start_col error_underline = click.style("^" * width, fg="red", bold=True) details = f"{doctype}\n{' ' * start_col}{error_underline}\n" - reporter.message(f"Unknown name in doctype: {name!r}", details=details) + reporter.error(f"Unknown name in doctype: {name!r}", details=details) return annotation @cached_property @@ -704,8 +691,8 @@ def parameters(self): duplicates = param_section.keys() & other_section.keys() for duplicate in duplicates: - self.reporter.message( - "duplicate attribute name in docstring", + self.reporter.warn( + "Duplicate attribute name in docstring", details=self.reporter.underline(duplicate), ) @@ -798,7 +785,7 @@ def _handle_missing_whitespace(self, param): ds_line = i break reporter = self.reporter.copy_with(line_offset=ds_line) - reporter.message(msg, details=hint) + reporter.warn(msg, details=hint) new_name, new_type = param.name.split(":", maxsplit=1) param = npds.Parameter(name=new_name, type=new_type, desc=param.desc) @@ -828,8 +815,8 @@ def _section_annotations(self, name): if param.name in annotated_params: # TODO make error - self.reporter.message( - "duplicate parameter / attribute name in docstring", + self.reporter.warn( + "Duplicate parameter / attribute name in docstring", details=self.reporter.underline(param.name), ) continue diff --git a/src/docstub/_path_utils.py b/src/docstub/_path_utils.py index 3ec60ca..f1f9993 100644 --- a/src/docstub/_path_utils.py +++ b/src/docstub/_path_utils.py @@ -11,10 +11,10 @@ from ._vendored.stdlib import glob_translate -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) -STUB_HEADER_COMMENT = "# File generated with docstub" +STUB_HEADER_COMMENT: str = "# File generated with docstub" def is_docstub_generated(stub_path): @@ -130,7 +130,7 @@ def find_package_root(path): for _ in range(2**16): if not is_python_package_dir(root): - logger.debug("detected %s as the package root of %s", root, path) + logger.debug("Detected %s as the package root of %s", root, path) return root root = root.parent @@ -214,7 +214,7 @@ def _walk_source_package(path, *, ignore_regex): """ # Make sure if ignore_regex and ignore_regex.match(str(path.resolve())): - logger.info("ignoring %s", path) + logger.info("Ignoring '%s'", path) return if is_python_package_dir(path): @@ -235,10 +235,10 @@ def _walk_source_package(path, *, ignore_regex): yield path elif path.is_dir(): - logger.debug("skipping directory %s which isn't a Python package", path) + logger.debug("Skipping directory '%s', not a Python package", path) elif path.is_file(): - logger.debug("skipping non-Python file %s", path) + logger.debug("Skipping non-Python file '%s'", path) def walk_source_package(path, *, ignore=()): diff --git a/src/docstub/_report.py b/src/docstub/_report.py new file mode 100644 index 0000000..e1e4dbe --- /dev/null +++ b/src/docstub/_report.py @@ -0,0 +1,366 @@ +"""Report errors and information to the user.""" + +import dataclasses +import logging +from pathlib import Path +from textwrap import indent + +import click + +logger: logging.Logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) +class ContextReporter: + """Log messages in context of a file path and line number. + + This is basically a custom :class:`logging.LoggingAdapter`. + + Attributes + ---------- + logger + path : + Path to a file for the current context. + line : + The line in the given file. + + Examples + -------- + Setup logging for doctest, note the use of :cls:`ErrorHandler` + + >>> import sys + >>> import logging + >>> logger = logging.getLogger(__name__) + >>> logger.setLevel(logging.INFO) + >>> logger.addHandler(ReportHandler(sys.stdout)) + + >>> rep = ContextReporter(logger=logger) + >>> rep.info("Message") + I Message + + >>> rep = rep.copy_with(path=Path("file/with/problems.py")) + >>> rep.copy_with(line=3).error("Message with line info") + E Message with line info + file...problems.py:3 + + >>> rep.copy_with(line=4).warn("With details", details="More details") + W With details + More details + file...problems.py:4 + """ + + logger: logging.Logger + path: Path | None = None + line: int | None = None + + def copy_with(self, *, logger=None, path=None, line=None, line_offset=None): + """Return a new copy with the modified attributes. + + Parameters + ---------- + logger : logging.Logger + path : Path, optional + line : int, optional + line_offset : int, optional + + Returns + ------- + new : Self + """ + kwargs = dataclasses.asdict(self) + if logger: + kwargs["logger"] = logger + if path: + kwargs["path"] = path + if line: + kwargs["line"] = line + if line_offset: + kwargs["line"] += line_offset + new = type(self)(**kwargs) + return new + + def report(self, short, *, log_level, details=None, **log_kw): + """Log a report in context of the saved location. + + Parameters + ---------- + short : str + A short summarizing report that shouldn't wrap over multiple lines. + log_level : int + The logging level. + details : str, optional + An optional multiline report with more details. + **log_kw : Any + """ + extra = {"details": details} + + if self.path is not None: + location = self.path + if self.line is not None: + location = f"{location}:{self.line}" + extra["src_location"] = location + + self.logger.log(log_level, msg=short, extra=extra, **log_kw) + + def debug(self, short, *, details=None, **log_kw): + """Log information with context of the relevant source. + + Parameters + ---------- + short : str + A short summarizing report that shouldn't wrap over multiple lines. + details : str, optional + An optional multiline report with more details. + **log_kw : Any + """ + return self.report(short, log_level=logging.DEBUG, details=details, **log_kw) + + def info(self, short, *, details=None, **log_kw): + """Log information with context of the relevant source. + + Parameters + ---------- + short : str + A short summarizing report that shouldn't wrap over multiple lines. + details : str, optional + An optional multiline report with more details. + **log_kw : Any + """ + return self.report(short, log_level=logging.INFO, details=details, **log_kw) + + def warn(self, short, *, details=None, **log_kw): + """Log a warning with context of the relevant source. + + Parameters + ---------- + short : str + A short summarizing report that shouldn't wrap over multiple lines. + details : str, optional + An optional multiline report with more details. + **log_kw : Any + """ + return self.report(short, log_level=logging.WARNING, details=details, **log_kw) + + def error(self, short, *, details=None, **log_kw): + """Log an error with context of the relevant source. + + Parameters + ---------- + short : str + A short summarizing report that shouldn't wrap over multiple lines. + details : str, optional + An optional multiline report with more details. + **log_kw : Any + """ + return self.report(short, log_level=logging.ERROR, details=details, **log_kw) + + def __post_init__(self): + if self.path is not None and not isinstance(self.path, Path): + msg = f"expected `path` to be of type `Path`, got {type(self.path)!r}" + raise TypeError(msg) + + @staticmethod + def underline(line, *, char="^"): + """Underline `line` with the given `char`. + + Parameters + ---------- + line : str + char : str + + Returns + ------- + underlined : str + """ + assert len(char) == 1 + underlined = f"{line}\n{click.style(char * len(line), fg='red', bold=True)}" + return underlined + + +class ReportHandler(logging.StreamHandler): + """Custom handler to group and style reports from :cls:`ContextReporter`. + + Attributes + ---------- + group_errors : bool + If ``True``, hold errors until :func:`emit_grouped` is called. + error_count : int + warning_count : int + level_to_color : ClassVar[dict[int, str]] + """ + + level_to_color = { # noqa: RUF012 + logging.DEBUG: "white", + logging.INFO: "cyan", + logging.WARNING: "yellow", + logging.ERROR: "red", + logging.CRITICAL: "red", + logging.FATAL: "red", + } + + def __init__(self, stream=None, group_errors=False): + """ + Parameters + ---------- + stream : TextIO + group_errors : bool, optional + """ + super().__init__(stream=stream) + self.group_errors = group_errors + self._records = [] + + self.error_count = 0 + self.warning_count = 0 + + # Be defensive about using click's non-public `should_strip_ansi` + try: + from click._compat import should_strip_ansi # noqa: PLC0415 + + self.strip_ansi = should_strip_ansi(self.stream) + except Exception: + self.strip_ansi = True + logger.exception("Unexpected error while using click's `should_strip_ansi`") + + def format(self, record): + """Format a log record. + + Parameters + ---------- + record : logging.LogRecord + + Returns + ------- + formatted : str + """ + msg = super().format(record) + + # Except for INFO level, style message + if record.levelno >= logging.WARNING: + msg = click.style(msg, bold=True) + if record.levelno == logging.DEBUG: + msg = click.style(msg, fg="white") + + # Prefix with a colored log ID, fallback to first char of level name + log_id = getattr(record, "log_id", record.levelname[0]) + if log_id: + log_id = click.style( + log_id, + bold=True, + fg=self.level_to_color.get(record.levelno), + ) + msg = f"{log_id} {msg}" + + # Normalize `src_location` to `list[str]` + # (may also be missing or a single `str`) + src_locations = getattr(record, "src_location", []) + if not isinstance(src_locations, list): + src_locations = [src_locations] + + # and append number if multiple locations exist + if len(src_locations) > 1: + msg = f"{msg} ({len(src_locations)}x)" + + # Append `details` with indent if present + details = getattr(record, "details", None) + if details: + # Allow same %-based formatting as for general log messages + if isinstance(details, tuple): + details = details[0] % details[1:] + indented = indent(details, prefix=" ").rstrip() + msg = f"{msg}\n{indented}" + + # Append locations + for location in sorted(src_locations): + location_styled = click.style(location, fg="magenta") + msg = f"{msg}\n {location_styled}" + + if self.strip_ansi: + msg = click.unstyle(msg) + + return msg + + def emit(self, record): + """Handle a log record. + + Parameters + ---------- + record : logging.LogRecord + """ + if record.levelno >= logging.ERROR: + self.error_count += 1 + elif record.levelno == logging.WARNING: + self.warning_count += 1 + + if self.group_errors and logging.WARNING <= record.levelno <= logging.ERROR: + self._records.append(record) + else: + super().emit(record) + + def emit_grouped(self): + """Emit all saved log records in groups. + + Saved log records that were not yet emitted will be emitted. Records + whose "message" including an optional "details" field are identical + will be grouped together. + """ + # Group by report + groups = {} + for record in self._records: + group_id = record.getMessage(), getattr(record, "details", "") + groups[group_id] = groups.get(group_id, []) + groups[group_id].append(record) + + # Show largest groups last + groups_by_size = sorted(groups.values(), key=lambda x: len(x)) + + # Emit by group + for records in groups_by_size: + merged_record = records[0] + merged_record.src_location = [ + getattr(r, "src_location", "") for r in records + ] + super().emit(merged_record) + + # Clear now emitted records + self._records = [] + + +def setup_logging(*, verbosity, group_errors): + """ + + Parameters + ---------- + verbosity : int + group_errors : bool + + Returns + ------- + handler : ReportHandler + """ + _VERBOSITY_LEVEL = { + -2: logging.CRITICAL + 1, # never print anything + -1: logging.ERROR, + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + + format_ = "%(message)s" + if verbosity >= 2: + format_ += " [loc=%(pathname)s:%(lineno)d, func=%(funcName)s, time=%(asctime)s]" + + formatter = logging.Formatter(format_) + handler = ReportHandler(group_errors=group_errors) + handler.setLevel(_VERBOSITY_LEVEL[verbosity]) + handler.setFormatter(formatter) + + # Only allow logging by docstub itself + handler.addFilter(logging.Filter("docstub")) + + logging.basicConfig( + level=_VERBOSITY_LEVEL[verbosity], + handlers=[handler], + ) + logging.captureWarnings(True) + + return handler diff --git a/src/docstub/_stubs.py b/src/docstub/_stubs.py index 583b0df..7947c04 100644 --- a/src/docstub/_stubs.py +++ b/src/docstub/_stubs.py @@ -16,9 +16,10 @@ from ._analysis import PyImport from ._docstrings import DocstringAnnotations, DoctypeTransformer, FallbackAnnotation -from ._utils import ErrorReporter, module_name_from_path +from ._report import ContextReporter +from ._utils import module_name_from_path -logger = logging.getLogger(__name__) +logger: logging.Logger = logging.getLogger(__name__) def try_format_stub(stub: str) -> str: @@ -29,12 +30,16 @@ def try_format_stub(stub: str) -> str: stub = isort.code(stub) except ImportError: logger.warning("isort is not available, couldn't sort imports") + except Exception: + logger.exception("Unexpected error while running isort") try: import black # noqa: PLC0415 stub = black.format_str(stub, mode=black.Mode(is_pyi=True)) except ImportError: logger.warning("black is not available, couldn't format stubs") + except Exception: + logger.exception("Unexpected error while formatting with black") return stub @@ -99,7 +104,7 @@ def _get_docstring_node(node): Returns ------- - docstring_node : cst.SimpleString | cst.ConcatenatedString | None + docstring_node : cst.SimpleString | cst.ConcatenatedString | None The node of the docstring if found. """ docstring_node = None @@ -139,8 +144,6 @@ def wrap(func): def wrapped(self, original_node, updated_node): try: return func(self, original_node, updated_node) - except (SystemError, KeyboardInterrupt): - raise except Exception: position = self.get_metadata( cst.metadata.PositionProvider, original_node @@ -185,15 +188,28 @@ def _docstub_comment_directives(cls): """ state = {"is_off": False} + class Filter: + @staticmethod + def filter(record): + # Demote any logging event to DEBUG level. Don't hide completely + # in case there are bugs in this code itself + record.levelno = logging.DEBUG + record.levelname = logging.getLevelName(logging.DEBUG) + record.msg = f"{record.msg} ('docstub: off' directive active!)" + return True + def wrap_leave_Comment(method): """Detect docstub comment directives and record the state.""" @wraps(method) def wrapped(self, original_node, updated_node): + reporter = self._reporter_with_ctx(original_node) if cstm.matches(original_node, cstm.Comment(value="# docstub: off")): + reporter.debug("Comment directive 'docstub: off'") state["is_off"] = True return cst.RemovalSentinel.REMOVE if cstm.matches(original_node, cstm.Comment(value="# docstub: on")): + reporter.debug("Comment directive 'docstub: on'") state["is_off"] = False return cst.RemovalSentinel.REMOVE return method(self, original_node, updated_node) @@ -206,10 +222,15 @@ def wrap_leave(method): @wraps(method) def wrapped(self, original_node, updated_node): if state["is_off"]: - # Pass a copy of updated_node and return unmodified one - updated_node_copy = updated_node.deep_clone() - method(self, original_node, updated_node_copy) - return updated_node + self.reporter.logger.addFilter(Filter) + try: + # Pass a copy of updated_node and return unmodified one + updated_node_copy = updated_node.deep_clone() + method(self, original_node, updated_node_copy) + return updated_node + finally: + self.reporter.logger.removeFilter(Filter) + # Just pass through return method(self, original_node, updated_node) @@ -278,18 +299,14 @@ def print_upper(x: Incomplete) -> None: ... ) _Annotation_None: ClassVar[cst.Annotation] = cst.Annotation(cst.Name("None")) - def __init__(self, *, matcher=None, reporter=None): + def __init__(self, *, matcher=None): """ Parameters ---------- matcher : ~.TypeMatcher - reporter : ~.ErrorReporter """ - if reporter is None: - reporter = ErrorReporter() - self.transformer = DoctypeTransformer(matcher=matcher) - self.reporter = reporter + self.reporter = ContextReporter(logger=logger) # Relevant docstring for the current context self._scope_stack = None # Entered module, class or function scopes self._pytypes_stack = None # Collected pytypes for each stack @@ -342,7 +359,7 @@ def python_to_stub(self, source, *, module_path=None): Parameters ---------- source : str - module_path : Path, optional + module_path : Path , optional The location of the source that is transformed into a stub file. If given, used to enhance logging & error messages with more context information. @@ -463,6 +480,7 @@ def leave_FunctionDef(self, original_node, updated_node): ------- updated_node : cst.FunctionDef """ + reporter = self._reporter_with_ctx(original_node) node_changes = {"body": self._body_replacement} ds_annotations = self._pytypes_stack.pop() @@ -478,21 +496,13 @@ def leave_FunctionDef(self, original_node, updated_node): else: # Notify about ignored docstring annotation - # TODO: either remove message or print only in verbose mode - position = self.get_metadata( - cst.metadata.PositionProvider, original_node - ).start - reporter = self.reporter.copy_with( - path=self.current_source, line=position.line - ) to_keep = _inline_node_as_code(original_node.returns.annotation) details = ( f"{reporter.underline(to_keep)} " f"ignoring docstring: {annotation_value}" ) - reporter.message( - short="Keeping existing inline return annotation", - details=details, + reporter.warn( + short="Keeping existing inline return annotation", details=details ) elif original_node.returns is None: @@ -515,6 +525,7 @@ def leave_Param(self, original_node, updated_node): ------- updated_node : cst.Param """ + reporter = self._reporter_with_ctx(original_node) node_changes = {} scope = self._scope_stack[-1] @@ -549,13 +560,6 @@ def leave_Param(self, original_node, updated_node): else: # Notify about ignored docstring annotation - # TODO: either remove message or print only in verbose mode - position = self.get_metadata( - cst.metadata.PositionProvider, original_node - ).start - reporter = self.reporter.copy_with( - path=self.current_source, line=position.line - ) to_keep = cst.Module([]).code_for_node( original_node.annotation.annotation ) @@ -563,7 +567,7 @@ def leave_Param(self, original_node, updated_node): f"{reporter.underline(to_keep)} " f"ignoring docstring: {annotation_value}" ) - reporter.message( + reporter.warn( short="Keeping existing inline parameter annotation", details=details, ) @@ -573,6 +577,7 @@ def leave_Param(self, original_node, updated_node): node_changes["annotation"] = self._Annotation_Incomplete import_ = PyImport.typeshed_Incomplete() self._required_imports.add(import_) + reporter.warn(f"Missing annotation for parameter '{name}'") if node_changes: updated_node = updated_node.with_changes(**node_changes) @@ -623,6 +628,8 @@ def leave_Assign(self, original_node, updated_node): ------- updated_node : cst.Assign or cst.FlattenSentinel """ + reporter = self._reporter_with_ctx(original_node) + target_names = [ name.value for target in updated_node.targets @@ -630,15 +637,18 @@ def leave_Assign(self, original_node, updated_node): ] if "__all__" in target_names: if len(target_names) > 1: - logger.warning( - "found `__all__` in assignment with multiple targets, not modifying it" + reporter.warn( + "found `__all__` in assignment with multiple targets, " + "not modifying it" ) return updated_node assert len(original_node.targets) > 0 if len(target_names) == 1: # Replace with annotated assignment - updated_node = self._create_annotated_assign(name=target_names[0]) + updated_node = self._create_annotated_assign( + name=target_names[0], reporter=reporter + ) else: # Unpack assignment with multiple targets into multiple annotated ones @@ -647,7 +657,7 @@ def leave_Assign(self, original_node, updated_node): for name in target_names: is_last = name == target_names[-1] sub_node = self._create_annotated_assign( - name=name, trailing_semicolon=not is_last + name=name, trailing_semicolon=not is_last, reporter=reporter ) unpacked.append(sub_node) updated_node = cst.FlattenSentinel(unpacked) @@ -666,6 +676,8 @@ def leave_AnnAssign(self, original_node, updated_node): ------- updated_node : cst.AnnAssign """ + reporter = self._reporter_with_ctx(original_node) + name = updated_node.target.value if updated_node.value is not None: @@ -703,20 +715,13 @@ def leave_AnnAssign(self, original_node, updated_node): elif pytype != FallbackAnnotation: # Notify about ignored docstring annotation - # TODO: either remove message or print only in verbose mode - position = self.get_metadata( - cst.metadata.PositionProvider, original_node - ).start - reporter = self.reporter.copy_with( - path=self.current_source, line=position.line - ) to_keep = cst.Module([]).code_for_node( updated_node.annotation.annotation ) details = ( f"{reporter.underline(to_keep)} ignoring docstring: {pytype.value}" ) - reporter.message( + reporter.warn( short="Keeping existing inline annotation for assignment", details=details, ) @@ -885,21 +890,20 @@ def _annotations_from_node(self, node): ) except (SystemExit, KeyboardInterrupt): raise - except Exception as e: - logger.exception( - "error while parsing docstring of `%s`:\n\n%s", - node.name.value, - e, - ) + except Exception: + reporter.error("could not parse docstring", exc_info=True) return annotations - def _create_annotated_assign(self, *, name, trailing_semicolon=False): + def _create_annotated_assign( + self, *, name, trailing_semicolon=False, reporter=None + ): """Create an annotated assign. Parameters ---------- name : str trailing_semicolon : bool, optional + reporter : ContextReporter, optional Returns ------- @@ -913,6 +917,8 @@ def _create_annotated_assign(self, *, name, trailing_semicolon=False): else: annotation = self._Annotation_Incomplete self._required_imports.add(PyImport.typeshed_Incomplete()) + if reporter: + reporter.warn(f"Missing annotation for assignment '{name}'") semicolon = ( cst.Semicolon(whitespace_after=cst.SimpleWhitespace(" ")) @@ -961,3 +967,18 @@ def _insert_instance_attributes(self, updated_node, attributes): ) return updated_node + + def _reporter_with_ctx(self, node): + """Return reporter with file and line information attached. + + Parameters + ---------- + node : cst.CSTNode + + Returns + ------- + reporter : ContextReporter + """ + position = self.get_metadata(cst.metadata.PositionProvider, node).start + reporter = self.reporter.copy_with(path=self.current_source, line=position.line) + return reporter diff --git a/src/docstub/_utils.py b/src/docstub/_utils.py index 781bba1..1d1cb89 100644 --- a/src/docstub/_utils.py +++ b/src/docstub/_utils.py @@ -1,13 +1,8 @@ -import dataclasses import itertools import re from functools import lru_cache -from pathlib import Path -from textwrap import indent from zlib import crc32 -import click - def accumulate_qualname(qualname, *, start_right=False): """Return possible partial names from a fully qualified one. @@ -128,233 +123,5 @@ def pyfile_checksum(path): return key -@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) -class ErrorReporter: - """Format error messages in context of a location in a file. - - Attributes - ---------- - path : - Path to a file for the current context. - line : - The line in the given file. - column : - The column in the given line. - - Examples - -------- - >>> from pathlib import Path - >>> rep = ErrorReporter() - >>> rep.message("Message") - Message - - >>> rep = rep.copy_with(path=Path("file/with/problems.py")) - >>> rep.copy_with(line=3).message("Message with line info") - file...problems.py:3: Message with line info - - >>> rep.copy_with(line=4, column=2).message("With line & column info") - file...problems.py:4:2: With line & column info - - >>> rep.message("Summary", details="More details") - file...problems.py: Summary - More details - - """ - - path: Path | None = None - line: int | None = None - column: int | None = None - - def copy_with(self, *, path=None, line=None, column=None, line_offset=None): - """Return a new copy with the modified attributes. - - Parameters - ---------- - path : Path, optional - line : int, optional - column : int, optional - line_offset : int, optional - - Returns - ------- - new : Self - """ - kwargs = dataclasses.asdict(self) - if path: - kwargs["path"] = path - if line: - kwargs["line"] = line - if line_offset: - kwargs["line"] += line_offset - if column: - kwargs["column"] = column - new = type(self)(**kwargs) - return new - - def message(self, short, *, details=None): - """Print a message in context of the saved location. - - Parameters - ---------- - short : str - A short summarizing message that shouldn't wrap over multiple lines. - details : str, optional - An optional multiline message with more details. - """ - message = click.style(short, bold=True) - location = self.format_location( - path=self.path, line=self.line, column=self.column - ) - if location: - message = f"{location}: {message}" - - if details: - indented = indent(details, prefix=" ") - message = f"{message}\n{indented}" - - message = f"{message.strip()}\n" - click.echo(message) - - def __post_init__(self): - if self.path is not None and not isinstance(self.path, Path): - msg = f"expected `path` to be of type `Path`, got {type(self.path)!r}" - raise TypeError(msg) - - @staticmethod - def format_location(*, path, line, column): - location = "" - if path: - location = path - if line: - location = f"{location}:{line}" - if column: - location = f"{location}:{column}" - if location: - location = click.style(location, fg="magenta") - return location - - @staticmethod - def underline(line): - underlined = f"{line}\n{click.style('^' * len(line), fg='red', bold=True)}" - return underlined - - -@dataclasses.dataclass(kw_only=True, frozen=True) -class GroupedErrorReporter(ErrorReporter): - """Format & group error messages in context of a location in a file. - - Examples - -------- - >>> from pathlib import Path - >>> rep = GroupedErrorReporter() - >>> rep.message("Syntax error") - >>> rep = rep.copy_with(path=Path("file/with/problems.py")) - >>> rep.copy_with(line=3).message("Syntax error") - >>> rep.copy_with(line=4, column=2).message("Unknown doctype") - >>> rep.message("Unknown doctype") - >>> rep.print_grouped() - Syntax error (x2) - - ...problems.py:3 - - Unknown doctype (x2) - ...problems.py - ...problems.py:4:2 - - """ - - _messages: list = dataclasses.field(default_factory=list) - - def copy_with(self, *, path=None, line=None, column=None, line_offset=None): - """Return a new copy with the modified attributes. - - Parameters - ---------- - path : Path, optional - line : int, optional - column : int, optional - line_offset : int, optional - - Returns - ------- - new : Self - """ - new = super().copy_with( - path=path, line=line, column=column, line_offset=line_offset - ) - # Explicitly override `_message` since super method relies on - # `dataclasses.asdict` which performs deep copies on lists, while - # we want to collect all messages in one list - object.__setattr__(new, "_messages", self._messages) - return new - - def message(self, short, *, details=None): - """Print a message in context of the saved location. - - Parameters - ---------- - short : str - A short summarizing message that shouldn't wrap over multiple lines. - details : str, optional - An optional multiline message with more details. - """ - self._messages.append( - { - "short": short.strip(), - "details": details.strip() if details else details, - "path": self.path, - "line": self.line, - "column": self.column, - } - ) - - def print_grouped(self): - """Print all collected messages in groups.""" - - def key(message): - return ( - message["short"] or "", - message["details"] or "", - message["path"] or Path(), - message["line"] or -1, - message["column"] or -1, - ) - - groups = {} - for message in sorted(self._messages, key=key): - group_name = (message["short"], message["details"]) - if group_name not in groups: - groups[group_name] = [] - groups[group_name].append(message) - - # Show largest groups last - groups_by_size = sorted(groups.items(), key=lambda x: len(x[1])) - - for (short, details), group in groups_by_size: - formatted = click.style(short, bold=True) - if len(group) > 1: - formatted = f"{formatted} (x{len(group)})" - if details: - indented = indent(details, prefix=" ") - formatted = f"{formatted}\n{indented}" - - occurrences = [] - for message in group: - location = ( - self.format_location( - path=message["path"], - line=message["line"], - column=message["column"], - ) - or "" - ) - occurrences.append(location) - occurrences = "\n".join(occurrences) - occurrences = indent(occurrences, prefix=" ") - formatted = f"{formatted}\n{occurrences}\n" - - click.echo(formatted) - - class DocstubError(Exception): """An error raised by docstub.""" diff --git a/tests/test_analysis.py b/tests/test_analysis.py index 1050178..5f930a2 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -349,7 +349,7 @@ def test_nickname_infinite_loop(self, caplog): type_name, py_import = matcher.match("Foo") assert len(caplog.records) == 1 - assert "reached limit while resolving nicknames" in caplog.text + assert "Reached limit while resolving nicknames" in caplog.text assert type_name == "Foo" assert py_import == PyImport(implicit="Foo") diff --git a/tests/test_cli.py b/tests/test_cli.py index 2aa5088..d0f751a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -34,7 +34,7 @@ def test_no_cache(self, tmp_path_cwd, caplog): assert run_result.exit_code == 0 assert _cli._cache_dir_in_cwd().exists() # Check that no collected file was logged as "(cached)" - assert "(cached)" not in "\n".join(caplog.messages) + assert "cached" not in "\n".join(caplog.messages) # Third run with existing cache should use cache caplog.clear() @@ -43,7 +43,7 @@ def test_no_cache(self, tmp_path_cwd, caplog): assert run_result.exception is None assert run_result.exit_code == 0 # Check that at least one collected file was logged as "(cached)" - assert "(cached)" in "\n".join(caplog.messages) + assert "cached" in "\n".join(caplog.messages) # Fourth run with '--no-cache' should ignore existing cache caplog.clear() @@ -52,7 +52,7 @@ def test_no_cache(self, tmp_path_cwd, caplog): assert run_result.exception is None assert run_result.exit_code == 0 # Check that at least one collected file was logged as "(cached)" - assert "(cached)" not in "\n".join(caplog.messages) + assert "cached" not in "\n".join(caplog.messages) class Test_clean: diff --git a/tests/test_docstrings.py b/tests/test_docstrings.py index 5763230..6afe069 100644 --- a/tests/test_docstrings.py +++ b/tests/test_docstrings.py @@ -498,7 +498,7 @@ def test_args_kwargs(self): assert "kwargs" in annotations.parameters assert "**kargs" not in annotations.parameters - def test_missing_whitespace(self, capsys): + def test_missing_whitespace(self, caplog): """Check for warning if a whitespace is missing between parameter and colon. In this case, NumPyDoc parses the entire thing as the parameter name. @@ -513,8 +513,8 @@ def test_missing_whitespace(self, capsys): transformer = DoctypeTransformer() annotations = DocstringAnnotations(docstring, transformer=transformer) assert annotations.parameters["a"].value == "int" - captured = capsys.readouterr() - assert "Possibly missing whitespace" in captured.out + assert len(caplog.records) == 1 + assert "Possibly missing whitespace" in caplog.text def test_combined_numpydoc_params(self): docstring = dedent( diff --git a/tests/test_report.py b/tests/test_report.py new file mode 100644 index 0000000..818421a --- /dev/null +++ b/tests/test_report.py @@ -0,0 +1,135 @@ +import logging +from pathlib import Path +from textwrap import dedent + +import pytest + +from docstub._report import ContextReporter, ReportHandler + + +class Test_ContextReporter: + @pytest.mark.parametrize("level", ["debug", "info", "warn", "error"]) + def test_basic(self, level, caplog): + caplog.set_level(logging.DEBUG) + logger = logging.getLogger(__name__) + rep = ContextReporter(logger=logger) + + func = getattr(rep, level) + + func("Message") + assert len(caplog.records) == 1 + + record = caplog.records[0] + assert record.message == "Message" + assert record.details is None + assert not hasattr(record, "src_location") + assert record.levelname.startswith(level.upper()) + + def test_details(self, caplog): + logger = logging.getLogger(__name__) + rep = ContextReporter(logger=logger) + + rep.info("Message", details="More\nmultiline details.") + assert len(caplog.records) == 1 + + record = caplog.records[0] + assert record.message == "Message" + assert record.details == "More\nmultiline details." + assert not hasattr(record, "src_location") + + def test_src_location(self, caplog): + logger = logging.getLogger(__name__) + rep = ContextReporter(logger=logger, path=Path("foo.py"), line=3) + + rep.info("Message") + assert len(caplog.records) == 1 + + record = caplog.records[0] + assert record.message == "Message" + assert record.details is None + assert record.src_location == "foo.py:3" + + def test_copy_with(self, caplog): + logger = logging.getLogger(__name__) + + rep = ContextReporter(logger=logger, path=Path("foo.py"), line=3) + rep_new_path = rep.copy_with(path=Path("bar.py")) + rep_new_line = rep.copy_with(line=7) + rep_line_offset = rep.copy_with(line_offset=8) + + rep_new_path.info("Message") + rep_new_line.info("Message") + rep_line_offset.info("Message") + assert len(caplog.records) == 3 + + assert caplog.records[0].src_location == "bar.py:3" + assert caplog.records[1].src_location == "foo.py:7" + assert caplog.records[2].src_location == "foo.py:11" + + +@pytest.fixture +def log_record(): + record = logging.LogRecord( + name="testing", + level=logging.ERROR, + pathname=__file__, + lineno=0, + msg="The actual log message", + args=(), + exc_info=None, + ) + return record + + +class Test_ReportHandler: + def test_format(self, log_record): + log_record.details = "Multiline\ndetails" + log_record.src_location = "foo.py:42" + log_record.log_id = "E321" + + handler = ReportHandler() + result = handler.format(log_record) + + expected = dedent( + """ + E321 The actual log message + Multiline + details + foo.py:42 + """ + ).strip() + assert result == expected + + def test_format_multiple_locations(self, log_record): + log_record.details = "Some details" + log_record.src_location = ["foo.py:42", "bar.py", "a/path.py:100"] + log_record.log_id = "E321" + + handler = ReportHandler() + result = handler.format(log_record) + + expected = dedent( + """ + E321 The actual log message (3x) + Some details + a/path.py:100 + bar.py + foo.py:42 + """ + ).strip() + assert result == expected + + def test_format_details_with_args(self, log_record): + log_record.details = ("Details with args: %i, %f", 3, 0.5) + log_record.log_id = "E321" + + handler = ReportHandler() + result = handler.format(log_record) + + expected = dedent( + """ + E321 The actual log message + Details with args: 3, 0.500000 + """ + ).strip() + assert result == expected diff --git a/tests/test_stubs.py b/tests/test_stubs.py index 1fbab4c..83876c3 100644 --- a/tests/test_stubs.py +++ b/tests/test_stubs.py @@ -313,7 +313,7 @@ def test_keep_assign_param(self): result = transformer.python_to_stub(source) assert expected == result - def test_module_assign_conflict(self, capsys): + def test_module_assign_conflict(self, caplog): source = dedent( ''' """ @@ -333,13 +333,8 @@ def test_module_assign_conflict(self, capsys): result = transformer.python_to_stub(source) assert expected == result - captured = capsys.readouterr() - assert captured.out == ( - "Keeping existing inline annotation for assignment\n" - " str\n" - " ^^^ ignoring docstring: int\n" - "\n" - ) + assert caplog.messages == ["Keeping existing inline annotation for assignment"] + assert "ignoring docstring: int" in caplog.records[0].details def test_module_assign_no_conflict(self, capsys): source = dedent( @@ -387,7 +382,7 @@ class Foo: result = transformer.python_to_stub(source) assert expected == result - def test_class_assign_conflict(self, capsys): + def test_class_assign_conflict(self, caplog): source = dedent( ''' class Foo: @@ -409,15 +404,9 @@ class Foo: result = transformer.python_to_stub(source) assert expected == result - captured = capsys.readouterr() - assert captured.out == ( - "Keeping existing inline annotation for assignment\n" - " str\n" - " ^^^ ignoring docstring: Sized\n" - "\n" - ) + assert caplog.messages == ["Keeping existing inline annotation for assignment"] - def test_class_assign_no_conflict(self, capsys): + def test_class_assign_no_conflict(self, caplog): source = dedent( ''' class Foo: @@ -445,8 +434,7 @@ class Foo: # No warning should have been raised, since there is no conflict # between docstring and inline annotation - output = capsys.readouterr() - assert output.out == "" + assert caplog.messages == [] def test_param_keep_inline_annotation(self): source = dedent( @@ -464,7 +452,7 @@ def foo(a: str) -> None: ... result = transformer.python_to_stub(source) assert expected == result - def test_param_conflict(self, capsys): + def test_param_conflict(self, caplog): source = dedent( ''' def foo(a: int) -> None: @@ -485,13 +473,8 @@ def foo(a: int) -> None: ... result = transformer.python_to_stub(source) assert expected == result - captured = capsys.readouterr() - assert captured.out == ( - "Keeping existing inline parameter annotation\n" - " int\n" - " ^^^ ignoring docstring: Sized\n" - "\n" - ) + assert caplog.messages == ["Keeping existing inline parameter annotation"] + assert "ignoring docstring: Sized" in caplog.records[0].details def test_return_keep_inline_annotation(self): source = dedent( @@ -509,7 +492,7 @@ def foo() -> str: ... result = transformer.python_to_stub(source) assert expected == result - def test_return_conflict(self, capsys): + def test_return_conflict(self, caplog): source = dedent( ''' def foo() -> int: @@ -530,13 +513,8 @@ def foo() -> int: ... result = transformer.python_to_stub(source) assert expected == result - captured = capsys.readouterr() - assert captured.out == ( - "Keeping existing inline return annotation\n" - " int\n" - " ^^^ ignoring docstring: Sized\n" - "\n" - ) + assert caplog.messages == ["Keeping existing inline return annotation"] + assert "ignoring docstring: Sized" in caplog.records[0].details def test_preserved_type_comment(self): source = dedent(