In [None]:
import torch
import torch.nn as nn
from transformers import RobertaModel
from tokenizers import ByteLevelBPETokenizer
from pathlib import Path
import torch
import json
import matplotlib.pyplot as plt
import pandas as pd
from datasets import load_dataset
from transformers import AutoTokenizer
import numpy as np
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
import torch.nn.functional as F

# Creating and Implementing Model

In [None]:
class ModelLoader:
    """Pairs the pretrained ``roberta-base`` model with a JSON config file.

    Both are loaded eagerly in the constructor and exposed through
    ``get_model`` / ``get_config``.
    """

    def __init__(self, config_path):
        """Load the pretrained weights, then parse the config at ``config_path``."""
        self.model = RobertaModel.from_pretrained('roberta-base')
        self.config = self._load_config(config_path)

    def _load_config(self, config_path):
        """Return the parsed contents of the JSON file at ``config_path``."""
        with open(config_path, "r") as handle:
            parsed = json.load(handle)
        return parsed

    def get_model(self):
        """Return the model loaded in the constructor."""
        return self.model

    def get_config(self):
        """Return the parsed JSON config."""
        return self.config

    
    
class Tokenizer:
    """Thin wrapper around the pretrained ``roberta-base`` tokenizer.

    Exposes encode/decode helpers plus accessors for the vocabulary size and
    the ids of the BOS/EOS/PAD/MASK tokens.
    """

    def __init__(self):
        # Literal special-token strings stripped by decode(skip_special_tokens=True).
        self.special_tokens = ["<s>", "<pad>", "</s>", "<unk>", "<mask>"]
        self.tokenizer = self._load_tokenizer()

    def _load_tokenizer(self):
        """Load and return the ``roberta-base`` tokenizer.

        This method does not read any instance state; it is also invoked
        unbound elsewhere in this notebook, so keep it that way.
        """
        tokenizer = AutoTokenizer.from_pretrained("roberta-base")

        return tokenizer

    def encode(self, text):
        """Return the token ids the wrapped tokenizer produces for ``text``."""
        return self.tokenizer.encode(text)

    def decode(self, tokens, skip_special_tokens=False):
        """Convert token ids back into a string.

        Args:
            tokens: A list of ids or a ``torch.Tensor`` of ids.
            skip_special_tokens: When true, every string in
                ``self.special_tokens`` is removed from the decoded text and
                runs of whitespace are collapsed to single spaces.

        Returns:
            The decoded string.

        Raises:
            ValueError: If ``tokens`` is neither a list nor a ``torch.Tensor``.
        """
        if isinstance(tokens, torch.Tensor):
            tokens = tokens.tolist()
        elif not isinstance(tokens, list):
            raise ValueError("Tokens must be a list or torch.Tensor")

        decoded = self.tokenizer.decode(tokens)

        # Previously both branches returned before reaching this point, so the
        # flag was silently ignored.
        if skip_special_tokens:
            for token in self.special_tokens:
                decoded = decoded.replace(token, '')
            decoded = ' '.join(decoded.split())  # Remove extra spaces

        return decoded

    def token_to_id(self, token):
        """Map a token string (or list of strings) to its id(s)."""
        return self.tokenizer.convert_tokens_to_ids(token)

    def id_to_token(self, id):
        """Map an id (or list of ids) to its token string(s)."""
        return self.tokenizer.convert_ids_to_tokens(id)

    def get_vocab_size(self):
        """Return ``len()`` of the wrapped tokenizer."""
        return len(self.tokenizer)

    def get_eos_token_id(self):
        """Return the end-of-sequence token id."""
        return self.tokenizer.eos_token_id

    def get_bos_token_id(self):
        """Return the beginning-of-sequence token id."""
        return self.tokenizer.bos_token_id

    def get_pad_token_id(self):
        """Return the padding token id."""
        return self.tokenizer.pad_token_id

    def get_mask_token_id(self):
        """Return the mask token id."""
        return self.tokenizer.mask_token_id
    
    
    
