<a href="https://colab.research.google.com/github/WaiWasabi/Neural-Networks/blob/master/convolutional_neural_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Activation Functions

In [None]:
class activation(object):
  """Identity activation; also the base class of the other activation classes.

  Activation classes expose two functions that are called on the class itself
  (for example ``activation.fn(z)``):

  * ``fn(z)``    -- the activation function evaluated at ``z``.
  * ``prime(z)`` -- the derivative of the activation function at ``z``.
  """

  @staticmethod
  def fn(z):
    """Return ``z`` unchanged."""
    return z

  @staticmethod
  def prime(z):
    """Return the derivative of the identity: ones with the shape of ``z``."""
    # d/dz (z) == 1. Returning z itself here would scale every gradient by
    # the pre-activation value instead of passing it through unchanged.
    return np.ones_like(z)

class relu(activation):
  """Rectified linear unit, ``max(0, z)`` applied element-wise.

  Like the other activation classes, ``fn`` and ``prime`` are called on the
  class itself (``relu.fn(z)``).
  """

  @staticmethod
  def fn(z):
    """Return ``z`` with every negative entry replaced by 0."""
    return np.maximum(0, z)

  @staticmethod
  def prime(z):
    """Return the ReLU derivative: 1.0 where ``z > 0`` and 0.0 elsewhere."""
    # A bare ``fn(z)`` is not resolvable from inside a class body function
    # (NameError), and a ratio of two equal terms is 1 everywhere, so the
    # step function is computed directly from the sign of z instead.
    return (np.asarray(z) > 0).astype(float)

class sigmoid(activation):
  """Logistic sigmoid activation, ``1 / (1 + exp(-z))``.

  Both functions take no ``self`` and are called on the class itself.
  """

  def fn(z):
    """Evaluate the sigmoid at ``z``."""
    denominator = 1+np.exp(-z)
    return 1/denominator

  def prime(z):
    """Evaluate the sigmoid derivative ``s * (1 - s)`` where ``s = sigmoid.fn(z)``."""
    s = sigmoid.fn(z)
    return s*(1.0-s)

# Cost Functions


In [None]:
class Cost(object):
  """Base class of the cost functions.

  The functions take no ``self`` and are called on the class itself, with
  ``a`` as the first argument and ``y`` as the second.
  """

  def fn(a, y):
    """Return the difference ``y - a``."""
    difference = y - a
    return difference

  def prime(a, y):
    """Return the constant ``-1`` regardless of the arguments."""
    return -1

class Quadratic(Cost):
  """Quadratic cost, ``0.5 * ||a - y||^2``.

  The functions take no ``self`` and are called on the class itself.
  """

  def fn(a, y):
    """Return half the squared norm of ``a - y``."""
    residual = a-y
    return 0.5*np.linalg.norm(residual)**2

  def delta(a, y):
    """Return ``a - y``."""
    error = a-y
    return error

class CrossEntropy(Cost):
  """Binary cross-entropy cost summed over all output units.

  The functions take no ``self`` and are called on the class itself, with
  ``a`` the network output and ``y`` the target of the same shape.
  """

  def fn(a, y):
    """Return ``sum(-y*log(a) - (1-y)*log(1-a))``.

    This was previously an unimplemented stub that returned ``None``.
    """
    # 0*log(0) evaluates to nan; nan_to_num maps it to 0 so that an output
    # that is exactly right (a == y == 0 or 1) contributes no cost.
    return np.sum(np.nan_to_num(-y*np.log(a) - (1-y)*np.log(1-a)))

  def delta(a, y):
    """Return ``a - y``."""
    return a-y

# Weight Initializers

In [None]:
  def default_weight_initializer(input_shape, output_shape):
    """Draw a standard-normal weight matrix of shape (output_shape, input_shape)."""
    rows, cols = output_shape, input_shape
    return np.random.randn(rows, cols)
  
  def he_weight_initializer(input_shape, output_shape):
    """Scale the default weight matrix by ``sqrt(2 / input_shape)``."""
    weights = default_weight_initializer(input_shape, output_shape)
    return weights * np.sqrt(2/input_shape)
  
  def xavier_weight_initializer(input_shape, output_shape):
    """Scale the default weight matrix by ``sqrt(1 / input_shape)`` (used for tanh activation)."""
    weights = default_weight_initializer(input_shape, output_shape)
    return weights * np.sqrt(1/input_shape)

