In [9]:
import numpy as np
from numpy.random import randn
from scipy.special import expit as sigmoid

from data.sentiment_dataset import train_data, test_data

In [6]:
# Build the vocabulary: every distinct space-separated token that appears in
# the training texts. The set removes duplicates; note that set iteration
# order is arbitrary, so the index each word ends up with can differ between
# kernel sessions.
vocab = list({word for text in train_data.keys() for word in text.split(' ')})
vocab_size = len(vocab)
print(vocab_size)

18


In [7]:
# Two lookup tables over the same vocabulary ordering:
# position -> word, and its inverse word -> position.
idx_to_word = dict(enumerate(vocab))
word_to_idx = {word: idx for idx, word in idx_to_word.items()}
print(word_to_idx['good'])
print(idx_to_word[0])

8
was


In [71]:
def createInputs(text):
    '''
    Encode a sentence as a sequence of one-hot column vectors.

    The text is split on single spaces and each resulting word is looked up
    in the module-level `word_to_idx` table; a word that is not in the table
    raises KeyError.
    - @param {string} text
    - @returns {list} one array of shape (vocab_size, 1) per word, in order
    '''
    def one_hot(word):
        # All zeros except a single 1 at the word's vocabulary index.
        vec = np.zeros((vocab_size, 1))
        vec[word_to_idx[word]] = 1
        return vec

    return [one_hot(token) for token in text.split(' ')]

In [72]:
class RNN:
    """
    A vanilla many-to-one recurrent neural network.

        h_t = tanh(Wxh @ x_t + Whh @ h_{t-1} + bh)
        y   = Why @ h_n + by      (computed once, from the last hidden state)

    Weights are drawn from a standard normal and scaled by 1/1000;
    biases start at zero.
    """

    def __init__(self, input_size, output_size, hidden_size=64):
        """
        - @param {int} input_size - length of each input column vector
        - @param {int} output_size - length of the output column vector
        - @param {int} hidden_size - length of the hidden state
        """
        # Weights
        self.hidden_size = hidden_size
        self.Whh = randn(hidden_size, hidden_size) / 1000
        self.Wxh = randn(hidden_size, input_size) / 1000
        self.Why = randn(output_size, hidden_size) / 1000

        # Bias
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))

    def forward(self, inputs):
        """
        Perform a forward pass of the RNN using the given inputs.
        Returns the final output and hidden state.

        The inputs and every intermediate hidden state are cached on the
        instance (`self.inputs`, `self.hs`) because `backprop` reads them.
        - @param {list} inputs - list of one-hot vectors with shape (input_size, 1)
        - @returns {tuple} (y, h): output of shape (output_size, 1) and the
          last hidden state of shape (hidden_size, 1)
        """
        h = np.zeros((self.hidden_size, 1))

        self.inputs = inputs
        # hs[0] is the initial (zero) state; hs[t + 1] is the state after inputs[t].
        self.hs = {0: h}

        # Perform each step of the RNN
        for i, x in enumerate(inputs):
            h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh)
            self.hs[i + 1] = h

        # Compute the output from the last hidden state only
        y = self.Why @ h + self.by

        return y, h

    def backprop(self, dy, learn_rate=2e-2):
        """
        Perform backpropagation through time and apply one gradient-descent
        step to all weights and biases.

        Must be called after `forward`, since it reads the inputs and hidden
        states cached by the most recent forward pass. `dy` is not modified.
        - @param {array} dy the gradient (dL/dy) of the loss with respect to
          the output, with a shape of (output_size, 1)
        - @param {float} learn_rate
        """
        n = len(self.inputs)

        # calculate dL/dWhy and dL/dby from the linear output layer
        d_Why = dy @ self.hs[n].T
        # Copy: the clipping below works in place and must not write into
        # the caller's array.
        d_by = np.array(dy, dtype=float)

        # Initialize dL/dWhh, dL/dWxh, dL/dbh to zero.
        d_Whh = np.zeros(self.Whh.shape)
        d_Wxh = np.zeros(self.Wxh.shape)
        d_bh = np.zeros(self.bh.shape)

        # calculate dL/dh for the last h.
        d_h = self.Why.T @ dy

        # Walk backwards through time, accumulating each step's contribution.
        for t in reversed(range(n)):
            # dL/d(pre-activation at step t) = dL/dh * (1 - h^2)
            temp = (1 - self.hs[t + 1] ** 2) * d_h

            # dL/dbh += dL/dh * (1 - h^2)
            d_bh += temp
            # dL/dWhh += dL/dh * (1 - h^2) * h_{t-1}^T
            d_Whh += temp @ self.hs[t].T
            # dL/dWxh += dL/dh * (1 - h^2) * x^T
            d_Wxh += temp @ self.inputs[t].T
            # dL/dh_{t-1} = Whh^T @ (dL/dh * (1 - h^2)); the transpose is
            # required because the forward pass computes Whh @ h_{t-1}.
            d_h = self.Whh.T @ temp

        # Clip every gradient to [-1, 1] to prevent exploding gradients.
        for d in (d_Wxh, d_Whh, d_Why, d_bh, d_by):
            np.clip(d, -1, 1, out=d)

        # Update the weights and biases using gradient descent
        self.Whh -= learn_rate * d_Whh
        self.Wxh -= learn_rate * d_Wxh
        self.Why -= learn_rate * d_Why
        self.bh -= learn_rate * d_bh
        self.by -= learn_rate * d_by

