In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.image as mpimg
!pip install seaborn
import seaborn as sns
import scipy
from scipy.linalg import orth
from tqdm import tqdm
import pickle as pkl
from copy import deepcopy

import PIL
from PIL import Image
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms, utils, models
from torchvision.utils import save_image
from torchvision.datasets import MNIST
import random

from ds import *
from networks import *
from utils import *

!pip install captum
from captum.attr import IntegratedGradients
from captum.attr import Saliency
from captum.attr import DeepLift
from captum.attr import NoiseTunnel
from captum.attr import visualization as viz
from captum.attr import GuidedGradCam
from captum.attr import GradientShap
from captum.attr import InputXGradient

import scipy
from scipy.ndimage import gaussian_filter

import cv2

## Get data loaders

In [None]:
train_tfm = transforms.Compose([
        transforms.ToPILImage(),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor()
])
train_dset = CxVAE_Dset(
    csv_file='../../../Datasets/chest_xray_pneumonia/train_labels.csv', 
    root_dir='../../../Datasets/chest_xray_pneumonia/images_224x256/',
    tfm=train_tfm
)
val_dset = CxVAE_Dset(
    csv_file='../../../Datasets/chest_xray_pneumonia/val_labels.csv', 
    root_dir='../../../Datasets/chest_xray_pneumonia/images_224x256/'
)
test_dset = CxVAE_Dset(
    csv_file='../../../Datasets/chest_xray_pneumonia/test_labels.csv', 
    root_dir='../../../Datasets/chest_xray_pneumonia/images_224x256/'
)

train_loader = DataLoader(train_dset, batch_size=4, shuffle=True, num_workers=16, pin_memory=False)
val_loader = DataLoader(val_dset, batch_size=4, shuffle=False, num_workers=16, pin_memory=False)
test_loader = DataLoader(test_dset, batch_size=4, shuffle=False, num_workers=16, pin_memory=False)

## Define model and pass to the training loop

In [None]:
Net = models.vgg16(pretrained=True, progress=False)
print(Net)

# Freeze training for all layers
for param in Net.parameters():
    param.require_grad = False

# Newly created modules have require_grad=True by default
num_features = Net.classifier[6].in_features
fc_new = torch.nn.Linear(num_features, 2)
Net.classifier[6] = fc_new
print(Net)
Net.load_state_dict(torch.load('../ckpt/VGG16_w_AE_pneu_best.pth'))

NetAE = AutoEncoder(3, 3, 8, 4, 32, 16)
NetAE.load_state_dict(torch.load('../ckpt/AE_pneu_best.pth'))

In [None]:
eval_classifier_w_AE_loop(
    test_loader,
    NetAE,
    Net,
    dtype = torch.cuda.FloatTensor,
    device='cuda',
)

In [None]:
def attribute_image_features(algorithm, input, label, **kwargs):
    tensor_attributions = algorithm.attribute(
        input,
        target=label,
        **kwargs
    )
    return tensor_attributions

In [None]:
def overlay_saliency_map(i1_rec, smap, alpha=0.5):
    smap = (smap-np.min(smap))/(np.max(smap) - np.min(smap))
    mask = smap > 0.75*np.max(smap)
    smap = mask*smap
    smap = np.mean(smap, axis=2)
    smap = gaussian_filter(smap, sigma=10)
    smap = (smap-np.min(smap))/(np.max(smap) - np.min(smap))
    smap = np.uint8(smap*255)
    smap = cv2.applyColorMap(smap, colormap=cv2.COLORMAP_PLASMA)
    alpha = 0.5
    i1_rec = np.uint8(i1_rec*255)
    smap = cv2.addWeighted(i1_rec, alpha, smap, 1-alpha, 0)
    return smap


