# Preprocessing

In [None]:
import copy
import math
import random
from collections import OrderedDict, defaultdict

from matplotlib import pyplot as plt
from matplotlib.colors import ListedColormap
import numpy as np
from tqdm.auto import tqdm

import torch
from torch import nn
from torch.optim import *
from torch.optim.lr_scheduler import *
import torchvision.models as models
import torchvision
from torch.utils.data import DataLoader

from torchvision.datasets import *
from torchvision.transforms import *


# Seed every RNG the notebook touches so that weight initialisation, DataLoader
# shuffling and the calibration batch are repeatable across "Restart & Run All".
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Set no_cuda to True to force CPU execution even when a GPU is present.
no_cuda = False
use_gpu = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

In [None]:
batch_size = 32

# Per-sample pipeline: convert to a tensor, then normalize with mean 0.5 and std 0.5.
transform = transforms.Compose([
  transforms.ToTensor(),
  transforms.Normalize((0.5,), (0.5,)),
])

#Dataset (downloaded into ./data on first use)
dataset_kwargs = dict(root='./data', transform=transform, download=True)
train_dataset = torchvision.datasets.FashionMNIST(train=True, **dataset_kwargs)
test_dataset = torchvision.datasets.FashionMNIST(train=False, **dataset_kwargs)

#Dataloader (only the training split is shuffled)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 26421880/26421880 [00:01<00:00, 16215578.28it/s]


Extracting ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 29515/29515 [00:00<00:00, 312432.67it/s]


Extracting ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 4422102/4422102 [00:00<00:00, 5245944.51it/s]


Extracting ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 5148/5148 [00:00<00:00, 14973839.80it/s]

Extracting ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw






# Create the NN model

In [None]:
class ToyModel(nn.Module):
  """Bias-free multilayer perceptron: 784 -> 120 -> 84 -> 10 with ReLU in between.

  The layers live in ``self.backbone`` (an ``nn.Sequential``), so they are
  addressable as ``backbone.0`` ... ``backbone.4``.
  """

  def __init__(self):
    super().__init__()
    in_features, hidden_1, hidden_2, num_classes = 28 * 28, 120, 84, 10
    layers = [
      nn.Linear(in_features, hidden_1, bias=False),
      nn.ReLU(),
      nn.Linear(hidden_1, hidden_2, bias=False),
      nn.ReLU(),
      nn.Linear(hidden_2, num_classes, bias=False),
    ]
    self.backbone = nn.Sequential(*layers)

  def forward(self, x):
    """Flatten each 28x28 input into a 784-vector and run it through the backbone."""
    flattened = x.view(-1, 28 * 28)
    return self.backbone(flattened)

FP32_model = ToyModel()
print(FP32_model)

ToyModel(
  (backbone): Sequential(
    (0): Linear(in_features=784, out_features=120, bias=False)
    (1): ReLU()
    (2): Linear(in_features=120, out_features=84, bias=False)
    (3): ReLU()
    (4): Linear(in_features=84, out_features=10, bias=False)
  )
)


In [None]:
#train model
def train_loop(dataloader, model, loss_fn, optimizer):
  """Run one training epoch over `dataloader`, printing the loss every 100 batches.

  Args:
    dataloader: iterable of (inputs, targets) batches exposing a `.dataset` with a length.
    model: module to optimize; it is switched to train mode.
    loss_fn: callable taking (predictions, targets) and returning a scalar loss tensor.
    optimizer: optimizer that updates the parameters of `model`.
  """
  total_samples = len(dataloader.dataset)
  model.train()
  for step, (inputs, targets) in enumerate(dataloader):
    if use_gpu:
      inputs = inputs.cuda()
      targets = targets.cuda()

    # Clear stale gradients, then forward -> loss -> backward -> parameter update.
    optimizer.zero_grad()
    batch_loss = loss_fn(model(inputs), targets)
    batch_loss.backward()
    optimizer.step()

    if step % 100 != 0:
      continue
    loss_value = batch_loss.item()
    seen = (step + 1) * len(inputs)
    print(f"loss: {loss_value:>7f}  [{seen:>5d}/{total_samples:>5d}]")

