## RNN example 06: seq2seq

In [None]:
import random
import torch
import torch.nn as nn
import torch.optim as optim

# Seed PyTorch's random number generator.
torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Sentence pairs to translate, one "<source>\t<target>" string per pair.
# The tab is written as an explicit escape because preprocess_data() splits
# each line on "\t"; without it a line yields a single field and the target
# sentence lookup fails.
raw_data = [
    "I feel hungry.\t나는 배가 고프다.",
    "Pytorch is very easy.\t파이토치는 매우 쉽다.",
    "Pytorch is a framework for deep learning.\t파이토치는 딥러닝을 위한 프레임워크이다.",
    "Pytorch is very clear to use.\t파이토치는 사용하기 매우 직관적이다.",
]

# Indices reserved for the start-of-sequence and end-of-sequence tokens.
SOS_token = 0
EOS_token = 1

class Vocab:
    """Word <-> index vocabulary with per-word occurrence counts.

    Attributes are plain dicts/ints so callers can read them directly:
    vocab2index maps word -> index, index2vocab maps index -> word,
    vocab_count maps word -> number of occurrences seen by add_vocab(),
    and n_vocab is the number of registered entries.
    """

    def __init__(self):
        """Create a vocabulary containing only the <SOS> and <EOS> tokens."""
        self.vocab2index = {"<SOS>": SOS_token, "<EOS>": EOS_token}
        self.index2vocab = {SOS_token: "<SOS>", EOS_token: "<EOS>"}
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)

    def add_vocab(self, sentence):
        """Register every word of `sentence` and update its count.

        The sentence is split on single spaces. A word not yet in the
        vocabulary receives the next free index.
        """
        for word in sentence.split(" "):
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.index2vocab[self.n_vocab] = word
                self.n_vocab += 1
            # The special tokens are pre-registered without a count entry,
            # so read the current count with a default instead of indexing.
            self.vocab_count[word] = self.vocab_count.get(word, 0) + 1

def is_valid_pair(pair, source_max_length, target_max_length):
    """Return True when both sentences of `pair` are under their word limits.

    pair[0] is the source sentence and pair[1] the target sentence; each is
    split on single spaces and must contain strictly fewer words than the
    corresponding maximum.
    """
    source_words = pair[0].split(" ")
    if len(source_words) >= source_max_length:
        return False
    target_words = pair[1].split(" ")
    return len(target_words) < target_max_length

def preprocess_data(corpus, source_max_length, target_max_length):
    """Turn raw corpus lines into filtered sentence pairs and vocabularies.

    Each line is stripped, lower-cased and split on tab characters. Pairs
    rejected by is_valid_pair() are dropped, and the words of the remaining
    pairs are added to a source and a target Vocab.

    Returns:
        (pairs, source_vocab, target_vocab)
    """
    print("Reading corpus...")
    pairs = []
    for line in corpus:
        pairs.append(line.strip().lower().split("\t"))
    print("Read {} sentence pairs".format(len(pairs)))

    # Keep only the pairs that satisfy both maximum-length conditions.
    kept = []
    for candidate in pairs:
        if is_valid_pair(candidate, source_max_length, target_max_length):
            kept.append(candidate)
    pairs = kept
    print("Trimmed to {} sentence pairs".format(len(pairs)))

    # Build one vocabulary per language.
    source_vocab = Vocab()
    target_vocab = Vocab()

    print("Counting words...")
    for candidate in pairs:
        source_sentence = candidate[0]
        target_sentence = candidate[1]
        source_vocab.add_vocab(source_sentence)
        target_vocab.add_vocab(target_sentence)
    print("source vocab size =", source_vocab.n_vocab)
    print("target vocab size =", target_vocab.n_vocab)

    return pairs, source_vocab, target_vocab

# 전처리 함수 호출 예제
# 예: preprocess_data(raw_data, 10, 12)


In [None]:
class Encoder(nn.Module):
    """Embedding layer followed by an LSTM, applied one step per call."""

    def __init__(self, input_size, hidden_size):
        """Build the layers.

        Args:
            input_size: number of rows in the embedding table.
            hidden_size: embedding width, also used as the LSTM input and
                hidden size.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)

    def forward(self, x, hidden):
        """Embed `x`, run it through the LSTM and return (output, state).

        The embedding is reshaped to (1, 1, -1) before the LSTM call;
        `hidden` is passed to the LSTM unchanged as its initial state.
        """
        embedded = self.embedding(x)
        lstm_input = embedded.view(1, 1, -1)
        output, state = self.lstm(lstm_input, hidden)
        return output, state

class Decoder(nn.Module):
    """Embedding + LSTM + linear projection producing log-probabilities."""

    def __init__(self, hidden_size, output_size):
        """Build the layers.

        Args:
            hidden_size: embedding width, also used as the LSTM input and
                hidden size.
            output_size: number of rows in the embedding table and number
                of scores produced by the output projection.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, hidden):
        """Run one decoding step and return (log_probs, state).

        The embedding of `x` is reshaped to (1, 1, -1), passed through the
        LSTM with `hidden` as initial state, and the first element of the
        LSTM output is projected and log-softmaxed along dim 1.
        """
        embedded = self.embedding(x)
        lstm_input = embedded.view(1, 1, -1)
        lstm_out, state = self.lstm(lstm_input, hidden)
        scores = self.out(lstm_out[0])
        log_probs = self.softmax(scores)
        return log_probs, state

