### Hyperparameter Optimization using Optuna

In [None]:
!pip install ipywidgets
!jupyter nbextension enable --py widgetsnbextension
!pip install transformers==4.17 
!pip install datasets
!pip install optuna
!pip install ray

In [None]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

import torch.nn as nn 
import torch.optim as optim 
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

Load data

In [None]:
from datasets import load_dataset
# Load the 'emotion' configuration of the 'tweet_eval' dataset.
dataset = load_dataset("tweet_eval", name="emotion")

Parameters for model

In [None]:
# --- Model / data configuration ---------------------------------------------
pretrained_model_name = "roberta-base"  # checkpoint name handed to from_pretrained()
max_length = 50   # token length used for truncation/padding when tokenizing
batch_size = 32
d_in = 768   # presumably the encoder hidden size -- not referenced by the cells in this notebook
d_h = 512    # presumably a hidden-layer width -- not referenced by the cells in this notebook

# `num_labels` must exist before `d_out` can be set; count the distinct label
# ids present in the training split instead of relying on a leftover variable.
num_labels = len(set(dataset["train"]["label"]))
d_out = num_labels  # number of classes passed to the classification models
freeze_pretrained = False #True

# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

Tokenizer

In [None]:
from transformers import AutoTokenizer,Trainer, TrainingArguments

tokenizer = AutoTokenizer.from_pretrained(
    pretrained_model_name,
    cache_dir=".",
)


def preprocess(examples):
    """Tokenize the ``text`` field, truncating or padding each entry to ``max_length`` tokens."""
    return tokenizer(
        examples['text'],
        max_length=max_length,
        padding='max_length',
        truncation=True,
    )


# Run the tokenizer over the dataset in batches.
dataset_op = dataset.map(preprocess, batched=True)

Helper functions

In [None]:
from sklearn.metrics import f1_score