# Network Class

In [None]:
import numpy as np
import random

class CNN(object):
  """Sequential container that trains a stack of layers with mini-batch SGD.

  Every layer must provide ``initializer(input_shape)`` (returning its output
  shape), ``feedforward(a)`` and ``backprop(dCda, lr)``. The last layer must
  also provide ``set_labels(labels)``, which is called before each backward
  pass.
  """

  def __init__(self, input_shape):
    """input_shape -- shape of a single training example, a **tuple** (no batch dim)."""
    self.model = []
    self.input_shape = input_shape

  def add(self, layer):
    """Append ``layer`` to the end of the model."""
    self.model.append(layer)

  def initializer(self, input_shape):
    """Initialize every layer in order, feeding each layer's output shape to the next.

    input_shape -- shape of a single input (no batch dim).
    """
    for layer in self.model:
      input_shape = layer.initializer(input_shape)

  def initialize(self):
    """Initialize all layers starting from the model's own input shape."""
    self.initializer(self.input_shape)

  def SGD(self, train_data, mini_batch_size, epochs, lr, validation_data = None):
    """Train the model with mini-batch stochastic gradient descent.

    train_data      -- list of (input, label) pairs; it is shuffled IN PLACE
                       at the start of every epoch.
    mini_batch_size -- number of examples per mini-batch.
    epochs          -- number of passes over train_data.
    lr              -- learning rate handed to every layer's backprop.
    validation_data -- optional iterable yielding exactly two items,
                       (inputs, labels); when given, accuracy is printed
                       after every epoch.

    The layers are re-initialized on every call.
    """
    # ``is not None`` instead of ``!= None``: the latter is evaluated
    # element-wise (and is ambiguous as a condition) for NumPy arrays.
    has_validation = validation_data is not None
    if has_validation:
      test_input, test_label = validation_data
      # Convert once up front rather than rebuilding the arrays every epoch.
      test_input, test_label = np.array(test_input), np.array(test_label)
    self.initialize()
    for epoch in range(epochs):
      random.shuffle(train_data)
      for start in range(0, len(train_data), mini_batch_size):
        # zip(*pairs) turns a slice of (input, label) pairs into (inputs, labels).
        mini_batch = zip(*train_data[start:start+mini_batch_size])
        self.update_mini_batch(mini_batch, lr)
      print(f"Epoch {epoch+1} complete")
      if has_validation:
        print("Accuracy (%):", self.evaluate(test_input, test_label))

  def update_mini_batch(self, mini_batch, lr):
    """Run one forward and one backward pass on a single mini-batch.

    mini_batch -- iterable yielding exactly two items: the inputs and the labels.
    """
    train_input, train_label = mini_batch
    output = self.feedforward(np.array(train_input))
    # The last layer needs the labels to compute the output error.
    self.model[-1].set_labels(np.array(train_label))
    self.backprop(output, lr)

  def feedforward(self, a):
    """Pass ``a`` through every layer in order and return the final output."""
    for layer in self.model:
      a = layer.feedforward(a)
    return a

  def backprop(self, dCda, lr):
    """Pass ``dCda`` through every layer in reverse order and return the result."""
    for layer in reversed(self.model):
      dCda = layer.backprop(dCda, lr)
    return dCda

  def evaluate(self, test_input, test_label):
    """Return the accuracy in percent on batched test data.

    An example counts as correct when the argmax of its output equals its
    entry in test_label.
    """
    predictions = self.feedforward(test_input)
    correct = sum(int(np.argmax(x)==y) for x, y in zip(predictions, test_label))
    return correct/len(test_input)*100

# Layer Classes

