In [9]:
# simplest model, performing very poorly

import numpy as np
import pandas as pd
import time
from utils import encoder_cell, decoder_cell, softmax

# Seed NumPy's global RNG so the random weight initialisation and the data
# shuffle further down in this cell are repeatable across runs.
np.random.seed(7)
# Wall-clock start; compared against `toc` at the end of this cell to report
# the total run time.
tic = time.perf_counter()

def accuracy(ytest, ypred):
    """
    Sequence-level accuracy: the fraction of rows for which every element of
    `ytest` equals the corresponding element of `ypred`.

    Both arguments are 2-D arrays of identical shape (one sequence per row).
    """
    elementwise_match = ytest == ypred
    row_fully_correct = np.all(elementwise_match, axis=1)
    n_rows = float(ytest.shape[0])
    return float(np.sum(row_fully_correct) / n_rows)

def accuracy_(ytest, ypred):
    """
    Element-level accuracy: the fraction of individual entries (characters)
    of `ytest` that equal the corresponding entry of `ypred`.
    """
    n_matching = np.sum(ytest == ypred)
    n_entries = float(ytest.size)
    return float(n_matching / n_entries)

class seq2seq():
    """
    Minimal encoder/decoder RNN trained with plain gradient descent.

    The encoder is a chain of `n_encoder_cells` `encoder_cell` objects and the
    decoder a chain of `n_decoder_cells` `decoder_cell` objects. Every encoder
    cell is constructed with the same `Wx`/`Wh` arrays and every decoder cell
    with the same `Wh`/`Wy` arrays. The decoder output at each step is
    `softmax(h . Wy)`.

    Typical use: `feed_data` -> `feed_forward` -> `back_propagation`
    (-> `cross_entropy_loss`), and `predict` for inference.
    """

    def __init__(self, X_shape, Y_shape, H=50, lr=0.0001):
        """
        Parameters
        ----------
        X_shape : tuple
            (n_samples, encoder sequence length, one-hot input width).
        Y_shape : tuple
            (ignored, decoder sequence length, number of output classes).
        H : int
            Width of the hidden state.
        lr : float
            Gradient-descent step size.
        """
        self.encoder = []
        self.decoder = []
        # X is of shape (n_samples, seq_length, onehot_dim)
        self.n_samples, self.n_encoder_cells, self.D = X_shape
        _, self.n_decoder_cells, self.n_classes = Y_shape
        # in this toy example, the onehot_dim is the n_classes
        self.H = H

        # The order of these draws fixes the initial weights for a given seed.
        Wx = np.random.randn(self.D, self.H)
        Wh = np.random.randn(self.H, self.H)
        Wy = np.random.randn(self.H, self.n_classes)

        # Output projection used directly by the forward pass and by
        # back_propagation.
        self.Wy = Wy
        # Initial hidden state, one row per sample of the declared batch size.
        self.h_init = np.random.randn(self.n_samples, self.H)

        for _ in range(self.n_encoder_cells):
            self.encoder.append(encoder_cell(Wx, Wh))
        for _ in range(self.n_decoder_cells):
            self.decoder.append(decoder_cell(Wh, Wy))
        self.lr = lr

    def feed_data(self, X, Y):
        """Store the one-hot input batch `X` and target batch `Y` for training."""
        self.X = X
        self.Y = Y

    def _forward(self, X):
        """
        Run the encoder and decoder on `X` and return the class probabilities,
        shaped (X.shape[0], n_decoder_cells, n_classes).
        """
        # The output buffer follows the *decoder* geometry, so it stays correct
        # when the input and output sequences differ in length or width.
        y_prob = np.zeros((X.shape[0], self.n_decoder_cells, self.n_classes))
        # The encoder consumes the input in reversed time order.
        X_reversed = np.flip(X, axis=1)
        h = self.h_init
        for i, cell in enumerate(self.encoder):
            h = cell.feed_forward(X_reversed[:, i, :], h)
        for i, cell in enumerate(self.decoder):
            h = cell.feed_forward(h)
            y_prob[:, i, :] = softmax(np.dot(h, self.Wy))
        return y_prob

    def feed_forward(self):
        """
        Forward pass over the batch stored by `feed_data`; the resulting
        probabilities are kept in `self.y_prob`.
        """
        self.y_prob = self._forward(self.X)

    def back_propagation(self):
        """
        Compute the gradients of L = L_0 + ... + L_{n_decoder_cells - 1} and
        take one gradient-descent step on every cell's weights.

        Must be called after `feed_forward`: it reads `self.y_prob` and each
        decoder cell's `h_output`.
        """
        dWy = 0
        dh_last = 0  # here h_last is the final hidden vector of the encoder.
        decoder_dWh = []
        for i in range(self.n_decoder_cells):
            # Gradient of L_i with respect to the step-i logits.
            dLt_dlogits = self.y_prob[:, i, :] - self.Y[:, i, :]

            # dLt/dWy = h_i^T . (y_prob_i - Y_i)
            dWy += np.dot(self.decoder[i].h_output.T, dLt_dlogits)

            # Push dLt/dh_i back through decoder cells i, i-1, ..., 0.
            gradient = np.dot(dLt_dlogits, self.Wy.T)
            for cell in reversed(self.decoder[:i + 1]):
                gradient = cell.back_propagation(gradient)
            dh_last += gradient  # gradient is now dLt/dh_last

            # Gradient of L_i with respect to the decoder Wh: the sum of the
            # per-cell dWh values left behind by the backward sweep above.
            dLt_dWh = 0
            for cell in self.decoder[:i + 1]:
                dLt_dWh += cell.dWh
            decoder_dWh.append(dLt_dWh)

        # Backpropagate the accumulated dL/dh_last through the encoder.
        dh = dh_last
        for cell in reversed(self.encoder):
            dh = cell.back_propagation(dh)

        dWx = 0
        dWh = sum(decoder_dWh)
        for cell in self.encoder:
            dWx += cell.dWx
            dWh += cell.dWh

        # NOTE(review): the updates below are applied per cell with in-place
        # `-=`. If the cells in `utils` keep references to the shared arrays
        # handed over in __init__ (rather than copies), every shared matrix is
        # stepped once per cell that holds it, scaling the effective learning
        # rate. If they keep copies, `self.Wy` is never updated. Confirm
        # against the encoder_cell / decoder_cell implementation.
        for cell in self.encoder:
            cell.Wx -= self.lr * dWx
            cell.Wh -= self.lr * dWh

        for cell in self.decoder:
            cell.Wh -= self.lr * dWh
            cell.Wy -= self.lr * dWy

    def predict(self, Xtest):
        """
        Return the arg-max class index for every decoder step, as an integer
        array of shape (Xtest.shape[0], n_decoder_cells).

        `Xtest` is expected to be (n, n_encoder_cells, D). The initial hidden
        state has `n_samples` rows, so `n` must either equal `n_samples` or be
        compatible with it under the cells' arithmetic (presumably NumPy
        broadcasting when n_samples == 1 -- confirm against `utils`).
        """
        return np.argmax(self._forward(Xtest), axis=2)

    def cross_entropy_loss(self):
        """
        Summed cross-entropy over all decoder steps, computed from the
        probabilities stored by the most recent `feed_forward` call.
        """
        # L = \sum_t Lt
        loss = 0
        for i in range(self.n_decoder_cells):
            # The 1e-6 offset keeps log() finite when a probability is 0.
            loss += -np.sum(self.Y[:, i, :] * np.log(self.y_prob[:, i, :] + 1e-6))
        return loss

