# Resolvendo o MNIST com um Multilayer Perceptron (MLP)

Exemplo simples de um experimento para resolver o MNIST. Visto que estamos executando em CPU, ele demora um tempo considerável.

## Imports

In [1]:
import os
from typing import Type, Callable
import enum
import random

import mlflow
from mlflow.models import infer_signature
import numpy as np
import pandas as pd
import plotly.express as px
import torch
from torch import nn
from torch import optim
from torchmetrics.classification import F1Score, Precision, Recall, Accuracy
import torchvision.datasets as ds
import torchvision.transforms as transforms

# Exporting these environment variables is what lets the MLflow client
#   reach the data stored on the remote tracking server. The values below
#   are placeholders for the server address and the credentials.
os.environ.update({
    'MLFLOW_TRACKING_URI': "http://<URL>:8080",
    'MLFLOW_TRACKING_USERNAME': "<USER>",
    'MLFLOW_TRACKING_PASSWORD': "<PASSWORD>",
})

## Parâmetros

In [2]:
# This cell must carry the "parameters" tag so that papermill can override
#   the values below when the notebook is executed.
# https://papermill.readthedocs.io/en/latest/usage-parameterize.html

# Name of the MLflow experiment (created further down if it does not exist)
#   and name given to the run started inside it.
experiment_name: str = 'tutorials/examples/mnist'
run_name: str = 'mlp_variant_1'

# Width of each hidden layer handed to MLP (one entry per hidden layer).
n_hidden: list[int] = [500]
# Mini-batch size and number of passes over the training split.
batch_size: int = 100
epochs: int = 20
# The three names below are resolved with getattr() when the model is built:
#   loss_fn on torch.nn, init on Initialization, optimizer on torch.optim.
loss_fn: str = 'CrossEntropyLoss'
init: str = 'HE_NORMAL'
optimizer: str = 'AdamW'
# Learning rate forwarded to the optimizer.
lr: float = 0.001
# Seed applied to torch, numpy and random before the experiment runs.
random_state: int = 27894018

## Funções e classes utilitárias

In [3]:
class Initialization(enum.Enum):
  """Weight-initialisation schemes that can be selected by name.

  Each member stores the name of a function in ``torch.nn.init`` and is
  itself callable, so ``Initialization.HE_NORMAL(tensor)`` initialises
  ``tensor`` in place exactly like ``nn.init.kaiming_normal_(tensor)``.

  The values are strings rather than the functions themselves on purpose:
  plain functions assigned in an ``Enum`` body are treated as methods, not
  members, which would leave the enumeration empty (no iteration, no
  ``Initialization['NAME']`` lookup, ``isinstance`` checks always false).
  """
  ZEROES = 'zeros_'
  UNIFORM = 'uniform_'
  XAVIER_UNIFORM = 'xavier_uniform_'
  XAVIER_NORMAL = 'xavier_normal_'
  HE_UNIFORM = 'kaiming_uniform_'
  HE_NORMAL = 'kaiming_normal_'

  def __call__(self, tensor, *args, **kwargs):
    """Apply the selected ``torch.nn.init`` function to ``tensor``.

    Args:
      tensor: tensor to initialise in place.
      *args, **kwargs: forwarded unchanged to the underlying function.

    Returns:
      Whatever the underlying ``torch.nn.init`` function returns.
    """
    return getattr(nn.init, self.value)(tensor, *args, **kwargs)


