In [13]:
import numpy as np
from tqdm import tqdm

In [67]:
class rnn:
  def __init__(self, input_nodes, hidden_nodes, output_nodes):
    # since the dataset contains only 2 distinct values {0, 1}
    self.input_nodes = input_nodes
     # I like two
    self.hidden_nodes = hidden_nodes
    # I am not explaining this
    self.output_nodes = output_nodes
    # weights connecting input to hidden layer
    self.hi = np.random.randn(self.hidden_nodes, self.input_nodes)*0.01
    # weights connecting hidden to hidden layer, the recurrent connection
    self.hh = np.random.randn(self.hidden_nodes, self.hidden_nodes)*0.01
    # weights connecting hidden to output layer
    self.oh = np.random.randn(self.output_nodes, self.hidden_nodes)*0.01
    # hidden layer bias
    self.b1 = np.zeros((self.hidden_nodes, ))
    # output layer bias
    self.b2 = np.zeros((self.output_nodes, ))

  '''
  X: 2d numpy array of shape (no. of examples in dataset, input nodes)
  H_prev: prev timestep hidden state values
  h: 2d numpy array of shape (no. of examples in dataset, hidden nodes)
  o: 2d numpy array of shape (no. of examples in dataset, output nodes)
  '''
  def forward(self, X, H_prev):
    h = np.matmul(X, self.hi.T) + np.matmul(H_prev, self.hh.T) + self.b1
    h = np.tanh(h)
    o = np.matmul(h, self.oh.T) + self.b2
    o = 1 / (1 + np.exp(-o)) # sigmoid of o
    return h, o

  def error(self, Y, O):
    N = O.shape[1] # number of examples
    err = ((Y - O[-1])**2).sum()/ N
    return err

  def backward(self, T, X, Y, O, H):
    N = O.shape[1] # number of examples
    delta = np.zeros((N, self.hidden_nodes)) # (number of examples, hidden nodes)

    delta = -2*(Y - O[T-1])*O[T-1]*(1 - O[T-1]) # (number of examples, output nodes)
    self.doh += np.matmul(delta.T, H[T-1])/ N
    self.db2 += delta.T.sum(axis = 1)/ N
    delta = np.matmul(delta, self.oh) # (number of examples, hidden nodes)
    
    for t in range(T-1, -1, -1):
      delta *= (1 - H[t]**2)
      self.dhh += np.matmul(delta.T, H[t-1])/ N
      self.dhi += np.matmul(delta.T, X[t]) / N
      self.db1 += delta.T.sum(axis = 1)/ N

      delta = np.matmul(delta, self.hh) # (number of examples, hidden nodes)

  def clip_gradients(self):
    self.doh.clip(-1, 1, out = self.doh)
    self.dhh.clip(-1, 1, out = self.dhh)
    self.dhi.clip(-1, 1, out = self.dhi)
    self.db1.clip(-1, 1, out = self.db1)
    self.db2.clip(-1, 1, out = self.db2)

  def zero_gradients(self):
    self.doh = np.zeros((self.output_nodes, self.hidden_nodes))
    self.dhh = np.zeros((self.hidden_nodes, self.hidden_nodes))
    self.dhi = np.zeros((self.hidden_nodes, self.input_nodes))
    self.db1 = np.zeros((self.hidden_nodes, ))
    self.db2 = np.zeros((self.output_nodes, ))

  def update_parameters(self, alpha):
    self.oh -= (alpha*self.doh)
    self.hh -= (alpha*self.dhh)
    self.hi -= (alpha*self.dhi)
    self.b1 -= (alpha*self.db1)
    self.b2 -= (alpha*self.db2)

In [5]:
import numpy as np
from urllib import request
import gzip
import pickle

filename = [
["training_images","train-images-idx3-ubyte.gz"],
["test_images","t10k-images-idx3-ubyte.gz"],
["training_labels","train-labels-idx1-ubyte.gz"],
["test_labels","t10k-labels-idx1-ubyte.gz"]
]

def download_mnist():
    base_url = "http://yann.lecun.com/exdb/mnist/"
    for name in filename:
        print("Downloading "+name[1]+"...")
        request.urlretrieve(base_url+name[1], name[1])
    print("Download complete.")

def save_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28*28)
    for name in filename[-2:]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)
    with open("mnist.pkl", 'wb') as f:
        pickle.dump(mnist,f)
    print("Save complete.")

def init():
    download_mnist()
    save_mnist()

def load():
    with open("mnist.pkl",'rb') as f:
        mnist = pickle.load(f)
    return mnist["training_images"], mnist["training_labels"], mnist["test_images"], mnist["test_labels"]

if __name__ == '__main__':
    init()

Downloading train-images-idx3-ubyte.gz...
Downloading t10k-images-idx3-ubyte.gz...
Downloading train-labels-idx1-ubyte.gz...
Downloading t10k-labels-idx1-ubyte.gz...
Download complete.
Save complete.


