```
User Query
    ↓
Navigator Agent  ──► (file paths) ──► Fetcher Agent ──► (file content)
                                                            │
Chunker Agent ◀──────────────────────────────────────────────┘
    ↓
Embedder Agent
    ↓
ChromaDB
```


In [1]:
!pwd

/Users/sumi/Desktop/GitSurfer/Notebooks


In [2]:
import os

os.chdir("../")

!pwd

/Users/sumi/Desktop/GitSurfer


In [3]:
from dotenv import load_dotenv
load_dotenv()

True

In [4]:
# Mirror GOOGLE_API_KEY into GEMINI_API_KEY. os.environ only accepts strings,
# so assigning the None returned for an unset variable would raise TypeError;
# copy the value only when it is actually present.
google_api_key = os.getenv("GOOGLE_API_KEY")
if google_api_key is not None:
    os.environ["GEMINI_API_KEY"] = google_api_key


In [8]:
import os
import aiohttp
import asyncio
import base64
import json

from typing import List, Tuple, Dict, Any, Optional, Annotated
from operator import add
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph
from langchain_core.runnables import Runnable
from langchain_core.messages import AIMessage, HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

from logger import logging

# Set up logging: configure INFO-level output and create the module-level
# logger used by the helpers and graph nodes defined below.
# NOTE(review): `logging` is imported from the project's `logger` module, not
# directly from the standard library — presumably a re-export; confirm.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Auth headers
# The Authorization header is attached only when GITHUB_TOKEN is configured.
# Formatting an unset token would send the literal string "token None", which
# is an invalid credential rather than an anonymous request.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
AUTH_HEADERS = {"Accept": "application/vnd.github.v3+json"}
if GITHUB_TOKEN:
    AUTH_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"

# Retry helper
async def retry_async(func, retries=3, backoff_in_seconds=1, *args, **kwargs):
    """Await ``func(*args, **kwargs)``, retrying with exponential backoff.

    Args:
        func: Callable returning an awaitable; it is called afresh on every
            attempt.
        retries: Total number of attempts (the first call included). Must be
            at least 1.
        backoff_in_seconds: Base delay. After failed attempt ``k`` (0-based)
            the helper sleeps ``backoff_in_seconds * 2 ** k`` seconds.
        *args: Positional arguments forwarded to ``func``. Because they follow
            ``retries`` and ``backoff_in_seconds`` in the signature, they can
            only be supplied when those two are also passed positionally;
            prefer keyword arguments for ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever the first successful ``func`` call returns.

    Raises:
        ValueError: If ``retries`` is smaller than 1. Previously such a value
            skipped the loop entirely and returned ``None`` without ever
            calling ``func``.
        Exception: The error raised by the final attempt, re-raised unchanged.
    """
    if retries < 1:
        raise ValueError(f"retries must be at least 1, got {retries}")
    for attempt in range(retries):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.warning("Attempt %d failed with error: %s", attempt + 1, e)
            if attempt == retries - 1:
                # Out of attempts: surface the last error to the caller.
                raise
            await asyncio.sleep(backoff_in_seconds * 2 ** attempt)

