In [21]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from functools import reduce
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

import gzip
import os
import pickle
from urllib import request

In [22]:
filename = [
    ["training_images", "train-images-idx3-ubyte.gz"],
    ["test_images", "t10k-images-idx3-ubyte.gz"],
    ["training_labels", "train-labels-idx1-ubyte.gz"],
    ["test_labels", "t10k-labels-idx1-ubyte.gz"]
]


def download_mnist():
  base_url = "http://yann.lecun.com/exdb/mnist/"
  for name in filename:
    print("Downloading " + name[1] + "...")
    request.urlretrieve(base_url + name[1], name[1])
  print("Download complete.")

"""
def mnist_init():
  mnist = {}
  for name in filename[:2]:
    with gzip.open(name[1], 'rb') as f:
        mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1, 28 * 28)
  for name in filename[-2:]:
    with gzip.open(name[1], 'rb') as f:
        mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)

  return mnist
"""

def mnist_init():
    if not os.path.isfile("mnist.pkl"):
        download_mnist()
        save_mnist()
    else:
        print("Dataset already downloaded, delete mnist.pkl if you want to re-download.")

def save_mnist():
    mnist = {}
    for name in filename[:2]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1, 28 * 28)
    for name in filename[-2:]:
        with gzip.open(name[1], 'rb') as f:
            mnist[name[0]] = np.frombuffer(f.read(), np.uint8, offset=8)

    for _, gz_file in filename:
        os.remove(gz_file)

    with open("mnist.pkl", 'wb') as f:
        pickle.dump(mnist, f)
    print("Save complete.")

def mnist_load():
    with open("mnist.pkl", 'rb') as f:
        mnist = pickle.load(f)
    return mnist["training_images"], mnist["training_labels"], mnist["test_images"], mnist["test_labels"]


In [23]:

class Sigmoid:
  def apply(self, x):
    return np.where(x >= 0, 1 / (1 + np.exp(-x)), np.exp(x) / (1 + np.exp(x)))

  def df(self, x):
    y = self.apply(x)
    return y * (1 - y)


class Relu:
  def apply(self, x):
    return np.maximum(0, x)

  def df(self, x):
    return np.where(x <= 0, 0, 1)


class SoftMax:
  def apply(self, x):
    y = np.exp(x - np.max(x, axis=1, keepdims=True))
    return y / np.sum(y, axis=1, keepdims=True)

sigmoid = Sigmoid()
relu = Relu()
softmax = SoftMax()

In [24]:
class FullyConnected:

  def __init__(self, size, activation):
    self.size = size
    self.activation = activation
    self.is_softmax = isinstance(self.activation, SoftMax)
    self.cache_aprev,self.cache_z,self.cache_a=None,None,None
    self.w = None
    self.b = None

  def init(self, in_dim):
    # He initialization
    self.w = np.random.randn(self.size, in_dim) * np.sqrt(2 / in_dim)
    self.b = np.zeros((1, self.size))
    print("in",in_dim)
    print("in",self.size)

  def forward(self, a_prev, training):
    z = np.dot(a_prev, self.w.T) + self.b
    a = self.activation.apply(z)

    if training:
      self.cache_aprev,self.cache_z,self.cache_a= a_prev, z,a

    return a

  def backward(self, da):
    a_prev, z,a = self.cache_aprev,self.cache_z,self.cache_a
    batch_size = a_prev.shape[0]

    if self.is_softmax:
      y = da * (-a)
      dz = a - y
    else:
      dz = da * self.activation.df(z)

    dw = 1 / batch_size * np.dot(dz.T, a_prev)
    db = 1 / batch_size * dz.sum(axis=0, keepdims=True)
    da_prev = np.dot(dz, self.w)

    return da_prev, dw, db

  def update_params(self, dw, db):
    self.w -= dw
    self.b -= db

  def get_params(self):
    return self.w, self.b

  def get_output_dim(self):
    return self.size


In [25]:
epsilon = 1e-20

class SoftmaxCrossEntropy:
  def apply(self, a_last, y):
    batch_size = y.shape[0]
    cost = -1 / batch_size * (y * np.log(np.clip(a_last, epsilon, 1.0))).sum()
    return cost
  """
  def grad(self, a_last, y):
    return a_last-y
  """
  def grad(self, a_last, y):
    return - np.divide(y, np.clip(a_last, epsilon, 1.0))  


