In [None]:
! pip install -r requirements.txt

In [None]:
# import all libraries
import json
import datetime
import time
import urllib 
import gradio as gr

from azure.core.exceptions import AzureError
from azure.core.credentials import AzureKeyCredential

#Cosmos DB imports
from azure.cosmos import CosmosClient
from azure.cosmos.aio import CosmosClient as CosmosAsyncClient
from azure.cosmos import PartitionKey, exceptions

from openai import AzureOpenAI
from dotenv import load_dotenv

In [None]:
# Load settings from the .env file into module-level constants.

from dotenv import dotenv_values

env_name = ".env" # following .env template change to your own .env file name
config = dotenv_values(env_name)

# Every setting read below is required. Check them all up front so that a
# missing or empty entry is reported by name (all of them at once) instead of
# surfacing as a bare KeyError for only the first absent key.
required_settings = (
    'AZURE_OPENAI_KEY',
    'AZURE_OPENAI_ENDPOINT',
    'AZURE_OPENAI_VERSION',
    'AZURE_OPENAI_COMPLETIONS_DEPLOYMENT',
    'AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT',
    'COSMOSDB_KEY',
    'COSMOSDB_URI',
)
missing_settings = [name for name in required_settings if not config.get(name)]
if missing_settings:
    raise KeyError(
        f"Missing or empty settings in {env_name}: {', '.join(missing_settings)}"
    )

# Azure OpenAI
OPENAI_API_KEY = config['AZURE_OPENAI_KEY']
OPENAI_API_ENDPOINT = config['AZURE_OPENAI_ENDPOINT']
OPENAI_API_VERSION = config['AZURE_OPENAI_VERSION'] # at the time of authoring, the api version is 2024-02-01
COMPLETIONS_MODEL_DEPLOYMENT_NAME = config['AZURE_OPENAI_COMPLETIONS_DEPLOYMENT']
EMBEDDING_MODEL_DEPLOYMENT_NAME = config['AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT']

# Azure Cosmos DB for NoSQL
COSMOSDB_NOSQL_ACCOUNT_KEY = config['COSMOSDB_KEY']
COSMOSDB_NOSQL_ACCOUNT_ENDPOINT = config['COSMOSDB_URI']

In [None]:
# Create the Azure OpenAI client shared by the embedding and chat-completion calls
AOAI_client = AzureOpenAI(
    azure_endpoint=OPENAI_API_ENDPOINT,
    api_version=OPENAI_API_VERSION,
    api_key=OPENAI_API_KEY,
)

In [None]:
# Create an embedding vector with Azure OpenAI
def generate_embeddings(text):
    """
    Embed `text` with the configured Azure OpenAI embeddings deployment.

    Used to vectorize both the sample documents and the user's prompts.
    Returns the 'embedding' entry of the first item in the response's
    'data' list.
    """
    payload = AOAI_client.embeddings.create(
        model=EMBEDDING_MODEL_DEPLOYMENT_NAME,
        input=text,
    ).model_dump()
    # Pause half a second after every request (presumably to stay under the
    # service's rate limit - confirm before removing).
    time.sleep(0.5)
    first_item = payload['data'][0]
    return first_item['embedding']

In [None]:
# Load the sample documents from disk.
# The encoding is given explicitly: without it open() falls back to the
# platform's locale encoding, which is not UTF-8 everywhere.
with open('text-sample.json', 'r', encoding='utf-8') as d:
    data = json.load(d)
print(json.dumps(data, indent=2))

In [None]:
# Give every document a sequential string id and embeddings for its title and content
n = 0
for n, item in enumerate(data, start=1):
    item['id'] = str(n)
    # Read both fields before calling the embedding service
    title, content = item['title'], item['content']
    item.update({
        'titleVector': generate_embeddings(title),
        'contentVector': generate_embeddings(content),
        '@search.action': 'upload',
    })
    # end='\r' keeps the progress counter on a single output line
    print("Creating embeddings for item:", n, "/" ,len(data), end='\r')


