In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# If True, the notebook is running on Google Colab. Otherwise, it is assumed to run offline (locally).
TEST_ON_COLAB = False
FOLDERNAME = None  # Google Drive folder that holds the dataset cache; only used if TEST_ON_COLAB is True

# Fail early: the Colab cache path cannot be built without a folder name
assert not (FOLDERNAME is None and TEST_ON_COLAB), "FOLDERNAME has to be set if TEST_ON_COLAB is True"

In [None]:
import torch

# Select the device used for training and evaluation: CUDA when a GPU is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Atividade de programação 3 (AP3)
## Rede neural convolucional
### Valor: 25 pontos

Responda às questões indicadas e preencha os campos marcados com a tag `TODO`.

Para essa atividade, você deverá utilizar *obrigatoriamente* um dataset de classificação de imagens. Pesquise um dataset de imagens de sua preferência (exceto MNIST) utilizando o site `https://openml.org/`. O dataset será baixado através da função `fetch_openml` da biblioteca `scikit-learn`.

O código a seguir realiza o download de um dataset com a mesma estrutura de pixels/labels ($X$, $y$) vistos em exemplos de aula. Assume-se que a matriz $X$ (matriz de dados) é tal que cada linha armazena uma imagem linearizada, e $y$ é o vetor de labels. 

In [None]:
# TODO: Set the dataset ID
DATASET_OPENML_ID = None
##########################

assert DATASET_OPENML_ID is not None, "DATASET_OPENML_ID is not set"

Utilizaremos uma semente para garantir a reprodutibilidade dos resultados.

In [None]:
SEED = 42

### Base de dados

O código a seguir realiza, se necessário, o download da base de dados. Em seguida, as amostras são carregadas em formato matricial.

In [None]:
# Create dataset directory
import os

if TEST_ON_COLAB:
    # This mounts your Google Drive to the Colab VM.
    from google.colab import drive

    drive.mount("/content/drive")
    cache_dir = f"/content/drive/My Drive/{FOLDERNAME}/dataset/{DATASET_OPENML_ID}"
else:
    cache_dir = f"dataset/{DATASET_OPENML_ID}"

os.makedirs(cache_dir, exist_ok=True)

# Fetching the dataset
from sklearn.datasets import fetch_openml
import numpy as np

X_file_path = f"{cache_dir}/X.npy"
y_file_path = f"{cache_dir}/y.npy"

# Check if the dataset files already exist
if not (os.path.exists(X_file_path) and os.path.exists(y_file_path)):
    # fetch_openml takes the dataset *name* as its first argument and the numeric OpenML ID
    # through the keyword `data_id`, so pick the keyword that matches what was configured.
    if isinstance(DATASET_OPENML_ID, (int, np.integer)) or str(DATASET_OPENML_ID).isdigit():
        dataset_selector = {"data_id": int(DATASET_OPENML_ID)}
    else:
        dataset_selector = {"name": DATASET_OPENML_ID}

    # Fetch the dataset where X is the data and y is the target
    X, y = fetch_openml(as_frame=False, cache=True, return_X_y=True, **dataset_selector)

    # Save the dataset as numpy arrays
    np.save(X_file_path, X.astype(np.float32))
    np.save(y_file_path, y)
    print(f"{DATASET_OPENML_ID} dataset downloaded and saved successfully to {cache_dir}.")
else:
    # NOTE(review): allow_pickle=True unpickles arbitrary objects; it is kept because the labels
    # may have been saved as an object array. Only load cache files created by this notebook.
    X = np.load(X_file_path, allow_pickle=True)
    y = np.load(y_file_path, allow_pickle=True)
    print(f"{DATASET_OPENML_ID} dataset already exists in {cache_dir}. Skipping download.")

# Cast to 32-bits float number
X = X.astype(np.float32)

print(X.shape)
print(X.min(), X.max(), X.dtype)

A seguir, são exibidas algumas amostras do dataset. Ajuste a variável `image_shape` de acordo com as dimensões das amostras da sua base de dados.

In [None]:
# TODO: Set the image shape. ##################
image_shape = None # e.g. (28, 28), (32, 32, 3)
###############################################

assert image_shape is not None, "image_shape is not set"
assert len(image_shape) == 2 or len(image_shape) == 3, "image_shape should be a tuple of 2 or 3 elements"

import matplotlib.pyplot as plt

