# Get Network Activations for Test Images
*Written by Viviane Clay*
### Load Dependencies

In [1]:
import os
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
import tensorflow as tf
import tensorflow.contrib.layers as c_layers

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Record the TensorFlow version used; the saved output of this cell shows 1.13.1.
# The notebook relies on TF1-style graph APIs (tf.Session, tf.placeholder, tf.contrib).
print(tf.__version__)

1.13.1


### Load Test Data
The data set and trained models can be downloaded here: http://dx.doi.org/10.17632/zdh4d5ws2z.2. For the paper, a subset of the full test set is used. To reproduce this, exclude all folders named [x,x,x,1], which contain the images labeled as 'puzzle'.

In [2]:
balancedTestSetPath = '../../Results/TowerTraining/BalancedTestSet/'
datagen = ImageDataGenerator(validation_split=0)
# batch_size equals the number of images found (1350), so a single batch holds the whole
# test set; shuffle=False keeps the images in directory order.
test_bal = datagen.flow_from_directory(balancedTestSetPath, class_mode='sparse',
                                       batch_size=1350,shuffle=False,subset="training",target_size=(168,168))
# change batch_size to 1600 if also testing for puzzles

# realLabel[k] is the label vector parsed from the folder name of class index k.
# The folder name is stripped of its first and last character and split on ', '
# (presumably names look like '[2, 0, 1, 0]' - verify against the data set).
# Iterate in class-index order explicitly: realLabel is later indexed by the sparse class
# index, and dict iteration order is not guaranteed to follow the indices on every
# Python version.
realLabel = []
for c,v in sorted(test_bal.class_indices.items(), key=lambda item: item[1]):
    c_ext = np.fromstring(c[1:-1], dtype=int, sep=', ')
    realLabel.append(c_ext)

def getRealLabel(labelBatch,RL):
    """Translate a batch of class indices into their label vectors.

    Args:
        labelBatch: iterable of class indices (each is cast with int()).
        RL: sequence mapping class index -> label vector.

    Returns:
        A list with one entry of RL per element of labelBatch, in the same order.
    """
    return [RL[int(classIdx)] for classIdx in labelBatch]

def getConceptSubset(AllConceptExamples, numExp):
    """Randomly split the examples of one concept into a train and a test part.

    Args:
        AllConceptExamples: numpy array whose first axis enumerates the examples.
        numExp: number of indices to draw for the train part.

    Returns:
        (trainExp, testExp): trainExp holds the numExp drawn examples; testExp holds
        every example whose index was not drawn. Both keep the trailing dimensions
        of the input.

    Note:
        Indices are drawn with np.random.randint, i.e. with replacement, so trainExp
        may contain the same example more than once and testExp may then hold more
        than ``len(AllConceptExamples) - numExp`` examples.
    """
    numAll = AllConceptExamples.shape[0]
    subset = np.random.randint(0, numAll, numExp)
    trainExp = AllConceptExamples[subset]
    # Mask over the example axis only. A mask with the full array shape would make the
    # boolean indexing below flatten multi-dimensional examples into a 1-D array.
    mask = np.ones(numAll, bool)
    mask[subset] = False
    testExp = AllConceptExamples[mask]
    return trainExp,  testExp

Found 1350 images belonging to 13 classes.


In [3]:
# Pull one batch from the generator; with batch_size=1350 this is the whole balanced test set.
obs,label = test_bal.next()
# Scale pixel values by 1/255 (assumes the generator yields values in 0..255 - TODO confirm).
obs = obs/255
# y[i] is the label vector decoded from the folder name of image i.
y = np.array(getRealLabel(label, realLabel))

## Get Network Activations
### Embodied Agent
Adapt the paths below to the location of your model checkpoints. The checkpoints can be found in the folder 'data/agent_checkpoints'.

In [5]:
# Checkpoint paths of the three trained agents. The last line of this cell restores ckpt_path_int.
#external rewards agent (47 days)
ckpt_path_ext = '../../Results/TowerTraining/models/TowerF4/TowerF4_Baseline-0/LearningBrain/model-30100120.cptk'
#curious & external rewards agent (61 days)
ckpt_path_extint = '../../Results/TowerTraining/models/TowerF4/TowerF4_Cur-0/LearningBrain/model-36351394.cptk'
#curious agent (129 days)
ckpt_path_int = '../../Results/TowerTraining/models/TowerF4/TowerF4_Cur_NoR7-0/LearningBrain/model-82450000.cptk'