In [None]:
class Layer(object):
  """Pass-through base layer defining the interface shared by all layers.

  Shapes handled by ``initializer`` and ``get_output_shape`` describe a single
  input, ignoring the batch dimension.
  """

  # Lookup table from activation name to activation class.
  activations = {
      "default": activation,
      "relu": relu,
      "sigmoid": sigmoid,
  }

  def __init__(self):
    """Nothing to set up for the base layer."""

  def get_output_shape(self, input_shape):
    """Return the output shape for ``input_shape``; the base layer keeps it unchanged."""
    return input_shape

  def initializer(self, input_shape):
    """Record the input shape, derive and record the output shape, and return the latter."""
    self.input_shape = input_shape
    output_shape = self.get_output_shape(input_shape)
    self.output_shape = output_shape
    return output_shape

  def set_input(self, a):
    """Store ``a`` on the layer as ``self.a``."""
    self.a = a

  def feedforward(self, a):
    """Store the input and return it unchanged."""
    self.set_input(a)
    return a

  def backprop(self, dCda, lr):
    """Return ``dCda`` unchanged; ``lr`` is unused by the base layer."""
    return dCda

In [None]:
class ConvLayer(Layer):
  """Convolution layer with a single 2-D filter shared by every channel.

  Inputs are 4-D arrays of shape (batch size, image x, image y, color channel).
  The same (kernel x, kernel y) filter is slid over each channel separately,
  so the number of channels is preserved.
  """

  def __init__(self, kernel_size, activation, stride = 1, zero_padding = 'valid'):   # still need to implement zero padding
    """kernel_size - a list of length 2 containing integers representing the x and y sizes of the filter
    activation - key into Layer.activations
    stride - step of the filter along both image axes
    zero_padding - stored but not used yet"""
    super().__init__()
    # Same lookup as Dense uses, so both layer types resolve names identically.
    self.activation = Layer.activations[activation]
    self.kernel_size = kernel_size
    self.stride = stride
    self.zero_padding = zero_padding

  def feedforward(self, a): # a - a numpy array of dimensionality 4: (batch size, image x, image y, color channel)
    """Convolve ``a`` with the filter, add the biases and apply the activation.

    Stores the input as ``self.a`` and the pre-activation as ``self.z`` for
    use in backprop. Returns an array of shape (batch size, conv x, conv y,
    color channel).
    """
    self.set_input(a)
    out_x, out_y, channels = self.get_output_shape(a.shape[1:])
    # Work in (x, y, batch, channel) layout so one spatial position can be
    # written for the whole batch at once.
    a_t = np.transpose(a, (1,2,0,3))
    output = np.zeros((out_x, out_y, a.shape[0], channels))
    for i in range(out_x):
      for j in range(out_y):
        x, y = (i*self.stride, j*self.stride)
        window = a_t[x:x+self.kernel_size[0], y:y+self.kernel_size[1]]
        # Weighted sum of the window over both kernel axes -> (batch, channel).
        output[i, j] = np.einsum("ijkl, ij -> kl", window, self.filter)
    self.z = np.transpose(output, (2,0,1,3)) + self.biases
    return self.activation.fn(self.z)

  def backprop(self, dconv, lr): # dconv is the matrix of partial derivatives of shape (batch size, conv x, conv y, color channel), conv x and conv y are the sizes of the convoluted output from the layer's feedforward.
    """Update the filter and biases by one gradient step and return the input gradient.

    Returns an array with the shape of the layer input
    (batch size, image x, image y, color channel).
    """
    delta = dconv * self.activation.prime(self.z) # delta is the derivative of the cost with respect to z.
    batch_size = delta.shape[0]
    dCdb = np.sum(delta, axis = 0) # bias gradient summed over the batch
    dCdF = np.zeros(self.filter.shape + (batch_size,)) # filter gradient per example: (filter x, filter y, batch size)
    delta, a = (np.transpose(delta, (1,2,0,3)), np.transpose(self.a, (1,2,0,3))) # reshape to dimensions (x, y, batch, color channel)
    dCda = np.zeros(a.shape)
    for i in range(delta.shape[0]):
      for j in range(delta.shape[1]):
        x, y = (i*self.stride, j*self.stride)
        # a is the original input to the layer; channels are summed because
        # they all share the one filter.
        dCdF += np.sum(delta[i][j] * a[x:x+self.kernel_size[0], y:y+self.kernel_size[1]], axis = 3)
        # Uses the filter from before this step's update (it is only
        # reassigned after the loop).
        dCda[x:x+self.kernel_size[0], y:y+self.kernel_size[1]] += np.einsum("ij, kl -> ijkl", self.filter, delta[i][j])
    # Average the gradients over the batch before stepping.
    self.filter = self.filter - (lr/batch_size)*np.sum(dCdF, axis = 2)
    self.biases = self.biases - (lr/batch_size)*dCdb
    return np.transpose(dCda, (2,0,1,3))

  # TODO (backprop): remove the intermediate values dCdb and dCdF and update
  # the parameters directly instead.

  def initializer(self, input_shape):
    """Record the shapes and draw random initial values for the filter and biases.

    input_shape - shape of a single input (image x, image y, color channel).
    Returns the output shape of a single example.
    """
    self.input_shape = input_shape
    self.output_shape = self.get_output_shape(input_shape)
    self.filter = np.random.standard_normal(self.kernel_size) * np.sqrt(2/(input_shape[0]*input_shape[1]))
    self.biases = np.random.standard_normal(self.output_shape)
    #self.biases = np.zeros(self.output_shape)
    return self.output_shape

  def get_output_shape(self, input_shape):
    """Return (conv x, conv y, color channel) for a single input of ``input_shape``."""
    spatial = tuple([int((i-k)/self.stride)+1 for i, k in zip(input_shape[:2], self.kernel_size)])
    # Take the channel count from the argument rather than self.input_shape,
    # so the result is right even before initializer() has been called.
    return spatial + (input_shape[-1],)

