In [1]:
# ==============================================================================
# PART 1: 모델 정의 (Transformer 전체 아키텍처)
# ==============================================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

print(f"PyTorch Version: {torch.__version__}")

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, d_model):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model
    def forward(self, x):
        return self.embedding(x) * math.sqrt(self.d_model)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps
    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta

class ResidualConnection(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super(ResidualConnection, self).__init__()
        self.norm = LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
        assert d_model % n_head == 0
        self.d_k = d_model // n_head
        self.n_head = n_head
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)
    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)
        q = self.w_q(q).view(batch_size, -1, self.n_head, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(batch_size, -1, self.n_head, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(batch_size, -1, self.n_head, self.d_k).transpose(1, 2)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # mask==0인 부분, 즉 <pad>이거나 미래 시점인 부분을 -1e9로 채움
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = F.softmax(scores, dim=-1)
        context = torch.matmul(attn, v)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_head * self.d_k)
        output = self.w_concat(context)
        return output

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
    def forward(self, x):
        return self.linear2(self.dropout(self.relu(self.linear1(x))))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.res_conn1 = ResidualConnection(d_model, dropout)
        self.res_conn2 = ResidualConnection(d_model, dropout)
    def forward(self, x, src_mask):
        x = self.res_conn1(x, lambda x: self.self_attn(x, x, x, src_mask))
        x = self.res_conn2(x, self.feed_forward)
        return x

class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_head, d_ff, n_layers, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_head, d_ff, dropout) for _ in range(n_layers)])
        self.norm = LayerNorm(d_model)
    def forward(self, x, src_mask):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, src_mask)
        return self.norm(x)

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.enc_dec_attn = MultiHeadAttention(d_model, n_head)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.res_conn1 = ResidualConnection(d_model, dropout)
        self.res_conn2 = ResidualConnection(d_model, dropout)
        self.res_conn3 = ResidualConnection(d_model, dropout)
    def forward(self, x, encoder_output, src_mask, tgt_mask):
        x = self.res_conn1(x, lambda x: self.self_attn(x, x, x, tgt_mask))
        x = self.res_conn2(x, lambda x: self.enc_dec_attn(x, encoder_output, encoder_output, src_mask))
        x = self.res_conn3(x, self.feed_forward)
        return x

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_head, d_ff, n_layers, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_head, d_ff, dropout) for _ in range(n_layers)])
        self.norm = LayerNorm(d_model)
    def forward(self, x, encoder_output, src_mask, tgt_mask):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return self.norm(x)

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, n_layers, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size, d_model, n_head, d_ff, n_layers, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, n_head, d_ff, n_layers, dropout)
        self.generator = nn.Linear(d_model, tgt_vocab_size)
    def forward(self, src, tgt, src_mask, tgt_mask):
        encoder_output = self.encoder(src, src_mask)
        decoder_output = self.decoder(tgt, encoder_output, src_mask, tgt_mask)
        return self.generator(decoder_output)

# ==============================================================================
# PART 2: 데이터 준비 (Tatoeba 데이터셋)
# ==============================================================================
import unicodedata
import re
import random
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# 데이터셋 다운로드 및 압축 해제
!wget http://www.manythings.org/anki/fra-eng.zip -O fra-eng.zip
!unzip -o fra-eng.zip

PAD_TOKEN = 0
SOS_TOKEN = 1
EOS_TOKEN = 2

class Vocabulary:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.index2word = {PAD_TOKEN: "<pad>", SOS_TOKEN: "<s>", EOS_TOKEN: "</s>"}
        self.n_words = 3
    def add_sentence(self, sentence):
        for word in sentence.split(' '):
            self.add_word(word)
    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.n_words += 1

def normalize_string(s):
    s = s.lower().strip()
    s = ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s

def prepare_data():
    lines = open('fra.txt', encoding='utf-8').read().strip().split('\n')
    pairs = [[normalize_string(s) for s in l.split('\t')[:2]] for l in lines]
    pairs = [list(reversed(p)) for p in pairs] # Reverse for French -> English
    input_lang, output_lang = Vocabulary('fra'), Vocabulary('eng')
    MAX_LENGTH = 15
    pairs = [p for p in pairs if len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH]
    for pair in pairs:
        input_lang.add_sentence(pair[0])
        output_lang.add_sentence(pair[1])
    return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepare_data()