# Start from an empty default graph so that re-running this cell does not collide
# with variables created by an earlier run.
tf.reset_default_graph()

def swish(input_activation):
    """Swish activation: the input multiplied element-wise by its own sigmoid.

    Reference: https://arxiv.org/abs/1710.05941
    """
    gate = tf.nn.sigmoid(input_activation)
    return tf.multiply(input_activation, gate)

def create_global_steps():
    """Build the global training-step counter and the op that advances it.

    Returns:
        (global_step, increment_step): a non-trainable int32 variable named
        "global_step" initialised to 0, and an assign op that stores its value plus one.
    """
    step_counter = tf.Variable(0, name="global_step", trainable=False, dtype=tf.int32)
    next_value = tf.add(step_counter, 1)
    advance_op = tf.assign(step_counter, next_value)
    return step_counter, advance_op


# global_step / increment_step are created but never run in this notebook
# (presumably only so the graph contains the variables stored in the checkpoint - TODO confirm).
global_step, increment_step = create_global_steps()

# Network dimensions: 168x168 RGB input, two dense layers of 256 units after the convolutions.
o_size_h = 168
o_size_w = 168
vec_obs_size = 8
num_layers = 2
h_size = 256
h_size_vec = 256  # not used by the code visible in this notebook
            
# Batch of input images; the batch dimension is left open.
visual_in = tf.placeholder(shape=[None, o_size_h, o_size_w, 3], dtype=tf.float32,name="visual_observation_0")

# Non-trainable statistics of size vec_obs_size; they are not connected to the visual encoder below.
running_mean = tf.get_variable("running_mean", [vec_obs_size],trainable=False, dtype=tf.float32,initializer=tf.zeros_initializer())
running_variance = tf.get_variable("running_variance", [vec_obs_size],trainable=False,dtype=tf.float32,initializer=tf.ones_initializer())

def create_vector_observation_encoder(observation_input, h_size, activation, num_layers, scope,reuse):
    """Stack `num_layers` dense layers of width `h_size` on top of `observation_input`.

    Args:
        observation_input: input tensor of the first dense layer.
        h_size: number of units of every dense layer.
        activation: activation function applied by every dense layer.
        num_layers: number of dense layers to stack.
        scope: variable scope the layers are created in.
        reuse: forwarded to tf.layers.dense.

    Returns:
        Output tensor of the last dense layer (the input itself if num_layers is 0).
    """
    layer_out = observation_input
    with tf.variable_scope(scope):
        for layer_idx in range(num_layers):
            weight_init = c_layers.variance_scaling_initializer(1.0)
            # Layers are named hidden_0, hidden_1, ... inside `scope`.
            layer_out = tf.layers.dense(layer_out, h_size, activation=activation, reuse=reuse,
                                        name="hidden_{}".format(layer_idx),
                                        kernel_initializer=weight_init)
    return layer_out

def create_visual_observation_encoder(image_input, h_size, activation, num_layers, scope,reuse):
    """Encode an image batch with two ELU convolutions followed by a dense stack.

    Args:
        image_input: image tensor fed to the first convolution.
        h_size: width of the dense layers.
        activation: activation of the dense layers (the convolutions always use ELU).
        num_layers: number of dense layers.
        scope: variable scope of the convolutions; also used to build the dense scope.
        reuse: forwarded to the conv and dense layers.

    Returns:
        Output tensor of the last dense layer.
    """
    with tf.variable_scope(scope):
        features = tf.layers.conv2d(image_input, 16, kernel_size=[8, 8], strides=[4, 4],
                                    activation=tf.nn.elu, reuse=reuse, name="conv_1")
        features = tf.layers.conv2d(features, 32, kernel_size=[4, 4], strides=[2, 2],
                                    activation=tf.nn.elu, reuse=reuse, name="conv_2")
        flat_features = c_layers.flatten(features)

    # `scope` is passed on to the dense helper, which opens it again inside
    # '<scope>/flat_encoding'.
    dense_scope = scope + '/' + 'flat_encoding'
    with tf.variable_scope(dense_scope):
        return create_vector_observation_encoder(flat_features, h_size, activation,
                                                 num_layers, scope, reuse)


visual_encoders = []
hidden_state, hidden_visual = None, None

