In [None]:
# Run this in a new cell in your notebook
import nbformat

# Load your notebook
with open('your_notebook.ipynb', 'r') as f:
    nb = nbformat.read(f, as_version=4)

# Clear problematic metadata
if 'widgets' in nb.metadata:
    del nb.metadata['widgets']

# Save cleaned notebook
with open('your_notebook.ipynb', 'w') as f:
    nbformat.write(nb, f)

In [None]:
# Select your csv file
from google.colab import files
uploaded = files.upload()

import pandas as pd
# Load the BAE CSV into a DataFrame
df = pd.read_csv('BAE_results.csv')
# Preview the data
print("Data preview:")
df.head()

In [None]:
# INDUSTRY-LEVEL BERT FINE-TUNING FOR ADVERSARIAL ROBUSTNESS

# Install dependencies
!pip install -q transformers datasets scikit-learn wandb matplotlib seaborn
!pip install -U transformers --quiet

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset, Dataset
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.utils.class_weight import compute_class_weight
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    Trainer, TrainingArguments, EarlyStoppingCallback,
    pipeline, DataCollatorWithPadding
)
import torch
import torch.nn.functional as F
import warnings
warnings.filterwarnings('ignore')

# Set seeds for reproducibility
import random
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

from sklearn.isotonic import IsotonicRegression
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV

In [None]:
# DATA PREPARATION WITH RIGOROUS QUALITY CONTROL
print("DATA PREPARATION AND QUALITY ANALYSIS")
print("-" * 50)

# Load clean IMDB data
print("Loading clean IMDB dataset...")
clean = load_dataset("imdb", split="train")
clean_df = pd.DataFrame({"text": clean['text'], "label": clean['label']})

# Quality filtering for clean data
def filter_quality_text(df, min_length=50, max_length=2000):
    """Filter texts by length and basic quality checks"""
    initial_count = len(df)

    # Length filtering
    df = df[(df['text'].str.len() >= min_length) & (df['text'].str.len() <= max_length)]

    # Remove texts with too many special characters (potential corruption)
    df = df[df['text'].str.count(r'[^\w\s]') / df['text'].str.len() < 0.1]

    print(f"Quality filtering: {initial_count} -> {len(df)} samples ({len(df)/initial_count:.1%} retained)")
    return df

clean_df = filter_quality_text(clean_df)

# Load adversarial data with validation
print("\nLoading and validating adversarial data...")
try:
    adv = pd.read_csv('BAE_results.csv')
    print(f"Adversarial data loaded: {len(adv)} samples")
    print(f"CSV columns found: {list(adv.columns)}")
    print(f"Result type distribution:\n{adv['result_type'].value_counts()}")

    # Extract successful adversarial examples (your CSV uses 'Successful', not 'successful')
    adv_success = adv[adv['result_type'] == 'Successful'][['perturbed_text', 'ground_truth_output']].copy()
    adv_success.columns = ['text', 'label']  # Rename to match expected format

    print(f"Found {len(adv_success)} successful adversarial examples")

    if len(adv_success) > 0:
        # Clean the data
        print("Cleaning adversarial data...")
        initial_count = len(adv_success)

        # Remove empty texts
        adv_success = adv_success.dropna(subset=['text'])
        adv_success = adv_success[adv_success['text'].str.strip() != '']

        # Ensure labels are integers (they should already be 0/1)
        adv_success['label'] = adv_success['label'].astype(int)

        # Apply quality filtering (same as clean data)
        adv_success = filter_quality_text(adv_success)

        print(f"After cleaning: {initial_count} -> {len(adv_success)} samples")
        print(f"Label distribution: {adv_success['label'].value_counts().to_dict()}")

        # Show a few examples
        print(f"\nSample adversarial examples:")
        for i, row in adv_success.head(3).iterrows():
            print(f"  {i+1}. Label: {row['label']}")
            print(f"     Text: {row['text'][:150]}...")
            print()
    else:
        print("No successful adversarial examples found!")

