In [11]:
import pandas as pd
import numpy as np
from mlxtend.data import loadlocal_mnist
import math

np.random.seed(0)

alpha = 0.01
INIT_W = 0.01


dbg = False


In [12]:
def read_mnist_train():
    x, y = loadlocal_mnist(images_path='MNIST/train-images.idx3-ubyte',
            labels_path='MNIST/train-labels.idx1-ubyte')
    return x, y


In [13]:
def read_mnist_test():
    x, y = loadlocal_mnist(images_path='MNIST/t10k-images.idx3-ubyte',
            labels_path='MNIST/t10k-labels.idx1-ubyte')
    return x, y

In [14]:
"""
z = wx + b
"""
class FullyConnected:
    def __init__(self, out_dim):
        self.out_dim = out_dim
        self.a = None
        self.w = None
        self.b = None
        self.z = None
        self.dw = None
        self.db = None

    def forward(self, a):
        if self.w is None or self.b is None:
            self.w = np.random.randn(self.out_dim, a.shape[0])*INIT_W
            self.b = np.zeros((self.out_dim, 1))

        self.a = a
        self.z = np.matmul(self.w, a) + self.b

        if dbg:
            print("fc_forward: ")
            print(self.z.shape)

        return self.z

    def backward(self, dz):
        m = dz.shape[1]
        self.dw = np.matmul(dz, self.a.T)/m
        self.db = np.sum(dz, axis=1, keepdims=True)/m
        da = np.matmul(self.w.T, dz)
        self.w = self.w - alpha*self.dw
        self.b = self.b - alpha*self.db
        return da


In [15]:
"""
a = ReLU(z)
"""
class ReLU:
    def __init__(self):
        self.z = None
        self.a = None

    @staticmethod
    def relu(z):
        r = np.maximum(0, z)
        return r

    @staticmethod
    def relu_derivative(z):
        dz = np.array(z, copy=True)
        dz[dz<=0] = 0
        dz[dz>0] = 1
        return dz

    def forward(self, z):
        self.z = z
        self.a = self.relu(z)
        if dbg:
            print("relu_forward: ")
            print(self.a.shape)
        return self.a

    def backward(self, da):
        dz = np.multiply(da, self.relu_derivative(self.z))
        return dz



In [16]:
def cross_entropy(y_hat, y):
    m = y.shape[1]
    logs = np.multiply(np.log(y_hat),y)
    cost = - np.sum(logs) / m
    return cost

"""
y_hat = e^z/sum(e^x)
"""
class SoftMax:
    def __init__(self):
        self.out_dim = None
        self.z = None
        self.y_hat = None

    def forward(self, z):
        if self.out_dim is None:
            self.out_dim = z.shape[0]
        self.z = z
        self.y_hat = np.exp(z) / np.sum(np.exp(z), axis=0)
        if dbg:
            print("soft_forward: ")
            print(self.y_hat.shape)
        return self.y_hat

    def backward(self, y):
        dz = self.y_hat - y
        return dz

