## Hiểu về RNN , apply trong ng-gram

In [4]:
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

# N-gram Models Implementation
class NgramModel:
    def __init__(self, n):
        self.n = n
        self.context_counts = {}
        self.ngram_counts = {}
    
    def train(self, sequence):
        # Convert sequence to tuple for easier handling
        sequence = tuple(sequence)
        
        for i in range(len(sequence) - self.n + 1):
            context = sequence[i:i+self.n-1]
            next_token = sequence[i+self.n-1]
            
            # Update context counts
            if context in self.context_counts:
                self.context_counts[context] += 1
            else:
                self.context_counts[context] = 1
            
            # Update n-gram counts
            ngram = context + (next_token,)
            if ngram in self.ngram_counts:
                self.ngram_counts[ngram] += 1
            else:
                self.ngram_counts[ngram] = 1
    
    def get_probability(self, context, next_token):
        context = tuple(context)
        if context not in self.context_counts:
            return 0.0
        
        ngram = context + (next_token,)
        if ngram not in self.ngram_counts:
            return 0.0
        
        return self.ngram_counts[ngram] / self.context_counts[context]

# Simple RNN Implementation
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden=None):
        # x shape: (batch_size, sequence_length, input_size)
        if hidden is None:
            hidden = torch.zeros(1, x.size(0), self.hidden_size)
        
        output, hidden = self.rnn(x, hidden)
        output = self.fc(output)
        return output, hidden



```mermaid
graph LR
    subgraph "Unigram Model"
        U["P(wt)"]
    end
    
    subgraph "Bigram Model"
        B1["wt-1"] --> B2["P(wt|wt-1)"]
    end
    
    subgraph "Trigram Model"
        T1["wt-2"] --> T2["wt-1"] --> T3["P(wt|wt-2,wt-1)"]
    end
    
    subgraph "RNN Model"
        R1["wt-n"] --> R2["hidden state"] --> R3["wt"]
        R2 --> R2
    end

```


In [5]:
# Generate synthetic sequence data
def generate_sine_wave(samples, sequence_length):
    x = np.linspace(0, 4*np.pi, samples)
    y = np.sin(x)
    
    # Create sequences
    sequences = []
    targets = []
    
    for i in range(len(y) - sequence_length):
        seq = y[i:i+sequence_length]
        target = y[i+1:i+sequence_length+1]
        sequences.append(seq)
        targets.append(target)
    
    return np.array(sequences), np.array(targets)

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Parameters
input_size = 1
hidden_size = 32
output_size = 1
sequence_length = 10
batch_size = 16
num_epochs = 100

# Generate data
sequences, targets = generate_sine_wave(1000, sequence_length)

In [6]:
print('sequences', sequences.size, targets.size, type(sequences))

sequences 9900 9900 <class 'numpy.ndarray'>


In [7]:
import torch.utils.data

# Convert to PyTorch tensors and reshape
X = torch.FloatTensor(sequences).reshape(-1, sequence_length, input_size)
y = torch.FloatTensor(targets).reshape(-1, sequence_length, output_size)

print('shape:', X.shape, y.shape)
# Create data loader
dataset = torch.utils.data.TensorDataset(X, y)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, loss function, and optimizer
model = SimpleRNN(input_size, hidden_size, output_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training loop
losses = []
for epoch in range(num_epochs):
    epoch_loss = 0
    for batch_X, batch_y in dataloader:
        # Forward pass
        output, hidden = model(batch_X)
        print('batch_X size:', batch_X.size())
        print('output size:', output.size())
        print('hidden size:', hidden.size())
        loss = criterion(output, batch_y)
        
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        print('epoch_loss:', epoch_loss)
    
    avg_loss = epoch_loss / len(dataloader)
    losses.append(avg_loss)
    
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

# Generate predictions
model.eval()
with torch.no_grad():
    test_sequence = X[:1]  # Take first sequence for testing
    pred_sequence, _ = model(test_sequence)
    
    # Convert to numpy for plotting
    true_sequence = y[0].numpy()
    pred_sequence = pred_sequence[0].numpy()


shape: torch.Size([990, 10, 1]) torch.Size([990, 10, 1])
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 0.5766623020172119
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 0.9505243599414825
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 1.406882256269455
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 1.8753823935985565
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 2.175409585237503
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: torch.Size([1, 16, 32])
epoch_loss: 2.3718900233507156
batch_X size: torch.Size([16, 10, 1])
output size: torch.Size([16, 10, 1])
hidden size: