In [13]:
import torch
import math
import os
import torch.optim as optim
import torchmetrics
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset
from PIL import Image
import natsort
import numpy as np
from torch.quantization import quantize_fx
from torch.ao.quantization import QConfigMapping
import copy
import warnings
warnings.filterwarnings('ignore')

In [14]:
dev = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [15]:
preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

class GetDataset(Dataset):
    def __init__(self):
        super(GetDataset, self).__init__()
        self.path_input = "C:/Users/orsolya.bankovi/Documents/Uni/deepLearning_project/D41_ILRSVRC2012_224"
        self.input = os.listdir(self.path_input)
        self.input_names = list(filter(lambda x: x.endswith(".png"), list(self.input)))
        self.sorted_input = natsort.natsorted(self.input_names)
        self.input_images = []
        for count, images in enumerate(self.sorted_input[:1000]):
            input_image = Image.open(self.path_input + '/' + images)
            input_tensor = preprocess(input_image).float()
            self.input_images.append(input_tensor)
            
        self.labels = []
        with open("C:/Users/orsolya.bankovi/Documents/Uni/deepLearning_project/D41_ILRSVRC2012_224/labels.txt", 'r') as f:
            for i in range(1000):
                line = f.readline()
                self.labels.append(int(line))

    def __getitem__(self, index):
        return self.input_images[index], self.labels[index]

    def __len__(self):
        return len(self.input_images)

In [16]:
class ImageNetTest():
    def __init__(self, batch_size):
        super(ImageNetTest, self).__init__()
        self.batch_size = batch_size
        
    def test(self, net, testset, dev):
        test_loader = torch.utils.data.DataLoader(testset, self.batch_size, shuffle=False)
        net = net.to(dev)
        net.train(False)
        count = 0
        correct = 0


        with torch.no_grad():
            for data, target in test_loader:
                data = data.to(dev)
                target = target.to(dev)
                output = net(data)
                probabilities = torch.nn.functional.softmax(output, dim=1)
                pred = probabilities.argmax(dim=1, keepdim=True)
                if pred == target:
                    correct += 1
                count += 1
        print('Accuracy: ', round(correct/count, 4))
        return round(correct/count, 4)

In [17]:
def quantize_model(model, calibdata, dev, backend="fbgemm"):
    torch.no_grad()
    m = copy.deepcopy(model).to(dev)
    m.eval()

    example_inputs = torch.unsqueeze(calibdata.input_images[10], dim=0).to(dev)
    qconfig = torch.quantization.get_default_qconfig(backend)
    qconfig_mapping = QConfigMapping().set_global(qconfig)

    model_prepared = quantize_fx.prepare_fx(m, qconfig_mapping, example_inputs)

    with torch.inference_mode():
        for i in range(10):
            x = torch.unsqueeze(calibdata.input_images[i], dim=0).to(dev)
            model_prepared(x)
            
    model_quantized = quantize_fx.convert_fx(model_prepared)

    return model_quantized

