<a href="https://colab.research.google.com/github/leburik12/Auto_Regressive_Deep_LSTM/blob/main/Auto_Regressive_Deep_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from typing import List, Tuple, Optional, Dict
import requests
import re
from collections import Counter
import math
import random

In [None]:
def fetch_gutenberg_text(url):
    """Download a text document over HTTP and return its decoded body.

    Args:
        url: Address of the document to download.

    Returns:
        The response body as a string, or None when the request fails
        (connection problem, timeout after 15 seconds, invalid URL, or an
        HTTP error status).
    """
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        return response.text
    except requests.RequestException:
        # Best-effort fetch: request/HTTP failures are reported as None so the
        # caller decides how to react. Unrelated errors are no longer masked.
        return None

def preprocess_text(text: str,
                    seq_length: int = 100,
                    batch_size: int = 32) -> Tuple[torch.Tensor, torch.Tensor, Dict, Dict]:
    """Convert text to character-index sequences for RNN training.

    The text is cut into non-overlapping windows of ``seq_length`` characters.
    Each target window is the input window shifted one character to the right.

    Args:
        text: Raw training text.
        seq_length: Number of characters per sequence.
        batch_size: Size of the leading dimension of the returned tensors.

    Returns:
        X_batches: int64 tensor of shape (batch_size, num_batches, seq_length).
            Row ``b`` holds ``num_batches`` consecutive sequences of the text.
        y_batches: int64 tensor of the same shape holding the next-character
            targets for ``X_batches``.
        char_to_idx: Mapping from character to vocabulary index.
        idx_to_char: Mapping from vocabulary index to character.
    """
    # Character vocabulary, sorted so the index assignment is deterministic.
    chars = sorted(set(text))
    char_to_idx = {ch: i for i, ch in enumerate(chars)}
    idx_to_char = {i: ch for i, ch in enumerate(chars)}

    indices = np.array([char_to_idx[ch] for ch in text], dtype=np.int64)

    # Every input window needs one extra trailing character for its shifted
    # target, so only len(indices) - 1 characters can start a full window.
    # (Without the -1 the last window is one target short whenever the text
    # length is an exact multiple of seq_length.)
    num_seqs = max(len(indices) - 1, 0) // seq_length
    span = num_seqs * seq_length

    # Copies keep X and y independent of each other and of `indices`.
    X = indices[:span].reshape(num_seqs, seq_length).copy()
    y = indices[1:span + 1].reshape(num_seqs, seq_length).copy()

    X_tensor = torch.from_numpy(X)
    y_tensor = torch.from_numpy(y)

    # Drop the trailing sequences that do not fill the batch dimension.
    # The middle dimension is given explicitly because `-1` cannot be
    # inferred when there are zero sequences.
    num_batches = num_seqs // batch_size
    usable = num_batches * batch_size
    X_batches = X_tensor[:usable].view(batch_size, num_batches, seq_length)
    y_batches = y_tensor[:usable].view(batch_size, num_batches, seq_length)

    return X_batches, y_batches, char_to_idx, idx_to_char

