In [None]:
!pip install transformers

In [None]:
import pandas as pd
import numpy as np
import torch 
import torch.nn as nn
import shutil

In [None]:
# Load the three pre-split CSV files (train / validation / test) in one pass.
train_data, val_data, test_data = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/kaggle/input/d/harisudarsan/twitter/train_data.csv',
        '/kaggle/input/d/harisudarsan/twitter/val_data.csv',
        '/kaggle/input/d/harisudarsan/twitter/test_data.csv',
    )
)

In [None]:
# Hyper-parameters shared by every experiment in this notebook.
MAX_LEN = 256            # token length each text is padded / truncated to
train_batch_size = 16    # batch size of the training DataLoader
val_batch_size = 32      # batch size of the validation DataLoader
test_batch_size = 32     # batch size of the test DataLoader
epoch = 3                # number of epochs handed to train_model
lr = 1e-4                # learning rate handed to the Adam optimizer

In [None]:
from transformers import RobertaTokenizer

# Single tokenizer instance shared by all CustomDataset objects built below.
# NOTE(review): it is the RoBERTa tokenizer, yet the same datasets are also fed
# to the BERT model later in the notebook — confirm that pairing is intended.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")


Creating a custom dataset 

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes one row of a DataFrame per item.

    Args:
        df: DataFrame with a ``'body'`` column (the text) and a ``'target'``
            column (the class label, converted to ``torch.long``).
        tokenizer: object exposing ``encode_plus`` (a Hugging Face tokenizer).
        max_len: length every encoded sequence is padded / truncated to.

    Each item is a dict with the 1-D tensors ``'input_ids'``,
    ``'attention_mask'`` and ``'token_type_ids'`` plus the scalar ``'targets'``.
    """

    def __init__(self, df, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.df = df
        self.title = df['body']
        self.targets = self.df['target'].values
        self.max_len = max_len

    def __len__(self):
        return len(self.title)

    def __getitem__(self, index):
        # DataLoader samplers hand out positions 0..len-1, so the text must be
        # fetched by position. A label lookup (``self.title[index]``) breaks as
        # soon as the frame has a non-default index (filtered, shuffled, split),
        # and it would also disagree with the positional ``self.targets`` array.
        title = str(self.title.iloc[index])
        # Collapse every run of whitespace into a single space.
        title = " ".join(title.split())

        inputs = self.tokenizer.encode_plus(
            title,
            None,  # no second text segment
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        target = torch.tensor(self.targets[index], dtype=torch.long)

        # return_tensors='pt' adds a leading batch axis of size 1; flatten()
        # removes it so the DataLoader can stack items into a batch.
        return {
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'token_type_ids': inputs["token_type_ids"].flatten(),
            'targets': target
        }

In [None]:
# One CustomDataset per split, all sharing the same tokenizer and MAX_LEN.
train_dataset, valid_dataset, test_dataset = (
    CustomDataset(split_frame, tokenizer, MAX_LEN)
    for split_frame in (train_data, val_data, test_data)
)

In [None]:
# Only the training split is shuffled; validation and test keep their row
# order. num_workers=0 loads batches in the main process.
train_data_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=0
)
val_data_loader = torch.utils.data.DataLoader(
    dataset=valid_dataset, batch_size=val_batch_size, shuffle=False, num_workers=0
)
test_data_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=0
)


In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def load_ckp(checkpoint_fpath, model, optimizer):
    """Restore model and optimizer state from a checkpoint file.

    The checkpoint is expected to be a dict with the keys ``'state_dict'``,
    ``'optimizer'``, ``'epoch'`` and ``'valid_loss_min'`` (the layout written
    by ``train_model`` through ``save_ckp``).

    Args:
        checkpoint_fpath: path of the file produced by ``torch.save``.
        model: module whose parameters are overwritten in place.
        optimizer: optimizer whose state is overwritten in place.

    Returns:
        ``(model, optimizer, epoch, valid_loss_min)`` where ``epoch`` is the
        value stored in the checkpoint and ``valid_loss_min`` is a Python float.
    """
    # Deserialize onto the CPU so a checkpoint written on a GPU machine can
    # still be opened on a CPU-only one; load_state_dict then copies the values
    # into the model's / optimizer's existing tensors on their own device.
    checkpoint = torch.load(checkpoint_fpath, map_location="cpu")
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    # train_model stores a plain Python float here, which has no .item();
    # float() accepts a float, a NumPy scalar and a 0-d tensor alike.
    valid_loss_min = float(checkpoint['valid_loss_min'])
    return model, optimizer, checkpoint['epoch'], valid_loss_min

def save_ckp(state, is_best, checkpoint_path, best_model_path):
    """Write a checkpoint to disk and optionally duplicate it as the best model.

    state: object to serialize with ``torch.save``
    is_best: when truthy, the freshly written file is also copied to
        ``best_model_path``
    checkpoint_path: destination of the checkpoint file
    best_model_path: destination of the copy made for a best checkpoint
    """
    torch.save(state, checkpoint_path)
    if not is_best:
        return
    # Copy the file just written instead of serializing the state a second time.
    shutil.copyfile(checkpoint_path, best_model_path)

BERT base model


In [None]:
import torch
from transformers import BertModel

class BERTClass(torch.nn.Module):
    """Pretrained ``bert-base-uncased`` encoder with a linear classification head.

    The head consumes the hidden state at sequence position 0 (the [CLS]
    token), applies dropout and projects it to ``num_classes`` logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', return_dict=True)
        self.dropout = torch.nn.Dropout(p=0.3)
        # 768 must match the hidden size of the BERT encoder above.
        self.linear = torch.nn.Linear(in_features=768, out_features=num_classes)

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return the raw (unnormalised) class logits for a batch."""
        encoded = self.bert_model(
            input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids,
        )
        # Hidden state of the first token of every sequence.
        cls_embedding = encoded.last_hidden_state[:, 0]
        return self.linear(self.dropout(cls_embedding))

# Three target classes; build the BERT classifier and move it to the device.
num_classes = 3
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
bert_model = BERTClass(num_classes=num_classes)
bert_model.to(device)


RoBERTa base model


In [None]:
import torch
from transformers import RobertaModel, RobertaConfig

class RoBERTaClass(torch.nn.Module):
    """Pretrained ``roberta-base`` encoder with a linear classification head.

    Args:
        num_classes: number of output logits.

    ``forward`` returns raw logits computed from the encoder's
    ``pooler_output`` after dropout.
    """

    def __init__(self, num_classes):
        super(RoBERTaClass, self).__init__()
        self.config = RobertaConfig.from_pretrained('roberta-base')
        # Load the pretrained weights. Constructing RobertaModel(config=...)
        # only builds the architecture with randomly initialised parameters,
        # which throws away the whole benefit of fine-tuning.
        self.bert_model = RobertaModel.from_pretrained('roberta-base', config=self.config)
        self.dropout = torch.nn.Dropout(0.3)
        # Take the width from the config so the head always matches the encoder.
        self.linear = torch.nn.Linear(self.config.hidden_size, num_classes)

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return the raw (unnormalised) class logits for a batch."""
        output = self.bert_model(
            input_ids=input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids,
            return_dict=True
        )
        output_dropout = self.dropout(output.pooler_output)
        output = self.linear(output_dropout)
        return output

