In [None]:
import random
from collections import Counter
from typing import Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split

# 데이터 처리 모듈

In [None]:
# data_handler.py
# Vocabulary class to handle token-index mapping
class Vocabulary:
    """
    Two-way lookup table between tokens and integer ids.

    Four special tokens (PAD, SOS, EOS, OOV) are registered first, in that order, so they
    always occupy ids 0-3. Because they are registered through ``add_word``, each of them
    starts with a count of 1 in ``word_count``.

    Attributes:
        PAD (str): Padding token.
        SOS (str): Start-of-sequence token.
        EOS (str): End-of-sequence token.
        OOV (str): Out-of-vocabulary token.
        pad_idx (int): Index of the padding token.
        sos_idx (int): Index of the start-of-sequence token.
        eos_idx (int): Index of the end-of-sequence token.
        oov_idx (int): Index of the out-of-vocabulary token.
        SPECIAL_TOKENS (list of str): The four special tokens, ordered by index.
        word2index (dict): Mapping of words to indices.
        index2word (dict): Mapping of indices to words.
        word_count (dict): Number of times each word has been added.
        n_words (int): Number of distinct entries, special tokens included.
        threshold: Value passed as ``word_count_threshold``; stored only, no method of
            this class reads it.
    """
    PAD = '[PAD]'
    SOS = '[SOS]'
    EOS = '[EOS]'
    OOV = '[OOV]'
    pad_idx = 0
    sos_idx = 1
    eos_idx = 2
    oov_idx = 3
    SPECIAL_TOKENS = [PAD, SOS, EOS, OOV]

    def __init__(self, word_count_threshold = 0):
        """
        Creates an empty vocabulary and registers the special tokens.

        Args:
            word_count_threshold (float): Intended threshold for filtering rare words;
                kept on the instance as ``threshold``.
        """
        self.threshold = word_count_threshold
        self.n_words = 0
        self.word2index = {}
        self.index2word = {}
        self.word_count = {}

        # Mirror the class-level indices on the instance.
        self.pad_idx, self.sos_idx = Vocabulary.pad_idx, Vocabulary.sos_idx
        self.eos_idx, self.oov_idx = Vocabulary.eos_idx, Vocabulary.oov_idx

        # Registration order fixes the ids: PAD=0, SOS=1, EOS=2, OOV=3.
        for token in (Vocabulary.PAD, Vocabulary.SOS, Vocabulary.EOS, Vocabulary.OOV):
            self.add_word(token)

    def add_word(self, word):
        """
        Registers a new word under the next free index, or bumps the count of a known one.

        Args:
            word (str): The word to add.
        """
        if word in self.word2index:
            self.word_count[word] += 1
            return

        new_index = self.n_words
        self.word2index[word] = new_index
        self.index2word[new_index] = word
        self.word_count[word] = 1
        self.n_words = new_index + 1

    def add_sentence(self, sentence):
        """
        Feeds every element of ``sentence`` to ``add_word``, in order.

        Args:
            sentence (list of str): List of words to add.
        """
        for token in sentence:
            self.add_word(token)

    def word_to_index(self, word):
        """
        Looks up the index of a word, falling back to the OOV index for unknown words.

        Args:
            word (str): The word to convert.

        Returns:
            int: Index of the word, or ``oov_idx`` if the word is not in the vocabulary.
        """
        try:
            return self.word2index[word]
        except KeyError:
            return self.oov_idx

def indices_to_one_hot(batch, vocab_size):
    """
    Expands a 2-D tensor of token indices into float one-hot vectors.

    Args:
        batch (torch.Tensor): Index tensor with shape [batch_size, seq_length].
        vocab_size (int): Length of each one-hot vector.

    Returns:
        torch.Tensor: Tensor of shape [batch_size, seq_length, vocab_size] on the same
        device as ``batch``, with a 1 at each given index and 0 elsewhere.
    """
    assert batch.dim() == 2, f"Input batch should have 2 dimensions, but got {batch.dim()}"
    batch_size, seq_length = batch.shape
    expected_shape = (batch_size, seq_length, vocab_size)

    # Start from an all-zero canvas on the input's device, then write a 1 along the
    # last axis at the position named by each index.
    canvas = torch.zeros(*expected_shape, device = batch.device)
    one_hot = canvas.scatter(2, batch.unsqueeze(-1), 1)  # [batch_size, seq_length, vocab_size]

    assert one_hot.size() == expected_shape, f"Output should have shape {(batch_size, seq_length, vocab_size)}, but got {one_hot.size()}"

    return one_hot

