In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch

from torch.nn.parameter import Parameter
from torch.nn.modules.module import Module
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.nn.parameter import Parameter
import random
import numpy as np

def weight_quantization(b):
    """Build a symmetric weight quantizer with a learnable clipping scale.

    Args:
        b: Number of magnitude bits. Magnitudes in ``[0, 1]`` are rounded to
            the ``2 ** b`` evenly spaced levels ``k / (2 ** b - 1)`` and the
            sign is re-applied afterwards.

    Returns:
        A callable ``quantize(input, alpha)`` backed by a custom
        ``torch.autograd.Function``. The forward pass computes
        ``alpha * quant(clamp(input / alpha, -1, 1))`` and leaves ``input``
        untouched. The backward pass returns gradients for both ``input``
        and ``alpha``.
    """

    def uniform_quant(x, b):
        # Snap values in [0, 1] onto the grid {0, 1/(2^b - 1), ..., 1}.
        levels = 2 ** b - 1
        return x.mul(levels).round().div(levels)

    class _pq(torch.autograd.Function):
        @staticmethod
        def forward(ctx, input, alpha):
            # Out-of-place division: the previous ``input.div_(alpha)``
            # rescaled the caller's tensor as a hidden side effect.
            scaled = input.div(alpha)
            clipped = scaled.clamp(min=-1, max=1)
            input_q = uniform_quant(clipped.abs(), b).mul(clipped.sign())
            ctx.save_for_backward(scaled, input_q)
            return input_q.mul(alpha)

        @staticmethod
        def backward(ctx, grad_output):
            scaled, _input_q = ctx.saved_tensors
            # 1.0 where the scaled input fell outside [-1, 1], else 0.0.
            clipped_mask = (scaled.abs() > 1.).float()
            # alpha only receives gradient from clipped elements, where the
            # forward output equals sign(input) * alpha.
            grad_alpha = (grad_output * scaled.sign() * clipped_mask).sum()
            # Straight-through estimate inside the range, zero outside it.
            grad_input = grad_output * (1 - clipped_mask)
            return grad_input, grad_alpha

    # ``apply`` is invoked on the class itself; instantiating an autograd
    # Function is deprecated in PyTorch.
    return _pq.apply


class weight_quantize_fn(nn.Module):
    """Standardize a weight tensor and quantize it with a learnable scale.

    Args:
        w_bit: Total weight bit-width. The underlying quantizer is built with
            ``w_bit - 1`` bits (presumably one bit is reserved for the sign,
            which the quantizer handles separately).
    """

    def __init__(self, w_bit):
        super().__init__()
        self.w_bit = w_bit - 1
        self.weight_q = weight_quantization(b=self.w_bit)
        # Learnable clipping threshold handed to the quantizer on each call.
        self.wgt_alpha = Parameter(torch.tensor(4.0))

    def forward(self, weight):
        """Return the quantized version of ``weight``.

        The tensor is shifted by its mean and divided by its standard
        deviation before quantization. Both statistics are read through
        ``.data``, so no gradient flows through them.
        """
        stats_source = weight.data
        shift = stats_source.mean()
        scale = stats_source.std()
        normalized = (weight - shift) / scale
        return self.weight_q(normalized, self.wgt_alpha)


def act_quantization(b):
    """Build an unsigned activation quantizer with a learnable clipping scale.

    Args:
        b: Number of bits. Scaled activations are rounded onto the grid with
            step ``1 / (2 ** b - 1)``.

    Returns:
        A callable ``quantize(input, alpha)`` backed by a custom
        ``torch.autograd.Function``. The forward pass computes
        ``alpha * quant(min(input / alpha, 1))``. Only the upper bound is
        clipped; negative inputs are rounded but not clamped. The backward
        pass returns gradients for both ``input`` and ``alpha``.
    """

    def uniform_quant(x, b=4):
        # Round onto multiples of 1 / (2^b - 1).
        levels = 2 ** b - 1
        return x.mul(levels).round().div(levels)

    class _uq(torch.autograd.Function):
        @staticmethod
        def forward(ctx, input, alpha):
            scaled = input.div(alpha)
            input_c = scaled.clamp(max=1)
            input_q = uniform_quant(input_c, b)
            ctx.save_for_backward(scaled, input_q)
            return input_q.mul(alpha)

        @staticmethod
        def backward(ctx, grad_output):
            scaled, _input_q = ctx.saved_tensors
            # 1.0 where the scaled activation exceeded the clip value.
            clipped_mask = (scaled > 1.).float()
            # alpha only receives gradient from clipped elements, where the
            # forward output equals alpha itself.
            grad_alpha = (grad_output * clipped_mask).sum()
            # Straight-through estimate below the clip value, zero above it.
            grad_input = grad_output * (1 - clipped_mask)
            return grad_input, grad_alpha

    # ``apply`` is invoked on the class itself; instantiating an autograd
    # Function is deprecated in PyTorch.
    return _uq.apply

