# FinEmo Training on Google Colab

**Why Colab?** PyTorch on macOS segfaults when this model is trained with BatchNorm layers and a class-weighted loss. Colab provides free GPU access, and the same training runs there without crashing.

**Expected Accuracy**: 60-70% (vs 46% on macOS CPU). Note that the evaluation cell (Step 7) only reports the target as achieved at 75% or higher.

In [None]:
# Step 1: Install Dependencies
!pip install torch torchvision xgboost scikit-learn pandas numpy matplotlib seaborn -q

import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# Step 2: Upload Data Files
# Prompts for each required input through the Colab upload widget and moves
# it into the directory layout the later cells read from.
from google.colab import files
import os
import shutil

for directory in ('data/annotated', 'data/features', 'models/classifiers'):
    os.makedirs(directory, exist_ok=True)

# (prompt prefix, expected filename, destination directory)
required_uploads = [
    ("", "fingpt_annotated_scaled.csv", "data/annotated"),
    ("\n", "train_features_scaled.npy", "data/features"),
]

for prefix, filename, destination in required_uploads:
    print(f"{prefix}Please upload: {filename}")
    uploaded = files.upload()

    # files.upload() returns a dict keyed by the name each file was saved
    # under. Fail loudly when the expected name is absent (wrong file chosen,
    # upload cancelled, or the file was saved under a different name) instead
    # of letting a shell `mv` fail silently and still reporting success.
    if filename not in uploaded or not os.path.isfile(filename):
        received = sorted(uploaded) or "nothing"
        raise FileNotFoundError(
            f"Expected '{filename}' but received: {received}. "
            "Re-run this cell and upload the file under that exact name."
        )

    # Overwrites an existing copy at the destination, as `mv` did.
    shutil.move(filename, os.path.join(destination, filename))

print("\n✅ Files uploaded successfully!")

In [None]:
# Step 3: Define Model Architecture
import torch.nn as nn

class ImprovedMLPClassifier(nn.Module):
    """Feed-forward classifier built from Linear -> BatchNorm -> ReLU -> Dropout stages.

    Args:
        input_dim: Number of features in each input row.
        hidden_dims: Width of each hidden stage, in order. Any iterable of
            ints is accepted; the default is an immutable tuple so it cannot
            be mutated and shared between instances.
        num_classes: Number of output logits.
        dropout: Dropout probability applied after every hidden stage.
    """

    def __init__(self, input_dim=768, hidden_dims=(512, 384, 256, 128),
                 num_classes=6, dropout=0.4):
        super().__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),  # Works on GPU!
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
            prev_dim = hidden_dim

        # Final projection to raw class logits (no softmax here).
        layers.append(nn.Linear(prev_dim, num_classes))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class logits produced by the layer stack for ``x``."""
        return self.model(x)


print("✅ Model class defined")

In [None]:
# Step 4: Load and Prepare Data
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight

banner = "=" * 80
print(banner)
print("LOADING DATA")
print(banner)

# Features come from the saved array; labels from the CSV's 'emotion' column.
X = np.load('data/features/train_features_scaled.npy')
df = pd.read_csv('data/annotated/fingpt_annotated_scaled.csv')
y = df['emotion'].values

n_samples, n_features = len(X), X.shape[1]
print(f"\nDataset: {n_samples} samples, {n_features} features")

# Map the label values to integer class ids.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

print(f"Classes: {list(label_encoder.classes_)}")

# Per-label counts and percentages over the full dataset.
print("\nClass distribution:")
total = len(y)
for label, count in zip(*np.unique(y, return_counts=True)):
    share = count / total * 100
    print(f"  {label:<15} {count:>3} ({share:.1f}%)")

# Stratified 80/20 split with a fixed seed.
X_train, X_val, y_train, y_val = train_test_split(
    X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
)

print(f"\nTrain: {len(X_train)}, Val: {len(X_val)}")

# 'balanced' class weights computed on the training split only.
train_classes = np.unique(y_train)
class_weights = compute_class_weight('balanced', classes=train_classes, y=y_train)

print("\nClass weights (for imbalance handling):")
for class_name, weight in zip(label_encoder.classes_, class_weights):
    print(f"  {class_name:<15} {weight:.3f}")

In [None]:
# Step 5: Balance Dataset with SMOTE + Setup Focal Loss
!pip install imbalanced-learn -q

from imblearn.over_sampling import SMOTE
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Apply SMOTE to balance classes
print("\n" + "="*80)
print("BALANCING DATASET WITH SMOTE")
print("="*80)

print("\nOriginal distribution:")
for i, emotion in enumerate(label_encoder.classes_):
    count = (y_train == i).sum()
    print(f"  {emotion:<15} {count} samples")

smote = SMOTE(sampling_strategy='auto', random_state=42, k_neighbors=3)
X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

print(f"\nOriginal: {len(X_train)} samples")
print(f"After SMOTE: {len(X_train_balanced)} samples")

print("\nBalanced distribution:")
for i, emotion in enumerate(label_encoder.classes_):
    count = (y_train_balanced == i).sum()
    print(f"  {emotion:<15} {count} samples")

# Define Focal Loss (better for imbalanced data)
class FocalLoss(nn.Module):
    """Multi-class focal loss: per-sample cross-entropy scaled by ``(1 - p_t) ** gamma``.

    Args:
        alpha: Optional per-class weights indexed by class id. May be a
            tensor or any sequence ``torch.as_tensor`` accepts; ``None``
            disables class weighting.
        gamma: Focusing exponent. With ``gamma=0`` and no ``alpha`` the
            result equals the mean cross-entropy.
    """

    def __init__(self, alpha=None, gamma=2.0):
        super().__init__()
        self.gamma = gamma

        # Accept plain sequences as well as tensors; tensors are kept as given.
        if alpha is not None and not torch.is_tensor(alpha):
            alpha = torch.as_tensor(alpha, dtype=torch.float32)

        # Registered as a buffer so `.to()` / `.cuda()` on the loss module moves
        # the weights with it; non-persistent so the state_dict gains no new key.
        self.register_buffer("alpha", alpha, persistent=False)

    def forward(self, inputs, targets):
        """Return the mean focal loss for logits ``inputs`` and class ids ``targets``."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-CE) recovers the probability assigned to the true class.
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.alpha is not None:
            # Look up each sample's class weight by its target id.
            focal_loss = self.alpha[targets] * focal_loss

        return focal_loss.mean()

