In [1]:
# Sentiment name -> integer class index, in the order negative < neutral < positive.
label_map = {
    name: index
    for index, name in enumerate(("negative", "neutral", "positive"))
}

In [4]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from transformers import AlbertTokenizer

class MVSADataset(Dataset):
    """Image/text sentiment pairs read from an MVSA-style directory.

    Expected layout under ``root_dir``:

    * ``<label_file>``: one header line, then one row per sample of the form
      ``<id> TAB <text_sentiment>,<image_sentiment>``.
    * ``data/<id>.jpg`` and ``data/<id>.txt``: the image and its text.

    Rows whose image or text file is missing are skipped. Sentiment names are
    converted to class indices through the module-level ``label_map``, so an
    unknown sentiment name raises ``KeyError``.

    Args:
        root_dir: Dataset root directory.
        label_file: Name of the label file, relative to ``root_dir``.
        tokenizer_name: Name passed to ``AlbertTokenizer.from_pretrained``.
        max_length: Every text is padded/truncated to this many tokens.
        transform: Callable applied to the RGB PIL image. When ``None``, the
            image is resized to 224x224, converted to a tensor and normalised
            with the mean/std values listed below.

    Raises:
        ValueError: If a non-empty label row does not have the expected shape.
    """

    def __init__(self, root_dir, label_file, tokenizer_name='albert-base-v2', max_length=30, transform=None):
        self.root_dir = root_dir
        self.label_path = os.path.join(root_dir, label_file)
        self.tokenizer = AlbertTokenizer.from_pretrained(tokenizer_name)
        self.max_length = max_length

        # Compare against None explicitly so that a falsy-but-valid callable
        # (for example a container type that defines __len__) is not silently
        # replaced by the default pipeline.
        if transform is None:
            transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
            ])
        self.transform = transform

        data_dir = os.path.join(root_dir, 'data')
        self.data = []
        with open(self.label_path, 'r', encoding='utf-8') as f:
            # Skip the header; the default keeps an empty file from raising
            # StopIteration.
            next(f, None)
            # The header is line 1, so data rows start at line 2.
            for line_no, line in enumerate(f, start=2):
                line = line.strip()
                if not line:
                    continue
                id_, text_sent, image_sent = self._parse_label_line(line, line_no)
                img_path = os.path.join(data_dir, f'{id_}.jpg')
                txt_path = os.path.join(data_dir, f'{id_}.txt')

                # Only keep samples for which both modalities are on disk.
                if os.path.exists(img_path) and os.path.exists(txt_path):
                    with open(txt_path, 'r', encoding='ISO-8859-1') as txt_file:
                        text = txt_file.read().strip()
                    self.data.append({
                        'id': id_,
                        'text': text,
                        'image_path': img_path,
                        'text_label': label_map[text_sent],
                        'image_label': label_map[image_sent],
                    })

    @staticmethod
    def _parse_label_line(line, line_no=None):
        """Split one label row into ``(id, text_sentiment, image_sentiment)``.

        Raises:
            ValueError: If the row is not ``<id> TAB <text>,<image>``.
        """
        try:
            id_, sentiments = line.strip().split('\t')
            text_sent, image_sent = sentiments.split(',')
        except ValueError:
            where = f" on line {line_no}" if line_no is not None else ""
            raise ValueError(
                f"Malformed label row{where}: {line!r} "
                "(expected '<id><TAB><text_sentiment>,<image_sentiment>')"
            ) from None
        return id_.strip(), text_sent.strip(), image_sent.strip()

    def __len__(self):
        """Return the number of samples that were found on disk."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load, transform and tokenize the sample at position ``idx``.

        Returns:
            dict with keys ``image`` (output of ``self.transform``),
            ``input_ids`` and ``attention_mask`` (each of length
            ``self.max_length``), and ``text_label`` / ``image_label``
            (scalar ``torch.long`` tensors).
        """
        sample = self.data[idx]

        # The context manager releases the file handle as soon as the pixel
        # data has been converted, instead of relying on garbage collection.
        with Image.open(sample['image_path']) as raw_image:
            image = self.transform(raw_image.convert('RGB'))

        # Pad/truncate to a fixed length so samples can be stacked into a batch.
        encoding = self.tokenizer(sample['text'],
                                  padding='max_length',
                                  truncation=True,
                                  max_length=self.max_length,
                                  return_tensors='pt')

        # return_tensors='pt' adds a leading batch dimension of 1; drop it.
        input_ids = encoding['input_ids'].squeeze(0)  # (max_length,)
        attention_mask = encoding['attention_mask'].squeeze(0)  # (max_length,)

        return {
            'image': image,  # (3, 224, 224) with the default transform
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'text_label': torch.tensor(sample['text_label'], dtype=torch.long),
            'image_label': torch.tensor(sample['image_label'], dtype=torch.long),
        }


