This notebook was tested on an `ml.t3.medium` instance with the SageMaker `Data Science 3` image in a Studio notebook.

<img src="chatbots.png" width="800"/>

This sample notebook implements a general-purpose chatbot.
Key functionalities include:
1. Saving conversation history in DynamoDB.
2. Handling document uploads in the supported formats (PDF, JPG, CSV, EXCEL, PNG, TXT, JSON) by passing the document's local or S3 path.
3. Using prompt templates stored locally (they can also be stored in S3).

Install required packages

In [None]:
!pip install anthropic
!pip install s3fs -U
!pip install pandas -U
!pip install --force-reinstall amazon-textract-textractor==1.7.1

In [None]:
import boto3
from anthropic import Anthropic
from botocore.config import Config
import shutil
import os
import pandas as pd
import time
import json
import base64
import io
import re
import numpy as np
import openpyxl
from openpyxl.cell import Cell
from openpyxl.worksheet.cell_range import CellRange
import uuid
from botocore.exceptions import ClientError
from textractor import Textractor
from textractor.visualizers.entitylist import EntityList
from textractor.data.constants import TextractFeatures
from textractor.data.text_linearization_config import TextLinearizationConfig

#### Initialize Bedrock Runtime

In [None]:
# Create the bedrock runtime to invoke LLM
from botocore.config import Config
config = Config(
    read_timeout=600, # Read timeout parameter
    retries = dict(
        max_attempts = 10 ## Handle retries
    )
)
import boto3
bedrock_runtime = boto3.client(service_name='bedrock-runtime',region_name='us-west-2',config=config)

Configurable:
- `DYNAMODB_TABLE`: The name of the DynamoDB table used for storing chat history.
- `DYNAMODB_USER`: The default user ID for the application (used if authentication is disabled).
- `BUCKET`: The name of the S3 bucket used for caching documents and extracted text.
- `CHAT_HISTORY_LENGTH`: The number of recent chat messages to load from the DynamoDB table.
- `bedrock-region`: The AWS region where the Bedrock runtime is deployed.
- `LOAD_DOC_IN_ALL_CHAT_CONVO`: A boolean flag indicating whether to load documents in the chat history.
- `S3_DOC_CACHE_PATH`: The S3 prefix under which documents uploaded from the local system are stored.
- `TEXTRACT_RESULT_CACHE_PATH`: The S3 prefix used to cache text extracted from PDFs and images.

In [None]:
DYNAMODB_TABLE="SessionChatHistory" 
DYNAMODB_USER= "test-user-sonnet10"
SESSIONID=str(uuid.uuid4())
DYNAMODB  = boto3.resource('dynamodb')
dynamo=boto3.client('dynamodb')
chat_hist=[]
BUCKET="fairstone"
S3_DOC_CACHE_PATH='uploads'
TEXTRACT_RESULT_CACHE_PATH="textract_output"
LOAD_DOC_IN_ALL_CHAT_CONVO=False
CHAT_HISTORY_LENGTH=5
S3=boto3.client('s3')

#### Create DynamoDB Table
A DynamoDB table is created with the user ID as the partition key and the session ID as the sort key.
This allows multiple chat sessions to be saved under the same user ID.\
Provide a bucket name that will be used to cache Amazon Textract results for document OCR.

In [None]:
import boto3
try:
    table = DYNAMODB.create_table(
        TableName=DYNAMODB_TABLE,
        KeySchema=[
            {
                'AttributeName': 'UserId',  # Partition key
                'KeyType': 'HASH'  
            },
            {
                'AttributeName': 'SessionId',   # Sort key
                'KeyType': 'RANGE'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'UserId',
                'AttributeType': 'S'   # String data type
            },
            {
                'AttributeName': 'SessionId',
                'AttributeType': 'S'
            },
        ],
        BillingMode='PAY_PER_REQUEST'  # On-demand billing
    )

    print("Table status:", table.table_status)

    # Wait until the table exists.
    table.meta.client.get_waiter("table_exists").wait(TableName=DYNAMODB_TABLE)
    print(table.item_count)
except dynamo.exceptions.ResourceInUseException as e:
    print(e.response['Error']['Message'])

#### Utility Functions

This function reads an Excel file from the specified S3 bucket using the provided S3 URI.
   It loads the workbook using openpyxl, unmerges any merged cells, and copies their values
   to individual cells. The worksheet data is then converted to a pandas DataFrame, and the
   `strip_newline` function is applied to each cell value to remove newline characters.
   Finally, the DataFrame is converted to a CSV string with pipe (|) as the delimiter and
   returned.
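
The first step described above, splitting the S3 URI into a bucket and a key, recurs throughout this notebook. A minimal sketch of that step as a standalone helper follows. The name `parse_s3_uri` and the example URI are hypothetical and not part of the notebook; the regular expression is the same one the notebook uses.

```python
import re

# Same pattern the notebook uses to split "s3://bucket/key" into its parts.
S3_URI_PATTERN = re.compile(r"s3://(.+?)/(.+)")

def parse_s3_uri(uri):
    """Return (bucket, key) for an S3 URI, or raise ValueError if it is malformed."""
    match = S3_URI_PATTERN.match(uri)
    if match is None:
        raise ValueError(f"Not a valid S3 URI: {uri!r}")
    return match.group(1), match.group(2)

# Hypothetical URI, used only for illustration.
bucket, key = parse_s3_uri("s3://example-bucket/uploads/report.xlsx")
print(bucket, key)
```

Raising on a malformed URI surfaces the problem at the parsing step instead of later, when the S3 object is first used.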

In [None]:
def strip_newline(cell):
    return str(cell).strip()

def table_parser_utills(file):        
    # Read from S3
    s3 = boto3.client('s3')
    match = re.match("s3://(.+?)/(.+)", file)
    if match:
        bucket_name = match.group(1)
        key = match.group(2)    
        obj = s3.get_object(Bucket=bucket_name, Key=key)  
    # Read Excel file from S3 into a buffer
    xlsx_buffer = io.BytesIO(obj['Body'].read())
    xlsx_buffer.seek(0) 
    # Load workbook, get active worksheet
    wb = openpyxl.load_workbook(xlsx_buffer)
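    worksheet = wb.active    
    # Snapshot the merged ranges first, because unmerging mutates worksheet.merged_cells
    all_merged_cell_ranges = list(worksheet.merged_cells.ranges)
    for merged_cell_range in all_merged_cell_ranges: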
        merged_cell: Cell = merged_cell_range.start_cell
        worksheet.unmerge_cells(range_string=merged_cell_range.coord)
        for row_index, col_index in merged_cell_range.cells:
            cell: Cell = worksheet.cell(row=row_index, column=col_index)
            cell.value = merged_cell.value
    # determine table header index
    df = pd.DataFrame(worksheet.values)
    df=df.map(strip_newline)  
    return df.to_csv(sep="|", index=False,header=0)

 Retrieves a list of object keys from an S3 bucket with the specified prefix.
 
   This function uses the boto3 library to interact with AWS S3. It lists the objects
   in the specified bucket that have keys starting with the given prefix. The function
   extracts the object keys, removes the prefix from each key, and appends the resulting
   names to a list. If no objects are found with the specified prefix, an empty list is
   returned.
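
A single `list_objects_v2` call returns at most 1,000 keys, so a cache prefix that grows beyond that needs pagination. The sketch below shows one way to do this with the boto3 paginator. The function name `list_key_names` is hypothetical, and the client is passed in as a parameter so the logic can be exercised without AWS access.

```python
def list_key_names(s3_client, bucket, prefix):
    """Collect object names under `prefix`, with the prefix stripped, across all result pages."""
    names = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix.
        for obj in page.get("Contents", []):
            names.append(obj["Key"][len(prefix):])
    return names
```

In the notebook this could be called as `list_key_names(boto3.client("s3"), BUCKET, f"{TEXTRACT_RESULT_CACHE_PATH}/")`, assuming the configuration cell has been run.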


In [None]:
def get_s3_keys(prefix):
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix)
    keys=[]
    if "Contents" in response:
        for obj in response['Contents']:
            key = obj['Key']
            name = key[len(prefix):]
            keys.append(name)
    return keys

Extracts text from a PDF or image file using AWS Textract.

   This function checks if the extracted text content for the given file is already cached in S3.
   If cached, it retrieves the text from S3 and returns it. If not cached, it uses the Textractor
   library to extract the text from the file using AWS Textract.

   For PDF files, it makes an asynchronous call to start_document_analysis(), which may result in
   some wait time. For other file types (e.g., images), it makes a synchronous call to analyze_document().

   The extracted text is then processed using a TextLinearizationConfig to customize the output format.
   The resulting text is uploaded to S3 for caching and then returned.
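
The caching behaviour described above is a cache-aside pattern: look up the result by file name, and only call the expensive extractor on a miss. The sketch below illustrates the idea with a plain dict standing in for the S3 cache prefix and a stub standing in for Textract. The names `cached_extract` and `fake_extract` are hypothetical.

```python
import os

def cached_extract(file_path, cache, extract_fn):
    """Return cached text for `file_path` if present; otherwise extract, store, and return it."""
    cache_key = f"{os.path.basename(file_path)}.txt"
    if cache_key in cache:
        return cache[cache_key]
    text = extract_fn(file_path)
    cache[cache_key] = text
    return text

# A dict stands in for the S3 cache prefix, and a stub stands in for Textract.
calls = []

def fake_extract(path):
    calls.append(path)
    return f"text of {path}"

cache = {}
first = cached_extract("s3://example-bucket/uploads/invoice.pdf", cache, fake_extract)
second = cached_extract("s3://example-bucket/uploads/invoice.pdf", cache, fake_extract)
print(first == second, len(calls))  # the extractor ran only once
```

Because the cache key is derived from the base name only, two different files with the same name in different folders would share one cache entry. Including the full path in the key would avoid that.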

In [None]:
def exract_pdf_text_aws(file):    
    file_base_name=os.path.basename(file)
    # Checking if extracted doc content is in S3
    if [x for x in get_s3_keys(f"{TEXTRACT_RESULT_CACHE_PATH}/") if file_base_name in x]:      
        response = S3.get_object(Bucket=BUCKET, Key=f"{TEXTRACT_RESULT_CACHE_PATH}/{file_base_name}.txt")
        text = response['Body'].read().decode('utf-8')  # decode so cached and fresh results are both str
        return text
    else:
        dir_name, ext = os.path.splitext(file)
        extractor = Textractor(region_name="us-east-1")
        # Asynchronous call, you will experience some wait time. Try caching results for better experience
        if "pdf" in ext:
            print("Asynchronous call, you may experience some wait time.")
            document = extractor.start_document_analysis(
            file_source=file,
            features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],       
            save_image=False,   
            s3_output_path=f"s3://{BUCKET}/{TEXTRACT_RESULT_CACHE_PATH}/"
        )
        #Synchronous call
        else:
            document = extractor.analyze_document(
            file_source=file,
            features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],  
            save_image=False,
        )
        config = TextLinearizationConfig(
        hide_figure_layout=True,   
        hide_header_layout=False,    
        table_prefix="<table>",
        table_suffix="</table>",
        )
        # Upload extracted content to s3
        S3.put_object(Body=document.get_text(config=config), Bucket=BUCKET, Key=f"{TEXTRACT_RESULT_CACHE_PATH}/{file_base_name}.txt") 
        return document.get_text(config=config)
    

In [None]:
def get_s3_obj_from_bucket_(file):
    """Retrieves an object from an S3 bucket given its S3 URI.
    Args:
       file (str): The S3 URI of the object to retrieve, in the format "s3://{bucket_name}/{key}".
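   Returns:
       dict: The `get_object` response; the content is a `StreamingBody` under the 'Body' key.
   Raises:
       ValueError: If `file` is not a valid S3 URI.
    """
    s3 = boto3.client('s3')
    match = re.match("s3://(.+?)/(.+)", file)
    if not match:
        raise ValueError(f"Not a valid S3 URI: {file}")
    bucket_name = match.group(1)
    key = match.group(2)
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    return obj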

def put_obj_in_s3_bucket_(docs):
    """Uploads a file to an S3 bucket and returns the S3 URI of the uploaded object.
    Args:
       docs (str): The local file path of the file to upload to S3.
   Returns:
       str: The S3 URI of the uploaded object, in the format "s3://{bucket_name}/{file_path}".
    """
    file_name=os.path.basename(docs)
    file_path=f"{S3_DOC_CACHE_PATH}/{file_name}"
    S3.upload_file(docs, BUCKET, file_path)
    return f"s3://{BUCKET}/{file_path}"

Handles the processing of uploaded documents or files from S3 based on their file extension.

   This function takes a file path or S3 URI as input and processes the file based on its extension.
   It supports the following file types:
   - PDF, PNG, JPG: Extracts text from the file using the `exract_pdf_text_aws` function.
   - CSV: Reads the file using pandas' `read_csv` function.
   - XLSX, XLX: Parses the file using the `table_parser_utills` function.
   - JSON: Retrieves the object from S3 using the `get_s3_obj_from_bucket_` function and loads the JSON content.
   - TXT, PY: Retrieves the object from S3 using the `get_s3_obj_from_bucket_` function and reads the content.

   The function can be extended to handle additional file extensions by implementing the corresponding logic.
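
One way to keep the extension handling easy to extend is a lookup table from extension to handler instead of a chain of `if`/`elif` branches. The sketch below shows the idea. The function `pick_handler` and the handler labels are hypothetical; in the notebook the table values would be the actual parsing functions.

```python
import os

def pick_handler(path, handlers):
    """Return the handler registered for the file's extension (case-insensitive)."""
    _, ext = os.path.splitext(path)
    try:
        return handlers[ext.lower()]
    except KeyError:
        raise ValueError(f"Unsupported file extension: {ext!r}") from None

# Hypothetical handler labels; the real table would map to the parsing functions.
HANDLERS = {
    ".pdf": "ocr", ".png": "ocr", ".jpg": "ocr",
    ".csv": "csv",
    ".xlsx": "excel",
    ".json": "json",
    ".txt": "text", ".py": "text",
}

print(pick_handler("s3://example-bucket/uploads/Report.PDF", HANDLERS))
```

Supporting a new format then means adding one entry to the table, and an unknown extension fails with a clear error.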


In [None]:
def handle_doc_upload_or_s3(file):
    dir_name, ext = os.path.splitext(file)
    if  ext in [".pdf", ".png", ".jpg"]:   
        content=exract_pdf_text_aws(file)
    elif "csv"  in ext:
        content= pd.read_csv(file)
    elif ext in [".xlsx", ".xlx"]:
        content=table_parser_utills(file)   
    elif  "json" in ext:      
        obj=get_s3_obj_from_bucket_(file)
        content = json.loads(obj['Body'].read())  
    elif  ext in [".txt",".py"]:       
        obj=get_s3_obj_from_bucket_(file)
        content = obj['Body'].read()
    else:
        # Add logic for other file extensions here
        raise ValueError(f"Unsupported file extension: {ext}")
    return content

Stores long-term chat history in DynamoDB.

   This function takes a dictionary of messages and stores it in a DynamoDB table.
   It uses the user ID and session ID as the primary key to identify the item in the table.
   If an item with the same user ID and session ID already exists in the table, the function
   retrieves the existing messages and appends the new messages to the list.
   Finally, it puts the updated chat item back into the DynamoDB table.
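
The read-then-write sequence described above can lose a message if two writers update the same session at the same time. As an alternative sketch, DynamoDB's `list_append` and `if_not_exists` update functions can append to the list in a single `UpdateItem` call. The function name `append_message` is hypothetical, and `table` is assumed to be a boto3 `Table` resource such as `DYNAMODB.Table(DYNAMODB_TABLE)`.

```python
def append_message(table, user_id, session_id, message):
    """Append one message to the session item in a single UpdateItem call."""
    return table.update_item(
        Key={"UserId": user_id, "SessionId": session_id},
        # Create the list on first write, otherwise append to the existing one.
        UpdateExpression="SET #m = list_append(if_not_exists(#m, :empty), :new), #t = :time",
        # Name placeholders avoid any clash with DynamoDB reserved words.
        ExpressionAttributeNames={"#m": "messages", "#t": "time"},
        ExpressionAttributeValues={
            ":empty": [],
            ":new": [message],
            ":time": message["time"],
        },
    )
```

This also avoids reading the full history back on every turn, which matters as a session grows.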


In [None]:
def put_db(messages):
    """Store long term chat history in DynamoDB"""    
    chat_item = {
        "UserId": DYNAMODB_USER, # user id
        "SessionId": SESSIONID, # User session id
        "messages": [messages],  # 'messages' is a list of dictionaries
        "time":messages['time']
    }
    existing_item = DYNAMODB.Table(DYNAMODB_TABLE).get_item(Key={"UserId": DYNAMODB_USER, "SessionId":SESSIONID})
    if "Item" in existing_item:
        existing_messages = existing_item["Item"]["messages"]
        chat_item["messages"] = existing_messages + [messages]
    response = DYNAMODB.Table(DYNAMODB_TABLE).put_item(
        Item=chat_item
    )    

Retrieves chat history from DynamoDB and prepares it for the conversation.

   This function retrieves the chat history from DynamoDB based on the provided `chat_histories`
   and `cutoff` parameters. It processes the chat history and prepares it for the conversation
   based on the `claude3` flag and the `LOAD_DOC_IN_ALL_CHAT_CONVO` configuration.
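
Leaving documents and images aside, the core of this step is slicing the last `cutoff` turns and expanding each one into a user message followed by an assistant message. A minimal sketch with made-up records follows. The function name `to_messages` is hypothetical.

```python
def to_messages(records, cutoff):
    """Turn the last `cutoff` stored turns into alternating user/assistant messages."""
    messages = []
    for turn in records[-cutoff:]:
        messages.append({"role": "user", "content": [{"type": "text", "text": turn["user"]}]})
        messages.append({"role": "assistant", "content": turn["assistant"]})
    return messages

# Made-up history records with the same "user"/"assistant" fields the notebook stores.
stored = [
    {"user": "Hi", "assistant": "Hello!"},
    {"user": "What is S3?", "assistant": "An object store."},
    {"user": "And DynamoDB?", "assistant": "A key-value database."},
]
for m in to_messages(stored, cutoff=2):
    print(m["role"])
```

Note that a cutoff of 0 selects every record, because `records[-0:]` is the full list, so a value of 0 does not disable history.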

In [None]:
def get_chat_history_db(chat_histories, cutoff,claude3):
    current_chat=[]
    chat_hist=chat_histories['Item']['messages'][-cutoff:]            
    for d in chat_hist:
        if d['image'] and claude3 and LOAD_DOC_IN_ALL_CHAT_CONVO:
            content=[]
            for img in d['image']:
                s3 = boto3.client('s3')
                match = re.match("s3://(.+?)/(.+)", img)
                image_name=os.path.basename(img)
                _,ext=os.path.splitext(image_name)
                if "jpg" in ext: ext=".jpeg"                        
                if match:
                    bucket_name = match.group(1)
                    key = match.group(2)    
                    obj = s3.get_object(Bucket=bucket_name, Key=key)
                    base_64_encoded_data = base64.b64encode(obj['Body'].read())
                    base64_string = base_64_encoded_data.decode('utf-8')                        
                content.extend([{"type":"text","text":image_name},{
                  "type": "image",
                  "source": {
                    "type": "base64",
                    "media_type": f"image/{ext.lower().replace('.','')}",
                    "data": base64_string
                  }
                }])
            content.extend([{"type":"text","text":d['user']}])
            current_chat.append({'role': 'user', 'content': content})
        elif d['document'] and LOAD_DOC_IN_ALL_CHAT_CONVO:
            doc='Here are the documents:\n'
            for docs in d['document']:
                uploads=handle_doc_upload_or_s3(docs)
                doc_name=os.path.basename(docs)
                doc+=f"<{doc_name}>\n{uploads}\n</{doc_name}>\n"
            if not claude3 and d["image"]:
                for docs in d['image']:
                    uploads=handle_doc_upload_or_s3(docs)
                    doc_name=os.path.basename(docs)
                    doc+=f"<{doc_name}>\n{uploads}\n</{doc_name}>\n"
            current_chat.append({'role': 'user', 'content': [{"type":"text","text":doc+d['user']}]})
        else:
            current_chat.append({'role': 'user', 'content': [{"type":"text","text":d['user']}]})
        current_chat.append({'role': 'assistant', 'content': d['assistant']})  
    return current_chat, chat_hist

Processes the streamed response from the Bedrock model and extracts the generated text.

Invokes the Bedrock Claude model with the provided chat history, system message, prompt, and optional image(s).

In [None]:
def bedrock_streemer(response):
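    stream = response.get('body')
    answer = ""
    # Default the token counts so the return below works even if no metrics event arrives
    input_tokens, output_tokens = 0, 0
    i = 1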
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if  chunk:
                chunk_obj = json.loads(chunk.get('bytes').decode())
                if "delta" in chunk_obj:                    
                    delta = chunk_obj['delta']
                    if "text" in delta:
                        text=delta['text'] 
                        print(text, end="")
                        answer+=str(text)       
                        i+=1
                if "amazon-bedrock-invocationMetrics" in chunk_obj:
                    input_tokens= chunk_obj['amazon-bedrock-invocationMetrics']['inputTokenCount']
                    output_tokens=chunk_obj['amazon-bedrock-invocationMetrics']['outputTokenCount']
                    print(f"\nInput Tokens: {input_tokens}\nOutput Tokens: {output_tokens}")
    return answer,input_tokens, output_tokens

def bedrock_claude_(chat_history,system_message, prompt,model_id,image_path=None):
    content=[{
        "type": "text",
        "text": prompt
            }]
    if image_path:       
        if not isinstance(image_path, list):
            image_path=[image_path]      
        for img in image_path:
            s3 = boto3.client('s3')
            match = re.match("s3://(.+?)/(.+)", img)
            image_name=os.path.basename(img)
            _,ext=os.path.splitext(image_name)
            if "jpg" in ext: ext=".jpeg"                        
            if match:
                bucket_name = match.group(1)
                key = match.group(2)    
                obj = s3.get_object(Bucket=bucket_name, Key=key)
                base_64_encoded_data = base64.b64encode(obj['Body'].read())
                base64_string = base_64_encoded_data.decode('utf-8')
            content.extend([{"type":"text","text":image_name},{
              "type": "image",
              "source": {
                "type": "base64",
                "media_type": f"image/{ext.lower().replace('.','')}",
                "data": base64_string
              }
            }])
    chat_history.append({"role": "user",
            "content": content})
    prompt = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1500,
        "temperature": 0.5,
        "system":system_message,
        "messages": chat_history
    }
    answer = ""
    prompt = json.dumps(prompt)
    response = bedrock_runtime.invoke_model_with_response_stream(body=prompt, modelId=model_id, accept="application/json", contentType="application/json")
    answer,input_tokens,output_tokens=bedrock_streemer(response) 
    return answer, input_tokens, output_tokens

Invokes the Bedrock Claude model with retries and exponential backoff in case of throttling errors.
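
To see what the backoff schedule looks like, the delay formula can be pulled out into a small function. The sketch below uses the same values as the notebook (base 2, cap of 3 seconds, up to 1 second of random jitter). The function name `backoff_delay` and its `jitter` parameter are hypothetical; the parameter exists only so the randomness can be replaced in tests.

```python
import random

def backoff_delay(retries, base=2, max_backoff=3, jitter=random.uniform):
    """Seconds to sleep before retry number `retries` (0-based), capped at `max_backoff`."""
    return min(max_backoff, base ** retries + jitter(0, 1))

for attempt in range(5):
    print(attempt, round(backoff_delay(attempt), 2))
```

With these values the delay is between 1 and 2 seconds for the first retry, between 2 and 3 for the second, and exactly 3 from the third retry onward, because `2 ** 2` already exceeds the cap. Raising `max_backoff` would let the later retries spread out further.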


In [None]:
import random  # needed for the jitter added to the backoff delay

def _invoke_bedrock_with_retries(current_chat, chat_template, question, model_id, image_path):
    max_retries = 5
    backoff_base = 2
    max_backoff = 3  # Maximum backoff time in seconds
    retries = 0

    while True:
        try:
            response,input_tokens,output_tokens = bedrock_claude_(current_chat, chat_template, question, model_id, image_path)
            return response,input_tokens,output_tokens
        except ClientError as e:
            if e.response['Error']['Code'] == 'ThrottlingException':
                if retries < max_retries:
                    # Throttling, exponential backoff
                    sleep_time = min(max_backoff, backoff_base ** retries + random.uniform(0, 1))
                    time.sleep(sleep_time)
                    retries += 1
                else:
                    raise e
            else:
                # Some other API error, rethrow
                raise
                

#### Chat Function

Conducts a conversation with the Bedrock Claude model based on the user's question and optional uploaded documents.

   This function takes a user's question and a list of document paths (optional) as input. It retrieves the past chat
   history from DynamoDB (if configured) or uses local memory storage. It prepares the chat template based on whether
   documents are provided or not. If documents are provided, it handles the document uploads and extracts the text
   content. If the Claude3 model is used, it handles images separately. The function then invokes the Bedrock Claude
   model with retries and exponential backoff in case of throttling errors. The conversation history is stored in
   DynamoDB (if configured) or local memory for future reference.
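
When documents are provided, their text is wrapped in tags named after each file and placed in front of the question. The sketch below isolates that assembly step. The function name `build_doc_prompt` and the sample document are hypothetical, and the document text is passed in directly instead of being loaded from S3.

```python
import os

def build_doc_prompt(documents, question):
    """Wrap each document's text in tags named after its file, then append the question."""
    prompt = "Here are the documents:\n"
    for path, text in documents.items():
        name = os.path.basename(path)
        prompt += f"<{name}>\n{text}\n</{name}>\n"
    return prompt + question

# Hypothetical document path and content, used only for illustration.
print(build_doc_prompt(
    {"s3://example-bucket/uploads/config.json": '{"debug": true}'},
    "Describe this app",
))
```

Naming the tags after the files lets the model refer to a specific document in its answer when several are attached.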


In [None]:
from typing import List
def conversation_bedroc_chat_(question, model_id,upload_doc: List[str]):
    """
    Function takes a user query and a document path (from S3 or Local)
    passing a document path is optional
    """    
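    # Use the module-level list as in-memory history when DynamoDB is disabled;
    # without this declaration the later assignments make chat_hist a local name.
    global chat_hist
    num_retries=0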
    if not isinstance(upload_doc, list):
        raise TypeError("documents must be in a list format")
        
    # Check if Claude3 model is used and handle images with the CLAUDE3 Model
    claude3=False
    if "sonnet" in model_id or "haiku" in model_id:
        claude3=True
    current_chat=[]
   
    # Retrieve past chat history from Dynamodb
    if DYNAMODB_TABLE:
        chat_histories = DYNAMODB.Table(DYNAMODB_TABLE).get_item(Key={"UserId": DYNAMODB_USER, "SessionId":SESSIONID})
        if "Item" in chat_histories:            
            current_chat,chat_hist=get_chat_history_db(chat_histories, CHAT_HISTORY_LENGTH,claude3)
        else:
            chat_hist=[]
    else:
        for d in chat_hist:
            current_chat.append({'role': 'user', 'content': d['user']})
            current_chat.append({'role': 'assistant', 'content': d['assistant']})
    ## prompt template for when a user uploads a doc
    doc_path=[]
    image_path=[]
    doc=""
    if upload_doc:  
        doc='Here are the documents:\n'
        for ids,docs in enumerate(upload_doc):
            _,extensions=os.path.splitext(docs)
            if not docs.startswith("s3://"):
                docs=put_obj_in_s3_bucket_(docs)
            if extensions in [".jpg",".jpeg",".png",".gif",".webp"] and claude3:       
                image_path.append(docs)
                continue
            uploads=handle_doc_upload_or_s3(docs)             
            doc_path.append(docs)
            doc_name=os.path.basename(docs)
            doc+=f"<{doc_name}>\n{uploads}\n</{doc_name}>\n"
        with open("prompt/doc_chat.txt","r") as f:
            chat_template=f.read()       
    else:        
        # Chat template for open ended query
        with open("prompt/chat.txt","r") as f:
            chat_template=f.read()    
    response,input_tokens,output_tokens=_invoke_bedrock_with_retries(current_chat, chat_template, doc+question, model_id, image_path)
    chat_history={"user":question,
    "assistant":response,
    "image":image_path,
    "document":doc_path,
    "modelID":model_id,
    "time":str(time.time()),
    "input_token":round(input_tokens) ,
    "output_token":round(output_tokens)}         
                 
    # Store conversation memory in the DynamoDB table
    if DYNAMODB_TABLE:
        put_db(chat_history)
    # use local memory for storage
    else:
        chat_hist.append(chat_history)   
    return response

#### Query the chatbot with your questions.
The function also accepts document path(s), either local or in S3. When a document path is passed, a different prompt template is used.

In [None]:
question="""Describe this app"""
model_id="anthropic.claude-3-haiku-20240307-v1:0"  # alternatives: "anthropic.claude-3-sonnet-20240229-v1:0", "anthropic.claude-v2"
docu=["bedrock-chat.py","config.json","pricing.json"]
res=conversation_bedroc_chat_(question, model_id,docu)