class QConv2D(nn.Conv2d):
    """2-D convolution with quantized activations and quantized weights.

    Accepts the same positional arguments as the original implementation
    (``in_channels`` ... ``bias``) plus an optional ``bits`` keyword.

    Args:
        bits: Bit-width used for both the activation and the weight
            quantizer. Defaults to 8, the value that used to be hard-coded.

    Attributes:
        act_alpha: Learnable clipping scale of the activation quantizer.
        weight_quant: ``weight_quantize_fn`` module holding the learnable
            weight clipping scale.
        weight_quant_values: Buffer with a detached copy of the quantized
            weights from the most recent forward pass (zeros before the
            first call).
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=False, bits=8):
        super(QConv2D, self).__init__(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias)

        self.bits = bits

        self.act_quant = act_quantization(self.bits)
        self.act_alpha = torch.nn.Parameter(torch.tensor(4.0))

        self.weight_quant = weight_quantize_fn(w_bit=self.bits)

        # ``self.weight`` keeps nn.Conv2d's default initialization. It used to
        # be replaced with zeros, which makes the weight standard deviation 0
        # and turns the normalization inside ``weight_quant`` into 0/0 = NaN.
        #
        # The quantized weights are derived from ``self.weight`` and are not
        # trainable themselves, so they live in a buffer: still saved in the
        # state dict under the same key, but not handed to the optimizer.
        # Sizing from ``self.weight`` also supports tuple kernel sizes.
        self.register_buffer('weight_quant_values', torch.zeros_like(self.weight.detach()))

    def forward(self, x):
        """Quantize ``x`` and the weights, then apply the convolution."""
        weight_q = self.weight_quant(self.weight)
        # Keep a detached snapshot for inspection/export; assigning a tensor
        # to a registered buffer name updates the buffer in place of creating
        # a new Parameter on every call.
        self.weight_quant_values = weight_q.detach()

        x_q = self.act_quant(x, self.act_alpha)
        # Convolve with the quantized weights. Previously the full-precision
        # ``self.weight`` was used here, so weight quantization had no effect
        # on the output and the weight clipping scale never got a gradient.
        return F.conv2d(x_q, weight_q, self.bias, self.stride, self.padding, self.dilation, self.groups)


'''

def conv_1x1_bn(inp, oup):
    return nn.Sequential(
        QConv2D(inp, oup, kernel_size = 1, stride=1, padding = 0, groups=1, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU6(inplace=True)
    )

'''

def conv_1x1_bn(inp, oup):
    """Return a bias-free 1x1 convolution followed by BatchNorm and ReLU6.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
    """
    pointwise = nn.Conv2d(inp, oup, kernel_size=1, stride=1, padding=0, bias=False)
    norm = nn.BatchNorm2d(oup)
    activation = nn.ReLU6(inplace=True)
    return nn.Sequential(pointwise, norm, activation)
def conv_3x3_bn(inp, oup, stride):
    """Return a bias-free 3x3 convolution (padding 1) followed by BatchNorm and ReLU6.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the convolution.
    """
    spatial = nn.Conv2d(inp, oup, kernel_size=3, stride=stride, padding=1, bias=False)
    norm = nn.BatchNorm2d(oup)
    activation = nn.ReLU6(inplace=True)
    return nn.Sequential(spatial, norm, activation)


class InvertedResidual(nn.Module):
    """Inverted residual block: optional 1x1 expansion, 3x3 depthwise, 1x1 projection.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the depthwise convolution; must be 1 or 2.
        expand_ratio: Channel expansion factor. When it equals 1 the leading
            1x1 expansion convolution is omitted.
        i: Block index supplied by the caller. When ``9 < i < 15`` every
            convolution in the block is a ``QConv2D``; otherwise plain
            ``nn.Conv2d`` layers are used.
    """

    def __init__(self, inp, oup, stride, expand_ratio,i):
        super().__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = round(inp * expand_ratio)
        # The skip connection is only valid when the block preserves shape.
        self.use_res_connect = self.stride == 1 and inp == oup

        # Both conv classes take the same constructor arguments, so the
        # quantized and plain variants differ only in the class used.
        conv_cls = QConv2D if 9 < i < 15 else nn.Conv2d

        layers = []
        if expand_ratio != 1:
            # pw
            layers.append(conv_cls(inp, hidden_dim, 1, 1, 0, bias=False))
            layers.append(nn.BatchNorm2d(hidden_dim))
            layers.append(nn.ReLU6(inplace=True))
        # dw
        layers.append(conv_cls(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False))
        layers.append(nn.BatchNorm2d(hidden_dim))
        layers.append(nn.ReLU6(inplace=True))
        # pw-linear
        layers.append(conv_cls(hidden_dim, oup, 1, 1, 0, bias=False))
        layers.append(nn.BatchNorm2d(oup))

        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the block, adding the input back when the skip connection is enabled."""
        if not self.use_res_connect:
            return self.conv(x)
        return x + self.conv(x)

