In [None]:
import os
import numpy as np

from PIL import Image
from sklearn.utils import shuffle

from net.activations.relu import ReLU
from net.models.sequential import Sequential
from net.losses.cross_entropy import CrossEntropy
from net.optimizers.adam import Adam
from net.layers import Parameter, Dropout, Dense, Flatten, MaxPool2D, Conv2D

In [None]:
def load_dataset(root_dir, image_size=(28, 28), max_per_class=50):
    """Load grayscale digit images from per-class sub-folders into arrays.

    ``root_dir`` must contain ten folders named ``0`` .. ``9``; every ``.png`` /
    ``.jpg`` file in folder ``k`` becomes one sample with label ``k``.

    Args:
        root_dir: Directory holding the class folders.
        image_size: ``(width, height)`` each image is resized to.
        max_per_class: Upper bound on the number of files used per class,
            taken from the start of the sorted file list (``None`` keeps all).

    Returns:
        Tuple ``(X, y)``: ``X`` is a float array of shape
        ``(N, 1, height, width)`` with pixel values scaled to ``[0, 1]``;
        ``y`` is the matching integer label array of shape ``(N,)``.
        When no image is found both arrays are empty but keep that layout.

    Raises:
        FileNotFoundError: Raised by ``os.listdir`` when a class folder is missing.
    """
    images, labels = [], []
    for label in range(10):
        folder = os.path.join(root_dir, str(label))
        # Sort first so the per-class cap always selects the same files.
        files = sorted(
            f for f in os.listdir(folder) if f.endswith((".png", ".jpg"))
        )[:max_per_class]

        for filename in files:
            path = os.path.join(folder, filename)
            # Image.open is lazy and holds the file handle; the context manager
            # releases it as soon as the pixels have been copied out.
            with Image.open(path) as image:
                pixels = np.array(image.convert("L").resize(image_size))
            images.append(pixels / 255.0)  # 8-bit grayscale → [0, 1]
            labels.append(label)

    if not images:
        # np.array([]) is 1-D and cannot be rearranged into (N, C, H, W);
        # return explicitly shaped empty arrays instead of crashing.
        width, height = image_size
        return np.empty((0, 1, height, width)), np.empty((0,), dtype=int)

    X = np.stack(images)[:, np.newaxis, :, :]  # (N, H, W) → (N, C=1, H, W)
    y = np.array(labels)
    return X, y

In [None]:
# Read each split from disk and shuffle it with a fixed seed so that the
# sample order is the same on every run of the notebook.
train_X, train_y = shuffle(
    *load_dataset("data/training", max_per_class=50),
    random_state=42,
)
test_X, test_y = shuffle(*load_dataset("data/test"), random_state=42)

# Sanity check: array shapes and how many samples each label contributes.
print(f"Train: {train_X.shape}, {train_y.shape}")
print("Unique labels in train:", np.unique(train_y, return_counts=True))

In [None]:
def one_hot(y, num_classes=10):
    """Convert integer class labels into one-hot rows.

    Args:
        y: Integer label(s) used to index rows of an identity matrix.
        num_classes: Width of each one-hot vector.

    Returns:
        Float array whose last axis has length ``num_classes``; the entry at
        the label's position is 1.0 and all others are 0.0.
    """
    identity = np.identity(num_classes)
    # Row k of the identity matrix is exactly the one-hot vector of class k.
    return identity[y]


def get_params_from_model(model):
    """Collect the distinct ``Parameter`` objects held by a model's layers.

    Every attribute name reported by ``dir(layer)`` is inspected for each layer
    in ``model.layers``; values that are ``Parameter`` instances are gathered in
    discovery order.

    Args:
        model: Object exposing an iterable ``layers`` attribute.

    Returns:
        List of ``Parameter`` objects, each appearing once even if it is
        reachable under several attribute names or from several layers.
    """
    params = []
    seen_ids = set()
    for layer in model.layers:
        for attr in dir(layer):
            # dir() may list names whose lookup raises AttributeError; the
            # default keeps one such attribute from aborting the whole scan.
            candidate = getattr(layer, attr, None)
            if not isinstance(candidate, Parameter):
                continue
            # De-duplicate by identity so a consumer iterating over the list
            # (e.g. an optimizer) never handles the same object twice.
            if id(candidate) in seen_ids:
                continue
            seen_ids.add(id(candidate))
            params.append(candidate)
    return params

