# Imports 

In [1]:
import pandas as pd 
import numpy as np 
import itertools
import random
from random import  uniform
from copy import deepcopy

# Helpful functions

In [2]:
def cartesian(listoflists):
    """
    listoflists - list of lists of real numbers
    returns every combination that takes one element from each list,
    as a list of tuples (first list varies slowest)
    """
    combos = itertools.product(*listoflists)
    return [*combos]

def canonical(i,n):
    """
    i - integer index of the slot to set, between 0 and n-1
    n - positive integer, length of the vector
    returns a list of n zeros with +1 written into slot i
    """
    unit = [0] * n
    unit[i] = 1
    return unit

def anti_canonical(i,n):
    """
    i - integer index of the slot to set, between 0 and n-1
    n - positive integer, length of the vector
    returns a list of n zeros with -1 written into slot i
    """
    unit = [0] * n
    unit[i] = -1
    return unit

def canonical_list(n):
    """
    n - positive integer
    returns the n canonical vectors of length n, ordered by the slot
    that holds the +1
    """
    vectors = []
    for slot in range(n):
        vectors.append(canonical(slot, n))
    return vectors

def anti_canonical_list(n):
    """
    n - positive integer
    returns the n anti-canonical vectors of length n, ordered by the slot
    that holds the -1
    """
    vectors = []
    for slot in range(n):
        vectors.append(anti_canonical(slot, n))
    return vectors



# Diagnostics

In [3]:
def Qpath(g,start,Q,P,niter,output=False):
    """
    Follow the Q-table greedily from a starting point.

    g - grid object (its range_list, moves and limit_moves are used)
    start - starting point (any sequence of integer coordinates)
    Q - Q-Table; row = raveled index of a point, column = index of a move in g.moves
    P - grid function; only used to print the grid when output is True
    niter - maximum number of moves to perform
    output - if True, print every move together with a picture of the grid
             ('@' marks the current point, '*' the next one)

    At every step the move with the largest q-value at the current point is
    taken (clipped to the grid by g.limit_moves). The walk stops early when
    all q-values at the current point are negative.
    TODO: ADD a check for cycle to exit. (Until then a walk that reaches a
    fixed point or a cycle keeps going until niter moves were made.)

    returns (point reached, list of visited points, length of that list);
    the list holds the points the walk moved away from, plus the stopping
    point when the walk stopped early.
    """
    # Work with a tuple so that a list `start` is not used for fancy indexing
    # below and the returned path holds tuples only.
    point=tuple(start)
    shape=tuple(g.range_list)
    Path=[]
    for _ in range(niter):
        # Row of q-values of the current point, looked up once per step.
        row=Q[np.ravel_multi_index(tuple(point),shape)]
        QValue=np.max(row)
        if QValue>=0:
            direction=g.moves[np.argmax(row)]

            nextpoint=tuple(g.limit_moves(np.array([p + d for p, d in zip(point, direction)])))
            if output:
                print('From ',point,' to ',nextpoint)
                # np.array makes its own copy, so P itself is never modified.
                CMat=np.array(P, dtype=str)
                CMat[point]='@'
                CMat[nextpoint]='*'
                print(CMat)
            Path.append(point)
            point=nextpoint

        else:
            if output:
                print('At point ',point,' all the q-values are negative')
            Path.append(point)
            break

    return point, Path, len(Path)

def QTableTopology(grid_obj,P,QTable,gL,dim):
    """
    grid_obj - grid object
    P - objective function (we also call it grid function)
    QTable - Q-Table
    gL - grid length
    dim - dimension

    returns an array of shape (gL,)*dim describing the partition of the grid
    induced by the Q-table: every point visited by a greedy path (see Qpath)
    holds the value of P at the endpoint of that path. Points are processed
    in raveled order, and points already covered by an earlier path are not
    used as starting points again. Entries that are never assigned stay NaN.
    """
    shape=(gL,)*dim
    n_points=gL**dim
    # np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0.
    T=np.full(shape, np.nan)
    MarkedT=np.zeros(shape, dtype=int)

    for index in range(n_points):
        point=np.unravel_index(index, shape)
        if MarkedT[point]==0:
            end_point, Path, _ = Qpath(grid_obj,point,QTable,P,n_points)
            for state in Path:
                T[state]=P[end_point]
                MarkedT[state]=1

    return T

# Creating the grid class and the associated methods 

