In [1]:
import numpy as np
import random

## Purpose and idea
Understand how the RNN works and accurately document the code

# Data preparation
First, I will generate single words.
If that works, I'll move on to sentences.

In [2]:
# Symbols the network can read and emit: the 26 lowercase letters followed by
# a space. Each symbol must appear exactly once, because its position in this
# string is its one-hot index. The space is kept last on purpose: unknown
# characters resolve to index -1, i.e. the last slot.
vocab = 'abcdefghijklmnopqrstuvwxyz '

In [3]:
def find_index(letter, vocab):
    """Look up the position of ``letter`` inside ``vocab``.

    Returns the zero-based index of the first occurrence, or -1 when
    ``vocab`` does not contain ``letter``.
    """
    try:
        return vocab.index(letter)
    except ValueError:
        # Not in the vocabulary: signal it with -1 instead of raising.
        return -1


def word_to_vector(word):
    """Encode a word as a stack of one-hot column vectors.

    The result has shape (len(word), len(vocab), 1). Row ``i`` holds 1.0 at
    the vocabulary index of ``word[i]`` and zeros elsewhere. A character that
    is not in the vocabulary gets index -1 from ``find_index`` and therefore
    marks the last vocabulary slot.
    """
    one_hot = np.zeros((len(word), len(vocab), 1))
    positions = [find_index(symbol, vocab) for symbol in word]
    for step, position in enumerate(positions):
        one_hot[step, position] = 1.0
    return one_hot


def vector_to_word(vec):
    """Decode an array of per-step vectors back into letters.

    For every entry along the first axis, the position of the largest value
    is looked up in the vocabulary. Returns a list of single-character
    strings, one per step (the inverse of ``word_to_vector`` for one-hot
    input).
    """
    return [vocab[np.argmax(vec[step])] for step in range(vec.shape[0])]



In [4]:
def data_reading(file_name):
    """Read a word list and build shuffled (input, target) training pairs.

    The file is expected to hold one word per line. Every non-empty line is
    lowercased and turned into a pair of one-hot arrays of equal length:

    * input  -- the letters of the word,
    * target -- the same letters shifted one step ahead, closed by a space
      that acts as the end-of-word symbol.

    Blank lines are skipped (they would give a zero-length input), and a
    final line without a trailing newline keeps its last letter.

    Parameters
    ----------
    file_name : str
        Path of the text file to read.

    Returns
    -------
    list of (ndarray, ndarray)
        The (input, target) pairs in random order.
    """
    pairs = []
    with open(file_name, 'r') as file:
        for line in file:
            # Strip only the line terminator so inner/trailing spaces survive.
            word = line.rstrip('\n').lower()
            if not word:
                continue
            inputs = word_to_vector(word)
            # Next-letter targets; the explicit space marks the end of the word.
            targets = word_to_vector(word[1:] + ' ')
            pairs.append((inputs, targets))
    random.shuffle(pairs)
    return pairs
# Load and vectorise the training words once; the path is relative to the
# notebook's working directory.
DATA = data_reading('data/simple_words.txt')

In [5]:
def softmax(z):
    """Return the softmax of ``z``, normalised over all of its elements.

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but keeps ``np.exp`` from overflowing to inf
    (and the quotient from becoming nan) for large inputs.

    Parameters
    ----------
    z : array_like
        Unnormalised scores (logits).

    Returns
    -------
    ndarray
        Non-negative values of the same shape as ``z`` that sum to 1.
    """
    z = np.asarray(z)
    if z.size == 0:
        # Nothing to normalise; np.max would raise on an empty array.
        return np.exp(z)
    shifted = np.exp(z - np.max(z))
    return shifted / np.sum(shifted)

class CrossEntropyCost():
    """Cross-entropy cost and its output-layer error for softmax outputs.

    Both functions are static, so they can be called on the class itself
    (as the network does) or on an instance.
    """

    @staticmethod
    def fn(a, y):
        """Return the cross-entropy ``sum(-y * log(a))``.

        ``a`` holds the predicted probabilities and ``y`` the targets, with
        matching shapes. Terms of the form ``0 * log(0)`` evaluate to nan and
        are replaced by 0 through ``np.nan_to_num``; infinite terms become
        very large finite numbers.
        """
        # log(0) is expected here and handled by nan_to_num, so keep numpy
        # from emitting RuntimeWarnings for it.
        with np.errstate(divide='ignore', invalid='ignore'):
            return np.sum(np.nan_to_num(-y * np.log(a)))

    @staticmethod
    def delta(z, a, y):
        """Return the error with respect to the pre-softmax input, ``a - y``.

        ``z`` is accepted for interface compatibility and is not used.
        """
        return a - y
    
    