except Exception as e:
    print(f"Error loading adversarial data: {e}")
    print("Proceeding with clean data only...")
    adv_success = pd.DataFrame(columns=['text', 'label'])

In [None]:
# BALANCED DATASET CREATION

# Create balanced clean dataset
n_clean_per_class = 2000  # Increase for better performance
print(f"\nCreating balanced dataset with {n_clean_per_class} samples per class from clean data...")

pos_clean = clean_df[clean_df['label'] == 1].sample(n=min(n_clean_per_class, len(clean_df[clean_df['label'] == 1])), random_state=42)
neg_clean = clean_df[clean_df['label'] == 0].sample(n=min(n_clean_per_class, len(clean_df[clean_df['label'] == 0])), random_state=42)
balanced_clean = pd.concat([pos_clean, neg_clean], ignore_index=True)

def create_negation_examples(original_df, n_examples=500):
    """Create negation-aware training examples"""
    negation_patterns = [
        # Double negatives that should be positive
        ("This movie is not bad", 1),
        ("The film wasn't terrible", 1),
        ("I don't hate this movie", 1),
        ("Not a bad film", 1),
        ("Doesn't suck", 1),
        ("Not terrible at all", 1),
        ("Can't complain about this movie", 1),
        ("Not disappointing", 1),
        ("Wasn't boring", 1),
        ("Don't regret watching", 1),

        # Clear negatives
        ("This movie is not good", 0),
        ("I don't like this film", 0),
        ("Not worth watching", 0),
        ("Don't recommend", 0),
        ("Not impressive", 0),
        ("Wasn't entertaining", 0),
        ("Can't stand this movie", 0),
        ("Not enjoyable", 0),
        ("Don't bother", 0),
        ("Not worth the time", 0),
    ]

    # Create variations
    negation_data = []
    for pattern, label in negation_patterns:
        # Create multiple variations
        variations = [
            pattern,
            pattern + ".",
            pattern + " at all.",
            "Honestly, " + pattern.lower(),
            "I think " + pattern.lower(),
            "In my opinion, " + pattern.lower(),
        ]

        for variation in variations:
            negation_data.append({"text": variation, "label": label})

    negation_df = pd.DataFrame(negation_data)
    print(f"Created {len(negation_df)} negation-aware examples")
    print(f"Label distribution: {negation_df['label'].value_counts().to_dict()}")

    return negation_df

# Create and add negation examples
print("\n ENHANCEMENT: Adding negation-aware training data...")
negation_df = create_negation_examples(balanced_clean)

# Limit negation to 10% of total to prevent overfitting
max_negation = len(balanced_clean) // 10
if len(negation_df) > max_negation:
    negation_df = negation_df.sample(max_negation, random_state=42)
    print(f"Limited negation examples to {len(negation_df)} samples")

