In [2]:
import os
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '4'

In [3]:
import numpy as np
import pandas as pd

# import tqdm.notebook as tq
import tqdm as tq

import random

import torch
import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import OneHotEncoder

import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [30, 10]

In [4]:
random_seed = 13

torch.manual_seed(random_seed)
random.seed(random_seed)
np.random.seed(random_seed)

In [5]:
dataset = 'FordA'

from sktime.datasets import load_UCR_UEA_dataset

X_train, y_train = load_UCR_UEA_dataset(name=dataset, split='train', return_type='numpyflat')
X_test, y_test = load_UCR_UEA_dataset(name=dataset, split='test', return_type='numpyflat')

In [6]:
print(f'Length training data: {len(X_train)} labels: {len(y_train)} test data: {len(X_test)} labels: {len(y_test)}')

Length training data: 3601 labels: 3601 test data: 1320 labels: 1320


In [7]:
encoder = OneHotEncoder(categories='auto', sparse_output=False)

y_train_ohe = encoder.fit_transform(np.expand_dims(y_train, axis=-1))
y_test_ohe = encoder.transform(np.expand_dims(y_test, axis=-1))

y_train_norm = y_train_ohe.argmax(axis=-1)
y_test_norm = y_test_ohe.argmax(axis=-1)

In [8]:
class FordADataset(Dataset):

    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        inputs = self.X[idx]
        label = self.y[idx]
        
        return inputs, label

In [9]:
dataset_train = FordADataset(X_train, y_train_ohe)
dataset_test = FordADataset(X_test, y_test_ohe)

In [10]:
dataloader_train = DataLoader(dataset_train, batch_size=120, shuffle=True)
dataloader_train_not_shuffled = DataLoader(dataset_train, batch_size=120, shuffle=False)
dataloader_test = DataLoader(dataset_test, batch_size=120, shuffle=False)

In [11]:
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 10, kernel_size=3, stride=1),
            nn.BatchNorm1d(10),
            nn.ReLU(inplace=True)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(10, 50, kernel_size=3, stride=1),
            nn.BatchNorm1d(50),
            nn.MaxPool1d(3),
            nn.ReLU(inplace=True)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(50, 50, kernel_size=3, stride=1),
            nn.BatchNorm1d(50),
            nn.MaxPool1d(3),
            nn.ReLU(inplace=True)
        )
        
        self.fc1 = nn.Sequential(
            nn.Linear(50 * 54, 50),
            nn.Dropout(0.5),
            nn.ReLU(inplace=True)
        )
        self.fc2 = nn.Sequential(
            nn.Linear(50, 2),
            nn.Softmax(-1)
        )
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)

        batch_size = x.shape[0]
        x = x.view(batch_size, -1)
        x = self.fc1(x)
        x = self.fc2(x)
        
        return x

In [16]:
model = torch.load('models/ford-a-cnn.pth')
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)

In [17]:
model.eval()

preds = []
labels = []
for x in dataloader_train_not_shuffled:
    input_, label_ = x
    input_ = input_.reshape(input_.shape[0], 1, -1)
    input_ = input_.float().to(device)
    label_ = label_.float().to(device)

    pred_ = model(input_)
    preds.extend(pred_)
    labels.extend(label_)

preds = torch.stack(preds)
labels = torch.stack(labels)
print('Prediction Accuracy Train', np.round((preds.argmax(dim=-1) == labels.argmax(dim=-1)).int().sum().float().item() / len(preds), 4))

y_train_pred = preds.cpu().detach().numpy().round(3)

Prediction Accuracy Train 0.9889


In [18]:
model.eval()

preds = []
labels = []
for x in dataloader_test:
    input_, label_ = x
    input_ = input_.reshape(input_.shape[0], 1, -1)
    input_ = input_.float().to(device)
    label_ = label_.float().to(device)

    pred_ = model(input_)
    preds.extend(pred_)
    labels.extend(label_)

preds = torch.stack(preds)
labels = torch.stack(labels)
print('Prediction Accuracy Test', np.round((preds.argmax(dim=-1) == labels.argmax(dim=-1)).int().sum().float().item() / len(preds), 4))

y_test_pred = preds.cpu().detach().numpy().round(3)

Prediction Accuracy Test 0.9121


# Save results as json

In [19]:
%load_ext autoreload
%autoreload 2

In [20]:
import exp_perturbation_analysis as exp_pa

import exp_stability_indicator as exp_si

import exp_plots as exp_pl

import scipy as sp

In [21]:
from captum.attr import GradientShap, DeepLiftShap, IntegratedGradients, ShapleyValueSampling, Saliency, DeepLift

In [22]:
attribution_measure = exp_si.attribution_stability_indicator
get_asi_for_dataset = exp_si.get_asi_for_dataset

In [23]:
sample_idx = 2
sample_ts_np, sample_l = dataloader_test.dataset[sample_idx]

sample, label = dataset_train[0]
shape = sample_ts_np.reshape(1, -1).shape

baselines = torch.from_numpy(np.array([dataset_test[torch.randint(len(dataset_test), (1,))][0] for _ in range(10)])).reshape(-1, *shape).float().to(device)



attribution_techniques = [
    ['Saliency', Saliency, {}],
    ['GradientShap', GradientShap, {'baselines': baselines}],
    
    ['DeepLift', DeepLift, {}],
    ['DeepLiftShap', DeepLiftShap, {'baselines': baselines}],
    
    ['IntegratedGradients', IntegratedGradients, {}],
    
#     ['ShapleyValueSampling', ShapleyValueSampling, {}],
]

# Check hyperparameters

In [24]:
from itertools import product

loader = dataloader_train_not_shuffled

weights_to_test = ['amax', 'js', 'ts', 'att']
weights_boundaries = [0, 3, 1]

def define_weights(dimensions, boundaries):
    low_bound, high_bound, steps = boundaries
    scaled_steps = 1 / steps
    high_bound = int(high_bound * scaled_steps + 1)
    return np.array(list(product(range(low_bound, high_bound), repeat=dimensions))) / scaled_steps

weights_to_test = define_weights(len(weights_to_test), weights_boundaries)

In [25]:
weights_to_test[1:,:], weights_to_test[1:,:].shape

(array([[0., 0., 0., 1.],
        [0., 0., 0., 2.],
        [0., 0., 0., 3.],
        ...,
        [3., 3., 3., 1.],
        [3., 3., 3., 2.],
        [3., 3., 3., 3.]]),
 (255, 4))

In [26]:
weights_to_test.shape[0] * 240 / 60 / 60

17.066666666666666

In [27]:
import os

import json

class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.int64):
            return str(obj)
        if isinstance(obj, np.float32):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
    
def checkpoint_save(file_name, data):
    if os.path.exists(file_name):
        with open(f'{file_name}', 'r+') as f:
            old_data = json.load(f)
    else:
        old_data = []
    old_data += data
    old_data_json = json.dumps(old_data, cls=NumpyArrayEncoder)
    with open(f'{file_name}', 'w+') as f:
        f.write(old_data_json)

In [None]:
file_name = 'results/ford-a-cnn-results'

results = []

for i, w_to_test in tq.tqdm(enumerate(weights_to_test[1:,:])):
    ret_dict = get_asi_for_dataset(loader, model, attribution_techniques, w_to_test, no_bar=True, full_dict=False)
    if i % 50 == 0:
        checkpoint_save(f'{file_name}.json', results)
        results = []
    results.append([i, w_to_test, ret_dict])
checkpoint_save(f'{file_name}.json', results)

               activations. The hooks and attributes will be removed
            after the attribution is finished
44it [7:24:19, 604.26s/it]