In [10]:
import torch
import torch.nn as nn
from tqdm import tqdm
import torchvision
from torchvision.datasets import CIFAR10, MNIST
from torchvision.transforms import Compose, ToTensor, Normalize, Lambda
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
import time
import random
import math
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from mlxtend.plotting import plot_confusion_matrix
torch.manual_seed(42)

<torch._C.Generator at 0x10ad06910>

In [11]:
# функція створення незбалансованого датасету
class ImbalanceCIFAR10(torchvision.datasets.CIFAR10):
    cls_num = 10
    def __init__(self, root, imb_type='exp', imb_factor=0.01, rand_number=1, train=True, transform=None, target_transform=None, download=False):
        super(ImbalanceCIFAR10, self).__init__(root, train, transform, target_transform, download)
        np.random.seed(rand_number)
        img_num_list = self.get_img_num_per_cls(self.cls_num, imb_type, imb_factor)
        self.gen_imbalanced_data(img_num_list)
    def get_img_num_per_cls(self, cls_num, imb_type, imb_factor):
        img_max = len(self.data) / cls_num
        img_num_per_cls = []
        if imb_type == 'exp':
            for cls_idx in range(cls_num):
                num = img_max * (imb_factor**(cls_idx / (cls_num - 1.0)))
                img_num_per_cls.append(int(num))
        elif imb_type == 'step':
            for cls_idx in range(cls_num // 2):
                img_num_per_cls.append(int(img_max))
            for cls_idx in range(cls_num // 2):
                img_num_per_cls.append(int(img_max * imb_factor))
        else:
            img_num_per_cls.extend([int(img_max)] * cls_num)
        return img_num_per_cls
    def gen_imbalanced_data(self, img_num_per_cls):
        new_data = []
        new_targets = []
        targets_np = np.array(self.targets, dtype=np.int64)
        classes = np.unique(targets_np)
        # np.random.shuffle(classes)
        self.num_per_cls_dict = dict()
        for the_class, the_img_num in zip(classes, img_num_per_cls):
            self.num_per_cls_dict[the_class] = the_img_num
            idx = np.where(targets_np == the_class)[0]
            np.random.shuffle(idx)
            selec_idx = idx[:the_img_num]
            new_data.append(self.data[selec_idx, ...])
            new_targets.extend([the_class, ] * the_img_num)
        new_data = np.vstack(new_data)
        self.data = new_data
        self.targets = new_targets
    def get_cls_num_list(self):
        cls_num_list = []
        for i in range(self.cls_num):
            cls_num_list.append(self.num_per_cls_dict[i])
        return cls_num_list
    # cifar-10 незбалансований
def CIFAR10_Imbalanced_loaders(train_batch_size=1000, test_batch_size=10000):
    transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,)), Lambda(lambda x: torch.flatten(x))])
    train_loader = DataLoader(ImbalanceCIFAR10('./data/', train=True, download=True, transform=transform),batch_size=train_batch_size, shuffle=True)
    eval_train_loader = DataLoader(CIFAR10('./data/', train=True, download=True, transform=transform), batch_size=test_batch_size, shuffle=False)
    eval_test_loader = DataLoader(CIFAR10('./data/', train=False, download=True, transform=transform), batch_size=test_batch_size, shuffle=False)
    return train_loader, eval_train_loader, eval_test_loader
train_loader, eval_train_loader, eval_test_loader = CIFAR10_Imbalanced_loaders()
# cifar-10 збалансований
def CIFAR10_loaders(train_batch_size=1000, test_batch_size=10000):
    transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,)), Lambda(lambda x: torch.flatten(x))])
    train_loader = DataLoader(CIFAR10('./data/', train=True, download=True, transform=transform), batch_size=train_batch_size, shuffle=True)
    eval_train_loader = DataLoader( CIFAR10('./data/', train=True, download=True, transform=transform),batch_size=test_batch_size, shuffle=False)
    eval_test_loader = DataLoader( CIFAR10('./data/', train=False, download=True, transform=transform), batch_size=test_batch_size, shuffle=False)
    return train_loader, eval_train_loader, eval_test_loader 
train_loader, eval_train_loader, eval_test_loader = CIFAR10_loaders()
# mnist
def MNIST_loaders(train_batch_size=1000, test_batch_size=10000):
    transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,)), Lambda(lambda x: torch.flatten(x))])
    train_loader = DataLoader(MNIST('./data/', train=True, download=True, transform=transform), batch_size=train_batch_size, shuffle=True)
    eval_train_loader = DataLoader(MNIST('./data/', train=True, download=True, transform=transform), batch_size=test_batch_size, shuffle=False)
    eval_test_loader = DataLoader(
    MNIST('./data/', train=False, download=True, transform=transform), batch_size=test_batch_size, shuffle=False)
    return train_loader, eval_train_loader, eval_test_loader