class SigmoidCrossEntropy:
    def apply(self, a_last, y):
        batch_size = y.shape[0]
        # It would be better to have the logits and use this instead
        # max(logits, 0) - logits * y + log(1 + exp(-abs(logits)))
        a_last = np.clip(a_last, epsilon, 1.0 - epsilon)
        cost = -1 / batch_size * (y * np.log(a_last) + (1 - y) * np.log(1 - a_last)).sum()
        return cost

    def grad(self, a_last, y):
        a_last = np.clip(a_last, epsilon, 1.0 - epsilon)
        return - (np.divide(y, a_last) - np.divide(1 - y, 1 - a_last))

softmax_cross_entropy = SoftmaxCrossEntropy()
sigmoid_cross_entropy = SigmoidCrossEntropy()

In [26]:
class GradientDescent:
  def __init__(self, trainable_layers):
    self.trainable_layers = trainable_layers

  def initialize(self):
    pass

  def update(self, learning_rate, w_grads, b_grads, step):
    for layer in self.trainable_layers:
      layer.update_params(dw=learning_rate * w_grads[layer],db=learning_rate * b_grads[layer])

gradient_descent = GradientDescent

In [27]:
class NeuralNetwork:
  def __init__(self, input_dim, layers, cost_function, optimizer=gradient_descent):
    self.layers = layers
    self.w_grads = {}
    self.b_grads = {}
    self.cost_function = cost_function
    self.optimizer = optimizer

    self.layers[0].init(input_dim)

    for prev_layer, curr_layer in zip(self.layers, self.layers[1:]):
      curr_layer.init(prev_layer.get_output_dim())

    self.trainable_layers = set(layer for layer in self.layers if layer.get_params() is not None)
    self.optimizer = optimizer(self.trainable_layers)
    self.optimizer.initialize()

  def forward_prop(self, x, training=True):
    a = x
    for layer in self.layers:
      a = layer.forward(a, training)
    return a
  
  def backward_prop(self, a_last, y):
    da=self.cost_function.grad(a_last, y)
    batch_size = da.shape[0]
    
    for layer in reversed(self.layers):
      da_prev, dw, db = layer.backward(da)
      
      if layer in self.trainable_layers:
        self.w_grads[layer] = dw
        self.b_grads[layer] = db

      da = da_prev
  
  def predict(self, x):
    a_last = self.forward_prop(x, training=False)
    return a_last
  
  def update_param(self, learning_rate, step):
    self.optimizer.update(learning_rate, self.w_grads, self.b_grads, step)
  
  def compute_cost(self, a_last, y):
    cost = self.cost_function.apply(a_last, y)
    return cost

  
  def train_step(self, x_train, y_train, learning_rate, step):
    a_last = self.forward_prop(x_train, training=True)
    self.backward_prop(a_last, y_train)
    self.update_param(learning_rate, step)
    
  def train(self, x_train, y_train, learning_rate, num_epochs, test_data):
    x_test, y_test = test_data
    costs=[]
    costs_test=[]
    accuracyTrain=[]
    accuracyTest=[]
    step = 0
    for e in range(num_epochs):
      print("Epoch " + str(e + 1))
      epoch_cost = 0

      step += 1
      self.train_step(x_train, y_train, learning_rate, step)
      
      costTest,accuracy_test=self.validate_model(x_test, y_test)
      costs_test.append(costTest)
      accuracyTest.append(accuracy_test)

      cost,accuracy_train=self.validate_model(x_train, y_train)
      costs.append(cost)
      accuracyTrain.append(accuracy_train)
      
      print(f"\nCost after epoch {e+1}: test= {costTest} , train = {cost}")
      
      print(f"Accuracy on test set: {accuracy_test}")
      print("Finished training")
    self.visualizeCost(costs,costs_test)
    self.visualizeAccuracy(accuracyTrain,accuracyTest)

  def validate_model(self,x, y):
    cost=0
    accuracy=0
    size=x.shape[0]
    prediction=self.predict(x)#argmax no one hot
    cost+=self.cost_function.apply(prediction, y)
    accuracy+=np.sum(np.argmax(prediction, axis=1) == np.argmax(y, axis=1))
    accuracy=accuracy/size
    return cost,accuracy
  
  def visualizeCost(self,costs,costs_test):
    plt.plot(costs)
    plt.plot(costs_test)
    plt.ylabel('cost')
    plt.legend(["train", "test"])
    plt.show()
  
  def visualizeAccuracy(self,accuracyTrain,accuracyTest):
    plt.plot(accuracyTrain)
    plt.plot(accuracyTest)
    plt.ylabel('accuracy')
    plt.legend(["train", "test"])
    plt.show()
  



  