In [18]:
class PTQ():
    def __init__(self, model, calibdata, dev):
        super(PTQ, self).__init__()
        self.model = model
        self.calibdata = calibdata
        self.dev = dev
        self.hooks_names = []
        self.hook_values = []
        self.quant_hook_values = []

        self.target_modules = {'Conv2d', 'Linear', 'ConvTranspose2d', 'Conv1d', 'Conv3d'}
        
        self.indices = [100, 214, 220, 544, 831]

    def get_hooks(self, model, target_modules, quantized=False):
        """Get hooks for given model and target modules."""
        if quantized:
            self.quant_hook_values.clear()
        else:    
            self.hook_values.clear()
        m = copy.deepcopy(model)

        def hook_fn(module, input, output):
            if quantized:
                self.quant_hook_values.append(torch.dequantize(output.detach().cpu()))
            else:
                self.hook_values.append(output.detach().cpu().float())

        self.hooks_names.clear()
        hooks = []
        for module in m.named_modules():
            if module[1].__class__.__name__ in target_modules:
                self.hooks_names.append(module[0])
                hook = module[1].register_forward_hook(hook_fn)
                hooks.append(hook)
                
        rand_inputs = [self.calibdata.input_images[i] for i in self.indices]
        with torch.inference_mode():
            x = torch.stack(rand_inputs, dim=0).to(self.dev)
            m(x)

        for hook in hooks:
            hook.remove()

    def get_module(self, model, name):
        return dict(model.named_modules())[name]

    def get_param(self, module, attr):
        param = getattr(module, attr, None)
        if callable(param):
            return param()
        else:
            return param

    def get_quantized_model(self, model, backend):
        torch.no_grad()
        m = copy.deepcopy(model)
        m.eval()

        qconfig = torch.quantization.get_default_qconfig(backend)
        qconfig_mapping = QConfigMapping().set_global(qconfig)

        m = quantize_fx.fuse_fx(m)
        m = torch.quantization.QuantWrapper(m)
        model_prepared = torch.quantization.prepare(m, qconfig_mapping)

        with torch.inference_mode():
            for i in range(10):
                x = torch.unsqueeze(self.calibdata.input_images[i], dim=0).to(self.dev)
                model_prepared(x)

        model_quantized = torch.quantization.convert(model_prepared)

        self.get_hooks(model_quantized, target_modules=self.target_modules,
                       quantized=True)

        return model_quantized


    def bias_correction(self, model, target_modules, backend):
        i=0
        for _, submodule in model.named_modules():
            if submodule.__class__.__name__ in target_modules:
                bias = self.get_param(submodule, 'bias')
                if bias is not None:
                    self.get_quantized_model(model, backend)
                    f = self.quant_hook_values[i]
                    g = self.hook_values[i]
                    eps = torch.mean(f - g)

                    #print("eps data:", eps)
                    bias.data[::] -= eps
                i += 1
        return model

    def quantize_model(self, backend="fbgemm"):
        torch.no_grad()
        m = copy.deepcopy(self.model).to(self.dev)
        m.eval()

        self.get_hooks(self.model, target_modules=self.target_modules)

        model_bias_corr = self.bias_correction(m, target_modules=self.target_modules, backend=backend)
        model_bias_corr.eval()

        model_quantized_bias_corr = quantize_model(model_bias_corr, self.calibdata, self.dev, backend)

        return model_quantized_bias_corr