class Encoding(Tokenizer):
    """Encodes a single prompt into a 1-D tensor of token ids."""

    def __init__(self, prompt):
        """Load the tokenizer and remember ``prompt`` for later encoding.

        The parent constructor takes no arguments; the previous call passed an
        undefined ``tokenizer_path`` name and failed on instantiation.
        """
        super().__init__()
        self.prompt = prompt

    def enc(self):
        """Return the token ids of ``self.prompt`` as a ``torch.Tensor``.

        The ids come from the inherited ``encode`` wrapper, which returns a
        plain list of ints (the Hugging Face tokenizer API), not an object
        with an ``.ids`` attribute. The BOS id is prepended only when it is
        not already the first token, so it is never duplicated.
        """
        tokens = list(self.encode(self.prompt))
        bos_token = "<s>"
        bos_id = self.get_bos_token_id()

        if bos_id is None:
            # Best-effort, as before: report and return the ids unchanged.
            # The full-vocabulary dump was dropped; it only flooded the output.
            print(f"Error: '{bos_token}' token not found in vocabulary. This should not happen.")
        elif not tokens or tokens[0] != bos_id:
            tokens = [bos_id] + tokens

        return torch.tensor(tokens)

    
    
class Embedding:
    """Word-embedding lookup seeded from a pretrained model's weights.

    A fresh ``nn.Embedding`` of shape ``(vocab_size, hidden_size)`` is created
    from ``config`` and filled with a copy of
    ``model.embeddings.word_embeddings.weight``.

    NOTE(review): this class is a plain object, not an ``nn.Module``, so
    ``embedding_layer`` is presumably not picked up by a parent module's
    ``parameters()`` / ``.to(...)`` — confirm whether that is intended.
    """

    def __init__(self, model, config):
        self.model = model
        self.dim = config["hidden_size"]
        self.vocab_size = config["vocab_size"]
        self.embedding_layer = nn.Embedding(self.vocab_size, self.dim)

        # Overwrite the random initialisation with the pretrained table.
        pretrained_weight = self.model.embeddings.word_embeddings.weight
        with torch.no_grad():
            self.embedding_layer.weight.copy_(pretrained_weight)

    def get_embeddings(self, tokens):
        """Look up ``tokens`` and return the vectors cast to ``torch.bfloat16``."""
        looked_up = self.embedding_layer(tokens)
        return looked_up.to(torch.bfloat16)

    
    
class PositionalEmbedding(nn.Module):
    """Learned position table with ``max_position_embeddings`` rows of width ``hidden_size``."""

    def __init__(self, config):
        super().__init__()
        num_positions = config["max_position_embeddings"]
        width = config["hidden_size"]
        self.pos_embedding = nn.Embedding(num_positions, width)

    def forward(self, seq_length):
        """Return the vectors for positions ``0 .. seq_length - 1``.

        The index tensor is created on the same device as the table's weight.
        """
        table = self.pos_embedding
        position_ids = torch.arange(seq_length, device=table.weight.device)
        return table(position_ids)

    
    
