<a href="https://colab.research.google.com/github/vbipin/aip/blob/master/mdp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#we plan to implement some of the algorithms related to MDPs and RL
#MDP study
#%matplotlib inline
#import matplotlib
#import numpy as np
#import matplotlib.pyplot as plt

#I am trying to avoid the numpy dependencies

import random
#
#We plan to implement the gridworld class 
#


In [0]:
#Let us have a gridworld
#ref: Chapter 17, Artificial Intelligence a Modern Approach
#ref: CS188 https://inst.eecs.berkeley.edu/~cs188/fa19/
#ref: https://inst.eecs.berkeley.edu/~cs188/fa19/assets/slides/lec8.pdf

#This class creates a 2D grid of rows x columns.
#Some of the cells can be disabled by listing them in walls.
#Cells are addressed just like 2D arrays: (r,c).
#There can be several terminal states.
#Terminal states have only one action available: Exit.
#Transitions are as per the book: 80% the intended action and 20% sideways (10% to each side).

#Actions #just some alias
Up    = 0
Right = 1
Down  = 2
Left  = 3
Exit  = 4

class GridWorld :
    """Stochastic grid-world MDP; the defaults are the 4x3 world of the AIMA book.

    Cells are addressed like a 2D array as (row, column); row 0 is the row
    printed first by print(). Wall cells can never be entered. A terminal cell
    offers only the Exit action, which pays the terminal's reward and moves the
    agent to the dummy end_state. Every other move is slippery: the intended
    direction is taken with probability 1-noise and each of the two
    perpendicular directions with probability noise/2.
    """

    #(row, column) offset produced by each movement action
    _DELTAS = {
        Up:    (-1,  0),
        Down:  (+1,  0),
        Right: ( 0, +1),
        Left:  ( 0, -1),
    }

    #Default is as given in the AIMA book
    def __init__(self,
                 rows    =3,
                 columns =4,
                 walls   =None, terminals=None,
                 gamma   =1.0,
                 living_reward=0,
                 noise   =0.2
                ) :
        """We dont expect these parameters to change during the agent run

        rows, columns : size of the grid
        walls         : collection of (r,c) cells that cannot be entered;
                        None means the book default [(1,1)]
        terminals     : dict {(r,c): reward} of the terminal cells;
                        None means the book default {(0,3):+1.0, (1,3):-1.0}
        gamma         : discount factor (only stored here; used by callers)
        living_reward : reward for acting in a non terminal state
        noise         : total probability of slipping sideways

        Raises ValueError if noise is not within [0, 1].
        """
        if not 0 <= noise <= 1 :
            raise ValueError(f"noise must be a probability in [0, 1], got {noise!r}")

        #None stands for the book defaults, so that every instance gets fresh
        #containers instead of sharing one mutable default object.
        if walls is None :
            walls = [(1,1)]
        if terminals is None :
            terminals = {(0,3):+1.0, (1,3):-1.0}

        self.rows      = rows
        self.columns   = columns
        self.N         = rows * columns #total cells
        self.walls     = walls
        self.terminals = terminals #dictionary of terminal cells and their rewards.
        self.gamma     = gamma
        self.living_reward = living_reward
        self.all_actions   = [ Up, Down, Right, Left, Exit ]
        self.all_states    = [(r,c) for r in range(rows) for c in range(columns) if (r,c) not in walls ]
        self.end_state     = (-1, -1) #a dummy state to reach after taking Exit

        #for each intended action: the actions that may really happen and
        #their probabilities (two parallel lists)
        self.noise                = noise
        sideways                  = noise/2
        self.action_transitions   = {
            Up:   ([Up,    Left, Right], [1-noise, sideways, sideways ]),
            Down: ([Down,  Left, Right], [1-noise, sideways, sideways ]),
            Left: ([Left,  Up,   Down ], [1-noise, sideways, sideways ]),
            Right:([Right, Up,   Down ], [1-noise, sideways, sideways ]),
            Exit :([Exit], [1.0])
        }

    def actions(self, state) :
        """returns all valid actions from the current state"""
        if state in self.terminals :
            return [Exit]
        return [ Up, Down, Right, Left ]

    def reward(self, state, action) :
        """returns the reward for acting in state; the action itself is not used"""
        if state in self.terminals :
            return self.terminals[state] #dict has the terminal values +1 or -1
        return self.living_reward        #usually a small -ve value

    def transitions(self, state, action) :
        """returns a tuple (next_states, actual_actions, probabilities)

        The three members are parallel lists: taking `action` in `state`
        really performs actual_actions[i] with probability probabilities[i]
        and then lands in next_states[i]. Raises KeyError for an unknown action.
        """
        actual_actions, probs = self.action_transitions[action]
        next_states = [ self._next_cell(state, a) for a in actual_actions ]
        return next_states, actual_actions, probs

    def move(self, state, action) :
        """Take the action and return the tuple(new_state, reward, is_terminal)

        is_terminal is True only when the action was Exit; merely stepping
        onto a terminal cell does not end the episode.
        """
        reward = self.reward(state, action) #terminals got the rewards as well
        if action == Exit :
            return self.end_state, reward, True

        #we choose one outcome of the slippery action according to the probabilities;
        #outcomes that would be invalid cells have already been replaced by `state`.
        cells, _, p = self.transitions(state, action)
        new_state   = random.choices(cells, weights=p)[0] #only one; we take index 0
        return new_state, reward, False #keep the same format as OpenAI gym.

    def _next_cell(self, state, action) :
        """Returns the cell reached by deterministically applying action in state

        A move into a wall or off the grid leaves the agent where it is.
        Exit leads to the dummy end_state. Raises ValueError for an unknown action.
        """
        if action == Exit :
            return self.end_state
        try :
            dr, dc = self._DELTAS[action]
        except KeyError :
            raise ValueError(f"unknown action: {action!r}") from None

        r, c   = state #row & column
        target = (r + dr, c + dc)
        if self._valid_cell(target) :
            return target
        return state #stay put; the target is invalid.

    def _valid_cell(self, cell) :
        """Returns true if the cell is inside the grid and is not a wall"""
        r, c = cell #this may be an illegal node; we need to check
        inside = 0 <= r < self.rows and 0 <= c < self.columns
        return inside and (r,c) not in self.walls

    def _cell_symbol(self, cell, agent_state) :
        """Returns the one character used to draw cell"""
        if cell in self.walls :
            return '#'
        if cell in self.terminals :
            #terminals are drawn even when the agent stands on them
            return '+' if self.terminals[cell] > 0 else '-'
        if cell == agent_state :
            return '@'
        return '.'

    #pretty print the grid and agent if given.
    def print(self, agent_state=None) :
        """Prints one text line per row; every cell is its symbol plus a space"""
        for r in range(self.rows) :
            symbols = [ self._cell_symbol((r,c), agent_state) for c in range(self.columns) ]
            print("".join(symbol + " " for symbol in symbols))

