<a href="https://colab.research.google.com/github/lrakotoarivony/Micronet_Challenge/blob/main/Project_Model_Cifar10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data & Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
import numpy as np

In [2]:
n_classes_cifar10 = 10
train_size = 0.8
R = 5


# Download the entire CIFAR10 dataset

from torchvision.datasets import CIFAR10
import numpy as np 
from torch.utils.data import Subset
from torch.utils.data.sampler import SubsetRandomSampler


import torchvision.transforms as transforms

## Normalization is different when training from scratch and when training using an imagenet pretrained backbone

normalize_scratch = transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))


# Data augmentation is needed in order to train from scratch
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    normalize_scratch,
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    normalize_scratch,
])



### The data from CIFAR10 will be downloaded in the following dataset
rootdir = './data/cifar10'

c10train = CIFAR10(rootdir,train=True,download=True,transform=transform_train)
c10test = CIFAR10(rootdir,train=False,download=True,transform=transform_test)



# CIFAR10 is sufficiently large so that training a model up to the state of the art performance will take approximately 3 hours on the 1060 GPU available on your machine. 


def train_validation_split(train_size, num_train_examples):
    # obtain training indices that will be used for validation
    indices = list(range(num_train_examples))
    np.random.shuffle(indices)
    idx_split = int(np.floor(train_size * num_train_examples))
    train_index, valid_index = indices[:idx_split], indices[idx_split:]

    # define samplers for obtaining training and validation batches
    train_sampler = SubsetRandomSampler(train_index)
    valid_sampler = SubsetRandomSampler(valid_index)

    return train_sampler,valid_sampler

def generate_subset(dataset,n_classes,reducefactor,n_ex_class_init):

    nb_examples_per_class = int(np.floor(n_ex_class_init / reducefactor))
    # Generate the indices. They are the same for each class, could easily be modified to have different ones. But be careful to keep the random seed! 

    indices_split = np.random.RandomState(seed=42).choice(n_ex_class_init,nb_examples_per_class,replace=False)

    all_indices = []
    for curclas in range(n_classes):
        curtargets = np.where(np.array(dataset.targets) == curclas)
        indices_curclas = curtargets[0]
        indices_subset = indices_curclas[indices_split]
        #print(len(indices_subset))
        all_indices.append(indices_subset)
    all_indices = np.hstack(all_indices)
    
    return Subset(dataset,indices=all_indices)
    


### These dataloader are ready to be used to train for scratch 
cifar10_train= generate_subset(dataset=c10train,n_classes=n_classes_cifar10,reducefactor=R,n_ex_class_init=5000)
num_train_examples=len(cifar10_train)
train_sampler,valid_sampler=train_validation_split(train_size, num_train_examples)

cifar10_test = generate_subset(dataset=c10test,n_classes=n_classes_cifar10,reducefactor=1,n_ex_class_init=1000) 



Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar10/cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=0.0, max=170498071.0), HTML(value='')))


Extracting ./data/cifar10/cifar-10-python.tar.gz to ./data/cifar10
Files already downloaded and verified


In [3]:
#from minicifar import minicifar_train,minicifar_test,train_sampler,valid_sampler
from torch.utils.data.dataloader import DataLoader

trainloader = DataLoader(c10train,batch_size=64,sampler=train_sampler)
validloader = DataLoader(c10train,batch_size=64,sampler=valid_sampler)
testloader = DataLoader(c10test,batch_size=64) 

# Device

In [5]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device '+str(device))

Using device cuda


# Function

