In [None]:
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import h5py
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.preprocessing import OneHotEncoder
import timeit
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.datasets import fetch_openml
from sklearn.utils import check_random_state

In [None]:
#DATASET splitted  
allSets = h5py.File('dataset/singleDigits_set.h5','r')
print(allSets.keys())
train_set = allSets['training_data']
train_labels = allSets['training_labels']
real_test_set = allSets['test_data']

train_set = np.array(train_set)
train_labels = np.array(train_labels)
real_test_set = np.array(real_test_set)
print("train_set.shape is " + str(train_set.shape))
print("real_test_set.shape is " + str(real_test_set.shape))



In [None]:
# need to transform the dataset into a Tensor, how??? first transform images into array
train_set = np.asarray(train_set)
print(train_set.shape)
#print(train_set[0])
print(train_labels.shape)
#after split there are 167339 digits

In [None]:
def load_dataset():

    cv = KFold(n_splits=10)
    count = 1
    for train_index, test_index in cv.split(train_set): # Run 5 times
        count += 1
        x, x_test, y, y_test = train_set[train_index], train_set[test_index], train_labels[train_index], train_labels[test_index]
        if(count==5):#Which segment you choose
            break
    print("x.shape = " + str(x.shape))
    print("y.shape = " + str(y.shape))
    
    y = y.reshape(1,-1)
    y_test = y_test.reshape(1, -1)
    y = np.squeeze(y)
    y = pd.Series(y)
    y = pd.get_dummies(y)
    y = np.array(y).T
    y_test = np.squeeze(y_test)  
    y_test = pd.Series(y_test)
    y_test = pd.get_dummies(y_test)
    y_test = np.array(y_test).T  

    return x,y,x_test,y_test

In [None]:
def tanh(z):
    return (np.exp(z)-np.exp(-z))/(np.exp(z)+np.exp(-z))

def relu(z):
    return np.maximum(0,z)

def tanh_1(z):
    return 1-tanh(z)**2

def relu_1(z):
    return np.maximum(0, z/np.abs(z))

def sigmoid(z):
    return 1/(1+np.exp(-z))

In [None]:
def predict(x, w, b, layer, m, lm, sigma2, mu):
    answers = forward_test_predict(w, b, x, layer, m, lm, sigma2, mu)
    answers = answers.T
    for i in range(x.shape[1]):
        answers[i] = np.where(answers[i].max() == answers[i], 1, 0)

    print(answers.shape)
    result = np.argmax(answers, axis = 1)
    print("y_test_label is " + str(result))
    return result

def initialize(layer, n, m, hidden):
    np.random.seed(1)
    Vdw, Vdb, Sdw, Sdb = [],[],[],[]#refresh all
    b = []
    w = []
    
    for i in range(0, layer):
        if i != 0 and i != layer - 1:
            t0 = np.random.randn(hidden, hidden) * np.sqrt(2 / hidden)
        elif i == 0:
            t0 = np.random.randn(hidden, m) * np.sqrt(2 / m)
        else:                   
            t0 = np.random.randn(10, hidden) * np.sqrt(2 / hidden)
        w.append(t0)
        b.append(1)
        Vdw.append(0)
        Vdb.append(0)
        Sdw.append(0)
        Sdb.append(0)

    return w, b, Vdw, Vdb, Sdw, Sdb

smallNumber = 0.000000000001
sm = smallNumber

def printAccuracy(x, y, w, b, layer, m, lm, sigma2, mu):
    temp0 = forward_test(w, b, x, y, layer, m, lm, sigma2, mu)
    temp0 = temp0.T
    
    for i in range(x.shape[1]):
        temp0[i] = np.where(temp0[i].max() == temp0[i], 1, 0)
        
    temp0 = temp0.T 
    classes = np.unique(y) 
    
    temp1 = np.sum(np.abs(y - temp0), axis = 0, keepdims=True)/2
    temp2 = 1-np.sum(temp1, axis=1, keepdims=True)/10000
    temp2 = np.squeeze(temp2)

    print("Accuracy: {0}%".format(temp2 * 100)) #100%
    return 0


'''
Reference: 
Realize mnist handwritten digit recognition with python's numpy
https://www.programmersought.com/article/38914943265/
We understood it and rewrite these mothods below
'''


