<a href="https://www.kaggle.com/code/sivamanivallabhani/bertfinetuning?scriptVersionId=175467008" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
# Regular imports
import matplotlib.pyplot as plt
import torch
from torch import nn
import numpy as np 
device = "cuda" if torch.cuda.is_available() else "cpu"

# Import torch.nn.functional as F
import torch.nn.functional as F

In [3]:
# Define data path
data_path = "/kaggle/input/davidson1/labeled_data.csv"


In [4]:
# Load data
import pandas as pd
data = pd.read_csv(data_path)
df = pd.DataFrame(data)
df.columns = ['Unnamed: 0', 'count', 'hate_speech', 'offensive_language', 'neither',
              'classk', 'tweet']
df["classk"] = df["classk"].map({2:1, 1:0, 0:0})  # Convert labels to binary (hate vs not hate)
tweets = df.tweet
res = []


In [5]:
# Process tweets
for i in range(len(tweets)):
    splitt = tweets[i].split(":")
    if len(splitt) == 1:
        res.append(splitt[0])
    else:
        res.append(splitt[-1])

tweets = res

In [6]:

# Import tokenizer and BERT model
from transformers import AutoTokenizer, BertModel
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = BertModel.from_pretrained("bert-base-uncased")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [8]:
# Define custom dataset class
class Custom_Dataset(torch.utils.data.Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
        
    def __getitem__(self, index: int) -> (torch.Tensor, int):
        X = self.data[index]
        y = self.labels[index]
        return (X, y)
    
    def __len__(self) -> int:
        return len(self.data)


In [9]:
# Split data into train and test sets
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(tweets, list(df.classk), test_size=0.2, shuffle=True)

# Define train and test datasets and dataloaders
train_data = Custom_Dataset(data=X_train, labels=y_train)
test_data = Custom_Dataset(data=X_test, labels=y_test)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=3, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=1, shuffle=False)

In [10]:
class Model0(nn.Module):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.tokenizer = tokenizer
        self.model = model
        self.fc1 = nn.Linear(768, 256)  # Additional layer for fine-tuning
        self.fc2 = nn.Linear(256, 2)     # Output layer

    def forward(self, X):
        inputs = self.tokenizer(X, return_tensors="pt").to(device)
        outputs = self.model(**inputs)
        outputs = outputs.last_hidden_state[0][0].unsqueeze(1)

        
        # Additional layers for fine-tuning
        outputs = torch.relu(self.fc1(outputs.squeeze()))  # Remove extra dimension
        outputs = self.fc2(outputs)

        return outputs


In [9]:
# Instantiate the model
model0 = Model0(model, tokenizer)
model0.to(device)

# Define the optimizer
optimizer = torch.optim.Adam(params=model0.parameters(), lr=0.00002)

In [10]:
# Define the inter and intra space loss functions
def L_inter(spaces):
    means = [torch.mean(spaces[i]) for i in range(spaces.shape[0])]
    loss = torch.tensor(0.0, requires_grad=True)
    for k in range(len(means)):
        cur = torch.tensor(0.0, requires_grad=True)
        for l in range(len(means)):
            if l == k:
                continue
            cur = cur + (1 / (1 - ((means[k] * means[l]))))
        loss = loss + cur
    return loss

def L_intra(spaces):
    loss = torch.tensor(0.0, requires_grad=True)
    for k in range(len(spaces)):
        loss = loss + 1 / torch.var(spaces[k], dim=0)
    return torch.sum(loss) / loss.shape[0]

In [None]:
# Evaluate the model
correct = 0
total = len(y_test)

true = []
preds = []

with torch.no_grad():
    for i, batch in enumerate(test_loader):
#         if i == 100 :
#             break
        X, y = batch
        y_preds = model0(X)

        # Move y tensor to the same device as y_preds
        y = y.to(y_preds.device)

        y_predict = torch.argmax(y_preds)

        true.extend(y.cpu())  # Move true labels back to CPU
        preds.append(y_predict.cpu())  # Move predictions back to CPU

        cur = torch.eq(y_predict, y)

        correct += torch.sum(cur)

print(f"Test Accuracy: {correct/total}")


In [None]:
# Train the model
model0.train()
try:
    for _ in range(5):
        for i, batch in enumerate(train_loader):
#             if i==100:
#                 break
            if i % 2000 == 0:
                print(i)

            X, Y = batch
            loss = 0

            for X, y in zip(X, Y):
                y_preds = model0(X)
                loss += F.cross_entropy(y_preds.view(1, -1).to(device), torch.tensor([y]).to(device))

            loss /= len(Y)
#             loss += 0.7 * L_inter(torch.cat((hate_space, not_hate_space))) + 0.5 * L_intra(torch.stack([hate_space, not_hate_space]))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        