In [17]:
class Convolutional:
    def __init__(self, number_of_filters, filter_dim, stride=1, padding = 0):
        self.number_of_filters = number_of_filters
        self.filter_dim = filter_dim
        self.stride = stride
        self.padding = padding
        self.w = None
        self.b = None
        self.a_prev = None

    @staticmethod
    def add_padd(x, pad):
        x_pad = np.pad(x, ((0,0), (pad, pad), (pad, pad), (0,0)), mode='constant', constant_values = (0,0))
        return x_pad

    @staticmethod
    def conv_single_step(a_slice_prev, w, b):
        s = np.multiply(a_slice_prev, w)
        z = np.sum(s)
        z = z + float(b)
        return z

    def convolve(self, x, y, b, pad,m, stride, nh, nw, nc, f):
        x_p = self.add_padd(x, pad)
        z = np.zeros([m, nh, nw, nc])
        for i in range(m):
            x_cur = x_p[i]
            for h in range(nh):
                v_shuru = stride * h
                v_shesh = v_shuru + f
                for w in range(nw):
                    h_shuru = stride * w
                    h_shesh = h_shuru + f
                    for c in range(nc):
                        x_slice = x_cur[ v_shuru:v_shesh, h_shuru:h_shesh, :]
                        weights = y[:, :, :, c]
                        biases = b[:, :, :, c]
                        z[i, h, w, c] = self.conv_single_step(x_slice, weights, biases)
        return z

    def forward(self, a_prev):
        self.a_prev = a_prev
        (m, nh_prev, nw_prev, nc_prev) = a_prev.shape

        if self.w is None:
            self.w = np.random.randn(self.filter_dim, self.filter_dim, nc_prev, self.number_of_filters)*INIT_W
            self.b = np.zeros((1, 1, 1, self.number_of_filters))

        (fh, fw, nc_prev, nc) = self.w.shape

        stride = self.stride
        pad = self.padding

        nh = int(int(nh_prev + 2*pad - fh)/stride + 1)
        nw = int(int(nw_prev + 2*pad - fw)/stride + 1)

        z = self.convolve(a_prev,self.w,self.b,pad,m,self.stride,nh,nw,nc,fh)

        assert(z.shape == (m, nh, nw, nc))
        if dbg:
            print("conv_forward: ")
            print(z.shape)
        return z

    def backward(self, dz):
        (m, nh_prev, nw_prev, nc_prev) = self.a_prev.shape
        (fh, fw, nc_prev, nc) = self.w.shape
        stride = self.stride
        pad = self.padding
        (m, nh, nw, nc) = dz.shape
        da_prev = np.zeros(self.a_prev.shape)
        dw = np.zeros(self.w.shape)
        db = np.zeros(self.b.shape)
        a_prev_pad = self.add_padd(self.a_prev, pad)
        da_prev_pad = self.add_padd(da_prev, pad)

        for i in range(m):
            a_prev_pad_cur = a_prev_pad[i]
            da_prev_pad_cur = da_prev_pad[i]
            for h in range(nh):
                for w in range(nw):
                    for c in range(nc):
                        v_shuru = stride * h
                        v_shesh = v_shuru + fh
                        h_shuru = stride * w
                        h_shesh = h_shuru + fw

                        a_slice = a_prev_pad_cur[v_shuru:v_shesh,h_shuru:h_shesh,:]
                        da_prev_pad_cur[v_shuru:v_shesh, h_shuru:h_shesh, :] += self.w[:,:,:,c] * dz[i, h, w, c]
                        dw[:,:,:,c] += a_slice * dz[i, h, w, c]
                        db[:,:,:,c] += dz[i, h, w, c]

            if pad > 0:
                da_prev[i, :, :, :] = da_prev_pad_cur[pad:-pad, pad:-pad, :]
            else:
                da_prev[i, :, :, :] = da_prev_pad_cur[:, :, :]

        assert(da_prev.shape == (m, nh_prev, nw_prev, nc_prev))

        self.w = self.w - alpha*dw
        self.b = self.b - alpha*db

        return da_prev



In [18]:
class MaxPool:
    def __init__(self, filter_dim, stride):
        self.filter_dim = filter_dim
        self.stride = stride
        self.a_prev = None

    def forward(self, a_prev):
        self.a_prev = a_prev
        (m, nh_prev, nw_prev, nc_prev) = a_prev.shape
        f = self.filter_dim
        stride = self.stride
        nh = int(1 + (nh_prev - f) / stride)
        nw = int(1 + (nw_prev - f) / stride)
        nc = nc_prev

        a = np.zeros((m, nh, nw, nc))

        for i in range(m):
            a_prev_cur = a_prev[i]
            for h in range(nh):
                v_shuru = stride * h
                v_shesh = v_shuru + f
                for w in range(nw):
                    h_shuru = stride * w
                    h_shesh = h_shuru + f
                    for c in range (nc):
                        a_prev_cur_slice = a_prev_cur[v_shuru:v_shesh,h_shuru:h_shesh,c]
                        a[i, h, w, c] = np.max(a_prev_cur_slice)

        assert(a.shape == (m, nh, nw, nc))
        if dbg:
            print("pool_forward: ")
            print(a.shape)
        return a

    @staticmethod
    def create_mask_from_window(x):
        mask = (x == np.max(x))
        return mask

    def backward(self, da):
        stride = self.stride
        f = self.filter_dim
        (m, nh, nw, nc) = da.shape

        da_prev = np.zeros(self.a_prev.shape)

        for i in range(m):
            a_prev = self.a_prev[i,:,:,:]

            for h in range(nh):
                for w in range(nw):
                    for c in range(nc):
                        v_shuru  = h * stride
                        v_shesh    = v_shuru + f
                        h_shuru = w * stride
                        h_shesh   = h_shuru + f

                        a_prev_slice = a_prev[ v_shuru:v_shesh, h_shuru:h_shesh, c ]
                        mask = self.create_mask_from_window( a_prev_slice )
                        da_prev[i, v_shuru:v_shesh, h_shuru:h_shesh, c] += mask * da[i, h, w, c]

        assert(da_prev.shape == self.a_prev.shape)
        return da_prev

