# ANLI with LLM

In this notebook you have to implement an improved ANLI classifier using an LLM.
The classifier must be implemented using DSPy.


In [2]:
# Configure the DSPy environment with the language model - for grok the parameters must be:
# env variable should be in os.environ['XAI_API_KEY']
# "xai/grok-3-mini"
import os
import dspy

def load_api_key(ini_path, key_name):
    """Export an API key from a ``NAME = value`` style file into ``os.environ``.

    Lines whose first non-blank character is ``#`` are ignored. A line is
    considered a match when ``key_name`` appears on the left-hand side of its
    first ``=``. The value is the first whitespace-delimited token after that
    ``=``, so trailing text after the key is dropped. If several lines match,
    the last one wins.

    Args:
        ini_path: Path of the key file to read.
        key_name: Name of the environment variable to set.

    Returns:
        The value written to ``os.environ[key_name]``, or ``None`` when the
        file does not exist or holds no usable entry (the environment is then
        left untouched, so a key exported by the shell keeps working).
    """
    try:
        with open(ini_path, encoding="utf-8") as f:
            lines = f.readlines()
    except FileNotFoundError:
        return None

    found = None
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        # Split on the FIRST '=' only: keys may themselves contain '='
        # (e.g. base64 padding), which a plain split("=") would reject.
        name, sep, value = stripped.partition("=")
        if not sep or key_name not in name:
            continue
        tokens = value.split()
        if tokens:  # skip "NAME=" lines with no value instead of raising IndexError
            found = tokens[0]

    if found is not None:
        os.environ[key_name] = found
    return found


load_api_key("grok_key.ini", "XAI_API_KEY")
load_api_key("gemini_key.ini", "GEMINI_API_KEY")

# Default backend: xAI Grok, authenticated with the key exported into the
# environment above.
lm = dspy.LM(
    'xai/grok-3-mini',
    api_key=os.environ['XAI_API_KEY'],
)
# Local alternative (Ollama) -- uncomment to use instead of the line above:
# lm = dspy.LM('ollama_chat/devstral', api_base='http://localhost:11434', api_key='')
dspy.configure(lm=lm)

In [3]:
from typing import Literal

# Joint-prompt module (identical to the module in section 1.3): a single call
# is asked for both the NLI label and an explanation.
class anli_classification_signature(dspy.Signature):

    """Label the relationship between given premise and hypothesis."""
    
    # NOTE(review): DSPy is documented to use the Signature docstring above as
    # the prompt instructions -- treat it as runtime text, not as a comment.

    # Inputs: the two sentences of an ANLI example.
    premise: str = dspy.InputField()
    hypothesis: str = dspy.InputField()
    # Outputs: one of the three label strings, declared before the free-text
    # explanation.
    label: Literal['entailment', 'contradiction', 'neutral'] = dspy.OutputField()
    explanation: str = dspy.OutputField()

# Predictor built from the joint signature via dspy.ChainOfThought.
joint_prompt = dspy.ChainOfThought(anli_classification_signature)

# Pipeline approach, step 1: produce only an explanation for the
# premise/hypothesis pair; the label is predicted by a separate signature.
class explanation_signature(dspy.Signature):

    """Explain the relationship between the premise and the hypothesis."""

    # NOTE(review): DSPy is documented to use the Signature docstring above as
    # the prompt instructions -- treat it as runtime text, not as a comment.

    # Inputs: the two sentences of an ANLI example.
    premise: str = dspy.InputField()
    hypothesis: str = dspy.InputField()
    # Output: free-text explanation (no label is requested at this step).
    explanation: str = dspy.OutputField()

# Predictor built from the explanation signature via dspy.ChainOfThought.
explain_prompt = dspy.ChainOfThought(explanation_signature)

# Pipeline approach, step 2: predict the label from the premise, the
# hypothesis and an explanation supplied as an input.
class label_signature(dspy.Signature):

    """Label the relationship between the premise and the hypothesis based on the explanation provided."""

    # NOTE(review): DSPy is documented to use the Signature docstring above as
    # the prompt instructions -- treat it as runtime text, not as a comment.

    # Inputs: the ANLI sentence pair plus an explanation string.
    premise: str = dspy.InputField()
    hypothesis: str = dspy.InputField()
    explanation: str = dspy.InputField()
    # Output: one of the three label strings.
    label: Literal['entailment', 'contradiction', 'neutral'] = dspy.OutputField()

