In [1]:
import spacy

spacy_en = spacy.load('en_core_web_sm')
spacy_de = spacy.load('de_core_news_sm')

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

def tokenize_de(text):
    return [tok.text for tok in spacy_de.tokenizer(text)]



In [2]:
tokenized = spacy_en.tokenizer('I am a graduate student')

for i, token in enumerate(tokenized):
    print(i, token.text)

0 I
1 am
2 a
3 graduate
4 student


In [3]:
# 최신 torchtext API 사용
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader
import torch

# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 중인 device: {device}")

# 토크나이저 함수 (기존과 동일)
def tokenize_de_new(text):
    return ['<sos>'] + tokenize_de(text.lower()) + ['<eos>']

def tokenize_en_new(text):
    return ['<sos>'] + tokenize_en(text.lower()) + ['<eos>']


사용 중인 device: cuda


In [4]:
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

In [5]:
# import os
# import shutil

# base_dir = os.path.join('.', '.data', 'multi30k')

# for name in os.listdir(base_dir):
#     sub_path = os.path.join(base_dir, name)
#     # 폴더이고, 폴더명과 같은 파일이 그 안에 있으면
#     if os.path.isdir(sub_path):
#         file_inside = os.path.join(sub_path, name)
#         if os.path.isfile(file_inside):
#             # 임시 파일명으로 이동
#             temp_name = name + '.tmp'
#             temp_path = os.path.join(base_dir, temp_name)
#             shutil.move(file_inside, temp_path)
#             print(f"{file_inside} → {temp_path} 임시 이동 완료")
#         # 폴더 삭제
#         shutil.rmtree(sub_path)
#         print(f"{sub_path} 폴더 삭제 완료")
#         # 임시 파일명을 원래 이름으로 변경
#         if os.path.exists(temp_path):
#             final_path = os.path.join(base_dir, name)
#             os.rename(temp_path, final_path)
#             print(f"{temp_path} → {final_path} 이름 변경 완료")

In [6]:
# import os
# import shutil

# base_dir = os.path.join('.', '.data', 'multi30k')

# # 파일명 변경
# old_names = ['test_2016_flickr.de', 'test_2016_flickr.en']
# new_names = ['test2016.de', 'test2016.en']

# for old_name, new_name in zip(old_names, new_names):
#     old_path = os.path.join(base_dir, old_name)
#     new_path = os.path.join(base_dir, new_name)
#     if os.path.exists(old_path):
#         os.rename(old_path, new_path)
#         print(f"{old_name} → {new_name} 변경 완료")

In [7]:
# 직접 파일을 읽어서 데이터셋 생성
import os

def read_data_files(base_path, split_name):
    de_file = os.path.join(base_path, f"{split_name}.de")
    en_file = os.path.join(base_path, f"{split_name}.en")
    
    with open(de_file, 'r', encoding='utf-8') as f_de, \
         open(en_file, 'r', encoding='utf-8') as f_en:
        de_lines = [line.strip() for line in f_de]
        en_lines = [line.strip() for line in f_en]
    
    return list(zip(de_lines, en_lines))

# 데이터 로딩
data_path = '.data/multi30k'
train_data = read_data_files(data_path, 'train')
valid_data = read_data_files(data_path, 'val')

# test 파일명 처리
try:
    test_data = read_data_files(data_path, 'test_2016_flickr')
except:
    test_data = read_data_files(data_path, 'test2016')

print(f"학습 데이터: {len(train_data)}개")
print(f"검증 데이터: {len(valid_data)}개") 
print(f"테스트 데이터: {len(test_data)}개")

학습 데이터: 29000개
검증 데이터: 1014개
테스트 데이터: 1000개


In [8]:
# 데이터 샘플 확인
print("30번째 예시:")
print("Source (독일어):", train_data[30][0])
print("Target (영어):", train_data[30][1])

# 토큰화된 결과 확인
print("\n토큰화된 결과:")
print("Source (독일어):", tokenize_de_new(train_data[30][0]))
print("Target (영어):", tokenize_en_new(train_data[30][1]))

30번째 예시:
Source (독일어): Ein Mann, der mit einer Tasse Kaffee an einem Urinal steht.
Target (영어): A man standing at a urinal with a coffee cup.

토큰화된 결과:
Source (독일어): ['<sos>', 'ein', 'mann', ',', 'der', 'mit', 'einer', 'tasse', 'kaffee', 'an', 'einem', 'urinal', 'steht', '.', '<eos>']
Target (영어): ['<sos>', 'a', 'man', 'standing', 'at', 'a', 'urinal', 'with', 'a', 'coffee', 'cup', '.', '<eos>']


In [9]:
# 어휘집 구축을 위한 토큰 생성기
def yield_tokens_de(data_iter):
    for de_text, en_text in data_iter:
        yield tokenize_de_new(de_text)

def yield_tokens_en(data_iter):
    for de_text, en_text in data_iter:
        yield tokenize_en_new(en_text)

# 어휘집 구축 (최신 API)
from collections import Counter

# 토큰 빈도 계산
def get_vocab_with_min_freq(token_generator, min_freq=2):
    counter = Counter()
    for tokens in token_generator:
        counter.update(tokens)
    
    # min_freq 이상인 토큰만 선택
    filtered_counter = {token: count for token, count in counter.items() if count >= min_freq}
    return filtered_counter

# 독일어 어휘집
de_counter = get_vocab_with_min_freq(yield_tokens_de(train_data), min_freq=2)

# 특수 토큰을 어휘에 먼저 추가
special_tokens = ['<unk>', '<pad>', '<sos>', '<eos>']
de_vocab_tokens = special_tokens + list(de_counter.keys())

SRC_vocab = build_vocab_from_iterator([de_vocab_tokens])

# 영어 어휘집  
en_counter = get_vocab_with_min_freq(yield_tokens_en(train_data), min_freq=2)
en_vocab_tokens = special_tokens + list(en_counter.keys())

TRG_vocab = build_vocab_from_iterator([en_vocab_tokens])

# 기본 토큰 설정 (최신 API)
# UNK 토큰을 기본값으로 설정 (인덱스 0)
unk_idx = 0  # 특수 토큰 리스트에서 <unk>가 첫 번째

# Vocab 객체를 더 유연하게 만들기 위해 래퍼 함수 정의
def create_vocab_lookup(vocab, unk_idx=0):
    """어휘집에서 토큰을 인덱스로 변환하는 함수"""
    def lookup_tokens(tokens):
        if isinstance(tokens, str):
            return vocab.get(tokens, unk_idx)
        return [vocab.get(token, unk_idx) for token in tokens]
    return lookup_tokens

# 어휘집 검색 함수 생성 (최신 API 호환)
# Vocab 객체에서 직접 토큰-인덱스 매핑 생성
try:
    # 최신 버전 시도
    src_tokens = list(SRC_vocab.get_itos())
    trg_tokens = list(TRG_vocab.get_itos())
except AttributeError:
    # 다른 방법으로 토큰 리스트 얻기
    src_tokens = de_vocab_tokens
    trg_tokens = en_vocab_tokens

SRC_lookup = create_vocab_lookup(dict(zip(src_tokens, range(len(src_tokens)))))
TRG_lookup = create_vocab_lookup(dict(zip(trg_tokens, range(len(trg_tokens)))))

print(f"독일어 어휘집 크기: {len(SRC_vocab)}")
print(f"영어 어휘집 크기: {len(TRG_vocab)}")

# 기존 코드와의 호환성을 위한 클래스 생성
class VocabWrapper:
    def __init__(self, vocab, lookup_func, tokens):
        self.vocab = vocab
        self.stoi = {token: idx for idx, token in enumerate(tokens)}
        self.itos = tokens
        self.pad_token = '<pad>'
        self.lookup = lookup_func
    
    def __len__(self):
        return len(self.vocab)
    
    def __getitem__(self, token):
        """vocab['token'] 형태로 사용 가능"""
        return self.stoi.get(token, 0)  # 기본값은 <unk> 인덱스 0

# 기존 변수명 유지
SRC = VocabWrapper(SRC_vocab, SRC_lookup, src_tokens)
TRG = VocabWrapper(TRG_vocab, TRG_lookup, trg_tokens)

1lines [00:00, ?lines/s]
1lines [00:00, ?lines/s]

독일어 어휘집 크기: 7853
영어 어휘집 크기: 5893





In [10]:
print("어휘집 테스트:")
print(f"abcabc (없는 단어): {TRG.stoi.get('abcabc', TRG.stoi['<unk>'])}")
print(f"<pad> 토큰: {TRG.stoi['<pad>']}")
print(f"<sos> 토큰: {TRG.stoi['<sos>']}")
print(f"hello: {TRG.stoi.get('hello', TRG.stoi['<unk>'])}")

# GPU 확인
print(f"\nGPU 사용 가능: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU 이름: {torch.cuda.get_device_name(0)}")
    
    # GPU 테스트
    test_tensor = torch.randn(3, 3).to(device)
    print(f"테스트 텐서가 {test_tensor.device}에 있습니다.")

어휘집 테스트:
abcabc (없는 단어): 0
<pad> 토큰: 1
<sos> 토큰: 4
hello: 4529

