In [1]:
# Importing packages.
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.datasets import cifar10, mnist
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler
import time

In [2]:
def separate_validation_set(x_train_all, y_train_all, x_validation_size):
    """
    Creates a validation set from the training set with a stratified shuffle split.

    The split is deterministic (fixed random_state=110) and keeps the class
    proportions of y_train_all in both resulting subsets.

    Parameters
    ----------
    x_train_all : np.array
        The training data.
    y_train_all : np.array
        The training labels.
    x_validation_size: int
        The size of the validation dataset (absolute number of samples).

    Returns
    -------
    x_train_raw: np.array
        The new training data.
    y_train_raw: np.array
        The new training labels.
    x_val_raw: np.array
        The new validation data.
    y_val_raw: np.array
        The new validation labels.
    """
    # Pass the size as an absolute integer count. Converting it to a fraction
    # (size / n_samples) makes scikit-learn recompute the count as
    # ceil(fraction * n_samples), and floating-point rounding can then yield
    # one sample too many (e.g. 7 / 10 * 10 == 7.000000000000001 -> 8).
    sss_val = StratifiedShuffleSplit(
        n_splits=1,
        test_size=int(x_validation_size),
        random_state=110)
    # n_splits=1, so the generator yields exactly one (train, validation) pair.
    train_indices, validation_indices = next(sss_val.split(x_train_all, y_train_all))

    x_train_raw = x_train_all[train_indices]
    y_train_raw = y_train_all[train_indices]
    x_val_raw = x_train_all[validation_indices]
    y_val_raw = y_train_all[validation_indices]

    return x_train_raw, y_train_raw, x_val_raw, y_val_raw

def reduce_data(data_x, data_y, keep_real_num):
    """
    Shrinks a dataset to a stratified subset.

    The CNN in this notebook is a naive implementation, so training on the
    complete datasets could take forever. Keeping only a fraction of the
    samples makes training and inference faster.

    Parameters
    ----------
    data_x : np.array
        The data.
    data_y : np.array
        The labels.
    keep_real_num: float
        A real number in the interval (0.0,1.0]. Fraction of the data to keep.

    Returns
    -------
    data_x_reduced : np.array
        The reduced array of data.
    data_y_reduced: np.array
        The reduced array of labels.
    """
    # Nothing to drop: hand the inputs back as they are (same objects, no copy).
    if keep_real_num == 1.0:
        return data_x, data_y

    # Stratified shuffle: the "test" part of the split is the portion that is
    # thrown away, the "train" part is what we keep. The seed is fixed.
    splitter = StratifiedShuffleSplit(
        n_splits=1,
        test_size=(1-keep_real_num),
        random_state=110)
    kept_indices, _dropped_indices = next(splitter.split(data_x, data_y))

    return data_x[kept_indices], data_y[kept_indices]

def verbose_classes_and_occurences(data_x, data_y):
    """
    Counts how many times each class label occurs.

    Useful to check whether a data set is balanced (i.e. has the same number
    of occurrences per class).

    Parameters
    ----------
    data_x : np.array
        The data. Accepted for symmetry with the other helpers; not used.
    data_y : np.array
        The labels.

    Returns
    -------
    dict
        Maps every distinct label in data_y to its number of occurrences.
    """
    labels, counts = np.unique(data_y, return_counts=True)
    return {label: count for label, count in zip(labels, counts)}

In [3]:
# Data preparation and preprocessing.

# Load the MNIST hand-written digit dataset.
(x_train_all, y_train_all), (x_test_all, y_test_all) = mnist.load_data()

# Give every image an explicit trailing channel axis -> (28, 28, 1), cast to
# float32 (single precision) and scale by 255, the maximum 8-bit intensity.
# The images are grayscale, so there is exactly one channel.
x_train_all = x_train_all.reshape(x_train_all.shape[0], 28, 28, 1).astype('float32') / 255
x_test_all = x_test_all.reshape(x_test_all.shape[0], 28, 28, 1).astype('float32') / 255

# Validation dataset size (number of samples taken out of the training set).
x_validation_size = 1000

x_train_raw, y_train_raw, x_val_raw, y_val_raw = separate_validation_set(
    x_train_all=x_train_all,
    y_train_all=y_train_all,
    x_validation_size=x_validation_size)

# Fraction of every split that is kept; the naive CNN is too slow for the full data.
keep_real_num = 0.2

x_train, y_train = reduce_data(
    data_x=x_train_raw, data_y=y_train_raw, keep_real_num=keep_real_num)
x_val, y_val = reduce_data(
    data_x=x_val_raw, data_y=y_val_raw, keep_real_num=keep_real_num)
x_test, y_test = reduce_data(
    data_x=x_test_all, data_y=y_test_all, keep_real_num=keep_real_num)

# Verbose: shapes of the reduced splits.
print('training shapes: x_train.shape={0}, y_train.shape={1}'.format(
    x_train.shape, y_train.shape))
print('validation shapes: x_val.shape={0}, y_val.shape={1}'.format(
    x_val.shape, y_val.shape))
print('test shapes: x_test.shape={0}, y_test.shape={1}'.format(
    x_test.shape, y_test.shape))

# Verbose: per-class sample counts of the reduced splits.
print("Training occurences: {0}".format(
    verbose_classes_and_occurences(data_x=x_train, data_y=y_train)))
print("Validation occurences: {0}".format(
    verbose_classes_and_occurences(data_x=x_val, data_y=y_val)))
print("Test occurences: {0}".format(
    verbose_classes_and_occurences(data_x=x_test, data_y=y_test)))



