<a href="https://colab.research.google.com/github/suman101112/Hate-Speech-Detection-on-Code-Mixed-Dataset-using-a-Fusion-of-Custom-and-Pre-Trained-models-with-Pro/blob/main/(MAIN%20PROGRAM)%20fusion%20of%20mbert%20%2B%20custom-xlmr%20with%20profanity%20vector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
from torchtext import data
from torchtext import datasets
import random
import numpy as np
import time
from sklearn.metrics import classification_report,accuracy_score,f1_score


# Single seed value shared by every random number generator used in this notebook.
SEED = 1234

# Seed PyTorch, NumPy and Python's `random` module with that value.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
# Request deterministic cuDNN kernels.
torch.backends.cudnn.deterministic = True

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
#reading data

import pandas as pd
from glob import glob

# Paths of the transliterated Tamil train / dev / test files.
file_train = "/content/tamil_train_transliterated.csv"
file_dev = "/content/tamil_off_dev_transliterated.csv"
file_test = "/content/tamil_off_test_transliterated.csv"

import csv

# Train and dev are read as tab-separated, test as comma-separated. Quote characters are
# kept as ordinary data (QUOTE_NONE), only the first two columns are loaded, and rows
# with a missing value are dropped straight away.
df_train = pd.read_csv(
    file_train, sep="\t", usecols=[0,1], quoting=csv.QUOTE_NONE, encoding='utf-8'
).dropna()
df_dev = pd.read_csv(
    file_dev, sep="\t", usecols=[0,1], quoting=csv.QUOTE_NONE, encoding='utf-8'
).dropna()
df_test = pd.read_csv(
    file_test, sep=",", usecols=[0,1], quoting=csv.QUOTE_NONE, encoding='utf-8'
).dropna()

# Train and dev are addressed through their 'text' / 'label' column names.
train_sentences, train_labels = list(df_train['text'].values), list(df_train['label'].values)
dev_sentences, dev_labels = list(df_dev['text'].values), list(df_dev['label'].values)

# The test split is addressed by position instead: column 0 holds the sentences and
# column 1 the labels (the labels stay a NumPy array at this point).
test = df_test.values
test_sentences, test_labels = list(test[:,0]), test[:,1]

def clear_labels(labels_list):
  """Return a new list holding a cleaned copy of every label string.

  Each label has its newline and double-quote characters removed and its
  surrounding whitespace stripped; the spelling 'not-Kannada' is then
  rewritten to 'not-kannada'. The input sequence is left untouched.
  """
  def _clean(raw_label):
    label = raw_label.replace("\n","").replace("\"","").strip()
    return 'not-kannada' if label == 'not-Kannada' else label

  return [_clean(raw_label) for raw_label in labels_list]

# Run the same label clean-up over the train, dev and test labels.
train_labels, dev_labels, test_labels = map(
    clear_labels, (train_labels, dev_labels, test_labels)
)

# Show the distinct label strings left in the training split.
print(set(train_labels))

{'Offensive_Targeted_Insult_Group', 'Offensive_Untargetede', 'Offensive_Targeted_Insult_Individual', 'not-Tamil', 'Offensive_Targeted_Insult_Other', 'Not_offensive'}


In [None]:
!pip install transformers



In [None]:
from transformers import BertTokenizer, BertForSequenceClassification
# mBERT tokenizer; note that later cells rebind the name `tokenizer`.
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')
# mBERT with a 6-class sequence-classification head, moved onto the selected device.
model_mbert = BertForSequenceClassification.from_pretrained(
    'bert-base-multilingual-cased',
    num_labels=6,
).to(device)

In [None]:
!pip install sentencepiece

In [None]:
from transformers import XLMRobertaTokenizerFast, XLMRobertaForSequenceClassification
# Tokenizer from the public 'xlm-roberta-base' checkpoint.
tokenizer = XLMRobertaTokenizerFast.from_pretrained('xlm-roberta-base')
# The classifier weights are read from the local '../../CM_bert/' directory rather than
# from the public checkpoint; the head has 6 output classes. The model is then moved
# onto the selected device.
model_cm_xlmr = XLMRobertaForSequenceClassification.from_pretrained(
    '../../CM_bert/',
    num_labels=6,
).to(device)

In [None]:
#building tokenizer models

from transformers import AutoTokenizer, AutoModel

