In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from transformers import AutoTokenizer, AutoModelForMaskedLM
from transformers import RobertaModel, RobertaTokenizer
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import numpy as np
import json
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
# Notebook-wide configuration: dataset locations and the compute device.
train_json_path = './data/annotations_v2/semeval2024_dev_release/subtask2a/train.json'
train_images_folder_path = './data/train_images'
# Prefer the first GPU, but fall back to the CPU so the notebook still runs
# end-to-end on machines without CUDA.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

### Pretrained Text Model

In [None]:
# Step 1: fetch the pre-trained RoBERTa encoder and its matching tokenizer.
# Both are resolved from the same checkpoint name so they stay in sync.
model_name = 'roberta-base'  # or any other pre-trained model
roberta_model = RobertaModel.from_pretrained(model_name)
tokenizer = RobertaTokenizer.from_pretrained(model_name)
# Place the encoder on the configured device; as the last expression of the
# cell this also echoes the module structure.
roberta_model.to(device)

In [None]:
# tokenizer = AutoTokenizer.from_pretrained('xlm-roberta-large')
# xlm_model = AutoModelForMaskedLM.from_pretrained("xlm-roberta-large")

# # tokenizer = AutoTokenizer.from_pretrained('xlm-roberta-base')
# # xlm_model = AutoModelForMaskedLM.from_pretrained("xlm-roberta-base")

# for param in xlm_model.parameters():
#     param.requires_grad = False

# xlm_model.to(device)

# # prepare input
# text = "dummy demo ok"
# encoded_input = tokenizer(text, return_tensors='pt',truncation=True,max_length=100,padding=True)
# encoded_input.to("cuda:0")
# # forward pass
# output = xlm_model(**encoded_input)

# features = torch.mean(output.logits, dim=1)
# features = features.squeeze()


### Pretrained Image Model

In [None]:
# ImageNet-pretrained ResNet-152 used purely as a frozen feature extractor:
# its 1000-way output is what Image_Processor consumes downstream.
resnet_model = torchvision.models.resnet152(weights='DEFAULT')
# Freeze every weight; the backbone is never fine-tuned in this notebook.
resnet_model.requires_grad_(False)
# torchvision constructs models in training mode. Switch to eval mode so the
# BatchNorm layers use their stored running statistics instead of the
# statistics of each single-image batch (and stop updating those statistics).
# NOTE(review): checkpoints trained on train-mode features should be retrained.
resnet_model.eval()
resnet_model.to(device)

### Custom Classes