GPU 사용 가능: True
GPU 이름: NVIDIA GeForce RTX 4070 Ti
테스트 텐서가 cuda:0에 있습니다.


In [11]:
import torch

print(f"PyTorch 버전: {torch.__version__}")
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
print(f"CUDA 버전: {torch.version.cuda}")
print(f"cuDNN 버전: {torch.backends.cudnn.version()}")

if torch.cuda.is_available():
    print(f"GPU 개수: {torch.cuda.device_count()}")
    print(f"GPU 이름: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA를 사용할 수 없습니다.")

PyTorch 버전: 2.7.1+cu118
CUDA 사용 가능: True
CUDA 버전: 11.8
cuDNN 버전: 90100
GPU 개수: 1
GPU 이름: NVIDIA GeForce RTX 4070 Ti


In [12]:
# 최신 PyTorch DataLoader로 BucketIterator 대체
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import random

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 중인 device: {device}")

BATCH_SIZE = 128

# 커스텀 Dataset 클래스
class TranslationDataset(Dataset):
    def __init__(self, data, src_vocab, trg_vocab, tokenize_src, tokenize_trg):
        self.data = data
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab
        self.tokenize_src = tokenize_src
        self.tokenize_trg = tokenize_trg
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        src_text, trg_text = self.data[idx]
        
        # 토큰화
        src_tokens = self.tokenize_src(src_text)
        trg_tokens = self.tokenize_trg(trg_text)
        
        # 인덱스로 변환
        src_indices = [self.src_vocab[token] for token in src_tokens]
        trg_indices = [self.trg_vocab[token] for token in trg_tokens]
        
        return torch.tensor(src_indices), torch.tensor(trg_indices)

# collate 함수 (배치 처리용)
def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    
    # 패딩
    src_batch = pad_sequence(src_batch, batch_first=True, padding_value=SRC['<pad>'])
    trg_batch = pad_sequence(trg_batch, batch_first=True, padding_value=TRG['<pad>'])
    
    return src_batch, trg_batch

# Dataset 생성
train_dataset_new = TranslationDataset(train_data, SRC, TRG, tokenize_de_new, tokenize_en_new)
valid_dataset_new = TranslationDataset(valid_data, SRC, TRG, tokenize_de_new, tokenize_en_new)
test_dataset_new = TranslationDataset(test_data, SRC, TRG, tokenize_de_new, tokenize_en_new)

# DataLoader 생성 (BucketIterator 대체)
train_iterator = DataLoader(
    train_dataset_new, 
    batch_size=BATCH_SIZE, 
    shuffle=True, 
    collate_fn=collate_fn,
    pin_memory=True if torch.cuda.is_available() else False
)

valid_iterator = DataLoader(
    valid_dataset_new, 
    batch_size=BATCH_SIZE, 
    shuffle=False, 
    collate_fn=collate_fn,
    pin_memory=True if torch.cuda.is_available() else False
)

test_iterator = DataLoader(
    test_dataset_new, 
    batch_size=BATCH_SIZE, 
    shuffle=False, 
    collate_fn=collate_fn,
    pin_memory=True if torch.cuda.is_available() else False
)

print(f"DataLoader 생성 완료!")
print(f"학습 배치 수: {len(train_iterator)}")
print(f"검증 배치 수: {len(valid_iterator)}")
print(f"테스트 배치 수: {len(test_iterator)}")

# 샘플 배치 확인
sample_batch = next(iter(train_iterator))
src_sample, trg_sample = sample_batch
print(f"\n샘플 배치 크기:")
print(f"Source: {src_sample.shape}")
print(f"Target: {trg_sample.shape}")
print(f"Device: {src_sample.device if torch.cuda.is_available() else 'CPU'}")


사용 중인 device: cuda
DataLoader 생성 완료!
학습 배치 수: 227
검증 배치 수: 8
테스트 배치 수: 8

샘플 배치 크기:
Source: torch.Size([128, 30])
Target: torch.Size([128, 29])
Device: cpu


In [13]:
# 새로운 DataLoader 형태로 배치 처리
for i, batch in enumerate(train_iterator):
    src, trg = batch  # 튜플 언패킹
    
    # GPU로 데이터 이동
    src = src.to(device)
    trg = trg.to(device)

    print(f"첫 번째 배치 크기 - Source: {src.shape}, Target: {trg.shape}")
    print(f"Device: {src.device}")

    print("\n첫 번째 문장의 토큰 인덱스:")
    for j in range(min(10, src.shape[1])):  # 처음 10개 토큰만 출력
        token_idx = src[0][j].item()
        token = SRC.itos[token_idx] if token_idx < len(SRC.itos) else '<unk>'
        print(f"인덱스 {j}: {token_idx} -> '{token}'")

    break


