In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.optimizer as optimizer
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, Subset, ConcatDataset,random_split
from tqdm import tqdm
import math
import numpy as np
import os
import time
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import confusion_matrix, accuracy_score
from typing import Any

In [None]:
SEED = 42
DATASET_DIR = "./dataset"
RESULTS_DIR = "./results_mnist_fcnn"
os.makedirs(RESULTS_DIR, exist_ok=True)

# DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE = torch.device("cpu")
print("Device:", DEVICE)

Device: cpu


In [3]:
ARCHITECTURES: dict[str, list[int]] = {
    "arch_3h": [256, 128, 64],
    "arch_4h": [512, 256, 128, 64],
    "arch_5h": [512, 256, 128, 64, 32],
}
CLASSES: set[int] = {
    0,
    1,
    2,
    3,
    4,
}  # NOTE: if we want custom class then use some transformation as it should be from 0 to N

In [4]:
LR = 0.001
MOMENTUM = 0.9
RMSPROP_ALPHA = 0.99  # beta
EPS = 1e-8
ADAM_BETAS = (0.9, 0.999)
STOP_THRESHOLD = 10e-4  # 0.001
MAX_EPOCHS_SAFETY = 20

OPTIMIZERS = [
    "SGD_stochastic",  # batch_size=1
    "BatchGD",         # batch_size = N_train (full-batch)
    "SGD_momentum",    # batch_size=1, momentum=0.9
    "SGD_NAG",         # batch_size=1, nesterov momentum=0.9
    "RMSProp",         # batch_size=1, alpha=0.99 eps=1e-8
    "Adam"             # batch_size=1, betas=(0.9,0.999), eps=1e-8
]

In [5]:
def get_optimizer(name: str, params: optimizer.ParamsT, lr: float) -> optimizer.Optimizer:
    if name == "SGD_stochastic":
        return optim.SGD(params, lr=lr)
    if name == "BatchGD":
        return optim.SGD(params, lr=lr)
    if name == "SGD_momentum":
        return optim.SGD(params, lr=lr, momentum=MOMENTUM, nesterov=False)
    if name == "SGD_NAG":
        return optim.SGD(params, lr=lr, momentum=MOMENTUM, nesterov=True)
    if name == "RMSProp":
        return optim.RMSprop(params, lr=lr, alpha=RMSPROP_ALPHA, eps=EPS)
    if name == "Adam":
        return optim.Adam(params, lr=lr, betas=ADAM_BETAS, eps=EPS)
    raise ValueError(f"Unknown optimizer: {name}")


In [6]:
def load_filtered_mnist() -> tuple[Subset[torchvision.datasets.MNIST], Subset[torchvision.datasets.MNIST]]:
    image_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)) # Standard normalization for MNIST
    ])
    train_dataset = torchvision.datasets.MNIST(
        root=DATASET_DIR,  # Directory to save the data
        train=True,  # Specify training data
        download=True,  # Download the data if not present
        transform=image_transform,
    )
    train_dataset = Subset[torchvision.datasets.MNIST](
        train_dataset,
        [i for i, y in enumerate(train_dataset.targets) if y.item() in CLASSES],
    )
    test_dataset = torchvision.datasets.MNIST(
        root=DATASET_DIR,
        train=False,  # Specify test data
        download=True,
        transform=image_transform,
    )
    test_dataset = Subset[torchvision.datasets.MNIST](
        test_dataset,
        [i for i, y in enumerate(test_dataset.targets) if y.item() in CLASSES],
    )
    full_dataset = ConcatDataset[torchvision.datasets.MNIST](
        [train_dataset, test_dataset]
    )  # as we want 8:2

    total_size = len(full_dataset)
    train_size = int(0.8 * total_size)
    test_size = total_size - train_size

    train_set, test_set = random_split(
        full_dataset,
        [train_size, test_size],
        generator=torch.Generator().manual_seed(SEED)  # reproducibility
    )

    return train_set, test_set

train_dataset, test_dataset = load_filtered_mnist()
N_train = len(train_dataset)
N_test = len(test_dataset)
print(f"Filtered dataset sizes -> Train: {N_train}, Test: {N_test}, classes={CLASSES}")

Filtered dataset sizes -> Train: 28588, Test: 7147, classes={0, 1, 2, 3, 4}


