# Login to Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd drive/MyDrive

/content/drive/MyDrive


# Quantize model

## Code

In [None]:
import IPython.display as display

import glob
from collections import Counter

import math
import pandas as pd

import librosa
import librosa.display
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import torch
# import torchaudio
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from pathlib import Path
from PIL import Image
import soundfile as sf
from torch.utils.data import Dataset
from torchvision import models, transforms

import tensorflow as tf
from torch.utils.tensorboard import SummaryWriter
import datetime

from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

import torch.optim.lr_scheduler as lr_scheduler

def print_size_of_model(model, label=""):
    """Serialize ``model.state_dict()`` to disk, print its size and return it.

    The state dict is written to the scratch file ``temp.p`` in the current
    working directory, measured with ``os.path.getsize`` and removed again.

    Args:
        model: Object exposing ``state_dict()`` (e.g. an ``nn.Module``).
        label: Free-form tag included in the printed line.

    Returns:
        int: Size of the serialized state dict in bytes (the printed value is
        this number divided by ``1e6``).
    """
    tmp_path = "temp.p"
    try:
        torch.save(model.state_dict(), tmp_path)
        size = os.path.getsize(tmp_path)
    finally:
        # Clean up even when serialization fails half-way, so a stale scratch
        # file is never left behind in the working directory.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print("model: ",label,' \t','Size (MB):', size/1e6)
    return size

def model_size(model):
    """Return the in-memory footprint of ``model`` in MiB.

    Sums ``nelement() * element_size()`` over every parameter and every
    buffer of the model and divides the byte total by ``1024**2``.
    """
    param_bytes = sum(p.nelement() * p.element_size() for p in model.parameters())
    buffer_bytes = sum(b.nelement() * b.element_size() for b in model.buffers())
    return (param_bytes + buffer_bytes) / 1024**2

def evaluate(model, test_loader, device="cpu"):
    """Compute the top-1 accuracy of ``model`` over ``test_loader``.

    The model is switched to eval mode and moved to ``device``; every batch is
    moved to the same device and evaluated under ``torch.no_grad()``.

    Args:
        model: Module returning per-class scores of shape (batch, classes).
        test_loader: Iterable yielding ``(inputs, targets)`` batches.
        device: Device on which inference is run.

    Returns:
        float: ``correct / total`` over all batches.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no samples.
    """
    model.eval()
    model.to(device)
    num_correct = 0
    num_examples = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            # Softmax is monotonic, so the arg-max of the raw scores is the
            # predicted class. Skipping the softmax avoids extra work and the
            # artificial ties it creates when close scores round to the same
            # probability.
            predicted = model(inputs).argmax(dim=1)
            correct = torch.eq(predicted, targets).view(-1)
            num_correct += torch.sum(correct).item()
            num_examples += correct.shape[0]

    return num_correct / num_examples

class FrequencyMask(object):
    """Overwrite a random band of rows (dim 1) of a (C, H, W) tensor.

      Example:
        >>> transforms.Compose([
        >>>     transforms.ToTensor(),
        >>>     FrequencyMask(max_width=10, use_mean=False),
        >>> ])

    The band height is drawn with ``random.randrange(1, max_width)``, i.e. it
    lies in ``[1, max_width - 1]``; ``max_width`` must therefore be >= 2.
    """

    def __init__(self, max_width, use_mean=True):
        # Exclusive upper bound for the randomly drawn band height.
        self.max_width = max_width
        # True: fill the band with the tensor mean; False: fill with zeros.
        self.use_mean = use_mean

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Tensor image of
            size (C, H, W) where the frequency
            mask is to be applied.

        Returns:
            Tensor: The same tensor object, modified in place, with the
            frequency mask applied.
        """
        # The mask is applied along dim 1, so the start row has to be drawn
        # from the extent of dim 1. Drawing it from dim 2 (as before) silently
        # misses the tensor or never reaches the last rows when H != W.
        start = random.randrange(0, tensor.shape[1])
        end = start + random.randrange(1, self.max_width)
        if self.use_mean:
            tensor[:, start:end, :] = tensor.mean()
        else:
            tensor[:, start:end, :] = 0
        return tensor

    def __repr__(self):
        return "{}(max_width={}, use_mean={})".format(
            self.__class__.__name__, self.max_width, self.use_mean
        )


class TimeMask(object):
    """Overwrite a random band of columns (dim 2) of a (C, H, W) tensor.

      Example:
        >>> transforms.Compose([
        >>>     transforms.ToTensor(),
        >>>     TimeMask(max_width=10, use_mean=False),
        >>> ])

    The band width is drawn with ``random.randrange(0, max_width)``, so a
    width of 0 (no masking) is possible.
    """

    def __init__(self, max_width, use_mean=True):
        # Exclusive upper bound for the randomly drawn band width.
        self.max_width = max_width
        # True: fill the band with the tensor mean; False: fill with zeros.
        self.use_mean = use_mean

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Tensor image of
            size (C, H, W) where the time mask
            is to be applied.

        Returns:
            Tensor: The same tensor object, modified in place, with the
            time mask applied.
        """
        # The mask is applied along dim 2, so the start column has to be drawn
        # from the extent of dim 2. Drawing it from dim 1 (as before) confines
        # or misplaces the mask whenever H != W.
        start = random.randrange(0, tensor.shape[2])
        end = start + random.randrange(0, self.max_width)
        if self.use_mean:
            tensor[:, :, start:end] = tensor.mean()
        else:
            tensor[:, :, start:end] = 0
        return tensor

    def __repr__(self):
        return "{}(max_width={}, use_mean={})".format(
            self.__class__.__name__, self.max_width, self.use_mean
        )


