This code is heavily based on the code by Greg Kamradt, found here: https://github.com/gkamradt/LLMTest_NeedleInAHaystack

Main differences:
1. Greg's code uses Langchain to run the retrieval and evaluation prompts, while I call the GPT-4 API directly.
2. My test covers a subset of Greg's test: this notebook focuses on the area that proved to be weaker in the GPT-4 results. I am excluding the 'good' areas as per Greg's test.

In [29]:
import os
import tiktoken
import glob
import json
import numpy as np
import time

from openai import OpenAI


In [30]:
# Tokenizer for the model under test; getContext() uses it to encode the
# corpus, cut it to a token budget and decode it back to text.
enc = tiktoken.encoding_for_model("gpt-4-1106-preview")

In [31]:
# INITIALIZE GPT 4
# The credentials live in local key files so they never appear in the notebook.
apikey_path = "../apikeys/api_openai.key"
orgKey_path = "../apikeys/api_openai_org.key"

# Both names start out as empty strings and are overwritten once the
# corresponding key file has been read.
api_key = ""
org_key = ""

# API key: file contents with surrounding whitespace removed.
with open(apikey_path, 'r') as api_key_file:
    api_key = api_key_file.read().strip()

# Organization id: same treatment as the API key.
with open(orgKey_path, 'r') as org_key_file:
    org_key = org_key_file.read().strip()

In [32]:
# Shared API client, authenticated with the key and organization id read from
# the key files; GPT() sends every request through it.
openai_client = OpenAI(api_key=api_key, organization=org_key)

In [33]:

def GPT(system_prompt, user_prompt, model="gpt-4-1106-preview"):
    """Send one system/user prompt pair to the chat completions API.

    Args:
        system_prompt: Text sent as the "system" message.
        user_prompt: Text sent as the "user" message.
        model: Name of the chat model to query. Defaults to the model this
            notebook was written against.

    Returns:
        A tuple ``(content, prompt_tokens, completion_tokens, total_tokens)``
        read from the API response. If anything goes wrong the error is
        printed and ``(None, 0, 0, 0)`` is returned instead, so callers can
        treat ``total_tokens == 0`` as "no usable answer".
    """
    try:
        response = openai_client.chat.completions.create(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            # Fixed sampling settings so every request in the experiment is
            # issued with the same configuration.
            temperature=0,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None,
            model=model,
        )
        usage = response.usage
        return (
            response.choices[0].message.content,
            usage.prompt_tokens,
            usage.completion_tokens,
            usage.total_tokens,
        )
    except Exception as e:
        # Deliberately broad: this is the boundary of a long-running
        # experiment loop, and one failed request must not abort the run.
        # The failure is reported and signalled through the zero token counts.
        print(f"An error occurred on GPT: {e}")
        return None, 0, 0, 0

In [34]:
def read_files(directory):
    """Concatenate the contents of every file matching a glob pattern.

    Args:
        directory: A glob pattern such as ``"essays/*.txt"``. Despite the
            parameter name, the value is passed straight to ``glob.glob``.

    Returns:
        The text of all matching files joined with no separator, in sorted
        path order. An empty string when nothing matches.
    """
    # glob.glob() returns matches in arbitrary order; sorting makes the
    # concatenated text identical from run to run and machine to machine.
    parts = []
    for path in sorted(glob.glob(directory)):
        # Explicit encoding so decoding does not depend on the platform locale.
        with open(path, 'r', encoding="utf-8") as f:
            parts.append(f.read())
    return "".join(parts)

In [35]:
def getContext(context_length=0, txt_files_path="LLMTest_NeedleInAHaystack/PaulGrahamEssays"):
    """Build the haystack text from the essay corpus.

    Every ``*.txt`` file under ``txt_files_path`` is read via ``read_files``
    and newlines are replaced by spaces so the corpus becomes one long line.

    Args:
        context_length: Maximum number of tokens to keep, counted with the
            module-level ``enc`` tokenizer. ``0`` means "no limit" and
            returns the whole corpus.
        txt_files_path: Directory holding the ``.txt`` source files.

    Returns:
        The corpus text, cut to the first ``context_length`` tokens when a
        limit is given.
    """
    context = read_files(f"{txt_files_path}/*.txt").replace("\n", " ")

    if context_length == 0:
        return context

    # Truncate in token space, then turn the kept tokens back into text.
    tokens = enc.encode(context)
    return enc.decode(tokens[:context_length])


