In [1]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import mean_squared_error
import numpy as np
import matplotlib.pyplot as plt
import sys

In [49]:
class MLPClassifier(BaseEstimator,ClassifierMixin):

    def __init__(self,lr, momentum, shuffle,hidden_layer_widths, Deterministic, random_weights, binaryClassification):
        """ Initialize class with chosen hyperparameters.

        Args:
            lr (float): A learning rate / step size.
            shuffle(boolean): Whether to shuffle the training data each epoch. DO NOT SHUFFLE for evaluation / debug datasets.
            momentum(float): The momentum coefficent 
        Optional Args (Args we think will make your life easier):
            hidden_layer_widths (list(int)): A list of integers which defines the width of each hidden layer if hidden layer is none do twice as many hidden nodes as input nodes.
        Example:
            mlp = MLPClassifier(lr=.2,momentum=.5,shuffle=False,hidden_layer_widths = [3,3]),  <--- this will create a model with two hidden layers, both 3 nodes wide
        """
        self.hidden_layer_widths = hidden_layer_widths
        self.lr = lr
        self.momentum = momentum
        self.shuffle = shuffle
        self.deterministic=Deterministic
        self.random_weights=random_weights
        self.weights=None
        self.delta_weights=None
        self.binaryClassification=binaryClassification
        self.x_valid=None
        self.y_valid=None
        self.x_test=None
        self.y_test=None


    def fit(self, X, y, output_size, initial_weights=None):
        """ Fit the data; run the algorithm and adjust the weights to find a good solution

        Args:
            X (array-like): A 2D numpy array with the training data, excluding targets
            y (array-like): A 2D numpy array with the training targets
        Optional Args (Args we think will make your life easier):
            initial_weights (array-like): allows the user to provide initial weights
        Returns:
            self: this allows this to be chained, e.g. model.fit(X,y).predict(X_test)

        """
        #self.initial_weights = self.initialize_weights() if not initial_weights else initial_weights
        
        self.input_size=X.shape[1]
        self.output_size=output_size
        if self.weights is None:
            self.initialize_weights()
            self.delta_weights=self.get_zero_weights()
            
        if self.deterministic:
            self.epochs=10
        else:
            self.epochs=sys.maxsize
            
        self.epoch_count=0    
        
        curr_valid_score = -1
        valid_count=0
        self.mse_training=[]
        self.mse_valid=[]
        self.accuracy_train=[]
        self.accuracy_valid=[]
        #print("y is ", y)
        #print("self.epochs ", self.epochs)
        
        # Infinite loop when determin is false
        for _ in range(self.epochs):

            curr_delta_weights=[]
            for i in range(y.shape[0]):
                #print("i = ", i)
                input_x = np.insert(X[i], len(X[i]), 1) # Add the bias of 1
                # Forwarding
                nets = [] 
                outs = [] 
                for num_layer in range(len(self.weights)):
                    if num_layer == 0:
                        net = self.get_nets(input_x, num_layer)
                    else:
                        net = self.get_nets(out, num_layer)                    
                    nets.append(net) 
                    out = self.activation_sigmoid(net)
                    if num_layer != len(self.weights)-1:
                        out.append(1) # bias
                    outs.append(out)
                #print("nets ", nets)
                # get the output error
                output_out = outs[-1] # 1-d array
                output_out = np.array(output_out)
                #print("output_out ", output_out)
                '''
                temp_errors=[]
                expected = [0 for i in range(len(output_out))]
                expected[int(y[i][0])]=1
                for node_num in range(len(output_out)):
                    eee = (expected[node_num] - output_out[node_num]) * output_out[node_num] * (1 - output_out[node_num])
                    temp_errors.append(eee)
                #output_error = np.array(temp_errors)
                #print("output error ", output_error)
                '''
                temp_errors=[]
                input_target = y[i] # 1-d array
                #print("target: ", input_target)
                #temp_targets=[0]*self.output_size
                #temp_targets[input_target] = 1
                #print("input_target", input_target)
                #for node_num in range(len(output_out)):
                #    eee = (input_target - output_out[node_num])*output_out[node_num]*(1-output_out[node_num])
                #    print("input target: ", input_target, "output[num_node] :", output_out[node_num])
                #    temp_errors.append(eee)
                
                #output_error = np.array(temp_errors)
                output_error = (input_target-output_out)*output_out*(1-output_out)
                #print("1: ", input_target-output_out, "2: ", output_out*(1-output_out))
                #print("output error ", output_error)

                # Backward
                errors = []
                self.get_errors(errors, output_error, outs)
                self.update_weights(errors, outs, input_x)
                #print("weights ", self.weights)
                
                
            self.epoch_count += 1 
            
            # Debug and Eval model do not go through here
            # check validation score
            if self.x_valid is not None:
                valid_score = self.score(self.x_valid, self.y_valid)
                
                pred_training = self.predict(self.x_test) # prediction using current weight
                self.mse_training.append(mean_squared_error(self.y_test, pred_training))
                self.mse_valid.append(mean_squared_error(self.y_valid, self.predict(self.x_valid)))
                self.accuracy_train.append(self.score(self.x_test, self.y_test))
                self.accuracy_valid.append(valid_score)
                
                if curr_valid_score < valid_score:
                    curr_valid_score = valid_score  # 현재까지의 최대 점수를 저장
                    valid_count = 0
                    self.curr_best_weight = self.weights
                #elif abs(curr_valid_score - valid_score) < 0.02:
                #    valid_count += 1
                if valid_score <= curr_valid_score or abs(curr_valid_score - valid_score) < 0.05:
                    valid_count += 1
                    
                    
                # 만약 현재 최고점수에서 다음에 떨어지면 +1
                # 현재 최고점수에서 10번동안 아무런 변동이 거의 없다면 끝
                

                print("epochs ", self.epoch_count)
                print("valid score ", valid_score)
                print("valid count ", valid_count)
                if valid_count == 15:
                    print("done")
                    break
                
                
            #if self.shuffle:
             #   X,y=self._shuffle_data(X,y)
                    
                    
        return self
    
    def update_weights(self, errors, outs, input_x):
        err = errors.copy()
        err.reverse()
        out = outs.copy()
        out.insert(0, input_x)
        del out[-1]
        
        for num_layer in range(len(self.weights)):
            w = self.weights[num_layer]
            e = err[num_layer]
            o = out[num_layer]
            
            temp_delta=[]
            curr_delta = self.delta_weights[num_layer]
            #print("o ", out[num_layer])
            #print("e ", err[num_layer])
            for i in range(len(o)):
                temp=[]
                for j in range(len(e)):
                    #print("e[j]", e[j], "o[i] ", o[i])
                    v = self.lr*e[j]*o[i] 
                    #print("v: ", v)
                    m = self.momentum*curr_delta[i][j]
                    #print("m : ", m)
                    temp.append(v+m)
                    #print("temp ", temp)
                temp_delta.append(temp)
            #print("num_layer ", num_layer)
            self.weights[num_layer] = self.weights[num_layer] + np.array(temp_delta) 
            self.delta_weights[num_layer] = temp_delta

    def get_errors(self, errors, output_error, outs):
        errors.append(output_error.tolist())
        w = self.weights.copy()
        w = w.tolist()
        w.reverse()
        o_nodes = outs.copy()
        o_nodes.reverse()
        del o_nodes[0] 
        num_layers = len(self.weights)
        for layer in range(num_layers-1):
            error=[]
            prev_errors = np.array(errors[-1])
            curr_outs = o_nodes[layer]
            weight = w[layer]
            #print("prev_errors ", prev_errors)
            for i in range(len(curr_outs)-1):
                s = 0
                for j in range(len(prev_errors)):
                    #print("prev[j] ", prev_errors[j], "weight[i][j]: ", weight[i][j])
                    s = s + prev_errors[j]*weight[i][j]
                e = s*self.get_der_sig(curr_outs[i])
                error.append(e)
            errors.append(error)        
    
    def get_der_sig(self, value):
        """ Sigmoid derivative written in terms of the sigmoid's output: value * (1 - value). """
        complement = 1 - value
        return complement * value
   
    def activation_sigmoid(self, net):
        """ Apply the logistic function 1 / (1 + e^-x) to every net value.

        Returns a new Python list, which callers extend with a bias value.
        """
        return [1.0 / (1.0 + np.exp(-net[idx])) for idx in range(len(net))]
    
    def get_nets(self, x, num_layer):
        num_nodes = self.weights[num_layer].shape[1]
        nets = []
        weight_layer = self.weights[num_layer]
        for i in range(num_nodes):
            col = weight_layer[:,i]  
            dot_product = np.dot(x, col)
            nets.append(dot_product)
        return nets

    def predict(self, X):
        """ Predict all classes for a dataset X
        Args:
            X (array-like): A 2D numpy array with the training data, excluding targets
        Returns:
            array, shape (n_samples,)
                Predicted target values per element in X.
        """
        preds=[]
        for i in range(X.shape[0]):
            x = np.insert(X[i], len(X[i]), 1)
            nets = [] 
            outs = []
            for num_layer in range(len(self.weights)):
                if num_layer == 0:
                    net = self.get_nets(x, num_layer)
                else:
                    net = self.get_nets(out, num_layer)
                nets.append(net)  
                out = self.activation_sigmoid(net)
                
                if num_layer != len(self.weights)-1:
                    out.append(1) # bias
                outs.append(out)
                
            output_out = outs[-1] # 1-d array
            output_out = np.array(output_out)
            index = output_out.argmax()
            preds.append(index)
        return preds
            
              
    
    def get_zero_weights(self):
        self.layer_size = 2 + len(self.hidden_layer_widths)
        self.node_size_list = [self.input_size + 1] # input nodes + bias
        for i in range(len(self.hidden_layer_widths)):
            self.node_size_list.append(self.hidden_layer_widths[i] + 1)
        if self.binaryClassification:
            self.node_size_list.append(self.output_size+1)
        else:
            self.node_size_list.append(self.output_size)
        weights = np.empty(self.layer_size-1, dtype=np.ndarray)
                
        for i in range(len(self.node_size_list) - 1):
            
            if i == len(self.node_size_list)-2:
                x=self.node_size_list[i]
                y=self.node_size_list[i+1]
            else:
                x=self.node_size_list[i]
                y=self.node_size_list[i+1]-1
            
            weights[i]=np.zeros((x,y))
            
        return weights
    
        

    def initialize_weights(self):
        """ Initialize weights for perceptron. Don't forget the bias!

        Returns:

        """
        if self.random_weights:
            self.layer_size = 2 + len(self.hidden_layer_widths)
            self.node_size_list = [self.input_size + 1] # input nodes + bias
            for i in range(len(self.hidden_layer_widths)):
                self.node_size_list.append(self.hidden_layer_widths[i] + 1)
            if self.binaryClassification:
                self.node_size_list.append(self.output_size+1)
            else:
                self.node_size_list.append(self.output_size)
            weights = np.empty(self.layer_size-1, dtype=np.ndarray)
                
            for i in range(len(self.node_size_list) - 1):
            
                if i == len(self.node_size_list)-2:
                    x=self.node_size_list[i]
                    y=self.node_size_list[i+1]
                else:
                    x=self.node_size_list[i]
                    y=self.node_size_list[i+1]-1
            
                weights[i]=np.random.uniform(-1,1,(x,y))
            self.weights = weights
        else:
            # random zeros
            self.weights = self.get_zero_weights()
            
        return [0]

    def score(self, X, y):
        """ Return accuracy of model on a given dataset. Must implement own score function.

        Args:
            X (array-like): A 2D numpy array with data, excluding targets
            y (array-like): A 2D numpy array with targets

        Returns:
            score : float
                Mean accuracy of self.predict(X) wrt. y.
        """
        preds = self.predict(X)
        temp_y = np.reshape(y, (y.shape[1], y.shape[0]))
        #print("preds ", preds)
        #print("y test ", temp_y)
        return np.count_nonzero(preds == temp_y[0,:]) / len(preds)
    
    def set_valid_sets(self, x, y):
        self.x_valid=x
        self.y_valid=y
        
    def set_test_sets(self, x, y):
        self.x_test=x
        self.y_test=y
        
    def _shuffle_data(self, X, y):
        """ Shuffle the data! This _ prefix suggests that this method should only be called internally.
            It might be easier to concatenate X & y and shuffle a single 2D array, rather than
             shuffling X and y exactly the same way, independently.
        """
        # X is 2d array
        # y is 
        X_temp = X
        y_temp = y.flatten()
        temp = np.column_stack((X_temp, y_temp))
        np.random.shuffle(temp)
        X_result = temp[:,:-1]
        y_result = temp[:,-1]
        y_result = np.reshape(y_result, (y_result.shape[0],1))
        return X_result, y_result

    ### Not required by sk-learn but required by us for grading. Returns the weights.
    def get_weights(self):
        return self.weights