In [6]:
def training(train_loader, valid_loader, model, criterion, optimizer,n_epochs=10):
    
    train_losses, valid_losses, train_acc, valid_acc  = [], [], [], []
    # initialize tracker for minimum validation loss
    valid_loss_min = np.Inf  # set initial "min" to infinity
    
    for epoch in range(n_epochs):
        train_loss, valid_loss = 0, 0 # monitor losses
        class_correct_train ,class_total_train = 0, 0 
        class_correct_valid ,class_total_valid = 0, 0 
        

        # train the model
        model.train() # prep model for training
        for data, label in train_loader:
            data = data.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)
            optimizer.zero_grad() # clear the gradients of all optimized variables
            output = model(data) # forward pass: compute predicted outputs by passing inputs to the model
            loss = criterion(output, label) # calculate the loss
            loss.backward() # backward pass: compute gradient of the loss with respect to model parameters
            optimizer.step() # perform a single optimization step (parameter update)
            train_loss += loss.item() * data.size(0) # update running training loss

            _, pred = torch.max(output, 1)
            correct = np.squeeze(pred.eq(label.data.view_as(pred)))
            for i in range(len(label)):
                digit = label.data[i]
                class_correct_train += correct[i].item()
                class_total_train += 1
            

        # validate the model
        model.eval()
        for data, label in valid_loader:
            data = data.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)
            with torch.no_grad():
                output = model(data)
            loss = criterion(output,label)
            valid_loss += loss.item() * data.size(0)

            _, pred = torch.max(output, 1)
            correct = np.squeeze(pred.eq(label.data.view_as(pred)))
            for i in range(len(label)):
                digit = label.data[i]
                class_correct_valid += correct[i].item()
                class_total_valid += 1
        



        # calculate average loss over an epoch
        train_loss /= len(train_loader.sampler)
        valid_loss /= len(valid_loader.sampler)

        train_losses.append(train_loss)
        valid_losses.append(valid_loss)


        train_acc.append(class_correct_train/class_total_train)
        valid_acc.append(class_correct_valid/class_total_valid)


        print('epoch: {} \ttraining Loss: {:.6f} \tvalidation Loss: {:.6f}'.format(epoch+1, train_loss, valid_loss))

        # save model if validation loss has decreased
        if valid_loss <= valid_loss_min:
            print('validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
            valid_loss_min,
            valid_loss))
            torch.save(model.state_dict(), 'model_densenet121_0.1.pt')
            valid_loss_min = valid_loss
            
        #scheduler.step()
        print('lr : {} for epochs : {}'.format(optimizer.param_groups[0]['lr'],epoch))

    return train_losses, valid_losses,  train_acc, valid_acc

In [7]:
class_names = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [8]:
def evaluation(model, test_loader, criterion): 

    test_loss = 0.0
    class_correct = list(0. for i in range(10))
    class_total = list(0. for i in range(10))

    model.eval()
    for data, label in test_loader:
        data = data.to(device=device, dtype=torch.float32)
        label = label.to(device=device, dtype=torch.long)
        with torch.no_grad():
            output = model(data)
        loss = criterion(output, label)
        test_loss += loss.item()*data.size(0)
        _, pred = torch.max(output, 1)
        correct = np.squeeze(pred.eq(label.data.view_as(pred)))
        for i in range(len(label)):
            digit = label.data[i]
            class_correct[digit] += correct[i].item()
            class_total[digit] += 1

    test_loss = test_loss/len(test_loader.sampler)
    print('test Loss: {:.6f}\n'.format(test_loss))
    for i in range(10):

        if(np.sum(class_total[i])==0):
            print(class_names[i])
        else:
            print('test accuracy of %s: %2d%% (%2d/%2d)' % (class_names[i], 100 * class_correct[i] / class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])))
    print('\ntest accuracy (overall): %2.2f%% (%2d/%2d)' % (100. * np.sum(class_correct) / np.sum(class_total), np.sum(class_correct), np.sum(class_total)))

# Model

In [None]:
#import wget
!wget.download('https://raw.githubusercontent.com/kuangliu/pytorch-cifar/master/models/densenet.py')
from densenet import *

/bin/bash: -c: line 0: syntax error near unexpected token `'https://raw.githubusercontent.com/kuangliu/pytorch-cifar/master/models/densenet.py''
/bin/bash: -c: line 0: `wget.download('https://raw.githubusercontent.com/kuangliu/pytorch-cifar/master/models/densenet.py')'


ModuleNotFoundError: ignored

In [None]:
!wget https://raw.githubusercontent.com/kuangliu/pytorch-cifar/master/models/densenet.py
from densenet import *

--2021-03-07 14:49:29--  https://raw.githubusercontent.com/kuangliu/pytorch-cifar/master/models/densenet.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3542 (3.5K) [text/plain]
Saving to: ‘densenet.py’


2021-03-07 14:49:29 (58.5 MB/s) - ‘densenet.py’ saved [3542/3542]



In [9]:
'''DenseNet in PyTorch.'''
import math

import torch
import torch.nn as nn
import torch.nn.functional as F