In [None]:
class customDataset(Dataset):
    """Dataset yielding pre-extracted features and multi-hot targets per sample.

    Each item is a tuple ``(image_features, text_features, gt_tensor)``:

    * ``image_features`` - output of the module-level ``resnet_model`` for the
      sample's image, squeezed.
    * ``text_features``  - mean of the last hidden state of the module-level
      ``roberta_model`` for the sample's text, squeezed.
    * ``gt_tensor``      - float64 multi-hot vector over ``self.all_targets``
      marking the annotated labels plus their ancestor categories.

    Args:
        train_json_path: path of the JSON annotation file (a list of records
            with ``'image'``, ``'text'`` and ``'labels'`` keys).
        train_images_folder_path: folder containing the referenced images.
        device: device on which features are computed.
    """

    # Annotated (leaf) label -> ancestor categories that are switched on with it,
    # listed in the order in which they are appended to the ground truth.
    # Labels that do not appear here are ignored by get_ground_truth.
    _LABEL_ANCESTORS = {
        "Name calling/Labeling": ("Ethos", "Ad Hominem"),
        "Doubt": ("Ethos", "Ad Hominem"),
        "Smears": ("Ethos", "Ad Hominem"),
        "Reductio ad hitlerum": ("Ethos", "Ad Hominem"),
        "Bandwagon": ("Ethos", "Logos", "Justification"),
        "Appeal to authority": ("Ethos", "Logos", "Justification"),
        "Glittering generalities (Virtue)": ("Ethos",),
        "Transfer": ("Ethos", "Pathos"),
        "Appeal to (Strong) Emotions": ("Pathos",),
        "Exaggeration/Minimisation": ("Pathos",),
        "Loaded Language": ("Pathos",),
        "Flag-waving": ("Pathos", "Logos", "Justification"),
        "Appeal to fear/prejudice": ("Pathos", "Logos", "Justification"),
        "Slogans": ("Justification", "Logos"),
        "Repetition": ("Logos",),
        "Obfuscation, Intentional vagueness, Confusion": ("Logos",),
        "Misrepresentation of Someone's Position (Straw Man)": ("Logos", "Distraction", "Reasoning"),
        "Presenting Irrelevant Data (Red Herring)": ("Logos", "Distraction", "Reasoning"),
        "Whataboutism": ("Ethos", "Ad Hominem", "Logos", "Distraction", "Reasoning"),
        "Causal Oversimplification": ("Logos", "Reasoning", "Simplification"),
        "Black-and-white Fallacy/Dictatorship": ("Logos", "Reasoning", "Simplification"),
        "Thought-terminating cliché": ("Logos", "Reasoning", "Simplification"),
    }

    # Indices that __getitem__ silently redirects to _FALLBACK_INDEX.
    # NOTE(review): presumably samples that failed to load during an earlier
    # scan of the dataset - confirm, and prefer filtering them out of the JSON.
    _FAULTY_INDICES = (163, 1687, 1769, 3683)
    _FALLBACK_INDEX = 1000

    def __init__(self, train_json_path , train_images_folder_path , device="cuda:0"):
        self.train_json_data = self.read_json_data(train_json_path)
        # The feature extractors and tokenizer are the notebook-level globals.
        self.pretrained_image_model = resnet_model
        self.pretrained_text_model = roberta_model
        self.tokenizer = tokenizer
        self.train_images_folder_path = train_images_folder_path
        self.transform = transforms.Compose([
                            transforms.ToTensor(),
                            transforms.Lambda(lambda x: x[:3, :, :])  # Keep only the first 3 channels (R, G, B)
                         ])
        self.max_length = 512
        self.device = device
        # Output order of the multi-hot target vector (30 categories).
        self.all_targets = [
                            "Logos",
                            "Repetition",
                            "Obfuscation, Intentional vagueness, Confusion",
                            "Reasoning",
                            "Justification",
                            "Slogans",
                            "Bandwagon",
                            "Appeal to authority",
                            "Flag-waving",
                            "Appeal to fear/prejudice",
                            "Simplification",
                            "Causal Oversimplification",
                            "Black-and-white Fallacy/Dictatorship",
                            "Thought-terminating cliché",
                            "Distraction",
                            "Misrepresentation of Someone's Position (Straw Man)",
                            "Presenting Irrelevant Data (Red Herring)",
                            "Whataboutism",
                            "Ethos",
                            "Glittering generalities (Virtue)",
                            "Ad Hominem",
                            "Doubt",
                            "Name calling/Labeling",
                            "Smears",
                            "Reductio ad hitlerum",
                            "Pathos",
                            "Exaggeration/Minimisation",
                            "Loaded Language",
                            "Transfer",
                            "Appeal to (Strong) Emotions"
                    ]

    def __len__(self):
        """Return the number of annotation records."""
        return len(self.train_json_data)

    def __getitem__(self, idx):
        """Return ``(image_features, text_features, gt_tensor)`` for record ``idx``."""
        # Known-bad indices are replaced by a fixed fallback sample.
        if idx in self._FAULTY_INDICES:
            idx = self._FALLBACK_INDEX

        record = self.train_json_data[idx]
        image_name = record['image']
        text_content = record['text']
        labels = record['labels']

        gt_tensor = self.convert_gt_to_tensor(self.get_ground_truth(labels))
        image_tensor = self.read_image(image_name)
        # The image backbone is only a feature extractor: never track gradients.
        with torch.no_grad():
            image_features = self.pretrained_image_model(image_tensor.to(self.device)).squeeze()
        text_features = self.get_text_tensor(text_content)
        return image_features , text_features , gt_tensor

    def read_image(self,image_name):
        """Load an image as a ``(1, 3, H, W)`` float tensor.

        The image is converted to RGB first so that grayscale, palette, RGBA
        and CMYK files all produce the three channels the backbone expects.
        """
        # The context manager releases the underlying file handle promptly.
        with Image.open(f"{self.train_images_folder_path}/{image_name}") as image:
            image_tensor = self.transform(image.convert("RGB"))
        # Add the leading batch dimension.
        return image_tensor.unsqueeze(0)

    def get_text_tensor(self,text):
        """Encode ``text`` and return the mean of the last hidden state, squeezed."""
        tokens = self.tokenizer(text, truncation=True, padding='max_length', max_length=self.max_length, return_tensors='pt')
        tokens = tokens.to(self.device)
        with torch.no_grad():
            outputs = self.pretrained_text_model(**tokens)

        # NOTE(review): the mean runs over all max_length positions, padding
        # included. An attention-mask-weighted mean is probably what is wanted,
        # but changing it would alter the features existing checkpoints saw.
        features = outputs.last_hidden_state.mean(dim=1)
        return features.squeeze()

    def convert_gt_to_tensor(self,gt):
        """Turn a list of category names into a float64 multi-hot vector.

        Args:
            gt: category names, each of which must be in ``self.all_targets``.

        Returns:
            1-D float64 tensor of length ``len(self.all_targets)``.

        Raises:
            ValueError: if a name in ``gt`` is not in ``self.all_targets``.
        """
        categories = self.all_targets
        target = torch.zeros(len(categories), dtype=torch.float64)
        for label in gt:
            target[categories.index(label)] = 1
        return target

    def get_ground_truth(self,labels):
        """Expand annotated labels with their ancestor categories.

        Args:
            labels: iterable of annotated label names.

        Returns:
            List of unique category names in first-seen order: each recognised
            label followed by its ancestors. Labels missing from the ancestor
            table are skipped.
        """
        ground_truth = {}
        for label in labels:
            ancestors = self._LABEL_ANCESTORS.get(label)
            if ancestors is None:
                continue
            ground_truth[label] = 1
            for ancestor in ancestors:
                ground_truth[ancestor] = 1
        return list(ground_truth)

    def read_json_data(self,file_path):
        """Parse the UTF-8 JSON file at ``file_path`` and return its content."""
        # Explicit UTF-8: the label names contain non-ASCII characters, and the
        # platform default encoding is not UTF-8 everywhere.
        with open(file_path, encoding="utf-8") as f:
            return json.load(f)

