Optimizing LPR_Net With MLC and Model Optimizations

First, let's test the baseline model. We will be using CPU for all demonstrative purposes due to the lack of compute resources on GPU.

## Import Test Dataset

In [1]:
import os

if not os.path.exists('test_data'):
    !unzip test_data.zip

## Import Libraries

In [2]:
!pip install onnx
!pip install onnxscript
!pip install onnxruntime



## Import Libraries Needed

In [3]:
from imutils import paths
import numpy as np
import random
import cv2
import os
import torch
import torch.nn as nn
from torch.utils.data import *
import time
from torch.autograd import Variable
from types import SimpleNamespace
from PIL import Image, ImageDraw, ImageFont

## Define Seeds

In [4]:
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)

## Model Structure

Due to issues with importing the MaxPool3D layer to ONNX, I needed to manually squeeze and unsqueeze those layers.

In [5]:
class small_basic_block(nn.Module):
    def __init__(self, ch_in, ch_out):
        super(small_basic_block, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(ch_in, ch_out // 4, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(ch_out // 4, ch_out // 4, kernel_size=(3, 1), padding=(1, 0)),
            nn.ReLU(),
            nn.Conv2d(ch_out // 4, ch_out // 4, kernel_size=(1, 3), padding=(0, 1)),
            nn.ReLU(),
            nn.Conv2d(ch_out // 4, ch_out, kernel_size=1),
        )
    def forward(self, x):
        return self.block(x)

class LPRNet(nn.Module):
    def __init__(self, lpr_max_len, phase, class_num, dropout_rate):
        super(LPRNet, self).__init__()
        self.phase = phase
        self.lpr_max_len = lpr_max_len
        self.class_num = class_num
        self.backbone = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1), # 0
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(),  # 2
            nn.MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 1, 1)),
            small_basic_block(ch_in=64, ch_out=128),    # *** 4 ***
            nn.BatchNorm2d(num_features=128),
            nn.ReLU(),  # 6
            nn.MaxPool3d(kernel_size=(1, 3, 3), stride=(2, 1, 2)),
            small_basic_block(ch_in=64, ch_out=256),   # 8
            nn.BatchNorm2d(num_features=256),
            nn.ReLU(),  # 10
            small_basic_block(ch_in=256, ch_out=256),   # *** 11 ***
            nn.BatchNorm2d(num_features=256),   # 12
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 3, 3), stride=(4, 1, 2)),  # 14
            nn.Dropout(dropout_rate),
            nn.Conv2d(in_channels=64, out_channels=256, kernel_size=(1, 4), stride=1),  # 16
            nn.BatchNorm2d(num_features=256),
            nn.ReLU(),  # 18
            nn.Dropout(dropout_rate),
            nn.Conv2d(in_channels=256, out_channels=class_num, kernel_size=(13, 1), stride=1), # 20
            nn.BatchNorm2d(num_features=class_num),
            nn.ReLU(),  # *** 22 ***
        )
        self.container = nn.Sequential(
            nn.Conv2d(in_channels=448+self.class_num, out_channels=self.class_num, kernel_size=(1, 1), stride=(1, 1)),
            # nn.BatchNorm2d(num_features=self.class_num),
            # nn.ReLU(),
            # nn.Conv2d(in_channels=self.class_num, out_channels=self.lpr_max_len+1, kernel_size=3, stride=2),
            # nn.ReLU(),
        )

    def forward(self, x):
        keep_features = list()
        for i, layer in enumerate(self.backbone.children()):
            # x = layer(x)
            if i == 3 or i == 7 or i == 14:
                x = x.unsqueeze(1)
                x = layer(x)
                x = x.squeeze(1)
            else:
                x = layer(x)
            if i in [2, 6, 13, 22]: # [2, 4, 8, 11, 22]
                keep_features.append(x)

        global_context = list()
        for i, f in enumerate(keep_features):
            if i in [0, 1]:
                f = nn.AvgPool2d(kernel_size=5, stride=5)(f)
            if i in [2]:
                f = nn.AvgPool2d(kernel_size=(4, 10), stride=(4, 2))(f)
            f_pow = torch.pow(f, 2)
            f_mean = torch.mean(f_pow)
            f = torch.div(f, f_mean)
            global_context.append(f)

        x = torch.cat(global_context, 1)
        x = self.container(x)
        logits = torch.mean(x, dim=2)

        return logits

def build_lprnet(lpr_max_len=8, phase=False, class_num=66, dropout_rate=0.5):

    Net = LPRNet(lpr_max_len, phase, class_num, dropout_rate)

    if phase == "train":
        return Net.train()
    else:
        return Net.eval()

## Data Loader

In [6]:
CHARS = ['京', '沪', '津', '渝', '冀', '晋', '蒙', '辽', '吉', '黑',
         '苏', '浙', '皖', '闽', '赣', '鲁', '豫', '鄂', '湘', '粤',
         '桂', '琼', '川', '贵', '云', '藏', '陕', '甘', '青', '宁',
         '新',
         '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
         'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K',
         'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
         'W', 'X', 'Y', 'Z', 'I', 'O', '-'
         ]

CHARS_DICT = {char:i for i, char in enumerate(CHARS)}