print("Data preparation complete.")
print(f"French vocab size: {input_lang.n_words}")
print(f"English vocab size: {output_lang.n_words}")

def indexes_from_sentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]

class TatoebaDataset(Dataset):
    def __init__(self, pairs, input_lang, output_lang):
        self.pairs = pairs
        self.input_lang = input_lang
        self.output_lang = output_lang
    def __len__(self):
        return len(self.pairs)
    def __getitem__(self, idx):
        pair = self.pairs[idx]
        input_tensor = torch.tensor(indexes_from_sentence(self.input_lang, pair[0]), dtype=torch.long)
        output_tensor = torch.tensor(indexes_from_sentence(self.output_lang, pair[1]), dtype=torch.long)
        return input_tensor, output_tensor

def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(torch.cat([torch.tensor([SOS_TOKEN]), src_sample, torch.tensor([EOS_TOKEN])], dim=0))
        tgt_batch.append(torch.cat([torch.tensor([SOS_TOKEN]), tgt_sample, torch.tensor([EOS_TOKEN])], dim=0))
    src_padded = pad_sequence(src_batch, padding_value=PAD_TOKEN, batch_first=True)
    tgt_padded = pad_sequence(tgt_batch, padding_value=PAD_TOKEN, batch_first=True)
    return src_padded, tgt_padded

dataset = TatoebaDataset(pairs, input_lang, output_lang)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)
print("DataLoader is ready.")

# ==============================================================================
# PART 3: 최종 마스크 생성 함수 및 모델 테스트
# ==============================================================================

def create_masks(src, tgt, device):
    """
    소스 및 타겟 시퀀스에 대한 마스크를 생성합니다.
    이 함수는 브로드캐스팅을 활용하여 올바른 4D 마스크를 생성합니다.
    """
    # 소스 마스크 (B, 1, 1, S_len)
    src_mask = (src != PAD_TOKEN).unsqueeze(1).unsqueeze(2).to(device)

    # 타겟 마스크 (B, 1, T_len, T_len)
    seq_len = tgt.size(1)
    # 패딩 마스크 (B, 1, T_len, 1)
    tgt_pad_mask = (tgt != PAD_TOKEN).unsqueeze(1).unsqueeze(3).to(device)
    # 룩어헤드 마스크 (T_len, T_len)
    tgt_look_ahead_mask = torch.tril(torch.ones(seq_len, seq_len, device=device)).bool()
    # 최종 결합
    tgt_mask = tgt_pad_mask & tgt_look_ahead_mask
    return src_mask, tgt_mask

# --- 테스트 실행 ---
print("\n--- Final Model Test Start ---")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 하이퍼파라미터
D_MODEL = 512
N_HEAD = 8
D_FF = 2048
N_LAYERS = 6
DROPOUT = 0.1

# 모델 생성
model = Transformer(
    src_vocab_size=input_lang.n_words,
    tgt_vocab_size=output_lang.n_words,
    d_model=D_MODEL,
    n_head=N_HEAD,
    d_ff=D_FF,
    n_layers=N_LAYERS,
    dropout=DROPOUT
).to(device)

# 데이터로더에서 한 배치 가져오기
src_batch, tgt_batch = next(iter(dataloader))
src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device)

tgt_input = tgt_batch[:, :-1]

# 마스크 생성
src_mask, tgt_mask = create_masks(src_batch, tgt_input, device)

# 모델 forward pass
try:
    output = model(src_batch, tgt_input, src_mask, tgt_mask)
    print("\n--- TEST RESULT: SUCCESS! ---")
    print(f"Source batch shape: {src_batch.shape}")
    print(f"Target input shape: {tgt_input.shape}")
    print(f"Source mask shape: {src_mask.shape}")
    print(f"Target mask shape: {tgt_mask.shape}")
    print(f"Final output shape: {output.shape}")
except Exception as e:
    print("\n--- TEST RESULT: FAILED ---")
    print(f"An error occurred: {e}")


