Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 3 additions & 29 deletions src/treemapper/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ class GraphArgs:
format: str = "mermaid"
summary: bool = False
level: str = "directory"
edge_types: list[str] | None = None
mermaid: bool = False
cycles: bool = False
hotspots: int | None = None
metrics: bool = False
impact: str | None = None
blast_radius: str | None = None


@dataclass
Expand Down Expand Up @@ -235,26 +228,15 @@ def _build_graph_parser() -> argparse.ArgumentParser:
default="mermaid",
help="Graph output format (default: mermaid)",
)
graph_parser.add_argument("--summary", action="store_true", help="Print graph summary statistics")
graph_parser.add_argument(
"--summary", action="store_true", help="Print graph statistics (cycles, hotspots, coupling metrics)"
)
graph_parser.add_argument(
"--level",
choices=["fragment", "file", "directory"],
default="directory",
help="Granularity level for graph operations (default: directory)",
)
graph_parser.add_argument(
"--edge-types",
default=None,
help="Comma-separated edge types to include (e.g., semantic,config)",
)
graph_parser.add_argument("--mermaid", action="store_true", help="Output graph as Mermaid diagram")
graph_parser.add_argument("--cycles", action="store_true", help="Detect dependency cycles")
graph_parser.add_argument(
"--hotspots", type=int, nargs="?", const=10, default=None, metavar="N", help="Show top N hotspots (default: 10)"
)
graph_parser.add_argument("--metrics", action="store_true", help="Show coupling/cohesion metrics per module")
graph_parser.add_argument("--impact", default=None, metavar="FILE", help="Show impact subgraph for a file")
graph_parser.add_argument("--blast-radius", default=None, metavar="FILE", help="Estimate blast radius for a file")
return graph_parser


Expand Down Expand Up @@ -360,7 +342,6 @@ def _build_graph_parsed_args(args: argparse.Namespace) -> ParsedArgs:
ignore_file = _resolve_ignore_file(args.ignore, root_dir)
whitelist_file = _resolve_whitelist_file(args.whitelist, root_dir)
verbosity = "error" if args.quiet else args.log_level
edge_types = [t.strip() for t in args.edge_types.split(",")] if args.edge_types else None

return ParsedArgs(
root_dir=root_dir,
Expand All @@ -381,13 +362,6 @@ def _build_graph_parsed_args(args: argparse.Namespace) -> ParsedArgs:
format=args.format,
summary=args.summary,
level=args.level,
edge_types=edge_types,
mermaid=args.mermaid,
cycles=args.cycles,
hotspots=args.hotspots,
metrics=args.metrics,
impact=args.impact,
blast_radius=args.blast_radius,
),
)

Expand Down
71 changes: 5 additions & 66 deletions src/treemapper/treemapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@
def _format_cycles(g: GraphArgs, pg: Any) -> str:
from .diffctx.graph_analytics import detect_cycles

edge_filter = set(g.edge_types) if g.edge_types else {"semantic"}
cycles = detect_cycles(pg, level=g.level, edge_types=edge_filter)
cycles = detect_cycles(pg, level=g.level, edge_types={"semantic"})
if not cycles:
return "No dependency cycles detected."
lines = [f"{len(cycles)} dependency cycle(s) detected:\n"]
Expand All @@ -118,11 +117,10 @@
return "\n".join(lines)


def _format_hotspots(g: GraphArgs, pg: Any) -> str:

Check warning on line 120 in src/treemapper/treemapper.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused function parameter "g".

See more on https://sonarcloud.io/project/issues?id=nikolay-e_TreeMapper&issues=AZ1YwiByYOdke2T56jhE&open=AZ1YwiByYOdke2T56jhE&pullRequest=36
from .diffctx.graph_analytics import hotspots

