# Alert correlation with a Long Short-Term Memory (LSTM) Recurrent Neural Network (RNN) and cosine similarity

Copyright (C) Egon Kidmose 2015-2017

This file is part of lstm-rnn-correlation.

lstm-rnn-correlation is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

lstm-rnn-correlation is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with lstm-rnn-correlation. If not, see
<http://www.gnu.org/licenses/>.


In [None]:
import lasagne
from lasagne.layers import *
import theano
import theano.tensor as T
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline 
%matplotlib nbagg

# Model dimensions
NUM_INPUTS = 2 ** 7  # Size of ASCII: one input per 7-bit character code
NUM_UNITS_ENC = 10  # Number of units in each LSTM encoder

# Fixtures for the per-cell sanity checks
TEST_ALERT = 'abcd'
# Five rows, all referring to the same list of character codes
TEST_BATCH = [list(map(ord, TEST_ALERT))] * 5
TEST_SHAPE = (5, 4)  # (rows in TEST_BATCH, characters in TEST_ALERT)
X_ALERT = T.imatrix()  # Symbolic integer matrix used as network input


In [None]:
# Input layers - one per alert in a pair; they pass their input on unchanged.
# Both dimensions are left unspecified (None).
l_in_1, l_in_2 = InputLayer((None, None)), InputLayer((None, None))

# Test: whatever goes into an input layer must come out unmodified
test_res = lasagne.layers.get_output(
    l_in_1,
    inputs={l_in_1: X_ALERT},
).eval({X_ALERT: TEST_BATCH})
assert (test_res == TEST_BATCH).all(), "Unexpected output"

print(test_res)

In [None]:
# Embedding layers - applies one hot encoding
def _one_hot_embedding(incoming, name):
    """Return an EmbeddingLayer on `incoming` with a fixed identity matrix as W.

    With the identity matrix as embedding table, character code i is mapped
    to a vector that is 1 at position i and 0 elsewhere.
    """
    layer = EmbeddingLayer(
        incoming,
        input_size=NUM_INPUTS,
        output_size=NUM_INPUTS,
        W=np.eye(NUM_INPUTS, dtype='float32'),
        name=name,
    )
    # Fix weight: drop the 'trainable' tag from W
    layer.params[layer.W].remove('trainable')
    return layer


l_emb_1 = _one_hot_embedding(l_in_1, 'Embedding 1')
l_emb_2 = _one_hot_embedding(l_in_2, 'Embedding 2')

# Test: the position of the maximum must equal the encoded character code,
# and the output gains a trailing axis of size NUM_INPUTS
test_res = lasagne.layers.get_output(
    l_emb_1,
    inputs={l_in_1: X_ALERT},
).eval({X_ALERT: TEST_BATCH})
assert (np.argmax(test_res, axis=2) == TEST_BATCH).all()
assert test_res.shape == (TEST_SHAPE[0], TEST_SHAPE[1], NUM_INPUTS)

print(np.argmax(test_res, axis=2))

print(test_res.shape)

In [None]:
# LSTM layers - two LSTM layers, one per input path, sharing all weights.
# Both paths parse inputs from the same source, so identical alerts must be
# encoded identically no matter which path they enter through.

# Instantiate gates acc. to lasagne.layers.LSTMLayer defaults
ingate = lasagne.layers.Gate()
forgetgate = lasagne.layers.Gate()
cell = lasagne.layers.Gate(
    W_cell=None,
    nonlinearity=lasagne.nonlinearities.tanh
)
outgate = lasagne.layers.Gate()

l_enc_1 = LSTMLayer(l_emb_1,
                    num_units=NUM_UNITS_ENC,
                    ingate=ingate, forgetgate=forgetgate,
                    cell=cell, outgate=outgate,
                    name='LSTM 1',
                   )


def _tied_gate(source, gate_name, **gate_kwargs):
    """Return a Gate reusing the parameters `source` holds for `gate_name`.

    `source` is an already constructed LSTMLayer and `gate_name` is one of
    'ingate', 'forgetgate', 'cell' or 'outgate'. Remaining keyword arguments
    (W_cell, nonlinearity) are passed on to lasagne.layers.Gate.
    """
    return lasagne.layers.Gate(
        W_in=getattr(source, 'W_in_to_' + gate_name),
        W_hid=getattr(source, 'W_hid_to_' + gate_name),
        b=getattr(source, 'b_' + gate_name),
        **gate_kwargs
    )


