# **1.Import Pytorch**

In [1]:
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models.feature_extraction as feature_extraction
import torch.nn as nn
import torch.nn.functional as F

import os
from torchsummary import summary

no_cuda = False
use_gpu = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

# **2.Load Fashion MNIST Dataset**

In [2]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

batch_size = 32

#Dataset
train_dataset = torchvision.datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

#Dataloader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 26421880/26421880 [00:01<00:00, 16731806.50it/s]


Extracting ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 29515/29515 [00:00<00:00, 272797.33it/s]


Extracting ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 4422102/4422102 [00:00<00:00, 5051057.60it/s]


Extracting ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 5148/5148 [00:00<00:00, 7190235.43it/s]

Extracting ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw






# **3.Create a NN model**

In [3]:
class ToyModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.nn1 = nn.Linear(28*28, 120)
    self.nn2 = nn.Linear(120, 84)
    self.nn3 = nn.Linear(84, 10)

  def forward(self, x):
    x = x.view(-1, 28 * 28)
    x = F.relu(self.nn1(x))
    x = F.relu(self.nn2(x))
    x = self.nn3(x)
    return x

In [4]:
#Print summary of model
FP32_model = ToyModel().to(device)
summary(FP32_model,(1,28,28))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                  [-1, 120]          94,200
            Linear-2                   [-1, 84]          10,164
            Linear-3                   [-1, 10]             850
Total params: 105,214
Trainable params: 105,214
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.40
Estimated Total Size (MB): 0.41
----------------------------------------------------------------


# **4.Train model**

In [5]:
learning_rate = 1e-3
epochs = 3
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(FP32_model.parameters(), lr=learning_rate, momentum=0.9)

FP32_model.to(device) #Put model on GPU

ToyModel(
  (nn1): Linear(in_features=784, out_features=120, bias=True)
  (nn2): Linear(in_features=120, out_features=84, bias=True)
  (nn3): Linear(in_features=84, out_features=10, bias=True)
)

In [6]:
#train model
def train_loop(dataloader, model, loss_fn, optimizer):
  size = len(dataloader.dataset)
  #Set the model to train mode
  model.train()
  for batch, (x, y) in enumerate(dataloader):
    if use_gpu:
      x, y = x.cuda(), y.cuda() #Put data on GPU
    optimizer.zero_grad()
    #forward
    pred = model(x)

    #loss
    loss = loss_fn(pred, y)

    #backward
    loss.backward()

    #optimize
    optimizer.step()

    if batch % 100 == 0:
      loss, current = loss.item(), (batch + 1) * len(x)
      print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test_loop(dataloader, model, loss_fn):
  #set model to evaluate mode
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0
  with torch.no_grad():
    for x, y in dataloader:
      if use_gpu:
        x, y = x.cuda(), y.cuda()
      pred = model(x)
      test_loss = loss_fn(pred, y).item()
      correct += (pred.argmax(1) == y).type(torch.float).sum().item()
  test_loss /= num_batches
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [7]:
for i in range(epochs):
  print(f"Epoch {i+1}\n-------------------------------")
  train_loop(train_loader, FP32_model, loss_fn, optimizer)
  test_loop(test_loader, FP32_model, loss_fn)

Epoch 1
-------------------------------
loss: 2.311991  [   32/60000]
loss: 2.072658  [ 3232/60000]
loss: 1.539363  [ 6432/60000]
loss: 1.172183  [ 9632/60000]
loss: 1.100624  [12832/60000]
loss: 0.840749  [16032/60000]
loss: 0.732234  [19232/60000]
loss: 0.800273  [22432/60000]
loss: 0.615576  [25632/60000]
loss: 0.533384  [28832/60000]
loss: 0.636276  [32032/60000]
loss: 0.561942  [35232/60000]
loss: 0.689834  [38432/60000]
loss: 0.744495  [41632/60000]
loss: 0.396491  [44832/60000]
loss: 0.479610  [48032/60000]
loss: 0.587342  [51232/60000]
loss: 0.659354  [54432/60000]
loss: 0.517647  [57632/60000]
Test Error: 
 Accuracy: 80.3%, Avg loss: 0.000919 

