In [1]:
import ast
import os
import networkx as nx
from collections import defaultdict


def is_local_module(module_name: str, code_dir: str) -> bool:
    module_name = module_name.replace("code_base.", "")  # ✅ Strip project root
    rel_path = module_name.replace(".", os.sep) + ".py"
    module_path = os.path.join(code_dir, rel_path)
    return os.path.isfile(module_path)


def make_node_id(filename: str, func: str) -> str:
    """Standardized graph node ID: <filename>:<function>"""
    return f"{os.path.basename(filename)}:{func}"

def strip_base_module_prefix(module_name: str, code_dir: str):
    abs_code_dir = os.path.abspath(code_dir)
    for root, _, files in os.walk(abs_code_dir):
        for f in files:
            if f.endswith(".py"):
                module_guess = os.path.splitext(f)[0]
                if module_guess == module_name or module_guess.endswith(f".{module_name}"):
                    return module_guess
    return module_name



class CodeAnalyzer(ast.NodeVisitor):
    def __init__(self, filename):
        self.filename = filename
        self.imports = {}  # alias -> module:function or module
        self.func_defs = {}  # function name -> FunctionDef
        self.func_calls = defaultdict(list)  # function -> list of called names
        self.var_usage = defaultdict(set)  # function -> set of variable names

    def visit_ImportFrom(self, node):
        module = node.module
        for alias in node.names:
            self.imports[alias.asname or alias.name] = f"{module}:{alias.name}"

    def visit_Import(self, node):
        for alias in node.names:
            self.imports[alias.asname or alias.name] = alias.name

    def visit_FunctionDef(self, node):
        func_name = node.name
        self.func_defs[func_name] = node
        for inner in ast.walk(node):
            if isinstance(inner, ast.Call):
                if isinstance(inner.func, ast.Name):
                    self.func_calls[func_name].append(inner.func.id)
                elif isinstance(inner.func, ast.Attribute):
                    self.func_calls[func_name].append(inner.func.attr)
            elif isinstance(inner, ast.Name):
                self.var_usage[func_name].add(inner.id)
        self.generic_visit(node)


def build_code_graph(code_dir: str) -> nx.DiGraph:
    graph = nx.DiGraph()
    code_dir = os.path.abspath(code_dir)
    all_py_files = [
        os.path.join(code_dir, f)
        for f in os.listdir(code_dir)
        if f.endswith(".py")
    ]

    analyzers = {}
    for file_path in all_py_files:
        with open(file_path, "r") as f:
            tree = ast.parse(f.read(), filename=file_path)
        analyzer = CodeAnalyzer(file_path)
        analyzer.visit(tree)
        analyzers[file_path] = analyzer

    for file_path, analyzer in analyzers.items():
        file_name = os.path.basename(file_path)

        # Add function nodes
        for func_name in analyzer.func_defs:
            node_id = make_node_id(file_name, func_name)
            graph.add_node(
                node_id,
                file=file_name,
                func=func_name,
                variables=list(analyzer.var_usage.get(func_name, [])),
                external=False,
            )

        # Add edges for function calls
        for caller, callees in analyzer.func_calls.items():
            caller_node = make_node_id(file_name, caller)
            for callee in callees:
                callee_node = None
                edge_type = "calls"

                imported = analyzer.imports.get(callee)
                if imported and ":" in imported:
                    mod, func = imported.split(":")
                    mod = strip_base_module_prefix(mod, code_dir)
                    if is_local_module(mod, code_dir):
                        callee_node = make_node_id(f"{mod}.py", func)
                    else:
                        print(f"⚠️ Warning: `{callee}` imported from `{mod}`, not found locally. Marking as external.")
                        callee_node = f"<external>:{func}"
                        edge_type = "calls_external"
                        graph.add_node(callee_node, func=func, file="<external>", external=True)
                elif imported:
                    # e.g., imported = "numpy"
                    callee_node = f"<external>:{callee}"
                    edge_type = "calls_external"
                    graph.add_node(callee_node, func=callee, file="<external>", external=True)
                else:
                    # Assume it's a local call in the same file
                    callee_node = make_node_id(file_name, callee)

                if callee_node:
                    if not graph.has_node(callee_node):
                        graph.add_node(
                            callee_node,
                            func=callee,
                            file=callee_node.split(":")[0],
                            external=callee_node.startswith("<external>"),
                        )
                    graph.add_edge(caller_node, callee_node, type=edge_type)

    return graph