class PrecomputedESC50(Dataset):
    """Dataset over pre-rendered ``*.png`` images stored in one directory.

    The integer label of each file is parsed from its name: the text after
    the last ``-`` with the ``.wav.png`` suffix removed.

    Args:
        path: Directory that is globbed for ``*.png`` files.
        max_freqmask_width: ``max_width`` handed to ``FrequencyMask``.
        max_timemask_width: ``max_width`` handed to ``TimeMask``.
        use_mean: Forwarded to both mask transforms.
        dpi: Accepted for backward compatibility; not used by this class.
        augment: When True (default, the previous behavior) each mask is
            applied with probability 0.5. Pass False for validation/test
            sets to get deterministic inputs.
    """

    def __init__(self,path, max_freqmask_width, max_timemask_width, use_mean=True, dpi=50, augment=True):
        # Sort so the index -> file mapping does not depend on the arbitrary
        # directory listing order.
        files = sorted(Path(path).glob('*.png'))
        self.items = [(f,int(f.name.split("-")[-1].replace(".wav.png",""))) for f in files]
        self.length = len(self.items)
        self.max_freqmask_width = max_freqmask_width
        self.max_timemask_width = max_timemask_width
        self.use_mean = use_mean
        self.augment = augment

        steps = [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225]),
        ]
        if self.augment:
            steps.append(transforms.RandomApply([FrequencyMask(self.max_freqmask_width, self.use_mean)], p=0.5))
            steps.append(transforms.RandomApply([TimeMask(self.max_timemask_width, self.use_mean)], p=0.5))
        self.img_transforms = transforms.Compose(steps)

    def __getitem__(self, index):
        """Return ``(transformed RGB image tensor, label)`` for ``index``."""
        filename, label = self.items[index]
        img = Image.open(filename).convert('RGB')
        return (self.img_transforms(img), label)

    def __len__(self):
        """Number of PNG files found at construction time."""
        return self.length

# Define a function to plot and log confusion matrix to TensorBoard
def plot_confusion_matrix(model, test_loader, device="cpu"):
    """Run ``model`` over ``test_loader`` and draw the confusion matrix.

    The model is put in eval mode (it is not moved to ``device``; only the
    batches are). Returns the current matplotlib figure holding the seaborn
    heatmap; nothing is written to TensorBoard here.
    """
    model.eval()
    predicted_all, actual_all = [], []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            probs = F.softmax(model(inputs), dim=1)
            predicted = torch.max(probs, dim=1)[1]
            predicted_all.extend(predicted.cpu().numpy())
            actual_all.extend(targets.cpu().numpy())

    # Rows are true labels, columns are predicted labels.
    matrix = confusion_matrix(actual_all, predicted_all)

    plt.figure(figsize=(20, 16))
    sn.heatmap(matrix, annot=True, fmt='d', cmap='Blues', xticklabels=True, yticklabels=True)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')

    return plt.gcf()

# Define a function to log predictions vs. actuals as images to TensorBoard
def log_predictions_vs_actuals(model, data_loader, device="cpu", num_batches=5):
    """Draw a 4x4 grid of inputs annotated with actual/predicted labels.

    One figure is created per processed batch (at most ``num_batches``), but
    only the figure of the last processed batch is returned.

    Args:
        model: Module returning per-class scores; switched to eval mode.
        data_loader: Iterable yielding ``(inputs, targets)`` batches with
            inputs laid out as (N, C, H, W).
        device: Device the batches are moved to.
        num_batches: Maximum number of batches to visualise.

    Returns:
        The matplotlib figure of the last processed batch, or ``None`` when
        no batch was processed (empty loader or ``num_batches <= 0``).
    """
    model.eval()

    fig = None  # stays None when the loop body never runs
    with torch.no_grad():
        for batch_index, (inputs, targets) in enumerate(data_loader):
            if batch_index >= num_batches:
                break

            inputs = inputs.to(device)
            targets = targets.to(device)
            output = model(inputs)
            probabilities, predicted_labels = torch.max(F.softmax(output, dim=1), dim=1)

            # Channels-last layout for imshow.
            inputs_np = inputs.permute(0, 2, 3, 1).cpu().numpy()
            num_samples = inputs_np.shape[0]

            fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(12, 12))

            for i, ax in enumerate(axes.flat):
                ax.axis("off")
                # A batch (typically the last one) may hold fewer than 16
                # samples; leave the remaining cells blank instead of
                # indexing past the end.
                if i >= num_samples:
                    continue

                ax.imshow(inputs_np[i])

                actual_label = targets[i].item()
                predicted_label = predicted_labels[i].item()
                probability = probabilities[i].item()

                # Color the title based on correctness
                title_color = 'green' if actual_label == predicted_label else 'red'

                ax.set_title(f"Actual: {actual_label}\nPredicted: {predicted_label}\nProb: {probability:.2f}", color=title_color)

            plt.tight_layout()
    return fig

class EarlyStopping:
    """Signal a stop once validation accuracy has stalled for too long.

    ``step`` is fed one validation accuracy per epoch. The stop flag is set
    when more than ``patience`` consecutive calls fail to beat the best value
    seen so far, and it is never cleared afterwards.
    """

    def __init__(self, patience, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0                  # consecutive non-improving steps
        self.best_valid_accuracy = 0.0    # best accuracy observed so far
        self.early_stop = False           # latched stop flag

    def step(self, valid_accuracy):
        """Record ``valid_accuracy`` and return the current stop flag."""
        if valid_accuracy > self.best_valid_accuracy:
            self.best_valid_accuracy = valid_accuracy
            self.counter = 0
            return self.early_stop

        self.counter += 1
        if self.counter > self.patience:
            self.early_stop = True
            if self.verbose:
                print("Early stopping activated.")
        return self.early_stop

class LearningRateScheduler(lr_scheduler._LRScheduler):
    """Thin wrapper around ``ReduceLROnPlateau`` driven by validation accuracy.

    The base-class initializer is not invoked; all scheduling is delegated
    to the wrapped ``ReduceLROnPlateau`` instance.

    Args:
        optimizer: Optimizer whose learning rate is reduced.
        patience: Number of non-improving steps tolerated before a reduction.
        factor: Multiplicative factor applied to the learning rate.
        verbose: Request the scheduler's own log messages where supported.
    """

    def __init__(self, optimizer, patience, factor=0.1, verbose=False):
        self.optimizer = optimizer
        self.patience = patience
        self.factor = factor
        self.verbose = verbose

        # step() receives an accuracy (higher is better), so the plateau
        # detector must run in 'max' mode. The default 'min' mode treats a
        # rising accuracy as "no improvement" and cuts the learning rate
        # while the model is still getting better.
        plateau_kwargs = {"mode": "max", "patience": self.patience, "factor": self.factor}
        if self.verbose:
            plateau_kwargs["verbose"] = self.verbose
        try:
            self.lr_scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, **plateau_kwargs)
        except TypeError:
            # `verbose` is deprecated and may be rejected by newer PyTorch
            # releases; fall back to constructing without it.
            plateau_kwargs.pop("verbose", None)
            self.lr_scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, **plateau_kwargs)

    def step(self, valid_accuracy):
        """Feed one validation accuracy and return the first group's lr."""
        self.lr_scheduler.step(valid_accuracy)
        return self.optimizer.param_groups[0]['lr']

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Directories holding the pre-rendered PNG images; relative to the current
# working directory.
PATH_ESC50_TRAIN="./train1/"
PATH_ESC50_VALID="./valid1/"
PATH_ESC50_TEST="./test/"