class MLP:
  """Fully connected classifier with a small training loop around it.

  The network is ``Flatten -> [Linear -> activation -> (Dropout)]* ->
  Linear -> output activation``. Training and evaluation losses are
  recorded once per epoch and exposed through ``train_metrics``.

  Args:
    n_input: number of features after flattening one sample.
    n_output: number of output units.
    n_hidden: width of each hidden layer; must not be empty.
    loss_fn: callable ``loss_fn(predictions, targets)`` returning a scalar
      tensor.
    batch_size: mini-batch size used by ``fit``.
    epochs: number of passes over the training set in ``fit``.
    dropout_prob: one entry per hidden layer; ``None`` entries add no
      dropout layer. Defaults to no dropout anywhere.
    output_fn: factory for the output activation. Defaults to
      ``nn.Softmax(dim=1)``.
    hidden_fn: factory for the hidden activation. Defaults to ``nn.ReLU``.
    init: callable applied to the weight tensor of every ``nn.Linear``
      layer (biases keep PyTorch's default initialisation).
    optimizer: optimizer class, instantiated with the network parameters.
    optimizer_params: keyword arguments for ``optimizer``. Defaults to
      ``dict(lr=0.001)``.

  Raises:
    ValueError: if ``n_hidden`` is empty or ``dropout_prob`` does not have
      one entry per hidden layer.
  """

  def __init__(self,
               n_input: int,
               n_output: int,
               n_hidden: list[int],
               loss_fn,
               batch_size: int = 64,
               epochs: int = 100,
               dropout_prob: list[float | None] | None = None,
               output_fn: Callable[[], nn.Module] | None = None,
               hidden_fn: Callable[[], nn.Module] | None = None,
               init: Initialization = Initialization.XAVIER_NORMAL,
               optimizer: Type[optim.Optimizer] = optim.SGD,
               optimizer_params: dict | None = None):
    # Set defaults
    if dropout_prob is None:
      dropout_prob = [None] * len(n_hidden)

    if output_fn is None:
      # NOTE(review): a Softmax output combined with nn.CrossEntropyLoss
      #   applies softmax twice (that loss expects raw logits). Kept as the
      #   default so existing runs stay comparable -- confirm whether
      #   nn.Identity would be the better default.
      output_fn = lambda: nn.Softmax(dim=1)

    if hidden_fn is None:
      hidden_fn = nn.ReLU

    if optimizer_params is None:
      optimizer_params = dict(lr=0.001)

    # Validate explicitly: `assert` statements vanish under `python -O`
    if len(n_hidden) == 0:
      raise ValueError("n_hidden must contain at least one layer size")
    if len(dropout_prob) != len(n_hidden):
      raise ValueError("dropout_prob must have one entry per hidden layer")

    # Input layer
    layers = [nn.Flatten()]

    # Hidden layers: each one consumes the width of the previous layer
    #   (the input width for the first one)
    fan_ins = [n_input, *n_hidden]
    for fan_in, fan_out, dropout in zip(fan_ins, n_hidden, dropout_prob):
      layers.append(nn.Linear(fan_in, fan_out, bias=True))
      layers.append(hidden_fn())
      # Dropout is optional per layer
      if dropout is not None:
        layers.append(nn.Dropout(dropout))

    # Output layer
    layers.append(nn.Linear(n_hidden[-1], n_output, bias=True))
    layers.append(output_fn())

    # Store variables
    self._epochs = epochs
    self._loss = loss_fn
    self._batch_size = batch_size
    self._signature = None

    # Create MLP
    self._mlp = nn.Sequential(*layers)

    # Create optimizer
    self._optim = optimizer(self._mlp.parameters(),
                            **optimizer_params)

    # Per-epoch mean losses, stored as plain floats
    self._train_error = []
    self._test_error = []

    # Initialize weights of every linear layer with the chosen scheme
    def init_weights(m):
      if isinstance(m, nn.Linear):
        init(m.weight)

    self._mlp.apply(init_weights)

  @property
  def model(self) -> nn.Module:
    """The underlying ``nn.Sequential`` network."""
    return self._mlp

  @property
  def optimizer(self) -> optim.Optimizer:
    """The optimizer instance used by ``fit``."""
    return self._optim

  @property
  def signature(self):
    """Model signature inferred from the first training batch.

    ``None`` until ``fit`` has processed at least one batch.
    """
    return self._signature

  @staticmethod
  def _mean(total: float, count: int) -> float:
    """Average ``total`` over ``count`` batches; NaN when there were none."""
    return total / count if count else float('nan')

  def fit(self,
          train: torch.utils.data.Dataset,
          test: torch.utils.data.Dataset) -> None:
    """Train on ``train`` and record the per-epoch loss on both splits.

    After every epoch the mean batch loss on ``train`` (measured while
    training) and on ``test`` (measured in evaluation mode, without
    gradients) is appended to the history returned by ``train_metrics``.

    Args:
      train: dataset yielding ``(inputs, targets)`` used for optimisation.
      test: dataset yielding ``(inputs, targets)`` used only for monitoring.
    """
    # Pinned host memory only helps host-to-GPU copies; requesting it on a
    #   CPU-only machine just produces a warning.
    pin_memory = torch.cuda.is_available()

    # Create DataLoader from dataset
    train_loader = torch.utils.data.DataLoader(dataset=train,
                                               batch_size=self._batch_size,
                                               pin_memory=pin_memory,
                                               shuffle=True)
    test_loader = torch.utils.data.DataLoader(dataset=test,
                                              batch_size=self._batch_size,
                                              pin_memory=pin_memory,
                                              shuffle=False)
    # Start training
    for _ in range(self._epochs):
      # Set network to train mode
      self._mlp.train()
      running_loss, n_batches = 0.0, 0

      # For each batch
      for X, y in train_loader:
        # Zero gradient for optimizer
        self._optim.zero_grad()

        # Predictions for current batch
        predictions = self._mlp(X)

        # Infer the model signature once, from the first batch seen
        if self._signature is None:
          self._signature = infer_signature(
            X.numpy(), predictions.detach().numpy())

        # Loss for current batch
        loss = self._loss(predictions, y)

        # Backward pass and optimization step
        loss.backward()
        self._optim.step()

        # Update epoch loss (as a float, so no tensor/graph is retained)
        running_loss += loss.item()
        n_batches += 1

      # Store average training loss over batches
      self._train_error.append(self._mean(running_loss, n_batches))

      # Set network to evaluation mode
      self._mlp.eval()

      # Obtain error in test set
      running_loss, n_batches = 0.0, 0
      with torch.no_grad():
        for X, y in test_loader:
          running_loss += self._loss(self._mlp(X), y).item()
          n_batches += 1

      # Store average test loss over batches
      self._test_error.append(self._mean(running_loss, n_batches))

  def predict(self, X: torch.Tensor) -> torch.Tensor:
    """Return the network output for ``X``.

    Switches the network to evaluation mode and runs without gradient
    tracking, so no autograd graph is built for inference.
    """
    self._mlp.eval()
    with torch.no_grad():
      return self._mlp(X)

  def train_metrics(self) -> tuple[torch.Tensor, torch.Tensor]:
    """Return ``(train_loss, test_loss)``, one value per completed epoch."""
    train_error = torch.tensor(self._train_error)
    test_error = torch.tensor(self._test_error)
    return train_error, test_error

