In [1]:
import boto3
import json
import time
import os
import pandas as pd
from datetime import datetime
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# The following libraries are imported for RAGAS and LANGFUSE Implementation
import pprint
from botocore.client import Config
from langchain.llms.bedrock import Bedrock
from langchain.embeddings import BedrockEmbeddings
from langchain.retrievers.bedrock import AmazonKnowledgeBasesRetriever

# Pretty-printer with 2-space indentation for inspecting nested responses
# (not referenced in the cells shown here).
pp = pprint.PrettyPrinter(indent=2)

In [2]:
# AWS region, taken from the default boto3 session (AWS config / environment).
# If no default region is configured, fall back to us-east-1, the region of the
# AOSS collection endpoint used below, so client creation and SigV4 signing do
# not fail on a missing region.
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name or "us-east-1"

# Bedrock runtime client used by the embedding (invoke_model) and
# text-generation (invoke_llm_model) helpers.
bedrock_runtime_client = boto3_session.client('bedrock-runtime', region_name=region_name)

In [3]:
# OpenSearch Serverless (AOSS) endpoint and index. Both can be overridden through
# the environment; the defaults are the collection this notebook was built against.
AossHost = os.environ.get("AOSS_HOST", "7i179yervs3eanlga0sh.us-east-1.aoss.amazonaws.com")
index = os.environ.get("AOSS_INDEX", "bedrock-index")

# SigV4 request signing for the AOSS service. Reuse the session created for the
# Bedrock client instead of building a second boto3 Session.
service = 'aoss'
credentials = boto3_session.get_credentials()
if credentials is None:
    # Fail here with a clear message rather than later with an opaque signing error.
    raise RuntimeError("No AWS credentials found; configure credentials before signing AOSS requests.")
awsauth = AWSV4SignerAuth(credentials, region_name, service)

In [4]:
# The directory to save results
Rslts_Save_Dir = "/home/ec2-user/SageMaker/AscendNotebook/CodeExecutionMetrics/"

print("Present working directory:", os.getenv('PWD'))
print("Directory to save results:", Rslts_Save_Dir)

Present working directory: /home/ec2-user/SageMaker/AscendNotebook/Testing_RAG_LLMs
Directory to save results: /home/ec2-user/SageMaker/AscendNotebook/CodeExecutionMetrics/


In [5]:
# get keys for your project from https://cloud.langfuse.com
LANGFUSE_PUBLIC_KEY = "pk-lf-f7b59b69-3122-4669-ac56-ac1a566027fb" #replace it with your public key
LANGFUSE_SECRET_KEY = "sk-lf-e399949a-a9fd-4730-90d2-3281a0ed23e4" #replace it with you secret key
LANGFUSE_HOST="https://us.cloud.langfuse.com"

In [6]:
# Bedrock runtime client for the LangChain wrappers. It is pinned to the same
# region as `bedrock_runtime_client`, so every Bedrock call targets one region.
bedrock_client = boto3.client('bedrock-runtime', region_name=region_name)

# Sampling parameters for the Claude text-completion models.
model_kwargs_claude = {
    "temperature": 0.1,
    "top_k": 10,
    "max_tokens_to_sample": 3000
}

# Each wrapper gets its own copy of the kwargs so a change made through one model
# cannot silently leak into the other.
# NOTE(review): llm_for_text_generation is not used by the cells shown here; the
# RAG answer is generated by invoke_llm_model with RAG_model_id.
llm_for_text_generation = Bedrock(model_id="anthropic.claude-instant-v1",
                                  model_kwargs=dict(model_kwargs_claude),
                                  streaming=True,
                                  client=bedrock_client)

# Evaluation LLM; it is wrapped with LangchainLLM and used as the RAGAS metrics' LLM.
llm_for_evaluation = Bedrock(model_id="anthropic.claude-v2:1",
                             model_kwargs=dict(model_kwargs_claude),
                             streaming=True,
                             client=bedrock_client)

