<a href="https://colab.research.google.com/github/visionbyangelic/EmotiWave/blob/main/text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#go emotion preprocessing


In [None]:
!pip install neattext


In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
import re
import neattext.functions as nfx
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from torch.cuda.amp import GradScaler, autocast
import os


In [None]:
# Set random seed for reproducibility
# Both NumPy's global RNG and PyTorch are seeded with the same fixed value (42)
# so that shuffles, sampling and weight initialisation repeat across runs.
np.random.seed(42)
torch.manual_seed(42)



In [None]:
# Pick the compute device: use CUDA when PyTorch can see a GPU, else the CPU.
# `device` is reused by the later training / evaluation cells.
cuda_ok = torch.cuda.is_available()
device = torch.device("cuda") if cuda_ok else torch.device("cpu")

print(f"CUDA available: {cuda_ok}")
if cuda_ok:
    # Report the name of GPU index 0
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Using device: {device}")

In [None]:
# Load the dataset
# The raw CSV is read from Google Drive, so Drive has to be mounted before the
# path below exists. Mounting here keeps "Restart & Run All" working on a
# fresh runtime; it reports "already mounted" if Drive is attached.
from google.colab import drive
drive.mount('/content/drive')

df = pd.read_csv("/content/drive/MyDrive/emotion_dataset_raw.csv")
print('dataset preview:')
print(df.head())
print('dataset shape:')
# (rows, columns) -- the original printed the preview a second time here
print(df.shape)

In [None]:
# Drop the two emotion classes that are excluded from this task
excluded_emotions = ['disgust', 'shame']
df = df[~df['Emotion'].isin(excluded_emotions)]

In [None]:
# Filter emotions to match FER2013 and rename them to the FER2013 label names.
# Keys are the labels as they appear in the raw CSV; values are the names used
# from here on.
emotion_map = {
    'anger': 'anger',
    'fear': 'fear',
    'joy': 'happy',
    'sadness': 'sad',
    'surprise': 'surprise',
    'neutral': 'neutral'
}
# Keep only rows whose label is a key of the map. The explicit .copy() makes
# `df` an independent frame, so the column assignment below does not write
# into a slice of the previous frame (which raises SettingWithCopyWarning).
df = df[df['Emotion'].isin(list(emotion_map.keys()))].copy()
df['Emotion'] = df['Emotion'].map(emotion_map)
print("\nEmotions after filtering:", df['Emotion'].unique())

In [None]:
#clean text
!pip install neattext

import neattext.functions as nfx

def clean_text(text):
    """Normalise one piece of text for tokenisation.

    The input is converted with ``str()`` and lower-cased, then the following
    substitutions are applied in order: URLs (``http...`` / ``www...``) are
    removed, every character that is neither a word character nor whitespace
    is removed, digit runs are removed, and whitespace runs are collapsed to
    a single space. The result is stripped of leading/trailing whitespace.

    Args:
        text: Any value; non-strings are stringified first.

    Returns:
        The cleaned string (possibly empty).
    """
    # (pattern, replacement) pairs -- order matters: URLs must go before
    # punctuation is stripped, and whitespace is collapsed last.
    substitutions = (
        (r'http\S+|www\S+', ''),
        (r'[^\w\s]', ''),
        (r'\d+', ''),
        (r'\s+', ' '),
    )
    cleaned = str(text).lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

# Two-stage cleaning: neattext strips @user handles first, then clean_text
# applies the regex-based normalisation.
without_handles = df['Text'].apply(nfx.remove_userhandles)
df['Clean_Text'] = without_handles.apply(clean_text)

print("\nSample cleaned text:")
print(df['Clean_Text'].head())

In [None]:
# Check data integrity: report missing / empty rows, then drop them
missing_text = df['Clean_Text'].isnull().sum()
missing_label = df['Emotion'].isnull().sum()
empty_text = (df['Clean_Text'].str.len() < 1).sum()

print("\nRows with missing text:", missing_text)
print("Rows with missing emotion labels:", missing_label)
print("Rows with text length < 1:", empty_text)

# Remove rows lacking text or label, then rows whose cleaned text is empty
df = df.dropna(subset=['Clean_Text', 'Emotion'])
has_text = df['Clean_Text'].str.len() >= 1
df = df[has_text]


In [None]:
# Visualize emotion distribution of the full (cleaned) dataset.
# Bars are ordered from most to least frequent class.
order = df['Emotion'].value_counts().index

