<a href="https://colab.research.google.com/github/ismael-rtellez/CNN1_Series_Assignment/blob/main/Convolutional_Neural_Networks_(CNN1)_Sprint.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## CNN1 Series Assignment: SimpleConv1d

### [Problem 1] Creating a one-dimensional convolutional layer class that limits the number of channels to one

In [None]:
# Problem 1: Creating a one-dimensional convolutional layer class that limits the number of channels to one
import numpy as np

class SimpleConv1d:
    """Single-channel one-dimensional convolution layer (no padding, stride 1).

    The layer slides a filter ``W`` of length ``filter_size`` over a 1-D
    input and adds a single bias ``B`` to every output position.

    Attributes:
        filter_size: Number of taps in the filter.
        W: Filter weights, shape ``(filter_size,)``.
        B: Bias, shape ``(1,)``.
        dW: Gradient of the loss w.r.t. ``W`` (set by ``backward``).
        dB: Gradient of the loss w.r.t. ``B`` (set by ``backward``).
        learning_rate: Step size used by ``update``.
    """

    def __init__(self, filter_size):
        self.filter_size = filter_size
        # Xavier initialization
        scale = np.sqrt(1.0 / filter_size)
        self.W = np.random.randn(filter_size) * scale
        self.B = np.zeros(1)
        self.dW = None
        self.dB = None
        self.learning_rate = 0.01

    def forward(self, x):
        """Compute the layer output for a 1-D input.

        Args:
            x: 1-D array-like of input features.

        Returns:
            Float array of length ``len(x) - filter_size + 1``.

        Raises:
            ValueError: If the output length would be negative.
        """
        x = np.asarray(x)
        self.x = x
        self.output_size = len(x) - self.filter_size + 1
        if self.output_size < 0:
            raise ValueError(
                "input is too short for this filter size: "
                f"len(x)={len(x)}, filter_size={self.filter_size}"
            )
        # Pull the bias out as a true scalar. Adding the shape-(1,) array and
        # storing the result in a[i] would rely on implicit size-1
        # array-to-scalar conversion, which NumPy has deprecated.
        bias = np.asarray(self.B, dtype=float).reshape(-1)[0]
        self.a = np.zeros(self.output_size)
        for i in range(self.output_size):
            self.a[i] = np.dot(x[i:i + self.filter_size], self.W) + bias
        return self.a

    def backward(self, delta_a):
        """Compute gradients for the weights, bias and input.

        Args:
            delta_a: Gradient of the loss w.r.t. the layer output, one value
                per output position of the last ``forward`` call.

        Returns:
            Gradient of the loss w.r.t. the input, same shape as the input.

        Raises:
            ValueError: If ``delta_a`` does not have one entry per output.
        """
        delta_a = np.asarray(delta_a, dtype=float)
        if delta_a.shape != (self.output_size,):
            raise ValueError(
                f"delta_a must have shape ({self.output_size},), "
                f"got {delta_a.shape}"
            )
        # Gradients are always floating point. Allocating them with the dtype
        # of integer W / x would silently truncate fractional contributions.
        self.dW = np.zeros(np.shape(self.W), dtype=float)
        self.dB = np.sum(delta_a)
        self.dx = np.zeros(np.shape(self.x), dtype=float)

        # dW[s] = sum_i delta_a[i] * x[i + s]
        for s in range(self.filter_size):
            self.dW[s] = np.dot(delta_a, self.x[s:s + self.output_size])

        # Every output i saw x[i : i + filter_size]; scatter its gradient back.
        for i in range(self.output_size):
            self.dx[i:i + self.filter_size] += delta_a[i] * self.W

        return self.dx

    def update(self):
        """Apply one gradient-descent step using the stored ``dW`` / ``dB``."""
        # Rebind instead of using ``-=`` so integer-valued parameters are
        # promoted to float rather than raising a casting error.
        self.W = self.W - self.learning_rate * self.dW
        self.B = self.B - self.learning_rate * self.dB

