# Deploy Vector Search After Pipeline Run

This notebook provisions a Vector Search endpoint and a Delta Sync index for the `metadata_docs` table after your Lakeflow pipeline completes.

It does the following:
1. Validates that the source table exists, has the required columns, and is not empty.
2. Creates (or reuses) a Vector Search endpoint.
3. Creates (or reuses) a Delta Sync index with managed embeddings.
4. Triggers a sync (when the pipeline type is `TRIGGERED`) and waits for the index to become ready.
5. Runs a similarity search sanity check.

## Optional package install

If your cluster does not already have the Vector Search SDK installed, run this cell once and then restart Python (for example with `dbutils.library.restartPython()`).

In [None]:
%pip install -U databricks-vectorsearch databricks-sdk

In [None]:
from databricks.vector_search.client import VectorSearchClient
from pyspark.sql import functions as F
import time

# ---------- Config ----------
dbutils.widgets.text("catalog", "jack_demos_classic", "UC Catalog")
dbutils.widgets.text("schema", "data_catalogue_demo", "UC Schema")

catalog = dbutils.widgets.get("catalog").strip()
schema = dbutils.widgets.get("schema").strip()

if not catalog or not schema:
    raise ValueError("Both 'catalog' and 'schema' widget values are required.")

source_table = f"{catalog}.{schema}.metadata_docs"
index_name = f"{catalog}.{schema}.metadata_docs_index"

# Use a stable endpoint name if you have one already.
endpoint_name = "metadata-catalogue-vs-endpoint"
endpoint_type = "STANDARD"  # or "STORAGE_OPTIMIZED"

# Managed embedding model endpoint
embedding_model_endpoint = "databricks-gte-large-en"
pipeline_type = "TRIGGERED"  # or "CONTINUOUS"

# Fail fast on a typo: the sync step only triggers a manual sync when this is
# "TRIGGERED", so an unrecognized value would silently skip that sync.
if pipeline_type.upper() not in {"TRIGGERED", "CONTINUOUS"}:
    raise ValueError(
        f"pipeline_type must be 'TRIGGERED' or 'CONTINUOUS', got {pipeline_type!r}"
    )

# Echo the effective configuration so job runs are easy to audit.
display({
    "catalog": catalog,
    "schema": schema,
    "source_table": source_table,
    "index_name": index_name,
    "endpoint_name": endpoint_name,
    "endpoint_type": endpoint_type,
    "embedding_model_endpoint": embedding_model_endpoint,
    "pipeline_type": pipeline_type,
})

In [None]:
# ---------- Validate source table ----------
df = spark.table(source_table)
# `full_table_name` is displayed below and requested by the similarity-search
# sanity check, so it is required alongside the key (`doc_id`) and the
# embedding source column (`content`). Checking it here turns a late, opaque
# column-resolution failure into a clear ValueError.
required_cols = {"doc_id", "full_table_name", "content"}
missing = required_cols - set(df.columns)
if missing:
    # Sorted for a deterministic, readable message (set order is not stable).
    raise ValueError(f"Source table missing required columns: {sorted(missing)}")

row_count = df.count()
if row_count == 0:
    raise ValueError(f"{source_table} has 0 rows. Run/refresh pipeline first.")

print(f"Source table validated: {source_table} ({row_count} rows)")
display(df.select("doc_id", "full_table_name", "content").limit(5))

In [None]:
# Vector Search client used by the cells below. It is constructed with no
# arguments; presumably it authenticates with the notebook's own workspace
# credentials — confirm for your runtime / SDK version.
vsc = VectorSearchClient()

def endpoint_state(endpoint_payload: dict) -> str:
    """Extract the endpoint state string from a ``get_endpoint`` payload.

    The payload's key layout varies by SDK/runtime version, so several key
    paths are tried in order. The first path whose keys all resolve to a
    truthy value wins, and that value is returned as ``str``. ``"UNKNOWN"``
    is returned when no path resolves.
    """
    candidate_paths = (
        ("endpoint_status", "state"),
        ("status", "state"),
        ("state",),
    )
    for keys in candidate_paths:
        node = endpoint_payload
        for key in keys:
            if not (isinstance(node, dict) and key in node):
                break
            node = node[key]
        else:
            # Every key on this path resolved; accept only a non-empty value.
            if node:
                return str(node)
    return "UNKNOWN"

# ---------- Create/reuse endpoint ----------
# list_endpoints() is read as {"endpoints": [{"name": ...}, ...]}; any other
# response shape is treated as "no endpoints found".
all_eps = vsc.list_endpoints()
eps = all_eps.get("endpoints", []) if isinstance(all_eps, dict) else []
exists = any(e.get("name") == endpoint_name for e in eps)

if not exists:
    print(f"Creating endpoint: {endpoint_name} ({endpoint_type})")
    vsc.create_endpoint(name=endpoint_name, endpoint_type=endpoint_type)
else:
    print(f"Endpoint already exists: {endpoint_name}")

# Poll budget: up to ENDPOINT_POLL_ATTEMPTS checks, ENDPOINT_POLL_INTERVAL_S apart.
ENDPOINT_POLL_ATTEMPTS = 60
ENDPOINT_POLL_INTERVAL_S = 10

state = "UNKNOWN"
for _ in range(ENDPOINT_POLL_ATTEMPTS):
    ep = vsc.get_endpoint(endpoint_name)
    state = endpoint_state(ep if isinstance(ep, dict) else {})
    print(f"Endpoint state: {state}")
    if state.upper() in {"ONLINE", "READY"}:
        break
    if state.upper() in {"FAILED", "ERROR"}:
        raise RuntimeError(f"Endpoint failed: {ep}")
    time.sleep(ENDPOINT_POLL_INTERVAL_S)