def generate_target_sequence(input_seq, length, lookback = 3, mod = 10):
    """
    Builds a sequence in which every element is the sum, modulo ``mod``, of a sliding window.

    For the first ``lookback`` positions the window is the tail of ``input_seq`` followed
    by everything generated so far; from then on it is the last ``lookback`` generated
    values only.

    Args:
        input_seq (list of int): Seed values; must be a list because it is concatenated
            with the generated list.
        length (int): Number of elements to generate.
        lookback (int): Window size.
        mod (int): Modulus applied to every window sum.

    Returns:
        list of int: The generated sequence, of length ``length``.
    """
    generated = []

    for step in range(length):
        if step >= lookback:
            window = generated[-lookback:]
        else:
            # Not enough generated values yet: borrow the missing ones from the seed's tail.
            window = input_seq[step - lookback:] + generated[:step]
        generated.append(sum(window) % mod)

    return generated

def create_integer_sequence_dataset(num_samples = 1000, max_input_len = 10, max_target_len = 15, vocab_size = 20, batch_size = 32, train_valid_test = (0.8, 0.1, 0.1)):
    """
    Creates a synthetic copy-task dataset: each input is a random integer sequence and its
    target is the same sequence.

    Token ids are drawn from ``[len(Vocabulary.SPECIAL_TOKENS), vocab_size - 1]`` so they
    never collide with the special-token ids. EOS/SOS markers and padding are added later,
    per batch, by ``collate_fn``.

    Args:
        num_samples (int): The number of samples to generate in the dataset.
        max_input_len (int): The maximum length of input sequences (the minimum is 3).
        max_target_len (int): Currently unused; kept for interface compatibility (it is
            only relevant to the alternative ``generate_target_sequence`` task).
        vocab_size (int): The size of the vocabulary, special tokens included.
        batch_size (int): The batch size for the DataLoader.
        train_valid_test (tuple): Proportions for splitting the dataset into training,
            validation, and testing sets.

    Returns:
        tuple: ``(dataloaders, vocab)`` where ``dataloaders`` is the list of DataLoader
        objects for the training, validation, and testing sets, and ``vocab`` is the
        associated Vocabulary object.

    Raises:
        ValueError: From ``random.randint`` when ``max_input_len < 3`` or when
            ``vocab_size`` leaves no id above the special tokens (and ``num_samples > 0``).
    """
    vocab = Vocabulary()
    no_special_token = len(Vocabulary.SPECIAL_TOKENS)
    # Register the integer tokens as strings so that word index == integer value.
    for i in range(no_special_token, vocab_size):
        vocab.add_word(str(i))

    data = []
    for _ in range(num_samples):
        input_len = random.randint(3, max_input_len)
        input_seq = [random.randint(no_special_token, vocab_size - 1) for _ in range(input_len)]

        # Copy task. The target is an independent list so that mutating one side of a
        # sample can never silently change the other.
        # Alternative task: target_seq = generate_target_sequence(input_seq, max_target_len)
        target_seq = list(input_seq)

        data.append((input_seq, target_seq))

    # Split dataset
    lengths = [int(num_samples * ratio) for ratio in train_valid_test]
    lengths[-1] = num_samples - sum(lengths[:-1])  # Adjust the last set size to account for rounding

    datasets = random_split(data, lengths)
    dataloaders = [DataLoader(dataset, batch_size = batch_size, shuffle = True, collate_fn = collate_fn) for dataset in datasets]

    return dataloaders, vocab