### [Problem 2] Output size calculation after one-dimensional convolution

In [None]:
# Problem 2: Output size calculation after one-dimensional convolution
def calculate_conv1d_output_size(n_input, padding, filter_size, stride):
    """Return the number of output features of a 1-D convolution.

    Args:
        n_input: Number of input features.
        padding: Number of padded values added on each side.
        filter_size: Length of the filter.
        stride: Step between consecutive filter positions.

    Returns:
        ``(n_input + 2 * padding - filter_size) // stride + 1``.
    """
    padded_length = n_input + 2 * padding
    last_start = padded_length - filter_size
    return last_start // stride + 1

### [Problem 3] Experiment of one-dimensional convolutional layer with small array

In [None]:
# Problem 3: Experiment of one-dimensional convolutional layer with small array
# Fixed integer test data: a 4-sample input, a 3-tap filter and a single bias.
x = np.array([1, 2, 3, 4])
w = np.array([3, 5, 7])
b = np.array([1])

# Replace the layer's randomly drawn parameters with the fixed values above
# (copies, so w and b themselves are never modified through the layer).
conv = SimpleConv1d(filter_size=3)
conv.W = w.copy()
conv.B = b.copy()

# Forward
a = conv.forward(x)
print("Forward output: ", a)

# Backward: delta_a holds one upstream gradient per output position.
delta_a = np.array([10, 20])
dx = conv.backward(delta_a)
print("dW: ", conv.dW)
print("dB: ", conv.dB)
print("dx: ", dx)

Forward output:  [35. 50.]
dW:  [ 50  80 110]
dB:  30
dx:  [ 30 110 170 140]


  self.a[i] = np.sum(x[i:i+self.filter_size] * self.W) + self.B


### [Problem 4] Creating a one-dimensional convolutional layer class that does not limit the number of channels

In [None]:
# Problem 4: Creating a one-dimensional convolutional layer class that does not limit the number of channels

class Conv1d:
    """Multi-channel one-dimensional convolution layer (forward pass only).

    Weights ``W`` have shape ``(out_channels, in_channels, filter_size)`` and
    the bias ``B`` has one entry per output channel.
    """

    def __init__(self, in_channels, out_channels, filter_size):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.filter_size = filter_size
        # Scale the random weights by 1 / sqrt(fan_in).
        fan_in = in_channels * filter_size
        scale = np.sqrt(1.0 / fan_in)
        self.W = np.random.randn(out_channels, in_channels, filter_size) * scale
        self.B = np.zeros(out_channels)
        self.dW = None
        self.dB = None
        self.learning_rate = 0.01

    def forward(self, x):
        """Convolve a single sample.

        Args:
            x: Array of shape ``(in_channels, n_features)``.

        Returns:
            Array of shape ``(out_channels, n_features - filter_size + 1)``;
            also stored on the instance as ``self.a``.
        """
        self.x = x
        _, n_features = x.shape
        n_positions = n_features - self.filter_size + 1
        self.out_features = n_positions
        self.a = np.zeros((self.out_channels, n_positions))

        for oc in range(self.out_channels):
            kernel = self.W[oc]
            for pos in range(n_positions):
                stop = pos + self.filter_size
                # Accumulate each input channel's contribution, then the bias.
                for ic in range(self.in_channels):
                    self.a[oc, pos] += np.sum(x[ic, pos:stop] * kernel[ic])
                self.a[oc, pos] += self.B[oc]
        return self.a


### [Problem 5] (Advanced task) Implementing padding

In [None]:
# Problem 5: (Advanced task) Implementing padding
def pad1d(x, pad_with, mode='constant'):
    """Pad an array by delegating to ``np.pad``.

    Args:
        x: Array to pad.
        pad_with: Pad width, passed through as ``np.pad``'s ``pad_width``.
        mode: Padding mode understood by ``np.pad`` (default ``'constant'``).

    Returns:
        The padded array.
    """
    padded = np.pad(x, pad_width=pad_with, mode=mode)
    return padded