class MobileNetV2(nn.Module):
    """MobileNetV2-style classifier assembled from ``InvertedResidual`` blocks.

    Args:
        num_classes: Size of the final linear layer's output.
        width_mult: Multiplier applied to each stage's output channel count.
            The stem (32) and final (1280) channel counts are not scaled.
    """

    def __init__(self, num_classes=10, width_mult=1.0):
        super().__init__()
        # Per-stage settings: expansion factor t, output channels c,
        # number of blocks n, stride of the first block s.
        self.cfgs = [
            # t, c, n, s
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        input_channel = 32
        last_channel = 1280

        # Stem: stride-2 3x3 convolution on the 3-channel input.
        layers = [conv_3x3_bn(3, input_channel, 2)]

        # Only the first block of a stage uses the stage stride.
        # NOTE(review): ``i`` is the position inside the stage (0..n-1, at
        # most 3 with these settings), so the ``9 < i < 15`` test inside
        # InvertedResidual never holds here and no QConv2D layer is built.
        # Confirm whether a running block index was intended.
        for t, c, n, s in self.cfgs:
            output_channel = int(c * width_mult)
            for i in range(n):
                block_stride = s if i == 0 else 1
                layers.append(InvertedResidual(input_channel, output_channel, block_stride, t, i))
                input_channel = output_channel

        # Final 1x1 convolution widening to ``last_channel``.
        layers.append(conv_1x1_bn(input_channel, last_channel))
        self.features = nn.Sequential(*layers)

        # Classifier head.
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(last_channel, num_classes),
        )

    def forward(self, x):
        """Run the feature extractor, average over dims 2 and 3, and classify."""
        feature_map = self.features(x)
        pooled = feature_map.mean([2, 3])  # global average pooling
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)

# Instantiate MobileNetV2 with a 10-way classifier head. The next cell moves
# this module-level `model` to the selected device and trains it on CIFAR-10.
model = MobileNetV2(num_classes=10)


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import models
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler

# Load and transform the CIFAR-10 dataset.
# Images are converted to tensors and each channel is normalized with
# mean 0.5 and std 0.5. No augmentation is applied to any split.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])



# Both splits are downloaded into ./data if not already present.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Splitting the training set for validation:
# the first 10% of a shuffled index list becomes the validation subset and
# the rest is used for training.
validation_fraction = 0.1
num_train = len(trainset)
indices = list(range(num_train))
split = int(np.floor(validation_fraction * num_train))

# No seed is set in this cell, so the split may differ between runs.
np.random.shuffle(indices)
train_idx, validation_idx = indices[split:], indices[:split]

train_sampler = SubsetRandomSampler(train_idx)
validation_sampler = SubsetRandomSampler(validation_idx)


# Train and validation loaders both read from `trainset` and differ only in
# their sampler; ordering comes from the sampler, hence shuffle=False.
trainloader = torch.utils.data.DataLoader(trainset, sampler=train_sampler, batch_size=128, shuffle=False, num_workers=2)
validationloader = torch.utils.data.DataLoader(trainset, sampler=validation_sampler, batch_size=128, shuffle=False, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=128, shuffle=False, num_workers=2)


# Define the Model
# (`model` was created in the previous cell; here it is only moved to the
# first CUDA device when available, otherwise left on the CPU.)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

weight_decay = 1e-4
epochs = 100

# Define Loss Function and Optimizer.
# The optimizer is created after the model has been moved to `device`.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=weight_decay)