training shapes: x_train.shape=(11800, 28, 28, 1), y_train.shape=(11800,)
validation shapes: x_val.shape=(200, 28, 28, 1), y_val.shape=(200,)
test shapes: x_test.shape=(2000, 28, 28, 1), y_test.shape=(2000,)
Training occurences: {0: 1165, 1: 1326, 2: 1172, 3: 1206, 4: 1149, 5: 1066, 6: 1164, 7: 1232, 8: 1150, 9: 1170}
Validation occurences: {0: 20, 1: 22, 2: 20, 3: 20, 4: 19, 5: 18, 6: 20, 7: 21, 8: 20, 9: 20}
Test occurences: {0: 196, 1: 227, 2: 206, 3: 202, 4: 196, 5: 178, 6: 192, 7: 206, 8: 195, 9: 202}


In [4]:
class ConvolutionalLayer():
    """
    Naive, loop-based 2-D convolution layer that owns its parameters and Adam state.

    Tensors use the (m, height, width, channels) layout. The layer stores
    W of shape (f, f, n_C_prev, n_C), b of shape (1, 1, 1, n_C), the Adam
    first/second moment buffers for both, and the input of the most recent
    forward pass (A_prev), which the backward pass reads.
    """

    def __init__(self, n_C, n_H_prev, n_W_prev, n_C_prev, f=3, stride=1, pad=1):
        """
        Arguments:
        n_C -- number of filters (= number of output channels)
        n_H_prev, n_W_prev, n_C_prev -- height, width and channels of the input
        f -- side length of the square filter
        stride -- step between two consecutive windows
        pad -- number of zero rows/columns added on every side of the input

        Raises:
        ValueError -- if the hyperparameters do not give an integer output size
        """
        self.n_C = n_C
        self.n_H_prev = n_H_prev
        self.n_W_prev = n_W_prev
        self.n_C_prev = n_C_prev
        self.f = f
        self.stride = stride
        self.pad = pad

        self.n_H = self.get_n_H()
        self.n_W = self.get_n_W()

        self.W, self.W_memory, self.W_squared_sum_memory = self.initialize_W()
        self.b, self.b_memory, self.b_squared_sum_memory = self.initialize_b()

        # Input cached by conv_forward() for use in conv_backward().
        self.A_prev = None

    def _output_size(self, n_prev, label):
        """Return the output extent along one spatial axis, rejecting non-integer results."""
        size = (n_prev - self.f + 2*self.pad) / self.stride + 1
        if size % 1 != 0:
            # ValueError is still caught by callers that use `except Exception`.
            raise ValueError("{0} is invalid, {0}={1}\n".format(label, size))
        return int(size)

    def get_n_H(self):
        """Output height: (n_H_prev - f + 2*pad) / stride + 1. Raises ValueError if not an integer."""
        return self._output_size(self.n_H_prev, "n_H")

    def get_n_W(self):
        """Output width: (n_W_prev - f + 2*pad) / stride + 1. Raises ValueError if not an integer."""
        return self._output_size(self.n_W_prev, "n_W")

    def initialize_W(self):
        """
        Create the weights and their two Adam buffers.

        Returns:
        W -- Weights, numpy array of shape (f, f, n_C_prev, n_C), standard normal draws scaled by 0.1
        W_memory -- zeros, same shape as W (Adam first moment)
        W_squared_sum_memory -- zeros, same shape as W (Adam second moment)
        """
        W = np.random.normal(loc=0.0, scale=1.0, size=(self.f, self.f, self.n_C_prev, self.n_C)) * 0.1
        W_memory = np.zeros(W.shape)
        W_squared_sum_memory = np.zeros(W.shape)
        return W, W_memory, W_squared_sum_memory

    def initialize_b(self):
        """
        Create the biases and their two Adam buffers.

        Returns:
        b -- Biases, numpy array of zeros of shape (1, 1, 1, n_C)
        b_memory -- zeros, same shape as b (Adam first moment)
        b_squared_sum_memory -- zeros, same shape as b (Adam second moment)
        """
        b = np.zeros((1, 1, 1, self.n_C))
        b_memory = np.zeros(b.shape)
        b_squared_sum_memory = np.zeros(b.shape)
        return b, b_memory, b_squared_sum_memory

    def zero_pad(self, X, pad):
        """
        Pad with zeros all images of the batch X. The padding is applied to the
        height and width axes only.

        Arguments:
        X -- python numpy array of shape (m, n_H, n_W, n_C) representing a batch of m images
        pad -- integer, amount of padding around each image on vertical and horizontal dimensions

        Returns:
        X_pad -- padded image of shape (m, n_H + 2*pad, n_W + 2*pad, n_C)
        """
        return np.pad(X, ((0, 0), (pad, pad), (pad, pad), (0, 0)), 'constant', constant_values=0)

    def conv_single_step(self, a_slice_prev, W, b):
        """
        Apply one filter defined by parameters W on a single slice (a_slice_prev)
        of the output activation of the previous layer.

        Arguments:
        a_slice_prev -- slice of input data of shape (f, f, n_C_prev)
        W -- Weight parameters contained in a window - matrix of shape (f, f, n_C_prev)
        b -- Bias parameters contained in a window - matrix of shape (1, 1, 1)

        Returns:
        Z -- a scalar value, the result of convolving the sliding window (W, b) on a slice x of the input data
        """
        # Element-wise product summed over the whole (f, f, n_C_prev) volume.
        Z = np.sum(np.multiply(a_slice_prev, W))
        # Squeeze b to 0-d before float(): calling float() directly on an array
        # with ndim > 0 is deprecated since NumPy 1.25.
        Z += float(np.squeeze(b))
        return Z

    def conv_forward(self, A_prev):
        """
        Implements the forward propagation for a convolution function.
        The layer's own W, b, stride and pad are used.

        Arguments:
        A_prev -- output activations of the previous layer,
            numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)

        Returns:
        Z -- conv output, numpy array of shape (m, n_H, n_W, n_C)

        Side effects:
        stores A_prev on the layer for conv_backward()
        """
        m = A_prev.shape[0]
        Z = np.zeros((m, self.n_H, self.n_W, self.n_C))

        A_prev_pad = self.zero_pad(A_prev, self.pad)

        for i in range(m):                      # loop over the batch of examples
            a_prev_pad = A_prev_pad[i]
            for h in range(self.n_H):           # loop over vertical axis of the output volume
                vert_start = h * self.stride
                vert_end = vert_start + self.f

                for w in range(self.n_W):       # loop over horizontal axis of the output volume
                    horiz_start = w * self.stride
                    horiz_end = horiz_start + self.f

                    # The input window is the same for every filter, so slice it once.
                    a_slice_prev = a_prev_pad[vert_start:vert_end, horiz_start:horiz_end, :]

                    for c in range(self.n_C):   # loop over channels (= #filters) of the output volume
                        Z[i, h, w, c] = self.conv_single_step(
                            a_slice_prev=a_slice_prev,
                            W=self.W[:, :, :, c],
                            b=self.b[:, :, :, c])

        # Making sure the output shape is correct
        assert Z.shape == (m, self.n_H, self.n_W, self.n_C)

        # Cache the input for the backward pass.
        self.A_prev = A_prev

        return Z

    def conv_backward(self, dZ, regularization_rate):
        """
        Implement the backward propagation for a convolution function.
        Uses the input cached by the most recent conv_forward() call.

        Arguments:
        dZ -- gradient of the cost with respect to the output of the conv layer (Z), numpy array of shape (m, n_H, n_W, n_C)
        regularization_rate -- accepted for interface compatibility; not used by this method

        Returns:
        dA_prev -- gradient of the cost with respect to the input of the conv layer (A_prev),
                   numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
        dW -- gradient of the cost with respect to the weights of the conv layer (W)
              numpy array of shape (f, f, n_C_prev, n_C)
        db -- gradient of the cost with respect to the biases of the conv layer (b)
              numpy array of shape (1, 1, 1, n_C)
        """
        A_prev = self.A_prev
        m = A_prev.shape[0]

        # Initialize dA_prev, dW, db with the correct shapes
        dA_prev = np.zeros((m, self.n_H_prev, self.n_W_prev, self.n_C_prev))
        dW = np.zeros((self.f, self.f, self.n_C_prev, self.n_C))
        db = np.zeros((1, 1, 1, self.n_C))

        # Pad A_prev and dA_prev
        A_prev_pad = self.zero_pad(X=A_prev, pad=self.pad)
        dA_prev_pad = self.zero_pad(X=dA_prev, pad=self.pad)

        for i in range(m):                       # loop over the examples
            a_prev_pad = A_prev_pad[i]
            # View into dA_prev_pad: the in-place updates below write through to it.
            da_prev_pad = dA_prev_pad[i]

            for h in range(self.n_H):                   # loop over vertical axis of the output volume
                vert_start = self.stride * h
                vert_end = vert_start + self.f

                for w in range(self.n_W):               # loop over horizontal axis of the output volume
                    horiz_start = self.stride * w
                    horiz_end = horiz_start + self.f

                    # The input window does not depend on the output channel.
                    a_slice = a_prev_pad[vert_start:vert_end, horiz_start:horiz_end, :]

                    for c in range(self.n_C):           # loop over the channels of the output volume
                        upstream = dZ[i, h, w, c]
                        da_prev_pad[vert_start:vert_end, horiz_start:horiz_end, :] += self.W[:, :, :, c] * upstream
                        dW[:, :, :, c] += a_slice * upstream
                        db[:, :, :, c] += upstream

            # Strip the padding again. With pad == 0 the slice [0:-0] would be empty,
            # hence the separate branch.
            if self.pad != 0:
                dA_prev[i, :, :, :] = da_prev_pad[self.pad:-self.pad, self.pad:-self.pad, :]
            else:
                dA_prev[i, :, :, :] = da_prev_pad

        # Making sure the output shape is correct
        assert dA_prev.shape == (m, self.n_H_prev, self.n_W_prev, self.n_C_prev)

        return dA_prev, dW, db

    def _adam_step(self, param, grad, first_moment, second_moment,
                   learning_rate, adam_beta_1, adam_beta_2, adam_iterator_counter):
        """Apply one bias-corrected Adam update; returns (new_param, new_first_moment, new_second_moment)."""
        first_moment = adam_beta_1 * first_moment + (1 - adam_beta_1) * grad
        first_moment_corrected = first_moment / (1 - adam_beta_1**adam_iterator_counter)
        second_moment = adam_beta_2 * second_moment + (1 - adam_beta_2) * np.power(grad, 2)
        second_moment_corrected = second_moment / (1 - adam_beta_2**adam_iterator_counter)
        # 1e-8 keeps the denominator away from zero.
        param = param - learning_rate * first_moment_corrected / (np.sqrt(second_moment_corrected) + 1e-8)
        return param, first_moment, second_moment

    def update_parameters(self, dW, db, learning_rate, adam_beta_1, adam_beta_2, adam_iterator_counter):
        """
        Update W and b in place on the layer with one Adam step each.

        Arguments:
        dW -- gradient with respect to W, same shape as W
        db -- gradient with respect to b, same shape as b
        learning_rate -- step size
        adam_beta_1 -- decay rate of the first-moment (mean of gradients) buffer
        adam_beta_2 -- decay rate of the second-moment (mean of squared gradients) buffer
        adam_iterator_counter -- update step number used for bias correction; must be >= 1,
            because step 0 makes the correction divide by zero
        """
        self.W, self.W_memory, self.W_squared_sum_memory = self._adam_step(
            self.W, dW, self.W_memory, self.W_squared_sum_memory,
            learning_rate, adam_beta_1, adam_beta_2, adam_iterator_counter)
        self.b, self.b_memory, self.b_squared_sum_memory = self._adam_step(
            self.b, db, self.b_memory, self.b_squared_sum_memory,
            learning_rate, adam_beta_1, adam_beta_2, adam_iterator_counter)

    def __repr__(self):
        repr_text = "CONV: n_C:{0},n_H_prev:{1},n_W_prev:{2},n_C_prev:{3},f:{4},stride:{5},pad:{6},n_H:{7},n_W:{8}".format(
            self.n_C, self.n_H_prev, self.n_W_prev, self.n_C_prev, self.f, self.stride, self.pad,
            self.n_H, self.n_W)
        return repr_text