# Visualize some examples from the dataset.
# One column per class, with up to `samples_per_class` randomly chosen images in each column.
# Labels are kept exactly as stored in `y`, so the cell works both before and after the
# labels are cast to integers, and they are sorted numerically ("10" must come after "2").
class_labels = sorted(np.unique(y), key=int)
classes = [int(class_id) for class_id in class_labels]
num_classes = len(classes)
samples_per_class = 7
for col, (class_label, cls) in enumerate(zip(class_labels, classes)):
    idxs = np.flatnonzero(y == class_label)
    # Never request more samples than the class actually has
    idxs = np.random.choice(idxs, min(samples_per_class, len(idxs)), replace=False)
    for i, idx in enumerate(idxs):
        # Use the column position (not the label value) so labels need not be 0..K-1
        plt_idx = i * num_classes + col + 1
        plt.subplot(samples_per_class, num_classes, plt_idx)
        plt.imshow(X[idx].reshape(image_shape).astype('uint8'), cmap='gray')
        plt.axis('off')
        if i == 0:
            plt.title(cls)
plt.show()

**1. (2,5 pontos)** Agora, você precisa particionar o conjunto de dados em dois subconjuntos: (i) treino + validação (train+val) e (ii) teste (test).

Utilize a função `train_test_split` do módulo `sklearn.model_selection` para dividir o conjunto de dados (`X` e `y`): 90% dos dados para o subconjunto train+val e 10% para o subconjunto test.

In [None]:
from sklearn.model_selection import train_test_split

# Convert labels from string to integer
y = y.astype(int)

# Set the random seed for reproducibility
np.random.seed(SEED)

# TODO: Split the dataset into train and test sets
# (the placeholder is a 4-tuple so that an unfilled TODO reaches the asserts below
# instead of failing with "cannot unpack non-iterable NoneType object")
X_train_val, X_test, y_train_val, y_test = None, None, None, None
##################################################

assert X_train_val is not None, "X_train_val is not set"
assert y_train_val is not None, "y_train_val is not set"
assert X_test is not None, "X_test is not set"
assert y_test is not None, "y_test is not set"

print(f"Train+val set size: {len(X_train_val)}")
print(f"Test set size: {len(X_test)}")

**2. (2,5 pontos)** Agora, você precisa separar o conjunto de treino (train) do conjunto de validação (val).

Utilize novamente a função `train_test_split` do módulo `sklearn.model_selection` para dividir o conjunto de dados (`X_train_val` e `y_train_val`): 90% dos dados para o subconjunto train e 10% para o subconjunto val.

In [None]:
# TODO: Split further the train+val dataset into train and validation sets
# (the placeholder is a 4-tuple so that an unfilled TODO reaches the asserts below
# instead of failing with "cannot unpack non-iterable NoneType object")
X_train, X_val, y_train, y_val = None, None, None, None
##########################################################################

assert X_train is not None, "X_train is not set"
assert y_train is not None, "y_train is not set"
assert X_val is not None, "X_val is not set"
assert y_val is not None, "y_val is not set"

print(f"Train set size: {len(X_train)}")
print(f"Val set size: {len(X_val)}")

### Implementação em Pytorch

#### Modelo de rede neural convolucional

O modelo adotado nesta atividade tem como base uma rede residual de 18 camadas (ResNet18):

```
@InProceedings{He_2016_CVPR,
author = {He, Kaiming and Zhang, Xiangyu and Ren, Shaoqing and Sun, Jian},
title = {Deep Residual Learning for Image Recognition},
booktitle = {Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition (CVPR)},
month = {June},
year = {2016}
}
```

A última camada (totalmente conectada, `fc`) deve ser ajustada de acordo com o número de saídas esperadas do modelo (`n_output`). Além disso, é possível escolher se o modelo será treinado do zero ou se será feito o fine-tuning de um modelo pré-treinado na ImageNet (`pretrained=True`). O treinamento pode ser realizado com regularização por dropout se for atribuído um valor maior do que zero a `dropout_rate`. O ajuste fino pode ser aplicado a todas as camadas ou apenas à última camada (neste caso, `freeze=True`).

In [None]:
from torchvision import models


