# CleanStreetNet — Edge Vision for Cleaner Cities

## Introduction

You have already trained many models; now it is time to shape one for life outside a notebook. This lab guides you through taking a pre-trained, VGG16-based StreetClassifier and turning it into a version that is easier to store, faster on CPU, and ready for deployment on edge targets. Along the way you will practice saving and reloading model state, pruning parameters, applying quantization, and evaluating the trade-offs among accuracy, latency, and size.

In this lab you will:

- Load the CleanStreetDataset, initialize a pre-trained StreetClassifier, and establish a baseline evaluation.

- Work with checkpoints by saving and restoring state_dict objects for both training and inference.

- Implement magnitude-based pruning across convolutional and linear layers, with options for unstructured and structured strategies, and verify sparsity and accuracy.

- Apply dynamic quantization to linear layers for a fast CPU speedup and benchmark the effect.

- Fuse common layer patterns and prepare a quantization-aware variant, fine-tune briefly, then convert to an INT8 model.

- Compare accuracy, inference time, and file size before and after compression to understand the impact of each step.

By the end, you will have a compact classifier that keeps performance close to the original while being far more efficient to run and ship.

---

<h4 style="color:green; font-weight:bold;">TIPS FOR SUCCESSFUL GRADING OF YOUR ASSIGNMENT:</h4>

- All cells are frozen except those where you submit your solutions or where the instructions explicitly say you can interact with them.

- You can add new cells to experiment, but the grader will omit them. Don't rely on newly created cells to host your solution code; use the provided places instead.

- Avoid using global variables unless you absolutely have to. The grader tests your code in an isolated environment without running all cells from the top. As a result, global variables may be unavailable when scoring your submission. Global variables that are meant to be used will be defined in UPPERCASE.

- To submit your notebook for grading, first save it by clicking the 💾 icon on the top left of the page and then click on the <span style="background-color: blue; color: white; padding: 3px 5px; font-size: 16px; border-radius: 5px;">Submit assignment</span> button on the top right of the page.
---

## Imports

In [None]:
import copy
import torch
from torch.nn.utils import prune
import torch.nn as nn
import torch.ao.quantization as aoq

In [None]:
%load_ext autoreload
%autoreload 2

import os
import time

import matplotlib.pyplot as plt
import numpy as np
from tqdm.notebook import tqdm

from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from torchvision.models import vgg16, VGG16_Weights
import torch.nn.functional as F

import helper_utils
import unittests

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using device: {device}")

## Part 1: Clean StreetClassifier data and model

**StreetClassifier** is a deep learning model designed to classify urban scene images into three categories:  
- **clean**  
- **litter**  
- **recycle** 

### Dataset

For this task, we will use the **CleanStreetDataset**, which is already divided into **training**, **development**, and **test** splits.  
In the code below, you’ll see how the datasets are loaded and how data preprocessing and augmentation transforms are applied to prepare the data for training and evaluation.

In [None]:
dataset_path = "/workspace/code/pytorch_mixed/c4Assignment/data/CleanStreetDataset"

# Define transforms
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(15),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create datasets
train_dataset = datasets.ImageFolder(root=os.path.join(dataset_path, 'train'), transform=train_transform)
dev_dataset = datasets.ImageFolder(root=os.path.join(dataset_path, 'dev'), transform=eval_transform)
test_dataset = datasets.ImageFolder(root=os.path.join(dataset_path, 'test'), transform=eval_transform)

# Create dataloaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False, num_workers=1)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=1)

print("Number of training samples:", len(train_dataset))
print("Number of validation samples:", len(dev_dataset))
print("Number of test samples:", len(test_dataset))
print("\nClass mapping:", train_dataset.class_to_idx)

You can visualize some examples with the following helper function.

In [None]:
helper_utils.display_some_images(test_dataset)


### Model Definition

The **StreetClassifier** model builds on **VGG16**, a well-known convolutional neural network pretrained on ImageNet.  
In this implementation, the final classifier layer was replaced so the model can predict the three target classes of our dataset (*clean*, *litter*, and *recycle*).  

The class definition is shown below:

In [5]:
class StreetClassifier(nn.Module):
    """
    Image classifier built on a pretrained **VGG16** backbone for three urban-scene classes
    (default: *clean*, *litter*, *recycle*).

    The VGG16 backbone (pretrained on ImageNet) is loaded and **frozen** so only the final
    classifier layer is trainable. The last `nn.Linear` layer is replaced to match the
    requested number of output classes.

    Parameters
    ----------
    num_classes : int, optional (default=3)
        Number of target classes. Used to size the final fully connected layer.

    Attributes
    ----------
    backbone : nn.Module
        The wrapped VGG16 network with all original parameters frozen except the
        replaced final `nn.Linear` classifier layer.

    Notes
    -----
    - Expected input: a float tensor of shape **(N, 3, H, W)**, typically resized to
      **224×224**, normalized with **ImageNet** stats:
        - mean = [0.485, 0.456, 0.406]
        - std  = [0.229, 0.224, 0.225]
    - Output: unnormalized class **logits** of shape **(N, num_classes)**.
    - To fine-tune the backbone as well, set `requires_grad=True` on its parameters
      before training.
    """

    def __init__(self, num_classes=3):
        super(StreetClassifier, self).__init__()
        # Load the pretrained VGG16 model (trained on ImageNet)
        self.backbone = vgg16(weights=VGG16_Weights.IMAGENET1K_V1)
            
        # Replace the classifier with a new one for our specific task
        in_features = self.backbone.classifier[-1].in_features
        self.backbone.classifier[-1] = nn.Linear(in_features=in_features, out_features=num_classes)

    def forward(self, x):
        return self.backbone(x)
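
The docstring describes a frozen backbone in which only the replaced final layer is trainable, while the constructor shown above does not set `requires_grad` itself. As a minimal sketch of how such freezing is commonly written, the example below uses a small stand-in network rather than VGG16; the layer sizes are illustrative assumptions, not part of the lab.

```python
import torch.nn as nn

# Stand-in for a pretrained backbone; the layer sizes are illustrative only.
backbone = nn.Sequential(
    nn.Linear(16, 8),
    nn.ReLU(),
    nn.Linear(8, 4),
)

# Freeze every existing parameter so training leaves it untouched.
for param in backbone.parameters():
    param.requires_grad = False

# Replace the final layer; a freshly created layer is trainable by default.
in_features = backbone[-1].in_features
backbone[-1] = nn.Linear(in_features, 3)

trainable = sum(p.numel() for p in backbone.parameters() if p.requires_grad)
total = sum(p.numel() for p in backbone.parameters())
print(f"trainable parameters: {trainable} of {total}")
```

Freezing before replacing the head matters: the new `nn.Linear` is created afterwards, so it keeps `requires_grad=True` while everything else stays fixed.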

### Training the Model

The function below handles the **training loop** for the StreetClassifier.  
It takes care of:  
- Running the model for a given number of epochs.  
- Calculating the training loss.  
- Evaluating accuracy on the validation set after each epoch.  
- Saving the best model checkpoint based on validation accuracy.  

For this exercise, you don’t need to train the model from scratch — a **pretrained checkpoint** has already been provided to save time.  
Still, you can run the function for one epoch to see how the training behaves, or even train the model fully from scratch if you’d like to experiment.  


In [None]:
# Initialize model, optimizer, and train
model = StreetClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Train the model
trained_model, best_accuracy = helper_utils.train_model(
    model=model,
    train_loader=train_loader, 
    dev_loader=dev_loader,
    num_epochs=1,
    optimizer=optimizer,
    device=device
)

### Using a Pretrained Checkpoint

For the exercises, we will work with a **pretrained StreetClassifier model**.  
The training checkpoint, including both the model weights and optimizer state, is stored in the file **`vgg_16.pt`**.  

We will load this checkpoint and then use the `compute_accuracy` function to evaluate the model’s performance on the test set.


In [None]:
# Use the base model
model = StreetClassifier().to(device)

# Load the final model checkpoint
checkpoint = torch.load("vgg_16.pt", map_location="cpu")
model.load_state_dict(checkpoint['model_state_dict'])

# Compute accuracy of the loaded model
base_accuracy = helper_utils.compute_accuracy(model, test_loader, device)
print(f"Model accuracy: {base_accuracy:.4f}")
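
A training checkpoint of this kind is usually a plain dictionary that bundles several `state_dict` objects. The sketch below shows the save and restore round trip on a tiny stand-in model. The key `optimizer_state_dict` is an assumed name (only `model_state_dict` appears in the cell above), and the file is written to a temporary directory.

```python
import os
import tempfile

import torch
import torch.nn as nn

# Tiny stand-in model and optimizer; sizes and learning rate are illustrative.
toy_model = nn.Linear(4, 3)
toy_optimizer = torch.optim.Adam(toy_model.parameters(), lr=1e-3)

# Take one optimizer step so the optimizer has state worth saving.
loss = toy_model(torch.randn(2, 4)).sum()
loss.backward()
toy_optimizer.step()

# A training checkpoint bundles both state_dicts; key names are a convention.
ckpt_path = os.path.join(tempfile.mkdtemp(), "toy_checkpoint.pt")
torch.save(
    {
        "model_state_dict": toy_model.state_dict(),
        "optimizer_state_dict": toy_optimizer.state_dict(),  # assumed key name
    },
    ckpt_path,
)

# Restore into fresh objects. For inference, only the model weights are needed.
restored_model = nn.Linear(4, 3)
restored_optimizer = torch.optim.Adam(restored_model.parameters(), lr=1e-3)
loaded = torch.load(ckpt_path, map_location="cpu")
restored_model.load_state_dict(loaded["model_state_dict"])
restored_optimizer.load_state_dict(loaded["optimizer_state_dict"])
restored_model.eval()

weights_match = torch.equal(toy_model.weight, restored_model.weight)
print("weights restored exactly:", weights_match)
```

To resume training you would restore both dictionaries; for inference, loading the model weights and calling `.eval()` is enough.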

## Pruning Utilities

In this section, we will explore **model pruning**, a technique to reduce the size and complexity of neural networks by removing less important weights.  
To simplify the process, we provide a couple of helper functions:  

- `_iter_prunable_modules(model)` – iterates over all `Conv2d` and `Linear` layers, which are the layers we will target for pruning.  
- `finalize_pruning(model)` – makes pruning permanent by removing reparametrization and storing the pruned weights directly.  

These utilities will make it easier to apply and finalize pruning across the model.


In [10]:
def _iter_prunable_modules(model):
    """
    Iterate over modules that are eligible for pruning.

    Yields
    ------
    Tuple[str, nn.Module]
        Pairs of (fully-qualified module name, module) for layers that are
        prunable in this assignment: `nn.Conv2d` and `nn.Linear`.

    Notes
    -----
    - The qualified name comes from `model.named_modules()` and reflects the
      path within the module hierarchy (e.g., "block.0", "classifier.fc").
    - Use this generator to systematically apply pruning across the model.
    """
    for name, m in model.named_modules():
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            yield name, m

def finalize_pruning(model):
    """
    Make pruning permanent by removing reparametrization wrappers.

    This converts any pruned parameter from the (`weight_orig`, `weight_mask`)
    reparametrization back to a regular `weight` `nn.Parameter` where the
    zeros are **materialized** in the stored tensor.
    """
    for _, module in _iter_prunable_modules(model):
        # Only remove if the parameter has been pruned
        if hasattr(module, "weight_orig") and hasattr(module, "weight_mask"):
            prune.remove(module, "weight")
    return model
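
To see what "removing the reparametrization" means in practice, the sketch below prunes a single toy `nn.Linear` layer and inspects its parameters and buffers before and after `prune.remove`. The layer size and pruning amount are illustrative assumptions.

```python
import torch
import torch.nn as nn
from torch.nn.utils import prune

torch.manual_seed(0)
layer = nn.Linear(8, 4)  # toy layer; sizes are illustrative

prune.l1_unstructured(layer, name="weight", amount=0.5)

# While pruning is attached, `weight` is recomputed as weight_orig * weight_mask.
param_names = sorted(name for name, _ in layer.named_parameters())
buffer_names = sorted(name for name, _ in layer.named_buffers())
print("parameters:", param_names)  # 'weight_orig' takes the place of 'weight'
print("buffers:", buffer_names)    # the mask is stored as a buffer

# Make it permanent: the mask is dropped and the zeros are stored in `weight`.
prune.remove(layer, "weight")
final_param_names = sorted(name for name, _ in layer.named_parameters())
sparsity = (layer.weight == 0).float().mean().item()
print("after remove:", final_param_names, "sparsity:", sparsity)
```

After `prune.remove`, the layer looks like an ordinary `nn.Linear` again, which is why the helper above checks for `weight_orig` and `weight_mask` before calling it.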

### Exercise 1: Implementing Model Pruning

Now you will implement the function `prune_model`, which applies pruning to all `Conv2d` and `Linear` layers in the **StreetClassifier** model.  
You can choose between two pruning modes:  

- **`l1_unstructured`** – removes a fraction of the smallest-magnitude weights.  
- **`ln_structured`** – removes entire output channels (structured pruning), which can lead to faster inference on some hardware.  

#### Why Prune a Model?
- **Smaller models** → reduced memory footprint and easier deployment on resource-constrained devices.  
- **Faster inference** → especially with structured pruning, which eliminates entire channels.  
- **Regularization effect** → pruning can sometimes improve generalization by removing redundant connections.  

### prune_model -- details

Implement ```prune_model(model, amount=0.3, mode="l1_unstructured")```, a function that:

Applies magnitude-based pruning to the weights of every nn.Conv2d and nn.Linear layer in a given model using PyTorch’s pruning reparametrization API. The pruning is applied in-place (adds weight_orig and weight_mask) and does not change any tensor shapes. To permanently bake zeros into the stored weights (remove reparametrization), a separate helper finalize_pruning(model) is provided for you to call afterward.

The function proceeds through these stages:

1. Validate Inputs
    - Ensure amount is a float in [0, 1].
    - Ensure mode is one of {"l1_unstructured", "ln_structured"}.
2. Find Prunable Modules
    - Iterate over the model and select only ```nn.Conv2d``` and ```nn.Linear``` layers (a helper _iter_prunable_modules(model) is available).
    - Skip any module that does not have a weight attribute.
3. Apply Pruning (In-Place Reparametrization)
    - For unstructured pruning (default): Use ```prune.l1_unstructured(module, name="weight", amount=amount)``` to zero the smallest-magnitude individual weights within each tensor.
    - For structured pruning: Use ```prune.ln_structured(module, name="weight", amount=amount, n=2, dim=0)``` to zero out entire output channels (L2-norm across channel filters), leaving tensor shapes unchanged but producing channel-wise sparsity masks.
4. Return the Same Model Instance
    - Return the input model with pruning reparametrization attached (i.e., weight becomes a computed tensor from weight_orig * weight_mask).

Note: Parameters and buffers now include weight_orig and weight_mask for pruned layers.

Downstream code can call ```finalize_pruning(model)``` to remove the reparametrization objects and write the masked values into the raw weight tensors.

Even after finalization, shapes remain the same; channels are zeroed, not physically removed.
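
The difference between the two modes is easiest to see on a single layer. The sketch below applies each mode to its own toy `nn.Conv2d` (sizes chosen for illustration only) and counts how many output channels end up entirely zero, while confirming that the weight shape is unchanged.

```python
import torch
import torch.nn as nn
from torch.nn.utils import prune

torch.manual_seed(0)
conv_unstructured = nn.Conv2d(3, 8, kernel_size=3)
conv_structured = nn.Conv2d(3, 8, kernel_size=3)

prune.l1_unstructured(conv_unstructured, name="weight", amount=0.5)
prune.ln_structured(conv_structured, name="weight", amount=0.5, n=2, dim=0)

def zeroed_output_channels(conv):
    # An output channel counts as removed when every weight in its filter is 0.
    per_channel = conv.weight.detach().flatten(start_dim=1)
    return int((per_channel.abs().sum(dim=1) == 0).sum())

shape_unstructured = tuple(conv_unstructured.weight.shape)
shape_structured = tuple(conv_structured.weight.shape)
zeroed_structured = zeroed_output_channels(conv_structured)

print("shape (unstructured):", shape_unstructured)
print("shape (structured):  ", shape_structured)
print("fully zeroed channels (unstructured):", zeroed_output_channels(conv_unstructured))
print("fully zeroed channels (structured):  ", zeroed_structured)
```

Both layers reach the same overall sparsity, but only the structured variant zeroes whole filters. Because the tensors keep their shape, any speedup depends on a runtime or a later step that can actually skip the zeroed channels.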

The output of this function—a model with pruning masks attached—will later be used by grading code that (a) checks the presence of reparametrized weights, (b) inspects sparsity levels, and (c) optionally calls finalize_pruning(model) before export or quantization.

<details> <summary><b><font color="green">Additional Code Hints (Click to expand if you are stuck)</font></b></summary>

1) Iterate only over prunable layers

You’ll be given a helper like:

```
    for _, module in _iter_prunable_modules(model):
```


This should yield nn.Conv2d and nn.Linear modules. Still, defensively skip modules that don’t have module.weight.

2) Two pruning modes you must support

Unstructured (default):
```
    prune.l1_unstructured(module, name="weight", amount=amount)
```

Zeros individual weights with the smallest L1 magnitudes.

Structured:
```
    prune.ln_structured(module, name="weight", amount=amount, n=2, dim=0)
```

Zeros entire output channels based on L2 norms. Shapes do not change here; channels are masked.

3) Validate arguments early

Raise:
```
if not (0.0 <= amount <= 1.0):
    raise ValueError(f"amount must be in [0,1], got {amount}")
```

and for unsupported modes:
```
    raise ValueError("mode must be 'l1_unstructured' or 'ln_structured'")
```

4) Remember: pruning is in-place & reparametrized

After pruning a layer, you’ll see attributes:

    - weight_orig (the original parameter)
    - weight_mask (a buffer of 0/1s)
    - weight becomes a computed tensor (not a leaf parameter).

</details>

In [11]:
# GRADED FUNCTION: prune_model

def prune_model(model, amount=0.3, mode="l1_unstructured"):
    """
    Apply pruning to **weights** of all `Conv2d` and `Linear` layers.

    This uses PyTorch's pruning reparametrization (adds `weight_orig` and
    `weight_mask`) without changing the tensor shape. To permanently embed
    zeros into the stored weights, call `finalize_pruning(model)` afterward.

    Parameters
    ----------
    model : nn.Module
        Model to prune. Pruning is applied **in-place** via
        `torch.nn.utils.prune`.
    amount : float, optional (default=0.3)
        Fraction in [0, 1] to prune.
        - For **unstructured** pruning: fraction of smallest-magnitude weights
          within each tensor.
        - For **structured (ln)** pruning: fraction of **output channels**
          (dimension 0) to remove using L2-norm (n=2).
    mode : {"l1_unstructured", "ln_structured"}, optional
        Pruning strategy:
        - `"l1_unstructured"` → `prune.l1_unstructured(..., name="weight", amount=amount)`
        - `"ln_structured"`   → `prune.ln_structured(..., name="weight", amount=amount, n=2, dim=0)`

    Returns
    -------
    nn.Module
        The same model instance with pruning **reparametrization** applied
        (not yet made permanent).
    """

    ### START CODE HERE ###

    if not (0.0 <= amount <= 1.0): # @REPLACE if True: # Check if amount is in [0,1]
        raise ValueError(f"amount must be in [0,1], got {amount}") # @KEEP

    for _, module in _iter_prunable_modules(model): # @KEEP
        if not hasattr(module, "weight"): # @REPLACE if True: # Check if module has "weight" attribute
            continue # @KEEP

        if mode == "l1_unstructured": # @REPLACE if True: # Check if mode is "l1_unstructured"
            prune.l1_unstructured(module, name="weight", amount=amount) # @REPLACE prune the module using l1_unstructured
        elif mode == "ln_structured": # @REPLACE if True: # Check if mode is "ln_structured"
            prune.ln_structured(module, name="weight", amount=amount, n=2, dim=0)  # @REPLACE prune the module using ln_structured
        else: # @KEEP
            raise ValueError("mode must be 'l1_unstructured' or 'ln_structured'")
    ### END CODE HERE ###
    
    return model
    

To verify your code, run the following cells!

In [None]:
# Verify your code here

base = helper_utils.sparsity_report(model)
print("[BASE] global_sparsity:", base["global_sparsity"])
print("[BASE] per-layer:", base["layers"])

# We prune 50% of the model
prune_model(model, amount=0.5, mode="l1_unstructured")

model = model.to("cpu")
after = helper_utils.sparsity_report(model)
print("[AFTER PRUNE] global_sparsity:", after["global_sparsity"])
print("[AFTER PRUNE] per-layer:", after["layers"])

after_acc = helper_utils.compute_accuracy(model, test_loader, device="cpu")
print("[AFTER PRUNE] accuracy:", after_acc)

#### Expected Output:
```
[BASE] global_sparsity: 0.0
[BASE] per-layer: {'backbone.features.0.weight': 0.0, 'backbone.features.2.weight': 0.0, 'backbone.features.5.weight': 0.0, 'backbone.features.7.weight': 0.0, 'backbone.features.10.weight': 0.0, 'backbone.features.12.weight': 0.0, 'backbone.features.14.weight': 0.0, 'backbone.features.17.weight': 0.0, 'backbone.features.19.weight': 0.0, 'backbone.features.21.weight': 0.0, 'backbone.features.24.weight': 0.0, 'backbone.features.26.weight': 0.0, 'backbone.features.28.weight': 0.0, 'backbone.classifier.0.weight': 0.0, 'backbone.classifier.3.weight': 0.0, 'backbone.classifier.6.weight': 0.0}
[AFTER PRUNE] global_sparsity: 0.5
[AFTER PRUNE] per-layer: {'backbone.features.0.weight': 0.5, 'backbone.features.2.weight': 0.5, 'backbone.features.5.weight': 0.5, 'backbone.features.7.weight': 0.5, 'backbone.features.10.weight': 0.5, 'backbone.features.12.weight': 0.5, 'backbone.features.14.weight': 0.5, 'backbone.features.17.weight': 0.5, 'backbone.features.19.weight': 0.5, 'backbone.features.21.weight': 0.5, 'backbone.features.24.weight': 0.5, 'backbone.features.26.weight': 0.5, 'backbone.features.28.weight': 0.5, 'backbone.classifier.0.weight': 0.5, 'backbone.classifier.3.weight': 0.5, 'backbone.classifier.6.weight': 0.5}
```
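
The numbers above are fractions of zero-valued weights per layer and over all prunable layers. As a rough sketch of how such a report could be computed, the function below is a hypothetical stand-in written for illustration; it is not the actual implementation of `helper_utils.sparsity_report`, and the toy model is an assumption.

```python
import torch
import torch.nn as nn
from torch.nn.utils import prune

def simple_sparsity_report(model):
    """Hypothetical stand-in for helper_utils.sparsity_report (not its real code)."""
    layers, zeros, total = {}, 0, 0
    for name, module in model.named_modules():
        if isinstance(module, (nn.Conv2d, nn.Linear)):
            weight = module.weight.detach()  # masked weight if pruning is attached
            layer_zeros = int((weight == 0).sum())
            layers[f"{name}.weight"] = layer_zeros / weight.numel()
            zeros += layer_zeros
            total += weight.numel()
    return {"global_sparsity": zeros / max(total, 1), "layers": layers}

# Toy model; sizes are illustrative only.
toy = nn.Sequential(nn.Conv2d(3, 4, 3), nn.ReLU(), nn.Flatten(), nn.Linear(4, 2))
for module in toy:
    if isinstance(module, (nn.Conv2d, nn.Linear)):
        prune.l1_unstructured(module, name="weight", amount=0.5)

report = simple_sparsity_report(toy)
print(report)
```

Reading `module.weight` works both before and after finalization, because a pruned module exposes the masked tensor under the same attribute name.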

In [None]:
# Test 1: Prune Model

unittests.exercise1(prune_model)

### Quantization in Deep Learning

Quantization is a technique that reduces the **precision** of model weights and activations, typically converting them from 32-bit floating point (FP32) to lower-precision formats like 8-bit integers (INT8).  
This can drastically reduce the size of the model and improve inference speed, especially on CPUs and edge devices, while keeping accuracy close to that of the original FP32 model.  

---

### Benefits of Quantization

- **Smaller model size** → INT8 weights require 4× less storage than FP32.  
- **Faster inference** → Integer operations are usually faster than floating-point on CPUs.  
- **Lower memory bandwidth** → Reduced precision means less data transfer, improving latency.  
- **Deployment-friendly** → Ideal for running models on devices with limited resources (mobile, IoT, embedded).  
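
The 4× storage figure follows directly from the element sizes: 4 bytes per FP32 value versus 1 byte per INT8 value. The sketch below quantizes a random tensor with a simple symmetric scale (an assumption made for illustration; real observers choose the scale and zero point differently) and reports the storage and round-trip error.

```python
import torch

torch.manual_seed(0)
x = torch.randn(1000)  # FP32 values standing in for a weight tensor

# Affine quantization: real_value ≈ scale * (int_value - zero_point).
# The scale is chosen so the observed range maps onto the signed 8-bit grid.
scale = float(x.abs().max()) / 127
zero_point = 0
q = torch.quantize_per_tensor(x, scale=scale, zero_point=zero_point, dtype=torch.qint8)

fp32_bytes = x.element_size() * x.numel()
int8_bytes = q.int_repr().element_size() * q.numel()
max_error = (q.dequantize() - x).abs().max().item()

print(f"FP32 storage: {fp32_bytes} bytes")
print(f"INT8 storage: {int8_bytes} bytes")
print(f"max round-trip error: {max_error:.5f} (scale = {scale:.5f})")
```

The round-trip error is bounded by roughly half the scale, which is the price paid for the smaller representation.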

---

### Dynamic Quantization

There are different approaches to quantization:  
- **Static Quantization**: requires calibration with representative data before inference.  
- **Quantization Aware Training (QAT)**: simulates quantization effects during training for maximum accuracy retention.  
- **Dynamic Quantization**: the simplest approach. Weights are stored in INT8, while activations stay in FP32 and are quantized on the fly at runtime.  

**Dynamic quantization is particularly effective for models dominated by `nn.Linear` layers (e.g., Transformers, LSTMs, fully-connected classifiers).**  
It requires **no retraining or calibration** and runs on CPU only, which makes it the quickest way to start benefiting from quantization.  

---

### Exercise 2: Implementing Dynamic Quantization

In this exercise, you will complete the function `quantize_dynamic_linear`, which should:  

- Make a **deep copy** of the original model (do not modify the original).  
- Apply **dynamic quantization** to **only** the `nn.Linear` layers, converting them to INT8.  
- Return the quantized model in `eval()` mode.  
- Ensure it works in CPU-only environments.  

This will give you hands-on practice with PyTorch’s `torch.quantization.quantize_dynamic` utility and help you understand how quantization can be applied selectively to certain model components.

### Details -- `quantize_dynamic_linear`

Implement `quantize_dynamic_linear`, a function that returns a new model in eval() mode where all nn.Linear layers are dynamically quantized to INT8. Dynamic quantization stores weights as INT8 and quantizes activations on the fly at runtime, giving CPU speed and memory gains without calibration.

What your function must do:

1. Clone the model (don’t mutate the original) and switch to eval mode
    - Use copy.deepcopy(model) so the input model is untouched.
    - Call .eval() on the copy.

2. (CPU) Select a sensible quantization engine if available
    - On x86 CPUs, fbgemm is common. Set it if present by assigning torch.backends.quantized.engine = "fbgemm" inside a try/except.

3. Apply dynamic INT8 quantization to nn.Linear only
    - Use torch.quantization.quantize_dynamic(...) on the FP32 copy.
    - Pass the module set {nn.Linear} so only linear layers are quantized.
    - Use dtype=torch.qint8.

4. Return the quantized model in eval() mode
    - Ensure you return .eval() (no training-time behavior).
    - The function should work CPU-only (no CUDA, no calibration steps).

<details> <summary><b><font color="green">Additional Code Hints (Click to expand if you are stuck)</font></b></summary>

1. Work on a copy, in eval mode
    ```
        model_fp32 = copy.deepcopy(model).eval()
    ```
2. Pick a CPU quantization engine if available (harmless if not)
    ```
        if hasattr(torch.backends, "quantized") and hasattr(torch.backends.quantized, "engine"):
            try:
                torch.backends.quantized.engine = "fbgemm"
            except Exception:
                pass
    ```

3. Quantize ONLY Linear layers to INT8 dynamically
    ```
        qmodel = torch.quantization.quantize_dynamic(
            model_fp32,
            {nn.Linear},          # target modules
            dtype=torch.qint8,    # INT8 weights
        ).eval()                   # ensure eval mode
    ```

4. Return the quantized model
    ```
        return qmodel
    ```
</details>


In [None]:
# GRADED FUNCTION: quantize_dynamic_linear

def quantize_dynamic_linear(model):
    """
    Return a **new** model where all nn.Linear layers are dynamically quantized to INT8.

    Requirements checked by the autograder
    --------------------------------------
    - Do NOT mutate the original model; use a deepcopy.
    - Quantize ONLY Linear modules (e.g., {nn.Linear}).
    - Use dynamic quantization with INT8 dtype.
    - Return the quantized model in eval() mode.
    - Should run on CPU-only environments (no CUDA, no calibration).

    Returns
    -------
    nn.Module
        An eval-mode copy of `model` with Linear layers using INT8 dynamic quantization.
    """
    ### START CODE HERE ###
    model_fp32 = copy.deepcopy(model).eval() # @REPLACE model_fp32 = None # Create a deep copy of the model and set it to eval mode

    # Ensure a sensible engine on CPU (x86). If unavailable, this line is harmless.
    if hasattr(torch.backends, "quantized") and hasattr(torch.backends.quantized, "engine"): # @REPLACE if True: # Check if torch.backends has quantized and torch.backends.quantized has engine
        try:
            torch.backends.quantized.engine = "fbgemm" # @KEEP
        except Exception: # @KEEP
            pass  # keep whatever the runtime supports # @KEEP
    # Quantize only Linear layers to INT8
    quantized = torch.quantization.quantize_dynamic( # @REPLACE quantized = None ( # Use quantize_dynamic from quantization in torch to quantize the model_fp32 to INT8
        model_fp32, # @REPLACE None, # The model to quantize
        {nn.Linear}, # @REPLACE None, # The layers to quantize
        dtype=torch.qint8, # @REPLACE None, # The dtype to quantize to qint8
    )
    
    quantized.eval() # @REPLACE None, # Set the quantized model to eval mode

    return quantized # @REPLACE return None, # Return the quantized model

    ### END CODE HERE ###


In [None]:
# Verify your code here

# Use the base model to start fresh
model = StreetClassifier().to(device)

# Load the final model checkpoint
checkpoint = torch.load("vgg_16.pt", map_location="cpu")
model.load_state_dict(checkpoint['model_state_dict'])
torch.manual_seed(5)

# Move the base model to CPU and set it to eval mode
model.to("cpu")
model.eval()

# Quantize the model
qmodel = quantize_dynamic_linear(model)

# Evaluate the quantized model
qacc = helper_utils.compute_accuracy(qmodel, test_loader, device="cpu")
print(f"\nAccuracy on test dataset after quantization: {qacc:.2f}")

# Benchmark the models  
t_fp32 = helper_utils.bench(model)
t_int8 = helper_utils.bench(qmodel)
print("\n[TIMING] avg forward per batch (CPU)")
print(f"  - FP32 : {t_fp32*1e3:.2f} ms")
print(f"  - INT8 : {t_int8*1e3:.2f} ms (↓ is better)")
print(f"  - Improvement: {((t_fp32 - t_int8)/t_fp32)*100:.1f}%")

#### Expected Output:
```
Accuracy on test dataset after quantization: 0.97

[TIMING] avg forward per batch (CPU)
  - FP32 : 200.54 ms
  - INT8 : 185.24 ms (↓ is better)
  - Improvement: 7.6%
```
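
Latency is only one of the three quantities this lab compares; serialized size is another. The sketch below measures both on a toy Linear-heavy model. `mean_forward_seconds` is a hypothetical timing helper written for illustration (it is not the implementation of `helper_utils.bench`), the layer sizes are assumptions, and the timings will vary by machine.

```python
import io
import time

import torch
import torch.nn as nn
from torch.ao.quantization import quantize_dynamic

def state_dict_size_bytes(model):
    """Serialize the state_dict to memory and return its size in bytes."""
    buffer = io.BytesIO()
    torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes

def mean_forward_seconds(model, example, warmup=2, iters=10):
    """Hypothetical timing helper; not the implementation of helper_utils.bench."""
    model.eval()
    with torch.no_grad():
        for _ in range(warmup):
            model(example)
        start = time.perf_counter()
        for _ in range(iters):
            model(example)
    return (time.perf_counter() - start) / iters

# Toy Linear-heavy model; sizes are illustrative only.
toy_fp32 = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 3)).eval()
toy_int8 = quantize_dynamic(toy_fp32, {nn.Linear}, dtype=torch.qint8)

fp32_size = state_dict_size_bytes(toy_fp32)
int8_size = state_dict_size_bytes(toy_int8)
example = torch.randn(32, 512)
t_fp32_toy = mean_forward_seconds(toy_fp32, example)
t_int8_toy = mean_forward_seconds(toy_int8, example)

print(f"size  FP32: {fp32_size / 1e6:.2f} MB | INT8: {int8_size / 1e6:.2f} MB")
print(f"time  FP32: {t_fp32_toy * 1e3:.3f} ms | INT8: {t_int8_toy * 1e3:.3f} ms")
```

In the StreetClassifier, only the three classifier layers are `nn.Linear`, and the convolutional backbone stays in FP32. That is consistent with the modest speedup shown in the expected output.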

In [None]:
# Test 2: Dynamic Quantization

unittests.exercise2(quantize_dynamic_linear)

## From Fusion to Quantization-Aware Training (QAT)

In the previous sections you reduced model size and latency with pruning and (optionally) dynamic quantization.  
Now you’ll make the model **quantization-friendly** and prepare it for **quantization-aware training** by:

1) **Fusing ops** that commonly appear together (e.g., `Conv + BN + ReLU`) into single fused modules.  
2) **Preparing for QAT**, which inserts observers and fake-quantization modules so the model “feels” INT8 during training and learns robust, quantization-tolerant weights.

### What is QAT and why use it?
**Quantization-Aware Training (QAT)** simulates INT8 behavior during training (via fake-quant) so that, after conversion, the **final INT8 model preserves more accuracy** than post-training quantization alone—especially on CNNs.

**Benefits:**
- **Higher accuracy under INT8** vs. dynamic/static PTQ on many convolutional models.  
- **Production-ready path**: train with fake-quant → convert to real INT8 → deploy.  
- **Works with standard PyTorch tooling** (eager mode, observers, qconfigs).
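
"Fake-quant" means values are snapped to the INT8 grid and clamped to its range, but remain ordinary float tensors so that training can proceed as usual. The sketch below illustrates the effect on a handful of values; the scale, zero point, and inputs are assumptions picked for readability, not values a real QAT run would necessarily use.

```python
import torch

x = torch.tensor([-1.0, -0.26, 0.0, 0.013, 0.5, 3.0])

# Assumed quantization parameters for illustration: symmetric signed INT8.
scale, zero_point = 0.02, 0
quant_min, quant_max = -128, 127

# Fake-quant: round to the INT8 grid and clamp, but keep a float tensor.
x_fq = torch.fake_quantize_per_tensor_affine(x, scale, zero_point, quant_min, quant_max)

print("input:     ", x.tolist())
print("fake-quant:", x_fq.tolist())
print("dtype stays float:", x_fq.dtype)
```

Here 0.013 snaps to the nearest grid point (0.02), and 3.0 falls outside the representable range, so it is clamped to 127 × 0.02 = 2.54. During QAT the network sees these rounding and clamping effects in every forward pass and learns weights that tolerate them.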

---

## Auxiliary Function (used by Exercise 3)

### `_try_fuse(module, names)`
This helper attempts PyTorch eager-mode fusion on a sequence of child layers inside an `nn.Sequential`.  
It **gracefully ignores unsupported patterns** (some backbones or PyTorch versions won’t fuse every combo), so your fusion pass can be “best-effort” without breaking.

**You will use it to** fuse common patterns such as:
- `Conv2d + BatchNorm2d + ReLU`  
- `Conv2d + BatchNorm2d`  
- `Conv2d + ReLU`  
- `Linear + ReLU`

Fusing reduces op count and numerical overhead at inference and **sets up cleaner patterns for QAT**.
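
As a small illustration of what fusion does, the sketch below fuses one `Conv2d + BatchNorm2d + ReLU` block with `torch.ao.quantization.fuse_modules` and checks that the output is numerically unchanged in eval mode. The block, its sizes, and the randomized BatchNorm statistics are assumptions for the demonstration.

```python
import torch
import torch.nn as nn
from torch.ao.quantization import fuse_modules

torch.manual_seed(0)
block = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.BatchNorm2d(8),
    nn.ReLU(),
).eval()  # folding BatchNorm into the convolution expects eval mode

# Give BatchNorm non-trivial statistics so the check below is meaningful.
with torch.no_grad():
    block[1].running_mean.uniform_(-0.5, 0.5)
    block[1].running_var.uniform_(0.5, 1.5)

x = torch.randn(2, 3, 16, 16)
with torch.no_grad():
    reference = block(x)

# Children of a Sequential are addressed by their string index.
fused = fuse_modules(block, ["0", "1", "2"], inplace=False)
with torch.no_grad():
    fused_out = fused(x)

print(fused)
max_diff = (reference - fused_out).abs().max().item()
print("max abs difference:", max_diff)
```

The fused module takes the first position and the remaining positions become `nn.Identity`, so the indices of a `Sequential` stay stable while you scan it.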

---

In [29]:
def _try_fuse(seq: nn.Sequential, names):
    """
    Best-effort wrapper around torch.ao.quantization.fuse_modules.
    Replaces fused positions with intrinsic fused ops / Identity in-place.
    """
    try:
        fuse_modules = getattr(torch.ao.quantization, "fuse_modules", None)
        if fuse_modules is None:  # PyTorch < 1.13 fallback
            fuse_modules = getattr(torch.quantization, "fuse_modules")
        # Inplace fusion inside the *same* Sequential
        fuse_modules(seq, names, inplace=True)
    except Exception:
        # Best-effort: ignore unsupported patterns / backends
        pass


## Exercise 3 — Implement `fuse_model_inplace(model)`

You’ll write a recursive, **best-effort** fusion routine that:
- Walks the model tree (`named_children`) and recurses into submodules.
- When it finds an `nn.Sequential`, scans adjacent layers and tries to fuse the patterns listed above using `_try_fuse`.
- Leaves unsupported cases untouched (no errors).

**What to expect after fusion**
- In `print(model)`, some sequences will be replaced by intrinsic fused modules (e.g., `ConvBnReLU2d`, `ConvReLU2d`, `LinearReLU`).
- Forward outputs in `eval()` should remain (nearly) identical, showing fusion preserved behavior.
- This step improves **CPU inference efficiency** and provides **better numerics for QAT**.

---

In [30]:
# GRADED FUNCTION: fuse_model_inplace

def fuse_model_inplace(model: nn.Module) -> nn.Module:
    """
    Recursively apply best-effort eager fusion to:
      Conv+BN+ReLU, Conv+BN, Conv+ReLU, Linear+ReLU
    Only fuses *adjacent* modules inside nn.Sequential blocks.
    Modifies `model` in-place and returns the *same instance*.
    """
    ### START CODE HERE ###
    for _, child in model.named_children(): # @REPLACE for _, child in [ (None, None)]: # Iterate over the named children of the model
        # Recurse first
        fuse_model_inplace(child) # @REPLACE fuse_model_inplace(None) # Recursively apply best-effort eager fusion to the child

        # Then scan this child if it's a Sequential
        if isinstance(child, nn.Sequential) and len(child) >= 2: # @REPLACE if True: # Check if the child is a Sequential and has at least 2 layers
            # BN folding prefers eval; don’t mutate outer state permanently
            was_training = child.training # @REPLACE was_training = None # Get the training state of the child
            child.eval() # @REPLACE child.eval() # Set the child to eval mode
            i = 0 # @KEEP   
            while i < len(child) - 1: # Iterate over the child layers - 1 # @KEEP
                a, b = child[i], child[i + 1] # @REPLACE a, b = None, None # Get the two adjacent layers at i and i + 1
                if i + 2 < len(child): # @REPLACE if True: # Check if the third layer exists
                    c = child[i + 2] # @REPLACE c = None # Get the third layer
                else: #  @KEEP
                    c = None # @REPLACE c = None # set the third layer to None

                # Conv + BN + ReLU
                if isinstance(a, nn.Conv2d) and isinstance(b, nn.BatchNorm2d) and isinstance(c, nn.ReLU): # @REPLACE if True: # Check if the first layer is a Conv2d, the second layer is a BatchNorm2d, and the third layer is a ReLU
                    _try_fuse(child, [str(i), str(i+1), str(i+2)]) # @REPLACE _try_fuse(None, ["", "", ""]) # Try to fuse the three layers
                    i += 3 # @KEEP
                    continue # @KEEP
                # Conv + BN
                if isinstance(a, nn.Conv2d) and isinstance(b, nn.BatchNorm2d): # @REPLACE if True: # Check if the first layer is a Conv2d and the second layer is a BatchNorm2d
                    _try_fuse(child, [str(i), str(i+1)]) # @REPLACE _try_fuse(None, ["", ""]) # Try to fuse the two layers
                    i += 2 # @KEEP
                    continue # @KEEP
                # Conv + ReLU
                if isinstance(a, nn.Conv2d) and isinstance(b, nn.ReLU): # @REPLACE if True: # Check if the first layer is a Conv2d and the second layer is a ReLU
                    _try_fuse(child, [str(i), str(i+1)]) # @REPLACE _try_fuse(None, ["", ""]) # Try to fuse the two layers
                    i += 2 # @KEEP
                    continue # @KEEP
                # Linear + ReLU
                if isinstance(a, nn.Linear) and isinstance(b, nn.ReLU): # @REPLACE if True: # Check if the first layer is a Linear and the second layer is a ReLU
                    _try_fuse(child, [str(i), str(i+1)]) # @REPLACE _try_fuse(None, ["", ""]) # Try to fuse the two layers
                    i += 2 # @KEEP
                    continue # @KEEP
                
                i += 1 # @KEEP

            if was_training: # @REPLACE if True: # Check if the child was training
                child.train() # @REPLACE child.train() # Set the child to train mode

    # IMPORTANT: return the same object (tests check identity)
    return model # @REPLACE return None, # Return the model

    ### END CODE HERE ###

In [None]:
# Verify your code

# Create a toy model to test your code
torch.manual_seed(0)
device = torch.device("cpu")
toy = helper_utils.ToyNet().eval().to(device)

# Keep a copy for numerical comparison
toy_copy = helper_utils.ToyNet().eval().to(device)
toy_copy.load_state_dict(toy.state_dict())

# Show BEFORE
helper_utils.list_children(toy, "Before fusion")

# Forward pass BEFORE
x = torch.randn(2, 3, 32, 32, device=device)
with torch.no_grad():
    y_before = toy(x)

# Apply your fusion function (assumes fuse_model_inplace is defined + _try_fuse available)
ret_model = fuse_model_inplace(toy).eval()
# Show AFTER
helper_utils.list_children(toy, "After fusion")

# Forward pass AFTER
with torch.no_grad():
    y_after = toy(x)

# Report numerical closeness and fused-layer counts
max_abs_diff = (y_before - y_after).abs().max().item()
fused_counts = helper_utils.count_fused_layers(toy)


print("\n== Verification ==")
print(f"Max |y_before - y_after|: {max_abs_diff:.6g}  (expect ~0)")
print("Fused intrinsic layers found:", fused_counts if fused_counts else "{} (none)")

# sanity check on output shape
print("Output shapes -> before:", tuple(y_before.shape), ", after:", tuple(y_after.shape))

#### Expected Output
```
== Before fusion ==

[stem]
  0: Conv2d
  1: BatchNorm2d
  2: ReLU

[block]
  0: Conv2d
  1: ReLU
  2: Conv2d
  3: BatchNorm2d
  4: ReLU

[head]
  0: AdaptiveAvgPool2d
  1: Flatten
  2: Linear
  3: ReLU
  4: Linear

== After fusion ==

[stem]
  0: ConvReLU2d
  1: Identity
  2: Identity

[block]
  0: ConvReLU2d
  1: Identity
  2: ConvReLU2d
  3: Identity
  4: Identity

[head]
  0: AdaptiveAvgPool2d
  1: Flatten
  2: LinearReLU
  3: Identity
  4: Linear

== Verification ==
Max |y_before - y_after|: 1.11759e-08  (expect ~0)
Fused intrinsic layers found: {'ConvReLU2d': 3, 'LinearReLU': 1}
Output shapes -> before: (2, 3) , after: (2, 3)
```
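
The `ConvReLU2d` plus `Identity` pattern in the expected output follows from how eager-mode fusion treats a block in eval mode: BatchNorm is folded into the convolution weights, and the slots of the absorbed layers are filled with `nn.Identity`. The snippet below is a minimal sketch of that behavior on a stand-alone block. It assumes that `_try_fuse` ultimately relies on `torch.ao.quantization.fuse_modules` (the helper's internals are not shown here); the block, its sizes, and the randomized BatchNorm statistics are illustrative only.

```python
import torch
import torch.nn as nn
import torch.ao.quantization as aoq

torch.manual_seed(0)

# Stand-alone Conv + BN + ReLU block (illustrative sizes, not the lab's ToyNet)
block = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.BatchNorm2d(8),
    nn.ReLU(),
).eval()  # eval mode so the BatchNorm statistics can be folded into the conv

# Give BatchNorm non-trivial running statistics so the folding is not a no-op
with torch.no_grad():
    block[1].running_mean.normal_()
    block[1].running_var.uniform_(0.5, 2.0)

x = torch.randn(2, 3, 16, 16)
with torch.no_grad():
    y_before = block(x)

# Children of a Sequential are addressed by their string indices "0", "1", "2"
fused = aoq.fuse_modules(block, [["0", "1", "2"]], inplace=False)

with torch.no_grad():
    y_after = fused(x)

layer_names = [type(m).__name__ for m in fused]
max_abs_diff = (y_before - y_after).abs().max().item()
print(layer_names)
print(f"max abs diff: {max_abs_diff:.3g}")
```

The fused block keeps the same length as the original, which is why string indices stay valid while `fuse_model_inplace` walks through a `Sequential`.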

In [None]:
# Test your code!

unittests.exercise3(fuse_model_inplace)

## More Auxiliary Functions (used by Exercise 4)
To complete Exercise 4, you will need the following two helpers:

#### `QATWrapper`
Wraps your FP32 model with `QuantStub`/`DeQuantStub`. During QAT/inference prep:
- Inputs are quantized → model body runs with fake-quant/observers → outputs dequantized.
- This scaffolding allows eager-mode QAT to be applied cleanly.
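
To make "fake-quant" concrete: a fake-quantization module rounds each value onto an integer grid and immediately maps it back to float, so the rest of the network sees the quantization error while still computing in FP32. Below is a minimal sketch with hand-picked `scale` and `zero_point` values. These are illustrative assumptions; during QAT the observers estimate them from data.

```python
import torch

x = torch.tensor([-1.0, -0.26, 0.0, 0.13, 0.5, 3.0])
scale, zero_point = 0.01, 0   # illustrative values, not taken from the lab
qmin, qmax = -128, 127        # signed 8-bit integer range

# Manual quantize -> clamp -> dequantize
q = torch.clamp(torch.round(x / scale) + zero_point, qmin, qmax)
x_manual = (q - zero_point) * scale

# The same operation using PyTorch's built-in fake-quantize op
x_builtin = torch.fake_quantize_per_tensor_affine(x, scale, zero_point, qmin, qmax)

print(x_manual)
print(x_builtin)
```

Values outside the representable range are clamped: with this scale, the input `3.0` saturates at `127 * 0.01`.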

#### `convert_qat(model)`
After you finish QAT fine-tuning:
- Switch the model to `eval()`
- Use the configured backend (e.g., **`fbgemm`** on x86, **`qnnpack`** on ARM)
- **Convert** the QAT model to a **real INT8** model for deployment.

---

In [46]:
def convert_qat(model):
    """
    Convert a trained QAT model to a quantized INT8 model for inference.
    Call `model.eval()` before measuring latency/accuracy.
    """
    model = copy.deepcopy(model)
    model.eval()
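    # Best-effort: re-assert the engine that is already configured.
    # The "fbgemm" default only applies if the attribute is missing.
    try:
        torch.backends.quantized.engine = getattr(torch.backends.quantized, "engine", "fbgemm")
    except Exception:
        pass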
    aoq.convert(model, inplace=True)
    return model


# Wrapper for QAT
class QATWrapper(nn.Module):
    def __init__(self, m):
        super().__init__()
        self.quant = aoq.QuantStub()
        self.m = m
        self.dequant = aoq.DeQuantStub()
    def forward(self, x):
        x = self.quant(x)
        x = self.m(x)
        x = self.dequant(x)
        return x

## Exercise 4 — Implement `prepare_qat(model, backend="fbgemm")`

Create and return a **QAT-ready copy** of the input model. Your function should:

1. **Do not mutate the original**: `deepcopy` the model and work on the copy.  
2. **Set backend**: choose the quantized engine (`"fbgemm"` for x86, `"qnnpack"` for ARM).  
3. **Fuse first**: call your fusion pass (`fuse_model_inplace`) on the copy.  
4. **Attach a QAT qconfig**: use `get_default_qat_qconfig(backend)` (fall back to a sensible default if needed).  
5. **Prepare for QAT**: run eager-mode `prepare_qat` to insert observers/fake-quant modules.  
6. **Return in `train()` mode** so the learner can fine-tune with QAT.

**Outcome:**  
You’ll have a training-ready module that, after a brief fine-tuning, can be converted with `convert_qat(...)` into a compact, fast **INT8 inference model** with strong accuracy retention.

#### Details — `prepare_qat`

Implement `prepare_qat(model, backend="fbgemm")`, a function that returns a QAT-ready copy of an FP32 model. It selects an appropriate quantized backend, fuses eligible blocks (e.g., `Conv+BN(+ReLU)`), attaches a default QAT qconfig, and runs eager-mode `prepare_qat` to insert observers and fake-quant modules.

***Note:*** The original model must remain unmodified; the returned module must be in train() mode.

The function has the following stages:

1. Clone & Switch to Train Mode (No Mutation)
    - Create a deep copy of the input model so the original remains intact.
    - Put the copy in training mode with `.train()` (eager-mode QAT preparation requires training mode).
2. Select Quantized Backend
    - Set `torch.backends.quantized.engine` to the requested backend if it is available.
    - Typical choices: `"fbgemm"` (x86) or `"qnnpack"` (ARM).
    - If the backend is unsupported, keep the runtime's current engine (best-effort).
3. Fuse Eligible Modules (Best-Effort)
    - Call `fuse_model_inplace(qat)`, the helper you implemented in Exercise 3, to fuse common patterns such as (Conv, BN, ReLU), (Conv, BN), (Conv, ReLU), and (Linear, ReLU).
4. Attach a Default QAT qconfig
    - Obtain a backend-appropriate QAT config (e.g., `aoq.get_default_qat_qconfig(backend)`).
    - If that fails, fall back to `aoq.get_default_qat_qconfig("fbgemm")`.
    - Assign it to `qat.qconfig`.
5. Insert Observers & Fake-Quant (Eager QAT Prepare)
    - Call `aoq.prepare_qat(qat, inplace=True)` to add observers and fake-quant modules throughout the network.
    - These modules simulate quantization effects during training.
    - Ensure the returned module is in `train()` mode and ready for QAT fine-tuning.

Return the new (deep-copied) QAT-ready model.

The output of this function, a QAT-ready model, is then fine-tuned for a few epochs. During training, the inserted observers and fake-quantization modules learn appropriate scales and zero-points, so the later conversion to INT8 retains high accuracy.
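
The stages above can be sketched end to end on a tiny model. This is an illustration under stated assumptions, not the graded solution: `TinyNet`, its layer sizes, and the random data are hypothetical; the stubs live inside the model instead of a `QATWrapper`; and `aoq.fuse_modules` is called directly on one known pattern rather than through `fuse_model_inplace`.

```python
import copy
import torch
import torch.nn as nn
import torch.ao.quantization as aoq

class TinyNet(nn.Module):
    """Hypothetical stand-in for the wrapped classifier."""
    def __init__(self):
        super().__init__()
        self.quant = aoq.QuantStub()
        self.body = nn.Sequential(
            nn.Conv2d(3, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(4, 2),
        )
        self.dequant = aoq.DeQuantStub()

    def forward(self, x):
        return self.dequant(self.body(self.quant(x)))

torch.manual_seed(0)
fp32 = TinyNet()

# 1) Work on a copy; 2) pick an engine this machine supports
qat = copy.deepcopy(fp32)
engines = torch.backends.quantized.supported_engines
backend = "fbgemm" if "fbgemm" in engines else "qnnpack"
torch.backends.quantized.engine = backend

# 3) Fuse Conv+BN+ReLU in eval mode, then return to train mode
qat.eval()
aoq.fuse_modules(qat.body, [["0", "1", "2"]], inplace=True)
qat.train()

# 4) Attach the QAT qconfig; 5) insert observers and fake-quant modules
qat.qconfig = aoq.get_default_qat_qconfig(backend)
aoq.prepare_qat(qat, inplace=True)

# One illustrative fine-tuning step on random data
x = torch.randn(8, 3, 8, 8)
y = torch.randint(0, 2, (8,))
optimizer = torch.optim.SGD(qat.parameters(), lr=1e-3)
loss = nn.functional.cross_entropy(qat(x), y)
optimizer.zero_grad()
loss.backward()
optimizer.step()

# Convert an eval-mode copy to a real INT8 model and run it on CPU
int8 = aoq.convert(copy.deepcopy(qat).eval(), inplace=False)
with torch.no_grad():
    out = int8(x)
print(type(int8.quant).__name__, type(int8.body[0]).__name__, out.dtype)
```

After conversion the stubs become real quantize and dequantize operations, so the model still accepts and returns ordinary float tensors.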

<details> <summary><b><font color="green">Additional Code Hints (Click to expand if you are stuck)</font></b></summary>

1. ***Backend selection (best-effort)***
    Prefer `"fbgemm"` on x86 (AVX2+) CPUs and `"qnnpack"` on ARM. A safe pattern:
    ```
    if hasattr(torch.backends, "quantized") and hasattr(torch.backends.quantized, "engine"):
        try:
            torch.backends.quantized.engine = backend  # e.g., "fbgemm" or "qnnpack"
        except Exception:
            pass  # leave current engine if unsupported
    ```

2. ***Fusion helper***
    Use the `fuse_model_inplace` helper you implemented in Exercise 3. Call it on the copy, after copying and before preparing QAT:
    ```
    fuse_model_inplace(qat)  # best-effort; no-op if pattern not found
    ```
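    Fusing folds BatchNorm into the preceding convolution and lets each fused block be quantized as a single unit, which typically helps both accuracy and speed in quantization workflows.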

3. ***QAT configuration***
    Pick a default QAT config tied to the backend; fall back to "fbgemm" if needed:
    ```
    try:
        qconfig = aoq.get_default_qat_qconfig(backend)
    except Exception:
        qconfig = aoq.get_default_qat_qconfig("fbgemm")
    qat.qconfig = qconfig
    ```

4. ***Insert observers & fake-quant***
    Use eager-mode QAT preparation:
    ```
    aoq.prepare_qat(qat, inplace=True)
    qat.train()  # ensure training mode for QAT fine-tuning
    ```
</details>


In [47]:
# GRADED FUNCTION: prepare_qat

def prepare_qat(model, backend="fbgemm"):
    """
    Return a **QAT-ready copy** of `model`:
      - Sets quantized backend (default: 'fbgemm')
      - Applies best-effort fusion (Conv+BN(+Act))
      - Attaches a default QAT qconfig
      - Runs eager-mode prepare_qat to insert observers/fake-quant
      - Returns the prepared module in **train()** mode

    The original `model` **must not** be mutated.

    Parameters
    ----------
    model : nn.Module
        FP32 model to prepare for QAT.
    backend : str
        Quantized engine (use 'fbgemm' on x86; 'qnnpack' on ARM).

    Returns
    -------
    nn.Module
        A new, QAT-ready model (with observers) in training mode.
    """
    ### START CODE HERE ###
    # 1) Work on a copy; do not mutate the original
    qat = copy.deepcopy(model).train() # @REPLACE qat = None, # Create a deep copy of the model and set it to train mode

    # 2) Configure quantized backend
    has_quantized = hasattr(torch.backends, "quantized") # @REPLACE has_quantized = None, # Check if torch.backends has quantized
    has_engine = has_quantized and hasattr(torch.backends.quantized, "engine") # @REPLACE has_engine = None, # Check if torch.backends.quantized has engine
    if has_quantized and has_engine: # @REPLACE if True: # Check if torch.backends has quantized and torch.backends.quantized has engine
        try: # @KEEP
            torch.backends.quantized.engine = backend # @REPLACE torch.backends.quantized.engine = None, # Set the quantized engine to the backend
        except Exception: # @KEEP
            pass  # keep current engine if backend is unsupported

    # 3) Fuse eligible modules (best-effort; safe no-op if unsupported)
    fuse_model_inplace(qat) # @REPLACE fuse_model_inplace(None) # Fuse the eligible modules (qat)

    # 4) Attach default QAT qconfig
    try: # @KEEP
        qconfig = aoq.get_default_qat_qconfig(backend) # @REPLACE qconfig = None, # Get the default QAT qconfig from aoq for the backend
    except Exception: # @KEEP
        # Fall back to a generic default if a backend-specific config isn't available
        qconfig = aoq.get_default_qat_qconfig("fbgemm") # @REPLACE qconfig = None, # Get the default QAT qconfig from aoq for "fbgemm"
    qat.qconfig = qconfig # @REPLACE qat.qconfig = None, # Set the qconfig to the qconfig

    # 5) Prepare for QAT (insert observers/fake-quant)
    aoq.prepare_qat( # @REPLACE aoq.prepare_qat( # Prepare the model for QAT
        qat, # @REPLACE None,, # The model to prepare for QAT
        inplace=True # @REPLACE inplace=None, # Set the correct value for inplace
        ) # @KEEP

    # Ensure model is in training mode for QAT fine-tuning
    qat.train() # @REPLACE None # Set the model to train mode
    ### END CODE HERE ###
    return qat


In [None]:
# Verify your code here

# Use the base model to start fresh
model = StreetClassifier().to(device)
checkpoint = torch.load("vgg_16.pt")
model.load_state_dict(checkpoint['model_state_dict'])

# wrap the base model in QATWrapper
wrapped_model = QATWrapper(model)

print("Base Model loaded and wrapped")

# Prepare the QAT model
qat_model = prepare_qat(wrapped_model, backend="fbgemm")
print("Model prepared for qat")

# Fine-tune with fake-quant in the loop (can be on GPU)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD((p for p in qat_model.parameters() if p.requires_grad),
                            lr=1e-4, momentum=0.9, weight_decay=1e-4)

qat_model.to(device)

helper_utils.train_model(
    qat_model,
    train_loader,
    dev_loader,
    2,
    optimizer,
    device,
    save_path="fine_tuned_qat_model.pt")
    
qat_model.to("cpu")

# Convert to real INT8 (runs on CPU)
qat_model.eval()
int8_model = convert_qat(qat_model)
print("Model converted to int8")

# Save the quantized model's state together with the backend it was converted for
torch.save({
    'model_state_dict': int8_model.state_dict(),
    'quantization_config': {'backend': torch.backends.quantized.engine}
}, "quantized_int8_model.pt")

print("Saved quantized model checkpoint to quantized_int8_model.pt")

# Evaluate int8 model on test data
int8_model.eval()
print("Testing model on cpu")
test_acc = helper_utils.compute_accuracy(int8_model, test_loader, device="cpu")
print(f"Test accuracy in base model: {base_accuracy:.2f}%")
print(f'\nInt8 model test accuracy: {test_acc:.2f}%')

# Measure inference time for both models
base_time = helper_utils.bench(model)
int8_time = helper_utils.bench(int8_model)

# Calculate percentage improvement
time_improvement = ((base_time - int8_time) / base_time) * 100

print(f"\nInference time comparison:")
print(f"Base model: {base_time:.4f} seconds per batch")
print(f"Int8 model: {int8_time:.4f} seconds per batch") 
print(f"Speed improvement: {time_improvement:.1f}%")

# Save both models weights to compare sizes
torch.save(model.state_dict(), "base_model_weights.pt")
torch.save(int8_model.state_dict(), "int8_model_weights.pt")

# Get file sizes in MB
base_size = os.path.getsize("base_model_weights.pt") / (1024 * 1024)
int8_size = os.path.getsize("int8_model_weights.pt") / (1024 * 1024)

print(f"\nModel size comparison:")
print(f"Base model: {base_size:.2f} MB")
print(f"Int8 model: {int8_size:.2f} MB")
print(f"Size reduction: {((base_size - int8_size) / base_size * 100):.1f}%")


#### Expected Output

```
Base Model loaded and wrapped
Model prepared for qat

New best accuracy: 0.9519, saved model to best_model.pt

New best accuracy: 0.9667, saved model to best_model.pt

Training completed:
Best accuracy: 0.9667
Final accuracy: 0.9667
Final model saved to final_model.pt
Model converted to int8
Saved quantized model checkpoint to quantized_int8_model.pt
Testing model on cpu

Test accuracy in base model: 0.97%

Int8 model test accuracy: 0.96%

Inference time comparison:
Base model: 0.0346 seconds per batch
Int8 model: 0.0193 seconds per batch
Speed improvement: 44.2%

Model size comparison:
Base model: 512.22 MB
Int8 model: 128.31 MB
Size reduction: 75.0%
```
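
The reported percentages can be re-derived directly from the figures in the expected output. The size reduction lands close to 75% because each quantized weight is stored in 1 byte instead of the 4 bytes used by FP32; the small remainder is plausibly overhead such as scales, zero-points, and any tensors left unquantized (an assumption, since the checkpoint contents are not itemized here). A short check:

```python
# Numbers copied from the expected output above
base_mb, int8_mb = 512.22, 128.31
base_s, int8_s = 0.0346, 0.0193

size_reduction_pct = (base_mb - int8_mb) / base_mb * 100
compression_ratio = base_mb / int8_mb
time_improvement_pct = (base_s - int8_s) / base_s * 100
speedup = base_s / int8_s

print(f"Size reduction: {size_reduction_pct:.1f}% ({compression_ratio:.2f}x smaller)")
print(f"Speed improvement: {time_improvement_pct:.1f}% ({speedup:.2f}x faster)")
```

Latency depends on the CPU and the quantized backend, so your timing numbers may differ from these even when the size figures match.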

In [None]:
# Test your code!

unittests.exercise4(prepare_qat)

## Conclusion

You started from a solid baseline and made it deployable. By saving and reloading state dictionaries you protected your progress and created a repeatable path back to working models. With pruning you explored how zeroing less useful parameters can make a network leaner, and you saw how masks change behavior without immediately changing tensor shapes. Dynamic quantization gave you a rapid path to smaller weights and faster CPU inference. Finally, the fusion plus quantization-aware training workflow let you fine-tune under simulated int8 effects, so the converted model holds onto accuracy while gaining speed.

You now have a StreetClassifier that is lighter, faster, and easier to package. The same workflow scales to larger architectures and different tasks: prune with intent, quantify the accuracy and latency impact, and then use quantization aware training when you want the best balance. As next steps, you can automate experiment tracking, prune with channel removal to alter layer shapes, or export to formats that run beyond PyTorch so your models serve users wherever they live.