In [0]:
# Authenticate Spark against the ADLS Gen2 account with a service principal (OAuth client credentials).
# Access is governed by file/folder ACLs on the Data Lake filesystem (container); an RBAC
# assignment on the top-level Azure Data Lake resource is not required.
# Reference: https://docs.databricks.com/storage/azure-storage.html
def _set_adls_conf(option, value):
    """Set one Spark option, scoped to the adls04 storage account."""
    spark.conf.set(option + ".adls04.dfs.core.windows.net", value)


_set_adls_conf("fs.azure.account.auth.type", "OAuth")
_set_adls_conf(
    "fs.azure.account.oauth.provider.type",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
# Service-principal credentials are read from the Databricks secret scope "myscope".
_set_adls_conf("fs.azure.account.oauth2.client.id", dbutils.secrets.get("myscope", key="clientid"))
_set_adls_conf("fs.azure.account.oauth2.client.secret", dbutils.secrets.get("myscope", key="clientsecret"))
_set_adls_conf(
    "fs.azure.account.oauth2.client.endpoint",
    "https://login.microsoftonline.com/{}/oauth2/token".format(dbutils.secrets.get("myscope", key="tenantid")),
)

In [0]:
import json
import logging
import time
from azure.cosmos.aio import CosmosClient
from azure.cosmos import  PartitionKey, exceptions
from openai import AzureOpenAI
from time import sleep
import time
import json
import pyspark.pandas as ps
import uuid
import gradio as gr

In [0]:
# Connection settings. Everything sensitive is read from the Databricks secret scope "myscope".
def _secret(key):
    """Return the value stored under `key` in the "myscope" secret scope."""
    return dbutils.secrets.get(scope="myscope", key=key)


# --- Azure Cosmos DB for NoSQL ---
cosmosdb_url = _secret("cosmosdb-url")
cosmosdb_key = _secret("cosmosdb-key")
cosmosdb_database_name = _secret("cosmosdb-database")
cosmosdb_collection_name = "movies"              # container holding the movie documents
cosmos_vector_property_name = "vector"           # document property that stores the embedding
cosmosdb_chathistory_cache_name = "chat_cache"   # container holding cached prompts/completions

# --- Azure OpenAI ---
azure_openai_endpoint = _secret("aoai-endpoint")
azure_openai_api_key = _secret("aoai-api-key")
azure_openai_deployment = _secret("aoai-deploymentname")
azure_openai_api_version = "2024-02-15-preview"
azure_openai_embedding_deployment = _secret("aoai-embedding-deployment")
azure_openai_embedding_model = _secret("aoai-embedding-model")

In [0]:
# Cosmos DB client from azure.cosmos.aio: the calls made on it later in the notebook are awaited.
cosmos_client = CosmosClient(
    url=cosmosdb_url,
    credential=cosmosdb_key,
)

# Azure OpenAI client shared by the embedding and chat-completion helpers.
_azure_openai_settings = {
    "api_key": azure_openai_api_key,
    "api_version": azure_openai_api_version,
    "azure_endpoint": azure_openai_endpoint,
}
azure_openai_client = AzureOpenAI(**_azure_openai_settings)

#### Create a database and containers with vector policies

In [0]:
# create a database
db = await cosmos_client.create_database_if_not_exists(id=cosmosdb_database_name)

In [0]:
# Vector embedding policy: declares which document property holds the embedding
# and how it is stored/compared (float32, dot product, 1536 dimensions).
# NOTE(review): 1536 must match the output size of the embedding deployment in use — confirm.
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/" + cosmos_vector_property_name,
            "dataType": "float32",
            "distanceFunction": "dotproduct",
            "dimensions": 1536
        }
    ]
}


# Indexing policy: index everything except `_etag` and the raw vector values,
# and add a dedicated vector index on the embedding property.
indexing_policy = {
    "includedPaths": [{"path": "/*"}],
    "excludedPaths": [
        # Each excluded path needs its own entry. Writing both under one dict literal
        # repeats the "path" key, and Python silently keeps only the last value.
        {"path": '/"_etag"/?'},
        {"path": "/" + cosmos_vector_property_name + "/*"},
    ],
    "vectorIndexes": [
        {"path": "/" + cosmos_vector_property_name, "type": "quantizedFlat"}
    ],
}