# --- Node 1: Fetch Tree ---
async def fetch_github_tree(owner: str, repo: str, branch: str = "main", session: Optional[aiohttp.ClientSession] = None) -> List[Tuple[str, str]]:
    """Fetch the recursive Git tree of a repository branch from the GitHub API.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.
        branch: Branch name placed in the ``git/trees/...`` URL.
        session: Optional shared aiohttp session. When omitted, a temporary
            session is created and closed before returning.

    Returns:
        A list of ``(path, type)`` tuples, one per entry of the response's
        ``tree`` array.

    Raises:
        Exception: If GitHub answers with a status other than 200; the message
            carries the status code and the response body.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
    close_session = False
    if session is None:
        session = aiohttp.ClientSession()
        close_session = True
    try:
        async with session.get(url, headers=AUTH_HEADERS) as response:
            if response.status != 200:
                text = await response.text()
                raise Exception(f"Error fetching tree: {response.status} — {text}")
            data = await response.json()
            # GitHub sets "truncated" when a recursive listing exceeds its
            # limits; the returned tree is then incomplete, so make it visible.
            if data.get("truncated"):
                logger.warning(
                    "Tree listing for %s/%s@%s was truncated by GitHub; some paths are missing",
                    owner, repo, branch,
                )
            return [(item["path"], item["type"]) for item in data["tree"]]
    finally:
        # Only close sessions this function created itself.
        if close_session:
            await session.close()

class FetchTreeNode(Runnable):
    """Graph node that downloads the repository's recursive file tree."""

    def invoke(self, state, config=None):
        """Run ``ainvoke`` to completion synchronously via ``asyncio.run``."""
        pending = self.ainvoke(state, config)
        return asyncio.run(pending)

    async def ainvoke(self, state, config=None):
        """Fetch the tree for ``state.owner``/``state.repo`` and return it.

        The branch falls back to ``"main"`` when the state has no ``branch``
        attribute. Returns a ``{"tree": [...]}`` state update.
        """
        target = {
            "owner": state.owner,
            "repo": state.repo,
            "branch": getattr(state, "branch", "main"),
        }
        # One session for the whole (possibly retried) request.
        async with aiohttp.ClientSession() as http:
            entries = await retry_async(fetch_github_tree, session=http, **target)
        logger.info(f"Fetched tree with {len(entries)} items")
        return {"tree": entries}

# --- Node 2: Process Tree with LLM ---
class SummarizeTreeNode(Runnable):
    """Graph node that asks a Gemini model to turn the path list into a JSON hierarchy.

    The result is written to ``tree.json`` in the working directory and
    returned as a ``{"tree_summary": ...}`` state update.
    """

    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", google_api_key=os.getenv("GOOGLE_API_KEY"))

    def invoke(self, state, config=None):
        """Run ``ainvoke`` to completion synchronously via ``asyncio.run``."""
        return asyncio.run(self.ainvoke(state, config))

    @staticmethod
    def _build_tree_text(tree_items, max_length):
        """Render ``(path, type)`` items as a bullet list of at most ~``max_length`` characters.

        When the list is too long it is cut at the last complete line that
        fits, so the model never sees a path sliced in half, and a
        ``... (truncated)`` marker is appended.
        """
        tree_text = "\n".join(f"- {path}" for path, _ in tree_items)
        if len(tree_text) <= max_length:
            return tree_text
        # Search one character past the limit so a newline sitting exactly at
        # the boundary still counts as a clean cut point.
        cut = tree_text.rfind("\n", 0, max_length + 1)
        if cut == -1:
            # A single over-long line: fall back to a hard character cut.
            cut = max_length
        return tree_text[:cut] + "\n... (truncated)"

    async def ainvoke(self, state, config=None):
        """Summarise ``state.tree`` with the LLM and persist the answer.

        Returns:
            ``{"tree_summary": value}`` where ``value`` is the parsed JSON
            when the model's reply is valid JSON, otherwise the raw reply.
        """
        # Build tree text, limit length to avoid token overflow
        max_length = 3000  # adjust based on token limits
        tree_text = self._build_tree_text(state.tree, max_length)
        prompt = f"""
        You are a helpful assistant. Analyze the following list of file paths and produce a JSON structure 
        that represents the file/folder hierarchy. Do not include metadata, just the structure.
        Ensure that you do not include ```json``` in your response.

        FILE TREE:
        {tree_text}
        """
        response = await self.llm.ainvoke([HumanMessage(content=prompt)])
        # Try to parse JSON safely. ValueError covers json.JSONDecodeError;
        # TypeError covers a reply whose content is not str/bytes.
        try:
            tree_summary = json.loads(response.content)
        except (ValueError, TypeError):
            logger.warning("LLM response is not valid JSON, saving raw content")
            tree_summary = response.content
        with open("tree.json", "w", encoding="utf-8") as f:
            if isinstance(tree_summary, str):
                f.write(tree_summary)
            else:
                json.dump(tree_summary, f, indent=2)
        return {"tree_summary": tree_summary}

