In [1]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build with RAG Engine in Vertex AI

| | |
|-|-|
| Author(s) | [Laxmi Harikumar](https://github.com/laxmih-genai) |

## Overview

**Retrieval Augmented Generation (RAG)** improves large language models by allowing them to access and process external information sources during generation. This ensures the model's responses are grounded in factual data and avoids hallucinations.

RAG Engine API



For more information, refer to the public documentation for [Vertex AI RAG Engine](https://cloud.google.com/vertex-ai/generative-ai/docs/rag-overview).

This notebook aims at providing a hands on tutorial for RAG Engine API with the following steps.

Part 1: Use default managed Vector Db
- Create a RAG corpus by specifying an embedding model and vector database
- Upload a local PDF file to the corpus
- Import a scanned PDF (This requires creating Document AI Layout Parser)
- Set up a retrieval tool
- Use your RAG retrieval tool to add context to  Gemini's responses to user queries

Part 2: Use Vertex AI Vector Search
- Set up Vertex AI Vector SearchImport Alphabet PDFs from gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs
- Set up a retrieval tool
- Use your RAG retrieval tool to add context to  Gemini's responses to user queries

- Clean up
  Delete the corpus


## Get started

### Install Vertex AI SDK and other required packages


In [2]:
%pip install --upgrade --user --quiet google-cloud-aiplatform google-cloud-documentai google-cloud-discoveryengine

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, continue to the next step.

In [3]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

If you're running this notebook on Google Colab, run the cell below to authenticate your environment.

In [1]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information and initialize Vertex AI SDK

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [2]:
# Use the environment variable if the user doesn't provide Project ID.
import os

import vertexai

PROJECT_ID = "demos-vertex"  # @param {type:"string", isTemplate: true}
if PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

vertexai.init(project=PROJECT_ID, location=LOCATION)

### Import libraries

In [30]:
from IPython.display import Markdown
from vertexai.preview import rag
from vertexai.preview.generative_models import GenerativeModel, Tool

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import FailedPrecondition
from google.cloud import documentai
from google.api_core.exceptions import AlreadyExists

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

## PART 1

Create a RAG Corpus by uploading a local file and a scanned document from GCS Bucket. Set up a retrieval tool and get responses to queries

### Helper functions

In [13]:
def list_files_in_corpus(rag_corpus_name):
  # List the files
  files = rag.list_files(corpus_name=rag_corpus_name)
  for file in files:
      print(file.display_name)
      print(file.name)

### Create a RAG Corpus

Configure the Embedding model

In [4]:
EMBEDDING_MODEL = "text-embedding-004"  # @param {type:"string", isTemplate: true}
embedding_model_config = rag.EmbeddingModelConfig(publisher_model=f"""publishers/google/models/{EMBEDDING_MODEL}""")

In [5]:
embedding_model_config

EmbeddingModelConfig(publisher_model='publishers/google/models/text-embedding-004', endpoint=None, model=None, model_version_id=None)

In [8]:
CORPUS_DISPLAY_NAME = "rag-corpus-for-demo"

In [9]:
rag_corpus = rag.create_corpus(
    display_name=CORPUS_DISPLAY_NAME,
    embedding_model_config=embedding_model_config
)

### Check the corpus just created

In [10]:
rag.list_corpora()

ListRagCorporaPager<rag_corpora {
  name: "projects/demos-vertex/locations/us-central1/ragCorpora/1873497444986126336"
  display_name: "rag-corpus-for-demo"
  create_time {
    seconds: 1734413222
    nanos: 714518000
  }
  update_time {
    seconds: 1734413222
    nanos: 714518000
  }
  rag_embedding_model_config {
    vertex_prediction_endpoint {
      endpoint: "projects/demos-vertex/locations/us-central1/publishers/google/models/text-embedding-004"
    }
  }
  rag_vector_db_config {
    rag_managed_db {
    }
    rag_embedding_model_config {
      vertex_prediction_endpoint {
        endpoint: "projects/demos-vertex/locations/us-central1/publishers/google/models/text-embedding-004"
      }
    }
  }
  corpus_status {
    state: ACTIVE
  }
  vector_db_config {
    rag_managed_db {
    }
    rag_embedding_model_config {
      vertex_prediction_endpoint {
        endpoint: "projects/demos-vertex/locations/us-central1/publishers/google/models/text-embedding-004"
      }
    }
  }
}
>

### Upload a local file to the corpus

In [11]:
rag_file = rag.upload_file(
    corpus_name=rag_corpus.name,
    path="/content/contents/veo-imagen-blog.pdf",
    display_name="veo-imagen-blog.pdf",
    description="Veo and Imagen3 announcement",
)

In [14]:
# Check if file is in corpus
list_files_in_corpus(rag_corpus.name)

veo-imagen-blog.pdf
projects/756696270058/locations/us-central1/ragCorpora/1873497444986126336/ragFiles/5325940723548139623


### Enable the DocumentAI API

[RAG Engine supports DocumentAI's layout parser](https://cloud.google.com/vertex-ai/generative-ai/docs/layout-parser-integration) to extracts content elements (text, tables, lists) from documents for better information retrieval.

To use the layour parser Document AI API has to be enabled.



In [None]:
!gcloud config set project demos-vertex
!gcloud services enable documentai.googleapis.com discoveryengine.googleapis.com

Updated property [core/project].
Operation "operations/acat.p2-756696270058-731ca9d5-923c-42f9-883c-8af4461f0246" finished successfully.


### Create a Document AI Layout Parser

In [16]:
def create_parser_processor(
    project_id: str, location: str, processor_display_name: str
) -> documentai.Processor:
    client = documentai.DocumentProcessorServiceClient(client_options=client_options)

    # The full resource name of the location
    # e.g.: projects/project_id/locations/location
    parent = client.common_location_path(project_id, location)

    # Create a processor
    return client.create_processor(
        parent=parent,
        processor=documentai.Processor(
            display_name=processor_display_name, type_="LAYOUT_PARSER_PROCESSOR"
        ),
    )

In [17]:
# See https://cloud.google.com/document-ai/docs/regions for all options.
parser_location = "us"

# Must be unique per project
parser_display_name = "rag-engine-demo-processor"

# Set the `api_endpoint`
client_options = ClientOptions(api_endpoint=f"{parser_location}-documentai.googleapis.com")


try:
    processor = create_parser_processor(PROJECT_ID, parser_location, parser_display_name)
    print(f"Created Processor {processor.name}")
except AlreadyExists as e:
    print(
        f"Processor already exits, change the processor name and rerun this code. {e.message}"
    )


Created Processor projects/756696270058/locations/us/processors/7c5f97532d9e1f3


In [19]:
layout_parser_processor_name = processor.name

### Import a scanned document

In [20]:
INPUT_GCS_BUCKET = "gs://rag-agent-demo/"

response = rag.import_files(
    corpus_name=rag_corpus.name,
    paths=[INPUT_GCS_BUCKET],
    chunk_size=512,  # Optional
    chunk_overlap=100,  # Optional
    max_embedding_requests_per_min=900,  # Optional

    layout_parser=rag.LayoutParserConfig(
        processor_name=layout_parser_processor_name,
        max_parsing_requests_per_min=120,  # Optional
    )
)
print(f"Imported {response.imported_rag_files_count} files.")

Imported 1 files.


In [22]:
# List the files in the corpus
list_files_in_corpus(rag_corpus.name)

veo-imagen-blog.pdf
projects/756696270058/locations/us-central1/ragCorpora/1873497444986126336/ragFiles/5325940723548139623
contract_1.pdf
projects/756696270058/locations/us-central1/ragCorpora/1873497444986126336/ragFiles/5325946455892971992


### Create RAG Retrieval Tool

In [23]:
# Create a tool for the RAG Corpus
rag_retrieval_tool = Tool.from_retrieval(
    retrieval=rag.Retrieval(
        source=rag.VertexRagStore(
            rag_corpora=[rag_corpus.name],
            similarity_top_k=2,
            vector_distance_threshold=0.5,
        ),
    )
)

### Generate Content with Gemini using Rag Retrieval Tool

In [25]:
# Load tool into Gemini model
rag_gemini_model = GenerativeModel(
    "gemini-1.5-flash-002",
    tools=[rag_retrieval_tool]
)

Question from the uploaded file

In [26]:
response = rag_gemini_model.generate_content("What is Google's video generation model?")

display(Markdown(response.text))

Google's most advanced video generation model is Veo.  It generates high-quality, high-definition videos from text or image prompts, offering a range of cinematic and visual styles.  Veo is available on Vertex AI in private preview.


Question from the scanned PDF

In [27]:
response = rag_gemini_model.generate_content("What is the price per unit of the office supplies")
display(Markdown(response.text))

The price per unit of the office supplies is $15.00.


### Clean up - Delete the corpus

In [29]:
rag.delete_corpus(rag_corpus.name)

Successfully deleted the RagCorpus.


# Part 2: Using Vertex AI RAG Engine with Vertex AI Search


Part 2 of this tutorial explores using [Vertex AI RAG Engine](https://cloud.google.com/vertex-ai/generative-ai/docs/rag-overview) with [Vertex AI Search](https://cloud.google.com/enterprise-search) as a retrieval backend. Vertex AI Search's ability to handle large datasets, provide low-latency retrieval, and improve scalability makes it a powerful tool for enhancing RAG applications.

- To start using Vertex AI Search, [enable the Discovery Engine API](https://console.cloud.google.com/apis/enableflow?apiid=discoveryengine.googleapis.com&inv=1&invt=AbkVrw&project=demos-vertex).
- To setup your Vertex AI Search, a set of helper functions are defined
  - create a Datastore
  - import documents into it
  - create a search engine

Skip the helper functions section and set up Vertex AI Search section if you already have a Vertex AI Search engine ready to use.

### Helper Functions

In [33]:
## Create the Datastore
def create_data_store(
    project_id: str, location: str, data_store_name: str, data_store_id: str
):
    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # Initialize request argument(s)
    data_store = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        content_config=discoveryengine.DataStore.ContentConfig.CONTENT_REQUIRED,
    )

    operation = client.create_data_store(
        request=discoveryengine.CreateDataStoreRequest(
            parent=client.collection_path(project_id, location, "default_collection"),
            data_store=data_store,
            data_store_id=data_store_id,
        )
    )

    # Make the request
    response = operation.result(timeout=90)
    return response.name

In [34]:
## Import the documents

def import_documents(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str,
):
    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    source_documents = [f"{gcs_uri}/*"]

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            input_uris=source_documents, data_schema="content"
        ),
        # Options: `FULL`, `INCREMENTAL`
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    # Make the request
    operation = client.import_documents(request=request)

    response = operation.result()

    # Once the operation is complete, get information from operation metadata
    metadata = discoveryengine.ImportDocumentsMetadata(operation.metadata)

    # Handle the response
    return operation.operation.name

In [35]:
## Create a search engine
def create_engine(
    project_id: str, location: str, engine_name: str, engine_id: str, data_store_id: str
):
    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.EngineServiceClient(client_options=client_options)

    # Initialize request argument(s)
    engine = discoveryengine.Engine(
        display_name=engine_name,
        solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        data_store_ids=[data_store_id],
        search_engine_config=discoveryengine.Engine.SearchEngineConfig(
            search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
        ),
    )

    request = discoveryengine.CreateEngineRequest(
        parent=client.collection_path(project_id, location, "default_collection"),
        engine=engine,
        engine_id=engine.display_name,
    )

    # Make the request
    operation = client.create_engine(request=request)
    response = operation.result(timeout=90)
    return response.name

### Setup Vertex AI Search

Create datastore, import documents and create search

In [44]:
# Set the location as global
VERTEX_AI_SEARCH_LOCATION = "global"

# The datastore name can only contain lowercase letters, numbers, and hyphens
DATASTORE_NAME = "alphabet-contracts-demo-1"  # @param {type:"string", isTemplate: true}
DATASTORE_ID = f"{DATASTORE_NAME}-id"

created_ds_id = create_data_store(PROJECT_ID, VERTEX_AI_SEARCH_LOCATION, DATASTORE_NAME, DATASTORE_ID)
print(created_ds_id)

projects/756696270058/locations/global/collections/default_collection/dataStores/alphabet-contracts-demo-1-id


In [45]:
GCS_BUCKET = "gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs"  # @param {type:"string", isTemplate: true}

import_documents(PROJECT_ID, VERTEX_AI_SEARCH_LOCATION, DATASTORE_ID, GCS_BUCKET)

'projects/756696270058/locations/global/collections/default_collection/dataStores/alphabet-contracts-demo-1-id/branches/0/operations/import-documents-1407167227432598471'

In [46]:
ENGINE_NAME = DATASTORE_NAME
ENGINE_ID = DATASTORE_ID
created_engine_id = create_engine(
    PROJECT_ID, VERTEX_AI_SEARCH_LOCATION, ENGINE_NAME, ENGINE_ID, DATASTORE_ID
)
print(created_engine_id)

projects/756696270058/locations/global/collections/default_collection/engines/alphabet-contracts-demo-1


In [47]:
# Name your corpus
CORPUS_DISPLAY_NAME = "rag-vertexai-vector-search"  # @param {type:"string", "placeholder": "your-corpus-name"}

# Vertex AI Search name
ENGINE_NAME = created_engine_id
vertex_ai_search_config = rag.VertexAiSearchConfig(
    serving_config=f"{ENGINE_NAME}/servingConfigs/default_search",
)

# Create a RAG Corpus
rag_corpus = rag.create_corpus(
    display_name=CORPUS_DISPLAY_NAME,
    vertex_ai_search_config=vertex_ai_search_config,
)


# Check the corpus just created
new_corpus = rag.get_corpus(name=rag_corpus.name)


RagCorpus(name='projects/756696270058/locations/us-central1/ragCorpora/8791026472627208192', display_name='rag-vertexai-vector-search', description='', embedding_model_config=EmbeddingModelConfig(publisher_model=None, endpoint=None, model=None, model_version_id=None), vector_db=None, vertex_ai_search_config=VertexAiSearchConfig(serving_config='projects/756696270058/locations/global/collections/default_collection/engines/alphabet-contracts-demo-1/servingConfigs/default_search'), backend_config=RagVectorDbConfig(vector_db=None, rag_embedding_model_config=None))

In [48]:
# Check the newly created corpus
rag.list_corpora()

ListRagCorporaPager<rag_corpora {
  name: "projects/demos-vertex/locations/us-central1/ragCorpora/8791026472627208192"
  display_name: "rag-vertexai-vector-search"
  create_time {
    seconds: 1734418656
    nanos: 41792000
  }
  update_time {
    seconds: 1734418656
    nanos: 41792000
  }
  corpus_status {
    state: ACTIVE
  }
  vertex_ai_search_config {
    serving_config: "projects/756696270058/locations/global/collections/default_collection/engines/alphabet-contracts-demo-1/servingConfigs/default_search"
  }
}
>

### Create RAG Retrieval Tool

In [49]:
rag_resource = rag.RagResource(
    rag_corpus=rag_corpus.name,
)

rag_retrieval_tool = Tool.from_retrieval(
    retrieval=rag.Retrieval(
        source=rag.VertexRagStore(
            rag_resources=[rag_resource],  # Currently only 1 corpus is allowed.
            similarity_top_k=10,
        ),
    )
)

### Generate Content with Gemini using Rag Retrieval Tool

In [53]:
rag_vertexsrch_model = GenerativeModel("gemini-1.5-flash", tools=[rag_retrieval_tool])

Question from the Alphabet docs in Google GCS Bucket

In [54]:
response = rag_vertexsrch_model.generate_content("What is the total assets and total liabilities as of December 31 2021?")
display(Markdown(response.text))

As of December 31, 2021, Alphabet Inc. had total assets of $359,268 million and total liabilities of $107,633 million. 


### Perform direct context retrieval

Use retrieved contexts with your preferred SDK or API for final output.

In [None]:
# RETRIEVAL_QUERY = "What is the total assets and total liabilities as of December 31 2022?"  # @param {type:"string"}

# rag_resource = rag.RagResource(
#     rag_corpus=rag_corpus.name,
#     # Need to manually get the ids from rag.list_files.
#     # rag_file_ids=[],
# )

# response = rag.retrieval_query(
#     rag_resources=[rag_resource],  # Currently only 1 corpus is allowed.
#     text=RETRIEVAL_QUERY,
#     similarity_top_k=10,
# )

# # The retrieved context can be passed to any SDK or model generation API to generate final results.
# retrieved_context = " ".join(
#     [context.text for context in response.contexts.contexts]
# ).replace("\n", "")

# retrieved_context

### Cleanup