In [1]:
import numpy as np
import keras
from keras.datasets import cifar10, cifar100
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
import os
import sys

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
''' Utilities for concept activation vectors '''
import numpy as np

from keras import backend as k
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

def return_split_models(model, layer):
    ''' Split a model into model_f and model_h

    Parameters
    ----------
    model : (keras.engine.sequential.Sequential)
        Keras sequential model to split
    layer : (int)
        Integer specifying layer to split model on

    Returns
    -------
    model_f : (keras.engine.sequential.Sequential)
        Keras sequential model that is the first part
    model_h : (keras.engine.sequential.Sequential)
        Keras sequential model that is the second part
    '''
    model_f, model_h = Sequential(), Sequential()
    for current_layer in range(0, layer+1):
        model_f.add(model.layers[current_layer])
    for current_layer in range(layer+1, len(model.layers)):
        model_h.add(model.layers[current_layer])
    return model_f, model_h

def train_cav(model_f, x_concept, y_concept):
    ''' Return the concept activation vector for the concept

    Parameters
    ----------
    model_f : (keras.engine.sequential.Sequential)
        First Keras sequential model from return_split_models()
    x_concept : (numpy.ndarray)
        Training data for concept set, has same size as model training data
    y_concept : (numpy.ndarray)
        Labels for concept set, has same size as model training labels

    Returns
    -------
    cav : (numpy.ndarray)
        Concept activation vector
    '''
    concept_activations = model_f.predict(x_concept)
    binary_classifier = Sequential()
    binary_classifier.add(Dense(1, input_shape=concept_activations.shape[1:], activation='sigmoid'))
    binary_classifier.compile(loss='binary_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])
    binary_classifier.fit(concept_activations, y_concept, batch_size=32, epochs=20, shuffle=True)
    cav = binary_concept_classifier.layers[0].get_weights()[0]
    return cav

def conceptual_sensitivity(example, model_f, model_h, concept_cav):
    ''' Return the conceptual conceptual sensitivity for a given example

    Parameters
    ----------
    example : (numpy.ndarray)
        Example to calculate the concept sensitivity (be sure to reshape)
    model_f : (keras.engine.sequential.Sequential)
        First Keras sequential model from return_split_models()
    model_h : (keras.engine.sequential.Sequential)
        Second Keras sequential model from return_split_models()
    concept_cav : (numpy.ndarray)
        Numpy array with the linear concept activation vector for a given concept

    Returns
    -------
    sensitivity : (float32)
        Sensitivity for inputted examples
    '''
    model_f_activations = model_f.predict(example)[0]
    gradients = k.gradients(model_h.output, model_h.input)
    gradient_func = k.function([model_h.input], gradients)
    calc_grad = gradient_func([model_f_activations])[0]
    sensitivity = np.dot(calc_grad, concept_cav)
    return sensitivity

def tcav_score(x_train, y_train, model, layer, x_concept, y_concept):
    ''' Returns the TCAV score for the training data to a given concept

    Parameters
    ----------
    x_train : (numpy.ndarray)
        Training data where the i-th entry as x_train[i] is one example
    y_train : (numpy.ndarray)
        Training labels where the i-th entry as y_train[i] is one example
    model : (keras.engine.sequential.Sequential)
        Trained model to use
    layer : (int)
        Integer specifying layer to split model on
    x_concept : (numpy.ndarray)
        Training data for concept set, has same size as model training data
    y_concept : (numpy.ndarray)
        Labels for concept set, has same size as model training labels

    Returns
    -------
    tcav : (list)
        TCAV score for given concept and class
    '''
    model_f, model_h = return_split_models(model, layer)
    concept_cav = train_cav(model_f, x_concept, y_concept)
    unique_labels = np.unique(y_train)
    tcav = []
    for label in unique_labels:
        training_subset = x_train[np.array(y_train) == 1]
        set_size = training_subset.shape[0]
        count_of_sensitivity = 0
        for example in training_subset:
            sensitivity = conceptual_sensitivity(example, model_f, model_h, concept_cav)
            if sensitivity > 0:
                count_of_sensitivity = count_of_sensitivity + 1
        tcav.append(count_of_sensitivity/set_size)
    return tcav


In [3]:
# Set parameters
batch_size = 32
epochs = 3

In [4]:
# The data, split between train and test sets:
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Keep airplanes (0) and ships (8) from CIFAR-10
airplanes = y_train == [0]
ships = y_train == [8]
indices = airplanes + ships
indx_to_use = [i for i, x in enumerate(indices) if x]

x_train = x_train[indx_to_use]
y_train = y_train[indx_to_use]

y_train = (y_train == 8).astype(int)
y_train = np.concatenate(y_train).ravel().tolist()

# Ships are now 1, airplanes are 0

# keep cloud (50) and sea (54) from CIFAR-100
(x_train_concept, y_train_concept), (x_test_concept, y_test_concept) = cifar100.load_data()

other = y_train_concept == [47]
concept = y_train_concept == [54]
indices = other + concept
indx_to_use = [i for i, x in enumerate(indices) if x]

x_train_concept = x_train_concept[indx_to_use]
y_train_concept = y_train_concept[indx_to_use]
y_train_concept = (y_train_concept == 54).astype(int)
y_train_concept = np.concatenate(y_train_concept).ravel().tolist()
# Sea is now 1, clouds are 0

In [5]:
model = Sequential()
model.add(Conv2D(32, (3, 3), padding='same',
                 input_shape=x_train.shape[1:]))
model.add(Activation('relu'))
model.add(Conv2D(32, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))

# initiate optimizer
opt = keras.optimizers.Adam(lr=0.001)

# train the model
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255

model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, shuffle=True)
model.save_weights("test_model.h5")

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [6]:
''' Utilities for concept activation vectors '''
import numpy as np

