Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 143 additions & 50 deletions codra/indirection/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,60 +43,25 @@ def analyze_file(self, file_path: str) -> list[IndirectionResult]:
class_methods = self._collect_class_methods(tree)
aliases = self._collect_aliases(tree)
module_symbols = ModuleSymbolCollector().collect(tree)
builtin_names = set(dir(builtins))
stdlib_modules = set(sys.stdlib_module_names)
unit_collector = UnitNodeCollector(file_path=file_path)
unit_collector.visit(tree)
call_map = collect_calls_by_unit(unit_collector.units)
builtin_names = self._collect_builtin_names()
stdlib_modules = self._collect_stdlib_modules()
unit_nodes, call_map = self._collect_unit_calls(file_path, tree)
alias_cache: dict[str, str] = {}
call_graph = self._collect_call_graph(
function_names, class_methods, aliases, call_map, alias_cache
)
depth_cache: dict[str, int] = {}
results: list[IndirectionResult] = []
for unit_node in unit_collector.units:
class_name = (
unit_node.definition.qualified_id.split(".", 1)[0]
if unit_node.definition.kind == "method"
else None
)
class_method_names = (
class_methods.get(class_name, set()) if class_name else set()
)
call_names = call_map.get(unit_node.definition.qualified_id, [])
resolved_calls: list[str] = []
unresolved_calls: set[str] = set()
for name in call_names:
is_self_call = name.startswith("self.")
if is_self_call:
if class_name:
method_name = name.split(".", 1)[1]
if method_name in class_method_names:
resolved_calls.append(f"{class_name}.{method_name}")
continue
resolved = self._resolve_alias(name, aliases, alias_cache)
if resolved in function_names:
resolved_calls.append(resolved)
elif resolved in module_symbols:
continue
elif resolved in builtin_names:
continue
elif resolved in stdlib_modules:
continue
else:
unresolved_calls.add(name)
id_max, id_avg = self._calculate_indirection_depths(
resolved_calls, call_graph, depth_cache
)
results.append(
IndirectionResult(
unit=unit_node.definition,
id_max=id_max,
id_avg=id_avg,
unresolved_calls=sorted(unresolved_calls),
)
)
return results
return self._build_results(
unit_nodes=unit_nodes,
call_map=call_map,
class_methods=class_methods,
aliases=aliases,
function_names=function_names,
module_symbols=module_symbols,
builtin_names=builtin_names,
stdlib_modules=stdlib_modules,
alias_cache=alias_cache,
call_graph=call_graph,
)

def _parse_file(self, file_path: str) -> ast.AST:
with open(file_path, "r", encoding="utf-8") as handle:
Expand Down Expand Up @@ -156,6 +121,134 @@ def _collect_call_graph(
call_graph[qualified_name].append(resolved)
return call_graph

def _collect_builtin_names(self) -> set[str]:
return set(dir(builtins))

def _collect_stdlib_modules(self) -> set[str]:
return set(sys.stdlib_module_names)

def _collect_unit_calls(
    self, file_path: str, tree: ast.AST
) -> tuple[list[UnitNode], dict[str, list[str]]]:
    """Collect the unit nodes of *tree* together with their call names.

    A ``UnitNodeCollector`` built for *file_path* visits *tree*; the units it
    gathered are then handed to ``collect_calls_by_unit``.

    Returns:
        A ``(units, call_map)`` pair: the collector's units and the mapping
        produced by ``collect_calls_by_unit`` for those units.
    """
    visitor = UnitNodeCollector(file_path=file_path)
    visitor.visit(tree)
    calls_by_unit = collect_calls_by_unit(visitor.units)
    return visitor.units, calls_by_unit

def _build_results(
self,
unit_nodes: list[UnitNode],
call_map: dict[str, list[str]],
class_methods: dict[str, set[str]],
aliases: dict[str, str],
function_names: set[str],
module_symbols: set[str],
builtin_names: set[str],
stdlib_modules: set[str],
alias_cache: dict[str, str],
call_graph: dict[str, list[str]],
) -> list[IndirectionResult]:
depth_cache: dict[str, int] = {}
results: list[IndirectionResult] = []
for unit_node in unit_nodes:
class_name, class_method_names = self._class_context(
unit_node, class_methods
)
call_names = call_map.get(unit_node.definition.qualified_id, [])
resolved_calls, unresolved_calls = self._resolve_unit_calls(
call_names=call_names,
class_name=class_name,
class_method_names=class_method_names,
aliases=aliases,
alias_cache=alias_cache,
function_names=function_names,
module_symbols=module_symbols,
builtin_names=builtin_names,
stdlib_modules=stdlib_modules,
)
id_max, id_avg = self._calculate_indirection_depths(
resolved_calls, call_graph, depth_cache
)
results.append(
IndirectionResult(
unit=unit_node.definition,
id_max=id_max,
id_avg=id_avg,
unresolved_calls=sorted(unresolved_calls),
)
)
return results

def _class_context(
self, unit_node: UnitNode, class_methods: dict[str, set[str]]
) -> tuple[str | None, set[str]]:
if unit_node.definition.kind != "method":
return None, set()
class_name = unit_node.definition.qualified_id.split(".", 1)[0]
return class_name, class_methods.get(class_name, set())

def _resolve_unit_calls(
self,
call_names: list[str],
class_name: str | None,
class_method_names: set[str],
aliases: dict[str, str],
alias_cache: dict[str, str],
function_names: set[str],
module_symbols: set[str],
builtin_names: set[str],
stdlib_modules: set[str],
) -> tuple[list[str], set[str]]:
resolved_calls: list[str] = []
unresolved_calls: set[str] = set()
for name in call_names:
resolved_self = self._resolve_self_call(
name, class_name, class_method_names
)
if resolved_self:
resolved_calls.append(resolved_self)
continue
if name.startswith("self."):
continue
resolved = self._resolve_alias(name, aliases, alias_cache)
if resolved in function_names:
resolved_calls.append(resolved)
continue
if self._is_ignorable_call(
resolved, module_symbols, builtin_names, stdlib_modules
):
continue
unresolved_calls.add(name)
return resolved_calls, unresolved_calls

def _resolve_self_call(
self,
name: str,
class_name: str | None,
class_method_names: set[str],
) -> str | None:
if not name.startswith("self."):
return None
if not class_name:
return None
method_name = name.split(".", 1)[1]
if method_name not in class_method_names:
return None
return f"{class_name}.{method_name}"

def _is_ignorable_call(
self,
resolved: str,
module_symbols: set[str],
builtin_names: set[str],
stdlib_modules: set[str],
) -> bool:
return (
resolved in module_symbols
or resolved in builtin_names
or resolved in stdlib_modules
)

def _resolve_alias(
self, name: str, aliases: dict[str, str], alias_cache: dict[str, str]
) -> str:
Expand Down
Loading