<a href="https://colab.research.google.com/github/itsmeeeeeee/MML/blob/main/MMFA_ensemble.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Ensemble Method on Multimodal Fusion with Self-Attention (MMFA) for Multimodal Sentiment Analysis

Group: 5

In [None]:
! pip install torch torchvision
! pip install transformers pandas numpy

In [None]:
from torchvision import models, transforms
from PIL import Image, ImageFile

import os
import numpy as np
import torch
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import train_test_split

#import torch.nn.functional as F

In [None]:
# Mount Google Drive at /content/drive; the label and feature files read below
# live under /content/drive/MyDrive. force_remount=True is passed to the mount call.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Import necessary libraries

import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import train_test_split


**Features were extracted with ResNet (images) and BERT (text) in the Features_extraction notebook:** https://colab.research.google.com/drive/144qkv0HiAqRXuwlfGNi9M-DxNGmBffYV?usp=drive_link

### **Read the data**

In [None]:
import pandas as pd
import numpy as np
import torch

# --- Labels -----------------------------------------------------------------
# The sentiment annotation of every sample is read from the `overall_sentiment` column.
file_path_labels = "/content/drive/MyDrive/MultimodalNLP/projekt/data/labels.csv"
labeled_data = pd.read_csv(file_path_labels)
labels = labeled_data['overall_sentiment'].values

# Binary target: both negative grades map to 0, both positive grades map to 1.
# Rows whose label is not a key of this dict are discarded below.
label_mapping = {
    "very_negative": 0,
    "negative": 0,
    "positive": 1,
    "very_positive": 1
}

# Row positions of the labels covered by the mapping, and those labels themselves.
valid_indices = [
    position
    for position, text_label in enumerate(labels)
    if text_label in label_mapping
]
filtered_labels = [labels[position] for position in valid_indices]

# Text labels -> class ids, stored as an int64 tensor.
numeric_labels = torch.tensor(
    np.array([label_mapping[text_label] for text_label in filtered_labels]),
    dtype=torch.long,
)
print("numeric labels:", numeric_labels.shape)

# --- Pre-extracted features ---------------------------------------------------
# NOTE(review): assumes row i of each .npy file belongs to row i of labels.csv,
# since the same `valid_indices` are used to select rows — confirm against the
# feature-extraction notebook.
file_path_im = "/content/drive/MyDrive/MultimodalNLP/projekt/features_data/image_features_restnet.npy"
image_features = np.load(file_path_im)

file_path_emb = "/content/drive/MyDrive/MultimodalNLP/projekt/features_data/text_features_bert.npy"
text_features = np.load(file_path_emb)

# Keep only the rows with a mapped label and convert them to float32 tensors.
kept_image_rows = image_features[valid_indices]
kept_text_rows = text_features[valid_indices]
filtered_image_features = torch.tensor(kept_image_rows, dtype=torch.float32)
filtered_text_features = torch.tensor(kept_text_rows, dtype=torch.float32)

print("Filtered Image features:", filtered_image_features.shape)
print("Filtered Text features:", filtered_text_features.shape)


numeric labels: torch.Size([4791])
Filtered Image features: torch.Size([4791, 1000])
Filtered Text features: torch.Size([4791, 768])


### **Split the data**

In [None]:
from sklearn.model_selection import train_test_split


# Hold out 20% of the samples as the test set. The three tensors are passed to a
# single call so they are split together, and random_state fixes the split.
(
    train_text_bert,
    test_text_bert,
    train_image_restnet,
    test_image_restnet,
    train_labels,
    test_labels,
) = train_test_split(
    filtered_text_features,
    filtered_image_features,
    numeric_labels,
    test_size=0.2,
    random_state=42,
)



In [None]:
# Number of samples in the training split (displayed as the cell output).
len(train_text_bert)

3832

In [None]:
from torch.utils.data import DataLoader, TensorDataset


In [None]:

# Wrap the tensors in datasets whose items are (text, image, label) triples.
train_dataset_bert_restnet = TensorDataset(
    train_text_bert, train_image_restnet, train_labels
)
test_dataset_bert_restnet = TensorDataset(
    test_text_bert, test_image_restnet, test_labels
)

# The held-out test split is served in a fixed order, 64 samples per batch.
test_loader = DataLoader(
    dataset=test_dataset_bert_restnet, batch_size=64, shuffle=False
)




## **Build the Models**

#### **Self-Attention Block**

In [None]:
class SelfAttention(nn.Module):
    """Scaled dot-product self-attention with learned query/key/value projections.

    Based on the concept explained in:
    https://medium.com/@wangdk93/implement-self-attention-and-cross-attention-in-pytorch-1f1a366c9d4b#d075

    Args:
        feature_dim: Size of the last input dimension; the output has the same size.
        dropout: Kept for interface compatibility. Attention dropout is currently
            disabled, so this value is not used.

    Supported inputs:
        * ``(batch, feature_dim)``: the vector is treated as a sequence of length 1.
          The softmax then runs over a single score and is always 1, so the result
          equals ``self.value(x)``; the query/key projections do not influence it.
        * ``(batch, seq_len, feature_dim)``: attention is computed across ``seq_len``
          and the output keeps the input shape.
    """

    def __init__(self, feature_dim, dropout=0.1):
        super(SelfAttention, self).__init__()

        # Scaling factor to normalize the dot products
        self.scale = 1.0 / (feature_dim ** 0.5)

        # Linear transformations for the query, key, and value vectors
        self.query = nn.Linear(feature_dim, feature_dim)
        self.key = nn.Linear(feature_dim, feature_dim)
        self.value = nn.Linear(feature_dim, feature_dim)

    def forward(self, x):
        """Apply self-attention to ``x`` and return a tensor of the same shape.

        Raises:
            ValueError: If ``x`` is neither 2-D nor 3-D.
        """
        # A 2-D input gets a temporary length-1 sequence axis that is removed again
        # before returning; a 3-D input is used as is.
        single_vector = x.dim() == 2
        if single_vector:
            x = x.unsqueeze(1)
        elif x.dim() != 3:
            raise ValueError(
                f"SelfAttention expects a 2-D or 3-D input, got {x.dim()} dimensions"
            )

        # Generate query, key, value tensors
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)

        # Calculate the attention scores and apply softmax over the key axis
        scores = torch.bmm(queries, keys.transpose(1, 2)) * self.scale
        attention_weights = F.softmax(scores, dim=-1)

        # Multiply the attention weights by the values
        weighted = torch.bmm(attention_weights, values)
        return weighted.squeeze(1) if single_vector else weighted

