From 3995a96b1c2c35be3e35be4b7185bcacc5cb66b1 Mon Sep 17 00:00:00 2001 From: Stephen Moore Date: Fri, 3 May 2024 01:24:05 +1000 Subject: [PATCH] Extract some code from yaml test item (#149) * Make this function return an int Prior to this a SystemExit exception could return the code as a string or an integer, now we convert it to a sensible integer to return * Move where the extension hook is executed It currently doesn't depend on the execution path existing, and we can make sure this runs before we create that path and run anything * Extract code to check mypy output This is so we can abstract away what is involved in checking the mypy output. This will also help when we extract running mypy so we don't have to pass around the information that is only needed for this check * Extract out code for running mypy This will make it easier for us to run mypy multiple times later on * Extract the running of the test item This object glues together the creation of test files, the running of mypy, and the checking of the result * Remove unused imports --- pytest_mypy_plugins/item.py | 376 +++++++++++++++++++++-------------- pytest_mypy_plugins/utils.py | 2 - 2 files changed, 224 insertions(+), 154 deletions(-) diff --git a/pytest_mypy_plugins/item.py b/pytest_mypy_plugins/item.py index 6f9afce..9d9643d 100644 --- a/pytest_mypy_plugins/item.py +++ b/pytest_mypy_plugins/item.py @@ -1,6 +1,7 @@ import importlib import io import os +import shutil import subprocess import sys import tempfile @@ -76,7 +77,7 @@ class ReturnCodes: FATAL_ERROR = 2 -def run_mypy_typechecking(cmd_options: List[str], stdout: TextIO, stderr: TextIO) -> Optional[Union[str, int]]: +def run_mypy_typechecking(cmd_options: List[str], stdout: TextIO, stderr: TextIO) -> int: fscache = FileSystemCache() sources, options = process_options(cmd_options, fscache=fscache) @@ -100,7 +101,16 @@ def flush_errors(*args: Any) -> None: build.build(sources, options, flush_errors=flush_errors, fscache=fscache, stdout=stdout, stderr=stderr) except SystemExit as sysexit: - return sysexit.code + # The code to a SystemExit is optional + # From python docs, if the code is None then the exit code is 0 + # Otherwise if the code is not an integer the exit code is 1 + code = sysexit.code + if code is None: + code = 0 + elif not isinstance(code, int): + code = 1 + + return code finally: fscache.flush() @@ -110,6 +120,184 @@ def flush_errors(*args: Any) -> None: return ReturnCodes.SUCCESS +class MypyExecutor: + def __init__( + self, + same_process: bool, + rootdir: Union[Path, None], + execution_path: Path, + environment_variables: Dict[str, Any], + mypy_executable: str, + ) -> None: + self.rootdir = rootdir + self.same_process = same_process + self.execution_path = execution_path + self.mypy_executable = mypy_executable + self.environment_variables = environment_variables + + def execute(self, mypy_cmd_options: List[str]) -> Tuple[int, Tuple[str, str]]: + # Returns (returncode, (stdout, stderr)) + if self.same_process: + return self._typecheck_in_same_process(mypy_cmd_options) + else: + return self._typecheck_in_new_subprocess(mypy_cmd_options) + + def _typecheck_in_new_subprocess(self, mypy_cmd_options: List[Any]) -> Tuple[int, Tuple[str, str]]: + # add current directory to path + self._collect_python_path(self.rootdir) + # adding proper MYPYPATH variable + self._collect_mypy_path(self.rootdir) + + # Windows requires this to be set, otherwise the interpreter crashes + if "SYSTEMROOT" in os.environ: + self.environment_variables["SYSTEMROOT"] = os.environ["SYSTEMROOT"] + + completed = subprocess.run( + [self.mypy_executable, *mypy_cmd_options], + capture_output=True, + cwd=os.getcwd(), + env=self.environment_variables, + ) + captured_stdout = completed.stdout.decode() + captured_stderr = completed.stderr.decode() + return completed.returncode, (captured_stdout, captured_stderr) + + def _typecheck_in_same_process(self, mypy_cmd_options: List[Any]) -> Tuple[int, Tuple[str, str]]: + return_code = -1 + with utils.temp_environ(), utils.temp_path(), utils.temp_sys_modules(): + # add custom environment variables + for key, val in self.environment_variables.items(): + os.environ[key] = val + + # add current directory to path + sys.path.insert(0, str(self.execution_path)) + + stdout = io.StringIO() + stderr = io.StringIO() + + with stdout, stderr: + return_code = run_mypy_typechecking(mypy_cmd_options, stdout=stdout, stderr=stderr) + stdout_value = stdout.getvalue() + stderr_value = stderr.getvalue() + + return return_code, (stdout_value, stderr_value) + + def _collect_python_path(self, rootdir: Optional[Path]) -> None: + python_path_parts = [] + + existing_python_path = os.environ.get("PYTHONPATH") + if existing_python_path: + python_path_parts.append(existing_python_path) + python_path_parts.append(str(self.execution_path)) + python_path_key = self.environment_variables.get("PYTHONPATH") + if python_path_key: + python_path_parts.append(maybe_to_abspath(python_path_key, rootdir)) + python_path_parts.append(python_path_key) + + self.environment_variables["PYTHONPATH"] = ":".join(python_path_parts) + + def _collect_mypy_path(self, rootdir: Optional[Path]) -> None: + mypy_path_parts = [] + + existing_mypy_path = os.environ.get("MYPYPATH") + if existing_mypy_path: + mypy_path_parts.append(existing_mypy_path) + mypy_path_key = self.environment_variables.get("MYPYPATH") + if mypy_path_key: + mypy_path_parts.append(maybe_to_abspath(mypy_path_key, rootdir)) + mypy_path_parts.append(mypy_path_key) + if rootdir: + mypy_path_parts.append(str(rootdir)) + + self.environment_variables["MYPYPATH"] = ":".join(mypy_path_parts) + + +class OutputChecker: + def __init__(self, expect_fail: bool, execution_path: Path, expected_output: List[OutputMatcher]) -> None: + self.expect_fail = expect_fail + self.execution_path = execution_path + self.expected_output = expected_output + + def check(self, ret_code: int, stdout: str, stderr: str) -> None: + mypy_output = stdout + stderr + if ret_code == ReturnCodes.FATAL_ERROR: + print(mypy_output, file=sys.stderr) + raise TypecheckAssertionError(error_message="Critical error occurred") + + output_lines = [] + for line in mypy_output.splitlines(): + output_line = replace_fpath_with_module_name(line, rootdir=self.execution_path) + output_lines.append(output_line) + try: + assert_expected_matched_actual(expected=self.expected_output, actual=output_lines) + except TypecheckAssertionError as e: + if not self.expect_fail: + raise e + else: + if self.expect_fail: + raise TypecheckAssertionError("Expected failure, but test passed") + + +class Runner: + def __init__( + self, + *, + files: List[File], + config: Config, + main_file: Path, + config_file: Optional[str], + disable_cache: bool, + mypy_executor: MypyExecutor, + output_checker: OutputChecker, + test_only_local_stub: bool, + incremental_cache_dir: str, + ) -> None: + self.files = files + self.config = config + self.main_file = main_file + self.config_file = config_file + self.mypy_executor = mypy_executor + self.disable_cache = disable_cache + self.output_checker = output_checker + self.test_only_local_stub = test_only_local_stub + self.incremental_cache_dir = incremental_cache_dir + + def run(self) -> None: + # start from main.py + mypy_cmd_options = self._prepare_mypy_cmd_options() + mypy_cmd_options.append(str(self.main_file)) + + # make files + for file in self.files: + self._make_test_file(file) + + returncode, (stdout, stderr) = self.mypy_executor.execute(mypy_cmd_options) + self.output_checker.check(returncode, stdout, stderr) + + def _make_test_file(self, file: File) -> None: + current_directory = Path.cwd() + fpath = current_directory / file.path + fpath.parent.mkdir(parents=True, exist_ok=True) + fpath.write_text(file.content) + + def _prepare_mypy_cmd_options(self) -> List[str]: + mypy_cmd_options = [ + "--show-traceback", + "--no-error-summary", + "--no-pretty", + "--hide-error-context", + ] + if not self.test_only_local_stub: + mypy_cmd_options.append("--no-silence-site-packages") + if not self.disable_cache: + mypy_cmd_options.extend(["--cache-dir", self.incremental_cache_dir]) + + if self.config_file: + mypy_cmd_options.append(f"--config-file={self.config_file}") + + return mypy_cmd_options + + class YamlTestItem(pytest.Item): def __init__( self, @@ -155,16 +343,6 @@ def __init__( self.base_pyproject_toml_fpath = None self.incremental_cache_dir = os.path.join(self.root_directory, ".mypy_cache") - def make_test_file(self, file: File) -> None: - current_directory = Path.cwd() - fpath = current_directory / file.path - fpath.parent.mkdir(parents=True, exist_ok=True) - fpath.write_text(file.content) - - def make_test_files_in_current_directory(self) -> None: - for file in self.files: - self.make_test_file(file) - def remove_cache_files(self, fpath_no_suffix: Path) -> None: cache_file = Path(self.incremental_cache_dir) cache_file /= ".".join([str(part) for part in sys.version_info[:2]]) @@ -186,55 +364,6 @@ def remove_cache_files(self, fpath_no_suffix: Path) -> None: ): parent_dir.rmdir() - def typecheck_in_new_subprocess( - self, execution_path: Path, mypy_cmd_options: List[Any] - ) -> Tuple[int, Tuple[str, str]]: - import shutil - - mypy_executable = shutil.which("mypy") - assert mypy_executable is not None, "mypy executable is not found" - - rootdir = getattr(getattr(self.parent, "config", None), "rootdir", None) - # add current directory to path - self._collect_python_path(rootdir, execution_path) - # adding proper MYPYPATH variable - self._collect_mypy_path(rootdir) - - # Windows requires this to be set, otherwise the interpreter crashes - if "SYSTEMROOT" in os.environ: - self.environment_variables["SYSTEMROOT"] = os.environ["SYSTEMROOT"] - - completed = subprocess.run( - [mypy_executable, *mypy_cmd_options], - capture_output=True, - cwd=os.getcwd(), - env=self.environment_variables, - ) - captured_stdout = completed.stdout.decode() - captured_stderr = completed.stderr.decode() - return completed.returncode, (captured_stdout, captured_stderr) - - def typecheck_in_same_process( - self, execution_path: Path, mypy_cmd_options: List[Any] - ) -> Tuple[Optional[Union[str, int]], Tuple[str, str]]: - with utils.temp_environ(), utils.temp_path(), utils.temp_sys_modules(): - # add custom environment variables - for key, val in self.environment_variables.items(): - os.environ[key] = val - - # add current directory to path - sys.path.insert(0, str(execution_path)) - - stdout = io.StringIO() - stderr = io.StringIO() - - with stdout, stderr: - return_code = run_mypy_typechecking(mypy_cmd_options, stdout=stdout, stderr=stderr) - stdout_value = stdout.getvalue() - stderr_value = stderr.getvalue() - - return return_code, (stdout_value, stderr_value) - def execute_extension_hook(self) -> None: extension_hook_fqname = self.config.option.mypy_extension_hook module_name, func_name = extension_hook_fqname.rsplit(".", maxsplit=1) @@ -252,47 +381,42 @@ def runtest(self) -> None: ) from e try: - execution_path = Path(temp_dir.name) + mypy_executable = shutil.which("mypy") + assert mypy_executable is not None, "mypy executable is not found" + rootdir = getattr(getattr(self.parent, "config", None), "rootdir", None) - with utils.cd(execution_path): - # extension point for derived packages - if ( - hasattr(self.config.option, "mypy_extension_hook") - and self.config.option.mypy_extension_hook is not None - ): - self.execute_extension_hook() - - # start from main.py - main_file = str(execution_path / "main.py") - mypy_cmd_options = self.prepare_mypy_cmd_options(execution_path) - mypy_cmd_options.append(main_file) - - # make files - self.make_test_files_in_current_directory() - - if self.same_process: - returncode, (stdout, stderr) = self.typecheck_in_same_process(execution_path, mypy_cmd_options) - else: - returncode, (stdout, stderr) = self.typecheck_in_new_subprocess(execution_path, mypy_cmd_options) - - mypy_output = stdout + stderr - if returncode == ReturnCodes.FATAL_ERROR: - print(mypy_output, file=sys.stderr) - raise TypecheckAssertionError(error_message="Critical error occurred") - - output_lines = [] - for line in mypy_output.splitlines(): - output_line = replace_fpath_with_module_name(line, rootdir=execution_path) - output_lines.append(output_line) - try: - assert_expected_matched_actual(expected=self.expected_output, actual=output_lines) - except TypecheckAssertionError as e: - if not self.expect_fail: - raise e - else: - if self.expect_fail: - raise TypecheckAssertionError("Expected failure, but test passed") + # extension point for derived packages + if ( + hasattr(self.config.option, "mypy_extension_hook") + and self.config.option.mypy_extension_hook is not None + ): + self.execute_extension_hook() + execution_path = Path(temp_dir.name) + with utils.cd(execution_path): + mypy_executor = MypyExecutor( + same_process=self.same_process, + execution_path=execution_path, + rootdir=rootdir, + environment_variables=self.environment_variables, + mypy_executable=mypy_executable, + ) + + output_checker = OutputChecker( + expect_fail=self.expect_fail, execution_path=execution_path, expected_output=self.expected_output + ) + + Runner( + files=self.files, + config=self.config, + main_file=execution_path / "main.py", + config_file=self.prepare_config_file(execution_path), + disable_cache=self.disable_cache, + mypy_executor=mypy_executor, + output_checker=output_checker, + test_only_local_stub=self.test_only_local_stub, + incremental_cache_dir=self.incremental_cache_dir, + ).run() finally: temp_dir.cleanup() # remove created modules @@ -303,24 +427,6 @@ def runtest(self) -> None: assert not os.path.exists(temp_dir.name) - def prepare_mypy_cmd_options(self, execution_path: Path) -> List[str]: - mypy_cmd_options = [ - "--show-traceback", - "--no-error-summary", - "--no-pretty", - "--hide-error-context", - ] - if not self.test_only_local_stub: - mypy_cmd_options.append("--no-silence-site-packages") - if not self.disable_cache: - mypy_cmd_options.extend(["--cache-dir", self.incremental_cache_dir]) - - config_file = self.prepare_config_file(execution_path) - if config_file: - mypy_cmd_options.append(f"--config-file={config_file}") - - return mypy_cmd_options - def prepare_config_file(self, execution_path: Path) -> Optional[str]: # Merge (`self.base_ini_fpath` or `base_pyproject_toml_fpath`) # and `self.additional_mypy_config` @@ -365,37 +471,3 @@ def reportinfo(self) -> Tuple[Union[py.path.local, Path, str], Optional[int], st path = getattr(self, "path", None) or getattr(self, "fspath") assert path return path, None, self.name - - def _collect_python_path( - self, - rootdir: Optional[Path], - execution_path: Path, - ) -> None: - python_path_parts = [] - - existing_python_path = os.environ.get("PYTHONPATH") - if existing_python_path: - python_path_parts.append(existing_python_path) - if execution_path: - python_path_parts.append(str(execution_path)) - python_path_key = self.environment_variables.get("PYTHONPATH") - if python_path_key: - python_path_parts.append(maybe_to_abspath(python_path_key, rootdir)) - python_path_parts.append(python_path_key) - - self.environment_variables["PYTHONPATH"] = ":".join(python_path_parts) - - def _collect_mypy_path(self, rootdir: Optional[Path]) -> None: - mypy_path_parts = [] - - existing_mypy_path = os.environ.get("MYPYPATH") - if existing_mypy_path: - mypy_path_parts.append(existing_mypy_path) - mypy_path_key = self.environment_variables.get("MYPYPATH") - if mypy_path_key: - mypy_path_parts.append(maybe_to_abspath(mypy_path_key, rootdir)) - mypy_path_parts.append(mypy_path_key) - if rootdir: - mypy_path_parts.append(str(rootdir)) - - self.environment_variables["MYPYPATH"] = ":".join(mypy_path_parts) diff --git a/pytest_mypy_plugins/utils.py b/pytest_mypy_plugins/utils.py index 77c6e9a..6eb3d1f 100644 --- a/pytest_mypy_plugins/utils.py +++ b/pytest_mypy_plugins/utils.py @@ -1,8 +1,6 @@ # Borrowed from Pew. # See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82 -import contextlib import inspect -import io import os import re import sys