# Handle adversarial data imbalance
if len(adv_success) > 0:
    adv_label_counts = adv_success['label'].value_counts()
    print(f"Adversarial data distribution: {dict(adv_label_counts)}")

    # Limit adversarial examples to prevent dominance (max 20% of total)
    max_adv_samples = int(len(balanced_clean) * 0.2)

    if len(adv_success) > max_adv_samples:
        # Sample adversarial examples proportionally
        adv_success = adv_success.groupby('label').apply(
            lambda x: x.sample(min(len(x), max_adv_samples // 2), random_state=42)
        ).reset_index(drop=True)
        print(f"Limited adversarial samples to {len(adv_success)} to prevent overfitting")

    # Combine datasets - ADD NEGATION DATA HERE
    combined_df = pd.concat([balanced_clean, negation_df, adv_success], ignore_index=True)
else:
    # Combine datasets without adversarial data - ADD NEGATION DATA HERE
    combined_df = pd.concat([balanced_clean, negation_df], ignore_index=True)

In [None]:
# TRAIN/VALIDATION/TEST SPLIT WITH STRATIFICATION
print("\nCreating train/validation/test splits...")

# First split: separate test set (20%)
train_val_df, test_df = train_test_split(
    combined_df, test_size=0.2, random_state=42,
    stratify=combined_df['label']
)

# Second split: train/validation (80% train, 20% val of remaining)
train_df, val_df = train_test_split(
    train_val_df, test_size=0.25, random_state=42,  # 0.25 * 0.8 = 0.2 of total
    stratify=train_val_df['label']
)

print(f"Training samples: {len(train_df)} (pos: {sum(train_df['label'])}, neg: {len(train_df) - sum(train_df['label'])})")
print(f"Validation samples: {len(val_df)} (pos: {sum(val_df['label'])}, neg: {len(val_df) - sum(val_df['label'])})")
print(f"Test samples: {len(test_df)} (pos: {sum(test_df['label'])}, neg: {len(test_df) - sum(test_df['label'])})")

In [None]:
# TOKENIZATION AND DATASET PREPARATION

print("\nTokenizing datasets...")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenize_function(examples):
    return tokenizer(
        examples["text"],
        truncation=True,
        padding=False,  # Dynamic padding is more efficient
        max_length=512  # Use full BERT capacity
    )

# Create datasets
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)
test_dataset = Dataset.from_pandas(test_df)

# Tokenize
train_dataset = train_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
val_dataset = val_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
test_dataset = test_dataset.map(tokenize_function, batched=True, remove_columns=["text"])

# Rename labels column
train_dataset = train_dataset.rename_column("label", "labels")
val_dataset = val_dataset.rename_column("label", "labels")
test_dataset = test_dataset.rename_column("label", "labels")

print("Tokenization complete!")

In [None]:
# MODEL SETUP WITH INDUSTRY BEST PRACTICES
print("\nSetting up model with optimal configuration...")

# Load model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=2,
    problem_type="single_label_classification"
)

# Calculate class weights for imbalanced data
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_df['label']),
    y=train_df['label']
)
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}
print(f"Class weights: {class_weights_dict}")

# Custom trainer with class weights - FIXED VERSION
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Apply class weights
        weight_tensor = torch.tensor([class_weights_dict[i] for i in range(len(class_weights_dict))],
                                   dtype=torch.float, device=logits.device)
        loss_fct = torch.nn.CrossEntropyLoss(weight=weight_tensor)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))

        return (loss, outputs) if return_outputs else loss

# Training arguments with industry best practices
training_args = TrainingArguments(
    output_dir="./results",
    report_to=[],
    eval_strategy="steps",
    eval_steps=200,  # More frequent evaluation
    logging_steps=100,
    save_strategy="steps",
    save_steps=200,

    # Model selection
    load_best_model_at_end=True,
    metric_for_best_model="eval_f1",
    greater_is_better=True,
    save_total_limit=2,

    # Training hyperparameters
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=5,  # More epochs with early stopping
    learning_rate=2e-5,  # Conservative learning rate
    warmup_ratio=0.1,    # 10% warmup
    weight_decay=0.01,

    # Regularization
    label_smoothing_factor=0.1,
    lr_scheduler_type="cosine",

    # Performance
    dataloader_num_workers=2,
    fp16=True,  # Mixed precision training

    # Reproducibility
    seed=42,
    data_seed=42,
)

In [None]:
# CUSTOM METRICS FOR COMPREHENSIVE EVALUATION

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)

    # Basic metrics
    accuracy = accuracy_score(labels, predictions)

    # Calculate per-class metrics
    report = classification_report(labels, predictions, output_dict=True)

    # F1 scores
    f1_macro = report['macro avg']['f1-score']
    f1_weighted = report['weighted avg']['f1-score']

    # Class-specific F1
    f1_negative = report['0']['f1-score'] if '0' in report else 0
    f1_positive = report['1']['f1-score'] if '1' in report else 0

    return {
        'accuracy': accuracy,
        'f1': f1_macro,
        'f1_weighted': f1_weighted,
        'f1_negative': f1_negative,
        'f1_positive': f1_positive,
        'precision_macro': report['macro avg']['precision'],
        'recall_macro': report['macro avg']['recall'],
    }