In [7]:
class FCNN(nn.Module):
    def __init__(
        self,
        input_dim: int,
        hidden_sizes: list[int],
        output_dim: int,
        activation: type[nn.Module] = nn.ReLU,
    ):
        super().__init__()
        layers: list[nn.Module] = []
        prev: int = input_dim
        for h in hidden_sizes:
            layers.append(nn.Linear(prev, h))
            layers.append(activation())
            prev = h
        layers.append(nn.Linear(prev, output_dim))
        # NOTE: CrossEntropyLoss expects raw logits, so no softmax here
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

In [8]:
def eval_model(
    model: FCNN, dataloader: DataLoader[torchvision.datasets.MNIST]
) -> tuple[np.ndarray, float]:
    model.eval()
    preds_list: list[np.ndarray] = []
    labels_list: list[np.ndarray] = []
    with torch.no_grad():
        Xb: torch.Tensor
        yb: torch.Tensor
        logits: torch.Tensor
        for Xb, yb in dataloader:
            Xb = Xb.reshape(-1, 28*28).to(DEVICE)
            logits = model(Xb)
            preds = logits.argmax(dim=1).cpu().numpy()
            preds_list.append(preds)
            labels_list.append(yb.numpy())
    preds_all = np.concatenate(preds_list)
    labels_all = np.concatenate(labels_list)
    acc = float((preds_all == labels_all).mean())
    return preds_all, acc

In [9]:
results: list[dict[str, Any]] = []
plots_data: dict[str, dict[str, tuple[list[int], list[float]]]] = {}

loss_fn = nn.CrossEntropyLoss()

for arch_name, hidden_sizes in ARCHITECTURES.items():
    print(f"\n=== ARCH {arch_name} hidden={hidden_sizes}")
    
    base_model = FCNN(784, hidden_sizes, output_dim=len(CLASSES)).to(DEVICE)
    plots_data[arch_name] = {}

    for opt_name in OPTIMIZERS:
        print(f"  -> Optimizer: {opt_name}")

        # reload model from same initial state
        model = FCNN(784, hidden_sizes, output_dim=len(CLASSES)).to(DEVICE)
        model.load_state_dict(base_model.state_dict())

        # set optimizer and batch size policy
        if opt_name == "BatchGD":
            batch_size = N_train  # full-batch
            shuffle = False
        else:
            batch_size = 1
            shuffle = True

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
        test_loader  = DataLoader(test_dataset,  batch_size=128, shuffle=False)  # for evaluation; 128 is fine

        curr_optimizer = get_optimizer(opt_name, model.parameters(), LR)

        epoch = 0
        prev_avg_loss = None
        avg_loss_history: list[float] = []
        epoch_history: list[int] = []
        converged = False
        start_time = time.time()
        avg_loss: float | None = None

        # Run epochs until stopping criterion triggered (or safety cap)
        for _ in tqdm(range(MAX_EPOCHS_SAFETY)):
            epoch += 1
            model.train()
            running_loss = 0.0
            total_samples = 0
            # iterate over batches
            Xb: torch.Tensor
            yb: torch.Tensor
            for Xb, yb in train_loader:
                Xb = Xb.reshape(-1, 28*28).to(DEVICE)
                yb = yb.to(DEVICE)

                curr_optimizer.zero_grad()
                logits = model(Xb)
                loss = loss_fn(logits, yb)
                loss.backward()
                curr_optimizer.step()

                b = Xb.shape[0]
                running_loss += float(loss.item()) * b
                total_samples += b

            avg_loss = running_loss / (total_samples + 1e-12)
            avg_loss_history.append(avg_loss)
            epoch_history.append(epoch)

            # compute train/val accuracy for monitoring
            _, train_acc = eval_model(model, DataLoader(train_dataset, batch_size=256, shuffle=False))
            _, val_acc   = eval_model(model, test_loader)

            # stopping based on absolute difference in successive epoch average training error
            if prev_avg_loss is not None:
                if abs(avg_loss - prev_avg_loss) < STOP_THRESHOLD:
                    converged = True

            prev_avg_loss = avg_loss

            if converged:
                break
        # safety cap: not used as stopping criterion, only to prevent infinite loops
        else:
            print(f"    Safety cap reached ({MAX_EPOCHS_SAFETY}). Stopping.")

        assert avg_loss is not None, ":-("

        elapsed = time.time() - start_time

        # final evaluation & confusion matrices
        preds_train, train_acc_final = eval_model(model, DataLoader(train_dataset, batch_size=256, shuffle=False))
        preds_test, test_acc_final   = eval_model(model, test_loader)

        # labels recovered from dataset subsets
        train_labels = np.concatenate([yb.numpy() for _, yb in DataLoader(train_dataset, batch_size=256, shuffle=False)])
        test_labels  = np.concatenate([yb.numpy() for _, yb in DataLoader(test_dataset,  batch_size=128, shuffle=False)])

        cm_train = confusion_matrix(train_labels, preds_train)
        cm_test  = confusion_matrix(test_labels, preds_test)

        print(f"    done: epochs={epoch}, avg_train_loss={avg_loss:.6f}, train_acc={train_acc_final:.4f}, val_acc={test_acc_final:.4f}, time={elapsed:.1f}s")

        results.append({
            "architecture": arch_name,
            "hidden_sizes": hidden_sizes,
            "optimizer": opt_name,
            "lr": LR,
            "epochs_to_converge": epoch,
            "final_avg_train_loss": avg_loss,
            "train_accuracy": train_acc_final,
            "val_accuracy": test_acc_final,
            "time_s": elapsed,
            "cm_train": cm_train,
            "cm_test": cm_test
        })

        plots_data[arch_name][opt_name] = (epoch_history, avg_loss_history)


