In [None]:
# Collect the CSV splits under Datasets/ using the file names the later cells read
# (train_set.csv, validation_set.csv, test_set.csv).
# -p keeps this cell re-runnable: plain mkdir reports an error once the directory exists.
!mkdir -p Datasets
!mv test.csv Datasets/test_set.csv
!mv train.csv Datasets/train_set.csv
!mv val.csv Datasets/validation_set.csv

In [None]:
import os
# Export the cuBLAS workspace setting before torch is imported in a later cell.
# NOTE(review): presumably needed because the training cell calls
# torch.use_deterministic_algorithms(True) - confirm against the PyTorch reproducibility notes.
%env CUBLAS_WORKSPACE_CONFIG=:4096:8

# Fine-tune BERT

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig
from sklearn.metrics import accuracy_score
import pandas as pd
from tqdm import tqdm
import time
import random

# Reproducibility: seed Python's and PyTorch's random number generators,
# then restrict PyTorch to deterministic algorithms.
random.seed(0)
torch.manual_seed(420)
torch.use_deterministic_algorithms(True)

class SequenceClassificationDataset(Dataset):
    """Map-style dataset that pairs tokenizer output with one class label per sample.

    Args:
        inputs: mapping indexed with the keys 'input_ids' and 'attention_mask';
            each value is indexed by sample position.
        labels: sequence of labels, indexed by sample position.
    """

    def __init__(self, inputs, labels):
        self.inputs, self.labels = inputs, labels

    def __len__(self):
        # The number of samples is the number of token-id rows.
        token_ids = self.inputs['input_ids']
        return len(token_ids)

    def __getitem__(self, idx):
        # Pick the idx-th entry of each tokenizer field, then attach the label
        # as a long tensor under the 'labels' key.
        sample = {
            field: self.inputs[field][idx]
            for field in ('input_ids', 'attention_mask')
        }
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

