<a href="https://colab.research.google.com/github/jungwoo1208/AI_Study/blob/main/seq2seq_translator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 영어-한글로 만들고 싶었으나 데이터 부족으로 영어-프랑스어로 제작해봄

In [1]:
import os
import re
import unicodedata
import urllib3
import zipfile
import shutil
import numpy as np
import pandas as pd
import torch
from collections import Counter
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset

In [2]:
num_samples = 100000

In [3]:
!wget -c https://www.manythings.org/anki/fra-eng.zip &&unzip -o fra-eng.zip

--2025-08-12 05:15:05--  https://www.manythings.org/anki/fra-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8143096 (7.8M) [application/zip]
Saving to: ‘fra-eng.zip’


2025-08-12 05:15:06 (21.4 MB/s) - ‘fra-eng.zip’ saved [8143096/8143096]

Archive:  fra-eng.zip
  inflating: _about.txt              
  inflating: fra.txt                 


In [4]:
def unicode_to_ascii(s):
  return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

In [5]:
def preprocess_sentence(sent):

  sent = unicode_to_ascii(sent.lower())
  sent= re.sub(r"([?.!,?`])",r" \1",sent)
  sent= re.sub(r"[^a-zA-Z!,?`]+"," ",sent)
  sent = re.sub(r"\s+", " ",sent)

  return sent

In [6]:
def load_preprocessed_data():
    encoder_input, decoder_input, decoder_target = [], [], []

    with open("fra.txt", "r") as lines:
        for i, line in enumerate(lines):
            src_line, tar_line, _ = line.strip().split("\t")

            src_line = [w for w in preprocess_sentence(src_line).split()]

            tar_line = preprocess_sentence(tar_line)
            tar_line_in = [w for w in ("<sos> " + tar_line).split()]
            tar_line_out = [w for w in (tar_line + " <eos>").split()]

            encoder_input.append(src_line)
            decoder_input.append(tar_line_in)
            decoder_target.append(tar_line_out)

    return encoder_input, decoder_input, decoder_target

In [7]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()

In [8]:
def build_vocab(sents):
    word_list = []
    for sent in sents:
        for word in sent:
            word_list.append(word)
    word_counts = Counter(word_list)
    vocab = sorted(word_counts, key=word_counts.get, reverse=True)

    word_to_index = {}
    word_to_index["<PAD>"] = 0
    word_to_index["<UNK>"] = 1

    for index, word in enumerate(vocab):
        word_to_index[word] = index + 2

    return word_to_index

In [9]:
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_fra_in+ sents_fra_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)

print(src_vocab_size)
print(tar_vocab_size)

16331
26065


In [10]:
index_to_src ={v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def texts_to_sequences(sents,word_to_index):
  encoded_X_data =[]
  for sent in tqdm(sents):
    index_sequences =[]
    for word in sent:
      try:
        index_sequences.append(word_to_index[word])
      except KeyError:
        index_sequences.append(word_to_index["<UNK>"])
    encoded_X_data.append(index_sequences)
  return encoded_X_data


In [11]:
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_fra_in, tar_vocab)
decoder_target = texts_to_sequences(sents_fra_out, tar_vocab)

for i, (item1,item2) in zip(range(5), zip(sents_en_in, encoder_input)):
  print(f"index:{i}   {item1} -> {item2}")

100%|██████████| 237838/237838 [00:00<00:00, 310802.44it/s]
100%|██████████| 237838/237838 [00:00<00:00, 270785.29it/s]
100%|██████████| 237838/237838 [00:00<00:00, 733057.28it/s]

index:0   ['go'] -> [51]
index:1   ['go'] -> [51]
index:2   ['go'] -> [51]
index:3   ['go'] -> [51]
index:4   ['hi'] -> [2597]





In [12]:
def pad_sequences(sentences, max_len=None):
  if max_len is None:
    max_len = max([len(sentence) for sentence in sentences])
  features = np.zeros((len(sentences),max_len),dtype=int)
  for index, sentence in enumerate(sentences):
    if len(sentence) !=0:
      features[index, :len(sentence)] = np.array(sentence)[:max_len]
  return features

In [13]:
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)

In [14]:
print(encoder_input.shape)
print(decoder_input.shape)
print(decoder_target.shape)

(237838, 65)
(237838, 69)
(237838, 69)


In [15]:
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print(indices)

[134400  74809 134942 ...   6638 144837  12499]


In [16]:
encoder_input= encoder_input[indices]
decoder_input= decoder_input[indices]
decoder_target= decoder_target[indices]