def create_lang_pair(data_file, batch_size = 32, train_valid_test = (0.8, 0.1, 0.1)):
    """
    Loads a tab-separated parallel corpus and builds train/valid/test DataLoaders.

    Each usable line has either two fields (``source<TAB>target``) or three fields
    (``source<TAB>target<TAB>extra``, the third being ignored). Lines with any other
    number of fields, including blank lines, are skipped. Sentences are tokenized by
    whitespace and every token is added to the corresponding vocabulary.

    Args:
        data_file (str): Path to the UTF-8 encoded corpus file.
        batch_size (int): The batch size for the DataLoaders.
        train_valid_test (tuple): Proportions for splitting the data into training,
            validation, and testing sets.

    Returns:
        tuple: ``(dataloaders, source_vocab, target_vocab)`` where ``dataloaders`` is the
        list of DataLoader objects in train/valid/test order.
    """
    source_vocab = Vocabulary()
    target_vocab = Vocabulary()

    data = []
    # The context manager guarantees the file handle is closed even if parsing fails.
    with open(data_file, 'r', encoding = 'utf-8') as corpus:
        for line in corpus:
            fields = line.strip().split('\t')
            if len(fields) not in (2, 3):
                continue

            source = fields[0].strip().split()
            target = fields[1].strip().split()
            source_vocab.add_sentence(source)
            target_vocab.add_sentence(target)

            data.append((source, target))

    num_samples = len(data)
    lengths = [int(num_samples * ratio) for ratio in train_valid_test]
    lengths[-1] = num_samples - sum(lengths[:-1])  # Adjust the last set size to account for rounding

    datasets = random_split(data, lengths)
    dataloaders = [DataLoader(dataset, batch_size = batch_size, shuffle = True, collate_fn = lambda x: collate_fn_language(x, source_vocab, target_vocab)) for dataset in datasets]

    return dataloaders, source_vocab, target_vocab

def collate_fn(batch):
    """
    Collate function for integer-sequence pairs that are already expressed as token ids.

    Every input gets a trailing EOS; every target gets a leading SOS and a trailing EOS.
    Both sides are then right-padded with PAD to the longest sequence on that side.

    Args:
        batch (list of tuple): ``(input_seq, target_seq)`` pairs, each a list of int ids.

    Returns:
        tuple: ``(input_padded, target_padded)`` long tensors of shape
        [batch_size, max_input_len] and [batch_size, max_target_len] respectively.
    """
    pad, sos, eos = Vocabulary.pad_idx, Vocabulary.sos_idx, Vocabulary.eos_idx

    # Attach the boundary markers.
    sources = [pair[0] + [eos] for pair in batch]
    targets = [[sos] + pair[1] + [eos] for pair in batch]

    # Widest row on each side determines that side's padded width.
    source_width = max(map(len, sources))
    target_width = max(map(len, targets))

    source_rows = [row + [pad] * (source_width - len(row)) for row in sources]
    target_rows = [row + [pad] * (target_width - len(row)) for row in targets]

    input_padded = torch.tensor(source_rows, dtype = torch.long)  # [batch_size, max_seq_len]
    target_padded = torch.tensor(target_rows, dtype = torch.long)  # [batch_size, max_seq_len]

    return input_padded, target_padded

def collate_fn_language(batch, source_vocab, target_vocab):
    """
    Collate function for tokenized sentence pairs.

    Source words are mapped to ids with ``source_vocab`` and followed by EOS; target words
    are mapped with ``target_vocab`` and wrapped in SOS ... EOS. Each side is right-padded
    with its vocabulary's PAD id to the longest sequence on that side.

    Args:
        batch (list of tuple): ``(source_words, target_words)`` pairs, each a list of str.
        source_vocab (Vocabulary): Vocabulary used for the source side.
        target_vocab (Vocabulary): Vocabulary used for the target side.

    Returns:
        tuple: ``(input_padded, target_padded)`` long tensors of shape
        [batch_size, max_input_len] and [batch_size, max_target_len] respectively.
    """
    # Words -> ids, with boundary markers attached.
    encoded_inputs = [
        [source_vocab.word_to_index(token) for token in pair[0]] + [source_vocab.eos_idx]
        for pair in batch
    ]
    encoded_targets = [
        [target_vocab.sos_idx] + [target_vocab.word_to_index(token) for token in pair[1]] + [target_vocab.eos_idx]
        for pair in batch
    ]

    # Widest row on each side determines that side's padded width.
    input_width = max(len(row) for row in encoded_inputs)
    target_width = max(len(row) for row in encoded_targets)

    input_rows = [row + [source_vocab.pad_idx] * (input_width - len(row)) for row in encoded_inputs]
    target_rows = [row + [target_vocab.pad_idx] * (target_width - len(row)) for row in encoded_targets]

    input_padded = torch.tensor(input_rows, dtype = torch.long)  # [batch_size, max_seq_len]
    target_padded = torch.tensor(target_rows, dtype = torch.long)  # [batch_size, max_seq_len]

    return input_padded, target_padded