In [None]:
class Flatten(Layer):
  """Collapses a 4-D batch into column vectors, summing over the last axis."""

  def __init__(self):
    super().__init__()

  def get_output_shape(self, input_shape):
    """Return (product of all but the last input dimension, 1)."""
    flat_size = np.prod(input_shape[:-1]).item()
    return (flat_size, 1)

  def feedforward(self, a):
    """Store ``a``, sum it over axis 3 and reshape to (batch size, -1, 1)."""
    self.set_input(a)
    channel_sum = a.sum(axis = 3)
    return np.reshape(channel_sum, (a.shape[0], -1, 1))

  def backprop(self, dreshape, lr):
    """Reshape the gradient to the input's leading dimensions and repeat it along axis 3.

    ``lr`` is unused: this layer has nothing to update.
    """
    batch_size = dreshape.shape[0]
    target_shape = (batch_size,) + self.input_shape[:-1] + (1,)
    restored = dreshape.reshape(target_shape)
    return np.repeat(restored, self.input_shape[-1], axis = 3)

In [None]:
class Dense(Layer):
  """Fully connected layer operating on batches of column vectors."""

  # Lookup table from initializer name to weight-initializer function.
  w_inits = {
      "default": default_weight_initializer,
      "he": he_weight_initializer,
      "xavier": xavier_weight_initializer,
  }

  def __init__(self, shape, activation = "sigmoid", weight_initialization = "he"): # shape is an integer specifying the # of output neurons.
    super().__init__()
    self.shape = shape
    self.activation = Layer.activations[activation]
    self.w_init = Dense.w_inits[weight_initialization]

  def get_output_shape(self, input_shape):
    """Return (number of output neurons, 1); ``input_shape`` is not used."""
    return (self.shape, 1)

  def initializer(self, input_shape):
    """Record the shapes, create the weights and biases, and return the output shape."""
    self.input_shape = input_shape
    out_shape = self.get_output_shape(input_shape)
    self.output_shape = out_shape
    self.weights = self.w_init(input_shape[0], out_shape[0])
    self.biases = np.random.standard_normal(out_shape)
    return out_shape

  def feedforward(self, a):
    """Store the input, compute ``z = weights @ a + biases`` and return the activation of z."""
    self.set_input(a)
    self.z = self.weights @ a + self.biases
    return self.activation.fn(self.z)

  def backprop(self, dCda, lr):
    """Take one gradient step on the weights and biases and return the input gradient."""
    delta = dCda * self.activation.prime(self.z)
    # Computed with the weights from before this step's update.
    input_grad = self.weights.T @ delta
    bias_grad = np.sum(delta, axis = 0)
    weight_grad = np.sum(delta @ np.transpose(self.a, (0,2,1)), axis = 0)
    self.biases = self.biases - (lr/delta.shape[0])*bias_grad
    self.weights = self.weights - (lr/self.a.shape[0])*weight_grad
    return input_grad

