In [None]:
import json
import numpy as np
from google.cloud import aiplatform, storage

# --- Vertex AI configuration: edit these placeholders before running ---
PROJECT_ID = "your-project-id"  # GCP project that will own the Vertex AI resources
REGION = "us-central1"  # region for Vector Search (Matching Engine) resources
BUCKET_NAME = "your-gcs-bucket-name"  # GCS bucket that receives the index input data
# NOTE(review): keep BUCKET_NAME's bucket in REGION — confirm Vector Search's
# co-location requirements for contents_delta_uri.

# Set process-wide defaults so later aiplatform calls need no project/location args.
aiplatform.init(location=REGION, project=PROJECT_ID)

In [None]:
gcloud auth login
gcloud auth application-default login


Step 1: Prepare Schema-Based Data and Upload to GCS

In [None]:
def prepare_schema_based_data(data, schema, vector_dim=384):
    """
    Converts raw records into Vertex AI-compatible JSONL records based on the schema.

    Each record becomes ``{"id": str, "embedding": [float, ...], "metadata": {...}}``.
    The metadata keys are the field names of the schema's first collection,
    excluding ``id`` and ``vector``. Fields absent from a record map to None.

    NOTE(review): "metadata" is not among the record fields documented for
    Vector Search JSON input (id, embedding, restricts, numeric_restricts,
    crowding_tag, ...). Confirm the index import accepts or ignores it, or move
    filterable values into "restricts".

    Args:
        data (list): Raw record dicts. Each needs "id" and "vector" keys.
            "vector" may be any sequence of numbers (list, tuple, 1-D numpy array).
        schema (dict): Schema definition. Fields are read from
            schema["collections"][0]["fields"]; each field is a dict with a "name" key.
        vector_dim (int): Expected length of every embedding vector.

    Returns:
        list: One JSON-serializable dict per input record, in input order.

    Raises:
        ValueError: If a record's vector length differs from ``vector_dim``.
    """
    # The metadata field list is identical for every record, so compute it once.
    metadata_fields = [
        field["name"]
        for field in schema["collections"][0]["fields"]
        if field["name"] not in ("id", "vector")
    ]

    jsonl_data = []
    for item in data:
        vector = item["vector"]
        # Validate before any per-record work so bad input fails fast.
        if len(vector) != vector_dim:
            raise ValueError(f"Vector dimension mismatch for id {item['id']}. Expected {vector_dim}, got {len(vector)}")

        jsonl_data.append({
            "id": str(item["id"]),
            # Copy into plain Python floats. numpy arrays/scalars (e.g. np.float32)
            # are not JSON-serializable, and copying avoids aliasing the caller's list.
            "embedding": [float(x) for x in vector],
            "metadata": {name: item.get(name) for name in metadata_fields},
        })
    return jsonl_data

def save_jsonl_to_gcs(data, local_file, bucket_name, destination_blob_name, client=None):
    """
    Writes records as JSON Lines to a local file and uploads that file to GCS.

    Args:
        data (list): JSON-serializable records; each is written on its own line.
        local_file (str): Local path for the JSONL file (overwritten if present).
        bucket_name (str): Destination GCS bucket name.
        destination_blob_name (str): Object path inside the bucket.
        client (google.cloud.storage.Client, optional): Client to reuse. A new
            ``storage.Client()`` is created when omitted.

    Returns:
        str: The ``gs://`` URI of the uploaded object.
    """
    # Serialize every record before opening the file. A non-serializable record
    # then raises without truncating an existing local file.
    lines = [json.dumps(entry) + "\n" for entry in data]
    with open(local_file, "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(f"Data saved locally to {local_file}")

    if client is None:
        client = storage.Client()
    blob = client.bucket(bucket_name).blob(destination_blob_name)
    blob.upload_from_filename(local_file)
    gcs_uri = f"gs://{bucket_name}/{destination_blob_name}"
    print(f"Uploaded {local_file} to {gcs_uri}")
    return gcs_uri



Step 2: Create and Deploy Matching Engine Index

In [None]:
def create_and_deploy_index(
    dimensions,
    index_name="SEARCH_PDP_INDEX",
    endpoint_name="SEARCH_PDP_ENDPOINT",
    contents_delta_uri=None,
    deployed_index_id="SEARCH_PDP_DEPLOYED_INDEX",
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    approximate_neighbors_count=100,
    network=None,
):
    """
    Creates a Vertex AI Matching Engine (Vector Search) Tree-AH index from data
    in GCS, creates an index endpoint, and deploys the index to it.

    Args:
        dimensions (int): Dimension of the embeddings.
        index_name (str): Display name of the index.
        endpoint_name (str): Display name of the endpoint.
        contents_delta_uri (str, optional): GCS *directory* holding the index
            input files. Defaults to ``gs://{BUCKET_NAME}/data``, the directory
            the main workflow uploads its JSONL file into.
        deployed_index_id (str): ID of the deployment on the endpoint. Queries
            must pass the same ID.
        distance_measure_type (str): A Vertex AI DistanceMeasureType name,
            e.g. "DOT_PRODUCT_DISTANCE", "COSINE_DISTANCE", "SQUARED_L2_DISTANCE".
        approximate_neighbors_count (int): Default number of neighbors found by
            the approximate search before reordering.
        network (str, optional): Full VPC network name for a private endpoint.
            When omitted, a public endpoint is created.

    Returns:
        tuple: Resource names of the index and endpoint.
    """
    if contents_delta_uri is None:
        contents_delta_uri = f"gs://{BUCKET_NAME}/data"

    # Create Index. Without contents_delta_uri the index would be created
    # empty and the uploaded data would never be indexed.
    index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name=index_name,
        contents_delta_uri=contents_delta_uri,
        dimensions=dimensions,
        approximate_neighbors_count=approximate_neighbors_count,
        # Must be a DistanceMeasureType enum name; "DOT_PRODUCT" is not one.
        distance_measure_type=distance_measure_type,
        sync=True,
    )
    print(f"Created Matching Engine Index: {index.resource_name}")

    # An index endpoint must be either private (VPC network) or public.
    endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name=endpoint_name,
        network=network,
        public_endpoint_enabled=network is None,
        sync=True,
    )
    # deploy_index takes the index object and requires a deployed_index_id.
    endpoint.deploy_index(index=index, deployed_index_id=deployed_index_id)
    print(f"Deployed Index at Endpoint: {endpoint.resource_name}")

    return index.resource_name, endpoint.resource_name

