In [10]:
# Avoid warnings from libraries
import warnings
warnings.filterwarnings('ignore')

# Import libraries
from transformers import AutoProcessor, AutoTokenizer, CLIPModel
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import transformers
from transformers import AdamW, get_linear_schedule_with_warmup
import torch.nn.functional as F
from torch import nn

from PIL import Image

from tqdm import tqdm
import numpy as np
import pandas as pd
import torch
import random
import re
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import csv

In [11]:
parent_dir = # insert here the path of your parent dir
dataset_path = parent_dir + "data/"
models_dir = parent_dir + "models/"
results_csv =  parent_dir + "results.csv"
text_aug_path = parent_dir + "data/text_augmentations/all_data.csv"

In [12]:
model_name =   # insert the name of the model that will be loaded
results_path = models_dir+model_name+"/"

# Multimodal model parameters
MAX_LENGTH = 48
encoding_dimension = 512
dense_hidden_size = 128
pretraining = "openai/clip-vit-base-patch32"

fusion = "cross" # cross or concat

# General training
batch_size = 32
lr = 1.5e-7
n_epochs = 4
n_classes = 3
patience = 3 # Number of epochs to wait before early stopping

# Data augmentation
data_augmentation = True

image_augmentations = transforms.Compose([
    transforms.RandomRotation(degrees=40),
    transforms.RandomAffine(degrees=0, translate=(0.4, 0.4), scale=(0.7, 1.3), shear=0),
    transforms.RandomHorizontalFlip(p=0.0),
    transforms.RandomVerticalFlip(p=0.0),
    transforms.ColorJitter(brightness=0, contrast=0, saturation=0, hue=0),
    transforms.RandomPerspective(distortion_scale=0.5)
])

In [13]:
def get_random_seed_through_os():
    RAND_SIZE = 4
    random_data = os.urandom(
        RAND_SIZE
    )
    random_seed = int.from_bytes(random_data, byteorder="big")
    return random_seed

RANDOM_SEED = 123
# RANDOM_SEED = get_random_seed_through_os()
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)

In [14]:
#Clean text and load the images names (as lists)
def clean_sentence(sentence, norm_user = True, norm_hashtag = True, separete_characters = True):
    # Convert instance to string
    sentence = str(sentence)

    # All text to lowecase
    sentence = sentence.lower()

    # Normalize users and url
    if norm_user == True:
        sentence = re.sub(r'\@\w+','@usuario', sentence)
    if norm_hashtag == True:
        sentence = re.sub(r"http\S+|www\S+|https\S+", 'url', sentence, flags=re.MULTILINE)

    # Separate special characters
    if separete_characters == True:
        sentence = re.sub(r":", " : ", sentence)
        sentence = re.sub(r",", " , ", sentence)
        sentence = re.sub(r"\.", " . ", sentence)
        sentence = re.sub(r"!", " ! ", sentence)
        sentence = re.sub(r"¡", " ¡ ", sentence)
        sentence = re.sub(r"“", " “ ", sentence)
        sentence = re.sub(r"'", " ' ", sentence)
        sentence = re.sub(r"”", " ” ", sentence)
        sentence = re.sub(r"\(", " ( ", sentence)
        sentence = re.sub(r"\)", " ) ", sentence)
        sentence = re.sub(r"\?", " ? ", sentence)
        sentence = re.sub(r"\¿", " ¿ ", sentence)

    # Substituting multiple spaces with single space
    sentence = re.sub(r'\s+', ' ', sentence, flags=re.I)

    return sentence

In [15]:
current_epoch = 1