# Build the single visual encoder; with one entry the concat below leaves it unchanged.
encoded_visual = create_visual_observation_encoder(visual_in,h_size,swish,num_layers,"main_graph_0_encoder0", False)
visual_encoders.append(encoded_visual)
# hidden_visual is the tensor whose values are read out as the network activations.
hidden_visual = tf.concat(visual_encoders, axis=1)

sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

# The Saver covers the variables created in the graph above.
saver = tf.train.Saver()

# Overwrite the random initialisation with the weights of the curious agent.
saver.restore(sess, ckpt_path_int)

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use keras.layers.conv2d instead.
Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Use keras.layers.dense instead.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
INFO:tensorflow:Restoring parameters from ../../Results/TowerTraining/models/TowerF4/TowerF4_Cur_NoR7-0/LearningBrain/model-82450000.cptk


2022-01-03 15:37:20.959003: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA


In [6]:
# Encodings of the curious agent (ckpt_path_int). Restore explicitly so the result does not
# depend on which checkpoint happened to be loaded into the session last.
saver.restore(sess, ckpt_path_int)
encBal = sess.run(hidden_visual, feed_dict={visual_in: obs})

In [7]:
# Encodings of the external-rewards agent. Load its weights into the existing graph first;
# without this restore the session still holds the weights restored when the network was
# built, so encExt would be a copy of the curious agent's encodings.
saver.restore(sess, ckpt_path_ext)
encExt = sess.run(hidden_visual, feed_dict={visual_in: obs})

In [8]:
# Encodings of the curious & external rewards agent. Load its weights into the existing graph
# first; without this restore the session would still hold the previously restored weights.
saver.restore(sess, ckpt_path_extint)
encExtInt = sess.run(hidden_visual, feed_dict={visual_in: obs})

### Autoencoder

In [4]:
from keras import backend as K
from keras.models import Sequential,Model
from keras.layers.convolutional import Conv3D, Conv2D, UpSampling2D,Conv2DTranspose
from keras.layers.convolutional_recurrent import ConvLSTM2D
from keras.layers.normalization import BatchNormalization
from keras.layers import Dense,MaxPooling2D,TimeDistributed,Input,concatenate,Flatten,Reshape,LSTM,Lambda

# Reset Keras' global graph/session state before the autoencoder is built.
K.clear_session()

def swish(input_activation):
    """Swish activation: the input multiplied element-wise by its own sigmoid.

    Reference: https://arxiv.org/abs/1710.05941
    """
    gate = tf.nn.sigmoid(input_activation)
    return tf.multiply(input_activation, gate)

# Encoder: two ELU convolutions followed by two swish dense layers.
# The print() calls show each tensor; the shapes noted below are taken from the recorded output.
inImg = Input(batch_shape=(None,168, 168, 3),name="input_1")
conv = Conv2D(filters=16, kernel_size=[8, 8], strides=[4, 4],activation=tf.nn.elu, name="conv_1")(inImg)
print(conv)  # (?, 41, 41, 16)
conv = Conv2D(filters=32, kernel_size=[4, 4], strides=[2, 2],activation=tf.nn.elu, name="conv_2")(conv)
print(conv)  # (?, 19, 19, 32)
flat = Reshape((19*19*32,))(conv)
print(flat)  # (?, 11552)
dens = Dense(256,activation=swish,kernel_initializer=c_layers.variance_scaling_initializer(1.0), name="dens_1")(flat)
print(dens)
# 'dens_2' is the 256-unit encoding that is read out further down.
enc = Dense(256,activation=swish,kernel_initializer=c_layers.variance_scaling_initializer(1.0), name="dens_2")(dens)
print(enc)
# Decoder: dense layer reshaped to 20x20x32, then two transposed convolutions.
de_dens = Dense(20*20*32,activation=swish,kernel_initializer=c_layers.variance_scaling_initializer(1.0), name="dens_3")(enc)
print(de_dens)
shaped = Reshape((20, 20, 32))(de_dens)
print(shaped)
de_conv = Conv2DTranspose(filters=16, kernel_size=[4, 4], strides=[2, 2],activation=tf.nn.elu, name="deconv_1")(shaped)
print(de_conv)

prediction = Conv2DTranspose(filters=3, kernel_size=[8, 8], strides=[4, 4],padding='same',activation=tf.nn.elu, name="deconv_3")(de_conv)
print(prediction)
model = Model(inputs=inImg, outputs=prediction)