# Checkpoints used as feature extractors. Position matters: entry i of `models`
# is fed with the encodings produced by entry i of `tokenizers`.
model_names = [
    'bert-base-multilingual-cased',
    '../../CM_bert/'
]
tokenizers = [
    BertTokenizer.from_pretrained('bert-base-multilingual-cased'),
    XLMRobertaTokenizerFast.from_pretrained('xlm-roberta-base'),
]

models = []
for name in model_names:
    model = AutoModel.from_pretrained(name)
    # Switch the encoder to evaluation mode ...
    model.eval()
    # ... and exclude every one of its weights from gradient computation.
    for param in model.parameters():
        param.requires_grad = False
    models.append(model)

In [None]:
# Number of backbone encoders in `models`; train(), evaluate() and test() loop over this many.
n_models = len(models)

In [None]:
from sklearn import preprocessing

# Learn the label-string -> integer-id mapping from the training labels only.
le = preprocessing.LabelEncoder().fit(train_labels)

# Encode every split with that one mapping.
encoded_train_labels, encoded_dev_labels, encoded_test_labels = (
    le.transform(split_labels)
    for split_labels in (train_labels, dev_labels, test_labels)
)

# Show the ids present in the training split and the class names behind them.
print(set(encoded_train_labels))
print(le.classes_)

In [None]:
# Sanity check: show the first ten training sentences.
print(train_sentences[:10])

In [None]:
# Print each tokenizer's repr for inspection.
# Note: this loop rebinds the module-level name `tokenizer` to the last entry of `tokenizers`.
for tokenizer in tokenizers:
  print(tokenizer)

In [None]:
# Encode each split once per tokenizer. Every call pads/truncates to exactly 64 tokens
# and returns PyTorch tensors; list position i corresponds to tokenizers[i].
train_tokenized = [
    tok(train_sentences, max_length=64, truncation=True, padding='max_length', return_tensors="pt")
    for tok in tokenizers
]
dev_tokenized = [
    tok(dev_sentences, max_length=64, truncation=True, padding='max_length', return_tensors="pt")
    for tok in tokenizers
]
test_tokenized = [
    tok(test_sentences, max_length=64, truncation=True, padding='max_length', return_tensors="pt")
    for tok in tokenizers
]

# From here on the *_labels names hold tensors of integer class ids
# (they previously held the cleaned label strings).
train_labels = torch.tensor(encoded_train_labels)
dev_labels = torch.tensor(encoded_dev_labels)
test_labels = torch.tensor(encoded_test_labels)

In [None]:
from torch.utils.data import Dataset

class fusion_Dataset(Dataset):
    """Dataset that serves the tokenized inputs of several encoders side by side.

    Args:
        data: Sequence with one mapping per encoder (for example the result of a
            tokenizer call). Every mapping must contain ``'input_ids'`` and each of
            its values must be indexable by sample position.
        labels: Optional per-sample targets, indexable by sample position. When it
            is ``None`` the returned items simply carry no ``'labels'`` entry.

    Each item is a dict holding, for encoder ``i``, every field of ``data[i]`` under
    the key ``'<field>_<i>'``, plus ``'index'`` (the sample position) and, when
    labels were given, ``'labels'``.
    """

    def __init__(self, data, labels = None):
        self.data = data
        self.labels = labels
        # One feature group per encoder, derived from `data` rather than fixed at 2.
        self.n_models = len(data)

    @staticmethod
    def _as_tensor(value):
        """Return `value` as a fresh tensor without re-wrapping existing tensors."""
        if torch.is_tensor(value):
            # torch.tensor(existing_tensor) emits a UserWarning on every call;
            # clone().detach() is the copy PyTorch recommends instead.
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        item = {}
        for i in range(self.n_models):
            for key, val in self.data[i].items():
                item[key+'_'+str(i)] = self._as_tensor(val[idx])
        # The position is returned so callers can look up side features (e.g. the
        # profanity count vectors) for exactly the samples in a shuffled batch.
        item['index'] = idx
        if self.labels is not None:
            item['labels'] = self._as_tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.data[0]['input_ids'])

# Wrap each split's per-tokenizer encodings together with its label tensor.
train_dataset, dev_dataset, test_dataset = (
    fusion_Dataset(encodings, targets)
    for encodings, targets in (
        (train_tokenized, train_labels),
        (dev_tokenized, dev_labels),
        (test_tokenized, test_labels),
    )
)

