# Enterprise RAG Knowledge Engine

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ayoisio/genai-on-google-cloud/blob/main/chapter-2/colabs/06_enterprise_rag_knowledge_engine.ipynb)

**Estimated Time**: 45-60 minutes

**Prerequisites**: Google Cloud project with billing enabled, BigQuery, Cloud SQL, and Vertex AI APIs enabled

---

## Overview

This notebook demonstrates building a **production-ready Enterprise RAG Knowledge Engine** that combines:

### Part 1: BigQuery-Native RAG Pipeline
1. **Unstructured-to-Structured**: Use ML.GENERATE_TEXT for JSON extraction from documents
2. **Chunking & Embedding**: In-datawarehouse RAG preparation with text-embedding models
3. **Semantic Search**: ML.DISTANCE for efficient vector retrieval

### Part 2: Cloud SQL Vector Store & Agentic RAG
4. **pgvector Database**: High-performance operational vector store with Cloud SQL
5. **RAG Agent**: Intelligent agent with function calling using ADK
6. **Production Deployment**: Cloud Run deployment patterns

**Architecture**: BigQuery (analytics) → Cloud SQL (serving) → Gemini Agent (intelligence)

**Use Case**: Financial Services - Earnings call analysis, risk assessment, portfolio insights

### Learning Goals

By the end of this notebook, you will be able to:
- Build a BigQuery-native RAG pipeline using ML.GENERATE_TEXT and ML.GENERATE_EMBEDDING
- Set up Cloud SQL with pgvector as an operational vector store
- Create a RAG agent using Google's Agent Development Kit (ADK)
- Deploy your RAG agent to Cloud Run for production use

---

## PART 1: BIGQUERY-NATIVE RAG PIPELINE

---

## 0. Setup and Configuration

In [None]:
!pip install google-cloud-aiplatform google-cloud-bigquery --upgrade --quiet

### ⚠️ **Restart the Kernel**

After installing the libraries, you must restart the kernel for the changes to take effect.

**In Google Colab:** Go to **Runtime -> Restart session**.

In [None]:
import sys

# Google Colab auth
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()
    print("Colab authentication successful.")

In [None]:
# --- ⚠️ SET YOUR PROJECT DETAILS HERE ---
PROJECT_ID = "vertex-art-of-the-practical"  # @param {type:"string"}
REGION = "us-central1"         # @param {type:"string"}
BQ_DATASET = "finserve_rag_db_v3" # @param {type:"string"}
BQ_CONNECTION = f"projects/{PROJECT_ID}/locations/{REGION}/connections/bq_connection"
# --------------------------------------

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"BigQuery Dataset: {BQ_DATASET}")

In [None]:
# Enable necessary APIs
!gcloud services enable aiplatform.googleapis.com --project {PROJECT_ID}
!gcloud services enable bigquery.googleapis.com --project {PROJECT_ID}
!gcloud services enable sqladmin.googleapis.com --project {PROJECT_ID}
!gcloud services enable bigqueryconnection.googleapis.com --project {PROJECT_ID}

In [None]:
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from google.cloud import bigquery
import json
import subprocess
import re

# Initialize clients
vertexai.init(project=PROJECT_ID, location=REGION)
bq_client = bigquery.Client(project=PROJECT_ID)

print("Vertex AI and BigQuery clients initialized.")

## Step 1: Data & Model Setup

In [None]:
# Create the BigQuery dataset
dataset = bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET}")
dataset.location = REGION
# exists_ok=True makes this a no-op when the dataset already exists;
# any other failure raises instead of being printed and ignored.
bq_client.create_dataset(dataset, exists_ok=True, timeout=30)
print(f"BigQuery dataset ready: {BQ_DATASET}")

In [None]:
# Create BQ Connection
print("Creating BQ Connection...")
!bq mk --connection \
    --connection_type=CLOUD_RESOURCE \
    --project_id={PROJECT_ID} \
    --location={REGION} \
    --quiet bq_connection

try:
    bq_command = [
        "bq", "show",
        "--project_id", PROJECT_ID,
        "--format=json",
        f"--connection", f"{REGION}.bq_connection"
    ]
    result = subprocess.run(bq_command, capture_output=True, text=True, check=True)
    connection_details = json.loads(result.stdout)
    SERVICE_ACCOUNT_ID = connection_details["cloudResource"]["serviceAccountId"]

    print(f"Service Account ID: {SERVICE_ACCOUNT_ID}")

    !gcloud projects add-iam-policy-binding {PROJECT_ID} \
        --member="serviceAccount:{SERVICE_ACCOUNT_ID}" \
        --role="roles/aiplatform.user"

    print(f"✅ Granted Vertex AI User role")
except Exception as e:
    print(f"Error: {e}")

In [None]:
# Create and populate raw data
sql_create_transcripts_table = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.raw_earnings_call_transcripts` (
  transcript_id STRING,
  transcript_text STRING
);

INSERT INTO `{PROJECT_ID}.{BQ_DATASET}.raw_earnings_call_transcripts` (transcript_id, transcript_text)
VALUES
  ('txn_001', "AlphaCorp (Ticker: APL) Q3 Earnings. CEO mentioned significant headwinds from supply chain disruption. Primary risk reported: 'semiconductor shortages'. Outlook: Cautious guidance for Q4, but optimistic on new product cycle."),
  ('txn_002', "BetaTech (Ticker: BTA) Q3. Strong growth in cloud division. CEO stated: 'Our AI services are seeing unprecedented adoption.' Risk: 'Intense market competition'. Outlook: Strong, raising full-year guidance."),
  ('txn_003', "GammaFin (Ticker: GFI) Q3. Missed on revenue. Primary risk: 'Interest rate volatility'. Outlook: Neutral. CEO mentioned 'cost-cutting measures' and 'focus on core assets'."),
  ('txn_004', "AlphaCorp (Ticker: APL) Q4 Update. Supply chain issues are easing. CEO: 'We are navigating the semiconductor shortage better than expected.' New risk: 'inflationary pressure on consumers'. Outlook: Stable.");
"""

print("Creating raw data table...")
bq_client.query(sql_create_transcripts_table).result()
print("✅ Raw data loaded")

## Step 2: Unstructured-to-Structured (ML.GENERATE_TEXT)
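
Model responses often wrap the requested JSON in extra prose or Markdown fences, which is why this step extracts the outermost `{...}` span before parsing it. As a rough illustration only, the same extract-then-parse rule can be sketched in plain Python. `extract_json_object` is a hypothetical helper, not part of BigQuery or Vertex AI.

```python
import json
import re
from typing import Optional


def extract_json_object(model_text: str) -> Optional[dict]:
    """Parse the span from the first '{' to the last '}' as a JSON object, or return None."""
    # Greedy match from the first '{' to the last '}', like the regex used in the SQL.
    match = re.search(r"\{[\s\S]*\}", model_text)
    if match is None:
        return None
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        # Mirrors SAFE.PARSE_JSON, which yields NULL instead of raising.
        return None
    return parsed if isinstance(parsed, dict) else None


sample = 'Here you go:\n{"company_ticker": "APL", "primary_risk": "semiconductor shortages"}'
print(extract_json_object(sample))
```

Rows whose response cannot be parsed come back as `None` here, which corresponds to the `report_data IS NOT NULL` filter in the SQL.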

In [None]:
# Create text generation model
sql_create_llm_model = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BQ_DATASET}.gemini_flash_model`
  REMOTE WITH CONNECTION `{BQ_CONNECTION}`
  OPTIONS (endpoint = 'gemini-2.0-flash-exp');
"""

print("Creating BQML model...")
bq_client.query(sql_create_llm_model).result()
print("✅ Model created")

In [None]:
# Extract structured JSON from text
sql_transform = """
CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.llm_raw_output` AS
SELECT
  transcript_id,
  ml_generate_text_result AS raw_model_response
FROM
  ML.GENERATE_TEXT(
    MODEL `{PROJECT_ID}.{BQ_DATASET}.gemini_flash_model`,
    (
      SELECT
        transcript_id,
        CONCAT(
          '''
          Extract structured data into valid JSON:
          {{
            "company_ticker": "string",
            "primary_risk": "string",
            "ceo_outlook": "string"
          }}

          Text:
          ''',
          transcript_text
        ) AS prompt
      FROM
        `{PROJECT_ID}.{BQ_DATASET}.raw_earnings_call_transcripts`
    ),
    STRUCT(
      0.2 AS temperature,
      1024 AS max_output_tokens
    )
  );
""".format(PROJECT_ID=PROJECT_ID, BQ_DATASET=BQ_DATASET)

print("Extracting structured data...")
bq_client.query(sql_transform).result()
print("✅ Extraction complete")

In [None]:
# Parse JSON and create final table
# Raw string so the regex backslashes (\{, \s, \S) reach BigQuery without invalid-escape warnings
sql_parse_and_clean = r"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.structured_earnings_summary` AS
WITH
  CleanedData AS (
    SELECT
      transcript_id,
      SAFE.PARSE_JSON(
        REGEXP_EXTRACT(
          JSON_VALUE(raw_model_response, '$.candidates[0].content.parts[0].text'),
          r'\{{[\s\S]*\}}'
        )
      ) AS report_data
    FROM
      `{PROJECT_ID}.{BQ_DATASET}.llm_raw_output`
  )
SELECT
  transcript_id,
  JSON_VALUE(report_data, '$.company_ticker') AS company_ticker,
  JSON_VALUE(report_data, '$.primary_risk') AS primary_risk,
  JSON_VALUE(report_data, '$.ceo_outlook') AS ceo_outlook
FROM
  CleanedData
WHERE
  report_data IS NOT NULL;
""".format(PROJECT_ID=PROJECT_ID, BQ_DATASET=BQ_DATASET)

bq_client.query(sql_parse_and_clean).result()
print("✅ Structured table created")

# View results
bq_client.query(f"SELECT * FROM `{PROJECT_ID}.{BQ_DATASET}.structured_earnings_summary`").to_dataframe()

## Step 3: Build RAG Knowledge Base
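
The chunking query in this step splits each transcript on periods, trims the pieces, and keeps only those longer than 15 characters. A minimal Python sketch of that rule follows, assuming the same `doc_id-n` chunk ID format. `chunk_transcript` is a hypothetical helper for illustration.

```python
def chunk_transcript(doc_id, text, min_chars=15):
    """Split text on periods and keep trimmed fragments longer than min_chars."""
    chunks = []
    for piece in text.split("."):
        piece = piece.strip()
        if len(piece) > min_chars:
            # Number only the fragments that are kept, starting at 1.
            chunks.append({"chunk_id": f"{doc_id}-{len(chunks) + 1}", "chunk_text": piece})
    return chunks


example = "BetaTech (Ticker: BTA) Q3. Strong growth in cloud division. Risk: 'Intense market competition'."
for chunk in chunk_transcript(2, example):
    print(chunk)
```

Splitting on `.` is deliberately simple. It will also cut decimals and abbreviations, so longer documents usually need a sentence-aware or token-based splitter.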

In [None]:
# Chunk documents by sentence
sql_create_chunks = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.earnings_call_chunks` AS
WITH
  NumberedTranscripts AS (
    SELECT
      ROW_NUMBER() OVER (ORDER BY transcript_id) AS doc_id,
      transcript_id,
      transcript_text
    FROM
      `{PROJECT_ID}.{BQ_DATASET}.raw_earnings_call_transcripts`
  )
SELECT
  doc_id,
  transcript_id,
  CONCAT(CAST(doc_id AS STRING), '-', CAST(ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY chunk_offset) AS STRING)) AS chunk_id,
  TRIM(chunk) AS chunk_text
FROM
  NumberedTranscripts,
  UNNEST(SPLIT(transcript_text, '.')) AS chunk WITH OFFSET AS chunk_offset
WHERE
  LENGTH(TRIM(chunk)) > 15;
"""

print("Chunking documents...")
bq_client.query(sql_create_chunks).result()
print("✅ Chunking complete")

In [None]:
# Create embedding model
sql_create_embedding_model = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BQ_DATASET}.embedding_model`
  REMOTE WITH CONNECTION `{BQ_CONNECTION}`
  OPTIONS (endpoint = 'text-embedding-005');
"""

print("Creating embedding model...")
bq_client.query(sql_create_embedding_model).result()
print("✅ Model created")

In [None]:
# Generate embeddings
sql_embed_logs = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.earnings_call_embeddings_chunked` AS
SELECT
  chunk_id,
  chunk_text,
  ml_generate_embedding_result AS embedding
FROM
  ML.GENERATE_EMBEDDING(
    MODEL `{PROJECT_ID}.{BQ_DATASET}.embedding_model`,
    (
      SELECT
        chunk_id,
        chunk_text AS content,
        chunk_text AS chunk_text
      FROM
        `{PROJECT_ID}.{BQ_DATASET}.earnings_call_chunks`
    ),
    STRUCT(TRUE AS flatten_json_output)
  );
"""

print("Generating embeddings...")
bq_client.query(sql_embed_logs).result()
print("✅ Embeddings created")

## Step 4: RAG in Action (BigQuery-based)
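
Retrieval in this step ranks chunks by cosine distance, which is one minus the cosine similarity of the two embeddings, so smaller values mean closer matches. The sketch below reproduces that ranking in pure Python on toy vectors. `cosine_distance` and `top_k` are hypothetical helper names, and the code is for illustration rather than a replacement for `ML.DISTANCE`.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity for two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query, rows, k=3):
    """rows is a list of (chunk_text, embedding) pairs; closest chunks come first."""
    scored = [(text, cosine_distance(query, emb)) for text, emb in rows]
    return sorted(scored, key=lambda pair: pair[1])[:k]


toy_rows = [("supply chain", [1.0, 0.1]), ("interest rates", [0.0, 1.0])]
print(top_k([1.0, 0.0], toy_rows, k=1))
```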

In [None]:
# Simple RAG query
USER_QUERY = "What are the main risks related to supply chain?"

sql_retrieve_context = f"""
WITH UserQuery AS (
  SELECT ml_generate_embedding_result AS query_embedding
  FROM ML.GENERATE_EMBEDDING(
    MODEL `{PROJECT_ID}.{BQ_DATASET}.embedding_model`,
    (SELECT @user_query AS content),
    STRUCT(TRUE AS flatten_json_output)
  )
)
SELECT
  base.chunk_text,
  ML.DISTANCE(
    base.embedding,
    query.query_embedding,
    'COSINE'
  ) AS distance
FROM
  `{PROJECT_ID}.{BQ_DATASET}.earnings_call_embeddings_chunked` AS base,
  UserQuery AS query
ORDER BY
  distance ASC
LIMIT 3
"""

# Pass the question as a query parameter so quotes in it cannot break the SQL
job_config = bigquery.QueryJobConfig(
    query_parameters=[bigquery.ScalarQueryParameter("user_query", "STRING", USER_QUERY)]
)
context_df = bq_client.query(sql_retrieve_context, job_config=job_config).to_dataframe()
print("--- Retrieved Context ---")
print(context_df)

# Generate answer
context_string = "\n---\n".join(context_df['chunk_text'])
prompt = f"""
Answer based only on this context:

{context_string}

Question: {USER_QUERY}
Answer:
"""

model = GenerativeModel("gemini-2.0-flash-exp")
response = model.generate_content(prompt)
print("\n--- Answer ---")
print(response.text)

---

## PART 2: CLOUD SQL VECTOR STORE & AGENTIC RAG

Now we'll build a production-grade system with:
- Cloud SQL + pgvector for low-latency queries
- Intelligent RAG agent with function calling
- Scalable deployment patterns

---

## Step 5: Cloud SQL Vector Store Setup
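
pgvector accepts a vector as text in the form `[v1,v2,...]`, which can then be cast to the `vector` type. When embeddings are computed in Python rather than inside the database, a small formatter like the one below can produce that literal and check the dimension against the table's `VECTOR(768)` column. `to_pgvector_literal` is a hypothetical helper, shown as a sketch.

```python
def to_pgvector_literal(values, expected_dim=None):
    """Format a sequence of numbers as pgvector's '[v1,v2,...]' text form."""
    floats = [float(v) for v in values]
    if expected_dim is not None and len(floats) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(floats)}")
    return "[" + ",".join(repr(v) for v in floats) + "]"


print(to_pgvector_literal([0.5, -1, 2]))
```

The resulting string would be bound as an ordinary query parameter and cast on the SQL side, for example with `CAST(:embedding AS vector)`.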

In [None]:
# === STEP 5a: PROVISION CLOUD SQL INSTANCE ===
# This cell creates the Cloud SQL instance correctly from scratch.
# It takes 15-20 minutes. Please be patient!

INSTANCE_NAME = "finserve-knowledge-engine-2" # You can change this name if it conflicts
DB_NAME = "financial_knowledge"
DB_USER = "postgres"
DB_PASSWORD = "FinServe2024!" # CHANGE IN PRODUCTION
REGION = "us-central1"

print(f"--- INFRASTRUCTURE SETUP ---")
print(f"Instance: {INSTANCE_NAME}")
print(f"Region: {REGION}")

# 1. Delete old instance if it exists (optional, uncomment if needed to retry)
# print(f"Deleting old instance '{INSTANCE_NAME}' (if it exists)...")
# !gcloud sql instances delete {INSTANCE_NAME} --project={PROJECT_ID} --quiet

# 2. Create the instance with the CRITICAL flag for Vertex AI integration
print(f"\nCreating Cloud SQL instance (this takes 15-20 mins)...")
!gcloud sql instances create {INSTANCE_NAME} \
    --database-version=POSTGRES_15 \
    --cpu=1 \
    --memory=4GiB \
    --region={REGION} \
    --project={PROJECT_ID} \
    --enable-google-ml-integration \
    --quiet

print(f"Instance created. Creating database '{DB_NAME}'...")
!gcloud sql databases create {DB_NAME} --instance={INSTANCE_NAME} --project={PROJECT_ID} --quiet

print(f"Database created. Setting password for user '{DB_USER}'...")
!gcloud sql users set-password {DB_USER} --instance={INSTANCE_NAME} --password={DB_PASSWORD} --project={PROJECT_ID} --quiet

print("\n✅ Cloud SQL infrastructure successfully provisioned with ML integration!")

In [None]:
# Install Cloud SQL connector
!pip install cloud-sql-python-connector[pg8000] sqlalchemy --upgrade --quiet

In [None]:
# === FIX: Enable Outbound Connectivity for Vertex AI ===
# The 'google_ml_integration' extension needs to make outbound calls to Vertex AI.
# We must enable this explicitly on the Cloud SQL instance.

print(f"Enabling outbound public IP for instance '{INSTANCE_NAME}'...")

!gcloud sql instances patch {INSTANCE_NAME} \
    --project={PROJECT_ID} \
    --assign-ip \
    --quiet

print("✅ Outbound connectivity enabled. Waiting 2 minutes for changes to propagate...")
import time
time.sleep(120) # Give it time to restart/update internally
print("Ready to connect.")

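**Note**: The account running this notebook needs the Cloud SQL Client role (`roles/cloudsql.client`) on the project for the Cloud SQL connector to open connections.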

In [None]:
from google.cloud.sql.connector import Connector
import sqlalchemy
import pg8000

connector = Connector()

def get_db_connection():
    conn = connector.connect(
        f"{PROJECT_ID}:{REGION}:{INSTANCE_NAME}",
        "pg8000",
        user=DB_USER,
        password=DB_PASSWORD,
        db=DB_NAME
    )
    return conn

engine = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=get_db_connection,
)

print("✅ Cloud SQL engine configured (connections open on first use)")

In [None]:
# Enable extensions
with engine.connect() as conn:
    conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
    conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS google_ml_integration CASCADE;"))
    conn.commit()
    print("✅ pgvector enabled")

In [None]:
# Create knowledge table
create_table_sql = """
CREATE TABLE IF NOT EXISTS earnings_knowledge (
    id SERIAL PRIMARY KEY,
    chunk_id TEXT,
    company_ticker TEXT,
    document_type TEXT,
    chunk_content TEXT,
    embedding VECTOR(768)
);
"""

with engine.connect() as conn:
    conn.execute(sqlalchemy.text(create_table_sql))
    conn.commit()
    print("✅ Table created")

In [None]:
# Load data from BigQuery
bq_data_query = f"""
SELECT
    chunk_id,
    chunk_text,
    embedding
FROM `{PROJECT_ID}.{BQ_DATASET}.earnings_call_embeddings_chunked`
"""

df_embeddings = bq_client.query(bq_data_query).to_dataframe()
print(f"Fetched {len(df_embeddings)} chunks")

In [None]:
# === GRANT PERMISSIONS TO CLOUD SQL SERVICE ACCOUNT ===
# The Cloud SQL instance has its own service account. IT needs permission to call Vertex AI.

print(f"Fetching service account for Cloud SQL instance '{INSTANCE_NAME}'...")
sql_sa = !gcloud sql instances describe {INSTANCE_NAME} --project={PROJECT_ID} --format="value(serviceAccountEmailAddress)"
SQL_SERVICE_ACCOUNT = sql_sa[0]
print(f"Cloud SQL Service Account: {SQL_SERVICE_ACCOUNT}")

print("Granting 'Vertex AI User' role to Cloud SQL Service Account...")
!gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SQL_SERVICE_ACCOUNT}" \
    --role="roles/aiplatform.user" \
    --quiet

print("Granting 'Vertex AI Service Agent' role (belt and suspenders)...")
!gcloud projects add-iam-policy-binding {PROJECT_ID} \
    --member="serviceAccount:{SQL_SERVICE_ACCOUNT}" \
    --role="roles/aiplatform.serviceAgent" \
    --quiet

print("\n✅ Permissions granted. Waiting 60 seconds for propagation...")
import time
time.sleep(60)

In [None]:
# Insert with Cloud SQL embedding function
insert_count = 0
with engine.connect() as conn:
    for _, row in df_embeddings.iterrows():
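        # Heuristic: infer the ticker from a company name in the chunk text.
        # Sentence-level chunks that do not name the company stay "UNKNOWN".
        ticker = "UNKNOWN"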
        if "Alpha" in row['chunk_text']:
            ticker = "APL"
        elif "Beta" in row['chunk_text']:
            ticker = "BTA"
        elif "Gamma" in row['chunk_text']:
            ticker = "GFI"

        insert_sql = sqlalchemy.text("""
            INSERT INTO earnings_knowledge (chunk_id, company_ticker, document_type, chunk_content, embedding)
            VALUES (:chunk_id, :ticker, 'EARNINGS_CALL', :content,
                    (embedding('text-embedding-005', :content))::vector)
        """)

        conn.execute(insert_sql, {
            'chunk_id': row['chunk_id'],
            'ticker': ticker,
            'content': row['chunk_text']
        })
        insert_count += 1

    conn.commit()

print(f"✅ Inserted {insert_count} chunks")

In [None]:
# Create HNSW index
create_index_sql = """
CREATE INDEX IF NOT EXISTS earnings_knowledge_embedding_idx
ON earnings_knowledge
USING hnsw (embedding vector_cosine_ops);
"""

with engine.connect() as conn:
    conn.execute(sqlalchemy.text(create_index_sql))
    conn.commit()
    print("✅ HNSW index created for fast search")

## Step 6: Build RAG Agent with ADK (Agent Development Kit)

We'll use Google's Agent Development Kit to create a production-ready RAG agent with proper tool use.
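
ADK accepts plain Python functions as tools and relies on each function's name, type hints, and docstring to describe the tool to the model, so those three pieces deserve care. The sketch below uses only the standard `inspect` module to show the metadata such a function exposes. `describe_tool` and `sample_lookup` are hypothetical names, and this is not ADK's actual implementation.

```python
import inspect


def describe_tool(func):
    """Collect the name, docstring, and parameter types of a function."""
    signature = inspect.signature(func)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            for name, param in signature.parameters.items()
        },
    }


def sample_lookup(company_query: str) -> str:
    """Search a knowledge base for a company or financial topic."""
    return f"results for {company_query}"


print(describe_tool(sample_lookup))
```

A vague docstring or a missing type hint leaves the model with less to go on when it decides whether and how to call the tool.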

In [None]:
# Install ADK
!pip install google-genai --upgrade --quiet
!pip install google-adk --upgrade --quiet

In [None]:
import os
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'TRUE'
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = REGION

from google import genai
from google.genai.types import EmbedContentConfig

# Initialize Genai client for embeddings
client = genai.Client(vertexai=True, project=PROJECT_ID, location=REGION)

def knowledge_lookup(company_query: str) -> str:
    """
    Search the financial knowledge base for information about companies.

    Args:
        company_query: The company name, ticker, or financial topic to search for

    Returns:
        Relevant information from earnings calls and financial documents
    """
    print(f"\n🔍 TOOL CALLED: knowledge_lookup('{company_query}')")

    # Step 1: Convert the query to an embedding with the text embedding model
    result = client.models.embed_content(
        model="text-embedding-005",
        contents=company_query,
        config=EmbedContentConfig(
            task_type="RETRIEVAL_QUERY",  # search queries use the query task type
            output_dimensionality=768,
        )
    )

    query_embedding = result.embeddings[0].values

    # Step 2: Perform cosine-distance search in pgvector.
    # The embedding is passed in pgvector's '[v1, v2, ...]' text form and cast to vector.
    search_sql = sqlalchemy.text("""
        SELECT
            company_ticker,
            chunk_content
        FROM earnings_knowledge
        ORDER BY embedding <=> CAST(:query_embedding AS vector)
        LIMIT 3
    """)

    with engine.connect() as conn:
        results = conn.execute(
            search_sql, {"query_embedding": str(list(query_embedding))}
        ).fetchall()

    # Step 3: Format results for the agent
    if not results:
        return "No relevant information found in the knowledge base."

    formatted_output = "\n\n".join([
        f"[{row[0]}] {row[1]}" for row in results
    ])

    print(f"✅ Retrieved {len(results)} relevant chunks\n")
    return formatted_output

print("✅ knowledge_lookup tool defined")

### Step 6a: Create the Knowledge Lookup Tool

The `knowledge_lookup` function defined in the previous cell is the agent's tool: it queries the pgvector database using semantic search.

### Step 6b: Create the ADK Agent

The next cell wires that tool into ADK's `LlmAgent`.

In [None]:
from google.adk.agents import LlmAgent

# Create the Financial Analyst RAG Agent using ADK
financial_analyst_agent = LlmAgent(
    model="gemini-2.5-flash",
    name="financial_analyst",
    description="A financial analyst that answers questions about companies, risks, and earnings using a knowledge base",
    instruction="""
        You are a Senior Financial Analyst with expertise in earnings analysis and risk assessment.

        **Your Process:**
        1. When asked about companies, risks, or financial topics, ALWAYS use the `knowledge_lookup` tool first
        2. Base your analysis ONLY on the information retrieved from the knowledge base
        3. If the knowledge base doesn't have relevant information, clearly state that
        4. Provide concise, actionable insights with specific citations (company ticker)
        5. Structure your responses professionally with clear bullet points when appropriate

        **Analysis Guidelines:**
        - Highlight key risks and opportunities
        - Compare across companies when relevant
        - Focus on material information (supply chain, market conditions, CEO guidance)
        - Use financial terminology appropriately

        **Output Format:**
        Provide clear, evidence-based analysis citing the specific company sources.
    """,
    tools=[knowledge_lookup],
)

print("✅ Financial Analyst Agent initialized with ADK")
print(f"   Model: gemini-2.5-flash")
print(f"   Tools: [knowledge_lookup]")
print(f"   Status: Ready for queries")

In [None]:
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, Session
from google.genai import types
import uuid

# Setup ADK session and runner
APP_NAME = "financial_rag_app"
USER_ID = "analyst_user"

# Create session service
session_service = InMemorySessionService()

# Create runner with the agent
runner = Runner(
    agent=financial_analyst_agent,
    app_name=APP_NAME,
    session_service=session_service
)

# Create a persistent session for this conversation.
# Assumes ADK 1.x, where session-service methods are coroutines; notebooks
# support top-level await. On older, synchronous releases, drop the `await`.
session_id = str(uuid.uuid4())
await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=session_id
)

def query_agent(question: str) -> str:
    """
    Send a query to the financial analyst agent using ADK Runner.
    """
    print(f"\n{'='*80}")
    print(f"USER QUESTION: {question}")
    print(f"{'='*80}\n")

    try:

        # Create user message content
        content = types.Content(
            role='user',
            parts=[types.Part(text=question)]
        )

        # Run the agent with proper session management
        events = runner.run(
            user_id=USER_ID,
            session_id=session_id,
            new_message=content
        )

        # Process events and extract final response
        response_text = ""
        for event in events:
            # Debug: show tool calls requested by the model in this event
            for call in event.get_function_calls():
                print(f"🔧 Tool called: {call.name}")

            # Extract final response
            if event.is_final_response() and event.content:
                for part in event.content.parts:
                    if hasattr(part, 'text') and part.text:
                        response_text = part.text.strip()

    except Exception as e:
        import traceback
        response_text = f"ERROR: Could not interact with agent.\n{str(e)}\n\nTraceback:\n{traceback.format_exc()}"

    print(f"\n{'='*80}")
    print(f"AGENT RESPONSE:")
    print(f"{'='*80}")
    print(response_text)
    print(f"{'='*80}\n")

    return response_text

print("✅ Helper function ready (using ADK Runner)")
print(f"   App: {APP_NAME}")
print(f"   User: {USER_ID}")
print(f"   Session ID: {session_id}")

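### Step 6c: Helper Function for Agent Interaction

The `query_agent` helper defined in the previous cell sends a question through the ADK `Runner` and returns the agent's final response.

### Test the Agent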

In [None]:
# Test 1: Supply chain risks
query_agent("What are the main supply chain challenges?")

In [None]:
# Test 2: Company outlook
query_agent("What is BetaTech's outlook?")

In [None]:
# Test 3: Risk comparison
query_agent("Compare risks across all companies")

In [None]:
# Create agent.py for deployment
agent_code = """
import os
from google import genai
from google.adk.agents import LlmAgent
from google.genai.types import EmbedContentConfig
from google.cloud.sql.connector import Connector
import sqlalchemy
import pg8000

# Environment variables
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
REGION = os.getenv('GOOGLE_CLOUD_LOCATION', 'us-central1')
INSTANCE_NAME = os.getenv('CLOUD_SQL_INSTANCE')
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME', 'financial_knowledge')

os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'TRUE'

# Initialize connections
connector = Connector()
client = genai.Client(vertexai=True, project=PROJECT_ID, location=REGION)

def get_db_connection():
    conn = connector.connect(
        INSTANCE_NAME,
        "pg8000",
        user=DB_USER,
        password=DB_PASSWORD,
        db=DB_NAME
    )
    return conn

engine = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=get_db_connection,
)

def knowledge_lookup(company_query: str) -> str:
    '''Search the financial knowledge base for information about companies.'''
    result = client.models.embed_content(
        model="text-embedding-005",
        contents=company_query,
        config=EmbedContentConfig(
            task_type="RETRIEVAL_QUERY",
            output_dimensionality=768,
        )
    )

    query_embedding = result.embeddings[0].values

    # pgvector parses the '[v1, v2, ...]' text form, so pass the embedding as a string and cast it
    search_sql = sqlalchemy.text(
        "SELECT company_ticker, chunk_content FROM earnings_knowledge "
        "ORDER BY embedding <=> CAST(:query_embedding AS vector) LIMIT 3"
    )

    with engine.connect() as conn:
        results = conn.execute(
            search_sql, {"query_embedding": str(list(query_embedding))}
        ).fetchall()

    if not results:
        return "No relevant information found."

    return "\\n\\n".join([f"[{row[0]}] {row[1]}" for row in results])

# Create the agent (the ADK CLI discovers it through the name root_agent)
root_agent = LlmAgent(
    model="gemini-2.5-flash",
    name="financial_analyst",
    instruction='''
        You are a Senior Financial Analyst. Use knowledge_lookup to answer questions
        about companies, risks, and earnings. Base answers ONLY on retrieved data.
    ''',
    tools=[knowledge_lookup],
)
"""

with open('agent.py', 'w') as f:
    f.write(agent_code)

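print("✅ agent.py created")
print("\nTo run locally, place agent.py in a package directory (for example financial_analyst/agent.py")
print("next to an __init__.py containing `from . import agent`), then start an interactive session:")
print("  adk run financial_analyst")
print("\nFor a local web UI, run `adk web` from the parent directory instead.")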

In [None]:
# Dataflow pipeline for continuous embedding
dataflow_pipeline = """
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import aiplatform
import sqlalchemy
from google.cloud.sql.connector import Connector

class EmbedAndStore(beam.DoFn):
    def __init__(self, project, region, instance):
        self.project = project
        self.region = region
        self.instance = instance

    def setup(self):
        aiplatform.init(project=self.project, location=self.region)
        connector = Connector()

        def get_conn():
            return connector.connect(
                self.instance,
                "pg8000",
                user="postgres",
                password=os.getenv("DB_PASSWORD"),
                db="financial_knowledge"
            )

        self.engine = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=get_conn
        )

    def process(self, element):
        # Generate embedding (the model handle is created on first use and then reused)
        from vertexai.language_models import TextEmbeddingModel
        if not hasattr(self, 'model'):
            self.model = TextEmbeddingModel.from_pretrained("text-embedding-005")
        embedding = self.model.get_embeddings([element['text']])[0].values

        # Insert to Cloud SQL; engine.begin() commits when the block exits
        with self.engine.begin() as conn:
            conn.execute(
                sqlalchemy.text(
                    "INSERT INTO earnings_knowledge (chunk_id, company_ticker, chunk_content, embedding) "
                    "VALUES (:id, :ticker, :content, CAST(:embed AS vector))"
                ),
                {
                    'id': element['id'],
                    'ticker': element['ticker'],
                    'content': element['text'],
                    # pgvector parses the '[v1, v2, ...]' text form
                    'embed': str(list(embedding))
                }
            )

        yield element

def run_pipeline():
    options = PipelineOptions(
        runner='DataflowRunner',
        project='YOUR_PROJECT_ID',
        region='us-central1',
        temp_location='gs://YOUR_BUCKET/temp'
    )

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
             # Aliases match the keys read in EmbedAndStore.process (id, text, ticker)
             query='SELECT chunk_id AS id, chunk_text AS text, company AS ticker FROM earnings_raw WHERE processed = FALSE'
         )
         | 'Embed and Store' >> beam.ParDo(EmbedAndStore(
             project='YOUR_PROJECT_ID',
             region='us-central1',
             instance='YOUR_INSTANCE'
         ))
        )

if __name__ == '__main__':
    run_pipeline()
"""

with open('vectorization_pipeline.py', 'w') as f:
    f.write(dataflow_pipeline)

print("✅ Dataflow pipeline created")
print("\nTo run:")
print("  python vectorization_pipeline.py")

### Optional: Continuous Vectorization Pipeline with Dataflow

The previous cell writes `vectorization_pipeline.py`, a Dataflow pipeline that keeps the knowledge base updated as new earnings calls are added.
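
Embedding one row per API call becomes slow as volume grows. Since the embedding call takes a list of texts, rows can be grouped into fixed-size batches first. The sketch below shows only that grouping logic in plain Python. `batched` is a hypothetical helper, the right batch size depends on the model's request limits, and inside a Beam pipeline the built-in `BatchElements` transform is the more usual choice.

```python
from itertools import islice


def batched(items, batch_size):
    """Yield successive lists of up to batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


texts = ["chunk one", "chunk two", "chunk three"]
for group in batched(texts, 2):
    # Each group would be sent to the embedding model in a single request.
    print(group)
```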

In [None]:
# Deploy to Cloud Run
deploy_script = f"""#!/bin/bash
set -e

# Variables
SERVICE_NAME="financial-analyst-agent"
REGION="{REGION}"
PROJECT_ID="{PROJECT_ID}"
CLOUD_SQL_INSTANCE="{PROJECT_ID}:{REGION}:{INSTANCE_NAME}"

echo "Building container..."
gcloud builds submit --tag gcr.io/$PROJECT_ID/$SERVICE_NAME

echo "Deploying to Cloud Run..."
gcloud run deploy $SERVICE_NAME \\
  --image gcr.io/$PROJECT_ID/$SERVICE_NAME \\
  --platform managed \\
  --region $REGION \\
  --allow-unauthenticated \\
  --set-env-vars GOOGLE_GENAI_USE_VERTEXAI=TRUE \\
  --set-env-vars GOOGLE_CLOUD_PROJECT=$PROJECT_ID \\
  --set-env-vars GOOGLE_CLOUD_LOCATION=$REGION \\
  --set-env-vars CLOUD_SQL_INSTANCE=$CLOUD_SQL_INSTANCE \\
  --set-env-vars DB_USER={DB_USER} \\
  --set-env-vars DB_NAME={DB_NAME} \\
  --set-env-vars A2A_HOST=0.0.0.0 \\
  --set-env-vars A2A_PORT=8080 \\
  --set-secrets DB_PASSWORD=db-password:latest \\
  --add-cloudsql-instances $CLOUD_SQL_INSTANCE \\
  --memory 2Gi \\
  --timeout 300 \\
  --max-instances 10

echo "Deployment complete!"
gcloud run services describe $SERVICE_NAME --region $REGION --format 'value(status.url)'
"""

with open('deploy.sh', 'w') as f:
    f.write(deploy_script)

print("✅ deploy.sh created")
print("\nTo deploy:")
print("  chmod +x deploy.sh")
print("  ./deploy.sh")

In [None]:
# Create Dockerfile for ADK agent
dockerfile_content = """
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
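RUN pip install --no-cache-dir \\
    google-adk \\
    google-genai \\
    "cloud-sql-python-connector[pg8000]" \\
    sqlalchemy

# Copy agent code into a package directory that the ADK CLI can discover
# (assumes agent.py exposes a variable named root_agent)
COPY agent.py financial_analyst/agent.py
RUN echo "from . import agent" > financial_analyst/__init__.py

# ADK environment variables
ENV A2A_HOST=0.0.0.0
ENV A2A_PORT=8080

# Serve the agent over HTTP with the ADK API server
CMD ["adk", "api_server", "--host", "0.0.0.0", "--port", "8080", "."]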
"""

with open('Dockerfile', 'w') as f:
    f.write(dockerfile_content)

print("✅ Dockerfile created")

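## Step 7: Production Deployment to Cloud Run

The `Dockerfile` and `deploy.sh` files generated above package the ADK agent for Cloud Run, which provides scalable, serverless hosting. Before running `deploy.sh`, store the database password in Secret Manager under the name `db-password`, which the script reads through `--set-secrets`.

### Alternative: Run with ADK CLI

Instead of calling the agent through the notebook's `Runner`, you can use the `agent.py` file created above and run it with `adk run`.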
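
The generated `agent.py` reads its configuration from environment variables, and a missing value such as `DB_PASSWORD` only surfaces later as a connection error. One option, sketched here under the assumption that the same variable names and defaults are used, is to validate them at startup. `AgentSettings` and `load_settings` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSettings:
    project_id: str
    region: str
    cloud_sql_instance: str
    db_user: str
    db_password: str
    db_name: str


def load_settings(env):
    """Build settings from a mapping such as os.environ, failing fast on gaps."""
    required = ["GOOGLE_CLOUD_PROJECT", "CLOUD_SQL_INSTANCE", "DB_PASSWORD"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return AgentSettings(
        project_id=env["GOOGLE_CLOUD_PROJECT"],
        region=env.get("GOOGLE_CLOUD_LOCATION", "us-central1"),
        cloud_sql_instance=env["CLOUD_SQL_INSTANCE"],
        db_user=env.get("DB_USER", "postgres"),
        db_password=env["DB_PASSWORD"],
        db_name=env.get("DB_NAME", "financial_knowledge"),
    )


demo_env = {
    "GOOGLE_CLOUD_PROJECT": "my-project",
    "CLOUD_SQL_INSTANCE": "my-project:us-central1:my-instance",
    "DB_PASSWORD": "example-only",
}
print(load_settings(demo_env).region)
```

In the deployed service the mapping would be `os.environ`; the dictionary above is only for demonstration.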

## Summary

You've built a complete enterprise RAG knowledge engine with:

### ✅ Data Pipeline
- BigQuery for analytics and batch processing
- ML.GENERATE_TEXT for structured extraction
- Chunking & embedding at scale

### ✅ Vector Store
- Cloud SQL + pgvector for low-latency queries
- HNSW indexing for approximate nearest-neighbor search
- Hybrid storage (metadata + vectors)

### ✅ Intelligent Agent
- Function calling for tool use
- RAG pattern (Retrieve → Augment → Generate)
- Grounded, cited responses
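
The "Augment" step of the RAG pattern amounts to placing the retrieved chunks in the prompt ahead of the question. A minimal sketch, assuming the same prompt wording and `---` separator used in Step 4, is shown below. `build_grounded_prompt` is a hypothetical helper.

```python
def build_grounded_prompt(question, chunks):
    """Combine retrieved chunks and the user question into one grounded prompt."""
    if not chunks:
        raise ValueError("at least one context chunk is required")
    context = "\n---\n".join(chunks)
    return (
        "Answer based only on this context:\n\n"
        f"{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


print(build_grounded_prompt(
    "What are the main risks related to supply chain?",
    ["Primary risk reported: 'semiconductor shortages'", "Supply chain issues are easing"],
))
```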

### 🚀 Production Ready
- Deploy to Cloud Run
- Dataflow for continuous ingestion
- Scalable to millions of documents