In [190]:
"""
Implements the gridworld MDP.

Matthew Alger, 2015
matthew.alger@anu.edu.au
"""

import numpy as np
import numpy.random as rn

class Gridworld(object):
    """
    Gridworld MDP.
    """

    def __init__(self, grid_size, wind, discount,horizon):
        """
        grid_size: Grid size. int.
        wind: Chance of moving randomly. float.
        discount: MDP discount. float.
        -> Gridworld
        """

        self.actions = ((1, 0), (0, 1), (-1, 0), (0, -1))
        self.n_actions = len(self.actions)
        self.n_states = grid_size**2
        self.grid_size = grid_size
        self.wind = wind
        self.discount = discount
        self.horizon = horizon

        # Preconstruct the transition probability array.
        self.transition_probability = np.array(
            [[[self._transition_probability(i, j, k)
               for k in range(self.n_states)]
              for j in range(self.n_actions)]
             for i in range(self.n_states)])
        self.reward_v = np.array([[self.reward(s) for s in range(self.n_states)] for a in range(self.n_actions)]).T
    def __str__(self):
        return "Gridworld({}, {}, {})".format(self.grid_size, self.wind,
                                              self.discount)

    def int_to_point(self, i):
        """
        Convert a state int into the corresponding coordinate.

        i: State int.
        -> (x, y) int tuple.
        """

        return (i % self.grid_size, i // self.grid_size)

    def point_to_int(self, p):
        """
        Convert a coordinate into the corresponding state int.

        p: (x, y) tuple.
        -> State int.
        """

        return p[0] + p[1]*self.grid_size

    def neighbouring(self, i, k):
        """
        Get whether two points neighbour each other. Also returns true if they
        are the same point.

        i: (x, y) int tuple.
        k: (x, y) int tuple.
        -> bool.
        """

        return abs(i[0] - k[0]) + abs(i[1] - k[1]) <= 1

    def _transition_probability(self, i, j, k):
        """
        Get the probability of transitioning from state i to state k given
        action j.

        i: State int.
        j: Action int.
        k: State int.
        -> p(s_k | s_i, a_j)
        """

        xi, yi = self.int_to_point(i)
        xj, yj = self.actions[j]
        xk, yk = self.int_to_point(k)

        if not self.neighbouring((xi, yi), (xk, yk)):
            return 0.0

        # Is k the intended state to move to?
        if (xi + xj, yi + yj) == (xk, yk):
            return 1 - self.wind + self.wind/self.n_actions

        # If these are not the same point, then we can move there by wind.
        if (xi, yi) != (xk, yk):
            return self.wind/self.n_actions

        # If these are the same point, we can only move here by either moving
        # off the grid or being blown off the grid. Are we on a corner or not?
        if (xi, yi) in {(0, 0), (self.grid_size-1, self.grid_size-1),
                        (0, self.grid_size-1), (self.grid_size-1, 0)}:
            # Corner.
            # Can move off the edge in two directions.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here plus an extra chance of blowing
                # onto the *other* off-grid square.
                return 1 - self.wind + 2*self.wind/self.n_actions
            else:
                # We can blow off the grid in either direction only by wind.
                return 2*self.wind/self.n_actions
        else:
            # Not a corner. Is it an edge?
            if (xi not in {0, self.grid_size-1} and
                yi not in {0, self.grid_size-1}):
                # Not an edge.
                return 0.0

            # Edge.
            # Can only move off the edge in one direction.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here.
                return 1 - self.wind + self.wind/self.n_actions
            else:
                # We can blow off the grid only by wind.
                return self.wind/self.n_actions

    def reward(self, state_int):
        """
        Reward for being in state state_int.

        state_int: State integer. int.
        -> Reward.
        """

        if state_int == self.n_states - 1:
            return 1
        return 0



import math
import numpy as np
import scipy.special

inf  = float("inf")

def soft_bellman_operation(env, reward):
    
    # Input:    env    :  environment object
    #           reward :  SxA dimensional vector 
    #           horizon:  finite horizon
        
    discount = env.discount
    horizon = env.horizon
    
    if horizon is None or math.isinf(horizon):
        raise ValueError("Only finite horizon environments are supported")
    
    n_states  = env.n_states
    n_actions = env.n_actions
    
#     T = env.transition_matrix
    
    V = np.zeros(shape = (horizon, n_states))
    Q = np.zeros(shape = (horizon, n_states,n_actions))
        
    broad_R = reward

    # Base case: final timestep
    # final Q(s,a) is just reward
    Q[horizon - 1, :, :] = broad_R
    # V(s) is always normalising constant
    V[horizon - 1, :] = scipy.special.logsumexp(Q[horizon - 1, :, :], axis=1)

    # Recursive case
    for t in reversed(range(horizon - 1)):
#         next_values_s_a = T @ V[t + 1, :]
#         next_values_s_a = next_values_s_a.reshape(n_states,n_actions)
        for a in range(n_actions):
            Ta = env.transition_probability[:,a,:]
            next_values_s_a = Ta@V[t + 1, :]
            Q[t, :, a] = broad_R[:,a] + discount * next_values_s_a
            
        V[t, :] = scipy.special.logsumexp(Q[t, :, :], axis=1)

    pi = np.exp(Q - V[:, :, None])

    return V, Q, pi


def create_Pi(pi):
    horizon, n_states, n_actions = pi.shape
    
    Pi = np.zeros(((horizon - 1)*n_states*n_actions,1))
    
    for t in range(horizon-1):
        curr_pi = pi[t].flatten('F')[:,None]
#         print(curr_pi.shape)
        next_pi = pi[t+1].flatten('F')[:,None]
        Pi[t*n_states*n_actions:(t+1)*n_states*n_actions] = np.log(curr_pi) - np.log(next_pi)
    return Pi

In [286]:
grid_size = 5
wind = 0.1
discount = 0.99
horizon = 10

gw = Gridworld(grid_size,wind,discount,horizon)

reward = gw.reward_v
reward[-1][-1] = 1
V,Q,pi = soft_bellman_operation(gw,gw.reward_v)


# Construct Psi and Pi
I = np.eye(gw.n_states)
E = I
P = gw.transition_probability[:,0,:]

for a in range(1, gw.n_actions):
    E = np.vstack((E,I))
    P = np.vstack((P, gw.transition_probability[:,a,:]))
    
gamma = gw.discount
psi_rows = (horizon - 1)*gw.n_states*gw.n_actions
psi_cols = horizon*gw.n_states

Pi = create_Pi(pi)

Psi = np.zeros((psi_rows, psi_cols))

for i in range(horizon-1):
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, i*gw.n_states:(i+1)*gw.n_states] = E
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, (i+1)*gw.n_states:(i+2)*gw.n_states] = -discount*P
    
