# Create a Vertex AI Datastore and Search Engine

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/julienmiquel/rag/main/1_create_datastore/create_datastore_and_search.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

<div style="clear: both;"></div>

<b>Share to:</b>

<a href="https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg" alt="LinkedIn logo">
</a>

<a href="https://bsky.app/intent/compose?text=https%3A//github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg" alt="Bluesky logo">
</a>

<a href="https://twitter.com/intent/tweet?url=https%3A//github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/53/X_logo_2023_original.svg" alt="X logo">
</a>

<a href="https://reddit.com/submit?url=https%3A//github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb" target="_blank">
  <img width="20px" src="https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png" alt="Reddit logo">
</a>

<a href="https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/julienmiquel/rag/blob/main/1_create_datastore/create_datastore_and_search.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg" alt="Facebook logo">
</a>            

---

* Author(s): [Julien MIQUEL]
* Created: 22 Nov 2024
* Updated: 31 Jan 2025

---

## Objective

This notebook shows how to create and populate a Vertex AI Search Datastore, how to create a search app connected to that datastore, and how to submit queries through the search engine.


Services used in the notebook:

- ✅ Vertex AI Search for document search and retrieval

## Install pre-requisites

If you are running in Colab, install the prerequisites into the runtime. Otherwise, the notebook is assumed to be running in Vertex AI Workbench.

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
# The version stays pinned so the notebook is reproducible.
%pip install --quiet google-cloud-discoveryengine==0.12.3

### Restart current runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which will restart the current kernel.

In [None]:
# The freshly installed packages only become importable after the kernel restarts.
import IPython

# Passing True asks the kernel to come back up after shutting down.
IPython.Application.instance().kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>


## Authenticate

If you are running in Colab, authenticate with `google.colab.auth`; otherwise, the notebook is assumed to be running on Vertex AI Workbench.

In [None]:
import sys

# Only authenticate when the google.colab module is already loaded, i.e. when
# the notebook runs inside Colab; in any other environment this cell does nothing.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

## Configure notebook environment

In [None]:
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

# Google Cloud project in which the datastore and the search engine are created.
PROJECT_ID = "ml-demo-384110"  # @param {type:"string"}
# Location used in resource paths; when it is not "global" it also selects the
# "<location>-discoveryengine.googleapis.com" API endpoint.
DATA_STORE_LOCATION = LOCATION = "eu"


Set [Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials)

In [None]:
# Interactive gcloud login that sets Application Default Credentials for PROJECT_ID.
!gcloud auth application-default login --project {PROJECT_ID}

## Create and Populate a Datastore

In [None]:
# Allowed characters for the datastore name: lowercase letters, numbers and hyphens.
DATASTORE_NAME = "dev-8"
# The resource ID is simply the name.
DATASTORE_ID = DATASTORE_NAME

In [None]:
def create_data_store(
    project_id: str, location: str, data_store_name: str, data_store_id: str
):
    """Create an empty search datastore and wait up to 90 seconds for it.

    Args:
        project_id: Google Cloud project that will own the datastore.
        location: Datastore location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        data_store_name: Display name of the new datastore.
        data_store_id: Resource ID of the datastore inside "default_collection".

    Returns:
        None. A failure while waiting for the long-running operation is printed
        rather than raised, so the notebook keeps running.
    """
    # Use the location-specific endpoint unless the location is "global".
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)

    # Generic search datastore with no content configured.
    data_store = discoveryengine.DataStore(
        display_name=data_store_name,
        industry_vertical="GENERIC",
        content_config="NO_CONTENT",
        solution_types=["SOLUTION_TYPE_SEARCH"],
    )

    operation = client.create_data_store(
        request=discoveryengine.CreateDataStoreRequest(
            parent=client.collection_path(project_id, location, "default_collection"),
            data_store=data_store,
            data_store_id=data_store_id,
        )
    )

    # The datastore can take a while to instantiate. Keep the notebook running
    # when the wait fails, but report the actual error, and do not intercept
    # KeyboardInterrupt/SystemExit the way a bare `except:` would.
    try:
        operation.result(timeout=90)
    except Exception as exc:
        print(f"long-running operation error: {exc!r}")

In [None]:
# Provision the (still empty) datastore.
create_data_store(
    project_id=PROJECT_ID,
    location=LOCATION,
    data_store_name=DATASTORE_NAME,
    data_store_id=DATASTORE_ID,
)

## Update the datastore schema

In [None]:

