In [1]:
import json
import time
import requests
import google.auth
from google.auth.transport.requests import Request
from google.cloud import discoveryengine_v1 as discoveryengine
from google.protobuf import json_format

# --- Configuration ---
# Replace with your project ID and desired location
DATASTORE_PROJECT_ID = 'kaggle-hackathon-471317'
DATASTORE_LOCATION = "global"  # e.g., "us", "eu", "global"
DATASTORE_ID = "poi-locations-ds-testrial12"
DATASTORE_DISPLAY_NAME = "POI Locations Datastore"
# Schema for the data store's `default_schema`; the update_schema() cell sends
# this dict unchanged as the `structSchema` field of the Schema resource.
# Only `name` and `category` are declared explicitly.
# NOTE(review): "dynamic" is the *string* "false", not a boolean — confirm this
# is the form Vertex AI Search expects before changing it.
DATASTORE_SCHEMA = {
  "type": "object",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "dynamic": "false",
  "datetime_detection": True,
  "geolocation_detection": True,
  "properties": {
    "name": {
      "indexable": True,
      "searchable": True,
      "type": "string",
      "retrievable": True,
      "dynamicFacetable": True
    },
    "category": {
      "indexable": True,
      "dynamicFacetable": True,
      "searchable": True,
      "retrievable": True,
      "type": "string"
    }
  }
}

BQ_PROJECT_ID = 'kaggle-hackathon-471317' # Project containing the BQ table
BQ_DATASET = "geo_intents"
BQ_TABLE = "sample_table"

### --- Core Functions ---


In [9]:

def get_access_token() -> str:
    """Return an OAuth2 access token from Application Default Credentials.

    The credentials are requested with the ``cloud-platform`` scope and are
    refreshed whenever they are not currently valid.

    Returns:
        The bearer token string to place in an ``Authorization`` header.

    Raises:
        google.auth.exceptions.DefaultCredentialsError: If no Application
            Default Credentials are configured.
        google.auth.exceptions.RefreshError: If the token cannot be refreshed.
    """
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    # Do not gate the refresh on `expired` / `refresh_token`. Freshly loaded
    # credentials have no token *and* no expiry, so `expired` is False while
    # `valid` is also False. Service-account / metadata-server credentials
    # don't expose `refresh_token` at all.
    if not credentials.valid:
        credentials.refresh(Request())
    return credentials.token


def poll_operation_rest(
    operation_name: str,
    access_token: str,
    poll_interval: float = 10.0,
    timeout: float = float("inf"),
) -> dict:
    """Poll a Discovery Engine long-running operation over REST until it is done.

    Args:
        operation_name: Full operation resource name, e.g.
            ``projects/.../locations/global/.../operations/...``.
        access_token: OAuth2 bearer token sent with every poll request.
        poll_interval: Seconds to sleep between polls (default 10).
        timeout: Maximum total seconds to wait before giving up. The default
            (infinity) waits indefinitely.

    Returns:
        The final operation resource as a dict (it has ``"done": true``).

    Raises:
        requests.HTTPError: If a poll request returns an HTTP error status.
        RuntimeError: If the operation finished with an ``error`` field.
        TimeoutError: If the operation is not done within ``timeout`` seconds.
    """
    # Non-global locations (e.g. "us", "eu") are served from a regional host;
    # the location is read from the ".../locations/<loc>/..." part of the name.
    parts = operation_name.split("/")
    location = (
        parts[parts.index("locations") + 1]
        if "locations" in parts[:-1]
        else "global"
    )
    host = (
        "discoveryengine.googleapis.com"
        if location == "global"
        else f"{location}-discoveryengine.googleapis.com"
    )
    url = f"https://{host}/v1/{operation_name}"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    deadline = time.monotonic() + timeout
    while True:
        # Per-request timeout so a hung connection cannot stall the loop forever.
        response = requests.get(url, headers=headers, timeout=60)
        response.raise_for_status()
        result = response.json()
        if result.get("done", False):
            print("Operation finished.")
            # A finished operation reports failure in its body, not via HTTP status.
            if "error" in result:
                raise RuntimeError(
                    f"Operation {operation_name} failed: {result['error']}"
                )
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Operation {operation_name} not done after {timeout} seconds."
            )
        print(f"Operation not finished, waiting {poll_interval:g} seconds...")
        time.sleep(poll_interval)


# --- Core Functions ---

