In [29]:
# Environment handling: python-dotenv plus the standard library.
from dotenv import load_dotenv
import os

# Called before the ADK imports below; presumably so that values from .env
# (e.g. GOOGLE_API_KEY, read in the next cell) are already in the process
# environment when those packages load — TODO confirm the ordering matters.
load_dotenv()  # Loads variables from .env into environment

# Google ADK / google-genai names used by the rest of the notebook:
# agents, runner, Gemini model wrapper, search tool, content types,
# and the in-memory session and memory services.
from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.runners import Runner
from google.adk.models.google_llm import Gemini
from google.adk.tools import google_search
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.memory import InMemoryMemoryService

# Status line so the reader can see the import cell completed.
print("âœ… ADK components imported successfully.")

âœ… ADK components imported successfully.


In [30]:
# Read the Gemini key from the process environment (populated by load_dotenv()
# in the import cell) and report whether it is available.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Missing or empty key: report it instead of raising so the cell still completes.
    print("ðŸ”‘ Authentication Error: 'GOOGLE_API_KEY' not found in .env file.")
else:
    # Write the value back under the same variable name.
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("âœ… Gemini API key setup complete.")

âœ… Gemini API key setup complete.


In [40]:
async def run_session(
    runner_instance: Runner, user_queries: list[str] | str, session_id: str = "default"
):
    """Run one or more queries in a session and print each final response.

    Relies on the module-level ``session_service``, ``APP_NAME`` and ``USER_ID``,
    which are assigned in a later cell and therefore must exist before this
    coroutine is awaited.

    NOTE: a later cell defines another ``run_session`` with a different
    signature (``session_name`` / ``last_agent_only``). Whichever cell was
    executed last is the one in effect.

    Args:
        runner_instance: Runner whose ``run_async`` handles each query.
        user_queries: A single query string or a list of query strings.
        session_id: Id of the session to create, or to reuse if creation fails.

    Raises:
        Exception: Whatever ``create_session`` raised, when the fallback lookup
            does not find an existing session either.
    """
    print(f"\n### Session: {session_id}")

    # Create the session, falling back to the existing one of the same id.
    try:
        session = await session_service.create_session(
            app_name=APP_NAME, user_id=USER_ID, session_id=session_id
        )
    except Exception:
        # `except Exception` (not a bare `except:`) so that KeyboardInterrupt and
        # asyncio.CancelledError still propagate out of this coroutine.
        session = await session_service.get_session(
            app_name=APP_NAME, user_id=USER_ID, session_id=session_id
        )
        if session is None:
            # Nothing to fall back to: creation failed for another reason, so
            # surface that original error rather than failing later on `session.id`.
            raise

    # Normalise a single query to a one-element list.
    if isinstance(user_queries, str):
        user_queries = [user_queries]

    for query in user_queries:
        print(f"\nUser > {query}")
        query_content = types.Content(role="user", parts=[types.Part(text=query)])

        # Stream events; only final responses carrying text are printed.
        async for event in runner_instance.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query_content
        ):
            if not (event.is_final_response() and event.content and event.content.parts):
                continue
            text = event.content.parts[0].text
            # Skip empty text and the literal string "None".
            if text and text != "None":
                print(f"Model: > {text}")


print("âœ… Helper functions defined.")

âœ… Helper functions defined.