def explain_variable_relation(graph: nx.DiGraph, var1: str, var2: str):
    nodes_var1 = [n for n, d in graph.nodes(data=True) if var1 in d.get("variables", [])]
    nodes_var2 = [n for n, d in graph.nodes(data=True) if var2 in d.get("variables", [])]

    if not nodes_var1:
        print(f"❌ Variable `{var1}` not found in any function.")
    if not nodes_var2:
        print(f"❌ Variable `{var2}` not found in any function.")
    if not nodes_var1 or not nodes_var2:
        return

    print(f"\n🔍 `{var1}` found in: {nodes_var1}")
    print(f"🔍 `{var2}` found in: {nodes_var2}")

    found_path = False
    for src in nodes_var1:
        for tgt in nodes_var2:
            if nx.has_path(graph, src, tgt):
                path = nx.shortest_path(graph, src, tgt)
                print(f"\n📈 Relationship path from `{var1}` to `{var2}`:")
                for i in range(len(path) - 1):
                    a, b = path[i], path[i + 1]
                    etype = graph.edges[a, b].get("type", "")
                    print(f"  {a} --[{etype}]--> {b}")
                found_path = True

    if not found_path:
        print("\n❗ No path found between the functions using the two variables.")



In [2]:
code_dir = "./code_base"  # Your code folder
graph = build_code_graph(code_dir)

# Show graph structure
print("\n📦 Nodes:")
for n, d in graph.nodes(data=True):
    print(f"{n} -> {d}")

print("\n🔁 Edges:")
for u, v, d in graph.edges(data=True):
    print(f"{u} --[{d['type']}]--> {v}")

# Variable trace across the graph
explain_variable_relation(graph, "x", "result")




📦 Nodes:
a.py:add -> {'file': 'a.py', 'func': 'add', 'variables': ['y', 'x'], 'external': False}
c.py:display -> {'file': 'c.py', 'func': 'display', 'variables': ['print', 'val'], 'external': False}
c.py:print -> {'func': 'print', 'file': 'c.py', 'external': False}
b.py:calc -> {'file': 'b.py', 'func': 'calc', 'variables': ['result', 'add', 'a', 'b'], 'external': False}
code_base.a.py:add -> {'func': 'add', 'file': 'code_base.a.py', 'external': False}

🔁 Edges:
c.py:display --[calls]--> c.py:print
b.py:calc --[calls]--> code_base.a.py:add

🔍 `x` found in: ['a.py:add']
🔍 `result` found in: ['b.py:calc']

❗ No path found between the functions using the two variables.


In [None]:
pip install scikit-learn numpy sentence-transformers

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import numpy as np

# Example code chunks (function-level or file snippets)
code_chunks = [
    "def load_config(path):\n    with open(path) as f:\n        return json.load(f)",
    "config = load_config('settings.json')",
    "def process_data(data):\n    return [d for d in data if d is not None]",
    "# This function processes the user data and removes null entries",
    "def main():\n    data = fetch_data()\n    clean_data = process_data(data)"
]

# Your input query
query = "explain the variable config"

# ---------------------
# 1. Sparse BM25-style retrieval
# ---------------------
vectorizer = TfidfVectorizer()
sparse_matrix = vectorizer.fit_transform(code_chunks)
sparse_query_vec = vectorizer.transform([query])
bm25_scores = cosine_similarity(sparse_query_vec, sparse_matrix)[0]  # cosine ~ BM25 for short texts

# ---------------------
# 2. Dense embedding retrieval
# ---------------------
model = SentenceTransformer("all-MiniLM-L6-v2")  # You can replace with CodeBERT or CodeT5 for better code results
chunk_embeddings = model.encode(code_chunks, convert_to_tensor=False)
query_embedding = model.encode([query], convert_to_tensor=False)[0]
dense_scores = cosine_similarity([query_embedding], chunk_embeddings)[0]

# ---------------------
# 3. Score Fusion (normalize + blend)
# ---------------------
bm25_norm = (bm25_scores - np.min(bm25_scores)) / (np.max(bm25_scores) - np.min(bm25_scores) + 1e-8)
dense_norm = (dense_scores - np.min(dense_scores)) / (np.max(dense_scores) - np.min(dense_scores) + 1e-8)

alpha = 0.5  # weight between dense and sparse
hybrid_scores = alpha * dense_norm + (1 - alpha) * bm25_norm

# ---------------------
# 4. Get Top-K Chunks
# ---------------------
top_k = 3
top_indices = np.argsort(hybrid_scores)[::-1][:top_k]
for i in top_indices:
    print(f"Score: {hybrid_scores[i]:.4f}")
    print(code_chunks[i])
    print("-" * 50)
