## 0.Getting Setup

In [1]:
import torch
import torchvision
print(f"torch version: {torch.__version__}")
print(f"torchvision version: {torchvision.__version__}")

torch version: 2.3.0+cu121
torchvision version: 0.18.0+cu121


In [2]:
#regural import
import matplotlib.pyplot as plt

from torch import nn
from torchvision import transforms

#import torchinfo
try:
  from torchinfo import summary
except:
  print(f"Downloading torchinfo")
  !pip install -q torchinfo
  from torchinfo import summary

#import going modular
try:
  from going_modular.going_modular import data_setup, engine
except:
  print(f"cloning github")
  !git clone https://github.com/mrdbourke/pytorch-deep-learning
  !mv pytorch-deep-learning/going_modular .
  !rm -rf pytorch-deep-learning
  from going_modular.going_modular import data_setup, engine

In [3]:
!pip install torch_xla
def get_device():
    if torch.cuda.is_available():
        return "cuda"
    else:
        try:
            import torch_xla.core.xla_model as xm
            return xm.xla_device()
        except ImportError:
            return "cpu"

device = get_device()
device

'cpu'

## 1.Get data

In [4]:
import os
import zipfile
from pathlib import Path

import requests

def download_data(source,
                  destination,
                  remove_source: bool=True
                  ):

  #set up folder
  data_path = Path("data/")
  image_path = data_path / destination

  if image_path.is_dir():
    print(f"[INFO] {image_path} : already exist")
  else:
    print(f"Crating folder")
    image_path.mkdir(parents=True, exist_ok=True)

    #download data
    target_file = Path(source).name
    with open(data_path / target_file, "wb") as f:
      request = requests.get(source)
      print(f"Getting Data")
      f.write(request.content)

    #opening zipfile
    with zipfile.ZipFile(data_path / target_file, "r") as zip_ref:
      print(f"unzipping")
      zip_ref.extractall(image_path)

    if remove_source:
      os.remove(data_path / target_file)

  return image_path

image_path = download_data(source="https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip",
                           destination="pizza_steak_sushi")
image_path

[INFO] data/pizza_steak_sushi : already exist


PosixPath('data/pizza_steak_sushi')

## 2.Create dataloaders

In [5]:
#manual creation
train_dir = image_path / "train"
test_dir = image_path / "test"

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])


manual_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    normalize
])

train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir = train_dir,
    test_dir = test_dir,
    transform = manual_transform,
    batch_size = 32
)

train_dataloader, test_dataloader, class_names

(<torch.utils.data.dataloader.DataLoader at 0x795bd597b880>,
 <torch.utils.data.dataloader.DataLoader at 0x795bd597b8e0>,
 ['pizza', 'steak', 'sushi'])

In [6]:
#get transform from model itself
train_dir = image_path / "train"
test_dir = image_path / "test"

weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT

auto_transform = weights.transforms()

train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir = train_dir,
    test_dir = test_dir,
    transform = auto_transform,
    batch_size = 32
)

train_dataloader, test_dataloader, class_names

(<torch.utils.data.dataloader.DataLoader at 0x795bd597bf70>,
 <torch.utils.data.dataloader.DataLoader at 0x795bd597be50>,
 ['pizza', 'steak', 'sushi'])

## 3.Prepare model
* Getting a pretrained model
* freeze the base layers
* Changing the classifier shape

In [7]:
#download the weights of EfficientNet_B0_Weights
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT

#setup the downloaded weights to model
model = torchvision.models.efficientnet_b0(weights=weights).to(device)

