<a href="https://colab.research.google.com/github/richardcepka/notebooks/blob/main/SlovakT5_eval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers -qqq
!pip install sentencepiece -qqq
!pip install datasets -qqq
!pip install conllu -qqq
!pip install flax -qqq
!pip install ml-collections -qqq

In [None]:
from abc import ABC, abstractmethod

import numpy as np
from datasets import load_dataset, load_metric, Dataset, DatasetDict, concatenate_datasets
from ml_collections import config_dict
from transformers import T5ForConditionalGeneration, DataCollatorForSeq2Seq, Seq2SeqTrainingArguments, Seq2SeqTrainer, AutoTokenizer, Trainer

In [None]:
class Task(ABC):
    """Abstract interface that every task class in this file implements.

    A concrete task loads its data, can rewrite it into text-to-text form,
    hands the data out, converts decoded label strings back to numeric
    labels, and provides the metric used for scoring.
    """

    @abstractmethod
    def _prepare_data(self):
        """Load the raw data for the task."""

    @abstractmethod
    def get_data(self):
        """Return the prepared dataset."""

    @abstractmethod
    def map_string2id(self):
        """Convert a label string back to its numeric label.

        The concrete implementations in this file accept the label string
        as an additional argument.
        """

    @abstractmethod
    def _map_to_seq2seq(self):
        """Rewrite the data into text-to-text (input string, label string) form."""

    # ``abstractmethod`` has to be the innermost decorator. Applying it on top
    # of an already-built ``staticmethod`` object fails at class-creation time
    # because that object's ``__isabstractmethod__`` attribute is read-only.
    @staticmethod
    @abstractmethod
    def get_metric():
        """Return the metric object used to score predictions."""

In [None]:
class SST2(Task):
    """Binary sentiment task built from the kinit-sk sentiment review CSVs.

    Neutral rows are discarded, the remaining labels become 0 (negative) or
    1 (positive), and the rows are divided into train / validation / test
    subsets using a fixed seed.
    """

    prefix = 'sst2'
    prefix1 = 'veta'  # Slovak for 'sentence'
    id2string = {0: 'negatívna', 1: 'pozitívna'}  # 'negative' / 'positive'
    string2id = dict(zip(id2string.values(), id2string.keys()))
    seed = 7

    def __init__(self, seq2seq=True):
        """Load the data; when ``seq2seq`` is true also convert it to text-to-text form."""
        self._prepare_data()
        if seq2seq:
            self._map_to_seq2seq()

    def get_data(self):
        """Return the ``DatasetDict`` with train / validation / test splits."""
        return self.dataset

    @staticmethod
    def get_metric():
        """Return the GLUE 'sst2' metric."""
        return load_metric('glue', 'sst2')

    def map_string2id(self, string_label: str):
        """Return the numeric label for ``string_label``, or -1 if it is unknown."""
        return self.string2id.get(string_label, -1)

    def _prepare_data(self):
        """Download the review CSVs, binarise the labels and create the splits."""
        base_url = 'https://raw.githubusercontent.com/kinit-sk/slovakbert-auxiliary/main/sentiment_reviews/'
        file_names = [
            'kinit_golden_accomodation.csv',
            'kinit_golden_books.csv',
            'kinit_golden_cars.csv',
            'kinit_golden_games.csv',
            'kinit_golden_mobiles.csv',
            'kinit_golden_movies.csv',
            'kinit_golden_stress.csv',
        ]
        urls = [base_url + file_name for file_name in file_names]

        # The CSVs carry no header row, so the column names are supplied here.
        self.dataset = load_dataset('csv', data_files=urls, header=None, names=['text', 'labels'], split='train')
        # Rows labelled 0 are neutral and are removed.
        self.dataset = self.dataset.filter(lambda example: example['labels'] != 0)
        # Clamp at zero: -1 becomes 0 (negative), 1 stays 1 (positive).
        self.dataset = self.dataset.map(lambda example: {'labels': max(example['labels'], 0)})

        # 60 % train; the held-out 40 % is split again into 40 % validation / 60 % test.
        outer_split = self.dataset.train_test_split(shuffle=True, seed=self.seed, test_size=0.4)
        inner_split = outer_split['test'].train_test_split(seed=self.seed, test_size=0.6)
        self.dataset = DatasetDict(
            {
                'train': outer_split['train'],
                'validation': inner_split['train'],
                'test': inner_split['test'],
            }
        )

    def _map_to_seq2seq(self):
        """Prepend the task prefixes to the text and replace numeric labels by their strings."""
        def to_text_pair(example):
            source = ' '.join([self.prefix, self.prefix1 + ':', example['text']])
            target = self.id2string[example['labels']]
            return {'text': source, 'labels': target}

        self.dataset = self.dataset.map(to_text_pair)