In [5]:
# Smoke test: build the dataset and look at the shapes of a single batch.
root_dir = 'MVSA_Single'  # point this at your local copy of the dataset
dataset = MVSADataset(root_dir, 'label.txt')

# The train/validation split is done further down; this loader covers everything.
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)

# Expected shapes, in order: [32, 3, 224, 224], [32, 30], [32]
batch = next(iter(dataloader))
print(*(batch[key].shape for key in ('image', 'input_ids', 'text_label')), sep='\n')

torch.Size([32, 3, 224, 224])
torch.Size([32, 30])
torch.Size([32])


To split the dataset into training and validation sets, we'll follow these steps:

Shuffle the dataset: This ensures that the data is randomly distributed.

Split the data: We can split the dataset, for example, 80% for training and 20% for validation. This is a common practice, but you can adjust the split ratio if needed.

In [7]:
from sklearn.model_selection import train_test_split

# Example usage to split dataset into training and validation sets
def split_dataset(dataset, test_size=0.2, random_seed=42):
    """Randomly partition ``dataset`` into a training and a validation subset.

    Args:
        dataset: Any object supporting ``len()`` and integer indexing.
        test_size: Forwarded to ``train_test_split`` as ``test_size``.
        random_seed: Forwarded to ``train_test_split`` as ``random_state``.

    Returns:
        ``(train_dataset, val_dataset)``, two ``torch.utils.data.Subset`` views
        of ``dataset``. No DataLoader is created here.
    """
    # Split index positions rather than samples, so nothing is loaded from disk.
    all_indices = list(range(len(dataset)))
    train_indices, val_indices = train_test_split(
        all_indices,
        test_size=test_size,
        random_state=random_seed,
    )

    subset_cls = torch.utils.data.Subset
    return subset_cls(dataset, train_indices), subset_cls(dataset, val_indices)

# Split the dataset
train_dataset, val_dataset = split_dataset(dataset=dataset)

# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)


In [8]:
# Print the number of items in the training and validation sets
print(
    f'Number of training samples: {len(train_dataset)}',
    f'Number of validation samples: {len(val_dataset)}',
    sep='\n',
)


Number of training samples: 3895
Number of validation samples: 974


Step 1: Define the Model Architecture
We will build the model from four components:

Text Encoder (Transformer-based): We will use a transformer model (like BERT or ALBERT) to process the text data. It will output hidden states that represent the textual information.

Image Encoder (CNN-based): We will use a pre-trained ResNet (like ResNet-50) to process the image data. The CNN will output feature vectors for the image.

Fusion Layer: We will combine the output from both the text and image encoders.

Prediction Layer: Finally, we will pass the combined features through a fully connected layer to predict the sentiment labels.

Let us break down each component:

Text Encoder (using ALBERT):

We will use the AlbertModel from the Hugging Face Transformers library.

Image Encoder (using ResNet):

We can use ResNet50 from PyTorch torchvision.models and remove the final classification layer to get the image features.

Fusion Layer:

We will concatenate the output from both encoders and use a fully connected layer to combine the information.

Final Prediction Layer:

A final fully connected layer to output the predicted sentiment classes.

Here is the code outline to define this architecture:

import torch
import torch.nn as nn
from transformers import AlbertModel, AlbertTokenizer
from torchvision import models

