In [2]:
import numpy as np
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split

In [3]:
"""
Model architecture
"""
nn_arch = [
    {"input_dim": 2, "output": 25, "activation": "relu"},
    {"input_dim": 25, "output": 50, "activation": "relu"},
    {"input_dim": 50, "output": 50, "activation": "relu"},
    {"input_dim": 50, "output": 25, "activation": "relu"},
    {"input_dim": 25, "output": 1, "activation": "sigmoid"}
]

In [4]:
"""
Initiate model based on pre-specified architecture 
"""
def init_layers(nn_arch, seed = 83):
    param_cache = {}
    np.random.seed(seed)
    
    for layer_id,layer in enumerate(nn_arch):
        input_dim = layer['input_dim']
        output_dim = layer['output']
        
        param_cache["W" + str(layer_id)] = np.random.randn(output_dim,input_dim)*0.1 #normal(0,0.1) Low variance normal distribution, with 0 mean
        param_cache["B" + str(layer_id)] = np.random.randn(output_dim, 1)*0.1
    return param_cache

In [5]:
"""
Activation functions and first derivatives
"""

def sigmoid(X):
    return 1/(1+np.exp(-X))
    
def relu(X):
    return np.maximum(0,X)
    
def sigmoid_backward(dZ_curr, A_curr):
    return sigmoid(A_curr)*(1-sigmoid(A_curr))*dZ_curr
    
def relu_backward(dZ_curr, A_curr):
    X = np.copy(dZ_curr)
    X[A_curr <= 0] = 0
    return X

In [26]:
#binary cross-entropy (Binary classification)
"""
Scoring and accuracy metrics
"""
def error_cost(output,target):
    m = target.shape[1]
    #print(m)
    if m != 0:
        x = -1/m * (np.dot(target,np.log(output).T) + np.dot((1-target),np.log(1-output).T))
    #print(output)
    return np.squeeze(x)
    
def nn_accuracy(output,target):
    op = prob_to_class(output)
    return (op == target).all(axis=0).mean()
    
def prob_to_class(output):
    op = np.copy(output)
    op[op>0.5] = 1
    op[op<=0.5] = 0
    return op

In [7]:
"""
Single step of forward propagation
"""
def single_forward_prop(W_curr, B_curr, Z_prev, activation):
    if activation == 'sigmoid':
        forward_activation = sigmoid
    elif activation == 'relu':
        forward_activation = relu
    A_curr = np.dot(W_curr, Z_prev) + B_curr
    
    return A_curr, forward_activation(A_curr)

In [8]:
"""
Complete forward propagation
"""
def full_forward_prop(nn_input, param_cache, nn_arch):
    memory_cache = {}
    A_curr = nn_input
    
    for layer_id, layer in enumerate(nn_arch):
        Z_prev = A_curr
        W_curr = param_cache["W" + str(layer_id)]
        B_curr = param_cache["B" + str(layer_id)]
        activation = layer['activation']
        A_curr, Z_curr = single_forward_prop(W_curr, B_curr, Z_prev, activation)
        
        memory_cache["A" + str(layer_id)] = A_curr
        memory_cache["Z" + str(layer_id)] = Z_prev
        
    return Z_curr, memory_cache

In [27]:
"""
Single step of backward propagation
"""
def single_back_prop(back_input, A_curr, W_curr, B_curr, Z_prev, activation):
    m = back_input.shape[1]
    
    if activation == 'sigmoid':
        backward_activation = sigmoid_backward
    elif activation == 'relu':
        backward_activation = relu_backward
    
    delta_A = backward_activation(back_input, A_curr)
    
    W_err = np.dot(delta_A, Z_prev.T)/m
    B_err = np.sum(delta_A, axis = 1, keepdims = True)/m
    back_input_ = np.dot(W_curr.T, delta_A)
    
    return W_err, B_err, back_input_

In [10]:
"""
Complete backward propagation
"""
def full_back_prop(param_cache, memory_cache, nn_arch, output, target):
    back_input = -1*(np.divide(target,output) - np.divide((1-target),(1-output)))
    
    for layer_id, layer in reversed(list(enumerate(nn_arch))):
        A_curr = memory_cache["A" + str(layer_id)]
        Z_prev = memory_cache["Z" + str(layer_id)]
        
        W_curr = param_cache["W" + str(layer_id)]
        B_curr = param_cache["B" + str(layer_id)]
        
        activation = layer['activation']
        
        W_err, B_err, back_input_ = single_back_prop(back_input, A_curr, W_curr, B_curr, Z_prev, activation)
        back_input = back_input_
        
        param_cache["dW" + str(layer_id)] = W_err
        param_cache["dB" + str(layer_id)] = B_err
        
    return param_cache

