# Lab 4 · Alaska Snow Department Agent

This lab guides you through building a prototype Retrieval-Augmented Generation (RAG) chatbot for the Alaska Department of Snow (ADS). The notebook covers data ingestion from Google Cloud Storage (GCS), creation of a Vertex AI Search data store, agent orchestration components, safety mechanisms, evaluation workflows, and deployment assets.

> **Project:** `qwiklabs-gcp-04-ee8165cd97c8`
>
> **Region:** `us-central1`



## 1. Environment Setup & Configuration

The following cells configure the Google Cloud SDK, authenticate, and set global variables used throughout the notebook. Run each cell sequentially before moving on to the ingestion pipeline.


In [None]:
%%bash
# Install the client libraries used by the cells below (quiet to keep cell output short).
pip install --quiet google-cloud-storage google-cloud-aiplatform vertexai langchain google-cloud-discoveryengine matplotlib networkx pypdf beautifulsoup4 tiktoken


In [None]:
import os
# Authenticate with Google Cloud.
# The Colab import lives inside the try so that running outside Colab (where
# `google.colab` is not installed) falls back to Application Default
# Credentials instead of failing on the import itself.
try:
    from google.colab import auth

    auth.authenticate_user()
except ImportError:
    # Outside Colab: rely on ADC (e.g. `gcloud auth application-default login`)
    # or the credentials attached to the runtime.
    pass


In [None]:
# Global settings shared by every cell below.
PROJECT_ID = "qwiklabs-gcp-04-ee8165cd97c8"
LOCATION = "us-central1"

# Source documents live under gs://<GCS_BUCKET>/<GCS_PREFIX>/.
GCS_BUCKET = "labs.roitraining.com"
GCS_PREFIX = "alaska-dept-of-snow"

VERTEX_SEARCH_ENGINE_ID = "ads-faq-datastore"
DATASET_DISPLAY_NAME = "ads_faq_documents"

# Export project/location for any library or subprocess in this kernel that reads them.
os.environ.update(
    {
        "GOOGLE_CLOUD_PROJECT": PROJECT_ID,
        "VERTEXAI_PROJECT": PROJECT_ID,
        "VERTEXAI_LOCATION": LOCATION,
    }
)

print("Project: " + PROJECT_ID + "\nLocation: " + LOCATION)


## 2. Dialogflow Data Store Provisioning (GCS Unstructured Data)

This section provisions a Dialogflow-compatible data store backed by Vertex AI Search. The store ingests unstructured FAQ content directly from the ADS GCS bucket—no local downloads or manual chunking required.


In [None]:
from typing import List, Optional

from google.api_core.operation import Operation
from google.cloud import discoveryengine_v1


In [None]:
# Vertex AI Search ("Dialogflow data store") resource identifiers.
DISCOVERY_LOCATION = "global"
COLLECTION_ID = "default_collection"
DATA_STORE_ID = "ads-faq-unstructured"
DATA_STORE_DISPLAY_NAME = "ADS FAQ Data Store"
SERVING_CONFIG_ID = "default_serving_config"

# One shared client per Discovery Engine service, created when this cell runs:
# data-store management, document import, and search.
(
    _data_store_client,
    _document_service_client,
    _search_service_client,
) = (
    discoveryengine_v1.DataStoreServiceClient(),
    discoveryengine_v1.DocumentServiceClient(),
    discoveryengine_v1.SearchServiceClient(),
)


In [None]:
def _collection_path() -> str:
    """Return the collection resource name that parents this project's data stores."""
    segments = (
        "projects", PROJECT_ID,
        "locations", DISCOVERY_LOCATION,
        "collections", COLLECTION_ID,
    )
    return "/".join(segments)


def data_store_path() -> str:
    """Return the ADS data store resource name from the client's path helper.

    NOTE(review): the helper's template may omit the ``collections/<id>``
    segment that names returned by ``list_data_stores`` carry — confirm before
    comparing this string against listed names.
    """
    return _data_store_client.data_store_path(
        project=PROJECT_ID,
        location=DISCOVERY_LOCATION,
        data_store=DATA_STORE_ID,
    )


def serving_config_path() -> str:
    """Return the serving-config resource name used for search requests."""
    return _search_service_client.serving_config_path(
        PROJECT_ID, DISCOVERY_LOCATION, DATA_STORE_ID, SERVING_CONFIG_ID
    )