In [None]:
# Sanity check: show the item dict produced for the first training sample.
print(train_dataset[0])

In [None]:
import torch.nn.functional as F
import torch.nn as nn

# Fully-Connected (Linear and BatchNorm with relu activation)
class FC(nn.Module):
    """Linear layer followed by 1-D batch normalisation and a ReLU.

    Args:
        in_channels: Size of each input feature vector.
        out_channels: Size of each output feature vector.
        **kwargs: Forwarded unchanged to ``nn.Linear``.
    """

    def __init__(self, in_channels, out_channels, **kwargs):
        # The parent initialiser must be called through this class; naming any
        # other (undefined) class here raises NameError on instantiation.
        super(FC, self).__init__()
        self.fc = nn.Linear(in_channels, out_channels, **kwargs)
        self.bn = nn.BatchNorm1d(out_channels, eps=0.003)

    def forward(self, x):
        """Apply linear -> batch norm -> ReLU and return the activated tensor."""
        x = self.fc(x)
        x = self.bn(x)
        # In-place ReLU overwrites the batch-norm output instead of allocating a new tensor.
        return F.relu(x, inplace=True)

class FusionNet(torch.nn.Module):
    """Three stacked FC blocks followed by a bias-free linear output layer.

    Args:
        D_in: Width of the input feature vector.
        H1, H2, H3: Output widths of the first, second and third FC block.
        D_out: Number of output logits.

    Note: the training cell of this notebook instantiates ``fusionNN``, not this class.
    """

    def __init__(self, D_in, H1, H2, H3, D_out):
        super().__init__()
        self.linear1_1 = FC(D_in, H1)
        self.linear1_2 = FC(H1, H2)
        self.linear1_3 = FC(H2, H3)
        self.dp = nn.Dropout(0.1)
        self.linear2 = nn.Linear(H3, D_out, bias = False)

    def forward(self, x):
        """Return the output logits for the feature batch `x`."""
        hidden = self.linear1_1(x)
        # Dropout follows the second and third block only, not the first.
        for block in (self.linear1_2, self.linear1_3):
            hidden = self.dp(block(hidden))
        return self.linear2(hidden)

In [None]:
from transformers import AdamW
# Cross-entropy criterion averaged over the batch; train(), evaluate() and test()
# read this module-level name `loss`.
loss = nn.CrossEntropyLoss(reduction='mean').float()

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from scipy.sparse import csr_matrix
# Read the profanity lexicon, one entry per line of 'bad-words.txt'. The context
# manager guarantees the file handle is closed, and the encoding is stated explicitly
# (UTF-8, matching the CSV reads) instead of depending on the platform default.
with open('bad-words.txt', encoding='utf-8') as fin:
    hate_speech_lexicon = [line.replace("\n","") for line in fin]
print(hate_speech_lexicon[:50])

# Count vectoriser restricted to the lexicon: the vocabulary is fixed up front, so
# transform() can be called directly and yields one count column per distinct entry.
profanity_vector = CountVectorizer(vocabulary=set(hate_speech_lexicon))

# Dense (n_sentences, n_lexicon_entries) count matrices, one per split.
train_profanity_vector = profanity_vector.transform(train_sentences).toarray()
dev_profanity_vector = profanity_vector.transform(dev_sentences).toarray()
test_profanity_vector = profanity_vector.transform(test_sentences).toarray()

In [None]:
import torch.nn.functional as F
import torch.nn as nn

class FC(nn.Module):
    """Fully-connected building block: linear -> 1-D batch norm -> ReLU.

    Args:
        in_channels: Size of each input feature vector.
        out_channels: Size of each output feature vector.
        **kwargs: Passed through unchanged to ``nn.Linear``.
    """

    def __init__(self, in_channels, out_channels, **kwargs):
        super().__init__()
        self.fc = nn.Linear(in_channels, out_channels, **kwargs)
        self.bn = nn.BatchNorm1d(out_channels, eps=0.001)

    def forward(self, x):
        """Return the ReLU-activated, batch-normalised linear projection of `x`."""
        # The ReLU runs in place on the batch-norm output.
        return F.relu(self.bn(self.fc(x)), inplace=True)

