In [None]:
import string
import numpy as np
import matplotlib.pyplot as plt

# Keras imports
from keras.layers import multiply, Input, Dense, Permute, Reshape, Flatten
from keras.layers.recurrent import LSTM
from keras.models import Model
from keras.callbacks import ModelCheckpoint
from keras.utils import to_categorical
import keras.backend as K

# 2019-02-13 Ruggero detector

In this notebook I want to explore the idea that an AI can recognize a specific word in a context, and that we can have a hint of what is going on in its ganglia by applying an attention mechanism to it.

The goal of this notebook is to build a Ruggero detector. I will create random sequences of letters, some of which contain the word "Ruggero" (possibly with noise) and some of which do not. Then I will tell the AI which sequences contain the word and which do not, without specifying where the word is. It would be really cool if the AI's attention turned out to be focused on the place where the word "Ruggero" was found.

## Building the data set

The first thing I want to do is to create a sequence of random letters, some with "Ruggero" and some without.

First, let's build an alphabet and an encoding of the alphabet.

In [None]:
# the alphabet is restricted to the 26 lowercase ASCII letters
letters = string.ascii_lowercase
nletters = len(letters)
# lookup tables: letter -> integer code, and integer code -> letter
alpha_to_n = {letter : code for code, letter in enumerate(letters)}
n_to_alpha = dict(enumerate(letters))

Then, let's build a function that can encode a sequence of letters to a sequence of numbers - given the mapping - and another function that can do the opposite: decode a sequence of numbers to a sequence of letters.

In [None]:
def encode_sentence(sentence_alpha, alpha_to_n) :
    """
    Translate a sequence of letters into the list of their integer codes.

    sentence_alpha : any iterable of letters (a str, a list of characters, ...)
    alpha_to_n     : dict mapping each letter to its integer code

    Returns a list of integers, one per letter, in the same order.
    Raises KeyError if a letter is not a key of alpha_to_n.
    """
    # iterate over the letters directly: no index bookkeeping, and inputs
    # without a length (e.g. generators) are accepted as well
    return [alpha_to_n[letter] for letter in sentence_alpha]

def decode_sentence(sentence_n, n_to_alpha) :
    """
    Translate a sequence of integer codes back into a string of letters.

    sentence_n : any iterable of integer codes (a list, a 1-d numpy array, ...)
    n_to_alpha : dict mapping each integer code to its letter

    Returns the decoded string.
    Raises KeyError if a code is not a key of n_to_alpha.
    """
    # iterate over the codes directly: no index bookkeeping, and inputs
    # without a length (e.g. generators) are accepted as well
    return ''.join(n_to_alpha[code] for code in sentence_n)

Now, let's build a function that generates random sequences of letters (sentences), with the option that they may or may not contain a given target. We will also provide the option of adding noise to the target in the sentences that contain it.

