<a href="https://colab.research.google.com/github/tawaqalt/arbritrary/blob/master/Tawakalitu_Yusuf_SimpleCovo1d.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importation of Libraries

In [None]:
#importing relevant libraries
import numpy as np
import math
from keras.datasets import mnist
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split


In [None]:
class GetMiniBatch:
    """Iterate over a shuffled copy of (X, y) in fixed-size mini-batches.

    Parameters
    ----------
    X, y : arrays indexable by an integer index array along axis 0.
    batch_size : number of samples per batch (the last batch may be shorter).
    seed : value passed to ``np.random.seed`` before shuffling.
    """

    def __init__(self, X, y, batch_size=20, seed=None):
        n_samples = X.shape[0]
        self.batch_size = batch_size
        # Seeds the *global* NumPy RNG, then draws one shuffle order.
        np.random.seed(seed)
        order = np.random.permutation(n_samples)
        self._X, self._y = X[order], y[order]
        # Number of batches, rounding up so a trailing partial batch is kept.
        self._stop = np.ceil(n_samples / self.batch_size).astype(int)

    def _batch(self, index):
        """Return the (X, y) slice belonging to batch number ``index``."""
        start = index * self.batch_size
        window = slice(start, start + self.batch_size)
        return self._X[window], self._y[window]

    def __len__(self):
        return self._stop

    def __getitem__(self, item):
        return self._batch(item)

    def __iter__(self):
        # Restart from the first batch every time iteration begins.
        self._counter = 0
        return self

    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        current = self._counter
        self._counter += 1
        return self._batch(current)

In [None]:
class FullyConnectedLayer:
    """Dense layer: ``activation(X @ W + B)``.

    Parameters
    ----------
    n_nodes1 : number of input features.
    n_nodes2 : number of output features.
    initializer : object exposing ``W(n_nodes1, n_nodes2)`` and ``B(n_nodes2)``.
    optimizer : object exposing ``update(layer)``; called at the end of
        every ``backward``.
    activation : object exposing ``forward(A)`` and ``backward(dZ)``.
    """

    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer, activation):
        self.n_nodes1, self.n_nodes2 = n_nodes1, n_nodes2
        self.W = initializer.W(self.n_nodes1, self.n_nodes2)
        self.B = initializer.B(self.n_nodes2)
        self.activation = activation
        self.optimizer = optimizer
        # Per-layer accumulators; not read anywhere inside this class.
        self.HW = 0
        self.HB = 0

    def forward(self, X):
        """Cache the input and pre-activation, return the activated output."""
        self.X = X
        self.A = np.dot(X, self.W) + self.B
        return self.activation.forward(self.A)

    def backward(self, dZ):
        """Compute dW/dB (batch-averaged), update parameters, return dL/dX."""
        grad_pre = self.activation.backward(dZ)
        batch = len(self.X)
        self.dB = np.mean(grad_pre, axis=0)
        self.dW = np.dot(self.X.T, grad_pre) / batch
        # Input gradient must use the weights from *before* the update below.
        grad_in = np.dot(grad_pre, self.W.T)
        self.optimizer.update(self)
        return grad_in

In [None]:
class SimpleInitializer:
    """Gaussian initializer: weights ~ N(0, sigma^2), biases all zero."""

    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        """Return an (n_nodes1, n_nodes2) weight matrix scaled by ``sigma``."""
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)

    def B(self, n_nodes2):
        """Return a zero bias vector of length ``n_nodes2``."""
        bias = np.zeros(n_nodes2)
        return bias

In [None]:
class HeInitializer:
    """He initializer: weights ~ N(0, 2 / n_nodes1), biases all zero."""

    def __init__(self):
        pass

    def W(self, n_nodes1, n_nodes2):
        """Return an (n_nodes1, n_nodes2) matrix with std sqrt(2 / n_nodes1)."""
        noise = np.random.randn(n_nodes1, n_nodes2)
        return noise * np.sqrt(2 / n_nodes1)

    def B(self, n_nodes2):
        """Return a zero bias vector of length ``n_nodes2``."""
        bias = np.zeros(n_nodes2)
        return bias

In [None]:
class Adagrad:
    """AdaGrad optimizer: per-parameter step sizes from accumulated squared gradients.

    Parameters
    ----------
    lr : base learning rate.

    Attributes
    ----------
    HW, HB : running sums of squared weight / bias gradients. They start at
        the scalar 0 and take the gradient's shape after the first update.
    """

    def __init__(self, lr):
        self.lr = lr
        self.HW = 0
        self.HB = 0

    def update(self, layer):
        """Update ``layer.W`` and ``layer.B`` in place from ``layer.dW``/``layer.dB``.

        Returns the same ``layer`` object.
        """
        # Use the accumulators created in __init__ (HW/HB); the previous
        # code read undefined hW/hB attributes and raised AttributeError.
        self.HW += layer.dW * layer.dW
        self.HB += layer.dB * layer.dB

        # 1e-7 guards against division by zero for never-updated parameters.
        layer.W -= self.lr * layer.dW / (np.sqrt(self.HW) + 1e-7)
        layer.B -= self.lr * layer.dB / (np.sqrt(self.HB) + 1e-7)
        return layer

In [None]:
class SGD:
    """Plain stochastic gradient descent.

    Parameters
    ----------
    lr : learning rate.
    """

    def __init__(self, lr):
        self.lr = lr

    def update(self, layer):
        """Step ``layer.W`` and ``layer.B`` in place against ``layer.dW``/``layer.dB``.

        Returns the same ``layer`` object.
        """
        # The parameters live on the layer; the previous code updated bare
        # locals ``W``/``B`` and raised UnboundLocalError.
        layer.W -= self.lr * layer.dW
        layer.B -= self.lr * layer.dB
        return layer

In [None]:
class Sigmoid:
    """Logistic sigmoid activation."""

    def forward(self, A):
        """Return ``1 / (1 + exp(-A))`` and cache input and output for backward."""
        self.A = A
        self.Z = 1 / (1 + np.exp(-self.A))
        return self.Z

    def backward(self, dout):
        """Return dL/dA given the upstream gradient ``dout`` (dL/dZ).

        Uses sigmoid'(A) = Z * (1 - Z) with the output cached by ``forward``.
        The previous code applied ``dout * (1 - dout)``, which treated the
        incoming gradient as if it were the activation itself.
        """
        dA = dout * self.Z * (1 - self.Z)
        return dA

class Tanh:
    """Hyperbolic-tangent activation."""

    def forward(self, A):
        """Cache the input and return ``tanh(A)``."""
        self.A = A
        return np.tanh(self.A)

    def backward(self, dZ):
        """Scale the upstream gradient by ``1 - tanh(A)^2`` of the cached input."""
        slope = 1 - np.tanh(self.A) ** 2
        return dZ * slope

class Softmax:
    """Row-wise softmax, with a combined softmax + cross-entropy backward."""

    def forward(self, A):
        """Return softmax over axis 1; the row max is subtracted for stability."""
        self.A = A
        exp_A = np.exp(A - np.max(A, axis=1, keepdims=True))
        Z = exp_A / np.sum(exp_A, axis=1, keepdims=True)
        return Z

    def backward(self, Z, y=None):
        """Backward pass.

        Two call forms are supported:

        * ``backward(Z, y)`` - ``Z`` is the softmax output and ``y`` the
          one-hot targets. Returns ``(dA, loss)`` where ``dA = Z - y`` and
          ``loss`` is the mean cross-entropy over the ``len(y)`` samples.
        * ``backward(dA)`` - the generic single-argument form that layers use
          for every activation. The argument is taken to already be the
          gradient w.r.t. the pre-activation (e.g. ``Z - y`` computed by the
          caller) and is returned unchanged. Without this form a layer that
          calls ``activation.backward(dZ)`` raises TypeError.
        """
        if y is None:
            return Z
        m = len(y)
        dA = Z - y
        # 1e-10 keeps log() finite when a predicted probability is exactly 0.
        loss = -np.sum(y * np.log(Z + 1e-10)) / m
        return dA, loss

In [None]:
class ReLU:
    """Rectified linear unit activation (no constructor arguments)."""

    def forward(self, A):
        """Cache the input and return ``max(0, A)`` element-wise."""
        self.A = A
        return np.maximum(0, self.A)

    def backward(self, dZ):
        """Pass the upstream gradient only where the cached input was positive."""
        positive = self.A > 0
        return dZ * positive