### **LSTM**

In [None]:

class LSTMClassifier(nn.Module):
    """Bidirectional, batch-first LSTM followed by a linear classification layer.

    Args:
        input_dim: Feature size of every timestep.
        hidden_dim: Hidden size of the LSTM per direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of logits produced per sample.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated in the LSTM output, hence 2 * hidden_dim.
        self.fc = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, x):
        """Return the logits computed from the last timestep of the LSTM output."""
        direction_count = 2 if self.lstm.bidirectional else 1
        # State layout: (num_layers * num_directions, batch_size, hidden_size)
        state_shape = (
            self.lstm.num_layers * direction_count,
            x.size(0),
            self.lstm.hidden_size,
        )
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence_output, _ = self.lstm(x, (initial_hidden, initial_cell))
        last_timestep = sequence_output[:, -1, :]
        return self.fc(last_timestep)

class MultimodalFusionLSTMClassifier(nn.Module):
    """Fuse text and image feature vectors and classify them with an LSTM head.

    Each modality is projected to ``hidden_dim`` by a linear layer, passed through
    its own SelfAttention block, and the two results are concatenated. The fused
    vector is fed to a two-layer LSTMClassifier as a sequence of length 1.

    Args:
        text_dim: Size of the incoming text feature vector.
        image_dim: Size of the incoming image feature vector.
        hidden_dim: Size of each modality after projection.
        lstm_hidden_dim: Hidden size of the LSTM head.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, image_dim, hidden_dim, lstm_hidden_dim, num_classes):
        super().__init__()
        # Per-modality projections into a space of size hidden_dim.
        self.text_model = nn.Linear(text_dim, hidden_dim)
        self.image_model = nn.Linear(image_dim, hidden_dim)

        # One self-attention block per modality.
        self.text_attention = SelfAttention(hidden_dim)
        self.image_attention = SelfAttention(hidden_dim)

        # The head receives the concatenation of both modalities (2 * hidden_dim).
        self.classifier = LSTMClassifier(
            hidden_dim * 2,
            lstm_hidden_dim,
            num_layers=2,
            num_classes=num_classes,
        )

    def forward(self, text_features, image_features):
        """Return the class logits for a batch of text and image features."""
        attended_text = self.text_attention(self.text_model(text_features))
        attended_image = self.image_attention(self.image_model(image_features))

        # Concatenate along the feature axis, then add a sequence axis of length 1
        # because the LSTM head is batch-first and expects (batch, seq, features).
        fused = torch.cat([attended_text, attended_image], dim=1)
        return self.classifier(fused.unsqueeze(1))

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold

def evaluate_model(model, loader, device):
    """Return the mean per-batch cross-entropy loss of `model` over `loader`.

    The model is switched to eval mode and gradients are disabled. Every batch of
    `loader` must unpack into (text_features, image_features, labels); all three
    are moved to `device`. The result is the sum of the batch losses divided by
    ``len(loader)``.
    """
    criterion = nn.CrossEntropyLoss()
    model.eval()
    loss_sum = 0
    with torch.no_grad():
        for text_batch, image_batch, target_batch in loader:
            text_batch = text_batch.to(device)
            image_batch = image_batch.to(device)
            target_batch = target_batch.to(device)
            logits = model(text_batch, image_batch)
            loss_sum += criterion(logits, target_batch).item()
    return loss_sum / len(loader)