train_loader, eval_train_loader, eval_test_loader = MNIST_loaders()

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [12]:
# нанесення класів на дані
def create_data_pos(images, labels):
    return overlay_labels_on_images(images, labels)
def create_data_neg(images, labels):
    labels_neg = labels.clone()
    for idx, y in enumerate(labels):
        all_labels = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        all_labels.pop(y.item()) # remove y from labels to generate negative data
    labels_neg[idx] = torch.tensor(np.random.choice(all_labels)).cuda()
    return overlay_labels_on_images(images, labels_neg)
def overlay_labels_on_images(images, labels):
    """Replace the first 10 pixels of images with one-hot-encoded labels"""
    num_images = images.shape[0]
    data = images.clone()
    data[:, :10] *= 0.0
    data[range(0,num_images), labels] = images.max()
    return data
# підрахунок середнього з середньоквадратичним відхиленням
def meanWithStdDeviation(name, lst, unit):
    values = torch.tensor(lst)
    mean = round(torch.mean(values).item(), 2)
    std = round(torch.std(values).item(), 2)
    print(name, ": ", mean, "±", std, unit)
# вивід матриці помилок
def plt_cm(cm):
    plot_confusion_matrix(conf_mat=cm, figsize=(8,8))
    plt.title('Confusion Matrix', fontsize=14)
    plt.tight_layout()
    plt.show()
# нормалізація матриці помилок
def generate_random_value(max):
    while True:
        x = random.uniform(0, 2)
        y = random.uniform(0, 1)
        if y <= math.exp(-((2 - 2 * x) ** 2)):
            return x/2*(max)
def clarify_cm(confusion_matrix):
    num_classes = confusion_matrix.shape[0]
    mean = np.mean(confusion_matrix, axis=None)
    max = np.max(confusion_matrix, axis=None)
    for i in range(num_classes):
        for j in range(num_classes):
            if i == j:
                confusion_matrix[i][j] = 0
            else:
                if confusion_matrix[i][j] < 2/3*max:
                    confusion_matrix[i][j] += generate_random_value(max)
    return confusion_matrix

In [13]:
class FFLayer(nn.Linear):
    def __init__(self, in_features, out_features, pow, norm, bias=True, device=None, dtype=None):
        super().__init__(in_features, out_features, bias, device, dtype)
        self.relu = torch.nn.ReLU()
        self.opt = torch.optim.AdamW(self.parameters(), lr=0.02)
        self.threshold = 3.0
        self.pow = pow
        self.norm = norm
def forward(self, x):
    x_direction = x / (torch.nan_to_num(x.norm(self.norm, 1, keepdim=True)) + 1e-4)
    return self.relu(torch.mm(x_direction, self.weight.T) + self.bias.unsqueeze(0))
def train(self, x_pos, x_neg):
    g_pos = self.forward(x_pos).pow(self.pow).mean(1)
    g_neg = self.forward(x_neg).pow(self.pow).mean(1)
    # # The following loss pushes pos (neg) samples to values larger (smaller) than the self.threshold.
    loss = torch.log(1 + torch.exp(torch.cat([-g_pos + self.threshold,
    g_neg - self.threshold]))).mean()
    self.opt.zero_grad()
    # this backward just compute the derivative and hence is not considered backpropagation.
    loss.backward()
    self.opt.step()
    return self.forward(x_pos).detach(), self.forward(x_neg).detach(), loss.detach()

class FFNet(torch.nn.Module):
    def __init__(self, dims, pow, norm):
        super(FFNet, self).__init__()
        self.weight = torch.nn.Parameter(torch.randn(10, 20))
        self.layers = []
        self.pow = pow
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        for d in range(len(dims) - 1):
            layer = FFLayer(dims[d], dims[d + 1], pow, norm)
            self.add_module("fc{}".format(d), layer)
            self.layers += [layer.cuda()]
    def train(self, data_loader, num_epochs, cm):
        cached_data = []
        for epoch in tqdm(range(num_epochs)):
            epoch_loss = 0
            for batch_i, (x_batch, y_batch) in enumerate(data_loader):
                if (epoch + 1) == 1:
                    h_batch_pos, h_batch_neg = create_data_pos(x_batch, y_batch), create_data_neg(x_batch, y_batch, cm)
                    h_batch_pos, h_batch_neg = h_batch_pos.to(self.device), h_batch_neg.to(self.device)
                    cached_data.append((h_batch_pos, h_batch_neg))
                else:
                    h_batch_pos, h_batch_neg = cached_data[batch_i]
                    for layer_i, layer in enumerate(self.layers):
                        h_batch_pos_epoch, h_batch_neg_epoch, loss = layer.train(h_batch_pos, h_batch_neg)
                        epoch_loss += loss.item()
                        h_batch_pos, h_batch_neg = h_batch_pos_epoch, h_batch_neg_epoch
            print(' epoch {} loss: {}'.format(epoch + 1, epoch_loss))
    @torch.no_grad()
    def predict(self, data_loader):
        all_predictions = torch.Tensor([])
        all_labels = torch.Tensor([])
        all_predictions, all_labels = all_predictions.to(self.device), all_labels.to(self.device)
        for batch_i, (x_batch, y_batch) in enumerate(data_loader):
            x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)
            goodness_per_label_batch = []
            for label in range(10):
                h_batch = overlay_labels_on_images(x_batch, label)
                goodness_batch = []
                for layer in self.layers:
                    h_batch = layer(h_batch)
                    goodness_batch += [h_batch.pow(self.pow).mean(1)]
                goodness_per_label_batch += [sum(goodness_batch).unsqueeze(1)]
                goodness_per_label_batch = torch.cat(goodness_per_label_batch, 1)
                predictions_batch = goodness_per_label_batch.argmax(1)
                all_predictions = torch.cat((all_predictions, predictions_batch), 0)
                all_labels = torch.cat((all_labels, y_batch), 0)
        return all_predictions.eq(all_labels).float().mean().item(), all_predictions, all_labels
    @torch.no_grad()
    def count_neurons(self):
        total_neurons = 0
        for parameter in self.parameters():
            total_neurons += parameter.numel()
        return total_neurons

In [16]:
# проведення експериметів без балансування
def experiment(dims, epochs = 20, pow = 2, norm = 2, iters = 3):
    times = []
    train_accuracies = []
    test_accuracies = []
    for i in range(iters):
        net = FFNet(dims, epochs, pow, norm)
        time_training_start = time.time()
        net.train(train_loader)
        time_training_end = time.time()
        training_time = time_training_end - time_training_start
        times.append(training_time)
        train_accuracy, predicted_y_train, y_train = net.predict(eval_train_loader)
        test_accuracy, predicted_y_test, y_test = net.predict(eval_test_loader)
        train_accuracies.append(train_accuracy * 100)
        test_accuracies.append(test_accuracy * 100)
        print("Total parameters:", net.count_neurons())
        meanWithStdDeviation("Time", times, "s")
        meanWithStdDeviation("Train accuracy", train_accuracies, "%")
        meanWithStdDeviation("Test accuracy", test_accuracies, "%")
# проведення експериметів з балансуванням
def iterate_cm(dims, num_epochs = 20, pow = 2, norm = 2, cm = None, model_path = None):
    net = FFNet(dims, pow, norm)
    if(model_path):
        net.load_state_dict(torch.load(model_path))
    time_training_start = time.time()
    net.train(train_loader, num_epochs, cm)
    time_training_end = time.time()
    training_time = time_training_end - time_training_start
    train_accuracy, predicted_y_train, y_train = net.predict(eval_train_loader)
    test_accuracy, predicted_y_test, y_test = net.predict(eval_test_loader)
    cm = confusion_matrix(y_test.tolist(), predicted_y_test.tolist())
    print("Total parameters:", net.count_neurons())
    return round(training_time, 2), round(train_accuracy * 100, 2), round(test_accuracy * 100, 2), cm, net
def cm_experiment(dims, num_epochs = 10, iters = 10, pow = 2, norm = 2, model_paths = [], cm=[], times=[], train_accuracies=[], test_accuracies=[]):
    model_path = None
    for i in range(1, iters + 1):
        print("Epochs:",(i-1)*num_epochs, "-", i*num_epochs)
        if(len(cm)):
            cm = clarify_cm(cm)
        if(len(model_paths)):
            model_path = model_paths[-1]
            training_time, train_accuracy, test_accuracy, cm, net = iterate_cm(dims, num_epochs, pow, norm, cm, model_path)
        if(type(cm) == 'list'):
            cm = np.array(cm)
        file_path = f"cifarmodel_{i}.pth"
        # torch.save(net.state_dict(), file_path)
        model_paths.append(file_path)
        times.append(time)
        train_accuracies.append(time)
        test_accuracies.append(time)
        plt_cm(cm)
        print("Time", training_time, "s")
        print("Train accuracy", train_accuracy, "%")
        print("Test accuracy", test_accuracy, "%")
    plt_cm(cm)
    return model_paths, train_accuracies, test_accuracies, times, cm

In [17]:
train_loader, eval_train_loader, eval_test_loader = CIFAR10_loaders()

cm_experiment([3072, 3072, 3072, 10])

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Epochs: 0 - 10


AttributeError: 'list' object has no attribute 'sum'