In [None]:
# Install the HuggingFace transformers and datasets libraries
!pip install transformers
!pip install datasets

In [1]:
import numpy as np
import pandas as pd
import random
import torch
from sklearn import metrics
from torch.nn import functional as F
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from transformers import AutoTokenizer, ElectraModel, ElectraConfig, AdamW, get_linear_schedule_with_warmup
from tqdm.notebook import tqdm
import gc
from sklearn import metrics
from datasets import load_dataset

In [2]:
def one_hot_encoder(dataset, n_labels=44):
    """Turn a collection of label indices into a multi-hot label vector.

    Args:
        dataset: Iterable of integer label indices to switch on. Despite the
            name, the caller in this file passes the label list of a single
            example.
        n_labels: Length of the produced vector.

    Returns:
        dict: ``{"labels": tensor}`` where ``tensor`` is a 1-D ``torch.int64``
        tensor of length ``n_labels`` with 1 at every listed index and 0
        everywhere else.
    """
    flags = [0 for _ in range(n_labels)]
    for position in dataset:
        flags[position] = 1
    return {"labels": torch.tensor(flags, dtype=torch.long)}

In [5]:
class ClassificationHead(nn.Module):
    """Feed-forward classification head applied to the first token's hidden state.

    Pipeline: dropout -> dense -> GELU -> dropout -> linear projection.

    The head returns raw logits. The loss used in this file is
    ``BCEWithLogitsLoss``, which applies the sigmoid itself; applying a
    sigmoid here as well would squash the scores twice and flatten the
    gradients. Apply ``torch.sigmoid`` to the output when probabilities are
    needed at inference time.

    Args:
        config: Accepted for interface compatibility; currently unused.
        num_labels: Number of output units (one per label).
        hidden_size: Width of the incoming hidden states and of the
            intermediate dense layer.
        dropout: Dropout probability used before both linear layers.
    """

    def __init__(self, config, num_labels, hidden_size=768, dropout=0.1):
        super().__init__()

        # TODO: read these hyper-parameters from a config file once one exists.
        self.dropout = nn.Dropout(dropout)
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.linear = nn.Linear(hidden_size, num_labels)
        self.gelu = nn.GELU()

    def forward(self, x):
        """Map hidden states of shape (batch, seq, hidden) to logits of shape (batch, num_labels)."""
        x = x[:, 0, :]  # keep only the position of the [CLS] token
        x = self.dropout(x)
        x = self.dense(x)
        x = self.gelu(x)
        x = self.dropout(x)
        # No sigmoid and no per-call torch.cuda.empty_cache(): the former
        # belongs to the loss, the latter only slows down every forward pass.
        return self.linear(x)

In [6]:
def loss_function(outputs, labels):
    """Mean binary cross-entropy computed from scores with a built-in sigmoid.

    Args:
        outputs: Float tensor of model scores. The sigmoid is applied inside
            the loss, so these are treated as logits.
        labels: Tensor of targets with the same shape as ``outputs`` (cast to
            float before use), or ``None``.

    Returns:
        A scalar loss tensor, or ``None`` when ``labels`` is ``None``.
    """
    if labels is not None:
        targets = labels.float()
        return nn.functional.binary_cross_entropy_with_logits(outputs, targets)
    return None

In [7]:
def log_metrics(preds, labels):
    """Compute the micro-averaged ROC AUC over every (example, label) cell.

    Args:
        preds: Sequence of equally-shaped score tensors; they are stacked
            into one tensor here.
        labels: Sequence of equally-shaped target tensors, aligned with
            ``preds``.

    Returns:
        dict: ``{"auc_micro": value}`` with the area under the ROC curve.
    """
    score_matrix = torch.stack(preds).cpu().detach().numpy()
    target_matrix = torch.stack(labels).cpu().detach().numpy()

    # Flattening both arrays treats each (example, label) cell as one binary
    # decision, which is what makes the resulting AUC micro-averaged.
    false_pos_rate, true_pos_rate, _thresholds = metrics.roc_curve(
        target_matrix.ravel(), score_matrix.ravel()
    )
    return {"auc_micro": metrics.auc(false_pos_rate, true_pos_rate)}

In [8]:
class ElectraClassification(nn.Module):
    """ELECTRA encoder followed by a ``ClassificationHead`` for multi-label classification.

    Args:
        model_name: Hub identifier (or local path) of the pretrained ELECTRA
            checkpoint to load.
        num_labels: Number of labels predicted per example.
    """

    def __init__(self, model_name="beomi/KcELECTRA-base", num_labels=44):
        super().__init__()

        self.num_labels = num_labels
        self.config = ElectraConfig.from_pretrained(
            model_name,
            problem_type="multi_label_classification",
            num_labels=self.num_labels,
        )

        # The encoder is deliberately not moved to a device here. Doing so
        # required a global `device` to exist before the class could be
        # instantiated; the caller moves the whole module (encoder and head
        # together) with `.to(device)` instead.
        self.model = ElectraModel.from_pretrained(model_name, config=self.config)

        self.classifier = ClassificationHead(self.config, self.num_labels)

    def forward(self, input_ids=None, attention_mask=None):
        """Encode the token ids and return the classification head's output.

        Args:
            input_ids: Token id tensor fed to the encoder.
            attention_mask: Mask tensor fed to the encoder alongside
                ``input_ids``.

        Returns:
            The output of ``self.classifier`` for the encoder's hidden states.
        """
        # Element 0 of the encoder output holds the per-token hidden states.
        model_output = self.model(input_ids=input_ids, attention_mask=attention_mask)[0]
        return self.classifier(model_output)




