<a href="https://colab.research.google.com/github/lx-47/1000iq-backend/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install javalang groq agno pygithub

Collecting javalang
  Downloading javalang-0.13.0-py3-none-any.whl.metadata (805 bytes)
Collecting groq
  Downloading groq-0.22.0-py3-none-any.whl.metadata (15 kB)
Collecting agno
  Downloading agno-1.3.5-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.5/44.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pygithub
  Downloading PyGithub-2.6.1-py3-none-any.whl.metadata (3.9 kB)
Collecting pydantic-settings (from agno)
  Downloading pydantic_settings-2.9.1-py3-none-any.whl.metadata (3.8 kB)
Collecting python-dotenv (from agno)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting python-multipart (from agno)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting tomli (from agno)
  Downloading tomli-2.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting pynacl>=1.4.0 (from pygithub)
  Downloading PyNaCl-1.5.0-cp36-abi3-manylinux_

In [19]:
from google.colab import userdata
import os
github_token = userdata.get('git_token')
groq_key = userdata.get('groq_api_key')
os.environ['GROQ_API_KEY'] = groq_key

In [21]:
from typing import Dict, List, Any, Set, Optional, Tuple
import javalang

# Configurable weights for method prioritization
METHOD_PRIORITY_WEIGHTS = {
    "rest_endpoint": 30, "annotation": 5, "crud_operation": 10, "public_api": 15,
}

def extract_class_info(java_code: str, file_path: str = "unknown") -> Optional[Dict[str, Any]]:
    """Enhanced class info extractor with better error handling."""
    try:
        tree = javalang.parse.parse(java_code)
        class_info = {
            "imports": [], "rest_endpoints": [], "semantic_description": "",
            "relationships": {"depends_on": set(), "called_by": set()},
            "is_interface": False, "is_repository": False, "methods": [],
            "fields": [], "extends": [], "implements": [], "annotations": []
        }

        # Process imports
        for _, node in tree.filter(javalang.tree.Import):
            if node.path:
                class_info["imports"].append(node.path)
                if "Repository" in node.path:
                    class_info["is_repository"] = True

        # Process type declaration
        for _, node in tree:
            if isinstance(node, (javalang.tree.ClassDeclaration, javalang.tree.InterfaceDeclaration)):
                class_info.update({
                    "name": node.name,
                    "is_interface": isinstance(node, javalang.tree.InterfaceDeclaration),
                    "annotations": [ann.name for ann in getattr(node, 'annotations', [])]
                })

                # Handle extends/implements
                if isinstance(node, javalang.tree.InterfaceDeclaration):
                    class_info["extends"] = [ext.name for ext in node.extends] if node.extends else []
                else:  # ClassDeclaration
                    class_info["extends"] = [node.extends.name] if node.extends else []
                    class_info["implements"] = [impl.name for impl in node.implements] if node.implements else []

                # Class type classification
                if any(ext in class_info["extends"] for ext in ['JpaRepository', 'CrudRepository']):
                    class_info["class_type"] = "Repository"
                    # Add repository-specific dependencies
                    for ext in class_info["extends"]:
                        if '<' in ext:  # Handle generics
                            generics = ext.split('<')[1].split('>')[0].split(',')
                            class_info["relationships"]["depends_on"].update([g.strip() for g in generics])
                elif class_info["is_interface"]:
                    class_info["class_type"] = "Interface"
                else:
                    class_info["class_type"] = classify_class(node)

                class_info["semantic_description"] = generate_class_description(node, class_info["class_type"])

            # Process methods
            elif isinstance(node, javalang.tree.MethodDeclaration):
                method_info = {
                    "name": node.name,
                    "return_type": str(node.return_type) if node.return_type else "void",
                    "parameters": [(str(p.type), p.name) for p in node.parameters],
                    "annotations": [ann.name for ann in getattr(node, 'annotations', [])],
                    "purpose": infer_method_purpose(node)
                }

                if endpoint := extract_rest_endpoint(node):
                    method_info["endpoint"] = endpoint
                    class_info["rest_endpoints"].append(endpoint)

                class_info["methods"].append(method_info)

            # Process fields
            elif isinstance(node, javalang.tree.FieldDeclaration):
                for decl in node.declarators:
                    field_info = {
                        "type": str(node.type), "name": decl.name,
                        "annotations": [ann.name for ann in getattr(node, 'annotations', [])]
                    }
                    class_info["fields"].append(field_info)

                    if any(ann.name == "Autowired" for ann in getattr(node, 'annotations', [])):
                        class_info["relationships"]["depends_on"].add(str(node.type))

            # Process method calls
            elif isinstance(node, javalang.tree.MethodInvocation):
                if hasattr(node.qualifier, 'name'):
                    class_info["relationships"]["depends_on"].add(f"{node.qualifier.name}.{node.member}")
        return class_info
    except Exception:
        return None