from sklearn import preprocessing
# ---- Data ------------------------------------------------------------------
# One integer-coded sequence per row.
X = np.loadtxt('X6.txt', delimiter= ' ').astype(int)
n_total, seq_len = X.shape  # taken from the file instead of hard-coding 216 x 3
n_classes = 6               # width of the one-hot encoding (labels 0..5)
n_train = 180               # the first n_train shuffled rows are used for training

# Shuffle the rows once so the train / held-out split is random.
idx = np.arange(n_total)
np.random.shuffle(idx)
X=X[idx]

# Target sequence: element-wise (x + 1) mod 3.
# (Alternative toy task: Y = np.flip(X, axis=1).)
Y = (X+1)%3

# One-hot encode inputs and targets with the same label set.
lb = preprocessing.LabelBinarizer()
lb.fit(range(n_classes))

X_ohe = np.zeros((n_total, seq_len, n_classes))
Y_ohe = np.zeros((n_total, seq_len, n_classes))
for i in range(n_total):
    X_ohe[i] = lb.transform(X[i])
    Y_ohe[i] = lb.transform(Y[i])

# ---- Training --------------------------------------------------------------
batch_size = 1
n_train_batches = n_train // batch_size

toy_seq2seq = seq2seq(X_shape=(batch_size, seq_len, n_classes),
                      Y_shape=(batch_size, seq_len, n_classes),
                      H = 10, lr=0.0001)