In [None]:
def generate_sentences(nsentences, sentence_length, alpha_to_n,
                       seed = None, with_target = False, target = 'ruggero', noise = None) :
    """
    Generate random encoded sentences, optionally containing a (noisy) target.

    nsentences      : number of sentences to generate
    sentence_length : number of letters in each sentence
    alpha_to_n      : dict mapping each letter of the alphabet to its integer code
    seed            : if not None, the global numpy random generator is re-seeded
                      with this value before anything is drawn
    with_target     : if True, the target is written into every sentence at a
                      random location
    target          : the word to insert (its letters must be keys of alpha_to_n)
    noise           : exclusive upper bound on the number of mutation draws per
                      sentence; each sentence gets a number drawn uniformly from
                      0 .. noise-1. None or 0 means no mutation. Non-integer
                      values are truncated. Only used when with_target is True.

    Returns an integer array of shape (nsentences, sentence_length).
    Raises ValueError if the target does not fit into a sentence or if noise
    is negative.

    Note: mutation positions are drawn with replacement and a mutated letter
    may coincide with the original one, so the number of letters that actually
    change can be smaller than the number of draws.
    """
    # get length of the alphabet
    nletters = len(alpha_to_n)

    # init random number generator if seed is given
    if seed is not None :
        np.random.seed(seed)

    # generate the sentences
    sentences = np.random.randint(low=0, high=nletters, size=(nsentences, sentence_length))

    # if the user doesn't require the sentences to include the target, we're done
    if not with_target :
        return sentences

    # get the encoding of the target
    target_encoding = [alpha_to_n[letter] for letter in target]
    target_length = len(target_encoding)
    if target_length > sentence_length :
        raise ValueError('target of length %d does not fit into sentences of length %d'
                         % (target_length, sentence_length))

    # draw, for every sentence, the location at which the target is inserted,
    # then overwrite the random letters found there
    target_location = np.random.randint(low=0, high=sentence_length-target_length+1,
                                        size=nsentences)
    for i, loc in enumerate(target_location) :
        sentences[i, loc:loc+target_length] = target_encoding

    # now for the noise part: an absent or zero noise means "leave the target intact"
    max_mutations = 0 if noise is None else int(noise)
    if max_mutations < 0 :
        raise ValueError('noise must be non-negative, got %r' % (noise,))
    if max_mutations == 0 or target_length == 0 :
        return sentences

    # number of mutation draws for each sentence (upper bound is exclusive)
    mutation_size = np.random.randint(low=0, high=max_mutations, size=nsentences)

    # we then proceed to the mutation
    for i, (loc, n) in enumerate(zip(target_location, mutation_size)) :

        # we pick n positions at random inside the target and n replacement letters
        mutation_locations = np.random.randint(low=0, high=target_length, size=n)
        mutations = np.random.randint(low=0, high=nletters, size=n)

        # positions are relative to the target, so shift them by its location
        sentences[i, loc+mutation_locations] = mutations

    # at the end, return the generated sentences
    return sentences

The cell below is to test whether this works or not.

In [None]:
# smoke test: ten noisy sentences of twenty letters, printed both encoded and decoded.
# noise is a number of letters, so it is passed as an integer
sentences = generate_sentences(10, 20, alpha_to_n,
                               seed=8958574, with_target = True, noise=4)
for s in sentences :
    print(s, decode_sentence(s, n_to_alpha))

Okay, now that we have a system to generate noisy sequences that contain the target or not, we can proceed with the next step, which is to generate the data set for training, validation, and testing.

In [None]:
def shuffle_data(data, targets):
    """
    Reorder the rows of "data" and the entries of "targets" with one common
    random permutation, so that row k of the returned data still belongs to
    entry k of the returned targets.

    The permutation is drawn from the global numpy random generator.
    Returns the pair (shuffled data, shuffled targets).
    """
    row_count = data.shape[0]
    # drawing every row index exactly once, without replacement, gives a permutation
    order = np.random.choice(row_count, size=row_count, replace=False)
    shuffled_data = data[order]
    shuffled_targets = targets[order]
    return shuffled_data, shuffled_targets

