In [None]:
# Restrict this process to the GPU with index 1. The variable is set here,
# before torch is imported in a later cell, so it is already in place when
# CUDA is first initialised.
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '1'


In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import torchvision
import pandas as pd
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import seaborn as sns
from torchvision import datasets as datasets, transforms
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

from model_resnet import ResNet18, ResNet50, ResNet50_dropblock, ResNet50_dropchannel, ResNet50_droplayer
from datasets import get_data



In [None]:
class Args:
    """Hyperparameters for the training run, kept as plain class attributes."""
    # NOTE(review): batch_size and train_batch_size are not referenced anywhere
    # in this notebook — confirm whether the data loading code relies on them.
    batch_size = 256
    train_batch_size = 5000
    lr = 1e-2  # learning rate handed to the optimizer
    gamma = 0.97  # per-epoch multiplicative LR decay handed to StepLR
    log_interval = 100  # train() prints a progress line every `log_interval` batches
    epochs = 50  # number of train + validation epochs

args = Args()



In [None]:
# In-distribution dataset. `get_data` is a project helper; the cells below read
# the 'train' and 'valid' entries of the mapping it returns.
dataset = 'svhn'
loaders = get_data(dataset)

In [None]:
def train(args, model, device, train_loader, optimizer, epoch):
    """Run one epoch of cross-entropy training.

    Args:
        args: object exposing ``log_interval`` (batches between progress lines).
        model: network being optimised; switched to training mode.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable of ``(inputs, targets)`` batches with a
            ``dataset`` attribute and a length.
        optimizer: optimizer updating ``model``'s parameters.
        epoch: epoch number, used only in the progress line.
    """
    model.train()
    for step, batch in enumerate(train_loader):
        inputs, targets = batch
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Standard optimisation step: clear grads, forward, backward, update.
        optimizer.zero_grad()
        batch_loss = F.cross_entropy(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        if step % args.log_interval != 0:
            continue
        samples_done = step * len(inputs)
        percent_done = 100. * step / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, samples_done, len(train_loader.dataset),
            percent_done, batch_loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))



In [None]:
# The validation split is used both for per-epoch evaluation and for all of
# the uncertainty analysis further down.
train_loader = loaders['train']
test_loader = loaders['valid']

# Dropout variant under study; change the index to switch variants.
model_mode = ['drop_block', 'drop_channel', 'drop_layer'][2]
model_class = {
    '': ResNet50,
    'drop_block': ResNet50_dropblock,
    'drop_channel': ResNet50_dropchannel,
    'drop_layer': ResNet50_droplayer
}

# `model_class` is a lookup table, so pick the constructor for the selected
# mode before instantiating it (calling the dict itself raises TypeError).
model = model_class[model_mode](input_size=3, dropout_rate=0.2)


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU, and move
# the model's parameters there.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [None]:
# Adadelta over all model parameters, starting from the configured learning rate.
optimizer = torch.optim.Adadelta(model.parameters(), lr=args.lr)

# step_size=1: the learning rate is multiplied by args.gamma after every epoch.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
    train(args, model, device, train_loader, optimizer, epoch)
    # Evaluate on the validation loader after each epoch; this also leaves the
    # model in eval mode until the next train() call.
    test(model, device, test_loader)
    scheduler.step()

In [None]:
# Single deterministic forward pass over the validation set: collect the
# predicted class, the true label and the max softmax probability per sample.
# Eval mode is set explicitly so this cell does not depend on test() having
# been the last thing that touched the model.
model.eval()
predictions = []
labels = []
probs = []
with torch.no_grad():
    for x_batch, y_batch in test_loader:
        # Use the device selected earlier instead of assuming CUDA is present.
        logits = model(x_batch.to(device))
        probs.append(torch.max(torch.softmax(logits, dim=-1), dim=-1).values.cpu().numpy())
        predictions.append(torch.argmax(logits, dim=-1).cpu().numpy())
        labels.append(y_batch.numpy())