fig, ax = plt.subplots(figsize=(8, 5))
sns.countplot(x='Emotion', data=df, order=order, ax=ax)
ax.tick_params(axis='x', labelrotation=45)
ax.set_title("Emotion Class Distribution (Full Dataset)")
ax.set_xlabel("Emotion")
ax.set_ylabel("Number of Samples")
plt.show()


In [None]:
# Split data into train (80%), validation (10%), test (10%).
# Done in two stratified steps: first hold out 20%, then halve that hold-out
# into validation and test. Both steps use the same fixed random_state.
X, y = df['Clean_Text'], df['Emotion']

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, stratify=y, test_size=0.2, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, stratify=y_temp, test_size=0.5, random_state=42
)

In [None]:
# Create one two-column frame ('Text', 'Emotion') per split, with a fresh
# 0..n-1 index instead of the row labels inherited from `df`.
def _as_split_frame(texts, labels):
    """Pair a text Series with its label Series and reset the index."""
    return pd.DataFrame({'Text': texts, 'Emotion': labels}).reset_index(drop=True)

train_df = _as_split_frame(X_train, y_train)
val_df = _as_split_frame(X_val, y_val)
test_df = _as_split_frame(X_test, y_test)

In [None]:
# Sanity check: which labels remain in the full frame, and how many of each
emotion_column = df['Emotion']
print(emotion_column.unique())
print(emotion_column.value_counts())

In [None]:
# Balance training set (undersample 'happy' to 6,720)
happy_cap = 6720  # maximum number of 'happy' rows kept in the training split

emotion_counts = train_df['Emotion'].value_counts()
print("\nOriginal training emotion distribution:\n", emotion_counts)

if emotion_counts.get('happy', 0) > happy_cap:
    is_happy = train_df['Emotion'] == 'happy'
    # Randomly keep `happy_cap` happy rows; every other class is untouched
    happy_df = train_df[is_happy].sample(n=happy_cap, random_state=42)
    other_df = train_df[~is_happy]
    # Re-shuffle so the kept happy rows are not all at the front
    recombined = pd.concat([happy_df, other_df])
    train_df = recombined.sample(frac=1, random_state=42).reset_index(drop=True)

print("\nBalanced training emotion distribution:\n", train_df['Emotion'].value_counts())


In [None]:
#neutral dropped which could harm the model performance



In [None]:
# Plot balanced training distribution.
# Bar order comes from class frequency in the full frame `df` (not train_df).
order = df['Emotion'].value_counts().index

fig, ax = plt.subplots(figsize=(8, 5))
sns.countplot(x='Emotion', data=train_df, order=order, ax=ax)
ax.tick_params(axis='x', labelrotation=45)
ax.set_title("Balanced Training Emotion Distribution")
ax.set_xlabel("Emotion")
ax.set_ylabel("Number of Samples")
plt.show()

In [None]:
# Map emotion names to integer class ids (0-5).
# The position in this list is the class id; the confusion-matrix cells later
# rely on label_map's key order for their tick labels.
emotion_names = ['anger', 'fear', 'happy', 'sad', 'surprise', 'neutral']
label_map = {emotion: class_id for class_id, emotion in enumerate(emotion_names)}

# Replace the string label with its id in every split (in place)
for split_df in (train_df, val_df, test_df):
    split_df['Emotion'] = split_df['Emotion'].map(label_map)

print("\nUnique emotion labels after mapping:", sorted(train_df['Emotion'].unique()))


In [None]:
# Save preprocessed data to the Colab runtime's local disk
local_csvs = [
    (train_df, '/content/goemotions_train.csv'),
    (val_df, '/content/goemotions_val.csv'),
    (test_df, '/content/goemotions_test.csv'),
]
for frame, csv_path in local_csvs:
    frame.to_csv(csv_path, index=False)

# Confirm split sizes
print(f"\nTraining samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Test samples: {len(test_df)}")

# Download CSVs through the browser
from google.colab import files
for _, csv_path in local_csvs:
    files.download(csv_path)

In [None]:
# Save CSVs to Google Drive so they outlive the Colab runtime
drive_csvs = (
    (train_df, '/content/drive/MyDrive/goemotions_train.csv'),
    (val_df, '/content/drive/MyDrive/goemotions_val.csv'),
    (test_df, '/content/drive/MyDrive/goemotions_test.csv'),
)
for frame, drive_path in drive_csvs:
    frame.to_csv(drive_path, index=False)