In [None]:
def prepare_data(sentence_length, ntrain, nvalid, ntest, alpha_to_n, noise=None, seed=None) :
    """
    Build shuffled train / validation / test sets of encoded sentences.

    With N = ntrain + nvalid + ntest, N sentences without the target (label 0)
    and N sentences with the target (label 1) are generated, stacked and
    shuffled together. The 2N shuffled rows are then cut into consecutive
    chunks of 2*ntrain, 2*nvalid and 2*ntest rows. Since the cut happens after
    the shuffle, each chunk contains the two classes in equal parts only on
    average.

    sentence_length : number of letters per sentence
    ntrain, nvalid, ntest : half the number of rows of the respective split
    alpha_to_n      : dict mapping each letter to its integer code
    noise           : forwarded to generate_sentences for the sentences that
                      contain the target
    seed            : if not None, the global numpy random generator is seeded
                      once with this value so that the whole data set is
                      reproducible

    Returns (train_data, train_targets, valid_data, valid_targets,
    test_data, test_targets). Every data array has shape
    (rows, sentence_length, 1); every targets array has shape (rows,).
    """
    # seed once, up front: seeding each generate_sentences call with the same
    # value would make both classes start from identical random letters
    if seed is not None :
        np.random.seed(seed)

    N = ntrain + nvalid + ntest

    # generate N sentences without the target
    sentences_without = generate_sentences(N, sentence_length, alpha_to_n,
                                           with_target = False)
    targets_without = np.zeros(N, dtype=np.int32)

    # generate N sentences with the target
    sentences_with = generate_sentences(N, sentence_length, alpha_to_n,
                                        with_target = True, noise = noise)
    targets_with = np.ones(N, dtype=np.int32)

    # now stack everything and shuffle
    data = np.vstack((sentences_without, sentences_with))
    targets = np.concatenate((targets_without, targets_with))
    data, targets = shuffle_data(data, targets)

    # row indices at which the train and validation chunks end; the factor 2
    # accounts for the two classes
    train_end = 2*ntrain
    valid_end = 2*(ntrain+nvalid)

    # now we partition data and targets into train, valid, and test sets
    train_data, train_targets = data[:train_end, :], targets[:train_end]
    valid_data, valid_targets = data[train_end:valid_end, :], targets[train_end:valid_end]
    test_data, test_targets = data[valid_end:, :], targets[valid_end:]

    # append a trailing axis of length 1 to every data array, then return
    train_data = np.expand_dims(train_data, axis = 2)
    valid_data = np.expand_dims(valid_data, axis = 2)
    test_data = np.expand_dims(test_data, axis = 2)
    return train_data, train_targets,\
           valid_data, valid_targets,\
           test_data, test_targets

Let's generate a small data set to see whether this works.

In [None]:
# a tiny data set, just large enough to eyeball the result
ntrain, nvalid, ntest = 10, 5, 2
sentence_length = 20
(train_data, train_targets,
 valid_data, valid_targets,
 test_data, test_targets) = prepare_data(sentence_length, ntrain, nvalid, ntest, alpha_to_n)

In [None]:
# show every validation sentence (encoded and decoded) next to its label
for i, s in enumerate(valid_data.squeeze(axis=-1)) :
    decoded = decode_sentence(s, n_to_alpha)
    label = valid_targets[i]
    print(s, decoded, label)

Okay, this works. Now we can generate the full data set.

In [None]:
# fix the random state first: the cells further down inspect individual test
# sentences by hard-coded index, which only makes sense if the data set is
# the same on every run
np.random.seed(20190213)

ntrain = 10000
nvalid = 2000
ntest = 2000
sentence_length = 80
train_data, train_targets,\
valid_data, valid_targets,\
test_data, test_targets = prepare_data(sentence_length, ntrain, nvalid, ntest, alpha_to_n)

We're ready now to build the AI.

## Building the AI

We now want to build an AI architecture that has an attention mechanism. This bit of code I will get from the nice github repository github.com/philipperemy/keras-attention-mechanism. As explained there, there are two kinds of architectures one can try: applying the attention before and after the LSTM. Let's try both.

First, I'll steal the "attention" function from that repository.

In [None]:
def attention_3d_block(inputs, time_steps):
    """
    Re-weight a sequence tensor with learned attention weights.

    inputs     : Keras tensor; the code treats axis 1 as time and axis 2 as
                 features, i.e. (batch, time_steps, features)
    time_steps : length of the time axis, used as the width of the Dense layer

    Returns a tensor of the same shape as inputs: the element-wise product of
    inputs and the attention weights.

    The weights are exposed by the layer named 'attention_vec' and the product
    by the layer named 'attention_mul'. Because these names are fixed, the
    block is presumably usable only once per model (Keras expects unique layer
    names within a model).
    """
    # swap time and feature axes so that the Dense layer, which acts on the
    # last axis, mixes information across time steps
    a = Permute((2, 1))(inputs)
    # softmax over the last (time) axis
    a = Dense(time_steps, activation='softmax')(a)
    # swap the axes back so the weights line up with the inputs
    a_probs = Permute((2, 1), name='attention_vec')(a)
    output_attention_mul = multiply([inputs, a_probs], name='attention_mul')
    return output_attention_mul