첫 번째 배치 크기 - Source: torch.Size([128, 24]), Target: torch.Size([128, 27])
Device: cuda:0

첫 번째 문장의 토큰 인덱스:
인덱스 0: 4 -> '<sos>'
인덱스 1: 23 -> 'ein'
인덱스 2: 541 -> 'bauarbeiter'
인덱스 3: 34 -> 'steht'
인덱스 4: 92 -> 'oben'
인덱스 5: 35 -> 'auf'
인덱스 6: 36 -> 'einer'
인덱스 7: 4154 -> 'holzkonstruktion'
인덱스 8: 17 -> '.'
인덱스 9: 18 -> '<eos>'


## 1. Multi-head attention 아키텍쳐

- 세 가지 요소를 입력을 받는 attention
    - 쿼리 (Queries)
    - 키 (Keys)
    - 값 (Values)
- 하이퍼 파라미터
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_heads : 헤드의 개수 = scaled dot-product attention의 개수
    - dropout_ratio : 드롭아웃 비율

In [36]:
# 긴급 해결: CPU 모드로 전환하여 디버깅
print("=== 긴급 디버깅: CPU 모드 전환 ===")

# 모든 것을 CPU로 이동
device_cpu = torch.device('cpu')
model_cpu = model.to(device_cpu)

# 샘플 배치로 테스트
sample_batch = next(iter(train_iterator))
src_sample, trg_sample = sample_batch

# CPU로 이동
src_cpu = src_sample.to(device_cpu)
trg_cpu = trg_sample.to(device_cpu)

print(f"Source shape: {src_cpu.shape}")
print(f"Target shape: {trg_cpu.shape}")

# 인덱스 범위 확인
print(f"\nSource 인덱스 범위: {src_cpu.min().item()} ~ {src_cpu.max().item()}")
print(f"Target 인덱스 범위: {trg_cpu.min().item()} ~ {trg_cpu.max().item()}")
print(f"Source vocab 크기: {len(SRC)}")
print(f"Target vocab 크기: {len(TRG)}")

# 범위 초과 체크
if src_cpu.max().item() >= len(SRC):
    print(f"⚠️ SOURCE 범위 초과! 최대 인덱스: {src_cpu.max().item()}, vocab 크기: {len(SRC)}")
    
if trg_cpu.max().item() >= len(TRG):
    print(f"⚠️ TARGET 범위 초과! 최대 인덱스: {trg_cpu.max().item()}, vocab 크기: {len(TRG)}")

# CPU에서 모델 테스트
try:
    print("\nCPU에서 모델 테스트...")
    with torch.no_grad():
        output, _ = model_cpu(src_cpu[:1], trg_cpu[:1, :-1])  # 첫 번째 샘플만
    print("✅ CPU에서 모델 실행 성공!")
    print(f"Output shape: {output.shape}")
except Exception as e:
    print(f"❌ CPU에서도 오류 발생: {e}")

# 특수 토큰 인덱스 확인
print(f"\n=== 특수 토큰 인덱스 ===")
print(f"SRC <pad>: {SRC.stoi['<pad>']}")
print(f"SRC <sos>: {SRC.stoi['<sos>']}")
print(f"SRC <eos>: {SRC.stoi['<eos>']}")
print(f"TRG <pad>: {TRG.stoi['<pad>']}")
print(f"TRG <sos>: {TRG.stoi['<sos>']}")
print(f"TRG <eos>: {TRG.stoi['<eos>']}")


=== 긴급 디버깅: CPU 모드 전환 ===


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [37]:
# CPU 모드로 수정된 train 함수
def train_cpu_debug(model, iterator, optimizer, criterion, clip, device_override=None):
    """디버깅용 train 함수 - CPU 모드"""
    model.train()
    epoch_loss = 0
    device_to_use = device_override if device_override else torch.device('cpu')
    
    print(f"학습 시작 - Device: {device_to_use}")
    
    for i, batch in enumerate(iterator):
        # 배치 언패킹
        src, trg = batch
        
        # 디바이스로 이동
        src = src.to(device_to_use)
        trg = trg.to(device_to_use)
        
        # 인덱스 범위 체크
        if src.max().item() >= len(SRC):
            print(f"배치 {i}: SRC 인덱스 초과 {src.max().item()} >= {len(SRC)}")
            continue
        if trg.max().item() >= len(TRG):
            print(f"배치 {i}: TRG 인덱스 초과 {trg.max().item()} >= {len(TRG)}")
            continue
        
        optimizer.zero_grad()
        
        try:
            # 모델 실행
            output, _ = model(src, trg[:, :-1])
            output_dim = output.shape[-1]
            
            # 출력 변환
            output = output.contiguous().view(-1, output_dim)
            trg_flat = trg[:,1:].contiguous().view(-1)
            
            loss = criterion(output, trg_flat)
            loss.backward()
            
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()
            
            epoch_loss += loss.item()
            
            if i % 10 == 0:
                print(f"배치 {i}/{len(iterator)}: Loss = {loss.item():.4f}")
                
        except Exception as e:
            print(f"배치 {i}에서 오류: {e}")
            print(f"SRC shape: {src.shape}, TRG shape: {trg.shape}")
            print(f"SRC range: {src.min()}-{src.max()}, TRG range: {trg.min()}-{trg.max()}")
            break
    
    return epoch_loss / len(iterator)