In [5]:
class FullyConnectedLayer(ConvolutionalLayer):
    """
    Fully connected layer expressed as a convolution whose filter covers the
    whole (square) input, so every filter sees the complete input volume.
    """

    def __init__(self, n_C, n_H_prev, n_W_prev, n_C_prev, f=None, stride=1, pad=0):
        # Only square inputs are supported.
        assert n_H_prev == n_W_prev
        # Any f passed by the caller is ignored: the filter size is always the
        # input height.
        super().__init__(
            n_C,
            n_H_prev,
            n_W_prev,
            n_C_prev,
            n_H_prev,
            stride,
            pad)

In [6]:
class PoolingLayer():
    """
    Naive, loop-based pooling layer ("max" or "average") over
    (m, height, width, channels) tensors. Has no trainable parameters; it only
    remembers the input of the latest forward pass for the backward pass.
    """

    def __init__(self, n_H_prev, n_W_prev, n_C_prev, f, stride, mode="max"):
        # Input geometry.
        self.n_H_prev, self.n_W_prev, self.n_C_prev = n_H_prev, n_W_prev, n_C_prev
        # Window size, step between windows and pooling mode ("max" / "average").
        self.f, self.stride, self.mode = f, stride, mode

        # Output geometry; pooling keeps the channel count.
        self.n_H = self.get_n_H()
        self.n_W = self.get_n_W()
        self.n_C = self.n_C_prev

        # Input cached by pool_forward() for pool_backward().
        self.A_prev = None

    def get_n_H(self):
        """Output height (n_H_prev - f) / stride + 1; raises Exception if it is not a whole number."""
        n_H = (self.n_H_prev - self.f) / self.stride + 1
        fractional_part = n_H % 1
        if fractional_part != 0:
            raise Exception("n_H is invalid, n_H={0}\n".format(n_H))
        return int(n_H)

    def get_n_W(self):
        """Output width (n_W_prev - f) / stride + 1; raises Exception if it is not a whole number."""
        n_W = (self.n_W_prev - self.f) / self.stride + 1
        fractional_part = n_W % 1
        if fractional_part != 0:
            raise Exception("n_W is invalid, n_W={0}\n".format(n_W))
        return int(n_W)

    def pool_forward(self, A_prev, mode = "max"):
        """
        Implements the forward pass of the pooling layer.

        Arguments:
        A_prev -- Input data, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
        mode -- ignored; the mode given to the constructor (self.mode) is the one applied

        Returns:
        A -- output of the pool layer, a numpy array of shape (m, n_H, n_W, n_C)

        Raises:
        Exception -- when self.mode is neither "max" nor "average"

        Side effects:
        stores A_prev on the layer once all windows have been pooled
        """
        batch_size = A_prev.shape[0]
        pooled = np.zeros((batch_size, self.n_H, self.n_W, self.n_C))

        # Row / column extent of every pooling window, computed once up front.
        row_windows = [slice(self.stride * r, self.stride * r + self.f) for r in range(self.n_H)]
        col_windows = [slice(self.stride * k, self.stride * k + self.f) for k in range(self.n_W)]

        for sample in range(batch_size):
            for out_row, rows in enumerate(row_windows):
                for out_col, cols in enumerate(col_windows):
                    for channel in range(self.n_C):
                        window = A_prev[sample, rows, cols, channel]

                        if self.mode == "max":
                            pooled[sample, out_row, out_col, channel] = np.max(window)
                        elif self.mode == "average":
                            pooled[sample, out_row, out_col, channel] = np.mean(window)
                        else:
                            raise Exception("Invalid mode of pooling\n")

        # Remember the input for pool_backward().
        self.A_prev = A_prev

        # Making sure the output shape is correct
        assert pooled.shape == (batch_size, self.n_H, self.n_W, self.n_C)

        return pooled

    def create_mask_from_window(self, x):
        """
        Creates a mask from an input matrix x, to identify the max entry of x.

        Arguments:
        x -- Array of shape (f, f)

        Returns:
        mask -- Array of the same shape as x, True wherever x equals its maximum
                (every tied position is True).
        """
        return np.equal(x, x.max())

    def distribute_value(self, dz, shape):
        """
        Spreads a scalar evenly over a matrix of the given shape.

        Arguments:
        dz -- input scalar
        shape -- the shape (n_H, n_W) of the output matrix

        Returns:
        a -- Array of size (n_H, n_W) in which every entry is dz / (n_H * n_W)
        """
        rows, cols = shape
        share = dz/(rows*cols)
        return np.ones((rows, cols)) * share

    def pool_backward(self, dA, mode = "max"):
        """
        Implements the backward pass of the pooling layer, using the input
        cached by the most recent pool_forward() call.

        Arguments:
        dA -- gradient of cost with respect to the output of the pooling layer, same shape as A
        mode -- ignored; the mode given to the constructor (self.mode) is the one applied

        Returns:
        dA_prev -- gradient of cost with respect to the input of the pooling layer, same shape as A_prev

        Raises:
        Exception -- when self.mode is neither "max" nor "average"
        """
        A_prev = self.A_prev
        batch_size = A_prev.shape[0]
        dA_prev = np.zeros(A_prev.shape)

        # Row / column extent of every pooling window, computed once up front.
        row_windows = [slice(self.stride * r, self.stride * r + self.f) for r in range(self.n_H)]
        col_windows = [slice(self.stride * k, self.stride * k + self.f) for k in range(self.n_W)]

        for sample in range(batch_size):
            a_prev = A_prev[sample]

            for out_row, rows in enumerate(row_windows):
                for out_col, cols in enumerate(col_windows):
                    for channel in range(self.n_C):
                        upstream = dA[sample, out_row, out_col, channel]

                        if self.mode == "max":
                            # The gradient only flows to the position(s) that held the maximum.
                            mask = self.create_mask_from_window(a_prev[rows, cols, channel])
                            dA_prev[sample, rows, cols, channel] += np.multiply(mask, upstream)
                        elif self.mode == "average":
                            # Every position of the window receives an equal share.
                            dA_prev[sample, rows, cols, channel] += \
                                self.distribute_value(upstream, shape=(self.f, self.f))
                        else:
                            raise Exception("Invalid mode of pooling\n")

        # Making sure the output shape is correct
        assert dA_prev.shape == A_prev.shape

        return dA_prev

    def __repr__(self):
        return "POOL: n_H_prev:{0},n_W_prev:{1},n_C_prev:{2},f:{3},stride:{4},n_H:{5},n_W:{6},n_C:{7}".format(
            self.n_H_prev, self.n_W_prev, self.n_C_prev, self.f, self.stride, self.n_H, self.n_W, self.n_C)

