In [None]:
%pip install google-adk -q

print("Installation complete.")

In [22]:
import os
import asyncio

import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")

import logging
logging.basicConfig(level=logging.ERROR)

print("Libraries imported.")

Libraries imported.


In [37]:
import os 
from dotenv import load_dotenv
import google.genai as genai

load_dotenv()  

api_key = os.getenv("GOOGLE_API_KEY")

if not api_key:
    raise ValueError("GOOGLE_API_KEY not found in environment variables.")
genai.Client(api_key=api_key)

MODEL_GEMINI_2_0_FLASH = "gemini-2.5-flash-lite-preview-06-17"
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")

In [38]:
from jwt import decode
from regex import E
import requests
import base64
from arrow import get
from bleach import clean
from google.adk.agents import Agent, SequentialAgent, LoopAgent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from google.adk.tools.tool_context import ToolContext

def list_files(repo: str, owner: str) -> dict:
    """
    Loads in the repository
    
    Args:
        repo (str): The repository to load in.
        owner (str): The owner of the repository.
    Returns:
        A dictionary with a 'status' key.
        On success: {'status': 'success', 'files': ['file1.py', 'src/file2.js']}
        On error: {'status': 'error', 'message': 'Error details...'}
    """
    headers = {"Authorization": f"token {GITHUB_TOKEN}"}
    
    try:
        response = requests.get(f"https://api.github.com/repos/{owner}/{repo}/git/trees/main?recursive=1", headers=headers)
        response.raise_for_status()
        tree = response.json().get('tree', [])
        files = [item['path'] for item in tree if item['type'] == 'blob']
        return {"status": "success", "files": files}
    
    except Exception as e:
        return {"status": "error", "message": str(e)}
    
def get_file_contents(repo: str, file_path: str, owner: str) -> dict:
    """
    Fetches the contents of a file in the repository.
    
    Args:
        repo_url (str): The URL of the repository.
        file_path (str): The path to the file within the repository.
        owner (str): The owner of the repository.
    Returns:
        A dictionary with a 'status' key.
        On success: {'status': 'success', 'content': 'file content...'}
        On error: {'status': 'error', 'message': 'Error details...'}
    """
    headers = {"Authorization": f"token {GITHUB_TOKEN}"}
    
    try:
        response = requests.get(f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}", headers=headers)
        response.raise_for_status()
        content = response.json().get('content', '')
        decoded_content = base64.b64decode(content).decode('utf-8')
        return {"status": "success", "content": decoded_content}
    
    except Exception as e:
        return {"status": "error", "message": str(e)}
    
def save_selected_files(files: list[str], tool_context: ToolContext) -> dict:
    """Saves the provided list of selected file paths to the session state.
    
    Args:
        files (ist[str]): list of files to be saved for later
        tool_context (ToolContext): will assist in changing the state
    Returns:
        A dictionary with a 'status' key
        On success: {'status': 'success', 'files_saved: int'}
    """
    tool_context.state["selected_files_list"] = files
    tool_context.state["all_file_contents"] = {}
    return {"status": "success", "files_saved": len(files)}

def fetch_all_content(tool_context: ToolContext) -> dict:
    """
    Takes a list of file paths from the state, reads the content of each
    one, and saves all content to the state in a dictionary.
    
    Args:
        tool_context (ToolContext): will assist in changing the state
    Returns:
        A dictionary with a 'status' key
        On success: {'status': 'success', 'files_fetched: int'}
    """
    owner = tool_context.state.get("owner")
    repo = tool_context.state.get("repo")
    files_to_read = tool_context.state.get("selected_files_list", [])
    
    all_content = {}
    print(f"Tool: Fetching content for {len(files_to_read)} files...")
    for path in files_to_read:
        result = get_file_contents(repo=repo, file_path=path, owner=owner)
        if result["status"] == "success":
            all_content[path] = result["content"]
        else:
            all_content[path] = f"Error fetching file: {result['message']}"

    tool_context.state["all_file_contents"] = all_content
    print("Tool: Finished fetching all file contents.")
    return {"status": "success", "files_fetched": len(all_content)}

In [39]:
from prompts import PROMPT_FILE_SELECTOR, PROMPT_REPORT_SYNTHESIZER

file_selector_agent = Agent(
    model=MODEL_GEMINI_2_0_FLASH,
    name="File_Selector",
    description="This agent selects the most relevant files from a repository.",
    instruction=PROMPT_FILE_SELECTOR,
    tools=[list_files, get_file_contents, save_selected_files]
)

content_fetcher_agent = Agent(
    model=MODEL_GEMINI_2_0_FLASH,
    name="Content_Fetcher",
    description="Fetches the content of all selected files.",
    instruction="You must immediately call the `fetch_all_content` tool.",
    tools=[fetch_all_content],
)

report_synthesizer_agent = Agent(
    model=MODEL_GEMINI_2_0_FLASH,
    name="Report_Synthesizer",
    description="Synthesizes all file contents into a final report.",
    instruction=PROMPT_REPORT_SYNTHESIZER,
    output_key="analysis_results"
)

