# Seq2se으로 번역기 만들기

## 1. 라이브러리 및 GitHub 데이터 다운로드

In [38]:
import os
import re
from collections import Counter

import urllib.request
import zipfile

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import tarfile 


In [39]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("DEVICE:", DEVICE)


DEVICE: cuda


In [40]:
# 1) 데이터 폴더
DATA_DIR = "data_korean_english_news_v1"
os.makedirs(DATA_DIR, exist_ok=True)

# 2) 저장소 zip URL
ZIP_URL = "https://github.com/jungyeul/korean-parallel-corpora/archive/refs/heads/master.zip"  # [web:1]
ZIP_PATH = os.path.join(DATA_DIR, "korean-parallel-corpora-master.zip")

# 3) 다운로드
if not os.path.exists(ZIP_PATH):
    print("Downloading corpus zip...")
    urllib.request.urlretrieve(ZIP_URL, ZIP_PATH)
    print("Download finished.")

# 4) zip 압축 해제 (저장소 디렉토리 생성)
EXTRACT_ROOT = os.path.join(DATA_DIR, "korean-parallel-corpora-master")
if not os.path.exists(EXTRACT_ROOT):
    print("Extracting zip...")
    with zipfile.ZipFile(ZIP_PATH, "r") as zf:
        zf.extractall(DATA_DIR)
    print("Extraction finished.")
else:
    print("Extract root already exists:", EXTRACT_ROOT)

# 5) korean-english-news-v1 디렉토리
BASE_DIR = EXTRACT_ROOT  # = data_korean_english_news_v1/korean-parallel-corpora-master
NEWS_DIR = os.path.join(BASE_DIR, "korean-english-news-v1")
print("NEWS_DIR:", NEWS_DIR)
print("FILES before tar extraction:", os.listdir(NEWS_DIR))

# 6) korean-english-park.*.tar.gz 모두 압축 해제
for fname in os.listdir(NEWS_DIR):
    if fname.endswith(".tar.gz"):
        tar_path = os.path.join(NEWS_DIR, fname)
        print("Extracting:", tar_path)
        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(NEWS_DIR)
        print("Done:", fname)

print("FILES after tar extraction:", os.listdir(NEWS_DIR))


Extract root already exists: data_korean_english_news_v1/korean-parallel-corpora-master
NEWS_DIR: data_korean_english_news_v1/korean-parallel-corpora-master/korean-english-news-v1
FILES before tar extraction: ['korean-english-park.train.tar.gz', 'korean-english-park.test.ko', 'korean-english-park.test.tar.gz', 'korean-english-park.dev.tar.gz', 'korean-english-park.test.en', 'korean-english-park.dev.en', 'README.md', 'korean-english-park.train.ko', 'korean-english-park.train.en', 'korean-english-park.dev.ko']
Extracting: data_korean_english_news_v1/korean-parallel-corpora-master/korean-english-news-v1/korean-english-park.train.tar.gz


  tar.extractall(NEWS_DIR)


Done: korean-english-park.train.tar.gz
Extracting: data_korean_english_news_v1/korean-parallel-corpora-master/korean-english-news-v1/korean-english-park.test.tar.gz
Done: korean-english-park.test.tar.gz
Extracting: data_korean_english_news_v1/korean-parallel-corpora-master/korean-english-news-v1/korean-english-park.dev.tar.gz
Done: korean-english-park.dev.tar.gz
FILES after tar extraction: ['korean-english-park.train.tar.gz', 'korean-english-park.test.ko', 'korean-english-park.test.tar.gz', 'korean-english-park.dev.tar.gz', 'korean-english-park.test.en', 'korean-english-park.dev.en', 'README.md', 'korean-english-park.train.ko', 'korean-english-park.train.en', 'korean-english-park.dev.ko']


