In [15]:
# RoBERTa BRAINTEASER Implementation - Comprehensive Notebook
# SemEval 2024 Task 9: Lateral Thinking Puzzles

import os
import re
import json
import numpy as np
import pandas as pd
import torch
import logging
from tqdm.notebook import tqdm
from typing import List, Dict, Tuple, Any, Optional
from dataclasses import dataclass
from torch.nn import CrossEntropyLoss
from transformers import RobertaTokenizer, RobertaForMaskedLM
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML, Markdown
import warnings
warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("🧠 RoBERTa BRAINTEASER Implementation")
print("=" * 50)
print("SemEval 2024 Task 9: Lateral Thinking Puzzles")
print("Model: RoBERTa-Large with Masked Language Modeling")
print("=" * 50)

🧠 RoBERTa BRAINTEASER Implementation
SemEval 2024 Task 9: Lateral Thinking Puzzles
Model: RoBERTa-Large with Masked Language Modeling


In [16]:
# =====================================
# 1. CONFIGURATION AND SETUP
# =====================================

@dataclass
class Config:
    """Configuration class for RoBERTa BRAINTEASER implementation"""

    # Model configuration
    model_name: str = "roberta-large"
    device: str = "cuda:0" if torch.cuda.is_available() else "cpu"
    max_sequence_per_time: int = 80

    # Data paths (to be set by user)
    sentence_data_path: str = "/kaggle/input/puzzle-data/SP-train.npy"
    wordplay_data_path: str = "/kaggle/input/puzzle-data/WP-train.npy"

    # Evaluation settings
    lowercase_choices: bool = True
    batch_processing: bool = True

    def __post_init__(self):
        if not torch.cuda.is_available() and self.device.startswith("cuda"):
            self.device = "cpu"
            logger.warning("CUDA not available, switching to CPU")

config = Config()
print(f"📱 Device: {config.device}")
print(f"🤖 Model: {config.model_name}")

📱 Device: cuda:0
🤖 Model: roberta-large


In [17]:
# =====================================
# 2. MODEL INITIALIZATION
# =====================================

class RoBERTaBrainTeaserModel:
    """RoBERTa model wrapper for BRAINTEASER evaluation"""

    def __init__(self, config: Config):
        self.config = config
        self.tokenizer = None
        self.model = None
        self.device = config.device

    def load_model(self):
        """Load RoBERTa tokenizer and model"""
        try:
            print("🔄 Loading RoBERTa model...")
            self.tokenizer = RobertaTokenizer.from_pretrained(self.config.model_name)
            self.model = RobertaForMaskedLM.from_pretrained(self.config.model_name)
            self.model.to(self.device)
            self.model.eval()
            print("✅ Model loaded successfully!")

            # Set pad token if not available
            if self.tokenizer.pad_token_id is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise e

    def get_model_info(self):
        """Display model information"""
        if self.model is None:
            print("❌ Model not loaded yet. Call load_model() first.")
            return

        num_params = sum(p.numel() for p in self.model.parameters())
        trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)

        info = {
            "Model Name": self.config.model_name,
            "Device": self.device,
            "Total Parameters": f"{num_params:,}",
            "Trainable Parameters": f"{trainable_params:,}",
            "Vocab Size": self.tokenizer.vocab_size,
            "Max Position Embeddings": self.model.config.max_position_embeddings,
        }

        print("🔍 Model Information:")
        print("-" * 30)
        for key, value in info.items():
            print(f"{key}: {value}")

# Initialize model
roberta_model = RoBERTaBrainTeaserModel(config)
roberta_model.load_model()
roberta_model.get_model_info()

🔄 Loading RoBERTa model...
✅ Model loaded successfully!
🔍 Model Information:
------------------------------
Model Name: roberta-large
Device: cuda:0
Total Parameters: 355,412,057
Trainable Parameters: 355,412,057
Vocab Size: 50265
Max Position Embeddings: 514


In [18]:
# =====================================
# 3. DATA LOADING AND PREPROCESSING
# =====================================

