<a href="https://colab.research.google.com/github/karankulshrestha/ai-notebooks/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import numpy as np

In [13]:
def sigmoid(input, derivative=False):
  if derivative:
    return input * (1 - input)
  return 1 / (1 + np.exp(-np.clip(input, -500, 500)))


def tanh(input, derivative=False):
  if derivative:
    return 1 - input ** 2
  return np.tanh(input)


def softmax(x): # output activation
  exp_x = np.exp(x - np.max(x))
  return exp_x / np.sum(exp_x, axis=0, keepdims=True)

In [14]:
class LSTM:
  def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.learning_rate = learning_rate

    # weights for forget gate
    self.Wf = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
    self.bf = np.zeros((hidden_size, 1))

    # weights for input gate
    self.Wi = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
    self.bi = np.zeros((hidden_size, 1))

    # weights for candidate cell state
    self.Wc = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 # Changed from 0.0 to 0.01
    self.bc = np.zeros((hidden_size, 1))

    # weights for output gate
    self.Wo = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 # Corrected dimensions
    self.bo = np.zeros((hidden_size, 1))

    # Initialize weights for final output
    self.Wy = np.random.randn(output_size, hidden_size) * 0.01
    self.by = np.zeros((output_size, 1))

  def forward(self, x, h_prev, c_prev):
    """
    Forward pass for one time step
    x: input at current time step (input_size, 1)
    h_prev: hidden state from previous time step (hidden_size, 1)
    c_prev: cell state from previous time step (hidden_size, 1)
    """

    # concatenate the input and prev hidden state
    concat = np.vstack((h_prev, x))

    # forget gate
    f = sigmoid(np.dot(self.Wf, concat) + self.bf)

    # input gate
    i = sigmoid(np.dot(self.Wi, concat) + self.bi)

    # candidate cell state
    c_tilde = tanh(np.dot(self.Wc, concat) + self.bc)

    # update cell state
    c = f * c_prev + i * c_tilde

    # output gate
    o = sigmoid(np.dot(self.Wo, concat) + self.bo)

    # update hidden state
    h = o * tanh(c)

    # Final output (logits)
    y_logits = np.dot(self.Wy, h) + self.by

    y = softmax(y_logits)

    # store values for backward pass
    cache = {
        'x': x, 'h_prev': h_prev, 'c_prev': c_prev,
        'concat': concat, 'f': f, 'i': i, 'c_tilde': c_tilde,
        'c':c, 'o': o, 'h': h, 'y': y
    }

    return y, h, c, cache


  def backward(self, dy, dh_next, dc_next, cache):
      """
        Backward pass for one time step
        dy: gradient of loss with respect to output
        dh_next: gradient from next time step
        dc_next: gradient of cell state from next time step
      """
      x = cache['x']
      h_prev = cache['h_prev']
      c_prev = cache['c_prev']
      concat = cache['concat']
      f = cache['f']
      i = cache['i']
      c_tilde = cache['c_tilde']
      c = cache['c']
      o = cache['o']
      h = cache['h']
      y = cache['y']

      # gradients of output layer
      dWy = np.dot(dy, h.T)
      dby = dy
      dh = np.dot(self.Wy.T, dy) + dh_next

      # gradient of output gate
      do = dh * tanh(c)
      do_input = sigmoid(o, derivative=True) * do

      dWo = np.dot(do, concat.T)
      dbo = do_input

      # Gradient of cell state
      dc = dh * o * tanh(tanh(c), derivative=True) + dc_next

      # Gradient of candidate cell state
      dc_tilde = dc * i
      dc_tilde_input = tanh(c_tilde, derivative=True) * dc_tilde

      dWc = np.dot(dc_tilde_input, concat.T)
      dbc = dc_tilde_input

      # Gradient of input gate
      di = dc * c_tilde
      di_input = sigmoid(i, derivative=True) * di

      dWi = np.dot(di_input, concat.T)
      dbi = di_input

      # Gradient of forget gate
      df = dc * c_prev
      df_input = sigmoid(f, derivative=True) * df

      dWf = np.dot(df_input, concat.T)
      dbf = df_input

      # sum of the gradient flow back through each gate
      dconcat = (np.dot(self.Wf.T, df_input) +
                  np.dot(self.Wi.T, di_input) +
                  np.dot(self.Wc.T, dc_tilde_input) +
                  np.dot(self.Wo.T, do_input))

      # Split gradient for input and hidden_prev
      dh_prev = dconcat[:self.hidden_size, :]
      dx = dconcat[self.hidden_size:, :]

      # Gradient for previous cell state
      dc_prev = dc * f

      gradients = {
            'dWf': dWf, 'dbf': dbf,
            'dWi': dWi, 'dbi': dbi,
            'dWc': dWc, 'dbc': dbc,
            'dWo': dWo, 'dbo': dbo,
            'dWy': dWy, 'dby': dby
        }

      return dx, dh_prev, dc_prev, gradients

  def train(self, X, Y, epochs=100, seq_length=25):
     """
        Train the LSTM with sequence batching
        X: input sequence of one-hot encoded characters (seq_length, vocab_size)
        Y: target sequence of one-hot encoded characters (seq_length, vocab_size)
        seq_length: length of subsequences for backpropagation through time
     """
     n_sequences = len(X) // seq_length

     for epoch in range(epochs):
        total_loss = 0

        for seq_idx in range(n_sequences):
          # Get Subsequence
          start_idx = seq_idx * seq_length
          end_idx = start_idx + seq_length

          # Initialize the hidden and cell states for this subsequence
          h = np.zeros((self.hidden_size, 1))
          c = np.zeros((self.hidden_size, 1))

          caches = []
          loss = 0

          # Forward pass through subsequence
          for t in range(start_idx, min(end_idx, len(X))):
            x = X[t].reshape(-1, 1)
            y_target = Y[t].reshape(-1, 1) # convert it into column vector

            y_pred, h, c, cache = self.forward(x, h, c)
            caches.append(cache)

            # Calculate loss (cross-entropy)
            loss += -np.sum(y_target * np.log(y_pred + 1e-8))

          # Backward pass for this subsequence
          dh_next = np.zeros((self.hidden_size, 1))
          dc_next = np.zeros((self.hidden_size, 1))

          # Accumulate gradients
          grads = {
              'dWf': np.zeros_like(self.Wf), 'dbf': np.zeros_like(self.bf),
              'dWi': np.zeros_like(self.Wi), 'dbi': np.zeros_like(self.bi),
              'dWc': np.zeros_like(self.Wc), 'dbc': np.zeros_like(self.bc),
              'dWo': np.zeros_like(self.Wo), 'dbo': np.zeros_like(self.bo),
              'dWy': np.zeros_like(self.Wy), 'dby': np.zeros_like(self.by)
          }

          for t in reversed(range(len(caches))):
            y_target = Y[start_idx + t].reshape(-1, 1) # fetches the correct answer
            y_pred = caches[t]['y'] # fetches the prediction answer

            # Gradient of loss (cross-entropy with softmax)
            dy = y_pred - y_target

            dx, dh_next, dc_next, step_grads = self.backward(dy, dh_next, dc_next, caches[t])

            # Accumulate gradients
            for key in grads.keys():
              grads[key] += step_grads[key]


          # Clip gradients to prevent exploding gradients
          for key in grads.keys():
              grads[key] = np.clip(grads[key], -5, 5)



          # Update weights after each subsequence
          self.Wf -= self.learning_rate * grads['dWf']
          self.bf -= self.learning_rate * grads['dbf']
          self.Wi -= self.learning_rate * grads['dWi']
          self.bi -= self.learning_rate * grads['dbi']
          self.Wc -= self.learning_rate * grads['dWc']
          self.bc -= self.learning_rate * grads['dbc']
          self.Wo -= self.learning_rate * grads['dWo']
          self.bo -= self.learning_rate * grads['dbo']
          self.Wy -= self.learning_rate * grads['dWy']
          self.by -= self.learning_rate * grads['dby']

          total_loss += loss

        if epoch % 10 == 0:
              avg_loss = total_loss / (n_sequences * seq_length) if n_sequences > 0 else 0
              print(f"Epoch {epoch}, Loss: {avg_loss:.6f}")


  def generate_text(self, seed_char_idx, char_to_idx, idx_to_char, num_chars=50, temperature=1.0):
      """
      Generate text starting from a seed character
      temperature: controls randomness (higher = more random, lower = more deterministic)
      """
      h = np.zeros((self.hidden_size, 1))
      c = np.zeros((self.hidden_size, 1))

      generated = [idx_to_char[seed_char_idx]]
      current_idx = seed_char_idx

      for _ in range(num_chars):
          # Create one-hot encoding
          x = np.zeros((self.output_size, 1))
          x[current_idx] = 1

          # Forward pass
          y_pred, h, c, _ = self.forward(x, h, c)

          # Apply temperature and sample
          y_pred = y_pred.flatten()
          y_pred = np.log(y_pred + 1e-8) / temperature
          y_pred = np.exp(y_pred) / np.sum(np.exp(y_pred))

          # Sample next character based on probability distribution
          current_idx = np.random.choice(len(y_pred), p=y_pred)
          generated.append(idx_to_char[current_idx])

      return ''.join(generated)