In [None]:
class XavierInitializer:
    """Xavier-style initializer with an extra ``sigma`` multiplier."""

    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        """Return Gaussian weights scaled by ``sigma * sqrt(1 / n_nodes1)``."""
        noise = np.random.randn(n_nodes1, n_nodes2)
        return noise * self.sigma * np.sqrt(1 / n_nodes1)

    def B(self, n_nodes2):
        """Return a (1, n_nodes2) Gaussian bias row scaled by ``sigma``."""
        return self.sigma * np.random.randn(1, n_nodes2)

class HeInitializer:
    """He-style initializer with an extra ``sigma`` multiplier.

    Defined after an earlier no-argument ``HeInitializer``; being later in
    the file, this is the definition in effect for subsequent cells.
    """

    def __init__(self, sigma):
        self.sigma = sigma

    def W(self, n_nodes1, n_nodes2):
        """Return Gaussian weights scaled by ``sigma * sqrt(2 / n_nodes1)``."""
        noise = np.random.randn(n_nodes1, n_nodes2)
        return noise * self.sigma * np.sqrt(2 / n_nodes1)

    def B(self, n_nodes2):
        """Return a (1, n_nodes2) Gaussian bias row scaled by ``sigma``."""
        return self.sigma * np.random.randn(1, n_nodes2)

