In [2]:
import random
import requests
import zipfile
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter

# Browser-like User-Agent sent with the dataset download request.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
}

def download_zip(url, output_path, timeout=30):
    """Stream the resource at ``url`` into the file ``output_path``.

    The request is sent with the module-level ``headers`` dict.

    Args:
        url: Address of the file to fetch.
        output_path: Local path the response body is written to (binary mode).
        timeout: Seconds handed to ``requests.get`` as its ``timeout``, so a
            stalled server cannot block the notebook indefinitely.

    Raises:
        Exception: If the server answers with any status code other than 200.
            Nothing is written to ``output_path`` in that case.
    """
    # The context manager releases the streamed connection even when the
    # status check or the file write raises.
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download. HTTP Response Code: {response.status_code}")
        with open(output_path, 'wb') as f:
            # Write in 8 KiB chunks so the whole body is never held in memory.
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    print(f"ZIP file downloaded to {output_path}")

# Where the English-French archive lives and the local file names used for it.
url = "http://www.manythings.org/anki/fra-eng.zip"
zip_path, txt_path = "fra-eng.zip", "fra.txt"

# Only hit the network when the extracted text file is not already on disk.
needs_download = not os.path.exists(txt_path)
if needs_download:
    download_zip(url, zip_path)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # Unpack into the current working directory, then report the members.
        zip_ref.extractall()
        print("Extracted files:", zip_ref.namelist())

# Upper bound on the number of sentence pairs kept from the corpus file.
MAX_PAIRS = 50000

# Each usable line is tab-separated: English sentence, French sentence, then
# optional extra columns that are ignored.
pairs = []
with open(txt_path, encoding="utf-8") as f:
    # Iterate lazily instead of readlines(): only the first MAX_PAIRS usable
    # lines are needed, so the rest of the file is never read.
    for line in f:
        fields = line.strip().split("\t")
        if len(fields) < 2:
            # Blank or malformed line (no tab): skip it rather than crash on
            # the two-column unpacking.
            continue
        eng, fra = fields[0], fields[1]
        pairs.append((eng, fra))
        if len(pairs) >= MAX_PAIRS:
            break

print("샘플 데이터:", pairs[0])

ZIP file downloaded to fra-eng.zip
Extracted files: ['_about.txt', 'fra.txt']
샘플 데이터: ('Go.', 'Va !')


In [3]:
# Display the first ten (English, French) sentence pairs.
pairs[:10]

[('Go.', 'Va !'),
 ('Go.', 'Marche.'),
 ('Go.', 'En route !'),
 ('Go.', 'Bouge !'),
 ('Hi.', 'Salut !'),
 ('Hi.', 'Salut.'),
 ('Run!', 'Cours\u202f!'),
 ('Run!', 'Courez\u202f!'),
 ('Run!', 'Prenez vos jambes à vos cous !'),
 ('Run!', 'File !')]

In [4]:
def tokenize(text):
    """Lower-case ``text`` and split it on runs of whitespace.

    Punctuation is not separated from the word it is attached to, so
    ``"Go."`` yields the single token ``"go."``.

    Returns:
        list[str]: The tokens in order; an empty list for blank input.
    """
    lowered = text.lower()
    return lowered.split()

def build_vocab(texts, min_freq=2):
    """Build a token -> integer id mapping from an iterable of sentences.

    Ids 0-3 are reserved for ``<pad>``, ``<sos>``, ``<eos>`` and ``<unk>``.
    Every other token produced by ``tokenize`` that occurs at least
    ``min_freq`` times receives the next free id, in order of first
    appearance in ``texts``.

    Args:
        texts: Iterable of raw sentence strings.
        min_freq: Minimum number of occurrences for a token to be kept.

    Returns:
        dict[str, int]: The vocabulary, including the four special tokens.
    """
    counter = Counter()
    for txt in texts:
        counter.update(tokenize(txt))
    vocab = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}
    for word, freq in counter.items():
        # Skip tokens already present: a corpus token spelled like a special
        # token would otherwise overwrite its reserved id and, because the
        # dict would not grow, make the next word share that same id.
        if freq >= min_freq and word not in vocab:
            vocab[word] = len(vocab)
    return vocab

