1. Infrastructure as Code (Terraform - infra/main.tf) - This Terraform script sets up the core GCP services. A real production setup would be split into modules.

In [None]:
# Pin the Google provider to the 4.x major series (the same range as "~> 4.0").
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = ">= 4.0, < 5.0"
    }
  }
}

# Default project and region for every google_* resource in this
# configuration; both come from input variables.
provider "google" {
  region  = var.region
  project = var.project_id
}

# Cloud Storage service agent for this project. It encrypts objects with the
# bucket's CMEK key and publishes the object notifications, so it needs IAM on
# both the key and the topic.
data "google_storage_project_service_account" "gcs_account" {
}

# Let the GCS service agent use the CMEK key; without this grant the bucket
# cannot be created with default_kms_key_name.
resource "google_kms_crypto_key_iam_member" "gcs_documents_key_user" {
  crypto_key_id = google_kms_crypto_key.documents_key.id
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}

# Cloud Storage Bucket for raw documents
resource "google_storage_bucket" "documents_bucket" {
  name          = "${var.project_id}-documents"
  location      = var.region
  force_destroy = false

  versioning {
    enabled = true
  }

  uniform_bucket_level_access = true

  encryption {
    default_kms_key_name = google_kms_crypto_key.documents_key.id
  }

  depends_on = [google_kms_crypto_key_iam_member.gcs_documents_key_user]
}

# Pub/Sub Topic for new file notifications
resource "google_pubsub_topic" "ingestion_topic" {
  name = "document-ingestion-topic"
}

# Let the GCS service agent publish object notifications to the topic.
# A *_member (non-authoritative) grant avoids clobbering other publishers.
resource "google_pubsub_topic_iam_member" "gcs_notification_publisher" {
  topic  = google_pubsub_topic.ingestion_topic.id
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}

# Publish a Pub/Sub message for every newly finalized object.
# NOTE(review): this fires for every object in the bucket, including files
# written under embeddings/ (the Vector Search contents_delta_uri) and any
# pipeline temp/DLQ output placed in this bucket. Consider object_name_prefix
# or a separate bucket so those writes are not fed back into ingestion.
resource "google_storage_notification" "documents_notification" {
  bucket         = google_storage_bucket.documents_bucket.name
  payload_format = "JSON_API_V1"
  topic          = google_pubsub_topic.ingestion_topic.id
  event_types    = ["OBJECT_FINALIZE"]
  depends_on     = [google_pubsub_topic_iam_member.gcs_notification_publisher]
}

# BigQuery Dataset for processed text and metadata
resource "google_bigquery_dataset" "rag_metadata" {
  dataset_id    = "rag_metadata"
  friendly_name = "RAG Processing Metadata"
  description   = "Stores cleaned text chunks and processing logs for the RAG pipeline."
  location      = var.region

  # Default CMEK for every table created in this dataset, including tables
  # created on demand by the ingestion pipeline.
  default_encryption_configuration {
    kms_key_name = google_kms_crypto_key.bq_key.id
  }

  depends_on = [google_kms_crypto_key_iam_member.bq_key_user]
}

# BigQuery's service agent must be able to use the key before any table in the
# dataset can be encrypted with it.
data "google_bigquery_default_service_account" "bq_sa" {
}

resource "google_kms_crypto_key_iam_member" "bq_key_user" {
  crypto_key_id = google_kms_crypto_key.bq_key.id
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:${data.google_bigquery_default_service_account.bq_sa.email}"
}

# Vertex AI Vector Search index holding one vector per document chunk.
# The embedding job adds vectors with the streaming UpsertDatapoints API,
# which is only available for indexes created with STREAM_UPDATE.
# NOTE(review): changing index_update_method replaces an existing index.
# Deploying the index to an endpoint is not defined here.
resource "google_vertex_ai_index" "policy_embeddings_index" {
  region              = var.region
  display_name        = "policy-documents-index"
  description         = "Index for semantic search over policy document chunks"
  index_update_method = "STREAM_UPDATE"

  metadata {
    contents_delta_uri = "gs://${google_storage_bucket.documents_bucket.name}/embeddings/"
    config {
      # Must equal the embedding model's output size (textembedding-gecko@003 -> 768).
      dimensions = 768
      # Candidate count before exact re-ranking; required with the tree-AH algorithm.
      approximate_neighbors_count = 150
      algorithm_config {
        tree_ah_config {
          leaf_node_embedding_count = 1000
        }
      }
    }
  }
}

# KMS Keys for Encryption.
# Customer-managed keys used by Cloud Storage and BigQuery must be in the same
# location as the data they protect. A "global" key ring cannot back a
# regional bucket or dataset, so the ring lives in var.region.
# NOTE(review): changing a key ring's location creates a new ring (Cloud KMS
# key rings and keys cannot be deleted); plan accordingly for existing stacks.
resource "google_kms_key_ring" "key_ring" {
  name     = "rag-pipeline-keyring"
  location = var.region
}

# CMEK for the raw-documents bucket, rotated every 90 days.
resource "google_kms_crypto_key" "documents_key" {
  name            = "documents-bucket-key"
  key_ring        = google_kms_key_ring.key_ring.id
  rotation_period = "7776000s" # 90 days

  version_template {
    algorithm = "GOOGLE_SYMMETRIC_ENCRYPTION"
  }
}

