In [18]:
import numpy as np
import string
from typing import List

In [3]:
## Sample data consisting of English Alphabets 
inputs = np.array([
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["Z","Y","X","W","V","U","T","S","R","Q","P","O","N","M","L","K","J","I","H","G","F","E","D","C","B","A"],
    ["B","D","F","H","J","L","N","P","R","T","V","X","Z","A","C","E","G","I","K","M","O","Q","S","U","W","Y"],
    ["M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L"],
    ["H","G","F","E","D","C","B","A","L","K","J","I","P","O","N","M","U","T","S","R","Q","X","W","V","Z","Y"]
])

expected = np.array([
    ["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"],
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["C","E","G","I","K","M","O","Q","S","U","W","Y","A","B","D","F","H","J","L","N","P","R","T","V","X","Z"], 
    ["N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L","M"],
    ["I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H"]
])

In [24]:
## Function to convert the list of strings into a list of one hot encoded vectors
def string_to_one_hot(inputs: np.ndarray) -> np.ndarray:
    char_to_index = {char: i for i, char in enumerate(string.ascii_uppercase)}

    one_hot_inputs = []
    for row in inputs:
        one_hot_list = []
        for char in row:
            if char.upper() in char_to_index:
                one_hot_vector = np.zeros((len(string.ascii_uppercase), 1))
                one_hot_vector[char_to_index[char.upper()]] = 1
                one_hot_list.append(one_hot_vector)
        one_hot_inputs.append(one_hot_list)

    return np.array(one_hot_inputs)

import numpy as np

def softmax(x):
    exp_x = np.exp(x - np.max(x))  # Numerical stability
    return exp_x / np.sum(exp_x)


In [5]:
## Each input sequence will have the input shape of (26, 26, 1)
class InputLayer:
    inputs: np.ndarray # sequential data in the form of np arrays
    U: np.ndarray = None # weight matrix connecting input to hidden layer
    delta_U: np.ndarray = None # gradient calculated during BPTT

    def __init__(self, inputs: np.ndarray, hidden_size: int) -> None:
        self.inputs = inputs
        self.U = np.random.uniform(low=0, high=1, size=(hidden_size, len(inputs[0])))
        self.delta_U = np.zeros_like(self.U)

    # Returns the one-hot encoded vector of the character at a give time step
    def get_input(self, time_step: int) -> np.ndarray:
        return self.inputs[time_step]

    # Return the result of U . x[t] to be used in s_t
    def weighted_sum(self, time_step: int) -> np.ndarray:
        return self.U @ self.get_input(time_step)

    def calculate_deltas_per_step(
        self, time_step: int, delta_weighted_sum: np.ndarray
    ) -> None:
        # (h_dimesnion, 1) @ 
        self.delta_U += delta_weighted_sum @ self.get_input(time_step).T

    def update_weights_and_bias(self, learning_rate: float) -> None:
        self.U -= learning_rate * self.delta_U

In [30]:
class HiddenLayer:
    states: np.ndarray = None # Stores activations of all time steps
    W: np.ndarray = None # Recurrent weight matrix
    delta_W: np.ndarray = None # gradient of W during BPTT
    bias: np.ndarray = None # b in the math formulas
    delta_bias: np.ndarray = None # gradient of b
    next_delta_activation: np.ndarray = None # stores the derivative of next step's loss function w.r.t. current activation

    def __init__(self, vocab_size: int, size: int) -> None:
        self.W = np.random.uniform(low=0, high=1, size=(size, size))
        self.bias = np.random.uniform(low=0, high=1, size=(size, 1))
        self.states = np.zeros(shape=(vocab_size, size, 1))
        self.next_delta_activation = np.zeros(shape=(size, 1))
        self.delta_bias = np.zeros_like(self.bias)
        self.delta_W = np.zeros_like(self.W)

    ## Return the hidden state value at a given time step. if time step is leass than 0
    ## default to all zeros matrix
    def get_hidden_state(self, time_step: int) -> np.ndarray:
        # If starting out at the beginning of the sequence, a[t-1] will return zero
        if time_step < 0:
            return np.zeros_like(self.states[0])
        return self.states[time_step]

    # Updating the state at a time step after forward pass calculation
    def set_hidden_state(self, time_step: int, hidden_state: np.ndarray) -> None:
        self.states[time_step] = hidden_state

    # Forward pass calculation
    def activate(self, weighted_input: np.ndarray, time_step: int) -> np.ndarray:
        previous_hidden_state = self.get_hidden_state(time_step - 1)
        # W @ h_prev => (h_dimension, h_dimension) @ (h_dimension, 1) = (h_dimension, 1)
        weighted_hidden_state = self.W @ previous_hidden_state
        # (h_dimension, 1) + (h_dimension, 1) + (h_dimension, 1) = (h_dimension, 1)
        weighted_sum = weighted_input + weighted_hidden_state + self.bias
        activation = np.tanh(weighted_sum) # (h_dimension, 1)
        self.set_hidden_state(time_step, activation)
        return activation

    # Gradients of w and b
    def calculate_deltas_per_step(
        self, time_step: int, delta_output: np.ndarray
    ) -> np.ndarray:
        # (h_dimension, 1) + (h_dimension, 1) = (h_dimension, 1)
        delta_activation = delta_output + self.next_delta_activation
        # (h_dimension, 1) * scalar = (h_dimension, 1)
        delta_weighted_sum = delta_activation * (
            1 - self.get_hidden_state(time_step) ** 2
        )
        # (h_dimension, h_dimension) @ (1, h_dimension) = (h_dimension, h_dimension)
        self.delta_W += delta_weighted_sum @ self.get_hidden_state(time_step - 1).T

        # derivative of hidden bias is the same as dL_ds
        self.delta_bias += delta_weighted_sum
        return delta_weighted_sum

    # updating the parameters of gradients
    def update_weights_and_bias(self, learning_rate: float) -> None:
        self.W -= learning_rate * self.delta_W
        self.bias -= learning_rate * self.delta_bias
        