class RobertaLikeModel(nn.Module):
    """Text classifier built on the pretrained RoBERTa encoder stack.

    Raw strings are tokenized inside ``forward``, embedded as
    word embedding + positional embedding, run through the pretrained
    encoder, and the hidden state at the first position is fed to a
    dropout + linear classification head with ``num_labels`` outputs.

    Args:
        model_loader: Object exposing ``get_config()`` and ``get_model()``.
        num_labels: Number of output logits per example.
    """

    def __init__(self, model_loader, num_labels):
        super().__init__()
        self.config = model_loader.get_config()
        self.pretrained_model = model_loader.get_model()
        self.encoder = self.pretrained_model.encoder

        self.tokenizer = Tokenizer()
        self.word_embedding = Embedding(self.pretrained_model, self.config)
        self.positional_embedding = PositionalEmbedding(self.config)

        # Not used by forward(); kept because it is part of the public attributes.
        self.attention = self.pretrained_model.encoder.layer[0].attention

        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(self.config["hidden_dropout_prob"]),
            nn.Linear(self.config["hidden_size"], num_labels)
        )

        # Extra dropout layer; not applied in forward().
        self.dropout = nn.Dropout(self.config["hidden_dropout_prob"])

    @staticmethod
    def _truncate(token_ids, max_positions):
        """Cap ``token_ids`` at ``max_positions`` entries, keeping the final id."""
        token_ids = list(token_ids)
        if len(token_ids) <= max_positions:
            return token_ids
        # Keep the trailing token (presumably the end-of-sequence marker the
        # tokenizer appends) so a truncated sequence is still terminated.
        return token_ids[:max_positions - 1] + token_ids[-1:]

    def forward(self, prompts):
        """Return classification logits for one prompt or a batch of prompts.

        Called through ``model(prompts)``.

        Args:
            prompts: A single string or a non-empty sequence of strings.

        Returns:
            Tensor of shape ``(len(prompts), num_labels)`` holding raw logits.

        Raises:
            ValueError: If ``prompts`` is empty.
        """
        # Handle both single string and list of strings
        if isinstance(prompts, str):
            prompts = [prompts]
        prompts = list(prompts)
        if not prompts:
            raise ValueError("prompts must contain at least one string")

        pad_id = self.tokenizer.get_pad_token_id()
        # The positional table has only this many rows; longer inputs would
        # index past its end, so cap every sequence before padding.
        max_positions = self.config["max_position_embeddings"]
        encoded = [
            self._truncate(self.tokenizer.encode(prompt), max_positions)
            for prompt in prompts
        ]

        # Right-pad every sequence to the longest one in the batch.
        max_len = max(len(seq) for seq in encoded)
        padded = [seq + [pad_id] * (max_len - len(seq)) for seq in encoded]
        input_ids = torch.tensor(padded)  # always 2-D: (batch, max_len)

        # Word + positional embeddings (the latter broadcasts over the batch).
        word_embeds = self.word_embedding.get_embeddings(input_ids)
        pos_embeds = self.positional_embedding(input_ids.size(1))
        embeddings = word_embeds + pos_embeds

        # Additive attention mask: 0 for real tokens, -10000 for padding.
        attention_mask = (input_ids != pad_id).float()
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
        extended_attention_mask = extended_attention_mask.to(dtype=next(self.parameters()).dtype)
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        # Pass through all encoder layers
        encoder_outputs = self.encoder(embeddings, attention_mask=extended_attention_mask)
        sequence_output = encoder_outputs[0]

        # Use the first-token representation for classification
        pooled_output = sequence_output[:, 0, :]

        # Pass through the classification head
        logits = self.classifier(pooled_output)

        return logits

# Defining CustomDataset Class

In [None]:
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    """Dataset of raw text / label pairs.

    Items are dicts with the text as a ``str`` under ``'text'`` and the label
    as a float tensor under ``'label'``. The tokenizer is stored but not used
    here: tokenization happens inside the model.
    """

    def __init__(self, texts, labels, tokenizer):
        self.texts, self.labels, self.tokenizer = texts, labels, tokenizer

    def __len__(self):
        """Number of examples, taken from ``texts``."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``{'text': str, 'label': float tensor}`` for position ``idx``."""
        raw_text = self.texts[idx]
        raw_label = self.labels[idx]

        # Coerce non-string inputs (e.g. numbers) to their string form.
        text = raw_text if isinstance(raw_text, str) else str(raw_text)

        # Built as an integer tensor first, then converted to float.
        label_tensor = torch.tensor(raw_label, dtype=torch.long).float()

        return {'text': text, 'label': label_tensor}

# Loading Training Data

In [None]:
# Read the raw CSV and discard its first column.
df = pd.read_csv(".csv", encoding='latin-1').iloc[:, 1:]

np.random.seed(1339)

# Gather every column from the third onward into one per-row list of labels.
df['list'] = df[df.columns[2:]].to_numpy().tolist()
new_df = df[['CASEID', 'comment_text', 'list']].copy()

