## 1.0) data_fetch.py

In [23]:
# %%writefile data_fetch.py

import requests
import zipfile
from pathlib import Path
import os

# Setup paths to the data folder, the extracted image folder and the temporary archive
data_path = Path("../data/")
image_path = data_path / "pizza_steak_sushi"
zip_path = data_path / "pizza_steak_sushi.zip"

# If the image folder doesn't exist, download it and prepare it...
if image_path.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {image_path} directory, creating one...")

    # Download pizza, steak, sushi data (needs the raw GitHub URL).
    # The request is made and validated BEFORE anything is created on disk, so a
    # failed download cannot leave behind an empty image folder that the
    # `is_dir()` check above would mistake for a prepared dataset on the next run.
    print("Downloading pizza, steak, sushi data...")
    response = requests.get(
        "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip",
        timeout=60,  # do not hang forever on a stalled connection
    )
    response.raise_for_status()  # fail loudly instead of saving an error page as a .zip

    image_path.mkdir(parents=True, exist_ok=True)
    zip_path.write_bytes(response.content)

    try:
        # Unzip pizza, steak, sushi data
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            print("Unzipping pizza, steak, sushi data...")
            zip_ref.extractall(image_path)
    finally:
        # Remove the zip file, even when extraction fails
        os.remove(zip_path)

../data/pizza_steak_sushi directory exists.


## 2.0) data_loader.py

In [15]:
# %%writefile data_loader.py

import torch
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import v2 as transforms

import os


# Default worker count for the DataLoaders. `os.cpu_count()` may return None when
# the CPU count cannot be determined; fall back to 0 so the default is always an int.
NUM_WORKERS = os.cpu_count() or 0

def create_dataloaders(
    train_dir: str, 
    test_dir: str, 
    transform: transforms.Compose, 
    batch_size: int, 
    num_workers: int=NUM_WORKERS,
    test_transform: transforms.Compose=None
):
  """Creates training and testing DataLoaders.

  Takes in a training directory and testing directory path and turns them into
  PyTorch Datasets (via ImageFolder) and then into PyTorch DataLoaders.

  Args:
    train_dir: Path to training directory. 
    test_dir: Path to testing directory.
    transform: torchvision transforms to perform on the training data. Also
      used for the testing data when `test_transform` is not given.
    batch_size: Number of samples per batch in each of the DataLoaders.
    num_workers: An integer for number of workers per DataLoader.
    test_transform: Optional torchvision transforms to perform on the testing
      data only, e.g. the training pipeline without augmentation. Defaults to
      None, in which case `transform` is applied to both datasets.

  Returns:
    A tuple of (train_dataloader, test_dataloader, class_names). Where
    class_names is a list of the target classes taken from the training data.

    Example usage:
      train_dataloader, test_dataloader, class_names = create_dataloaders(train_dir=path/to/train_dir,
                                                                          test_dir=path/to/test_dir,
                                                                          transform=some_transform,
                                                                          batch_size=32,
                                                                          num_workers=4,
                                                                          test_transform=some_eval_transform)
  """

  # Fall back to the shared transform so existing callers behave exactly as before
  if test_transform is None:
    test_transform = transform

  # Use ImageFolder to create dataset
  train_data = datasets.ImageFolder(train_dir, transform=transform)
  test_data = datasets.ImageFolder(test_dir, transform=test_transform)

  # Get class names
  class_names = train_data.classes

  # Turn images into data loaders (only the training data is shuffled)
  train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True)
  test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True)

  return train_dataloader, test_dataloader, class_names

# If you load your samples in the Dataset on CPU and would like to push it during training to the GPU, 
# you can speed up the host to device transfer by enabling pin_memory.

### 2.1) Sanity check

In [11]:
# Pipeline for this smoke test: resize -> TrivialAugmentWide -> image tensor -> scaled float32
train_transform_trivial_augment = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.TrivialAugmentWide(num_magnitude_bins=31),
        transforms.ToImage(),
        transforms.ToDtype(torch.float32, scale=True),
    ]
)

# test_transform = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor() ])

# Build both loaders from the on-disk train/test folders using the pipeline above
train_dataloader, test_dataloader, class_names = create_dataloaders(
    train_dir="../data/pizza_steak_sushi/train",
    test_dir="../data/pizza_steak_sushi/test",
    transform=train_transform_trivial_augment,
    batch_size=32,
    num_workers=4,
)

# 1. Pull the first batch of images and labels out of the training DataLoader
img_batch, label_batch = next(iter(train_dataloader))
print(img_batch.shape, label_batch.shape)

# 2. Take the first image of the batch and add a leading batch dimension so its shape fits the model
img_single = img_batch[0].unsqueeze(dim=0)
label_single = label_batch[0]
print(f"Single image shape: {img_single.shape}\n")