# Defining the dataset class for loading the images and the associated text
class CustomDataset(Dataset):
    def __init__(self, image_folder, csv_file, mode):
        self.image_folder = image_folder
        self.data = pd.read_csv(csv_file)
        self.mode = mode
        
        # Load text tokenizer and image processor
        self.tokenizer = AutoTokenizer.from_pretrained(pretraining)
        self.image_processor = AutoProcessor.from_pretrained(pretraining)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        global current_epoch
        img_name = self.data.iloc[idx, 0]
        img_path = os.path.join(self.image_folder, img_name+ ".jpg")
        
        image = Image.open(img_path).convert('RGB')
        
        # Data augmenting
        if data_augmentation and self.mode == "train":
            image = image_augmentations(image)

        sentence = clean_sentence(self.data.iloc[idx, 1])

        # Preprocess image according to CLIP
        inputs = self.image_processor(images=image, padding=True, return_tensors="pt")
        image = inputs["pixel_values"].squeeze(0)

        # Tokenize the text
        encoded_dict = self.tokenizer(sentence, padding='max_length', max_length=MAX_LENGTH, truncation=True, return_tensors="pt")
        input_ids = encoded_dict["input_ids"].squeeze(0)
        attention_mask = encoded_dict["attention_mask"].squeeze(0)


        return image, input_ids, attention_mask

In [16]:
class EarlyFusion(nn.Module):
    def __init__(self, encoding_dimension, dense_hidden_size, n_classes, fusion):
        super(EarlyFusion, self).__init__()
        self.clip_model = CLIPModel.from_pretrained(pretraining)

        self.encoding_dimension = encoding_dimension
        self.dense_hidden_size = dense_hidden_size
        self.n_classes = n_classes
        
        self.fusion = fusion
        if self.fusion == "cross":
            self.features_dim = self.encoding_dimension**2
        elif self.fusion == "concat":
            self.features_dim = self.encoding_dimension*2

        self.layer_norm = nn.LayerNorm(self.features_dim)

        # MLP Classifier
        self.classifier = nn.Sequential(
            nn.Linear(self.features_dim, self.dense_hidden_size),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(self.dense_hidden_size, self.dense_hidden_size),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(self.dense_hidden_size, self.n_classes) 
        )

    def forward(self, image, input_ids, attention_mask):
        # Extract image and text features
        text_features = self.clip_model.get_text_features(input_ids=input_ids, attention_mask=attention_mask)
        image_features = self.clip_model.get_image_features(image)
        
        if self.fusion == "cross":
            features = torch.bmm(image_features.unsqueeze(2), text_features.unsqueeze(1))
            features = features.reshape(features.shape[0], -1) 
            norm_output = self.layer_norm(features)
        
        elif self.fusion == "concat":        
            concat_output = torch.cat((text_features, image_features), 1)
            norm_output = self.layer_norm(concat_output)
        
        out = self.classifier(norm_output)

        return out

In [17]:
# Loading images and text form the dataset
# train_dataset = CustomDataset(dataset_path, text_aug_path, mode="train")
# val_dataset = CustomDataset(dataset_path + "val_images", dataset_path + "validation_data_task_2.csv", mode="val")
test_dataset = CustomDataset(dataset_path + "test_images", dataset_path + "test_data.csv", mode="val")

# Defining data loaders
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# Instantiating the multimodal model
multi_model = EarlyFusion(encoding_dimension, dense_hidden_size, n_classes, fusion)

# Train set up
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
multi_model.to(device)
# optimizer = AdamW(multi_model.parameters(), lr)

# Define the number of training steps (epochs * number of batches)
# total_steps = len(train_loader) * n_epochs
# # Create a schedule for the LR update
# scheduler = get_linear_schedule_with_warmup(optimizer,
#                                             num_warmup_steps = 0,
#                                             num_training_steps = total_steps)

# Uncomment for adding class weights
#-------------------------------------
# class_weights = compute_class_weight('balanced', classes=np.unique(train_dataset.data['encoded_labels']), y=train_dataset.data['encoded_labels'])
# print("Class weights: ", class_weights)
# class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)
# criterion = nn.CrossEntropyLoss(weight=class_weights)

# criterion = nn.CrossEntropyLoss()