class BrainTeaserDataLoader:
    """Enhanced data loader for BRAINTEASER dataset"""

    def __init__(self, config: Config):
        self.config = config
        self.sentence_data = []
        self.wordplay_data = []
        self.all_data = []

    def load_data(self) -> bool:
        """Load BRAINTEASER data from numpy files"""
        try:
            # Load sentence puzzles
            if os.path.exists(self.config.sentence_data_path):
                sentence_raw = np.load(self.config.sentence_data_path, allow_pickle=True)
                self.sentence_data = self._process_raw_data(sentence_raw, "SP")
                print(f"✅ Loaded {len(self.sentence_data)} Sentence Puzzle examples")
            else:
                print(f"⚠️ Sentence data not found at {self.config.sentence_data_path}")

            # Load word puzzles
            if os.path.exists(self.config.wordplay_data_path):
                wordplay_raw = np.load(self.config.wordplay_data_path, allow_pickle=True)
                self.wordplay_data = self._process_raw_data(wordplay_raw, "WP")
                print(f"✅ Loaded {len(self.wordplay_data)} Word Puzzle examples")
            else:
                print(f"⚠️ Wordplay data not found at {self.config.wordplay_data_path}")

            # Combine all data
            self.all_data = self.sentence_data + self.wordplay_data
            print(f"📊 Total examples: {len(self.all_data)}")

            return len(self.all_data) > 0

        except Exception as e:
            logger.error(f"Error loading data: {e}")
            return False

    def _process_raw_data(self, raw_data: np.ndarray, task_type: str) -> List[Dict]:
        """Process raw numpy data into structured format"""
        processed = []

        for item in raw_data:
            if hasattr(item, 'item'):
                item = item.item()

            # Ensure required fields exist
            if not all(key in item for key in ['question', 'choice_list', 'label']):
                continue

            processed_item = {
                'id': item.get('id', f"{task_type}_{len(processed)}"),
                'question': item['question'],
                'choice_list': item['choice_list'],
                'label': item['label'],
                'task_type': task_type,
                'answer': item.get('answer', ''),
                'distractors': item.get('distractor', [])
            }

            processed.append(processed_item)

        return processed

    def get_data_statistics(self) -> Dict[str, Any]:
        """Generate comprehensive data statistics"""
        if not self.all_data:
            return {}

        stats = {
            'total_examples': len(self.all_data),
            'sentence_puzzles': len(self.sentence_data),
            'word_puzzles': len(self.wordplay_data),
            'task_distribution': {
                'SP': len(self.sentence_data) / len(self.all_data) * 100,
                'WP': len(self.wordplay_data) / len(self.all_data) * 100
            }
        }

        # Question length statistics
        all_q_lengths = [len(item['question'].split()) for item in self.all_data]
        sp_q_lengths = [len(item['question'].split()) for item in self.sentence_data]
        wp_q_lengths = [len(item['question'].split()) for item in self.wordplay_data]

        stats['question_length'] = {
            'overall': {'mean': np.mean(all_q_lengths), 'std': np.std(all_q_lengths)},
            'sentence_puzzles': {'mean': np.mean(sp_q_lengths), 'std': np.std(sp_q_lengths)} if sp_q_lengths else {},
            'word_puzzles': {'mean': np.mean(wp_q_lengths), 'std': np.std(wp_q_lengths)} if wp_q_lengths else {}
        }

        # Answer distribution
        answer_distribution = {}
        for item in self.all_data:
            label = item['label']
            answer_distribution[label] = answer_distribution.get(label, 0) + 1

        stats['answer_distribution'] = answer_distribution

        return stats

    def display_examples(self, num_examples: int = 3):
        """Display example questions from the dataset"""
        if not self.all_data:
            print("❌ No data loaded")
            return

        print("📋 Example Questions:")
        print("=" * 50)

        # Show examples from each task type
        sp_examples = [item for item in self.all_data if item['task_type'] == 'SP'][:num_examples]
        wp_examples = [item for item in self.all_data if item['task_type'] == 'WP'][:num_examples]

        for i, example in enumerate(sp_examples):
            print(f"\n🧩 Sentence Puzzle {i+1}:")
            print(f"Question: {example['question']}")
            print("Choices:")
            for j, choice in enumerate(example['choice_list']):
                marker = "✅" if j == example['label'] else "  "
                print(f"  {marker} ({chr(65+j)}) {choice}")
            print(f"Correct Answer: {example.get('answer', 'N/A')}")

        for i, example in enumerate(wp_examples):
            print(f"\n🎯 Word Puzzle {i+1}:")
            print(f"Question: {example['question']}")
            print("Choices:")
            for j, choice in enumerate(example['choice_list']):
                marker = "✅" if j == example['label'] else "  "
                print(f"  {marker} ({chr(65+j)}) {choice}")
            print(f"Correct Answer: {example.get('answer', 'N/A')}")

# Load data
data_loader = BrainTeaserDataLoader(config)
data_loaded = data_loader.load_data()

if data_loaded:
    # Display statistics
    stats = data_loader.get_data_statistics()
    print("\n📊 Dataset Statistics:")
    print("-" * 30)
    print(f"Total Examples: {stats['total_examples']}")
    print(f"Sentence Puzzles: {stats['sentence_puzzles']} ({stats['task_distribution']['SP']:.1f}%)")
    print(f"Word Puzzles: {stats['word_puzzles']} ({stats['task_distribution']['WP']:.1f}%)")
    print(f"Average Question Length: {stats['question_length']['overall']['mean']:.1f} ± {stats['question_length']['overall']['std']:.1f} words")

    # Display examples
    data_loader.display_examples()
else:
    print("❌ Please ensure data files are available in the specified paths")

✅ Loaded 507 Sentence Puzzle examples
✅ Loaded 396 Word Puzzle examples
📊 Total examples: 903

📊 Dataset Statistics:
------------------------------
Total Examples: 903
Sentence Puzzles: 507 (56.1%)
Word Puzzles: 396 (43.9%)
Average Question Length: 24.5 ± 20.0 words
📋 Example Questions:

🧩 Sentence Puzzle 1:
Question: Mr. and Mrs. Mustard have six daughters and each daughter has one brother. But there are only 9 people in the family, how is that possible?
Choices:
     (A) Some daughters get married and have their own family.
  ✅ (B) Each daughter shares the same brother.
     (C) Some brothers were not loved by family and moved away.
     (D) None of above.
Correct Answer: Each daughter shares the same brother.

🧩 Sentence Puzzle 2:
Question: The six daughters of Mr. and Mrs. Mustard each have one brother. However, the family only consists of nine people; how is that possible?
Choices:
     (A) Some brothers were not loved by family and moved away.
     (B) Some daughters get married 

In [19]:
# =====================================
# 4. CORE SCORING FUNCTIONS
# =====================================

