### Applying RAG to Vectorized Chat History

### RAG Architecture

<img src="https://appliedaipublicdata.blob.core.windows.net/chathistorydemo/Chat_history_Rag_Architecture.png" style="width:800px;"/>



In [None]:
# Pinned dependencies. The openai pin matters: this notebook calls the legacy
# module-level API (`openai.Embedding.create`, `openai.ChatCompletion.create`),
# which was removed in openai>=1.0.
%pip install azure-cosmos==4.7.0
%pip install openai==0.28.1

In [None]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
from azure.cosmos import CosmosClient
from azure.cosmos import CosmosClient
from azure.cosmos.aio import CosmosClient as CosmosAsyncClient
from azure.cosmos import PartitionKey, exceptions

import os
import requests
import json
from datetime import datetime, timedelta

import ipywidgets as widgets
from IPython.display import display as w_display

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    to_timestamp, current_timestamp, concat, col, split, explode, udf,
    monotonically_increasing_id, when, rand, coalesce, lit, input_file_name,
    regexp_extract, concat_ws, length, ceil
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, FloatType
)

from synapse.ml.featurize.text import PageSplitter
from synapse.ml.services.openai import OpenAIEmbedding, OpenAIChatCompletion
import openai
import pandas as pd

### Step 1: Load mirrored chat history into spark dataframe

In [None]:
# Load the mirrored chat-history table from the `fabcondemo1` database into a Spark DataFrame.
# The commented-out query below is an alternative that restricts the load to a `_ts` window.
#df = spark.sql(f"SELECT * FROM fabcon2024.Container1 where _ts between {next_highest_timestamp} and {highest_timestamp}")
chathistory_container_name = 'chat_history'
df = spark.sql(f"SELECT * FROM fabcondemo1.{chathistory_container_name}")
display(df)

### Step 2: Generate vector embeddings using the built-in Azure OpenAI service through the SynapseML library

We will generate a vector representation for each conversation.

In [None]:
# Configure the SynapseML OpenAI embedding transformer:
#   - text is read from the "Messages" column,
#   - the embedding is written to the "Embedding" column,
#   - "Error" is registered as the error column.
Embd = (
    OpenAIEmbedding()
    .setDeploymentName('text-embedding-ada-002') # set deployment_name as text-embedding-ada-002
    .setTextCol("Messages")
    .setErrorCol("Error")    
    .setOutputCol("Embedding")
)
# Apply the transformer to the chat-history DataFrame and preview the result.
df_embeddings = Embd.transform(df)
display(df_embeddings)

### Step 3: Establish connection to Cosmos DB, define vector Search Policy

In the hidden cell below, we define the connection string.

nosql_conn_string = 'AccountEndpoint=<your account endpoint>;AccountKey=<your account key>'

In [None]:
# Placeholder credentials: replace with your Cosmos DB account's endpoint and key.
# Avoid committing real keys to source control; prefer a secret store where available.
endpoint = 'https://accountname.documents.azure.com:443/'
key = 'Account KEY'
# Build the connection string from the values above. The original literal
# 'AccountEndpoint=endpoint;AccountKey=key' ignored both variables.
nosql_conn_string = f'AccountEndpoint={endpoint};AccountKey={key}'

In [None]:
# Shared synchronous Cosmos DB client built from the connection string defined above.
COSMOS_NOSQL_CLIENT = CosmosClient.from_connection_string(nosql_conn_string)

In [None]:
# Indexing policy for the vector container: index every path except the
# system `_etag` and the raw vector property, and add a dedicated vector index.
indexing_policy = {
    "includedPaths": [{"path": "/*"}],
    "excludedPaths": [
        {"path": '/"_etag"/?'},
        # Must match the vector path used in "vectorIndexes" (and in the vector
        # embedding policy). The previous value "/content_vector/*" matched no
        # stored property, so the vector was still covered by the regular index.
        {"path": "/contentVector/*"},
    ],
    "vectorIndexes": [
        {"path": "/contentVector", "type": "quantizedFlat"},
    ],
}

In [None]:
# Vector embedding policy: declares the property that holds the embedding and
# how distances over it are computed.
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            # Same property path as the vector index in `indexing_policy`.
            "path": "/contentVector",
            "dataType": "float32",
            "distanceFunction": "dotproduct",
            # NOTE(review): presumably the output size of the text-embedding-ada-002
            # deployment used for the embeddings; confirm if the model changes.
            "dimensions": 1536,
        }
    ]
}