In [None]:
# Build the dataset on the configured device instead of relying on the
# class's hard-coded "cuda:0" default, so features are computed where the
# pretrained models were placed.
train_dataset = customDataset(train_json_path, train_images_folder_path, device=device)
# Shuffled batches of 500 samples; every item triggers a ResNet and a RoBERTa
# forward pass, so drawing a batch is expensive.
train_dataloader = DataLoader(train_dataset, batch_size=500, shuffle=True)

In [None]:
class Text_Processor(nn.Module):
    """Two-layer MLP mapping a 768-d text feature to a 500-d embedding.

    Args:
        device: device on which the linear layers are allocated.
    """

    def __init__(self , device):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept stable.
        self.fc1 = nn.Linear(in_features=768, out_features=1000, device=device)
        self.fc2 = nn.Linear(in_features=1000, out_features=500, device=device)
        self.dp = nn.Dropout(p=0.2)

    def forward(self,x):
        """Apply fc1 -> ReLU -> dropout -> fc2 -> ReLU to ``x``."""
        hidden = self.dp(F.relu(self.fc1(x)))
        return F.relu(self.fc2(hidden))

class Image_Processor(nn.Module):
    """Two-layer MLP mapping a 1000-d image feature to a 500-d embedding.

    Args:
        device: device on which the linear layers are allocated.
    """

    def __init__(self , device):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept stable.
        self.fc1 = nn.Linear(in_features=1000, out_features=1000, device=device)
        self.fc2 = nn.Linear(in_features=1000, out_features=500, device=device)
        self.dp = nn.Dropout(p=0.2)

    def forward(self,x):
        """Apply fc1 -> ReLU -> dropout -> fc2 -> ReLU to ``x``."""
        hidden = self.dp(F.relu(self.fc1(x)))
        return F.relu(self.fc2(hidden))

