<a href="https://colab.research.google.com/github/sbbaik/small_bert/blob/main/Small_BERT_Korean_Pre_training_Consolidated_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import os
import math
from google.colab import drive
from torch.utils.data import Dataset, DataLoader
from tokenizers import BertWordPieceTokenizer
from datasets import load_dataset # Hugging Face datasets 라이브러리 임포트

# --- 1. Colab 환경 설정 및 구글 드라이브 마운트 ---
# 구글 드라이브 마운트
drive.mount('/content/drive')

# 드라이브에 저장할 경로 설정
SAVE_PATH = '/content/drive/MyDrive/small_bert_korean'

# 디렉토리가 없으면 생성
if not os.path.exists(SAVE_PATH):
    os.makedirs(SAVE_PATH)
    print(f"디렉토리 '{SAVE_PATH}'가 생성되었습니다.")

# --- 2. 하이퍼파라미터 정의 ---
d_model = 768  # 일반 BERT-base와 동일한 임베딩 차원
n_head = 12    # 일반 BERT-base와 동일한 헤드 수
num_layers = 6 # BERT-base의 절반
dim_feedforward = d_model * 4
max_len = 50
dropout = 0.1

# --- 3. Positional Encoding 클래스 정의 ---
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=50):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # math.log를 사용하기 위해 math 모듈이 임포트되어 있어야 합니다.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# --- 4. SmallBERT 모델 클래스 정의 ---
class SmallBERT(nn.Module):
    def __init__(self, vocab_size, d_model, n_head, num_layers, dim_feedforward, max_len, dropout):
        super(SmallBERT, self).__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_head,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )

        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers
        )

        # 새로운 선형 레이어 추가: d_model -> vocab_size로 차원을 변환
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, src):
        # src: (batch_size, seq_len)
        x = self.embedding(src) * math.sqrt(d_model)
        x = self.pos_encoder(x)
        output = self.transformer_encoder(x)

        # 마지막에 output_layer를 통과시켜 vocab_size 차원으로 변환
        output = self.output_layer(output)

        return output

# --- 5. 한국어 데이터셋 준비 (NSMC 데이터셋 사용) ---
print("네이버 영화 리뷰 감성분석 데이터셋(NSMC)을 로드합니다...")
# NSMC 데이터셋 로드
# 'train' 스플릿만 사용 (사전 학습용)
nsmc_dataset = load_dataset('nsmc', split='train')
print(f"NSMC 데이터셋 로드 완료. 총 {len(nsmc_dataset)}개의 샘플.")

# 데이터셋 클래스 정의 (Hugging Face Dataset 객체를 직접 받도록 수정)
class TextDataset(Dataset):
    def __init__(self, hf_dataset, tokenizer, block_size=50):
        self.tokenizer = tokenizer
        self.block_size = block_size
        self.examples = []

        # Hugging Face Dataset의 'document' 필드에서 텍스트를 추출하여 토큰화
        for i, entry in enumerate(hf_dataset):
            text = entry['document']
            if text is None: # None 값 처리
                continue
            tokenized_text = self.tokenizer.encode(text)
            token_ids = tokenized_text.ids

            # 텍스트를 고정된 길이의 블록으로 분할
            for j in range(0, len(token_ids) - block_size + 1, block_size):
                self.examples.append(torch.tensor(token_ids[j:j+block_size], dtype=torch.long))

        print(f"TextDataset에 {len(self.examples)}개의 블록이 준비되었습니다.")

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, i):
        return self.examples[i]

# --- 6. BPE 기반 한국어 토크나이저 학습 및 저장 ---
tokenizer = BertWordPieceTokenizer(
    clean_text=True,
    handle_chinese_chars=False,
    strip_accents=False,
    lowercase=False
)

# NSMC 데이터셋의 'document' 필드에서 텍스트를 추출하여 토크나이저 학습에 사용
# generator를 사용하여 메모리 효율적으로 텍스트를 전달
def get_training_corpus():
    for i in range(0, len(nsmc_dataset), 1000): # 대규모 데이터셋의 경우 배치로 처리
        yield [text for text in nsmc_dataset[i : i + 1000]['document'] if text is not None]

print("토크나이저를 학습합니다...")
tokenizer.train_from_iterator(
    get_training_corpus(),
    vocab_size=30000,
    min_frequency=2,
    show_progress=True,
    special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
)

# 학습된 토크나이저를 드라이브에 저장
tokenizer.save_model(SAVE_PATH)
print(f"토크나이저가 '{SAVE_PATH}'에 저장되었습니다.")

# --- 7. 모델 인스턴스 생성 및 사전 학습 루프 ---
vocab_size = tokenizer.get_vocab_size() # 토크나이저 학습 후 실제 vocab_size 가져오기

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SmallBERT(vocab_size, d_model, n_head, num_layers, dim_feedforward, max_len, dropout).to(device)

# 데이터로더 설정 (NSMC 데이터셋을 TextDataset에 전달)
dataset_for_training = TextDataset(nsmc_dataset, tokenizer, block_size=max_len)
dataloader = DataLoader(dataset_for_training, batch_size=32, shuffle=True)

# 옵티마이저 및 손실 함수
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.token_to_id("[PAD]")) # PAD 토큰은 손실 계산에서 제외

# 사전 학습 루프
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in dataloader:
        inputs = batch.to(device)

        # MLM을 위한 마스킹
        labels = inputs.clone().detach()
        # 15%의 토큰을 마스킹
        masked_indices = torch.rand(inputs.shape) < 0.15
        # [CLS], [SEP], [PAD] 토큰은 마스킹하지 않도록 예외 처리
        masked_indices[inputs == tokenizer.token_to_id("[CLS]")] = False
        masked_indices[inputs == tokenizer.token_to_id("[SEP]")] = False
        masked_indices[inputs == tokenizer.token_to_id("[PAD]")] = False

        inputs[masked_indices] = tokenizer.token_to_id("[MASK]")

        optimizer.zero_grad()
        outputs = model(inputs)

        # 마스킹된 토큰에 대한 손실 계산
        # outputs: (batch_size * seq_len, vocab_size)
        # labels: (batch_size * seq_len)
        loss = criterion(outputs.view(-1, vocab_size), labels.view(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Perplexity: {math.exp(avg_loss):.4f}")

    # 중간 체크포인트 저장 (구글 드라이브)
    checkpoint_path = os.path.join(SAVE_PATH, f'model_checkpoint_epoch_{epoch+1}.pt')
    torch.save(model.state_dict(), checkpoint_path)
    print(f"모델 체크포인트가 {checkpoint_path}에 저장되었습니다.")

# --- 8. 최종 모델 및 성능 지표 저장 ---
final_model_path = os.path.join(SAVE_PATH, 'final_small_bert.pt')
torch.save(model.state_dict(), final_model_path)
print(f"최종 모델이 {final_model_path}에 저장되었습니다.")

performance_log_path = os.path.join(SAVE_PATH, 'training_performance.txt')
with open(performance_log_path, 'w') as f:
    f.write(f"Final Average Loss: {avg_loss}\n")
    f.write(f"Final Perplexity: {math.exp(avg_loss)}\n")
    f.write(f"Total Epochs: {num_epochs}\n")
print(f"학습 성능 지표가 {performance_log_path}에 저장되었습니다.")