class MultimodalModel(nn.Module):
    """Single-module text+image sentiment classifier (ALBERT + ResNet50).

    Text and image features are concatenated and passed through
    ``fc1 -> ReLU -> dropout -> fc2`` to produce class logits.

    Args:
        hidden_size: Width of the text encoder's hidden states; must match the
            loaded ALBERT checkpoint.
        num_classes: Number of output logits.
    """

    def __init__(self, hidden_size=768, num_classes=3):
        super().__init__()

        # Text Encoder (ALBERT)
        self.text_encoder = AlbertModel.from_pretrained('albert-base-v2')

        # Image Encoder (ResNet50) with the final classification layer removed.
        # NOTE(review): newer torchvision releases deprecate `pretrained=` in
        # favour of `weights=`; kept as-is because the installed version is not
        # visible from here.
        resnet = models.resnet50(pretrained=True)
        self.image_encoder = nn.Sequential(*list(resnet.children())[:-1])

        # Fully connected layers for fusion and prediction
        self.fc1 = nn.Linear(hidden_size + 2048, 1024)  # 2048 is the output size of ResNet50
        self.fc2 = nn.Linear(1024, num_classes)

        self.dropout = nn.Dropout(0.5)

    def forward(self, input_ids, attention_mask, images):
        """Return class logits for a batch of (text, image) pairs.

        Args:
            input_ids: Token ids, one row per sample.
            attention_mask: Same shape as ``input_ids``; 1 for real tokens and
                0 for padding.
            images: Image batch accepted by the ResNet backbone.
        """
        text_features = self.text_encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = text_features.last_hidden_state

        # Average over real tokens only. A plain mean over the sequence axis
        # would also average the padded positions, so the sentence embedding
        # would depend on how much padding a sample happens to have.
        token_mask = attention_mask.unsqueeze(-1).to(hidden_states.dtype)
        token_count = token_mask.sum(dim=1).clamp(min=1.0)  # guard all-padding rows
        text_embedding = (hidden_states * token_mask).sum(dim=1) / token_count

        # Process image through ResNet50 and flatten everything after the batch axis.
        image_features = self.image_encoder(images)
        image_features = torch.flatten(image_features, 1)

        # Concatenate text and image features
        combined_features = torch.cat((text_embedding, image_features), dim=1)

        # Feed the concatenated features through fully connected layers
        x = self.fc1(combined_features)
        x = torch.relu(x)
        x = self.dropout(x)
        return self.fc2(x)


In [13]:
import torch
import torch.nn as nn
from transformers import AlbertModel, AlbertTokenizer

class TextEncoder(nn.Module):
    """ALBERT wrapper that turns token ids into one embedding per sample.

    Args:
        model_name: Checkpoint name passed to both
            ``AlbertTokenizer.from_pretrained`` and
            ``AlbertModel.from_pretrained``.
    """

    def __init__(self, model_name='albert-base-v2'):
        super().__init__()
        # The tokenizer is not used by forward(); it is kept as an attribute so
        # existing code that reads `encoder.tokenizer` keeps working.
        self.tokenizer = AlbertTokenizer.from_pretrained(model_name)
        self.model = AlbertModel.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask):
        """Return the attention-mask-weighted mean of the final hidden states.

        Args:
            input_ids: Token ids, one row per sample.
            attention_mask: Same shape as ``input_ids``; 1 for real tokens and
                0 for padding.

        Returns:
            Tensor of shape ``(batch_size, hidden_size)``.
        """
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state  # (batch_size, seq_len, hidden_size)

        # Average over real tokens only. A plain mean over the sequence axis
        # would also average the padded positions, so the embedding would
        # depend on how much padding a sample happens to have.
        token_mask = attention_mask.unsqueeze(-1).to(last_hidden_state.dtype)
        token_count = token_mask.sum(dim=1).clamp(min=1.0)  # guard all-padding rows
        sentence_embedding = (last_hidden_state * token_mask).sum(dim=1) / token_count
        return sentence_embedding


In [14]:
import torchvision.models as models
import torch.nn as nn

