In [54]:
# Import necessary libraries
import os
from dotenv import load_dotenv
load_dotenv()
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm # For OpenAI support
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types # For creating message Content/Parts
from typing import Optional, Dict, Any

import warnings
warnings.filterwarnings("ignore")

import logging
logging.basicConfig(level=logging.CRITICAL)

print("Libraries imported.")

Libraries imported.


In [55]:
# Define Model Constants for easier use 
MODEL_GPT = "openai/gpt-4o"

llm = LiteLlm(model=MODEL_GPT)

# Test LLM with a direct call
print(llm.llm_client.completion(model=llm.model, 
                                messages=[{"role": "user", 
                                           "content": "Are you ready?"}], 
                                tools=[]))

print("\nOpenAI is ready for use.")

ModelResponse(id='chatcmpl-CAAk61Pc4pp1MtlYIYshbRVS9ls0T', created=1756540610, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_80956533cb', choices=[Choices(finish_reason='stop', index=0, message=Message(content="Yes, I'm ready! How can I assist you today?", role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=13, prompt_tokens=27, total_tokens=40, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None)), service_tier='default')

OpenAI is ready for use.


## 3.2 Explore `neo4j_for_adk`

In [67]:
from neo4j_for_adk import graphdb
neo4j_is_ready = graphdb.send_query("RETURN 'Neo4j is Ready!' as message")

print(neo4j_is_ready)

{'status': 'success', 'query_result': [{'message': 'Neo4j is Ready!'}]}


## 3.3 Define Agent tools

In [57]:
# Define a basic tool -- send a parameterized cypher query
def say_hello(person_name: str) -> dict:
    """Formats a welcome message to a named person. 

    Args:
        person_name (str): the name of the person saying hello

    Returns:
        dict: A dictionary containing the results of the query.
              Includes a 'status' key ('success' or 'error').
              If 'success', includes a 'query_result' key with an array of result rows.
              If 'error', includes an 'error_message' key.
    """
    #Neo4j cyoher statement
    return graphdb.send_query("RETURN 'Hello to you, ' + $person_name AS reply",
    {
        "person_name": person_name
    })
    # Error messages
    # return {
    #     "status": "error",
    #     "error_message": "Something went wrong"
    # }

In [58]:
# Example tool usage (optional test)
print(say_hello("ABK"))
# Example tool usage (optional test)
print(say_hello("RETURN 'injection attack avoided'"))

{'status': 'success', 'query_result': [{'reply': 'Hello to you, ABK'}]}
{'status': 'success', 'query_result': [{'reply': "Hello to you, RETURN 'injection attack avoided'"}]}


## 3.4. Define the Agent friendly_cypher_agent

In [59]:
# Define the Cypher Agent
hello_agent = Agent(
    name="hello_agent_v1",
    model=llm, # defined earlier in a variable
    description="Has friendly chats with a user.",
    instruction="""You are a helpful assistant, chatting with a user. 
                Be polite and friendly, introducing yourself and asking who the user is. 

                If the user provides their name, use the 'say_hello' tool to get a custom greeting.
                If the tool returns an error, inform the user politely. 
                If the tool is successful, present the reply.
                """,
    tools=[say_hello], # Pass the function directly
)

print(f"Agent '{hello_agent.name}' created.")

Agent 'hello_agent_v1' created.


## 3.5. Run the Agent

#### 3.5.2. Create the Runner and SessionService

In [60]:
app_name = hello_agent.name + "_app"
user_id = hello_agent.name + "_user"
session_id = hello_agent.name + "_session_01"
    
# Initialize a session service and a session
session_service = InMemorySessionService()
await session_service.create_session(
    app_name=app_name,
    user_id=user_id,
    session_id=session_id
)
    
runner = Runner(
    agent=hello_agent,
    app_name=app_name,
    session_service=session_service
)

In [61]:
user_message = "Hello, I'm ABK"
print(f"\n>>> User Message: {user_message}")

# Prepare the user's message in ADK format
content = types.Content(role='user', parts=[types.Part(text=user_message)])

final_response_text = "Agent did not produce a final response." # Default will be replaced if the agent produces a final response.


# We iterate through events to find the final answer.
verbose = False
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
    if verbose:
        print(f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")
    
    # Key Concept: is_final_response() marks the concluding message for the turn.
    if event.is_final_response():
        if event.content and event.content.parts:
            final_response_text = event.content.parts[0].text # Assuming text response in the first part
        elif event.actions and event.actions.escalate: # Handle potential errors/escalations
            final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
        break # Stop processing events once the final response is found

print(f"<<< Agent Response: {final_response_text}")


>>> User Message: Hello, I'm ABK
<<< Agent Response: Hello to you, ABK! I'm glad to meet you. How can I assist you today?


## 3.6. Create Helper Class: AgentCaller

### 3.6.1 Set up AgentCaller


In [62]:
class AgentCaller:
    """A simple wrapper class for interacting with an ADK agent."""
    
    def __init__(self, agent: Agent, runner: Runner, 
                 user_id: str, session_id: str):
        """Initialize the AgentCaller with required components."""
        self.agent = agent
        self.runner = runner
        self.user_id = user_id
        self.session_id = session_id


    def get_session(self):
        return self.runner.session_service.get_session(app_name=self.runner.app_name, user_id=self.user_id, session_id=self.session_id)

    
    async def call(self, user_message: str, verbose: bool = False):
        """Call the agent with a query and return the response."""
        print(f"\n>>> User Message: {user_message}")

        # Prepare the user's message in ADK format
        content = types.Content(role='user', parts=[types.Part(text=user_message)])

        final_response_text = "Agent did not produce a final response." 
        
        # Key Concept: run_async executes the agent logic and yields Events.
        # We iterate through events to find the final answer.
        async for event in self.runner.run_async(user_id=self.user_id, session_id=self.session_id, new_message=content):
            # You can uncomment the line below to see *all* events during execution
            if verbose:
                print(f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")

            # Key Concept: is_final_response() marks the concluding message for the turn.
            if event.is_final_response():
                if event.content and event.content.parts:
                    # Assuming text response in the first part
                    final_response_text = event.content.parts[0].text
                elif event.actions and event.actions.escalate: # Handle potential errors/escalations
                    final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                break # Stop processing events once the final response is found

        print(f"<<< Agent Response: {final_response_text}")
        return final_response_text


### 3.6.2 Make an instance of the AgentCaller

In [63]:
async def make_agent_caller(agent: Agent, initial_state: Optional[Dict[str, Any]] = {}) -> AgentCaller:
    """Create and return an AgentCaller instance for the given agent."""
    app_name = agent.name + "_app"
    user_id = agent.name + "_user"
    session_id = agent.name + "_session_01"
    
    # Initialize a session service and a session
    session_service = InMemorySessionService()
    await session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id,
        state=initial_state
    )
    
    runner = Runner(
        agent=agent,
        app_name=app_name,
        session_service=session_service
    )
    
    return AgentCaller(agent, runner, user_id, session_id)

### 3.6.3. Run the Conversation

In [64]:
hello_agent_caller = await make_agent_caller(hello_agent)

# We need an async function to await our interaction helper
async def run_conversation():
    await hello_agent_caller.call("Hello I'm ABK")

    await hello_agent_caller.call("I am excited")

# Execute the conversation using await
await run_conversation()


>>> User Message: Hello I'm ABK
<<< Agent Response: Hello to you, ABK! How can I assist you today?

>>> User Message: I am excited
<<< Agent Response: That's wonderful to hear, ABK! What's exciting you today?