print("CPU 디버깅 train 함수 정의 완료")


CPU 디버깅 train 함수 정의 완료


In [38]:
# CPU 모드에서 테스트 실행
print("=== CPU 모드 테스트 실행 ===")

# 옵티마이저를 CPU 모델용으로 재생성
optimizer_cpu = torch.optim.Adam(model_cpu.parameters(), lr=0.0005)

# CPU 모드에서 한 배치만 테스트
try:
    print("CPU 모드에서 한 배치 테스트...")
    test_loss = train_cpu_debug(model_cpu, train_iterator, optimizer_cpu, criterion, 1, torch.device('cpu'))
    print(f"✅ CPU 테스트 성공! Loss: {test_loss:.4f}")
    
    # 문제가 없다면 GPU로 다시 시도
    print("\n=== GPU 모드 재시도 준비 ===")
    print("문제가 해결되었다면 다음 단계:")
    print("1. 모델을 GPU로 다시 이동")
    print("2. 옵티마이저 재생성")
    print("3. GPU에서 학습 시작")
    
except Exception as e:
    print(f"❌ CPU에서도 오류: {e}")
    print("근본적인 데이터 문제가 있습니다.")
    
    # 상세 디버깅
    print("\n=== 상세 디버깅 ===")
    sample_src, sample_trg = next(iter(train_iterator))
    print(f"샘플 데이터:")
    print(f"  SRC: {sample_src[0][:5]}")  # 첫 5개 토큰만
    print(f"  TRG: {sample_trg[0][:5]}")
    
    # 실제 토큰 확인
    print(f"\n실제 토큰:")
    for i, idx in enumerate(sample_src[0][:5]):
        if idx.item() < len(SRC.itos):
            token = SRC.itos[idx.item()]
            print(f"  SRC[{i}]: {idx.item()} -> '{token}'")
        else:
            print(f"  SRC[{i}]: {idx.item()} -> INVALID!")
            

=== CPU 모드 테스트 실행 ===


NameError: name 'model_cpu' is not defined

In [15]:
import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, dropout_ratio, device):
        super().__init__()

        assert hidden_dim % n_heads == 0

        self.hidden_dim = hidden_dim # 임베딩 차원
        self.n_heads = n_heads # 헤드(head)의 개수, 서로 다른 attention 컨셉 수
        self.head_dim = hidden_dim // n_heads # 각 헤드의 차원

        self.fc_q = nn.Linear(hidden_dim, hidden_dim) # Query 값에 적용될 FC 레이어
        self.fc_k = nn.Linear(hidden_dim, hidden_dim) # Key 값에 적용될 FC 레이어
        self.fc_v = nn.Linear(hidden_dim, hidden_dim) # Value 값에 적용될 FC 레이어

        self.fc_o = nn.Linear(hidden_dim, hidden_dim) # 최종 결과에 적용될 FC 레이어

        self.dropout = nn.Dropout(dropout_ratio)
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)  

    def forward(self, query, key, value, mask = None):

        batch_size = query.shape[0]

        # query : [batch_size, query_len, hidden_dim]
        # key : [batch_size, key_len, hidden_dim]
        # value : [batch_size, value_len, hidden_dim]

        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        # hidden_dim => n_heads x head_dim 형태로 변형 
        # view: 텐서의 shape을 바꿔줌. -1은 자동 계산.
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0,2,1,3)

        # Q : [batch_size, n_heads, query_len, head_dim]
        # K : [batch_size, n_heads, key_len, head_dim]
        # V : [batch_size, n_heads, value_len, head_dim]

        # Attention Energy 계산
        energy = torch.matmul(Q, K.permute(0,1,3,2)) / self.scale

        # 마스크 (mask)를 사용하는 경우
        if mask is not None:
            energy = energy.masked_fill(mask ==0, -1e10)

        # attention 스코어에 대한 softmax 연산
        # attention : [batch)size, n_heads, query_len, key_len]
        attention = torch.softmax(energy, dim = -1)

        # Scaled Dot-Product Attention을 계산
        # x : [batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)

        # permute는 텐서 차원 순서 변경, contiguous는 메모리 연속성 보장 함수
        # x : [batch_size, query_len, n_heads, head_dim]
        x = x.permute(0,2,1,3).contiguous()
        
        # x : [batch_size, query_len, hidden_dim]
        x = x.view(batch_size, -1, self.hidden_dim)

        x = self.fc_o(x)

        return x, attention
        