def show_classifier_w_AE_interp(
    eval_loader,
    NetAE,
    Net,
    dtype = torch.cuda.FloatTensor,
    device='cuda',
    n_show = 5,
):
    NetAE.to(device)
    NetAE.type(dtype)
    # Freeze training for all layers
    for param in NetAE.parameters():
        param.require_grad = False
    NetAE.eval()
    
    Net.to(device)
    Net.type(dtype)
    Net.eval()
    tot_err = 0
    tot_samples = 0
    for idx, (xin, yout) in enumerate(eval_loader):
        if idx>= n_show:
            break
        xin, yout = xin.to(device), yout.to(device)
        xin_rec, _ = NetAE(xin)
        output = F.log_softmax(Net(xin_rec))
        predictions = output.argmax(dim=1, keepdim=True).squeeze()
        n_batch = xin.shape[0]
        for j in range(n_batch):
            i1 = xin[j].data.cpu().transpose(0,2).transpose(0,1).numpy()
            i1_rec = xin_rec[j].data.cpu().transpose(0,2).transpose(0,1).clip(0,1).numpy()
            
            saliency = Saliency(Net)
            grads = saliency.attribute(xin_rec[j].unsqueeze(0), target=yout[j].unsqueeze(0))
            grads = np.transpose(grads.squeeze().data.cpu().detach().numpy(), (1, 2, 0))
            
            ig = IntegratedGradients(Net)
            attr_ig, delta = attribute_image_features(ig, xin_rec[j].unsqueeze(0), yout[j], baselines=xin_rec[j].unsqueeze(0) * 0, return_convergence_delta=True)
            attr_ig = np.transpose(attr_ig.squeeze().cpu().detach().numpy(), (1, 2, 0))
            
            ggcam = GuidedGradCam(Net, Net.features[10])
            attr_ggcam = attribute_image_features(ggcam, xin_rec[j].unsqueeze(0), yout[j])
            attr_ggcam = np.transpose(attr_ggcam.squeeze().cpu().detach().numpy(), (1, 2, 0))
            
            input_x_gradient = InputXGradient(Net)
            attr_ixg = attribute_image_features(input_x_gradient, xin_rec[j].unsqueeze(0), yout[j])
            attr_ixg = np.transpose(attr_ixg.squeeze().data.cpu().detach().numpy(), (1, 2, 0))
            
            grads = overlay_saliency_map(i1_rec, grads, alpha=0.5)
            attr_ggcam = overlay_saliency_map(i1_rec, attr_ggcam, alpha=0.5)
            attr_ig = overlay_saliency_map(i1_rec, attr_ig, alpha=0.5)
            attr_ixg = overlay_saliency_map(i1_rec, attr_ixg, alpha=0.5)
            
            print('GT: {}, predicted: {}'.format(yout[j], predictions[j]))
            
            plt.figure(figsize=(24,8))
            
            plt.subplot(1,6,1)
            plt.imshow(i1)
            plt.axis('off')
            plt.subplot(1,6,2)
            plt.imshow(i1_rec)
            plt.axis('off')
            
            plt.subplot(1,6,3)
            ax = plt.gca()
            im = ax.imshow(grads)
            plt.axis('off')
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("bottom", size="5%", pad=0.05)
            plt.colorbar(im, cax=cax, orientation="horizontal")
            
            plt.subplot(1,6,4)
            ax = plt.gca()
            im = ax.imshow(attr_ggcam)
            plt.axis('off')
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("bottom", size="5%", pad=0.05)
            plt.colorbar(im, cax=cax, orientation="horizontal")
            
            plt.subplot(1,6,5)
            ax = plt.gca()
            im = ax.imshow(attr_ig)
            plt.axis('off')
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("bottom", size="5%", pad=0.05)
            plt.colorbar(im, cax=cax, orientation="horizontal")
            
            plt.subplot(1,6,6)
            ax = plt.gca()
            im = ax.imshow(attr_ixg)
            plt.axis('off')
            divider = make_axes_locatable(ax)
            cax = divider.append_axes("bottom", size="5%", pad=0.05)
            plt.colorbar(im, cax=cax, orientation="horizontal")
            
            
            plt.show()

In [None]:
show_classifier_w_AE_interp(
    test_loader,
    NetAE,
    Net,
    dtype = torch.cuda.FloatTensor,
    device='cuda',
    n_show = 3,
)