In [7]:
class ReLuLayer():
    """Element-wise ReLU activation that remembers its input for the backward pass."""

    # Class-level constant; not read by any method of this class.
    alpha = 0

    def __init__(self):
        # Pre-activation input of the latest relu_forward() call.
        self.Z_prev = None

    def relu_forward(self, Z_prev):
        """Store Z_prev and return max(0, Z_prev) element-wise (same shape as Z_prev)."""
        self.Z_prev = Z_prev
        activated = np.maximum(0, Z_prev)
        assert activated.shape == Z_prev.shape

        return activated

    def relu_backward(self, dA):
        """Return dA where the stored input was strictly positive and zero elsewhere."""
        passes_gradient = self.Z_prev > 0
        gradient = np.multiply(passes_gradient, dA)
        assert gradient.shape == self.Z_prev.shape
        return gradient

    def __repr__(self):
        return "RELU LAYER"

In [8]:
class SigmoidLayer():
    """Element-wise sigmoid activation that caches its input and output."""

    def __init__(self):
        # Initialise the cached state explicitly (as ReLuLayer does) so the
        # attributes always exist, even before the first forward pass.
        self.Z_prev = None
        self.sigmoid_activation = None

    def sigmoid_forward(self, Z_prev):
        """
        Compute 1 / (1 + exp(-Z_prev)) element-wise.

        Arguments:
        Z_prev -- numpy array of pre-activations

        Returns:
        numpy array of the same shape with the sigmoid of every entry

        Side effects:
        stores Z_prev and the activation on the layer
        """
        self.Z_prev = Z_prev
        # For very negative inputs exp(-z) overflows to inf and 1 / (1 + inf)
        # is 0.0, which is the correct limit. Only the overflow warning/error
        # is silenced; the computed values are unchanged.
        with np.errstate(over='ignore'):
            self.sigmoid_activation = 1 / (1 + np.exp(-Z_prev))
        return self.sigmoid_activation

    def sigmoid_backward(self, dA):
        """
        Return the local derivative s * (1 - s) of the cached activation s.

        Arguments:
        dA -- upstream gradient; accepted but not used by this method

        NOTE(review): unlike ReLuLayer.relu_backward, the result is not
        multiplied by dA. Confirm that the caller applies the chain rule itself.
        """
        sigmoid_grad = np.multiply(self.sigmoid_activation, (1-self.sigmoid_activation))
        return sigmoid_grad

    def __repr__(self):
        return "SIGMOID LAYER"

