In [0]:
%pip install -U langchain==0.3.2 langchain-experimental==0.3.2 langgraph==0.2.34 langchain-databricks==0.1.0 datasets==3.0.2

dbutils.library.restartPython()


Collecting datasets==3.0.2
  Downloading datasets-3.0.2-py3-none-any.whl (472 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 472.7/472.7 kB 6.8 MB/s eta 0:00:00
Collecting fsspec[http]<=2024.9.0,>=2023.1.0
  Downloading fsspec-2024.9.0-py3-none-any.whl (179 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 179.3/179.3 kB 8.6 MB/s eta 0:00:00
Collecting tqdm>=4.66.3
  Downloading tqdm-4.66.5-py3-none-any.whl (78 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 78.4/78.4 kB 10.4 MB/s eta 0:00:00
Collecting xxhash
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 194.1/194.1 kB 13.0 MB/s eta 0:00:00
Collecting dill<0.3.9,>=0.3.0
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 116.3/116.3 kB 9.1 MB/s eta 0:00:00
Collecting requests<3,>=2
  Downloading requests-2.32.3-py3-none-any.whl (64 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 64.9/64.9 kB 9.5

# Setup

In [0]:
from config import *

catalog_name = CATALOG_NAME
schema_name = SCHEMA_NAME

# Source Table
table_name = TABLE_NAME
source_table_fullname = f"{catalog_name}.{schema_name}.{table_name}"

#  Vector Search Endpoint
vs_endpoint_name = VS_ENDPOINT_NAME

# Vector Search index
vs_index = VS_INDEX
vs_index_table_fullname = f"{catalog_name}.{schema_name}.{vs_index}"



# Load Dataset

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS cache;

In [0]:
import os
from datasets import load_dataset

# os.environ['HF_DATASETS_CACHE'] = "/Volumes/"
# Define a persistent cache directory
cache_dir = "workspace_us.default.cache"
split = "train[:20%]" # 10K samples

# Load dataset from Hugging Face, limit to 50%
dataset = load_dataset("owaiskha9654/PubMed_MultiLabel_Text_Classification_Dataset_MeSH", split=split, cache_dir=cache_dir)



(…)ext Classification Dataset Processed.csv:   0%|          | 0.00/120M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [0]:
print(len(dataset.column_names))
print(dataset.column_names)
print(len(dataset))

20
['Title', 'abstractText', 'meshMajor', 'pmid', 'meshid', 'meshroot', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'L', 'M', 'N', 'Z']
10000


# Create Source Table

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, concat, lit

# The dataset has Title, abstractText, and meshMajor columns
pmid = dataset["pmid"]
title = dataset['Title']
abstract_text = dataset['abstractText']

# Create DataFrame
df = spark.createDataFrame(zip(pmid, title, abstract_text), ["pmid", "title", "abstract_text"])
df = df.withColumn("pmid", df["pmid"].cast("string"))

# Create content column
df = df.withColumn("content", concat(lit("Title: "), df["title"], lit("\nAbstract: "), df["abstract_text"]))

# Drop columns title, abstract
df = df.drop('title', 'abstract_text')

# Drop  null
df = df.dropna()

# Add a contiguous 'id' column starting from 1
window_spec = Window.orderBy("content")
df = df.withColumn("id", row_number().over(window_spec))

# Save DataFrame as a Delta table
df.write.format("delta").option("overwriteSchema", "true").option("delta.enableChangeDataFeed", "true").mode("overwrite").saveAsTable(source_table_fullname)




# Create Vector Search Endpoint

In [0]:
#  Create VEctor Search Endpoint

import time
from databricks.vector_search.client import VectorSearchClient

def wait_for_vs_endpoint_to_be_ready(vsc, vs_endpoint_name):
  for i in range(180):
    endpoint = vsc.get_endpoint(vs_endpoint_name)
    status = endpoint.get("endpoint_status", endpoint.get("status"))["state"].upper()
    if "ONLINE" in status:
      return endpoint
    elif "PROVISIONING" in status or i <6:
      if i % 20 == 0: 
        print(f"Waiting for endpoint to be ready, this can take a few min... {endpoint}")
      time.sleep(10)
    else:
      raise Exception(f'''Error with the endpoint {vs_endpoint_name}. - this shouldn't happen: {endpoint}.\n Please delete it and re-run the previous cell: vsc.delete_endpoint("{vs_endpoint_name}")''')
  raise Exception(f"Timeout, your endpoint isn't ready yet: {vsc.get_endpoint(vs_endpoint_name)}")

def create_vs_endpoint(vs_endpoint_name):

    # check if the endpoint exists
    if vs_endpoint_name not in [e['name'] for e in vsc.list_endpoints()['endpoints']]:
        vsc.create_endpoint(name=vs_endpoint_name, endpoint_type="STANDARD")

    # check the status of the endpoint
    wait_for_vs_endpoint_to_be_ready(vsc, vs_endpoint_name)
    print(f"Endpoint named {vs_endpoint_name} is ready.")

vsc = VectorSearchClient()

#  Create a new endpoint if not exist
vsc.create_endpoint(name=vs_endpoint_name, endpoint_type="STANDARD")
print(f"Endpoint named {vs_endpoint_name} is ready.")

wait_for_vs_endpoint_to_be_ready(vsc, vs_endpoint_name)

[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
Endpoint named pubned_vs_endpoint is ready.
Waiting for endpoint to be ready, this can take a few min... {'name': 'pubned_vs_endpoint', 'creator': 'nt@neoedu.co.th', 'creation_timestamp': 1730105043063, 'last_updated_timestamp': 1730105043063, 'endpoint_type': 'STANDARD', 'last_updated_user': 'nt@neoedu.co.th', 'id': '22d4102f-6fe9-4328-8971-497579875cd7', 'endpoint_status': {'state': 'PROVISIONING'}, 'num_indexes': 0}
Waiting for endpoint to be ready, this can take a few min... {'name': 'pubned_vs_endpoint', 'creator': 'nt@neoedu.co.th', 'creation_timestamp': 1730105043063, 'last_updated_timestamp': 1730105043063, 'endpoint_type': 'STANDARD', 'last_updated_user': 'nt@neoedu.co.th', 'id': '22d4102f-6fe9-4328-8971-497579875cd7', 'endpoint_status': {'state':

{'name': 'pubned_vs_endpoint',
 'creator': 'nt@neoedu.co.th',
 'creation_timestamp': 1730105043063,
 'last_updated_timestamp': 1730105043063,
 'endpoint_type': 'STANDARD',
 'last_updated_user': 'nt@neoedu.co.th',
 'id': '22d4102f-6fe9-4328-8971-497579875cd7',
 'endpoint_status': {'state': 'ONLINE'},
 'num_indexes': 0}

# Create Vector Search Index

In [0]:
# Create or sync the index

def index_exists(vsc, endpoint_name, index_full_name):
  try:
      dict_vsindex = vsc.get_index(endpoint_name, index_full_name).describe()
      return dict_vsindex.get('status').get('ready', False)
  except Exception as e:
      if 'RESOURCE_DOES_NOT_EXIST' not in str(e):
          print(f'Unexpected error describing the index. This could be a permission issue.')
          raise e
  return False

embedding_model_endpoint = "databricks-bge-large-en"

if not index_exists(vsc, vs_endpoint_name, vs_index_table_fullname):
    print(f"Creating index {vs_index_table_fullname} on endpoint {vs_endpoint_name}...")
        
    vsc.create_delta_sync_index(
        endpoint_name=vs_endpoint_name,
        index_name=vs_index_table_fullname,
        source_table_name= source_table_fullname,
        pipeline_type="TRIGGERED", #Sync needs to be manually triggered
        primary_key="id",
        embedding_source_column="content",
        embedding_model_endpoint_name=embedding_model_endpoint
        )
else:
    # Trigger a sync to update our vs content with the new data saved in the table
    vsc.get_index(vs_endpoint_name, vs_index_table_fullname).sync()



Creating index workspace_us.default.pubmed_index on endpoint pubned_vs_endpoint...


In [0]:
def wait_for_index_to_be_ready(vsc, vs_endpoint_name, index_name):
  for i in range(180):
    idx = vsc.get_index(vs_endpoint_name, index_name).describe()
    index_status = idx.get('status', idx.get('index_status', {}))
    status = index_status.get('detailed_state', index_status.get('status', 'UNKNOWN')).upper()
    url = index_status.get('index_url', index_status.get('url', 'UNKNOWN'))
    if "ONLINE" in status:
      return
    if "UNKNOWN" in status:
      print(f"Can't get the status - will assume index is ready {idx} - url: {url}")
      return
    elif "PROVISIONING" in status:
      if i % 40 == 0: print(f"Waiting for index to be ready, this can take a few min... {index_status} - pipeline url:{url}")
      time.sleep(10)
    else:
        raise Exception(f'''Error with the index - this shouldn't happen. DLT pipeline might have been killed.\n Please delete it and re-run the previous cell: vsc.delete_index("{index_name}, {vs_endpoint_name}") \nIndex details: {idx}''')
  raise Exception(f"Timeout, your index isn't ready yet: {vsc.get_index(index_name, vs_endpoint_name)}")

# Let's wait for the index to be ready and all our embeddings to be created and indexed
wait_for_index_to_be_ready(vsc, vs_endpoint_name, vs_index_table_fullname)

Waiting for index to be ready, this can take a few min... {'detailed_state': 'PROVISIONING_PIPELINE_RESOURCES', 'message': 'Index is currently pending setup of pipeline resources. Check latest status: https://dbc-d1407b2d-ba16.cloud.databricks.com/explore/data/workspace_us/default/pubmed_index', 'indexed_row_count': 0, 'provisioning_status': {'provisioning_pipeline_time_spent_seconds': 36.0}, 'ready': False, 'index_url': 'dbc-d1407b2d-ba16.cloud.databricks.com/api/2.0/vector-search/endpoints/pubned_vs_endpoint/indexes/workspace_us.default.pubmed_index'} - pipeline url:dbc-d1407b2d-ba16.cloud.databricks.com/api/2.0/vector-search/endpoints/pubned_vs_endpoint/indexes/workspace_us.default.pubmed_index
Waiting for index to be ready, this can take a few min... {'detailed_state': 'PROVISIONING_INITIAL_SNAPSHOT', 'message': 'Index is currently is in the process of syncing initial data. Check latest status: https://dbc-d1407b2d-ba16.cloud.databricks.com/explore/data/workspace_us/default/pubmed_



[0;31m---------------------------------------------------------------------------[0m
[0;31mHTTPError[0m                                 Traceback (most recent call last)
File [0;32m/local_disk0/.ephemeral_nfs/envs/pythonEnv-66e6a84c-bb99-44c9-8991-f90551b96378/lib/python3.10/site-packages/databricks/vector_search/utils.py:78[0m, in [0;36mRequestUtils.issue_request[0;34m(url, method, token, params, json, verify, auth, data, headers)[0m
[1;32m     77[0m [38;5;28;01mtry[39;00m:
[0;32m---> 78[0m     [43mresponse[49m[38;5;241;43m.[39;49m[43mraise_for_status[49m[43m([49m[43m)[49m
[1;32m     79[0m [38;5;28;01mexcept[39;00m [38;5;167;01mException[39;00m [38;5;28;01mas[39;00m e:

File [0;32m/local_disk0/.ephemeral_nfs/envs/pythonEnv-66e6a84c-bb99-44c9-8991-f90551b96378/lib/python3.10/site-packages/requests/models.py:1024[0m, in [0;36mResponse.raise_for_status[0;34m(self)[0m
[1;32m   1023[0m [38;5;28;01mif[39;00m http_error_msg:
[0;32m-> 1024[0m     [