def safe_parse_java(java_code: str, file_path: str = "unknown") -> Optional[Dict[str, Any]]:
    """Safer wrapper for parsing Java code that handles common issues."""
    if not java_code or not java_code.strip():
        return None

    try:
        # Remove Unicode BOM if present
        if java_code.startswith('\ufeff'):
            java_code = java_code[1:]

        # Try to handle multiline comments that might be malformed
        if '/*' in java_code and '*/' not in java_code:
            java_code += '*/'

        return extract_class_info(java_code, file_path)
    except Exception:
        return None

def classify_class(class_node) -> str:
    """Determine class type based on annotations."""
    annotations = {ann.name for ann in getattr(class_node, 'annotations', [])}
    if "RestController" in annotations:
        return "Controller"
    elif "Service" in annotations:
        return "Service"
    elif "Entity" in annotations:
        return "Entity"
    return "Other"

def generate_class_description(node, class_type: str) -> str:
    """Generate human-readable class description."""
    name = node.name
    if class_type == "Controller":
        return f"Handles web requests for {name.replace('Controller', '')}"
    elif class_type == "Service":
        return f"Business logic for {name.replace('Service', '')}"
    elif class_type == "Repository":
        return f"Data access for {name.replace('Repository', '')}"
    elif class_type == "Entity":
        return f"Data model for {name.replace('Entity', '')}"
    return f"{class_type} {name}"

def infer_method_purpose(method_node) -> str:
    """Infer method purpose from its signature."""
    name = method_node.name
    if name.startswith(('get', 'find')):
        return f"Retrieves {name[3:]}"
    elif name.startswith('create'):
        return f"Creates new {name[6:]}"
    elif name.startswith('update'):
        return f"Updates {name[6:]}"
    elif name.startswith('delete'):
        return f"Deletes {name[6:]}"
    return f"Performs {name} operation"

def extract_rest_endpoint(method_node) -> Optional[Dict[str, str]]:
    """Extract REST endpoint data from annotations."""
    for ann in getattr(method_node, 'annotations', []):
        if ann.name.endswith("Mapping"):
            return {
                "http_method": ann.name.replace("Mapping", "").upper(),
                "path": resolve_annotation_path(ann),
                "method_name": method_node.name
            }
    return None

def resolve_annotation_path(annotation) -> str:
    """Extract path value from mapping annotation."""
    if hasattr(annotation, "element"):
        if isinstance(annotation.element, javalang.tree.Literal):
            return annotation.element.value.strip('"')
    return "/"

def process_repository_class_only(repo_files: Dict[str, str]) -> Dict[str, Any]:
    """Process files and generate class-level embeddings and dependency graph."""
    project_summary = {}
    class_summaries = []
    failed_files = []

    # Process files sequentially
    for path, content in repo_files.items():
        try:
            summary = safe_parse_java(content, path)
            if summary and summary.get('name'):
                # Convert sets to lists for JSON serialization
                summary["relationships"] = {
                    "depends_on": list(summary["relationships"]["depends_on"]),
                    "called_by": list(summary["relationships"]["called_by"])
                }
                class_summaries.append(summary)
                project_summary[path] = summary
            else:
                failed_files.append(path)
        except Exception:
            failed_files.append(path)

    # Generate embeddings for successful parses
    embedding_data = []
    for cls in class_summaries:
        if isinstance(cls, dict) and cls.get('name'):
            try:
                embedding = _generate_class_embedding(cls, project_summary)
                embedding_data.append(embedding)
            except Exception:
                pass

    # Build dependency graph
    dependency_graph = {}
    try:
        dependency_graph = _build_dependency_graph(class_summaries)
    except Exception:
        pass

    return {
        "project_summary": project_summary,
        "class_summaries": class_summaries,
        "embedding_data": embedding_data,
        "dependency_graph": dependency_graph,
        "failed_files": failed_files,
        "stats": {
            "total_files": len(repo_files),
            "successful_files": len(project_summary),
            "failed_files": len(failed_files)
        }
    }