In [None]:
class LSTMCell(nn.Module):
    """LSTM cell written from scratch; processes one timestep per call.

    The four gates share stacked parameter tensors laid out as [i, f, g, o]
    along the first dimension, so one pair of matrix products yields all
    gate pre-activations.
    """

    def __init__(self, input_size: int, hidden_size: int):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Stacked weights for the four gates: [i, f, g, o].
        # Weights start as small Gaussian noise (std 0.01), biases as zeros.
        self.weight_ih = nn.Parameter(torch.randn(4 * hidden_size, input_size) * 0.01)
        self.weight_hh = nn.Parameter(torch.randn(4 * hidden_size, hidden_size) * 0.01)
        self.bias_ih = nn.Parameter(torch.zeros(4 * hidden_size))
        self.bias_hh = nn.Parameter(torch.zeros(4 * hidden_size))

    def forward(self, x: torch.Tensor, state: tuple[torch.Tensor, torch.Tensor]) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass for a single timestep.

        Args:
            x: Input tensor of shape (batch_size, input_size)
            state: Tuple (h_prev, c_prev) where each is (batch_size, hidden_size)

        Returns:
            Tuple (h_new, c_new): the new hidden state and the new cell
            state, each of shape (batch_size, hidden_size).
        """
        h_prev, c_prev = state

        # All four gate pre-activations at once: (batch_size, 4 * hidden_size)
        gates = (torch.matmul(x, self.weight_ih.t()) + self.bias_ih) + \
                (torch.matmul(h_prev, self.weight_hh.t()) + self.bias_hh)

        # Split back into the individual gates, in the stacked order
        input_gate, forget_gate, cell_gate, output_gate = gates.chunk(4, dim=1)

        i = torch.sigmoid(input_gate)    # How much new info to add
        f = torch.sigmoid(forget_gate)   # How much old cell to keep
        g = torch.tanh(cell_gate)        # Candidate new cell values
        o = torch.sigmoid(output_gate)   # How much cell to expose as hidden

        c_new = f * c_prev + i * g        # Additive update of the cell state
        h_new = o * torch.tanh(c_new)     # Hidden state is a gated view of the cell

        return h_new, c_new

    def init_state(self, batch_size: int) -> tuple[torch.Tensor, torch.Tensor]:
        """Return zero-filled (h, c) states of shape (batch_size, hidden_size).

        The states follow the device and dtype of the cell's parameters, so
        the cell still works after `.to(device)`, `.double()` or `.half()`.
        """
        ref = self.weight_ih
        shape = (batch_size, self.hidden_size)
        return (torch.zeros(shape, device=ref.device, dtype=ref.dtype),
                torch.zeros(shape, device=ref.device, dtype=ref.dtype))

In [None]:
class DeepRNNLayer(nn.Module):
    """A single recurrent layer that unrolls one cell over a whole sequence.

    Supported cell types are 'rnn', 'lstm' and 'gru'. For 'lstm' the layer
    state is an (h, c) tuple; for the other types it is one hidden tensor.
    """

    def __init__(self, input_size: int, hidden_size: int,
                 cell_type: str = 'rnn', activation: str = 'tanh'):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.cell_type = cell_type

        if cell_type == 'rnn':
            # NOTE(review): RNNCell is not defined in the visible part of this
            # notebook — confirm it is provided elsewhere before using 'rnn'.
            self.cell = RNNCell(input_size, hidden_size, activation)
        elif cell_type == 'lstm':
            self.cell = LSTMCell(input_size, hidden_size)
        elif cell_type == 'gru':
            self.cell = nn.GRUCell(input_size, hidden_size)
        else:
            raise ValueError(f"Unsupported cell type: {cell_type}")

    def _initial_state(self, x: torch.Tensor):
        """Build the zero state used when the caller supplies none."""
        batch_size = x.shape[0]
        if self.cell_type == 'lstm':
            return self.cell.init_state(batch_size)
        if hasattr(self.cell, 'init_hidden'):
            return self.cell.init_hidden(batch_size)
        # Built-in cells such as nn.GRUCell have no init_hidden helper.
        return torch.zeros(batch_size, self.hidden_size,
                           device=x.device, dtype=x.dtype)

    def forward(self, x: torch.Tensor,
                h_prev: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through the RNN layer.

        Args:
            x: Input tensor of shape (batch_size, seq_len, input_size)
            h_prev: Initial state; an (h, c) tuple for 'lstm', a single
                hidden tensor otherwise. Zeros are used when omitted.

        Returns:
            outputs: Sequence of hidden states (batch_size, seq_len, hidden_size)
            h_last: State after the final timestep, in the same form as
                `h_prev`, so it can be fed back in to continue a sequence.
        """
        _, seq_len, _ = x.shape

        state = self._initial_state(x) if h_prev is None else h_prev

        outputs = []
        for t in range(seq_len):
            x_t = x[:, t, :]
            if self.cell_type == 'lstm':
                h, c = state
                h_t, c_t = self.cell(x_t, (h, c))
                state = (h_t, c_t)
            else:
                h_t = self.cell(x_t, state)
                state = h_t
            outputs.append(h_t)

        outputs = torch.stack(outputs, dim=1)

        # Return the full final state for every cell type. Indexing the
        # non-LSTM hidden tensor with [0] would return only the first batch
        # row rather than the hidden state.
        return outputs, state