In [11]:
"""
Modifying parameters based on learning rate; end of one iteration
"""
def update_params(nn_arch, param_cache, learning_rate):
    #m = target.shape[1]
    for layer_id, layer in enumerate(nn_arch):
        param_cache["W" + str(layer_id)] -= learning_rate*param_cache["dW" + str(layer_id)]
        param_cache["B" + str(layer_id)] -= learning_rate*param_cache["dB" + str(layer_id)]
        
    return param_cache

In [12]:
"""
Main neural net training wrapper function
"""
def train_nn(nn_arch, X_train, y_train, learning_rate, epochs, score_logs = True):
    error_history = []
    accuracy_history = []
    param_cache = init_layers(nn_arch)
    
    for i in range(epochs):
        output, memory_cache = full_forward_prop(X_train, param_cache, nn_arch)
        param_cache = full_back_prop(param_cache, memory_cache, nn_arch, output, y_train)
        param_cache = update_params(nn_arch, param_cache, learning_rate)
        #output, memory_cache = full_forward_prop(X_train, param_cache, nn_arch)
        err = error_cost(output, y_train)
        accuracy = nn_accuracy(output, y_train)
        error_history.append(err)
        accuracy_history.append(accuracy)
        
        if (i%50 == 0):
            if(score_logs):
                print("Iteration: {0:05} - err: {1:.5f} - accuracy: {2:.5f}".format(i, err, accuracy))
    
    return param_cache

In [13]:
# number of samples in the data set
N_SAMPLES = 1000
# fraction of total observations used in test_set
TEST_SIZE = 0.1

In [24]:
"""
Making training and test datasets
"""
X, y = make_moons(n_samples = N_SAMPLES, noise=0.2, random_state=100)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=42)

In [28]:
"""
Estimating model parameters on the training set
"""
#np.seterr(divide='ignore', invalid = 'ignore')
params_cache = train_nn(nn_arch, np.transpose(X_train), np.transpose(y_train.reshape((y_train.shape[0], 1))), 0.02, 3000)

Iteration: 00000 - err: 0.69951 - accuracy: 0.49556
Iteration: 00050 - err: 0.69485 - accuracy: 0.49556
Iteration: 00100 - err: 0.69103 - accuracy: 0.49556
Iteration: 00150 - err: 0.68732 - accuracy: 0.77889
Iteration: 00200 - err: 0.68319 - accuracy: 0.77222
Iteration: 00250 - err: 0.67818 - accuracy: 0.78667
Iteration: 00300 - err: 0.67179 - accuracy: 0.78889
Iteration: 00350 - err: 0.66335 - accuracy: 0.78111
Iteration: 00400 - err: 0.65177 - accuracy: 0.77778
Iteration: 00450 - err: 0.63528 - accuracy: 0.78444
Iteration: 00500 - err: 0.61133 - accuracy: 0.79222
Iteration: 00550 - err: 0.57676 - accuracy: 0.80111
Iteration: 00600 - err: 0.52932 - accuracy: 0.80889
Iteration: 00650 - err: 0.47209 - accuracy: 0.81889
Iteration: 00700 - err: 0.41453 - accuracy: 0.83444
Iteration: 00750 - err: 0.36707 - accuracy: 0.84889
Iteration: 00800 - err: 0.33302 - accuracy: 0.85889
Iteration: 00850 - err: 0.31067 - accuracy: 0.86667
Iteration: 00900 - err: 0.29682 - accuracy: 0.87333
Iteration: 0

In [76]:
"""
Estimating target values for test input using estimated model parameters
"""
output, _ = full_forward_prop(np.transpose(X_test), params_cache, nn_arch)

In [77]:
"""
Target value accuracy when compared to true output
"""
accuracy = nn_accuracy(output, np.transpose(y_test.reshape((y_test.shape[0], 1))))
print(accuracy)

0.85