## 2. Position-wise Feedforward 아키텍쳐
- 입력과 출력이 동일
- 하이퍼 파라미터
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - pf_dim : feedforward 레이어에서의 내부 임베딩 차원
    - dropout_ratio : 드롭아웃 비율

In [16]:
class PositionwiseFeedforwardLayer(nn.Module):

    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()

        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x):
        x = self.dropout(torch.relu(self.fc_1(x)))
        x = self.fc_2(x)
        
        return x

## 3-1. Encoder 레이어 아키텍쳐
- 하나의 인코더 레이어에 대해 정의
    - 입력과 출력의 차원이 같다.
    - 이 특징을 사용해 트랜스포머의 인코더는 인코더 레이어를 여러 번 중첩해서 사용함.

- 하이퍼 파라미터
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_heads : 헤드의 개수 = scaled dot-product attention의 개수
    - dropout_ratio : 드롭아웃 비율

- <pad> 토큰에 대해 마스크 값을 0으로 설정함

In [17]:
class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attn = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    # 하나의 임베딩이 복제되어 Q, K, V 로 입력되는 방식

    def forward(self, src, src_mask):

        # src : [batch_size, src_len, hidden_dim]
        # src_mask : [batch_size, src_len]

        # self attention
        # 필요 시 마스크 행렬을 이용해 어텐션할 단어 조절 가능

        # self.self_attn은 MultiHeadAttentionLayer를 의미하며, 이 레이어는 입력값으로 Q(Query), K(Key), V(Value), 그리고 마스크(src_mask)를 받습니다.
        # 트랜스포머 인코더에서는 self-attention이므로 Q, K, V 모두 동일한 입력(src)을 사용합니다.

        # src_mask는 패딩 토큰 등에 대한 마스킹을 위해 사용됩니다.
        # self.self_attn의 반환값은 (attention_output, attention_weights)로, 여기서는 attention_output만 사용하고 attention_weights는 사용하지 않으므로 _로 처리합니다.
        
        # _src는 self-attention 레이어를 통과한 결과(즉, 어텐션을 적용한 후의 임베딩)이고,
        # src는 아직 self-attention을 통과하지 않은 입력 임베딩입니다.
        
        # self-attention의 출력(_src)에 드롭아웃을 적용한 뒤, 입력(src)와 더해주고,
        # 그 결과를 LayerNorm에 통과시켜서 최종적으로 src를 업데이트합니다.

        _src, _ = self.self_attn(src, src, src, src_mask)
        src = self.self_attn_layer_norm(src + self.dropout(_src))

        _src = self.positionwise_feedforward(src)
        src = self.ff_layer_norm(src + self.dropout(_src))

        return src
        

## 3-2. Encoder 아키텍쳐
- 전체 인코터 아키텍쳐의 정의
- 하이퍼 파라미터
    - input_dim : 하나의 단어에 대한 원-핫 인코딩 차원
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_layers : 내부적으로 사용할 인코더 레이어의 개수
    - n_heads: 헤드의 개수 = scaled dot-product attention의 개수
    - pf_dim : feedforward 레이어에서의 내부 임베딩 차원
    - dropout_ratio : 드롭아웃 비율
    - max_length : 문장 내 최대 단어 개수
- 위치 임베딩 (positional embedding)을 학습하는 형태로 구현
- <pad> 토큰에 대해 마스크 값을 0으로 설정

In [18]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length = 100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(input_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.dropout = nn.Dropout(dropout_ratio)
        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, src, src_mask):
        batch_size = src.shape[0]
        src_len = src.shape[1]

        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))

        for layer in self.layers:
            src = layer(src, src_mask)

        return src

## 4-1. Decoder 레이어 아키텍쳐
- 하나의 디코더 레이어에 대해 정의
    - 입력과 출력의 차원이 같음.
    - 이 특징을 이용해 인코더와 마찬가지로 디코더 레이어를 여러 번 중첩해서 사용.
    - 디코더 레이어에서는 두 개의 multi-head attention 레이어를 사용.
