In [None]:
import os
import re
import time
import math
import json
import hashlib
import requests
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from tqdm import tqdm
from git import Repo
import concurrent.futures
import threading

In [None]:
# GitHub credentials come from the environment. "your_token" is only a placeholder:
# sending it would make GitHub reject requests with "Bad credentials", so the
# Authorization header is attached only when a real token has been configured.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "your_token")
HEADERS = (
    {"Authorization": f"token {GITHUB_TOKEN}"}
    if GITHUB_TOKEN and GITHUB_TOKEN != "your_token"
    else {}
)
# Serialises console output produced by the worker threads.
PRINT_LOCK = threading.Lock()

# Root folder for every artefact written by the analysis (created eagerly).
BASE_OUTPUT_DIR = "kmp_analysis"
os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)

In [None]:
def _rate_limit_wait_seconds(resp, fallback=3600):
    """Return how long to wait before retrying a rate-limited GitHub response.

    Prefers the `Retry-After` header, then `X-RateLimit-Reset` (epoch seconds)
    when `X-RateLimit-Remaining` is "0"; otherwise falls back to `fallback`.
    """
    headers = getattr(resp, "headers", None) or {}

    retry_after = headers.get("Retry-After")
    if retry_after is not None and str(retry_after).isdigit():
        return int(retry_after)

    reset_at = headers.get("X-RateLimit-Reset")
    if (
        headers.get("X-RateLimit-Remaining") == "0"
        and reset_at is not None
        and str(reset_at).isdigit()
    ):
        # +1 so the retry lands strictly after the reset instant.
        return max(int(reset_at) - int(time.time()), 0) + 1

    return fallback