print("The rank of P is  : ", np.linalg.matrix_rank(P))
print("The rank of Psi is: ", np.linalg.matrix_rank(Psi))

# find the  solution
v = np.linalg.pinv(Psi) @ Pi
print("The last solution is: ", v[-grid_size**2:])
# print("The kernel of Psi is: ", scipy.linalg.null_space(Psi))

The rank of P is  :  25
The rank of Psi is:  249
The last solution is:  [[ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [ 0.10127475]
 [-0.89872525]]


In [290]:
print("The projection of the kernel of Psi onto the last n-components is: \n", scipy.linalg.null_space(Psi)[-grid_size**2:])


The projection of the kernel of Psi onto the last n-components is: 
 [[0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]
 [0.0661165]]


In [241]:
np.linalg.matrix_rank(P)

25

In [240]:
P.shape

(100, 25)

In [187]:
x = np.linalg.pinv(Psi) @ Pi

In [188]:
x.shape

(250, 1)

In [277]:
class BlockingGridworld(object):
    """
    Gridworld MDP.
    """

    def __init__(self, grid_size, wind, discount,horizon):
        """
        grid_size: Grid size. int.
        wind: Chance of moving randomly. float.
        discount: MDP discount. float.
        -> Gridworld
        """

        self.actions = ((1, 0), (0, 1), (-1, 0), (0, -1))
        self.n_actions = len(self.actions)
        self.n_states = grid_size**2
        self.grid_size = grid_size
        self.wind = wind
        self.discount = discount
        self.horizon = horizon
        self.blocked_states = [6,7,8,9]
        
        # Preconstruct the transition probability array.
        self.transition_probability = np.array(
            [[[self._transition_probability(i, j, k)
               for k in range(self.n_states)]
              for j in range(self.n_actions)]
             for i in range(self.n_states)])
        
        self.normalize_transition_matrices()
      
        self.reward_v = np.array([[self.reward(s) for s in range(self.n_states)] for a in range(self.n_actions)]).T
    def __str__(self):
        return "Gridworld({}, {}, {})".format(self.grid_size, self.wind,
                                              self.discount)

    def int_to_point(self, i):
        """
        Convert a state int into the corresponding coordinate.

        i: State int.
        -> (x, y) int tuple.
        """

        return (i % self.grid_size, i // self.grid_size)

    def point_to_int(self, p):
        """
        Convert a coordinate into the corresponding state int.

        p: (x, y) tuple.
        -> State int.
        """

        return p[0] + p[1]*self.grid_size

    def neighbouring(self, i, k):
        """
        Get whether two points neighbour each other. Also returns true if they
        are the same point.

        i: (x, y) int tuple.
        k: (x, y) int tuple.
        -> bool.
        """

        return abs(i[0] - k[0]) + abs(i[1] - k[1]) <= 1

    def _transition_probability(self, i, j, k):
        """
        Get the probability of transitioning from state i to state k given
        action j.

        i: State int.
        j: Action int.
        k: State int.
        -> p(s_k | s_i, a_j)
        """

        xi, yi = self.int_to_point(i)
        xj, yj = self.actions[j]
        xk, yk = self.int_to_point(k)
       
        # is this state blocked in the MDP?
        if k in self.blocked_states:
            return 0.0 
        
        
        if not self.neighbouring((xi, yi), (xk, yk)):
            return 0.0

        # Is k the intended state to move to?
        if (xi + xj, yi + yj) == (xk, yk):
            return 1 - self.wind + self.wind/self.n_actions

        # If these are not the same point, then we can move there by wind.
        if (xi, yi) != (xk, yk):
            return self.wind/self.n_actions

        # If these are the same point, we can only move here by either moving
        # off the grid or being blown off the grid. Are we on a corner or not?
        if (xi, yi) in {(0, 0), (self.grid_size-1, self.grid_size-1),
                        (0, self.grid_size-1), (self.grid_size-1, 0)}:
            # Corner.
            # Can move off the edge in two directions.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here plus an extra chance of blowing
                # onto the *other* off-grid square.
                return 1 - self.wind + 2*self.wind/self.n_actions
            else:
                # We can blow off the grid in either direction only by wind.
                return 2*self.wind/self.n_actions
        else:
            # Not a corner. Is it an edge?
            if (xi not in {0, self.grid_size-1} and
                yi not in {0, self.grid_size-1}):
                # Not an edge.
                return 0.0

            # Edge.
            # Can only move off the edge in one direction.
            # Did we intend to move off the grid?
            if not (0 <= xi + xj < self.grid_size and
                    0 <= yi + yj < self.grid_size):
                # We intended to move off the grid, so we have the regular
                # success chance of staying here.
                return 1 - self.wind + self.wind/self.n_actions
            else:
                # We can blow off the grid only by wind.
                return self.wind/self.n_actions
            
    def normalize_transition_matrices(self):
        for a in range(self.n_actions):
            P = self.transition_probability[:,a,:]
            sum_P = P.sum(axis = 1)
            normalized_P = P/sum_P[:,None]
            self.transition_probability[:,a,:] = normalized_P
            
    def reward(self, state_int):
        """
        Reward for being in state state_int.

        state_int: State integer. int.
        -> Reward.
        """

        if state_int == self.n_states - 1:
            return 1
        return 0



In [285]:
grid_size = 5
wind = 0.1
discount = 0.9
horizon = 10

gw = BlockingGridworld(grid_size,wind,discount,horizon)

reward = gw.reward_v
reward[-1][-1] = 1
V,Q,pi = soft_bellman_operation(gw,gw.reward_v)


# Construct Psi and Pi
I = np.eye(gw.n_states)
E = I
P = gw.transition_probability[:,0,:]

for a in range(1, gw.n_actions):
    E = np.vstack((E,I))
    P = np.vstack((P, gw.transition_probability[:,a,:]))
    
gamma = gw.discount
psi_rows = (horizon - 1)*gw.n_states*gw.n_actions
psi_cols = horizon*gw.n_states

Pi = create_Pi(pi)

Psi = np.zeros((psi_rows, psi_cols))

for i in range(horizon-1):
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, i*gw.n_states:(i+1)*gw.n_states] = E
    Psi[i*gw.n_states*gw.n_actions:(i+1)*gw.n_states*gw.n_actions, (i+1)*gw.n_states:(i+2)*gw.n_states] = -discount*P
    
print("The rank of P is  : ", np.linalg.matrix_rank(P))
print("The rank of Psi is: ", np.linalg.matrix_rank(Psi))

# find the  solution
v = np.linalg.pinv(Psi) @ Pi
print("The solution of mu (V_{T-1}) is: ", v[-grid_size**2:])
print("The dim of kernel of Psi is: ", scipy.linalg.null_space(Psi).shape[1])

The rank of P is  :  21
The rank of Psi is:  245
The solution of mu (V_{T-1}) is:  [[ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 1.93971830e-18]
 [-1.50735184e-19]
 [-6.72343766e-19]
 [-7.36184720e-19]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [ 7.82290185e-02]
 [-9.21770982e-01]]
The dim of kernel of Psi is:  5
