In [1]:
"""
Stage 3 — Build per-tag dependency graphs and metrics (Python-only).
Inputs:  data/<repo_slug>/curated/versions.csv  (from Stage 2)
Outputs: data/<repo_slug>/curated/{modules,edges,metrics}.csv (+ .parquet)
"""
from __future__ import annotations
import os, sys, re, ast, json, tempfile
from pathlib import Path
from typing import Iterable, Tuple, Dict, Set, List

import pandas as pd
import networkx as nx
from git import Repo as GitRepo
from tqdm import tqdm
from radon.complexity import cc_visit

# ------------------- CONFIG -------------------
REPO_SLUG = "fastapi"
REPO_PATH = Path("../external/fastapi")  # local checkout path
CURATED_DIR = Path(f"../data/{REPO_SLUG}/curated")
LOGS_DIR = Path(f"../data/{REPO_SLUG}/logs")
EXCLUDE_DIRS = {".git", ".venv", "venv", "env", "build", "dist", "site-packages", "docs", "doc", "examples", "scripts", "tests", "test"}
EXCLUDE_FILE_REGEX = re.compile(r".*\.(pyi|pyc)$", re.IGNORECASE)

# If you want to limit to recent tags, set a number; else None to use all in versions.csv
MAX_TAGS = 25  # e.g., 15

In [2]:
# ------------------- IO HELPERS -------------------
def atomic_write_csv(df: pd.DataFrame, path: Path):
    path.parent.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile("w", delete=False, dir=str(path.parent), suffix=".csv") as tmp:
        df.to_csv(tmp.name, index=False)
        tmp_path = Path(tmp.name)
    tmp_path.replace(path)

def atomic_write_parquet(df: pd.DataFrame, path: Path):
    try:
        import pyarrow  # noqa
    except Exception:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile("wb", delete=False, dir=str(path.parent), suffix=".parquet") as tmp:
        df.to_parquet(tmp.name, index=False)
        tmp_path = Path(tmp.name)
    tmp_path.replace(path)

def load_versions(curated_dir: Path) -> pd.DataFrame:
    df = pd.read_csv(curated_dir / "versions.csv", parse_dates=["date"])
    df = df.sort_values("date").reset_index(drop=True)
    if MAX_TAGS:
        df = df.tail(MAX_TAGS).reset_index(drop=True)
    # enforce dtypes
    df["id"] = df["id"].astype("int64")
    df["tag"] = df["tag"].astype("string")
    return df

# ------------------- FILE DISCOVERY -------------------
def iter_python_files(root: Path) -> Iterable[Path]:
    for dirpath, dirnames, filenames in os.walk(root):
        # prune excluded dirs
        base = Path(dirpath)
        dirnames[:] = [d for d in dirnames if d not in EXCLUDE_DIRS and not d.startswith(".")]
        for fn in filenames:
            if not fn.endswith(".py"):
                continue
            if EXCLUDE_FILE_REGEX.match(fn):
                continue
            p = base / fn
            yield p

def path_to_module(repo_root: Path, file_path: Path) -> str:
    rel = file_path.relative_to(repo_root).with_suffix("")  # drop .py
    parts = list(rel.parts)
    # remove leading non-package dirs if they lack __init__.py (best-effort)
    # we still form dotted path from rel
    mod = ".".join(parts)
    if mod.endswith("__init__"):
        mod = ".".join(parts[:-1])  # package module
    return mod

def discover_repo_modules(repo_root: Path) -> Tuple[Dict[str, Path], Set[str]]:
    """
    Returns:
      module_by_name: module -> path
      top_level_pkgs: set of first segments (module namespaces in repo)
    """
    module_by_name: Dict[str, Path] = {}
    for fp in iter_python_files(repo_root):
        mod = path_to_module(repo_root, fp)
        if mod:
            module_by_name[mod] = fp
    top_level = {m.split(".", 1)[0] for m in module_by_name.keys()}
    return module_by_name, top_level

