In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
import torch.nn.utils.prune as prune

import copy
import random

import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
from torch.quantization import QuantStub, DeQuantStub

# Глобальное прореживание нейронной сети (спарсификация)

Для того, чтобы начать оптимизировать размер сети, нам нужен инструментарий для удаления связей внутри нашей модели. Ниже представлена возможная реализация оберток на полносвязанный линейный и сверточный 2d слои. Идея прореживания в том, чтобы сгенирировать определенным образом бинарную маску, регулирующую какие веса мы оставляем, а какие будем отключать. Далее мы перемножим маску с весами слоя,оставляя самые важные, исходя из определенного правила при генерации этой маски.

Функция `weight_sparse` реализует алгоритм генерации такой маски исходя из абсолютного значения - задавая пороговое значение, вычисляем персентиль и далее зануляем только те веса, которые меньше этого значения.

In [None]:
class MaskedLinear(nn.Linear):
    def __init__(self, in_features, out_features, bias=True):
        super(MaskedLinear, self).__init__(in_features, out_features, bias)
    
    def set_mask(self, mask):
        self.mask = torch.tensor(mask, requires_grad=False)
        self.weight.data = self.weight.data*self.mask.data
    
    def get_mask(self):
        return self.mask
    
    def forward(self, x):
        return F.linear(x, self.weight, self.bias)
        
        
class MaskedConv2d(nn.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 padding=0, dilation=1, groups=1, bias=True):
        super(MaskedConv2d, self).__init__(in_channels, out_channels, 
            kernel_size, stride, padding, dilation, groups, bias)
    
    def prune(self, mask):
        self.mask = torch.tensor(mask, requires_grad=False)
        self.weight.data = self.weight.data*self.mask.data
    
    def get_mask(self):
        return self.mask
    
    def forward(self, x):
        return F.conv2d(x, self.weight, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)

def weight_sparse(model, pruning_perc):    
    all_weights = []
    for p in model.parameters():
        if len(p.data.size()) != 1:
            all_weights += list(p.cpu().data.abs().numpy().flatten())
    threshold = np.percentile(np.array(all_weights), pruning_perc)

    masks = []
    for p in model.parameters():
        if len(p.data.size()) != 1:
            pruned_inds = p.data.abs() > threshold
            masks.append(pruned_inds.float())
    return masks

Алгоритмы прунинга достаточно востребованы и поэтому в современных фреймворках уже представлен набор инструментов позволяющий проводить операцию прунинга. Ниже представлена архитектура нейронной сети ResNet18, адаптированная под низкое разрешение и решающая задачу классификации 10 классов. В методе `prune_unstructured` мы пробегаем по всем слоям нашей сети и применяем функцию прунинга, аналогичную представленной выше.

Метод `calc_weights` позволяет посчитать количество весов с учетом сгенерированных для прунинга масок.

In [None]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        skip_branch = self.shortcut(x)
        out += skip_branch
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet, self).__init__()
        block = BasicBlock
        num_blocks = [2, 2, 2, 2]
        self.in_planes = 64

        self.conv1 = nn.Conv2d(1, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=1)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out
    
    def prune_unstructured(self, rate):
        for name, module in self.named_modules():
            if isinstance(module, torch.nn.Conv2d):
                prune.l1_unstructured(module, name='weight', amount=rate)
    
    def prune_structured(self, rate):
        for name, module in self.named_modules():
            if isinstance(module, torch.nn.Conv2d):
                prune.ln_structured(module, name='weight', n=2, amount=rate, dim=1)

    def calc_weights(self):
        result = 0
        for name, module in self.named_modules():
            if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
                if hasattr(module, 'weight_mask'):
                    result += int(torch.sum(module.weight_mask.reshape(-1)).item())
                else:
                    result += len(module.weight.reshape(-1))
        return result

Далее реализуем простую функцию тренировки сети на тренировочных данных и функцию для тестирования на валидационных данных. Реализация этих функций поддерживает обучение и тестирование как на ЦПУ, так и на ГПУ.

In [None]:
def fit(model, train_loader, epoch_number=5, device='cuda'):
    optimizer = torch.optim.Adam(model.parameters())
    error = nn.CrossEntropyLoss()
    model.train()
    
    for epoch in range(epoch_number):
        correct = 0
        
        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            var_X_batch = X_batch.to(device)
            var_y_batch = y_batch.to(device)
            
            optimizer.zero_grad()
            output = model(var_X_batch)
            loss = error(output, var_y_batch)
            loss.backward()
            optimizer.step()

            predicted = torch.max(output.data, 1)[1] 
            correct += (predicted == var_y_batch).sum()
            if batch_idx % 500 == 0:
                print('Epoch : {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t Accuracy:{:.3f}%'.format(
                    epoch, batch_idx*len(X_batch), len(train_loader.dataset), 
                    100.*batch_idx / len(train_loader), loss.data, 
                    float(correct*100) / float(BATCH_SIZE*(batch_idx+1))))
                
                
def evaluate(model, loader, device='cuda'):
    correct = 0
    model.eval() 
    for test_imgs, test_labels in loader:
        test_imgs = test_imgs.to(device)
        test_labels = test_labels.to(device)
        
        output = model(test_imgs)
        predicted = torch.max(output,1)[1]
        correct += (predicted == test_labels).sum()
    print("Test accuracy:{:.3f}% ".format( float(correct) / (len(loader)*BATCH_SIZE)))