class Text_Image_Processor(nn.Module):
    """Fuse image and text features into 30 per-label probabilities.

    The image and text features are first projected to 500-d each by
    ``Image_Processor`` and ``Text_Processor``, concatenated along the feature
    axis, and passed through a four-layer MLP ending in a sigmoid.

    Args:
        device: device on which all layers are allocated.
    """

    def __init__(self , device):
        super().__init__()
        self.text_processor = Text_Processor(device)
        self.image_processor = Image_Processor(device)
        self.fc1 = nn.Linear(1000, 500 , device=device)
        self.fc2 = nn.Linear(500, 250 , device=device)
        self.fc3 = nn.Linear(250, 250 , device=device)
        self.fc4 = nn.Linear(250,30 , device=device)
        self.dp = nn.Dropout(p=0.2)

    def forward(self,image_feature , text_feature):
        """Return per-label probabilities in [0, 1].

        Args:
            image_feature: tensor whose last dimension is 1000; either a single
                sample ``(1000,)`` or a batch ``(B, 1000)``.
            text_feature: tensor whose last dimension is 768, with the same
                leading dimensions as ``image_feature``.

        Returns:
            Tensor of shape ``(30,)`` or ``(B, 30)``.
        """
        i_f = self.image_processor(image_feature)
        t_f = self.text_processor(text_feature)
        # Concatenate along the feature axis. Joining along dim 0 (the default)
        # only works for unbatched 1-D inputs; for a batch it would stack the
        # samples instead of fusing the two 500-d embeddings into 1000-d.
        c_f = torch.cat((i_f, t_f), dim=-1)
        x = F.relu(self.fc1(c_f))
        x = F.relu(self.fc2(x))
        x = self.dp(x)
        x = F.relu(self.fc3(x))
        # torch.sigmoid replaces the deprecated F.sigmoid.
        return torch.sigmoid(self.fc4(x))

In [None]:
# Allocate the fusion model on the notebook's configured device rather than a
# hard-coded "cuda:0", so it lives next to the feature extractors.
TI_P = Text_Image_Processor(device)

In [None]:
# Bare expression: echoes the module tree of TI_P as this cell's output.
TI_P

In [None]:
# Loss: binary cross-entropy applied to the model's sigmoid outputs, one
# independent yes/no decision per target category.
criterion = nn.BCELoss()

# Optimizer: Adam over every parameter of the fusion model.
optimizer = torch.optim.Adam(params=TI_P.parameters(), lr=1e-3)

# Report how many samples the training set holds.
print(len(train_dataset))

In [None]:
# Resume from the last saved checkpoint. map_location remaps the stored
# tensors onto the configured device, so a checkpoint written on a GPU can
# also be opened on a machine where that GPU is not available.
TI_P.load_state_dict(torch.load("./model_v3.pt", map_location=device))
# Back to training mode (enables dropout); also echoes the module tree.
TI_P.train()

In [None]:
# Training loop: per-sample SGD steps over one shuffled batch per epoch.
epochs = 20
checkpoint_every = 20  # write a checkpoint after every N completed epochs

for epoch in range(epochs):
    running_loss = 0.0
    samples_trained = 0   # samples whose optimisation step succeeded
    samples_failed = 0    # samples skipped because of an error
    first_error = None    # kept so failures are reported, not hidden

    # NOTE(review): a fresh iterator is created every epoch and only its first
    # batch is consumed, so an "epoch" here is one random batch of the
    # dataloader, not a full pass over the dataset - confirm this is intended.
    try:
        image_batch, text_batch, target_batch = next(iter(train_dataloader))
    except Exception as exc:
        # Best effort: a sample that cannot be loaded must not abort training,
        # but say why. `Exception` (unlike a bare except) lets Ctrl-C through.
        print(f"Missed an epoch ({type(exc).__name__}: {exc})")
        continue

    for input_image, input_text, target in zip(image_batch, text_batch, target_batch):
        try:
            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            output = TI_P(input_image, input_text)
            loss = criterion(output.float().to("cpu"), target.float().to("cpu"))

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
        except Exception as exc:
            samples_failed += 1
            if first_error is None:
                first_error = exc
            continue

        running_loss += loss.item()
        samples_trained += 1

    if samples_failed:
        print(f"Epoch # {epoch}: skipped {samples_failed} sample(s); "
              f"first error: {type(first_error).__name__}: {first_error}")

    # Average over the samples that actually contributed to the loss instead
    # of assuming that all 500 samples of the batch succeeded.
    average_loss = running_loss / samples_trained if samples_trained else float("nan")
    print(f"Epoch # {epoch} has average running loss = {average_loss}")

    # Checkpoint after every `checkpoint_every` completed epochs. The previous
    # `epoch % 20 == 0` test only fired right after the very first epoch.
    if (epoch + 1) % checkpoint_every == 0:
        torch.save(TI_P.state_dict(), "./model_v3.pt")

print("Finished Training")

In [None]:
# Persist the current weights of TI_P to the checkpoint file.
torch.save(TI_P.state_dict(), "./model_v3.pt")
# Disabled diagnostic: walks every dataset index and collects in `faults` the
# indices whose loading raises an exception.
# faults = []
# for x in range(0,len(train_dataset)):
#     try:
#         a,b,c = train_dataset[x]
#     except:
#         print("error",x)
#         faults.append(x)

