# Long Short-Term Memory - Sample Implementation and Application

In [1]:
# all imports for the whole notebook
import string
import random
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

## Overview

This tutorial will provide an overview over the concept of LSTMs by providing a Python implementation that does not rely on any framework specifically for neural networks. Thereby all necessary code to implement the LSTM-Cells will be shown. In order to get insight into how LSTMs work internally, a small network will be trained on a simple example task.

## Task description

The task for this LSTM network will be a simple sequence prediction task. Elements of the sequence will be letters from a subset of the alphabet. The sequences are constructed as follows:

1. They start with a small sequence of random characters with varying length.
2. Then there is a marker letter, that will not be used anywhere else in the whole sequence.
3. A random letter follows, which will be called **target**.
4. Another larger random sequence with varying length follows.
5. After this, there is a second marker letter, that will also not be used anywhere else in the whole sequence.
6. The final character is again the **target** letter.

In order to learn the task, the network must recognize the marker letters in order to open/close the gates of the memory cells appropriately. The **target** letter then has to be stored in the network until the second marker appeares. Overall performance of the task can be measured by calculating the average prediction error for the last character in every sequence.

## Data generation

At first we have to setup the general environment for the task, like the list of letters to be used, the markers, and similar initializations.

In [29]:
# marker letters
mark1 = 'a'
mark2 = 'b'

# nums
num_random_letters = 2
num_letters = num_random_letters + 2

# random letters
all_letters = list(string.ascii_lowercase)
letters = all_letters[2:num_letters]

print("Markers:", [mark1, mark2])
print("Letters:", letters)

Markers: ['a', 'b']
Letters: ['c', 'd']


Following we need a function that can sample us a list of letters with length in a specific range.

In [30]:
def randomSequenceSample(minlength, maxlength):
    length = random.randint(minlength, maxlength)
    for _ in range(length):
        rand_idx = random.randint(0, len(letters)-1)
        yield letters[rand_idx]

Lets look at some random samples:

In [31]:
for i in range(5):
    print("Test sample", i+1, ":", list(randomSequenceSample(1, 15)))

Test sample 1 : ['c', 'd', 'c', 'd', 'd', 'c', 'd', 'c', 'c', 'c', 'c', 'd']
Test sample 2 : ['d', 'd', 'c', 'c', 'd', 'd', 'c', 'c', 'c', 'd', 'c', 'd', 'd', 'd']
Test sample 3 : ['c', 'd', 'd', 'd', 'd', 'c', 'c', 'c', 'c', 'c']
Test sample 4 : ['c', 'd']
Test sample 5 : ['d', 'd', 'd']


Now we need to put those random sequence samples together with the markers and target letters in order to get a problem sequence as decribed above.

In [32]:
def generateSequence(premin, premax, midmin, midmax):
    target_i = random.randint(0, len(letters)-1)
    target = letters[target_i]
    pre = list(randomSequenceSample(premin, premax))
    mid = list(randomSequenceSample(midmin, midmax))
    return pre + [mark1, target] + mid + [mark2, target]

Lets look at a few sequences:

In [33]:
for i in range(5):
    print("Sequence test", i+1, ":", generateSequence(0,3,1,5))

Sequence test 1 : ['c', 'c', 'c', 'a', 'c', 'd', 'd', 'b', 'c']
Sequence test 2 : ['c', 'c', 'd', 'a', 'c', 'c', 'b', 'c']
Sequence test 3 : ['c', 'a', 'c', 'c', 'd', 'c', 'b', 'c']
Sequence test 4 : ['c', 'c', 'c', 'a', 'c', 'c', 'c', 'c', 'b', 'c']
Sequence test 5 : ['a', 'd', 'd', 'd', 'd', 'c', 'c', 'b', 'd']


In [34]:
def formatSequenceToPrint(s):
    new_s = []
    check_next = False
    for item in s:
        if item == mark1 or item == mark2:
            item = '_'
            check_next = True
        elif check_next == True:
            item = item.upper()
            check_next = False
        new_s.append(item)
    return new_s

In [35]:
for i in range(5):
    print("Sequence test", i+1, ":", formatSequenceToPrint(generateSequence(0,3,1,5)))

Sequence test 1 : ['_', 'C', 'c', 'c', 'c', 'd', '_', 'C']
Sequence test 2 : ['d', 'd', '_', 'C', 'c', 'd', 'c', 'c', '_', 'C']
Sequence test 3 : ['d', 'c', '_', 'D', 'd', 'c', 'd', '_', 'D']
Sequence test 4 : ['d', 'c', 'd', '_', 'D', 'c', 'c', 'c', 'c', 'd', '_', 'D']
Sequence test 5 : ['d', '_', 'D', 'c', 'c', 'c', 'd', '_', 'D']


In [36]:
def letterToCategorical(l):
    lst = [0.0] * num_letters
    lst[all_letters.index(l)] = 1.0
    return lst