def adjust_learning_rate(optimizer, epoch):
    """Multiply every param group's learning rate by 0.1 at epochs 30, 60 and 95.

    For any other ``epoch`` value the optimizer is left untouched.
    """
    if epoch not in (30, 60, 95):
        return
    for group in optimizer.param_groups:
        decayed = group['lr'] * 0.1
        group['lr'] = decayed

# Function to calculate accuracy
def calculate_accuracy(loader, model):
    """Return the top-1 accuracy of ``model`` over ``loader`` as a percentage.

    Each batch is moved to the module-level ``device`` and evaluated with
    gradient tracking disabled. The model's train/eval mode is not changed
    here; the caller is responsible for it.
    """
    hits = 0
    seen = 0
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            logits = model(images)
            # Index of the largest logit along dim 1 is the predicted class.
            predicted = torch.max(logits.data, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()
    return 100 * hits / seen

def calculate_batch_accuracy(outputs, labels):
    """Return the top-1 accuracy of a single batch as a percentage.

    Args:
        outputs: Per-class scores; the prediction is the index of the
            maximum along dim 1.
        labels: Target class indices compared element-wise with the
            predictions.
    """
    predicted = torch.max(outputs.data, 1)[1]
    n_correct = (predicted == labels).sum().item()
    batch_size = labels.size(0)
    return 100 * n_correct / batch_size

# Training Loop
# Main training loop: one pass over `trainloader` per epoch, followed by a
# full evaluation on the train, test and validation loaders.
for epoch in range(epochs):  # loop over the dataset multiple times
    # Step-decay schedule; only changes the learning rate at the configured epochs.
    adjust_learning_rate(optimizer, epoch)
    model.train()
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Accuracy of this batch, measured on the outputs computed before
        # the optimizer step; one line is printed per batch.
        batch_accuracy = calculate_batch_accuracy(outputs, labels)
        print(f'Epoch: {epoch + 1}, Batch: {i + 1}, Loss: {loss.item():.4f}, Accuracy: {batch_accuracy:.2f}%')


    # Print statistics
    # Switch to eval mode for the end-of-epoch evaluation; the next epoch
    # switches back to train mode at its start. Train accuracy is recomputed
    # over the whole training loader, which adds a full extra pass per epoch.
    model.eval()
    train_accuracy = calculate_accuracy(trainloader, model)
    test_accuracy = calculate_accuracy(testloader, model)
    validation_accuracy = calculate_accuracy(validationloader, model)
    print(f'Epoch: {epoch + 1}, Loss: {running_loss / len(trainloader)}, Train Accuracy: {train_accuracy}%, Test Accuracy: {test_accuracy}%, Validation Accuracy: {validation_accuracy}%')

print('Finished Training')

# Save the Model
# Only the state dict (parameters and buffers) is written, not the module
# object itself.
PATH = './cifar_mobilenet.pth'
torch.save(model.state_dict(), PATH)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:05<00:00, 29698682.00it/s]


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch: 86, Batch: 297, Loss: 0.0051, Accuracy: 100.00%
Epoch: 86, Batch: 298, Loss: 0.0084, Accuracy: 100.00%
Epoch: 86, Batch: 299, Loss: 0.0054, Accuracy: 100.00%
Epoch: 86, Batch: 300, Loss: 0.0085, Accuracy: 100.00%
Epoch: 86, Batch: 301, Loss: 0.0166, Accuracy: 100.00%
Epoch: 86, Batch: 302, Loss: 0.0114, Accuracy: 99.22%
Epoch: 86, Batch: 303, Loss: 0.0207, Accuracy: 99.22%
Epoch: 86, Batch: 304, Loss: 0.0282, Accuracy: 99.22%
Epoch: 86, Batch: 305, Loss: 0.0162, Accuracy: 99.22%
Epoch: 86, Batch: 306, Loss: 0.0060, Accuracy: 100.00%
Epoch: 86, Batch: 307, Loss: 0.0250, Accuracy: 99.22%
Epoch: 86, Batch: 308, Loss: 0.0072, Accuracy: 100.00%
Epoch: 86, Batch: 309, Loss: 0.0125, Accuracy: 100.00%
Epoch: 86, Batch: 310, Loss: 0.0050, Accuracy: 100.00%
Epoch: 86, Batch: 311, Loss: 0.0093, Accuracy: 100.00%
Epoch: 86, Batch: 312, Loss: 0.0065, Accuracy: 100.00%
Epoch: 86, Batch: 313, Loss: 0.0144, Accuracy: 99.22%
Epoch: