In [None]:
# pytorch
import torch
from torch import nn
from torch.utils.data import DataLoader

# torchvision
import torchvision
from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor

# machine learning
import pandas as pd
import numpy as np

# helper functions
from pathlib import Path
import requests

# visualization
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

# device
device = "cpu"

# print(torch.__version__)
# print(np.__version__)
# print(torchvision.__version__)

## PyTorch Computer Vision

#### 0. Computer Vision libraries in PyTorch
* `torchvision.datasets` - get datasets and data loading functions for computer vision
* `torchvision.models` - get pretrained computer vision models that you can leverage for your own problems
* `torchvision.transforms` - functions for manipulating your vision data (images) to be suitable for use with an ML model
* `torch.utils.data.Dataset` - Base dataset class for PyTorch
* `torch.utils.data.DataLoader` - Creates a Python iterable over a dataset

#### 1. Getting a Dataset

* FashionMNIST from `torchvision.datasets`
* https://github.com/zalandoresearch/fashion-mnist

#### 1.1 Setup Training Data

In [None]:
# Training split: stored under ./data (downloaded when missing); images go through ToTensor().
train_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
    target_transform=None,  # labels are left untouched
)

# Test split: identical settings, only `train` flips.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
    target_transform=None,
)

#### 1.2 Brief Overview of Data

In [None]:
# number of samples in each split
train_data_len = len(train_data)
test_data_len = len(test_data)
# print(train_data_len, test_data_len)

In [None]:
# Pull the first (image, label) pair out of the training split.
image, label = train_data[0]

# For the image we report tensor shape and dtype; for the label we keep the
# raw value and its Python type (it is used as an index into class_names later).
image_size = image.size()
image_dtype = image.dtype
label_size = label
label_dtype = type(label)

print(image_size, label_size)
print(image_dtype, label_dtype)

In [None]:
# Class names exposed by the dataset; indexed with the integer label when titling plots.
class_names = train_data.classes
class_names

In [None]:
# Lookup exposed by the dataset as `class_to_idx`
# (presumably class name -> integer index; confirm against the displayed output).
class_to_idx = train_data.class_to_idx
class_to_idx

In [None]:
# Labels of the training split, displayed as stored on the dataset object.
train_data.targets

In [None]:
# Size of the label container (presumably one entry per training sample; compare with len(train_data)).
train_data.targets.size()

In [None]:
# Report the sample's tensor shape and translate its integer label into a class name.
print(f"Image Shape: {image.shape} -> [color_channels, height, width]")
print(f"Image Label: {class_names[label]}")

#### 1.3 Become One with Data

In [None]:
image, label = train_data[0]
# print(f"Image Shape: {image.shape}")
# print(image)
# print(image.squeeze().shape)

# Disabled preview of the sample. Kept as `#` comments rather than a
# triple-quoted string: a bare string at the end of a cell is an expression,
# so Jupyter would echo it back as the cell's output.
# plt.title(class_names[label])
# plt.imshow(image.squeeze())


In [None]:
# Disabled grayscale preview of the sample. Written as `#` comments so the
# cell produces no output (a triple-quoted string here would be displayed
# verbatim as the cell's result).
# plt.imshow(image.squeeze(), cmap="gray")
# plt.title(class_names[label])
# plt.axis(False)

In [None]:
# Draw a 4x4 grid of randomly picked training images, each titled with its class name.
torch.manual_seed(42)  # fixed seed so the random indices repeat between runs
rows, cols = 4, 4
fig = plt.figure(figsize=(9, 9))
for i in range(1, rows * cols + 1):
    random_idx = torch.randint(0, len(train_data), size=[1]).item()
    img, img_label = train_data[random_idx]
    ax = fig.add_subplot(rows, cols, i)  # subplot positions are 1-based
    ax.imshow(img.squeeze(), cmap="gray")
    ax.set_title(class_names[img_label])
    ax.axis(False)