edge_filter = set(g.edge_types) if g.edge_types else set(_ARCHITECTURAL_EDGE_TYPES)
hot = hotspots(pg, top=g.hotspots or 10, edge_types=edge_filter)
hot = hotspots(pg, top=10, edge_types=set(_ARCHITECTURAL_EDGE_TYPES))
lines = [f"Top {len(hot)} hotspots:"]
for rank, (name, score, details) in enumerate(hot, 1):
lines.append(f" {rank}. {name} score={score} out_degree={details['out_degree']} churn={details['churn']}")
Expand All @@ -132,8 +130,7 @@
def _format_metrics(g: GraphArgs, pg: Any) -> str:
from .diffctx.graph_analytics import coupling_metrics

edge_filter = set(g.edge_types) if g.edge_types else set(_ARCHITECTURAL_EDGE_TYPES)
metrics = coupling_metrics(pg, level=g.level, edge_types=edge_filter)
metrics = coupling_metrics(pg, level=g.level, edge_types=set(_ARCHITECTURAL_EDGE_TYPES))
lines = [f"Module metrics ({g.level} level):"]
for m in metrics:
flags = ""
Expand All @@ -148,52 +145,6 @@
return "\n".join(lines)


def _format_impact(g: GraphArgs, pg: Any) -> str:
from pathlib import Path

from .diffctx.ppr import personalized_pagerank
from .diffctx.project_graph import _relative_path

assert g.impact is not None
seed_path = Path(g.impact).resolve()
seed_fids = {fid for fid in pg.fragments if fid.path.resolve() == seed_path}
if not seed_fids:
logger.error("File '%s' not found in project graph", g.impact)
sys.exit(1)
scores = personalized_pagerank(
pg.graph, seeds=set(seed_fids), alpha=0.5, seed_weights={fid: 1.0 / len(seed_fids) for fid in seed_fids}
)
ranked = sorted(scores.items(), key=lambda x: -x[1])
lines = [f"Impact subgraph for {g.impact}:"]
seen_files: set[str] = set()
for fid, score in ranked[:30]:
rel = _relative_path(fid.path, pg.root_dir)
if rel not in seen_files:
seen_files.add(rel)
lines.append(f" {rel} relevance={score:.4f}")
return "\n".join(lines)


def _format_blast_radius(g: GraphArgs, pg: Any) -> str:
from pathlib import Path

from .diffctx.graph_analytics import blast_radius

assert g.blast_radius is not None
seed_path = Path(g.blast_radius).resolve()
result = blast_radius(pg, seed_files=[seed_path])
lines = [f"Blast radius for {g.blast_radius}:"]
for key, entries in result.items():
if key == "summary":
lines.append(f"\n Summary: {', '.join(e[0] for e in entries)}")
else:
depth_num = key.replace("depth_", "")
lines.append(f"\n Depth {depth_num}: {len(entries)} file(s)")
for name, count in entries:
lines.append(f" {name} ({count} fragment(s))")
return "\n".join(lines)


def _graph_to_string(pg: Any, fmt: str, level: str = "directory") -> str:
from .diffctx.graph_analytics import quotient_graph, to_mermaid
from .diffctx.graph_export import graph_to_graphml_string, graph_to_json_string
Expand All @@ -207,7 +158,6 @@


def _handle_graph_mode(args: ParsedArgs) -> str:
from .diffctx.graph_analytics import quotient_graph, to_mermaid
from .diffctx.graph_export import graph_summary
from .diffctx.project_graph import build_project_graph

Expand All @@ -225,22 +175,11 @@

if g.summary:
parts.append(graph_summary(pg))
if g.cycles:
parts.append(_format_cycles(g, pg))
if g.hotspots is not None:
parts.append(_format_hotspots(g, pg))
if g.metrics:
parts.append(_format_metrics(g, pg))
if g.impact:
parts.append(_format_impact(g, pg))
if g.blast_radius:
parts.append(_format_blast_radius(g, pg))
if g.mermaid:
qg = quotient_graph(pg, level=g.level)
parts.append(to_mermaid(qg))