labels = np.concatenate(labels)
predictions = np.concatenate(predictions, axis=-1)
probs = np.concatenate(probs, axis=-1)


In [None]:
# Boolean mask over validation samples: True where the deterministic
# prediction disagrees with the label.
errors = labels != predictions


In [None]:
# Baseline uncertainty: one minus the max softmax probability of the
# deterministic pass (higher means less confident).
uq = 1 - probs


In [None]:
T = 200  # number of stochastic forward passes per sample

# The model stays in eval mode; stochasticity is requested through the
# `dropout_mask=True` argument of the project model instead of model.train().
# NOTE(review): the passes are aligned by position, so this assumes the
# validation loader yields samples in the same order on every pass — confirm.
model.eval()
predictions = []
with torch.no_grad():
    for t in range(T):
        if (t + 1) % 10 == 0:
            print(t)
        # Separate name so the deterministic-pass `probs` is not overwritten.
        pass_probs = []
        for x_batch, y_batch in test_loader:
            # Use the device selected earlier instead of assuming CUDA is present.
            logits = model(x_batch.to(device), dropout_mask=True)
            pass_probs.append(torch.softmax(logits, dim=-1).cpu().numpy())
        predictions.append(np.concatenate(pass_probs, axis=0))


In [None]:
# Stack the per-pass probability arrays along a new leading axis (one entry
# per stochastic pass) and display the resulting shape.
predictions = np.array(predictions)
predictions.shape


In [None]:
def _xlogx(p):
    """Return ``p * log(p)`` elementwise, using the limit value 0 where ``p <= 0``."""
    p = np.asarray(p)
    # Feed 1 to the log wherever p is not positive: log(1) == 0, so those
    # entries contribute exactly 0 instead of 0 * -inf == nan.
    return p * np.log(np.where(p > 0, p, np.ones_like(p)))


def entropy(preds):
    """Expected entropy of the per-pass predictive distributions.

    Args:
        preds: array-like of shape (passes, samples, classes) with probabilities.

    Returns:
        Array of shape (samples,): the entropy of each pass, averaged over passes.
    """
    return -np.mean(np.sum(_xlogx(preds), axis=-1), axis=0)


def std_uq(preds):
    """Std across passes of the probability assigned to the mean-predicted class.

    Args:
        preds: array-like of shape (passes, samples, classes).

    Returns:
        Array of shape (samples,).
    """
    preds = np.asarray(preds)  # fancy indexing below needs an ndarray, not a list
    idxs = np.argmax(np.mean(preds, axis=0), axis=-1)
    return np.std(preds[:, np.arange(len(idxs)), idxs], axis=0)


def bald(preds):
    """BALD score: entropy of the mean prediction minus the expected entropy.

    This is the mutual information between the prediction and the model
    parameters, so the expected entropy is subtracted, not added.

    Args:
        preds: array-like of shape (passes, samples, classes).

    Returns:
        Array of shape (samples,).
    """
    means = np.mean(preds, axis=0)
    predictive_entropy = -np.sum(_xlogx(means), axis=-1)
    return predictive_entropy - entropy(preds)


def max_with_std(preds):
    """One minus the largest (mean - std) class probability across passes.

    Args:
        preds: array-like of shape (passes, samples, classes).

    Returns:
        Array of shape (samples,); larger values mean higher uncertainty.
    """
    preds = np.asarray(preds)
    stds = np.std(preds, axis=0)
    means = np.mean(preds, axis=0)
    mean_std = means - stds
    return 1 - np.max(mean_std, axis=-1)


# Acquisition function used for the MC-dropout uncertainty in the rest of the
# notebook; its __name__ also appears in the plot legends.
acquisition = max_with_std
uq_mc = acquisition(predictions)


In [None]:
# matplotlib 3.6 renamed its bundled seaborn style sheets to "seaborn-v0_8*"
# and later dropped the old names; fall back to the legacy name on old installs.
plt.style.use(['seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'])