def forward_back(w, b, a, Y, layer, m, lm):
    z = []
    add = 0
    sigma2,mu = [],[]
    
    for i in range(0, layer):
        zl = np.dot(w[i], a) + b[i]

        multiplex = (1/m)*np.sum(zl, axis=1, keepdims = True)
        sigmaL = (1/m)*np.sum(np.power(zl - multiplex,2),axis=1,keepdims=True)
        
        z_norm = (zl - multiplex)/(np.sqrt(sigmaL + sm))
        gamma, beta_1 = 1*np.sqrt(sigmaL + sm), multiplex+0
        
        zl = np.multiply(z_norm,gamma) + beta_1
        mu.append(multiplex)
        sigma2.append(sigmaL)

        add += np.sum((lm / (2 * m)) * np.dot(w[i], w[i].T))
        z.append(zl)
        a = relu(zl)

    t = np.exp(zl)
    ti = np.sum(t,axis = 0,keepdims=True)
    
    a = np.divide(t,ti)
    J = (-1/m)*np.sum(Y*np.log(a))+ add 

    return z, a, J,sigma2,mu

def forward_test(w, b, a, Y, layer, m, lm,sigma2,mu): 
    for i in range(0, layer):
        zl = np.dot(w[i], a) + b[i]

        z_norm = (zl - mu[i]) / (np.sqrt(sigma2[i] + sm))
        gamma, beta_1 = 1 * np.sqrt(sigma2[i] + sm), mu[i] + 0 
        
        zl = np.multiply(z_norm, gamma) + beta_1
        a = relu(zl)
        
    t = np.exp(zl)
    ti = np.sum(t, axis=0, keepdims=True) 
    a = np.divide(t, ti)  
    return a

def forward_test_predict(w, b, a, layer, m, lm,sigma2,mu):  
    for i in range(0, layer):
        zl = np.dot(w[i], a) + b[i]

        z_norm = (zl - mu[i]) / (np.sqrt(sigma2[i] + sm))
        gamma, beta_1 = 1 * np.sqrt(sigma2[i] + sm), mu[i] + 0 
        
        zl = np.multiply(z_norm, gamma) + beta_1
        a = relu(zl)

    t = np.exp(zl)
    ti = np.sum(t, axis=0, keepdims=True) 
    
    a = np.divide(t, ti) 
    return a

def backward(w, b, X, Y, m, layer, lm):
    z,a,J,sigma2,mu = forward_back(w,b,X,Y,layer,m,lm)
    
    dw,db = [],[]
    for i in range(layer - 1, 0, -1):
        if i == layer - 1:
            dz = a - Y
        else:
            dz = np.dot(w[i + 1].T, dz) * relu_1(z[i])
            
        Dw = 1 / m * (np.dot(dz, relu(z[i - 1]).T)) + (lm / m) * w[i]
        Db = 1 / m * np.sum(dz, axis=1, keepdims=True)
        
        dw.append(Dw)
        db.append(Db)

    dz = np.dot(w[1].T, dz) * relu_1(z[0])
    Dw = 1 / m * np.dot(dz, X.T) + (lm / m) * w[0]
    
    Db = 1 / m * np.sum(dz, axis=1, keepdims = True)
    
    dw.append(Dw)
    db.append(Db)
    return dw, db, J,sigma2,mu

def back_momentum(w, b, X, Y, learning, m, layer, lm, beta, Vdw, Vdb):
    dw, db, J,sigma2,mu = backward(w,b,X,Y,m,L,lm)
    
    for i in range(0, layer):
        Vdw[i] = beta * Vdw[i] + (1 - beta) * dw[L-i-1]
        Vdb[i] = beta * Vdb[i] + (1 - beta) * db[L-i-1]
        
        w[i] = w[i] - learning * Vdw[i]
        b[i] = b[i] - learning * Vdb[i]
        
    return w,b,J,Vdw,Vdb,sigma2,mu

def back_RMSprop(w, b, X, Y, learning, m, layer, lm, beta, Sdw, Sdb):
    dw, db, J ,sigma2,mu= backward(w,b,X,Y,m,L,lm) 
    
    for i in range(0, layer):
        Sdw[i] = beta * Sdw[i] + (1 - beta) * np.power(dw[L-i-1],2)
        Sdb[i] = beta * Sdb[i] + (1 - beta) * np.power(db[L-i-1],2)
        
        w[i] = w[i] - learning * dw[L-i-1] / (np.power(Sdw[i],1/2) + sm)
        b[i] = b[i] - learning * db[L-i-1] / (np.power(Sdb[i],1/2) + sm)
        
    return w,b,J,Sdw,Sdb,sigma2,mu

