In [2]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, precision_recall_fscore_support, f1_score
import torch.nn.functional as F
import re
from typing import Dict, List, Tuple, Optional

In [3]:

# Load model and tokenizer
model_path = "../hate_speech_model"
tokenizer = AutoTokenizer.from_pretrained(model_path, local_files_only=True)
model = AutoModelForSequenceClassification.from_pretrained(model_path, local_files_only=True)
model.eval()


RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(64001, 768, padding_idx=1)
      (position_embeddings): Embedding(130, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
         

In [4]:

# Label mappings
label_map = {0: "hatespeech", 1: "offensive", 2: "normal"}
inv_label_map = {v: k for k, v in label_map.items()}


In [5]:

def clean_text(text):
    """Clean text by removing non-ASCII characters and normalizing whitespace"""
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def get_probabilities(texts: List[str], labels: List[int], model, tokenizer) -> Tuple[np.ndarray, List[int]]:
    """Get model probabilities for all texts, handling errors gracefully"""
    model.eval()
    all_probs = []
    new_labels = []
    skipped = 0
    
    for i, (text, label) in enumerate(zip(texts, labels)):
        try:
            if not isinstance(text, str):
                text = str(text)
            text = clean_text(text)
            
            # Skip empty texts
            if not text.strip():
                print(f"⚠️ Skipping empty text at index {i}")
                skipped += 1
                continue
            
            inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
            with torch.no_grad():
                logits = model(**inputs).logits
                probs = F.softmax(logits, dim=1).squeeze()
                all_probs.append(probs.cpu().numpy())
                new_labels.append(label)
                
        except Exception as e:
            print(f"⚠️ Skipping example {i}: {repr(text[:50])} -> {e}")
            skipped += 1
    
    print(f"✅ Processed {len(all_probs)} examples. Skipped {skipped} problematic examples.")
    return np.array(all_probs), new_labels

def optimize_confidence_threshold(probs: np.ndarray, true_labels: List[int], 
                                metric: str = "f1") -> float:
    """
    Optimize a single confidence threshold where predictions below threshold are marked as uncertain.
    This is useful when you want to identify low-confidence predictions.
    """
    thresholds = np.arange(0.1, 1.0, 0.01)
    best_score = 0
    best_threshold = 0.5
    
    for threshold in thresholds:
        predictions = []
        valid_true = []
        
        for i, prob in enumerate(probs):
            max_prob = np.max(prob)
            if max_prob >= threshold:
                predictions.append(np.argmax(prob))
                valid_true.append(true_labels[i])
        
        if len(predictions) > 0:
            if metric == "f1":
                score = f1_score(valid_true, predictions, average='weighted')
            elif metric == "accuracy":
                score = np.mean(np.array(valid_true) == np.array(predictions))
            
            if score > best_score:
                best_score = score
                best_threshold = threshold
    
    return best_threshold

def optimize_class_specific_thresholds(probs: np.ndarray, true_labels: List[int]) -> Dict[int, float]:
    """
    Optimize thresholds for each class to maximize per-class F1 scores.
    Uses a proper multi-class decision strategy.
    """
    best_thresholds = {}
    threshold_range = np.arange(0.1, 0.9, 0.02)
    
    for class_id in range(len(label_map)):
        best_f1 = 0
        best_threshold = 0.5
        
        for threshold in threshold_range:
            # Create binary predictions for this class
            binary_preds = (probs[:, class_id] >= threshold).astype(int)
            binary_true = (np.array(true_labels) == class_id).astype(int)
            
            # Calculate F1 for this class
            if np.sum(binary_preds) > 0:  # Avoid division by zero
                precision = np.sum(binary_preds & binary_true) / np.sum(binary_preds)
                recall = np.sum(binary_preds & binary_true) / np.sum(binary_true) if np.sum(binary_true) > 0 else 0
                f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
                
                if f1 > best_f1:
                    best_f1 = f1
                    best_threshold = threshold
        
        best_thresholds[class_id] = best_threshold
    
    return best_thresholds

def predict_with_confidence_threshold(probs: np.ndarray, threshold: float, 
                                    fallback: str = "argmax") -> List:
    """Make predictions using a confidence threshold"""
    predictions = []
    
    for prob in probs:
        max_prob = np.max(prob)
        if max_prob >= threshold:
            predictions.append(np.argmax(prob))
        else:
            if fallback == "argmax":
                predictions.append(np.argmax(prob))
            elif fallback == "uncertain":
                predictions.append(-1)  # Use -1 for uncertain
            else:
                predictions.append(fallback)
    
    return predictions

def predict_with_class_thresholds(probs: np.ndarray, thresholds: Dict[int, float]) -> List[int]:
    """
    Make predictions using class-specific thresholds.
    Strategy: For each sample, check which classes exceed their thresholds,
    then pick the one with highest probability among those.
    """
    predictions = []
    
    for prob in probs:
        # Find classes that exceed their thresholds
        confident_classes = []
        for class_id, threshold in thresholds.items():
            if prob[class_id] >= threshold:
                confident_classes.append((class_id, prob[class_id]))
        
        if confident_classes:
            # Pick the class with highest probability among confident classes
            confident_classes.sort(key=lambda x: x[1], reverse=True)
            predictions.append(confident_classes[0][0])
        else:
            # Fallback to argmax if no class is confident enough
            predictions.append(np.argmax(prob))
    
    return predictions


In [6]:

# Load and prepare data
print("Loading validation data...")
df_val = pd.read_csv("../data/val_data.csv")

texts = df_val["text"].tolist()
first_label = df_val["label"].iloc[0]
if isinstance(first_label, str):
    true_labels = [inv_label_map[label] for label in df_val["label"]]
else:
    true_labels = df_val["label"].tolist()

print(f"Loaded {len(texts)} examples")
print(f"Label distribution: {np.bincount(true_labels)}")

# Get probabilities
print("\nGetting model probabilities...")
val_probs, true_labels = get_probabilities(texts, true_labels, model, tokenizer)

# Baseline: Standard argmax predictions
baseline_preds = np.argmax(val_probs, axis=1)
print("\n" + "="*50)
print("BASELINE (Argmax) Results:")
print("="*50)
print(classification_report(true_labels, baseline_preds, target_names=list(label_map.values()), digits=3))

# Method 1: Confidence threshold optimization
print("\n" + "="*50)
print("METHOD 1: Confidence Threshold Optimization")
print("="*50)

confidence_threshold = optimize_confidence_threshold(val_probs, true_labels, metric="f1")
print(f"Optimal confidence threshold: {confidence_threshold:.3f}")

confidence_preds = predict_with_confidence_threshold(val_probs, confidence_threshold)
print("\nResults with confidence threshold:")
print(classification_report(true_labels, confidence_preds, target_names=list(label_map.values()), digits=3))

# Show uncertainty analysis
uncertain_preds = predict_with_confidence_threshold(val_probs, confidence_threshold, fallback="uncertain")
uncertain_count = sum(1 for p in uncertain_preds if p == -1)
print(f"Uncertain predictions: {uncertain_count} ({uncertain_count/len(uncertain_preds)*100:.1f}%)")

# Method 2: Class-specific threshold optimization
print("\n" + "="*50)
print("METHOD 2: Class-Specific Threshold Optimization")
print("="*50)

class_thresholds = optimize_class_specific_thresholds(val_probs, true_labels)
print("Optimal class-specific thresholds:")
for class_id, threshold in class_thresholds.items():
    print(f"  {label_map[class_id]}: {threshold:.3f}")

class_threshold_preds = predict_with_class_thresholds(val_probs, class_thresholds)
print("\nResults with class-specific thresholds:")
print(classification_report(true_labels, class_threshold_preds, target_names=list(label_map.values()), digits=3))

# Analysis of prediction changes
print("\n" + "="*50)
print("PREDICTION CHANGE ANALYSIS")
print("="*50)

baseline_vs_confidence = np.sum(baseline_preds != confidence_preds)
baseline_vs_class = np.sum(baseline_preds != class_threshold_preds)

print(f"Predictions changed from baseline:")
print(f"  Confidence threshold: {baseline_vs_confidence} ({baseline_vs_confidence/len(baseline_preds)*100:.1f}%)")
print(f"  Class thresholds: {baseline_vs_class} ({baseline_vs_class/len(baseline_preds)*100:.1f}%)")


Loading validation data...
Loaded 7971 examples
Label distribution: [1627 3870 2474]

Getting model probabilities...
⚠️ Skipping example 1808: 'this idyllic national socialist life could of been' -> index out of range in self
⚠️ Skipping example 4943: '#Pedogate #Pizzagate #FollowTheWhiteRabbit RIP Dem' -> index out of range in self
⚠️ Skipping example 5104: 'My bitch was so loaded last night &#128514;&#12851' -> index out of range in self
⚠️ Skipping example 7895: 'RT @winkSOSA: "@AintShitSweet__: "@Rakwon_OGOD: Ni' -> index out of range in self
✅ Processed 7967 examples. Skipped 4 problematic examples.

BASELINE (Argmax) Results:
              precision    recall  f1-score   support

  hatespeech      0.594     0.839     0.696      1625
   offensive      0.895     0.740     0.810      3868
      normal      0.812     0.812     0.812      2474

    accuracy                          0.783      7967
   macro avg      0.767     0.797     0.773      7967
weighted avg      0.808     0.783 