In [None]:
def ensure_data_store(display_name: str = DATA_STORE_DISPLAY_NAME) -> discoveryengine_v1.DataStore:
    """Create the ADS data store if it is missing and return the active store.

    Existing stores are matched on their trailing data-store ID. Stores listed
    under the collection parent have collection-qualified names, while
    ``data_store_path()`` is produced by a client helper whose format may omit
    the collection segment. An exact string comparison can therefore miss an
    existing store and attempt a duplicate create.

    Args:
        display_name: Human-readable name, used only when the store is created.

    Returns:
        The existing or newly created ``DataStore``.
    """
    list_request = discoveryengine_v1.ListDataStoresRequest(parent=_collection_path())
    for existing in _data_store_client.list_data_stores(request=list_request):
        if existing.name.rsplit("/", 1)[-1] == DATA_STORE_ID:
            print(f"Data store already exists: {existing.name}")
            return existing

    data_store = discoveryengine_v1.DataStore(
        display_name=display_name,
        industry_vertical=discoveryengine_v1.IndustryVertical.GENERIC,
        solution_types=[discoveryengine_v1.SolutionType.SOLUTION_TYPE_SEARCH],
        # Unstructured files (PDF/HTML/TXT) imported from GCS need a store that
        # holds document content; the unspecified/default config does not.
        content_config=discoveryengine_v1.DataStore.ContentConfig.CONTENT_REQUIRED,
    )
    operation: Operation = _data_store_client.create_data_store(
        request=discoveryengine_v1.CreateDataStoreRequest(
            parent=_collection_path(),
            data_store=data_store,
            data_store_id=DATA_STORE_ID,
        )
    )
    # Blocks until the long-running create finishes.
    result = operation.result()
    print(f"Created data store: {result.name}")
    return result


In [None]:
def import_gcs_documents(prefix: str = GCS_PREFIX) -> discoveryengine_v1.ImportDocumentsResponse:
    """Import unstructured documents under ``gs://GCS_BUCKET/<prefix>/`` into the data store.

    Blocks until the long-running import completes.

    Args:
        prefix: Object prefix inside ``GCS_BUCKET``; a trailing slash is ignored.

    Returns:
        The ``ImportDocumentsResponse``, which carries ``error_samples`` and
        ``error_config`` only. Per-document success/failure totals are reported
        on the operation's ``ImportDocumentsMetadata`` and are printed here.
    """
    gcs_uri = f"gs://{GCS_BUCKET}/{prefix.rstrip('/')}/**"
    branch = _document_service_client.branch_path(
        project=PROJECT_ID,
        location=DISCOVERY_LOCATION,
        data_store=DATA_STORE_ID,
        branch="default_branch",
    )
    request = discoveryengine_v1.ImportDocumentsRequest(
        parent=branch,
        # "content" = unstructured files (PDF, HTML, TXT, ...). The default
        # schema expects JSONL Document records instead.
        # auto_generate_ids is documented as valid only for the "custom"/"csv"
        # schemas (INVALID_ARGUMENT otherwise), so it is not set here.
        gcs_source=discoveryengine_v1.GcsSource(input_uris=[gcs_uri], data_schema="content"),
        reconciliation_mode=discoveryengine_v1.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )
    operation: Operation = _document_service_client.import_documents(request=request)
    response = operation.result()

    # success/failure counts live on the LRO metadata, not on the response message.
    metadata = operation.metadata
    if metadata is not None:
        print(
            "Import finished:",
            f"success_count={metadata.success_count}",
            f"error_count={metadata.failure_count}",
        )
    else:
        print("Import finished:", f"error_samples={len(response.error_samples)}")
    return response


## 2.1 Model Armor Template Provisioning

These helpers create (or reuse) Model Armor prompt/response templates. They are required for runtime sanitization inside the Cloud Run backend.


In [None]:
import time

import google.auth
from google.auth.transport.requests import AuthorizedSession, Request


In [None]:
# Model Armor is served from regional endpoints. Deriving the host from LOCATION
# keeps the API base and the template parent in the same region.
MODEL_ARMOR_API_BASE = f"https://modelarmor.{LOCATION}.rep.googleapis.com/v1"
MODEL_ARMOR_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
PROMPT_TEMPLATE_ID = "ads-snow-prompt-template"
RESPONSE_TEMPLATE_ID = "ads-snow-response-template"