# CMEK for the rag_metadata BigQuery dataset, rotated every 90 days.
resource "google_kms_crypto_key" "bq_key" {
  name            = "bigquery-key"
  key_ring        = google_kms_key_ring.key_ring.id
  rotation_period = "7776000s" # 90 days

  version_template {
    algorithm = "GOOGLE_SYMMETRIC_ENCRYPTION"
  }
}

# ... (Additional IAM bindings, VPC-SC perimeters, etc. would be defined here)

2. Ingestion & Processing Pipeline (Apache Beam - ingestion/beam_pipeline.py)
This streaming pipeline consumes the Pub/Sub notifications that the storage bucket publishes whenever a new object is uploaded.

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.api_core.exceptions import GoogleAPIError
from google.cloud import documentai, storage, bigquery
import json
import re

# BigQuery schema for the rows emitted by the ingestion DoFn: one row per text
# chunk, written to the document_chunks table.
BQ_SCHEMA = {
    "fields": [
        {"name": column_name, "type": column_type, "mode": column_mode}
        for column_name, column_type, column_mode in (
            ("doc_id", "STRING", "REQUIRED"),
            ("chunk_id", "STRING", "REQUIRED"),
            ("content", "STRING", "NULLABLE"),
            ("page_number", "INTEGER", "NULLABLE"),
            ("section_type", "STRING", "NULLABLE"),
            ("policy_type", "STRING", "NULLABLE"),
            ("jurisdiction", "STRING", "NULLABLE"),
            ("effective_date", "DATE", "NULLABLE"),
            ("processing_timestamp", "TIMESTAMP", "REQUIRED"),
        )
    ]
}

class ProcessDocument(beam.DoFn):
    """Turn one GCS upload notification into BigQuery chunk rows via Document AI.

    Each input element is a Pub/Sub message body (bytes or str) holding the
    ``JSON_API_V1`` object-notification payload (``bucket``, ``name`` and,
    usually, ``contentType``). For every message the object is downloaded,
    parsed by the configured Document AI processor, split into overlapping word
    chunks and emitted as one dict per chunk on the main output, with keys that
    match ``BQ_SCHEMA``.

    Any failure while handling a message is emitted on the ``errors`` tagged
    output as a single JSON string instead of failing the bundle. In a
    streaming job a raised exception would otherwise be retried indefinitely.
    """

    def __init__(self, processor_name):
        """Store the processor name; clients are created per worker in ``setup``.

        Args:
            processor_name: Document AI processor resource name,
                ``projects/<project>/locations/<location>/processors/<id>``.
        """
        super().__init__()
        self.processor_name = processor_name

    def setup(self):
        """Create the Document AI and Cloud Storage clients once per DoFn instance."""
        self.docai_client = documentai.DocumentProcessorServiceClient()
        self.storage_client = storage.Client()

    def process(self, message):
        """Download, parse and chunk the object referenced by ``message``.

        Args:
            message: Pub/Sub payload (bytes or str) of a GCS ``JSON_API_V1``
                notification.

        Yields:
            One BigQuery row dict per chunk on the main output, or a single JSON
            error string on the ``errors`` tagged output if anything fails.
        """
        import datetime  # local import keeps this block self-contained

        file_path = None
        try:
            data = json.loads(message)
            bucket_name = data['bucket']
            file_path = data['name']

            # 1. Read file from GCS
            blob = self.storage_client.bucket(bucket_name).blob(file_path)
            file_content = blob.download_as_bytes()
            # bucket.blob() only builds a local handle, so content_type may be
            # unset; the notification payload carries the object's contentType.
            mime_type = blob.content_type or data.get('contentType')

            # 2. Process with Document AI
            request = documentai.ProcessRequest(
                name=self.processor_name,
                raw_document=documentai.RawDocument(
                    content=file_content, mime_type=mime_type
                ),
            )
            document = self.docai_client.process_document(request=request).document

            # 3. Custom preprocessing (simplified example)
            metadata = self._extract_metadata(document, file_path)
            chunks = self._chunk_text(document.text)
        except Exception as e:  # every per-message failure goes to the DLQ
            yield beam.pvalue.TaggedOutput(
                'errors', self._error_record(e, file_path, message)
            )
            return

        # One processing time per document, as an ISO-8601 UTC string that
        # BigQuery accepts for a TIMESTAMP column.
        processed_at = datetime.datetime.now(datetime.timezone.utc).isoformat()

        # 4. Yield chunks for BigQuery and embedding generation
        for i, chunk_text in enumerate(chunks):
            yield {
                "doc_id": file_path,
                "chunk_id": f"{file_path}#chunk{i}",
                "content": chunk_text,
                "page_number": 1,  # Simplified, would be parsed from page anchors
                "section_type": metadata.get('section', 'body'),
                "policy_type": metadata.get('policy_type', 'unknown'),
                "jurisdiction": metadata.get('jurisdiction', 'global'),
                "effective_date": metadata.get('effective_date'),
                "processing_timestamp": processed_at,
            }

    @staticmethod
    def _error_record(error, file_path, message):
        """Serialize a processing failure as one JSON line for the dead-letter sink."""
        if isinstance(message, bytes):
            message = message.decode('utf-8', errors='replace')
        return json.dumps(
            {"error": str(error), "file_path": file_path, "message": message},
            default=str,
        )

    def _extract_metadata(self, document, file_path):
        """Extract metadata from Document AI entities.

        Args:
            document: Document AI ``Document`` returned by the processor.
            file_path: Object name of the source file (currently unused).

        Returns:
            Dict with optional ``policy_type`` (entity text) and
            ``effective_date`` (``YYYY-MM-DD`` string, omitted when the entity
            cannot be interpreted as a calendar date).
        """
        metadata = {}
        for entity in document.entities:
            if entity.type_ == 'policy_type':
                metadata['policy_type'] = entity.mention_text
            elif entity.type_ == 'effective_date':
                effective_date = self._normalize_date(entity)
                if effective_date is not None:
                    metadata['effective_date'] = effective_date
            # ... extract other entities
        return metadata

    @staticmethod
    def _normalize_date(entity):
        """Return ``entity``'s date as ``YYYY-MM-DD``, or None if it is not a full date.

        Prefers Document AI's ``normalized_value.date_value`` and falls back to
        parsing ``mention_text`` as an ISO date. The BigQuery column is DATE,
        so free-form text like "January 1, 2024" must not be emitted raw.
        """
        import datetime  # local import keeps this block self-contained

        date_value = entity.normalized_value.date_value
        if date_value.year and date_value.month and date_value.day:
            try:
                return datetime.date(
                    date_value.year, date_value.month, date_value.day
                ).isoformat()
            except ValueError:
                pass
        try:
            return datetime.date.fromisoformat(entity.mention_text.strip()).isoformat()
        except (ValueError, AttributeError):
            return None

    def _chunk_text(self, text, chunk_size=512, overlap=50):
        """Split ``text`` into windows of ``chunk_size`` words overlapping by ``overlap`` words.

        The last window ends at the final word. No trailing window that is
        fully contained in the previous one is produced.

        Returns:
            List of chunk strings; empty for empty or whitespace-only text.

        Raises:
            ValueError: if ``overlap`` is not smaller than ``chunk_size``.
        """
        # TODO: use a more sophisticated chunking strategy (e.g., semantic, markdown-aware)
        step = chunk_size - overlap
        if step <= 0:
            raise ValueError(
                f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
            )
        words = text.split()
        chunks = []
        for start in range(0, len(words), step):
            chunks.append(' '.join(words[start:start + chunk_size]))
            if start + chunk_size >= len(words):
                break
        return chunks

