## Convolutional Neural Networks
CNNs are well suited to classification (and sometimes regression) problems on visual data. Their main advantage is that *they* learn the relevant features from the data, rather than relying on us to decide which features to use. We just give the CNN labeled, image-like data and let training discover the features.
The general flow for this notebook will be:
1. Explore tools from PyTorch that allow for the import/transformation of different types of visual data
2. Import image data from a clothing dataset
3. Design a multi-class NN model to classify the different types of clothing in the images
4. Examine the results, then create a CNN model and compare its performance to the multi-class NN
5. Save the weights of the model with the best results so that it can be used elsewhere
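As a quick preview of what "finding the features themselves" means in practice, here is a minimal sketch (assuming a standard PyTorch install; the batch is random, hypothetical data). It shows that a convolutional layer's filters are ordinary trainable parameters, so backprop can shape them into feature detectors instead of us designing them by hand.

```python
import torch
from torch import nn

# One convolutional layer: 1 input channel (greyscale), 8 learned 3x3 filters.
# padding=1 keeps the 28x28 spatial size unchanged.
conv = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, padding=1)

# The filter weights are trainable parameters that backprop will update
print(conv.weight.shape, conv.weight.requires_grad)  # torch.Size([8, 1, 3, 3]) True

images = torch.rand(32, 1, 28, 28)  # hypothetical batch of 28x28 greyscale images
feature_maps = conv(images)
print(feature_maps.shape)  # torch.Size([32, 8, 28, 28]): one feature map per learned filter
```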

In [None]:
# Let's start by importing everything we will need

import torch
from torch import nn

import torchvision
from torchvision import datasets # contains pre-built datasets that can be used to test models
from torchvision.transforms import ToTensor # converts PIL images / NumPy arrays into float tensors scaled to [0, 1]

import matplotlib.pyplot as plt
print(f'PyTorch Version: {torch.__version__}, Torchvision Version: {torchvision.__version__}')

In [None]:
# Let's get our training and testing data. torchvision ships with many built-in datasets.
# We will use the FashionMNIST dataset for this classification problem. It's essentially the fashion
# version of the original MNIST dataset of handwritten digits: there are still 10 classes, but they're clothes, not digits.

DATA_DIR = "../../data/"

# Many of the datasets in this module have the same arguments.
train_data = datasets.FashionMNIST(
    DATA_DIR,
    train=True,
    transform=ToTensor(),
    target_transform=None,
    download=True
)

test_data = datasets.FashionMNIST(
    DATA_DIR,
    train=False,
    transform=ToTensor(),
    target_transform=None,
    download=True
)

In [None]:
# With the data downloaded, let's explore
print(type(train_data))

# Let's get the first image/label pair from the test data
image, label = test_data[0]
print(f'Label Type: {type(label)}, Label: {label}')
print(f'Image Type: {type(image)}, Image Shape: {image.shape}')



#### So it looks like the image is a 3D tensor and the label is just an integer giving the class index. The shape follows PyTorch's (channels, height, width) convention, and the 1 in the first (channel) dimension shows that it's a greyscale image. This implies that each value in the 28x28 grid simply represents the intensity of that pixel. If this were a color image, there would be 3 "channels" representing the intensities of red, green, and blue respectively.

In [None]:
# The dataset object has useful properties that allow you to view different aspects of the dataset.
print(f'Training size: {len(train_data)}, Test size: {len(test_data)}, Test/Train Ratio: {len(test_data)/len(train_data):.2f}')
print(train_data.classes)

In [None]:
# Now let's actually look at some of the images/labels in the training dataset
fig = plt.figure(figsize=(9,9))
plt_rows = 4
plt_cols = 4
torch.manual_seed(42)
for sub in range(1, plt_rows * plt_cols + 1):
    index = torch.randint(0, len(train_data), size=[1]).item() # random index; the upper bound is exclusive, so every sample can be drawn
    img, label = train_data[index] # pull that sample and its label from the training data
    fig.add_subplot(plt_rows, plt_cols, sub)
    plt.imshow(img.squeeze(), cmap='gray') # squeeze out the channel dimension: imshow expects a 2D (H, W) array for greyscale
    plt.title(train_data.classes[label])
    plt.axis(False)

#### As seen above, we have quite a few different images corresponding to the different classes in the dataset! The next step is to create a DataLoader object to feed data into the model we plan to build. This may seem like an arbitrary step: why transform the data into a special object instead of just using the tensors as they are? The main reason is __batching__. In most real-world scenarios, datasets are huge. So far, we've been using a form of training called __Batch Gradient Descent__, where *backprop is done on the entire training set once per epoch*. This is feasible with small datasets, but with larger datasets it becomes computationally expensive (and therefore expensive in time and money). The method we'll adopt instead is mini-batching: grouping a small subset of the training set to be used for one backprop step. Instead of using the loss from the WHOLE dataset (Batch) or from each individual sample (Stochastic) to update the model's weights, we use the *average* loss over the mini-batch. This reduces the computational cost of each update, and the noise in mini-batch gradients can also help the model generalize. Moving our data into a DataLoader makes it easy to change the batch size and to reshuffle the data every epoch. Note that an epoch is still one full pass over the training set; with mini-batches, each epoch simply consists of many weight updates (the number of samples divided by the batch size, rounded up).
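The batch arithmetic can be checked directly. A DataLoader without `drop_last` yields ceil(num_samples / batch_size) batches per epoch, with a smaller final batch when the division isn't exact. The helper `batches_per_epoch` is a hypothetical name for illustration, and the small dataset is random dummy data.

```python
import math
import torch
from torch.utils.data import DataLoader, TensorDataset

def batches_per_epoch(num_samples: int, batch_size: int, drop_last: bool = False) -> int:
    """Number of mini-batches a DataLoader yields in one epoch (hypothetical helper)."""
    if drop_last:
        return num_samples // batch_size  # the incomplete final batch is discarded
    return math.ceil(num_samples / batch_size)  # the final batch may be smaller

# FashionMNIST sizes: 60,000 train / 10,000 test images
print(batches_per_epoch(60_000, 32))  # 1875
print(batches_per_epoch(10_000, 32))  # 313 (the last batch holds only 16 samples)

# Cross-check against a real DataLoader on a small dummy dataset (100 random "images")
dummy = TensorDataset(torch.randn(100, 1, 28, 28), torch.randint(0, 10, (100,)))
loader = DataLoader(dummy, batch_size=32, shuffle=True)
sizes = [X.shape[0] for X, _ in loader]
print(len(loader), sizes)  # 4 [32, 32, 32, 4]
print(len(DataLoader(dummy, batch_size=32, drop_last=True)))  # 3
```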

In [None]:
# Create DataLoader objects for both the training and test data
from torch.utils.data import DataLoader
BATCH_SIZE = 32
train_dloader = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True
)
test_dloader = DataLoader(
    dataset=test_data,
    batch_size=BATCH_SIZE,
    shuffle=False
)

# Now let's inspect the objects. Much of the logic to batch the data and match labels to inputs is handled by the DataLoader
print(f'Type: {type(train_dloader)}')
train_batch_img, train_batch_labels = next(iter(train_dloader))
print(f'Batch Img: {train_batch_img.shape}, Batch Labels: {train_batch_labels.shape}') # the DataLoader is an iterable that yields one (images, labels) batch at a time

### Model Creation
With all the data import and preparation done, we can now create a baseline (very simple) model. We'll start with a basic two-layer network (one hidden layer, no activation functions), preceded by a flattening layer that turns each 1x28x28 image into a 784-element vector.
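A quick shape check of the baseline architecture, using a random batch shaped like one from `train_dloader` (hypothetical data). `nn.Flatten()` keeps the batch dimension by default (`start_dim=1`), so a (32, 1, 28, 28) batch becomes (32, 784).

```python
import torch
from torch import nn

batch = torch.rand(32, 1, 28, 28)  # a mini-batch shaped like one from train_dloader
flat = nn.Flatten()(batch)         # default start_dim=1 keeps the batch dimension
print(flat.shape)                  # torch.Size([32, 784])

# Same layer sizes as the baseline: 784 -> 10 hidden units -> 10 classes
model = nn.Sequential(nn.Flatten(), nn.Linear(784, 10), nn.Linear(10, 10))
logits = model(batch)
print(logits.shape)                # torch.Size([32, 10]): one logit per class per image

# Parameter count: (784*10 + 10) + (10*10 + 10) = 7960
print(sum(p.numel() for p in model.parameters()))  # 7960
```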

In [None]:
INPUT_DIMS = train_data[0][0].shape[1]*train_data[0][0].shape[2] # our images are 28*28 and will be flattened out before hitting the NN
HIDDEN_UNITS = 10
OUTPUT_DIMS = len(train_data.classes)

class FashionMNISTModel0(nn.Module):
    def __init__(self,
                 in_shape: int,
                 out_shape: int,
                 hidden_units: int) -> None:
        super().__init__()
        self.network = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=in_shape, out_features=hidden_units, bias=True),
            nn.Linear(in_features=hidden_units, out_features=out_shape, bias=True)
        )
    def forward(self, x):
        return self.network(x)

fmnist_model_0 = FashionMNISTModel0(in_shape=INPUT_DIMS,
                                    out_shape=OUTPUT_DIMS,
                                    hidden_units=HIDDEN_UNITS)
print(fmnist_model_0)



In [None]:
# Need to once again define the optimizer/loss function combo
optimizer = torch.optim.SGD(params=fmnist_model_0.parameters(), lr=0.1)
loss_fn = nn.CrossEntropyLoss() # 'mean' reduction takes all the loss values from the batch and averages them to get the loss
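To see what the `'mean'` reduction does, here is a sketch on a hypothetical batch of random logits: the default loss equals the average of the per-sample losses returned by `reduction='none'`, and each per-sample loss is the negative log of the softmax probability assigned to the true class.

```python
import torch
from torch import nn

torch.manual_seed(0)
logits = torch.randn(4, 10)          # hypothetical batch: 4 samples, 10 classes
labels = torch.tensor([3, 0, 9, 1])  # integer class indices, like FashionMNIST labels

mean_loss = nn.CrossEntropyLoss()(logits, labels)                   # default reduction='mean'
per_sample = nn.CrossEntropyLoss(reduction='none')(logits, labels)  # shape (4,)

# Cross-entropy for one sample = -log(softmax(logits)[true_class])
manual = -torch.log_softmax(logits, dim=1)[torch.arange(4), labels]

print(per_sample.shape)                              # torch.Size([4])
print(torch.allclose(per_sample, manual))            # True
print(torch.allclose(mean_loss, per_sample.mean()))  # True
```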

In [None]:
# Now it's training loop time. Since we're now training with mini-batches, we'll write out all the steps.
# Mini-batches require a nested loop: an outer loop over epochs and an inner loop over the batches in each epoch.
from timeit import default_timer as timer # boilerplate timer functionality
from tqdm.auto import tqdm # text-based progress bar
from torchmetrics import Accuracy

# Now let's create a quick little function that gives the run time of the loop
total_time = lambda start_time, stop_time: stop_time - start_time

torch.manual_seed(42)
EPOCHS = 2
train_size = len(train_dloader.dataset) # number of samples in the train dataset
test_size = len(test_dloader.dataset) # number of samples in test dataset
num_batches_train = len(train_dloader) # batches per epoch: 60000/32 = 1875
num_batches_test = len(test_dloader) # 10000/32 = 312.5, so the DataLoader yields 313 batches (the last holds 16 samples)
acc = Accuracy(task='multiclass', num_classes=len(test_data.classes))

train_time_mnn_start = timer()
for epoch in tqdm(range(EPOCHS)):
    print(f'Epoch: {epoch}\n-----------')

    # Training Steps
    fmnist_model_0.train() # put the model in training mode once per epoch
    train_loss = 0 # need to reset loss every epoch
    for batch, (X_train, y_train) in enumerate(train_dloader): # each iteration yields (batch index, (32 images, 32 labels))
        y_logits = fmnist_model_0(X_train) # Like before, need to get model's predictions (in logits)
        loss = loss_fn(y_logits, y_train) # calculate loss for this batch
        train_loss += loss.item() # add this batch's mean loss as a float; .item() stops the epoch total from holding onto every batch's autograd graph

        # backprop step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # We want to see some updates within an epoch
        print(f'Batches processed: {batch + 1}/{int(num_batches_train)}, Samples processed: {min((batch + 1) * BATCH_SIZE, train_size)}/{train_size}', end='\r')
    
    # Now we want to find the AVERAGE loss of all the batches
    train_loss /= num_batches_train
    print(f'\nMean Train Loss: {train_loss:.4f}')

    # Test Steps; mimick the training steps without backprop.
    # Only care about the epoch level values for test, no intermediate
    # updates necessary.
    test_loss = 0
    test_acc = 0

    fmnist_model_0.eval()
    with torch.inference_mode():
        for X_test, y_test in test_dloader:
            test_logits = fmnist_model_0(X_test)
            test_loss += loss_fn(test_logits, y_test).item() # no separate batch-loss variable needed: nothing is backpropagated during testing
            test_acc += acc(test_logits, y_test) # torchmetrics Accuracy accepts (N, C) logits and takes the argmax internally

        # Get AVERAGE loss and accuracy of all test batches
        test_loss /= num_batches_test
        test_acc /= num_batches_test

    # print out test loss and test accuracy
    print('\n-----------')
    print(f'Mean Test Loss: {test_loss:.4f}')
    print(f'Mean Test Accuracy: {test_acc * 100:.2f}%')
    print('-----------\n')
train_time_mnn_end = timer()
print(f'Total time to train: {total_time(train_time_mnn_start, train_time_mnn_end):.2f}s')

#### The baseline model seems to do a decent job, but let's see if we can make it better by adding non-linearity (ReLU activations) to the model architecture
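Why would non-linearity matter? Without an activation between them, two stacked `nn.Linear` layers compose into a single linear map (W = W2·W1, b = W2·b1 + b2), so the baseline is no more expressive than one linear layer. A minimal sketch with randomly initialized layers of the baseline's sizes and random, hypothetical inputs:

```python
import torch
from torch import nn

torch.manual_seed(0)
layer1 = nn.Linear(784, 10)
layer2 = nn.Linear(10, 10)
x = torch.rand(5, 784)  # 5 hypothetical flattened images

# Two stacked Linear layers with nothing in between ...
stacked = layer2(layer1(x))

# ... equal ONE Linear layer with W = W2 @ W1 and b = W2 @ b1 + b2
W = layer2.weight @ layer1.weight              # shape (10, 784)
b = layer2.weight @ layer1.bias + layer2.bias  # shape (10,)
single = x @ W.T + b                           # nn.Linear computes x @ W^T + b

print(torch.allclose(stacked, single, atol=1e-5))  # True

# With a ReLU in between, negative hidden activations are zeroed,
# so the network is no longer a single linear map
with_relu = layer2(torch.relu(layer1(x)))
```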

In [None]:
# Since the train/test loop has gotten more complex, let's turn the steps into functions
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn):

    # Training Steps
    model.train() # put the model in training mode once per epoch
    train_loss, train_acc = 0, 0 # running totals, reset every epoch (kept separate from the per-batch loss)
    for batch, (X, y) in enumerate(data_loader): # each iteration yields (batch index, (images, labels))
        y_pred = model(X) # Like before, need to get model's predictions (in logits)
        loss = loss_fn(y_pred, y) # calculate loss for this batch
        train_loss += loss.item() # add this batch's mean loss to the epoch total
        train_acc += accuracy_fn(y_pred.argmax(dim=1), y) # argmax over classes turns logits into predicted labels

        # backprop step (on this batch's loss only)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # We want to see some updates within an epoch
        print(f'Batches processed: {batch + 1}/{len(data_loader)}, Samples processed: {min((batch + 1) * data_loader.batch_size, len(data_loader.dataset))}/{len(data_loader.dataset)}', end='\r')
    
    # Now we want to find the AVERAGE loss and accuracy of all the batches
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print('\n-----------')
    print(f'Mean Train Loss: {train_loss:.4f}, Mean Train Accuracy: {train_acc * 100:.2f}%')
    return train_loss, train_acc

def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn):

    # Test Steps
    model.eval()
    test_loss, test_acc = 0, 0 # running totals, reset every epoch
    with torch.inference_mode():
        for X, y in data_loader: # each iteration yields one (images, labels) batch
            y_pred = model(X) # Like before, need to get model's predictions (in logits)
            test_loss += loss_fn(y_pred, y).item() # add this batch's mean loss to the total
            test_acc += accuracy_fn(y_pred.argmax(dim=1), y)

        # Now we want to find the AVERAGE loss and accuracy of all the batches
        test_loss /= len(data_loader)
        test_acc /= len(data_loader)
    print(f'Mean Test Loss: {test_loss:.4f}, Mean Test Accuracy: {test_acc * 100:.2f}%')
    print('-----------\n')
    return test_loss, test_acc
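Two details in these training/testing steps are worth seeing in isolation: accumulating `loss.item()` rather than the loss tensor itself, and running evaluation under `torch.inference_mode()`. A small sketch with a toy, hypothetical model and random data:

```python
import torch
from torch import nn

model = nn.Linear(4, 2)  # toy model standing in for the FashionMNIST models
x, y = torch.rand(3, 4), torch.tensor([0, 1, 1])
ce_loss = nn.CrossEntropyLoss()

# 1) Accumulating the loss tensor keeps the running total attached to the autograd graph
total_tensor = 0
total_tensor += ce_loss(model(x), y)
print(total_tensor.requires_grad)  # True: still references the graph
# Accumulating loss.item() stores a plain Python float instead
total_float = 0.0
total_float += ce_loss(model(x), y).item()
print(type(total_float))           # <class 'float'>

# 2) Under inference_mode, no graph is recorded at all
model.eval()
with torch.inference_mode():
    out = model(x)
print(out.requires_grad, out.is_inference())  # False True
```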

In [None]:
# Let's try the new functions out in the same training loop as before.
# Note: fmnist_model_0 keeps the weights learned in the previous cell, so this continues training it rather than starting from scratch.

from helper_functions import model_accuracy
torch.manual_seed(42)
EPOCHS = 2
train_time_mnn_start = timer()
for epoch in tqdm(range(EPOCHS)):
    print(f'Epoch: {epoch}\n-----------')
    train_step(
        fmnist_model_0,
        train_dloader,
        loss_fn,
        optimizer,
        model_accuracy
    )
    test_step(
        fmnist_model_0,
        test_dloader,
        loss_fn,
        model_accuracy
    )
train_time_mnn_end = timer()
print(f'Total time to train: {total_time(train_time_mnn_start, train_time_mnn_end):.2f}s')
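The cell above imports `model_accuracy` from a local `helper_functions` module whose source isn't shown here. As an assumption about what it does, here is a hypothetical stand-in named `accuracy_sketch` (so it doesn't shadow the imported helper) matching the call `accuracy_fn(y_pred.argmax(dim=1), y)` and returning a fraction, which is consistent with the `* 100` in the print statements.

```python
import torch

def accuracy_sketch(y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
    """Hypothetical stand-in for helper_functions.model_accuracy (real source not shown).

    Takes predicted class indices (e.g. logits.argmax(dim=1)) and the true labels,
    and returns the fraction of correct predictions as a 0-dim float tensor in [0, 1].
    """
    return (y_pred == y_true).float().mean()

preds = torch.tensor([0, 2, 1, 3])
labels = torch.tensor([0, 1, 1, 3])
print(accuracy_sketch(preds, labels))  # tensor(0.7500)
```

If the real helper returns a percentage (0 to 100) instead of a fraction, the `* 100` in the `train_step`/`test_step` print statements would double-scale the result.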

In [None]:
# With this simplified training/testing loop code, let's define a new model with non-linearities and test it again
INPUT_DIMS = train_data[0][0].shape[1]*train_data[0][0].shape[2] # our images are 28*28 and will be flattened out before hitting the NN
HIDDEN_UNITS = 10
OUTPUT_DIMS = len(train_data.classes)

class FashionMNISTModel1(nn.Module):
    def __init__(self,
                 in_shape: int,
                 out_shape: int,
                 hidden_units: int) -> None:
        super().__init__()
        self.network = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=in_shape, out_features=hidden_units, bias=True),
            nn.ReLU(),
            nn.Linear(in_features=hidden_units, out_features=out_shape, bias=True),
            nn.ReLU() # note: a ReLU on the output clamps negative logits to 0 before they reach CrossEntropyLoss
        )
    def forward(self, x):
        return self.network(x)

fmnist_model_1 = FashionMNISTModel1(in_shape=INPUT_DIMS,
                                    out_shape=OUTPUT_DIMS,
                                    hidden_units=HIDDEN_UNITS)
print(fmnist_model_1)

In [None]:
torch.manual_seed(42)
optimizer_v1 = torch.optim.SGD(params=fmnist_model_1.parameters(), lr=0.1)
loss_fn = nn.CrossEntropyLoss() # 'mean' reduction takes all the loss values from the batch and averages them to get the loss

EPOCHS = 2
train_time_mnn_start = timer()
for epoch in tqdm(range(EPOCHS)):
    print(f'Epoch: {epoch}\n-----------')
    train_step(
        fmnist_model_1,
        train_dloader,
        loss_fn,
        optimizer_v1,
        model_accuracy
    )
    test_step(
        fmnist_model_1,
        test_dloader,
        loss_fn,
        model_accuracy
    )
train_time_mnn_end = timer()
print(f'Total time to train: {total_time(train_time_mnn_start, train_time_mnn_end):.2f}s')