class RNNetwork():
    """Minimal character-level recurrent network.

    One tanh hidden layer feeds a softmax output layer:

        h_t     = tanh(Wxh @ x_t + Whh @ h_{t-1})
        y_hat_t = softmax(Wha @ h_t)

    Inputs and targets are sequences of column vectors of shape
    (vocab_size, 1). The hidden state ``self.h`` persists between calls;
    use ``reset_state`` to start from zeros again. Biases are not
    implemented yet.
    """

    def __init__(self, vocab_size, hidden_size, bptt_trunc=4, cost=CrossEntropyCost):
        """
        vocab_size is the length of the input and output vectors,
        hidden_size is the number of internal neurons,
        bptt_trunc is stored for a truncated-BPTT variant; ``backprop``
        currently propagates through the whole sequence and does not read it,
        cost is the cost function of this model (must provide ``fn(a, y)``).
        """
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.bptt_trunc = bptt_trunc
        # Weights and internal state initialisation.
        self.Wxh = np.random.randn(hidden_size, vocab_size)
        self.Whh = np.random.randn(hidden_size, hidden_size)
        self.Wha = np.random.randn(vocab_size, hidden_size)
        self.h = np.zeros((hidden_size, 1))
        self.cost = cost

    def reset_state(self):
        """Set the hidden state back to zeros."""
        self.h = np.zeros((self.hidden_size, 1))

    def _step(self, x_t):
        """Advance the hidden state by one input vector and return the softmax output."""
        self.h = np.tanh(self.Wxh @ x_t + self.Whh @ self.h)
        return softmax(self.Wha @ self.h)

    def feedforward(self, x):
        """Return the output of the network as an array of vectors.

        The result has shape (len(x), vocab_size, 1). The hidden state is
        advanced from its current value and left at the last step.
        """
        return self.feedforward_with_data(x)[0]

    def feedforward_with_data(self, x):
        """
        Return the output of the network as an array of vectors together
        with the hidden states the network went through.

        The state array has ``len(x) + 1`` entries: entry ``t`` is the state
        after input ``t`` and the last entry (index -1) is the state the
        network was in before the first input, so ``h[t - 1]`` is valid for
        ``t == 0`` as well.
        """
        time_steps = len(x)
        outputs = np.zeros((time_steps, self.vocab_size, 1))
        states = np.zeros((time_steps + 1, self.hidden_size, 1))
        # Record the real starting state so backprop sees what the forward
        # pass actually used.
        states[-1] = self.h
        for t in range(time_steps):
            outputs[t] = self._step(x[t])
            states[t] = self.h
        return outputs, states

    def loss(self, x, y):
        """Return the loss on a single example: the cost divided by the number of steps."""
        return self.cost.fn(self.feedforward(x), y) / len(x)

    def total_loss(self, data):
        """Return the mean of ``loss`` over a list of (x, y) examples."""
        n = len(data)
        cost = 0.0
        for (x, y) in data:
            cost += self.loss(x, y)
        return cost / n

    def BPTT(self, data, learning_rate=1):
        """
        Train the model with backpropagation through time.

        ``data`` is a list of (x, y) pairs. One gradient-descent update is
        applied per pair, in the order given (a single pass over ``data``).
        """
        params = [self.Wxh, self.Whh, self.Wha]
        for (x, y) in data:
            nabla_params = self.backprop(x, y)
            for (param, nabla_param) in zip(params, nabla_params):
                # In-place update so the arrays held by the network change.
                param -= learning_rate * nabla_param

    def predict(self, initial, time_steps=10):
        """Continue the string ``initial`` by ``time_steps`` generated letters.

        The hidden state is first advanced through the letters of
        ``initial`` (starting from the current state; call ``reset_state``
        beforehand for a fresh start). After that, the most probable next
        letter is chosen at every step and fed back in as the next input.

        Relies on the notebook-level ``word_to_vector`` and ``vocab``; the
        vocabulary length must equal ``vocab_size``.

        Returns the generated continuation as a string (without ``initial``).
        """
        for x_t in word_to_vector(initial):
            self.h = np.tanh(self.Wxh @ x_t + self.Whh @ self.h)
        generated = []
        for _ in range(time_steps):
            probabilities = softmax(self.Wha @ self.h)
            best = int(np.argmax(probabilities))
            generated.append(vocab[best])
            # Feed the chosen letter back in as a one-hot input.
            x_t = np.zeros((self.vocab_size, 1))
            x_t[best] = 1.0
            self.h = np.tanh(self.Wxh @ x_t + self.Whh @ self.h)
        return ''.join(generated)

    def backprop(self, x, y):
        """Return the gradients of the summed cross-entropy for one sequence.

        Runs a forward pass (which advances the hidden state) and then walks
        backwards through time. With ``a_t = Wxh @ x_t + Whh @ h_{t-1}``:

            dL/dh_t = Wha.T @ (y_hat_t - y_t) + Whh.T @ dL/da_{t+1}
            dL/da_t = (1 - h_t ** 2) * dL/dh_t

        The gradient is taken for the cost summed over the steps (it is not
        divided by the sequence length as ``loss`` is) and is not propagated
        into the state that preceded the sequence.

        Returns ``[nabla_Wxh, nabla_Whh, nabla_Wha]``.
        """
        time_steps = len(x)
        nabla_Wxh = np.zeros(self.Wxh.shape)
        nabla_Whh = np.zeros(self.Whh.shape)
        nabla_Wha = np.zeros(self.Wha.shape)
        y_hat, h = self.feedforward_with_data(x)

        # dL/da_{t+1}; zero beyond the last step.
        delta_next = np.zeros((self.hidden_size, 1))
        for t in range(time_steps - 1, -1, -1):
            output_error = y_hat[t] - y[t]
            nabla_Wha += output_error @ h[t].T
            delta_h = self.Wha.T @ output_error + self.Whh.T @ delta_next
            delta_a = (1.0 - h[t] ** 2) * delta_h  # tanh'(a_t) = 1 - h_t^2
            # h[t - 1] is the starting state when t == 0 (stored at index -1).
            nabla_Whh += delta_a @ h[t - 1].T
            nabla_Wxh += delta_a @ x[t].T
            delta_next = delta_a
        return [nabla_Wxh, nabla_Whh, nabla_Wha]