In [28]:
def one_hot(x, num_classes=10):
  out = np.zeros((x.shape[0], num_classes))
  out[np.arange(x.shape[0]), x[:, 0]] = 1
  return out

def preprocess(x_train, y_train, x_test, y_test):
  x_train = x_train.reshape(x_train.shape[0], 28* 28).astype(np.float32)
  x_test = x_test.reshape(x_test.shape[0], 28* 28).astype(np.float32)
  y_train = one_hot(y_train.reshape(y_train.shape[0], 1))
  y_test = one_hot(y_test.reshape(y_test.shape[0], 1))
  x_train /= 255
  x_test /= 255
  return x_train, y_train, x_test, y_test

mnist_init()
x_train, y_train, x_test, y_test= preprocess(*mnist_load())

Dataset already downloaded, delete mnist.pkl if you want to re-download.


In [None]:

#conv c_filter_size,c_channels,c_filters,c_stride,c_pad,activation=Relu()
cnn = NeuralNetwork(
    input_dim=(x_train.shape[1]),
    layers=[
            FullyConnected(128, relu),
            FullyConnected(64, relu),
            FullyConnected(10, softmax)      
        ],
        cost_function=softmax_cross_entropy,
        optimizer=gradient_descent
        
    )

cnn.train(x_train, y_train,
          learning_rate=0.01,
          num_epochs=100,
          test_data=(x_test, y_test))



In [30]:
pred_test=np.argmax(cnn.predict(x_test),axis=1)
print(classification_report(np.argmax(y_test, axis=1), pred_test))

              precision    recall  f1-score   support

           0       0.68      0.90      0.77       980
           1       0.66      0.98      0.79      1135
           2       0.76      0.69      0.72      1032
           3       0.87      0.48      0.62      1010
           4       0.50      0.57      0.53       982
           5       0.65      0.33      0.44       892
           6       0.79      0.80      0.80       958
           7       0.74      0.34      0.46      1028
           8       0.50      0.71      0.59       974
           9       0.54      0.62      0.58      1009

    accuracy                           0.65     10000
   macro avg       0.67      0.64      0.63     10000
weighted avg       0.67      0.65      0.63     10000



In [31]:
"""
def preprocess1(x_train, y_train, x_test, y_test):
  x_train, x_dev, y_train, y_dev = train_test_split(x_train, y_train, test_size=0.15)
  x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype(np.float32)
  x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype(np.float32)
  x_dev = x_dev.reshape(x_dev.shape[0], 28, 28, 1).astype(np.float32)
  x_train /= 255
  x_test /= 255
  x_dev /= 255
  return x_train, y_train, x_test, y_test,x_dev,y_dev

mnist_init()
x_train, y_train, x_test, y_test,x_dev,y_dev = preprocess1(*mnist_load())
pred_train=np.argmax(cnn.predict(x_train),axis=1)
print(classification_report(y_train, pred_train))
"""

'\ndef preprocess1(x_train, y_train, x_test, y_test):\n  x_train, x_dev, y_train, y_dev = train_test_split(x_train, y_train, test_size=0.15)\n  x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype(np.float32)\n  x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype(np.float32)\n  x_dev = x_dev.reshape(x_dev.shape[0], 28, 28, 1).astype(np.float32)\n  x_train /= 255\n  x_test /= 255\n  x_dev /= 255\n  return x_train, y_train, x_test, y_test,x_dev,y_dev\n\nmnist_init()\nx_train, y_train, x_test, y_test,x_dev,y_dev = preprocess1(*mnist_load())\npred_train=np.argmax(cnn.predict(x_train),axis=1)\nprint(classification_report(y_train, pred_train))\n'