Epoch 2
-------------------------------
loss: 0.471254  [   32/60000]
loss: 0.584544  [ 3232/60000]
loss: 0.388880  [ 6432/60000]
loss: 0.380573  [ 9632/60000]
loss: 0.674940  [12832/60000]
loss: 0.532947  [16032/60000]
loss: 0.902433  [19232/60000]
loss: 0.354916  [22432/60000]
loss: 0.645885  [25632/60000]
loss: 0.376863  [28832/60000

# **5.Post Training Quantization**

Use Pytorch setup -> use some input data to calibrate -> convert to quantize model

In [8]:
#Import quantization
from torch.ao.quantization import get_default_qconfig
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx
from torch.ao.quantization import QConfigMapping
import copy

In [9]:
model = copy.deepcopy(FP32_model) #copy FP32 model
model.eval()
model.cpu()

ToyModel(
  (nn1): Linear(in_features=784, out_features=120, bias=True)
  (nn2): Linear(in_features=120, out_features=84, bias=True)
  (nn3): Linear(in_features=84, out_features=10, bias=True)
)

Use Pytorch setup

In [10]:
#set quantization config
qconfig = get_default_qconfig('qnnpack')

qconfig_mapping = QConfigMapping().set_global(qconfig)

calibrate

In [11]:
example_inputs = (next(iter(train_loader))[0]) #to know model input data type
prepared_model = prepare_fx(model, qconfig_mapping, example_inputs)

  torch.has_cuda,
  torch.has_cudnn,
  torch.has_mps,
  torch.has_mkldnn,


In [12]:
def calibrate(model, device, data_loader):
  model.to(device)
  model.eval()
  with torch.no_grad():
    for x, y in data_loader:
      x, y = x.to(device), y.to(device) #device
      model(x)
calibrate(prepared_model, 'cpu', test_loader)

convert to quantized model

In [13]:
PTQ_model = convert_fx(prepared_model)

check quantized model

In [14]:
print(PTQ_model)

GraphModule(
  (nn1): QuantizedLinearReLU(in_features=784, out_features=120, scale=0.02781897969543934, zero_point=0, qscheme=torch.per_tensor_affine)
  (nn2): QuantizedLinearReLU(in_features=120, out_features=84, scale=0.03556470572948456, zero_point=0, qscheme=torch.per_tensor_affine)
  (nn3): QuantizedLinear(in_features=84, out_features=10, scale=0.1019989475607872, zero_point=113, qscheme=torch.per_tensor_affine)
)



def forward(self, x):
    _input_scale_0 = self._input_scale_0
    _input_zero_point_0 = self._input_zero_point_0
    quantize_per_tensor = torch.quantize_per_tensor(x, _input_scale_0, _input_zero_point_0, torch.quint8);  x = _input_scale_0 = _input_zero_point_0 = None
    view = quantize_per_tensor.view(-1, 784);  quantize_per_tensor = None
    nn1 = self.nn1(view);  view = None
    nn2 = self.nn2(nn1);  nn1 = None
    nn3 = self.nn3(nn2);  nn2 = None
    dequantize_4 = nn3.dequantize();  nn3 = None
    return dequantize_4
    
# To see more debug info, please use `g

# **6.Quantization Aware Training**

Use Pytorch setup -> use input data to fine-tune model with fake quantize layer -> convert to quantize model

In [15]:
model = copy.deepcopy(FP32_model)

Use Pytorch setup

In [16]:
model.qconfig = torch.ao.quantization.get_default_qconfig('qnnpack')
qconfig_mapping = QConfigMapping().set_global(qconfig)

example_inputs = (next(iter(train_loader))[0]) #to know model input data type
prepared_model = torch.ao.quantization.quantize_fx.prepare_qat_fx(model, qconfig_mapping, example_inputs) # prepare to quantize model (fuse module (ex:CONV+BN+RELU...)，insert observer)
prepared_model.train()
prepared_model.to(device)

GraphModule(
  (activation_post_process_0): HistogramObserver(min_val=inf, max_val=-inf)
  (activation_post_process_1): HistogramObserver(min_val=inf, max_val=-inf)
  (nn1): LinearReLU(
    in_features=784, out_features=120, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (activation_post_process_2): HistogramObserver(min_val=inf, max_val=-inf)
  (nn2): LinearReLU(
    in_features=120, out_features=84, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (activation_post_process_3): HistogramObserver(min_val=inf, max_val=-inf)
  (nn3): Linear(
    in_features=84, out_features=10, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (activation_post_process_4): HistogramObserver(min_val=inf, max_val=-inf)
)

Training fake quantize model

In [17]:
learning_rate = 1e-3
epochs = 1
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(prepared_model.parameters(), lr=learning_rate, momentum=0.9)

In [18]:
for epoch in range(epochs):
  print(f"Epoch {epoch+1}\n-------------------------------")
  train_loop(train_loader, prepared_model, loss_fn, optimizer)
  test_loop(test_loader, prepared_model, loss_fn)

Epoch 1
-------------------------------
loss: 0.440943  [   32/60000]
loss: 0.425902  [ 3232/60000]
loss: 0.624886  [ 6432/60000]
loss: 0.464961  [ 9632/60000]
loss: 0.463102  [12832/60000]
loss: 0.327730  [16032/60000]
loss: 0.335214  [19232/60000]
loss: 0.374603  [22432/60000]
loss: 0.446798  [25632/60000]
loss: 0.401708  [28832/60000]
loss: 0.564758  [32032/60000]
loss: 0.600982  [35232/60000]
loss: 0.378758  [38432/60000]
loss: 0.471951  [41632/60000]
loss: 0.425112  [44832/60000]
loss: 0.413184  [48032/60000]
loss: 0.594654  [51232/60000]
loss: 0.248248  [54432/60000]
loss: 0.406001  [57632/60000]
Test Error: 
 Accuracy: 84.6%, Avg loss: 0.000642 



In [19]:
prepared_model.cpu()
prepared_model.eval()

GraphModule(
  (activation_post_process_0): HistogramObserver(min_val=-1.0, max_val=1.0)
  (activation_post_process_1): HistogramObserver(min_val=-1.0, max_val=1.0)
  (nn1): LinearReLU(
    in_features=784, out_features=120, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=-0.1149161159992218, max_val=0.11076779663562775)
  )
  (activation_post_process_2): HistogramObserver(min_val=0.0, max_val=7.873921871185303)
  (nn2): LinearReLU(
    in_features=120, out_features=84, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=-0.19614410400390625, max_val=0.22086060047149658)
  )
  (activation_post_process_3): HistogramObserver(min_val=0.0, max_val=10.523351669311523)
  (nn3): Linear(
    in_features=84, out_features=10, bias=True
    (weight_fake_quant): MinMaxObserver(min_val=-0.3988783359527588, max_val=0.4632803201675415)
  )
  (activation_post_process_4): HistogramObserver(min_val=-13.051665306091309, max_val=15.669031143188477)
)

convert to quantized model

In [20]:
QAT_model = convert_fx(prepared_model) # convert the calibrated model to a quantized model

In [21]:
print(QAT_model)

GraphModule(
  (nn1): QuantizedLinearReLU(in_features=784, out_features=120, scale=0.027018360793590546, zero_point=0, qscheme=torch.per_tensor_affine)
  (nn2): QuantizedLinearReLU(in_features=120, out_features=84, scale=0.035908035933971405, zero_point=0, qscheme=torch.per_tensor_affine)
  (nn3): QuantizedLinear(in_features=84, out_features=10, scale=0.1013011708855629, zero_point=110, qscheme=torch.per_tensor_affine)
)



def forward(self, x):
    _input_scale_0 = self._input_scale_0
    _input_zero_point_0 = self._input_zero_point_0
    quantize_per_tensor = torch.quantize_per_tensor(x, _input_scale_0, _input_zero_point_0, torch.quint8);  x = _input_scale_0 = _input_zero_point_0 = None
    view = quantize_per_tensor.view(-1, 784);  quantize_per_tensor = None
    nn1 = self.nn1(view);  view = None
    nn2 = self.nn2(nn1);  nn1 = None
    nn3 = self.nn3(nn2);  nn2 = None
    dequantize_4 = nn3.dequantize();  nn3 = None
    return dequantize_4
    
# To see more debug info, please use 

# **7.Compare FP32、PTQ and QAT model**

In [22]:
def print_size_of_model(model):
    """ Print the size of the model.

    Args:
        model: model whose size needs to be determined

    """
    torch.save(model.state_dict(), "temp.p")
    print('Size of the model(MB):', os.path.getsize("temp.p")/1e6)
    os.remove('temp.p')

def compare(model, device, test_loader, quantize=False):
  model.to(device)
  model.eval()

  total = 0
  correct = 0
  with torch.no_grad():
    for data in test_loader:
      images, labels = data
      images, labels = images.to(device),labels.to(device)
      outputs = model(images)
      # the class with the highest energy is what we choose as prediction
      _, predicted = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()

  test_loss = 0

  print("========================================= PERFORMANCE =============================================")
  print_size_of_model(model)
  print('\nAccuracy: {}/{} ({:.0f}%)\n'.format( correct, total,100. * correct / total))

In [None]:
compare(model=FP32_model, device="cpu", test_loader=test_loader)

In [None]:
compare(model=PTQ_model, device="cpu", test_loader=test_loader)

In [None]:
compare(model=QAT_model, device="cpu", test_loader=test_loader)

# **8.Quantize yourself**

# 8.1 Quantize layer by layer

In [26]:
model = copy.deepcopy(FP32_model).to("cpu")
print(model)

ToyModel(
  (nn1): Linear(in_features=784, out_features=120, bias=True)
  (nn2): Linear(in_features=120, out_features=84, bias=True)
  (nn3): Linear(in_features=84, out_features=10, bias=True)
)


In [27]:
class QuantizedLinear(nn.Module):
  def __init__(self, in_features, out_features, weight, bias):
    super(QuantizedLinear, self).__init__()
    self.in_features = in_features
    self.out_features = out_features
    self.scale, self.zero_point = None, None
    self.weight = self._weight_quantize(weight)
    self.bias = bias

  def forward(self, x):
    x = torch.ops.quantized.matmul(x, self.weight.t(), self.scale, self.zero_point).dequantize() + self.bias
    output = torch.quantize_per_tensor(x, self.scale, self.zero_point, dtype=torch.qint8)
    return output

  def _weight_quantize(self, weight):
    q_min, q_max = -128, 127
    min_val, max_val = np.min(weight.detach().numpy()), np.max(weight.detach().numpy())

    s = (max_val - min_val) / (q_max - q_min)
    z = round(q_min - min_val / s)
    return torch.quantize_per_tensor(weight, s, z, dtype=torch.qint8)

  def _calibrate(self, x):
    x = x.dequantize()
    output = torch.matmul(x, self.weight.t().dequantize())
    q_min, q_max = -128, 127
    min_val, max_val = np.min(output.detach().numpy()), np.max(output.detach().numpy())
    self.scale = (max_val - min_val) / (q_max - q_min)
    self.zero_point = round(q_min - min_val / self.scale)


  def __repr__(self):
    return f'QuantizedLinear(in_features={self.in_features}, out_features={self.out_features}, scale={self.scale}, zero_point={self.zero_point})'

class QuantizedLinearReLU(nn.Module):
  def __init__(self, in_features, out_features, weight, bias):
    super(QuantizedLinearReLU, self).__init__()
    self.in_features = in_features
    self.out_features = out_features
    self.scale, self.zero_point = None, None
    self.weight = self._weight_quantize(weight)
    self.bias = bias

  def forward(self, x):
    x = torch.ops.quantized.matmul(x, self.weight.t(), self.scale, self.zero_point).dequantize() + self.bias
    output = torch.quantize_per_tensor(x, self.scale, self.zero_point, dtype=torch.qint8)
    output = F.relu(output)
    return output

  def _weight_quantize(self, weight):
    q_min, q_max = -128, 127
    min_val, max_val = np.min(weight.detach().numpy()), np.max(weight.detach().numpy())

    s = (max_val - min_val) / (q_max - q_min)
    z = round(q_min - min_val / s)
    return torch.quantize_per_tensor(weight, s, z, dtype=torch.qint8)

  def _calibrate(self, x):
    x = x.dequantize()
    output = F.relu(torch.matmul(x, self.weight.t().dequantize()))
    q_min, q_max = -128, 127
    min_val, max_val = np.min(output.detach().numpy()), np.max(output.detach().numpy())
    self.scale = (max_val - min_val) / (q_max - q_min)
    self.zero_point = round(q_min - min_val / self.scale)

  def __repr__(self):
    return f'QuantizedLinearReLU(in_features={self.in_features}, out_features={self.out_features}, scale={self.scale}, zero_point={self.zero_point})'

In [28]:
class QuantizedModel(nn.Module):
  def __init__(self, model):
    super(QuantizedModel, self).__init__()
    self.weight_dic = []
    self.bias_dic = []
    self.scale, self.zero_point = None, None  #scale and zero point of input layer
    self._get_weight()
    self.nn1 = QuantizedLinearReLU(in_features=28*28, out_features=120, weight=self.weight_dic[0], bias=self.bias_dic[0])
    self.nn2 = QuantizedLinearReLU(in_features=120, out_features=84, weight=self.weight_dic[1], bias=self.bias_dic[1])
    self.nn3 = QuantizedLinear(in_features=84, out_features=10, weight=self.weight_dic[2], bias=self.bias_dic[2])

  def forward(self, x):
    x = x.view(-1, 28 * 28)
    x = torch.quantize_per_tensor(x, self.scale, self.zero_point, dtype=torch.qint8)
    x = self.nn1(x)
    x = self.nn2(x)
    x = self.nn3(x)
    x = x.dequantize()
    return x

  def _get_weight(self):
    for name, paras in model.named_parameters():
      if "weight" in name:
        self.weight_dic.append(paras)
      elif "bias" in name:
        self.bias_dic.append(paras)

  def _calibrate(self, input):
    self.scale = (np.max(input.detach().numpy()) - np.min(input.detach().numpy())) / 256
    self.zero_point = round(np.min(input.detach().numpy())/self.scale)
    input = input.view(-1, 28*28)
    input = torch.quantize_per_tensor(input, self.scale, self.zero_point, dtype=torch.qint8)

    self.nn1._calibrate(input)
    input = self.nn1(input)

    self.nn2._calibrate(input)
    input = self.nn2(input)

    self.nn3._calibrate(input)

In [29]:
test_model = QuantizedModel(model)
print(test_model)

QuantizedModel(
  (nn1): QuantizedLinearReLU(in_features=784, out_features=120, scale=None, zero_point=None)
  (nn2): QuantizedLinearReLU(in_features=120, out_features=84, scale=None, zero_point=None)
  (nn3): QuantizedLinear(in_features=84, out_features=10, scale=None, zero_point=None)
)


這邊需要注意的是，weight的s、z與activation的s、z是分開的，因此每個layer會有兩組(s, z)

In [30]:
#Calibrate to compute scale and zero point of model
for batch in train_loader:
  input, label = batch
  test_model._calibrate(input)
  break
print(test_model)

QuantizedModel(
  (nn1): QuantizedLinearReLU(in_features=784, out_features=120, scale=0.009455646253099628, zero_point=-128)
  (nn2): QuantizedLinearReLU(in_features=120, out_features=84, scale=0.007616113213931813, zero_point=-128)
  (nn3): QuantizedLinear(in_features=84, out_features=10, scale=0.02165715834673713, zero_point=-15)
)


# 8.2 Quantize all layer at the same time

In [31]:
model = copy.deepcopy(FP32_model).to("cpu")
print(model)

ToyModel(
  (nn1): Linear(in_features=784, out_features=120, bias=True)
  (nn2): Linear(in_features=120, out_features=84, bias=True)
  (nn3): Linear(in_features=84, out_features=10, bias=True)
)


In [32]:
#Check name of all layer
train_nodes, eval_nodes = feature_extraction.get_graph_node_names(model)
print(train_nodes)

['x', 'view', 'nn1', 'relu', 'nn2', 'relu_1', 'nn3']


In [33]:
scale_dic = []
zero_dic = []

#Calibrate to compute s、z of all layer at the same time
for batch in train_loader:
  input, label = batch
  for node in train_nodes:
    if node == "x" or ("relu" in node) or node == "nn3":
      extractor = feature_extraction.create_feature_extractor(model, [node]).cpu()
      output = extractor(input)[node]
      q_min, q_max = -128, 127
      min_val, max_val = np.min(output.detach().numpy()), np.max(output.detach().numpy())
      scale = (max_val - min_val) / (q_max - q_min)
      zero = round(q_min - min_val / scale)
      scale_dic.append(scale)
      zero_dic.append(zero)
  break


print(scale_dic)
print(zero_dic)

[0.00784313725490196, 0.02774794522453757, 0.036900086496390545, 0.0955727969898897]
[0, -128, -128, -23]


In [34]:
class QuantizedLinear2(nn.Module):
  def __init__(self, in_features, out_features, weight, bias, scale, zero_point):
    super(QuantizedLinear2, self).__init__()
    self.in_features = in_features
    self.out_features = out_features
    self.scale, self.zero_point = scale, zero_point
    self.weight = self._weight_quantize(weight)
    self.bias = bias

  def forward(self, x):
    x = torch.ops.quantized.matmul(x, self.weight.t(), self.scale, self.zero_point).dequantize() + self.bias
    output = torch.quantize_per_tensor(x, self.scale, self.zero_point, dtype=torch.qint8)

    return output

  def _weight_quantize(self, weight):
    q_min, q_max = -128, 127
    min_val, max_val = np.min(weight.detach().numpy()), np.max(weight.detach().numpy())

    s = (max_val - min_val) / (q_max - q_min)
    z = round(q_min - min_val / s)
    return torch.quantize_per_tensor(weight, s, z, dtype=torch.qint8)

  def __repr__(self):
    return f'QuantizedLinear(in_features={self.in_features}, out_features={self.out_features}, scale={self.scale}, zero_point={self.zero_point})'

class QuantizedLinearReLU2(nn.Module):
  def __init__(self, in_features, out_features, weight, bias, scale, zero_point):
    super(QuantizedLinearReLU2, self).__init__()
    self.in_features = in_features
    self.out_features = out_features
    self.scale, self.zero_point = scale, zero_point
    self.weight = self._weight_quantize(weight)
    self.bias = bias

  def forward(self, x):
    x = torch.ops.quantized.matmul(x, self.weight.t(), self.scale, self.zero_point).dequantize() + self.bias
    output = torch.quantize_per_tensor(x, self.scale, self.zero_point, dtype=torch.qint8)
    output = F.relu(output)

    return output

  def _weight_quantize(self, weight):
    q_min, q_max = -128, 127
    min_val, max_val = np.min(weight.detach().numpy()), np.max(weight.detach().numpy())

    s = (max_val - min_val) / (q_max - q_min)
    z = round(q_min - min_val / s)
    return torch.quantize_per_tensor(weight, s, z, dtype=torch.qint8)

  def __repr__(self):
    return f'QuantizedLinearReLU(in_features={self.in_features}, out_features={self.out_features}, scale={self.scale}, zero_point={self.zero_point})'

In [35]:
class QuantizedModel2(nn.Module):
  def __init__(self, model, scale, zero_point):
    super(QuantizedModel2, self).__init__()
    self.weight_dic = []
    self.bias_dic = []
    self.scale, self.zero_point = scale, zero_point #scale and zero point of input layer

    self._get_weight()
    self.nn1 = QuantizedLinearReLU2(in_features=28*28, out_features=120, weight=self.weight_dic[0], bias=self.bias_dic[0], scale=self.scale[1], zero_point=self.zero_point[1])
    self.nn2 = QuantizedLinearReLU2(in_features=120, out_features=84, weight=self.weight_dic[1], bias=self.bias_dic[1], scale=self.scale[2], zero_point=self.zero_point[2])
    self.nn3 = QuantizedLinear2(in_features=84, out_features=10, weight=self.weight_dic[2], bias=self.bias_dic[2], scale=self.scale[3], zero_point=self.zero_point[3])

  def forward(self, x):
    x = x.view(-1, 28 * 28)
    x = torch.quantize_per_tensor(x, self.scale[0], self.zero_point[0], dtype=torch.qint8)
    x = self.nn1(x)
    x = self.nn2(x)
    x = self.nn3(x)
    x = x.dequantize()
    return x

  def _get_weight(self):
    for name, paras in model.named_parameters():
      if "weight" in name:
        self.weight_dic.append(paras)
      elif "bias" in name:
        self.bias_dic.append(paras)

In [36]:
test_model2 = QuantizedModel2(model, scale=scale_dic, zero_point=zero_dic)

print(test_model2)

QuantizedModel2(
  (nn1): QuantizedLinearReLU(in_features=784, out_features=120, scale=0.02774794522453757, zero_point=-128)
  (nn2): QuantizedLinearReLU(in_features=120, out_features=84, scale=0.036900086496390545, zero_point=-128)
  (nn3): QuantizedLinear(in_features=84, out_features=10, scale=0.0955727969898897, zero_point=-23)
)


# 8.3 Compare

In [42]:
FP32_model.to("cpu")
for batch in train_loader:
  input, label = batch
  output1 = FP32_model(input)[0]    #FP32 model
  output2 = test_model(input)[0]    #Quantize layer by layer
  output3 = test_model2(input)[0]    #Quantize at the same time
  print(output1)
  print(output2)
  print(output3)
  break

tensor([-3.1556, -0.4147, -2.7360, -3.8460, -2.4293,  4.8535, -3.8508,  9.1655,
         1.3757,  0.6913], grad_fn=<SelectBackward0>)
tensor([-0.1299, -0.0217, -0.3465, -0.4115, -0.4548,  0.1949, -0.5631,  0.8879,
         0.2599,  0.5198])
tensor([-3.1539, -0.3823, -2.7716, -3.8229, -2.4849,  4.8742, -3.7273,  8.9838,
         1.3380,  0.6690])


In [None]:
#Quantize layer by layer
total = 0
correct = 0
with torch.no_grad():
  for data in test_loader:
    images, labels = data
    outputs = test_model(images)
    # the class with the highest energy is what we choose as prediction
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted == labels).sum().item()

test_loss = 0

print("========================================= PERFORMANCE =============================================")
print('\nAccuracy: {}/{} ({:.0f}%)\n'.format( correct, total,100. * correct / total))

In [None]:
#Quantize at the same time
total = 0
correct = 0
with torch.no_grad():
  for data in test_loader:
    images, labels = data
    outputs = test_model2(images)
    # the class with the highest energy is what we choose as prediction
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted == labels).sum().item()

test_loss = 0

print("========================================= PERFORMANCE =============================================")
print('\nAccuracy: {}/{} ({:.0f}%)\n'.format( correct, total,100. * correct / total))

# 8.4 Compute MSE of output of each layer

In [None]:
train_nodes, eval_nodes = feature_extraction.get_graph_node_names(test_model)
print(train_nodes)
train_nodes, eval_nodes = feature_extraction.get_graph_node_names(test_model2)
print(train_nodes)

In [None]:
#Compare MSE of 2 different methods

for batch in train_loader:
  input, label = batch
  for train_node in train_nodes:
    if train_node == "quantize_per_tensor" or "relu" in train_node or train_node == "dequantize":
      extractor1 = feature_extraction.create_feature_extractor(test_model, [train_node]).cpu()
      output1 = extractor1(input)[train_node]
      extractor2 = feature_extraction.create_feature_extractor(test_model2, [train_node]).cpu()
      output2 = extractor2(input)[train_node]
      mse = F.mse_loss(output1.dequantize(), output2.dequantize())
      print(f'MSE of layer {train_node} is {mse}')
  break