class LPRDataLoader(Dataset):
    def __init__(self, img_dir, imgSize, lpr_max_len, PreprocFun=None):
        self.img_dir = img_dir
        self.img_paths = []
        for i in range(len(img_dir)):
            self.img_paths += [el for el in paths.list_images(img_dir[i])]
        random.shuffle(self.img_paths)
        self.img_size = imgSize
        self.lpr_max_len = lpr_max_len
        if PreprocFun is not None:
            self.PreprocFun = PreprocFun
        else:
            self.PreprocFun = self.transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        filename = self.img_paths[index]
        Image = cv2.imread(filename)
        height, width, _ = Image.shape
        if height != self.img_size[1] or width != self.img_size[0]:
            Image = cv2.resize(Image, self.img_size)
        Image = self.PreprocFun(Image)

        basename = os.path.basename(filename)
        imgname, suffix = os.path.splitext(basename)
        imgname = imgname.split("-")[0].split("_")[0]
        label = list()
        for c in imgname:
            # one_hot_base = np.zeros(len(CHARS))
            # one_hot_base[CHARS_DICT[c]] = 1
            label.append(CHARS_DICT[c])

        if len(label) == 8:
            if self.check(label) == False:
                print(imgname)
                assert 0, "Error label ^~^!!!"

        return Image, label, len(label)

    def transform(self, img):
        img = img.astype('float32')
        img -= 127.5
        img *= 0.0078125
        img = np.transpose(img, (2, 0, 1))

        return img

    def check(self, label):
        if label[2] != CHARS_DICT['D'] and label[2] != CHARS_DICT['F'] \
                and label[-1] != CHARS_DICT['D'] and label[-1] != CHARS_DICT['F']:
            print("Error label, Please check!")
            return False
        else:
            return True

def collate_fn(batch):
    imgs = []
    labels = []
    lengths = []
    for _, sample in enumerate(batch):
        img, label, length = sample
        imgs.append(torch.from_numpy(img))
        labels.extend(label)
        lengths.append(length)
    labels = np.asarray(labels).flatten().astype(np.float32)

    return (torch.stack(imgs, 0), torch.from_numpy(labels), lengths)

In [7]:
def test():

    lprnet = build_lprnet(lpr_max_len=args.lpr_max_len, phase=args.phase_train, class_num=len(CHARS), dropout_rate=args.dropout_rate)
    device = torch.device("cpu")
    lprnet.to(device)
    print("Successful to build network!")

    # load pretrained model
    if args.pretrained_model:
        lprnet.load_state_dict(torch.load(args.pretrained_model, map_location=torch.device('cpu')))
        print("load pretrained model successful!")
    else:
        print("[Error] Can't found pretrained mode, please check!")
        return False

    test_img_dirs = os.path.expanduser(args.test_img_dirs)
    test_dataset = LPRDataLoader(test_img_dirs.split(','), args.img_size, args.lpr_max_len)
    try:
        Greedy_Decode_Eval(lprnet, test_dataset, args)
    finally:
        cv2.destroyAllWindows()

def Greedy_Decode_Eval(Net, datasets, args):
    # TestNet = Net.eval()
    epoch_size = len(datasets) // args.test_batch_size
    batch_iterator = iter(DataLoader(datasets, args.test_batch_size, shuffle=True, num_workers=args.num_workers, collate_fn=collate_fn))

    Tp = 0
    Tn_1 = 0
    Tn_2 = 0
    t1 = time.time()
    for i in range(epoch_size):
        # load train data
        images, labels, lengths = next(batch_iterator)
        start = 0
        targets = []
        for length in lengths:
            label = labels[start:start+length]
            targets.append(label)
            start += length
        targets = np.array([el.numpy() for el in targets])
        imgs = images.numpy().copy()

        if args.cuda:
            images = Variable(images.cuda())
        else:
            images = Variable(images)

        # forward
        prebs = Net(images)
        # greedy decode
        prebs = prebs.cpu().detach().numpy()
        preb_labels = list()
        for i in range(prebs.shape[0]):
            preb = prebs[i, :, :]
            preb_label = list()
            for j in range(preb.shape[1]):
                preb_label.append(np.argmax(preb[:, j], axis=0))
            no_repeat_blank_label = list()
            pre_c = preb_label[0]
            if pre_c != len(CHARS) - 1:
                no_repeat_blank_label.append(pre_c)
            for c in preb_label: # dropout repeate label and blank label
                if (pre_c == c) or (c == len(CHARS) - 1):
                    if c == len(CHARS) - 1:
                        pre_c = c
                    continue
                no_repeat_blank_label.append(c)
                pre_c = c
            preb_labels.append(no_repeat_blank_label)
        for i, label in enumerate(preb_labels):
            # show image and its predict label
            if args.show:
                show(imgs[i], label, targets[i])
            if len(label) != len(targets[i]):
                Tn_1 += 1
                continue
            if (np.asarray(targets[i]) == np.asarray(label)).all():
                Tp += 1
            else:
                Tn_2 += 1
    Acc = Tp * 1.0 / (Tp + Tn_1 + Tn_2)
    print("[Info] Test Accuracy: {} [{}:{}:{}:{}]".format(Acc, Tp, Tn_1, Tn_2, (Tp+Tn_1+Tn_2)))
    t2 = time.time()
    print("[Info] Test Speed: {}s 1/{}]".format((t2 - t1) / len(datasets), len(datasets)))
    return Acc

def show(img, label, target):
    img = np.transpose(img, (1, 2, 0))
    img *= 128.
    img += 127.5
    img = img.astype(np.uint8)

    lb = ""
    for i in label:
        lb += CHARS[i]
    tg = ""
    for j in target.tolist():
        tg += CHARS[int(j)]

    flag = "F"
    if lb == tg:
        flag = "T"
    img = cv2ImgAddText(img, lb, (0, 0))
    cv2.imshow("test", img)
    print("target: ", tg, " ### {} ### ".format(flag), "predict: ", lb)
    cv2.waitKey()
    cv2.destroyAllWindows()

def cv2ImgAddText(img, text, pos, textColor=(255, 0, 0), textSize=12):
    if (isinstance(img, np.ndarray)):  # detect opencv format or not
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img)
    fontText = ImageFont.truetype("data/NotoSansCJK-Regular.ttc", textSize, encoding="utf-8")
    draw.text(pos, text, textColor, font=fontText)

    return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

## Define Arguments