# Predictor built from the label signature via dspy.ChainOfThought.
label_prompt = dspy.ChainOfThought(label_signature)

## Load ANLI dataset

In [4]:
from datasets import load_dataset

def _has_reason(example):
    """Return True when the example's 'reason' field is neither None nor empty."""
    reason = example['reason']
    return reason is not None and reason != ""


# Keep only the examples that carry a non-empty 'reason', in every split.
dataset = load_dataset("facebook/anli").filter(_has_reason)

## Evaluate Metrics

Let's use the Hugging Face `evaluate` package to compute the performance of the baseline.


In [5]:
from evaluate import load

# Load the four metric objects individually, in the same order as before;
# they are used one by one when scores are reported later in the notebook.
accuracy, precision, recall, f1 = (
    load(metric_name)
    for metric_name in ("accuracy", "precision", "recall", "f1")
)


In [6]:
import evaluate
# Single combined evaluation module covering the same four metrics.
_combined_metric_names = ["accuracy", "f1", "precision", "recall"]
clf_metrics = evaluate.combine(_combined_metric_names)

## Your Turn

Compute the classification metrics on the baseline LLM model on each test section of the ANLI dataset for samples that have a non-empty 'reason' field.

You also must show a comparison between the DeBERTa baseline model and this LLM baseline model. The comparison metric should compute the agreement between the two models:
* On how many samples they are both correct [Correct]
* On how many samples Model1 is correct and Model2 is incorrect [Correct1]
* On how many samples Model1 is incorrect and Model2 is correct [Correct2]
* On how many samples both are incorrect [Incorrect]

In [7]:
# Let's inspect the sentence-transformers similarity score between the input (premise + hypothesis) and the human-written reason.
import json
from sentence_transformers import CrossEncoder

# Cross-encoder used for every similarity score computed below.
similarity_ranker = CrossEncoder("cross-encoder/stsb-distilroberta-base")

# Records saved by an earlier evaluation run. The encoding is given explicitly
# so that decoding does not depend on the platform's default (JSON is UTF-8).
with open("evaluation_list.json", "r", encoding="utf-8") as f:
    evaluation_list = json.load(f)

In [8]:
# Pair each record's stored reason ("reason_baseline_model"; presumably the
# human-written reason -- confirm against the code that produced
# evaluation_list.json) with its "premise hypothesis" text.
sentence_combinations = [
    [item["reason_baseline_model"], f"{item['premise']} {item['hypothesis']}"]
    for item in evaluation_list
]

scores_human_and_input = similarity_ranker.predict(sentence_combinations)

# Use the array's own mean() instead of a manual accumulator: the previous
# accumulator was named `sum`, which shadowed the builtin sum() for every cell
# executed afterwards in the same kernel.
average = scores_human_and_input.mean()

In [9]:
# Mean similarity on the first line, the first ten individual scores after it.
print(average, scores_human_and_input[:10], sep="\n")

0.49441814
[0.53344446 0.6161707  0.6025264  0.49474877 0.6256187  0.61067724
 0.5879405  0.5242762  0.5361876  0.33098713]


In [16]:
# Now let's check the similarity of the previous model's explanation
# (CoT_reasoning) to the human-given one, separately for the records the model
# labelled correctly ("positive") and incorrectly ("negative").

positive_optimized_cot = [
    item for item in evaluation_list
    if item["pred_llm_label"] == item["gold_label"]
]
negative_optimized_cot = [
    item for item in evaluation_list
    if item["pred_llm_label"] != item["gold_label"]
]

positive_sentence_combinations = [
    [item["reason_baseline_model"], item["CoT_reasoning"]]
    for item in positive_optimized_cot
]
negative_sentence_combinations = [
    [item["reason_baseline_model"], item["CoT_reasoning"]]
    for item in negative_optimized_cot
]

scores_human_and_cot_positive = similarity_ranker.predict(positive_sentence_combinations)
scores_human_and_cot_negative = similarity_ranker.predict(negative_sentence_combinations)