In [15]:
def prepare_char_data(text):

  unique_chars = sorted(set(text))
  vocab_size = len(unique_chars)

  # Create character to index and index to character mappings
  char_to_idx = {char: idx for idx, char in enumerate(unique_chars)}
  idx_to_char = {idx: char for char, idx in char_to_idx.items()}

  # convert chars to indices
  char_indices = [char_to_idx[char] for char in text]

  return char_indices, char_to_idx, idx_to_char, vocab_size


def create_sequences(char_indices, vocab_size):
    """
    Create input-output sequences for training
    X: current character, Y: next character
    """
    X = []
    Y = []

    for i in range(len(char_indices) - 1):
        # Create one-hot encoding for input
        x_onehot = np.zeros(vocab_size)
        x_onehot[char_indices[i]] = 1
        X.append(x_onehot)

        # Create one-hot encoding for target
        y_onehot = np.zeros(vocab_size)
        y_onehot[char_indices[i + 1]] = 1
        Y.append(y_onehot)

    return np.array(X), np.array(Y)

In [16]:
with open('shakespeare.txt', 'r', encoding='utf-8') as f:
    text = f.read()


print(f"Training on {len(text)} characters from Shakespeare")
print(f"Preview: {text[:100]}...\n")

Training on 7280 characters from Shakespeare
Preview: First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You...



