## Implementing a Vanilla RNN with Numpy 

In [27]:
import numpy as np 
import string 

In [28]:
# Sampla data 
inputs = np.array([
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["Z","Y","X","W","V","U","T","S","R","Q","P","O","N","M","L","K","J","I","H","G","F","E","D","C","B","A"],
    ["B","D","F","H","J","L","N","P","R","T","V","X","Z","A","C","E","G","I","K","M","O","Q","S","U","W","Y"],
    ["M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L"],
    ["H","G","F","E","D","C","B","A","L","K","J","I","P","O","N","M","U","T","S","R","Q","X","W","V","Z","Y"]
])

expected = np.array([
    ["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"],
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["C","E","G","I","K","M","O","Q","S","U","W","Y","A","B","D","F","H","J","L","N","P","R","T","V","X","Z"], 
    ["N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L","M"],
    ["I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H"]
])

In [29]:
def string_to_one_hot(inputs: np.ndarray) -> np.ndarray:
    char_to_index = {char: i for i, char in enumerate(string.ascii_uppercase)}
    # string.ascii_uppercase is a string containing all uppercase letters 
    # enumerate(string.ascii_uppercase) pairs each charatacter with its index 
    # {char: i for i, char in enume...} creates the mapping

    one_hot_inputs = [] # empty list of one_hot_inputs
    for row in inputs: # starts loop over each row in the input 
        one_hot_list = []
        for char in row:
            if char.upper() in char_to_index:
                one_hot_vector = np.zeros((len(string.ascii_uppercase), 1))
                one_hot_vector[char_to_index[char.upper()]] = 1
                one_hot_list.append(one_hot_vector)
        one_hot_inputs.append(one_hot_list)

    return np.array(one_hot_inputs)

In [30]:
# Input Layer 
class InputLayer: 
    inputs: np.ndarray 
    U: np.ndarray = None 
    delta_U: np.ndarray = None 

    def __init__(self, inputs: np.ndarray, hidden_size: int) -> None: 
        self.inputs = inputs 
        self.U = np.random.uniform(low = 0, high = 1, size = (hidden_size, len(inputs[0])))
        self.delta_U = np.zeros_like(self.U)
    # get_input -> Return the one-hot encoded vector of character at a given step 
    def get_input(self, time_step: int) -> np.ndarray:
        return self.U @ self.get_input(time_step)
    # return the result of U . x[t] to be used in s_t calculation
    def weighted_sum(self, time_step: int) -> np.ndarray: 
        return self.U @ self.get_input(time_step)

    def calculate_deltas_per_step(
        self, time_step: int, delta_weighted_sum: np.ndarray
    ) -> None: 
        self.delta_U += delta_weighted_sum @ self.get_input(time_step).T 
    # Returns the parameters using the gradient U = U - \alpha*dL/dU
    def update_weights_and_bias(self, learning_rate: float) -> None: 
        self.U -= learning_rate * self.delta_U 

In [31]:
# Hidden Layer 
class HiddenLayer: 
    states: np.ndarray = None # Stores activation of all time steps (internal memory of the network)
    W: np.ndarray = None # Recurrent weight matrix 
    delta_W: np.ndarray = None # gradient W of diuring BPTT 
    bias: np.ndarray = None # bias in math formulas 
    delta_bias: np.ndarray = None  # gradient of b 
    next_delta_activation: np.ndarray = None # derivative of next step's loss function w.r.t current activation, from
    # this formula 

    def __init__(self, vocab_size: int, size: int) -> None: 
        self.W = np.random.uniform(low = 0, high = 1, size = (size, size))
        self.bias = np.random.uniform(low = 0, high = 1, size = (size, 1))
        self.states = np.zeros(shape=(vocab_size, size, 1))
        self.next_delta_activation = np.zeros(shape=(size, 1))
        self.delta_bias = np.zeros_like(self.bias)
        self.delta_W = np.zeros_like(self.W)
    # Return the hidden state value at a given step. if time step is less than 0, default to all zeros matrix 
    def get_hidden_state(self, time_step: int) -> np.ndarray: 
        # If starting out at the beginning of the sequence, a[t - 1] will return zeros 
        if time_step < 0: 
            return np.zeros_like(self.states[0])
        return self.states[time_step]
    # Updating the state at a time steep after forward pass calculation
    def set_hidden_state(self, time_step: int, hidden_state: np.ndarray) -> None: 
        self.states[time_step] = hidden_state 
    # Forward pass calculation 
    def activate(self, weighted_input: np.ndarray, time_step: int) -> np.ndarray: 
        previous_hidden_state = self.get_hidden_state(time_step - 1)
        # W @ h_prev => (h_dimension, h_dimension) @ (h_dimension, 1) = (h_dimension, 1)
        weighted_hidden_state = self.W @ previous_hidden_state 
        # (h_dimension, 1) + (h_dimension, 1) + (h_dimension, 1) = (h_dimension, 1)
        weighted_sum = weighted_input + weighted_hidden_stae + self.bias 
        activation = np.tanh(weighted_sum) # (h_dimension, 1)
        self.hidden_state(time_step, activation)
        return activation

    def calculate_deltas_per_step(
        self, time_step: int, delta_output: np.ndarray
    ) -> np.ndarray:
        # (h_dimension, 1) + (h_dimension, 1) = (h_dimension, 1)
        delta_activatiation = delta_output + self.next_delta_activation
        # (h_dimension, 1) * scalar = (h_dimension, 1) 
        delta_weighted_sum = delta_activation * (
            1 - self.get_hidden_state(time_step) ** 2 
        )
        # (h_dimension, h_dimension) @ (h_dimension, 1) = (h_dimension, 1)
        self.next_delta_activation = self.W.T @ delta_weighed_sum 

        # (h_dimension, 1) @ (1, h_dimension) = (h_dimension, h_dimension)
        self.delta_W += delta_weighed_sum @ self.get_hidden_state(time_step - 1).T 

        # Derivative of hidden bias is the same as dL_ds 
        self.delta_bias += delta_weighed_sum
        return delta_weighed_sum

    def update_weights_and_bias(self, learning_rate: float) -> None: 
        self.W -= learning_rate * self.delta_W 
        self.bias -= learning_rate * self.delta_bias 