In [None]:
# train_dataset.train_json_data[163]

In [None]:
# all_targets = [
#     "Logos",
#     "Repetition",
#     "Obfuscation, Intentional vagueness, Confusion",
#     "Reasoning",
#     "Justification",
#     "Slogans",
#     "Bandwagon",
#     "Appeal to authority",
#     "Flag-waving",
#     "Appeal to fear/prejudice",
#     "Simplification",
#     "Causal Oversimplification",
#     "Black-and-white Fallacy/Dictatorship",
#     "Thought-terminating cliché",
#     "Distraction",
#     "Misrepresentation of Someone's Position (Straw Man)",
#     "Presenting Irrelevant Data (Red Herring)",
#     "Whataboutism",
#     "Ethos",
#     "Glittering generalities (Virtue)",
#     "Ad Hominem",
#     "Doubt",
#     "Name calling/Labeling",
#     "Smears",
#     "Reductio ad hitlerum",
#     "Pathos",
#     "Exaggeration/Minimisation",
#     "Loaded Language",
#     "Transfer",
#     "Appeal to (Strong) Emotions"
# ]

In [None]:
# labels =  [
#             "Smears",
#             "Misrepresentation of Someone's Position (Straw Man)"
#         ]

In [None]:
# ground_truth = {}
# for label in labels:
#     if(label in ["Name calling/Labeling" , "Doubt" , "Smears" , "Reductio ad hitlerum"]):
#         ground_truth[label] = 1
#         ground_truth["Ethos"] = 1
#         ground_truth["Ad Hominem"] = 1

#     if(label in ["Bandwagon","Appeal to authority"]):
#         ground_truth[label] = 1
#         ground_truth["Ethos"] = 1
#         ground_truth["Logos"] = 1
#         ground_truth["Justification"] = 1

#     if(label in ["Glittering generalities (Virtue)"]):
#         ground_truth[label] = 1
#         ground_truth["Ethos"] = 1

#     if(label in ["Transfer"]):
#         ground_truth[label] = 1
#         ground_truth["Ethos"] = 1
#         ground_truth["Pathos"] = 1

#     if(label in ["Appeal to (Strong) Emotions","Exaggeration/Minimisation","Loaded Language"]):
#         ground_truth[label] = 1
#         ground_truth["Pathos"] = 1

#     if(label in ["Flag-waving","Appeal to fear/prejudice"]):
#         ground_truth[label]=1
#         ground_truth["Pathos"] = 1
#         ground_truth["Logos"] = 1
#         ground_truth["Justification"] = 1

#     if(label in ["Slogans"]):
#         ground_truth[label] = 1
#         ground_truth["Justification"] = 1
#         ground_truth["Logos"] = 1

#     if(label in ["Repetition","Obfuscation, Intentional vagueness, Confusion"]):
#         ground_truth[label] = 1
#         ground_truth["Logos"] = 1

#     if(label in ["Misrepresentation of Someone's Position (Straw Man)","Presenting Irrelevant Data (Red Herring)"]):
#         ground_truth[label] = 1
#         ground_truth["Logos"] = 1
#         ground_truth["Distraction"] = 1
#         ground_truth["Reasoning"] = 1

#     if(label in ["Whataboutism"]):
#         ground_truth[label] = 1
#         ground_truth["Ethos"] = 1
#         ground_truth["Ad Hominem"] = 1
#         ground_truth["Logos"] = 1
#         ground_truth["Distraction"] = 1
#         ground_truth["Reasoning"] = 1

#     if(label in ["Causal Oversimplification", "Black-and-white Fallacy/Dictatorship", "Thought-terminating cliché"]):
#         ground_truth[label] = 1
#         ground_truth["Logos"] = 1
#         ground_truth["Reasoning"] = 1
#         ground_truth["Simplification"] = 1

In [None]:
# gt = list(ground_truth.keys())
# gt

In [None]:
# import numpy as np

# # Define your categories
# categories = all_targets
# num_categories = len(categories)

# # Define your ground truth for multi-label classification
# multi_label_ground_truth = [gt]

# # Create a tensor for multi-label classification
# tensor = np.zeros((len(multi_label_ground_truth), num_categories))
# for i, labels in enumerate(multi_label_ground_truth):
#     indices = [categories.index(label) for label in labels]
#     tensor[i, indices] = 1

# print(tensor)