In [8]:
args = SimpleNamespace(**{
    "img_size": [94, 24],
    "test_img_dirs": "./test_data",
    "dropout_rate": 0,
    "lpr_max_len": 8,
    "test_batch_size": 100,
    "phase_train": False,
    "num_workers": 0,
    "cuda": False, # test on CPU
    "show": False,
    "pretrained_model": "./Final_LPRNet_model.pth",
})

Let's calculate the PyTorch baseline model's accuracy and inference speed per image.

In [9]:
test()

Successful to build network!
load pretrained model successful!


  lprnet.load_state_dict(torch.load(args.pretrained_model, map_location=torch.device('cpu')))


[Info] Test Accuracy: 0.899 [899:59:42:1000]
[Info] Test Speed: 0.21223733949661255s 1/1000]


## Build LPR Net Structure

In [10]:
def get_model_size(model, filename):
  dummy_input = torch.randn(1, 3, 24, 94)
  torch.onnx.export(
      lprnet,
      dummy_input,
      filename,
      input_names=["input"],
      output_names=["output"],
      dynamic_axes={
          "input": {0: "batch_size"},
          "output": {0: "batch_size"},
      },
  )
  size_in_bytes = os.path.getsize(filename)
  size_in_mb = size_in_bytes / (1024 * 1024)
  return size_in_mb

def get_full_model_size_pytorch(model):
    torch.save(model, "full_model.pth")
    size = os.path.getsize("full_model.pth")
    os.remove('full_model.pth')
    return size / 1e6  # Size in MB

In [11]:
lprnet = build_lprnet(lpr_max_len=args.lpr_max_len, phase=args.phase_train, class_num=len(CHARS), dropout_rate=args.dropout_rate)
lprnet.load_state_dict(torch.load(args.pretrained_model, map_location=torch.device('cpu')))
print("Model Size: " + str(get_model_size(lprnet, "lprnet.onnx")) + " MB")


  lprnet.load_state_dict(torch.load(args.pretrained_model, map_location=torch.device('cpu')))


Model Size: 1.7072572708129883 MB


As we can see, we were able to obtain a test accuracy of 89.9%, an inference speed of 183 ms, and a model size of 1.71 MB.

In [12]:
print("Model Size: " + str(get_full_model_size_pytorch(lprnet)) + " MB")

Model Size: 1.82749 MB


We will also keep track of the PyTorch saving method, as it will be a useful comparison. And with this method the baseline model size is around 1.83 MB.

## Pruning Optimizations

Let's trying doing a filter-based pruning optimization.

In [13]:
def evaluate(model):
  test_img_dirs = os.path.expanduser(args.test_img_dirs)
  test_dataset = LPRDataLoader(test_img_dirs.split(','), args.img_size, args.lpr_max_len)
  try:
      Acc = Greedy_Decode_Eval(model, test_dataset, args)
  finally:
      cv2.destroyAllWindows()
  return Acc

In [14]:
import copy
lprnet_copy = copy.deepcopy(lprnet)
print(f"LPRNet Model is on device: {next(lprnet_copy.parameters()).device}")

LPRNet Model is on device: cpu


In [15]:
def recover_model(model):
    state_dict = torch.load(args.pretrained_model, map_location=torch.device('cpu'))
    model.load_state_dict(state_dict)
    return model

In [16]:
def filter_prune(tensor: nn.Conv2d, sparsity: float) -> torch.Tensor:
    """
    Filter-based pruning for convolutional layers
    :param tensor: torch.(cuda.)Tensor, weight of conv layer
    :param sparsity: float, pruning sparsity
        sparsity = #zeros / #elements = 1 - #nonzeros / #elements
    :return:
        torch.(cuda.)Tensor, mask with zeroed indices
    """
    num_filters = tensor.weight.size(0)
    num_prune = int(num_filters * sparsity)

    # L1-norm for each filter
    flattened_tensor = tensor.weight.view(num_filters, -1)
    filter_norms = torch.norm(flattened_tensor, p=1, dim=1)

    # Prune filters with smallest L1-norm
    prune_indices = torch.topk(filter_norms, num_prune, largest=False).indices

    # Mask to determine what filters to keep / not keep
    mask = torch.ones_like(filter_norms)
    mask[prune_indices] = 0

    # Apply mask to tensor weights
    mask = mask.view(-1, 1, 1, 1)
    tensor.weight.data.mul_(mask)

    return mask

class FilterPruner:
    def __init__(self, model, sparsity_dict):
        self.masks = FilterPruner.prune(model, sparsity_dict)

    @torch.no_grad()
    def apply(self, model):
        for name, param in model.named_parameters():
            if name in self.masks:
                param *= self.masks[name]

    @staticmethod
    @torch.no_grad()
    def prune(model, sparsity_dict):
        masks = dict()
        for name, module in model.named_modules():
            if isinstance(module, nn.Conv2d):
                if isinstance(sparsity_dict, dict):
                    masks[name] = filter_prune(module, sparsity_dict[name])
                else:
                    assert(0 <= sparsity_dict < 1)
                    if sparsity_dict > 0:
                        masks[name] = filter_prune(module, sparsity_dict)
        return masks


I orignally tried pruning up to 30% of the weight, but unfortunately it reduces the accuracy by a significant amount. As we have no efficient way of retraining the pruned model to gain back the lost accuracy, I decided to only prune the amount of weights that I could optimize, which ended up being only between 1% to 1.65%.

In [17]:
model_accuracies_filter = []
model_sizes_filter = []
best_pruned_model = None