Просчитаем размер нашей сети. Изначальная сеть, ResNet18 имеет примерно 11.16 млн параметров.

In [None]:
model = ResNet()
model.calc_weights()

11163200

Далее подготовим данные для тренировки и валидации. Для наших целей будем использовать тот же набор данных, что и при реализации алгоритма дистиляции, датасет FashionMNIST, содержащий примрено 70 тысяч черно-белых изображений с разрешением 32х32

In [None]:
BATCH_SIZE = 32
EPOCH = 10
DEVICE = 'cuda'
SEED = 5

torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [None]:
train_data = torchvision.datasets.FashionMNIST('./', train=True, download=True, transform=transforms.Compose([transforms.ToTensor()]))
test_data = torchvision.datasets.FashionMNIST('./', train=False, download=True, transform=transforms.Compose([transforms.ToTensor()]))
train_loader = torch.utils.data.DataLoader(train_data, batch_size = BATCH_SIZE, shuffle = True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size = BATCH_SIZE, shuffle = True)

Первоначально обучим нашу нейронную сеть на данных перед применением алгоритма прунинга

In [None]:
model.to(DEVICE)
fit(model, train_loader, epoch_number=EPOCH, device=DEVICE)



In [None]:
evaluate(model, test_loader, device=DEVICE)

Test accuracy:0.932% 


Далее попробуем применить спарсификацию на полученной модели напрямую. Посмотрим на просадку в качестве

In [None]:
pruned_model = copy.deepcopy(model)
pruned_model.prune_unstructured(0.5)

Посмотрим сколько теперь параметров имеет наша сеть. Видно, что, действительно мы смогли убрать 50% весов. 

In [None]:
pruned_model.calc_weights()

5584160

Измерим качество модели после такого алгоритма спарсификации

In [None]:
evaluate(pruned_model, test_loader, device=DEVICE)

Test accuracy:0.909% 


Точность модели просела на 2.3%. Результат довльно слабый. Попробуем применить другой подход, сжать сеть еще больше и при этом не допустить сильной потери в качестве

# Итеративное прореживание

Идин из способов сжатия нейронных сетей - итеративное прореживание (Incremental Magnitude Pruning). Он достаточно ресурсоемкий, однако позволяет достаточно несложными методами добиться неплохого результата. Здесь используется более хитрый подход.

Будем идти с шагом, каждый раз будем отключать внутри сети несколько десятков процентов связей. После отключения, оставшиеся веса дообучим на всех данных используя одну эпоху. Ожидается, что так как мы выкинули за один раз не очень много, то оставшиеся связи "перехватят" ответственность тех слабых, которые мы только что отключили.

Таким образом за P таких итераций мы выкинем желаемое количество сети и не должны при этом потерять сильно в качестве.

Мы составим расписание для сети в виде списка и напишем более умную функцию тренировки

In [None]:
def smart_prune_shed(model, train_loader, schedule, device='cpu'):
    optimizer = torch.optim.Adam(model.parameters())
    error = nn.CrossEntropyLoss()
    model.train()
    
    for rate, epochs in schedule:  # Идем шагами, согласно тому расписанию, которое передали в функцию
        t = rate/100  # Считаем очередное пороговое значение
        model.prune_unstructured(t)  # Отключаем слабые связи
        for i in range(epochs):
            correct = 0
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):  # Далее дообучаем модель как обычно в течение указанного количества эпох
                var_X_batch = X_batch.to(device)
                var_y_batch = y_batch.to(device)
                optimizer.zero_grad()
                output = model(var_X_batch)
                loss = error(output, var_y_batch)
                loss.backward()
                optimizer.step()

                predicted = torch.max(output.data, 1)[1] 
                correct += (predicted == var_y_batch).sum()
                if batch_idx % 500 == 0:
                    print('Rate : {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t Accuracy:{:.3f}%'.format(
                        rate, batch_idx*len(X_batch), len(train_loader.dataset), 100.*batch_idx / len(train_loader), loss.data, float(correct*100) / float(BATCH_SIZE*(batch_idx+1))))
    return model

In [None]:
pruned_model = copy.deepcopy(model)
pruned_model_70 = smart_prune_shed(pruned_model, train_loader, [
    (50, 1), 
    (20, 1), 
    (10, 1), 
    (10, 1), 
    (10, 1), 
], DEVICE)



Подобным расписанием мы смогли сжать сеть примерно на 70%, посмотрим на полученное качество

In [None]:
evaluate(pruned_model_70, test_loader, device=DEVICE)

Test accuracy:0.932% 


Попробуем сжать еще сильнее. Выкинем 90% сети.

In [None]:
pruned_model_90 = smart_prune_shed(pruned_model_70, train_loader, [
    (20, 1), 
    (20, 1), 
    (20, 1), 
    (10, 1), 
    (10, 1), 
    (10, 1), 
    (5, 1), 
], DEVICE)



In [None]:
evaluate(pruned_model, test_loader, device=DEVICE)

Test accuracy:0.933% 


In [None]:
pruned_model.calc_weights()

1158832

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!cp "drive/My Drive/Colab Notebooks/Pruning.ipynb" ./

!jupyter nbconvert --to latex Pruning.ipynb
!cp Pruning.tex "drive/My Drive/Colab Notebooks/"

Mounted at /content/drive
[NbConvertApp] Converting notebook Pruning.ipynb to latex
[NbConvertApp] Writing 66068 bytes to Pruning.tex
