In [1]:
import numpy as np
import pandas as pd

In [2]:
#help classes and functions (same as ANN).
##one-hot-encoding
class one_hot_encoder:
    def __init__(self):
        pass
    def fit(self,y_train):
    #fit y_train, find set{y_train} and assign elements of this set with one-hot-vectors.
       label_set = set(y_train)
    #assign index to labels:
       self.label_list = list(label_set)
    
    def transform(self,y): #apply one-hot-encoding to y_train or y_test
        #the shape of output matrix is (y.shape[0],len(self.label_list))
        ohe_matrix = np.zeros((y.shape[0],len(self.label_list)))
        for label,vector in zip(y,ohe_matrix):
            #find index of label and add 1 to vector[index]
            index = self.label_list.index(label)
            vector[index]+=1
        return ohe_matrix.astype(int)
    
def ohe(y_train,y_test):
    ohencoder = one_hot_encoder()
    ohencoder.fit(y_train)
    return ohencoder.transform(y_train),ohencoder.transform(y_test)
### normalize feature
class StandardScaler:
    def __init__(self):
        pass
    def fit(self,X_train): ##fit X_train, calculate mean, std of X_train
        self.u = X_train.mean(axis=0)
        self.s = X_train.std(axis=0)
    def transform(self,X):#apply normlization on X_train or X_test
        X_norm = (X-self.u)/(self.s+1e-6)## z = (x-u)/s, check: will self.s = 0 ?
        return X_norm
    
def normalize_features(X_train,X_test):
    scaler = StandardScaler()
    scaler.fit(X_train)
    return scaler.transform(X_train),scaler.transform(X_test)    
## scoring function
def accuracy(ypred, yexact):
    p = np.array(ypred == yexact, dtype = int)
    return np.sum(p)/float(len(yexact))
## read dataset
def read_dataset(feature_file,label_file):
    df_X = pd.read_csv(feature_file)
    df_y = pd.read_csv(label_file)
    X = df_X.values #convert values in dataframe to numpy 2-D array
    y = df_y.values.reshape(-1)   #convert values in dataframe to numpy 1-D array
    return X,y

In [3]:
#help functions for CNN.
def to_tensor(X):
    #2d array to 4d array
    m_samples, n_features=X.shape
    return X.reshape(m_samples,1,28,28)# Minist dataset features 874=28*28

#convolutional operation
def conv(x, k):
    x_row, x_col = x.shape # data shape
    k_row, k_col = k.shape # kernel shape 
    
    result_row, result_col = x_row - k_row + 1, x_col - k_col + 1 #stride = 1
    result = np.empty((result_row, result_col))
    for i in range(result_row):
        for j in range(result_col):
            #summation of point-wise product of submatrix with kernel
            sub = x[i : i + k_row, j : j + k_col]
            result[i,j] = np.sum(sub * k)
    return result

#padding x with size1 rows/ size2 columns of zeros.
def padding(x, size1,size2):
    cur_r, cur_w = x.shape[0], x.shape[1]
    new_r = cur_r + size1 * 2
    new_w = cur_w + size2 * 2
    result = np.zeros((new_r, new_w))
    result[size1:cur_r + size1, size2:cur_w+size2] = x
    return result

def rot180(w):
#rotate matrix for 180 degree:
#array([[ 0,  1,  2,  3],
#       [ 4,  5,  6,  7],
#       [ 8,  9, 10, 11]])
# to
#array([[11, 10,  9,  8],
#       [ 7,  6,  5,  4],
#       [ 3,  2,  1,  0]])
    m,n = w.shape 
    w180 = w.flatten()[::-1].reshape(m,n)
    return w180



class ConvLayer:
    def __init__(self,lr=0.001):
        # in this example, in_channel = 1, out_channel = 2, kernel shape is (3,3)
        self.w = np.random.randn(1, 2, 3, 3)/np.sqrt(3)
        self.b = np.zeros((2)) 
        self.lr = lr
    
    def forward(self, in_data):      
        self.input = in_data 
        
        m_samples, in_channel, in_row, in_col = in_data.shape
        out_channel = 2
        # the kernel shape is (3,3)
        out_row = in_row - 2
        out_col = in_col - 2
        
        self.output = np.zeros((m_samples, out_channel, out_row, out_col))
        #You have to iterate in_channel if in_channel>1.
        for m in range(m_samples):
            for o in range(out_channel):
                self.output[m, o,:,:] += conv(in_data[m, 0,:,:], self.w[0, o,:,:])
                self.output[m, o,:,:] += self.b[o]# add bias for each output
        return self.output
    
    def backward(self,dz):
        m_samples, out_channel, out_row, out_col = dz.shape #dz has the same shape as the output

        # gradient of b has nothing to do with conv function; similar as ann's b term.
        db = np.sum(dz,axis=(0,2,3))
        
        
        #calculate dw to update current layers' gradient and dx for previous layer to update gradient
        dw = np.zeros_like(self.w)
        dx = np.zeros_like(self.input)
        
        #dL/dw  = sum_m con(x,dz). You have to iterate in_channel if in_channel>1.
        for m in range(m_samples):
            for o in range(out_channel):
                dw[0, o,:,:] += conv(self.input[m,0,:,:], dz[m,o,:,:])
                  
        #dL/dx = sum_i conv(pad(dz),rot180(w)).You have to iterate in_channel if in_channel>1.
        for m in range(m_samples):
            for o in range(out_channel):
                dz_padded = padding(dz[m,o,:,:],2,2)
                w_rot = rot180(self.w[0,o,:,:])
                dx[m, 0,  :,:] += conv(dz_padded,w_rot)

        # update gradient
        self.w = self.w - self.lr*dw
        self.b = self.b - self.lr*db
        ##return dx for previous layer to calculate gradient    
        return dx