In [73]:
def softmax(x):
    """
    Numerically stable softmax along the first axis.

    Subtracting the per-column maximum before exponentiating leaves the
    result mathematically unchanged but keeps np.exp from overflowing to
    inf (which would turn the output into NaN) for large inputs.
    - @param {array} x - scores; a 1-D vector, a column vector of shape
      (n, 1), or a 2-D array whose columns are normalized independently
    - @returns {array} same shape as x, non-negative, summing to 1 along axis 0
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalize; keep the empty shape.
        return np.exp(x)
    exps = np.exp(x - np.max(x, axis=0))
    return exps / np.sum(exps, axis=0)

In [74]:
# Initialize the RNN
rnn = RNN(vocab_size, 2)

inputs = createInputs('i am very good')
out, h = rnn.forward(inputs)
probs = softmax(out)

In [75]:
from pprint import pprint

# Display the probabilities computed for the sample sentence.
pprint(probs)

array([[0.50000387],
       [0.49999613]])


In [76]:
import random


def processData(data, backprop=True):
    """
    Run the module-level `rnn` over every sample in `data`, in random order.

    For each (text, label) pair the text is encoded with `createInputs`,
    passed through `rnn.forward`, and converted to probabilities with
    `softmax`. `int(label)` is used as the index of the true class.
    - @param {dict} data - maps text to a label
    - @param {bool} backprop - when true, call `rnn.backprop` after each
      sample with the gradient of the cross-entropy loss
    - @returns {tuple} (mean loss, accuracy) over `data`
    """
    samples = list(data.items())
    random.shuffle(samples)

    total_loss = 0
    hits = 0

    for text, label in samples:
        encoded = createInputs(text)
        target = int(label)

        # forward pass
        out, _ = rnn.forward(encoded)
        probs = softmax(out)

        # cross-entropy loss of the true class, and whether the top class is right
        total_loss -= np.log(probs[target])[0]
        if np.argmax(probs) == target:
            hits += 1

        if not backprop:
            continue

        # dL/dy for softmax + cross-entropy is p - onehot(target); it is
        # built in place on `probs`, which is not needed after this point.
        grad = probs
        grad[target] -= 1

        rnn.backprop(grad)

    return total_loss / len(data), hits / len(data)

In [78]:
# Train for 1000 epochs; after every 100th epoch report the training metrics
# of that epoch and evaluate on the test set without updating the weights.
for epoch in range(1000):
    train_loss, train_acc = processData(train_data)

    if (epoch + 1) % 100 != 0:
        continue

    print(f"-- Epoch {epoch + 1}")
    print(f"Train Loss: {train_loss:.3f} | Accuracy: {train_acc:.3f}")

    test_loss, test_acc = processData(test_data, backprop=False)
    print(f"Test Loss: {test_loss:.3f} | Accuracy: {test_acc:.3f}")

-- Epoch 100
Train Loss: 0.002 | Accuracy: 1.000
Test Loss: 0.568 | Accuracy: 0.950
-- Epoch 200
Train Loss: 0.002 | Accuracy: 1.000
Test Loss: 0.580 | Accuracy: 0.950
-- Epoch 300
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.589 | Accuracy: 0.950
-- Epoch 400
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.598 | Accuracy: 0.950
-- Epoch 500
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.605 | Accuracy: 0.950
-- Epoch 600
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.612 | Accuracy: 0.950
-- Epoch 700
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.619 | Accuracy: 0.950
-- Epoch 800
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.625 | Accuracy: 0.950
-- Epoch 900
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.631 | Accuracy: 0.950
-- Epoch 1000
Train Loss: 0.001 | Accuracy: 1.000
Test Loss: 0.636 | Accuracy: 0.950


In [79]:
# Run the same sample sentence through the network again, now that the
# training loop has updated its weights, and display the class probabilities.
inputs = createInputs('i am very good')
out, h = rnn.forward(inputs)
probs = softmax(out)
pprint(probs)

array([[0.0025818],
       [0.9974182]])