class BertFineTuning:
    """Fine-tune a BERT sequence classifier on CSV data and report validation metrics.

    The constructor loads the tokenizer, reads and tokenizes the training and
    validation CSV files, builds the classification model, resolves the optimizer
    and creates the two data loaders. `train()` then runs the epochs.

    Args:
        dataset_path: directory that contains the CSV files.
        train_file: file name of the training CSV inside `dataset_path`.
        validation_file: file name of the validation CSV inside `dataset_path`.
        feature_col: name of the CSV column holding the input text.
        label_col: name of the CSV column holding the class label.
        model_name: name or path passed to the `from_pretrained` loaders.
        batch_size: batch size for both data loaders.
        learning_rate: learning rate used when the optimizer is built from a name.
        num_epochs: number of passes over the training data.
        max_len: maximum number of tokens per example; longer texts are truncated.
        optimizer: either the name of a class in `torch.optim` (e.g. 'Adam') or an
            already constructed optimizer object, which is used as given.
        device: device string passed to `torch.device`.

    Raises:
        ValueError: if `optimizer` is None or names no usable `torch.optim` class.
    """

    def __init__(self, dataset_path, train_file, validation_file, feature_col, label_col, model_name, batch_size, learning_rate, num_epochs, max_len, optimizer=None, device='cpu'):
        # Fail fast: reject a missing optimizer before any data or model is loaded.
        if optimizer is None:
            raise ValueError("Please provide an optimizer name (e.g. 'Adam') or an optimizer instance.")

        self.dataset_path = dataset_path
        self.train_file = train_file
        self.validation_file = validation_file
        self.feature_col = feature_col
        self.label_col = label_col
        self.model_name = model_name
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.max_len = max_len
        self.device = torch.device(device)

        # Load tokenizer; `model_max_length` is the documented keyword for the length limit.
        self.tokenizer = BertTokenizer.from_pretrained(self.model_name, model_max_length=self.max_len)

        # Load datasets
        self.train_df = pd.read_csv(os.path.join(self.dataset_path, self.train_file))
        self.validation_df = pd.read_csv(os.path.join(self.dataset_path, self.validation_file))

        # Calculate number of unique labels
        self.num_labels = len(self.train_df[self.label_col].unique())

        # Tokenize datasets
        self.tokenized_train = self.tokenize_dataset(self.train_df, self.feature_col, self.label_col)
        self.tokenized_validation = self.tokenize_dataset(self.validation_df, self.feature_col, self.label_col)

        # Model configuration
        self.model_config = BertConfig.from_pretrained(self.model_name, num_labels=self.num_labels)
        self.model = BertForSequenceClassification.from_pretrained(self.model_name, config=self.model_config)
        self.model.to(self.device)

        # Optimizer (needs the model parameters, so it is resolved after the model exists)
        self.optimizer = self._build_optimizer(optimizer)

        # DataLoaders
        self.train_dataloader = self.create_dataloader(self.tokenized_train)
        self.validation_dataloader = self.create_dataloader(self.tokenized_validation, shuffle=False)

    def _build_optimizer(self, optimizer):
        """Return a usable optimizer for `self.model`.

        A string is looked up as a class name in `torch.optim` and instantiated with
        the model parameters and `self.learning_rate`. Anything else is returned
        unchanged and treated as an already constructed optimizer.

        Raises:
            ValueError: if the string does not name a concrete `torch.optim` optimizer.
        """
        if not isinstance(optimizer, str):
            return optimizer

        optimizer_cls = getattr(torch.optim, optimizer, None)
        is_concrete_optimizer = (
            isinstance(optimizer_cls, type)
            and issubclass(optimizer_cls, torch.optim.Optimizer)
            and optimizer_cls is not torch.optim.Optimizer
        )
        if not is_concrete_optimizer:
            raise ValueError(f"Unsupported optimizer name: {optimizer!r}")
        return optimizer_cls(self.model.parameters(), lr=self.learning_rate)

    def tokenize_dataset(self, df, feature_col, label_col):
        """Tokenize `df[feature_col]` and return `(encoded_inputs, labels_list)`.

        All texts are padded to a common length and truncated to `self.max_len` tokens.
        """
        encoded = self.tokenizer(list(df[feature_col]),
                                 padding=True,
                                 truncation=True,
                                 max_length=self.max_len,
                                 return_tensors='pt')
        return encoded, list(df[label_col])

    def create_dataloader(self, tokenized_dataset, shuffle=True):
        """Wrap an `(encoded_inputs, labels)` pair in a `DataLoader`."""
        dataset = SequenceClassificationDataset(tokenized_dataset[0], tokenized_dataset[1])
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle)

    def _run_validation(self, dataloader):
        """Run one gradient-free pass over `dataloader`.

        Returns:
            `(mean_loss, accuracy)`. `mean_loss` is the mean of the per-batch losses,
            or NaN when the loader yields no batches.
        """
        self.model.eval()  # Set the model to evaluation mode
        all_labels = []
        all_predictions = []
        batch_losses = []

        # No autograd graph is needed for evaluation; loss and predictions come
        # from the same forward pass.
        with torch.no_grad():
            for batch in dataloader:
                inputs = {key: value.to(self.device) for key, value in batch.items()}
                outputs = self.model(**inputs)
                batch_losses.append(outputs.loss.item())

                predicted = torch.argmax(outputs.logits, dim=1)
                all_labels.extend(inputs["labels"].cpu().numpy())
                all_predictions.extend(predicted.cpu().numpy())

        mean_loss = sum(batch_losses) / len(batch_losses) if batch_losses else float('nan')
        accuracy = accuracy_score(all_labels, all_predictions)
        return mean_loss, accuracy

    def evaluate_model(self, dataloader):
        """Return the classification accuracy of the model on `dataloader`."""
        _, accuracy = self._run_validation(dataloader)
        return accuracy

    def train(self):
        """Train for `self.num_epochs` epochs and print a summary line after each epoch."""
        for epoch in range(self.num_epochs):
            self.model.train()
            train_losses = []

            # Iterate over batches in the training data loader, displaying progress using tqdm
            for batch in tqdm(self.train_dataloader, desc=f'Epoch {epoch + 1}/{self.num_epochs}'):
                inputs = {key: value.to(self.device) for key, value in batch.items()}

                self.optimizer.zero_grad()
                outputs = self.model(**inputs)
                loss = outputs.loss
                loss.backward()
                self.optimizer.step()

                train_losses.append(loss.item())

            # Validation: one pass yields both the loss and the accuracy
            validation_loss, validation_accuracy = self._run_validation(self.validation_dataloader)
            training_loss = sum(train_losses) / len(train_losses) if train_losses else float('nan')

            print(f'Epoch {epoch + 1}/{self.num_epochs} - Training Loss: {training_loss:.4f} - Validation Loss: {validation_loss:.4f} - Validation Accuracy: {validation_accuracy:.4f}')

    def save_model(self, directory):
        """Save the fine-tuned model and its tokenizer into `directory`."""
        self.model.save_pretrained(directory)
        self.tokenizer.save_pretrained(directory)