In [0]:
# create the collection using the vector index policies
try:
    pass
    container = await db.create_container_if_not_exists(
        id=cosmosdb_collection_name,
        partition_key=PartitionKey(path="/id"),
        vector_embedding_policy=vector_embedding_policy
    )
    print('Container with id \'{0}\' created'.format(id))
except exceptions.CosmosHttpResponseError:
    raise

In [0]:
# create the cache collection for storing chat history using the vector index policies
try:
    pass
    container_cache = await db.create_container_if_not_exists(
        id=cosmosdb_chathistory_cache_name,
        partition_key=PartitionKey(path="/id"),
        vector_embedding_policy=indexing_policy
    )
    print('Container with id \'{0}\' created'.format(id))
except exceptions.CosmosHttpResponseError:
    raise

In [0]:
# Embedding helper shared by the data upload and by every user query.
# Retry logic for throttling (quota limits) is still to be added; the decorator below is disabled.


# @retry(wait=wait_random_exponential(min=1, max=10))
def generate_embeddings(text: str):
    """
    Return the embedding vector for `text`.

    Sends `text` to the Azure OpenAI embedding deployment configured in
    `azure_openai_embedding_deployment` and returns the `embedding` entry of
    the first item in the response's `data` list.
    """
    payload = azure_openai_client.embeddings.create(
        model=azure_openai_embedding_deployment,
        input=text,
    ).model_dump()
    first_item = payload["data"][0]
    return first_item["embedding"]

#### Load data and upload to the Azure Cosmos DB container
#### The Azure Cosmos DB Python SDK does not currently support bulk inserts, so we'll have to insert the items sequentially.

In [0]:
async def upload_data_to_cosmosdb():
    """
    Embed each movie's overview and upsert the movie documents into `container`.

    Steps:
      1. Read the multiline JSON files from the ADLS folder with Spark.
      2. Drop the existing `vector` column and convert the rows to a list of dicts.
      3. For every item, embed its `overview`, store the result under `vector`
         and upsert the item (one request per item).

    Items whose `overview` is missing or blank are skipped, because there is no
    text to embed; the number of skipped items is printed at the end.
    """
    json_data = "abfss://experiments@adls04.dfs.core.windows.net/jsondata/cosmosdb-rag/"
    movies_sdf = spark.read.format("json").option("multiline", "true").load(json_data)

    # pandas-on-Spark frame without the existing 'vector' column (embeddings are regenerated below).
    movies_psdf = ps.DataFrame(movies_sdf).drop(columns=['vector'])

    # Round-trip through JSON to obtain plain Python dicts, one per movie.
    records = json.loads(movies_psdf.to_json(orient='records'))

    uploaded = 0
    skipped = 0
    for item in records:
        overview = item.get("overview")
        if not isinstance(overview, str) or not overview.strip():
            skipped += 1
            continue
        item["vector"] = generate_embeddings(overview)
        # NOTE(review): "@search.action" looks like a leftover from an Azure AI Search
        # sample; it is kept so the stored document shape does not change — confirm it is needed.
        item["@search.action"] = "upload"
        await container.upsert_item(body=item)
        uploaded += 1
    print("Inserted {} documents into collection.".format(uploaded))
    if skipped:
        print("Skipped {} documents without an overview.".format(skipped))

In [0]:
# run function
json_data = "abfss://experiments@adls04.dfs.core.windows.net/jsondata/cosmosdb-rag/"
df_ = spark.read.format("json").option("multiline", "true").load(json_data)
if df_.count() > 4489:
    pass
    await upload_data_to_cosmosdb()
else:
    print("The number of records is less than or equal to 4489. The function will not run.")

#### Vector search in Azure Cosmos DB for NoSQL
#### A simple function that takes the user's query, generates an embedding for the query text, and uses that embedding to run a vector search for similar items. The most similar items are then used as additional knowledge for the completions model when it answers the user's query.

In [0]:
# https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query

