# DSPy + OpenToM

The goal of this notebook is to explore the OpenToM dataset and see whether we can write some DSPy code to optimize prompts for answering its questions.

The OpenToM authors evaluated the performance of CoT and SimToM on their dataset; I now wonder how much extra performance we can get from using a framework like DSPy.

In [None]:
# SETUP: run poetry install + shell in the terminal, then i just say `cursor .` to open my editor and it runs this nb in the venv
# GETTING STARTED: let's import the packages and get the data
import dspy
import requests
import random
import pandas as pd
from dotenv import load_dotenv

load_dotenv()  # need ur api keys set beforehand

# Configure gpt-3.5-turbo as the default LM for every DSPy module in this notebook.
turbo = dspy.OpenAI(model='gpt-3.5-turbo', max_tokens=200)
dspy.settings.configure(lm=turbo)

# dataset isn't able to be loaded using hf datasets package so let's read it from github raw
# also let's keep it simple and just go for the opentom_long.json
# this is the one that they sampled 100 existing OpenToM plots to produce "extra long" narratives
# url = "https://raw.githubusercontent.com/SeacowX/OpenToM/main/data/opentom_long.json"
url = "https://raw.githubusercontent.com/SeacowX/OpenToM/main/data/opentom.json"

# Bound the download time and fail loudly on an HTTP error (404/5xx) instead of
# handing an error page to the JSON parser.
http_response = requests.get(url, timeout=60)
http_response.raise_for_status()
response = http_response.json()  # parsed JSON payload; name kept for downstream cells

df = pd.DataFrame(response)


In [None]:
# Peek at the first rows of the raw OpenToM frame to see which columns we have.
df.head()

In [None]:
# Inspect the `plot_info` entry of the first row (single row/column lookup).
df.loc[0, 'plot_info']

In [None]:
# Tally how many questions exist for each question type.
type_counts = df['question'].map(lambda question: question['type']).value_counts()
type_counts  # "fo" means first-order, "so" means second-order

# First-order questions ask directly about a character's perception of the world, while
# second-order questions ask about a character's belief about another character's mental state.

In [None]:
# `df` holds a 'question' column whose entries are dicts with (at least) 'type' and 'answer' keys.

# Flatten those two keys into top-level columns so they can be grouped on.
df['type'] = [question['type'] for question in df['question']]
df['answer'] = [question['answer'] for question in df['question']]

# For every question type, collect the distinct answers that occur in the data.
unique_answers_by_type = df.groupby('type')['answer'].unique()

print(unique_answers_by_type)

In [None]:
import json

# convert the dataset to what DSPy expects (list of Example objects)
dataset = []

for _, row in df.iterrows():
    context = row['narrative']
    question = row['question']['question']
    answer = row['question']['answer']
    # Named `question_type` (not `type`) so the builtin `type()` is not shadowed
    # for the rest of the kernel session.
    question_type = row['question']['type']
    plot_info = json.dumps(row['plot_info']) # Keeping each example field as a string might be a good idea

    normalized_answer = answer.lower().strip()
    if "location" not in question_type:
        # Non-location questions: offer every distinct answer observed for this type.
        answer_choices = ", ".join(unique_answers_by_type[question_type])
    elif normalized_answer in ("yes", "no"):
        # Coarse-grained location questions are yes/no.
        answer_choices = "No, Yes"
    else:
        # don't provide answer choices for fine grained location questions
        answer_choices = "n/a, list a specific location"

    # The Example field is still called `type`; later cells read `example.type`.
    dataset.append(
        dspy.Example(
            context=context,
            question=question,
            answer=answer,
            type=question_type,
            plot_info=plot_info,
            answer_choices=answer_choices,
        ).with_inputs("context", "question", "answer_choices")
    )

In [None]:
# split datasets by question types 
from collections import defaultdict

# Bucket the examples by their question type: {question type -> list of Examples}.
datasets = defaultdict(list)

for example in dataset:
    datasets[example.type].append(example)

# Number of examples per question type. Kept as the cell's last expression so the
# notebook actually displays it, with each count labelled by its type.
{question_type: len(examples) for question_type, examples in datasets.items()}