class fusionNN(torch.nn.Module):
    """Classifier head applied to the concatenated fusion feature vector.

    Args:
        Embed_dim: Width of the input feature vector.
        hidden_dim_1, hidden_dim_2, hidden_dim_3: Output widths of the three FC blocks.
        out_dim: Number of output logits.
    """

    def __init__(self, Embed_dim, hidden_dim_1, hidden_dim_2, hidden_dim_3, out_dim):
        super().__init__()
        self.linear1 = FC(Embed_dim, hidden_dim_1)
        self.linear2 = FC(hidden_dim_1, hidden_dim_2)
        self.linear3 = FC(hidden_dim_2, hidden_dim_3)
        self.dropout = nn.Dropout(0.1)
        self.linear_out = nn.Linear(hidden_dim_3, out_dim, bias = False)

    def forward(self, x):
        """Return the raw (un-normalised) class logits for the feature batch `x`."""
        hidden = self.linear1(x)
        # Dropout is applied after the second and third block, not after the first.
        for block in (self.linear2, self.linear3):
            hidden = self.dropout(block(hidden))
        return self.linear_out(hidden)

In [None]:
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
from sklearn.metrics import classification_report, f1_score

# Width of the fused feature vector: 2 x 768 for the two encoder outputs (assumes each
# encoder yields a 768-wide pooled vector) plus one count per profanity-lexicon entry.
emb_dim = 2*768 + train_profanity_vector.shape[1]
print(emb_dim)

# Fusion classifier: hidden widths 1024 -> 256 -> 64, one output logit per label class.
fusion_model = fusionNN(emb_dim, 1024, 256, 64, len(le.classes_))

# Only the fusion classifier's parameters are handed to the optimizer.
optimizer = AdamW(fusion_model.parameters(), lr=1e-5)

# Move the classifier and every backbone encoder onto the selected device.
fusion_model.to(device)
for model in models:
    model.to(device)

best_val_f1, count = 0, 0

# Batches of 128 samples; only the training split is shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)
dev_loader = DataLoader(dev_dataset, shuffle=False, batch_size=128)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=128)

In [None]:
# Show the layer structure of the fusion classifier.
print(fusion_model)

In [None]:
from tqdm import tqdm
def train():
  """Run one training epoch of the fusion classifier over `train_loader`.

  For every batch, each backbone encoder in `models` produces its second output
  (``outputs[1]``); these are concatenated with the batch's rows of
  `train_profanity_vector` and fed to `fusion_model`, whose parameters are then
  updated by `optimizer` on the cross-entropy `loss`.

  Returns:
      float: mean loss over the batches of this epoch (0.0 for an empty loader).
  """
  total_train_loss = 0.0
  fusion_model.train()
  for batch in tqdm(train_loader):
      optimizer.zero_grad()
      # The labels do not depend on the encoder, so they are transferred once per batch.
      labels = batch['labels'].to(device)
      outputs_all = []
      # The optimizer only holds fusion_model's parameters, so no gradient is ever
      # needed through the backbone encoders; skipping autograd there saves memory.
      with torch.no_grad():
          for i in range(n_models):
              input_ids = batch['input_ids'+'_'+str(i)].to(device)
              attention_mask = batch['attention_mask'+'_'+str(i)].to(device)
              outputs = models[i](input_ids, attention_mask=attention_mask)
              outputs_all.append(outputs[1])
      # batch['index'] holds each sample's dataset position, which selects the matching
      # profanity count rows even though the loader shuffles.
      outputs_all.append(torch.Tensor(train_profanity_vector[batch['index'], :]).to(device))
      bert_models_output = torch.cat(outputs_all, dim = -1)
      out = fusion_model(bert_models_output)
      loss_value = loss(out, labels)
      loss_value.backward()
      optimizer.step()
      total_train_loss += loss_value.item()
  # max(..., 1) avoids a division by zero when the loader yields no batch.
  return total_train_loss / max(len(train_loader), 1)