In [None]:
T = 28 # time steps
x_train, y_train, x_test, y_test = load()
N = 60000 # no. of training examples
N_TEST = 10000 # no. of testing examples

INPUT_NODES = 28
OUTPUT_NODES = 10
HIDDEN_NODES = 256

BATCH_SIZE = 60
BATCHES = int(N/BATCH_SIZE)

if __name__ == "__main__":
    # dataset preparation
    X = x_train[:N].reshape(N, T, INPUT_NODES)/ 255
    Y = np.zeros((N, OUTPUT_NODES))
    for i in range(N):
        temp = np.zeros((OUTPUT_NODES, ))
        temp[y_train[i]] = 1
        Y[i] = temp
    
    model = rnn(INPUT_NODES, HIDDEN_NODES, OUTPUT_NODES)
    
#---------------TRAINING THE MODEL---------------------------------------------------------------
    EPOCHS = 500
    ALPHA = 0.001
    for epoch in range(EPOCHS):
        # shuffle the examples
        perm = np.random.permutation(len(X))
        X = X[perm]
        Y = Y[perm]
        loss = 0.
        
        for batch in range(BATCHES):
            H = np.zeros((T+1, BATCH_SIZE, model.hidden_nodes))
            O = np.zeros((T, BATCH_SIZE, model.output_nodes))

            X_B = X[batch*BATCH_SIZE:batch*BATCH_SIZE + BATCH_SIZE].transpose(1,0,2)
            Y_B = Y[batch*BATCH_SIZE:batch*BATCH_SIZE + BATCH_SIZE]
            
            for t in range(T):
                H[t], O[t] = model.forward(X_B[t], H[t-1])

            # ensure gradients are initialised to zero
            model.zero_gradients()
        
            # compute gradients
            model.backward(T, X_B, Y_B, O, H)
        
            # clip the gradients before clipping
            model.clip_gradients()
        
            # update parameters
            model.update_parameters(ALPHA)
        
            # print error after each epoch
            if batch%50 == 0:
                print(f"Error after epoch {epoch}, batch {batch}: {model.error(Y_B, O)}", O.max(), O.min(), H.max(), H.min())
            loss += (model.error(Y_B, O))/ BATCHES

        print(f"Error after epoch {epoch}: {loss}")

  #----------------------------------------------------------------------------------------------

Error after epoch 0, batch 0: 2.5000016664254425 0.5039698123604776 0.49651072014293113 0.13938390685832136 -0.12940840040156912
Error after epoch 0, batch 50: 2.479563451891528 0.5014570136256268 0.49387623192117003 0.12872146941474968 -0.11796335236333841
Error after epoch 0, batch 100: 2.459449619075475 0.4990166115463357 0.49160669952982317 0.12119285433288202 -0.11902972107201065
Error after epoch 0, batch 150: 2.4394436828844177 0.4966484748522942 0.48856503516842936 0.12960517001076566 -0.12189716122555781
Error after epoch 0, batch 200: 2.419904546841768 0.4940530935719061 0.48589034415183624 0.1371800906614768 -0.1213270625567633
Error after epoch 0, batch 250: 2.400334109287284 0.4915886266169775 0.4831964782649198 0.13826237721305762 -0.13322775762073413
Error after epoch 0, batch 300: 2.3812209851486466 0.4887984956037161 0.48057865411649314 0.14523059965419588 -0.13015649271168836
Error after epoch 0, batch 350: 2.362464893831596 0.48627143395439937 0.4781337338005475 0.12

In [60]:
# testing the model
print("Testing the model")
N_TEST = 10000
# dataset preparation
X_TEST = x_test[:N_TEST].reshape(N_TEST, T, INPUT_NODES)/ 255
X_TEST = X_TEST.transpose(1, 0, 2)
Y_TEST = y_test[:N_TEST].T
# pp = rnn(INPUT_NODES, HIDDEN_NODES, OUTPUT_NODES)

# H_TEST = np.zeros((T+1, N_TEST, model.hidden_nodes))
# O_TEST = np.zeros((T, N_TEST, model.output_nodes))

# for t in tqdm(range(T), desc = f"Running forward pass.."):
#     H_TEST[t], O_TEST[t] = pp.forward(X_TEST[t], H_TEST[t-1])

correct_ans = (np.argmax(O_TEST[-1], axis = 1) == Y_TEST).sum()
accuracy = (correct_ans/ N_TEST) * 100
print(correct_ans)
print(f"Test Accuracy: {accuracy}")
# print(np.argmax(O_TEST[-1], axis = 1)[0])
# print(Y_TEST[:100])
# print(O_TEST[-1][:3])
print(H_TEST[4][4] == H_TEST[4][5])
# print(Y_TEST[:2])
# print(X_TEST[1][] == X_TEST[][])

Testing the model
942
Test Accuracy: 9.42
[ True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  True  True  True  True  True  True  True  True  True  True  True
  True  T