1. Static PTQ with MinMax calibration (per-tensor and per-channel via fbgemm)
2. CNN, CNN with Early Exit and Pre-trained models (MobileNetV2, ResNet50, ResNeXt50, EfficientNet-B0)
3. Dataset: CIFAR-10, CIFAR-100, Tiny ImageNet
4. Target: CPU inference

### Libraries

In [1]:

import os
import io
import time
import copy
import shutil
import zipfile
import urllib.request
from tqdm import tqdm
from pathlib import Path
from tabulate import tabulate
import matplotlib.pyplot as plt
from typing import Optional, Literal, Dict, Any, List

import torch
import torch.nn as nn
import torch.optim as optim
import torch.ao.quantization
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.quantization import QuantStub, DeQuantStub, prepare_qat, convert

In [2]:
# Report the installed PyTorch build and the quantized engines it was compiled with.
print(f"PyTorch: {torch.__version__}")
print(torch.backends.quantized.supported_engines)
# Every experiment in this notebook runs on the CPU.
device  = torch.device("cpu")
# Notebook-wide accumulator list (starts empty).
results = []

PyTorch: 2.5.1+cpu
['none', 'onednn', 'x86', 'fbgemm']


### Dataloader

In [12]:
DatasetName = Literal["cifar10", "cifar100", "tiny_imagenet", "mnist"]

class DataloaderManager:
    """Build train/validation ``DataLoader`` pairs for the supported datasets.

    ``CONFIGS`` holds, per dataset, the image side length (``size``), the
    padding used by the random-crop augmentation (``padding``), the number of
    classes (``num_classes``) and the per-channel ``(mean, std)`` used for
    normalization (``stats``).
    """

    CONFIGS = {
        "cifar10": {
            "size": 32, "padding": 4, "num_classes": 10,
            "stats": ((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616))
        },
        "cifar100": {
            "size": 32, "padding": 4, "num_classes": 100,
            "stats": ((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))
        },
        "tiny_imagenet": {
            "size": 64, "padding": 8, "num_classes": 200,
            "stats": ((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        },
        "mnist": {
            "size": 28, "padding": 0, "num_classes": 10,
            "stats": ((0.1307,), (0.3081,))
        }
    }

    def __init__(self, root_dir: str, dataset: DatasetName):
        """Store the data root and look up the configuration for ``dataset``.

        Raises:
            KeyError: if ``dataset`` is not a key of ``CONFIGS``.
        """
        self.root = Path(root_dir)
        self.dataset = dataset
        self.cfg = self.CONFIGS[dataset]

    def _get_transforms(self, train: bool):
        """Return the transform pipeline for the train or validation split.

        Training adds augmentation (flip + padded random crop, or a small
        random rotation for MNIST); both splits end with ``ToTensor`` and
        ``Normalize`` using the dataset statistics.
        """
        mean, std = self.cfg["stats"]
        tf_list = []
        if train:
            if self.dataset != "mnist":
                tf_list.extend([
                    transforms.RandomHorizontalFlip(),
                    transforms.RandomCrop(self.cfg["size"], padding=self.cfg["padding"])
                ])
            else:
                # MNIST gets a rotation instead of flip/crop.
                tf_list.append(transforms.RandomRotation(10))

        tf_list.extend([
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])

        return transforms.Compose(tf_list)

    def _get_dataset_instance(self, train: bool):
        """Instantiate the torchvision dataset for the requested split."""
        tf = self._get_transforms(train)

        if self.dataset.startswith("cifar"):
            ds_cls = datasets.CIFAR10 if self.dataset == "cifar10" else datasets.CIFAR100
            return ds_cls(self.root, train=train, download=True, transform=tf)

        if self.dataset == "mnist":
            return datasets.MNIST(self.root, train=train, download=True, transform=tf)

        # Tiny ImageNet is read from disk (no download here).
        # NOTE(review): ImageFolder expects one sub-directory per class; this
        # presumably assumes the "val" folder was reorganized into per-class
        # folders beforehand -- verify against the data preparation step.
        path = self.root / "tiny-imagenet-200" / ("train" if train else "val")
        return datasets.ImageFolder(str(path), transform=tf)

    def get_loaders(self, batch_size: int, num_workers: int):
        """Create, summarize and return ``(train_loader, val_loader)``.

        The training loader is shuffled; the validation loader is not.
        """
        train_ds = self._get_dataset_instance(train=True)
        val_ds   = self._get_dataset_instance(train=False)

        loader_args = {
            "batch_size": batch_size,
            "num_workers": num_workers,
            # Pinned host memory only helps host-to-GPU copies; requesting it
            # without CUDA makes PyTorch emit a warning and ignore the flag.
            "pin_memory": torch.cuda.is_available(),
        }

        train_loader = torch.utils.data.DataLoader(train_ds, shuffle=True, **loader_args)
        val_loader   = torch.utils.data.DataLoader(val_ds, shuffle=False, **loader_args)

        self._print_summary(train_loader, val_loader, batch_size)
        return train_loader, val_loader

    def _print_summary(self, train_loader, val_loader, batch_size):
        """Print dataset name, sample/batch counts and class count."""
        print(f"Dataset      : {self.dataset}")
        print(f"Train samples: {len(train_loader.dataset)}")
        print(f"Val samples  : {len(val_loader.dataset)}")
        print(f"Classes      : {self.cfg['num_classes']}")
        print(f"Batch size   : {batch_size}")
        print(f"Train batches: {len(train_loader)}")
        print(f"Val batches  : {len(val_loader)}")

In [13]:
# Build the loaders for the dataset used by the following experiment and keep
# its class count around for model construction.
manager = DataloaderManager(root_dir="../data", dataset="mnist")
train_loader, val_loader = manager.get_loaders(batch_size=64, num_workers=4)
num_classes = manager.cfg["num_classes"]

Dataset      : mnist
Train samples: 60000
Val samples  : 10000
Classes      : 10
Batch size   : 64
Train batches: 938
Val batches  : 157


### Training and evaluation functions 

In [14]:
def train_model(
    model: nn.Module,
    train_loader,
    val_loader,
    device,
    epochs: int,
    lr: float,
    model_name: str = "model",
    patience: int = 5,
) -> nn.Module:
    """Train ``model`` with SGD + cosine annealing and return the best checkpoint.

    After every epoch the model is validated on ``val_loader``. Whenever the
    validation accuracy improves, the weights are saved to
    ``best_{model_name}.pth``. Training stops early once ``patience``
    consecutive epochs pass without an improvement.

    Args:
        model: network to train (moved to ``device``).
        train_loader: iterable of ``(data, target)`` training batches.
        val_loader: iterable of ``(data, target)`` validation batches.
        device: device on which training and validation run.
        epochs: maximum number of epochs.
        lr: initial learning rate.
        model_name: used in log messages and in the checkpoint file name.
        patience: number of consecutive non-improving epochs tolerated
            before stopping early.

    Returns:
        The model with the best validation weights loaded (or the model as-is
        when no epoch was run).
    """
    print(f"Training {model_name} | Epochs: {epochs} | LR: {lr}")
    model = model.to(device)

    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
    criterion = nn.CrossEntropyLoss()

    checkpoint_path = f"best_{model_name}.pth"
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a stale file from an earlier run could be loaded.
    best_acc = float("-inf")
    checkpoint_saved = False
    # The early-stopping counter must live outside the epoch loop, otherwise
    # it is reset every epoch and can never reach ``patience``.
    trigger_times = 0
    num_batches = max(len(train_loader), 1)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")

        for step, (data, target) in enumerate(pbar, start=1):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Show the mean over the batches seen so far, not over the epoch.
            pbar.set_postfix(loss=f"{running_loss/step:.4f}")

        acc = validate_model(model, val_loader, device, desc=f"Eval Ep {epoch+1}")
        scheduler.step()

        print(f"Result Epoch {epoch+1}: Loss = {running_loss/num_batches:.4f} | Acc = {acc:.2f}%")

        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            trigger_times = 0
        else:
            trigger_times += 1
            if trigger_times >= patience:
                print("Early stopping!")
                break

    # Only restore a checkpoint that this call actually produced.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, weights_only=True))
    return model

def validate_model(model: nn.Module, data_loader, device, desc="Validating") -> float:
    """Return the top-1 accuracy (in percent) of ``model`` on ``data_loader``.

    The model is switched to eval mode and gradients are disabled; a
    transient progress bar labelled ``desc`` is shown while iterating.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    progress = tqdm(data_loader, desc=desc, leave=False)

    with torch.no_grad():
        for inputs, labels in progress:
            inputs = inputs.to(device)
            labels = labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            n_correct += predicted.eq(labels).sum().item()
            n_seen += labels.size(0)

    return 100.0 * n_correct / n_seen

def evaluate_model(model: nn.Module, test_loader, device) -> float:
    """Return the top-1 accuracy (in percent) of ``model`` on ``test_loader``.

    Runs in eval mode with gradients disabled and without a progress bar.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            predictions = model(inputs).argmax(dim=1)
            hits += int((predictions == labels).sum())
            seen += labels.shape[0]
    return 100.0 * hits / seen

def measure_inference_time(
    model: nn.Module,
    test_loader,
    device,
    num_batches: int = 20,
    warmup: int = 3,
) -> float:
    """Return mean latency per batch in milliseconds.

    Args:
        model: network to time (switched to eval mode).
        test_loader: iterable of ``(data, target)`` batches.
        device: device the input batches are moved to.
        num_batches: total number of batches consumed, warm-up included.
        warmup: number of leading batches that are run but not timed.

    Returns:
        Mean wall-clock forward time of the timed batches, in milliseconds.

    Raises:
        ValueError: if no batch was timed (loader too short, or
            ``num_batches`` not larger than ``warmup``).
    """
    model.eval()
    times = []
    with torch.no_grad():
        for i, (data, _) in enumerate(test_loader):
            if i >= num_batches:
                break
            data = data.to(device)
            if i < warmup:
                # Warm-up passes are executed but excluded from the average.
                _ = model(data)
                continue
            # perf_counter is monotonic and has the highest available
            # resolution, unlike time.time(), which follows the system clock.
            t0 = time.perf_counter()
            _ = model(data)
            times.append(time.perf_counter() - t0)
    if not times:
        raise ValueError(
            "No batch was timed: the loader must yield more than "
            f"warmup={warmup} batches and num_batches={num_batches} must exceed warmup."
        )
    return sum(times) / len(times) * 1000

def get_model_size(model: nn.Module) -> float:
    """Return the size of the serialized ``state_dict`` in MiB.

    The state dict is written to an in-memory buffer with ``torch.save`` and
    the resulting stream position is converted from bytes.
    """
    bytes_per_mib = 1024 * 1024
    with io.BytesIO() as stream:
        torch.save(model.state_dict(), stream)
        written = stream.tell()
    return written / bytes_per_mib

# PTQ

## Calibration MinMax

### Experiment 1 - MLP

#### MLP without observers

In [15]:
class MLP(nn.Module):
    """Two-layer perceptron: flatten -> Linear -> ReLU -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Collapse everything but the batch axis, e.g.
        # [batch, 3, 32, 32] -> [batch, 3072] for an nn.Linear(3072, ...) input.
        flat = x.flatten(start_dim=1)
        hidden = self.relu(self.linear1(flat))
        return self.linear2(hidden)

# DEEPER NETWORK (alternative MLP, currently disabled)
# class MLP(nn.Module):
#     def __init__(self, input_dim, hidden_dim, output_dim):
#         super().__init__()
#         self.net = nn.Sequential(
#             nn.Linear(input_dim, hidden_dim),
#             nn.BatchNorm1d(hidden_dim),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(hidden_dim, hidden_dim // 2),
#             nn.BatchNorm1d(hidden_dim // 2),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(hidden_dim // 2, output_dim)
#         )

#     def forward(self, x):
#         x = torch.flatten(x, 1)
#         return self.net(x)

#### Training

In [18]:
# Experiment 1: train the FP32 MLP baseline and record its accuracy, latency
# and serialized size. 784 = 28*28 flattened input; the commented line is the
# 3*32*32 variant.
# model_fp32 = MLP(input_dim=3072, hidden_dim=512, output_dim=10).to(device) # cifar10
model_fp32 = MLP(input_dim=784, hidden_dim=512, output_dim=10).to(device)
model_fp32 = train_model(
    model_fp32, train_loader, val_loader, device,
    epochs=5, lr=0.01, model_name="experiment1_model_fp32"
)

# Baseline metrics, all measured on the validation loader.
acc_model_fp32  = evaluate_model(model_fp32, val_loader, device)
time_model_fp32 = measure_inference_time(model_fp32, val_loader, device)
size_model_fp32 = get_model_size(model_fp32)

print(f"Results:")
print(f"Acc     : {acc_model_fp32:.2f}%")
print(f"Latency : {time_model_fp32:.2f} ms/batch")
print(f"Size    : {size_model_fp32:.5f} MB")

Training experiment1_model_fp32 | Epochs: 5 | LR: 0.01


Epoch 1/5: 100%|██████████| 938/938 [00:07<00:00, 125.69it/s, loss=0.2811]
                                                            

Result Epoch 1: Loss = 0.2811 | Acc = 96.39%


Epoch 2/5: 100%|██████████| 938/938 [00:07<00:00, 126.34it/s, loss=0.1198]
                                                            

Result Epoch 2: Loss = 0.1198 | Acc = 97.19%


Epoch 3/5: 100%|██████████| 938/938 [00:07<00:00, 125.94it/s, loss=0.0863]
                                                            

Result Epoch 3: Loss = 0.0863 | Acc = 97.84%


Epoch 4/5: 100%|██████████| 938/938 [00:07<00:00, 128.16it/s, loss=0.0693]
                                                            

Result Epoch 4: Loss = 0.0693 | Acc = 97.96%


Epoch 5/5: 100%|██████████| 938/938 [00:07<00:00, 129.58it/s, loss=0.0609]
                                                            

Result Epoch 5: Loss = 0.0609 | Acc = 98.12%
Results:
Acc     : 98.12%
Latency : 0.24 ms/batch
Size    : 1.55458 MB


In [19]:
# Reload the best FP32 checkpoint written by train_model and inspect its weights.
state_dict = torch.load("best_experiment1_model_fp32.pth", map_location="cpu", weights_only=True)
model_fp32.load_state_dict(state_dict)
model_fp32.eval()

# Parameter names and shapes.
print("Weights Structure:")
for name, param in model_fp32.named_parameters():
    print(f"{name} -> {param.shape}")
    
print('='*42)

print("Weights Statistics:\n")

# Per parameter: first ten values, dtype, shape, mean and standard deviation.
for name, param in model_fp32.named_parameters():
    print(f"{name}:")
    print(param.data.view(-1)[:10])
    print(f"Dtype: {param.dtype}")
    print(f"Shape: {tuple(param.shape)}")
    print(f"Mean: {param.data.mean().item():.6f}")
    print(f"Std:  {param.data.std().item():.6f}")
    print('-'*42)
  
# The dtype of the first parameter is reported as the model's dtype.
print('Datatype model:')  
print(next(model_fp32.parameters()).dtype)

Weights Structure:
linear1.weight -> torch.Size([512, 784])
linear1.bias -> torch.Size([512])
linear2.weight -> torch.Size([10, 512])
linear2.bias -> torch.Size([10])
Weights Statistics:

linear1.weight:
tensor([-0.0175,  0.0201, -0.0092,  0.0005, -0.0101, -0.0255,  0.0081, -0.0065,
        -0.0162, -0.0012])
Dtype: torch.float32
Shape: (512, 784)
Mean: -0.000682
Std:  0.022832
------------------------------------------
linear1.bias:
tensor([ 0.0114, -0.0190,  0.0070, -0.0084,  0.0045,  0.0068, -0.0315, -0.0023,
         0.0089,  0.0088])
Dtype: torch.float32
Shape: (512,)
Mean: 0.001357
Std:  0.020073
------------------------------------------
linear2.weight:
tensor([-0.0865,  0.1052,  0.0611, -0.1032, -0.0705,  0.1378, -0.0799,  0.0163,
        -0.1358,  0.0837])
Dtype: torch.float32
Shape: (10, 512)
Mean: 0.000050
Std:  0.085414
------------------------------------------
linear2.bias:
tensor([-0.0715, -0.0490, -0.0043, -0.0023,  0.0380,  0.0446, -0.0464, -0.0715,
         0.1086, -0

#### MLP with observers

In [20]:
class QuantizedMLP(nn.Module):
    """Same MLP as the FP32 baseline, bracketed by quant/dequant stubs.

    The stubs mark where tensors enter and leave the quantized region for
    eager-mode static quantization.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.quant = torch.quantization.QuantStub()
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, output_dim)
        self.dequant = torch.quantization.DeQuantStub()

    def forward(self, x):
        quantized = self.quant(x)
        flat = torch.flatten(quantized, 1)
        hidden = self.relu(self.linear1(flat))
        logits = self.linear2(hidden)
        return self.dequant(logits)

#### Calibration

In [29]:
# Copy the trained FP32 weights into the stub-wrapped MLP.
model_prepared = QuantizedMLP(input_dim=784, hidden_dim=512, output_dim=10).to(device)
model_prepared.load_state_dict(model_fp32.state_dict())
model_prepared.eval()

# Attach the default qconfig and let prepare() insert observers
# (the cell output shows a MinMaxObserver on each instrumented module).
model_prepared.qconfig = torch.ao.quantization.default_qconfig
model_prepared = torch.ao.quantization.prepare(model_prepared)
model_prepared

QuantizedMLP(
  (quant): QuantStub(
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (linear1): Linear(
    in_features=784, out_features=512, bias=True
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (relu): ReLU()
  (linear2): Linear(
    in_features=512, out_features=10, bias=True
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (dequant): DeQuantStub()
)

In [30]:
# Run the observer-instrumented model over the validation loader. These
# forward passes also serve as the calibration pass: the observers inserted by
# prepare() see the activations here, before convert() is called.
acc_model_prepared = validate_model(model_prepared, val_loader, device, desc='Val Quantized')
lat_model_prepared = measure_inference_time(model_prepared, val_loader, device)
model_size_model_prepared = get_model_size(model_prepared)

# The numbers below are labelled FP32 but are measured on the prepared
# (observer-instrumented) model, not on the plain FP32 model.
print(f"FP32 Accuracy: {acc_model_prepared:.2f}%")
print(f"FP32 Latency:  {lat_model_prepared:.2f} ms/batch")
print(f"FP32 Size:  {model_size_model_prepared:.5f} MB")

                                                                

FP32 Accuracy: 98.12%
FP32 Latency:  0.73 ms/batch
FP32 Size:  1.55711 MB


#### Conversion

In [31]:
# Convert the calibrated model to its quantized counterpart and display it.
# (The bare `model_prepared` expression is not the last statement of the cell,
# so it has no visible effect.)
model_prepared
model_quantized = torch.ao.quantization.convert(model_prepared)
model_quantized

QuantizedMLP(
  (quant): Quantize(scale=tensor([0.0256]), zero_point=tensor([17]), dtype=torch.quint8)
  (linear1): QuantizedLinear(in_features=784, out_features=512, scale=0.16171421110630035, zero_point=70, qscheme=torch.per_tensor_affine)
  (relu): ReLU()
  (linear2): QuantizedLinear(in_features=512, out_features=10, scale=0.31290170550346375, zero_point=52, qscheme=torch.per_tensor_affine)
  (dequant): DeQuantize()
)

In [32]:
# Print the raw integer representation of the quantized weight tensors of
# both linear layers.
print("Weights Statistics:\n")
print(torch.int_repr(model_quantized.linear1.weight()))
print(torch.int_repr(model_quantized.linear2.weight()))

Weights Statistics:

tensor([[-15,  17,  -8,  ...,   7,  11, -17],
        [-11,   8,  10,  ...,  24,  11,   4],
        [ 24,  16,  16,  ...,  28, -21,  -8],
        ...,
        [-23, -14, -11,  ..., -26, -18,  22],
        [ -9,  -9,  29,  ...,   3,  14,  -9],
        [-26,  29, -27,  ...,  11, -22,  -7]], dtype=torch.int8)
tensor([[-30,  37,  21,  ...,  23, -22, -27],
        [ 17, -15, -38,  ..., -42,  25,  39],
        [-71, -48, -48,  ...,   0,  -2, -43],
        ...,
        [  8,  18,  16,  ..., -60,  18, -15],
        [ 14, -47,  33,  ...,  34, -11, -31],
        [ 20,  24,  32,  ..., -18, -22,  13]], dtype=torch.int8)


#### Int8 Inference

In [33]:
# Accuracy, latency and serialized size of the converted INT8 model, measured
# on the same validation loader as the FP32 numbers.
acc_model_quantized = validate_model(model_quantized, val_loader, device, desc='Final INT8 Eval')
lat_model_quantized = measure_inference_time(model_quantized, val_loader, device)
model_size_quantized = get_model_size(model_quantized)

print(f"Acc INT8     : {acc_model_quantized:.2f}%")
print(f"Latency INT8 : {lat_model_quantized:.2f} ms/batch")
print(f"Size INT8    : {model_size_quantized:.2f} MB")

                                                                  

Acc INT8     : 98.10%
Latency INT8 : 0.62 ms/batch
Size INT8    : 0.39 MB


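| Type  | Acc | Latency | Size |
| ----- | --- | ------- | ---- |
| FP32  | 98.12% | 0.73 ms/batch | 1.55711 MB |
| INT8  | 98.10% | 0.62 ms/batch | 0.39 MB    |

Note: the FP32 row was measured on the observer-instrumented (prepared) model. The plain FP32 model measured 0.24 ms/batch and 1.55458 MB in the training cell above, so INT8 is smaller but not faster than the un-instrumented FP32 baseline in this run.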

### Experiment 2

#### CNN No Observers

In [19]:
class CNN_NO_OBS(nn.Module):
    """Plain FP32 CNN: three Conv-BN-ReLU stages followed by a linear head.

    Max-pooling (factor 2) follows the first two stages; the last stage is
    adaptively average-pooled to 8x8 before the classifier.
    """

    def __init__(self, num_classes: int):
        super().__init__()
        # Stage 1: 3 -> 32 channels
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1   = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU(inplace=False)

        # Stage 2: 32 -> 64 channels
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2   = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU(inplace=False)

        # Stage 3: 64 -> 128 channels
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3   = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU(inplace=False)

        # Head
        self.pool = nn.AdaptiveAvgPool2d((8, 8))
        self.fc   = nn.Linear(128 * 8 * 8, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        out = F.max_pool2d(out, 2)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu2(out)
        out = F.max_pool2d(out, 2)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.relu3(out)

        out = self.pool(out)
        out = torch.flatten(out, 1)
        return self.fc(out)

#### Training

In [21]:
# Experiment 2: train the FP32 CNN baseline and record its accuracy, latency
# and serialized size on the validation loader.
cnn_fp32 = CNN_NO_OBS(num_classes=10).to(device)
cnn_fp32 = train_model(
    cnn_fp32, train_loader, val_loader, device,
    epochs=5, lr=0.01, model_name="experiment2_cnn_fp32"
)

acc_cnn_fp32  = evaluate_model(cnn_fp32, val_loader, device)
time_cnn_fp32 = measure_inference_time(cnn_fp32, val_loader, device)
size_cnn_fp32 = get_model_size(cnn_fp32)

print(f"Results:")
print(f"Acc     : {acc_cnn_fp32:.2f}%")
print(f"Latency : {time_cnn_fp32:.2f} ms/batch")
print(f"Size    : {size_cnn_fp32:.5f} MB")

Training experiment2_cnn_fp32 | Epochs: 5 | LR: 0.01


Epoch 1/5: 100%|██████████| 782/782 [00:30<00:00, 25.30it/s, loss=1.5273]
                                                            

Result Epoch 1: Loss = 1.5273 | Acc = 64.03%


Epoch 2/5: 100%|██████████| 782/782 [00:31<00:00, 24.71it/s, loss=1.0502]
                                                            

Result Epoch 2: Loss = 1.0502 | Acc = 60.80%


Epoch 3/5: 100%|██████████| 782/782 [00:31<00:00, 24.87it/s, loss=0.8913]
                                                            

Result Epoch 3: Loss = 0.8913 | Acc = 71.85%


Epoch 4/5: 100%|██████████| 782/782 [00:31<00:00, 24.69it/s, loss=0.7889]
                                                            

Result Epoch 4: Loss = 0.7889 | Acc = 75.31%


Epoch 5/5: 100%|██████████| 782/782 [00:31<00:00, 25.18it/s, loss=0.7156]
                                                            

Result Epoch 5: Loss = 0.7156 | Acc = 76.63%
Results:
Acc     : 76.63%
Latency : 8.63 ms/batch
Size    : 0.67806 MB


In [22]:
# Reload the best FP32 CNN checkpoint written by train_model and inspect its weights.
state_dict = torch.load("best_experiment2_cnn_fp32.pth", map_location="cpu", weights_only=True)
cnn_fp32.load_state_dict(state_dict)
cnn_fp32.eval()

# Parameter names and shapes.
print("Weights Structure:")
for name, param in cnn_fp32.named_parameters():
    print(f"{name} -> {param.shape}")
    
print('='*42)

print("Weights Statistics:\n")

# Per parameter: first ten values, dtype, shape, mean and standard deviation.
for name, param in cnn_fp32.named_parameters():
    print(f"{name}:")
    print(param.data.view(-1)[:10])
    print(f"Dtype: {param.dtype}")
    print(f"Shape: {tuple(param.shape)}")
    print(f"Mean: {param.data.mean().item():.6f}")
    print(f"Std:  {param.data.std().item():.6f}")
    print('-'*42)
  
# The dtype of the first parameter is reported as the model's dtype.
print('Datatype model:')  
print(next(cnn_fp32.parameters()).dtype)

Weights Structure:
conv1.weight -> torch.Size([32, 3, 3, 3])
conv1.bias -> torch.Size([32])
bn1.weight -> torch.Size([32])
bn1.bias -> torch.Size([32])
conv2.weight -> torch.Size([64, 32, 3, 3])
conv2.bias -> torch.Size([64])
bn2.weight -> torch.Size([64])
bn2.bias -> torch.Size([64])
conv3.weight -> torch.Size([128, 64, 3, 3])
conv3.bias -> torch.Size([128])
bn3.weight -> torch.Size([128])
bn3.bias -> torch.Size([128])
fc.weight -> torch.Size([10, 8192])
fc.bias -> torch.Size([10])
Weights Statistics:

conv1.weight:
tensor([ 0.0483,  0.0565,  0.1348, -0.1159, -0.1469, -0.1909,  0.1750, -0.2315,
        -0.1932,  0.1487])
Dtype: torch.float32
Shape: (32, 3, 3, 3)
Mean: 0.000287
Std:  0.192709
------------------------------------------
conv1.bias:
tensor([ 0.1551,  0.0933, -0.1757,  0.1327,  0.1873,  0.0394,  0.0653,  0.0367,
        -0.0382, -0.1751])
Dtype: torch.float32
Shape: (32,)
Mean: -0.006533
Std:  0.101757
------------------------------------------
bn1.weight:
tensor([0.9763, 

#### CNN Observers

In [23]:
class CNN_OBS(nn.Module):
    """Quantization-ready CNN: the FP32 architecture wrapped in quant/dequant stubs.

    ``CNN_FUSION_PATTERNS`` lists the attribute names of each
    Conv-BN-ReLU stage so they can be passed to ``fuse_modules``.
    """

    CNN_FUSION_PATTERNS = [
        ["conv1", "bn1", "relu1"],
        ["conv2", "bn2", "relu2"],
        ["conv3", "bn3", "relu3"]
    ]

    def __init__(self, num_classes: int):
        super().__init__()
        # Entry/exit points of the quantized region.
        self.quant   = QuantStub()
        self.dequant = DeQuantStub()

        # Stage 1: 3 -> 32 channels
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1   = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU(inplace=False)

        # Stage 2: 32 -> 64 channels
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2   = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU(inplace=False)

        # Stage 3: 64 -> 128 channels
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3   = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU(inplace=False)

        # Head
        self.pool = nn.AdaptiveAvgPool2d((8, 8))
        self.fc   = nn.Linear(128 * 8 * 8, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out = self.quant(x)

        out = self.conv1(out)
        out = self.bn1(out)
        out = self.relu1(out)
        out = F.max_pool2d(out, 2)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu2(out)
        out = F.max_pool2d(out, 2)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.relu3(out)

        out = self.pool(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return self.dequant(out)

#### Folding BatchNormalization

In [24]:
def _find_conv_bn_relu_sequences(module: nn.Module) -> List[List[str]]:
    children = list(module.named_children())
    patterns: List[List[str]] = []
    i = 0
    while i < len(children):
        n0, m0 = children[i]
        if isinstance(m0, nn.Conv2d) and i + 1 < len(children):
            n1, m1 = children[i + 1]
            if isinstance(m1, nn.BatchNorm2d):
                if i + 2 < len(children):
                    n2, m2 = children[i + 2]
                    if isinstance(m2, (nn.ReLU, nn.ReLU6)):
                        patterns.append([n0, n1, n2])
                        i += 3
                        continue
                patterns.append([n0, n1])
                i += 2
                continue
        i += 1
    return patterns

def fuse_bn_recursively(model: nn.Module) -> nn.Module:
    """Fold BatchNorm into preceding convolutions throughout ``model``, in place.

    Sub-modules are processed first (depth-first), then the Conv-BN[-ReLU]
    runs found among the direct children of ``model`` are fused. The model
    must be in eval mode. Returns the same ``model`` object.
    """
    assert not model.training, "Call model.eval() before BN folding"

    # Recurse before handling this level.
    for submodule in model.children():
        fuse_bn_recursively(submodule)

    found = _find_conv_bn_relu_sequences(model)
    if found:
        torch.quantization.fuse_modules(model, found, inplace=True)

    return model

In [25]:
def apply_ptq(model: nn.Module, calibration_loader, mode: Literal["per_tensor", "per_channel"] = "per_channel",
            fold_bn: bool = False, num_calibration_batches: int = 10, custom_fusion_patterns: Optional[List[List[str]]] = None) -> nn.Module:
    """Apply eager-mode static post-training quantization with the fbgemm backend.

    The input model is deep-copied, so the caller's model is left untouched.

    Args:
        model: FP32 model; it must already contain the quant/dequant stubs.
        calibration_loader: iterable of ``(data, target)`` batches used to
            feed the observers.
        mode: ``"per_channel"`` uses the backend's default qconfig;
            ``"per_tensor"`` uses MinMax observers with a per-tensor affine
            scheme for activations and per-tensor symmetric for weights.
        fold_bn: when True, fuse modules before quantization, using
            ``custom_fusion_patterns`` if given and automatic detection
            otherwise.
        num_calibration_batches: number of loader batches used for calibration.
        custom_fusion_patterns: explicit module-name groups for ``fuse_modules``;
            only used when ``fold_bn`` is True.

    Returns:
        The converted (quantized) copy of the model.

    Raises:
        ValueError: if ``mode`` is not one of the two supported values.

    Side effects:
        Sets ``torch.backends.quantized.engine`` to ``"fbgemm"``.
    """
    # Reject typos early: previously any unknown string silently selected
    # the per-tensor configuration.
    if mode not in ("per_tensor", "per_channel"):
        raise ValueError(
            f"Unknown mode '{mode}'. Choose 'per_tensor' or 'per_channel'."
        )

    backend = "fbgemm"
    torch.backends.quantized.engine = backend

    model = copy.deepcopy(model).cpu().eval()

    if fold_bn:
        if custom_fusion_patterns is not None:
            torch.quantization.fuse_modules(
                model, custom_fusion_patterns, inplace=True
            )
        else:
            fuse_bn_recursively(model)

    if mode == "per_channel":
        qconfig = torch.quantization.get_default_qconfig(backend)
    else:
        qconfig = torch.quantization.QConfig(
            activation=torch.quantization.MinMaxObserver.with_args(
                dtype=torch.quint8, qscheme=torch.per_tensor_affine
            ),
            weight=torch.quantization.MinMaxObserver.with_args(
                dtype=torch.qint8, qscheme=torch.per_tensor_symmetric
            ),
        )

    model.qconfig = qconfig
    model_prepared = torch.quantization.prepare(model, inplace=False)

    # Calibration: forward passes only, so the observers record value ranges.
    with torch.no_grad():
        for i, (data, _) in enumerate(calibration_loader):
            if i >= num_calibration_batches:
                break
            model_prepared(data.cpu())

    model_quantized = torch.quantization.convert(model_prepared, inplace=False)
    return model_quantized

In [28]:
# Fuse Conv-BN-ReLU in a copy of the FP32 CNN (no quantization) to compare the
# effect of layer fusion alone. The pattern names come from CNN_OBS.
cnn_fused = copy.deepcopy(cnn_fp32).cpu().eval()
torch.quantization.fuse_modules(cnn_fused, CNN_OBS.CNN_FUSION_PATTERNS, inplace=True)

lat_fused  = measure_inference_time(cnn_fused, val_loader, device)
acc_fused  = evaluate_model(cnn_fused, val_loader, device)
size_fused = get_model_size(cnn_fused)

# Side-by-side table: unfused vs. fused FP32 model.
print(f"{'Model':<30} {'Acc':>7}  {'Latency':>10}  {'Size':>8}")
print("="*60)
print(f"{'CNN FP32 without layer fusion':<30} {acc_cnn_fp32:>6.2f}%  {time_cnn_fp32:>8.2f}ms  {size_cnn_fp32:>6.3f}MB")
print(f"{'CNN FP32 with layer fusion':<30} {acc_fused:>6.2f}%  {lat_fused:>8.2f}ms  {size_fused:>6.3f}MB")

Model                              Acc     Latency      Size
CNN FP32 without layer fusion   76.63%      8.63ms   0.678MB
CNN FP32 with layer fusion      76.63%      7.86ms   0.671MB


In [None]:
torch.backends.quantized.engine = "fbgemm"

# Build the stub-wrapped CNN and load the FP32 weights trained in Experiment 2.
# The class is named CNN_OBS and train_model wrote the checkpoint as
# "best_experiment2_cnn_fp32.pth" (model_name="experiment2_cnn_fp32").
cnn_quant_base = CNN_OBS(num_classes=10)
cnn_quant_base.load_state_dict(
    torch.load("best_experiment2_cnn_fp32.pth", map_location="cpu", weights_only=True)
)

# Four PTQ variants: {per_tensor, per_channel} x {with, without BN folding}.
# custom_fusion_patterns is only consulted by apply_ptq when fold_bn=True.
# NOTE(review): calibration uses val_loader, the same data used for the
# reported accuracy -- consider calibrating on training data instead.
print("Quantizando: per_tensor ...")
cnn_pt = apply_ptq(cnn_quant_base, val_loader, mode="per_tensor",  fold_bn=False, custom_fusion_patterns=CNN_OBS.CNN_FUSION_PATTERNS)

print("Quantizando: per_tensor + fold_bn ...")
cnn_pt_fold = apply_ptq(cnn_quant_base, val_loader, mode="per_tensor",  fold_bn=True, custom_fusion_patterns=CNN_OBS.CNN_FUSION_PATTERNS)

print("Quantizando: per_channel ...")
cnn_pc = apply_ptq(cnn_quant_base, val_loader, mode="per_channel", fold_bn=False, custom_fusion_patterns=CNN_OBS.CNN_FUSION_PATTERNS)

print("Quantizando: per_channel + fold_bn ...")
cnn_pc_fold = apply_ptq(cnn_quant_base, val_loader, mode="per_channel", fold_bn=True, custom_fusion_patterns=CNN_OBS.CNN_FUSION_PATTERNS)

In [None]:
def eval_all(model, name):
    """Measure accuracy, latency and size of ``model``, print one row, return the triple.

    Uses the notebook-level ``val_loader`` and ``device``.
    Returns ``(accuracy_percent, latency_ms_per_batch, size_mb)``.
    """
    metrics = (
        evaluate_model(model, val_loader, device),
        measure_inference_time(model, val_loader, device),
        get_model_size(model),
    )
    accuracy, latency, size_mb = metrics
    print(f"{name:<35} acc={accuracy:.2f}%  lat={latency:.2f}ms  size={size_mb:.3f}MB")
    return metrics

# Header row, then one eval_all() call per CNN variant; the returned metrics
# are kept in notebook globals for the summary tables built later.
print(f"{'Modelo':<35} {'Acc':>8}  {'Latência':>10}  {'Tamanho':>9}")
print("-" * 70)
acc_fp32_cnn,  lat_fp32_cnn,  size_fp32_cnn  = eval_all(cnn_fp32,      "CNN FP32")
acc_pt,        lat_pt,        size_pt         = eval_all(cnn_pt,       "CNN per_tensor")
acc_pt_fold,   lat_pt_fold,   size_pt_fold    = eval_all(cnn_pt_fold,  "CNN per_tensor + fold_bn")
acc_pc,        lat_pc,        size_pc         = eval_all(cnn_pc,       "CNN per_channel")
acc_pc_fold,   lat_pc_fold,   size_pc_fold    = eval_all(cnn_pc_fold,  "CNN per_channel + fold_bn")

In [None]:
# ImageNet-style preprocessing (resize, center crop to 224, ImageNet mean/std).
# The file imports torchvision's `transforms` module under that name; there is
# no `T` alias, so it is referenced directly.
imagenet_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std =[0.229, 0.224, 0.225]),
])

# Rebuild the validation loader with ImageNet transforms for evaluating the
# pre-trained models.
import torchvision.datasets as dsets
cifar10_imagenet_val = dsets.CIFAR10(
    "../data", train=False, download=True, transform=imagenet_transform
)
imagenet_val_loader = torch.utils.data.DataLoader(
    cifar10_imagenet_val, batch_size=64, shuffle=False, num_workers=4, pin_memory=True
)

def evaluate_pretrained_variants(model_name: str):
    """Load, quantize and evaluate a pre-trained model in the four PTQ variants.

    Returns a dict holding ``model`` plus ``<tag>_acc``, ``<tag>_lat`` and
    ``<tag>_size`` for ``fp32`` and for each PTQ variant.

    NOTE(review): the models keep their 1000-class ImageNet heads while
    ``imagenet_val_loader`` is built from CIFAR-10 labels, so the reported
    accuracy is presumably not meaningful -- confirm the intended dataset.
    """
    banner = f"{'='*60}"
    print(f"\n{'='*60}")
    print(f"  Modelo: {model_name}")
    print(banner)

    torch.backends.quantized.engine = "fbgemm"

    # FP32 baseline (unwrapped model)
    baseline = prepare_pretrained_model(model_name, num_classes=1000, wrap=False)
    baseline.eval().cpu()

    fp32_acc = evaluate_model(baseline, imagenet_val_loader, device)
    fp32_lat = measure_inference_time(baseline, imagenet_val_loader, device, num_batches=10)
    fp32_size = get_model_size(baseline)
    print(f"  FP32       acc={fp32_acc:.2f}%  lat={fp32_lat:.1f}ms  size={fp32_size:.1f}MB")

    # Quantization needs the quant/dequant wrapper
    wrapped = prepare_pretrained_model(model_name, num_classes=1000, wrap=True)

    ptq_settings = {
        "per_tensor":         {"mode": "per_tensor",  "fold_bn": False},
        "per_tensor_foldbn":  {"mode": "per_tensor",  "fold_bn": True},
        "per_channel":        {"mode": "per_channel", "fold_bn": False},
        "per_channel_foldbn": {"mode": "per_channel", "fold_bn": True},
    }

    row = {
        "model": model_name,
        "fp32_acc": fp32_acc, "fp32_lat": fp32_lat, "fp32_size": fp32_size,
    }

    for tag, settings in ptq_settings.items():
        quantized = apply_ptq(wrapped, imagenet_val_loader,
                              num_calibration_batches=10, **settings)
        q_acc = evaluate_model(quantized, imagenet_val_loader, device)
        q_lat = measure_inference_time(quantized, imagenet_val_loader, device, num_batches=10)
        q_size = get_model_size(quantized)
        print(f"  {tag:<22} acc={q_acc:.2f}%  lat={q_lat:.1f}ms  size={q_size:.1f}MB")
        row.update({
            f"{tag}_acc": q_acc,
            f"{tag}_lat": q_lat,
            f"{tag}_size": q_size,
        })

    return row

# ── Run for every pre-trained model ───────────────────────────────────────────
# One result row (dict) per model is collected in pretrained_results.
pretrained_names = ["mobilenet_v2", "resnet50", "resnext50_32x4d", "efficientnet_b0"]
pretrained_results = []

for name in pretrained_names:
    row = evaluate_pretrained_variants(name)
    pretrained_results.append(row)

print("\nQuantização de modelos pré-treinados concluída!")


In [None]:
from tabulate import tabulate

# ── Table 1: Model | Variant | Latency (ms) | Size (MB) ───────────────────────
rows_t1 = []

# CNN (custom): metrics come from the eval_all() cell.
cnn_variants_t1 = [
    ("CNN",  "FP32",                   lat_fp32_cnn,  size_fp32_cnn),
    ("CNN",  "per_tensor",             lat_pt,        size_pt),
    ("CNN",  "per_tensor + fold_bn",   lat_pt_fold,   size_pt_fold),
    ("CNN",  "per_channel",            lat_pc,        size_pc),
    ("CNN",  "per_channel + fold_bn",  lat_pc_fold,   size_pc_fold),
]
rows_t1.extend(cnn_variants_t1)

# Pre-trained models: five rows per entry of pretrained_results.
for r in pretrained_results:
    m = r["model"]
    rows_t1.append((m, "FP32",                   r["fp32_lat"],                r["fp32_size"]))
    rows_t1.append((m, "per_tensor",             r["per_tensor_lat"],          r["per_tensor_size"]))
    rows_t1.append((m, "per_tensor + fold_bn",   r["per_tensor_foldbn_lat"],   r["per_tensor_foldbn_size"]))
    rows_t1.append((m, "per_channel",            r["per_channel_lat"],         r["per_channel_size"]))
    rows_t1.append((m, "per_channel + fold_bn",  r["per_channel_foldbn_lat"],  r["per_channel_foldbn_size"]))

print("\n" + "="*65)
print("TABELA 1 — Latência e Tamanho dos Modelos")
print("="*65)
# Latency is formatted with two decimals, size with three.
print(tabulate(
    [[m, v, f"{lat:.2f}", f"{sz:.3f}"] for m, v, lat, sz in rows_t1],
    headers=["Modelo", "Variante", "Latência (ms/batch)", "Tamanho (MB)"],
    tablefmt="fancy_grid"
))

# ── Table 2: Model | FP32 | per_tensor | per_tensor_fold | per_channel | per_channel_fold ──
rows_t2 = []

# CNN
rows_t2.append([
    "CNN",
    f"{acc_fp32_cnn:.2f}",
    f"{acc_pt:.2f}",
    f"{acc_pt_fold:.2f}",
    f"{acc_pc:.2f}",
    f"{acc_pc_fold:.2f}",
])

# Pre-trained models: one accuracy row per entry of pretrained_results.
for r in pretrained_results:
    rows_t2.append([
        r["model"],
        f"{r['fp32_acc']:.2f}",
        f"{r['per_tensor_acc']:.2f}",
        f"{r['per_tensor_foldbn_acc']:.2f}",
        f"{r['per_channel_acc']:.2f}",
        f"{r['per_channel_foldbn_acc']:.2f}",
    ])

print("\n" + "="*90)
print("TABELA 2 — Acurácia (%) por Estratégia de Quantização")
print("="*90)
print(tabulate(
    rows_t2,
    headers=[
        "Modelo", "FP32 (%)", "per_tensor (%)",
        "per_tensor\n+fold_bn (%)", "per_channel (%)",
        "per_channel\n+fold_bn (%)"
    ],
    tablefmt="fancy_grid"
))

# Legend explaining the variant names used in both tables.
print("\n[Legenda]")
print("  fold_bn   = BatchNorm absorvido nos pesos Conv antes da quantização")
print("  per_tensor = uma escala por tensor inteiro (pesos + ativações)")
print("  per_channel= uma escala por canal de saída (pesos) — padrão fbgemm")


#### Wrapper

In [None]:
class QuantizableWrapper(nn.Module):
    """Surround an arbitrary module with quantize / dequantize stubs.

    The wrapped network is stored as ``self.model``; ``self.quant`` and
    ``self.dequant`` are the ``QuantStub`` / ``DeQuantStub`` placed at its
    input and output respectively.

    Args:
        model: The module to wrap.
    """

    def __init__(self, model: nn.Module):
        super().__init__()
        # Registration order (quant, model, dequant) fixes the child order
        # and therefore the state_dict key order.
        self.quant = QuantStub()
        self.model = model
        self.dequant = DeQuantStub()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through quant stub -> wrapped model -> dequant stub."""
        return self.dequant(self.model(self.quant(x)))
    
def prepare_pretrained_model(model_name: str, num_classes: int = 1000, wrap: bool = True) -> nn.Module:
    """Build a torchvision classifier with ImageNet weights, optionally re-headed and wrapped.

    Args:
        model_name: One of ``"mobilenet_v2"``, ``"mobilenet_v3_small"``,
            ``"efficientnet_b0"``, ``"resnet50"``, ``"resnext50_32x4d"``.
        num_classes: Number of output classes. When different from 1000 the
            final ``nn.Linear`` layer is replaced by a freshly initialised one.
        wrap: If true, return the network inside a ``QuantizableWrapper``;
            otherwise return the bare network.

    Returns:
        The (optionally wrapped) model.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    # name -> (constructor, pretrained weights enum member)
    registry = {
        "mobilenet_v2":      (models.mobilenet_v2,       models.MobileNet_V2_Weights.IMAGENET1K_V2),
        "mobilenet_v3_small":(models.mobilenet_v3_small, models.MobileNet_V3_Small_Weights.IMAGENET1K_V1),
        "efficientnet_b0":   (models.efficientnet_b0,    models.EfficientNet_B0_Weights.IMAGENET1K_V1),
        "resnet50":          (models.resnet50,           models.ResNet50_Weights.IMAGENET1K_V2),
        "resnext50_32x4d":   (models.resnext50_32x4d,   models.ResNeXt50_32X4D_Weights.IMAGENET1K_V2),
    }

    if model_name not in registry:
        raise ValueError(f"Model '{model_name}' not supported. Choose from: {list(registry)}")

    build, pretrained = registry[model_name]
    network = build(weights=pretrained)

    if num_classes != 1000:
        if model_name in ("resnet50", "resnext50_32x4d"):
            # ResNet-style models expose the classification layer as ``fc``.
            network.fc = nn.Linear(network.fc.in_features, num_classes)
        else:
            # Remaining models keep the final Linear inside ``classifier``:
            # index 3 for mobilenet_v3_small, index 1 for the other two.
            head_idx = 3 if model_name == "mobilenet_v3_small" else 1
            if model_name == "mobilenet_v2":
                in_dim = network.last_channel
            else:
                in_dim = network.classifier[head_idx].in_features
            network.classifier[head_idx] = nn.Linear(in_dim, num_classes)

    if wrap:
        return QuantizableWrapper(network)
    return network

# QAT

In [None]:
import torch.ao.quantization as quantization


def _unsigned_fake_quant(quant_max):
    """Return a FakeQuantize factory covering the integer range [0, quant_max].

    The factory uses a MinMaxObserver, per-tensor affine scheme and quint8 as
    the carrier dtype; only the upper bound of the range varies.
    """
    return quantization.FakeQuantize.with_args(
        observer=quantization.MinMaxObserver,
        quant_min=0,
        quant_max=quant_max,
        dtype=torch.quint8,
        qscheme=torch.per_tensor_affine,
    )


# NOTE: despite the "qconfig_" prefix these are FakeQuantize factories, not
# QConfig objects; they are passed as activation/weight when a QConfig is built.

# INT4 setup: 16 levels, range 0..15 (4 bits).
qconfig_int4 = _unsigned_fake_quant(quant_max=15)

# INT2 setup: 4 levels, range 0..3 (2 bits).
qconfig_int2 = _unsigned_fake_quant(quant_max=3)

class BinaryActivation(torch.autograd.Function):
    """Sign binarization with a straight-through estimator (STE) gradient.

    Forward returns ``torch.sign(input)`` (values in {-1, 0, +1}; exact zeros
    stay zero). Backward passes the incoming gradient through unchanged, so
    the non-differentiable sign does not block gradient flow.
    """

    @staticmethod
    def forward(ctx, input):
        return torch.sign(input)

    @staticmethod
    def backward(ctx, grad_output):
        # Straight-Through Estimator (STE): treat sign() as identity in backward.
        return grad_output


class BinaryLinear(nn.Module):
    """Bias-free linear layer that binarizes both its weights and its inputs.

    Args:
        in_features: Size of the last dimension of the input.
        out_features: Size of the last dimension of the output.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        # Latent real-valued weights, drawn from a standard normal; only their
        # sign is used in the forward pass.
        self.weight = nn.Parameter(torch.randn(out_features, in_features))

    def forward(self, x):
        """Return ``F.linear(sign(x), sign(weight))``.

        Binarization goes through ``BinaryActivation`` rather than a bare
        ``torch.sign``: the autograd gradient of ``torch.sign`` is zero
        everywhere, which would leave ``self.weight`` (and anything upstream
        of ``x``) with all-zero gradients and make the layer untrainable.
        """
        # Binarize weights and inputs during the forward pass (STE in backward).
        bw = BinaryActivation.apply(self.weight)
        bx = BinaryActivation.apply(x)
        return F.linear(bx, bw)

In [35]:
# Build the MLP and attach a QConfig that applies the same 4-bit-range
# FakeQuantize factory to both activations and weights.
int4_qconfig = quantization.QConfig(activation=qconfig_int4, weight=qconfig_int4)
model_int4 = QuantizedMLP(input_dim=784, hidden_dim=512, output_dim=10)
model_int4.qconfig = int4_qconfig

# Insert fake-quant modules for QAT; inplace=False (the default) returns a
# prepared copy and leaves model_int4 itself untouched.
model_prepared = quantization.prepare_qat(model_int4, inplace=False)
model_prepared

QuantizedMLP(
  (quant): QuantStub(
    (activation_post_process): FakeQuantize(
      fake_quant_enabled=tensor([1], dtype=torch.uint8), observer_enabled=tensor([1], dtype=torch.uint8), quant_min=0, quant_max=15, dtype=torch.quint8, qscheme=torch.per_tensor_affine, ch_axis=-1, scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32)
      (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
    )
  )
  (linear1): Linear(
    in_features=784, out_features=512, bias=True
    (weight_fake_quant): FakeQuantize(
      fake_quant_enabled=tensor([1], dtype=torch.uint8), observer_enabled=tensor([1], dtype=torch.uint8), quant_min=0, quant_max=15, dtype=torch.quint8, qscheme=torch.per_tensor_affine, ch_axis=-1, scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32)
      (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
    )
    (activation_post_process): FakeQuantize(
      fake_quant_enabled=tensor([1], dtype=torch.uint8), observer_enabled=tenso

In [None]:
# Calibration pass: feed the calibration batches through the prepared model in
# eval mode and without autograd; labels are ignored.
model_prepared.eval()
with torch.no_grad():
    for batch in calibration_loader:
        images = batch[0]
        model_prepared(images)

# NOTE(review): the weight fake-quant attached to this model was configured with
# dtype=torch.quint8; the quantized Linear conversion may expect qint8 weights —
# confirm that convert() actually succeeds with this configuration.
# inplace=False (the default) returns a converted copy of model_prepared.
model_int4 = quantization.convert(model_prepared, inplace=False)

In [36]:
# Evaluate the QAT-prepared model (INT4-range fake quantization) on the
# validation set: accuracy, per-batch latency and model size.
acc_model_prepared = validate_model(model_prepared, val_loader, device, desc='Val Quantized')
lat_model_prepared = measure_inference_time(model_prepared, val_loader, device)
model_size_model_prepared = get_model_size(model_prepared)

# These metrics belong to the fake-quantized model, not to an FP32 baseline,
# so the labels say so (the previous "FP32" labels were misleading).
# NOTE(review): a prepared (not converted) model presumably still stores float
# weights, so the reported size is not expected to show INT4 savings — confirm
# against get_model_size.
print(f"INT4 fake-quant Accuracy: {acc_model_prepared:.2f}%")
print(f"INT4 fake-quant Latency:  {lat_model_prepared:.2f} ms/batch")
print(f"INT4 fake-quant Size:  {model_size_model_prepared:.5f} MB")

                                                                 

FP32 Accuracy: 13.65%
FP32 Latency:  3.09 ms/batch
FP32 Size:  1.56528 MB