# Three target classes; build the RoBERTa classifier and move it to the device.
num_classes = 3
Rmodel = RoBERTaClass(num_classes=num_classes)
Rmodel.to(device)


Modified RoBERTa Model

In [None]:
import torch
from transformers import RobertaModel, RobertaConfig

class RoBERTaClass(torch.nn.Module):
    """Pretrained ``roberta-base`` encoder with a three-layer MLP head.

    Note: this definition reuses the name of the plain RoBERTa classifier
    defined in the previous section and replaces it once this cell has run;
    objects created from the earlier class are unaffected.

    Args:
        num_classes: number of output logits.
        hidden_size: width of the encoder output fed to the first dense layer
            (768 for ``roberta-base``).
        dropout_prob: dropout probability applied to the pooled encoder output.
    """

    def __init__(self, num_classes, hidden_size=768, dropout_prob=0.3):
        super(RoBERTaClass, self).__init__()
        self.config = RobertaConfig.from_pretrained('roberta-base')
        # Load the pretrained weights. Constructing RobertaModel(config=...)
        # only builds the architecture with randomly initialised parameters,
        # which throws away the whole benefit of fine-tuning.
        self.bert_model = RobertaModel.from_pretrained('roberta-base', config=self.config)
        self.dropout = torch.nn.Dropout(dropout_prob)
        self.fc1 = torch.nn.Linear(hidden_size, 512)  # first fully connected layer
        self.fc2 = torch.nn.Linear(512, 256)  # second fully connected layer
        self.linear = torch.nn.Linear(256, num_classes)  # output layer, one logit per class

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return the raw (unnormalised) class logits for a batch."""
        output = self.bert_model(
            input_ids=input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids,
            return_dict=True
        )
        output_dropout = self.dropout(output.pooler_output)
        output_fc1 = torch.relu(self.fc1(output_dropout))
        output_fc2 = torch.relu(self.fc2(output_fc1))
        output = self.linear(output_fc2)
        return output

# Number of target classes for this task.
num_classes = 3

# Instantiate the RoBERTa variant with the extra dense layers and move it to
# the device.
Rmmodel = RoBERTaClass(num_classes=num_classes)
Rmmodel.to(device)


Hybrid GRU model

In [None]:
class HybridModel(nn.Module):
    """Transformer encoder followed by a GRU and a linear classification head.

    Args:
        num_classes: number of output logits.
        roberta_model: encoder module exposing ``config.hidden_size`` and
            returning an object with ``last_hidden_state`` when called.
        gru_hidden_size: width of the GRU hidden state.
        num_gru_layers: number of stacked GRU layers.
    """

    def __init__(self, num_classes, roberta_model, gru_hidden_size, num_gru_layers):
        super(HybridModel, self).__init__()
        self.roberta = roberta_model
        self.gru = nn.GRU(roberta_model.config.hidden_size, gru_hidden_size, num_gru_layers, batch_first=True)
        self.fc = nn.Linear(gru_hidden_size, num_classes)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Return the raw class logits for a batch.

        ``token_type_ids`` is optional so the model can be called both as
        ``model(ids, mask)`` and as ``model(ids, mask, token_type_ids)``, the
        form used by ``train_model``.
        """
        roberta_output = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        sequence_output = roberta_output.last_hidden_state
        gru_output, _ = self.gru(sequence_output)

        if attention_mask is None:
            # No padding information available: use the final time step.
            last_state = gru_output[:, -1, :]
        else:
            # Read the GRU state at the last *real* token of each sequence
            # rather than at the final (usually padding) position.
            # Assumes sequences are right-padded — TODO confirm against the
            # tokenizer's padding side.
            last_idx = attention_mask.long().sum(dim=1).clamp(min=1) - 1
            batch_idx = torch.arange(gru_output.size(0), device=gru_output.device)
            last_state = gru_output[batch_idx, last_idx]

        logits = self.fc(last_state)
        return logits