class Resnet18(torch.nn.Module):
    """
    ResNet18 classifier whose final layer is replaced by dropout + a new linear layer.

    Args:
        n_output: Number of outputs of the new final linear layer.
        pretrained: If True, the backbone is built with the IMAGENET1K_V1 weights.
        dropout_rate: Dropout probability applied right before the new linear layer.
        freeze: If True, gradients are disabled for every backbone parameter that
            exists before the final layer is replaced (the new layer stays trainable).
    """

    def __init__(self, n_output, pretrained=False, dropout_rate=0.5, freeze=False):
        super().__init__()

        # Only request weights when a pretrained backbone was asked for
        backbone_kwargs = {"weights": models.ResNet18_Weights.IMAGENET1K_V1} if pretrained else {}
        self.backbone = models.resnet18(**backbone_kwargs)

        # Freezing happens before the head is swapped, so it never affects the new layer
        if freeze:
            self.backbone.requires_grad_(False)

        # New classification head: dropout followed by a linear layer with n_output outputs
        in_features = self.backbone.fc.in_features
        head = torch.nn.Sequential(
            torch.nn.Dropout(p=dropout_rate),
            torch.nn.Linear(in_features, n_output),
        )
        self.backbone.fc = head

    def forward(self, x):
        """
        Run the input through the backbone (including the new head).

        Args:
            x: Input data.

        Returns:
            logits: Raw predictions from the model.
        """
        return self.backbone(x)

    def transforms(self):
        """Return the preprocessing transforms bundled with the IMAGENET1K_V1 weights."""
        return models.ResNet18_Weights.IMAGENET1K_V1.transforms()


# Define the model: pretrained ResNet18 backbone, no dropout, all layers trainable
model = Resnet18(n_output=num_classes, pretrained=True, dropout_rate=0.0, freeze=False)
model.to(device)

A célula a seguir mostra a implementação da classe Dataset para gerenciamento dos dados.

**Importante!**

O código assume que as imagens linearizadas seguem formato $N \times H \times W \times C$, onde $N$ é o número de amostras, $H$ é a altura e $W$ é a largura, e $C$ é o número de canais.

In [None]:
from torchvision.datasets import VisionDataset

class Dataset(VisionDataset):
    """
    In-memory image dataset built from a matrix of linearized images.

    Each row of X is reshaped to (H, W, C), moved to channel-first (C, H, W) layout and,
    for single-channel images, replicated to 3 channels.

    Args:
        X: Data matrix with one linearized image per row.
        y: Labels, indexed in the same order as the rows of X.
        image_shape: (H, W) for single-channel images or (H, W, C).
        transforms: Forwarded to VisionDataset as its `transforms` argument.
    """

    def __init__(self, X, y, image_shape, transforms=None):
        super(Dataset, self).__init__("", transforms=transforms)

        # Determine height, width, and number of channels
        H, W = image_shape[:2] # height, width
        C = 1 if len(image_shape) == 2 else image_shape[2] # channels

        # Reshape the data to (N, H, W, C)
        self.X = X.reshape(-1, H, W, C)

        # Transpose the data to (N, C, H, W) format
        self.X = np.transpose(self.X, (0, 3, 1, 2))

        # Replicate first channel to have 3 channels (enable pretrained models)
        if C == 1:
            self.X = np.repeat(self.X, 3, axis=1)

        self.y = y

    def __len__(self):
        """Return the number of images."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the (image, label) pair stored at `index`."""
        image = self.X[index]
        label = self.y[index]
        # NOTE(review): the constructor forwards `transforms=` to VisionDataset, which appears to
        # store it in `self.transforms` and leave `self.transform` as None, so this branch may
        # never run — confirm against the torchvision version in use.
        if self.transform is not None:
            # The transform must be applied to the image, never to the label
            image = self.transform(image)
        return image, label

# Create the datasets (one per split); every split receives the model's transforms
datasets = {
    "train": Dataset(X_train, y_train, image_shape, model.transforms()),
    "val": Dataset(X_val, y_val, image_shape, model.transforms()),
    "test": Dataset(X_test, y_test, image_shape, model.transforms())
}

# Statistics of the dataset
print(f"Training dataset shape: {datasets['train'].X.shape}")
print(f"Validation dataset shape: {datasets['val'].X.shape}")
print(f"Test dataset shape: {datasets['test'].X.shape}")

O carregamento dos dados em batch é implementado de acordo com objetos da classe `DataLoader`:

In [None]:
from torch.utils.data import DataLoader

# Number of samples per batch
batch_size = 32

assert batch_size is not None, "batch_size is not set"

# Only the training loader shuffles its samples; validation and test are read in a fixed order
dataloaders = {
    "train": DataLoader(datasets["train"], batch_size=batch_size, shuffle=True),
    "val": DataLoader(datasets["val"], batch_size=batch_size, shuffle=False),
    "test": DataLoader(datasets["test"], batch_size=batch_size, shuffle=False)
}

