# Embedding Text with Local (Per-Node) NVIDIA TensorRT Acceleration and GPU-Based Approximate Nearest Neighbor (ANN) Search

This demo extends the existing [Azure OpenAI based demo](https://github.com/microsoft/SynapseML/blob/master/docs/Explore%20Algorithms/OpenAI/Quickstart%20-%20OpenAI%20Embedding%20and%20GPU%20based%20KNN.ipynb), in which encoding was done through OpenAI requests and KNN used a GPU-based brute-force search. This tutorial shows how to compute fast local embeddings with [multilingual E5 text embeddings](https://arxiv.org/abs/2402.05672) and run fast approximate nearest neighbor (ANN) search with the IVFFlat algorithm. All tutorial stages are accelerated on NVIDIA GPUs using [NVIDIA TensorRT](https://developer.nvidia.com/tensorrt) and [Spark Rapids ML](https://github.com/NVIDIA/spark-rapids-ml). The tutorial folder contains two benchmark notebooks that demonstrate the advantages of this GPU-based approach compared to the [previous CPU-based demo](https://github.com/microsoft/SynapseML/blob/master/docs/Explore%20Algorithms/OpenAI/Quickstart%20-%20OpenAI%20Embedding.ipynb).
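As a rough cost model (a simplification that ignores GPU parallelism and the one-time cost of training the coarse quantizer), exact brute-force KNN compares each query against all $n$ stored vectors of dimension $d$, while IVFFlat partitions the vectors into $\mathit{nlist}$ clusters, ranks the cluster centroids, and exhaustively scans only the $\mathit{nprobe}$ closest clusters. With roughly balanced clusters, the per-query work is approximately:

$$
\underbrace{O(n\,d)}_{\text{brute force}}
\quad\text{vs.}\quad
\underbrace{O(\mathit{nlist}\cdot d)}_{\text{rank centroids}}
+ \underbrace{O\!\left(\tfrac{\mathit{nprobe}}{\mathit{nlist}}\, n\, d\right)}_{\text{scan probed lists}}
$$

The speed-up comes from scanning only about an $\mathit{nprobe}/\mathit{nlist}$ fraction of the data, at the price of possibly missing true neighbors that fall into clusters that were not probed.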

The key prerequisites for this quickstart include a working Azure OpenAI resource and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace; however, this notebook was run on a Databricks GPU cluster using Standard_NC24ads_A100_v4 nodes with 6 workers. The Databricks Runtime was 13.3 LTS ML (includes Apache Spark 3.4.1, GPU, Scala 2.12), with the related [init_script](https://github.com/microsoft/SynapseML/tree/master/tools/init_scripts) to install all required packages.


## Step 1: Prepare Environment

Import the required libraries and configure the initial settings.

In [0]:
import logging
import os
import pyspark.sql.functions as F
import model_navigator as nav  # NVIDIA Triton Model Navigator (TensorRT optimization)
import pandas as pd
import torch
import mlflow
import datetime
import pytz

from model_navigator.inplace.config import DEFAULT_CACHE_DIR
from stnavigator import SentenceTransformerNavigator
from pyspark.sql.functions import pandas_udf, col, struct, arrays_zip, explode, broadcast
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StructType, StructField, StringType
from pyspark.ml.functions import predict_batch_udf
from pyspark.ml.linalg import Vectors, VectorUDT
from spark_rapids_ml.knn import ApproximateNearestNeighbors, ApproximateNearestNeighborsModel
# from data_loader import load_input_data
from utility import generate_1M_data

# Reduce py4j log noise and disable MLflow autologging for this demo
logging.getLogger('py4j').setLevel(logging.ERROR)
mlflow.autolog(disable=True)

# Define the PST timezone
pst_timezone = pytz.timezone('US/Pacific')



## Step 2: Load Data

In this demo we explore a dataset of fine food reviews. The `limit` variable controls how many reviews are embedded and indexed.

In [0]:
limit = 100
logging.info(f"Start demo run {limit}")

# Get the current time in UTC and convert it to PST
current_start_time_utc = datetime.datetime.now(pytz.utc)
current_time_pst = current_start_time_utc.astimezone(pst_timezone)

# Log the current time in PST (logging expects a %s placeholder for each extra argument)
logging.info("Current time in PST: %s", current_time_pst.strftime('%Y-%m-%d %H:%M:%S %Z%z'))

df = generate_1M_data(spark, "wasbs://publicwasb@mmlspark.blob.core.windows.net/fine_food_reviews_1k.csv")

df = df.limit(limit).repartition(10).cache()

INFO:root:Start demo run 100
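A call of the form `logging.info("Current time in PST:", value)` fails with `TypeError: not all arguments converted during string formatting`, because `logging` formats lazily with `msg % args` and the message contains no placeholder for the extra argument. The stdlib-only sketch below reproduces the failure and the fix by building `logging.LogRecord` objects directly; the `render` helper and the timestamp string are hypothetical stand-ins for the notebook's `logging.info(...)` call and `current_time_pst.strftime(...)`.

```python
import logging

timestamp = "2024-01-01 00:00:00 PST-0800"  # hypothetical timestamp string


def render(msg, *args):
    """Format a message the same way logging does when a handler emits the record."""
    record = logging.LogRecord("demo", logging.INFO, "demo.py", 0, msg, args, None)
    return record.getMessage()


# Broken form: an extra argument but no placeholder, so "msg % args" raises.
try:
    render("Current time in PST:", timestamp)
    error = None
except TypeError as exc:
    error = str(exc)

# Fixed form: one %s placeholder per positional argument.
fixed = render("Current time in PST: %s", timestamp)

print(error)  # not all arguments converted during string formatting
print(fixed)  # Current time in PST: 2024-01-01 00:00:00 PST-0800
```

An f-string would also work, but the `%s` form keeps logging's lazy formatting, so the string is only built when the record is actually emitted.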

## Step 3: Generate Embeddings

First, generate embeddings for the reviews with a SentenceTransformer model optimized by NVIDIA TensorRT.

In [0]:
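def predict_batch_fn():
    # Runs on the executors: load the model once and return a per-batch predict function
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = SentenceTransformerNavigator("intfloat/e5-large-v2").eval()
    model = nav.Module(model, name="e5-large-v2")  # wrap the module for Model Navigator
    model = model.to(device)
    nav.load_optimized()  # load the optimized (e.g., TensorRT) modules prepared by Model Navigator

    def predict(inputs):
        # `inputs` holds one batch (up to `batch_size` rows) of review texts
        with torch.no_grad():
            output = model.encode(inputs.tolist(), convert_to_tensor=False, show_progress_bar=True)
        return output
    return predict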

In [0]:
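# Wrap the factory as a batched Spark UDF that returns one float vector per review
encode = predict_batch_udf(predict_batch_fn,
                           return_type=ArrayType(FloatType()),
                           batch_size=10)

# .cache() is lazy: embeddings are computed on the first action and reused afterwards
all_embeddings = df.withColumn("embeddings", encode(struct("combined"))).cache()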
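`predict_batch_udf` separates expensive setup from per-batch inference: the factory (`predict_batch_fn`) is invoked on the executors and its result is cached per Python worker, so the model is loaded once rather than once per batch, and the returned `predict` function is then applied to batches of up to `batch_size` rows. The pure-Python sketch below mimics that contract in a single process; `run_in_batches` and the dummy model are hypothetical stand-ins, not Spark APIs, and Spark's partitioning and Arrow conversion are omitted.

```python
from typing import Callable, List, Sequence


def run_in_batches(make_predict_fn: Callable, rows: Sequence[str], batch_size: int) -> List:
    """Hypothetical single-process model of predict_batch_udf's batching contract."""
    predict = make_predict_fn()  # expensive setup (e.g., model loading) runs once
    outputs = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]  # the last batch may be smaller
        outputs.extend(predict(batch))
    return outputs


stats = {"loads": 0, "batch_sizes": []}


def make_dummy_predict_fn():
    stats["loads"] += 1  # stands in for loading and optimizing the embedding model

    def predict(batch):
        stats["batch_sizes"].append(len(batch))
        return [[float(len(text))] for text in batch]  # fake 1-d "embedding" per text

    return predict


reviews = [f"review {i}" for i in range(25)]
vectors = run_in_batches(make_dummy_predict_fn, reviews, batch_size=10)
print(stats["loads"], stats["batch_sizes"])  # 1 [10, 10, 5]
```

The same trade-off applies to the real UDF: a larger `batch_size` amortizes GPU launch overhead across more rows but needs more GPU memory per call.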

## Step 4: Build the query against embeddings

Compute the query embeddings on the driver with the standard (non-TensorRT) SentenceTransformer, then convert the results to a DataFrame.

In [0]:
from sentence_transformers import SentenceTransformer
if 'model' not in globals():
    model = SentenceTransformer("intfloat/e5-large-v2").eval()

# Generate embeddings
with torch.no_grad():
    query = ["desserts", "disgusting"]
    embeddings = [embedding.tolist() for embedding in model.encode(query)]


In [0]:
# Prepare data including IDs
data_with_ids = [(1, query[0], embeddings[0]), (2, query[1], embeddings[1])]

# Define the schema for the DataFrame
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("query", StringType(), nullable=False),
    StructField("embeddings", ArrayType(FloatType(), containsNull=False), nullable=False)
])

# Create a DataFrame using the data with IDs and the schema
query_embeddings = spark.createDataFrame(data=data_with_ids, schema=schema).cache()

## Step 5: Build a fast vector index over the review embeddings

We use the GPU-accelerated `ApproximateNearestNeighbors` estimator from Spark Rapids ML.

In [0]:
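# Configure approximate KNN to return the k=10 nearest review embeddings per query
rapids_knn = ApproximateNearestNeighbors(k=10)
rapids_knn.setInputCol("embeddings").setIdCol("id")

# Fit the ANN model on the review embeddings
rapids_knn_model = rapids_knn.fit(all_embeddings.select("id", "embeddings"))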

## Step 6: Find the top-k nearest neighbors

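The fitted model runs approximate nearest neighbor search with the IVFFlat algorithm from Spark Rapids ML and returns the k = 10 closest reviews for each query.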

In [0]:
# kneighbors returns (item_df, query_df, knn_df); only the KNN results are used here
(_, _, knn_df) = rapids_knn_model.kneighbors(query_embeddings.select("id", "embeddings"))
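IVFFlat ("inverted file" index with flat, uncompressed vectors) clusters the indexed vectors with k-means into `nlist` inverted lists at fit time; at query time it ranks the centroids, scans only the `nprobe` closest lists, and computes exact distances for the vectors in them. The pure-Python sketch below illustrates the idea on random data. It is not the Spark Rapids ML implementation: all helper names are hypothetical, Euclidean distance is assumed, and `nlist`/`nprobe` are used here in their common IVF sense.

```python
import math
import random


def l2(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def nearest_centroid(v, centroids):
    return min(range(len(centroids)), key=lambda c: l2(v, centroids[c]))


def kmeans(vectors, nlist, iters=10, seed=0):
    """Plain Lloyd's k-means used as the coarse quantizer."""
    rng = random.Random(seed)
    centroids = [list(v) for v in rng.sample(vectors, nlist)]
    for _ in range(iters):
        buckets = [[] for _ in range(nlist)]
        for v in vectors:
            buckets[nearest_centroid(v, centroids)].append(v)
        for c, members in enumerate(buckets):
            if members:  # keep the previous centroid if a cluster ends up empty
                centroids[c] = [sum(col) / len(members) for col in zip(*members)]
    return centroids


def build_ivfflat(vectors, nlist):
    """Fit: cluster the data and store each vector id in its centroid's inverted list."""
    centroids = kmeans(vectors, nlist)
    lists = [[] for _ in range(nlist)]
    for idx, v in enumerate(vectors):
        lists[nearest_centroid(v, centroids)].append(idx)
    return centroids, lists


def search_ivfflat(index, vectors, query_vec, k, nprobe):
    """Query: probe the nprobe closest lists, then rank their members exactly ("flat")."""
    centroids, lists = index
    probed = sorted(range(len(centroids)), key=lambda c: l2(query_vec, centroids[c]))[:nprobe]
    candidates = [idx for c in probed for idx in lists[c]]
    return sorted(candidates, key=lambda idx: l2(query_vec, vectors[idx]))[:k]


def brute_force(vectors, query_vec, k):
    """Exact KNN baseline: compare the query against every vector."""
    return sorted(range(len(vectors)), key=lambda idx: l2(query_vec, vectors[idx]))[:k]


rng = random.Random(42)
data = [[rng.gauss(0.0, 1.0) for _ in range(8)] for _ in range(500)]
q_vec = [rng.gauss(0.0, 1.0) for _ in range(8)]

index = build_ivfflat(data, nlist=16)
approx = search_ivfflat(index, data, q_vec, k=10, nprobe=4)
exact = brute_force(data, q_vec, k=10)
recall = len(set(approx) & set(exact)) / 10
print("recall@10:", recall)
```

Setting `nprobe` equal to `nlist` scans every list and reproduces the brute-force result; smaller values trade recall for speed.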

In [0]:
display(knn_df)

| query_id | indices | distances |
|---|---|---|
| 1 | [47, 77, 16, 36, 58, 30, 40, 84, 96, 25] | [0.67687565, 0.6774632, 0.68367195, 0.68439406, 0.68577915, 0.68736494, 0.6878976, 0.69160885, 0.695291, 0.6957457] |
| 2 | [67, 58, 75, 83, 98, 77, 30, 32, 40, 16] | [0.6797807, 0.6797807, 0.68125886, 0.6842549, 0.69774866, 0.7001144, 0.7020483, 0.70247746, 0.70598215, 0.7062124] |
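The `distances` column is sorted in ascending order, so smaller values mean closer reviews. If the embeddings are L2-normalized and `distances` holds plain (not squared) Euclidean distances, both assumptions that this notebook does not verify, then a distance $d$ maps to cosine similarity via $\cos = 1 - d^2/2$, since $\lVert a-b\rVert^2 = 2 - 2\,a\cdot b$ for unit vectors. A small pure-Python check of that identity, with hypothetical helper names:

```python
import math


def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def distance_to_cosine(d):
    """Valid only for unit-length vectors and non-squared Euclidean distance (assumption)."""
    return 1.0 - d * d / 2.0


a = normalize([0.3, -1.2, 0.8, 0.5])
b = normalize([0.1, -0.9, 1.1, -0.2])
d = euclidean(a, b)
print(abs(distance_to_cosine(d) - cosine(a, b)) < 1e-9)  # True

# Under the assumptions above, the closest match for query 1 would have this cosine similarity:
print(distance_to_cosine(0.67687565))
```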