# Split the parallel corpus into its source (first element) and target
# (second element) sentences.
src_texts = [pair[0] for pair in pairs]
trg_texts = [pair[1] for pair in pairs]

# One vocabulary per language.
SRC_VOCAB, TRG_VOCAB = build_vocab(src_texts), build_vocab(trg_texts)

# Special-token ids, read from the source vocabulary.
PAD_IDX, SOS_IDX, EOS_IDX = (SRC_VOCAB[tok] for tok in ("<pad>", "<sos>", "<eos>"))

In [5]:
def numericalize(text, vocab):
    """Convert a sentence into a list of vocabulary ids.

    The sentence is split with ``tokenize``; any token missing from ``vocab``
    is replaced by the id stored under ``"<unk>"``.

    Returns:
        list[int]: One id per token, in sentence order.
    """
    ids = []
    for token in tokenize(text):
        ids.append(vocab.get(token, vocab["<unk>"]))
    return ids

def tensor_transform(tokens, vocab):
    """Wrap a list of token ids with ``<sos>``/``<eos>`` and return a tensor.

    Args:
        tokens: List of integer token ids.
        vocab: Mapping that contains the ``"<sos>"`` and ``"<eos>"`` ids.

    Returns:
        torch.Tensor: 1-D ``torch.long`` tensor ``[<sos>, *tokens, <eos>]``.
    """
    framed = [vocab["<sos>"], *tokens, vocab["<eos>"]]
    return torch.tensor(framed, dtype=torch.long)

class TranslationDataset(Dataset):
    """Map-style dataset over (source, target) sentence pairs.

    Items are converted to id tensors on access, each wrapped with the
    ``<sos>``/``<eos>`` ids of its own vocabulary.
    """

    def __init__(self, pairs, src_vocab, trg_vocab):
        """Store the sentence pairs and the vocabulary for each side."""
        self.pairs = pairs
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(src_tensor, trg_tensor)`` for the pair at position ``idx``."""
        source_text, target_text = self.pairs[idx]
        source_ids = numericalize(source_text, self.src_vocab)
        target_ids = numericalize(target_text, self.trg_vocab)
        return (
            tensor_transform(source_ids, self.src_vocab),
            tensor_transform(target_ids, self.trg_vocab),
        )

def collate_fn(batch):
    """Pad a list of ``(src_tensor, trg_tensor)`` items into two batch tensors.

    Each side is padded independently with ``PAD_IDX`` using
    ``nn.utils.rnn.pad_sequence``.

    Returns:
        tuple: ``(src_batch, trg_batch)``.
    """
    sources, targets = zip(*batch)
    pad = nn.utils.rnn.pad_sequence
    return (
        pad(sources, padding_value=PAD_IDX),
        pad(targets, padding_value=PAD_IDX),
    )

# Fix the random state so the train/validation split, batch shuffling, weight
# initialisation and teacher-forcing decisions repeat on Restart & Run All.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

TRAIN_FRACTION = 0.9  # share of the pairs used for training; the rest is validation
BATCH_SIZE = 64

dataset = TranslationDataset(pairs, SRC_VOCAB, TRG_VOCAB)
train_size = int(len(dataset) * TRAIN_FRACTION)
valid_size = len(dataset) - train_size
# A dedicated generator keeps the split stable regardless of how much of the
# global RNG stream has been consumed before this cell runs.
train_dataset, valid_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, valid_size],
    generator=torch.Generator().manual_seed(SEED),
)

train_iter = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
valid_iter = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

In [6]:
class Encoder(nn.Module):
    """Embedding + multi-layer LSTM that encodes a source batch into its final states."""

    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout):
        """Create the layers.

        Args:
            input_dim: Number of rows in the embedding table.
            embed_dim: Size of each embedding vector.
            hidden_dim: LSTM hidden size.
            n_layers: Number of stacked LSTM layers.
            dropout: Probability used both between LSTM layers and on the
                embedded input.
        """
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Run the encoder and return the LSTM's final ``(hidden, cell)`` pair.

        The per-step LSTM outputs are discarded.
        """
        embedded = self.embedding(src)
        embedded = self.dropout(embedded)
        _, final_state = self.rnn(embedded)
        hidden, cell = final_state
        return hidden, cell