def f1_score_func(preds, labels):
    """Return the weighted F1 score of the arg-max predictions against ``labels``.

    ``preds`` holds one row of class scores per sample; the predicted class is
    the index of the largest score in each row.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    return f1_score(true_classes, predicted_classes, average='weighted')

def accuracy_per_class(preds, labels):
    """Print the accuracy of every class and return the overall accuracy.

    Args:
        preds: 2-D array of per-class scores, one row per sample. The predicted
            class of a sample is the arg-max along axis 1.
        labels: array of true class ids; it is flattened before use.

    Returns:
        Number of correct predictions divided by the number of samples, pooled
        over all classes (overall accuracy, not the mean of the per-class
        accuracies). ``0.0`` when there are no samples.
    """
    preds_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()

    correct_total, sample_total = 0, 0
    for label in np.unique(labels_flat):
        of_this_class = labels_flat == label
        n_true = int(np.count_nonzero(of_this_class))
        n_correct = int(np.count_nonzero(preds_flat[of_this_class] == label))
        print(f'Class: {label}')
        print(f'Accuracy: {n_correct}/{n_true}\n')
        correct_total += n_correct
        sample_total += n_true

    # With no samples the loop never runs; avoid dividing by zero.
    if sample_total == 0:
        return 0.0
    return correct_total / sample_total

Optuna for Hyperparameter Optimization

In [None]:
import optuna 
from transformers import RobertaForSequenceClassification

def objective(trial: optuna.Trial):
    """Optuna objective: fine-tune a fresh classifier and return its validation loss.

    A new ``RobertaForSequenceClassification`` is loaded for every trial so that
    trials do not share weights. Learning rate, weight decay and the number of
    epochs are sampled from ``trial``.

    Args:
        trial: the Optuna trial used to sample hyperparameters.

    Returns:
        The ``eval_loss`` reported by ``Trainer.evaluate()`` on the validation
        split. Scoring on held-out data (rather than on the training loss)
        keeps the search from simply rewarding the configuration that fits the
        training set hardest.
    """
    trial_model = RobertaForSequenceClassification.from_pretrained(
        pretrained_model_name,
        num_labels=d_out,
        output_attentions=False,
        output_hidden_states=False,
    )

    training_args = TrainingArguments(
        output_dir='optuna-test',
        # suggest_float(..., log=True) is the replacement for the deprecated suggest_loguniform.
        learning_rate=trial.suggest_float('learning_rate', 4e-5, 0.01, log=True),
        weight_decay=trial.suggest_float('weight_decay', 4e-5, 0.01, log=True),
        num_train_epochs=trial.suggest_int('num_train_epochs', low=2, high=12),
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        disable_tqdm=True,
    )
    trainer = Trainer(
        model=trial_model,
        args=training_args,
        train_dataset=dataset_op['train'],
        eval_dataset=dataset_op['validation'],
    )

    trainer.train()
    # Evaluate on the `eval_dataset` passed above and use that loss as the trial score.
    eval_metrics = trainer.evaluate()
    return eval_metrics['eval_loss']

In [None]:
# The objective returns a loss, so lower values are better: minimize it.
study = optuna.create_study(
    study_name='hyper-parameter-search',
    direction='minimize',
    pruner=optuna.pruners.SuccessiveHalvingPruner(),
)
study.optimize(
    func=objective,
    n_trials=10,
    gc_after_trial=True,
)

In [None]:
# Summarize the best trial found by the study.
print(f'Minimum objective value: {study.best_value}')
print(study.best_trial)
print(f'Best parameter: {study.best_params}')

### Custom BPE Tokenizer

In [None]:
from pathlib import Path
from tokenizers import ByteLevelBPETokenizer

# Dump every split to its own UTF-8 text file, one tweet per line, so that
# line-oriented readers of these files see one example per line.
data_dir = Path("data")
data_dir.mkdir(parents=True, exist_ok=True)  # open(..., "w") does not create missing directories

for split in ("train", "validation", "test"):
    # `with` guarantees the file is flushed and closed even if a write fails.
    with open(data_dir / f"{split}.txt", "w", encoding="utf-8") as ofile:
        for text in dataset[split]["text"]:
            # Collapse embedded line breaks so a single example never spans two lines.
            ofile.write(" ".join(text.splitlines()) + "\n")

# Every text file under data/ is used as tokenizer training material.
paths = [str(x) for x in Path(".").glob("data/*.txt")]


tokenizer = ByteLevelBPETokenizer()
# Customize training
tokenizer.train(files=paths, vocab_size=50265, min_frequency=2, special_tokens=[
    "<s>",
    "<pad>",
    "</s>",
    "<unk>",
    "<mask>",
])

In [None]:
# Create the output directory from Python: unlike a shell `mkdir`, this is
# silent when the directory already exists, so the cell can be re-run.
Path("wikiTokens").mkdir(parents=True, exist_ok=True)
tokenizer.save_model("wikiTokens")

Tokenizer

In [None]:
from transformers import RobertaTokenizerFast
# Load the tokenizer files saved in ./wikiTokens. `model_max_length` is the
# documented name of this setting (`max_len` is only a legacy alias for it).
tokenizer = RobertaTokenizerFast.from_pretrained("./wikiTokens", model_max_length=510)

In [None]:
from transformers import LineByLineTextDataset

# Both datasets share the tokenizer and block size; only the source file differs.
_line_dataset_kwargs = dict(tokenizer=tokenizer, block_size=510)

train_dataset = LineByLineTextDataset(file_path="./data/train.txt", **_line_dataset_kwargs)
val_dataset = LineByLineTextDataset(file_path="./data/validation.txt", **_line_dataset_kwargs)

In [None]:
max_length = 510


def _encode_texts(texts):
    """Tokenize ``texts`` into PyTorch tensors padded/truncated to ``max_length`` tokens."""
    return tokenizer.batch_encode_plus(
        texts,
        add_special_tokens=True,
        return_attention_mask=True,
        truncation=True,
        padding="max_length",
        max_length=max_length,
        return_tensors='pt',
    )


# Encode each split with identical settings.
encoded_data_train = _encode_texts(dataset["train"]["text"])
encoded_data_test = _encode_texts(dataset["test"]["text"])
encoded_data_val = _encode_texts(dataset["validation"]["text"])

In [None]:
from torch.utils.data import TensorDataset

def _split_tensors(encoded, split):
    """Return (input ids, attention masks, label tensor) for one dataset split."""
    return (
        encoded['input_ids'],
        encoded['attention_mask'],
        torch.tensor(dataset[split]["label"]),
    )


input_ids_train, attention_masks_train, labels_train = _split_tensors(encoded_data_train, "train")
input_ids_test, attention_masks_test, labels_test = _split_tensors(encoded_data_test, "test")
input_ids_val, attention_masks_val, labels_val = _split_tensors(encoded_data_val, "validation")

# Bundle the three aligned tensors of each split into one dataset.
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_test = TensorDataset(input_ids_test, attention_masks_test, labels_test)
dataset_val = TensorDataset(input_ids_val, attention_masks_val, labels_val)

In [None]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

batch_size = 5 # greater value causes memory error

# Training batches are drawn in random order ...
dataloader_train = DataLoader(
    dataset_train,
    batch_size=batch_size,
    sampler=RandomSampler(dataset_train),
)

# ... while validation and test batches are read sequentially.
dataloader_validation, dataloader_test = (
    DataLoader(split, batch_size=batch_size, sampler=SequentialSampler(split))
    for split in (dataset_val, dataset_test)
)

In [None]:
import random
import numpy as np

# Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with the same value.
seed_val = 0
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_rng(seed_val)

Model

In [None]:
from transformers import RobertaForSequenceClassification

# Pretrained RoBERTa encoder with a 4-class sequence-classification head.
model = RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=4, output_attentions=False, output_hidden_states=False)
# Freeze the encoder: only parameters outside `base_model` keep requires_grad=True.
for param in model.base_model.parameters():
    param.requires_grad = False
# The training and evaluation code moves every batch to `device`, so the model
# has to be placed on the same device or the forward pass fails on a GPU machine.
model.to(device)
n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("\nThe pre-trained model has {} trainable parameters".format(n_params))

In [None]:
def evaluate(dataloader_test, net=None):
    """Run a model over a dataloader without gradient tracking.

    Args:
        dataloader_test: sized iterable of batches; each batch is a sequence of
            three tensors ``(input_ids, attention_mask, labels)``.
        net: model to evaluate. Defaults to the notebook-level ``model`` so
            existing ``evaluate(dataloader)`` calls keep working; pass another
            model (e.g. a reloaded checkpoint) to evaluate that one instead.

    Returns:
        ``(loss_val_avg, predictions, true_vals)``: the mean of the per-batch
        losses, the concatenated logits as a NumPy array, and the concatenated
        true labels as a NumPy array.
    """
    net = model if net is None else net
    net.eval()

    # Run on the device that actually holds the model's weights so inputs and
    # weights can never end up on different devices; fall back to the global
    # `device` only for a model without parameters.
    first_param = next(net.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    loss_val_total = 0
    predictions, true_vals = [], []

    for batch in dataloader_test:
        batch = tuple(b.to(run_device) for b in batch)

        inputs = {'input_ids':      batch[0],
                  'attention_mask': batch[1],
                  'labels':         batch[2],
                 }

        with torch.no_grad():
            outputs = net(**inputs)

        # With labels supplied, the output's first two items are (loss, logits).
        loss, logits = outputs[0], outputs[1]
        loss_val_total += loss.item()

        predictions.append(logits.detach().cpu().numpy())
        true_vals.append(inputs['labels'].cpu().numpy())

    loss_val_avg = loss_val_total / len(dataloader_test)

    predictions = np.concatenate(predictions, axis=0)
    true_vals = np.concatenate(true_vals, axis=0)

    return loss_val_avg, predictions, true_vals

In [None]:
from tqdm.notebook import tqdm
from transformers import get_linear_schedule_with_warmup

# --- Training setup -----------------------------------------------------------
# The loop below needs `n_epochs`, `optimizer` and `scheduler`; define them in
# this cell so it runs on a fresh kernel. The values are starting points to tune.
n_epochs = 10
# Only parameters that still require gradients are optimized. With the encoder
# frozen that is just the classification head; lower this (e.g. to 2e-5) if the
# encoder is unfrozen.
learning_rate = 1e-3

# Every batch is moved to `device`, so the model has to live there as well.
model.to(device)

optimizer = optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=learning_rate,
)
# Linear decay to zero over all optimizer steps, without warm-up.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(dataloader_train) * n_epochs,
)

for epoch in tqdm(range(1, n_epochs+1)):
    model.train()

    loss_train_total = 0

    progress_bar = tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)
    for batch in progress_bar:

        # Clear gradients left over from the previous step (once is enough).
        optimizer.zero_grad()

        batch = tuple(b.to(device) for b in batch)

        inputs = {'input_ids':      batch[0],
                  'attention_mask': batch[1],
                  'labels':         batch[2],
                 }
        outputs = model(**inputs)
        loss = outputs[0]
        loss_train_total += loss.item()
        loss.backward()

        # Cap the gradient norm at 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()
        scheduler.step()
        # Show the batch loss as-is: len(batch) is the number of tensors in the
        # batch tuple, not the number of samples, so dividing by it misreports the loss.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

    # Uncomment to keep one checkpoint per epoch (the last cell of the notebook
    # loads './finetuned_RoBERTa_epoch_7.model').
    # torch.save(model.state_dict(), f'./finetuned_RoBERTa_epoch_{epoch}.model')

    tqdm.write(f'\nEpoch {epoch}')

    loss_train_avg = loss_train_total/len(dataloader_train)
    tqdm.write(f'Training loss: {loss_train_avg}')
    #On validation set
    val_loss, predictions, true_vals = evaluate(dataloader_validation)
    val_f1 = f1_score_func(predictions, true_vals)
    tqdm.write(f'Validation loss: {val_loss}')
    tqdm.write(f'F1 Score (Weighted): {val_f1}')

In [None]:
# Score the model on the held-out test split and report loss and weighted F1.
val_loss, predictions, true_vals = evaluate(dataloader_test)
val_f1 = f1_score_func(predictions, true_vals)
print(f'Test loss: {val_loss}')
print(f'F1 Score (Weighted): {val_f1}')

In [None]:
# A sequence-classification model only returns a loss when each batch carries
# labels, and the default collator needs equally sized inputs. Tokenize the
# labelled splits here instead of using the label-free, unpadded line-by-line
# text datasets.
def _tokenize_for_trainer(examples):
    """Tokenize a batch of examples, padding/truncating to ``max_length`` tokens."""
    return tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=max_length,
    )


dataset_trainer = dataset.map(_tokenize_for_trainer, batched=True)

training_args = TrainingArguments(
    output_dir="./wikiTokens",
    overwrite_output_dir=True,
    num_train_epochs=10,
    # Reuse the notebook's batch size: larger batches ran out of memory in the
    # manual training loop with the same model and sequence length.
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    save_total_limit=2,
    prediction_loss_only=True,
    do_eval=True,
    # Evaluation and checkpointing must use the same schedule for
    # load_best_model_at_end to work; both happen once per epoch.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_trainer["train"],
    eval_dataset=dataset_trainer["validation"],
)

trainer.train()

In [None]:
# Rebuild the classifier architecture, then overwrite its weights with a saved checkpoint.
best_model = RobertaForSequenceClassification.from_pretrained(
    pretrained_model_name,
    num_labels=d_out,
    output_attentions=False,
    output_hidden_states=False,
)

best_model.to(device)

# map_location makes torch.load place the checkpoint tensors on the CPU.
best_model.load_state_dict(
    torch.load(
        './finetuned_RoBERTa_epoch_7.model',
        map_location=torch.device('cpu'),
    )
)

# Switch to inference mode; as the last expression this also displays the model.
best_model.eval()

# NOTE: evaluate(dataloader_test) runs the notebook-level `model`, not `best_model`.
# _, predictions, true_vals = evaluate(dataloader_test)
# accuracy_per_class(predictions, true_vals)