In [28]:
## Output layer
class OutputLayer:
    states: np.ndarray = None # internal memory of the networkk
    V: np.ndarray = None # output weight matrix
    bias: np.ndarray = None # c in the math formulas 
    delta_bias: np.ndarray = None # gradient of c 
    delta_V: np.ndarray = None # gradient of V during BPTT

    def __init__(self, size: int, hidden_size: int) -> None:
        self.V = np.random.uniform(low=0, high=1, size=(size, hidden_size))
        self.bias = np.random.uniform(low=0, high=1, size=(size, 1))
        self.states = np.zeros(shape=(size, size, 1))
        self.delta_bias = np.zeros_like(self.bias)
        self.delta_V = np.zeros_like(self.V)

    #  forward pass to calculate the weighted output and probability distribution with softmax
    def predict(self, hidden_state: np.ndarray, time_step: int) -> np.ndarray:
        # V @ h => (input_size, h_dimension) @ (h_dimension, 1) = (input_size, 1)
        # (input_size, 1) + (input_size, 1) = (input_size, 1)
        output = self.V @ hidden_state + self.bias
        prediction = softmax(output)
        self.set_state(time_step, prediction)
        return prediction

    # return the output state (prediction) value at a given time step
    def get_state(self, time_step: int) -> np.ndarray:
        return self.states[time_step]

    #  updating the output state at a time step after forward pass calculation.
    def set_state(self, time_step: int, prediction: np.ndarray) -> None:
        self.states[time_step] = prediction

    # compute gradients of V and c
    def calculate_deltas_per_step(
        self, 
        expected: np.ndarray, 
        hidden_state: np.ndarray, 
        time_step: int
    ) -> np.ndarray:
        # dL_do = dL_dyhat * dyhat_do = derivative of loss function * derivative of softmax
        # dL_do = step.y_hat - expected[step_number]
        delta_output = self.get_state(time_step) - expected # (input_size, 1)

        self.delta_V += delta_output @ hidden_state.T

        self.delta_bias += delta_output
        return self.V.T @ delta_output

    # updating the parameters using the gradient
    def update_weights_and_bias(self, learning_rate: float) -> None:
        self.V -= learning_rate * self.delta_V
        self.bias -= learning_rate * self.delta_bias
        

In [15]:
from typing import List