### Treinamento do modelo

Antes de realizar o treinamento, configuramos a biblioteca `pytorch` para garantir replicabilidade dos experimentos:

In [None]:
import torch

# Set the random seed for reproducibility
# NOTE(review): `model` is instantiated in an earlier cell, before this seed is set, so the random
# initialization of its new final layer is presumably not controlled by SEED — confirm.
torch.manual_seed(SEED)
# Restrict cuDNN to deterministic algorithms
torch.backends.cudnn.deterministic = True

A seguir, tem-se a implementação do treinamento do modelo.

In [None]:
import time

def train(model, dataloaders, learning_rate=0.001, weight_decay=0.0, num_epochs=10, checkpoint="model.pt"):
    """
    Train the model and save the weights that reach the best validation accuracy.

    Args:
        model: The model to train. Training runs on the device the model already lives on.
        dataloaders: Dict with a "train" and a "val" dataloader yielding (inputs, labels) batches.
        learning_rate: The learning rate.
        weight_decay: The weight decay.
        num_epochs: The number of epochs.
        checkpoint: The path to the checkpoint.

    Returns:
        train_loss_history: Mean training loss of each epoch.
        val_acc_history: Validation accuracy of each epoch.
        best_epoch: 1-based epoch with the best validation accuracy (-1 if no epoch ran).

    Raises:
        ValueError: If a dataloader yields no samples.
    """

    # Cross-entropy loss
    criterion = torch.nn.CrossEntropyLoss()

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # Batches are moved to the device of the model itself, so the function does not
    # depend on a notebook-level `device` variable matching the model passed in.
    model_device = next(model.parameters()).device

    # Initialize the best (validation) accuracy
    best_acc = -1
    best_epoch = -1

    # Training loss and validation accuracy history
    train_loss_history = []
    val_acc_history = []

    # Initialize the time
    since = time.time()

    # Training loop
    for epoch in range(1, num_epochs + 1):
        print(f"Epoch {epoch}/{num_epochs}")
        print("-" * 10)

        # Each epoch has a training and a validation phase
        for phase in ["train", "val"]:
            if phase == "train":
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            # Reset loss, counter of correct predictions and counter of processed samples
            running_loss = 0.0
            running_corrects = 0
            num_samples = 0

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(model_device)
                labels = labels.to(model_device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # statistics (the loss is a batch mean, so weight it by the batch size)
                batch_len = inputs.size(0)
                running_loss += loss.item() * batch_len
                running_corrects += (preds == labels).sum().item()
                num_samples += batch_len

            # Normalize by the samples actually seen in the dataloaders that were passed in,
            # not by the size of some other (global) dataset.
            if num_samples == 0:
                raise ValueError(f"The '{phase}' dataloader yielded no samples")
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples

            # Update history
            if phase == "train":
                train_loss_history.append(epoch_loss)
            if phase == "val":
                val_acc_history.append(epoch_acc)

            # Report loss and accuracy
            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

            # Save the (current) best model (based on the validation accuracy)
            if phase == "val" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_epoch = epoch
                torch.save(model.state_dict(), checkpoint)

    time_elapsed = time.time() - since
    print(f"Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")
    print(f"Best validation accuracy: {best_acc:.4f} at epoch {best_epoch}")

    return train_loss_history, val_acc_history, best_epoch

# Train the model for 10 epochs
train_loss_history, val_acc_history, best_epoch = train(model, dataloaders, learning_rate=0.001, num_epochs=10)

O treinamento produziu as seguintes curvas de aprendizado:

In [None]:
# Plot training curve
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots()

# Training loss curve (left y-axis)
epochs = range(1, len(train_loss_history) + 1)
ax1.plot(epochs, train_loss_history, label='Training Loss', color='tab:red')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss', color='tab:red')
ax1.tick_params(axis='y', labelcolor='tab:red')
ax1.set_xticks(epochs)

# Validation accuracy curve (right y-axis)
ax2 = ax1.twinx()  # instantiate a second Axes that shares the same x-axis
ax2.plot(epochs, val_acc_history, label='Validation Accuracy', color='tab:blue')
ax2.set_ylabel('Accuracy', color='tab:blue')
ax2.tick_params(axis='y', labelcolor='tab:blue')

# Highlight the best epoch on the accuracy axis, whose values lie in [0, 1].
# A single color argument is given: passing both `colors` and `color` is ambiguous.
ax2.vlines(best_epoch, 0, 1, colors='gray', linestyles='dashed', label='Best Epoch')

# Additional information
# Legend
fig.tight_layout()
fig.legend(loc='upper center', bbox_to_anchor=(0.5, 1.15), ncol=3)

ax2.set_title("Training Curve")

plt.show()

### Teste

A partição de teste é utilizada para avaliar o modelo treinado:

In [None]:
def test(checkpoint="model.pt"):
    """
    Evaluate the checkpointed weights on the test partition.

    Uses the notebook-level `model`, `dataloaders["test"]` and `device`. The weights stored in
    `checkpoint` are loaded into `model` (overwriting its current weights).

    Args:
        checkpoint: The path to the model checkpoint

    Returns:
        pred_labels: Numpy array with the predicted class of every test sample.
        true_labels: Numpy array with the ground-truth class of every test sample.
    """
    params = torch.load(checkpoint, map_location=device)
    model.load_state_dict(params)
    model.eval()

    pred_labels = []
    true_labels = []

    # Inference only: disable autograd so no computation graph is built or kept in memory
    with torch.no_grad():
        # Iterate over data
        for inputs, labels in dataloaders["test"]:
            inputs = inputs.to(device)
            outputs = model(inputs)
            pred_labels_batch = torch.argmax(outputs, dim=1)
            # Move results to the CPU right away so they can be converted to numpy
            pred_labels.append(pred_labels_batch.cpu())
            true_labels.append(labels.cpu())

    pred_labels = torch.cat(pred_labels, dim=0).numpy()
    true_labels = torch.cat(true_labels, dim=0).numpy()

    return pred_labels, true_labels

# Test the model
pred_labels, true_labels = test()

Os resultados obtidos são exibidos em formato de matriz de confusão e métricas de avaliação (relatório de classificação):

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay

def print_classification_report(true_labels, pred_labels, num_classes):
    """
    Print the classification report.

    Args:
        true_labels: Ground-truth class ids.
        pred_labels: Predicted class ids.
        num_classes: Number of classes; class ids are assumed to be 0..num_classes-1.
    """
    class_ids = list(range(num_classes))

    # Pass `labels` explicitly: with `target_names` alone, the report fails whenever a class
    # is missing from both the true and the predicted labels (the name count no longer matches).
    class_report = classification_report(
        true_labels,
        pred_labels,
        labels=class_ids,
        target_names=[str(class_id) for class_id in class_ids],
    )
    print("\nClassification Report:")
    print(class_report)


def plot_confusion_matrix(true_labels, pred_labels, num_classes):
    """
    Draw the confusion matrix of the predictions.

    Rows and columns cover the class ids 0..num_classes-1, in that order.
    """
    class_ids = list(range(num_classes))

    # Count (true, predicted) pairs over the full list of class ids
    matrix = confusion_matrix(true_labels, pred_labels, labels=class_ids)

    # Render the matrix with the class ids as tick labels
    ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=class_ids).plot()