In [None]:
# Load data: MNIST via keras (downloads the archive on first use, per the
# recorded cell output). Returns (train, test) pairs of (images, labels).
(X_train, y_train), (X_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


# [Problem 1] Creating a one-dimensional convolutional layer class that limits the number of channels to one

In [None]:
#1D Convolutional layer
class SimpleConv1d:
    """1-D convolution layer over inputs of shape (N, in_channel, n_feature).

    Parameters
    ----------
    out_channel, in_channel, filter_size : shape of the filter bank
        ``W`` = (out_channel, in_channel, filter_size).
    stride : step between consecutive filter positions.
    initializer : object exposing ``W(out_channel, in_channel, filter_size)``
        and ``B(out_channel)``.
    optimizer : object exposing ``update(layer)``; called at the end of
        every ``backward``.
    activation : object exposing ``forward(A)`` and ``backward(dZ)``.
    padding : number of zeros added on each side of the feature axis.
    """

    def __init__(self, out_channel, in_channel, filter_size, stride, initializer, optimizer, activation, padding):
        self.out_channel = out_channel
        self.in_channel = in_channel
        self.filter_size = filter_size
        self.stride = stride
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation
        self.padding = padding

        # Initialize weights and biases
        self.W = self.initializer.W(out_channel, in_channel, filter_size)
        self.B = self.initializer.B(out_channel)

    def output_shape(self, n_feature, filter_size, padding=0, stride=1):
        """Return the number of output positions along the feature axis."""
        return int((n_feature + 2*padding - filter_size) / stride + 1)

    def forward(self, X):
        """Convolve ``X`` (N, in_channel, n_feature) and apply the activation.

        Returns an array of shape (N, out_channel, OUT). The layer's own
        ``stride`` and ``padding`` are honoured (they were previously
        ignored in favour of hard-coded 1 and 0).
        """
        self.X = X
        N, INC, n_feature = X.shape
        OCH, _, FS = self.W.shape
        pad, step = self.padding, self.stride
        OUT = self.output_shape(n_feature, FS, pad, step)

        # Zero-pad only the feature axis; kept for the backward pass.
        self._X_pad = np.pad(X, ((0, 0), (0, 0), (pad, pad)), mode="constant")
        self.size = N, INC, OCH, OUT

        A = np.zeros([N, OCH, OUT])
        for m in range(OUT):
            lo = m * step
            window = self._X_pad[:, :, lo:lo + FS]            # (N, INC, FS)
            # Contract over (in_channel, filter position) -> (N, OCH).
            A[:, :, m] = np.tensordot(window, self.W, axes=([1, 2], [1, 2]))
        A += self.B[:, None]

        return self.activation.forward(A)

    def backward(self, dA):
        """Back-propagate ``dA`` (N, out_channel, OUT); return dL/dX shaped like X.

        Sets ``self.dB`` (batch mean of the per-sample sums over positions)
        and ``self.dW`` (summed over batch and positions), then lets the
        optimizer update the layer exactly once.
        """
        # Undo the activation applied in forward before touching the filters.
        dA = self.activation.backward(dA)

        N, INC, OCH, OUT = self.size
        FS = self.W.shape[2]
        pad, step = self.padding, self.stride
        n_feature = self.X.shape[2]

        #Bias
        self.dB = np.mean(np.sum(dA, axis=2), axis=0)
        #weight gradient and gradient flowing to the previous layer
        self.dW = np.zeros(self.W.shape)
        dX_pad = np.zeros(self._X_pad.shape)

        for m in range(OUT):
            lo = m * step
            window = self._X_pad[:, :, lo:lo + FS]            # (N, INC, FS)
            grad = dA[:, :, m]                                # (N, OCH)
            # Sum over the batch -> (OCH, INC, FS).
            self.dW += np.tensordot(grad, window, axes=([0], [0]))
            # Sum over output channels -> (N, INC, FS).
            dX_pad[:, :, lo:lo + FS] += np.tensordot(grad, self.W, axes=([1], [0]))

        # Drop the gradient that landed on the zero padding.
        dZ = dX_pad[:, :, pad:pad + n_feature]

        # Single parameter update, performed after dZ was computed from the
        # pre-update weights.
        self.optimizer.update(self)

        return dZ

In [None]:
#1D convolutional Simple Initializer
class SimpleInitializerConv1d:
    """Gaussian initializer for 1-D convolution filters; biases start at zero."""

    def __init__(self, sigma=0.01):
        self.sigma = sigma

    def W(self, out_channel, in_channel, filter_size):
        """Return an (out_channel, in_channel, filter_size) filter bank scaled by ``sigma``."""
        shape = (out_channel, in_channel, filter_size)
        noise = np.random.randn(*shape)
        return self.sigma * noise

    def B(self, out_channel):
        """Return a zero bias vector with one entry per output channel."""
        bias = np.zeros(out_channel)
        return bias

In [None]:
#scratch CNN

class Scratch1dCNNClassifier():
    """Classifier made of 1-D convolution layers followed by dense layers.

    Parameters
    ----------
    NN : integer-indexable collection (e.g. ``{0: layer, 1: layer}``) of
        dense layers, each exposing ``forward`` and ``backward``.
    CNN : integer-indexable collection of convolution layers with the same
        interface; they run before ``NN``.
    n_epoch : number of passes over the training data.
    n_batch : mini-batch size.
    verbose : stored on the instance; not used by the methods below.

    Attributes
    ----------
    log_loss, log_acc : per-epoch mean batch loss and training accuracy.
    """

    def __init__(self, NN, CNN, n_epoch = 5, n_batch = 1, verbose = False):
        #parameters
        self.NN = NN
        self.CNN = CNN
        self.n_epoch = n_epoch
        self.n_batch = n_batch
        self.verbose = verbose
        self.log_loss = np.zeros(self.n_epoch)
        self.log_acc = np.zeros(self.n_epoch)

    def loss_function(self, y, yt):
        """Return ``-mean(yt * log(y + 1e-7))`` over every element."""
        delta = 1e-7  # keeps log() finite for zero probabilities
        return -np.mean(yt*np.log(y+delta))

    def accuracy(self, Z, Y):
        """Return the accuracy of predicted labels ``Z`` against true labels ``Y``."""
        return accuracy_score(Y,Z)

    def fit(self, X, y , X_val=False, y_val=False):
        """Train on ``X`` (n_samples, n_features) with one-hot targets ``y``.

        ``X_val`` / ``y_val`` are accepted but not used.
        """
        for epoch in range(self.n_epoch):
            #mini-batch processing
            get_mini_batch = GetMiniBatch(X, y, batch_size=self.n_batch)
            self.loss = 0
            for mini_X_train, mini_y_train in get_mini_batch:
                # The final batch can be shorter than n_batch, so size
                # everything from the batch actually received.
                batch_size = mini_X_train.shape[0]

                #forward propagation: add a channel axis -> (batch, 1, features)
                forward_data = mini_X_train.reshape(batch_size, 1, -1)
                #Conv
                for layer in range(len(self.CNN)):
                    forward_data = self.CNN[layer].forward(forward_data)

                # Flatten the conv output for the dense layers; remember its
                # shape to restore it on the way back.
                record_shape = forward_data.shape
                forward_data = forward_data.reshape(record_shape[0], -1)

                for layer in range(len(self.NN)):
                    forward_data = self.NN[layer].forward(forward_data)

                #Predicted value (only after *all* dense layers have run)
                Z = forward_data

                #Back propagation through the dense layers, last to first
                backward_data = (Z - mini_y_train)/batch_size
                for layer in range(len(self.NN)-1, -1, -1):
                    backward_data = self.NN[layer].backward(backward_data)

                # ...then through the conv layers, so their filters are
                # trained as well.
                backward_data = backward_data.reshape(record_shape)
                for layer in range(len(self.CNN)-1, -1, -1):
                    backward_data = self.CNN[layer].backward(backward_data)

                #Log loss
                self.loss += self.loss_function(Z, mini_y_train)

            self.log_loss[epoch] = self.loss/len(get_mini_batch)
            self.log_acc[epoch] = self.accuracy(self.predict(X), np.argmax(y, axis=1))

    def predict(self, X):
        """Return the predicted class index for each row of ``X`` (n_samples, n_features)."""
        pred_data = X[:, np.newaxis, :]

        #Conv
        for layer in range(len(self.CNN)):
            pred_data = self.CNN[layer].forward(pred_data)

        pred_data = pred_data.reshape(pred_data.shape[0], -1)

        for layer in range(len(self.NN)):
            pred_data = self.NN[layer].forward(pred_data)

        # Take the argmax only after every dense layer has been applied.
        return np.argmax(pred_data, axis=1)

# [Problem 2] Output size calculation after one-dimensional convolution

In [None]:
def output_size_calculation(n_in, F, P=0, S=1):
    """Return the output length of a 1-D convolution.

    n_in: input length, F: filter size, P: padding per side, S: stride.
    Computes ``(n_in + 2P - F) / S + 1`` truncated to an int.
    """
    covered = n_in + 2 * P - F
    return int(covered / S + 1)

In [None]:
# Example: input length 3, filter size 2, default padding 0 and stride 1 -> 2.
output_size_calculation(3, 2)

2

# [Problem 3] Experiment of one-dimensional convolutional layer with small array

In [None]:
# Toy single-channel example: input x, filter w, scalar bias b.
x = np.array([1,2,3,4])
w = np.array([3, 5, 7])
b = np.array([1])

# Upstream gradient w.r.t. the two convolution outputs.
delta_a = np.array([10, 20])

In [None]:
# Forward pass: slide w across x and add the bias at every position.
# The output length is derived from the arrays rather than hard-coded.
a = np.zeros(output_size_calculation(len(x), len(w), 0, 1))
for i in range(len(a)):
    x_tmp = x[i:i+len(w)]
    # b is a 1-element array; index it so a scalar (not an array) is stored,
    # which avoids NumPy's deprecated array-to-scalar conversion.
    a[i] = np.dot(x_tmp, w) + b[0]

print(a)

[35. 50.]


  a[i] = np.dot(x_tmp, w) + b


In [None]:
#backward bias: the bias touches every output position, so its gradient is
# the sum of the upstream gradient (stored here under the name delta_f).
delta_f = np.sum(delta_a)
print(delta_f)

30


In [None]:
#backward filter: accumulate, for each output position i, the input window
# that produced it weighted by that position's upstream gradient.
delta_w = np.zeros(w.shape)
for i in range(len(delta_a)):
    x_tmp = x[i:i+len(w)]
    delta_w += delta_a[i]*x_tmp
print(delta_w)

[ 50.  80. 110.]


In [None]:
#backward error to convey the next layer: each output position i scatters
# w * delta_a[i] back onto the input window it was computed from.
delta_x = np.zeros(x.shape)
for i in range(len(delta_a)):
    delta_x[i:i+len(w)] += w*delta_a[i]
print(delta_x)

[ 30. 110. 170. 140.]


In [None]:
#Implementation: same forward pass, gathering each window by index array.
x = np.array([1, 2, 3, 4])
w = np.array([3, 5, 7])
b = np.array([1])
#creating indexes (one index array per output position)
indexes0 = np.array([0, 1, 2]).astype(int)
indexes1 = np.array([1, 2, 3]).astype(int)
a = np.zeros(2) # one slot per output position
# Window sum plus the bias; the bias was previously left out, which gave
# [34, 49] instead of the loop-based forward result [35, 50].
a[0] = np.sum(x[indexes0]*w) + b[0]
print(a[0])
a[1] = np.sum(x[indexes1]*w) + b[0]
print(a[1])
print(a)


34.0
49.0
[34. 49.]


# [Problem 4] Creating a one-dimensional convolutional layer class that does not limit the number of channels

In [None]:
# Multi-channel toy example.
# x: (input channels, features) = (2, 4)
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]])
# w: (output channels, input channels, filter size) = (3, 2, 3)
w = np.array([[[1, 1, 2], [2, 1, 1]], [[2,1,1], [1,1,1]], [[1,1,1],[1,1,1]]])
# b: one bias per output channel
b = np.array([1,2,3])