In [None]:
# Save the documents, now carrying their embeddings, to text-sample_w_embeddings.json
with open("text-sample_w_embeddings.json", "w") as out_file:
    out_file.write(json.dumps(data))

In [None]:
# Create the Cosmos DB client from the configured account endpoint and key
cosmos_client = CosmosClient(
    url=COSMOSDB_NOSQL_ACCOUNT_ENDPOINT,
    credential=COSMOSDB_NOSQL_ACCOUNT_KEY,
)

In [None]:
# Create the database if it does not exist yet, then print its properties
DATABASE_NAME = "vector-nosql-db"
db = cosmos_client.create_database_if_not_exists(id=DATABASE_NAME)

properties = db.read()
print(json.dumps(properties))

In [None]:
# Vector embedding policy: one float32, 1536-dimension entry per vector path,
# each with its own distance function
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": vector_path,
            "dataType": "float32",
            "distanceFunction": distance_function,
            "dimensions": 1536,
        }
        for vector_path, distance_function in (
            ("/titleVector", "dotproduct"),
            ("/contentVector", "cosine"),
        )
    ]
}


In [None]:
# Indexing policy: index every path except _etag and the raw vector paths,
# and declare a quantizedFlat vector index for each vector path
indexing_policy = {
    "includedPaths": [{"path": "/*"}],
    "excludedPaths": [
        {"path": "/\"_etag\"/?"},
        {"path": "/titleVector/*"},
        {"path": "/contentVector/*"},
    ],
    "vectorIndexes": [
        {"path": "/titleVector", "type": "quantizedFlat"},
        {"path": "/contentVector", "type": "quantizedFlat"},
    ],
}



In [None]:
# Create the document container and the cache container used for semantic caching.
CONTAINER_NAME = "vector-nosql-cont"
CACHE_CONTAINER_NAME = "vector-nosql-cache"


try:
    container = db.create_container_if_not_exists(
        id=CONTAINER_NAME,
        partition_key=PartitionKey(path='/id', kind='Hash'),
        indexing_policy=indexing_policy,
        vector_embedding_policy=vector_embedding_policy,
    )
    # Format the container's own id, not the builtin `id` function.
    print('Container with id \'{0}\' created'.format(container.id))

except exceptions.CosmosResourceExistsError:
    # Bind `container` in this branch as well, so the name is always defined
    # once this cell has run.
    container = db.get_container_client(CONTAINER_NAME)
    print('A container with id \'{0}\' already exists'.format(CONTAINER_NAME))


# Create the cache container. Any Cosmos error simply propagates.
# NOTE(review): this reuses the document container's policies, whose vector
# paths are /titleVector and /contentVector. cache_response() stores its
# embedding under 'vector' and get_cache() queries c.vector, so that path is not
# covered by the vector policy/index declared here - confirm whether a
# cache-specific policy on /vector is intended.
cache_container = db.create_container_if_not_exists(
    id=CACHE_CONTAINER_NAME,
    partition_key=PartitionKey(path='/id'),
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy,
)
print('Container with id \'{0}\' created'.format(cache_container.id))


In [None]:
# Initialize the container clients for the document and cache containers.
CONTAINER_NAME = "vector-nosql-cont"
CACHE_CONTAINER_NAME = "vector-nosql-cache"

container, cache_container = (
    db.get_container_client(container_id)
    for container_id in (CONTAINER_NAME, CACHE_CONTAINER_NAME)
)

In [None]:
# Insert the documents and their embeddings into Cosmos DB.

with open('text-sample_w_embeddings.json') as embeddings_file:
    data = json.loads(embeddings_file.read())

container_client = db.get_container_client(CONTAINER_NAME)

# Upsert one document at a time, logging each id before it is written
for item in data:
    print("writing item", item['id'])
    container_client.upsert_item(item)


In [None]:
# Fetch the most recent chat documents from the cache container.
def get_chat_history(container, completions=3):
    """
    Return the latest documents in `container` as a list, newest first.

    container   -- object exposing query_items (the cache container client)
    completions -- maximum number of documents to return (bound to SQL TOP)
    """
    newest_first_query = '''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        '''
    top_parameter = {"name": "@completions", "value": completions}
    return list(
        container.query_items(
            query=newest_first_query,
            parameters=[top_parameter],
            enable_cross_partition_query=True,
        )
    )