In [32]:
# Output Layer 
class OutputLayer:
    states: np.ndarray = None # Stores predictions of all time  steps 
    V: np.ndarray = None # Output weights matrix 
    bias: np.ndarray = None # c in math formulas
    delta_bias: np.ndarray = None # gradient of c
    delta_V: np.ndarray = None # output of weight matrix

    def __init__(self, size: int, hidden_size: int) -> None: 
        self.V = np.random.uniform(low = 0, high = 1, size=(size, hidden_size))
        self.bias = np.random.uniform(low=0, high=1, size=(size, 1))
        self.delta_bias = np.zeros_like(self.bias)
        self.delta_V = np.zeros_like(self.V)
    # predict: forward pass to calculate the weighted output and probability distribution with softmax 
    def predict(self, hidden_state: np.ndarray, time_step: int) -> np.ndarray: 
        # V @ h => (input_size, h_dimension) @ (h_dimension, 1) = (input_size, 1)
        # (input_size, 1) + (input_size, 1) = (input_size, 1)
        output = self.V @ hidden_state + self.bias 
        prediction = sofmax(output)
        self.set_state(time_step, prediction)
        return prediction 
    # Return the output state (prediction) value at a given time step
    def get_state(self, time_step: int) -> np.ndarray: 
        return self.states[time_step]
    # Updating the output state at a time step after forward pass calculation
    def set_state(self, time_step: int, prediction: np.ndarray) -> None: 
        self.states[time_step] = prediction
    # compute gradients of v and c
    def calculate_deltas_per_step(
        self, 
        expected: np.ndarray, 
        hidden_state: np.ndarray, 
        time_step: int, 
    ) -> np.ndarray:
        # dL_do = dL_dyhat * dyhat_do = derivative of loss function * derivative of softmax 
        # dl_do = step.y_hat - expected[step_number]
        delta_output = self.get_state(time_step) - expected # (input_size, 1)

        # (input_size, 1) @ (1, hidden_size) = (input_size, hidden_size)
        self.delta_V += delta_output @ hidden_state.T 

        # dL_dc += dL_do 
        self.delta_bias += delta_output 
        return self.V.T @ delta_output 

    def update_weights_and_bias(self, learning_rate: float) -> None: 
        self.V -= learning_rate * self.delta_V
        self.bias -= learning_rate * self.delta_bias 

