# Score Answers

This notebook provides a testing framework to evaluate responses from DC API's chat.

For input, you'll need a csv file with `question`, `answer` and `ground_truth` columns. 
It will score the answers using an AWS Bedrock Claude model (opus, haiku or sonnet) and produce a csv file with `question`, `ground_truth`, `answer`, `score`, `reason` and `passed` (True/False) columns.

Adapted from this [AWS sample notebook](https://github.com/aws-samples/llm-based-advanced-summarization/blob/main/Prompt%20Evaluation.ipynb)


## Prerequisite

If you haven't already logged into AWS, close this notebook, then, in your terminal, from this project's directory, log in to AWS. Once you're logged in, reopen the Jupyter notebook. For example: 

`export AWS_PROFILE=staging && aws sso login`

`jupyter notebook`

In [None]:
import os

# Confirm that an AWS profile is selected: prints the value of the AWS_PROFILE
# environment variable, or None when it is not set.
print(os.getenv('AWS_PROFILE'))

## Setup the Environment

First, install and import the libraries we'll need.


In [None]:
%pip install boto3
%pip install bs4

In [None]:
import boto3, time, json, csv
import json, os
from urllib.parse import urljoin
from botocore.config import Config
from datetime import datetime


In [None]:
# Bedrock may take a while to respond to large requests, so raise boto3's
# connect and read timeouts to three minutes each.
three_minutes = 3 * 60
my_config = Config(connect_timeout=three_minutes, read_timeout=three_minutes)

# 'bedrock-runtime' is the client used to invoke models;
# 'bedrock' is the client used to list the available foundation models.
bedrock = boto3.client('bedrock-runtime', config=my_config)
bedrock_service = boto3.client('bedrock', config=my_config)

In [None]:
# Sanity check: fetch the foundation models from Bedrock, print every model id,
# then report whether any Claude 3 model shows up anywhere in the response.
models = bedrock_service.list_foundation_models()
for summary in models["modelSummaries"]:
    print (summary["modelId"])

print("Claud-v3 found!" if "anthropic.claude-3" in str(models) else "Error, no model found.")

## Create helper functions to send messages to Claude

In [None]:
# Choose what model to use ('haiku', 'sonnet', or 'opus').
# The scoring cells below pass this value on to score_answers.
model_version = 'sonnet'

In [None]:
MAX_ATTEMPTS = 3  # how many times to try the Bedrock call before giving up.
session_cache = {}  # for this session, do not repeat the same query to Claude.

# Short model names accepted by ask_claude, mapped to their Bedrock model ids.
CLAUDE_MODEL_IDS = {
    "opus": 'anthropic.claude-3-opus-20240229-v1:0',
    "sonnet": 'anthropic.claude-3-5-sonnet-20240620-v1:0',
    "haiku": 'anthropic.claude-3-haiku-20240307-v1:0',
}


def ask_claude(messages, system="", DEBUG=False, model_version="haiku"):
    '''
    Send a prompt to Bedrock and return [prompt_text, response_text].

    messages:      either a string (sent as a single user message) or a list of
                   {"role": ..., "content": ...} dicts.
    system:        optional system prompt.
    DEBUG:         when True, print exactly what is sent to and received from Bedrock.
    model_version: 'opus', 'sonnet' or 'haiku'.

    prompt_text is str(messages) as passed in.  response_text is Claude's reply,
    or an error message if the model name is invalid or every attempt failed.
    Successful replies are cached per (model, system, prompt) for the session;
    failures are never cached, so re-running a cell retries them.
    '''
    raw_prompt_text = str(messages)

    # Fail fast on a bad model name instead of sending doomed requests to Bedrock.
    if model_version not in CLAUDE_MODEL_IDS:
        error = "ERROR:  Bad model version, must be opus, sonnet, or haiku."
        print(error)
        return [raw_prompt_text, error]
    modelId = CLAUDE_MODEL_IDS[model_version]

    # The model and system prompt are part of the key: the same prompt sent to a
    # different model (or with a different system prompt) is a different query.
    cache_key = (model_version, str(system), raw_prompt_text)
    if cache_key in session_cache:
        return [raw_prompt_text, session_cache[cache_key]]

    if isinstance(messages, str):
        messages = [{"role": "user", "content": messages}]

    prompt_json = {
        "system": system,
        "messages": messages,
        "max_tokens": 3000,
        "temperature": 0.7,
        # value documented by AWS for Anthropic Claude models on Bedrock
        "anthropic_version": "bedrock-2023-05-31",
        "top_k": 250,
        "top_p": 0.7,
        "stop_sequences": ["\n\nHuman:"]
    }

    if DEBUG:
        # messages holds dicts, so each one is stringified before joining.
        print("sending:\nSystem:\n", system, "\nMessages:\n", "\n".join(str(m) for m in messages))

    max_attempts = max(1, MAX_ATTEMPTS)  # always make at least one attempt
    for attempt in range(1, max_attempts + 1):
        try:
            response = bedrock.invoke_model(body=json.dumps(prompt_json), modelId=modelId, accept='application/json', contentType='application/json')
            response_body = json.loads(response.get('body').read())
            results = response_body.get("content")[0].get("text")
        except Exception as e:
            print("Error with calling Bedrock: "+str(e))
            if attempt >= max_attempts:
                print("Max attempts reached!")
                # Report the error to the caller but do not cache it.
                return [raw_prompt_text, str(e)]
            time.sleep(10)  # retry in 10 seconds
        else:
            if DEBUG: print("Recieved:", results)
            session_cache[cache_key] = results
            return [raw_prompt_text, results]

In [None]:
from queue import Queue
from threading import Thread

# Threaded function for queue processing.
def thread_request(q, result, model):
    '''
    Worker loop: pull (index, prompt) jobs off q until it is empty, send each
    prompt to ask_claude with the given model, and store the outcome at
    result[index].

    Every slot ends up holding a [prompt_text, response_text] pair; if
    ask_claude raises, response_text is the error message.  Returns True once
    the queue has been drained.
    '''
    from queue import Empty  # local import: only Queue is imported at file level

    while True:
        # get_nowait avoids the race where another worker takes the last job
        # between an empty() check and a blocking get().
        try:
            work = q.get_nowait()
        except Empty:
            return True

        try:
            # model must be passed by keyword: ask_claude's second positional
            # parameter is the system prompt, not the model.
            result[work[0]] = ask_claude(work[1], model_version=model)
        except Exception as e:
            print('Error with prompt!',str(e))
            # keep the same [prompt, response] shape as a successful call
            result[work[0]] = [str(work[1]), str(e)]
        finally:
            # signal to the queue that the task has been processed
            q.task_done()

def ask_claude_threaded(prompts, model, DEBUG=False):
    '''
    Run the prompts through thread_request on a pool of worker threads.
    Returns a list with one entry per prompt, in the same order as prompts.
    '''
    print(f"Using model: {model}...")

    # One placeholder per prompt; each worker overwrites the slot for its job.
    answers = [{} for _ in prompts]

    # Every job carries its index so the worker knows which slot to fill.
    jobs = Queue(maxsize=0)
    for job in enumerate(prompts):
        jobs.put(job)

    # Never start more workers than there are prompts, and cap the pool at 50.
    # Daemon threads let the main program exit even if a worker never finishes.
    worker_count = min(50, len(prompts))
    for _ in range(worker_count):
        Thread(target=thread_request, daemon=True, args=(jobs, answers, model)).start()

    # Block until every job has been marked done.
    jobs.join()

    print('All tasks completed.')
    return answers

## Test that it's working

In [None]:
%%time
# Check that Claude responses are working: send one short prompt through
# ask_claude and print the prompt followed by the reply text.
try:
    query = "Please say the number four."
    result = ask_claude(query)  # result is [prompt_text, response_text]
    print(query)
    print(result[1])
except Exception as e:
    print("Error with calling Claude: "+str(e))

In [None]:
%%time
# Test if our threaded Claude calls are working.
# Each prompt is given in the list-of-messages form (role/content dicts).
q1 = [{"role": "user", "content": "Please say the number one."}]
q2 = [{"role": "user", "content": "Please say the number two."}]
q3 = [{"role": "user", "content": "Please say the number 55."}]

# Prints the list returned by ask_claude_threaded, one entry per prompt.
print(ask_claude_threaded([q1,q2,q3], 'opus'))

## Create the scoring prompt

In [None]:
# Grading prompt sent to Claude for every row.  The {{QUESTION}},
# {{GROUND_TRUTH}} and {{ANSWER}} placeholders are filled in by score_answers;
# the <SCORE> and <REASON> tags requested here are what evaluate_prompt reads
# back out of the response.
scoring_prompt_template = """You are a grader.  Consider the following question along with its correct answer or ground truth and a submitted answer to grade.
Here is the question:
<question>{{QUESTION}}</question>
Here is the correct answer:
<ground_truth>{{GROUND_TRUTH}}</ground_truth>
Here is the submitted answer:
<answer>{{ANSWER}}</answer>
Please provide a score from 0 to 100 on how well this answer matches the correct answer for this question.
The score should be high if the answers say essentially the same thing.
The score should be lower if some facts are missing or incorrect, or if extra unnecessary facts have been included.
The score should be 0 for entirely wrong answers.  Put the score in <SCORE> tags. and your reasoning in <REASON> tags.
Do not consider your own answer to the question, but instead score based on the ground_truth above."""

In [None]:
# create the score answers function
def score_answers(prompt_template, question_answers, model):
    '''
    Ask our LLM to score each of the generated answers.

    prompt_template:  grading prompt containing the {{QUESTION}},
                      {{GROUND_TRUTH}} and {{ANSWER}} placeholders.
    question_answers: iterable of dicts with 'question', 'ground_truth' and
                      'answer' keys.
    model:            model name passed on to ask_claude_threaded.

    Returns whatever ask_claude_threaded returns for the filled-in prompts
    (one entry per question, in the same order).
    '''
    # placeholder in the template -> key of the row that supplies its value
    placeholders = (
        ("{{QUESTION}}", "question"),
        ("{{GROUND_TRUTH}}", "ground_truth"),
        ("{{ANSWER}}", "answer"),
    )

    prompts = []
    for question in question_answers:
        print(f"Scoring: {question['question']}...")
        # Fill in the template that was passed in, not the module-level one,
        # so callers can actually evaluate alternative prompts.
        prompt = prompt_template
        for placeholder, key in placeholders:
            prompt = prompt.replace(placeholder, question[key])
        prompts.append(prompt)
    return ask_claude_threaded(prompts, model)

## Test that it's working with fake data (optional)

In [None]:
# Small made-up data set for a dry run of the scorer.  Each row has the same
# three keys that score_answers reads: 'question', 'ground_truth' and 'answer'.
test_question_set = [
    {'question': "What color is green?", 'ground_truth': "It's green", "answer": "It's between yellow and blue and can be grassy or olive"},
    {'question': "Where am I?", 'ground_truth': "Here.", "answer": "You are neither here nor there. You are everywhere."},
    {'question': "Do dogs like cats?", 'ground_truth': "No.", "answer": "More than cats like dogs."}
]

In [None]:
# Score the fake question set with the configured model and print the raw results.
fake_result = score_answers(scoring_prompt_template, test_question_set, model_version)
print(fake_result)

## Configure input source and load data

In [None]:
# Put the path to your input file here.  It must be a csv file with
# `question`, `answer` and `ground_truth` columns.
input_filename = 'output_files/20240917115836/40_realistic_with_ground_truth.csv'

In [None]:
# load the input data
# The with-block closes the file once every row has been read into question_set
# (one dict per csv row, keyed by the header row).  utf_8_sig also accepts
# files that start with a UTF-8 byte-order mark.
with open(input_filename, "r", newline='', encoding='utf_8_sig') as input_file:
    csvfile = csv.DictReader(input_file)
    question_set = list(csvfile)


In [None]:
# verify it's loaded: prints the list of row dicts read from the csv file
print(question_set)

## Score the answers

In [None]:
# import dependencies
from bs4 import BeautifulSoup as BS

In [None]:
# create the function to score the answers
def evaluate_prompt(prompt_template, question_answers, threshhold):
    """
    Call score_answers and format the results once all threads have returned.

    prompt_template:  grading prompt handed to score_answers.
    question_answers: iterable of dicts with 'question', 'ground_truth' and
                      'answer' keys.
    threshhold:       minimum numeric score for a row to count as passed.

    Returns a list of rows, starting with the header row
    ['question', 'ground_truth', 'answer', 'score', 'reason', 'passed'].
    A row whose response has no usable <SCORE> gets passed=False; when there
    is no <REASON>, the raw response text is used as the reason so the
    failure stays visible in the output.
    """
    # Materialize once: the rows are read both for scoring and for the output.
    question_answers = list(question_answers)
    scored_answers = score_answers(prompt_template, question_answers, model_version)
    print ("Done.")

    scores = [['question','ground_truth','answer','score','reason','passed']]
    # Results come back in the same order as the questions, so the original
    # row text is used directly instead of being re-parsed out of the prompt
    # (which would mangle any text containing '<').
    for row, scored in zip(question_answers, scored_answers):
        # Normally a [prompt, response] pair; anything else is treated as an
        # error message from a failed request.
        if isinstance(scored, (list, tuple)) and len(scored) == 2:
            response = str(scored[1])
        else:
            response = str(scored)

        # Explicit parser: consistent results regardless of what is installed.
        soup = BS(response, 'html.parser')
        score_tag = soup.find('score')
        reason_tag = soup.find('reason')
        score = score_tag.text if score_tag is not None else ''
        reason = reason_tag.text if reason_tag is not None else response

        try:
            passed = float(score) >= threshhold
        except ValueError:
            # missing or non-numeric score
            passed = False
        scores.append([row['question'], row['ground_truth'], row['answer'], score, reason, passed])

    return scores

In [None]:
# Define what score would be passing (rows scoring below this are marked as not passed)
threshold_to_pass = 90

# Run the evaluation over the loaded question set; scores is a list of rows
# with a header row first.
scores = evaluate_prompt(scoring_prompt_template, question_set,threshhold=threshold_to_pass)

In [None]:
# Show the scored rows (header row first); can take a second
print(scores)

In [None]:
# write to file
# Results go to output_files/scored/<timestamp>/<input file name>.csv, so
# every run gets its own directory.

timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_base_path = f"output_files/scored/{timestamp}"
os.makedirs(output_base_path, exist_ok=True)
filename = os.path.join(output_base_path, f"{os.path.splitext(os.path.basename(input_filename))[0]}.csv")

# Explicit UTF-8: the input is read as UTF-8, and the platform default
# encoding may not be able to represent every character in the answers.
with open(filename, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerows(scores)

print(f"Output files saved to: {filename}")