<a href="https://colab.research.google.com/github/whatnews72/----/blob/master/%EC%9E%90%EC%97%B0%EC%96%B4%EC%B2%98%EB%A6%AC_5%EC%A3%BC%EC%B0%A8_%EC%8B%A4%EC%8A%B5__20215064_%EA%B3%A0%EC%8A%B9%EC%9A%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# -*- coding: utf-8 -*-
"""5주차_실습1_seq2seq_encoder_decoder번역_수강생용.ipynb"""

import os
import shutil
import zipfile
import pandas as pd
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import requests
import random
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Where the English-French sentence-pair archive lives remotely, and the
# absolute local path (inside the current working directory) it is saved to.
path = os.getcwd()
filename = 'fra-eng.zip'
url = 'http://www.manythings.org/anki/fra-eng.zip'
zipfilename = os.path.join(path, filename)

def download(url, file_name):
    """Fetch ``url`` over HTTP and save the response body to ``file_name``.

    Args:
        url: Address of the resource to download.
        file_name: Path of the local file to (over)write with the payload.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    # A browser-like User-Agent is sent with the request.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
    }
    # Perform the request *before* opening the output file so that a failed
    # download never leaves an empty or truncated file behind.
    response = requests.get(url, headers=headers, timeout=60)
    # Fail loudly instead of silently saving an HTML error page as the archive.
    response.raise_for_status()

    with open(file_name, "wb") as f:
        f.write(response.content)

# Fetch the archive and unpack it into the working directory.
download(url, zipfilename)

with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
    zip_ref.extractall(path)

# Reading the data: tab-separated columns are source sentence, target
# sentence and a licence/attribution field that is discarded.
# The file is read from the directory it was just extracted into.
lines = pd.read_csv(os.path.join(path, 'fra.txt'), names=['src', 'tar', 'lic'], sep='\t')
# Keep the first 60,000 sentence pairs. The explicit copy makes `lines` an
# independent frame, so the column assignment below does not write into a
# slice of the full table (pandas SettingWithCopy hazard).
lines = lines[['src', 'tar']].iloc[:60000].copy()

# Adding special tokens: wrap every target sentence in start/end markers.
# NOTE(review): both markers are pure whitespace; a whitespace-splitting
# tokenizer such as 'basic_english' presumably discards them - confirm that
# they actually reach the vocabulary.
lines['tar'] = lines['tar'].apply(lambda x: '\t ' + x + ' \n')

# Building the vocabularies: the same tokenizer type is used for both sides.
src_tokenizer = get_tokenizer('basic_english')
tar_tokenizer = get_tokenizer('basic_english')

def yield_tokens(data_iter, tokenizer):
    """Lazily yield ``tokenizer(text)`` for every text in ``data_iter``.

    Args:
        data_iter: Iterable of raw strings.
        tokenizer: Callable turning one string into its tokens.

    Yields:
        The tokenizer's result for each text, in input order.
    """
    yield from map(tokenizer, data_iter)

# Reserved entries placed in both vocabularies.
SPECIAL_TOKENS = ["<unk>", "<pad>", "<bos>", "<eos>"]

# One vocabulary per language, each built from its own column.
src_vocab = build_vocab_from_iterator(
    yield_tokens(lines['src'], src_tokenizer),
    specials=list(SPECIAL_TOKENS),
)
tar_vocab = build_vocab_from_iterator(
    yield_tokens(lines['tar'], tar_tokenizer),
    specials=list(SPECIAL_TOKENS),
)

# Tokens missing from a vocabulary resolve to that vocabulary's <unk> index.
src_unk_index = src_vocab["<unk>"]
src_vocab.set_default_index(src_unk_index)
tar_unk_index = tar_vocab["<unk>"]
tar_vocab.set_default_index(tar_unk_index)

# Display vocabulary sizes
print(f"Source vocabulary size: {len(src_vocab)}")
print(f"Target vocabulary size: {len(tar_vocab)}")

class TranslationDataset(Dataset):
    """Sentence-pair dataset that returns index tensors for source and target.

    Every encoded sequence is wrapped as ``<bos> tokens... <eos>`` so that the
    decoder learns both how to start from ``<bos>`` and when to emit ``<eos>``.

    Args:
        data: Table with a ``'src'`` and a ``'tar'`` column, addressable with
            ``.iloc`` (e.g. a pandas DataFrame).
        src_vocab: Mapping from source token to index; must resolve
            ``"<bos>"`` and ``"<eos>"``.
        tar_vocab: Mapping from target token to index; must resolve
            ``"<bos>"`` and ``"<eos>"``.
        src_tokenizer: Callable splitting a source sentence into tokens.
        tar_tokenizer: Callable splitting a target sentence into tokens.
    """

    def __init__(self, data, src_vocab, tar_vocab, src_tokenizer, tar_tokenizer):
        self.data = data
        self.src_vocab = src_vocab
        self.tar_vocab = tar_vocab
        self.src_tokenizer = src_tokenizer
        self.tar_tokenizer = tar_tokenizer

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.data)

    @staticmethod
    def _encode(text, tokenizer, vocab):
        """Tokenize ``text`` and return ``<bos> + token ids + <eos>`` as a long tensor."""
        token_ids = [vocab[token] for token in tokenizer(text)]
        # Explicit dtype: a sentence with no tokens must still give an integer
        # tensor that an embedding layer accepts.
        return torch.tensor(
            [vocab["<bos>"], *token_ids, vocab["<eos>"]], dtype=torch.long
        )

    def __getitem__(self, idx):
        """Return ``(src_tensor, tar_tensor)`` for the pair at position ``idx``."""
        row = self.data.iloc[idx]  # single positional lookup for both columns
        src_tensor = self._encode(row['src'], self.src_tokenizer, self.src_vocab)
        tar_tensor = self._encode(row['tar'], self.tar_tokenizer, self.tar_vocab)
        return src_tensor, tar_tensor

def collate_fn(batch):
    """Merge ``(src, tar)`` tensor pairs into two padded batch tensors.

    Args:
        batch: Sequence of ``(src_tensor, tar_tensor)`` pairs.

    Returns:
        ``(src_batch, tar_batch)``; within each, shorter sequences are filled
        with the ``<pad>`` index of the matching module-level vocabulary.
    """
    sources, targets = zip(*batch)
    pad = nn.utils.rnn.pad_sequence
    padded_sources = pad(sources, padding_value=src_vocab["<pad>"])
    padded_targets = pad(targets, padding_value=tar_vocab["<pad>"])
    return padded_sources, padded_targets

dataset = TranslationDataset(lines, src_vocab, tar_vocab, src_tokenizer, tar_tokenizer)

# Random 80/20 split into training and validation subsets.
train_size = int(0.8 * len(dataset))
valid_size = len(dataset) - train_size
train_dataset, valid_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, valid_size],
)

# Only the training batches are reshuffled; both loaders pad via collate_fn.
train_iterator = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)
valid_iterator = DataLoader(
    valid_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)

class Encoder(nn.Module):
    """Embeds a source sequence and runs it through a multi-layer LSTM.

    Args:
        input_dim: Number of rows in the embedding table.
        emb_dim: Embedding width.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and passed to
            the LSTM.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Return the LSTM's final ``(hidden, cell)`` state for ``src``."""
        token_vectors = self.embedding(src)
        token_vectors = self.dropout(token_vectors)
        # The per-step outputs are not needed; only the final state is returned.
        _, (final_hidden, final_cell) = self.rnn(token_vectors)
        return final_hidden, final_cell

class Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next target token.

    Args:
        output_dim: Size of the embedding table and of the output scores.
        emb_dim: Embedding width.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and passed to
            the LSTM.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hid_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one step.

        Args:
            input: Token indices for the current step (one per batch element).
            hidden: Current LSTM hidden state.
            cell: Current LSTM cell state.

        Returns:
            ``(prediction, hidden, cell)``: the output-layer scores for this
            step and the updated LSTM state.
        """
        # Add a leading length-1 dimension so the LSTM sees a one-step sequence.
        step_tokens = input.unsqueeze(0)
        step_vectors = self.dropout(self.embedding(step_tokens))
        step_output, (next_hidden, next_cell) = self.rnn(step_vectors, (hidden, cell))
        # Remove the length-1 dimension again before the output projection.
        scores = self.fc_out(step_output.squeeze(0))
        return scores, next_hidden, next_cell

class Seq2Seq(nn.Module):
    """Couples an encoder and a step-wise decoder into one translation model.

    Args:
        encoder: Module returning ``(hidden, cell)`` for a source batch.
        decoder: Module mapping ``(input, hidden, cell)`` to
            ``(prediction, hidden, cell)`` and exposing ``fc_out.out_features``.
        device: Device on which the output buffer is placed.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg.shape[0] - 1`` steps and return the per-step scores.

        Args:
            src: Source batch passed to the encoder.
            trg: Target batch indexed as ``[step, batch]``; row 0 seeds the
                decoder.
            teacher_forcing_ratio: Probability, drawn independently per step,
                of feeding the ground-truth token rather than the model's own
                best guess as the next decoder input.

        Returns:
            Tensor of shape ``(steps, batch, vocab)``; row 0 is never written
            and stays all zeros.
        """
        n_steps = trg.shape[0]
        n_batch = trg.shape[1]
        vocab_size = self.decoder.fc_out.out_features

        outputs = torch.zeros(n_steps, n_batch, vocab_size).to(self.device)

        # The encoder's final state initialises the decoder.
        hidden, cell = self.encoder(src)
        decoder_input = trg[0, :]

        for step in range(1, n_steps):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = step_scores

            use_ground_truth = random.random() < teacher_forcing_ratio
            if use_ground_truth:
                decoder_input = trg[step]
            else:
                decoder_input = step_scores.argmax(1)

        return outputs

def train(model, iterator, optimizer, criterion, clip):
    """Run one optimisation pass over ``iterator`` and return the mean batch loss.

    Args:
        model: Module callable as ``model(src, trg)`` with a ``device`` attribute.
        iterator: Sized iterable of ``(src, trg)`` batches.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss taking ``(flattened scores, flattened targets)``.
        clip: Maximum gradient norm applied before every optimizer step.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.train()
    running_loss = 0

    for src, trg in iterator:
        src = src.to(model.device)
        trg = trg.to(model.device)

        optimizer.zero_grad()
        scores = model(src, trg)

        # Skip position 0 (the start-of-sequence slot) on both sides and
        # flatten the remaining positions into one long list of predictions.
        flat_scores = scores[1:].view(-1, scores.shape[-1])
        flat_targets = trg[1:].view(-1)

        loss = criterion(flat_scores, flat_targets)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Return the mean batch loss over ``iterator`` without updating the model.

    Args:
        model: Module callable as ``model(src, trg, teacher_forcing_ratio)``
            with a ``device`` attribute.
        iterator: Sized iterable of ``(src, trg)`` batches.
        criterion: Loss taking ``(flattened scores, flattened targets)``.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for src, trg in iterator:
            src = src.to(model.device)
            trg = trg.to(model.device)

            # Teacher forcing ratio 0: the model is fed its own predictions.
            scores = model(src, trg, 0)

            # Skip position 0 on both sides and flatten the rest.
            flat_scores = scores[1:].view(-1, scores.shape[-1])
            flat_targets = trg[1:].view(-1)

            running_loss += criterion(flat_scores, flat_targets).item()

    return running_loss / len(iterator)