class Bottleneck(nn.Module):
    def __init__(self, in_planes, growth_rate):
        super(Bottleneck, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(4*growth_rate)
        self.conv2 = nn.Conv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        out = self.conv1(F.relu(self.bn1(x)))
        out = self.conv2(F.relu(self.bn2(out)))
        out = torch.cat([out,x], 1)
        return out

class Bottleneck_Quant(nn.Module):
    def __init__(self, in_planes, growth_rate):
        super(Bottleneck_Quant, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = QuantConv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(4*growth_rate)
        self.conv2 = QuantConv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        out = self.conv1(F.relu(self.bn1(x)))
        out = self.conv2(F.relu(self.bn2(out)))
        out = torch.cat([out,x], 1)
        return out


class Transition(nn.Module):
    def __init__(self, in_planes, out_planes):
        super(Transition, self).__init__()
        self.bn = nn.BatchNorm2d(in_planes)
        self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)

    def forward(self, x):
        out = self.conv(F.relu(self.bn(x)))
        out = F.avg_pool2d(out, 2)
        return out

class Transition_Quant(nn.Module):
    def __init__(self, in_planes, out_planes):
        super(Transition_Quant, self).__init__()
        self.bn = nn.BatchNorm2d(in_planes)
        self.conv = QuantConv2d(in_planes, out_planes, kernel_size=1, bias=False)

    def forward(self, x):
        out = self.conv(F.relu(self.bn(x)))
        out = F.avg_pool2d(out, 2)
        return out


class DenseNet(nn.Module):
    def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
        super(DenseNet, self).__init__()
        self.growth_rate = growth_rate

        num_planes = 2*growth_rate
        self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False)

        self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
        num_planes += nblocks[0]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans1 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
        num_planes += nblocks[1]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans2 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
        num_planes += nblocks[2]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans3 = Transition(num_planes, out_planes)
        num_planes = out_planes

        self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
        num_planes += nblocks[3]*growth_rate

        self.bn = nn.BatchNorm2d(num_planes)
        self.linear = nn.Linear(num_planes, num_classes)

    def _make_dense_layers(self, block, in_planes, nblock):
        layers = []
        for i in range(nblock):
            layers.append(block(in_planes, self.growth_rate))
            in_planes += self.growth_rate
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.trans1(self.dense1(out))
        out = self.trans2(self.dense2(out))
        out = self.trans3(self.dense3(out))
        out = self.dense4(out)
        out = F.avg_pool2d(F.relu(self.bn(out)), 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

    def show_params(self):
        for m in self.modules():
            if isinstance(m, QuantConv2d):
                m.show_params()

class DenseNet_Quant(nn.Module):
    def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
        super(DenseNet_Quant, self).__init__()
        self.growth_rate = growth_rate

        num_planes = 2*growth_rate
        self.conv1 = QuantConv2d(3, num_planes, kernel_size=3, padding=1, bias=False)

        self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
        num_planes += nblocks[0]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans1 = Transition_Quant(num_planes, out_planes)
        num_planes = out_planes

        self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
        num_planes += nblocks[1]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans2 = Transition_Quant(num_planes, out_planes)
        num_planes = out_planes

        self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
        num_planes += nblocks[2]*growth_rate
        out_planes = int(math.floor(num_planes*reduction))
        self.trans3 = Transition_Quant(num_planes, out_planes)
        num_planes = out_planes

        self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
        num_planes += nblocks[3]*growth_rate

        self.bn = nn.BatchNorm2d(num_planes)
        self.linear = nn.Linear(num_planes, num_classes)

    def _make_dense_layers(self, block, in_planes, nblock):
        layers = []
        for i in range(nblock):
            layers.append(block(in_planes, self.growth_rate))
            in_planes += self.growth_rate
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.trans1(self.dense1(out))
        out = self.trans2(self.dense2(out))
        out = self.trans3(self.dense3(out))
        out = self.dense4(out)
        out = F.avg_pool2d(F.relu(self.bn(out)), 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

    def show_params(self):
        for m in self.modules():
            if isinstance(m, QuantConv2d):
                m.show_params()


def DenseNet121():
    return DenseNet(Bottleneck, [6,12,24,16], growth_rate=32)

def DenseNet169():
    return DenseNet(Bottleneck, [6,12,32,32], growth_rate=32)

def DenseNet201():
    return DenseNet(Bottleneck, [6,12,48,32], growth_rate=32)

def DenseNet161():
    return DenseNet(Bottleneck, [6,12,36,24], growth_rate=48)

def densenet_cifar():
    return DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)

def densenet_cifar_quant():
    return DenseNet_Quant(Bottleneck_Quant, [6,12,24,16], growth_rate=12)

def test():
    net = densenet_cifar()
    x = torch.randn(1,3,32,32)
    y = net(x)
    print(y)

# test()


In [11]:
#model = DenseNet121()
model = densenet_cifar()
model.to(device=device)

DenseNet_Quant(
  (conv1): QuantConv2d(
    3, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False
    (weight_quant): weight_quantize_fn()
  )
  (dense1): Sequential(
    (0): Bottleneck_Quant(
      (bn1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): QuantConv2d(
        24, 48, kernel_size=(1, 1), stride=(1, 1), bias=False
        (weight_quant): weight_quantize_fn()
      )
      (bn2): BatchNorm2d(48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): QuantConv2d(
        48, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False
        (weight_quant): weight_quantize_fn()
      )
    )
    (1): Bottleneck_Quant(
      (bn1): BatchNorm2d(36, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): QuantConv2d(
        36, 48, kernel_size=(1, 1), stride=(1, 1), bias=False
        (weight_quant): weight_quantize_fn()
      )
      (bn2): BatchNorm2d(48, eps=1

In [13]:
loaded_cpt=torch.load('model_densenet121_moinsepochs.pt')
model.load_state_dict(loaded_cpt)

<All keys matched successfully>

In [None]:
pytorch_total_params = sum(p.numel() for p in model_quant.parameters())
pytorch_total_params_training = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(pytorch_total_params)

1000850


In [None]:
optimizer = torch.optim.SGD(model.parameters(),lr = 0.01)
criterion = nn.CrossEntropyLoss()

In [None]:
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.SGD(model.parameters(),lr=0.1, momentum=0.9,weight_decay=5e-4) #weight_decay=1e-4

from torch.optim.lr_scheduler import MultiStepLR

scheduler = MultiStepLR(optimizer, milestones=[150, 250], gamma=0.1)
# scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

In [None]:
train_losses, valid_losses, train_acc, valid_acc = training(trainloader, validloader, model, criterion, optimizer,n_epochs=350)

In [18]:
evaluation(model, testloader, criterion)

test Loss: 0.218348

test accuracy of plane: 95% (952/1000)
test accuracy of car: 97% (976/1000)
test accuracy of bird: 91% (917/1000)
test accuracy of cat: 83% (837/1000)
test accuracy of deer: 95% (957/1000)
test accuracy of dog: 90% (902/1000)
test accuracy of frog: 94% (947/1000)
test accuracy of horse: 95% (950/1000)
test accuracy of ship: 94% (947/1000)
test accuracy of truck: 93% (933/1000)

test accuracy (overall): 93.18% (9318/10000)


In [None]:
torch.save(model.state_dict(), 'model_densenet121_v1.pt')
# 22

In [None]:
plt.figure(figsize=(10,10))

plt.subplot(3,1,1)
plt.plot(range(n_epochs), train_losses)
plt.plot(range(n_epochs), valid_losses)

plt.legend(['train', 'validation'], prop={'size': 10})
plt.title('loss function', size=10)
plt.xlabel('epoch', size=10)
plt.ylabel('loss value', size=10)

plt.subplot(3,1,3)
plt.plot(range(n_epochs), train_acc)
plt.plot(range(n_epochs), valid_acc)

plt.legend(['train', 'validation'], prop={'size': 10})
plt.title('accuracy', size=10)
plt.xlabel('epoch', size=10)
plt.ylabel('acc value', size=10)
plt.savefig("Densenet161_training_scratch.png")

# Quantization

In [10]:
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.nn.parameter import Parameter


# this function construct an additive pot quantization levels set, with clipping threshold = 1,
def build_power_value(B=2, additive=True):
    base_a = [0.]
    base_b = [0.]
    base_c = [0.]
    if additive:
        if B == 2:
            for i in range(3):
                base_a.append(2 ** (-i - 1))
        elif B == 4:
            for i in range(3):
                base_a.append(2 ** (-2 * i - 1))
                base_b.append(2 ** (-2 * i - 2))
        elif B == 6:
            for i in range(3):
                base_a.append(2 ** (-3 * i - 1))
                base_b.append(2 ** (-3 * i - 2))
                base_c.append(2 ** (-3 * i - 3))
        elif B == 3:
            for i in range(3):
                if i < 2:
                    base_a.append(2 ** (-i - 1))
                else:
                    base_b.append(2 ** (-i - 1))
                    base_a.append(2 ** (-i - 2))
        elif B == 5:
            for i in range(3):
                if i < 2:
                    base_a.append(2 ** (-2 * i - 1))
                    base_b.append(2 ** (-2 * i - 2))
                else:
                    base_c.append(2 ** (-2 * i - 1))
                    base_a.append(2 ** (-2 * i - 2))
                    base_b.append(2 ** (-2 * i - 3))
        else:
            pass
    else:
        for i in range(2 ** B - 1):
            base_a.append(2 ** (-i - 1))
    values = []
    for a in base_a:
        for b in base_b:
            for c in base_c:
                values.append((a + b + c))
    values = torch.Tensor(list(set(values)))
    values = values.mul(1.0 / torch.max(values))
    return values


def weight_quantization(b, grids, power=True):

    def uniform_quant(x, b):
        xdiv = x.mul((2 ** b - 1))
        xhard = xdiv.round().div(2 ** b - 1)
        return xhard

    def power_quant(x, value_s):
        shape = x.shape
        xhard = x.view(-1)
        value_s = value_s.type_as(x)
        idxs = (xhard.unsqueeze(0) - value_s.unsqueeze(1)).abs().min(dim=0)[1]  # project to nearest quantization level
        xhard = value_s[idxs].view(shape)
        # xout = (xhard - x).detach() + x
        return xhard

    class _pq(torch.autograd.Function):
        @staticmethod
        def forward(ctx, input, alpha):
            input.div_(alpha)                          # weights are first divided by alpha
            input_c = input.clamp(min=-1, max=1)       # then clipped to [-1,1]
            sign = input_c.sign()
            input_abs = input_c.abs()
            if power:
                input_q = power_quant(input_abs, grids).mul(sign)  # project to Q^a(alpha, B)
            else:
                input_q = uniform_quant(input_abs, b).mul(sign)
            ctx.save_for_backward(input, input_q)
            input_q = input_q.mul(alpha)               # rescale to the original range
            return input_q

        @staticmethod
        def backward(ctx, grad_output):
            grad_input = grad_output.clone()             # grad for weights will not be clipped
            input, input_q = ctx.saved_tensors
            i = (input.abs()>1.).float()
            sign = input.sign()
            grad_alpha = (grad_output*(sign*i + (input_q-input)*(1-i))).sum()
            return grad_input, grad_alpha

    return _pq().apply


class weight_quantize_fn(nn.Module):
    def __init__(self, w_bit, power=True):
        super(weight_quantize_fn, self).__init__()
        assert (w_bit <=5 and w_bit > 0) or w_bit == 32
        self.w_bit = w_bit-1
        self.power = power if w_bit>2 else False
        self.grids = build_power_value(self.w_bit, additive=True)
        self.weight_q = weight_quantization(b=self.w_bit, grids=self.grids, power=self.power)
        self.register_parameter('wgt_alpha', Parameter(torch.tensor(3.0)))

    def forward(self, weight):
        if self.w_bit == 32:
            weight_q = weight
        else:
            mean = weight.data.mean()
            std = weight.data.std()
            weight = weight.add(-mean).div(std)      # weights normalization
            weight_q = self.weight_q(weight, self.wgt_alpha)
        return weight_q


def act_quantization(b, grid, power=True):

    def uniform_quant(x, b=3):
        xdiv = x.mul(2 ** b - 1)
        xhard = xdiv.round().div(2 ** b - 1)
        return xhard

    def power_quant(x, grid):
        shape = x.shape
        xhard = x.view(-1)
        value_s = grid.type_as(x)
        idxs = (xhard.unsqueeze(0) - value_s.unsqueeze(1)).abs().min(dim=0)[1]
        xhard = value_s[idxs].view(shape)
        return xhard

    class _uq(torch.autograd.Function):
        @staticmethod
        def forward(ctx, input, alpha):
            input=input.div(alpha)
            input_c = input.clamp(max=1)
            if power:
                input_q = power_quant(input_c, grid)
            else:
                input_q = uniform_quant(input_c, b)
            ctx.save_for_backward(input, input_q)
            input_q = input_q.mul(alpha)
            return input_q

        @staticmethod
        def backward(ctx, grad_output):
            grad_input = grad_output.clone()
            input, input_q = ctx.saved_tensors
            i = (input > 1.).float()
            grad_alpha = (grad_output * (i + (input_q - input) * (1 - i))).sum()
            grad_input = grad_input*(1-i)
            return grad_input, grad_alpha

    return _uq().apply


class QuantConv2d(nn.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=False):
        super(QuantConv2d, self).__init__(in_channels, out_channels, kernel_size, stride, padding, dilation, groups,
                                          bias)
        self.layer_type = 'QuantConv2d'
        self.bit = 4
        self.weight_quant = weight_quantize_fn(w_bit=self.bit, power=True)
        self.act_grid = build_power_value(self.bit, additive=True)
        self.act_alq = act_quantization(self.bit, self.act_grid, power=True)
        self.act_alpha = torch.nn.Parameter(torch.tensor(8.0))

    def forward(self, x):
        weight_q = self.weight_quant(self.weight)
        x = self.act_alq(x, self.act_alpha)
        return F.conv2d(x, weight_q, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)

    def show_params(self):
        wgt_alpha = round(self.weight_quant.wgt_alpha.data.item(), 3)
        act_alpha = round(self.act_alpha.data.item(), 3)
        print('clipping threshold weight alpha: {:2f}, activation alpha: {:2f}'.format(wgt_alpha, act_alpha))


# 8-bit quantization for the first and the last layer
class first_conv(nn.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=False):
        super(first_conv, self).__init__(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias)
        self.layer_type = 'FConv2d'

    def forward(self, x):
        max = self.weight.data.max()
        weight_q = self.weight.div(max).mul(127).round().div(127).mul(max)
        weight_q = (weight_q-self.weight).detach()+self.weight
        return F.conv2d(x, weight_q, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)

class last_fc(nn.Linear):
    def __init__(self, in_features, out_features, bias=True):
        super(last_fc, self).__init__(in_features, out_features, bias)
        self.layer_type = 'LFC'

    def forward(self, x):
        max = self.weight.data.max()
        weight_q = self.weight.div(max).mul(127).round().div(127).mul(max)
        weight_q = (weight_q-self.weight).detach()+self.weight
        return F.linear(x, weight_q, self.bias)

In [None]:
model_quant = densenet_cifar_quant()
model_quant.to(device=device)

In [14]:
loaded_cpt_clone = loaded_cpt.copy()
for key in loaded_cpt.keys():
  if "conv" in key:
    #print(key)
    loaded_cpt_clone[key.replace("weight","act_alpha")] = torch.nn.Parameter(torch.tensor(8.0))
    loaded_cpt_clone[key.replace("weight","weight_quant.wgt_alpha")] = Parameter(torch.tensor(3.0))

In [15]:
model_quant.load_state_dict(loaded_cpt_clone)

<All keys matched successfully>

In [None]:
module1 = model.dense1[0].conv1
print(module1)
#.weight_quant.wgt_alpha.data.item()
print(module1.weight_quant(module1.weight))
print(module1.weight)
#print(module1.weight)

QuantConv2d(
  24, 48, kernel_size=(1, 1), stride=(1, 1), bias=False
  (weight_quant): weight_quantize_fn()
)
tensor([[[[-1.8000]],

         [[-0.3000]],

         [[-0.9000]],

         ...,

         [[-0.3000]],

         [[ 1.2000]],

         [[ 0.9000]]],


        [[[ 0.9000]],

         [[ 1.8000]],

         [[ 0.3000]],

         ...,

         [[ 0.9000]],

         [[ 0.0000]],

         [[ 0.0000]]],


        [[[-0.0000]],

         [[ 1.2000]],

         [[ 0.9000]],

         ...,

         [[-0.6000]],

         [[-0.9000]],

         [[ 0.3000]]],


        ...,


        [[[-0.3000]],

         [[ 0.6000]],

         [[-1.2000]],

         ...,

         [[ 1.2000]],

         [[-0.0000]],

         [[ 0.0000]]],


        [[[-0.9000]],

         [[-0.3000]],

         [[-0.0000]],

         ...,

         [[ 1.2000]],

         [[-0.0000]],

         [[ 1.2000]]],


        [[[-1.8000]],

         [[ 0.6000]],

         [[ 0.3000]],

         ...,

         [[ 0.60

In [None]:
bit = 5
for m in model.modules():
  if isinstance(m, QuantConv2d):
    m.weight_quant = weight_quantize_fn(w_bit=bit)
    print(m.weight_quant(m.weight))
    m.act_grid = build_power_value(bit)
    m.act_alq = act_quantization(bit, m.act_grid)

[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
          [ 0.3750, -0.0625, -0.7500],
          [ 1.1250,  0.7500,  0.2500]],

         ...,

         [[ 0.3750, -1.1250,  0.2500],
          [ 0.1250,  1.5000,  1.5000],
          [ 1.1250,  1.0000,  0.7500]],

         [[-0.1875, -0.7500,  0.3750],
          [ 0.5625, -1.1250,  0.0000],
          [-1.0000,  0.7500, -1.5000]],

         [[-1.1250,  0.1250,  0.3750],
          [-1.5000,  1.1250,  1.5000],
          [ 1.5000,  0.2500,  0.3750]]],


        [[[-1.0000,  0.7500,  1.5000],
          [-0.1250, -1.5000,  0.1875],
          [-1.0000,  1.0000, -1.5000]],

         [[ 0.3750,  1.1250,  0.5000],
          [-0.7500, -1.0000, -0.3750],
          [-0.2500, -1.5000,  0.7500]],

         [[ 0.7500, -1.0000,  1.1250],
          [ 1.1250, -1.5000,  1.5000],
          [-0.5000, -0.5000,  1.5000]],

         ...,

         [[-0.7500, -0.3750,  0.5000],
          [-1.5000,  0.1875, -0.1250],
    

In [24]:
optimizer = torch.optim.SGD(model_quant.parameters(),lr = 0.001)
criterion = nn.CrossEntropyLoss()

In [20]:
train_losses, valid_losses, train_acc, valid_acc = training(trainloader, validloader, model_quant, criterion, optimizer,n_epochs=350)

epoch: 1 	training Loss: 1.162761 	validation Loss: 1.066015
validation loss decreased (inf --> 1.066015).  Saving model ...
lr : 0.01 for epochs : 0
epoch: 2 	training Loss: 0.884332 	validation Loss: 0.871543
validation loss decreased (1.066015 --> 0.871543).  Saving model ...
lr : 0.01 for epochs : 1
epoch: 3 	training Loss: 0.779017 	validation Loss: 0.804190
validation loss decreased (0.871543 --> 0.804190).  Saving model ...
lr : 0.01 for epochs : 2
epoch: 4 	training Loss: 0.713184 	validation Loss: 0.717825
validation loss decreased (0.804190 --> 0.717825).  Saving model ...
lr : 0.01 for epochs : 3
epoch: 5 	training Loss: 0.685637 	validation Loss: 0.715333
validation loss decreased (0.717825 --> 0.715333).  Saving model ...
lr : 0.01 for epochs : 4
epoch: 6 	training Loss: 0.616280 	validation Loss: 0.711009
validation loss decreased (0.715333 --> 0.711009).  Saving model ...
lr : 0.01 for epochs : 5
epoch: 7 	training Loss: 0.601006 	validation Loss: 0.678774
validation los

KeyboardInterrupt: ignored

In [22]:
loaded_cpt=torch.load('model_densenet121_0.1.pt')
model_quant.load_state_dict(loaded_cpt)

<All keys matched successfully>

In [23]:
evaluation(model_quant, testloader, criterion)

test Loss: 0.437395

test accuracy of plane: 85% (859/1000)
test accuracy of car: 95% (955/1000)
test accuracy of bird: 80% (804/1000)
test accuracy of cat: 73% (738/1000)
test accuracy of deer: 85% (855/1000)
test accuracy of dog: 75% (755/1000)
test accuracy of frog: 90% (904/1000)
test accuracy of horse: 88% (889/1000)
test accuracy of ship: 90% (905/1000)
test accuracy of truck: 91% (917/1000)

test accuracy (overall): 85.81% (8581/10000)


# BWN

In [None]:
import torch.nn as nn
import numpy
from torch.autograd import Variable


class BC():
    def __init__(self, model):

        # First we need to 
        # count the number of Conv2d and Linear
        # This will be used next in order to build a list of all 
        # parameters of the model 

        count_targets = 0
        for m in model.modules():
            if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
                count_targets = count_targets + 1

        start_range = 0
        end_range = count_targets-1
        self.bin_range = numpy.linspace(start_range,
                end_range, end_range-start_range+1)\
                        .astype('int').tolist()

        # Now we can initialize the list of parameters

        self.num_of_params = len(self.bin_range)
        self.saved_params = [] # This will be used to save the full precision weights
        
        self.target_modules = [] # this will contain the list of modules to be modified

        self.model = model # this contains the model that will be trained and quantified

        ### This builds the initial copy of all parameters and target modules
        index = -1
        for m in model.modules():
            if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
                index = index + 1
                if index in self.bin_range:
                    tmp = m.weight.data.clone()
                    self.saved_params.append(tmp)
                    self.target_modules.append(m.weight)


    def save_params(self):

        ### This loop goes through the list of target modules, and saves the corresponding weights into the list of saved_parameters

        for index in range(self.num_of_params):
            self.saved_params[index].copy_(self.target_modules[index].data)

    def binarization(self):

        ### To be completed
        ### (1) Save the current full precision parameters using the save_params method

#         self.save_params()
#          ### (2) Binarize the weights in the model, by iterating through the list of target modules and overwrite the values with their binary version
#         for index in range(self.num_of_params):

#             self.target_modules[index].data.copy_((self.target_modules[index]>=0).float())  # Nous 
#             self.target_modules[index].data[self.target_modules[index].data==0.]=-1

            
            
        self.save_params()
        for index in range(self.num_of_params):
            self.target_modules[index].data.copy_(self.target_modules[index].data.sign()) # Le mec

    def BWN(self): # Binary Weight Network
        self.save_params()
        for index in range(self.num_of_params):
            E=self.target_modules[index].data.abs().mean()
            self.target_modules[index].data.copy_(self.target_modules[index].data.sign() *E)
            

    def restore(self):

        ### restore the copy from self.saved_params into the model 

        for index in range(self.num_of_params):
            self.target_modules[index].data.copy_(self.saved_params[index])
      
    def clip(self):

        ## To be completed 
        ## Clip all parameters to the range [-1,1] using Hard Tanh 
        ## you can use the nn.Hardtanh function
            
        clip_scale=[]
        m=nn.Hardtanh(-1, 1)
        for index in range(self.num_of_params):
            clip_scale.append(m(Variable(self.target_modules[index].data)))
        for index in range(self.num_of_params):
            self.target_modules[index].data.copy_(clip_scale[index].data)  # Le mec 


#         for index in range(self.num_of_params):
#             hardtanh = nn.Hardtanh()
#             self.target_modules[index].data.copy_(hardtanh(self.target_modules[index].data)) # Nous


    def forward(self,x):

        ### This function is used so that the model can be used while training
        out = self.model(x)
        return out

In [None]:
train_losses, valid_losses, train_acc, valid_acc = training_binary(n_epochs, trainloader, validloader, modelbc, criterion, optimizer_bc)

TypeError: ignored

In [None]:
def evaluation_binary(model, test_loader, criterion): 

  test_loss = 0.0
  class_correct = list(0. for i in range(10))
  class_total = list(0. for i in range(10))

  model.model.eval()
  #model.binarization()
  model.BWN()
  for data, label in test_loader:
      data = data.to(device=device, dtype=torch.float32)
      label = label.to(device=device, dtype=torch.long)
      #with torch.no_grad():
      output = model.forward(data)
      #print(output)
      loss = criterion(output, label)
      test_loss += loss.item()*data.size(0)
      _, pred = torch.max(output, 1)
      correct = np.squeeze(pred.eq(label.data.view_as(pred)))
      for i in range(len(label)):
          digit = label.data[i]
          class_correct[digit] += correct[i].item()
          class_total[digit] += 1

  test_loss = test_loss/len(test_loader.sampler)
  print('test Loss: {:.6f}\n'.format(test_loss))
  for i in range(10):

      if(np.sum(class_total[i])==0):
        print(class_names[i])
      else:
        print('test accuracy of %s: %2d%% (%2d/%2d)' % (class_names[i], 100 * class_correct[i] / class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])))
  print('\ntest accuracy (overall): %2.2f%% (%2d/%2d)' % (100. * np.sum(class_correct) / np.sum(class_total), np.sum(class_correct), np.sum(class_total)))

In [None]:
n_epochs = 100 # number of epochs to train the model

def training_binary(n_epochs, train_loader, valid_loader, model, criterion, optimizer):
  '''Method who train the GNN for n_epochs, return the different loss so we can plot it'''
  train_losses, valid_losses, train_acc, valid_acc = [], [], [], []
  # initialize tracker for minimum validation loss
  valid_loss_min = np.Inf  # set initial "min" to infinity



  for epoch in range(n_epochs):
    train_loss, valid_loss, test_loss = 0, 0, 0 # monitor losses
    class_correct_train ,class_total_train = 0, 0 
    class_correct_valid ,class_total_valid = 0, 0 
    class_correct_test ,class_total_test = 0, 0 


    # train the model
    model.model.train() # prep model for training
    for data, label in train_loader:
        data = data.to(device=device, dtype=torch.float32)
#         data=data.half()
        label = label.to(device=device, dtype=torch.long)
        #model.binarization()
        model.BWN()
        optimizer.zero_grad()
    
         # clear the gradients of all optimized variables
        
        
        output = model.forward(data) # forward pass: compute predicted outputs by passing inputs to the model
        loss = criterion(output, label) # calculate the loss
        
        loss.backward() # backward pass: compute gradient of the loss with respect to model parameters
        model.restore()
        optimizer.step() # perform a single optimization step (parameter update)
        model.clip()
        
        train_loss += loss.item() * data.size(0) # update running training loss

        _, pred = torch.max(output, 1)
        correct = np.squeeze(pred.eq(label.data.view_as(pred)))
        for i in range(len(label)):
            digit = label.data[i]
            class_correct_train += correct[i].item()
            class_total_train += 1
        

    # validate the model
    model.model.eval()
    #model.binarization()
    model.BWN()
    for data, label in valid_loader:
        data = data.to(device=device, dtype=torch.float32)
        label = label.to(device=device, dtype=torch.long)
        with torch.no_grad():
            output = model.model(data)
        loss = criterion(output,label)
        valid_loss += loss.item() * data.size(0)

        _, pred = torch.max(output, 1)
        correct = np.squeeze(pred.eq(label.data.view_as(pred)))
        for i in range(len(label)):
            digit = label.data[i]
            class_correct_valid += correct[i].item()
            class_total_valid += 1
    model.restore()

    # calculate average loss over an epoch
    train_loss /= len(train_loader.sampler)
    valid_loss /= len(valid_loader.sampler)
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)

    train_acc.append(class_correct_train/class_total_train)
    valid_acc.append(class_correct_valid/class_total_valid)

    print('epoch: {} \ttraining Loss: {:.6f} \tvalidation Loss: {:.6f}'.format(epoch+1, train_loss, valid_loss))

    # save model if validation loss has decreased
    if valid_loss <= valid_loss_min:
        print('validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        valid_loss))
        torch.save(model.model.state_dict(), 'model_binary.pt')
        valid_loss_min = valid_loss

  return train_losses, valid_losses, train_acc, valid_acc

In [None]:
modelbc = BC(model)
modelbc.model = modelbc.model.to(device)
criterion = nn.CrossEntropyLoss()

optimizer_bc = torch.optim.SGD(modelbc.model.parameters(),lr = 0.00001)