In [None]:
import torch
import torchvision
from torch import nn
from torchvision import transforms
import matplotlib.pyplot as plt
from torchinfo import summary
from going_modular.going_modular import data_setup, engine, utils
from helper_functions import set_seeds, plot_loss_curves
from pathlib import Path
import pandas as pd
from PIL import Image
from timeit import default_timer as timer 
from tqdm.auto import tqdm
from typing import List, Dict
import gradio

In [None]:
# Device-agnostic setup: start from the CPU and switch to CUDA only when a GPU is visible.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Image folders for evaluation and training.
# NOTE(review): the original comment mentioned a 20% data split, but these paths point at
# `pizza_steak_sushi` -- confirm which dataset is intended.
test_dir = "./data/pizza_steak_sushi/test/"
train_dir = "./data/pizza_steak_sushi/train/"

## Create an EfficientNetB2

In [None]:
# Pretrained EffNetB2 weights (the `DEFAULT` alias of the weights enum)
effnetb2_weights = torchvision.models.EfficientNet_B2_Weights.DEFAULT

# Preprocessing pipeline that ships with those weights
effnetb2_transform = effnetb2_weights.transforms()

# Build the model from the SAME weights object the transform came from, so the
# preprocessing and the checkpoint cannot drift apart if one of them is edited later
effnetb2 = torchvision.models.efficientnet_b2(weights=effnetb2_weights)

# Freeze every pretrained parameter; only layers attached afterwards will be trainable
for param in effnetb2.parameters():
    param.requires_grad = False

# Inspect the current classification head before replacing it
effnetb2.classifier

In [None]:
# Replace the EffNetB2 head with a fresh, trainable 3-class classifier.
# The seed is fixed first so the new Linear layer is initialised reproducibly.
set_seeds(42)
effnetb2_head_layers = [
    nn.Dropout(p=0.3, inplace=True),
    nn.Linear(in_features=1408, out_features=3, bias=True),  # 3 outputs, one per class name
]
effnetb2.classifier = nn.Sequential(*effnetb2_head_layers)

In [None]:
# Layer-by-layer torchinfo summary of EffNetB2 for a single (1, 3, 224, 224) input:
# shows input/output sizes, parameter counts and whether each layer is trainable.
summary(effnetb2, input_size=(1, 3, 224, 224),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20, row_settings=["var_names"]) 

In [None]:
# Build the train/test DataLoaders (batch size 32) using EffNetB2's own preprocessing transform.
# The third return value is bound to `class_names` -- presumably the class labels found in
# the data folders; verify against data_setup.create_dataloaders.
train_dataloader_effnetb2, test_dataloader_effnetb2, class_names = data_setup.create_dataloaders(
    train_dir=train_dir, test_dir=test_dir,batch_size=32,transform=effnetb2_transform)

In [None]:
# Train EffNetB2 for 10 epochs with Adam (lr=1e-3) and cross-entropy loss.
# All model parameters are handed to the optimizer; the seed is fixed before training starts.
set_seeds(42)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(effnetb2.parameters(), lr=1e-3)

effnetb2_results = engine.train(
    model=effnetb2,
    train_dataloader=train_dataloader_effnetb2,
    test_dataloader=test_dataloader_effnetb2,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=10,
    device=device,
)


In [None]:
# Plot the EffNetB2 training results returned by engine.train
plot_loss_curves(effnetb2_results)

In [None]:
# Save the trained EffNetB2 with the project's save_model helper.
# The size check further down reads "models/09_pretrained_effnetb2.pth", so the helper is
# expected to write to target_dir/model_name -- verify in utils.save_model.
utils.save_model(model=effnetb2, target_dir="models",
                 model_name="09_pretrained_effnetb2.pth")

In [None]:
# Size of the saved EffNetB2 checkpoint on disk, floored to a whole number of megabytes
# (1 MB is taken as 1024 * 1024 bytes here).
effnetb2_checkpoint_path = Path("models/09_pretrained_effnetb2.pth")
effnetb2_checkpoint_bytes = effnetb2_checkpoint_path.stat().st_size
pretrained_effnetb2_model_size = effnetb2_checkpoint_bytes // (1024 * 1024)
print(f"Pretrained EffNetB2 feature extractor model size: {pretrained_effnetb2_model_size} MB")

