In [None]:
from tqdm import tqdm
import json
import pandas as pd
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from transformers import AutoTokenizer
from transformers import RobertaModel
import random
import warnings

# Silence every warning for the rest of the session to keep cell output short.
warnings.filterwarnings('ignore')

# Seed all random number generators this notebook draws from. Seeding only
# Python's `random` leaves PyTorch (weight initialisation, dropout, DataLoader
# shuffling) non-reproducible across "Restart & Run All".
random.seed(1337)
torch.manual_seed(1337)

def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(device_name)
# Bare call: the returned device is not stored, and because this is not the
# last expression of the cell it is not displayed either.
get_device()

# Module-level tokenizer path setting; left as an empty string here.
tokenizer_path = ""

In [2]:
### Define the ModelLoader, Tokenizer, Encoding, Embedding, PositionalEmbedding, MultiHeadAttention, and RobertaLikeModel Classes
## These classes make up the original RoBERTa model architecture and tokenizer functionality from the paper

# Class for loading the pretrained model and its JSON configuration
class ModelLoader:
    """Load a pretrained RoBERTa backbone together with a JSON configuration.

    Args:
        config_path: Path to a JSON file containing the model hyperparameters.
        model_name: Checkpoint identifier handed to
            ``RobertaModel.from_pretrained``. Defaults to ``'roberta-base'``,
            which is what this class always loaded before the parameter existed.

    Raises:
        OSError: If ``config_path`` cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """

    def __init__(self, config_path, model_name='roberta-base'):
        # Parse the config first: it is cheap, so a bad path fails before the
        # pretrained weights are loaded.
        self.config = self._load_config(config_path)
        self.model = RobertaModel.from_pretrained(model_name)

    def _load_config(self, config_path):
        """Read ``config_path`` and return the parsed JSON content."""
        # Decode explicitly as UTF-8 instead of relying on the platform locale.
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def get_model(self):
        """Return the pretrained model instance."""
        return self.model

    def get_config(self):
        """Return the parsed configuration."""
        return self.config

    
# Class wrapping the pretrained tokenizer
class Tokenizer:
    """Thin wrapper around the ``roberta-base`` tokenizer loaded via ``AutoTokenizer``."""

    def __init__(self):
        self.special_tokens = ["<s>", "<pad>", "</s>", "<unk>", "<mask>"]
        self.tokenizer = self._load_tokenizer()

    def _load_tokenizer(self):
        """Load and return the pretrained ``roberta-base`` tokenizer."""
        return AutoTokenizer.from_pretrained("roberta-base")

    # --- text <-> ids -----------------------------------------------------

    def encode(self, text):
        """Encode ``text`` by delegating to the wrapped tokenizer."""
        return self.tokenizer.encode(text)

    def decode(self, tokens, skip_special_tokens=False):
        """Decode token ids back into text.

        Args:
            tokens: A ``list`` of ids or a ``torch.Tensor`` of ids.
            skip_special_tokens: Forwarded to the wrapped tokenizer.

        Raises:
            ValueError: If ``tokens`` is neither a list nor a tensor.
        """
        if isinstance(tokens, torch.Tensor):
            # The wrapped tokenizer is given plain Python values, not tensors.
            tokens = tokens.tolist()
        elif not isinstance(tokens, list):
            raise ValueError("Tokens must be a list or torch.Tensor")

        return self.tokenizer.decode(tokens, skip_special_tokens=skip_special_tokens)

    def token_to_id(self, token):
        """Map a token string to its id via the wrapped tokenizer."""
        return self.tokenizer.convert_tokens_to_ids(token)

    def id_to_token(self, id):
        """Map an id to its token string via the wrapped tokenizer."""
        return self.tokenizer.convert_ids_to_tokens(id)

    # --- vocabulary and special-token accessors ---------------------------

    def get_vocab_size(self):
        """Return ``len()`` of the wrapped tokenizer."""
        return len(self.tokenizer)

    def get_eos_token_id(self):
        """Return the end-of-sequence token id."""
        return self.tokenizer.eos_token_id

    def get_bos_token_id(self):
        """Return the beginning-of-sequence token id."""
        return self.tokenizer.bos_token_id

    def get_pad_token_id(self):
        """Return the padding token id."""
        return self.tokenizer.pad_token_id

    def get_mask_token_id(self):
        """Return the mask token id."""
        return self.tokenizer.mask_token_id
    