def _generate_class_embedding(cls: Dict[str, Any], project_summary: Dict[str, Any]) -> Dict[str, Any]:
    """Helper to generate a single class embedding."""
    try:
        method_summaries = generate_method_summaries(cls)
        endpoint_summary = generate_endpoint_summary(cls)
        relationship_summary = generate_relationship_summary(cls)
        import_summary = generate_import_summary(cls)

        return {
            "type": "class",
            "id": cls['name'],
            "text": generate_enriched_class_text(
                cls, method_summaries, endpoint_summary, relationship_summary, import_summary,
            ),
            "metadata": {
                "class_type": cls.get("class_type", "Unknown"),
                "is_interface": cls.get("is_interface", False),
                "is_repository": cls.get("is_repository", False),
                "file": next((k for k, v in project_summary.items() if v == cls), "unknown"),
            }
        }
    except Exception as e:
        return {
            "type": "class",
            "id": cls.get('name', 'unknown'),
            "text": f"Error generating embedding: {e}",
            "metadata": {"error": str(e)}
        }

def generate_method_summaries(cls: Dict[str, Any]) -> str:
    """Generate a concise summary of the most important methods in a class."""
    methods = cls.get("methods", [])
    if not methods:
        return "No methods defined."

    try:
        prioritized_methods = prioritize_methods(methods)
        top_methods = prioritized_methods[:min(5, len(prioritized_methods))]
        method_texts = []
        for method in top_methods:
            param_text = ""
            if method.get("parameters"):
                params = [f"{p_type} {p_name}" for p_type, p_name in method.get("parameters", [])]
                param_text = f" ({', '.join(params)})"
            method_text = f"{method.get('return_type', 'void')} {method['name']}{param_text}: {method.get('purpose', '')}"
            method_texts.append(method_text)
        if len(methods) > 5:
            method_texts.append(f"... and {len(methods) - 5} more methods")
        return "\n".join(method_texts)
    except Exception as e:
        return f"Error generating method summaries: {e}"

