Import the libraries needed for this lab:

1. pymongo: a Python driver that gives access to DocumentDB clusters
2. boto3: the AWS SDK that lets you access AWS services programmatically
3. json: a standard-library module for working with JSON data
4. pandas: a Python library that helps with data analysis and manipulation
5. langchain: an open-source framework that allows developers to build applications using large language models (LLMs)
6. gradio: a Python library for building simple web UIs, used here for the search and chatbot demos

In [1]:
# Loading the Libraries
import pymongo
import boto3
import json
import pandas as pd 
import gradio as gr
from langchain_community.embeddings import BedrockEmbeddings
from langchain_aws import ChatBedrock
from langchain_aws import ChatBedrockConverse
from langchain_community.vectorstores.documentdb import DocumentDBVectorSearch
from langchain_community.vectorstores.documentdb import DocumentDBSimilarityType
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from botocore.exceptions import ClientError
import langchain

Connect to the DocumentDB cluster. Required parameters:

1. DocumentDB cluster identifier
2. Username created for the lab
3. Password created for the lab

For more information, see:

https://docs.aws.amazon.com/documentdb/latest/developerguide/connect_programmatically.html

In [2]:
# Connecting to DocumentDB Cluster
client = pymongo.MongoClient(
    "workshop-cloudformation-documentdb.cluster-cjqqqo2wky0f.us-west-2.docdb.amazonaws.com:27017",
    username="Labuser",
    password="PgXYh-~j37%LaFgy",
    retryWrites=False,
    tls=True,
    tlsCAFile="global-bundle.pem",
)

client.server_info()



{'version': '5.0.0',
 'versionArray': [5, 0, 0, 0],
 'bits': 64,
 'maxBsonObjectSize': 16777216,
 'ok': 1.0,
 'operationTime': Timestamp(1746772379, 1)}
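
The connection cell hard-codes the lab credentials. As a minimal sketch of one alternative, the helper below builds a connection URI from environment variables and percent-encodes the credentials, since characters such as `%` or `@` in a password would otherwise be misread inside a URI. The function name `build_docdb_uri` and the variable names `DOCDB_HOST`, `DOCDB_USER`, and `DOCDB_PASSWORD` are assumptions for illustration and are not part of the lab.

```python
import os
from urllib.parse import quote_plus


def build_docdb_uri(host, username, password, port=27017,
                    ca_file="global-bundle.pem"):
    """Return a MongoDB-style connection URI with percent-encoded credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return (
        f"mongodb://{user}:{pwd}@{host}:{port}/"
        f"?tls=true&tlsCAFile={ca_file}&retryWrites=false"
    )


# Hypothetical environment variable names and fallback values; adjust to your setup.
uri = build_docdb_uri(
    os.environ.get("DOCDB_HOST", "example-cluster.docdb.amazonaws.com"),
    os.environ.get("DOCDB_USER", "labuser"),
    os.environ.get("DOCDB_PASSWORD", "example-password"),
)
# The resulting string can be passed to pymongo.MongoClient(uri).
```

Keeping the password out of the notebook also means it does not end up in saved outputs or version control.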

Creating a database and collection with the existing DocumentDB connection.

Required parameters:

    1. Database Name
    2. Collection Name

The current cell below will create a database named "workshopdatabase" and a collection named "workshopcollection"
    
    https://www.w3schools.com/python/python_mongodb_create_db.asp

In [3]:
#create database/collection
db = client["workshopdatabase"]
collection = db["workshopcollection"]

Read the movie data from 'movie_data.csv' and insert it into the DocumentDB collection created earlier.

In [4]:
# Loading the DocumentDB database from the example dataset in csv

# Read the CSV file into a DataFrame
csv_file = "movie_data.csv" 
data = pd.read_csv(csv_file)
# Convert the DataFrame to a list of dictionaries (one per row)
data_dict = data.to_dict(orient="records")
# Insert the data into the DocumentDB collection
collection.insert_many(data_dict)
print("CSV data has been successfully uploaded to DocumentDB")

CSV data has been successfully uploaded to DocumentDB
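
pandas reads empty CSV cells as NaN, so a row with no overview would be stored with an 'overview' field whose value is NaN rather than with the field absent. A minimal sketch of one way to drop such values before insertion follows. The helper name `drop_missing_fields` and the sample rows are illustrative assumptions, and whether the lab dataset actually contains empty cells is not stated in this notebook.

```python
import math


def drop_missing_fields(record):
    """Return a copy of the record without None or NaN values."""
    cleaned = {}
    for key, value in record.items():
        if value is None:
            continue
        if isinstance(value, float) and math.isnan(value):
            continue
        cleaned[key] = value
    return cleaned


# Stand-in for data.to_dict(orient="records"); these rows are made up.
rows = [
    {"original_title": "Grease", "overview": "Sandy and Danny meet again."},
    {"original_title": "Untitled", "overview": float("nan")},
]
cleaned_rows = [drop_missing_fields(row) for row in rows]
# cleaned_rows could then be passed to collection.insert_many(...).
```

With missing values removed, a later filter on `{"overview": {"$exists": True}}` only matches records that really have text to embed.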


Initialize a Bedrock client through boto3 so that API calls can be made to Amazon Bedrock.
For more information: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime.html

The default Region for this workshop is us-west-2. Replace it with the Region in which you are running this workshop.

In [6]:
# Starting the bedrock client
bedrock_client = boto3.client("bedrock-runtime", region_name="us-west-2")
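
Rather than editing the Region string by hand, the Region can be read from the environment. The sketch below is one possible approach. It assumes the `AWS_REGION` or `AWS_DEFAULT_REGION` environment variable may be set in your notebook environment, and the helper name `resolve_region` is hypothetical.

```python
import os


def resolve_region(default="us-west-2"):
    """Pick the AWS Region from the environment, falling back to a default."""
    return (
        os.environ.get("AWS_REGION")
        or os.environ.get("AWS_DEFAULT_REGION")
        or default
    )


region = resolve_region()
# The value can then be used as:
# bedrock_client = boto3.client("bedrock-runtime", region_name=region)
```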

Create a function that uses the client above to call the Amazon Titan embeddings model and generate embeddings.

In [7]:
# Defining Bedrock model parameters
modelId = "amazon.titan-embed-text-v1"  # (Change this to try different embedding models)
accept = "application/json"
contentType = "application/json"

#Define Generate Embedding Function
def generate_embedding(text):
    body = json.dumps({"inputText": text})
    response = bedrock_client.invoke_model(
        body=body, modelId=modelId, accept=accept, contentType=contentType
    )
    response_body = json.loads(response.get("body").read())
    embedding = response_body.get("embedding")
    return embedding

Check that the function works. Replace "mouse" with any word of your choice to see the resulting embedding.

In [None]:
# Checking if Bedrock is generating embeddings
generate_embedding("mouse")
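
Because `generate_embedding` reads the result with `response_body.get("embedding")`, it returns None when the response has no such key. A small sanity check, sketched below under the assumption that the model returns 1536-dimensional vectors (the dimension used for the vector index later in this lab), can catch that early. The helper name `check_embedding` is illustrative.

```python
def check_embedding(embedding, expected_dim=1536):
    """Raise ValueError unless the embedding is a list of expected_dim numbers."""
    if not isinstance(embedding, list) or len(embedding) != expected_dim:
        size = len(embedding) if isinstance(embedding, list) else "n/a"
        raise ValueError(f"expected {expected_dim} values, got {size}")
    if not all(isinstance(x, (int, float)) for x in embedding):
        raise ValueError("embedding contains non-numeric values")
    return embedding


# Example with a synthetic vector; in the lab this would wrap
# check_embedding(generate_embedding("mouse")).
vector = check_embedding([0.0] * 1536)
```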

Create embeddings for the 'overview' field of the movie data to enable semantic search. Each record in DocumentDB is then updated with a field called 'embedding_br', which holds the embedding of that record's 'overview' field. After this step, data ingestion and preparation are complete.

In [11]:
# Fetch all documents that have overview field
documents_to_update = list(collection.find({'overview': {"$exists": True}}))

# Define the batch size for processing
batch_size = 100  # You can adjust this based on your requirements

# Process documents in batches
for i in range(0, len(documents_to_update), batch_size):
    batch = documents_to_update[i:i + batch_size]

    # Generate embeddings for the current batch and store it alongside existing data as new field
    for doc in batch:
        doc['embedding_br'] = generate_embedding(doc['overview'])

    # Update the batch of documents
    bulk_operations = [pymongo.ReplaceOne({'_id': doc['_id']}, doc) for doc in batch]
    collection.bulk_write(bulk_operations)

print("Batch processing completed.")

Batch processing completed.
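
The batching logic in the cell above can be isolated into a small helper. The sketch below also shows a filter and update pair that sets only the embedding field instead of replacing the whole document. Both helper names are hypothetical, and the pairs are plain dictionaries of the kind that could be passed to `pymongo.UpdateOne(filter, update)`.

```python
def chunked(items, size):
    """Yield consecutive slices of items, each with at most size elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_set_update(doc_id, embedding):
    """Return (filter, update) documents that set only the embedding field."""
    return {"_id": doc_id}, {"$set": {"embedding_br": embedding}}


# Illustrative data: five fake document ids processed in batches of two.
batches = list(chunked([1, 2, 3, 4, 5], 2))
```

Updating a single field keeps the rest of each document untouched if it changes between the initial `find` and the bulk write.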


Creating a text index on the 'overview' field to allow for keyword searching. For more information see: 
https://docs.aws.amazon.com/documentdb/latest/developerguide/text-search.html

Required Parameter: name

In [13]:
# Creating native text search index

collection.create_index([("overview", "text")], name="text_index")

'text_index'

Define the projection, that is, the fields returned by each search.

In [14]:
projection = {
    "_id": 0,
    "primary_genre": 1,
    "overview": 1,
    "original_title": 1,
}

Create a function that takes a keyword and searches the 'overview' field for it.
Required parameter: keyword

In [15]:
def search_text(keyword):
    results = collection.aggregate([{"$match": {"$text": {"$search": keyword}}},{"$project": projection},{"$limit": 3}])
    return json.dumps(list(results))


Use the function above to search the 'overview' field for a keyword.
Required parameter: keyword

In [16]:
keyword = "romance"
text_results = search_text(keyword)
for item in json.loads(text_results):  # parse the JSON string instead of using eval()
    print(item)

{'primary_genre': 'Comedy', 'original_title': "Breakfast at Tiffany's", 'overview': "Fortune hunter Holly Golightly finds herself captivated by aspiring writer Paul Varjak, who's moved into her building on a wealthy woman's dime. As romance blooms between Paul and Holly, Doc Golightly shows up on the scene, revealing Holly's past."}
{'primary_genre': 'Romance', 'original_title': 'Grease', 'overview': "Australian good girl Sandy and greaser Danny fell in love over the summer. But when they unexpectedly discover they're now in the same high school, will they be able to rekindle their romance despite their eccentric friends?"}
{'primary_genre': 'Drama', 'original_title': 'Elizabeth', 'overview': 'The story of the ascension to the throne and the early reign of Queen Elizabeth the First, the endless attempts by her council to marry her off, the Catholic hatred of her and her romance with Lord Robert Dudley.'}


Create a vector index on the 'embedding_br' field.
Required parameters:

1. similarity
2. name

For more information see: https://docs.aws.amazon.com/documentdb/latest/developerguide/vector-search.html

In [17]:
#Creating HNSW vector search index
similarity = "cosine"  # can switch to "euclidean" or "dotProduct"
collection.create_index([("embedding_br", "vector")],
    vectorOptions= {
        "type": "hnsw", 
        "similarity": similarity,
        "dimensions": 1536,
        "m": 16,
        "efConstruction": 64},
    name="vs_index")

'vs_index'
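
To make the similarity options concrete, the sketch below computes the three measures (cosine similarity, Euclidean distance, and dot product) for small vectors in plain Python. It only illustrates the formulas and is not how DocumentDB evaluates them internally.

```python
import math


def dot_product(a, b):
    """Sum of element-wise products."""
    return sum(x * y for x, y in zip(a, b))


def euclidean_distance(a, b):
    """Straight-line distance between two vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a, b):
    """Dot product divided by the product of the vector lengths."""
    norm_a = math.sqrt(dot_product(a, a))
    norm_b = math.sqrt(dot_product(b, b))
    return dot_product(a, b) / (norm_a * norm_b)
```

Cosine similarity ignores vector length and compares direction only, while Euclidean distance and dot product are both sensitive to magnitude.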

Create a function that uses DocumentDB's vector search to perform semantic search.
Required parameter: keyword/phrase
For more information see: https://docs.aws.amazon.com/documentdb/latest/developerguide/vector-search.html#w7aac21c11c15

In [18]:
#Semantic search function
similarity = "cosine"  # can switch to "euclidean" or "dotProduct" to match the index above
def search_semantic(keyword):
    query = {"vectorSearch" : {"vector" : generate_embedding(keyword), "path": "embedding_br", "similarity": similarity, "k": 3}}
    results = collection.aggregate([{'$search': query},{"$project": projection}])
    return json.dumps(list(results))

Use the function above to run a vector search for a keyword or phrase. Compare the results with the previous keyword search to see how they differ.

Required parameter: keyword/phrase

In [19]:
keyword_or_phrase = "romance" 
vector_search_results = search_semantic(keyword_or_phrase)
for item in json.loads(vector_search_results):  # parse the JSON string instead of using eval()
    print(item)

{'primary_genre': 'Drama', 'original_title': 'Closer', 'overview': 'A witty, romantic, and very dangerous love story about chance meetings, instant attractions, and casual betrayals. Two couples disintegrate when they begin destructive adulterous affairs with each other.'}
{'primary_genre': 'Crime', 'original_title': 'Thief of Hearts', 'overview': "A woman trapped in a boring marriage begins an affair with a handsome man who seems able to read her mind. She doesn't know that he has broken into her house and read her diaries, where she has recorded her deepest thoughts and fantasies."}
{'primary_genre': 'Drama', 'original_title': 'The House of Mirth', 'overview': 'A woman risks losing her chance of happiness with the only man she has ever loved.'}


Create a hybrid search approach that combines the results of keyword search and vector search. The function below keeps the top two semantic results and the top text result, dropping exact duplicates. Compare its results with those of the semantic search and keyword search functions above.

Required parameter: keyword

In [20]:
def search_hybrid(keyword):
    results1 = json.loads(search_semantic(keyword))
    results2 = json.loads(search_text(keyword))
    results1 = results1[:2]
    results2 = results2[:1]
    seen = set()
    combined_results = []
    for d in results1 + results2:
        t = tuple(sorted(d.items()))
        if t not in seen:
            seen.add(t)
            combined_results.append(d)
    return json.dumps(combined_results)

In [21]:
keyword_or_phrase = "romance"
search_hybrid(keyword_or_phrase)

'[{"primary_genre": "Drama", "original_title": "Closer", "overview": "A witty, romantic, and very dangerous love story about chance meetings, instant attractions, and casual betrayals. Two couples disintegrate when they begin destructive adulterous affairs with each other."}, {"primary_genre": "Crime", "original_title": "Thief of Hearts", "overview": "A woman trapped in a boring marriage begins an affair with a handsome man who seems able to read her mind. She doesn\'t know that he has broken into her house and read her diaries, where she has recorded her deepest thoughts and fantasies."}, {"primary_genre": "Comedy", "original_title": "Breakfast at Tiffany\'s", "overview": "Fortune hunter Holly Golightly finds herself captivated by aspiring writer Paul Varjak, who\'s moved into her building on a wealthy woman\'s dime. As romance blooms between Paul and Holly, Doc Golightly shows up on the scene, revealing Holly\'s past."}]'
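
`search_hybrid` merges results by taking a fixed number from each search. A different merging strategy, which this notebook does not use, is reciprocal rank fusion (RRF): each document scores 1 / (k + rank) in every list where it appears, and the scores are summed. The sketch below is a minimal version. The function name, the choice of 'original_title' as the identity key, and k = 60 are assumptions for illustration.

```python
def reciprocal_rank_fusion(result_lists, key="original_title", k=60, top_n=3):
    """Merge ranked result lists by summing 1 / (k + rank) per document."""
    scores = {}
    docs = {}
    for results in result_lists:
        for rank, doc in enumerate(results, start=1):
            ident = doc[key]
            scores[ident] = scores.get(ident, 0.0) + 1.0 / (k + rank)
            docs.setdefault(ident, doc)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [docs[ident] for ident in ranked[:top_n]]


# Illustrative inputs shaped like the parsed search results.
semantic = [{"original_title": "A"}, {"original_title": "B"}, {"original_title": "C"}]
text = [{"original_title": "B"}, {"original_title": "D"}]
fused = reciprocal_rank_fusion([semantic, text])
```

A document that appears in both lists, such as "B" here, rises to the top because it collects a score from each list.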

A UI that lets you compare the different search options side by side. Type your query BEFORE clicking a button: a blank search raises an error. If that happens, type a query and click the search of your choice to proceed.

In [22]:
# Creating a UI with Gradio
def format_output(data):
    movies = json.loads(data)
    html_content = "<ul style='list-style: none; padding: 0;'>"
    for movie in movies:
        html_content += f"<li style='margin-bottom: 20px;'><strong>{movie['original_title']}</strong><br/>{movie['overview']}</li>"
    html_content += "</ul>"
    return html_content

with gr.Blocks(gr.themes.Soft()) as demo:
    with gr.Column():
        gr.Markdown("## Movie Search Demo")
        query_input = gr.Textbox(label="Enter your query here...")
        with gr.Row():
            semantic_btn = gr.Button("Semantic Search")
            text_btn = gr.Button("Text Search")
            hybrid_btn = gr.Button("Hybrid Search")
        output = gr.HTML(label="Search Results")

    def format_semantic(query):
        return format_output(search_semantic(query))

    def format_text(query):
        return format_output(search_text(query))
    
    def format_hybrid(query):
        return format_output(search_hybrid(query))

    semantic_btn.click(format_semantic, inputs=query_input, outputs=output)
    text_btn.click(format_text, inputs=query_input, outputs=output)
    hybrid_btn.click(format_hybrid, inputs=query_input, outputs=output)

demo.launch()

--------


Running on local URL:  http://127.0.0.1:7860
Sagemaker notebooks may require sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Running on public URL: https://af076ffa8a95cbb4dc.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/gradio/queueing.py", line 536, in process_events
    response = await route_utils.call_process_api(
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/gradio/route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/gradio/blocks.py", line 1935, in process_api
    result = await self.call_function(
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/gradio/blocks.py", line 1520, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/anyio/_bac
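
One way to avoid the blank-search error is to check the query before calling a search function. The sketch below does that and also HTML-escapes the movie fields before building the markup. The names `format_output_safe` and `run_search` are hypothetical, and `fake_search` is a stand-in for `search_text`, `search_semantic`, or `search_hybrid`.

```python
import html
import json


def format_output_safe(data):
    """Render a JSON list of movies as an HTML list with escaped text."""
    items = []
    for movie in json.loads(data):
        title = html.escape(str(movie.get("original_title", "")))
        overview = html.escape(str(movie.get("overview", "")))
        items.append(
            f"<li style='margin-bottom: 20px;'><strong>{title}</strong>"
            f"<br/>{overview}</li>"
        )
    return "<ul style='list-style: none; padding: 0;'>" + "".join(items) + "</ul>"


def run_search(search_fn, query):
    """Call search_fn only when the query contains non-whitespace text."""
    if not query or not query.strip():
        return "<p>Please enter a query before searching.</p>"
    return format_output_safe(search_fn(query))


def fake_search(query):
    """Stand-in for the notebook's search functions; returns a JSON string."""
    return json.dumps([{"original_title": "<b>Demo</b>", "overview": query}])
```

In the Gradio callbacks, `run_search(search_semantic, query)` could then replace the direct `format_output(search_semantic(query))` call.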

Integrating our search functions with generative AI to create a more collaborative experience.

In [23]:
# Use the native inference API to chat with database

chat_history = []

def rag_native_api(question, chat_history):

    # Retrieve the most relevant movie records to use as context.
    context = search_semantic(question)
    query = question
    
    # Set the model ID, e.g., Llama 3 8b Instruct.
    model_id = "meta.llama3-8b-instruct-v1:0"

    # Define the prompt for the model.
    prompt = f"Given this text extracts:{context} and also consider the history of this chat {json.dumps(chat_history)} Please answer the following question:{query}"


    # Embed the prompt in Llama 3's instruction format.
    formatted_prompt = f"""
    <|begin_of_text|>
    <|start_header_id|>user<|end_header_id|>
    {prompt}
    <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>
    """

    # Format the request payload using the model's native structure.
    native_request = {
        "prompt": formatted_prompt,
        "max_gen_len": 2048,
        "temperature": 0,
    }

    # Convert the native request to JSON.
    request = json.dumps(native_request)

    try:
        # Invoke the model with the request.
        response = bedrock_client.invoke_model(modelId=model_id, body=request)

    except Exception as e:  # ClientError is a subclass of Exception
        # Return the error to the chat UI instead of calling exit() inside a UI callback.
        return f"ERROR: Can't invoke '{model_id}'. Reason: {e}"

    # Decode the response body.
    model_response = json.loads(response["body"].read())

    # Extract and return the response text.
    response_text = model_response["generation"]
    return response_text
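
The triple-quoted prompt in the cell above carries its leading indentation and line breaks into the text sent to the model, and the full chat history is included on every call. The sketch below builds the same kind of Llama 3 instruct prompt without the extra whitespace and keeps only the most recent turns. The function name `build_llama3_prompt` and the `max_turns` parameter are assumptions, and whether trimming history suits your use case is a judgment call.

```python
import json


def build_llama3_prompt(context, question, chat_history, max_turns=3):
    """Assemble a single-turn Llama 3 instruct prompt with bounded chat history."""
    recent_history = chat_history[-max_turns:] if max_turns > 0 else []
    user_message = (
        f"Given these text extracts: {context}\n"
        f"and the recent history of this chat: {json.dumps(recent_history)}\n"
        f"please answer the following question: {question}"
    )
    return (
        "<|begin_of_text|>"
        "<|start_header_id|>user<|end_header_id|>\n\n"
        f"{user_message}"
        "<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )


# Illustrative call with a made-up history of [user, assistant] pairs.
example_prompt = build_llama3_prompt(
    context='[{"original_title": "Grease"}]',
    question="Which of these is a musical?",
    chat_history=[["hi", "hello"], ["any romance?", "Yes, several."]],
)
```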




Initialize the chatbot UI.

In [24]:
with gr.Blocks() as demo:
    gr.Markdown(
    """
    # Amazon DocumentDB Powered Chatbot Demo Powered by HNSW Index
    """)
    gr.ChatInterface(rag_native_api)
demo.launch()

--------


Running on local URL:  http://127.0.0.1:7861
Sagemaker notebooks may require sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Running on public URL: https://1a4921f3a642e0cc7a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




Create the same RAG capabilities using LangChain.
For more information see: https://python.langchain.com/docs/integrations/llms/bedrock/

In [25]:
llm = ChatBedrockConverse(model="meta.llama3-8b-instruct-v1:0", client=bedrock_client, temperature=0, max_tokens=None)

In [26]:
# Use the langchain API to chat with database

chat_history = []

def rag_langchain(question, chat_history):
    context_docs = search_semantic(question)
    
    # Create a PromptTemplate for the user's question
    question_prompt_template = PromptTemplate(
        input_variables=["context", "query", "chat_history"],
        template="Given this text extracts:\n-----\n{context}\n-----\n and also consider the history of this chat {chat_history}\nPlease answer the following question: {query}",)

    # Create an LLMChain
    llm_chain = LLMChain(prompt=question_prompt_template, llm=llm)

    # Prepare the input for the LLMChain
    input_data = {
        "context": context_docs,
        "query": question,
        "chat_history": chat_history,
    }
    
    # Run the LLMChain
    output = llm_chain.invoke(input_data)['text']
    
    return output
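
The `chat_history` value is placed into the prompt template as-is, so the model sees its Python representation. The sketch below flattens the history into readable lines first. It assumes the history arrives either as [user, assistant] pairs or as dictionaries with 'role' and 'content' keys, which depends on the Gradio version and configuration. The helper name `history_to_text` is hypothetical.

```python
def history_to_text(chat_history):
    """Flatten chat history into 'role: message' lines."""
    lines = []
    for turn in chat_history:
        if isinstance(turn, dict):
            # Message-style entry: {"role": ..., "content": ...}
            lines.append(f"{turn.get('role', 'unknown')}: {turn.get('content', '')}")
        else:
            # Pair-style entry: [user_message, assistant_message]
            user_msg, assistant_msg = turn
            lines.append(f"user: {user_msg}")
            if assistant_msg is not None:
                lines.append(f"assistant: {assistant_msg}")
    return "\n".join(lines)


# Same template text as the cell above, filled with plain str.format.
TEMPLATE = (
    "Given this text extracts:\n-----\n{context}\n-----\n and also consider "
    "the history of this chat {chat_history}\n"
    "Please answer the following question: {query}"
)
filled = TEMPLATE.format(
    context="[]",
    chat_history=history_to_text([["hi", "hello"]]),
    query="Any romance movies?",
)
```

In `rag_langchain`, the result of `history_to_text(chat_history)` could be passed as the "chat_history" input in place of the raw list.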

In [27]:
# Creating UI for your Chatbot
with gr.Blocks() as demo:
    gr.Markdown(
    """
    # Amazon DocumentDB Powered Chatbot Demo Powered by HNSW Index
    """)
    gr.ChatInterface(rag_langchain)
demo.launch()

--------


Running on local URL:  http://127.0.0.1:7862
Sagemaker notebooks may require sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Running on public URL: https://6d6519c70d7944183c.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)