def list_kotlin_files_recursive(owner, repo, path=""):
    """Recursively list all Kotlin (.kt) files in a GitHub repository.

    Args:
        owner: Repository owner (user or organisation login).
        repo: Repository name.
        path: Path inside the repository to start from ("" means the root).

    Returns:
        A list of repository-relative paths of every `.kt` file found under
        `path`. An empty list is returned when the listing cannot be fetched.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
    # A timeout keeps a stalled connection from blocking a worker thread forever.
    resp = requests.get(url, headers=HEADERS, timeout=30)

    if resp.status_code in (403, 429):
        wait_seconds = _rate_limit_wait_seconds(resp)
        print(f"Rate limit hit, waiting {wait_seconds} seconds...")
        time.sleep(wait_seconds)
        resp = requests.get(url, headers=HEADERS, timeout=30)

    if resp.status_code != 200:
        print(f"Failed to access {url}: {resp.status_code}")
        return []

    payload = resp.json()
    if isinstance(payload, dict):
        # The contents API returns a single object (not a list) when `path`
        # points at a file rather than a directory.
        payload = [payload]
    elif not isinstance(payload, list):
        return []

    files = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if item_type == "file" and item.get("name", "").endswith(".kt"):
            files.append(item["path"])
        elif item_type == "dir":
            files.extend(list_kotlin_files_recursive(owner, repo, item["path"]))
    return files

In [None]:
def extract_classes(content):
    """Return the declared class, interface and object names found in Kotlin source.

    Names are returned in order of appearance; duplicates are kept.
    """
    declaration = re.compile(
        r"(?:public|internal|private|open|abstract|sealed)?\s*(?:data\s+)?(class|interface|object|sealed class|abstract class)\s+(\w+)"
    )
    # Group 1 is the declaration keyword, group 2 the declared identifier.
    return [found.group(2) for found in declaration.finditer(content)]

In [None]:
def get_file_role(file_name, file_content, file_path=""):
    """Classify a Kotlin file into an architectural role.

    The file name, path and content are lower-cased and matched against an
    ordered list of keyword heuristics; the first rule that matches wins.

    Args:
        file_name: Base name of the file.
        file_content: Full source text of the file.
        file_path: Repository-relative path of the file.

    Returns:
        One of "Tests", "ViewModel", "View", "State", "UseCase", "Interactor",
        "Repository", "Model", "Presenter", "Router", "DI", "Utility",
        "Intent" or "Unknown".
    """
    name = file_name.lower()
    path = file_path.lower()
    body = file_content.lower()

    def name_has(*needles):
        return any(needle in name for needle in needles)

    def path_has(*needles):
        return any(needle in path for needle in needles)

    def body_has(*needles):
        return any(needle in body for needle in needles)

    def declares(pattern):
        return re.search(pattern, body) is not None

    # Order matters: earlier rules take precedence over later ones.
    rules = (
        ("Tests", lambda: path_has("test", "unittest", "iostest")),
        ("ViewModel", lambda: name_has("viewmodel") or declares(r"class\s+\w+viewmodel")),
        ("ViewModel", lambda: path_has("viewmodels") and not name_has("state")),
        ("View", lambda: name_has("activity", "fragment", "screen", "view")
            or body_has("@composable", "setcontent", "androidx.compose")
            or path_has("ui.")),
        ("State", lambda: name_has("state", "ui_state", "ui_event")
            or declares(r"data\s+class\s+\w+state")),
        ("UseCase", lambda: name_has("usecase", "use_case", "contract")),
        ("Interactor", lambda: name_has("interactor")
            or path_has("interactor")
            or declares(r"class\s+\w+interactor")),
        ("Interactor", lambda: name_has("mapper", "controller", "manager")),
        ("Repository", lambda: name_has("repository", "dao", "service", "datastore", "database")),
        ("Repository", lambda: body_has("retrofit", "room.database", "insert(", "query", "suspend fun get")),
        # Model-looking files that live under UI/feature/view paths are not Models.
        ("Model", lambda: (name_has("model", "entity", "dto") or body_has("data class"))
            and not path_has("ui", "feature", "view")),
        ("Presenter", lambda: name_has("presenter") or declares(r"class\s+\w+presenter")),
        ("Router", lambda: name_has("router", "navigator", "navcontroller", "navigation")
            or body_has("navcontroller", "navigation")
            or path_has("decompose")
            or name_has("component")),
        ("DI", lambda: name_has("module", "provider", "factory") or body_has("koin")),
        ("Utility", lambda: name_has("util", "helper", "extensions", "logger", "eventbus")),
        ("Intent", lambda: name_has("intent") or declares(r"sealed\s+class\s+\w+intent")),
        # Last resort: anything stored under a "model" path.
        ("Model", lambda: path_has("model")),
    )

    for role, applies in rules:
        if applies():
            return role
    return "Unknown"



In [None]:
def build_dependency_graph(repo_files):
    """Build a dependency graph from Kotlin files.

    Every declared class/interface/object becomes a node (a file without any
    declaration contributes one node named after the file). An edge A -> B is
    added when the source file that declares A mentions B as a whole word.

    Args:
        repo_files: Mapping of repository-relative path -> file content.

    Returns:
        A tuple `(G, file_data)` where `G` is an `nx.DiGraph` whose nodes carry
        `role` and `path` attributes, and `file_data` is a list with one
        `{"file": path, "role": role}` entry per node added.
    """
    G = nx.DiGraph()
    file_data = []

    # Nodes: extract the declared names once per file and reuse them for edges.
    names_by_file = {}
    for file_path, content in repo_files.items():
        base_name = os.path.basename(file_path)
        role = get_file_role(base_name, content, file_path)
        names = extract_classes(content) or [base_name.replace(".kt", "")]
        names_by_file[file_path] = names
        for name in names:
            G.add_node(name, role=role, path=file_path)
            file_data.append({"file": file_path, "role": role})

    # One compiled pattern per distinct name. Names are escaped because the
    # file-name fallback may contain regex metacharacters (e.g. "C++Interop").
    patterns = {}
    for names in names_by_file.values():
        for name in names:
            if name not in patterns:
                patterns[name] = re.compile(rf"\b{re.escape(name)}\b")

    # Edges
    for src_path, src_content in repo_files.items():
        src_names = names_by_file[src_path]
        mentioned = {}  # dst_name -> bool, memoised for this source file
        for dst_path in repo_files:
            dst_names = names_by_file[dst_path]
            for src_name in src_names:
                for dst_name in dst_names:
                    if src_name == dst_name:
                        continue
                    if dst_name not in mentioned:
                        mentioned[dst_name] = patterns[dst_name].search(src_content) is not None
                    if mentioned[dst_name]:
                        G.add_edge(src_name, dst_name)
    return G, file_data

In [None]:
def infer_architecture(file_data, G, repo_files):
    """Guess the architectural pattern of a project from its dependency graph.

    Checks are tried in this order and the first hit is returned: VIPER,
    Clean, Decompose-flavoured MVVM, MVVM, MVI, MVP, then the looser
    MVVM / MVC fallbacks.

    Args:
        file_data: Per-node file/role records (not used by the detection).
        G: Directed graph whose nodes carry a `role` attribute.
        repo_files: Iterable of repository-relative file paths (dict keys).

    Returns:
        A `(pattern_name, reason)` tuple; `("Unknown", ...)` when nothing matches.
    """
    aliases = {
        "Service": "Repository",
        "Mapper": "Model",
        "State": "State",
        "DI": "Repository",
        "Utility": "Unknown",
        "Intent": "Intent"
    }

    # Normalised role of every node.
    role_of = {}
    for node, attrs in G.nodes(data=True):
        raw_role = attrs.get("role")
        role_of[node] = aliases.get(raw_role, raw_role)
    present_roles = set(role_of.values())

    def has_chain(head_role, middle_roles, tail_roles):
        """True when some head -> middle -> tail path of length two exists."""
        return any(
            role_of.get(tail) in tail_roles
            for head, role in role_of.items() if role == head_role
            for middle in G.successors(head) if role_of.get(middle) in middle_roles
            for tail in G.successors(middle)
        )

    # VIPER: all four roles merely have to be present.
    if {"View", "Presenter", "Interactor", "Router"} <= present_roles:
        return "VIPER", "Detected View–Presenter–Interactor–Router pattern."

    # Clean Architecture
    if has_chain("UseCase", ("Repository",), ("ViewModel",)):
        return "Clean", "Detected UseCase → Repository → ViewModel chain."

    # MVVM (Decompose): path-based detection, independent of the graph.
    lowered_paths = [file_path.lower() for file_path in repo_files]
    decompose_paths = [p for p in lowered_paths if "decompose" in p]
    if decompose_paths:
        component_found = any(
            p.endswith(("component.kt", "componentimpl.kt")) for p in decompose_paths
        )
        view_found = any("ui/" in p or "screen" in p or "view" in p for p in lowered_paths)
        repository_found = any(
            "repository" in p or "api" in p or "datasource" in p for p in lowered_paths
        )
        if component_found and view_found and repository_found:
            return "MVVM", (
                "Detected Decompose components mediating between View and Repository layers, "
                "acting as ViewModel-like structures in an MVVM architecture."
            )

    # MVVM
    if has_chain("View", ("ViewModel",), ("Repository", "Model")):
        return "MVVM", "Detected View → ViewModel → Repository/Model chain."

    # MVI
    if has_chain("View", ("ViewModel", "StateHolder", "State"), ("State", "Model")):
        return "MVI", "Detected View → ViewModel/StateHolder → State/Model chain."

    # MVP
    if has_chain("View", ("Presenter",), ("Model",)):
        return "MVP", "Detected View → Presenter → Model chain."

    # Fallbacks: role co-occurrence without any connecting edges.
    if "View" in present_roles:
        if "ViewModel" in present_roles:
            return "MVVM", "Detected both View and ViewModel layers (implicit MVVM structure)."
        if "Model" in present_roles:
            return "MVC", "Detected View–Model layers only."

    return "Unknown", "No consistent architectural pattern found."

In [None]:
def get_default_branch(owner, repo):
    """Look up the repository's default branch via the GitHub API.

    Returns "main" when the request does not succeed or the response carries
    no `default_branch` field.
    """
    response = requests.get(f"https://api.github.com/repos/{owner}/{repo}", headers=HEADERS)
    if response.status_code != 200:
        return "main"
    return response.json().get("default_branch", "main")

In [None]:
def visualize_graph(G, save_path, title="Dependency Graph"):
    """Render the dependency graph as a layered PNG, one layer per role.

    Only nodes whose `role` has an entry in the colour table are drawn. Nodes
    are laid out in rows of at most ten, grouped by role from top to bottom.

    Args:
        G: Directed graph whose nodes carry a `role` attribute.
        save_path: Destination file for the rendered image.
        title: Title drawn above the graph.
    """
    role_colors = {
        "View": "#87CEFA",
        "ViewModel": "#90EE90",
        "Model": "#FFA500",
        "Repository": "#EE82EE",
        "Presenter": "#FFFF66",
        "Interactor": "#FFB6C1",
        "Router": "#A52A2A",
        "UseCase": "#00FFFF",
        "Unknown": "#D3D3D3",
    }
    max_per_row = 10
    layer_gap, row_gap, x_spacing = 3.0, 1.0, 2.0

    filtered_nodes = [n for n, d in G.nodes(data=True) if d.get("role") in role_colors]
    H = G.subgraph(filtered_nodes).copy()

    # Group nodes by role.
    layers = {}
    for node, data in H.nodes(data=True):
        layers.setdefault(data.get("role", "Unknown"), []).append(node)

    # Position the nodes layer by layer. The vertical cursor advances past
    # every row a layer actually used, so a layer with many rows can no longer
    # run into the first row of the next layer.
    pos = {}
    y_cursor = 0.0
    for nodes in layers.values():
        n_rows = math.ceil(len(nodes) / max_per_row)
        for r in range(n_rows):
            row_nodes = nodes[r * max_per_row:(r + 1) * max_per_row]
            x_start = -len(row_nodes) / 2 * x_spacing
            row_y = y_cursor - r * row_gap
            for j, node in enumerate(row_nodes):
                pos[node] = (x_start + j * x_spacing, row_y)
        y_cursor -= (n_rows - 1) * row_gap + layer_gap

    # Draw on an explicit figure/axes instead of pyplot's implicit "current
    # figure", which is global state shared with the other worker threads.
    fig, ax = plt.subplots(figsize=(22, 14))
    try:
        node_colors = [role_colors.get(H.nodes[n]["role"], "#D3D3D3") for n in H.nodes()]
        nx.draw_networkx_nodes(H, pos, ax=ax, node_color=node_colors, node_size=900,
                               edgecolors="black", linewidths=1.2)
        nx.draw_networkx_labels(H, pos, ax=ax, font_size=7, font_color="black", font_weight="bold")
        nx.draw_networkx_edges(H, pos, ax=ax, arrows=True, arrowstyle="-|>", arrowsize=10,
                               edge_color="gray", alpha=0.4)

        ax.set_facecolor("#F5F5F5")
        ax.set_title(title, fontsize=16, fontweight="bold", color="darkblue")
        ax.axis("off")

        # Legend built from explicit patches; each patch carries its own label.
        handles = [mpatches.Patch(color=color, label=role) for role, color in role_colors.items()]
        ax.legend(handles=handles, loc="upper right", fontsize=8, frameon=True, facecolor="white")

        fig.savefig(save_path, bbox_inches="tight", dpi=300)
    finally:
        # Always release the figure, even when drawing or saving fails.
        plt.close(fig)



In [None]:
# Guards the read-modify-write cycle on summary.json, which is updated by
# every worker thread that finishes a repository.
_SUMMARY_LOCK = threading.Lock()


def _download_kotlin_sources(owner, repo, branch, kotlin_files):
    """Fetch the raw text of each Kotlin file; return None on HTTP 401."""
    sources = {}
    for path in tqdm(kotlin_files, desc=f"Fetching {repo}"):
        url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
        resp = requests.get(url, headers=HEADERS, timeout=30)
        if resp.status_code == 403:
            print("Rate limit hit, waiting 1 hour...")
            time.sleep(3600)
            # Retry once after the wait so the file is not silently dropped.
            resp = requests.get(url, headers=HEADERS, timeout=30)

        if resp.status_code == 200:
            sources[path] = resp.text
        elif resp.status_code == 401:
            print(f"Unauthorized access to {owner}/{repo}.")
            return None
    return sources


def _record_in_summary(arch, repo_hash, owner, repo):
    """Register the repository under its architecture in summary.json."""
    summary_path = os.path.join(BASE_OUTPUT_DIR, "summary.json")
    with _SUMMARY_LOCK:
        summary = {}
        if os.path.exists(summary_path):
            with open(summary_path, encoding="utf-8") as f:
                summary = json.load(f)
        summary.setdefault(arch, {})[repo_hash] = {"owner": owner, "repo": repo}
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=4, ensure_ascii=False)


def analyze_repo(owner, repo):
    """Analyze Kotlin repo if it contains MainActivity.kt.

    Downloads the Kotlin sources, builds the dependency graph, infers the
    architecture and writes `analysis.json`, `graph.png` and a clone of the
    repository under `BASE_OUTPUT_DIR/<md5 of "owner/repo">`. The shared
    `summary.json` is updated as well.

    Args:
        owner: Repository owner.
        repo: Repository name.

    Returns:
        The inferred architecture name, or None when the repository is skipped
        (already analysed, no MainActivity.kt, unauthorized, or no sources
        could be downloaded).
    """
    # The hash is only used as a stable, filesystem-safe folder name.
    repo_hash = hashlib.md5(f"{owner}/{repo}".encode()).hexdigest()
    repo_dir = os.path.join(BASE_OUTPUT_DIR, repo_hash)
    if os.path.exists(repo_dir):
        print(f"Ignore {owner}/{repo}: folder already exists ({repo_hash}).")
        return None

    print(f"Checking {owner}/{repo} for MainActivity.kt...")
    kotlin_files = list_kotlin_files_recursive(owner, repo)
    if not any("mainactivity.kt" in f.lower() for f in kotlin_files):
        print(f"Skipping {owner}/{repo}: no MainActivity.kt found.")
        return None

    print(f"Found MainActivity.kt, analyzing {owner}/{repo}...")
    default_branch = get_default_branch(owner, repo)

    repo_files = _download_kotlin_sources(owner, repo, default_branch, kotlin_files)
    if repo_files is None:
        return None
    if not repo_files:
        print(f"No Kotlin files found in {owner}/{repo}.")
        return None

    G, file_data = build_dependency_graph(repo_files)
    arch, reason = infer_architecture(file_data, G, repo_files)

    os.makedirs(repo_dir, exist_ok=True)

    clone_path = os.path.join(repo_dir, "repo_clone")
    if not os.path.exists(clone_path):
        try:
            Repo.clone_from(f"https://github.com/{owner}/{repo}.git", clone_path)
        except Exception as e:
            # Best effort: the analysis results are still written without a clone.
            print(f"Could not clone {repo}: {e}")

    roles = ["View", "ViewModel", "Model", "Repository", "Presenter",
             "Interactor", "Router", "UseCase", "Unknown"]
    roles_dict = {r: sorted({f["file"] for f in file_data if f["role"] == r}) for r in roles}

    json_data = {"RepoURL": f"https://github.com/{owner}/{repo}",
                 "Architecture": arch, "Reason": reason, **roles_dict}

    with open(os.path.join(repo_dir, "analysis.json"), "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=4, ensure_ascii=False)

    # Update summary
    _record_in_summary(arch, repo_hash, owner, repo)

    visualize_graph(G, os.path.join(repo_dir, "graph.png"), f"{owner}/{repo} ({arch})")

    print(f"Finished {owner}/{repo} → {arch}")
    return arch

In [None]:
def fetch_all_repositories_unlimited(queries=None):
    """Collect every repository matching the given GitHub search queries.

    The search API exposes at most 1000 results (10 pages of 100) per query.
    To go beyond that, results are walked in "windows": all pages are read for
    a fixed `stars:<=ceiling` bound, then the ceiling is lowered to the
    smallest star count seen and paging restarts from page 1. Repositories
    are de-duplicated across windows and across queries.

    Args:
        queries: Iterable of GitHub search query strings. Defaults to the
            built-in Compose / Kotlin Multiplatform topic queries.

    Returns:
        A list of `(owner, name)` tuples in discovery order.
    """
    if queries is None:
        queries = [
            "topic:compose language:Kotlin", "topic:kmp language:Kotlin", "topic:compose-multiplatform language:kotlin", "topic:Kotlin-multiplatform language:Kotlin"
        ]

    search_url = "https://api.github.com/search/repositories"
    per_page = 100
    max_pages = 10          # 10 * 100 = the 1000-result cap of the search API
    max_denials = 10        # consecutive 403/429 answers tolerated per query

    print("Iniciando busca completa de repositórios...")

    repos = []
    seen = set()

    for q_index, query in enumerate(queries):
        print(f"\nProcessando QUERY {q_index+1}/{len(queries)} → {query}")

        star_ceiling = 999999
        page = 1
        denials = 0

        while True:
            resp = requests.get(
                search_url,
                headers=HEADERS,
                params={
                    "q": f"{query} stars:<={star_ceiling}",
                    "sort": "stars",
                    "order": "desc",
                    "per_page": per_page,
                    "page": page,
                },
                timeout=30,
            )

            if resp.status_code in (403, 429):
                denials += 1
                if denials > max_denials:
                    # Not a transient rate limit: give up on this query
                    # instead of retrying forever.
                    print(f"Erro {resp.status_code}: {resp.text}")
                    break
                print("Rate limit atingido — aguardando 60s...")
                time.sleep(60)
                continue
            denials = 0

            if resp.status_code != 200:
                print(f"Erro {resp.status_code}: {resp.text}")
                break  # move on to the next query

            items = resp.json().get("items", [])
            if not items:
                print("➡️ Nenhum item restante nesta query, indo para a próxima…")
                break

            for repo in items:
                full_name = repo["full_name"]
                if full_name not in seen:
                    seen.add(full_name)
                    owner, name = full_name.split("/")
                    repos.append((owner, name))

            print(f"Página {page} processada — total acumulado: {len(repos)}")

            if page < max_pages:
                page += 1
            else:
                # Window exhausted: results are sorted by stars descending, so
                # the last item holds the smallest star count of the window.
                lowest_stars = items[-1]["stargazers_count"]
                if lowest_stars < star_ceiling:
                    next_ceiling = lowest_stars
                else:
                    # More than 1000 repositories share this star count; step
                    # below it to guarantee progress.
                    next_ceiling = star_ceiling - 1
                if next_ceiling < 0:
                    break
                star_ceiling = next_ceiling
                page = 1

            time.sleep(1)

    print(f"\nFINALIZADO → {len(repos)} repositórios coletados.")
    return repos

In [None]:
def analyze_single_repo(idx, total, owner, repo):
    """Analyse one repository from a worker thread.

    Console output is serialised through PRINT_LOCK. Any exception raised by
    the analysis is reported and swallowed, so one failing repository does not
    stop the remaining ones.
    """
    def say(message):
        with PRINT_LOCK:
            print(message)

    say(f"\n[{idx}/{total}]Checking: {owner}/{repo}")

    try:
        detected = analyze_repo(owner, repo)
        say(f"Result → {detected}\n")
    except Exception as exc:
        say(f"Error to check {owner}/{repo}: {exc}")



In [None]:
def run_full_analysis(max_threads=8):
    """Fetch the candidate repositories and analyse them in a thread pool.

    Args:
        max_threads: Number of worker threads used for the analysis.
    """
    candidates = fetch_all_repositories_unlimited()
    if not candidates:
        print("Any repository found")
        return

    total = len(candidates)
    print(f"\nchecking {total} repos with {max_threads} threads...\n")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
        # One task per repository, numbered from 1 for the progress output.
        pending = [
            pool.submit(analyze_single_repo, position, total, owner, repo)
            for position, (owner, repo) in enumerate(candidates, start=1)
        ]
        concurrent.futures.wait(pending)

    print("\nFinish!\n")