# TD Learning Using Neural Networks

In this notebook, we will perform TD Learning on a one-dimensional maze, much as in the previous TD Learning notebook. However, we will no longer use a table; instead, we will replace the table with a neural network. Since neural networks can approximate functions, we will use one to approximate the table that would otherwise be required. With small, simple mazes, the benefits may not be immediately apparent. However, in more complex scenarios, this can save a great deal of space. There is also the added benefit of not having to manage complex tables.

We will be using _Holographic Reduced Representations_ (_HRRs_) to encode information for the neural network. HRRs can represent the states of the maze, as well as things such as colored signals, and allow them to be combined in meaningful ways. The following class, written by Dr. Phillips, produces these _HRRs_:

##### HRR Class

In [16]:
# %load hrr.py
#!/usr/bin/env python

import numpy
import shelve
import os
import glob
import gc
import itertools

def hrr(length,normalized=False):
    """Creates Holographic Reduced Representations.

    length     -- number of elements in the returned vector
    normalized -- when True, build the vector from a spectrum whose
                  coefficients all have magnitude 1 (random phases);
                  when False, draw each element from a normal
                  distribution with standard deviation 1/sqrt(length)

    Returns a 1-D real array with `length` elements.
    """
    if not normalized:
        return numpy.random.normal(0.0, 1.0 / numpy.sqrt(length), length)

    # Random phases for the positive frequencies; the negative frequencies
    # mirror them (conjugated and reversed) so the inverse FFT is real.
    phases = numpy.random.uniform(-numpy.pi, numpy.pi, int((length - 1) / 2))
    positive = numpy.exp(1j * phases)
    negative = numpy.exp(-1j * phases[::-1])

    if length % 2:
        parts = [numpy.ones(1), positive, negative]
    else:
        # Even lengths carry one extra unpaired coefficient, fixed at 1.
        parts = [numpy.ones(1), positive, numpy.ones(1), negative]

    return numpy.real(numpy.fft.ifft(numpy.concatenate(parts)))

def hrri(length):
    """Returns the identity vector for N-length HRRs.

    The result is a float array whose first element is 1 and whose
    remaining `length - 1` elements are 0.
    """
    tail = numpy.zeros(length - 1)
    return numpy.hstack((numpy.ones(1), tail))

def hrrs(length,n=1,normalized=False):
    """Creates a matrix of n HRRs, one per row.

    length     -- number of elements in each HRR
    n          -- number of HRRs (rows) to generate
    normalized -- forwarded unchanged to hrr() for every row

    Returns an array of shape (n, length). Asking for zero (or fewer)
    rows yields an empty (0, length) matrix instead of an error.
    """
    if n <= 0:
        # vstack refuses an empty sequence, so build the empty matrix directly.
        return numpy.empty((0, length))
    # numpy.vstack replaces the deprecated numpy.row_stack alias.
    return numpy.vstack([hrr(length, normalized) for _ in range(n)])

def inv(x):
    """Computes the -exact- inverse of an HRR.

    Every Fourier coefficient of x is replaced by one with the reciprocal
    magnitude and the negated phase, then transformed back; the real part
    of the result is returned.
    """
    spectrum = numpy.fft.fft(x)
    reciprocal_magnitude = 1.0 / numpy.abs(spectrum)
    negated_phase = numpy.exp(-1j * numpy.angle(spectrum))
    return numpy.real(numpy.fft.ifft(reciprocal_magnitude * negated_phase))

def pinv(x):
    """Computes the pseudo-inverse of an HRR.

    The Fourier transform of x is complex-conjugated and transformed
    back; the real part of the result is returned.
    """
    spectrum = numpy.fft.fft(x)
    conjugated = numpy.conj(spectrum)
    return numpy.real(numpy.fft.ifft(conjugated))

def convolve(x,y):
    """Computes the circular convolution of two HRRs.

    Both vectors are taken to the Fourier domain, multiplied element by
    element, and transformed back; the real part is returned.
    """
    x_spectrum = numpy.fft.fft(x)
    y_spectrum = numpy.fft.fft(y)
    product = x_spectrum * y_spectrum
    return numpy.real(numpy.fft.ifft(product))

