In [2]:
import torch
from torch import nn
from torch.quantization import get_default_qconfig, prepare, convert, fuse_modules
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, Subset, random_split
# from Model.Schema import resModel
from tqdm import tqdm
from PIL import Image


from torch.quantization import QuantStub, DeQuantStub
import torchmetrics


In [3]:
from torch.nn.quantized import FloatFunctional

class QuantizedBasicBlock(nn.Module):
    """Residual block wrapper whose skip-connection add goes through FloatFunctional.

    The wrapper re-uses the sub-modules of an existing residual block (it does
    not copy them) and replaces the plain ``+`` of the shortcut with
    ``FloatFunctional.add``.

    Args:
        basic_block: object exposing ``conv1``, ``bn1``, ``relu``, ``conv2``,
            ``bn2``, ``downsample`` and ``stride`` attributes.
    """

    # Attributes taken over from the wrapped block, in registration order.
    _COPIED_ATTRS = ("conv1", "bn1", "relu", "conv2", "bn2", "downsample", "stride")

    def __init__(self, basic_block):
        super().__init__()
        for attr_name in self._COPIED_ATTRS:
            setattr(self, attr_name, getattr(basic_block, attr_name))
        # Functional wrapper used for the residual addition.
        self.add = FloatFunctional()

    def forward(self, x):
        """Run conv-bn-relu-conv-bn, add the (optionally downsampled) input, apply relu."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # The shortcut is the raw input unless the block defines a downsample branch.
        shortcut = x if self.downsample is None else self.downsample(x)
        return self.relu(self.add.add(out, shortcut))

class resModel(nn.Module):
    """ResNet-18 classifier wrapped between a QuantStub and a DeQuantStub.

    The backbone is torchvision's ``resnet18`` created with the
    ``IMAGENET1K_V1`` weights; its ``fc`` layer is replaced by a fresh linear
    layer with ``num_classes`` outputs, and the two blocks of each of
    ``layer1``..``layer4`` are wrapped in ``QuantizedBasicBlock``.

    Args:
        num_classes: number of output classes (also passed to the metrics).
    """

    _STAGES = ("layer1", "layer2", "layer3", "layer4")

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.resnet18(weights="IMAGENET1K_V1")
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
        self.model = backbone

        # Multiclass metrics attached to the module.
        shared = dict(task="multiclass", num_classes=num_classes)
        self.accuracy = torchmetrics.Accuracy(**shared)
        self.precision = torchmetrics.Precision(average="macro", **shared)
        self.recall = torchmetrics.Recall(average="macro", **shared)
        self.f1score = torchmetrics.F1Score(**shared)

        # Swap the first two blocks of every stage for their wrapped versions.
        for stage_name in self._STAGES:
            stage = getattr(self.model, stage_name)
            wrapped = nn.Sequential(
                QuantizedBasicBlock(stage[0]),
                QuantizedBasicBlock(stage[1]),
            )
            setattr(self.model, stage_name, wrapped)

        self.quant = QuantStub()
        self.dequant = DeQuantStub()

    def forward(self, x):
        """Pass ``x`` through quant stub, backbone, and dequant stub, in that order."""
        return self.dequant(self.model(self.quant(x)))

In [12]:
num_classes = 70
WEIGHTS_PATH = 'Weights/model_13.pth'

# Read the checkpoint once and map every tensor to the CPU, so loading also
# works on a machine without the device the checkpoint was saved from.
# load_state_dict copies values into each model's own parameters, so sharing
# the same dict between the two models is safe.
checkpoint_state = torch.load(WEIGHTS_PATH, map_location='cpu')

# Model that will be prepared, calibrated and converted.
model = resModel(num_classes)
model.load_state_dict(checkpoint_state)
model.eval()

# Untouched float copy kept for the accuracy / latency comparison.
model_fp32 = resModel(num_classes)
model_fp32.load_state_dict(checkpoint_state)
model_fp32.eval()

resModel(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_runn

In [5]:
def skip_add_quantization(m):
    """Clear ``qconfig`` on torchvision ``BasicBlock`` modules that have a downsample branch.

    Written as a visitor for ``nn.Module.apply``; any other module is left
    untouched.

    NOTE(review): ``resModel`` in this notebook wraps its blocks in
    ``QuantizedBasicBlock``, which subclasses ``nn.Module`` rather than
    ``BasicBlock``, so this predicate looks like it never matches for that
    model - confirm whether that is intended.

    Args:
        m: module visited by ``apply``.
    """
    if not isinstance(m, models.resnet.BasicBlock):
        return
    if getattr(m, 'downsample', None) is None:
        return
    m.qconfig = None

# Fixed seed so the calibration subset (and therefore the observed activation
# statistics) is the same on every run.
CALIBRATION_SEED = 42

model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
model.apply(skip_add_quantization)

# prepare() inserts observers; inplace=True means `model` itself is modified.
model_prepared = prepare(model, inplace=True)
# Make sure the model is on the CPU before any calibration batch is fed to it.
model_prepared.to('cpu')

transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

full_dataset = datasets.ImageFolder(root='dataset/train', transform=transform)

# Use 10% of the training images for calibration, but never zero samples:
# converting without any observed batch would produce meaningless scales.
num_calibration = max(1, int(0.1 * len(full_dataset)))
num_rest = len(full_dataset) - num_calibration
calibration_dataset, _ = random_split(
    full_dataset,
    [num_calibration, num_rest],
    generator=torch.Generator().manual_seed(CALIBRATION_SEED),
)

# random_split already draws a random subset, so iterating it in order keeps
# the calibration pass deterministic without losing randomness.
calibration_loader = DataLoader(calibration_dataset, batch_size=32, shuffle=False)

model_prepared.eval()
with torch.no_grad():
    for data, _ in tqdm(calibration_loader, desc="Calibrating", leave=True):
        data = data.to('cpu')
        # Forward pass only; the outputs are discarded, observers record ranges.
        model_prepared(data)

# inplace=False: model_prepared keeps its observers, model_quantized is a new module.
model_quantized = convert(model_prepared, inplace=False)

torch.save(model_quantized.state_dict(), 'Weights/model_quantized_13.pth')

Calibrating: 100%|█████████████████████████████████████████████████████████████████████| 25/25 [00:41<00:00,  1.68s/it]


In [6]:
# test_transform = transforms.Compose([
#     transforms.Resize((256, 256)),
#     transforms.ToTensor()
# ])

# image_path = 'dataset/valid/American  Spaniel/07.jpg'
# image = Image.open(image_path).convert('RGB')

# image = test_transform(image).unsqueeze(0)


# model_quantized.eval()

# device = torch.device('cpu')
# model_quantized.to(device)
# image = image.to(device)

# with torch.no_grad():
#     output = model_quantized(image)
#     prediction = torch.argmax(output, dim=1)

# print(prediction.item())

In [7]:
# Switch the converted model to evaluation mode; as the last expression of the
# cell, the value returned by eval() is what the notebook displays below.
model_quantized.eval()



resModel(
  (model): ResNet(
    (conv1): QuantizedConv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), scale=0.10969597846269608, zero_point=52, padding=(3, 3), bias=False)
    (bn1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): QuantizedBasicBlock(
        (conv1): QuantizedConv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.19458085298538208, zero_point=78, padding=(1, 1), bias=False)
        (bn1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): QuantizedConv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.07432597130537033, zero_point=77, padding=(1, 1), bias=False)
        (bn2): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (add): Q

In [10]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from Model.Schema import resModel
import wandb
from tqdm import tqdm

# Evaluation preprocessing: resize to 256x256, then convert to a tensor.
test_transform = transforms.Compose(
    [transforms.Resize((256, 256)), transforms.ToTensor()]
)

# Test images are read from class-named sub-folders of dataset/test.
test_dataset = datasets.ImageFolder(transform=test_transform, root='dataset/test')

# Fixed order, 64 images per batch.
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=64)

def test_model(model, test_loader, device):
    model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in tqdm(test_loader, desc="Testing", leave=True):
            data, target = data.to(device), target.to(device)

            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    test_accuracy = 100 * correct / total
    return test_accuracy

# Evaluation device: the CPU. The quantized model is moved there explicitly.
device= torch.device('cpu')
model_quantized.to(device)


resModel(
  (model): ResNet(
    (conv1): QuantizedConv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), scale=0.10969597846269608, zero_point=52, padding=(3, 3), bias=False)
    (bn1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): QuantizedBasicBlock(
        (conv1): QuantizedConv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.19458085298538208, zero_point=78, padding=(1, 1), bias=False)
        (bn1): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): QuantizedConv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), scale=0.07432597130537033, zero_point=77, padding=(1, 1), bias=False)
        (bn2): QuantizedBatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (add): Q

In [13]:
import time

# time.perf_counter() is a monotonic, high-resolution clock intended for
# measuring elapsed intervals; time.time() follows the wall clock and can jump
# if the system time is adjusted during the run.
start_time_q = time.perf_counter()
accuracy_q = test_model(model_quantized, test_loader, device)
end_time_q = time.perf_counter()

print(f"Test Accuracy of the static quantized model: {accuracy_q}%")
print(f"Time taken to test the static quantized model: {end_time_q - start_time_q} seconds")

# Same evaluation for the float model, on the same loader and device.
start_time_s = time.perf_counter()
accuracy_s = test_model(model_fp32, test_loader, device)
end_time_s = time.perf_counter()

print(f"Test Accuracy of the standard model: {accuracy_s}%")
print(f"Time taken to test the standard model: {end_time_s - start_time_s} seconds")

Testing: 100%|█████████████████████████████████████████████████████████████████████████| 11/11 [00:10<00:00,  1.07it/s]


Test Accuracy of the static quantized model: 86.14285714285714%
Time taken to test the static quantized model: 10.329203367233276 seconds


Testing: 100%|█████████████████████████████████████████████████████████████████████████| 11/11 [00:17<00:00,  1.56s/it]

Test Accuracy of the standard model: 86.42857142857143%
Time taken to test the standard model: 17.2046480178833 seconds