=== ARCH arch_3h hidden=[256, 128, 64]
  -> Optimizer: SGD_stochastic


 60%|██████    | 12/20 [06:44<04:29, 33.68s/it]


    done: epochs=13, avg_train_loss=0.000938, train_acc=1.0000, val_acc=0.9924, time=404.1s
  -> Optimizer: BatchGD


  5%|▌         | 1/20 [00:07<02:31,  7.98s/it]


    done: epochs=2, avg_train_loss=1.608717, train_acc=0.2229, val_acc=0.2201, time=8.0s
  -> Optimizer: SGD_momentum


 35%|███▌      | 7/20 [05:18<09:51, 45.53s/it]


    done: epochs=8, avg_train_loss=0.006911, train_acc=0.9987, val_acc=0.9927, time=318.7s
  -> Optimizer: SGD_NAG


 45%|████▌     | 9/20 [07:24<09:02, 49.34s/it]


    done: epochs=10, avg_train_loss=0.004535, train_acc=0.9988, val_acc=0.9924, time=444.0s
  -> Optimizer: RMSProp


100%|██████████| 20/20 [31:01<00:00, 93.09s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=0.402774, train_acc=0.9585, val_acc=0.9544, time=1861.7s
  -> Optimizer: Adam


100%|██████████| 20/20 [54:39<00:00, 163.98s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=0.162206, train_acc=0.9835, val_acc=0.9800, time=3279.6s

=== ARCH arch_4h hidden=[512, 256, 128, 64]
  -> Optimizer: SGD_stochastic


 45%|████▌     | 9/20 [06:17<07:41, 41.99s/it]


    done: epochs=10, avg_train_loss=0.001463, train_acc=0.9999, val_acc=0.9919, time=377.9s
  -> Optimizer: BatchGD


  5%|▌         | 1/20 [00:08<02:39,  8.37s/it]


    done: epochs=2, avg_train_loss=1.614200, train_acc=0.1980, val_acc=0.2074, time=8.4s
  -> Optimizer: SGD_momentum


 50%|█████     | 10/20 [10:50<10:50, 65.09s/it]


    done: epochs=11, avg_train_loss=0.003682, train_acc=0.9990, val_acc=0.9936, time=650.9s
  -> Optimizer: SGD_NAG


 30%|███       | 6/20 [07:18<17:04, 73.15s/it]


    done: epochs=7, avg_train_loss=0.007602, train_acc=0.9976, val_acc=0.9916, time=438.9s
  -> Optimizer: RMSProp


100%|██████████| 20/20 [50:49<00:00, 152.49s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=2.531674, train_acc=0.5061, val_acc=0.5073, time=3049.8s
  -> Optimizer: Adam


100%|██████████| 20/20 [1:45:03<00:00, 315.18s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=0.617513, train_acc=0.9553, val_acc=0.9528, time=6303.5s

=== ARCH arch_5h hidden=[512, 256, 128, 64, 32]
  -> Optimizer: SGD_stochastic


 45%|████▌     | 9/20 [06:34<08:01, 43.82s/it]


    done: epochs=10, avg_train_loss=0.000810, train_acc=0.9999, val_acc=0.9919, time=394.3s
  -> Optimizer: BatchGD


  5%|▌         | 1/20 [00:08<02:35,  8.18s/it]


    done: epochs=2, avg_train_loss=1.616928, train_acc=0.2145, val_acc=0.2201, time=8.2s
  -> Optimizer: SGD_momentum


 30%|███       | 6/20 [06:51<16:01, 68.66s/it]


    done: epochs=7, avg_train_loss=0.010006, train_acc=0.9990, val_acc=0.9927, time=411.9s
  -> Optimizer: SGD_NAG


 35%|███▌      | 7/20 [09:24<17:29, 80.70s/it]


    done: epochs=8, avg_train_loss=0.010637, train_acc=0.9971, val_acc=0.9917, time=564.9s
  -> Optimizer: RMSProp


100%|██████████| 20/20 [51:18<00:00, 153.91s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=1.762907, train_acc=0.5705, val_acc=0.5746, time=3078.2s
  -> Optimizer: Adam


100%|██████████| 20/20 [1:43:44<00:00, 311.22s/it]


    Safety cap reached (20). Stopping.
    done: epochs=20, avg_train_loss=0.473146, train_acc=0.9604, val_acc=0.9565, time=6224.4s


In [10]:
df_rows: list[dict[str, Any]] = []
for r in results:
    df_rows.append({
        "architecture": r["architecture"],
        "hidden_sizes": ",".join(map(str, r["hidden_sizes"])),
        "optimizer": r["optimizer"],
        "lr": r["lr"],
        "epochs_to_converge": r["epochs_to_converge"],
        "final_avg_train_loss": r["final_avg_train_loss"],
        "train_accuracy": r["train_accuracy"],
        "val_accuracy": r["val_accuracy"],
        "time_s": r["time_s"]
    })
df = pd.DataFrame(df_rows)
csv_path = os.path.join(RESULTS_DIR, "summary_results.csv")
df.to_csv(csv_path, index=False)
print("Saved summary CSV:", csv_path)

Saved summary CSV: ./results_mnist_fcnn/summary_results.csv


In [11]:
# plot curves per architecture
for arch_name, opt_data in plots_data.items():
    plt.figure(figsize=(8, 6))
    for opt_name, (epochs, losses) in opt_data.items():
        plt.plot(epochs, losses, label=opt_name)
    plt.xlabel("Epoch")
    plt.ylabel("Average training loss")
    plt.title(f"Avg training loss vs epoch ({arch_name})")
    plt.legend()
    plt.grid(True)
    pth = os.path.join(RESULTS_DIR, f"{arch_name}_loss_curves.png")
    plt.savefig(pth)
    plt.close()
    print("Saved plot:", pth)

Saved plot: ./results_mnist_fcnn/arch_3h_loss_curves.png
Saved plot: ./results_mnist_fcnn/arch_4h_loss_curves.png
Saved plot: ./results_mnist_fcnn/arch_5h_loss_curves.png


In [12]:
# find best architecture by validation accuracy and save confusion matrices
best = max(results, key=lambda x: x["val_accuracy"])
best_report = {
    "best_architecture": best["architecture"],
    "hidden_sizes": best["hidden_sizes"],
    "best_optimizer": best["optimizer"],
    "best_val_accuracy": best["val_accuracy"],
    "train_accuracy": best["train_accuracy"],
    "cm_train": best["cm_train"].tolist(),
    "cm_test": best["cm_test"].tolist()
}

In [13]:
import json
with open(os.path.join(RESULTS_DIR, "best_model_report.json"), "w") as f:
    json.dump(best_report, f, indent=2)
print("Saved best model report (JSON).")

print("All done. Results in:", RESULTS_DIR)

Saved best model report (JSON).
All done. Results in: ./results_mnist_fcnn
