### Model Structure
| Layer             |
| ----------------- |
| `Conv2D(16, 3x3)` |
| `Conv2D(16, 3x3)` |
| `MaxPool2D(2x2)`  |
| `Dropout`         |
| `Conv2D(32, 3x3)` |
| `Conv2D(32, 3x3)` |
| `MaxPool2D(4x4)`  |
| `Dropout`         |
| `Flatten`         |
| `Dense(256)`      |
| `Dropout`         |
| `Dense(1)`        |

![BRAAI CNN Model Structure](images/fig-braai.png)

In [1]:
# Layer Class (template)
class Layer:
    """Base template for network layers.

    Holds ``input`` and ``output`` slots and declares the ``forward`` /
    ``backward`` pair. Both methods are no-ops here and return ``None``;
    concrete layers are expected to override them.
    """

    def __init__(self):
        # Both slots start empty; this template never fills them itself.
        self.input, self.output = None, None

    def forward(self, input):
        """Placeholder forward pass: ignores ``input`` and returns ``None``."""
        return None

    def backward(self, d_out, learning_rate):
        """Placeholder backward pass: ignores both arguments and returns ``None``."""
        return None

In [None]:
import numpy as np
from scipy import signal

class Conv2D:
    """2-D "valid" cross-correlation layer trained with plain SGD.

    The layer is lazily initialised: kernels and biases are created on the
    first ``forward`` call, once the input shape is known. Inputs are
    unpacked as ``(depth, height, width)``.

    Args:
        kernel_size: Side length of the square kernels.
        output_depth: Number of output feature maps.
    """

    def __init__(self, kernel_size, output_depth):
        self.output_depth = output_depth
        self.kernel_size = kernel_size
        self.initialized = False

    def initialize_layer(self, input_shape):
        """Create kernels and biases for an input of shape ``(depth, height, width)``."""
        self.input_depth, self.input_height, self.input_width = input_shape
        self.output_height = self.input_height - self.kernel_size + 1
        self.output_width = self.input_width - self.kernel_size + 1

        self.kernels_shape = (
            self.output_depth,
            self.input_depth,
            self.kernel_size,
            self.kernel_size,
        )
        # He initialisation: scale by sqrt(2 / fan_in) so ReLU activations
        # keep a stable variance from layer to layer.
        fan_in = np.prod(self.kernels_shape[1:])
        self.kernels = np.random.randn(*self.kernels_shape) * np.sqrt(2.0 / fan_in)
        self.biases = np.random.randn(self.output_depth)
        self.initialized = True

    def forward(self, input):
        """Return the feature maps for ``input``.

        Args:
            input: Array of shape ``(input_depth, height, width)``.

        Returns:
            Array of shape ``(output_depth, height - k + 1, width - k + 1)``
            where ``k`` is ``kernel_size``. Also stored in ``self.output``.
        """
        if not self.initialized:
            self.initialize_layer(input.shape)

        self.input = input
        _, height, width = input.shape
        out_shape = (
            self.output_depth,
            height - self.kernel_size + 1,
            width - self.kernel_size + 1,
        )
        # Start every output map at its bias. The biases are 1-D, so they
        # must be broadcast to full maps before 2-D results can be added.
        self.output = np.zeros(out_shape) + self.biases[:, None, None]

        for i in range(self.output_depth):
            for j in range(self.input_depth):
                self.output[i] += signal.correlate2d(
                    self.input[j], self.kernels[i, j], mode="valid"
                )

        return self.output

    def backward(self, d_out, learning_rate):
        """Apply one SGD step and return the gradient w.r.t. the input.

        Args:
            d_out: Gradient of the loss w.r.t. this layer's output; same
                shape as the array returned by ``forward``.
            learning_rate: Step size for the kernel and bias updates.

        Returns:
            Gradient of the loss w.r.t. the input, same shape as the input
            of the last ``forward`` call.
        """
        kernels_gradient = np.zeros_like(self.kernels)
        # Always accumulate in floating point, even if the stored input has
        # an integer dtype.
        input_gradient = np.zeros(self.input.shape, dtype=self.kernels.dtype)

        for i in range(self.output_depth):
            for j in range(self.input_depth):
                kernels_gradient[i, j] = signal.correlate2d(self.input[j], d_out[i], "valid")
                # Full convolution is the adjoint of the valid correlation
                # used in forward. It reads the kernels before they are updated.
                input_gradient[j] += signal.convolve2d(d_out[i], self.kernels[i, j], "full")

        self.kernels -= learning_rate * kernels_gradient
        self.biases -= learning_rate * d_out.sum(axis=(1, 2))

        return input_gradient


