In [2]:
# Import all necessary libraries
import os
import sys
import json
import torch
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import warnings
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.calibration import calibration_curve
from transformers import RobertaTokenizer, RobertaForSequenceClassification, BertTokenizer
import torch.nn.functional as F

# Notebook-wide setup: silence library warnings, make the project sources
# importable, and choose the device every later cell runs on.
warnings.filterwarnings('ignore')
for _extra_path in ('../src', '../'):
    sys.path.append(_extra_path)

# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"✅ Libraries loaded | PyTorch: {torch.__version__} | CUDA: {torch.cuda.is_available()} | Device: {device}")


✅ Libraries loaded | PyTorch: 2.6.0+cu118 | CUDA: True | Device: cuda


In [3]:
# Import LCF-ATEPC model from local file
import torch.nn as nn
from transformers import BertModel, BertTokenizer

class LCF_ATEPC(nn.Module):
    """
    Local copy of the LCF-ATEPC model, defined in the notebook so saved weights
    can be loaded into it.

    ``forward`` only evaluates the sentiment (APC) branch. The aspect-extraction
    layers (``ate_layer_norm``, ``aspect_classifier``) and ``dropout`` are
    constructed but never called here -- presumably so the parameter names match
    the training-time checkpoint; confirm before removing any of them.

    Args:
        pretrained_model_name: Name or path handed to ``BertModel.from_pretrained``.
        hidden_size: Width of the layer norms, attention, fusion layer and
            classifier heads. The encoder output is fed straight into these
            layers, so it has to equal the BERT hidden size.
        num_aspect_labels: Output size of the aspect classifier head.
        num_sentiment_labels: Output size of the sentiment classifier head.
        context_window: Number of tokens taken on each side of an aspect span
            when building its local context.
        dropout_rate: Dropout probability used in the heads and the attention.
    """
    def __init__(self, pretrained_model_name='bert-base-uncased',
                 hidden_size=768, num_aspect_labels=2, num_sentiment_labels=2,
                 context_window=3, dropout_rate=0.15):
        super(LCF_ATEPC, self).__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_name)
        self.hidden_size = hidden_size
        self.context_window = context_window

        # Dropout and normalization
        self.dropout_rate = dropout_rate
        self.dropout = nn.Dropout(dropout_rate)

        # Layer normalization
        self.ate_layer_norm = nn.LayerNorm(hidden_size)
        self.apc_layer_norm = nn.LayerNorm(hidden_size)

        # Self-attention for aspects
        self.aspect_attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=8,
            dropout=dropout_rate,
            batch_first=True
        )

        # Fusion layer: [aspect ; context] -> hidden_size
        self.fusion_fc = nn.Linear(hidden_size * 2, hidden_size)
        self.fusion_act = nn.GELU()

        # Classifiers
        self.aspect_classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.GELU(),
            nn.LayerNorm(hidden_size // 2),
            nn.Linear(hidden_size // 2, num_aspect_labels)
        )

        self.sentiment_classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.GELU(),
            nn.LayerNorm(hidden_size // 2),
            nn.Linear(hidden_size // 2, num_sentiment_labels)
        )

    def _fuse_span(self, token_states, start, end):
        """Return the fused aspect/local-context vector for one span of one sequence.

        ``token_states`` is the (seq_len, hidden) encoder output of a single
        batch row; ``start``/``end`` are clamped into the sequence first.
        """
        seq_len = token_states.size(0)
        start = max(0, min(int(start), seq_len - 1))
        end = max(start, min(int(end), seq_len - 1))

        # Local context: the span plus `context_window` tokens on each side.
        left = max(0, start - self.context_window)
        right = min(seq_len, end + 1 + self.context_window)

        # NOTE(review): the clamp and the context window above treat `end` as an
        # inclusive index, but this slice excludes it, so a one-token span
        # (start == end) is empty and falls back to the first token. Left
        # unchanged because the saved weights may have been trained with this
        # behaviour -- confirm against the training code before changing it.
        aspect_tokens = token_states[start:end, :]
        if aspect_tokens.size(0) == 0:
            aspect_repr = token_states[0].unsqueeze(0)
        else:
            aspect_repr = aspect_tokens.mean(dim=0, keepdim=True)

        context = token_states[left:right, :]
        if context.size(0) > 1:
            # Self-attention over the local window; the padding mask is all
            # False, i.e. every token in the window is attended to.
            keep_all = torch.ones(context.size(0), device=context.device)
            attended, _ = self.aspect_attention(
                context.unsqueeze(0),
                context.unsqueeze(0),
                context.unsqueeze(0),
                key_padding_mask=(1 - keep_all.unsqueeze(0)).bool()
            )
            context = attended.squeeze(0)
        context_repr = context.mean(dim=0, keepdim=True)

        # Concatenate aspect and context, then project back to hidden_size.
        combined = torch.cat([aspect_repr, context_repr], dim=1)
        return self.fusion_act(self.fusion_fc(combined.view(-1)))

    def _pool_spans(self, token_states, spans):
        """Average the fused features of every well-formed span of one batch row.

        Falls back to the first token's state when ``spans`` is empty or holds
        no two-element list/tuple.
        """
        if not spans:
            return token_states[0]

        # Normalize to a list of spans: accept a bare object or a single pair.
        if not isinstance(spans, (list, tuple)):
            spans = [spans]
        elif len(spans) == 2 and all(isinstance(x, (int, float)) for x in spans):
            spans = [spans]

        span_feats = [
            self._fuse_span(token_states, span[0], span[1])
            for span in spans
            if isinstance(span, (list, tuple)) and len(span) == 2
        ]
        if not span_feats:
            return token_states[0]
        return torch.stack(span_feats).mean(dim=0)

    def forward(self, input_ids, attention_mask, token_type_ids=None, aspect_positions=None):
        """
        Compute sentiment logits for the given aspect spans.

        Args:
            input_ids, attention_mask, token_type_ids: Forwarded to the BERT encoder.
            aspect_positions: One entry per batch row. Each entry is a list of
                ``(start, end)`` token index pairs, a single pair, or an empty
                value. Rows without an entry, with an empty entry, or whose
                entry cannot be processed use the first token's state instead.

        Returns:
            Tensor of shape (batch, num_sentiment_labels), or ``None`` when
            ``aspect_positions`` is ``None``.
        """
        outputs = self.bert(input_ids, attention_mask=attention_mask,
                            token_type_ids=token_type_ids,
                            output_hidden_states=True)
        sequence_output = outputs.last_hidden_state

        # For inference we only need sentiment prediction
        if aspect_positions is None:
            return None

        batch_size = sequence_output.size(0)
        # aspect_positions may be shorter than the batch; extra rows use [CLS].
        rows_with_spans = min(batch_size, len(aspect_positions))

        apc_feats = []
        for b in range(batch_size):
            if b >= rows_with_spans:
                apc_feats.append(sequence_output[b, 0])
                continue
            try:
                apc_feats.append(self._pool_spans(sequence_output[b], aspect_positions[b]))
            except Exception as exc:
                # Best-effort fallback is kept, but no longer silent. Whether
                # the message is shown depends on the active warning filters.
                warnings.warn(
                    f"LCF_ATEPC: could not build aspect features for batch row {b} "
                    f"({exc!r}); falling back to the first-token state.",
                    RuntimeWarning
                )
                apc_feats.append(sequence_output[b, 0])

        # Stack all features and normalize
        apc_tensor = torch.stack(apc_feats, dim=0)
        apc_tensor = self.apc_layer_norm(apc_tensor)

        # Sentiment classifier
        return self.sentiment_classifier(apc_tensor)

# Confirmation message: reaching this line means the class definition in this cell executed.
print("✅ LCF-ATEPC model defined successfully!")


✅ LCF-ATEPC model defined successfully!


In [4]:
# Define class for loading all models
class ModelLoader:
    """
    Loader for the models stored under the models/ folder.

    Loaded models and their tokenizers are kept in ``self.models`` and
    ``self.tokenizers``, keyed by model name. Every model is moved to
    ``self.device`` (the notebook-level ``device``) and put in eval mode.
    """

    # RoBERTa variants that load_all_models() tries to load, in this order.
    ROBERTA_MODEL_NAMES = ('roberta_absa', 'roberta_absa_pseudo', 'roberta_absa_final')
    # Sub-module names dropped from a calibrated checkpoint because
    # RobertaForSequenceClassification has no such layers.
    ABSA_ONLY_LAYERS = ('aspect_attention', 'aspect_marker_detector')

    def __init__(self, models_dir='../models'):
        """Remember the models directory; nothing is read from disk yet."""
        self.models_dir = models_dir
        self.models = {}
        self.tokenizers = {}
        self.device = device

    def load_lcf_atepc(self):
        """Load the LCF-ATEPC model and its BERT tokenizer from ``<models_dir>/lcf_atepc``.

        Reads ``inference_config.json`` for the constructor arguments, the
        ``tokenizer`` sub-folder for the tokenizer and ``model.pt`` for the
        weights. Raises whatever the underlying file/model loading raises.
        """
        print("🔄 Loading LCF-ATEPC model...")
        model_path = os.path.join(self.models_dir, 'lcf_atepc')

        # Load configuration
        with open(os.path.join(model_path, "inference_config.json"), "r") as f:
            config = json.load(f)

        # Load tokenizer
        tokenizer_path = os.path.join(model_path, "tokenizer")
        tokenizer = BertTokenizer.from_pretrained(tokenizer_path)

        # Create model
        model = LCF_ATEPC(
            pretrained_model_name=config["pretrained_model_name"],
            num_aspect_labels=config["num_aspect_labels"],
            num_sentiment_labels=config["num_sentiment_labels"],
            context_window=config["context_window"]
        )

        # Load weights (strict: every parameter name has to match)
        model_weights_path = os.path.join(model_path, "model.pt")
        model.load_state_dict(torch.load(model_weights_path, map_location=self.device))
        model.to(self.device)
        model.eval()

        self.models['lcf_atepc'] = model
        self.tokenizers['lcf_atepc'] = tokenizer
        print("✅ LCF-ATEPC loaded successfully!")

    @staticmethod
    def _latest_checkpoint(dir_names):
        """Return the ``checkpoint-<step>`` name with the highest numeric step, or None.

        Names whose step is not an integer (e.g. ``checkpoint-best``) are skipped
        instead of aborting the search.
        """
        numbered = []
        for name in dir_names:
            if not name.startswith('checkpoint-'):
                continue
            try:
                numbered.append((int(name.split('-')[1]), name))
            except ValueError:
                continue
        if not numbered:
            return None
        return max(numbered, key=lambda pair: pair[0])[1]

    @classmethod
    def _clean_state_dict(cls, state_dict):
        """Make a wrapped ABSA checkpoint loadable by RobertaForSequenceClassification.

        Strips a leading ``model.`` from keys, drops keys that mention one of
        ``ABSA_ONLY_LAYERS`` after stripping, and drops the ``temperature`` entry.

        Returns:
            (cleaned_state_dict, prefix_removed) where ``prefix_removed`` tells
            whether at least one stripped key was kept.
        """
        cleaned = {}
        prefix_removed = False
        for key, value in state_dict.items():
            if key.startswith('model.'):
                new_key = key[len('model.'):]
                if not any(layer in new_key for layer in cls.ABSA_ONLY_LAYERS):
                    cleaned[new_key] = value
                    prefix_removed = True
            elif key == 'temperature':
                # Calibration temperature is not a model parameter.
                continue
            else:
                cleaned[key] = value
        return cleaned, prefix_removed

    def _load_calibrated_roberta(self, model_path):
        """Build a RobertaForSequenceClassification and fill it from ``calibrated_model.pt``.

        Raises:
            ValueError: if ``calibrated_model.pt`` does not exist in ``model_path``.
        """
        calibrated_model_path = os.path.join(model_path, 'calibrated_model.pt')
        if not os.path.exists(calibrated_model_path):
            raise ValueError(f"Calibrated model file not found: {calibrated_model_path}")

        # Load config first
        config_path = os.path.join(model_path, 'config.json')
        if not os.path.exists(config_path):
            # Use a default config from another roberta model
            config_path = os.path.join(self.models_dir, 'roberta_absa', 'config.json')

        from transformers import RobertaConfig
        config = RobertaConfig.from_pretrained(config_path)
        model = RobertaForSequenceClassification(config)

        checkpoint = torch.load(calibrated_model_path, map_location=self.device)

        # Handle different checkpoint formats
        if isinstance(checkpoint, dict):
            if 'model_state_dict' in checkpoint:
                state_dict = checkpoint['model_state_dict']
                print(f"   📦 Extracting model weights from 'model_state_dict' key")
            elif 'state_dict' in checkpoint:
                state_dict = checkpoint['state_dict']
                print(f"   📦 Extracting model weights from 'state_dict' key")
            else:
                # Assume the checkpoint itself is the state dict
                state_dict = checkpoint
                print(f"   📦 Using checkpoint directly as state_dict")
        else:
            state_dict = checkpoint

        # NOTE(review): the 'temperature' entry is discarded here and never
        # applied to the logits, so the loaded model is not temperature-scaled.
        cleaned_state_dict, prefix_removed = self._clean_state_dict(state_dict)
        if prefix_removed:
            print(f"   🔧 Removed 'model.' prefix from parameter names")
        dropped = len(state_dict) - len(cleaned_state_dict)
        if dropped:
            print(f"   🎯 Filtered out {dropped} incompatible parameters")

        # strict=False tolerates mismatches, so report them instead of hiding
        # a model that was only partially initialised from the checkpoint.
        load_result = model.load_state_dict(cleaned_state_dict, strict=False)
        if load_result.missing_keys:
            print(f"   ⚠️  {len(load_result.missing_keys)} model parameters not found in checkpoint "
                  f"(left at their initial values), e.g. {load_result.missing_keys[:3]}")
        if load_result.unexpected_keys:
            print(f"   ⚠️  {len(load_result.unexpected_keys)} checkpoint entries ignored, "
                  f"e.g. {load_result.unexpected_keys[:3]}")
        print(f"   ✅ Loaded calibrated weights from: calibrated_model.pt")
        return model

    def load_roberta_model(self, model_name):
        """Load a RoBERTa model and tokenizer from ``<models_dir>/<model_name>``.

        For ``roberta_absa_final`` the ``calibrated`` sub-folder is preferred
        (when it exists and contains ``tokenizer.json``); otherwise the
        highest-numbered ``checkpoint-*`` folder is used.

        Raises:
            ValueError: if ``roberta_absa_final`` has neither a usable
                calibrated folder nor a numbered checkpoint, or if the
                calibrated weights file is missing.
        """
        print(f"🔄 Loading {model_name} model...")
        base_model_path = os.path.join(self.models_dir, model_name)

        # Track the choice explicitly rather than searching the path for the
        # word 'calibrated', which also matches unrelated parent directories.
        use_calibrated = False
        model_path = base_model_path

        # Handle special case for roberta_absa_final
        if model_name == 'roberta_absa_final':
            # Try calibrated directory first
            calibrated_path = os.path.join(base_model_path, 'calibrated')
            if os.path.exists(calibrated_path) and os.path.exists(os.path.join(calibrated_path, 'tokenizer.json')):
                model_path = calibrated_path
                use_calibrated = True
                print(f"   📍 Using calibrated model from: {calibrated_path}")
            else:
                # Fallback to checkpoint directories
                latest_checkpoint = self._latest_checkpoint(os.listdir(base_model_path))
                if latest_checkpoint is None:
                    raise ValueError(f"No valid model path found for {model_name}")
                model_path = os.path.join(base_model_path, latest_checkpoint)
                print(f"   📍 Using checkpoint: {latest_checkpoint}")

        # Load tokenizer and model
        tokenizer = RobertaTokenizer.from_pretrained(model_path)
        if use_calibrated:
            model = self._load_calibrated_roberta(model_path)
        else:
            model = RobertaForSequenceClassification.from_pretrained(model_path)

        model.to(self.device)
        model.eval()

        self.models[model_name] = model
        self.tokenizers[model_name] = tokenizer
        print(f"✅ {model_name} loaded successfully!")

    def load_all_models(self):
        """Load LCF-ATEPC, then every RoBERTa variant.

        A failing RoBERTa variant is reported and skipped; a failing LCF-ATEPC
        load propagates its exception.
        """
        print("🚀 Starting to load all models...")

        # Load LCF-ATEPC
        self.load_lcf_atepc()

        # Load all RoBERTa models
        for model_name in self.ROBERTA_MODEL_NAMES:
            try:
                self.load_roberta_model(model_name)
            except Exception as e:
                print(f"❌ Failed to load {model_name}: {e}")
                print(f"   Skipping {model_name} and continuing with other models...")

        print("🎉 Model loading completed!")
        print(f"📊 Successfully loaded models: {len(self.models)}")
        expected_names = ('lcf_atepc',) + tuple(self.ROBERTA_MODEL_NAMES)
        if any(name not in self.models for name in expected_names):
            print(f"⚠️  Some models failed to load. Available models: {list(self.models.keys())}")

    def get_model_info(self):
        """Return ``{name: {'parameters', 'trainable_parameters', 'architecture'}}`` for loaded models."""
        info = {}
        for name, model in self.models.items():
            info[name] = {
                'parameters': sum(p.numel() for p in model.parameters()),
                'trainable_parameters': sum(p.numel() for p in model.parameters() if p.requires_grad),
                'architecture': type(model).__name__
            }
        return info

# Create model loader with its default models_dir ('../models').
# Constructing it only stores settings; models are read from disk by the load_* methods.
loader = ModelLoader()
print("🔧 Model loader created!")


🔧 Model loader created!


In [5]:
# Smoke-test: load every model on its own so one failure does not hide the others
print("🔍 Testing individual model loading...")

# LCF-ATEPC first
try:
    loader.load_lcf_atepc()
except Exception as e:
    print(f"❌ LCF-ATEPC test failed: {e}")
else:
    print("✅ LCF-ATEPC test passed")

# Then each RoBERTa variant used in this notebook
roberta_models = ['roberta_absa', 'roberta_absa_pseudo']
for model_name in roberta_models:
    print(f"\n🧪 Testing {model_name}...")
    try:
        loader.load_roberta_model(model_name)
    except Exception as e:
        print(f"❌ {model_name} test failed: {e}")
    else:
        print(f"✅ {model_name} test passed")

separator = "=" * 80
print("\n" + separator)

# Summary table of whatever actually loaded
print("\n📈 Information about loaded models:")
print(separator)

model_info = loader.get_model_info()
if not model_info:
    print("❌ No models were loaded successfully!")
for name, info in model_info.items():
    print(f"\n🎯 {name.upper()}:")
    print(f"   - Architecture: {info['architecture']}")
    print(f"   - Total parameters: {info['parameters']:,}")
    print(f"   - Trainable parameters: {info['trainable_parameters']:,}")
    # Size estimate assumes 4 bytes per parameter
    print(f"   - Model size: {info['parameters'] * 4 / (1024**2):.1f} MB")

print("\n" + separator)


🔍 Testing individual model loading...
🔄 Loading LCF-ATEPC model...
✅ LCF-ATEPC loaded successfully!
✅ LCF-ATEPC test passed

🧪 Testing roberta_absa...
🔄 Loading roberta_absa model...
✅ roberta_absa loaded successfully!
✅ roberta_absa test passed

🧪 Testing roberta_absa_pseudo...
🔄 Loading roberta_absa_pseudo model...
✅ roberta_absa_pseudo loaded successfully!
✅ roberta_absa_pseudo test passed


📈 Information about loaded models:

🎯 LCF_ATEPC:
   - Architecture: LCF_ATEPC
   - Total parameters: 113,621,764
   - Trainable parameters: 113,621,764
   - Model size: 433.4 MB

🎯 ROBERTA_ABSA:
   - Architecture: RobertaForSequenceClassification
   - Total parameters: 124,647,170
   - Trainable parameters: 124,647,170
   - Model size: 475.5 MB

🎯 ROBERTA_ABSA_PSEUDO:
   - Architecture: RobertaForSequenceClassification
   - Total parameters: 124,647,170
   - Trainable parameters: 124,647,170
   - Model size: 475.5 MB



In [6]:
# Hand-written evaluation fixture: 30 examples for comprehensive evaluation.
# Each entry is a dict with four string fields:
#   "text"               - the review sentence given to the models
#   "aspect"             - the aspect term whose sentiment is being judged
#   "expected_sentiment" - gold label, either "positive" or "negative"
#   "category"           - example group: "Contradictory", "Consistent" or "Subtle"
test_examples = [
    # Contradictory sentiments (15 examples)
    {"text": "This movie has stunning visual effects, but the plot is absolutely boring.", "aspect": "visual effects", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "The acting was terrible, but the cinematography is excellent.", "aspect": "acting", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "I didn't like the movie overall, but the soundtrack is magnificent.", "aspect": "soundtrack", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Great movie with excellent acting, but the dialogue is poorly written.", "aspect": "dialogue", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "Weak plot, but the special effects are mind-blowing.", "aspect": "special effects", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Bad movie overall, but the lead actor gave an outstanding performance.", "aspect": "lead actor", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Excellent film with great direction, but the music is annoying.", "aspect": "music", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "Poor storyline, but the costume design is absolutely brilliant.", "aspect": "costume design", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Amazing movie with superb acting, but the editing is choppy.", "aspect": "editing", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "Disappointing film overall, but the production design is stunning.", "aspect": "production design", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Great movie but the supporting cast was disappointing.", "aspect": "supporting cast", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "Boring film with excellent sound design.", "aspect": "sound design", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Fantastic movie but the script has major flaws.", "aspect": "script", "expected_sentiment": "negative", "category": "Contradictory"},
    {"text": "Poor film overall but the directing is masterful.", "aspect": "directing", "expected_sentiment": "positive", "category": "Contradictory"},
    {"text": "Excellent movie with terrible camera work.", "aspect": "camera work", "expected_sentiment": "negative", "category": "Contradictory"},
    
    # Consistent sentiments (10 examples)
    {"text": "Amazing movie! The actors perform brilliantly.", "aspect": "actors", "expected_sentiment": "positive", "category": "Consistent"},
    {"text": "Terrible movie with bad script and poor acting.", "aspect": "script", "expected_sentiment": "negative", "category": "Consistent"},
    {"text": "Magnificent film with stunning special effects.", "aspect": "special effects", "expected_sentiment": "positive", "category": "Consistent"},
    {"text": "Awful movie with horrible cinematography.", "aspect": "cinematography", "expected_sentiment": "negative", "category": "Consistent"},
    {"text": "Outstanding film with excellent direction.", "aspect": "direction", "expected_sentiment": "positive", "category": "Consistent"},
    {"text": "Poor movie with weak dialogue.", "aspect": "dialogue", "expected_sentiment": "negative", "category": "Consistent"},
    {"text": "Brilliant film with amazing soundtrack.", "aspect": "soundtrack", "expected_sentiment": "positive", "category": "Consistent"},
    {"text": "Disappointing movie with terrible editing.", "aspect": "editing", "expected_sentiment": "negative", "category": "Consistent"},
    {"text": "Fantastic movie with superb acting.", "aspect": "acting", "expected_sentiment": "positive", "category": "Consistent"},
    {"text": "Bad film with poor visual effects.", "aspect": "visual effects", "expected_sentiment": "negative", "category": "Consistent"},
    
    # Subtle sentiments (5 examples)
    {"text": "The movie is not bad, the music is quite pleasant.", "aspect": "music", "expected_sentiment": "positive", "category": "Subtle"},
    {"text": "The director tried but it didn't work out well.", "aspect": "director", "expected_sentiment": "negative", "category": "Subtle"},
    {"text": "The acting could have been better.", "aspect": "acting", "expected_sentiment": "negative", "category": "Subtle"},
    {"text": "The cinematography is reasonably good.", "aspect": "cinematography", "expected_sentiment": "positive", "category": "Subtle"},
    {"text": "The plot is somewhat lacking.", "aspect": "plot", "expected_sentiment": "negative", "category": "Subtle"}
]

# Report the fixture size and how many examples fall into each category.
print(f"✅ Created {len(test_examples)} test examples")
categories = {}
for example in test_examples:
    label = example['category']
    if label in categories:
        categories[label] += 1
    else:
        categories[label] = 1
print(f"📊 Categories: {dict(categories)}")


✅ Created 30 test examples
📊 Categories: {'Contradictory': 15, 'Consistent': 10, 'Subtle': 5}


In [7]:
# Class for performing inference with different models
class ModelPredictor:
    """
    Runs aspect-level sentiment predictions with the loaded models.

    ``models`` and ``tokenizers`` are dictionaries keyed by model name
    (``'lcf_atepc'``, ``'roberta_absa'``, ``'roberta_absa_pseudo'``); they are
    stored by reference, not copied. Every prediction is returned as a dict with
    the keys ``prediction`` (class index), ``confidence`` (highest softmax
    probability), ``probabilities`` (flat numpy array) and ``sentiment``
    (``'positive'`` for class 1, otherwise ``'negative'``).
    """
    def __init__(self, models, tokenizers, device):
        self.models = models
        self.tokenizers = tokenizers
        self.device = device

    @staticmethod
    def _format_prediction(logits):
        """Turn the logits of a single example into the result dict shared by all models."""
        probs = F.softmax(logits, dim=-1)
        prediction = torch.argmax(logits, dim=-1).item()
        return {
            'prediction': prediction,
            'confidence': probs.max().item(),
            'probabilities': probs.cpu().numpy().flatten(),
            'sentiment': 'positive' if prediction == 1 else 'negative'
        }

    def find_aspect_positions(self, text, aspect, tokenizer):
        """Find aspect position in tokenized text.

        Both strings are lower-cased and tokenized separately, then the aspect's
        token sequence is searched for in the text's tokens.

        Returns:
            A one-element list ``[(start, end)]`` of token indices, shifted by
            one to account for the leading [CLS] token; ``end`` is the index of
            the aspect's last token. When the tokens do not line up, a fixed
            placeholder is returned: ``(1, 3)`` if the aspect still occurs in
            the text as a substring, ``(1, 2)`` otherwise (also used for an
            aspect that tokenizes to nothing).
        """
        tokens = tokenizer.tokenize(text.lower())
        aspect_tokens = tokenizer.tokenize(aspect.lower())
        width = len(aspect_tokens)

        # An empty aspect would "match" at index 0 and yield the inverted
        # span (1, 0); there is nothing to locate, so use the fallback.
        if width == 0:
            return [(1, 2)]

        # Search for aspect token sequence
        for i in range(len(tokens) - width + 1):
            if tokens[i:i + width] == aspect_tokens:
                return [(i + 1, i + width)]  # +1 to account for [CLS] token

        # Aspect is present as a substring but not on token boundaries:
        # no real position is computed, only a fixed approximate span.
        if aspect.lower() in text.lower():
            return [(1, 3)]

        return [(1, 2)]  # Fallback positions

    def predict_lcf_atepc(self, text, aspect):
        """Prediction for LCF-ATEPC model.

        The first case-insensitive occurrence of the aspect in ``text`` is
        wrapped in ``[ASPECT]...[/ASPECT]`` before tokenization (padded or
        truncated to 128 tokens).
        """
        model = self.models['lcf_atepc']
        tokenizer = self.tokenizers['lcf_atepc']

        # Prepare input data
        marked_text = text
        aspect_lower = aspect.lower().strip()
        text_lower = text.lower()

        # Skip marking for an empty aspect: '' is "found" at index 0 and
        # would only insert an empty [ASPECT][/ASPECT] pair.
        if aspect_lower and aspect_lower in text_lower:
            start_idx = text_lower.find(aspect_lower)
            end_idx = start_idx + len(aspect_lower)
            marked_text = f"{text[:start_idx]}[ASPECT]{text[start_idx:end_idx]}[/ASPECT]{text[end_idx:]}"

        # Tokenization
        encoding = tokenizer(
            marked_text,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt"
        ).to(self.device)

        # NOTE(review): positions are located in the tokens of the unmarked
        # `text`, while the model encodes `marked_text`; the marker tokens
        # inserted before the aspect shift it to the right, so the span may not
        # sit on the aspect tokens. Left unchanged because the saved weights
        # may expect exactly this -- confirm against the training pipeline.
        aspect_positions = self.find_aspect_positions(text, aspect, tokenizer)

        # Perform prediction
        with torch.no_grad():
            logits = model(
                input_ids=encoding["input_ids"],
                attention_mask=encoding["attention_mask"],
                aspect_positions=[aspect_positions]
            )
            if logits is not None:
                return self._format_prediction(logits)

        # Fallback when the model produced no logits: neutral 50/50 result
        return {
            'prediction': 0,
            'confidence': 0.5,
            'probabilities': np.array([0.5, 0.5]),
            'sentiment': 'negative'
        }

    def predict_roberta(self, model_name, text, aspect):
        """Prediction for RoBERTa models.

        The input is the single string ``"<aspect> <sep_token> <text>"``,
        padded or truncated to 128 tokens.
        """
        model = self.models[model_name]
        tokenizer = self.tokenizers[model_name]

        # Prepare input text (aspect + context)
        input_text = f"{aspect} {tokenizer.sep_token} {text}"

        # Tokenization
        encoding = tokenizer(
            input_text,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt"
        ).to(self.device)

        # Perform prediction
        with torch.no_grad():
            outputs = model(**encoding)
            return self._format_prediction(outputs.logits)

    def predict_all(self, text, aspect):
        """Get predictions from all models.

        Returns a dict keyed by model name; models that are not loaded are
        simply absent from it.
        """
        results = {}

        # LCF-ATEPC
        if 'lcf_atepc' in self.models:
            results['lcf_atepc'] = self.predict_lcf_atepc(text, aspect)

        # RoBERTa models - only predict for available models
        roberta_models = ['roberta_absa', 'roberta_absa_pseudo']
        for model_name in roberta_models:
            if model_name in self.models:
                results[model_name] = self.predict_roberta(model_name, text, aspect)

        return results

# Create predictor. It receives the loader's own model/tokenizer dictionaries
# (no copy), so models loaded through `loader` afterwards are visible to it too.
predictor = ModelPredictor(loader.models, loader.tokenizers, device)
print("🎯 Model predictor created successfully!")


🎯 Model predictor created successfully!


In [8]:
# Evaluate every loaded model on every test example and keep one record per example
print("🚀 Running model evaluation on 30 examples...")

all_results = []
for i, example in enumerate(test_examples):
    predictions = predictor.predict_all(example['text'], example['aspect'])
    # Record layout: id, the example's own fields, then the per-model predictions.
    record = {'example_id': i}
    for field in ('text', 'aspect', 'expected_sentiment', 'category'):
        record[field] = example[field]
    record['predictions'] = predictions
    all_results.append(record)

print(f"✅ Evaluation completed! Processed {len(all_results)} examples")


🚀 Running model evaluation on 30 examples...
✅ Evaluation completed! Processed 30 examples


In [9]:
# Calculate metrics for analysis
available_models = list(loader.models.keys())
model_names = [name for name in ['lcf_atepc', 'roberta_absa', 'roberta_absa_pseudo'] if name in available_models]

model_display_names = {
    'lcf_atepc': 'LCF-ATEPC',
    'roberta_absa': 'RoBERTa Base', 
    'roberta_absa_pseudo': 'RoBERTa Pseudo'
}


def _sentiment_accuracy(results, model_name):
    """Share of `results` where `model_name` predicted the expected sentiment (0.0 if empty)."""
    if not results:
        return 0.0
    hits = sum(1 for r in results if r['predictions'][model_name]['sentiment'] == r['expected_sentiment'])
    return hits / len(results)


# Categories in first-seen order. A set would work too, but its iteration
# order for strings can change between kernel restarts, which reshuffles the
# category axis of the charts built from this list.
all_categories = list(dict.fromkeys(r['category'] for r in all_results))

# Group the results once instead of re-filtering them for every model.
results_by_category = {
    category: [r for r in all_results if r['category'] == category]
    for category in all_categories
}

# Calculate accuracy metrics: overall and per category, for each model
model_accuracy = {}
model_by_category = {}
for model_name in model_names:
    model_accuracy[model_name] = _sentiment_accuracy(all_results, model_name)
    model_by_category[model_name] = {
        category: _sentiment_accuracy(results_by_category[category], model_name)
        for category in all_categories
    }

print(f"📊 Analysis ready for {len(model_names)} models on {len(all_results)} examples")


📊 Analysis ready for 3 models on 30 examples


In [10]:
# Interactive visualizations with Plotly
from plotly.subplots import make_subplots

# One colour per model, shared by both charts.
colors = ['#2E86AB', '#A23B72', '#F18F01']
# Axis and size settings common to both accuracy charts.
accuracy_layout = dict(
    yaxis_title='Accuracy',
    yaxis=dict(range=[0, 1], tickformat='.0%'),
    height=400,
)

# 1. Overall Model Performance: one labelled bar per model
models = [model_display_names[name] for name in model_names]
accuracies = [model_accuracy[name] for name in model_names]

overall_bars = go.Bar(
    x=models,
    y=accuracies,
    text=[f'{acc:.1%}' for acc in accuracies],
    textposition='auto',
    marker_color=colors[:len(models)],
)
fig_overall = go.Figure()
fig_overall.add_trace(overall_bars)
fig_overall.update_layout(
    title='ABSA Model Performance Comparison',
    showlegend=False,
    **accuracy_layout,
)
fig_overall.show()

# 2. Performance by Category: grouped bars, one group per category
fig_cat = go.Figure()
for i, model_name in enumerate(model_names):
    accuracies_by_cat = [model_by_category[model_name][cat] for cat in all_categories]
    category_bars = go.Bar(
        name=model_display_names[model_name],
        x=all_categories,
        y=accuracies_by_cat,
        marker_color=colors[i],
    )
    fig_cat.add_trace(category_bars)

fig_cat.update_layout(
    title='Model Performance by Category',
    barmode='group',
    **accuracy_layout,
)
fig_cat.show()


In [11]:
# Confidence and overconfidence analysis for every evaluated model
CHANCE_CONFIDENCE = 0.5         # baseline subtracted from wrong-answer confidence (two-class coin flip)
OVERCONFIDENCE_THRESHOLD = 0.2  # margin above the baseline at which a model is flagged
N_CALIBRATION_BINS = 10         # equal-width confidence bins on [0, 1]

# One row per (example, model): the model's confidence and whether it was right.
confidence_data = []
for result in all_results:
    for model_name in model_names:
        pred = result['predictions'][model_name]
        confidence_data.append({
            'model': model_display_names[model_name],
            'confidence': pred['confidence'],
            'correct': pred['sentiment'] == result['expected_sentiment'],
            'category': result['category']
        })

df_confidence = pd.DataFrame(confidence_data)

# Per-model slices, computed once and reused by all four panels.
display_names = [model_display_names[name] for name in model_names]
rows_by_model = {name: df_confidence[df_confidence['model'] == name] for name in display_names}

# 3. Confidence Distribution Analysis (2x2 grid of panels)
fig_conf = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Confidence Distribution', 'Overconfidence Analysis', 'Contradictory Performance', 'Calibration Curves'),
    specs=[[{'secondary_y': False}, {'secondary_y': False}],
           [{'secondary_y': False}, {'secondary_y': False}]]
)

# Panel (1,1): confidence distributions
for name in display_names:
    fig_conf.add_trace(
        go.Histogram(x=rows_by_model[name]['confidence'], name=name,
                     opacity=0.7, nbinsx=20),
        row=1, col=1
    )

# Panel (1,2): mean confidence on wrong vs. correct answers.
# The means are NaN when a model has no rows in the respective group.
overconf_data = []
for name in display_names:
    model_data = rows_by_model[name]
    correct_conf = model_data.loc[model_data['correct'], 'confidence'].mean()
    incorrect_conf = model_data.loc[~model_data['correct'], 'confidence'].mean()
    overconf_data.append({
        'model': name,
        'correct_confidence': correct_conf,
        'incorrect_confidence': incorrect_conf,
        # How far above the chance baseline the model's confidence is when it is wrong
        'overconfidence': incorrect_conf - CHANCE_CONFIDENCE
    })

overconf_df = pd.DataFrame(overconf_data)
fig_conf.add_trace(
    go.Bar(x=overconf_df['model'], y=overconf_df['incorrect_confidence'],
           name='Wrong Answer Confidence', marker_color='red', opacity=0.7),
    row=1, col=2
)
fig_conf.add_trace(
    go.Bar(x=overconf_df['model'], y=overconf_df['correct_confidence'],
           name='Correct Answer Confidence', marker_color='green', opacity=0.7),
    row=1, col=2
)

# Panel (2,1): accuracy on the 'Contradictory' category (0 when a model has no such rows)
contradictory_perf = []
for name in display_names:
    model_data = rows_by_model[name]
    contradictory_data = model_data[model_data['category'] == 'Contradictory']
    acc = contradictory_data['correct'].mean() if len(contradictory_data) > 0 else 0
    contradictory_perf.append(acc)

fig_conf.add_trace(
    go.Bar(x=display_names,
           y=contradictory_perf,
           name='Contradictory Accuracy',
           showlegend=False,  # bars are color-coded by value; an unnamed trace would show up as "trace N"
           marker_color=['green' if x > 0.6 else 'orange' if x > 0.4 else 'red' for x in contradictory_perf]),
    row=2, col=1
)

# Panel (2,2): simple calibration curves (accuracy per confidence bin)
bins = np.linspace(0, 1, N_CALIBRATION_BINS + 1)
bin_centers = (bins[:-1] + bins[1:]) / 2
last_bin = len(bins) - 2

for name in display_names:
    model_data = rows_by_model[name]
    populated_centers = []
    populated_accuracies = []

    for j in range(len(bins) - 1):
        in_bin = model_data['confidence'] >= bins[j]
        if j == last_bin:
            # The top bin is closed on the right. With a strict '<' a confidence
            # of exactly 1.0 would fall in no bin and vanish from the curve.
            in_bin &= model_data['confidence'] <= bins[j + 1]
        else:
            in_bin &= model_data['confidence'] < bins[j + 1]

        # Skip empty bins: they have no accuracy, and plotting them as 0 would
        # draw spurious "completely wrong" points on the calibration curve.
        if in_bin.any():
            populated_centers.append(bin_centers[j])
            populated_accuracies.append(model_data.loc[in_bin, 'correct'].mean())

    fig_conf.add_trace(
        go.Scatter(x=populated_centers, y=populated_accuracies,
                   name=name, mode='lines+markers'),
        row=2, col=2
    )

# Perfect calibration line
fig_conf.add_trace(
    go.Scatter(x=[0, 1], y=[0, 1], mode='lines',
               line=dict(dash='dash', color='black'), name='Perfect Calibration'),
    row=2, col=2
)

fig_conf.update_layout(height=800, title_text="Model Confidence and Overconfidence Analysis")
fig_conf.show()

# Print summary
print("📊 OVERCONFIDENCE ANALYSIS SUMMARY:")
print("-" * 50)
for _, row in overconf_df.iterrows():
    if pd.isna(row['overconfidence']):
        # No wrong answers means there is nothing to measure; do not claim calibration.
        status = "✅ No wrong answers"
    elif row['overconfidence'] > OVERCONFIDENCE_THRESHOLD:
        status = "⚠️ OVERCONFIDENT"
    else:
        status = "✅ Well-calibrated"
    print(f"{row['model']:15}: {status} (Wrong confidence: {row['incorrect_confidence']:.3f})")


📊 OVERCONFIDENCE ANALYSIS SUMMARY:
--------------------------------------------------
LCF-ATEPC      : ✅ Well-calibrated (Wrong confidence: 0.587)
RoBERTa Base   : ⚠️ OVERCONFIDENT (Wrong confidence: 1.000)
RoBERTa Pseudo : ⚠️ OVERCONFIDENT (Wrong confidence: 0.874)


In [12]:
# Final conclusions: headline numbers taken from the metrics computed in earlier cells

# Model with the highest overall accuracy (first one wins on ties)
best_overall = max(model_names, key=model_accuracy.__getitem__)

# Accuracy of each model on the 'Contradictory' category, and the best of them
contradictory_acc = {}
for name in model_names:
    contradictory_acc[name] = model_by_category[name]['Contradictory']
best_contradictory = max(model_names, key=contradictory_acc.__getitem__)

print("🎯 KEY FINDINGS:")
print("-" * 30)
print(f"Best Overall: {model_display_names[best_overall]} ({model_accuracy[best_overall]:.1%})")
print(f"Best on Contradictory: {model_display_names[best_contradictory]} ({contradictory_acc[best_contradictory]:.1%})")

# Report the mean overconfidence score over the RoBERTa rows, but only when at
# least one RoBERTa model was part of the evaluation
roberta_models = list(filter(lambda name: 'roberta' in name, model_names))
if len(roberta_models) > 0:
    is_roberta_row = overconf_df['model'].str.contains('RoBERTa')
    roberta_overconf = overconf_df.loc[is_roberta_row, 'overconfidence'].mean()
    print(f"RoBERTa Overconfidence Issue: {roberta_overconf:.3f} (above 0.2 is concerning)")

🎯 KEY FINDINGS:
------------------------------
Best Overall: LCF-ATEPC (86.7%)
Best on Contradictory: LCF-ATEPC (73.3%)
RoBERTa Overconfidence Issue: 0.437 (above 0.2 is concerning)


In [13]:
# Create final metrics table
RESULTS_PATH = '../models/model_comparison_results.csv'

# Ground truth is identical for every model, so binarise it once:
# 'positive' -> 1, any other label -> 0.
y_true = [1 if r['expected_sentiment'] == 'positive' else 0 for r in all_results]

metrics_data = []
for model_name in model_names:
    # Precision / recall / F1 for the positive class; zero_division=0 keeps the
    # scores defined when a model never predicts 'positive'.
    y_pred = [1 if r['predictions'][model_name]['sentiment'] == 'positive' else 0 for r in all_results]
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary', zero_division=0)

    # Overconfidence score from the confidence-analysis cell (0 if the model has no row there)
    model_overconf = overconf_df[overconf_df['model'] == model_display_names[model_name]]
    overconf_score = model_overconf['overconfidence'].iloc[0] if len(model_overconf) > 0 else 0

    metrics_data.append({
        'Model': model_display_names[model_name],
        'Overall_Accuracy': model_accuracy[model_name],
        'Contradictory_Accuracy': model_by_category[model_name]['Contradictory'],
        'Precision': precision,
        'Recall': recall,
        'F1_Score': f1,
        'Overconfidence_Score': overconf_score,
        'Architecture': 'LCF-ATEPC' if model_name == 'lcf_atepc' else 'RoBERTa'
    })

# Create and display table
df_metrics = pd.DataFrame(metrics_data)
print("📊 FINAL RESULTS SUMMARY:")
print(df_metrics.round(3).to_string(index=False))

# Save results. Create the target directory first so a fresh checkout without
# ../models does not fail at the very last step of the notebook.
os.makedirs(os.path.dirname(RESULTS_PATH), exist_ok=True)
df_metrics.to_csv(RESULTS_PATH, index=False)
print(f"\n💾 Results saved to model_comparison_results.csv")
print("✅ Analysis completed successfully!")


📊 FINAL RESULTS SUMMARY:
         Model  Overall_Accuracy  Contradictory_Accuracy  Precision  Recall  F1_Score  Overconfidence_Score Architecture
     LCF-ATEPC             0.867                   0.733      1.000   0.733     0.846                 0.087    LCF-ATEPC
  RoBERTa Base             0.700                   0.533      0.750   0.600     0.667                 0.500      RoBERTa
RoBERTa Pseudo             0.733                   0.533      0.733   0.733     0.733                 0.374      RoBERTa

💾 Results saved to model_comparison_results.csv
✅ Analysis completed successfully!