# Batch size shared by all three loaders.
bs=16
# NOTE(review): all three datasets are built with the same arguments, so the
# random frequency/time masks are also applied to validation and test samples,
# which makes evaluation results vary from run to run - confirm this is intended.
esc50pre_train = PrecomputedESC50(PATH_ESC50_TRAIN, max_freqmask_width=10, max_timemask_width=10 )
esc50pre_valid = PrecomputedESC50(PATH_ESC50_VALID,max_freqmask_width=10, max_timemask_width=10 )
esc50pre_test = PrecomputedESC50(PATH_ESC50_TEST,max_freqmask_width=10, max_timemask_width=10 )

# Every loader shuffles, including validation and test.
esc50_train_loader = torch.utils.data.DataLoader(esc50pre_train, bs, shuffle=True)
esc50_val_loader = torch.utils.data.DataLoader(esc50pre_valid, bs, shuffle=True)
esc50_test_loader = torch.utils.data.DataLoader(esc50pre_test, bs, shuffle=True)

  warn(


In [None]:
# Function to calculate model latency (replace with your actual latency calculation)
def estimate_latency(model, device="cpu"):
    """Time a single forward pass on a random (1, 3, 224, 224) input.

    The model is moved to ``device``, warmed up with 10 forward passes, and
    one further pass is timed. CUDA devices are timed with CUDA events, all
    other devices with a wall-clock timer. Inference runs in eval mode and
    without autograd; the model's previous train/eval mode is restored.

    Args:
        model: Module accepting a (1, 3, 224, 224) float tensor.
        device: Target device (string or ``torch.device``).

    Returns:
        float: Latency of the timed forward pass in milliseconds. This is a
        single-sample estimate and therefore noisy.
    """
    import time

    model = model.to(device)
    input_tensor = torch.randn(1, 3, 224, 224).to(device)
    # CUDA events only exist (and only make sense) for CUDA devices; using
    # them for a CPU run fails on machines without a GPU.
    use_cuda_timer = torch.device(device).type == "cuda"

    was_training = getattr(model, "training", False)
    model.eval()  # keep warm-up passes from updating BatchNorm statistics
    try:
        with torch.no_grad():
            # Warm-up to reduce variability
            for _ in range(10):
                _ = model(input_tensor)

            if use_cuda_timer:
                start_event = torch.cuda.Event(enable_timing=True)
                end_event = torch.cuda.Event(enable_timing=True)
                start_event.record()
                _ = model(input_tensor)
                end_event.record()
                # elapsed_time is only valid once both events have completed.
                torch.cuda.synchronize()
                latency_ms = start_event.elapsed_time(end_event)
            else:
                started = time.perf_counter()
                _ = model(input_tensor)
                latency_ms = (time.perf_counter() - started) * 1000.0
    finally:
        model.train(was_training)

    return latency_ms


def train(model, optimizer, loss_fn, train_loader, val_loader, test_loader, load_model_name, epochs=20, device="cpu", log_dir='tensorboard_logs', patience=5, early_stopping_patience=10, classifier_type='MLP'):
    """Train ``model``, log to TensorBoard and return it with the best weights.

    Per epoch: one pass over ``train_loader``, one pass over ``val_loader``,
    a learning-rate-scheduler step on the validation accuracy, scalar
    logging, an early-stopping check, and a checkpoint whenever the
    validation accuracy improves. After the loop the model graph, a
    confusion matrix and a hyper-parameter summary are logged, the best
    checkpoint is reloaded and the test accuracy is printed.

    Args:
        model: Module to train; expected to already live on ``device``.
        optimizer: Optimizer over the model parameters.
        loss_fn: Callable ``loss_fn(output, targets)``.
        train_loader, val_loader, test_loader: Loaders yielding
            ``(inputs, targets)`` batches.
        load_model_name: Tag used in the log folder and checkpoint names.
        epochs: Maximum number of epochs.
        device: Device the batches are moved to.
        log_dir: Root directory for the per-run TensorBoard folder.
        patience: Patience of the learning-rate scheduler.
        early_stopping_patience: Patience of the early-stopping monitor.
        classifier_type: Free-form tag recorded with the hyper-parameters.

    Returns:
        The model with the best checkpoint's weights loaded.
    """
    ## Create folders and writer
    model_name = "best_model_" + load_model_name + ".pth"

    current_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
    unique_folder_name = f"{current_datetime}_{load_model_name}"
    unique_log_dir = os.path.join(log_dir, unique_folder_name)
    model_path = os.path.join(unique_log_dir, model_name)

    # Custom layout that overlays the train/validation curves.
    layout = {
        "Train and validation at same time": {
            "Loss": ["Multiline", ["Loss/Train", "Loss/Validation"]],
            "Accuracy": ["Multiline", ["Accuracy/Train", "Accuracy/Validation"]],
        },
    }

    writer = SummaryWriter(log_dir=unique_log_dir)
    writer.add_custom_scalars(layout)

    ## use early_stopping and scheduler learning rate
    early_stopping = EarlyStopping(patience=early_stopping_patience, verbose=True)
    # Named so that it does not shadow the `lr_scheduler` module alias.
    plateau_scheduler = LearningRateScheduler(optimizer, patience=patience, factor=0.1, verbose=True)

    ## start training
    best_valid_accuracy = 0.0
    best_model_state = None

    for epoch in range(1, epochs + 1):
        training_loss = 0.0
        valid_loss = 0.0
        model.train()

        # Initialize variables for train accuracy calculation
        num_correct_train = 0
        num_examples_train = 0

        for batch in train_loader:
            optimizer.zero_grad()
            inputs, targets = batch
            inputs = inputs.to(device)
            targets = targets.to(device)
            output = model(inputs)
            loss = loss_fn(output, targets)
            loss.backward()
            optimizer.step()
            # Weight the batch-mean loss by the batch size so the epoch value
            # is a per-sample average.
            training_loss += loss.item() * inputs.size(0)

            # Calculate the number of correct predictions in the current batch
            correct = torch.eq(torch.max(F.softmax(output, dim=1), dim=1)[1], targets).view(-1)
            num_correct_train += torch.sum(correct).item()
            num_examples_train += correct.shape[0]

        training_loss /= len(train_loader.dataset)
        train_accuracy = num_correct_train / num_examples_train

        model.eval()
        num_correct = 0
        num_examples = 0

        # Validation needs no gradients; disabling autograd avoids building a
        # graph for every validation batch.
        with torch.no_grad():
            for batch in val_loader:
                inputs, targets = batch
                inputs = inputs.to(device)
                output = model(inputs)
                targets = targets.to(device)
                loss = loss_fn(output, targets)
                valid_loss += loss.item() * inputs.size(0)
                correct = torch.eq(torch.max(F.softmax(output, dim=1), dim=1)[1], targets).view(-1)

                num_correct += torch.sum(correct).item()
                num_examples += correct.shape[0]

        valid_loss /= len(val_loader.dataset)
        valid_accuracy = num_correct / num_examples

        # Step the scheduler on the validation accuracy; it returns the
        # (possibly reduced) learning rate of the first parameter group.
        current_lr = plateau_scheduler.step(valid_accuracy)

        print('Epoch: {}, Learning Rate: {}, Training Loss: {:.2f}, Training Accuracy: {:.4f}, Validation Loss: {:.2f}, Validation Accuracy: {:.4f}'.format(epoch, current_lr, training_loss, train_accuracy, valid_loss, valid_accuracy))

        # Log the epoch metrics to TensorBoard
        writer.add_scalar('Learning Rate', current_lr, epoch)
        writer.add_scalar('Loss/Train', training_loss, epoch)
        writer.add_scalar('Accuracy/Train', train_accuracy, epoch)
        writer.add_scalar('Loss/Validation', valid_loss, epoch)
        writer.add_scalar('Accuracy/Validation', valid_accuracy, epoch)

        early_stop = early_stopping.step(valid_accuracy)
        if early_stop:
            break  # Stop training if early stopping is activated

        # Save the best model based on validation accuracy
        if valid_accuracy > best_valid_accuracy:
            best_valid_accuracy = valid_accuracy
            best_model_state = model.state_dict()
            # Save the best model state to a file
            torch.save(best_model_state, model_path)

    if best_model_state is None:
        # No epoch beat the initial accuracy of 0.0, so no checkpoint exists
        # yet. Persist the current weights so the size lookup and reload
        # below do not fail on a missing file.
        best_model_state = model.state_dict()
        torch.save(best_model_state, model_path)

    print(f"\n Model has been saved to {model_path} \n")

    # Inspect the model (traced with the last validation batch)
    writer.add_graph(model, inputs)
    writer.add_figure('Confusion Matrix', plot_confusion_matrix(model, val_loader, device))

    # Add hyperparameters to TensorBoard
    hyperparameters = {
        'Feature Extractor': load_model_name,
        'Model Accuracy': best_valid_accuracy,
        'Params (M)': sum(p.numel() for p in model.parameters()) / 1e6,  # Convert to million parameters
        'Size of model (MB)': os.path.getsize(model_path) / (1024 * 1024),  # Size in MB
        'Latency of model (ms)': estimate_latency(model, device),  # Calculate latency with a dummy input
        'Classifier type': classifier_type,
        'Training type': 'Normal',
    }

    writer.add_hparams(hparam_dict=hyperparameters, metric_dict={})
    # Print hyperparameters with .4f
    for key, value in hyperparameters.items():
        if isinstance(value, float):
            print(f'{key}: {value:.4f}')
        else:
            print(f'{key}: {value}')

    # Close the TensorBoard SummaryWriter
    writer.close()

    # Evaluate the best checkpoint, not the weights of the last epoch.
    model.load_state_dict(torch.load(model_path))
    test_accuracy = evaluate(model, test_loader, device=device)
    print(f"\n Test Accuracy: {test_accuracy * 100:.2f}%")
    return model

## Load model

In [None]:
# Build a randomly initialised ResNet-18; its weights are overwritten by the
# checkpoint loaded below.
model = models.resnet18(pretrained=False)
# Replace the last fully connected layer with a two-layer head
# (500 hidden units, 50 outputs). load_state_dict is strict by default, so
# this head has to match the layer names/shapes stored in the checkpoint.
num_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(num_features, 500),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(500, 50)
)
# Checkpoint path, relative to the current working directory.
model_path = 'tensorboard_logs/2023-12-02_120014_resnet18/best_model_resnet18.pth'
# map_location lets a checkpoint saved on another device load onto `device`.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
print(f'model from {model_path} loaded')