class ImageEncoder(nn.Module):
    """ResNet50 backbone (classification head removed) used as a feature extractor."""

    def __init__(self):
        super(ImageEncoder, self).__init__()
        # NOTE(review): newer torchvision releases deprecate `pretrained=` in
        # favour of `weights=`; left unchanged here.
        backbone = models.resnet50(pretrained=True)
        # Keep every child module except the last one (the fc classifier).
        layers_without_fc = list(backbone.children())[:-1]
        self.resnet = nn.Sequential(*layers_without_fc)

    def forward(self, images):
        """Return one flat feature vector per image, shape ``(batch_size, 2048)``."""
        feature_maps = self.resnet(images)  # (batch_size, 2048, 1, 1)
        batch_size = feature_maps.size(0)
        return feature_maps.view(batch_size, -1)


In [15]:
class MultimodalFusionModel(nn.Module):
    """Late-fusion classifier over a text encoder and an image encoder.

    The two encoders' outputs are concatenated along the feature axis and
    passed through ``fc1 -> ReLU -> dropout -> fc2`` to produce class logits.

    Args:
        text_encoder: Module called as ``text_encoder(input_ids, attention_mask)``
            that returns ``(batch_size, hidden_size)`` features.
        image_encoder: Module called as ``image_encoder(images)`` that returns
            ``(batch_size, image_feature_size)`` features.
        hidden_size: Width of the text features.
        num_classes: Number of output logits.
        image_feature_size: Width of the image features. Defaults to 2048, the
            ResNet feature size this model was originally written for, so
            existing callers are unaffected.
    """

    def __init__(self, text_encoder, image_encoder, hidden_size=768, num_classes=3,
                 image_feature_size=2048):
        super().__init__()
        self.text_encoder = text_encoder
        self.image_encoder = image_encoder

        # Fully connected layers for fusion and prediction
        self.fc1 = nn.Linear(hidden_size + image_feature_size, 1024)
        self.fc2 = nn.Linear(1024, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, input_ids, attention_mask, images):
        """Return class logits of shape ``(batch_size, num_classes)``."""
        # Process text through the text encoder
        text_features = self.text_encoder(input_ids, attention_mask)

        # Process image through the image encoder
        image_features = self.image_encoder(images)

        # Concatenate text and image features along the feature axis
        combined_features = torch.cat((text_features, image_features), dim=1)

        # Feed the combined features through fully connected layers
        x = self.fc1(combined_features)
        x = torch.relu(x)
        x = self.dropout(x)
        output = self.fc2(x)  # Final output (classification layer)

        return output


In [16]:
import torch.optim as optim
import torch.nn as nn

# Define the model
text_encoder = TextEncoder('albert-base-v2')
image_encoder = ImageEncoder()

# Fuse both encoders into a single 3-way sentiment classifier.
model = MultimodalFusionModel(
    text_encoder,
    image_encoder,
    hidden_size=768,
    num_classes=3,
)

# Cross-entropy over the class logits.
criterion = nn.CrossEntropyLoss()

# Adam over every parameter, i.e. both encoders are fine-tuned along with the head.
optimizer = optim.Adam(params=model.parameters(), lr=1e-5)


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/47.4M [00:00<?, ?B/s]



Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to C:\Users\SARTHAK KHANDELWAL/.cache\torch\hub\checkpoints\resnet50-0676ba61.pth


100%|█████████████████████████████████████████████████████████████████████████████| 97.8M/97.8M [00:18<00:00, 5.55MB/s]