In [23]:
# One input/output unit per vocabulary symbol and 30 hidden units.
net = RNNetwork(len(vocab), 30)

In [24]:
# A single pass over the training pairs with learning rate 0.00005.
net.BPTT(DATA, 0.00005)
# Mean per-example loss after that pass (shown as the cell output).
net.total_loss(DATA)

9.670568384313377

In [37]:
# Print the network's per-step most likely letters next to the target
# letters for training examples 100..109.
for i in range(100, 110):
    inputs, targets = DATA[i]
    out, h = net.feedforward_with_data(np.array(inputs))
    print(vector_to_word(out), vector_to_word(targets))

['u', 'p', 'v', 'c', 'x', 'c', 'g'] ['r', 'o', 'g', 'r', 'a', 'm', ' ']
['b', 'w', 'b', 'f', 'e', 'p', 'f', 'k', ' '] ['n', 'o', 'w', 'l', 'e', 'd', 'g', 'e', ' ']
['b', 'x', 'x', 'f', 'i', 't', 'c', 'w', 'u', 'b'] ['n', 'd', 'e', 'r', 's', 't', 'a', 'n', 'd', ' ']
[' ', 'q'] ['v', ' ']
[' ', 'f', 'k', 'a', ' ', 't'] ['h', 'e', 'o', 'r', 'y', ' ']
['d', 'x', 'c', 'w', 'b', ' ', 'y', 'o'] ['a', 'm', 'p', 'a', 'i', 'g', 'n', ' ']
['t', 'd', 'd', 'h', ' ', 'p', 'h', 'p'] ['p', 'p', 'r', 'o', 'a', 'c', 'h', ' ']
['t', ' '] ['o', ' ']
['p', ' ', 'y', ' ', 'y', 'l', 'x'] ['e', 'c', 'e', 'i', 'v', 'e', ' ']
['y', 'u', 'b', 'x'] ['o', 's', 't', ' ']


Пока полный бардак, но есть план<br>
Проверить правильность формул BPTT<br>
Построить график loss(% complete)<br>
Сменить базу данных<br>
Добавить biases<br>