<a href="https://colab.research.google.com/github/vbipin/aip/blob/master/mdp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#we plan to implement some of the algorithms related to MDPs and RL
#MDP study
#%matplotlib inline
#import matplotlib
#import numpy as np
#import matplotlib.pyplot as plt

#I am trying to avoid the numpy dependencies

import random
#
#We plan to implement the gridworld class 
#


In [99]:
#Let us have a gridworld
#ref: Chapter 17, Artificial Intelligence a Modern Approach
#ref: CS188 https://inst.eecs.berkeley.edu/~cs188/fa19/
#ref: https://inst.eecs.berkeley.edu/~cs188/fa19/assets/slides/lec8.pdf
#ref: https://courses.cs.washington.edu/courses/cse473/13au/slides/17-mdp-rl.pdf

#This class will create a 2D grid of row x colums 
#Some of the cells can be disabled by putting it into walls
#cells are addressed just like 2d arrays (r,c)
#There are possibly many terminal states
#terminal states have only one action available: Exit 
#Transistion is as per the book 80% action and 20%sideways ( a variable noise is used to control this distribution)
#There is a special end state, (-1,-1), from which NO action is available. This state is used as a final state.

#Actions #just some alias
Up    = 0
Down  = 1
Right = 2
Left  = 3
Exit  = 4

class GridWorld :
    #Default is as given in the AIMA book
    def __init__(self, 
                 rows    =3, 
                 columns =4, 
                 walls   =[(1,1)], terminals= {(0,3):+1.0, (1,3):-1.0}, 
                 gamma   =1.0, 
                 living_reward=0,
                 noise   =0.2
                ) :
        """We dont expect these parameters to change during the agent run"""
        self.rows      = rows
        self.columns   = columns
        self.N         = rows * columns #total cells
        self.walls     = walls
        self.terminals = terminals #dictionary of terminal celss and their rewards.
        self.gamma     = gamma
        self.living_reward = living_reward
        self.all_actions   = [ Up, Down, Right, Left, Exit ]
        self.end_state     = (-1, -1) #a dummy state to reach after taking Exit
        self.all_states    = [(r,c) for r in range(rows) for c in range(columns) if (r,c) not in walls ] + [self.end_state]
        self.noise         = noise
        
        
        #transitions from each state and the probabilities
        self.noise                = noise
        self.action_transitions   = { 
            Up:   ([Up,    Left, Right], [1-noise, noise/2, noise/2 ]),
            Down: ([Down,  Left, Right], [1-noise, noise/2, noise/2 ]),
            Left: ([Left,  Up,   Down ], [1-noise, noise/2, noise/2 ]),
            Right:([Right, Up,   Down ], [1-noise, noise/2, noise/2 ]),
            Exit :([Exit], [1.0])
        }
    
    def actions(self, state) :
        """returns all valid actions from the current state"""
        if state in self.terminals :
            return [Exit]
        if state == self.end_state :
            return [] #No action available.
        return [ Up, Down, Right, Left ]
    
    def reward(self, state, action, next_state=None) :
        """reward is the instantaneous reward. It is usually R(s,a,s')"""
        #In grid world the reward depends only on state.
        if state in self.terminals :
            return self.terminals[state] #dict has the terminal values +1 or -1
        if state == self.end_state :
            return 0.0
        return self.living_reward        #usually a small -ve value
    
    def transitions(self, state, action) :
        """returna list of tuple(nextstate, action, probability)"""
        actual_actions, probs = self.action_transitions[action]
        return [ self._next_cell(state, a) for a in actual_actions ], actual_actions, probs
    
    def move(self, state, action) :
        """Take the action and return the tuple(new_state, reward, is_terminal)"""                          
        assert action in self.actions(state) #just a check if this is a valid action at this time or not
        
        cells, actions, p = self.transitions(state, action)
        
        #we choose one cell acccording to probabilities
        new_state   = random.choices(cells, weights=p)[0] #only one; we take index 0                
        reward      = self.reward(state, action) #
        
        is_terminal = False
        if new_state == self.end_state :
            is_terminal = True
            
        return new_state, reward, is_terminal #keep the same for mat as OpenAI gym.
    
    def _next_cell(self, state, action) : 
        """Blindly takes the action without checking anything and returns the position"""
        r,c = state #row & column
        if action == Exit :
            return self.end_state
        if action == Up :
            target = r-1, c  
        if action == Down :
            target = r+1, c
        if action == Right :
            target = r, c+1  
        if action == Left :
            target = r, c-1 
        
        if self._valid_cell(target) :
            return target
        return state #stay put the target is invalid.
    
    def _valid_cell(self, cell) :
        """Returns true if the cell is a valid cell"""
        r, c = cell #this may be an illegal node; we need to check
        
        #is it any of the walls?
        if (r,c) in self.walls :
            return False
        
        #is it outside the grid?
        if r < 0 or r >= self.rows or c < 0 or c >= self.columns :
            return False
        
        return True
    
    #pretty print the grid and agent if given.
    def print(self, agent_state=None) :
        for r in range(self.rows) :
            for c in range(self.columns) :
                cell = (r,c)
                if cell in self.walls :
                    print('# ', end='')
                elif cell in self.terminals :
                    if self.terminals[cell] > 0 :
                        print('+', end=' ')
                    else :
                        print('-', end=' ')
                elif cell == agent_state :
                    print('@ ', end='')
                else :
                    print('. ', end='')
            print("")

