In [1]:
import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

from scipy.special import expit
from scipy.optimize import minimize_scalar

import copy

from numpy.linalg import pinv
from sklearn.datasets import load_iris
from scipy.optimize import fmin_bfgs

import multiprocessing as mp

In [2]:
from sklearn.model_selection import train_test_split

# Load the preprocessed dataset (path is relative to the notebook's working directory).
df = pd.read_csv('processed_data.csv')

# Split the rows into a train and a test partition. Seeding the shuffle keeps
# the partition identical on every Restart & Run All.
# NOTE(review): test_size=0.99 leaves only 1% of the rows for training --
# presumably a deliberate subsample for fast iteration; confirm before real runs.
SPLIT_SEED = 42
train, test = train_test_split(df, test_size=0.99, random_state=SPLIT_SEED)

In [21]:
class Sequential(object):
    """Minimal fully-connected classifier trained by full-batch gradient descent.

    Layers are declared with :meth:`add` as alternating ``'Dense'`` and
    ``'Activation'`` entries. An activation is any object exposing ``val(x)``
    (the function) and ``dev(x)`` (its element-wise derivative).

    Internally every activation matrix is laid out as ``(units, samples)`` and
    every weight matrix has shape ``(units_out, units_in + 1)`` with the bias
    weights in column 0.

    Parameters
    ----------
    C : float
        Regularisation strength. Stored only; not applied during training yet.
    epochs : int
        Number of full-batch gradient steps performed by :meth:`fit`.
    eta : float
        Learning rate.
    random_state : int or None
        Seed of the private random generator used for weight initialisation.
    cost_function : {'quadratic', 'cross_entropy'}
        Loss whose gradient drives training. ``'cross_entropy'`` assumes a
        sigmoid or softmax output layer.
    metric : {'accuracy', 'f1'}
        Score reported while training when ``verbose`` is truthy.
    verbose : int
        Truthy to print progress roughly every 1% of the epochs.
    regular : str
        Regularisation kind. Stored only; not applied during training yet.
    """

    def __init__(self, C=0.0, epochs=500, eta=0.001, random_state=None,
                 cost_function='quadratic',
                 metric='accuracy',
                 verbose=1,
                 regular='None'):
        # A private generator gives reproducible weights without reseeding
        # NumPy's global RNG as a side effect of building a model.
        self.random_state = random_state
        self._rng = np.random.RandomState(random_state)
        self.C = C
        self.epochs = epochs
        self.eta = eta
        self.cost_function = cost_function
        self.metric = metric
        self.verbose = verbose
        self.regular = regular

        self.dims = []  # (units_in, units_out) per Dense layer
        self.acts = []  # activation object per layer

        self.matrice = []  # weight matrices, one per layer
        self.vects = []    # pre-activation values of the latest forward pass
        self.neurons = []  # activations of the latest forward pass; [0] is the input

        self.classes_ = None  # sorted unique labels seen by fit
        self._targets = None  # one-hot targets remembered by fit

    def add(self, arg, argv):
        """Append a layer component to the network.

        Parameters
        ----------
        arg : {'Dense', 'Activation'}
            Kind of component to append.
        argv : int or activation object
            Number of units for ``'Dense'``; an object with ``val``/``dev``
            for ``'Activation'``.

        Raises
        ------
        ValueError
            If ``arg`` is not a known component kind.

        Examples
        --------
        add('Dense', 12)
        add('Activation', Sigmoid)
        """
        if arg == 'Dense':
            # The input width of the first layer is unknown until fit sees X,
            # so it is recorded as 0 and patched there. Later layers chain on
            # the previous Dense layer's output width.
            units_in = self.dims[-1][-1] if self.dims else 0
            self.dims.append((units_in, argv))
        elif arg == 'Activation':
            self.acts.append(argv)
        else:
            raise ValueError("unknown layer kind: {!r}".format(arg))
        return

    @staticmethod
    def _encode_labels(y):
        """One-hot encode labels into a float array of shape (n_classes, n_samples)."""
        # astype(float): get_dummies may return bool/uint8, which breaks arithmetic.
        onehot = pd.get_dummies(y).values.T.astype(float)
        return onehot

    @staticmethod
    def _add_bias_unit(X, how='column'):
        """Return ``X`` with a leading column (or row) of ones prepended.

        Raises
        ------
        ValueError
            If ``how`` is neither ``'column'`` nor ``'row'``.
        """
        if how == 'column':
            ones = np.ones((X.shape[0], 1))
            X_new = np.hstack((ones, X))
        elif how == 'row':
            ones = np.ones((1, X.shape[1]))
            X_new = np.vstack((ones, X))
        else:
            raise ValueError("how must be 'column' or 'row', got {!r}".format(how))
        return X_new

    def _initialize_params(self, X):
        """Draw fresh weights in [-1, 1) and store ``X`` as the network input.

        ``X`` is ``(n_samples, n_features)``; it is stored transposed so that
        every entry of ``self.neurons`` is ``(units, samples)``. All state is
        reset, so calling fit twice does not stack weight matrices.
        """
        self.matrice = [self._rng.uniform(-1.0, 1.0, size=(col, row + 1))
                        for row, col in self.dims]
        self.vects = []
        self.neurons = [np.asarray(X, dtype=float).T]
        return

    @staticmethod
    def activate(vec, act):
        """Apply activation ``act`` to the pre-activation values ``vec``."""
        return act.val(vec)

    @staticmethod
    def activate_dev(vec, neuron, act):
        """Element-wise derivative of ``act`` evaluated at ``vec``.

        ``neuron`` (the activation output) is accepted for signature
        compatibility; the derivative always comes from ``act.dev`` so the
        activation object stays the single source of truth.
        """
        return act.dev(vec)

    def _forward(self, inputs):
        """Propagate ``inputs`` (features, samples) without touching stored state.

        Returns the list of pre-activations and the list of activations
        (the latter starting with ``inputs`` itself).
        """
        vects, neurons = [], [inputs]
        for W, act in zip(self.matrice, self.acts):
            vects.append(W @ self._add_bias_unit(neurons[-1], how='row'))
            neurons.append(self.activate(vects[-1], act))
        return vects, neurons

    def _feedforward(self):
        """Run a forward pass on the stored training input and cache the result.

        The caches are replaced, not appended to, so they always describe
        exactly one pass through the current weights.
        """
        if not self.neurons:
            raise RuntimeError('network input is not initialised; call fit first')
        self.vects, self.neurons = self._forward(self.neurons[0])
        return

    def predict(self, X_test):
        """Predict a class label for each row of ``X_test``.

        Parameters
        ----------
        X_test : array-like of shape (n_samples, n_features)

        Returns
        -------
        numpy.ndarray of shape (n_samples,)
            The label of the most activated output unit, or its index if the
            label set is unknown.

        Raises
        ------
        RuntimeError
            If the network has no weights yet.
        """
        if not self.matrice:
            raise RuntimeError('predict called before fit')
        inputs = np.atleast_2d(np.asarray(X_test, dtype=float)).T
        # Always recompute: the cached training activations predate the last
        # weight update, so reusing them would return stale predictions.
        _, neurons = self._forward(inputs)
        winners = np.argmax(neurons[-1], axis=0)
        if self.classes_ is None:
            return winners
        return self.classes_[winners]

    def _resolve_targets(self, Y):
        """Return ``Y`` as a float array, falling back to the targets stored by fit."""
        if Y is None:
            Y = self._targets
        if Y is None:
            raise ValueError('no targets available: pass Y or call fit first')
        return np.asarray(Y, dtype=float)

    def set_init_dev_prefix(self, Y=None):
        """Seed value of the backward pass, computed from the cached outputs.

        For ``'quadratic'`` this is the cost gradient with respect to the
        output activations, ``-2 * (Y - A)``. For ``'cross_entropy'`` it is the
        gradient with respect to the output pre-activations, ``A - Y``, which
        holds for a sigmoid or softmax output layer.

        Parameters
        ----------
        Y : array-like of shape (n_classes, n_samples), optional
            One-hot targets; defaults to the targets stored by fit.

        Raises
        ------
        ValueError
            If ``cost_function`` is unknown or no targets are available.
        """
        residual = self.neurons[-1] - self._resolve_targets(Y)
        if self.cost_function == 'quadratic':
            return 2 * residual
        if self.cost_function == 'cross_entropy':
            return residual
        raise ValueError("unknown cost_function: {!r}".format(self.cost_function))

    def _update_params(self, Y):
        """Take one gradient-descent step on every weight matrix, in place."""
        grads = self._get_gradient(Y)
        for W, grad in zip(self.matrice, grads):
            W -= self.eta * grad
        return

    def _get_gradient(self, Y=None):
        """Back-propagate through the cached forward pass.

        Parameters
        ----------
        Y : array-like of shape (n_classes, n_samples), optional
            One-hot targets; defaults to the targets stored by fit.

        Returns
        -------
        list of numpy.ndarray
            One gradient per weight matrix, in layer order and with matching
            shapes. Gradients are summed (not averaged) over the samples.
        """
        n_layers = len(self.matrice)
        grads = [None] * n_layers
        prefix = self.set_init_dev_prefix(Y)
        for layer in range(n_layers - 1, -1, -1):
            is_output = layer == n_layers - 1
            if is_output and self.cost_function == 'cross_entropy':
                # The prefix is already the gradient w.r.t. the pre-activations.
                delta = prefix
            else:
                delta = prefix * self.activate_dev(
                    self.vects[layer], self.neurons[layer + 1], self.acts[layer])
            grads[layer] = delta @ self._add_bias_unit(self.neurons[layer], how='row').T
            if layer > 0:
                # Column 0 holds the bias weights, which feed no upstream unit.
                prefix = self.matrice[layer][:, 1:].T @ delta
        return grads

    def _score(self, y_true, y_pred):
        """Evaluate the configured ``metric`` on label vectors."""
        if self.metric == 'accuracy':
            return accuracy_score(y_true, y_pred)
        if self.metric == 'f1':
            return f1_score(y_true, y_pred, average='macro')
        raise ValueError("unknown metric: {!r}".format(self.metric))

    def fit(self, X, Y):
        """Train the network on ``X`` with class labels ``Y``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        Y : array-like of shape (n_samples,)
            Class labels; they are one-hot encoded internally.

        Returns
        -------
        self, or ``False`` (after printing an error) when the number of
        distinct labels does not match the width of the last Dense layer.

        Raises
        ------
        ValueError
            If the layers declared with :meth:`add` are not one activation
            per Dense layer.
        """
        if not self.dims or len(self.dims) != len(self.acts):
            raise ValueError('every Dense layer needs exactly one Activation')
        X = np.asarray(X, dtype=float)
        labels = np.asarray(Y).ravel()
        classes = np.unique(labels)
        if len(classes) != self.dims[-1][-1]:
            print('Error: output dimension is wrong!')
            return False
        # dims holds tuples, so the first layer's input width is patched by
        # replacing the whole entry.
        self.dims[0] = (X.shape[1], self.dims[0][1])
        self.classes_ = classes
        targets = self._encode_labels(labels)
        self._targets = targets
        self._initialize_params(X)
        # max(1, ...) keeps the modulo valid when epochs < 100.
        report_every = max(1, self.epochs // 100)
        for i in range(self.epochs):
            self._feedforward()
            self._update_params(targets)
            if self.verbose and i % report_every == 0:
                y_pred = classes[np.argmax(self.neurons[-1], axis=0)]
                score = self._score(labels, y_pred)
                print('{} percent finished, current {} is {}.'.format(
                    100 * i // self.epochs, self.metric, score))
        print('Training is done!')
        return self

In [22]:
class TANH:
    """Hyperbolic-tangent activation."""

    @staticmethod
    def val(x):
        """Return tanh(x) element-wise."""
        return np.tanh(x)

    @staticmethod
    def dev(x):
        """Return the element-wise derivative 1 - tanh(x)**2."""
        # `**` is exponentiation; `^` is bitwise XOR and raises on float arrays.
        return 1 - np.tanh(x) ** 2
class Sigmoid:
    """Logistic sigmoid activation, 1 / (1 + exp(-x))."""

    @staticmethod
    def val(x):
        """Return the logistic sigmoid of x element-wise."""
        # expit avoids the overflow a hand-written 1 / (1 + exp(-x)) hits for
        # large negative inputs.
        return expit(x)

    @staticmethod
    def dev(x):
        """Return the element-wise derivative s * (1 - s), with s = sigmoid(x)."""
        s = expit(x)
        return s * (1 - s)
class Relu:
    """Rectified linear unit activation, max(0, x)."""

    @staticmethod
    def val(x):
        """Return max(0, x) element-wise."""
        return np.maximum(0, x)

    @staticmethod
    def dev(x):
        """Return the element-wise derivative: 1 where x > 0, else 0.

        The derivative is undefined at exactly 0; 0 is used there.
        """
        return (np.asarray(x) > 0).astype(float)
class SoftMax:
    """Softmax activation normalised along axis 0 (one column per sample)."""

    @staticmethod
    def val(x):
        """Return exp(x) / sum(exp(x)) computed along axis 0."""
        x = np.asarray(x, dtype=float)
        # Subtracting the per-column maximum leaves the result unchanged but
        # keeps exp() from overflowing on large inputs.
        shifted = np.exp(x - np.max(x, axis=0, keepdims=True))
        return shifted / np.sum(shifted, axis=0, keepdims=True)

    @staticmethod
    def dev(x):
        """Return s * (1 - s), the diagonal of the softmax Jacobian.

        The off-diagonal terms (-s_i * s_j) are not represented, so this is
        only the element-wise part of the full derivative.
        """
        s = SoftMax.val(x)
        return s * (1 - s)
        
# Every hyper-parameter is passed explicitly, one per line.
model = Sequential(
    C=0.0,
    epochs=500,
    eta=0.001,
    random_state=None,
    cost_function='quadratic',
    metric='accuracy',
    verbose=1,
    regular='None',
)

# Each Dense layer is followed directly by its activation.
for width, activation in ((15, Sigmoid), (20, Relu), (30, TANH), (3, SoftMax)):
    model.add('Dense', width)
    model.add('Activation', activation)

#model.fit(X,y)
#model.predict(X_test)