### Image Classification on the MNIST dataset with PyTorch Lightning and MLflow

* Integration with an MLflow tracking server deployed on OCI
* A checkpoint callback keeps the best checkpoints, ranked by `val_loss`

In [2]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sn
import torch

from IPython.core.display import display

from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.callbacks.progress import TQDMProgressBar
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import CSVLogger
from pytorch_lightning.loggers import MLFlowLogger

from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
from torchmetrics import Accuracy
from torchvision import transforms

# from here we get MNIST dataset
from torchvision.datasets import MNIST

# import MLflow API for experiment tracking
import mlflow

sn.set()

%matplotlib inline

#### Settings for MLflow:
To connect to the tracking server through the MLflow Python API, you need to set some environment variables containing:
* the URI of the tracking server
* the username and password used to connect to MLflow
* the S3 credentials (on OCI, Object Storage provides an S3-compatible API)

These values are defined in a `config.py` file, for example:
```
MLFLOW_TRACKING_URI = "http://<tracking server ip>:3000"
MLFLOW_TRACKING_USERNAME = "user1"
....
```
and are then exported as environment variables, which is where MLflow reads them from. Since `config.py` contains credentials, keep it out of version control.

In [3]:
from config import (MLFLOW_TRACKING_URI, MLFLOW_TRACKING_USERNAME,
                    MLFLOW_TRACKING_PASSWORD, MLFLOW_S3_ENDPOINT_URL,
                    AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

# MLflow picks up its connection settings from the process environment, so
# export every value loaded from config.py. The last three entries configure
# storage on OCI Object Storage through its S3-compatible API.
os.environ.update({
    'MLFLOW_TRACKING_URI': MLFLOW_TRACKING_URI,
    'MLFLOW_TRACKING_USERNAME': MLFLOW_TRACKING_USERNAME,
    'MLFLOW_TRACKING_PASSWORD': MLFLOW_TRACKING_PASSWORD,
    'MLFLOW_S3_ENDPOINT_URL': MLFLOW_S3_ENDPOINT_URL,
    'AWS_ACCESS_KEY_ID': AWS_ACCESS_KEY_ID,
    'AWS_SECRET_ACCESS_KEY': AWS_SECRET_ACCESS_KEY,
})

In [4]:
# where we're storing the downloaded dataset
PATH_DATASETS = "."

BATCH_SIZE = 256 if torch.cuda.is_available() else 64

#### Configure what you want to log in MLflow
You simply call `self.log` in the validation step (see below), as usual with Lightning.

In [5]:
class LitMNISTCNN(LightningModule):
    """Small CNN for MNIST digit classification, bundled with its data pipeline.

    The module defines the network, the optimizer, the per-step logging and the
    MNIST download/split/dataloaders, so it can be passed directly to
    ``Trainer.fit`` and ``Trainer.test`` without a separate DataModule.

    Args:
        data_dir: directory where MNIST is downloaded to and read from.
        learning_rate: learning rate for the Adam optimizer.
        split_seed: seed for the 55000/5000 train/validation split. The same
            images are held out every time ``setup`` runs, for example after
            reloading a checkpoint, so validation never sees training images.
    """

    def __init__(self, data_dir=PATH_DATASETS, learning_rate=2e-4, split_seed=42):

        super().__init__()

        # Store the init args in self.hparams and in every checkpoint.
        # load_from_checkpoint() then rebuilds the module with the same
        # arguments, and the attached loggers receive them as hyperparameters.
        self.save_hyperparameters()

        # Set our init args as class attributes
        self.data_dir = data_dir
        self.learning_rate = learning_rate
        self.split_seed = split_seed

        # dataset specific attributes
        self.num_classes = 10
        # shape of input images in MNIST: (channels, height, width), the
        # layout produced by transforms.ToTensor()
        self.dims = (1, 28, 28)

        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                # normalize with the MNIST mean / std, see
                # https://discuss.pytorch.org/t/normalization-in-the-mnist-example/457
                transforms.Normalize((0.1307,), (0.3081,)),
            ]
        )

        # A simple CNN. Spatial size for a 28x28 input:
        # 28 -conv5-> 24 -conv5-> 20 -pool2-> 10 -conv5-> 6 -pool2-> 3,
        # hence the 3*3*64 features entering the first Linear layer.
        self.model = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=5),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Conv2d(32, 64, kernel_size=5),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Flatten(),
            nn.Linear(3*3*64, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, self.num_classes),
        )

        # Separate metric objects so validation and test states never mix.
        self.val_accuracy = self._make_accuracy(self.num_classes)
        self.test_accuracy = self._make_accuracy(self.num_classes)

    @staticmethod
    def _make_accuracy(num_classes):
        """Build a multiclass accuracy metric across torchmetrics versions.

        torchmetrics >= 0.11 requires the ``task`` argument. Older releases
        reject it, so fall back to the original argument-free constructor.
        """
        try:
            return Accuracy(task="multiclass", num_classes=num_classes)
        except (TypeError, ValueError):
            return Accuracy()

    def forward(self, x):
        """Return per-class log-probabilities, shape (batch, num_classes).

        The network produces raw logits. log_softmax turns them into
        log-probabilities, which pair with F.nll_loss in the steps below.
        Together they equal cross-entropy on the logits, computed in a
        numerically stable way.
        """
        return F.log_softmax(self.model(x), dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        log_probs = self(x)
        loss = F.nll_loss(log_probs, y)
        # Epoch-level training loss, so it can be plotted against val_loss.
        self.log("train_loss", loss, on_step=False, on_epoch=True)
        return loss

    def _shared_eval_step(self, batch, metric, prefix):
        """Compute loss/accuracy on one batch and log them as `<prefix>_loss/_acc`."""
        x, y = batch
        log_probs = self(x)
        loss = F.nll_loss(log_probs, y)
        preds = torch.argmax(log_probs, dim=1)
        metric.update(preds, y)

        # self.log forwards the values to every logger attached to the
        # Trainer (here CSV and MLflow) and to the progress bar.
        self.log(f"{prefix}_loss", loss, prog_bar=True)
        self.log(f"{prefix}_acc", metric, prog_bar=True)

    def validation_step(self, batch, batch_idx):
        self._shared_eval_step(batch, self.val_accuracy, "val")

    def test_step(self, batch, batch_idx):
        self._shared_eval_step(batch, self.test_accuracy, "test")

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

    # dataset and dataloaders

    def prepare_data(self):
        # download only; datasets are instantiated in setup()
        MNIST(self.data_dir, train=True, download=True)
        MNIST(self.data_dir, train=False, download=True)

    def setup(self, stage=None):

        # Assign train/val datasets for use in dataloaders
        if stage in ("fit", "validate", None):
            mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
            # A seeded generator gives the same split on every call, so a
            # reloaded model is never validated on images it was trained on.
            split_generator = torch.Generator().manual_seed(self.split_seed)
            self.mnist_train, self.mnist_val = random_split(
                mnist_full, [55000, 5000], generator=split_generator
            )

        # Assign test dataset for use in dataloader(s)
        if stage in ("test", None):
            self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)

    def _make_dataloader(self, dataset, shuffle=False):
        """Wrap `dataset` in a DataLoader with the notebook's batch size and workers."""
        return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=shuffle, num_workers=6)

    def train_dataloader(self):
        # Reshuffle every epoch; random_split alone only permutes once.
        return self._make_dataloader(self.mnist_train, shuffle=True)

    def val_dataloader(self):
        return self._make_dataloader(self.mnist_val)

    def test_dataloader(self):
        return self._make_dataloader(self.mnist_test)