In [10]:
class ReLU(Layer):
    """Rectified linear activation: keeps positive values, zeroes the rest."""

    def forward(self, input):
        """Store ``input`` for the backward pass and return ``max(0, input)`` element-wise."""
        self.input = input
        rectified = np.maximum(0, input)
        return rectified

    def backward(self, d_out, learning_rate):
        """Return ``d_out`` masked to the positions where the stored input was positive.

        ``learning_rate`` is accepted for interface compatibility and is unused.
        """
        # Derivative of ReLU: 1 where the input was > 0, otherwise 0.
        passes_gradient = self.input > 0
        return d_out * passes_gradient

In [None]:
class Flatten(Layer):
    """Collapses an array to 1-D on the way forward and restores its shape on the way back."""

    def forward(self, input):
        """Record the shape of ``input`` and return it reshaped to one dimension."""
        original_shape = input.shape
        self.input_shape = original_shape
        flattened = input.reshape(-1)
        return flattened

    def backward(self, d_out, learning_rate=None):
        """Reshape ``d_out`` to the shape recorded by ``forward``.

        ``learning_rate`` is unused; this layer has no parameters.
        """
        restored = d_out.reshape(self.input_shape)
        return restored

In [None]:
class MaxPool2D(Layer):
    """Max pooling over square windows of a ``(channels, height, width)`` array.

    Args:
        pool_size: Side length of the pooling window.
        stride: Step between consecutive windows along both spatial axes.
    """

    def __init__(self, pool_size=2, stride=2):
        super().__init__()
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, input):
        """Return the per-window maxima and remember where each maximum was.

        Args:
            input: Array unpacked as ``(channels, height, width)``.

        Returns:
            Array of shape ``(channels, out_height, out_width)`` with
            ``out = (size - pool_size) // stride + 1`` along each spatial axis.
        """
        self.input = input
        channels, height, width = self.input.shape
        out_height = (height - self.pool_size) // self.stride + 1
        out_width = (width - self.pool_size) // self.stride + 1
        self.output = np.zeros((channels, out_height, out_width))

        # Binary mask over the input: 1 where an element was a window maximum.
        self.max_indices = np.zeros_like(input)
        # Absolute input coordinates of each window's maximum. These let the
        # backward pass route every gradient to exactly one element, even
        # when windows overlap (stride < pool_size).
        self._max_rows = np.zeros((channels, out_height, out_width), dtype=int)
        self._max_cols = np.zeros((channels, out_height, out_width), dtype=int)

        for c in range(channels):
            for h in range(out_height):
                h_start = h * self.stride
                h_end = h_start + self.pool_size
                for w in range(out_width):
                    w_start = w * self.stride
                    w_end = w_start + self.pool_size

                    window = input[c, h_start:h_end, w_start:w_end]
                    self.output[c, h, w] = np.max(window)

                    row, col = np.unravel_index(np.argmax(window), window.shape)
                    self.max_indices[c, h_start + row, w_start + col] = 1
                    self._max_rows[c, h, w] = h_start + row
                    self._max_cols[c, h, w] = w_start + col

        return self.output

    def backward(self, d_out, learning_rate=None):
        """Scatter ``d_out`` back onto the input positions that produced each maximum.

        Args:
            d_out: Gradient w.r.t. the pooled output, shape
                ``(channels, out_height, out_width)``.
            learning_rate: Unused; this layer has no parameters.

        Returns:
            Gradient w.r.t. the input, same shape as the last ``forward``
            input. It is zero everywhere except at the recorded maxima.
        """
        channels = self.input.shape[0]
        d_input = np.zeros(self.input.shape, dtype=float)
        channel_index = np.arange(channels)[:, None, None]
        # np.add.at accumulates correctly when several windows share the
        # same maximum position.
        np.add.at(d_input, (channel_index, self._max_rows, self._max_cols), d_out)
        return d_input


