This notebook shows how to create a Databricks Vector Search index on a Delta table, append new data to the source table, and trigger a sync so that the index picks up the new rows.

In [0]:
%pip install databricks-vectorsearch
dbutils.library.restartPython()

In [0]:
from databricks.vector_search.client import VectorSearchClient

# The following line authenticates automatically using a generated personal access token (PAT)
client = VectorSearchClient()

# The following line uses a service principal for authentication instead
# client = VectorSearchClient(service_principal_client_id=<CLIENT_ID>, service_principal_client_secret=<CLIENT_SECRET>)

# Create the endpoint that will host the index (this only needs to run once)
client.create_endpoint(
    name="dilshad_wine_reviews_vector_search_endpoint",
    endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
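The commented-out line above hard-codes service principal credentials. A minimal sketch of keeping them out of the notebook, assuming they are exposed as environment variables named `VS_SP_CLIENT_ID` and `VS_SP_CLIENT_SECRET` (hypothetical names chosen for this example), could look like this. In Databricks you might read them with `dbutils.secrets.get(scope, key)` instead.

```python
import os


def vector_search_client_kwargs(env=None):
    """Build keyword arguments for VectorSearchClient.

    Hypothetical helper: the environment variable names are assumptions,
    not names defined by Databricks.
    """
    env = os.environ if env is None else env
    client_id = env.get("VS_SP_CLIENT_ID")          # assumed variable name
    client_secret = env.get("VS_SP_CLIENT_SECRET")  # assumed variable name
    if client_id and client_secret:
        # Service principal auth, matching the commented-out call above
        return {
            "service_principal_client_id": client_id,
            "service_principal_client_secret": client_secret,
        }
    # Empty kwargs: fall back to the default notebook authentication
    return {}
```

`client = VectorSearchClient(**vector_search_client_kwargs())` then uses the service principal when both variables are set and the default notebook authentication otherwise.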

Download the Wine Reviews dataset from Kaggle into a Unity Catalog volume. The URL is the one Kaggle provides under its cURL download option; `wget` is used here to fetch it.

In [0]:
%sh wget -O /Volumes/dilshad_shawki/test/my_test_csvs/archive.zip "https://www.kaggle.com/api/v1/datasets/download/zynicide/wine-reviews?dataset_version_number=4"
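The download URL has the form `.../datasets/download/<owner>/<dataset>` with an optional `dataset_version_number` query parameter that pins the version. A small sketch that builds it, assuming other Kaggle datasets follow the same pattern as this one (`kaggle_download_url` is a hypothetical helper):

```python
from urllib.parse import urlencode

KAGGLE_DOWNLOAD_BASE = "https://www.kaggle.com/api/v1/datasets/download"


def kaggle_download_url(owner, dataset, version=None):
    """Build a dataset download URL in the same shape as the one used above."""
    url = f"{KAGGLE_DOWNLOAD_BASE}/{owner}/{dataset}"
    if version is not None:
        # Pin a specific dataset version so reruns fetch the same files
        url += "?" + urlencode({"dataset_version_number": version})
    return url
```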

In [0]:
%sh
export ROOT_DIR=/Volumes/dilshad_shawki/test/my_test_csvs
export OUTPUT_DIR=$ROOT_DIR/wine_reviews
mkdir -p $OUTPUT_DIR
unzip -o $ROOT_DIR/archive.zip -d $OUTPUT_DIR
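If `unzip` is not available on the cluster, the same extraction can be sketched in pure Python with the standard `zipfile` module. The helper below (a hypothetical name) mirrors `mkdir -p` and `unzip -o`; `extractall` overwrites files that already exist.

```python
import zipfile
from pathlib import Path


def extract_archive(root_dir, archive_name="archive.zip", subdir="wine_reviews"):
    """Pure-Python equivalent of the mkdir -p / unzip -o shell cell."""
    root = Path(root_dir)
    output_dir = root / subdir
    output_dir.mkdir(parents=True, exist_ok=True)  # like mkdir -p
    with zipfile.ZipFile(root / archive_name) as zf:
        zf.extractall(output_dir)  # overwrites existing files, like unzip -o
        return sorted(zf.namelist())
```

Because volumes are exposed as files under `/Volumes`, a call such as `extract_archive("/Volumes/dilshad_shawki/test/my_test_csvs")` should produce the same layout as the shell cell.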

In [0]:
%sql
CREATE OR REPLACE TABLE dilshad_shawki.test.wine_reviews_data AS
SELECT uuid() as wine_review_id, * FROM read_files('/Volumes/dilshad_shawki/test/my_test_csvs/wine_reviews/winemag-data_first150k.csv')
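`uuid()` gives every review a random primary key, which the index relies on later (`primary_key="wine_review_id"`). The sketch below reproduces the idea in plain Python on a tiny CSV string, as an illustration only; `with_review_ids` is a hypothetical helper. It also uses a real CSV parser, since descriptions can contain quoted commas.

```python
import csv
import io
import uuid


def with_review_ids(csv_text):
    """Parse CSV text and prepend a random UUID key to each row,
    mirroring SELECT uuid() AS wine_review_id, * in the SQL cell."""
    rows = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # uuid4 is random: unique per row, but not stable across reruns
        rows.append({"wine_review_id": str(uuid.uuid4()), **row})
    return rows
```

Because the IDs are random, rerunning the `CREATE OR REPLACE TABLE` cell assigns every row a new ID.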

In [0]:
# Names of the source table and the vector search index
catalog_name = 'dilshad_shawki'
schema_name = 'test'
table_name = 'wine_reviews_data'
source_table_name = f'{catalog_name}.{schema_name}.{table_name}'
vs_index_fullname = f"{source_table_name}_vsindex"

In [0]:
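# Delta Sync indexes read incremental changes from the source table's Change Data Feed, so it must be enabled
spark.sql(f'ALTER TABLE {source_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)')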
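Conceptually, the change feed lets the sync job see which rows were inserted, updated or deleted since the last sync instead of rereading the whole table. The sketch below models that outcome by diffing two snapshots keyed by primary key. It is an illustration only: Delta's Change Data Feed records row-level changes per commit rather than comparing snapshots.

```python
def change_set(before, after):
    """Return (inserted, updated, deleted) between two snapshots.

    `before` and `after` map primary key -> row dict. Conceptual model only.
    """
    inserted = {k: v for k, v in after.items() if k not in before}
    deleted = {k: v for k, v in before.items() if k not in after}
    updated = {k: v for k, v in after.items() if k in before and before[k] != v}
    return inserted, updated, deleted
```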

In [0]:
client = VectorSearchClient()

index = client.create_delta_sync_index(
  endpoint_name="dilshad_wine_reviews_vector_search_endpoint",
  source_table_name=source_table_name,
  index_name=vs_index_fullname,
  pipeline_type="TRIGGERED",
  primary_key="wine_review_id",
  embedding_source_column="description",
  embedding_model_endpoint_name="databricks-gte-large-en", # This model is used for ingestion, and is also used for querying unless model_endpoint_name_for_query is specified.
)
index.describe()['status']['message']
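At query time the index embeds the query with the same model and returns the rows whose `description` embeddings are most similar. A toy brute-force version using cosine similarity over hand-written vectors (not real `databricks-gte-large-en` embeddings) looks like this; the managed index avoids scanning every row.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 for a zero vector)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, index, k=3):
    """Brute-force nearest neighbours over {primary_key: embedding}."""
    scored = [(pk, cosine(query_vec, vec)) for pk, vec in index.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```

Against the real index you would call `index.similarity_search(query_text=..., columns=[...], num_results=...)`; check the SDK reference for the exact parameters.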

In [0]:
# Re-run this cell until the message shows that the initial sync has completed
index.describe()['status']['message']
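Rather than re-running the status cell by hand, a generic polling loop can wait for the initial sync. This is a minimal sketch; `wait_until` is a hypothetical helper, and the usage below assumes `index.describe()['status']` contains a boolean `ready` field, which is worth confirming against the actual output first.

```python
import time


def wait_until(check, timeout_s=1800, interval_s=30, sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns True or timeout_s elapses.

    Returns True if the condition was met, False on timeout.
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

Possible usage, under the assumption above: `wait_until(lambda: index.describe()['status'].get('ready', False))`.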

Next, add more data to the source table and sync the vector search index so that it picks up the new rows.

In [0]:
%sql
CREATE OR REPLACE TABLE dilshad_shawki.test.wine_reviews_data_extra AS
SELECT uuid() as wine_review_id, * FROM read_files('/Volumes/dilshad_shawki/test/my_test_csvs/wine_reviews/winemag-data-130k-v2.csv')

Take a small random sample (0.1 percent of the rows) from the extra table.

In [0]:
new_df = spark.sql('SELECT * FROM dilshad_shawki.test.wine_reviews_data_extra TABLESAMPLE (0.1 PERCENT)')
display(new_df)
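`TABLESAMPLE (0.1 PERCENT)` keeps each row with a probability of about 0.001, so the number of sampled rows varies around the expected value (roughly 130 rows for a table of about 130k rows). A seeded Bernoulli sampler in plain Python illustrates the idea; it is not Spark's implementation, and `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(rows, percent, seed=None):
    """Keep each row independently with probability percent / 100."""
    rng = random.Random(seed)  # seeded so the same sample can be reproduced
    p = percent / 100.0
    return [row for row in rows if rng.random() < p]
```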

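Append the sample to the existing source table to simulate new data arriving. `mergeSchema` is set because the 130k file has some columns that the 150k file lacks.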

In [0]:
new_df.write.mode("append").option("mergeSchema", "true").saveAsTable("dilshad_shawki.test.wine_reviews_data")
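With `mergeSchema`, columns that exist only in the incoming data are added to the table schema, and rows that lack them read as `NULL`. The sketch below models that on plain column lists; the column names are abbreviated examples, not the full schemas of the two files, and both helpers are hypothetical.

```python
def merge_schemas(existing, incoming):
    """Column order after an append with mergeSchema: existing columns keep
    their order and columns new in `incoming` are appended at the end."""
    return existing + [c for c in incoming if c not in existing]


def align_row(row, columns):
    """Project a row onto the merged columns; missing values become None (NULL)."""
    return {c: row.get(c) for c in columns}
```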

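Trigger a sync. Because the index was created with `pipeline_type="TRIGGERED"`, it picks up changes to the source table only when a sync is requested.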

In [0]:
index.sync()
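Conceptually, each triggered sync processes the changes made since the previous sync, so only new or modified rows need to be embedded again. A sketch of applying such a change set to an in-memory `{primary_key: embedding}` map, with a stand-in `embed` function in place of the model endpoint (both are hypothetical, not the service's implementation):

```python
def apply_changes(index, embed, inserted, updated, deleted):
    """Incrementally update an in-memory {pk: embedding} map.

    Only inserted and updated rows are re-embedded; deleted keys are dropped.
    `embed` is any function mapping text to a vector.
    """
    for pk in deleted:
        index.pop(pk, None)
    for pk, row in {**inserted, **updated}.items():
        index[pk] = embed(row["description"])
    return index
```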

Check the status of the sync.

In [0]:
display(index.describe()['status'])