<a href="https://colab.research.google.com/github/PhatHuynhTranSon99/Neural-Networks-From-Scratch/blob/main/Long_Short_Term_Memory_From_Scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LSTM Unit from scratch (using only NumPy)

## Import library

In [1]:
import numpy as np
import random

## Utility function

In [32]:
def sigmoid(x):
  return 1 / (1 + np.exp(-x))

def tanh(x):
  return np.tanh(x)

## Dataset creation

In [3]:
# Create a dataset for sentiment analysis
train_data = {
  'good': True,
  'bad': False,
  'happy': True,
  'sad': False,
  'not good': False,
  'not bad': True,
  'not happy': False,
  'not sad': True,
  'very good': True,
  'very bad': False,
  'very happy': True,
  'very sad': False,
  'i am happy': True,
  'this is good': True,
  'i am bad': False,
  'this is bad': False,
  'i am sad': False,
  'this is sad': False,
  'i am not happy': False,
  'this is not good': False,
  'i am not bad': True,
  'this is not sad': True,
  'i am very happy': True,
  'this is very good': True,
  'i am very bad': False,
  'this is very sad': False,
  'this is very happy': True,
  'i am good not bad': True,
  'this is good not bad': True,
  'i am bad not good': False,
  'i am good and happy': True,
  'this is not good and not happy': False,
  'i am not at all good': False,
  'i am not at all bad': True,
  'i am not at all happy': False,
  'this is not at all sad': True,
  'this is not at all happy': False,
  'i am good right now': True,
  'i am bad right now': False,
  'this is bad right now': False,
  'i am sad right now': False,
  'i was good earlier': True,
  'i was happy earlier': True,
  'i was bad earlier': False,
  'i was sad earlier': False,
  'i am very bad right now': False,
  'this is very good right now': True,
  'this is very sad right now': False,
  'this was bad earlier': False,
  'this was very good earlier': True,
  'this was very bad earlier': False,
  'this was very happy earlier': True,
  'this was very sad earlier': False,
  'i was good and not bad earlier': True,
  'i was not good and not happy earlier': False,
  'i am not at all bad or sad right now': True,
  'i am not at all good or happy right now': False,
  'this was not happy and not good earlier': False,
}

test_data = {
  'this is happy': True,
  'i am good': True,
  'this is not happy': False,
  'i am not good': False,
  'this is not bad': True,
  'i am not sad': True,
  'i am very good': True,
  'this is very bad': False,
  'i am very sad': False,
  'this is bad not good': False,
  'this is good and happy': True,
  'i am not good and not happy': False,
  'i am not at all sad': True,
  'this is not at all good': False,
  'this is not at all bad': True,
  'this is good right now': True,
  'this is sad right now': False,
  'this is very bad right now': False,
  'this was good earlier': True,
  'i was not happy and not good earlier': False,
}

In [4]:
# Firstly, calculate the vocabulary size of training and test set
word_to_index = {}
current_index = 0

for sentence in train_data:
  # Split sentences to get words
  words = sentence.split()

  # Put into word to index
  for word in words:
    if word not in word_to_index:
      word_to_index[word] = current_index
      current_index += 1

for sentence in test_data:
  # Split sentences to get words
  words = sentence.split()

  # Put into word to index
  for word in words:
    if word not in word_to_index:
      word_to_index[word] = current_index
      current_index += 1

# Function to one-hot-encode word
def encode(word, word_to_index, vocab_size):
  # Create a numpy array
  encoding = np.zeros(vocab_size)

  # Place 1 into position of word
  encoding[word_to_index[word]] = 1

  return encoding

# Build the dataset from each sentences:
def build_dataset(data, word_to_index, vocab_size):
  # Initialize an array
  dataset = []

  # For each sentence
  for sentence in data:
    # Initialize current X and y
    current_X = []
    
    # Get the label 
    label = data[sentence]
    current_y = int(label)

    # Split into words
    words = sentence.split()

    # One-hot-encode each word and put it in the database
    for word in words:
      current_X.append(encode(word, word_to_index, vocab_size))

    # Then add X and Y into dataset
    dataset.append((current_X, current_y))

  return dataset

