In [10]:
import sys
from scipy.special import expit
from sklearn.metrics import accuracy_score

# Using the Two-Layer Perceptron network example (Mini-Batching & Glorant) provided during lecture,
# Let's quantify performance using accuracy. We don't normalize or one-hot encode the data just yet.

# start with a simple base classifier, which can't be fit or predicted it only has internal classes to be used by classes that will subclass it
class MultiLayerPerceptron(object):
    def __init__(self, n_hidden=30,
                 C=0.0, epochs=500, eta=0.001,
                 random_state=None, alpha=0.0,
                 decrease_const=0.0, shuffle=True,
                 minibatches=1, activation='sigmoid',
                 obj_func='quadratic'):
        np.random.seed(random_state)
        self.n_hidden = n_hidden
        self.l2_C = C
        self.epochs = epochs
        self.eta = eta
        self.alpha = alpha
        self.decrease_const = decrease_const
        self.shuffle = shuffle
        self.minibatches = minibatches
        self.activation = activation
        self.obj_func = obj_func

    @staticmethod
    def _encode_labels(y):
        """ Encode labels into one-hot representation """
        onehot = pd.get_dummies(y).values.T
        return onehot
    
    def _initialize_weights(self):
        hidden_layers = self.n_hidden

        if not hasattr(hidden_layers, "__iter__"): hidden_layers = [hidden_layers]

        hidden_layers = list(hidden_layers)
        hidden_layers += [self.n_output_]

        """Initialize weights with small random numbers."""
        weights = []
        #self.n_features_ = self.n_features_
        n_layers = len(hidden_layers)

        for i, n_hidden in enumerate(hidden_layers):

            if self.activation == 'sigmoid' or self.activation == 'linear':
                w1_num_elems = (self.n_features_ + 1) * n_hidden
                w1 = np.random.uniform(-1.0, 1.0, size=w1_num_elems)
                w1 = w1.reshape(n_hidden, self.n_features_ + 1) # reshape to be W

            elif self.activation == 'relu' or self.activation == 'silu':
                if i == n_layers - 1: init_bound = np.sqrt(0.5 / (n_hidden + self.n_features_ + 1))
                else: init_bound = np.sqrt(6. / (n_hidden + self.n_features_ + 1))

                w1 = np.random.uniform(-init_bound, init_bound, (n_hidden, self.n_features_ + 1))
                w1[:, :1] = 0
                
            self.n_features_ = n_hidden
            weights += [w1]
            
        return weights

    @staticmethod
    def _activation(self, z):
        if self.activation == 'linear': return z
        if self.activation == 'sigmoid': return expit(z)
        if self.activation == 'relu': return np.maximum(0, z.copy())
        if self.activation == 'silu':
            z = z.copy() * expit(z.copy())
            return z
        
    @staticmethod
    def _add_bias_unit(X, how='column'):
        """ Add bias unit (column or row of 1s) to array at index 0 """
        if how == 'column':
            ones = np.ones((X.shape[0], 1))
            X_new = np.hstack((ones, X))
        elif how == 'row':
            ones = np.ones((1, X.shape[1]))
            X_new = np.vstack((ones, X))
        return X_new
    
    @staticmethod
    def _L2_reg(lambda_, weights):
        """Compute L2-regularization cost"""
        # only compute for non-bias terms
        w = 0
        for w_j in weights:
            w += np.mean(w_j[:, 1:] ** 2)
            
        return (lambda_/2.0) * np.sqrt(w)
        
        #return (lambda_/2.0) * np.sqrt(np.mean(w1[:, 1:] ** 2) + np.mean(w2[:, 1:] ** 2))
    
    def _cost(self, a3, Y_enc, weights):
        """ Get the objective function value """
        if self.obj_func == 'quadratic':
            cost = np.mean((Y_enc - a3) ** 2)
        elif self.obj_func == 'cross_entropy':
            cost = -np.mean(np.nan_to_num((Y_enc*np.log(a3)+(1-Y_enc)*np.log(1-a3))))
        
        L2_term = self._L2_reg(self.l2_C, weights)
        return cost + L2_term
        
    def _feedforward(self, X, weights):
        """ Compute feedforward step """
        a_i = []
        z_i = []
        z = None
        for idx, W in enumerate(weights):
            if idx == 0:
                a1 = self._add_bias_unit(X.T, how='row')
            else:
                a1 = self._activation(z)
                a1 = self._add_bias_unit(a1, how='row')
            z1 = w @ a1
            a_i += [a1]
            z_i += [z1]
            z = z1
        a_out = self._sigmoid(z)
        a_i += [a_out]
        
        return a_i, z_i

    def _get_gradient(self, a, z, Y_enc, weights):
        """ Compute gradient step using backpropagation. """
        # vectorized backpropagation
        gradients = []
        A_output = a[-1]
        
        if self.obj_func == 'quadratic':
            V_last = -2 * (Y_enc - A_output) * A_output * (1-A_output)
        elif self.obj_func == 'cross_entropy':
            V_last = (A_output - Y_enc)
        
        i = 1
        for A, W in zip(a[:-1][::-1], weights[::-1]):
            if i == 1:
                grad = V_last @ A.T         # no bias on final layer
                V1 = (W.T @ V_last)         # back prop the sensitivity
            else:
                grad = V_last[1:,:] @ A.T   # dont back prop sensitivity of bias
                if i != len(weights):
                    V1 = (W.T @ V_last[1:, :])
            
            if len(weights)-1-i >= 0:

                if self.activation == 'linear':
                    V1 = V1

                elif self.activation == 'sigmoid':
                    V1 = A * (1 - A) * V1

                elif self.activation == 'relu':
                    Z1_with_bias = self._add_bias_unit(z[len(weights)-1-i],how='row')
                    V1[Z1_with_bias<=0] = 0

                elif self.activation == 'silu':
                    Z1_with_bias = self._add_bias_unit(z[len(weights)-1-i],how='row')
                    V1 = (A + (1-A) * self._sigmoid(Z1_with_bias)) * V1
                V_last = V1

            # regularize weights that are not bias terms
            grad[:, 1:] += W[:, 1:] * self.l2_C
            gradients.insert(0, grad)
            
            i += 1

        return gradients
    
    def fit(self, X, y, print_progress=False):
        """ Learn weights from training data. With mini-batch"""
        X_data, y_data = X.copy(), y.copy()
        y_enc = self._encode_labels(y)

        # init weights and setup matrices
        self.n_features_ = X_data.shape[1]
        self.n_output_ = y_enc.shape[0]
        self.weights= self._initialize_weights()
        
        W_prev_list = []
        for W in self.weights_list:
            delta_W_prev = np.zeros(W.shape)
            W_prev_list += [delta_W_prev]

        self.cost_ = []
        self.score_ = []
        # get starting acc
        self.score_.append(accuracy_score(y_data,self.predict(X_data)))
        for i in range(self.epochs):

            # adaptive learning rate
            self.eta /= (1 + self.decrease_const*i)

            if print_progress>0 and (i+1)%print_progress==0:
                sys.stderr.write('\rEpoch: %d/%d' % (i+1, self.epochs))
                sys.stderr.flush()

            if self.shuffle:
                idx_shuffle = np.random.permutation(y_data.shape[0])
                X_data, y_enc, y_data = X_data[idx_shuffle], y_enc[:, idx_shuffle], y_data[idx_shuffle]

            mini = np.array_split(range(y_data.shape[0]), self.minibatches)
            mini_cost = []
            for idx in mini:

                # feedforward
                a, z = self._feedforward(X_data[idx], self.weights)

                cost = self._cost(a3,y_enc[:, idx],self.w1,self.w2)
                mini_cost.append(cost)    # this appends cost of mini-batch only

                # compute gradient via backpropagation
                grad1, grad2 = self._get_gradient(a, z, y_enc=y_enc[:,idx], self.weights)

                # momentum calculations
                delta_w1, delta_w2 = self.eta * grad1, self.eta * grad2
                self.w1 -= (delta_w1 + (self.alpha * delta_w1_prev))
                self.w2 -= (delta_w2 + (self.alpha * delta_w2_prev))
                delta_w1_prev, delta_w2_prev = delta_w1, delta_w2

            self.cost_.append(mini_cost)
            self.score_.append(accuracy_score(y_data,self.predict(X_data)))

        return self

    def predict(self, X):
        """ Predict class labels """
        A3, _, = self._feedforward(X, self.weights)
        y_pred = np.argmax(A3, axis=0)
        return y_pred

In [1]:
class TLPGlorot(TLPMiniBatch):
    def __init__(self, dropout=0.25, **kwds):
        # need to add to the original initializer
        self.dropout = dropout
        # but keep other keywords
        super().__init__(**kwds)



    def fit(self, X, y, print_progress=0, xy_test=None):
        """ Learn weights from training data. With mini-batch"""
        x_data, y_data = X.copy(), y.copy()
        y_enc = self._encode_labels(y)

        # init weights and setup matrices
        self.n_features_ = x_data.shape[1]
        self.n_output_ = y_enc.shape[0]
        self.w1, self.w2 = self._initialize_weights()

        delta_w1_prev = np.zeros(self.w1.shape)
        delta_w2_prev = np.zeros(self.w2.shape)

        self.cost_ = []
        self.score_ = []
        self.score_.append(accuracy_score(y_data,self.predict(x_data)))
        if xy_test is not None:
            X_test = xy_test[0].copy()
            y_test = xy_test[1].copy()
            self.val_score_ = []
            self.val_score_.append(accuracy_score(y_test,self.predict(X_test)))
        for i in range(self.epochs):

            # adaptive learning rate
            self.eta /= (1 + self.decrease_const*i)

            if print_progress>0 and (i+1)%print_progress==0:
                sys.stderr.write('\rEpoch: %d/%d' % (i+1, self.epochs))
                sys.stderr.flush()

            if self.shuffle:
                idx_shuffle = np.random.permutation(y_data.shape[0])
                x_data, y_enc, y_data = x_data[idx_shuffle], y_enc[:, idx_shuffle], y_data[idx_shuffle]

            mini = np.array_split(range(y_data.shape[0]), self.minibatches)
            mini_cost = []

            # adding dropout neurons
            w1 = self.w1.copy()
            w2 = self.w2.copy()

            if self.dropout>0.0:

                # randomly select half of the neurons
                idx_dropout = np.random.permutation(w1.shape[0])
                #idx_other_half = idx_dropout[:int(w1.shape[0]*self.dropout)]
                idx_dropout = idx_dropout[int(w1.shape[0]*(1-self.dropout)):] #drop half

                idx_dropout = np.sort(idx_dropout)
                idx_w2_withbias = np.hstack(([0],(idx_dropout+1)))
                w1 = w1[idx_dropout,:]# get rid of rows
                w2 = w2[:,idx_w2_withbias]# get rid of extra columns
                delta_w1_prev_dropout = delta_w1_prev[idx_dropout,:]
                delta_w2_prev_dropout = delta_w2_prev[:,idx_w2_withbias]
            else:
                delta_w1_prev_dropout = delta_w1_prev
                delta_w2_prev_dropout = delta_w2_prev

            for idx in mini:

                # feedforward
                a1, z1, a2, z2, a3 = self._feedforward(x_data[idx], w1, w2)

                cost = self._cost(a3,y_enc[:, idx],w1,w2)
                mini_cost.append(cost) # this appends cost of mini-batch only

                # compute gradient via backpropagation
                grad1, grad2 = self._get_gradient(a1=a1, a2=a2, a3=a3, z1=z1, z2=z2, y_enc=y_enc[:, idx], w1=w1,w2=w2)

                delta_w1, delta_w2 = self.eta * grad1, self.eta * grad2
                w1 -= (delta_w1 + (self.alpha * delta_w1_prev_dropout))
                w2 -= (delta_w2 + (self.alpha * delta_w2_prev_dropout))
                delta_w1_prev_dropout, delta_w2_prev_dropout = delta_w1, delta_w2

            if self.dropout>0.0:
                # now append the learned weights back into the original matrices
                self.w1[idx_dropout,:] = w1
                self.w2[:,idx_w2_withbias] = w2
                delta_w1_prev[idx_dropout,:] = delta_w1_prev_dropout
                delta_w2_prev[:,idx_w2_withbias] = delta_w2_prev_dropout
            else:
                # don't eliminate any neurons
                self.w1 = w1
                self.w2 = w2
                delta_w1_prev = delta_w1_prev_dropout
                delta_w2_prev = delta_w2_prev_dropout

            self.score_.append(accuracy_score(y_data,self.predict(x_data)))
            self.cost_.append(mini_cost) # only uses dropped samples, so more noise
            if xy_test is not None:
                self.val_score_.append(accuracy_score(y_test,self.predict(X_test)))
        return self

NameError: name 'TLPMiniBatch' is not defined

In [11]:
import sys
import plotly
import numpy as np
import pandas as pd
from scipy.special import expit
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

ds = load_iris()
X = ds.data
y = ds.target

scaler = StandardScaler()
print(scaler.fit(X))
StandardScaler(copy=True, with_mean=True, with_std=True)
X = scaler.transform(X)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

StandardScaler()


In [12]:
params = dict(n_hidden=(50,50,50,),
              C=0.1,
              epochs=300,
              eta=0.001,
              random_state=1,
              alpha=0.001,
              decrease_const=0.0001,
              shuffle=True,
              minibatches=50,
              activation='silu',
              obj_func='quadratic')

In [13]:
%%time 
mlp = MultiLayerPerceptron(**params)
mlp.fit(X_train, y_train, print_progress=10)
yhat = mlp.predict(X_test)
print('Test acc:',accuracy_score(y_test,yhat))

AttributeError: 'MultiLayerPerceptron' object has no attribute 'w1'