In [40]:
import torch
import csv
import numpy as np
import pandas as pd
import torch.nn.functional as F
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader

In [41]:
class DigitDataset(Dataset):
    """
    Classe para criar um Dataset a partir do arquivo de entrada
    """

    def __init__(self, path):
        """
        Lê o arquivo de entrada e criar um array numpy

        Args:
            path: Nome do arquivo de entrada
        """
        self.data = pd.read_csv(path).values.astype(np.float32)

    def __len__(self):
        """
        Retorna o número de entradas
        """
        return self.data.shape[0]

    def __getitem__(self, index):
        """
        Retorna os valores x e y a partir do índice

        Args:
            index: índice de entrada
        """
        # Formato dos dados: [y[i], x[i,1], x[i,2], ..., x[i,784]]
        data = self.data[index, :]
        x = torch.from_numpy(data[1:]) / 255.0  # Normalizado para valores entre 0 e 1
        y = torch.from_numpy(np.array(data[0]).astype(int))
        return x, y

In [42]:
class MLP(nn.Module):
    """
    Classe que cria um Multilayer Perceptron
    contendo 3 camadas
    1a: Entrada de tamanho 784 valores
    2a: Camada oculta de 25, 50 ou 100 nós
    3a: Saída com 10 valores
    Função de ativação Sigmoid
    Saída em Softmax
    """

    def __init__(self, input_size=784, hidden_layer=25, output_size=10):
        """
        Cria as camadas do MLP

        Args:
            input_size: Número de dimensões da entrada
            hidden_layer: Número de nurônios da camada oculta
            output_size: Número de classes da saída
        """
        super(MLP, self).__init__()
        # Camada de Entrada
        self.fc1 = nn.Linear(input_size, hidden_layer)
        # Camada Oculta
        self.fc2 = nn.Linear(hidden_layer, output_size)

    def forward(self, x):
        """
        Forward Pass

        Args:
            x: Entrada para a rede neural
        """
        # Função de ativação Sigmoid
        x = F.sigmoid(self.fc1(x))
        # Saída por Softmax
        return F.softmax(self.fc2(x), dim=1)

In [43]:
class EarlyStopping:
    """
    Classe para determinar as condições de parada
    """

    def __init__(self, tolerance=4, min_delta=0.0):
        """
        Args:
            tolerance: Número de epochs sem melhorar a loss
            min_delta: Valor mínimo da loss para parar
        """
        self.tolerance = tolerance
        self.min_delta = min_delta
        self.counter = 0
        self.min_validation_loss = np.inf

    def __call__(self, validation_loss):
        """
        Verifica se houve melhoria da loss

        Args:
            validation_loss: Loss da validação
        """
        # Não houve melhoria do valor da loss maior que min_delta
        if (validation_loss + self.min_delta) < self.min_validation_loss:
            self.min_validation_loss = validation_loss
            self.counter = 0
        # Houve melhoria do valor da loss maior que min_delta
        elif (validation_loss + self.min_delta) >= self.min_validation_loss:
            self.counter += 1
            if self.counter >= self.tolerance:
                return True
        return False

In [44]:
def train_one_epoch(model, train_dataloader, loss_func, optimiser, verbose=True):
    """
    Função para treinar um epoch

    Args:
        model: Modelo a ser treinado
        train_dataloader: DataLoader para o treinamento
        loss_func: Função de perda
        optimiser: Otimizador
        verbose: Verbose do treinamento
    """
    # Colocar o modelo em modo de treinamento
    model.train()
    epoch_loss = 0.0
    for x, y in train_dataloader:
        # zerar os gradientes
        optimiser.zero_grad()
        # forward pass
        y_pred = model(x)
        # calcular a loss
        loss = loss_func(y_pred, y)
        # backward pass
        loss.backward()
        # atualizar os pesos
        optimiser.step()
        # somar a loss
        epoch_loss += loss.item()

    epoch_loss /= len(train_dataloader)
    if verbose:
        print(f"Training loss: {epoch_loss}")
    return epoch_loss


def validate_one_epoch(model, validate_dataloader, loss_func, verbose=True):
    """
    Função para validar um epoch

    Args:
        model: Modelo a ser validado
        validate_dataloader: DataLoader para a validação
        loss_func: Função de perda
        verbose: Verbose do treinamento
    """
    # Colocar o modelo em modo de avaliação
    model.eval()
    epoch_loss = 0.0
    y_eval = []
    with torch.no_grad():
        for x, y in validate_dataloader:
            # forward pass
            y_pred = model(x)
            # calcular a loss
            loss = loss_func(y_pred, y)
            # somar a loss
            epoch_loss += loss.item()
            # adicionar as predições
            y_eval.append(y_pred)
        epoch_loss /= len(validate_dataloader)
        if verbose:
            print(f"Validation loss: {epoch_loss}")
    return epoch_loss, y_eval