In [36]:
def insert_needle(needle, context, depth_percent, needleRepetition=1):
    """Insert ``needle`` into ``context`` at a relative depth.

    Args:
        needle: Text to hide in the context.
        context: The haystack text.
        depth_percent: Where to insert, as a percentage of the context length
            in characters. ``100`` appends the needle at the very end.
        needleRepetition: How many copies of the needle to insert. All copies
            are placed together at the same position.

    Returns:
        A new string containing the context with the needle copies inserted.
        Each copy is set off by spaces: ``" needle "`` when inserted inside
        the text, ``" needle"`` when appended.
    """
    if len(context) > 120000:
        # Long contexts lose their last 500 characters before insertion.
        # NOTE(review): presumably headroom for the needle and prompt text;
        # the threshold is compared against characters, not tokens - confirm.
        context = context[:len(context) - 500]

    if depth_percent == 100:
        return context + (" " + needle) * needleRepetition

    # The position is computed once from the needle-free context. Recomputing
    # it after every insertion would land inside the previously inserted copy
    # and split that needle in two.
    insert_position = int(len(context) * depth_percent / 100)
    padded_needles = (" " + needle + " ") * needleRepetition
    return context[:insert_position] + padded_needles + context[insert_position:]

In [37]:
def generate_context(needle, context_length, depth_percent, needleRepetitions=1):
    """Return a haystack of ``context_length`` with the needle placed in it.

    The haystack comes from ``getContext(context_length)``; the needle is then
    placed by ``insert_needle`` at ``depth_percent`` with the requested number
    of repetitions.
    """
    haystack = getContext(context_length)
    return insert_needle(needle, haystack, depth_percent, needleRepetitions)

In [38]:
def result_exists(results, context_length, depth_percent, version, needleRepetitions):
    """Check whether a matching experiment result has already been recorded.

    Args:
        results: List of result dicts, as loaded from the results file.
        context_length: Context length of the run being looked up.
        depth_percent: Needle depth of the run being looked up.
        version: Results version of the run being looked up.
        needleRepetitions: Needle repetition count of the run being looked up.

    Returns:
        ``True`` if any entry matches all four values, ``False`` otherwise
        (including for an empty list).

    Entries that lack ``'version'`` or ``'needle_repetitions'`` are treated
    as version 1 with a single needle, so older result files remain readable.
    """
    return any(
        result['context_length'] == context_length
        and result['depth_percent'] == depth_percent
        and result.get('version', 1) == version
        and result.get('needle_repetitions', 1) == needleRepetitions
        for result in results
    )

In [39]:
def evaluate_response(response, needle, question_to_ask):
    """Ask the model to grade a retrieval answer against the needle.

    Args:
        response: The answer produced by the model under test.
        needle: The reference fact that was hidden in the context.
        question_to_ask: The question the answer is supposed to address.

    Returns:
        The tuple returned by ``GPT``:
        ``(score, prompt_tokens, completion_tokens, total_tokens)``.
        ``score`` is the grader's raw text reply; it is not parsed into a
        number here.
    """
    # The rubric is kept as plain text so it reads naturally inside the
    # prompt. Continuation lines carry the same indent as the prompt body.
    accuracy_criteria = "\n        ".join([
        "Score 1: The answer is completely unrelated to the reference.",
        "Score 3: The answer has minor relevance but does not align with the reference.",
        "Score 5: The answer has moderate relevance but contains inaccuracies.",
        "Score 7: The answer aligns with the reference but has minor errors or omissions.",
        "Score 10: The answer is completely accurate and aligns perfectly with the reference.",
    ])

    # The section tags named here must match the tags used in user_prompt.
    system_prompt = "You are a helpful AI bot that evaluates answers for a user. Read the given [CONTEXT], [QUESTION] and [RESPONSE], then rate the [RESPONSE] using the [ACCURACY_CRITERIA]. Keep your response short and direct."

    user_prompt = f"""
        Given the following [CONTEXT], [QUESTION], and [RESPONSE], evaluate the response based on the specified [ACCURACY_CRITERIA] and provide a rating.

        [CONTEXT]
        {needle}

        [QUESTION]
        {question_to_ask}

        [RESPONSE]
        {response}

        [ACCURACY_CRITERIA]
        {accuracy_criteria}

        Based on these criteria, please rate the response. Just give the score.

    """

    return GPT(system_prompt, user_prompt)

    

