CNN model

In [1]:
import os
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

###### import your HW4 code######
from Dense import Dense
from Activation import Activation
from Loss import compute_BCE_loss
from Predict import predict
##################################

# Empty dict; not used by the code visible in this notebook section
# (presumably filled with results later -- TODO confirm).
output = {}
# Seed NumPy's global RNG so random draws are reproducible from run to run.
seed = 1
np.random.seed(seed)

In [None]:
class Conv():
    """
    Convolution layer holding a bank of square filters W and one bias per filter.

    Arguments:
    filter_size -- height and width of each (square) filter
    input_channel -- number of channels of the incoming activations
    output_channel -- number of filters (= channels of the layer output)
    pad -- amount of zero padding stored on the layer
    stride -- step of the sliding window stored on the layer
    seed -- seed used to make the weight initialization reproducible
    """
    def __init__(self, filter_size=2, input_channel=3, output_channel=8, pad=1, stride=1, seed=1):

        self.filter_size = filter_size
        self.input_channel = input_channel
        self.output_channel = output_channel
        self.seed = seed
        self.pad = pad
        self.stride = stride

        self.parameters = {'W': None, 'b': None}
        self.initialize_parameters()

    def initialize_parameters(self):
        """
        Fill self.parameters with freshly initialized values.

        self.parameters -- python dictionary containing your parameters:
                           W -- weight matrix of shape (filter_size, filter_size, input channel size, output channel size),
                                drawn uniformly from [-limit, limit) with limit = sqrt(6 / (input channels + output channels))
                           b -- bias vector of shape (1, 1, 1, output channel size), all zeros
        """
        # Seed from this layer's own `seed` argument rather than a module-level
        # global, so Conv(seed=...) is honoured and the class works on its own.
        # NOTE: np.random.seed resets NumPy's global RNG state, so constructing
        # a layer also affects later np.random.* calls.
        np.random.seed(self.seed)

        limit = np.sqrt(6.0 / (self.input_channel + self.output_channel))
        w_shape = (self.filter_size, self.filter_size, self.input_channel, self.output_channel)

        self.parameters['W'] = np.random.uniform(-limit, limit, w_shape)
        self.parameters['b'] = np.zeros((1, 1, 1, self.output_channel))

In [None]:
def zero_pad(X, pad):
    """
    Surround every image in X with a border of zeros along its height and width axes.

    Argument:
    X -- python numpy array of shape (m, n_H, n_W, n_C), where m represent the number of examples.
    pad -- integer, number of zero rows/columns added on each side of the height and width axes

    Returns:
    X_pad -- padded image of shape (m, n_H + 2*pad, n_W + 2*pad, n_C)
    """
    # Batch axis (first) and channel axis (last) are left untouched.
    untouched = (0, 0)
    border = (pad, pad)
    pad_widths = (untouched, border, border, untouched)

    return np.pad(X, pad_widths, mode="constant", constant_values=0)

In [None]:
def conv_single_step(self, a_slice_prev, W, b):
    """
    Apply one filter to one window of the input and return the resulting scalar.

    Arguments:
    a_slice_prev -- slice of previous activation layer output with shape (filter_size, filter_size, n_C_prev)
    W -- Weight parameters contained in a window - matrix of shape (filter_size, filter_size, n_C_prev)
    b -- Bias parameters contained in a window - matrix of shape (1, 1, 1)

    Returns:
    Z -- a scalar value, result of convolving the sliding window (W, b) on a slice x of the input data
    """
    # Weighted sum of the window: multiply element-wise, then collapse to one number.
    weighted_sum = np.multiply(a_slice_prev, W).sum()

    # The bias arrives with singleton axes; squeeze them away before adding.
    return weighted_sum + np.squeeze(b)

# Bind the module-level conv_single_step function onto Conv as an instance method.
Conv.conv_single_step = conv_single_step