epochs = 500
for i in range(epochs):
    score = 0
    loss = 0
    for j in range(n_train_batches):
        batch = slice(batch_size * j, batch_size * (j + 1))
        toy_seq2seq.feed_data(X_ohe[batch], Y_ohe[batch])
        toy_seq2seq.feed_forward()
        toy_seq2seq.back_propagation()
        # Accuracy is measured with a fresh forward pass, i.e. after this
        # batch's weight update; the loss below uses the probabilities from
        # feed_forward(), i.e. before the update.
        y_pred = toy_seq2seq.predict(X_ohe[batch])
        score += accuracy_(y_pred, Y[batch])
        loss += toy_seq2seq.cross_entropy_loss()
    score = score / n_train_batches

    if (i + 1) % 20 == 0:
        print('epoch = {}, current loss = {}, train accuracy = {:.2f}%'.format(i+1, loss, 100*score))

toc = time.perf_counter()
print('Totol time: {:.2f}s'.format(toc-tic))
print('===============================Finish===================================')

epoch = 20, current loss = 654.6354406911717, train accuracy = 58.70%
epoch = 40, current loss = 442.52578313574674, train accuracy = 69.44%
epoch = 60, current loss = 382.85562757549997, train accuracy = 71.85%
epoch = 80, current loss = 345.1368292507025, train accuracy = 73.52%
epoch = 100, current loss = 317.26583164288934, train accuracy = 77.04%
epoch = 120, current loss = 294.53033442283936, train accuracy = 79.26%
epoch = 140, current loss = 274.4854622350271, train accuracy = 80.00%
epoch = 160, current loss = 254.57884718993662, train accuracy = 81.48%
epoch = 180, current loss = 234.77639008827794, train accuracy = 84.44%
epoch = 200, current loss = 212.91580741708577, train accuracy = 87.22%
epoch = 220, current loss = 190.43668778184426, train accuracy = 90.74%
epoch = 240, current loss = 166.04552016215723, train accuracy = 92.04%
epoch = 260, current loss = 143.79490227760144, train accuracy = 93.33%
epoch = 280, current loss = 126.54791076751172, train accuracy = 94.63%

In [10]:
# Held-out evaluation: rows from index 180 onwards were not used for training.
Xtest_ohe = X_ohe[180:]
Ytest = Y[180:]

# Derive the number of batches from the data so the loop stays correct when
# batch_size is not 1 (a fixed count would run over empty slices).
n_test_batches = len(Xtest_ohe) // batch_size
test_score = 0
for i in range(n_test_batches):
    batch = slice(i * batch_size, (i + 1) * batch_size)
    test_score += accuracy_(toy_seq2seq.predict(Xtest_ohe[batch]), Ytest[batch])
# Mean per-character accuracy over the held-out batches (displayed as the cell output).
test_score / n_test_batches

0.9629629629629631

In [43]:
# Predicted class indices for the first ten held-out sequences; the whole
# held-out set is passed to predict() in a single call.
# NOTE(review): the model was built with n_samples = batch_size, so this call
# relies on h_init being compatible with a larger batch (presumably via NumPy
# broadcasting) -- confirm against the cell implementation in utils.
toy_seq2seq.predict(Xtest_ohe)[:10]

array([[0, 2, 1],
       [0, 0, 1],
       [2, 0, 1],
       [2, 0, 0],
       [2, 1, 2],
       [1, 0, 2],
       [1, 2, 0],
       [2, 1, 2],
       [2, 1, 2],
       [0, 0, 1]])

In [44]:
# Integer-coded input sequences for the first ten held-out rows.
# NOTE(review): these are the inputs X, not the targets; compare the
# predictions against Ytest[:10] to judge correctness.
X[180:190]

array([[0, 5, 4],
       [3, 0, 4],
       [5, 0, 1],
       [5, 0, 3],
       [4, 1, 2],
       [1, 3, 2],
       [1, 5, 3],
       [1, 1, 2],
       [5, 1, 2],
       [0, 3, 1]])