Step 3: Query Matching Engine Index

In [None]:
def query_index(endpoint_resource_name, query_vector, top_k=5, deployed_index_id="SEARCH_PDP_DEPLOYED_INDEX"):
    """
    Queries a deployed Matching Engine index for the nearest neighbors of one vector.

    Args:
        endpoint_resource_name (str): The index endpoint resource name.
        query_vector (list): Query embedding vector (any sequence of numbers).
        top_k (int): Number of nearest neighbors to return.
        deployed_index_id (str): ID the index was deployed under on the endpoint.

    Returns:
        list: Dicts with "id" and "distance" keys, one per neighbor.
    """
    endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_resource_name)

    # Public endpoints are queried with find_neighbors(); match() is the
    # gRPC path for private (VPC-peered) endpoints.
    if getattr(endpoint, "public_endpoint_domain_name", None):
        search = endpoint.find_neighbors
    else:
        search = endpoint.match

    results = search(
        deployed_index_id=deployed_index_id,
        # Plain floats so numpy vectors/scalars are accepted too.
        queries=[[float(x) for x in query_vector]],
        num_neighbors=top_k,
    )

    # One result list per query. Entries are MatchNeighbor objects exposing
    # .id and .distance attributes; they are not dicts.
    return [{"id": neighbor.id, "distance": neighbor.distance} for neighbor in results[0]]

Step 4: Main Workflow

In [None]:

def main():
    """Runs the demo end to end: format data, upload to GCS, build and deploy the index, then query it."""
    vector_dim = 384  # must match the embedding model's output size
    rng = np.random.default_rng(42)  # seeded so the demo vectors are reproducible

    # Replace with actual data. Seeded random placeholder vectors of the right
    # dimension keep the demo runnable; literals such as `[0.1, 0.2, 0.3, ...]`
    # contain Ellipsis and would fail the dimension check.
    raw_data = [
        {"id": 1, "vector": rng.random(vector_dim).tolist(), "title": "Title 1", "description": "Description 1", "rank": 5},
        {"id": 2, "vector": rng.random(vector_dim).tolist(), "title": "Title 2", "description": "Description 2", "rank": 3},
    ]

    # Load schema
    # NOTE(review): "schema 2.json" (with a space) looks like a duplicated file
    # name — confirm the intended schema path.
    with open("schema 2.json", "r", encoding="utf-8") as schema_file:
        schema = json.load(schema_file)

    # Prepare data
    print("Preparing data...")
    formatted_data = prepare_schema_based_data(raw_data, schema, vector_dim=vector_dim)

    # Save to JSONL and upload to GCS
    # NOTE(review): confirm Vector Search picks up a ".jsonl" object; its
    # input-format docs describe JSON files with a ".json" extension.
    save_jsonl_to_gcs(formatted_data, "data.jsonl", BUCKET_NAME, "data/SEARCH_PDP_DATA.jsonl")

    # Create and deploy index
    print("Creating and deploying index...")
    _index_name, endpoint_name = create_and_deploy_index(dimensions=vector_dim)

    # Query the index
    query_vector = rng.random(vector_dim).tolist()  # Replace with actual query vector
    print("Querying the index...")
    results = query_index(endpoint_name, query_vector)
    print("Search Results:", results)


if __name__ == "__main__":
    main()