In [39]:
class VanillaRNN:
    hidden_layer: HiddenLayer
    output_layer: OutputLayer
    alpha: float  # learning rate
    input_layer: InputLayer = None

    def __init__(self, vocab_size: int, hidden_size: int, alpha: float) -> None:
        self.hidden_layer = HiddenLayer(vocab_size, hidden_size)
        self.output_layer = OutputLayer(vocab_size, hidden_size)
        self.hidden_size = hidden_size
        self.alpha = alpha

    def feed_forward(self, inputs: np.ndarray) -> OutputLayer:
        self.input_layer = InputLayer(inputs, self.hidden_size)
        for step in range(len(inputs)):
            weighted_input = self.input_layer.weighted_sum(step)
            activation = self.hidden_layer.activate(weighted_input, step)
            self.output_layer.predict(activation, step)
        return self.output_layer

    def backpropagation(self, expected: np.ndarray) -> None:
        for step_number in reversed(range(len(expected))):
            delta_output = self.output_layer.calculate_deltas_per_step(
                expected[step_number],
                self.hidden_layer.get_hidden_state(step_number),
                step_number,
            )
            delta_weighted_sum = self.hidden_layer.calculate_deltas_per_step(
                step_number, delta_output
            )
            self.input_layer.calculate_deltas_per_step(step_number, delta_weighted_sum)

        self.output_layer.update_weights_and_bias(self.alpha)
        self.hidden_layer.update_weights_and_bias(self.alpha)
        self.input_layer.update_weights_and_bias(self.alpha)

    def loss(self, y_hat: list[np.ndarray], y: list[np.ndarray]) -> float:
        """
        Cross-entropy loss function - Calculating difference between 2 probability distributions.
        First, calculate cross-entropy loss for each time step with np.sum, which returns a numpy array
        Then, sum across individual losses of all time steps with sum() to get a scalar value.
        :param y_hat: predicted value
        :param y: expected value - true label
        :return: total loss
        """
        return sum(-np.sum(y[i] * np.log(y_hat[i]) for i in range(len(y))))

    def train(self, inputs: np.ndarray, expected: np.ndarray, epochs: int) -> None:
        for epoch in range(epochs):
            print(f"epoch={epoch}")
            for idx, input in enumerate(inputs):
                y_hats = self.feed_forward(input)
                self.backpropagation(expected[idx])
                print(
                    f"Loss round: {self.loss([y for y in y_hats.states], expected[idx])}"
                )

In [40]:
one_hot_inputs = string_to_one_hot(inputs)
one_hot_expected = string_to_one_hot(expected)

In [41]:
rnn = VanillaRNN(vocab_size=len(string.ascii_uppercase), hidden_size=128, alpha=0.0001)
rnn.train(one_hot_inputs, one_hot_expected, epochs=10)

epoch=0


RecursionError: maximum recursion depth exceeded

## Vanilla RNN in PyTorch 

In [42]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import string

# Seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Prepare input and expected data
inputs = np.array([
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["Z","Y","X","W","V","U","T","S","R","Q","P","O","N","M","L","K","J","I","H","G","F","E","D","C","B","A"],
    ["B","D","F","H","J","L","N","P","R","T","V","X","Z","A","C","E","G","I","K","M","O","Q","S","U","W","Y"],
    ["M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L"],
    ["H","G","F","E","D","C","B","A","L","K","J","I","P","O","N","M","U","T","S","R","Q","X","W","V","Z","Y"]
])

expected = np.array([
    ["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"],
    ["A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"],
    ["C","E","G","I","K","M","O","Q","S","U","W","Y","A","B","D","F","H","J","L","N","P","R","T","V","X","Z"],
    ["N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H","I","J","K","L","M"],
    ["I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A","B","C","D","E","F","G","H"]
])

# Encode strings to one-hot vectors
def encode_sequences(sequences):
    encoded = np.vectorize(string.ascii_uppercase.index)(sequences)
    one_hot = np.eye(26)[encoded]
    return torch.FloatTensor(one_hot)

# Prepare input and target data
X = encode_sequences(inputs)
y = encode_sequences(expected)

# RNN Model
class AlphabetRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(AlphabetRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out

# Hyperparameters
input_size = 26  # One-hot encoded input size
hidden_size = 128
output_size = 26
learning_rate = 0.001
epochs = 10

# Initialize model, loss, and optimizer
model = AlphabetRNN(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = model(X)
    
    # Reshape outputs and targets for loss calculation
    loss = criterion(outputs.view(-1, output_size), y.argmax(dim=2).view(-1))
    loss.backward()
    optimizer.step()
    
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Prediction
new_input = np.array([["B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z","A"]])
new_input_encoded = encode_sequences(new_input)

# Make prediction
with torch.no_grad():
    prediction = model(new_input_encoded)
    predicted_index = prediction[0, -1].argmax().item()

print("Predicted index:", predicted_index)
print("Predicted character:", string.ascii_uppercase[predicted_index])

Epoch [1/10], Loss: 3.2564
Epoch [2/10], Loss: 3.2435
Epoch [3/10], Loss: 3.2306
Epoch [4/10], Loss: 3.2176
Epoch [5/10], Loss: 3.2045
Epoch [6/10], Loss: 3.1911
Epoch [7/10], Loss: 3.1773
Epoch [8/10], Loss: 3.1630
Epoch [9/10], Loss: 3.1482
Epoch [10/10], Loss: 3.1327
Predicted index: 24
Predicted character: Y