In [17]:
char_indices, char_to_idx, idx_to_char, vocab_size = prepare_char_data(text)

X, Y = create_sequences(char_indices, vocab_size)

print(f"Vocab size: {vocab_size}, Sequences: {len(X)}\n")

Vocab size: 56, Sequences: 7279



In [18]:
lstm = LSTM(vocab_size, hidden_size=256, output_size=vocab_size, learning_rate=0.005)
print("Training (this may take a while)...\n")
lstm.train(X, Y, epochs=140, seq_length=50)

Training (this may take a while)...

Epoch 0, Loss: 3.601581
Epoch 10, Loss: 2.972125
Epoch 20, Loss: 2.419098
Epoch 30, Loss: 2.083551
Epoch 40, Loss: 1.855463
Epoch 50, Loss: 1.614191
Epoch 60, Loss: 1.403803
Epoch 70, Loss: 1.153284
Epoch 80, Loss: 0.961250
Epoch 90, Loss: 0.738477
Epoch 100, Loss: 0.553191
Epoch 110, Loss: 0.410282
Epoch 120, Loss: 0.200406
Epoch 130, Loss: 0.157140


In [23]:
# Generate text
print("\n" + "="*60)
print("Generated text (temperature=0.8):\n")
text = lstm.generate_text(char_to_idx['F'], char_to_idx, idx_to_char, num_chars=100, temperature=0.1)
print(text)


Generated text (temperature=0.8):

First Cithzens
One tha goon'st the beall and their counts
The bell, and we'll have cornted it plors, 
