📝 **Author:** Amirhossein Heydari - 📧 **Email:** <amirhosseinheydari78@gmail.com> - 📍 **Origin:** [mr-pylin/pytorch-workshop](https://github.com/mr-pylin/pytorch-workshop)

---


**Table of contents**<a id='toc0_'></a>    
- [Dependencies](#toc1_)    
- [Classification of MNIST Dataset](#toc2_)    
  - [Utility Function to Store Metrics](#toc2_1_)    
  - [Hyperparameters](#toc2_2_)    
  - [Pre-Processing](#toc2_3_)    
    - [Load MNIST Dataset](#toc2_3_1_)    
    - [Split Train Set into Train and Validation Subsets](#toc2_3_2_)    
    - [Normalization](#toc2_3_3_)    
      - [Append Normalization to the Transforms](#toc2_3_3_1_)    
    - [Create DataLoaders](#toc2_3_4_)    
  - [Custom MLP Model](#toc2_4_)    
  - [Train and Validation Loop](#toc2_5_)    
    - [Analyze Loss and Accuracy over Epochs](#toc2_5_1_)    
  - [Test Loop](#toc2_6_)    
    - [Plot Top_1 Confusion Matrix](#toc2_6_1_)    
    - [Classification Report](#toc2_6_2_)    
  - [Prediction](#toc2_7_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

# <a id='toc1_'></a>[Dependencies](#toc0_)


In [1]:
from collections.abc import Callable
from pathlib import Path

import matplotlib.pyplot as plt
import torch
from sklearn.metrics import classification_report
from torch import nn, optim
from torch.utils.data import DataLoader, random_split
from torchinfo import summary
from torchmetrics.classification import MulticlassAccuracy, MulticlassConfusionMatrix
from torchvision.datasets import MNIST
from torchvision.transforms import v2

In [2]:
# set a seed for deterministic results
seed = 42
# seed PyTorch's global random number generator
torch.manual_seed(seed)
# cuDNN flags: turn the deterministic flag on and the benchmark flag off
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
# use "cuda" when torch reports it as available, otherwise fall back to "cpu"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# log (a bare expression on the last line of a cell is displayed by the notebook)
device

In [None]:
# update paths as needed based on your project structure
# DATASET_DIR: root directory passed to the MNIST dataset constructors
DATASET_DIR = Path("../../../../datasets/")
# LOG_DIR: directory where the CSV metric files and saved figures are written
LOG_DIR = Path("./results")

# <a id='toc2_'></a>[Classification of MNIST Dataset](#toc0_)


## <a id='toc2_1_'></a>[Utility Function to Store Metrics](#toc0_)


In [None]:
class MetricsLogger:
    """Write train/validation, test and confusion-matrix metrics to CSV files.

    Creating an instance creates (or truncates) the train/validation and test
    files and writes their header rows. The confusion-matrix file is written
    only when `log_confusion_matrix` is called.
    """

    def __init__(
        self,
        train_val_file: str | Path = LOG_DIR / "train_val_metrics.csv",
        test_file: str | Path = LOG_DIR / "test_metrics.csv",
        confusion_matrix_file: str | Path = LOG_DIR / "test_top_1_confusion_matrix.csv",
        test_top_k_acc: int = 5,
        lr_precision: str = ".6f",
        loss_precision: str = "7.5f",
        acc_precision: str = ".3f",
    ):
        """Store the configuration and write the CSV headers.

        Args:
            train_val_file: CSV file receiving one row per epoch.
            test_file: CSV file receiving the test loss and top-k accuracies.
            confusion_matrix_file: CSV file receiving the confusion matrix.
            test_top_k_acc: Number of top-k accuracy columns in the test file
                (`test_top_1_acc` .. `test_top_<k>_acc`).
            lr_precision: Format spec applied to the learning rate.
            loss_precision: Format spec applied to every loss value.
            acc_precision: Format spec applied to every accuracy value.
        """
        self.train_val_file = train_val_file
        self.test_file = test_file
        self.confusion_matrix_file = confusion_matrix_file
        self.test_top_k_acc = test_top_k_acc
        self.lr_precision = lr_precision
        self.loss_precision = loss_precision
        self.acc_precision = acc_precision

        # initialize csv files with headers
        self._initialize_file(
            self.train_val_file,
            "epoch,lr,train_loss,train_acc,val_loss,val_acc\n",
        )
        top_k_columns = ",".join(f"test_top_{k}_acc" for k in range(1, test_top_k_acc + 1))
        self._initialize_file(self.test_file, f"test_loss,{top_k_columns}\n")

    def _initialize_file(self, file_path: str | Path, header: str) -> None:
        """Create `file_path` (and its parent directories) containing only `header`.

        An existing file at that path is overwritten.
        """
        # create directory if it doesn't exist
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)

        with open(file_path, mode="w", encoding="utf-8") as file:
            file.write(header)

    def log_train_val(
        self, epoch: str, lr: float, train_loss: float, train_acc: float, val_loss: float, val_acc: float
    ) -> None:
        """Append one epoch's metrics as a row of the train/validation file.

        Args:
            epoch: Epoch label, written as-is.
            lr: Learning rate, formatted with `lr_precision`.
            train_loss: Training loss, formatted with `loss_precision`.
            train_acc: Training accuracy, formatted with `acc_precision`.
            val_loss: Validation loss, formatted with `loss_precision`.
            val_acc: Validation accuracy, formatted with `acc_precision`.
        """
        fields = (
            f"{epoch}",
            f"{lr:{self.lr_precision}}",
            f"{train_loss:{self.loss_precision}}",
            f"{train_acc:{self.acc_precision}}",
            f"{val_loss:{self.loss_precision}}",
            f"{val_acc:{self.acc_precision}}",
        )
        with open(self.train_val_file, mode="a", encoding="utf-8") as file:
            file.write(",".join(fields) + "\n")

    def log_test(self, test_loss: float, *test_top_k_acc: float) -> None:
        """Append the test loss and the top-k accuracies as a row of the test file.

        Args:
            test_loss: Test loss, formatted with `loss_precision`.
            *test_top_k_acc: One accuracy per top-k column, in column order.

        Raises:
            ValueError: If the number of accuracies differs from the
                `test_top_k_acc` given at construction time.
        """
        if len(test_top_k_acc) != self.test_top_k_acc:
            raise ValueError(f"expected {self.test_top_k_acc} test accuracies, but got {len(test_top_k_acc)}.")

        accuracies = ",".join(f"{acc:{self.acc_precision}}" for acc in test_top_k_acc)
        with open(self.test_file, mode="a", encoding="utf-8") as file:
            file.write(f"{test_loss:{self.loss_precision}},{accuracies}\n")

    def log_confusion_matrix(self, cm: torch.Tensor, labels: list[str]) -> None:
        """Overwrite the confusion-matrix file with `cm`.

        The header row is `pred_<label>` per column; each following row starts
        with `true_<label>` (left-aligned to the longest label) followed by the
        values of that row of `cm`.

        Args:
            cm: Square 2D tensor with one row and one column per label.
            labels: Class labels, in the same order as the rows/columns of `cm`.

        Raises:
            ValueError: If `cm` is not 2D or its shape does not match `labels`.
        """
        if cm.dim() != 2:
            raise ValueError("confusion matrix must be a 2D tensor.")

        # validate before touching the file so a mismatch never leaves a partial CSV behind
        if tuple(cm.shape) != (len(labels), len(labels)):
            raise ValueError(
                f"confusion matrix shape {tuple(cm.shape)} does not match the {len(labels)} given labels."
            )

        # built with join instead of re-using the outer quote inside an f-string,
        # which is a SyntaxError on Python versions before 3.12
        header = "," + ",".join(f"pred_{label}" for label in labels) + "\n"
        self._initialize_file(self.confusion_matrix_file, header)

        max_length_label = max(map(len, labels), default=0)

        # convert once to nested Python lists instead of indexing the tensor per cell
        rows = cm.tolist()

        with open(self.confusion_matrix_file, mode="a", encoding="utf-8") as file:
            for true_label, values in zip(labels, rows):
                cells = [f"true_{true_label:<{max_length_label}}"] + [str(value) for value in values]
                file.write(",".join(cells) + "\n")

In [6]:
# number of top-k accuracies (top_1 .. top_3) evaluated and logged for the test set
test_top_k_acc = 3
# constructing the logger writes the CSV header rows, overwriting existing metric files
logger = MetricsLogger(test_top_k_acc=test_top_k_acc)

## <a id='toc2_2_'></a>[Hyperparameters](#toc0_)


In [7]:
# batch sizes of the train / validation / test DataLoaders
TRAIN_BATCH_SIZE = 64
VALIDATION_BATCH_SIZE = 128
TEST_BATCH_SIZE = 128
# initial learning rate handed to the optimizer
LEARNING_RATE = 0.01
# number of passes over the train set
EPOCHS = 15

## <a id='toc2_3_'></a>[Pre-Processing](#toc0_)

📚 **Tutorials**:

- **Transformations**
  - Learn about common image transformations and how to apply them for data augmentation and normalization.
  - check [vision-transforms.ipynb](../../../utils/vision-transforms.ipynb)
- **Dataset & DataLoader**
  - Understand how to load datasets and efficiently manage batching with DataLoader.
  - check [dataset-dataloader.ipynb](../../../utils/dataset-dataloader.ipynb)
- **Normalizations**
  - Explore techniques for normalizing image data to improve model performance and convergence.
  - check [normalizations.ipynb](../../../utils/normalizations.ipynb)


### <a id='toc2_3_1_'></a>[Load MNIST Dataset](#toc0_)


In [8]:
# initial transforms: convert each sample to an image tensor, then to float32 with scale=True
# (normalization is appended to this same Compose later, once the train statistics are known)
transforms = v2.Compose(
    [
        v2.ToImage(),
        v2.ToDtype(torch.float32, scale=True),
    ]
)

In [None]:
# load the MNIST dataset
# download=False: the files are expected to already exist under DATASET_DIR
# both datasets share the same `transforms` object
trainset = MNIST(DATASET_DIR, train=True, transform=transforms, download=False)
testset = MNIST(DATASET_DIR, train=False, transform=transforms, download=False)

# class names and their count (the count is later used as the model's output dimension)
classes = trainset.classes
num_classes = len(classes)

# log: `.data` / `.targets` are the stored arrays, `[0][0]` is the first sample after the transforms
print("trainset:")
print(f"    -> trainset.data.shape    : {trainset.data.shape}")
print(f"    -> trainset.data.dtype    : {trainset.data.dtype}")
print(f"    -> type(trainset.data)    : {type(trainset.data)}")
print(f"    -> type(trainset.targets) : {type(trainset.targets)}")
print(f"    -> trainset[0][0].shape   : {trainset[0][0].shape}")
print(f"    -> trainset[0][0].dtype   : {trainset[0][0].dtype}")
print("-" * 50)
print("testset:")
print(f"    -> testset.data.shape    : {testset.data.shape}")
print(f"    -> testset.data.dtype    : {testset.data.dtype}")
print(f"    -> type(testset.data)    : {type(testset.data)}")
print(f"    -> type(testset.targets) : {type(testset.targets)}")
print(f"    -> testset[0][0].shape   : {testset[0][0].shape}")
print(f"    -> testset[0][0].dtype   : {testset[0][0].dtype}")
print("-" * 50)
print(f"classes               : {classes}")
print(f"class_to_idx          : {trainset.class_to_idx}")
# number of samples per class (the counts returned by torch.unique)
print(f"trainset distribution : {torch.unique(trainset.targets, return_counts=True)[1]}")
print(f"testset  distribution : {torch.unique(testset.targets, return_counts=True)[1]}")

### <a id='toc2_3_2_'></a>[Split Train Set into Train and Validation Subsets](#toc0_)


In [None]:
# random split (returns List[Subset]): 90% train / 10% validation
# NOTE: `trainset` is rebound from the full MNIST dataset to a Subset of it;
# the full dataset stays reachable through `trainset.dataset`
trainset, validationset = random_split(trainset, [0.9, 0.1])

# log
print("trainset:")
print(f"    -> len(trainset)         : {len(trainset)}")
print(f"    -> trainset[0][0]        : {trainset[0][0].shape}")
print(f"    -> trainset[0][1]        : {trainset[0][1]}")
print(f"    -> type(trainset)        : {type(trainset)}")
# per-class counts of the subset: index the parent dataset's targets with the subset's indices
print(
    f"    -> trainset distribution : {torch.unique(trainset.dataset.targets[trainset.indices], return_counts=True)[1]}\n"
)
print("validationset:")
print(f"    -> len(validationset)          : {len(validationset)}")
print(f"    -> validationset[0][0]         : {validationset[0][0].shape}")
print(f"    -> validationset[0][1]         : {validationset[0][1]}")
print(f"    -> type(validationset)         : {type(validationset)}")
print(
    f"    -> validationset distribution : {torch.unique(validationset.dataset.targets[validationset.indices], return_counts=True)[1]}\n"
)
print("testset:")
print(f"    -> len(testset)         : {len(testset)}")
print(f"    -> testset[0][0]        : {testset[0][0].shape}")
print(f"    -> testset[0][1]        : {testset[0][1]}")
print(f"    -> type(testset)        : {type(testset)}")
print(f"    -> testset distribution : {torch.unique(testset.targets, return_counts=True)[1]}")

### <a id='toc2_3_3_'></a>[Normalization](#toc0_)


In [None]:
# create a temporary DataLoader for the trainset
# batch_size=len(trainset) makes the first (and only) batch hold every train sample; [0] keeps the images
temp_trainloader_x = next(iter(DataLoader(trainset, batch_size=len(trainset))))[0]

# calculate the mean and standard deviation
# a single scalar over all values of the train subset only (validation/test samples are not included)
train_mean = temp_trainloader_x.mean().item()  # 0.1307
train_std = temp_trainloader_x.std().item()  # 0.3081

# drop the reference to the full-train-set tensor once the statistics are computed
del temp_trainloader_x

# log
print(f"mean of train set per channel : {train_mean}")
print(f"std  of train set per channel : {train_std}")

#### <a id='toc2_3_3_1_'></a>[Append Normalization to the Transforms](#toc0_)


In [None]:
# append Normalize in place to the list held by the shared Compose object;
# the same `transforms` object was passed to the train and test MNIST datasets,
# so the change applies to all of them without rebuilding the datasets
transforms.transforms.append(v2.Normalize(mean=(train_mean,), std=(train_std,)))

# log
# trainset and validationset are Subsets, so their transforms live on the parent `.dataset`
print(f"trainset.dataset.transforms:\n{trainset.dataset.transforms}\n")
print(f"validationset.dataset.transforms:\n{validationset.dataset.transforms}\n")
print(f"testset.transforms:\n{testset.transforms}")

In [None]:
# show the first nrows * ncols transformed test samples in a grid, titled with their labels
nrows, ncols = 4, 16
fig, axs = plt.subplots(nrows, ncols, figsize=(ncols, nrows + 1), layout="compressed")
plt.suptitle("Transformed First 64 MNIST Test Set Samples")

# axs.flat walks the grid row by row, so the running index equals row * ncols + col
for sample_idx, panel in enumerate(axs.flat):
    image = testset[sample_idx][0]
    label = testset.targets[sample_idx].item()
    panel.imshow(image.squeeze(), cmap="gray")
    panel.set_title(label)
    panel.axis("off")

plt.savefig(f"{LOG_DIR}/transformed_testset_demo.png", format="png", bbox_inches="tight", dpi=72)
plt.show()

### <a id='toc2_3_4_'></a>[Create DataLoaders](#toc0_)


In [14]:
# only the train loader shuffles; validation and test keep the dataset order
# each loader uses 2 worker processes (num_workers=2)
trainloader = DataLoader(dataset=trainset, batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=2)
validationloader = DataLoader(dataset=validationset, batch_size=VALIDATION_BATCH_SIZE, shuffle=False, num_workers=2)
testloader = DataLoader(dataset=testset, batch_size=TEST_BATCH_SIZE, shuffle=False, num_workers=2)

In [None]:
# log: pull one batch from each loader to inspect the shapes and dtypes the model will receive
# each batch is an (x, y) pair: index 0 holds the inputs, index 1 the labels
first_train_batch = next(iter(trainloader))
first_validation_batch = next(iter(validationloader))
first_test_batch = next(iter(testloader))

print("trainloader [first batch]:")
print(f"    -> x.shape: {first_train_batch[0].shape}")
print(f"    -> x.dtype: {first_train_batch[0].dtype}")
print(f"    -> y.shape: {first_train_batch[1].shape}")
print(f"    -> y.dtype: {first_train_batch[1].dtype}\n")
print("validationloader [first batch]:")
print(f"    -> x.shape: {first_validation_batch[0].shape}")
print(f"    -> x.dtype: {first_validation_batch[0].dtype}")
print(f"    -> y.shape: {first_validation_batch[1].shape}")
print(f"    -> y.dtype: {first_validation_batch[1].dtype}\n")
print("testloader [first batch]:")
print(f"    -> x.shape: {first_test_batch[0].shape}")
print(f"    -> x.dtype: {first_test_batch[0].dtype}")
print(f"    -> y.shape: {first_test_batch[1].shape}")
print(f"    -> y.dtype: {first_test_batch[1].dtype}")

## <a id='toc2_4_'></a>[Custom MLP Model](#toc0_)

📚 **Tutorials**:

- **Gradient**
  - Explore gradient calculations and backpropagation in neural networks
  - check [02-gradient.ipynb](../../../02-gradient.ipynb)
- **Multi-Layer Perceptron**
  - Learn how to build and train a custom MLP model for classification tasks
  - check [multi-layer-perceptrons.ipynb](../../../05-multi-layer-perceptrons.ipynb)


In [16]:
class CustomMLP(nn.Module):
    """Multi-layer perceptron classifier.

    Layer layout: Flatten -> (Linear -> ReLU) for every entry of `hidden_dims`
    -> Linear. The output is the raw result of the last Linear layer (no
    activation is applied to it).
    """

    def __init__(self, input_dim: int, hidden_dims: list[int], output_dim: int):
        """Build the layer stack.

        Args:
            input_dim: Number of features per sample after flattening.
            hidden_dims: Width of each hidden layer, in order. Any length is
                accepted; an empty list yields a single Linear layer.
            output_dim: Number of output units.
        """
        super().__init__()

        layers: list[nn.Module] = [nn.Flatten(start_dim=1)]

        # chain the hidden layers: each Linear consumes the previous layer's width
        in_features = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(in_features, hidden_dim))
            layers.append(nn.ReLU())
            in_features = hidden_dim

        layers.append(nn.Linear(in_features, output_dim))
        self.classifier = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Flatten every sample of `x` (from dim 1 on) and return the classifier output."""
        return self.classifier(x)

In [None]:
# a transformed sample unpacks into three dimensions; the model flattens them into one feature vector
depth, height, width = trainset[0][0].shape
input_dim = depth * height * width
# widths of the hidden layers, in order
hidden_dims = [64, 32]
# one output unit per class
output_dim = num_classes

# log
print(f"input  dim  : {input_dim}")
print(f"hidden dims : {hidden_dims}")
print(f"output dim  : {output_dim}")

In [None]:
# initialize the model and move its parameters to the selected device
model = CustomMLP(input_dim, hidden_dims, output_dim).to(device)

# log (a bare expression on the last line of a cell is displayed by the notebook)
model

In [None]:
# torchinfo summary for an input of one training batch: (batch size, *shape of a single sample)
summary(model, input_size=(TRAIN_BATCH_SIZE, *trainset[0][0].shape))

In [20]:
# cross-entropy loss computed on the model's raw outputs and the integer class labels
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
# the scheduler is stepped with the validation loss in the training loop (mode="min");
# factor=0.5, patience=1, threshold=1e-2 configure when and by how much the lr is reduced
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=1, threshold=1e-2)

## <a id='toc2_5_'></a>[Train and Validation Loop](#toc0_)


In [21]:
# per-epoch history, one entry appended per epoch by the training loop; used for logging and plotting
train_acc_per_epoch = []
train_loss_per_epoch = []
validation_acc_per_epoch = []
validation_loss_per_epoch = []

In [22]:
# top-1 accuracy metrics; they accumulate over batches and are reset after every epoch in the loop
train_acc = MulticlassAccuracy(num_classes, top_k=1).to(device)
val_acc = MulticlassAccuracy(num_classes, top_k=1).to(device)

In [None]:
# width used to zero-pad the epoch number in the CSV and console output
epoch_width = len(str(EPOCHS))

for epoch in range(EPOCHS):

    # learning rate this epoch actually trains with; it is read before
    # scheduler.step() because the scheduler may lower it for the NEXT epoch
    epoch_lr = optimizer.param_groups[0]["lr"]

    # train loop
    model.train()
    train_loss = 0.0

    for x, y in trainloader:

        # move the batch to <device>
        x, y_true = x.to(device), y.to(device)

        # forward
        y_pred = model(x)
        loss = criterion(y_pred, y_true)

        # backward
        loss.backward()

        # update parameters, then clear the gradients for the next iteration
        optimizer.step()
        optimizer.zero_grad()

        # accumulate the summed loss (batch mean * batch size) and the accuracy state
        train_loss += loss.item() * len(x)
        train_acc.update(y_pred, y_true)

    # store loss (mean over all train samples) and accuracy per epoch
    train_loss_per_epoch.append(train_loss / len(trainset))
    train_acc_per_epoch.append(train_acc.compute().item())
    train_acc.reset()

    # validation loop
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for x, y in validationloader:

            # move the batch to <device>
            x, y_true = x.to(device), y.to(device)

            # forward
            y_pred = model(x)
            loss = criterion(y_pred, y_true)

            # accumulate the summed loss and the accuracy state
            val_loss += loss.item() * len(x)
            val_acc.update(y_pred, y_true)

    # store loss (mean over all validation samples) and accuracy per epoch
    validation_loss_per_epoch.append(val_loss / len(validationset))
    validation_acc_per_epoch.append(val_acc.compute().item())
    val_acc.reset()

    # lr scheduler: driven by this epoch's validation loss
    scheduler.step(validation_loss_per_epoch[epoch])

    # store train and validation metrics
    logger.log_train_val(
        epoch=f"{epoch+1:0{epoch_width}}",
        lr=epoch_lr,
        train_loss=train_loss_per_epoch[epoch],
        train_acc=train_acc_per_epoch[epoch],
        val_loss=validation_loss_per_epoch[epoch],
        val_acc=validation_acc_per_epoch[epoch],
    )

    # log
    print(
        f"epoch {epoch+1:0{epoch_width}}/{EPOCHS} -> lr: {epoch_lr:.5f} | train[loss: {train_loss_per_epoch[epoch]:.5f} - acc: {train_acc_per_epoch[epoch]*100:5.2f}%] | validation[loss: {validation_loss_per_epoch[epoch]:.5f} - acc: {validation_acc_per_epoch[epoch]*100:5.2f}%]"
    )

### <a id='toc2_5_1_'></a>[Analyze Loss and Accuracy over Epochs](#toc0_)


In [None]:
# accuracy on the left panel, loss on the right panel
fig, ax = plt.subplots(1, 2, figsize=(14, 4), layout="compressed")

panels = (
    (ax[0], "Accuracy", train_acc_per_epoch, validation_acc_per_epoch),
    (ax[1], "Loss", train_loss_per_epoch, validation_loss_per_epoch),
)

for panel, metric_name, train_values, validation_values in panels:

    # x positions are 1-based epoch numbers so the plot matches the CSV and console logs;
    # they are derived from the recorded history so a partially completed run still plots
    train_epochs = range(1, len(train_values) + 1)
    validation_epochs = range(1, len(validation_values) + 1)

    panel.plot(train_epochs, train_values, label=f"Train {metric_name}", marker="o", color="blue")
    panel.plot(validation_epochs, validation_values, label=f"Validation {metric_name}", marker="o", color="orange")
    panel.set(
        title=f"{metric_name} Over Epochs",
        xlabel="Epoch",
        ylabel=metric_name,
        xticks=range(1, max(len(train_values), len(validation_values)) + 1),
    )
    panel.legend()
    panel.grid()

plt.suptitle("Training and Validation Accuracy and Loss Over Epochs")
plt.savefig(f"{LOG_DIR}/train_val_metrics.svg", format="svg", bbox_inches="tight")
plt.show()

## <a id='toc2_6_'></a>[Test Loop](#toc0_)


In [None]:
# one accuracy metric per k (top_1 .. top_<test_top_k_acc>), all updated during a
# single pass over the test set instead of re-running inference once per k
test_acc_metrics = [MulticlassAccuracy(num_classes, top_k=k).to(device) for k in range(1, test_top_k_acc + 1)]

# top_1 predictions and the matching ground-truth labels (used for the confusion matrix and report)
true_labels = []
predictions = []

test_loss = 0.0

model.eval()
with torch.no_grad():
    for x, y_true in testloader:

        # move batch of features and labels to <device>
        x, y_true = x.to(device), y_true.to(device)

        # forward
        y_pred = model(x)

        # loss
        loss = criterion(y_pred, y_true)

        # accumulate the summed loss (batch mean * batch size) and every top-k accuracy state
        test_loss += loss.item() * len(x)
        for metric in test_acc_metrics:
            metric.update(y_pred, y_true)

        # store predictions and true_labels as plain Python ints
        predictions.extend(y_pred.argmax(dim=1).tolist())
        true_labels.extend(y_true.tolist())

# mean loss over all test samples and the final top-k accuracies
test_loss /= len(testset)
top_k_acc = [metric.compute().item() for metric in test_acc_metrics]

# log (k is 1-based so the labels match the test_top_<k>_acc columns of the CSV file)
print(
    f"test[loss: {test_loss:.5f} | {' - '.join(f'top_{k} acc: {a*100:5.2f}%' for k, a in enumerate(top_k_acc, start=1))}]"
)

In [26]:
# store test metrics
# the number of unpacked accuracies must equal the logger's test_top_k_acc, otherwise log_test raises ValueError
logger.log_test(test_loss, *top_k_acc)

In [27]:
# convert the collected per-sample lists into 1D tensors on the CPU
predictions = torch.tensor(predictions).to("cpu")
true_labels = torch.tensor(true_labels).to("cpu")

### <a id='toc2_6_1_'></a>[Plot Top_1 Confusion Matrix](#toc0_)


In [28]:
# confusion matrix of the top_1 predictions; the metric is called with (predictions, targets)
confmat = MulticlassConfusionMatrix(num_classes)
cm = confmat(predictions, true_labels)

In [29]:
# store confusion matrix
# labels are the class indices (the values of class_to_idx) converted to strings
logger.log_confusion_matrix(cm, labels=[str(i) for i in testset.class_to_idx.values()])

In [None]:
# plot the confusion matrix accumulated in `confmat` onto a new axes
fig, ax = plt.subplots(figsize=(8, 6))
ax.set(title="Confusion Matrix based on top_1 accuracy")
confmat.plot(ax=ax, cmap="Blues")
# attach a colorbar for the first image drawn on the axes
cbar = plt.colorbar(ax.images[0], ax=ax)
cbar.set_label("Count", rotation=270, labelpad=15)
plt.show()

### <a id='toc2_6_2_'></a>[Classification Report](#toc0_)

In [None]:
# top_1 classification report
# scikit-learn expects the ground truth first and the predictions second
print(classification_report(true_labels, predictions))

## <a id='toc2_7_'></a>[Prediction](#toc0_)


In [None]:
def predict(
    model: nn.Module, data: torch.Tensor, classes: list[str], transform: v2.Compose | None = None
) -> torch.Tensor:

    # add batch & channel dimension to a single data
    if len(data.shape) == 2:
        data = data[torch.newaxis, :, :, torch.newaxis]

    # apply the transform
    if transform:
        data = torch.stack([transform(image) for image in data])

    # predict
    model.eval()
    with torch.no_grad():

        # send data to GPU
        data = data.to(device)

        # forward
        y_pred = model(data).argmax(dim=1).cpu()

    return y_pred

In [None]:
# take the first 64 raw (untransformed) test images and let `predict` apply the transforms
raw_data = MNIST(DATASET_DIR, train=False, transform=None, download=False).data[:64]
y_pred = predict(model, data=raw_data, classes=classes, transform=transforms)

# draw every transformed image in a grid, titled with its predicted class index
nrows, ncols = 4, 16
fig, axs = plt.subplots(nrows, ncols, figsize=(ncols, nrows), layout="compressed")

# axs.flat walks the grid row by row, so the running index equals row * ncols + col
for sample_idx, panel in enumerate(axs.flat):
    transformed_image = transforms(raw_data[sample_idx])
    predicted_label = y_pred[sample_idx].item()
    panel.imshow(transformed_image.squeeze(), cmap="gray")
    panel.set_title(predicted_label)
    panel.axis("off")

plt.show()