# Hyperparameters
INPUT_DIM = len(src_vocab)    # encoder embedding table size
OUTPUT_DIM = len(tar_vocab)   # decoder embedding table / output layer size
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512                 # shared so the encoder state fits the decoder
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

# Initialize encoder and decoder
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize seq2seq model
model = Seq2Seq(enc, dec, device).to(device)

# Optimizer
optimizer = optim.Adam(model.parameters())

# Loss function. The labels are *target* token indices, so the padding index
# to ignore must come from the target vocabulary (the source vocabulary's pad
# index only matches when both vocabularies order their specials identically).
criterion = nn.CrossEntropyLoss(ignore_index=tar_vocab['<pad>'])

# Example training loop
N_EPOCHS = 10
CLIP = 1  # maximum gradient norm

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f}')

# Example inference
def translate_sentence(sentence, src_vocab, tar_vocab, model, device, max_len=50, tokenizer=None):
    """Greedily translate one raw sentence with a trained model.

    Args:
        sentence: Raw source-language text.
        src_vocab: Source vocabulary; indexed by token, must resolve
            ``"<bos>"`` and ``"<eos>"``.
        tar_vocab: Target vocabulary; indexed by token and providing
            ``lookup_token(index)``.
        model: Object exposing ``encoder``, ``decoder`` and ``eval()``.
        device: Device on which the input tensors are created.
        max_len: Upper bound on the number of generated tokens.
        tokenizer: Callable splitting ``sentence`` into tokens. Defaults to the
            module-level ``src_tokenizer`` so inference tokenizes exactly like
            the vocabulary was built (plain ``str.split`` would keep
            punctuation glued to words and map them to ``<unk>``).

    Returns:
        List of generated target tokens without the leading ``<bos>``; the
        list ends with ``"<eos>"`` when the model produced it within
        ``max_len`` steps.
    """
    model.eval()

    if tokenizer is None:
        tokenizer = src_tokenizer

    # <bos> + token ids + <eos>, shaped (length, 1): a batch of one sentence.
    src_indexes = [src_vocab["<bos>"]]
    src_indexes.extend(src_vocab[token] for token in tokenizer(sentence))
    src_indexes.append(src_vocab["<eos>"])
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    eos_index = tar_vocab["<eos>"]
    trg_indexes = [tar_vocab["<bos>"]]

    # No gradients are needed anywhere during inference.
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

        for _ in range(max_len):
            # Feed back the most recently generated token.
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

            pred_token = output.argmax(1).item()
            trg_indexes.append(pred_token)

            if pred_token == eos_index:
                break

    # Drop the seeding <bos> before converting indices back to tokens.
    return [tar_vocab.lookup_token(i) for i in trg_indexes[1:]]

# Example usage: translate one English sentence and print the tokens joined
# by single spaces.
sentence = "A man is walking."

translation = translate_sentence(
    sentence,
    src_vocab,
    tar_vocab,
    model,
    device,
)
print(f'predicted trg: {" ".join(translation)}')


Source vocabulary size: 6531
Target vocabulary size: 12826
Epoch: 01
	Train Loss: 4.511
	 Val. Loss: 4.091
Epoch: 02
	Train Loss: 3.384
	 Val. Loss: 3.611
Epoch: 03
	Train Loss: 2.818
	 Val. Loss: 3.329
Epoch: 04
	Train Loss: 2.463
	 Val. Loss: 3.181
Epoch: 05
	Train Loss: 2.185
	 Val. Loss: 3.071
Epoch: 06
	Train Loss: 1.972
	 Val. Loss: 2.992
Epoch: 07
	Train Loss: 1.814
	 Val. Loss: 2.938
Epoch: 08
	Train Loss: 1.677
	 Val. Loss: 2.911
Epoch: 09
	Train Loss: 1.565
	 Val. Loss: 2.894
Epoch: 10
	Train Loss: 1.466
	 Val. Loss: 2.854
predicted trg: un homme homme est . . . ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? . . . . ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