if __name__ == '__main__':
    # Read 'kor.txt' and build the (train, valid, test) loaders together with the
    # source and target vocabularies.
    (train_data, valid_data, test_data), source_vocab, target_vocab = create_lang_pair('kor.txt')

    # Fetch a single training batch; nothing is printed when the loader is empty.
    first_batch = next(iter(train_data), None)
    if first_batch is not None:
        input_sequence, target_sequence = first_batch

        # Show the first sample of the batch as plain Python lists.
        print("첫 번째 입력 시퀀스 (Input Sequence):", input_sequence[0].tolist())
        print("첫 번째 타겟 시퀀스 (Target Sequence):", target_sequence[0].tolist())

        # Explain what each sequence means.
        print("\n# 각 시퀀스의 의미")
        print("- 입력 시퀀스는 모델에 주어진 단어의 인덱스를 나타냅니다.")
        print("- 타겟 시퀀스는 모델이 예측해야 하는 정답 단어의 인덱스를 나타냅니다.")

        # Also report the (padded) length of each sequence.
        print(f"\n입력 시퀀스의 길이: {len(input_sequence[0])} (단어 수)")
        print(f"타겟 시퀀스의 길이: {len(target_sequence[0])} (단어 수)")


---
## 이하
---

- seq2seq
- 인코더
  - 입력문장을 읽고 인코딩
  - RNN 구조로 입력 문장의 단어를 순차적으로 처리
  - 처리 후 최종 hidden state를 문장 임베딩으로 사용
- 디코더
  - 인코더의 문장 임베딩을 입력으로 받아 디코딩하며 한 단어씩 출력하여 번역된 문장을 생성
- 번역과정
  1. 인코더가 입력문장을 단어 단위로 읽어 문장 임베딩을 생성
  2. 디코더는 문장 임베딩과 이전에 생성된 단어를 입력으로 받아 다음 단어의 확률을 계산
  3. 가장 높은 확률의 단어를 선택하여 출력
  4. 2-3 과정을 반복하여 전체 번역문을 생성
  5. 특수 토큰(ex:<EOS>)이 생성되면 번역 종료
- Beam Search: 첫 단어 선택의 오류를 방지하기 위해 Top-K개의 가능성 있는 번역 후보를 유지하며 번역을 진행
- Teacher Forcing: 학습 시 이전 시점의 실제 정답 단어를 다음 입력으로 사용하여 학습 속도와 성능을 향상
- Attention 메커니즘: 긴 문장 번역 시 성능 향상을 위해 입력 문장의 관련 부분에 집중하는 기법을 사용

In [None]:
def generate_dataset(seq_len, n_samples, vocab_size):
    """
    Builds a random copy-task dataset whose targets equal its inputs.

    Args:
        seq_len (int): Length of every sequence.
        n_samples (int): Number of sequences.
        vocab_size (int): Exclusive upper bound for token ids; ids are drawn from
            1 to vocab_size - 1.

    Returns:
        TensorDataset: Pairs ``(inputs, outputs)``, both of shape [n_samples, seq_len],
        where ``outputs`` is an independent copy of ``inputs``.
    """
    shape = (n_samples, seq_len)
    source_tokens = torch.randint(low = 1, high = vocab_size, size = shape)
    # Clone so the target tensor does not share storage with the input tensor.
    return TensorDataset(source_tokens, source_tokens.clone())

class Encoder(nn.Module):
    """
    Minimal recurrent encoder: folds a sequence of token ids into one hidden vector.

    At every time step the token is one-hot encoded, projected by a single linear layer,
    added to the previous hidden state, and passed through tanh.

    Args:
        input_size (int): Vocabulary size, i.e. the width of the one-hot vectors.
        hidden_size (int): Width of the hidden state.
    """
    def __init__(
        self,
        input_size: int,
        hidden_size: int
        ):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, input_seq: torch.Tensor) -> torch.Tensor:
        """
        Encodes a batch of sequences.

        Args:
            input_seq (torch.Tensor): Long tensor of token ids, shape [batch_size, seq_length].

        Returns:
            torch.Tensor: Final hidden state, shape [batch_size, hidden_size].
        """
        batch_size, seq_length = input_seq.size()
        # Allocate the initial state on the input's device so the module also works when
        # the batch lives on an accelerator instead of the CPU.
        hidden = torch.zeros(batch_size, self.hidden_size, device = input_seq.device)

        for char_idx in range(seq_length):
            # One-hot encode the tokens at this time step: [batch_size, input_size].
            x_t = nn.functional.one_hot(input_seq[:, char_idx],
                                        num_classes = self.linear.in_features).float()
            hidden = self.activation(self.linear(x_t) + hidden)
        return hidden

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