In [4]:
class grid:
    """
    Rectangular integer grid together with an epsilon-greedy Q-Learning
    procedure on it.

    range_list[k] is the number of points along axis k; a point is a tuple of
    integer coordinates with 0 <= point[k] < range_list[k].
    A q-table has one row per grid point (row index is
    np.ravel_multi_index(point, range_list)) and one column per movement
    vector (column index is the position of the vector in self.moves).
    """

    def __init__(self, range_list): # tested
        self.range_list=range_list
        self.moves=self.movement()
        self.dim=self.dimension()

    def dimension(self): # tested
        """
        returns the dimension of the grid
        """
        return len(self.range_list)

    def coordinates(self): # tested
        """
        returns the coordinates of the grid as tuples
        """
        return list(itertools.product(*(range(x) for x in self.range_list)))

    def movement(self): # tested
        """
        returns all the possible movement (unit) vectors allowable in the grid:
        first the vectors with +1 in slot 0, 1, ..., then the vectors with -1
        in slot 0, 1, ...
        """
        d=self.dimension()
        forward=[tuple(1 if k==slot else 0 for k in range(d)) for slot in range(d)]
        backward=[tuple(-1 if k==slot else 0 for k in range(d)) for slot in range(d)]
        return forward+backward

    def movement_size(self): # tested
        """
        returns the size of the set of movement vectors
        """
        return len(self.movement())

    def adjacent(self,point): # tested
        """
        point - tuple corresponding to a point in the grid
        returns points in the grid where the Learning Agent is allowed to travel
        in one step without wrapping around
        """
        candidates=[tuple(np.array(point)+np.array(v)) for v in self.movement()]
        # Keep only the candidates that lie inside the grid on every axis.
        return [q for q in candidates
                if all(0 <= c < size for c, size in zip(q, self.range_list))]

    def limit_moves(self, coord):
        """
        coord - coordinates of the point in the d-dimensional space (numpy array or list);
        it is used to ensure that the Learning Agent stays within the grid while making a move (step):
        if the Learning Agent is at point x on the boundary of the grid and chooses direction v
        towards the boundary that would make it cross it, then this method will prevent this from happening.
        Every coordinate is clipped to the range [0, range_list[i]-1].
        NOTE: coord is modified in place and the same object is returned.
        """
        for i in range(len(coord)):
            coord[i] = min(coord[i], self.range_list[i] - 1)
            coord[i] = max(coord[i], 0)
        return coord

    def adj(self,point):
        """
        point - sequence of coordinates corresponding to a point in the grid
        returns points in the grid where the Learning Agent is allowed to travel
        in one step without wrapping around excluding the point itself
        """
        # Compare tuples with tuples: a list (or array) `point` must not make
        # the "is it the point itself" test fail on the boundary.
        origin=tuple(point)
        adj_lst=[]
        for v in self.movement():
            q=tuple(self.limit_moves(np.array(origin)+np.array(v)))
            if q != origin:
                adj_lst.append(q)
        return adj_lst

    def Q_update(self,Q,P,g,start,end,t): # tested
        """
        Q - q-table
        P - function that maps the grid to real numbers, indexable by a point
        g - discount rate
        start - starting point
        end - ending point; end - start has to be one of the movement vectors
        t - learning rate
        The reward of the step is P[end] - P[start].
        Q is modified in place and also returned.
        raises ValueError if end - start is not a movement vector
        """
        shape=tuple(self.range_list)
        start_0=np.ravel_multi_index(tuple(start),shape)
        end_0=np.ravel_multi_index(tuple(end),shape)
        step=tuple(np.array(end)-np.array(start))
        a=self.move_index(step)
        if a is None:
            # Indexing with None would fail on a list and would silently
            # overwrite the whole row of a numpy q-table.
            raise ValueError('%s -> %s is not a single grid move' % (tuple(start), tuple(end)))
        best_dir=max(Q[end_0])
        reward=P[tuple(end)]-P[tuple(start)]
        Q[start_0][a]=(1-t)*Q[start_0][a]+t*(reward+g*best_dir)
        return Q

    def move_index(self,v):
        """
        v - movement vector from self.moves,
        returns the index of this vector in self.moves;
        this index is column of the q-table corresponding to vector v.
        returns None if v is not a movement vector
        """
        try:
            return self.moves.index(tuple(v))
        except (ValueError, TypeError):
            return None

    def next_point(self,e,point,Q):
        """
        e - epsilon (small positive real number) that determines whether we explore
        or exploit
        point - tuple that represents a point in the grid
        Q - q-table
        With probability e returns a random neighbour of point (exploration);
        otherwise returns the point reached by one of the moves with the largest
        q-value at point, chosen at random among ties (exploitation). In the
        second case a move into the boundary is clipped, so the result can be
        the point itself.
        """
        if uniform(0,1)<e:
            return random.choice(self.adj(point))
        row=Q[np.ravel_multi_index(tuple(point),tuple(self.range_list))]
        n_moves=len(self.moves)
        big=max(row[i] for i in range(n_moves))
        there=[tuple(self.limit_moves(np.array(self.moves[i])+np.array(point)))
               for i in range(n_moves) if row[i]==big]
        return random.choice(there)

    def qprocess(self,start,Q,P,g,t,e,N):
        """
        Method implementing a single run of the epsilon-greedy Q-Learning algorithm;
        e - epsilon (small positive real number between zero and one) that determines whether we explore
        or exploit
        start - tuple that represents the starting point of the algorithm in the grid
        P - function that maps the grid to real numbers, indexable by a point
        g - discount rate
        t - learning rate
        Q - q-table
        N - maximum number of steps
        returns the updated q-table (Q is also modified in place)
        NOTE: a proposed step that does not leave the current point is drawn
        again; with e equal to zero this re-drawing may never terminate.
        """
        # Points produced by next_point are tuples; make the start one as well
        # so the "did we move" comparison below is meaningful.
        point=tuple(start)
        for _ in range(N):
            nextpoint=self.next_point(e,point,Q)
            while point==nextpoint:
                nextpoint=self.next_point(e,point,Q)

            Q=self.Q_update(Q,P,g,point,nextpoint,t)
            point=nextpoint
        return Q

    def qlearning(self,start,Q,P,g,t,e,N,num_epochs):
        """
        Method implementing the entire epsilon-greedy Q-Learning algorithm;
        e - epsilon (small positive real number between zero and one) that determines whether we explore
        or exploit
        start - tuple that represents the starting point of the algorithm in the grid
        P - function that maps the grid to real numbers, indexable by a point
        g - discount rate
        t - learning rate
        Q - q-table
        N - maximum number of steps in one epoch
        num_epochs - number of epochs; every epoch is one qprocess run that
        starts again from start and continues with the q-table of the previous epoch
        returns the updated q-table
        """
        for _ in range(num_epochs):
            Q = self.qprocess(start,Q,P,g,t,e,N)

        return Q
        
                             