#training

Using focal loss and class weights to handle the class imbalance, especially for neutral in comparison to happy.

Also using a PyTorch Dataset and DataLoader to prepare the preprocessed data for BERT-based text classification, mirroring FER2013.

In [None]:
!pip install transformers torch

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from torch.cuda.amp import GradScaler, autocast
import os

In [None]:
# Set random seeds for reproducibility (NumPy and PyTorch, same value)
np.random.seed(42)
torch.manual_seed(42)

# Verify GPU availability and choose the device used for training
gpu_present = torch.cuda.is_available()
device = torch.device("cuda") if gpu_present else torch.device("cpu")
print(f"CUDA available: {gpu_present}")
if gpu_present:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Using device: {device}")

In [None]:
# Load the preprocessed splits written by the preprocessing section
split_paths = (
    '/content/goemotions_train.csv',
    '/content/goemotions_val.csv',
    '/content/goemotions_test.csv',
)
train_df, val_df, test_df = (pd.read_csv(path) for path in split_paths)


In [None]:
# Show per-class row counts of the training split; the class weights are
# derived from these counts further down.
train_label_counts = train_df['Emotion'].value_counts()
print("Training emotion distribution:")
print(train_label_counts)

In [None]:
# Custom Dataset feeding tokenised text + label to the DataLoaders
class GoEmotionsDataset(Dataset):
    """Map-style dataset over a frame with 'Text' and 'Emotion' columns.

    Each item is tokenised on access (not up front) and returned as a dict
    with keys 'input_ids', 'attention_mask' and 'labels'.

    Args:
        df: Frame providing the 'Text' and 'Emotion' columns.
        tokenizer: Callable tokenizer; invoked with the text plus the
            padding / truncation keyword arguments shown in __getitem__.
        max_length: Length every sequence is padded or truncated to.
    """

    def __init__(self, df, tokenizer, max_length=128):
        self.df = df
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Tokenise row `idx` (positional) and return its tensors."""
        text = str(self.df['Text'].iloc[idx])
        label = int(self.df['Emotion'].iloc[idx])

        tokenizer_kwargs = dict(
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        encoding = self.tokenizer(text, **tokenizer_kwargs)

        # The tokenizer output has a leading dimension of size 1 for the
        # single text; drop it so the DataLoader can stack items itself.
        item = {
            key: encoding[key].squeeze(0)
            for key in ('input_ids', 'attention_mask')
        }
        item['labels'] = torch.tensor(label, dtype=torch.long)
        return item



In [None]:
# Initialize tokenizer
# Loads the pretrained 'bert-base-uncased' tokenizer -- the same checkpoint
# name that the model cell passes to BertForSequenceClassification.
# `tokenizer` is also read as a global by verify_dataloader.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Create datasets (one per split, all padded/truncated to 128 tokens)
train_dataset, val_dataset, test_dataset = (
    GoEmotionsDataset(split_frame, tokenizer, max_length=128)
    for split_frame in (train_df, val_df, test_df)
)

# Create DataLoaders
batch_size = 32  # For Tesla T4


def _make_loader(dataset, shuffle):
    """Build a DataLoader with the settings shared by all three splits."""
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=2,
        pin_memory=True,
    )


# Only the training split is shuffled; validation and test keep file order
train_loader = _make_loader(train_dataset, shuffle=True)
val_loader = _make_loader(val_dataset, shuffle=False)
test_loader = _make_loader(test_dataset, shuffle=False)

In [None]:
# Calculate class weights for focal loss
# weight[c] = total_samples / (num_classes * count[c]), i.e. inverse-frequency
# weighting: rarer classes get larger weights.
num_classes = 6  # label ids 0..5: anger, fear, happy, sad, surprise, neutral

# Reindex onto every label id so that entry i of the tensor always belongs to
# class i. value_counts() omits classes with no rows, which would otherwise
# shorten the tensor and shift later weights onto the wrong class.
class_counts = (
    train_df['Emotion']
    .value_counts()
    .reindex(range(num_classes), fill_value=0)
)
total_samples = len(train_df)
class_weights = torch.tensor(
    [
        # A class with no training rows gets weight 0 instead of dividing by zero
        total_samples / (num_classes * count) if count > 0 else 0.0
        for count in class_counts
    ],
    dtype=torch.float,
).to(device)
print("\nClass weights for focal loss:", class_weights)

# Verify DataLoader
def verify_dataloader(loader, name):
    """Print size information and one decoded sample for a DataLoader.

    Reports the number of batches and samples, then pulls the first batch
    (if any) and prints the tensor shapes plus the first item's text --
    decoded with the module-level `tokenizer` -- and its label.

    Args:
        loader: DataLoader whose batches are dicts with 'input_ids',
            'attention_mask' and 'labels'.
        name: Human-readable split name used in the heading.
    """
    print(f"\n{name} DataLoader:")
    print(f"Number of batches: {len(loader)}")
    print(f"Total samples: {len(loader.dataset)}")

    first_batch = next(iter(loader), None)
    if first_batch is None:
        # Empty loader: nothing further to show
        return

    print("Sample batch shapes:")
    for key in ('input_ids', 'attention_mask', 'labels'):
        print(f"  {key}: {first_batch[key].shape}")

    sample_text = tokenizer.decode(first_batch['input_ids'][0], skip_special_tokens=True)
    sample_label = first_batch['labels'][0].item()
    print(f"Sample text: {sample_text}")
    print(f"Sample label: {sample_label}")

# Inspect all three loaders in turn
for split_loader, split_name in (
    (train_loader, "Training"),
    (val_loader, "Validation"),
    (test_loader, "Test"),
):
    verify_dataloader(split_loader, split_name)

#Training

In [None]:
# Define focal loss
class FocalLoss(nn.Module):
    """Multi-class focal loss built on top of cross-entropy.

    Per sample: ``alpha[target] * (1 - p_t) ** gamma * CE`` where ``CE`` is
    the cross-entropy of the sample and ``p_t = exp(-CE)`` is the predicted
    probability of the true class. With ``gamma > 0`` well-classified
    samples (p_t close to 1) contribute less to the loss.

    Args:
        alpha: Optional per-class weights (tensor or sequence, indexed by
            class id). ``None`` means no class weighting.
        gamma: Focusing exponent applied to ``(1 - p_t)``.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample loss.
    """

    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha  # Class weights (may be None)
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the focal loss.

        Args:
            inputs: Raw logits, one row per sample and one column per class.
            targets: Integer class id per sample.

        Returns:
            A scalar tensor for 'mean' / 'sum', otherwise one loss per sample.
        """
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss

        if self.alpha is not None:
            # Align the weights with the loss tensor. This is a no-op when
            # alpha is already a tensor of that dtype on that device, and it
            # lets callers pass a plain list as well. Previously the default
            # alpha=None crashed here on `None[targets]`.
            alpha = torch.as_tensor(
                self.alpha, dtype=focal_loss.dtype, device=focal_loss.device
            )
            focal_loss = alpha[targets] * focal_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss

In [None]:
# Initialize model
# Pretrained 'bert-base-uncased' with a sequence-classification head sized for
# 6 labels (label_map assigns ids 0-5), moved onto `device`.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=6).to(device)

In [None]:
# Training setup
num_epochs = 3  # Adjustable
best_val_loss = float('inf')  # any real validation loss compares lower than this

# Checkpoints go to Google Drive; make sure the folder exists before training
checkpoint_dir = '/content/drive/MyDrive/checkpoints/'
os.makedirs(checkpoint_dir, exist_ok=True)

optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
scaler = GradScaler()  # For mixed precision


In [None]:
# Training loop
# Each epoch: one pass over train_loader with mixed-precision updates, one
# pass over val_loader without gradients, then checkpointing.

# The loss module holds no per-batch state, so build it once instead of
# re-creating it for every training and validation batch.
criterion = FocalLoss(alpha=class_weights, gamma=2.0)

for epoch in range(num_epochs):
    # ---- Train ----
    model.train()
    train_loss = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        with autocast():
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)

        # GradScaler scales the loss before backward, then performs the
        # optimizer step and updates its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        train_loss += loss.item()

    # Mean of the per-batch losses
    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}")

    # ---- Validation ----
    model.eval()
    val_loss = 0
    val_preds, val_labels = [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with autocast():
                outputs = model(input_ids, attention_mask=attention_mask)
                loss = criterion(outputs.logits, labels)

            val_loss += loss.item()
            val_preds.extend(torch.argmax(outputs.logits, dim=1).cpu().numpy())
            val_labels.extend(labels.cpu().numpy())

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}")

    # Save a checkpoint for every epoch. os.path.join avoids the doubled
    # slash that "{checkpoint_dir}/..." produces when checkpoint_dir already
    # ends in '/'.
    checkpoint_path = os.path.join(checkpoint_dir, f"epoch_{epoch+1}.pth")
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Saved checkpoint: {checkpoint_path}")

    # Keep a separate copy of the weights with the lowest validation loss so far
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_path = os.path.join(checkpoint_dir, "best_model.pth")
        torch.save(model.state_dict(), best_model_path)
        print(f"Saved best model: {best_model_path}")

