## KB-ALBERT

### 필요한 모델 및 라이브러리 불러오기

In [None]:
!git clone https://github.com/KB-AI-Research/KB-ALBERT.git

In [None]:
!pip install torch transformers

### Library

In [2]:
from transformers import AlbertForSequenceClassification, AdamW
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer
import torch
from torch.optim import Adam
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import numpy as np

In [None]:
# Load the tokenizer and the pretrained model from a local checkpoint directory.
# Point this at your own copy of the checkpoint (the default is a Google Drive
# path, so presumably Drive has to be mounted first -- TODO confirm).
kb_albert_model_path = "/content/drive/MyDrive/kb_chs/data/kb-albert-char-base-v2"
# NOTE(review): `model` is rebound to an AlbertForSequenceClassification
# instance further down before it is used, so this AutoModel load looks
# redundant.
model = AutoModel.from_pretrained(kb_albert_model_path)
tokenizer = AutoTokenizer.from_pretrained(kb_albert_model_path)

In [None]:
# Load Dataset
# The CSV has to provide the 'context', 'question' and 'answer' columns that
# QADataset reads from this dataframe.
df = pd.read_csv('/content/drive/MyDrive/kb_chs/data/aihub_ox.csv')

# Custom Dataset
class QADataset(Dataset):
    """Dataset that feeds (question, context) pairs to a tokenizer.

    Every item is a dict with two entries: 'inputs', the tokenizer output for
    the pair, and 'answer', the value stored in the row's 'answer' column.
    """

    def __init__(self, df, tokenizer):
        # Source rows; must expose 'context', 'question' and 'answer' columns.
        self.df = df
        # Object providing `encode_plus`, used to encode each pair.
        self.tokenizer = tokenizer

    def __len__(self):
        """Return the number of rows in the underlying dataframe."""
        n_rows = len(self.df)
        return n_rows

    def __getitem__(self, idx):
        """Return the encoded pair and the label found at position *idx*."""
        frame = self.df
        passage = frame['context'].iloc[idx]  # text the question is based on
        query = frame['question'].iloc[idx]
        label = frame['answer'].iloc[idx]

        # Every sample is padded/truncated to the same fixed length.
        encode_options = {
            'max_length': 512,
            'padding': 'max_length',
            'truncation': True,
            'return_tensors': 'pt',
            'return_token_type_ids': False,
        }
        encoded = self.tokenizer.encode_plus(query, passage, **encode_options)
        return dict(inputs=encoded, answer=label)

# Hold out 20% of the rows as the test set, then take 20% of the remainder as
# the validation set. Both splits use a fixed seed so they are reproducible.
train, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)
train_df, val_df = train_test_split(
    train,
    test_size=0.2,
    random_state=42,
)

# Wrap the training and validation splits in datasets.
train_dataset = QADataset(df=train_df, tokenizer=tokenizer)
val_dataset = QADataset(df=val_df, tokenizer=tokenizer)

# Only the training batches are shuffled; validation keeps its row order.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)

# Initialize model
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# cell does not crash on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = AlbertForSequenceClassification.from_pretrained(kb_albert_model_path, num_labels=2)  # two-label classification head
model = model.to(device)

# Initialize optimizer
optimizer = Adam(model.parameters(), lr=1e-4)

# Early-stopping settings
best_loss = float('inf')  # lowest validation loss seen so far
max_patience = 5          # number of non-improving epochs tolerated
num_patience = 0          # non-improving epochs counted so far

# Model Training
import copy  # cell-local import: used to snapshot the best-performing weights

# Send batches to whichever device the model already lives on instead of
# assuming CUDA.
device = next(model.parameters()).device

# Independent copy of the model taken at the epoch with the lowest validation
# loss. (A plain `best_model = model` would only alias the live model, which
# keeps changing as training continues.)
best_model = None

for epoch in range(50):  # upper bound on epochs; early stopping may end sooner
    # ---- training pass ----
    model.train()
    train_loss_list = []
    for batch in train_dataloader:
        # Drop the extra size-1 dimension (index 1) left over from the
        # per-sample tokenization.
        inputs = {k: v.squeeze(1).to(device) for k, v in batch['inputs'].items()}
        # Build the label tensor in one step rather than one tensor per sample.
        labels = torch.as_tensor([int(a) for a in batch['answer']], dtype=torch.long, device=device)

        # Forward pass
        optimizer.zero_grad()
        outputs = model(**inputs, labels=labels)
        loss = outputs.loss

        # Backward pass and weight update
        loss.backward()
        optimizer.step()
        train_loss_list.append(loss.item())

    # ---- validation pass ----
    model.eval()
    total = 0
    correct = 0
    val_loss_list = []
    y_true = []  # ground-truth labels
    y_pred = []  # predicted labels
    with torch.no_grad():
        for batch in val_dataloader:
            inputs = {k: v.squeeze(1).to(device) for k, v in batch['inputs'].items()}
            labels = torch.as_tensor([int(a) for a in batch['answer']], dtype=torch.long, device=device)
            outputs = model(**inputs, labels=labels)
            val_loss_list.append(outputs.loss.item())

            predicted = outputs.logits.argmax(dim=1)  # highest-scoring class per sample
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            y_true.extend(labels.tolist())
            y_pred.extend(predicted.tolist())

    # F1 score over the whole validation set
    f1 = f1_score(y_true, y_pred, average='weighted')
    val_loss = np.mean(val_loss_list)

    print(f'train_loss: {np.mean(train_loss_list):.5f} val_loss: {val_loss:.5f}')
    print(f"val_Accuracy: {(100 * correct / total):.2f} val_F1 Score: {f1:.2f}")

    if best_loss > val_loss:  # validation loss improved
        print("Save new model on epoch: %d" % (epoch + 1))
        best_loss = val_loss
        best_model = copy.deepcopy(model)
        # Patience counts consecutive non-improving epochs, so an improvement
        # resets it.
        num_patience = 0
        print('Model Saved')
    else:
        num_patience += 1

    if num_patience >= max_patience:  # Early Stop
        print(f"Early Stopped after epoch {epoch+1}")
        break

# Continue with the best checkpoint rather than the last epoch's weights, so
# later cells that use `model` evaluate the model that early stopping selected.
if best_model is not None:
    model.load_state_dict(best_model.state_dict())

In [None]:
# Held-out test split, iterated in its original row order.
test_dataset = QADataset(df=test_df, tokenizer=tokenizer)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

## Inference
- KB-ALBERT

In [None]:
# Evaluation
# Run the batches on whichever device the model already lives on instead of
# assuming CUDA.
device = next(model.parameters()).device

model.eval()
total = 0
correct = 0
y_true = []  # ground-truth labels
y_pred = []  # predicted labels
with torch.no_grad():
    for batch in test_dataloader:
        # Drop the extra size-1 dimension (index 1) left over from the
        # per-sample tokenization.
        inputs = {k: v.squeeze(1).to(device) for k, v in batch['inputs'].items()}
        # Build the label tensor in one step rather than one tensor per sample.
        labels = torch.as_tensor([int(a) for a in batch['answer']], dtype=torch.long, device=device)
        outputs = model(**inputs)
        predicted = outputs.logits.argmax(dim=1)  # highest-scoring class per sample
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        y_true.extend(labels.tolist())
        y_pred.extend(predicted.tolist())


# F1 score over the whole test set
f1 = f1_score(y_true, y_pred, average='weighted')

# Report accuracy with two decimals; '%d' would silently truncate it.
print('Accuracy: %.2f %%' % (100 * correct / total))
print('F1 Score:', f1)