def update_schema(
    project_id: str,
    location: str,
    data_store_id: str,
    doc_schema
):
    """Replace the datastore's "default_schema" with `doc_schema` and print the result.

    Args:
        project_id: Google Cloud project that owns the datastore.
        location: Datastore location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        data_store_id: Resource ID of the datastore whose schema is updated.
        doc_schema: Schema definition passed as the `struct_schema` of the
            Schema message (the notebook passes a dict parsed from JSON).

    Returns:
        None. The result of the update operation is printed.
    """
    # Derive the endpoint from the `location` argument, so that it always
    # matches the location embedded in the schema resource name below.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.SchemaServiceClient(client_options=client_options)

    schema = discoveryengine.Schema(
        name=client.schema_path(project_id, location, data_store_id, "default_schema"),
        struct_schema=doc_schema,
    )

    operation = client.update_schema(
        request=discoveryengine.UpdateSchemaRequest(schema=schema)
    )

    print("Waiting for operation to complete...")

    # Block until the update has finished.
    response = operation.result()

    print(response)

def get_schema(
    project_id: str,
    location: str,
    data_store_id: str,
):
    """Fetch the "default_schema" of a datastore.

    Args:
        project_id: Google Cloud project that owns the datastore.
        location: Datastore location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        data_store_id: Resource ID of the datastore whose schema is read.

    Returns:
        The response of `SchemaServiceClient.get_schema`; the notebook feeds it
        to `discoveryengine.Schema.to_json`.
    """
    # Derive the endpoint from the `location` argument, so that it always
    # matches the location embedded in the schema resource name below.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.SchemaServiceClient(client_options=client_options)

    schema_name = client.schema_path(
        project_id, location, data_store_id, "default_schema"
    )

    print("Waiting for operation to complete...")

    # The response is returned to the caller as-is; no `.result()` wait is applied.
    schema = client.get_schema(
        request=discoveryengine.GetSchemaRequest(name=schema_name)
    )

    print("done")

    return schema


In [None]:
import json

# Read the schema definition stored next to the notebook. The encoding is
# explicit so that parsing does not depend on the platform's default encoding.
with open("./struct_schema.json", "r", encoding="utf-8") as readfile:
    data = readfile.read()

struct_schema_json = json.loads(data)

# Push the parsed definition to the datastore's default schema.
update_schema(PROJECT_ID, LOCATION, DATASTORE_ID, struct_schema_json)

In [None]:
# Back up the schema currently attached to the datastore.
struct_schema = get_schema(PROJECT_ID, DATA_STORE_LOCATION, DATASTORE_ID)

# Serialise the Schema message to JSON text, parse it back into Python objects
# and keep only its "structSchema" entry.
struct_schema_json = json.loads(
    discoveryengine.Schema.to_json(struct_schema)
)["structSchema"]

with open(f"struct_schema_full_{DATASTORE_ID}.json", "w") as outfile:
    json.dump(struct_schema_json, outfile)


## Import documents

In [None]:
def import_documents(
    project_id: str,
    location: str,
    data_store_id: str,
    gcs_uri: str,
):
    """Import documents from Cloud Storage into a datastore and wait for completion.

    Args:
        project_id: Google Cloud project that owns the datastore.
        location: Datastore location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        data_store_id: Resource ID of the target datastore.
        gcs_uri: Cloud Storage URI of the file to import.

    Returns:
        The name of the long-running import operation.
    """
    # Use the location-specific endpoint unless the location is "global".
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The full resource name of the search engine branch.
    # e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="default_branch",
    )

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        gcs_source=discoveryengine.GcsSource(
            # CHANGE HERE: data_schema must match the layout of the input file.
            input_uris=[gcs_uri],
            data_schema="custom",
        ),
        # Options: `FULL`, `INCREMENTAL`
        reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
        # CHANGE HERE: field of each input record that is used as the document ID.
        id_field="id",
    )

    operation = client.import_documents(request=request)

    # Block until the import has finished; the result itself is not needed.
    operation.result()

    return operation.operation.name

In [None]:
# CHANGE HERE: Cloud Storage URI of the file whose documents are imported.
source_documents_gs_uri = "gs://ml-demo-eu/datasets/jsonl/picture_import_ds_gen_captions_caption_picture_30102024-00094-of-00320.jsonl"

import_documents(
    project_id=PROJECT_ID,
    location=LOCATION,
    data_store_id=DATASTORE_ID,
    gcs_uri=source_documents_gs_uri,
)

## Create a Search Engine

Creating a search engine (app) on top of the datastore is what sets the `search_tier` to enterprise and enables the advanced LLM features.