In [None]:
def compute_tangent_space(NetAE, 
    xin,
    orth = True,
    device='cuda',
    dtype = torch.cuda.FloatTensor
):
    '''
    xin: has to be single image as a batch
    '''
    NetAE.to(device)
    NetAE.type(dtype)
    for param in NetAE.parameters():
        param.require_grad = True
        
    xin = xin.to(device)
    z = NetAE.encode(xin)
    z = z.detach()
    z.requires_grad = True
    out = NetAE.decode(z)
    
    out = out.squeeze()      # remove batch dimension
    output_shape = out.shape # store original output shape
    out = out.reshape(-1)    # and transform the output into a vector
    latent_dim = z.reshape(-1).shape[0]
    tangent_space = torch.zeros((latent_dim, out.shape[0]))
    for i in range(out.shape[0]):
        out[i].backward(retain_graph=True)
        tangent_space[:, i] = z.grad.reshape(-1)
        z.grad.zero_()
    tangent_space = tangent_space.reshape((-1, *output_shape)) # tangent space in model output shape
#             if transform is not None: # transform the output space
#                 tangent_space = transform(tangent_space)
    orth_tangent_space = True
    if orth:                 # orthogonalize the tangent space. note that the numerical dimension of the 
                             # tangent space might be less than the dimension of the latent space
        orth_shape = (-1, *tangent_space.shape[1:])  
        T = tangent_space.reshape((latent_dim, -1))
        orth_tangent_space = torch.zeros_like(T)
        T = scipy.linalg.orth(T.T).T
        T = torch.Tensor(T)
        for i in range(T.shape[0]):                        # the remaining dimensions are filled with zeros
            orth_tangent_space[i, :] = T[i, :]
        orth_tangent_space = orth_tangent_space.reshape(orth_shape)
    return tangent_space, orth_tangent_space

In [None]:
def project_into_tangent_space(tangent_space, vector):
    dim = tangent_space.shape[0]
    coeff = torch.zeros(dim)
    for i in range(dim):
        coeff[i] = tangent_space[i, :, :, :].flatten() @ vector.flatten()
    gradient_in_tangent_space = (coeff @ tangent_space.reshape((dim, -1))).reshape((3, 256, 224))
    return gradient_in_tangent_space


fraction_gradient_in_tangent_space = []
fraction_integrated_gradient_in_tangent_space = []
fraction_guided_gc_in_tangent_space = []
fraction_gshap_in_tangent_space = []