In [None]:
def evaluate():
  """Score the fusion classifier on the dev split.

  Runs `fusion_model` in evaluation mode over `dev_loader` without gradients, using
  the same feature construction as training (encoder outputs concatenated with the
  rows of `dev_profanity_vector`). Prints the weighted F1 score exactly as computed.

  Returns:
      tuple: ``(total_val_loss, macro_f1)`` where ``total_val_loss`` is the mean
      per-batch loss and ``macro_f1`` the macro-averaged F1 against
      `encoded_dev_labels`.
  """
  preds = []
  fusion_model.eval()
  total_val_loss = 0
  with torch.no_grad():
      for batch in tqdm(dev_loader):
          # The labels do not depend on the encoder, so they are transferred once per batch.
          labels = batch['labels'].to(device)
          outputs_all = []
          for i in range(n_models):
              input_ids = batch['input_ids'+'_'+str(i)].to(device)
              attention_mask = batch['attention_mask'+'_'+str(i)].to(device)
              outputs = models[i](input_ids, attention_mask=attention_mask)
              outputs_all.append(outputs[1])
          outputs_all.append(torch.Tensor(dev_profanity_vector[batch['index'], :]).to(device))

          bert_models_output = torch.cat(outputs_all, dim = -1)
          out = fusion_model(bert_models_output)
          loss_value = loss(out, labels)
          # Each batch contributes 1/len(dev_loader), so the sum is the mean batch loss.
          total_val_loss += loss_value.item()/len(dev_loader)

          # Predicted class = index of the largest logit in each row.
          preds.extend(np.argmax(out.cpu().numpy(), axis=1).tolist())

  y_true = encoded_dev_labels
  y_pred = preds
  # The score is reported unscaled: a metric must not be multiplied by any factor.
  weighted_f1 = f1_score(y_true,y_pred,average="weighted")
  print("Weighted F1 score is:", weighted_f1)
  macro_f1 = f1_score(y_true, y_pred, average='macro')
  return total_val_loss,macro_f1

In [None]:
def test():
  """Score the fusion classifier on the test split.

  Runs `fusion_model` in evaluation mode over `test_loader` without gradients, using
  the same feature construction as training (encoder outputs concatenated with the
  rows of `test_profanity_vector`), and prints the weighted F1 score.

  NOTE(review): this scores the live `fusion_model`, not the `best_model` snapshot
  kept by the training loop - confirm that is the intended model.

  Returns:
      tuple: ``(total_test_loss, weighted_f1)`` where ``total_test_loss`` is the mean
      per-batch loss and ``weighted_f1`` the weighted F1 against
      `encoded_test_labels`.
  """
  preds = []
  fusion_model.eval()
  total_test_loss = 0
  with torch.no_grad():
      for batch in tqdm(test_loader):
          # The labels do not depend on the encoder, so they are transferred once per batch.
          labels = batch['labels'].to(device)
          outputs_all = []
          for i in range(n_models):
              input_ids = batch['input_ids'+'_'+str(i)].to(device)
              attention_mask = batch['attention_mask'+'_'+str(i)].to(device)
              outputs = models[i](input_ids, attention_mask=attention_mask)
              outputs_all.append(outputs[1])
          outputs_all.append(torch.Tensor(test_profanity_vector[batch['index'], :]).to(device))

          bert_models_output = torch.cat(outputs_all, dim = -1)
          out = fusion_model(bert_models_output)
          # The result needs its own name: assigning to `loss` inside this function
          # would make `loss` local and raise UnboundLocalError on the call itself.
          loss_value = loss(out, labels)
          total_test_loss += loss_value.item()/len(test_loader)

          # Predicted class = index of the largest logit in each row.
          preds.extend(np.argmax(out.cpu().numpy(), axis=1).tolist())

  y_true = encoded_test_labels
  y_pred = preds
  weighted_f1 = f1_score(y_true,y_pred,average="weighted")
  print("Weighted F1 score is:",weighted_f1)
  return total_test_loss,weighted_f1

In [None]:
import copy
# Train for at most 10 epochs, keeping the weights with the best dev macro-F1 and
# stopping early after 5 consecutive epochs without improvement.
best_f1=0.0
model_name = 'fusion_tamil'
count = 0
for epoch in range(10):
  train()
  total_val_loss,macro_f1 = evaluate()
  if macro_f1 > best_f1:
    # New best: persist the weights to '<model_name>.pth' and keep an in-memory copy.
    PATH = model_name + '.pth'
    torch.save(fusion_model.state_dict(), PATH)
    # The threshold compared above must be the value updated here; otherwise every
    # epoch counts as an improvement and early stopping can never trigger.
    best_f1 = macro_f1
    # `best_val_f1` is kept in sync for any cell that reads that name.
    best_val_f1 = macro_f1
    best_model = copy.deepcopy(fusion_model)
    count = 0
  else:
    count += 1
  if count == 5:
    print("No increase for 5 epochs, Stopping ...")
    break