- 하이퍼 파라미터
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_heads : 헤드의 개수 = scaled dot-product attention의 개수
    - pf_dim : feedforward 레이어에서의 내부 임베딩 차원
    - dropout_ratio : 드롭아웃 비율
- 소스 문장의 <pad> 토큰에 대해 마스크 값을 0으로 설정
- 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없게 (바로 이전 단어만 보도록) 마스크를 사용함.

In [19]:
class DecoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim)

        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        
        self.self_attn = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.encoder_attn = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)

        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    # 인코더의 출력값(enc_src)을 attention 과정에서 사용하기 위해 입력값으로 받음
    def forward(self, trg, enc_src, trg_mask, src_mask):

        # 1. self attention -> 자기 자신에 대해 어텐션 적용
        _trg, _ = self.self_attn(trg, trg, trg, trg_mask)

        # dropout, residual connection & layer normalization
        trg = self.self_attn_layer_norm(trg + self.dropout(_trg))

        # 2. encoder attention -> 디코더의 쿼리를 이용해 인코더를 어텐션
        _trg, attention = self.encoder_attn(trg, enc_src, enc_src, src_mask)

        # dropout, residual connection & layer normalization
        trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))

        # 3. positionwise feedforward
        _trg = self.positionwise_feedforward(trg)

        # dropout, residual connection & layer normalization
        trg = self.ff_layer_norm(trg + self.dropout(_trg))

        return trg, attention



## 4-2. Decoder 아키텍쳐
- 전체 디코더 아키텍쳐를 정의
- 하이퍼 파라미터
    - output_dim : 하나의 단어에 대한 원-핫 인코딩 차원
    - hidden_dim : 하나의 단어에 대한 임베딩 차원
    - n_layers : 내부적으로 사용할 인코더 레이어의 개수
    - n_heads : 헤드의 개수 = scaled dot-product attention의 개수
    - pf_dim : feedforward 레이어에서의 내부 임베딩 차원
    - dropout_ratio : 드롭아웃 비율
    - max_length : 문장 내 최대 단어 개수
- 위치 임베딩 (positional embedding)을 학습하는 형태로 구현
- Seq2Seq 과 마찬가지로 실제 추론 (inference) 과정에서는 디코더를 반복적으로 넣을 필요가 있음.
    - 학습 시기에는 한 번에 출력 문장을 구해 학습함. 
- 소스 문장의 <pad> 토큰에 대해 마스크 값을 0으로 설정.
- 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록 만들기 위해 마스크를 사용.

In [20]:
class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length = 100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
        
        self.fc_out = nn.Linear(hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, trg, enc_src, trg_mask, src_mask):

        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)

        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))

        for layer in self.layers: 
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        output = self.fc_out(trg)

        return output, attention


## 5. 최종 Transformer 아키텍쳐
- 전체 Transformer 모델을 정의함.
- 입력을 들어왔을 때 앞서 정의한 인코더와 디코더를 거쳐 출력 문장을 생성함.

In [21]:
class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx

        self.device = device

    def make_src_mask(self, src):
        # unsqueeze는 텐서의 차원을 늘려주는 함수.
        # 예를 들어, (batch_size, src_len) 형태의 src에 대해 unsqueeze(1)을 하면 (batch_size, 1, src_len)이 되고,
        # 다시 unsqueeze(2)를 하면 (batch_size, 1, 1, src_len)이 돼.
        # 이렇게 차원을 늘려주는 이유는 이후에 어텐션 연산에서 브로드캐스팅이 잘 되도록 맞춰주기 위해서임.
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        
        return src_mask
    
    def make_trg_mask(self, trg):

        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]

        trg_sub_mask = torch.tril(torch.ones(trg_len, trg_len, device = self.device)).bool()

        trg_mask = trg_pad_mask & trg_sub_mask

        return trg_mask
    
    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        enc_src = self.encoder(src, src_mask)

        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)

        return output, attention

In [22]:
# 모델 하이퍼파라미터 설정
INPUT_DIM = len(SRC)  # 새로운 vocab 구조에 맞게 수정
OUTPUT_DIM = len(TRG)  # 새로운 vocab 구조에 맞게 수정

print(f"입력 차원 (독일어 어휘 크기): {INPUT_DIM}")
print(f"출력 차원 (영어 어휘 크기): {OUTPUT_DIM}")
HIDDEN_DIM = 256
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1
     

입력 차원 (독일어 어휘 크기): 7853
출력 차원 (영어 어휘 크기): 5893