# Responsible-AI filter categories shared by both templates; each blocks at
# MEDIUM_AND_ABOVE confidence.
_RAI_FILTER_TYPES = ("HATE_SPEECH", "DANGEROUS", "SEXUALLY_EXPLICIT", "HARASSMENT")


def _rai_settings() -> dict:
    """Return a fresh ``raiSettings`` block (new objects per call, so the templates share no mutable state)."""
    return {
        "raiFilters": [
            {"filterType": filter_type, "confidenceLevel": "MEDIUM_AND_ABOVE"}
            for filter_type in _RAI_FILTER_TYPES
        ]
    }


def _template_metadata() -> dict:
    """Return ``templateMetadata`` with multi-language detection switched on.

    An empty ``multiLanguageDetection`` object leaves its boolean flag at the
    default (off), so the flag is set explicitly.
    """
    return {"multiLanguageDetection": {"enableMultiLanguageDetection": True}}


PROMPT_TEMPLATE_BODY = {
    "filterConfig": {
        "raiSettings": _rai_settings(),
        "sdpSettings": {"basicConfig": {"filterEnforcement": "ENABLED"}},
        # Prompt-injection / jailbreak screening is configured on the prompt template only.
        "piAndJailbreakFilterSettings": {
            "filterEnforcement": "ENABLED",
            "confidenceLevel": "MEDIUM_AND_ABOVE",
        },
    },
    "templateMetadata": _template_metadata(),
}

RESPONSE_TEMPLATE_BODY = {
    "filterConfig": {
        "raiSettings": _rai_settings(),
        "sdpSettings": {"basicConfig": {"filterEnforcement": "ENABLED"}},
    },
    "templateMetadata": _template_metadata(),
}


In [None]:
def _model_armor_session() -> AuthorizedSession:
    """Return an authenticated HTTP session for the Model Armor REST API.

    Credentials come from Application Default Credentials, requested with the
    cloud-platform scope. Credential types that need explicit scopes (e.g.
    service-account keys) cannot mint tokens without one.
    """
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    # Refresh eagerly so auth problems surface here rather than on the first API call.
    credentials.refresh(Request())
    return AuthorizedSession(credentials)


def ensure_template_exists(template_id: str, payload: dict) -> str:
    session = _model_armor_session()
    name = f"{MODEL_ARMOR_PARENT}/templates/{template_id}"
    get_url = f"{MODEL_ARMOR_API_BASE}/{MODEL_ARMOR_PARENT}/templates/{template_id}"
    response = session.get(get_url, timeout=20)
    if response.status_code == 200:
        return response.json().get("name", name)
    if response.status_code not in {400, 404}:
        raise RuntimeError(f"Failed to fetch template {template_id}: {response.status_code} {response.text}")

    create_url = f"{MODEL_ARMOR_API_BASE}/{MODEL_ARMOR_PARENT}/templates?templateId={template_id}"
    operation = session.post(create_url, json=payload, timeout=30)
    operation.raise_for_status()
    op_name = operation.json().get("name")
    if not op_name:
        raise RuntimeError("Model Armor template creation did not return an operation name")

    status_url = f"{MODEL_ARMOR_API_BASE}/{op_name}"
    while True:
        status = session.get(status_url, timeout=20)
        status.raise_for_status()
        body = status.json()
        if body.get("done"):
            return body.get("response", {}).get("name", name)
        time.sleep(2)



In [None]:
def initialize_model_armor_templates() -> dict:
    """Ensure the ADS prompt and response templates exist and return their names.

    Returns:
        ``{"prompt_template": <resource name>, "response_template": <resource name>}``,
        resolved in that order.
    """
    template_specs = (
        ("prompt_template", PROMPT_TEMPLATE_ID, PROMPT_TEMPLATE_BODY),
        ("response_template", RESPONSE_TEMPLATE_ID, RESPONSE_TEMPLATE_BODY),
    )
    return {
        key: ensure_template_exists(template_id, body)
        for key, template_id, body in template_specs
    }

# Example usage:
# model_armor_templates = initialize_model_armor_templates()
# model_armor_templates