In [24]:
def train_model(model, train_loader, criterion, optimizer, device):
    """Run one training epoch with the text sentiment as the target.

    Args:
        model: Module called as ``model(input_ids, attention_mask, images)``
            that returns class logits.
        train_loader: Sized iterable of batches; each batch is a dict with the
            keys ``input_ids``, ``attention_mask``, ``image`` and ``text_label``.
        criterion: Loss called as ``criterion(logits, labels)``. The epoch
            loss assumes it returns the mean over the batch (the default
            reduction of ``nn.CrossEntropyLoss``).
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the batch tensors are moved to; the model must already
            be on it.

    Returns:
        ``(epoch_loss, epoch_accuracy)``: the per-sample mean loss and the
        accuracy in percent. ``(0.0, 0.0)`` if the loader yields no samples.
    """
    model.train()  # Set the model to training mode
    loss_sum = 0.0
    correct_predictions = 0
    total_predictions = 0

    for batch_idx, batch in enumerate(train_loader):
        # Only the tensors that are actually used are moved to the device.
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        images = batch['image'].to(device)  # the dataset's key is 'image', not 'images'
        text_labels = batch['text_label'].to(device)

        optimizer.zero_grad()

        outputs = model(input_ids, attention_mask, images)
        loss = criterion(outputs, text_labels)

        loss.backward()
        optimizer.step()

        # Weight each batch by its size so that a short final batch does not
        # count as much as a full one in the epoch average.
        batch_size = text_labels.size(0)
        loss_sum += loss.item() * batch_size

        predicted = outputs.argmax(dim=1)
        correct_predictions += (predicted == text_labels).sum().item()
        total_predictions += batch_size

        # Progress report every 10 batches
        if (batch_idx + 1) % 10 == 0:
            print(f"Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # An empty loader would otherwise divide by zero.
    if total_predictions == 0:
        return 0.0, 0.0

    epoch_loss = loss_sum / total_predictions
    epoch_accuracy = correct_predictions / total_predictions * 100
    return epoch_loss, epoch_accuracy


In [27]:
def validate_model(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` with the text sentiment as the target.

    Args:
        model: Module called as ``model(input_ids, attention_mask, images)``
            that returns class logits.
        val_loader: Iterable of batches; each batch is a dict with the keys
            ``input_ids``, ``attention_mask``, ``image`` and ``text_label``.
        criterion: Loss called as ``criterion(logits, labels)``. The epoch
            loss assumes it returns the mean over the batch (the default
            reduction of ``nn.CrossEntropyLoss``).
        device: Device the batch tensors are moved to; the model must already
            be on it.

    Returns:
        ``(epoch_loss, epoch_accuracy)``: the per-sample mean loss and the
        accuracy in percent. ``(0.0, 0.0)`` if the loader yields no samples.
    """
    model.eval()  # Set the model to evaluation mode
    loss_sum = 0.0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():  # Disable gradient calculation during validation
        for batch in val_loader:
            # Only the tensors that are actually used are moved to the device.
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            images = batch['image'].to(device)  # the dataset's key is 'image', not 'images'
            text_labels = batch['text_label'].to(device)

            outputs = model(input_ids, attention_mask, images)
            loss = criterion(outputs, text_labels)

            # Weight each batch by its size so that a short final batch does
            # not count as much as a full one in the epoch average.
            batch_size = text_labels.size(0)
            loss_sum += loss.item() * batch_size

            predicted = outputs.argmax(dim=1)
            correct_predictions += (predicted == text_labels).sum().item()
            total_predictions += batch_size

    # An empty loader would otherwise divide by zero.
    if total_predictions == 0:
        return 0.0, 0.0

    epoch_loss = loss_sum / total_predictions
    epoch_accuracy = correct_predictions / total_predictions * 100
    return epoch_loss, epoch_accuracy


In [28]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 10  # Adjust as needed

# train_model/validate_model move every batch to `device`, so the model's
# weights have to live there too; otherwise the first forward pass fails on a
# CUDA machine with a device-mismatch error. nn.Module.to() moves the
# parameters in place, so the optimizer created earlier still refers to them.
model.to(device)

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    # Training phase
    train_loss, train_accuracy = train_model(model, train_loader, criterion, optimizer, device)
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

    # Validation phase
    val_loss, val_accuracy = validate_model(model, val_loader, criterion, device)
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    print("-" * 50)

Epoch 1/10
Batch [10/122], Loss: 0.9939
Batch [20/122], Loss: 1.1470
Batch [30/122], Loss: 1.0683
Batch [40/122], Loss: 0.8694
Batch [50/122], Loss: 0.9890
Batch [60/122], Loss: 0.9181
Batch [70/122], Loss: 0.7898
Batch [80/122], Loss: 0.8475


KeyboardInterrupt: 