def train_and_save_best_models_lstm(train_dataset, num_folds=5, num_epochs=10):
    """Train one MultimodalFusionLSTMClassifier per K-fold split and keep its best epoch.

    For every fold a fresh model is trained with Adam (lr 0.001, batch size 64).
    After each epoch the model is scored on the held-out fold with
    `evaluate_model`; whenever the validation loss improves, a deep copy of the
    model is taken as the fold's best snapshot. When the fold is finished the
    snapshot is appended to the result and written to
    ``best_model_fold_{fold}.pth``.

    Args:
        train_dataset: Dataset whose items are (text_features, image_features, label).
        num_folds: Number of KFold splits.
        num_epochs: Number of training epochs per fold.

    Returns:
        list: The best model of each fold, in fold order. A fold contributes no
        entry if no epoch produced a validation loss below infinity
        (e.g. ``num_epochs == 0`` or a NaN loss).
    """
    import copy  # function-scope import: this cell does not import `copy`

    # Resolve the device here instead of relying on a global defined after this function.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    kf = KFold(n_splits=num_folds)
    best_models = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)
        train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
        # Validation data is only scored, so it is read in a fixed order.
        dev_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

        model = MultimodalFusionLSTMClassifier(text_dim=768, image_dim=1000, hidden_dim=128, lstm_hidden_dim=256, num_classes=2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        best_val_loss = float('inf')
        best_model_state = None

        for epoch in range(num_epochs):
            model.train()
            train_total_loss = 0
            for text_features, image_features, labels in train_loader:
                text_features, image_features, labels = text_features.to(device), image_features.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(text_features, image_features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_total_loss += loss.item()

            # Calculate average training loss for the epoch
            avg_train_loss = train_total_loss / len(train_loader)

            # Evaluate on the development set
            dev_loss = evaluate_model(model, dev_loader, device)
            if dev_loss < best_val_loss:
                best_val_loss = dev_loss
                # Snapshot the model: `model` itself keeps changing in later epochs,
                # so storing a plain reference would always end up as the last epoch.
                best_model_state = copy.deepcopy(model)

            print(f"Fold {fold}, Epoch {epoch + 1}: Training Loss: {avg_train_loss:.4f}, Validation Loss: {dev_loss:.4f}")

        # Record and save exactly one model per fold, after its last epoch.
        # NOTE(review): the other training functions in this notebook write to the
        # same file names, so a later run overwrites these checkpoints.
        if best_model_state is not None:
            best_models.append(best_model_state)
            torch.save(best_model_state, f'best_model_fold_{fold}.pth')

    return best_models


# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the LSTM variant with 5 folds and 10 epochs per fold.
best_models_lstm = train_and_save_best_models_lstm(
    train_dataset_bert_restnet,
    num_folds=5,
    num_epochs=10,
)


Fold 0, Epoch 1: Training Loss: 0.4125, Validation Loss: 0.4080
Fold 0, Epoch 2: Training Loss: 0.3984, Validation Loss: 0.4087
Fold 0, Epoch 3: Training Loss: 0.3976, Validation Loss: 0.4142
Fold 0, Epoch 4: Training Loss: 0.3965, Validation Loss: 0.4171
Fold 0, Epoch 5: Training Loss: 0.3947, Validation Loss: 0.4442
Fold 0, Epoch 6: Training Loss: 0.3954, Validation Loss: 0.4281
Fold 0, Epoch 7: Training Loss: 0.3968, Validation Loss: 0.4146
Fold 0, Epoch 8: Training Loss: 0.3988, Validation Loss: 0.4180
Fold 0, Epoch 9: Training Loss: 0.3934, Validation Loss: 0.4162
Fold 0, Epoch 10: Training Loss: 0.3926, Validation Loss: 0.4162
Fold 1, Epoch 1: Training Loss: 0.4213, Validation Loss: 0.3822
Fold 1, Epoch 2: Training Loss: 0.4056, Validation Loss: 0.3831
Fold 1, Epoch 3: Training Loss: 0.4039, Validation Loss: 0.3860
Fold 1, Epoch 4: Training Loss: 0.4031, Validation Loss: 0.3822
Fold 1, Epoch 5: Training Loss: 0.4025, Validation Loss: 0.3842
Fold 1, Epoch 6: Training Loss: 0.4046,

### **RNN**

In [None]:

class Classifier_RNN(nn.Module):
    """Bidirectional, batch-first RNN followed by a linear classification layer.

    Args:
        input_dim: Feature size of every timestep.
        hidden_dim: Hidden size of the RNN per direction.
        num_layers: Number of stacked RNN layers.
        num_classes: Number of logits produced per sample.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes):
        super().__init__()
        self.rnn = nn.RNN(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # The bidirectional output concatenates both directions: 2 * hidden_dim.
        self.fc = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, x):
        """Return the logits computed from the last timestep of the RNN output."""
        direction_count = 2 if self.rnn.bidirectional else 1
        # State layout: (num_layers * num_directions, batch_size, hidden_size)
        initial_hidden = torch.zeros(
            self.rnn.num_layers * direction_count,
            x.size(0),
            self.rnn.hidden_size,
            device=x.device,
        )

        sequence_output, _ = self.rnn(x, initial_hidden)
        last_timestep = sequence_output[:, -1, :]
        return self.fc(last_timestep)

class MultimodalFusionRNNClassifier(nn.Module):
    """Fuse text and image feature vectors and classify them with an RNN head.

    Each modality is projected to ``hidden_dim`` by a linear layer, passed through
    its own SelfAttention block, and the two results are concatenated. The fused
    vector is fed to a two-layer Classifier_RNN as a sequence of length 1.

    Args:
        text_dim: Size of the incoming text feature vector.
        image_dim: Size of the incoming image feature vector.
        hidden_dim: Size of each modality after projection.
        rnn_hidden_dim: Hidden size of the RNN head.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, image_dim, hidden_dim, rnn_hidden_dim, num_classes):
        super(MultimodalFusionRNNClassifier, self).__init__()
        self.text_model = nn.Linear(text_dim, hidden_dim)
        self.image_model = nn.Linear(image_dim, hidden_dim)

        # Self-attention module for refining text and image features by focusing on important elements.
        self.text_attention = SelfAttention(hidden_dim)
        self.image_attention = SelfAttention(hidden_dim)

        # RNN classifier on the concatenated features (2 * hidden_dim). Its hidden
        # size is taken from rnn_hidden_dim, which was previously accepted but ignored.
        self.classifier = Classifier_RNN(hidden_dim * 2, rnn_hidden_dim, num_layers=2, num_classes=num_classes)

    def forward(self, text_features, image_features):
        """Return the class logits for a batch of text and image features."""
        # Project both modalities. squeeze(1) removes axis 1 of the image features
        # when it has size 1 and leaves the tensor unchanged otherwise.
        text_features = self.text_model(text_features)
        image_features = self.image_model(image_features.squeeze(1))

        # Apply self-attention to each modality
        text_features = self.text_attention(text_features)
        image_features = self.image_attention(image_features)

        # Combine text and image features and add a sequence axis of length 1,
        # because the RNN head is batch-first and expects (batch, seq, features).
        combined_features = torch.cat([text_features, image_features], dim=1)
        combined_features = combined_features.unsqueeze(1)

        # Classification
        output = self.classifier(combined_features)
        return output


In [None]:
import copy
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold

def evaluate_model(model, loader, device):
    """Score `model` on `loader` and return the average cross-entropy loss per batch.

    Runs in eval mode without gradient tracking. Each batch must consist of
    exactly three tensors (text features, image features, labels), which are
    moved to `device` before the forward pass.
    """
    model.eval()
    loss_fn = nn.CrossEntropyLoss()
    accumulated = 0
    with torch.no_grad():
        for batch in loader:
            text_in, image_in, targets = (tensor.to(device) for tensor in batch)
            accumulated = accumulated + loss_fn(model(text_in, image_in), targets).item()
    return accumulated / len(loader)

def train_and_save_best_models_rnn(train_dataset, num_folds=5, num_epochs=10):
    """Train one MultimodalFusionRNNClassifier per K-fold split and keep its best epoch.

    For every fold a fresh model is trained with Adam (lr 0.001, batch size 64).
    After each epoch the model is scored on the held-out fold with
    `evaluate_model`; whenever the validation loss improves, a deep copy of the
    model becomes the fold's best snapshot. When the fold is finished the snapshot
    is appended to the result and written to ``best_model_fold_{fold}.pth``.

    Args:
        train_dataset: Dataset whose items are (text_features, image_features, label).
        num_folds: Number of KFold splits.
        num_epochs: Number of training epochs per fold.

    Returns:
        list: The best model of each fold, in fold order. A fold contributes no
        entry if no epoch produced a validation loss below infinity
        (e.g. ``num_epochs == 0`` or a NaN loss).
    """
    # Resolve the device here instead of relying on a global defined after this function.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    kf = KFold(n_splits=num_folds)
    best_models = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)
        train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
        # Validation data is only scored, so it is read in a fixed order.
        dev_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

        model = MultimodalFusionRNNClassifier(text_dim=768, image_dim=1000, hidden_dim=128, rnn_hidden_dim=128, num_classes=2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        best_val_loss = float('inf')
        best_model_state = None

        for epoch in range(num_epochs):
            model.train()
            train_total_loss = 0
            for text_features, image_features, labels in train_loader:
                text_features, image_features, labels = text_features.to(device), image_features.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(text_features, image_features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_total_loss += loss.item()

            # Calculate average training loss for the epoch
            avg_train_loss = train_total_loss / len(train_loader)

            # Evaluate on the development set and snapshot the model on improvement
            dev_loss = evaluate_model(model, dev_loader, device)
            if dev_loss < best_val_loss:
                best_val_loss = dev_loss
                best_model_state = copy.deepcopy(model)

            print(f"Fold {fold}, Epoch {epoch + 1}: Training Loss: {avg_train_loss:.4f}, Validation Loss: {dev_loss:.4f}")

        # Record and save exactly one model per fold, after its last epoch
        # (doing this inside the epoch loop produced one list entry per epoch).
        # NOTE(review): the other training functions in this notebook write to the
        # same file names, so a later run overwrites these checkpoints.
        if best_model_state is not None:
            best_models.append(best_model_state)
            torch.save(best_model_state, f'best_model_fold_{fold}.pth')

    return best_models


# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the RNN variant with 5 folds and 10 epochs per fold.
best_models_rrn = train_and_save_best_models_rnn(
    train_dataset_bert_restnet,
    num_folds=5,
    num_epochs=10,
)

Fold 0, Epoch 1: Training Loss: 0.4212, Validation Loss: 0.4291
Fold 0, Epoch 2: Training Loss: 0.3990, Validation Loss: 0.4122
Fold 0, Epoch 3: Training Loss: 0.3984, Validation Loss: 0.4099
Fold 0, Epoch 4: Training Loss: 0.3972, Validation Loss: 0.4195
Fold 0, Epoch 5: Training Loss: 0.4023, Validation Loss: 0.4145
Fold 0, Epoch 6: Training Loss: 0.4012, Validation Loss: 0.4195
Fold 0, Epoch 7: Training Loss: 0.3969, Validation Loss: 0.4215
Fold 0, Epoch 8: Training Loss: 0.3979, Validation Loss: 0.4114
Fold 0, Epoch 9: Training Loss: 0.3968, Validation Loss: 0.4145
Fold 0, Epoch 10: Training Loss: 0.3967, Validation Loss: 0.4173
Fold 1, Epoch 1: Training Loss: 0.4253, Validation Loss: 0.3837
Fold 1, Epoch 2: Training Loss: 0.4041, Validation Loss: 0.3909
Fold 1, Epoch 3: Training Loss: 0.4112, Validation Loss: 0.3863
Fold 1, Epoch 4: Training Loss: 0.4069, Validation Loss: 0.3914
Fold 1, Epoch 5: Training Loss: 0.4030, Validation Loss: 0.3842
Fold 1, Epoch 6: Training Loss: 0.3984,

### **CNN**

In [None]:
class CNNClassifier(nn.Module):
    """Two Conv1d -> ReLU -> MaxPool1d stages followed by a linear layer.

    Args:
        input_channels: Number of channels of the input signal.
        num_channels: Output channels of the first convolution; the second
            convolution produces twice as many.
        num_classes: Number of logits produced per sample.
        hidden_dim: Length of the input signal, used to size the linear layer.
    """

    def __init__(self, input_channels, num_channels, num_classes, hidden_dim):
        super().__init__()
        kernel_size, padding, stride = 3, 1, 1
        pool_kernel_size, pool_stride = 2, 2

        def length_after_stage(length):
            """Signal length after one convolution followed by one pooling step."""
            after_conv = (length + 2 * padding - (kernel_size - 1) - 1) // stride + 1
            return after_conv // pool_stride

        # The network applies the conv+pool stage twice.
        final_size = length_after_stage(length_after_stage(hidden_dim))

        self.conv1 = nn.Conv1d(input_channels, num_channels, kernel_size, padding=padding)
        self.conv2 = nn.Conv1d(num_channels, num_channels * 2, kernel_size, padding=padding)
        self.pool = nn.MaxPool1d(pool_kernel_size, stride=pool_stride)

        # Flattened size entering the linear layer: channels * remaining length.
        self.fc_input_size = num_channels * 2 * final_size
        self.fc = nn.Linear(self.fc_input_size, num_classes)

    def forward(self, x):
        """Return the logits for an input of shape (batch, input_channels, length)."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        return self.fc(torch.flatten(x, 1))


class MultimodalFusionCNNClassifier(nn.Module):
    """Fuse text and image feature vectors and classify them with a CNN head.

    Each modality is projected to ``hidden_dim`` and passed through its own
    SelfAttention block. The two vectors are then arranged as a two-channel
    signal of length ``hidden_dim`` (text in channel 0, image in channel 1)
    and handed to a CNNClassifier.

    Args:
        text_dim: Size of the incoming text feature vector.
        image_dim: Size of the incoming image feature vector.
        hidden_dim: Size of each modality after projection.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, image_dim, hidden_dim, num_classes):
        super().__init__()
        self.text_model = nn.Linear(text_dim, hidden_dim)
        self.image_model = nn.Linear(image_dim, hidden_dim)
        self.text_attention = SelfAttention(hidden_dim)
        self.image_attention = SelfAttention(hidden_dim)
        # Two input channels (one per modality), each of length hidden_dim.
        self.classifier = CNNClassifier(
            input_channels=2,
            num_channels=hidden_dim * 2,
            num_classes=num_classes,
            hidden_dim=hidden_dim,
        )
        # Stored because forward() needs it to reshape the fused features.
        self.hidden_dim = hidden_dim

    def forward(self, text_features, image_features):
        """Return the class logits for a batch of text and image features."""
        attended_text = self.text_attention(self.text_model(text_features))
        attended_image = self.image_attention(self.image_model(image_features))
        fused = torch.cat([attended_text, attended_image], dim=1)
        # (batch, 2 * hidden_dim) -> (batch, 2, hidden_dim)
        two_channel = fused.view(-1, 2, self.hidden_dim)
        return self.classifier(two_channel)




In [None]:
import copy
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold

def evaluate_model(model, loader, device):
    """Return the cross-entropy loss of `model` averaged over the batches of `loader`.

    The model is set to eval mode and no gradients are recorded. Batches must
    unpack into (text_features, image_features, labels); each tensor is moved
    to `device` before use.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()

    def batch_loss(text_features, image_features, labels):
        """Loss of a single batch as a Python float."""
        predictions = model(text_features.to(device), image_features.to(device))
        return criterion(predictions, labels.to(device)).item()

    running_loss = 0
    with torch.no_grad():
        for text_features, image_features, labels in loader:
            running_loss += batch_loss(text_features, image_features, labels)
    return running_loss / len(loader)

def train_and_save_best_models_cnn(train_dataset, num_folds=5, num_epochs=10):
    """Train one MultimodalFusionCNNClassifier per K-fold split and keep its best epoch.

    For every fold a fresh model is trained with Adam (lr 0.001, batch size 64).
    After each epoch the model is scored on the held-out fold with
    `evaluate_model`; whenever the validation loss improves, a deep copy of the
    model becomes the fold's best snapshot. When the fold is finished the snapshot
    is appended to the result, written to ``best_model_fold_{fold}.pth`` and the
    fold's best validation loss is printed.

    Args:
        train_dataset: Dataset whose items are (text_features, image_features, label).
        num_folds: Number of KFold splits.
        num_epochs: Number of training epochs per fold.

    Returns:
        list: The best model of each fold, in fold order. A fold contributes no
        entry if no epoch produced a validation loss below infinity
        (e.g. ``num_epochs == 0`` or a NaN loss).
    """
    # Resolve the device here instead of relying on a global defined after this function.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    kf = KFold(n_splits=num_folds)
    best_models = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)
        train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
        # Validation data is only scored, so it is read in a fixed order.
        dev_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

        model = MultimodalFusionCNNClassifier(text_dim=768, image_dim=1000, hidden_dim=128, num_classes=2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        best_val_loss = float('inf')
        best_model_state = None

        for epoch in range(num_epochs):
            model.train()
            total_train_loss = 0
            for text_features, image_features, labels in train_loader:
                text_features, image_features, labels = text_features.to(device), image_features.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(text_features, image_features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            # Calculate average training loss for the epoch
            avg_train_loss = total_train_loss / len(train_loader)

            # Evaluate on the development set and snapshot the model on improvement
            dev_loss = evaluate_model(model, dev_loader, device)
            if dev_loss < best_val_loss:
                best_val_loss = dev_loss
                best_model_state = copy.deepcopy(model)

            print(f"Fold {fold}, Epoch {epoch + 1}: Training Loss: {avg_train_loss:.4f}, Validation Loss: {dev_loss:.4f}")

        # Record and save exactly one model per fold, after its last epoch
        # (doing this inside the epoch loop produced one list entry per epoch).
        # NOTE(review): the other training functions in this notebook write to the
        # same file names, so a later run overwrites these checkpoints.
        if best_model_state is not None:
            best_models.append(best_model_state)
            torch.save(best_model_state, f'best_model_fold_{fold}.pth')
        print(f"Fold {fold}: Best Validation Loss: {best_val_loss:.4f}")

    return best_models


# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the CNN variant with 5 folds and 10 epochs per fold.
best_models_crn = train_and_save_best_models_cnn(
    train_dataset_bert_restnet,
    num_folds=5,
    num_epochs=10,
)

Fold 0, Epoch 1: Training Loss: 0.4492, Validation Loss: 0.4279
Fold 0: Best Validation Loss: 0.4279
Fold 0, Epoch 2: Training Loss: 0.3979, Validation Loss: 0.4096
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 3: Training Loss: 0.4041, Validation Loss: 0.4172
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 4: Training Loss: 0.3939, Validation Loss: 0.4159
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 5: Training Loss: 0.3927, Validation Loss: 0.4175
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 6: Training Loss: 0.3981, Validation Loss: 0.4240
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 7: Training Loss: 0.3794, Validation Loss: 0.4178
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 8: Training Loss: 0.3725, Validation Loss: 0.4382
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 9: Training Loss: 0.3732, Validation Loss: 0.4223
Fold 0: Best Validation Loss: 0.4096
Fold 0, Epoch 10: Training Loss: 0.3741, Validation Loss: 0.4363
Fold 0: Best Validation Lo

### **Simple model with softmax**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleFusionClassifier(nn.Module):
    """Fuse text and image feature vectors and classify them with one linear layer.

    Each modality is projected to ``hidden_dim`` and passed through its own
    SelfAttention block; the concatenated result is mapped to class logits.

    Args:
        text_dim: Size of the incoming text feature vector.
        image_dim: Size of the incoming image feature vector.
        hidden_dim: Size of each modality after projection.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, image_dim, hidden_dim, num_classes):
        super().__init__()
        self.text_model = nn.Linear(text_dim, hidden_dim)
        self.image_model = nn.Linear(image_dim, hidden_dim)
        self.text_attention = SelfAttention(hidden_dim)
        self.image_attention = SelfAttention(hidden_dim)
        # Input is the concatenation of both modalities: 2 * hidden_dim.
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, text_features, image_features):
        """Return the class logits for a batch of text and image features."""
        # Process the text features
        attended_text = self.text_attention(self.text_model(text_features))

        # Process the image features
        attended_image = self.image_attention(self.image_model(image_features))

        # Combine the features and classify
        fused = torch.cat([attended_text, attended_image], dim=1)
        return self.fc(fused)