In [41]:
TRAIN_BASE_PATH = "korean-english-park.train"
KO_TRAIN_PATH = os.path.join(NEWS_DIR, "korean-english-park.train.ko")
EN_TRAIN_PATH = os.path.join(NEWS_DIR, "korean-english-park.train.en")


## 2. 전처리 및 토큰화


### 2-1. 전처리 함수

In [42]:
MAX_LEN = 40
MIN_FREQ = 2
MAX_VOCAB = 20000

def preprocess_en(text: str) -> str:
    # 소문자화 + 알파벳/숫자/구두점만 남기기
    text = text.lower()
    text = re.sub(r"[^a-z0-9,.!?\' ]", " ", text)
    # 구두점 앞뒤 공백 정리[web:18]
    text = re.sub(r"([,.!?])", r" \1 ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def preprocess_ko(text: str) -> str:
    # 한글/숫자/기본 구두점만 남기기
    text = re.sub(r"[^가-힣0-9,.!?\' ]", " ", text)
    text = re.sub(r"([,.!?])", r" \1 ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def tokenize_ko(text: str):
    # 한국어: 어절 단위 토큰화 (공백 기준)[web:38]
    return text.split()

def tokenize_en(text: str):
    # 영어: 공백 기준 토큰화
    return text.split()


### 2-2. 파일 읽기 + 토큰화

In [43]:
with open(KO_TRAIN_PATH, encoding="utf-8") as f_ko, \
     open(EN_TRAIN_PATH, encoding="utf-8") as f_en:
    kor_lines = [l.strip() for l in f_ko]
    eng_lines = [l.strip() for l in f_en]

assert len(kor_lines) == len(eng_lines)
pairs = list(set(zip(kor_lines, eng_lines)))  # 중복 제거
print("총 문장 쌍 수:", len(pairs))


총 문장 쌍 수: 78941


In [44]:
kor_tokenized = []
eng_tokenized = []

for ko, en in pairs:
    ko_p = preprocess_ko(ko)
    en_p = preprocess_en(en)
    if not ko_p or not en_p:
        continue

    ko_tokens = tokenize_ko(ko_p)
    en_tokens = tokenize_en(en_p)

    if len(ko_tokens) <= MAX_LEN and len(en_tokens) <= MAX_LEN:
        kor_tokenized.append(ko_tokens)
        eng_tokenized.append(en_tokens)

print("전처리 후 문장 쌍 수:", len(kor_tokenized))
print("예시 한국어 토큰:", kor_tokenized[0][:10])
print("예시 영어 토큰:", eng_tokenized[0][:10])


전처리 후 문장 쌍 수: 71902
예시 한국어 토큰: ['아일랜드만', '유일하게', '아일랜드', '헌법의', '필요', '조건들로', '인해', '국민투표를', '실시할', '예정이다']
예시 영어 토큰: ['ireland', 'is', 'expected', 'to', 'hold', 'a', 'vote', 'on', 'the', 'treaty']


## 3. Vocab, Dataset, DataLoader

In [45]:
SPECIAL_TOKENS = ["<pad>", "<sos>", "<eos>", "<unk>"]

def build_vocab(corpus, min_freq=2, max_vocab=20000):
    counter = Counter(token for sent in corpus for token in sent)
    vocab = SPECIAL_TOKENS.copy()
    vocab += [w for w, c in counter.most_common(max_vocab) if c >= min_freq]
    stoi = {w: i for i, w in enumerate(vocab)}
    itos = {i: w for w, i in stoi.items()}
    return stoi, itos


In [46]:
src_stoi, src_itos = build_vocab(kor_tokenized, MIN_FREQ, MAX_VOCAB)
tgt_stoi, tgt_itos = build_vocab(eng_tokenized, MIN_FREQ, MAX_VOCAB)

PAD_IDX_SRC = src_stoi["<pad>"]
PAD_IDX_TGT = tgt_stoi["<pad>"]
SOS_IDX_TGT = tgt_stoi["<sos>"]
EOS_IDX_TGT = tgt_stoi["<eos>"]

print("SRC vocab size:", len(src_stoi))
print("TGT vocab size:", len(tgt_stoi))


SRC vocab size: 20004
TGT vocab size: 20004


In [47]:
def encode(tokens, stoi):
    ids = [stoi.get(t, stoi["<unk>"]) for t in tokens]
    return [stoi["<sos>"]] + ids + [stoi["<eos>"]]

def pad_sequence(seq, max_len, pad_idx):
    return seq + [pad_idx] * (max_len - len(seq))


In [48]:
class TranslationDataset(Dataset):
    def __init__(self, src_corpus, tgt_corpus, src_stoi, tgt_stoi, max_len=MAX_LEN+2):
        self.src = src_corpus
        self.tgt = tgt_corpus
        self.src_stoi = src_stoi
        self.tgt_stoi = tgt_stoi
        self.max_len = max_len

    def __len__(self):
        return len(self.src)

    def __getitem__(self, idx):
        src_ids = encode(self.src[idx], self.src_stoi)
        tgt_ids = encode(self.tgt[idx], self.tgt_stoi)

        src_ids = src_ids[:self.max_len]
        tgt_ids = tgt_ids[:self.max_len]

        src_padded = pad_sequence(src_ids, self.max_len, PAD_IDX_SRC)
        tgt_padded = pad_sequence(tgt_ids, self.max_len, PAD_IDX_TGT)

        return torch.tensor(src_padded), torch.tensor(tgt_padded)


In [49]:
dataset = TranslationDataset(kor_tokenized, eng_tokenized, src_stoi, tgt_stoi)
BATCH_SIZE = 64
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)


## 4. Attentional Seq2Seq 모델

In [50]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.gru = nn.GRU(emb_dim, hid_dim, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)               # (B, S, E)
        outputs, hidden = self.gru(embedded)         # outputs: (B, S, H)
        return outputs, hidden                       # hidden: (1, B, H)


In [51]:
class LuongAttention(nn.Module):
    def __init__(self, hid_dim):
        super().__init__()
        self.linear = nn.Linear(hid_dim, hid_dim, bias=False)

    def forward(self, hidden, encoder_outputs, mask=None):
        hidden = hidden[-1]                          # (B, H)
        scores = torch.bmm(
            encoder_outputs,                         # (B, S, H)
            self.linear(hidden).unsqueeze(2)         # (B, H, 1)
        ).squeeze(2)                                 # (B, S)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn_weights = torch.softmax(scores, dim=1)  # (B, S)
        context = torch.bmm(
            attn_weights.unsqueeze(1),               # (B, 1, S)
            encoder_outputs                          # (B, S, H)
        ).squeeze(1)                                 # (B, H)
        return context, attn_weights


In [52]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_idx)
        self.gru = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
        self.attn = LuongAttention(hid_dim)
        self.fc_out = nn.Linear(hid_dim * 2, vocab_size)

    def forward(self, input, hidden, encoder_outputs, mask=None):
        embedded = self.embedding(input).unsqueeze(1)   # (B,1,E)
        context, attn = self.attn(hidden, encoder_outputs, mask)  # (B,H)
        context = context.unsqueeze(1)                  # (B,1,H)

        rnn_input = torch.cat([embedded, context], dim=2)  # (B,1,E+H)
        output, hidden = self.gru(rnn_input, hidden)       # output: (B,1,H)

        output = output.squeeze(1)                     # (B,H)
        context = context.squeeze(1)                   # (B,H)
        logits = self.fc_out(torch.cat([output, context], dim=1))  # (B,V)
        return logits, hidden, attn


