## Script Overview: `cnn_full`

This notebook (`cnn_full.ipynb`) contains a self‑contained implementation of a small convolutional neural network, “from scratch,” organized into the following sections:

1. **Imports & Setup**  
   - Libraries (`numpy`, `pathlib`, `tqdm`, `PIL`, `pandas`)  
   - Utility functions for image loading, label encoding, and batch generation  

2. **Activation Layers**  
   - **ReLU** (additional activations are available in `src/activations`)

3. **Convolution & Pooling Layers**  
   - **Conv2D**: 2D convolution with He/rand init, padding/stride, Adam updates  
   - **MaxPool2D**: 2×2 (or custom) max pooling layer  
   - Core routines: `conv_single_step`, `conv_forward`, `conv_backward`, `pool_forward`, `pool_backward`  

4. **Dense & Flatten Layers**  
   - **Flatten**: reshape from (m, H, W, C) → (m, features)  
   - **Dense**: fully connected + softmax output, with Adam updates  

5. **Model Builder & Core Passes**  
   - `crear_modelo(filters, pool, n_classes, …)`: assemble a sequential layer list  
   - `conv_net_forward(layers, X)`: propagate one batch through all layers, cache for backprop  
   - `conv_net_backward(layers, A_out, y_batch, lr)`: compute gradients and update trainable layers  

6. **Training Loop**  
   - `full_cnn(filters, pool, df_train, epochs, batch_size, lr)`:  
     - Splits data into batches via `batch_generator`  
     - Runs forward/backward passes, Adam updates per batch  
     - Tracks epoch‑wise loss & accuracy in a history dict  
     - Displays real‑time progress with `tqdm`




## Libraries

In [1]:
import pandas as pd 
import numpy as np
from pathlib import Path
from PIL import Image
from tqdm.auto import trange, tqdm

## Load the dataset 

In [2]:
base_dir = Path('..') / Path('plant-seedlings-classification')
train_dir = base_dir / 'train'
test_dir  = base_dir / 'test'

## Labels 
train_rows = []
for class_dir in train_dir.iterdir():
    if class_dir.is_dir():
        for img_path in class_dir.glob('*.*'):  
            train_rows.append({
                'filepath': str(img_path),
                'label': class_dir.name})
            
df_train = pd.DataFrame(train_rows)
df_train_shuffled = df_train.sample(frac=1, random_state=9).reset_index(drop=True)

n = len(df_train)
split_idx = int(0.8 * n) 

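# Split the shuffled frame; slicing the unshuffled one would group the split by class folder
train_df = df_train_shuffled.iloc[:split_idx].copy()
val_df = df_train_shuffled.iloc[split_idx:].copy()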
print(f"Train: {len(train_df)} rows, Val: {len(val_df)} rows")


## No labels 
test_rows = []
for img_path in test_dir.glob('*.*'):
    test_rows.append({'filepath': str(img_path)})
df_test = pd.DataFrame(test_rows)

print(f'Test: {len(df_test)} rows')

Train: 3800 rows, Val: 950 rows
Test: 794 rows


## Batch Functions 

In [3]:

label_list = sorted(df_train['label'].unique())
label2idx  = {label: i for i, label in enumerate(label_list)}

def load_image(path: str, target_size: tuple = (64, 64)):
    """
    Load an image from disk, resize it, and normalize pixel values.

    Args:
        path (str): Filesystem path to the image.
        target_size (tuple of int): Desired output size as (width, height).

    Returns:
        np.ndarray: RGB image array of shape (height, width, 3), dtype float32,
                    with values scaled to [0.0, 1.0].
    """
    img = Image.open(path).convert('RGB')
    try:
        resample = Image.Resampling.LANCZOS
    except AttributeError:
        resample = Image.LANCZOS
    img = img.resize(target_size, resample=resample)
    return np.array(img, dtype=np.float32) / 255.0


def encode_labels(label_batch: list, label2idx: dict, label_list: list):
    """
    Convert a list of label strings to one-hot encoded vectors.

    Args:
        label_batch (list of str): Labels for the current batch.
        label2idx (dict): Mapping from label string to integer index.
        label_list (list of str): Full list of possible labels in order.

    Returns:
        np.ndarray: One-hot matrix of shape (batch_size, num_classes), dtype float32.
    """
    idxs = [label2idx[label] for label in label_batch]
    one_hot = np.zeros((len(idxs), len(label_list)), dtype=np.float32)
    one_hot[np.arange(len(idxs)), idxs] = 1.0
    return one_hot


def batch_generator(df, label2idx: dict, label_list: list , batch_size: int = 32, shuffle: bool = True, target_size: tuple = (64, 64)):
    """
    Infinite generator yielding batches of images and one-hot labels.

    Args:
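        df (pandas.DataFrame): Must contain columns 'filepath' and 'label'.
        label2idx (dict): Mapping from label string to integer index.
        label_list (list of str): Full list of possible labels in order.
        batch_size (int): Number of samples per batch.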
        shuffle (bool): Whether to shuffle df at the start of each epoch.
        target_size (tuple of int): Size to resize images (width, height).

    Yields:
        Tuple[np.ndarray, np.ndarray]:
            - X: Array of shape (batch_size, H, W, 3), dtype float32.
            - y: One-hot labels of shape (batch_size, num_classes), dtype float32.
    """
    n = len(df)
    while True:
        if shuffle:
            df = df.sample(frac=1).reset_index(drop=True)
        for i in range(0, n, batch_size):
            batch = df.iloc[i:i + batch_size]
            X = np.stack([load_image(fp, target_size) for fp in batch['filepath']])
            y = encode_labels(batch['label'].tolist(), label2idx, label_list)
            yield X, y

In [5]:
batch_size = 16
gen = batch_generator(train_df, batch_size=batch_size, label2idx=label2idx , label_list=label_list)
X_batch, y_batch = next(gen)
print(X_batch.shape, y_batch.shape)

(16, 64, 64, 3) (16, 12)
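
As a quick sanity check of the label encoding used by the generator, the sketch below one-hot encodes a toy batch. The class names and every `toy_*` variable are made up for illustration; only the indexing trick is taken from `encode_labels`.

```python
import numpy as np

# Hypothetical class names, used only for this illustration.
toy_labels = ["maize", "cleavers", "maize", "charlock"]
toy_label_list = sorted(set(toy_labels))
toy_label2idx = {label: i for i, label in enumerate(toy_label_list)}

# Same fancy-indexing pattern as encode_labels: one 1.0 per row, at the class index.
idxs = [toy_label2idx[label] for label in toy_labels]
toy_one_hot = np.zeros((len(idxs), len(toy_label_list)), dtype=np.float32)
toy_one_hot[np.arange(len(idxs)), idxs] = 1.0

print(toy_one_hot)
```

Each row sums to 1, and the column order follows the sorted label list, so the same `label_list` has to be reused when decoding predictions.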


## Auxiliary functions for the Conv2D layer

In [6]:
def zero_pad(X, pad):
    """
    Pad with zeros all images of the dataset X. The padding is applied to the height and width of an image
    
    Argument:
    X -- python numpy array of shape (m, n_H, n_W, n_C) representing a batch of m images
    pad -- integer, amount of padding around each image on vertical and horizontal dimensions
    
    Returns:
    X_pad -- padded image of shape (m, n_H + 2 * pad, n_W + 2 * pad, n_C)
    """

    if not isinstance(pad, int) or pad < 0:
        raise ValueError("`pad` must be an integer >= 0.")
    if X.ndim != 4:
        raise ValueError("`X` must have shape (m, n_H, n_W, n_C).")
    
    m, n_H, n_W, n_C = X.shape
    X_pad = np.zeros((m, n_H + 2*pad,n_W + 2*pad,n_C) , dtype=X.dtype)
    X_pad[:, pad:pad+n_H, pad:pad+n_W, :] = X
    
    return X_pad


def conv_single_step(a_slice_prev, W, b):
    """
    Perform a single convolution step on a slice of the input.

    Args:
        a_slice_prev (np.ndarray): Input slice of shape (f, f, n_C_prev).
        W (np.ndarray): Filter weights of shape (f, f, n_C_prev).
        b (np.ndarray or float): Bias term, broadcastable to a scalar.

    Returns:
        float: The result of applying the filter and bias (i.e., sum(a_slice_prev * W) + b).
    """

    s = a_slice_prev * W
    Z = np.sum(s)
    Z = Z + float(np.squeeze(b))  # accepts a Python float or a size-1 array
    return Z
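
The padding and single-step logic above can be reproduced with NumPy's built-in `np.pad`, which is a convenient cross-check for `zero_pad`. This is a minimal sketch on random data; the `*_demo` names and the bias value 0.5 are arbitrary choices for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
X_demo = rng.standard_normal((2, 3, 3, 1))   # (m, n_H, n_W, n_C)
pad_demo = 1

# Pad only the height and width axes; batch and channel axes stay untouched.
X_demo_pad = np.pad(
    X_demo,
    ((0, 0), (pad_demo, pad_demo), (pad_demo, pad_demo), (0, 0)),
    mode="constant",
    constant_values=0,
)

# One convolution step on the top-left 2x2 window of the first image,
# using a filter of ones and an arbitrary bias of 0.5.
window_demo = X_demo_pad[0, 0:2, 0:2, :]     # shape (f, f, n_C_prev) = (2, 2, 1)
W_demo = np.ones((2, 2, 1))
z_demo = float(np.sum(window_demo * W_demo) + 0.5)

print(X_demo_pad.shape, z_demo)
```

Because three of the four window entries fall on the zero border, `z_demo` equals the top-left pixel of the first image plus the bias.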

## Forward pass for the Conv Layers

In [7]:
def conv_forward(A_prev, W, b, hparameters):
    """
    Implements the forward propagation for a convolution layer
    
    Arguments:
    A_prev -- output activations of the previous layer, 
        numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
    W -- Weights, numpy array of shape (f, f, n_C_prev, n_C)
    b -- Biases, numpy array of shape (1, 1, 1, n_C)
    hparameters -- python dictionary containing "stride" and "pad"
        
    Returns:
    Z -- conv output, numpy array of shape (m, n_H, n_W, n_C)
    cache -- cache of values needed for the conv_backward() function
    """

    (m, n_H_prev, n_W_prev, n_C_prev) = A_prev.shape
    (f, f, n_C_prev, n_C) = W.shape
    
    # Validate hparameters before reading from it
    if not isinstance(hparameters, dict):
        raise ValueError("`hparameters` must be a dict with 'stride' and 'pad'.")
    if 'stride' not in hparameters or 'pad' not in hparameters:
        raise KeyError("`hparameters` requires the keys 'stride' and 'pad'.")
    stride, pad = hparameters['stride'], hparameters['pad']
    if not (isinstance(stride, int) and stride > 0):
        raise ValueError("`stride` must be an integer > 0.")
    if not (isinstance(pad, int) and pad >= 0):
        raise ValueError("`pad` must be an integer >= 0.")

    n_H = int((n_H_prev - f + 2*pad) / stride) + 1
    n_W = int((n_W_prev - f + 2*pad) / stride) + 1
    Z = np.zeros((m, n_H, n_W, n_C))
    
    A_prev_pad = zero_pad(A_prev, pad)

    for i in range(m):       # for each image in the batch
        a_prev_pad = A_prev_pad[i] # shape (n_H_prev+2pad, n_W_prev+2pad, n_C_prev)

        for h in range(n_H): # iterate over the vertical axis of the output
            vert_start = h * stride
            vert_end   = vert_start + f 
            for w in range(n_W):                       # iterate over the horizontal axis of the output
                horiz_start = w * stride
                horiz_end   = horiz_start + f
                for c in range(n_C):                   # iterate over each filter / output channel
                    a_slice_prev = a_prev_pad[
                        vert_start:vert_end,
                        horiz_start:horiz_end,:]     # shape (f, f, n_C_prev)
                        
                    Z[i, h, w, c] = conv_single_step(a_slice_prev,W[:, :, :, c],b[:, :, :, c])
                    
    cache = (A_prev, W, b, hparameters)
    
    return Z, cache
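
The four nested loops in `conv_forward` are easy to follow but slow in pure Python. As a possible alternative (not what the notebook uses), the sketch below builds all windows at once with `numpy.lib.stride_tricks.sliding_window_view` (NumPy 1.20 or newer) and contracts them with `np.einsum`. Both function names are hypothetical helpers; the loop version is included only as a reference to compare against.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def conv_forward_vectorized(A_prev, W, b, stride=1, pad=0):
    """Loop-free conv forward pass (hypothetical helper, not part of the notebook)."""
    f = W.shape[0]
    A_pad = np.pad(A_prev, ((0, 0), (pad, pad), (pad, pad), (0, 0)))
    # windows has shape (m, H_pad - f + 1, W_pad - f + 1, n_C_prev, f, f)
    windows = sliding_window_view(A_pad, (f, f), axis=(1, 2))
    windows = windows[:, ::stride, ::stride]          # keep every stride-th window
    # Contract over window rows (i), window columns (j) and input channels (c).
    return np.einsum("mhwcij,ijcn->mhwn", windows, W) + b

def conv_forward_loops(A_prev, W, b, stride=1, pad=0):
    """Reference version with explicit loops over output positions."""
    m, n_H_prev, n_W_prev, _ = A_prev.shape
    f, _, _, n_C = W.shape
    n_H = (n_H_prev - f + 2 * pad) // stride + 1
    n_W = (n_W_prev - f + 2 * pad) // stride + 1
    A_pad = np.pad(A_prev, ((0, 0), (pad, pad), (pad, pad), (0, 0)))
    Z = np.zeros((m, n_H, n_W, n_C))
    for h in range(n_H):
        for w in range(n_W):
            patch = A_pad[:, h * stride:h * stride + f, w * stride:w * stride + f, :]
            Z[:, h, w, :] = np.tensordot(patch, W, axes=([1, 2, 3], [0, 1, 2])) + b[0, 0, 0]
    return Z

rng = np.random.default_rng(0)
A_demo = rng.standard_normal((2, 8, 8, 3))
W_demo = rng.standard_normal((3, 3, 3, 4))
b_demo = rng.standard_normal((1, 1, 1, 4))

Z_fast = conv_forward_vectorized(A_demo, W_demo, b_demo, stride=1, pad=1)
Z_ref = conv_forward_loops(A_demo, W_demo, b_demo, stride=1, pad=1)
print(Z_fast.shape, np.allclose(Z_fast, Z_ref))
```

The trade-off is memory versus speed: `sliding_window_view` returns a view, but `einsum` may materialize intermediate buffers, so very large batches can still be costly.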


def pool_forward(A_prev, hparameters, mode = "max"):
    """
    Forward pass for a 2D pooling layer.

    Args:
        A_prev (np.ndarray): Input data of shape (m, n_H_prev, n_W_prev, n_C_prev).
        hparameters (dict): Dictionary with keys:
            - "f" (int): size of the pooling window (f × f).
            - "stride" (int): stride for moving the window.
        mode (str): Pooling mode, either "max" or "average".

    Returns:
        A (np.ndarray): Output of the pooling layer, shape (m, n_H, n_W, n_C_prev).
        cache (tuple): Cached values (A_prev, hparameters) for the backward pass.
    """
    
    (m, n_H_prev, n_W_prev, n_C_prev) = A_prev.shape
    
    f = hparameters["f"]
    stride = hparameters["stride"]


    if mode not in ("max", "average"):
        raise ValueError("`mode` must be 'max' or 'average'")
    if not isinstance(f, int) or f <= 0:
        raise ValueError("`f` must be an integer > 0")
    if not isinstance(stride, int) or stride <= 0:
        raise ValueError("`stride` must be an integer > 0")
    # Make sure (n_H_prev - f) and (n_W_prev - f) are divisible by stride
    if (n_H_prev - f) % stride != 0 or (n_W_prev - f) % stride != 0:
        raise ValueError("Invalid dimensions: check f and stride.")

    
    n_H = int(1 + (n_H_prev - f) / stride)
    n_W = int(1 + (n_W_prev - f) / stride)
    n_C = n_C_prev
    
    A = np.zeros((m, n_H, n_W, n_C))              
    
    for i in range(m):             
        for h in range(n_H):       
            vert_start = h * stride
            vert_end   = vert_start + f
            
            for w in range(n_W):    
                horiz_start = w * stride
                horiz_end   = horiz_start + f
                
                for c in range(n_C): 
                    a_prev_slice = A_prev[i,vert_start:vert_end,horiz_start:horiz_end,c]
                    
                    if mode == "max":
                        A[i, h, w, c] = np.max(a_prev_slice)
                    elif mode == "average":
                        A[i, h, w, c] = np.mean(a_prev_slice)      
    
    cache = (A_prev, hparameters)

    
    return A, cache
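
Both `conv_forward` and `pool_forward` derive the output size from the same formula, `floor((n_prev - f + 2*pad) / stride) + 1`, with `pad = 0` for pooling. The helper below is a small sketch of that arithmetic; `conv_output_size` is a hypothetical name, and the layer settings (a 3×3 convolution with padding 1, then a 2×2 pool with stride 2) are an assumed configuration chosen for illustration on the 64×64 inputs used in this notebook.

```python
def conv_output_size(n_prev: int, f: int, pad: int = 0, stride: int = 1) -> int:
    """Spatial output size: floor((n_prev - f + 2*pad) / stride) + 1."""
    return (n_prev - f + 2 * pad) // stride + 1

# Assumed example configuration, not necessarily the one trained in the notebook.
size_after_conv = conv_output_size(64, f=3, pad=1, stride=1)               # 3x3 conv, pad 1
size_after_pool = conv_output_size(size_after_conv, f=2, pad=0, stride=2)  # 2x2 pool, stride 2

print(size_after_conv, size_after_pool)
```

Note one difference between the two routines: `conv_forward` silently floors the division, whereas `pool_forward` raises a `ValueError` when `(n_prev - f)` is not divisible by `stride`.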

## Backward function for the convolution net 

In [8]:
def conv_backward(dZ, cache):
    """
    Implement the backward propagation for a convolution layer
    
    Arguments:
    dZ -- gradient of the cost with respect to the output of the conv layer (Z), numpy array of shape (m, n_H, n_W, n_C)
    cache -- cache of values needed for the conv_backward(), output of conv_forward()
    
    Returns:
    dA_prev -- gradient of the cost with respect to the input of the conv layer (A_prev),
               numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
    dW -- gradient of the cost with respect to the weights of the conv layer (W)
          numpy array of shape (f, f, n_C_prev, n_C)
    db -- gradient of the cost with respect to the biases of the conv layer (b)
          numpy array of shape (1, 1, 1, n_C)
    """    
    A_prev, W, b, hparameters = cache
    stride = hparameters["stride"]
    pad = hparameters["pad"]
    
    m, n_H_prev, n_W_prev, n_C_prev = A_prev.shape
    f, f, n_C_prev, n_C  = W.shape
    _, n_H, n_W, _= dZ.shape
    
    dA_prev = np.zeros_like(A_prev , dtype=A_prev.dtype)     
    dW  = np.zeros_like(W , dtype=W.dtype)     
    db  = np.zeros_like(b , dtype=b.dtype)      
    
    A_prev_pad   = zero_pad(A_prev, pad)
    dA_prev_pad  = zero_pad(dA_prev, pad)
    
    for i in range(m):
        a_prev_pad = A_prev_pad[i]  # (n_H_prev+2pad, n_W_prev+2pad, n_C_prev)
        da_prev_pad = dA_prev_pad[i] # same shape as a_prev_pad
        
        for h in range(n_H):
            for w in range(n_W):
                for c in range(n_C):
                    # Find the corners of the current slice
                    vert_start  = h * stride
                    vert_end    = vert_start + f
                    horiz_start = w * stride
                    horiz_end   = horiz_start + f
        
                    # Extract the slice from A_prev_pad
                    a_slice = a_prev_pad[vert_start:vert_end,horiz_start:horiz_end,:] 
                    
                    # dA_prev_pad: distribute dZ * W over the corresponding window
                    da_prev_pad[vert_start:vert_end,horiz_start:horiz_end,:] += W[:, :, :, c] * dZ[i, h, w, c]
                    
                    # dW: the gradient of filter c accumulates a_slice * dZ
                    dW[:, :, :, c] += a_slice * dZ[i, h, w, c]
                    
                    # db: the gradient of bias c is the sum of dZ over all examples and positions
                    db[:, :, :, c] += dZ[i, h, w, c]
                    
        if pad != 0:
            dA_prev[i, :, :, :] = da_prev_pad[pad:-pad, pad:-pad, :]
        else:
            # Write into dA_prev (the gradient), not A_prev (the cached input)
            dA_prev[i, :, :, :] = da_prev_pad

    # Making sure your output shape is correct
    assert(dA_prev.shape == (m, n_H_prev, n_W_prev, n_C_prev))
    
    return dA_prev, dW, db
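
A finite-difference check is a common way to validate a hand-written backward pass such as `conv_backward`, including the `pad == 0` case. The sketch below is self-contained and uses its own compact reference functions (`conv_fwd`, `conv_bwd`, `numeric_grad`, `max_grad_errors` are all hypothetical names, not the notebook's routines); the same pattern could be pointed at the notebook's functions instead.

```python
import numpy as np

def conv_fwd(A, W, b, stride, pad):
    """Small reference forward pass (illustrative, not the notebook's conv_forward)."""
    A_pad = np.pad(A, ((0, 0), (pad, pad), (pad, pad), (0, 0)))
    f, n_C = W.shape[0], W.shape[3]
    n_H = (A.shape[1] - f + 2 * pad) // stride + 1
    n_W = (A.shape[2] - f + 2 * pad) // stride + 1
    Z = np.zeros((A.shape[0], n_H, n_W, n_C))
    for h in range(n_H):
        for w in range(n_W):
            patch = A_pad[:, h * stride:h * stride + f, w * stride:w * stride + f, :]
            Z[:, h, w, :] = np.tensordot(patch, W, axes=([1, 2, 3], [0, 1, 2])) + b[0, 0, 0]
    return Z

def conv_bwd(A, W, dZ, stride, pad):
    """Gradients of sum(Z * dZ) with respect to A and W."""
    A_pad = np.pad(A, ((0, 0), (pad, pad), (pad, pad), (0, 0)))
    dA_pad = np.zeros_like(A_pad)
    dW = np.zeros_like(W)
    f = W.shape[0]
    for h in range(dZ.shape[1]):
        for w in range(dZ.shape[2]):
            rows = slice(h * stride, h * stride + f)
            cols = slice(w * stride, w * stride + f)
            dW += np.tensordot(A_pad[:, rows, cols, :], dZ[:, h, w, :], axes=([0], [0]))
            dA_pad[:, rows, cols, :] += np.tensordot(dZ[:, h, w, :], W, axes=([1], [3]))
    # Explicit slice bounds also work when pad == 0 (unlike pad:-pad).
    dA = dA_pad[:, pad:pad + A.shape[1], pad:pad + A.shape[2], :]
    return dA, dW

def numeric_grad(loss_fn, x, step=1e-6):
    """Central finite differences of a scalar loss w.r.t. x (x is perturbed in place)."""
    grad = np.zeros_like(x)
    for idx in np.ndindex(*x.shape):
        old = x[idx]
        x[idx] = old + step
        plus = loss_fn()
        x[idx] = old - step
        minus = loss_fn()
        x[idx] = old
        grad[idx] = (plus - minus) / (2 * step)
    return grad

def max_grad_errors(pad, stride=1, seed=0):
    """Largest absolute gap between analytic and numeric gradients for A and W."""
    rng = np.random.default_rng(seed)
    A = rng.standard_normal((2, 4, 4, 2))
    W = rng.standard_normal((3, 3, 2, 3))
    b = rng.standard_normal((1, 1, 1, 3))
    G = rng.standard_normal(conv_fwd(A, W, b, stride, pad).shape)  # plays the role of dZ
    loss_fn = lambda: float(np.sum(conv_fwd(A, W, b, stride, pad) * G))
    dA, dW = conv_bwd(A, W, G, stride, pad)
    err_A = np.max(np.abs(dA - numeric_grad(loss_fn, A)))
    err_W = np.max(np.abs(dW - numeric_grad(loss_fn, W)))
    return err_A, err_W

errors_no_pad = max_grad_errors(pad=0)
errors_pad_1 = max_grad_errors(pad=1)
print(errors_no_pad, errors_pad_1)
```

The surrogate loss `sum(Z * G)` is linear in both `A` and `W`, so the central differences should agree with the analytic gradients up to floating-point round-off.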





def create_mask_from_window(x):
    """
    Create a boolean mask identifying the maximum entry in a 2D window.

    Args:
        x (np.ndarray): 2D array of shape (f, f).

    Returns:
        np.ndarray: Boolean mask of the same shape as x, with True at the position(s)
                    of the maximum value in x.
    """  
    mask = (x == np.max(x))
    return mask


def distribute_value(dz, shape):
    """
    Evenly distribute a scalar value over a matrix of specified shape.

    Args:
        dz (float): Scalar value to distribute.
        shape (tuple of int): Tuple (n_H, n_W) specifying the output matrix dimensions.

    Returns:
        np.ndarray: Array of shape (n_H, n_W) where each element equals dz / (n_H * n_W).
    """ 
    (n_H, n_W) = shape
    average = dz / (n_H * n_W)

    a = np.ones((n_H, n_W)) * average
    
    return a


def pool_backward(dA, cache, mode = "max"):
    """
    Perform the backward pass for a 2D pooling layer.

    Args:
        dA (np.ndarray): Gradient of the cost with respect to the output of the pooling layer,
                         of shape (m, n_H, n_W, n_C).
        cache (tuple): Tuple containing:
            - A_prev (np.ndarray): Input data to the pooling layer during forward pass,
                                    shape (m, n_H_prev, n_W_prev, n_C_prev).
            - hparameters (dict): Dictionary with keys:
                'stride' (int): Stride used in pooling,
                'f' (int): Size of the pooling window.
        mode (str): Pooling mode, either 'max' or 'average'.

    Returns:
        np.ndarray: Gradient of the cost with respect to the input of the pooling layer,
                    of shape (m, n_H_prev, n_W_prev, n_C_prev).
    """

    A_prev, hparameters = cache
    stride = hparameters["stride"]
    f = hparameters["f"]
    
    (m, n_H_prev, n_W_prev, n_C_prev) = A_prev.shape
    (_, n_H, n_W, n_C) = dA.shape
    
    dA_prev = np.zeros_like(A_prev)
    
    for i in range(m):

        a_prev = A_prev[i]
        for h in range(n_H):
            for w in range(n_W):
                for c in range(n_C):
 
                    vert_start  = h * stride
                    vert_end    = vert_start + f
                    horiz_start = w * stride
                    horiz_end   = horiz_start + f

                    if mode == "max":
 
                        a_prev_slice = a_prev[vert_start:vert_end,horiz_start:horiz_end,c]
                        mask = create_mask_from_window(a_prev_slice)

                        dA_prev[i,vert_start:vert_end,horiz_start:horiz_end,c] += mask * dA[i, h, w, c]

                    elif mode == "average":
                        a = distribute_value(dA[i, h, w, c], (f, f))
                        dA_prev[i,vert_start:vert_end,horiz_start:horiz_end,c] += a

    assert(dA_prev.shape == A_prev.shape)
    
    return dA_prev
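
To make the two pooling gradients concrete, here is a small worked example on a single 2×2 window. It repeats the logic of `create_mask_from_window` and `distribute_value` inline; the window values and the upstream gradient of 4.0 are arbitrary.

```python
import numpy as np

window = np.array([[1.0, 3.0],
                   [3.0, 2.0]])
upstream = 4.0   # arbitrary upstream gradient for this window

# Max pooling: the gradient flows only to the position(s) holding the maximum.
max_mask = (window == np.max(window))
max_grad = max_mask * upstream

# Average pooling: every position receives an equal share.
avg_grad = np.full(window.shape, upstream / window.size)

print(max_grad)
print(avg_grad)
```

The window deliberately contains a tie: with an equality-based mask, both tied maxima receive the full upstream value, so the routed gradient sums to twice the upstream gradient. With continuous-valued activations such ties are uncommon, but they can occur after ReLU, where many entries are exactly zero.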

## Layer classes for the model

In [9]:
class Conv2D:
    """
    2D convolutional layer with built‑in Adam optimizer support.
    """
    def __init__(self, n_C_prev, n_C,
        f, stride = 1, pad = 0, initialization = 'he',scale = 0.01, seed = None):

        """
        Initialize a Conv2D layer.

        Args:
            n_C_prev (int): Number of channels in the input (depth of A_prev).
            n_C (int): Number of filters (output channels).
            f (int): Size of each filter (filters are f x f).
            stride (int, optional): Stride length for the convolution. Defaults to 1.
            pad (int, optional): Number of zero-padding pixels around the input. Defaults to 0.
            initialization (str, optional): Weight init method: 'he' or 'rand'. Defaults to 'he'.
            scale (float, optional): Scaling factor for 'rand' init. Defaults to 0.01.
            seed (int or None, optional): Random seed for reproducibility. Defaults to None.

        Attributes:
            W (np.ndarray): Filters of shape (f, f, n_C_prev, n_C).
            b (np.ndarray): Biases of shape (1, 1, 1, n_C).
            stride (int): Convolution stride.
            pad (int): Padding size.
            cache (tuple): Cached values for backward pass.
            dW (np.ndarray): Gradient of W.
            db (np.ndarray): Gradient of b.
            mW, vW, mb, vb (np.ndarray): Adam first/second moment buffers.
            t (int): Adam timestep counter.
        """
         
        if seed is not None:
            np.random.seed(seed)

        self.n_C_prev = n_C_prev
        self.n_C      = n_C
        self.f        = f
        self.stride   = stride
        self.pad      = pad
        self.name     = 'conv2d'

        if initialization.lower() == 'he':
            factor = np.sqrt(2.0 / (f * f * n_C_prev))
            self.W = np.random.randn(f, f, n_C_prev, n_C) * factor
        elif initialization.lower() == 'rand':
            self.W = np.random.randn(f, f, n_C_prev, n_C) * scale
        else:
            raise ValueError("`initialization` must be 'rand' or 'he'")
        self.b = np.zeros((1, 1, 1, n_C), dtype=self.W.dtype)


        self.cache = None
        self.dW = None
        self.db = None

        self.mW = np.zeros_like(self.W)
        self.vW = np.zeros_like(self.W)
        self.mb = np.zeros_like(self.b)
        self.vb = np.zeros_like(self.b)
        self.t  = 0

    def forward(self, A_prev):
        """
        Perform the forward pass of the convolution.

        Args:
            A_prev (np.ndarray): Input data of shape (m, n_H_prev, n_W_prev, n_C_prev).

        Returns:
            np.ndarray: Convolved output Z of shape (m, n_H, n_W, n_C).
        """
        hparams = {'stride': self.stride, 'pad': self.pad}
        Z, cache = conv_forward(A_prev, self.W, self.b, hparams)
        self.cache = cache
        return Z

    def backward(self, dZ):
        """
        Perform the backward pass of the convolution.

        Args:
            dZ (np.ndarray): Gradient of the loss with respect to the output Z.

        Returns:
            np.ndarray: Gradient with respect to the input A_prev.
        """
        dA_prev, dW, db = conv_backward(dZ, self.cache)
        self.dW, self.db = dW, db
        return dA_prev

    def update_adam(self, lr, beta1=0.9, beta2=0.999, eps=1e-8):
        """
        Update parameters using Adam optimization. Call after backward().

        Args:
            lr (float): Learning rate.
            beta1 (float, optional): Exponential decay rate for the first moment. Defaults to 0.9.
            beta2 (float, optional): Exponential decay rate for the second moment. Defaults to 0.999.
            eps (float, optional): Small constant to prevent division by zero. Defaults to 1e-8.
        """
        self.t += 1

        self.mW = beta1 * self.mW + (1 - beta1) * self.dW
        self.mb = beta1 * self.mb + (1 - beta1) * self.db


        self.vW = beta2 * self.vW + (1 - beta2) * (self.dW ** 2)
        self.vb = beta2 * self.vb + (1 - beta2) * (self.db ** 2)

        mW_hat = self.mW / (1 - beta1 ** self.t)
        mb_hat = self.mb / (1 - beta1 ** self.t)
        vW_hat = self.vW / (1 - beta2 ** self.t)
        vb_hat = self.vb / (1 - beta2 ** self.t)

        self.W -= lr * mW_hat / (np.sqrt(vW_hat) + eps)
        self.b -= lr * mb_hat / (np.sqrt(vb_hat) + eps)
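
The update in `update_adam` can be isolated as a pure function, which makes its behaviour easier to inspect. The sketch below mirrors the same equations; `adam_step` and the sample values are hypothetical and only serve to show that, on the very first step, bias correction makes the step size close to `lr` regardless of the gradient's magnitude.

```python
import numpy as np

def adam_step(param, grad, m, v, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam update; returns the new parameter, moment buffers and timestep."""
    t += 1
    m = beta1 * m + (1 - beta1) * grad           # biased first moment
    v = beta2 * v + (1 - beta2) * grad ** 2      # biased second moment
    m_hat = m / (1 - beta1 ** t)                 # bias-corrected moments
    v_hat = v / (1 - beta2 ** t)
    param = param - lr * m_hat / (np.sqrt(v_hat) + eps)
    return param, m, v, t

w0 = np.array([1.0, -2.0])
g0 = np.array([0.5, -50.0])                      # gradients differing by a factor of 100
w1, m1, v1, t1 = adam_step(w0, g0, np.zeros(2), np.zeros(2), t=0, lr=0.01)

step_sizes = np.abs(w1 - w0)
print(w1, step_sizes)
```

Both coordinates move by roughly `lr` in the direction opposite to the gradient sign, which is why Adam is less sensitive to gradient scale than plain gradient descent.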
    
    







class MaxPool2D:
    """
    2D max pooling layer.

    Performs max pooling over input feature maps; windows do not overlap
    when `stride >= f`.
    """

    def __init__(self, f: int, stride: int = 2):
        """
        Initialize the MaxPool2D layer.

        Args:
            f (int): Size of the pooling window (f × f).
            stride (int): Stride (step) size for both height and width.
        Raises:
            ValueError: If f or stride is not a positive integer.
        """

        if not isinstance(f, int) or f <= 0:
            raise ValueError("`f` must be an integer > 0.")
        if not isinstance(stride, int) or stride <= 0:
            raise ValueError("`stride` must be an integer > 0.")

        self.f = f
        self.stride = stride
        self.cache = None
        self.name = 'poollayer'

    def forward(self, A_prev: np.ndarray):
        """
        Forward pass for max pooling.

        Args:
            A_prev (np.ndarray): Input data of shape (m, H_prev, W_prev, C_prev).

        Returns:
            np.ndarray: Pooled output of shape (m, H, W, C_prev),
                        where H and W depend on f and stride.
        """

        hparams = {'f': self.f, 'stride': self.stride}
        A, cache = pool_forward(A_prev, hparams, mode='max')
        self.cache = cache
        return A
    
    def backward(self, dA):
        """
        Backward pass for max pooling.

        Args:
            dA (np.ndarray): Gradient of the loss with respect to the pooled output,
                             of shape (m, H, W, C_prev).

        Returns:
            np.ndarray: Gradient with respect to the input A_prev,
                        of shape (m, H_prev, W_prev, C_prev).
        """

        A_prev, hparams = self.cache
        dA_prev = pool_backward(dA, (A_prev, hparams), mode='max')
        return dA_prev

    






class ReLU:
    """
    Rectified Linear Unit (ReLU) activation layer.

    Attributes:
        name (str): identifier for this activation.
        cache (np.ndarray): stores input Z for use in backward pass.
    """

    def __init__(self):
        """
        Initialize the ReLU layer.
        """
        self.name = 'relu'
        self.cache = None

    def forward(self, Z):
        """
        Forward pass of ReLU.

        Args:
            Z (np.ndarray): pre-activation input of any shape.

        Returns:
            np.ndarray: activations A, where A = max(0, Z).
        """
        A = np.maximum(0, Z)
        self.cache = Z
        return A

    def backward(self, dA):
        """
        Backward pass of ReLU.

        Args:
            dA (np.ndarray): gradient of the loss with respect to the activation output A.

        Returns:
            np.ndarray: gradient of the loss with respect to Z.
        """
        Z = self.cache
        dZ = dA.copy()
        dZ[Z <= 0] = 0
        return dZ
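
A tiny numeric example of the ReLU forward and backward rules, written inline rather than through the class; the `*_demo` arrays are arbitrary.

```python
import numpy as np

Z_demo = np.array([[-1.5, 0.0, 2.0]])
A_demo = np.maximum(0, Z_demo)            # forward: negative inputs are clipped to 0

dA_demo = np.array([[10.0, 10.0, 10.0]])  # arbitrary upstream gradient
dZ_demo = dA_demo.copy()
dZ_demo[Z_demo <= 0] = 0                  # backward: gradient passes only where Z > 0

print(A_demo, dZ_demo)
```

With the `Z <= 0` mask, an input of exactly zero gets zero gradient, which is one of the usual conventions for the non-differentiable point.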
    





class Flatten:
    """
    Flatten layer that reshapes its input into a 2D array
    (batch_size, features).

    Attributes:
        name (str): Identifier for this layer.
        cache (tuple): Stores the original input shape for backward pass.
    """

    def __init__(self):
        """
        Initialize the Flatten layer.
        """

        self.name = 'flatten'
        self.cache = None

    def forward(self, A_prev):
        """
        Forward pass: flatten the input.

        Args:
            A_prev (np.ndarray): Input array of shape (m, ...),
                                 where m is the batch size and ... 
                                 represents any number of additional dimensions.

        Returns:
            np.ndarray: Flattened output of shape (m, features),
                        where features = product of the ... dimensions.
        """


        self.cache = A_prev.shape
        m = A_prev.shape[0]
        return A_prev.reshape(m, -1)

    def backward(self, dA):
        """
        Backward pass: reshape upstream gradients to the original input shape.

        Args:
            dA (np.ndarray): Gradient of the loss w.r.t. the flattened output,
                             of shape (m, features).

        Returns:
            np.ndarray: Gradient reshaped to the original input shape stored in cache.
        """

        return dA.reshape(self.cache)







class Dense:
    """
    Fully‑connected (dense) layer with softmax activation and Adam optimizer support.
    """

    def __init__(self, n_units, initialization='rand', scale=0.01, seed=None):
        """
        Initialize the Dense layer (weights and Adam buffers will be set on first forward).

        Args:
            n_units (int): Number of output neurons.
            initialization (str): 'he' for He initialization or 'rand' for scaled random.
            scale (float): Scale for 'rand' initialization.
            seed (int or None): Random seed for reproducibility.
        """

        self.n_inputs = None
        self.n_units = n_units
        self.initialization = initialization
        self.scale = scale
        self.seed = seed
        
        self.W = None
        self.b = None
        

        self.dW = None
        self.db = None
        
        self.mW = None
        self.vW = None
        self.mb = None
        self.vb = None
        self.t = 0
        
        self.cache = None
        self.name = 'dense'

    def forward(self, A_prev):
        """
        Forward pass: linear transform followed by softmax.

        Args:
            A_prev (np.ndarray): Input data of shape (m, features).

        Returns:
            np.ndarray: Output probabilities of shape (m, n_units).
        """

        m, features = A_prev.shape

        if self.W is None:
            if self.seed is not None:
                np.random.seed(self.seed)
            self.n_inputs = features

            if self.initialization.lower() == 'he':
                factor = np.sqrt(2.0 / self.n_inputs)
                self.W = np.random.randn(self.n_inputs, self.n_units) * factor
            else:
                self.W = np.random.randn(self.n_inputs, self.n_units) * self.scale
            self.b = np.zeros((1, self.n_units), dtype=self.W.dtype)

            self.mW = np.zeros_like(self.W)
            self.vW = np.zeros_like(self.W)
            self.mb = np.zeros_like(self.b)
            self.vb = np.zeros_like(self.b)

        Z = A_prev @ self.W + self.b
        exps = np.exp(Z - np.max(Z, axis=1, keepdims=True))
        A = exps / np.sum(exps, axis=1, keepdims=True)
        self.cache = (A_prev, A)
        return A

    def backward(self, dA):
        """
        Backward pass: compute gradients for weights, biases, and inputs.

        Args:
            dA (np.ndarray): Upstream gradient of shape (m, n_units).

        Returns:
            np.ndarray: Gradient w.r.t. input A_prev, shape (m, n_inputs).
        """

        A_prev, A = self.cache
        m = A_prev.shape[0]

        S = dA * A
        sum_S = np.sum(S, axis=1, keepdims=True)
        dZ = S - A * sum_S

        self.dW = (A_prev.T @ dZ) / m
        self.db = np.sum(dZ, axis=0, keepdims=True) / m
        dA_prev = dZ @ self.W.T
        return dA_prev

    def update_adam(self, lr, beta1=0.9, beta2=0.999, eps=1e-8):
        """
        Update parameters W and b using Adam optimizer.

        Must be called after backward().

        Args:
            lr (float): Learning rate.
            beta1 (float): Exponential decay rate for first moment.
            beta2 (float): Exponential decay rate for second moment.
            eps (float): Small epsilon to avoid division by zero.
        """

        self.t += 1

        # Update biased first moment estimates
        self.mW = beta1 * self.mW + (1 - beta1) * self.dW
        self.mb = beta1 * self.mb + (1 - beta1) * self.db

        # Update biased second moment estimates
        self.vW = beta2 * self.vW + (1 - beta2) * (self.dW ** 2)
        self.vb = beta2 * self.vb + (1 - beta2) * (self.db ** 2)

        # Compute bias-corrected moments
        mW_hat = self.mW / (1 - beta1 ** self.t)
        mb_hat = self.mb / (1 - beta1 ** self.t)
        vW_hat = self.vW / (1 - beta2 ** self.t)
        vb_hat = self.vb / (1 - beta2 ** self.t)

        # Parameter update
        self.W -= lr * mW_hat / (np.sqrt(vW_hat) + eps)
        self.b -= lr * mb_hat / (np.sqrt(vb_hat) + eps)
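
`Dense.backward` applies the softmax Jacobian to an arbitrary upstream gradient `dA` through `dZ = S - A * sum(S)` with `S = dA * A`. A useful consistency check: if the upstream gradient is that of categorical cross-entropy with respect to the probabilities, `dA = -Y / A`, the result collapses to the familiar `A - Y`. The sketch below verifies this on random logits. Which `dA` the notebook actually passes in is not shown in this section, so treat the choice of upstream gradient as an assumption.

```python
import numpy as np

rng = np.random.default_rng(0)
logits = rng.standard_normal((4, 3))                     # (m, n_units)

# Numerically stable softmax, as in Dense.forward
exps = np.exp(logits - logits.max(axis=1, keepdims=True))
A = exps / exps.sum(axis=1, keepdims=True)

Y = np.eye(3)[[0, 2, 1, 2]]                              # one-hot targets, one row per example

# Assumed upstream gradient: d(cross-entropy)/dA for one-hot targets
dA = -Y / A

# Softmax Jacobian-vector product, same formula as Dense.backward
S = dA * A
dZ = S - A * S.sum(axis=1, keepdims=True)

print(np.allclose(dZ, A - Y))
```

Since `Dense.backward` already divides `dW` and `db` by `m`, the upstream gradient in this setting would be the per-example gradient rather than one that has already been averaged over the batch.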


## Compute cost for the classification task

In [10]:

def compute_cost(A_final , labels , tipe ='CrossEntropy' ):
    """Compute the loss between predicted probabilities and true labels.

    Supports binary cross-entropy for two-class problems or categorical
    cross-entropy for multi-class problems. 

    Args:
        A_final (np.ndarray): Predicted probabilities, shape (n_y, m).
        labels (np.ndarray): True labels, shape (n_y, m) or (m,); will be
            reshaped to (1, m) if necessary.
        tipe (str): Type of cost to compute:
            - 'BinaryCrossEntropy': binary cross-entropy loss.
            - 'CrossEntropy': categorical cross-entropy loss.

    Returns:
        float: The scalar loss value, averaged over the m examples.

    Raises:
        ValueError: If `tipe` is not one of the supported cost types.
    """

    # Ensure labels are shape (n_y, m)
    if labels.ndim == 1:
        labels = labels.reshape(1, -1)

    m = labels.shape[1]
    eps = 1e-15
    A_safe = np.clip(A_final, eps, 1 - eps)

    # Compute base cost
    if tipe == 'BinaryCrossEntropy':
        logprobs = (labels * np.log(A_safe) +
                    (1 - labels) * np.log(1 - A_safe))
        cost = - (1 / m) * np.sum(logprobs)

    elif tipe == 'CrossEntropy':
        logprobs = labels * np.log(A_safe)
        cost = - (1 / m) * np.sum(logprobs)

    else:
        raise ValueError("`tipe` must be 'BinaryCrossEntropy' or 'CrossEntropy'")

    return float(np.squeeze(cost))
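
A handy reference point for the categorical cross-entropy: a model that predicts a uniform distribution over K classes has a loss of ln(K) on any one-hot targets. For the 12 classes in this dataset that is about 2.485, so an untrained network should start near that value. The sketch below computes the loss directly (without calling `compute_cost`) using the `(n_y, m)` layout described in its docstring; the batch size of 5 and the variable names are arbitrary.

```python
import numpy as np

n_classes, m = 12, 5

# Uniform predictions in (n_y, m) layout: every class gets probability 1/12.
A_uniform = np.full((n_classes, m), 1.0 / n_classes)

# Arbitrary one-hot labels, one column per example.
labels_demo = np.zeros((n_classes, m))
labels_demo[np.arange(m) % n_classes, np.arange(m)] = 1.0

loss_uniform = float(-np.sum(labels_demo * np.log(A_uniform)) / m)
print(loss_uniform, np.log(n_classes))
```

The normalization divides by `m`, the number of columns. If the arrays are instead laid out as `(m, n_classes)`, as the batch generator produces them, they would need to be transposed first for the average to be taken over examples rather than classes.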

## Forward and Backward pass 

In [11]:

def conv_net_forward(layers, X):
    """
    Perform forward propagation through a sequence of convolutional network layers.

    Args:
        layers (list): Ordered list of layer objects exposing .name, .forward()
            and, for the types below, .cache:
            - 'conv2d': convolutional layer, caches (A_prev, W, b, hparams)
            - 'poollayer': max‑pooling layer, caches (A_prev, hparams)
            - 'relu': ReLU activation, caches Z
            - 'dense': fully‑connected + softmax, caches (A_prev_flat, A)
            Any other layer (e.g. 'flatten') is run forward without caching.
        X (np.ndarray): Input batch of shape (m, H, W, C) for conv layers.

    Returns:
        A (np.ndarray): Output activations from the final layer.
        caches (dict): Mapping string keys to cached arrays needed for backprop.
    """
    caches = {}
    A = X
    conv, pool, activation, dense = 1, 1, 1, 1

    for layer in layers:
        A_prev = A

        if layer.name == 'dense':
            m = A_prev.shape[0]
            A_prev_flat = A_prev.reshape(m, -1)
            A = layer.forward(A_prev_flat)

            A_prev_c, A_c = layer.cache
            caches['A_prev' + str(dense) + ' dense'] = A_prev_c
            caches['A' + str(dense) + ' dense'] = A_c
            dense += 1

        else:
            A = layer.forward(A_prev)

            if layer.name == 'conv2d':
                A_prev_c, W_c, b_c, hp_c = layer.cache
                caches['A_prev' + str(conv) + ' conv2d'] = A_prev_c
                caches['W'+ str(conv) + ' conv2d'] = W_c
                caches['b'+ str(conv) + ' conv2d'] = b_c
                caches['hparameters' + str(conv) + ' conv2d'] = hp_c
                conv += 1

            elif layer.name == 'poollayer':
                A_prev_c, hp_c = layer.cache
                caches['A_prev'+ str(pool) + ' PoolLayer'] = A_prev_c
                caches['hparameters' + str(pool) + ' PoolLayer'] = hp_c
                pool += 1

            elif layer.name == 'relu':
                Z_c = layer.cache
                caches['Z' + str(activation) + ' ReLu'] = Z_c
                activation += 1

    return A, caches


def conv_net_backward(layers , A_out , y_batch , lr):
    """
    Perform the backward pass through a convolutional network and update trainable parameters.

    Args:
        layers (list): List of layer objects in forward order.
        A_out (np.ndarray): Softmax output from the network, shape (m, n_classes).
        y_batch (np.ndarray): One-hot encoded true labels, shape (m, n_classes).
        lr (float): Learning rate for Adam updates on Conv2D and Dense layers.

    Returns:
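        dict: Gradients for each layer, keyed by type and a per-type counter
            (e.g. 'dW1 conv2d'), useful for debugging. Counters are incremented
            while walking the layers in reverse, so index 1 is the layer of that
            type closest to the output. The flatten entry reuses the dense counter.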
    """
    grads = {}
    dA_l = A_out - y_batch
    conv, pool, activation, dense = 1, 1, 1, 1

    for layer in reversed(layers):
        dA_post = dA_l

        if layer.name == 'dense':
            dA_l = layer.backward(dA_post)

            grads['dW' + str(dense) + ' dense'] = layer.dW
            grads['db' + str(dense) + ' dense'] = layer.db 
            layer.update_adam(lr)
            dense += 1

        elif layer.name == 'flatten':
            dA_l = layer.backward(dA_post)

            grads['dA' + str(dense) + ' flatten'] = dA_l
        
        elif layer.name == 'poollayer':
            dA_l = layer.backward(dA_post)

            grads['dA' + str(pool) + ' PoolLayer'] = dA_l
            pool += 1

        elif layer.name == 'relu':
            dA_l = layer.backward(dA_post)

            grads['dZ' + str(activation) + ' ReLu'] = dA_l
            activation += 1

        elif layer.name == 'conv2d':
            dA_l = layer.backward(dA_post)

            
            grads['dW' + str(conv) + ' conv2d'] = layer.dW
            grads['db' + str(conv) + ' conv2d'] = layer.db 
            layer.update_adam(lr)
            conv += 1

        else:
            raise ValueError(f"Unknown layer: {layer.name}")
    
    return grads
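
`conv_net_backward` starts from `A_out - y_batch`, which is the gradient of the summed cross-entropy with respect to the pre-softmax logits. The following is a minimal numerical check of that identity. The `softmax` and `summed_cross_entropy` helpers are written here for illustration only; the notebook's `Dense` layer applies its own softmax, and whether it also divides by the batch size is not shown in this section.

```python
import numpy as np

def softmax(z):
    """Row-wise softmax with the usual max-shift for numerical stability."""
    z = z - z.max(axis=1, keepdims=True)
    e = np.exp(z)
    return e / e.sum(axis=1, keepdims=True)

def summed_cross_entropy(z, y):
    """Cross-entropy summed (not averaged) over the batch."""
    return -np.sum(y * np.log(softmax(z)))

rng = np.random.default_rng(0)
z = rng.normal(size=(2, 4))        # logits: 2 samples, 4 classes
y = np.eye(4)[[1, 3]]              # one-hot labels

analytic = softmax(z) - y          # the closed-form gradient

# Central finite differences, one logit at a time
numeric = np.zeros_like(z)
h = 1e-6
for i in range(z.shape[0]):
    for j in range(z.shape[1]):
        zp, zm = z.copy(), z.copy()
        zp[i, j] += h
        zm[i, j] -= h
        numeric[i, j] = (summed_cross_entropy(zp, y)
                         - summed_cross_entropy(zm, y)) / (2 * h)

max_err = np.max(np.abs(analytic - numeric))
print(max_err)
```

The same finite-difference pattern can be pointed at a single weight of a layer to sanity-check its `backward` method.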


## Functions to build and train the model

In [12]:

def crear_modelo(filters, pool, n_classes,
                 filter_size=3, stride=1, pad=1,
                 pool_filter=2, pool_stride=2, poo='half'):
    
    """
    Build a convolutional neural network given layer specifications.

    Args:
        filters (list of int): Channel counts, input channels first; each consecutive pair defines one Conv2D layer, e.g. [3, 8, 16] builds 3→8 and 8→16.
        pool (list of int): Pool indicators per conv block; if pool[i] != 0, add pooling after block i.
        n_classes (int): Number of output units in the final Dense layer.
        filter_size (int, optional): Height/width of each Conv2D filter. Default is 3.
        stride (int, optional): Stride for each Conv2D layer. Default is 1.
        pad (int, optional): Zero‑padding for each Conv2D layer. Default is 1.
        pool_filter (int, optional): Filter size for MaxPool2D when poo!='half'. Default is 2.
        pool_stride (int, optional): Stride for MaxPool2D when poo!='half'. Default is 2.
        poo (str, optional): Pooling mode. If 'half', uses (2,2) filter+stride; otherwise uses pool_filter and pool_stride. Default is 'half'.

    Returns:
        list: Sequence of layer objects [Conv2D, ReLU, (MaxPool2D)..., Flatten, Dense].
    """
    
    model = []
    for i in range(len(filters)-1):
        model.append(Conv2D(filters[i], filters[i+1], f=filter_size,
                             stride=stride, pad=pad))
        model.append(ReLU())
        if pool[i] != 0:
            if poo == 'half':
                model.append(MaxPool2D(f=2, stride=2))
            else:
                model.append(MaxPool2D(f=pool_filter, stride=pool_stride))
    model.append(Flatten())
    model.append(Dense(n_units=n_classes, initialization='he'))
    return model



def full_cnn(filters , pool , df_train , epochs ,batch_size , lr):
    """
    Train a CNN end‑to‑end on image data using a simple training loop.

    Args:
        filters (list[int]): Channel counts, input channels first; each consecutive pair defines one Conv2D layer, e.g. [3, 8, 16].
        pool (list[int]): Indicators for adding a pooling layer after each conv block.
        df_train (pd.DataFrame): DataFrame with columns 'filepath' and 'label' for training.
        epochs (int): Number of training epochs (full passes over the dataset).
        batch_size (int): Number of samples per gradient update.
        lr (float): Learning rate for the optimizer.

    Returns:
        tuple:
            model (list): List of layer objects (Conv2D, ReLU, MaxPool2D, Flatten, Dense).
            history (dict): Contains two lists:
                - 'cost': average cross‑entropy per epoch.
                - 'acc':  average accuracy per epoch.
    """
    steps_per_epoch = len(df_train) // batch_size
    gen = batch_generator(df_train, batch_size=batch_size)
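    # crear_modelo requires n_classes; infer it from the distinct training labels
    n_classes = df_train['label'].nunique()
    model = crear_modelo(filters, pool, n_classes)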

    history = {'cost': [], 'acc': []}
    
    for epoch in range(epochs):
         
        epoch_cost = 0.0
        epoch_acc = 0.0

        pbar = tqdm(range(steps_per_epoch),  desc=f"Epoch {epoch+1}/{epochs}", leave=False)

        for _ in pbar:

            X_batch, y_batch = next(gen)
            A_out, _ = conv_net_forward(model, X_batch)
            cost_step = compute_cost(A_out , y_batch , 'CrossEntropy')

            preds = np.argmax(A_out, axis=1)
            trues = np.argmax(y_batch, axis=1)
            acc_step = np.mean(preds == trues)

            epoch_cost += cost_step
            epoch_acc  += acc_step

            conv_net_backward(model , A_out , y_batch , lr)
            
            pbar.set_postfix({
                'cost': f"{cost_step:.4f}",
                'acc':  f"{acc_step:.4f}"})
            
        cost_avg = epoch_cost / steps_per_epoch
        acc_avg  = epoch_acc  / steps_per_epoch
        history['cost'].append(cost_avg)
        history['acc'].append(acc_avg)

        tqdm.write(f"→ Epoch {epoch+1}/{epochs} "f"— cost_avg: {cost_avg:.4f}, acc_avg: {acc_avg:.4f}")

    return model , history
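
The number of features reaching the final `Dense` layer follows from the standard output-size formula `floor((n + 2*pad - f) / stride) + 1`, applied once per convolution and once per pooling step. The following is a minimal sketch, assuming square inputs, the default arguments of `crear_modelo` with `poo='half'`, and that the notebook's layers follow this formula. `out_size` and `flattened_features` are hypothetical helpers.

```python
def out_size(n, f, stride, pad=0):
    """Spatial output size of a convolution or pooling window."""
    return (n + 2 * pad - f) // stride + 1

def flattened_features(n, filters, pool, filter_size=3, stride=1, pad=1):
    """Features after Flatten for an n x n input, mirroring crear_modelo's loop."""
    for i in range(len(filters) - 1):
        n = out_size(n, filter_size, stride, pad)   # Conv2D
        if pool[i] != 0:
            n = out_size(n, 2, 2)                   # MaxPool2D(f=2, stride=2)
    return n * n * filters[-1]

# 8x8 input: conv keeps 8, pool -> 4, conv keeps 4, pool -> 2, then 2*2*16
print(flattened_features(8, [3, 8, 16], [1, 1]))   # 64
```

Checking this number before training helps catch inputs that shrink to zero size after too many pooling steps.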

## Create and test the model

In [13]:

model = [
  Conv2D(n_C_prev=3, n_C=8,  f=3, stride=1, pad=1),
  ReLU(),
  MaxPool2D(f=2, stride=2),

  Conv2D(n_C_prev=8, n_C=16, f=3, stride=1, pad=1),
  ReLU(),
  MaxPool2D(f=2, stride=2),
  
  Flatten(),
  Dense(n_units=12, initialization='he')]


X_batch = np.random.randn(2, 8, 8, 3).astype(np.float32)
batch_size  = X_batch.shape[0]
num_classes = 12

y_int = np.random.randint(0, num_classes, size=batch_size)
y_batch = np.zeros((batch_size, num_classes), dtype=np.float32)
y_batch[np.arange(batch_size), y_int] = 1.0

print("y_batch shape:", y_batch.shape)
print("Sample label indices:", y_int[:5])
print("Sample one‑hot rows:\n", y_batch[:5])

A_out, all_caches = conv_net_forward(model, X_batch)

print("Output shape:", A_out.shape)
print("Keys in caches:", list(all_caches.keys()))


y_batch shape: (2, 12)
Sample label indices: [5 9]
Sample one‑hot rows:
 [[0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]
Output shape: (2, 12)
Keys in caches: ['A_prev1 conv2d', 'W1 conv2d', 'b1 conv2d', 'hparameters1 conv2d', 'Z1 ReLu', 'A_prev1 PoolLayer', 'hparameters1 PoolLayer', 'A_prev2 conv2d', 'W2 conv2d', 'b2 conv2d', 'hparameters2 conv2d', 'Z2 ReLu', 'A_prev2 PoolLayer', 'hparameters2 PoolLayer', 'A_prev1 dense', 'A1 dense']


In [14]:
compute_cost(A_out , y_batch , 'CrossEntropy')

0.8494814124497543

In [15]:
all_caches['A_prev1 conv2d']

array([[[[-1.08535635e+00,  8.84865820e-01, -2.21738863e+00],
         [-4.36401099e-01, -1.50021946e+00, -1.71337533e+00],
         [-1.85090685e+00,  1.30147055e-01, -3.37578982e-01],
         [ 1.53737918e-01,  1.02734935e+00, -1.18749750e+00],
         [ 4.15060997e-01,  6.46146297e-01,  1.33408284e+00],
         [ 2.18879044e-01,  5.83041191e-01, -3.27038527e-01],
         [-1.08417891e-01,  9.85066772e-01,  8.67435873e-01],
         [-2.60802007e+00, -8.33135784e-01,  8.35995793e-01]],

        [[ 1.67270911e+00, -1.30358076e+00, -4.56510335e-01],
         [-1.70925990e-01,  3.69940192e-01, -9.68445241e-01],
         [ 2.79843032e-01,  7.24299312e-01,  1.09825361e+00],
         [-1.09861255e+00, -5.84066212e-01,  6.63183987e-01],
         [-1.68721192e-02,  1.98960721e-01, -1.26223409e+00],
         [-7.63742626e-01,  5.46435237e-01, -6.78651690e-01],
         [ 9.95025277e-01,  1.65827310e+00,  6.72506750e-01],
         [-3.98395121e-01,  3.90984893e-01,  3.19868493e+00]],

    

In [16]:
grads = conv_net_backward(model , A_out , y_batch , 0.001)
print("Keys in grads:", list(grads.keys()))

Keys in grads: ['dW1 dense', 'db1 dense', 'dA2 flatten', 'dA1 PoolLayer', 'dZ1 ReLu', 'dW1 conv2d', 'db1 conv2d', 'dA2 PoolLayer', 'dZ2 ReLu', 'dW2 conv2d', 'db2 conv2d']
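
The numeric suffix in each `grads` key counts from the output side, because `conv_net_backward` increments its counters while iterating over `reversed(layers)`. With the two convolutional layers above, `'dW1 conv2d'` belongs to the last `Conv2D` (8→16) and `'dW2 conv2d'` to the first (3→8). The following is a minimal sketch that maps such a key back to forward order; `forward_index` is a hypothetical helper.

```python
def forward_index(backward_index, n_layers_of_type):
    """Convert a counter assigned in reverse order to a 1-based forward position."""
    return n_layers_of_type - backward_index + 1

n_conv = 2  # number of Conv2D layers in the model above
for key in ['dW1 conv2d', 'dW2 conv2d']:
    k = int(key.split()[0][2:])   # 'dW1' -> 1
    print(key, '-> forward conv layer', forward_index(k, n_conv))
```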


In [17]:
grads['dW2 conv2d']

array([[[[-0.04701317, -0.06372207,  0.05919323,  0.04804708,
          -0.03173059,  0.0477537 ,  0.04008454,  0.07483662],
         [ 0.0879787 ,  0.01252406,  0.00900483, -0.0046065 ,
           0.05733547,  0.01931735, -0.03816056, -0.14284468],
         [-0.0024332 ,  0.00409315, -0.02246164, -0.02046628,
          -0.00387669, -0.04575265,  0.02427689, -0.06683799]],

        [[ 0.03052255,  0.0315146 , -0.07496332,  0.09032764,
           0.03411074, -0.02634919,  0.05656244,  0.03023276],
         [-0.02191041, -0.02229101,  0.07038713,  0.06335997,
          -0.04141929, -0.00505811,  0.00635207, -0.00802328],
         [-0.00126535, -0.0255095 ,  0.08382687,  0.03290524,
           0.03654223, -0.07992808, -0.02312746, -0.10790746]],

        [[-0.02733469,  0.04457072, -0.05207625, -0.03088645,
           0.06113013, -0.03147708,  0.00330668, -0.05127046],
         [ 0.02326311, -0.02718658, -0.09092722, -0.05660406,
          -0.04673655,  0.00944437,  0.07034207, -0.0511032

## Train the model

In [None]:
filters = [X_batch.shape[3] , 8 , 16]
pool = [1 , 1]  # one flag per conv block (len(filters) - 1 blocks)
batch_size = 16
lr = 0.001

model, history = full_cnn(filters , pool , train_df , 5 , batch_size , lr)

## Pablo Reyes