# Test functions

# Notation: 
Q - the q-table;
P - function defined on the grid, mapping it to real numbers (given in the form of an array of these numbers);
e - epsilon, real number between zero and one that determines whether we explore or exploit; 
start - tuple that represents the starting point of the algorithm in the grid;
g - discount rate; 
t - learning rate; 
N - maximum number of steps.

# User Instructions 

First, we need to create an instance of the class grid. Initially it requires only the dimension parameters of the grid. 

## grid_length
is the number of points on the side of the grid.
## dim 
is the dimension of the Euclidean space containing grid.

Therefore, if we denote as $n$ - the number of points on the grid, $\lambda$ - grid length, $\mu$ - dimension of the space, we will have $n=\lambda^{\mu} $.



# The following cell shows how to initialize a grid class instance.

In [5]:
grid_length=5
dim=2
grid1=grid([grid_length,grid_length])

In [6]:
grid1.limit_moves([1,9])

[1, 4]

In [7]:
grid1.adj([1,1])

[(2, 1), (1, 2), (0, 1), (1, 0)]

# Methods.
Immediately after we have created a grid object we can use its methods. Let's start with the methods that don't require any additional parameters.
## grid.coordinates() 
returns the list of all points that belong to the grid.
## grid.movement()
returns all the movement vectors.

In [8]:
grid1.coordinates()

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4)]

In [9]:
grid1.movement()

[(1, 0), (0, 1), (-1, 0), (0, -1)]

For a given point on the grid, the methods
## grid.adjacent(point) and grid.adj(point)
are very similar: they both return the list of all points on the grid adjacent to it, i.e. all the points that are obtained by making a single move (adding a move vector to the point). 

Examples below will demonstrate the use of it.

In [10]:
point1=tuple([1,1]) #point in the interior of the grid

In [11]:
grid1.adjacent(point1)

[(2, 1), (1, 2), (0, 1), (1, 0)]

In [12]:
point2=tuple([0,4]) # boundary point

In [13]:
grid1.adjacent(point2)

[(1, 4), (0, 3)]

# Q-Learning

## In class grid we implemented an $\varepsilon$-greedy algorithm for building the q-table.
Below are the meta-parameters of the algorithm.

