# Intermediate - RAG with Vector Search and BigQuery

### References

https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/Document_QnA_using_gemini_and_vector_search.ipynb

### Install libraries

In [1]:
! pip install --upgrade pymupdf \
    google-cloud-aiplatform \
    google-cloud-bigquery \
    db-dtypes \
    langchain \
    langchain-community

Collecting pymupdf
  Downloading pymupdf-1.25.3-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (3.4 kB)
Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.83.0-py2.py3-none-any.whl.metadata (33 kB)
Collecting google-cloud-bigquery
  Downloading google_cloud_bigquery-3.30.0-py2.py3-none-any.whl.metadata (7.9 kB)
Collecting db-dtypes
  Downloading db_dtypes-1.4.2-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting langchain
  Downloading langchain-0.3.20-py3-none-any.whl.metadata (7.7 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.19-py3-none-any.whl.metadata (2.4 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1 (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1->google-cloud-aiplatform)
  Using cached google_api_core-2.24.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-auth<3.0.0dev,>=2.14.1 (

### Set variables and initialize Vertex AI

In [4]:
# Define project information
PROJECT_ID = "jo-cn-hackathon-qqal"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
VECTOR_SEARCH_REGION = "us-central1"
VECTOR_SEARCH_INDEX_NAME = f"{PROJECT_ID}-rregop-index"
VECTOR_SEARCH_INDEX_ENDPOINT_NAME = f"{PROJECT_ID}-rregop-index-endpoint"
VECTOR_SEARCH_EMBEDDING_BUCKET = f"{PROJECT_ID}-rregop-vector-search-bucket"
VECTOR_SEARCH_DIMENSIONS = 768
BQ_DATASET = "rregop_dataset"

### Set gcloud configs

In [None]:
! gcloud config set project {PROJECT_ID} --quiet
! gcloud config set run/region {LOCATION} --quiet

### Import libraries


In [None]:
from datetime import datetime
import json
# File system operations and displaying images
import os
import random

# Import utility functions for timing and file handling
import time

# Libraries for downloading files, data manipulation, and creating a user interface
import uuid

import fitz

from langchain.document_loaders import DataFrameLoader
from langchain.text_splitter import CharacterTextSplitter

# Initialize Vertex AI libraries for working with generative models
from google.cloud import aiplatform
import pandas as pd
from vertexai.generative_models import GenerativeModel, Image
from vertexai.language_models import TextEmbeddingModel
from google.cloud import bigquery
from google.cloud import storage

# Print Vertex AI SDK version
print(f"Vertex AI SDK version: {aiplatform.__version__}")

### Initializing Gemini 1.5 Pro and Text Embedding models

In [7]:
# Loading Gemini 1.5 Pro Model
multimodal_model = GenerativeModel("gemini-1.5-flash-002")

# Initializing embedding model
text_embedding_model = TextEmbeddingModel.from_pretrained("text-multilingual-embedding-002")

### Split PDF files to images

In [None]:
# Create an "Images" directory if it doesn't exist
data_path = "../data"

# Create an "Images" directory if it doesn't exist
images_path = f"{data_path}/images/"
if not os.path.exists(images_path):
    os.makedirs(images_path)
else:
    print(f"Images directory already exists: {images_path}")


In [6]:
# To get better resolution
zoom_x = 2.0  # horizontal zoom
zoom_y = 2.0  # vertical zoom
mat = fitz.Matrix(zoom_x, zoom_y)  # zoom factor 2 in each dimension

# Get list of PDF files in data folder
pdf_files = [f for f in os.listdir(data_path) if f.endswith('.pdf')]

# Process each PDF file
for pdf_file in pdf_files:
    doc = fitz.open(f"{data_path}/{pdf_file}")  # open document
    for page in doc:  # iterate through the pages
        pix = page.get_pixmap(matrix=mat)  # render page to an image
        outpath = f"{images_path}/{pdf_file}_{page.number}.jpg"
        pix.save(outpath)  # store image as a PNG
    doc.close()  # close the document after processing

### Extract data using Gemini Vision Pro

This module processes a set of images, extracting text and tabular data using a multimodal model (Gemini 1.5 Pro). It handles potential errors, stores the extracted information in a DataFrame, and saves the results to a CSV file.

In [None]:
# Define the path where images are located
image_names = os.listdir(images_path)
max_images = len(image_names)
print(f"Processing {max_images} images")

In [None]:
# Create empty lists to store image information
page_source = []
page_content = []
page_id = []

p_id = 0  # Initialize image ID counter
rest_count = 0  # Initialize counter for error handling

while p_id < max_images:
    try:
        # Construct the full path to the current image
        image_path = images_path + image_names[p_id]
        print(f"Processing image: {image_path}")

        # Load the image
        image = Image.load_from_file(image_path)

        # Generate prompts for text and table extraction
        prompt_text = "Extract all text content in the image"
        prompt_table = (
            "Detect table in this image. Extract content maintaining the structure"
        )

        # Extract text using your multimodal model
        contents = [image, prompt_text]
        response = multimodal_model.generate_content(contents)
        text_content = response.text

        # Extract table using your multimodal model
        contents = [image, prompt_table]
        response = multimodal_model.generate_content(contents)
        table_content = response.text

        # Log progress and store results
        print(f"processed image no: {p_id}")
        page_source.append(image_path)
        page_content.append(text_content + "\n" + table_content)
        page_id.append(p_id)
        p_id += 1

    except Exception as err:
        # Handle errors during processing
        print(err)
        print("Taking Some Rest")
        time.sleep(1)  # Pause execution for 1 second
        rest_count += 1
        if rest_count == 5:  # Limit consecutive error handling
            rest_count = 0
            print(f"Cannot process image no: {image_path}")
            p_id += 1  # Move to the next image

# Create a DataFrame to store extracted information
df = pd.DataFrame(
    {"page_id": page_id, "page_source": page_source, "page_content": page_content}
)
del page_id, page_source, page_content  # Conserve memory
df.head()  # Preview the DataFrame

### Generate Text Embeddings

Leverage a powerful language model textembedding-gecko to generate rich text embeddings that helps us find relevant information from a dataset.

In [10]:
def generate_text_embedding(text) -> list:
    """Text embedding with a Large Language Model."""
    embeddings = text_embedding_model.get_embeddings([text])
    vector = embeddings[0].values
    return vector

#### Create BigQuery dataset and GCS bucket

In [None]:
# Create BigQuery dataset if it doesn't exist
bq_client = bigquery.Client()
dataset_ref = bq_client.dataset(BQ_DATASET)

# Check if dataset exists by listing all datasets and checking if BQ_DATASET is in the list
datasets = list(bq_client.list_datasets())
dataset_exists = any(dataset.dataset_id == BQ_DATASET for dataset in datasets)

if not dataset_exists:
    # Construct a Dataset object to send to the API
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"  # Specify the location
    
    try:
        # API request to create dataset
        dataset = bq_client.create_dataset(dataset)
        print(f"Dataset {BQ_DATASET} created successfully")
    except Exception as e:
        print(f"Error creating dataset: {str(e)}")
        raise
else:
    print(f"Dataset {BQ_DATASET} already exists")


In [None]:
# Create GCS bucket if it doesn't exist
gcs_client = storage.Client()
bucket = gcs_client.bucket(VECTOR_SEARCH_EMBEDDING_BUCKET)
if not bucket.exists():
    bucket = gcs_client.create_bucket(
        VECTOR_SEARCH_EMBEDDING_BUCKET,
        location=VECTOR_SEARCH_REGION  # Specify us-central1 as the location
    )
    print(f"Bucket {VECTOR_SEARCH_EMBEDDING_BUCKET} created successfully")
else:
    print(f"Bucket {VECTOR_SEARCH_EMBEDDING_BUCKET} already exists")


#### Generate embeddings and store in BigQuery and GCS

In [None]:
execution_id = str(random.randint(100000, 999999))

# Create a DataFrameLoader to prepare data for LangChain
loader = DataFrameLoader(df, page_content_column="page_content")

# Load documents from the 'page_content' column of your DataFrame
documents = loader.load()

# Log the number of documents loaded
print(f"# of documents loaded (pre-chunking) = {len(documents)}")

# Create a text splitter to divide documents into smaller chunks
text_splitter = CharacterTextSplitter(
    chunk_size=10000,  # Target size of approximately 10000 characters per chunk
    chunk_overlap=200,  # overlap between chunks
)

# Split the loaded documents
doc_splits = text_splitter.split_documents(documents)

# Add a 'chunk' ID to each document split's metadata for tracking
for idx, split in enumerate(doc_splits):
    split.metadata["chunk"] = idx

# Log the number of documents after splitting
print(f"# of documents = {len(doc_splits)}")

texts = [doc.page_content for doc in doc_splits]
text_embeddings_list = []
id_list = []
page_source_list = []
for doc in doc_splits:
    id = uuid.uuid4()
    text_embeddings_list.append(generate_text_embedding(doc.page_content))
    id_list.append(str(id))
    page_source_list.append(doc.metadata["page_source"])
    time.sleep(1)  # So that we don't run into Quota Issue

bq_client = bigquery.Client()

# Prepare data for BigQuery and Vector Search
rows_to_insert = []
index_data = []

for doc, embedding in zip(doc_splits, text_embeddings_list):
    doc_id = str(uuid.uuid4())
    
    # Prepare BigQuery row
    rows_to_insert.append({
        "id": doc_id,
        "embedding": embedding,  # Already a list from generate_text_embedding()
        "page_source": doc.metadata["page_source"],
        "text": doc.page_content,
        "execution_id": execution_id
    })

    # Prepare data for Vector Search
    index_data.append({
        "id": doc_id,
        "embedding": embedding
    })

# Upload to BigQuery
table_id = f"{PROJECT_ID}.{BQ_DATASET}.document_embeddings"
table = bigquery.Table(table_id, schema=[
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("embedding", "FLOAT64", mode="REPEATED"),
    bigquery.SchemaField("page_source", "STRING"),
    bigquery.SchemaField("text", "STRING"),
    bigquery.SchemaField("execution_id", "STRING")
])
table = bq_client.create_table(table, exists_ok=True)

errors = bq_client.insert_rows_json(table, rows_to_insert)
if errors:
    print("BigQuery Errors:", errors)

print("Data stored successfully in BigQuery")

# Convert index_data to JSON and store in GCS
embeddings_path = f"{data_path}/embeddings/"
if not os.path.exists(embeddings_path):
    os.makedirs(embeddings_path)

# save id and embedding as a json file
# Convert list of dicts to JSON string directly
jsonl_string = "\n".join(json.dumps(record) for record in index_data)
with open(f"{embeddings_path}/data.json", "w") as f:
    f.write(jsonl_string)

# show the first few lines of the json file
! head -n 3 "{embeddings_path}/data.json"

#send to GCS
! gsutil cp {embeddings_path}/data.json gs://{VECTOR_SEARCH_EMBEDDING_BUCKET}/embeddings-{execution_id}/data.json

print(f"Data stored successfully in GCS: gs://{VECTOR_SEARCH_EMBEDDING_BUCKET}/embeddings-{execution_id}")


### Create and deploy a Vector Search Index

The code configures and deploys a vector search index on Google Cloud, making it ready to store and search through embeddings.

Embedding size : The number of values used to represent a piece of text in vector form. Larger dimensions mean a denser and potentially more expressive representation.

Dimensions vs. Latency

Search: Higher-dimensional embeddings can make vector similarity searches slower, especially in large databases.
Computation: Calculations with larger vectors generally take more time during model training and inference.

#### Create an Index

Now it's ready to load the embeddings to Vector Search. Its APIs are available under the aiplatform package of the SDK.

Create an MatchingEngineIndex with its create_tree_ah_index function (Matching Engine is the previous name of Vector Search).

By calling the create_tree_ah_index function, it starts building an Index. This will take under a few minutes if the dataset is small, otherwise about 50 minutes or more depending on the size of the dataset. You can check status of the index creation on the [Vector Search Console > INDEXES tab](https://console.cloud.google.com/vertex-ai/matching-engine/indexes).

The parameters for creating index:
- `contents_delta_uri`: The URI of Cloud Storage directory where you stored the embedding JSON files  
- `dimensions`: Dimension size of each embedding. In this case, it is 768 as we are using the embeddings from the Text Embeddings API.
- `approximate_neighbors_count`: how many similar items we want to retrieve in typical cases
- `distance_measure_type`: what metrics to measure distance/similarity between embeddings. In this case it's DOT_PRODUCT_DISTANCE

See the document for more details on creating Index and the parameters.

In [None]:
# Check if an index with the same display name already exists
indexes = aiplatform.MatchingEngineIndex.list(
    filter=f"display_name={VECTOR_SEARCH_INDEX_NAME}"
)

BUCKET_URI = f"gs://{VECTOR_SEARCH_EMBEDDING_BUCKET}/embeddings-{execution_id}"

if indexes:
    # If an index with the same display name exists, update it using batch update
    my_index = indexes[0]
    my_index.update_embeddings(
        contents_delta_uri=BUCKET_URI,
    )
    print(f"Index with display name {VECTOR_SEARCH_INDEX_NAME} updated.")
else:
    # If no index with the same display name exists, create a new one
    my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name=f"{VECTOR_SEARCH_INDEX_NAME}",
        contents_delta_uri=BUCKET_URI,
        dimensions=768,
        approximate_neighbors_count=20,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
    )
    print(f"Index with display name {VECTOR_SEARCH_INDEX_NAME} created.")

#### Create an Index Endpoint and deploy the Index to it


To use the Index, you need to create an `Index Endpoint`. It works as a server instance accepting query requests for your Index.


In [None]:
# Check if an index endpoint with the same display name already exists
index_endpoints = aiplatform.MatchingEngineIndexEndpoint.list(
    filter=f"display_name={VECTOR_SEARCH_INDEX_ENDPOINT_NAME}"
)

if index_endpoints:
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
        index_endpoint_name=index_endpoints[0].name
    )
    print(f"Index endpoint with display name {index_endpoints[0].display_name} already exists.")
else:
    # If no index endpoint with the same display name exists, create a new one
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name=f"{VECTOR_SEARCH_INDEX_ENDPOINT_NAME}",
        public_endpoint_enabled=True,
    )
    print(f"Index endpoint with display name {VECTOR_SEARCH_INDEX_ENDPOINT_NAME} created.")

This tutorial utilizes a `Public Endpoint` and does not support Virtual Private Cloud (VPC). Unless you have a specific requirement for VPC, we recommend using a Public Endpoint. Despite the term "public" in its name, it does not imply open access to the public internet. Rather, it functions like other endpoints in Vertex AI services, which are secured by default through IAM. Without explicit IAM permissions, as we have previously established, no one can access the endpoint.

With the Index Endpoint, deploy the Index by specifying an unique deployed index ID.

In [None]:
DEPLOYED_INDEX_ID = f"{VECTOR_SEARCH_INDEX_NAME}_{execution_id}"
DEPLOYED_INDEX_ID = DEPLOYED_INDEX_ID.replace(
    "-", "_"
)  # Can't have - in deployment name, only alphanumeric and _ allowed

# deploy the Index to the Index Endpoint
deployed_indexes = my_index_endpoint.deployed_indexes
if deployed_indexes:
    print(f"Index {DEPLOYED_INDEX_ID} is already deployed.")
else:
    my_index_endpoint.deploy_index(index=my_index, deployed_index_id=DEPLOYED_INDEX_ID)
    print(f"Index {DEPLOYED_INDEX_ID} has been deployed.")

If it is the first time to deploy an Index to an Index Endpoint, it will take around 25 minutes to automatically build and initiate the backend for it. After the first deployment, it will finish in seconds. To see the status of the index deployment, open the [Vector Search Console > INDEX ENDPOINTS tab](https://console.cloud.google.com/vertex-ai/matching-engine/index-endpoints) and click the Index Endpoint.

### Search!

In [16]:
def search_and_answer(question: str):
    # 1. Generate embedding for the question
    question_embedding = generate_text_embedding(question)

    print(f"Question embedding: {question_embedding}")

    # 2. Search in Vector Search
    response = my_index_endpoint.find_neighbors(
        deployed_index_id=DEPLOYED_INDEX_ID,
        queries=[question_embedding],
        num_neighbors=3
    )
    
    # 3. Get matching document IDs
    matched_ids = [neighbor.id for neighbor in response[0]]
    
    # 4. Query BigQuery to get the actual content
    query = f"""
    SELECT text
    FROM `{PROJECT_ID}.{BQ_DATASET}.document_embeddings`
    WHERE id IN UNNEST({matched_ids})
    """
    
    df_results = bq_client.query(query).to_dataframe()
    context = "\n".join(df_results['text'].tolist())
    
    # 5. Generate response with Gemini
    prompt = f"""Based on the following context, please answer the question. 
    If the answer cannot be found in the context, say "I cannot find information about this in the provided documents."
    
    Context:
    {context}
    
    Question: {question}
    """
    
    response = multimodal_model.generate_content(prompt)
    return response.text


In [18]:
# Example usage
question = "What is RREGOP?"
answer = search_and_answer(question)
print(f"Q: {question}\nA: {answer}")

Question embedding: [-0.006713328417390585, 0.01826227270066738, -0.017548376694321632, -0.008790664374828339, 0.04660380259156227, 0.020247919484972954, -0.010362439788877964, -0.006213292013853788, -0.005355918779969215, 0.008916757069528103, -0.027919145300984383, 0.030738044530153275, -0.02374914102256298, -0.02460528537631035, -0.05231049656867981, -0.0029622011352330446, 0.005142012145370245, -0.027832360938191414, 0.0053589059971272945, 0.02847324125468731, -0.017501456663012505, -0.02481982670724392, 0.025588447228074074, -0.02891330048441887, -0.016972579061985016, -0.0362677127122879, 0.011463724076747894, -0.05470849946141243, -0.01264912262558937, 0.04799213260412216, 0.04327404126524925, 0.021555010229349136, -0.05793343111872673, 0.006443563383072615, 0.026558129116892815, -0.02362137846648693, 0.03936624899506569, 0.03031410090625286, -0.004219745751470327, 0.07515201717615128, -0.03086763620376587, -0.025475244969129562, 0.04131973907351494, 0.01190003752708435, -0.0758