PyTorch Version: 2.8.0+cu126
--2025-09-23 02:55:38--  http://www.manythings.org/anki/fra-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8143096 (7.8M) [application/zip]
Saving to: ‘fra-eng.zip’


2025-09-23 02:55:40 (4.47 MB/s) - ‘fra-eng.zip’ saved [8143096/8143096]

Archive:  fra-eng.zip
  inflating: _about.txt              
  inflating: fra.txt                 
Data preparation complete.
French vocab size: 25080
English vocab size: 15633
DataLoader is ready.

--- Final Model Test Start ---
Using device: cuda

--- TEST RESULT: SUCCESS! ---
Source batch shape: torch.Size([64, 15])
Target input shape: torch.Size([64, 13])
Source mask shape: torch.Size([64, 1, 1, 15])
Target mask shape: torch.Size([64, 1, 13, 13])
Final output shape: torch.Size([64, 13, 15633])


In [None]:
import time

def train_epoch(model, dataloader, optimizer, criterion, device):
    """ 1 에폭 동안 모델을 훈련시키는 함수 """
    model.train()  # 모델을 훈련 모드로 설정
    total_loss = 0

    for src_batch, tgt_batch in dataloader:
        src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device)

        # 훈련 데이터 준비
        # 디코더 입력: <s>부터 마지막에서 두 번째 단어까지
        tgt_input = tgt_batch[:, :-1]
        # 정답 레이블: 첫 번째 단어 <s>를 제외하고, </s>까지
        tgt_output = tgt_batch[:, 1:]

        # 마스크 생성
        src_mask, tgt_mask = create_masks(src_batch, tgt_input, device)

        # 순전파 및 역전파
        optimizer.zero_grad() # 그래디언트 초기화
        prediction = model(src_batch, tgt_input, src_mask, tgt_mask)

        # 손실 계산
        # prediction: (B, T, V) -> (B * T, V)
        # tgt_output: (B, T) -> (B * T)
        loss = criterion(
            prediction.reshape(-1, prediction.size(-1)),
            tgt_output.reshape(-1)
        )

        loss.backward() # 역전파
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5) # 그래디언트 클리핑 (학습 안정화)
        optimizer.step() # 파라미터 업데이트

        total_loss += loss.item()

    return total_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    """ 모델의 성능을 평가하는 함수 """
    model.eval()  # 모델을 평가 모드로 설정
    total_loss = 0

    with torch.no_grad(): # 그래디언트 계산 비활성화
        for src_batch, tgt_batch in dataloader:
            src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device)

            tgt_input = tgt_batch[:, :-1]
            tgt_output = tgt_batch[:, 1:]

            src_mask, tgt_mask = create_masks(src_batch, tgt_input, device)

            prediction = model(src_batch, tgt_input, src_mask, tgt_mask)

            loss = criterion(
                prediction.reshape(-1, prediction.size(-1)),
                tgt_output.reshape(-1)
            )
            total_loss += loss.item()

    return total_loss / len(dataloader)


# --- 메인 훈련 루프 ---
N_EPOCHS = 10
LEARNING_RATE = 0.0001

# Loss 함수와 Optimizer 정의
# PAD_TOKEN은 Loss 계산에서 무시하도록 설정
criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.98), eps=1e-9)

print("\n--- Starting Training ---")

for epoch in range(N_EPOCHS):
    start_time = time.time()

    # 훈련 및 평가
    train_loss = train_epoch(model, dataloader, optimizer, criterion, device)
    valid_loss = evaluate(model, dataloader, criterion, device) # 간단히 훈련 데이터로 평가

    end_time = time.time()
    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)

    print(f"Epoch: {epoch+1:02} | Time: {epoch_mins:.0f}m {epoch_secs:.0f}s")
    print(f"\tTrain Loss: {train_loss:.3f}")
    print(f"\t Val. Loss: {valid_loss:.3f}")

print("\n--- Training Complete ---")


--- Starting Training ---
Epoch: 01 | Time: 10m 15s
	Train Loss: 3.160
	 Val. Loss: 2.361