In [None]:
async def run_session(
    runner_instance: Runner,
    user_queries: list[str] | str | None = None,
    session_name: str = "default",
    last_agent_only: bool = False,
):
    """Send queries to the runner inside a named session and print the replies.

    The session is created under the runner's ``app_name`` for the module-level
    ``USER_ID`` via the module-level ``session_service``; if creation fails, the
    existing session with the same id is reused.

    Args:
        runner_instance: Runner whose ``run_async`` handles each query.
        user_queries: One query string or a list of them. ``None`` or an empty
            value prints "No queries!" and sends nothing.
        session_name: Session id to create or reuse.
        last_agent_only: When True, print only the last non-empty text event of
            each query (labelled with its author) instead of every such event.

    Raises:
        Exception: Whatever ``create_session`` raised, when the fallback lookup
            does not find an existing session either.
    """
    print(f"\n ### Session: {session_name}")

    # Get app name from the Runner
    app_name = runner_instance.app_name

    # Attempt to create a new session or retrieve an existing one
    try:
        session = await session_service.create_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
    except Exception:
        # `except Exception` (not a bare `except:`) so that KeyboardInterrupt and
        # asyncio.CancelledError still propagate out of this coroutine.
        session = await session_service.get_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
        if session is None:
            # Nothing to fall back to: surface the original creation error
            # rather than failing later on `session.id`.
            raise

    if not user_queries:
        print("No queries!")
        return

    # Convert single query to list for uniform processing
    if isinstance(user_queries, str):
        user_queries = [user_queries]

    for query in user_queries:
        print(f"\nUser > {query}")

        # Wrap the text in the ADK Content format (kept in its own name so the
        # loop variable is not rebound to a different type).
        message = types.Content(role="user", parts=[types.Part(text=query)])

        # (author, text) of the most recent printable event; used when filtering.
        last_response = None

        async for event in runner_instance.run_async(
            user_id=USER_ID, session_id=session.id, new_message=message
        ):
            if not (event.content and event.content.parts):
                continue

            # Only the first part's text is inspected.
            text = event.content.parts[0].text
            # Skip empty text and the literal string "None".
            if not text or text == "None":
                continue

            agent_name = getattr(event, "author", "Unknown Agent")
            if last_agent_only:
                # Remember only the latest response; earlier ones are never printed.
                last_response = (agent_name, text)
            else:
                print(f"{agent_name} > ", text)

        if last_response is not None:
            agent_name, text = last_response
            print(f"{agent_name} > ", text)

In [41]:
async def auto_save_to_memory(callback_context):
    """Store the current session in the memory service.

    Registered as an ``after_agent_callback`` on the research agent, so it
    runs each time that agent finishes.

    Args:
        callback_context: Callback context supplied by the caller; its private
            ``_invocation_context`` must expose ``memory_service`` and ``session``.
    """
    # Reaches into the private invocation context to get both the memory
    # service and the session being run.
    invocation = callback_context._invocation_context
    memory = invocation.memory_service
    await memory.add_session_to_memory(invocation.session)


print("âœ… Callback created.")

âœ… Callback created.