In [4]:
def plot_training_error(model_name: str,
                        train_error: torch.Tensor,
                        test_error: torch.Tensor,
                        metric_name: str = 'Error'):
  """Build a line chart of the per-epoch error on both splits.

  Args:
    model_name: model name shown in the chart title.
    train_error: one value per epoch for the training split.
    test_error: one value per epoch for the test split.
    metric_name: name of the plotted quantity, used in the title only.

  Returns:
    The plotly figure, with one line per split.
  """
  # Long-format table: one row per (split, epoch), training rows first
  rows = []
  for split, history in (('Treino', train_error), ('Teste', test_error)):
    for epoch, value in enumerate(history.numpy()):
      rows.append({'Epoch': epoch, 'Error': value, 'Set': split})

  title = f"{metric_name} do modelo {model_name} durante treinamento"
  return px.line(pd.DataFrame(rows),
                 x="Epoch",
                 y="Error",
                 color="Set",
                 title=title)


def classification_metrics_df(model: MLP,
                              test: torch.utils.data.Dataset,
                              batch_size: int = 1024):
  """Compute per-class and weighted classification metrics on ``test``.

  Samples are read by iterating the dataset through a DataLoader, so they
  go through the dataset's own transform -- the same preprocessing the
  model saw during training. (Reading the raw ``test.data`` tensor instead
  would feed unscaled pixel values to a model trained on transformed ones.)

  Args:
    model: trained classifier; ``model.predict`` must return one score per
      class for every sample.
    test: dataset yielding ``(inputs, target)`` pairs.
    batch_size: number of samples evaluated per forward pass.

  Returns:
    DataFrame with columns ``Class`` (class index as a string, or
    ``'Weighted'``), ``Value`` (float) and ``Metric`` (metric name),
    sorted by ``Class`` then ``Metric``.
  """
  loader = torch.utils.data.DataLoader(dataset=test,
                                       batch_size=batch_size,
                                       shuffle=False)

  # Obtain predicted labels and targets, batch by batch
  pred_chunks, target_chunks = [], []
  with torch.no_grad():
    for X, y in loader:
      _, labels = torch.max(model.predict(X), 1)
      pred_chunks.append(labels)
      target_chunks.append(y)

  preds = torch.cat(pred_chunks)
  target = torch.cat(target_chunks)
  n_classes = target.unique().size(dim=0)
  data = []

  # Metrics: average=None yields one value per class,
  #   'weighted' a single aggregated value
  for avg in [None, 'weighted']:
    for m in [F1Score, Precision, Recall, Accuracy]:
      value = m(task="multiclass",
                num_classes=n_classes,
                average=avg)(preds, target).numpy()
      name = m.__name__

      if avg is None:
        data.extend(dict(Class=str(i),
                         Value=float(v),
                         Metric=name)
                    for i, v in enumerate(value))
      else:
        data.append(dict(Class='Weighted',
                         Value=float(value),
                         Metric=name))

  # Create DataFrame
  return pd.DataFrame(data).sort_values(by=["Class", "Metric"])