print('x.shape', x.shape)
print('w.shape', w.shape)
print('b.shape', b.shape)

x.shape (2, 4)
w.shape (3, 2, 3)
b.shape (3,)


In [None]:
#forward
a = np.zeros([3, output_size_calculation(4,3,0,1)])

for och in range(a.shape[0]):
  for m in range(a.shape[1]):
    for ich in range(x.shape[0]): # Iterate over the valid range of input channels
      a[och, m] += np.sum(x[ich,m:m+w.shape[2]]*w[och,ich,:])

a += b[:, None]
print(a)

[[21. 29.]
 [18. 25.]
 [18. 24.]]


In [None]:
#backward
delta_a = np.array([[9, 11], [32, 35], [52, 56]])

print('delta_a:\n', delta_a)
print('a.shape:\n', a.shape)

delta_a:
 [[ 9 11]
 [32 35]
 [52 56]]
a.shape:
 (3, 2)


In [None]:
#backward bias
delta_b = np.sum(delta_a, axis=1)
print('delta_b\n', delta_b)

delta_b
 [ 20  67 108]


In [None]:
#backward filter
delta_w = np.zeros([3,2,3])
for och in range(delta_w.shape[0]):
  for ich in range(delta_w.shape[1]):
    for fs in range(delta_w.shape[2]):
      for m in range(2):
        delta_w[och,ich,fs] += (x[ich,fs+m]*delta_a[och,m])