#### 1.4 Prepare DataLoader

* DataLoader turns the dataset into a Python iterable
* We want to turn the data into batches (mini-batches)

In [None]:
# Display both dataset objects before wrapping them in DataLoaders.
train_data, test_data

In [None]:
# batch size is a hyperparameter
BATCH_SIZE = 32

# Wrap each dataset in a DataLoader so it can be iterated in mini-batches.
# Shuffling is enabled for training only; the test loader keeps dataset order.
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

train_dataloader, test_dataloader

#### 1.4.1 Interacting with DataLoader

In [None]:
# check out what's inside the training dataloader
train_features_batch, train_labels_batch = next(iter(train_dataloader))

# How to read the feature shape shown below, e.g. [32, 1, 28, 28]:
#   32      -> samples per batch (BATCH_SIZE)
#   1       -> one colour channel (grayscale)
#   28 x 28 -> pixels per image
# The shape tuple has to stay the LAST expression of the cell; anything after
# it (such as a string literal used as a note) would be displayed instead.
train_features_batch.shape, train_labels_batch.shape

In [None]:
# show sample
# torch.manual_seed(42)
random_idx = torch.randint(0, len(train_features_batch), size=[1]).item()
data_img, data_label = train_features_batch[random_idx], train_labels_batch[random_idx]

# plot image (disabled). Kept as `#` comments so the cell does not echo a
# string literal back as its output.
# plt.imshow(data_img.squeeze(), cmap="gray")
# plt.title(class_names[data_label])
# plt.axis(False)
# print(f"Image Size: {data_img.shape}")
# print(f"Label: {data_label}, Label Size: {data_label.shape}")

# now we have data that is separated into different batches
# time to build model

#### 1.5 Build a Baseline Model

* Start with baseline model
* Improve model through different experiments
* Start simply and add complexity

In [None]:
# Standalone flatten layer, used here only to see what it does to one sample.
flatten_model = nn.Flatten()

# First image of the batch fetched earlier, e.g. torch.Size([1, 28, 28]).
x = train_features_batch[0]

# Run the sample through the layer and compare shapes.
output = flatten_model(x)

print(f"Shape before flattening: {x.shape} -> [color_channels, height, width]")
print(f"Shape after flattening: {output.shape} -> [color_channels, height*width]")

In [None]:
# print(x)
# print(output)
# output.squeeze()

#### 1.5.1 Model for Linear Layers

* `nn.Linear` operates on the last dimension of its input, so each image must first be flattened into a single feature vector
* Original Data -> [1, 28, 28]
* Flattened Data -> [1, 784]