In [12]:
class Dense(Layer):
    """Fully connected layer ``y = W @ x + b`` trained with plain SGD.

    Args:
        input_size: Length of the input vector.
        output_size: Length of the output vector.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # He initialisation (std = sqrt(2 / fan_in)), matching the scaling
        # Conv2D uses. Unscaled N(0, 1) weights produce pre-activations whose
        # spread grows with sqrt(input_size) and saturate the final sigmoid.
        self.weights = np.random.randn(output_size, input_size) * np.sqrt(2.0 / input_size)
        self.biases = np.zeros(output_size)

    def forward(self, input):
        """Store ``input`` and return ``weights @ input + biases``.

        Assumes ``input`` is a 1-D vector of length ``input_size``.
        """
        self.input = input
        return np.dot(self.weights, input) + self.biases

    def backward(self, d_out, learning_rate):
        """Apply one SGD step and return the gradient w.r.t. the input.

        Args:
            d_out: Gradient of the loss w.r.t. this layer's output.
            learning_rate: Step size for the weight and bias updates.

        Returns:
            ``weights.T @ d_out``, computed with the weights as they were
            before this update.
        """
        weights_gradient = np.outer(d_out, self.input)
        # Must be computed before the weights are modified below.
        input_gradient = np.dot(self.weights.T, d_out)

        self.weights -= learning_rate * weights_gradient
        self.biases -= learning_rate * d_out

        return input_gradient

In [11]:
class Dropout(Layer):
    """Inverted dropout: zeroes random elements and rescales the survivors.

    Set ``training`` to ``False`` to turn the layer into an identity, for
    example at inference time.

    Args:
        dropout_rate: Probability of dropping each element; must satisfy
            ``0 <= dropout_rate < 1``.

    Raises:
        ValueError: If ``dropout_rate`` is outside ``[0, 1)``.
    """

    def __init__(self, dropout_rate=0.5):
        super().__init__()
        if not 0.0 <= dropout_rate < 1.0:
            # A rate of 1 would divide by zero when rescaling the survivors.
            raise ValueError(f"dropout_rate must be in [0, 1), got {dropout_rate}")
        self.rate = dropout_rate
        self.rnd = None
        self.training = True

    def forward(self, input):
        """Return ``input`` with random elements dropped, or unchanged when not training."""
        if not self.training:
            return input

        # Keep mask: True for the elements that survive this pass.
        self.rnd = np.random.uniform(low=0.0, high=1.0, size=input.shape) > self.rate
        # Dividing by the keep probability preserves the expected activation.
        return input * self.rnd / (1.0 - self.rate)

    def backward(self, d_out, learning_rate=None):
        """Apply the mask and scaling from the last ``forward`` call to ``d_out``.

        ``learning_rate`` is unused; this layer has no parameters.
        """
        if not self.training:
            return d_out
        return d_out * self.rnd / (1.0 - self.rate)

In [7]:
class Sigmoid(Layer):
    """Logistic activation ``1 / (1 + exp(-x))``."""

    def forward(self, input):
        """Compute the sigmoid of ``input``, cache it in ``self.out`` and return it."""
        denominator = 1 + np.exp(-input)
        self.out = 1 / denominator
        return self.out

    def backward(self, d_out, learning_rate=None):
        """Return ``d_out`` times the sigmoid derivative ``out * (1 - out)``.

        Uses the activation cached by ``forward``; ``learning_rate`` is unused.
        """
        activation = self.out
        scaled = d_out * activation
        return scaled * (1 - activation)

In [8]:
def binary_cross_entropy(y_true, y_pred):
    """Return the element-wise binary cross-entropy between labels and predictions.

    Predictions are clipped to ``[1e-15, 1 - 1e-15]`` first so that neither
    logarithm is evaluated at zero.
    """
    eps = 1e-15
    p = np.clip(y_pred, eps, 1 - eps)
    positive_term = y_true * np.log(p)
    negative_term = (1 - y_true) * np.log(1 - p)
    return -(positive_term + negative_term)

def binary_cross_entropy_grad(y_true, y_pred):
    """Return the derivative of the binary cross-entropy w.r.t. the prediction.

    Predictions are clipped to ``[1e-15, 1 - 1e-15]`` first so the
    denominator ``p * (1 - p)`` never reaches zero.
    """
    eps = 1e-15
    p = np.clip(y_pred, eps, 1 - eps)
    numerator = p - y_true
    denominator = p * (1 - p)
    return numerator / denominator

In [None]:
class Model:
    """Sequential CNN binary classifier: two conv blocks followed by a dense head.

    The layer stack follows the "Model Structure" table: two
    ``Conv2D(16)`` layers with 2x2 max pooling, two ``Conv2D(32)`` layers
    with 4x4 max pooling, then ``Dense(256)`` and ``Dense(1)`` with a sigmoid
    output. A ReLU follows every convolution and the first dense layer.

    The first dense layer expects 1152 = 32 * 6 * 6 features, which is what
    this stack yields for a 63x63 input (TODO confirm against the dataset).
    """

    def __init__(self):
        self.layers = [
            # Block 1
            Conv2D(kernel_size=3, output_depth=16),
            ReLU(),
            Conv2D(kernel_size=3, output_depth=16),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),
            Dropout(dropout_rate=0.25),

            # Block 2
            Conv2D(kernel_size=3, output_depth=32),
            ReLU(),
            Conv2D(kernel_size=3, output_depth=32),
            ReLU(),
            # 4x4 window with stride 4, as documented. A 2x2 window with
            # stride 4 would silently ignore half of every feature map.
            MaxPool2D(pool_size=4, stride=4),
            Dropout(dropout_rate=0.25),

            # Fully connected
            Flatten(),
            Dense(input_size=1152, output_size=256),
            ReLU(),
            Dropout(dropout_rate=0.5),
            Dense(input_size=256, output_size=1),
            Sigmoid(),
        ]

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, grad, learning_rate):
        """Back-propagate ``grad`` through the layers in reverse order.

        Each layer updates its own parameters using ``learning_rate``.

        Returns:
            The gradient w.r.t. the model input.
        """
        for layer in reversed(self.layers):
            grad = layer.backward(grad, learning_rate)
        return grad

In [13]:
def train(model, X_train, y_train, epochs, lr):
    """Train ``model`` with per-sample SGD and print loss and accuracy each epoch.

    Args:
        model: Object exposing ``forward(x)`` and ``backward(grad, lr)``.
        X_train: Sequence of training samples, each passed to ``model.forward``.
        y_train: Sequence of binary labels (0 or 1), aligned with ``X_train``.
        epochs: Number of passes over the training data.
        lr: Learning rate handed to ``model.backward``.

    Raises:
        ValueError: If ``X_train`` is empty or its length differs from ``y_train``.
    """
    n_samples = len(X_train)
    if n_samples == 0:
        raise ValueError("X_train is empty; nothing to train on")
    if len(y_train) != n_samples:
        raise ValueError(
            f"X_train and y_train differ in length: {n_samples} != {len(y_train)}"
        )

    for epoch in range(epochs):
        total_loss = 0.0
        correct = 0

        for x, y_true in zip(X_train, y_train):
            out = model.forward(x)

            # The model output (and hence the loss) is a one-element array.
            # Reduce it to a Python float so the running total stays a scalar
            # and the ":.4f" format specs in the report below are valid.
            loss = binary_cross_entropy(y_true, out)
            total_loss += float(np.sum(loss))

            probability = float(np.asarray(out).reshape(-1)[0])
            prediction = 1 if probability > 0.5 else 0
            if prediction == y_true:
                correct += 1

            grad = binary_cross_entropy_grad(y_true, out)
            model.backward(grad, lr)

        accuracy = correct / n_samples
        avg_loss = total_loss / n_samples
        print(f"Epoch {epoch + 1}/{epochs} — Loss: {avg_loss:.4f} — Accuracy: {accuracy:.4f}")

            

In [None]:
# Open the dataset archive and pull out the training inputs and labels.
# Note that the label key is spelled with a capital "Y".
dataset_split = np.load("data/ztf_dataset_split.npz")
X_train, y_train = (dataset_split[key] for key in ("X_train", "Y_train"))
# Last expression of the cell: shows the shapes of both arrays.
(X_train.shape, y_train.shape)