> **Tip:** Run `initialize_model_armor_templates()` once per environment to create the prompt and response templates. Record the returned resource names for use in the Cloud Run backend deployment scripts.


In [None]:
def initialize_dialogflow_datastore() -> dict:
    """Create the data store if needed, import the latest GCS documents, and summarize.

    ``ImportDocumentsResponse`` exposes only ``error_samples`` and
    ``error_config``. Exact success/failure totals live on the import
    operation's ``ImportDocumentsMetadata``, which ``import_gcs_documents`` does
    not return, so they are not part of this summary. (Reading
    ``success_count``/``error_count`` off the response raised AttributeError.)

    Returns:
        dict with ``data_store`` (resource name), ``error_count`` (number of
        sampled import errors), ``error_samples`` (their messages) and
        ``serving_config`` (resource name used for search requests).
    """
    data_store = ensure_data_store()
    import_response = import_gcs_documents()
    error_samples = [sample.message for sample in import_response.error_samples]
    return {
        "data_store": data_store.name,
        "error_count": len(error_samples),
        "error_samples": error_samples,
        "serving_config": serving_config_path(),
    }

# Example usage (long-running operation):
# datastore_summary = initialize_dialogflow_datastore()
# datastore_summary


In [None]:
# Trigger Dialogflow data store provisioning and ingestion (long-running)
# Uncomment the lines below to create/update the data store in your project.
# datastore_summary = initialize_dialogflow_datastore()
# datastore_summary


> **Note:** Dialogflow data stores are billed via Vertex AI Search. Run the import job against a development project if you only need to validate configuration before promoting to production.


## 3. Agent Orchestration & Backend Interface

The prototype agent uses a RAG pipeline: passages retrieved from the Vertex AI Search data store ground the Gemini model's responses. The following cells define the model and prompt templates, the retriever, and the answer-generation helper that a FastAPI backend on Cloud Run can wrap.


In [None]:
from vertexai.generative_models import GenerativeModel, GenerationConfig

CHAT_MODEL_NAME = "gemini-2.5-flash-lite"

# System instruction placed at the top of every grounded prompt.
retrieval_generation_system_prompt = (
    "You are the Alaska Department of Snow virtual assistant. "
    "Use only the provided documents to answer citizen questions about snow removal, permits, and department operations. "
    "If the answer is unavailable in the documents, acknowledge the limitation and escalate to human support."
)

# Response-style rules placed right after the system instruction.
response_style_prompt = "\n".join(
    [
        "Follow these guidelines:",
        "- Keep answers under 150 words",
        "- Reference relevant policies when available",
        "- Suggest online self-service options before instructing to call the department",
        "- Use inclusive, respectful language",
    ]
)


In [None]:
def retrieve_context(query: str, serving_config: Optional[str] = None, top_k: int = 6) -> List[dict]:
    config_name = serving_config or serving_config_path()
    search_request = discoveryengine_v1.SearchRequest(
        serving_config=config_name,
        query=query,
        page_size=top_k,
    )
    documents = []
    for result in _search_service_client.search(request=search_request):
        document = result.document
        documents.append(
            {
                "id": document.id,
                "content": document.content,
                "source": document.content_uri,
            }
        )
    return documents


In [None]:
def build_prompt_with_context(question: str, documents: List[dict]) -> str:
    """Assemble the grounded prompt: instructions, style rules, sources, then the question.

    Each document is rendered as ``Source: <source or id>`` followed by its
    stripped content; sections are separated by blank lines.
    """
    def _render(doc: dict) -> str:
        label = doc.get("source") or doc.get("id", "unknown")
        body = (doc.get("content") or "").strip()
        return f"Source: {label}\n{body}"

    context = "\n\n".join(_render(doc) for doc in documents)
    sections = [
        retrieval_generation_system_prompt,
        response_style_prompt,
        f"Context:\n{context}",
        f"User question: {question}",
        "Answer:",
    ]
    return "\n\n".join(sections)