In [0]:
# Search retrieval function
async def vector_search(query, container, similarity_score=0.03, num_results=2):
    """
    Run a vector similarity query for `query` against `container`.

    The query text is embedded with `generate_embeddings`, then at most
    `num_results` items whose VectorDistance to that embedding is greater than
    `similarity_score` are selected.

    Returns a list of dicts of the form
    `{"SimilarityScore": <score>, "document": {"overview": ..., "title": ...}}`.
    """
    sql = 'SELECT TOP @num_results c.overview, c.title, VectorDistance(c.vector,@embedding) AS SimilarityScore  FROM c WHERE VectorDistance(c.vector,@embedding) > @similarity_score ORDER BY VectorDistance(c.vector,@embedding)'
    bindings = [
        {"name": "@embedding", "value": generate_embeddings(query)},
        {"name": "@num_results", "value": num_results},
        {"name": "@similarity_score", "value": similarity_score},
    ]
    rows = container.query_items(query=sql, parameters=bindings)

    # Move the score out of each row so that "document" carries only the projected fields.
    return [
        {"SimilarityScore": row.pop("SimilarityScore"), "document": row}
        async for row in rows
    ]

In [0]:
# test the search retrieval function

query= "do you have a fantasy movie?"
results = await vector_search(query=query, container=container, num_results=2)
results

In [0]:
async def get_chat_history(container_cache, completions=1):
    """
    Return the most recently written items of the chat-history container.

    Queries `container_cache` for the top `completions` items ordered by
    `_ts` descending and returns them as a list (newest first).
    """
    newest_first = container_cache.query_items(
        query= '''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        ''',
        parameters=[{"name": "@completions", "value": completions}],
    )
    return [entry async for entry in newest_first]

In [0]:
try:
  pass
  chat_hist = await get_chat_history(container_cache=container_cache)
  print(chat_hist[0]["completion"])
except Exception as e:
  print(e)

In [0]:
# This function grounds the model with the system prompt, prior chat turns, the user's query
# and the vector search results so that answers stay within the retrieved context.

def generate_completion(vector_search_results, user_input, chat_history_list):
    """
    Ask the chat model to answer `user_input` using the retrieved documents.

    Parameters:
        vector_search_results: iterable of dicts with a JSON-serialisable "document" entry
            (the shape returned by `vector_search`).
        user_input: the user's current question.
        chat_history_list: iterable of dicts with "prompt" and "completion" entries
            (the shape written by `save_chat_history`).

    Returns:
        The response object returned by `azure_openai_client.chat.completions.create`.

    Message order sent to the model: system prompt, prior turns, current question,
    then one system message per retrieved document.
    """
    system_prompt = '''
    You are an intelligent assistant for imdb movies.
    You are designed to provide helpful answers to user questions about movies given the information about provided.
        - Only answer questions related to the information provided below in the context.
        - Write the response as key value pairs.
        Example:
            Title: Matrix
            Overview: A movie about a man who is awoken from his sleep and finds himself in a strange new
        - If you're unsure of an answer, you can say ""I don't know"" or ""I'm not sure"" and recommend users search themselves."
        - Only provide answers that have movie titles and overview that are part of the provided context.
    '''
    messages = [{"role": "system", "content": system_prompt}]

    # Replay each earlier turn with its proper role: the prompt came from the user and the
    # completion from the assistant. Merging both into one "user" message would attribute
    # the model's own answer to the user.
    for chat in chat_history_list:
        messages.append({"role": "user", "content": chat["prompt"]})
        messages.append({"role": "assistant", "content": chat["completion"]})

    # Current question.
    messages.append({"role": "user", "content": user_input})

    # Retrieved documents, supplied as additional system context.
    for item in vector_search_results:
        messages.append({"role": "system", "content": json.dumps(item["document"])})

    response = azure_openai_client.chat.completions.create(
        model=azure_openai_deployment,
        messages=messages,
        temperature=0.1,
    )

    return response

In [0]:
# test the model generation function

question = "do you have a batman movie?"
#user_input = "tell me about a tom cruise movie?"
search_results = await vector_search(query=question, container=container, similarity_score=0.03, num_results=2)
x = generate_completion(vector_search_results=search_results, user_input=question, chat_history_list=await get_chat_history(container_cache=container_cache, completions=1))