# Array mean() replaces the hand-written accumulators. For an empty group it
# yields nan (with a NumPy warning) rather than a ZeroDivisionError, assuming
# predict() returns an empty array for an empty input -- TODO confirm.
average_pos = scores_human_and_cot_positive.mean()
average_neg = scores_human_and_cot_negative.mean()

print(f"average score positive label: {average_pos} , average score negative label: {average_neg}")
print(scores_human_and_cot_positive[:10])
print(scores_human_and_cot_negative[:10])

average score positive label: 0.6060670018196106 , average score negative label: 0.5626145005226135
[0.68913245 0.6223486  0.6875196  0.563914   0.71642214 0.706596
 0.66135895 0.73152167 0.57437295 0.6244497 ]
[0.59876716 0.5674838  0.64908993 0.78117406 0.53045356 0.4975226
 0.50495046 0.40600345 0.5768601  0.4279902 ]


In [17]:
# Now let's check the similarity of the previous model's explanation
# (CoT_reasoning) to the input text ("premise hypothesis"), again split into
# correctly ("positive") and incorrectly ("negative") labelled records.

positive_sentence_combinations = [
    [f"{item['premise']} {item['hypothesis']}", item["CoT_reasoning"]]
    for item in positive_optimized_cot
]
negative_sentence_combinations = [
    [f"{item['premise']} {item['hypothesis']}", item["CoT_reasoning"]]
    for item in negative_optimized_cot
]

scores_input_and_cot_positive = similarity_ranker.predict(positive_sentence_combinations)
scores_input_and_cot_negative = similarity_ranker.predict(negative_sentence_combinations)

# Array mean() replaces the hand-written accumulators. For an empty group it
# yields nan (with a NumPy warning) rather than a ZeroDivisionError, assuming
# predict() returns an empty array for an empty input -- TODO confirm.
average_pos = scores_input_and_cot_positive.mean()
average_neg = scores_input_and_cot_negative.mean()

print(f"average score positive label: {average_pos} , average score negative label: {average_neg}")
print(scores_input_and_cot_positive[:10])
print(scores_input_and_cot_negative[:10])

average score positive label: 0.6503134965896606 , average score negative label: 0.6418948173522949
[0.77032435 0.6697404  0.7522436  0.6257553  0.75218487 0.53648984
 0.65566486 0.6724035  0.6455479  0.65812373]
[0.59979516 0.5802889  0.64465845 0.656749   0.77157074 0.4411291
 0.6103543  0.682139   0.6131603  0.533062  ]


In [None]:
# Now let's check the similarity of the human reason to the input text.
# The pair order here is (input, reason); the cross-encoder is fed the two
# texts in that order.

sentence_combinations = [
    [f"{item['premise']} {item['hypothesis']}", item["reason_baseline_model"]]
    for item in evaluation_list
]

scores_input_and_human = similarity_ranker.predict(sentence_combinations)

# Use the array's own mean() instead of an accumulator named `sum`, which
# shadowed the builtin sum() for the rest of the kernel session.
average = scores_input_and_human.mean()

print(f"average score: {average}")
print(scores_input_and_human[:10])

average score: 0.47828394174575806
[0.46913087 0.547252   0.637528   0.5595323  0.62431926 0.6304468
 0.59376085 0.5797517  0.5563645  0.30430552]


In [None]:
import numpy as np


Mean: 0.6070817708969116, std: 0.0934133231639862, mean-std=: 0.513668417930603
0.5592646


In [52]:

# We'll use these statistics: the hard-coded thresholds in explanation_score
# are values printed by this cell.
# The central value is the MEDIAN (np.median); it was previously stored in a
# variable called `mean` and printed as "Mean:", which mislabelled the number.
for scores in (
    scores_human_and_cot_positive,
    scores_input_and_cot_positive,
    scores_input_and_human,
):
    median = np.median(scores)
    std = np.std(scores)
    print(f"Median: {median}, std: {std}, median-std=: {median - std}")
    # 30th percentile of the same score set.
    print(np.percentile(scores, 30))