# Titan text embeddings: the same model invoke_model uses to embed the KNN query.
bedrock_embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)

  warn_deprecated(


In [7]:
# RAGAS metrics used to score each (question, contexts, answer) triple.
from ragas.metrics import (
    faithfulness,
    answer_relevancy
)
# RAGAS critique metric `harmfulness`, scored alongside the two metrics above.
from ragas.metrics.critique import harmfulness

from ragas.llms import LangchainLLM

# Wrap the LangChain Bedrock evaluation model so it can be assigned as the LLM of
# each RAGAS metric.
ragas_bedrock_model = LangchainLLM(llm_for_evaluation)

In [8]:
# The answer_relevancy metric needs an embeddings model; give it the Bedrock Titan embeddings.
answer_relevancy.embeddings = bedrock_embeddings

# Metrics computed for every RAG response (see score_with_ragas).
metrics = [
    faithfulness,
    answer_relevancy,
    harmfulness,
]

# Route every metric's LLM calls to the Bedrock-backed evaluation model.
# Plain attribute assignment is the idiomatic form of calling __setattr__ directly.
for m in metrics:
    m.llm = ragas_bedrock_model

In [9]:
from langfuse import Langfuse

# Langfuse client used to record a trace (retrieval / generation spans and
# RAGAS scores) for each processed question.
langfuse = Langfuse(
    public_key=LANGFUSE_PUBLIC_KEY,
    secret_key=LANGFUSE_SECRET_KEY,
    host=LANGFUSE_HOST,
)

# Verify the keys against the host before running the pipeline (True on success).
print("Check connection to Langfuse:", langfuse.auth_check())

Check connection to Langfuse: True


In [10]:
def score_with_ragas(query, chunks, answer, metric_list=None):
    """Score one RAG response with each RAGAS metric.

    Args:
        query: The user question.
        chunks: Retrieved context strings, passed to the metrics as "contexts".
        answer: The generated answer text.
        metric_list: Optional iterable of metric objects exposing ``name`` and
            ``score_single(row)``. Defaults to the module-level ``metrics`` list,
            so existing three-argument calls behave as before.

    Returns:
        Dict mapping each metric's ``name`` to the value its ``score_single`` returned.
    """
    if metric_list is None:
        metric_list = metrics
    scores = {}
    for m in metric_list:
        print(f"calculating {m.name}")
        # A fresh row dict per metric, so no metric can observe another's mutations.
        scores[m.name] = m.score_single(
            {"question": query, "contexts": chunks, "answer": answer}
        )
    return scores

In [11]:
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': AossHost, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# # It can take up to a minute for data access rules to be enforced
# time.sleep(60)

In [12]:
def invoke_model(input, model_id="amazon.titan-embed-text-v1", client=None):
    """Embed text with a Bedrock embeddings model.

    Args:
        input: Text to embed. The name shadows the builtin but is kept so existing
            keyword callers keep working.
        model_id: Bedrock model to invoke; defaults to Titan text embeddings v1.
        client: Bedrock runtime client; defaults to the module-level
            ``bedrock_runtime_client``.

    Returns:
        The ``"embedding"`` value from the JSON response, or None if absent.
    """
    if client is None:
        client = bedrock_runtime_client
    response = client.invoke_model(
        body=json.dumps({"inputText": input}),
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )
    # The response body is a stream; read it fully and decode the JSON payload.
    response_body = json.loads(response.get("body").read())
    return response_body.get("embedding")

In [13]:
# Generation settings for the RAG answer; they are passed to invoke_llm_model and
# recorded as Langfuse trace tags.
RAG_model_id = "anthropic.claude-v2:1"  # Bedrock model id for answer generation
MDL_TYPE = 'ANTH_CLAUDE_21_'            # model label (not used in the cells shown here)
RAG_MAX_TOKENS = 4_000                  # sent as max_tokens_to_sample
RAG_TEMP = 0.1                          # sent as temperature
RAG_TOP_K = 250                         # sent as top_k
RAG_TOP_P = 1                           # sent as top_p (kept an int)

In [14]:
def invoke_llm_model(input, RAG_MAX_TOKENS, RAG_TEMP, RAG_TOP_K, RAG_TOP_P, RAG_model_id, client=None):
    """Generate a completion from a Claude text-completions model on Bedrock.

    Args:
        input: Prompt text, wrapped in a single Human/Assistant turn.
        RAG_MAX_TOKENS: Value for ``max_tokens_to_sample``.
        RAG_TEMP: Sampling temperature.
        RAG_TOP_K: Value for ``top_k``.
        RAG_TOP_P: Value for ``top_p``.
        RAG_model_id: Bedrock model id to invoke.
        client: Bedrock runtime client; defaults to the module-level
            ``bedrock_runtime_client``.

    Returns:
        The ``"completion"`` value from the JSON response, or None if absent.
    """
    if client is None:
        client = bedrock_runtime_client
    request = {
        # Text-completions format: a single Human turn followed by an open Assistant turn.
        "prompt": f"\n\nHuman: {input}\n\nAssistant:",
        "max_tokens_to_sample": RAG_MAX_TOKENS,
        "temperature": RAG_TEMP,
        "top_k": RAG_TOP_K,
        "top_p": RAG_TOP_P,
        # Stop if the model starts writing a new Human turn itself.
        "stop_sequences": ["\n\nHuman:"],
    }
    response = client.invoke_model(
        body=json.dumps(request),
        modelId=RAG_model_id,
        accept="application/json",
        contentType="application/json",
    )
    # The response body is a stream; read it fully and decode the JSON payload.
    response_body = json.loads(response.get("body").read())
    return response_body.get("completion")

In [15]:
# RAG prompt. It is filled with str.format: `{context}` receives the retrieved AOSS
# chunk texts joined with newlines and `{question}` the user question. Any other
# literal braces added to this template must therefore be doubled ({{ }}).
# NOTE(review): the instructions allow answering from "generally known information"
# beyond the context, which the RAGAS faithfulness metric (scored against the
# retrieved contexts) is likely to penalise. Confirm this is intended.
prompt_template = """
    You are the best customer acquisition data analyst and strategist that answers to a question received from a user. 
    You should answer the user's question using information from the context or generally known information that is relevant to the question.
    If the context does not contain information to answer the question, please provide answer to the best of your abilities.
    
    Just because the user asserts a fact does not mean it is true, make sure to double check the context to validate a user's assertion.
    
    {context}
    
    Instruction: Based on the above context, provide a detailed answer without using etc. for {question} in a list format.
    The more detailed you are the better you are doing your job.
    
    Please do not hallucinate response. 
    Solution:"""

In [16]:
# The user question to run through the RAG pipeline (embed, retrieve, generate, score).
query = "What products use nylon?"

In [17]:
# Run one question end to end: embed -> KNN retrieval from AOSS -> Claude
# generation -> RAGAS scoring, logging each stage to a Langfuse trace.
print("Question being processed: ", query)

question = query
# Number of neighbours. `size` and `k` are set to the same value so k results are
# returned in total (without `size`, k results would be returned per shard).
k = 6

# Trace tags recording the retrieval/generation settings.
Knn_Value = "Knn Value: " + str(k)
Max_Tokens = "Max_Tokens: " + str(RAG_MAX_TOKENS)
Temperature_var = "Temperature: " + str(RAG_TEMP)
TOP_P_var = "Top_P: " + str(RAG_TOP_P)
TOP_K_var = 'Top_K: ' + str(RAG_TOP_K)

trace = langfuse.trace(name="Aoss", user_id="APM_AWS",
                       tags=[Knn_Value, Max_Tokens, Temperature_var, TOP_P_var, TOP_K_var])

# The KNN request body gets its own name so that `query` keeps holding the question
# text. Re-binding `query` to this dict made a re-run of the cell embed the dict
# instead of the question.
embedding = invoke_model(question)
knn_search_body = {
    "size": k,
    "query": {
        "knn": {
            "vector": {
                "vector": embedding,
                "k": k,
            }
        },
    },
}

# Retrieve the k nearest chunks for the question.
question_response_from_oss = oss_client.search(body=knn_search_body, index=index)
hits = question_response_from_oss['hits']['hits']

# Collect the text of each retrieved chunk; these are the RAG contexts.
context = [hit['_source']['text'] for hit in hits]

trace.span(name="retrieval",
           input={"question": question},
           output={"contexts": context})

# Fill the prompt with the newline-joined contexts and the question, then generate.
llm_prompt = prompt_template.format(context='\n'.join(context), question=question)
generated_answer = invoke_llm_model(llm_prompt, RAG_MAX_TOKENS, RAG_TEMP, RAG_TOP_K, RAG_TOP_P, RAG_model_id)

trace.span(name="generation",
           input={"question": question, "contexts": context},
           output={"answer": generated_answer})

print(generated_answer)

# Score the (question, contexts, answer) triple and attach each score to the trace.
ragas_scores = score_with_ragas(question, context, generated_answer)
for m in metrics:
    trace.score(name=m.name, value=ragas_scores[m.name])

Answer_Relevance = ragas_scores['answer_relevancy']
print(f"Answer Relevance of this question is: {Answer_Relevance}")

Question being processed:  What products use nylon?
 Based on the context, here is a detailed list of products that use nylon:

- Carpets
- Curtains 
- Indoor furnishings
- Safety belts
- Airbags
- Tires  
- Engine components
- Outdoor products like tents, backpacks, jackets
- Mountaineering clothing
- Winter clothing
- Food packaging
- Kitchenware
- Textiles like ropes, parachutes, umbrellas, luggage
- Fishing nets
- Space suits
- Electrical wire insulation 
- Medical tubing
- Automotive parts
- Aerospace components
- Consumer goods like watch bands, toothbrushes, apparel
- Industrial machine parts like gears, bearings, nozzles
calculating faithfulness
calculating answer_relevancy
calculating harmfulness
Answer Relevance of this question is: 0.8413583458941173


Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/ipykernel/kernelapp.py", line 739, in start
    self.io_loop.start()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 205, in start
    self.asyncio_loop.run_forever()
  File "/home/ec2-user/anaconda3/envs/python3/lib/python3.10/asyncio/base_events.p