class Decoder(nn.Module):
    """Single-step LSTM decoder: embeds one token per sequence and scores the next one."""

    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout):
        """Create the layers.

        Args:
            output_dim: Number of rows in the embedding table and number of
                scores produced per sequence.
            embed_dim: Size of each embedding vector.
            hidden_dim: LSTM hidden size.
            n_layers: Number of stacked LSTM layers.
            dropout: Probability used both between LSTM layers and on the
                embedded input.
        """
        super().__init__()
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: Token ids for the current step, one per sequence.
            hidden: LSTM hidden state from the previous step.
            cell: LSTM cell state from the previous step.

        Returns:
            tuple: ``(prediction, hidden, cell)`` where ``prediction`` is the
            output of ``fc_out`` and the states are the updated LSTM states.
        """
        # Add a leading length-1 time axis so the LSTM sees a one-step sequence.
        step_tokens = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(step_tokens))
        rnn_out, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # Drop the time axis again before projecting to vocabulary scores.
        step_features = rnn_out.squeeze(0)
        prediction = self.fc_out(step_features)
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        """Keep references to the two sub-modules and the device for the output buffer."""
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Produce decoder scores for every target position after the first.

        Args:
            src: Source batch passed to the encoder.
            trg: Target batch of shape ``[trg_len, batch_size]``; row 0 is the
                first decoder input.
            teacher_forcing_ratio: Probability, drawn independently at each
                step, of feeding the ground-truth token ``trg[t]`` as the next
                input instead of the decoder's own arg-max prediction.

        Returns:
            torch.Tensor: ``[trg_len, batch_size, vocab]`` scores; slot 0 is
            never written and stays zero.
        """
        hidden, cell = self.encoder(src)
        trg_len, batch_size = trg.shape
        vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(trg_len, batch_size, vocab_size, device=self.device)

        step_input = trg[0, :]
        for t in range(1, trg_len):
            logits, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[t] = logits
            # One random draw per step decides between ground truth and prediction.
            if random.random() < teacher_forcing_ratio:
                step_input = trg[t]
            else:
                step_input = logits.argmax(1)
        return outputs

In [7]:
# Use the GPU when PyTorch reports one as available, otherwise the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Vocabulary sizes determine the embedding and output-layer dimensions.
INPUT_DIM, OUTPUT_DIM = len(SRC_VOCAB), len(TRG_VOCAB)

# Model hyper-parameters.
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
DROPOUT = 0.5

enc = Encoder(
    input_dim=INPUT_DIM,
    embed_dim=ENC_EMB_DIM,
    hidden_dim=HID_DIM,
    n_layers=N_LAYERS,
    dropout=DROPOUT,
)
dec = Decoder(
    output_dim=OUTPUT_DIM,
    embed_dim=DEC_EMB_DIM,
    hidden_dim=HID_DIM,
    n_layers=N_LAYERS,
    dropout=DROPOUT,
)
model = Seq2Seq(enc, dec, DEVICE)
model = model.to(DEVICE)

# Adam with its default settings; padded target positions do not contribute to the loss.
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