In [53]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def create_mask(self, src):
        return (src != PAD_IDX_SRC).to(self.device)   # (B,S)

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        tgt_len = tgt.size(1)
        vocab_size = self.decoder.fc_out.out_features

        outputs = torch.zeros(batch_size, tgt_len, vocab_size, device=self.device)

        encoder_outputs, hidden = self.encoder(src)
        mask = self.create_mask(src)

        input_tok = tgt[:, 0]  # <sos>

        for t in range(1, tgt_len):
            logits, hidden, attn = self.decoder(input_tok, hidden, encoder_outputs, mask)
            outputs[:, t, :] = logits

            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = logits.argmax(1)
            input_tok = tgt[:, t] if teacher_force else top1

        return outputs


In [54]:
EMB_DIM = 256
HID_DIM = 512

encoder = Encoder(len(src_stoi), EMB_DIM, HID_DIM, PAD_IDX_SRC)
decoder = Decoder(len(tgt_stoi), EMB_DIM, HID_DIM, PAD_IDX_TGT)
model = Seq2Seq(encoder, decoder, DEVICE).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX_TGT)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


## 5. 학습 루프 (loss 감소 확인)

In [55]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    epoch_loss = 0

    for src, tgt in dataloader:
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()
        outputs = model(src, tgt, teacher_forcing_ratio=0.5)  # (B,T,V)

        output_dim = outputs.shape[-1]
        loss = criterion(
            outputs[:, 1:].reshape(-1, output_dim),
            tgt[:, 1:].reshape(-1)
        )
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(dataloader)


