# Computer Vision

# 0. Computer vision libraries
* `torchvision` - base library in PyTorch
* `torchvision.datasets` - datasets and data loading functions
* `torchvision.models` - pretrained computer vision models
* `torchvision.transforms` - functions for manipulating vision data
* `torch.utils.data.Dataset` - base dataset class for PyTorch
* `torch.utils.data.DataLoader` - creates a Python iterable over a dataset

In [None]:
import torch
from torch import nn

import torchvision
from torchvision import datasets
from torchvision import transforms
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt

# version
print(torch.__version__)
print(torchvision.__version__)

# 1. Dataset

In [None]:
# FashionMNIST: load the training and test splits as two separate datasets
train_data = datasets.FashionMNIST(
    root='data', # where to put data
    train=True, # get training dataset
    download=True, # fetch the data into `root`
    transform=ToTensor(), # applied to every image
    target_transform=None # labels are left untouched
)

test_data = datasets.FashionMNIST(
    root='data', # where to put data
    train=False, # get test dataset
    download=True, # fetch the data into `root`
    transform=ToTensor(), # applied to every image
    target_transform=None # labels are left untouched
)




In [None]:
len(train_data), len(test_data)

In [None]:
# view first training example
image, label = train_data[0]
image, label

In [None]:
class_names = train_data.classes
class_names[0]

In [None]:
class_to_idx = train_data.class_to_idx
class_to_idx

In [None]:
# shapes
print(f"image.shape: {image.shape} -> [color_channels, height, width] ")
print(f"Image label: {class_names[label]}")

# visualizing data

In [None]:
image, label = train_data[0]
print(f"image shape: {image.shape}")
# remove first dim 
plt.imshow(image.squeeze())
plt.title(label)

In [None]:
plt.imshow(image.squeeze(), cmap="gray")
plt.title(label)
plt.axis(False)

In [None]:
# plot a rows x cols grid of randomly chosen training images
torch.manual_seed(42)  # fixed seed so the same images are drawn on every run
fig = plt.figure(figsize=(9,9))
rows, cols = 4, 4
for i in range(1, rows*cols+1):  # subplot positions are 1-indexed
  random_idx = torch.randint(0, len(train_data), size=[1]).item()
  img, label = train_data[random_idx]
  fig.add_subplot(rows, cols, i)
  # squeeze() removes the leading color-channel dim so imshow gets a 2D image
  plt.imshow(img.squeeze(), cmap="gray")
  plt.title(class_names[label])  # title with the class name, not the integer label
  plt.axis(False)

# Data Loaders

DataLoaders turn a dataset into batches and a Python iterable.

1. More computationally efficient - the hardware may not be able to hold a very large dataset in memory all at once.
2. Breaks a large dataset down into small batches of data (batch sizes that are multiples of 8 tend to work best).
3. The neural network gets to update its gradients more often per epoch (once per batch instead of once per epoch).

In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 32

train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              shuffle=True)

test_dataloader = DataLoader(dataset=test_data,
                              batch_size=BATCH_SIZE,
                              shuffle=False)

In [None]:
print(f"DataLoaders: {train_dataloader, test_dataloader}")
print(f"Len of train_dataloader: { len(train_dataloader)} batches of {BATCH_SIZE}")
print(f"Len of test_dataloader: { len(test_dataloader)}  batches of {BATCH_SIZE}")

In [None]:
train_features_batch, train_labels_batch = next(iter(train_dataloader))
train_features_batch.shape, train_labels_batch.shape

In [None]:
# torch.manual_seed(42)
random_idx = torch.randint(0, len(train_features_batch), size=[1]).item()
img, label = train_features_batch[random_idx], train_labels_batch[random_idx]
plt.imshow(img.squeeze(), cmap="gray")
plt.title(class_names[label])
plt.axis(False)
print(f"Image size: {img.shape}")
print(f"Label: {label}, label size: {label.shape}")

# Model 0: baseline model

simple model to improve with more experiments

In [None]:
# flat layer
flatten_model = nn.Flatten()

x = train_features_batch[0]

output = flatten_model(x)

print(f"Shape before flattening: {x.shape}") # [color_channels, height, width]
print(f"Shape after flattening: {output.shape}") # [color_channel, height*width]

In [None]:
class FashionMNISTModelV0(nn.Module):
    """Baseline classifier: flatten the input, then apply two linear layers.

    There is no non-linear activation between the two linear layers.

    Args:
        input_shape: Number of input features of the first linear layer
            (i.e. the size of one flattened sample).
        hidden_units: Number of output features of the first linear layer.
        output_shape: Number of output features of the last linear layer.
    """

    def __init__(self,
                 input_shape: int,
                 hidden_units: int,
                 output_shape: int) -> None:
        super().__init__()
        # Layers are created in the order they are applied.
        layers = [
            nn.Flatten(),
            nn.Linear(input_shape, hidden_units),
            nn.Linear(hidden_units, output_shape),
        ]
        self.layer_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Pass ``x`` through the layer stack and return the result."""
        out = self.layer_stack(x)
        return out
    

In [None]:
torch.manual_seed(42)

model_0 = FashionMNISTModelV0(
    input_shape=784, # 28*28 mnist size
    hidden_units=10, # unit in hidden layer
    output_shape=len(class_names) # one per class    
)

model_0

In [None]:
dummy_x = torch.rand([1,1,28,28])
model_0(dummy_x)

In [None]:
model_0.state_dict()

# loss, optimizer and evaluation metrics

* Loss function - `nn.CrossEntropyLoss()`
* Optimizer - `torch.optim.SGD()`
* Evaluation metric - accuracy for classification

Import some helper functions

In [None]:
import requests
from pathlib import Path
# download helper function from github Learn PyTorch repo

# Download helper_functions.py once; later runs reuse the local copy.
if Path("helper_functions.py").is_file():
    print("helper_functions.py alrady exists, skipping download...")
else:
    print("Downloading helper_functions.py")
    # timeout: don't hang the notebook forever on a stalled connection
    request = requests.get("https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/helper_functions.py",
                           timeout=30)
    # Fail loudly on an HTTP error. Without this, an error page would be saved as
    # helper_functions.py and the is_file() check above would skip the download forever.
    request.raise_for_status()
    with open("helper_functions.py", "wb") as f:
        f.write(request.content)

In [None]:
from helper_functions import accuracy_fn

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.1)

# Time experiments

In [None]:
from timeit import default_timer as timer

def print_train_time(start: float, end: float, device: torch.device = None):
    """Print and return the elapsed time between two timestamps.

    Args:
        start: Timestamp taken before training.
        end: Timestamp taken after training.
        device: Label of the device, used only in the printed message.

    Returns:
        The difference ``end - start``.
    """
    elapsed = end - start
    message = f"Train time on {device}: {elapsed:.3f} seconds"
    print(message)
    return elapsed

In [None]:
# start_time = timer()
# end_time = timer()
# print_train_time(start=start_time, end=end_time, device="cpu")

# Training model to batches of data

1. Loop through epochs
2. Loop through training batches, perform training steps, calculate the train loss per batch
3. Loop through test batches, perform testing steps, calculate the test loss per batch
4. Print results
5. Time all steps

In [None]:
from tqdm.auto import tqdm

torch.manual_seed(42)
train_time_start_on_cpu = timer()

epochs = 3

for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n----")

    ### Training
    train_loss = 0
    # Training mode only needs to be set once per epoch (eval() below switches it off again).
    model_0.train()

    for batch, (X, y) in enumerate(train_dataloader):
        y_pred = model_0(X)
        loss = loss_fn(y_pred, y)
        # .item() accumulates a plain Python float. Adding the loss tensor itself
        # would chain every batch's autograd history into the running total.
        train_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 400 == 0:
            print(f"Looked at {batch * len(X)}/{len(train_dataloader.dataset)} samples.")

    # average train loss per batch: divide the total by the number of batches
    train_loss /= len(train_dataloader)

    ### Testing
    test_loss, test_acc = 0, 0
    model_0.eval()
    with torch.inference_mode():
        for X_test, y_test in test_dataloader:
            test_pred = model_0(X_test)
            test_loss += loss_fn(test_pred, y_test).item()
            # argmax over dim=1 turns the per-class logits into predicted labels
            test_acc += accuracy_fn(y_true=y_test, y_pred=test_pred.argmax(dim=1))

        # average test loss / accuracy per batch
        test_loss /= len(test_dataloader)
        test_acc /= len(test_dataloader)

    print(f"Train loss: {train_loss:.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}")

train_time_end_on_cpu = timer()
total_train_time_model_0 = print_train_time(start=train_time_start_on_cpu,
                                            end=train_time_end_on_cpu,
                                            device=str(next(model_0.parameters()).device))

Train loss: 0.4130 | Test loss: 0.4616 | Test Acc: 84.1354  
Train time on cpu: 24.643 seconds

# Predictions and model 0 results

In [None]:
torch.manual_seed(42)

def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn):
    """
      Evaluate ``model`` on every batch of ``data_loader``.

      Returns a dictionary with the model's class name ("model_name"), the loss
      averaged over batches ("model_loss") and the accuracy averaged over
      batches ("model_acc").
    """
    running_loss, running_acc = 0, 0

    model.eval()
    with torch.inference_mode():
        for features, targets in tqdm(data_loader):
            logits = model(features)
            running_loss += loss_fn(logits, targets)
            running_acc += accuracy_fn(y_true=targets, y_pred=logits.argmax(dim=1))

        # per-batch averages
        num_batches = len(data_loader)
        running_loss /= num_batches
        running_acc /= num_batches

    # the name lookup only works when the model was created from a class
    results = {
        "model_name": type(model).__name__,
        "model_loss": running_loss.item(),
        "model_acc": running_acc,
    }
    return results

# calc model results on test dataset
model_0_results = eval_model(model=model_0, 
                             data_loader=test_dataloader, 
                             loss_fn=loss_fn, 
                             accuracy_fn=accuracy_fn
                             )
model_0_results

# Device agnostic-code for GPU if available

In [None]:
# torch.cuda.is_available()
device = "cuda" if torch.cuda.is_available() else "cpu"
device

# Model 1: better model with non-linear

In [None]:
class FashionMNISTModelV1(nn.Module):
    """Classifier with non-linearities: flatten, then two Linear + ReLU pairs.

    Args:
        input_shape: Number of input features of the first linear layer
            (i.e. the size of one flattened sample).
        hidden_units: Number of output features of the first linear layer.
        output_shape: Number of output features of the last linear layer.
    """

    def __init__(self,
                 input_shape: int,
                 hidden_units: int,
                 output_shape: int):
        super().__init__()
        # Linear layers are created in the order they are applied.
        hidden_layer = nn.Linear(in_features=input_shape,
                                 out_features=hidden_units)
        output_layer = nn.Linear(in_features=hidden_units,
                                 out_features=output_shape)
        # NOTE(review): the last ReLU clamps the output logits to be non-negative
        # before they reach the loss function - confirm this is intended.
        self.layer_stack = nn.Sequential(
            nn.Flatten(),
            hidden_layer,
            nn.ReLU(),
            output_layer,
            nn.ReLU(),
        )

    def forward(self, x: torch.Tensor):
        """Pass ``x`` through the layer stack and return the result."""
        out = self.layer_stack(x)
        return out

In [None]:
torch.manual_seed(42)
model_1 = FashionMNISTModelV1(input_shape=784, #out_put of flatten layer 28*28 images
                              hidden_units=10,
                              output_shape=len(class_names)).to(device)
next(model_1.parameters()).device

### Model 1 Loss, Optimizer, Evaluation

In [None]:
from helper_functions import accuracy_fn

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_1.parameters(), 
                            lr=0.1)

### Functionize train/testing loops

1. Training loop - `train_step()`
2. Test loop - `test_step()`

In [None]:
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device = device):
    """
     Performs one training pass of model over every batch in data_loader.

     For each batch: move the data to `device`, run the forward pass, compute
     the loss, then zero the gradients, backpropagate and step the optimizer.
     Prints the loss and accuracy averaged over all batches; returns nothing.
    """
    train_loss, train_acc = 0, 0
    model.train()

    for X, y in data_loader:
        # put data on target device
        X, y = X.to(device), y.to(device)
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        # .item() accumulates a plain Python float. Adding the loss tensor itself
        # would chain every batch's autograd history into the running total.
        train_loss += loss.item()
        # argmax over dim=1 turns the per-class logits into predicted labels
        train_acc += accuracy_fn(y_true=y,
                                 y_pred=y_pred.argmax(dim=1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # per-batch averages
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)

    print(f"Train loss: {train_loss:.5f} | Train acc: {train_acc:.2f}%")

In [None]:
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    """
      Performs one testing pass of model over every batch in data_loader.

      Puts the model in eval mode, moves each batch to `device`, and prints the
      loss and accuracy averaged over all batches. Returns nothing.
    """
    running_loss, running_acc = 0, 0
    num_batches = len(data_loader)

    model.eval()
    with torch.inference_mode():
        for features, targets in data_loader:
            features = features.to(device)
            targets = targets.to(device)

            logits = model(features)
            running_loss += loss_fn(logits, targets)
            # argmax over dim=1 turns the per-class logits into predicted labels
            running_acc += accuracy_fn(y_true=targets,
                                       y_pred=logits.argmax(dim=1))

        # per-batch averages
        running_loss /= num_batches
        running_acc /= num_batches
        print(f"Test loss: {running_loss:.5f} | Test acc: {running_acc:.2f}%\n")



In [None]:
from tqdm.auto import tqdm
from timeit import default_timer as timer

torch.manual_seed(42)
train_time_start_on_gpu = timer()

epochs = 3

for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n--------")
    train_step(model=model_1, 
               data_loader=train_dataloader, 
               loss_fn=loss_fn,
               optimizer=optimizer,
               accuracy_fn=accuracy_fn,
               device=device)
    
    test_step(model=model_1,
              data_loader=test_dataloader,
              loss_fn=loss_fn,
              accuracy_fn=accuracy_fn,
              device=device)

train_time_end_on_gpu = timer()
total_train_time_model_1 = print_train_time(start=train_time_start_on_gpu, 
                                            end=train_time_end_on_gpu,
                                            device=device)

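### Note: depending on the data/hardware, a model may train faster on the CPU than on the GPU

1. The overhead of copying the data/model to and from the GPU can outweigh the compute benefits the GPU offers.
2. The CPU in use may simply be more capable than the GPU in terms of compute (less common).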

In [None]:
torch.manual_seed(42)

def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = device):
    """
      Evaluate ``model`` on every batch of ``data_loader``, moving each batch to ``device``.

      Returns a dictionary with the model's class name ("model_name"), the loss
      averaged over batches ("model_loss") and the accuracy averaged over
      batches ("model_acc").
    """
    running_loss, running_acc = 0, 0

    model.eval()
    with torch.inference_mode():
        for features, targets in tqdm(data_loader):
            features = features.to(device)
            targets = targets.to(device)
            logits = model(features)
            running_loss += loss_fn(logits, targets)
            running_acc += accuracy_fn(y_true=targets, y_pred=logits.argmax(dim=1))

        # per-batch averages
        num_batches = len(data_loader)
        running_loss /= num_batches
        running_acc /= num_batches

    # the name lookup only works when the model was created from a class
    results = {
        "model_name": type(model).__name__,
        "model_loss": running_loss.item(),
        "model_acc": running_acc,
    }
    return results


In [None]:
# Model 1 results dict
model_1_results = eval_model(model=model_1,
                             data_loader=test_dataloader,
                             loss_fn=loss_fn,
                             accuracy_fn=accuracy_fn,
                             device=device)
model_1_results

# Model 2: Convolutional Neural Network (CNN)


In [None]:
class FashionMNISTModelV2(nn.Module):
  """
    TinyVGG-style model architecture, following the CNN Explainer website.

    Two convolutional blocks (Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d)
    followed by a classifier (Flatten -> Linear).

    input_shape: in_channels of the first convolution.
    hidden_units: out_channels of every convolution.
    output_shape: out_features of the final linear layer.
  """

  def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
    super().__init__()
    # Every convolution / pooling layer shares the same hyperparameters.
    conv_kwargs = dict(kernel_size=3, stride=1, padding=1)
    pool_kwargs = dict(kernel_size=2, stride=2)

    self.conv_block_1 = nn.Sequential(
      nn.Conv2d(input_shape, hidden_units, **conv_kwargs),
      nn.ReLU(),
      nn.Conv2d(hidden_units, hidden_units, **conv_kwargs),
      nn.ReLU(),
      nn.MaxPool2d(**pool_kwargs),
    )

    self.conv_block_2 = nn.Sequential(
      nn.Conv2d(hidden_units, hidden_units, **conv_kwargs),
      nn.ReLU(),
      nn.Conv2d(hidden_units, hidden_units, **conv_kwargs),
      nn.ReLU(),
      nn.MaxPool2d(**pool_kwargs),
    )

    # 7*7 is the spatial size left after both blocks for a 28x28 input
    # (each max pool halves height and width: 28 -> 14 -> 7).
    flattened_features = hidden_units * 7 * 7
    self.classifier = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=flattened_features,
                out_features=output_shape),
    )

  def forward(self, x):
    # e.g. torch.Size([1, 10, 14, 14]) comes out of conv_block_1
    features = self.conv_block_1(x)
    # e.g. torch.Size([1, 10, 7, 7]) comes out of conv_block_2
    features = self.conv_block_2(features)
    logits = self.classifier(features)
    return logits

In [None]:
torch.manual_seed(32)
# input_shape=1: the images have a single color channel.
# output_shape must be one logit per class: the training targets are class indices
# for CrossEntropyLoss, and the saved state_dict is later loaded into a model
# built with output_shape=len(class_names).
model_2 = FashionMNISTModelV2(input_shape=1, 
                              hidden_units=10, 
                              output_shape=len(class_names)).to(device)

In [None]:
# img.shape
# plt.imshow(img.squeeze(), cmap="gray")

In [None]:
rand_img_tesnor = torch.randn(size=(1,28,28))
model_2(rand_img_tesnor.unsqueeze(0).to(device))

# Stepping through nn.Conv2d()

In [None]:
torch.manual_seed(42)

images = torch.randn(size=(32, 3, 64, 64))
test_image = images[0]

print(f"Image batch shape: {images.shape}")
print(f"Single Image shape: {test_image.shape}")
print(f"Test_image: {test_image}")

In [None]:
# conv2d layer: 3 input channels (matches test_image), 10 output channels, 3x3 kernel
conv_layer = nn.Conv2d(in_channels=3, 
                     out_channels=10,
                     kernel_size=(3, 3),
                     stride=1,
                     padding=0)

# pass data through conv
# NOTE(review): test_image is a single image without a batch dimension here;
# some PyTorch versions may require a 4D input for Conv2d - confirm, or use
# test_image.unsqueeze(dim=0).
conv_output = conv_layer(test_image)
conv_output.shape

# stepping through nn.MaxPool2d()

In [None]:
print(f"test image original shape: {test_image.shape}")
print(f"test image unsqueeze: {test_image.unsqueeze(0).shape}")

max_pool_layer = nn.MaxPool2d(kernel_size=2)

test_image_through_conv = conv_layer(test_image.unsqueeze(dim=0))
print(f"Shape after going though conv_layer(): {test_image_through_conv.shape}")

test_image_through_conv_and_max_pool = max_pool_layer(test_image_through_conv)
print(f"Shape after going though conv_layer() and Max pool layer: {test_image_through_conv_and_max_pool.shape}")

In [None]:
torch.manual_seed(42)

# A tiny 2x2 single-channel "image" so the effect of max pooling is easy to see.
random_tensor = torch.randn(size=(1, 1, 2, 2))
# This is the input *before* pooling, so label it as the random tensor.
print(f"\nRandom tensor:\n{random_tensor}")
print(f"Random tensor shape: {random_tensor.shape}")

# kernel_size=2 covers the whole 2x2 input, so only its largest value is kept
max_pool_layer = nn.MaxPool2d(kernel_size=2)

max_pool_tensor = max_pool_layer(random_tensor)
print(f"\nmax pool tensor:\n: {max_pool_tensor}")
print(f"Max pool tensor shape: {max_pool_tensor.shape}")

In [None]:
# Setup loss and optim
from helper_functions import accuracy_fn

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_2.parameters(), 
                            lr=0.1)

In [None]:
# Model 2 training and testing
torch.manual_seed(42)
torch.cuda.manual_seed(42)

from timeit import default_timer as timer
from tqdm.auto import tqdm

train_time_start_model_2 = timer()

epochs = 3

for epoch in tqdm(range(epochs)):
  print(f"Epoch: {epoch}\n-------")
  train_step(model=model_2,
             data_loader=train_dataloader,
             loss_fn=loss_fn,
             optimizer=optimizer,
             accuracy_fn=accuracy_fn,
             device=device)
  test_step(model=model_2,
            data_loader=test_dataloader,
            loss_fn=loss_fn,
            accuracy_fn=accuracy_fn,
            device=device)
  
train_time_end_model_2 = timer()
total_train_time_model_2 = print_train_time(start=train_time_start_model_2,
                                            end=train_time_end_model_2,
                                            device=device)

In [None]:
model_2_results = eval_model(
    model=model_2,
    data_loader=test_dataloader,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_fn,
    device=device
)

# Compare model results and training time


In [None]:
import pandas as pd
compare_results = pd.DataFrame([model_0_results,
                                model_1_results,
                                model_2_results])
compare_results

In [None]:
# Add each model's training time (seconds) in the same order as the rows:
# model_0, model_1, model_2. The column was previously misspelled "trainimg_time".
compare_results["training_time"] = [total_train_time_model_0, 
                                    total_train_time_model_1, 
                                    total_train_time_model_2]

In [None]:
compare_results.set_index("model_name")["model_acc"].plot(kind="barh")
plt.xlabel("accuracy (%)")
plt.ylabel("model")

In [None]:
# make and eval random predictions
def make_predictions(model: torch.nn.Module,
                     data: list,
                     device: torch.device = device):
    """Return the prediction probabilities of ``model`` for each sample in ``data``.

    Each sample gets a leading batch dimension, is moved to ``device`` and
    passed through the model; the squeezed logits are turned into
    probabilities with softmax. The per-sample results are moved to the CPU
    and stacked into a single tensor.
    """
    model.to(device)
    model.eval()

    collected = []
    with torch.inference_mode():
        for sample in data:
            # add a batch dimension of 1 before the forward pass
            batched = sample.unsqueeze(dim=0).to(device)
            logits = model(batched)
            probs = logits.squeeze().softmax(dim=0)
            collected.append(probs.cpu())

    return torch.stack(collected)


In [None]:
import random
random.seed(42)
test_samples = []
test_labels = []
# Sample 9 indices and load only those items. list(test_data) would load and
# transform every test image just to keep nine of them.
for idx in random.sample(range(len(test_data)), k=9):
    sample, label = test_data[idx]
    test_samples.append(sample)
    test_labels.append(label)

test_samples[0].shape

In [None]:
pred_probs = make_predictions(model=model_2,
                              data=test_samples)



In [None]:
pred_classes = pred_probs.argmax(dim=1)
pred_classes

In [None]:
# Plot the sampled test images in a 3x3 grid, titled with prediction vs. truth.
plt.figure(figsize=(9,9))
nrows = 3
ncols = 3
for i, sample in enumerate(test_samples):
    plt.subplot(nrows, ncols, i+1)  # subplot positions are 1-indexed

    plt.imshow(sample.squeeze(), cmap="gray")

    pred_label = class_names[pred_classes[i]]

    truth_label = class_names[test_labels[i]]

    title_text = f"Pred: {pred_label} | Truth: {truth_label}"

    # green title for a correct prediction, red for a wrong one
    if pred_label == truth_label:
        plt.title(title_text, fontsize=10, c="g")
    else:
        plt.title(title_text, fontsize=10, c="r")

    # Must run inside the loop: plt.axis() only affects the current subplot,
    # so calling it once after the loop hides the axes of the last image only.
    plt.axis(False)

# Confusion matrix

1. Make predictions with trained model
2. make a confusion matrix `torchmetrics.ConfusionMatrix`
3. plot the confusion matrix using `mlxtend.plotting.plot_confusion_matrix()`


In [None]:
from tqdm.auto import tqdm

# Collect the predicted label of every test sample, batch by batch.
y_preds = []
model_2.eval()
with torch.inference_mode():
    for X, y in tqdm(test_dataloader, desc="Making predictions"):
        X, y = X.to(device), y.to(device)
        y_logit = model_2(X)
        # Softmax over dim=1 (the class dimension of the batched logits), not dim=0,
        # which would normalise across the samples in the batch. No squeeze():
        # it would drop the batch dimension of a batch holding a single sample.
        y_pred = torch.softmax(y_logit, dim=1).argmax(dim=1)
        y_preds.append(y_pred.cpu())

# print(y_preds)
# concatenate the per-batch predictions into one 1D tensor
y_pred_tensor = torch.cat(y_preds)
y_pred_tensor[:10]

In [None]:
# example how to import with try catch 
try:
  import torchmetrics, mlxtend
  print(f"mlxtend version: {mlxtend.__version__}")
  assert int(mlxtend.__version.split(".")[1] >= 19, "mlxtend version should be 0.19.0 or higher")
except:
  %pip install torchmetrics -U mlxtend
  import torchmetrics, mlxtend
  print(f"mlxtend version: {mlxtend.__version__}")


In [None]:
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# Current torchmetrics releases require the task type to be given explicitly.
confmat = ConfusionMatrix(task="multiclass", num_classes=len(class_names))
# compare the predictions with the true labels of the test dataset
confmat_tensor = confmat(preds=y_pred_tensor,
                         target=test_data.targets)

fig, ax = plot_confusion_matrix(
    conf_mat=confmat_tensor.numpy(),  # mlxtend works with NumPy arrays, not torch tensors
    class_names=class_names,
    figsize=(10, 7)
)

#  Save and load best performing model


In [None]:
from pathlib import Path

MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True,
                 exist_ok=True)

MODEL_NAME = "03_pytorch_computer_vision_model_2.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

print(f"Saving model to: {MODEL_SAVE_PATH}")
torch.save(obj=model_2.state_dict(),
           f=MODEL_SAVE_PATH)

In [None]:
torch.manual_seed(42)
# Only the state_dict was saved (see torch.save above), so a new model instance
# has to be created first; its layer shapes must match the saved parameters.
loaded_model_2 = FashionMNISTModelV2(input_shape=1,
                                     hidden_units=10,
                                     output_shape=len(class_names))

# load the saved parameters into the new instance
loaded_model_2.load_state_dict(torch.load(f=MODEL_SAVE_PATH))

# move the loaded model to the target device
loaded_model_2.to(device)

In [None]:
# Eval loaded model
torch.manual_seed(42)

loaded_model_2_results = eval_model(
    model=loaded_model_2,
    data_loader=test_dataloader,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_fn
)

loaded_model_2_results

In [None]:
torch.isclose(torch.tensor(model_2_results["model_loss"]),
              torch.tensor(loaded_model_2_results["model_loss"]),
              atol=1e-02)