torch.Size([32, 3, 64, 64]) torch.Size([32])
Single image shape: torch.Size([1, 3, 64, 64])



## 3.0) model.py

In [17]:
# %%writefile model.py

"""
Contains PyTorch model code to instantiate a TinyVGG model.
"""

import torch
from torch import nn 

class TinyVGG(nn.Module):
    """TinyVGG-style CNN: two convolutional blocks followed by a linear classifier.

    Each block is Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d(2). The 3x3
    convolutions use padding 1 and stride 1, so only the two max-pool layers
    change the spatial size (each one halves height and width, rounding down).

    Args:
        input_shape: Number of channels in the input images.
        hidden_units: Number of channels produced by every convolution.
        output_shape: Number of output logits (one per class).
        image_size: Spatial size of the input images, either a single int for
            square images or a (height, width) pair. Defaults to 64, the size
            that was previously hard-coded into the classifier layer.

    Raises:
        ValueError: If `image_size` is too small to survive the two poolings.
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int,
                 image_size: "int | tuple[int, int]" = 64) -> None:
        super().__init__()

        if isinstance(image_size, int):
            height = width = image_size
        else:
            height, width = image_size

        # Feature-map size after the two MaxPool2d(2) layers (floor-halved twice)
        pooled_height = height // 2 // 2
        pooled_width = width // 2 // 2
        if pooled_height < 1 or pooled_width < 1:
            raise ValueError(f"image_size {image_size!r} is too small for two 2x2 max-pool layers")

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # For the default 64x64 input this is hidden_units*16*16, as before
            nn.Linear(in_features=hidden_units * pooled_height * pooled_width, out_features=output_shape)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run a batch of images through both conv blocks and the classifier; returns the logits."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier(x)
        return x

### 3.1) Sanity check

In [10]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed the RNG before the model is constructed
torch.manual_seed(42)
model_0 = TinyVGG(input_shape=3, hidden_units=10, output_shape=len(class_names))
model_0 = model_0.to(device)

# Bare expression so the notebook displays the module structure
model_0

TinyVGG(
  (conv_block_1): Sequential(
    (0): Conv2d(3, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block_2): Sequential(
    (0): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=2560, out_features=3, bias=True)
  )
)

In [13]:
import torchinfo
# Show a per-layer summary of model_0 for an input shaped like the sanity-check batch (img_batch)
torchinfo.summary(model_0, input_size=img_batch.shape)

Layer (type:depth-idx)                   Output Shape              Param #
TinyVGG                                  [32, 3]                   --
├─Sequential: 1-1                        [32, 10, 32, 32]          --
│    └─Conv2d: 2-1                       [32, 10, 64, 64]          280
│    └─ReLU: 2-2                         [32, 10, 64, 64]          --
│    └─Conv2d: 2-3                       [32, 10, 64, 64]          910
│    └─ReLU: 2-4                         [32, 10, 64, 64]          --
│    └─MaxPool2d: 2-5                    [32, 10, 32, 32]          --
├─Sequential: 1-2                        [32, 10, 16, 16]          --
│    └─Conv2d: 2-6                       [32, 10, 32, 32]          910
│    └─ReLU: 2-7                         [32, 10, 32, 32]          --
│    └─Conv2d: 2-8                       [32, 10, 32, 32]          910
│    └─ReLU: 2-9                         [32, 10, 32, 32]          --
│    └─MaxPool2d: 2-10                   [32, 10, 16, 16]          --
├─Sequentia

## 4.0) engine.py

In [5]:
# %%writefile engine.py

"""
Contains functions for training and testing a PyTorch model.
"""

import torch

from tqdm.auto import tqdm
from typing import Dict, List, Tuple

def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               device: torch.device = None) -> Tuple[float, float]:
    """Trains a model for a single epoch.

    Puts the model in training mode and, for every batch, runs the forward
    pass, computes the loss, backpropagates and takes one optimizer step.

    Args:
        model: The model to train.
        dataloader: Iterable of (inputs, targets) batches; must support len().
        loss_fn: Loss function called as loss_fn(logits, targets).
        optimizer: Optimizer that updates the model's parameters.
        device: Device to run on. When None, the device of the model's first
            parameter is used so the batches land where the model already is.

    Returns:
        A tuple (train_loss, train_acc): the loss and the accuracy, each
        averaged over the batches (mean of the per-batch values).
    """
    if device is None:
        # Follow the model instead of leaving the batches on whatever device they came from
        first_param = next(model.parameters(), None)
        if first_param is not None:
            device = first_param.device

    model.train()
    model.to(device)
    train_loss, train_acc = 0.0, 0.0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        # Forward pass and loss
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # argmax over the logits directly (same as test_step); softmax would not change the winner
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)

    # Average the accumulated per-batch metrics
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)

    return train_loss, train_acc



def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              device: torch.device = None) -> Tuple[float, float] :

    model.eval()
    model.to(device)
    test_loss, test_acc = 0, 0
    
    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            test_pred_logits = model(X)
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)

    return test_loss, test_acc



def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = torch.nn.CrossEntropyLoss(),
          epochs: int = 5,
          device: torch.device = None) -> Dict[str, List]:
    """Trains and evaluates a model for a number of epochs.

    Runs train_step() and then test_step() once per epoch, prints the metrics
    of every epoch and collects them.

    Args:
        model: The model to train and evaluate.
        train_dataloader: Batches used by train_step().
        test_dataloader: Batches used by test_step().
        optimizer: Optimizer that updates the model's parameters.
        loss_fn: Loss function used for both steps.
        epochs: Number of epochs to run.
        device: Device forwarded to train_step() and test_step().

    Returns:
        A dictionary with the keys "train_loss", "train_acc", "test_loss" and
        "test_acc", each holding a list with one value per epoch.
    """
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }

    for epoch in tqdm(range(epochs)):

        # `device` must be forwarded explicitly; previously it was accepted here
        # but never reached the step functions, so the requested device was ignored.
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device=device)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    return results

Overwriting engine.py


## 5.0) utils.py

In [6]:
# %%writefile utils.py
"""
Contains various utility functions for PyTorch model training and saving.
"""
import torch
from pathlib import Path

def save_model(model: torch.nn.Module,
               target_dir: str,
               model_name: str):
  """Saves a PyTorch model's state_dict to a target directory.

  Args:
    model: A target PyTorch model to save.
    target_dir: A directory for saving the model to. Created (including
      parents) when it does not exist yet.
    model_name: A filename for the saved model. Should include either ".pth" or ".pt" as the file extension.

  Raises:
    AssertionError: If model_name does not end with ".pt" or ".pth".

  Example usage:
    save_model(model=model_0, target_dir="models", model_name="05_going_modular_tinyvgg_model.pth")
  """
  # Validate the file name first so an invalid name does not leave a freshly
  # created directory behind. Raised explicitly (same exception type and message
  # as the former `assert`) so the check also runs under `python -O`.
  if not model_name.endswith((".pth", ".pt")):
    raise AssertionError("model_name should end with '.pt' or '.pth'")

  # Create target directory
  target_dir_path = Path(target_dir)
  target_dir_path.mkdir(parents=True, exist_ok=True)

  # Create model save path
  model_save_path = target_dir_path / model_name

  # Save the model state_dict()
  print(f"[INFO] Saving model to: {model_save_path}")
  torch.save(obj=model.state_dict(),
             f=model_save_path)

Overwriting utils.py


## 6.0) train.py

In [9]:
# %%writefile train.py


"""
Trains a PyTorch image classification model using device-agnostic code.
"""

import os
import torch
import data_fetch, data_loader, model, engine, utils

import torchvision.transforms.v2 as transforms
import argparse

# -----------------------------------------------------------------

# Command-line interface: one option per training hyperparameter
parser = argparse.ArgumentParser(
    description="PyTorch Classification Training",
    add_help=True,
)

parser.add_argument("--epochs", type=int, default=12, help="number of epochs")
parser.add_argument("--batch_size", type=int, default=32, help="batch size")
parser.add_argument("--hidden_units", type=int, default=10, help="total hidden units")
parser.add_argument("--lr", type=float, default=0.001, help="learning rate")

# Parsed immediately, at module level, from the process's command line
args = parser.parse_args()

#--------------------------------------------------------------------

# Setup hyperparameters (taken from the parsed command-line arguments)
NUM_EPOCHS = args.epochs
BATCH_SIZE = args.batch_size
HIDDEN_UNITS = args.hidden_units
LEARNING_RATE = args.lr

# Setup directories
basedir = "../"
train_dir = basedir + "data/pizza_steak_sushi/train"
test_dir = basedir + "data/pizza_steak_sushi/test"

# Setup target device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create transforms
train_transform_trivial_augment = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.TrivialAugmentWide(num_magnitude_bins=31),
    transforms.ToImage(), 
    transforms.ToDtype(torch.float32, scale=True)
])

# Create testing transform (no data augmentation)
# test_transform = transforms.Compose([
#     transforms.Resize((64, 64)),
#     transforms.ToImage(), 
#     transforms.ToDtype(torch.float32, scale=True)
# ])

# Create DataLoaders
# NOTE: only one transform is passed, so the augmenting pipeline above is what
# both the training and the testing DataLoader receive.
train_dataloader, test_dataloader, class_names = data_loader.create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=train_transform_trivial_augment,
    batch_size=BATCH_SIZE
)

# Model, loss, optimizer
# The network is bound to `tinyvgg_model` rather than `model` so that the
# imported `model` module is not shadowed by an instance of its own class.
tinyvgg_model = model.TinyVGG(input_shape=3, hidden_units=HIDDEN_UNITS, output_shape=len(class_names))
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(tinyvgg_model.parameters(), lr=LEARNING_RATE)

# Start training with help from engine.py
engine.train(model=tinyvgg_model,
             train_dataloader=train_dataloader,
             test_dataloader=test_dataloader,
             loss_fn=loss_fn,
             optimizer=optimizer,
             epochs=NUM_EPOCHS,
             device=device)

# Save the model with help from utils.py
utils.save_model(model=tinyvgg_model, target_dir="../models", model_name="tinyvgg_model_v1.pth")



Overwriting train.py


## 7.0) predict.py

In [14]:
%%writefile predict.py

"""predict the label when fed with an image_path"""

"""predict the label when fed with an image_path"""

import torchvision
import torch
import torchvision.transforms.v2 as transforms
import argparse
import model as model_repo

import warnings
warnings.filterwarnings("ignore", category=UserWarning) 

# Labels used by predict() to turn a predicted class index into a name, and by
# main() to size the model's output layer.
# NOTE(review): presumably this must match the class order used at training time — confirm.
class_names = ['pizza', 'steak', 'sushi']

def get_args_parser(add_help=True):
    """Build the command-line parser for the prediction script.

    Args:
        add_help: Forwarded to argparse.ArgumentParser; controls whether the
            parser gets the automatic -h/--help option.

    Returns:
        An argparse.ArgumentParser with the options --img (path of the image
        to classify) and --model_state_path (path of the saved state_dict).
    """
    cli = argparse.ArgumentParser(
        description="PyTorch Classification Prediction",
        add_help=add_help,
    )
    cli.add_argument("--img", type=str, default="../data/04-pizza-dad.jpeg", help="image path")
    cli.add_argument(
        "--model_state_path",
        type=str,
        default="../models/tinyvgg_model_v1.pth",
        help="model_state_path",
    )
    return cli

def predict(img_path: str, model: torch.nn.Module, device: torch.device = None):
    """Classify a single image and print the logits, probabilities, label and class.

    Args:
        img_path: Path of the image file to classify.
        model: Model used for the prediction; it is moved to `device` and put
            in eval mode.
        device: Device to run on. When None, CUDA is used if available,
            otherwise the CPU.

    Returns:
        The predicted class name, looked up in the module-level `class_names`.
    """
    if device is None:
        # The CPU fallback must be "cpu"; "gpu" is not a valid torch device type
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Read the image as a float32 tensor and scale the pixel values from [0, 255] to [0, 1]
    img = torchvision.io.read_image(str(img_path)).type(torch.float32)
    img /= 255.

    # Resize to the 64x64 resolution used by the training pipeline
    composition = transforms.Compose([
                             transforms.Resize((64, 64))])

    img_transformed = composition(img)

    model.to(device)
    model.eval()
    with torch.inference_mode():

        # Add a batch dimension and run the forward pass on the target device
        img_pred = model(img_transformed.unsqueeze(dim=0).to(device))

        print(f"Prediction logits: {img_pred.cpu().numpy()}")

        img_pred_prob = torch.softmax(img_pred, dim=1)
        print(f"Prediction probabilities: {img_pred_prob.cpu().numpy()}")

        img_pred_label = torch.argmax(img_pred_prob, dim=1)
        print(f"Prediction label: {img_pred_label.cpu().numpy()}")

        # .item() turns the one-element label tensor into a plain Python int for list indexing
        img_pred_class = class_names[img_pred_label.item()]
        print(f"Prediction class: {img_pred_class}")

    return img_pred_class
        

def main(args):
    """Rebuild the TinyVGG model, load its saved weights and classify one image.

    Args:
        args: Parsed command-line arguments; reads `args.model_state_path`
            (saved state_dict) and `args.img` (image to classify).
    """
    # NOTE: hidden_units is fixed at 10 here, so the checkpoint must come from a model built with 10 hidden units
    model = model_repo.TinyVGG( input_shape=3, hidden_units=10, output_shape=len(class_names))

    # map_location="cpu" lets a checkpoint saved on a GPU load on a CPU-only
    # machine (predict() moves the model to the target device afterwards);
    # weights_only=True restricts unpickling to tensors and plain containers.
    state_dict = torch.load(f=args.model_state_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)

    predict(args.img, model)

# Script entry point: parse the command-line arguments and run one prediction
if __name__ == "__main__":
    args = get_args_parser().parse_args()
    main(args)


Overwriting predict.py