Mean: 0.6070817708969116, std: 0.0934133231639862, mean-std=: 0.513668417930603
0.5592646
Mean: 0.6537635326385498, std: 0.07992640882730484, mean-std=: 0.5738371014595032
0.6092924
Mean: 0.48854631185531616, std: 0.12554122507572174, mean-std=: 0.3630051016807556
0.4181483


In [63]:
def explanation_score(
    data,
    pred: "dspy.Prediction",
    human_threshold: float = 0.6070817708969116,
    input_threshold: float = 0.6537635326385498,
    unreliable_reason_threshold: float = 0.3630051016807556,
) -> float:
    """Binary reward for an explanation, based on cross-encoder similarity.

    The explanation is accepted (1.0) when at least one of these holds:

    * it is similar enough to the human reason (``human_threshold``);
    * it is similar enough to the input text (``input_threshold``);
    * the human reason itself is dissimilar to the input
      (``unreliable_reason_threshold``), in which case the explanation is not
      held against it.

    The default thresholds are the values this function previously
    hard-coded; they match statistics printed by the calibration cell earlier
    in the notebook.

    Pairs are scored in the same order as in the calibration cells --
    (human reason, explanation), (input, explanation), (input, human reason).
    The notebook's own outputs show that swapping a pair changes the score, so
    thresholds only apply to the ordering they were measured with.

    Args:
        data: Mapping with the keys ``"premise"``, ``"hypothesis"`` and
            ``"reason"``.
        pred: Prediction object exposing an ``explanation`` attribute.
        human_threshold: Minimum (human reason, explanation) similarity.
        input_threshold: Minimum (input, explanation) similarity.
        unreliable_reason_threshold: (input, human reason) similarity below
            which any non-empty explanation is accepted.

    Returns:
        1.0 if the explanation is accepted, otherwise 0.0. A missing or empty
        explanation always scores 0.0.
    """
    explanation = getattr(pred, "explanation", None)
    if not explanation:
        return 0.0

    human_reason = data["reason"]
    input_text = data["premise"] + " " + data["hypothesis"]

    # One batched call instead of three separate model invocations.
    sim_human_to_model, sim_input_to_model, sim_input_to_human = similarity_ranker.predict([
        (human_reason, explanation),
        (input_text, explanation),
        (input_text, human_reason),
    ])

    accepted = (
        sim_human_to_model >= human_threshold
        or sim_input_to_model >= input_threshold
        or sim_input_to_human < unreliable_reason_threshold
    )
    return 1.0 if accepted else 0.0


# Both refiners are configured identically (N=3, explanation_score as the
# reward, threshold 1.0); only the wrapped module differs.
_refine_settings = dict(N=3, reward_fn=explanation_score, threshold=1.0)

refined_joint_prompt = dspy.Refine(module=joint_prompt, **_refine_settings)

refined_explanation = dspy.Refine(module=explain_prompt, **_refine_settings)

import random

#sample = random.sample(evaluation_list, 100)



In [67]:
# Running counters and the per-example records collected for later scoring.
sum_total = 0
sum_joint = 0
sum_pipeline = 0
joint_prompt_and_pipeline_eval = []

# Label strings produced by the signatures -> integer ids compared with the
# dataset's 'label' field.
label_map = {
    "entailment": 0,
    "neutral": 1,
    "contradiction": 2
}

for item in dataset['dev_r3']:
    # Forward only what the modules and the reward function read: premise and
    # hypothesis (signature inputs) plus 'reason' (read by explanation_score).
    # The gold 'label' must not travel with the call: Refine hands its call
    # kwargs to the reward function and presumably also to its feedback step
    # (verify against the DSPy Refine implementation), so `**item` risked
    # exposing the answer to the model being evaluated.
    refine_inputs = {
        "premise": item["premise"],
        "hypothesis": item["hypothesis"],
        "reason": item["reason"],
    }

    joint_prompt_answer = refined_joint_prompt(**refine_inputs)
    pipeline_explanation = refined_explanation(**refine_inputs)
    pipeline_label = label_prompt(
        premise=item["premise"],
        hypothesis=item["hypothesis"],
        explanation=pipeline_explanation.explanation,
    )

    joint_pred = label_map[joint_prompt_answer.label]
    pipeline_pred = label_map[pipeline_label.label]
    gold = item["label"]

    sum_total += 1
    print(f"results: joint= {joint_pred}, pipeline= {pipeline_pred}, gold = {gold}")
    if joint_pred == gold:
        sum_joint += 1
    if pipeline_pred == gold:
        sum_pipeline += 1
    print(f"Joint score: {sum_joint}/{sum_total} , Pipeline score: {sum_pipeline}/{sum_total}")

    # Predicted labels are stored as strings; the gold label keeps its integer id.
    joint_prompt_and_pipeline_eval.append({
        "premise": item["premise"],
        "hypothesis": item["hypothesis"],
        "gold_label": gold,
        "human_reason": item["reason"],
        "joint_prompt_explanation": joint_prompt_answer.explanation,
        "joint_prompt_label": joint_prompt_answer.label,
        "pipeline_explanation": pipeline_explanation.explanation,
        "pipeline_label": pipeline_label.label
    })