In [56]:
N_EPOCHS = 5  # 필요시 늘리기

for epoch in range(1, N_EPOCHS + 1):
    train_loss = train_one_epoch(model, dataloader, optimizer, criterion, DEVICE)
    print(f"[Epoch {epoch}] Train Loss: {train_loss:.4f}")


[Epoch 1] Train Loss: 6.1644
[Epoch 2] Train Loss: 5.3815
[Epoch 3] Train Loss: 4.8171
[Epoch 4] Train Loss: 4.3589
[Epoch 5] Train Loss: 4.0676


## 6. 테스트용 디코더 / 번역 함수

In [57]:
def greedy_decode(model, src_tokens, max_len=MAX_LEN+2):
    model.eval()
    with torch.no_grad():
        src_ids = encode(src_tokens, src_stoi)
        src_ids = src_ids[:max_len]
        src_padded = pad_sequence(src_ids, max_len, PAD_IDX_SRC)
        src_tensor = torch.tensor(src_padded).unsqueeze(0).to(DEVICE)

        encoder_outputs, hidden = model.encoder(src_tensor)
        mask = model.create_mask(src_tensor)

        input_tok = torch.tensor([SOS_IDX_TGT], device=DEVICE)
        generated = []

        for _ in range(max_len):
            logits, hidden, attn = model.decoder(input_tok, hidden, encoder_outputs, mask)
            top1 = logits.argmax(1)
            if top1.item() == EOS_IDX_TGT:
                break
            generated.append(top1.item())
            input_tok = top1

    tokens = [tgt_itos.get(i, "<unk>") for i in generated]
    tokens = [t for t in tokens if t not in ["<pad>", "<sos>", "<eos>"]]
    return " ".join(tokens)


In [58]:
def translate_ko2en(model, sentence_ko: str):
    ko_p = preprocess_ko(sentence_ko)
    ko_tokens = tokenize_ko(ko_p)
    return greedy_decode(model, ko_tokens)


### 테스트 예시

In [59]:
test_sentences = [
    "오늘 날씨가 정말 좋네요.",
    "이 모델이 제대로 번역을 할까요?",
    "한국어를 영어로 번역하는 예제입니다."
]

for s in test_sentences:
    print("SRC:", s)
    print("PRED:", translate_ko2en(model, s))
    print()


SRC: 오늘 날씨가 정말 좋네요.
PRED: it's really a lot of the

SRC: 이 모델이 제대로 번역을 할까요?
PRED: this is the the the the ?

SRC: 한국어를 영어로 번역하는 예제입니다.
PRED: the translated translated translated translated .