In [None]:
# Vector search helper, followed by a sample query that exercises it.
# vector_search is called in this cell and by chat_completion() but is not
# defined anywhere else in the notebook, so it is defined here before first use.
def vector_search(query, num_results=5):
    """
    Return the documents whose content is most similar to `query`.

    query       -- natural-language text; embedded with generate_embeddings()
    num_results -- maximum number of documents to return (bound to SQL TOP)

    Searches the module-level `container` on its contentVector path. Returns a
    list of dicts with the keys 'title', 'content' and 'SimilarityScore',
    ordered by VectorDistance.
    """
    query_embedding = generate_embeddings(query)
    matches = container.query_items(
        query='''
        SELECT TOP @num_results c.title, c.content,
               VectorDistance(c.contentVector, @embedding) AS SimilarityScore
        FROM c
        ORDER BY VectorDistance(c.contentVector, @embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": query_embedding},
            {"name": "@num_results", "value": num_results},
        ],
        enable_cross_partition_query=True)
    # Materialize the paged iterator so callers can iterate more than once.
    return list(matches)


# test the vector search by running sample query
query = "What are the services for running ML models?"
results = vector_search(query)
for result in results:
    print(f"Similarity Score: {result['SimilarityScore']}")
    # Label each field with what it actually holds
    print(f"Title: {result['title']}")
    print(f"Content: {result['content']}")

In [None]:
# This function grounds the model with system instructions, chat history and retrieved documents.

def generate_completion(vector_search_results, user_prompt, chat_history):
    """
    Ask the chat model to answer `user_prompt`, grounded on retrieved documents.

    vector_search_results -- iterable of dicts; each item's 'content' is sent
                             to the model as a system message
    user_prompt           -- the question being asked
    chat_history          -- iterable of dicts with 'prompt' and 'completion'
                             keys; sent in the order given

    Returns the chat-completions response object unchanged.
    """
    system_prompt = '''
    You are an intelligent assistant for Microsoft Azure services.
    You are designed to provide helpful answers to user questions about Azure services given the information about to be provided.
        - Only answer questions related to the information provided below, provide at least 3 clear suggestions in a list format.
        - Write two lines of whitespace between each answer in the list.
        - If you're unsure of an answer, you can say "I don't know" or "I'm not sure" and recommend users search themselves.
        - Only provide answers that have products that are part of Microsoft Azure and part of these following prompts.
    '''

    messages = [{"role": "system", "content": system_prompt}]

    # Replay each earlier exchange as a user turn followed by an assistant turn,
    # so the model's own answers are not presented to it as user input.
    for chat in chat_history or []:
        messages.append({"role": "user", "content": chat['prompt']})
        messages.append({"role": "assistant", "content": chat['completion']})

    # Retrieved documents are supplied as additional system messages.
    for item in vector_search_results or []:
        messages.append({"role": "system", "content": item['content']})

    messages.append({"role": "user", "content": user_prompt})
    response = AOAI_client.chat.completions.create(
        model=COMPLETIONS_MODEL_DEPLOYMENT_NAME,
        messages=messages,
        temperature=0,
    )

    return response

In [None]:
# function to cache response for semantic caching
import uuid
def cache_response(container, user_prompt, prompt_vectors, response):
    """
    Store a prompt, its embedding and the model's answer in the cache container.

    container      -- object exposing create_item (the cache container client)
    user_prompt    -- the prompt text that was answered
    prompt_vectors -- embedding of `user_prompt`, stored under 'vector'
    response       -- chat-completions response; its first choice's message
                      content, token usage and model name are stored
    """
    # Create a dictionary representing the chat document
    chat_document = {
        'id':  str(uuid.uuid4()),
        'prompt': user_prompt,
        'completion': response.choices[0].message.content,
        # token counts are stored as strings
        'completionTokens': str(response.usage.completion_tokens),
        'promptTokens': str(response.usage.prompt_tokens),
        'totalTokens': str(response.usage.total_tokens),
        'model': response.model,
        'vector': prompt_vectors
    }
    # Insert the chat document into the Cosmos DB container
    container.create_item(body=chat_document)
    # Log only the id: the document carries the whole embedding vector, and
    # printing it would flood the cell output on every cache write.
    print("item inserted into cache.", chat_document['id'])

In [None]:
# Look up semantically similar prompts in the Cosmos DB cache container
def get_cache(container, vectors, similarity_score=0.0, num_results=5):
    """
    Run a vector search over the cached chat documents.

    container        -- object exposing query_items (the cache container client)
    vectors          -- embedding compared against each document's c.vector
    similarity_score -- only documents whose VectorDistance exceeds this value
                        are returned
    num_results      -- maximum number of documents to return (bound to SQL TOP)

    Returns the matching documents as a list.
    """
    cache_query = '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        '''
    query_parameters = [
        {"name": name, "value": value}
        for name, value in (
            ("@embedding", vectors),
            ("@num_results", num_results),
            ("@similarity_score", similarity_score),
        )
    ]
    matches = container.query_items(
        query=cache_query,
        parameters=query_parameters,
        enable_cross_partition_query=True,
        populate_query_metrics=True,
    )
    return list(matches)

In [None]:
# function for chat completion

def chat_completion(cache_container,user_input):
    """
    Answer `user_input`, serving the answer from the semantic cache when possible.

    cache_container -- cache container client to read from and write to; when
                       None, the client for CACHE_CONTAINER_NAME is looked up
    user_input      -- the user's question

    Returns a (completion_text, cached) tuple, where `cached` is True when the
    answer came from the cache and False when it was newly generated (and then
    stored in the cache).
    """
    # Use the container the caller passed in; only look it up when none was given.
    if cache_container is None:
        cache_container = db.get_container_client(CACHE_CONTAINER_NAME)

    user_embeddings = generate_embeddings(user_input)

    # Query the chat history cache first to see if this question has been asked before
    cache_results = get_cache(container = cache_container, vectors = user_embeddings, similarity_score=0.99, num_results=1)
    if len(cache_results) > 0:
        print("Cached Result\n")
        return cache_results[0]['completion'], True

    print("New result\n")
    search_results = vector_search(user_input)

    # get_chat_history returns the newest exchange first; reverse it so the
    # model receives the conversation in chronological order.
    chat_history = list(reversed(get_chat_history(cache_container, 3)))

    completions_results = generate_completion(search_results, user_input, chat_history)

    print("\n")

    print("Caching response \n")
    # cache the response
    cache_response(cache_container, user_input, user_embeddings, completions_results)

    return completions_results.choices[0].message.content, False

In [None]:
# Run this cell to spin up a simple chat-like UI with Gradio
chat_history = []
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="Azure Assistant")

    msg = gr.Textbox(label="Ask me about Azure Services!")
    clear = gr.Button("Clear")

    def user(user_message, chat_history):
        """
        Handle a submitted message.

        Gets the completion for `user_message`, appends the exchange (with
        the elapsed time and a cache marker) to `chat_history`, and returns an
        update that empties the textbox together with the updated history.
        """
        # Time the whole completion request, in milliseconds
        started = time.time()
        answer, from_cache = chat_completion(cache_container, user_message)
        elapsed_ms = round((time.time() - started) * 1000, 2)

        print(answer)

        # Suffix shown after the answer: timing, plus a marker for cached answers
        cache_marker = " (Cached)" if from_cache else ""
        suffix = f"\n (Time: {elapsed_ms}ms)" + cache_marker
        chat_history.append([user_message, answer + suffix])

        return gr.update(value=""), chat_history

    # Submitting the textbox calls the handler and refreshes textbox and chat window
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)

    # The Clear button resets the chat window
    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio interface
demo.launch(debug=True)

In [None]:
# Shut down the Gradio app that was started with demo.launch() in the cell above.
demo.close()