def test_loop(dataloader, model, loss_fn):
  #set model to evaluate mode
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0
  with torch.no_grad():
    for x, y in dataloader:
      if use_gpu:
        x, y = x.cuda(), y.cuda()
      pred = model(x)
      test_loss = loss_fn(pred, y).item()
      correct += (pred.argmax(1) == y).type(torch.float).sum().item() #calculate accuracy
  test_loss /= num_batches
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Training hyperparameters and objects shared by the training/evaluation cells.
learning_rate = 1e-3
epochs = 3
loss_fn = nn.CrossEntropyLoss() #loss function applied to the model outputs and integer class targets
optimizer = torch.optim.SGD(FP32_model.parameters(), lr=learning_rate, momentum=0.9)  #SGD with momentum over all FP32_model parameters

# As the last expression of the cell, the module returned by .to() is displayed as the cell output.
FP32_model.to(device) #move the model to the selected device (GPU if available, otherwise CPU)

ToyModel(
  (backbone): Sequential(
    (0): Linear(in_features=784, out_features=120, bias=False)
    (1): ReLU()
    (2): Linear(in_features=120, out_features=84, bias=False)
    (3): ReLU()
    (4): Linear(in_features=84, out_features=10, bias=False)
  )
)

In [None]:
#Training: one pass over the training set followed by a test-set evaluation per epoch
for epoch_number in range(1, epochs + 1):
  print(f"Epoch {epoch_number}\n-------------------------------")
  train_loop(train_loader, FP32_model, loss_fn, optimizer)
  test_loop(test_loader, FP32_model, loss_fn)

# Quantization definition

#### Question 1

Use
>$S=(r_{\mathrm{max}} - r_{\mathrm{min}}) / (q_{\mathrm{max}} - q_{\mathrm{min}})$

>$Z = q_{\mathrm{min}} - r_{\mathrm{min}} / S$

to calculate the scale factor and zero point of a tensor.


In [None]:
def get_scale_and_zero_point(fp32_tensor, bitwidth=8):
  """Compute the affine quantization parameters (scale S, zero point Z) of a tensor.

  Uses the signed integer range [-2**(bitwidth-1), 2**(bitwidth-1) - 1] and
    S = (r_max - r_min) / (q_max - q_min)
    Z = q_min - r_min / S
  where r_min / r_max are the minimum / maximum values of `fp32_tensor`.

  Args:
    fp32_tensor: tensor whose value range is mapped onto the integer range.
    bitwidth: number of bits of the signed integer representation.

  Returns:
    (scale, zero_point): `scale` as a Python float and `zero_point` as a Python
    int, rounded and clipped to [q_min, q_max].
  """
  q_min, q_max = -2**(bitwidth-1), 2**(bitwidth-1) - 1
  fp_min = fp32_tensor.min().item()
  fp_max = fp32_tensor.max().item()

  scale = (fp_max - fp_min) / (q_max - q_min)
  if scale == 0:
    # Constant tensor: any positive scale represents it; 1.0 avoids dividing by zero below.
    scale = 1.0
  zero_point = q_min - fp_min / scale

  zero_point = round(zero_point)          #round
  zero_point = max(q_min, min(zero_point, q_max)) #clip

  return scale, int(zero_point)

#### Question 2

Use $q = \mathrm{round}(r / S) + Z$ to quantize a tensor.

In [None]:
def linear_quantize(fp32_tensor, bitwidth=8):
  """Quantize a tensor with q = round(r / S) + Z, clamped to the signed integer range.

  Args:
    fp32_tensor: tensor to quantize.
    bitwidth: number of bits of the signed integer range; it is also forwarded to
      get_scale_and_zero_point so that S and Z match the clamp range used here.

  Returns:
    (q_tensor, scale, zero_point): the quantized values (integer-valued, same
    dtype as the input) plus the scale and zero point that were used.
  """
  q_min, q_max = -2**(bitwidth-1), 2**(bitwidth-1) - 1

  scale, zero_point = get_scale_and_zero_point(fp32_tensor, bitwidth=bitwidth)

  q_tensor = torch.round(fp32_tensor / scale) + zero_point

  #clamp
  q_tensor = torch.clamp(q_tensor, q_min, q_max)
  return q_tensor, scale, zero_point

#### Question 3

Use
> $q_{\mathrm{output}} = M * \mathrm{Linear}[q_{\mathrm{input}} - Z_{\mathrm{input}}, q_{\mathrm{weight}} - Z_{\mathrm{weight}}] + Z_{\mathrm{output}}$

> $M = S_{\mathrm{input}} * S_{\mathrm{weight}} / S_{\mathrm{output}}$

to compute the quantized linear operation.