model from tensorboard_logs/2023-12-02_120014_resnet18/best_model_resnet18.pth loaded


## resnet50 trained model

In [None]:
# In-memory size of the model's parameters and buffers, in MiB.
model_size(model)

93.88278198242188

In [None]:
# List every parameter with its dtype to show the precision the weights are stored in.
for n, p in model.named_parameters():
  print(n, ": ", p.dtype)

conv1.weight :  torch.float32
bn1.weight :  torch.float32
bn1.bias :  torch.float32
layer1.0.conv1.weight :  torch.float32
layer1.0.bn1.weight :  torch.float32
layer1.0.bn1.bias :  torch.float32
layer1.0.conv2.weight :  torch.float32
layer1.0.bn2.weight :  torch.float32
layer1.0.bn2.bias :  torch.float32
layer1.0.conv3.weight :  torch.float32
layer1.0.bn3.weight :  torch.float32
layer1.0.bn3.bias :  torch.float32
layer1.0.downsample.0.weight :  torch.float32
layer1.0.downsample.1.weight :  torch.float32
layer1.0.downsample.1.bias :  torch.float32
layer1.1.conv1.weight :  torch.float32
layer1.1.bn1.weight :  torch.float32
layer1.1.bn1.bias :  torch.float32
layer1.1.conv2.weight :  torch.float32
layer1.1.bn2.weight :  torch.float32
layer1.1.bn2.bias :  torch.float32
layer1.1.conv3.weight :  torch.float32
layer1.1.bn3.weight :  torch.float32
layer1.1.bn3.bias :  torch.float32
layer1.2.conv1.weight :  torch.float32
layer1.2.bn1.weight :  torch.float32
layer1.2.bn1.bias :  torch.float32
lay