In [9]:
class ConvolutionalNeuralNetwork():
    """
    Sequential network built from CONV, FC, POOL, RELU and SIGMOID layers with a
    softmax cross-entropy head.

    Class attributes
    ----------------
    learning_rate, adam_beta_1, adam_beta_2 : float
        Passed to each convolutional layer's update_parameters during back_prop.
    regularization_rate : float
        Passed to each convolutional layer's conv_backward.
    adam_iterator_counter : int
        Passed to update_parameters and incremented once per back_prop call.
    """
    learning_rate = 0.001
    regularization_rate = 0.5
    adam_beta_1 = 0.9
    adam_beta_2 = 0.99
    adam_iterator_counter = 1

    def __init__(self, architecture, compute_validation):
        """
        Instantiate the layers described by `architecture`, in order.

        Parameters
        ----------
        architecture : sequence of tuple
            One tuple per layer. Element 0 is the layer type, the rest are positional settings:
            ("CONV", n_C, n_H_prev, n_W_prev, n_C_prev, f, stride, pad)
            ("FC", n_C, n_H_prev, n_W_prev, n_C_prev)
            ("POOL", n_H_prev, n_W_prev, n_C_prev, f, stride, mode)
            ("RELU",) or ("SIGMOID",)
        compute_validation : bool
            When truthy, train() also runs inference on the validation set after every minibatch.

        Raises
        ------
        Exception
            If a tuple names an unknown layer type.
        """
        self.compute_validation = compute_validation

        self.layers = []
        for layer_info in architecture:
            layer_type = layer_info[0]
            if layer_type == "CONV":
                layer = \
                    ConvolutionalLayer(n_C=layer_info[1],
                                       n_H_prev=layer_info[2],
                                       n_W_prev=layer_info[3],
                                       n_C_prev=layer_info[4],
                                       f=layer_info[5],
                                       stride=layer_info[6],
                                       pad=layer_info[7])
            elif layer_type == "FC":
                layer = \
                    FullyConnectedLayer(n_C=layer_info[1],
                                        n_H_prev=layer_info[2],
                                        n_W_prev=layer_info[3],
                                        n_C_prev=layer_info[4])
            elif layer_type == "POOL":
                layer = \
                    PoolingLayer(n_H_prev=layer_info[1],
                                 n_W_prev=layer_info[2],
                                 n_C_prev=layer_info[3],
                                 f=layer_info[4],
                                 stride=layer_info[5],
                                 mode=layer_info[6])
            elif layer_type == "RELU":
                layer = ReLuLayer()
            elif layer_type == "SIGMOID":
                layer = SigmoidLayer()
            else:
                raise Exception("Invalid layer type {0}".format(layer_type))
            self.layers.append(layer)

    def forward_prop(self, X):
        """
        Run X through every layer in order and return the final layer's output.

        Raises
        ------
        Exception
            If a layer is not one of the supported layer classes.
        """
        for layer in self.layers:
            # Pick the layer-specific forward function; the progress line is printed
            # only for recognised layers, right before the layer runs.
            if isinstance(layer, ConvolutionalLayer):
                forward_step = layer.conv_forward
            elif isinstance(layer, PoolingLayer):
                forward_step = layer.pool_forward
            elif isinstance(layer, ReLuLayer):
                forward_step = layer.relu_forward
            elif isinstance(layer, SigmoidLayer):
                forward_step = layer.sigmoid_forward
            else:
                raise Exception("Invalid layer\n")
            print("forward propagating={0}".format(layer))
            X = forward_step(X)
        return X

    def back_prop(self, dA):
        """
        Propagate the gradient dA from the last layer to the first.

        Convolutional layers have their parameters updated immediately after their
        gradients are computed. The Adam iteration counter is incremented once at the end.

        Raises
        ------
        Exception
            If a layer is not one of the supported layer classes.
        """
        for layer in reversed(self.layers):
            if isinstance(layer, ConvolutionalLayer):
                print("backpropagating={0}".format(layer))
                dA, dW, db = layer.conv_backward(dZ=dA, regularization_rate=self.regularization_rate)
                layer.update_parameters(dW=dW, db=db, learning_rate=self.learning_rate,
                                        adam_beta_1=self.adam_beta_1, adam_beta_2=self.adam_beta_2,
                                        adam_iterator_counter=self.adam_iterator_counter)
            elif isinstance(layer, PoolingLayer):
                print("backpropagating={0}".format(layer))
                dA = layer.pool_backward(dA)
            elif isinstance(layer, ReLuLayer):
                print("backpropagating={0}".format(layer))
                # The activation layer turns the gradient w.r.t. its output into the
                # gradient w.r.t. its input, which the preceding layer consumes.
                dA = layer.relu_backward(dA)
            elif isinstance(layer, SigmoidLayer):
                print("backpropagating={0}".format(layer))
                # NOTE(review): the result is only a valid upstream gradient if
                # sigmoid_backward folds dA into the local derivative.
                dA = layer.sigmoid_backward(dA)
            else:
                raise Exception("Invalid layer\n")
        self.adam_iterator_counter = self.adam_iterator_counter + 1

    def softmax_classifier_forward(self, X):
        """
        Compute softmax probabilities and log-probabilities.

        Parameters
        ----------
        X : np.array
            Scores whose first axis indexes the examples; the remaining axes are
            flattened into one class axis.

        Returns
        -------
        probs : np.array
            Softmax probabilities, shape (m, classes).
        log_probs : np.array
            Natural log of probs, same shape.
        """
        m = X.shape[0]
        logits = X.reshape(m, -1)

        # Subtracting the row maximum keeps exp() from overflowing and leaves the
        # softmax result unchanged.
        shifted_logits = logits - np.max(logits, axis=1, keepdims=True)
        normalizer = np.sum(np.exp(shifted_logits), axis=1, keepdims=True)
        log_probs = shifted_logits - np.log(normalizer)
        probs = np.exp(log_probs)

        # Every row must be a probability distribution (this also trips on NaN scores).
        assert np.allclose(np.sum(probs, axis=1), 1.0), "softmax rows must sum to 1"
        return probs, log_probs

    def softmax_classifier_backward(self, probs, y):
        """
        Gradient of the mean cross-entropy loss with respect to the scores.

        Parameters
        ----------
        probs : np.array
            Softmax probabilities, shape (m, c). Left unmodified.
        y : np.array
            Integer class labels, reshaped to (m,).

        Returns
        -------
        np.array
            (probs - one_hot(y)) / m, reshaped to (m, 1, 1, c).
        """
        m = probs.shape[0]
        c = probs.shape[1]
        y = y.reshape(m,)

        # Work on a copy so the caller's probabilities stay valid after backprop.
        dscores = probs.copy()
        dscores[np.arange(m), y] -= 1
        dscores /= m

        return dscores.reshape(m, 1, 1, c)

    def full_backprop(self, probs, y):
        """Compute the softmax gradient for (probs, y) and back-propagate it through all layers."""
        dZ = self.softmax_classifier_backward(probs=probs, y=y)
        self.back_prop(dA=dZ)

    def compute_cross_entropy(self, log_probs, y, regularization_rate):
        """
        Mean negative log-likelihood of the correct classes.

        Parameters
        ----------
        log_probs : np.array
            Log-probabilities, shape (m, classes).
        y : np.array
            Integer class labels, reshaped to (m,).
        regularization_rate : float
            Accepted for interface compatibility; not used in the computation.

        Returns
        -------
        float
            The data loss (no regularization term is added).
        """
        m = log_probs.shape[0]
        y = y.reshape(m,)

        loss = -np.sum(log_probs[np.arange(m), y]) / m

        return loss

    def predict(self, probabilities, y):
        """
        Pick the most probable class per example and score it against y.

        Returns
        -------
        predictions : np.array
            Argmax over axis 1 of probabilities.
        accuracy : float
            Result of accuracy_score(y, predictions).
        """
        m = probabilities.shape[0]
        y = y.reshape(m,)
        predictions = np.argmax(probabilities, axis=1)
        accuracy = accuracy_score(y, predictions)
        return predictions, accuracy

    def inference(self, X, y):
        """
        Forward pass plus loss and accuracy for the labelled batch (X, y).

        Returns
        -------
        probs, loss, predictions, accuracy
        """
        Z = self.forward_prop(X=X)
        probs, log_probs = self.softmax_classifier_forward(X=Z)
        loss = self.compute_cross_entropy(log_probs=log_probs, y=y,
                                          regularization_rate=self.regularization_rate)
        predictions, accuracy = self.predict(probabilities=probs, y=y)
        return probs, loss, predictions, accuracy

    def train(self, X, y, epoch, minibatch_size, X_val, y_val):
        """
        Train on (X, y) in consecutive minibatches and plot the recorded metrics.

        For every minibatch the loss and accuracy are measured first, then the
        parameters are updated. When compute_validation is truthy, the full validation
        set (X_val, y_val) is evaluated after each update.

        Parameters
        ----------
        X, y : np.array
            Training data and labels, sliced along the first axis.
        epoch : int
            Number of passes over the training data.
        minibatch_size : int
            Number of examples per minibatch (the last one may be smaller).
        X_val, y_val : np.array
            Validation data and labels; only read when compute_validation is truthy.

        Returns
        -------
        None
            Progress is printed and a 4-panel matplotlib figure is created.
        """
        training_loss = []
        training_accuracy = []

        validation_loss = []
        validation_accuracy = []

        for epoch_n in range(epoch):

            print("EPOCH:{0} STARTED\n".format(epoch_n+1))

            minibatch_iterator = range(0, X.shape[0], minibatch_size)
            n_minibatches = len(minibatch_iterator)

            for minibatch_idx, i in enumerate(minibatch_iterator):

                print("EPOCH:{0}, MINIBATCH_ID:{1}/{2} STARTED\n".format(
                    epoch_n+1, minibatch_idx+1, n_minibatches))

                X_minibatch = X[i:i+minibatch_size]
                y_minibatch = y[i:i+minibatch_size]

                print("\ttraining inference...")

                probs, loss, predictions, accuracy = self.inference(X=X_minibatch, y=y_minibatch)

                training_loss.append(loss)
                training_accuracy.append(accuracy)

                print("\ttraining backpropagation...")

                self.full_backprop(probs=probs, y=y_minibatch)

                info = "training loss={0:.4f}, training accuracy={1:.4f}%".format(loss, accuracy*100)

                if self.compute_validation:
                    print("\tvalidation inference...")

                    probs_val, loss_val, predictions_val, accuracy_val = self.inference(X=X_val, y=y_val)

                    validation_loss.append(loss_val)
                    validation_accuracy.append(accuracy_val)

                    validation_info = "validation loss={0:.4f}, validation accuracy={1:.4f}%".format(
                        loss_val, accuracy_val*100)
                    info = info + "\n" + validation_info

                print("\nEPOCH:{0}, MINIBATCH_ID:{1}/{2} RESULTS".format(
                    epoch_n+1, minibatch_idx+1, n_minibatches))
                print("{0}".format(info))

            print("\nEPOCH:{0} FINISHED\n".format(epoch_n+1))
            print("_"*10)
            print("\n")

        fig, ax = plt.subplots(4, 1, figsize=(12, 15))

        left = 0.125   # the left side of the subplots of the figure
        right = 0.9    # the right side of the subplots of the figure
        bottom = 0.1   # the bottom of the subplots of the figure
        top = 0.9      # the top of the subplots of the figure
        wspace = 0.2   # the amount of width reserved for blank space between subplots
        hspace = 2.2   # the amount of height reserved for white space between subplots

        plt.subplots_adjust(left=left, bottom=bottom, right=right, top=top, wspace=wspace, hspace=hspace)

        # One panel per recorded series; accuracies are shown as percentages.
        panels = (
            (training_loss, "Loss", "Training loss"),
            (np.array(training_accuracy)*100, "Accuracy [%]", "Training accuracy"),
            (validation_loss, "Loss", "Validation loss"),
            (np.array(validation_accuracy)*100, "Accuracy [%]", "Validation accuracy"),
        )
        for axis, (series, y_label, title) in zip(ax, panels):
            axis.plot(range(len(series)), series)
            axis.set_xlabel("Minibatch runs")
            axis.set_ylabel(y_label)
            axis.set_title(title)

    def __repr__(self):
        """Return one "Index=<i>, layer=<layer>" line per layer, each terminated by a newline."""
        return "".join(
            "Index={0}, layer={1}\n".format(layer_idx, layer)
            for layer_idx, layer in enumerate(self.layers))