t - Step length used to update the estimation of q-table values;

e - $\varepsilon$ the probability of algorithm to take a random move during the learning process;

g - Discounting Factor for Future Rewards,

N - number of steps during a single epoch;

nEpochs - number of epochs,

P - function that maps the grid to real numbers.

The algorithm starts with q-table consisting exclusively of zeroes.

# Example 4

P as a matrix of the following form.

\begin{equation}
P=\left[
\begin{matrix}
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 10 & 1 & 1 & 1& 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 5 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1\\
1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1 & 1
\end{matrix}
\right]
\end{equation}

# Two-dimension grid 5 x 5 with two exceptional values

In [14]:
#Initialization of parameters and objects.
dim=2  # number of grid axes
grid_length=5  # number of points along every axis
# Q-table of zeros: one row per grid point (grid_length**dim rows),
# one column per movement vector (2*dim columns).
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
# Grid function: 1 everywhere ...
P=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
# ... except for the two exceptional values set below.
P[2,2]=2
P[4,4]=10
start=(0,0)  # starting point of the learning runs
end=(1,0)  # one step away from start; used by the Q_update call in the next cell
grid4=grid([grid_length,grid_length])

In [15]:
grid4.Q_update(Q,P,1,start,end,1)

[[0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0]]

In [16]:
#Learning process
g=.2  # discount rate
t=0.8  # learning rate
e=0.2  # epsilon: probability of taking a random (exploratory) move
N=2000  # number of steps in one epoch
nEpochs=5  # number of epochs
# Argument order: start, Q, P, discount rate, learning rate, epsilon, steps, epochs.
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [17]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,10)

((2, 2), [(0, 0), (1, 0), (2, 0), (2, 1), (2, 2)], 5)

# QTopology function example

In [16]:
QTableTopology(grid4,P,QTable4,grid_length,dim)

array([[ 2.,  2.,  2.,  2., 10.],
       [ 2.,  2.,  2.,  2., 10.],
       [ 2.,  2.,  2.,  2., 10.],
       [ 1.,  2.,  2., 10., 10.],
       [ 1., 10., 10., 10., 10.]])

# Two-dimensional grid 5 x 5 (increasing the reward for higher Q-values) with two exceptional values

In [17]:
#Initialization of parameters and objects.
dim=2
grid_length=5
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
P=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
P[2,2]=2
P[4,4]=10
start=(0,0)
grid4=grid([grid_length,grid_length])

In [18]:
#Learning process
g=1
t=0.8
e=0.2
N=2000
nEpochs=5
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [19]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,10)

((4, 4),
 [(0, 0),
  (0, 1),
  (1, 1),
  (1, 2),
  (1, 3),
  (2, 3),
  (3, 3),
  (4, 3),
  (4, 4),
  (4, 4)],
 10)

In [20]:
QTable4