class RoBERTaScorer:
    """Core scoring functionality for RoBERTa BRAINTEASER evaluation"""

    def __init__(self, model_wrapper: RoBERTaBrainTeaserModel):
        self.model_wrapper = model_wrapper
        self.tokenizer = model_wrapper.tokenizer
        self.model = model_wrapper.model
        self.device = model_wrapper.device
        self.config = model_wrapper.config

    def score_question(self, question: str, choices: List[str]) -> int:
        """
        Score a single question with multiple choices using masked language modeling

        Args:
            question: The question text
            choices: List of answer choices

        Returns:
            Index of the predicted choice (0-based)
        """
        # Preprocess choices
        if self.config.lowercase_choices:
            processed_choices = [choice[0].lower() + choice[1:] if choice else choice
                               for choice in choices]
        else:
            processed_choices = choices

        # Get pad token ID
        pad_token_id = self.tokenizer.pad_token_id if self.tokenizer.pad_token_id is not None else 0

        # Encode question and choices
        question_ids = self.tokenizer.encode(question)
        choice_ids = [self.tokenizer.encode(choice, add_prefix_space=True)[1:-1]
                     for choice in processed_choices]

        # Create sequences for each choice
        sequences = [question_ids[:-1] + choice_ids[i] + [self.tokenizer.sep_token_id]
                    for i in range(len(choice_ids))]

        # Create label IDs for loss calculation
        label_ids = [[-100] + text[1:-1] + [-100] for text in sequences]

        # Prepare input tensors
        sequences, label_ids, attention_mask = self._prepare_input(sequences, label_ids, pad_token_id)

        # Get prediction
        prediction = self._token_wise_scoring(sequences, label_ids, attention_mask)

        return prediction

    def _prepare_input(self, sequences: List[List[int]], label_ids: List[List[int]],
                      pad_token_id: int) -> Tuple[List[List[int]], List[List[int]], np.ndarray]:
        """Prepare input sequences with padding and attention masks"""
        max_length = max([len(text) for text in sequences])
        attention_mask = np.zeros((len(sequences), max_length))

        for i in range(len(sequences)):
            attention_mask[i][:len(sequences[i])] = 1

        # Pad sequences
        sequences = [text + [pad_token_id] * (max_length - len(text)) for text in sequences]
        label_ids = [text + [-100] * (max_length - len(text)) for text in label_ids]

        return sequences, label_ids, attention_mask

    def _token_wise_scoring(self, sequences: List[List[int]], label_ids: List[List[int]],
                           attention_mask: np.ndarray) -> int:
        """Perform token-wise scoring using masked language modeling"""
        choice_losses = [0 for _ in range(len(sequences))]

        for i in range(len(sequences)):
            curr_label_ids = label_ids[i]
            tmp_seq_list = []
            tmp_label_list = []
            tmp_attention_mask = []

            # Create masked versions for each non-ignored token
            for j, label in enumerate(curr_label_ids):
                if label == -100:
                    continue

                # Create masked sequence
                masked_seq = sequences[i][:j] + [self.tokenizer.mask_token_id] + sequences[i][j+1:]
                label_seq = [-100] * j + sequences[i][j:j+1] + [-100] * (len(sequences[i]) - j - 1)

                tmp_seq_list.append(torch.tensor(masked_seq).long().to(self.device))
                tmp_label_list.append(torch.tensor(label_seq).long().to(self.device))
                tmp_attention_mask.append(torch.tensor(attention_mask[i]).long().to(self.device))

            if not tmp_seq_list:
                continue

            # Stack tensors
            tmp_seq_list = torch.stack(tmp_seq_list)
            tmp_label_list = torch.stack(tmp_label_list)
            tmp_attention_mask = torch.stack(tmp_attention_mask)

            # Compute loss in batches
            if len(tmp_seq_list) < self.config.max_sequence_per_time:
                loss = self._get_lm_score(tmp_seq_list, tmp_label_list, tmp_attention_mask)
            else:
                loss = []
                for chunk_start in range(0, len(tmp_seq_list), self.config.max_sequence_per_time):
                    chunk_end = chunk_start + self.config.max_sequence_per_time
                    chunk_loss = self._get_lm_score(
                        tmp_seq_list[chunk_start:chunk_end],
                        tmp_label_list[chunk_start:chunk_end],
                        tmp_attention_mask[chunk_start:chunk_end]
                    )
                    loss.append(chunk_loss)
                loss = np.concatenate(loss)

            # Average loss for this choice
            choice_losses[i] = sum(loss) / len(loss) if len(loss) > 0 else float('inf')

        # Return index of choice with minimum loss
        return choice_losses.index(min(choice_losses))

    def _get_lm_score(self, batch: torch.Tensor, label_ids: torch.Tensor,
                     attention_mask: torch.Tensor) -> np.ndarray:
        """Get cross-entropy loss for a batch of sequences"""
        with torch.no_grad():
            num_choices, max_length = batch.shape
            label_ids_flat = label_ids.view(-1)

            # Forward pass
            outputs = self.model(batch, attention_mask=attention_mask)
            lm_logits = outputs.logits.view(-1, outputs.logits.size(-1))

            # Compute loss
            loss_fct = CrossEntropyLoss(reduction="none")
            loss = loss_fct(lm_logits, label_ids_flat)
            loss = loss.view(num_choices, -1).sum(1).cpu().numpy()

        return loss

# Initialize scorer
scorer = RoBERTaScorer(roberta_model)
print("✅ RoBERTa scorer initialized successfully!")

✅ RoBERTa scorer initialized successfully!


In [20]:
# =====================================
# 5. EVALUATION FUNCTIONS
# =====================================