# Example run: pad a 4-element array with two values on each side using the
# default 'constant' mode and print the result. Note that this rebinds the
# module-level name x.
x = np.array([1, 2, 3, 4])
print("Padded: ", pad1d(x, (2, 2)))

Padded:  [0 0 1 2 3 4 0 0]


### [Problem 6] (Advanced task) Response to mini batch

In [None]:
# Problem 6: (Advanced Task) Response to mini batch

def forward(self, x):   # x shape: (batch_size, in_channels, n_features)
    """Mini-batch forward pass of a multi-channel 1-D convolution.

    Reads ``self.filter_size``, ``self.in_channels``, ``self.out_channels``,
    ``self.W`` and ``self.B``; stores ``self.x``, ``self.out_features`` and
    ``self.a``.

    Args:
        x: Array of shape ``(batch_size, in_channels, n_features)``.

    Returns:
        Array of shape ``(batch_size, out_channels, n_features - filter_size + 1)``.
    """
    self.x = x
    n_samples, _, n_features = x.shape
    width = self.filter_size
    n_positions = n_features - width + 1
    self.out_features = n_positions
    self.a = np.zeros((n_samples, self.out_channels, n_positions))

    # Visit every (sample, output channel, output position) in row-major order.
    for idx in np.ndindex(n_samples, self.out_channels, n_positions):
        sample, oc, pos = idx
        for ic in range(self.in_channels):
            window = x[sample, ic, pos:pos + width]
            self.a[idx] += np.sum(window * self.W[oc, ic])
        self.a[idx] += self.B[oc]
    return self.a

### [Problem 7] (Advanced assignment) Arbitrary number of strides

In [None]:
# Problem 7: (Advanced assignment) Arbitrary number of strides
def forward(self, x, stride=1):
    """Mini-batch forward pass of a 1-D convolution with a configurable stride.

    Reads ``self.filter_size``, ``self.in_channels``, ``self.out_channels``,
    ``self.W`` and ``self.B``; stores ``self.stride``, ``self.x``,
    ``self.out_features`` and ``self.a``.

    Args:
        x: Array of shape ``(batch_size, in_channels, n_features)``.
        stride: Step between consecutive filter positions (default 1).

    Returns:
        Array of shape ``(batch_size, out_channels, out_features)`` where
        ``out_features = (n_features - filter_size) // stride + 1``.
    """
    self.stride = stride
    self.x = x
    n_samples, _, n_features = x.shape
    width = self.filter_size
    n_positions = (n_features - width) // stride + 1
    self.out_features = n_positions
    self.a = np.zeros((n_samples, self.out_channels, n_positions))

    for sample, oc, pos in np.ndindex(n_samples, self.out_channels, n_positions):
        # Each output position starts `stride` features after the previous one.
        begin = pos * stride
        end = begin + width
        for ic in range(self.in_channels):
            self.a[sample, oc, pos] += np.sum(x[sample, ic, begin:end] * self.W[oc, ic])
        self.a[sample, oc, pos] += self.B[oc]
    return self.a

### [Problem 8] Learning and estimation

In [None]:
# Problem 8: Learning and estimation
import numpy as np
from keras.datasets import mnist
from keras.utils import to_categorical

# utilities
def softmax(x):
    """Row-wise softmax over axis 1.

    The per-row maximum is subtracted before exponentiating so large logits
    do not overflow.
    """
    row_max = np.max(x, axis=1, keepdims=True)
    exps = np.exp(x - row_max)
    row_totals = exps.sum(axis=1, keepdims=True)
    return exps / row_totals

def cross_entropy(y_pred, y_true):
    """Cross-entropy loss averaged over the first axis of ``y_true``.

    A small constant (1e-7) is added inside the logarithm to avoid ``log(0)``.
    """
    n_samples = y_true.shape[0]
    log_probs = np.log(y_pred + 1e-7)
    weighted_total = np.sum(y_true * log_probs)
    return -weighted_total / n_samples