from keras import backend as k
from keras.models import Sequential
from keras.layers import Dense, InputLayer
from keras.optimizers import Adam

def return_split_models(model, layer):
    ''' Split a model into model_f and model_h

    Parameters
    ----------
    model : (keras.engine.sequential.Sequential)
        Keras sequential model to split
    layer : (int)
        Integer specifying layer to split model on

    Returns
    -------
    model_f : (keras.engine.sequential.Sequential)
        Keras sequential model that is the first part
    model_h : (keras.engine.sequential.Sequential)
        Keras sequential model that is the second part
    '''
    model_f, model_h = Sequential(), Sequential()
    for current_layer in range(0, layer+1):
        model_f.add(model.layers[current_layer])
    # Write input layer for model_h
    model_h.add(InputLayer(input_shape=model.layers[layer+1].input_shape[1:]))
    for current_layer in range(layer+1, len(model.layers)):
        model_h.add(model.layers[current_layer])
    return model_f, model_h


In [12]:
model_f, model_h = return_split_models(model, 12)

In [13]:
model_f.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 32, 32, 32)        896       
_________________________________________________________________
activation_1 (Activation)    (None, 32, 32, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 30, 30, 32)        9248      
_________________________________________________________________
activation_2 (Activation)    (None, 30, 30, 32)        0         
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 15, 15, 32)        0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 15, 15, 32)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 15, 15, 64)        18496     
__________

In [14]:
acts = model_f.predict(x_train_concept)

In [16]:
def train_cav(model_f, x_concept, y_concept):
    ''' Return the concept activation vector for the concept

    Parameters
    ----------
    model_f : (keras.engine.sequential.Sequential)
        First Keras sequential model from return_split_models()
    x_concept : (numpy.ndarray)
        Training data for concept set, has same size as model training data
    y_concept : (numpy.ndarray)
        Labels for concept set, has same size as model training labels

    Returns
    -------
    cav : (numpy.ndarray)
        Concept activation vector
    '''
    concept_activations = model_f.predict(x_concept)
    binary_classifier = Sequential()
    binary_classifier.add(Dense(1, input_shape=concept_activations.shape[1:], activation='sigmoid'))
    binary_classifier.compile(loss='binary_crossentropy', optimizer=Adam(lr=0.001), metrics=['accuracy'])
    binary_classifier.fit(concept_activations, y_concept, batch_size=32, epochs=20, shuffle=True)
    cav = binary_classifier.layers[0].get_weights()[0]
    return cav

concept_cav = train_cav(model_f, x_train_concept, y_train_concept)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [20]:
def conceptual_sensitivity(example, model_f, model_h, concept_cav):
    ''' Return the conceptual conceptual sensitivity for a given example

    Parameters
    ----------
    example : (numpy.ndarray)
        Example to calculate the concept sensitivity (be sure to reshape)
    model_f : (keras.engine.sequential.Sequential)
        First Keras sequential model from return_split_models()
    model_h : (keras.engine.sequential.Sequential)
        Second Keras sequential model from return_split_models()
    concept_cav : (numpy.ndarray)
        Numpy array with the linear concept activation vector for a given concept

    Returns
    -------
    sensitivity : (float32)
        Sensitivity for inputted examples
    '''
    model_f_activations = model_f.predict(example)[0]
    gradients = k.gradients(model_h.output, model_h.input)
    gradient_func = k.function([model_h.input], gradients)
    calc_grad = gradient_func([model_f_activations])[0]
    sensitivity = np.dot(calc_grad, concept_cav)
    return sensitivity

