# Preparation of Hands-on Data

This notebook can be executed with serverless compute.

## Data Preparation

In [0]:
%run ./config

In [0]:
# Catalog and schema bootstrap. The statements run in list order: the catalog
# grant and the schema creation both need the catalog to exist first.
setup_statements = [
    # Catalog: create if missing, then let all account users reference it
    f"CREATE CATALOG IF NOT EXISTS {catalog_name}",
    f"GRANT USE CATALOG ON CATALOG {catalog_name} TO `account users`",
    # Schema: create if missing
    f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{system_schema_name}",
]
for statement in setup_statements:
    spark.sql(statement)

# Schema-level grant (use + read access for all account users); kept as the
# trailing expression of the cell.
spark.sql(f"GRANT USE SCHEMA, SELECT ON SCHEMA {catalog_name}.{system_schema_name} TO `account users`")

In [0]:
# Volume `data` in the system schema; the CSV files are copied into it later.
# IF NOT EXISTS keeps the cell re-runnable.
create_volume_sql = f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{system_schema_name}.data"
spark.sql(create_volume_sql)

In [0]:
import shutil
import os

# Unity Catalog volume that receives the raw CSV files
volume_path = f"/Volumes/{catalog_name}/{system_schema_name}/data"

# `data` folder located in the notebook's current working directory
workspace_data_path = f"{os.getcwd()}/data"

# Stage every CSV file from the workspace folder into the volume
for csv_name in ("cust_service_data.csv", "policies.csv", "product_docs.csv"):
    shutil.copyfile(
        os.path.join(workspace_data_path, csv_name),
        os.path.join(volume_path, csv_name),
    )


def _csv_reader(multiline=False):
    """Build a CSV reader with a header row and schema inference enabled.

    Args:
        multiline: When true, the reader's ``multiline`` option is also set.

    Returns:
        The configured Spark ``DataFrameReader``; call ``.load(path)`` on it.
    """
    reader = spark.read.format("csv").option("header", True).option("inferSchema", True)
    if multiline:
        reader = reader.option("multiline", True)
    return reader


# Load the staged CSV files from the volume
cust_service_data_df = _csv_reader().load(f"{volume_path}/cust_service_data.csv")
policies_df = _csv_reader().load(f"{volume_path}/policies.csv")
# Only product_docs is read with multiline enabled
# (presumably its document text contains embedded line breaks; verify against the CSV)
product_docs_df = _csv_reader(multiline=True).load(f"{volume_path}/product_docs.csv")

# Preview all three DataFrames before anything is written
for frame in (cust_service_data_df, policies_df, product_docs_df):
    display(frame)

# Persist each DataFrame as a Unity Catalog table, replacing any previous content
for frame, table_name in (
    (cust_service_data_df, f"{catalog_name}.{system_schema_name}.cust_service_data"),
    (policies_df, f"{catalog_name}.{system_schema_name}.policies"),
    (product_docs_df, f"{catalog_name}.{system_schema_name}.product_docs"),
):
    frame.write.mode("overwrite").saveAsTable(table_name)

## Create Vector Search Index

Create the Vector Search Index used in `02_agent_eval`. A single index is created here and shared by all users.

In [0]:
# Fully qualified name of the table the Vector Search Index is built from
source_table = f"{catalog_name}.{system_schema_name}.product_docs"