for i in range(20, 34, 1): # (20, 34, 1)
  sparsity = i / 2000
  print("Sparsity factor: ", sparsity)
  lprnet_copy = recover_model(lprnet_copy)
  pruner = FilterPruner(lprnet_copy, sparsity)
  pruner.apply(lprnet_copy)
  sparse_model_accuracy = evaluate(lprnet_copy)
  sparse_model_size = get_model_size(lprnet_copy, "lprnet_model_prune_" + str(sparsity) + ".onnx")
  model_accuracies_filter.append(sparse_model_accuracy)
  model_sizes_filter.append(sparse_model_size)
  print("Model Size: " + str(sparse_model_size) + " MB")
  # As soon as the pruned model performs worse than 85%, I break from the for loop.
  if sparse_model_accuracy > 0.85:
    best_pruned_model = copy.deepcopy(lprnet_copy)
  else:
    break

Sparsity factor:  0.01


  state_dict = torch.load(args.pretrained_model, map_location=torch.device('cpu'))


[Info] Test Accuracy: 0.9 [900:59:41:1000]
[Info] Test Speed: 0.20528143215179442s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.0105
[Info] Test Accuracy: 0.9 [900:61:39:1000]
[Info] Test Speed: 0.20610330057144166s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.011
[Info] Test Accuracy: 0.9 [900:59:41:1000]
[Info] Test Speed: 0.20144831156730653s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.0115
[Info] Test Accuracy: 0.897 [897:61:42:1000]
[Info] Test Speed: 0.20327204990386963s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.012
[Info] Test Accuracy: 0.903 [903:58:39:1000]
[Info] Test Speed: 0.20221845483779907s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.0125
[Info] Test Accuracy: 0.902 [902:59:39:1000]
[Info] Test Speed: 0.2006907227039337s 1/1000]
Model Size: 1.7072572708129883 MB
Sparsity factor:  0.013
[Info] Test Accuracy: 0.897 [897:61:42:1000]
[Info] Test Speed: 0.20007882452011108s 1/1000]
Model

As we can see from the output above, the model size did not change significantly as pruning was taking place, which makes sense since I could only remove such a small amount of weights before the model performance was affected dramatically. I will choose the best model which can maximize the sparsity while not affecting the accuracy or speed as much. From the graph above, and the values outputted, I believe that a sparsity factor of 1.45% would be best as it generally keeps the size from 1.707 MB to 1.707 MB, accuracy from 90.1% to 89.9%, and inference time from 183.8 to 182.4. Although these changes are somewhat negligble, I was not able to do much better due to the lack of a provided dataset which would enable me to fine-tune the model after pruning the weights.

In [18]:
print("Size of full model saved using PyTorch: " + str(get_full_model_size_pytorch(best_pruned_model)) + " MB")

Size of full model saved using PyTorch: 1.82749 MB


Since the size did not change, I decided to save the model using PyTorch, which keeps the weights and architecture, and I found that the best pruned model saved with a size of 1.82 MB. Comparing this to the baseline, we can also see that the model size did not change, suggesting that little improvement was done with pruning without dramatically affecting the performance.

In [19]:
best_pruned_model_copy = copy.deepcopy(best_pruned_model)
best_pruned_model_copy