In [48]:
class QuantizationAwareTraining():
    def __init__(self, trainset, testset, batch_size):
        super(QuantizationAwareTraining, self).__init__()
        self.dev = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.batch_size = batch_size
        self.trainset = trainset
        self.testset = testset
        self.Dice = torchmetrics.Dice(zero_division=1.0, threshold=0.5).to(self.dev)

    def train(self, net):
        train_loader = torch.utils.data.DataLoader(self.trainset, self.batch_size, shuffle=True)
        criterion = nn.BCELoss().to(self.dev)
        optimizer = optim.Adam(net.parameters(), lr=0.0001)
        net.train(True)
        best_epoch = 0
        best_dice = 0.0
        best_net = net

        for epoch in range(10):            
            dice_losses = []
        
            for data, target in train_loader:
                data = data.to(self.dev)
                target = target.to(self.dev)
                optimizer.zero_grad()  # clear the gradient

                output = net(data)  # forward propagation
                loss = criterion(output, target)  # calculate loss
                
                loss.backward()  # current loss
                optimizer.step()  # update parameters

                dice_loss_value = self.Dice(output, target.int())
                dice_losses.append(dice_loss_value.item())
            
            if best_dice < np.average(dice_losses[-len(train_loader)//self.batch_size:]):
                best_dice = np.average(dice_losses[-len(train_loader)//self.batch_size:])
                best_epoch = epoch
                best_net = net
        print('best epoch: ', best_epoch)
        print('best dice: ', best_dice)            
        
        return best_net

    def quantize_model(self, model, calibdata):
        model = model.to(self.dev)
        torch.no_grad()
        m = copy.deepcopy(model)
        m.eval()
        
        example_inputs = torch.unsqueeze(calibdata.input_images[10], dim=0)
        
        qconfig = torch.quantization.get_default_qat_qconfig("qnnpack", 1)
        qconfig_mapping = QConfigMapping().set_global(qconfig)

        model_prepared = quantize_fx.prepare_qat_fx(m, qconfig_mapping, example_inputs)

        with torch.inference_mode():
            for i in range(10):
                x = torch.unsqueeze(calibdata.input_images[i], dim=0).to(self.dev)
                model_prepared(x)
                    
        trained_model = self.train(model_prepared)
        trained_model = trained_model.to(torch.device('cpu'))
        model_quantized = quantize_fx.convert_fx(trained_model)
        model_quantized.train(False)
        model_quantized.eval()
        return model_quantized

In [20]:
def evaluate_model(model, name, testset, dev, batch_size):
    test = ImageNetTest(batch_size)
    print(name, 'FP32')
    model = model.to(dev)
    fp32 = test.test(model, testset, dev)
    dev = torch.device("cpu")
    model = model.to(dev)
    model_int8 = quantize_model(model, testset, dev)
    print(name, 'INT8')
    int8 =  test.test(model_int8, testset, dev)
    ptq_ = PTQ(model, testset, dev)
    bias_corr = ptq_.quantize_model()
    print(name, 'PTQ INT8')
    ptq_int8 = test.test(bias_corr, testset, dev)
    
    return fp32, int8, ptq_int8

In [21]:
class GetSegmentationDataset(Dataset):
    def __init__(self, input_path, target_path, bool_augmentation):
        super(GetSegmentationDataset, self).__init__()
        self.preprocess_input = transforms.Compose([
                      transforms.Resize(256),
                      transforms.ToTensor(),
            ])

        self.preprocess_target = transforms.Compose([
                            transforms.Resize(256, interpolation=transforms.InterpolationMode.NEAREST),
                            transforms.ToTensor(),
                    ])

        self.augmentation = transforms.Compose([
                transforms.RandomRotation(30, interpolation=Image.NEAREST), 
                transforms.Resize(256, interpolation=Image.NEAREST),
                transforms.ToTensor()])
        self.input_images = self.load_images(input_path, bool_augmentation, self.preprocess_input)
        self.target_images = self.load_images(target_path, bool_augmentation, self.preprocess_target)
        print(self.target_images[0].shape)

    def load_images(self, path, bool_augmentation, preprocess_func):
        images = os.listdir(path)
        image_names = list(filter(lambda x: x.endswith(".png"), images))
        sorted_images = natsort.natsorted(image_names)
        loaded_images = []

        for count, image_name in enumerate(sorted_images):
            image = Image.open(os.path.join(path, image_name)).convert('L')
            image_tensor = preprocess_func(image).float()
            loaded_images.append(image_tensor)

            if bool_augmentation and count % 3 == 0:
                image_tensor = self.augmentation(image).float()
                loaded_images.append(image_tensor)

        return loaded_images

    def __getitem__(self, index):
        return self.input_images[index], self.target_images[index]

    def __len__(self):
        return len(self.input_images)

In [41]:
import torchmetrics

class SegmentationTest():
    def __init__(self, dev, batch_size):
        super(SegmentationTest, self).__init__()
        self.dev = dev
        self.batch_size = batch_size
        self.Dice = torchmetrics.Dice(zero_division=1.0, threshold=0.5).to(self.dev)
        
    def test(self, net, test_loader, name):
        test_dice_losses = []
        net.train(False)

        with torch.no_grad():
            for data, target in test_loader:
                data = data.to(self.dev)
                target = target.to(self.dev)
                output = net(data)                
                dice_loss_value = self.Dice(output, target.int())
                test_dice_losses.append(dice_loss_value.item())
                
        print('Test dice loss: ', np.average(np.asarray(test_dice_losses)))
        
        with open('./segmentation_results/' + name + '/test_results.txt', 'w') as f:
            f.write('Test dice loss: ' + str(np.average(np.asarray(test_dice_losses))) + '\n')


In [23]:
"""Image Classification"""
testset = GetDataset()

In [24]:
with open('./imagenet_accuracy.txt', 'w') as f:
        efficientnet = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_b0', pretrained=True)
        fp32, int8, ptq_int8 = evaluate_model(efficientnet, 'EfficientNet_b0', testset, dev, 1)
        f.write('EfficientNet FP32: ' + str(fp32) + '\n')
        f.write('EfficientNet INT8: ' + str(int8) + '\n')
        f.write('EfficientNet PTQ INT8: ' + str(ptq_int8) + '\n')

        maxvit_t = torch.hub.load('pytorch/vision', 'maxvit_t', pretrained=True)
        fp32, int8, ptq_int8 = evaluate_model(maxvit_t, 'MaxViT', testset, dev, 1)
        f.write('MaxViT FP32: ' + str(fp32) + '\n')
        f.write('MaxViT INT8: ' + str(int8) + '\n')
        f.write('MaxViT PTQ INT8: ' + str(ptq_int8) + '\n')

Using cache found in C:\Users\orsolya.bankovi/.cache\torch\hub\NVIDIA_DeepLearningExamples_torchhub


EfficientNet_b0 FP32
Accuracy:  0.743
EfficientNet_b0 INT8
Accuracy:  0.602
EfficientNet_b0 PTQ INT8
Accuracy:  0.633


Using cache found in C:\Users\orsolya.bankovi/.cache\torch\hub\pytorch_vision_main


MaxViT FP32
Accuracy:  0.818
MaxViT INT8
Accuracy:  0.049
MaxViT PTQ INT8
Accuracy:  0.102


In [45]:
train_set=GetSegmentationDataset(input_path='./unet_/train/original', 
                            target_path='./unet_/train/inverse', 
                            bool_augmentation=True)
test_set = GetSegmentationDataset(input_path='./unet_/test/original', 
                        target_path='./unet_/test/inverse', 
                        bool_augmentation=False)
test_loader = torch.utils.data.DataLoader(test_set, 1, shuffle=True)


torch.Size([1, 256, 256])
torch.Size([1, 256, 256])


In [49]:
import unet

UNet = torch.load('./unet_/finished_trained_net.pt' )

dev = torch.device("cpu")
UNet = UNet.to(dev)
Test = SegmentationTest(dev=dev, batch_size=1)
print('UNet FP32')
Test.test(UNet, test_loader, 'original')
unet_int8 = quantize_model(UNet, test_set, dev, backend="qnnpack")
print('UNet INT8')
Test.test(unet_int8, test_loader, 'INT8')
ptq_unet = PTQ(UNet, test_set, dev)
bias_corr = ptq_unet.quantize_model(backend="qnnpack")
print('UNet PTQ INT8')
Test.test(bias_corr, test_loader, 'int8_ptq')
qat = QuantizationAwareTraining(train_set, test_set, batch_size=8)
qat_net = qat.quantize_model(UNet, train_set)
print('UNet QAT INT8')
Test.test(qat_net, test_loader, 'int8_qat')

UNet FP32
Test dice loss:  0.9023179816525607
UNet INT8
Test dice loss:  0.8349785774691207
UNet PTQ INT8
Test dice loss:  0.8363506468722508
best epoch:  9
best dice:  0.6170561487072861
UNet QAT INT8
Test dice loss:  0.8705367499518962


In [51]:
alexnet = torch.hub.load('pytorch/vision:v0.10.0', 'alexnet', pretrained=True)
fp32, int8, ptq_int8 = evaluate_model(alexnet, 'AlexNet', testset, dev, 1)

shufflenet = torch.hub.load('pytorch/vision:v0.10.0', 'shufflenet_v2_x1_0', pretrained=True)
fp32, int8, ptq_int8 = evaluate_model(shufflenet, 'ShuffleNet', testset, dev, 1)

Using cache found in C:\Users\orsolya.bankovi/.cache\torch\hub\pytorch_vision_v0.10.0


AlexNet FP32
Accuracy:  0.539
AlexNet INT8
Accuracy:  0.528
AlexNet PTQ INT8
Accuracy:  0.528
ShuffleNet FP32


Using cache found in C:\Users\orsolya.bankovi/.cache\torch\hub\pytorch_vision_v0.10.0


Accuracy:  0.669
ShuffleNet INT8
Accuracy:  0.653
ShuffleNet PTQ INT8
Accuracy:  0.653