except KeyboardInterrupt:
    print("Training interrupted by the user.")

In [None]:
# Evaluate the model
correct = 0
total = len(y_test)

true = []
preds = []

with torch.no_grad():
    for i, batch in enumerate(test_loader):
#         if i == 100 :
#             break
        X, y = batch
        y_preds = model0(X)

        # Move y tensor to the same device as y_preds
        y = y.to(y_preds.device)

        y_predict = torch.argmax(y_preds)

        true.extend(y.cpu())  # Move true labels back to CPU
        preds.append(y_predict.cpu())  # Move predictions back to CPU

        cur = torch.eq(y_predict, y)

        correct += torch.sum(cur)

print(f"Test Accuracy: {correct/total}")


In [13]:
# Define hate and not_hate words
hate_words = ['Moist', 'Cunt', 'Panties', 'Fuck', 'Hate', 'Nigger', 'Pussy', 'Ass', 
              'Motherfucker', 'Bitch', 'Damn']
not_hate_words = ['Love', 'Peace', 'Kindness', 'Happiness', 'Respect', 'Friendship',
                  'Appreciation', 'Hope', 'Encouragement', 'Support', 'Caring']

In [17]:
#contrastive loss fucntion 
def kl_divergence(p, q):
    return (p * (p / q).log()).sum()
def ContrastiveLoss():
    sum=0
    for i in hate_words:
        for j in not_hate_space: 
            sum+=kl_divergence(i,j)
            

SyntaxError: incomplete input (1525118760.py, line 2)

In [18]:
import torch.nn.functional as F

# Example usage:
predicted_probs = torch.tensor([[0.9, 0.1], [0.3, 0.7], [0.8, 0.2]])
target_labels = torch.tensor([0, 1, 0])  # Assuming binary classification
ce_loss = F.cross_entropy(predicted_probs, target_labels)
ce_loss

In [14]:
temp = []
for word in hate_words:
    inputs = tokenizer(word, return_tensors="pt").to(device)
    outputs = model(**inputs)
    outputs = torch.mean(outputs.last_hidden_state[0][1:], dim=0)
    temp.append(outputs.tolist())

hate_space = torch.tensor(temp).to(device).requires_grad_()

# Process not_hate words
temp = []
for word in not_hate_words:
    inputs = tokenizer(word, return_tensors="pt").to(device)
    outputs = model(**inputs)
    outputs = torch.mean(outputs.last_hidden_state[0][1:], dim=0)
    temp.append(outputs.tolist())

not_hate_space = torch.tensor(temp).to(device).requires_grad_()

In [None]:
import random
l=[[random.uniform(0,1)] for i in range(11)]
l

In [16]:
hate_space.max(1)[0]

tensor([1.1603, 0.8926, 1.1271, 1.6617, 1.2231, 1.5119, 0.9701, 0.9552, 1.3656,
        1.2014, 1.5585], grad_fn=<MaxBackward0>)

In [None]:
# next(iter(test_loader)) 
tensor_shape = (11,1)
hate_space_weights = torch.tensor(l).to(device).requires_grad_()
not_hate_space_weights = torch.tensor(l).to(device).requires_grad_()

print(hate_space_weights.device,not_hate_space_weights.is_leaf)

In [None]:
hate_space_weights.shape

In [None]:
hate_space_weights.is_leaf

In [None]:
import torch.nn.functional as F

class Model1(nn.Module):

    def __init__(self, model, tokenizer):
        super().__init__()
        self.tokenizer = tokenizer
        self.model = model

    def forward(self, X):
        inputs = self.tokenizer(X, return_tensors="pt").to(device)
        outputs = self.model(**inputs)
        outputs = outputs.last_hidden_state[0][1:len(outputs.last_hidden_state[0])].transpose(0, 1)#shape=>(768,no of tokens)
        col_norms = torch.norm(outputs, p=2, dim=0, keepdim=True)
        outputs = outputs / col_norms#  makes every word resp unit vector 
        hate_att_scores=torch.matmul(hate_space/torch.norm(hate_space,p=2, dim=0, keepdim=True),outputs)#shape=>(11,v)
        not_hate_att_scores=torch.matmul(not_hate_space/torch.norm(not_hate_space,p=2, dim=0, keepdim=True),outputs)#shape=>(11,v)
        Psk_hate = torch.matmul(hate_att_scores.max(1)[0], hate_space_weights)#shape=>(1)
        Psk_not_hate = torch.matmul(not_hate_att_scores.max(1)[0], not_hate_space_weights)#shape=>(1)
        output_tensor = torch.cat((Psk_hate, Psk_not_hate), dim=0).requires_grad_()#
        return output_tensor

In [None]:
model1 = Model1(model, tokenizer)

