<h2 style="text-align:left">Databricks Vector Search</h2>


## How does it work?
We will implement option 1 from the [Databricks docs](https://docs.databricks.com/en/generative-ai/vector-search.html), a Delta Sync Index with embeddings computed by Databricks:

You provide a source Delta table that contains data in text format. Databricks calculates the embeddings using a model that you specify and can optionally save them to a table in Unity Catalog. As the Delta table is updated, the index is synced with it, either continuously or, as in this notebook (`pipeline_type='TRIGGERED'`), whenever you trigger a sync.

The following diagram illustrates the process:

1. Calculate the query embedding. The query can include metadata filters.
2. Perform a similarity search to identify the most relevant documents.
3. Return the most relevant documents and append them to the query.
    
<img src="https://nmd-gen-ai-workshop-content.s3.us-west-2.amazonaws.com/images/Screenshot+2024-05-25+at+3.02.39%E2%80%AFPM.png" style="float: left; height: 350px; ">
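The three steps above can be sketched in plain Python to show what happens at query time. This is a toy illustration, not the Databricks implementation: the hypothetical `embed` function is a bag-of-words counter standing in for a real embedding model such as `databricks-bge-large-en`, and metadata filters are simple exact matches. The two documents are taken from the dataset used below.

```python
import math
import re
from collections import Counter
from typing import Optional

# Hypothetical stand-in for an embedding model: a bag-of-words count vector.
# Databricks calls a real model serving endpoint instead.
def embed(text: str) -> Counter:
    return Counter(re.findall(r"[a-z']+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(count * b[token] for token, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def retrieve(query: str, docs: list, k: int = 3, filters: Optional[dict] = None) -> list:
    """Return the k docs most similar to `query`, after exact-match metadata filtering."""
    query_vec = embed(query)  # step 1: embed the query
    candidates = [d for d in docs
                  if all(d.get(col) == val for col, val in (filters or {}).items())]
    candidates.sort(key=lambda d: cosine(query_vec, embed(d["plot"])), reverse=True)
    return candidates[:k]  # step 2: keep the top-k most similar documents

def augment(query: str, hits: list) -> str:
    """Step 3: append the retrieved plots to the query as context."""
    context = "\n".join(f"- {hit['title']}: {hit['plot']}" for hit in hits)
    return f"Context:\n{context}\n\nQuestion: {query}"

docs = [
    {"movie_id": 6, "title": "The Lord of the Rings: The Return of the King", "genre": "Adventure",
     "plot": "Gandalf and Aragorn lead the World of Men against Sauron's army to draw his gaze "
             "from Frodo and Sam as they approach Mount Doom with the One Ring."},
    {"movie_id": 10, "title": "The Matrix", "genre": "Sci-Fi",
     "plot": "A computer hacker learns from mysterious rebels about the true nature of his "
             "reality and his role in the war against its controllers."},
]
hits = retrieve("Frodo and the One Ring", docs, k=1)
print(augment("Frodo and the One Ring", hits))
```

A real embedding model captures meaning rather than shared words, which is why the notebook's paraphrased queries still find the right movie.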

In [None]:
from databricks.vector_search.client import VectorSearchClient

In [None]:
# Modify these values for your workspace
CATLOG_NAME = "<catalog_name>"
SCHEMA_NAME = "vectorsearch"

VS_ENDPOINT_NAME = "vector_search_endpoint"
# Embedding model serving endpoint used to compute the index embeddings
VS_MODEL_NAME = "databricks-bge-large-en"

TABLE_NAME = "movies"
INDEX_NAME = "moviesindex"

In [None]:
vsclient = VectorSearchClient(disable_notice=True)

In [None]:
# Create the Vector Search endpoint if it does not exist
try:
    vsclient.get_endpoint(name=VS_ENDPOINT_NAME)
    print("VectorSearch endpoint already exists.")
except Exception as e:
    if "status_code 404" in str(e):  # endpoint does not exist
        # create_endpoint may return before the endpoint is ONLINE (provisioning can
        # take several minutes); the commented-out create_endpoint_and_wait blocks instead.
        # vsclient.create_endpoint_and_wait(name=VS_ENDPOINT_NAME)
        vsclient.create_endpoint(name=VS_ENDPOINT_NAME)
        print("VectorSearch endpoint is created.")
    else:
        print(e)

VectorSearch endpoint already exists.
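If the endpoint was just created with `create_endpoint`, creating an index against it may fail while it is still provisioning. A minimal polling helper can wait for it. `wait_until` is hypothetical, and the response fields in the commented usage (`endpoint_status`, `state`, `"ONLINE"`) are assumptions to check against what `vsclient.get_endpoint(...)` returns in your workspace.

```python
import time
from typing import Callable

def wait_until(check: Callable[[], bool], timeout_s: float = 1200, interval_s: float = 10) -> bool:
    """Poll `check` until it returns True or `timeout_s` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)

# Hypothetical usage (field names are assumptions):
# wait_until(lambda: vsclient.get_endpoint(name=VS_ENDPOINT_NAME)
#            .get("endpoint_status", {}).get("state") == "ONLINE")
```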


In [None]:
# Delete the index if it exists
try:
    vsclient.delete_index(endpoint_name=VS_ENDPOINT_NAME, index_name=f"{CATLOG_NAME}.{SCHEMA_NAME}.{INDEX_NAME}")
    print("Deleted index")
except Exception as e:
    if "status_code 404" in str(e):  # index does not exist
        print("Index does not exist")
    else:
        print(e)

Index does not exist


In [None]:
# Optional: drop the Unity Catalog table to start from scratch (uncomment to run)
# spark.sql(f"""DROP TABLE IF EXISTS {CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}""")

In [None]:
# The test dataset contains movie plots
df = spark.read.csv("s3://<s3bucketname>/data/movie_plots.csv", header=True)
display(df)

| movie_id | title | genre | plot |
|---|---|---|---|
| 1 | The Shawshank Redemption | Drama | Two imprisoned men bond over a number of years, finding solace and eventual redemption through acts of common decency. |
| 2 | The Godfather | Crime | The aging patriarch of an organized crime dynasty transfers control of his clandestine empire to his reluctant son. |
| 3 | The Dark Knight | Action | When the menace known as the Joker emerges from his mysterious past, he wreaks havoc and chaos on the people of Gotham. |
| 4 | Pulp Fiction | Crime | The lives of two mob hitmen, a boxer, a gangster's wife, and a pair of diner bandits intertwine in four tales of violence and redemption. |
| 5 | Schindler's List | Biography | In German-occupied Poland during World War II, Oskar Schindler gradually becomes concerned for his Jewish workforce after witnessing their persecution by the Nazis. |
| 6 | The Lord of the Rings: The Return of the King | Adventure | Gandalf and Aragorn lead the World of Men against Sauron's army to draw his gaze from Frodo and Sam as they approach Mount Doom with the One Ring. |
| 7 | Fight Club | Drama | An insomniac office worker and a devil-may-care soap maker form an underground fight club that evolves into something much, much more. |
| 8 | Forrest Gump | Drama | The presidencies of Kennedy and Johnson, the events of Vietnam, Watergate, and other history unfold through the perspective of an Alabama man with an IQ of 75. |
| 9 | Inception | Sci-Fi | A thief who steals corporate secrets through the use of dream-sharing technology is given the inverse task of planting an idea into the mind of a C.E.O. |
| 10 | The Matrix | Sci-Fi | A computer hacker learns from mysterious rebels about the true nature of his reality and his role in the war against its controllers. |


In [None]:
# Load the data into a Delta table with change data feed enabled
# (Delta Sync indexes read changes from the source table's change data feed)
(df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(f"{CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}")
)

# If change data feed is not enabled on the table, enable it explicitly:
# spark.sql(f"""ALTER TABLE {CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)""")
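Change data feed matters because a sync only needs to process rows that changed since the last one instead of re-embedding the whole table. The sketch below illustrates that idea in plain Python. The `_change_type` values (`insert`, `update_preimage`, `update_postimage`, `delete`) and the `_commit_version` column come from Delta's change data feed; `apply_changes`, the in-memory dictionary index, and the `embed` callable are hypothetical and say nothing about how Databricks implements sync internally.

```python
from typing import Callable, Dict, Iterable

def apply_changes(index_state: Dict, changes: Iterable[dict], embed: Callable[[str], object],
                  primary_key: str = "movie_id", source_column: str = "plot") -> Dict:
    """Apply change-data-feed rows to an in-memory index keyed by primary key."""
    for change in sorted(changes, key=lambda c: c["_commit_version"]):
        key = change[primary_key]
        kind = change["_change_type"]
        if kind in ("insert", "update_postimage"):
            # Re-embed only changed rows; keep data columns, drop CDF metadata columns
            row = {col: val for col, val in change.items() if not col.startswith("_")}
            index_state[key] = {"embedding": embed(row[source_column]), "row": row}
        elif kind == "delete":
            index_state.pop(key, None)
        # "update_preimage" rows carry the old values and need no action here
    return index_state

# Usage with a trivial stand-in "embedding" (the text length)
toy_index = apply_changes({}, [
    {"movie_id": "21", "title": "Peter Pan", "plot": "Peter Pan, a boy who never grows up.",
     "_change_type": "insert", "_commit_version": 2},
], embed=len)
```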

In [None]:
# Create a Delta Sync index; Databricks embeds the `plot` column using VS_MODEL_NAME
index = vsclient.create_delta_sync_index_and_wait(
  endpoint_name=VS_ENDPOINT_NAME,
  source_table_name=f"{CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}",
  index_name=f"{CATLOG_NAME}.{SCHEMA_NAME}.{INDEX_NAME}",
  pipeline_type='TRIGGERED',
  primary_key="movie_id",
  embedding_source_column="plot",
  embedding_model_endpoint_name=VS_MODEL_NAME
)

In [None]:
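# With pipeline_type='TRIGGERED', the index is updated only when sync() is called.
# The initial sync runs when the index is created, so this call is optional here.
# index.sync()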

In [None]:
# Helper function to display search results
def display_search_results(results):
    # Each row holds the requested columns in order, followed by the score
    rows = results["result"].get("data_array", [])
    for (movie_id, genre, plot, title, score) in rows:
        if len(plot) > 32:
            # Trim long plots for readability
            plot = plot[:32] + "..."
        print(f"id: {movie_id}  title: {title} genre '{genre}' plot '{plot}' score: {score}")
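`display_search_results` unpacks each row by position, so it would mislabel values if the `columns` argument were reordered. A sketch that keys each value by column name instead, assuming the response lists column names under `manifest.columns` (with the similarity score as a final `score` column) alongside `result.data_array`; verify those keys against a real response before relying on it. `rows_as_dicts` is a hypothetical helper.

```python
def rows_as_dicts(results: dict) -> list:
    """Convert a similarity_search response into a list of {column_name: value} dicts.

    Assumed response shape:
    {"manifest": {"columns": [{"name": ...}, ...]},
     "result": {"data_array": [[...], ...]}}
    """
    names = [column["name"] for column in results["manifest"]["columns"]]
    return [dict(zip(names, row)) for row in results["result"].get("data_array", [])]

# Usage:
# for row in rows_as_dicts(results):
#     print(row["title"], row["score"])
```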

In [None]:
# Test similarity search (approximate nearest neighbor, the default query type)
results = index.similarity_search(
  query_text="A group of friends embark on an epic adventure to destroy a powerful ring.",
  columns=["movie_id", "genre","plot","title"],
  num_results=3,
  )
display_search_results(results)

id: 6  title: The Lord of the Rings: The Return of the King genre 'Adventure' plot 'Gandalf and Aragorn lead the Wor...' score: 0.59288234
id: 20  title: Interstellar genre 'Sci-Fi' plot 'A team of explorers travel throu...' score: 0.5451615
id: 12  title: The Empire Strikes Back genre 'Action' plot 'After the Rebels are brutally ov...' score: 0.5049474


In [None]:
# Test hybrid search (vector similarity combined with keyword search)
results = index.similarity_search(
  query_text="A group of friends embark on an epic adventure to destroy a powerful ring.",
  columns=["movie_id", "genre","plot","title"],
  num_results=3,
  query_type="hybrid"
  )
display_search_results(results)

id: 6  title: The Lord of the Rings: The Return of the King genre 'Adventure' plot 'Gandalf and Aragorn lead the Wor...' score: 0.9765625
id: 17  title: Saving Private Ryan genre 'War' plot 'Following the Normandy Landings,...' score: 0.9606894841269842
id: 16  title: The Usual Suspects genre 'Mystery' plot 'A sole survivor tells of the twi...' score: 0.9485294117647058
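As the outputs show, hybrid scores are on a different scale from the ANN scores above, so the two should not be compared directly. Hybrid search merges a vector-similarity ranking with a keyword ranking, and one widely used way to merge rankings is Reciprocal Rank Fusion (RRF). The sketch below shows RRF in its general form with the conventional constant `k = 60`; it illustrates the technique and is not necessarily the exact scoring Databricks applies. The document ids in the usage line are made up.

```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document ids with Reciprocal Rank Fusion.

    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Returns (doc_id, score) pairs, best first.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

# Usage: a hypothetical vector ranking and keyword ranking
fused = reciprocal_rank_fusion([["a", "b", "c"], ["a", "d", "e"]])
```

A document ranked highly by both methods ("a" here) beats documents that only one method likes, which is the point of fusing the two signals.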


In [None]:
# Test similarity search with a metadata filter that excludes the Adventure genre
results = index.similarity_search(
  query_text="A group of friends embark on an epic adventure to destroy a powerful ring.",
  columns=["movie_id", "genre","plot","title"],
  num_results=3,
  filters={"genre NOT": "Adventure"}
  )
display_search_results(results)

id: 20  title: Interstellar genre 'Sci-Fi' plot 'A team of explorers travel throu...' score: 0.5451615
id: 12  title: The Empire Strikes Back genre 'Action' plot 'After the Rebels are brutally ov...' score: 0.5049474
id: 17  title: Saving Private Ryan genre 'War' plot 'Following the Normandy Landings,...' score: 0.4915407
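The filter drops the Lord of the Rings result, and the next-best matches move up with the same scores as in the unfiltered search. A toy evaluator for a small subset of the dictionary filter syntax (equality, list membership, and `NOT`) makes those semantics concrete. `matches` is hypothetical and only approximates the documented behavior; the Databricks docs list the full set of supported operators.

```python
def matches(row: dict, filters: dict) -> bool:
    """Toy evaluator for a subset of filter keys (all conditions must hold):
    {"col": value}          -> col == value
    {"col": [v1, v2]}       -> col in [v1, v2]
    {"col NOT": value|list} -> negation of the above
    """
    for key, expected in filters.items():
        column, _, operator = key.partition(" ")
        allowed = expected if isinstance(expected, list) else [expected]
        hit = row.get(column) in allowed
        if operator == "NOT":
            hit = not hit
        elif operator:
            raise ValueError(f"operator not supported by this sketch: {operator}")
        if not hit:
            return False
    return True

# Usage: keep only rows that pass the same filter as the cell above
rows = [{"movie_id": 6, "genre": "Adventure"}, {"movie_id": 20, "genre": "Sci-Fi"}]
kept = [r["movie_id"] for r in rows if matches(r, {"genre NOT": "Adventure"})]
```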


Run the next three cells to experiment with adding a new movie to the table:

In [None]:
#add new data to the delta table
spark.sql(f"""INSERT INTO {CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME} (movie_id, title, genre, plot) VALUES (21, 'Peter Pan','Kids', 'Peter Pan, a boy who never grows up, takes the children Wendy, John, and Michael to magical Neverland, where Wendy mothers the Lost Boys.') """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [None]:
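# Trigger a sync so the new row is embedded and added to the index.
# The update may take a few minutes to complete; wait for it before querying.
index.sync()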

{}

In [None]:
#testing new data
results = index.similarity_search(
  query_text="Adventures of the boy who never grows up.",
  columns=["movie_id", "genre","plot","title"],
  num_results=3
  )
display_search_results(results)

id: 21  title: Peter Pan genre 'Kids' plot 'Peter Pan, a boy who never grows...' score: 0.591169
id: 13  title: City of God genre 'Crime' plot 'In the slums of Rio, two kids' p...' score: 0.518501
id: 10  title: The Matrix genre 'Sci-Fi' plot 'A computer hacker learns from my...' score: 0.50492597


In [None]:
#delete vectorsearch index
vsclient.delete_index(endpoint_name=VS_ENDPOINT_NAME, index_name=f"{CATLOG_NAME}.{SCHEMA_NAME}.{INDEX_NAME}")

{}

In [None]:
#delete unity catalog table
spark.sql(f"""DROP TABLE {CATLOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}""")

DataFrame[]