# Assignment 3: LSTM and GRU vs. Multiplicative Variations

### Maxim Ryabinov (U02204083)
### CAP4641: Natural Language Processing 
### Instructor: Dr. Ankur Mali 
### University of South Florida (Spring 2025)

---

# Description

In this assignment, I implemented four recurrent architectures: Standard LSTM, Standard GRU, Multiplicative LSTM, and Multiplicative GRU. The machine learning library used in this notebook is TensorFlow.

Below is an implementation of each recurrent neural network architecture, each following the set of model equations that defines it.

Lastly, to test robustness and highlight the differences in gating mechanisms between the architectures, each model is run on the Copy Task using sequences of randomly generated characters at varying lengths (sequence lengths: `{100, 200, 500, 1000}`).



---

# 1. Initial Setup

In [32]:
import tensorflow as tf
import numpy as np
import time
import os
import random

# Set seeds for reproducibility
seed = 123

tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Make TensorFlow deterministic
os.environ['TF_DETERMINISTIC_OPS'] = '1'  # Force TensorFlow to use deterministic ops
os.environ['TF_CUDNN_DETERMINISTIC'] = '1'  # Ensure cuDNN is deterministic

# 2. Defining each RNN Architecture

### Standard LSTM RNN Implementation

In [33]:
class StandardLSTMCell(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Input gate weights
        self.W_i = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_i = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_i = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Forget gate weights
        self.W_f = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_f = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_f = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Output gate weights
        self.W_o = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_o = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_o = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Cell candidate weights
        self.W_c = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_c = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_c = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)

    def call(self, x_t, h_prev, c_prev):
        i_t = tf.sigmoid(tf.matmul(x_t, self.W_i) + tf.matmul(h_prev, self.U_i) + self.b_i)
        f_t = tf.sigmoid(tf.matmul(x_t, self.W_f) + tf.matmul(h_prev, self.U_f) + self.b_f)
        o_t = tf.sigmoid(tf.matmul(x_t, self.W_o) + tf.matmul(h_prev, self.U_o) + self.b_o)
        c_hat = tf.tanh(tf.matmul(x_t, self.W_c) + tf.matmul(h_prev, self.U_c) + self.b_c)
        
        c_t = f_t * c_prev + i_t * c_hat
        h_t = o_t * tf.tanh(c_t)
        
        return h_t, c_t

# ------- Higher-level TF RNN that unrolls over time -------
class StandardLSTM(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm_cell = StandardLSTMCell(input_size, hidden_size)
        
        # Output projection
        self.W_out = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_out = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, X, initial_state=None):
        # X: [batch_size, seq_length, input_size]
        batch_size = tf.shape(X)[0]
        seq_length = tf.shape(X)[1]
        
        if initial_state is None:
            h = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
            c = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
        else:
            h, c = initial_state

        outputs = []
        
        for t in range(seq_length):
            x_t = X[:, t, :]
            h, c = self.lstm_cell(x_t, h, c)
            out_t = tf.matmul(h, self.W_out) + self.b_out
            outputs.append(tf.expand_dims(out_t, axis=1))
        return tf.concat(outputs, axis=1), (h, c)  # [batch_size, seq_length, input_size] and h, c
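
Written out, the `call` method of `StandardLSTMCell` above computes the following update. Row-vector notation is used to match the `tf.matmul(x_t, W)` ordering in the code; $\sigma$ is the logistic sigmoid and $\odot$ is the element-wise product.

$$
\begin{aligned}
i_t &= \sigma(x_t W_i + h_{t-1} U_i + b_i) \\
f_t &= \sigma(x_t W_f + h_{t-1} U_f + b_f) \\
o_t &= \sigma(x_t W_o + h_{t-1} U_o + b_o) \\
\hat{c}_t &= \tanh(x_t W_c + h_{t-1} U_c + b_c) \\
c_t &= f_t \odot c_{t-1} + i_t \odot \hat{c}_t \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}
$$

The wrapper class then maps each hidden state to logits with $y_t = h_t W_{out} + b_{out}$.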

### Multiplicative LSTM RNN Implementation

In [34]:
class MultiplicativeLSTMCell(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Input gate weights and bias
        self.W_i = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_i = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_i = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Forget gate weights and bias
        self.W_f = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_f = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_f = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Output gate weights and bias
        self.W_o = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_o = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_o = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Cell candidate weights and bias
        self.W_c = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_c = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_c = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)

        # Multiplicative extension weights and bias
        self.W_m = self.add_weight(shape=(input_size, input_size), initializer="random_normal", trainable=True)
        self.U_m = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_m = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, x_t, h_prev, c_prev):
        # Multiplicative Extension
        m_t = tf.matmul(x_t, self.W_m) + tf.matmul(h_prev, self.U_m) + self.b_m
        x_cap = m_t * x_t

        i_t = tf.sigmoid(tf.matmul(x_cap, self.W_i) + tf.matmul(h_prev, self.U_i) + self.b_i)
        f_t = tf.sigmoid(tf.matmul(x_cap, self.W_f) + tf.matmul(h_prev, self.U_f) + self.b_f)
        o_t = tf.sigmoid(tf.matmul(x_cap, self.W_o) + tf.matmul(h_prev, self.U_o) + self.b_o)
        c_hat = tf.tanh(tf.matmul(x_cap, self.W_c) + tf.matmul(h_prev, self.U_c) + self.b_c)
        
        c_t = f_t * c_prev + i_t * c_hat
        h_t = o_t * tf.tanh(c_t)
        
        return h_t, c_t

class MultiplicativeLSTM(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm_cell = MultiplicativeLSTMCell(input_size, hidden_size)
        
        # Output projection
        self.W_out = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_out = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, X, initial_state=None):
        # X: [batch_size, seq_length, input_size]
        batch_size = tf.shape(X)[0]
        seq_length = tf.shape(X)[1]

        if initial_state is None:
            h = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
            c = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
        else:
            h, c = initial_state

        outputs = []
        
        for t in range(seq_length):
            x_t = X[:, t, :]
            h, c = self.lstm_cell(x_t, h, c)
            out_t = tf.matmul(h, self.W_out) + self.b_out
            outputs.append(tf.expand_dims(out_t, axis=1))
        return tf.concat(outputs, axis=1), (h, c)  # [batch_size, seq_length, input_size] and h, c
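
The multiplicative extension in `MultiplicativeLSTMCell` can be traced on its own. The following is a minimal NumPy sketch of just that step, assuming small random weights and illustrative sizes; it only shows how the shapes line up and is not a replacement for the TensorFlow cell.

```python
import numpy as np

rng = np.random.default_rng(0)

batch_size, input_size, hidden_size = 4, 13, 128  # illustrative sizes

x_t = rng.normal(size=(batch_size, input_size))
h_prev = rng.normal(size=(batch_size, hidden_size))

# Same shapes as W_m, U_m, and b_m in MultiplicativeLSTMCell
W_m = rng.normal(scale=0.05, size=(input_size, input_size))
U_m = rng.normal(scale=0.05, size=(hidden_size, input_size))
b_m = np.zeros(input_size)

# m_t mixes the current input with the previous hidden state
m_t = x_t @ W_m + h_prev @ U_m + b_m   # (batch_size, input_size)

# The element-wise product rescales each input feature before it reaches the gates
x_cap = m_t * x_t                      # (batch_size, input_size)

print(m_t.shape, x_cap.shape)
```

Because `m_t` is multiplied element-wise with `x_t`, it has to have the same width as the input, which is why `W_m` is `(input_size, input_size)` and `U_m` is `(hidden_size, input_size)`.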

### Standard GRU RNN Implementation

In [35]:
class StandardGRUCell(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Update gate weights
        self.W_z = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_z = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_z = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Reset gate weights
        self.W_r = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_r = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_r = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Candidate hidden state weights
        self.W_h = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_h = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_h = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)

    def call(self, x_t, h_prev):
        # Update gate
        z_t = tf.sigmoid(tf.matmul(x_t, self.W_z) + tf.matmul(h_prev, self.U_z) + self.b_z)
        
        # Reset gate
        r_t = tf.sigmoid(tf.matmul(x_t, self.W_r) + tf.matmul(h_prev, self.U_r) + self.b_r)
        
        # Candidate hidden state
        h_hat = tf.tanh(tf.matmul(x_t, self.W_h) + tf.matmul(r_t * h_prev, self.U_h) + self.b_h)
        
        # New hidden state
        h_t = (1 - z_t) * h_prev + z_t * h_hat
        
        return h_t

# ------- Higher-level TF RNN that unrolls over time -------
class StandardGRU(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.gru_cell = StandardGRUCell(input_size, hidden_size)
        
        # Output projection
        self.W_out = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_out = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, X, initial_state=None):
        # X: [batch_size, seq_length, input_size]
        batch_size = tf.shape(X)[0]
        seq_length = tf.shape(X)[1]
        
        if initial_state is None:
            h = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
        else:
            h = initial_state

        outputs = []
        
        for t in range(seq_length):
            x_t = X[:, t, :]
            h = self.gru_cell(x_t, h)
            out_t = tf.matmul(h, self.W_out) + self.b_out
            outputs.append(tf.expand_dims(out_t, axis=1))
        return tf.concat(outputs, axis=1), h  # [batch_size, seq_length, input_size] and h
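
The last line of `StandardGRUCell.call` is an interpolation between the previous state and the candidate state. The short NumPy sketch below isolates that line with hand-picked gate values; `gru_blend` is a hypothetical helper name used only for illustration.

```python
import numpy as np

def gru_blend(z_t, h_prev, h_hat):
    """New hidden state as written in the cell above: (1 - z) * h_prev + z * h_hat."""
    return (1.0 - z_t) * h_prev + z_t * h_hat

h_prev = np.array([0.5, -0.2, 0.9])
h_hat = np.array([-1.0, 1.0, 0.0])

kept = gru_blend(np.zeros(3), h_prev, h_hat)       # z = 0: state is carried over unchanged
replaced = gru_blend(np.ones(3), h_prev, h_hat)    # z = 1: state is overwritten by the candidate
mixed = gru_blend(np.full(3, 0.5), h_prev, h_hat)  # z = 0.5: even mix of both

print(kept, replaced, mixed)
```

With this convention, an update gate near 0 lets the cell hold information over many steps, which is the behavior the Copy Task is meant to stress. Other formulations swap the roles of $z_t$ and $1 - z_t$.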

### Multiplicative GRU RNN Implementation

In [36]:
class MultiplicativeGRUCell(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Update gate weights
        self.W_z = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_z = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_z = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Reset gate weights
        self.W_r = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_r = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_r = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)
        
        # Candidate hidden state weights
        self.W_h = self.add_weight(shape=(input_size, hidden_size), initializer="random_normal", trainable=True)
        self.U_h = self.add_weight(shape=(hidden_size, hidden_size), initializer="random_normal", trainable=True)
        self.b_h = self.add_weight(shape=(hidden_size,), initializer="zeros", trainable=True)

        # Multiplicative extension weights and bias
        self.W_m = self.add_weight(shape=(input_size, input_size), initializer="random_normal", trainable=True)
        self.U_m = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_m = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, x_t, h_prev):
        # Multiplicative extension
        m_t = tf.matmul(x_t, self.W_m) + tf.matmul(h_prev, self.U_m) + self.b_m
        x_cap = m_t * x_t

        # Update gate
        z_t = tf.sigmoid(tf.matmul(x_cap, self.W_z) + tf.matmul(h_prev, self.U_z) + self.b_z)
        
        # Reset gate
        r_t = tf.sigmoid(tf.matmul(x_cap, self.W_r) + tf.matmul(h_prev, self.U_r) + self.b_r)
        
        # Candidate hidden state
        h_hat = tf.tanh(tf.matmul(x_cap, self.W_h) + tf.matmul(r_t * h_prev, self.U_h) + self.b_h)
        
        # New hidden state
        h_t = (1 - z_t) * h_prev + z_t * h_hat
        
        return h_t

# ------- Higher-level TF RNN that unrolls over time -------
class MultiplicativeGRU(tf.keras.layers.Layer):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.gru_cell = MultiplicativeGRUCell(input_size, hidden_size)
        
        # Output projection
        self.W_out = self.add_weight(shape=(hidden_size, input_size), initializer="random_normal", trainable=True)
        self.b_out = self.add_weight(shape=(input_size,), initializer="zeros", trainable=True)

    def call(self, X, initial_state=None):
        # X: [batch_size, seq_length, input_size]
        batch_size = tf.shape(X)[0]
        seq_length = tf.shape(X)[1]

        if initial_state is None:
            h = tf.zeros((batch_size, self.hidden_size), dtype=X.dtype)
        else:
            h = initial_state

        outputs = []
        
        for t in range(seq_length):
            x_t = X[:, t, :]
            h = self.gru_cell(x_t, h)
            out_t = tf.matmul(h, self.W_out) + self.b_out
            outputs.append(tf.expand_dims(out_t, axis=1))
        return tf.concat(outputs, axis=1), h  # [batch_size, seq_length, input_size] and h
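
When comparing the four architectures, it helps to know how many trainable parameters each one has. The sketch below counts them from the weight shapes declared in the classes above, using `input_size = 13` and `hidden_size = 128` as in `run_benchmark`. The helper names are hypothetical and exist only for this illustration.

```python
def cell_params(input_size, hidden_size, num_blocks):
    """Each block has W (input x hidden), U (hidden x hidden), and b (hidden)."""
    per_block = input_size * hidden_size + hidden_size * hidden_size + hidden_size
    return num_blocks * per_block

def multiplicative_params(input_size, hidden_size):
    """W_m (input x input), U_m (hidden x input), and b_m (input)."""
    return input_size * input_size + hidden_size * input_size + input_size

def output_params(input_size, hidden_size):
    """W_out (hidden x input) and b_out (input)."""
    return hidden_size * input_size + input_size

input_size, hidden_size = 13, 128  # values used in run_benchmark

out = output_params(input_size, hidden_size)
extra = multiplicative_params(input_size, hidden_size)

counts = {
    "Standard LSTM": cell_params(input_size, hidden_size, 4) + out,
    "Multiplicative LSTM": cell_params(input_size, hidden_size, 4) + extra + out,
    "Standard GRU": cell_params(input_size, hidden_size, 3) + out,
    "Multiplicative GRU": cell_params(input_size, hidden_size, 3) + extra + out,
}

for name, n in counts.items():
    print(f"{name}: {n:,}")
```

The multiplicative extension adds 1,846 parameters in both cases, so any difference in results between a standard model and its multiplicative variant comes from a fairly small change in capacity.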

# 3. Running the Copy Task

### Train, Test, and Validation Splits

In [37]:
def generate_dataset_splits(sequence_count, input_size, training_length, sequence_length, delimiter, total_delimiters):
    # Train/val sequences have `training_length` steps; test sequences have `sequence_length` steps.
    # Every feature of every step is drawn independently from {0, ..., 9} (the steps are not one-hot vectors).
    X_train = np.random.randint(0, 10, size=(sequence_count, training_length, input_size)).astype(np.float32)
    X_val = np.random.randint(0, 10, size=(sequence_count, training_length, input_size)).astype(np.float32)
    X_test = np.random.randint(0, 10, size=(sequence_count, sequence_length, input_size)).astype(np.float32)

    # Delimiter steps: every feature is set to `delimiter`.
    delimiters = np.full((sequence_count, total_delimiters, input_size), delimiter, dtype=np.float32)
    
    # Append the delimiter steps to the end of each sequence.
    X_train = np.concatenate([X_train, delimiters], axis=1)
    X_val = np.concatenate([X_val, delimiters], axis=1)
    X_test = np.concatenate([X_test, delimiters], axis=1)
    
    # Copy task: the target is the input itself.
    Y_train = X_train.copy()
    Y_val = X_val.copy()
    Y_test = X_test.copy()
    
    return X_train, X_val, X_test, Y_train, Y_val, Y_test
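
For comparison, a common way to set up a copy task is to one-hot encode integer tokens, so that `argmax` over the last axis recovers each token exactly. The function above instead fills every feature with a random digit. The following is a minimal sketch of the one-hot variant, not the data pipeline used in this notebook; `make_one_hot_copy_batch` and its parameters are hypothetical names.

```python
import numpy as np

def make_one_hot_copy_batch(sequence_count, sequence_length, vocab_size=10,
                            total_delimiters=3, seed=0):
    """Hypothetical helper: random tokens followed by delimiter tokens, one-hot encoded."""
    rng = np.random.default_rng(seed)
    delimiter = vocab_size            # the delimiter takes the next free class index
    num_classes = vocab_size + 1      # vocabulary tokens + one delimiter class

    tokens = rng.integers(0, vocab_size, size=(sequence_count, sequence_length))
    tail = np.full((sequence_count, total_delimiters), delimiter)
    labels = np.concatenate([tokens, tail], axis=1)    # (count, length + delimiters)

    X = np.eye(num_classes, dtype=np.float32)[labels]  # (count, length + delimiters, classes)
    Y = X.copy()                                       # copy task: target equals input
    return X, Y, labels

X, Y, labels = make_one_hot_copy_batch(sequence_count=2, sequence_length=5)
print(X.shape)  # (2, 8, 11)
```

With this encoding the input width would be 11 (ten digits plus one delimiter class), and `np.argmax(X, axis=-1)` returns `labels` unchanged.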

### Training and Validating the Model (Training Loop)

In [38]:
def train_model(model, X_train, Y_train, X_val, Y_val, epochs=10, batch_size=32, lr=0.01):
    X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
    Y_train = tf.convert_to_tensor(Y_train, dtype=tf.float32)
    X_val = tf.convert_to_tensor(X_val, dtype=tf.float32)
    Y_val = tf.convert_to_tensor(Y_val, dtype=tf.float32)

    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    start_time = time.time()
    
    for epoch in range(epochs):
        # Shuffle training data at the start of each epoch.
        indices = tf.range(start=X_train.shape[0])
        indices = tf.random.shuffle(indices)
        X_train = tf.gather(X_train, indices)
        Y_train = tf.gather(Y_train, indices)
        
        epoch_loss = 0
        num_batches = int(np.ceil(X_train.shape[0] / batch_size))

        for i in range(num_batches):
            start = i * batch_size
            end = min((i+1) * batch_size, X_train.shape[0])
            X_batch = X_train[start:end]
            Y_batch = Y_train[start:end]
            
            # Reduce each target step to an integer class label (the index of its largest feature)
            Y_batch_labels = tf.argmax(Y_batch, axis=-1)  # Shape will be (batch_size, seq_length)
            
            with tf.GradientTape() as tape:
                output, _ = model(X_batch)
                batch_loss = loss_fn(Y_batch_labels, output)
            gradients = tape.gradient(batch_loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            
            epoch_loss += batch_loss.numpy()

        epoch_loss /= num_batches
        
        # Calculate training accuracy (on the last batch of the epoch only)
        train_preds = np.argmax(output.numpy(), axis=-1).flatten()
        true_train = Y_batch_labels.numpy().flatten()
        train_accuracy = np.mean(train_preds == true_train)
        
        # Calculate validation loss and accuracy
        val_output, _ = model(X_val)
        val_loss = loss_fn(tf.argmax(Y_val, axis=-1), val_output).numpy()
        
        val_preds = np.argmax(val_output.numpy(), axis=-1)
        true_val = tf.argmax(Y_val, axis=-1).numpy()
        val_accuracy = np.mean(val_preds == true_val)
        
        print(f"Epoch {epoch+1:02d} | Training Loss: {epoch_loss:.4f} | Val Loss: {val_loss:.4f} | "
              f"Training Accuracy: {train_accuracy:.4f} | Val Accuracy: {val_accuracy:.4f}")
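
The loss used in the training loop is sparse categorical cross-entropy on logits. The NumPy sketch below computes what I understand to be the same quantity, assuming the default reduction (a mean over every position of every sequence). `sparse_cross_entropy_from_logits` is a hypothetical helper written for this illustration.

```python
import numpy as np

def sparse_cross_entropy_from_logits(labels, logits):
    """Mean of -log softmax(logits)[label] over every position."""
    shifted = logits - logits.max(axis=-1, keepdims=True)  # for numerical stability
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=-1, keepdims=True))
    picked = np.take_along_axis(log_probs, labels[..., None], axis=-1)
    return float(-picked.mean())

num_classes = 13                                # input_size in run_benchmark
labels = np.array([[0, 5, 12], [3, 3, 7]])      # (batch, seq_length)
uniform_logits = np.zeros((2, 3, num_classes))  # a model with no preference

loss = sparse_cross_entropy_from_logits(labels, uniform_logits)
print(round(loss, 4), round(float(np.log(num_classes)), 4))
```

A model that assigns equal probability to all 13 classes has a loss of $\ln 13 \approx 2.565$, which is a useful baseline when reading the first-epoch losses printed during training.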

### Evaluating the Model (Test Loop)

In [39]:
def test_model(model, X_test, Y_test, sequence_length, training_length=100):
    X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)
    Y_test = tf.convert_to_tensor(Y_test, dtype=tf.float32)
    
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    
    test_loss = 0
    hidden_state = None
    num_test_segments = sequence_length // training_length

    all_test_preds = []  # Collect all predictions for final accuracy
    all_true_test = []   # Collect all true labels for final accuracy

    for i in range(num_test_segments):
        start = i * training_length
        end = min((i + 1) * training_length, sequence_length)
        
        X_segment = X_test[:, start:end, :]
        Y_segment = tf.argmax(Y_test[:, start:end, :], axis=-1)

        if hidden_state is None:
            output, hidden_state = model(X_segment)  # First segment
        else:
            output, hidden_state = model(X_segment, initial_state=hidden_state)  # Keep state

        segment_loss = loss_fn(Y_segment, output).numpy()
        test_loss += segment_loss

        test_preds = np.argmax(output.numpy(), axis=-1)
        true_test = Y_segment.numpy()

        # Collect for final accuracy calculation
        all_test_preds.append(test_preds)
        all_true_test.append(true_test)
        
        if num_test_segments > 1:
            segment_accuracy = np.mean(test_preds == true_test)
            print(f"Segment {i + 1}/{num_test_segments} | Loss: {segment_loss:.4f} | Accuracy: {segment_accuracy:.4f}")

    # Final test loss
    test_loss /= num_test_segments

    # Combine all predictions and labels for final accuracy
    all_test_preds = np.concatenate(all_test_preds, axis=1)
    all_true_test = np.concatenate(all_true_test, axis=1)
    test_accuracy = np.mean(all_test_preds == all_true_test)
    
    print(f"\nTest Loss: {test_loss:.4f} | Test Accuracy: {test_accuracy:.4f}")
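
The test loop above splits a long sequence into segments of `training_length` and passes the final state of one segment in as the `initial_state` of the next. The sketch below shows why that is equivalent to one pass over the full sequence, using a toy recurrence in place of the real models. `toy_rnn` is a hypothetical stand-in, not one of the architectures in this notebook.

```python
import numpy as np

def toy_rnn(X, initial_state=None, decay=0.5):
    """Hypothetical stand-in for a model: h_t = decay * h_{t-1} + x_t."""
    h = np.zeros(X.shape[0]) if initial_state is None else initial_state
    outputs = []
    for t in range(X.shape[1]):
        h = decay * h + X[:, t]
        outputs.append(h)
    return np.stack(outputs, axis=1), h

rng = np.random.default_rng(0)
X = rng.normal(size=(3, 200))  # 3 sequences of length 200
segment_length = 100

# Process the whole sequence in one call
full_out, _ = toy_rnn(X)

# Process it in segments, handing the final state of one segment to the next
state, pieces = None, []
for start in range(0, X.shape[1], segment_length):
    out, state = toy_rnn(X[:, start:start + segment_length], initial_state=state)
    pieces.append(out)
chunked_out = np.concatenate(pieces, axis=1)

print(np.allclose(full_out, chunked_out))  # True
```

If the state were reset to zeros at the start of each segment instead, the second segment would start without any memory of the first, and the two outputs would generally differ.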
    

### Benchmarking all Models

In [44]:
############################
# Main Run
############################
def run_benchmark():
    vocabulary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    delimiter = 10
    
    # Hyper-parameters used across each model
    sequence_lengths = [100, 200, 500, 1000]
    sequence_length = sequence_lengths[1]
    sequence_count = 100
    total_delimiters = 3 # Adds a few delimiters to make it clear that it is the end of the sequence.
    input_size = len(vocabulary) + total_delimiters # Feature width: 10 vocabulary tokens + total_delimiters = 13
    hidden_size = 128
    training_length = 100
    total_epochs = 5
    batch_size = 32
    learning_rate = 0.01
    
    X_train, X_val, X_test, Y_train, Y_val, Y_test = generate_dataset_splits(sequence_count, input_size, training_length, sequence_length, delimiter, total_delimiters)
    
    models = {
        "Standard LSTM": StandardLSTM(input_size, hidden_size),
        "Multiplicative LSTM": MultiplicativeLSTM(input_size, hidden_size),
        "Standard GRU": StandardGRU(input_size, hidden_size),
        "Multiplicative GRU": MultiplicativeGRU(input_size, hidden_size)
    }
    
    for name, model in models.items():
        print(f"Training {name}...\n")
        train_model(model, X_train, Y_train, X_val, Y_val, total_epochs, batch_size, learning_rate)
        print(f'\nTesting {name}...\n')
        test_model(model, X_test, Y_test, sequence_length, training_length)
        print("=======================================================================================================")





In [43]:
run_benchmark()

Training Standard LSTM...

Epoch 01 | Training Loss: 2.4130 | Val Loss: 2.2149 | Training Accuracy: 0.2015 | Val Accuracy: 0.3774
Epoch 02 | Training Loss: 2.0699 | Val Loss: 1.8415 | Training Accuracy: 0.4684 | Val Accuracy: 0.4499
Epoch 03 | Training Loss: 1.7144 | Val Loss: 1.5069 | Training Accuracy: 0.5704 | Val Accuracy: 0.5714
Epoch 04 | Training Loss: 1.3861 | Val Loss: 1.2453 | Training Accuracy: 0.6286 | Val Accuracy: 0.6388
Epoch 05 | Training Loss: 1.1484 | Val Loss: 1.0717 | Training Accuracy: 0.7015 | Val Accuracy: 0.6760

Testing Standard LSTM...

Segment 1/2 | Loss: 1.1164 | Accuracy: 0.6534
Segment 2/2 | Loss: 1.1015 | Accuracy: 0.6650

Test Loss: 1.1090 | Test Accuracy: 0.6592
Training Multiplicative LSTM...

Epoch 01 | Training Loss: 2.4191 | Val Loss: 2.2694 | Training Accuracy: 0.3228 | Val Accuracy: 0.2249
Epoch 02 | Training Loss: 2.1094 | Val Loss: 1.8909 | Training Accuracy: 0.4442 | Val Accuracy: 0.4253
Epoch 03 | Training Loss: 1.7512 | Val Loss: 1.5586 | Tra

# TODO

- Add accuracy measurement
- Look into vanishing/exploding gradients and how these can be displayed (needed for the analysis later)
- Do the copy task; figure out what a delimiter is
- Remember: 3 copy-task runs are needed for each model so that mean accuracy and standard error can be measured
- Look into cross-entropy
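
For the item about mean accuracy and standard error, a small helper like the one below may be enough. This is a sketch using only the standard library; `mean_and_standard_error` is a hypothetical name, and the accuracies are placeholders rather than measured results.

```python
import math
import statistics

def mean_and_standard_error(values):
    """Sample mean and standard error (sample standard deviation / sqrt(n))."""
    n = len(values)
    mean = statistics.mean(values)
    std_err = statistics.stdev(values) / math.sqrt(n)  # stdev divides by n - 1
    return mean, std_err

# Placeholder accuracies from three hypothetical runs, not measured results
accuracies = [0.60, 0.70, 0.80]
mean, std_err = mean_and_standard_error(accuracies)
print(f"{mean:.4f} +/- {std_err:.4f}")
```

Each of the three runs would presumably need its own seed, since repeating a run with the same seed and deterministic ops should reproduce the same accuracy.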