In [42]:
# Retry policy passed to each Gemini model wrapper via `retry_options`.
# NOTE(review): `attempts` may count the original request as well as retries,
# and `initial_delay` is presumably in seconds — confirm against the
# google-genai HttpRetryOptions reference.
retry_config = types.HttpRetryOptions(
    initial_delay=1,
    exp_base=7,  # Delay multiplier
    attempts=5,  # Maximum retry attempts
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

In [60]:
# 1. Research Agent
# First stage of the pipeline: searches the web and stores its answer under the
# "research_summary" output key, which the mentor agent's instruction refers to.
research_agent = LlmAgent(
    name="research_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    # NOTE: Do I need to add preload_memory here?
    # NOTE(review): the callback below writes sessions to memory, but no agent in
    # this file is given a memory-reading tool, so saved sessions are presumably
    # never recalled — confirm.
    tools=[google_search],
    output_key="research_summary",
    after_agent_callback=auto_save_to_memory,  # Saves after each turn!
    # Idea not currently in the prompt: before you start generating any output,
    # mention your name as 'Research Agent' for the user to know.
    instruction="""
    You are a Research Agent. Your task is to gather comprehensive information on topics provided by the user.
    Follow these guidelines:
    1. Understand the topic thoroughly before starting your research.
    2. Use the Google Search tool to find relevant and credible sources.
    3. Summarize the key points from your findings in a clear and concise manner
    
    If the user asks for something outside of research related to a job, politely inform them that your role is limited to research tasks only. 
    """,
)

In [61]:
# 2. Mentor Agent
# Second stage of the pipeline: turns the research summary into career advice,
# stored under the "career_advice" output key.
mentor_agent = LlmAgent(
    name="mentor_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    output_key="career_advice",
    # Idea not currently in the prompt: before you start generating any output,
    # mention your name as 'Mentor Agent' for the user to know.
    # `{research_summary}` matches the research agent's output_key; presumably
    # ADK substitutes it from session state — confirm.
    instruction="""
    You are a Mentor Agent. Your role is to provide guidance and support to users based on the research summaries provided: {research_summary}.
    
    Your goal is to create a program to advise the user on actions to follow to achieve in their transition to the researched career path.
    """,
)

In [62]:
# 3. Root Agent to orchestrate the workflow
# Sub-agents are listed research first, then mentor.
pipeline_stages = [research_agent, mentor_agent]
root_agent = SequentialAgent(sub_agents=pipeline_stages, name="CareerPathPipeline")

In [63]:
# Step 1: Identifiers read by the helper functions and the Runner
APP_NAME = "default"  # Application
USER_ID = "default"  # User
SESSION = "default"  # Session (not referenced by the visible cells)

# Not referenced by the visible cells; the agents spell out the model name themselves.
MODEL_NAME = "gemini-2.5-flash-lite"

# Step 2: Set up session and memory management
# ADK's built-in Memory Service for development and testing
memory_service = InMemoryMemoryService()
# InMemorySessionService stores conversations in RAM (temporary)
session_service = InMemorySessionService()

# Step 3: Create the Runner around the sequential pipeline
runner = Runner(
    app_name=APP_NAME,
    agent=root_agent,
    memory_service=memory_service,
    session_service=session_service,
)

# One print call, one line per argument — same output as separate prints.
print(
    "âœ… Stateful agent initialized!",
    f"   - Application: {APP_NAME}",
    f"   - User: {USER_ID}",
    f"   - Using: {session_service.__class__.__name__}",
    sep="\n",
)

âœ… Stateful agent initialized!
   - Application: default
   - User: default
   - Using: InMemorySessionService


In [71]:
# Testing
await run_session(
    runner,
    [
        # "I am working as a product manager. I want to transition to data science"
        "DO YOU REMEMBER our discussion?"
    ],
    session_name="stateful-agentic-session",
    last_agent_only=True,
)


 ### Session: stateful-agentic-session

User > DO YOU REMEMBER our discussion?
mentor_agent >  As a large language model, I don't have the ability to recall past conversations. Each interaction is treated as new.

However, I am ready to assist you with your career transition. Please tell me about the career path you are interested in, and I will use my research capabilities to provide you with a program of actions to help you achieve your goal.


In [65]:
async def run_session_debug(
    runner_instance: Runner,
    user_queries: list[str] | str | None = None,
    session_name: str = "default",
):
    """Dump the structure of the first text-bearing event for the first query.

    Debugging aid: sends only the first query, prints the type, attributes and
    ``__dict__`` of the first event whose first part has non-empty text, then
    stops. Relies on the module-level ``session_service`` and ``USER_ID``.

    Args:
        runner_instance: Runner whose ``run_async`` handles the query.
        user_queries: One query string or a list of them; only the first is
            sent. ``None`` or an empty value sends nothing.
        session_name: Session id to create, or to reuse if creation fails.

    Raises:
        Exception: Whatever ``create_session`` raised, when the fallback lookup
            does not find an existing session either.
    """
    # Local import keeps this debug helper self-contained.
    from contextlib import aclosing

    print(f"\n ### Session: {session_name}")

    app_name = runner_instance.app_name

    try:
        session = await session_service.create_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
    except Exception:
        # `except Exception` (not a bare `except:`) so that KeyboardInterrupt and
        # asyncio.CancelledError still propagate out of this coroutine.
        session = await session_service.get_session(
            app_name=app_name, user_id=USER_ID, session_id=session_name
        )
        if session is None:
            # Nothing to fall back to: surface the original creation error.
            raise

    if not user_queries:
        return

    if isinstance(user_queries, str):
        user_queries = [user_queries]

    for query in user_queries:
        print(f"\nUser > {query}")
        message = types.Content(role="user", parts=[types.Part(text=query)])

        # The loop is left early, so close the event stream explicitly instead of
        # leaving the abandoned async generator to be finalised later.
        async with aclosing(
            runner_instance.run_async(
                user_id=USER_ID, session_id=session.id, new_message=message
            )
        ) as events:
            async for event in events:
                if not (event.content and event.content.parts):
                    continue
                text = event.content.parts[0].text
                # Skip empty text and the literal string "None".
                if text != "None" and text:
                    # Debug: print all event attributes
                    print("\n=== EVENT DEBUG ===")
                    print(f"Event type: {type(event)}")
                    print(f"Event attributes: {dir(event)}")
                    print(f"Event dict (if available): {event.__dict__ if hasattr(event, '__dict__') else 'No __dict__'}")
                    print("==================\n")
                    break  # Only debug first event
        break  # Only debug first query