<a href="https://colab.research.google.com/github/stanislavlia/DeepLearning.AI-specialization/blob/main/DeepNN2.0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np

In [2]:
class DeepNN():
    def __init__(self, X, Y, dims_of_layers, activations, alpha = 0.01, l2_reg = 0):
        #dims_of_layers - list of number of units in each layer (first element - num of features in input)
        #activations - activation function applied to each layer

        #dims_of_layers[0] - n of features in input
        #activations[0] - activation for first hidden layer
        #we support only 3 activation funcs: linear, sigmoid, relu

        #X.shape should be (n_features, m_examples)
        #Y.shape should be (1, m_examples)

        #regularization - regularization rate
        self.X = X
        self.Y = Y

        self.m_examples = X.shape[1]

        self.dims_of_layers = dims_of_layers
        self.n_layers = len(activations)

        self.activations = activations
        self.params = dict()

        self.learning_history = []

        self.alpha = alpha
        self.l2_reg = l2_reg

        #setting cache dicts for backpropogation

        self.cache = dict()

    def initialize_params(self):
      #Now, we are going to use He-initialization of weights

        for i in range(1, len(self.activations) + 1):

            #setting parameters layer by layer
            self.params["W" + str(i)] = np.random.randn(self.dims_of_layers[i], self.dims_of_layers[i-1])
            #multiplying by constant according to He init
            self.params["W" + str(i)] *=  np.sqrt(2 / (self.dims_of_layers[i-1]))


            self.params["b" + str(i)] = np.zeros((self.dims_of_layers[i], 1))


    def make_minibatches(self, batch_size):
      #split X,y into minibatches and return list of (X_i, Y_i)

      batches = []
      complete_batches = self.m_examples // batch_size

      for k in range(0, complete_batches):

        #extracting a particular slice of data
        X_k = self.X[:, k * batch_size : (k + 1) * batch_size]
        Y_k = self.Y[:, k * batch_size : (k + 1) * batch_size]

        minibatch = (X_k, Y_k)
        batches.append(minibatch)

      #add incomplete batch in case we have remaining examples
      if (self.m_examples % batch_size != 0):
        X_k = self.X[:, complete_batches * batch_size : ]
        Y_k = self.Y[:, complete_batches * batch_size : ]
        minibatch = (X_k, Y_k)
        batches.append(minibatch)

      return (batches)



    def activation(self, Z, function="linear"):
        if function == "linear":
            return Z

        if function == "sigmoid":
            return 1 / (1 + np.exp(-Z))

        if function == "relu":
            return Z * (Z > 0)


    def deriv_activation(self, Z, function):
        if function == "linear":
            return 1.

        if function == "sigmoid":
            sigm_z = self.activation(Z, "sigmoid")

            return sigm_z *(1 - sigm_z)

        if function == "relu":
            return 1. * (Z > 0)



    def forward_propogation(self, X): #TOFIX

        #X.shape = (n_features, m_examples)

        A_prev = X
        self.cache["A0"]  = A_prev

        Z_current = np.dot(self.params["W1"], A_prev) + self.params["b1"]
        A_current = self.activation(Z_current, function=self.activations[0])

        self.cache["Z1"] = Z_current
        self.cache["A1"] = A_current

        for i in range(1, len(self.activations)):
            A_prev = A_current

            #A_prev - cache["A" + str(i)]
            Z_current = np.dot(self.params["W" + str(i+1)], A_prev) + self.params["b" + str(i+1)]
            A_current = self.activation(Z_current, function=self.activations[i])

            #keeping values in cache for backprop
            self.cache["Z" + str(i+1)] = Z_current
            self.cache["A" + str(i+1)] = A_current


        predictions = A_current

        return predictions

    def compute_reg_penalty(self): #TOFIX

      penalty = 0
      for l in range(1, self.n_layers + 1):
        W_l = self.params["W" + str(l)]
        penalty += np.sum(np.square(W_l))

      penalty = penalty * (self.l2_reg / (2 * self.m_examples))

      return penalty




    def compute_cost(self, predictions, cost_function): #TOFIX
        #leave cost func as a parameter
        #so that we can use it futher not only for classification

        if cost_function == "cross_entropy":

            #lets cut off a  tiny constant to avoid log0 problem
            epsilon = 10 ** -15

            predictions = np.clip(predictions, epsilon, 1-epsilon)

            #BinaryCrossEntropy

            cost = (self.Y * np.log(predictions) +
                    (1 - self.Y) * np.log(1 - (predictions)) ) * (-1 / self.m_examples)

            cost = np.sum(cost, axis=1, keepdims=True) #sum up the columns

        if cost_function == "mse":

            cost = np.sum(((predictions - self.Y) ** 2) * (2 / self.m_examples), axis=1, keepdims=True)


        #computing regularization penalty
        if self.l2_reg> 0:

          reg_penalty = self.compute_reg_penalty()
          cost += reg_penalty



        return cost

    def deriv_of_cost(self, predictions, cost_function):

        if cost_function == "cross_entropy":

            #avoiding division by zero
            epsilon = 10 ** -15
            predictions = np.clip(predictions, epsilon, 1-epsilon)

            dAL = (predictions - self.Y) / (predictions * (1 - predictions))

        if cost_function == "mse":
            dAL = (predictions - self.Y)

        return dAL


    def back_propogation(self, predictions, cost_func): #TOFIX

        L = self.n_layers

        grads_cache = dict()

        for i in range(L, 0, -1):
            if i == L:
                dA_i = self.deriv_of_cost(predictions, cost_function=cost_func)
            else:

                dA_i = np.dot(self.params["W" + str(i+1)].T, grads_cache["dZ" + str(i+1)]) #ERROR MuSt be here
                #print("i =", i)
                # print("W=", self.params["W" + str(i+1)] )
                # print("dZ_next =", grads_cache["dZ" + str(i+1)])

                # print("W = ", self.params["W" + str(i+1)].T.shape)
                # print("W shape ", )
                # print("dZ shape" + str(i),  grads_cache["dZ" + str(i+1)].shape)
                #print("dA", dA_i)


            grads_cache["dA" + str(i)] = dA_i
            activation_i = self.activations[i-1]

            Z_i = self.cache["Z" + str(i)]
            A_prev = self.cache["A" + str(i-1)]
            W_i = self.params["W" + str(i)]

            #print("Activation = ", activation_i)
            dZ_i = dA_i * self.deriv_activation(Z_i, activation_i)

            #print("Sum of dZ_i", np.sum(dZ_i))

            #computing derivs for W, b

            #L2 regularization term
            l2_term = (self.l2_reg / self.m_examples) * W_i

            dW_i = (1 / self.m_examples) * np.dot(dZ_i, A_prev.T) + l2_term
            db_i = (1/ self.m_examples) * np.sum(dZ_i, axis=1, keepdims=True)


            #storing gradients
            grads_cache["dZ" + str(i)] = dZ_i
            grads_cache["dW" + str(i)] = dW_i
            grads_cache["db" + str(i)] = db_i

        return grads_cache

    def update_params(self, grads): #TOFIX + opting: gd, momentum, adam

        for i in range(1, self.n_layers + 1):

            #updating by Gradient Descent
            self.params["W" + str(i)] -=  self.alpha * grads["dW" + str(i)]
            self.params["b" + str(i)] -= self.alpha * grads["db" + str(i)]


    def fit(self, epochs=100, cost_func="mse", debug=False): #TOFIX

        #fitting process

        #initialize random params

        history = []
        self.initialize_params()


        for epoch in range(0, epochs + 1):

            predictions = self.forward_propogation(self.X)

            #computing cost function
            cost = np.round(self.compute_cost(predictions, cost_func), 6)
            history.append(cost)


            if (epoch % max(1 , (epochs // 20)) == 0):
              print("Epoch #{},  {} == {}".format(epoch, cost_func, cost))

            #computing gradients
            grads = self.back_propogation(predictions, cost_func=cost_func)
            if debug == True:
              print(grads)

            #update params using Gradient Descent
            self.update_params(grads)

        self.history = history