In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.models.quantization as models
import torch.nn.utils.prune as prune
import os
import copy
import torchsummary
import help
from help import helper_functions
from timeit import default_timer as timer

In [2]:
torch.__version__, torchvision.__version__

('2.3.1+cpu', '0.18.1+cpu')

In [3]:
transform = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [4]:
trainset = datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64,
                                          shuffle=True, num_workers=16, pin_memory=True)

testset = datasets.CIFAR10(root='./data', train=False,
                                        download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=32,
                                          shuffle=True, num_workers=16, pin_memory= True)

Files already downloaded and verified
Files already downloaded and verified


In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [6]:
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample= None, stride= 1):
        super().__init__()

        
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size= 3, stride= stride, padding= 1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size= 3,padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace= True)
        self.identity_downsample = identity_downsample
        self.skip_add = nn.quantized.FloatFunctional()

    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        # x = self.relu(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        # x += identity
        x = self.skip_add.add(x, identity)
        x = self.relu(x)
        return x


class ResNet(nn.Module):
    def __init__(self, BasicBlock, layers, image_channels, num_classes):
        super().__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size= 7, stride= 2, padding= 3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace= True)
        self.maxpool = nn.MaxPool2d(kernel_size= 3, stride= 2, padding= 1)

        self.layer1 = self._make_layer(BasicBlock, layers[0], out_channels= 64, stride= 1)
        self.layer2 = self._make_layer(BasicBlock, layers[1], out_channels= 128, stride= 2)
        self.layer3 = self._make_layer(BasicBlock, layers[2], out_channels= 256, stride= 2)
        self.layer4 = self._make_layer(BasicBlock, layers[3], out_channels= 512, stride= 2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512, num_classes)
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()

    def forward(self, x):
        # print(x.shape)
        x = self.quant(x)
        x = self.conv1(x)
        # print(x.shape)
        x = self.bn1(x)
        # print(x.shape)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
    
        x = self.avgpool(x)
        x = self.dequant(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        

        return x


    def _make_layer(self, BasicBlock, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []
        if stride != 1 or self.in_channels != out_channels:
            identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels, out_channels , kernel_size= 1, stride= stride),
                                               nn.BatchNorm2d(out_channels))
        layers.append(BasicBlock(self.in_channels, out_channels, identity_downsample, stride))
        self.in_channels = out_channels

        for i in range(num_residual_blocks - 1):
            layers.append(BasicBlock(self.in_channels, out_channels))

        return nn.Sequential(*layers)

def ResNet18(img_channels= 3, num_classes= 10):
    return ResNet(BasicBlock, [2, 2, 2, 2], img_channels, num_classes)


In [7]:
model = ResNet18()
print(model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (skip_add): FloatFunctional(
        (activation_post_process): Identity()
      )
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=

In [15]:
for  name,_ in model.named_modules():
    print(name)


conv1
bn1
relu
maxpool
layer1
layer1.0
layer1.0.conv1
layer1.0.bn1
layer1.0.conv2
layer1.0.bn2
layer1.0.relu
layer1.0.skip_add
layer1.0.skip_add.activation_post_process
layer1.1
layer1.1.conv1
layer1.1.bn1
layer1.1.conv2
layer1.1.bn2
layer1.1.relu
layer1.1.skip_add
layer1.1.skip_add.activation_post_process
layer2
layer2.0
layer2.0.conv1
layer2.0.bn1
layer2.0.conv2
layer2.0.bn2
layer2.0.relu
layer2.0.identity_downsample
layer2.0.identity_downsample.0
layer2.0.identity_downsample.1
layer2.0.skip_add
layer2.0.skip_add.activation_post_process
layer2.1
layer2.1.conv1
layer2.1.bn1
layer2.1.conv2
layer2.1.bn2
layer2.1.relu
layer2.1.skip_add
layer2.1.skip_add.activation_post_process
layer3
layer3.0
layer3.0.conv1
layer3.0.bn1
layer3.0.conv2
layer3.0.bn2
layer3.0.relu
layer3.0.identity_downsample
layer3.0.identity_downsample.0
layer3.0.identity_downsample.1
layer3.0.skip_add
layer3.0.skip_add.activation_post_process
layer3.1
layer3.1.conv1
layer3.1.bn1
layer3.1.conv2
layer3.1.bn2
layer3.1.relu

In [8]:
helper_functions.count_nonzero_params(model)

The number of non-zero parameters : 11186442
The number of zero parameters : 4801


In [9]:
model.load_state_dict(torch.load('resnet_with_10_class.pth', map_location=device))

<All keys matched successfully>

In [10]:
helper_functions.count_nonzero_params(model)

The number of non-zero parameters : 11186442
The number of zero parameters : 1117203


In [None]:
helper_functions.test(model, testloader, device)

In [None]:
helper_functions.print_size_of_model(model)

In [None]:
def fuse_model_layers(model):
    for name, module in model.named_children():
        if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear):
            # Fuse Conv2d and BatchNorm2d
            if hasattr(module, 'bn'):
                torch.quantization.fuse_modules(module, ['weight', 'bias', 'bn.weight', 'bn.bias', 'bn.running_mean', 'bn.running_var'], inplace=True)
        elif isinstance(module, nn.BatchNorm2d):
            # Fuse BatchNorm2d
            if hasattr(model, 'conv'):
                torch.quantization.fuse_modules(model, [name, 'conv'], inplace=True)