In [16]:
class VanillaRNN:
    hidden_layer: HiddenLayer
    output_layer: OutputLayer
    alpha: float  # learning rate
    input_layer: InputLayer = None

    def __init__(self, vocab_size: int, hidden_size: int, alpha: float) -> None:
        self.hidden_layer = HiddenLayer(vocab_size, hidden_size)
        self.output_layer = OutputLayer(vocab_size, hidden_size)
        self.hidden_size = hidden_size
        self.alpha = alpha

    def feed_forward(self, inputs: np.ndarray) -> OutputLayer:
        self.input_layer = InputLayer(inputs, self.hidden_size)
        for step in range(len(inputs)):
            weighted_input = self.input_layer.weighted_sum(step)
            activation = self.hidden_layer.activate(weighted_input, step)
            self.output_layer.predict(activation, step)
        return self.output_layer

    def backpropagation(self, expected: np.ndarray) -> None:
        for step_number in reversed(range(len(expected))):
            delta_output = self.output_layer.calculate_deltas_per_step(
                expected[step_number],
                self.hidden_layer.get_hidden_state(step_number),
                step_number,
            )
            delta_weighted_sum = self.hidden_layer.calculate_deltas_per_step(
                step_number, delta_output
            )
            self.input_layer.calculate_deltas_per_step(step_number, delta_weighted_sum)

        self.output_layer.update_weights_and_bias(self.alpha)
        self.hidden_layer.update_weights_and_bias(self.alpha)
        self.input_layer.update_weights_and_bias(self.alpha)

    def loss(self, y_hat: List[np.ndarray], y: List[np.ndarray]) -> float:
        """
        Cross-entropy loss function - Calculating difference between 2 probability distributions.
        First, calculate cross-entropy loss for each time step with np.sum, which returns a numpy array
        Then, sum across individual losses of all time steps with sum() to get a scalar value.
        :param y_hat: predicted value
        :param y: expected value - true label
        :return: total loss
        """
        return sum(-np.sum(y[i] * np.log(y_hat[i]) for i in range(len(y))))

    def train(self, inputs: np.ndarray, expected: np.ndarray, epochs: int) -> None:
        for epoch in range(epochs):
            print(f"epoch={epoch}")
            for idx, input in enumerate(inputs):
                y_hats = self.feed_forward(input)
                self.backpropagation(expected[idx])
                print(f'Loss round: {self.loss([y for y in y_hats.states], expected[idx])}')

In [31]:
if __name__ == "__main__":
  inputs = np.array([
      ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
      ["Z","Y","X","W","V","U","T","S","R","Q","P","O","N","M","L","K","J","I","H","G","F","E","D","C","B","A"],
      ["B","D","F","H","J","L","N","P","R","T","V","X","Z","A","C","E","G","I","K","M","O","Q","S","U","W","Y"],
      ["M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L"],
      ["H","G","F","E","D","C","B","A","L","K","J","I","P","O","N","M","U","T","S","R","Q","X","W","V","Z","Y"]
  ])

  expected = np.array([
      ["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"],
      ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
      ["C","E","G","I","K","M","O","Q","S","U","W","Y","A","B","D","F","H","J","L","N","P","R","T","V","X","Z"], 
      ["N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L","M"],
      ["I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H"]
  ])
  
  one_hot_inputs = string_to_one_hot(inputs)
  one_hot_expected = string_to_one_hot(expected)

  # Forward pass through time, no gradient clipping yet so there will be gradient exploding problem
  # https://stackoverflow.com/a/33980220
  # https://stackoverflow.com/a/72494516
  rnn = VanillaRNN(vocab_size=len(string.ascii_uppercase), hidden_size=128, alpha=0.0001)
  rnn.train(one_hot_inputs, one_hot_expected, epochs=10)

  new_inputs = np.array([["B", "C", "D"]])
  for input in string_to_one_hot(new_inputs):
      predictions = rnn.feed_forward(input)
      output = np.argmax(predictions.states[-1])
      print(output) # index of the one-hot value of prediction
      print(string.ascii_uppercase[output]) # mapping one hot to character

epoch=0
Loss round: [258.07744889]
Loss round: [251.003623]
Loss round: [237.64500754]
Loss round: [216.04396964]
Loss round: [198.61126984]
epoch=1
Loss round: [185.05408263]


  return sum(-np.sum(y[i] * np.log(y_hat[i]) for i in range(len(y))))


Loss round: [176.52511383]
Loss round: [166.3217971]
Loss round: [156.79101613]
Loss round: [152.19618862]
epoch=2
Loss round: [148.09753291]
Loss round: [144.20883191]
Loss round: [139.95248213]
Loss round: [135.80280863]
Loss round: [134.07294609]
epoch=3
Loss round: [133.16657197]
Loss round: [132.74015163]
Loss round: [133.17719793]
Loss round: [131.79141193]
Loss round: [131.80301695]
epoch=4
Loss round: [132.55487368]
Loss round: [133.72217959]
Loss round: [133.68342362]
Loss round: [132.91677432]
Loss round: [133.91231748]
epoch=5
Loss round: [135.59427731]
Loss round: [137.84004223]
Loss round: [136.36046176]
Loss round: [138.67763905]
Loss round: [141.45929627]
epoch=6
Loss round: [144.1266439]
Loss round: [147.94379505]
Loss round: [149.83785316]
Loss round: [153.08789014]
Loss round: [157.14484135]
epoch=7
Loss round: [160.53942399]
Loss round: [165.93550777]
Loss round: [168.97673589]
Loss round: [173.84681062]
Loss round: [179.77954496]
epoch=8
Loss round: [183.08293392]
L