In [10]:
# Random architecture.

# Layout of each layer tuple (element 0 is the layer type):
# CONV: n_C, n_H_prev, n_W_prev, n_C_prev, f, stride, pad
# CONV output size: (n_H_prev - f + 2*pad) / stride + 1
# POOL: n_H_prev, n_W_prev, n_C_prev, f, stride, mode="max" or "average"
# FC:   n_C, n_H_prev, n_W_prev, n_C_prev

architecture = [
    ("CONV", 16, 28, 28, 1, 3, 1, 1),
    ("RELU",),
    ("POOL", 28, 28, 16, 2, 2, "max"),
    ("FC", 16, 14, 14, 16),
    ("RELU",),
    ("FC", 10, 1, 1, 16),
]

# Individual names for each layer spec, in network order.
l0, l0_relu, l1, l2, l2_relu, l3 = architecture

#CNN = ConvolutionalNeuralNetwork(architecture=architecture)
#print(CNN)
#CNN.train(X=x_train_1, y=y_train_1, epoch=5, minibatch_size=32)

In [None]:
# LeNet5 architecture.

# Layout of each layer tuple (element 0 is the layer type):
# CONV: n_C, n_H_prev, n_W_prev, n_C_prev, f, stride, pad
# CONV output size: (n_H_prev - f + 2*pad) / stride + 1
# POOL: n_H_prev, n_W_prev, n_C_prev, f, stride, mode="max" or "average"
# FC:   n_C, n_H_prev, n_W_prev, n_C_prev

