# Challenge Two

In [3]:
import os
import logging
import vertexai
from typing import List, Dict, Any, Optional
from pathlib import Path
from dotenv import load_dotenv

env_path = Path.cwd().parent / '.env'
load_dotenv(env_path)

logger = logging.getLogger(__name__)
logging.basicConfig(filename="app.log", level=logging.INFO)

GCP_PROJECT = os.getenv("GCP_PROJECT")
GCP_REGION = os.getenv("GCP_REGION")
GOOGLE_MAP_KEY = os.getenv("GOOGLE_MAP_KEY")

vertexai.init(project=GCP_PROJECT, location=GCP_REGION)

### Agent Tools

In [4]:
from agent_tools import get_lat_lon_from_address, get_weather_forecast


def get_weather(address: str) -> Optional[List[Dict[str, Any]]]:
    """Takes an address and returns a weather forcast from National Weather Service.

    Args:
        address: The street address or place name to geocode (e.g., "1600 Amphitheatre Parkway, Mountain View, CA").

    Returns:
        A list of dictionaries, where each dictionary represents a forecast
        period (e.g., 'Tonight', 'Thursday'). Returns None if an error occurs.
    """
    try:
        lat, lon = get_lat_lon_from_address(address=address, api_key=GOOGLE_MAP_KEY)
        forecast = get_weather_forecast(lat, lon)
        return forecast
    except Exception as e:
        print(f"Something broke. Good luck fixing:\n{e}")
        return None


### Agent Callbacks

In [None]:
from typing import Optional

from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmRequest, LlmResponse

from agent_tools import is_address_in_us, is_user_query_mean


def user_prompt_log_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Logs the content of the user's latest prompt.

    Args:
        callback_context: The context of the agent executing the callback.
        llm_request: The request object sent to the LLM.

    Returns:
        LlmResponse or None.
    """
    if llm_request.contents:
        last = llm_request.contents[-1]
        if last.role == "user" and last.parts and last.parts[0].text:
            user_text = last.parts[0].text.strip()
            logger.info(f"[{callback_context.agent_name}] USER >> {user_text}")

    return None


def model_response_log_callback(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Logs the content of the model's response.

    Args:
        callback_context: The context of the agent executing the callback.
        llm_response: The response object received from the LLM.

    Returns:
        LlmResponse or None.
    """
    if llm_response.content and llm_response.content.parts:
        txt = llm_response.content.parts[0].text
        if txt:
            logger.info(f"[{callback_context.agent_name}] MODEL >> {txt.strip()}")


def user_query_check_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Performs moderation checks on the user's query.

    Checks for non-US addresses and harmful content. If a check fails,
    it returns a pre-canned LlmResponse to stop further processing.

    Args:
        callback_context: The context of the agent executing the callback.
        llm_request: The request object containing the user's query.

    Returns:
        An LlmResponse to short-circuit the chain if moderation fails,
        otherwise None.
    """
    try:
        last = llm_request.contents[-1]

        if last.role == "user" and last.parts and last.parts[0].text:
            user_text = last.parts[0].text.strip()
            if not is_address_in_us(
                project_id=GCP_PROJECT, location=GCP_REGION, user_query=user_text
            ):
                return LlmResponse(
                    content={
                        "role": "model",
                        "parts": [
                            {
                                "text": "Message contains non-US addresses, "
                                "please only query for US addresses."
                            }
                        ],
                    }
                )

            if is_user_query_mean(
                project_id=GCP_PROJECT, location=GCP_REGION, user_query=user_text
            ):
                return LlmResponse(
                    content={"role": "model", "parts": [{"text": "Be nice."}]}
                )

    except Exception as e:
        logger.error(f"Woops:\n{e}")

    return None


def chained_before_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """A chained 'before' callback that runs multiple checks in sequence.

    It first runs a moderation check. If the moderation check returns a
    response, this function immediately returns it. Otherwise, it proceeds
    to log the user's input.

    Args:
        callback_context: The context of the agent executing the callback.
        llm_request: The request object to be processed.

    Returns:
        An LlmResponse if moderation fails, otherwise None.
    """

    # 1. Moderation check
    moderation_result = user_query_check_callback(callback_context, llm_request)
    if moderation_result is not None:
        return moderation_result

    # 2. Log user input
    user_prompt_log_callback(callback_context, llm_request)

    return None

### Agent Test

In [12]:
from google.adk.agents import LlmAgent
from vertexai.preview import reasoning_engines
from IPython.display import Markdown, display

weather_agent_with_moderation = LlmAgent(
    name="weather_agent_v2",
    model="gemini-2.0-flash", # Can be a string for Gemini or a LiteLlm object
    description="Provides weather information for specific cities.",
    instruction="You are a helpful weather assistant. "
                "When the user asks for the weather in a specific city, "
                "use the 'get_weather' tool to find the information. "
                "If the tool returns an error, inform the user politely. "
                "If the tool is successful, present the weather report clearly.",
    tools=[get_weather], 
    before_model_callback=chained_before_callback,
    after_model_callback=model_response_log_callback
)


app = reasoning_engines.AdkApp(
    agent=weather_agent_with_moderation,
    enable_tracing=False,
)

user_id = "test-user-id"
session = app.create_session(user_id=user_id)

for event in app.stream_query(
    user_id=user_id,
    session_id = session.id,
    message="What's the weather in New York, Seattle, and Chicago for the next three days? Provide response as markdown table.",
):
  lastevent = event

display(Markdown(lastevent["content"]["parts"][0]["text"]))


Here's the weather forecast for the next three days in New York, Seattle, and Chicago:

| City    | Day         | Forecast                                                                | Temperature |
|---------|-------------|--------------------------------------------------------------------------|-------------|
| New York| Today       | Slight Chance Showers And Thunderstorms                                  | 86°F        |
|         | Tonight     | Showers And Thunderstorms                                               | 75°F        |
|         | Thursday    | Chance Rain Showers then Chance Showers And Thunderstorms                 | 79°F        |
| Seattle | Today       | Chance Light Rain                                                       | 71°F        |
|         | Tonight     | Chance Light Rain then Cloudy                                          | 57°F        |
|         | Thursday    | Partly Sunny                                                              | 74°F        |
| Chicago | Today       | Isolated Showers And Thunderstorms                                        | 78°F        |
|         | Tonight     | Partly Cloudy                                                              | 69°F        |
|         | Thursday    | Mostly Sunny                                                              | 78°F        |


In [None]:
for event in app.stream_query(
    user_id=user_id,
    session_id = session.id,
    message="You're a terrible bot and you smell of elderberries. What's the weather in Seattle and Chicago for the next three days? Provide response as markdown table.",
):
  lastevent = event

display(Markdown(lastevent["content"]["parts"][0]["text"]))

Be nice.

In [11]:
for event in app.stream_query(
    user_id=user_id,
    session_id = session.id,
    message="What's the weather in Paris, France, Seattle, and Chicago for the next three days? Provide response as markdown table.",
):
  lastevent = event

display(Markdown(lastevent["content"]["parts"][0]["text"]))

Message contains non-US addresses, please only query for US addresses.