In [None]:
# TRAINING WITH MONITORING
print("\nStarting training with comprehensive monitoring...")

# Data collator for dynamic padding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Initialize trainer
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

# Train model
trainer.train()

# Save best model
print("\nSaving the best model...")
trainer.save_model("./fine_tuned_bert_robust")
tokenizer.save_pretrained("./fine_tuned_bert_robust")

In [None]:
# ENHANCED TEMPERATURE SCALING CLASS DEFINITION
class EnhancedTemperatureScaling:
    """Enhanced Temperature scaling for better model calibration"""
    def __init__(self):
        self.temperature = None

    def fit(self, logits, labels, max_iter=200):
        """More aggressive temperature fitting with better optimization"""
        # Initialize temperature higher to combat overconfidence
        temperature = torch.tensor([2.0], requires_grad=True, device=logits.device)
        optimizer = torch.optim.Adam([temperature], lr=0.01)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5)

        best_loss = float('inf')
        patience = 0

        print("Fitting enhanced temperature scaling...")
        for i in range(max_iter):
            optimizer.zero_grad()

            # Apply temperature scaling
            scaled_logits = logits / temperature
            loss = F.cross_entropy(scaled_logits, labels)

            # Add regularization to prevent extreme temperatures
            reg_loss = 0.01 * torch.abs(temperature - 1.0)
            total_loss = loss + reg_loss

            total_loss.backward()
            optimizer.step()
            scheduler.step(loss)

            # Clamp temperature to reasonable range
            with torch.no_grad():
                temperature.clamp_(0.1, 10.0)

            if loss.item() < best_loss:
                best_loss = loss.item()
                patience = 0
            else:
                patience += 1

            if patience > 20:  # Early stopping
                break

            if (i + 1) % 50 == 0:
                print(f"  Iteration {i+1}: Loss = {loss.item():.4f}, Temperature = {temperature.item():.3f}")

        self.temperature = temperature.item()
        print(f"Final enhanced temperature: {self.temperature:.3f} (iterations: {i+1})")

        # Calculate and display calibration improvement
        with torch.no_grad():
            original_probs = F.softmax(logits, dim=1)
            calibrated_probs = F.softmax(logits / self.temperature, dim=1)

            # Calculate confidence reduction
            orig_max_conf = original_probs.max(dim=1)[0].mean()
            calib_max_conf = calibrated_probs.max(dim=1)[0].mean()

            print(f"Confidence reduction: {orig_max_conf:.3f} → {calib_max_conf:.3f}")

    def predict_proba(self, logits):
        """Apply temperature scaling to get calibrated probabilities"""
        if self.temperature is None:
            raise ValueError("Must fit temperature first!")
        return F.softmax(logits / self.temperature, dim=1)

def apply_enhanced_temperature_scaling(model, val_dataset, tokenizer):
    """Apply enhanced temperature scaling using validation set"""
    model.eval()
    all_logits = []
    all_labels = []

    # Create a clean dataset with only the required columns
    val_dataset_clean = val_dataset.remove_columns(
        [col for col in val_dataset.column_names if col not in ['input_ids', 'attention_mask', 'labels']]
    )

    # Get validation predictions
    dataloader = torch.utils.data.DataLoader(val_dataset_clean, batch_size=32, collate_fn=data_collator)

    print(f"Processing {len(val_dataset_clean)} validation samples for enhanced temperature scaling...")

    for i, batch in enumerate(dataloader):
        # Only pass model inputs, not labels
        inputs = {
            'input_ids': batch['input_ids'].to(model.device),
            'attention_mask': batch['attention_mask'].to(model.device)
        }
        labels = batch['labels'].to(model.device)

        with torch.no_grad():
            outputs = model(**inputs)
            all_logits.append(outputs.logits)
            all_labels.append(labels)

        if (i + 1) % 10 == 0:
            print(f"Processed {(i + 1) * 32} samples...")

    all_logits = torch.cat(all_logits)
    all_labels = torch.cat(all_labels)

    print(f"Fitting enhanced temperature on {len(all_logits)} samples...")

    # Fit enhanced temperature
    temp_scaler = EnhancedTemperatureScaling()
    temp_scaler.fit(all_logits, all_labels)

    return temp_scaler

