In [1]:
import csv
import numpy as np
import random

from numpy.random import randn

#### Data Pre-processing

In [13]:
class Dataset:
    def __init__(self, filename):
        self.vocab_size = 0
        
        # read data from csv and process text to bool.
        self.raw_data = self._read_data_from_csv(filename)
        self.data = self._text_to_boolean(self.raw_data)
        
        # assign index to words.
        self.word_idx = self._assign_word_to_index(self.data)


    def _read_data_from_csv(self, filename):
        """Read raw data from CSV."""

        with open(filename) as file:
            reader = csv.reader(file)
            data = {row[1]:row[2] for row in reader}

        return data
    
    
    def _text_to_boolean(self, data):
        """Read the second column and turn True/False from text to boolean."""

        processed_data = {}

        for text, label in data.items():
            if label == "True":
                processed_data[text] = True
            else:
                processed_data[text] = False

        return processed_data


    def _assign_word_to_index(self, data):
        """Preprocess data by assigning index to words."""
        
        text = list(data.keys())
        
        # get all unique words in this dataset.
        words = []
        
        for t in text:
            word = t.split(" ")
            words.extend(word)
            
        vocabulary = list(set(words))
        
        # assign indices to each word.
        word_to_idx = {w:i for i, w in enumerate(vocabulary)}
        idx_to_word = {i:w for i, w in enumerate(vocabulary)}
        
        output = {
            "word_to_idx": word_to_idx,
            "idx_to_word": idx_to_word,
        }
        
        # update vocabulary size.
        self.vocab_size = len(vocabulary)
        
        return output
    
    
    def one_hot_vector(self, words):
        """Create one-hot vectors from words."""

        inputs = []
        for word in words.split(" "):
            # set all word indices for this vector to 0.
            vector = np.zeros((self.vocab_size, 1))

            # set the current word index as 1.
            vector[word_to_idx[word.lower()]] = 1
            inputs.append(vector)

        return inputs

#### RNN class.

In [16]:
class RNN:
    """Vanilla Recurrent Neural Network."""
    
    def __init__(self, input_size, output_size, hidden_size=64):
        """Initialise weights and bias."""
        
        # weights.
        self.Whh = randn(hidden_size, hidden_size) / 1000
        self.Wxh = randn(hidden_size, input_size) / 1000
        self.Why = randn(output_size, hidden_size) / 1000
        
        # biases.
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))
        
        # previous inputs and hidden states.
        self.last_inputs = []
        self.last_hidden_state = {}
        
        
    def forward(self, inputs):
        """Perform forward pass of RNN with given inputs.
        
        It returns the final output and hidden state.
        Let 'y' be the output, and 'h' be the hidden state.
        """
        
        h = np.zeros((self.Whh.shape[0], 1))
        
        self.last_inputs = inputs
        self.last_hidden_state = {0: h}
        
        # perform each step of RNN.
        for i, x in enumerate(inputs):
            h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh)
            
            self.last_hidden_state[i + 1] = h
            
        # compute output.
        y = self.Why @ h + self.by
            
        return y, h

        
    def backprop(self, d_y, learn_rate=2e-2):
        """Perform back propogation of RNN.
        
        d_y (dL / dy) has a shape (output_size, 1).
        learn_rate is a float.
        """

        n = len(self.last_inputs)
        
        # calculate dl/dWhy and dl/dby.
        d_Why = d_y @ self.last_hidden_state[n].T
        d_by = d_y
        
        # initialise dl/dWhh, dL/dWxh, and dL/dbh to zero.
        d_Whh = np.zeros(self.Whh.shape)
        d_Wxh = np.zeros(self.Wxh.shape)
        d_bh = np.zeros(self.bh.shape)
        
        # Calculate dL/dh for the previous h.
        d_h = self.Why.T @ d_y
        
        # back propagate through time.
        for t in reversed(range(n)):
            # intermediate value: dL/dh * (1 - h^2).
            temp = ((1 - self.last_hidden_state[t+1] ** 2) * d_h)
            
            # dL/db = dL/dh * (1 - h^2).
            d_bh += temp
            
            # dL/dWhh = dl/dh * (1 - h^2) * h_{t-1}.
            d_Whh += temp @ self.last_hidden_state[t].T
            
            # dL/dWxh = dL/dh * (1 - h^2) * x
            d_Wxh += temp @ self.last_inputs[t].T
            
            # Next dL/dh = dL/dh * (1 - h^2) * Whh
            d_h = self.Whh @ temp
            
        # clip to prevent exploding gradients.
        for d in [d_Wxh, d_Whh, d_Why, d_bh, d_by]:
            np.clip(d, -1, 1, out=d)
            
        # update weights and biases using gradient descent.
        self.Whh -= learn_rate * d_Whh
        self.Wxh -= learn_rate * d_Wxh
        self.Why -= learn_rate * d_Why
        self.bh -= learn_rate * d_bh
        self.by -= learn_rate * d_by
        

    def softmax(self, array):
        """Applies softmax to the input array."""

        softmax = np.exp(self.array) / sum(np.exp(self.array))

        return softmax