In [None]:
# Bundle of both policies; used as the default `search_index_config` of NOSQLDBService.
SEARCH_INDEX_CONFIG = {
    "indexingPolicy": indexing_policy,
    "vectorEmbeddingPolicy": vector_embedding_policy,
}

In [None]:
class NOSQLDBService():
    """Ensure a Cosmos DB database and a vector-enabled container exist.

    On construction the database and the container are created if missing,
    using the shared ``COSMOS_NOSQL_CLIENT``.

    Attributes:
        db_name: Name of the database.
        container_name: Name of the container.
        search_index_config: Dict with the keys ``"indexingPolicy"`` and
            ``"vectorEmbeddingPolicy"`` used when creating the container.
        client: The Cosmos client used for all operations.
        db: Database proxy returned by ``create_database_if_not_exists``.
        db_properties: Result of ``db.read()``.
        container: Container proxy, or ``None`` when creation failed with a
            ``CosmosHttpResponseError`` (the failure is printed, not raised).
    """

    def __init__(
        self, db_name, container_name, search_index_config=SEARCH_INDEX_CONFIG
    ):
        self.db_name = db_name
        self.container_name = container_name
        self.search_index_config = search_index_config
        self.client = COSMOS_NOSQL_CLIENT
        # Defined up front so the attribute always exists: previously a failed
        # container creation left it unset and later access raised AttributeError.
        self.container = None
        self._create_db()
        self._create_container()

    def _create_db(self):
        """Create the database if needed and print its properties as JSON."""
        import json

        self.db = self.client.create_database_if_not_exists(id=self.db_name)
        self.db_properties = self.db.read()
        print(json.dumps(self.db_properties))

    def _create_container(self):
        """Create the container (partitioned on ``/id``) with the vector policies.

        If the container already exists, a client for it is obtained instead.
        Other HTTP errors are reported via ``print`` and leave ``self.container``
        as ``None``.
        """
        try:
            self.container = self.db.create_container_if_not_exists(
                id=self.container_name,
                partition_key=PartitionKey(path="/id", kind="Hash"),
                indexing_policy=self.search_index_config["indexingPolicy"],
                vector_embedding_policy=self.search_index_config[
                    "vectorEmbeddingPolicy"
                ],
            )
        except exceptions.CosmosResourceExistsError:
            print(f"Container {self.container_name} already exists.")
            self.container = self.db.get_container_client(self.container_name)
        except exceptions.CosmosHttpResponseError as e:
            print(f"Failed to create container {self.container_name}: {e}")


In [None]:
# Target database/container for the vectorized chat history. These module-level
# names are also read by `store_data` and `retrieve_data`.
db_name = 'chatbot'
container_name = 'chat_vectors'
# Instantiating the service creates the database and container if they do not exist.
chatbot_nosql = NOSQLDBService(db_name= db_name, container_name=container_name, search_index_config=SEARCH_INDEX_CONFIG)

### Step 4: Insert vector data into the Cosmos DB NoSQL vector database

In [None]:
@udf(returnType = StringType())
def store_data(Id, Messages, Embedding):
    """Upsert one chat record and its embedding into the Cosmos DB vector container.

    Used as a Spark UDF, so it is invoked once per row.

    Parameters:
        Id: Row identifier; stored as the string ``id`` of the item.
        Messages: Conversation text stored alongside the vector.
        Embedding: Embedding of ``Messages``; either an object exposing
            ``tolist()`` or any iterable of numbers. May be ``None``.

    Returns:
        str: ``"Success"`` when the item was upserted, a message starting with
        ``"Skipped:"`` when the row has no embedding, or ``"Error: ..."`` with
        the exception text when the upsert failed.
    """
    # A null embedding would otherwise crash the whole Spark job on `.tolist()`.
    if Embedding is None:
        return "Skipped: no embedding available for this row"

    # NOTE: a client is created per call, because the module-level client is
    # not reused here; consider caching if this becomes a bottleneck.
    client = CosmosClient.from_connection_string(nosql_conn_string)
    container = client.get_database_client(db_name).get_container_client(container_name)

    # Accept both array-like objects with `tolist()` and plain Python sequences.
    vector = Embedding.tolist() if hasattr(Embedding, "tolist") else list(Embedding)

    # Single document to upsert; the key 'contentVector' matches the vector index path.
    item = {
        'id': str(Id),
        'Messages': Messages,
        'contentVector': vector,
    }

    try:
        container.upsert_item(item)
    except Exception as e:
        # Report the failure through the status column instead of a print that
        # is only visible in executor logs.
        return f"Error: {e}"
    return "Success"