l1 = ("CONV", 6, 28, 28, 1, 5, 1, 0)
l1_relu = ("RELU",)
l2 = ("POOL", 24, 24, 6, 2, 2, "average")
l3 = ("CONV", 16, 12, 12, 6, 5, 1, 0)
l3_relu = ("RELU",)
l4 = ("POOL", 8, 8, 16, 2, 2, "average")
l5 = ("FC", 120, 4, 4, 16)
l5_relu = ("RELU",)
l6 = ("FC", 84, 1, 1, 120)
l6_relu = ("RELU",)
l7 = ("FC", 10, 1, 1, 84)

LeNet5_architecture = [l1, l1_relu, l2, l3, l3_relu, l4, l5, l5_relu, l6, l6_relu, l7]

LeNet5 = ConvolutionalNeuralNetwork(architecture=LeNet5_architecture, compute_validation=True)
print(LeNet5)

# perf_counter is monotonic, so the measured duration cannot be distorted by
# system clock adjustments that happen while training runs.
train_start_time = time.perf_counter()

LeNet5.train(X=x_train, y=y_train, epoch=1, minibatch_size=32, X_val=x_val, y_val=y_val)

training_time = time.perf_counter() - train_start_time

print("\nTraining finished in {0:.2f} seconds".format(training_time))