In [None]:
def generate_answer(question: str, serving_config: Optional[str] = None) -> dict:
    """Answer *question* with Gemini, grounded on passages retrieved from the ADS data store.

    Args:
        question: The resident's question.
        serving_config: Optional serving-config resource name; when omitted,
            ``retrieve_context`` falls back to its own default.

    Returns:
        dict with ``answer`` (model text), ``context`` (retrieved passages) and
        ``prompt`` (the full prompt sent to the model).
    """
    # retrieve_context names its first parameter ``query``; calling it with
    # ``question=`` raised TypeError before any retrieval ran.
    documents = retrieve_context(query=question, serving_config=serving_config)
    prompt = build_prompt_with_context(question, documents)
    model = GenerativeModel(CHAT_MODEL_NAME)
    response = model.generate_content(
        [prompt],
        # Low temperature favors consistent answers that stay close to the sources.
        generation_config=GenerationConfig(temperature=0.2, top_p=0.95, max_output_tokens=512),
    )
    # NOTE(review): ``response.text`` may raise if the candidate was blocked or
    # empty — confirm SDK behavior and decide on a fallback message.
    return {
        "answer": response.text,
        "context": documents,
        "prompt": prompt,
    }

# Example usage (requires data store ingestion to be complete)
# agent_response = generate_answer("How do I request residential snow removal?", serving_config=serving_config_path())
# agent_response


## 4. Safety Controls, Validation & Evaluation

Safety guardrails enforce privacy policies, filter risky prompts, and validate model responses. This section introduces reusable filters, unit-test scaffolds, and an evaluation harness built on the Vertex AI Gen AI Evaluation Service.


In [None]:
import logging
import tiktoken

import re

# Patterns for personal/financial data that must not enter or leave the assistant.
# Dict order matters: the first matching label is the one reported.
SENSITIVE_PATTERNS = {
    "pii_email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", re.IGNORECASE),
    "pii_phone": re.compile(r"(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}"),
    "credit_card": re.compile(r"\b(?:\d[ -]*?){13,16}\b"),
    # US Social Security numbers in the common NNN-NN-NNNN form.
    "pii_ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

# Topic allowlist: SafetyRuleEngine.validate_prompt requires at least one entry
# to appear as a substring of the lowercased prompt, so entries are lowercase.
ALLOWED_TOPICS = [
    "snow removal",
    "permits",
    "parking bans",
    "road conditions",
    "department contact",
    # Broader keywords so routine phrasings ("residential snow cleared",
    # "emergency plowing", "a permit") are not rejected as off-topic.
    "snow",
    "plow",
    "permit",
    "parking",
    "sidewalk",
]

# NOTE(review): cl100k_base is an OpenAI tokenizer, so these counts only
# approximate Gemini's own token accounting.
enc = tiktoken.get_encoding("cl100k_base")


def count_tokens(text: str) -> int:
    """Return the number of cl100k_base tokens that *text* encodes to."""
    token_ids = enc.encode(text)
    return len(token_ids)


class PromptSafetyError(Exception):
    """Signal that a prompt or a model response broke an ADS safety rule.

    The message names the rule that fired (token limit, topic, or sensitive-data label).
    """


class SafetyRuleEngine:
    """Rule-based guardrails applied before and after the model call.

    Args:
        allow_topics: Substrings of which at least one must appear in a prompt;
            matched case-insensitively.
        max_prompt_tokens: Upper bound on ``count_tokens(prompt)`` (default 2048).
    """

    def __init__(self, allow_topics: List[str], max_prompt_tokens: int = 2048):
        self.allow_topics = allow_topics
        self.max_prompt_tokens = max_prompt_tokens

    def validate_prompt(self, prompt: str) -> None:
        """Raise PromptSafetyError if *prompt* is too long, off-topic, or contains sensitive data."""
        if count_tokens(prompt) > self.max_prompt_tokens:
            raise PromptSafetyError(f"Prompt exceeds maximum token limit ({self.max_prompt_tokens})")
        lower_prompt = prompt.lower()
        # Lowercase topics too, so mixed-case entries can still match the lowercased prompt.
        if not any(topic.lower() in lower_prompt for topic in self.allow_topics):
            raise PromptSafetyError("Prompt topic not allowed for ADS assistant")
        for label, pattern in SENSITIVE_PATTERNS.items():
            if pattern.search(prompt):
                raise PromptSafetyError(f"Detected sensitive pattern: {label}")

    def validate_response(self, response: str, sources: List[str]) -> None:
        """Raise PromptSafetyError if *response* leaks sensitive data or cites 911 without emergency sources.

        NOTE(review): the phone pattern also matches legitimate department phone
        numbers, so answers that quote one are rejected — confirm this is intended.
        """
        for label, pattern in SENSITIVE_PATTERNS.items():
            if pattern.search(response):
                raise PromptSafetyError(f"Response leaks sensitive data: {label}")
        # Sources may be None/empty when a retrieved document carries no URI.
        if "call 911" in response.lower() and not any("emergency" in (s or "").lower() for s in sources):
            raise PromptSafetyError("Response escalates emergency services without evidence")


