In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader
from tqdm import tqdm  # Import tqdm for progress bar
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class EmotionDataset(data.Dataset):
    """Fixed-length windows of per-frame features and labels read from ``.txt`` files.

    Every ``.txt`` file in ``root_dir`` is parsed as comma-separated float32
    rows whose trailing columns hold the labels for ``challenge``:

    * ``'VA'``   - last 2 columns; rows whose labels contain ``-5`` are dropped.
    * ``'EXPR'`` - last column cast to int; rows labelled ``-1`` are dropped and
      the remaining labels are one-hot encoded over 8 classes.
    * ``'AU'``   - last 12 columns; rows whose labels contain ``-1`` are dropped.

    The surviving rows of all files are stacked (files visited in sorted name
    order, so the result is reproducible) and cut into windows of
    ``sequence_length`` rows, one window starting every ``window_size`` rows.

    NOTE(review): windows are cut from the stacked array, so a window can
    straddle the boundary between two files. If each file is a separate
    recording, consider windowing per file instead - confirm with the data owner.

    Args:
        root_dir: Directory containing the feature files.
        challenge: One of ``'VA'``, ``'EXPR'`` or ``'AU'``.
        sequence_length: Number of rows per window.
        window_size: Stride, in rows, between the starts of consecutive windows.

    Raises:
        ValueError: If ``challenge`` is not one of the supported values.
    """

    # challenge -> (number of trailing label columns, value marking an unlabeled row)
    _LABEL_SPEC = {"VA": (2, -5), "EXPR": (1, -1), "AU": (12, -1)}
    _NUM_EXPR_CLASSES = 8

    def __init__(self, root_dir, challenge, sequence_length=10, window_size=10):
        # Fail fast on a bad challenge instead of only when the first file is parsed.
        if challenge not in self._LABEL_SPEC:
            raise ValueError("Invalid challenge type")

        self.root_dir = root_dir
        self.challenge = challenge
        self.sequence_length = sequence_length
        self.window_size = window_size
        self.data = []
        self.labels = []

        n_label_cols, missing_value = self._LABEL_SPEC[challenge]

        # os.listdir returns entries in arbitrary order; sort so the stacked
        # rows (and therefore the windows) are identical on every run/machine.
        for file in tqdm(sorted(os.listdir(root_dir)), desc=f"Loading {challenge} dataset"):
            if not file.endswith(".txt"):
                continue

            file_path = os.path.join(root_dir, file)
            try:
                features = np.loadtxt(file_path, delimiter=",", dtype=np.float32)
            except ValueError as e:
                print(f"Error loading {file}: {e}")
                continue

            if features.size == 0:
                continue

            # A single-row file is returned as a 1-D array; restore the row axis.
            if features.ndim == 1:
                features = features.reshape(1, -1)

            if challenge == "EXPR":
                labels = features[:, -1].astype(int)
                keep = labels != missing_value
                label_width = 1
            else:
                labels = features[:, -n_label_cols:]
                keep = ~(labels == missing_value).any(axis=1)
                # Files narrower than the nominal label width yield fewer label columns.
                label_width = labels.shape[1]

            if not keep.any():
                continue

            kept_labels = labels[keep]
            if challenge == "EXPR":
                # One-hot encode the class indices.
                kept_labels = np.eye(self._NUM_EXPR_CLASSES)[kept_labels]

            self.data.append(features[keep, :-label_width])
            self.labels.append(kept_labels)

        # Stack the per-file arrays into one (rows, columns) array each.
        self.data = np.vstack(self.data) if self.data else np.array([])
        self.labels = np.vstack(self.labels) if self.labels else np.array([])

        # Cut the stacked rows into windows; a trailing remainder shorter than
        # sequence_length is discarded.
        starts = range(0, len(self.data) - self.sequence_length + 1, self.window_size)
        self.sequences = np.array(
            [self.data[s:s + self.sequence_length] for s in starts]
        )
        self.sequence_labels = np.array(
            [self.labels[s:s + self.sequence_length] for s in starts]
        )

    def __len__(self):
        """Return the number of windows."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(features, labels)`` for window ``idx`` as float32 tensors."""
        return (torch.tensor(self.sequences[idx], dtype=torch.float32),
                torch.tensor(self.sequence_labels[idx], dtype=torch.float32))

In [2]:
challenge = "EXPR"  # Emotion classification

# Training windows overlap (stride 20 over 100 rows); validation windows are
# back-to-back (stride 100), so each validation row is scored at most once.
train_set = EmotionDataset(
    root_dir=f'Features_CLIP/{challenge}/training_set_features',
    challenge=challenge,
    sequence_length=100,
    window_size=20,
)
val_set = EmotionDataset(
    root_dir=f'Features_CLIP/{challenge}/validation_set_features',
    challenge=challenge,
    sequence_length=100,
    window_size=100,
)