## pytorch

### Post-Training Dynamic/Weight-only Quantization

In [None]:
# Post-Training Dynamic/Weight-only Quantization
'''
torch.quantization.quantize_dynamic. Currently only Linear and Recurrent (LSTM, GRU, RNN)
'''
import torch
from torch import nn

# Quantization is applied to the model in inference mode.
model.eval()

## EAGER MODE
# Dynamically quantize the nn.LSTM / nn.Linear modules to int8; inplace=False
# returns a new model and leaves `model` untouched.
from torch.quantization import quantize_dynamic
model_quantized = quantize_dynamic(
    model=model, qconfig_spec={nn.LSTM, nn.Linear}, dtype=torch.qint8, inplace=False
)

## FX MODE
# NOTE(review): the FX result below is assigned to the same name and therefore
# replaces the eager-mode result above; only the FX model is used afterwards.
from torch.quantization import quantize_fx
qconfig_dict = {"": torch.quantization.default_dynamic_qconfig}  # An empty key denotes the default applied to all modules
example_inputs = (torch.randn(1, 3, 224, 224),)
model_prepared = quantize_fx.prepare_fx(model, qconfig_dict, example_inputs)
model_quantized = quantize_fx.convert_fx(model_prepared)



In [None]:
def print_size_of_model(model, label=""):
    """Serialize ``model.state_dict()`` to disk, print its size and return it.

    The state dict is written to the scratch file ``temp.p`` in the current
    working directory, measured with ``os.path.getsize`` and removed again.

    Args:
        model: Object exposing ``state_dict()`` (e.g. an ``nn.Module``).
        label: Free-form tag included in the printed line.

    Returns:
        int: Size of the serialized state dict in bytes (the printed value is
        this number divided by ``1e6``).
    """
    tmp_path = "temp.p"
    try:
        torch.save(model.state_dict(), tmp_path)
        size = os.path.getsize(tmp_path)
    finally:
        # Clean up even when serialization fails half-way, so a stale scratch
        # file is never left behind in the working directory.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print("model: ",label,' \t','Size (MB):', size/1e6)
    return size

# Compare the serialized state-dict size of the FP32 model with the
# dynamically quantized one.
print_size_of_model(model, 'float32')
print_size_of_model(model_quantized, 'Dynamic/Weight-only Quantization')

print()

model:  float32  	 Size (MB): 98.526044
model:  Dynamic/Weight-only Quantization  	 Size (MB): 95.010924



In [None]:
# Quantized kernels (fbgemm/qnnpack) run on the CPU only, so the quantized
# model is always evaluated there, even when `device` is a GPU.
evaluate(model_quantized, esc50_test_loader, 'cpu')

0.8675

### Post-Training Static Quantization (PTQ) 1 - resnet18

In [None]:
import copy

## FX GRAPH
from torch.quantization import quantize_fx

backend = "fbgemm"
# Work on a copy so the FP32 `model` stays available for later comparisons.
m = copy.deepcopy(model)
m.eval()
qconfig_dict = {"": torch.quantization.get_default_qconfig(backend)}
# Prepare: insert observers that record activation ranges.
example_inputs = (torch.randn(1, 3, 224, 224),)
model_prepared = quantize_fx.prepare_fx(m, qconfig_dict, example_inputs)

# Calibrate on the CPU with representative data. The validation split is used
# so that the test split, which is evaluated afterwards, does not influence
# the quantization parameters.
model_prepared.to(torch.device("cpu:0"))
with torch.inference_mode():
    for inputs, _ in esc50_val_loader:
        model_prepared(inputs.to('cpu'))
# quantize
model_quantized = quantize_fx.convert_fx(model_prepared)



In [None]:
# Test-set accuracy of the statically quantized model, run on the CPU.
evaluate(model_quantized, esc50_test_loader, 'cpu')

0.8525

In [None]:
# Serialized state-dict size of the statically quantized model.
print_size_of_model(model_quantized, 'Post-Training Static Quantization (PTQ)')
print()

model:  Post-Training Static Quantization (PTQ)  	 Size (MB): 11.593976



In [None]:
import time

# Random stand-in for one input sample; timing does not depend on its content.
input_data = torch.randn(1, 3, 224, 224)

warmup_iterations = 10
num_iterations = 100  # Adjust as needed
total_time = 0.0

# Inference only: run both the warm-up and the timed passes without autograd.
with torch.no_grad():
    # Warm-up so one-time initialisation costs are not part of the measurement.
    for _ in range(warmup_iterations):
        _ = model_quantized(input_data)

    # Measure inference time with a monotonic high-resolution clock.
    for _ in range(num_iterations):
        start_time = time.perf_counter()
        _ = model_quantized(input_data)
        end_time = time.perf_counter()

        iteration_time = end_time - start_time
        total_time += iteration_time

average_latency = total_time / num_iterations

print(f"Average Latency: {(average_latency * 1000):.2f} ms")

Average Latency: 33.08 ms seconds


In [None]:
import time