# --- Node 3: Fetch All Files in Parallel ---
async def fetch_file_content(owner: str, repo: str, path: str, branch: str = "main", session: Optional[aiohttp.ClientSession] = None) -> Dict[str, Any]:
    """Download one file through the GitHub contents API.

    Args:
        owner: Repository owner.
        repo: Repository name.
        path: Repository-relative file path. It is percent-encoded before
            being placed in the URL, keeping ``/`` as the separator.
        branch: Ref passed as the ``ref`` query parameter (percent-encoded).
        session: Optional shared aiohttp session. When omitted, a temporary
            session is created and closed before returning.

    Returns:
        ``{"path": path, "content": text}`` on success. Base64 payloads are
        decoded as UTF-8 with undecodable bytes dropped. On any non-200
        status the function returns ``{"path": path, "error": "<status>"}``
        instead of raising, so callers that retry only on exceptions will not
        retry HTTP failures.
    """
    from urllib.parse import quote  # local import: the file's import cell is left untouched

    # Unescaped '#', '?', '&' or spaces in a path or branch name would change
    # the meaning of the URL, so encode both components.
    url = (
        f"https://api.github.com/repos/{owner}/{repo}/contents/"
        f"{quote(path, safe='/')}?ref={quote(branch, safe='')}"
    )
    close_session = False
    if session is None:
        session = aiohttp.ClientSession()
        close_session = True
    try:
        async with session.get(url, headers=AUTH_HEADERS) as resp:
            if resp.status != 200:
                logger.warning(f"Failed to fetch {path}: HTTP {resp.status}")
                return {"path": path, "error": f"{resp.status}"}
            data = await resp.json()
            if data.get("encoding") == "base64":
                content = base64.b64decode(data["content"]).decode("utf-8", errors="ignore")
            else:
                content = data.get("content", "")
            return {"path": path, "content": content}
    finally:
        # Only close sessions this function created itself.
        if close_session:
            await session.close()

async def fetch_all_files_by_path(owner: str, repo: str, paths: List[str], branch: str = "main", max_concurrency: int = 10) -> List[Dict[str, Any]]:
    """Fetch many files concurrently over one shared HTTP session.

    Args:
        owner: Repository owner.
        repo: Repository name.
        paths: Repository-relative file paths to download.
        branch: Ref to read the files from.
        max_concurrency: Upper bound on simultaneous in-flight requests.

    Returns:
        The per-file result dicts that carry non-empty ``content``. Files that
        came back with an error, with empty content, or whose fetch kept
        raising after all retries are left out. An empty ``paths`` returns
        ``[]`` without opening a session.
    """
    if not paths:
        return []
    sem = asyncio.Semaphore(max_concurrency)
    async with aiohttp.ClientSession() as session:
        async def limited_fetch(path):
            async with sem:
                return await retry_async(fetch_file_content, owner=owner, repo=repo, path=path, branch=branch, session=session)
        # return_exceptions=True keeps one failing file from aborting the
        # whole batch and from closing the shared session while sibling
        # requests are still running.
        results = await asyncio.gather(*(limited_fetch(p) for p in paths), return_exceptions=True)
    # Filter out errors or empty content
    filtered = []
    for path, res in zip(paths, results):
        if isinstance(res, BaseException):
            if not isinstance(res, Exception):
                # Cancellation and similar signals must not be swallowed.
                raise res
            logger.warning("Skipping %s after repeated failures: %s", path, res)
            continue
        if res.get("content"):
            filtered.append(res)
    logger.info(f"Fetched {len(filtered)} files with content")
    return filtered