In [20]:
class MaxPoolingLayer:
    def __init__(self, pooling = (2,2), strides = (2,2)):
        self.pooling = pooling # 
        self.strides = strides 

    def forward(self, in_data):   
        m_samples, in_channel, in_row, in_col = in_data.shape
        s0 = self.strides[0]
        s1 = self.strides[1]
        out_row = (in_row - self.pooling[0]) // s0 + 1
        out_col = (in_col - self.pooling[1]) // s1 + 1
        
        self.input = in_data
        self.output = np.empty((m_samples, in_channel, out_row, out_col))
        self.max_indices = np.zeros((m_samples, in_channel, out_row, out_col),dtype=tuple)

        for m in np.arange(m_samples):
            for i in np.arange(in_channel):
                for r in np.arange(out_row):
                    for c in np.arange(out_col):
                        region = in_data[m, i, r * s0 : r*s0 + self.pooling[0], c*s1 : c*s1 + self.pooling[1]]
                        self.output[m, i, r, c] = np.max(region)
                        # this is the max indices of the region [n, i, j]. it is a np array
                        self.max_indices[m, i, r, c] = np.unravel_index(region.argmax(), region.shape)
        return self.output

    def backward(self, dz):
        m_samples,in_channel,in_row, in_col = self.input.shape
        _, out_channel, out_row, out_col = self.output.shape
        
        dx = np.zeros_like(self.input)

        for m in np.arange(m_samples):
            for i in np.arange(in_channel):
                for r in np.arange(out_row):
                    for c in np.arange(out_col):
                        dx[m, i, self.max_indices[m, i, r, c][0]+r, self.max_indices[m, i, r, c][1]+c] = dz[m, i, r, c]
        
        return dx

In [21]:
class FlattenLayer:
    def __init__(self):
        pass
    def forward(self, in_data):
        self.m_samples, self.in_channel, self.in_row, self.in_col = in_data.shape
        return in_data.reshape(self.m_samples, self.in_channel * self.in_row * self.in_col)
    def backward(self, dz):
        return dz.reshape(self.m_samples, self.in_channel, self.in_row, self.in_col)

In [22]:
class LinearLayer:
    def __init__(self, in_num, out_num, lr = 0.001):
        self.in_num = in_num
        self.out_num = out_num
        self.w = np.random.randn(in_num, out_num)/np.sqrt(in_num)
        self.b = np.zeros((1,out_num))
        self.lr = lr

    def forward(self, in_data):
        self.input = in_data
        self.output = np.dot(in_data,self.w) + self.b
        return self.output
    def backward(self, dz):
        #similar as hidden layer in ann, but note that we do not include activation or softmax function in FCLayer.
        dw = np.dot(self.input.T, dz) 
        db = np.sum(dz,axis=0, keepdims=True)
        dx = dz.dot(self.w.T)
        self.w -= self.lr * dw
        self.b -= self.lr * db
        return dx

In [23]:
class relulayer:
    def __init__(self):
        pass
    def forward(self,in_data):
        # relu(x)= x if x>0 ; 0 otherwise
        self.input = in_data
        self.output = in_data.copy()
        self.output[self.output<0] = 0
        return self.output
    def backward(self,dz):
        dx = dz.copy()
        dx[self.input<0]=0
        return dx

In [24]:
class SoftmaxLayer:
    def __init__(self):
        pass
    def softmax(self,z):
        exp_value = np.exp(z-np.amax(z, axis=1, keepdims=True)) # for stablility
        # keepdims = True means that the output's dimension is the same as of z
        softmax_scores = exp_value / np.sum(exp_value, axis=1, keepdims=True)
        return softmax_scores
    
    def forward(self, in_data):
        self.input = in_data
        self.output = self.softmax(in_data)
        return self.output
    def backward(self, y):
        return self.output - y #y_hat - y