def softmax_cross_entropy_grad(y_pred, y_true):
    """Return ``(y_pred - y_true)`` divided by the size of ``y_true``'s first axis."""
    n_samples = y_true.shape[0]
    residual = y_pred - y_true
    return residual / n_samples

def relu(x):
    """Element-wise rectified linear unit: ``max(0, x)``."""
    rectified = np.maximum(0, x)
    return rectified

def relu_grad(x):
    """Derivative mask of ReLU: 1.0 where ``x > 0``, else 0.0, as float32."""
    is_positive = x > 0
    return is_positive.astype(np.float32)

# Conv1d layer
class Conv1d_p8:
    """Mini-batch, multi-channel 1-D convolution layer with stride support.

    ``backward`` both computes the gradients and applies the gradient-descent
    step to ``W`` and ``B`` using the step size ``lr``.
    """

    def __init__(self, in_channels, out_channels, filter_size):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.filter_size = filter_size
        # Scale the random weights by 1 / sqrt(fan_in).
        fan_in = in_channels * filter_size
        scale = np.sqrt(1.0 / fan_in)
        self.W = np.random.randn(out_channels, in_channels, filter_size) * scale
        self.B = np.zeros(out_channels)
        self.dW = None
        self.dB = None
        self.lr = 0.01

    def forward(self, x, stride=1):
        """Convolve a batch.

        Args:
            x: Array of shape ``(batch_size, in_channels, n_features)``.
            stride: Step between consecutive filter positions (default 1).

        Returns:
            Array of shape ``(batch_size, out_channels, out_features)``; also
            stored as ``self.a``.
        """
        self.stride = stride
        self.x = x
        n_samples, _, n_features = x.shape
        width = self.W.shape[2]
        self.out_features = (n_features - width) // stride + 1
        n_filters = self.W.shape[0]
        self.a = np.zeros((n_samples, n_filters, self.out_features))
        for b, oc, pos in np.ndindex(n_samples, n_filters, self.out_features):
            lo = pos * stride
            # Accumulate each input channel's contribution, then the bias.
            for ic in range(self.W.shape[1]):
                self.a[b, oc, pos] += np.sum(x[b, ic, lo:lo + width] * self.W[oc, ic])
            self.a[b, oc, pos] += self.B[oc]
        return self.a

    def backward(self, delta):
        """Back-propagate ``delta`` and update the parameters in place.

        Args:
            delta: Gradient w.r.t. the output of the last ``forward`` call,
                shape ``(batch_size, out_channels, out_features)``.

        Returns:
            Gradient w.r.t. the input, same shape and dtype as the stored input.
        """
        n_samples, n_in, _ = self.x.shape
        _, n_filters, _ = delta.shape
        width = self.W.shape[2]

        self.dW = np.zeros_like(self.W)
        self.dB = np.zeros_like(self.B)
        grad_x = np.zeros_like(self.x)

        for b, oc, pos in np.ndindex(n_samples, n_filters, self.out_features):
            lo = pos * self.stride
            window = slice(lo, lo + width)
            upstream = delta[b, oc, pos]
            self.dB[oc] += upstream
            for ic in range(n_in):
                self.dW[oc, ic] += self.x[b, ic, window] * upstream
                # Uses the weights from before this call's update.
                grad_x[b, ic, window] += self.W[oc, ic] * upstream

        self.W -= self.lr * self.dW
        self.B -= self.lr * self.dB
        return grad_x