In [None]:
class Output(Dense):
  """Final dense layer; it turns the network output and the labels into the output error."""

  # Lookup table from cost name to cost class.
  costs = {
      "cross entropy":CrossEntropy,
      "quadratic":Quadratic
  }

  def __init__(self, shape, activation = "sigmoid", weight_initialization = "he", cost = "cross entropy"):
    """shape - number of output neurons.
    activation - key into Layer.activations.
    weight_initialization - key into Dense.w_inits.
    cost - key into Output.costs.

    The default activation is "sigmoid": the previous default, "softmax", is
    not a key of Layer.activations, so relying on it always raised KeyError.
    """
    super().__init__(shape = shape, activation = activation, weight_initialization = weight_initialization)
    self.cost = Output.costs[cost]

  def backprop(self, dCda, lr): #dCda is actually just the output a of the feedforward.
    """Take one gradient step on the weights and biases and return the input gradient.

    Requires set_labels() to have been called with the labels of this batch.
    """
    # NOTE(review): the cost's delta is used as the error with respect to z
    # without multiplying by the activation derivative - confirm this is
    # intended for every supported cost/activation pairing.
    delta = self.cost.delta(dCda, self.train_label)
    # Computed with the weights from before this step's update.
    input_grad = np.matmul(self.weights.transpose(), delta)
    self.biases = self.biases - (lr/delta.shape[0])*np.sum(delta, axis = 0)
    self.weights = self.weights - (lr/self.a.shape[0])*np.sum(np.matmul(delta, self.a.transpose([0,2,1])), axis = 0)
    return input_grad

  def set_labels(self, train_label):
    """Store the labels that the next backprop call compares the output against."""
    self.train_label = train_label


# Import and Process Data

In [None]:
def to_one_hot(data, max):
  """Convert class indices into one-hot column vectors.

  data - iterable of integer class indices.
  max  - length of every one-hot vector (the number of classes).

  Returns a float array of shape (len(data), max, 1) with a 1 at each
  example's index. Empty input gives an array of shape (0, max, 1). An index
  outside the valid range raises IndexError.
  """
  if not isinstance(data, np.ndarray):
    data = list(data) # accept any iterable, including generators
  indices = np.asarray(data, dtype = np.intp)
  output = np.zeros((len(indices), max, 1))
  # One vectorized assignment: row n gets its 1 at position indices[n].
  output[np.arange(len(indices)), indices, 0] = 1
  return output

import tensorflow as tf
from tensorflow.keras.datasets import mnist

# Load MNIST through Keras.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
#x_train = np.array([x.reshape(-1, 1)/255 for x in x_train])
#x_test = np.array([x.reshape(-1, 1)/255 for x in x_test])

# Divide the pixel values by 255 and append a trailing channel axis of size 1.
x_train = np.expand_dims(x_train/255, 3)
x_test = np.expand_dims(x_test/255, 3)

# The first 50000 training examples are used for training, the next 10000 for validation.
x_train, x_validate = (x_train[0:50000], x_train[50000:60000])
y_train, y_validate = (y_train[0:50000], y_train[50000:60000])

# Only the training labels become one-hot vectors; the test and validation
# labels stay as they were loaded.
y_train = to_one_hot(y_train, 10)

# Pair every input with its label.
train_batch = list(zip(x_train, y_train))
test_batch = list(zip(x_test, y_test))
validate_batch = list(zip(x_validate, y_validate))

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


# Testing

In [None]:
# Build the model for 28x28x1 inputs: one 2x2 convolution with stride 2,
# a flatten step, a 60-neuron dense layer and a 10-neuron output layer.
network = CNN((28,28,1))
network.add(ConvLayer([2,2], "sigmoid", stride = 2))
network.add(Flatten())
network.add(Dense(60, activation = 'sigmoid', weight_initialization = 'he'))
network.add(Output(10, activation = 'sigmoid', weight_initialization = 'he', cost = 'cross entropy'))