Enterprise tier is required to get extractive answers from a search query and advanced LLM features are required to summarize search results.

In [None]:
def create_engine(
    project_id: str,
    location: str,
    engine_name: str,
    data_store_id: str,
    engine_id: str = "",
):
    """Create an enterprise-tier search engine with the LLM add-on on a datastore.

    Args:
        project_id: Google Cloud project that will own the engine.
        location: Engine location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        engine_name: Display name of the engine.
        data_store_id: Resource ID of the datastore the engine searches.
        engine_id: Resource ID of the engine. When left empty, `engine_name` is
            used as the ID, which is what this function always did before the
            parameter existed.

    Returns:
        None. The call blocks for up to 90 seconds until the engine is created.
    """
    # Use the location-specific endpoint unless the location is "global".
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )
    client = discoveryengine.EngineServiceClient(client_options=client_options)

    engine = discoveryengine.Engine(
        display_name=engine_name,
        solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        data_store_ids=[data_store_id],
        search_engine_config=discoveryengine.Engine.SearchEngineConfig(
            search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
            search_add_ons=[discoveryengine.SearchAddOn.SEARCH_ADD_ON_LLM],
        ),
    )

    request = discoveryengine.CreateEngineRequest(
        parent=client.collection_path(project_id, location, "default_collection"),
        engine=engine,
        # Fall back to the display name so existing callers keep their behaviour.
        engine_id=engine_id or engine_name,
    )

    operation = client.create_engine(request=request)

    # Block until the engine exists; the result itself is not needed.
    operation.result(timeout=90)

In [None]:
# The engine reuses the datastore's name and ID.
ENGINE_NAME = DATASTORE_NAME
ENGINE_ID = DATASTORE_ID
create_engine(
    project_id=PROJECT_ID,
    location=LOCATION,
    engine_name=ENGINE_NAME,
    data_store_id=DATASTORE_ID,
)

## Query your Search Engine

Note: The Engine will take some time to be ready to query.

If you recently created an engine and you receive an error similar to:

`404 Engine {ENGINE_NAME} is not found`

Then wait a few minutes and try your query again.

In [None]:
def search_sample(
    project_id: str,
    location: str,
    engine_id: str,
    search_query: str,
) -> list[discoveryengine.SearchResponse]:
    """Run `search_query` against a search engine, asking for snippets and a summary.

    Args:
        project_id: Google Cloud project that owns the engine.
        location: Engine location. Any value other than "global" selects the
            "<location>-discoveryengine.googleapis.com" endpoint.
        engine_id: Resource ID of the engine to query.
        search_query: Text of the query.

    Returns:
        Whatever `SearchServiceClient.search` returns for the request.
        NOTE(review): the annotation says list; confirm whether the client
        actually returns a pager object.
    """
    # For more information, refer to:
    # https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
    # The decision is based on the `location` argument, so the endpoint always
    # matches the location embedded in the serving config below.
    client_options = (
        ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
        if location != "global"
        else None
    )

    client = discoveryengine.SearchServiceClient(client_options=client_options)

    # The full resource name of the search engine serving config
    # e.g. projects/{project_id}/locations/{location}/collections/default_collection/engines/{engine_id}/servingConfigs/default_search
    serving_config = f"projects/{project_id}/locations/{location}/collections/default_collection/engines/{engine_id}/servingConfigs/default_search"

    # Optional: Configuration options for search
    # Refer to the `ContentSearchSpec` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest.ContentSearchSpec
    content_search_spec = discoveryengine.SearchRequest.ContentSearchSpec(
        # For information about snippets, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/snippets
        snippet_spec=discoveryengine.SearchRequest.ContentSearchSpec.SnippetSpec(
            return_snippet=True
        ),
        # For information about search summaries, refer to:
        # https://cloud.google.com/generative-ai-app-builder/docs/get-search-summaries
        summary_spec=discoveryengine.SearchRequest.ContentSearchSpec.SummarySpec(
            summary_result_count=5,
            include_citations=True,
            ignore_adversarial_query=True,
            ignore_non_summary_seeking_query=True,
        ),
    )

    # Refer to the `SearchRequest` reference for all supported fields:
    # https://cloud.google.com/python/docs/reference/discoveryengine/latest/google.cloud.discoveryengine_v1.types.SearchRequest
    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=search_query,
        page_size=20,
        content_search_spec=content_search_spec,
        query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
            condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO,
        ),
        spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
            mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
        ),
    )

    return client.search(request)

### Test search

In [None]:
# Sample query sent to the freshly created engine.
query = "Toronto International Film Festival"