# Create datasets for training and testing
train_dataset = build_dataset(train_data, word_to_index, vocab_size=current_index)
test_dataset = build_dataset(test_data, word_to_index, vocab_size=current_index)

# Define dimensions
input_size = len(word_to_index)
hidden_size = 64

In [5]:
# Check the training dataset
input, label = train_dataset[20]
# Display
print("X: ", input)
print("Y: ", label)

X:  [array([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0.]), array([0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0.]), array([0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0.]), array([0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0.])]
Y:  1


# Define the layers

### Logistic Regression layer

In [6]:
class LogisticRegression:
  def __init__(self, input_size, learning_rate):
    self.input_size = input_size
    self.learning_rate = learning_rate
    self.u = np.random.randn(input_size)
    self.b_y = 0

  def forward(self, h):
    self.h = h;
    z = self.u.dot(h) + self.b_y
    y_hat = sigmoid(z)
    self.y_hat = y_hat
    return y_hat

  def backward(self, y):
    dz = self.y_hat - y
    du = dz * self.h
    dh = dz * self.u
    db_y = dz

    self.u -= self.learning_rate * du
    self.b_y -= self.learning_rate * db_y

    return dh

### LSTM layer

In [49]:
class LongShortTermMemory:
  def __init__(self, input_size, hidden_size, learning_rate=0.01):
    # Save the dimension
    self.input_size = input_size
    self.hidden_size = hidden_size

    # Save learning rate
    self.learning_rate = 0.01

    # Initialize the parameters
    self.W_f = np.random.randn(hidden_size, hidden_size) / 1000
    self.U_f = np.random.randn(hidden_size, input_size) / 1000
    self.b_f = np.zeros((hidden_size,))

    self.W_i = np.random.randn(hidden_size, hidden_size) / 1000
    self.U_i = np.random.randn(hidden_size, input_size) / 1000
    self.b_i = np.zeros((hidden_size,))

    self.W_o = np.random.randn(hidden_size, hidden_size) / 1000
    self.U_o = np.random.randn(hidden_size, input_size) / 1000
    self.b_o = np.zeros((hidden_size,))

    self.W_c = np.random.randn(hidden_size, hidden_size) / 1000
    self.U_c = np.random.randn(hidden_size, input_size) / 1000
    self.b_c = np.zeros((hidden_size,))

  def forward(self, x):
    # Initialize h_0 and c_0
    h_0 = np.zeros((self.hidden_size,))
    c_0 = np.zeros((self.hidden_size,))

    # Cache the input 
    self.x_cache = x

    # Create a cache the save the result of the layers
    self.state_cache = []
    self.state_cache.append(
        {
            "a_t": None,
            "f_t": None,
            "b_t": None,
            "i_t": None,
            "d_t": None,
            "o_t": None,
            "e_t": None,
            "c_t_candidate": None,
            "c_t": c_0,
            "g_t": None,
            "h_t": h_0
        }
    )

    # Start forward propagation through time
    h_t_previous = h_0
    c_t_previous = c_0

    for t in range(len(x)):
      # Perform calculation
      a_t = self.W_f @ h_t_previous + self.U_f @ x[t] + self.b_f
      f_t = sigmoid(a_t)

      b_t = self.W_i @ h_t_previous + self.U_i @ x[t] + self.b_i
      i_t = sigmoid(b_t)

      d_t = self.W_o @ h_t_previous + self.U_o @ x[t] + self.b_o
      o_t = sigmoid(i_t)

      e_t = self.W_c @ h_t_previous + self.U_c @ x[t] + self.b_c
      c_t_candidate = sigmoid(e_t)

      c_t = f_t * c_t_previous + i_t * c_t_candidate

      g_t = tanh(c_t)

      h_t = o_t * g_t

      # Cache the state
      self.state_cache.append(
          {
              "a_t": a_t,
              "f_t": f_t,
              "b_t": b_t,
              "i_t": i_t,
              "d_t": d_t,
              "o_t": o_t,
              "e_t": e_t,
              "c_t_candidate": c_t_candidate,
              "c_t": c_t,
              "g_t": g_t,
              "h_t": h_t
          }
      )

      # Set previous states
      c_t_previous = c_t
      h_t_previous = h_t

    return h_t

  def backward(self, dh_T):
    # Create placeholder for the parameter gradients
    dW_f = np.zeros_like(self.W_f)
    dU_f = np.zeros_like(self.U_f)
    db_f = np.zeros_like(self.b_f)

    dW_i = np.zeros_like(self.W_i)
    dU_i = np.zeros_like(self.U_i)
    db_i = np.zeros_like(self.b_i)

    dW_o = np.zeros_like(self.W_o)
    dU_o = np.zeros_like(self.U_o)
    db_o = np.zeros_like(self.b_o)

    dW_c = np.zeros_like(self.W_c)
    dU_c = np.zeros_like(self.U_c)
    db_c = np.zeros_like(self.b_c)

    # Create placeholder for d_c and d_h
    d_h = dh_T
    d_c = np.zeros((self.hidden_size,))

    # Start backpropagation through time
    for t in reversed(range(len(self.x_cache))):
      # Unpack to get the hidden state
      state_cache = self.state_cache[t+1]
      a_t = state_cache["a_t"]
      f_t = state_cache["f_t"]
      b_t = state_cache["b_t"]
      i_t = state_cache["i_t"]
      d_t = state_cache["d_t"]
      o_t = state_cache["o_t"]
      e_t = state_cache["e_t"]
      c_t_candidate = state_cache["c_t_candidate"]
      c_t = state_cache["c_t"]
      g_t = state_cache["g_t"]
      h_t = state_cache["h_t"]

      # Unpack to get previous hidden state
      previous_state_cache = self.state_cache[t]
      h_t_previous = previous_state_cache["h_t"]
      c_t_previous = previous_state_cache["c_t"]

      # Calculate the gradient
      d_o = d_h * g_t
      d_d = d_o * o_t * (1 - o_t)
      dW_o_current = np.outer(d_d, h_t_previous)
      dU_o_current = np.outer(d_d, self.x_cache[t])
      db_o_current = d_d
      d_h_prev_1 = d_d @ self.W_o

      d_g = d_h * o_t
      # This is still in question
      # IMO, the correct version should be 
      # d_c_t = d_c + d_g * (1 - g_t**2)
      # but this does not lead to convergence
      # The version down here does
      d_c_t = d_g * (1 - g_t**2) if t == len(self.x_cache) - 1 else d_c
      d_c = d_c_t * f_t
      d_f = d_c_t * c_t_previous
      d_i = d_c_t * c_t_candidate
      d_c_candidate = d_c_t * i_t

      d_a = d_f * f_t * (1 - f_t)
      dW_f_current = np.outer(d_a, h_t_previous)
      dU_f_current = np.outer(d_a, self.x_cache[t])
      db_f_current = d_a
      d_h_prev_2 = d_a @ self.W_f

      d_b = d_i * i_t * (1 - i_t)
      dW_i_current = np.outer(d_b, h_t_previous)
      dU_i_current = np.outer(d_b, self.x_cache[t])
      db_i_current = d_b
      d_h_prev_3 = d_b @ self.W_i

      d_e = d_c_candidate * c_t_candidate * (1 - c_t_candidate)
      dW_c_current = np.outer(d_e, h_t_previous)
      dU_c_current = np.outer(d_e, self.x_cache[t])
      db_c_current = d_e
      d_h_prev_4 = d_e @ self.W_c

      d_h_prev = d_h_prev_1 + d_h_prev_2 + d_h_prev_3 + d_h_prev_4

      # Add to the gradients
      dW_f += dW_f_current
      dU_f += dU_f_current
      db_f += db_f_current

      dW_i += dW_i_current
      dU_i += dU_i_current
      db_i += db_i_current

      dW_o += dW_o_current
      dU_o += dU_o_current
      db_o += db_o_current

      dW_c += dW_c_current
      dU_c += dU_c_current
      db_c += db_c_current

      # Assign to d_h
      d_h = d_h_prev

    # Gradient clipping
    dW_f = np.clip(dW_f, -.5, .5)
    dU_f = np.clip(dU_f, -.5, .5)
    db_f = np.clip(db_f, -.5, .5)

    dW_i = np.clip(dW_i, -.5, .5)
    dU_i = np.clip(dU_i, -.5, .5)
    db_i = np.clip(db_i, -.5, .5)

    dW_o = np.clip(dW_o, -.5, .5)
    dU_o = np.clip(dU_o, -.5, .5)
    db_o = np.clip(db_o, -.5, .5)

    dW_c = np.clip(dW_c, -.5, .5)
    dU_c = np.clip(dU_c, -.5, .5)
    db_c = np.clip(db_c, -.5, .5)

    # Perform gradient descent
    self.W_f -= self.learning_rate * dW_f
    self.U_f -= self.learning_rate * dU_f
    self.b_f -= self.learning_rate * db_f

    self.W_i -= self.learning_rate * dW_i
    self.U_i -= self.learning_rate * dU_i
    self.b_i -= self.learning_rate * db_i

    self.W_o -= self.learning_rate * dW_o
    self.U_o -= self.learning_rate * dU_o
    self.b_o -= self.learning_rate * db_o

    self.W_c -= self.learning_rate * dW_c
    self.U_c -= self.learning_rate * dU_c
    self.b_c -= self.learning_rate * db_c