In [None]:
# Spark is lazy: `withColumn` only plans the UDF call. Without an action the
# `store_data` UDF never runs and nothing is written to Cosmos DB.
df_embeddings_ingested = df_embeddings.withColumn("InsertStatus", store_data("Id", "Messages", "Embedding"))
# Aggregating on InsertStatus forces the UDF to be evaluated for every row
# (a plain preview may only evaluate a subset) and summarizes the outcomes.
display(df_embeddings_ingested.groupBy("InsertStatus").count())

### Step 5: Query from the CosmosDB NoSQL vector database

##### Define helper functions for retrieval from the Cosmos DB NoSQL vector database

In [None]:
def is_vector(value):
    """Return True when ``value`` is a list whose elements are all ints or floats.

    An empty list counts as a vector; any non-list value does not.
    """
    if not isinstance(value, list):
        return False
    for element in value:
        if not isinstance(element, (int, float)):
            return False
    return True

def retrieve_data(query, num_results=5, search_type = "vector", filter = "None", search_index_name = "contentVector"):
    """Retrieve the items most similar to ``query`` from the Cosmos DB vector container.

    The query text is embedded with the ``text-embedding-ada-002`` deployment and
    compared against the stored vectors with ``VectorDistance``. The projected
    columns are inferred from one sample item: system properties, vector-valued
    properties and properties that are empty in the sample are left out.

    Parameters:
        query (str): Natural-language query text.
        num_results (int): Maximum number of items to return.
        search_type (str): ``"vector"`` for plain vector search, or
            ``"filter_vector"`` to additionally apply ``filter``.
        filter (str): SQL boolean expression over alias ``c`` (for example
            ``"c.id = '42'"``). Required for ``"filter_vector"``; the default
            ``"None"`` means "no filter".
        search_index_name (str): Name of the property holding the vector.

    Returns:
        list[dict]: Up to ``num_results`` items, each with a ``SimilarityScore``
        key. Empty when the container is empty or nothing matched (a warning
        is issued in both cases).

    Raises:
        ValueError: On an unsupported ``search_type``, a missing filter for
            ``"filter_vector"``, or when the result lacks ``SimilarityScore``.
    """
    from itertools import islice

    # Validate the arguments before any remote call is made.
    if search_type not in ("vector", "filter_vector"):
        raise ValueError(
            "Invalid Input.Also note that Hybrid search not supported. Valid search_type: 'vector', 'filter_vector'"
        )
    has_filter = filter is not None and str(filter).strip() not in ("", "None")
    if search_type == "filter_vector" and not has_filter:
        raise ValueError(
            "search_type 'filter_vector' requires a non-empty `filter` expression"
        )

    container = (
        CosmosClient.from_connection_string(nosql_conn_string)
        .get_database_client(db_name)
        .get_container_client(container_name)
    )

    embeddings = openai.Embedding.create(
        deployment_id="text-embedding-ada-002", input=query
    ).data[0].embedding

    # Fetch one item to discover which properties can be projected.
    sample = list(
        container.query_items(
            query="SELECT * FROM c OFFSET 0 LIMIT 1",
            enable_cross_partition_query=True,
        )
    )
    if not sample:
        warnings.warn("Container is empty; no results found for the given query")
        return []
    sample_entry = sample[0]

    system_keys = {"_rid", "_self", "_etag", "_attachments", "_ts", "@search.action"}
    projected = [
        f"c.{key}"
        for key, value in sample_entry.items()
        if key not in system_keys and value and not is_vector(value)
    ]

    # Building the select list from parts keeps the SQL valid even when no
    # non-vector column qualifies.
    distance_expr = f"VectorDistance(c.{search_index_name}, @embedding)"
    select_list = ", ".join(projected + [f"{distance_expr} AS SimilarityScore"])
    # SECURITY: `filter` and `search_index_name` are interpolated verbatim into
    # the SQL text. Only pass trusted values here, never raw end-user input.
    # (The original code referenced an undefined name `filter_text` here.)
    where_clause = f" WHERE ({filter})" if search_type == "filter_vector" else ""

    output = container.query_items(
        query=f"SELECT TOP @num_results {select_list} FROM c{where_clause} ORDER BY {distance_expr}",
        parameters=[
            {"name": "@embedding", "value": embeddings},
            {"name": "@num_results", "value": num_results},
        ],
        enable_cross_partition_query=True,
    )

    ans = list(islice(output, num_results))

    # Sanity checking the results
    if not ans:
        warnings.warn("No results found for the given query")
        return ans
    if len(ans) < num_results:
        warnings.warn(f"Only {len(ans)} results found for the given query")

    if "SimilarityScore" not in ans[0]:
        raise ValueError(
            "SimilarityScore not found in the output. Please check dimension match between the query embeddings and the embeddings in the container"
        )

    return ans