In [17]:
print([index_to_src[word] for word in encoder_input[6242]])
print([index_to_tar[word] for word in decoder_input[6242]])
print([index_to_tar[word] for word in decoder_target[6242]])

['i', 'don', 't', 'take', 'it', 'personally', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['<sos>', 'je', 'ne', 'le', 'prends', 'pas', 'personnellement', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>',

In [18]:
n_of_val =int(100000*0.1)


In [19]:
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [20]:
print(encoder_input_train.shape)
print(decoder_input_train.shape)
print(decoder_target_train.shape)
print(encoder_input_test.shape)
print(decoder_input_test.shape)
print(decoder_target_test.shape)

(227838, 65)
(227838, 69)
(227838, 69)
(10000, 65)
(10000, 69)
(10000, 69)


In [21]:
import torch
import torch.nn as nn
import torch.optim as optim

embadding_dim = 256
hidden_units = 256

class Encoder(nn.Module):
  def __init__(self, src_vocab_size, embadding_dim, hidden_units):
    super(Encoder, self).__init__()
    self.embedding = nn.Embedding(src_vocab_size, embadding_dim, padding_idx=0)
    self.lstm = nn.LSTM(embadding_dim, hidden_units, batch_first=True)

  def forward(self, x):
    x = self.embedding(x)
    _, (hidden, cell)= self.lstm(x)
    return hidden, cell

class Decoder(nn.Module):
  def __init__(self, tar_vocab_size, embadding_dim, hidden_units):
    super(Decoder, self).__init__()
    self.embedding = nn.Embedding(tar_vocab_size, embadding_dim, padding_idx=0)
    self.lstm = nn.LSTM(embadding_dim, hidden_units, batch_first=True)
    self.fc = nn.Linear(hidden_units, tar_vocab_size)

  def forward(self, x, hidden, cell):
    x = self.embedding(x)
    output, (hidden, cell) = self.lstm(x, (hidden, cell))
    output = self.fc(output)
    return output, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        hidden, cell = self.encoder(src)

        # Ensure hidden and cell states are contiguous before passing to the decoder
        hidden = hidden.contiguous()
        cell = cell.contiguous()

        output, _, _ = self.decoder(trg, hidden, cell)
        return output

encoder = Encoder(src_vocab_size, embadding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embadding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)

loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [22]:
print(model)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(16331, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(26065, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=26065, bias=True)
  )
)


In [23]:
def evaluation(model, dataloader, loss_function, device):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    with torch.no_grad():
        for encoder_input, decoder_input, decoder_target in dataloader:
            encoder_input = encoder_input.to(device)
            decoder_input = decoder_input.to(device)
            decoder_target = decoder_target.to(device)

            outputs = model(encoder_input, decoder_input)
            loss = loss_function(
                outputs.view(-1, tar_vocab_size),
                decoder_target.view(-1)
            )
            total_loss += loss.item()

            mask = decoder_target != 0  # padding 제외
            preds = outputs.argmax(dim=2)
            total_correct += (preds == decoder_target)[mask].sum().item()
            total_count += mask.sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = total_correct / total_count if total_count > 0 else 0
    model.train()  # 평가 후 다시 학습 모드로
    return avg_loss, accuracy


In [24]:
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)

encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)

batch_size = 64
train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [25]:
epochs =10
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(16331, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(26065, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=26065, bias=True)
  )
)

In [26]:
best_val_loss =float('inf')

for epoch in range(epochs):
  model.train()

  for encoder_input, decoder_input, decoder_target in train_dataloader:
    encoder_input = encoder_input.to(device)
    decoder_input = decoder_input.to(device)
    decoder_target = decoder_target.to(device)

    optimizer.zero_grad()
    outputs = model(encoder_input, decoder_input)

    loss= loss_function(outputs.view(-1, tar_vocab_size), decoder_target.view(-1))
    loss.backward()
    optimizer.step()
  train_loss, train_acc = evaluation(model, train_dataloader, loss_function, device)
  val_loss, val_acc = evaluation(model, test_dataloader, loss_function, device)

  print(f"Epoch: {epoch+1}")
  print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(model.state_dict(), 'best_model.pth')