# Random stand-in for one input sample, placed on the GPU; `model` is expected
# to be on the GPU as well.
input_data = torch.randn(1, 3, 224, 224).to(torch.device('cuda'))

warmup_iterations = 10
num_iterations = 100  # Adjust as needed
total_time = 0.0

with torch.no_grad():
    # Warm-up so one-time initialisation costs are not part of the measurement.
    for _ in range(warmup_iterations):
        _ = model(input_data)

    # CUDA kernels are launched asynchronously: without synchronising, the
    # clock would only capture the launch overhead, not the actual compute.
    torch.cuda.synchronize()
    for _ in range(num_iterations):
        start_time = time.perf_counter()
        _ = model(input_data)
        torch.cuda.synchronize()
        end_time = time.perf_counter()

        iteration_time = end_time - start_time
        total_time += iteration_time

average_latency = total_time / num_iterations

print(f"Average Latency: {(average_latency * 1000):.2f} ms")

Average Latency: 12.83 ms seconds


### Quantization Aware Training with FX Graph Mode

In [None]:
# Settings for the quantization-aware training run; they are passed to
# train() in the QAT cell further down.
epoches = 25                  # maximum number of epochs
patience = 10                 # patience of the learning-rate scheduler
early_stopping_patience=15    # patience of the early-stopping monitor
load_model_name = "resnet18_quantized"  # tag used in log-folder and checkpoint names
classifier_type='MLP'         # free-form tag logged with the hyper-parameters

loss_fn = nn.CrossEntropyLoss()

In [None]:
#sample site
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms

from torch.ao.quantization import get_default_qat_qconfig_mapping
from torch.ao.quantization.quantize_fx import prepare_qat_fx, convert_fx

import torch.onnx

import time
import copy
import numpy as np

from torchvision.models import resnet18

def prepare_dataloader(num_workers=8, train_batch_size=128, eval_batch_size=256):
    """Build CIFAR-10 train and test loaders.

    The training split gets random crop (32, padding 4) and horizontal-flip
    augmentation and is sampled randomly; the test split is only converted
    and normalised and is read sequentially. Data is downloaded to ``data``.

    Returns:
        tuple: ``(train_loader, test_loader)``.
    """
    normalize = transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))

    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
    test_transform = transforms.Compose([transforms.ToTensor(), normalize])

    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=train_transform)
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=test_transform)

    train_loader = torch.utils.data.DataLoader(
        dataset=train_set,
        batch_size=train_batch_size,
        sampler=torch.utils.data.RandomSampler(train_set),
        num_workers=num_workers,
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_set,
        batch_size=eval_batch_size,
        sampler=torch.utils.data.SequentialSampler(test_set),
        num_workers=num_workers,
    )

    return train_loader, test_loader

def evaluate_model(model, test_loader, device, criterion=None):
    """Evaluate ``model`` on ``test_loader`` and return loss and accuracy.

    Args:
        model: Module returning per-class scores; put in eval mode and moved
            to ``device``.
        test_loader: Loader yielding ``(inputs, labels)``; its ``dataset``
            length is used as the denominator.
        device: Device used for inference.
        criterion: Optional loss callable; when ``None`` the loss is 0.

    Returns:
        tuple: ``(eval_loss, eval_accuracy)``. The loss is a per-sample
        average; the accuracy is a tensor whenever at least one batch was
        processed.
    """
    model.eval()
    model.to(device)

    running_loss = 0
    running_corrects = 0

    # Pure inference: without no_grad every forward pass would record an
    # autograd graph, wasting memory and time.
    with torch.no_grad():
        for inputs, labels in test_loader:

            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            if criterion is not None:
                loss = criterion(outputs, labels).item()
            else:
                loss = 0

            # Weight the batch-mean loss by the batch size.
            running_loss += loss * inputs.size(0)
            running_corrects += torch.sum(preds == labels)

    eval_loss = running_loss / len(test_loader.dataset)
    eval_accuracy = running_corrects / len(test_loader.dataset)

    return eval_loss, eval_accuracy

def train_model(model, train_loader, test_loader, device, learning_rate=1e-1, num_epochs=200):
    """Train ``model`` with SGD and a multi-step learning-rate schedule.

    Uses cross-entropy loss, SGD (momentum 0.9, weight decay 1e-4) and a
    ``MultiStepLR`` schedule that multiplies the rate by 0.1 at epochs 100
    and 150. The model is evaluated once before training and after every
    epoch, and a summary line is printed each time.

    Returns:
        The trained model (the same object that was passed in).
    """
    # The training configurations were not carefully selected.
    loss_function = nn.CrossEntropyLoss()

    model.to(device)

    sgd = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-4)
    lr_schedule = torch.optim.lr_scheduler.MultiStepLR(sgd, milestones=[100, 150], gamma=0.1, last_epoch=-1)

    def _run_eval():
        # Switch to eval mode and score the model on the test loader.
        model.eval()
        return evaluate_model(model=model, test_loader=test_loader, device=device, criterion=loss_function)

    # Baseline before any training step.
    eval_loss, eval_accuracy = _run_eval()
    print("Epoch: {:02d} Eval Loss: {:.3f} Eval Acc: {:.3f}".format(-1, eval_loss, eval_accuracy))

    for epoch in range(num_epochs):
        model.train()

        loss_sum = 0
        hits = 0

        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            sgd.zero_grad()

            logits = model(batch_inputs)
            predicted = torch.max(logits, 1)[1]
            batch_loss = loss_function(logits, batch_labels)
            batch_loss.backward()
            sgd.step()

            # Accumulate per-sample loss and the number of correct predictions.
            loss_sum += batch_loss.item() * batch_inputs.size(0)
            hits += torch.sum(predicted == batch_labels.data)

        dataset_size = len(train_loader.dataset)
        train_loss = loss_sum / dataset_size
        train_accuracy = hits / dataset_size

        eval_loss, eval_accuracy = _run_eval()

        # Advance the schedule once per epoch.
        lr_schedule.step()

        print("Epoch: {:03d} Train Loss: {:.3f} Train Acc: {:.3f} Eval Loss: {:.3f} Eval Acc: {:.3f}".format(epoch, train_loss, train_accuracy, eval_loss, eval_accuracy))

    return model

