In [1]:
from typing import Tuple,List

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import polars as pl
# Hugging Face
from transformers import (
    AutoTokenizer, 
    AutoConfig,
    AutoModelForSequenceClassification, 
    Trainer, 
    TrainingArguments, 
    DataCollatorWithPadding
)
from tokenizers import AddedToken

from datasets import Dataset
# scikit-learn
from sklearn.metrics import accuracy_score
from sklearn.metrics import cohen_kappa_score
from sklearn.model_selection import StratifiedKFold

2024-05-13 08:38:59.109126: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-13 08:38:59.109253: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-13 08:38:59.190130: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
! pip install optuna



In [3]:
! pip install ray[tune]



In [4]:
class Tokenize:
    """Convert a train/validation pair of frames into tokenized datasets.

    The instance keeps the two frames, the tokenizer callable and the
    truncation length; calling the instance performs the conversion.
    """

    def __init__(self, train, valid, tokenizer,max_length):
        """Store the inputs; no work is done until the instance is called."""
        self.tokenizer = tokenizer
        self.train, self.valid = train, valid
        self.max_length = max_length

    def get_dataset(self, df):
        """Build a ``Dataset`` holding the three columns used downstream.

        ``df`` must be indexable by ``'essay_id'``, ``'full_text'`` and
        ``'label'``; each column is materialised as a plain list.
        """
        wanted_columns = ('essay_id', 'full_text', 'label')
        return Dataset.from_dict({name: list(df[name]) for name in wanted_columns})

    def tokenize_function(self, example):
        """Tokenize ``example['full_text']``, truncating to ``self.max_length``."""
        return self.tokenizer(
            example['full_text'],
            truncation=True,
            max_length=self.max_length,
        )

    def __call__(self):
        """Return ``(tokenized_train, tokenized_valid, tokenizer)``."""
        raw_train = self.get_dataset(self.train)
        raw_valid = self.get_dataset(self.valid)

        # Both splits go through the same batched tokenization step.
        encoded = [
            split.map(self.tokenize_function, batched=True)
            for split in (raw_train, raw_valid)
        ]
        return encoded[0], encoded[1], self.tokenizer

In [5]:
class AESTraining:
    """Hyper-parameter holder and factory helpers for fine-tuning a sequence classifier.

    The instance stores the model identifier, the metric used to pick the best
    checkpoint, and the optimisation settings. The helper methods build the
    model, the tokenizer and the ``TrainingArguments`` from those values.
    """

    GOOGLE_BERT_BASE_CASED:str = "google-bert/bert-base-cased"
    # Must match the key produced by the metrics function passed to the Trainer.
    METRIC_NAME:str = "qwk"
    NUMBER_OF_LABEL:int = 6

    def __init__(self,
                 model_name:str,
                 metric_name:str,
                 number_of_label:int,
                 learning_rate:float,
                 token_max_len:int,
                 batch_siz:int,
                 weight_decay:float,
                 train_epochs:int,
                 optim:str
                 ):
        """Store the run configuration.

        Args:
            model_name: Hub id or local path handed to ``from_pretrained``.
            metric_name: Metric key used as ``metric_for_best_model``.
            number_of_label: Number of output classes of the classifier head.
            learning_rate: Optimiser learning rate.
            token_max_len: Maximum token length used when tokenizing.
            batch_siz: Per-device batch size for both training and evaluation.
            weight_decay: Weight-decay coefficient.
            train_epochs: Number of training epochs.
            optim: Optimiser name understood by ``TrainingArguments``.
        """
        self.model_name = model_name
        # Store the metric, not the model name: this value is later used as
        # ``metric_for_best_model`` and must name a key returned by compute_metrics.
        self.metric_name = metric_name
        self.token_max_len = token_max_len
        self.batch_size = batch_siz
        self.learning_rate = learning_rate
        self.train_epochs = train_epochs
        self.weight_decay = weight_decay
        self.number_of_label = number_of_label
        self.optim = optim

    @staticmethod
    def df(path:str="/kaggle/input/learning-agency-lab-automated-essay-scoring-2/train.csv", 
              n_splits:int = 5, 
              label_col_name: str = "score",
              random_state: int = 42) ->pd.DataFrame:
        """Read the training CSV and add ``label`` and ``fold`` columns.

        Args:
            path: CSV file to read.
            n_splits: Number of stratified folds.
            label_col_name: Column holding the target; used both for the
                stratification and to derive ``label``.
            random_state: Seed for the shuffled stratified split.

        Returns:
            The frame read from ``path`` with two extra columns: ``label``
            (the target column minus one) and ``fold`` (index of the fold in
            which the row is part of the validation split).
        """
        skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
        data = pd.read_csv(path)
        # Shift the target down by one; presumably scores start at 1 and the
        # classifier expects zero-based class ids — TODO confirm.
        data['label'] = data[label_col_name] - 1
        # ``val_index`` is positional; it lines up with ``.loc`` labels because
        # ``read_csv`` produces a default 0..n-1 index here.
        for i, (_, val_index) in enumerate(skf.split(data, data[label_col_name])):
            data.loc[val_index, "fold"] = i
        return data

    def init_model(self):
        """Load a fresh classifier with ``number_of_label`` output classes."""
        return AutoModelForSequenceClassification.from_pretrained(
            self.model_name, num_labels=self.number_of_label
        )

    def tokenizer(self):
        """Load the tokenizer that belongs to ``model_name``."""
        return AutoTokenizer.from_pretrained(self.model_name)

    def default_training_Ars(self)->"TrainingArguments":
        """Build ``TrainingArguments`` from the stored hyper-parameters.

        Evaluation and checkpointing both happen once per epoch, and the best
        checkpoint according to ``metric_name`` is reloaded at the end.
        """
        return TrainingArguments(
            f"{self.model_name}-finetuned",
            evaluation_strategy = "epoch",
            save_strategy = "epoch",
            learning_rate= self.learning_rate ,#2e-5,
            per_device_train_batch_size=self.batch_size,
            per_device_eval_batch_size=self.batch_size,
            num_train_epochs=self.train_epochs,
            weight_decay= self.weight_decay, #0.01,
            load_best_model_at_end=True,
            metric_for_best_model=self.metric_name,
            push_to_hub=False,
            optim=self.optim,
        )
    


