In [None]:
import os
from torch import optim, nn, utils, Tensor
import lightning.pytorch as pl
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertForSequenceClassification, BertTokenizer, AdamW
from sklearn.metrics import classification_report
import numpy as np
from lightning.pytorch.loggers import WandbLogger
import wandb
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch import Trainer, seed_everything
import torchmetrics
from torchmetrics.classification import BinaryAccuracy
from collections import defaultdict
import pandas as pd
import math
import torch


In [None]:
# Fix every RNG that Lightning knows how to seed (workers=True also asks it to
# seed dataloader workers) so repeated runs start from the same state.
pl.seed_everything(42, workers=True)
# Ask the CUDA caching allocator to release unused blocks before model loading.
torch.cuda.empty_cache()

In [None]:
# The tokenizer and the classifier are both loaded from the same pretrained
# checkpoint name; the classifier is given a two-label head.
pretrained_name = 'bert-base-multilingual-cased'
tokenizer = BertTokenizer.from_pretrained(pretrained_name)
model = BertForSequenceClassification.from_pretrained(pretrained_name, num_labels=2)

In [None]:
class bias_classifier(pl.LightningModule):
    """LightningModule that fine-tunes a sequence classifier for binary bias detection.

    Args:
        model: the wrapped classifier. It is called with ``input_ids``,
            ``attention_mask`` and ``labels`` keyword arguments and must return
            an object exposing ``.loss`` and ``.logits``.
    """

    def __init__(self, model):
        super().__init__()
        self.bert = model
        self.accuracy = BinaryAccuracy()
        # Per-step training loss/accuracy, averaged and logged once per epoch
        # in on_train_epoch_end and then cleared.
        self.training_step_outputs = defaultdict(list)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and return ``(loss, logits)``.

        ``loss`` is whatever the wrapped model reports for the given ``labels``
        (presumably ``None`` when no labels are supplied -- confirm against the
        model in use).
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        return outputs.loss, outputs.logits

    def _shared_step(self, batch):
        """Forward one labelled batch; return ``(loss, preds, labels, acc)``."""
        labels = batch['labels']
        loss, logits = self(batch['input_ids'], batch['attention_mask'], labels)
        preds = torch.argmax(logits, dim=1)
        # torchmetrics metrics take (preds, target) in that order.
        acc = self.accuracy(preds, labels)
        return loss, preds, labels, acc

    def training_step(self, batch, batch_idx):
        """Compute the training loss for one batch and record loss/accuracy."""
        loss, _, _, acc = self._shared_step(batch)
        # Store detached copies: keeping the live loss tensor would retain the
        # autograd graph of every step until the end of the epoch.
        self.training_step_outputs["acc"].append(acc.detach())
        self.training_step_outputs["loss"].append(loss.detach())

        return loss

    def on_train_epoch_end(self):
        """Log the epoch-mean training loss and accuracy, then reset the buffers."""
        losses = self.training_step_outputs["loss"]
        accs = self.training_step_outputs["acc"]
        # torch.stack raises on an empty list, so skip logging if no step ran.
        if losses:
            self.log('train_loss', torch.stack(losses).mean().item(), prog_bar=True)
        if accs:
            self.log('train_acc', torch.stack(accs).mean().item(), prog_bar=True)
        self.training_step_outputs.clear()

    def validation_step(self, batch, batch_idx):
        """Log validation loss/accuracy for one batch; return ``(preds, labels)``."""
        loss, preds, labels, acc = self._shared_step(batch)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)

        return preds, labels

    def test_step(self, batch, batch_idx):
        """Log test loss/accuracy for one batch; return ``(preds, labels)``."""
        loss, preds, labels, acc = self._shared_step(batch)
        self.log('test_loss', loss, prog_bar=True)
        self.log('test_acc', acc, prog_bar=True)

        return preds, labels

    def configure_optimizers(self):
        """Optimise all parameters with AdamW at a learning rate of 1e-5."""
        return optim.AdamW(self.parameters(), lr=1e-5)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return the predicted class index for each item of an unlabelled batch."""
        # Labels are deliberately not forwarded; only the logits are used here.
        _, logits = self(batch['input_ids'], batch['attention_mask'])
        return torch.argmax(logits, dim=1)

# Wrap the pretrained classifier loaded above in the Lightning module.
bias = bias_classifier(model=model)

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: indexable collection of input strings.
        labels: indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: callable tokenizer accepting ``add_special_tokens``,
            ``max_length``, ``padding``, ``truncation`` and ``return_tensors``
            and returning a mapping with ``'input_ids'`` and ``'attention_mask'``.
        max_length: length every sequence is padded/truncated to (default 128,
            the value that was previously hard-coded).
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of items, taken from ``texts``."""
        return len(self.texts)

    def __getitem__(self, index):
        """Return a dict with ``input_ids``, ``attention_mask`` and ``labels`` tensors."""
        encoding = self.tokenizer(
            self.texts[index],
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        return {
            # Drop only the leading batch dimension added by return_tensors='pt';
            # a bare squeeze() would also collapse a length-1 sequence to 0-d.
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[index], dtype=torch.long),
        }