def mconvolve(x):
    """Computes the circular convolution of a matrix of HRRs.

    x -- 2-D array-like with one HRR per row

    Returns a single vector: the circular convolution of all rows of x.
    """
    # Transform every row at once, then multiply the spectra column-wise;
    # the axis arguments replace the slower per-slice apply_along_axis calls.
    spectra = numpy.fft.fft(x, axis=1)
    combined = numpy.prod(spectra, axis=0)
    return numpy.real(numpy.fft.ifft(combined))
    
def oconvolve(x,y):
    """Computes the convolution of all pairs of HRRs in the x and y matrices.

    x, y -- 2-D arrays with one HRR per row and the same row length

    Returns a matrix with one row per (y-row, x-row) pair. Rows are ordered
    with the y index varying slowest: all x rows convolved with y[0] come
    first, then all x rows convolved with y[1], and so on.
    """
    # Transform each matrix once instead of re-running the FFT for every pair.
    x_spectra = numpy.fft.fft(numpy.asarray(x), axis=1)
    y_spectra = numpy.fft.fft(numpy.asarray(y), axis=1)
    # Broadcasting yields an array indexed by (y row, x row, frequency).
    pair_spectra = y_spectra[:, numpy.newaxis, :] * x_spectra[numpy.newaxis, :, :]
    pairs = numpy.real(numpy.fft.ifft(pair_spectra, axis=-1))
    return pairs.reshape(y_spectra.shape[0] * x_spectra.shape[0], x_spectra.shape[1])

def correlate(x,y,invf=pinv):
    """Computes the correlation of the two provided HRRs.

    x, y -- the HRRs to correlate
    invf -- function used to invert y before convolving (pinv by default)

    Returns convolve(x, invf(y)).
    """
    inverted = invf(y)
    return convolve(x, inverted)

def compose(x,y):
    """Composes two HRRs using addition in angle space. (in-progress)

    As currently written, the Fourier transforms of x and y are averaged
    and the real part of the inverse transform is returned.
    """
    x_spectrum = numpy.fft.fft(x)
    y_spectrum = numpy.fft.fft(y)
    averaged = (x_spectrum + y_spectrum) / 2.0
    return numpy.real(numpy.fft.ifft(averaged))

def mcompose(x):
    """Composes a matrix of HRRs, one per row, using addition in angle space. (in-progress)

    x -- 2-D array-like with one HRR per row

    As currently written, the row-wise Fourier transforms are averaged
    across rows and the real part of the inverse transform is returned.
    """
    # The axis arguments replace the slower per-slice apply_along_axis calls.
    spectra = numpy.fft.fft(x, axis=1)
    averaged = numpy.mean(spectra, axis=0)
    return numpy.real(numpy.fft.ifft(averaged))

def decompose(x,y):
    """Decomposes two HRRs using addition in angle space. (in-progress)

    Returns compose(x, -y), i.e. x composed with the negation of y.
    """
    negated = -y
    return compose(x, negated)

def pow(x,k):
    """Compute the convolutive power of an HRR.

    x -- the HRR
    k -- the exponent

    The spectrum of x keeps its magnitudes; only its phases are replaced
    by the phases of the spectrum raised to the k-th power.
    """
    ## An earlier method raised the whole spectrum to the k-th power,
    ## which only worked for unitary vectors because magnitudes could
    ## not be kept in check for non-unitary vectors.
    ## This version allows non-unitary vectors to have convolutive
    ## powers, but there is still an inversion problem since x and
    ## sqrt(x^2) are not the same vector.
    spectrum = numpy.fft.fft(x)
    magnitudes = numpy.abs(spectrum)
    powered_phases = numpy.angle(numpy.power(spectrum, k))
    return numpy.real(numpy.fft.ifft(magnitudes * numpy.exp(1j * powered_phases)))

