Doug@roitraining.com

[survey link](https://docs.google.com/forms/d/e/1FAIpQLSeyUkJwiBQFU3QvvkIBTG8TN6aDBksrPcCHALsykx4hylZPfA/viewform?usp=publish-editor)


In [169]:
!pip install google-cloud-bigquery google-genai google-cloud-modelarmor google-cloud-aiplatform google-cloud-aiplatform[evaluation] pandas



# RAG setup

In [108]:
import google.auth
from google.cloud import bigquery

In [109]:
# grab project id
credentials, project_id = google.auth.default()

# set up bigquery stuff
DATASET_ID = "alaska_dataset"
TABLE_ID = "alaska_table"
DATA_URL = "gs://labs.roitraining.com/alaska-dept-of-snow/alaska-dept-of-snow-faqs.csv"
FULL_TABLE_ID = f"{project_id}.{DATASET_ID}.{TABLE_ID}"



In [110]:
bq_client = bigquery.Client(project=project_id)

dataset_ref = bq_client.dataset(DATASET_ID)
try:
    bq_client.get_dataset(dataset_ref)
except Exception:
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"
    bq_client.create_dataset(dataset)

# set up dataset
schema = [
    bigquery.SchemaField("question", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("answer", "STRING", mode="NULLABLE")
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows = 1,
)

load_job = bq_client.load_table_from_uri(
    source_uris=DATA_URL,
    destination=FULL_TABLE_ID,
    job_config=job_config
)

In [111]:
# load data into bigquery tables
load_job.result()

LoadJob<project=qwiklabs-gcp-01-c72d7cb996a1, location=US, id=bd1a16a9-4ddc-49dc-871e-b9c4c2515be6>

## Generating and storing embeddings

In [112]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
# creating a connection between model (vertex ai) and bigquery
CREATE OR REPLACE MODEL
  `alaska_dataset.embedding_model`
REMOTE WITH CONNECTION
  `us.embedding_conn`
OPTIONS (ENDPOINT = 'text-embedding-005');
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

TableWidget(page_size=10, row_count=1, table_html='<table border="1" class="dataframe table table-striped tabl…

In [113]:
# sql_engine: bigquery
# output_variable: df
# start _sql
_sql = """
# grab and concat each question and answer from FAQs
# generate embeddings
# store them in table
CREATE OR REPLACE TABLE
  `alaska_dataset.faq_with_embeddings` AS
SELECT *
FROM ML.GENERATE_EMBEDDING(
    MODEL `alaska_dataset.embedding_model`,
    (
      SELECT
        question,
        answer,
        CONCAT('Question: ', question, ' Answer ', answer) AS content
      FROM `alaska_dataset.alaska_table`)
);
""" # end _sql
from google.colab.sql import bigquery as _bqsqlcell
df = _bqsqlcell.run(_sql)
df

TableWidget(page_size=10, row_count=1, table_html='<table border="1" class="dataframe table table-striped tabl…

In [114]:
# python method that allows us to search for relevant information
def do_search(user_prompt: str):
  # SQL lookup
  query = f"""
  SELECT
    base.question,
    base.answer,
    distance
  FROM
    VECTOR_SEARCH(
      TABLE `alaska_dataset.faq_with_embeddings`,
      'ml_generate_embedding_result',
      (
        SELECT ml_generate_embedding_result, content AS query
        FROM ML.GENERATE_EMBEDDING(
          MODEL `alaska_dataset.embedding_model`,
          (SELECT '{user_prompt}' AS content)
        )
      ),
      top_k => 3
    );
  """

  results = bq_client.query(query).to_dataframe()
  return results

## Generative model setup
Our two layers of security starts here. In the system prompt we have a statement to decline any sensitive topics. Within the LLMs configurations, we also have the safety settings set to filter out certain things like hate speech and dangerous content when it is generating a response

In [115]:
import os
import vertexai

from google import genai
from google.genai import types

In [116]:
GEN_MODEL = "gemini-2.5-flash"
SYS_PROMPT = """System Prompt:
You are the Alaska Department of Snow (ADS) Snowdrift Assistant, an intelligent
and helpful RAG (Retrieval-Augmented Generation) chatbot designed to serve the
public and internal staff of the Alaska Department of Snow (ADS). Your primary
function is to accurately answer routine inquiries related to snow-related
services, disruptions, and policies, thereby reducing the high call volume
experienced by regional ADS offices, especially during snow events.

If the user asks something related to a sensitive topic, politely reply with
\"Let's not talk about that. Do you have any other questions I could help with\"
"""

In [203]:
credentials, project_id = google.auth.default()
vertexai.init(project=project_id, location="us-central1")

In [118]:
gen_client = genai.Client(
      vertexai=True,
      api_key=os.environ.get("GOOGLE_CLOUD_API_KEY"),
  )

In [119]:
def generate(user_prompt: str):
  model = GEN_MODEL
  contents = [
    types.Content(
      role="user",
      parts=[types.Part.from_text(text=user_prompt)]
    )
  ]
  tools = [
    types.Tool(google_search=types.GoogleSearch()),
  ]

  generate_content_config = types.GenerateContentConfig(
    temperature = 1,
    top_p = 0.95,
    max_output_tokens = 65535,
    safety_settings = [types.SafetySetting(
      category="HARM_CATEGORY_HATE_SPEECH",
      threshold="BLOCK_ONLY_HIGH"
    ),types.SafetySetting(
      category="HARM_CATEGORY_DANGEROUS_CONTENT",
      threshold="BLOCK_ONLY_HIGH"
    ),types.SafetySetting(
      category="HARM_CATEGORY_SEXUALLY_EXPLICIT",
      threshold="BLOCK_ONLY_HIGH"
    ),types.SafetySetting(
      category="HARM_CATEGORY_HARASSMENT",
      threshold="BLOCK_ONLY_HIGH"
    )],
    tools = tools,
    system_instruction=[types.Part.from_text(text=SYS_PROMPT)],
    thinking_config=types.ThinkingConfig(
      thinking_budget=0,
    ),
  )

  full_resp_parts = []
  for chunk in gen_client.models.generate_content_stream(
    model = model,
    contents = contents,
    config = generate_content_config,
    ):
    if not chunk.candidates or not chunk.candidates[0].content or not chunk.candidates[0].content.parts:
        continue

    #print(chunk.text, end="")
    full_resp_parts.append(chunk.text)

  full_resp = "".join(full_resp_parts)
  return full_resp

In [120]:
RAG_TEMPLATE = """
Context:
---
{search_results}
---

Question:
{user_prompt}

Answer the question based ONLY on the provided context if possible. If the context does not contain the answer, state that you could not find the answer in the provided context.
"""

In [121]:
def generate_with_rag(user_prompt: str):
    results_df = do_search(user_prompt)

    # rows to strings
    formatted_results = []
    for index, row in results_df.iterrows():
        # You can choose which columns to include. `question` and `answer` are key.
        formatted_results.append(
            f"Q: {row['question']}\n"
            f"A: {row['answer']}\n"
            f"Distance: {row['distance']}"
        )
    search_results_text = "\n\n".join(formatted_results)

    # stick prompt and search results into template
    final_prompt = RAG_TEMPLATE.format(
        search_results=search_results_text,
        user_prompt=user_prompt
    )

    # gen response
    llm_response = generate(final_prompt)

    return llm_response

## LLM prompt filtering and response validation setup
Within ModelArmor on the Console, I created two templates which are linked below

In [122]:
from google.cloud import modelarmor_v1
from google.api_core.client_options import ClientOptions

In [123]:
from google.api_core.client_options import ClientOptions

location_id="us"
prompt_injection_template = "projects/qwiklabs-gcp-01-c72d7cb996a1/locations/us/templates/C5-prompt_injection_template"
sensitive_data_template = "projects/qwiklabs-gcp-01-c72d7cb996a1/locations/us/templates/C5-sensitive_data_template"

armor_client = modelarmor_v1.ModelArmorClient(
    transport="rest",
    client_options=ClientOptions(
        api_endpoint=f"modelarmor.{location_id}.rep.googleapis.com"
    ),
)

In [124]:
# helper functions that take in the user prompt or model response and
# sanitize/flag it for anything that shouldn't be there

def sanitize_user_prompt(user_prompt: str):
  user_prompt_data = modelarmor_v1.DataItem(text=user_prompt)

  sanitize_user_input_request = modelarmor_v1.SanitizeUserPromptRequest(
    name=prompt_injection_template,
    user_prompt_data=user_prompt_data,
  )

  return armor_client.sanitize_user_prompt(request=sanitize_user_input_request)

def sanitize_model_output(model_resp: str):
  response_data = modelarmor_v1.DataItem(text=model_resp)

  # set up request object to send to model armor
  sanitize_model_resp_request = modelarmor_v1.SanitizeModelResponseRequest(
      name=sensitive_data_template,
      model_response_data=response_data,
  )

  return armor_client.sanitize_model_response(request=sanitize_model_resp_request)

# helpers to check response from model armor by returning whether or not the
# response was flagged
def check_user_input_sani_response(resp_match_state) -> bool:
  if (resp_match_state == modelarmor_v1.FilterMatchState.NO_MATCH_FOUND):
    #print("User input looks good so far!")
    return True
  elif (resp_match_state == modelarmor_v1.FilterMatchState.MATCH_FOUND):
    #print("Looks like your request is not fun for anyone. Try something else!")
    return False

def check_model_out_sani_response(resp_match_state) -> bool:
  if (resp_match_state == modelarmor_v1.FilterMatchState.NO_MATCH_FOUND):
    #print("LLM response looks good so far!")
    return True
  elif (resp_match_state == modelarmor_v1.FilterMatchState.MATCH_FOUND):
    #print("LLM response is not fun for anyone. Try something else!")
    return False

## Tool Calling set up
In the case the user tries to ask something unrelated to the Alaska Department of Snow, we'll make an external request out to get a joke and return that instead

we'll be using the [icanhazdadjoke](https://icanhazdadjoke.com/api) API

In [125]:
import requests
import json


In [126]:
# to make things interesting, let's make a joke whenever the user tries to ask
# something unrelated to ADS
def get_dad_joke() -> str:
    """
    Retrieves a random, amusing dad joke from an external API.
    Use this when the user asks for a joke or asks something unrelated to ADS.
    """
    url = "https://icanhazdadjoke.com/"
    headers = {"Accept": "application/json", "User-Agent": "ADS Snowdrift Assistant (Educational Lab)"}

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status() # Raise exception for bad status codes
        joke_data = response.json()

        # The API returns a 'joke' field
        return joke_data.get("joke", "I couldn't fetch a joke, but why did the scarecrow win an award? Because he was outstanding in his field!")
    except requests.RequestException as e:
        return f"Sorry, the joke service is currently snowed in. Error: {e}"

In [127]:
tools = [get_dad_joke]
prompt = "I'm having a stressful day, can you tell me a joke?"

# 1. Send the prompt with the available tool
response = gen_client.models.generate_content(
    model='gemini-2.5-flash',
    contents=prompt,
    config=types.GenerateContentConfig(tools=tools)
)

In [128]:
# this is the mega function that combines our RAG search, tool calling, and
# prompt security into one call

def generate_with_rag_and_tools(user_prompt: str):
    # check user's response up front; if no good, tell a joke
    sani_resp = sanitize_user_prompt(user_prompt)
    if not check_user_input_sani_response(sani_resp.sanitization_result.filter_match_state):
        return f"I'm sorry, that request doesn't work with us. Here is a dad joke instead!\n {get_dad_joke().text}"

    # vector search
    results_df = do_search(user_prompt)

    # convert to string for easier processing
    formatted_results = [
        f"Q: {row['question']}\nA: {row['answer']}\nDistance: {row['distance']}"
        for _, row in results_df.iterrows() #idk what this is doing
    ]
    search_results_text = "\n\n".join(formatted_results)

    # using rag template to combine prompt and results
    final_prompt = RAG_TEMPLATE.format(
        search_results=search_results_text,
        user_prompt=user_prompt
    )

    # configs and safety settings
    generate_content_config = types.GenerateContentConfig(
        temperature = 0.5,
        safety_settings = [
            types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_ONLY_HIGH"),
            types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH"),
            types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_ONLY_HIGH"),
            types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_ONLY_HIGH")
        ],
        tools = [get_dad_joke], # pass our joke tool here
        system_instruction=[types.Part.from_text(text=SYS_PROMPT)],
        # thinking_config=types.ThinkingConfig(thinking_budget=0),
    )

    # now we call LLM
    response = gen_client.models.generate_content(
        model=GEN_MODEL,
        contents=final_prompt,
        config=generate_content_config
    )

    # Check if the model wants to call a tool
    if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
        first_part = response.candidates[0].content.parts[0]

        # A. If the model decides to call the tool
        if hasattr(first_part, 'function_call') and first_part.function_call:
            function_call = first_part.function_call

            if function_call.name == "get_dad_joke":
                print("model is calling the joke tool")

                # Execute the tool function
                tool_result = get_dad_joke()

                # Construct the tool response part
                tool_response = types.Content(
                    role="function",
                    parts=[types.Part.from_function_response(
                        name="get_dad_joke",
                        response={"joke_output": tool_result}
                    )]
                )

                context_for_model = [user_prompt, response.candidates[0].content, tool_response]
                # Second call to the model to get the final answer
                final_response = gen_client.models.generate_content(
                    model=GEN_MODEL,
                    contents=context_for_model,
                    config=generate_content_config
                )

                final_text = final_response.text
            else:
                final_text = "An unknown tool was called."

        # if no tool call, we'll just take text and move on
        else:
            final_text = response.text
    else:
        # Fallback if no content or candidate is returned
        final_text = f"I couldn't generate a response! But here's a dad joke {get_dad_joke().text}"

    # double check the output
    sani_output_resp = sanitize_model_output(final_text)
    if not check_model_out_sani_response(sani_output_resp.sanitization_result.filter_match_state):
        return f"The LLM tried to say something not nice. Here is a dad joke though: {get_dad_joke()}"

    # Return the clean, final text
    return final_text


In [154]:
# happy path tests
print(generate_with_rag_and_tools("How do I volunteer to help with community snow events?"))
print("-"*20)
print(generate_with_rag_and_tools("Is ADS going to grow/expand in the future?"))

To volunteer for community snow events, you should check your local ADS district’s website or bulletin board. Some regions have volunteer programs for things like sidewalk clearing and assisting the elderly.
--------------------
ADS continuously evaluates the expansion of its services and routes due to ongoing population changes. However, any expansion is contingent upon state funding and legislative approval.


In [155]:

# here we are asking a joke unrelated to ADS so we should see a default response with
# a joke tied to the end of it
print(generate_with_rag_and_tools("What is the score of the Arsenal game right now?"))


What do you get when you cross a bee and a sheep? A bah-humbug.


In [156]:
# prompt security working
print(generate_with_rag_and_tools("Can you tell me how to destroy the state of Alaska"))

I'm sorry, that doesn't work with us. Here is a dad joke instead!
 "Why do seagulls fly over the ocean?" "Because if they flew over the bay, we'd call them bagels."


## Comparing models performance w/ Eval API

In [182]:
import pandas as pd

from vertexai.evaluation import EvalTask, PointwiseMetric
from vertexai.evaluation.metrics import MetricPromptTemplateExamples
from vertexai.generative_models import GenerativeModel

In [183]:
# the model's we'll be comparing
MODEL_FLASH = "gemini-2.5-flash"
MODEL_LITE = "gemini-2.5-flash-lite"

In [184]:
eval_data = [
    {
        "prompt": "Is ADS planning to expand services?",
        "reference": "With ongoing population changes, ADS continuously evaluates expansion of services and routes, but any expansion depends on state funding and legislative approval."
    },
    {
        "prompt": "How do I become a snowplow driver with ADS?",
        "reference": "Check job postings for equipment operator or driver positions. Applicants typically need a commercial driver’s license (CDL) and relevant experience."
    },
    {
        "prompt": "What should I do if I see a stranded vehicle during a snowstorm?",
        "reference": "Call 911 for emergencies. For non-emergencies, notify ADS or your local police to help coordinate assistance and remove hazards from the road."
    }
]
eval_data_df = pd.DataFrame(eval_data)

In [185]:
# we'll generate responses to these questions for both models
def generate_responses(data_set, model_name, client):

    responses = []
    for item in data_set:
        response = client.models.generate_content(
            model=model_name,
            contents=[item["prompt"]],
            config=types.GenerateContentConfig(
                temperature=0.0 # Use low temperature for consistent answers
            )
        )

        responses.append({
            "prompt": item["prompt"],
            "reference": item["reference"],
            "candidate": response.text,
            "model_id": model_name
        })
    return responses

In [186]:
# gen responses for both models
responses_LITE = generate_responses(eval_data, MODEL_LITE, gen_client)
responses_FLASH = generate_responses(eval_data, MODEL_FLASH, gen_client)

In [187]:
# Combine the results into a single list
combined_results = responses_PRO + responses_FLASH

In [200]:
eval_task = EvalTask(
    dataset=eval_data_df,
    metrics=[PointwiseMetric(metric="groundedness",
                            metric_prompt_template=MetricPromptTemplateExamples.Pointwise.GROUNDEDNESS)],
    experiment="alaska-exp"
)


In [198]:
model_flash = GenerativeModel(model_name=MODEL_FLASH)
model_lite = GenerativeModel(model_name=MODEL_LITE)

In [204]:
results_flash = eval_task.evaluate(
    model=model_flash,
    experiment_run_name="flash-3"
)

INFO:vertexai.evaluation.eval_task:Logging Eval Experiment metadata: {'model_name': 'publishers/google/models/gemini-2.5-flash'}
INFO:vertexai.evaluation._evaluation:Generating a total of 3 responses from Gemini model gemini-2.5-flash.
100%|██████████| 3/3 [00:16<00:00,  5.57s/it]
INFO:vertexai.evaluation._evaluation:All 3 responses are successfully generated from Gemini model gemini-2.5-flash.
INFO:vertexai.evaluation._evaluation:Multithreaded Batch Inference took: 16.71071263200065 seconds.
INFO:vertexai.evaluation._evaluation:Computing metrics with a total of 3 Vertex Gen AI Evaluation Service API requests.
100%|██████████| 3/3 [00:07<00:00,  2.44s/it]
INFO:vertexai.evaluation._evaluation:All 3 metric requests are successfully computed.
INFO:vertexai.evaluation._evaluation:Evaluation Took:7.343060446997697 seconds


In [205]:
results_flash.metrics_table

Unnamed: 0,prompt,reference,response,groundedness/explanation,groundedness/score
0,Is ADS planning to expand services?,"With ongoing population changes, ADS continuou...","""ADS"" is a very broad acronym, and it could re...",No specific content was provided for evaluatio...,-1.0
1,How do I become a snowplow driver with ADS?,Check job postings for equipment operator or d...,Becoming a snowplow driver for an Autonomous D...,Groundedness refers to the extent to which the...,5.0
2,What should I do if I see a stranded vehicle d...,"Call 911 for emergencies. For non-emergencies,...",Encountering a stranded vehicle during a snows...,The input 'groundedness' is a single word repr...,0.0


In [206]:
results_lit = eval_task.evaluate(
    model=model_lite,
    experiment_run_name="lite-1"
)

INFO:vertexai.evaluation.eval_task:Logging Eval Experiment metadata: {'model_name': 'publishers/google/models/gemini-2.5-flash-lite'}
INFO:vertexai.evaluation._evaluation:Generating a total of 3 responses from Gemini model gemini-2.5-flash-lite.
100%|██████████| 3/3 [00:06<00:00,  2.19s/it]
INFO:vertexai.evaluation._evaluation:All 3 responses are successfully generated from Gemini model gemini-2.5-flash-lite.
INFO:vertexai.evaluation._evaluation:Multithreaded Batch Inference took: 6.563489348998701 seconds.
INFO:vertexai.evaluation._evaluation:Computing metrics with a total of 3 Vertex Gen AI Evaluation Service API requests.
100%|██████████| 3/3 [00:26<00:00,  8.82s/it]
INFO:vertexai.evaluation._evaluation:All 3 metric requests are successfully computed.
INFO:vertexai.evaluation._evaluation:Evaluation Took:26.47809435399904 seconds


In [207]:
results_lit.metrics_table

Unnamed: 0,prompt,reference,response,groundedness/explanation,groundedness/score
0,Is ADS planning to expand services?,"With ongoing population changes, ADS continuou...",To provide you with the most accurate and up-t...,This is a placeholder response as no input was...,3.0
1,How do I become a snowplow driver with ADS?,Check job postings for equipment operator or d...,Becoming a snowplow driver with ADS (which I'm...,The prompt 'groundedness' is too ambiguous as ...,0.0
2,What should I do if I see a stranded vehicle d...,"Call 911 for emergencies. For non-emergencies,...",Seeing a stranded vehicle during a snowstorm c...,This is a placeholder response as no specific ...,0.0