In [None]:
# Load the biased and non-biased corpora; each file stores a single pickled
# object that .item() unwraps.
# NOTE(review): allow_pickle=True executes arbitrary pickled code on load --
# only point these paths at trusted files.
all_data_biased, all_data_nb = (
    np.load(corpus_path, allow_pickle=True, fix_imports=True).item()
    for corpus_path in (
        'PATH TO CORGI BIASED CORPUS',
        'PATH TO CORGI NON BIASED CORPUS',
    )
)

In [None]:
def _texts_and_labels(split):
    """Return (texts, labels) for one split: biased sentences labelled 1, then non-biased texts labelled 0."""
    biased = list(all_data_biased[split]['ori_sentence'])
    unbiased = list(all_data_nb[split]['text'])
    return biased + unbiased, [1] * len(biased) + [0] * len(unbiased)


train_input_texts, train_labels = _texts_and_labels('train')
val_input_texts, val_labels = _texts_and_labels('valid')
test_input_texts, test_labels = _texts_and_labels('test')



In [None]:

# One tokenizing dataset per split, all sharing the same tokenizer.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(split_texts, split_labels, tokenizer)
    for split_texts, split_labels in (
        (train_input_texts, train_labels),
        (val_input_texts, val_labels),
        (test_input_texts, test_labels),
    )
)

# Only the training loader shuffles; validation and test keep dataset order.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

In [None]:
#training
# NOTE(review): `early_stopping` is constructed but is not in the Trainer's
# callback list below, so it currently has no effect. Add it to `callbacks`
# if stopping early on val_loss is actually intended.
early_stopping = EarlyStopping('val_loss', patience=5)
# save_top_k=-1: per the ModelCheckpoint docs this keeps every checkpoint
# rather than only the best ones.
checkpoint_callback = ModelCheckpoint(monitor="val_loss", save_top_k=-1)
# The original call ended with an unbalanced extra ")" (a SyntaxError).
trainer = pl.Trainer(
    max_epochs=10,
    accelerator="gpu",
    deterministic=True,
    callbacks=[checkpoint_callback],
)
trainer.fit(model=bias, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)


In [None]:
#testing
# Evaluate every saved checkpoint in the directory on the test split.
# NOTE(review): this is an absolute, machine-specific path -- confirm it is
# where the checkpoints were actually written.
checkpoint_dir = "/home/t-rishavhada/Desktop/sanmati_real_world_mining/corgi/checkpoints/"
for ckpt_name in os.listdir(checkpoint_dir):
    if not ckpt_name.endswith(".ckpt"):
        continue
    l = trainer.test(
        model=bias,
        dataloaders=test_dataloader,
        verbose=True,
        ckpt_path=os.path.join(checkpoint_dir, ckpt_name),
    )
    print(l)

In [None]:
#INFERENCE
# Read the file to classify and pull out its "comments" column.
to_pred = pd.read_csv("PATH TO INFERENCE FILE")
pred_input_texts = to_pred["comments"].to_list()

# Replace float NaN entries (presumably how missing cells are read back) with
# empty strings; every other value is passed through untouched.
cleaned_list = [
    "" if isinstance(item, float) and math.isnan(item) else item
    for item in pred_input_texts
]
# Placeholder labels: CustomDataset requires a label per text, but predict_step
# never reads batch['labels'].
pred_labels = [99] * len(pred_input_texts)

pred_dataset = CustomDataset(cleaned_list, pred_labels, tokenizer)
pred_dataloader = DataLoader(dataset=pred_dataset, batch_size=64, shuffle=False)

# Per-batch prediction tensors from the chosen checkpoint.
l = trainer.predict(model=bias, dataloaders=pred_dataloader, ckpt_path="PATH TO BEST MODEL")




In [None]:
# Flatten the list of per-batch prediction tensors into one list of Python ints.
preds = [label for batch_preds in l for label in batch_preds.tolist()]