# Shared rule engine configured with the ADS topic allowlist.
safety_rules = SafetyRuleEngine(ALLOWED_TOPICS)


def safe_generate(question: str, serving_config: Optional[str] = None) -> dict:
    """Run generate_answer wrapped in the prompt and response safety checks.

    Returns:
        The generate_answer result dict, unchanged, when both checks pass.

    Raises:
        PromptSafetyError: If the question or the generated answer violates a rule.
    """
    safety_rules.validate_prompt(question)
    result = generate_answer(question=question, serving_config=serving_config)
    answer_text = result["answer"]
    cited_sources = [passage.get("source", "") for passage in result["context"]]
    safety_rules.validate_response(answer_text, cited_sources)
    return result


In [None]:
UNIT_TESTS_CODE = """import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock

from services.main import app, safety_rules, PromptSafetyError

client = TestClient(app)


def test_prompt_rejection_for_pii():
    with pytest.raises(PromptSafetyError):
        safety_rules.validate_prompt("My email is ops@ads.gov, please update")


def test_chat_endpoint_success(monkeypatch):
    mock_endpoint = MagicMock()
    mock_endpoint.match.return_value = [[MagicMock(metadata={"text": "Snow removal policy", "source": "gs://demo"})]]
    monkeypatch.setattr("services.main.endpoint", mock_endpoint)

    mock_model = MagicMock()
    mock_model.generate_content.return_value = MagicMock(text="Cleared sidewalks within 24 hours")
    monkeypatch.setattr("services.main.model", mock_model)

    response = client.post(
        "/chat",
        json={"session_id": "test", "message": "How soon is residential snow cleared?"},
    )
    assert response.status_code == 200
    payload = response.json()
    assert "answer" in payload and "sources" in payload


def test_chat_endpoint_blocks_invalid_prompt(monkeypatch):
    response = client.post(
        "/chat",
        json={"session_id": "test", "message": "Here is my SSN 111-22-3333"},
    )
    assert response.status_code == 422
"""

print(UNIT_TESTS_CODE.splitlines()[:12])


In [None]:
from google.cloud import aiplatform_v1beta1

def run_google_evaluation(index_endpoint_name: str, test_cases: List[dict]) -> dict:
    """Submit evaluation set to the Generative AI Evaluation Service."""
    client = aiplatform_v1beta1.EvaluationServiceClient()
    dataset = aiplatform_v1beta1.Dataset(
        display_name="ads_chatbot_eval",
        annotations=[
            aiplatform_v1beta1.Dataset.Annotation(
                instruction=case["question"],
                reference=case["expected"],
            )
            for case in test_cases
        ],
    )
    model_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{CHAT_MODEL_NAME}"
    request = aiplatform_v1beta1.RunEvaluationRequest(
        target_model=model_name,
        dataset=dataset,
        evaluation_job_spec=aiplatform_v1beta1.EvaluationJobSpec(
            generative_metric_specs=[
                aiplatform_v1beta1.GenerativeMetricSpec(
                    metric_type="BLEU",
                ),
                aiplatform_v1beta1.GenerativeMetricSpec(metric_type="ROUGE_L"),
            ],
        ),
    )
    operation = client.run_evaluation(request=request)
    print("Evaluation job submitted:", operation.operation.name)
    return {"operation_name": operation.operation.name}


# Gold-standard (question, expected) pairs for run_google_evaluation.
# NOTE(review): the "expected" strings read as rubric descriptions rather than
# reference answers. BLEU/ROUGE compare n-grams against this literal text, so
# scores stay low until these are replaced with actual reference answers.
EVAL_TEST_CASES = [
    dict(question=question, expected=expected)
    for question, expected in (
        (
            "What is the process to request residential snow removal?",
            "Explain the online request form and eligibility.",
        ),
        (
            "Who do I call for emergency plowing?",
            "Direct to emergency hotline and note expected response time.",
        ),
    )
]