# Recompute class weights for balanced data
from sklearn.utils.class_weight import compute_class_weight


def _to_device(array, dtype):
    """Copy a NumPy array into a tensor of ``dtype`` placed on ``device``."""
    return torch.tensor(array, dtype=dtype).to(device)


balanced_classes = np.unique(y_train_balanced)
class_weights_balanced = compute_class_weight(
    'balanced', classes=balanced_classes, y=y_train_balanced
)
class_weights_tensor = _to_device(class_weights_balanced, torch.float32)

# Convert balanced data to tensors (float features, integer class ids)
X_train_t = _to_device(X_train_balanced, torch.float32)
y_train_t = _to_device(y_train_balanced, torch.int64)
X_val_t = _to_device(X_val, torch.float32)
y_val_t = _to_device(y_val, torch.int64)

# Mini-batches of 32 over the balanced training set, reshuffled every epoch
train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Initialize model with REDUCED dropout; input width and class count come
# from the data rather than the class defaults.
model = ImprovedMLPClassifier(
    input_dim=X_train.shape[1],
    hidden_dims=[512, 384, 256, 128],
    num_classes=len(label_encoder.classes_),
    dropout=0.25  # Reduced from 0.4
)
model = model.to(device)

separator = "=" * 80
print("\n" + separator)
print("MODEL ARCHITECTURE")
print(separator)
print(model)
parameter_count = sum(p.numel() for p in model.parameters())
print(f"\nTrainable parameters: {parameter_count:,}")

# Focal loss weighted per class, AdamW, and a scheduler that halves the
# learning rate after 5 epochs without improvement in the monitored loss.
criterion = FocalLoss(alpha=class_weights_tensor, gamma=2.0)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)

print("\n✅ Ready to train with SMOTE + Focal Loss!")

In [None]:
# Step 6: Train Model with Data Augmentation
# Mini-batch training with occasional Gaussian input noise, full-batch
# validation every epoch, LR scheduling on validation loss, and early
# stopping that restores the best-scoring weights.
import copy

print("="*80)
print("TRAINING")
print("="*80)

epochs = 150  # Increased from 100
best_val_loss = float('inf')
patience_counter = 0
patience = 20  # Increased from 15

# Start from a snapshot of the initial weights so the restore after the loop
# always has something to load, even if validation loss never improves
# (e.g. it is NaN from the first epoch).
best_model_state = copy.deepcopy(model.state_dict())