In [None]:

def train_and_evaluate(train_dataset, num_folds=5, num_epochs=10):
    """Train one SimpleFusionClassifier per K-fold split and keep its best epoch.

    For every fold a fresh model is trained with Adam (lr 0.001, batch size 64).
    After each epoch the model is scored on the held-out fold with
    `evaluate_model`; whenever the validation loss improves, a deep copy of the
    model becomes the fold's best snapshot. When the fold is finished the snapshot
    is appended to the result and written to ``best_model_fold_{fold}.pth``.

    Args:
        train_dataset: Dataset whose items are (text_features, image_features, label).
        num_folds: Number of KFold splits.
        num_epochs: Number of training epochs per fold.

    Returns:
        list: The best model of each fold, in fold order. A fold contributes no
        entry if no epoch produced a validation loss below infinity
        (e.g. ``num_epochs == 0`` or a NaN loss).
    """
    import copy  # function-scope import: this cell does not import `copy`

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    kf = KFold(n_splits=num_folds)
    best_models = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)
        train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
        # Validation data is only scored, so it is read in a fixed order.
        dev_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

        model = SimpleFusionClassifier(text_dim=768, image_dim=1000, hidden_dim=128, num_classes=2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        best_val_loss = float('inf')
        # Initialised per fold so a fold can never pick up the previous fold's model.
        best_model_state = None

        for epoch in range(num_epochs):
            model.train()
            total_train_loss = 0
            for text_features, image_features, labels in train_loader:
                text_features, image_features, labels = text_features.to(device), image_features.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(text_features, image_features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            avg_train_loss = total_train_loss / len(train_loader)

            dev_loss = evaluate_model(model, dev_loader, device)
            if dev_loss < best_val_loss:
                best_val_loss = dev_loss
                # Snapshot the model: `model` itself keeps changing in later epochs,
                # so storing a plain reference would always end up as the last epoch.
                best_model_state = copy.deepcopy(model)
            print(f"Fold {fold}, Epoch {epoch + 1}: Training Loss: {avg_train_loss:.4f}, Validation Loss: {dev_loss:.4f}")

        # Record and save exactly one model per fold, after its last epoch.
        # NOTE(review): the other training functions in this notebook write to the
        # same file names, so a later run overwrites these checkpoints.
        if best_model_state is not None:
            best_models.append(best_model_state)
            torch.save(best_model_state, f'best_model_fold_{fold}.pth')

    return best_models

def evaluate_model(model, loader, device):
    """Return the mean per-batch cross-entropy loss of `model` on `loader`.

    The model is put into eval mode and evaluated without gradient tracking.
    Each batch must unpack into (text_features, image_features, labels); the
    tensors are moved to `device` before the forward pass.
    """
    model.eval()
    loss_function = nn.CrossEntropyLoss()
    summed_loss = 0
    with torch.no_grad():
        for text_features, image_features, labels in loader:
            inputs = (text_features.to(device), image_features.to(device))
            targets = labels.to(device)
            summed_loss += loss_function(model(*inputs), targets).item()
    return summed_loss / len(loader)


# Train the simple fusion model with 5 folds and 10 epochs per fold.
simpel_model = train_and_evaluate(
    train_dataset_bert_restnet,
    num_folds=5,
    num_epochs=10,
)






Fold 0, Epoch 1: Training Loss: 0.4799, Validation Loss: 0.4119
Fold 0, Epoch 2: Training Loss: 0.4008, Validation Loss: 0.4159
Fold 0, Epoch 3: Training Loss: 0.4000, Validation Loss: 0.4281
Fold 0, Epoch 4: Training Loss: 0.3957, Validation Loss: 0.4200
Fold 0, Epoch 5: Training Loss: 0.3981, Validation Loss: 0.4213
Fold 0, Epoch 6: Training Loss: 0.3899, Validation Loss: 0.4175
Fold 0, Epoch 7: Training Loss: 0.3877, Validation Loss: 0.4213
Fold 0, Epoch 8: Training Loss: 0.3860, Validation Loss: 0.4456
Fold 0, Epoch 9: Training Loss: 0.4083, Validation Loss: 0.4115
Fold 0, Epoch 10: Training Loss: 0.3822, Validation Loss: 0.4232
Fold 1, Epoch 1: Training Loss: 0.4440, Validation Loss: 0.3887
Fold 1, Epoch 2: Training Loss: 0.4077, Validation Loss: 0.4047
Fold 1, Epoch 3: Training Loss: 0.3984, Validation Loss: 0.3973
Fold 1, Epoch 4: Training Loss: 0.3999, Validation Loss: 0.4458
Fold 1, Epoch 5: Training Loss: 0.4071, Validation Loss: 0.3883
Fold 1, Epoch 6: Training Loss: 0.3967,

### **Multilayer Perceptron**


In [None]:
class MLPClassifier(nn.Module):
    """Fuse text and image feature vectors and classify them with a three-layer MLP.

    Each modality is projected to ``hidden_dim`` and passed through its own
    SelfAttention block. The concatenated features go through two ReLU-activated
    linear layers and a final linear layer that produces the class logits.

    Args:
        text_dim: Size of the incoming text feature vector.
        image_dim: Size of the incoming image feature vector.
        hidden_dim: Size of each modality after projection.
        num_classes: Number of output logits.
    """

    def __init__(self, text_dim, image_dim, hidden_dim, num_classes):
        super().__init__()
        self.text_model = nn.Linear(text_dim, hidden_dim)
        self.image_model = nn.Linear(image_dim, hidden_dim)
        self.text_attention = SelfAttention(hidden_dim)
        self.image_attention = SelfAttention(hidden_dim)
        # MLP head: 2*hidden_dim -> 4*hidden_dim -> 2*hidden_dim -> num_classes
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim * 4)  # first MLP layer
        self.fc2 = nn.Linear(hidden_dim * 4, hidden_dim * 2)  # second MLP layer
        self.fc3 = nn.Linear(hidden_dim * 2, num_classes)     # output layer

    def forward(self, text_features, image_features):
        """Return the class logits for a batch of text and image features."""
        text_branch = self.text_attention(self.text_model(text_features))
        image_branch = self.image_attention(self.image_model(image_features))
        hidden = torch.cat([text_branch, image_branch], dim=1)

        # The two hidden layers use ReLU; the output layer returns raw logits.
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden)