def back_Adam(w, b, X, Y, learning, m, layer, lm, beta, Vdw, Vdb, Sdw, Sdb):
    dw, db, J ,sigma2,mu= backward(w, b, X, Y, m, layer, lm)
    for i in range(0, layer):
        Vdw[i] = beta * Vdw[i] + (1 - beta) * dw[layer - i - 1]
        Vdb[i] = beta * Vdb[i] + (1 - beta) * db[layer - i - 1]
        
        Sdw[i] = beta * Sdw[i] + (1 - beta) * np.power(dw[layer-i-1],2)
        Sdb[i] = beta * Sdb[i] + (1 - beta) * np.power(db[layer-i-1],2)
        
        w[i] = w[i] - learning * Vdw[i] / (np.power(Sdw[i],1/2) + sm)
        b[i] = b[i] - learning * Vdb[i] / (np.power(Sdb[i],1/2) + sm)
        
    return w,b,J,Vdw,Vdb,Sdw,Sdb,sigma2,mu


In [None]:
if __name__ == "__main__":
    loss_history= []
    
    layer = 5
    hidden = 512
    lr = 0.005
    decay_rate = 0.0009
    
    
    lambd = 0.01 
    beta = 0.9
    mb = 1000
    sigma2 = 1
    mu = 1
    
    train_set_x_raw, train_set_y, test_set_x_raw, test_set_y = load_dataset()
    
    train_set_x = train_set_x_raw.reshape((train_set_x_raw.shape[0], -1)).T / 255  
    
    test_set_x = test_set_x_raw.reshape((test_set_x_raw.shape[0], -1)).T / 255  
    
    real_test_set_dim = real_test_set.reshape((real_test_set.shape[0], -1)).T / 255  
    
    print("train_set_x.shape is " + str(train_set_x.shape))
    print("train_set_y.shape is " + str(train_set_y.shape))
    print("test_set_x.shape is " + str(test_set_x.shape))
    print("test_set_y.shape is " + str(test_set_y.shape))
    
    w,b,Vdw,Vdb,Sdw,Sdb = initialize(layer, train_set_x.shape[1], train_set_x.shape[0], hidden)
    for i in range(0,161): #Number of iterations. Our best case is 161 times.
        Sigma2, Mu,J_average = 0,0,0
        for j in range(0,(train_set_x.shape[1] // mb)):
            w,b,J,Vdw,Vdb,Sdw,Sdb,sigma2,mu = back_Adam(w,b,((train_set_x.T)[j*mb:(j+1)*mb]).T,((train_set_y.T)[j*mb:(j+1)*mb]).T,lr,mb,layer,lambd,beta,Vdw,Vdb, Sdw, Sdb)
            
            Sigma2 = np.multiply(beta, Sigma2) + np.multiply((1-beta), sigma2)
            Mu = np.multiply(beta, Mu) + np.multiply((1 - beta), mu)
            
            J_average = np.multiply(beta,J) + np.multiply((1-beta), J)
            
        lr = lr * (1 / (1 + i*decay_rate) )
        
        if i % 1 == 0: #Print step by step
            print("loss：",J_average)
            loss_history.append(J_average)
            printAccuracy(test_set_x,test_set_y,w,b,layer,test_set_x.shape[1],lambd,Sigma2,Mu)
            
    plt.plot(loss_history)
    plt.show()

    printAccuracy(test_set_x,test_set_y,w,b,L,test_set_x.shape[1],lambd,Sigma2,Mu)
    print("test_set_x.shape" + str(test_set_x.shape))
    print("test_set_y.shape" + str(test_set_y.shape))
    singleAnswers = predict(real_test_set_dim,w,b,L,test_set_x.shape[1],lambd,Sigma2,Mu)


In [None]:
pd.DataFrame(singleAnswers).to_csv("generatedAnswers/singleAnswers.csv")

In [None]:
#Read Index
test_set_index = h5py.File('dataset/test_set_index.h5','r')
print(test_set_index.keys())
set_index = test_set_index['test_set_index']

In [None]:
# Generate final answer
finalAnswer = []
number_of_test = 0
temp = []
for i in range(len(set_index)):
    if(set_index[i]!=0):
        temp.append(singleAnswers[i])
    else:
        while(len(temp)<5):
            temp.append(10)
        if(i != 0):
            finalAnswer.append(temp)
        number_of_test += 1
        temp = []
        temp.append(singleAnswers[i])
finalAnswer.append(temp)
print(len(finalAnswer))
#finalAnswer

In [None]:
# Generate result file
import csv
with open("generatedAnswers/finalResult.csv", 'w', newline='') as f:
    writer = csv.writer(f)
    count = 0
    writer.writerow(["Id"]+["Label"])
    for row in finalAnswer:
        string = str(row)
        #writer.writerow(str(count) + str(row[0]).replace(',','') + str(row[1]) + str(row[2]) + str(row[3]) + str(row[4]))
        string = string.replace(' ', '').replace(',', '').strip('[').strip(']')
        writer.writerow([str(count)]+[string])
        count += 1
    #writer.writerows(finalAnswer)