# ------------------- AST IMPORT PARSING -------------------
def extract_imports_from_source(src: str, this_module: str) -> Set[str]:
    """
    Parse imports from Python source. Returns set of dotted module names (may be partial).
    """
    try:
        tree = ast.parse(src)
    except SyntaxError:
        return set()

    imports: Set[str] = set()
    pkg = this_module.rsplit(".", 1)[0] if "." in this_module else ""

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name:
                    imports.add(alias.name)  # e.g., "fastapi", "fastapi.routing"

        elif isinstance(node, ast.ImportFrom):
            # from .sub import x  | from .. import y
            base = node.module or ""
            level = getattr(node, "level", 0) or 0
            resolved = base
            if level > 0:
                # resolve relative to pkg
                base_parts = pkg.split(".") if pkg else []
                if level <= len(base_parts):
                    resolved = ".".join(base_parts[:len(base_parts) - level + (1 if base else 0)])
                    if base:
                        resolved = (resolved + "." + base) if resolved else base
                else:
                    resolved = base  # fallback
            if resolved:
                imports.add(resolved)
            # also add explicit submodules if present in names (e.g., from fastapi import routing)
            for alias in node.names:
                if alias.name and resolved:
                    imports.add(f"{resolved}.{alias.name}".rstrip("."))

    # normalize trivial artifacts like trailing dots
    imports = {i.strip(".") for i in imports if i and i.strip(".")}
    return imports

def parse_file_imports(file_path: Path, module_name: str) -> Set[str]:
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            src = f.read()
    except Exception:
        return set()
    return extract_imports_from_source(src, module_name)

# ------------------- GRAPH & METRICS -------------------
def filter_internal_imports(imports: Set[str], repo_namespaces: Set[str], module_by_name: Dict[str, Path]) -> Set[str]:
    """
    Keep only imports that refer to modules inside the repository.
    Heuristic: first segment must be in repo_namespaces, and we try to match by prefix
    to an existing module_by_name key.
    """
    internal: Set[str] = set()
    for imp in imports:
        first = imp.split(".", 1)[0]
        if first not in repo_namespaces:
            continue
        # normalize to the closest known module by prefix
        if imp in module_by_name:
            internal.add(imp)
            continue
        # find longest prefix that matches an existing module
        parts = imp.split(".")
        for k in range(len(parts), 0, -1):
            candidate = ".".join(parts[:k])
            if candidate in module_by_name:
                internal.add(candidate)
                break
    return internal

def compute_cyclomatic(file_path: Path) -> int:
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            src = f.read()
        blocks = cc_visit(src)
        return int(sum(max(1, b.complexity) for b in blocks))  # at least 1 per block
    except Exception:
        return 0