response = search_sample(
    project_id=PROJECT_ID,
    location=LOCATION,
    engine_id=ENGINE_ID,
    search_query=query,
)

print(response)

## Ranking API

In [None]:
def rank(elements, search_query):
    """Score every entry of `elements` against `search_query` with the Ranking API.

    Args:
        elements: Mapping of record ID to a dict of fields. "generated_caption"
            (falling back to "caption") is sent as the record content and
            "title" as its title. Presumably the IDs are Cloud Storage paths;
            verify against the caller.
        search_query: Query text the records are ranked against.

    Returns:
        The same `elements` mapping. Every entry returned by the API gets a
        "ranking" key holding its score. An empty mapping is returned unchanged
        without calling the API.
    """
    # Nothing to rank: skip the API call.
    if not elements:
        return elements

    # Build one ranking record per element. The append must happen inside the
    # loop, otherwise only the last element would be ranked.
    records = []
    for record_id, fields in elements.items():
        content = fields.get("generated_caption")
        if content is None:
            content = fields.get("caption")
        records.append(
            discoveryengine.RankingRecord(
                id=record_id,
                title=fields.get("title"),
                content=content,
            )
        )

    # Same endpoint selection as the other clients in this notebook.
    # NOTE(review): confirm that the Ranking API is served from this location.
    client_options = (
        ClientOptions(api_endpoint=f"{LOCATION}-discoveryengine.googleapis.com")
        if LOCATION != "global"
        else None
    )
    client = discoveryengine.RankServiceClient(client_options=client_options)

    # The full resource name of the ranking config.
    # Format: projects/{project_id}/locations/{location}/rankingConfigs/default_ranking_config
    ranking_config = client.ranking_config_path(
        project=PROJECT_ID,
        location=LOCATION,
        ranking_config="default_ranking_config",
    )
    request = discoveryengine.RankRequest(
        ranking_config=ranking_config,
        model="semantic-ranker-512@latest",
        top_n=40,
        query=search_query,
        records=records,
        ignore_record_details_in_response=True,
    )

    response = client.rank(request=request)

    # Read id/score directly from the response records. This needs no
    # protobuf-to-dict helper and keeps records whose score is 0.
    for ranked in response.records:
        elements[ranked.id]["ranking"] = ranked.score

    return elements

In [None]:
def convert_discovery_filter(filter_input):
    """
    Normalise a nested search-filter structure into plain Python lists and dicts.

    Each element is handled as follows:

    * a dict with a "name" key is a leaf condition: the result keeps "name" plus
      whichever of "range", "and", "in", "fullText", "contains", "exclude" and
      "having" are present, and drops any other key;
    * a dict with "exclude", "or" or "and" (and no "name") is a combinator whose
      value is converted recursively; only the first of those keys, in that
      order, is kept;
    * lists are converted item by item; anything else is returned unchanged.

    Args:
        filter_input (list): The input filter structure as a list of elements.

    Returns:
        list: The converted elements, in the same order as `filter_input`.
    """

    # Keys copied verbatim from a leaf condition, next to its "name".
    leaf_keys = ("range", "and", "in", "fullText", "contains", "exclude", "having")

    def process_filter_element(element):
        if isinstance(element, list):
            return [process_filter_element(item) for item in element]

        if not isinstance(element, dict):
            return element  # Return as is if not a dict or list

        # Leaf conditions must be recognised before the combinators: a leaf may
        # itself carry an "and" or "exclude" value, and treating it as a
        # combinator would silently drop its "name".
        if "name" in element:
            result = {"name": element["name"]}
            for key in leaf_keys:
                if key in element:
                    result[key] = element[key]
            return result

        if "exclude" in element:
            # Lists are mapped item by item, single values are converted directly.
            return {"exclude": process_filter_element(element["exclude"])}

        if "or" in element:
            return {"or": [process_filter_element(item) for item in element["or"]]}

        if "and" in element:
            return {"and": process_filter_element(element["and"])}

        return element  # Return as is if no specific key is found

    return [process_filter_element(item) for item in filter_input]


In [None]:

# Example usage: one "exclude" clause wrapping a range condition on "expires"
# and an "and" condition on "status".
filter_input = [
    {
        "exclude": [
            {
                "name": "expires",
                "range": {"fromExcluded": False, "to": "now", "toExcluded": False},
            },
            {"name": "status", "and": ["withheld"]},
        ]
    }
]



In [None]:
# Run the example filter through the converter and show the resulting structure.
converted_filter = convert_discovery_filter(filter_input)
print(converted_filter)