class LTM:
    """Long-term Memory.

    A shelve-backed store that maps symbol strings to HRRs. In a query
    string, '*' joins symbols that are bound by circular convolution and
    '+' joins terms that are summed. The empty string maps to the
    identity HRR.
    """
    store = None
    ## Note - this is linux-specific
    tmpdir = "/dev/shm/"
    ## Might want to choose something else in the future...

    ## Default HRR size
    N = 1024
    normalized = False

    def __init__(self,N=1024,normalized=False):
        """Open a shelf for this instance and seed it with the identity HRR.

        N          -- length of every HRR kept in this memory
        normalized -- passed to hrr() whenever a new symbol is created
        """
        self.store = shelve.open(self._shelf_path())
        self.N = N
        self.normalized = normalized
        self.store[""] = hrri(self.N)

    def _shelf_path(self):
        "Base filename of this instance's shelf (process id + object id)."
        return self.tmpdir + "hrr_ltm_" + str(os.getpid()) + "_" + str(id(self))

    def __del__(self):
        "Close the shelf and delete the files it created."
        if self.store is None:
            return
        self.store.close()
        path = self._shelf_path()
        # shelve may or may not append an extension to the base filename,
        # depending on the dbm backend, so check the bare path as well.
        for f in [path] + glob.glob(path + ".*"):
            if os.path.isfile(f):
                os.remove(f)

    def lookup(self,q):
        "Lookup a single symbol, encode it if necessary"
        if q in self.store:
            return self.store[q]
        # Unknown symbol: create its HRR, remember it, and hand it back
        # without reading it from the shelf again.
        vector = hrr(self.N,self.normalized)
        self.store[q] = vector
        return vector

    def encode(self,q):
        """Encode an HRR.

        q -- query string such as "a*b+c"; anything that is not a str
             yields None

        Each '+'-separated term is represented by the convolution of its
        '*'-separated symbols, stored under the key formed by joining the
        sorted symbols with '*'. Missing symbols and every missing
        sub-combination of two or more symbols are created and stored on
        the way. The term vectors are summed and the sum is divided by
        the square root of the number of terms.
        """
        if not isinstance(q,str):
            return None
        terms = q.split('+')
        rep = numpy.zeros(self.N)
        for term in terms:
            symbols = sorted(term.split('*'))
            term_key = '*'.join(symbols)
            if term_key not in self.store:
                ## Base concepts
                for symbol in symbols:
                    self.lookup(symbol)
                ## Combinatorix
                for size in range(2,len(symbols)+1):
                    for combination in itertools.combinations(symbols,size):
                        # Separate name so the term's own key is never overwritten.
                        combo_key = '*'.join(sorted(combination))
                        if combo_key not in self.store:
                            subrep = hrri(self.N)
                            for symbol in combination:
                                subrep = convolve(subrep,self.lookup(symbol))
                            self.store[combo_key] = subrep
            rep += self.store[term_key]
        rep /= numpy.sqrt(len(terms))
        return rep

    def decode(self,q):
        """Find closest match currently in memory.

        q -- an HRR vector; a str argument yields None

        Returns the key whose stored vector has the largest dot product
        with q, or None when nothing scores above negative infinity.
        """
        if isinstance(q,str):
            return None
        match = None
        best = -numpy.inf
        for key in self.store.keys():
            # Read each stored vector once per key.
            score = numpy.dot(q,self.store[key])
            if score > best:
                best = score
                match = key
        return match

    def unpack(self,q):
        """Unpack all possible symbols and subsymbols.

        q -- query string; anything that is not a str yields None

        The query is lower-cased (encode() itself does not do this), split
        on '+', and for every term each non-empty combination of its
        symbols is encoded. Returns a dict from combination key to HRR.
        """
        if not isinstance(q,str):
            return None
        reps = dict()
        for term in q.lower().split('+'):
            symbols = sorted(term.split('*'))
            for size in range(1,len(symbols)+1):
                for combination in itertools.combinations(symbols,size):
                    key = '*'.join(sorted(combination))
                    reps[key] = self.encode(key)
        return reps

    def print(self):
        "Print this object followed by every stored key and its vector."
        print(self)
        for key in self.store.keys():
            print(key,self.store[key])
                

##### Maze Class

In [75]:
import random
from fractions import Fraction
import numpy as np
import keras


LENGTH = 6  # Try me out -- number of states (cells) in the maze
HRRLEN = 64  # length of each state's HRR; also the model's input dimension
GAMMA = .5  # discount applied to the next state's value
GOAL = 0  # index of the goal state, the only state whose reward is set to 1
EPOCH = 300  # number of training episodes run by the driver loop