class FetchFilesNode(Runnable):
    """Graph node that downloads the content of every blob listed in the tree."""

    def invoke(self, state, config=None):
        """Run ``ainvoke`` to completion synchronously via ``asyncio.run``."""
        pending = self.ainvoke(state, config)
        return asyncio.run(pending)

    async def ainvoke(self, state, config=None):
        """Fetch all blob paths from ``state.tree`` and dump them to disk.

        The fetched records are written to ``chunks_raw.json`` in the working
        directory and returned as a ``{"files": [...]}`` state update. The
        branch falls back to ``"main"`` when the state has no ``branch``.
        """
        owner, repo = state.owner, state.repo
        branch = getattr(state, "branch", "main")
        # Tree entries are (path, type) pairs; only "blob" entries are files.
        blob_paths = [entry_path for entry_path, entry_type in state.tree if entry_type == "blob"]
        fetched = await fetch_all_files_by_path(owner, repo, blob_paths, branch)
        with open("chunks_raw.json", "w", encoding="utf-8") as dump_file:
            json.dump(fetched, dump_file, indent=2)
        return {"files": fetched}

# --- Define State Schema ---
class MyState(BaseModel):
    """Shared state passed between the graph nodes.

    ``owner``, ``repo`` and ``branch`` identify the repository. ``tree``,
    ``files`` and ``tree_summary`` are filled in by the nodes as they run.
    """

    owner: str
    repo: str
    branch: str = "main"
    # (path, type) pairs produced by the tree-fetch node.
    tree: List[Tuple[str, str]] = Field(default_factory=list)
    # Annotated with `add` so that list updates are concatenated onto the
    # existing value rather than replacing it.
    files: Annotated[List[Dict[str, Any]], add] = Field(default_factory=list)
    # Output of the summarize node: parsed JSON or the raw LLM reply. Without
    # a field for it here, the "tree_summary" update had nowhere to be stored
    # and did not appear in the final state.
    tree_summary: Any = None

# --- Define LangGraph ---
# Build the workflow over MyState and register one node per pipeline step.
graph = StateGraph(state_schema=MyState)
graph.add_node("fetch_tree", FetchTreeNode())
graph.add_node("summarize_tree", SummarizeTreeNode())
graph.add_node("fetch_files", FetchFilesNode())

# Parallel execution after tree fetch
# Both downstream nodes hang directly off "fetch_tree"; neither depends on
# the other's output.
graph.add_edge("fetch_tree", "summarize_tree")
graph.add_edge("fetch_tree", "fetch_files")

# The run starts at "fetch_tree"; only "fetch_files" is declared as a finish point.
# NOTE(review): "summarize_tree" has no outgoing edge and is not a finish
# point — confirm that leaving it as a dangling leaf is intended.
graph.set_entry_point("fetch_tree")
graph.set_finish_point("fetch_files")

# Compile the builder into the runnable `flow` invoked by the example cell.
flow = graph.compile()
#flow




In [9]:
# Example run (async)
async def main():
    initial_state = {"owner": "kumar8074", "repo": "NOVA-AI", "branch": "main"}
    result = await flow.ainvoke(initial_state)
    logger.info("Flow completed")
    # result contains final state including files
    print(result)

await main()

{'owner': 'kumar8074', 'repo': 'NOVA-AI', 'branch': 'main', 'tree': [('.devcontainer', 'tree'), ('.devcontainer/devcontainer.json', 'blob'), ('.gitignore', 'blob'), ('LICENSE', 'blob'), ('Notebooks', 'tree'), ('Notebooks/1-nova-basic.ipynb', 'blob'), ('README.md', 'blob'), ('__init__.py', 'blob'), ('app.py', 'blob'), ('config.py', 'blob'), ('models', 'tree'), ('models/__init__.py', 'blob'), ('models/llm.py', 'blob'), ('nova.py', 'blob'), ('prompts', 'tree'), ('prompts/__init__.py', 'blob'), ('prompts/templates.py', 'blob'), ('requirements.txt', 'blob'), ('utils', 'tree'), ('utils/__init__.py', 'blob'), ('utils/document_utils.py', 'blob'), ('utils/search_utils.py', 'blob'), ('utils/ui_utils.py', 'blob'), ('voicebot.py', 'blob')], 'files': [{'path': '.devcontainer/devcontainer.json', 'content': '{\n  "name": "Python 3",\n  // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile\n  "image": "mcr.microsoft.com/devcontainers/python:1-3.11-bullseye",