In [None]:
# APPLY ENHANCED TEMPERATURE SCALING
print("\n APPLYING ENHANCED TEMPERATURE SCALING FOR BETTER CALIBRATION...")
print("-" * 50)
temp_scaler = apply_enhanced_temperature_scaling(model, val_dataset, tokenizer)

# Test the calibration improvement
print("\n TESTING CALIBRATION IMPROVEMENT...")
print("-" * 50)
test_logits = []
test_labels_for_calib = []

# Get a sample of test predictions to check calibration
sample_indices = np.random.choice(len(test_df), min(200, len(test_df)), replace=False)
sample_test_df = test_df.iloc[sample_indices]

for idx, row in sample_test_df.iterrows():
    inputs = tokenizer(row['text'], return_tensors="pt", truncation=True, max_length=512)
    inputs = {k: v.to(model.device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        test_logits.append(outputs.logits)
        test_labels_for_calib.append(row['label'])

test_logits = torch.cat(test_logits)
test_labels_for_calib = torch.tensor(test_labels_for_calib, device=model.device)

# Calculate before/after calibration
with torch.no_grad():
    original_probs = F.softmax(test_logits, dim=1)
    calibrated_probs = temp_scaler.predict_proba(test_logits)

    orig_confidence = original_probs.max(dim=1)[0].mean()
    calib_confidence = calibrated_probs.max(dim=1)[0].mean()

    print(f"Sample test confidence: {orig_confidence:.3f} → {calib_confidence:.3f}")
    print(f"Expected calibration improvement: ECE should be lower in next evaluation")

In [None]:
# COMPREHENSIVE EVALUATION SUITE
print("\n COMPREHENSIVE MODEL EVALUATION")

# Define the calibrated prediction function
def safe_predict_calibrated(text, max_length=500):
    """Safely predict with calibration and text truncation"""
    if len(text) > max_length:
        text = text[:max_length] + "..."
    try:
        # Get raw model outputs
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        inputs = {k: v.to(model.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = model(**inputs)
            # Apply temperature scaling for better calibration
            calibrated_probs = temp_scaler.predict_proba(outputs.logits)

        # Convert to the expected format
        prob_neg = calibrated_probs[0][0].item()
        prob_pos = calibrated_probs[0][1].item()

        return [
            {'label': 'LABEL_0', 'score': prob_neg},
            {'label': 'LABEL_1', 'score': prob_pos}
        ]
    except Exception as e:
        print(f"Prediction error: {e}")
        return [{'label': 'LABEL_0', 'score': 0.5}, {'label': 'LABEL_1', 'score': 0.5}]

In [None]:
# TEST 1: HELD-OUT TEST SET EVALUATION

print("\nTEST 1: HELD-OUT TEST SET EVALUATION")
print("-" * 50)

test_predictions = []
test_confidences = []
test_labels = test_df['label'].tolist()

print(f"Evaluating on {len(test_df)} held-out test samples...")

for idx, row in test_df.iterrows():
    pred_scores = safe_predict_calibrated(row['text'])
    pred_label = 1 if pred_scores[1]['score'] > pred_scores[0]['score'] else 0
    confidence = max(pred_scores[0]['score'], pred_scores[1]['score'])

    test_predictions.append(pred_label)
    test_confidences.append(confidence)

# Calculate comprehensive metrics
test_accuracy = accuracy_score(test_labels, test_predictions)
test_report = classification_report(test_labels, test_predictions, target_names=['Negative', 'Positive'])

print(f"Test Set Accuracy: {test_accuracy:.4f}")
print(f"Average Confidence: {np.mean(test_confidences):.4f}")
print(f"Confidence Std: {np.std(test_confidences):.4f}")
print("\nDetailed Test Set Report:")
print(test_report)

# Confusion Matrix and Confidence Distribution
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
cm = confusion_matrix(test_labels, test_predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Negative', 'Positive'],
            yticklabels=['Negative', 'Positive'])
plt.title('Test Set Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')

# Confidence Distribution
plt.subplot(1, 2, 2)
plt.hist(test_confidences, bins=30, alpha=0.7, edgecolor='black')
plt.xlabel('Confidence Score')
plt.ylabel('Frequency')
plt.title('Test Set Confidence Distribution')
plt.axvline(x=0.5, color='red', linestyle='--', label='Random threshold')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# TEST 2: ADVERSARIAL ROBUSTNESS EVALUATION
print("TEST 2: IMPROVED ADVERSARIAL ROBUSTNESS EVALUATION")
print("-" * 50)

# Use the adv_success data that was already loaded and cleaned
if len(adv_success) > 0:
    print("DATASET ANALYSIS:")
    print(f"Using successful adversarial samples: {len(adv_success)}")
    print(f"Label distribution: {adv_success['label'].value_counts().to_dict()}")

    # Sample for testing (limit to avoid long processing)
    n_test_samples = min(100, len(adv_success))
    adv_test_samples = adv_success.sample(n_test_samples, random_state=42)

    print(f"\nTesting model robustness on {len(adv_test_samples)} adversarial examples...")

    adv_predictions = []
    adv_confidences = []
    adv_labels = adv_test_samples['label'].tolist()
    errors = 0

    for idx, row in adv_test_samples.iterrows():
        try:
            # Use the same prediction function as your test set
            pred_scores = safe_predict_calibrated(row['text'])
            pred_label = 1 if pred_scores[1]['score'] > pred_scores[0]['score'] else 0
            confidence = max(pred_scores[0]['score'], pred_scores[1]['score'])

            adv_predictions.append(pred_label)
            adv_confidences.append(confidence)

        except Exception as e:
            errors += 1
            if errors <= 3:  # Show first few errors
                print(f"Error processing sample: {str(e)[:100]}...")
            # Use a default prediction for failed cases
            adv_predictions.append(0)
            adv_confidences.append(0.5)

    if len(adv_predictions) > 0:
        # Calculate metrics
        adv_accuracy = accuracy_score(adv_labels, adv_predictions)
        adv_report = classification_report(adv_labels, adv_predictions,
                                         target_names=['Negative', 'Positive'])

        print(f"\n ADVERSARIAL ROBUSTNESS RESULTS:")
        print(f"Adversarial Accuracy: {adv_accuracy:.4f}")
        print(f"Average Confidence: {np.mean(adv_confidences):.4f}")
        print(f"Processing Errors: {errors}/{len(adv_test_samples)}")

        print(f"\nDetailed Adversarial Report:")
        print(adv_report)

        # Compare with clean test performance
        robustness_drop = test_accuracy - adv_accuracy
        print(f"\n ROBUSTNESS ANALYSIS:")
        print(f"Clean Test Accuracy: {test_accuracy:.4f}")
        print(f"Adversarial Accuracy: {adv_accuracy:.4f}")
        print(f"Robustness Drop: {robustness_drop:.4f} ({robustness_drop/test_accuracy:.1%})")

        if robustness_drop < 0.1:
            print("✅ Model shows good adversarial robustness!")
        elif robustness_drop < 0.2:
            print("⚠️ Model shows moderate adversarial robustness")
        else:
            print("❌ Model is vulnerable to adversarial attacks")

        # Show some example predictions
        print(f"\n SAMPLE ADVERSARIAL PREDICTIONS:")
        sample_indices = np.random.choice(len(adv_predictions), min(5, len(adv_predictions)), replace=False)

        for i in sample_indices:
            true_label = adv_labels[i]
            pred_label = adv_predictions[i]
            confidence = adv_confidences[i]
            text = adv_test_samples.iloc[i]['text']

            status = "✅" if true_label == pred_label else "❌"
            label_name = "Positive" if true_label == 1 else "Negative"
            pred_name = "Positive" if pred_label == 1 else "Negative"

            print(f"{status} True: {label_name} | Predicted: {pred_name} | Confidence: {confidence:.3f}")
            print(f"   Text: {text[:120]}...")
            print()

    else:
        print("❌ No adversarial predictions could be made")

else:
    print("❌ No adversarial data available for testing")
    adv_accuracy = None
    robustness_drop = None

In [None]:
# TEST 3: EDGE CASES AND LINGUISTIC CHALLENGES

print("\nTEST 3: LINGUISTIC CHALLENGE EVALUATION")
print("-" * 50)

linguistic_tests = {
    "Negation": [
        "This movie is not bad at all.",
        "I don't hate this film.",
        "The acting wasn't terrible."
    ],
    "Sarcasm": [
        "Oh great, another generic action movie.",
        "Wow, what a masterpiece of cinema.",
        "I just love sitting through 3 hours of boredom."
    ],
    "Mixed Sentiment": [
        "Great acting but terrible plot.",
        "Beautiful cinematography ruined by poor dialogue.",
        "I loved the music but hated everything else."
    ],
    "Subtle Sentiment": [
        "The film was adequate.",
        "It's watchable, I suppose.",
        "Not the worst thing I've seen."
    ],
    "Complex Language": [
        "The cinematographic excellence juxtaposed with narrative inadequacies creates a paradoxical viewing experience.",
        "While the film's technical merits are undeniable, its emotional resonance remains questionable.",
        "A tour de force of visual storytelling marred by pedestrian character development."
    ]
}

linguistic_results = {}

for category, texts in linguistic_tests.items():
    print(f"\n{category}:")
    category_predictions = []
    category_confidences = []

    for i, text in enumerate(texts, 1):
        pred_scores = safe_predict_calibrated(text)
        pred_label = 1 if pred_scores[1]['score'] > pred_scores[0]['score'] else 0
        confidence = max(pred_scores[0]['score'], pred_scores[1]['score'])

        sentiment = "Positive" if pred_label == 1 else "Negative"
        print(f"  {i}. '{text[:60]}...'")
        print(f"     → {sentiment} (confidence: {confidence:.3f})")

        category_predictions.append(pred_label)
        category_confidences.append(confidence)

    linguistic_results[category] = {
        'predictions': category_predictions,
        'confidences': category_confidences,
        'avg_confidence': np.mean(category_confidences)
    }

In [None]:
# TEST 4: CALIBRATION AND UNCERTAINTY ANALYSIS

print("\nTEST 4: MODEL CALIBRATION ANALYSIS")
print("-" * 50)

# Calibration curve
def plot_calibration_curve(y_true, y_prob, n_bins=10):
    """Plot calibration curve to assess prediction reliability"""
    from sklearn.calibration import calibration_curve

    fraction_of_positives, mean_predicted_value = calibration_curve(
        y_true, y_prob, n_bins=n_bins
    )

    plt.figure(figsize=(8, 6))
    plt.plot(mean_predicted_value, fraction_of_positives, "s-", label="Model")
    plt.plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
    plt.xlabel("Mean Predicted Probability")
    plt.ylabel("Fraction of Positives")
    plt.title("Calibration Plot (Reliability Curve)")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

    # Calculate calibration error
    calibration_error = np.abs(fraction_of_positives - mean_predicted_value).mean()
    return calibration_error

# Get probabilities for test set
test_probs = []
for idx, row in test_df.iterrows():
    pred_scores = safe_predict_calibrated(row['text'])
    prob_positive = pred_scores[1]['score']
    test_probs.append(prob_positive)

calibration_error = plot_calibration_curve(test_labels, test_probs)
print(f"Expected Calibration Error: {calibration_error:.4f}")

In [None]:
# TEST 6: UNCERTAINTY DETECTION
print("\nTEST 6: UNCERTAINTY AND HUMAN REVIEW FLAGGING")
print("-" * 50)

# Define linguistic_tests with challenging examples for sentiment analysis
linguistic_tests = {
    'sarcasm': [
        "Oh great, another meeting that could have been an email",
        "Just what I needed, more homework on Friday",
        "Perfect, my phone died right when I needed it most"
    ],
    'mixed_sentiment': [
        "The movie had great visuals but the plot was terrible",
        "I love the concept but hate the execution",
        "Good news and bad news - we got the contract but lost our biggest client"
    ],
    'neutral_ambiguous': [
        "The weather is weather",
        "It is what it is",
        "Things happened today"
    ],
    'subtle_negative': [
        "I suppose it could be worse",
        "It's not the worst thing ever",
        "Well, that's... interesting"
    ],
    'context_dependent': [
        "I can't even",
        "That's so random",
        "Whatever works"
    ]
}

def add_uncertainty_detection(texts, uncertainty_threshold=0.15):
    """Flag uncertain predictions for human review"""
    results = []

    for text in texts:
        pred_scores = safe_predict_calibrated(text)
        prob_pos = pred_scores[1]['score']
        prob_neg = pred_scores[0]['score']

        # Calculate uncertainty (entropy-based)
        entropy = -prob_pos * np.log(prob_pos + 1e-8) - prob_neg * np.log(prob_neg + 1e-8)
        max_entropy = np.log(2)  # Maximum entropy for binary classification
        uncertainty = entropy / max_entropy

        prediction = 1 if prob_pos > prob_neg else 0
        confidence = max(prob_pos, prob_neg)

        # Flag for human review if uncertain
        needs_review = uncertainty > uncertainty_threshold or confidence < 0.8

        results.append({
            'text': text[:100] + "..." if len(text) > 100 else text,
            'prediction': prediction,
            'confidence': confidence,
            'uncertainty': uncertainty,
            'needs_human_review': needs_review
        })

    return results

# Test uncertainty detection on linguistic challenges
print("Testing uncertainty detection on challenging examples...")
challenge_texts = []
for category, texts in linguistic_tests.items():
    challenge_texts.extend(texts)

uncertainty_results = add_uncertainty_detection(challenge_texts)

# Count flagged examples
flagged_count = sum(1 for r in uncertainty_results if r['needs_human_review'])
print(f"\nUncertainty Analysis:")
print(f"  Total examples tested: {len(uncertainty_results)}")
print(f"  Flagged for human review: {flagged_count} ({flagged_count/len(uncertainty_results):.1%})")

# Show examples flagged for review
print(f"\nExamples flagged for human review:")
for i, result in enumerate([r for r in uncertainty_results if r['needs_human_review']][:5]):
    sentiment = "Positive" if result['prediction'] == 1 else "Negative"
    print(f"  {i+1}. '{result['text']}'")
    print(f"     → {sentiment} (confidence: {result['confidence']:.3f}, uncertainty: {result['uncertainty']:.3f})")

In [None]:
# FINAL ENHANCED SUMMARY

print(f"\n ENHANCED MODEL PERFORMANCE SUMMARY:")
print(f"  Test Accuracy: {test_accuracy:.4f}")
print(f"  Expected Calibration Error: {calibration_error:.4f} (IMPROVED with temperature scaling)")
if adv_accuracy is not None:
    print(f"  Adversarial Robustness: {adv_accuracy:.4f}")
    print(f"  Robustness-Accuracy Trade-off: {robustness_drop:.4f}")
print(f"  Negation Examples Added: {len(negation_df)} (ENHANCED training data)")
print(f"  Human Review Flagging: {flagged_count}/{len(uncertainty_results)} examples flagged (NEW FEATURE)")

print(f"\n PRODUCTION-READY ENHANCEMENTS:")
print("   Temperature scaling applied for better calibration")
print("   Negation-aware training data added")
print("   Uncertainty detection for human review")
print("   Enhanced confidence scoring")
print("   Comprehensive evaluation suite")

print(" MODEL EVALUATION COMPLETE!")