# In [119]:
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# In [133]:
class NeuralNet:
    """Fully connected network with two hidden layers, trained by batch gradient descent.

    Layout: input (L0) -> hidden (L1) -> hidden (L2) -> output (Lf). There are
    no bias terms. The same activation function is used on every layer,
    including the output layer. The loss is the summed squared error
    ``0.5 * (output - target) ** 2``.

    The last column of the input CSV is treated as the target; every other
    column is a feature.

    NOTE(review): ``preprocess`` standardises *all* columns, including the
    target, so target values are not confined to the range of a sigmoid or
    tanh output. Confirm this is intended for the data sets in use.
    """

    def __init__(self, file_to_create_df, train_test_split_size, nfeature_L1, nfeature_L2):
        """Load and preprocess the data, split it, and initialise the weights.

        Args:
            file_to_create_df: Path, URL or buffer accepted by ``pd.read_csv``.
                The file is read without a header row.
            train_test_split_size: Forwarded as ``test_size`` to
                ``train_test_split``.
            nfeature_L1: Number of units in the first hidden layer.
            nfeature_L2: Number of units in the second hidden layer.

        Raises:
            ValueError: If the preprocessed data has fewer than two columns
                (at least one feature column and one target column are needed).
        """
        # Seeds NumPy's global RNG so the weight initialisation below is
        # reproducible. train_test_split is called without random_state and
        # therefore also draws from this global state.
        np.random.seed(1)

        df_0 = pd.read_csv(file_to_create_df, header=None, index_col=None)
        self.df_1 = self.preprocess(df_0)

        nrows, ncols = self.df_1.shape
        if ncols < 2:
            raise ValueError(
                "the data set needs at least one feature column and one target column"
            )

        # .values drops the index/column labels and yields plain ndarrays.
        self.features_arr = self.df_1.iloc[:, :ncols - 1].values.reshape(nrows, ncols - 1)
        self.true_target_arr = self.df_1.iloc[:, ncols - 1].values.reshape(nrows, 1)

        nfeature_L0 = self.features_arr.shape[1]
        nfeature_Lf = self.true_target_arr.shape[1]  # always 1 after the reshape above

        # Split dataset into training set and testing set on the basis of test set size
        (
            self.features_train_arr,
            self.features_test_arr,
            self.true_target_train_arr,
            self.true_target_test_arr,
        ) = train_test_split(
            self.features_arr, self.true_target_arr, test_size=train_test_split_size
        )

        # Placeholder buffers are sized from the training split, because that
        # is what the forward/backward passes operate on during training.
        nsamples_train = len(self.features_train_arr)

        # Layer 0 (input). Weights are drawn uniformly from [-1, 1).
        self.x_L0 = self.features_train_arr
        self.w_L0_L1 = 2 * np.random.random((nfeature_L0, nfeature_L1)) - 1

        # Layer 1 (first hidden layer).
        self.x_L1 = np.zeros((nsamples_train, nfeature_L1))
        self.w_L1_L2 = 2 * np.random.random((nfeature_L1, nfeature_L2)) - 1
        self.delta_x_L1_given_act_fn = np.zeros((nsamples_train, nfeature_L1))

        # Layer 2 (second hidden layer).
        self.x_L2 = np.zeros((nsamples_train, nfeature_L2))
        self.w_L2_Lf = 2 * np.random.random((nfeature_L2, nfeature_Lf)) - 1
        self.delta_x_L2_given_act_fn = np.zeros((nsamples_train, nfeature_L2))

        # Output layer.
        self.x_Lf = np.zeros((nsamples_train, nfeature_Lf))
        self.delta_x_Lf_target_given_act_fn = np.zeros((nsamples_train, nfeature_Lf))

    def forward_pass(self, features_arr_L0, activation_fn):
        """Propagate ``features_arr_L0`` through the network.

        The activations of every layer are cached on ``self`` (``x_L1``,
        ``x_L2``, ``x_Lf``) because the backward pass and the weight updates
        read them.

        Args:
            features_arr_L0: Array of shape (n_samples, n_input_features).
            activation_fn: ``"sigmoid"``, ``"tanh"`` or ``"relu"``.

        Returns:
            The output-layer activations, shape (n_samples, n_outputs).
        """
        self.x_L1 = self.activation_function(features_arr_L0.dot(self.w_L0_L1), activation_fn)
        self.x_L2 = self.activation_function(self.x_L1.dot(self.w_L1_L2), activation_fn)
        self.x_Lf = self.activation_function(self.x_L2.dot(self.w_L2_Lf), activation_fn)
        return self.x_Lf

    def backward_pass(self, x_Lf, activation_fn):
        """Compute the delta terms of every layer, from the output backwards.

        The order matters: each hidden-layer delta is derived from the delta
        of the layer after it. Weights are not modified here.

        Args:
            x_Lf: Output of the preceding ``forward_pass`` on the training set.
            activation_fn: Activation name used in that forward pass.
        """
        self.compute_output_delta(x_Lf, activation_fn)
        self.compute_hidden_layer2_delta(activation_fn)
        self.compute_hidden_layer1_delta(activation_fn)

    def _layer_delta(self, error_signal, layer_output, activation_fn):
        """Scale an error signal by the activation derivative at ``layer_output``."""
        return error_signal * self.activation_derivative(layer_output, activation_fn)

    def compute_output_delta(self, x_Lf, activation_fn):
        """Store the output-layer delta: ``(target - output) * f'(output)``.

        Args:
            x_Lf: Output-layer activations for the training set.
            activation_fn: Activation name used to produce ``x_Lf``.
        """
        delta_x_Lf_target = self.true_target_train_arr - x_Lf
        # The derivative is evaluated at the output passed in, not at a cached
        # attribute that might be stale.
        self.delta_x_Lf_target_given_act_fn = self._layer_delta(
            delta_x_Lf_target, x_Lf, activation_fn
        )

    def compute_hidden_layer2_delta(self, activation_fn):
        """Store the second hidden layer's delta, back-propagated from the output delta."""
        delta_x_L2 = self.delta_x_Lf_target_given_act_fn.dot(self.w_L2_Lf.T)
        self.delta_x_L2_given_act_fn = self._layer_delta(delta_x_L2, self.x_L2, activation_fn)

    def compute_hidden_layer1_delta(self, activation_fn):
        """Store the first hidden layer's delta, back-propagated from the L2 delta."""
        delta_x_L1 = self.delta_x_L2_given_act_fn.dot(self.w_L1_L2.T)
        self.delta_x_L1_given_act_fn = self._layer_delta(delta_x_L1, self.x_L1, activation_fn)

    def activation_function(self, x, activation_fn):
        """Apply the named activation element-wise to ``x``.

        Args:
            x: Pre-activation values.
            activation_fn: ``"sigmoid"``, ``"tanh"`` or ``"relu"``.

        Returns:
            Array of the same shape as ``x``.

        Raises:
            ValueError: If ``activation_fn`` is not one of the supported names.
        """
        if activation_fn == "sigmoid":
            return 1 / (1 + np.exp(-x))
        if activation_fn == "tanh":
            return np.tanh(x)
        if activation_fn == "relu":
            return np.maximum(0, x)
        raise ValueError("unsupported activation function: " + repr(activation_fn))

    def activation_derivative(self, x, activation="sigmoid"):
        """Return the activation's derivative expressed via the activation OUTPUT.

        ``x`` must be the value the activation already produced (a layer
        output), not the pre-activation input. That is how every caller in
        this class uses it.

        Args:
            x: Activated layer output.
            activation: ``"sigmoid"``, ``"tanh"`` or ``"relu"``.

        Returns:
            Array of the same shape as ``x``.

        Raises:
            ValueError: If ``activation`` is not one of the supported names.
        """
        if activation == "sigmoid":
            return x * (1 - x)
        if activation == "tanh":
            # d/dz tanh(z) = 1 - tanh(z)^2, and x is already tanh(z).
            return 1 - x ** 2
        if activation == "relu":
            # For relu the output is positive exactly where the input was.
            return (x > 0) * 1
        raise ValueError("unsupported activation function: " + repr(activation))

    def train(self, activation_fn="sigmoid", max_iterations=1000, learning_rate=0.05):
        """Run full-batch gradient descent on the training split and print a summary.

        Args:
            activation_fn: ``"sigmoid"``, ``"tanh"`` or ``"relu"``.
            max_iterations: Number of forward/backward/update rounds.
            learning_rate: Step size applied to every weight update.

        The error printed is the summed squared error measured in the last
        iteration's forward pass (before that iteration's weight update).
        """
        error = None
        for _ in range(max_iterations):
            # send forward
            x_Lf = self.forward_pass(self.x_L0, activation_fn)
            error = 0.5 * np.power(x_Lf - self.true_target_train_arr, 2)

            # send backward
            self.backward_pass(x_Lf, activation_fn)

            # All updates are computed from the pre-update weights and cached
            # activations. The deltas are (target - output), so adding the
            # update moves the weights down the loss gradient.
            w_L2_Lf_update = learning_rate * self.x_L2.T.dot(self.delta_x_Lf_target_given_act_fn)
            w_L1_L2_update = learning_rate * self.x_L1.T.dot(self.delta_x_L2_given_act_fn)
            w_L0_L1_update = learning_rate * self.x_L0.T.dot(self.delta_x_L1_given_act_fn)

            self.w_L2_Lf = self.w_L2_Lf + w_L2_Lf_update
            self.w_L1_L2 = self.w_L1_L2 + w_L1_L2_update
            self.w_L0_L1 = self.w_L0_L1 + w_L0_L1_update

        if error is None:
            # No iteration ran; report the error of the untrained network
            # instead of failing on an unbound variable.
            x_Lf = self.forward_pass(self.x_L0, activation_fn)
            error = 0.5 * np.power(x_Lf - self.true_target_train_arr, 2)

        print("After " + str(max_iterations) + " iterations, and having learning rate as " + str(learning_rate) + ", the total error is " + str(np.sum(error)))
        print("The final weight vectors are (starting from input to output layers)")

        print(self.w_L0_L1)
        print(self.w_L1_L2)
        print(self.w_L2_Lf)

    def predict(self, activation_fn="sigmoid", header=True):
        """Evaluate the network on the test split.

        Note that this runs ``forward_pass`` and therefore overwrites the
        cached layer activations with those of the test set.

        Args:
            activation_fn: Activation name; should match the one used in
                ``train``.
            header: Unused; kept so existing callers keep working.

        Returns:
            The summed squared error ``sum(0.5 * (prediction - target) ** 2)``
            over the test split.
        """
        pred_target_arr = self.forward_pass(self.features_test_arr, activation_fn)
        error = 0.5 * np.power(pred_target_arr - self.true_target_test_arr, 2)
        return np.sum(error)

    @staticmethod
    def preprocess(df):
        """Return a numeric, imputed and standardised copy of ``df``.

        Steps, in order:
          1. ``object`` columns are converted to integer category codes.
          2. Remaining NaN values are replaced by the column mean.
          3. Every column is standardised with ``StandardScaler``.

        Declared as a static method so that both ``self.preprocess(df)`` and
        ``NeuralNet.preprocess(df)`` work.

        Args:
            df: Raw ``pandas.DataFrame``. It is not modified.

        Returns:
            A new ``pandas.DataFrame`` with default integer row and column
            labels.
        """
        # Work on a copy so the caller's frame is left untouched.
        df = df.copy()

        # Convert categorical attributes to numerical attributes
        for col in df:
            if df[col].dtype == 'object':
                df[col] = df[col].astype('category').cat.codes.astype('int64')

        arr = df.values

        # Handle null or missing values
        imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
        arr = imputer.fit(arr).transform(arr)

        # Standardization, converting mean to 0 and standard deviation to 1
        scaler = StandardScaler().fit(arr)
        arr = scaler.transform(arr)

        return pd.DataFrame(arr)