# RAG Engine

<span style="color:red">**YOU NEED TO UPDATE YOUR PROJECT_ID AND LOCATION**</span>

In [35]:
import os
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine
from vertexai.preview import rag
import vertexai

# Google Cloud project and the region used by the Vertex AI SDK (RAG Engine).
PROJECT_ID = "hello-world-418507"
LOCATION = "us-central1"
# Vertex AI Search (Discovery Engine) is served from the multi-regions
# "global", "us" and "eu" only. A Compute region such as "us-central1" would
# make the helpers below target "us-central1-discoveryengine.googleapis.com".
VERTEX_AI_SEARCH_LOCATION = "global"
vertexai.init(project=PROJECT_ID, location=LOCATION)

## Configuration

- Create a new Cloud Storage bucket and upload the PDF documents to it.
- Datastore name MUST BE LOWERCASE

In [15]:
# The datastore name can only contain lowercase letters, numbers, and hyphens.
# The datastore ID is derived from the name by appending "-id".
DATASTORE_NAME = "2025-adk-workshop-policy-tobedeleted2"
DATASTORE_ID = DATASTORE_NAME + "-id"

# Cloud Storage bucket whose objects are imported into the datastore.
GCS_BUCKET = "gs://2025-adk-workshop-policy"


## Part 1 - Datastore (Run once only)

### Create and Populate a Datastore

RAG Engine does not support Vertex AI Search data stores that use a chunking config, so the data store below is created without one.

In [16]:
def create_data_store(
    project_id: str, location: str, data_store_name: str, data_store_id: str
):
    """Create a generic, content-required Vertex AI Search data store.

    Args:
        project_id: Project used to build the parent collection path.
        location: Discovery Engine location. Any value other than "global"
            selects the ``{location}-discoveryengine.googleapis.com`` endpoint.
        data_store_name: Display name given to the data store.
        data_store_id: ID sent as ``data_store_id`` in the create request.

    Returns:
        The ``name`` of the data store returned by the finished operation.
    """
    # "global" keeps the client's default endpoint; other locations need their own.
    if location == "global":
        endpoint_options = None
    else:
        endpoint_options = ClientOptions(
            api_endpoint=f"{location}-discoveryengine.googleapis.com"
        )
    service = discoveryengine.DataStoreServiceClient(client_options=endpoint_options)

    store_spec = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        content_config=discoveryengine.DataStore.ContentConfig.CONTENT_REQUIRED,
    )
    create_request = discoveryengine.CreateDataStoreRequest(
        parent=service.collection_path(project_id, location, "default_collection"),
        data_store=store_spec,
        data_store_id=data_store_id,
    )

    # Wait up to 90 seconds for the long-running operation to finish.
    created = service.create_data_store(request=create_request).result(timeout=90)
    return created.name

In [17]:
def import_documents(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str, # Bucket path
):
    """Import every object under a Cloud Storage path into a data store.

    The call blocks until the long-running import operation has finished.

    Args:
        project_id: Project that owns the data store.
        location: Discovery Engine location. Any value other than "global"
            selects the ``{location}-discoveryengine.googleapis.com`` endpoint.
        data_store_id: ID of the data store whose ``default_branch`` receives
            the documents.
        gcs_uri: Bucket (or folder) path such as ``gs://my-bucket``. A trailing
            slash is tolerated.

    Returns:
        The name of the long-running import operation.

    Warns:
        RuntimeWarning: If the import response reports error samples, i.e. some
            documents could not be imported.
    """
    import warnings  # local import: only used to report partial failures

    # Create a client
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    # Strip trailing slashes so "gs://bucket/" does not turn into "gs://bucket//*".
    source_documents = [f"{gcs_uri.rstrip('/')}/*"]

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            input_uris=source_documents, data_schema="content"
        ),
        # Options: `FULL`, `INCREMENTAL`
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    # Make the request
    operation = client.import_documents(request=request)

    # Block until the import has completed.
    response = operation.result()

    # Do not let partially failed imports pass silently.
    if response.error_samples:
        warnings.warn(
            f"Document import reported {len(response.error_samples)} error sample(s); "
            f"first: {response.error_samples[0]}",
            RuntimeWarning,
            stacklevel=2,
        )

    return operation.operation.name

In [None]:
# Create the data store; the returned resource name is shown as the cell output.
create_data_store(
    project_id=PROJECT_ID,
    location=VERTEX_AI_SEARCH_LOCATION,
    data_store_name=DATASTORE_NAME,
    data_store_id=DATASTORE_ID,
)

Next, import the documents from Cloud Storage. This step takes about 10 minutes.

In [None]:
# Import everything in the bucket; the operation name is shown as the cell output.
import_documents(
    project_id=PROJECT_ID,
    location=VERTEX_AI_SEARCH_LOCATION,
    data_store_id=DATASTORE_ID,
    gcs_uri=GCS_BUCKET,
)

## Part 2 - Create a Search Engine (Run once only)

In [20]:
def create_engine(
    project_id: str, location: str, engine_name: str, engine_id: str, data_store_id: str
):
    """Create an enterprise-tier search engine on top of one data store.

    Args:
        project_id: Project used to build the parent collection path.
        location: Discovery Engine location. Any value other than "global"
            selects the ``{location}-discoveryengine.googleapis.com`` endpoint.
        engine_name: Display name of the engine (also printed for reference).
        engine_id: ID sent as ``engine_id`` in the create request.
        data_store_id: ID of the single data store attached to the engine.

    Returns:
        The ``name`` of the engine returned by the finished operation.
    """
    # "global" keeps the client's default endpoint; other locations need their own.
    if location == "global":
        endpoint_options = None
    else:
        endpoint_options = ClientOptions(
            api_endpoint=f"{location}-discoveryengine.googleapis.com"
        )
    service = discoveryengine.EngineServiceClient(client_options=endpoint_options)

    tier_config = discoveryengine.Engine.SearchEngineConfig(
        search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
    )
    search_engine = discoveryengine.Engine(
        display_name=engine_name,
        solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        data_store_ids=[data_store_id],
        search_engine_config=tier_config,
    )
    print(search_engine.display_name)

    pending = service.create_engine(
        request=discoveryengine.CreateEngineRequest(
            parent=service.collection_path(project_id, location, "default_collection"),
            engine=search_engine,
            engine_id=engine_id,
        )
    )

    # Wait up to 90 seconds for the long-running operation to finish.
    return pending.result(timeout=90).name

In [None]:
# Display the data store name and ID as a sanity check before creating the engine.
DATASTORE_NAME, DATASTORE_ID

In [None]:
# Engine name and ID are derived from the data store's by appending "-engine".
ENGINE_NAME = f"{DATASTORE_NAME}-engine"
ENGINE_ID = f"{DATASTORE_ID}-engine"

# Create the search engine on top of the data store; `response` holds the
# resource name returned by create_engine.
response = create_engine(
    PROJECT_ID,
    VERTEX_AI_SEARCH_LOCATION,
    ENGINE_NAME,
    ENGINE_ID,
    DATASTORE_ID,
)

response

## Part 3 - Create corpus (Run once only)

In [40]:
from vertexai.preview import rag

In [None]:
# Display the values that are combined into the serving config path.
VERTEX_AI_SEARCH_LOCATION, ENGINE_NAME, ENGINE_ID

In [None]:
"""
Format:
    ``projects/{project}/locations/{location}/collections/{collection}/engines/{engine_id}/servingConfigs/{serving_config}``
or
    ``projects/{project}/locations/{location}/collections/{collection}/dataStores/{data_store_id}/servingConfigs/{serving_config}``
"""

config = f"projects/{PROJECT_ID}/locations/{VERTEX_AI_SEARCH_LOCATION}/collections/default_collection/engines/{ENGINE_ID}/servingConfigs/default_search"
print(config)

vertex_ai_search_config = rag.VertexAiSearchConfig(
    serving_config=config,
)


In [43]:
# Create a RAG corpus that uses the Vertex AI Search serving config built above.
corpus_kwargs = dict(
    display_name=ENGINE_NAME,
    vertex_ai_search_config=vertex_ai_search_config,
)
rag_corpus = rag.create_corpus(**corpus_kwargs)


In [None]:
# Print the corpus resource name; Part 4 expects this value in CORPUS_NAME.
print(rag_corpus.name)

In [None]:
# Check the corpus just created: fetch it again by resource name and display
# the result as the cell output.
new_corpus = rag.get_corpus(name=rag_corpus.name)
new_corpus

### List corpora

In [None]:
# List the RAG corpora; each entry's `name` field can be used as CORPUS_NAME.
rag.list_corpora()

## Part 4 - Retrieval

### Query with Rag Engine

Set `CORPUS_NAME` to the `name` value of your corpus, as shown in the output of the previous cell (`rag.list_corpora()`). Example output:

```text
rag_corpora {
  name: "projects/hello-world-418507/locations/us-central1/ragCorpora/3054003497310617600"
  display_name: "2025-adk-workshop-policy-tobedeleted-engine"
  create_time {
    seconds: 1746602040
    nanos: 531441000
  }
  update_time {
    seconds: 1746602040
    nanos: 531441000
  }
  corpus_status {
    state: ACTIVE
  }
  vertex_ai_search_config {
    serving_config: "projects/hello-world-418507/locations/global/collections/default_collection/engines/2025-adk-workshop-policy-tobedeleted-id-engine/servingConfigs/default_search"
  }
}
```

In [None]:
RETRIEVAL_QUERY = "When can i start taking maternity leave?"
# Resource name of the RAG corpus to query (the `name` value from `rag.list_corpora()`).
CORPUS_NAME = "projects/hello-world-418507/locations/us-central1/ragCorpora/5148740273991319552"

rag_resource = rag.RagResource(rag_corpus=CORPUS_NAME)
print(rag_resource)

# Retrieval settings go through RagRetrievalConfig; the standalone
# `similarity_top_k` argument of `retrieval_query` is deprecated in its favour.
# top_k=10 keeps the number of contexts this cell previously requested.
# NOTE(review): a `filter=rag.Filter(vector_distance_threshold=...)` can be added
# to drop weak matches - confirm it is honoured for Vertex AI Search corpora.
rag_retrieval_config = rag.RagRetrievalConfig(top_k=10)

response = rag.retrieval_query(
    rag_resources=[rag_resource],  # Currently only 1 corpus is allowed.
    text=RETRIEVAL_QUERY,
    rag_retrieval_config=rag_retrieval_config,
)
print(response)


### Query with Gemini


In [33]:
from rich import print as rich_print
from vertexai.preview import rag
from google.genai.types import (
    GenerateContentConfig,
    VertexRagStore,
    Retrieval,
    Tool,
    VertexAISearch,
)
from google import genai


In [None]:

# PROJECT_ID and LOCATION come from the configuration cell at the top of the
# notebook. Re-declaring them here would silently override the values the user
# was asked to update there.
MODEL_ID = "gemini-2.0-flash-001"
RETRIEVAL_QUERY = "When can i start taking maternity leave?"


client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

# CORPUS_NAME is set in the "Query with Rag Engine" cell above.
rag_resource = rag.RagResource(
    rag_corpus=CORPUS_NAME,
)

# Despite its name, this tool retrieves through the RAG corpus (VertexRagStore).
vertex_ai_search_tool = Tool(
    retrieval=Retrieval(vertex_rag_store=VertexRagStore(rag_resources=[rag_resource]))
)

response = client.models.generate_content(
    model=MODEL_ID,
    contents=RETRIEVAL_QUERY,
    config=GenerateContentConfig(tools=[vertex_ai_search_tool]),
)

rich_print(response)


## Cleaning up

Clean up RAG resources created in this notebook.

In [None]:
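# To delete a corpus, uncomment the line below and replace the name with your own corpus resource name.
# rag.delete_corpus(name="projects/hello-world-418507/locations/us-central1/ragCorpora/1689975760170778624")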