conceptual_sensitivity(x_train[0], model_f, model_h, concept_cav)

ValueError: Error when checking input: expected conv2d_1_input to have 4 dimensions, but got array with shape (32, 32, 3)

In [22]:
model_f.predict(x_train)

array([[0.04790389, 0.        , 0.        , ..., 0.3659242 , 0.        ,
        0.        ],
       [0.212491  , 0.        , 0.        , ..., 0.18722388, 0.        ,
        0.28412533],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.10809718],
       ...,
       [0.        , 0.        , 0.        , ..., 0.04549147, 0.        ,
        0.        ],
       [0.10371948, 0.        , 0.        , ..., 0.        , 0.        ,
        0.04492123],
       [0.01309306, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ]], dtype=float32)

In [19]:
x_train[0]

array([[[0.5254902 , 0.7294118 , 0.8745098 ],
        [0.5137255 , 0.72156864, 0.8627451 ],
        [0.5019608 , 0.7137255 , 0.85490197],
        ...,
        [0.49803922, 0.70980394, 0.87058824],
        [0.49803922, 0.70980394, 0.87058824],
        [0.5019608 , 0.7137255 , 0.8745098 ]],

       [[0.52156866, 0.7411765 , 0.89411765],
        [0.5058824 , 0.7294118 , 0.8784314 ],
        [0.5019608 , 0.7294118 , 0.8784314 ],
        ...,
        [0.49803922, 0.7176471 , 0.8784314 ],
        [0.49803922, 0.7176471 , 0.8784314 ],
        [0.5019608 , 0.72156864, 0.88235295]],

       [[0.5019608 , 0.7254902 , 0.8862745 ],
        [0.49803922, 0.7137255 , 0.8745098 ],
        [0.5019608 , 0.7137255 , 0.8745098 ],
        ...,
        [0.49411765, 0.70980394, 0.87058824],
        [0.49411765, 0.70980394, 0.87058824],
        [0.49411765, 0.7058824 , 0.8666667 ]],

       ...,

       [[0.68235296, 0.8156863 , 0.92156863],
        [0.67058825, 0.80784315, 0.8980392 ],
        [0.60784316, 0

In [None]:
model_f = Sequential()
model_f.add(Conv2D(32, (3, 3), padding='same',
                 input_shape=x_train.shape[1:], weights = model.layers[0].get_weights()))
model_f.add(Activation('relu'))
model_f.add(Conv2D(32, (3, 3), weights = model.layers[2].get_weights()))
model_f.add(Activation('relu'))
model_f.add(MaxPooling2D(pool_size=(2, 2)))
model_f.add(Dropout(0.25))

model_f.add(Conv2D(64, (3, 3), padding='same', weights = model.layers[6].get_weights()))
model_f.add(Activation('relu'))
model_f.add(Conv2D(64, (3, 3), weights = model.layers[8].get_weights()))
model_f.add(Activation('relu'))
model_f.add(MaxPooling2D(pool_size=(2, 2)))
model_f.add(Flatten())

acts = model_f.predict(x_train_concept)

model_h = Sequential()
model_h.add(Dense(512, input_shape=acts.shape[1:], weights = model.layers[13].get_weights()))
model_h.add(Activation('relu'))
model_h.add(Dropout(0.5))
model_h.add(Dense(1, weights = model.layers[16].get_weights()))
model_h.add(Activation('sigmoid'))

concept_cav = train_cav(model_f, x_train_concept, y_train_concept)

In [None]:
training_subset = x_train[np.array(y_train) == 1]

gradients = k.gradients(model_h.output, model_h.input)
gradient_func = k.function([model_h.input], gradients)
pos_sens = 0
list_sens = []
for train_ex in training_subset:
    example = train_ex.reshape((1, 32, 32, 3))
    example_f = model_f.predict(example)[0].reshape((1,2304))
    calc_grad = gradient_func([example_f])[0]
    sensitivity = np.dot(calc_grad, concept_cav)
    list_sens.append(sensitivity)
    if sensitivity > 0:
        pos_sens = pos_sens + 1


In [None]:
import seaborn as sns, numpy as np
ax = sns.distplot(list_sens)
plt.show()

In [None]:
training_subset = x_train[np.array(y_train) == 0]
i = 777
example = training_subset[i].reshape((1, 32, 32, 3))
example_f = model_f.predict(example)[0].reshape((1,2304))
gradients = k.gradients(model_h.output, model_h.input)
gradient_func = k.function([model_h.input], gradients)
calc_grad = gradient_func([example_f])[0]
sensitivity = np.dot(calc_grad, concept_cav)
sensitivity