In [100]:
grid_world=GridWorld(gamma=1.0, living_reward=-0.04)

In [124]:
v={s:0 for s in grid_world.all_states}
v.update({(0,3):1})
v.update({(1,3):-1})
#v.update({(0,2):0.76})

In [125]:
v

{(0, 0): 0,
 (0, 1): 0,
 (0, 2): 0,
 (0, 3): 1,
 (1, 0): 0,
 (1, 2): 0,
 (1, 3): -1,
 (2, 0): 0,
 (2, 1): 0,
 (2, 2): 0,
 (2, 3): 0,
 (-1, -1): 0}

In [139]:
for i in range(100):
    for s in grid_world.all_states:   
        result=[]
        for ac in grid_world.actions(s):
            sd,noneed,p=grid_world.transitions(s,ac)
            yy=0
            for i in range(len(sd)):
                #r=grid_world.reward(s,ac,sd[i])
                yy=yy+(p[i]*((grid_world.reward(s,ac,sd[i]))+v[sd[i]]))
            result.append(yy)
            v.update({s:max(result)})



In [140]:
v

{(0, 0): 0.7927010181242994,
 (0, 1): 0.8531948452847933,
 (0, 2): 0.9132068898496833,
 (0, 3): 1.0,
 (1, 0): 0.7427010181242992,
 (1, 2): 0.6561839020886074,
 (1, 3): -1.0,
 (2, 0): 0.6852554037060499,
 (2, 1): 0.6256904883600551,
 (2, 2): 0.5835289591742714,
 (2, 3): 0.26065612266773786,
 (-1, -1): 0}

In [134]:
from Tkinter import Tk, Entry, mainloop, StringVar

root = Tk()

height = 5
width = 5
for i in range(height):  # Rows
    for j in range(width):  # Columns
        text_var = StringVar()
        # here we are setting cell text value
        text_var.set('%s%s' % (i, j)) 
        b = Entry(root, textvariable=text_var)
        b.grid(row=i, column=j)
mainloop()

ModuleNotFoundError: No module named 'Tkinter'

In [None]:
v={}
for i in grid_world.all_states:
    print(i)
    key=i
    v.update({key:0})

In [None]:
a=2
b=4
c=5
max([a,b,c])

In [None]:
q=grid_world.transitions((0,3),2)
q[0][0]

In [None]:
def expected_value(sd,a):
    aa=grid_world.transitions(sd,a)
    nxt=aa[0][0]
    while True:    
        if nxt == (-1,-1):
            print("reached_terminal")
            break
        sd1=aa[0][0]
        p1=aa[2][0]
        r1=grid_world.reward(sd,a,sd1)
        sd2=aa[0][1]
        p2=aa[2][1]
        r2=grid_world.reward(sd,a,sd2)
        sd3=aa[0][2]
        p3=aa[2][2]
        r3=grid_world.reward(sd,a,sd3)
        p=p1*r1+p2*r2+p3*r3
        vd1=p(r1+v[sd1])#messed up
        
        return 

In [49]:
s=(0,0)
for s in grid_world.all_states:
    print("state",s)
    

state (0, 0)
state (0, 1)
state (0, 2)
state (0, 3)
state (1, 0)
state (1, 2)
state (1, 3)
state (2, 0)
state (2, 1)
state (2, 2)
state (2, 3)
state (-1, -1)


In [None]:
vds=prob[reward+v[s]]

In [None]:
policy={(0,0):Right,(0,1):Right,(0,2):Right,(0,3):Exit,(1,0):Up,(1,2):Up,(1,3):Exit,
        (1,4):Exit,(2,0):Up,(2,1):Right,(2,2):Up,(2,3):Left}

In [None]:
policy={}
for s in grid_world.all_states:
    if s== (-1,-1):
        break
    policy.update({s:Up})  
    #dicti.update({key:temp})
    print(policy)
policy.update({(0,3):Exit}) 
print("policy=",policy)

In [None]:
nxt=(2,0)
reward=0
iter=0
while True:    
    if nxt == (-1,-1):
        break
    action=policy[nxt]
    a=grid_world.move(nxt,action)
    nxt=a[0]
    reward+=a[1]
    iter+=1
    print("iteration=",iter,"action=",action,"next=",nxt,"reward=",reward)

In [None]:
############# I might change the API later ##########


