# Task 5: Large-Scale Performance Evaluation on Random Sample

**Title:** "Statistical NER Performance Analysis Across 50 Random Documents"

## Objective
Create a batch evaluation pipeline to measure NER performance at scale:
- Random sampling of 50 files from 1250 available files (seed=42 for reproducibility)
- Batch processing with Task 2 NER pipeline
- Comprehensive evaluation using Task 3 metrics
- Statistical analysis: micro/macro averages, standard deviation, confidence intervals
- Performance analysis: best/worst files, error patterns, confusion matrix
- Visualization: box plots, histograms, error analysis charts
- Progress tracking with tqdm
- Error handling and logging
- CSV export for further analysis
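
To make the micro/macro distinction in the list above concrete, here is a minimal sketch on made-up per-file counts. The `files` data and the `prf` helper are illustrative assumptions, not part of the pipeline. Micro-averaging pools TP/FP/FN across files before computing F1; macro-averaging computes F1 per file and then takes the mean, so small or empty files weigh as much as large ones.

```python
from statistics import mean
from typing import Tuple

def prf(tp: int, fp: int, fn: int) -> Tuple[float, float, float]:
    """Precision, recall and F1 from raw counts (0.0 when undefined)."""
    p = tp / (tp + fp) if tp + fp else 0.0
    r = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * p * r / (p + r) if p + r else 0.0
    return p, r, f1

# Hypothetical per-file counts: (TP, FP, FN)
files = [(8, 2, 2), (1, 3, 3), (0, 0, 4)]

# Micro: sum the counts column-wise, then compute F1 once.
micro_f1 = prf(*map(sum, zip(*files)))[2]
# Macro: compute F1 per file, then average the scores.
macro_f1 = mean(prf(*counts)[2] for counts in files)
```

In this toy case the two averages differ noticeably because the third file contributes an F1 of zero to the macro mean while adding only a few false negatives to the pooled counts.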


In [None]:
# Import required libraries
import sys
import random
from pathlib import Path
import re
from typing import List, Tuple, Dict, Optional
from collections import defaultdict, Counter
import logging
import traceback
from datetime import datetime

import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

# Task 2 imports (NER pipeline)
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

# Task 3 imports (evaluation)
try:
    from seqeval.metrics import (
        classification_report,
        accuracy_score,
        precision_score,
        recall_score,
        f1_score
    )
except ImportError:
    print("⚠ seqeval not found. Installing...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "seqeval"])
    from seqeval.metrics import (
        classification_report,
        accuracy_score,
        precision_score,
        recall_score,
        f1_score
    )
    print("✓ seqeval installed successfully")

import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)

# Configuration
BASE_DIR = Path("cadec")
TEXT_DIR = BASE_DIR / "text"
ORIGINAL_DIR = BASE_DIR / "original"
SAMPLE_SIZE = 50

# Label types we're evaluating
LABEL_TYPES = ['ADR', 'Drug', 'Disease', 'Symptom']

# Model Configuration (Task 2)
MODEL_NAME = "HUMADEX/english_medical_ner"
FALLBACK_MODEL = "dslim/bert-base-NER"

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'task5_evaluation_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Verify directories exist
if not TEXT_DIR.exists():
    raise FileNotFoundError(f"Directory not found: {TEXT_DIR}")
if not ORIGINAL_DIR.exists():
    raise FileNotFoundError(f"Directory not found: {ORIGINAL_DIR}")

print("✓ Directories verified")
print(f"  - Text directory: {TEXT_DIR}")
print(f"  - Original directory: {ORIGINAL_DIR}")
print(f"  - Sample size: {SAMPLE_SIZE} files")


## 1. Load Task 2 and Task 3 Functions

Import or define necessary functions from Task 2 (NER pipeline) and Task 3 (evaluation).
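
Because the evaluation compares character offsets, the word tokenizer has to keep each token's position in the original text. The cell below does this with `str.split` plus `str.find`. As an illustrative alternative (an assumption for comparison, not the notebook's implementation), a regular expression can produce the same kind of `(word, start, end)` triples directly:

```python
import re
from typing import List, Tuple

def whitespace_tokens(text: str) -> List[Tuple[str, int, int]]:
    """Return (token, start, end) for every run of non-whitespace characters."""
    return [(m.group(), m.start(), m.end()) for m in re.finditer(r"\S+", text)]

sample = "Took Lipitor,  felt  muscle pain."
tokens = whitespace_tokens(sample)

# Every token can be recovered by slicing the original text with its offsets.
assert all(sample[start:end] == tok for tok, start, end in tokens)
```

Note that punctuation stays attached to the neighbouring word (`"Lipitor,"`), which is also how the whitespace-based tokenizer in this notebook behaves.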


In [None]:
# Task 2 Functions: NER Pipeline

def tokenize_text_word_by_word(text: str) -> List[Tuple[str, int, int]]:
    """Tokenize text word-by-word and preserve character positions."""
    tokens = []
    words = text.split()
    current_pos = 0
    for word in words:
        word_start = text.find(word, current_pos)
        if word_start == -1:
            word_start = current_pos
        word_end = word_start + len(word)
        tokens.append((word, word_start, word_end))
        next_pos = word_end
        while next_pos < len(text) and text[next_pos].isspace():
            next_pos += 1
        current_pos = next_pos
    return tokens

def map_model_labels_to_bio(model_label: str) -> str:
    """Map model output labels to our BIO format."""
    model_label = model_label.upper()
    if 'ADR' in model_label or 'ADVERSE' in model_label:
        prefix = 'B-' if not model_label.startswith(('B-', 'I-')) else model_label[:2]
        return f"{prefix}ADR"
    if any(term in model_label for term in ['DRUG', 'MEDICATION', 'MEDICINE', 'MED']):
        prefix = 'B-' if not model_label.startswith(('B-', 'I-')) else (model_label[:2])
        return f"{prefix}Drug"
    if any(term in model_label for term in ['DISEASE', 'CONDITION', 'ILLNESS', 'DISORDER']):
        prefix = 'B-' if not model_label.startswith(('B-', 'I-')) else (model_label[:2])
        return f"{prefix}Disease"
    if any(term in model_label for term in ['SYMPTOM', 'SIGN', 'MANIFESTATION']):
        prefix = 'B-' if not model_label.startswith(('B-', 'I-')) else (model_label[:2])
        return f"{prefix}Symptom"
    return 'O'

def generate_bio_tags(text: str, model_pipeline, tokenizer) -> List[Tuple[str, str]]:
    """Generate BIO tags for each token in the input text."""
    word_tokens = tokenize_text_word_by_word(text)
    try:
        model_predictions = model_pipeline(text)
    except Exception as e:
        logger.warning(f"Error in model prediction: {e}")
        return [(token, 'O') for token, _, _ in word_tokens]
    
    word_labels = ['O'] * len(word_tokens)
    
    if isinstance(model_predictions, list):
        for pred in model_predictions:
            if isinstance(pred, dict):
                entity_label = pred.get('entity_group', pred.get('label', 'O'))
                start_char = pred.get('start', 0)
                end_char = pred.get('end', start_char)
                mapped_label = map_model_labels_to_bio(entity_label)
                if mapped_label == 'O':
                    # Skip predictions outside our label set so they cannot overwrite earlier tags
                    continue
                entity_type = mapped_label.split('-', 1)[1]
                
                overlapping_indices = []
                for i, (word, word_start, word_end) in enumerate(word_tokens):
                    if word_start < end_char and word_end > start_char:
                        overlapping_indices.append(i)
                
                if overlapping_indices:
                    for idx, word_idx in enumerate(overlapping_indices):
                        if idx == 0:
                            word_labels[word_idx] = f"B-{entity_type}"
                        else:
                            prev_label = word_labels[word_idx - 1]
                            if prev_label.endswith(entity_type):
                                word_labels[word_idx] = f"I-{entity_type}"
                            else:
                                word_labels[word_idx] = f"B-{entity_type}"
    
    bio_tagged = [(word, label) for (word, _, _), label in zip(word_tokens, word_labels)]
    return bio_tagged

def parse_bio_tags_to_entities(bio_tagged: List[Tuple[str, str]], 
                                word_tokens: List[Tuple[str, int, int]]) -> List[Dict]:
    """Parse BIO-tagged output to extract entity spans."""
    entities = []
    current_entity = None
    
    for i, (token, bio_label) in enumerate(bio_tagged):
        word, start_char, end_char = word_tokens[i]
        
        if bio_label == 'O':
            if current_entity:
                entities.append(current_entity)
                current_entity = None
            continue
        
        if '-' in bio_label:
            label_type = bio_label.split('-', 1)[1]
            is_beginning = bio_label.startswith('B-')
        else:
            label_type = bio_label
            is_beginning = True
        
        if label_type not in LABEL_TYPES:
            if current_entity:
                entities.append(current_entity)
                current_entity = None
            continue
        
        if is_beginning:
            if current_entity:
                entities.append(current_entity)
            current_entity = {
                'label': label_type,
                'text': word,
                'start': start_char,
                'end': end_char,
            }
        else:
            if current_entity and current_entity['label'] == label_type:
                current_entity['text'] += ' ' + word
                current_entity['end'] = end_char
            else:
                if current_entity:
                    entities.append(current_entity)
                current_entity = {
                    'label': label_type,
                    'text': word,
                    'start': start_char,
                    'end': end_char,
                }
    
    if current_entity:
        entities.append(current_entity)
    
    return entities

def process_text_file(text_file_path: Path, model_pipeline, tokenizer) -> Tuple[List[Tuple[str, str]], List[Dict]]:
    """Complete pipeline: Process a text file through STEP A and STEP B."""
    try:
        with open(text_file_path, 'r', encoding='utf-8') as f:
            # Keep leading whitespace: the .ann offsets refer to the raw file contents
            text = f.read().rstrip()
    except OSError as e:
        raise OSError(f"Could not read file {text_file_path}: {e}") from e
    
    bio_tagged = generate_bio_tags(text, model_pipeline, tokenizer)
    word_tokens = tokenize_text_word_by_word(text)
    entities = parse_bio_tags_to_entities(bio_tagged, word_tokens)
    
    return bio_tagged, entities

print("✓ Task 2 functions loaded")


In [None]:
# Task 3 Functions: Evaluation

def load_ground_truth(ann_file_path: Path) -> List[Dict]:
    """Load and parse ground truth annotation file."""
    entities = []
    try:
        with open(ann_file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                match = re.match(r'^(T\d+)\t([^\t]+)\t(.+)$', line)
                if match:
                    tag = match.group(1)
                    label_and_ranges = match.group(2)
                    text = match.group(3)
                    parts = label_and_ranges.split(None, 1)
                    if len(parts) < 2:
                        continue
                    label_type = parts[0]
                    ranges_str = parts[1]
                    if label_type not in LABEL_TYPES:
                        continue
                    
                    ranges = []
                    if ';' in ranges_str:
                        range_pairs = ranges_str.split(';')
                        for rp in range_pairs:
                            rp = rp.strip()
                            if rp:
                                range_nums = rp.split()
                                if len(range_nums) >= 2:
                                    try:
                                        start = int(range_nums[0])
                                        end = int(range_nums[1])
                                        ranges.append((start, end))
                                    except ValueError:
                                        continue
                    else:
                        range_nums = ranges_str.split()
                        if len(range_nums) >= 2:
                            try:
                                start = int(range_nums[0])
                                end = int(range_nums[1])
                                ranges = [(start, end)]
                            except ValueError:
                                continue
                    
                    for start, end in ranges:
                        entities.append({
                            'label': label_type,
                            'text': text.strip(),
                            'start': start,
                            'end': end,
                            'tag': tag
                        })
    except Exception as e:
        logger.error(f"Error loading ground truth from {ann_file_path}: {e}")
        return []
    return entities

def entity_level_evaluation(ground_truth: List[Dict], predictions: List[Dict]) -> Dict:
    """Perform entity-level evaluation (exact boundary + label matching)."""
    # Match on (label, start, end) only. Comparing the surface text as well can fail
    # spuriously, e.g. for discontinuous annotations, where every fragment carries
    # the full annotated string.
    gt_set = {(e['label'], e['start'], e['end']) for e in ground_truth}
    pred_set = {(e['label'], e['start'], e['end']) for e in predictions}
    
    tp_all = gt_set.intersection(pred_set)
    fp_all = pred_set - gt_set
    fn_all = gt_set - pred_set
    
    results = {
        'tp': {},
        'fp': {},
        'fn': {},
        'precision': {},
        'recall': {},
        'f1': {}
    }
    
    for label_type in LABEL_TYPES:
        tp_type = {e for e in tp_all if e[0] == label_type}
        fp_type = {e for e in fp_all if e[0] == label_type}
        fn_type = {e for e in fn_all if e[0] == label_type}
        
        tp_count = len(tp_type)
        fp_count = len(fp_type)
        fn_count = len(fn_type)
        
        precision = tp_count / (tp_count + fp_count) if (tp_count + fp_count) > 0 else 0.0
        recall = tp_count / (tp_count + fn_count) if (tp_count + fn_count) > 0 else 0.0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        
        results['tp'][label_type] = tp_count
        results['fp'][label_type] = fp_count
        results['fn'][label_type] = fn_count
        results['precision'][label_type] = precision
        results['recall'][label_type] = recall
        results['f1'][label_type] = f1
    
    total_tp = len(tp_all)
    total_fp = len(fp_all)
    total_fn = len(fn_all)
    
    overall_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.0
    overall_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.0
    overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0.0
    
    results['tp']['OVERALL'] = total_tp
    results['fp']['OVERALL'] = total_fp
    results['fn']['OVERALL'] = total_fn
    results['precision']['OVERALL'] = overall_precision
    results['recall']['OVERALL'] = overall_recall
    results['f1']['OVERALL'] = overall_f1
    
    return results

print("✓ Task 3 functions loaded")


## 2. Random Sampling

Randomly sample 50 files from the available 1250 files using seed=42 for reproducibility.
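
The sample is only reproducible if both the seed and the order of the population are fixed, since glob order depends on the filesystem. The sketch below illustrates this with placeholder file names. The `draw_sample` helper and the use of a private `random.Random` instance are assumptions for illustration; the notebook itself seeds the global generator.

```python
import random

# Hypothetical file names standing in for the real corpus listing.
population = sorted(f"doc_{i:04d}.txt" for i in range(1250))

def draw_sample(names, k=50, seed=42):
    """Draw k names with a private RNG so other random calls cannot shift the result."""
    rng = random.Random(seed)
    return sorted(rng.sample(names, min(k, len(names))))

first = draw_sample(population)
second = draw_sample(population)
```

Calling `draw_sample` twice on the same sorted list returns the same 50 names, regardless of any other use of the `random` module in between.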


In [None]:
# Get all available text files
all_text_files = sorted(list(TEXT_DIR.glob("*.txt")))
total_files = len(all_text_files)

print(f"Total files available: {total_files}")
print(f"Sample size: {SAMPLE_SIZE} files")

# Random sampling with seed=42 for reproducibility
random.seed(42)
np.random.seed(42)
sampled_files = random.sample(all_text_files, min(SAMPLE_SIZE, total_files))

# Sort sampled files for consistent ordering
sampled_files = sorted(sampled_files)

print(f"\n✓ Randomly sampled {len(sampled_files)} files (seed=42)")
print(f"  First 5 files: {[f.name for f in sampled_files[:5]]}")
print(f"  Last 5 files: {[f.name for f in sampled_files[-5:]]}")

# Save sampled file list for reference
sampled_files_df = pd.DataFrame({
    'filename': [f.name for f in sampled_files],
    'full_path': [str(f) for f in sampled_files]
})
sampled_files_df.to_csv('task5_sampled_files.csv', index=False)
print(f"\n✓ Sampled file list saved to 'task5_sampled_files.csv'")


## 3. Load NER Model

Load the NER model and tokenizer from Task 2.


In [None]:
# Load model and tokenizer
print("Loading NER model and tokenizer...")

try:
    ner_pipeline = pipeline(
        "token-classification",
        model=MODEL_NAME,
        aggregation_strategy="simple",
        device=0 if torch.cuda.is_available() else -1
    )
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    print(f"✓ Successfully loaded model: {MODEL_NAME}")
except Exception as e:
    print(f"⚠ Could not load {MODEL_NAME}: {e}")
    print(f"  Falling back to {FALLBACK_MODEL}...")
    try:
        ner_pipeline = pipeline(
            "token-classification",
            model=FALLBACK_MODEL,
            aggregation_strategy="simple",
            device=0 if torch.cuda.is_available() else -1
        )
        tokenizer = AutoTokenizer.from_pretrained(FALLBACK_MODEL)
        print(f"✓ Successfully loaded fallback model: {FALLBACK_MODEL}")
    except Exception as e2:
        raise RuntimeError(f"Failed to load both models: {e2}")

print("✓ Model and tokenizer ready")


## 4. Batch Processing Pipeline

Process all sampled files with progress tracking, error handling, and logging.
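
The core idea is to isolate failures per file so that one unreadable document cannot abort the whole run. A minimal sketch of that pattern, without the model or the progress bar, might look as follows. `run_batch` and `toy_worker` are hypothetical names used only for this illustration.

```python
import logging
from typing import Callable, Dict, Iterable, List, Optional, Tuple

logger = logging.getLogger("batch_sketch")

def run_batch(items: Iterable[str],
              worker: Callable[[str], Optional[Dict]]) -> Tuple[List[Dict], List[str]]:
    """Apply worker to each item; collect results and the names of failures."""
    results, failed = [], []
    for item in items:
        try:
            outcome = worker(item)
        except Exception:
            # Log the traceback but keep going so one bad file cannot stop the run.
            logger.exception("Error processing %s", item)
            outcome = None
        if outcome is None:
            failed.append(item)
        else:
            results.append(outcome)
    return results, failed

def toy_worker(name: str) -> Optional[Dict]:
    """Hypothetical worker: fails on 'bad', skips 'missing', succeeds otherwise."""
    if name == "bad":
        raise ValueError("unreadable file")
    if name == "missing":
        return None
    return {"filename": name}

results, failed = run_batch(["a", "bad", "b", "missing"], toy_worker)
```

Both a raised exception and a `None` return count as failures here, which mirrors how the cell below treats a missing ground-truth file and a processing error alike.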


In [None]:
def process_single_file(text_file: Path, model_pipeline, tokenizer) -> Optional[Dict]:
    """
    Process a single file: load text, run NER, load ground truth, evaluate.
    
    Returns:
    --------
    Dict with evaluation results or None if processing failed
    """
    try:
        # Load text
        with open(text_file, 'r', encoding='utf-8') as f:
            text = f.read().strip()
        
        # Run NER prediction
        bio_tagged, pred_entities = process_text_file(text_file, model_pipeline, tokenizer)
        
        # Load ground truth
        gt_file = ORIGINAL_DIR / text_file.name.replace('.txt', '.ann')
        if not gt_file.exists():
            logger.warning(f"Ground truth not found for {text_file.name}")
            return None
        
        gt_entities = load_ground_truth(gt_file)
        
        # Evaluate
        eval_results = entity_level_evaluation(gt_entities, pred_entities)
        
        # Store file metadata
        result = {
            'filename': text_file.name,
            'text_length': len(text),
            'gt_entity_count': len(gt_entities),
            'pred_entity_count': len(pred_entities),
            'evaluation': eval_results
        }
        
        return result
        
    except Exception as e:
        logger.error(f"Error processing {text_file.name}: {e}")
        logger.error(traceback.format_exc())
        return None

# Batch process all sampled files
print(f"Starting batch processing of {len(sampled_files)} files...")
print("=" * 80)

all_results = []
failed_files = []

# Process with progress bar
for text_file in tqdm(sampled_files, desc="Processing files", unit="file"):
    result = process_single_file(text_file, ner_pipeline, tokenizer)
    if result is not None:
        all_results.append(result)
    else:
        failed_files.append(text_file.name)

print("\n" + "=" * 80)
print(f"✓ Batch processing complete")
print(f"  - Successfully processed: {len(all_results)} files")
print(f"  - Failed: {len(failed_files)} files")
if failed_files:
    print(f"  - Failed files: {failed_files}")

# Store results
processing_summary = {
    'total_files': len(sampled_files),
    'successful': len(all_results),
    'failed': len(failed_files),
    'failed_files': failed_files
}

print(f"\n✓ Processing summary stored in 'processing_summary'")


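## 5. Statistical Analysis

Compute per-file metrics, micro- and macro-averaged scores, standard deviations, and 95% confidence intervals.


In [None]: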
def calculate_statistics(all_results: List[Dict]) -> Dict:
    """
    Calculate comprehensive statistics across all files:
    - Per-file metrics
    - Micro-averaged metrics (overall TP/FP/FN aggregated)
    - Macro-averaged metrics (average of per-file scores)
    - Standard deviation
    - Confidence intervals
    """
    # Extract per-file metrics
    per_file_metrics = []
    
    for result in all_results:
        eval_res = result['evaluation']
        file_metrics = {
            'filename': result['filename'],
            'text_length': result['text_length'],
            'gt_entity_count': result['gt_entity_count'],
            'pred_entity_count': result['pred_entity_count'],
        }
        
        # Per-entity-type metrics
        for label_type in LABEL_TYPES + ['OVERALL']:
            file_metrics[f'{label_type}_precision'] = eval_res['precision'][label_type]
            file_metrics[f'{label_type}_recall'] = eval_res['recall'][label_type]
            file_metrics[f'{label_type}_f1'] = eval_res['f1'][label_type]
            file_metrics[f'{label_type}_tp'] = eval_res['tp'][label_type]
            file_metrics[f'{label_type}_fp'] = eval_res['fp'][label_type]
            file_metrics[f'{label_type}_fn'] = eval_res['fn'][label_type]
        
        per_file_metrics.append(file_metrics)
    
    per_file_df = pd.DataFrame(per_file_metrics)
    
    # Micro-averaged metrics (aggregate TP/FP/FN across all files)
    micro_aggregated = {
        'tp': Counter(),
        'fp': Counter(),
        'fn': Counter()
    }
    
    for result in all_results:
        eval_res = result['evaluation']
        for label_type in LABEL_TYPES + ['OVERALL']:
            micro_aggregated['tp'][label_type] += eval_res['tp'][label_type]
            micro_aggregated['fp'][label_type] += eval_res['fp'][label_type]
            micro_aggregated['fn'][label_type] += eval_res['fn'][label_type]
    
    micro_metrics = {}
    for label_type in LABEL_TYPES + ['OVERALL']:
        tp = micro_aggregated['tp'][label_type]
        fp = micro_aggregated['fp'][label_type]
        fn = micro_aggregated['fn'][label_type]
        
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        
        micro_metrics[label_type] = {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'tp': tp,
            'fp': fp,
            'fn': fn
        }
    
    # Macro-averaged metrics (average of per-file scores)
    macro_metrics = {}
    for label_type in LABEL_TYPES + ['OVERALL']:
        precision_col = f'{label_type}_precision'
        recall_col = f'{label_type}_recall'
        f1_col = f'{label_type}_f1'
        
        macro_metrics[label_type] = {
            'precision': {
                'mean': per_file_df[precision_col].mean(),
                'std': per_file_df[precision_col].std(),
                'median': per_file_df[precision_col].median(),
                'min': per_file_df[precision_col].min(),
                'max': per_file_df[precision_col].max()
            },
            'recall': {
                'mean': per_file_df[recall_col].mean(),
                'std': per_file_df[recall_col].std(),
                'median': per_file_df[recall_col].median(),
                'min': per_file_df[recall_col].min(),
                'max': per_file_df[recall_col].max()
            },
            'f1': {
                'mean': per_file_df[f1_col].mean(),
                'std': per_file_df[f1_col].std(),
                'median': per_file_df[f1_col].median(),
                'min': per_file_df[f1_col].min(),
                'max': per_file_df[f1_col].max()
            }
        }
    
    # Calculate confidence intervals (95% CI)
    n = len(all_results)
    if n > 1:
        t_critical = stats.t.ppf(0.975, df=n-1)  # 95% confidence, two-tailed
    else:
        t_critical = 1.96
    
    ci_metrics = {}
    for label_type in LABEL_TYPES + ['OVERALL']:
        f1_col = f'{label_type}_f1'
        f1_values = per_file_df[f1_col].values
        
        if len(f1_values) > 1 and f1_values.std() > 0:
            mean_f1 = f1_values.mean()
            # Sample standard deviation (ddof=1) to match the t-distribution with n-1 degrees of freedom
            std_f1 = f1_values.std(ddof=1)
            sem = std_f1 / np.sqrt(n)  # Standard error of the mean
            margin = t_critical * sem
            ci_lower = mean_f1 - margin
            ci_upper = mean_f1 + margin
            margin_val = margin
        else:
            mean_f1 = f1_values.mean() if len(f1_values) > 0 else 0.0
            ci_lower = mean_f1
            ci_upper = mean_f1
            margin_val = 0.0
        
        ci_metrics[label_type] = {
            'mean': mean_f1,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'margin': margin_val
        }
    
    return {
        'per_file_metrics': per_file_df,
        'micro_averaged': micro_metrics,
        'macro_averaged': macro_metrics,
        'confidence_intervals': ci_metrics,
        'sample_size': n
    }

# Calculate statistics
print("Calculating aggregate statistics...")
stats_results = calculate_statistics(all_results)

print("✓ Statistics calculated")
print(f"  - Sample size: {stats_results['sample_size']} files")
print(f"  - Micro-averaged F1 (OVERALL): {stats_results['micro_averaged']['OVERALL']['f1']:.4f}")
print(f"  - Macro-averaged F1 (OVERALL): {stats_results['macro_averaged']['OVERALL']['f1']['mean']:.4f}")
print(f"  - Standard deviation (OVERALL F1): {stats_results['macro_averaged']['OVERALL']['f1']['std']:.4f}")
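
For reference, the 95% confidence interval reported for the mean per-file F1 follows the usual t-based form. With $x_i$ the F1 score of file $i$ and $n$ the number of successfully processed files:

$$
\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad
s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2}, \qquad
\mathrm{CI}_{95\%} = \bar{x} \pm t_{0.975,\,n-1}\,\frac{s}{\sqrt{n}}
$$

For $n = 50$ the critical value is $t_{0.975,\,49} \approx 2.01$, so the margin is roughly $2.01 \cdot s / \sqrt{50} \approx 0.28\,s$. Per-file F1 scores are bounded in $[0, 1]$ and often contain many zeros, so this interval is best read as an approximation rather than an exact coverage guarantee.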


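## 6. Performance Analysis

Identify the best and worst files, summarize error patterns, and build a confusion matrix.


In [None]: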
def analyze_performance(all_results: List[Dict], stats_results: Dict) -> Dict:
    """
    Analyze performance patterns:
    - Best/worst performing files
    - Common error patterns
    - Confusion matrix across all predictions
    """
    per_file_df = stats_results['per_file_metrics']
    
    # Find best and worst files by overall F1
    best_files = per_file_df.nlargest(5, 'OVERALL_f1')[['filename', 'OVERALL_f1', 'OVERALL_precision', 'OVERALL_recall']]
    worst_files = per_file_df.nsmallest(5, 'OVERALL_f1')[['filename', 'OVERALL_f1', 'OVERALL_precision', 'OVERALL_recall']]
    
    # Analyze error patterns
    error_analysis = {
        'high_fp': per_file_df.nlargest(5, 'OVERALL_fp')[['filename', 'OVERALL_fp', 'OVERALL_tp', 'OVERALL_fn']],
        'high_fn': per_file_df.nlargest(5, 'OVERALL_fn')[['filename', 'OVERALL_fn', 'OVERALL_tp', 'OVERALL_fp']],
        'low_entity_count': per_file_df.nsmallest(5, 'gt_entity_count')[['filename', 'gt_entity_count', 'pred_entity_count']],
        'high_entity_count': per_file_df.nlargest(5, 'gt_entity_count')[['filename', 'gt_entity_count', 'pred_entity_count']],
    }
    
    # Generate confusion matrix across all files
    # Aggregate all TP, FP, FN for each entity type
    confusion_data = defaultdict(lambda: defaultdict(int))
    
    for result in all_results:
        eval_res = result['evaluation']
        # Count correct predictions (TP) and errors (FP, FN) by label
        for label_type in LABEL_TYPES:
            tp = eval_res['tp'][label_type]
            fp = eval_res['fp'][label_type]
            fn = eval_res['fn'][label_type]
            
            # TP contributes to correct label predictions
            confusion_data[label_type][label_type] += tp
            # FP and FN are errors - for simplicity, track as mismatches
            # In practice, we'd need actual predicted vs true labels for detailed confusion matrix
            if fp > 0 or fn > 0:
                # Track that there were errors (detailed confusion would require actual label mismatches)
                confusion_data[label_type]['ERRORS'] += fp + fn
    
    # Create confusion matrix DataFrame
    confusion_df = pd.DataFrame(confusion_data).T
    confusion_df = confusion_df.fillna(0).astype(int)
    
    # Detailed confusion: rows are ground-truth labels, columns are predicted labels.
    # Only per-label TP/FP/FN counts are available here, so every FP is recorded as
    # NONE -> label and every FN as label -> NONE; label-to-label confusions are not resolved.
    label_confusion = defaultdict(lambda: defaultdict(int))
    
    for result in all_results:
        filename = result['filename']
        gt_file = ORIGINAL_DIR / filename.replace('.txt', '.ann')
        
        try:
            # Load entities
            gt_entities = load_ground_truth(gt_file)
            
            # Reconstruct predictions from evaluation metrics
            # We'll use the evaluation TP/FP/FN to approximate confusion
            eval_res = result['evaluation']
            
            # For each entity type, count errors
            for label_type in LABEL_TYPES:
                tp = eval_res['tp'][label_type]
                fp = eval_res['fp'][label_type]
                fn = eval_res['fn'][label_type]
                
                # TP: correct predictions
                label_confusion[label_type][label_type] += tp
                
                # FP: predicted as label_type but not in GT (approximate as spread across other labels)
                # FN: should be label_type but not predicted (approximate)
                if fp > 0:
                    # FP - approximate distribution
                    label_confusion['NONE'][label_type] += fp
                if fn > 0:
                    # FN - approximate
                    label_confusion[label_type]['NONE'] += fn
            
        except Exception as e:
            logger.warning(f"Could not analyze confusion for {filename}: {e}")
            continue
    
    detailed_confusion_df = pd.DataFrame(label_confusion).T
    detailed_confusion_df = detailed_confusion_df.fillna(0).astype(int)
    
    return {
        'best_files': best_files,
        'worst_files': worst_files,
        'error_analysis': error_analysis,
        'confusion_matrix': confusion_df,
        'detailed_confusion_matrix': detailed_confusion_df
    }

# Perform performance analysis
print("Performing performance analysis...")
performance_analysis = analyze_performance(all_results, stats_results)

print("✓ Performance analysis complete")
print("\nTop 5 files by F1 score:")
print(performance_analysis['best_files'].to_string(index=False))
print("\nBottom 5 files by F1 score:")
print(performance_analysis['worst_files'].to_string(index=False))
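
The confusion matrix above is derived from TP/FP/FN counts only, so it cannot tell a wrong label on a correct span apart from a missed entity. If the predicted and gold entity lists are kept, a label-level confusion can be counted by matching entities on their exact span. The following is a minimal sketch under that assumption; `label_confusion` is a hypothetical helper, and it assumes at most one entity per span on each side.

```python
from collections import Counter
from typing import Dict, List, Tuple

Span = Tuple[int, int]

def label_confusion(gold: List[Dict], pred: List[Dict]) -> Counter:
    """Count (gold_label, predicted_label) pairs, matching entities by exact span."""
    gold_by_span: Dict[Span, str] = {(e["start"], e["end"]): e["label"] for e in gold}
    pred_by_span: Dict[Span, str] = {(e["start"], e["end"]): e["label"] for e in pred}
    counts: Counter = Counter()
    for span in gold_by_span.keys() | pred_by_span.keys():
        # A span missing on one side is paired with the placeholder label "NONE".
        counts[(gold_by_span.get(span, "NONE"), pred_by_span.get(span, "NONE"))] += 1
    return counts

gold = [
    {"label": "Drug", "start": 0, "end": 7},
    {"label": "ADR", "start": 15, "end": 26},
    {"label": "Symptom", "start": 30, "end": 38},
]
pred = [
    {"label": "Drug", "start": 0, "end": 7},
    {"label": "Symptom", "start": 15, "end": 26},   # right span, wrong label
    {"label": "Disease", "start": 40, "end": 48},   # spurious entity
]
confusion = label_confusion(gold, pred)
```

Here the `("ADR", "Symptom")` cell captures a genuine label confusion, while `("Symptom", "NONE")` and `("NONE", "Disease")` correspond to a false negative and a false positive respectively.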


## 7. Visualization

Create visualizations: box plots of F1 scores by entity type, histogram of overall F1 distribution, and error analysis charts.


In [None]:
# Set up plotting style
try:
    plt.style.use('seaborn-v0_8')
except OSError:
    try:
        plt.style.use('seaborn')
    except OSError:
        plt.style.use('default')
sns.set_palette("husl")

# Create figure with subplots
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Statistical NER Performance Analysis - Visualizations', fontsize=16, fontweight='bold')

per_file_df = stats_results['per_file_metrics']

# 1. Box plot of F1 scores by entity type
ax1 = axes[0, 0]
f1_columns = [f'{label}_f1' for label in LABEL_TYPES + ['OVERALL']]
f1_data = [per_file_df[col].values for col in f1_columns]
labels = LABEL_TYPES + ['OVERALL']

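bp = ax1.boxplot(f1_data, patch_artist=True)
ax1.set_xticklabels(labels)  # set tick labels separately; the boxplot `labels` keyword is deprecated in Matplotlib 3.9+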
colors = sns.color_palette("husl", len(labels))
for patch, color in zip(bp['boxes'], colors):
    patch.set_facecolor(color)
    patch.set_alpha(0.7)

ax1.set_ylabel('F1 Score', fontsize=12)
ax1.set_xlabel('Entity Type', fontsize=12)
ax1.set_title('Distribution of F1 Scores by Entity Type', fontsize=13, fontweight='bold')
ax1.grid(True, alpha=0.3)
ax1.set_ylim([0, 1.1])

# Add mean markers
for i, (col, label) in enumerate(zip(f1_columns, labels)):
    mean_val = per_file_df[col].mean()
    ax1.plot(i+1, mean_val, 'r*', markersize=15, label='Mean' if i == 0 else '')
ax1.legend(loc='upper right')  # show the 'Mean' marker in the legend

# 2. Histogram of overall F1 distribution
ax2 = axes[0, 1]
overall_f1 = per_file_df['OVERALL_f1'].values
ax2.hist(overall_f1, bins=20, edgecolor='black', alpha=0.7, color='skyblue')
ax2.axvline(overall_f1.mean(), color='red', linestyle='--', linewidth=2, label=f'Mean: {overall_f1.mean():.3f}')
ax2.axvline(np.median(overall_f1), color='green', linestyle='--', linewidth=2, label=f'Median: {np.median(overall_f1):.3f}')
ax2.set_xlabel('Overall F1 Score', fontsize=12)
ax2.set_ylabel('Frequency', fontsize=12)
ax2.set_title('Distribution of Overall F1 Scores Across Files', fontsize=13, fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)

# 3. Error analysis: FP vs FN scatter
ax3 = axes[1, 0]
ax3.scatter(per_file_df['OVERALL_fp'], per_file_df['OVERALL_fn'], 
            alpha=0.6, s=100, c=per_file_df['OVERALL_f1'], 
            cmap='RdYlGn', edgecolors='black', linewidths=0.5)
ax3.set_xlabel('False Positives (FP)', fontsize=12)
ax3.set_ylabel('False Negatives (FN)', fontsize=12)
ax3.set_title('Error Pattern Analysis: FP vs FN (colored by F1)', fontsize=13, fontweight='bold')
ax3.grid(True, alpha=0.3)
cbar = plt.colorbar(ax3.collections[0], ax=ax3)
cbar.set_label('F1 Score', fontsize=11)

# 4. Per-entity-type performance comparison
ax4 = axes[1, 1]
entity_metrics = []
for label_type in LABEL_TYPES + ['OVERALL']:
    entity_metrics.append({
        'Entity Type': label_type,
        'Micro-Avg Precision': stats_results['micro_averaged'][label_type]['precision'],
        'Micro-Avg Recall': stats_results['micro_averaged'][label_type]['recall'],
        'Micro-Avg F1': stats_results['micro_averaged'][label_type]['f1'],
        'Macro-Avg F1': stats_results['macro_averaged'][label_type]['f1']['mean']
    })

metrics_df = pd.DataFrame(entity_metrics)
x = np.arange(len(metrics_df))
width = 0.35

ax4.bar(x - width/2, metrics_df['Micro-Avg F1'], width, label='Micro-Avg F1', alpha=0.8)
ax4.bar(x + width/2, metrics_df['Macro-Avg F1'], width, label='Macro-Avg F1', alpha=0.8)

ax4.set_ylabel('F1 Score', fontsize=12)
ax4.set_xlabel('Entity Type', fontsize=12)
ax4.set_title('Micro vs Macro-Averaged F1 Scores by Entity Type', fontsize=13, fontweight='bold')
ax4.set_xticks(x)
ax4.set_xticklabels(metrics_df['Entity Type'])
ax4.legend()
ax4.grid(True, alpha=0.3, axis='y')
ax4.set_ylim([0, 1.1])

plt.tight_layout()
plt.savefig('task5_performance_visualizations.png', dpi=300, bbox_inches='tight')
print("✓ Visualizations saved to 'task5_performance_visualizations.png'")
plt.show()


## 8. Detailed Results Summary

Display comprehensive results summary with all statistics.
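
The two averaging schemes reported in this summary can diverge when files differ in size or difficulty. The following is a minimal sketch of that difference, not the notebook's own implementation: `prf` and `toy_files` are hypothetical names, and the counts are invented for illustration only.

```python
from statistics import mean
from typing import Tuple


def prf(tp: int, fp: int, fn: int) -> Tuple[float, float, float]:
    """Precision, recall and F1 from raw counts; 0.0 when a denominator is zero."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1


# Hypothetical per-file counts (not results from this evaluation).
toy_files = [
    {"tp": 8, "fp": 2, "fn": 2},  # larger file, mostly correct
    {"tp": 1, "fp": 3, "fn": 1},  # smaller file, mostly wrong
]

# Micro-average: sum the counts over all files, then compute one score.
micro_f1 = prf(
    sum(f["tp"] for f in toy_files),
    sum(f["fp"] for f in toy_files),
    sum(f["fn"] for f in toy_files),
)[2]

# Macro-average: score each file separately, then take the mean.
macro_f1 = mean(prf(**f)[2] for f in toy_files)

print(f"micro F1 = {micro_f1:.4f}, macro F1 = {macro_f1:.4f}")
```

In this toy case the micro-average is higher because the larger file contributes more counts, while the macro-average weights both files equally.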


In [None]:
print("=" * 80)
print("COMPREHENSIVE EVALUATION RESULTS SUMMARY")
print("=" * 80)

# Micro-averaged metrics
print("\n1. MICRO-AVERAGED METRICS (Aggregated TP/FP/FN across all files):")
print("-" * 80)
micro_data = []
for label_type in LABEL_TYPES + ['OVERALL']:
    m = stats_results['micro_averaged'][label_type]
    micro_data.append({
        'Entity Type': label_type,
        'Precision': f"{m['precision']:.4f}",
        'Recall': f"{m['recall']:.4f}",
        'F1-Score': f"{m['f1']:.4f}",
        'TP': m['tp'],
        'FP': m['fp'],
        'FN': m['fn']
    })
micro_df = pd.DataFrame(micro_data)
print(micro_df.to_string(index=False))

# Macro-averaged metrics
print("\n2. MACRO-AVERAGED METRICS (Average of per-file scores):")
print("-" * 80)
macro_data = []
for label_type in LABEL_TYPES + ['OVERALL']:
    m = stats_results['macro_averaged'][label_type]
    macro_data.append({
        'Entity Type': label_type,
        'Precision': f"{m['precision']['mean']:.4f} ± {m['precision']['std']:.4f}",
        'Recall': f"{m['recall']['mean']:.4f} ± {m['recall']['std']:.4f}",
        'F1-Score': f"{m['f1']['mean']:.4f} ± {m['f1']['std']:.4f}",
        'F1 (Min)': f"{m['f1']['min']:.4f}",
        'F1 (Max)': f"{m['f1']['max']:.4f}",
        'F1 (Median)': f"{m['f1']['median']:.4f}"
    })
macro_df = pd.DataFrame(macro_data)
print(macro_df.to_string(index=False))

# Confidence intervals
print("\n3. CONFIDENCE INTERVALS (95% CI for F1 scores):")
print("-" * 80)
ci_data = []
for label_type in LABEL_TYPES + ['OVERALL']:
    ci = stats_results['confidence_intervals'][label_type]
    ci_data.append({
        'Entity Type': label_type,
        'Mean F1': f"{ci['mean']:.4f}",
        '95% CI Lower': f"{ci['ci_lower']:.4f}",
        '95% CI Upper': f"{ci['ci_upper']:.4f}",
        'Margin': f"± {ci['margin']:.4f}"
    })
ci_df = pd.DataFrame(ci_data)
print(ci_df.to_string(index=False))

# Performance summary
print("\n4. PERFORMANCE ANALYSIS:")
print("-" * 80)
print(f"Sample Size: {stats_results['sample_size']} files")
print(f"\nTop 5 Files by F1 Score:")
print(performance_analysis['best_files'].to_string(index=False))
print(f"\nBottom 5 Files by F1 Score:")
print(performance_analysis['worst_files'].to_string(index=False))

# Detailed confusion matrix
print("\n5. DETAILED CONFUSION MATRIX (Ground Truth vs Predicted Labels):")
print("-" * 80)
print(performance_analysis['detailed_confusion_matrix'].to_string())

print("\n" + "=" * 80)


## 9. Export Results to CSV

Export all results to CSV files for further analysis.
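
As an example of the kind of follow-up analysis the exported files allow, the sketch below reads a per-file metrics table and lists the lowest-scoring files. It is illustrative only: the in-memory `toy_csv` stands in for `task5_per_file_metrics.csv`, the `filename` column name is an assumption (only `OVERALL_f1` is confirmed by the notebook), and `lowest_f1` is a hypothetical helper.

```python
import csv
import io

# Toy stand-in for the exported per-file CSV; the values are made up.
toy_csv = io.StringIO(
    "filename,OVERALL_f1\n"
    "doc_a.txt,0.82\n"
    "doc_b.txt,0.35\n"
    "doc_c.txt,0.61\n"
)


def lowest_f1(handle, k=2):
    """Return the k rows with the lowest OVERALL_f1 as (filename, f1) pairs."""
    rows = list(csv.DictReader(handle))
    rows.sort(key=lambda r: float(r["OVERALL_f1"]))
    return [(r["filename"], float(r["OVERALL_f1"])) for r in rows[:k]]


worst = lowest_f1(toy_csv)
for name, score in worst:
    print(f"{name}: F1 = {score:.2f}")
```

To run this against the real export, replace `toy_csv` with `open('task5_per_file_metrics.csv', newline='')` and adjust the column names to match the file.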


In [None]:
# Export per-file metrics
stats_results['per_file_metrics'].to_csv('task5_per_file_metrics.csv', index=False)
print("✓ Per-file metrics exported to 'task5_per_file_metrics.csv'")

# Export micro-averaged metrics
micro_export = []
for label_type in LABEL_TYPES + ['OVERALL']:
    m = stats_results['micro_averaged'][label_type]
    micro_export.append({
        'Entity Type': label_type,
        'Precision': m['precision'],
        'Recall': m['recall'],
        'F1': m['f1'],
        'TP': m['tp'],
        'FP': m['fp'],
        'FN': m['fn']
    })
pd.DataFrame(micro_export).to_csv('task5_micro_averaged_metrics.csv', index=False)
print("✓ Micro-averaged metrics exported to 'task5_micro_averaged_metrics.csv'")

# Export macro-averaged metrics
macro_export = []
for label_type in LABEL_TYPES + ['OVERALL']:
    m = stats_results['macro_averaged'][label_type]
    macro_export.append({
        'Entity Type': label_type,
        'Precision_Mean': m['precision']['mean'],
        'Precision_Std': m['precision']['std'],
        'Precision_Median': m['precision']['median'],
        'Precision_Min': m['precision']['min'],
        'Precision_Max': m['precision']['max'],
        'Recall_Mean': m['recall']['mean'],
        'Recall_Std': m['recall']['std'],
        'Recall_Median': m['recall']['median'],
        'Recall_Min': m['recall']['min'],
        'Recall_Max': m['recall']['max'],
        'F1_Mean': m['f1']['mean'],
        'F1_Std': m['f1']['std'],
        'F1_Median': m['f1']['median'],
        'F1_Min': m['f1']['min'],
        'F1_Max': m['f1']['max']
    })
pd.DataFrame(macro_export).to_csv('task5_macro_averaged_metrics.csv', index=False)
print("✓ Macro-averaged metrics exported to 'task5_macro_averaged_metrics.csv'")

# Export confidence intervals
ci_export = []
for label_type in LABEL_TYPES + ['OVERALL']:
    ci = stats_results['confidence_intervals'][label_type]
    ci_export.append({
        'Entity Type': label_type,
        'Mean_F1': ci['mean'],
        'CI_Lower': ci['ci_lower'],
        'CI_Upper': ci['ci_upper'],
        'Margin': ci['margin']
    })
pd.DataFrame(ci_export).to_csv('task5_confidence_intervals.csv', index=False)
print("✓ Confidence intervals exported to 'task5_confidence_intervals.csv'")

# Export performance analysis
performance_analysis['best_files'].to_csv('task5_best_files.csv', index=False)
performance_analysis['worst_files'].to_csv('task5_worst_files.csv', index=False)
performance_analysis['error_analysis']['high_fp'].to_csv('task5_high_fp_files.csv', index=False)
performance_analysis['error_analysis']['high_fn'].to_csv('task5_high_fn_files.csv', index=False)
print("✓ Performance analysis exported to CSV files")

# Export confusion matrix
performance_analysis['detailed_confusion_matrix'].to_csv('task5_confusion_matrix.csv')
print("✓ Confusion matrix exported to 'task5_confusion_matrix.csv'")

print("\n" + "=" * 80)
print("✓ All results exported to CSV files")
print("=" * 80)


## 10. Scalability and Computational Efficiency Analysis

Comments on scalability and computational efficiency of the batch evaluation pipeline.
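
The extrapolation to the full corpus is only as good as the timing it is based on. A minimal sketch of how per-file wall-clock time could be recorded and scaled linearly is shown here; `time_per_item` and `extrapolate` are hypothetical helpers, the `str.upper` call is a placeholder for the real NER step, and linear scaling is itself an assumption that ignores model warm-up and variation in text length.

```python
import time


def time_per_item(items, process):
    """Return the wall-clock seconds spent in process(item) for each item."""
    durations = []
    for item in items:
        start = time.perf_counter()
        process(item)
        durations.append(time.perf_counter() - start)
    return durations


def extrapolate(durations, total_items):
    """Linear estimate of the total time needed for total_items items."""
    return sum(durations) / len(durations) * total_items


# Placeholder workload; in the notebook this would be the NER call per file.
toy_texts = ["a" * 100, "b" * 200, "c" * 300]
durations = time_per_item(toy_texts, lambda text: text.upper())
estimate = extrapolate(durations, total_items=1250)

print(f"mean per item: {sum(durations) / len(durations):.6f} s")
print(f"estimated total for 1250 items: {estimate:.4f} s")
```

Recording the durations inside the batch loop, rather than in a later cell, is what makes the estimate meaningful.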


In [None]:
# Batch processing ran in an earlier cell, so no wall-clock time is measured here;
# this cell reports descriptive statistics and qualitative scalability notes.
print("=" * 80)
print("SCALABILITY AND COMPUTATIONAL EFFICIENCY ANALYSIS")
print("=" * 80)

# File processing statistics
avg_text_length = stats_results['per_file_metrics']['text_length'].mean()
avg_gt_entities = stats_results['per_file_metrics']['gt_entity_count'].mean()
avg_pred_entities = stats_results['per_file_metrics']['pred_entity_count'].mean()

print(f"\n1. PROCESSING STATISTICS:")
print("-" * 80)
print(f"  - Total files processed: {len(all_results)}")
print(f"  - Average text length: {avg_text_length:.0f} characters")
print(f"  - Average ground truth entities per file: {avg_gt_entities:.2f}")
print(f"  - Average predicted entities per file: {avg_pred_entities:.2f}")

print(f"\n2. COMPUTATIONAL EFFICIENCY:")
print("-" * 80)
print(f"  - Model: {MODEL_NAME}")
print(f"  - Device: {'CUDA' if torch.cuda.is_available() else 'CPU'}")
print(f"  - Batch processing: Sequential (one file at a time)")
print(f"  - Error handling: Individual file failures don't stop pipeline")

print(f"\n3. SCALABILITY CONSIDERATIONS:")
print("-" * 80)
print(f"  - Current sample: {len(all_results)} files")
print(f"  - Total available: {total_files} files")
print(f"  - Scalability:")
print(f"    * To process all {total_files} files: ~{total_files / len(all_results):.1f}x current time")
print(f"    * Parallel processing: Can be improved by processing multiple files in parallel")
print(f"    * GPU utilization: Current setup uses {'GPU' if torch.cuda.is_available() else 'CPU'}")
print(f"    * Memory: Each file processed independently, low memory footprint")

print(f"\n4. OPTIMIZATION RECOMMENDATIONS:")
print("-" * 80)
print(f"  - Parallel Processing: Use multiprocessing or async to process multiple files concurrently")
print(f"  - Batch Inference: Group multiple texts for model inference (if model supports)")
print(f"  - Caching: Cache predictions to avoid re-running NER on same files")
print(f"  - Streaming: For very large datasets, implement streaming evaluation")
print(f"  - Distributed Processing: Use distributed computing for 1000+ files")

print(f"\n5. MEMORY USAGE:")
print("-" * 80)
print(f"  - Results storage: ~{len(all_results) * 500 / 1024:.2f} KB (rough estimate, assuming ~500 bytes per file record)")
print(f"  - Model memory: Varies by model size")
print(f"  - Peak memory: Single file processing minimizes peak usage")

print("\n" + "=" * 80)
print("✓ Analysis complete")
print("=" * 80)


## Summary

This notebook implements a comprehensive large-scale performance evaluation pipeline:

### ✅ Features Implemented:

1. **Random Sampling**: 50 files randomly sampled with seed=42 for reproducibility
2. **Batch Processing**: Complete pipeline with progress bar (tqdm), error handling, and logging
3. **Comprehensive Evaluation**: Per-file metrics using Task 3 evaluation framework
4. **Statistical Analysis**:
   - Micro-averaged metrics (aggregated TP/FP/FN)
   - Macro-averaged metrics (average of per-file scores)
   - Standard deviation for all metrics
   - 95% confidence intervals
5. **Performance Analysis**:
   - Best/worst performing files
   - Error pattern analysis (high FP/FN)
   - Detailed confusion matrix
6. **Visualization**: Box plots, histograms, error analysis charts
7. **CSV Export**: All results exported for further analysis
8. **Scalability Analysis**: Comments on computational efficiency
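
For the 95% confidence intervals in item 4, one common construction is the sample mean plus or minus a critical value times the standard error. The sketch below uses the normal approximation from the standard library. This is an assumption: the notebook may instead use Student's t via `scipy.stats`, which gives a slightly wider interval at n = 50. `mean_ci` and `toy_f1` are hypothetical; the returned keys mirror the ones the summary cell reads (`mean`, `ci_lower`, `ci_upper`, `margin`).

```python
from math import sqrt
from statistics import NormalDist, mean, stdev


def mean_ci(scores, confidence=0.95):
    """Normal-approximation confidence interval for the mean of scores."""
    n = len(scores)
    m = mean(scores)
    # Two-sided critical value, e.g. about 1.96 for 95% confidence.
    z = NormalDist().inv_cdf(0.5 + confidence / 2)
    # Standard error uses the sample standard deviation (n - 1 denominator).
    margin = z * stdev(scores) / sqrt(n)
    return {
        "mean": m,
        "ci_lower": m - margin,
        "ci_upper": m + margin,
        "margin": margin,
    }


# Invented per-file F1 scores, for illustration only.
toy_f1 = [0.6, 0.7, 0.8]
ci = mean_ci(toy_f1)
print(f"{ci['mean']:.4f} [{ci['ci_lower']:.4f}, {ci['ci_upper']:.4f}]")
```

Because F1 is bounded in [0, 1] and per-file scores are often skewed, an interval of this form is best read as approximate.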

### 📊 Key Results:

- Per-file F1, Precision, Recall for each entity type
- Overall micro-averaged metrics across all 50 files
- Overall macro-averaged metrics (average of per-file scores)
- Standard deviation and confidence intervals
- Distribution analysis of F1 scores

### 📝 Output Files:

- `task5_sampled_files.csv`: List of sampled files
- `task5_per_file_metrics.csv`: Detailed metrics for each file
- `task5_micro_averaged_metrics.csv`: Micro-averaged statistics
- `task5_macro_averaged_metrics.csv`: Macro-averaged statistics
- `task5_confidence_intervals.csv`: Confidence intervals
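- `task5_best_files.csv`: Top performing files
- `task5_worst_files.csv`: Worst performing files
- `task5_high_fp_files.csv`: Files flagged for high false-positive counts
- `task5_high_fn_files.csv`: Files flagged for high false-negative counts
- `task5_confusion_matrix.csv`: Confusion matrix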
- `task5_performance_visualizations.png`: All visualizations
- `task5_evaluation_*.log`: Processing log file

### 🔧 Usage Notes:

- Ensure Task 2 model is loaded before running batch processing
- Processing time depends on model and device (CPU/GPU)
- Individual file failures are logged but don't stop the pipeline
- Results are reproducible due to fixed random seed (42)
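
A fixed seed only guarantees the same sample if the candidate list is also in the same order on every run. The sketch below illustrates one way to make that explicit; it is not the notebook's sampling cell, and `sample_files` and `toy_names` are hypothetical. Sorting before sampling is an added assumption, motivated by the fact that directory listing order is not guaranteed to be stable across systems.

```python
import random


def sample_files(all_files, k, seed=42):
    """Draw k files reproducibly, independent of the input order."""
    rng = random.Random(seed)  # local generator; leaves global state untouched
    return rng.sample(sorted(all_files), k)


# Hypothetical file names standing in for the 1250 available documents.
toy_names = [f"doc_{i:04d}.txt" for i in range(1250)]

first = sample_files(toy_names, 50)
second = sample_files(list(reversed(toy_names)), 50)

print(first == second)  # same sample regardless of listing order
```

Using a local `random.Random` instance also keeps the sample unaffected by other code that draws from the global random state.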