print('delta_w')
print(delta_w)

delta_w
[[[ 31.  51.  71.]
  [ 51.  71.  91.]]

 [[102. 169. 236.]
  [169. 236. 303.]]

 [[164. 272. 380.]
  [272. 380. 488.]]]


In [None]:
#backward error to convey next layer
delta_x = np.zeros([2,4])
for och in range(w.shape[0]):
  for ich in range(w.shape[1]):
    for fs in range(w.shape[2]):
      for m in range(2):

        delta_x[ich,fs+m] += w[och,ich,fs]*delta_a[och,m]
print('delta_x')
print(delta_x)

delta_x
[[125. 230. 204. 113.]
 [102. 206. 195. 102.]]


# [Problem 6] (Advanced task) Response to mini batch

In [None]:
#Mini_batch support
x = np.array([[1, 2, 3, 4], [2, 3, 4, 5]]*2).reshape(2,2,4)
w = np.array([[[1, 1, 2], [2, 1, 1]], [[2,1,1], [1,1,1]], [[1,1,1],[1,1,1]]])
b = np.array([1,2,3])

print('x.shape', x.shape)
print('w.shape', w.shape)
print('b.shape', b.shape)

x.shape (2, 2, 4)
w.shape (3, 2, 3)
b.shape (3,)


In [None]:
#backward
delta_a = np.array([[9, 11], [32, 35], [52, 56]]*2).reshape(2,3,2)

print('delta_a:\n', delta_a)
print('a.shape:\n', a.shape)