def generate_dataset(seq_length, num_sample, vocab_size):
    """
    Creates a random copy-task dataset in which each target is identical to its input.

    Args:
        seq_length (int): Length of every sequence.
        num_sample (int): Number of sequences.
        vocab_size (int): Exclusive upper bound for token ids; ids start at 1.

    Returns:
        TensorDataset: Pairs ``(inputs, outputs)``, both of shape [num_sample, seq_length].
    """
    tokens = torch.randint(low = 1, high = vocab_size, size = (num_sample, seq_length))
    # The target is a separate copy, not a second reference to the same tensor.
    return TensorDataset(tokens, tokens.clone())

class Encoder(nn.Module):
    """
    Minimal recurrent encoder that compresses a token-id sequence into one hidden vector.

    Each step one-hot encodes the current token, projects it with a linear layer, adds the
    previous hidden state, and applies tanh.

    Args:
        input_size (int): Vocabulary size (width of the one-hot vectors).
        hidden_size (int): Width of the hidden state.
    """
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, input_seq):
        """
        Encodes a batch of sequences.

        Args:
            input_seq (torch.Tensor): Long tensor of token ids, shape [batch_size, seq_length].

        Returns:
            torch.Tensor: Final hidden state, shape [batch_size, hidden_size].
        """
        batch_size, seq_length = input_seq.size()
        # Take the device from the input instead of a module-level `device` variable, so
        # the class works wherever it is imported and follows the batch's device.
        hidden = torch.zeros(batch_size, self.hidden_size, device = input_seq.device)

        for char_idx in range(seq_length):
            x_t = nn.functional.one_hot(input_seq[:, char_idx], num_classes = self.linear.in_features).float()
            hidden = self.activation(self.linear(x_t) + hidden)

        return hidden