# Usage
start_time = time.time()
model = 'bert'
model_name = 'bert-base-uncased'

# Hyperparameters
learning_rate = 2e-5
num_epochs = 3
batch_size = 6
max_len = 512

optimizer = 'Adam'
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Paths and filenames
absolute_path = "./"
dataset_path = absolute_path + "Datasets/"
train_file = 'train_set.csv'
validation_file = 'validation_set.csv'
feature_col = 'text'
label_col = 'label'
# Run identifier built from the hyperparameters; it is used as the name of the saved-model directory.
trained_model = model + '_optimizer_' + optimizer + '_lr_' + str(learning_rate) + '_epochs_' + str(num_epochs) + '_bs_' + str(batch_size) + '_maxlen_' + str(max_len)

# Fine-Tuning Phase
classifier = BertFineTuning(dataset_path, train_file, validation_file, feature_col, label_col, model_name, batch_size, learning_rate, num_epochs, max_len, optimizer, device)
classifier.train()
# Save once; the measured training time below covers setup, training and this save.
classifier.save_model(absolute_path + 'TrainedModels/' + trained_model)

# Time
training_time = time.time() - start_time
# The printed value is the same number that is stored for the results table.
print("Training time: {:.2f} seconds".format(training_time))

# Inference time is measured as one evaluation pass over the validation loader.
inference_start_time = time.time()
validation_accuracy = classifier.evaluate_model(classifier.validation_dataloader)
inference_time = time.time() - inference_start_time

Mounted at /content/gdrive


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Epoch 1/3: 100%|██████████| 595/595 [00:38<00:00, 15.28it/s]


Epoch 1/3 - Training Loss: 0.0861 - Validation Loss: 0.0501 - Validation Accuracy: 0.9843


Epoch 2/3: 100%|██████████| 595/595 [00:34<00:00, 17.20it/s]


Epoch 2/3 - Training Loss: 0.0260 - Validation Loss: 0.0825 - Validation Accuracy: 0.9832


Epoch 3/3: 100%|██████████| 595/595 [00:33<00:00, 17.76it/s]


Epoch 3/3 - Training Loss: 0.0116 - Validation Loss: 0.0584 - Validation Accuracy: 0.9877
Training time: 184.37 seconds


# Predict using the fine-tuned BERT

In [None]:
class BertPredictions:
    """Load a fine-tuned BERT classifier and use it to label texts.

    Args:
        model_path: directory holding the saved model and tokenizer.
        device: device string passed to `torch.device`.
        max_len: maximum number of tokens per input, special tokens included;
            longer texts are truncated.
    """

    def __init__(self, model_path, device, max_len):
        self.model_path = model_path
        self.max_len = max_len
        self.device = torch.device(device)
        self.model, self.tokenizer = self.load_fine_tuned_bert_model()

    def load_fine_tuned_bert_model(self):
        """Load model and tokenizer from `self.model_path`; return `(model, tokenizer)`."""
        model = BertForSequenceClassification.from_pretrained(self.model_path)
        tokenizer = BertTokenizer.from_pretrained(self.model_path)
        model.to(self.device)
        model.eval()
        return model, tokenizer

    def predict(self, input):
        """Return the predicted class index (int) for a single text.

        The text is tokenized in one call with truncation to `self.max_len`, so the
        special tokens are added exactly once. The previous
        encode -> decode -> tokenize -> encode round trip added them a second time.
        """
        encoded = self.tokenizer(input,
                                 truncation=True,
                                 max_length=self.max_len,
                                 return_tensors="pt")
        encoded = {key: value.to(self.device) for key, value in encoded.items()}

        self.model.eval()
        with torch.no_grad():
            logits = self.model(**encoded).logits

        return torch.argmax(logits, dim=1).item()

    def predict_and_save(self, dataset_path, test_file, feature_col, prediction_col):
        """Predict a label for every row of a CSV file and write the result back.

        The predictions are stored in the column `prediction_col`. The untouched
        input is kept next to it as `<name>_original<ext>`, which is
        `test_set_original.csv` for `test_set.csv`.

        Args:
            dataset_path: directory that contains `test_file`.
            test_file: CSV file name; it is overwritten with the extended table.
            feature_col: column holding the input text.
            prediction_col: column that receives the predicted class index.
        """
        test_path = os.path.join(dataset_path, test_file)
        stem, extension = os.path.splitext(test_file)
        backup_path = os.path.join(dataset_path, f"{stem}_original{extension}")

        # Load the test dataset
        test_df = pd.read_csv(test_path)

        # Predict all rows first, so a failure here leaves the files on disk untouched.
        predictions = [
            self.predict(content)
            for content in tqdm(test_df[feature_col], total=len(test_df))
        ]
        # Assigning the column in one step keeps integer predictions as integers.
        test_df[prediction_col] = predictions

        # Back up the original only once: on a re-run the file at test_path already
        # contains predictions and must not replace the pristine backup.
        if not os.path.exists(backup_path):
            os.rename(test_path, backup_path)

        test_df.to_csv(test_path, index=False)