Epoch: 1
Train Loss: 0.3502, Train Acc: 0.4543
Epoch: 2
Train Loss: 0.2457, Train Acc: 0.5756
Epoch: 3
Train Loss: 0.1903, Train Acc: 0.6451
Epoch: 4
Train Loss: 0.1554, Train Acc: 0.6948
Epoch: 5
Train Loss: 0.1325, Train Acc: 0.7293
Epoch: 6
Train Loss: 0.1160, Train Acc: 0.7580
Epoch: 7
Train Loss: 0.1037, Train Acc: 0.7797
Epoch: 8
Train Loss: 0.0934, Train Acc: 0.7980
Epoch: 9
Train Loss: 0.0855, Train Acc: 0.8119
Epoch: 10
Train Loss: 0.0794, Train Acc: 0.8240


In [27]:
print(outputs.shape)  # (batch, seq_len, vocab_size)
print(decoder_target.shape)  # (batch, seq_len)
print(tar_vocab_size)


torch.Size([62, 69, 26065])
torch.Size([62, 69])
26065


In [28]:
model.load_state_dict(torch.load('best_model.pth'))
model.to(device)

val_loss, val_acc = evaluation(model, test_dataloader, loss_function, device)
print(f"Validation Loss: {val_loss:.4f}, Validation Acc: {val_acc:.4f}")

Validation Loss: 0.2041, Validation Acc: 0.6652


In [29]:
index_to_src ={v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def seq_to_src(input_seq):
  sentence=''
  for encoded_word in input_seq:
    if(encoded_word!=0):
      sentence=sentence+index_to_src[encoded_word]+' '
  return sentence

def seq_to_tar(input_seq):
  sentence=''
  for encoded_word in input_seq:
    if (encoded_word!=0 and encoded_word!= tar_vocab['<sos>'] and encoded_word!= tar_vocab['<eos>']):
      sentence=sentence+index_to_tar[encoded_word]+' '
  return sentence

In [30]:
def decode_sequence(input_seq, model, max_output_len, tar_vocab, index_to_tar):
    # 1. 모델을 평가 모드로 설정
    model.eval()

    # 입력 시퀀스를 텐서로 변환하고 디바이스로 이동
    encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

    # 인코더를 통해 초기 hidden, cell state 얻기
    with torch.no_grad(): # 추론 시에는 gradient 계산이 필요 없음
        hidden, cell = model.encoder(encoder_inputs)

    # 2. <sos> 토큰의 실제 인덱스를 tar_vocab에서 가져와 시작점으로 사용
    decoder_input = torch.tensor([tar_vocab['<sos>']], dtype=torch.long).unsqueeze(0).to(device)

    decoded_tokens = []

    # 설정된 최대 길이만큼 반복하여 단어 생성
    for _ in range(max_output_len):
        with torch.no_grad():
            # 디코더로 다음 단어 예측
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)

        # 가장 확률이 높은 단어의 인덱스 추출
        output_token = output.argmax(dim=-1).item()

        # 3. <eos> 토큰의 실제 인덱스를 만나면 번역 종료
        if output_token == tar_vocab['<eos>']:
            break

        decoded_tokens.append(output_token)

        # 현재 예측된 단어를 다음 시점의 입력으로 사용
        decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)

    # 정수 시퀀스를 문자열로 변환하여 반환
    return ' '.join(index_to_tar.get(token, '<unk>') for token in decoded_tokens)

# ===== 함수 호출 부분도 아래와 같이 수정해야 합니다 =====

for seq_index in [3, 50, 100, 300, 1001]:
    input_seq = encoder_input_train[seq_index]

    # 수정한 함수에 맞게 인자 전달
    translated_text = decode_sequence(input_seq, model, 20, tar_vocab, index_to_tar)

    print("입력 문장:", seq_to_src(encoder_input_train[seq_index]))
    print("정답 문장:", seq_to_tar(decoder_input_train[seq_index]))
    print("번역 문장:", translated_text)
    print("="*50)

입력 문장: i know you re not like that 
정답 문장: je sais que vous n etes pas comme ca 
번역 문장: je sais que vous n etes pas comme ca
입력 문장: don t do too much at the same time 
정답 문장: n en faites pas trop en meme temps 
번역 문장: ne fais pas trop de bruit pour toujours
입력 문장: i lost my passport i ll have to get a new one 
정답 문장: j ai perdu mon passeport je devrai en refaire faire un 
번역 문장: j ai perdu mon passeport je devrai en avoir un nouveau
입력 문장: they let me go 
정답 문장: elles m ont laissee partir 
번역 문장: ils m ont laissee partir
입력 문장: somebody has stolen my suitcase 
정답 문장: quelqu un a vole ma valise 
번역 문장: quelqu un a vole ma valise