def measure_inference_latency(model,
                              device,
                              input_size=(1, 3, 32, 32),
                              num_samples=100,
                              num_warmups=10):
    """Return the average forward-pass latency of ``model`` in seconds.

    The model is moved to ``device`` and put in eval mode. After
    ``num_warmups`` untimed passes on a random input of ``input_size``,
    ``num_samples`` passes are timed together and the mean is returned.

    Args:
        model: Module to benchmark.
        device: Target device (string or ``torch.device``).
        input_size: Shape of the random input tensor.
        num_samples: Number of timed forward passes.
        num_warmups: Number of untimed warm-up passes.

    Returns:
        float: Mean wall-clock seconds per forward pass.
    """
    model.to(device)
    model.eval()

    x = torch.rand(size=input_size).to(device)

    # Synchronisation is only needed (and only possible) for CUDA work; an
    # unconditional torch.cuda.synchronize() fails on CPU-only machines.
    on_cuda = torch.device(device).type == "cuda"

    def _sync():
        if on_cuda:
            torch.cuda.synchronize()

    with torch.no_grad():
        for _ in range(num_warmups):
            _ = model(x)
    _sync()

    with torch.no_grad():
        start_time = time.perf_counter()
        for _ in range(num_samples):
            _ = model(x)
            # Wait for the asynchronous kernels so their run time is counted.
            _sync()
        end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    elapsed_time_ave = elapsed_time / num_samples

    return elapsed_time_ave

def save_model(model, model_dir, model_filename):
    """Save ``model.state_dict()`` to ``model_dir/model_filename``.

    ``model_dir`` is created (including parents) when it does not exist.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(model_dir, exist_ok=True)
    model_filepath = os.path.join(model_dir, model_filename)
    torch.save(model.state_dict(), model_filepath)

def load_model(model, model_filepath, device):
    """Load the state dict stored at ``model_filepath`` into ``model``.

    Tensors are mapped onto ``device`` while loading. The same model object
    is returned.
    """
    state_dict = torch.load(model_filepath, map_location=device)
    model.load_state_dict(state_dict)
    return model

def save_torchscript_model(model, model_dir, model_filename):
    """Script ``model`` with TorchScript and save it to ``model_dir/model_filename``.

    ``model_dir`` is created (including parents) when it does not exist.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(model_dir, exist_ok=True)
    model_filepath = os.path.join(model_dir, model_filename)
    torch.jit.save(torch.jit.script(model), model_filepath)

def load_torchscript_model(model_filepath, device):
    """Load a TorchScript module from ``model_filepath`` onto ``device``."""
    return torch.jit.load(model_filepath, map_location=device)

def model_equivalence(model_1, model_2, device, rtol=1e-05, atol=1e-08, num_tests=100, input_size=(1,3,32,32)):
    """Check that two models produce close outputs on random inputs.

    Both models are moved to ``device`` and fed ``num_tests`` uniformly random
    tensors of shape ``input_size``. Outputs are compared with
    ``np.allclose(rtol, atol)``; on the first mismatch both outputs are
    printed.

    Returns:
        bool: ``True`` when every sample matched, ``False`` otherwise.
    """
    model_1.to(device)
    model_2.to(device)

    # Comparison only: no gradients are needed for the forward passes.
    with torch.no_grad():
        for _ in range(num_tests):
            x = torch.rand(size=input_size).to(device)
            y1 = model_1(x).detach().cpu().numpy()
            y2 = model_2(x).detach().cpu().numpy()
            if not np.allclose(a=y1, b=y2, rtol=rtol, atol=atol, equal_nan=False):
                print("Model equivalence test sample failed: ")
                print(y1)
                print(y2)
                return False
    return True

In [None]:
# num_classes = 10
# Explicit device handles: training uses the first CUDA device, the quantized
# model is handled on the CPU.
cuda_device = torch.device("cuda:0")
cpu_device = torch.device("cpu:0")

# Output locations for the FP32, QAT-prepared and converted (quantized) models.
model_dir = "saved_models"
model_filename = "resnet18.pt"
prepared_model_filename = "resnet18_prepared_model.pt"
quantized_model_filename = "resnet18_quantized.pt"
model_filepath = os.path.join(model_dir, model_filename)
quantized_model_filepath = os.path.join(model_dir, quantized_model_filename)

In [None]:


# The FP32 `model` loaded earlier in the notebook is the starting point; the
# CIFAR-10 training step of the reference recipe is not used here.
# Save the FP32 model.
save_model(model=model, model_dir=model_dir, model_filename=model_filename)

# Prepare a model for quantization aware training
model.to(cpu_device)
model_to_quantize = copy.deepcopy(model)
qconfig_mapping = get_default_qat_qconfig_mapping("fbgemm")
# prepare_qat_fx expects `example_inputs` as a tuple of positional arguments,
# matching how the other quantization cells in this notebook pass it.
example_inputs = (torch.rand(size=(1,3,224,224)).to(cpu_device),)
prepared_model = prepare_qat_fx(model_to_quantize, qconfig_mapping, example_inputs)

# Model and fused model should be equivalent (within a loose tolerance,
# since fake-quantization is already inserted).
model.eval()
prepared_model.eval()
assert model_equivalence(model_1=model, model_2=prepared_model, device=cpu_device, rtol=1e-01, atol=3, num_tests=100, input_size=(1,3,224,224)), "Fused model is not equivalent to the original model!"

# Quantization aware training
print("Training QAT Model...")
prepared_model.train()
prepared_model.to(cuda_device)
# Per-group learning rates: 1e-4 for the backbone groups, 1e-8 for the
# classifier head. The top-level lr=1e-2 only applies to groups without
# their own 'lr', of which there are none here.
optimizer = optim.Adam([
                        {'params': prepared_model.conv1.parameters(), 'lr': 1e-4},
                        {'params': prepared_model.layer1.parameters(), 'lr': 1e-4},
                        {'params': prepared_model.layer2.parameters(), 'lr': 1e-4},
                        {'params': prepared_model.layer3.parameters(), 'lr': 1e-4},
                        {'params': prepared_model.layer4.parameters(), 'lr': 1e-4},
                        {'params': prepared_model.fc.parameters(), 'lr': 1e-8}
                        ], lr=1e-2)


