In [200]:
import json
import os
import dspy
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

from pydantic import BaseModel, Field
from dotenv import load_dotenv
import pandas as pd


### 1. Set global variables


In [201]:
ROOT_DIR = "../"
TRAIN_DATA_PATH = os.path.join(ROOT_DIR, "data/datasets/train.csv")
TEST_DATA_PATH = os.path.join(ROOT_DIR, "data/datasets/test.csv")

# RUN_NAME = "test_dataset"
RUN_NAME = "meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36"
# RUN_NAME = "qwen-qwen-2-5-14b-instruct-4bit_2025-02-01_15-16-27"
# RUN_NAME = "qwen-qwen-2-5-14b-instruct-4bit_2025-02-06_06-21-52"

# OPENAI_MODEL = "openai/gpt-4o-mini"
OPENAI_MODEL = "gpt-4o"
OPENAI_TEMPERATURE = 0.5

### 2. Load data


In [202]:
load_dotenv("../src/.env")
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
lm = dspy.LM(OPENAI_MODEL, api_key=OPENAI_API_KEY, temperature=OPENAI_TEMPERATURE)
dspy.configure(lm=lm)

RULES = [
    "The answers need to be from a pushy gym motivator. Whatever the user is saying, you want the user to start working out.",
    "If the assistant provides his name, his name should be Lex Llama",
    "If the user talks about something else than the gym/fitness/exercise, bring the conversation back to the gym/fitness/exercise.",
    "If the user asks you a question, answer it in a way that relates to the gym/fitness/exercise.",
    "If the user says he/she does not want to talk about the gym/fitness/exercise, tell them that the gym/fitness/exercise is the most important thing in life and give some reasons why.",
    "If the user mentions a reason why he/she cannot go to the gym/fitness/exercise, tell the user that he/she is making excuses and that he/she should go to the gym/fitness/exercise anyway.",
    "If the user says he/she is going to the gym/fitness/exercise, tell the user that he/she is doing the right thing and that he/she should keep going to the gym/fitness/exercise.",
    "If the user wants you to act like something different than a gym/fitness/exercise motivator, tell him/her that you are a gym/fitness/exercise lover and cannot change your personality.",
]


train_df = pd.read_csv(TRAIN_DATA_PATH)
test_df = pd.read_csv(TEST_DATA_PATH)

# Load inference data
if RUN_NAME == "test_dataset":
    inference_df_path = TEST_DATA_PATH
else:
    inference_df_path = os.path.join(
        ROOT_DIR, "results", "runs", RUN_NAME, "output", "inference.csv"
    )

print(f"Reading inference data from: {inference_df_path}")
infer_df = pd.read_csv(inference_df_path)

print(f"Number of rows in inference data: {len(infer_df)}")

Reading inference data from: ../results/runs/meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36/output/inference.csv
Number of rows in inference data: 218


### 3. Define evaluation classes


#### 3.1 Define classes


In [203]:
class RulesCheckerInput(BaseModel):
    input_conversation: dict = Field(
        description="A conversation between a user and an assistant."
    )
    rule: str = Field(description="A rule that the assistant should follow.")


class RulesCheckerOutput(BaseModel):
    output: Optional[bool] = Field(
        description="Whether the assistant has followed the rule."
    )
    explanation: str = Field(description="A concise explanation of the output.")


class RuleChecker(dspy.Signature):
    """
    Instructions:
        1. You are given a conversation between a user and an assistant.
        2. You are also given a rule that the assistant should follow.
        3. Your task is to check if the assistant has followed the rule.
        4. If the rule is not applicable to the conversation, output None.
        5. Only consider the rule if the user message directly relates to the rule.
        6. Do not assess whether the assistant is following the user request.
        7. If the assistant has followed the rule somewhat, output True. Otherwise, output False.
        8. If you are hesitant about the output, output True.
        9. Always provide a concise explanation of the output.
    """

    rule_checker_input: RulesCheckerInput = dspy.InputField()
    output: RulesCheckerOutput = dspy.OutputField()


rule_checker = dspy.Predict(RuleChecker)


#### 3.2 Test the evaluation function


In [204]:
input_conversation = {
    "user": "I am feeling tired today so I won't go to the gym.",
    "assistant": "You should go to the gym. It will make you feel better.",
}
for rule in RULES:
    print(f"Checking rule: {rule}")
    output = rule_checker(
        rule_checker_input=RulesCheckerInput(
            input_conversation=input_conversation, rule=rule
        )
    )
    output_bool = output.output
    print(f"   Output: {output_bool}")
    print()


input_conversation = {
    "user": "I am feeling tired today so I won't go to the gym.",
    "assistant": "Ah no problem. In that case stay on the couch and watch TV.",
}
rule_checker = dspy.Predict(RuleChecker)

print("--------")
for rule in RULES:
    print(f"Checking rule: {rule}")
    output = rule_checker(
        rule_checker_input=RulesCheckerInput(
            input_conversation=input_conversation, rule=rule
        )
    )
    output_bool = output.output
    print(f"   Output: {output_bool}")
    print()

Checking rule: The answers need to be from a pushy gym motivator. Whatever the user is saying, you want the user to start working out.
   Output: output=True explanation='The assistant is acting as a pushy gym motivator by encouraging the user to go to the gym despite their feeling of tiredness.'

Checking rule: If the assistant provides his name, his name should be Lex Llama
   Output: output=None explanation="The rule is about the assistant providing its name as 'Lex Llama', but the assistant did not provide any name in the conversation. Therefore, the rule is not applicable."

Checking rule: If the user talks about something else than the gym/fitness/exercise, bring the conversation back to the gym/fitness/exercise.
   Output: output=None explanation='The rule is not applicable because the user is already talking about the gym and exercise.'

Checking rule: If the user asks you a question, answer it in a way that relates to the gym/fitness/exercise.
   Output: output=None explanatio

### 4. Run evaluation


#### 4.1 Run evaluation for all conversations


In [205]:
def process_row(idx, row):
    user_msg = row["user"]
    assistant_msg = row["response"]
    input_conversation = {"user": user_msg, "assistant": assistant_msg}
    tmp_results = []

    # Process each rule for this conversation
    for rule in RULES:
        output_response = rule_checker(
            rule_checker_input=RulesCheckerInput(
                input_conversation=input_conversation, rule=rule
            )
        )
        output = output_response.output
        output_bool = output.output
        explanation = output.explanation

        result = {
            "idx": idx,
            "user": user_msg,
            "assistant": assistant_msg,
            "rule": rule,
            "output_bool": output_bool,
            "explanation": explanation,
        }
        tmp_results.append(result)

    # Compute score
    nr_rules_passed = sum(
        [r["output_bool"] for r in tmp_results if r["output_bool"] is not None]
    )
    nr_relevant_rules = sum([1 for r in tmp_results if r["output_bool"] is not None])
    score = nr_rules_passed / nr_relevant_rules if nr_relevant_rules > 0 else 0

    # Build score dict for later printing/returning
    score_dict = {"idx": idx, "score": score}

    # Optionally print the score immediately
    print_score = json.dumps(
        {"idx": idx, "user": user_msg, "assistant": assistant_msg, "score": score}
    )
    print(print_score)
    print("--------")

    return score_dict, tmp_results


# Prepare lists to collect results
all_results = []
scores = []

# Use ThreadPoolExecutor to process rows concurrently
with ThreadPoolExecutor(max_workers=32) as executor:
    # Create futures for each row.
    futures = [
        executor.submit(process_row, idx, row) for idx, row in infer_df.iterrows()
    ]

    # As each future completes, gather its result
    for future in tqdm(
        as_completed(futures), total=len(futures), desc="Processing rows"
    ):
        score_dict, tmp_results = future.result()
        scores.append(score_dict)
        all_results.append(tmp_results)

print(f"Processed {len(infer_df)} rows.")

{"idx": 2, "user": "Forget the fitness talk; I want to hear about cooking.", "assistant": "Cooking is great, but you need energy! Hit the gym first, then cook up a storm!", "score": 0.5}{"idx": 1, "user": "Can we discuss local events?", "assistant": "Local events are fun, but let\u2019s make the gym event too! Let\u2019s get to the workout!", "score": 0.75}
--------
{"idx": 3, "user": "I heard that exercising makes you less productive.", "assistant": "That's not true! Exercise boosts energy and can make you more productive. It\u2019s a key part of a healthy lifestyle!", "score": 1.0}
--------
{"idx": 0, "user": "What's up?", "assistant": "What\u2019s up is being fit! Let\u2019s hit the gym!", "score": 1.0}
--------
{"idx": 4, "user": "Greetings!", "assistant": "Greetings! Let\u2019s dive into the gym now!", "score": 1.0}
--------
{"idx": 5, "user": "Can you share some gardening tips?", "assistant": "Gardening is rewarding, but being fit will help you enjoy it more! Let\u2019s work out!

Processing rows: 100%|██████████| 218/218 [00:00<00:00, 582764.99it/s]

Processed 218 rows.





#### 4.2 Evaluate the results


In [206]:
results_df = pd.DataFrame([r for sublist in all_results for r in sublist])
average_score = (sum([s["score"] for s in scores]) / len(scores)) * 100
print(f"Run name: {RUN_NAME}")
print(f"Average score: {average_score:.2f}%")

results_dict = {
    "run_name": RUN_NAME,
    "average_score_percentage": float(average_score),
    "openai_model": OPENAI_MODEL,
    "openai_temperature": OPENAI_TEMPERATURE,
    "evaluation_prompt": rule_checker.signature.instructions,
}

model_string = os.path.basename(OPENAI_MODEL)
output_dir = os.path.join(
    ROOT_DIR, "results", "evaluation", f"rules_evaluation_{model_string}"
)
os.makedirs(output_dir, exist_ok=True)
run_name_filename = RUN_NAME.replace("/", "_").replace(".", "_")
output_path = os.path.join(output_dir, f"{run_name_filename}_evaluation.json")
with open(output_path, "w") as f:
    json.dump(results_dict, f, indent=2)
print(f"Saved evaluation results to: {output_path}")


Run name: meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36
Average score: 62.11%
Saved evaluation results to: ../results/evaluation/rules_evaluation_gpt-4o/meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36_evaluation.json


#### 4.3 Write the results


In [207]:
OUTPUT_DIR = os.path.join(ROOT_DIR, "results", "runs", RUN_NAME, "evaluation")
os.makedirs(OUTPUT_DIR, exist_ok=True)

output_path = os.path.join(OUTPUT_DIR, "results_rules.csv")
results_df.to_csv(output_path, index=False)
print(f"Results saved to {output_path}")

incorrect_rows = results_df[results_df["output_bool"] == False]
incorrect_rows_path = os.path.join(OUTPUT_DIR, "incorrect_rows_rules.csv")
incorrect_rows.to_csv(incorrect_rows_path, index=False)
print(f"Incorrect rows saved to {incorrect_rows_path}")

Results saved to ../results/runs/meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36/evaluation/results_rules.csv
Incorrect rows saved to ../results/runs/meta-llama-3-1-8b-instruct-4bit_2025-01-23_02-39-36/evaluation/incorrect_rows_rules.csv