summary(model,
        input_size=(32, 3, 224, 224), # make sure this is "input_size", not "input_shape" (batch_size, color_channels, height, width)
         verbose=0,
         col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
         row_settings=["var_names"]
        )

Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                                  [32, 3, 224, 224]    [32, 1000]           --                   True
├─Sequential (features)                                      [32, 3, 224, 224]    [32, 1280, 7, 7]     --                   True
│    └─Conv2dNormActivation (0)                              [32, 3, 224, 224]    [32, 32, 112, 112]   --                   True
│    │    └─Conv2d (0)                                       [32, 3, 224, 224]    [32, 32, 112, 112]   864                  True
│    │    └─BatchNorm2d (1)                                  [32, 32, 112, 112]   [32, 32, 112, 112]   64                   True
│    │    └─SiLU (2)                                         [32, 32, 112, 112]   [32, 32, 112, 112]   --                   --
│    └─Sequential (1)                                        [32, 32, 112, 112]   [32, 16, 112

In [8]:
#freezing the base layers
from torch import nn

for params in model.features.parameters():
  params.requires_grad = False

#update classifier
model.classifier = nn.Sequential(
    nn.Dropout(p=0.2, inplace=True),
    nn.Linear(in_features= 1280,
              out_features=len(class_names),
              bias=True).to(device)
)

summary(model,
        input_size=(32, 3, 224, 224),
        verbose=0,
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
        )

Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                                  [32, 3, 224, 224]    [32, 3]              --                   Partial
├─Sequential (features)                                      [32, 3, 224, 224]    [32, 1280, 7, 7]     --                   False
│    └─Conv2dNormActivation (0)                              [32, 3, 224, 224]    [32, 32, 112, 112]   --                   False
│    │    └─Conv2d (0)                                       [32, 3, 224, 224]    [32, 32, 112, 112]   (864)                False
│    │    └─BatchNorm2d (1)                                  [32, 32, 112, 112]   [32, 32, 112, 112]   (64)                 False
│    │    └─SiLU (2)                                         [32, 32, 112, 112]   [32, 32, 112, 112]   --                   --
│    └─Sequential (1)                                        [32, 32, 112, 112]   [32, 

## 4.Train a model and track a summary

In [9]:
#loss fn and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(),
                             lr = 0.001)

In [10]:
#importing tensorbord
from torch.utils.tensorboard import SummaryWriter

#create writer with all default settings
writer = SummaryWriter()

In [11]:
from typing import Dict, List
from tqdm.auto import tqdm

from going_modular.going_modular.engine import train_step, test_step

def train(model,
          train_dataloader,
          test_dataloader,
          loss_fn,
          optimizer,
          epoch: int=5,
          device=device):

  result = {"train_loss":[],
            "train_acc":[],
            "test_loss":[],
            "test_acc":[],
            }

  for epoc in tqdm(range(epoch)):
    train_loss, train_acc = train_step(
      model = model,
      dataloader = train_dataloader,
      loss_fn= loss_fn,
      optimizer= optimizer,
      device = device
      )

    test_loss, test_acc = test_step(
      model = model,
      dataloader = test_dataloader,
      loss_fn= loss_fn,
      device = device
      )

    print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
        )

    result["train_loss"].append(train_loss)
    result["train_acc"].append(train_acc)
    result["test_loss"].append(test_loss)
    result["test_acc"].append(test_acc)

    #### NEW: Experiment tracking
    #see SummaryWriter doc
    writer.add_scalars(
        main_tag="Loss",
        tag_scalar_dict={"train_loss": train_loss,
                         "test_loss": test_loss},
        global_step= epoc
    )

    writer.add_scalars(
        main_tag="Acc",
        tag_scalar_dict={"train_acc": train_acc,
                         "test_acc": test_acc},
        global_step= epoc
    )

    writer.add_graph(model=model,
                     input_to_model=torch.randn(32, 3, 224, 224).to(device))

  writer.close()

  return result

In [12]:
result = train(
    model=model,
    train_dataloader= train_dataloader,
    test_dataloader=test_dataloader,
    loss_fn=loss_fn,
    optimizer = optimizer,
    epoch=5,
    device=device
)

  0%|          | 0/5 [00:00<?, ?it/s]

  self.pid = os.fork()
  self.pid = os.fork()


Epoch: 6 | train_loss: 1.0433 | train_acc: 0.4062 | test_loss: 0.9454 | test_acc: 0.6004
Epoch: 6 | train_loss: 0.8337 | train_acc: 0.7617 | test_loss: 0.7313 | test_acc: 0.8239
Epoch: 6 | train_loss: 0.8237 | train_acc: 0.6406 | test_loss: 0.6739 | test_acc: 0.8551
Epoch: 6 | train_loss: 0.6824 | train_acc: 0.7734 | test_loss: 0.7059 | test_acc: 0.8144


KeyboardInterrupt: 

In [None]:
result

## 5.View result in TensorBord

In [None]:
%load_ext tensorboard
%tensorboard --logdir=runs

from google.colab import output
output.serve_kernel_port_as_window(6006)

## 6.Creating fn to create summarywriter

In [None]:
def create_writer(experiment_name,
                  model_name,
                  extra):
  import os
  from datetime import datetime

  #get date in yy-mm-dd
  timestamp = datetime.now().strftime("%Y-%m-%d")

  if extra:
    log_dir = os.path.join("runs", timestamp, experiment_name, model_name, extra)
  else:
    log_dir = os.path.join("runs", timestamp, experiment_name, model_name)

  print(f"[INFO] Created SummaryWriter, saving to: {log_dir}...")
  return SummaryWriter(log_dir = log_dir)