In [None]:
class DeepRNN(nn.Module):
    """Stack of DeepRNNLayer modules followed by a per-timestep linear head.

    Layer ``i`` consumes the output sequence of layer ``i - 1``. Dropout,
    when enabled, is applied to the output of every layer except the last.
    """

    def __init__(self, input_size: int, hidden_size: int, num_layers: int,
                 output_size: int, cell_type: str = 'rnn',
                 activation: str = 'tanh', dropout: float = 0.0):
        super().__init__()
        if num_layers < 1:
            # forward() needs at least one layer to produce an output.
            raise ValueError(f"num_layers must be >= 1, got {num_layers}")

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.cell_type = cell_type

        # Only the first layer sees the raw input width; the deeper layers
        # consume the hidden states of the layer below.
        self.layers = nn.ModuleList(
            DeepRNNLayer(input_size if i == 0 else hidden_size,
                         hidden_size, cell_type, activation)
            for i in range(num_layers)
        )

        # Dropout for regularization (applied between RNN layers)
        self.dropout = nn.Dropout(dropout) if dropout > 0.0 else None

        # Output layer mapping each hidden state to output_size scores
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x: torch.Tensor,
                hs: Optional[List[torch.Tensor]] = None) -> Tuple[torch.Tensor, List[torch.Tensor]]:
        """Run the stacked layers over a batch of sequences.

        Args:
            x: Input of shape (batch_size, seq_len, input_size).
            hs: Optional list with one initial state per layer; an entry of
                None lets the corresponding layer start from zeros.

        Returns:
            outputs: Tensor of shape (batch_size, seq_len, output_size).
            hs_list: Final state of every layer, in layer order.
        """
        if hs is None:
            hs = [None] * self.num_layers

        hs_list = []
        current_input = x

        for i, layer in enumerate(self.layers):
            layer_output, h_last = layer(current_input, hs[i])

            if self.dropout is not None and i < self.num_layers - 1:
                layer_output = self.dropout(layer_output)

            current_input = layer_output
            hs_list.append(h_last)

        # nn.Linear acts on the last dimension, so the (batch, seq, hidden)
        # tensor can be projected directly without flattening.
        outputs = self.fc(current_input)

        return outputs, hs_list

    def init_hidden(self, batch_size: int) -> List[torch.Tensor]:
        """Return a zero initial state for every layer.

        LSTM layers get an (h, c) tuple from their cell. Other cells use
        their own `init_hidden` when they provide one. Cells without that
        helper (e.g. nn.GRUCell) get a zero tensor of shape
        (batch_size, hidden_size) on the model's device and dtype.
        """
        hs = []
        for layer in self.layers:
            cell = layer.cell
            if layer.cell_type == 'lstm':
                hs.append(cell.init_state(batch_size))
            elif hasattr(cell, 'init_hidden'):
                hs.append(cell.init_hidden(batch_size))
            else:
                ref = self.fc.weight
                hs.append(torch.zeros(batch_size, self.hidden_size,
                                      device=ref.device, dtype=ref.dtype))
        return hs

In [None]:
class TextDataset(torch.utils.data.Dataset):
    """Character-level dataset of overlapping (input, next-character) windows.

    Sample ``idx`` is the ``seq_length`` characters starting at position
    ``idx``, paired with the same window shifted one character to the right.
    """

    def __init__(self, text: str, seq_length: int = 100):
        self.text = text
        self.seq_length = seq_length
        # Sorted vocabulary gives a deterministic character <-> index mapping.
        self.chars = sorted(list(set(text)))
        self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx_to_char = {i: ch for i, ch in enumerate(self.chars)}
        self.indices = [self.char_to_idx[ch] for ch in text]
        # A window plus its shifted target needs seq_length + 1 characters.
        # Clamp at zero so short texts give an empty dataset, not a negative
        # length.
        self.num_samples = max(len(self.indices) - seq_length, 0)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        """Return the (input, target) int64 tensors for sample `idx`.

        Negative indices count from the end.

        Raises:
            IndexError: If `idx` is outside the dataset. This also lets plain
                iteration over the dataset terminate.
        """
        if idx < 0:
            idx += self.num_samples
        if not 0 <= idx < self.num_samples:
            raise IndexError(
                f"index out of range for TextDataset with {self.num_samples} samples"
            )

        stop = idx + self.seq_length
        input_seq = self.indices[idx:stop]
        target_seq = self.indices[idx + 1:stop + 1]
        return (torch.tensor(input_seq, dtype=torch.long),
                torch.tensor(target_seq, dtype=torch.long))