# The model is compiled here but only used for inference in this notebook.
model.compile(optimizer='adadelta',loss='mean_squared_error',metrics=['accuracy','mse'])
# adapt this path. Trained model is in the folder 'data'
model.load_weights('../../Results/TowerTraining/Recordings/Standard/3999_16.100/autoencoder/aemodelAdam50E.h5')

# Second model on the same layers exposing two outputs: [encoding ('dens_2'), final output ('deconv_3')].
intermediate_layer_model = Model(inputs=model.input,
                                 outputs=[model.get_layer('dens_2').output,model.get_layer('deconv_3').output])

Instructions for updating:
Colocations handled automatically by placer.
Tensor("conv_1/Elu:0", shape=(?, 41, 41, 16), dtype=float32)
Tensor("conv_2/Elu:0", shape=(?, 19, 19, 32), dtype=float32)
Tensor("reshape_1/Reshape:0", shape=(?, 11552), dtype=float32)
Tensor("dens_1/Mul:0", shape=(?, 256), dtype=float32)
Tensor("dens_2/Mul:0", shape=(?, 256), dtype=float32)
Tensor("dens_3/Mul:0", shape=(?, 12800), dtype=float32)
Tensor("reshape_2/Reshape:0", shape=(?, 20, 20, 32), dtype=float32)
Tensor("deconv_1/Elu:0", shape=(?, ?, ?, 16), dtype=float32)
Tensor("deconv_3/Elu:0", shape=(?, ?, ?, 3), dtype=float32)


2022-01-03 15:42:24.535202: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA


In [5]:
# predict() yields one array per model output ([dens_2, deconv_3]); keep only the
# first one, the dens_2 encoding.
encAE = intermediate_layer_model.predict(obs)[0]

### BYOL

### Classifier

In [10]:
# Start from an empty default graph before the classifier network is built.
tf.reset_default_graph()

def swish(input_activation):
    """Swish activation: the input multiplied element-wise by its own sigmoid.

    Reference: https://arxiv.org/abs/1710.05941
    """
    gate = tf.nn.sigmoid(input_activation)
    return tf.multiply(input_activation, gate)

# Same network dimensions as used for the agent encoder.
o_size_h = 168
o_size_w = 168
num_layers = 2
h_size = 256
h_size_vec = 256  # not used by the code visible in this notebook
            
# Batch of input images; the batch dimension is left open.
visual_in = tf.placeholder(shape=[None, o_size_h, o_size_w, 3], dtype=tf.float32,name="visual_observation_0")
# One int64 target per output head (8 heads); only needed when evaluating `accuracy`.
labels = tf.placeholder(shape=[None,8], dtype=tf.int64,name="labels")

def create_vector_observation_encoder(observation_input, h_size, activation, num_layers, scope,reuse):
    """Stack `num_layers` dense layers of width `h_size` on top of `observation_input`.

    Args:
        observation_input: input tensor of the first dense layer.
        h_size: number of units of every dense layer.
        activation: activation function applied by every dense layer.
        num_layers: number of dense layers to stack.
        scope: variable scope the layers are created in.
        reuse: forwarded to tf.layers.dense.

    Returns:
        Output tensor of the last dense layer (the input itself if num_layers is 0).
    """
    layer_out = observation_input
    with tf.variable_scope(scope):
        for layer_idx in range(num_layers):
            weight_init = c_layers.variance_scaling_initializer(1.0)
            # Layers are named hidden_0, hidden_1, ... inside `scope`.
            layer_out = tf.layers.dense(layer_out, h_size, activation=activation, reuse=reuse,
                                        name="hidden_{}".format(layer_idx),
                                        kernel_initializer=weight_init)
    return layer_out

