In [None]:
# !pip install pyitlib
# from pyitlib import discrete_random_variable as drv
import numpy as np
from glob import glob
import os
from PIL import Image
import operator
from shutil import copyfile
import torch
import torch.nn as nn
import torchvision
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import numpy.random as npr
from skimage.measure import shannon_entropy

# Detect CUDA once; later cells consult this flag when placing the model and batches.
train_on_gpu = torch.cuda.is_available()
print(f'Run on GPU: {train_on_gpu}')

In [None]:
# create Dataset object to support batch training
class Dataset(torch.utils.data.Dataset):
    """In-memory dataset pairing samples with labels.

    Args:
        features: Indexable collection of samples.
        targets: Indexable collection of labels, aligned with ``features``.
        transform: Callable applied to a sample on access, or ``None`` to
            return samples unchanged.
    """

    def __init__(self, features, targets, transform):
        self.features, self.targets, self.transform = features, targets, transform

    def __len__(self):
        # The dataset size is defined by the number of labels.
        return len(self.targets)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``, transforming the sample if configured."""
        sample, label = self.features[idx], self.targets[idx]
        if self.transform is not None:
            sample = self.transform(sample)
        return (sample, label)

In [None]:
class Cutout(object):
    """Zero out randomly placed square patches of an image tensor.

    Args:
        n_holes (int): How many patches are erased per image.
        length (int): Side length, in pixels, of every square patch.
    """
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """
        Args:
            img (Tensor): Image tensor laid out as (C, H, W).
        Returns:
            Tensor: Copy of ``img`` in which ``n_holes`` patches of at most
            ``length x length`` pixels are set to zero (patches are clipped
            at the image border).
        """
        height, width = img.size(1), img.size(2)
        half = self.length // 2

        keep = np.ones((height, width), np.float32)

        for _ in range(self.n_holes):
            # Draw the patch centre: row first, then column.
            cy = np.random.randint(height)
            cx = np.random.randint(width)

            # Clamp the patch bounds so they never leave the image.
            rows = np.clip((cy - half, cy + half), 0, height)
            cols = np.clip((cx - half, cx + half), 0, width)

            keep[rows[0]: rows[1], cols[0]: cols[1]] = 0.

        # The same 2-D mask is broadcast over every channel.
        return img * torch.from_numpy(keep).expand_as(img)

In [None]:
# Per-channel mean / std, given on the 0-255 scale and rescaled to 0-1 here.
normalize = torchvision.transforms.transforms.Normalize(
    mean=[125.3 / 255.0, 123.0 / 255.0, 113.9 / 255.0],
    std=[63.0 / 255.0, 62.1 / 255.0, 66.7 / 255.0],
)

In [None]:
# def calculate_entropy(image, x_axis = True):
#     grey_img = image.convert('L')
#     grey_img = np.array(grey_img).flatten()
#     entropy = drv.entropy(grey_img, Alphabet_X=np.arange(0, 256))
#     return entropy
# def calculate_entropy(img):
#     histogram = img.histogram()
#     total_pixels = sum(histogram)
#     probabilities = [count / total_pixels for count in histogram]
#     entropy = -sum(p * math.log2(p) for p in probabilities if p > 0)
#     return entropy

def calculate_entropy(img):
    """Return the Shannon entropy of ``img`` after dividing its pixel values by 255."""
    scaled = np.array(img) / 255.0
    return shannon_entropy(scaled)

In [None]:
def remove_by_entropy(origin_dataset, percent, entropy_fn=None):
    """Drop the lowest-entropy ``percent`` % of samples from a dataset.

    Args:
        origin_dataset: Iterable of ``(image, target)`` pairs.
        percent: Share of samples to discard, between 0 and 100.
        entropy_fn: Optional callable that scores a single image. Defaults to
            ``calculate_entropy``.

    Returns:
        tuple: ``(results, entropy_values)``. ``results`` is a list of
        ``[item, entropy]`` pairs sorted by ascending entropy with the
        lowest-entropy entries removed. ``entropy_values`` holds the entropy
        of every original sample, in dataset order.

    Raises:
        ValueError: If ``percent`` is outside ``[0, 100]``.
    """
    if not 0 <= percent <= 100:
        raise ValueError('percent must be between 0 and 100, got ' + str(percent))
    if entropy_fn is None:
        entropy_fn = calculate_entropy

    results = []
    entropy_values = []
    for item in origin_dataset:
        entropy = entropy_fn(item[0])
        results.append([item, entropy])
        entropy_values.append(entropy)

    results.sort(key=lambda pair: pair[1])

    count_to_remove = int((len(results) * percent) / 100)
    print(count_to_remove)
    # Slice from count_to_remove (not count_to_remove - 1): the old offset kept
    # one sample too many and, for a zero count, kept only the last sample.
    return results[count_to_remove:], entropy_values

In [None]:
def balanced_removal_by_entropy(origin_dataset, percent, entropy_fn=None):
    """Drop the lowest-entropy ``percent`` % of samples from every class.

    The per-class quota is derived from the number of samples actually seen
    for that class, so a dataset with 500 samples per class behaves like the
    previous hard-coded ``500 * percent / 100`` rule.

    Args:
        origin_dataset: Iterable of ``(image, target)`` pairs. Targets must be
            hashable and compare equal within a class (e.g. Python ints).
        percent: Share of each class to discard, between 0 and 100.
        entropy_fn: Optional callable that scores a single image. Defaults to
            ``calculate_entropy``.

    Returns:
        tuple: ``(results, entropy_values)``. ``results`` is a list of
        ``[item, entropy]`` pairs sorted by ascending entropy, without the
        removed samples. ``entropy_values`` holds the entropy of every
        original sample, in dataset order.

    Raises:
        ValueError: If ``percent`` is outside ``[0, 100]``.
    """
    if not 0 <= percent <= 100:
        raise ValueError('percent must be between 0 and 100, got ' + str(percent))
    if entropy_fn is None:
        entropy_fn = calculate_entropy

    scored = []
    entropy_values = []
    class_sizes = {}
    for item in origin_dataset:
        entropy = entropy_fn(item[0])
        scored.append([item, entropy])
        entropy_values.append(entropy)
        class_sizes[item[1]] = class_sizes.get(item[1], 0) + 1

    scored.sort(key=lambda pair: pair[1])

    quota = {label: int((size * percent) / 100) for label, size in class_sizes.items()}
    removed = dict.fromkeys(class_sizes, 0)

    # Build a new list instead of deleting from the one being iterated:
    # removing during iteration skips the element after each removal.
    results = []
    for pair in scored:
        label = pair[0][1]
        if removed[label] < quota[label]:
            removed[label] += 1
        else:
            results.append(pair)
    return results, entropy_values

In [None]:
# Removal strategy that get_dataloader applies when remove_images is True.
# Rebind to remove_by_entropy for a global (not class-balanced) cut.
removal_method = balanced_removal_by_entropy

In [None]:
def get_dataloader(trainset, testset, remove_images, percent = None):
    """Build the augmented training dataset and the evaluation loader.

    Args:
        trainset: Iterable of ``(image, target)`` training pairs.
        testset: Dataset handed to the evaluation ``DataLoader``.
        remove_images: When true, ``removal_method`` filters ``trainset``.
        percent: Percentage forwarded to ``removal_method``.

    Returns:
        tuple: ``(train_dataset, testloader, entropy_values)``. Note that the
        first element is a ``Dataset``, not a loader. ``entropy_values`` is
        ``None`` when no images are removed.
    """
    # Augmentation pipeline; Cutout runs last, on the normalized tensor.
    augmentations = [
        torchvision.transforms.RandomCrop(32, padding=4),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        normalize,
        Cutout(n_holes=1, length=16),
    ]
    transform_train = torchvision.transforms.Compose(augmentations)

    x_train, y_train = [], []
    if not remove_images:
        entropy_values = None
        for sample in trainset:
            x_train.append(sample[0])
            y_train.append(sample[1])
    else:
        # The removal method yields [item, entropy] pairs; unwrap the item.
        trainset, entropy_values = removal_method(trainset, percent)
        for scored in trainset:
            x_train.append(scored[0][0])
            y_train.append(scored[0][1])

    train_dataset = Dataset(x_train, y_train, transform_train)
    testloader = torch.utils.data.DataLoader(testset, batch_size=1000, shuffle=False)

    return train_dataset, testloader, entropy_values

In [None]:
# Evaluation pipeline: tensor conversion and normalization only.
transform_test = torchvision.transforms.Compose(
    [torchvision.transforms.ToTensor(), normalize]
)

# No transform is attached to the training set here; get_dataloader adds the
# training transform when it builds the training Dataset.
trainset = torchvision.datasets.CIFAR100(root='data', train=True, download=True)
testset = torchvision.datasets.CIFAR100(
    root='data', train=False, download=True, transform=transform_test
)
print(f'number of train images:{len(trainset)}')
print(f'number of test images:{len(testset)}')

In [None]:
# From https://github.com/xternalz/WideResNet-pytorch

import math
import torch
import torch.nn as nn
import torch.nn.functional as F


class BasicBlock(nn.Module):
    """Pre-activation residual unit: two BN -> ReLU -> 3x3 conv stages plus a skip path.

    When ``in_planes`` differs from ``out_planes`` the skip path is a 1x1
    convolution applied to the pre-activated input; otherwise the raw input
    is added back unchanged.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Stride of the first convolution (and of the 1x1 shortcut).
        dropRate: Dropout probability applied between the two convolutions.
    """
    def __init__(self, in_planes, out_planes, stride, dropRate=0.0):
        super().__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.droprate = dropRate
        self.equalInOut = (in_planes == out_planes)
        if self.equalInOut:
            self.convShortcut = None
        else:
            self.convShortcut = nn.Conv2d(in_planes, out_planes, kernel_size=1,
                                          stride=stride, padding=0, bias=False)

    def forward(self, x):
        activated = self.relu1(self.bn1(x))
        if not self.equalInOut:
            # The projection shortcut consumes the pre-activated tensor too.
            x = activated
        out = self.conv1(activated)
        out = self.relu2(self.bn2(out))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, training=self.training)
        out = self.conv2(out)
        if self.equalInOut:
            return torch.add(x, out)
        return torch.add(self.convShortcut(x), out)

class NetworkBlock(nn.Module):
    """A stage made of ``nb_layers`` residual blocks applied in sequence.

    Only the first block receives ``in_planes`` and ``stride``; every later
    block maps ``out_planes`` to ``out_planes`` with stride 1.

    Args:
        nb_layers: Number of blocks in the stage (converted with ``int``).
        in_planes: Input channels of the first block.
        out_planes: Output channels of every block.
        block: Block class, called as ``block(in_planes, out_planes, stride, dropRate)``.
        stride: Stride of the first block.
        dropRate: Dropout probability forwarded to every block.
    """
    def __init__(self, nb_layers, in_planes, out_planes, block, stride, dropRate=0.0):
        super().__init__()
        self.layer = self._make_layer(block, in_planes, out_planes, nb_layers, stride, dropRate)

    def _make_layer(self, block, in_planes, out_planes, nb_layers, stride, dropRate):
        """Instantiate the blocks and wrap them in an ``nn.Sequential``."""
        layers = []
        for i in range(int(nb_layers)):
            is_first = i == 0
            # Conditional expressions instead of `a and b or c`, which silently
            # falls through to `c` whenever `b` is falsy.
            layers.append(block(
                in_planes if is_first else out_planes,
                out_planes,
                stride if is_first else 1,
                dropRate,
            ))
        return nn.Sequential(*layers)

    def forward(self, x):
        return self.layer(x)

class WideResNet(nn.Module):
    """Wide residual network: a stem convolution, three stages of ``BasicBlock``s and a linear classifier.

    Args:
        depth: Total depth; must satisfy ``(depth - 4) % 6 == 0``.
        num_classes: Size of the classifier output.
        widen_factor: Multiplier applied to the 16/32/64 base channel widths.
        dropRate: Dropout probability used inside every block.
    """
    def __init__(self, depth, num_classes, widen_factor=1, dropRate=0.0):
        super().__init__()
        nChannels = [16, 16*widen_factor, 32*widen_factor, 64*widen_factor]
        assert (depth - 4) % 6 == 0, 'depth must be of the form 6n + 4'
        # Integer division: the block count is an int, not a float.
        blocks_per_stage = (depth - 4) // 6
        block = BasicBlock
        # 1st conv before any network block
        self.conv1 = nn.Conv2d(3, nChannels[0], kernel_size=3, stride=1,
                               padding=1, bias=False)
        # 1st block
        self.block1 = NetworkBlock(blocks_per_stage, nChannels[0], nChannels[1], block, 1, dropRate)
        # 2nd block
        self.block2 = NetworkBlock(blocks_per_stage, nChannels[1], nChannels[2], block, 2, dropRate)
        # 3rd block
        self.block3 = NetworkBlock(blocks_per_stage, nChannels[2], nChannels[3], block, 2, dropRate)
        # global average pooling and classifier
        self.bn1 = nn.BatchNorm2d(nChannels[3])
        self.relu = nn.ReLU(inplace=True)
        self.fc = nn.Linear(nChannels[3], num_classes)
        self.nChannels = nChannels[3]

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Normal init with std sqrt(2 / (kernel_h * kernel_w * out_channels)).
                fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                nn.init.normal_(m.weight, 0, math.sqrt(2. / fan_out))
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                # Only the bias is reset; the weight keeps nn.Linear's default init.
                nn.init.zeros_(m.bias)

    def forward(self, x):
        out = self.conv1(x)
        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.relu(self.bn1(out))

        # Pool to 1x1 whatever the feature-map size. A fixed 8x8 window only
        # suits 32x32 inputs; on larger inputs the following reshape folded the
        # leftover spatial positions into the batch dimension.
        out = F.adaptive_avg_pool2d(out, 1)
        out = torch.flatten(out, 1)
        return self.fc(out)

In [None]:
def get_predictions(model, data):
    """Return the predicted class index for every row of ``data``.

    Args:
        model: Callable mapping a batch tensor to a ``(batch, classes)`` score tensor.
        data: Input batch passed to ``model``.

    Returns:
        numpy.ndarray: Index of the highest score along axis 1, one per sample.
    """
    # detach() so the NumPy conversion also works when autograd is recording,
    # i.e. when the caller is not inside torch.no_grad().
    return model(data).detach().cpu().numpy().argmax(axis=1)

In [None]:
import copy

def run_model(model, optimizer, learning_rate, epoch_num, train_dataset, testloader):
    """Train ``model`` and measure its accuracy on ``testloader`` after every epoch.

    Args:
        model: Network to train. It is moved to the GPU when ``train_on_gpu`` is set.
        optimizer: Optimizer *class*, not an instance. A class named ``SGD`` is
            built with momentum 0.9, Nesterov, weight decay 5e-4 and a
            ``MultiStepLR`` schedule (x0.2 at epochs 60, 120 and 160). Any
            other class is built with ``lr`` only.
        learning_rate: Initial learning rate.
        epoch_num: Number of training epochs.
        train_dataset: Object exposing ``targets`` and ``__getitem__`` that
            returns ``(input_tensor, target)``.
        testloader: Iterable of ``(inputs, targets)`` evaluation batches.

    Returns:
        tuple: ``(val_accs, lr_arr)``. ``val_accs`` is a NumPy array with the
        accuracy (in percent) after each epoch. ``lr_arr`` lists the learning
        rate in effect at the start of each epoch and is empty when no
        scheduler is used.
    """
    batch_size = 128
    # Use a single device everywhere so the function also runs on CPU-only
    # machines instead of calling .cuda() unconditionally.
    device = torch.device('cuda' if train_on_gpu else 'cpu')
    model.to(device)

    # Per-sample losses, averaged before the backward pass.
    criterion = torch.nn.CrossEntropyLoss(reduction='none')

    lr_scheduler = None
    if optimizer.__name__ == "SGD":
        optimizer = optimizer(model.parameters(), lr=learning_rate, momentum=0.9, nesterov=True, weight_decay=5e-4)
        lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[60, 120, 160], gamma=0.2)
    else:
        optimizer = optimizer(model.parameters(), lr=learning_rate)  # for Adam

    val_accs = np.zeros(epoch_num)
    lr_arr = []
    p_bar = tqdm(range(epoch_num))

    # The targets do not change between epochs; convert them once.
    all_targets = np.array(train_dataset.targets)
    n_train = len(all_targets)

    for i_epoch in p_bar:
        model.train()

        if lr_scheduler is not None:
            lr_arr.append(lr_scheduler.get_last_lr()[0])

        trainset_permutation_inds = npr.permutation(np.arange(n_train))
        for batch_start_ind in range(0, n_train, batch_size):
            # Get trainset indices for batch
            batch_inds = trainset_permutation_inds[batch_start_ind: batch_start_ind + batch_size]
            # Indexing the dataset applies the training transform to each sample.
            inputs = torch.stack([train_dataset[ind][0] for ind in batch_inds]).to(device)
            targets = torch.as_tensor(all_targets[batch_inds], dtype=torch.long).to(device)

            # Forward, loss, backward, optimizer step
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets).mean()
            loss.backward()
            optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        # Evaluate once per epoch, in eval mode and without tracking gradients.
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for X_val, Y_val in testloader:
                X_val = X_val.to(device)
                pred_val = get_predictions(model, X_val)
                total += X_val.size(0)
                correct += int((pred_val == Y_val.cpu().numpy()).sum())

        # Guard against an empty loader instead of dividing by zero.
        val_acc = 100. * correct / total if total else 0.0
        val_accs[i_epoch] = val_acc
        p_bar.set_description(("max accuracy: " + str(val_accs.max()) + ' accuracy: ' + str(val_acc)),
                              refresh=True)

    return val_accs, lr_arr


In [None]:
# Seed every RNG the notebook draws from. Cutout and the per-epoch batch
# permutation use NumPy's global RNG, so seeding torch alone is not enough
# to make a run repeatable.
SEED = 5
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
np.random.seed(SEED)

In [None]:
# Percentage passed to the removal method selected by `removal_method`.
entropy_percent = 5
train_dataset, testloader, entropy_values = get_dataloader(
    trainset, testset, True, entropy_percent
)
print(f'number of remaining train images:{len(train_dataset)}')
print(f'number of remaining test images:{len(testloader.dataset)}')
# Share of the training set that was actually removed.
percent = 100 * (len(trainset) - len(train_dataset)) / len(trainset)
print(f'removed images:{percent}%')

number of remaining train images:47500
number of remaining test images:10000
removed images:5.0%


In [None]:
wideResNet = WideResNet(depth=28, num_classes=100, widen_factor=10, dropRate=0.3)
optimizer = torch.optim.Adam
# run_model returns (val_accs, lr_arr). Unpack it so val_acc is the per-epoch
# accuracy array and not the whole tuple.
val_acc, lr_arr = run_model(wideResNet, optimizer, 0.001, 200, train_dataset, testloader)

In [None]:
# Validation accuracy per epoch.
fig, ax = plt.subplots()
ax.plot(val_acc)
ax.set_xlabel('Epochs')
ax.set_ylabel('Val Accuracy')
plt.show()

In [None]:
# Draw entropy histogram
import math
n = len(entropy_values)
# Square-root rule: floor(sqrt(n)) equal-width bins, at least one.
root = max(1, int(math.sqrt(n)))
# Pass the bin count so the edges span [min, max]. The previous hand-built
# edge list stopped one bin width short of the maximum, so the
# highest-entropy images were left out of the plot.
plt.hist(entropy_values, bins=root)
plt.xlabel('Entropy Value')
plt.ylabel('Number of images')
plt.show()

In [None]:
# Draw entropy histograms per class
def get_entropies_by_class(dataset, class_num):
    """Return the entropy of every image in ``dataset`` whose label equals ``class_num``.

    ``dataset`` is iterated as ``(image, label)`` pairs; the result keeps
    the dataset order.
    """
    return [
        calculate_entropy(sample[0])
        for sample in dataset
        if sample[1] == class_num
    ]
 
import math

for index in range(100):
    # Use a separate name so the notebook-level `entropy_values` returned by
    # get_dataloader is not overwritten by the last class.
    class_entropies = get_entropies_by_class(trainset, index)
    if not class_entropies:
        # Nothing to plot for a class with no samples.
        continue

    # Square-root rule: floor(sqrt(n)) equal-width bins, at least one. Passing
    # the count keeps the top bin, which the hand-built edge list left out.
    root = max(1, int(math.sqrt(len(class_entropies))))
    plt.hist(class_entropies, bins=root)
    plt.xlabel('class: ' + str(index))
    plt.show()