In [39]:
print("Categorical of 'a':", letterToCategorical("a"))
print("Categorical of 'e':", letterToCategorical("c"))

Categorical of 'a': [1.0, 0.0, 0.0, 0.0]
Categorical of 'e': [0.0, 0.0, 1.0, 0.0]


In [40]:
def sequenceToCategoricalNumpy(seq):
    new_s = []
    for letter in seq:
        new_s.append(letterToCategorical(letter))
    return np.array(new_s).T

In [41]:
print("Numpy categorical matrix of ['a', 'b', 'c']:")
print(sequenceToCategoricalNumpy(list("abc")))

Numpy categorical matrix of ['a', 'b', 'c']:
[[ 1.  0.  0.]
 [ 0.  1.  0.]
 [ 0.  0.  1.]
 [ 0.  0.  0.]]


In [42]:
def catSequenceToInputOutputLists(seq):
    inpList = []
    outList = []
    for i in range(seq.shape[1] - 1):
        inpList.append(seq[:,i,None])
        outList.append(seq[:,i+1,None])
    return (inpList, outList)

In [43]:
def inpOutSequenceGenerator(n, premin, premax, midmin, midmax):
    for i in range(n):
        seq = generateSequence(premin, premax, midmin, midmax)
        catSeq = sequenceToCategoricalNumpy(seq)
        yield catSequenceToInputOutputLists(catSeq)

## LSTM Basics

In [44]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))
def sigmoid_deriv(x):
    return sigmoid(x)*(1 - sigmoid(x))

In [45]:
class Gate:
    def __init__(self, inpDim, bias):
        self.inpDim = inpDim
        self.W = np.random.rand(1, self.inpDim) * 0.2 - 0.1
        self.deltaW = np.zeros(self.W.shape)
        self.W[0,0] = bias
        self.f = sigmoid
        self.f_deriv = sigmoid_deriv
        
    def forward(self, inp):
        self.inp = inp
        self.netInp = self.W @ inp
        self.y = self.f(self.netInp)
        return self.y
    
    def update(self):
        self.W += self.deltaW
        self.deltaW = np.zeros(self.W.shape)

In [46]:
class OutGate(Gate):
    def backward(self, error, learningRate):
        self.grad = self.f_deriv(self.netInp) * error
        self.deltaW += learningRate * (self.grad @ self.inp.T)

In [47]:
class InpGate(Gate):
    def backward(self, grad, learningRate):
        self.deltaW += learningRate * grad

In [48]:
class ForgetGate(Gate):
    def backward(self, grad, learningRate):
        self.deltaW += learningRate * grad

In [84]:
class LSTMCell:
    def __init__(self, inpDim):
        self.inpDim = inpDim
        
        # 1. Initialize cell specific parts
        
        self.state = np.array([[0.0]])
        self.W = np.random.rand(1, self.inpDim) * 0.2 - 0.1
        self.deltaW = np.zeros(self.W.shape)
        self.g = sigmoid
        self.g_deriv = sigmoid_deriv
        self.y = np.array([[0.0]])
        
        # 2. Initialize gates
        
        # Gate input is one larger than cell input, because of the peephole connections.
        gateDim = self.inpDim + 1
        # Biases for the gates were taken from the papers. They proved most successfull.
        inpBias = 0.0
        forgetBias = -2.0
        outBias = 2.0
        
        # now create the objects
        self.inpGate = InpGate(gateDim, inpBias)
        self.forgetGate = ForgetGate(gateDim, forgetBias)
        self.outGate = OutGate(gateDim, outBias)
        
        # 3. Initialize derivatives
        
        self.stateDerivWRTCellWeights = np.zeros(self.W.shape)
        self.stateDerivWRTInpGateWeights = np.zeros(self.inpGate.W.shape)
        self.stateDerivWRTForgetGateWeights = np.zeros(self.forgetGate.W.shape)
        
    def forward(self, inp):
        
        self.inp = inp
        self.netInp = self.W @ inp
        
        inpWPrevPeep = np.append(inp, self.state, axis = 0)
        self.inpGate.forward(inpWPrevPeep)
        self.forgetGate.forward(inpWPrevPeep)
        
        # update derivatives
        self.stateDerivWRTCellWeights *= self.forgetGate.y
        self.stateDerivWRTCellWeights += self.g_deriv(self.netInp) * self.inpGate.y * inp.T
        
        self.stateDerivWRTInpGateWeights *= self.forgetGate.y
        self.stateDerivWRTInpGateWeights += self.g(self.netInp) * self.inpGate.f_deriv(self.inpGate.netInp) * inpWPrevPeep.T
        
        self.stateDerivWRTForgetGateWeights *= self.forgetGate.y
        self.stateDerivWRTForgetGateWeights += self.state * self.forgetGate.f_deriv(self.forgetGate.netInp) * inpWPrevPeep.T
        
        
        self.state = self.forgetGate.y * self.state + self.inpGate.y * self.g(self.netInp)
        
        inpWPostPeep = np.append(inp, self.state, axis = 0)
        self.outGate.forward(inpWPostPeep)
        
        self.y = self.outGate.y * self.state
        
        return self.y
    
    def backward(self, error, learningRate):
        
        outGateError = self.state * error
        self.outGate.backward(outGateError, learningRate)
        
        internalError = self.outGate.y * error
        
        self.deltaW += learningRate * internalError * self.stateDerivWRTCellWeights
        
        inpGateError = internalError * self.stateDerivWRTInpGateWeights
        self.inpGate.backward(inpGateError, learningRate)
        
        forgetGateError = internalError * self.stateDerivWRTForgetGateWeights
        self.forgetGate.backward(forgetGateError, learningRate)
        
    def update(self):
        self.W += self.deltaW
        self.deltaW = np.zeros(self.W.shape)
        
        self.outGate.update()
        self.inpGate.update()
        self.forgetGate.update()
    
    def reset(self):
        self.state = np.array([[0.0]])
        self.stateDerivWRTCellWeights = np.zeros(self.W.shape)
        self.stateDerivWRTInpGateWeights = np.zeros(self.inpGate.W.shape)
        self.stateDerivWRTForgetGateWeights = np.zeros(self.forgetGate.W.shape)
        
        
        
        