In [None]:
class Policy :
    def __init__(self, grid_world=None) :
        """Holds one policy and returns actions according to it"""
        self.grid_world = grid_world
        self.policy     = { } #{ state: policy_action}
        
    def __getitem__(self, state) :
        return self.policy[state]
    
    def __setitem__(self, state, action) :
        self.policy[state] = action
        
    def print(self) :
        print_chars = {Up:'^', Down:'v', Right:'>', Left:'<', Exit:'+'}
        for state in [(r,c) for r in range(grid_world.rows) for c in range(grid_world.columns)]:
            if state not in self.policy :
                print('#', end=' ')
            else : 
                print(print_chars[self.policy[state]], end=' ')
            if (state[1]+1) % grid_world.columns == 0 :
                print("") #just a newline

In [None]:
###### Some test code. 

In [None]:
def random_policy(grid_world) :
    """returns a random action function"""
    def f(state) :
        return random.choice(grid_world.actions(state))
    return f

def fixed_policy(grid_world) :
    p = Policy(grid_world)
    p.policy = {state: Up   for state in grid_world.all_states }
    p.policy.update({state:Exit for state in grid_world.terminals})
    #print(p.policy)
    def f(state) :
        return p[state]
    return f

def good_policy(grid_world) :
    p = Policy(grid_world)
    p.policy = {
        (-1, -1) : Exit, #just to avoid some extra checking code
        (0,0):Right, (0,1): Right, (0,2): Right, (0,3) : Exit,
        (1,0):Up,    (1,1): Right, (1,2): Up,    (1,3) : Exit,
        (2,0):Up,    (2,1): Left,  (2,2): Up,    (2,3) : Left,
               }
    p.policy.update({state:Exit for state in grid_world.terminals})
    #print(p.policy)
    def f(state) :
        return p[state]
    return f
def my_policy(grid_world) :
    p = Policy(grid_world)
    p.policy = {
        (-1, -1) : Exit, #just to avoid some extra checking code
        (0,0):Up, (0,1): Up, (0,2): Up, (0,3) : Exit,
        (1,0):Up,    (1,1): Up, (1,2): Up,    (1,3) : Exit,
        (2,0):Up,    (2,1): Up,  (2,2): Up,    (2,3) : Up,
               }
    p.policy.update({state:Exit for state in grid_world.terminals})
    #print(p.policy)
    def f(state) :
        return p[state]
    return f
    
def run(grid_world, state, policy=None) :
    """runs a full episode and return the total reward"""
    rewards = []
    gamma = grid_world.gamma
    
    time_step = 0
    while True :
        action = policy(state)
        #a.print()
        #print(action)
        state, r, exited = grid_world.move(state, action)
        rewards.append(r * (gamma**time_step) ) #the further we go down, the less we value the reward
        if exited :
            break    
        time_step += 1
    return rewards


def expected_utility(grid_world, state, policy, N=100) :
    """run the policy till completion several times and return the expected utility"""
    s = 0.0
    for _ in range(N) :
        #from the same start state we run till completion, N times
        s += sum( run(grid_world, state, policy) )
    return s/N

In [None]:
#page  651; AIMA Book
#The utilities of the states in the 4 × 3 world, calculated with γ = 1 and
#R(s) = − 0.04 for nonterminal states

grid_world = GridWorld(gamma=1.0, living_reward=-0.04)

for cell in grid_world.all_states :
    print (expected_utility(grid_world, cell, good_policy(grid_world), 1000))

In [None]:
def qvalue(grid_world, state, action, V) :
    """returns the Q value of the state action pair"""
    #  SUM [  P(s' | s, a) * V(s2) ] of all s' from (s,a)
    next_states, actions, p = grid_world.transitions(state, action)
    
    values = [ p[i] * V[s] for i,s in enumerate(next_states) ]
    #print(values)
    #print(sum(values))
    return sum( values )

def max_qvalue(grid_world, state, V) :
    """returns the maximum of q values and its action"""
    q = [ (qvalue(grid_world, state, action, V), action) for action in grid_world.actions(state) ]
    #print(q)
    return max(q)

def value_iteration(grid_world) :
    states = grid_world.all_states
    gamma  = grid_world.gamma
    #epsilon = 0.0001 * (1-gamma)/gamma
    epsilon = 1e-7
    
    #initialize to 0
    U = { s: 0 for s in states }
    V = { s: 0 for s in states }
    
    #difference between U and V. if it is less than epsilon, we are good enough
    def diff(U,V) :
        return sum([abs(U[s] - V[s]) for s in states])
            
    for count in range(1000) :
        for state in states :
            qmax, qaction = max_qvalue(grid_world, state, U)
            #print(q)
            V[state] = grid_world.reward(state, None) + gamma * qmax
            
            #print(U)
            #print(V)
        if diff(U, V) < epsilon : #we are not improving much. Converged?
            break
                
        #print (diff(U, V), epsilon)
        U.update(V)
    print(count)
    return U
 
def policy_from_value(grid_world, V) :
    p = Policy(grid_world)
    for state in grid_world.all_states :
        qmax, qaction = max_qvalue(grid_world, state, V)
        p[state] = qaction
    return p

In [None]:
grid_world = GridWorld(gamma=0.9, living_reward=-0.04)

In [None]:
V = value_iteration(grid_world)

In [None]:
p = policy_from_value(grid_world, V)

In [None]:
p.print()