In [50]:
# Load the debug dataset, one-hot encode its labels and train a small
# deterministic network on it.
from scipy.io import arff
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# scikit-learn 1.2 renamed `sparse` to `sparse_output` and later removed the
# old keyword, so try the new spelling first and fall back for old releases.
try:
    onehot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    onehot_encoder = OneHotEncoder(sparse=False)

debug_data = arff.loadarff("debug_dataset.arff")
debug_df = pd.DataFrame(debug_data[0])
# The class column is decoded from bytes to text, then converted to numbers
debug_df['class'] = pd.to_numeric(debug_df['class'].str.decode('utf-8'))
np_debug = np.array(debug_df)

# The last column is the label; everything before it is a feature
X_debug = np_debug[:,:-1]
y_debug = np_debug[:,-1]
y_debug = np.reshape(y_debug, (y_debug.shape[0],1)) # (8,1)
encoded_y = onehot_encoder.fit_transform(y_debug)

# Train on debug data: zero initial weights, no shuffling, fixed epoch count
mlp_debug = MLPClassifier(
    lr=0.1,
    momentum=0.5,
    shuffle=False,
    hidden_layer_widths=[4],
    Deterministic=True,
    random_weights=False,
    binaryClassification=False,
)
mlp_debug = mlp_debug.fit(X_debug, encoded_y, 2)