In [0]:
#The default (AIMA) world, here with discounting and a small cost for every non terminal move
start = (2,0) #as in the book
grid_world = GridWorld(living_reward=-0.04, gamma=0.9)

In [10]:
# Draw the grid: + and - are the terminal states, # is a wall, . is a free cell and @ is our agent.
grid_world.print(start)

. . . + 
. # . - 
@ . . . 


In [0]:
############# I might change the API later ##########


In [0]:
class Policy :
    """A table based policy: remembers one action per state.

    The table is the plain dict `policy` ({state: action}). An instance can be
    indexed (p[state]), assigned to (p[state] = action), called (p(state)),
    tested for membership (state in p) and iterated over its states.
    """
    def __init__(self, grid_world=None) :
        """Holds one policy and returns actions according to it"""
        self.grid_world = grid_world
        self.policy     = { } #{ state: policy_action}

    def __getitem__(self, state) :
        """returns the action stored for state; KeyError if there is none"""
        return self.policy[state]

    def __setitem__(self, state, action) :
        """stores action as the choice for state"""
        self.policy[state] = action

    def __call__(self, state) :
        """same as self[state], so the object can be used wherever a policy function is expected"""
        return self.policy[state]

    def __contains__(self, state) :
        """true if an action is stored for state"""
        #without this, `in` would fall back to calling __getitem__ with 0, 1, 2...
        return state in self.policy

    def __iter__(self) :
        """iterates over the states that have an action"""
        return iter(self.policy)

In [0]:
###### Some test code. 

In [0]:
def random_policy(grid_world) :
    """Builds a policy function that picks uniformly among the actions valid in a state"""
    def choose(state) :
        #ask the world which actions are allowed here, then sample one of them
        allowed = grid_world.actions(state)
        return random.choice(allowed)
    return choose

def fixed_policy(grid_world) :
    """Builds a policy function that always answers Up, except Exit on terminal cells

    The returned function raises KeyError for a state that is neither in
    grid_world.all_states nor in grid_world.terminals.
    """
    table = Policy()
    for cell in grid_world.all_states :
        table[cell] = Up
    #terminals come second so that they overwrite the Up entries
    for cell in grid_world.terminals :
        table[cell] = Exit

    def lookup(state) :
        return table[state]
    return lookup

def good_policy(grid_world) :
    """Builds a policy function from a hand written table for the 3x4 grid

    The table is laid out like the grid itself, one tuple per row. Every
    terminal of grid_world is then forced to Exit. In the default world (1,1)
    is a wall, so presumably its entry is never consulted.
    NOTE(review): the AIMA optimal policy for R(s) = -0.04 appears to use Left
    at (2,2) while this table uses Up - confirm whether that is intentional.
    """
    layout = (
        (Right, Right, Right, Exit),
        (Up,    Right, Up,    Exit),
        (Up,    Left,  Up,    Left),
    )
    table = Policy()
    for r, row in enumerate(layout) :
        for c, action in enumerate(row) :
            table[(r,c)] = action
    for cell in grid_world.terminals :
        table[cell] = Exit

    def lookup(state) :
        return table[state]
    return lookup

In [0]:
def run(grid_world, state, policy, max_steps=None) :
    """runs one episode and returns the list of discounted rewards, one per step

    grid_world : provides .gamma and .move(state, action) -> (new_state, reward, exited)
    state      : the state the episode starts from
    policy     : function state -> action
    max_steps  : optional cap on the number of steps; None (the default) keeps
                 going until the world reports the exit, which may never
                 happen for a policy that cannot reach a terminal.

    Entry t of the result is reward_t * gamma**t; sum() the list to get the
    discounted return of the episode.
    """
    rewards = []
    gamma = grid_world.gamma
    time_step = 0
    while max_steps is None or time_step < max_steps :
        action = policy(state)
        state, r, exited = grid_world.move(state, action)
        rewards.append(r * (gamma**time_step) ) #the further we go down, the less we value the reward
        if exited :
            break

        time_step += 1
    return rewards


def expected_utility(grid_world, state, policy, N=100) :
    """averages the discounted return of N episodes that all start from state

    Each episode is played with run(); its return is the sum of the discounted
    rewards that run() gives back.
    """
    #play all the episodes first, keeping one return per episode
    episode_returns = [ sum( run(grid_world, state, policy) ) for _ in range(N) ]

    total = 0.0
    for episode_return in episode_returns :
        total += episode_return
    return total/N

In [22]:
#page  651; AIMA Book
#The utilities of the states in the 4 × 3 world, calculated with γ = 1 and
#R(s) = − 0.04 for nonterminal states

#undiscounted world with a small cost per move
grid_world = GridWorld(living_reward=-0.04, gamma=1.0)

#one estimate per non wall cell, in row major order
for cell in grid_world.all_states :
    print(expected_utility(grid_world, cell, good_policy(grid_world), N=10000))

0.8059440000001052
0.8696080000000642
0.9149199999999595
1.0
0.7589440000001115
0.6405880000000638
-1.0
0.7086120000001105
0.6574880000000892
0.5939000000000813
0.37200800000003836