print_classification_report(true_labels, pred_labels, num_classes)
plot_confusion_matrix(true_labels, pred_labels, num_classes)

**3. (5 pontos)** Investigue a influência da regularização por dropout no desempenho do modelo. Fixando os demais parâmetros do modelo, varie o valor de dropout_rate entre 0.1 e 0.5 e avalie o desempenho do modelo. Exiba um gráfico da acurácia em função do dropout_rate.

In [None]:
# Dropout rates to evaluate (0.0 serves as the no-dropout baseline)
dropout_rate_vals = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
# One accuracy per dropout rate, in the same order as dropout_rate_vals
accuracies = []
for dropout_rate in dropout_rate_vals:
    # TODO: Train/test
    pass
    ####################

# Fails while the TODO above has not been implemented
assert len(accuracies) > 0

# Plot accuracy as a function of the dropout rate
import matplotlib.pyplot as plt

plt.plot(dropout_rate_vals, accuracies, marker='o')
plt.xlabel('Dropout Rate')
plt.ylabel('Accuracy')
plt.title('Dropout Rate vs. Accuracy')
plt.show()

**4. (5 pontos)** Neste exercício, você investigará o impacto da técnica de aumento de dados na acurácia do modelo. Para isso, foi fornecida a classe `DatasetAugmented` para realização de aumento de dados online. Você deverá refazer os passos de treino e teste do modelo, mas utilizando a classe `DatasetAugmented` para carregar os dados. Compare os resultados obtidos com e sem aumento de dados.