# Class for encoding a prompt, with special handling for the beginning-of-sequence (BOS) token
class Encoding(Tokenizer):
    """Encode one prompt into a 1-D tensor of token ids that starts with BOS.

    Args:
        prompt: The text to encode.
    """

    def __init__(self, prompt):
        # Tokenizer.__init__ accepts no arguments; passing a path to it
        # raised TypeError before any encoding could happen.
        super().__init__()
        self.prompt = prompt

    def enc(self):
        """Return the token ids of ``self.prompt`` as a ``torch.Tensor``.

        The BOS id is prepended only when the encoded sequence does not
        already start with it, so the result never carries a doubled BOS.
        If the tokenizer defines no BOS token, an error line is printed and
        the ids are returned unchanged.
        """
        # The wrapped Hugging Face tokenizer returns a plain list of ids from
        # encode(); there is no `.ids` attribute to unpack.
        tokens = list(self.tokenizer.encode(self.prompt))

        bos_token = "<s>"
        bos_id = self.tokenizer.bos_token_id
        if bos_id is None:
            # Report the problem without dumping the whole vocabulary, which
            # would flood the notebook output.
            print(f"Error: '{bos_token}' token not found in vocabulary.")
        elif not tokens or tokens[0] != bos_id:
            tokens = [bos_id] + tokens

        return torch.tensor(tokens)

    
# Class that builds a word-embedding lookup from the pretrained model's word embeddings, sized by the config
class Embedding:
    """Word-embedding lookup initialised from a pretrained model's weights.

    Args:
        model: Object exposing ``embeddings.word_embeddings.weight``.
        config: Mapping providing ``"hidden_size"`` and ``"vocab_size"``.

    NOTE(review): this is a plain class rather than an ``nn.Module``, so a
    parent module does not see ``embedding_layer`` through ``parameters()``
    or ``state_dict()`` -- confirm that this is intended.
    """

    def __init__(self, model, config):
        self.model = model
        self.dim = config["hidden_size"]
        self.vocab_size = config["vocab_size"]

        # Build a fresh table, then overwrite its values with the pretrained ones.
        layer = nn.Embedding(self.vocab_size, self.dim)
        pretrained_weight = self.model.embeddings.word_embeddings.weight
        with torch.no_grad():
            layer.weight.copy_(pretrained_weight)
        self.embedding_layer = layer

        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")
        self.embedding_layer.to(self.device)

    def get_embeddings(self, tokens):
        """Look up ``tokens`` and return the vectors cast to ``torch.bfloat16``."""
        looked_up = self.embedding_layer(tokens)
        return looked_up.to(torch.bfloat16)

    
# Module that creates positional embeddings sized by the model's configuration
class PositionalEmbedding(nn.Module):
    """Lookup table that maps positions ``0 .. seq_length - 1`` to vectors.

    Args:
        config: Mapping providing ``"max_position_embeddings"`` (table size)
            and ``"hidden_size"`` (vector width).
    """

    def __init__(self, config):
        super().__init__()
        self.pos_embedding = nn.Embedding(config["max_position_embeddings"], config["hidden_size"])
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, seq_length):
        """Return the embeddings of the first ``seq_length`` positions.

        Args:
            seq_length: Number of positions to embed.

        Returns:
            Tensor of shape ``(seq_length, hidden_size)``.

        Raises:
            IndexError: If ``seq_length`` exceeds the size of the table.
        """
        max_positions = self.pos_embedding.num_embeddings
        if seq_length > max_positions:
            # Checked up front so the failure is a clear Python exception
            # rather than an out-of-range lookup inside the embedding layer.
            raise IndexError(
                f"seq_length {seq_length} exceeds max_position_embeddings {max_positions}"
            )

        # Build the indices on the device the weights currently live on; the
        # `self.device` attribute is only a snapshot taken at construction and
        # goes stale if the module is moved afterwards.
        positions = torch.arange(seq_length, device = self.pos_embedding.weight.device)
        return self.pos_embedding(positions)

    