# eval_job = run_google_evaluation("projects/.../locations/.../indexEndpoints/...", EVAL_TEST_CASES)
# eval_job


### Evaluation Workflow Steps

1. Generate or curate a set of gold-standard question/answer pairs covering top call drivers.
2. Run `run_google_evaluation` to score responses across BLEU/ROUGE (and optionally safety metrics).
3. Export evaluation metrics to BigQuery for trend monitoring.
4. Gate deployments if evaluation scores fall below the thresholds defined in the change management plan.


## 5. Deployment & Frontend Assets

This section captures reference assets for the Cloud Run backend and React chat interface wired to the ADS agent API.


### React Frontend Deployment Notes

- Host the static site on Firebase Hosting or Cloud Storage behind a CDN.
- Configure `VITE_API_URL` with the Cloud Run HTTPS endpoint.
- Enforce HTTPS and set CORS allowlist to production hostname.
- Add Google Analytics or internal telemetry hooks to measure deflected call volume.


## 6. Architecture & Operations Summary

The ADS assistant integrates Vertex AI services with secure logging, evaluation, and deployment workflows. Use the following checklist while preparing the executive demo:


- **Security:** Service-to-service authentication via Workload Identity Federation; Cloud Logging captures prompt/response metadata without PII.
- **Privacy:** Prompt and response validators enforce ADS policies before reaching the model and prior to returning to the user.
- **Accuracy:** RAG grounding documents are versioned; evaluation scores must exceed 0.5 BLEU / 0.6 ROUGE_L prior to release.
- **Reliability:** Cloud Run horizontal auto-scaling (min instances = 1, max = 10) and uptime checks configured in Cloud Monitoring.
- **Cost:** Vertex AI Search query quota sized for 1,000 queries/min; nightly cleanup job removes stale sessions and cold data.


In [None]:
import matplotlib.pyplot as plt
import networkx as nx

# Components of the ADS assistant, in drawing order.
flow_nodes = [
    "Resident Web Chat",
    "React Frontend",
    "Cloud Run API",
    "Prompt Filters",
    "Dialogflow Data Store",
    "Gemini Model",
    "Response Validators",
    "Cloud Logging",
]

# A request travels this path and comes back to the browser; consecutive pairs
# along it form the main edges. Logging hangs off the API as a side channel.
_request_path = [
    "Resident Web Chat",
    "React Frontend",
    "Cloud Run API",
    "Prompt Filters",
    "Dialogflow Data Store",
    "Gemini Model",
    "Response Validators",
    "Cloud Run API",
    "React Frontend",
]
flow_edges = list(zip(_request_path, _request_path[1:]))
flow_edges.append(("Cloud Run API", "Cloud Logging"))

G = nx.DiGraph()
G.add_nodes_from(flow_nodes)
G.add_edges_from(flow_edges)

plt.figure(figsize=(12, 6))
# Hand-placed layout coordinates, listed in the same order as flow_nodes.
pos = dict(
    zip(
        flow_nodes,
        [
            (-1.0, 0.2),
            (-0.4, 0.2),
            (0.2, 0.2),
            (0.6, 0.8),
            (1.0, 0.8),
            (1.4, 0.2),
            (0.6, -0.4),
            (0.6, -0.9),
        ],
    )
)

nx.draw_networkx_nodes(G, pos, node_size=2200, node_color="#1f77b4", alpha=0.9)
nx.draw_networkx_labels(G, pos, font_size=9, font_color="white")
nx.draw_networkx_edges(
    G,
    pos,
    arrows=True,
    arrowstyle="-|>",
    arrowsize=15,
    width=2,
    edge_color="#555",
)
plt.axis("off")
plt.title("ADS Snow Agent Flow Diagram", fontsize=14)
plt.show()


### Executive Demo Checklist

- Start with architecture slide using the generated diagram.
- Run the notebook's ingestion cells up to `initialize_dialogflow_datastore`. For a dry run, leave the provisioning call commented out; the notebook has no separate dry-run flag.
- Demo the Cloud Run chat endpoint with pre-vetted scenarios and show logs in real time.
- Present evaluation metrics and safety test results to address accuracy and risk.
- Close with cost projection, rollout phases, and governance milestones.