# A Gate only holds parameter specifications (initializers by default), so
# passing the same Gate objects to a second LSTMLayer gives that layer its own
# independently initialised weights. To really tie the weights, the second
# layer is built from the shared variables that l_enc_1 created.
l_enc_2 = LSTMLayer(l_emb_2,
                    num_units=NUM_UNITS_ENC,
                    ingate=_tied_gate(
                        l_enc_1, 'ingate',
                        W_cell=l_enc_1.W_cell_to_ingate),
                    forgetgate=_tied_gate(
                        l_enc_1, 'forgetgate',
                        W_cell=l_enc_1.W_cell_to_forgetgate),
                    cell=_tied_gate(
                        l_enc_1, 'cell',
                        W_cell=None,
                        nonlinearity=lasagne.nonlinearities.tanh),
                    outgate=_tied_gate(
                        l_enc_1, 'outgate',
                        W_cell=l_enc_1.W_cell_to_outgate),
                    name='LSTM 2',
                   )

# Test
test_res_1 = lasagne.layers.get_output(l_enc_1, inputs={l_in_1: X_ALERT}).eval(
    {X_ALERT: TEST_BATCH})
test_res_2 = lasagne.layers.get_output(l_enc_2, inputs={l_in_2: X_ALERT}).eval(
    {X_ALERT: TEST_BATCH})
assert test_res_1.shape == (TEST_SHAPE[0], TEST_SHAPE[1], NUM_UNITS_ENC), "Unexpected dimensions"
# With tied weights both paths must encode the same input identically
assert np.allclose(test_res_1, test_res_2), "The two input lines differ"



In [None]:
# Slice layers
# Keep only the encoder output for the last entry (index -1 on axis 1) of
# each sequence, which removes that axis from the output.
l_last_1 = lasagne.layers.SliceLayer(
    l_enc_1,
    indices=-1,
    axis=1,
)
l_last_2 = lasagne.layers.SliceLayer(
    l_enc_2,
    indices=-1,
    axis=1,
)

# Test: one vector of NUM_UNITS_ENC values remains per row of the batch
test_res_1 = lasagne.layers.get_output(
    l_last_1,
    inputs={l_in_1: X_ALERT},
).eval({X_ALERT: TEST_BATCH})
assert test_res_1.shape == (TEST_SHAPE[0], NUM_UNITS_ENC)


In [None]:
# Cosine layer definition
class CosineSimilarityLayer(MergeLayer):
    """Calculates the row-wise cosine similarity of two inputs.

    Both inputs must be two-dimensional, (BATCH_SIZE, N). The output is
    one-dimensional, (BATCH_SIZE,), holding one similarity per pair of rows.
    """

    def __init__(self, incoming1, incoming2, **kwargs):
        """Instantiates the layer with incoming1 and incoming2 as the inputs.

        Each incoming is either a layer (its `output_shape` is checked) or a
        shape tuple. Remaining keyword arguments are passed on to MergeLayer.

        Raises NotImplementedError if an incoming is not two-dimensional.
        """
        incomings = [incoming1, incoming2]

        for incoming in incomings:
            if isinstance(incoming, tuple):
                shape = incoming
            else:
                shape = incoming.output_shape
            if len(shape) != 2:
                raise NotImplementedError("Requires shape to be exactly (BATCH_SIZE, N).")

        super(CosineSimilarityLayer, self).__init__(incomings, **kwargs)

    def get_output_shape_for(self, input_shapes):
        """Return output shape: (batch_size,).

        Raises ValueError unless exactly two shapes are given, and
        NotImplementedError if any of them is not two-dimensional.
        """
        if len(input_shapes) != 2:
            raise ValueError("Requires exactly 2 input_shapes")

        for input_shape in input_shapes:
            if len(input_shape) != 2:
                raise NotImplementedError("Requires shape to be exactly (BATCH_SIZE, N).")

        # The batch size is taken from the last input shape
        return (input_shapes[-1][0],)

    def get_output_for(self, inputs, **kwargs):
        """Calculates the cosine similarity.

        For each row: the dot product of the two inputs divided by the
        product of their Euclidean norms. A row with zero norm leads to a
        division by zero.
        """
        nominator = (inputs[0] * inputs[1]).sum(axis=1)
        denominator = T.sqrt((inputs[0]**2).sum(axis=1)) * T.sqrt((inputs[1]**2).sum(axis=1))
        return nominator/denominator
        
# Test: rows of in1 are parallel, orthogonal and opposite to the rows of in2.
# All rows have unit length, so the expected values are 1, 0 and -1.
test_in_1 = InputLayer((None, None))
test_in_2 = InputLayer((None, None))
test_layer = CosineSimilarityLayer(test_in_1, test_in_2)
in1 = T.dmatrix('in1')
in2 = T.dmatrix('in2')

test_res = lasagne.layers.get_output(
    test_layer,
    inputs={test_in_1: in1, test_in_2: in2},
).eval({
    in1: [[0, 1], [1, 0], [0, -1]],
    in2: [[0, 1], [0, 1], [0, 1]],
})
assert test_res.ndim == len(test_layer.output_shape), "Dimension mismatch"
assert (test_res == [1., 0., -1.]).all(), "Invalid output"

In [None]:
# Loss

In [None]:
# Train