In [25]:
class Network():
    def __init__(self):
        self.layers = []
        self.y_prob = None
        self.loss = None
        
    def add_layer(self, layer):
        self.layers.append(layer)
    
    def feed_data(self, X, y):
        self.X = X
        self.y = y
    
    def feed_forward(self):                # feed forward 
        input = self.X
        for layer in self.layers:
            input = layer.forward(input)
        self.y_prob = input
        return input
    
    def back_propagation(self):                # back propagate
        dz = self.y # for the output layer, the input gradient is indeed target.
        for layer in reversed(self.layers):
            dz = layer.backward(dz) 
    
    def cross_entropy_loss(self):
        #  $L = -\sum_n\sum_{i\in C} y_{n, i}\log(\hat{y}_{n, i})$
        # calculate y_hat
        self.feed_forward()
        self.loss = -np.sum(self.y*np.log(self.y_prob + 1e-6)) 
    
    def predict(self, X_test):
        input = X_test
        for layer in self.layers:
            #if plot_feature_maps:
            #    image = (image * 255)[0, :, :]
            #    plot_sample(image, None, None)
            input = layer.forward(input)
        return np.argmax(input, axis=1)   
    
    def mini_batch_descent(self, X, y, batch_size=50):
        """
            Note that y is a 2d array. 
        """
        shuffled_idx = np.arange(X.shape[0])
        np.random.shuffle(shuffled_idx)
        n_batches = X.shape[0] // batch_size 
        
        for i in range(n_batches):
            self.feed_data(X[shuffled_idx[i*batch_size:(i+1)*batch_size]], y[shuffled_idx[i*batch_size:(i+1)*batch_size]])
            self.feed_forward()
            self.back_propagation()
        if X.shape[0] % batch_size != 0:
            self.feed_data(X[shuffled_idx[n_batches*batch_size:]], y[shuffled_idx[n_batches*batch_size:]])
            self.feed_forward()
            self.back_propagation()    

In [27]:
import time        
# load data
X_train, y_train = read_dataset('MNIST_X_train.csv', 'MNIST_y_train.csv')
X_test, y_test = read_dataset('MNIST_X_test.csv', 'MNIST_y_test.csv')
#normlize featurues / encode labels / convert features into one-hot-vector.
X_train_norm, X_test_norm = normalize_features(X_train, X_test)
y_train_ohe, y_test_ohe = ohe(y_train, y_test)
X_train_tensor,X_test_tensor = to_tensor(X_train_norm),to_tensor(X_test_norm)
# initialize network
myCNN = Network() 
myCNN.add_layer(ConvLayer(lr=0.0001))
myCNN.add_layer(MaxPoolingLayer())
#conv;pad; for your homework
myCNN.add_layer(FlattenLayer())
myCNN.add_layer(LinearLayer(in_num=338, out_num=100,lr=0.0001))
myCNN.add_layer(relulayer())
myCNN.add_layer(LinearLayer(in_num=100, out_num = y_train_ohe.shape[1],lr=0.0001))
myCNN.add_layer(SoftmaxLayer())
#start training
start = time.time()
epoch_num = 40
for i in range(epoch_num):
    myCNN.mini_batch_descent(X_train_tensor, y_train_ohe, batch_size=100)
    myCNN.cross_entropy_loss()
    y_pred = myCNN.predict(X_test_tensor)
    print('epoch = {}, current loss = {:.2f}, test accuracy = {:.2f}%'.format(i+1, myCNN.loss, 100*accuracy(y_pred, y_test.ravel())))      
    
end = time.time()
y_pred = myCNN.predict(X_test_tensor)
print('Accuracy of my CNN model is {:.2f}%'.format(accuracy(y_pred, y_test.ravel())*100))
print('Takes {:.2f}s'.format(end-start))    

epoch = 1, current loss = 119.67, test accuracy = 64.60%
epoch = 2, current loss = 73.86, test accuracy = 73.20%
epoch = 3, current loss = 53.43, test accuracy = 77.80%
epoch = 4, current loss = 47.94, test accuracy = 80.00%
epoch = 5, current loss = 51.05, test accuracy = 82.00%
epoch = 6, current loss = 44.69, test accuracy = 83.20%
epoch = 7, current loss = 41.15, test accuracy = 84.00%
epoch = 8, current loss = 33.23, test accuracy = 84.40%
epoch = 9, current loss = 39.08, test accuracy = 85.00%
epoch = 10, current loss = 23.49, test accuracy = 86.40%
epoch = 11, current loss = 28.82, test accuracy = 87.20%
epoch = 12, current loss = 22.16, test accuracy = 87.40%
epoch = 13, current loss = 21.75, test accuracy = 88.00%
epoch = 14, current loss = 27.73, test accuracy = 87.60%
epoch = 15, current loss = 26.49, test accuracy = 87.80%
epoch = 16, current loss = 25.66, test accuracy = 88.20%
epoch = 17, current loss = 18.35, test accuracy = 87.40%
epoch = 18, current loss = 29.53, test 