In [None]:
# Total number of elements across every EffNetB2 parameter tensor
effnetb2_total_params = sum(param.numel() for param in effnetb2.parameters())
effnetb2_total_params

In [None]:
# Headline EffNetB2 numbers: the final recorded test loss/accuracy, the parameter count
# and the checkpoint size. Key order is kept because it becomes the column order later.
effnetb2_final_test_loss = effnetb2_results["test_loss"][-1]
effnetb2_final_test_acc = effnetb2_results["test_acc"][-1]

effnetb2_stats = {
    "test_loss": effnetb2_final_test_loss,
    "test_acc": effnetb2_final_test_acc,
    "number_of_parameters": effnetb2_total_params,
    "model_size (MB)": pretrained_effnetb2_model_size,
}
effnetb2_stats

## Create a ViT

In [None]:
# Pretrained ViT-B/16 weights (the `DEFAULT` alias of the weights enum)
vit_weights = torchvision.models.ViT_B_16_Weights.DEFAULT

# Preprocessing pipeline that ships with those weights
vit_transform = vit_weights.transforms()

# Build the model from the SAME weights object the transform came from, so the
# preprocessing and the checkpoint cannot drift apart if one of them is edited later
vit = torchvision.models.vit_b_16(weights=vit_weights)

# Freeze every pretrained parameter; only layers attached afterwards will be trainable
for param in vit.parameters():
    param.requires_grad = False

# Inspect the current classification head -- for ViT the final layer is called `heads`
vit.heads

In [None]:
# Replace the ViT head with a fresh, trainable 3-class linear layer.
# The seed is fixed first so the new layer is initialised reproducibly.
set_seeds(42)
vit_head_layer = nn.Linear(in_features=768, out_features=3, bias=True)  # 3 outputs, one per class name
vit.heads = nn.Sequential(vit_head_layer)

In [None]:
# Layer-by-layer torchinfo summary of ViT for a single (1, 3, 224, 224) input:
# shows input/output sizes, parameter counts and whether each layer is trainable.
summary(vit, input_size=(1, 3, 224, 224),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20, row_settings=["var_names"]) 

In [None]:
# Build the train/test DataLoaders (batch size 32) using ViT's own preprocessing transform.
# Note: this rebinds `class_names`, which was already set when the EffNetB2 loaders were created.
train_dataloader_vit, test_dataloader_vit, class_names = data_setup.create_dataloaders(
    train_dir=train_dir, test_dir=test_dir,batch_size=32,transform=vit_transform)

In [None]:
# Train ViT for 10 epochs with Adam (lr=1e-3) and cross-entropy loss.
# `optimizer` and `loss_fn` are rebound here, replacing the ones used for EffNetB2.
set_seeds(42)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(vit.parameters(), lr=1e-3)

vit_results = engine.train(
    model=vit,
    train_dataloader=train_dataloader_vit,
    test_dataloader=test_dataloader_vit,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=10,
    device=device,
)


In [None]:
# Plot the ViT training results returned by engine.train
plot_loss_curves(vit_results)

In [None]:
# Save the trained ViT with the project's save_model helper.
# The size check further down reads "models/09_pretrained_vit.pth", so the helper is
# expected to write to target_dir/model_name -- verify in utils.save_model.
utils.save_model(model=vit, target_dir="models",
                 model_name="09_pretrained_vit.pth")

In [None]:
# Size of the saved ViT checkpoint on disk, floored to a whole number of megabytes
# (1 MB is taken as 1024 * 1024 bytes here).
vit_checkpoint_path = Path("models/09_pretrained_vit.pth")
vit_checkpoint_bytes = vit_checkpoint_path.stat().st_size
pretrained_vit_model_size = vit_checkpoint_bytes // (1024 * 1024)
print(f"Pretrained Vit feature extractor model size: {pretrained_vit_model_size} MB")

In [None]:
# Total number of elements across every ViT parameter tensor
vit_total_params = sum(param.numel() for param in vit.parameters())
vit_total_params

In [None]:
# Headline ViT numbers: the final recorded test loss/accuracy, the parameter count
# and the checkpoint size. Key order is kept because it becomes the column order later.
vit_final_test_loss = vit_results["test_loss"][-1]
vit_final_test_acc = vit_results["test_acc"][-1]

