In [1]:
import numpy as np
import numpy.random as rn
import matplotlib.pyplot as plt
%matplotlib inline


In [2]:
## Define the gridworld MDP class

class Gridworld(object):
    """
    Square gridworld MDP with "wind" (random action noise).

    States are numbered 0 .. grid_size**2 - 1. State i sits at the coordinate
    (x, y) = (i % grid_size, i // grid_size). Four actions are available,
    stored in self.actions as (dx, dy) offsets. With probability `wind` the
    agent ignores the chosen action and executes one of the four actions
    uniformly at random instead. A move that would leave the grid keeps the
    agent where it is.
    """

    def __init__(self, grid_size, wind, discount):
        """
        grid_size: Grid size (the grid is grid_size x grid_size). int.
        wind: Chance of moving randomly. float.
        discount: MDP discount. float.
        -> Gridworld
        """

        self.actions = ((1, 0), (0, 1), (-1, 0), (0, -1))
        self.n_actions = len(self.actions)
        self.n_states = grid_size**2
        self.grid_size = grid_size
        self.wind = wind
        self.discount = discount

        # Preconstruct the transition probability array, indexed as
        # [state_i, action_j, state_k] -> p(s_k | s_i, a_j).
        self.transition_probability = np.array(
            [[[self._transition_probability(i, j, k)
               for k in range(self.n_states)]
              for j in range(self.n_actions)]
             for i in range(self.n_states)])

    def __str__(self):
        return "Gridworld({}, {}, {})".format(self.grid_size, self.wind,
                                              self.discount)

    def int_to_point(self, i):
        """
        Convert a state int into the corresponding coordinate.

        i: State int.
        -> (x, y) int tuple.
        """

        return (i % self.grid_size, i // self.grid_size)

    def point_to_int(self, p):
        """
        Convert a coordinate into the corresponding state int.

        p: (x, y) tuple.
        -> State int.
        """

        return int(p[0] + p[1]*self.grid_size)

    def neighbouring(self, i, k):
        """
        Get whether two points neighbour each other. Also returns true if they
        are the same point.

        i: (x, y) int tuple.
        k: (x, y) int tuple.
        -> bool.
        """

        return abs(i[0] - k[0]) + abs(i[1] - k[1]) <= 1

    def _transition_probability(self, i, j, k):
        """
        Get the probability of transitioning from state i to state k given
        action j.

        The executed action equals the chosen action j with probability
        1 - wind + wind/n_actions, and equals each other action with
        probability wind/n_actions. An executed action that would leave the
        grid leaves the agent in state i. The result is the total probability
        of all executed actions that end in state k.

        i: State int.
        j: Action int.
        k: State int.
        -> p(s_k | s_i, a_j)
        """

        xi, yi = self.int_to_point(i)
        xk, yk = self.int_to_point(k)

        # A single step changes the position by at most one cell, so any
        # state that is neither i itself nor adjacent to it is unreachable.
        if not self.neighbouring((xi, yi), (xk, yk)):
            return 0.0

        p_random = self.wind / self.n_actions
        p_intended = 1.0 - self.wind + p_random

        probability = 0.0
        for a, (dx, dy) in enumerate(self.actions):
            nx, ny = xi + dx, yi + dy
            # Moving (or being blown) off the grid means staying put. This
            # covers the corner case (two off-grid directions), the edge case
            # (one off-grid direction) and interior cells (none) uniformly.
            if not (0 <= nx < self.grid_size and 0 <= ny < self.grid_size):
                nx, ny = xi, yi
            if (nx, ny) == (xk, yk):
                probability += p_intended if a == j else p_random

        return probability

    def reward(self, state_int):
        """
        Reward for being in state state_int.

        TODO: still a stub. The reward layout is defined by figures 6 and 7
        of the assignment, which are not part of this notebook, so this
        method returns None for every state until it is filled in.

        state_int: State integer. int.
        -> Reward.
        """

        # Reward magnitudes to use once the layout is implemented.
        positive_reward = 10
        negative_reward = -100

        # Look at figures 6 and 7 to return the reward at the given state.

        return None

In [None]:
## Function for plotting the matrix values

def plot_matrix(matrix):
    """
    Draw a 2-D table of numbers as text on a grid and show the figure.

    Entry matrix[i][j] is written (one decimal place) in the cell at row i,
    column j, with row 0 at the top and the column ticks on the top edge.
    The matrix may be rectangular; the column count is taken from the first
    row.

    matrix: 2-D sequence/array of numbers, indexable as matrix[i][j].
    -> None
    """
    num_rows = len(matrix)
    num_cols = len(matrix[0]) if num_rows else 0

    fig, ax = plt.subplots()

    for i in range(num_rows):
        for j in range(num_cols):
            c = matrix[i][j]
            ax.text(j + 0.5, i + 0.5, '{:.1f}'.format(c), va='center', ha='center')

    ax.set_xlim(0, num_cols)
    # Reversed limits put row 0 at the top of the figure.
    ax.set_ylim(num_rows, 0)
    ax.set_xticks(np.arange(num_cols))
    ax.set_yticks(np.arange(num_rows))
    ax.xaxis.tick_top()
    ax.grid()
    plt.show()
    # Close this specific figure so repeated calls do not accumulate figures.
    plt.close(fig)

In [3]:
## Creating the gridworld MDP with the following parameters

# MDP parameters: side length of the grid, chance of a random move, and the
# discount factor.
grid_size, wind, discount = 10, 0.1, 0.8

# Build the gridworld; its constructor also precomputes the transition array.
gw = Gridworld(grid_size=grid_size, wind=wind, discount=discount)

In [None]:
## Plotting the reward value for each state of the grid

def reward_grid_plot():
    """
    Tabulate the reward of every grid cell, plot the table and return it.

    Reads the notebook-level `grid_size` and `gw`. Entry [x][y] of the result
    is gw.reward of the state at coordinate (x, y).

    -> (grid_size, grid_size) float array of rewards.
    """
    rewards = np.zeros((grid_size, grid_size))
    # np.ndindex varies its last index fastest, so y is the outer loop and x
    # the inner one.
    for y, x in np.ndindex(grid_size, grid_size):
        rewards[x, y] = gw.reward(gw.point_to_int((x, y)))
    plot_matrix(rewards)
    return rewards


reward_matrix = reward_grid_plot()

In [None]:
## For visualization generating the heat map of the ground truth reward

# The rows are reversed before plotting (presumably so the heat map has the
# same orientation as the table plots, where row 0 is at the top -- confirm).
reward_heatmap = plt.pcolor(reward_matrix[::-1])
plt.colorbar(reward_heatmap)
reward_axes = plt.gca()
reward_axes.axis('off')
reward_axes.set_title('Heat map of Reward function 1')
plt.show()

In [None]:
## Implementing the algorithm for computing the optimal value function for each state
## The algorithm takes as input the MDP and returns an array of optimal values,
## where i^th value in the array corresponds to the optimal value of the i^th state.

def optimal_value(n_states, n_actions, transition_probabilities, reward,
                  discount, threshold=1e-2):
    """
    Find the optimal value function by value iteration.

    Each sweep applies the Bellman optimality update

        V(s) <- max_a sum_k p(s_k | s, a) * (reward[k] + discount * V(s_k))

    to all states at once, i.e. the reward is collected for the state that is
    entered. Sweeps stop once the largest change of any state value is no
    greater than `threshold`.

    n_states: Number of states. int.
    n_actions: Number of actions. int.
    transition_probabilities: Array (or nested sequence) indexed as
        [state_i, action, state_k] giving p(s_k | s_i, action).
        Shape (N, A, N).
    reward: Vector of rewards for each state. Shape (N,).
    discount: MDP discount factor. float.
    threshold: Convergence threshold, default 1e-2. float.
    -> Array of values for each state. Shape (N,).
    """

    v = np.zeros(n_states)

    # Nothing to maximise over: keep the all-zero (possibly empty) vector.
    if n_states == 0 or n_actions == 0:
        return v

    transition_probabilities = np.asarray(transition_probabilities, dtype=float)
    reward = np.asarray(reward, dtype=float)

    delta = np.inf
    # `delta > threshold` is False for NaN, so NaN rewards end the loop
    # instead of spinning forever.
    while delta > threshold:
        # q[s, a] = expected return of taking action a in state s.
        q = np.dot(transition_probabilities, reward + discount * v)
        updated = q.max(axis=1)
        delta = np.max(np.abs(updated - v))
        v = updated

    return v

In [None]:
## Plotting the optimal values of each state in the grid

# Generating the array of rewards to be passed onto the optimal value algorithm

# Reward of every state, in state-index order.
reward_states = np.zeros(gw.n_states)
for state in range(gw.n_states):
    reward_states[state] = gw.reward(state)

# Optimal value of every state under that reward.
v = optimal_value(
    gw.n_states,
    gw.n_actions,
    gw.transition_probability,
    reward_states,
    gw.discount,
)

# Lay the values out on the grid: state s goes to entry [s % grid_size]
# [s // grid_size], rounded to one decimal place.
value_matrix = np.zeros((grid_size, grid_size))
for state in range(gw.n_states):
    y, x = divmod(state, grid_size)
    value_matrix[x, y] = round(v[state], 1)

plot_matrix(value_matrix)

In [None]:
## For visualization generating the heat map of the optimal state values

# The rows are reversed before plotting (presumably so the heat map has the
# same orientation as the table plots, where row 0 is at the top -- confirm).
value_heatmap = plt.pcolor(value_matrix[::-1])
plt.colorbar(value_heatmap)
value_axes = plt.gca()
value_axes.axis('off')
value_axes.set_title('Heat map of optimal state values for Reward function 1')
plt.show()

In [None]:
## Implementing the function for computing the optimal policy.
## The function takes as input the MDP and outputs a
## deterministic policy, which is an array of actions.
## The i^th entry in the array corresponds to the
## optimal action to take at the i^th state.

def find_policy(n_states, n_actions, transition_probabilities, reward, discount,
                threshold=1e-2, v=None, stochastic=False):
    """
    Find the optimal policy.

    For every state the expected return of each action is computed as

        Q(s, a) = sum_k p(s_k | s, a) * (reward[k] + discount * v[k])

    The deterministic policy picks the action with the largest Q (the lowest
    action index wins ties). The stochastic policy is a softmax over Q.

    n_states: Number of states. int.
    n_actions: Number of actions. int.
    transition_probabilities: Array (or nested sequence) indexed as
        [state_i, action, state_k] giving p(s_k | s_i, action).
        Shape (N, A, N).
    reward: Vector of rewards for each state. Shape (N,).
    discount: MDP discount factor. float.
    threshold: Convergence threshold, default 1e-2. Only used when v has to
        be computed. float.
    v: Value function (if known). Default None, in which case it is computed
        with optimal_value.
    stochastic: Whether the policy should be stochastic. Default False.
    -> Action probabilities for each state, shape (N, A), or action int for
        each state, shape (N,) (depending on stochasticity).
    """

    if v is None:
        v = optimal_value(n_states, n_actions, transition_probabilities, reward,
                          discount, threshold)

    transition_probabilities = np.asarray(transition_probabilities, dtype=float)
    future_return = (np.asarray(reward, dtype=float)
                     + discount * np.asarray(v, dtype=float))
    # q[s, a] = expected return of taking action a in state s.
    q = np.dot(transition_probabilities, future_return)

    if stochastic:
        # Subtract the row maximum before exponentiating so the softmax
        # cannot overflow; this does not change the resulting probabilities.
        shifted = q - q.max(axis=1, keepdims=True)
        weights = np.exp(shifted)
        return weights / weights.sum(axis=1, keepdims=True)

    def _policy(s):
        return int(np.argmax(q[s]))

    policy = np.array([_policy(s) for s in range(n_states)])
    return policy

In [None]:
## Function for plotting the optimal actions at each state in the grid
## The function takes as input the matrix containing optimal actions
## and plots the actions for each state on the grid

def plot_arrow(action_matrix):
    """
    Draw a 2-D table of actions as arrows on a grid.

    Entry action_matrix[i][j] is drawn in the cell at row i, column j, with
    row 0 at the top. Action 0 is drawn as a down arrow, 1 as a right arrow,
    2 as an up arrow, and any other value as a left arrow. The matrix may be
    rectangular; the column count is taken from the first row.

    action_matrix: 2-D sequence/array of action ints, indexable as
        action_matrix[i][j].
    -> None
    """
    # Any value that is not listed here falls back to the left arrow.
    arrows = {0: u'↓', 1: u'→', 2: u'↑'}

    num_rows = len(action_matrix)
    num_cols = len(action_matrix[0]) if num_rows else 0

    fig, ax = plt.subplots()

    for i in range(num_rows):
        for j in range(num_cols):
            arrow = arrows.get(action_matrix[i][j], u'←')
            ax.text(j + 0.5, i + 0.5, arrow, va='center', ha='center')

    ax.set_xlim(0, num_cols)
    # Reversed limits put row 0 at the top of the figure.
    ax.set_ylim(num_rows, 0)
    ax.set_xticks(np.arange(num_cols))
    ax.set_yticks(np.arange(num_rows))
    ax.xaxis.tick_top()
    ax.grid()

In [None]:
## Plotting the optimal actions for each state in the grid

# Finding the array of optimal policy

# Deterministic optimal policy: one action int per state.
optimal_policy = find_policy(
    gw.n_states,
    gw.n_actions,
    gw.transition_probability,
    reward_states,
    gw.discount,
    stochastic=False,
)

# Lay the actions out on the grid: state s goes to entry [s % grid_size]
# [s // grid_size].
action_matrix = np.zeros((grid_size, grid_size))
for state in range(gw.n_states):
    y, x = divmod(state, grid_size)
    action_matrix[x, y] = optimal_policy[state]

# Plotting
plot_arrow(action_matrix)


**IRL**

In [None]:

## IRL algorithm
## LP formulation

from cvxopt import matrix, solvers
def irl(n_states, n_actions, transition_probability, policy, discount, Rmax,
        l1):
    """
    Find a reward function with inverse RL as described in Ng & Russell, 2000.

    Solves the linear program

        maximise   sum_s t_s - l1 * sum_s u_s
        subject to t_s <= T(a, s) . R     for all s and all a != policy[s]
                   T(a, s) . R >= 0       for all s and all a != policy[s]
                   -u <= R <= u
                   -Rmax <= R <= Rmax

    over x = [R, t, u], where
    T(a, s) = (P_pi[s] - P_a[s]) (I - discount * P_pi)^-1 and P_pi is the
    transition matrix obtained by following `policy`.

    n_states: Number of states. int.
    n_actions: Number of actions. int.
    transition_probability: NumPy array mapping (state_i, action, state_k) to
        the probability of transitioning from state_i to state_k under action.
        Shape (N, A, N).
    policy: Vector mapping state ints to action ints. Shape (N,).
    discount: Discount factor. float.
    Rmax: Maximum reward. float.
    l1: l1 regularisation. float.
    -> Reward vector. Shape (N,).

    Raises ValueError if the LP solver does not return a solution.
    """

    A = set(range(n_actions))  # Set of actions to help manage reordering
                               # actions.
    policy = np.asarray(policy).astype(int)
    # The transition policy convention is different here to the rest of the code
    # for legacy reasons; here, we reorder axes to fix this. We expect the
    # new probabilities to be of the shape (A, N, N).
    transition_probability = np.transpose(
        np.asarray(transition_probability, dtype=float), (1, 0, 2))

    # Row s of P_pi is the transition distribution of the action the policy
    # takes in state s.
    policy_transitions = transition_probability[policy, np.arange(n_states)]
    # (I - discount * P_pi)^-1 is shared by every constraint; invert once.
    resolvent = np.linalg.inv(np.eye(n_states) - discount * policy_transitions)

    def T(a, s):
        """
        Shorthand for a dot product used a lot in the LP formulation.
        """

        return np.dot(transition_probability[policy[s], s]
                      - transition_probability[a, s],
                      resolvent)

    # This entire function just computes the block matrices used for the LP
    # formulation of IRL.

    # One pair of constraints per (state, non-policy action).
    pairs = [(s, a)
             for s in range(n_states)
             for a in sorted(A - {int(policy[s])})]
    n_pairs = len(pairs)

    # Minimise c . x, with x = [R, t, u].
    c = np.hstack([np.zeros(n_states), -np.ones(n_states),
                   l1 * np.ones(n_states)])

    # reshape keeps the (n_pairs, N) shape even when there are no pairs.
    T_stack = np.array([-T(a, s) for s, a in pairs]).reshape(n_pairs, n_states)
    # Row r selects t_s for the state s of constraint pair r.
    t_select = np.zeros((n_pairs, n_states))
    for row, (s, _) in enumerate(pairs):
        t_select[row, s] = 1.0

    eye = np.eye(n_states)
    zeros_pairs = np.zeros((n_pairs, n_states))
    zeros_states = np.zeros((n_states, n_states))

    # Constraints in the form D x <= b.
    D = np.vstack([
        np.hstack([T_stack, t_select, zeros_pairs]),     # t_s <= T(a, s) . R
        np.hstack([T_stack, zeros_pairs, zeros_pairs]),  # T(a, s) . R >= 0
        np.hstack([-eye, zeros_states, -eye]),           # -R <= u
        np.hstack([eye, zeros_states, -eye]),            #  R <= u
        np.hstack([-eye, zeros_states, zeros_states]),   # -R <= Rmax
        np.hstack([eye, zeros_states, zeros_states]),    #  R <= Rmax
    ])
    b = np.vstack([np.zeros((2 * n_pairs + 2 * n_states, 1)),
                   Rmax * np.ones((2 * n_states, 1))])

    results = solvers.lp(matrix(c), matrix(D), matrix(b))
    if results["x"] is None:
        raise ValueError(
            "IRL linear program was not solved (status: {})".format(
                results["status"]))

    r = np.asarray(results["x"][:n_states], dtype=np.double)
    return r.reshape((n_states,))

In [None]:
def accuracy(exp_pol,ag_pol):
    """
    Fraction of states in which two policies choose the same action.

    exp_pol: Expert policy, one action per state. Sequence/array, length N.
    ag_pol: Agent policy, one action per state. Sequence/array, length N.
    -> Number of matching states divided by N. float.

    Raises ValueError if the two policies have different lengths.
    """

    num_states = len(exp_pol)
    if len(ag_pol) != num_states:
        raise ValueError(
            "policies must have the same length ({} != {})".format(
                num_states, len(ag_pol)))

    count = 0.0

    for i in range(num_states):
        if exp_pol[i] == ag_pol[i]:
            count += 1.0

    return np.divide(count,num_states)


def iter_acc(grid_obj,op_pol,op_pol_compare):
    """
    Sweep the l1 regularisation weight of IRL and score the recovered policy.

    For each of 500 evenly spaced lambda values in [0, 5] this function
    recovers a reward with irl (using Rmax = 100), derives the agent's
    deterministic policy for that reward with find_policy, and measures its
    accuracy against a reference policy.

    NOTE(review): the roles of the two policy arguments are an assumption --
    op_pol is passed to irl as the expert demonstration and op_pol_compare is
    the policy the agent is scored against. Confirm against the caller.

    grid_obj: MDP object providing n_states, n_actions,
        transition_probability and discount.
    op_pol: Expert policy given to IRL. Shape (N,).
    op_pol_compare: Policy the agent policy is compared with. Shape (N,).
    -> (best accuracy, agent policy that achieved it). The first lambda
        reaching the best accuracy wins ties.
    """

    n_states = grid_obj.n_states
    n_actions = grid_obj.n_actions
    tr_prob = grid_obj.transition_probability
    disc = grid_obj.discount
    Rmax = 100
    accuracy_array = []

    best_acc = -np.inf
    best_pol = None

    lam_range = np.linspace(0,5,500)

    for lam in lam_range:
        rec_reward = irl(n_states, n_actions, tr_prob, op_pol, disc, Rmax, lam)
        ag_pol = find_policy(n_states, n_actions, tr_prob, rec_reward, disc)
        acc = accuracy(op_pol_compare, ag_pol)
        accuracy_array.append(acc)

        # Strict comparison keeps the first policy that reaches the maximum.
        if best_pol is None or acc > best_acc:
            best_acc, best_pol = acc, ag_pol

    return np.amax(accuracy_array), best_pol


In [None]:
#Plotting the accuracy

# NOTE(review): in the visible notebook, lam_range and accuracy_array are only
# assigned inside iter_acc -- confirm they exist at notebook level before this
# cell runs.
accuracy_axes = plt.gca()
accuracy_axes.plot(lam_range, accuracy_array)
accuracy_axes.set_xlabel('Lambda')
accuracy_axes.set_ylabel('Accuracy')
accuracy_axes.set_title('Accuracy vs Lambda (Reward function 2)')
plt.show()

In [None]:

# Recover a reward with IRL and compare its heat map with the ground truth.
#
# TODO: this cell is an unfinished template. Rmax and lamda are placeholders
# (None) and the irl call below passes a bare Ellipsis instead of real
# arguments, so the cell cannot run as written.
Rmax = None
lamda = None


rec_reward = irl(...)

# Creating the recovered reward matrix

# TODO: placeholder -- np.flipud below needs an actual 2-D array here.
rec_reward_matrix = None



# Left panel: ground-truth reward.
# NOTE(review): the title says "Reward function 2" while the earlier heat-map
# cell labels reward_matrix "Reward function 1" -- confirm which is intended.
plt.subplot(1, 2, 1)
plt.pcolor(np.flipud(reward_matrix))
plt.colorbar()
plt.axis('off')
plt.title('Heat map of Reward function 2')


# Right panel: reward recovered by IRL.
plt.subplot(1, 2, 2)
plt.pcolor(np.flipud(rec_reward_matrix))
plt.colorbar()
plt.axis('off')
plt.title('Heat map of recovered reward')
plt.show()

In [None]:
## Computing the optimal values of each state with extracted reward vector

# Optimal state values under the reward recovered by IRL (rec_reward), using
# the same MDP as the ground-truth computation.
opt_val_rec = optimal_value(gw.n_states, gw.n_actions, gw.transition_probability,
                            rec_reward, gw.discount)

# Creating the recovered optimal value matrix: state s goes to entry
# [s % grid_size][s // grid_size], the same layout as value_matrix.
# The values are not rounded, to keep the full resolution of the heat map.

opt_val_rec_matrix = np.zeros((grid_size, grid_size))
for i in range(gw.n_states):
    opt_val_rec_matrix[i % grid_size][i // grid_size] = opt_val_rec[i]


# Generating the heatmap of the optimal values using extracted reward and the groundtruth reward

# Left panel: values under the ground-truth reward (GR).
plt.subplot(1, 2, 1)
plt.pcolor(np.flipud(value_matrix))
plt.colorbar()
plt.axis('off')
plt.title('Heat map of optimal values (GR)')


# Right panel: values under the extracted reward (ER).
plt.subplot(1, 2, 2)
plt.pcolor(np.flipud(opt_val_rec_matrix))
plt.colorbar()
plt.axis('off')
plt.title('Heat map of optimal values (ER)')
plt.show()

In [None]:
## Plotting the optimal policy of the agent

# Deterministic optimal policy of the agent, i.e. the policy that is optimal
# for the reward recovered by IRL (rec_reward).
optimal_policy_ag = find_policy(gw.n_states, gw.n_actions, gw.transition_probability,
                                rec_reward, gw.discount, stochastic=False)

# Generating the matrix containing the optimal actions for the agent:
# state s goes to entry [s % grid_size][s // grid_size], the same layout as
# action_matrix.

action_matrix_ag = np.zeros((grid_size, grid_size))
for i in range(gw.n_states):
    action_matrix_ag[i % grid_size][i // grid_size] = optimal_policy_ag[i]

# Plotting
plot_arrow(action_matrix_ag)