The data that we will feed to the AI will be in the format of a matrix that has dimensions (sentence_length * nletters). Each of the entries (i,j) of the matrix will be 0 or 1 depending on whether the i-th letter of the sentence is letter j or not. We will prepare the data set in this way by using the "to_categorical" function from the Keras utils. It will be convenient to keep the original data in order to print the results along the way.

In [None]:
# transform the data to categorical (one-hot) form.
# num_classes is given explicitly: otherwise it is inferred from the largest
# code present in each array, and a split that happens to lack the last letter
# would end up narrower than the nletters expected by the model input
train_data_cat = to_categorical(train_data, num_classes=nletters)
valid_data_cat = to_categorical(valid_data, num_classes=nletters)
test_data_cat = to_categorical(test_data, num_classes=nletters)

### Attention after LSTM

In this first block we will try to build an AI with an attention block after the LSTM unit. We will then try to analyze the results by looking into the AI's attention layer and looking at the results.

In [None]:
# network input: one one-hot encoded sentence, sentence_length rows by nletters columns
inputs = Input(shape=(sentence_length, nletters))

# recurrent layer; return_sequences=True keeps one output vector per time step,
# which is what the attention block needs
lstm_units = 32
lstm_out = LSTM(lstm_units, return_sequences=True)(inputs)

# attention over the LSTM outputs, flattened for the dense classifier
attention_mul = attention_3d_block(lstm_out, sentence_length)
attention_mul = Flatten()(attention_mul)

# a single sigmoid unit: the output is read as "the sentence contains the target"
output = Dense(1, activation='sigmoid')(attention_mul)

# assemble and compile the model
m_after = Model(inputs=[inputs], outputs=output)
m_after.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# train, writing a checkpoint file only when the model improves
checkpointer = ModelCheckpoint(filepath='../data/ruggero-detector-after.hdf5',
                               save_best_only=True, verbose=1)
m_after.fit(train_data_cat, train_targets,
            validation_data=(valid_data_cat, valid_targets),
            epochs=2,
            batch_size=32,
            callbacks=[checkpointer])

In [None]:
# loss and accuracy of the attention-after-LSTM model on the held-out test set
score, acc = m_after.evaluate(test_data_cat, test_targets, batch_size=32, verbose=2)
print('Test score:', score)
print('Test accuracy:', acc)

Okay, so the model with 32 LSTM units is able to discriminate almost perfectly between sentences that contain the target and sentences that do not.

Now we want to go to the question of how this is done. The following function is taken directly from the github repository mentioned earlier.

In [None]:
def get_activations(model, inputs, layer_name=None):
    """
    Evaluate the outputs of one or all layers of a model on given inputs.

    model      : Keras model with a single input tensor
    inputs     : numpy array to feed to the model input (batch axis included)
    layer_name : if None, the outputs of all layers are evaluated; otherwise
                 only those of the layers whose name equals layer_name

    Returns a list of numpy arrays, one per selected layer, in the order in
    which the layers appear in model.layers. The list is empty if no layer
    has the requested name.
    """
    # fetch the symbolic outputs of the selected layers
    if layer_name is None:
        # no name given: all layer outputs
        outputs = [layer.output for layer in model.layers]
    else:
        # only the layers with the given name
        outputs = [layer.output for layer in model.layers if layer.name == layer_name]

    # evaluation functions; the learning phase flag is fed as an extra input
    inp = model.input
    funcs = [K.function([inp] + [K.learning_phase()], [out]) for out in outputs]

    # evaluate in test phase (flag 0), so that layers which behave differently
    # during training, such as dropout, report what the model uses at prediction time
    return [func([inputs, 0.])[0] for func in funcs]