# Module implementing a multi-head attention mechanism sized by the model's configuration
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention followed by a dense projection, dropout,
    a residual connection and layer normalisation.

    Args:
        config: Mapping providing ``"hidden_size"``, ``"num_attention_heads"``,
            ``"attention_probs_dropout_prob"`` and ``"layer_norm_eps"``.
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config["hidden_size"]

        self.num_attention_heads = config["num_attention_heads"]
        self.attention_head_size = int(hidden_size / self.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        # Layers are created in the order query, key, value, dense so that
        # random initialisation is drawn in the same sequence as before.
        self.query = nn.Linear(hidden_size, self.all_head_size)
        self.key = nn.Linear(hidden_size, self.all_head_size)
        self.value = nn.Linear(hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config["attention_probs_dropout_prob"])
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=config["layer_norm_eps"])

    def transpose_for_scores(self, x):
        """Split the last dimension into heads and move the head axis to dim 1.

        ``(batch, seq, all_head_size)`` becomes ``(batch, heads, seq, head_size)``.
        """
        per_head = (self.num_attention_heads, self.attention_head_size)
        return x.view(*x.size()[:-1], *per_head).permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask=None):
        """Apply self-attention to ``hidden_states``.

        Args:
            hidden_states: Input tensor; its last dimension is ``hidden_size``.
            attention_mask: Optional tensor that is added to the raw attention
                scores before the softmax.

        Returns:
            Tensor with the same shape as ``hidden_states``.
        """
        query_layer, key_layer, value_layer = (
            self.transpose_for_scores(projection(hidden_states))
            for projection in (self.query, self.key, self.value)
        )

        # Scaled dot-product scores between every query and every key.
        scale = torch.sqrt(torch.tensor(self.attention_head_size, dtype = torch.float))
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) / scale

        if attention_mask is not None:
            attention_scores = attention_scores + attention_mask

        # Scores -> probabilities, then dropout on the probabilities.
        attention_probs = self.dropout(F.softmax(attention_scores, dim = -1))

        # Weighted sum of the values, then merge the heads back together.
        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        merged_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*merged_shape)

        # Output projection, dropout, residual connection and layer norm.
        projected = self.dropout(self.dense(context_layer))
        return self.LayerNorm(projected + hidden_states)
    
    
# Putting all RoBERTa-like components together in a model class 
class RobertaLikeModel(nn.Module):
    """Sequence classifier assembled from the notebook's RoBERTa-like parts.

    Raw strings are tokenised, embedded (word + positional), run through the
    pretrained encoder stack, and the hidden state of the first token is fed
    to a dropout + linear classification head.

    Args:
        model_loader: Object exposing ``get_config()`` and ``get_model()``.
        num_labels: Number of output logits per input text.
    """

    def __init__(self, model_loader, num_labels):
        super().__init__()
        self.config = model_loader.get_config()
        self.pretrained_model = model_loader.get_model()
        # Reuse the pretrained encoder stack instead of training one from scratch.
        self.encoder = self.pretrained_model.encoder

        self.tokenizer = Tokenizer()
        self.word_embedding = Embedding(self.pretrained_model, self.config)
        self.positional_embedding = PositionalEmbedding(self.config)

        # NOTE(review): this layer is constructed (so it is part of
        # parameters() and state_dict()) but forward() never calls it.
        self.attention = MultiHeadAttention(self.config)
        # This can be replaced by the following line to make use of the pretrained attention weights:
        # self.attention = self.pretrained_model.encoder.layer[0].attention

        self.classifier = nn.Sequential(
            nn.Dropout(self.config["hidden_dropout_prob"]),
            nn.Linear(self.config["hidden_size"], num_labels)
        )

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def _encode_prompt(self, prompt, max_length):
        """Tokenise ``prompt`` and cap the result at ``max_length`` ids."""
        token_ids = list(self.tokenizer.encode(prompt))
        if len(token_ids) > max_length:
            # Keep the sequence terminated by the end-of-sequence id after the cut.
            token_ids = token_ids[:max_length - 1] + [self.tokenizer.get_eos_token_id()]
        return token_ids

    def forward(self, prompts):
        """Compute classification logits for one or more texts.

        Args:
            prompts: A single string or a sequence of strings.

        Returns:
            Tensor of shape ``(batch, num_labels)`` with unnormalised logits.

        Raises:
            ValueError: If ``prompts`` is an empty sequence.
        """
        if isinstance(prompts, str):
            prompts = [prompts]
        prompts = list(prompts)
        if not prompts:
            raise ValueError("prompts must contain at least one text")

        pad_id = self.tokenizer.get_pad_token_id()
        # The positional table only has this many rows; longer inputs would
        # otherwise fail inside the positional-embedding lookup.
        max_positions = self.config["max_position_embeddings"]

        encoded = [self._encode_prompt(prompt, max_positions) for prompt in prompts]
        max_len = max(len(seq) for seq in encoded)
        # Right-pad every sequence to the longest one in the batch.
        padded = [seq + [pad_id] * (max_len - len(seq)) for seq in encoded]

        # (batch, max_len): `padded` is always a list of equal-length lists.
        input_ids = torch.tensor(padded, device=self.device)

        word_embeds = self.word_embedding.get_embeddings(input_ids)
        pos_embeds = self.positional_embedding(input_ids.size(1))
        embeddings = word_embeds + pos_embeds

        # Additive mask: 0 for real tokens, -10000 for padding, shaped
        # (batch, 1, 1, max_len) so it broadcasts over heads and query positions.
        attention_mask = (input_ids != pad_id).float()
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
        extended_attention_mask = extended_attention_mask.to(dtype=next(self.parameters()).dtype)
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        encoder_outputs = self.encoder(embeddings, attention_mask=extended_attention_mask)
        sequence_output = encoder_outputs[0]

        # Use the hidden state at position 0 as the pooled representation.
        pooled_output = sequence_output[:, 0, :]

        return self.classifier(pooled_output)