In [40]:
# The fact hidden in the haystack (leading and trailing newline included).
needle = "\nThe best thing to do in San Francisco is eat a sandwich and sit in Dolores Park on a sunny day.\n"

# Question passed to the evaluator together with the needle.
question_to_ask = "What is the most fun thing to do in San Francisco?"

# Context lengths for the experiment grid: 7 evenly spaced values from 60000
# to 120000, both ends included.
context_lengths = np.linspace(60000, 120000, 7).round().astype(int)

# Document depths (in percent) at which the needle is placed: 7 evenly spaced
# values from 20 to 80, both ends included.
# Suggestion: try other distributions (like a sigmoid) to test non-evenly spaced intervals.
document_depth_percents = np.linspace(20, 80, 7).round().astype(int)

print(context_lengths)
print(document_depth_percents)

[ 60000  70000  80000  90000 100000 110000 120000]
[20 30 40 50 60 70 80]


In [None]:
# Each (context length, depth, version, repetitions) combination is evaluated
# at most once: combinations already present in results.json are skipped.
# Bump the version number to run the whole grid again; leave it alone while
# you are just testing.
results_version = 2

# Number of needle copies placed in every context.
needleRepetitions = 1

# Walk the full grid of context lengths and needle depths.
for context_length in context_lengths:
    for depth_percent in document_depth_percents:

        # Reload the saved results on every iteration; a missing file simply
        # means nothing has been recorded yet.
        try:
            with open('results.json', 'r') as f:
                results = json.load(f)
        except FileNotFoundError:
            results = []

        # Skip combinations that were already evaluated, so an interrupted
        # run can be restarted later without repeating work.
        already_done = result_exists(results, context_length, depth_percent, results_version, needleRepetitions)
        if already_done:
            continue

        context = generate_context(needle, context_length, depth_percent, needleRepetitions)

        system_prompt = "You are a helpful AI bot that answers questions for a user. Read the given [CONTEXT] and answer the given [QUESTION]. Keep your response short and direct."
        user_prompt = f"""
            [CONTEXT]
            {context}

            [QUESTION]
            What is the most fun thing to do in San Francisco based on my context? Don't give information outside the document.

        """

        response, prompt_tokens, completion_tokens, total_tokens = GPT(system_prompt, user_prompt)

        # Zero tokens means the retrieval call failed: nothing to grade and
        # no pause needed, so move straight on to the next combination.
        if not total_tokens > 0:
            continue

        # Grade the answer against the needle that was planted.
        score, prompt_tokens2, completion_tokens2, total_tokens2 = evaluate_response(response, needle, question_to_ask)

        # Only record the run when the grading call succeeded as well.
        if total_tokens2 > 0:

            print (f"Score: {score} - Response: {response}")

            record = {
                # 'context' : context, # Uncomment to also store the full context. Warning: the file becomes very large.
                'context_length' : int(context_length),
                'depth_percent' : int(depth_percent),
                'version' : results_version,
                'needle' : needle,
                'needle_repetitions': int(needleRepetitions),
                'model_response' : response,
                'score' : score,
                'prompt_tokens': int(prompt_tokens),
                'completion_tokens': int(completion_tokens),
                'total_tokens': int(total_tokens)
            }
            results.append(record)

            print (f"#{len(results)} Context: {context_length}, Depth: {depth_percent}, Score: {score}")

            # Persist after every successful run so progress is never lost.
            with open('results.json', 'w') as f:
                json.dump(results, f)

        # Optional pause to stay under the rate limit (150K tokens/min per the
        # original author's note): one minute per 100K tokens of context.
        sleep_time = (context_length / 100000)*60
        print (f"Sleeping: {sleep_time}\n")
        time.sleep(sleep_time)