In [85]:
class OutputLayer:
    def __init__(self, inpDim, outDim):
        self.inpDim = inpDim
        self.outDim = outDim
        
        self.W = np.random.rand(self.outDim, self.inpDim) * 0.2 - 0.1
        self.deltaW = np.zeros(self.W.shape)
        self.f = sigmoid
        self.f_deriv = sigmoid_deriv
        
    def forward(self, inp):
        self.inp = inp
        self.netInp = self.W @ inp
        self.y = self.f(self.netInp)
        return self.y
    
    def backward(self, target, learningRate):
        #self.error = np.square(self.y - target)
        self.error = self.y - target
        self.grad = self.f_deriv(self.netInp) * self.error
        self.deltaW += learningRate * (self.grad @ self.inp.T)
    
    def update(self):
        self.W += self.deltaW
        self.deltaW = np.zeros(self.W.shape)
        

In [86]:
class LSTMNetwork:
    def __init__(self, inpDim, outDim, n_cells):
        self.inpDim = inpDim
        self.outDim = outDim
        self.n_cells = n_cells
        
        self.biasUnit = np.array([[1]])
        self.inpAndHiddenDimWBias = self.inpDim + n_cells + 1
        self.n_cellsWBias = 1 + self.n_cells
        
        self.cells = [LSTMCell(self.inpAndHiddenDimWBias) for _ in range(self.n_cells)]
        self.outLayer = OutputLayer(self.n_cellsWBias, self.outDim)
        
    def forward(self, inp):
        inpAndHidden = np.append(inp, self.hiddenLayerState(), axis = 0)
        inpAndHiddenWBias = np.append(inpAndHidden, self.biasUnit, axis = 0)
        
        for cell in self.cells:
            cell.forward(inpAndHiddenWBias)
            
        stateWBias = np.append(self.hiddenLayerState(), self.biasUnit, axis = 0)
        
        self.outLayer.forward(stateWBias)
        
        return self.outLayer.y
    
    def hiddenLayerState(self):
        return np.array([[cell.y[0,0] for cell in self.cells]]).T
    
    def cellStates(self):
        return np.array([[cell.state[0,0] for cell in self.cells]]).T
        
    def backward(self, target, learningRate):
        self.outLayer.backward(target, learningRate)
        self.error = np.sum(np.square(self.outLayer.y - target))
        
        outErrors = self.outLayer.W.T @ self.outLayer.grad
        outErrorsWOBias = outErrors[:-1]
        
        for idx, cell in enumerate(self.cells):
            cell.backward(outErrorsWOBias[idx], learningRate)
        
    def update(self):
        self.outLayer.update()
        for cell in self.cells:
            cell.update()
    
    def resetCells(self):
        for cell in self.cells:
            cell.reset()
            
    def train(self, inpOutList, batchSize, learningRate):
        for sample_i, (inpSeq, outSeq) in enumerate(inpOutList):
            self.resetCells()
            for i in range(len(inpSeq)):
                inp = inpSeq[i]
                out = outSeq[i]
                self.forward(inp)
                self.backward(out, learningRate)
                if (i+1)%batchSize == 0 or i == len(inpSeq) - 1:
                    self.update()
            
                
            
            
        
        

In [87]:
n = LSTMNetwork(num_letters,num_letters,40)

In [88]:
inpOutList = inpOutSequenceGenerator(500,0,0,0,0)
n.train(inpOutList, batchSize = 1, learningRate = 0.1)

In [89]:
n.outLayer.y

array([[ 0.99991166],
       [ 0.9997975 ],
       [ 0.99984277],
       [ 0.99981824]])