In [None]:
### Define the CustomDataset class

class CustomDataset():
    """Map-style dataset pairing a text with its multi-label target vector.

    Args:
        texts: Indexable collection of texts (e.g. a list or a pandas Series).
        labels: Indexable collection of label vectors, aligned with ``texts``.
        tokenizer: Stored on the instance as-is; not used by this class.
    """

    def __init__(self, texts, labels, tokenizer):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.texts)

    @staticmethod
    def _item_at(container, idx):
        """Return the element at *position* ``idx``.

        pandas objects are indexed by label with ``[]``; a DataLoader sampler
        hands out positions ``0 .. len - 1``. Using ``.iloc`` when it exists
        keeps the lookup positional even if the index was not reset.
        """
        positional = getattr(container, "iloc", None)
        return container[idx] if positional is None else positional[idx]

    def __getitem__(self, idx):
        """Return ``{'text': str, 'label': float32 tensor}`` for position ``idx``."""
        text = self._item_at(self.texts, idx)
        label = self._item_at(self.labels, idx)

        # Non-string entries (e.g. numbers or missing values) are stringified.
        if not isinstance(text, str):
            text = str(text)

        return {
            'text': text,
            'label': torch.tensor(label, dtype = torch.float32)
        }

In [None]:
### Preprocessing Data

# Read the raw CSV and discard its first column.
df = pd.read_csv("roberta_from_scratch/motn_data.csv", encoding = 'latin-1').iloc[:, 1:]

# Remove duplicates (excluding CASEID)
columns_to_check = ['comment_text']
columns_to_check += [col for col in df.columns if col not in ('CASEID', 'comment_text')]
df = df.drop_duplicates(subset = columns_to_check)

# Gather every column from the third one onward into a single per-row list.
trailing_columns = df.columns[2:]
df['list'] = df[trailing_columns].values.tolist()

new_df = df[['CASEID', 'comment_text', 'list']].copy()
new_df['list'] = new_df['list'].apply(lambda values: list(map(float, values)))

# Seeded random split; the rows that were not sampled form the held-out frame.
train_size = 0.8 # 80% Train Size
train_dataset = new_df.sample(frac = train_size, random_state = 1337)
test_dataset = new_df.drop(index = train_dataset.index).reset_index(drop = True)
train_dataset = train_dataset.reset_index(drop = True)

for _caption, _frame in (
    ("Full Dataset: {}", new_df),
    ("Train Dataset: {}", train_dataset),
    ("Test Dataset: {}", test_dataset),
):
    print(_caption.format(_frame.shape))

# Wrap both frames for the DataLoader; the held-out frame serves as validation data.
val_dataset = CustomDataset(test_dataset['comment_text'], test_dataset['list'], tokenizer = "RoBERTa")
train_dataset = CustomDataset(train_dataset['comment_text'], train_dataset['list'], tokenizer = "RoBERTa")

Full Dataset: (13987, 3)
Train Dataset: (11190, 3)
Test Dataset: (2797, 3)


In [None]:
### Defining the Training Function