Index=0, layer=CONV: n_C:6,n_H_prev:28,n_W_prev:28,n_C_prev:1,f:5,stride:1,pad:0,n_H:24,n_W:24
Index=1, layer=RELU LAYER
Index=2, layer=POOL: n_H_prev:24,n_W_prev:24,n_C_prev:6,f:2,stride:2,n_H:12,n_W:12,n_C:6
Index=3, layer=CONV: n_C:16,n_H_prev:12,n_W_prev:12,n_C_prev:6,f:5,stride:1,pad:0,n_H:8,n_W:8
Index=4, layer=RELU LAYER
Index=5, layer=POOL: n_H_prev:8,n_W_prev:8,n_C_prev:16,f:2,stride:2,n_H:4,n_W:4,n_C:16
Index=6, layer=CONV: n_C:120,n_H_prev:4,n_W_prev:4,n_C_prev:16,f:4,stride:1,pad:0,n_H:1,n_W:1
Index=7, layer=RELU LAYER
Index=8, layer=CONV: n_C:84,n_H_prev:1,n_W_prev:1,n_C_prev:120,f:1,stride:1,pad:0,n_H:1,n_W:1
Index=9, layer=RELU LAYER
Index=10, layer=CONV: n_C:10,n_H_prev:1,n_W_prev:1,n_C_prev:84,f:1,stride:1,pad:0,n_H:1,n_W:1

EPOCH:1 STARTED

EPOCH:1, MINIBATCH_ID:1/369 STARTED

	training inference...
forward propagating=CONV: n_C:6,n_H_prev:28,n_W_prev:28,n_C_prev:1,f:5,stride:1,pad:0,n_H:24,n_W:24
forward propagating=RELU LAYER
forward propagating=POOL: n_H_prev:24,n_

In [1]:
# Evaluate the network on x_test / y_test.
# Requires LeNet5, x_test and y_test to already exist in the kernel (run the earlier cells first).
probs_test, loss_test, predictions_test, accuracy_test = LeNet5.inference(X=x_test, y=y_test)
# "\n" starts the report on a fresh line; "\T" is not a valid escape and printed a literal backslash.
print("\nTesting info: loss={0:.4f}, accuracy={1:.4f}%".format(loss_test, accuracy_test*100))

NameError: name 'LeNet5' is not defined