for idx, (xin, yout) in tqdm(enumerate(test_loader)):
    n_batch = xin.shape[0]
    for idy in range(n_batch):
        if os.path.exists('out/pneumonia_tspace_{}_{}.npy'.format(idx, idy)) and os.path.exists('out/pneumonia_otspace_{}_{}.npy'.format(idx, idy)):
            tangent_space, orth_tangent_space = np.load('out/pneumonia_tspace_{}_{}.npy'.format(idx, idy)), np.load('out/pneumonia_otspace_{}_{}.npy'.format(idx, idy))
        else:
            tangent_space, orth_tangent_space = compute_tangent_space(NetAE, xin[idy].unsqueeze(0), orth=True, device='cuda', dtype = torch.cuda.FloatTensor)
            np.save('out/pneumonia_tspace_{}_{}.npy'.format(idx, idy), tangent_space)
            np.save('out/pneumonia_otspace_{}_{}.npy'.format(idx, idy), orth_tangent_space)
        print('tspace done!')

        out, _ = NetAE(xin[idy].unsqueeze(0).cuda())
        out = out.detach()
        out.requires_grad = True

        prediction = Net(out)[0]
        c = torch.argmax(F.log_softmax(prediction))
        if c.item() == 1 and c.item() == yout[idy].item():
            prediction[c].backward()
            grad = deepcopy(out.grad.detach().reshape((3, 256, 224)))

            out.grad.zero_()
            ig = IntegratedGradients(Net)
            attr, delta = ig.attribute(out, target=c, return_convergence_delta=True)
            attr = deepcopy(attr.detach().reshape((3, 256, 224)))

            guided_gc = GuidedGradCam(Net, Net.features[10])
            attr1 = guided_gc.attribute(out, target=c)
            attr1 = deepcopy(attr1.detach().reshape((3, 256, 224)))
            
            gradient_shap = GradientShap(Net)
            # choosing baselines randomly
            baselines = torch.randn(20, 3, 256, 224).cuda()
            # Computes gradient shap for the input
            attr2 = gradient_shap.attribute(out, baselines, target=c)

            grad_in_tsp = project_into_tangent_space(torch.tensor(orth_tangent_space).to('cpu').float(), grad.to('cpu').float())
            ig_in_tsp = project_into_tangent_space(torch.tensor(orth_tangent_space).to('cpu').float(), attr.to('cpu').float())
            guided_gc_in_tsp = project_into_tangent_space(torch.tensor(orth_tangent_space).to('cpu').float(), attr1.to('cpu').float())
            gshap_in_tsp = project_into_tangent_space(torch.tensor(orth_tangent_space).to('cpu').float(), attr2.to('cpu').float())

            fgtsp = np.linalg.norm(grad_in_tsp.cpu().numpy())/np.linalg.norm(grad.cpu().numpy())
            figtsp = np.linalg.norm(ig_in_tsp.cpu().numpy())/np.linalg.norm(attr.cpu().numpy())
            fguidedgctsp = np.linalg.norm(guided_gc_in_tsp.cpu().numpy())/np.linalg.norm(attr1.cpu().numpy())
            fgshaptsp = np.linalg.norm(gshap_in_tsp.cpu().numpy())/np.linalg.norm(attr2.cpu().numpy())

            fraction_gradient_in_tangent_space.append(fgtsp)
            fraction_integrated_gradient_in_tangent_space.append(figtsp)
            fraction_guided_gc_in_tangent_space.append(fguidedgctsp)
            fraction_gshap_in_tangent_space.append(fgshaptsp)
            print(fgtsp)
            print(figtsp)
            print(fguidedgctsp)
            print(fgshaptsp)
            print('***')
            np.save('out/pneumonia_fgtsp.npy', fraction_gradient_in_tangent_space)
            np.save('out/pneumonia_figtsp.npy', fraction_integrated_gradient_in_tangent_space)
            np.save('out/pneumonia_fguidedgctsp.npy', fraction_guided_gc_in_tangent_space)
            np.save('out/pneumonia_fgshaptsp.npy', fraction_gshap_in_tangent_space)

In [None]:
# load results of computation
fraction_gradient_in_tangent_space = np.load('out/pneumonia_fgtsp.npy')
fraction_integrated_gradient_in_tangent_space = np.load('out/pneumonia_figtsp.npy')

In [None]:
for values, name in [(fraction_gradient_in_tangent_space, 'Fraction of gradient in\ntangent space'),
                     (fraction_integrated_gradient_in_tangent_space, 'Fraction of integrated gradient in\ntangent space'),
                     (fraction_guided_gc_in_tangent_space, 'Fraction of guided_gc in\ntangent space'),
                     (fraction_gshap_in_tangent_space, 'Fraction of gshap in\ntangent space'),
                    ]:
    sns.distplot(values, kde=False)
    plt.xlim([0, 1])
plt.title(f'Fraction in Tangent Space, Gradient shap vs. guded gcam vs. Raw Gradient vs. Integrated Gradients')
plt.xlim([0.05, 0.45])
plt.show()

In [None]:
for values, name in [(fraction_gradient_in_tangent_space, 'Fraction of gradient in\ntangent space'),
                     (fraction_integrated_gradient_in_tangent_space, 'Fraction of integrated gradient in\ntangent space'),
                     (fraction_guided_gc_in_tangent_space, 'Fraction of guided_gc in\ntangent space')
                    ]:
    sns.distplot(values, kde=False)
    plt.xlim([0, 1])
plt.title(f'Fraction in Tangent Space, Raw Gradient vs. Integrated Gradients')
plt.xlim([0.15, 0.45])
plt.show()