delta_a:
 [[[ 9 11]
  [32 35]
  [52 56]]

 [[ 9 11]
  [32 35]
  [52 56]]]
a.shape:
 (3, 2)


In [None]:
#Size
N, INC, Feature = x.shape
OCH, INC, FS = w.shape
A = output_size_calculation(Feature, FS, 0, 1)

In [None]:
#Forward
a = np.zeros([N, OCH, A])

for n in range(N):
  for och in range(OCH):
    for ich in range(INC):
      for m in range(A):
        a[n, och, m] += np.sum(x[n, ich,m:m+FS] * w[och, ich,:])

a += b[:, None]
print(a.shape)
print(a)

(2, 3, 2)
[[[21. 29.]
  [18. 25.]
  [18. 24.]]

 [[21. 29.]
  [18. 25.]
  [18. 24.]]]


In [None]:
#backward bais
delta_b = np.mean(np.sum(delta_a, axis=2), axis=0)
print('delta_b\n', delta_b)

delta_b
 [ 20.  67. 108.]


In [None]:
#backward filter
delta_w = np.zeros([3,2,3])
for n in range(N):
  for och in range(OCH):
    for ich in range(INC):
      for fs in range(FS):
        for m in range(A):
          delta_w[och,ich,fs] += x[n, ich, fs+m] * delta_a[n, och, m]

print('delta_w:\n', delta_w)

delta_w:
 [[[ 62. 102. 142.]
  [102. 142. 182.]]

 [[204. 338. 472.]
  [338. 472. 606.]]

 [[328. 544. 760.]
  [544. 760. 976.]]]


In [None]:
#backward error to convey next layer
delta_x = np.zeros(x.shape)

for n in range(N):
  for och in range(OCH):
    for ich in range(INC):
      for fs in range(FS):
        for m in range(A):
          delta_x[n,ich,fs+m] += w[och,ich,fs]*delta_a[n, och, m]

print('delta_x:\n', delta_x)

delta_x:
 [[[125. 230. 204. 113.]
  [102. 206. 195. 102.]]

 [[125. 230. 204. 113.]
  [102. 206. 195. 102.]]]


# [Problem 8] Learning and estimation

# Importing MNIST data from Keras

In [None]:
#importing data
from keras.datasets import mnist
(X, y), (X_test, y_test) = mnist.load_data()

In [None]:
#checking the data
print(X.shape)
print(y.shape)
print(X[1].dtype)

(60000, 28, 28)
(60000,)
uint8


In [None]:
#Smoothing
X_flat = X.reshape(-1, 784)
X_test_flat = X_test.reshape(-1, 784)
print(X_flat.shape)
print(X_test_flat.shape)

(60000, 784)
(10000, 784)


In [None]:
#Conversion to the float datatype
X_flat = X_flat.astype(float)
X_test_flat = X_test_flat.astype(float)
X_flat /= 255
X_test_flat /= 255

print(X_flat.max())
print(X_flat.min())

1.0
0.0


In [None]:
#One hot encoding
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
y_enc = enc.fit_transform(y[:, np.newaxis])
y_test_enc = enc.transform(y_test[:, np.newaxis])
print(y.shape)
print(y_enc.shape)
print(y_test_enc.shape)

(60000,)
(60000, 10)
(10000, 10)


In [None]:
#splitting into training and validation data
X_train, X_vall, y_train, y_vall = train_test_split(X_flat, y_enc, test_size=0.2)
print(X_train.shape)
print(X_vall.shape)

(48000, 784)
(12000, 784)


In [None]:
#Learning and estimation
NN = {0:FullyConnectedLayer(15640, 400, HeInitializer(sigma=0.01), Adagrad(0.01), ReLU()),
      1:FullyConnectedLayer(400, 200, HeInitializer(sigma=0.01), Adagrad(0.01), ReLU()),
      2:FullyConnectedLayer(200, 10, SimpleInitializer(sigma=0.01), Adagrad(0.01), Softmax())}

In [None]:
#Learning and estimation
CNN = {0:SimpleConv1d(out_channel=16, in_channel=1, filter_size=3, stride=1,
                     initializer=SimpleInitializerConv1d(sigma=0.01),
                     optimizer=SGD(0.01), activation=ReLU(), padding=0)}