def build_graph_for_tag(repo: GitRepo, tag: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Returns (modules_df, edges_df, metrics_df) for a given tag.
    """
    # checkout
    repo.git.checkout(tag)

    repo_root = Path(repo.working_tree_dir)
    module_by_name, namespaces = discover_repo_modules(repo_root)

    # modules
    modules_rows = []
    for mod, path in module_by_name.items():
        modules_rows.append({"tag": tag, "module": mod, "path": str(path.relative_to(repo_root))})
    modules_df = pd.DataFrame(modules_rows, columns=["tag", "module", "path"])

    # edges
    edge_set: Set[Tuple[str, str]] = set()
    files = list(module_by_name.items())
    for mod, path in tqdm(files, desc=f"Imports @{tag}", leave=False):
        imports = parse_file_imports(path, mod)
        internal_imports = filter_internal_imports(imports, namespaces, module_by_name)
        for dep in internal_imports:
            if dep != mod:
                edge_set.add((mod, dep))
    edges_df = pd.DataFrame([{"tag": tag, "src_module": s, "dst_module": d} for (s, d) in sorted(edge_set)],
                            columns=["tag", "src_module", "dst_module"])

    # metrics
    G = nx.DiGraph()
    G.add_nodes_from(modules_df["module"].tolist())
    G.add_edges_from(edge_set)

    fan_out = dict(G.out_degree())
    fan_in = dict(G.in_degree())
    # use undirected degree centrality for a simple centrality signal
    centrality = nx.degree_centrality(G.to_undirected()) if G.number_of_nodes() else {}

    # per-module cyclomatic
    cyclo_by_mod = {m: compute_cyclomatic(module_by_name[m]) for m in module_by_name}

    metrics_rows = []
    for m in modules_df["module"]:
        metrics_rows.append({
            "tag": tag,
            "module": m,
            "fan_in": int(fan_in.get(m, 0)),
            "fan_out": int(fan_out.get(m, 0)),
            "cyclomatic": int(cyclo_by_mod.get(m, 0)),
            "centrality_degree": float(centrality.get(m, 0.0)),
        })
    metrics_df = pd.DataFrame(metrics_rows,
                              columns=["tag", "module", "fan_in", "fan_out", "cyclomatic", "centrality_degree"])
    return modules_df, edges_df, metrics_df


In [4]:
# ------------------- MAIN -------------------
versions = load_versions(CURATED_DIR)
if versions.empty:
    print("No versions.csv found or empty. Run Stage 2 first.")
    sys.exit(1)


# Prepare repo
repo = GitRepo(str(REPO_PATH))
original_ref = repo.head.commit.hexsha

all_modules: List[pd.DataFrame] = []
all_edges: List[pd.DataFrame] = []
all_metrics: List[pd.DataFrame] = []

In [5]:
try:
    for _, row in tqdm(versions.iterrows(), total=len(versions), desc="Stage 3 — tags"):
        tag = str(row["tag"])
        mdf, edf, xdf = build_graph_for_tag(repo, tag)
        # attach version_id
        mdf.insert(0, "version_id", int(row["id"]))
        edf.insert(0, "version_id", int(row["id"]))
        xdf.insert(0, "version_id", int(row["id"]))
        all_modules.append(mdf)
        all_edges.append(edf)
        all_metrics.append(xdf)
finally:
    # return to original state
    repo.git.checkout(original_ref)

Stage 3 — tags: 100%|██████████| 25/25 [00:22<00:00,  1.13it/s]


In [6]:
modules = pd.concat(all_modules, ignore_index=True)
edges   = pd.concat(all_edges, ignore_index=True)
metrics = pd.concat(all_metrics, ignore_index=True)

# Integrity checks
assert modules[["version_id","tag","module"]].drop_duplicates().shape[0] == modules.shape[0]
if not edges.empty:
    assert set(edges["src_module"]).issubset(set(modules["module"]))
    assert set(edges["dst_module"]).issubset(set(modules["module"]))
assert set(metrics["module"]).issubset(set(modules["module"]))


In [7]:
# Write outputs
out_modules = CURATED_DIR / "modules.csv"
out_edges   = CURATED_DIR / "edges.csv"
out_metrics = CURATED_DIR / "metrics.csv"

atomic_write_csv(modules, out_modules)
atomic_write_csv(edges, out_edges)
atomic_write_csv(metrics, out_metrics)

# Parquet sidecars (faster for Stage 4/5)
atomic_write_parquet(modules, CURATED_DIR / "modules.parquet")
atomic_write_parquet(edges,   CURATED_DIR / "edges.parquet")
atomic_write_parquet(metrics, CURATED_DIR / "metrics.parquet")

# Log
LOGS_DIR.mkdir(parents=True, exist_ok=True)
with open(LOGS_DIR / "stage3_run.json", "w") as f:
    json.dump({"repo_slug": REPO_SLUG,
                "tags_processed": versions["tag"].tolist(),
                "module_rows": int(len(modules)),
                "edge_rows": int(len(edges)),
                "metric_rows": int(len(metrics))}, f, indent=2)

print("OK ✅ Stage 3 complete")
print(f"- modules: {len(modules):,} rows → {out_modules}")
print(f"- edges:   {len(edges):,} rows → {out_edges}")
print(f"- metrics: {len(metrics):,} rows → {out_metrics}")


OK ✅ Stage 3 complete
- modules: 18,295 rows → ..\data\fastapi\curated\modules.csv
- edges:   22,101 rows → ..\data\fastapi\curated\edges.csv
- metrics: 18,295 rows → ..\data\fastapi\curated\metrics.csv