class BrainTeaserEvaluator:
    """Comprehensive evaluation for BRAINTEASER tasks"""

    def __init__(self, scorer: RoBERTaScorer):
        self.scorer = scorer
        self.results = []

    def evaluate_dataset(self, data: List[Dict], show_progress: bool = True) -> Dict[str, Any]:
        """Evaluate the entire dataset"""
        predictions = []
        ground_truth = []
        detailed_results = []

        iterator = tqdm(data, desc="Evaluating") if show_progress else data

        for sample in iterator:
            question = sample['question']
            choices = sample['choice_list']
            true_label = sample['label']

            # Get prediction
            prediction = self.scorer.score_question(question, choices)

            predictions.append(prediction)
            ground_truth.append(true_label)

            # Store detailed result
            result_item = sample.copy()
            result_item['prediction'] = prediction
            result_item['correct'] = prediction == true_label
            result_item['predicted_choice'] = choices[prediction] if prediction < len(choices) else "Invalid"

            detailed_results.append(result_item)

        self.results = detailed_results

        # Calculate metrics
        metrics = self._calculate_metrics(detailed_results)

        return metrics

    def _calculate_metrics(self, results: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive evaluation metrics"""
        # Basic accuracy
        correct = sum(1 for r in results if r['correct'])
        total = len(results)
        instance_accuracy = correct / total if total > 0 else 0

        # Task-specific accuracy
        sp_results = [r for r in results if r['task_type'] == 'SP']
        wp_results = [r for r in results if r['task_type'] == 'WP']

        sp_accuracy = sum(1 for r in sp_results if r['correct']) / len(sp_results) if sp_results else 0
        wp_accuracy = sum(1 for r in wp_results if r['correct']) / len(wp_results) if wp_results else 0

        # Group-based accuracy (for adversarial variants)
        group_accuracy = self._calculate_group_accuracy(results)

        metrics = {
            'instance_accuracy': instance_accuracy,
            'sentence_puzzle_accuracy': sp_accuracy,
            'word_puzzle_accuracy': wp_accuracy,
            'group_accuracy': group_accuracy,
            'total_examples': total,
            'correct_predictions': correct,
            'sp_examples': len(sp_results),
            'wp_examples': len(wp_results)
        }

        return metrics

    def _calculate_group_accuracy(self, results: List[Dict]) -> Dict[str, float]:
        """Calculate group-based accuracy for adversarial variants"""
        # Group by base ID (remove variant suffixes)
        groups = {}
        for result in results:
            base_id = result['id'].split('_')[0] if '_' in result['id'] else result['id']
            base_id = base_id.split('-')[1] if '-' in base_id else base_id

            if base_id not in groups:
                groups[base_id] = []
            groups[base_id].append(result)

        # Calculate group accuracy
        correct_groups = 0
        total_groups = len(groups)

        group_details = {}

        for group_id, group_results in groups.items():
            all_correct = all(r['correct'] for r in group_results)
            if all_correct:
                correct_groups += 1

            group_details[group_id] = {
                'all_correct': all_correct,
                'individual_results': [(r['id'], r['correct']) for r in group_results]
            }

        group_accuracy = correct_groups / total_groups if total_groups > 0 else 0

        return {
            'overall': group_accuracy,
            'correct_groups': correct_groups,
            'total_groups': total_groups,
            'details': group_details
        }

    def display_results(self, metrics: Dict[str, Any]):
        """Display comprehensive evaluation results"""
        print("\n🎯 EVALUATION RESULTS")
        print("=" * 50)

        print(f"📊 Overall Performance:")
        print(f"  Instance Accuracy: {metrics['instance_accuracy']:.4f} ({metrics['correct_predictions']}/{metrics['total_examples']})")

        if metrics['sp_examples'] > 0:
            print(f"  Sentence Puzzles: {metrics['sentence_puzzle_accuracy']:.4f} ({sum(1 for r in self.results if r['task_type'] == 'SP' and r['correct'])}/{metrics['sp_examples']})")

        if metrics['wp_examples'] > 0:
            print(f"  Word Puzzles: {metrics['word_puzzle_accuracy']:.4f} ({sum(1 for r in self.results if r['task_type'] == 'WP' and r['correct'])}/{metrics['wp_examples']})")

        if 'group_accuracy' in metrics and isinstance(metrics['group_accuracy'], dict):
            group_acc = metrics['group_accuracy']
            print(f"  Group Accuracy: {group_acc['overall']:.4f} ({group_acc['correct_groups']}/{group_acc['total_groups']})")

        # Error analysis
        self._display_error_analysis()

    def _display_error_analysis(self):
        """Display detailed error analysis"""
        if not self.results:
            return

        errors = [r for r in self.results if not r['correct']]

        print(f"\n❌ Error Analysis:")
        print(f"  Total Errors: {len(errors)}")

        if errors:
            # Error by task type
            sp_errors = [e for e in errors if e['task_type'] == 'SP']
            wp_errors = [e for e in errors if e['task_type'] == 'WP']

            print(f"  Sentence Puzzle Errors: {len(sp_errors)}")
            print(f"  Word Puzzle Errors: {len(wp_errors)}")

            # Show a few example errors
            print(f"\n🔍 Sample Errors:")
            for i, error in enumerate(errors[:3]):
                print(f"\n  Error {i+1} ({error['task_type']}):")
                print(f"    Question: {error['question'][:100]}...")
                print(f"    Predicted: {error['predicted_choice']}")
                print(f"    Correct: {error['choice_list'][error['label']]}")

    def export_results(self, filename: str = "roberta_results.json"):
        """Export detailed results to JSON file"""
        if not self.results:
            print("❌ No results to export")
            return

        export_data = {
            'model': self.scorer.config.model_name,
            'total_examples': len(self.results),
            'results': self.results
        }

        with open(filename, 'w') as f:
            json.dump(export_data, f, indent=2)

        print(f"✅ Results exported to {filename}")

In [21]:
# =====================================
# 6. MAIN EVALUATION EXECUTION
# =====================================

def run_roberta_evaluation():
    """Run the complete RoBERTa evaluation on BRAINTEASER"""

    if not data_loaded:
        print("❌ Cannot run evaluation: No data loaded")
        return None

    print("\n🚀 Starting RoBERTa BRAINTEASER Evaluation")
    print("=" * 50)

    # Initialize evaluator
    evaluator = BrainTeaserEvaluator(scorer)

    # Run evaluation
    metrics = evaluator.evaluate_dataset(data_loader.all_data)

    # Display results
    evaluator.display_results(metrics)

    # Export results
    evaluator.export_results()

    return evaluator, metrics

# Run evaluation if data is loaded
if data_loaded:
    # Test with a small sample first
    print("\n🧪 Testing with sample data...")
    sample_data = data_loader.all_data[:5]  # Test with first 5 examples

    evaluator = BrainTeaserEvaluator(scorer)
    sample_metrics = evaluator.evaluate_dataset(sample_data)
    evaluator.display_results(sample_metrics)

    # Ask user if they want to run full evaluation
    print(f"\n❓ Run full evaluation on all {len(data_loader.all_data)} examples?")
    print("This may take several minutes depending on your hardware.")
    print("Uncomment the line below to run full evaluation:")
    print("# full_evaluator, full_metrics = run_roberta_evaluation()")
else:
    print("\n⚠️ Please load data first to run evaluation")


🧪 Testing with sample data...


Evaluating:   0%|          | 0/5 [00:00<?, ?it/s]


🎯 EVALUATION RESULTS
📊 Overall Performance:
  Instance Accuracy: 0.8000 (4/5)
  Sentence Puzzles: 0.8000 (4/5)
  Group Accuracy: 0.5000 (1/2)

❌ Error Analysis:
  Total Errors: 1
  Sentence Puzzle Errors: 1
  Word Puzzle Errors: 0

🔍 Sample Errors:

  Error 1 (SP):
    Question: Mr. and Mrs. Mustard have six daughters and each daughter has one brother. But there are only 9 peop...
    Predicted: Some daughters get married and have their own family.
    Correct: Each daughter shares the same brother.

❓ Run full evaluation on all 903 examples?
This may take several minutes depending on your hardware.
Uncomment the line below to run full evaluation:
# full_evaluator, full_metrics = run_roberta_evaluation()


In [22]:
# =====================================
# 7. UTILITY FUNCTIONS AND ANALYSIS
# =====================================

def analyze_prediction_patterns(evaluator: BrainTeaserEvaluator):
    """Analyze patterns in model predictions"""
    if not evaluator.results:
        print("❌ No results available for analysis")
        return

    print("\n🔍 Prediction Pattern Analysis")
    print("=" * 40)

    # Analyze answer distribution
    pred_distribution = {}
    true_distribution = {}

    for result in evaluator.results:
        pred = result['prediction']
        true = result['label']

        pred_distribution[pred] = pred_distribution.get(pred, 0) + 1
        true_distribution[true] = true_distribution.get(true, 0) + 1

    print("📊 Answer Distribution:")
    print("Position | Predicted | Actual | Difference")
    print("-" * 45)

    for i in range(4):
        pred_count = pred_distribution.get(i, 0)
        true_count = true_distribution.get(i, 0)
        diff = pred_count - true_count

        print(f"    {chr(65+i)}    |    {pred_count:3d}    |  {true_count:3d}   |    {diff:+3d}")

def create_results_visualization(evaluator: BrainTeaserEvaluator):
    """Create visualizations of the results"""
    if not evaluator.results:
        print("❌ No results available for visualization")
        return

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('RoBERTa BRAINTEASER Results Analysis', fontsize=16, fontweight='bold')

    # Task-wise accuracy
    sp_results = [r for r in evaluator.results if r['task_type'] == 'SP']
    wp_results = [r for r in evaluator.results if r['task_type'] == 'WP']

    sp_acc = sum(1 for r in sp_results if r['correct']) / len(sp_results) if sp_results else 0
    wp_acc = sum(1 for r in wp_results if r['correct']) / len(wp_results) if wp_results else 0

    task_names = ['Sentence Puzzles', 'Word Puzzles']
    task_accs = [sp_acc, wp_acc]

    axes[0, 0].bar(task_names, task_accs, color=['skyblue', 'lightcoral'])
    axes[0, 0].set_ylabel('Accuracy')
    axes[0, 0].set_title('Accuracy by Task Type')
    axes[0, 0].set_ylim(0, 1)

    # Add accuracy values on bars
    for i, v in enumerate(task_accs):
        axes[0, 0].text(i, v + 0.01, f'{v:.3f}', ha='center', va='bottom')

    # Prediction distribution
    pred_counts = [0, 0, 0, 0]
    for result in evaluator.results:
        if 0 <= result['prediction'] < 4:
            pred_counts[result['prediction']] += 1

    positions = ['A', 'B', 'C', 'D']
    axes[0, 1].bar(positions, pred_counts, color='lightgreen')
    axes[0, 1].set_ylabel('Count')
    axes[0, 1].set_title('Prediction Distribution')
    axes[0, 1].set_xlabel('Answer Choice')

    # Confidence vs Accuracy (using inverse of loss as confidence proxy)
    correct_results = [r for r in evaluator.results if r['correct']]
    incorrect_results = [r for r in evaluator.results if not r['correct']]

    axes[1, 0].hist([len(correct_results), len(incorrect_results)],
                   bins=2, labels=['Correct', 'Incorrect'],
                   color=['green', 'red'], alpha=0.7)
    axes[1, 0].set_ylabel('Count')
    axes[1, 0].set_title('Correct vs Incorrect Predictions')
    axes[1, 0].legend()

    # Question length vs accuracy
    question_lengths = [len(r['question'].split()) for r in evaluator.results]
    accuracies = [1 if r['correct'] else 0 for r in evaluator.results]

    # Bin by question length
    length_bins = np.arange(0, max(question_lengths) + 5, 5)
    binned_acc = []
    bin_centers = []

    for i in range(len(length_bins) - 1):
        mask = (np.array(question_lengths) >= length_bins[i]) & (np.array(question_lengths) < length_bins[i+1])
        if np.sum(mask) > 0:
            bin_acc = np.mean(np.array(accuracies)[mask])
            binned_acc.append(bin_acc)
            bin_centers.append((length_bins[i] + length_bins[i+1]) / 2)

    if bin_centers and binned_acc:
        axes[1, 1].plot(bin_centers, binned_acc, 'o-', color='purple', linewidth=2, markersize=6)
        axes[1, 1].set_xlabel('Question Length (words)')
        axes[1, 1].set_ylabel('Accuracy')
        axes[1, 1].set_title('Accuracy vs Question Length')
        axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def get_hardest_questions(evaluator: BrainTeaserEvaluator, n: int = 5):
    """Identify the hardest questions based on model performance"""
    if not evaluator.results:
        print("❌ No results available")
        return

    # Get incorrect predictions
    errors = [r for r in evaluator.results if not r['correct']]

    print(f"\n🔥 Top {min(n, len(errors))} Hardest Questions:")
    print("=" * 60)

    for i, error in enumerate(errors[:n]):
        print(f"\n{i+1}. Question ID: {error['id']} ({error['task_type']})")
        print(f"   Question: {error['question']}")
        print("   Choices:")
        for j, choice in enumerate(error['choice_list']):
            marker = "❌" if j == error['prediction'] else "✅" if j == error['label'] else "  "
            print(f"     {marker} ({chr(65+j)}) {choice}")
        print(f"   Model predicted: {error['predicted_choice']}")
        print(f"   Correct answer: {error['choice_list'][error['label']]}")
        print("-" * 60)

def compare_task_performance(evaluator: BrainTeaserEvaluator):
    """Compare performance between sentence and word puzzles"""
    if not evaluator.results:
        print("❌ No results available")
        return

    sp_results = [r for r in evaluator.results if r['task_type'] == 'SP']
    wp_results = [r for r in evaluator.results if r['task_type'] == 'WP']

    print("\n📊 Task Performance Comparison")
    print("=" * 40)

    if sp_results:
        sp_correct = sum(1 for r in sp_results if r['correct'])
        sp_accuracy = sp_correct / len(sp_results)
        sp_avg_length = np.mean([len(r['question'].split()) for r in sp_results])

        print(f"🧩 Sentence Puzzles:")
        print(f"   Accuracy: {sp_accuracy:.4f} ({sp_correct}/{len(sp_results)})")
        print(f"   Avg Question Length: {sp_avg_length:.1f} words")
        print(f"   Error Rate: {(1-sp_accuracy)*100:.1f}%")

    if wp_results:
        wp_correct = sum(1 for r in wp_results if r['correct'])
        wp_accuracy = wp_correct / len(wp_results)
        wp_avg_length = np.mean([len(r['question'].split()) for r in wp_results])

        print(f"\n🎯 Word Puzzles:")
        print(f"   Accuracy: {wp_accuracy:.4f} ({wp_correct}/{len(wp_results)})")
        print(f"   Avg Question Length: {wp_avg_length:.1f} words")
        print(f"   Error Rate: {(1-wp_accuracy)*100:.1f}%")

    if sp_results and wp_results:
        print(f"\n📈 Comparison:")
        better_task = "Sentence Puzzles" if sp_accuracy > wp_accuracy else "Word Puzzles"
        diff = abs(sp_accuracy - wp_accuracy)
        print(f"   Better Performance: {better_task}")
        print(f"   Performance Gap: {diff:.4f} ({diff*100:.1f}%)")

In [23]:
# =====================================
# 8. ADVANCED ANALYSIS FUNCTIONS
# =====================================

def analyze_error_patterns(evaluator: BrainTeaserEvaluator):
    """Detailed analysis of error patterns"""
    if not evaluator.results:
        print("❌ No results available")
        return

    print("\n🔍 Detailed Error Pattern Analysis")
    print("=" * 50)

    errors = [r for r in evaluator.results if not r['correct']]

    # Pattern 1: Question characteristics that lead to errors
    print("📝 Error-prone Question Characteristics:")

    # Length analysis
    error_lengths = [len(r['question'].split()) for r in errors]
    correct_lengths = [len(r['question'].split()) for r in evaluator.results if r['correct']]

    print(f"   Avg length of failed questions: {np.mean(error_lengths):.1f} words")
    print(f"   Avg length of correct questions: {np.mean(correct_lengths):.1f} words")

    # Keyword analysis
    error_keywords = {}
    correct_keywords = {}

    common_words = ['what', 'how', 'why', 'when', 'where', 'who', 'which', 'not', 'no', 'never']

    for word in common_words:
        error_count = sum(1 for r in errors if word.lower() in r['question'].lower())
        correct_count = sum(1 for r in evaluator.results if r['correct'] and word.lower() in r['question'].lower())

        total_with_word = error_count + correct_count
        if total_with_word > 0:
            error_rate = error_count / total_with_word
            print(f"   '{word}' error rate: {error_rate:.3f} ({error_count}/{total_with_word})")

    # Pattern 2: Prediction patterns
    print(f"\n🎯 Prediction Patterns:")
    print("   Most confused answer pairs:")

    confusion_matrix = {}
    for error in errors:
        true_label = error['label']
        pred_label = error['prediction']
        pair = (true_label, pred_label)
        confusion_matrix[pair] = confusion_matrix.get(pair, 0) + 1

    sorted_confusions = sorted(confusion_matrix.items(), key=lambda x: x[1], reverse=True)
    for (true_pos, pred_pos), count in sorted_confusions[:5]:
        print(f"   {chr(65+true_label)} → {chr(65+pred_pos)}: {count} times")

def benchmark_against_baselines(evaluator: BrainTeaserEvaluator):
    """Compare RoBERTa performance against known baselines"""
    if not evaluator.results:
        print("❌ No results available")
        return

    print("\n📊 Baseline Comparison")
    print("=" * 40)

    # Calculate our metrics
    total_correct = sum(1 for r in evaluator.results if r['correct'])
    total_questions = len(evaluator.results)
    our_accuracy = total_correct / total_questions

    # Known baselines from literature
    baselines = {
        'Random Baseline': 0.25,  # 4-choice multiple choice
        'Human Performance': 0.92,  # From original paper
        'ChatGPT (Zero-shot)': 0.575,  # Average of SP (60.77%) and WP (56.10%)
        'Competition Winner': 0.835,  # Average of best results (81.7% SP, 85.4% WP)
    }

    print("Model Comparison:")
    print("-" * 30)

    # Add our result
    baselines['RoBERTa-Large (Ours)'] = our_accuracy

    # Sort by performance
    sorted_baselines = sorted(baselines.items(), key=lambda x: x[1], reverse=True)

    for model, accuracy in sorted_baselines:
        bar_length = int(accuracy * 40)  # Scale to 40 characters
        bar = "█" * bar_length + "░" * (40 - bar_length)

        if model == 'RoBERTa-Large (Ours)':
            print(f"👉 {model:<25} {accuracy:.3f} |{bar}|")
        else:
            print(f"   {model:<25} {accuracy:.3f} |{bar}|")

    # Performance analysis
    human_gap = baselines['Human Performance'] - our_accuracy
    random_improvement = our_accuracy - baselines['Random Baseline']

    print(f"\n📈 Performance Analysis:")
    print(f"   Gap to Human Performance: {human_gap:.3f} ({human_gap*100:.1f}%)")
    print(f"   Improvement over Random: {random_improvement:.3f} ({random_improvement*100:.1f}%)")

    if our_accuracy > baselines['ChatGPT (Zero-shot)']:
        chatgpt_improvement = our_accuracy - baselines['ChatGPT (Zero-shot)']
        print(f"   ✅ Outperforms ChatGPT by: {chatgpt_improvement:.3f} ({chatgpt_improvement*100:.1f}%)")
    else:
        chatgpt_gap = baselines['ChatGPT (Zero-shot)'] - our_accuracy
        print(f"   ❌ ChatGPT advantage: {chatgpt_gap:.3f} ({chatgpt_gap*100:.1f}%)")

In [24]:
# =====================================
# 9. EXPORT AND REPORTING FUNCTIONS
# =====================================

def generate_comprehensive_report(evaluator: BrainTeaserEvaluator,
                                output_file: str = "roberta_brainteaser_report.html"):
    """Generate a comprehensive HTML report"""
    if not evaluator.results:
        print("❌ No results available")
        return

    # Calculate metrics
    total_correct = sum(1 for r in evaluator.results if r['correct'])
    total_questions = len(evaluator.results)
    accuracy = total_correct / total_questions

    sp_results = [r for r in evaluator.results if r['task_type'] == 'SP']
    wp_results = [r for r in evaluator.results if r['task_type'] == 'WP']

    sp_accuracy = sum(1 for r in sp_results if r['correct']) / len(sp_results) if sp_results else 0
    wp_accuracy = sum(1 for r in wp_results if r['correct']) / len(wp_results) if wp_results else 0

    # Generate HTML report
    html_content = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>RoBERTa BRAINTEASER Evaluation Report</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
            .metric {{ display: inline-block; margin: 10px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }}
            .correct {{ color: green; }}
            .incorrect {{ color: red; }}
            .example {{ margin: 10px 0; padding: 10px; border-left: 3px solid #ccc; }}
            table {{ border-collapse: collapse; width: 100%; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f2f2f2; }}
        </style>
    </head>
    <body>
        <div class="header">
            <h1>🧠 RoBERTa BRAINTEASER Evaluation Report</h1>
            <p>Model: {config.model_name} | Device: {config.device}</p>
            <p>Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>

        <h2>📊 Overall Performance</h2>
        <div class="metric">
            <h3>Instance Accuracy</h3>
            <p><strong>{accuracy:.4f}</strong> ({total_correct}/{total_questions})</p>
        </div>

        <div class="metric">
            <h3>Sentence Puzzles</h3>
            <p><strong>{sp_accuracy:.4f}</strong> ({sum(1 for r in sp_results if r['correct'])}/{len(sp_results)})</p>
        </div>

        <div class="metric">
            <h3>Word Puzzles</h3>
            <p><strong>{wp_accuracy:.4f}</strong> ({sum(1 for r in wp_results if r['correct'])}/{len(wp_results)})</p>
        </div>

        <h2>❌ Error Analysis</h2>
        <p>Total Errors: {len([r for r in evaluator.results if not r['correct']])}</p>

        <h3>Sample Errors:</h3>
    """

    # Add error examples
    errors = [r for r in evaluator.results if not r['correct']][:5]
    for i, error in enumerate(errors):
        html_content += f"""
        <div class="example">
            <strong>Error {i+1} ({error['task_type']}):</strong><br>
            <strong>Q:</strong> {error['question']}<br>
            <strong>Predicted:</strong> <span class="incorrect">{error['predicted_choice']}</span><br>
            <strong>Correct:</strong> <span class="correct">{error['choice_list'][error['label']]}</span>
        </div>
        """

    # Add detailed results table
    html_content += """
        <h2>📋 Detailed Results</h2>
        <table>
            <tr>
                <th>ID</th>
                <th>Task</th>
                <th>Question</th>
                <th>Predicted</th>
                <th>Correct</th>
                <th>Status</th>
            </tr>
    """

    for result in evaluator.results[:20]:  # Show first 20 results
        status = "✅" if result['correct'] else "❌"
        question_preview = result['question'][:50] + "..." if len(result['question']) > 50 else result['question']

        html_content += f"""
            <tr>
                <td>{result['id']}</td>
                <td>{result['task_type']}</td>
                <td>{question_preview}</td>
                <td>{result['predicted_choice'][:30]}</td>
                <td>{result['choice_list'][result['label']][:30]}</td>
                <td>{status}</td>
            </tr>
        """

    html_content += """
        </table>
        <p><em>Showing first 20 results. See JSON export for complete results.</em></p>
    </body>
    </html>
    """

    # Write to file
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print(f"✅ Comprehensive report saved to {output_file}")

In [25]:
# =====================================
# 10. INTERACTIVE USAGE EXAMPLES
# =====================================

def interactive_question_test():
    """Interactive function to test individual questions"""
    print("\n🎮 Interactive Question Testing")
    print("=" * 40)
    print("Enter a question and choices to test the RoBERTa model")
    print("Type 'quit' to exit")

    while True:
        print("\n" + "-" * 40)
        question = input("Enter question: ").strip()

        if question.lower() == 'quit':
            break

        if not question:
            continue

        print("Enter 4 choices (press Enter after each):")
        choices = []
        for i in range(4):
            choice = input(f"Choice {chr(65+i)}: ").strip()
            choices.append(choice)

        if len(choices) == 4 and all(choices):
            try:
                prediction = scorer.score_question(question, choices)
                predicted_choice = choices[prediction]

                print(f"\n🤖 Model Prediction:")
                print(f"   Predicted Answer: ({chr(65+prediction)}) {predicted_choice}")
                print(f"   Confidence Rank: {prediction + 1}/4")

            except Exception as e:
                print(f"❌ Error processing question: {e}")
        else:
            print("❌ Please provide all 4 choices")

def quick_start_demo():
    """Quick demonstration of key features"""
    print("\n🚀 RoBERTa BRAINTEASER Quick Start Demo")
    print("=" * 50)

    if not data_loaded:
        print("❌ Demo requires data to be loaded")
        return

    # Show model info
    roberta_model.get_model_info()

    # Show data stats
    stats = data_loader.get_data_statistics()
    print(f"\n📊 Loaded {stats['total_examples']} examples")

    # Test on a few examples
    print("\n🧪 Testing on sample questions...")
    sample_data = data_loader.all_data[:3]

    for i, example in enumerate(sample_data):
        print(f"\n📝 Example {i+1} ({example['task_type']}):")
        print(f"Q: {example['question']}")

        try:
            prediction = scorer.score_question(example['question'], example['choice_list'])
            predicted_choice = example['choice_list'][prediction]
            correct_choice = example['choice_list'][example['label']]
            is_correct = prediction == example['label']

            print(f"Model: ({chr(65+prediction)}) {predicted_choice}")
            print(f"Actual: ({chr(65+example['label'])}) {correct_choice}")
            print(f"Result: {'✅ Correct' if is_correct else '❌ Wrong'}")

        except Exception as e:
            print(f"❌ Error: {e}")

In [26]:
# =====================================
# 11. FINAL EXECUTION AND SUMMARY
# =====================================

print("\n" + "="*60)
print("🎯 ROBERTA BRAINTEASER IMPLEMENTATION READY")
print("="*60)

if data_loaded:
    print(f"✅ Data loaded: {len(data_loader.all_data)} examples")
    print(f"✅ Model ready: {config.model_name}")
    print(f"✅ Device: {config.device}")

    print("\n🚀 Available Functions:")
    print("   run_roberta_evaluation() - Run full evaluation")
    print("   quick_start_demo() - Quick demonstration")
    print("   interactive_question_test() - Test custom questions")
    print("   analyze_prediction_patterns(evaluator) - Analyze patterns")
    print("   create_results_visualization(evaluator) - Create plots")
    print("   generate_comprehensive_report(evaluator) - Generate HTML report")

    print("\n💡 Quick Start:")
    print("   evaluator, metrics = run_roberta_evaluation()")

    evaluator, metrics = run_roberta_evaluation()
    analyze_prediction_patterns(evaluator)
    generate_comprehensive_report(evaluator)

else:
    print("❌ Data not loaded. Please:")
    print("   1. Ensure data files exist:")
    print(f"      - {config.sentence_data_path}")
    print(f"      - {config.wordplay_data_path}")
    print("   2. Update config.sentence_data_path and config.wordplay_data_path")
    print("   3. Run: data_loader.load_data()")

print("\n📚 Implementation Features:")
print("   ✅ Clean, organized code structure")
print("   ✅ Comprehensive error handling")
print("   ✅ Detailed evaluation metrics")
print("   ✅ Interactive analysis tools")
print("   ✅ Visualization capabilities")
print("   ✅ Export functionality")
print("   ✅ Baseline comparisons")

print("\n" + "="*60)


🎯 ROBERTA BRAINTEASER IMPLEMENTATION READY
✅ Data loaded: 903 examples
✅ Model ready: roberta-large
✅ Device: cuda:0

🚀 Available Functions:
   run_roberta_evaluation() - Run full evaluation
   quick_start_demo() - Quick demonstration
   interactive_question_test() - Test custom questions
   analyze_prediction_patterns(evaluator) - Analyze patterns
   create_results_visualization(evaluator) - Create plots
   generate_comprehensive_report(evaluator) - Generate HTML report

💡 Quick Start:
   evaluator, metrics = run_roberta_evaluation()

🚀 Starting RoBERTa BRAINTEASER Evaluation


Evaluating:   0%|          | 0/903 [00:00<?, ?it/s]


🎯 EVALUATION RESULTS
📊 Overall Performance:
  Instance Accuracy: 0.3422 (309/903)
  Sentence Puzzles: 0.4517 (229/507)
  Word Puzzles: 0.2020 (80/396)
  Group Accuracy: 0.0789 (15/190)

❌ Error Analysis:
  Total Errors: 594
  Sentence Puzzle Errors: 278
  Word Puzzle Errors: 316

🔍 Sample Errors:

  Error 1 (SP):
    Question: Mr. and Mrs. Mustard have six daughters and each daughter has one brother. But there are only 9 peop...
    Predicted: Some daughters get married and have their own family.
    Correct: Each daughter shares the same brother.

  Error 2 (SP):
    Question: A woman shoots publicly at people at a National Park. The park is full of people, but no one gets ki...
    Predicted: The woman wanted to cause chaos.
    Correct: The woman was a photographer.

  Error 3 (SP):
    Question: Tom is a clean freak but he never dries his hair after a shower. How is this possible?...
    Predicted: None of above.
    Correct: This man is bald.
✅ Results exported to roberta_results