## Gridworld A

We experiment with policy and value iteration algorithms on the simple gridworld problem.

Gridworld is a closed 4x4 grid in which an agent may move in any cardinal direction. The NW and SE corners are terminal states from which no further transitions are allowed. The goal of the agent is to learn a policy that maximizes the expected discounted reward at each step. The reward function returns -1 for every action taken from a non-terminal state.
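
For reference, the discounted return that the agent maximizes in expectation can be written in the standard textbook form below. The time index $t$ and the discount factor $\gamma$ are not defined at this point in the notebook (`gamma` is only set in later cells), so treat the symbols as assumptions:

$$G_t = \sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1}, \qquad 0 \le \gamma \le 1$$

With a reward of $-1$ per step, maximizing $G_t$ amounts to reaching a terminal state in as few moves as possible.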

In [2]:
import numpy as np
import random

To facilitate transitions and later computation, each state is represented internally as a pair of $(i,j)$ coordinates in a NumPy `np.array`. Externally, a state is just an integer from 0 to 15.
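
The mapping between the two representations is row-major. A minimal sketch of the round trip follows. It assumes two helper names, `to_coords` and `to_index`, that are not part of the notebook, and it uses plain tuples instead of NumPy arrays to stay dependency-free:

```python
world_size = 4  # same grid width as the notebook

def to_coords(index, size=world_size):
    """Hypothetical helper: integer state -> (row, column) tuple."""
    return divmod(index, size)

def to_index(coords, size=world_size):
    """Hypothetical helper: (row, column) tuple -> integer state."""
    row, col = coords
    return row * size + col

print(to_coords(7))             # row 1, column 3
print(to_index(to_coords(7)))   # back to 7
```

The notebook's `states` dictionary stores the same coordinates as arrays so that an action vector can be added to them directly.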

In [3]:
world_size = 4
terminal_states = (0, world_size**2 - 1)
states = {i: np.array([i // world_size, i % world_size]) for i in range(world_size * world_size)}

In [4]:
actions = {'up':np.array([-1,0]),
           'down':np.array([1,0]),
           'right':np.array([0,1]),
           'left':np.array([0,-1])}

In [5]:
arrows = {'left':"🠈",
          'right':"🠊",
          'up':"🠉",
          'down':"🠋",
          'none':" "}

In [6]:
def transition(state, action):
    """ Return the new state resulting from applying action to state.
        If action is invalid or if state is terminal, return the input state """
    if state in terminal_states:
        return state
    temp = states[state] + actions[action]
    # Valid move: both coordinates stay inside the grid
    if np.all(temp >= 0) and np.all(temp < world_size):
        return temp[0]*world_size + temp[1]
    # Invalid move: the agent stays where it is
    return state

In [7]:
# Quick unit test
transition(3,'up'), transition(3,'down'), transition(8,'left')

(3, 7, 8)

In [8]:
def reward(state, action):
    """ Return -1 for all actions from all states """
    return -1

The global `state_actions` maps states to valid actions from that state. There are no actions available from a terminal state.

In [9]:
state_actions = dict()

for s in states.keys():
    children = set()
    for a in actions.keys():
        if transition(s,a) != s:
            children.add(a)
    state_actions.update({ s: children } )

In [10]:
state_actions

{0: set(),
 1: {'down', 'left', 'right'},
 2: {'down', 'left', 'right'},
 3: {'down', 'left'},
 4: {'down', 'right', 'up'},
 5: {'down', 'left', 'right', 'up'},
 6: {'down', 'left', 'right', 'up'},
 7: {'down', 'left', 'up'},
 8: {'down', 'right', 'up'},
 9: {'down', 'left', 'right', 'up'},
 10: {'down', 'left', 'right', 'up'},
 11: {'down', 'left', 'up'},
 12: {'right', 'up'},
 13: {'left', 'right', 'up'},
 14: {'left', 'right', 'up'},
 15: set()}

The global policy `pi` ($\pi$) starts out as the uniform policy: from each state, every valid action is taken with equal probability.
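
A minimal sketch of this construction on a toy subset of `state_actions` is shown below. The names `toy_state_actions`, `uniform_policy`, and `toy_pi` are illustrative and do not appear in the notebook. The sketch also checks that the probabilities of each non-terminal state sum to 1:

```python
# Toy subset of the state_actions mapping (entries copied from the output above).
toy_state_actions = {
    0: set(),                              # terminal: no actions
    3: {'down', 'left'},
    5: {'down', 'left', 'right', 'up'},
}

def uniform_policy(state_actions):
    """Give probability 1/n to each of the n valid actions of every state."""
    return {(s, a): 1 / len(acts)
            for s, acts in state_actions.items()
            for a in acts}

toy_pi = uniform_policy(toy_state_actions)

# Sanity check: probabilities of every non-terminal state sum to 1.
for s, acts in toy_state_actions.items():
    if acts:
        assert abs(sum(toy_pi[(s, a)] for a in acts) - 1.0) < 1e-12

print(toy_pi[(3, 'down')], toy_pi[(5, 'up')])
```

Terminal states contribute no entries, because their action sets are empty.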

In [17]:
pi = dict()
for s in states:
    children = state_actions[s]
    for a in children:
        pi.update( { (s,a) : 1/len(children) } )

Initialize the transition probabilities. Following the RL textbook, $P(s',r \mid s,a)$ is the defining probability in all problems: given an agent in state $s$ that takes action $a$, it is the probability of transitioning to state $s'$ and receiving reward $r$. For gridworld, the environment is entirely deterministic: action $a$ from state $s$ always leads to the same $s'$ and returns reward $-1$. The dictionary `p` therefore contains all valid transitions and maps each of them to the probability $p=1$.
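
Written out, the deterministic dynamics reduce to an indicator function. The symbol $T(s,a)$ is shorthand introduced here for the successor returned by `transition(s, a)`; it is not notation used elsewhere in the notebook:

$$p(s', r \mid s, a) = \begin{cases} 1 & \text{if } s' = T(s,a) \text{ and } r = -1 \\ 0 & \text{otherwise} \end{cases}$$

Only the entries with probability 1 need to be stored, which is what the dictionary does.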

In [18]:
p = dict()

In [19]:
for s in states:
    children = state_actions[s]
    for a in children:
        next_s = transition(s, a)
        r = reward(s, a)
        p.update( { (s, a, next_s, r) : 1 } )

## 4.1 Iterative Policy Evaluation Algorithm

Following the algorithm on p.75 of the RL text, we iteratively evaluate the value of policy `pi` = $\pi_U$, the uniform policy.
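
For reference, the update applied to each state during a sweep is the Bellman expectation backup, stated here in its standard textbook form:

$$V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$

Because every $(s,a)$ pair in gridworld has a single successor with probability 1, the inner sum collapses to one term, so a single lookup in `p` per action is enough. Sweeps stop once the largest change in any $V(s)$ falls below a small threshold.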

In [21]:
V = { s:0.0 for s in states }
gamma = 0.99
count = 1
while count < 50:
    delta = 0
    for s in states:
        v = V[s]
        new_v = 0
        for a in state_actions[s]:
            r = reward(s, a)
            new_state = transition(s, a)
            new_v  += pi[(s,a)]*p[(s,a,new_state,r)]*(r + gamma*V[new_state])
        V[s] = new_v
        delta = max(delta, abs(v - V[s]))
    if delta < 0.01:
        break
    count += 1

Now compute the greedy policy with respect to `V`, stored in `optimal`: each state maps to the reachable neighbor with the largest value.

In [22]:
optimal = dict()
for s in states:
    children = state_actions[s]
    children_states = [transition(s,a) for a in children]
    if (len(children_states) == 0):
        continue
    best_child = max(children_states, key=lambda x: V[x])
    optimal.update({s : best_child} )
optimal.update({ s: s for s in terminal_states })

In [23]:
for i in range(world_size):
    for j in range(world_size):
        print (f"{V[i*world_size + j]:8.2f}", end=' ')
    print()

    0.00    -9.67   -13.58   -14.44 
   -9.67   -12.72   -14.02   -13.59 
  -13.58   -14.02   -12.73    -9.69 
  -14.44   -13.59    -9.69     0.00 


In [368]:
for i in range(world_size):
    for j in range(world_size):
        print (f"{optimal[i*world_size + j]:4}", end=' ')
    print()

   0    0    1    2 
   0    4    5   11 
   4    5   11   15 
   8   14   15   15 
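
The table above lists successor states rather than moves. A minimal sketch, assuming a helper named `direction` that is not part of the notebook, translates each successor back into a move name by looking at the index difference:

```python
world_size = 4

def direction(s, next_s, size=world_size):
    """Hypothetical helper: name the move that takes state s to next_s.

    Assumes next_s is s itself or a valid grid neighbor of s, so an index
    difference of +1 or -1 never wraps around a row boundary.
    """
    moves = {-size: 'up', size: 'down', 1: 'right', -1: 'left', 0: 'none'}
    return moves[next_s - s]

# Successor table copied from the printed output above.
successors = [0,  0,  1,  2,
              0,  4,  5, 11,
              4,  5, 11, 15,
              8, 14, 15, 15]

for row in range(world_size):
    cells = range(row * world_size, (row + 1) * world_size)
    print(' '.join(f"{direction(s, successors[s]):>5}" for s in cells))
```

Where several neighbors share the largest value, `max` keeps only one of them, so the printed policy is one of several equally greedy choices.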


## Policy Iteration

In [369]:
V = dict( {s : random.random() for s in states} )
V.update ( {s : 0 for s in terminal_states } )
pi = dict( {s : random.choice(list(state_actions[s])) for s in states if len(state_actions[s]) > 0} )
pi.update( { s: None for s in terminal_states} )

In [382]:
count = 0      # number of policy-improvement rounds
gamma = 0.4
done = False

while not done:
    # Policy evaluation: sweep until the values of the current policy settle
    while True:
        delta = 0
        for s in states:
            v = V[s]
            if pi[s]:
                next_s = transition(s, pi[s])
                r = reward(s, pi[s])
                # Deterministic policy and dynamics, so the backup has one term
                V[s] = r + gamma * V[next_s]
                delta = max(delta, abs(v - V[s]))
        if delta < 1e-6:
            break
    # Policy improvement: act greedily with respect to the updated values
    done = True
    count += 1
    optimal = dict()
    for s in states:
        if s not in terminal_states:
            best_action = max(state_actions[s],
                              key=lambda a: reward(s, a) + gamma * V[transition(s, a)])
            optimal[s] = best_action
            if optimal[s] != pi[s]:
                done = False
                pi[s] = optimal[s]
optimal.update({ s: 'none' for s in terminal_states })


In [383]:
terminal_states

(0, 15)

In [389]:
for i in range(world_size):
    for j in range(world_size):
        print (f"{arrows[optimal[i*world_size + j]]:2}", end=' ')
    print()

   🠈  🠈  🠈  
🠉  🠈  🠊  🠋  
🠉  🠊  🠊  🠋  
🠊  🠊  🠊     


In [390]:
for i in range(world_size):
    for j in range(world_size):
        print (f"{V[i*world_size + j]:6}", end=' ')
    print()

     0   -1.0   -1.4  -1.56 
  -1.0   -1.4  -1.56   -1.4 
  -1.4  -1.56   -1.4   -1.0 
 -1.56   -1.4   -1.0      0 
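
As a sanity check on the table above: under a shortest-path policy, a state $d$ steps from the nearest terminal collects $d$ rewards of $-1$, so its value should be $-\sum_{k=0}^{d-1}\gamma^{k}$. A minimal sketch, with helper names that are illustrative and not taken from the notebook, recomputes the grid for $\gamma = 0.4$:

```python
world_size = 4
gamma = 0.4
terminals = [(0, 0), (world_size - 1, world_size - 1)]

def steps_to_terminal(row, col):
    """Manhattan distance from (row, col) to the nearest terminal corner."""
    return min(abs(row - tr) + abs(col - tc) for tr, tc in terminals)

def shortest_path_value(d, gamma=gamma):
    """Discounted sum of d consecutive rewards of -1."""
    return sum(-(gamma ** k) for k in range(d))

table = [[round(shortest_path_value(steps_to_terminal(i, j)), 2)
          for j in range(world_size)]
         for i in range(world_size)]

for row in table:
    print(row)
```

The three distinct nonzero values this produces, for distances 1, 2, and 3, match the entries -1.0, -1.4, and -1.56 printed by the notebook.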