LPRNet(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 1, 1), padding=0, dilation=1, ceil_mode=False)
    (4): small_basic_block(
      (block): Sequential(
        (0): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1))
        (1): ReLU()
        (2): Conv2d(32, 32, kernel_size=(3, 1), stride=(1, 1), padding=(1, 0))
        (3): ReLU()
        (4): Conv2d(32, 32, kernel_size=(1, 3), stride=(1, 1), padding=(0, 1))
        (5): ReLU()
        (6): Conv2d(32, 128, kernel_size=(1, 1), stride=(1, 1))
      )
    )
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool3d(kernel_size=(1, 3, 3), stride=(2, 1, 2), padding=0, dilation=1, ceil_mode=False)
    (8): small_basic_block(
      (block): Sequential(
        (0): Conv2d(64, 64, 

Quantizations

After pruning, the next optimization that I chose to do was quantization. Specifically, I chose to apply post-training static quantization (PTQ) as I did not have a dataset to train the model after quantization. Nonetheless, we will apply this methodology to our model to see how it performs compared to pruning and the baseline itself.

In [20]:
evaluate(best_pruned_model_copy)

[Info] Test Accuracy: 0.898 [898:61:41:1000]
[Info] Test Speed: 0.19977479028701783s 1/1000]


0.898

Import Libraries

In [21]:
import torch
import torch.nn as nn
from torch.quantization import QuantStub, DeQuantStub
import torch.nn.functional as F

Import Test Dataset and Loader

In [22]:
test_img_dirs = os.path.expanduser(args.test_img_dirs)
test_dataset = LPRDataLoader(test_img_dirs.split(','), args.img_size, args.lpr_max_len)
test_loader = DataLoader(test_dataset, batch_size=args.test_batch_size, shuffle=False, num_workers=args.num_workers, collate_fn=collate_fn)

For our specific methodology, we will need to wrap the MaxPool3D layers to ensure that those layers are contiguous.

In [23]:
class EnsureContiguous(nn.Module):
    def forward(self, x):
        return x.contiguous()

def insert_quant_stubs(module):
    for name, submodule in module.named_children():
        if isinstance(submodule, nn.MaxPool3d):
            new_layer = nn.Sequential(
                DeQuantStub(),
                EnsureContiguous(),
                submodule,
                QuantStub()
            )
            setattr(module, name, new_layer)
        elif isinstance(submodule, nn.Sequential):
            insert_quant_stubs(submodule)
    return module

class LPRNetWithQuant(nn.Module):
    """
    A wrapper class to add quantization support to the LPRNet model.
    """
    def __init__(self, base_model):
        super().__init__()
        self.quant = QuantStub()
        self.dequant = DeQuantStub()
        self.model = insert_quant_stubs(base_model)

    def forward(self, x):
        x = self.quant(x)

        feature_maps = []
        for idx, layer in enumerate(self.model.backbone.children()):
            x = layer(x)
            if idx in [2, 6, 13, 22]:
                feature_maps.append(x)

        context_features = []
        for idx, fmap in enumerate(feature_maps):
            if idx in [0, 1]:
                fmap = nn.AvgPool2d(kernel_size=5, stride=5)(fmap)
            elif idx == 2:
                fmap = nn.AvgPool2d(kernel_size=(4, 10), stride=(4, 2))(fmap)

            fmap = self.dequant(fmap)
            normalized_fmap = fmap / torch.mean(torch.pow(fmap, 2))
            fmap = self.quant(normalized_fmap)
            context_features.append(fmap)

        x = torch.cat(context_features, dim=1)
        x = self.model.container(x)
        logits = torch.mean(x, dim=2)
        return self.dequant(logits).contiguous()

In [24]:
def prepare_model_for_quantization(base_model):
    """
    Prepares the model for post-training quantization.
    """
    quant_ready_model = LPRNetWithQuant(base_model)
    quant_ready_model.eval()
    return quant_ready_model

def perform_calibration(model, data_loader, max_batches=100):
    device = next(model.parameters()).device
    with torch.no_grad():
        for batch_idx, (images, _, _) in enumerate(data_loader):
            if batch_idx >= max_batches:
                break
            print(f"Calibrating batch {batch_idx + 1}/{max_batches}")
            images = images.to(device)
            model(images)

# Configure quantization
torch.backends.quantized.engine = 'fbgemm' # x86 architecture
best_pruned_model_copy = prepare_model_for_quantization(best_pruned_model_copy)
best_pruned_model_copy.qconfig = torch.quantization.get_default_qconfig('fbgemm')

# Prepare and calibrate
torch.quantization.prepare(best_pruned_model_copy, inplace=True)
perform_calibration(best_pruned_model_copy, test_loader, max_batches=10)

# Convert to quantized model
torch.quantization.convert(best_pruned_model_copy, inplace=True)

print("Quantization complete!")

Calibrating batch 1/10




Calibrating batch 2/10
Calibrating batch 3/10
Calibrating batch 4/10
Calibrating batch 5/10
Calibrating batch 6/10
Calibrating batch 7/10
Calibrating batch 8/10
Calibrating batch 9/10
Calibrating batch 10/10
Quantization complete!


In [25]:
pruned_quantized_model = copy.deepcopy(best_pruned_model_copy)
evaluate(pruned_quantized_model)
print("Size of pruned and quantized model: " + str(get_model_size(pruned_quantized_model, "lprnet_model_prune_" + str(sparsity) + "_quantized.onnx")) + " MB")

[Info] Test Accuracy: 0.723 [723:164:113:1000]
[Info] Test Speed: 0.026481295347213746s 1/1000]
Size of pruned and quantized model: 1.7072572708129883 MB


The test accuracy reduced fairly significantly from 90% to 71.4% with quantization, but the inference speed also improved dramatically from 188.4 ms to 28.8 ms. Unfortunately, the model size does not change when I save it to an ONNX format, so let's trying saving it using PyTorch

In [26]:
print("Size of full model saved using PyTorch: " + str(get_full_model_size_pytorch(pruned_quantized_model)) + " MB")

Size of full model saved using PyTorch: 0.536304 MB


From saving using PyTorch, which captures the weights and architecture but in a different format, I got a model size of 0.54 MB, which is significantly smaller than the 1.83 MB from pruning.

In [27]:
pruned_quantized_model

LPRNetWithQuant(
  (quant): Quantize(scale=tensor([0.3139]), zero_point=tensor([3]), dtype=torch.quint8)
  (dequant): DeQuantize()
  (model): LPRNet(
    (backbone): Sequential(
      (0): QuantizedConv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.3160325884819031, zero_point=56)
      (1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Sequential(
        (0): DeQuantize()
        (1): EnsureContiguous()
        (2): MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 1, 1), padding=0, dilation=1, ceil_mode=False)
        (3): Quantize(scale=tensor([0.1786]), zero_point=tensor([0]), dtype=torch.quint8)
      )
      (4): small_basic_block(
        (block): Sequential(
          (0): QuantizedConv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), scale=0.42272183299064636, zero_point=71)
          (1): ReLU()
          (2): QuantizedConv2d(32, 32, kernel_size=(3, 1), stride=(1, 1), scale=0.8551036715507507, zero_point=69,

## TVM Optimizations

Import Apache TVM

In [28]:
!pip install apache-tvm



In [29]:
import tvm
from tvm import relay
import onnx
from tvm.contrib import graph_executor

# Load the ONNX model and Convert to TVM
onnx_model = onnx.load("lprnet_model_prune_0.0145_quantized.onnx")

mod, params = relay.frontend.from_onnx(onnx_model, shape={"input": (100, 3, 24, 94)})

print(mod)

def @main(%input: Tensor[(100, 3, 24, 94), float32] /* ty=Tensor[(100, 3, 24, 94), float32] */) -> Tensor[(100, 68, 18), float32] {
  %0 = nn.conv2d(%input, meta[relay.Constant][0] /* ty=Tensor[(64, 3, 3, 3), float32] */, padding=[0, 0, 0, 0], channels=64, kernel_size=[3, 3]) /* ty=Tensor[(100, 64, 22, 92), float32] */;
  %1 = nn.bias_add(%0, meta[relay.Constant][1] /* ty=Tensor[(64), float32] */) /* ty=Tensor[(100, 64, 22, 92), float32] */;
  %2 = nn.relu(%1) /* ty=Tensor[(100, 64, 22, 92), float32] */;
  %3 = nn.avg_pool2d(%2, pool_size=[5, 5], strides=[5, 5], padding=[0, 0, 0, 0], count_include_pad=True) /* ty=Tensor[(100, 64, 4, 18), float32] */;
  %4 = power(%3, 2f /* ty=float32 */) /* ty=Tensor[(100, 64, 4, 18), float32] */;
  %5 = mean(%4, axis=[0, 1, 2, 3]) /* ty=float32 */;
  %6 = expand_dims(%2, axis=1) /* ty=Tensor[(100, 1, 64, 22, 92), float32] */;
  %7 = nn.max_pool3d(%6, pool_size=[1, 3, 3], padding=[0, 0, 0, 0, 0, 0]) /* ty=Tensor[(100, 1, 64, 20, 90), float32] */;
  %8 

In [30]:
target = tvm.target.Target("llvm", host="llvm")
dev = tvm.cpu(0)

In [31]:
def build_module(mod):
    with tvm.transform.PassContext(opt_level=3):
        lib = relay.build(mod, target=target, params=params)

    dtype = "float32"
    module = graph_executor.GraphModule(lib["default"](dev))

    return module

In [32]:
def Greedy_Decode_Eval_TVM(module, datasets, args):
    epoch_size = len(datasets) // args.test_batch_size
    batch_iterator = iter(DataLoader(datasets, args.test_batch_size, shuffle=True, num_workers=args.num_workers, collate_fn=collate_fn))

    Tp = 0
    Tn_1 = 0
    Tn_2 = 0
    t1 = time.time()

    for i in range(epoch_size):
        # Load batch data
        images, labels, lengths = next(batch_iterator)
        start = 0
        targets = []
        for length in lengths:
            label = labels[start:start + length]
            targets.append(label)
            start += length
        targets = np.array([el.numpy() for el in targets])
        imgs = images.numpy().copy()

        # Set TVM inputs and run inference
        module.set_input("input", tvm.nd.array(images.numpy()))
        module.run()
        prebs = module.get_output(0).asnumpy()

        # Greedy decode
        preb_labels = list()
        for i in range(prebs.shape[0]):
            preb = prebs[i, :, :]
            preb_label = list()
            for j in range(preb.shape[1]):
                preb_label.append(np.argmax(preb[:, j], axis=0))
            no_repeat_blank_label = list()
            pre_c = preb_label[0]
            if pre_c != len(CHARS) - 1:
                no_repeat_blank_label.append(pre_c)
            for c in preb_label:
                if (pre_c == c) or (c == len(CHARS) - 1):
                    if c == len(CHARS) - 1:
                        pre_c = c
                    continue
                no_repeat_blank_label.append(c)
                pre_c = c
            preb_labels.append(no_repeat_blank_label)

        # Evaluate accuracy
        for i, label in enumerate(preb_labels):
            if args.show:
                show(imgs[i], label, targets[i])
            if len(label) != len(targets[i]):
                Tn_1 += 1
                continue
            if (np.asarray(targets[i]) == np.asarray(label)).all():
                Tp += 1
            else:
                Tn_2 += 1

    Acc = Tp * 1.0 / (Tp + Tn_1 + Tn_2)
    print("[Info] Test Accuracy: {} [{}:{}:{}:{}]".format(Acc, Tp, Tn_1, Tn_2, (Tp + Tn_1 + Tn_2)))
    t2 = time.time()
    print("[Info] Test Speed: {}s 1/{}]".format((t2 - t1) / len(datasets), len(datasets)))

In [33]:
mod_base = build_module(mod)
Greedy_Decode_Eval_TVM(mod_base, test_dataset, args)



[Info] Test Accuracy: 0.897 [897:62:41:1000]
[Info] Test Speed: 0.043021236419677734s 1/1000]


As we can see, it seems like the accuracy loss has recovered after importing to TVM, going from 71.7% to 89.9%. Moreover, the speed has also remained consistent although slightly slower, from 28.9 ms to 45.8 ms.

Specific TVM Optimizations

TVM Quantization

In [34]:
mod_copy = copy.deepcopy(mod)

In [39]:
from tvm import relay
from tvm.contrib import graph_executor

with relay.quantize.qconfig(
    calibrate_mode="kl_divergence",
    global_scale=8.0,
):
    mod_optimized = relay.quantize.quantize(mod_copy, params)

TypeError: Traceback (most recent call last):
  7: TVMFuncCall
  6: tvm::runtime::PackedFuncObj::Extractor<tvm::runtime::PackedFuncSubObj<tvm::runtime::TypedPackedFunc<tvm::IRModule (tvm::transform::Pass, tvm::IRModule)>::AssignTypedLambda<tvm::transform::{lambda(tvm::transform::Pass, tvm::IRModule)#7}>(tvm::transform::{lambda(tvm::transform::Pass, tvm::IRModule)#7}, std::string)::{lambda(tvm::runtime::TVMArgs const&, tvm::runtime::TVMRetValue*)#1}> >::Call(tvm::runtime::PackedFuncObj const*, tvm::runtime::TVMArgs, tvm::runtime::TVMRetValue*)
  5: tvm::transform::Pass::operator()(tvm::IRModule) const
  4: tvm::transform::Pass::operator()(tvm::IRModule, tvm::transform::PassContext const&) const
  3: tvm::transform::SequentialNode::operator()(tvm::IRModule, tvm::transform::PassContext const&) const
  2: tvm::transform::Pass::operator()(tvm::IRModule, tvm::transform::PassContext const&) const
  1: tvm::transform::ModulePassNode::operator()(tvm::IRModule, tvm::transform::PassContext const&) const
  0: tvm::runtime::PackedFuncObj::Extractor<tvm::runtime::PackedFuncSubObj<TVMFuncCreateFromCFunc::{lambda(tvm::runtime::TVMArgs, tvm::runtime::TVMRetValue*)#2}> >::Call(tvm::runtime::PackedFuncObj const*, tvm::runtime::TVMArgs, tvm::runtime::TVMRetValue*) [clone .cold]
  File "tvm/_ffi/_cython/./packed_func.pxi", line 56, in tvm._ffi._cy3.core.tvm_callback
  File "/usr/local/lib/python3.10/dist-packages/tvm/relay/quantize/_calibrate.py", line 221, in wrapped_func
    input_scale_func = _kl_scale(mod, dataset)
  File "/usr/local/lib/python3.10/dist-packages/tvm/relay/quantize/_calibrate.py", line 97, in _kl_scale
    for samples in collect_stats(mod, dataset, chunk_by):
  File "/usr/local/lib/python3.10/dist-packages/tvm/relay/quantize/_calibrate.py", line 85, in collect_stats
    for batch in dataset:
TypeError: 'NoneType' object is not iterable

Computation Graph Optimization Pipeline

* Graph Simplification: SimplifyInference, FoldConstant, FoldScaleAxis
* Computation Optimization: FuseOps, EliminateCommonSubexpr
* Graph Pruning: DeadCodeElimination
* Layout Optimization: AlterOpLayout, ConvertLayout



In [40]:
from tvm.relay import transform

# Define optimization passes
passes = [
    transform.SimplifyInference(),  # Simplify inference computations (e.g., BatchNorm folding)
    transform.FuseOps(fuse_opt_level=2),  # Fuse Conv2D, ReLU, and BiasAdd into a single kernel
    transform.EliminateCommonSubexpr(),  # Remove duplicate computations
    transform.DeadCodeElimination(),  # Remove unused outputs and operations
    transform.AlterOpLayout(),  # Transform operations for better performance on the target hardware
    transform.FoldConstant(),  # Fold constant computations
    transform.FoldScaleAxis(),  # Fold scaling factors for performance gains
    transform.ConvertLayout({"conv2d": ["NCHW", "NHWC"]}),
]

# Apply the optimization pipeline
with tvm.transform.PassContext(opt_level=3):
    mod_optimized = tvm.transform.Sequential(passes)(mod_copy)

In [41]:
mod_optimized_copy = copy.deepcopy(mod_optimized)
mod_optimized_copy = build_module(mod_optimized_copy)
Greedy_Decode_Eval_TVM(mod_optimized_copy, test_dataset, args)

[Info] Test Accuracy: 0.899 [899:60:41:1000]
[Info] Test Speed: 0.04566181373596191s 1/1000]


AutoTVM

In [35]:
mod_optimized_autotvm = copy.deepcopy(mod_optimized)

NameError: name 'mod_optimized' is not defined

In [1]:
from tvm import auto_scheduler, relay, rpc
from tvm.contrib import utils, ndk
from tvm.relay import transform

# Define the tuning task
tasks, task_weights = auto_scheduler.extract_tasks(
    mod_optimized_autotvm["main"], params, target
)

# Define the log file to save the tuning records
log_file = "tuning_records.json"

# Set up the TaskScheduler
tuner = auto_scheduler.TaskScheduler(tasks, task_weights)

# Set up the runner and builder (adjust according to your environment)
builder = auto_scheduler.LocalBuilder()
runner = auto_scheduler.LocalRunner(repeat=10, min_repeat_ms=100, enable_cpu_cache_flush=True)

# Run the tuning process
tuner.tune(
    auto_scheduler.TuningOptions(
        num_measure_trials=500,  # Number of trials
        builder=builder,
        runner=runner,
        measure_callbacks=[auto_scheduler.RecordToFile(log_file)],
    )
)

NameError: name 'mod_optimized_autotvm' is not defined

In [None]:
from tvm import auto_scheduler

with auto_scheduler.ApplyHistoryBest("tuning_records.json"):
    with tvm.transform.PassContext(opt_level=3, config={"relay.backend.use_auto_scheduler": True}):
        lib_optimized_autotvm = relay.build(mod_optimized_autotvm, target=target, params=params)

Evaluating TVM Optimized Script

In [None]:
from tvm.contrib import graph_executor

dtype = "float32"
module_optimized_autotvm_graph = graph_executor.GraphModule(lib_optimized_autotvm["default"](dev))

In [None]:
Greedy_Decode_Eval_TVM(module_optimized_autotvm_graph, test_dataset, args)

In [None]:
# with tvm.transform.PassContext(opt_level=3):  # Level 3 is for heavy optimizations
#     mod = relay.transform.FoldConstant()(mod)  # Fold constants into the graph (e.g., for batchnorm, conv, etc.)
#     # mod = relay.transform.MergeComposite()(mod)  # This will try to merge compatible operators into one
#     mod = relay.transform.FuseOps()(mod)  # Fuse operations like Conv2d + ReLU into a single kernel

# # Step 4: Target the compilation (e.g., CPU or CUDA)
# target = "llvm"  # Use "cuda" for GPU, "llvm" for CPU
# dev = tvm.device(target, 0)

# # Step 5: Compile the model with the applied optimizations
# with tvm.transform.PassContext(opt_level=3):
#     # Apply the optimizations and build the model for the target
#     compiled_lib = relay.build(mod, target, params=params)

# # Step 6: Check the compiled model (optional)
# # You can print out the transformed Relay module or inspect the optimized operators
# print(mod)

def @main(%input: Tensor[(100, 3, 24, 94), float32] /* ty=Tensor[(100, 3, 24, 94), float32] */) -> Tensor[(100, 68, 18), float32] {
  %8 = fn (%p04: Tensor[(100, 3, 24, 94), float32] /* ty=Tensor[(100, 3, 24, 94), float32] */, %p12: Tensor[(64, 3, 3, 3), float32] /* ty=Tensor[(64, 3, 3, 3), float32] */, %p22: Tensor[(64), float32] /* ty=Tensor[(64), float32] */, Primitive=1) -> Tensor[(100, 64, 22, 92), float32] {
    %6 = nn.conv2d(%p04, %p12, padding=[0, 0, 0, 0], channels=64, kernel_size=[3, 3]) /* ty=Tensor[(100, 64, 22, 92), float32] */;
    %7 = nn.bias_add(%6, %p22) /* ty=Tensor[(100, 64, 22, 92), float32] */;
    nn.relu(%7) /* ty=Tensor[(100, 64, 22, 92), float32] */
  } /* ty=fn (Tensor[(100, 3, 24, 94), float32], Tensor[(64, 3, 3, 3), float32], Tensor[(64), float32]) -> Tensor[(100, 64, 22, 92), float32] */;
  %9 = %8(%input, meta[relay.Constant][0] /* ty=Tensor[(64, 3, 3, 3), float32] */, meta[relay.Constant][1] /* ty=Tensor[(64), float32] */) /* ty=Tensor[(100, 64, 22, 92)

In [None]:
# from tvm import autotvm

# # Example: Define a task and tune it
# task = autotvm.task.create("conv2d", args=(...), target="llvm")  # Adjust for your model
# measure_option = autotvm.measure_option(
#     builder=autotvm.LocalBuilder(),
#     runner=autotvm.LocalRunner(number=10, repeat=1, timeout=10)
# )
# tuner = autotvm.tuner.XGBTuner(task)
# tuner.tune(n_trial=1000, measure_option=measure_option, callbacks=[autotvm.callback.log_to_file("tuning.log")])

In [None]:
# module = graph_executor.GraphModule(compiled_lib["default"](dev))

In [None]:
# # Create a runtime executor
# device = tvm.cpu()  # Replace with `tvm.cpu()` for CPU
# module = graph_executor.GraphModule(lib["default"](device))

# # Set input data
# import numpy as np
# input_data = np.random.rand(1, 3, 24, 94).astype("float32")
# module.set_input("input", tvm.nd.array(input_data))

# # Run the model
# module.run()

# # Get output
# output = module.get_output(0).asnumpy()

array([[[-45.843307 , -26.777699 , -29.277887 , ..., -37.8795   ,
         -26.984959 , -39.790222 ],
        [-56.843357 , -35.934715 , -44.665154 , ..., -51.765587 ,
         -43.806007 , -48.38035  ],
        [-60.37148  , -40.050667 , -41.848427 , ..., -56.75559  ,
         -59.2636   , -60.169075 ],
        ...,
        [-40.721634 , -63.13036  , -35.909454 , ..., -42.47039  ,
         -35.262417 , -42.471115 ],
        [-51.038795 , -84.98663  , -50.540886 , ..., -44.51516  ,
         -44.781967 , -44.84289  ],
        [ 41.37358  , -52.763165 ,   0.4645604, ..., -15.351716 ,
           6.4474   , -11.702883 ]]], dtype=float32)

Acknowledgements: I would like to thank fellow classmate Adam Scott for giving me helpful advice when I was stuck on this project.

## Importing Model to ONNX

In [None]:
# import onnxruntime as ort

# def test_onnx(onnx_file_path):
#     ort_session = ort.InferenceSession(onnx_file_path)
#     print("Loaded ONNX model successfully!")

#     test_img_dirs = os.path.expanduser(args.test_img_dirs)
#     test_dataset = LPRDataLoader(test_img_dirs.split(','), args.img_size, args.lpr_max_len)
#     try:
#         Greedy_Decode_Eval_ONNX(ort_session, test_dataset, args)
#     finally:
#         cv2.destroyAllWindows()

# def Greedy_Decode_Eval_ONNX(ort_session, datasets, args):
#     epoch_size = len(datasets) // args.test_batch_size
#     batch_iterator = iter(DataLoader(datasets, args.test_batch_size, shuffle=True, num_workers=args.num_workers, collate_fn=collate_fn))

#     Tp = 0
#     Tn_1 = 0
#     Tn_2 = 0
#     t1 = time.time()
#     for i in range(epoch_size):
#         # Load batch data
#         images, labels, lengths = next(batch_iterator)
#         start = 0
#         targets = []
#         for length in lengths:
#             label = labels[start:start+length]
#             targets.append(label)
#             start += length
#         targets = np.array([el.numpy() for el in targets])
#         imgs = images.numpy().copy()

#         # ONNX inference
#         ort_inputs = {ort_session.get_inputs()[0].name: images.numpy()}
#         prebs = ort_session.run(None, ort_inputs)[0]

#         # Greedy decode
#         preb_labels = list()
#         for i in range(prebs.shape[0]):
#             preb = prebs[i, :, :]
#             preb_label = list()
#             for j in range(preb.shape[1]):
#                 preb_label.append(np.argmax(preb[:, j], axis=0))
#             no_repeat_blank_label = list()
#             pre_c = preb_label[0]
#             if pre_c != len(CHARS) - 1:
#                 no_repeat_blank_label.append(pre_c)
#             for c in preb_label:
#                 if (pre_c == c) or (c == len(CHARS) - 1):
#                     if c == len(CHARS) - 1:
#                         pre_c = c
#                     continue
#                 no_repeat_blank_label.append(c)
#                 pre_c = c
#             preb_labels.append(no_repeat_blank_label)

#         # Evaluate accuracy
#         for i, label in enumerate(preb_labels):
#             if args.show:
#                 show(imgs[i], label, targets[i])
#             if len(label) != len(targets[i]):
#                 Tn_1 += 1
#                 continue
#             if (np.asarray(targets[i]) == np.asarray(label)).all():
#                 Tp += 1
#             else:
#                 Tn_2 += 1
#     Acc = Tp * 1.0 / (Tp + Tn_1 + Tn_2)
#     print("[Info] Test Accuracy: {} [{}:{}:{}:{}]".format(Acc, Tp, Tn_1, Tn_2, (Tp + Tn_1 + Tn_2)))
#     t2 = time.time()
#     print("[Info] Test Speed: {}s 1/{}]".format((t2 - t1) / len(datasets), len(datasets)))
