In [1]:
import os

# Every relative path used later in the notebook (the train/test/validation
# CSV files, the per-model result folders, the optimisation reports) is
# resolved against this folder, so it has to become the working directory
# before any other cell runs.
WORK_DIR = '/content/drive/MyDrive/TFG/Material Inicial'
os.chdir(WORK_DIR)

In [2]:
# Registry of the pretrained checkpoints to tune, keyed by the model id that is
# handed to AutoModelForSequenceClassification.from_pretrained. For each entry:
#   'tam_Embeddings' -> value passed as model_max_length when loading the tokenizer
#   'tokenizer'      -> id passed to AutoTokenizer.from_pretrained (note that the
#                       'maxpe/...' checkpoint reuses the base BERTIN tokenizer)
#   'directory'      -> folder used for that model's result files
dicModels = {
    'finiteautomata/beto-sentiment-analysis': {
        'tam_Embeddings': 512,
        'tokenizer': 'finiteautomata/beto-sentiment-analysis',
        'directory': './BetoTwitter',
    },
    'dccuchile/bert-base-spanish-wwm-cased': {
        'tam_Embeddings': 512,
        'tokenizer': 'dccuchile/bert-base-spanish-wwm-cased',
        'directory': './BetoNormal',
    },
    'PlanTL-GOB-ES/roberta-base-bne': {
        'tam_Embeddings': 512,
        'tokenizer': 'PlanTL-GOB-ES/roberta-base-bne',
        'directory': './RobertaMarIA',
    },
    'cardiffnlp/twitter-xlm-roberta-base-sentiment': {
        'tam_Embeddings': 512,
        'tokenizer': 'cardiffnlp/twitter-xlm-roberta-base-sentiment',
        'directory': './RobertaCardiff',
    },
    'maxpe/bertin-roberta-base-spanish_semeval18_emodetection': {
        'tam_Embeddings': 512,
        'tokenizer': 'bertin-project/bertin-roberta-base-spanish',
        'directory': './BertinTwitter',
    },
    'bertin-project/bertin-roberta-base-spanish': {
        'tam_Embeddings': 512,
        'tokenizer': 'bertin-project/bertin-roberta-base-spanish',
        'directory': './BertinNormal',
    },
    'pysentimiento/robertuito-base-cased': {
        'tam_Embeddings': 128,
        'tokenizer': 'pysentimiento/robertuito-base-cased',
        'directory': './RobertaTwitter',
    },
}

In [None]:
!pip install datasets
!pip install transformers
!pip install optuna
from typing import Optional
import pandas as pd
from datasets import load_dataset
import numpy as np
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer, AutoTokenizer, DataCollatorWithPadding
import operator
import optuna
from transformers import EarlyStoppingCallback
import sklearn.metrics as metrics

class EarlyStoppingCallbackOptuna(object):
    """Optuna study callback that stops the study after a run of trials without improvement.

    An instance is meant to be passed in ``callbacks=[...]`` to
    ``study.optimize``. After each trial it compares the study's best value
    with the best value it has seen so far; once ``early_stopping_rounds``
    consecutive calls bring no improvement it calls ``study.stop()``.
    """

    def __init__(self, early_stopping_rounds: int, direction: str = "minimize") -> None:
        """Configure the patience and the optimisation direction.

        Args:
            early_stopping_rounds: number of consecutive non-improving calls
                after which the study is stopped.
            direction: ``"minimize"`` or ``"maximize"``; must agree with the
                direction of the study the callback is attached to.

        Raises:
            ValueError: if ``direction`` is neither ``"minimize"`` nor
                ``"maximize"``.
        """
        self.early_stopping_rounds = early_stopping_rounds

        # Count of consecutive calls in which the best value did not improve.
        self._iter = 0

        if direction == "minimize":
            self._operator = operator.lt
            self._score = np.inf
        elif direction == "maximize":
            self._operator = operator.gt
            self._score = -np.inf
        else:
            # The exception used to be constructed but never raised, so a bad
            # direction only surfaced later as an AttributeError in __call__.
            raise ValueError(f"invalid direction: {direction}")

    def __call__(self, study: "optuna.Study", trial: "optuna.Trial") -> None:
        """Update the no-improvement counter and stop the study when patience runs out.

        Args:
            study: the running study; only ``best_value`` and ``stop()`` are used.
            trial: the trial that just finished (unused, required by the
                callback signature).
        """
        try:
            best_value = study.best_value
        except ValueError:
            # Optuna raises ValueError while the study has no completed trial
            # (e.g. every trial so far was pruned); there is nothing to compare yet.
            return

        if self._operator(best_value, self._score):
            self._iter = 0
            self._score = best_value
        else:
            self._iter += 1

        if self._iter >= self.early_stopping_rounds:
            study.stop()