In [None]:
# create train test split
SPLIT_SEED = 42        # fixed seed so Restart & Run All reproduces the same split
TRAIN_FRACTION = 0.8   # share of each question type that goes into "train"

# A dedicated RNG keeps the split reproducible without touching the global `random` state.
split_rng = random.Random(SPLIT_SEED)

# The loop variable is `examples`, not `dataset`, so the full example list built
# earlier in the notebook is not overwritten.
for question_type, examples in datasets.items():
    # Guard against re-running the cell: an entry that is already a
    # {"train", "test"} dict has been split and must not be shuffled again.
    if not isinstance(examples, dict):
        split_rng.shuffle(examples)
        cutoff = int(len(examples) * TRAIN_FRACTION)

        # Replacing the value of an existing key does not resize the dict,
        # so this is safe while iterating over `datasets.items()`.
        datasets[question_type] = {
            "train": examples[:cutoff],
            "test": examples[cutoff:],
        }

    print(f"Now Train {question_type}: {len(datasets[question_type]['train'])}")
    print(f"Now Test {question_type}: {len(datasets[question_type]['test'])}")

# Define the Signatures

Using a "Baleen" pipeline [(Khattab et al., 2021)](https://arxiv.org/abs/2101.00436)


In [None]:
# answer the question
# DSPy signature for the answering step: three input fields, one output field.
# NOTE(review): the docstring and the `desc` strings below are presumably rendered
# into the LM prompt by DSPy, so treat them as prompt text, not as plain
# documentation - confirm before rewording.
class GenerateAnswer(dspy.Signature):
    """Generate answers to the questions"""

    # Inputs: the narrative, the question about it, and the comma-separated
    # answer options built during dataset conversion.
    context = dspy.InputField(desc="may contain relevant facts and psychological insights")
    question = dspy.InputField()
    answer_choices = dspy.InputField()
    # Output: the short answer string that is read back as `pred.answer`.
    answer = dspy.OutputField(desc="often between 1 and 5 words")

# generate a question to help you better answer the question
# class GenerateSearchQuery(dspy.Signature):
#     """Write a simple search query that will help answer a complex question."""

#     context = dspy.InputField(desc="may contain relevant facts and psychological insights")
#     question = dspy.InputField()
#     query = dspy.OutputField(desc="a thought that might help answer the question") 

# class GenerateSearchAnswer(dspy.Signature):
#     """Generate a long form answer to the question given the context"""

#     context = dspy.InputField(desc="may contain relevant facts and psychological insights")
#     question = dspy.InputField()
#     answer = dspy.OutputField(desc="a thought about what the answer to the question may be")



In [None]:
from dsp.utils import deduplicate

class SimplifiedBaleen(dspy.Module):
    """Single-step answerer built on the `GenerateAnswer` signature.

    Despite the Baleen name, no multi-hop query generation is active: the
    module holds exactly one chain-of-thought predictor and calls it once
    per question.
    """

    def __init__(self):
        super().__init__()

        # The only predictor: chain-of-thought over GenerateAnswer.
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

    def forward(self, question, context, answer_choices):
        """Answer `question` about `context`, given the allowed `answer_choices`.

        Returns a `dspy.Prediction` that carries the unchanged `context` and
        the predictor's `answer`.
        """
        predicted_answer = self.generate_answer(
            context=context,
            question=question,
            answer_choices=answer_choices,
        ).answer
        return dspy.Prediction(context=context, answer=predicted_answer)

Here we're defining a simple signature just to generate the answer given the context, question, and answer choices.

# Executing the Pipeline

Let's see how this works in a zero-shot setting

In [None]:
# Use the first held-out "attitude" example as a smoke test.
zero_shot_example = datasets["attitude"]["test"][0]
my_question = zero_shot_example.question
my_context = zero_shot_example.context
my_answer_choices = zero_shot_example.answer_choices

# Run the uncompiled (i.e., zero-shot) program; the result carries `pred.context` and `pred.answer`.
uncompiled_baleen = SimplifiedBaleen()
pred = uncompiled_baleen(question=my_question, context=my_context, answer_choices=my_answer_choices)

# Show the question next to the gold answer, the prediction, and the allowed choices.
print(f"Question: {my_question}")
print(f"True Answer: {zero_shot_example.answer}")
print(f"Predicted Answer: {pred.answer}")
print(f"Answer Choices: {my_answer_choices}")


In [None]:
from opentom_evaluator import OpenToMEvaluatorDspy
# Score the zero-shot prediction against the gold example.
# Named `zero_shot_evaluator` rather than `eval` so the builtin `eval()` is not
# shadowed for the rest of the kernel session.
zero_shot_evaluator = OpenToMEvaluatorDspy()
zero_shot_evaluator.dspy_metric(datasets["attitude"]["test"][0], pred)



We can inspect the most recent calls to the LM (up to the last three; the current single-step pipeline makes one call per question, to generate the answer) using:

In [None]:
# Show up to the 3 most recent LM interactions recorded on `turbo`.
# NOTE(review): the exact output format depends on the installed DSPy version.
turbo.inspect_history(n=3)

# Optimizing the Pipeline

However, a zero-shot approach quickly falls short for more specialized tasks, novel domains/settings, and more efficient (or open) models.

To address this, DSPy offers compilation. Let's compile our `SimplifiedBaleen` program (currently a single answer-generation step, since the multi-hop query generation is disabled).

In [None]:
from opentom_evaluator import OpenToMEvaluatorDspy
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
import time

eval_question_types = ["attitude", "multihop-fo", "multihop-so", "location-fo", "location-so"] # question types to optimize a module for
modules = {}

TRAIN_SUBSET_SIZE = 25            # training examples per question type handed to the optimizer
PAUSE_BETWEEN_TYPES_SECONDS = 60  # presumably to stay under API rate limits - TODO confirm

# define modules for each question type
for position, question_type in enumerate(eval_question_types):
    print(f"TYPE: {question_type}")
    evaluator = OpenToMEvaluatorDspy(model_name="(training set) compiled baleen")
    optimizer = BootstrapFewShotWithRandomSearch(metric=evaluator.dspy_metric, num_threads=1)
    compiled_baleen = optimizer.compile(
        SimplifiedBaleen(),
        trainset=datasets[question_type]["train"][:TRAIN_SUBSET_SIZE],
    )

    modules[question_type] = compiled_baleen

    # Pause only between question types; nothing follows the last one.
    if position < len(eval_question_types) - 1:
        time.sleep(PAUSE_BETWEEN_TYPES_SECONDS)


In [None]:
from dspy.evaluate.evaluate import Evaluate

EVAL_SUBSET_SIZE = 10  # held-out examples scored per question type

print("Macro Averaged F1 Scores")
for question_type in eval_question_types:
    # Label each pair of results; both evaluators use the same model names for
    # every question type, so the output is otherwise ambiguous.
    print(f"TYPE: {question_type}")
    test = datasets[question_type]["test"]
    compiled_baleen = modules[question_type]

    # Set up the `evaluate_on_opentom` function.
    evaluate_on_opentom = Evaluate(devset=test[:EVAL_SUBSET_SIZE], num_threads=1, display_progress=True, display_table=10)

    # Baseline: the zero-shot program, scored with a fresh evaluator.
    uncompiled_baleen_evaluator = OpenToMEvaluatorDspy(model_name='uncompiled_baleen')
    uncompiled_baleen_retrieval_score = evaluate_on_opentom(uncompiled_baleen, metric=uncompiled_baleen_evaluator.dspy_metric, display=False)
    uncompiled_baleen_evaluator.print_f1_results()

    # The module compiled for this question type, scored on the same examples.
    compiled_baleen_evaluator = OpenToMEvaluatorDspy(model_name='compiled_baleen')
    compiled_baleen_retrieval_score = evaluate_on_opentom(compiled_baleen, metric=compiled_baleen_evaluator.dspy_metric, display=False)
    compiled_baleen_evaluator.print_f1_results()

In [None]:
# Display the state of the uncompiled (zero-shot) program for comparison with the compiled one.
# NOTE(review): presumably this lists the predictor's demos/parameters - confirm against the DSPy docs.
uncompiled_baleen.dump_state()

In [None]:
# NOTE(review): `compiled_baleen` is whatever the last loop iteration assigned, i.e. the
# module for the final entry of `eval_question_types`; use `modules[<type>]` to inspect another one.
compiled_baleen.dump_state()