results: joint= 0, pipeline= 0, gold = 0
Joint score: 1/1 , Pipeline score: 1/1
results: joint= 0, pipeline= 0, gold = 0
Joint score: 2/2 , Pipeline score: 2/2
results: joint= 0, pipeline= 0, gold = 0
Joint score: 3/3 , Pipeline score: 3/3
results: joint= 0, pipeline= 0, gold = 0
Joint score: 4/4 , Pipeline score: 4/4
results: joint= 0, pipeline= 0, gold = 0
Joint score: 5/5 , Pipeline score: 5/5
results: joint= 0, pipeline= 0, gold = 0
Joint score: 6/6 , Pipeline score: 6/6
results: joint= 0, pipeline= 1, gold = 0
Joint score: 7/7 , Pipeline score: 6/7
results: joint= 0, pipeline= 0, gold = 0
Joint score: 8/8 , Pipeline score: 7/8
results: joint= 0, pipeline= 0, gold = 0
Joint score: 9/9 , Pipeline score: 8/9
results: joint= 0, pipeline= 0, gold = 0
Joint score: 10/10 , Pipeline score: 9/10
results: joint= 0, pipeline= 0, gold = 0
Joint score: 11/11 , Pipeline score: 10/11
results: joint= 0, pipeline= 0, gold = 0
Joint score: 12/12 , Pipeline score: 11/12
results: joint= 0, pipeline= 

In [78]:
# Label strings -> integer ids (entailment=0, neutral=1, contradiction=2).
label_map = dict(entailment=0, neutral=1, contradiction=2)
def display_evaluation_metrics(test_predictions, test_references):
    """Print accuracy, then weighted-average precision, recall and F1.

    Args:
        test_predictions: Predicted label ids.
        test_references: Gold label ids, aligned with ``test_predictions``.
    """
    shared = {"predictions": test_predictions, "references": test_references}

    accuracy_result = accuracy.compute(**shared)
    print(f"accuracy: {accuracy_result}")

    # The remaining metrics all use the 'weighted' average.
    for metric_name, metric in (("precision", precision), ("recall", recall), ("f1", f1)):
        result = metric.compute(average='weighted', **shared)
        print(f"{metric_name}: {result}")

# Collect the gold ids and both sets of predicted ids in a single pass over
# the evaluation records; predicted label strings are mapped to ids.
joint_labels = []
gold_labels = []
pipeline_labels = []
for record in joint_prompt_and_pipeline_eval:
    joint_labels.append(label_map[record["joint_prompt_label"]])
    gold_labels.append(record["gold_label"])
    pipeline_labels.append(label_map[record["pipeline_label"]])

# Report the joint-prompt approach first, then the pipeline approach.
for heading, predicted_labels in (
    ("joint prompt score:", joint_labels),
    ("pipeline score:", pipeline_labels),
):
    print(heading)
    display_evaluation_metrics(predicted_labels, gold_labels)

joint prompt score:
accuracy: {'accuracy': 0.7625}
precision: {'precision': 0.7861148285650138}
recall: {'recall': 0.7625}
f1: {'f1': 0.7663842400561296}
pipeline score:
accuracy: {'accuracy': 0.7533333333333333}
precision: {'precision': 0.7559982378726119}
recall: {'recall': 0.7533333333333333}
f1: {'f1': 0.7543837324875559}