# Turn on Change Data Feed for the source table
enable_cdf_sql = f"""
    ALTER TABLE {source_table} 
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
spark.sql(enable_cdf_sql)

print(f"✅ Enabled Change Data Feed for {source_table}")

# Read the property back to confirm the setting took effect
table_properties = spark.sql(f"SHOW TBLPROPERTIES {source_table}")
table_properties.filter("key = 'delta.enableChangeDataFeed'").show()

In [0]:
import requests
import json
import time

# Workspace API URL and API token, both read from the notebook context
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host = context.apiUrl().get()
token = context.apiToken().get()

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

# Upper bound (seconds) for every REST call in this cell. Without a timeout,
# `requests` waits indefinitely on an unresponsive server and the cell hangs.
request_timeout_seconds = 120

# Parameters
endpoint_name = "vector-search-endpoint-1" # Change as needed
index_name = f"{catalog_name}.{system_schema_name}.product_docs_index"
source_table = f"{catalog_name}.{system_schema_name}.product_docs"

# Check existence and create Vector Search Endpoint
endpoint_url = f"{host}/api/2.0/vector-search/endpoints/{endpoint_name}"
create_endpoint_url = f"{host}/api/2.0/vector-search/endpoints"

response = requests.get(endpoint_url, headers=headers, timeout=request_timeout_seconds)
if response.status_code == 200:
    print(f"✅ Vector Search Endpoint '{endpoint_name}' already exists")
else:
    # Any non-200 answer is treated as "endpoint missing"; the create call
    # below reports the real problem if the lookup failed for another reason.
    print(f"🔄 Creating Vector Search Endpoint '{endpoint_name}' ...")
    payload = {
        "name": endpoint_name,
        "endpoint_type": "STANDARD"
    }
    create_response = requests.post(
        create_endpoint_url, headers=headers, json=payload, timeout=request_timeout_seconds
    )
    if create_response.status_code in [200, 201]:
        print(f"✅ Created Vector Search Endpoint '{endpoint_name}'!")
    elif create_response.status_code == 409:
        print(f"⚠️  Vector Search Endpoint '{endpoint_name}' already exists (409)")
    else:
        print(f"❌ Endpoint creation error: {create_response.status_code}")
        print(create_response.text)

# Create index
url = f"{host}/api/2.0/vector-search/indexes"

# Delta Sync index over the product_docs table: embeddings are computed from
# the `product_doc` column and the sync pipeline runs in TRIGGERED mode.
payload = {
    "name": index_name,
    "endpoint_name": endpoint_name,
    "primary_key": "product_id",
    "index_type": "DELTA_SYNC",
    "delta_sync_index_spec": {
        "source_table": source_table,
        "pipeline_type": "TRIGGERED",
        "embedding_source_columns": [{  # columns is an array
            "name": "product_doc",
            "embedding_model_endpoint_name": "databricks-gte-large-en"
        }]
    }
}

print("Creating Vector Search Index...")
# NOTE: `response` is intentionally reused; the following cell inspects
# `response.status_code` to decide whether to monitor the index.
response = requests.post(url, headers=headers, json=payload, timeout=request_timeout_seconds)

if response.status_code in [200, 201]:
    print("✅ Index creation request sent successfully!")
    print("📊 Initial sync will start automatically...\n")
elif response.status_code == 409:
    # Re-running the notebook: an existing index is expected, not an error
    # (mirrors the 409 handling of the endpoint above).
    print(f"⚠️  Index '{index_name}' already exists (409)")
else:
    print(f"❌ Index creation error: {response.status_code}")
    print(response.text)

In [0]:
def monitor_index_status(index_name, timeout_minutes=60, poll_interval_seconds=30,
                         request_timeout_seconds=60):
    """Poll the Vector Search index status until it is ready, failed, or timed out.

    Uses the notebook-level `host` and `headers` for the REST calls.

    Args:
        index_name: Full name of the index to monitor.
        timeout_minutes: Maximum total time to keep polling.
        poll_interval_seconds: Pause between two status checks.
        request_timeout_seconds: Timeout for each individual HTTP request, so a
            stalled connection cannot block the loop beyond the overall timeout.

    Returns:
        True when the index reports `ready` with an ONLINE*/READY state;
        False when a failed state is reported or the timeout elapses.
    """
    status_url = f"{host}/api/2.0/vector-search/indexes/{index_name}"
    start_time = time.time()
    timeout_seconds = timeout_minutes * 60

    print("⏳ Monitoring index status...")
    print(f"Timeout: {timeout_minutes} minutes")
    print("-" * 70)

    previous_count = 0
    first_check = True
    # Last observed values, reported if the loop ends by timeout
    state, ready = "Unknown", False

    while time.time() - start_time < timeout_seconds:
        try:
            status_response = requests.get(
                status_url, headers=headers, timeout=request_timeout_seconds
            )

            if status_response.status_code == 200:
                data = status_response.json()
                status = data.get("status", {})
                spec = data.get("delta_sync_index_spec", {})

                # Status information; `or` guards against keys that are
                # present but null, which would break the formatting below.
                state = status.get("detailed_state") or "Unknown"
                ready = status.get("ready", False)
                indexed_count = status.get("indexed_row_count") or 0
                message = status.get("message", "")

                # Report the total row count once, on the first successful check
                if first_check:
                    total_rows = spec.get("num_rows") or 0
                    if total_rows > 0:
                        print(f"📊 Total number of rows to be indexed: {total_rows:,}")
                    first_check = False

                # Show progress (rows indexed since the previous check)
                elapsed = int(time.time() - start_time)
                rows_diff = indexed_count - previous_count
                if rows_diff > 0:
                    speed = f"{rows_diff:,} rows/{poll_interval_seconds}s"
                else:
                    speed = "Initializing"

                print(f"\r⏱️  {elapsed//60}m {elapsed%60}s | "
                      f"State: {state} | "
                      f"Rows: {indexed_count:,} | "
                      f"Speed: {speed} | "
                      f"Ready: {ready}", end="")

                previous_count = indexed_count

                # Success: ready is True and state is ONLINE* or READY
                if ready is True and ("ONLINE" in state or state == "READY"):
                    print("\n\n✅ Index is ready!")
                    print(f"📊 Total indexed rows: {indexed_count:,}")
                    print(f"⏱️  Total time: {elapsed//60}m {elapsed%60}s")
                    print(f"📋 Final state: {state}")
                    return True

                # Error: substring match so compound states such as
                # OFFLINE_FAILED are caught instead of polling until timeout.
                if "FAILED" in state or state == "ERROR":
                    print("\n\n❌ Index creation failed!")
                    print(f"Error state: {state}")
                    if message:
                        print(f"Message: {message}")
                    return False

            else:
                print(f"\n⚠️  Status check error: {status_response.status_code}")
                print(status_response.text)

        except Exception as e:
            # Best effort: transient network/parse errors must not abort monitoring
            print(f"\n⚠️  Exception occurred: {e}")

        # Never sleep past the overall deadline
        remaining = timeout_seconds - (time.time() - start_time)
        if remaining <= 0:
            break
        time.sleep(min(poll_interval_seconds, remaining))

    print(f"\n\n⏰ Timed out after {timeout_minutes} minutes")
    print(f"Final state: {state}, Ready: {ready}")
    return False


# `response` is the index-creation response produced by the previous cell
if response.status_code in [200, 201]:

    # Start monitoring the index
    success = monitor_index_status(index_name, timeout_minutes=60)

    if success:
        print("\n🎉 Vector search index is ready!")

        # Show final index info
        final_url = f"{host}/api/2.0/vector-search/indexes/{index_name}"
        final_response = requests.get(final_url, headers=headers, timeout=60)
        if final_response.status_code == 200:
            final_data = final_response.json()
            final_status = final_data.get('status', {})
            print("\n📋 Index details:")
            print(f"  - Name: {final_data.get('name')}")
            print(f"  - Endpoint: {final_data.get('endpoint_name')}")
            print(f"  - Status: {final_status.get('detailed_state')}")
            print(f"  - Rows: {(final_status.get('indexed_row_count') or 0):,}")
    else:
        print("\n😞 Index creation did not complete successfully")

elif response.status_code == 409:
    print("⚠️  Index already exists")
    print("You can delete and recreate it if necessary")
else:
    print(f"❌ Index creation error: {response.status_code}")
    print(response.text)

# END