In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn

from transformers import BertTokenizer, DistilBertModel, VisualBertModel

import pandas as pd
import numpy as np
import ast, os

from sklearn.metrics import mean_squared_error, mean_absolute_error, classification_report
from sklearn.model_selection import train_test_split

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
detector = fasterrcnn_resnet50_fpn(pretrained=True)
batch_size = 4
device = torch.device('cuda:1')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def get_features(image_list):
    feature_extractor = nn.Sequential(*list(detector.backbone.children())[:-2])
    visual_embeddings = []
    for image in image_list:
        visual_embedding = feature_extractor(image)
        visual_embeddings.append(visual_embedding)
    visual_embeddings = torch.stack(visual_embeddings)
    return visual_embeddings

def map_values(ratings, tags):
    for i, tag in enumerate(tags):
        if tag == 'concrete':
            ratings[i] = 0
        elif tag == 'middle':
            ratings[i] = 1
        else:
            ratings[i] = 2
    return ratings

class Multimodal_Dataset(Dataset):
    def __init__(self, words_file, image_file, tokenizer, regression=False):
        self.words_file = words_file
        self.images = image_file
        self.tokenizer = tokenizer

        self.data = pd.read_csv(words_file)
        self.words = self.data['word'].to_list()
        self.encodings = self.tokenizer([word for word in self.words], add_special_tokens=True, padding='max_length', max_length = 12 ,return_tensors='pt')

        self.photos = self.data['photos'].apply(ast.literal_eval)
        self.labels = self.data['tag'].to_list()
        if regression == True:
            self.ratings = self.data['rating'].to_list()
        else:
            ratings = self.data['rating'].to_list()
            self.ratings = map_values(ratings, self.labels)

        transform_list = [
            transforms.Grayscale(1),
            transforms.Resize((32, 168)),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ]
        self.transform = transforms.Compose(transform_list)

    def __len__(self):
        return len(self.words)

    def __getitem__(self, idx):
        images = self.photos[idx]
        imgs = []
        for image in images:
          img_path = os.path.join('images/', image)
          img = Image.open(img_path)
          img = self.transform(img)
          imgs.append(img)

        # #padding list of images, without this the dataloader results in errors
        while len(imgs) < 12:
            imgs.append(torch.zeros_like(imgs[0]))

        embeddings = get_features(imgs)
        #imgs = torch.stack(imgs)
        #print(f"Word: {self.words[idx]}, Number of images: {len(images)}")

        item = {'word': self.words[idx], 'input_ids': self.encodings['input_ids'][idx], 'attn_mask': self.encodings['attention_mask'][idx], 'token_type_ids': self.encodings['token_type_ids'],'visual_embeddings': embeddings, 'rating': self.ratings[idx],'label': self.labels[idx]}
        return item

In [3]:
data = Multimodal_Dataset('merged_data.csv', 'images', tokenizer=tokenizer)
data[5]