In [None]:
def quantized_linear(input, weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, device, bitwidth=8, activation_bitwidth=8):
  """Bias-free linear layer evaluated on quantized operands.

  Computes
    q_out = M * Linear[q_in - Z_in, q_w - Z_w] + Z_out,   M = S_in * S_w / S_out
  then rounds and clamps the result to the signed `activation_bitwidth` range.

  Args:
    input: quantized input activations.
    weights: quantized weight matrix in the layout torch.nn.functional.linear expects.
    input_scale, weight_scale, output_scale: scales S of input, weights and output.
    input_zero_point, weight_zero_point, output_zero_point: matching zero points Z.
    device: device both operands are moved to before the computation.
    bitwidth: weight bit width; accepted for interface symmetry but not used here.
    activation_bitwidth: bit width that defines the output clamp range.

  Returns:
    Tensor of quantized output activations (integer-valued floats).
  """
  input, weights = input.to(device), weights.to(device)

  # Rescaling factor that maps the accumulator into the output's quantized domain.
  M = input_scale * weight_scale / output_scale
  # Remove the zero points first so the product is proportional to the real-valued product.
  output = torch.nn.functional.linear(input - input_zero_point, weights - weight_zero_point)
  output = output * M + output_zero_point

  #clamp and round
  q_min, q_max = -2**(activation_bitwidth-1), 2**(activation_bitwidth-1) - 1
  return output.round().clamp(q_min, q_max)

# Design quantized linear layer and preprocess

In [None]:
class QuantizedLinear(nn.Module):
  """Module wrapper around quantized_linear holding one layer's quantization parameters.

  Args:
    weights: already-quantized weight matrix of shape (out_features, in_features),
      as read by __repr__ via size(0) / size(1).
    input_scale, weight_scale, output_scale: scales of input, weights and output.
    input_zero_point, weight_zero_point, output_zero_point: matching zero points.
    bitwidth: weight bit width, forwarded to quantized_linear.
    activation_bitwidth: activation bit width, forwarded to quantized_linear.
  """

  def __init__(self, weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, bitwidth=8, activation_bitwidth=8):
    super().__init__()
    self.weights = weights
    self.input_scale, self.input_zero_point = input_scale, input_zero_point
    self.weight_scale, self.weight_zero_point = weight_scale, weight_zero_point
    self.output_scale, self.output_zero_point = output_scale, output_zero_point

    self.bitwidth = bitwidth
    self.activation_bitwidth = activation_bitwidth

  def forward(self, x):
    """Apply the quantized linear operation to `x` on the device `x` already lives on."""
    return quantized_linear(
      x,
      self.weights,
      self.input_scale,
      self.weight_scale,
      self.output_scale,
      self.input_zero_point,
      self.weight_zero_point,
      self.output_zero_point,
      x.device,  # follow the input instead of relying on a notebook-level global
      bitwidth=self.bitwidth,  # honour the configured widths rather than the defaults
      activation_bitwidth=self.activation_bitwidth,
    )

  def __repr__(self):
    return f"QuantizedLinear(in_channels={self.weights.size(1)}, out_channels={self.weights.size(0)})"

#Transform input data to correct integer range
class Preprocess(nn.Module):
  """Quantize the floating-point network input: q = round(x / S) + Z, then clamp.

  Args:
    input_scale: scale S of the network input.
    input_zero_point: zero point Z of the network input.
    activation_bitwidth: bit width of the signed integer range the result is clamped to.
  """

  def __init__(self, input_scale, input_zero_point, activation_bitwidth=8):
    super().__init__()
    self.input_scale, self.input_zero_point = input_scale, input_zero_point
    self.activation_bitwidth = activation_bitwidth

  def forward(self, x):
    q_min = -2**(self.activation_bitwidth-1)
    q_max = 2**(self.activation_bitwidth-1) - 1
    # Round to the integer grid and clamp; without this the following layers would
    # receive non-integer, possibly out-of-range values instead of quantized inputs.
    x = torch.round(x / self.input_scale) + self.input_zero_point
    return torch.clamp(x, q_min, q_max)

# Calibration

In [None]:
# Activations captured during calibration, keyed by module name (e.g. "backbone.0").
input_activation = {}
output_activation = {}

