In [43]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

def generate_dataset(seq_length, num_samples, vocab_size): # vocab_size:
    inputs = torch.randint(1, vocab_size, (num_samples, seq_length))
    outputs = inputs.clone()
    return TensorDataset(inputs, outputs)

class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()
    
    def forward(self, input_seq):
        batch_size, seq_length = input_seq.size() # batch_size, seq_length
        hidden = torch.zeros(batch_size, self.hidden_size)
        
        for char_idx in range(seq_length):
            x_t = nn.functional.one_hot(input_seq[:, char_idx], num_classes = self.linear.in_features).float()
            hidden = self.activation(self.linear(x_t) + hidden)
        return hidden

class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.output_size = output_size
        
        # self.i2h = nn.Linear(input_size, hidden_size) # input -> hidden
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()
        self.linear2 = nn.Linear(hidden_size, output_size)
        
    def forward(self, target_seq, hidden):
        batch_size, seq_len = target_seq.size()
        outputs = torch.zeros(batch_size, seq_len, self.output_size)
        
        for char_idx in range(seq_len):
            if char_idx == 0:
                previous_y = torch.zeros(batch_size, self.input_size)
            else:
                y_prev = target_seq[:, char_idx -1]
                previous_y = nn.functional.one_hot(y_prev, self.input_size).float()
            hidden = self.activation(self.linear1(previous_y) + hidden)
            output = self.linear2(hidden)
            outputs[:, char_idx, :] = output
        return outputs

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        encoder_hidden = self.encoder(input_seq)
        decoder_output = self.decoder(target_seq, encoder_hidden)
        return decoder_output

def train_model(model, dataloader, criterion, optimizer, num_epochs, device):
    model.to(device)
    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0
        for inputs, targets in dataloader:
            # inputs.shape - batch_size, sequence_length
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            
            outputs = model(inputs, targets)
            outputs = outputs.view(-1, outputs.size(-1)) # batch_size * seq_size, output_size
            targets = targets.view(-1) # batch_size * seq_len
            loss = criterion(outputs, targets)
            
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_loss = epoch_loss / len(dataloader)
        print(f'Epoch [{epoch}/{num_epochs}], loss: {avg_loss}')

def evaluate_model(model, dataloader, device):
    model.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs, targets) # batch_size, seq_length, vocab_size

            predicted = torch.argmax(outputs, dim = 2)
            correct += (predicted == targets).sum().item()
            total += targets.size(0) * targets.size(1)
    acc = correct / total
    return acc

if __name__ == '__main__':
    seq_length = 10
    num_samples = 1000
    vocab_size = 5  # Including a padding index if needed
    hidden_size = 64
    batch_size = 32
    num_epochs = 20
    learning_rate = 0.001

    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # print(f"Using device: {device}")

    dataset = generate_dataset(seq_length, num_samples, vocab_size)
    dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = True)

    encoder = Encoder(input_size = vocab_size, hidden_size = hidden_size)
    decoder = Decoder(input_size = vocab_size, hidden_size = hidden_size, output_size = vocab_size)

    model = Seq2Seq(encoder, decoder).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr = learning_rate)

    train_model(model, dataloader, criterion, optimizer, num_epochs, device)

    acc = evaluate_model(model, dataloader, device)
    print(f"Training Accuracy: {acc * 100:.2f}%\n")
    
    with torch.no_grad():
        test_input, test_target = dataset[0]
        test_input = test_input.unsqueeze(0).to(device)
        test_target = test_target.unsqueeze(0).to(device)

        output = model(test_input, test_target)

        predicted = torch.argmax(output, dim = 2)
        print("Sample Input Sequence:   ", test_input.squeeze().tolist())
        print("Sample Target Sequence:  ", test_target.squeeze().tolist())
        print("Predicted Sequence       :  ", predicted.squeeze().tolist())
    
    for x, y in dataset:
        print(x, y)
        break