# x
# print("\n\n")
x.to_dict()["choices"][0]["message"]["content"]

In [0]:
# Show the first document returned by the search in the previous cell.
# Indexing [0] raises IndexError if that search produced no results.
search_results[0]["document"]

In [0]:
async def save_chat_history(container_cache, user_input, user_input_embedding, completion_results):
    """
    Store one prompt/completion pair in the chat-history cache container.

    Parameters:
        container_cache: container proxy exposing an awaitable `create_item(body=...)`.
        user_input: the prompt text.
        user_input_embedding: embedding of the prompt, stored under "vector" so the
            cache can be searched by similarity.
        completion_results: chat-completion response exposing `to_dict()`.

    The item gets a fresh UUID as "id"; token counts are stored as strings.
    A failure while writing the item is printed and not re-raised.
    """
    # Serialise the response once and reuse the dict for every field.
    completion = completion_results.to_dict()
    usage = completion["usage"]
    chat_history_object = {
        "id": str(uuid.uuid4()),
        "prompt": user_input,
        "completion": completion["choices"][0]["message"]["content"],
        "completionTokens": str(usage["completion_tokens"]),
        "promptTokens": str(usage["prompt_tokens"]),
        "totalTokens": str(usage["total_tokens"]),
        "model": completion["model"],
        "vector": user_input_embedding,
    }
    try:
        # Insert the chat document into the Cosmos DB container
        await container_cache.create_item(body=chat_history_object)
    except Exception as e:
        # Best effort: a failed cache write must not break the chat turn.
        print(e)


# Look up previously answered prompts in the chat history container
# by running a vector search over the stored prompt embeddings.
async def get_cache(container, vectors, similarity_score=0.02, num_results=1):
    """
    Return cached completions whose stored prompt embedding is close to `vectors`.

    Selects at most `num_results` items from `container` whose VectorDistance to
    `vectors` is greater than `similarity_score` and returns the "completion"
    field of each as a list. The list is empty when nothing matches.
    """
    matches = container.query_items(
        query= '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score},
        ],
        populate_query_metrics=True,
    )
    return [match['completion'] async for match in matches]

# Test the get chat history function
await get_cache(container=container_cache, vectors=generate_embeddings(text="do you have a spy movie?"), similarity_score=0.99)

#### Loop function to perform Q&A over the sample movie data. It uses the user input and the search results to generate the model output.

#### Also, add a chat history cache container/collection to the Cosmos DB database. The cache is checked first, before queries are routed to the LLM.

In [0]:
# Interactive loop of user input and model output for Q&A over the sample movie data.

async def chat_loop():
    """
    Run a console chat session until the user types 'end'.

    For each prompt:
      1. 'end' (any case, surrounding whitespace ignored) stops the session. This is
         checked before anything else so that a cache hit can never swallow it.
      2. Blank input is ignored.
      3. The prompt is embedded and the chat-history cache is searched; a hit is
         printed and no model call is made. The threshold is 0.99, the same value
         `chat_bot_function` uses, so only near-identical prompts hit the cache.
      4. Otherwise the movies container is searched, the model is asked, and the
         exchange is saved to the cache and printed.

    Errors while embedding or querying the cache are printed and the loop continues.
    A closed input stream (EOFError) ends the session.
    """
    print("*** Please ask your model questions about imdb movies. Type 'end' to end the session.")
    while True:
        try:
            user_input = input("User prompt: ").strip().lower()
        except EOFError:
            # No more input can arrive; retrying would spin forever.
            break

        if user_input == "end":
            print("\n\nExiting chat..")
            print("Good bye, please let me know if you need further help.")
            break

        if not user_input:
            # Nothing to embed.
            continue

        try:
            user_input_embeddings = generate_embeddings(text=user_input)

            # Query the chat history cache first to see if this question has been asked before
            cache_results = await get_cache(container=container_cache, vectors=user_input_embeddings, similarity_score=0.99, num_results=1)
        except Exception as e:
            print(e)
            continue

        if cache_results:
            print("Cached Result\n")
            print(cache_results[0])
            continue

        search_results = await vector_search(query=user_input, container=container, similarity_score=0.03, num_results=2)
        chat_history_list = await get_chat_history(container_cache=container_cache, completions=1)
        completion_results = generate_completion(vector_search_results=search_results, user_input=user_input, chat_history_list=chat_history_list)
        print("Completion Result\n")

        # save the chat history to cosmos db
        await save_chat_history(container_cache=container_cache, user_input=user_input, user_input_embedding=user_input_embeddings, completion_results=completion_results)
        print(completion_results.to_dict()["choices"][0]["message"]["content"])