# Fully Connected layer
class FC:
    """Fully connected (affine) layer whose ``backward`` also applies the update."""

    def __init__(self, input_dim, output_dim):
        # Scale the random weights by 1 / sqrt(input_dim).
        init_scale = np.sqrt(1.0 / input_dim)
        self.W = np.random.randn(input_dim, output_dim) * init_scale
        self.B = np.zeros(output_dim)
        self.lr = 0.01

    def forward(self, x):
        """Return ``x @ W + B`` and remember ``x`` for the backward pass."""
        self.x = x
        affine = np.dot(x, self.W)
        return affine + self.B

    def backward(self, delta):
        """Back-propagate ``delta`` and take one gradient-descent step.

        Args:
            delta: Gradient w.r.t. this layer's output.

        Returns:
            Gradient w.r.t. this layer's input, computed with the weights
            from before the update.
        """
        grad_input = np.dot(delta, self.W.T)
        grad_W = np.dot(self.x.T, delta)
        grad_B = np.sum(delta, axis=0)

        self.W -= self.lr * grad_W
        self.B -= self.lr * grad_B
        return grad_input


# Load MNIST (x_test / y_test are loaded but not used in the code shown here).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Keep only the first 1000 training samples to shorten training time.
x_train = x_train[:1000]
y_train = y_train[:1000]

# Flatten each image to 784 features with a single channel, giving shape
# (batch, channel, feature), and divide by 255 (presumably the pixel values
# are 0..255, so this scales them to [0, 1] -- TODO confirm).
x_train = x_train.reshape(-1, 1, 28*28).astype(np.float32) / 255.0
# Encode the labels with Keras' to_categorical.
y_train = to_categorical(y_train)

# Model: one convolution layer followed by one fully connected layer.
conv_p8 = Conv1d_p8(in_channels=1, out_channels=4, filter_size=5)
# 780 = 784 - 5 + 1 output positions per channel at stride 1; times 4 channels.
fc = FC(input_dim=4 *780, output_dim=10)

# Training loop
epochs = 5
batch_size = 32
# Integer division: samples that do not fill a whole batch are skipped each epoch.
num_batches = x_train.shape[0] // batch_size

for epoch in range(epochs):
    # Reshuffle the training set at the start of every epoch.
    perm = np.random.permutation(x_train.shape[0])
    x_train_shuffled = x_train[perm]
    y_train_shuffled = y_train[perm]
    epoch_loss = 0
    correct = 0

    for i in range(num_batches):
        xb = x_train_shuffled[i*batch_size:(i+1)*batch_size]
        yb = y_train_shuffled[i*batch_size:(i+1)*batch_size]

        # Forward: conv -> ReLU -> flatten -> FC -> softmax -> loss
        out = conv_p8.forward(xb)
        out_relu = relu(out)
        out_flat = out_relu.reshape(batch_size, -1)
        logits = fc.forward(out_flat)
        probs = softmax(logits)
        loss = cross_entropy(probs, yb)
        epoch_loss += loss

        # Count correct predictions for the running accuracy.
        pred = np.argmax(probs, axis=1)
        true = np.argmax(yb, axis=1)
        correct += np.sum(pred == true)

        # Backward: each layer's backward() also applies its own weight update.
        dloss = softmax_cross_entropy_grad(probs, yb)
        d_fc = fc.backward(dloss)
        # Undo the flatten, then gate the gradient by the ReLU derivative.
        d_relu = d_fc.reshape(out.shape) * relu_grad(out)
        conv_p8.backward(d_relu)

    # Accuracy over the samples actually processed this epoch.
    acc = correct / (num_batches * batch_size)
    print(f"Epoch {epoch+1}/{epochs} - Loss: {epoch_loss/num_batches:.4f} - Accuracy: {acc:.4f}")

Epoch 1/5 - Loss: 1.9703 - Accuracy: 0.4607
Epoch 2/5 - Loss: 1.3305 - Accuracy: 0.7611
Epoch 3/5 - Loss: 0.9221 - Accuracy: 0.8165
Epoch 4/5 - Loss: 0.7108 - Accuracy: 0.8427
Epoch 5/5 - Loss: 0.5979 - Accuracy: 0.8609