#### create training dataset.

In [17]:
train_data = Dataset("data/train_data.csv")
test_data = Dataset("data/test_data.csv")

In [55]:
def process_data(data, backprop=True):
    """Returns RNN's loss and accuracy for the given data.
    
    Args:
        data (dict): mapping text to True/False (good/bad).
        backprop (bool): run with back propagation.
    """
    
    items = list(data.items())
    random.shuffle(items)

    loss = 0
    num_correct = 0
    
    for text, label in items:
        inputs = one_hot_vector(text, len(vocab))
        target = int(label)

        # forward pass.
        output, _ = rnn.forward(inputs)
        probability = rnn.softmax(output)
        
        # calculate loss / accuracy.
        loss -= np.log(probability[target])
        num_correct += int(np.argmax(probability) == target)
        
        if backprop:
            # build dL/dy.
            dL_dy = probability
            dL_dy[target] -= 1

            # back propagation.
            rnn.backprop(dL_dy)
        
    return loss/len(data), num_correct/len(data)

In [56]:
# initialise homemade RNN.
rnn = RNN(train_data.vocab_size, 2)

# training loop.
for epoch in range(1000):
    train_loss, train_accuracy = process_data(train_data)
    
    if epoch % 100 == 99:
        print("-- Epoch {}".format(epoch + 1))
        print("Train:\tLoss %.3f | Accuracy: %.3f" % (train_loss, train_accuracy))

        test_loss, test_accuracy = process_data(test_data, backprop=False)
        print("Test:\tLoss %.3f | Accuracy: %.3f" %(test_loss, test_accuracy))

-- Epoch 100
Train:	Loss 0.689 | Accuracy: 0.534
Test:	Loss 0.697 | Accuracy: 0.500
-- Epoch 200
Train:	Loss 0.660 | Accuracy: 0.655
Test:	Loss 0.740 | Accuracy: 0.650
-- Epoch 300
Train:	Loss 0.586 | Accuracy: 0.638
Test:	Loss 0.670 | Accuracy: 0.600
-- Epoch 400
Train:	Loss 0.377 | Accuracy: 0.879
Test:	Loss 0.754 | Accuracy: 0.550
-- Epoch 500
Train:	Loss 0.336 | Accuracy: 0.828
Test:	Loss 0.552 | Accuracy: 0.650
-- Epoch 600
Train:	Loss 0.578 | Accuracy: 0.672
Test:	Loss 0.712 | Accuracy: 0.600
-- Epoch 700
Train:	Loss 0.338 | Accuracy: 0.897
Test:	Loss 0.304 | Accuracy: 0.950
-- Epoch 800
Train:	Loss 0.034 | Accuracy: 1.000
Test:	Loss 0.113 | Accuracy: 0.950
-- Epoch 900
Train:	Loss 0.008 | Accuracy: 1.000
Test:	Loss 0.079 | Accuracy: 0.950
-- Epoch 1000
Train:	Loss 0.004 | Accuracy: 1.000
Test:	Loss 0.028 | Accuracy: 1.000
