# Preprocessing

In [1]:
import copy
import math
import random
from collections import OrderedDict, defaultdict
from matplotlib import pyplot as plt
from matplotlib.colors import ListedColormap
import numpy as np
from tqdm.auto import tqdm

import torch
from torch import nn
from torch.optim import *
from torch.optim.lr_scheduler import *
import torchvision.models as models
import torchvision
from torch.utils.data import DataLoader

from torchvision.datasets import *
from torchvision.transforms import *


no_cuda = False
use_gpu = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

  from .autonotebook import tqdm as notebook_tqdm


In [72]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy

def bn_folding_model(model):

    new_model = copy.deepcopy(model)

    module_names = list(new_model._modules)

    for k, name in enumerate(module_names):
        print(k,name)
        if len(list(new_model._modules[name]._modules)) > 0:
            new_model._modules[name] = bn_folding_model(new_model._modules[name])
            # print(k)
            # print("-"*5)
            # print(name)
            # print("-"*5)
            # print(new_model._modules[name]._modules)
            
        else:
            if isinstance(new_model._modules[name], nn.BatchNorm2d):
                if isinstance(new_model._modules[module_names[k-1]], nn.Conv2d):

                    # Folded BN
                    folded_conv = fold_conv_bn_eval(new_model._modules[module_names[k-1]], new_model._modules[name])

                    # Replace old weight values
                    new_model._modules.pop(name) # Remove the BN layer
                    new_model._modules[module_names[k-1]] = folded_conv # Replace the Convolutional Layer by the folded version
                    print("yo")
    return new_model



def bn_folding(conv_w, conv_b, bn_rm, bn_rv, bn_eps, bn_w, bn_b):
    if conv_b is None:
        conv_b = bn_rm.new_zeros(bn_rm.shape)
    bn_var_rsqrt = torch.rsqrt(bn_rv + bn_eps)
    
    w_fold = conv_w * (bn_w * bn_var_rsqrt).view(-1, 1, 1, 1)
    b_fold = (conv_b - bn_rm) * bn_var_rsqrt * bn_w + bn_b
    
    return torch.nn.Parameter(w_fold), torch.nn.Parameter(b_fold)


def fold_conv_bn_eval(conv, bn):
    assert(not (conv.training or bn.training)), "Fusion only for eval!"
    fused_conv = copy.deepcopy(conv)

    fused_conv.weight, fused_conv.bias = bn_folding(fused_conv.weight, fused_conv.bias,
                             bn.running_mean, bn.running_var, bn.eps, bn.weight, bn.bias)

    return fused_conv