## PyTorch Implementation

In [1]:
import numpy as np
import string
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

if __name__ == "__main__":
    inputs = np.array([
        ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
        ["Z","Y","X","W","V","U","T","S","R","Q","P","O","N","M","L","K","J","I","H","G","F","E","D","C","B","A"],
        ["B","D","F","H","J","L","N","P","R","T","V","X","Z","A","C","E","G","I","K","M","O","Q","S","U","W","Y"],
        ["M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L"],
        ["H","G","F","E","D","C","B","A","L","K","J","I","P","O","N","M","U","T","S","R","Q","X","W","V","Z","Y"]
    ])
    expected = np.array([
        ["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"],
        ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
        ["C","E","G","I","K","M","O","Q","S","U","W","Y","A","B","D","F","H","J","L","N","P","R","T","V","X","Z"],
        ["N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L","M"],
        ["I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H"]
    ])
    
    # Encode strings to int indexes
    input_encoded = np.vectorize(string.ascii_uppercase.index)(inputs)
    expected_encoded = np.vectorize(string.ascii_uppercase.index)(expected)
    
    # Convert to PyTorch tensors
    # PyTorch RNN expects input shape as (seq_len, batch, input_size)
    # We'll transpose the data to match this format
    X = torch.FloatTensor(input_encoded).long()
    y = torch.FloatTensor(expected_encoded).long()
    
    # One-hot encode inputs and targets
    # PyTorch has a cleaner way to handle this with embedding or using CrossEntropyLoss
    X_one_hot = torch.nn.functional.one_hot(X, num_classes=26).float()
    y_one_hot = torch.nn.functional.one_hot(y, num_classes=26).float()
    
    # Reshape to (batch_size, seq_len, features)
    X_one_hot = X_one_hot
    y_one_hot = y_one_hot
    
    # Create PyTorch model
    class SimpleRNN(nn.Module):
        def __init__(self, input_size, hidden_size, output_size):
            super(SimpleRNN, self).__init__()
            self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
            self.fc = nn.Linear(hidden_size, output_size)
            
        def forward(self, x):
            # x shape: (batch_size, seq_len, input_size)
            rnn_out, _ = self.rnn(x)
            # rnn_out shape: (batch_size, seq_len, hidden_size)
            output = self.fc(rnn_out)
            # output shape: (batch_size, seq_len, output_size)
            return output
    
    # Initialize model, loss function, and optimizer
    model = SimpleRNN(input_size=26, hidden_size=128, output_size=26)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())
    
    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        # Forward pass
        outputs = model(X_one_hot)
        
        # Compute loss
        # Reshape for CrossEntropyLoss: [batch_size*seq_len, num_classes]
        loss = criterion(
            outputs.reshape(-1, 26), 
            y.reshape(-1)
        )
        
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    
    # Test with new input
    new_inputs = np.array([["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"]])
    new_inputs_encoded = np.vectorize(string.ascii_uppercase.index)(new_inputs)
    new_inputs_tensor = torch.tensor(new_inputs_encoded).long()
    new_inputs_one_hot = torch.nn.functional.one_hot(new_inputs_tensor, num_classes=26).float()
    
    # Make prediction
    model.eval()
    with torch.no_grad():
        prediction = model(new_inputs_one_hot)
        
    # Get prediction of last time step and last element
    prediction = torch.argmax(prediction[0, -1]).item()
    
    print(prediction)
    print(string.ascii_uppercase[prediction])

Epoch [1/10], Loss: 3.2638
Epoch [2/10], Loss: 3.2507
Epoch [3/10], Loss: 3.2377
Epoch [4/10], Loss: 3.2247
Epoch [5/10], Loss: 3.2116
Epoch [6/10], Loss: 3.1983
Epoch [7/10], Loss: 3.1847
Epoch [8/10], Loss: 3.1707
Epoch [9/10], Loss: 3.1561
Epoch [10/10], Loss: 3.1409
17
R
