#TASK:
1. Split the 1000 samples into two equal groups (50%/50%): one will be named calibration_dataset and the other validation_dataset. Use the calibration dataset to calibrate the scale and zero point before performing the actual quantization.
2. Perform post-training quantization (PTQ) of **torchvision.models.quantization.mobilenet_v2**, using PerChannelMinMax quantization for the weights and PerTensorMinMax quantization, separately.
3. Use per-channel MovingAverageMinMax quantization for the weights and per-tensor MovingAverageMinMax quantization for the activations. Compare with the results from step 2 and check which one is better. Compare some of the scales and zero points from the observers. Explain why one of the solutions is better.

In [None]:
# Fetch the data archive by its file ID using the gdown command-line tool
# (presumably a Google Drive ID that resolves to calibration_data2.zip in
# /content — TODO confirm).
!gdown 1oHoYT7J4-xKfNu6cfBOMKHyO0QBqdYsI
# Extract the archive into /content/calibration_data, the directory the
# dataset cell below reads samples.txt from.
!unzip /content/calibration_data2.zip -d /content/calibration_data

In [None]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.quantization.qconfig import QConfig
from torch.quantization.observer import MinMaxObserver, MovingAverageMinMaxObserver, MovingAveragePerChannelMinMaxObserver, PerChannelMinMaxObserver
from torch.ao.quantization.fake_quantize import FakeQuantize
import torch
import torchvision
import torch.nn as nn
import torch.ao.quantization.quantize_fx as quantize_fx
import copy
from torch.ao.quantization import (
  get_default_qconfig_mapping,
  get_default_qat_qconfig_mapping,
  QConfigMapping,
)
from tqdm.auto import tqdm
import time

In [None]:

def _make_eval_transform():
    """Build the resize -> center-crop -> tensor -> normalize pipeline."""
    steps = [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        # Convert the PIL image to a PyTorch tensor.
        transforms.ToTensor(),
        # Normalize with the ImageNet dataset mean / standard deviation.
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
    return transforms.Compose(steps)


# Calibration and validation images are preprocessed identically; each split
# still gets its own pipeline instance.
transform_cali = _make_eval_transform()
transform_val = _make_eval_transform()


In [None]:
from PIL import Image
from torch.utils.data import Dataset
import random
import os

class MyDataset(Dataset):
    """Image dataset driven by a text index file, split into two halves.

    Every non-blank line of ``txt_file`` holds an image path followed by an
    integer label, separated by whitespace. The entries are split in file
    order: the first half is the calibration split, the remainder is the
    validation split (which receives the extra entry when the count is odd).

    Args:
        txt_file: Path of the index file.
        calibration: ``True`` selects the first half, ``False`` the second.
        transform: Optional callable applied to each loaded RGB image.
        dir: Optional directory that image paths are joined onto.
    """

    def __init__(self, txt_file, calibration=True, transform=None, dir=None):
        entries = []
        with open(txt_file, 'r') as f:
            for line in f:
                # Whitespace split tolerates repeated spaces, tabs and
                # trailing newline characters.
                fields = line.split()
                if not fields:
                    # Blank lines (e.g. at the end of the file) carry no sample.
                    continue
                image_path, label = fields[0], fields[1]
                entries.append((image_path, int(label)))
        half = len(entries) // 2
        self.data = entries[:half] if calibration else entries[half:]
        self.transform = transform
        self.dir = dir

    def __len__(self):
        """Return the number of samples in the selected half."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        image_path, label = self.data[idx]
        if self.dir:
            image_path = os.path.join(self.dir, image_path)
        # Force three channels regardless of the mode the file is stored in.
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label


class CustomDataset(Dataset):
    """Fixed-length view over another dataset that serves random samples.

    The reported length is ``samples_per_epoch``; every lookup returns an
    item drawn uniformly at random from ``original_dataset``.
    """

    def __init__(self, original_dataset, samples_per_epoch):
        self.samples_per_epoch = samples_per_epoch
        self.original_dataset = original_dataset

    def __len__(self):
        return self.samples_per_epoch

    def __getitem__(self, index):
        # ``index`` is intentionally unused: the position requested by the
        # loader does not influence which underlying sample is returned.
        source = self.original_dataset
        pick = random.randrange(len(source))
        return source[pick]

In [None]:
# Both splits read the same index file; the ``calibration`` flag decides which
# half of the listed samples each dataset keeps.
_samples_index = '/content/calibration_data/samples.txt'
calibration_dataset = MyDataset(
    txt_file=_samples_index,
    calibration=True,
    transform=transform_cali,
)
val_dataset = MyDataset(
    txt_file=_samples_index,
    calibration=False,
    transform=transform_val,
)

Define Model which will be quantized

In [None]:
# Dummy batch with the same shape as one preprocessed image; prepare_fx takes
# it as its example input.
example_inputs = (torch.randn(1, 3, 224, 224),)
# `pretrained=True` is deprecated in recent torchvision; request the same
# float ImageNet checkpoint explicitly through the `weights` argument.
model_fp = torchvision.models.quantization.mobilenet_v2(
    weights=torchvision.models.MobileNet_V2_Weights.IMAGENET1K_V1
)
# Quantize a copy so the float model stays available as the baseline.
model_to_quantize = copy.deepcopy(model_fp)
# Post-training quantization must start from eval mode. The previous
# `quantize_fx.fuse_fx(...)` call discarded its return value, so the only
# lasting effect was this switch to eval mode; prepare_fx performs module
# fusion itself.
model_to_quantize.eval()

# Specify quantization configuration

In [None]:
# Config 1: per-tensor MinMax observer for activations (0..255 range) and
# per-channel MinMax observer for weights (-128..127 range, channel axis 0),
# both wrapped in FakeQuantize.
_activation_fq_1 = FakeQuantize.with_args(
    observer=MinMaxObserver,
    qscheme=torch.per_tensor_affine,
    quant_min=0,
    quant_max=255,
    reduce_range=False,
)
_weight_fq_1 = FakeQuantize.with_args(
    observer=PerChannelMinMaxObserver,
    dtype=torch.qint8,
    qscheme=torch.per_channel_symmetric,
    ch_axis=0,
    quant_min=-128,
    quant_max=127,
    reduce_range=False,
)
qconfig_1 = QConfig(activation=_activation_fq_1, weight=_weight_fq_1)

In [None]:
# Config 2: per-tensor MinMax observers for both activations (0..255 range)
# and weights (-128..127 range, symmetric), both wrapped in FakeQuantize.
_activation_fq_2 = FakeQuantize.with_args(
    observer=MinMaxObserver,
    qscheme=torch.per_tensor_affine,
    quant_min=0,
    quant_max=255,
    reduce_range=False,
)
_weight_fq_2 = FakeQuantize.with_args(
    observer=MinMaxObserver,
    dtype=torch.qint8,
    qscheme=torch.per_tensor_symmetric,
    quant_min=-128,
    quant_max=127,
    reduce_range=False,
)
qconfig_2 = QConfig(activation=_activation_fq_2, weight=_weight_fq_2)

In [None]:
# Config 3: moving-average MinMax observers - per-tensor for activations
# (0..255 range) and per-channel for weights (-128..127 range, channel
# axis 0), both wrapped in FakeQuantize.
_activation_fq_3 = FakeQuantize.with_args(
    observer=MovingAverageMinMaxObserver,
    qscheme=torch.per_tensor_affine,
    quant_min=0,
    quant_max=255,
    reduce_range=False,
)
_weight_fq_3 = FakeQuantize.with_args(
    observer=MovingAveragePerChannelMinMaxObserver,
    dtype=torch.qint8,
    qscheme=torch.per_channel_symmetric,
    ch_axis=0,
    quant_min=-128,
    quant_max=127,
    reduce_range=False,
)
qconfig_3 = QConfig(activation=_activation_fq_3, weight=_weight_fq_3)

In [None]:
# Use one QConfig for the whole network (qconfig_3 here), then let FX graph
# mode produce the prepared model from it.
qconfig_mapping = QConfigMapping()
qconfig_mapping.set_global(qconfig_3)
model_prepared = quantize_fx.prepare_fx(
    model_to_quantize,
    qconfig_mapping=qconfig_mapping,
    example_inputs=example_inputs,
)

# Preparing model evaluation

In [None]:
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` and report loss, accuracy and time.

    Args:
        model: Module to evaluate; it is moved to ``device`` and switched to
            eval mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor.
        device: Device the model and every batch are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy, inference_time)``: the mean of the
        per-batch loss values, the top-1 accuracy in percent, and the
        wall-clock seconds spent in the evaluation loop.

    Raises:
        ValueError: If ``dataloader`` yields no samples, since no metric can
            be computed (previously this surfaced as a ZeroDivisionError).
    """
    model.to(device)
    model.eval()
    loss_sum = 0.0
    num_correct = 0
    num_seen = 0
    num_batches = 0
    start_time = time.perf_counter()
    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc="Batches", position=0):
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)

            # Accumulate the per-batch loss value
            loss_sum += criterion(outputs, labels).item()

            # Top-1 prediction = index of the largest output along dim 1
            predicted = outputs.argmax(dim=1)
            num_seen += labels.size(0)
            num_correct += (predicted == labels).sum().item()
            num_batches += 1
    inference_time = time.perf_counter() - start_time

    if num_seen == 0:
        raise ValueError("evaluate_model received a dataloader with no samples")

    # Mean over batches, not over samples: a smaller final batch weighs the
    # same as a full one.
    avg_loss = loss_sum / num_batches
    accuracy = 100 * num_correct / num_seen

    return avg_loss, accuracy, inference_time