class STSB(Task):
    """Sentence-pair similarity task built on the 'crabz/stsb-sk' dataset."""

    prefix = 'stsb'
    prefix1 = 'veta1'  # Slovak for 'sentence1'
    prefix2 = 'veta2'  # Slovak for 'sentence2'

    def __init__(self, seq2seq=True):
        """Load the data; when ``seq2seq`` is true also convert it to text-to-text form."""
        self._prepare_data()
        if seq2seq:
            self._map_to_seq2seq()

    def get_data(self):
        """Return the prepared dataset."""
        return self.dataset

    @staticmethod
    def get_metric():
        """Return the GLUE 'stsb' metric."""
        return load_metric('glue', 'stsb')

    def map_string2id(self, string_label: str):
        """Parse ``string_label`` as a float score; return -1 if it is not a number."""
        try:
            score = float(string_label)
        except ValueError:
            return -1
        else:
            return score

    def _prepare_data(self):
        """Load the dataset and rebuild every split from its first row."""
        self.dataset = load_dataset('crabz/stsb-sk')
        # NOTE(review): each split is rebuilt from row 0 only - presumably the
        # hub dataset stores whole columns inside a single row; confirm.
        for split_name in ('train', 'validation', 'test'):
            first_row = self.dataset[split_name][0]
            self.dataset[split_name] = Dataset.from_dict(first_row)

    def _map_to_seq2seq(self):
        """Merge both sentences into one prefixed input and stringify the rounded score."""
        def to_text_pair(example):
            source = ' '.join([
                self.prefix,
                self.prefix1 + ':',
                example['sentence1'],
                self.prefix2 + ':',
                example['sentence2'],
            ])
            # The target is the similarity score rounded to one decimal place.
            target = str(round(example['similarity_score'], 1))
            return {'text': source, 'labels': target}

        self.dataset = self.dataset.map(
            to_text_pair,
            remove_columns=['sentence1', 'sentence2', 'similarity_score'],
        )


class QNLI(Task):
    """Question / passage task registered as 'qnli'.

    The data is loaded from 'crabz/boolq_sk'; its boolean ``answer`` column
    is turned into 0 / 1 labels and the original validation split is divided
    into new validation and test splits with a fixed seed.
    """

    prefix = 'qnli'
    prefix1 = 'otázka'  # Slovak for 'question'
    prefix2 = 'veta'  # Slovak for 'sentence'
    # 0 (answer False) -> 'nevyplýva' ("does not follow"),
    # 1 (answer True)  -> 'vyplýva' ("follows").
    id2string = {0: 'nevyplýva', 1: 'vyplýva'}
    string2id = dict(zip(id2string.values(), id2string.keys()))
    seed = 7

    def __init__(self, seq2seq=True):
        """Load the data; when ``seq2seq`` is true also convert it to text-to-text form."""
        self._prepare_data()
        if seq2seq:
            self._map_to_seq2seq()

    def get_data(self):
        """Return the prepared dataset."""
        return self.dataset

    @staticmethod
    def get_metric():
        """Return the GLUE 'qnli' metric."""
        return load_metric('glue', 'qnli')

    def map_string2id(self, string_label: str):
        """Return the numeric label for ``string_label``, or -1 if it is unknown."""
        return self.string2id.get(string_label, -1)

    def _prepare_data(self):
        """Load the dataset, cast answers to int and carve a test split out of validation."""
        self.dataset = load_dataset('crabz/boolq_sk')
        # True -> 1, False -> 0
        self.dataset = self.dataset.map(lambda example: {'answer': int(example['answer'])})
        # 30 % of the original validation split stays as validation, 70 % becomes test.
        halves = self.dataset['validation'].train_test_split(shuffle=True, seed=self.seed, test_size=0.7)
        self.dataset['validation'] = halves['train']
        self.dataset['test'] = halves['test']

    def _map_to_seq2seq(self):
        """Merge question and passage into one prefixed input and use the label string as target."""
        def to_text_pair(example):
            source = ' '.join([
                self.prefix,
                self.prefix1 + ':',
                example['question'],
                self.prefix2 + ':',
                example['passage'],
            ])
            target = self.id2string[example['answer']]
            return {'text': source, 'labels': target}

        self.dataset = self.dataset.map(
            to_text_pair,
            remove_columns=['question', 'passage', 'answer'],
        )
  

