# Data

In [1]:
import pickle
from pathlib import Path
import pandas as pd
import torch

## Preprocessing

In [2]:
def load_data(filename):
    """Read a CoLA-style TSV file and add an integer ``label`` column.

    Args:
        filename: File name (or relative path) joined onto ``./dataset/cola/``.
            An absolute path replaces that base directory, per ``pathlib``
            join semantics.

    Returns:
        pandas.DataFrame with the columns ``source``, ``acceptability_label``,
        ``source_annotation``, ``sentence`` and ``label``, where ``label`` is
        ``acceptability_label`` cast to ``int``.
    """
    column_names = ['source', 'acceptability_label', 'source_annotation', 'sentence']
    tsv_path = Path("./dataset/cola/") / filename

    # header=0 combined with names= drops the file's own first row and uses
    # the column names listed above instead.
    frame = pd.read_csv(tsv_path, sep="\t", header=0, encoding='utf-8', names=column_names)
    frame['label'] = frame['acceptability_label'].astype(int)
    return frame

def tokenize_datasets(dataset, tokenizer, arch="encoder"):
    """Run ``tokenizer`` over the ``sentence`` column of ``dataset``.

    Args:
        dataset: Table-like object whose ``'sentence'`` column supports
            ``.tolist()`` (e.g. a pandas DataFrame).
        tokenizer: Callable invoked once with the list of sentences and the
            fixed keyword options below.
        arch: Accepted for interface compatibility; not used by this function.

    Returns:
        Whatever ``tokenizer`` returns for the batch of sentences.
    """
    encoding_options = dict(
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=150,
        add_special_tokens=True,
        return_token_type_ids=True,
    )
    return tokenizer(dataset['sentence'].tolist(), **encoding_options)

## Custom Dataset Class

In [3]:
class ColaDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer output with optional labels.

    Args:
        tokenized_dataset: Mapping from field name to an indexable batch of
            values; it must provide ``.items()``.
        labels: Optional indexable of per-example labels. When ``None``
            (e.g. for unlabeled inference data), items are returned without
            a ``'labels'`` entry.
        test: Stored as ``self.test``; it has no other effect in this class.
    """

    def __init__(self, tokenized_dataset, labels=None, test=False):
        self.tokenized_dataset = tokenized_dataset
        self.labels = labels
        self.test = test

    def __getitem__(self, idx):
        """Return a dict with every tokenized field at ``idx`` (plus ``'labels'`` if available)."""
        item = {key: val[idx] for key, val in self.tokenized_dataset.items()}
        # Only attach a label when labels were supplied; indexing None would raise.
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """Return the number of examples.

        ``len()`` calls ``__len__`` with no extra arguments, so this method
        must not require any beyond ``self``.
        """
        if self.labels is not None:
            return len(self.labels)
        # Without labels, fall back to the length of the first tokenized field.
        for _, values in self.tokenized_dataset.items():
            return len(values)
        return 0

# Model

In [4]:
import torch.nn as nn
from transformers import ElectraModel, ElectraPreTrainedModel

In [5]:
class FCLayer(nn.Module):
    """Dropout, an optional tanh, then a linear projection.

    Args:
        input_dim: Size of the last dimension of the input.
        output_dim: Size of the last dimension of the output.
        dropout_rate: Probability passed to ``nn.Dropout``.
        use_activation: When true, tanh is applied to the input (after
            dropout) before the linear layer.
    """

    def __init__(self, input_dim, output_dim, dropout_rate = 0.0, use_activation=True):
        super().__init__()
        self.use_activation = use_activation
        self.dropout = nn.Dropout(dropout_rate)
        self.linear = nn.Linear(input_dim, output_dim)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Apply dropout, then tanh if enabled, then the linear layer."""
        dropped = self.dropout(x)
        # Note the activation comes BEFORE the linear projection, not after.
        projected_input = self.tanh(dropped) if self.use_activation else dropped
        return self.linear(projected_input)

class PoolingHead(nn.Module):
    """Dropout, a dense projection, then tanh.

    Args:
        input_dim: Size of the last dimension of the input.
        hidden_dim: Size of the last dimension of the output.
        pooler_dropout: Probability passed to ``nn.Dropout``.
    """

    def __init__(self, input_dim, hidden_dim, pooler_dropout):
        super().__init__()
        self.dense = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(p = pooler_dropout)

    def forward(self, hidden_states):
        """Return ``tanh(dense(dropout(hidden_states)))``."""
        projected = self.dense(self.dropout(hidden_states))
        return torch.tanh(projected)

