In [1]:
from   matplotlib   import pyplot as plt
from   mpl_toolkits import mplot3d

import numpy as np

%matplotlib notebook

In [75]:
class Grids():
    
    def get_grid(self, name):
        try:
            return getattr(self, name)()
        except:
            raise Exception(f'"{name}" not recognised!')
    
    def ten_by_ten(self):
        return np.ones((10, 10))
    
    def four_rooms(self):
        grid = np.ones((11, 11))
        grid[:,5] = 0
        grid[5,:5] = 0
        grid[6,6:] = 0
        grid[[2,5,6,9],[5,1,8,5]] = 1
        return grid
    
    def i_maze(self):
        grid = np.ones((3, 15))
        grid[[0,2],1:-1] = 0
        return grid
    
    def three_rooms(self):
        grid            = np.ones((11, 11))
        grid[:,5]       = 0
        grid[[2, 8], 5] = 1
        grid[5,5:]      = 0
        return grid
    
class GridWorld(Grids):
    """
    GridWorld class for Laplacian computations
    
    ==============
      Parameters
    ==============
    
        grid      : numpy.ndarray
                    an array of values in the set {0, 1}.
                        - 0 indicates a wall
                        - 1 indicates available movement
        
        diffusion : None, str
                    None          - use the original   laplacian definition
                    "normalised"  - use the normalised laplacian definition
                    "random_walk" - use the normalised random-walk laplacian definition
                    
                    See https://en.wikipedia.org/wiki/Laplacian_matrix#Definition for more details
                    
        pvf_func  : str
                    "eigen"       - eigenvectors
                    "svd"         - left singular vectors
                    "ica"         - independent component vectors
                    
                    
    ==============
        Methods
    ==============
    
        render    : visualises the gridworld using matplotlib.pyplot.imshow
        
        
    ==============
      Attributes
    ==============
    
        A         : adjacency matrix
        D         : diagonal  matrix which is the row sum of the adjacency matrix
        L         : laplacian matrix
        grid      : numpy.array of the gridworld
        encode    : dictionary translating between adjacency nodes and gridworld coordinates
        
        
    ==============
    Related Papers
    ==============
    
        https://arxiv.org/pdf/1703.00956.pdf - A Laplacian Framework for Option Discovery in Reinforcement Learning
        https://arxiv.org/pdf/1810.04586.pdf - The Laplacian in RL: Learning Representations with Efficient Approximations
    """
    def __init__(self, grid, diffusion = None, pvf_func = 'eigen'):
        if isinstance(grid, str):
            self.name         = grid
            grid              = self.get_grid(grid)
        self._grid            = np.zeros([i + 2 for i in grid.shape])
        self._grid[1:-1,1:-1] = grid
        self.diffusion        = diffusion
        
        for variable in 'ADLev':
            setattr(self, f'_{variable}', None)             
        
        self.pvf              = getattr(self, f'_{pvf_func}')
        
    def render(self, i = None):
        """ renders the gridworld """
        if i is None:
            self._render()
        else:
            self._render_with_pvf(i)
            
    @property
    def grid(self):
        return self._grid
    
    @property
    def A(self):
        """ adjacency matrix """
        if isinstance(self._A, np.ndarray):
            return self._A
        states  = zip(*np.where(self._grid))
        encode  = self._encode = dict(enumerate(states))
        lookup  = self._lookup = {v : k for k, v in encode.items()}
        self._A = np.zeros((len(encode),) * 2)
        for i, state in encode.items():
            for adj in self.find_adjacent(state):
                j = lookup[adj]
                self._A[i, j] = 1
        return self._A

    @property
    def D(self):
        """ diagonal matrix which is the row sum of the adjacency matrix """
        if isinstance(self._D, np.ndarray):
            return self._D
        self._D = np.diag(self.A.sum(axis = 1))
        return self._D
    
    @property
    def L(self):
        """ laplacian matrix """
        if isinstance(self._L, np.ndarray):
            return self._L
        if self.diffusion is None:
            self._L = self.D - self.A
            return self._L
        elif self.diffusion in ['normalised', 'normalized']:
            rD = np.diag(1 / np.diag(np.sqrt(self.D)))
            self._L = np.eye(len(self.A)) - rD @ self.A @ rD
            return self._L
        elif self.diffusion == 'random_walk':
            self._L = np.eye(len(self.A)) - np.diag(1 / np.diag(self.D)) @ self.A
            return self._L
        raise Exception(f'Diffusion "{self.diffusion}" not recognised!')
        
    @property
    def encode(self):
        if '_encode' in self.__dict__:
            return self._encode
        raise Exception('Adjacency matrix needs to be computed first!')
        
    @property
    def lookup(self):
        if '_lookup' in self.__dict__:
            return self._lookup
        raise Exception('Adjacency matrix needs to be computed first!')
        
    @property
    def e(self):
        if isinstance(self._e, np.ndarray):
            return self._e
        self.pvf()
        return self._e
    
    @property
    def v(self):
        if isinstance(self._v, np.ndarray):
            return self._v
        self.pvf()
        return self._v
    
    def value_iterate(self, i, p = 1, gamma = 1, theta = 1e-12):
        v = self.v[:,i]
        V = np.zeros_like(self.grid)
        while True:
            old = V.copy()
            for state in zip(*np.where(self._grid)):
                prob = np.eye(4) * p + (1 - p) / 4
                rV   = []
                for state_ in self.find_adjacent(state):
                    rV.append(v[self.lookup[state_]] - v[self.lookup[state]] + gamma * V[state_])
                V[state] = max(prob.dot(rV))
            err = np.absolute(V - old).max()
            if err < theta: break
        return V
    
    def _render(self):
        kwargs  = {}
        if 'name' in self.__dict__:
            kwargs['num'] = self.name
        fig, ax = plt.subplots(figsize = (8, 6), **kwargs)
        plt.imshow(self._grid, cmap = 'magma')
        for i in range(self._grid.shape[0]):
            plt.hlines(i + 0.5, -0.5, self._grid.shape[1] - 0.5, alpha = 0.2)
        for j in range(self._grid.shape[1]):
            plt.vlines(j + 0.5, -0.5, self._grid.shape[0] - 0.5, alpha = 0.2)
        plt.xticks([])
        plt.yticks([])
        plt.show()
        
    def _render_with_pvf(self, k):
        kwargs  = {}
        if 'name' in self.__dict__:
            kwargs['num'] = self.name
            
        V      = self.value_iterate(k)
        policy = np.zeros_like(V, dtype = str)

        for state in zip(*np.where(env.grid)):
            values = [V[state_] for state_ in self.find_adjacent(state)]
            if len(np.unique(values)) > 1:
                policy[state] = '⇩⇧⇦⇨'[np.argmax(values)]
            
        fig = plt.figure(figsize = (16, 6), **kwargs)
        fig.add_subplot(1, 2, 1)
        plt.title('Environment')
        plt.imshow(self._grid, cmap = 'magma')
        for i in range(self._grid.shape[0]):
            plt.hlines(i + 0.5, -0.5, self._grid.shape[1] - 0.5, alpha = 0.2)
        for j in range(self._grid.shape[1]):
            plt.vlines(j + 0.5, -0.5, self._grid.shape[0] - 0.5, alpha = 0.2)
        plt.xticks([])
        plt.yticks([])
        
        for state in zip(*np.where(self.grid)):
            plt.annotate(policy[state], state[::-1], va = 'center', ha = 'center')
            
        ax   = fig.add_subplot(1, 2, 2, projection='3d')
        plt.title(f'PVF {k + 1}', y = 1.08)
        X, Y = np.meshgrid(range(13), range(13))

        ax.contour3D(X, Y, env.value_iterate(k), 50, cmap='binary')
        plt.show()

    def _find_adjacent(self, state):
        """ helper function for find_adjacent method """
        i, j = state
        for k in [-1, 1]:
            if self._grid[i + k, j]:
                yield i + k, j
            else:
                yield i, j
            if self._grid[i, j + k]:
                yield i, j + k
            else:
                yield i, j
                
    def find_adjacent(self, state):
        """ computes neighbouring states """
        return list(self._find_adjacent(state)) # down, up, left, right
        
    def _eigen(self):
        e, v    = np.linalg.eig(self.L)
        o       = e.argsort()[::-1]
        self._e = e[o]
        self._v = v[:,o]
        
    def _svd(self):
        v, e,_  = np.linalg.svd(self.L, full_matrices = False)
        self._e = e
        self._v = v
        
    def _ica(self):
        raise NotImplementedError()

In [76]:
env  = GridWorld(grid = 'four_rooms', diffusion = 'normalised', pvf_func = 'eigen')

env.render(0)


<IPython.core.display.Javascript object>