diff --git a/Lib/_pyrepl/_module_completer.py b/Lib/_pyrepl/_module_completer.py index cf59e007f4df80..dce5b097004885 100644 --- a/Lib/_pyrepl/_module_completer.py +++ b/Lib/_pyrepl/_module_completer.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: from typing import Any, Iterable, Iterator, Mapping + from .types import CompletionAction HARDCODED_SUBMODULES = { @@ -52,11 +53,17 @@ class ModuleCompleter: def __init__(self, namespace: Mapping[str, Any] | None = None) -> None: self.namespace = namespace or {} self._global_cache: list[pkgutil.ModuleInfo] = [] + self._failed_imports: set[str] = set() self._curr_sys_path: list[str] = sys.path[:] self._stdlib_path = os.path.dirname(importlib.__path__[0]) - def get_completions(self, line: str) -> list[str] | None: - """Return the next possible import completions for 'line'.""" + def get_completions(self, line: str) -> tuple[list[str], CompletionAction | None] | None: + """Return the next possible import completions for 'line'. + + For attributes completion, if the module to complete from is not + imported, also return an action (prompt + callback to run if the + user press TAB again) to import the module. + """ result = ImportParser(line).parse() if not result: return None @@ -65,24 +72,26 @@ def get_completions(self, line: str) -> list[str] | None: except Exception: # Some unexpected error occurred, make it look like # no completions are available - return [] + return [], None - def complete(self, from_name: str | None, name: str | None) -> list[str]: + def complete(self, from_name: str | None, name: str | None) -> tuple[list[str], CompletionAction | None]: if from_name is None: # import x.y.z assert name is not None path, prefix = self.get_path_and_prefix(name) modules = self.find_modules(path, prefix) - return [self.format_completion(path, module) for module in modules] + return [self.format_completion(path, module) for module in modules], None if name is None: # from x.y.z path, prefix = self.get_path_and_prefix(from_name) modules = self.find_modules(path, prefix) - return [self.format_completion(path, module) for module in modules] + return [self.format_completion(path, module) for module in modules], None # from x.y import z - return self.find_modules(from_name, name) + submodules = self.find_modules(from_name, name) + attributes, action = self.find_attributes(from_name, name) + return sorted({*submodules, *attributes}), action def find_modules(self, path: str, prefix: str) -> list[str]: """Find all modules under 'path' that start with 'prefix'.""" @@ -129,6 +138,33 @@ def _is_stdlib_module(self, module_info: pkgutil.ModuleInfo) -> bool: return (isinstance(module_info.module_finder, FileFinder) and module_info.module_finder.path == self._stdlib_path) + def find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]: + """Find all attributes of module 'path' that start with 'prefix'.""" + attributes, action = self._find_attributes(path, prefix) + # Filter out invalid attribute names + # (for example those containing dashes that cannot be imported with 'import') + return [attr for attr in attributes if attr.isidentifier()], action + + def _find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]: + if path.startswith('.'): + # Convert relative path to absolute path + package = self.namespace.get('__package__', '') + path = self.resolve_relative_name(path, package) # type: ignore[assignment] + if path is None: + return [], None + + imported_module = sys.modules.get(path) + if not imported_module: + if path in self._failed_imports: # Do not propose to import again + return [], None + return [], self._get_import_completion_action(path) + try: + module_attributes = dir(imported_module) + except Exception: + module_attributes = [] + return [attr_name for attr_name in module_attributes + if self.is_suggestion_match(attr_name, prefix)], None + def is_suggestion_match(self, module_name: str, prefix: str) -> bool: if prefix: return module_name.startswith(prefix) @@ -200,6 +236,21 @@ def global_cache(self) -> list[pkgutil.ModuleInfo]: self._global_cache = list(pkgutil.iter_modules()) return self._global_cache + def _get_import_completion_action(self, path: str) -> CompletionAction: + prompt = ("[ module not imported, press again to import it " + "and propose attributes ]") + + def _do_import() -> str | None: + try: + importlib.import_module(path) + return None + except Exception as exc: + sys.modules.pop(path, None) # Clean half-imported module + self._failed_imports.add(path) + return f"[ error during import: {exc} ]" + + return (prompt, _do_import) + class ImportParser: """ diff --git a/Lib/_pyrepl/completing_reader.py b/Lib/_pyrepl/completing_reader.py index 9d2d43be5144e8..1a6060a745476f 100644 --- a/Lib/_pyrepl/completing_reader.py +++ b/Lib/_pyrepl/completing_reader.py @@ -29,8 +29,9 @@ # types Command = commands.Command -if False: - from .types import KeySpec, CommandName +TYPE_CHECKING = False +if TYPE_CHECKING: + from .types import KeySpec, CommandName, CompletionAction def prefix(wordlist: list[str], j: int = 0) -> str: @@ -168,15 +169,24 @@ def do(self) -> None: r: CompletingReader r = self.reader # type: ignore[assignment] last_is_completer = r.last_command_is(self.__class__) + if r.cmpltn_action: + if last_is_completer: # double-tab: execute action + msg = r.cmpltn_action[1]() + if msg: + r.msg = msg + else: # other input since last tab: cancel action + r.cmpltn_action = None + immutable_completions = r.assume_immutable_completions completions_unchangable = last_is_completer and immutable_completions stem = r.get_stem() if not completions_unchangable: - r.cmpltn_menu_choices = r.get_completions(stem) + r.cmpltn_menu_choices, r.cmpltn_action = r.get_completions(stem) completions = r.cmpltn_menu_choices if not completions: - r.error("no matches") + if not r.cmpltn_action: + r.error("no matches") elif len(completions) == 1: if completions_unchangable and len(completions[0]) == len(stem): r.msg = "[ sole completion ]" @@ -202,6 +212,16 @@ def do(self) -> None: r.msg = "[ not unique ]" r.dirty = True + if r.cmpltn_action: + if r.msg and r.cmpltn_message_visible: + # There is already a message (eg. [ not unique ]) that + # would conflict for next tab: cancel action + r.cmpltn_action = None + else: + r.msg = r.cmpltn_action[0] + r.cmpltn_message_visible = True + r.dirty = True + class self_insert(commands.self_insert): def do(self) -> None: @@ -240,6 +260,7 @@ class CompletingReader(Reader): cmpltn_message_visible: bool = field(init=False) cmpltn_menu_end: int = field(init=False) cmpltn_menu_choices: list[str] = field(init=False) + cmpltn_action: CompletionAction | None = field(init=False) def __post_init__(self) -> None: super().__post_init__() @@ -281,6 +302,7 @@ def cmpltn_reset(self) -> None: self.cmpltn_message_visible = False self.cmpltn_menu_end = 0 self.cmpltn_menu_choices = [] + self.cmpltn_action = None def get_stem(self) -> str: st = self.syntax_table @@ -291,8 +313,8 @@ def get_stem(self) -> str: p -= 1 return ''.join(b[p+1:self.pos]) - def get_completions(self, stem: str) -> list[str]: - return [] + def get_completions(self, stem: str) -> tuple[list[str], CompletionAction | None]: + return [], None def get_line(self) -> str: """Return the current line until the cursor position.""" diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py index 0ebd9162eca4bb..42979eb166f38f 100644 --- a/Lib/_pyrepl/reader.py +++ b/Lib/_pyrepl/reader.py @@ -381,9 +381,13 @@ def calc_screen(self) -> list[str]: self.screeninfo = screeninfo self.cxy = self.pos2xy() if self.msg: + width = self.console.width for mline in self.msg.split("\n"): - screen.append(mline) - screeninfo.append((0, [])) + # If self.msg is larger that console width, make it fit + # TODO: try to split between words? + for r in range((len(mline) - 1) // width + 1): + screen.append(mline[r * width : (r + 1) * width:]) + screeninfo.append((0, [])) self.last_refresh_cache.update_cache(self, screen, screeninfo) return screen diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py index 23b8fa6b9c7625..367d61873e81fb 100644 --- a/Lib/_pyrepl/readline.py +++ b/Lib/_pyrepl/readline.py @@ -55,7 +55,7 @@ # types Command = commands.Command from collections.abc import Callable, Collection -from .types import Callback, Completer, KeySpec, CommandName +from .types import Callback, Completer, KeySpec, CommandName, CompletionAction TYPE_CHECKING = False @@ -134,7 +134,7 @@ def get_stem(self) -> str: p -= 1 return "".join(b[p + 1 : self.pos]) - def get_completions(self, stem: str) -> list[str]: + def get_completions(self, stem: str) -> tuple[list[str], CompletionAction | None]: module_completions = self.get_module_completions() if module_completions is not None: return module_completions @@ -144,7 +144,7 @@ def get_completions(self, stem: str) -> list[str]: while p > 0 and b[p - 1] != "\n": p -= 1 num_spaces = 4 - ((self.pos - p) % 4) - return [" " * num_spaces] + return [" " * num_spaces], None result = [] function = self.config.readline_completer if function is not None: @@ -165,9 +165,9 @@ def get_completions(self, stem: str) -> list[str]: # emulate the behavior of the standard readline that sorts # the completions before displaying them. result.sort() - return result + return result, None - def get_module_completions(self) -> list[str] | None: + def get_module_completions(self) -> tuple[list[str], CompletionAction | None] | None: line = self.get_line() return self.config.module_completer.get_completions(line) diff --git a/Lib/_pyrepl/types.py b/Lib/_pyrepl/types.py index c5b7ebc1a406bd..e19607bf18e8b1 100644 --- a/Lib/_pyrepl/types.py +++ b/Lib/_pyrepl/types.py @@ -8,3 +8,4 @@ type Completer = Callable[[str, int], str | None] type CharBuffer = list[str] type CharWidths = list[int] +type CompletionAction = tuple[str, Callable[[], str | None]] diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index ad7464dc3d37c6..4025db4f35873e 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -957,7 +957,9 @@ def prepare_reader(self, events, namespace): reader = ReadlineAlikeReader(console=console, config=config) return reader - def test_import_completions(self): + @patch.dict(sys.modules, + {"importlib.resources": object()}) # don't propose to import it + def test_completions(self): cases = ( ("import path\t\n", "import pathlib"), ("import importlib.\t\tres\t\n", "import importlib.resources"), @@ -1005,12 +1007,13 @@ def test_private_completions(self): ModuleInfo(None, "_private", True), ], ) + @patch.dict(sys.modules, {"foo": object()}) # don't propose to import it def test_sub_module_private_completions(self): cases = ( # Return public methods by default ("from foo import \t\n", "from foo import public"), # Return private methods if explicitly specified - ("from foo import _\t\n", "from foo import _private"), + ("from foo import _p\t\n", "from foo import _private"), ) for code, expected in cases: with self.subTest(code=code): @@ -1031,12 +1034,15 @@ def test_builtin_completion_top_level(self): output = reader.readline() self.assertEqual(output, expected) - def test_relative_import_completions(self): + def test_relative_completions(self): cases = ( (None, "from .readl\t\n", "from .readl"), (None, "from . import readl\t\n", "from . import readl"), ("_pyrepl", "from .readl\t\n", "from .readline"), ("_pyrepl", "from . import readl\t\n", "from . import readline"), + ("_pyrepl", "from .readline import mul\t\n", "from .readline import multiline_input"), + ("_pyrepl", "from .. import toodeep\t\n", "from .. import toodeep"), + ("concurrent", "from .futures.i\t\n", "from .futures.interpreter"), ) for package, code, expected in cases: with self.subTest(code=code): @@ -1066,7 +1072,7 @@ def test_no_fallback_on_regular_completion(self): cases = ( ("import pri\t\n", "import pri"), ("from pri\t\n", "from pri"), - ("from typing import Na\t\n", "from typing import Na"), + ("from typong import Na\t\n", "from typong import Na"), ) for code, expected in cases: with self.subTest(code=code): @@ -1075,10 +1081,22 @@ def test_no_fallback_on_regular_completion(self): output = reader.readline() self.assertEqual(output, expected) + def test_global_cache(self): + with (tempfile.TemporaryDirectory() as _dir1, + patch.object(sys, "path", [_dir1, *sys.path])): + dir1 = pathlib.Path(_dir1) + (dir1 / "mod_aa.py").mkdir() + (dir1 / "mod_bb.py").mkdir() + events = code_to_events("import mod_a\t\nimport mod_b\t\n") + reader = self.prepare_reader(events, namespace={}) + output_1, output_2 = reader.readline(), reader.readline() + self.assertEqual(output_1, "import mod_aa") + self.assertEqual(output_2, "import mod_bb") + def test_hardcoded_stdlib_submodules(self): cases = ( ("import collections.\t\n", "import collections.abc"), - ("from os import \t\n", "from os import path"), + ("import os.\t\n", "import os.path"), ("import xml.parsers.expat.\t\te\t\n\n", "import xml.parsers.expat.errors"), ("from xml.parsers.expat import \t\tm\t\n\n", "from xml.parsers.expat import model"), ) @@ -1095,12 +1113,119 @@ def test_hardcoded_stdlib_submodules_not_proposed_if_local_import(self): (dir / "collections").mkdir() (dir / "collections" / "__init__.py").touch() (dir / "collections" / "foo.py").touch() - with patch.object(sys, "path", [dir, *sys.path]): + with patch.object(sys, "path", [_dir, *sys.path]): events = code_to_events("import collections.\t\n") reader = self.prepare_reader(events, namespace={}) output = reader.readline() self.assertEqual(output, "import collections.foo") + def test_attribute_completion_module_already_imported(self): + cases = ( + ("from collections import def\t\n", "from collections import defaultdict"), + ("from collections.abc import \tB\t\n", "from collections.abc import Buffer"), + ) + for code, expected in cases: + with self.subTest(code=code): + events = code_to_events(code) + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + self.assertEqual(output, expected) + + def test_attribute_completion_module_on_demand(self): + with tempfile.TemporaryDirectory() as _dir: + dir = pathlib.Path(_dir) + (dir / "foo.py").write_text("bar = 42") + (dir / "pack").mkdir() + (dir / "pack" / "__init__.py").write_text("attr = 42") + (dir / "pack" / "foo.py").touch() + (dir / "pack" / "bar.py").touch() + (dir / "pack" / "baz.py").touch() + with patch.object(sys, "path", [_dir, *sys.path]): + cases = ( + # needs 2 tabs to import (show prompt, then import) + ("from foo import \t\n", "from foo import ", set()), + ("from foo import \t\t\n", "from foo import bar", {"foo"}), + ("from foo import ba\t\n", "from foo import ba", set()), + ("from foo import ba\t\t\n", "from foo import bar", {"foo"}), + # reset if a character is inserted between tabs + ("from foo import \tb\ta\t\n", "from foo import ba", set()), + # packages: needs 3 tabs ([ not unique ], prompt, import) + ("from pack import \t\t\n", "from pack import ", set()), + ("from pack import \t\t\t\n", "from pack import ", {"pack"}), + ("from pack import \t\t\ta\t\n", "from pack import attr", {"pack"}), + # one match: needs 2 tabs (insert + show prompt, import) + ("from pack import f\t\n", "from pack import foo", set()), + ("from pack import f\t\t\n", "from pack import foo", {"pack"}), + # common prefix: needs 3 tabs (insert + [ not unique ], prompt, import) + ("from pack import b\t\n", "from pack import ba", set()), + ("from pack import b\t\t\n", "from pack import ba", set()), + ("from pack import b\t\t\t\n", "from pack import ba", {"pack"}), + ) + for code, expected, expected_imports in cases: + with self.subTest(code=code), patch.dict(sys.modules): + _imported = set(sys.modules.keys()) + events = code_to_events(code) + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + self.assertEqual(output, expected) + new_imports = sys.modules.keys() - _imported + self.assertEqual(new_imports, expected_imports) + + @patch.dict(sys.modules) + def test_attribute_completion_error_on_import(self): + with tempfile.TemporaryDirectory() as _dir: + dir = pathlib.Path(_dir) + (dir / "foo.py").write_text("bar = 42") + (dir / "boom.py").write_text("1 <> 2") + with patch.object(sys, "path", [_dir, *sys.path]): + cases = ( + ("from boom import \t\t\n", "from boom import "), + ("from foo import \t\t\n", "from foo import bar"), # still working + ) + for code, expected in cases: + with self.subTest(code=code): + events = code_to_events(code) + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + self.assertEqual(output, expected) + self.assertNotIn("boom", sys.modules) + + + def test_attribute_completion_error_on_attributes_access(self): + class BrokenModule: + def __dir__(self): + raise ValueError("boom") + + with (patch.dict(sys.modules, {"boom": BrokenModule()}), + patch("_pyrepl._module_completer.ModuleCompleter.iter_submodules", + lambda *_: [ModuleInfo(None, "submodule", False)])): + events = code_to_events("from boom import \t\n") + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + # ignore attributes, just propose submodule + self.assertEqual(output, "from boom import submodule") + + @patch.dict(sys.modules) + def test_attribute_completion_private_and_invalid_names(self): + with tempfile.TemporaryDirectory() as _dir: + dir = pathlib.Path(_dir) + (dir / "foo.py").write_text("_secret = 'bar'") + with patch.object(sys, "path", [_dir, *sys.path]): + mod = importlib.import_module("foo") + mod.__dict__["invalid-identifier"] = "baz" + cases = ( + ("from foo import \t\n", "from foo import "), + ("from foo import _s\t\n", "from foo import _secret"), + ("from foo import inv\t\n", "from foo import inv"), + ) + for code, expected in cases: + with self.subTest(code=code): + events = code_to_events(code) + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + self.assertEqual(output, expected) + + def test_get_path_and_prefix(self): cases = ( ('', ('', '')), @@ -1203,6 +1328,7 @@ def test_parse_error(self): 'import ..foo', 'import .foo.bar', 'import foo; x = 1', + 'import foo; 1,', 'import a.; x = 1', 'import a.b; x = 1', 'import a.b.; x = 1', @@ -1222,6 +1348,8 @@ def test_parse_error(self): 'from foo import import', 'from foo import from', 'from foo import as', + 'from \\x', # _tokenize SyntaxError -> tokenize TokenError + 'if 1:\n pass\n\tpass', # _tokenize TabError -> tokenize TabError ) for code in cases: parser = ImportParser(code) @@ -1229,8 +1357,64 @@ def test_parse_error(self): with self.subTest(code=code): self.assertEqual(actual, None) + def test_suggestions_and_messages(self) -> None: + # more unitary tests checking the exact suggestions provided + # (sorting, de-duplication, import action...) + _prompt = ("[ module not imported, press again to import it " + "and propose attributes ]") + _error = "[ error during import: division by zero ]" + with tempfile.TemporaryDirectory() as _dir: + dir = pathlib.Path(_dir) + (dir / "foo.py").write_text("bar = 42") + (dir / "boom.py").write_text("1/0") + (dir / "pack").mkdir() + (dir / "pack" / "__init__.py").write_text("foo = 1; bar = 2;") + (dir / "pack" / "bar.py").touch() + with patch.object(sys, "path", [_dir, *sys.path]): + cases = ( + # no match != not an import + ("import nope", ([], None), set()), + ("improt nope", None, set()), + # names sorting + ("import col", (["collections", "colorsys"], None), set()), + # module auto-import + ("import fo", (["foo"], None), set()), + ("from foo import ", ([], (_prompt, None)), {"foo"}), + ("from foo import ", (["bar"], None), set()), # now imported + ("from foo import ba", (["bar"], None), set()), + # error during import + ("from boom import ", ([], (_prompt, _error)), set()), + ("from boom import ", ([], None), set()), # do not retry + # packages + ("from collections import a", (["abc"], None), set()), + ("from pack import ", (["bar"], (_prompt, None)), {"pack"}), + ("from pack import ", (["bar", "foo"], None), set()), + ("from pack.bar import ", ([], (_prompt, None)), {"pack.bar"}), + ("from pack.bar import ", ([], None), set()), + ) + completer = ModuleCompleter() + for i, (code, expected, expected_imports) in enumerate(cases): + with self.subTest(code=code, i=i): + _imported = set(sys.modules.keys()) + result = completer.get_completions(code) + self.assertEqual(result is None, expected is None) + if result: + compl, act = result + self.assertEqual(compl, expected[0]) + self.assertEqual(act is None, expected[1] is None) + if act: + msg, func = act + self.assertEqual(msg, expected[1][0]) + act_result = func() + self.assertEqual(act_result, expected[1][1]) + + new_imports = sys.modules.keys() - _imported + self.assertSetEqual(new_imports, expected_imports) + for mod in new_imports: + self.addCleanup(sys.modules.pop, mod) class TestHardcodedSubmodules(TestCase): + @patch.dict(sys.modules) def test_hardcoded_stdlib_submodules_are_importable(self): for parent_path, submodules in HARDCODED_SUBMODULES.items(): for module_name in submodules: diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-11-01-01-49-52.gh-issue-140870.iknc12.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-11-01-01-49-52.gh-issue-140870.iknc12.rst new file mode 100644 index 00000000000000..aadf57622a424c --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-11-01-01-49-52.gh-issue-140870.iknc12.rst @@ -0,0 +1,2 @@ +Add support for module attributes in the :term:`REPL` auto-completion of +imports.