We now select a single test sequence, get the attention matrix associated with it, and print the target, the model's prediction, and the sequence itself.

In [None]:
# index of the test sentence to inspect
i = 11

# the model expects a batch axis, so wrap the single sentence into a batch of one
sample = np.expand_dims(test_data_cat[i], axis=0)

# attention weights for this sentence; after squeeze and transpose the time
# steps run along the columns
attention_matrix = get_activations(m_after, sample, layer_name='attention_vec')[0]
attention_matrix = attention_matrix.squeeze().transpose()

# actual label first, then the model's prediction
target = test_targets[i]
test_sentence = decode_sentence(test_data[i].squeeze(), n_to_alpha)
print(target)
print(m_after.predict(sample))

# heat map of the attention weights, with the letters of the sentence as tick labels
fig, ax = plt.subplots(1, 1, figsize = (20,10))
cax = ax.imshow(attention_matrix, aspect = 'auto', cmap=plt.cm.Oranges)
plt.colorbar(cax)
plt.xticks(np.arange(sentence_length), test_sentence)
plt.show()

No matter how hard I try extracting information from this network, nothing really meaningful comes out.

### Attention before LSTM

In [None]:
# network input: one one-hot encoded sentence, sentence_length rows by nletters columns
inputs = Input(shape=(sentence_length, nletters))

# this time the attention block acts directly on the one-hot input
attention_mul = attention_3d_block(inputs, sentence_length)

# recurrent layer; return_sequences=False keeps only the final output vector
lstm_units = 32
lstm_out = LSTM(lstm_units, return_sequences=False)(attention_mul)

# a single sigmoid unit: the output is read as "the sentence contains the target"
output = Dense(1, activation='sigmoid')(lstm_out)

# assemble and compile the model
m_before = Model(inputs=[inputs], outputs=output)
m_before.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# train, writing a checkpoint file only when the model improves
checkpointer = ModelCheckpoint(filepath='../data/ruggero-detector-before.hdf5',
                               save_best_only=True, verbose=1)
m_before.fit(train_data_cat, train_targets,
             validation_data=(valid_data_cat, valid_targets),
             epochs=10,
             batch_size=32,
             callbacks=[checkpointer])

In [None]:
# loss and accuracy of the attention-before-LSTM model on the held-out test set
score, acc = m_before.evaluate(test_data_cat, test_targets, batch_size=32, verbose=2)
print('Test score:', score)
print('Test accuracy:', acc)

In [None]:
# index of the test sentence to inspect
i = 108

# the model expects a batch axis, so wrap the single sentence into a batch of one
sample = np.expand_dims(test_data_cat[i], axis=0)

# attention weights for this sentence; after squeeze and transpose the time
# steps run along the columns
attention_matrix = get_activations(m_before, sample, layer_name='attention_vec')[0]
attention_matrix = attention_matrix.squeeze().transpose()

# actual label first, then the model's prediction
target = test_targets[i]
test_sentence = decode_sentence(test_data[i].squeeze(), n_to_alpha)
print(target)
print(m_before.predict(sample))

# bar chart of the attention averaged over the rows, one bar per letter of the sentence
fig = plt.figure(figsize = (10,3))
plt.bar(np.arange(sentence_length), attention_matrix.mean(axis=0))
plt.xticks(np.arange(sentence_length), test_sentence)
plt.show()

# Epilogue

So in the end I come to a few conclusions.

1. The architecture of putting an attention layer after an LSTM works fantastically well as a Ruggero-detector. However, I found it impossible to extract meaning out of it by inspecting the attention layer.

2. The architecture of putting the attention before the LSTM is much less efficient as a Ruggero-detector, and produces even more puzzling results in