[[0.0, 8.973382938975444, 0, 0],
 [8.999999941656323, 7.044220084341477, 0, 7.094115152760924],
 [8.64, 0.0, 0, 8.999969169755943],
 [7.2, 9.0, 0, 8.999099576864722],
 [9.0, 0, 0, 8.98559999997523],
 [0.0, 8.999999710419903, 0.0, 0],
 [0.7603200000000001, 8.999999999999998, 7.960479754329958, 8.294194942782127],
 [7.389056, 9.0, 8.68957789931224, 7.320995839996059],
 [9.0, 9.0, 9.0, 8.999999999999886],
 [8.99997696, 0, 8.99999759890806, 9.0],
 [6.954378659660794, 0.0, 8.999937717179163, 0],
 [0, 8.999995237662706, 7.199999938436067, 8.661399466553547],
 [7.999995493220352, 8.0, 7.999999993166709, 7.999967589800267],
 [9.0, 9.0, 9.0, 9.0],
 [9.0, 0, 9.0, 9.0],
 [0.0, 0.0, 8.991650467109103, 0],
 [8.927999999999999, 8.999999999988203, 8.62807510741722, 4.565825435160262],
 [9.0, 9.0, 8.999999999999527, 8.999999994731692],
 [9.0, 9.0, 9.0, 9.0],
 [9.0, 0, 9.0, 9.0],
 [0, 0, 8.845669736728052, 0],
 [0, 9.0, 8.9998792851456, 7.761902934064686],
 [0, 9.0, 9.0, 9.0],
 [0, 9.0, 9.0, 9.0],
 [0,

In [21]:
QTableTopology(grid4,P,QTable4,grid_length,dim)

array([[10., 10., 10., 10., 10.],
       [10., 10., 10., 10., 10.],
       [10., 10., 10., 10., 10.],
       [10., 10., 10., 10., 10.],
       [10., 10., 10., 10., 10.]])

# Three-dimensional grid with one exceptional value

In [21]:
#Initialization of parameters and objects.
dim=3
grid_length=5
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
P=np.array([1 for i in range(grid_length**3)]).reshape(grid_length,grid_length,grid_length)
P[2,2,1]=10
#P[4,4]=10
start=(0,0,0)
grid4=grid([grid_length,grid_length,grid_length])

In [22]:
#Learning process
g=.2
t=0.8
e=0.8
N=10000
nEpochs=15
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [23]:
#Diagnostics path

start_point=(0,0,0)
Qpath(grid4,start_point,QTable4,P,10)

((2, 2, 1),
 [(0, 0, 0), (1, 0, 0), (2, 0, 0), (2, 1, 0), (2, 2, 0), (2, 2, 1)],
 6)

# Larger grid (10 x 10) with two large exceptional values 

In [18]:
#Initialization of parameters and objects.
dim=2
grid_length=10
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
P=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
P[2,2]=2
P[4,4]=10
start=(0,0)
grid4=grid([grid_length,grid_length])

In [19]:
#Learning process
g=.4
t=0.8
e=0.5
N=20000
nEpochs=10
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [26]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,10)

((2, 2), [(0, 0), (1, 0), (2, 0), (2, 1), (2, 2)], 5)

# Larger two-dimensional grid (10 x 10) two exceptional values with one "not very exceptional"

In [27]:
#Initialization of parameters and objects.
dim=2
grid_length=10
# Q-table of zeros: one row per grid point, one column per movement vector.
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
# P has to be a float array: in an integer array the assignment P[2,2]=1.1
# is truncated to 1 and the "not very exceptional" value would be lost.
P=np.ones((grid_length,grid_length), dtype=float)
P[2,2]=1.1
P[4,4]=10
start=(0,0)
grid4=grid([grid_length,grid_length])

In [28]:
#Learning process
g=.4
t=0.8
e=0.5
N=20000
nEpochs=10
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [None]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,10)

# Two dimensions with noise, 5 x 5 with two exceptional values and relatively low reward 

In [None]:
#Initialization of parameters and objects.
dim=2
grid_length=5
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
U=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
noise = np.random.normal(0, .01, U.shape)
P=U+noise
P[2,2]=2
P[4,4]=4
start=(0,0)
grid4=grid([grid_length,grid_length])

In [None]:
#Learning process
g=.2
t=0.8
e=0.2
N=2000
nEpochs=5
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [None]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,10)

# Two dimensions with noise, 5 x 5 with two exceptional values and higher reward

In [None]:
#Initialization of parameters and objects.
dim=2
grid_length=5
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
U=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
noise = np.random.normal(0, .01, U.shape)
P=U+noise
P[2,2]=2
P[4,4]=3
start=(0,0)
grid4=grid([grid_length,grid_length])

In [None]:
#Learning process
g=.8
t=0.1
e=0.8
N=2000
nEpochs=5
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [None]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,30)

# Extreme Experiments

In [30]:
#Initialization of parameters and objects.
dim=2  # number of grid axes
grid_length=10  # number of points along every axis
# Q-table of zeros: one row per grid point, one column per movement vector.
Q=[[0 for i in range(2*dim)] for i in range(grid_length**dim)]
# Flat grid function (1 everywhere) ...
U=np.array([1 for i in range(grid_length**2)]).reshape(grid_length,grid_length)
# ... perturbed by Gaussian noise with mean 0 and standard deviation 0.05.
noise = np.random.normal(0, .05, U.shape)
P=U+noise
# Three exceptional values of increasing size, set after the noise was added
# so they are exact.
P[2,2]=2
P[8,8]=10
P[4,4]=6

start=(0,0)  # starting point of the learning runs
grid4=grid([grid_length,grid_length])

In [31]:
#Learning process
g=0.8
t=0.8
e=0.8
N=10000
nEpochs=10
QTable4=grid4.qlearning(start,Q,P,g,t,e,N,nEpochs)

In [32]:
#Diagnostics path

start_point=(0,0)
Qpath(grid4,start_point,QTable4,P,50)
#QTableTopology(grid4,P,QTable4,grid_length,dim)

((4, 4),
 [(0, 0), (1, 0), (2, 0), (2, 1), (2, 2), (3, 2), (4, 2), (4, 3), (4, 4)],
 9)