In [8]:
def train(model, iterator, optimizer, criterion, clip=1):
    """Run one training pass over ``iterator`` and return the mean batch loss.

    Args:
        model: Module called as ``model(src, trg)``; its output is treated as
            ``[trg_len, batch_size, output_dim]``.
        iterator: Iterable of ``(src, trg)`` batches that supports ``len()``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to the flattened scores and targets.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        float: Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    running_loss = 0
    for src_batch, trg_batch in iterator:
        src_batch = src_batch.to(DEVICE)
        trg_batch = trg_batch.to(DEVICE)

        optimizer.zero_grad()
        logits = model(src_batch, trg_batch)  # [trg_len, batch_size, output_dim]
        vocab_size = logits.shape[-1]

        # Position 0 is the first decoder input and is never predicted, so it
        # is dropped from both the scores and the targets before flattening.
        flat_logits = logits[1:].view(-1, vocab_size)
        flat_targets = trg_batch[1:].view(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(iterator)

# Number of full passes over the training set; named so it can be tuned in one place.
N_EPOCHS = 10

for epoch in range(N_EPOCHS):
    loss = train(model, train_iter, optimizer, criterion)
    print(f"Epoch {epoch+1}, Loss: {loss:.3f}")

Epoch 1, Loss: 4.606
Epoch 2, Loss: 3.512
Epoch 3, Loss: 2.989
Epoch 4, Loss: 2.648
Epoch 5, Loss: 2.388
Epoch 6, Loss: 2.173
Epoch 7, Loss: 2.012
Epoch 8, Loss: 1.859
Epoch 9, Loss: 1.732
Epoch 10, Loss: 1.644


In [9]:
def translate_sentence(model, sentence, src_vocab, trg_vocab, device, max_len=50):
    """Greedily decode a translation for a single sentence.

    Args:
        model: Object exposing ``eval()``, ``encoder`` and ``decoder``.
        sentence: Raw source sentence; it is lower-cased and split on whitespace.
        src_vocab: Source token -> id mapping with ``<sos>``, ``<eos>``, ``<unk>``.
        trg_vocab: Target token -> id mapping with ``<sos>`` and ``<eos>``.
        device: Device the input tensors are moved to.
        max_len: Maximum number of decoding steps.

    Returns:
        list[str]: Predicted target tokens without the leading ``<sos>``. The
        trailing ``<eos>`` is included when it was produced within ``max_len`` steps.
    """
    model.eval()

    # Convert the source words to ids, falling back to <unk> for unseen words.
    word_ids = []
    for token in sentence.lower().split():
        word_ids.append(src_vocab.get(token, src_vocab["<unk>"]))
    framed_ids = [src_vocab["<sos>"], *word_ids, src_vocab["<eos>"]]
    src_tensor = torch.tensor(framed_ids, dtype=torch.long).unsqueeze(1).to(device)  # [seq_len, 1]

    with torch.no_grad():
        # Encode the source once; its final states seed the decoder.
        hidden, cell = model.encoder(src_tensor)

        # Decoding starts from <sos> and always feeds back the last prediction.
        decoded_ids = [trg_vocab["<sos>"]]
        for _ in range(max_len):
            previous = torch.tensor([decoded_ids[-1]], dtype=torch.long).to(device)
            scores, hidden, cell = model.decoder(previous, hidden, cell)
            next_id = scores.argmax(1).item()
            decoded_ids.append(next_id)
            if next_id == trg_vocab["<eos>"]:
                break

    # Map ids back to words, skipping the initial <sos>.
    index_to_word = dict(zip(trg_vocab.values(), trg_vocab.keys()))
    return [index_to_word[token_id] for token_id in decoded_ids[1:]]


In [10]:
# Translate one lower-case English sentence and print the predicted tokens
# separated by single spaces.
example_sentence = "i am hungry"
translation = translate_sentence(
    model=model,
    sentence=example_sentence,
    src_vocab=SRC_VOCAB,
    trg_vocab=TRG_VOCAB,
    device=DEVICE,
)
print(*translation)

je suis des <eos>