In [None]:
def get_answer(question, context):
    """
    Ask the Azure OpenAI chat model to answer a question using supplied context.

    Parameters:
        question (str): The question to answer.
        context (str): Reference text appended to the question (separated by a
            newline) for the model to draw on.

    Returns:
        str: Content of the first choice returned by the chat completion.
    """
    system_prompt = "You are a helpful chat assistant who will be provided text information for you to refer to in response."
    user_prompt = question + "\n" + context

    completion = openai.ChatCompletion.create(
        deployment_id='gpt-4-32k',  # name of the chat model deployment
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

In [None]:
def do_rag(question):
    """Answer ``question`` with retrieval-augmented generation.

    Retrieves the most similar chat records, joins their "Messages" fields with
    single spaces into one context string, and passes it to ``get_answer``.

    Parameters:
        question (str): The question to answer.

    Returns:
        str: The model's answer.
    """
    hits = retrieve_data(query=question)
    context = " ".join(hit["Messages"] for hit in hits)
    return get_answer(question, context)

##### Ask questions

In [None]:
# Question 1: run the RAG pipeline and print the answer.
# `question1` / `answer1` are reused later when the Q&A pairs are saved.
question1 = "list summary of top three conversations where users prefer AirStrider over WildRunner when chosing hiking shoes?"
answer1 = do_rag(question = question1)
print(answer1)

In [None]:
# Question 2: run the RAG pipeline and print the answer.
# `question2` / `answer2` are reused later when the Q&A pairs are saved.
question2 = "List products that are disliked due to their color. Only return product name, type, and disliked color"
answer2 = do_rag(question2)
print(answer2)


In [None]:
# Question 3: run the RAG pipeline and print the answer.
# `question3` / `answer3` are reused later when the Q&A pairs are saved.
question3 = "List top three products that are liked due to their design. Only return product name, type, and design features"
answer3 = do_rag(question3)
print(answer3)

In [None]:
# Question 4: run the RAG pipeline and print the answer.
# `question4` / `answer4` are reused later when the Q&A pairs are saved.
question4 = "List top 3 products with customer complains in recent conversations. Only mention name, brand, and the top reason."
answer4 = do_rag(question4)
print(answer4)

In [None]:
# Question 5: run the RAG pipeline and print the answer.
# `question5` / `answer5` are reused later when the Q&A pairs are saved.
question5 = "List top three products that users wish to see new features. Only mention name, brand, and the features."
answer5 = do_rag(question5)
print(answer5)

#### Store question and answer pairs in a lakehouse table

In [None]:
# Collect the five question/answer pairs as (qaId, Question, Answer) rows.
data = [('1', question1, answer1), ('2', question2, answer2), ('3', question3, answer3), ('4', question4, answer4), ('5', question5, answer5)]
df_rag = spark.createDataFrame(data, ["qaId", "Question", "Answer"])
tablename = 'chat_analysis_rag'
#combined_df.write.format("parquet").mode("overwrite").save(path)
# Persist as a Delta table; "overwrite" replaces the contents of any previous run.
df_rag.write.mode("overwrite").format("delta").saveAsTable(tablename)
display(df_rag)

##### Display mirrored vectordb

In [None]:
# Preview up to 1000 rows of the mirrored `chat_vectors` table.
# Note: this rebinds `df`, which earlier held the chat-history DataFrame.
df = spark.sql("SELECT * FROM fabcondemo1.chat_vectors LIMIT 1000")
display(df)