In [None]:
# Convolutional feature extractor: two (conv → ReLU → max-pool) stages.
feature_layers = [
    Conv2D(in_channels=1, out_channels=8, kernel_size=3, padding=1),   # (1, 28, 28) → (8, 28, 28)
    ReLU(),
    MaxPool2D(kernel_size=2),                                          # (8, 28, 28) → (8, 14, 14)
    Conv2D(in_channels=8, out_channels=16, kernel_size=3, padding=1),  # (8, 14, 14) → (16, 14, 14)
    ReLU(),
    MaxPool2D(kernel_size=2),                                          # (16, 14, 14) → (16, 7, 7)
]

# Classifier head: flatten the feature maps and map them to 10 class scores.
classifier_layers = [
    Flatten(),              # (16, 7, 7) → (784)
    Dense(16 * 7 * 7, 64),  # (784) → (64)
    ReLU(),
    Dropout(0.3),
    Dense(64, 10),          # (64) → (10)
]

model = Sequential(feature_layers + classifier_layers)

loss_fn = CrossEntropy()

# The optimizer updates every Parameter found on the model's layers.
trainable_params = get_params_from_model(model)
optimizer = Adam(params=trainable_params, lr=1e-3)

In [None]:
def accuracy(pred, target):
    """Return the fraction of rows whose arg-max class matches the target's.

    Args:
        pred: 2-D scores, one row per sample and one column per class.
        target: 2-D targets with the same layout (e.g. one-hot rows).

    Returns:
        Mean of the per-row "predicted class == target class" comparison.
    """
    hits = np.argmax(pred, axis=1) == np.argmax(target, axis=1)
    return np.mean(hits)

# --- Training configuration -------------------------------------------------
epochs = 10
batch_size = 64
num_classes = 10
# NOTE(review): `lr` is not read anywhere in this cell; the optimizer was
# already constructed with its own learning rate, so editing this value alone
# has no effect on training.
lr = 1e-3

# One-hot targets, aligned index-for-index with train_X / test_X.
train_y_oh = one_hot(train_y, num_classes)
test_y_oh = one_hot(test_y, num_classes)

# Single-sample slice; not used by the loop below (presumably left over from
# single-example debugging — kept in case other cells rely on it).
x_one = train_X[0:1]
y_one = train_y_oh[0:1]

train_loss_log = []
test_acc_log = []

for epoch in range(epochs):
    total_loss = 0
    total_batches = 0

    # Shuffle into per-epoch copies instead of rebinding train_X / train_y_oh.
    # Overwriting train_X would leave it out of step with train_y, so a re-run
    # of this cell (which rebuilds train_y_oh from train_y) would pair images
    # with the wrong labels. Seeding by epoch keeps runs reproducible.
    epoch_X, epoch_y_oh = shuffle(train_X, train_y_oh, random_state=epoch)

    for i in range(0, len(epoch_X), batch_size):
        x_batch = epoch_X[i:i+batch_size]
        y_batch = epoch_y_oh[i:i+batch_size]

        output = model.forward(x_batch)
        if i == 0:
            # Quick look at the first batch of each epoch: a model stuck on a
            # single predicted class shows up here immediately.
            print("Pred :", np.argmax(output[:5], axis=1))
            print("GT   :", np.argmax(y_batch[:5], axis=1))
            print("Unique preds in batch:", np.unique(np.argmax(output, axis=1)))

        loss = loss_fn.forward(output, y_batch)
        grad = loss_fn.backward()
        model.backward(grad)
        optimizer.step()
        # Clear gradients so the next batch does not accumulate onto them.
        optimizer.zero_grad()

        total_loss += loss
        total_batches += 1

    # Mean of the per-batch losses (a short final batch counts like any other).
    avg_loss = total_loss / total_batches
    train_loss_log.append(avg_loss)

    # NOTE(review): nothing in this notebook switches the model to an
    # inference mode, so Dropout may still be active during this forward pass
    # — confirm against the `net` package and disable it for evaluation.
    test_out = model.forward(test_X)
    test_acc = accuracy(test_out, test_y_oh)
    test_acc_log.append(test_acc)

    print(f"[Epoch {epoch+1:02d}] Loss: {avg_loss:.2f} | Test Accuracy: {test_acc:.2f}")

# TODO: Fix the convergence issue.