def prioritize_methods(methods: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Method prioritization using scores."""
    def score(method: Dict[str, Any]) -> int:
        try:
            return (
                (METHOD_PRIORITY_WEIGHTS["rest_endpoint"] if method.get("endpoint") else 0) +
                (METHOD_PRIORITY_WEIGHTS["annotation"] * len(method.get("annotations", []))) +
                (METHOD_PRIORITY_WEIGHTS["crud_operation"] if method["name"].startswith(("get", "create", "update", "delete")) else 0) +
                (METHOD_PRIORITY_WEIGHTS["public_api"] if "public" in str(method.get("annotations", [])) else 0)
            )
        except Exception:
            return 0
    try:
        return sorted(methods, key=score, reverse=True)
    except Exception:
        return methods

def _build_dependency_graph(class_summaries: List[Dict[str, Any]]) -> Dict[str, Set[str]]:
    """Precompute a {class_name -> {dependencies}} graph."""
    graph = {}
    for cls in class_summaries:
        try:
            name = cls.get("name")
            if not name:
                continue
            graph[name] = set()
            # Add inheritance and interface dependencies
            graph[name].update(cls.get("extends", []))
            graph[name].update(cls.get("implements", []))

            # Add explicit dependencies
            for dep in cls.get("relationships", {}).get("depends_on", []):
                try:
                    # Handle both simple and qualified names
                    dep_name = dep.split('.')[-1].split('<')[0]
                    graph[name].add(dep_name)
                except Exception:
                    pass
        except Exception:
            pass
    return graph

def generate_endpoint_summary(cls: Dict[str, Any]) -> str:
    """Generate a summary of REST endpoints if the class is a controller."""
    try:
        endpoints = cls.get("rest_endpoints", [])
        if not endpoints:
            return ""
        endpoint_texts = []
        for endpoint in endpoints[:min(5, len(endpoints))]:
            http_method = endpoint.get("http_method", "GET")
            path = endpoint.get("path", "/")
            method_name = endpoint.get("method_name", "unknown")
            endpoint_texts.append(f"{http_method} {path} → {method_name}")
        if len(endpoints) > 5:
            endpoint_texts.append(f"... and {len(endpoints) - 5} more endpoints")
        return "\n".join(endpoint_texts)
    except Exception:
        return ""

def generate_relationship_summary(cls: Dict[str, Any]) -> str:
    """Generate a summary of the class's key relationships."""
    try:
        relationships = []
        if cls.get("extends"):
            relationships.append(f"Extends: {', '.join(cls.get('extends', []))}")
        if cls.get("implements"):
            relationships.append(f"Implements: {', '.join(cls.get('implements', []))}")
        depends_on = cls.get("relationships", {}).get("depends_on", [])
        if depends_on and len(depends_on) <= 5:
            relationships.append(f"Depends on: {', '.join(depends_on)}")
        elif depends_on:
            relationships.append(f"Depends on {len(depends_on)} other classes")
        return "\n".join(relationships)
    except Exception:
        return ""

def generate_import_summary(cls: Dict[str, Any]) -> str:
    """Generate a summary of imports, grouped by package."""
    try:
        imports = cls.get("imports", [])
        if not imports:
            return ""
        grouped_imports = {}
        for imp in imports:
            parts = imp.split('.')
            if len(parts) > 1:
                pkg = '.'.join(parts[:-1])
                grouped_imports.setdefault(pkg, []).append(parts[-1])
            else:
                grouped_imports.setdefault("default", []).append(imp)
        import_texts = []
        for pkg, classes in grouped_imports.items():
            if pkg == "default":
                import_texts.append(", ".join(classes))
            else:
                import_texts.append(f"{pkg}.{{{', '.join(classes)}}}")
        return "; ".join(import_texts)
    except Exception:
        return ""

def generate_enriched_class_text(cls: Dict[str, Any], method_summaries: str, endpoint_summary: str,
                                relationship_summary: str, import_summary: str) -> str:
    """Generate enriched text representation of a class for embedding."""
    try:
        name = cls.get('name', 'Unknown')
        description = cls.get('semantic_description', '')
        class_type = cls.get('class_type', 'Class')
        is_interface = cls.get('is_interface', False)
        text_parts = [
            f"{name}: {'Interface' if is_interface else class_type} - {description}"
        ]
        if relationship_summary:
            text_parts.append("\nRelationships:")
            text_parts.append(relationship_summary)
        if import_summary:
            text_parts.append("\nImports:")
            text_parts.append(import_summary)
        if endpoint_summary:
            text_parts.append("\nREST Endpoints:")
            text_parts.append(endpoint_summary)
        text_parts.append("\nKey Methods:")
        text_parts.append(method_summaries)
        fields = cls.get("fields", [])
        if fields and len(fields) <= 5:
            field_texts = [f"{f.get('type', 'Unknown')} {f.get('name', 'unknown')}" for f in fields[:5]]
            text_parts.append("\nFields: " + ", ".join(field_texts))
        elif fields:
            text_parts.append(f"\nContains {len(fields)} fields")
        return "\n".join(text_parts)
    except Exception:
        return f"{cls.get('name', 'Unknown')}: Error generating description"

result = process_repository_class_only(java_files)

In [29]:
# for summary in result["class_summaries"]:
#     print(summary)
for embedding in result["embedding_data"]:
  for key, value in embedding.items():
    print(f"{key}: {value}")


type: class
id: TodoMapper
text: TodoMapper: Other - Other TodoMapper

Imports:
com.example.todo.dto.{TodoResponse}; com.example.todo.entity.{TodoEntity}; org.springframework.stereotype.{Component}

Key Methods:
ReferenceType(arguments=None, dimensions=[], name=TodoResponse, sub_type=None) todoResponse (ReferenceType(arguments=None, dimensions=[], name=TodoEntity, sub_type=None) todoEntity): Performs todoResponse operation
metadata: {'class_type': 'Other', 'is_interface': False, 'is_repository': False, 'file': 'src/main/java/com/example/todo/Mapper/TodoMapper.java'}
type: class
id: TodoApplication
text: TodoApplication: Other - Other TodoApplication

Imports:
org.springframework.boot.{SpringApplication}; org.springframework.boot.autoconfigure.{SpringBootApplication}

Key Methods:
void main (ReferenceType(arguments=None, dimensions=[None], name=String, sub_type=None) args): Performs main operation
metadata: {'class_type': 'Other', 'is_interface': False, 'is_repository': False, 'file': '

In [27]:
from agno.agent import Agent
from agno.models.groq import Groq
from github import Github
import os
import json
from typing import Dict, List, Optional

class GitHubRepoSummarizer:

    def __init__(self, github_token: str, repo_name: str, branch: str = "main"):
        self.github_token = github_token
        self.repo_name = repo_name
        self.branch = branch
        self.gh = Github(self.github_token)
        self.repo = self.gh.get_repo(self.repo_name)

    def fetch_groupable_folders(self, extension=".java") -> Dict[str, Dict[str, str]]:
        grouped_folders = {}

        def is_flat_folder(path):
            contents = self.repo.get_contents(path, ref=self.branch)
            return all(item.type == "file" for item in contents)

        def collect_files(path):
            contents = self.repo.get_contents(path, ref=self.branch)
            return {
                item.path: item.decoded_content.decode("utf-8")
                for item in contents
                if item.type == "file" and item.path.endswith(extension)
            }

        def recurse_dirs(path=""):
            contents = self.repo.get_contents(path, ref=self.branch)
            for item in contents:
                if item.type == "dir":
                    if is_flat_folder(item.path):
                        grouped_folders[item.path] = collect_files(item.path)
                    else:
                        recurse_dirs(item.path)

        recurse_dirs()
        return grouped_folders


file_summarizer_agent = Agent(
    name="Java Folder Summarizer",
    role="Summarize folders of Java files with architectural clarity and semantic richness.",
    model=Groq(id="gemma2-9b-it"),
    instructions="""
    Summarize the provided Java files from the same folder in a structured format.

    For each file, extract:

    1. **Name and Purpose** – Class or interface name and its core responsibility.
    2. **Key Imports and Dependencies** – External/internal dependencies and annotations.
    3. **Core Methods and Logic** – Summary of important methods and their roles.
    4. **Important Fields** – Key variables and their purpose.
    5. **Design Patterns or Role** – Patterns used and how this fits into the architecture.

    Then, provide a **combined folder-level summary** of what this group of files implements in the project.

    Return the result in a clean, structured markdown format. Avoid verbosity. Use consistent formatting for better retrieval.
    """
)


class RepoSummarizerOrchestrator:

    def __init__(self, github_token: str, repo_name: str):
        self.repo_tool = GitHubRepoSummarizer(github_token, repo_name)
        self.summarizer = file_summarizer_agent

    def generate_folder_summaries(self, extension=".java", output_file="folder_summaries.json"):
        grouped_folders = self.repo_tool.fetch_groupable_folders(extension)
        print(f"Found {len(grouped_folders)} flat folders to summarize.")

        summaries = []

        for idx, (folder_path, files) in enumerate(grouped_folders.items()):
            print(f"Summarizing folder {idx+1}/{len(grouped_folders)}: {folder_path} with {len(files)} files")

            combined_code = "\n\n".join(
                [f"// File: {file_path}\n{content}" for file_path, content in files.items()]
            )

            summary_result = self.summarizer.run(
                f"Summarize the following Java files from the same folder:\n\n```java\n{combined_code}\n```"
            )

            summaries.append({
                "id": os.path.basename(folder_path),
                "text": summary_result.content,
                "metadata": {
                    "folder_path": folder_path,
                    "file_count": len(files)
                }
            })

        with open(output_file, 'w') as f:
            json.dump(summaries, f, indent=2)

        print(f"Saved folder-level summaries to {output_file}")
        return summaries


# === Usage Example ===
if __name__ == "__main__":
    github_token = os.getenv("GITHUB_TOKEN")
    groq_api_key = os.getenv("GROQ_API_KEY")

    os.environ["GROQ_API_KEY"] = groq_api_key
    repo_name = "lx-47/todo-spring"

    orchestrator = RepoSummarizerOrchestrator(github_token, repo_name)
    summaries = orchestrator.generate_folder_summaries()

    if summaries:
        print("\nSample summary:\n")
        print(json.dumps(summaries[0], indent=2))



Request GET /repos/lx-47/todo-spring failed with 403: rate limit exceeded
INFO:github.GithubRetry:Request GET /repos/lx-47/todo-spring failed with 403: rate limit exceeded
Setting next backoff to 2174.156146s
INFO:github.GithubRetry:Setting next backoff to 2174.156146s


KeyboardInterrupt: 

In [16]:
from agno.agent import Agent
from agno.models.groq import Groq
from github import Github
import os
import json
from typing import Dict, List, Optional

class GitHubRepoSummarizer:

    def __init__(self, github_token: str, repo_name: str, branch: str = "main"):
        self.github_token = github_token
        self.repo_name = repo_name
        self.branch = branch
        self.gh = Github(self.github_token)
        self.repo = self.gh.get_repo(self.repo_name)
        self.repo_files = {}

    def fetch_all_files(self, extension=".java"):
        self._fetch_files_in_dir("", extension)
        return self.repo_files

    def _fetch_files_in_dir(self, folder_path: str, extension: str):
        contents = self.repo.get_contents(folder_path, ref=self.branch)

        for item in contents:
            if item.type == "dir":
                self._fetch_files_in_dir(item.path, extension)
            elif item.type == "file" and item.path.endswith(extension):
                try:
                    file_content = item.decoded_content.decode('utf-8')
                    self.repo_files[item.path] = file_content
                except Exception as e:
                    print(f"Error reading {item.path}: {str(e)}")

file_summarizer_agent = Agent(
    name="Java File Summarizer",
    role="Analyze Java files and create concise, informative summaries",
    model=Groq(id="gemma2-9b-it"),
    instructions="""
    Analyze the provided Java file and generate a structured summary with the following elements. Focus on semantic clarity and consistency to optimize for vector similarity retrieval:

    1. **Name and Purpose**
      - State the class or interface name.
      - Describe its core responsibility or role in 1–2 sentences.

    2. **Key Imports and Dependencies**
      - List external libraries, frameworks, or internal modules used.
      - Highlight any annotations or dependency injections.

    3. **Core Methods and Logic**
      - Name key methods and summarize their behavior.
      - Focus on what each method does, not how.

    4. **Important Fields and State**
      - List significant fields and explain their purpose briefly.
      - Mention configuration values, injected services, and internal state.

    5. **Design Patterns or Architectural Role**
      - Identify design patterns (e.g., Singleton, Controller, DTO, Factory) if present.
      - Explain how this file fits into the application's architecture (e.g., part of service layer, controller, utility, data model).

    Return the output in a **clean, structured format** . Avoid verbose explanations. Aim for clarity and semantic richness suitable for embedding-based retrieval.
    """
)

class RepoSummarizerOrchestrator:

    def __init__(self, github_token: str, repo_name: str):
        self.repo_tool = GitHubRepoSummarizer(github_token, repo_name)
        self.summarizer = file_summarizer_agent

    def generate_file_summaries(self, extension=".java", output_file="repo_summaries.json"):
        """Generate summaries for all files in the repository"""
        # Step 1: Fetch all files from the repository
        print(f"Fetching all {extension} files from {self.repo_tool.repo_name}...")
        files = self.repo_tool.fetch_all_files(extension)
        print(f"Found {len(files)} {extension} files")

        # Step 2: Generate summaries for each file
        summaries = []
        for idx, (file_path, content) in enumerate(files.items()):
            print(f"Processing file {idx+1}/{len(files)}: {file_path}")

            # Call the agent to generate a summary
            summary_result = self.summarizer.run(
                f"Please analyze and summarize this Java file:\n\n```java\n{content}\n```"
            )

            # Store the result
            summaries.append({
                "id": os.path.basename(file_path).replace(".java", ""),
                "text": summary_result.content,
                "metadata": {
                    "file_path": file_path
                }
            })

        # Step 3: Save summaries to a JSON file
        with open(output_file, 'w') as f:
            json.dump(summaries, f, indent=2)

        print(f"Generated summaries for {len(summaries)} files and saved to {output_file}")
        return summaries

# Usage example
if __name__ == "__main__":
    os.environ["GROQ_API_KEY"] = groq_api_key
    github_token = github_token
    repo_name = "lx-47/todo-spring"

    orchestrator = RepoSummarizerOrchestrator(github_token, repo_name)
    summaries = orchestrator.generate_file_summaries()

    # Print a sample summary
    if summaries:
        print("\nSample summary:")
        print(json.dumps(summaries[0], indent=2))

Request GET /repos/lx-47/todo-spring failed with 403: rate limit exceeded
INFO:github.GithubRetry:Request GET /repos/lx-47/todo-spring failed with 403: rate limit exceeded
Setting next backoff to 2561.490886s
INFO:github.GithubRetry:Setting next backoff to 2561.490886s


KeyboardInterrupt: 

In [20]:
import requests
import base64
import os

class GitTreeFetcher:
    def __init__(self, github_token: str, repo: str, branch: str = "main"):
        self.github_token = github_token
        self.repo = repo
        self.branch = branch
        self.headers = {
            "Authorization": f"Bearer {self.github_token}",
            "Accept": "application/vnd.github+json"
        }
        self.api_base = f"https://api.github.com/repos/{self.repo}"

    def fetch_tree(self):
        print(f"Fetching SHA for branch {self.branch}...")
        branch_info = requests.get(f"{self.api_base}/branches/{self.branch}", headers=self.headers)
        branch_info.raise_for_status()
        tree_sha = branch_info.json()['commit']['commit']['tree']['sha']

        print(f"Fetching full tree with SHA {tree_sha}...")
        tree_url = f"{self.api_base}/git/trees/{tree_sha}?recursive=1"
        tree_data = requests.get(tree_url, headers=self.headers)
        tree_data.raise_for_status()
        return tree_data.json()['tree']

    def fetch_java_files(self):
        tree = self.fetch_tree()
        java_files = [item for item in tree if item['path'].endswith(".java") and item['type'] == 'blob']
        print(f"Found {len(java_files)} Java files")

        files = {}
        for item in java_files:
            print(f"Downloading {item['path']}...")
            file_url = f"{self.api_base}/contents/{item['path']}?ref={self.branch}"
            res = requests.get(file_url, headers=self.headers)
            res.raise_for_status()
            content = base64.b64decode(res.json()['content']).decode("utf-8")
            files[item['path']] = content
        return files

repo_name = "lx-47/todo-spring"

fetcher = GitTreeFetcher(github_token, repo_name)
java_files = fetcher.fetch_java_files()
for path, content in java_files.items():
  print(f"\n=== {path} ===\n")
  print(content[:300], "...\n")


Fetching SHA for branch main...
Fetching full tree with SHA ca93c253c9605ec9c8af51a69f3221355a71bdd0...
Found 11 Java files
Downloading src/main/java/com/example/todo/Mapper/TodoMapper.java...
Downloading src/main/java/com/example/todo/TodoApplication.java...
Downloading src/main/java/com/example/todo/controller/TodoController.java...
Downloading src/main/java/com/example/todo/dto/TodoRequest.java...
Downloading src/main/java/com/example/todo/dto/TodoResponse.java...
Downloading src/main/java/com/example/todo/entity/TodoEntity.java...
Downloading src/main/java/com/example/todo/exception/GlobalExceptionHandler.java...
Downloading src/main/java/com/example/todo/exception/TodoNotFoundException.java...
Downloading src/main/java/com/example/todo/repository/TodoRepository.java...
Downloading src/main/java/com/example/todo/service/TodoService.java...
Downloading src/test/java/com/example/todo/TodoApplicationTests.java...

=== src/main/java/com/example/todo/Mapper/TodoMapper.java ===

package 