def get_slovak_glue_task(task, seq2seq=True):
    """Create the task object registered under the key ``task``.

    Args:
        task: Task key; one of 'sst2', 'stsb' or 'qnli'.
        seq2seq: Passed to the task constructor; when true the task converts
            its data into text-to-text form.

    Returns:
        An instance of the selected task class.

    Raises:
        KeyError: If ``task`` is not one of the registered keys.
    """
    tasks = {
        'sst2': SST2,
        'stsb': STSB,
        'qnli': QNLI,
    }

    if task not in tasks:
        # List the keys plainly so the message does not show ``dict_keys([...])``.
        raise KeyError(f'Not valid task, please choose from: {list(tasks)}')
    return tasks[task](seq2seq)


In [None]:
def get_args():
    """Build the configuration used by the rest of the notebook.

    Returns:
        A ``ConfigDict`` with the model and tokenizer names, the task key,
        the sequence-length limits and the training hyper-parameters.
    """
    return config_dict.ConfigDict({
        # Checkpoint and tokenizer to load.
        'model_name': 'ApoTro/slovak-t5-small',
        'tokenizer_name': 'ApoTro/slovak-t5-small',
        # Task key and sequence-length limits.
        'task': 'sst2',
        'max_input_length': 512,
        'max_target_length': 4,
        # Training hyper-parameters.
        'output_dir': './',
        'num_train_epochs': 1,
        'learning_rate': 1e-4,
        'per_device_train_batch_size': 12,
        'per_device_eval_batch_size': 12,
        'gradient_accumulation_steps': 2,
        'eval_steps': 25,
    })


args = get_args()

In [None]:
# Load the pretrained T5 checkpoint and the tokenizer named in the config.
model = T5ForConditionalGeneration.from_pretrained(args.model_name)
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name)

In [None]:
# Build the configured task (text-to-text form by default) and pull out the
# pieces the following cells rely on.
task = get_slovak_glue_task(args.task)
dataset = task.get_data()
metric = task.get_metric()
# Bound method; compute_metrics uses it to turn decoded strings into labels.
string2id = task.map_string2id

In [None]:
# Tokenize the input texts; the raw 'text' and 'labels' columns are dropped.
tokenized_datasets = dataset.map(lambda examples: tokenizer(examples['text'], max_length=args.max_input_length, truncation=True), remove_columns=['text', 'labels'])
# Tokenize the label strings separately (no attention mask is needed for targets).
# NOTE(review): targets are truncated with args.max_input_length while
# args.max_target_length is never used anywhere - confirm which is intended
# (a limit of 4 tokens might cut some label strings).
target_tokenized = dataset.map(lambda examples: tokenizer(examples['labels'], max_length=args.max_input_length, truncation=True, return_attention_mask=False), remove_columns=['text', 'labels'] )

# Attach the tokenized targets to every split as the 'labels' column.
for s in ['train', 'test', 'validation']:
  tokenized_datasets[s] = tokenized_datasets[s].add_column('labels', target_tokenized[s]['input_ids'])

In [None]:
def compute_metrics(eval_pred):
    """Decode predicted and reference token ids and score them with ``metric``.

    Args:
        eval_pred: Pair ``(predictions, labels)`` of token-id arrays.
            ``predictions`` may also be a tuple whose first element holds
            the ids.

    Returns:
        Whatever ``metric.compute`` returns for the decoded labels.
    """
    predictions, labels = eval_pred
    # Some models return extra outputs next to the generated ids.
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    def decode_to_labels(token_ids):
        # -100 marks ignored / padded positions and cannot be decoded, so it
        # is swapped for the pad token, which decoding then skips.
        token_ids = np.where(token_ids != -100, token_ids, tokenizer.pad_token_id)
        decoded = tokenizer.batch_decode(token_ids, skip_special_tokens=True)
        return [string2id(string_label) for string_label in decoded]

    # Predictions can carry -100 padding just like the labels, so both go
    # through the same clean-up before decoding.
    decoded_preds = decode_to_labels(predictions)
    decoded_labels = decode_to_labels(labels)

    return metric.compute(predictions=decoded_preds, references=decoded_labels)

In [None]:
# Training configuration; every value comes from the config built by get_args().
training_args = Seq2SeqTrainingArguments(
      output_dir=args.output_dir,
      # Evaluate every `eval_steps` steps.
      evaluation_strategy='steps',    
      eval_steps=args.eval_steps,
      num_train_epochs=args.num_train_epochs,
      learning_rate=args.learning_rate,
      per_device_train_batch_size=args.per_device_train_batch_size, 
      per_device_eval_batch_size=args.per_device_eval_batch_size,
      gradient_accumulation_steps=args.gradient_accumulation_steps,  
      # compute_metrics decodes the predictions as token ids, so evaluation
      # has to generate sequences.
      predict_with_generate=True
)


# Train on the 'train' split and evaluate on 'validation'; the 'test' split
# is not used here.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=DataCollatorForSeq2Seq(tokenizer),
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Start fine-tuning with the trainer configured in the previous cell.
trainer.train()