# Only the training loader shuffles; everything else is shared between the two.
train_loader = DataLoader(
    dataset=train_set,
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
val_loader = DataLoader(
    dataset=val_set,
    batch_size=64,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

In [3]:
# One text prompt per expression class. The insertion order of this dict fixes
# the row order of `text_features` built below.
emotion_descriptions = {
    "Neutral": "A face showing neutrality.",
    "Anger": "A face showing anger.",
    "Disgust": "A face showing disgust.",
    "Fear": "A face showing fear.",
    "Happiness": "A face showing happiness.",
    "Sadness": "A face showing sadness.",
    "Surprise": "A face showing surprise.",
    "Other": "A face showing an undefined emotion."
}


import open_clip

# Pretrained CLIP model, used here only as a frozen text encoder.
model, _, preprocess = open_clip.create_model_and_transforms("ViT-B-16", pretrained="openai")
tokenizer = open_clip.get_tokenizer("ViT-B-16")
model.to(device)
model.eval()

emotion_texts = list(emotion_descriptions.values())

# Encode the prompts one at a time, parking each unit-length embedding on the
# CPU until all of them are ready.
text_features = []
with torch.no_grad():
    for text in emotion_texts:
        tokenized = tokenizer([text]).to(device)
        feature = model.encode_text(tokenized)
        feature = feature / feature.norm(dim=-1, keepdim=True)  # L2-normalise
        text_features.append(feature.cpu())

# Stack the per-prompt rows into one matrix and move it to the compute device.
text_features = torch.cat(text_features, dim=0).to(device)

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

class CLIP_E_LSTM(nn.Module):
    """LSTM over a sequence of image features with a per-time-step linear projection.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        output_dim: Size of the projected embedding produced for every time
            step (defaults to 512, the previously hard-coded width).
    """

    def __init__(self, input_dim=512, hidden_dim=512, num_layers=1, output_dim=512):
        super().__init__()

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        # Projects every LSTM output vector into the embedding space.
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, img_features):
        """Map ``(batch, seq_len, input_dim)`` to ``(batch, seq_len, output_dim)``.

        The projection is applied to the LSTM output at every time step
        (sequence-to-sequence); nothing is pooled or truncated to the last step.
        """
        lstm_out, _ = self.lstm(img_features)  # (batch, seq_len, hidden_dim)
        return self.fc(lstm_out)

class ContrastiveLoss(torch.nn.Module):
    """Symmetric contrastive loss between caption and image embeddings with soft targets."""

    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature

    def forward(self, caption_embeddings, image_embeddings):
        """
        Compute the contrastive loss for paired caption/image embeddings.

        Both inputs are L2-normalised along the last dimension first. The
        logits are the temperature-scaled caption-to-image cosine similarities;
        the soft targets are a row-wise softmax over the average of the
        caption-caption and image-image similarity matrices.

        Args:
            caption_embeddings (torch.Tensor): (N, D) tensor with text embeddings.
            image_embeddings (torch.Tensor): (N, D) tensor with image embeddings.

        Returns:
            torch.Tensor: Scalar loss, the mean of the caption-side and
            image-side cross-entropies.
        """
        tau = self.temperature

        img = F.normalize(image_embeddings, p=2, dim=-1)
        cap = F.normalize(caption_embeddings, p=2, dim=-1)

        # Cross-modal similarities, scaled by the temperature.
        logits = torch.matmul(cap, img.T) / tau

        # Within-modality similarities drive the soft targets.
        img_sim = torch.matmul(img, img.T)
        cap_sim = torch.matmul(cap, cap.T)
        targets = F.softmax((cap_sim + img_sim) / (2 * tau), dim=-1)

        # NOTE(review): the image-side term reuses the transposed targets
        # as-is; their rows are not re-normalised after the transpose.
        captions_loss = F.cross_entropy(logits, targets, reduction="mean")
        images_loss = F.cross_entropy(logits.T, targets.T, reduction="mean")

        return (captions_loss + images_loss) / 2

def train_clip_e(model, train_loader, text_features, optimizer, device, epochs=10, eval_loader=None):
    """Train ``model`` with the contrastive loss and evaluate it after every epoch.

    For each batch, the per-frame one-hot labels are converted to class
    indices, the matching rows of ``text_features`` are gathered, and the
    contrastive loss is computed between those text embeddings and the model's
    per-frame outputs (batch and sequence axes flattened together).

    Args:
        model: Module mapping ``(batch, seq_len, feat)`` inputs to per-frame embeddings.
        train_loader: Iterable of ``(features, one_hot_labels)`` batches; labels
            are indexed as ``(batch, seq_len, classes)``.
        text_features: Tensor with one embedding row per class index.
        optimizer: Optimizer over ``model``'s parameters.
        device: Device used for the batches and the loss.
        epochs: Number of passes over ``train_loader``.
        eval_loader: Loader passed to ``evaluate_clip_e`` after each epoch. When
            omitted, the notebook-level ``val_loader`` is used, as before.
    """
    if eval_loader is None:
        # Backward compatibility: earlier callers relied on the global loader.
        eval_loader = val_loader

    model.train()
    contrastive_loss_fn = ContrastiveLoss().to(device)
    # Move the class embeddings once instead of once per batch.
    text_features = text_features.to(device)
    best_f1 = 0.0

    for epoch in range(epochs):
        total_loss = 0.0
        for img_features, labels in train_loader:
            img_features = img_features.to(device)  # (batch, seq_len, feat)
            labels = labels.argmax(dim=2).to(device)  # one-hot -> class indices

            optimizer.zero_grad()
            frame_embeddings = model(img_features)
            frame_text = text_features[labels]  # (batch, seq_len, embed)

            # Flatten batch and sequence so every frame is one contrastive sample.
            loss = contrastive_loss_fn(
                frame_text.reshape(-1, frame_text.size(-1)),
                frame_embeddings.reshape(-1, frame_embeddings.size(-1)),
            )
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_loader):.4f}")
        # Evaluate the model being trained (not a notebook global) and carry
        # the best score forward between epochs.
        _, best_f1 = evaluate_clip_e(model, eval_loader, text_features, device, best_f1=best_f1)