In [19]:
class Flattening:
    def __init__(self):
        self.a_prev = None

    def forward(self, a_prev):
        self.a_prev = a_prev
        m = a_prev.shape[0]
        a = list()
        for i in range(m):
            a.append(np.ravel(a_prev[i,:,:,:]))
        a = np.array(a)
        a = a.T
        if dbg:
            print("flatten_forward: ")
            print(a.shape)
        return a
    """
    (m,h,w,c) -> h*w*c,m -> m,h*w*c ->

    """
    def backward(self, da):
        da = da.T
        da_prev = da.reshape(self.a_prev.shape)
        return da_prev

In [20]:
def modify_label(y):
    out = np.zeros((10, y.shape[0]))
    for i in range(y.shape[0]):
        out[y[i,0], i] = 1
    return out

def encode_level(y_hat):
    for j in range(y_hat.shape[1]):
        mx = -10
        mx_idx = -1
        for i in range(y_hat.shape[0]):
            if y_hat[i,j] > mx:
                mx = y_hat[i,j]
                mx_idx = i
            y_hat[i][j] = 0
        y_hat[mx_idx, j] = 1

    return y_hat

def calc_accuracy(y_hat, y):
    match = 0
    for j in range(y_hat.shape[1]):
        flag = 0
        for i in range(y_hat.shape[0]):
            if y_hat[i,j] != y[i,j]:
                flag = 1
                break
        if flag == 0:
            match += 1

    return match


def run_mnist():
    f1 = open("architecture.txt", "r")
    lines = f1.readlines()
    cnn_layers = list()
    for line in lines:
        words = line.strip().split()
        if words[0].lower() == "fc":
            cnn_layers.append(FullyConnected(int(words[1])))
        elif words[0].lower() == "relu":
            cnn_layers.append(ReLU())
        elif words[0].lower() == "softmax":
            cnn_layers.append(SoftMax())
        elif words[0].lower() == "conv":
            cnn_layers.append(Convolutional( int(words[1]), int(words[2]), int(words[3]), int(words[4])))
        elif words[0].lower() == "pool":
            cnn_layers.append(MaxPool(int(words[1]), int(words[2])))
        elif words[0].lower() == "flatten":
            cnn_layers.append(Flattening())

    f1.close()

    x_mnist_train, y_mnist_train = read_mnist_train()
    batch_sz = 5

    itr_outer = 10

    while itr_outer > 0:
        itr_inner = 100
        for i in range(0,x_mnist_train.shape[0],batch_sz):
            curr_batch_x = x_mnist_train[i:i+batch_sz,:]
            curr_batch_y = y_mnist_train[i:i+batch_sz]

            curr_batch_x = curr_batch_x.reshape((batch_sz, 28, 28, 1))
            curr_batch_y = curr_batch_y.reshape(batch_sz, 1)

            curr_batch_y = modify_label(curr_batch_y)

            prev_a = curr_batch_x
            for layer in cnn_layers:
                prev_a = layer.forward(prev_a)

            prev_derivative = curr_batch_y
            for j in range(len(cnn_layers)-1,0,-1):
                prev_derivative = cnn_layers[j].backward(prev_derivative)

            print(cross_entropy(prev_a, curr_batch_y))

            itr_inner -= 1
            if itr_inner <= 0:
                break

        itr_outer -= 1


    x_mnist_test, y_mnist_test = read_mnist_test()

    total_match = 0

    for i in range(0,x_mnist_test.shape[0],batch_sz):
        curr_batch_x = x_mnist_test[i:i+batch_sz,:]
        curr_batch_y = y_mnist_test[i:i+batch_sz]

        curr_batch_x = curr_batch_x.reshape((batch_sz, 28, 28, 1))
        curr_batch_y = curr_batch_y.reshape(batch_sz, 1)

        curr_batch_y = modify_label(curr_batch_y)

        prev_a = curr_batch_x
        for layer in cnn_layers:
            prev_a = layer.forward(prev_a)

        prev_a = encode_level(prev_a)
        calc_accuracy(prev_a, curr_batch_y)

        total_match += calc_accuracy(prev_a, curr_batch_y)


    print("accuracy: " + str(total_match/10000))



