<a href="https://colab.research.google.com/github/onewon1234/AI_DL_Project/blob/KLUE-BERT/KlueBert_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive so that
# files stored on Drive can be opened by path in later cells.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:

import pandas as pd
import os
import zipfile

# Path of the zipped dataset on the mounted Google Drive.
file_path = '/content/drive/MyDrive/data/open.zip'

# Local directory the archive is unpacked into; created if it does not exist.
extract_path = '/content/data'
os.makedirs(extract_path, exist_ok=True)

# Unpack every member of the archive into extract_path.
with zipfile.ZipFile(file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

# Status message (Korean for "extraction complete").
print("압축 해제 완료")

압축 해제 완료


In [3]:

# Show the names of the entries found in the extraction directory.
print(os.listdir(extract_path))

['sample_submission.csv', 'test.csv', 'train.csv']


In [4]:

# Sample submission file: load it and preview the first rows.
csv1_path = os.path.join(extract_path, 'sample_submission.csv')
sample_submission_data = pd.read_csv(csv1_path)
print(sample_submission_data.head())

# Training data: load it and preview the first rows.
csv2_path = os.path.join(extract_path, 'train.csv')
train_data = pd.read_csv(csv2_path)
print(train_data.head())

# Test data: load it and preview the first rows.
csv3_path = os.path.join(extract_path, 'test.csv')
test_data = pd.read_csv(csv3_path)
print(test_data.head())


          ID  answer_0  answer_1  answer_2  answer_3
0  TEST_0000         0         1         2         3
1  TEST_0001         0         1         2         3
2  TEST_0002         0         1         2         3
3  TEST_0003         0         1         2         3
4  TEST_0004         0         1         2         3
           ID                                         sentence_0  \
0  TRAIN_0000                 블록체인 기술은 투표 과정의 투명성을 크게 향상시킬 수 있다.   
1  TRAIN_0001  줄거리 자동 생성의 인공지능 알고리즘은 대량의 텍스트 데이터를 분석하여 핵심 정보를...   
2  TRAIN_0002  마지막으로, 키친타올을 보관할 때는 쉽게 접근할 수 있는 곳에 두어 낭비를 방지하는...   
3  TRAIN_0003   책의 페이지가 손상되지 않도록 수직으로 세워 두거나 평평하게 눕혀 보관하는 것이 좋다.   
4  TRAIN_0004  인공지능 모델은 반복적인 실험을 통해 지속적으로 학습하며, 이를 통해 발견의 정확성...   

                                          sentence_1  \
0  이러한 특성은 유권자들에게 신뢰를 제공하며, 민주적 참여를 촉진하는 데 기여할 수 있다.   
1     결과적으로, 이러한 기술은 사용자에게 신속하고 효율적인 정보 전달을 가능하게 한다.   
2          재사용 가능한 천이나 스펀지를 활용하면 키친타올의 필요성을 줄일 수 있다.   
3      정기적으로 먼지를 털어내고, 곰팡이나 해충의 발생 여부를 점검하는 것이 중요

In [5]:
import torch

In [6]:
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix
from tqdm import tqdm

In [7]:
# Build pairwise sentence samples (via index combinations)
from itertools import combinations

def create_pairwise_data(train_data):
  """Expand each row of four sentences into labelled sentence pairs.

  For every row, each index pair (i, j) with i < j taken from the four
  sentence slots produces one sample: the tuple (sentence_i, sentence_j) and a
  label that is 1 when answer_i < answer_j and 0 otherwise.

  Args:
    train_data: DataFrame with columns sentence_0..sentence_3 and
      answer_0..answer_3.

  Returns:
    (pairs, labels): two parallel lists holding six entries per input row.
  """
  sentence_columns = [f'sentence_{slot}' for slot in range(4)]
  answer_columns = [f'answer_{slot}' for slot in range(4)]
  index_pairs = list(combinations(range(4), 2))

  pairs, labels = [], []
  for _, record in train_data.iterrows():
    texts = [record[column] for column in sentence_columns]
    ranks = [record[column] for column in answer_columns]

    # One sample per unordered slot pair; label 1 means the first sentence
    # of the pair has the smaller answer value.
    for first, second in index_pairs:
      pairs.append((texts[first], texts[second]))
      labels.append(int(bool(ranks[first] < ranks[second])))

  return pairs, labels

In [8]:
# Custom Dataset holding pre-tokenized sentence pairs
class SentencePairDataset(Dataset):
  """Dataset that tokenizes every sentence pair once, at construction time.

  Args:
    pairs: sequence of (first_sentence, second_sentence) items.
    labels: sequence of integer labels, parallel to ``pairs``.
    tokenizer: callable invoked with the two sentence lists plus the
      padding / truncation options shown in ``__init__``; its result is
      indexed by 'input_ids', 'attention_mask' and 'token_type_ids'.
    max_length: length every encoded pair is padded or truncated to.
  """

  def __init__(self, pairs, labels, tokenizer, max_length=128):
    first_sentences, second_sentences = [], []
    for pair in pairs:
      first_sentences.append(pair[0])
      second_sentences.append(pair[1])

    # Shorter inputs are padded up to max_length, longer ones are truncated.
    self.encodings = tokenizer(
        first_sentences,
        second_sentences,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )
    # Labels are kept as a long tensor.
    self.labels = torch.tensor(labels, dtype=torch.long)

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    # Token ids, padding mask and segment ids for sample idx, plus its label.
    sample = {
        key: self.encodings[key][idx]
        for key in ('input_ids', 'attention_mask', 'token_type_ids')
    }
    sample['labels'] = self.labels[idx]
    return sample

In [9]:
# BERT-based classifier for binary sentence-pair classification
class SentencePairClassifier(nn.Module):
  """Pretrained encoder followed by a single-output linear head.

  Args:
    model_name: identifier passed to ``AutoModel.from_pretrained``
      (defaults to 'klue/bert-base').
  """

  def __init__(self, model_name='klue/bert-base'):
    super().__init__()
    self.bert = AutoModel.from_pretrained(model_name)
    hidden_size = self.bert.config.hidden_size
    # Linear head mapping the encoder's hidden size to one output value.
    self.classifier = nn.Linear(hidden_size, 1)

  def forward(self, input_ids, attention_mask, token_type_ids):
    """Return one raw logit per input sequence.

    No sigmoid is applied here; the returned values are the linear head's
    outputs with the trailing size-1 dimension squeezed away.
    """
    encoded = self.bert(
        input_ids=input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
    )
    # Hidden state at sequence position 0 (the [CLS] slot).
    cls_embedding = encoded.last_hidden_state[:, 0, :]
    scores = self.classifier(cls_embedding)
    return scores.squeeze(-1)

In [10]:
# Training routine for a single epoch
def train_epoch(model, train_loader, optimizer, criterion, device):
  """Run one optimisation pass over ``train_loader``.

  Args:
    model: module called as ``model(input_ids, attention_mask, token_type_ids)``.
    train_loader: iterable of batches indexed by 'input_ids',
      'attention_mask', 'token_type_ids' and 'labels'.
    optimizer: optimizer stepped once per batch.
    criterion: loss callable taking ``(logits, float_labels)``.
    device: device every batch tensor is moved to.

  Returns:
    The mean of the per-batch loss values for this epoch.
  """
  model.train()
  running_loss = 0  # sum of the per-batch loss values

  progress = tqdm(train_loader, desc='Training')
  for batch in progress:
    # Move the batch tensors onto the target device.
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    token_type_ids = batch['token_type_ids'].to(device)
    targets = batch['labels'].to(device).float()  # the loss is fed float targets

    # Forward pass: predictions, then the loss against the targets.
    loss = criterion(model(input_ids, attention_mask, token_type_ids), targets)

    # Backward pass and parameter update.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    running_loss += loss.item()

  return running_loss / len(train_loader)


In [11]:
# Model 평가 함수 정의
from sklearn.metrics import accuracy_score, f1_score

def evaluate(model, data_loader, criterion, device):
  """Score ``model`` on ``data_loader`` without computing gradients.

  Args:
    model: module called as ``model(input_ids, attention_mask, token_type_ids)``.
    data_loader: iterable of batches indexed by 'input_ids',
      'attention_mask', 'token_type_ids' and 'labels'.
    criterion: loss callable taking ``(logits, float_labels)``.
    device: device every batch tensor is moved to.

  Returns:
    (mean_loss, metrics): the mean per-batch loss and the dict produced by
    ``calculate_metrics``, extended with a 'predictions' entry holding the
    predicted labels for every sample.
  """
  model.eval()
  running_loss = 0
  predictions, targets = [], []

  with torch.no_grad():
    for batch in data_loader:
      input_ids = batch['input_ids'].to(device)
      attention_mask = batch['attention_mask'].to(device)
      token_type_ids = batch['token_type_ids'].to(device)
      labels = batch['labels'].to(device).float()

      logits = model(input_ids, attention_mask, token_type_ids)
      running_loss += criterion(logits, labels).item()

      # A sigmoid output above 0.5 is counted as class 1.
      batch_preds = (torch.sigmoid(logits) > 0.5).long()
      predictions.extend(batch_preds.cpu().tolist())
      targets.extend(labels.cpu().tolist())

  metrics = calculate_metrics(targets, predictions)
  metrics['predictions'] = predictions
  mean_loss = running_loss / len(data_loader)
  return mean_loss, metrics

In [12]:
# Metric helpers
def calculate_metrics(labels, preds):
  """Return a dict with the 'accuracy' and 'f1' scores of ``preds`` against ``labels``."""
  accuracy = accuracy_score(labels, preds)
  f1 = f1_score(labels, preds)
  return {'accuracy': accuracy, 'f1': f1}

def print_metrics(metrics):
  """Print the 'accuracy' and 'f1' entries of ``metrics`` with four decimals."""
  print('Validation metrics:')
  for caption, key in (('Accuracy', 'accuracy'), ('F1-score', 'f1')):
    print(f'{caption}: {metrics[key]:.4f}')

In [13]:
# Helper that stores predictions on disk
def save_predictions(labels, preds, filename='predictions.csv'):
    """Write true and predicted labels side by side to a CSV file.

    Args:
        labels: values for the 'true_order' column.
        preds: values for the 'predicted_order' column.
        filename: destination path of the CSV (written without the index).
    """
    columns = dict(true_order=labels, predicted_order=preds)
    pd.DataFrame(columns).to_csv(filename, index=False)

In [None]:
# Main training / evaluation entry point
def main():
  """Train the sentence-pair classifier and evaluate its best checkpoint.

  Reads train.csv from ``extract_path``, holds out 20% of the rows for
  validation, expands both splits into sentence pairs, and fine-tunes
  ``SentencePairClassifier`` for at most 5 epochs, stopping early after 2
  consecutive epochs without a validation-F1 improvement. The best checkpoint
  is then reloaded, its validation metrics are printed and its predictions
  are written with ``save_predictions``.

  Returns:
    dict: training history with the keys 'train_losses', 'val_losses' and
    'val_f1s', each a list with one entry per epoch that was run.
  """
  csv2_path = os.path.join(extract_path, 'train.csv')
  train_data = pd.read_csv(csv2_path)
  train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)
  tokenizer = AutoTokenizer.from_pretrained('klue/bert-base')
  train_pairs, train_labels = create_pairwise_data(train_data)
  val_pairs, val_labels = create_pairwise_data(val_data)
  train_dataset = SentencePairDataset(train_pairs, train_labels, tokenizer)
  val_dataset = SentencePairDataset(val_pairs, val_labels, tokenizer)

  # Batches per step; only the training loader is shuffled
  train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
  val_loader = DataLoader(val_dataset, batch_size=16)

  # Use the GPU when one is available
  device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  print(f'Using device: {device}')

  # Model initialisation
  model = SentencePairClassifier()
  model = model.to(device)

  # Optimizer and loss function
  optimizer =  torch.optim.AdamW(model.parameters(), lr=2e-5)
  criterion = torch.nn.BCEWithLogitsLoss()   # binary cross-entropy on raw logits

  num_epochs = 5
  patience = 2
  counter = 0
  checkpoint_path = 'best_model.path'
  # Start below any reachable F1 so the first epoch always writes a checkpoint;
  # otherwise a run whose F1 stays at 0 would reach the final load with no
  # file from this run on disk.
  best_val_f1 = float('-inf')

  # History is created once, before the loop, so every epoch appends to it
  # instead of discarding the previous epochs' values.
  train_losses = []
  val_losses = []
  val_f1s = []

  for epoch in range(num_epochs):
    print(f'\nEpoch: {epoch+1}/{num_epochs}')

    # Training
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    print(f'Average training loss: {train_loss:.4f}')

    # Validation
    val_loss, metrics = evaluate(model, val_loader, criterion, device)
    print(f'Average validation loss: {val_loss:.4f}')
    print_metrics(metrics)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    val_f1s.append(metrics['f1'])

    # Keep the checkpoint with the best validation F1
    if metrics['f1'] > best_val_f1:
        best_val_f1 = metrics['f1']
        counter = 0
        torch.save(model.state_dict(), checkpoint_path)
        print("✅ Best model saved.")
    else:
        counter += 1
        print(f"🔁 No improvement. Early stopping counter: {counter}/{patience}")
        if counter >= patience:
            print("Early stopping triggered.")
            break

  # Final evaluation with the best checkpoint
  print("\nFinal Evaluation:")
  # map_location keeps the load working when the checkpoint was written on a
  # different device than the one in use now.
  model.load_state_dict(torch.load(checkpoint_path, map_location=device))
  _, final_metrics = evaluate(model, val_loader, criterion, device)
  print_metrics(final_metrics)

  save_predictions(val_labels, final_metrics['predictions'])

  return {
      'train_losses': train_losses,
      'val_losses': val_losses,
      'val_f1s': val_f1s,
  }