In [73]:
def _make_divisible(v, divisor, min_value=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    :param v:
    :param divisor:
    :param min_value:
    :return:
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class h_sigmoid(nn.Module):
    def __init__(self, inplace=True):
        super(h_sigmoid, self).__init__()
        self.relu = nn.ReLU6(inplace=inplace)

    def forward(self, x):
        return self.relu(x + 3) / 6


class h_swish(nn.Module):
    def __init__(self, inplace=True):
        super(h_swish, self).__init__()
        self.sigmoid = h_sigmoid(inplace=inplace)

    def forward(self, x):
        return x * self.sigmoid(x)


class SELayer(nn.Module):
    def __init__(self, channel, reduction=4):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
                nn.Linear(channel, _make_divisible(channel // reduction, 8)),
                nn.ReLU(inplace=True),
                nn.Linear(_make_divisible(channel // reduction, 8), channel),
                h_sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y
 

def pw_conv(infp,outfp):
    return nn.Sequential(
        nn.Conv2d(in_channels=infp, out_channels=outfp, kernel_size=1, stride=1,padding= 0, bias=False)
    )

def dw_conv(infp,outfp,kernel_size,stride,padding):
    return nn.Sequential(
        nn.Conv2d(infp,outfp,kernel_size,stride=stride,padding=padding,groups=infp)
    )

class MobileNetV3_block(nn.Module):
    def __init__(self, infp, outfp, middle_feature ,kernel_size,stride,padding):
        super(MobileNetV3_block, self).__init__()

        ############ My Design #########
        #self.pw1 = pw_conv(infp,middle_feature)
        self.pw1 = nn.Conv2d(in_channels=infp, out_channels=middle_feature, kernel_size=1, stride=1,padding= 0, bias=False)
        self.bn1 = nn.BatchNorm2d(middle_feature)
        self.hs1 = h_swish()

        self.dw1 = nn.Conv2d(middle_feature,middle_feature,kernel_size,stride=stride,padding=padding,groups=middle_feature)
        #self.dw1 = dw_conv(middle_feature,middle_feature,kernel_size,stride,padding)
        self.bn2 = nn.BatchNorm2d(middle_feature)
        self.hs2 = h_swish()
        self.se1 = SELayer(middle_feature)
        self.hs3 = h_swish()
        self.pw2 = nn.Conv2d(in_channels=middle_feature, out_channels=outfp, kernel_size=1, stride=1,padding= 0, bias=False)
        #self.pw2 = pw_conv(middle_feature, outfp)
        self.bn3 = nn.BatchNorm2d( outfp)
    
    def forward(self,x):
        out = self.pw1(x)
        out = self.bn1(out)
        out = self.hs1(out)
        out = self.dw1(out)
        out = self.bn2(out)
        out = self.hs2(out)
        out = self.se1(out)
        out = self.hs3(out)
        out = self.pw2(out)
        out = self.bn3(out)
        return out

class Our_MobileNetV3(nn.Module):
    def __init__(self):
        super(Our_MobileNetV3, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1,out_channels=8,kernel_size=3,stride=1,padding=1)
        self.block1 = MobileNetV3_block(infp = 8, outfp=16 , middle_feature=8 ,kernel_size=3,stride=2,padding=1)
        self.block2 = MobileNetV3_block(infp = 16, outfp=32 , middle_feature=48 ,kernel_size=3,stride=2,padding=1)
        self.block3 = MobileNetV3_block(infp = 32, outfp=32 , middle_feature=64 ,kernel_size=3,stride=2,padding=1)
        self.block4 = MobileNetV3_block(infp = 32, outfp=48 , middle_feature=64 ,kernel_size=3,stride=2,padding=1)
        self.block5 = MobileNetV3_block(infp = 48, outfp=64 , middle_feature=96 ,kernel_size=3,stride=2,padding=1)
        self.block6 = MobileNetV3_block(infp = 64, outfp=64 , middle_feature=96 ,kernel_size=3,stride=2,padding=1)
        self.block7 = MobileNetV3_block(infp = 64, outfp=32 , middle_feature=48 ,kernel_size=3,stride=2,padding=1)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.linear1 = nn.Linear(32,20)
        self.swish = h_swish()
        self.linear2 = nn.Linear(20,10)

    
    def forward(self,x):
        out = self.conv1(x)
        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.block4(out)
        out = self.block5(out)
        out = self.block6(out)
        out = self.block7(out)
        out = self.avgpool(out)
        out = out.view(out.size(0),-1)
        out = self.linear1(out)
        out = self.swish(out)
        out = self.linear2(out)
        return out


In [11]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

batch_size = 32

#Dataset
train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

#Dataloader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [26]:
empty_net = []

empty_net.append(nn.Conv2d(in_channels=1,out_channels=8,kernel_size=3,stride=1,padding=1))


Create NN model

In [77]:
trained_net = Our_MobileNetV3().cuda()

input_shape = torch.randn(1,1,28,28).cuda()
trained_net_weight = torch.load("N26122246_minist_best.ckpt")
trained_net.load_state_dict(trained_net_weight,strict=False)
trained_net.eval()

# print(trained_net.block1)
# cur_block = getattr(train_net, 'block"+str(i))
#print(trained_net.block1)
print(trained_net.block1)
print("-"*5)
BN_fold_trained_net=   bn_folding_model(trained_net)
print(BN_fold_block1)
# print(trained_net)
# print("-"*5)
#print(BN_fold_trained_net)
# print(trained_net_weight.keys())

# print(trained_net_weight['block1.bn1.weight'])

FP32_model = Our_MobileNetV3()
#print(FP32_model)

MobileNetV3_block(
  (pw1): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (hs1): h_swish(
    (sigmoid): h_sigmoid(
      (relu): ReLU6(inplace=True)
    )
  )
  (dw1): Conv2d(8, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=8)
  (bn2): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (hs2): h_swish(
    (sigmoid): h_sigmoid(
      (relu): ReLU6(inplace=True)
    )
  )
  (se1): SELayer(
    (avg_pool): AdaptiveAvgPool2d(output_size=1)
    (fc): Sequential(
      (0): Linear(in_features=8, out_features=8, bias=True)
      (1): ReLU(inplace=True)
      (2): Linear(in_features=8, out_features=8, bias=True)
      (3): h_sigmoid(
        (relu): ReLU6(inplace=True)
      )
    )
  )
  (hs3): h_swish(
    (sigmoid): h_sigmoid(
      (relu): ReLU6(inplace=True)
    )
  )
  (pw2): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (b

In [6]:
#train model
def train_loop(dataloader, model, loss_fn, optimizer):
  size = len(dataloader.dataset)
  #Set the model to train mode
  model.train()
  for batch, (x, y) in enumerate(dataloader):
    if use_gpu:
      x, y = x.cuda(), y.cuda()
    optimizer.zero_grad()
    #forward
    pred = model(x)

    #loss
    loss = loss_fn(pred, y)

    #backward
    loss.backward()

    #optimize
    optimizer.step()

    if batch % 100 == 0:
      loss, current = loss.item(), (batch + 1) * len(x)
      print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test_loop(dataloader, model, loss_fn):
  #set model to evaluate mode
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0
  with torch.no_grad():
    for x, y in dataloader:
      if use_gpu:
        x, y = x.cuda(), y.cuda()
      pred = model(x)
      test_loss = loss_fn(pred, y).item()
      correct += (pred.argmax(1) == y).type(torch.float).sum().item() #calculate accuracy
  test_loss /= num_batches
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [7]:
learning_rate = 1e-3
epochs = 3
loss_fn = nn.CrossEntropyLoss() #define loss function
optimizer = torch.optim.SGD(FP32_model.parameters(), lr=learning_rate, momentum=0.9)  #define optimizer

FP32_model.to(device) #let model on GPU

Our_MobileNetV3(
  (conv1): Conv2d(1, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (block1): MobileNetV3_block(
    (pw1): Sequential(
      (0): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
    )
    (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (hs1): h_swish(
      (sigmoid): h_sigmoid(
        (relu): ReLU6(inplace=True)
      )
    )
    (dw1): Sequential(
      (0): Conv2d(8, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=8)
    )
    (bn2): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (hs2): h_swish(
      (sigmoid): h_sigmoid(
        (relu): ReLU6(inplace=True)
      )
    )
    (se1): SELayer(
      (avg_pool): AdaptiveAvgPool2d(output_size=1)
      (fc): Sequential(
        (0): Linear(in_features=8, out_features=8, bias=True)
        (1): ReLU(inplace=True)
        (2): Linear(in_features=8, out_features=8, bias=True)
        (3): h_sigmoid(
          (r

In [8]:
#Training
for i in range(epochs):
  print(f"Epoch {i+1}\n-------------------------------")
  train_loop(train_loader, FP32_model, loss_fn, optimizer)
  test_loop(test_loader, FP32_model, loss_fn)

Epoch 1
-------------------------------
loss: 2.360337  [   32/60000]
loss: 2.295292  [ 3232/60000]
loss: 2.213784  [ 6432/60000]
loss: 2.118740  [ 9632/60000]
loss: 2.020132  [12832/60000]
loss: 1.875932  [16032/60000]


KeyboardInterrupt: 

# Quantization definition

####Question 1.####

Use
>$S=(r_{\mathrm{max}} - r_{\mathrm{min}}) / (q_{\mathrm{max}} - q_{\mathrm{min}})$

>$Z = q_{\mathrm{min}} - r_{\mathrm{min}} / S$

to calculate scale factor and zero point of a tensor


In [None]:
def get_scale_and_zero_point(fp32_tensor, bitwidth=8):
  q_min, q_max = -2**(bitwidth-1), 2**(bitwidth-1) - 1
  fp_min = fp32_tensor.min().item()
  fp_max = fp32_tensor.max().item()

  #####################################################

  scale = (fp_max-fp_min) / (q_max-q_min)
  zero_point = q_min-fp_min /scale

  #####################################################


  zero_point = round(zero_point)          #round
  zero_point = max(q_min, min(zero_point, q_max)) #clip

  return scale, int(zero_point)

####Question 2.####

Use $q=r/S + Z$ to quantize a tensor

In [None]:
def linear_quantize(fp32_tensor, bitwidth=8):
  q_min, q_max = -2**(bitwidth-1), 2**(bitwidth-1) - 1

  scale, zero_point = get_scale_and_zero_point(fp32_tensor)

  #####################################################

  q_tensor = torch.round( fp32_tensor/scale ) +zero_point

  #####################################################

  #clamp
  q_tensor = torch.clamp(q_tensor, q_min, q_max)
  return q_tensor, scale, zero_point

####Question 3.####

Use
> $q_{\mathrm{output}} = M * \mathrm{Linear}[q_{\mathrm{input}}, q_{\mathrm{weight}}] + Z_{\mathrm{output}}$

> $M = S_{\mathrm{input}} * S_{\mathrm{weight}} / S_{\mathrm{output}}$

to compute quantized linear operation

In [None]:
def quantized_conv(input, weights, stride, padding,groups,input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, device, bitwidth=8, activation_bitwidth=8):
  input, weights = input.to(device), weights.to(device)

  #####################################################

  M = input_scale * weight_scale / output_scale
  output = torch.nn.functional.conv2d((input - input_zero_point ), (weights - weight_zero_point),stride=stride,padding=padding,groups=groups)
  output *= M
  output += output_zero_point

  #####################################################

  #clamp and round
  output = output.round().clamp(-2**(activation_bitwidth-1), 2**(activation_bitwidth-1)-1)

  return output

def quantized_linear(input, weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, device, bitwidth=8, activation_bitwidth=8):
  input, weights = input.to(device), weights.to(device)

  #####################################################

  M = input_scale * weight_scale / output_scale
  output = torch.nn.functional.linear((input - input_zero_point ), (weights - weight_zero_point))
  output *= M
  output += output_zero_point

  #####################################################

  #clamp and round
  output = output.round().clamp(-2**(activation_bitwidth-1), 2**(activation_bitwidth-1)-1)

  return output

# Design quantized linear layer and preprocess

In [None]:
class QuantizedConv(nn.Module):
  def __init__(self, weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, bitwidth=8, activation_bitwidth=8):
    super().__init__()
    self.weights = weights
    self.input_scale, self.input_zero_point = input_scale, input_zero_point
    self.weight_scale, self.weight_zero_point = weight_scale, weight_zero_point
    self.output_scale, self.output_zero_point = output_scale, output_zero_point

    self.bitwidth = bitwidth
    self.activation_bitwidth = activation_bitwidth

  def forward(self, x):
    return quantized_linear(x, self.weights, self.input_scale, self.weight_scale, self.output_scale, self.input_zero_point, self.weight_zero_point, self.output_zero_point, device)
  def __repr__(self):
    return f"QuantizedConv(in_channels={self.weights.size(1)}, out_channels={self.weights.size(0)})"

class QuantizedLinear(nn.Module):
  def __init__(self, weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point, bitwidth=8, activation_bitwidth=8):
    super().__init__()
    self.weights = weights
    self.input_scale, self.input_zero_point = input_scale, input_zero_point
    self.weight_scale, self.weight_zero_point = weight_scale, weight_zero_point
    self.output_scale, self.output_zero_point = output_scale, output_zero_point

    self.bitwidth = bitwidth
    self.activation_bitwidth = activation_bitwidth

  def forward(self, x):
    return quantized_linear(x, self.weights, self.input_scale, self.weight_scale, self.output_scale, self.input_zero_point, self.weight_zero_point, self.output_zero_point, device)
  def __repr__(self):
    return f"QuantizedLinear(in_channels={self.weights.size(1)}, out_channels={self.weights.size(0)})"

#Transform input data to correct integer range
class Preprocess(nn.Module):
  def __init__(self, input_scale, input_zero_point, activation_bitwidth=8):
    super().__init__()
    self.input_scale, self.input_zero_point = input_scale, input_zero_point
    self.activation_bitwidth = activation_bitwidth
  def forward(self, x):
    x = x / self.input_scale + self.input_zero_point
    return x

# Calibration

In [None]:
# add hook to record the min max value of the activation
input_activation = {}
output_activation = {}

#Define a hook to record the feature map of each layer
def add_range_recoder_hook(model):
    import functools
    def _record_range(self, x, y, module_name):
        x = x[0]
        input_activation[module_name] = x.detach()
        output_activation[module_name] = y.detach()

    all_hooks = []
    for name, m in model.named_modules():
        if isinstance(m, (nn.Linear, nn.ReLU)):
            all_hooks.append(m.register_forward_hook(
                functools.partial(_record_range, module_name=name)))

    return all_hooks

hooks = add_range_recoder_hook(FP32_model)
sample_data = iter(train_loader).__next__()[0].to(device) #Use a batch of training data to calibrate
FP32_model(sample_data) #Forward to use hook
print(input_activation.values())
print("-"*5)
print(output_activation.values())
# remove hooks
for h in hooks:
    h.remove()


NameError: name 'FP32_model' is not defined

# Quantize model

In [None]:
#copy original model
quantized_model = copy.deepcopy(FP32_model)

#Record each layer in original model
quantized_backbone = []
i = 0

#Record input scale and zero point
input_scale, input_zero_point = get_scale_and_zero_point(input_activation["backbone.0"])
preprocess = Preprocess(input_scale, input_zero_point)
quantized_backbone.append(preprocess)

#Record Linear + ReLU of the model (except the last Linear)
while i < len(quantized_model.backbone) - 1:
  if isinstance(quantized_model.backbone[i], nn.Linear) and isinstance(quantized_model.backbone[i+1], nn.ReLU):
    linear = quantized_model.backbone[i]
    linear_name = f"backbone.{i}"
    relu = quantized_model.backbone[i + 1]
    relu_name = f"backbone.{i + 1}"

    #Use the calibration data to calculate scale and zero point of each layer
    input_scale, input_zero_point = get_scale_and_zero_point(input_activation[linear_name])
    output_scale, output_zero_point = get_scale_and_zero_point(output_activation[relu_name])
    quantized_weights, weight_scale, weight_zero_point = linear_quantize(linear.weight.data)

    quantizedLinear = QuantizedLinear(quantized_weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point)

    quantized_backbone.append(quantizedLinear)
    i += 2

#Record the last Linear layer
linear = quantized_model.backbone[4]
linear_name = f"backbone.4"
input_scale, input_zero_point = get_scale_and_zero_point(input_activation[linear_name])
output_scale, output_zero_point = get_scale_and_zero_point(output_activation[linear_name])
quantized_weights, weight_scale, weight_zero_point = linear_quantize(linear.weight.data)
quantizedLinear = QuantizedLinear(quantized_weights, input_scale, weight_scale, output_scale, input_zero_point, weight_zero_point, output_zero_point)
quantized_backbone.append(quantizedLinear)


quantized_model.backbone = nn.Sequential(*quantized_backbone)

In [None]:
print(quantized_model)

ToyModel(
  (backbone): Sequential(
    (0): Preprocess()
    (1): QuantizedLinear(in_channels=784, out_channels=120)
    (2): QuantizedLinear(in_channels=120, out_channels=84)
    (3): QuantizedLinear(in_channels=84, out_channels=10)
  )
)


# Evaluate

In [None]:
test_loop(test_loader, FP32_model, loss_fn)

Test Error: 
 Accuracy: 84.2%, Avg loss: 0.000704 



In [None]:
test_loop(test_loader, quantized_model, loss_fn)

Test Error: 
 Accuracy: 84.2%, Avg loss: 0.002995 