In [0]:
# Start the interactive console chat session. The loop prompts via input() and its
# banner tells the user to type 'end' to end the session.
await chat_loop()

#### Create a Gradio UI for the Chatbot

In [0]:
# Single-turn chatbot entry point used by the Gradio UI.

async def chat_bot_function(user_input, history=None):
    """
    Answer one user message, serving it from the chat-history cache when possible.

    Parameters:
        user_input: the user's message.
        history: conversation history supplied by Gradio's ChatInterface, which calls
            its `fn` as `fn(message, history)`. It is accepted so the function can be
            used as that callback; it is not used, because prior turns are read from
            the Cosmos DB cache instead.

    Returns:
        The cached completion if a stored prompt matches with similarity > 0.99,
        otherwise the model's answer (which is also saved to the cache). If anything
        fails, the error is printed and a short apology message is returned so the
        UI always has text to display.
    """
    try:
        user_input = user_input.lower()
        # Query the chat history cache first to see if this question has been asked before
        user_input_embeddings = generate_embeddings(text=user_input)
        cache_results = await get_cache(container=container_cache, vectors=user_input_embeddings, similarity_score=0.99, num_results=1)
        if cache_results:
            print("Cached Result\n")
            return cache_results[0]

        search_results = await vector_search(query=user_input, container=container, similarity_score=0.8, num_results=1)
        chat_history_list = await get_chat_history(container_cache=container_cache, completions=1)
        completion_results = generate_completion(vector_search_results=search_results, user_input=user_input, chat_history_list=chat_history_list)
        await save_chat_history(container_cache=container_cache, user_input=user_input, user_input_embedding=user_input_embeddings, completion_results=completion_results)
        response = completion_results.to_dict()["choices"][0]["message"]["content"]
        print("Completion Result")
        return response
    except Exception as e:
        print(e)
        return "Sorry, something went wrong while answering your question. Please try again."

In [0]:
query = "do you have a spy movie?"
await chat_bot_function(user_input=query)

In [0]:
def generate_response(message, history):
    """
    Answer `message` with the chat model, replaying `history` as prior turns.

    Parameters:
        message: the user's new message.
        history: iterable of (user_text, assistant_text) pairs; None is treated as empty.

    Returns:
        The text content of the first choice in the model's response.

    Uses the notebook's Azure OpenAI client and chat deployment. No retrieval or
    caching is involved here.
    """
    formatted_history = []
    for user, assistant in history or []:
        formatted_history.append({"role": "user", "content": user})
        formatted_history.append({"role": "assistant", "content": assistant})

    formatted_history.append({"role": "user", "content": message})

    # `azure_openai_client` / `azure_openai_deployment` are the client and deployment
    # configured at the top of the notebook; no other client is defined in it.
    response = azure_openai_client.chat.completions.create(
        model=azure_openai_deployment,
        messages=formatted_history,
        temperature=1.0,
    )

    return response.choices[0].message.content

In [0]:
# Build the Gradio chat UI around `chat_bot_function`, then launch it.
# NOTE(review): share=True asks Gradio for a shareable link — confirm that exposing this
# bot (and the Azure resources behind it) outside the workspace is intended.
demo = gr.ChatInterface(
    fn=chat_bot_function,
    chatbot=gr.Chatbot(label="Assistant", height=250),
    textbox=gr.Textbox(placeholder="Ask me a question about any movie", scale=7),
    title="RAG Movie Recommender",
    # description="I will try to answer your movie related questions as accurately as possible",
    theme="soft",
    retry_btn=None,
    undo_btn="Delete Previous",
)
demo.launch(share=True)