In [None]:
%pip install imgaug

In [None]:
import imgaug.augmenters as iaa

class DatasetAugmented(Dataset):
    """
    Dataset that applies a random imgaug augmentation to every sample each time it is read.

    Args:
        X: Data matrix with one linearized image per row.
        y: Labels, indexed in the same order as the rows of X.
        image_shape: (H, W) for single-channel images or (H, W, C).
        transforms: Forwarded unchanged to the parent Dataset.
    """

    def __init__(self, X, y, image_shape, transforms=None):
        super(DatasetAugmented, self).__init__(X, y, image_shape, transforms)

        # Define the augmentation pipeline
        self.augmentation = iaa.Sequential([
            iaa.Fliplr(0.5),  # horizontal flips
            iaa.Sometimes(0.5, iaa.GaussianBlur(sigma=(0, 0.5))),  # Gaussian blur
            # Strengthen or weaken the contrast in each image
            # (LinearContrast is the replacement for the deprecated ContrastNormalization).
            iaa.LinearContrast((0.75, 1.5)),
            iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05 * 255), per_channel=0.5),  # Add gaussian noise.
            iaa.Affine(
                scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},  # scale images to 80-120% of their size
                translate_percent={"x": (-0.2, 0.2), "y": (-0.2, 0.2)},  # translate by -20 to +20 percent (per axis)
                rotate=(-25, 25),  # rotate by -25 to +25 degrees
                shear=(-8, 8)  # shear by -8 to +8 degrees
            )
        ])

    def __getitem__(self, index):
        """Return the augmented (image, label) pair stored at `index`, image in (C, H, W) layout."""
        image, label = super(DatasetAugmented, self).__getitem__(index)

        # The parent class stores images channel-first (C, H, W), while imgaug expects
        # channel-last (H, W, C): without the transposition the geometric augmenters would
        # treat the channel axis as the image height.
        image_hwc = np.transpose(np.asarray(image), (1, 2, 0))
        image_hwc = self.augmentation(image=image_hwc)

        # Back to (C, H, W); make it contiguous so it can be safely converted to a tensor
        image = np.ascontiguousarray(np.transpose(image_hwc, (2, 0, 1)))
        return image, label

# TODO: Augmented dataset ###
datasets_augmented = None
dataloaders_augmented = None
#############################

# TODO: Train the model with augmented data
pass
###########################################

# Test the model ##############
# (the placeholder is a 2-tuple so that an unfilled TODO reaches the asserts below
# instead of failing with "cannot unpack non-iterable NoneType object")
pred_labels, true_labels = None, None
###############################

assert datasets_augmented is not None, "datasets_augmented is not set"
assert dataloaders_augmented is not None, "dataloaders_augmented is not set"
assert pred_labels is not None, "pred_labels is not set"
assert true_labels is not None, "true_labels is not set"

# Results
print_classification_report(true_labels, pred_labels, num_classes)
plot_confusion_matrix(true_labels, pred_labels, num_classes)

**5. (10 pontos)** Neste exercício final, você trabalhará para obter o melhor modelo variando três hiperparâmetros: taxa de aprendizado, taxa de dropout e peso (coeficiente) de regularização L2 (`weight_decay`). Você deverá treinar os diferentes modelos e utilizar, no teste, aquele que resultar na maior acurácia no conjunto de **VALIDAÇÃO**.

Dica: salve um *checkpoint* para cada configuração.

In [None]:
# Checkpoint path and validation accuracy of the best configuration found so far
best_checkpoint = None
best_val_acc = -1

# Hyper-parameter grid: every combination of the three lists below is visited
learning_rate_vals = [0.001, 0.0001, 0.00001]
dropout_rate_vals = [0.0, 0.5]
weight_decay_vals = [0.0, 0.001, 0.0001]

for learning_rate in learning_rate_vals:
    for weight_decay in weight_decay_vals:
        for dropout_rate in dropout_rate_vals:
            # TODO: Train and test the models in different setups
            pass
            ####################################################


# Test the best model and report results
# Fails while the TODO above has not set best_checkpoint
assert best_checkpoint is not None

print(f"Best checkpoint={best_checkpoint} (val. acc.={best_val_acc:.3f})")

# Evaluate the selected checkpoint on the test partition
pred_labels, true_labels = test(checkpoint=best_checkpoint)
print_classification_report(true_labels, pred_labels, num_classes)
plot_confusion_matrix(true_labels, pred_labels, num_classes)