# ImpPres LLM Baseline

In this notebook you will implement a baseline for ImpPres classification using an LLM.
This baseline must be implemented using DSPy.



In [2]:
# Configure the DSPy environment with the language model - for grok the parameters must be:
# env variable should be in os.environ['XAI_API_KEY']
# "xai/grok-3-mini"
import os
import dspy

# SECURITY: never commit API keys to a notebook. The key must be supplied through the
# environment (e.g. `export XAI_API_KEY=...` before starting Jupyter, or a local .env loader).
# A real key was previously hard-coded on this line: treat it as leaked and revoke/rotate it.
if not os.environ.get("XAI_API_KEY"):
    raise RuntimeError(
        "XAI_API_KEY is not set. Export it in your environment before running this notebook."
    )

# Register grok-3-mini as the default language model for every DSPy module below.
lm = dspy.LM('xai/grok-3-mini', api_key=os.environ['XAI_API_KEY'])
dspy.configure(lm=lm)

In [3]:
from typing import Literal

## Implement the DSPy program to classify pairs (premise, hypothesis) as entailment, contradiction, or neutral.

## Load ImpPres dataset

In [4]:
import random

from datasets import load_dataset

# Names of the ImpPres presupposition configurations evaluated in this notebook.
sections = [
    'presupposition_all_n_presupposition',
    'presupposition_both_presupposition',
    'presupposition_change_of_state',
    'presupposition_cleft_existence',
    'presupposition_cleft_uniqueness',
    'presupposition_only_presupposition',
    'presupposition_possessed_definites_existence',
    'presupposition_possessed_definites_uniqueness',
    'presupposition_question_presupposition',
]

# Load every configuration, keyed by configuration name.
dataset = {}
for section in sections:
    print(f"Loading dataset for section: {section}")
    dataset[section] = load_dataset("facebook/imppres", section)

# Flat pool of all examples across all sections. Later cells sample few-shot examples
# from it and evaluate on a prefix of it, but it was never defined anywhere, which made
# those cells fail with NameError on a fresh kernel.
combined_data = [
    item
    for section_splits in dataset.values()
    for split in section_splits.values()
    for item in split
]
# Shuffle with a fixed seed so that a prefix such as combined_data[:500] mixes all
# sections (instead of containing only the first one) and is reproducible across runs.
random.Random(42).shuffle(combined_data)

Loading dataset for section: presupposition_all_n_presupposition
Loading dataset for section: presupposition_both_presupposition
Loading dataset for section: presupposition_change_of_state
Loading dataset for section: presupposition_cleft_existence
Loading dataset for section: presupposition_cleft_uniqueness
Loading dataset for section: presupposition_only_presupposition
Loading dataset for section: presupposition_possessed_definites_existence
Loading dataset for section: presupposition_possessed_definites_uniqueness
Loading dataset for section: presupposition_question_presupposition


In [5]:
dataset