def create_data_store(
    project_id: str,
    location: str,
    data_store_id: str,
    display_name: str,
    solution_type: str = "SOLUTION_TYPE_SEARCH",
) -> discoveryengine.DataStore:
    """
    Creates a new Vertex AI Search data store using the client library.

    The data store is created under the project's ``default_collection`` with
    the ``NO_CONTENT`` content config and the ``GENERIC`` industry vertical.
    The call blocks until the long-running create operation completes.

    Args:
        project_id: Your Google Cloud project ID.
        location: The location for the data store ("global", "us" or "eu").
            Non-global locations are reached through their regional endpoint.
        data_store_id: A unique ID for the new data store.
        display_name: The display name for the data store.
        solution_type: The solution type, e.g., 'SOLUTION_TYPE_SEARCH'.

    Returns:
        The created DataStore object.

    Raises:
        Errors from the create call or its long-running operation propagate
        unchanged (presumably e.g. when ``data_store_id`` is already taken).
    """
    # The client's default endpoint only serves "global"; multi-region
    # locations use "<location>-discoveryengine.googleapis.com".
    client_options = (
        {"api_endpoint": f"{location}-discoveryengine.googleapis.com"}
        if location != "global"
        else None
    )
    client = discoveryengine.DataStoreServiceClient(client_options=client_options)
    parent = f"projects/{project_id}/locations/{location}/collections/default_collection"

    data_store = discoveryengine.DataStore(
        display_name=display_name,
        solution_types=[solution_type],
        content_config=discoveryengine.DataStore.ContentConfig.NO_CONTENT,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
    )

    request = discoveryengine.CreateDataStoreRequest(
        parent=parent,
        data_store=data_store,
        data_store_id=data_store_id,
    )

    print(f"Attempting to create data store '{data_store_id}'...")
    operation = client.create_data_store(request=request)

    print(f"Create data store operation started: {operation.operation.name}")
    print("Waiting for operation to complete...")
    response = operation.result()
    print(f"Data store '{response.name}' created successfully.")

    return response


def import_from_bigquery(
    project_id: str,
    location: str,
    data_store_id: str,
    bigquery_project_id: str,
    bigquery_dataset_id: str,
    bigquery_table_id: str,
) -> discoveryengine.ImportDocumentsResponse:
    """
    Imports documents into a data store from a BigQuery table using the client library.

    Documents are written to the data store's default branch ("0"). The table
    is read with the ``custom`` data schema and INCREMENTAL reconciliation.
    The call blocks until the long-running import operation completes.

    Args:
        project_id: The project ID containing the data store.
        location: The location of the data store ("global", "us" or "eu").
            Non-global locations are reached through their regional endpoint.
        data_store_id: The ID of the data store.
        bigquery_project_id: The project ID of the BigQuery table.
        bigquery_dataset_id: The dataset ID of the BigQuery table.
        bigquery_table_id: The table ID of the BigQuery table.

    Returns:
        The ImportDocumentsResponse object. Errors hit while processing
        individual documents are sampled in its ``error_samples`` (and printed)
        rather than raised.
    """
    # The client's default endpoint only serves "global"; multi-region
    # locations use "<location>-discoveryengine.googleapis.com".
    client_options = (
        {"api_endpoint": f"{location}-discoveryengine.googleapis.com"}
        if location != "global"
        else None
    )
    client = discoveryengine.DocumentServiceClient(client_options=client_options)

    # The default branch is '0'
    parent = client.branch_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        branch="0",
    )

    bq_source = discoveryengine.BigQuerySource(
        project_id=bigquery_project_id,
        dataset_id=bigquery_dataset_id,
        table_id=bigquery_table_id,
        data_schema="custom",
    )

    request = discoveryengine.ImportDocumentsRequest(
        parent=parent,
        bigquery_source=bq_source,
        # INCREMENTAL is also the API default; spelled out so the intent is visible.
        reconciliation_mode=(
            discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL
        ),
    )

    print(
        f"Starting import from BigQuery table '{bigquery_project_id}.{bigquery_dataset_id}.{bigquery_table_id}'..."
    )
    operation = client.import_documents(request=request)

    print(f"Import operation started: {operation.operation.name}")
    print("Waiting for operation to complete...")
    response = operation.result()

    # The operation can complete while individual documents failed; surface them
    # instead of unconditionally reporting success.
    if response.error_samples:
        print(f"Import completed with errors ({len(response.error_samples)} sampled):")
        for status in response.error_samples:
            print(f"  - {status.message}")
    else:
        print("Import completed successfully.")

    return response