# ROC curves for detecting misclassified samples from each uncertainty score.
fpr, tpr, _ = roc_curve(errors, uq)
plt.plot(fpr, tpr, label='max prob uncertainty')

fpr, tpr, _ = roc_curve(errors, uq_mc)
plt.plot(fpr, tpr, label=f'mc dropout ({acquisition.__name__})')
plt.title(f"Resnet50, {model_mode}, {dataset}")
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.legend()
plt.show()



### Error by rejection

In [None]:
# Fractions of the validation set that are kept (the rest is rejected).
slices = np.linspace(0.5, 1, 50)

# Sort once, most confident first: every retained fraction is a prefix of
# this ordering, so the sort does not need to be repeated inside the loop.
order_max_prob = np.argsort(uq)
order_mc = np.argsort(uq_mc)

accuracies_max_prob = []
accuracies_mc = []

for fraction in slices:
    num_samples = int(len(uq) * fraction)
    accuracies_max_prob.append(1 - np.sum(errors[order_max_prob[:num_samples]]) / num_samples)
    accuracies_mc.append(1 - np.sum(errors[order_mc[:num_samples]]) / num_samples)

plt.title(f'Accuracy by partly reject to classify, Resnet50, {model_mode}, {dataset}')
plt.plot(slices, accuracies_max_prob, label='max prob uncertainty')
plt.plot(slices, accuracies_mc, label=f'mc dropout ({acquisition.__name__})')
plt.xlabel('Fraction of samples kept')
plt.ylabel('Accuracy on kept samples')
plt.legend()
plt.show()


In [None]:
# Error-detection ROC AUC as a function of how many MC passes are used.
# The sweep is capped at the number of passes actually collected: slicing past
# the end silently returns every pass, which would repeat the same score under
# a misleading x value.
nums = np.unique(np.linspace(2, len(predictions), 50).astype(int))
aucs = [roc_auc_score(errors, acquisition(predictions[:num])) for num in nums]

# First maximum, matching a strict ">" scan over the scores.
best = int(np.argmax(aucs))
i = nums[best]
max_score = aucs[best]

plt.plot(nums, aucs)
plt.xlabel('Number of MC passes')
plt.ylabel('ROC AUC (error detection)')
plt.show()
print(i)

In [None]:



# Per-sample, per-class standard deviation and mean of the predicted
# probabilities, taken across the stochastic passes (axis 0).
stds = np.std(predictions, axis=0)
means = np.mean(predictions, axis=0)


In [None]:
# Histogram of the MC-dropout std of the class-0 probability.
# NOTE(review): only class index 0 is plotted and the 'errors' column is not
# used by the plot — confirm that is intended.
df = pd.DataFrame({'errors': errors, 'stds': stds[:, 0]})
# displot is a figure-level function that creates its own figure, so the size
# is passed via height/aspect (8 x 1.25 = 10 wide) rather than a separate
# plt.figure call, which would only leave an empty figure behind.
sns.displot(df, x='stds', bins=20, height=8, aspect=1.25)
plt.title("Mc dropout std distribution")


In [None]:
# Compare two acquisition scores sample by sample: expected entropy (x axis)
# against the max-with-std score (y axis).
uq_1 = entropy(predictions)
uq_2 = max_with_std(predictions)
plt.scatter(uq_1, uq_2)

In [None]:
# Predicted class from the mean probabilities versus from (mean - std).
mean_std = means - stds
args_1 = np.argmax(means, axis=-1)
args_2 = np.argmax(mean_std, axis=-1)
# Number of samples on which the two criteria pick different classes.
np.sum(args_1 != args_2)

In [None]:
# Boolean mask of samples whose top class changes once the std is subtracted.
diff_idx = args_1 != args_2

In [None]:
# Display the mask itself (one boolean per validation sample).
diff_idx


In [None]:
# matplotlib 3.6 renamed its bundled seaborn style sheets to "seaborn-v0_8*"
# and later dropped the old names; fall back to the legacy name on old installs.
plt.style.use(['seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'])