# Inference Function (Sequence-based)
def infer_clip_e(model, img_features, text_features):
    """Predict one class index per frame by nearest text embedding.

    Args:
        model: Module applied to ``img_features`` to obtain per-frame embeddings.
        img_features: Input tensor; the model output's leading axes are
            flattened, so a ``(batch, seq_len, dim)`` output yields
            ``batch * seq_len`` predictions.
        text_features: ``(num_classes, dim)`` tensor of class embeddings.

    Returns:
        1-D tensor with the index of the most similar text embedding for each
        flattened frame.
    """
    with torch.no_grad():
        # Unit-length embeddings make the dot product a cosine similarity.
        frame_embeddings = F.normalize(model(img_features), p=2, dim=-1)
        class_embeddings = F.normalize(text_features, p=2, dim=-1)

        # Merge every leading axis into one "frame" axis.
        flat_frames = frame_embeddings.view(-1, frame_embeddings.size(-1))

        # (frames, classes) similarity matrix.
        similarity = torch.einsum('qd,td->qt', flat_frames, class_embeddings)
        class_probs = F.softmax(similarity, dim=-1)

        return torch.argmax(class_probs, dim=-1)
from sklearn.metrics import classification_report
# Evaluation Function (F1-Score Tracking)
def evaluate_clip_e(model, data_loader, text_features, device, best_f1=0.0,
                    save_path="FIXED_EXPR_CLIP_temp_new.pt", min_f1_to_save=0.33):
    """Score ``model`` on ``data_loader`` and checkpoint it when it is the best so far.

    Per-frame predictions from ``infer_clip_e`` are compared with the argmax of
    the one-hot labels; the per-class and macro F1 scores are printed.

    Args:
        model: Model to evaluate.
        data_loader: Iterable of ``(features, one_hot_labels)`` batches; labels
            are indexed as ``(batch, seq_len, classes)``.
        text_features: Class embeddings forwarded to ``infer_clip_e``.
        device: Device the feature batches are moved to.
        best_f1: Best macro F1 seen so far. The checkpoint is only written when
            the new score beats it.
        save_path: Where the model ``state_dict`` is saved.
        min_f1_to_save: Floor the score must also exceed before anything is
            saved (0.33 keeps the behaviour of callers that never pass
            ``best_f1``).

    Returns:
        ``(macro_f1, best_f1)`` where ``best_f1`` is updated if a checkpoint was
        written.
    """
    was_training = model.training
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for img_features, labels in data_loader:
            img_features = img_features.to(device)
            # One-hot -> class indices, flattened to one entry per frame.
            labels = labels.argmax(dim=2).reshape(-1)

            preds = infer_clip_e(model, img_features, text_features).reshape(-1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    report = classification_report(all_labels, all_preds, output_dict=True)

    # Entries that are not dicts (e.g. the overall accuracy) carry no F1 score.
    for emotion, metrics in report.items():
        if isinstance(metrics, dict):
            print(f"Emotion {emotion}: F1-Score {metrics['f1-score']:.4f}")

    macro_f1 = report["macro avg"]["f1-score"]
    print(f"Overall Macro F1-score: {macro_f1:.4f}")

    # Only overwrite the checkpoint on a genuine improvement over both the
    # caller-supplied best score and the minimum quality floor.
    if macro_f1 > max(best_f1, min_f1_to_save):
        best_f1 = macro_f1
        torch.save(model.state_dict(), save_path)
        print(f"New best model saved with Macro F1-score: {macro_f1:.4f}")

    # Leave the model in whichever mode the caller had it in.
    model.train(was_training)

    return macro_f1, best_f1
# 🔹 Initialize and Train
# Pick the compute device, build the model and its optimizer, then run training.
device = "cuda" if torch.cuda.is_available() else "cpu"

clip_e_model = CLIP_E_LSTM()
clip_e_model = clip_e_model.to(device)

optimizer = optim.Adam(params=clip_e_model.parameters(), lr=1e-5)

train_clip_e(
    model=clip_e_model,
    train_loader=train_loader,
    text_features=text_features,
    optimizer=optimizer,
    device=device,
    epochs=20,
)