In [6]:
# Run configuration: fine-tune the locally mounted DeBERTa-v3-base checkpoint,
# taking the metric name and the label count from the AESTraining constants.
aes_training = AESTraining(
    model_name="/kaggle/input/huggingfacedebertav3variants/deberta-v3-base",
    metric_name=AESTraining.METRIC_NAME,
    number_of_label=AESTraining.NUMBER_OF_LABEL,
    learning_rate=2e-5,
    token_max_len=1024,
    batch_siz=16,
    weight_decay=0.01,
    train_epochs=5,
    optim="adamw_torch",
)

# Training frame with the extra `label` and `fold` columns (default path and split settings).
data: pd.DataFrame = aes_training.df()

In [7]:
def compute_metrics(eval_pred):
    """Compute the quadratic weighted kappa for one evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)`` as handed over by the
            ``Trainer``; ``predictions`` holds per-class scores whose last axis
            is reduced with ``argmax`` to obtain the predicted class.

    Returns:
        ``{'qwk': <kappa>}``. The key is the metric name the training
        arguments use to select the best checkpoint.
    """
    predictions, labels = eval_pred
    # Some models hand back a tuple of outputs; the class scores come first.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    qwk = cohen_kappa_score(labels, predictions.argmax(-1), weights='quadratic')
    return {'qwk': qwk}

def optuna_hp_space(trial):
    """Describe the Optuna search space for one trial.

    Samples the learning rate on a log scale between 1e-6 and 1e-4 and picks
    the per-device training batch size from a fixed list of candidates.
    Returns a dict keyed by the corresponding training-argument names.
    """
    batch_size_choices = [16, 32, 64, 128]

    space = {}
    space["learning_rate"] = trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True)
    space["per_device_train_batch_size"] = trial.suggest_categorical(
        "per_device_train_batch_size", batch_size_choices
    )
    return space

In [8]:
def search_hp():
    """Run an Optuna hyperparameter search once per cross-validation fold.

    For every distinct value in ``data['fold']`` the rows of that fold become
    the validation split and all other rows the training split. A ``Trainer``
    is built around the tokenized splits and ``hyperparameter_search`` is run
    with ``optuna_hp_space``, maximising the metric from ``compute_metrics``.

    Relies on the notebook-level ``data`` and ``aes_training`` objects.

    Returns:
        A list with the result of ``Trainer.hyperparameter_search`` for each
        fold, in fold order.
    """
    best_runs = []
    for fold in range(len(data['fold'].unique())):

        train = data[data['fold'] != fold]
        valid = data[data['fold'] == fold].copy()

        tokenizer = aes_training.tokenizer()
        # Register newline and double-space as explicit tokens in the tokenizer.
        tokenizer.add_tokens([AddedToken("\n", normalized=False)])
        tokenizer.add_tokens([AddedToken(" "*2, normalized=False)])
        tokenize = Tokenize(train, valid, tokenizer,aes_training.token_max_len)
        tokenized_train, tokenized_valid, _ = tokenize()

        data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

        def init_model():
            # Build the model from the shared configuration instead of repeating
            # the checkpoint path and label count here.
            model = aes_training.init_model()
            # Tokens were added above: grow the embedding matrix only if the
            # tokenizer now has more entries than the model has embedding rows.
            if len(tokenizer) > model.get_input_embeddings().num_embeddings:
                model.resize_token_embeddings(len(tokenizer))
            return model

        training_args = aes_training.default_training_Ars()
        training_args.output_dir = '/kaggle/working/output' 

        # ``model_init`` (not ``model``) is supplied so every trial starts from
        # a freshly loaded model.
        trainer = Trainer(
            model=None,
            args=training_args,
            train_dataset=tokenized_train,
            eval_dataset=tokenized_valid,
            compute_metrics=compute_metrics,
            tokenizer=tokenizer,
            model_init=init_model,
            data_collator=data_collator,
        )

        best_run = trainer.hyperparameter_search(
            n_trials=1,
            direction="maximize",
            backend="optuna",
            hp_space=optuna_hp_space,
        )
        # Was ``appenda``, which raised AttributeError after the first search.
        best_runs.append(best_run)

    return best_runs
    

In [9]:
# search_hp()