In [None]:
dynamic_quantized = torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8 )

In [None]:
helper_functions.test(dynamic_quantized, testloader, device)

In [None]:
helper_functions.print_size_of_model(dynamic_quantized)

In [None]:
# model.eval()
# # modules_to_fuse =   [  ['conv1', 'bn1', 'relu'],
# #     ['layer1.0.conv1', 'layer1.0.bn1', 'layer1.0.relu'],
# #     ['layer1.0.conv2', 'layer1.0.bn2'],
# #     ['layer1.1.conv1', 'layer1.1.bn1', 'layer1.1.relu'],
# #     ['layer1.1.conv2', 'layer1.1.bn2'],
# #     ['layer2.0.conv1', 'layer2.0.bn1', 'layer2.0.relu'],
# #     ['layer2.0.conv2', 'layer2.0.bn2'],
# #     ['layer2.0.identity_downsample.0', 'layer2.0.identity_downsample.1'],
# #     ['layer2.1.conv1', 'layer2.1.bn1', 'layer2.1.relu'],
# #     ['layer2.1.conv2', 'layer2.1.bn2'],
# #     ['layer3.0.conv1', 'layer3.0.bn1', 'layer3.0.relu'],
# #     ['layer3.0.conv2', 'layer3.0.bn2'],
# #     ['layer3.0.identity_downsample.0', 'layer3.0.identity_downsample.1'],
# #     ['layer3.1.conv1', 'layer3.1.bn1', 'layer3.1.relu'],
# #     ['layer3.1.conv2', 'layer3.1.bn2'],
# #     ['layer4.0.conv1', 'layer4.0.bn1', 'layer4.0.relu'],
# #     ['layer4.0.conv2', 'layer4.0.bn2'],
# #     ['layer4.0.identity_downsample.0', 'layer4.0.identity_downsample.1'],
# #     ['layer4.1.conv1', 'layer4.1.bn1', 'layer4.1.relu'],
# #     ['layer4.1.conv2', 'layer4.1.bn2'],
# # ]

# modules_to_fuse = [
#     ['conv1', 'bn1', 'relu'],
#     ['layer1.0.conv2', 'layer1.0.bn2'],
#     ['layer2.0.identity_downsample.0', 'layer2.0.identity_downsample.1'],
#     ['layer2.1.conv1', 'layer2.1.bn1', 'layer2.1.relu'],
#     ['layer2.1.conv2', 'layer2.1.bn2'],
#     ['layer3.0.identity_downsample.0', 'layer3.0.identity_downsample.1'],
#     ['layer3.1.conv1', 'layer3.1.bn1', 'layer3.1.relu'],
   
# ]
modules_to_fuse = [['conv1', 'bn1'],
                   ['layer1.0.conv1', 'layer1.0.bn1'],
                   ['layer1.0.conv2', 'layer1.0.bn2'],
                   ['layer1.1.conv1', 'layer1.1.bn1'],
                   ['layer1.1.conv2', 'layer1.1.bn2'],
                   ['layer2.0.conv1', 'layer2.0.bn1'],
                   ['layer2.0.conv2', 'layer2.0.bn2'],
                   ['layer2.0.identity_downsample.0', 'layer2.0.identity_downsample.1'],
                   ['layer2.1.conv1', 'layer2.1.bn1'],
                   ['layer2.1.conv2', 'layer2.1.bn2'],
                   ['layer3.0.conv1', 'layer3.0.bn1'],
                   ['layer3.0.conv2', 'layer3.0.bn2'],
                   ['layer3.0.identity_downsample.0', 'layer3.0.identity_downsample.1'],
                   ['layer3.1.conv1', 'layer3.1.bn1'],
                   ['layer3.1.conv2', 'layer3.1.bn2'],
                   ['layer4.0.conv1', 'layer4.0.bn1'],
                   ['layer4.0.conv2', 'layer4.0.bn2'],
                   ['layer4.0.identity_downsample.0', 'layer4.0.identity_downsample.1'],
                   ['layer4.1.conv1', 'layer4.1.bn1'],
                   ['layer4.1.conv2', 'layer4.1.bn2']]