def create_visual_observation_encoder(image_input, h_size, activation, num_layers, scope,reuse):
    """Encode an image batch with two ELU convolutions followed by a dense stack.

    Args:
        image_input: image tensor fed to the first convolution.
        h_size: width of the dense layers.
        activation: activation of the dense layers (the convolutions always use ELU).
        num_layers: number of dense layers.
        scope: variable scope of the convolutions; also used to build the dense scope.
        reuse: forwarded to the conv and dense layers.

    Returns:
        Output tensor of the last dense layer.
    """
    with tf.variable_scope(scope):
        features = tf.layers.conv2d(image_input, 16, kernel_size=[8, 8], strides=[4, 4],
                                    activation=tf.nn.elu, reuse=reuse, name="conv_1")
        features = tf.layers.conv2d(features, 32, kernel_size=[4, 4], strides=[2, 2],
                                    activation=tf.nn.elu, reuse=reuse, name="conv_2")
        flat_features = c_layers.flatten(features)

    # `scope` is passed on to the dense helper, which opens it again inside
    # '<scope>/flat_encoding'.
    dense_scope = scope + '/' + 'flat_encoding'
    with tf.variable_scope(dense_scope):
        return create_vector_observation_encoder(flat_features, h_size, activation,
                                                 num_layers, scope, reuse)

visual_encoders = []

# Same encoder architecture as the agent network; `hidden` is the activation that is read out.
encoded_visual = create_visual_observation_encoder(visual_in,h_size,swish,num_layers,"main_graph_0_encoder0", False)
visual_encoders.append(encoded_visual)
hidden = tf.concat(visual_encoders, axis=1)

# Eight independent two-way softmax heads on top of `hidden`. The layers are unnamed, so
# they receive the automatic names shown in the recorded output (dense, dense_1, ... dense_7).
out_acts = []
for o in range(8):
    out_acts.append(tf.layers.dense(hidden, 2, activation=tf.nn.softmax, use_bias=False,kernel_initializer=c_layers.variance_scaling_initializer(factor=0.01)))
print(out_acts)

# NOTE: each head's output is sampled with tf.multinomial rather than taken as the argmax,
# so `output` can differ between two runs on the same input.
output = tf.concat([tf.multinomial(tf.log(out_acts[k]), 1) for k in range(8)], axis=1)#sample outputs from log probdist
print(output)
#output = tf.round(out_act)
#normalized_logits = tf.identity(normalized_logits_flat, name='action')#has nan in places where prob is negative bc it it log(probs)

# Element-wise agreement between targets and sampled outputs, averaged over all heads and images.
comparison = tf.equal(labels, output)

accuracy = tf.reduce_mean(tf.cast(comparison, dtype=tf.float32))

saver = tf.train.Saver()

[<tf.Tensor 'dense/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_1/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_2/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_3/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_4/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_5/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_6/Softmax:0' shape=(?, 2) dtype=float32>, <tf.Tensor 'dense_7/Softmax:0' shape=(?, 2) dtype=float32>]
Tensor("concat_1:0", shape=(?, 8), dtype=int64)


In [11]:
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
# First pass with the freshly initialised (untrained) weights: used as the random-network baseline.
outRand,actRand = sess.run([output,hidden], feed_dict = {visual_in: obs})
# adapt this path. Trained model is in the folder 'data'
saver.restore(sess, "../../Results/TowerTraining/Classifier/Model_flat_weighted/model.ckpt")
# Second pass with the trained classifier weights.
outClass,actClass = sess.run([output,hidden], feed_dict = {visual_in: obs})

INFO:tensorflow:Restoring parameters from ../../Results/TowerTraining/Classifier/Model_flat_weighted/model.ckpt


In [8]:
def getFlatLabel(label2D):
    """Convert label vectors into flat 8-element indicator vectors.

    For every label, positions 0-4 one-hot encode label[0] (all zero if label[0] is not
    one of 0..4), and positions 5, 6 and 7 are set to 1 when label[1], label[2] and
    label[3] respectively equal 1.

    Args:
        label2D: iterable of label vectors with at least four entries each.

    Returns:
        numpy array with one 8-element float row per input label.
    """
    rows = []
    for entry in label2D:
        row = np.zeros(8)
        # One-hot part: mark the first (and only) value in 0..4 matching entry[0].
        for value in range(5):
            if entry[0] == value:
                row[value] = 1
                break
        # Binary flags: entry[1..3] map onto positions 5..7.
        for pos in (1, 2, 3):
            if entry[pos] == 1:
                row[pos + 4] = 1
        rows.append(row)
    return np.array(rows)
# Drop the last column (index 7, derived from label[3]), leaving 7 concept columns.
flatL = getFlatLabel(y)[:,:-1]

In [9]:
# Calculate classifier stats with the same number of positive and negative examples per concept.
# To do this, negative examples are randomly sampled from all possible negative examples.
# NOTE: the sampling is not seeded, so the numbers vary slightly from run to run.