In [45]:
# Test the layer
lstm = LongShortTermMemory(
    input_size=input_size,
    hidden_size=hidden_size
)

# Get random example
x, y = train_dataset[5]

# Forward 
h = lstm.forward(x)

# Check the dimension
assert h.shape == (hidden_size,)

# Check the length of input and state cache
assert len(lstm.x_cache) == len(x)
assert len(lstm.state_cache) == len(x) + 1

# Backward
dh_T = np.random.rand(hidden_size)
lstm.backward(dh_T)

In [22]:
def train(rnn, log, epochs = 2000):
  for i in range(epochs):
    loss = 0
    train_correct = 0
    test_correct = 0

    # Loop through items in train dataset and train
    for x, y in train_dataset:
      h_T = rnn.forward(x)
      y_hat = log.forward(h_T)

      current_loss = - y * np.log(y_hat) - (1 - y) * np.log(1 - y_hat)
      loss += current_loss

      if (y == 0 and y_hat < 0.5) or (y == 1 and y_hat >= 0.5):
        train_correct += 1

      dh_T = log.backward(y)
      rnn.backward(dh_T)

    # Calculate test accuracy
    for x, y in test_dataset:
      h_T = rnn.forward(x)
      y_hat = log.forward(h_T)

      if (y == 0 and y_hat < 0.5) or (y == 1 and y_hat >= 0.5):
        test_correct += 1

    print(f"Epochs: {i}")
    print(f"Loss: {loss / len(train_dataset)}")
    print(f"Train Accuracy: {train_correct / len(train_dataset)}")
    print(f"Test Accuracy: {test_correct / len(test_dataset)}")
    print("--------------------")

In [50]:
# Create GRU and Softmax
lstm_layer = LongShortTermMemory(
    input_size=input_size,
    hidden_size=hidden_size,
    learning_rate=0.01
)

log_layer = LogisticRegression(
    input_size=64,
    learning_rate=0.01
)

# Train
train(lstm_layer, log_layer)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epochs: 1000
Loss: 0.32022764687360444
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1001
Loss: 0.3193144627853504
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1002
Loss: 0.3184021256335517
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1003
Loss: 0.31749064903784724
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1004
Loss: 0.316580046556843
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1005
Loss: 0.3156703316877123
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1006
Loss: 0.31476151786580114
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1007
Loss: 0.31385361846423343
Train Accuracy: 0.8793103448275862
Test Accuracy: 0.95
--------------------
Epochs: 1008
Loss: 0