EarlyFusion(
  (clip_model): CLIPModel(
    (text_model): CLIPTextTransformer(
      (embeddings): CLIPTextEmbeddings(
        (token_embedding): Embedding(49408, 512)
        (position_embedding): Embedding(77, 512)
      )
      (encoder): CLIPEncoder(
        (layers): ModuleList(
          (0-11): 12 x CLIPEncoderLayer(
            (self_attn): CLIPAttention(
              (k_proj): Linear(in_features=512, out_features=512, bias=True)
              (v_proj): Linear(in_features=512, out_features=512, bias=True)
              (q_proj): Linear(in_features=512, out_features=512, bias=True)
              (out_proj): Linear(in_features=512, out_features=512, bias=True)
            )
            (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
            (mlp): CLIPMLP(
              (activation_fn): QuickGELUActivation()
              (fc1): Linear(in_features=512, out_features=2048, bias=True)
              (fc2): Linear(in_features=2048, out_features=512, bias=True

In [18]:
# Evaluación del modelo en el conjunto de prueba
test_predictions = []

# recover model best state
multi_model.load_state_dict(torch.load(results_path + "best_model.pt"))

with torch.no_grad():
      for images, input_ids, attention_masks in test_loader:
        images = images.to(device)
        input_ids = input_ids.to(device)
        attention_masks = attention_masks.to(device)

        outputs = multi_model(images, input_ids, attention_masks)
        _, predicted = torch.max(outputs, 1)

        test_predictions.extend(predicted.cpu().numpy())

print(test_predictions)

# Función para convertir un valor a un vector one-hot
def valor_a_one_hot(valor, num_clases):
    one_hot = np.zeros(num_clases, dtype=int)
    one_hot[valor] = 1
    return one_hot

# Número de clases (en este caso, 3: 0, 1 y 2)
num_clases = 3

# Nombre del archivo CSV de salida
nombre_archivo_csv = str(model_name) + '.csv'

# Abrir el archivo CSV en modo escritura
with open(nombre_archivo_csv, mode='w', newline='') as archivo_csv:
    escritor_csv = csv.writer(archivo_csv)

    # Iterar sobre la lista de valores y escribir cada vector one-hot en el CSV
    for valor in test_predictions:
        vector_one_hot = valor_a_one_hot(valor, num_clases)
        escritor_csv.writerow(vector_one_hot)

print(f'Vectores one-hot escritos en {nombre_archivo_csv}')


[2, 1, 1, 2, 1, 0, 2, 1, 0, 0, 1, 1, 2, 0, 0, 2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 1, 1, 2, 1, 2, 0, 0, 2, 2, 2, 2, 1, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 2, 0, 2, 2, 1, 0, 2, 2, 0, 2, 0, 0, 2, 0, 0, 0, 0, 1, 2, 1, 2, 0, 2, 2, 2, 1, 2, 0, 0, 2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 0, 2, 1, 2, 0, 2, 1, 0, 0, 2, 2, 0, 2, 2, 0, 2, 1, 0, 2, 0, 2, 2, 2, 2, 2, 0, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 0, 0, 1, 2, 0, 2, 2, 2, 2, 0, 2, 2, 1, 2, 1, 2, 2, 2, 1, 2, 2, 0, 2, 2, 2, 1, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 1, 2, 2, 2, 0, 1, 2, 2, 1, 2, 2, 0, 2, 1, 2, 2, 1, 2, 1, 2, 2, 0, 0, 0, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 1, 0, 2, 0, 2, 2, 2, 1, 1, 1, 0, 2, 2, 2, 1, 1, 2, 2, 0, 1, 2, 2, 2, 0, 2, 1, 0, 2, 2, 2, 0, 2, 2, 2, 1, 2, 0, 1, 2, 2, 2, 2, 0, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 1, 2, 0, 0, 2, 2, 2, 1, 2, 0, 1, 2, 2, 0, 2, 0, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 0, 2, 2, 2, 0, 2, 2, 1, 1, 0, 2, 2, 2, 2, 