for epoch in range(epochs):
    # Training
    model.train()
    train_loss = 0

    for batch_X, batch_y in train_loader:
        # Add Gaussian noise for data augmentation (10% of batches)
        if np.random.rand() < 0.1:
            noise = torch.randn_like(batch_X) * 0.01
            batch_X = batch_X + noise

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Average of the per-batch losses for this epoch.
    train_loss /= len(train_loader)

    # Validation (whole validation set in a single forward pass)
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_t)
        val_loss = criterion(val_outputs, y_val_t).item()
        _, predicted = torch.max(val_outputs, 1)
        val_acc = (predicted == y_val_t).float().mean().item()

    scheduler.step(val_loss)

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, "
              f"Val Loss: {val_loss:.4f}, "
              f"Val Acc: {val_acc:.4f}")

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        # state_dict() hands back references to the live parameter tensors and
        # dict.copy() is shallow, so a deep copy is required; otherwise later
        # optimizer steps would silently overwrite this "best" checkpoint.
        best_model_state = copy.deepcopy(model.state_dict())
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"\nEarly stopping at epoch {epoch+1}")
            break

# Load best model
model.load_state_dict(best_model_state)
print(f"\n✅ Training complete! Best val loss: {best_val_loss:.4f}")

In [None]:
# Step 7: Evaluate Model
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report

divider = "=" * 80
print(divider)
print("FINAL EVALUATION")
print(divider)

# Predict the validation split in inference mode; the predicted class is the
# index of the largest logit in each row.
model.eval()
with torch.no_grad():
    val_outputs = model(X_val_t)
    y_pred = torch.max(val_outputs, 1)[1].cpu().numpy()

# Macro-averaged scores; classes that receive no predictions score 0
# instead of raising a warning.
accuracy = accuracy_score(y_val, y_pred)
precision, recall, f1 = precision_recall_fscore_support(
    y_val, y_pred, average='macro', zero_division=0
)[:3]

print("\nOverall Metrics:")
print(f"  Accuracy:  {accuracy:.4f} ({accuracy*100:.1f}%)")
print(f"  Precision: {precision:.4f}")
print(f"  Recall:    {recall:.4f}")
print(f"  F1-Score:  {f1:.4f}")

print("\nPer-Class Metrics:")
per_class_report = classification_report(
    y_val, y_pred,
    target_names=label_encoder.classes_,
    zero_division=0
)
print(per_class_report)

# Check if target achieved (threshold is the lower end of the 75-80% goal)
target_met = accuracy >= 0.75
if target_met:
    status = '✅ TARGET ACHIEVED!'
else:
    status = '⚠️ Below target (may need more data)'

print("\n" + divider)
print("RESULTS SUMMARY")
print(divider)
print("Target Accuracy: 75-80%")
print(f"Achieved:        {accuracy*100:.1f}%")
print(f"Status:          {status}")

In [None]:
# Step 8: Save and Download Model
# Writes a timestamped pickle containing the trained model, its weights, the
# label encoder and evaluation metadata, then triggers a browser download.
import pickle
from datetime import datetime

# Move to CPU so the checkpoint can be loaded on machines without a GPU.
model = model.cpu()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_path = f'models/classifiers/mlp_improved_colab_{timestamp}.pkl'

# Derive the architecture summary from the actual Linear layers instead of
# hard-coding it, so it stays correct when the feature width or the number of
# classes differs from 768 / 6.
linear_layers = [m for m in model.modules() if isinstance(m, nn.Linear)]
layer_dims = [linear_layers[0].in_features] + [
    layer.out_features for layer in linear_layers
]
architecture = f"ImprovedMLP [{'->'.join(str(dim) for dim in layer_dims)}]"

# Record the hardware that was really available rather than assuming a GPU.
trained_on = 'Google Colab GPU' if torch.cuda.is_available() else 'Google Colab CPU'

checkpoint = {
    # Whole-module pickle kept for backward compatibility. Loading it requires
    # the ImprovedMLPClassifier class to be importable under the same module
    # path it had in this notebook.
    'model': model,
    # Portable weights: rebuild the class anywhere and call load_state_dict().
    'state_dict': model.state_dict(),
    'label_encoder': label_encoder,
    'accuracy': accuracy,
    'f1_score': f1,
    'trained_on': trained_on,
    'architecture': architecture,
    'timestamp': timestamp
}

# NOTE(security): pickle files execute arbitrary code when loaded; only
# unpickle checkpoints that come from a trusted source.
with open(model_path, 'wb') as f:
    pickle.dump(checkpoint, f)

print(f"Model saved to: {model_path}")
print("\nDownloading model...")

files.download(model_path)
print("\n✅ Done! Model downloaded to your computer.")