<a href="https://colab.research.google.com/github/maragraziani/interpretAI_DigiPath/blob/main/2DCNNs/concept_attribution_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Concept attribution with Regression Concept Vectors
This notebook wil walk you through the implementation of one approach to concept-based explanability, namely that of Regression Concept Vectors (RCVs). We will directly experiment with pathology images. 

In [1]:
!pip install rcvtool

Collecting rcvtool
  Downloading https://files.pythonhosted.org/packages/00/c6/18c038d7bcaeb95c093188501b7162422ee8dc3823a3e028e959c63a9c86/rcvtool-0.1.5.tar.gz
Building wheels for collected packages: rcvtool
  Building wheel for rcvtool (setup.py) ... [?25l[?25hdone
  Created wheel for rcvtool: filename=rcvtool-0.1.5-cp37-none-any.whl size=8716 sha256=33bef5fbf2857dafb13e42c14710096836a93364d72862eee2d27b16e5a884cc
  Stored in directory: /root/.cache/pip/wheels/df/66/2b/8091875f34198257a1317c4ee538e46d077569b8f3d8136bec
Successfully built rcvtool
Installing collected packages: rcvtool
Successfully installed rcvtool-0.1.5


In [2]:
import tensorflow.keras
import sys
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Activation, Flatten, Dropout, Conv2D, MaxPooling2D, GlobalAveragePooling2D
from tensorflow.keras.optimizers import SGD, Adadelta
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import metrics
from tensorflow.keras.preprocessing import image
from sklearn.metrics import roc_curve, auc
import time
import sys
import shutil
import numpy as np
import cv2

In [3]:
tf.compat.v1.disable_eager_execution()
tf.keras.backend.clear_session()

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
BATCH_SIZE = 32
seed=0
np.random.seed(seed)

In [6]:
# BATCH GENERATORS AND STAIN NORMALIZATION FUNCTIONS
import sys
sys.path.append('/content/drive/MyDrive/CNNinterpret')
from normalizers import *
global normalizer

def get_normalizer(patch, save_folder=''):
    normalizer = ReinhardNormalizer()
    normalizer.fit(patch)
    np.save('{}/normalizer'.format(save_folder),normalizer)
    np.save('{}/normalizing_patch'.format(save_folder), patch)
    print('Normalisers saved to disk.')
    return normalizer

normalizer=get_normalizer(np.load('/content/drive/MyDrive/CNNinterpret/normalizing_patch.npy'))

def normalize_patch(patch, normalizer):
    return np.float64(normalizer.transform(np.uint8(patch)))

def get_batch_data(patch_list, labels, batch_size=32):
    num_samples=len(patch_list)
    while True:
        for offset in range(0,num_samples, batch_size):
            batch_x = []
            batch_y = []
            batch_samples=patch_list[offset:offset+batch_size]
            for patch_id in range(len(batch_samples)):
                #print(len(batch_samples))
                patch=patch_list[patch_id]
                patch=normalize_patch(patch, normalizer)
                # VGG input is fixed to 224,224 for finetuning imagenet weights
                patch=cv2.resize(patch, (224,224)) 
                patch=tensorflow.keras.applications.vgg16.preprocess_input(patch)
                label=labels[patch_id]
                batch_x.append(patch)
                batch_y.append(label)
            
            batch_x=np.asarray(batch_x, dtype=np.float32)/255.
            batch_y=np.asarray(batch_y, dtype=np.float32)
            yield batch_x, batch_y

Using brightness standardization
Normalisers saved to disk.


In [7]:
import h5py as hd
y_test = np.asarray(hd.File('/content/drive/MyDrive/CNNinterpret/camelyonpatch_level_2_split_valid_y.h5', 'r')['y'][:])
x_test = np.asarray(hd.File('/content/drive/MyDrive/CNNinterpret/camelyonpatch_level_2_split_valid_x.h5', 'r')['x'][:])
y_test=y_test[:].ravel()

In [8]:
#Let's load the VGG weights finetuned on the histopathology dataset.
base_model=tensorflow.keras.applications.VGG16()
predictions=tensorflow.keras.layers.Dense(1,activation='sigmoid')(base_model.layers[-2].output)
vgg_histo_model = tensorflow.keras.Model(base_model.input, predictions)
vgg_histo_model.load_weights('/content/drive/MyDrive/CNNinterpret/weights.h5')
vgg_histo_model.compile(loss=tensorflow.keras.losses.binary_crossentropy, optimizer=tensorflow.keras.optimizers.SGD(lr=1e-4), metrics=['accuracy'])

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


In [9]:
vgg_histo_model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 56, 56, 128)       0     

In [10]:
# Let's extract the concept measures
import rcvtool as rcv
NSAMPLES=50
test_generator = get_batch_data(x_test[:NSAMPLES], y_test[:NSAMPLES])

layer = 'block5_conv3'
all_cm=np.zeros(NSAMPLES)
all_acts=np.zeros((NSAMPLES, 512))

i=0
for step in range(NSAMPLES//BATCH_SIZE):
  x_b, y_b = test_generator.__next__()
  cm_b=[]
  for x in x_b:
    cm_b.append(rcv.get_texture_measure(x, mtype='contrast')) 
  all_cm[i:i+len(cm_b)]=cm_b
  acts_b=rcv.get_batch_activations(vgg_histo_model, 'block5_conv3',x_b,pooling='AVG')
  all_acts[i:i+len(acts_b)]=acts_b
  i+=len(x_b)


  import pandas.util.testing as tm


In [11]:
contrast_vector = rcv.get_rcv(all_acts,all_cm,type='global linear', evaluation=True, verbose=True)

Global linear regression under euclidean assumption
Random state:  1
R2:  1.0
TEST mse: 398.48120179273263, r2: 0.9594215068685689


In [12]:
contrast_vector=contrast_vector.params
contrast_vector=contrast_vector[1:]
contrast_vector/=np.linalg.norm(contrast_vector)

In [13]:
""" Functions to compute conceptual sensitivity scores """
def get_directional_derivative(model, rcv_vector, reference_layer, class_id, input_image):
  def gap(x):
        """Utility function to perform gap to a tensor """
        return tf.keras.backend.mean(x, axis=(1,2))
  y_c = model.output[0, class_id]
  conv_output = model.get_layer(reference_layer).output
  grads = tf.keras.backend.gradients(y_c, conv_output)[0]
  grads = gap(grads)
  directional_derivative = tf.matmul(grads, tf.expand_dims(tf.convert_to_tensor(rcv_vector, dtype=tf.float32), axis=1))
  get_derivative = tf.keras.backend.function([model.input], [directional_derivative])
  return get_derivative(input_image) [0]


In [14]:
x_b, y_b = test_generator.__next__()

In [15]:
N_TEST_SAMPLES=10
conceptual_sensitivities=[]
for i in range(N_TEST_SAMPLES):
    conceptual_sensitivities.append(get_directional_derivative(vgg_histo_model, contrast_vector, 'block5_conv3', 0, np.expand_dims(x_b[i],axis=0))[0][0])

In [16]:
conceptual_sensitivities

[0.000206186,
 0.00027738482,
 0.00031645334,
 0.00012242963,
 0.0002872779,
 0.00012964806,
 0.00025965975,
 8.393503e-05,
 0.00029746583,
 9.536054e-05]