class Maze:
    """One-dimensional, wrap-around maze whose state values are learned by a model.

    Each of the LENGTH states is represented by an HRR of length HRRLEN.
    A single dense unit with no activation maps a state's HRR to its
    estimated value and is trained one sample at a time.
    """

    def __init__(self):
        "Create the reward vector, the state HRRs and the value model."
        # Initialize reward vector: only the goal state is rewarded
        self._reward = np.zeros(LENGTH)  # Reward Vector
        self._reward[GOAL] = 1

        # Initialize holographic reduced representations, one row per state
        self.states = hrrs(HRRLEN, LENGTH, normalized=True)

        # Initialize model
        self.model = keras.Sequential()
        self.model.add(keras.layers.Dense(1,
                                          activation=None,
                                          input_dim=HRRLEN))
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
        self.model.compile(loss=keras.losses.mse,
                           optimizer=keras.optimizers.SGD(learning_rate=0.1))

    def _predict_values(self):
        "Model output for every state, one row per state (progress output silenced)."
        return self.model.predict(self.states, verbose=0)

    def _neighbors(self, s):
        "Indices of the states to the left and right of s, wrapping at both ends."
        return (s - 1) % LENGTH, (s + 1) % LENGTH

    def _train(self, s, target):
        "Run one single-sample training step moving the value of state s toward target."
        self.model.fit(np.array([self.states[s]]),
                       np.array([[target]]),  # shaped like the model output
                       batch_size=1,
                       epochs=1,
                       verbose=0)

    # Display the maze with current values inside
    def display(self):
        "Print the predicted value of every state in a one-row grid with indices below."
        # obtain values for all states from model
        values = self._predict_values()
        border = " --------" * LENGTH

        # top of maze
        print(border)

        # print each predicted value
        cells = ["{:^8.4f}|".format(np.round(values[i], 4)[0]) for i in range(LENGTH)]
        print("|" + "".join(cells))

        # bottom of maze
        print(border)
        for i in range(LENGTH):
            print("   ", i, "   ", end='')
        print()

    # Drop agent into maze and search for goal
    def episode(self, s):
        """Run one episode starting from state s.

        The model is updated for the current state, then the agent moves
        to the neighbor chosen by nextS(); this repeats until the goal is
        reached, where one final update is made.
        """
        # Search and update value until reaches goal
        while s != GOAL:
            # calculate delta and update model
            self.delta(s)
            # next state, chosen from the freshly updated predictions
            s = self.nextS(s, self._predict_values())

        # calculate delta for goal
        self.delta(s)

    # model updated within function
    # delta(s) = (r(s) + gamma v(s + 1)) - v(s)
    def delta(self, s):
        """Train the model on state s.

        The goal state is trained toward its reward. Any other state is
        trained toward GAMMA times the predicted value of the state that
        nextS() selects.
        """
        if s == GOAL:
            target = self._reward[GOAL]
        else:
            values = self._predict_values()
            target = GAMMA * values[self.nextS(s, values)][0]
        self._train(s, target)

    # Obtain the value of a state
    # v(s) = r(s) + GAMMA * v(s+1)
    def v(self, s, values):
        """Return r(s) + GAMMA * (value of the better neighbor of s).

        s      -- state index
        values -- per-state predictions, indexed as values[state][0]

        The better neighbor is the one with the larger entry in `values`;
        a tie goes to the left neighbor.
        """
        left, right = self._neighbors(s)
        best = left if values[left][0] >= values[right][0] else right
        return self._reward[s] + GAMMA * values[best][0]

    # Obtain the next state to be moved to (s + 1)
    def nextS(self, s, values):
        """Return the neighbor of s with the larger v(); a tie goes left.

        s      -- current state index
        values -- per-state predictions, indexed as values[state][0]
        """
        left, right = self._neighbors(s)
        if self.v(left, values) >= self.v(right, values):
            return left
        return right

    # Obtain the matrix of state values
    def get_values(self):
        "Return the predicted value of every state as a flat array of LENGTH entries."
        return np.reshape(self._predict_values(), (LENGTH))

In [76]:
# Build a fresh maze and show the model's value estimates before any training.
maze = Maze()
print("Maze before training:")
maze.display()

# Run EPOCH episodes, each starting from a random state index in [0, LENGTH).
for i in range(EPOCH):
    s = np.random.randint(0, LENGTH)
    maze.episode(s)

# Show the value estimates after training.
print("Maze after training:")
maze.display()

Maze before training:
 -------- -------- -------- -------- -------- --------
|-0.0289 | 0.0958 | 0.1883 |-0.0774 | 0.1379 |-0.1905 |
 -------- -------- -------- -------- -------- --------
    0        1        2        3        4        5    
Maze after training:
 -------- -------- -------- -------- -------- --------
| 1.0000 | 0.5000 | 0.2500 | 0.1250 | 0.2500 | 0.5000 |
 -------- -------- -------- -------- -------- --------
    0        1        2        3        4        5    