vit_stats = {
    "test_loss": vit_final_test_loss,
    "test_acc": vit_final_test_acc,
    "number_of_parameters": vit_total_params,
    "model_size (MB)": pretrained_vit_model_size,
}

vit_stats

In [None]:
# Re-display the EffNetB2 stats so they can be read next to the ViT stats shown just above
effnetb2_stats

## Making and timing predictions with EfficientNetB2

In [None]:
# Get all test data paths, one directory level below test_dir.
# Path.glob gives no ordering guarantee, so sort the result to make the preview below
# (and the row order of the prediction DataFrames) reproducible across runs and machines.
print(f"[INFO] Finding all filepaths ending with '.jpg' in directory: {test_dir}")
test_data_paths = sorted(Path(test_dir).glob("*/*.jpg"))
test_data_paths[:5]

In [None]:
# Create a function to return a list of dictionaries with sample, truth label, prediction, prediction probability and prediction time
def pred_and_store(paths: List[Path], 
                   model: torch.nn.Module,
                   transform: torchvision.transforms, 
                   class_names: List[str], 
                   device: str = "cuda" if torch.cuda.is_available() else "cpu") -> List[Dict]:
    """Predict on every image in `paths` and record the result and timing per image.

    Args:
        paths: Image file paths. The ground-truth label of each image is taken from
            the name of its parent directory.
        model: Model used for inference; it is moved to `device` and put in eval mode.
        transform: Callable applied to the opened PIL image; its output must support
            `.unsqueeze(0).to(device)`.
        class_names: Class labels, indexed by the model's predicted label.
        device: Device to run inference on. Defaults to CUDA when available, else CPU.

    Returns:
        One dictionary per path with the keys "image_path", "class_name", "pred_prob"
        (highest softmax probability, rounded to 4 places), "pred_class",
        "time_for_pred" (seconds, rounded to 4 places) and "correct".
    """
    # Model preparation does not depend on the sample, so do it once up front.
    # This also keeps the one-off device transfer out of the first sample's timing.
    model.to(device)
    model.eval()

    pred_list = []

    for path in tqdm(paths):
        # Ground truth comes from the folder the image lives in
        class_name = path.parent.stem

        # Time everything a single prediction needs: load, transform, forward pass
        start_time = timer()

        # Context manager closes the file handle once the image has been transformed
        with Image.open(path) as img:
            # Transform the image, add a batch dimension and put it on the target device
            transformed_image = transform(img).unsqueeze(0).to(device)

        with torch.inference_mode():
            pred_logit = model(transformed_image)         # raw model output
            pred_prob = torch.softmax(pred_logit, dim=1)  # logits -> probabilities
            pred_label = torch.argmax(pred_prob, dim=1)   # probabilities -> label index

        # .item() yields plain Python values, so nothing stored below lives on the device
        pred_class = class_names[pred_label.item()]
        top_pred_prob = round(pred_prob.max().item(), 4)

        end_time = timer()

        pred_list.append({
            "image_path": path,
            "class_name": class_name,
            "pred_prob": top_pred_prob,
            "pred_class": pred_class,
            "time_for_pred": round(end_time - start_time, 4),
            "correct": class_name == pred_class,
        })

    return pred_list

In [None]:
# Make predictions across the test dataset with EffNetB2.
# Inference is forced onto the CPU here (regardless of `device`) so the timings are CPU timings.
effnetb2_test_pred_dicts = pred_and_store(paths=test_data_paths, model=effnetb2,
                                          transform=effnetb2_transform,
                                          class_names=class_names, device="cpu") # CPU devices

In [None]:
# Peek at the first two EffNetB2 prediction dictionaries
effnetb2_test_pred_dicts[:2]

In [None]:
# Turn the list of EffNetB2 prediction dictionaries into a DataFrame (one row per test image)
effnetb2_test_pred_df = pd.DataFrame(effnetb2_test_pred_dicts)
effnetb2_test_pred_df.head()

In [None]:
# Count how many EffNetB2 predictions were correct (True) vs. incorrect (False)
effnetb2_test_pred_df.correct.value_counts()

