In [1]:
%matplotlib inline
from time import time, sleep
import numpy as np
import pickle as pkl
import copy
import matplotlib.pyplot as plt
from IPython import display
from collections import Counter
from sklearn.preprocessing import OneHotEncoder
import os
from sklearn.datasets import fetch_mldata
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def rel_error(x, y):
    """ returns relative error """
    return np.max(np.abs(x - y) / (np.maximum(1e-8, np.abs(x) + np.abs(y))))

<a id='toc'></a>
* [1. Modules](#modules)
    * [1.1 Base objects](#base)
        * [1.1.1 Layer](#layer)
        * [1.1.2 Sequential](#seq)
    * [1.2 Helpers](#helpers)
        * [1.2.1 Initializers](#initializers)
        * [1.2.2 Regularizers](#regularizers)
    * [1.3 Layers](#layers)
        * [1.3.1 Dense](#dense)
        * [1.3.2 Softmax](#softmax)
        * [1.3.3 Dropout](#dropout)
        * [1.3.4 BatchNormalization](#batchnorm)

<a id='layer'></a>
## Layer [[toc]](#toc)

In [2]:
# %load ../../ml/neural_network/sequential/layer.py

#import numpy as np
#from collections import OrderedDict

class Layer:
    def assert_nans(self, arr):
        assert not np.any(np.isnan(arr))
    def assert_inf(self, arr):
        assert not np.any(np.isinf(self.grad_input))
        
    def __init__(self):
        self.output = None
        self.grad_input = None
        self.training = True
        self.initialized = False
           
    # Initialization
    def initialize(self, params):
        params = self._initialize(params)
        self.initialized = True
        return params
    
    def _initialize(self, params):
        self._check_initialization_params(params)
        params = self._initialize_name(params)
        return params

    def _check_initialization_params(self, params):
        assert 'input_shape' in params
        assert 'seed'  in params
        assert 'dtype' in params
        assert 'names' in params
    
    def _initialize_name(self, params):
        names = params['names']
        layer_type_name = type(self).__name__
        n_layers = names.setdefault(layer_type_name, 0)
        self.name = layer_type_name + str(n_layers)
        names[layer_type_name] += 1
        return params
    
    # Propagation
    def forward(self, input):
        return self.update_output(input)
    def update_output(self, input):
        pass
    
    def backward(self, input, grad_output):
        self.update_grad_input(input, grad_output) # This updates self.grad_input
        self.update_grad_param(input, grad_output)
        return self.grad_input
    def update_grad_input(self, input, grad_output):
        pass
    def update_grad_param(self, input, grad_output):
        pass
        
    def get_params(self):
        return OrderedDict()
        
    def get_grad_params(self):
        return OrderedDict()
        
    def zero_grad_params(self):
        pass
        
    def train(self):
        self.training = True
        
    def evaluate(self):
        self.training = False
        
    def __repr__(self):
        return type(self).__name__

## Sequential [[toc]](#toc)

In [4]:
# %load ../../ml/neural_network/sequential/sequential.py

#import numpy as np
#from .layer import Layer
#import copy
#from collections import OrderedDict

class Sequential(Layer):
    def __init__(self):
        super().__init__()
        self.layers = []

    def add(self, layer):
        assert isinstance(layer, Layer)
        self.layers.append(layer)
        
    def _initialize(self, params):
        params.setdefault('seed', 0)
        params.setdefault('dtype', np.float32)
        params.setdefault('names', {})
        self._check_initialization_params()
        for n_layer, layer in enumerate(self.layers):
            params = layer.initialize(params)
        return params

    def update_output(self, input):
        """This function passes input through all layers and saves output"""
        for n_layer, layer in enumerate(self.layers):
            output = layer.forward(input)
            input = output
        self.output = output
        return self.output
        
    def backward(self, input, grad_output):
        n_layers = len(self.layers)
        for n_layer in reversed(list(range(1, n_layers))):
            grad_output = self.layers[n_layer].backward(self.layers[n_layer - 1].output, grad_output)
        self.grad_input = self.layers[0].backward(input, grad_output)
        return self.grad_input
        
    def get_params(self):
        params = OrderedDict()
        for layer in self.layers.get_params():
            for param_name, param_value in layer.items():
                params[param_name] = param_value
        return params

    def get_grad_params(self):
        grad_params = OrderedDict()
        for layer in self.layers:
            for grad_name, grad_value in layer.get_grad_params.items():
                grad_params[grad_name] = grad_value
        return grad_params
        
    def zero_grad_params(self):
        for layer in self.layers:
            layer.zero_grad_params()
            
    def __getitem__(self, n):
        return self.layers[n]
        
    def __repr__(self):
        return '->'.join([str(layer) for layer in self.layers])
        
    def train(self):
        """Sets all layers to training mode"""
        for layer in self.layers:
            layer.train()
            
    def evaluate(self):
        """Sets all layers to evaluation mode"""
        for layer in self.layers:
            layer.evaluate()

<a id='helpers'></a>
## Helper objects [[toc]](#toc)

<a id='initializers'></a>
### Initializers [[toc]](#toc)

In [5]:
# %load ../../ml/neural_network/initializers/initializers.py

import numpy as np

class Initializer:
    def __init__(self):
        pass

class DeterministicInitializer(Initializer):
    def __init__(self, init_value):
        self.init_value = init_value
    def __call__(self, shape=None, dtype=np.float32):
        return self.init_value.astype(dtype)
    
class RandomInitializer(Initializer):
    def __init__(self, seed=None):
        super().__init__()
        self.gen = np.random.RandomState(seed)

        
class ZerosInitializer(Initializer):
    def __init__(self):
        super().__init__()
    def __call__(self, shape=None, dtype=np.float32):
        if shape is None:
            return 0.0
        return np.zeros(shape, dtype=dtype)


class NormalInitializer(RandomInitializer):
    def __init__(self, seed=None):
        super().__init__(seed=seed)
    def __call__(self, shape=None, dtype=np.float32):
        stddev = 1.0
        if len(shape) == 2:
            stddev = 1. / np.sqrt(shape[0])
        if len(shape) == 4:
            stddev = 1.0 / np.sqrt(np.prod(shape[1:]))
        return self.gen.uniform(-stddev, stddev, size=shape).astype(dtype)


class NormalInitializer(RandomInitializer):
    def __init__(self, seed=None):
        super().__init__(seed=seed)
        
    def __call__(self, shape=None, dtype=np.float32):
        stddev = 1.0
        if len(shape) == 2:
            stddev = 1. / np.sqrt(shape[0])
        if len(shape) == 4:
            stddev = 1.0 / np.sqrt(np.prod(shape[1:]))
        return self.gen.normal(loc=0, scale=stddev, size=shape).astype(dtype)

<a id='regularizers'></a>
### Regularizers [[toc]](#toc)

In [6]:
# %load ../../ml/neural_network/regularizers/regularizers.py

import numpy as np

class Regularizer:
    pass

class EmptyRegularizer(Regularizer):
    def __bool__(self):
        return False
    
class L2regularizer(Regularizer):
    def __init__(self, l2=0.0):
        self.l2 = l2
    def __bool__(self):
        return True
    def loss(self, arr):
        return 0.5 * np.sum(arr ** 2)
    def grad(self, arr):
        return arr

<a id='dense'></a>
### 1.3.1 Dense [[toc]](#toc)

In [26]:
# %load ../../ml/neural_network/layers/dense.py

#import numpy as np
#from collections import OrderedDict


class Dense(Layer):
    def __init__(self, units, use_bias=True, 
                 W_initializer=None, b_initializer=None, 
                 W_regularizer=None, b_regularizer=None):
        """
        Inputs:
        - units - Integer or Long, dimensionality of the output space.
        - W_initializer
        - b_initializer
        - seed - used for initializers!!!
        """
        super().__init__()
        self.units = units
        self.use_bias = use_bias
        self.W_initializer = W_initializer
        self.b_initializer = b_initializer
        self.W_regularizer = W_regularizer
        self.b_regularizer = b_regularizer

    # initialization
    def _initialize(self, params):
        # Params check and name initialization
        params = super()._initialize(params)

        # Initializing params and grads
        params = self._initialize_W(params)
        params = self._initialize_b(params)
        
        # Regularization
        if self.W_regularizer is None: self.W_regularizer = EmptyRegularizer()
        if self.b_regularizer is None: self.b_regularizer = EmptyRegularizer()
        return params

    def _initialize_W(self, params):
        input_shape = params['input_shape']
        seed = params['seed']
        dtype = params['dtype']
        if self.W_initializer is None:
            self.W_initializer = NormalInitializer(seed=seed)
        elif isinstance(self.W_initializer, np.ndarray):
            assert self.W_initializer.shape == (input_shape[1], self.units)
            self.W_initializer = DeterministicInitializer(self.W_initializer)
        else:
            assert False
        self.W = self.W_initializer(shape=(input_shape[1], self.units), dtype=dtype)
        self.grad_W = np.zeros_like(self.W, dtype=dtype)
        params['seed'] = seed + 1
        params['input_shape'] = (input_shape[0], self.units) # Input shape for the next layer
        return params
        
    def _initialize_b(self, params):
        dtype = params['dtype']
        if self.b_initializer is None:
            self.b_initializer = ZerosInitializer()
        elif isinstance(self.b_initializer, np.ndarray):
            assert self.b_initializer.shape == (self.units,)
            self.b_initializer = DeterministicInitializer(self.b_initializer)
        else:
            assert False
        self.b = self.b_initializer(shape=(self.units,), dtype=dtype)
        self.grad_b = np.zeros_like(self.b, dtype=dtype)
        return params
    
    def update_output(self, input):
        self.assert_nans(input)
        self.output = np.dot(input, self.W)  # [B x I] x [I x O] = [B x O]
        if self.use_bias:
            self.output += self.b[None, :]
        return self.output
    
    def update_grad_input(self, input, grad_output):
        self.assert_nans(grad_output)
        self.grad_input = np.dot(grad_output, self.W.T)         # [B x O] x [O x I] = [B x I]
        return self.grad_input
    
    def update_grad_param(self, input, grad_output):
        self.assert_nans(grad_output)
        assert input.shape[0] == grad_output.shape[0]
        batch_size = input.shape[0]
        self.grad_W = np.dot(input.T, grad_output)               # ([I x B] x [B x O]).T = [I, O]
        if self.W_regularizer:
            self.grad_W += self.W_regularizer.grad(self.W)
        if self.use_bias:
            self.grad_b = np.mean(grad_output, axis=0)
            if self.b_regularizer:
                self.grad_b += self.b_regularizer.grad(self.b)
        print(self.grad_W)
        print(self.grad_b)
        
    def get_regularization_loss(self):
        loss = 0
        if self.W_regularizer:
            loss += self.W_regularizer.loss(self.W)
        if self.use_bias:
            if self.b_regularizer:
                loss += self.b_regularizer.loss(self.b)  
        return loss
 
    def get_params(self):
        return OrderedDict([(self.name + '/W', self.W), (self.name + '/b', self.b)])
    
    def get_grad_params(self):
        return OrderedDict([(self.name + '/W', self.grad_W), (self.name + '/b', self.grad_b)])

    def zero_grad_params(self):
        self.grad_W.fill(0)
        self.grad_b.fill(0)
    
    def __repr__(self):
        return 'Dense({}->{})'.format(self.input_size, self.output_size)

In [27]:
dense = Dense(units=100)
dense.initialize({'input_shape': (10, 20), 'seed': 11, 'dtype': np.float32, 'names': {}})

{'dtype': numpy.float32,
 'input_shape': (10, 100),
 'names': {'Dense': 1},
 'seed': 12}

#### Dense: forward

In [28]:
# Test the affine_forward function
batch_size = 2
input_size = 120
output_size = 3

X = np.linspace(-0.1, 0.5, num=batch_size * input_size).reshape(batch_size, input_size) # [2, 120]
W = np.linspace(-0.2, 0.3, num=input_size * output_size).reshape(input_size, output_size) # [360] - > [120, 3]
b = np.linspace(-0.3, 0.1, num=output_size)

dense = Dense(output_size, W_initializer=W, b_initializer=b)
dense.initialize({'input_shape': (-1, 120), 'dtype': np.float32, 'seed': 1, 'names': {}})

output = dense.forward(X)
correct_output = np.array([[ 1.49834967,  1.70660132,  1.91485297],
                        [ 3.25553199,  3.5141327,   3.77273342]])

# Compare your output with ours. The error should be around 1e-9.
print('Testing affine_forward function:')
print('difference: ', rel_error(output, correct_output))

Testing affine_forward function:
difference:  5.084615128635586e-09


#### Dense: backward

In [29]:
def eval_numerical_gradient_array(func, input, grad_output, h=1e-5):
    """
    Evaluate a numeric gradient for a function that accepts a numpy
    array and returns a numpy array.
    """
    grad = np.zeros_like(input)
    it = np.nditer(input, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        index = it.multi_index
        oldval = input[index]
        input[index] = oldval + h
        pos = func().copy()
        input[index] = oldval - h
        neg = func().copy()
        input[index] = oldval
        grad[index] = np.sum((pos - neg) * grad_output) / (2 * h)
        it.iternext()
    return grad

def eval_numerical_gradient_layer_param(layer, param_name, input, grad_output, h=1e-5):
    assert layer.initialized
    param_value = getattr(layer, param_name)
    def func():
        return layer.forward(input)
    return eval_numerical_gradient_array(func, param_value, grad_output, h=h)

def eval_numerical_gradient_layer_input(layer, input, grad_output, h=1e-5):
    assert layer.initialized
    def func():
        return layer.forward(input)
    return eval_numerical_gradient_array(func, input, grad_output, h=h)

In [31]:
# Test the affine_backward function
np.random.seed(231)

input = np.random.randn(10, 6)
W = np.random.randn(6, 5)
b = np.random.randn(5)
grad_output = np.random.randn(10, 5)

print('input.shape =', input.shape)
print('W.shape     =', W.shape)
print('b.shape     =', b.shape)

dense = Dense(5, W_initializer=W, b_initializer=b)
dense.initialize({'input_shape': (-1, 6), 'dtype': np.float32, 'seed': 1, 'names': {}})

num_grad_W     = eval_numerical_gradient_layer_param(dense, 'W', input, grad_output)
num_grad_b     = eval_numerical_gradient_layer_param(dense, 'b', input, grad_output)    
num_grad_input = eval_numerical_gradient_layer_input(dense, input, grad_output)
print('num_grad_W.shape     =', num_grad_W.shape)
print('grad_b.shape         =', num_grad_b.shape)
print('num_grad_input.shape =', num_grad_input.shape)


grad_input = dense.backward(input, grad_output)
grad_W = dense.grad_W
grad_b = dense.grad_b
print('grad_W.shape         =', grad_W.shape)
print('grad_b.shape         =', grad_b.shape)
print('grad_input.shape     =', grad_input.shape)

#The error should be around 1e-10
print('Testing affine_backward function:')
print('grad_input error: ', rel_error(num_grad_input, grad_input))
print('grad_W error:     ', rel_error(num_grad_W, grad_W))
print('grad_b error:     ', rel_error(num_grad_b, grad_b))  

input.shape = (10, 6)
W.shape     = (6, 5)
b.shape     = (5,)
num_grad_W.shape     = (6, 5)
grad_b.shape         = (5,)
num_grad_input.shape = (10, 6)
[[-2.37342917 -0.51198268  0.31810037  2.90403428  1.06935402]
 [-3.51268592 -2.11141032 -3.63676781 -1.97058092 -2.23413933]
 [ 5.08663365  1.49488732  3.34009108  5.93662486 -2.15844283]
 [-0.69809993 -0.20996862 -2.34841896  2.76050051 -3.16921717]
 [ 2.22793491  2.34320739 -4.92577398  2.06883897 -3.34916043]
 [-0.77798671 -1.29867108 -3.14496814  0.15478615 -3.09581511]]
[-0.57858866 -0.21428895 -0.39364814 -0.41066459 -0.00925332]
grad_W.shape         = (6, 5)
grad_b.shape         = (5,)
grad_input.shape     = (10, 6)
Testing affine_backward function:
grad_input error:  1.7513525864769402e-10
grad_W error:      0.0006785753951179602
grad_b error:      0.818406012402901


In [35]:
num_grad_b

array([-5.793744  , -2.1426065 , -3.9418273 , -4.1122227 , -0.09265885],
      dtype=float32)

<a id='softmax'></a>
### Softmax [[toc]](#toc)

In [208]:
# %load ../../ml/neural_network/layers/softmax.py

#import numpy as np
#from ..sequential import Layer

class SoftMax(Layer):
    def __init__(self):
        super().__init__()
    
    def update_output(self, input):
        self.assert_nans(input)
        self.output = np.subtract(input, input.max(axis=1, keepdims=True))
        np.exp(self.output, self.output)
        self.output /= np.sum(self.output, axis=1, keepdims=True)
        return self.output
    
    def update_grad_input(self, input, grad_output):
        self.assert_nans(grad_output)
        G = np.multiply(self.output, grad_output)
        self.grad_input = G - self.output * np.sum(G, axis=1, keepdims=True)
        #assert self.grad_input.shape == grad_output.shape
        return self.grad_input

In [209]:
softmax = SoftMax()
softmax.initialize({'input_shape': (10, 25), 'dtype': np.float32, 'seed': 1, 'names': {}})

{'dtype': numpy.float32,
 'input_shape': (10, 25),
 'names': {'SoftMax': 1},
 'seed': 1}

<a id='softmax'></a>
### Dropout [[toc]](#toc)

In [212]:
# %load ../../ml/neural_network/layers/dropout.py

#import numpy as np
#from ..sequential import Layer


class Dropout(Layer):
    def __init__(self, p=0.5):
        super().__init__()
        self.p = p
        self.mask = None

    def _initialize(self, params):
        # Check params and initialize name
        params = super()._initialize(params)
        seed = params['seed']
        self.gen = np.random.RandomState(seed)
        params['seed'] += 1
        return params
    
    def update_output(self, input):
        if self.training:
            self.mask = self.gen.choice([0, 1], p=[self.p, 1 - self.p], size=input.shape)
            self.output = np.multiply(self.mask, input)
        else:
            self.output = (1 - self.p) * input
        return self.output
    
    def update_grad_input(self, input, grad_output):
        if self.training:
            self.grad_input = np.multiply(self.mask, grad_output)
        else:
            self.grad_input = (1 - self.p) * grad_output
        return self.grad_input
    
    def train(self):
        self.training = True
        
    def evaluate(self):
        self.training = False
        self.mask = None

In [215]:
dropout = Dropout(0.5)
print(dropout.initialized)
print(dropout.initialize({'input_shape': (10, 25), 'dtype': np.float32, 'seed': 1, 'names': {}}))
print(dropout.initialized)

False
{'seed': 2, 'dtype': <class 'numpy.float32'>, 'names': {'Dropout': 1}, 'input_shape': (10, 25)}
True


<a id='softmax'></a>
### BatchNormalization [[toc]](#toc)

In [216]:
# %load ../../ml/neural_network/layers/batch_normalization.py
#import numpy as np
#from collections import OrderedDict
#from ..sequential import Layer

class BatchNormalization(Layer):
    """
    Forward pass for batch normalization.

    During training the sample mean and (uncorrected) sample variance are
    computed from minibatch statistics and used to normalize the incoming data.
    During training we also keep an exponentially decaying running mean of the
    mean and variance of each feature, and these averages are used to normalize
    data at test-time.

    At each timestep we update the running averages for mean and variance using
    an exponential decay based on the momentum parameter:

    running_mean = momentum * running_mean + (1 - momentum) * sample_mean
    running_var  = momentum * running_var  + (1 - momentum) * sample_var

    Note that the batch normalization paper suggests a different test-time
    behavior: they compute sample mean and variance for each feature using a
    large number of training images rather than using a running average. For
    this implementation we have chosen to use running averages instead since
    they do not require an additional estimation step; the torch7
    implementation of batch normalization also uses running averages.

    Input:
    - x: Data of shape (N, D)
    - gamma: Scale parameter of shape (D,)
    - beta: Shift paremeter of shape (D,)
    - bn_param: Dictionary with the following keys:
      - mode: 'train' or 'test'; required
      - eps: Constant for numeric stability
      - momentum: Constant for running mean / variance.
      - running_mean: Array of shape (D,) giving running mean of features
      - running_var Array of shape (D,) giving running variance of features

    Returns a tuple of:
    - out: of shape (N, D)
    - cache: A tuple of values needed in the backward pass
    """
    def __init__(self, momentum=0.9, eps=1e-5):
        super().__init__()
        self.momentum = momentum
        self.eps = eps
        
    def _initialize(self, params):
        # Check params and initialize name
        params = super()._initialize(params)
        input_shape = params['input_shape']
        dtype = params['dtype']
        n_features = input_shape[1]
        self.running_mean = np.zeros(n_features, dtype=dtype)
        self.running_var = np.zeros(n_features, dtype=dtype)
        self.gamma = np.ones(n_features, dtype=dtype)
        self.beta = np.zeros(n_features, dtype=dtype)
        return params
        
    def update_output(self, input):
        if self.training:
            self.sample_mean = np.mean(input, axis=0)
            self.sample_var = np.var(input, axis=0)
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * self.sample_mean
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * self.sample_var
            self.normed_input = (input - self.sample_mean[None, :]) / np.sqrt(self.sample_var + eps)[None, :]
            self.output = self.gamma[None, :] * self.normed_input + self.beta[None, :]
        else:
            input_norm = (X - self.running_mean[None, :]) / (np.sqrt(running_var[None, :] + eps))
            self.output = self.gamma[None, :] * input_norm + self.beta[None, :]
        return self.output

    def update_grad_input(self, input, grad_output):
        if self.traning:
            var = self.sample_var
        else:
            var = self.running_var
        self.grad_input = self.gamma / np.sqrt(var + self.eps)[None, :] *\
                ((grad_output - np.mean(grad_output, axis=0)[None, :]) -\
                 self.normed_input * np.mean(np.multiply(self.normed_input, grad_output), axis=0)[None, :]) 
        
    def update_grad_param(self, input, grad_output):
        self.grad_gamma = np.sum(np.multiply(self.normed_input, grad_output), axis=0)
        self.grad_beta = np.sum(grad_output, axis=0)
          
    def get_params(self):
        return OrderedDict([(self.name + '/gamma', self.gamma), (self.name + '/beta', self.beta)])
    
    def get_grad_params(self):
        return OrderedDict([(self.name + '/gamma', self.grad_gamma), (self.name + '/beta', self.grad_beta)])

In [217]:
bn = BatchNormalization()
print(bn.initialized)
print(bn.initialize({'input_shape': (10, 25), 'dtype': np.float32, 'seed': 1, 'names': {}}))
print(bn.initialized)

False
{'seed': 1, 'dtype': <class 'numpy.float32'>, 'names': {'BatchNormalization': 1}, 'input_shape': (10, 25)}
True