# Local import so this cell does not depend on which earlier cell ran last.
from transformers import RobertaModel

# Set your hyperparameters
gru_hidden_size = 256  # Width of the GRU hidden state (tunable)
num_gru_layers = 2  # Number of GRU layers

# Pretrained encoder whose token-level hidden states feed the GRU.
roberta_model = RobertaModel.from_pretrained('roberta-base')

model = HybridModel(num_classes, roberta_model, gru_hidden_size, num_gru_layers)
model.to(device)


In [None]:
def loss_fn(outputs, targets):
    """Multi-class cross-entropy between logits and integer class indices.

    Args:
        outputs: float logits with one column per class.
        targets: integer class indices, one per row of ``outputs``.

    Returns:
        The mean loss as a 0-d tensor.

    This matches the criterion used inside ``train_model``. A binary
    cross-entropy-with-logits loss needs float targets shaped like
    ``outputs`` and therefore cannot consume the class indices produced by
    ``CustomDataset``.
    """
    return torch.nn.CrossEntropyLoss()(outputs, targets)

# Adam over the parameters of `model` (the hybrid GRU model defined above).
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Module-level accumulators for validation targets / outputs.
val_targets, val_outputs = [], []


Training, validation and testing part.

In [None]:
import torch.nn as nn
from sklearn.metrics import accuracy_score, classification_report,precision_score