In [23]:
# 패딩 인덱스 설정 (새로운 vocab 구조에 맞게)
SRC_PAD_IDX = SRC.stoi[SRC.pad_token]
TRG_PAD_IDX = TRG.stoi[TRG.pad_token]

print(f"Source 패딩 인덱스: {SRC_PAD_IDX}")
print(f"Target 패딩 인덱스: {TRG_PAD_IDX}")

# 인코더 & 디코더 & Transformer 객체 선언
enc = Encoder(INPUT_DIM, HIDDEN_DIM, ENC_LAYERS, ENC_HEADS, ENC_PF_DIM, ENC_DROPOUT, device)
dec = Decoder(OUTPUT_DIM, HIDDEN_DIM, DEC_LAYERS, DEC_HEADS, DEC_PF_DIM, DEC_DROPOUT, device)

model = Transformer(enc, dec, SRC_PAD_IDX, TRG_PAD_IDX, device).to(device)


Source 패딩 인덱스: 1
Target 패딩 인덱스: 1


- model 가중치 파라미터 초기화

In [24]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 9,038,341 trainable parameters


In [25]:
# 이 함수는 모델의 가중치(weight) 파라미터를 xavier_uniform 방식으로 초기화해주는 함수야.
# 신경망의 각 레이어(모듈) m에 대해, 만약 'weight' 속성이 있고 그 차원이 1보다 크면(즉, 선형 계층 등),
# nn.init.xavier_uniform을 사용해서 가중치를 초기화해. 이렇게 하면 학습 초기에 적절한 분포로 가중치가 설정되어
# 학습이 더 잘 되도록 도와줘.

def initialize_weights(m):
    if hasattr(m, 'weight') and m.weight.dim() > 1:
        nn.init.xavier_uniform_(m.weight.data)  # xavier_uniform_로 변경 (최신 권장 방식)

# 모델의 모든 서브모듈에 대해 위의 초기화 함수를 적용해.
model.apply(initialize_weights)

Transformer(
  (encoder): Encoder(
    (tok_embedding): Embedding(7853, 256)
    (pos_embedding): Embedding(100, 256)
    (layers): ModuleList(
      (0-2): 3 x EncoderLayer(
        (self_attn_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (ff_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (self_attn): MultiHeadAttentionLayer(
          (fc_q): Linear(in_features=256, out_features=256, bias=True)
          (fc_k): Linear(in_features=256, out_features=256, bias=True)
          (fc_v): Linear(in_features=256, out_features=256, bias=True)
          (fc_o): Linear(in_features=256, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (positionwise_feedforward): PositionwiseFeedforwardLayer(
          (fc_1): Linear(in_features=256, out_features=512, bias=True)
          (fc_2): Linear(in_features=512, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )


## 6. 학습 및 평가 함수 정의
- 모델 학습과 optimization 정의

In [26]:
import torch.optim as optim

lr = 0.0005
optimizer = torch.optim.Adam(model.parameters(), lr = lr)

criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    device = device_override if device_override else torch.device('cpu')


    for i, batch in enumerate(iterator):
        # 새로운 DataLoader 형태: 튜플 언패킹
        src, trg = batch
        
        # GPU로 데이터 이동
        src = src.to(device)
        trg = trg.to(device)

        optimizer.zero_grad()

        # 디코더 입력에서 마지막 토큰 제거 (teacher forcing)
        output, _ = model(src, trg[:, :-1])
        output_dim = output.shape[-1]

        # 출력을 1차원으로 변환
        output = output.contiguous().view(-1, output_dim)

        # 타겟에서 첫 번째 토큰(<sos>) 제거하고 1차원으로 변환
        trg = trg[:,1:].contiguous().view(-1)

        loss = criterion(output, trg)
        loss.backward()

        # 그래디언트 클리핑
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)


In [28]:
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for i, batch in enumerate(iterator):
            # 새로운 DataLoader 형태: 튜플 언패킹
            src, trg = batch
            
            # GPU로 데이터 이동
            src = src.to(device)
            trg = trg.to(device)

            # 디코더 입력에서 마지막 토큰 제거
            output, _ = model(src, trg[:,:-1])

            output_dim = output.shape[-1]
            # 출력을 1차원으로 변환
            output = output.contiguous().view(-1, output_dim)

            # 타겟에서 첫 번째 토큰(<sos>) 제거하고 1차원으로 변환
            trg = trg[:,1:].contiguous().view(-1)

            loss = criterion(output, trg)

            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [29]:
import math
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [30]:
import time
import math
import random

N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() # 시작 시간 기록

    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() # 종료 시간 기록
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer_german_to_english.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