{'word': 'rack',
 'input_ids': tensor([  101, 14513,   102,     0,     0,     0,     0,     0,     0,     0,
             0,     0]),
 'attn_mask': tensor([1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         ...,
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0]]),
 'visual_embeddings': tensor([[[[-0.3647, -0.3412, -0.3333,  ..., -0.2078, -0.1843, -0.2000],
           [-0.3255, -0.3098, -0.3020,  ..., -0.1373, -0.1216, -0.1529],
           [-0.2941, -0.2784, -0.2549,  ..., -0.1294, -0.0510, -0.0431],
           ...,
           [-0.7412, -0.7333, -0.7098,  ..., -0.5059, -0.5059, -0.5373],
           [-0.7098, -0.7020, -0.6706,  ..., -0.5294, -0.4745, -0.4667],
           [-0.6235, -0.6549, -0.6549,  ..., -0.5373, -0.5137, -0.5843]]],
 
 
         [[[-0.8275, -0.8588, -0.9059,  ..., -0.4431, -0.3961, -0.4353],
        

In [4]:
dataloader = DataLoader(data, batch_size=batch_size, shuffle=True)

In [5]:
#for i, batch in enumerate(dataloader):
 #   print(batch['visual_embeddings'].size())
  #  break

**Textual BERT**

In [6]:
class TEXTUAL_BERT(nn.Module):
  def __init__(self, num_of_labels):
      super(TEXTUAL_BERT, self).__init__()
      self.bert = DistilBertModel.from_pretrained("distilbert-base-uncased")
      self.classifier = nn.Linear(self.bert.config.hidden_size, num_of_labels)

  def forward(self, input_ids, attention_mask):
      outputs = self.bert(input_ids, attention_mask)
      predictions = self.classifier(outputs.last_hidden_state[:, 0, :])

      return predictions

In [7]:
loss_fn = nn.CrossEntropyLoss()

In [8]:
model = TEXTUAL_BERT(3).to(device)

model.eval()
total_loss = 0
predictions = []
gold_labels = []
misclassifications = []

with torch.no_grad():
    for batch in dataloader:
        input_ids = torch.Tensor(batch['input_ids']).long().to(device)
        attn_masks = torch.Tensor(batch['attn_mask']).long().to(device)
        gold_label = batch['rating'].to(device)
    
        outputs = model(input_ids, attn_masks)
        
        gold_labels.extend(gold_label.cpu().numpy())

        loss = loss_fn(outputs, gold_label)
        total_loss += loss.item()

        _, predicted_labels = torch.max(outputs, dim=1)
        
        predictions.extend(predicted_labels.cpu().numpy())

        for i in range(len(predicted_labels)):
            if predicted_labels[i] != gold_label[i]:
                misclassification = f"{batch['word'][i]} predicted as {predicted_labels[i]} instead of {gold_label[i]}"
                misclassifications.append(misclassification)
    
average_loss = total_loss / len(dataloader)
print(f'Average Loss: {average_loss}')


Average Loss: 1.1138059186935425


In [9]:
report = classification_report(gold_labels, predictions)

print("Classification Report:")
print(report)

Classification Report:
              precision    recall  f1-score   support

           0       0.32      0.93      0.48       100
           1       0.30      0.03      0.05       100
           2       0.00      0.00      0.00       100

    accuracy                           0.32       300
   macro avg       0.21      0.32      0.18       300
weighted avg       0.21      0.32      0.18       300



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [10]:
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [11]:
train_dataloader = DataLoader(train_data, batch_size=8, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=8, shuffle=False)

In [12]:
model = TEXTUAL_BERT(3).to(device)
epochs = 30
model.train()
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(epochs):
    total_loss = 0
    for i, batch in enumerate(train_dataloader):
        input_ids = torch.Tensor(batch['input_ids']).long().to(device)
        attn_masks = torch.Tensor(batch['attn_mask']).long().to(device)
        gold_label = batch['rating'].to(device)
    
        outputs = model(input_ids, attn_masks)

        loss = loss_fn(outputs, gold_label)
        total_loss += loss.item()

        print("epoch:",epoch, "loss:", total_loss/(i+1), end='\r')

        loss.backward()

        optimizer.step()

        optimizer.zero_grad()


epoch: 29 loss: 1.1049219648043314

In [29]:
total_loss = 0
predictions = []
gold_labels = []

model.eval()
with torch.no_grad():
    for batch in test_dataloader:
        input_ids = torch.Tensor(batch['input_ids']).long().to(device)
        attn_masks = torch.Tensor(batch['attn_mask']).long().to(device)
        gold_label = batch['rating'].to(device)
    
        outputs = model(input_ids, attn_masks)

        gold_labels.extend(gold_label.cpu().numpy())

        loss = loss_fn(outputs, gold_label.unsqueeze(1))
        total_loss += loss.item()

        _, predicted_labels = torch.max(outputs, dim=1)
        #print(gold_label.size(), predicted_labels.size())
        predictions.extend(predicted_labels.cpu().numpy())

average_loss = total_loss / len(test_dataloader)
print(f'Average Loss: {average_loss}')

Average Loss: 1.7636477500200272


In [20]:
report = classification_report(gold_labels, predictions)

print("Classification Report:")
print(report)

Classification Report:
              precision    recall  f1-score   support

           0       0.37      1.00      0.54        22
           1       0.00      0.00      0.00        16
           2       0.00      0.00      0.00        22

    accuracy                           0.37        60
   macro avg       0.12      0.33      0.18        60
weighted avg       0.13      0.37      0.20        60



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [21]:
regression_data = Multimodal_Dataset('merged_data.csv', 'images', tokenizer=tokenizer, regression=True)
regression_dataloader = DataLoader(regression_data, batch_size=batch_size, shuffle=True)

In [22]:
model = TEXTUAL_BERT(1).to(device)
total_loss = 0
predictions = []
gold_labels = []
loss_fn = nn.MSELoss()

model.eval()
with torch.no_grad():
    for batch in regression_dataloader:
        input_ids = torch.Tensor(batch['input_ids']).long().to(device)
        attn_masks = torch.Tensor(batch['attn_mask']).long().to(device)
        gold_label = batch['rating'].to(device)
    
        outputs = model(input_ids, attn_masks)
        
        gold_labels.extend(gold_label.cpu().numpy())

        loss = loss_fn(outputs, gold_label.unsqueeze(1))
        total_loss += loss.item()

        _, predicted_labels = torch.max(outputs, dim=1)
        #print(gold_label.size(), predicted_labels.size())
        predictions.extend(predicted_labels.cpu().numpy())

average_loss = total_loss / len(regression_dataloader)
print(f'Average Loss: {average_loss}')

Average Loss: 11.16761583803292


In [23]:
from scipy.stats import pearsonr

correlation_coefficient, p_value = pearsonr(gold_labels, predictions)
#The model sometimes predicts only one value and as a result the pearson correlation cannot be computed
print(f"Pearson Correlation Coefficient: {correlation_coefficient}")
print(f"P-value: {p_value}")

Pearson Correlation Coefficient: nan
P-value: nan




In [25]:
mse = mean_squared_error(gold_labels, predictions, squared=False)
mae = mean_absolute_error(gold_labels, predictions)

print(f"MSE: {mse.item():.4f}")
print(f"MAE: {mae.item():.4f}")

MSE: 3.2834
MAE: 3.0588


**Visual BERT**

In [26]:
class VISUAL_BERT(nn.Module):
    def __init__(self):
        super(VISUAL_BERT, self).__init__()
        self.visual_bert = VisualBertModel.from_pretrained("uclanlp/visualbert-vqa-coco-pre")
        #self.classifier = nn.Linear(self.visual_bert.config.hidden_size, 3)

    def forward(self, input_ids, attn_masks, token_type_ids, visual_embeddings):
        
        visual_token_type_ids = torch.ones(visual_embeddings.shape[:-1], dtype=torch.long).to(device)
        visual_attention_mask = torch.ones(visual_embeddings.shape[:-1], dtype=torch.float).to(device)
        print('visual attn mask', visual_attention_mask.squeeze(2).size())
        #print('attn mask', attn_masks.size())
        outputs = self.visual_bert(input_ids=input_ids, attention_mask=attn_masks, token_type_ids=token_type_ids, visual_embeds=visual_embeddings, visual_attention_mask=visual_attention_mask.squeeze(2), visual_token_type_ids=visual_token_type_ids)
        #predictions = self.classifier(outputs.last_hidden_state[:, 0, :])

        return outputs

In [28]:
language_and_vision_model = VISUAL_BERT().to(device)

language_and_vision_model.eval()
total_loss = 0
predictions = []
gold_labels = []

with torch.no_grad():
    for batch in dataloader:
        input_ids = torch.Tensor(batch['input_ids']).long().to(device)
        #print(input_ids.size())
        attn_masks = torch.Tensor(batch['attn_mask']).long().to(device)
        token_type_ids = torch.mean(batch['token_type_ids'].float(), dim=1).to(device)
        token_type_ids = token_type_ids.long()
        #print(token_type_ids.size())
        visual_embeddings = batch['visual_embeddings'].to(device)
        print(visual_embeddings.size())
        gold_label = batch['rating'].to(device)
        #print(gold_label.size())
    
        outputs = language_and_vision_model(input_ids, attn_masks.unsqueeze(2), token_type_ids, visual_embeddings)
            
        predictions.extend(outputs.cpu().numpy())
        gold_labels.extend(gold_label.cpu().numpy())

        loss = loss_fn(outputs, gold_label)
        total_loss += loss.item()

average_loss = total_loss / len(dataloader)
print(f'Average Loss: {average_loss}')

torch.Size([4, 12, 1, 32, 168])
visual attn mask torch.Size([4, 12, 32])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1536x168 and 2048x768)

In [None]:
rmse = mean_squared_error(gold_labels, predictions, squared=False)
mae = mean_absolute_error(gold_labels, predictions)

In [None]:
print(f"RMSE: {rmse.item():.4f}")
print(f"MAE: {mae.item():.4f}")