def train_model(n_epochs, training_loader, validation_loader, model,
                optimizer, checkpoint_path, best_model_path, test_loader=None):
    """Train ``model``, validate after every epoch, checkpoint, then test.

    Args:
        n_epochs: number of passes over ``training_loader``.
        training_loader: iterable of batches (dicts with ``'input_ids'``,
            ``'attention_mask'``, ``'token_type_ids'`` and ``'targets'``).
        validation_loader: iterable of batches with the same layout.
        model: module called as ``model(ids, mask, token_type_ids)`` that
            returns one logit per class.
        optimizer: optimizer stepped after every training batch. It must have
            been built from the parameters of ``model``.
        checkpoint_path: file rewritten after every epoch.
        best_model_path: file rewritten whenever the validation loss reaches a
            new minimum.
        test_loader: batches used for the final evaluation. Defaults to the
            notebook-level ``test_data_loader``.

    Returns:
        The model as it stands after the final epoch (not the best checkpoint).

    Relies on the notebook-level names ``device`` and ``save_ckp``.
    """
    if test_loader is None:
        # Backward-compatible default. Resolved up front so that a missing
        # loader fails before training rather than after it.
        test_loader = test_data_loader

    def _to_device(batch):
        """Move the four tensors of a batch to ``device`` as int64."""
        return tuple(
            batch[key].to(device, dtype=torch.long)
            for key in ('input_ids', 'attention_mask', 'token_type_ids', 'targets')
        )

    # float('inf') instead of np.Inf, which was removed in NumPy 2.0.
    valid_loss_min = float('inf')
    criterion = nn.CrossEntropyLoss()  # multi-class loss over integer class indices

    for epoch in range(1, n_epochs + 1):
        model.train()
        train_loss = 0.0
        print('############# Epoch {}: Training Start   #############'.format(epoch))

        for batch_idx, data in enumerate(training_loader):
            ids, mask, token_type_ids, targets = _to_device(data)

            optimizer.zero_grad()
            outputs = model(ids, mask, token_type_ids)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Incremental mean of the per-batch losses seen so far.
            train_loss += (loss.item() - train_loss) / (batch_idx + 1)

        print('############# Epoch {}: Training End     #############'.format(epoch))

        print('############# Epoch {}: Validation Start   #############'.format(epoch))
        model.eval()
        valid_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        with torch.no_grad():
            for batch_idx, data in enumerate(validation_loader):
                ids, mask, token_type_ids, targets = _to_device(data)
                outputs = model(ids, mask, token_type_ids)
                loss = criterion(outputs, targets)
                valid_loss += (loss.item() - valid_loss) / (batch_idx + 1)

                predicted_labels = torch.argmax(outputs, dim=1)
                correct_predictions += (predicted_labels == targets).sum().item()
                total_samples += targets.size(0)

        # Guard against an empty validation loader.
        validation_accuracy = correct_predictions / total_samples if total_samples else 0.0
        print('Epoch: {} \tValidation Accuracy: {:.4f} \tAverage Validation Loss: {:.6f}'.format(
            epoch, validation_accuracy, valid_loss))

        print('############# Epoch {}: Validation End     #############'.format(epoch))

        # train_loss / valid_loss are already per-batch means (see the
        # incremental update above); dividing them by the number of batches
        # again would under-report both by that factor.
        print('Epoch: {} \tAvgerage Training Loss: {:.6f} \tAverage Validation Loss: {:.6f}'.format(
            epoch, train_loss, valid_loss))

        is_best = valid_loss <= valid_loss_min
        if is_best:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
                valid_loss_min, valid_loss))
            valid_loss_min = valid_loss

        checkpoint = {
            'epoch': epoch + 1,  # epoch to resume from
            'valid_loss_min': valid_loss_min,  # best validation loss so far
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }

        # One write per epoch; save_ckp also copies it to best_model_path
        # when this epoch set a new minimum.
        save_ckp(checkpoint, is_best, checkpoint_path, best_model_path)

        print('############# Epoch {}  Done   #############\n'.format(epoch))

    # ---- Final evaluation on the test split ----
    model.eval()
    test_preds = []
    test_labels = []

    with torch.no_grad():
        for batch in test_loader:
            ids, mask, token_type_ids, targets = _to_device(batch)
            outputs = model(ids, mask, token_type_ids)
            # argmax over the raw logits; an element-wise sigmoid is monotonic
            # and would not change which class is largest.
            test_preds.extend(torch.argmax(outputs, dim=1).cpu().tolist())
            test_labels.extend(targets.cpu().tolist())

    class_names = ["class_0", "class_1", "class_2"]
    # Pin the label set so the per-class rows line up with class_names even
    # when a class is absent from the test predictions or labels.
    label_ids = list(range(len(class_names)))

    test_accuracy = accuracy_score(test_labels, test_preds)
    test_precision = precision_score(
        test_labels, test_preds, labels=label_ids, average=None, zero_division=0)
    print("Test Precision:")
    for idx, precision in enumerate(test_precision):
        print(f"Class {idx}: {precision:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print("Test Classification Report:")
    print(classification_report(
        test_labels, test_preds, labels=label_ids, target_names=class_names, zero_division=0))
    return model


In [None]:
# Checkpoint file passed to train_model as checkpoint_path; every training
# call below passes this same path.
ckpt_path = "/kaggle/working//curr_ckpt"
# File passed to train_model as best_model_path; likewise shared by every
# training call below.
best_model_path = "/kaggle/working/best_model.pt"

In [None]:
# BERT
# The notebook-level `optimizer` was built from the hybrid model's parameters,
# so stepping it would never update bert_model. Train with an optimizer that
# owns the parameters of the model actually being trained.
# NOTE(review): the data loaders were tokenized with the RoBERTa tokenizer;
# those token ids do not belong to BERT's vocabulary — confirm / re-tokenize
# with a BERT tokenizer before trusting this run.
bert_optimizer = torch.optim.Adam(params=bert_model.parameters(), lr=lr)
trained_model = train_model(epoch, train_data_loader, val_data_loader, bert_model, bert_optimizer, ckpt_path, best_model_path)

In [None]:
#RoBERTa
# The notebook-level `optimizer` was built from the hybrid model's parameters,
# so stepping it would never update Rmodel. Train with an optimizer that owns
# the parameters of the model actually being trained.
roberta_optimizer = torch.optim.Adam(params=Rmodel.parameters(), lr=lr)
trained_model = train_model(epoch, train_data_loader, val_data_loader, Rmodel, roberta_optimizer, ckpt_path, best_model_path)

In [None]:
#Modified Roberta
# The notebook-level `optimizer` was built from the hybrid model's parameters,
# so stepping it would never update Rmmodel. Train with an optimizer that owns
# the parameters of the model actually being trained.
modified_roberta_optimizer = torch.optim.Adam(params=Rmmodel.parameters(), lr=lr)
trained_model = train_model(epoch, train_data_loader, val_data_loader, Rmmodel, modified_roberta_optimizer, ckpt_path, best_model_path)

In [None]:
# Hybrid model: `optimizer` was created from this model's parameters.
trained_model = train_model(
    n_epochs=epoch,
    training_loader=train_data_loader,
    validation_loader=val_data_loader,
    model=model,
    optimizer=optimizer,
    checkpoint_path=ckpt_path,
    best_model_path=best_model_path,
)