# Usage
# Identifier of the fine-tuned run; it names both the saved-model directory
# and the prediction column written to the test CSV.
str_params = 'bert_optimizer_Adam_lr_2e-05_epochs_3_bs_6_maxlen_512'
max_len = 512
optimizer = "Adam"
device = 'cpu' if not torch.cuda.is_available() else 'cuda'

# Paths and filenames
path = "./"
test_file = "test_set.csv"
feature_col = 'text'
dataset_path = f"{path}Datasets/"
trained_model = f"{path}TrainedModels/{str_params}"
prediction_col = f"{str_params}_prediction"

# Load the fine-tuned model, label every row of the test set and write the CSV back
prediction = BertPredictions(model_path=trained_model, device=device, max_len=max_len)
prediction.predict_and_save(
    dataset_path=dataset_path,
    test_file=test_file,
    feature_col=feature_col,
    prediction_col=prediction_col,
)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


100%|██████████| 1114/1114 [00:18<00:00, 59.50it/s]


# Metrics and results

In [None]:
from sklearn.metrics import (
    f1_score,
    accuracy_score,
    precision_score,
    recall_score,
    matthews_corrcoef,
    roc_auc_score,
    average_precision_score,
    confusion_matrix,
)
import numpy as np

# Metrics
def false_positive_rate(y_true, y_pred, pos_label=1):
    """Return FP / (FP + TN), the share of true negatives predicted as positive.

    Counting directly instead of unpacking a confusion matrix keeps the function
    working when only one class occurs in the data.

    Args:
        y_true: array-like of ground-truth labels.
        y_pred: array-like of predicted labels, same length as `y_true`.
        pos_label: label treated as the positive class; every other label counts
            as negative.

    Returns:
        The false positive rate, or NaN when `y_true` contains no negative sample.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    is_negative = y_true != pos_label
    negative_count = is_negative.sum()  # equals FP + TN
    if negative_count == 0:
        # The rate is undefined without negatives; avoid the 0/0 runtime warning.
        return float("nan")

    false_positives = (is_negative & (y_pred == pos_label)).sum()
    return false_positives / negative_count

# Metric name -> callable(y_true, y_pred), evaluated in this order for the results table.
SCORING = {
    "F1": f1_score,
    "Accuracy": accuracy_score,
    "Precision": precision_score,
    "Recall": recall_score,
    "MCC": matthews_corrcoef,
    "ROC AUC": roc_auc_score,
    "PRC AREA": average_precision_score,
    "FPR": false_positive_rate,
}

# Load predictions
predictions_df = pd.read_csv("Datasets/test_set.csv")
y_true = predictions_df["label"]
y_pred = predictions_df[prediction_col]

# Evaluate metrics
# NOTE: every metric, including ROC AUC and PRC AREA, is computed from the stored
# hard class predictions; no class scores are saved by the prediction step.
metric_values = {
    metric_name: metric_func(y_true, y_pred)
    for metric_name, metric_func in SCORING.items()
}

# Time
# The timings are defined by the fine-tuning cell. When that cell was not run in
# this kernel they are reported as NaN instead of failing with a NameError.
timing_columns = ["training_time", "inference_time"]
for timing_name in timing_columns:
    metric_values[timing_name] = globals().get(timing_name, float("nan"))

columns = list(SCORING.keys()) + timing_columns

# Round numeric results to four decimals; leave anything else as returned.
row = {
    name: round(value, 4) if isinstance(value, (float, int)) else value
    for name, value in metric_values.items()
}

# Build the one-row results table in a single step.
scores = pd.DataFrame([row], index=["BERT"], columns=columns)

print(scores)