def train_model(model, train_dataset, val_dataset, num_epochs = 50, batch_size = 16, learning_rate = 1e-5,
                patience = 3, checkpoint_path = "roberta_model.pth"):
    """Fine-tune ``model`` for multi-label classification with early stopping.

    Args:
        model: Module called as ``model(list_of_texts)`` that returns logits.
        train_dataset: Map-style dataset yielding ``{'text', 'label'}`` items.
        val_dataset: Same structure as ``train_dataset``; used for validation.
        num_epochs: Maximum number of epochs.
        batch_size: Batch size for both loaders.
        learning_rate: AdamW learning rate.
        patience: Number of consecutive epochs without a validation-loss
            improvement after which training stops.
        checkpoint_path: File the best-so-far ``state_dict`` is written to.

    Returns:
        The model with the weights of the last epoch that ran. The weights
        with the lowest validation loss are the ones saved at
        ``checkpoint_path``.

    Raises:
        ValueError: If either loader would yield no batches.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True, drop_last = True)
    # Validation keeps its final partial batch so every held-out sample is scored.
    val_dataloader = DataLoader(val_dataset, batch_size = batch_size)

    if len(train_dataloader) == 0:
        raise ValueError("train_dataset yields no full batch; lower batch_size or supply more data")
    if len(val_dataloader) == 0:
        raise ValueError("val_dataset is empty")

    optimizer = AdamW(model.parameters(), lr = learning_rate)

    loss_fn = nn.BCEWithLogitsLoss()  # Combines sigmoid + BCE, for multi-label

    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0

        # Training
        for batch in tqdm(train_dataloader, desc = f"Epoch {epoch + 1} - Training"):
            prompts = batch['text']
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(prompts)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        train_loss = total_train_loss / len(train_dataloader)

        # Validation
        metrics = _evaluate(model, val_dataloader, loss_fn, device, desc = f"Epoch {epoch + 1} - Validation")
        val_loss = metrics["val_loss"]

        print(
            f"Epoch {epoch + 1} | train_loss: {train_loss:.4f} | val_loss: {val_loss:.4f} | "
            f"micro_f1: {metrics['micro_f1']:.4f} | exact_acc: {metrics['exact_accuracy']:.4f} | "
            f"jaccard: {metrics['jaccard_accuracy']:.4f}"
        )

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

    return model


def _evaluate(model, dataloader, loss_fn, device, desc):
    """Score ``model`` on ``dataloader``; return loss and multi-label metrics.

    All figures are accumulated per sample, so a smaller final batch does not
    get extra weight.
    """
    model.eval()

    loss_sum = 0.0
    jaccard_sum = 0.0
    exact_matches = 0
    sample_count = 0
    true_pos = false_pos = false_neg = 0

    with torch.no_grad():
        for batch in tqdm(dataloader, desc = desc):
            prompts = batch['text']
            labels = batch['label'].to(device)
            outputs = model(prompts)
            batch_samples = labels.size(0)

            # The loss is a batch mean; re-weight by batch size for a sample mean.
            loss_sum += loss_fn(outputs, labels).item() * batch_samples
            sample_count += batch_samples

            preds = torch.sigmoid(outputs) > 0.5
            truth = labels.bool()

            # 1. Exact accuracy: every label of a sample must match.
            exact_matches += torch.sum(torch.all(preds == labels, dim = 1)).item()

            # 2. Jaccard index (intersection over union) per sample.
            intersection = torch.sum(preds & truth, dim = 1).float()
            union = torch.sum(preds | truth, dim = 1).float()
            jaccard_sum += torch.sum(intersection / (union + 1e-8)).item()

            # 3. Counts for micro F1.
            true_pos += torch.sum(preds & truth).item()
            false_pos += torch.sum(preds & ~truth).item()
            false_neg += torch.sum(~preds & truth).item()

    micro_precision = true_pos / (true_pos + false_pos + 1e-8)
    micro_recall = true_pos / (true_pos + false_neg + 1e-8)
    micro_f1 = 2 * micro_precision * micro_recall / (micro_precision + micro_recall + 1e-8)

    return {
        "val_loss": loss_sum / sample_count,
        "exact_accuracy": exact_matches / sample_count,
        "jaccard_accuracy": jaccard_sum / sample_count,
        "micro_f1": micro_f1,
    }

In [None]:
### Train

# Load model and RoBERTa Config
config_path = "config.json"
model_loader = ModelLoader(config_path = config_path)

# Initialize model: build the 13-label classifier, then place it on the active device.
model = RobertaLikeModel(model_loader = model_loader, num_labels = 13)
model = model.to(get_device())

# Train model with the default hyperparameters of train_model.
trained_model = train_model(model = model, train_dataset = train_dataset, val_dataset = val_dataset)