def run(argv=None):
    """Build and launch the streaming document-ingestion pipeline.

    Custom flags (all optional):
        --processor_name      Document AI processor resource name
                              (default: env DOCAI_PROCESSOR_NAME, else a placeholder).
        --input_topic         Pub/Sub topic carrying GCS notifications
                              (default: projects/<project>/topics/document-ingestion-topic).
        --output_table        BigQuery table for chunk rows
                              (default: <project>:rag_metadata.document_chunks).
        --dlq_path            Directory for dead-letter files (default: gs://<bucket>/dlq/).
        --dlq_window_seconds  Window after which DLQ files are flushed (default: 60).

    Every other flag (``--runner``, ``--project``, ``--region``,
    ``--temp_location``, ``--setup_file``, ...) is forwarded to Beam. The
    project falls back to env PROJECT_ID, the region to ``us-central1``, and the
    bucket to env BUCKET_NAME or ``<project>-documents``. The last is the
    bucket name used by infra/main.tf.

    On Dataflow this returns once the streaming job is submitted. On other
    runners (e.g. local DirectRunner) it waits for the pipeline to finish.

    Args:
        argv: Command-line arguments; ``None`` reads ``sys.argv``.

    Returns:
        The Beam ``PipelineResult``.

    Raises:
        ValueError: if no GCP project can be determined.
    """
    import argparse
    import os

    from apache_beam.io import fileio
    from apache_beam.options.pipeline_options import GoogleCloudOptions, StandardOptions
    from apache_beam.transforms import window

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--processor_name',
        # NOTE(review): Document AI processors usually live in the "us"/"eu"
        # multi-regions; confirm the location segment of the real name.
        default=os.environ.get(
            'DOCAI_PROCESSOR_NAME',
            "projects/my-project/locations/us-central1/processors/abc123",
        ),
    )
    parser.add_argument('--input_topic', default=None)
    parser.add_argument('--output_table', default=None)
    parser.add_argument('--dlq_path', default=None)
    parser.add_argument('--dlq_window_seconds', type=int, default=60)
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    gcp_options = options.view_as(GoogleCloudOptions)
    project_id = gcp_options.project or os.environ.get('PROJECT_ID')
    if not project_id:
        raise ValueError("No GCP project configured: pass --project or set PROJECT_ID.")
    bucket_name = os.environ.get('BUCKET_NAME') or f'{project_id}-documents'
    gcp_options.project = project_id
    gcp_options.region = gcp_options.region or 'us-central1'
    gcp_options.temp_location = gcp_options.temp_location or f'gs://{bucket_name}/temp'

    input_topic = known_args.input_topic or f'projects/{project_id}/topics/document-ingestion-topic'
    output_table = known_args.output_table or f'{project_id}:rag_metadata.document_chunks'
    dlq_path = known_args.dlq_path or f'gs://{bucket_name}/dlq/'

    pipeline = beam.Pipeline(options=options)

    # Read from Pub/Sub
    messages = pipeline | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic)

    processed_data = (messages
                      | "Process Document" >> beam.ParDo(
                            ProcessDocument(processor_name=known_args.processor_name)
                        ).with_outputs('errors', main='main')
                      )

    # Write clean chunks to BigQuery
    _ = (processed_data.main
         | "Write to BigQuery" >> beam.io.WriteToBigQuery(
             table=output_table,
             schema=BQ_SCHEMA,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
         )

    # Write errors to a GCS dead-letter directory. The error stream is
    # unbounded, so it is windowed and written with fileio.WriteToFiles, which
    # finalizes files per window. Records are normalized to JSON text lines
    # first (dict records are serialized; strings are passed through).
    _ = (processed_data.errors
         | "Serialize Errors" >> beam.Map(
             lambda err: err if isinstance(err, str) else json.dumps(err, default=str))
         | "Window Errors" >> beam.WindowInto(
             window.FixedWindows(known_args.dlq_window_seconds))
         | "Write Errors to DLQ" >> fileio.WriteToFiles(
             path=dlq_path,
             file_naming=fileio.default_file_naming(prefix='errors', suffix='.json'))
         )

    result = pipeline.run()
    runner = str(options.view_as(StandardOptions).runner or '')
    # A streaming Dataflow job never finishes; waiting here would block the
    # deploy script forever, so only local runners are awaited.
    if 'dataflow' not in runner.lower():
        result.wait_until_finish()
    return result

if __name__ == '__main__':
    import logging
    logging.getLogger().setLevel(logging.INFO)
    run()

Embedding Generation (Vertex AI - embedding/embedding.py)
This script is responsible for:

Reading the cleaned text chunks from BigQuery.

Generating embeddings for each chunk using the Vertex AI Text Embedding API.

Upserting the embeddings and their metadata into the Vertex AI Vector Search index.

In [None]:
#!/usr/bin/env python3
"""
embedding.py

A production-level script to generate embeddings for cleaned document chunks
stored in BigQuery and upsert them into a Vertex AI Vector Search index.

This script is designed to be run as a batch job (e.g., via Cloud Scheduler,
triggered by Pub/Sub after a Dataflow job completes, or as a Cloud Function).
"""

import logging
import time
from typing import List, Dict, Any
from google.cloud import bigquery, aiplatform, storage
from google.cloud.aiplatform import vector_search
from vertexai.language_models import TextEmbeddingModel
import backoff
import pandas as pd
import numpy as np

import os

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration. Each value can be overridden by the environment variable of
# the same name (e.g. set on the Cloud Run Job or Cloud Function). The literals
# are placeholders used only when the variable is unset.
PROJECT_ID = os.environ.get("PROJECT_ID", "your-project-id")
REGION = os.environ.get("REGION", "us-central1")
BQ_DATASET = os.environ.get("BQ_DATASET", "rag_metadata")
BQ_TABLE = os.environ.get("BQ_TABLE", "document_chunks")
# Max texts per embedding request; adjust based on API quotas and performance.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
INDEX_ENDPOINT_ID = os.environ.get("INDEX_ENDPOINT_ID", "your-index-endpoint-id")  # e.g., "123456789"
# The ID you gave your index when deploying it to the endpoint.
DEPLOYED_INDEX_ID = os.environ.get("DEPLOYED_INDEX_ID", "policy_documents_index")

class EmbeddingGenerator:
    """Generate embeddings for BigQuery chunks and stream them into Vector Search.

    Rows are read from ``{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}``. Their
    ``content`` is embedded with ``textembedding-gecko@003`` in requests of at
    most ``BATCH_SIZE`` texts. Each vector is upserted, keyed by ``chunk_id``
    and carrying policy_type / jurisdiction / section_type restricts, into the
    index that is deployed on ``INDEX_ENDPOINT_ID`` as ``DEPLOYED_INDEX_ID``.

    NOTE(review): streaming upserts only work for an index created with
    ``index_update_method = "STREAM_UPDATE"``; confirm the index definition.
    NOTE(review): this class does not use the module-level
    ``from google.cloud.aiplatform import vector_search`` import. I could not
    find such a module in google-cloud-aiplatform; verify it and drop the
    import if it fails.
    """

    # Metadata columns exported as Vector Search restricts (filter namespaces).
    _RESTRICT_NAMESPACES = ("policy_type", "jurisdiction", "section_type")

    def __init__(self):
        """Initialize BigQuery, the Vertex AI SDK, the embedding model and the target index."""
        self.bq_client = bigquery.Client(project=PROJECT_ID)
        # Initialize the Vertex AI SDK before loading the model and resolving
        # index resources, so they use PROJECT_ID / REGION and not ambient defaults.
        aiplatform.init(project=PROJECT_ID, location=REGION)
        self.embedding_model = TextEmbeddingModel.from_pretrained("textembedding-gecko@003")
        self.index_endpoint = self._get_index_endpoint(INDEX_ENDPOINT_ID)
        # Upserts target the Index resource itself, not the endpoint it is deployed on.
        self.index = self._get_deployed_index(DEPLOYED_INDEX_ID)

    def _get_index_endpoint(self, index_endpoint_id: str):
        """Return the MatchingEngineIndexEndpoint for an endpoint ID or full resource name."""
        return aiplatform.MatchingEngineIndexEndpoint(index_endpoint_name=index_endpoint_id)

    def _get_deployed_index(self, deployed_index_id: str):
        """Return the MatchingEngineIndex deployed on the endpoint under ``deployed_index_id``.

        Raises:
            ValueError: if the endpoint has no deployed index with that ID.
        """
        for deployed in self.index_endpoint.deployed_indexes:
            if deployed.id == deployed_index_id:
                return aiplatform.MatchingEngineIndex(index_name=deployed.index)
        raise ValueError(
            f"Deployed index {deployed_index_id!r} not found on index endpoint "
            f"{self.index_endpoint.resource_name}"
        )

    @backoff.on_exception(backoff.expo, Exception, max_tries=5)
    def _generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Generate one embedding per text using Vertex AI.

        Retried with exponential backoff (up to 5 tries) on any exception.

        Returns:
            A list of float vectors in the same order as ``texts``.

        Raises:
            ValueError: if the service returns a different number of vectors.
        """
        try:
            embeddings = self.embedding_model.get_embeddings(texts)
        except Exception as e:
            logger.error(f"Failed to generate embeddings for batch: {e}")
            raise
        vectors = [e.values for e in embeddings]
        if len(vectors) != len(texts):
            raise ValueError(f"Expected {len(texts)} embeddings, got {len(vectors)}")
        return vectors

    def _prepare_upsert_data(
        self,
        df_batch: pd.DataFrame,
        embeddings: List[List[float]]
    ) -> List[Any]:
        """
        Pair each chunk row with its embedding as an ``aiplatform_v1.IndexDatapoint``.

        ``chunk_id`` is the datapoint ID, so re-running the job overwrites
        rather than duplicates. Non-null policy_type / jurisdiction /
        section_type values become allow-list restricts for query-time
        filtering; null values produce no restrict. ``embeddings`` is not
        modified.

        Raises:
            ValueError: if ``embeddings`` and ``df_batch`` differ in length.
        """
        from google.cloud import aiplatform_v1

        if len(embeddings) != len(df_batch):
            raise ValueError(
                f"{len(df_batch)} rows but {len(embeddings)} embeddings in batch"
            )

        datapoints = []
        for record, vector in zip(df_batch.to_dict("records"), embeddings):
            restricts = [
                aiplatform_v1.IndexDatapoint.Restriction(
                    namespace=namespace, allow_list=[str(record[namespace])]
                )
                for namespace in self._RESTRICT_NAMESPACES
                if namespace in record and pd.notna(record[namespace])
            ]
            datapoints.append(
                aiplatform_v1.IndexDatapoint(
                    datapoint_id=str(record["chunk_id"]),
                    feature_vector=list(vector),
                    restricts=restricts,
                    # Optional: add sparse_embedding=... for hybrid search.
                )
            )
        return datapoints

    def _upsert_datapoints_batch(self, datapoints: List[Any]) -> bool:
        """Stream a batch of datapoints into the index.

        Failures are logged, not raised, so one bad batch does not abort the run.

        Returns:
            True if the batch was upserted (or was empty), False on failure.
        """
        if not datapoints:
            return True
        try:
            self.index.upsert_datapoints(datapoints=datapoints)
        except Exception as e:
            logger.error(f"Failed to upsert batch to index: {e}")
            # In production, you might push failed batches to a retry queue (e.g., Pub/Sub)
            return False
        logger.info(f"Successfully upserted batch of {len(datapoints)} datapoints.")
        return True

    def process_new_chunks(self, last_processed_timestamp: str = None):
        """
        Embed and upsert chunks from BigQuery, optionally only those newer than a timestamp.

        Args:
            last_processed_timestamp: If given, only rows whose
                ``processing_timestamp`` is strictly greater are processed. The
                value is bound as a BigQuery TIMESTAMP query parameter (e.g.
                ``"2024-01-31 12:00:00+00"``). If ``None``, every chunk is
                processed (use with caution!).
        """
        logger.info("Starting embedding generation process.")

        query = f"""
            SELECT
                doc_id,
                chunk_id,
                content,
                policy_type,
                jurisdiction,
                section_type
            FROM `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}`
            WHERE content IS NOT NULL
            AND LENGTH(content) > 10  -- Filter out very short/empty chunks
        """
        query_parameters = []
        if last_processed_timestamp:
            # Bound as a parameter rather than interpolated into the SQL text.
            query += " AND processing_timestamp > @last_processed_timestamp"
            query_parameters.append(
                bigquery.ScalarQueryParameter(
                    "last_processed_timestamp", "TIMESTAMP", last_processed_timestamp
                )
            )
        query += " ORDER BY processing_timestamp ASC"  # Process in order

        job_config = bigquery.QueryJobConfig(query_parameters=query_parameters)
        rows = self.bq_client.query(query, job_config=job_config).result(page_size=BATCH_SIZE)
        total_rows = rows.total_rows
        logger.info(f"Found {total_rows} chunks to process.")

        processed_count = 0
        failed_count = 0
        for page in rows.to_dataframe_iterable():
            # Pages are requested with page_size=BATCH_SIZE. Slicing again keeps
            # every embedding request within BATCH_SIZE even if a page is larger.
            for start in range(0, len(page), BATCH_SIZE):
                df_batch = page.iloc[start:start + BATCH_SIZE]
                texts = df_batch["content"].tolist()

                # 1. Generate Embeddings
                logger.info(f"Generating embeddings for batch of {len(texts)} chunks...")
                embeddings = self._generate_embeddings_batch(texts)

                # 2. Prepare Data for Upsert
                logger.info("Preparing datapoints for upsert...")
                datapoints = self._prepare_upsert_data(df_batch, embeddings)

                # 3. Upsert to Vector Index
                logger.info("Upserting batch to Vector Search index...")
                if self._upsert_datapoints_batch(datapoints):
                    processed_count += len(texts)
                else:
                    failed_count += len(texts)
                logger.info(f"Processed {processed_count}/{total_rows} chunks.")

                # Be kind to the API quotas
                time.sleep(1)

        logger.info(f"Embedding generation complete. Total chunks processed: {processed_count}")
        if failed_count:
            logger.warning(
                f"{failed_count} chunks failed to upsert; do not advance the "
                f"last-processed timestamp past them."
            )

        # In production, you would now update a state tracker (e.g., a BigQuery table or Firestore document)
        # with the latest `processing_timestamp` to use as `last_processed_timestamp` in the next run.

def main(event, context):
    """Run one embedding pass; entry point for a Cloud Function or Cloud Run Job.

    Args:
        event: Trigger payload (e.g. a Pub/Sub message from the Dataflow job
            completion topic, or a Cloud Scheduler trigger). Currently unused.
        context: Trigger metadata. Currently unused.
    """
    # Incremental runs would pass the timestamp of the last successful run
    # (read from a config/state store). None requests a full refresh.
    EmbeddingGenerator().process_new_chunks(last_processed_timestamp=None)

if __name__ == "__main__":
    # Local testing or containerized execution: there is no trigger payload.
    main(event=None, context=None)

3. RAG Query API (FastAPI on Cloud Run - api/main.py & api/query_service.py)
api/main.py

In [None]:
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from google.cloud import bigquery
from google.auth import jwt
import vertexai
from vertexai.language_models import TextGenerationModel

from . import models
from .query_service import QueryService

app = FastAPI(title="Policy RAG API", version="1.0.0")
security = HTTPBearer()

# Initialize clients. PROJECT_ID is injected by the Cloud Run deployment
# (--set-env-vars); REGION is optional. The literals are local-development
# fallbacks only.
import os  # noqa: E402 - kept with its only use so this block stays self-contained

from vertexai.generative_models import GenerativeModel  # noqa: E402

vertexai.init(
    project=os.environ.get("PROJECT_ID", "your-project-id"),
    location=os.environ.get("REGION", "us-central1"),
)
# Gemini models are served through GenerativeModel. TextGenerationModel targets
# the legacy PaLM text models and is not the API for "gemini-1.5-pro".
llm_model = GenerativeModel("gemini-1.5-pro")
query_service = QueryService()

def validate_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the bearer Google ID token and return the caller's email.

    The token's signature, expiry and issuer are checked against Google's
    public certificates via ``google.oauth2.id_token.verify_oauth2_token``.
    When the ``API_AUDIENCE`` environment variable is set, the token's ``aud``
    claim must equal it. Set it in production, otherwise any Google-issued ID
    token is accepted.

    Returns:
        The ``email`` claim of the verified token.

    Raises:
        HTTPException: 401 if the token cannot be verified, has no email, or
            carries ``email_verified: false``.
    """
    import os

    from google.auth.transport import requests as google_requests
    from google.oauth2 import id_token

    unauthorized = HTTPException(status_code=401, detail="Invalid authentication credentials")
    try:
        claims = id_token.verify_oauth2_token(
            credentials.credentials,
            google_requests.Request(),
            audience=os.environ.get("API_AUDIENCE") or None,
        )
    except Exception as exc:  # fail closed on any verification/transport error
        raise unauthorized from exc

    email = claims.get("email")
    if not email or claims.get("email_verified") is False:
        raise unauthorized
    return email

@app.post("/query", response_model=models.QueryResponse)
async def query_policies(
    request: models.QueryRequest,
    user_email: str = Depends(validate_token)
):
    """Answer a natural-language policy question for the authenticated user.

    Unexpected errors are logged server-side with their traceback and returned
    to the client as a generic 500. Internal exception text (resource names,
    SQL, stack details) is not exposed to the client.
    """
    try:
        return await query_service.execute_rag_query(
            query=request.query,
            user_id=user_email,
            filter_dict=request.filters
        )
    except HTTPException:
        raise
    except Exception:
        import logging
        logging.getLogger(__name__).exception("RAG query failed")
        raise HTTPException(status_code=500, detail="Internal error while answering the query.")

@app.get("/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    status_payload = {"status": "healthy"}
    return status_payload

# ... Additional endpoints for feedback, analytics, etc.

api/query_service.py

In [None]:
from google.cloud import aiplatform, bigquery
from vertexai.language_models import TextGenerationModel
import logging
from .models import QueryResponse, RetrievedChunk

logger = logging.getLogger(name=__name__)  # module-scoped logger for the query service

class QueryService:
    """Answer policy questions with retrieval-augmented generation.

    For each query the service:
    1. embeds the question with the same model used at indexing time;
    2. finds the nearest chunk IDs in the Vertex AI Vector Search deployed
       index, optionally restricted by metadata namespaces;
    3. loads those chunks' text from the BigQuery chunks table;
    4. asks Gemini to answer using only that text.

    Constructor arguments fall back to the environment variables
    INDEX_ENDPOINT_NAME, DEPLOYED_INDEX_ID and CHUNKS_TABLE, and then to the
    original placeholder values.
    """

    # Must be the model the indexing job used; otherwise query and document
    # vectors are not comparable.
    EMBEDDING_MODEL_NAME = "textembedding-gecko@003"

    def __init__(self, index_endpoint_name=None, deployed_index_id=None, chunks_table=None):
        import os

        from vertexai.generative_models import GenerativeModel
        from vertexai.language_models import TextEmbeddingModel

        self.vector_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
            index_endpoint_name=index_endpoint_name or os.environ.get(
                "INDEX_ENDPOINT_NAME",
                "projects/my-project/locations/us-central1/indexEndpoints/123",
            )
        )
        # NOTE(review): embedding.py deploys as "policy_documents_index"; keep these in sync.
        self.deployed_index_id = deployed_index_id or os.environ.get("DEPLOYED_INDEX_ID", "policy_index")
        # Gemini models are served through GenerativeModel, not the PaLM-era TextGenerationModel.
        self.llm_model = GenerativeModel("gemini-1.5-pro")
        self.embedding_model = TextEmbeddingModel.from_pretrained(self.EMBEDDING_MODEL_NAME)
        self.bq_client = bigquery.Client()
        # Table written by the ingestion pipeline; chunk text is looked up by chunk_id.
        self.chunks_table = chunks_table or os.environ.get(
            "CHUNKS_TABLE", f"{self.bq_client.project}.rag_metadata.document_chunks"
        )

    async def execute_rag_query(self, query: str, user_id: str, filter_dict: dict = None):
        """Run the full RAG flow for one question and return a QueryResponse.

        Blocking SDK calls (vector search, BigQuery, Gemini) run in the default
        executor so they do not stall the event loop.
        """
        import asyncio
        import functools

        loop = asyncio.get_running_loop()

        # 1. Generate Embedding for the Query
        query_embedding = await self._generate_embedding(query)

        # 2. Retrieve Relevant Chunks from Vector Store
        retrieved_chunks = await loop.run_in_executor(
            None, functools.partial(self._retrieve_chunks, query_embedding, filter_dict, top_k=5)
        )
        if not retrieved_chunks:
            return QueryResponse(answer="No relevant policy documents found.", chunks=[])

        # 3. Construct Context for LLM
        context = "\n\n---\n\n".join(chunk.content for chunk in retrieved_chunks)

        # 4. Generate Answer with Gemini, using a carefully engineered prompt
        prompt = self._build_prompt(context, query)
        answer = await loop.run_in_executor(None, self._generate_answer, prompt)

        # 5. Log the query for analytics and improvement
        self._log_query(user_id, query, retrieved_chunks, answer)

        return QueryResponse(answer=answer, chunks=retrieved_chunks)

    @staticmethod
    def _normalize_filters(filters):
        """Normalize ``{namespace: value | [values]}`` to ``{namespace: [str, ...]}``.

        None values and empty lists are dropped, so they impose no restriction.
        """
        normalized = {}
        for name, value in (filters or {}).items():
            if value is None:
                continue
            values = value if isinstance(value, (list, tuple)) else [value]
            tokens = [str(v) for v in values if v is not None]
            if tokens:
                normalized[str(name)] = tokens
        return normalized

    def _retrieve_chunks(self, query_embedding, filters, top_k=5):
        """Query Vertex AI Vector Search and hydrate the hits from BigQuery.

        ``filters`` map restrict namespaces (e.g. policy_type, jurisdiction) to
        allowed values and are passed as Vector Search ``Namespace`` filters.
        Values are never spliced into a query string.

        Returns:
            RetrievedChunk objects in neighbor order; hits whose chunk_id is
            missing from BigQuery are skipped.

        Raises:
            Whatever the Vector Search or BigQuery clients raise. Errors are
            logged and propagated, so an outage is not reported as "no
            relevant documents".
        """
        from google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint import Namespace

        namespaces = [
            Namespace(name=name, allow_tokens=tokens)
            for name, tokens in self._normalize_filters(filters).items()
        ]
        try:
            response = self.vector_index_endpoint.find_neighbors(
                deployed_index_id=self.deployed_index_id,
                queries=[query_embedding],
                num_neighbors=top_k,
                filter=namespaces or None,
            )
            neighbors = response[0] if response else []
            if not neighbors:
                return []
            # The index stores only vectors and restricts; chunk text lives in BigQuery.
            rows_by_id = self._fetch_chunk_rows([neighbor.id for neighbor in neighbors])
        except Exception:
            logger.exception("Error retrieving from vector index")
            raise

        retrieved_chunks = []
        for neighbor in neighbors:
            row = rows_by_id.get(neighbor.id)
            if row is None:
                logger.warning(f"Chunk {neighbor.id} found in index but not in {self.chunks_table}")
                continue
            retrieved_chunks.append(
                RetrievedChunk(
                    content=row["content"],
                    document_id=row["doc_id"],
                    confidence_score=neighbor.distance,
                )
            )
        return retrieved_chunks

    def _fetch_chunk_rows(self, chunk_ids):
        """Return ``{chunk_id: {"doc_id": ..., "content": ...}}`` for the given IDs.

        Uses a parameterized array query.
        """
        query = f"""
            SELECT chunk_id, doc_id, content
            FROM `{self.chunks_table}`
            WHERE chunk_id IN UNNEST(@chunk_ids)
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("chunk_ids", "STRING", list(chunk_ids))]
        )
        rows = self.bq_client.query(query, job_config=job_config).result()
        return {row["chunk_id"]: {"doc_id": row["doc_id"], "content": row["content"]} for row in rows}

    def _build_prompt(self, context, query):
        """Build a structured prompt that restricts the LLM to the retrieved context.

        The template is dedented before substitution, so the prompt has no
        leading indentation. Braces in ``context`` or ``query`` are inserted
        literally.
        """
        import textwrap

        template = textwrap.dedent(
            """\
            You are a helpful and precise assistant for a large insurance company.
            Always answer based ONLY on the provided context from the company's policy documents.
            If the answer is not contained in the context, say "I cannot find a specific policy regarding this."

            Context from Policy Documents:
            {context}

            User Question: {query}

            Instructions:
            1. Provide a concise and accurate answer.
            2. Be specific. If possible, mention the policy name or number.
            3. Do not make up any information not present in the context.
            4. If the context is about a specific jurisdiction, note it in your answer.
            Answer:
            """
        )
        return template.format(context=context, query=query)

    def _generate_answer(self, prompt):
        """Generate an answer with Gemini; return a fixed error message on failure."""
        from vertexai.generative_models import GenerationConfig

        try:
            response = self.llm_model.generate_content(
                prompt,
                generation_config=GenerationConfig(
                    temperature=0.1,  # Low temperature for factual accuracy
                    max_output_tokens=1024,
                    top_k=40,
                    top_p=0.8,
                ),
            )
            # .text raises if the response was blocked; handled below.
            return response.text
        except Exception as e:
            logger.error(f"Error generating answer: {e}")
            return "An error occurred while generating an answer."

    def _log_query(self, user_id, query, chunks, answer):
        """Log the query and results to BigQuery for analytics.

        TODO: not implemented; currently a no-op.
        """
        pass

    async def _generate_embedding(self, text):
        """Embed ``text`` with EMBEDDING_MODEL_NAME and return the vector as a list of floats.

        The blocking SDK call runs in the default executor.
        """
        import asyncio

        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(None, self.embedding_model.get_embeddings, [text])
        return embeddings[0].values

4. Deployment & Orchestration
A simple deployment script (scripts/deploy.sh) would look like this:

In [None]:
#!/bin/bash
# Deploy the RAG stack: infrastructure, streaming ingestion pipeline, query API.
#
# Required environment:
#   PROJECT_ID   GCP project to deploy into.
# Optional environment:
#   REGION       Deployment region (default: us-central1).
#   BUCKET_NAME  Bucket for Dataflow temp files (default: ${PROJECT_ID}-documents,
#                the bucket name used by infra/main.tf).
set -euo pipefail

: "${PROJECT_ID:?PROJECT_ID must be set}"
REGION="${REGION:-us-central1}"
BUCKET_NAME="${BUCKET_NAME:-${PROJECT_ID}-documents}"

# Resolve paths from the repository root (this script lives in scripts/), so
# it works regardless of the caller's current directory.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"

# Deploy Infrastructure
cd "${ROOT_DIR}/infra"
terraform init
terraform apply -auto-approve -var="project_id=${PROJECT_ID}" -var="region=${REGION}"

# Deploy Dataflow Pipeline
# NOTE(review): if beam_pipeline.run() waits for the pipeline to finish, this
# step never returns for a streaming job.
cd "${ROOT_DIR}/ingestion"
python -m beam_pipeline --runner DataflowRunner \
    --project "${PROJECT_ID}" \
    --region "${REGION}" \
    --temp_location "gs://${BUCKET_NAME}/temp" \
    --setup_file ./setup.py

# Build and Deploy Cloud Run Service
cd "${ROOT_DIR}/api"
gcloud builds submit --project "${PROJECT_ID}" --tag "gcr.io/${PROJECT_ID}/rag-api"
gcloud run deploy rag-api \
    --project "${PROJECT_ID}" \
    --image "gcr.io/${PROJECT_ID}/rag-api" \
    --platform managed \
    --region "${REGION}" \
    --service-account "rag-service-account@${PROJECT_ID}.iam.gserviceaccount.com" \
    --set-env-vars "PROJECT_ID=${PROJECT_ID},REGION=${REGION}" \
    --vpc-connector my-vpc-connector \
    --ingress internal-and-cloud-load-balancing