In [None]:
# Mean EffNetB2 prediction time over all test images, rounded to 4 decimal places
effnetb2_mean_pred_time = effnetb2_test_pred_df["time_for_pred"].mean()
effnetb2_average_time_per_pred = round(effnetb2_mean_pred_time, 4)
print(f"EffNetB2 average time per prediction: {effnetb2_average_time_per_pred} seconds")

In [None]:
# Add the EffNetB2 average CPU prediction time to its stats dictionary (mutates effnetb2_stats in place)
effnetb2_stats["time_per_pred_cpu"] = effnetb2_average_time_per_pred
effnetb2_stats

## Making and timing predictions with ViT

In [None]:
# Make predictions across the test dataset with ViT.
# Inference is forced onto the CPU here (regardless of `device`) so the timings are CPU timings.
vit_test_pred_dicts = pred_and_store(paths=test_data_paths, model=vit,
                                          transform=vit_transform,
                                          class_names=class_names, device="cpu") # CPU devices

In [None]:
# Peek at the first two ViT prediction dictionaries
vit_test_pred_dicts[:2]

In [None]:
# Turn the list of ViT prediction dictionaries into a DataFrame (one row per test image)
vit_test_pred_df = pd.DataFrame(vit_test_pred_dicts)
vit_test_pred_df.head()

In [None]:
# Count how many ViT predictions were correct (True) vs. incorrect (False)
vit_test_pred_df.correct.value_counts()

In [None]:
# Find the average ViT time per prediction, rounded to 4 decimal places.
# The printed label names ViT; it previously said "EffNetB2", which mislabelled this number.
vit_average_time_per_pred = round(vit_test_pred_df.time_for_pred.mean(), 4)
print(f"ViT average time per prediction: {vit_average_time_per_pred} seconds")

In [None]:
# Add the ViT average CPU prediction time to its stats dictionary (mutates vit_stats in place)
vit_stats["time_per_pred_cpu"] = vit_average_time_per_pred
vit_stats

## Comparing model results, prediction times and size

In [None]:
# One row per model, built from the two stats dictionaries (EffNetB2 first, ViT second)
df = pd.DataFrame([effnetb2_stats, vit_stats])

# Label the rows in the same order the dictionaries were passed in
df["model"] = ["EffNetB2", "ViT"]

# Express accuracy as a percentage with two decimal places
df["test_acc"] = (df["test_acc"] * 100).round(2)

df

In [None]:
# Compare ViT to EffNetB2: divide each ViT statistic by the matching EffNetB2 statistic
stats_by_model = df.set_index("model")
vit_over_effnetb2 = stats_by_model.loc["ViT"] / stats_by_model.loc["EffNetB2"]
pd.DataFrame(data=vit_over_effnetb2, columns=["ViT to EffNetB2 ratios"]).T

### Visualizing the speed vs. performance tradeoff 

In [None]:
# 1. Create a scatter plot from the model comparison DataFrame:
#    x = CPU time per prediction, y = test accuracy, marker size = model size in MB,
#    one colour per row of df (blue for the first row, orange for the second)
fig, ax = plt.subplots(figsize=(12, 8))
scatter = ax.scatter(data=df, x="time_per_pred_cpu", y="test_acc", 
                     c=["blue", "orange"], s="model_size (MB)") 

# 2. Add titles, labels and customize fontsize for aesthetics
ax.set_title("FoodVision Mini Inference Speed vs Performance", fontsize=18)
ax.set_xlabel("Prediction time per image (seconds)", fontsize=14)
ax.set_ylabel("Test accuracy (%)", fontsize=14)
ax.tick_params(axis='both', labelsize=12)
ax.grid(True)

# 3. Annotate each point with its model name, nudged slightly right and up so the
#    text does not sit on top of the marker
for index, row in df.iterrows():
    ax.annotate(text=row["model"], xy=(row["time_per_pred_cpu"]+0.0006, 
                                       row["test_acc"]+0.03), size=12)

# 4. Create a legend based on model sizes (legend entries are derived from the marker sizes)
handles, labels = scatter.legend_elements(prop="sizes", alpha=0.5)
model_size_legend = ax.legend(handles, labels, loc="lower right", 
                              title="Model size (MB)", fontsize=12)

plt.show()

In [2]:
import cv2

In [None]:
# 37