In [None]:
def forward(self, A_prev):
    """
    Implements the forward propagation for a convolution layer

    The input is zero-padded by self.pad, then every filter in
    self.parameters["W"] is slid over it with step self.stride. The unpadded
    input is stored in self.cache for the backward pass.

    Arguments:
    A_prev -- output activations of the previous layer, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)

    Returns:
    Z -- conv output, numpy array of shape (m, n_H, n_W, n_C)
    """
    W = self.parameters["W"]
    b = self.parameters["b"]
    f = self.filter_size
    pad = self.pad
    stride = self.stride

    # Batch size and spatial extent come from the input, filter count from W.
    (m, n_H_prev, n_W_prev, _) = A_prev.shape
    n_C = W.shape[3]

    # Step 1: Output Dimension Calculation
    # floor((n_prev - f + 2 * pad) / stride) + 1, with the floor applied to the
    # quotient (integer floor division) rather than to the numerator.
    n_H = int((n_H_prev - f + 2 * pad) // stride) + 1
    n_W = int((n_W_prev - f + 2 * pad) // stride) + 1

    # Initialize the output volume Z with zeros
    Z = np.zeros((m, n_H, n_W, n_C))

    # Step 2: Padding
    A_prev_pad = zero_pad(A_prev, pad)

    # Step 3: Loop Through Training Examples
    for i in range(m):                                 # loop over the batch of training examples
        a_prev_pad = A_prev_pad[i]
        for h in range(n_H):                           # loop over vertical axis of the output volume
            vert_start = h * stride
            vert_end = vert_start + f
            for w in range(n_W):                       # loop over horizontal axis of the output volume
                horiz_start = w * stride
                horiz_end = horiz_start + f

                # Step 3-1: Extracting slices
                # The window does not depend on the output channel, so it is
                # extracted once and reused for every filter.
                a_slice_prev = a_prev_pad[vert_start:vert_end, horiz_start:horiz_end, :]

                # Step 3-2: Applying Filters
                for c in range(n_C):                   # loop over channels (= #filter) of the output volume
                    Z[i, h, w, c] = self.conv_single_step(a_slice_prev, W[:, :, :, c], b[:, :, :, c])

    # Save information in "cache" for the backward pass
    self.cache = A_prev

    return Z

# Bind the module-level forward function onto Conv as an instance method.
Conv.forward = forward

In [None]:
def backward(self, dZ):
    """
    Implement the backward propagation for a convolution layer

    Reads the input saved in self.cache by the forward pass and stores the
    parameter gradients on the layer as self.dW and self.db, each scaled by
    1/m (m = batch size).

    Arguments:
    dZ -- gradient of the cost with respect to the output of the conv layer (Z), numpy array of shape (m, n_H, n_W, n_C)

    Returns:
    dA_prev -- gradient of the cost with respect to the input of the conv layer (A_prev),
                numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
    """
    A_prev = self.cache
    W = self.parameters["W"]
    f = self.filter_size
    pad = self.pad
    stride = self.stride

    # Retrieve dimensions from A_prev's shape and dZ's shape
    (_, n_H_prev, n_W_prev, _) = A_prev.shape
    (m, n_H, n_W, n_C) = dZ.shape

    # Step 1: Initialize Gradients
    dW = np.zeros(W.shape)
    db = np.zeros((1, 1, 1, n_C))

    # Step 2: Padding
    # Gradients are accumulated in padded coordinates and cropped at the end.
    A_prev_pad = zero_pad(A_prev, pad)
    dA_prev_pad = np.zeros(A_prev_pad.shape)

    # Parameter gradients are averaged over the batch (guarded for m == 0,
    # in which case the loops below do not run at all).
    inv_m = 1.0 / m if m else 0.0

    # Step 3: Loop Through Training Examples
    for i in range(m):                         # loop over the batch of training examples
        for h in range(n_H):                   # loop over vertical axis of the output volume
            vert_start = h * stride
            vert_end = vert_start + f
            for w in range(n_W):               # loop over horizontal axis of the output volume
                horiz_start = w * stride
                horiz_end = horiz_start + f

                # Step 3-1: Extracting slices (shared by all output channels)
                a_slice = A_prev_pad[i, vert_start:vert_end, horiz_start:horiz_end, :]

                # Step 3-2: Update the Gradients
                for c in range(n_C):           # loop over the channels of the output volume
                    grad = dZ[i, h, w, c]
                    dA_prev_pad[i, vert_start:vert_end, horiz_start:horiz_end, :] += W[:, :, :, c] * grad
                    dW[:, :, :, c] += inv_m * a_slice * grad
                    db[:, :, :, c] += inv_m * grad

    # Step 4: Remove Padding
    # Explicit end indices are used instead of `pad:-pad`, because with
    # pad == 0 the slice `0:-0` is empty and would drop the whole gradient.
    dA_prev = dA_prev_pad[:, pad:pad + n_H_prev, pad:pad + n_W_prev, :]

    self.dW = dW
    self.db = db

    return dA_prev

# Bind the module-level backward function onto Conv as an instance method.
Conv.backward = backward

In [None]:
def update(self, learning_rate):
    """
    Take one gradient-descent step on the layer's weights and bias.

    Each parameter is replaced by (parameter - learning_rate * gradient), using the
    gradients stored on the layer as self.dW and self.db.

    Arguments:
    learning rate -- step size
    """
    # Pair every parameter key with the attribute that holds its gradient.
    for key, grad_attr in (("W", "dW"), ("b", "db")):
        step = learning_rate * getattr(self, grad_attr)
        self.parameters[key] = self.parameters[key] - step

# Bind the module-level update function onto Conv as an instance method.
Conv.update = update

In [None]:
class MaxPool():
    """
    Max pooling layer applied independently to every example and every channel.

    Arguments:
    pool_size -- height and width of the (square) pooling window
    stride -- step between two consecutive windows
    """
    def __init__(self, pool_size=2, stride=2):

        self.pool_size = pool_size
        self.stride = stride

    def create_mask_from_window(self, x):
        """
        Creates a mask from an input x to identify the max entry of x.

        Arguments:
        x -- Array of shape (filter_size, filter_size)

        Returns:
        mask -- Array of the same shape as filter, contains a True at the position corresponding to the max entry of x.
                If the maximum occurs more than once, every occurrence is marked True.
        """
        return x == np.max(x)

    def forward(self, A_prev):
        """
        Implements the forward pass of the max pooling layer

        The input is stored in self.cache for the backward pass.

        Arguments:
        A_prev -- Input data, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)

        Returns:
        A -- output of the pool layer, a numpy array of shape (m, n_H, n_W, n_C)
        """
        pool = self.pool_size
        stride = self.stride

        # retrieve dimensions from the input shape; pooling keeps the channel count
        (m, n_H_prev, n_W_prev, n_C) = A_prev.shape

        # Step 1: Output Dimension Calculation
        n_H = int((n_H_prev - pool) / stride) + 1
        n_W = int((n_W_prev - pool) / stride) + 1

        # initialize output matrix A with zeros
        A = np.zeros((m, n_H, n_W, n_C))

        # Step 2: Loop Through Training Examples
        for i in range(m):                           # loop over the batch of training examples
            for h in range(n_H):                     # loop on the vertical axis of the output volume
                vert_start = h * stride
                vert_end = vert_start + pool
                for w in range(n_W):                 # loop on the horizontal axis of the output volume
                    horiz_start = w * stride
                    horiz_end = horiz_start + pool
                    for c in range(n_C):             # loop over the channels of the output volume
                        # Step 2-1 / 2-2: extract the window and keep its maximum
                        A[i, h, w, c] = np.max(A_prev[i, vert_start:vert_end, horiz_start:horiz_end, c])

        # Store the input in "cache" for backward pass
        self.cache = A_prev

        return A

    def backward(self, dA):
        """
        Implements the backward pass of the max pooling layer

        Each output gradient is routed to the position(s) of the maximum inside
        the window it was computed from.

        Arguments:
        dA -- gradient of cost with respect to the output of the pooling layer, same shape as A

        Returns:
        dA_prev -- gradient of cost with respect to the input of the pooling layer, same shape as A_prev
        """
        # Retrieve information from cache
        A_prev = self.cache
        pool = self.pool_size
        stride = self.stride

        # Retrieve dimensions from dA's shape
        (m, n_H, n_W, n_C) = dA.shape

        # Step 1: Initialize Gradients
        dA_prev = np.zeros(A_prev.shape)

        # Step 2: Loop Through Training Examples
        for i in range(m):                           # loop over the batch of training examples
            for h in range(n_H):                     # loop on the vertical axis of the output volume
                vert_start = h * stride
                vert_end = vert_start + pool
                for w in range(n_W):                 # loop on the horizontal axis of the output volume
                    horiz_start = w * stride
                    horiz_end = horiz_start + pool
                    for c in range(n_C):             # loop over the channels of the output volume

                        # Step 2-1: Extracting slices
                        a_prev_slice = A_prev[i, vert_start:vert_end, horiz_start:horiz_end, c]

                        # Step 2-2: Pass through the Gradients
                        # Accumulate (+=) rather than assign: when stride < pool_size the
                        # windows overlap and one input can be the maximum of several
                        # windows, so its gradient is the sum of all their contributions.
                        mask = self.create_mask_from_window(a_prev_slice)
                        dA_prev[i, vert_start:vert_end, horiz_start:horiz_end, c] += mask * dA[i, h, w, c]

        return dA_prev



In [None]:
class Flatten():
    """Layer that collapses every example to a vector and restores the shape on the way back."""
    def __init__(self):
        pass

    def forward(self, A_prev):
        """
        Implements the forward pass of the flatten layer

        The shape of the input is remembered in self.cache so that backward can undo the flattening.

        Arguments:
        A_prev -- Input data, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)

        Returns:
        A -- output of the flatten layer, a 2-dimensional array of shape (m, (n_H_prev * n_W_prev * n_C_prev))
        """
        input_shape = A_prev.shape

        # Save information in "cache" for the backward pass
        self.cache = input_shape

        # Keep the batch axis and let NumPy infer the length of the merged axis.
        batch = input_shape[0]
        return np.reshape(A_prev, (batch, -1))

    def backward(self, dA):
        """
        Implements the backward pass of the flatten layer

        Arguments:
        dA -- Input data, a 2-dimensional array

        Returns:
        dA_prev -- An array with its original shape (the output shape of its' previous layer).
        """
        return np.reshape(dA, self.cache)



In [None]:
class Model():
    """
    Sequential container: layers run in insertion order on the forward pass and in
    reverse order on the backward pass.
    """
    def __init__(self):
        self.layers = []

    def add(self, layer):
        """Append `layer` to the end of the network."""
        self.layers.append(layer)

    def forward(self, X):
        """
        Run X through every layer in order.

        Arguments:
        X -- input passed to the first layer's forward method

        Returns:
        A -- output of the last layer (X itself if the model has no layers)
        """
        A = X
        for layer in self.layers:
            A = layer.forward(A)
        return A

    def backward(self, AL=None, Y=None):
        """
        Back-propagate the binary cross-entropy loss through all layers.

        Arguments:
        AL -- output of the forward pass (predicted probabilities)
        Y -- true labels, same shape as AL

        Returns:
        dA_prev -- gradient returned by the first layer's backward method
                   (the loss gradient w.r.t. AL if the model has no layers)
        """
        # Derivative of the BCE loss w.r.t. AL; eps keeps the divisions finite
        # when AL is exactly 0 or 1.
        eps = 1e-5
        dAL = -(Y / (AL + eps) - (1 - Y) / (1 - AL + eps))

        if not self.layers:
            return dAL

        # Handle the last layer separately only because it is called with the
        # `dA` keyword; every layer is visited exactly once, so a single-layer
        # model is no longer back-propagated through twice.
        *earlier_layers, last_layer = self.layers
        dA_prev = last_layer.backward(dA=dAL)
        for layer in reversed(earlier_layers):
            dA_prev = layer.backward(dA_prev)

        return dA_prev

    def update(self, learning_rate):
        """
        Apply one gradient-descent step to every trainable layer.

        Arguments:
        learning_rate -- step size forwarded to each layer's update method
        """
        # Only convolution layer and dense layer have to update parameters
        # (layers are recognised by class name).
        for layer in self.layers:
            if type(layer).__name__ in ("Conv", "Dense"):
                layer.update(learning_rate)


In [None]:
# Read the three arrays used by this notebook out of the npz archive
### START CODE HERE ###
data = np.load("data.npz")
X_train, y_train, X_test = (data[key] for key in ('X_train', 'y_train', 'X_test'))
### END CODE HERE ###

# Preview the first nine training images on a 3x3 grid, titled with their labels
for idx in range(9):
    plt.subplot(3, 3, idx + 1)
    # draw the raw pixel data as a grayscale image on a fixed 0..1 intensity scale
    plt.imshow(np.squeeze(X_train[idx]), cmap='gray', vmin=0, vmax=1)
    plt.title(y_train[idx])
plt.show()

# Report the shapes of the training and testing data
print('Train: X=%s, y=%s' % (X_train.shape, y_train.shape))
print('Test: X=%s' % (X_test.shape, ))

# Hold out the tail of the training data for validation (plain slicing, no shuffling)
### START CODE HERE ###
split_ratio = 0.9

# Index of the first validation example in each array
n_train_x = int(X_train.shape[0] * split_ratio)
n_train_y = int(y_train.shape[0] * split_ratio)

x1_train, x_val = X_train[:n_train_x], X_train[n_train_x:]
y1_train, y_val = y_train[:n_train_y], y_train[n_train_y:]
### END CODE HERE ###
### END CODE HERE ###

In [None]:
def random_mini_batches(X, Y, mini_batch_size = 64):
    """
    Creates a list of random minibatches from (X, Y)

    The examples are shuffled with one draw from NumPy's global RNG
    (np.random.permutation) and then cut into consecutive chunks.

    Arguments:
    X -- input data, of shape (number of examples, ...); examples are indexed along the first axis
    Y -- true "label" array, of shape (number of examples, ...) or (number of examples,),
         aligned with X along the first axis
    mini_batch_size -- size of the mini-batches, positive integer

    Returns:
    mini_batches -- list of synchronous (mini_batch_X, mini_batch_Y); every mini-batch holds
                    mini_batch_size examples except possibly the last one, which holds the remainder

    Raises:
    ValueError -- if mini_batch_size is not positive, or X and Y hold a different number of examples
    """
    if mini_batch_size <= 0:
        raise ValueError("mini_batch_size must be a positive integer")

    m = X.shape[0]  # number of training examples
    if Y.shape[0] != m:
        raise ValueError("X and Y must contain the same number of examples")

    # Step 1: Shuffle (X, Y) with the same permutation so pairs stay aligned.
    # Indexing only the first axis works for inputs and labels of any rank.
    permutation = np.random.permutation(m)
    shuffled_X = X[permutation]
    shuffled_Y = Y[permutation]

    # Step 2: Partition (shuffled_X, shuffled_Y).
    # Stepping by mini_batch_size covers the complete mini-batches and, because
    # slicing clips at the end of the array, the shorter final one as well.
    return [
        (shuffled_X[start:start + mini_batch_size], shuffled_Y[start:start + mini_batch_size])
        for start in range(0, m, mini_batch_size)
    ]

In [None]:
### START CODE HERE ###
learning_rate = 3e-5
num_iterations = 20   # number of full passes (epochs) over the training data
batch_size = 64
costs = []   # cost of the last mini-batch of every epoch

# Build the model by adding layers
# NOTE(review): Dense(32, 1) assumes the flattened pooling output has 32
# features per example -- confirm against the input image size.
model = Model()
model.add(Conv(filter_size=3, input_channel=3, output_channel=8, pad=1, stride=2))
model.add(Activation("relu", None))
model.add(MaxPool(pool_size=2, stride=2))
model.add(Flatten())
model.add(Dense(32, 1))
model.add(Activation("sigmoid", None))

# NOTE(review): training uses the full X_train / y_train rather than the
# x1_train / y1_train split created earlier -- confirm this is intended.

# Defined up front so the report below still works if an epoch has no mini-batches.
cost = float("nan")

# Loop (gradient descent)
for i in range(0, num_iterations):
    print("epoch: ",i)
    mini_batches = random_mini_batches(X_train, y_train, batch_size)

    for x_batch, y_batch in mini_batches:
        # forward
        AL = model.forward(x_batch)

        # compute cost
        cost = compute_BCE_loss(AL, y_batch)

        # backward (stores the parameter gradients on the layers)
        model.backward(AL, y_batch)

        # update
        model.update(learning_rate)

    # Report and record the cost of the last mini-batch of this epoch
    print ("Cost after iteration %i: %f" %(i, cost))
    costs.append(cost)

### END CODE HERE ###