has_analysis_flag = any([g.summary, g.cycles, g.hotspots is not None, g.metrics, g.impact, g.blast_radius, g.mermaid])
if not has_analysis_flag:

if not g.summary:
parts.append(_graph_to_string(pg, g.format, level=g.level))

return "\n".join(parts) + "\n" if parts else ""
Expand Down
65 changes: 4 additions & 61 deletions tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from tests.framework.pygit2_backend import Pygit2Repo
from treemapper.diffctx.graph_analytics import (
QuotientGraph,
blast_radius,
coupling_metrics,
detect_cycles,
hotspots,
Expand Down Expand Up @@ -383,26 +382,9 @@ def test_summary(self, graph_git_project):
out = result.stdout.lower()
assert "nodes" in out
assert "edges" in out

def test_mermaid(self, graph_git_project):
result = _run_graph_cli([".", "--mermaid", "-q"], cwd=graph_git_project)
assert result.returncode == 0
assert result.stdout.strip().startswith("graph LR")

def test_cycles(self, graph_git_project):
result = _run_graph_cli([".", "--cycles", "-q"], cwd=graph_git_project)
assert result.returncode == 0
assert "cycle" in result.stdout.lower()

def test_hotspots(self, graph_git_project):
result = _run_graph_cli([".", "--hotspots", "3", "-q"], cwd=graph_git_project)
assert result.returncode == 0
assert "hotspot" in result.stdout.lower()

def test_metrics(self, graph_git_project):
result = _run_graph_cli([".", "--metrics", "-q"], cwd=graph_git_project)
assert result.returncode == 0
assert "cohesion" in result.stdout.lower()
assert "cycle" in out
assert "hotspot" in out
assert "cohesion" in out

def test_output_file(self, graph_git_project):
out_file = graph_git_project / "graph.json"
Expand All @@ -413,7 +395,7 @@ def test_output_file(self, graph_git_project):
assert data["type"] == "project_graph"

def test_level_directory_mermaid(self, graph_git_project):
result = _run_graph_cli([".", "--mermaid", "--level", "directory", "-q"], cwd=graph_git_project)
result = _run_graph_cli([".", "-f", "mermaid", "--level", "directory", "-q"], cwd=graph_git_project)
assert result.returncode == 0
out = result.stdout
assert out.strip().startswith("graph LR")
Expand Down Expand Up @@ -558,45 +540,6 @@ def test_directory_level_names(self, graph_project):
assert any("src" in n for n in names)


class TestGraphBlastRadius:
def test_central_file_has_dependents(self, graph_project):
pg = _build_graph(graph_project)
models_path = graph_project / "src" / "models.py"
result = blast_radius(pg, seed_files=[models_path])
all_entries = []
for key in result:
if key.startswith("depth_"):
all_entries.extend(result[key])
if pg.edge_count > 0:
assert len(all_entries) > 0

def test_leaf_file_zero_radius(self, graph_project):
pg = _build_graph(graph_project)
helpers_path = graph_project / "utils" / "helpers.py"
result = blast_radius(pg, seed_files=[helpers_path])
for key in result:
if key.startswith("depth_"):
for name, count in result[key]:
assert not name.startswith("utils/helpers")

def test_summary_present(self, graph_project):
pg = _build_graph(graph_project)
models_path = graph_project / "src" / "models.py"
result = blast_radius(pg, seed_files=[models_path])
assert "summary" in result
summary_text = " ".join(e[0] for e in result["summary"])
assert "reachable_files" in summary_text
assert "reachable_fragments" in summary_text

def test_nonexistent_file_empty(self, graph_project):
pg = _build_graph(graph_project)
fake_path = graph_project / "nonexistent.py"
result = blast_radius(pg, seed_files=[fake_path])
for key in result:
if key.startswith("depth_"):
assert len(result[key]) == 0


class TestGraphExportGraphML:
def test_valid_xml(self, graph_project):
pg = _build_graph(graph_project)
Expand Down
Loading