In [None]:
class DeepRNNTrainer:
    """Trainer for Deep RNN models.

    Wraps a model, optimizer and loss function, and provides one-epoch
    training plus temperature-based character sampling.
    """

    def __init__(self, model: nn.Module, optimizer: torch.optim.Optimizer,
                 criterion: nn.Module, device: str = 'cpu'):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device

    def train_epoch(self, data_loader: torch.utils.data.DataLoader) -> float:
        """Train for one epoch.

        Args:
            data_loader: Iterable yielding (inputs, targets) batches of
                integer class indices with matching shapes.

        Returns:
            The loss averaged over all target positions seen in the epoch,
            or NaN when the loader yields no batches.
        """
        self.model.train()
        total_loss = 0.0
        total_samples = 0

        for inputs, targets in data_loader:
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)

            # The model consumes one-hot vectors rather than raw indices
            inputs_one_hot = F.one_hot(inputs, num_classes=self.model.input_size).float()

            # Forward pass
            self.optimizer.zero_grad()
            outputs, _ = self.model(inputs_one_hot)

            # Flatten batch and time so every position is one prediction
            outputs = outputs.reshape(-1, self.model.output_size)
            targets = targets.reshape(-1)

            loss = self.criterion(outputs, targets)

            # Backward pass
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            # Weight the batch loss by its number of positions so the epoch
            # figure is a per-position average.
            num_positions = targets.shape[0]
            total_loss += loss.item() * num_positions
            total_samples += num_positions

        if total_samples == 0:
            # No batches (e.g. drop_last on a tiny dataset): nothing to average.
            return float("nan")
        return total_loss / total_samples

    def generate_text(self, start_text: str, length: int = 100,
                     temperature: float = 1.0) -> str:
        """Generate text using the trained model.

        The model must expose `fetcher.char_to_idx`, `fetcher.idx_to_char`,
        `seq_length`, `input_size` and `init_hidden`.

        Args:
            start_text: Prompt. Its last `model.seq_length` characters are fed
                through the model first, so the recurrent state reflects the
                prompt before sampling starts.
            length: Number of characters to sample.
            temperature: Softmax temperature; lower values sample more
                conservatively. Must be positive.

        Returns:
            `start_text` followed by `length` sampled characters.

        Raises:
            ValueError: If `temperature` is not positive or `start_text` is
                empty.
            KeyError: If the prompt contains a character missing from the
                vocabulary.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")
        if not start_text:
            raise ValueError("start_text must contain at least one character")

        self.model.eval()
        generated = start_text
        vocab = self.model.fetcher

        with torch.no_grad():
            # First step feeds the whole prompt window to prime the state
            context = start_text[-self.model.seq_length:]
            step_input = torch.tensor(
                [[vocab.char_to_idx[ch] for ch in context]],
                dtype=torch.long, device=self.device)

            hidden = self.model.init_hidden(1)

            for _ in range(length):
                input_one_hot = F.one_hot(step_input,
                                        num_classes=self.model.input_size).float()

                # Forward pass; `hidden` carries the context between steps
                output, hidden = self.model(input_one_hot, hidden)

                # Softmax in float64 so the probabilities sum to 1 within the
                # tolerance np.random.choice enforces (float32 can miss it).
                logits = output[0, -1, :].cpu().double() / temperature
                probabilities = F.softmax(logits, dim=-1).numpy()

                # Sample next character
                next_idx = int(np.random.choice(len(probabilities), p=probabilities))
                generated += vocab.idx_to_char[next_idx]

                # After priming, only the newly sampled character is fed
                step_input = torch.tensor([[next_idx]], dtype=torch.long,
                                          device=self.device)

        return generated

In [None]:
# Download the training corpus from Project Gutenberg.
url = "https://www.gutenberg.org/files/100/100-0.txt"
text = fetch_gutenberg_text(url)

# fetch_gutenberg_text returns None when the download fails; stop here
# rather than failing later with a less obvious error.
assert text is not None
print("Text length:", len(text))
print(f"Unique characters: {len(set(text))}")

Text length: 5359444
Unique characters: 100


In [None]:
# Keep only the first MAX_CHARS characters of the corpus; every later cell
# (dataset, vocabulary, training) works on this truncated text.
MAX_CHARS = 10_000
text = text[:MAX_CHARS]

In [None]:
SEQ_LENGTH = 100  # characters per training window
BATCH_SIZE = 32   # windows per optimisation step

# One sample per starting position in the text (overlapping windows).
dataset = TextDataset(text, seq_length=SEQ_LENGTH)
# Shuffle the windows each epoch; drop_last discards a final short batch so
# every batch has exactly BATCH_SIZE windows.
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True
)

In [None]:
# Model parameters
vocab_size = len(dataset.chars)
hidden_size = 128
num_layers = 2
cell_type = 'lstm'  # Using LSTM for better memory
dropout = 0.2

In [None]:
# Echo the chosen configuration so it is recorded in the notebook output.
print(f"\nüß† Model Configuration:")
print(f"  Vocabulary size: {vocab_size}")
print(f"  Hidden size: {hidden_size}")
print(f"  Number of layers: {num_layers}")
print(f"  Cell type: {cell_type}")
print(f"  Sequence length: {SEQ_LENGTH}")


üß† Model Configuration:
  Vocabulary size: 73
  Hidden size: 128
  Number of layers: 2
  Cell type: lstm
  Sequence length: 100


In [None]:
# Initialize model
model = DeepRNN(
    input_size=vocab_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=vocab_size,
    cell_type=cell_type,
    dropout=dropout
)

In [None]:
# Train on the GPU when one is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Initialize trainer
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
criterion = nn.CrossEntropyLoss()

# DeepRNNTrainer moves the model to `device` in its constructor.
trainer = DeepRNNTrainer(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        device=device
    )

In [None]:
# DeepRNNTrainer.generate_text reads the vocabulary mappings from
# `model.fetcher` and the prompt window length from `model.seq_length`,
# so both are attached to the model before any text is generated.
model.fetcher = dataset
model.seq_length = SEQ_LENGTH

In [None]:
# Number of full passes over the dataloader in the training loop.
num_epochs = 20
print(f"\nüèãÔ∏è  Training for {num_epochs} epochs...")


üèãÔ∏è  Training for 20 epochs...


In [None]:
# Prompts used to seed the sample generations. The list does not depend on
# the epoch, so it is built once instead of on every reporting step.
start_prompts = [
    "The quantum field theory",
    "As a scientist at MIT,",
    "The fundamental law",
    "Our research demonstrates",
    "Mathematically, we can"
]

for epoch in range(num_epochs):
    loss = trainer.train_epoch(dataloader)

    # Report the loss and a generated sample every fifth epoch.
    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

        # Generate sample text from a randomly chosen prompt
        prompt = random.choice(start_prompts)
        generated = trainer.generate_text(
            start_text=prompt,
            length=200,
            temperature=0.7
        )

        print(f"\nüß™ Generated Text (Epoch {epoch+1}):")
        print("-" * 50)
        print(generated)
        print("-" * 50)
        print()

Epoch 5/20, Loss: 0.4177

üß™ Generated Text (Epoch 5):
--------------------------------------------------
The fundamental lawest annoons on hid prome,
Who lets so fair a house fall to decay,
Which husbandry in honeer,
Who for that unf an the frape thou shouldst depart,
Leaving thearey thy beauty‚Äôs legease,
The sall that tho
--------------------------------------------------

Epoch 10/20, Loss: 0.1587

üß™ Generated Text (Epoch 10):
--------------------------------------------------
The fundamental law one hid sweets and steester-surd‚Äôst thou be distilled:
Make sweet some vial; treasure thou some place,
With beauty‚Äôs treasure ere it be self-killed:
That use is not forbidden usury,
Which happies tho
--------------------------------------------------

Epoch 15/20, Loss: 0.1251

üß™ Generated Text (Epoch 15):
--------------------------------------------------
As a scientist at MIT,
And see the brave day sunk in hideous night,
When I behold the violet past prime,
And sable curls a

In [None]:
# (Stray keystrokes removed: this cell held invalid syntax and would fail on Run All.)