# Create training/inference/evaluation tools


In [None]:
# Validation: fixed order, and keep the final partial batch so that every
# validation sample contributes to the reported metrics.
validation_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, drop_last=False)
# Calibration must read the calibration split; building this loader from
# val_dataset would tune the quantization parameters on the evaluation data.
calibration_loader = DataLoader(calibration_dataset, batch_size=32, shuffle=False, drop_last=True)
criterion = nn.CrossEntropyLoss()
device = "cuda" if torch.cuda.is_available() else "cpu"

# Evaluation of base model

In [None]:
# Baseline numbers come from the original float model. Running the prepared
# model here instead would feed validation batches through its observers
# before calibration. CPU is used so the timing is comparable with the
# quantized run below.
loss, acc, perf_time = evaluate_model(model_fp, validation_loader, criterion, "cpu")

In [None]:
# Report the metrics of the most recent evaluation, one per line.
for _label, _value in (("Accuracy", acc), ("Performing Time", perf_time), ("Loss", loss)):
    print(_label, _value)

# Calibration of the Quantization

In [None]:
# Calibration only needs forward passes so the observers can record
# statistics: run on CPU, in eval mode, and without autograd bookkeeping.
model_prepared = model_prepared.to('cpu')
model_prepared.eval()
with torch.no_grad():
    # Labels are not needed for calibration.
    for data, _ in tqdm(calibration_loader, desc="Batches", position=0):
        model_prepared(data)

In [None]:
# convert_fx rewrites the graph module it is given, so convert a copy and
# keep model_prepared (with its calibrated observers) intact for inspection.
model_quantized = quantize_fx.convert_fx(copy.deepcopy(model_prepared))

In [None]:
# After convert_fx the observer modules are gone, so filtering the converted
# model for ObserverBase instances prints nothing. The calibrated parameters
# live on the quantized modules instead: `scale` / `zero_point` describe the
# module's output activation, and the quantized weight tensor carries its own
# (per-tensor or per-channel) parameters.
_per_channel_schemes = (torch.per_channel_affine, torch.per_channel_symmetric)
for name, module in model_quantized.named_modules():
    if not (hasattr(module, "scale") and hasattr(module, "zero_point")):
        continue
    print(f"For module {module}: Scale = {module.scale}, Zero Point = {module.zero_point}")

    # Quantized conv/linear modules expose their weight through a method.
    weight_getter = getattr(module, "weight", None)
    if not callable(weight_getter):
        continue
    weight = weight_getter()
    if not getattr(weight, "is_quantized", False):
        continue
    if weight.qscheme() in _per_channel_schemes:
        weight_scale = weight.q_per_channel_scales()
        weight_zero_point = weight.q_per_channel_zero_points()
    else:
        weight_scale = weight.q_scale()
        weight_zero_point = weight.q_zero_point()
    print(f"  Weight of {name}: Scale = {weight_scale}, Zero Point = {weight_zero_point}")

## Evaluation of the quantized model

In [None]:
# Evaluate the converted model on the validation loader, on CPU.
_quantized_metrics = evaluate_model(
    model=model_quantized,
    dataloader=validation_loader,
    criterion=criterion,
    device="cpu",
)
loss, acc, perf_time = _quantized_metrics

In [None]:
# Report the metrics of the most recent evaluation, one per line.
_metric_names = ("Accuracy", "Performing Time", "Loss")
_metric_values = (acc, perf_time, loss)
for _name, _val in zip(_metric_names, _metric_values):
    print(_name, _val)

In [None]:
# Bare expression as the last statement of the cell: the notebook displays the
# representation of model_prepared as the cell output.
# NOTE(review): this runs after convert_fx was called on model_prepared -
# confirm the object still holds the observed (pre-conversion) model here.
model_prepared