if __name__ == '__main__':
  main()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/289 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/425 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/248k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/495k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

Using device: cpu


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/445M [00:00<?, ?B/s]


Epoch: 1/5


Training:   5%|▌         | 112/2205 [40:01<12:29:59, 21.50s/it]

In [None]:
  # Final evaluation with the best model.
  # NOTE(review): the statements below repeat the tail of main() and are
  # indented although nothing in this cell encloses them. They also read
  # model, val_loader, criterion, device and val_labels, which are only
  # assigned inside main(). Presumably a leftover paste - confirm whether this
  # cell should be removed.
  print("\nFinal Evaluation:")
  model.load_state_dict(torch.load('best_model.path'))
  _, final_metrics = evaluate(model, val_loader, criterion, device)
  print_metrics(final_metrics)

  save_predictions(val_labels, final_metrics['predictions'])

# Runs the whole main() pipeline when executed as a script / notebook cell.
if __name__ == '__main__':
  main()

In [None]:
if __name__ == '__main__':
    # Import before training so a missing plotting backend fails immediately
    # rather than after the run has finished.
    import matplotlib.pyplot as plt

    # The loss / F1 lists are local to main(), so they are not visible as
    # module-level names; take them from main()'s return value instead.
    history = main()

    if not history:
        # main() handed back nothing to plot (e.g. it returns None).
        print('main() returned no training history; skipping plots.')
    else:
        plt.plot(history['train_losses'], label='Train Loss')
        plt.plot(history['val_losses'], label='Validation Loss')
        plt.title('Loss over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()

        plt.plot(history['val_f1s'], label='Validation F1')
        plt.title('Validation F1 over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('F1-score')
        plt.legend()
        plt.show()