dynamic_quantized = torch.quantization.fuse_modules(dynamic_quantized, modules_to_fuse)

# fuse_model_layers(dynamic_quantized)

In [None]:
backend = "qnnpack"
dynamic_quantized.qconfig = torch.quantization.get_default_qconfig(backend)
torch.backends.quantized.engine = backend
static_quantized = torch.quantization.prepare(dynamic_quantized, inplace=True)
# dynamic_quantized.qconfig = torch.quantization.default_qconfig
# static_quantized = torch.quantization.prepare(dynamic_quantized, inplace=True)

In [None]:
small_test_data = helper_functions.slice_dataloader(testloader, 0, 1000)

In [None]:
helper_functions.caliberate(static_quantized,small_test_data, device)

In [None]:
torch.quantization.convert(static_quantized, inplace=True)

In [None]:
helper_functions.test(static_quantized, testloader, device)

In [None]:
helper_functions.print_size_of_model(static_quantized)

In [None]:
torch.save(static_quantized.state_dict(), 'Resnet18QuantizedModel.pth')

In [None]:
# torch.save(static_quantized, "resnet18quantizedmodelarch.pth")

In [None]:
# # scripted = torch.jit.script(static_quantized)
# import io
# # b = io.BytesIO()
# # torch.jit.save(scripted, b)

# b = io.BytesIO()
# torch.save(static_quantized.state_dict(), 'model.pth')

In [None]:
# with open('quantized_model.pt', 'wb') as f:
#     f.write(b.getvalue())


In [None]:
# torch.jit.save(torch.jit.script(static_quantized), "quant_model.pth")

In [None]:
# torch.jit.save(torch.jit.script(static_quantized), 'jitmodelfile.pth')

In [None]:
# scripted = torch.jit.script(static_quantized)

# # Save the scripted model to a BytesIO buffer
# buffer = io.BytesIO()
# torch.jit.save(scripted, buffer)

# # Reset the buffer's position to the beginning
# buffer.seek(0)

# # Load the scripted model from the BytesIO buffer
# scripted_quantized = torch.jit.load(buffer)

In [None]:
# scripted_quantized

In [None]:
# input_tensor = torch.randn(1, 3, 224, 224)  # Example input tensor
# output = scripted_quantized(input_tensor)
# print(output)

In [None]:
# data_path = '~/.data/resnet'
# saved_model_dir = 'data/'
# float_model_file = 'mobilenet_pretrained_float.pth'
# scripted_float_model_file = 'mobilenet_quantization_scripted.pth'
# scripted_quantized_model_file = 'mobilenet_quantization_scripted_quantized.pth'

In [None]:
# torch.jit.save(torch.jit.script(static_quantized), saved_model_dir + scripted_float_model_file)

In [None]:
# def load_model(model_file):
#     model = ResNet18()
#     state_dict = torch.jit.load(model_file)
#     model.load_state_dict(state_dict)
#     # model.to('cpu')
#     return model

In [None]:
# model = ResNet18()

In [None]:
# model.load_state_dict(torch.jit.load('jitmodelfile.pth', map_location='cpu'))
# model = torch.jit.load('jitmodelfile.pth', map_location='cpu')

In [None]:
# model


In [None]:
# input_tensor = torch.randn(1, 3, 224, 224)  # Example input tensor
# output = model(input_tensor)


In [None]:
script = torch.jit.script(static_quantized)
script.save('scriptedModel.pth')

In [None]:
# model = torch.jit.load('scriptedMOdel.pth', map_location='cpu')

In [None]:
# helper_functions.test(model, testloader, device)

In [None]:
print(torch.__version__)
print(torchvision.__version__)

In [None]:
# def _fuse_modules(
#     model: nn.Module, modules_to_fuse: Union[List[str], List[List[str]]], is_qat: Optional[bool], **kwargs: Any
# ):
#     if is_qat is None:
#         is_qat = model.training
#     method = torch.ao.quantization.fuse_modules_qat if is_qat else torch.ao.quantization.fuse_modules
#     return method(model, modules_to_fuse, **kwargs)