def _safe_ratio(numerator, denominator):
    """Return numerator/denominator, or 0.0 when the denominator is zero (avoids NaN)."""
    return numerator/denominator if denominator else 0.0

numRepeats = 1000  # repeat multiple times to average out the random choice of negative examples

accuracies, precisions, recalls, f1scores = [],[],[],[]
for c in range(flatL.shape[1]):
    # Everything that depends only on the positive examples is independent of the random
    # draw, so it is computed once per concept instead of once per repetition.
    classIDs = np.where(flatL[:,c]==1)[0]
    allNegIDs = np.where(flatL[:,c]==0)[0]
    numExp = np.shape(classIDs)[0]
    true_pos = np.sum(outClass[classIDs,c] == flatL[classIDs,c])
    false_neg = np.sum(outClass[classIDs,c] != flatL[classIDs,c])
    recall_c = _safe_ratio(true_pos, true_pos+false_neg)

    accuracy, precision, recall, f1score = [],[],[],[]
    for i in range(numRepeats):
        # Shuffled copy of all negatives, truncated to (at most) the number of positives.
        negExpIDs = np.random.permutation(allNegIDs)[:numExp]

        true_neg = np.sum(outClass[negExpIDs,c] == flatL[negExpIDs,c])
        false_pos = np.sum(outClass[negExpIDs,c] != flatL[negExpIDs,c])

        # Divide by the number of examples actually evaluated; this equals numExp*2
        # whenever there are at least as many negatives as positives.
        numUsed = numExp + np.shape(negExpIDs)[0]
        precision_i = _safe_ratio(true_pos, true_pos+false_pos)

        accuracy.append(_safe_ratio(true_neg+true_pos, numUsed))
        precision.append(precision_i)
        recall.append(recall_c)
        f1score.append(2*_safe_ratio(precision_i*recall_c, precision_i+recall_c))
    # Per-concept means over all repetitions, in percent.
    accuracies.append(np.mean(accuracy)*100)
    precisions.append(np.mean(precision)*100)
    recalls.append(np.mean(recall)*100)
    f1scores.append(np.mean(f1score)*100)
classifier_stats = {'Accuracy': np.mean(accuracies),
            'Accuracies': np.array(accuracies),
            'Precisions': np.array(precisions),
            'Recalls': np.array(recalls),
            'F1Scores': np.array(f1scores)}
classifier_stats

{'Accuracy': 78.6261469387755,
 'Accuracies': array([84.10842857, 88.4538    , 84.21      , 72.4986    , 85.027     ,
        58.5466    , 77.5386    ]),
 'Precisions': array([91.00782645, 98.62420721, 85.52777471, 89.23971613, 96.37859743,
        86.2125284 , 96.43816935]),
 'Recalls': array([75.71428571, 78.        , 82.4       , 51.2       , 72.8       ,
        20.4       , 57.2       ]),
 'F1Scores': array([82.65558629, 87.10680125, 83.92670424, 65.06043012, 82.94337239,
        32.98348875, 71.80562265])}

## Save Activations

In [22]:
# Store all activations next to the test images. File names follow the agent type:
# 'encInt.npy' holds encBal, the encodings of the curious agent (ckpt_path_int).
np.save(balancedTestSetPath + 'encExt.npy',encExt)
np.save(balancedTestSetPath + 'encInt.npy',encBal)
np.save(balancedTestSetPath + 'encExtInt.npy',encExtInt)
np.save(balancedTestSetPath + 'encRand.npy',actRand)
np.save(balancedTestSetPath + 'encAE.npy',encAE)
np.save(balancedTestSetPath + 'encC.npy',actClass)
np.save(balancedTestSetPath + 'outClass.npy',outClass)
np.save(balancedTestSetPath + 'labels.npy',y)
# adapt this path.
# NOTE(review): this path starts with './' while every other path in the notebook starts
# with '../../' - confirm it points where intended. The directory must already exist.
figurePath = './Results/TowerTraining/Figures/AgentRewardComparisonsAdaTH/ActivationPatternsNormx2No-01-FlatC-50-50Test/'
# classifier_stats is a dict, so np.save pickles it; reading it back needs np.load(..., allow_pickle=True).
np.save(figurePath+'classifier_stats.npy',classifier_stats)