diff --git a/common/graph.py b/common/graph.py index 8d8fa70..1049cb5 100644 --- a/common/graph.py +++ b/common/graph.py @@ -1,12 +1,5 @@ from pathlib import Path -from typing import ( - Any, - List, - Mapping, - Optional, - Set, - Tuple, -) +from typing import Any import networkx as nx @@ -19,9 +12,8 @@ logger = logging.getLogger(__name__) -def _get_module_path(process: OdoobinProcess, module_name: str) -> Optional[Path]: - """ - Find the path of a module within the Odoo addons paths. +def _get_module_path(process: OdoobinProcess, module_name: str) -> Path | None: + """Find the path of a module within the Odoo addons paths. :param process: The OdoobinProcess instance which knows about addon paths. :param module_name: The name of the module to find. @@ -37,9 +29,8 @@ def _get_module_path(process: OdoobinProcess, module_name: str) -> Optional[Path return None -def build_dependency_tree(process: OdoobinProcess, modules: List[str], max_level: int = 1) -> nx.DiGraph: - """ - Build a dependency tree for a list of Odoo modules. +def build_dependency_tree(process: OdoobinProcess, modules: list[str], max_level: int = 1) -> nx.DiGraph: + """Build a dependency tree for a list of Odoo modules. This method parses modules from the standard Odoo repositories (odoo, enterprise, design-themes), reads their manifests to find dependencies, @@ -51,8 +42,8 @@ def build_dependency_tree(process: OdoobinProcess, modules: List[str], max_level :return: A networkx.DiGraph representing the dependency tree. 
""" graph: nx.DiGraph = nx.DiGraph() - to_process: List[Tuple[str, int]] = [(m, 0) for m in modules] - processed: Set[str] = set() + to_process: list[tuple[str, int]] = [(m, 0) for m in modules] + processed: set[str] = set() while to_process: module_name, level = to_process.pop(0) @@ -65,14 +56,14 @@ def build_dependency_tree(process: OdoobinProcess, modules: List[str], max_level if max_level is not None and level >= max_level: continue - module_path: Optional[Path] = _get_module_path(process, module_name) + module_path: Path | None = _get_module_path(process, module_name) if not module_path: logger.warning(f"Module '{module_name}' not found in standard Odoo repositories.") continue - manifest: Optional[Mapping[str, Any]] = process.read_manifest(module_path / "__manifest__.py") + manifest: dict[str, Any] | None = process.read_manifest(module_path / "__manifest__.py") if manifest and "depends" in manifest: - dependencies: List[str] = manifest.get("depends", []) + dependencies: list[str] = manifest.get("depends", []) for dependency in dependencies: graph.add_edge(dependency, module_name) if dependency not in processed: @@ -81,24 +72,23 @@ def build_dependency_tree(process: OdoobinProcess, modules: List[str], max_level return graph -def print_dependency_tree(graph: nx.DiGraph, modules: List[str]) -> None: - """ - Prints the dependency tree and installation order for a given graph. +def print_dependency_tree(graph: nx.DiGraph, modules: list[str]) -> None: + """Print the dependency tree and installation order for a given graph. :param graph: The dependency graph, as returned by `build_dependency_tree`. :param modules: The list of initial modules to highlight in the output. 
""" console.print(string.stylize(f"\nDependency Tree for: {', '.join(modules)}\n", "bold underline")) - sorted_modules: List[str] = sorted(graph.nodes()) + sorted_modules: list[str] = sorted(graph.nodes()) for module in sorted_modules: - dependencies: List[str] = sorted(graph.predecessors(module)) + dependencies: list[str] = sorted(graph.predecessors(module)) if dependencies: console.print(f" {string.stylize(module, 'bold cyan')} -> {', '.join(dependencies)}") try: - installation_order: List[str] = list(nx.topological_sort(graph)) + installation_order: list[str] = list(nx.topological_sort(graph)) console.print(f"\n{string.stylize('Installation Order (Topological Sort):', 'bold underline')}\n") for module in installation_order: console.print(f" - {module}") diff --git a/common/llm.py b/common/llm.py index 2e139fd..0fdbc84 100644 --- a/common/llm.py +++ b/common/llm.py @@ -1,5 +1,3 @@ -from typing import Dict, List, Optional - import litellm from litellm import InternalServerError, ModelResponse, token_counter @@ -13,8 +11,8 @@ # A mapping of provider names to a list of model names to try in order of preference. # Note: These may be custom model names/aliases specific to a litellm proxy setup. -LLM_LIST: Dict[str, List[str]] = { - "Gemini": ["gemini/gemini-2.5-pro", "gemini/gemini-2.5-flash"], +LLM_LIST: dict[str, list[str]] = { + "Gemini": ["gemini/gemini-3-pro-preview", "gemini/gemini-2.5-pro", "gemini/gemini-2.5-flash"], "ChatGPT": ["chatgpt/gpt-5", "chatgpt/gpt-4.5"], "Claude": ["claude/claude-4", "claude/claude-3.7"], "Grok": ["grok/grok-4", "grok/grok-3"], @@ -26,11 +24,10 @@ class LLM: provider: str api_key: str - model: Optional[str] = None + model: str | None = None - def __init__(self, model_identifier: Optional[str] = None, api_key: Optional[str] = None): - """ - Initializes the LLM client. + def __init__(self, model_identifier: str | None = None, api_key: str | None = None): + """Initialize the LLM client. 
:param model_identifier: The name of the LLM provider (e.g., "Gemini", "ChatGPT") or a specific model identifier (e.g., "gemini/gemini-1.5-pro"). If a provider is given, it must @@ -51,9 +48,8 @@ def __init__(self, model_identifier: Optional[str] = None, api_key: Optional[str self.api_key = api_key - def completion(self, messages: List[Dict[str, str]], response_format: Optional[type] = None) -> Optional[str]: - """ - Sends a completion request to the configured LLM and expects a structured response. + def completion(self, messages: list[dict[str, str]], response_format: type | None = None) -> str | None: + """Send a completion request to the configured LLM and expect a structured response. If a specific model was provided during initialization, it will use that model. Otherwise, it iterates through a list of models for the configured provider, @@ -62,7 +58,7 @@ def completion(self, messages: List[Dict[str, str]], response_format: Optional[t :param messages: A list of messages forming the conversation history for the prompt. :return: The string content of the response, or `None` if all attempts fail. 
""" - model_list: List[str] = [self.model] if self.model else LLM_LIST.get(self.provider, []) + model_list: list[str] = [self.model] if self.model else LLM_LIST.get(self.provider, []) if not model_list: logger.error(f"No models are configured for the provider '{self.provider}'.") return None @@ -73,13 +69,21 @@ def completion(self, messages: List[Dict[str, str]], response_format: Optional[t logger.debug(f"Attempting completion with model: {model_name}") litellm.suppress_debug_info = True - response: ModelResponse = litellm.completion( # type: ignore + response: ModelResponse = litellm.completion( model=model_name, messages=messages, api_key=self.api_key, response_format=response_format, verbose=False, ) + except InternalServerError as e: + logger.warning(f"Model '{model_name}' failed with an internal server error: {e}") + # Continue to the next model in the list + except Exception as e: # noqa: BLE001 + # Catch other potential exceptions from litellm (e.g., validation, connection errors) + logger.error(f"An unexpected error occurred with model '{model_name}': {e}") + # Continue to the next model in the list + else: logger.info(f"Successfully received a response from {model_name}.") if response.usage: @@ -90,18 +94,9 @@ def completion(self, messages: List[Dict[str, str]], response_format: Optional[t ) # Callers expect the string content of the message. 
- if response.choices and response.choices[0].message.content: # type: ignore - return response.choices[0].message.content # type: ignore + if response.choices and response.choices[0].message.content: + return response.choices[0].message.content return None - except InternalServerError as e: - logger.warning(f"Model '{model_name}' failed with an internal server error: {e}") - # Continue to the next model in the list - except Exception as e: - # Catch other potential exceptions from litellm (e.g., validation, connection errors) - logger.error(f"An unexpected error occurred with model '{model_name}': {e}") - # Continue to the next model in the list - logger.error( - f"All configured models for provider '{self.provider}' failed. " f"Attempted: {', '.join(model_list)}" - ) + logger.error(f"All configured models for provider '{self.provider}' failed. Attempted: {', '.join(model_list)}") return None diff --git a/common/odoo_context.py b/common/odoo_context.py index 986b678..5b91e9f 100644 --- a/common/odoo_context.py +++ b/common/odoo_context.py @@ -1,16 +1,10 @@ import re import xml.etree.ElementTree as ET from pathlib import Path -from typing import ( - Any, - Dict, - List, - Optional, - Set, - Tuple, -) +from typing import Any import networkx as nx +from defusedxml import ElementTree as SafeET from odev.common.logging import logging from odev.common.odoobin import OdoobinProcess @@ -22,21 +16,17 @@ class OdooContext: - """ - Gathers and holds the context of an Odoo project based on a list of modules - and a development analysis. It inspects module files to retrieve relevant - source code for models, views, controllers, etc. + """Gathers and holds the context of an Odoo project based on a list of modules and a development analysis. + + It inspects module files to retrieve relevant source code for models, views, controllers, etc. """ def __init__(self, process: OdoobinProcess) -> None: - """ - :param process: The OdoobinProcess instance. 
- """ + """Initialize the OdooContext with the given process.""" self.process: OdoobinProcess = process def gather_po_context(self, po_content: str) -> dict: - """ - Gathers context from files referenced in PO file content. + """Gathers context from files referenced in PO file content. It looks for lines like `#: code:path/to/file:line_number`, extracts the unique file paths, reads their content, and returns @@ -71,13 +61,13 @@ def gather_po_context(self, po_content: str) -> dict: return context_files def _build_dependency_info( - self, initial_depends: List[str], dependency_level: int - ) -> Tuple[List[str], Dict[str, Path]]: - """Builds dependency graph, sorts modules topologically, and finds their paths.""" + self, initial_depends: list[str], dependency_level: int + ) -> tuple[list[str], dict[str, Path]]: + """Build dependency graph, sort modules topologically, and find their paths.""" logger.info("Building dependency tree for Odoo context...") dependency_graph = graph.build_dependency_tree(self.process, initial_depends, max_level=dependency_level) - sorted_modules: List[str] = [] - module_paths: Dict[str, Path] = {} + sorted_modules: list[str] = [] + module_paths: dict[str, Path] = {} try: if dependency_graph: sorted_modules = list(nx.topological_sort(dependency_graph)) @@ -94,20 +84,24 @@ def _build_dependency_info( return sorted_modules, module_paths def gather_context( - self, depends: Optional[List[str]] = None, analysis: Optional[Dict[str, Any]] = None, dependency_level: int = 0 - ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: - """Iterates over modules and gathers context based on the analysis.""" + self, + depends: list[str] | None = None, + analysis: dict[str, Any] | None = None, + override_module_name: str = "", + dependency_level: int = 0, + ) -> dict[str, dict[str, list[dict[str, Any]]]]: + """Iterate over modules and gather context based on the analysis.""" if analysis is None: analysis = {} - sorted_modules: List[str] = [] - module_paths: 
Dict[str, Path] = {} + sorted_modules: list[str] = [] + module_paths: dict[str, Path] = {} if depends: sorted_modules, module_paths = self._build_dependency_info(depends, dependency_level) logger.info(f"Gathering Odoo context from modules: {', '.join(sorted_modules)}") - context: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} + context: dict[str, dict[str, list[dict[str, Any]]]] = {} for module_name in sorted_modules: if module_name in ["base", "web", "mail", "utm"]: @@ -130,7 +124,7 @@ def gather_context( } self._gather_manifest(context, module_name, module_path) - self._gather_models(context, module_name, module_path, analysis) + self._gather_models(context, module_name, module_path, analysis, override_module_name) self._gather_views(context, module_name, module_path, analysis) self._gather_controllers(context, module_name, module_path, analysis) self._gather_assets(context, module_name, module_path, analysis) @@ -139,6 +133,13 @@ def gather_context( self._gather_website_templates(context, module_name, module_path, analysis) self._gather_data(context, module_name, module_path) + if logger.isEnabledFor(logging.DEBUG): + self._log_context_summary(context) + + return context + + def _log_context_summary(self, context: dict) -> None: + """Log a summary of the gathered context.""" summary_message = "Odoo Context Summary:\n" grand_total_items = grand_total_lines = grand_total_chars = 0 @@ -149,8 +150,8 @@ def gather_context( for category, items in sorted(categories.items()): if items: num_items = len(items) - num_lines = sum(len(item.get("content", "").splitlines()) for item in items) - num_chars = sum(len(item.get("content", "")) for item in items) + num_lines = sum(len(item.get("content", "")) for item in items) + num_chars = sum(len("\n".join(item.get("content", ""))) for item in items) grand_total_items += num_items grand_total_lines += num_lines grand_total_chars += num_chars @@ -162,8 +163,6 @@ def gather_context( ) logger.debug(summary_message) - return context 
- def _gather_manifest(self, context: dict, module_name: str, module_path: Path) -> None: """Gathers manifest file content.""" manifest_path = module_path / "__manifest__.py" @@ -171,23 +170,30 @@ def _gather_manifest(self, context: dict, module_name: str, module_path: Path) - context[module_name]["manifests"].append( { "path": str(manifest_path), - "content": manifest_path.read_text(), + "content": manifest_path.read_text().split("\n"), } ) - def _gather_models(self, context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any]) -> None: + def _gather_models( + self, + context: dict, + module_name: str, + module_path: Path, + analysis: dict[str, Any], + override_module_name: str = "", + ) -> None: """Gathers model files if they are loaded in __init__.py and present in the analysis.""" - models_dir = module_path / "models" init_py = models_dir / "__init__.py" - analysis_models: Set[str] = {m["name"] for m in analysis.get("models", [])} + analysis_models: set[str] = {m["name"] for m in analysis.get("models", [])} - if not models_dir.is_dir() or not init_py.exists() or not analysis_models: + if not models_dir.is_dir() or not init_py.exists(): return try: init_content = init_py.read_text() - loaded_py_files: List[Path] = [] + context[module_name]["models"].append({"path": str(init_py), "content": init_content.split("\n")}) + loaded_py_files: list[Path] = [] for line in init_content.splitlines(): match = re.match(r"^\s*from\s+\.\s+import\s+([\w, ]+)", line) if match: @@ -197,65 +203,84 @@ def _gather_models(self, context: dict, module_name: str, module_path: Path, ana loaded_py_files.append(py_file) for py_file in loaded_py_files: - content_lines = py_file.read_text().splitlines() - i = 0 - while i < len(content_lines): - line = content_lines[i] - class_match = re.match(r"^(\s*)class\s+.*:", line) - if not class_match: - i += 1 - continue - - class_indentation = len(class_match.group(1)) - class_block_lines: List[str] = [line] - - j = i + 1 - while j < 
len(content_lines): - next_line = content_lines[j] - if next_line.strip() and (len(next_line) - len(next_line.lstrip(" "))) <= class_indentation: - break - class_block_lines.append(next_line) - j += 1 - - class_content = "\n".join(class_block_lines) - declared: List[str] = re.findall(r"_name\s*=\s*['\"]([^'\"]+)['\"]", class_content) - inherited: List[str] = re.findall(r"_inherit\s*=\s*['\"]([^'\"]+)['\"]", class_content) - inherited_list_str = re.search(r"_inherit\s*=\s*\[([^\]]+)\]", class_content) - - if inherited_list_str: - inherited.extend(re.findall(r"['\"]([^'\"]+)['\"]", inherited_list_str.group(1))) - - if analysis_models.intersection(set(declared + inherited)): - context[module_name]["models"].append({"path": str(py_file), "content": class_content}) - i = j - except Exception as e: + self._process_model_file(context, module_name, py_file, analysis_models, override_module_name) + except Exception as e: # noqa: BLE001 logger.warning(f"Could not process models for {module_name}: {e}") - def _gather_views(self, context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any]) -> None: + def _process_model_file( + self, + context: dict, + module_name: str, + py_file: Path, + analysis_models: set[str], + override_module_name: str, + ) -> None: + """Process a single model file to extract relevant classes.""" + file_content = py_file.read_text() + content_lines = file_content.splitlines() + i = 0 + while i < len(content_lines): + line = content_lines[i] + class_match = re.match(r"^(\s*)class\s+.*:", line) + if not class_match: + i += 1 + continue + + class_indentation = len(class_match.group(1)) + class_block_lines: list[str] = [line] + + j = i + 1 + while j < len(content_lines): + next_line = content_lines[j] + if next_line.strip() and (len(next_line) - len(next_line.lstrip(" "))) <= class_indentation: + break + class_block_lines.append(next_line) + j += 1 + + class_content = "\n".join(class_block_lines) + declared: list[str] = 
re.findall(r"_name\s*=\s*['\"]([^'\"]+)['\"]", class_content) + inherited: list[str] = re.findall(r"_inherit\s*=\s*['\"]([^'\"]+)['\"]", class_content) + inherited_list_str = re.search(r"_inherit\s*=\s*\[([^\]]+)\]", class_content) + + if inherited_list_str: + inherited.extend(re.findall(r"['\"]([^'\"]+)['\"]", inherited_list_str.group(1))) + + if analysis_models.intersection(set(declared + inherited)) or ( + not analysis_models and module_name == override_module_name + ): + if module_name == override_module_name: + context[module_name]["models"].append( + {"path": str(py_file), "content": file_content.split("\n")} + ) + return + context[module_name]["models"].append({"path": str(py_file), "content": class_content.split("\n")}) + i = j + + def _gather_views(self, context: dict, module_name: str, module_path: Path, analysis: dict[str, Any]) -> None: """Gathers view files based on models in the analysis.""" - analysis_views: List[Dict[str, Any]] = analysis.get("views", []) + analysis_views: list[dict[str, Any]] = analysis.get("views", []) if not analysis_views: return - analysis_view_models: Set[str] = {v["model"] for v in analysis_views if "model" in v} + analysis_view_models: set[str] = {v["model"] for v in analysis_views if "model" in v} for xml_file in module_path.rglob("*.xml"): try: content = xml_file.read_text() - tree = ET.fromstring(content) + tree = SafeET.fromstring(content) for record in tree.findall(".//record[@model='ir.ui.view']"): model_field = record.find("./field[@name='model']") if model_field is not None and model_field.text in analysis_view_models: - context[module_name]["views"].append({"path": str(xml_file), "content": content}) + context[module_name]["views"].append({"path": str(xml_file), "content": content.split("\n")}) break except (ET.ParseError, FileNotFoundError): continue - def _gather_controllers(self, context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any]) -> None: + def _gather_controllers(self, context: dict, module_name: str, module_path: Path, analysis: dict[str, Any]) -> None: """Gathers 
controller files by matching routes from the analysis.""" - analysis_controllers: List[Dict[str, Any]] = analysis.get("controller", []) + analysis_controllers: list[dict[str, Any]] = analysis.get("controller", []) if not analysis_controllers: return - analysis_routes: Set[str] = {c["action_name"] for c in analysis_controllers if "action_name" in c} + analysis_routes: set[str] = {c["action_name"] for c in analysis_controllers if "action_name" in c} controllers_dir = module_path / "controllers" if not controllers_dir.is_dir(): @@ -263,18 +288,18 @@ def _gather_controllers(self, context: dict, module_name: str, module_path: Path for py_file in controllers_dir.rglob("*.py"): content = py_file.read_text() - routes_in_file: List[str] = re.findall(r"@http\.route\(\s*['\"]([^'\"]+)['\"]", content) - routes_in_file_list: List[str] = re.findall(r"@http\.route\(\s*\[([^\]]+)\]", content) + routes_in_file: list[str] = re.findall(r"@http\.route\(\s*['\"]([^'\"]+)['\"]", content) + routes_in_file_list: list[str] = re.findall(r"@http\.route\(\s*\[([^\]]+)\]", content) for route_list in routes_in_file_list: routes_in_file.extend(re.findall(r"['\"]([^'\"]+)['\"]", route_list)) if analysis_routes.intersection(routes_in_file): - context[module_name]["controllers"].append({"path": str(py_file), "content": content}) + context[module_name]["controllers"].append({"path": str(py_file), "content": content.split("\n")}) - def _gather_assets(self, context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any]) -> None: + def _gather_assets(self, context: dict, module_name: str, module_path: Path, analysis: dict[str, Any]) -> None: """Gathers asset files by matching paths from the analysis.""" for asset in analysis.get("assets", []): - file_path_str: Optional[str] = asset.get("file_path") + file_path_str: str | None = asset.get("file_path") if not file_path_str: continue @@ -293,7 +318,7 @@ def _gather_assets(self, context: dict, module_name: str, module_path: Path, ana # 
Fallback to search by filename filename = Path(file_path_str).name for f in module_path.rglob(filename): - context[module_name]["assets"].append({"path": str(f), "content": f.read_text()}) + context[module_name]["assets"].append({"path": str(f), "content": f.read_text().split("\n")}) break def _gather_security(self, context: dict, module_name: str, module_path: Path) -> None: @@ -302,42 +327,44 @@ def _gather_security(self, context: dict, module_name: str, module_path: Path) - if security_dir.is_dir(): for sec_file in security_dir.iterdir(): if sec_file.is_file() and (sec_file.name.endswith(".csv") or sec_file.name.endswith(".xml")): - context[module_name]["security"].append({"path": str(sec_file), "content": sec_file.read_text()}) + context[module_name]["security"].append( + {"path": str(sec_file), "content": sec_file.read_text().split("\n")} + ) - def _gather_reports(self, context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any]) -> None: + def _gather_reports(self, context: dict, module_name: str, module_path: Path, analysis: dict[str, Any]) -> None: """Gathers report definition files based on models in the analysis.""" - report_models: Set[str] = {r["model"] for r in analysis.get("reports", []) if "model" in r} + report_models: set[str] = {r["model"] for r in analysis.get("reports", []) if "model" in r} if not report_models: return for xml_file in module_path.rglob("*.xml"): try: content = xml_file.read_text() - tree = ET.fromstring(content) + tree = SafeET.fromstring(content) for record in tree.findall(".//record[@model='ir.actions.report']"): model_field = record.find("./field[@name='model']") if model_field is not None and model_field.text in report_models: - context[module_name]["reports"].append({"path": str(xml_file), "content": content}) + context[module_name]["reports"].append({"path": str(xml_file), "content": content.split("\n")}) break except (ET.ParseError, FileNotFoundError): continue def _gather_website_templates( - self, 
context: dict, module_name: str, module_path: Path, analysis: Dict[str, Any] + self, context: dict, module_name: str, module_path: Path, analysis: dict[str, Any] ) -> None: """Gathers website template files by matching template IDs from the analysis.""" - view_ids: Set[str] = {v["view"] for v in analysis.get("website_views", []) if "view" in v} + view_ids: set[str] = {v["view"] for v in analysis.get("website_views", []) if "view" in v} if not view_ids: return for xml_file in module_path.rglob("*.xml"): try: content = xml_file.read_text() - tree = ET.fromstring(content) + tree = SafeET.fromstring(content) for template in tree.findall(".//template"): template_id = template.get("id") if template_id and template_id in view_ids: - context[module_name]["website"].append({"path": str(xml_file), "content": content}) + context[module_name]["website"].append({"path": str(xml_file), "content": content.split("\n")}) break except (ET.ParseError, FileNotFoundError): continue @@ -348,4 +375,6 @@ def _gather_data(self, context: dict, module_name: str, module_path: Path) -> No if data_dir.is_dir(): for data_file in data_dir.iterdir(): if data_file.is_file() and (data_file.name.endswith(".csv") or data_file.name.endswith(".xml")): - context[module_name]["data"].append({"path": str(data_file), "content": data_file.read_text()}) + context[module_name]["data"].append( + {"path": str(data_file), "content": data_file.read_text().split("\n")} + )