In [None]:
example_writer = create_writer(experiment_name="data_10_percent",
              model_name="effnetb0",
              extra="5_epochs")

example_writer

### 6.1Update train fn to include writer

In [None]:
from going_modular.going_modular.engine import train_step, test_step

def train(model,
          train_dataloader,
          test_dataloader,
          loss_fn,
          optimizer,
          writer,
          epochs: int=5,
          device=device):

  result = {"train_loss":[],
            "train_acc":[],
            "test_loss":[],
            "test_acc":[]}

  for epoch in tqdm(range(epochs)):
    train_loss, train_acc = train_step(model=model,
                                       dataloader=train_dataloader,
                                       loss_fn = loss_fn,
                                       optimizer = optimizer,
                                       device=device
                                       )

    test_loss, test_acc = test_step(model=model,
                                   dataloader=test_dataloader,
                                   loss_fn = loss_fn,
                                   device=device
                                   )
    print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
        )

    result["train_loss"].append(train_loss)
    result["train_acc"].append(train_acc)
    result["test_loss"].append(test_loss)
    result["test_acc"].append(test_acc)
    #experiment tracking
    if writer:
      writer.add_scalars(main_tag="Loss",
                               tag_scalar_dict={"train_loss": train_loss,
                                                "test_loss": test_loss},
                               global_step=epoch)

      writer.add_scalars(main_tag="Accuracy",
                        tag_scalar_dict={"train_acc": train_acc,
                                        "test_acc": test_acc},
                        global_step=epoch)

      writer.close()

  return result

## 7.Setting up a series of modelling

In [None]:
#download data
data_10_percent_path = download_data(source="https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip",
                                     destination="pizza_steak_sushi")

data_20_percent_path = download_data(source="https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi_20_percent.zip",
                                     destination="pizza_steak_sushi_20_percent")

In [None]:
#training_data
train_dir_10_percent = data_10_percent_path / "train"
train_dir_20_percent = data_20_percent_path / "train"

#test data (same fro both train)
test_dir = data_10_percent_path / "test"

print(f"Training directory 10%: {train_dir_10_percent}")
print(f"Training directory 20%: {train_dir_20_percent}")
print(f"Testing directory: {test_dir}")

In [None]:
manual_transform

In [None]:
#creating dataloaders
from going_modular.going_modular import data_setup

BATCH_SIZE = 32

# for 10 % data
train_dataloader_10_per, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir = train_dir_10_percent,
    test_dir = test_dir,
    transform = manual_transform,
    batch_size=BATCH_SIZE
)

# for 20 % data
train_dataloader_20_per, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir = train_dir_20_percent,
    test_dir = test_dir,
    transform = manual_transform,
    batch_size=BATCH_SIZE
)

print(f"Number of batches of size {BATCH_SIZE} in 10 percent training data: {len(train_dataloader_10_per)}")
print(f"Number of batches of size {BATCH_SIZE} in 20 percent training data: {len(train_dataloader_20_per)}")
print(f"Number of batches of size {BATCH_SIZE} in testing data: {len(train_dataloader_10_per)} (all experiments will use the same test set)")
print(f"Number of classes: {len(class_names)}, class names: {class_names}")

In [None]:
#creat feature extractor model

import torchvision
from torchinfo import summary

eff_b2_weights = torchvision.models.EfficientNet_B2_Weights.DEFAULT
eff_b2_model = torchvision.models.efficientnet_b2(weights=eff_b2_weights)

summary(model=eff_b2_model,
        input_size=(32, 3, 224, 224),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
        )

In [None]:
from torch import nn
import torchvision
OUT_FEATURES = len(class_names)

def create_eff_b0():
  weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT
  model = torchvision.models.efficientnet_b0(weights=weights).to(device)

   # 2. Freeze the base model layers
  for param in model.features.parameters():
    param.requires_grad = False

  #changing classifier head
  model.classifier = nn.Sequential(
      nn.Dropout(p=0.2),
      nn.Linear(in_features=1280,
                out_features=OUT_FEATURES)
  ).to(device)

  # 5. Give the model a name
  model.name = "effnetb0"
  print(f"[INFO] Created new {model.name} model.")
  return model

def create_eff_b2():
  weights = torchvision.models.EfficientNet_B2_Weights.DEFAULT
  model = torchvision.models.efficientnet_b2(weights=weights).to(device)

   # 2. Freeze the base model layers
  for param in model.features.parameters():
    param.requires_grad = False

  #changing classifier head
  model.classifier = nn.Sequential(
      nn.Dropout(p=0.2),
      nn.Linear(in_features=1408,
                out_features=OUT_FEATURES)
  ).to(device)

  # 5. Give the model a name
  model.name = "effnetb0"
  print(f"[INFO] Created new {model.name} model.")
  return model