Epoch [1/20], loss: 1.5230573676526546
Epoch [2/20], loss: 1.411104254424572
Epoch [3/20], loss: 1.3869245052337646
Epoch [4/20], loss: 1.3727898225188255
Epoch [5/20], loss: 1.3576303720474243
Epoch [6/20], loss: 1.344936266541481
Epoch [7/20], loss: 1.3282239697873592
Epoch [8/20], loss: 1.311855487525463
Epoch [9/20], loss: 1.2975837476551533
Epoch [10/20], loss: 1.282768189907074
Epoch [11/20], loss: 1.2677641957998276
Epoch [12/20], loss: 1.2532121017575264
Epoch [13/20], loss: 1.2403131611645222
Epoch [14/20], loss: 1.229046680033207
Epoch [15/20], loss: 1.2158268615603447
Epoch [16/20], loss: 1.2040522210299969
Epoch [17/20], loss: 1.1925436593592167
Epoch [18/20], loss: 1.1816879659891129
Epoch [19/20], loss: 1.171809360384941
Epoch [20/20], loss: 1.1587859317660332
Training Accuracy: 53.62%

Sample Input Sequence:    [2, 2, 2, 3, 2, 4, 4, 1, 3, 4]
Sample Target Sequence:   [2, 2, 2, 3, 2, 4, 4, 1, 3, 4]
Predicted Sequence       :   [2, 2, 4, 4, 4, 4, 4, 4, 4, 4]
tensor([2, 2, 

RNN의 파라미터 수를 계산하는 방법에 대해 설명해 드리겠습니다. 입력층과 은닉층 사이의 파라미터를 중심으로 설명하겠습니다.
RNN 파라미터 구조
RNN의 기본 구조에서 입력층과 은닉층 사이의 파라미터는 다음과 같습니다:
입력 가중치 (Wx)
은닉 상태 가중치 (Wh)
편향 (b)
파라미터 수 계산
입력 가중치 (Wx)
크기: $D_h \times d$
$D_h$: 은닉 상태의 크기
$d$: 입력 벡터의 차원
파라미터 수: $D_h \times d$
은닉 상태 가중치 (Wh)
크기: $D_h \times D_h$
파라미터 수: $D_h \times D_h$
편향 (b)
크기: $D_h \times 1$
파라미터 수: $D_h$
총 파라미터 수
총 파라미터 수는 위의 세 가지 파라미터를 합한 것입니다:
(
D
h
×
d
)
+
(
D
h
×
D
h
)
+
D
h
(D 
h
​
 ×d)+(D 
h
​
 ×D 
h
​
 )+D 
h
​
 
예시 계산
입력 벡터의 차원($d$)이 4이고, 은닉 상태의 크기($D_h$)가 5인 경우:
Wx: $5 \times 4 = 20$
Wh: $5 \times 5 = 25$
b: $5$
총 파라미터 수: $20 + 25 + 5 = 50$
이렇게 RNN의 입력층과 은닉층 사이의 파라미터 수를 계산할 수 있습니다. 이 구조는 시간에 따라 같은 가중치를 공유하므로, 시간 단계가 늘어나도 파라미터 수는 변하지 않습니다

In [24]:
input_sequence = [torch.randint(1, 4, (5,)) for _ in range(4)]
input_seq = torch.stack(input_sequence)

print(input_seq)
print(input_seq.shape)
print(input_seq[:, 3])
one_hot = nn.functional.one_hot(input_seq[:, 3], 3)
print(one_hot.shape)
print(one_hot)
# 1 2 3
# [1, 0, 0]
# [0, 1, 0]
# [0, 0, 1]
# X_t = nn.functional.one_hot(input_seq[:, char_idx],
                            # num_calsses = self.linear.in_features).float()

tensor([[2, 3, 1, 1, 1],
        [3, 3, 1, 2, 3],
        [1, 1, 2, 3, 2],
        [1, 2, 3, 3, 2]])
torch.Size([4, 5])
tensor([1, 2, 3, 3])


RuntimeError: Class values must be smaller than num_classes.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

def generate_dataset(seq_length, num_samples, vocab_size):
    """
    Generates a synthetic dataset where each input sequence is identical to the output sequence.

    Args:
        seq_length (int): The length of each sequence.
        num_samples (int): The number of samples in the dataset.
        vocab_size (int): The size of the vocabulary (number of unique tokens).

    Returns:
        TensorDataset: A dataset containing input and output sequences.
    """
    # Generate random integers between 1 and vocab_size-1 for input sequences
    inputs = torch.randint(1, vocab_size, (num_samples, seq_length))
    outputs = inputs.clone()  # Output is the same as input
    return TensorDataset(inputs, outputs)

class Encoder(nn.Module):
    """
    RNN Encoder implemented with nn.Linear layers.

    Architecture:
        Input -> Linear -> Tanh -> Hidden State
        Repeats for each time step.

        x_t ----> [Linear] ----> [Tanh] ----> h_t

    Model Formula:
        h_t = tanh(W_x * x_t + W_h * h_{t-1} + b)

    Args:
        input_size (int): Size of input features (vocab size).
        hidden_size (int): Size of hidden state.
    """
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, input_seq):
        """
        Forward pass for the encoder.

        Args:
            input_seq (Tensor): Input sequence tensor of shape (batch, seq_len).

        Returns:
            Tensor: Final hidden state tensor of shape (batch, hidden_size).
        """
        batch_size, seq_len = input_seq.size()
        # Initialize hidden state to zeros
        hidden = torch.zeros(batch_size, self.hidden_size)
        for t in range(seq_len):
            # One-hot encode input tokens
            x_t = nn.functional.one_hot(input_seq[:, t], num_classes=self.linear.in_features).float()
            hidden = self.activation(self.linear(x_t) + hidden)
        return hidden

class Decoder(nn.Module):
    """
    RNN Decoder implemented with nn.Linear layers.

    Architecture:
        Input -> Linear -> Tanh -> Hidden State -> Linear -> Output
        Repeats for each time step.

        y_{t-1} ----> [Linear] ----> [Tanh] ----> h_t ----> [Linear] ----> y_t

    Model Formula:
        h_t = tanh(W_x * y_{t-1} + W_h * h_{t-1} + b)
        y_t = W_o * h_t + b_o

    Args:
        input_size (int): Size of input features (vocab size).
        hidden_size (int): Size of hidden state.
        output_size (int): Size of output features (vocab size).
    """
    def __init__(self, input_size, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.activation = nn.Tanh()
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, target_seq, hidden):
        """
        Forward pass for the decoder.

        Args:
            target_seq (Tensor): Target sequence tensor of shape (batch, seq_len).
            hidden (Tensor): Hidden state tensor of shape (batch, hidden_size).

        Returns:
            Tensor: Output logits of shape (batch, seq_len, output_size).
        """
        batch_size, seq_len = target_seq.size()
        outputs = torch.zeros(batch_size, seq_len, self.linear2.out_features)
        for t in range(seq_len):
            # During training, use teacher forcing: input is the actual target token
            if t == 0:
                # At t=0, use a start-of-sequence token (assuming index 0)
                y_t_minus_1 = torch.zeros(batch_size, self.linear1.in_features, device=target_seq.device)
            else:
                y_prev = target_seq[:, t-1]
                y_t_minus_1 = nn.functional.one_hot(y_prev, num_classes=self.linear1.in_features).float()
            hidden = self.activation(self.linear1(y_t_minus_1) + hidden)
            output = self.linear2(hidden)
            outputs[:, t, :] = output
        return outputs

class Seq2Seq(nn.Module):
    """
    Sequence-to-Sequence model integrating Encoder and Decoder.

    Architecture:
        Encoder -> Decoder

    Args:
        encoder (nn.Module): Encoder module.
        decoder (nn.Module): Decoder module.
    """
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """
        Forward pass for the Seq2Seq model.

        Args:
            input_seq (Tensor): Input sequence tensor of shape (batch, seq_len).
            target_seq (Tensor): Target sequence tensor of shape (batch, seq_len).

        Returns:
            Tensor: Output logits of shape (batch, seq_len, output_size).
        """
        encoder_hidden = self.encoder(input_seq)
        decoder_output = self.decoder(target_seq, encoder_hidden)
        return decoder_output

def train_model(model, dataloader, criterion, optimizer, num_epochs, device):
    """
    Trains the Seq2Seq model.

    Args:
        model (nn.Module): The Seq2Seq model to train.
        dataloader (DataLoader): DataLoader for training data.
        criterion (nn.Module): Loss function.
        optimizer (optim.Optimizer): Optimizer for updating model parameters.
        num_epochs (int): Number of training epochs.
        device (torch.device): Device to run the training on (CPU or GPU).

    Returns:
        None
    """
    model.to(device)
    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            # Forward pass
            outputs = model(inputs, targets)
            # Reshape outputs and targets for loss computation
            outputs = outputs.view(-1, outputs.size(-1))  # (batch * seq_len, output_size)
            targets = targets.view(-1)  # (batch * seq_len)
            loss = criterion(outputs, targets)
            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_loss = epoch_loss / len(dataloader)
        print(f"Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}")

def evaluate_accuracy(model, dataloader, device):
    """
    Evaluates the accuracy of the Seq2Seq model.

    Args:
        model (nn.Module): The Seq2Seq model to evaluate.
        dataloader (DataLoader): DataLoader for evaluation data.
        device (torch.device): Device to run the evaluation on (CPU or GPU).

    Returns:
        float: Accuracy of the model on the evaluation data.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs, targets)
            # Get predicted tokens
            predicted = torch.argmax(outputs, dim=2)
            # Compare with targets
            correct += (predicted == targets).sum().item()
            total += targets.numel()
    accuracy = correct / total
    return accuracy

# Example usage and Training Code
if __name__ == "__main__":
    # Hyperparameters
    seq_length = 10
    num_samples = 1000
    vocab_size = 50  # Including a padding index if needed
    hidden_size = 64
    batch_size = 32
    num_epochs = 20
    learning_rate = 0.001

    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Generate dataset
    dataset = generate_dataset(seq_length, num_samples, vocab_size)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Initialize Encoder and Decoder
    encoder = Encoder(input_size=vocab_size, hidden_size=hidden_size)
    decoder = Decoder(input_size=vocab_size, hidden_size=hidden_size, output_size=vocab_size)
    model = Seq2Seq(encoder, decoder)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model
    print("Starting training...")
    train_model(model, dataloader, criterion, optimizer, num_epochs, device)
    print("Training completed.\n")

    # Evaluate accuracy on the training set
    accuracy = evaluate_accuracy(model, dataloader, device)
    print(f"Training Accuracy: {accuracy * 100:.2f}%\n")

    # Sample Output after Training
    # Let's test the model on a sample input
    with torch.no_grad():
        test_input, test_target = dataset[0]
        test_input = test_input.unsqueeze(0).to(device)  # (1, seq_len)
        test_target = test_target.unsqueeze(0).to(device)  # (1, seq_len)
        output = model(test_input, test_target)
        predicted = torch.argmax(output, dim=2)
        print("Sample Input Sequence:   ", test_input.squeeze().tolist())
        print("Sample Target Sequence:  ", test_target.squeeze().tolist())
        print("Predicted Sequence       :  ", predicted.squeeze().tolist())


Using device: cpu
Starting training...
Epoch [1/20], Loss: 3.9003
Epoch [2/20], Loss: 3.8393
Epoch [3/20], Loss: 3.7734
Epoch [4/20], Loss: 3.6947
Epoch [5/20], Loss: 3.6141
Epoch [6/20], Loss: 3.5328
Epoch [7/20], Loss: 3.4544
Epoch [8/20], Loss: 3.3807
Epoch [9/20], Loss: 3.3112
Epoch [10/20], Loss: 3.2445
Epoch [11/20], Loss: 3.1801
Epoch [12/20], Loss: 3.1218
Epoch [13/20], Loss: 3.0658
Epoch [14/20], Loss: 3.0160
Epoch [15/20], Loss: 2.9699
Epoch [16/20], Loss: 2.9272
Epoch [17/20], Loss: 2.8868
Epoch [18/20], Loss: 2.8503
Epoch [19/20], Loss: 2.8156
Epoch [20/20], Loss: 2.7828
Training completed.

Training Accuracy: 26.06%

Sample Input Sequence:    [6, 3, 11, 14, 22, 46, 35, 43, 42, 18]
Sample Target Sequence:   [6, 3, 11, 14, 22, 46, 35, 43, 42, 18]
Predicted Sequence       :   [42, 35, 35, 42, 42, 18, 42, 42, 42, 18]