class Decoder(nn.Module):
    """
    Minimal recurrent decoder trained with teacher forcing.

    At step ``t`` the decoder receives the one-hot encoding of the ground-truth token at
    ``t - 1`` (an all-zero vector at ``t = 0``), updates its hidden state with
    ``tanh(linear1(prev) + hidden)``, and emits ``linear2(hidden)`` as the logits.

    Args:
        input_size (int): Width of the one-hot vectors fed back into the decoder.
        hidden_size (int): Width of the hidden state.
        output_size (int): Number of output classes (width of the logits).
    """
    def __init__(self, input_size, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.output_size = output_size

        self.linear1 = nn.Linear(input_size, hidden_size)  # previous token -> hidden
        self.activation = nn.Tanh()
        self.linear2 = nn.Linear(hidden_size, output_size)  # hidden -> logits

    def forward(self, target_seq, hidden):
        """
        Produces logits for every target position.

        Args:
            target_seq (torch.Tensor): Long tensor of ground-truth ids, shape [batch_size, seq_len].
            hidden (torch.Tensor): Initial hidden state, shape [batch_size, hidden_size].

        Returns:
            torch.Tensor: Logits of shape [batch_size, seq_len, output_size].
        """
        batch_size, seq_len = target_seq.size()
        # Follow the device of the incoming hidden state rather than a module-level
        # `device` variable, which only exists when the file is run as a script.
        run_device = hidden.device
        outputs = torch.zeros(batch_size, seq_len, self.output_size, device = run_device)

        for char_idx in range(seq_len):
            if char_idx == 0:
                # No previous token exists for the first step.
                previous_y = torch.zeros(batch_size, self.input_size, device = run_device)
            else:
                # Teacher forcing: feed the ground-truth token of the previous step.
                y_prev = target_seq[:, char_idx - 1]
                previous_y = nn.functional.one_hot(y_prev, self.input_size).to(run_device).float()
            hidden = self.activation(self.linear1(previous_y) + hidden)
            output = self.linear2(hidden)

            outputs[:, char_idx, :] = output

        return outputs

class Seq2Seq(nn.Module):
    """
    Glues an encoder and a decoder into a single module.

    Args:
        encoder: Callable module mapping ``input_seq`` to a hidden state.
        decoder: Callable module mapping ``(target_seq, hidden)`` to the output.
    """
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """
        Encodes ``input_seq`` and hands the result, together with ``target_seq``, to the decoder.

        Returns:
            Whatever the decoder returns.
        """
        return self.decoder(target_seq, self.encoder(input_seq))

def train_model(model, dataloader, criterion, optimizer, num_epochs, device):
    """
    Trains ``model`` for ``num_epochs`` epochs and prints the mean batch loss per epoch.

    The model is called as ``model(inputs, targets)``; its output is flattened to
    [batch_size * seq_length, output_size] and compared with the flattened targets.

    Args:
        model (nn.Module): Model to train; moved to ``device`` in place.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        criterion: Loss function taking ``(logits, targets)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs (int): Number of passes over ``dataloader``.
        device (torch.device): Device on which training runs.

    Returns:
        None. The per-epoch average loss is printed; it is ``nan`` for an epoch in which
        the dataloader yielded no batches.
    """
    model.to(device)

    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0.0
        # Count batches while iterating so the average does not depend on len(dataloader),
        # which is undefined for some iterables and zero for an empty loader.
        num_batches = 0

        for inputs, targets in dataloader:
            # inputs.shape - batch_size, sequence_length
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()

            outputs = model(inputs, targets)
            # reshape (unlike view) also accepts non-contiguous tensors.
            logits = outputs.reshape(-1, outputs.size(-1))  # batch_size * seq_length, output_size
            flat_targets = targets.reshape(-1)  # batch_size * seq_length

            loss = criterion(logits, flat_targets)

            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        avg_loss = epoch_loss / num_batches if num_batches else float('nan')
        print(f'Epoch {epoch}, loss: {avg_loss}')

def evaluate_model(model, dataloader, device):
    """
    Computes token-level accuracy of ``model`` over ``dataloader``.

    The model is called as ``model(inputs, targets)`` (i.e. the decoder still sees the
    ground-truth targets) and the arg-max over the last axis is compared with the targets.
    Every position counts, so any padding positions are included in the score.

    Args:
        model (nn.Module): Model to evaluate; switched to eval mode.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        device (torch.device): Device the batches are moved to.

    Returns:
        float: Fraction of positions predicted correctly, or 0.0 when the dataloader
        yields no target positions.
    """
    model.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs, targets)  # batch_size, seq_length, vocab_size

            predicted = torch.argmax(outputs, dim = 2)
            correct += (predicted == targets).sum().item()
            total += targets.numel()

    # Guard against an empty dataloader instead of dividing by zero.
    if total == 0:
        return 0.0
    return correct / total

if __name__ == '__main__':
    # End-to-end demo: train the Seq2Seq model on a random copy task, report accuracy on
    # the training data, and print one sample prediction.

    # Hyperparameters
    seq_length = 10
    num_samples = 1000
    vocab_size = 5  # Including a padding index if needed
    hidden_size = 64
    batch_size = 32
    num_epochs = 20
    learning_rate = 0.001

    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # print(f"Using device: {device}")

    # Synthetic data: targets are copies of the inputs.
    dataset = generate_dataset(seq_length, num_samples, vocab_size)
    dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = True)

    # Encoder and decoder share the same vocabulary size on both sides.
    encoder = Encoder(input_size = vocab_size, hidden_size = hidden_size)
    decoder = Decoder(input_size = vocab_size, hidden_size = hidden_size, output_size = vocab_size)

    model = Seq2Seq(encoder, decoder).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr = learning_rate)

    train_model(model, dataloader, criterion, optimizer, num_epochs, device)

    # Accuracy is measured on the same dataloader that was used for training.
    acc = evaluate_model(model, dataloader, device)
    print(f"Training Accuracy: {acc * 100:.2f}%\n")

    # Inspect a single sample; gradients are not needed for inference.
    with torch.no_grad():
        test_input, test_target = dataset[0]
        # Add a leading batch dimension of size 1.
        test_input = test_input.unsqueeze(0).to(device)
        test_target = test_target.unsqueeze(0).to(device)

        # The target sequence is passed to the model here as well.
        output = model(test_input, test_target)

        # Most likely class at every position.
        predicted = torch.argmax(output, dim = 2)
        print("Sample Input Sequence:   ", test_input.squeeze().tolist())
        print("Sample Target Sequence:  ", test_target.squeeze().tolist())
        print("Predicted Sequence       :  ", predicted.squeeze().tolist())