In [None]:
effnet_b0 = create_eff_b0()

summary(model=effnet_b0,
        input_size=(32, 3, 224, 224),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
        )

In [None]:
effnet_b2 = create_eff_b2()

summary(model=effnet_b2,
        input_size=(32, 3, 224, 224),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
        )

### 7.1Create a experiment and setup training code

In [None]:
# epoch list
num_epochs = [5, 10]

# model
models = ["effnet_b0", "effnet_b2"]

# train dataloaders
train_dataloaders = {"data_10_percent" : train_dataloader_10_per,
                     "data_20_percent" : train_dataloader_20_per}

In [None]:
%%time
from going_modular.going_modular.utils import save_model

# 2. Keep track of experiment numbers
experiment_number = 0

# 3. Loop through each DataLoader
for dataloader_name, train_dataloader in train_dataloaders.items():

    # 4. Loop through each number of epochs
    for epochs in num_epochs:

        # 5. Loop through each model name and create a new model based on the name
        for model_name in models:

            # 6. Create information print outs
            experiment_number += 1
            print(f"[INFO] Experiment number: {experiment_number}")
            print(f"[INFO] Model: {model_name}")
            print(f"[INFO] DataLoader: {dataloader_name}")
            print(f"[INFO] Number of epochs: {epochs}")

            # 7. Select the model
            if model_name == "effnetb0":
                model = create_eff_b0() # creates a new model each time (important because we want each experiment to start from scratch)
            else:
                model = create_eff_b2() # creates a new model each time (important because we want each experiment to start from scratch)

            # 8. Create a new loss and optimizer for every model
            loss_fn = nn.CrossEntropyLoss()
            optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

            # 9. Train target model with target dataloaders and track experiments
            train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=test_dataloader,
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=epochs,
                  device=device,
                  writer=create_writer(experiment_name=dataloader_name,
                                       model_name=model_name,
                                       extra=f"{epochs}_epochs"))

            # 10. Save the model to file so we can get back the best model
            save_filepath = f"07_{model_name}_{dataloader_name}_{epochs}_epochs.pth"
            save_model(model=model,
                       target_dir="models",
                       model_name=save_filepath)
            print("-"*50 + "\n")

## 8.View Experiment in Tensorbord

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=runs

from google.colab import output
output.serve_kernel_port_as_window(6006)

In [None]:
!tensorboard dev upload --logdir runs \
    --name "07. PyTorch Experiment Tracking: FoodVision Mini model result (video)" \
    --description "Comparing results of different model size, training data amount and training time."
    #--one_shot

## 9.Load the best model and make prediction

In [None]:
#best model path
best_model_path = "models/07_effnet_b2_data_20_percent_10_epochs.pth"

#model instance
best_model = create_eff_b2()

#load the state dict of save model
best_model.load_state_dict(torch.load(best_model_path))

### 9.1for checking size

In [None]:
# Check the model file size
from pathlib import Path

# Get the model size in bytes then convert to megabytes
effnetb2_model_size = Path(best_model_path).stat().st_size // (1024*1024)
print(f"EfficientNetB2 feature extractor model size: {effnetb2_model_size} MB")

In [None]:
# Import function to make predictions on images and plot them
from going_modular.going_modular.predictions import pred_and_plot_image

# Get a random list of 3 images from 20% test set
import random
num_images_to_plot = 3
test_image_path_list = list(Path(data_20_percent_path / "test").glob("*/*.jpg")) # get all test image paths from 20% dataset
test_image_path_sample = random.sample(population=test_image_path_list,
                                       k=num_images_to_plot) # randomly select k number of images

# Iterate through random test image paths, make predictions on them and plot them
for image_path in test_image_path_sample:
    pred_and_plot_image(model=best_model,
                        image_path=image_path,
                        class_names=class_names,
                        image_size=(224, 224))

### 9.2On custom data

In [None]:
# Download custom image
import requests

# Setup custom image path
custom_image_path = Path("data/04-pizza-dad.jpeg")

# Download the image if it doesn't already exist
if not custom_image_path.is_file():
    with open(custom_image_path, "wb") as f:
        # When downloading from GitHub, need to use the "raw" file link
        request = requests.get("https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/images/04-pizza-dad.jpeg")
        print(f"Downloading {custom_image_path}...")
        f.write(request.content)
else:
    print(f"{custom_image_path} already exists, skipping download.")

# Predict on custom image
pred_and_plot_image(model=best_model,
                    image_path=custom_image_path,
                    class_names=class_names)