else:
    # Report what we last saw so a stuck endpoint can be diagnosed.
    raise TimeoutError(
        f"Endpoint {endpoint_name!r} did not become ready within "
        f"{ENDPOINT_POLL_ATTEMPTS * ENDPOINT_POLL_INTERVAL_S}s "
        f"(last state: {state})"
    )

In [None]:
# ---------- Create/reuse Delta Sync index ----------
# list_indexes() is read as {"vector_indexes": [{"name": ...}, ...]}; any other
# response shape is treated as "no indexes on this endpoint".
idxs = vsc.list_indexes(endpoint_name)
idx_list = idxs.get("vector_indexes", []) if isinstance(idxs, dict) else []
index_exists = any(entry.get("name") == index_name for entry in idx_list)

if index_exists:
    print(f"Index already exists: {index_name}")
else:
    print(f"Creating index: {index_name}")
    # Arguments common to both SDK call shapes below.
    shared_kwargs = dict(
        endpoint_name=endpoint_name,
        index_name=index_name,
        source_table_name=source_table,
        primary_key="doc_id",
        pipeline_type=pipeline_type,
    )
    try:
        # Older SDK shape: flat embedding source column / model endpoint args.
        vsc.create_delta_sync_index(
            **shared_kwargs,
            embedding_source_column="content",
            embedding_model_endpoint_name=embedding_model_endpoint,
        )
    except TypeError:
        # Newer SDK shape: the flat args above are rejected with TypeError and
        # the embedding source is given as a list of column specs instead.
        vsc.create_delta_sync_index(
            **shared_kwargs,
            embedding_source_columns=[
                {
                    "name": "content",
                    "embedding_model_endpoint_name": embedding_model_endpoint,
                }
            ],
        )

In [None]:
# ---------- Sync (TRIGGERED) + readiness wait ----------
# get_index is called with keyword arguments only, so their order is
# irrelevant; the previous TypeError fallback repeated the identical call.
index = vsc.get_index(endpoint_name=endpoint_name, index_name=index_name)

if pipeline_type.upper() == "TRIGGERED":
    print("Triggering index sync...")
    sync_fn = getattr(index, "sync", None)
    if callable(sync_fn):
        # Errors raised by the sync call itself propagate as-is; previously a
        # blanket `except Exception` hid them behind a fallback call.
        sync_fn()
    else:
        # Some SDKs expose sync on client instead of handle.
        vsc.sync_index(index_name=index_name)

def index_state(payload: dict) -> str:
    """Extract the index state string from an ``index.describe()`` payload.

    ``detailed_state`` is preferred over ``state``, and keys nested under
    ``status`` are tried before top-level ones. The first path that resolves
    to a truthy value is returned as ``str``; otherwise ``"UNKNOWN"``.
    """
    missing = object()  # sentinel: path did not fully resolve

    def lookup(keys):
        node = payload
        for key in keys:
            if not isinstance(node, dict) or key not in node:
                return missing
            node = node[key]
        return node

    for keys in (
        ("status", "detailed_state"),
        ("status", "state"),
        ("detailed_state",),
        ("state",),
    ):
        value = lookup(keys)
        if value is not missing and value:
            return str(value)
    return "UNKNOWN"

# Poll the index until its state contains a ready marker, contains a failure
# marker, or the poll budget runs out (states are matched by substring).
INDEX_POLL_ATTEMPTS = 90
INDEX_POLL_INTERVAL_S = 10

s = "UNKNOWN"
for _ in range(INDEX_POLL_ATTEMPTS):
    d = index.describe()
    s = index_state(d if isinstance(d, dict) else {})
    print(f"Index state: {s}")
    upper_s = s.upper()
    if any(k in upper_s for k in ["ONLINE", "READY", "SUCCESS"]):
        # Ready markers are checked first, so a state carrying both a ready
        # and a failure marker (e.g. an ONLINE_..._FAILED style name) counts
        # as ready. Surface it rather than reporting silent success.
        if any(k in upper_s for k in ["FAILED", "ERROR"]):
            print(f"WARNING: index reports a ready state with a failure marker: {s}")
        break
    if any(k in upper_s for k in ["FAILED", "ERROR"]):
        raise RuntimeError(f"Index failed: {d}")
    time.sleep(INDEX_POLL_INTERVAL_S)
else:
    # Report what we last saw so a stuck index can be diagnosed.
    raise TimeoutError(
        f"Index {index_name!r} did not become ready within "
        f"{INDEX_POLL_ATTEMPTS * INDEX_POLL_INTERVAL_S}s (last state: {s})"
    )

print("Index is ready")

In [None]:
# ---------- Similarity search sanity check ----------
query = "Find tables with inventory or shipment information"
print(f"Query: {query}")

results = index.similarity_search(
    query_text=query,
    columns=["doc_id", "full_table_name", "content"],
    num_results=5,
)

display(results)

# Make the sanity check actually check something. This assumes the response
# looks like {"result": {"row_count": N, ...}, ...} — confirm against your
# SDK version. The check is only enforced when that shape is present, so an
# unexpected response shape does not cause a spurious failure.
result_block = results.get("result") if isinstance(results, dict) else None
hit_count = result_block.get("row_count") if isinstance(result_block, dict) else None
if hit_count == 0:
    raise RuntimeError(
        f"Similarity search returned no results for {query!r}; "
        "the index may be empty or still syncing."
    )