#### Create the MLflow logger
* specify the name for the experiment and the run
* log hyper-parameters

In [6]:
EXP_NAME = "LT005"
RUN_NAME = "run001"

# Logger that sends params/metrics to the remote MLflow tracking server under
# the given experiment and run (the experiment is created if it is missing).
mlf_logger = MLFlowLogger(
    experiment_name=EXP_NAME,
    run_name=RUN_NAME,
    tracking_uri=MLFLOW_TRACKING_URI,
)

# Explicit hyperparameter logging. Keep "lr" in sync with the learning_rate
# the model is built with.
params = {"lr": 2e-4}
mlf_logger.log_hyperparams(params)

# Keep the two checkpoints with the lowest val_loss in ./checkpoint_mnist/.
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoint_mnist",
    filename="mnist-{epoch:02d}-{val_loss:.2f}",
    monitor="val_loss",
    mode="min",
    save_top_k=2,
)

Experiment with name LT005 not found. Creating it.


#### Add the MLflow logger
Add it to the Trainer's list of loggers, next to the local `CSVLogger`.

In [7]:
model = LitMNISTCNN()

# One GPU when CUDA is present; otherwise let Lightning choose ("auto" is the
# documented default, whereas devices=None is not a supported value in
# recent Lightning releases).
trainer = Trainer(
    accelerator="auto",
    devices=1 if torch.cuda.is_available() else "auto",
    max_epochs=20,
    callbacks=[TQDMProgressBar(refresh_rate=20), checkpoint_callback],
    # CSVLogger first: the metrics-plot cell reads its metrics.csv.
    logger=[CSVLogger(save_dir="logs/"), mlf_logger]
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


#### Fit

In [8]:
# Train; metrics go to both loggers and the best checkpoints are saved by the callback.
trainer.fit(model=model)

  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type       | Params
---------------------------------------------
0 | model         | Sequential | 228 K 
1 | val_accuracy  | Accuracy   | 0     
2 | test_accuracy | Accuracy   | 0     
---------------------------------------------
228 K     Trainable params
0         Non-trainable params
228 K     Total params
0.912     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=20` reached.


In [9]:
# Evaluate the best checkpoint (lowest val_loss) on the test split. Passing
# ckpt_path="best" explicitly avoids relying on the implicit default, which
# Lightning warns about.
trainer.test(ckpt_path="best")

  + f" You can pass `.{fn}(ckpt_path='best')` to use the best model or"
Restoring states from the checkpoint path at /home/datascience/pytorch-on-oci/ch-mlflow/checkpoint_mnist/mnist-epoch=19-val_loss=0.02-v1.ckpt
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Loaded model weights from checkpoint at /home/datascience/pytorch-on-oci/ch-mlflow/checkpoint_mnist/mnist-epoch=19-val_loss=0.02-v1.ckpt


Testing: 0it [00:00, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc            0.9941999912261963
        test_loss          0.019012711942195892
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.019012711942195892, 'test_acc': 0.9941999912261963}]

#### Plot the metrics' history

In [None]:
# trainer.loggers is the list passed to the Trainer: [CSVLogger, MLFlowLogger].
# Index into it rather than trainer.logger, whose multi-logger behaviour
# changed across Lightning versions.
csv_logger = trainer.loggers[0]
metrics_raw = pd.read_csv(f"{csv_logger.log_dir}/metrics.csv")

# Drop the step counter, index by epoch and discard columns that are empty.
metrics = (
    metrics_raw
    .drop(columns="step")
    .set_index("epoch")
    .dropna(axis=1, how="all")
)

# For a tabular view: display(metrics.tail(10))
grid = sn.relplot(data=metrics, kind="line")
grid.set(title="Metrics", xlabel="epoch", ylabel="value")
plt.grid(True)
plt.show()

#### Reload the best checkpoint and test the model

In [None]:
# The checkpoint callback names its files "mnist-{epoch:02d}-{val_loss:.2f}",
# with a "-vN" suffix on name clashes, so there is no fixed "best.ckpt" file.
# Ask the callback which file scored best instead.
best_ckpt_path = checkpoint_callback.best_model_path
assert best_ckpt_path, "no best checkpoint recorded - run trainer.fit(model) first"
model = LitMNISTCNN.load_from_checkpoint(best_ckpt_path)

In [None]:
# Display the reloaded module's repr (its layer structure).
model

In [None]:
# Evaluate the reloaded model on the MNIST test split.
trainer.test(model=model)