In [45]:
def train_and_validate(
    max_epochs=1000,
    batch_size=10,
    hidden_layer=25,
    lr=0.5,
    tolerance=3,
    min_delta=0.01,
    verbose=True,
):
    """
    Função principal do treinamento e validação

    Args:
        max_epochs: Número máximo de epochs
        batch_size: Tamanho do batch
        hidden_layer: Número de nurônios da camada oculta
        lr: Taxa de aprendizado
        tolerance: Número de epochs sem melhorar a loss
        min_delta: Valor mínimo da loss para parar
        verbose: Verbose do treinamento
    """
    print("-------------------------------")
    print(
        "Modelo: Hidden Layer %3d | Batch Size %4d | LR %2.1f"
        % (hidden_layer, batch_size, lr)
    )

    # Cria os datasets
    train_dataset = DigitDataset("data_tp1")
    test_dataset = DigitDataset("validation.csv")
    # Cria os dataloaders
    train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True)
    validate_dataloader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
    # Cria o modelo
    model = MLP(hidden_layer=hidden_layer)
    loss_func = nn.CrossEntropyLoss()
    optimiser = optim.SGD(model.parameters(), lr=lr)
    train_loss = []
    validation_loss = []
    early_stopping = EarlyStopping(tolerance, min_delta)

    for i in range(max_epochs):
        # treinamento
        if verbose:
            print("-------------------------------")
            print(f"Epoch {i+1}")
        epoch_train_loss = train_one_epoch(
            model, train_dataloader, loss_func, optimiser, verbose
        )
        train_loss.append(epoch_train_loss)

        # validação
        with torch.no_grad():
            epoch_validate_loss, y_eval = validate_one_epoch(
                model, validate_dataloader, loss_func, verbose
            )
            validation_loss.append(epoch_validate_loss)

        # critério de parada
        if verbose and (i > 0):
            print(f"Loss delta: {validation_loss[i-1] - validation_loss[i]}")
        if early_stopping(epoch_validate_loss):
            print("-------------------------------")
            print(f"End at epoch: {i+1}")
            print("===============================")
            break

    num_epochs = i + 1
    output = np.array(y_eval).reshape(len(test_dataset), 10).argmax(axis=1)
    torch.save(model, f"./models/model_hl{hidden_layer}_batch{batch_size}_lr{lr}.pt")
    return train_loss, validation_loss, output, num_epochs

In [46]:
def main(oculta=25, batch_size=10, lr=0.5, train_all_models=False, verbose=True):
    """
    Treinamento e validação dos modelos
    Geração do arquivo csv com os resultados na pasta results
    Gravação dos modelos em disco na pasta models

    Args:
        oculta: Número de nurônios da camada oculta
        batch_size: Tamanho do batch
        lr: Taxa de aprendizado
        train_all_models: Treinar todos os modelos

    Atenção:
        Caso train_all_models seja True, fará o treinamento de todas
        as combinações de hiper-parâmetros, totalizando 84 treinamentos.
        Isso vai demorar literalmente algumas horas.
    """
    # Criando um arquivo csv com os resultados
    if train_all_models:
        result = "./results/results_all_models.csv"
    else:
        result = f"./results/result_model_hl{oculta}_batch{batch_size}_lr{lr}.csv"
    with open(result, "w") as f:
        # Lendo os dados
        y_real = pd.read_csv("validation.csv")["label"].values.astype(np.int8)
        fieldnames = [
            "hidden_layer",
            "batch_size",
            "learning_rate",
            "taxa_acerto",
            "num_epochs",
            "training_loss",
            "validation_loss",
        ]
        writer = csv.DictWriter(f, fieldnames, lineterminator="\n")
        writer.writeheader()

        # Hiper-parâmetros
        if train_all_models:
            camadas_ocultas = [25, 50, 100]
            batch_sizes = [1, 10, 50, 100, 500, 1000, 5000]
            learning_rates = [0.1, 0.5, 1.0, 10.0]
        else:
            camadas_ocultas = [oculta]
            batch_sizes = [batch_size]
            learning_rates = [lr]

        i = 1
        n = len(camadas_ocultas) * len(batch_sizes) * len(learning_rates)
        # Todas as combinações de hiper-parâmetros
        for hl in camadas_ocultas:
            for batch in batch_sizes:
                for lr in learning_rates:
                    # Treinamento e validação
                    print(f"Treinamento de modelo {i}/{n}")
                    stats = [hl, batch, lr]
                    params = {
                        "batch_size": batch,
                        "hidden_layer": hl,
                        "lr": lr,
                        "min_delta": 0.0,
                        "tolerance": 4,
                        "verbose": verbose,
                    }
                    train_loss, validation_loss, y_pred, num_epochs = (
                        train_and_validate(**params)
                    )

                    # Comparação
                    num_acertos = (y_real == y_pred).sum()
                    total = len(y_real)
                    taxa_acerto = num_acertos / total
                    # Escrevendo os resultados
                    stats += [taxa_acerto, num_epochs, train_loss, validation_loss]
                    writer.writerow(dict(zip(fieldnames, stats)))
                    i += 1
        f.close()

In [47]:
if __name__ == "__main__":
    main(train_all_models=True, verbose=False)

Treinamento de modelo 1/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size    1 | LR 0.1
-------------------------------
End at epoch: 23
Treinamento de modelo 2/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size    1 | LR 0.5
-------------------------------
End at epoch: 14
Treinamento de modelo 3/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size    1 | LR 1.0
-------------------------------
End at epoch: 15
Treinamento de modelo 4/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size    1 | LR 10.0
-------------------------------
End at epoch: 5
Treinamento de modelo 5/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size   10 | LR 0.1
-------------------------------
End at epoch: 135
Treinamento de modelo 6/84
-------------------------------
Modelo: Hidden Layer  25 | Batch Size   10 | LR 0.5
-------------------------------
End at epoch: 43
Treinamento de modelo 7/84
------------