class MyTrainer(Trainer):
    """Trainer variant that records the current learning rate in every log entry."""

    def log(self, logs, *args, **kwargs) -> None:
        """Add ``learning_rate`` to ``logs`` and delegate to ``Trainer.log``.

        Args:
            logs: dictionary of values about to be logged; it is mutated in
                place to include the ``"learning_rate"`` key.
            *args, **kwargs: forwarded untouched to ``Trainer.log``. Newer
                transformers releases pass an extra ``start_time`` argument,
                which a ``log(self, logs)`` override would reject.
        """
        logs["learning_rate"] = self._get_learning_rate()
        super().log(logs, *args, **kwargs)

class Pruebas():
    """Optuna hyper-parameter search for one pretrained sequence-classification model.

    Each trial fine-tunes the model on the local ``train.csv`` /
    ``validation.csv`` splits with a sampled learning rate, weight decay and
    batch size, and writes the evaluation metrics of that trial to disk.
    """

    def __init__(self,model, dicObject):
        """Remember which checkpoint to tune and its configuration.

        Args:
            model: model id/path given to
                ``AutoModelForSequenceClassification.from_pretrained``.
            dicObject: configuration dict for that model; the keys
                ``'tokenizer'``, ``'tam_Embeddings'`` and ``'directory'`` are read.
        """
        self.model = model
        self.dicObject = dicObject


    def objective(self,trial: "optuna.Trial"):
        """Run one fine-tuning trial and return its training loss.

        Side effects: stores the tokenizer on ``self.tokenizer``, lets the
        trainer write under ``./testBetoTwitter2/<trial number>`` and writes
        ``<directory>/<trial number>/evaluationResult.txt`` (tab separated).

        Args:
            trial: Optuna trial used to sample the hyper-parameters.

        Returns:
            The ``training_loss`` reported by ``trainer.train()``.
        """
        # NOTE(review): the study minimises the *training* loss returned here,
        # not a validation metric - confirm this is the intended objective.
        model=AutoModelForSequenceClassification.from_pretrained(self.model, num_labels=4, ignore_mismatched_sizes=True)
        model.config.id2label = {0: 'OFP', 1: 'OFG', 2: 'NO', 3: 'NOM'}
        model.config.label2id = {'OFP': 0, 'OFG': 1, 'NO': 2, 'NOM': 3}

        # Load the three CSV splits from the current working directory.
        dataset = load_dataset('csv', data_files={'train': 'train.csv', 'test':'test.csv','validation': 'validation.csv'})
        # The tokenizer must exist before dataset.map, because preprocess_text reads it.
        self.tokenizer = AutoTokenizer.from_pretrained(self.dicObject['tokenizer'], model_max_length=self.dicObject['tam_Embeddings'])
        # Tokenize all texts at once.
        tokenized_OffendES = dataset.map(self.preprocess_text, batched=True)

        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)

        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform;
        # parameter names and ranges are unchanged.
        learning_rate = trial.suggest_float('learning_rate', 7e-7, 4e-5, log=True)
        weight_decay = trial.suggest_float('weight_decay', 4e-5, 0.01, log=True)
        # Sampled once and used for both training and evaluation, instead of
        # suggesting the same parameter name twice.
        batch_size = trial.suggest_categorical('per_device_train_batch_size', [8, 16, 32])

        training_args = TrainingArguments(
            output_dir='./testBetoTwitter2/'+str(trial.number),
            learning_rate=learning_rate,
            weight_decay=weight_decay,
            num_train_epochs=0.24,  #5
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            load_best_model_at_end = True,
            evaluation_strategy="epoch",
            save_strategy="epoch",
            disable_tqdm=True,
        )
        trainer = MyTrainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_OffendES["train"],
            eval_dataset=tokenized_OffendES["validation"],
            tokenizer=self.tokenizer,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics,
            callbacks = [EarlyStoppingCallback(early_stopping_patience = 3)]
        )

        result = trainer.train()
        dicResult = trainer.evaluate()
        df = pd.DataFrame(dicResult, index=[0])

        # The per-trial result folder is not created by anything else in this
        # class, so make sure it exists before writing into it.
        result_dir = os.path.join(self.dicObject['directory'], str(trial.number))
        os.makedirs(result_dir, exist_ok=True)
        df.to_csv(os.path.join(result_dir, "evaluationResult.txt"), sep='\t', index=False)

        return result.training_loss


    def preprocess_text(self, texts):
        """Tokenize the ``"comment"`` column of a batch.

        Pads/truncates to the model's own ``'tam_Embeddings'`` length rather
        than a fixed 512, so models configured with a shorter maximum
        (e.g. 128) are not fed over-long sequences.

        Args:
            texts: batch mapping that contains a ``"comment"`` entry.

        Returns:
            Whatever ``self.tokenizer`` returns for that batch.
        """
        return self.tokenizer(
            texts["comment"],
            truncation=True,
            padding='max_length',
            max_length=self.dicObject['tam_Embeddings'],
        )

    def compute_metrics(self,p):
        """Compute macro, micro and weighted precision/recall/F1.

        Args:
            p: pair ``(predictions, labels)``; predictions are reduced with
                ``argmax`` over axis 1 to obtain the predicted class ids.

        Returns:
            Dict with keys ``maf/map/mar`` (macro), ``mif/mip/mir`` (micro)
            and ``avgf/avgp/avgr`` (weighted).
        """
        pred, labels = p
        pred = np.argmax(pred, axis=1)

        macp = metrics.precision_score(y_true=labels, y_pred=pred, average='macro')
        mar = metrics.recall_score(y_true=labels, y_pred=pred, average='macro')
        maf1 = metrics.f1_score(y_true=labels, y_pred=pred, average='macro')
        mip = metrics.precision_score(y_true=labels, y_pred=pred, average='micro')
        mir = metrics.recall_score(y_true=labels, y_pred=pred, average='micro')
        mif1 = metrics.f1_score(y_true=labels, y_pred=pred, average='micro')
        wp = metrics.precision_score(y_true=labels, y_pred=pred, average='weighted')
        wr = metrics.recall_score(y_true=labels, y_pred=pred, average='weighted')
        wf1 = metrics.f1_score(y_true=labels, y_pred=pred, average='weighted')

        return {'maf': maf1, 'map': macp, 'mar': mar, 'mif': mif1, 'mip': mip, 'mir': mir, 'avgf': wf1, 'avgp': wp, 'avgr': wr}

    def entrenarModelo(self,directory):
        """Run the Optuna study and write its best result to a text report.

        The report is written to ``./resultOptimization<name>.txt`` where
        ``<name>`` is the last path component of ``directory``.

        Args:
            directory: result folder of the model (e.g. ``'./BetoTwitter'``).
        """
        # We want to minimize the loss!
        study = optuna.create_study(study_name='hyper-parameter-search', direction='minimize')
        # Stop the study after 3 consecutive trials without improvement.
        early_stopping = EarlyStoppingCallbackOptuna(3, direction='minimize')
        # Only a single trial is currently run per model.
        study.optimize(func=self.objective, n_trials=1, callbacks=[early_stopping])

        # basename/normpath yields 'BetoTwitter' for './BetoTwitter' and, unlike
        # split('./')[1], does not raise when the './' prefix is missing.
        report_name = os.path.basename(os.path.normpath(directory))
        with open('./resultOptimization'+report_name+'.txt', 'w') as f:
            f.write(str(study.best_value) + '\n' )
            f.write(str(study.best_params) + '\n')
            f.write(str(study.best_trial) + '\n')

if __name__ == "__main__":
    # Run one hyper-parameter search per registered checkpoint, in registry order.
    for model_name, model_cfg in dicModels.items():
        runner = Pruebas(model_name, model_cfg)
        runner.entrenarModelo(model_cfg['directory'])