# Check weights
mlp_debug.weights

array([array([[-0.00018149, -0.00018149, -0.00018149, -0.00018149],
       [ 0.00157468,  0.00157468,  0.00157468,  0.00157468],
       [-0.00788218, -0.00788218, -0.00788218, -0.00788218]]),
       array([[ 0.01050642, -0.01050642],
       [ 0.01050642, -0.01050642],
       [ 0.01050642, -0.01050642],
       [ 0.01050642, -0.01050642],
       [ 0.02148778, -0.02148778]])], dtype=object)

In [7]:
# One-hot encode the debug labels (expects `y_debug` from the data-loading cell)
from sklearn.preprocessing import OneHotEncoder

# scikit-learn 1.2 renamed `sparse` to `sparse_output` and later removed the
# old keyword, so try the new spelling first and fall back for old releases.
try:
    onehot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    onehot_encoder = OneHotEncoder(sparse=False)
encoded = onehot_encoder.fit_transform(y_debug)

In [9]:
# Shift the one-hot targets from {0, 1} to {-0.5, +0.5} for inspection
encoded - 0.5

array([[-0.5,  0.5],
       [-0.5,  0.5],
       [-0.5,  0.5],
       [-0.5,  0.5],
       [ 0.5, -0.5],
       [ 0.5, -0.5],
       [ 0.5, -0.5],
       [ 0.5, -0.5]])