In [40]:
import json
'''
def parse_json_response(response: str, tool_context: ToolContext) -> dict:
    """
    Parses a JSON response string into a dictionary. Aims to get rid of formatting issues like outputting backticks.
    
    Args:
        response (str): The JSON response string.
        
    Returns:
        dict: The parsed JSON object.
    """
    try:
        # Remove any leading/trailing whitespace and backticks
        response = response.strip().replace("```json", "").replace("```", "")
        data = json.loads(response)
        files = data.get("selected_files", [])
        
        tool_context.state["validated_files_list"] = files
        tool_context.state["individual_analyses"] = ""
        return {"status": "success", "files_validated": len(files)} 
    except Exception as e:
        return {"status": "error", "message": str(e)}
    
    
validator_agent = Agent(
    model=MODEL_GEMINI_2_0_FLASH,
    name="JSON_Validator",
    description="This agent validates the JSON response and triggers a tool to parse the JSON response.",
    instruction="""
    You are a data validation trigger.
    You MUST immediately use the `parse_json_response` tool on the text from the `{selected_files}` context.
    """,
    tools=[parse_json_response]
    )
    
    '''

'\ndef parse_json_response(response: str, tool_context: ToolContext) -> dict:\n    """\n    Parses a JSON response string into a dictionary. Aims to get rid of formatting issues like outputting backticks.\n    \n    Args:\n        response (str): The JSON response string.\n        \n    Returns:\n        dict: The parsed JSON object.\n    """\n    try:\n        # Remove any leading/trailing whitespace and backticks\n        response = response.strip().replace("```json", "").replace("```", "")\n        data = json.loads(response)\n        files = data.get("selected_files", [])\n        \n        tool_context.state["validated_files_list"] = files\n        tool_context.state["individual_analyses"] = ""\n        return {"status": "success", "files_validated": len(files)} \n    except Exception as e:\n        return {"status": "error", "message": str(e)}\n    \n    \nvalidator_agent = Agent(\n    model=MODEL_GEMINI_2_0_FLASH,\n    name="JSON_Validator",\n    description="This agent validate

In [41]:
analysis_pipeline = SequentialAgent(
    name="Analysis_Pipeline",
    sub_agents=[file_selector_agent, content_fetcher_agent, report_synthesizer_agent]
)

root_agent = Agent(
    model=MODEL_GEMINI_2_0_FLASH,
    name="Root_Agent",
    description="Manages the repository analysis workflow.",
    instruction="""Immediately start the Analysis_Pipeline to handle the request.""",
    sub_agents=[analysis_pipeline],
)

In [None]:
import asyncio
import uuid
from google.genai.types import Content

async def run_pipeline():
    """Sets up and runs the analysis pipeline with detailed logging and error handling."""
    
    user_id = f"user-{uuid.uuid4()}"
    session_id = f"session-{uuid.uuid4()}"
    app_name = "Repo_Analysis"
    
    initial_state_data = {
        "owner": "mevitts",
        "repo": "spanish_chat_bot"
    }

    initial_message = Content(role="user", parts=[types.Part(text="Please start the analysis of the repository.")])

    session_service = InMemorySessionService()
    runner = Runner(
        app_name=app_name,
        session_service=session_service,
        agent=root_agent
    )
    await session_service.create_session(
        app_name=app_name, 
        user_id=user_id, 
        session_id=session_id, 
        state=initial_state_data
    )
    
    try:
        #execute the pipeline
        print("Starting analysis pipeline...")
        events = runner.run_async(
            user_id=user_id,
            session_id=session_id,
            new_message=initial_message
        )

        async for event in events:
            author = event.author or "System"

            if event.content and event.content.parts:
                part = event.content.parts[0]
                if part.function_call:
                    tool_call = part.function_call
                elif part.function_response:
                    response = part.function_response
                    print(f"Got Tool Response from '{response.name}': {str(response.response)[:200]}...")
                elif part.text:
                    print(f"Agent Text Response: {part.text[:150]}...")

            if event.actions and event.actions.state_delta:
                print(f"State Change: {event.actions.state_delta}")
            
            if event.is_final_response():
                print("Final response from agent received.")
        
        print("\nPipeline execution complete.")

        final_session = await session_service.get_session(
            app_name=app_name, user_id=user_id, session_id=session_id
        )

        final_analysis = final_session.state.get("analysis_results") if final_session else None

        if final_analysis:
            print("\n--- Final Analysis Results ---")
            print(str(final_analysis).strip().replace("```markdown", "").replace("```", "").strip())
        else:
            print("\nCould not retrieve the final analysis from the session state.")
            if final_session:
                print("Dumping final state for debugging:")
                print(final_session.state)

    except (Exception) as e:
        # --- ROBUST ERROR HANDLING ---
        print("\n" + "="*50)
        print("API ERROR: The model is currently overloaded or you have reached your quota.")
        print("This is a temporary issue with the Google API, not a bug in the pipeline.")
        print("Please wait a few minutes and try running the pipeline again.")
        print(f"Details: {e}")
        print("="*50)



In [43]:
await run_pipeline()

🚀 Starting analysis pipeline...
  📦 Got Tool Response from 'transfer_to_agent': {'result': None}...
  📦 Got Tool Response from 'list_files': {'status': 'success', 'files': ['.gitignore', 'README.md', 'side_proj/.gitignore', 'side_proj/.ipynb_checkpoints/spanish_conversation_bot-checkpoint.ipynb', 'side_proj/README.md', 'side_proj/Rules/fro...
  📦 Got Tool Response from 'get_file_contents': {'status': 'success', 'content': '# Contigo: Spanish Conversational Voice Bot (Whisper + LLM + TTS)\n\nContigo is a modular, voice-based Spanish conversational bot that enables **spoken conversations*...
  💬 Agent Text Response: The README indicates this is a Spanish conversational voice bot using Whisper, an LLM, and TTS. The core pipeline appears to be operational, with a ba...
  📦 Got Tool Response from 'save_selected_files': {'status': 'success', 'files_saved': 12}...
  💾 State Change: {'selected_files_list': ['side_proj/requirements.txt', 'side_proj/src/spanish_chat_bot/conversation.py', 'side_p