prepared_model = train(prepared_model, optimizer, loss_fn, esc50_train_loader, esc50_val_loader, esc50_test_loader, load_model_name, epochs=epoches, device=device,
      log_dir = 'tensorboard_logs', patience=patience, early_stopping_patience=early_stopping_patience, classifier_type=classifier_type)
# Conversion to a quantized model happens on the CPU.
prepared_model.to(cpu_device)

print('evaluate: ', evaluate(prepared_model, esc50_test_loader, cpu_device))

# Save model.
save_model(model=prepared_model, model_dir=model_dir, model_filename=prepared_model_filename)


  torch.has_cuda,
  torch.has_cudnn,
  torch.has_mps,
  torch.has_mkldnn,


Training QAT Model...


  return torch.fused_moving_avg_obs_fake_quant(
  return torch.fused_moving_avg_obs_fake_quant(


Epoch: 1, Learning Rate: 0.0001, Training Loss: 2.66, Training Accuracy: 0.9000, Validation Loss: 2.02, Validation Accuracy: 0.7775
Epoch: 2, Learning Rate: 0.0001, Training Loss: 1.10, Training Accuracy: 0.9831, Validation Loss: 1.45, Validation Accuracy: 0.8025
Epoch: 3, Learning Rate: 0.0001, Training Loss: 0.71, Training Accuracy: 0.9944, Validation Loss: 1.23, Validation Accuracy: 0.8225
Epoch: 4, Learning Rate: 0.0001, Training Loss: 0.60, Training Accuracy: 0.9969, Validation Loss: 1.21, Validation Accuracy: 0.8150
Epoch: 5, Learning Rate: 0.0001, Training Loss: 0.53, Training Accuracy: 0.9962, Validation Loss: 1.22, Validation Accuracy: 0.8000
Epoch: 6, Learning Rate: 0.0001, Training Loss: 0.48, Training Accuracy: 0.9988, Validation Loss: 1.20, Validation Accuracy: 0.8175
Epoch: 7, Learning Rate: 0.0001, Training Loss: 0.46, Training Accuracy: 1.0000, Validation Loss: 1.14, Validation Accuracy: 0.8325
Epoch: 8, Learning Rate: 0.0001, Training Loss: 0.44, Training Accuracy: 0.9

Tensor-likes are not close!

Mismatched elements: 764 / 800 (95.5%)
Greatest absolute difference: 0.1461324691772461 at index (6, 35) (up to 1e-05 allowed)
Greatest relative difference: inf at index (2, 9) (up to 1e-05 allowed)
  _check_trace(


Feature Extractor: resnet18_quantized
Model Accuracy: 0.8500
Params (M): 11.4581
Size of model (MB): 44.0067
Latency of model (ms): 25.7802
Classifier type: MLP
Training type: Normal

 Test Accuracy: 84.25%
evaluate:  0.8325


In [None]:
# Convert the QAT-trained (fake-quantized) model into a quantized model.
quantized_model = convert_fx(prepared_model)

quantized_model.eval()
# Save the quantized model as a TorchScript archive (scripted via torch.jit.script).
save_torchscript_model(model=quantized_model, model_dir=model_dir, model_filename=quantized_model_filename)



In [None]:
# Load the quantized TorchScript model back from disk onto the CPU.
quantized_jit_model = load_torchscript_model(model_filepath=quantized_model_filepath, device=cpu_device)

# Compare test accuracy of the original FP32 model and the reloaded INT8
# model, both on the CPU.
fp32_eval_accuracy = evaluate(model, esc50_test_loader, device=cpu_device)
int8_eval_accuracy = evaluate(quantized_jit_model, esc50_test_loader, device=cpu_device)

print("FP32 evaluation accuracy: {:.3f}".format(fp32_eval_accuracy))
print("INT8 evaluation accuracy: {:.3f}".format(int8_eval_accuracy))


FP32 evaluation accuracy: 0.835
INT8 evaluation accuracy: 0.825


In [None]:
# Average per-sample latency (seconds) for each model/device combination.
# measure_inference_latency moves the model to the requested device, so after
# the last call `model` lives on cuda_device.
fp32_cpu_inference_latency = measure_inference_latency(model=model, device=cpu_device, input_size=(1,3,224,224), num_samples=100)
int8_cpu_inference_latency = measure_inference_latency(model=quantized_model, device=cpu_device, input_size=(1,3,224,224), num_samples=100)
int8_jit_cpu_inference_latency = measure_inference_latency(model=quantized_jit_model, device=cpu_device, input_size=(1,3,224,224), num_samples=100)
fp32_gpu_inference_latency = measure_inference_latency(model=model, device=cuda_device, input_size=(1,3,224,224), num_samples=100)

# Report in milliseconds.
print("FP32 CPU Inference Latency: {:.2f} ms / sample".format(fp32_cpu_inference_latency * 1000))
print("FP32 CUDA Inference Latency: {:.2f} ms / sample".format(fp32_gpu_inference_latency * 1000))
print("INT8 CPU Inference Latency: {:.2f} ms / sample".format(int8_cpu_inference_latency * 1000))
print("INT8 JIT CPU Inference Latency: {:.2f} ms / sample".format(int8_jit_cpu_inference_latency * 1000))

FP32 CPU Inference Latency: 81.72 ms / sample
FP32 CUDA Inference Latency: 3.86 ms / sample
INT8 CPU Inference Latency: 34.76 ms / sample
INT8 JIT CPU Inference Latency: 45.23 ms / sample


In [None]:
# Serialized state-dict sizes: FP32 model, QAT-prepared model, and the
# converted quantized model.
print_size_of_model(model, 'model')
print_size_of_model(prepared_model, 'prepared_model')
print_size_of_model(quantized_model, 'quantized_model')
print()

model:  model  	 Size (MB): 45.90489
model:  prepared_model  	 Size (MB): 46.004802
model:  quantized_model  	 Size (MB): 11.593976