def tensorize(vocab, sentence):
    """Convert a sentence to a column tensor of word indices.

    The sentence is split on single spaces, each word is looked up in
    vocab.vocab2index, and the index of "<EOS>" is appended.

    Returns:
        A torch.long tensor of shape (number_of_words + 1, 1) on `device`.

    Raises:
        KeyError: if a word is not present in vocab.vocab2index.
    """
    indexes = [vocab.vocab2index[word] for word in sentence.split(" ")]
    indexes.append(vocab.vocab2index["<EOS>"])
    # Build the integer tensor directly on the target device; the legacy
    # torch.Tensor(...) constructor would go through float32 first.
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

In [None]:
# Training function
def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01):
    """Train the encoder/decoder pair with SGD, one sentence pair per step.

    Args:
        pairs: sentence pairs; element 0 is the source, element 1 the target.
        source_vocab, target_vocab: vocabularies passed to tensorize().
        encoder, decoder: the modules to optimize.
        n_iter: number of training steps; one pair is sampled per step.
        print_every: report the mean per-token loss every this many steps.
        learning_rate: learning rate of both SGD optimizers.
    """
    # One SGD optimizer per network.
    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

    # Sample and tensorize every training example before the loop starts.
    sampled_pairs = [random.choice(pairs) for _ in range(n_iter)]
    source_tensors = [tensorize(source_vocab, sampled[0]) for sampled in sampled_pairs]
    target_tensors = [tensorize(target_vocab, sampled[1]) for sampled in sampled_pairs]

    criterion = nn.NLLLoss()
    loss_total = 0

    for i, (source_tensor, target_tensor) in enumerate(zip(source_tensors, target_tensors), start=1):
        # Fresh zero hidden and cell states for every sentence.
        state_shape = [1, 1, encoder.hidden_size]
        state = (torch.zeros(state_shape).to(device),
                 torch.zeros(state_shape).to(device))

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        # Encode the source one token at a time; only the final state is kept.
        for source_token in source_tensor:
            _, state = encoder(source_token, state)

        # Decode with teacher forcing: after each step the ground-truth
        # target token becomes the next decoder input.
        decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
        loss = 0
        for target_token in target_tensor:
            decoder_output, state = decoder(decoder_input, state)
            loss += criterion(decoder_output, target_token)
            decoder_input = target_token

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        # Accumulate the loss normalized by the target length.
        loss_total += loss.item() / target_tensor.size(0)

        if i % print_every != 0:
            continue
        loss_avg = loss_total / print_every
        loss_total = 0
        print(f"[{i} - {i / n_iter * 100}%] loss = {loss_avg:.4f}")

# Evaluation function
def evaluate(pairs, source_vocab, target_vocab, encoder, decoder, target_max_length):
    """Greedy-decode every source sentence in `pairs` and print the result.

    For each pair the source ("> "), the reference target ("= ") and the
    model prediction ("< ") are printed, followed by an empty line.
    Decoding stops when <EOS> is predicted or after `target_max_length`
    steps, whichever comes first.

    Args:
        pairs: sentence pairs; element 0 is the source, element 1 the target.
        source_vocab: vocabulary used to tensorize the source sentence.
        target_vocab: vocabulary used to map predicted indices to words.
        encoder, decoder: the trained modules.
        target_max_length: maximum number of decoding steps.
    """
    for pair in pairs:
        print(">", pair[0])
        print("=", pair[1])
        source_tensor = tensorize(source_vocab, pair[0])
        source_length = source_tensor.size(0)
        decoded_words = []

        # Inference only: no gradients are needed.
        with torch.no_grad():
            # Initialize LSTM hidden and cell states
            encoder_hidden = (torch.zeros([1, 1, encoder.hidden_size], device=device),
                              torch.zeros([1, 1, encoder.hidden_size], device=device))

            # Pass through the encoder
            for ei in range(source_length):
                _, encoder_hidden = encoder(source_tensor[ei], encoder_hidden)

            # Create the start token directly on `device`; the legacy
            # torch.Tensor(...) constructor does not accept a CUDA device.
            decoder_input = torch.tensor([[SOS_token]], dtype=torch.long, device=device)
            decoder_hidden = encoder_hidden

            for _ in range(target_max_length):
                decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                _, top_index = decoder_output.topk(1)
                predicted = top_index.item()
                if predicted == EOS_token:
                    decoded_words.append("<EOS>")
                    break
                decoded_words.append(target_vocab.index2vocab[predicted])

                # Feed the model's own prediction back in as the next input.
                decoder_input = top_index.squeeze().detach()

        predict_sentence = " ".join(decoded_words)
        print("<", predict_sentence)
        print("")