# For the samples where the two criteria disagree, compare the winning
# probability under the mean with the winning value under (mean - std).
plt.title("Points with diffent max classes by mean prob and mean-std prob")
plt.xlabel('Max prob')
plt.ylabel('Max prob for mean std')
plt.scatter(np.max(means[diff_idx], axis=-1), np.max(mean_std[diff_idx], axis=-1))


In [None]:
# Collect every validation batch (inputs and targets) as lists of tensors.
# NOTE(review): this rebinds `labels`, which earlier held the concatenated
# label array used to build `errors`; re-running that earlier cell after this
# one would use the list instead.
images = []
labels = []

for x_batch, y_batch in test_loader:
    images.append(x_batch)
    labels.append(y_batch)



In [None]:
# All validation images as one tensor, concatenated along the batch axis.
imgs_t = torch.cat(images, dim=0)


In [None]:
# NumPy copies of the collected images and labels, joined along the batch axis.
imgs = np.concatenate(images, axis=0)
img_labels = np.concatenate(labels, axis=0)

In [None]:
def plt_img(img):
    """Show one image array with matplotlib after moving axis 0 to the end.

    Args:
        img: array whose first axis is moved to position 2 before display
            (presumably channels-first, i.e. (C, H, W) — confirm).
    """
    # No un-normalisation is applied here, unlike `imshow` below.
    channels_last = np.moveaxis(img, 0, 2)
    plt.imshow(channels_last)
    plt.show()

plt_img(imgs[0])

In [None]:
def imshow(img):
    """Un-normalise a tensor image and display it with matplotlib.

    Args:
        img: tensor whose axes are reordered as (1, 2, 0) for display; values
            are mapped through ``x / 2 + 0.5`` first (presumably undoing a
            mean-0.5 / std-0.5 normalisation — confirm against the data pipeline).
    """
    restored = (img / 2 + 0.5).numpy()
    # Reorder axes from (0, 1, 2) to (1, 2, 0) so the first axis becomes the last.
    plt.imshow(restored.transpose(1, 2, 0))
    plt.show()


In [None]:
# torchvision is already imported in the imports cell at the top; this repeat
# import is harmless.
import torchvision
plt.style.use(['default'])

# First eight validation images as a reference.
plt.figure(figsize=(10, 4))
plt.title("common images")
imshow(torchvision.utils.make_grid(imgs_t[:8]))
# imshow() already calls plt.show(), so this second call has nothing new to render.
plt.show()

# First eight images whose top class differs between the mean and (mean - std) criteria.
plt.figure(figsize=(10, 4))
plt.title("strange images")
imshow(torchvision.utils.make_grid(imgs_t[diff_idx][:8]))


## t-SNE visualisation of the embeddings

In [None]:
# Build embeddings: run every validation batch through the model and read the
# `model.embedding` attribute afterwards.
# NOTE(review): this assumes the project model stores its embedding on
# `model.embedding` during each forward pass — confirm in model_resnet.
embeddings = []
val_labels = []
model.eval()

with torch.no_grad():
    # Loop variables are named x_batch / y_batch so the notebook-level `labels`
    # collected earlier is not overwritten by the last batch of targets.
    for x_batch, y_batch in test_loader:
        model(x_batch.to(device))
        embeddings.append(model.embedding.cpu().numpy())
        val_labels.append(y_batch.numpy())
embeddings = np.concatenate(embeddings, axis=0)
val_labels = np.concatenate(val_labels)


In [None]:
from sklearn.manifold import TSNE

# Only the first `nums` validation points are embedded.
nums = 2000
subset_embeddings = embeddings[:nums]
subset_labels = val_labels[:nums]
# Flag per point: True where the mean and (mean - std) criteria pick different classes.
subset_diff = diff_idx[:nums]

# Two-dimensional t-SNE with a fixed seed; every other setting is left at the
# library default.
tsne = TSNE(n_components=2, random_state=0)
tsne_dims = tsne.fit_transform(subset_embeddings)
# One row per point: the two t-SNE coordinates, the class label and the flag.
tsne_data = np.column_stack((tsne_dims, subset_labels, subset_diff))
# creating a new data frame which help us in ploting the result data