def add_range_recoder_hook(model):
    """Attach forward hooks that store each Linear/ReLU module's input and output.

    Every forward pass overwrites `input_activation[name]` with the module's first
    positional input and `output_activation[name]` with its output, both detached.

    Returns:
        The list of hook handles; call `.remove()` on each one to detach the hooks.
    """
    from functools import partial

    def _capture(module, inputs, output, module_name):
        # `inputs` is the tuple of positional arguments passed to the module.
        input_activation[module_name] = inputs[0].detach()
        output_activation[module_name] = output.detach()

    return [
        layer.register_forward_hook(partial(_capture, module_name=layer_name))
        for layer_name, layer in model.named_modules()
        if isinstance(layer, (nn.Linear, nn.ReLU))
    ]

hooks = add_range_recoder_hook(FP32_model)
try:
    sample_data = next(iter(train_loader))[0].to(device) #Use a batch of training data to calibrate
    with torch.no_grad():  # only the recorded activations are needed, not gradients
        FP32_model(sample_data) #Forward to use hook
finally:
    # remove hooks even if the forward pass fails, so they never leak into later cells
    for h in hooks:
        h.remove()


# Quantize model

In [None]:
#copy original model so FP32_model stays untouched for the later comparison
quantized_model = copy.deepcopy(FP32_model)

def _build_quantized_linear(linear, input_name, output_name):
  """Build a QuantizedLinear for `linear` from the recorded calibration activations.

  Args:
    linear: the nn.Linear whose weights are quantized.
    input_name: key in `input_activation` giving the layer's input range.
    output_name: key in `output_activation` giving the output range (the name of
      the following ReLU for hidden layers, the Linear itself for the last layer).
  """
  input_scale, input_zero_point = get_scale_and_zero_point(input_activation[input_name])
  output_scale, output_zero_point = get_scale_and_zero_point(output_activation[output_name])
  quantized_weights, weight_scale, weight_zero_point = linear_quantize(linear.weight.data)
  return QuantizedLinear(quantized_weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point)

fp32_backbone = quantized_model.backbone
last_index = len(fp32_backbone) - 1
if not isinstance(fp32_backbone[last_index], nn.Linear):
  raise TypeError(f"expected the last backbone layer to be nn.Linear, got {type(fp32_backbone[last_index]).__name__}")

#Record input scale and zero point
input_scale, input_zero_point = get_scale_and_zero_point(input_activation["backbone.0"])
preprocess = Preprocess(input_scale, input_zero_point)
quantized_backbone = [preprocess]

#Record Linear + ReLU of the model (except the last Linear)
# Each pair becomes ONE QuantizedLinear whose output range is taken from the ReLU
# output; no separate ReLU module is added (presumably the output clamp in
# quantized_linear takes its place, since the ReLU output range starts at 0).
i = 0
while i < last_index:
  is_linear_relu = isinstance(fp32_backbone[i], nn.Linear) and isinstance(fp32_backbone[i + 1], nn.ReLU)
  if not is_linear_relu:
    # Fail loudly instead of spinning forever on an unsupported layer layout.
    raise ValueError(f"expected Linear followed by ReLU at backbone.{i}")
  quantized_backbone.append(_build_quantized_linear(fp32_backbone[i], f"backbone.{i}", f"backbone.{i + 1}"))
  i += 2

#Record the last Linear layer (its own output defines the output range)
last_name = f"backbone.{last_index}"
quantized_backbone.append(_build_quantized_linear(fp32_backbone[last_index], last_name, last_name))

quantized_model.backbone = nn.Sequential(*quantized_backbone)

In [None]:
# Show the rebuilt model: the backbone now consists of the Preprocess step followed by QuantizedLinear layers.
print(quantized_model)

ToyModel(
  (backbone): Sequential(
    (0): Preprocess()
    (1): QuantizedLinear(in_channels=784, out_channels=120)
    (2): QuantizedLinear(in_channels=120, out_channels=84)
    (3): QuantizedLinear(in_channels=84, out_channels=10)
  )
)


# Evaluate

In [None]:
# Baseline: accuracy and loss of the original FP32 model on the test set.
test_loop(test_loader, FP32_model, loss_fn)

Test Error: 
 Accuracy: 83.9%, Avg loss: 0.000875 



In [None]:
# Same evaluation for the quantized model. Its outputs are rounded, clamped quantized values
# (not dequantized logits), so compare the accuracy; the loss value is not directly comparable.
test_loop(test_loader, quantized_model, loss_fn)

Test Error: 
 Accuracy: 83.9%, Avg loss: 0.004596 