In [None]:
# build base model
class FashionMNISTModelV0(nn.Module):
    """
    Baseline classifier: flatten the input, then two stacked linear layers.

    There is no activation between the layers, so the network is an affine
    map from the flattened input to `output_shape` scores.

    Args:
        input_shape: number of features per sample after flattening.
        hidden_units: width of the intermediate linear layer.
        output_shape: number of output scores (one per class).
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()

        # Layers are created in the order they are applied, so parameter
        # initialisation order (and the state_dict keys) stay the same.
        flatten = nn.Flatten()
        hidden = nn.Linear(in_features=input_shape, out_features=hidden_units)
        head = nn.Linear(in_features=hidden_units, out_features=output_shape)

        self.layer_stack = nn.Sequential(flatten, hidden, head)

    def forward(self, x):
        """Returns the raw scores produced by the layer stack for `x`."""
        scores = self.layer_stack(x)
        return scores

In [None]:
torch.manual_seed(42)  # seed before construction so the initial weights are reproducible

# Baseline model sized for flattened 28x28 images, with one output per class.
model_0 = FashionMNISTModelV0(
    input_shape=784,  # 28x28
    hidden_units=10,  # hidden layer nodes
    output_shape=len(class_names),  # one for every class
).to(device)

model_0

In [None]:
# check if model works: push one random image-shaped tensor through it
dummy_x = torch.rand([1, 1, 28, 28])

# A single forward pass is enough for a shape check, and no gradients are
# needed for it, so run it once under inference mode.
with torch.inference_mode():
    dummy_logits = model_0(dummy_x)
print(dummy_logits.shape)

#### 1.5.2 Setup Loss, Optimizer, and Evaluation Metrics

* Loss function - since we're working with multi-class data, our loss function will be `nn.CrossEntropyLoss()`
* Optimizer - SGD, `torch.optim.SGD()`
* Evaluation Metric - classification problem w/ balanced dataset, we'll use `accuracy`

In [None]:
# import accuracy functions
# Fetch helper_functions.py (source of accuracy_fn) unless a local copy already exists.
if Path("helper_functions.py").is_file():
    print("helper_functions.py already exists, skipping download...")
else:
    print("Downloading helper_functions.py")
    # timeout: do not hang the notebook forever on a stalled connection
    request = requests.get(
        "https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/helper_functions.py",
        timeout=30,
    )
    # Fail loudly on 4xx/5xx instead of saving an error page as a .py file,
    # which would only surface later as a confusing import error.
    request.raise_for_status()
    with open("helper_functions.py", "wb") as f:
        f.write(request.content)

In [None]:
from helper_functions import accuracy_fn
from timeit import default_timer as timer

# Cross-entropy as the training criterion; plain SGD (lr=0.1) over model_0's parameters.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model_0.parameters(), lr=0.1)

# timing helper
def print_train_time(start: float,
                     end: float,
                     device: torch.device=None):
    """
    Prints and returns the time elapsed between `start` and `end`.

    Args:
        start: timestamp taken before the timed work.
        end: timestamp taken after the timed work.
        device: only interpolated into the printed message (shown as "None" when omitted).

    Returns:
        The difference `end - start`.
    """
    elapsed = end - start
    print(f"Train time on {device} : {elapsed: .3f} seconds")
    return elapsed

# Smoke test for the timing helper: time a throwaway loop and report the duration.
start_time = timer()
for i in range(100):
    i += i  # rebinds the loop variable only; nothing accumulates across iterations
print(i)  # value left over from the final iteration (99 + 99)
end_time = timer()
print_train_time(start=start_time, end=end_time, device=device)

#### 1.5.3 Train and Test (linear model)

1. Loop through epochs.
2. Loop through training batches, perform training steps, calculate the train loss *per batch*.
3. Loop through testing batches, perform testing steps, calculate test loss *per batch*.
4. Print out what's happening.
5. Time it all.

In [None]:
torch.manual_seed(42)
train_time_start = timer()

epochs = 3 # small for faster training time

for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n------")

    # ---- training ----
    # The test phase below switches to eval mode, so train mode is restored
    # once per epoch (no need to do it on every batch).
    model_0.train()
    train_loss = 0.0
    for batch, (X, y) in enumerate(train_dataloader):
        # keep batches on the same device as the model
        X, y = X.to(device), y.to(device)

        # call the module itself (not .forward) so registered hooks run
        y_pred = model_0(X)
        loss = loss_fn(y_pred, y)

        # .item() detaches the value; summing the tensors themselves would
        # keep every batch's autograd graph alive until the epoch ends
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 400 == 0:
            print(f"Looked at {batch * len(X)}/{len(train_dataloader.dataset)} samples.")

    # average train loss per batch
    train_loss /= len(train_dataloader)

    # ---- testing ----
    test_loss, test_acc = 0.0, 0.0
    model_0.eval()
    with torch.inference_mode():
        for X, y in test_dataloader:
            X, y = X.to(device), y.to(device)
            test_pred = model_0(X)
            test_loss += loss_fn(test_pred, y).item()
            test_acc += accuracy_fn(y_true=y, y_pred=test_pred.argmax(dim=1))

    # average test metrics per batch
    test_loss /= len(test_dataloader)
    test_acc /= len(test_dataloader)

    print(f"\nTrain Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}%")

train_time_end = timer()
total_train_time_model_0 = print_train_time(start=train_time_start,
                                            end=train_time_end,
                                            device=str(next(model_0.parameters()).device))

#### 1.6 Make Predictions & Get Model Results

* We want to automate model evaluation so that results from different models can be compared with a single function call

In [None]:
torch.manual_seed(42)
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = None):
    """
    Evaluates model on data_loader and returns a dictionary of results, so that
    models with different architectures/hyperparameters can be compared.

    Args:
        model: module to evaluate; switched to eval mode here.
        data_loader: iterable of (X, y) batches that supports len().
        loss_fn: called as loss_fn(predictions, targets); must return a scalar tensor.
        accuracy_fn: called as accuracy_fn(y_true=..., y_pred=...) with predicted class indices.
        device: where each batch is moved before the forward pass. Defaults to
            the device of the model's first parameter (CPU if it has none).

    Returns:
        {"model_name": class name of model,
         "model_loss": loss averaged over batches (float),
         "model_acc": accuracy_fn result averaged over batches}

    Raises:
        ValueError: if data_loader contains no batches.
    """
    num_batches = len(data_loader)
    if num_batches == 0:
        raise ValueError("data_loader contains no batches; nothing to evaluate")

    if device is None:
        # follow the model so data and weights always end up on the same device
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss, total_acc = 0.0, 0.0
    model.eval()
    with torch.inference_mode():
        for X, y in tqdm(data_loader):
            X, y = X.to(device), y.to(device)

            # make predictions (call the module, not .forward, so hooks run)
            y_pred = model(X)

            # accumulate loss & acc per batch
            total_loss += loss_fn(y_pred, y).item()
            total_acc += accuracy_fn(y_true=y,
                                     y_pred=y_pred.argmax(dim=1))

    # scale to the average per batch
    return {"model_name": model.__class__.__name__, # only works when model is created with class
            "model_loss": total_loss / num_batches,
            "model_acc": total_acc / num_batches}

# Score the trained baseline on the test split and show the summary dict.
model_0_results = eval_model(
    model=model_0,
    data_loader=test_dataloader,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_fn,
)

model_0_results

#### 2.0 Building a Better Model w/ Non-linearity

In [None]:
class FashionMNISTModelV1(nn.Module):
    """
    Classifier with one non-linearity: flatten -> linear -> ReLU -> linear.

    The output layer returns raw logits. No activation is applied after it,
    because nn.CrossEntropyLoss expects unnormalised logits; a ReLU there
    would clamp every class score to >= 0.

    Args:
        input_shape: number of features per sample after flattening.
        hidden_units: width of the hidden layer.
        output_shape: number of output logits (one per class).
    """
    def __init__(self,
                 input_shape: int,
                 hidden_units: int,
                 output_shape: int):
        super().__init__()
        self.layer_stack = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=input_shape,
                      out_features=hidden_units),
            nn.ReLU(),
            # optional extra hidden block (disabled):
            # nn.Linear(in_features=hidden_units,
            #           out_features=hidden_units),
            # nn.ReLU(),
            nn.Linear(in_features=hidden_units,
                      out_features=output_shape),
            # intentionally no nn.ReLU() here: the loss consumes raw logits
        )

    def forward(self, x: torch.Tensor):
        """Returns the raw class logits for x."""
        return self.layer_stack(x)

In [None]:
torch.manual_seed(42)  # seed before construction so the initial weights are reproducible

# Non-linear model with the same sizes as the baseline.
model_1 = FashionMNISTModelV1(
    input_shape=784,
    hidden_units=10,
    output_shape=len(class_names),
).to(device)

# Fresh criterion, and an optimizer bound to model_1's parameters
# (both names are rebound from their earlier values).
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model_1.parameters(), lr=0.1)

#### 3.0 Functionizing Train & Test Loop

* training loop - `train_step()`
* testing loop - `test_step()`

#### 3.1 Automize Train Function

In [None]:
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device=device):
    """
    Runs one training pass over data_loader, updating model after every batch.

    Args:
        model: network to optimise; switched to train mode here.
        data_loader: yields (X, y) batches; must support len() and expose
            `.dataset` (used for the progress message).
        loss_fn: called as loss_fn(predictions, targets).
        optimizer: its step() is applied after each backward pass.
        accuracy_fn: called as accuracy_fn(y_true=..., y_pred=...) with predicted class indices.
        device: where each batch is moved before the forward pass
            (the model is presumably already on this device; it is not moved here).

    Prints progress every 400 batches and the per-batch average loss/accuracy
    at the end. Returns nothing.
    """

    model.train()
    train_loss, train_acc = 0.0, 0.0

    for batch, (X, y) in enumerate(data_loader):
        # put data on target device
        X, y = X.to(device), y.to(device)

        # make predictions (call the module, not .forward, so hooks run)
        y_pred = model(X)

        # calculate & accumulate loss (wrongness)
        loss = loss_fn(y_pred, y)
        # .item() detaches the value; summing the tensors themselves would
        # keep every batch's autograd graph alive for the whole pass
        train_loss += loss.item()

        # calculate & accumulate acc
        train_acc += accuracy_fn(y_true=y,
                                 y_pred=y_pred.argmax(dim=1)) # change logits -> prediction labels

        # optimize model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # keep track of progress
        if batch % 400 == 0:
            print(f"Looked at {batch * len(X)}/{len(data_loader.dataset)} samples.")

    # average per batch
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f"Train loss: {train_loss:.4f} | Train acc: {train_acc:.2f}%")

#### 3.2 Automize Test Function

In [None]:
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device=device):
    """
    Runs one evaluation pass of model over data_loader and prints the result.

    Args:
        model: network to evaluate; switched to eval mode here.
        data_loader: yields (X, y) batches; must support len().
        loss_fn: called as loss_fn(predictions, targets).
        accuracy_fn: called as accuracy_fn(y_true=..., y_pred=...) with predicted class indices.
        device: where each batch is moved before the forward pass.

    Prints the per-batch average loss and accuracy. Returns nothing.
    """

    model.eval()
    test_loss, test_acc = 0.0, 0.0

    # no gradients are needed while evaluating
    with torch.inference_mode():
        for X, y in data_loader:
            X, y = X.to(device), y.to(device)

            # call the module (not .forward) so registered hooks run
            test_pred = model(X)
            test_loss += loss_fn(test_pred, y).item()
            test_acc += accuracy_fn(y_true=y, y_pred=test_pred.argmax(dim=1))

    # average per batch
    test_loss /= len(data_loader)
    test_acc /= len(data_loader)

    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%\n")

#### 3.3 Call Train & Test Functions

In [None]:
torch.manual_seed(42)

train_time_start = timer()

epochs = 3

# One train pass followed by one test pass per epoch.
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch} \n-----------")

    train_step(model=model_1,
               data_loader=train_dataloader,
               loss_fn=loss_fn,
               optimizer=optimizer,
               accuracy_fn=accuracy_fn,
               device=device)

    test_step(model=model_1,
              data_loader=test_dataloader,
              loss_fn=loss_fn,
              accuracy_fn=accuracy_fn,
              device=device)

train_time_end = timer()
# Pass the device explicitly (as done for model_0); without it the report
# reads "Train time on None".
total_train_time_model_1 = print_train_time(
                        start=train_time_start,
                        end=train_time_end,
                        device=str(next(model_1.parameters()).device))

In [None]:
# Score the non-linear model on the test split, then print both summaries for comparison.
model_1_results = eval_model(
    model=model_1,
    data_loader=test_dataloader,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_fn,
)

print(model_0_results)
print(model_1_results)