In [9]:
# train
def train(model, train_dataloader, scheduler, batch_size, n_epochs, lr=1e-5,
          optimizer=None, device=None):
    """Run one training pass over ``train_dataloader``.

    For each batch: zero the gradients, run the model, compute the loss,
    back-propagate, then step the optimizer and the scheduler.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        train_dataloader: Iterable of batches, each a mapping with the keys
            ``"labels"``, ``"attention_mask"`` and ``"input_ids"``.
        scheduler: Learning-rate scheduler, stepped once per batch.
        batch_size: Unused; kept for interface compatibility.
        n_epochs: Unused; kept for interface compatibility.
        lr: Unused; kept for interface compatibility.
        optimizer: Optimizer to step. Defaults to ``scheduler.optimizer`` so
            the function no longer relies on a global ``optimizer`` variable.
        device: Device the batch tensors are moved to. Defaults to the device
            of the model's first parameter instead of a global ``device``.

    Returns:
        float: Sum of the per-batch loss values (divide by the number of
        batches for the mean).
    """
    if optimizer is None:
        # Schedulers keep a reference to the optimizer they were built around.
        optimizer = scheduler.optimizer
    if device is None:
        device = next(model.parameters()).device

    total_loss = 0

    model.train()

    for train_input in tqdm(train_dataloader):
        optimizer.zero_grad()
        y_batch = train_input["labels"].to(device)
        mask = train_input["attention_mask"].to(device)
        input_id = train_input["input_ids"].to(device)
        y_pred = model(input_id, mask)
        loss = loss_function(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()

    return total_loss


In [12]:
def test(model, test_dataloader, batch_size, device=None):
    """Evaluate the model on ``test_dataloader`` without tracking gradients.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        test_dataloader: Iterable of batches, each a mapping with the keys
            ``"labels"``, ``"attention_mask"`` and ``"input_ids"``.
        batch_size: Unused; kept for interface compatibility.
        device: Device the batch tensors are moved to. Defaults to the device
            of the model's first parameter instead of a global ``device``.

    Returns:
        tuple: ``(test_loss, targets, outputs)`` where ``test_loss`` is the
        sum of the per-batch loss values and ``targets`` / ``outputs`` are
        lists holding one tensor per example (label row and model output row).
    """
    if device is None:
        device = next(model.parameters()).device

    test_loss = 0
    targets = []
    outputs = []

    model.eval()
    with torch.no_grad():
        for test_input in tqdm(test_dataloader):
            y_batch = test_input["labels"].to(device)
            mask = test_input["attention_mask"].to(device)
            # Fed exactly as in train(): no squeeze on the ids, so ids and
            # mask always keep matching shapes.
            input_id = test_input["input_ids"].to(device)
            y_pred = model(input_id, mask)
            loss = loss_function(y_pred, y_batch)
            test_loss += loss.item()

            # extend() iterates over the first dimension, so every example
            # contributes one row tensor to each list.
            targets.extend(y_batch)
            outputs.extend(y_pred)

    return test_loss, targets, outputs

In [None]:
if __name__ == "__main__":

    # Fix every random seed so that runs are repeatable
    SEED = 42
    np.random.seed(SEED)
    random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)  # type: ignore
    torch.cuda.manual_seed_all(SEED) # if use multi-GPU
    torch.backends.cudnn.deterministic = True  # type: ignore
    # Benchmark mode lets cuDNN auto-tune and pick different algorithms from
    # run to run, which defeats the determinism requested above.
    torch.backends.cudnn.benchmark = False  # type: ignore

    max_length = 64
    n_epochs = 10
    batch_size = 32
    lr = 2e-5

    dataset = load_dataset("searle-j/kote")

    tokenizer = AutoTokenizer.from_pretrained("beomi/KcELECTRA-base", do_lower_case=False)

    # Tokenize every split to fixed-length sequences of `max_length` tokens.
    dataset = dataset.map(lambda x: tokenizer(x["text"],
                                              truncation=True,
                                              max_length=max_length,
                                              padding="max_length",
                                              return_token_type_ids=False,
                                              return_attention_mask=True,
                                              add_special_tokens=True), batched=True)
    # Replace each example's list of label indices with a multi-hot vector.
    dataset = dataset.map(lambda x: one_hot_encoder(x["labels"]))
    dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])

    train_dataset = ConcatDataset([dataset["train"], dataset["validation"]])
    test_dataset = dataset["test"]

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    device = "cuda" if torch.cuda.is_available() else "cpu"

    model = ElectraClassification().to(device)

    optimizer = AdamW(model.parameters(), lr=lr)
    # The scheduler is stepped once per batch, so the schedule length is the
    # exact number of batches over all epochs (an integer, including the
    # final partial batch of each epoch).
    total_steps = len(train_dataloader) * n_epochs
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=2500, num_training_steps=total_steps)

    # Start from infinity so the first epoch always produces a checkpoint.
    best_val_loss = float("inf")

    # NOTE(review): the validation split is merged into the training data and
    # the "valid" loss below is measured on the test split, so checkpoint
    # selection is tuned on test data — confirm this is intended.
    for epoch in range(n_epochs):
        gc.collect()
        torch.cuda.empty_cache()
        train_loss= train(model, train_dataloader, scheduler, batch_size, n_epochs, lr=lr)
        test_loss, labels, preds = test(model, test_dataloader, batch_size)

        auc_score = log_metrics(preds, labels)["auc_micro"]
        avg_train_loss, avg_val_loss = train_loss / len(train_dataloader), test_loss / len(test_dataloader)

        print(f"[{epoch+1}/{n_epochs}]")
        print(f"AUC score: {auc_score}")
        print(f"Average Train Loss: {avg_train_loss}")
        print(f"Average Valid Loss: {avg_val_loss}")
        print("\n")

        # Keep only the weights of the epoch with the lowest average loss.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), "./best_model.pt")
            print(f"Model saved as current valid loss: {best_val_loss}")