# Note: Make sure to initialize both the hidden and cell states for the LSTM in both encoder and decoder.


In [None]:
# Maximum lengths for source and target sentences
SOURCE_MAX_LENGTH = 10
TARGET_MAX_LENGTH = 12

# Preprocess the raw data (the function and corpus defined in the first cell
# are named preprocess_data and raw_data).
load_pairs, load_source_vocab, load_target_vocab = preprocess_data(raw_data, SOURCE_MAX_LENGTH, TARGET_MAX_LENGTH)

# Display a random pair for verification
print(random.choice(load_pairs))

# Encoder and Decoder hidden layer sizes.
# The decoder is initialized with the encoder's final LSTM state, so both
# must use the same hidden size.
enc_hidden_size = 16
dec_hidden_size = enc_hidden_size

# Initialize Encoder and Decoder models and move them to the target device
# (nn.Module exposes .to(), not .to_device()).
enc = Encoder(load_source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, load_target_vocab.n_vocab).to(device)

# Train the Seq2Seq model
train(load_pairs, load_source_vocab, load_target_vocab, enc, dec, 5000, print_every=1000)

# Evaluate the trained model
evaluate(load_pairs, load_source_vocab, load_target_vocab, enc, dec, TARGET_MAX_LENGTH)

## RNN example 07: packed sequence

In [None]:
import torch
import numpy as np
from torch.nn.utils.rnn import pad_sequence, pack_sequence, pack_padded_sequence, pad_packed_sequence

# Arbitrary words taken from a random word generator.
data = ['hello world', 'midnight', 'calculation', 'path', 'short circuit']

# Build the character dictionary: every character that occurs in the data
# plus the padding token '<pad>'. '<pad>' comes first so it gets index 0,
# which is the value pad_sequence() fills with by default. The characters
# are sorted so the mapping is identical on every run (plain set iteration
# order for strings changes with hash randomization).
char_set = ['<pad>'] + sorted(set(char for seq in data for char in seq))
char2idx = {char: idx for idx, char in enumerate(char_set)}  # character -> index
print('char_set:', char_set)
print('char_set length:', len(char_set))

# Convert each string to a LongTensor of character indices.
X = [torch.LongTensor([char2idx[char] for char in seq]) for seq in data]

# Show the converted sequences.
for sequence in X:
    print(sequence)

# Sequence lengths as a Python list (used later by pack_padded_sequence).
lengths = [len(seq) for seq in X]
print('lengths:', lengths)

# Pad every sequence to the same length so they can be batched.
padded_sequence = pad_sequence(X, batch_first=True)
print(padded_sequence)
print(padded_sequence.shape)

# Sort the sequences by length in descending order.
sorted_idx = sorted(range(len(lengths)), key=lengths.__getitem__, reverse=True)
sorted_X = [X[idx] for idx in sorted_idx]

# Show the sorted sequences.
for sequence in sorted_X:
    print(sequence)

# Build the packed sequence from the length-sorted tensors.
packed_sequence = pack_sequence(sorted_X)
print(packed_sequence)

In [None]:
# One-hot encode the padded batch by indexing rows of an identity matrix.
eye = torch.eye(len(char_set))
embedded_tensor = eye[padded_sequence]
print(embedded_tensor.shape)  # shape: (Batch_size, max_sequence_length, number_of_input_tokens)

# One-hot encode the length-sorted sequences and pack them.
embedded_packed_seq = pack_sequence([eye[seq] for seq in sorted_X])
print(embedded_packed_seq.data.shape)

# Declare the RNN.
rnn = torch.nn.RNN(len(char_set), 30, batch_first=True)

# Run the RNN on the padded batch.
rnn_output, hidden = rnn(embedded_tensor)
print(rnn_output.shape)  # shape: (batch_size, max_seq_length, hidden_size)
print(hidden.shape)      # shape: (num_layers * num_directions, batch_size, hidden_size)

# Run the RNN on the packed batch; the output is then a packed sequence too.
rnn_output, hidden = rnn(embedded_packed_seq)
print(rnn_output.data.shape)
print(hidden.shape)

# Unpack the packed one-hot input back into a padded tensor plus lengths.
unpacked_sequence, seq_lengths = pad_packed_sequence(embedded_packed_seq, batch_first=True)
print(unpacked_sequence.shape)
print(seq_lengths)

# Pad the length-sorted sequences, then one-hot encode the padded batch.
sorted_padded = pad_sequence(sorted_X, batch_first=True)
embedded_padded_sequence = eye[sorted_padded]
print(embedded_padded_sequence.shape)

# Pack the padded batch using the lengths in descending order.
sorted_lengths = sorted(lengths, reverse=True)
new_packed_sequence = pack_padded_sequence(
    embedded_padded_sequence,
    sorted_lengths,
    batch_first=True,
)
print(new_packed_sequence.data.shape)
print(new_packed_sequence.batch_sizes)