def update_schema(
    project_id: str, location: str, data_store_id: str, schema_dict: dict
) -> dict:
    """
    Updates the schema of a data store using the REST API.

    Sends ``schema_dict`` as the ``structSchema`` of the data store's
    ``default_schema`` and waits for the resulting long-running operation.

    Args:
        project_id: The project ID containing the data store.
        location: The location of the data store ("global", "us" or "eu").
            Non-global locations are reached through their regional endpoint.
        data_store_id: The ID of the data store.
        schema_dict: A dictionary representing the JSON schema.

    Returns:
        A dictionary containing the response of the completed operation.

    Raises:
        requests.HTTPError: If the PATCH request returns an HTTP error status.
        RuntimeError: If the schema-update operation finished with an error.
    """
    access_token = get_access_token()

    host = (
        "discoveryengine.googleapis.com"
        if location == "global"
        else f"{location}-discoveryengine.googleapis.com"
    )
    # schemas.patch takes the Schema resource as the body; unlike many PATCH
    # methods it has no `updateMask` query parameter.
    url = (
        f"https://{host}/v1/projects/{project_id}/locations/{location}"
        f"/collections/default_collection/dataStores/{data_store_id}"
        "/schemas/default_schema"
    )

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # The key for a JSON-object schema in the Schema resource is `structSchema`.
    payload = {"structSchema": schema_dict}

    print(f"Attempting to update schema for data store '{data_store_id}' via REST...")
    response = requests.patch(url, headers=headers, json=payload, timeout=60)

    # `ok` is True for any status below 400, so other 2xx replies are not misreported.
    if not response.ok:
        print(f"Error updating schema: {response.text}")
        response.raise_for_status()

    operation = response.json()
    print(f"Update schema operation started: {operation['name']}")

    # Poll the operation for completion
    final_result = poll_operation_rest(operation["name"], access_token)

    # A finished operation reports failure in its body, not via HTTP status.
    if "error" in final_result:
        raise RuntimeError(f"Schema update failed: {final_result['error']}")

    print("Schema updated successfully.")
    return final_result


def get_schema(project_id: str, location: str, data_store_id: str) -> dict:
    """
    Retrieves the current schema of a data store using the client library.

    Args:
        project_id: The project ID containing the data store.
        location: The location of the data store ("global", "us" or "eu").
            Non-global locations are reached through their regional endpoint.
        data_store_id: The ID of the data store.

    Returns:
        A plain dictionary with the current schema: decoded from
        ``struct_schema`` if set, otherwise parsed from ``json_schema``,
        otherwise ``{}``.
    """
    # The client's default endpoint only serves "global"; multi-region
    # locations use "<location>-discoveryengine.googleapis.com".
    client_options = (
        {"api_endpoint": f"{location}-discoveryengine.googleapis.com"}
        if location != "global"
        else None
    )
    client = discoveryengine.SchemaServiceClient(client_options=client_options)

    # The default schema is named 'default_schema'
    schema_name = client.schema_path(
        project=project_id,
        location=location,
        data_store=data_store_id,
        schema="default_schema",
    )

    request = discoveryengine.GetSchemaRequest(name=schema_name)

    print(f"Attempting to retrieve schema for data store '{data_store_id}'...")
    schema = client.get_schema(request=request)

    # proto-plus exposes `struct_schema` as a Python mapping wrapper, not a
    # protobuf message, so MessageToDict cannot take it directly. Work on the
    # raw protobuf message instead, where it is a real `Struct`.
    raw_schema = discoveryengine.Schema.pb(schema)
    if raw_schema.HasField("struct_schema"):
        return json_format.MessageToDict(raw_schema.struct_schema)
    if raw_schema.json_schema:
        return json.loads(raw_schema.json_schema)
    return {}



In [3]:
# Step 1: create the empty data store that will receive the POI documents.
create_result = create_data_store(
    display_name=DATASTORE_DISPLAY_NAME,
    data_store_id=DATASTORE_ID,
    location=DATASTORE_LOCATION,
    project_id=DATASTORE_PROJECT_ID,
)

Attempting to create data store 'poi-locations-ds-testrial12'...
Create data store operation started: projects/168027251845/locations/global/collections/default_collection/operations/create-data-store-12936732990538311071
Waiting for operation to complete...
Data store 'projects/168027251845/locations/global/collections/default_collection/dataStores/poi-locations-ds-testrial12' created successfully.


In [10]:
# Step 2: load rows from the BigQuery source table into the data store.
import_result = import_from_bigquery(
    bigquery_project_id=BQ_PROJECT_ID,
    bigquery_dataset_id=BQ_DATASET,
    bigquery_table_id=BQ_TABLE,
    project_id=DATASTORE_PROJECT_ID,
    location=DATASTORE_LOCATION,
    data_store_id=DATASTORE_ID,
)

Starting import from BigQuery table 'kaggle-hackathon-471317.geo_intents.sample_table'...
Import operation started: projects/168027251845/locations/global/collections/default_collection/dataStores/poi-locations-ds-testrial12/branches/0/operations/import-documents-7910644913709426476
Waiting for operation to complete...
Import completed successfully.


In [11]:
# Step 3: apply DATASTORE_SCHEMA as the data store's default schema.
update_schema_result = update_schema(
    schema_dict=DATASTORE_SCHEMA,
    data_store_id=DATASTORE_ID,
    location=DATASTORE_LOCATION,
    project_id=DATASTORE_PROJECT_ID,
)

Attempting to update schema for data store 'poi-locations-ds-testrial12' via REST...
Update schema operation started: projects/168027251845/locations/global/collections/default_collection/dataStores/poi-locations-ds-testrial12/schemas/default_schema/operations/update-schema-17342063653246973095
Operation not finished, waiting 10 seconds...
Operation finished.
Schema updated successfully.