class Electra(ElectraPreTrainedModel):
    """ELECTRA encoder followed by a pooling head and a linear classifier.

    Args:
        config: Model configuration; ``hidden_size`` and ``num_labels`` are
            read from it and it is forwarded to ``ElectraModel``.
    """

    def __init__(self, config):
        # The base-class initializer must run first: nn.Module refuses to
        # register sub-modules before its own __init__ has been called.
        super().__init__(config)
        self.electra = ElectraModel(config)
        self.num_labels = config.num_labels
        self.pooling = PoolingHead(input_dim=config.hidden_size, hidden_dim=config.hidden_size, pooler_dropout=0.1)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None):
        """Compute classification logits.

        Args:
            input_ids: Forwarded positionally to the encoder.
            attention_mask: Forwarded to the encoder.
            token_type_ids: Forwarded to the encoder.
            labels: Accepted but not used here; no loss is computed in this
                method.

        Returns:
            Tuple whose first element is the logits, followed by
            ``outputs[2:]`` of the encoder output.
        """
        outputs = self.electra(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        # Take index 0 along dim 1 of the first encoder output.
        # Presumably this is the first-token ([CLS]) vector of a
        # (batch, seq, hidden) tensor - TODO confirm.
        pooled_output = outputs[0][:, 0, :]
        pooled_output = self.pooling(pooled_output)
        logits = self.classifier(pooled_output)
        # NOTE(review): the slice starts at 2, which skips encoder output 1.
        # If ElectraModel has no pooled output at index 1, this drops the
        # first optional extra (e.g. hidden states) - confirm whether [1:]
        # was intended.
        outputs = (logits,) + outputs[2:]

        return outputs


# Loss

In [6]:
class CrossEntropy(nn.Module):
    """Module wrapper around ``nn.CrossEntropyLoss`` built with default arguments."""

    def __init__(self):
        super().__init__()
        self.CE = nn.CrossEntropyLoss()

    def forward(self, inputs, target):
        """Return the cross-entropy loss between predictions and targets.

        :param inputs: predictions
        :param target: target labels (passed to the loss unchanged)
        :return: loss
        """
        return self.CE(inputs, target)

# Registry mapping a criterion name to the callable that builds it.
_criterion_entrypoints = dict(
    cross_entropy=CrossEntropy,
)

def criterion_entrypoint(criterion_name):
    """Look up the factory registered under ``criterion_name``.

    Raises:
        KeyError: If no criterion is registered under that name.
    """
    factory = _criterion_entrypoints[criterion_name]
    return factory

def is_criterion(criterion_name):
    """Return ``True`` when ``criterion_name`` is a registered criterion."""
    registered_names = _criterion_entrypoints.keys()
    return criterion_name in registered_names

def create_criterion(criterion_name, **kwargs):
    """Instantiate the criterion registered under ``criterion_name``.

    Args:
        criterion_name: Key into the criterion registry.
        **kwargs: Forwarded unchanged to the registered factory.

    Returns:
        The object produced by the registered factory.

    Raises:
        RuntimeError: If ``criterion_name`` is not registered.
    """
    if not is_criterion(criterion_name):
        raise RuntimeError('Unknown loss (%s)' % criterion_name)

    factory = criterion_entrypoint(criterion_name)
    return factory(**kwargs)

# Utility function(s)

In [7]:
def check_arch(model_type):
    """Map a model type name to its architecture family.

    Args:
        model_type: Model name such as ``"Bert"`` or ``"T5"``.

    Returns:
        ``"encoder"`` or ``"encoder-decoder"``.

    Raises:
        ValueError: If ``model_type`` belongs to neither family.
    """
    families = (
        ("encoder", ("Bert", "Electra", "XLMRoberta", "Electra_BoolQ", "Roberta")),
        ("encoder-decoder", ("T5", "Bart", "Bart_BoolQ")),
    )
    for family, members in families:
        if model_type in members:
            return family
    raise ValueError(f"Model [{model_type}] no defined archtecture")

# Training setup

In [11]:
import json
import numpy as np
import os
import random
import argparse
from importlib import import_module
import glob
import re
from collections import defaultdict
from tqdm import tqdm
import time
from time import sleep

from sklearn.metrics import accuracy_score, classification_report

import transformers
from transformers import AutoTokenizer, PreTrainedTokenizerFast

from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader


## Set seed

In [12]:
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random generators and fix cuDNN flags.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Python and NumPy generators.
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators (CPU, current CUDA device, all CUDA devices).
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

    # Ask cuDNN for deterministic behaviour and turn off its benchmark mode.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    

## Training utilities

In [13]:
def get_lr(optimizer):
    """Return the ``'lr'`` entry of the optimizer's first parameter group.

    Returns ``None`` when ``optimizer.param_groups`` is empty.
    """
    groups = iter(optimizer.param_groups)
    try:
        first_group = next(groups)
    except StopIteration:
        return None
    return first_group['lr']

def compute_metrics(pred):
    """Compute accuracy from a prediction object.

    Args:
        pred: Object exposing ``label_ids`` and ``predictions``;
            ``predictions`` must support ``argmax(-1)``.

    Returns:
        Dict with a single ``'accuracy'`` entry.
    """
    # The predicted class is the index of the largest value along the last axis.
    score = accuracy_score(pred.label_ids, pred.predictions.argmax(-1))
    return {'accuracy': score}

def increment_output_dir(output_path, exist_ok = False):
    """Return an output path that does not collide with existing numbered runs.

    Args:
        output_path: Desired output path.
        exist_ok: When true, ``output_path`` is returned even if it exists.

    Returns:
        ``str(Path(output_path))`` if the path does not exist or ``exist_ok``
        is true. Otherwise the path with an integer suffix appended: one more
        than the largest existing ``<name><digits>`` sibling, or ``2`` when
        there is none.
    """
    path = Path(output_path)
    if exist_ok or not path.exists():
        return str(path)

    # Escape the name so regex metacharacters in it (e.g. "run(a)") are matched
    # literally, and compare against each candidate's final component only so
    # digits in parent directories cannot be mistaken for a run number.
    numbered = re.compile(rf"{re.escape(path.name)}(\d+)")
    suffixes = []
    # glob.escape keeps characters such as "[" in the path from acting as
    # wildcards.
    for candidate in glob.glob(f"{glob.escape(str(path))}*"):
        match = numbered.fullmatch(Path(candidate).name)
        if match:
            suffixes.append(int(match.group(1)))

    n = max(suffixes) + 1 if suffixes else 2
    return f"{path}{n}"

# Training

In [None]:
def train(model_dir, args):
    set_seed()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    num_classes = 2

    # tokenizer
    MODEL_NAME=args.pretrained_model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    # load dataset
    train_dataset = load_data("./dataset/NIKL_CoLA_train.tsv")
    