In [None]:
# Wrap the t-SNE output in a data frame so seaborn can colour points by column.
tsne_df = pd.DataFrame(data=tsne_data, columns=('dim_1', 'dim_2', 'label', 'errors'))

# FacetGrid's `size` argument was renamed to `height` and is rejected by newer
# seaborn releases; `height` is the supported spelling.
# Points coloured by whether the two acquisition criteria disagree on the class.
sns.FacetGrid(tsne_df, hue='errors', height=5).map(plt.scatter, 'dim_1', 'dim_2').add_legend()
plt.title("Point diff by different acquisition")
plt.show()
# Same embedding coloured by class label.
sns.FacetGrid(tsne_df, hue='label', height=5).map(plt.scatter, 'dim_1', 'dim_2').add_legend()
plt.title("Class representation")
plt.show()


### OOD test

In [None]:
# Out-of-distribution data: the model is evaluated on a dataset it was not
# trained on.
ood_loaders = get_data('cifar')

# Deterministic pass: one minus the max softmax probability per OOD sample.
model.eval()
probs = []
with torch.no_grad():
    for x_batch, y_batch in ood_loaders['valid']:
        # Use the device selected earlier instead of assuming CUDA is present.
        logits = model(x_batch.to(device))
        probs.append(torch.max(torch.softmax(logits, dim=-1), dim=-1).values.cpu().numpy())
probs = np.concatenate(probs, axis=-1)
uq_ood = 1 - probs

# T stochastic passes over the OOD set, mirroring the in-distribution sampling.
ood_predictions = []
with torch.no_grad():
    for t in range(T):
        if (t + 1) % 10 == 0:
            print(t)
        pass_probs = []
        for x_batch, y_batch in ood_loaders['valid']:
            logits = model(x_batch.to(device), dropout_mask=True)
            pass_probs.append(torch.softmax(logits, dim=-1).cpu().numpy())
        ood_predictions.append(np.concatenate(pass_probs, axis=0))

In [None]:
uq_ood_mc = acquisition(ood_predictions)

# Mix in-distribution and OOD samples. The requested counts are clamped to
# what is available so the label and score arrays always have equal length.
norm_num = min(4000, len(errors))
ood_num = min(1000, len(uq_ood))

# Every OOD sample counts as an error: any class the model assigns is wrong.
errors_all = np.concatenate((errors[:norm_num], np.ones(ood_num)))
uq_all = np.concatenate((uq[:norm_num], uq_ood[:ood_num]))
uq_all_mc = np.concatenate((uq_mc[:norm_num], uq_ood_mc[:ood_num]))

In [None]:
# Fractions of the mixed (in-distribution + OOD) set that are kept.
slices = np.linspace(0.5, 1, 50)

# Sort once, most confident first: every retained fraction is a prefix of
# this ordering, so the sort does not need to be repeated inside the loop.
order_max_prob = np.argsort(uq_all)
order_mc = np.argsort(uq_all_mc)

accuracies_max_prob = []
accuracies_mc = []

for fraction in slices:
    num_samples = int(len(uq_all) * fraction)
    accuracies_max_prob.append(1 - np.sum(errors_all[order_max_prob[:num_samples]]) / num_samples)
    accuracies_mc.append(1 - np.sum(errors_all[order_mc[:num_samples]]) / num_samples)

# The counts in the title come from the mixing cell so they cannot drift from
# the data that is actually plotted.
plt.title(f'Accuracy by partly reject to classify with  ({norm_num} norm/ {ood_num} ood points), Resnet50, {model_mode}, {dataset}')
plt.plot(slices, accuracies_max_prob, label='max prob uncertainty')
plt.plot(slices, accuracies_mc, label=f'mc dropout ({acquisition.__name__})')
plt.xlabel('Fraction of samples kept')
plt.ylabel('Accuracy on kept samples')
plt.legend()
plt.show()