#evaluation


In [None]:
# Test evaluation
# NOTE(review): this scores the weights currently held by `model` (i.e. after
# the last training epoch); best_model.pth is not reloaded here -- confirm
# that this is intended.
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        input_ids, attention_mask, labels = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        )

        outputs = model(input_ids, attention_mask=attention_mask)
        # Predicted class = index of the largest logit in each row
        predicted_ids = outputs.logits.argmax(dim=1)
        test_preds.extend(predicted_ids.cpu().numpy())
        test_labels.extend(labels.cpu().numpy())

test_accuracy = accuracy_score(test_labels, test_preds)
print(f"\nTest Accuracy: {test_accuracy:.4f}")


In [None]:
# Confusion matrix
# Passing labels= pins the matrix to one row/column per class id in
# label_map. Without it, a class that never occurs in test_labels or
# test_preds is dropped and the tick labels below no longer line up.
class_ids = list(label_map.values())
class_names = list(label_map.keys())

cm = confusion_matrix(test_labels, test_preds, labels=class_ids)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.title("Test Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

#saving

In [None]:
# Generate and save confusion matrix (best effort: a failure here is reported
# but does not stop the rest of the cell)
try:
    # labels= keeps one row/column per class id in label_map even if a class
    # is absent from the test labels and predictions, so ticks stay aligned.
    class_ids = list(label_map.values())
    class_names = list(label_map.keys())
    cm = confusion_matrix(test_labels, test_preds, labels=class_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title("Test Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    # Ensure directory exists; os.path.join yields a single slash whether or
    # not checkpoint_dir ends with one
    os.makedirs(checkpoint_dir, exist_ok=True)
    cm_path = os.path.join(checkpoint_dir, "confusion_matrix.png")
    # Save before show()
    plt.savefig(cm_path, dpi=300)
    print(f"Saved confusion matrix to {cm_path}")
    plt.show()
except Exception as e:
    print(f"Error generating confusion matrix: {e}")

# Save the weights currently held by `model` as the final model
final_model_path = f"{checkpoint_dir}/final_model.pth"
torch.save(model.state_dict(), final_model_path)
print(f"Saved final model to {final_model_path}")

# Save training history
# NOTE: these per-epoch metrics are hard-coded (copied from a previous run's
# printed output); they are NOT recomputed from the training loop.
# TODO: record them during training instead.
logged_metrics = {
    'train_loss': [0.1066, 0.0616, 0.0433],  # From your output
    'val_loss': [0.5346, 0.5891, 0.6356],
    'val_accuracy': [0.7308, 0.7359, 0.7332]
}
# Number the epochs from the logged values themselves. Deriving the list from
# num_epochs made pd.DataFrame raise "All arrays must be of the same length"
# as soon as num_epochs differed from the three hard-coded entries.
logged_epochs = len(logged_metrics['train_loss'])
training_history = {
    'epoch': list(range(1, logged_epochs + 1)),
    **logged_metrics,
}
history_df = pd.DataFrame(training_history)
history_path = os.path.join(checkpoint_dir, "training_history.csv")
history_df.to_csv(history_path, index=False)
print(f"Saved training history to {history_path}")

# Download files to your PC (for VSCode)
from google.colab import files
try:
    files.download(f"{checkpoint_dir}/final_model.pth")
    #files.download(f"{checkpoint_dir}/confusion_matrix.png")
    #files.download(f"{checkpoint_dir}/training_history.csv")
    print("Files downloaded successfully.")
except FileNotFoundError as e:
    print(f"Download error: {e}. Attempting to zip and download checkpoints directory.")
    !zip -r checkpoints.zip /content/drive/MyDrive/checkpoints/
    files.download('checkpoints.zip')
    print("Downloaded checkpoints.zip containing all files.")