def plot_classification_metrics(model: MLP,
                                test: torch.utils.data.Dataset,
                                df=None):
  """Build a grouped bar chart of the classification metrics.

  Args:
    model: classifier to evaluate; only used when ``df`` is not given.
    test: evaluation dataset; only used when ``df`` is not given.
    df: precomputed metrics table with ``Class``, ``Value`` and ``Metric``
      columns. When ``None`` it is computed from ``model`` and ``test``.

  Returns:
    The plotly figure, with one bar group per class.
  """
  metrics = classification_metrics_df(model, test) if df is None else df

  chart = px.bar(metrics,
                 x='Class',
                 y='Value',
                 color='Metric',
                 title="Métricas de Classificação",
                 barmode='group')
  # One tick per class on the x axis
  chart.update_layout(xaxis={'dtick': 1})
  return chart

## Carregamento dos dados

In [5]:
# Both splits live under the same root directory and are converted to
#   tensors on access.
data_root = '../../data/'

# Training split (downloaded into data_root when requested here)
train_dataset = ds.MNIST(data_root,
                         train=True,
                         transform=transforms.ToTensor(),
                         download=True)

# Test split
test_dataset = ds.MNIST(data_root,
                        train=False,
                        transform=transforms.ToTensor())

## Execução dos Experimentos

In [6]:
# Seed every random number generator in use (torch, numpy and the
#   standard library) with the same value.
for seed_rng in (torch.manual_seed, np.random.seed, random.seed):
    seed_rng(random_state)

In [7]:
# Make sure an experiment with this name exists, creating it when needed.
# Looking it up by name returns None when it is missing and, unlike a
#   hand-built filter string, is not broken by quotes inside the name.
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(name=experiment_name)
else:
    experiment_id = experiment.experiment_id

# Start a new run to store the results
with mlflow.start_run(experiment_id=experiment_id,
                      run_name=run_name) as run:

    # Log parameters and hyper-parameters
    mlflow.log_params({
        'n_hidden': n_hidden,
        'batch_size': batch_size,
        'epochs': epochs,
        'loss_fn': loss_fn,
        'init': init,
        'optimizer': optimizer,
        'lr': lr,
        'random_state': random_state,
        'train_samples': len(train_dataset),
        'test_samples': len(test_dataset)
    })

    # Instantiate the classifier; loss, initialisation and optimizer are
    #   resolved from their names
    mlp = MLP(n_input=784,
              n_hidden=n_hidden,
              n_output=10,
              batch_size=batch_size,
              epochs=epochs,
              loss_fn=getattr(nn, loss_fn)(),
              init=getattr(Initialization, init),
              optimizer=getattr(optim, optimizer),
              optimizer_params=dict(lr=lr))

    # Train the classifier
    mlp.fit(train_dataset, test_dataset)

    # Save the model
    mlflow.pytorch.log_model(mlp.model, "mlp",
                             signature=mlp.signature)

    # Compute the classification metrics and keep the weighted ones
    df = classification_metrics_df(mlp, test_dataset)
    report = df[df['Class'] == 'Weighted'].to_dict('records')

    # Log the weighted metrics, converted to plain floats
    for entry in report:
        mlflow.log_metric(f"Weighted {entry['Metric']}",
                          float(entry['Value']))

    # Build the training-error figure
    error_fig = plot_training_error('MLP',
                                    *mlp.train_metrics(),
                                    'Cross Entropy')

    # Write the image, log it as an artifact and always remove the local
    #   copy afterwards, even if writing or uploading fails
    figure_path = 'training_error.png'
    try:
        error_fig.write_image(figure_path)
        mlflow.log_artifact(figure_path)
    finally:
        if os.path.exists(figure_path):
            os.remove(figure_path)