{'presupposition_all_n_presupposition': DatasetDict({
     all_n_presupposition: Dataset({
         features: ['premise', 'hypothesis', 'trigger', 'trigger1', 'trigger2', 'presupposition', 'gold_label', 'UID', 'pairID', 'paradigmID'],
         num_rows: 1900
     })
 }),
 'presupposition_both_presupposition': DatasetDict({
     both_presupposition: Dataset({
         features: ['premise', 'hypothesis', 'trigger', 'trigger1', 'trigger2', 'presupposition', 'gold_label', 'UID', 'pairID', 'paradigmID'],
         num_rows: 1900
     })
 }),
 'presupposition_change_of_state': DatasetDict({
     change_of_state: Dataset({
         features: ['premise', 'hypothesis', 'trigger', 'trigger1', 'trigger2', 'presupposition', 'gold_label', 'UID', 'pairID', 'paradigmID'],
         num_rows: 1900
     })
 }),
 'presupposition_cleft_existence': DatasetDict({
     cleft_existence: Dataset({
         features: ['premise', 'hypothesis', 'trigger', 'trigger1', 'trigger2', 'presupposition', 'gold_label', 'UI

## Evaluate Metrics

Let's use the huggingface `evaluate` package to compute the performance of the baseline.


In [6]:
import evaluate
# Bundle accuracy, F1, precision and recall into one object so they can be computed together.
# NOTE(review): the combined object appears to forward extra compute kwargs to every metric;
# `accuracy` takes no `average` argument, so confirm before passing `average=...` through it.
clf_metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

## Your Turn

Compute the classification metrics of the baseline LLM model on each section of the ImpPres dataset loaded above. (This task text was adapted from the ANLI notebook; ImpPres examples have no 'reason' field, so no such filter applies here.)

You also must show a comparison between the DeBERTa baseline model and this LLM baseline model. The comparison metric should compute the agreement between the two models:
* On how many samples they are both correct [Correct]
* On how many samples Model1 is correct and Model2 is incorrect [Correct1]
* On how many samples Model1 is incorrect and Model2 is correct [Correct2]
* On how many samples both are incorrect [Incorrect]

In [None]:
import pandas as pd
import numpy as np
from collections import defaultdict
import random

## DSPy Programs Implementation

# 1. Basic NLI Classification
# DSPy signature for plain three-way NLI: two text inputs, one label output.
# NOTE: DSPy uses a signature's docstring as the task instructions sent to the LM,
# so the docstring below is prompt text, not just documentation; edit it deliberately.
class BasicNLI(dspy.Signature):
    """Classify the relationship between a premise and hypothesis as entailment, contradiction, or neutral."""
    
    premise: str = dspy.InputField()
    hypothesis: str = dspy.InputField()
    # The Literal annotation restricts the declared output type to the three NLI classes.
    label: Literal['entailment', 'neutral', 'contradiction'] = dspy.OutputField()
    
class BasicNLIProgram(dspy.Module):
    """NLI classifier that wraps a single `dspy.Predict` call over the `BasicNLI` signature."""

    def __init__(self):
        super().__init__()
        self.classify = dspy.Predict(BasicNLI)

    def forward(self, premise, hypothesis):
        """Return the `label` field predicted for one (premise, hypothesis) pair."""
        return self.classify(premise=premise, hypothesis=hypothesis).label

# 2. Chain of Thought NLI
# DSPy signature for NLI with an explicit reasoning step before the label.
# NOTE: DSPy uses a signature's docstring as the task instructions sent to the LM,
# so the docstring and the `desc` strings below are prompt text; edit them deliberately.
class CoTNLI(dspy.Signature):
    """Classify the relationship between premise and hypothesis using step-by-step reasoning."""
    
    premise = dspy.InputField(desc="The premise statement")
    hypothesis = dspy.InputField(desc="The hypothesis statement") 
    # NOTE(review): this signature is run through dspy.ChainOfThought, which presumably adds
    # its own reasoning field as well; confirm whether declaring one here is redundant.
    reasoning = dspy.OutputField(desc="Step-by-step reasoning about the relationship")
    # Unlike BasicNLI.label, this output has no Literal annotation; the allowed values are
    # only described in `desc`, so the returned text is not constrained by a declared type.
    label = dspy.OutputField(desc="Classification: entailment, contradiction, or neutral")

class CoTNLIProgram(dspy.Module):
    """NLI classifier that runs the `CoTNLI` signature through `dspy.ChainOfThought`."""

    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(CoTNLI)

    def forward(self, premise, hypothesis):
        """Return only the `label` field of the prediction; the reasoning text is discarded."""
        return self.classify(premise=premise, hypothesis=hypothesis).label

# 3. Presupposition-Aware NLI
# DSPy signature that asks for a presupposition analysis before the NLI label.
# NOTE: DSPy uses a signature's docstring as the task instructions sent to the LM,
# so the docstring and the `desc` strings below are prompt text; edit them deliberately.
class PresuppositionNLI(dspy.Signature):
    """Classify entailment considering presuppositions. A presupposition is something assumed to be true in both premise and hypothesis."""
    
    premise = dspy.InputField(desc="The premise statement")
    hypothesis = dspy.InputField(desc="The hypothesis statement")
    # Intermediate output generated before the label; it is not returned by the program below.
    presupposition_analysis = dspy.OutputField(desc="Analysis of presuppositions in both statements")
    # No Literal annotation here: the allowed values are only described in `desc`.
    label = dspy.OutputField(desc="Classification: entailment, contradiction, or neutral")

class PresuppositionNLIProgram(dspy.Module):
    """NLI classifier that runs the `PresuppositionNLI` signature through `dspy.ChainOfThought`."""

    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(PresuppositionNLI)

    def forward(self, premise, hypothesis):
        """Return only the `label` field of the prediction; the analysis text is discarded."""
        return self.classify(premise=premise, hypothesis=hypothesis).label

## Helper Functions

def normalize_label(label):
    """Map free-form label text onto one of the three canonical NLI label names.

    Non-string values are returned untouched. Strings are lower-cased and stripped,
    then matched by substring in a fixed priority order (entailment, contradiction,
    neutral). A string matching none of them is returned in its cleaned form.
    """
    if not isinstance(label, str):
        return label

    cleaned = label.lower().strip()
    # Order matters: the first marker found in the text decides the label.
    marker_to_name = (
        ('entail', 'entailment'),
        ('contrad', 'contradiction'),
        ('neutral', 'neutral'),
    )
    for marker, canonical_name in marker_to_name:
        if marker in cleaned:
            return canonical_name
    return cleaned

# Cache of loaded `evaluate` metric modules so repeated evaluations do not reload them.
_LOADED_METRICS = {}


def _weighted_classification_metrics(prediction_ids, reference_ids):
    """Compute accuracy plus support-weighted F1/precision/recall on integer class ids.

    The metrics are computed one by one because `average` is only meaningful for the
    per-class metrics; `accuracy` does not take that argument.
    """
    scores = {}
    for metric_name in ("accuracy", "f1", "precision", "recall"):
        if metric_name not in _LOADED_METRICS:
            _LOADED_METRICS[metric_name] = evaluate.load(metric_name)
        extra_args = {} if metric_name == "accuracy" else {"average": "weighted"}
        scores.update(
            _LOADED_METRICS[metric_name].compute(
                predictions=prediction_ids, references=reference_ids, **extra_args
            )
        )
    return scores


def evaluate_program(program, test_data, section_name=""):
    """Evaluate a DSPy program on test data.

    Args:
        program: callable taking `premise=` and `hypothesis=` and returning a label.
        test_data: sized iterable of dicts with 'premise', 'hypothesis' and 'gold_label'.
        section_name: name used only in progress messages.

    Returns:
        (metrics, predictions, true_labels). `metrics` has the keys 'accuracy', 'f1',
        'precision' and 'recall', or is `{}` when there is nothing to score or scoring
        failed. `predictions` are canonical label names; `true_labels` are the gold
        labels exactly as stored in `test_data`.
    """
    # The classification metrics work on integer class ids, while the programs return
    # label names. Id order follows the ImpPres gold labels:
    # 0 = entailment, 1 = neutral, 2 = contradiction.
    label_ids = {'entailment': 0, 'neutral': 1, 'contradiction': 2}
    fallback_label = 'neutral'

    predictions = []
    true_labels = []
    total = len(test_data)

    print(f"Evaluating {section_name}... ({total} samples)")

    for i, item in enumerate(test_data):
        if i % 100 == 0:
            print(f"  Processing {i}/{total}")

        try:
            pred = normalize_label(program(premise=item['premise'], hypothesis=item['hypothesis']))
        except Exception as e:
            print(f"  Error processing item {i}: {e}")
            # Use neutral as default for failed predictions
            pred = fallback_label

        # An answer outside the three classes cannot be scored; treat it like a failed call.
        if not isinstance(pred, str) or pred not in label_ids:
            print(f"  Unrecognized label {pred!r} for item {i}; using '{fallback_label}'")
            pred = fallback_label

        predictions.append(pred)
        true_labels.append(item['gold_label'])

    if not predictions:
        return {}, predictions, true_labels

    # Calculate metrics
    try:
        prediction_ids = [label_ids[pred] for pred in predictions]
        # Gold labels may be class ids or label names; bring both to ids.
        reference_ids = [
            label_ids[normalize_label(gold)] if isinstance(gold, str) else int(gold)
            for gold in true_labels
        ]
        metrics = _weighted_classification_metrics(prediction_ids, reference_ids)
        return metrics, predictions, true_labels
    except Exception as e:
        print(f"Error computing metrics: {e}")
        return {}, predictions, true_labels

# ImpPres stores `gold_label` as an integer class id, while the DSPy signatures produce
# label names, so few-shot examples must carry the name rather than the id.
_IMPPRES_LABEL_NAMES = {0: 'entailment', 1: 'neutral', 2: 'contradiction'}


def _gold_label_name(gold_label):
    """Return the canonical label name for a gold label given as a class id or a string."""
    if isinstance(gold_label, str):
        return normalize_label(gold_label)
    return _IMPPRES_LABEL_NAMES.get(gold_label, gold_label)


def _label_match_metric(example, prediction, trace=None):
    """DSPy metric: True when the predicted label equals the example's gold label.

    DSPy optimizers call the metric as `metric(example, prediction, trace)`. The programs
    in this notebook return a bare label string, but a Prediction-like object with a
    `.label` attribute is accepted too.
    """
    predicted = getattr(prediction, 'label', prediction)
    return normalize_label(predicted) == example.label


def create_few_shot_examples(num_examples=50, seed=None):
    """Create few-shot examples for DSPy optimization.

    Args:
        num_examples: maximum number of examples sampled from `combined_data`.
        seed: optional seed for a reproducible sample; None uses the global `random` state.

    Returns:
        List of `dspy.Example` with `premise` and `hypothesis` marked as inputs and
        `label` holding the gold label name.
    """
    sampler = random if seed is None else random.Random(seed)
    sample_data = sampler.sample(combined_data, min(num_examples, len(combined_data)))

    return [
        dspy.Example(
            premise=item['premise'],
            hypothesis=item['hypothesis'],
            label=_gold_label_name(item['gold_label']),
        # Optimizers need to know which fields are inputs; the remaining one is the target.
        ).with_inputs('premise', 'hypothesis')
        for item in sample_data
    ]

## Evaluation Pipeline

def run_comprehensive_evaluation(max_section_samples=200, max_combined_samples=500,
                                 max_train_examples=20):
    """Run evaluation across all prompting strategies and sections.

    Args:
        max_section_samples: evaluation cap per section (keeps LM cost bounded).
        max_combined_samples: evaluation cap on the combined pool.
        max_train_examples: number of few-shot examples handed to the optimizer.

    Returns:
        (results, detailed_results). `results[program][section]` is a metrics dict
        ('Combined' is used as the section name for the pooled data).
        `detailed_results["{section}_{program}"]` holds predictions, gold labels and metrics.
    """
    # Initialize programs
    programs = {
        'Basic': BasicNLIProgram(),
        'Chain-of-Thought': CoTNLIProgram(),
        'Presupposition-Aware': PresuppositionNLIProgram()
    }

    # Prepare few-shot examples
    trainset = create_few_shot_examples()[:max_train_examples]

    # The training examples come from the same pool that is evaluated below. Drop them
    # from every evaluation set so optimized programs are not scored on their own demos.
    train_pairs = {(example.premise, example.hypothesis) for example in trainset}

    def held_out(items, limit):
        kept = [item for item in items
                if (item['premise'], item['hypothesis']) not in train_pairs]
        return kept[:limit]

    # Try to optimize programs with few-shot examples
    optimized_programs = {}
    for name, program in programs.items():
        try:
            print(f"Optimizing {name} program with few-shot examples...")
            optimizer = dspy.BootstrapFewShot(metric=_label_match_metric, max_bootstrapped_demos=5)
            optimized_programs[f"{name}_Optimized"] = optimizer.compile(program, trainset=trainset)
            print(f"  Successfully optimized {name}")
        except Exception as e:
            # Best effort: a failed optimization must not abort the baseline evaluation.
            print(f"  Failed to optimize {name}: {e}")

    # Combine all programs for evaluation
    all_programs = {**programs, **optimized_programs}

    # Store results
    results = defaultdict(dict)
    detailed_results = {}

    def evaluate_all_programs(scope, eval_data, header_suffix=""):
        """Score every program on `eval_data` and file the outcome under `scope`."""
        for prog_name, program in all_programs.items():
            print(f"\n--- {prog_name}{header_suffix} ---")
            metrics, preds, true_labels = evaluate_program(program, eval_data, f"{scope}_{prog_name}")

            results[prog_name][scope] = metrics
            detailed_results[f"{scope}_{prog_name}"] = {
                'predictions': preds,
                'true_labels': true_labels,
                'metrics': metrics
            }

    # Evaluate on each section
    for section in sections:
        print(f"\n=== Evaluating Section: {section} ===")
        # Each section is a DatasetDict with a single split; take that split.
        section_key = next(iter(dataset[section].keys()))
        section_data = list(dataset[section][section_key])

        # Limit evaluation size for faster testing
        evaluate_all_programs(section, held_out(section_data, max_section_samples))

    # Evaluate on combined dataset
    print(f"\n=== Evaluating Combined Dataset ===")
    evaluate_all_programs('Combined', held_out(combined_data, max_combined_samples),
                          header_suffix=" on Combined Data")

    return results, detailed_results

## Run Evaluation
# Runs every program (and its optimized variant, when optimization succeeds) on every
# section and on the combined data. Each evaluated sample is a call to the program,
# so this cell is the expensive one.
# `results` maps program name -> section -> metrics; `detailed_results` additionally keeps
# the per-sample predictions and gold labels under "{section}_{program}" keys.
print("Starting comprehensive evaluation...")
results, detailed_results = run_comprehensive_evaluation()

## Display Results

def create_results_table(results):
    """Flatten the nested results mapping into one DataFrame row per (program, section).

    Args:
        results: mapping of program name -> section name -> metrics dict.

    Returns:
        DataFrame with the columns Program, Section, Accuracy, Precision, Recall and F1.
        Metric values are formatted as strings with three decimals; a metric missing
        from a non-empty metrics dict is shown as 0.000. Sections whose metrics dict
        is empty (scoring failed) are left out.
    """
    metric_columns = (
        ('Accuracy', 'accuracy'),
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1', 'f1'),
    )

    rows = [
        {
            'Program': program_name,
            'Section': section,
            **{column: f"{scores.get(key, 0):.3f}" for column, key in metric_columns},
        }
        for program_name, per_section in results.items()
        for section, scores in per_section.items()
        if scores
    ]

    return pd.DataFrame(rows)

# Create and display results table
# Full table: one row per (program, section).
results_df = create_results_table(results)
print("\n" + "="*80, "RESULTS SUMMARY", "="*80, sep="\n")
print(results_df.to_string(index=False))

# Per section (plus the pooled 'Combined' set), report the program with the highest F1.
print("\n" + "="*80, "BEST PERFORMING APPROACHES BY SECTION", "="*80, sep="\n")

for section in sections + ['Combined']:
    # Only programs that produced metrics for this section take part in the ranking.
    section_results = [
        (program_name, program_results[section].get('f1', 0))
        for program_name, program_results in results.items()
        if program_results.get(section)
    ]
    if not section_results:
        continue

    best_program, best_f1 = max(section_results, key=lambda pair: pair[1])
    print(f"{section:50} | Best: {best_program:20} | F1: {best_f1:.3f}")

## Analysis and Insights

print("\n" + "="*80, "ANALYSIS AND INSIGHTS", "="*80, sep="\n")

# One pass over the results collects both views of the F1 scores:
#   strategy_performance: program name -> F1 on every section (including 'Combined')
#   section_difficulty:   section name -> F1 of every program (excluding 'Combined')
strategy_performance = defaultdict(list)
section_difficulty = defaultdict(list)
for program_name, program_results in results.items():
    for section, metrics in program_results.items():
        if not metrics or 'f1' not in metrics:
            continue
        strategy_performance[program_name].append(metrics['f1'])
        if section != 'Combined':
            section_difficulty[section].append(metrics['f1'])

print("Average F1 Performance by Strategy:")
for strategy, f1_scores in strategy_performance.items():
    if not f1_scores:
        continue
    avg_f1 = np.mean(f1_scores)
    std_f1 = np.std(f1_scores)
    print(f"  {strategy:25} | Avg F1: {avg_f1:.3f} ± {std_f1:.3f}")

# Rank sections from highest to lowest average F1 across programs.
print("\nSection Difficulty (Average F1 across all strategies):")
section_avg_f1 = sorted(
    ((section, np.mean(f1_scores)) for section, f1_scores in section_difficulty.items() if f1_scores),
    key=lambda pair: pair[1],
    reverse=True,
)
for section, avg_f1 in section_avg_f1:
    print(f"  {section:50} | Avg F1: {avg_f1:.3f}")

print("\n" + "="*80)
print("CONCLUSIONS")
print("="*80)

# The conclusions are derived from the F1 scores measured above instead of being
# asserted up front, so they can never contradict the results table.
mean_f1_by_strategy = {
    name: float(np.mean(scores))
    for name, scores in strategy_performance.items()
    if scores
}

conclusions = []
if mean_f1_by_strategy:
    ranked_strategies = sorted(mean_f1_by_strategy.items(), key=lambda pair: pair[1], reverse=True)
    best_strategy, best_strategy_f1 = ranked_strategies[0]
    conclusions.append(f"Best strategy by average F1: {best_strategy} ({best_strategy_f1:.3f})")

    # Compare every other strategy against the plain 'Basic' program.
    basic_f1 = mean_f1_by_strategy.get('Basic')
    if basic_f1 is not None:
        for name, score in ranked_strategies:
            if name == 'Basic':
                continue
            delta = score - basic_f1
            verdict = "improves on" if delta > 0 else "does not improve on"
            conclusions.append(
                f"{name} {verdict} Basic (avg F1 {score:.3f} vs {basic_f1:.3f}, delta {delta:+.3f})"
            )

if section_avg_f1:
    # section_avg_f1 is sorted from highest to lowest average F1.
    easiest_section, easiest_f1 = section_avg_f1[0]
    hardest_section, hardest_f1 = section_avg_f1[-1]
    conclusions.append(f"Easiest section: {easiest_section} (avg F1 {easiest_f1:.3f})")
    conclusions.append(f"Hardest section: {hardest_section} (avg F1 {hardest_f1:.3f})")

if not conclusions:
    conclusions.append("No metrics were computed, so no conclusions can be drawn.")

for index, conclusion in enumerate(conclusions, start=1):
    print(f"{index}. {conclusion}")

# Generic follow-up ideas; these do not depend on the measured results.
print("\nRecommendations for further improvement:")
recommendations = [
    "- Experiment with more sophisticated prompting templates",
    "- Use larger few-shot example sets for optimization",
    "- Consider ensemble methods combining multiple approaches",
    "- Incorporate domain-specific knowledge about presuppositions",
    "- Analyze error patterns to identify specific weaknesses"
]

for rec in recommendations:
    print(rec)

print("\n" + "="*80)
print("EVALUATION COMPLETE")
print("="*80)