# Disabled (kept as a bare string, so it never runs): hand-set shapes, filter,
# weights and biases for a tiny 5x5x1 network.
"""
network.model[0].input_shape = (5,5,1)
network.model[0].output_shape = (2,2,1)
network.model[0].filter = np.array([[1,0],[1,0]])
network.model[0].biases = np.array([[[0.2], [0.3]], [[0.8], [0.4]]])

network.model[1].input_shape = (2,2,1)
network.model[1].output_shape = (4,1)

network.model[2].input_shape = (4,1)
network.model[2].output_shape = (5,1)
network.model[2].weights = np.array([[0.8,0.4,0.3,0.5],[0.1,0.7,0.2,0.9],[0.1,0.2,0.1,0.3],[0.2,0.3,0.7,0.6],[0.5,0.4,0.8,0.6]])
network.model[2].biases = np.array([[0.2], [0.1], [0.3], [0.3], [0.5]])

network.model[3].input_shape = (5,1)
network.model[3].output_shape = (3,1)
network.model[3].weights = np.array([[0.1,0.2,0.3,0.4,0.5],[0.5,0.4,0.3,0.2,0.1],[0.2,0.3,0.5,0.4,0.1]])
network.model[3].biases = np.array([[0.1],[0.2],[0.3]])
"""
# zip(*pairs) regroups the (input, label) pairs into (inputs, labels). The
# result is a one-shot iterator, so it can only be handed to SGD once.
validation = zip(*validate_batch)
# Train with mini-batches of 32 for 15 epochs at learning rate 0.6, printing
# the validation accuracy after every epoch.
network.SGD(train_batch, 32, 15, 0.6, validation)

# Disabled (bare string): manual training loop that calls feedforward and
# backprop directly on mini-batches of 30.
"""
network.initialize()

random.shuffle(train_batch)
mini_batches = [zip(*train_batch[i:i+30]) for i in range(0, len(train_batch), 30)]

count = 0
for mini_batch in mini_batches:
  train_input, train_label = mini_batch
  network.model[-1].set_labels(np.array(train_label))
  ff = network.feedforward(np.array(train_input))
  network.backprop(ff, 10)
  count += 1
  print(count)
"""

# Disabled (bare string): layer-by-layer forward and backward pass on a single
# 5x5x1 example, printing the intermediate shapes and the updated parameters.
"""
a = np.array([[[[0],[1],[2],[3],[2]],[[4],[0],[1],[2],[1]],[[1],[3],[2],[3],[0]],[[3],[1],[0],[2],[3]],[[2],[2],[3],[0],[4]]]])
print(a.shape)
a = network.model[0].feedforward(a)
print(a.shape)
a = network.model[1].feedforward(a)
print(a.shape)
a = network.model[2].feedforward(a)
print(a.shape)
a = network.model[3].feedforward(a)
network.model[3].set_labels(np.array([[[0], [1], [0]]]))
a = network.model[3].backprop(a, 1)
a = network.model[2].backprop(a, 1)
a = network.model[1].backprop(a, 1)
a = network.model[0].backprop(a, 1)
print(a)
print(network.model[0].filter)
print(network.model[0].biases)
"""
# Prints an empty line as the cell's final statement.
print("")

Epoch 1 complete
Accuracy (%): 87.92
Epoch 2 complete
Accuracy (%): 92.49000000000001
Epoch 3 complete
Accuracy (%): 94.17999999999999
Epoch 4 complete
Accuracy (%): 94.39
Epoch 5 complete
Accuracy (%): 95.84
Epoch 6 complete
Accuracy (%): 96.22
Epoch 7 complete
Accuracy (%): 96.16
Epoch 8 complete
Accuracy (%): 96.78999999999999
Epoch 9 complete
Accuracy (%): 96.75
Epoch 10 complete
Accuracy (%): 96.74000000000001
Epoch 11 complete
Accuracy (%): 96.71
Epoch 12 complete
Accuracy (%): 96.88
Epoch 13 complete
Accuracy (%): 96.97
Epoch 14 complete
Accuracy (%): 96.98
Epoch 15 complete
Accuracy (%): 97.0