In [None]:
def L_inter(spaces):
    """this function calculates inter space losses between spaces 
    forces the class spaces of different class to move apart
    """
    means = [torch.mean(spaces[i]) for i in range(spaces.shape[0])]
    
    loss = torch.tensor(0.0, requires_grad=True)
    
    for k in range(len(means)):
        cur = torch.tensor(0.0, requires_grad=True)
        for l in range(len(means)):
            if l == k:
                continue
            cur = cur + (1 / (1 - ((means[k] * means[l]))))
        
        loss = loss + cur
    
    return loss
            
            

def L_intra(spaces):
    """"this function calculates intra space loss 
forces the vector representations inside a class space to have high variability(varience)
"""
    loss=torch.tensor(0.0, requires_grad=True)
    for k in range(len(spaces)):
        loss=loss+1/torch.var(spaces[k], dim=0)
    
    return torch.sum(loss)/loss.shape[0]

In [None]:
print(hate_space,not_hate_space)

In [None]:
print(hate_space_weights,not_hate_space_weights)

In [None]:
from sklearn.metrics import precision_score, recall_score,accuracy_score


model1.to(device)

print(hate_space.is_leaf)
optimizer = torch.optim.Adam(params=[hate_space, not_hate_space, hate_space_weights, not_hate_space_weights], lr=0.0002)
model1.train()


curr=0.1
prev=0
pprev=0
for _ in range(5):
    for i, batch in enumerate(train_loader):
#         if i == 100:
#             break
        if i % 2000 == 0:
            print(i)

        x, Y = batch
        loss = 0

        for X, y in zip(x, Y):
            probs = model1(X)
#             print(y_preds)
            #print(f"printing y_preds={y_preds} shape={y_preds.shape}")

            loss+=F.cross_entropy(probs.view(1, -1).to(device), torch.tensor([y]).to(device))
            """explisit soft max is not used to convert to probs is cross 
            entrophy loss has inbuilt softmax  function adding softmax explisity
            will effect adversly by smoothing again on smoothed values will decrese 
            the weightage for class with high prob """
#             break
        loss /= len(x)
        loss += 1*L_inter(torch.cat((hate_space, not_hate_space))) + 1*L_intra(torch.stack([hate_space, not_hate_space]))
            
        optimizer.zero_grad()
        
        loss.backward()
        optimizer.step()
#         print(hate_space.grad,hate_space_weights.grad)
    correct = 0
    total = len(y_test)

    true = []
    preds = []

    with torch.inference_mode():
        for i, batch in enumerate(test_loader):
#             if i==100:
#                 break

            X, y = batch
            Psk_hate, Psk_not_hate = model1(X)

            y_predict = torch.argmax(torch.tensor([Psk_hate.mean().item(), Psk_not_hate.mean().item()]))

            true.extend(y)
            preds.append(y_predict)

            cur = torch.eq(y_predict, y)

            correct += torch.sum(cur)
    pprev=prev
    prev=curr
    print(f"Test Accuracy: {correct/total}")
    curr=correct/total
    print(f"Accuracy - {accuracy_score(y_test, preds)}")
    print(f"precison score -{precision_score(y_test, preds, average='macro')}")
    print(f"recall score  -{recall_score(y_test, preds, average='macro')}")
  

In [None]:
print(hate_space_weights,not_hate_space_weights)

In [None]:
print(hate_space,not_hate_space)

In [None]:
def save_bert_model(model,tokenizer,params):
        output_dir = 'Saved/bert'
#         if(params['train_att']):
#             if(params['lambda_attn']>=1):
#                 params['lambda_attn']=int(params['lambda_attn'])

#             output_dir =  output_dir+str(params['supervised_layer_pos'])+'_'+str(params['num_supervised_heads'])+'_'+str(params['num_classes'])+'_'+str(params['lambda_attn'])+'/'
            
#         else:
#             output_dir=output_dir+'_'+str(params['num_classes'])+'/'
#         print(output_dir)
#         # Create output directory if needed
#         if not os.path.exists(output_dir):
#             os.makedirs(output_dir)

#         print("Saving model to %s" % output_dir)

        # Save a trained model, configuration and tokenizer using `save_pretrained()`.
        # They can then be reloaded using `from_pretrained()`
        model_to_save = model.module if hasattr(model, 'module') else model  # Take care of distributed/parallel training
        model_to_save.save_pretrained(output_dir)
        tokenizer.save_pretrained(output_dir)

In [None]:
save_bert_model(model,tokenizer,model.parameters())

In [None]:
torch.save(hate_space, 'hatespace.pt')
torch.save(hate_space_weights, 'hatespace.pt_weights')
torch.save(not_hate_space, 'not_hatespace.pt')
torch.save(not_hate_space_weights, 'not_hatespace_weights.pt')