# Make sure each label value is a Python float.
new_df['list'] = new_df['list'].apply(lambda row: list(map(float, row)))
new_df.head(10)

In [None]:
train_size = 0.8 # 80% Train Size

# Sample the training rows; whatever is left over becomes the held-out set.
train_df = new_df.sample(frac=train_size, random_state=200)
test_dataset = new_df.drop(train_df.index).reset_index(drop=True)
train_df = train_df.reset_index(drop=True)

print("FULL Dataset: {}".format(new_df.shape))
print("TRAIN Dataset: {}".format(train_df.shape))
print("TEST Dataset: {}".format(test_dataset.shape))

# Load the tokenizer once through the wrapper's constructor and share it,
# instead of calling the instance method unbound with a dummy `self` twice.
shared_tokenizer = Tokenizer().tokenizer

train_dataset = CustomDataset(train_df["comment_text"], train_df["list"], shared_tokenizer)
val_dataset = CustomDataset(test_dataset["comment_text"], test_dataset["list"], shared_tokenizer)

# Defining Training Function

In [None]:
from torch.utils.data import DataLoader
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss

def train_model(model, train_dataset, val_dataset, num_epochs = 10, batch_size = 8, learning_rate = 1e-5):
    """Fine-tune ``model`` for multi-label classification and report metrics.

    Each batch is a dict with ``'text'`` (passed straight to the model) and
    ``'label'`` (a multi-hot target with one entry per logit).

    Args:
        model: Module called as ``model(prompts)`` that returns raw logits.
        train_dataset: Dataset used for optimisation.
        val_dataset: Dataset used for the per-epoch validation pass.
        num_epochs: Number of passes over ``train_dataset``.
        batch_size: Batch size for both loaders.
        learning_rate: AdamW learning rate.

    Returns:
        The same ``model`` object, trained in place.
    """
    train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True, drop_last = True)
    # Keep every validation example: dropping the last partial batch skews the
    # metrics and leaves zero batches when the set is smaller than batch_size.
    val_dataloader = DataLoader(val_dataset, batch_size = batch_size)

    optimizer = AdamW(model.parameters(), lr = learning_rate)
    # Targets are independent 0/1 labels and predictions are thresholded
    # sigmoids below, so the matching loss is per-label binary cross-entropy.
    loss_fn = nn.BCEWithLogitsLoss()

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0

        for batch in train_dataloader:
            prompts = batch['text']

            optimizer.zero_grad()

            outputs = model(prompts)
            labels = batch['label'].to(outputs.device).float()
            loss = loss_fn(outputs, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # max(..., 1) guards against a loader that yields no batches.
        avg_train_loss = total_loss / max(len(train_dataloader), 1)

        model.eval()
        val_loss = 0.0
        correct_predictions = 0
        total_predictions = 0

        with torch.no_grad():
            for batch in val_dataloader:
                prompts = batch['text']

                outputs = model(prompts)
                labels = batch['label'].to(outputs.device).float()

                loss = loss_fn(outputs, labels)
                val_loss += loss.item()

                # Apply sigmoid and thresholding; 0.5 is a hyperparameter
                preds = (torch.sigmoid(outputs) > 0.5).float()

                correct_predictions += torch.sum(preds == labels).item()
                total_predictions += labels.numel()  # Total number of elements

        avg_val_loss = val_loss / max(len(val_dataloader), 1)
        accuracy = correct_predictions / total_predictions if total_predictions else 0.0

        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Training Loss: {avg_train_loss:.4f}')
        print(f'Validation Loss: {avg_val_loss:.4f}')
        print(f'Validation Accuracy: {accuracy:.4f}')
        print()

    return model

# Training

In [None]:
# Pretrained model plus the RoBERTa config file (config taken from GitHub)
config_path = "config.json"
model_loader = ModelLoader(config_path=config_path)

# Build the classifier with 13 output labels
model = RobertaLikeModel(model_loader=model_loader, num_labels=13)

# Run training with the default hyperparameters
trained_model = train_model(
    model=model,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
)