In [None]:


def train_and_evaluate(train_dataset, num_folds=5, num_epochs=10):
    """Train one MLPClassifier per K-fold split and keep its best epoch.

    For every fold a fresh model is trained with Adam (lr 0.001, batch size 64).
    After each epoch the model is scored on the held-out fold with
    `evaluate_model`; whenever the validation loss improves, a deep copy of the
    model becomes the fold's best snapshot. When the fold is finished the snapshot
    is appended to the result and written to ``best_model_fold_{fold}.pth``.

    Args:
        train_dataset: Dataset whose items are (text_features, image_features, label).
        num_folds: Number of KFold splits.
        num_epochs: Number of training epochs per fold.

    Returns:
        list: The best model of each fold, in fold order. A fold contributes no
        entry if no epoch produced a validation loss below infinity
        (e.g. ``num_epochs == 0`` or a NaN loss).
    """
    import copy  # function-scope import: this cell does not import `copy`

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    kf = KFold(n_splits=num_folds)
    best_models = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
        train_subset = Subset(train_dataset, train_idx)
        val_subset = Subset(train_dataset, val_idx)
        train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
        # Validation data is only scored, so it is read in a fixed order.
        dev_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

        model = MLPClassifier(text_dim=768, image_dim=1000, hidden_dim=128, num_classes=2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        best_val_loss = float('inf')
        # Initialised per fold so a fold can never pick up the previous fold's model.
        best_model_state = None

        for epoch in range(num_epochs):
            model.train()
            total_train_loss = 0
            for text_features, image_features, labels in train_loader:
                text_features, image_features, labels = text_features.to(device), image_features.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(text_features, image_features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            avg_train_loss = total_train_loss / len(train_loader)

            # Evaluate on the development set and snapshot the model on improvement
            dev_loss = evaluate_model(model, dev_loader, device)
            if dev_loss < best_val_loss:
                best_val_loss = dev_loss
                best_model_state = copy.deepcopy(model)

            print(f"Fold {fold}, Epoch {epoch + 1}: Training Loss: {avg_train_loss:.4f}, Validation Loss: {dev_loss:.4f}")

        # Record and save exactly one model per fold, after its last epoch
        # (doing this inside the epoch loop produced one list entry per epoch).
        # NOTE(review): the other training functions in this notebook write to the
        # same file names, so a later run overwrites these checkpoints.
        if best_model_state is not None:
            best_models.append(best_model_state)
            torch.save(best_model_state, f'best_model_fold_{fold}.pth')

    return best_models

def evaluate_model(model, loader, device):
    """Return the mean per-batch cross-entropy loss of ``model`` over ``loader``.

    The model is switched to evaluation mode and gradients are disabled.
    Each batch is a ``(text_features, image_features, labels)`` triple whose
    tensors are moved to ``device`` before the forward pass. The result is
    the sum of the batch losses divided by the number of batches.
    """
    loss_fn = nn.CrossEntropyLoss()
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for batch in loader:
            text_features, image_features, labels = (tensor.to(device) for tensor in batch)
            logits = model(text_features, image_features)
            batch_losses.append(loss_fn(logits, labels).item())

    return sum(batch_losses) / len(loader)


# Run the 5-fold / 10-epoch training of the MLP fusion classifier on the
# BERT + ResNet feature dataset. The returned list of models is referenced
# again in the ensemble evaluation cell.
best_models_mlp=train_and_evaluate(train_dataset_bert_restnet,num_folds=5, num_epochs=10)

Fold 0, Epoch 1: Training Loss: 0.4293, Validation Loss: 0.4218
Fold 0, Epoch 2: Training Loss: 0.4009, Validation Loss: 0.4291
Fold 0, Epoch 3: Training Loss: 0.4012, Validation Loss: 0.4205
Fold 0, Epoch 4: Training Loss: 0.3985, Validation Loss: 0.4187
Fold 0, Epoch 5: Training Loss: 0.3923, Validation Loss: 0.4394
Fold 0, Epoch 6: Training Loss: 0.3849, Validation Loss: 0.4211
Fold 0, Epoch 7: Training Loss: 0.3890, Validation Loss: 0.4323
Fold 0, Epoch 8: Training Loss: 0.3774, Validation Loss: 0.4188
Fold 0, Epoch 9: Training Loss: 0.3793, Validation Loss: 0.4230
Fold 0, Epoch 10: Training Loss: 0.3814, Validation Loss: 0.4312
Fold 1, Epoch 1: Training Loss: 0.4255, Validation Loss: 0.3904
Fold 1, Epoch 2: Training Loss: 0.4090, Validation Loss: 0.3857
Fold 1, Epoch 3: Training Loss: 0.4095, Validation Loss: 0.3889
Fold 1, Epoch 4: Training Loss: 0.3986, Validation Loss: 0.3931
Fold 1, Epoch 5: Training Loss: 0.4004, Validation Loss: 0.3989
Fold 1, Epoch 6: Training Loss: 0.3875,

## Logits and Majority voting

In [None]:


import torch

def get_logits_from_models(models, test_loader):
    """Collect the logits of every model in ``models`` over ``test_loader``.

    Each model is put into evaluation mode and run, without gradient
    tracking, on every ``(text_feature, image_feature, label)`` batch of the
    loader; the label is ignored. Inputs are moved to the device of the
    model's parameters before the forward pass, and the logits are moved
    back to the device the batch came from, so models that live on the GPU
    can be evaluated on CPU batches.

    Args:
        models: Iterable of models callable as ``model(text, image)``.
        test_loader: Iterable of ``(text_feature, image_feature, label)``
            batches. It is iterated once per model, so it must yield the
            samples in the same order each time for the per-model tensors
            to be row-aligned.

    Returns:
        A list with one tensor per model, formed by concatenating that
        model's batch logits along dimension 0.
    """
    all_model_logits = []  # one concatenated logits tensor per model

    for model in models:
        model.eval()  # evaluation mode (affects dropout / batch-norm layers)

        # Device the model lives on; None for a model without parameters,
        # in which case the inputs are passed through unchanged.
        first_param = next(model.parameters(), None)
        model_device = first_param.device if first_param is not None else None

        model_logits = []  # batch logits of the current model

        with torch.no_grad():  # inference only, no gradients needed
            for text_feature, image_feature, _ in test_loader:
                batch_device = text_feature.device
                if model_device is not None:
                    text_feature = text_feature.to(model_device)
                    image_feature = image_feature.to(model_device)

                logits = model(text_feature, image_feature)

                # Hand results back on the batch's device so they can be
                # compared with labels coming from the same loader.
                model_logits.append(logits.to(batch_device))

        all_model_logits.append(torch.cat(model_logits, dim=0))

    return all_model_logits




def get_label_from_logits(logits):
    """Return, for each row of ``logits``, the index of its largest entry (dim 1)."""
    predicted = logits.argmax(dim=1)
    return predicted

def average_logits(logits_list):
    """Element-wise mean of a list of equally shaped logits tensors.

    The tensors are stacked along a new leading dimension, which is then
    averaged away, so the result has the shape of a single input tensor.
    """
    return torch.stack(logits_list, dim=0).mean(dim=0)

def majority_vote(labels_list):
    """Combine per-architecture label tensors by taking the most frequent label.

    ``labels_list`` holds one label tensor per architecture. They are stacked
    along a new first dimension and the mode over that dimension is returned,
    i.e. one voted label per position.
    """
    votes = torch.stack(labels_list, dim=0)
    return torch.mode(votes, dim=0).values


def calculate_accuracy(predictions, labels):
    """Fraction of positions where ``predictions`` equals ``labels``."""
    matches = predictions == labels
    return matches.sum().item() / len(labels)



## Ensemble evaluation

In [None]:

# Candidate architectures for the ensemble. Each entry is handed to
# get_logits_from_models as its `models` argument, which iterates over it,
# so each entry is expected to be an iterable of trained models.
# NOTE(review): this first assignment is overwritten two statements below;
# only the LSTM and CRN model lists take part in the evaluation that follows.
models=[best_models_crn,best_models_rrn,simpel_model,best_models_mlp,best_models_lstm]

models=[best_models_lstm,best_models_crn]

# all_logits[i] is the list of per-model logits tensors of architecture i.
all_logits=[]

# `model` here is one architecture's collection of models, not a single model.
for model in models:
  all_logits.append(get_logits_from_models(model, test_loader))



# One predicted-label tensor per architecture.
final_labels_for_each_arch=[]

for logits_list in all_logits:
    # Average the logits of the architecture's models, then take the argmax.
    means=average_logits(logits_list)
    get_label=get_label_from_logits(means)
    final_labels_for_each_arch.append(get_label)

# Vote across architectures.
# NOTE(review): with an even number of architectures (two here) the vote can
# tie; the outcome then depends on how torch.mode resolves ties - confirm
# this is intended or use an odd number of voters.
final_vote_labels = majority_vote(final_labels_for_each_arch)

print(calculate_accuracy(final_vote_labels,test_labels))

0.8873826903023984


In [None]:
from sklearn.metrics import f1_score

# Macro-averaged F1 of the ensemble vote; f1_score takes the ground truth
# first and the predictions second.
# NOTE(review): the recorded outputs (accuracy 0.8874, macro F1 0.4702) are
# what a constant single-class predictor would give: with accuracy a, that
# class has F1 = 2a / (1 + a) = 0.9403, the other class has F1 = 0, and
# their mean is 0.4702. Check the distribution of final_vote_labels before
# relying on the accuracy figure.
score = f1_score(test_labels, final_vote_labels, average='macro')
print("macro F1 Score: ", score)

macro F1 Score:  0.47016574585635357