run_mnist()

2.306200911979168
2.3062534136246473
2.2993054118411567
2.3115523834279372
2.301056512761414
2.303533143265494
2.302112543210028
2.3086776436688576
2.300058060909449
2.2986692028910003
2.298737534933602
2.293263062168191
2.29535724486525
2.2921234305608813
2.3048397285165168
2.296534176591899
2.3036391722496448
2.288997770207111
2.297925793930588
2.2714381895795617
2.309068079126524
2.2819583819208145
2.2429943692975813
2.271337940033372
2.1983707395542864
2.461364606902733
2.3262946978383416
2.3163156250206782
2.3078574802944005
2.304801877812649
2.2942334289863275
2.2793805812210444
2.2883576903245917
2.271159645724143
2.3168429782737148
2.2898492585117056
2.286578841671349
2.2770927016784377
2.2575658466573048
2.2561338883219757
2.293398577351996
2.160491730277289
2.148059046707667
2.3212643633502785
2.221671254890685
2.3510532281001932
2.1442892052232656
2.405418362928246
2.188232200208507
2.1297187739140178
2.1809676066177
2.230839856150654
2.1188955994534457
2.3040099671569103
2.

In [21]:
# def read_data(file_path):
#     df = pd.read_csv(file_path, delim_whitespace=True, header=None)
#     num_features = df.shape[1] - 1
#     df = pd.get_dummies(df, columns=[4], drop_first=False)
#     train_dataset = df.to_numpy()
#     x_train = train_dataset[:,:num_features]
#     y_train = train_dataset[:,num_features:]
#
#     x_train = x_train.T
#     y_train = y_train.T
#
#     # print(x_train.shape)
#     # print(y_train.shape)
#     return x_train, y_train


In [22]:
# def encode_level(y_hat):
#     for j in range(y_hat.shape[1]):
#         mx = -10
#         mx_idx = -1
#         for i in range(y_hat.shape[0]):
#             if y_hat[i,j] > mx:
#                 mx = y_hat[i,j]
#                 mx_idx = i
#             y_hat[i][j] = 0
#         y_hat[mx_idx, j] = 1
#
#     return y_hat
#
# def calc_accuracy(y_hat, y):
#     match = 0
#     for j in range(y_hat.shape[1]):
#         flag = 0
#         for i in range(y_hat.shape[0]):
#             if y_hat[i,j] != y[i,j]:
#                 flag = 1
#                 break
#         if flag == 0:
#             match += 1
#
#     print("accuracy: " + str(match/y_hat.shape[1]))
#
#
#
#
# def runcnn():
#     f1 = open("architecture.txt", "r")
#     lines = f1.readlines()
#     cnn_layers = list()
#     for line in lines:
#         words = line.strip().split()
#         if words[0].lower() == "fc":
#             cnn_layers.append(FullyConnected(int(words[1])))
#         elif words[0].lower() == "relu":
#             cnn_layers.append(ReLU())
#         elif words[0].lower() == "softmax":
#             cnn_layers.append(SoftMax())
#
#     f1.close()
#
#     x, y = read_data("Toy Dataset/trainNN.txt")
#     itr_limit = 10000
#     for itr in range(itr_limit):
#         prev_a = x
#         for layer in cnn_layers:
#             prev_a = layer.forward(prev_a)
#
#         prev_derivative = y
#         for i in range(len(cnn_layers)-1,0,-1):
#             prev_derivative = cnn_layers[i].backward(prev_derivative)
#
#         if itr % 500 == 0:
#             print(cross_entropy(prev_a, y))
#
#     prev_a = encode_level(prev_a)
#     calc_accuracy(prev_a, y)
#
#     x_test, y_test = read_data("Toy Dataset/testNN.txt")
#
#     prev_a = x_test
#     for itr in range(itr_limit):
#         prev_a = x
#         for layer in cnn_layers:
#             prev_a = layer.forward(prev_a)
#
#     prev_a = encode_level(prev_a)
#     calc_accuracy(prev_a, y_test)
#

In [23]:
# """
# run cnn
# """
# runcnn()