### gridworld 4*4


    This is a WindyGrid World environment.
    You are an agent on an MxN grid and your goal is to reach the terminal
    state at the top left or the bottom right corner.
    

    For example, a 4x4 grid looks as follows:

    T  o  o  o
    o  x  o  o
    o  o  o  o
    o  o  o  T

    x is your position and T are the two terminal states.

    You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
    Actions going off the edge leave you in your current state.
    You receive a reward of -1 at each step until you reach a terminal state.
    Note: the wind is blowing, so you won't always move in the direction you intend.
    Note: you don't need to use the AMAlearn package in this hands-on session.
    

### Getting Started

Run the following code and observe the result.

In [1]:
import random
import pprint

In [2]:
import io
import numpy as np
import sys
from gym.envs.toy_text import discrete

# Integer codes of the four actions; they are used as the action keys of the
# transition table P[s][a] built by the environment below.
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

class WindyGridworldEnv(discrete.DiscreteEnv):
    """
    Windy Grid World environment.

    You are an agent on an MxN grid and your goal is to reach the terminal
    state at the top left or the bottom right corner.

    For example, a 4x4 grid looks as follows:

    T  o  o  o
    o  x  o  o
    o  o  o  o
    o  o  o  T

    x is your position and T are the two terminal states.

    You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
    Actions going off the edge leave you in your current state.
    You receive a reward of -1 at each step until you reach a terminal state.

    The wind makes some actions stochastic:

    * UP:    0.8 -> move up,    0.2 -> pushed right instead
    * RIGHT: 1.0 -> move right
    * DOWN:  0.8 -> move down,  0.2 -> pushed right instead
    * LEFT:  0.8 -> move left,  0.2 -> stay in place

    States are numbered row by row, so state ``s`` sits at row ``s // MAX_X``
    and column ``s % MAX_X``.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, shape=[4,4]):
        """Build the transition table for a grid of the given shape.

        Args:
            shape: list/tuple ``[rows, columns]`` of the grid.

        Raises:
            ValueError: if ``shape`` is not a list/tuple of length 2.
        """
        if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
            raise ValueError('shape argument must be a list/tuple of length 2')

        self.shape = shape

        nS = np.prod(shape)
        nA = 4

        MAX_Y = shape[0]
        MAX_X = shape[1]

        def is_done(state):
            # The two terminal states are the first and the last cell.
            return state == 0 or state == (nS - 1)

        def move(prob, next_state):
            # One possible outcome of acting in a non-terminal state. The done
            # flag must describe the state actually reached, not the state the
            # agent was aiming for.
            return (prob, next_state, -1.0, is_done(next_state))

        # P[s][a] = [(prob, next_state, reward, is_done), ...]
        P = {}
        grid = np.arange(nS).reshape(shape)
        it = np.nditer(grid, flags=['multi_index'])

        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            if is_done(s):
                # We're stuck in a terminal state: every action self-loops
                # with zero reward.
                P[s] = {a: [(1.0, s, 0.0, True)] for a in range(nA)}
            else:
                # Moves that would leave the grid keep the agent in place.
                ns_up = s if y == 0 else s - MAX_X
                ns_right = s if x == (MAX_X - 1) else s + 1
                ns_down = s if y == (MAX_Y - 1) else s + MAX_X
                ns_left = s if x == 0 else s - 1

                P[s] = {
                    UP: [move(0.8, ns_up), move(0.2, ns_right)],
                    RIGHT: [move(1.0, ns_right)],
                    DOWN: [move(0.8, ns_down), move(0.2, ns_right)],
                    LEFT: [move(0.8, ns_left), move(0.2, s)],
                }

            it.iternext()

        # Initial state distribution is uniform
        isd = np.ones(nS) / nS

        # We expose the model of the environment for educational purposes
        # This should not be used in any model-free learning algorithm
        self.P = P

        super(WindyGridworldEnv, self).__init__(nS, nA, P, isd)

    def _render(self, mode='human', close=False):
        """ Renders the current gridworld layout

         For example, a 4x4 grid with the mode="human" looks like:
            T  o  o  o
            o  x  o  o
            o  o  o  o
            o  o  o  T
        where x is your position and T are the two terminal states.

        Args:
            mode: 'human' writes to stdout; 'ansi' writes to an in-memory
                text buffer.
            close: if True, nothing is rendered.

        Returns:
            The ``io.StringIO`` buffer holding the rendering when
            ``mode == 'ansi'``, otherwise None.
        """
        if close:
            return

        outfile = io.StringIO() if mode == 'ansi' else sys.stdout

        grid = np.arange(self.nS).reshape(self.shape)
        it = np.nditer(grid, flags=['multi_index'])
        while not it.finished:
            s = it.iterindex
            _, x = it.multi_index

            if self.s == s:
                output = " x "
            elif s == 0 or s == self.nS - 1:
                output = " T "
            else:
                output = " o "

            # Trim the padding on the outer edges of each row.
            if x == 0:
                output = output.lstrip()
            if x == self.shape[1] - 1:
                output = output.rstrip()

            outfile.write(output)

            if x == self.shape[1] - 1:
                outfile.write("\n")

            it.iternext()

        # Without this the 'ansi' rendering would be built and then discarded.
        if mode == 'ansi':
            return outfile


In [3]:
#run WindyGridworld.ipynb

# Create the environment and draw an initial state.
env = WindyGridworldEnv()
env.reset()

# Random walk: show the grid, then take one uniformly sampled action and
# print the info dict returned by the step.
for _step in range(10):
    env._render()
    random_action = env.action_space.sample()
    state, reward, done, info = env.step(random_action)
    print(info)
env.close()

T  o  o  o
o  o  o  o
o  x  o  o
o  o  o  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
o  o  o  o
o  x  o  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
o  o  o  o
x  o  o  T
{'prob': 1.0}
T  o  o  o
o  o  o  o
o  o  o  o
o  x  o  T
{'prob': 1.0}
T  o  o  o
o  o  o  o
o  o  o  o
o  o  x  T
{'prob': 0.2}
T  o  o  o
o  o  o  o
o  o  o  o
o  o  x  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
o  o  x  o
o  o  o  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
o  x  o  o
o  o  o  T
{'prob': 0.8}
T  o  o  o
o  o  o  o
x  o  o  o
o  o  o  T
{'prob': 0.8}


What are the action space and the state space?

In [4]:
# Show the gym space objects that describe the valid actions and the
# observable states of the environment.
print(env.action_space)
print(env.observation_space)


Discrete(4)
Discrete(16)




---

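The action space is the set of actions we can take in each state; `Discrete(4)` means there are four actions (UP, RIGHT, DOWN, LEFT).

The state space is the set of states the agent can occupy; `Discrete(16)` means there is one state per cell of the 4x4 grid.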

---



In [5]:
# Number of actions
print(env.nA)

# Number of states
print(env.nS)

4
16


What does the following code indicate?

In [6]:
# Possible outcomes of taking action 2 (DOWN) in state 7, as a list of
# (prob, next_state, reward, is_done) tuples.
print(env.P[7][2])

[(0.8, 11, -1.0, False), (0.2, 7, -1.0, False)]




---


env.P holds the transition model of the environment. env.P[7] contains the transitions available from state 7 (the 8th state, since states are numbered from 0).
env.P[7][2] lists the possible outcomes of choosing action 2 (DOWN) in that state.

Each outcome is a tuple (probability, next state, reward, done).

The reward is 0.0 if we are already stuck in a terminal state; otherwise every step yields a reward of -1.0. The "done" flag is meant to be True when the transition ends the episode, i.e. when it lands in a terminal state.


---



NOTE: I implemented the given functions as methods inside my classes :)

Find the best policy using the Value Iteration algorithm.

In [7]:
import numpy as np

class ValueIteration(object):
    """Solve a tabular MDP with the value iteration algorithm.

    The environment must expose ``nS`` (number of states), ``nA`` (number of
    actions) and ``P``, where ``P[s][a]`` is a list of
    ``(prob, next_state, reward, done)`` tuples.

    Typical use: call ``value_iteration()`` to compute the state values and
    then ``find_policy()`` to extract and print the greedy policy.
    """

    def __init__(self, env, discount_factor, theta):
        """
        Args:
            env: environment exposing ``nS``, ``nA`` and ``P``.
            discount_factor: gamma discount factor.
            theta: the value sweeps stop once the largest change of any state
                value in a sweep is below this threshold.
        """
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.states_value = np.zeros(self.env.nS)
        self.policy = np.zeros([self.env.nS, self.env.nA])
        self.best_actions = []
        # Not used by this class; kept so existing attribute access still works.
        self.end_of_ep = False

    def find_action_value(self, curr_state):
        """Return a length-``nA`` vector with the expected value of each action.

        The expectation is a one-step lookahead over the transition model of
        this instance's own environment, using the current state values.
        """
        action_values = np.zeros(self.env.nA)
        for action in range(self.env.nA):
            for prob, next_state, reward, _done in self.env.P[curr_state][action]:
                action_values[action] += prob * (
                    reward + self.discount_factor * self.states_value[next_state]
                )
        return action_values

    def find_best_value(self, curr_state):
        """Return the value of the best action available in ``curr_state``."""
        return np.max(self.find_action_value(curr_state))

    def find_best_action(self, curr_state):
        """Return the greedy action in ``curr_state`` (first one wins on ties)."""
        return np.argmax(self.find_action_value(curr_state))

    def value_iteration(self):
        """Sweep over all states, updating values in place, until convergence."""
        while True:
            delta = 0
            for state in range(self.env.nS):
                old_value = self.states_value[state]
                self.states_value[state] = self.find_best_value(state)
                delta = max(delta, np.abs(old_value - self.states_value[state]))
            if delta < self.theta:
                break

    def find_policy(self):
        """Extract the greedy policy from the current state values and print it.

        ``policy`` and ``best_actions`` are rebuilt from scratch, so calling
        this method more than once does not accumulate stale entries.
        """
        self.policy = np.zeros([self.env.nS, self.env.nA])
        self.best_actions = []
        for state in range(self.env.nS):
            best_action = self.find_best_action(state)
            self.best_actions.append(best_action)
            self.policy[state, best_action] = 1.0

        print(self.policy)

    def print_best_actions(self):
        """Print the greedy action of every state, as found by ``find_policy``."""
        print(self.best_actions)

    def print_states_value(self):
        """Print the state values rounded to two decimals."""
        print(np.round((np.array(self.states_value)),2))
        

In [8]:
# Hyper-parameters: gamma discount factor and convergence threshold.
discount_factor=1.0
theta=0.0001
# Compute the state values, then extract and print the policy matrix.
my_value_iterator = ValueIteration(env, discount_factor, theta)
my_value_iterator.value_iteration()
my_value_iterator.find_policy()

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


Print what the best action in each state is.

In [9]:
# Print the list of best actions recorded by the value iterator.
my_value_iterator.print_best_actions()

[0, 3, 3, 2, 0, 0, 2, 2, 0, 2, 2, 2, 1, 1, 1, 0]


Print the value of each state.

In [10]:
# Print the state values, rounded to two decimals.
my_value_iterator.print_states_value()

[ 0.   -1.25 -2.5  -3.75 -1.53 -2.63 -3.14 -2.5  -2.82 -3.01 -2.05 -1.25
 -3.   -2.   -1.    0.  ]


Find the best policy using the Policy Iteration algorithm.

In [11]:
import numpy as np

class PolicyIteration(object):
    """Solve a tabular MDP with the policy iteration algorithm.

    The environment must expose ``nS`` (number of states), ``nA`` (number of
    actions) and ``P``, where ``P[s][a]`` is a list of
    ``(prob, next_state, reward, done)`` tuples.

    Typical use: call ``policy_improvment()`` to iterate until the policy is
    stable; ``find_policy()`` then prints the greedy policy.
    """

    def __init__(self, env, discount_factor, theta):
        """
        Args:
            env: environment exposing ``nS``, ``nA`` and ``P``.
            discount_factor: gamma discount factor.
            theta: policy evaluation stops once the largest change of any
                state value in a sweep is below this threshold.
        """
        self.env = env
        self.discount_factor = discount_factor
        self.theta = theta
        self.states_value = np.zeros(self.env.nS)
        # Start from the uniform random policy.
        self.policy = np.ones([self.env.nS, self.env.nA]) / self.env.nA
        self.action_values = np.zeros(self.env.nA)
        # Flat trace of the greedy actions chosen by policy_improvment,
        # nS entries per improvement sweep.
        self.best_actions = []

    def _one_step_lookahead(self, curr_state):
        """Return a length-``nA`` vector with the expected value of each action."""
        action_values = np.zeros(self.env.nA)
        for action in range(self.env.nA):
            for prob, next_state, reward, _done in self.env.P[curr_state][action]:
                action_values[action] += prob * (
                    reward + self.discount_factor * self.states_value[next_state]
                )
        return action_values

    def find_action_value(self, curr_state, policy=None):
        """Return the expected value (a scalar) of ``curr_state`` under a policy.

        Args:
            curr_state: state index.
            policy: [nS, nA] matrix of action probabilities; defaults to
                ``self.policy``.
        """
        if policy is None:
            policy = self.policy
        state_value = 0
        for action, action_probability in enumerate(policy[curr_state]):
            for prob, next_state, reward, _done in self.env.P[curr_state][action]:
                state_value += action_probability * prob * (
                    reward + self.discount_factor * self.states_value[next_state]
                )
        return state_value

    def find_best_action_value(self, curr_state):
        """Re-evaluate the current policy, then return the greedy action.

        The per-action values are also stored in ``self.action_values``.
        The evaluation is repeated on every call so that policy changes made
        earlier in the same improvement sweep are reflected in the values.
        """
        self.policy_eval(self.policy)
        self.action_values = self._one_step_lookahead(curr_state)
        return np.argmax(self.action_values)

    def find_best_action(self, curr_state):
        """Return the greedy action for the current values (first wins on ties)."""
        return np.argmax(self._one_step_lookahead(curr_state))

    def policy_eval(self, policy):
        """Iteratively evaluate ``policy`` in place, starting from the current values.

        Args:
            policy: [nS, nA] matrix of action probabilities.

        Returns:
            ``self.states_value``, the value of every state under ``policy``.
        """
        while True:
            delta = 0
            for state in range(self.env.nS):
                old_value = self.states_value[state]
                new_value = self.find_action_value(state, policy)
                delta = max(delta, np.abs(old_value - new_value))
                self.states_value[state] = new_value
            if delta < self.theta:
                break
        return self.states_value

    def find_policy(self):
        """Make the policy greedy w.r.t. the current state values and print it.

        Every state is visited. Once ``policy_improvment`` has converged this
        leaves the policy unchanged, because a stable policy is already greedy.
        """
        for state in range(self.env.nS):
            best_action = self.find_best_action(state)
            self.policy[state] = np.eye(self.env.nA)[best_action]
        print(self.policy)

    def policy_improvment(self):
        """Alternate evaluation and greedy improvement until the policy is stable.

        Returns:
            The final [nS, nA] one-hot policy matrix (``self.policy``).
        """
        self.best_actions = []
        while True:
            self.policy_eval(self.policy)
            policy_stable = True
            for state in range(self.env.nS):
                chosen_action = np.argmax(self.policy[state])
                best_action = self.find_best_action_value(state)
                if chosen_action != best_action:
                    policy_stable = False
                self.policy[state] = np.eye(self.env.nA)[best_action]
                self.best_actions.append(best_action)
            if policy_stable:
                return self.policy

    def print_best_actions(self):
        """Print the greedy-action trace, one row per improvement sweep."""
        print(np.array(self.best_actions).reshape((-1, self.env.nS)))

    def print_states_value(self):
        """Print the current state values."""
        print(self.states_value)
        

policy_improvement

In [12]:
# Hyper-parameters: gamma discount factor and convergence threshold.
discount_factor=1.0
theta=0.00001
my_policy_iterator = PolicyIteration(env, discount_factor, theta)

# Iterate until the policy is stable, then print the policy matrix.
my_policy_iterator.policy_improvment()
my_policy_iterator.find_policy()

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


Print what the best action in each state is.

In [13]:
# Print the best actions recorded by the policy iterator.
my_policy_iterator.print_best_actions()

[[0 3 3 3 0 0 0 0 0 0 0 2 0 1 1 0]
 [0 3 3 3 0 0 0 2 0 2 2 2 1 1 1 0]
 [0 3 3 2 0 0 2 2 0 2 2 2 1 1 1 0]
 [0 3 3 2 0 0 2 2 0 2 2 2 1 1 1 0]]


Print the value of each state.

In [14]:
# Print the state values held by the policy iterator.
my_policy_iterator.print_states_value()

[ 0.      -1.25    -2.5     -3.75    -1.5256  -2.628   -3.14    -2.5
 -2.82248 -3.01    -2.05    -1.25    -3.      -2.      -1.       0.     ]


In [15]:
def value_iteration(env, theta=0.0001, discount_factor=1.0):
    """
    Value Iteration Algorithm.
    
    Args:
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
        
    Returns:
        A tuple (policy, V) of the optimal policy and the optimal value function.
        policy is a one-hot [S, A] matrix; ties between equally good actions
        are broken in favour of the lowest action index.
    """

    def one_step_lookahead(state, V):
        # Expected value of every action in `state` given the estimate V.
        action_values = np.zeros(env.nA)
        for action in range(env.nA):
            for prob, next_state, reward, _done in env.P[state][action]:
                action_values[action] += prob * (reward + discount_factor * V[next_state])
        return action_values

    V = np.zeros(env.nS)

    while True:
        # Largest change of any state value during this sweep.
        delta = 0.0
        for state in range(env.nS):
            best_value = np.max(one_step_lookahead(state, V))
            delta = max(delta, np.abs(best_value - V[state]))
            # In-place update: later states in the sweep see the new value.
            V[state] = best_value
        if delta < theta:
            break

    # The optimal policy acts greedily with respect to the converged values.
    policy = np.zeros([env.nS, env.nA])
    for state in range(env.nS):
        policy[state, np.argmax(one_step_lookahead(state, V))] = 1.0

    return policy, V

In [16]:
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
    
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with an all-zero value function
    V = np.zeros(env.nS)
    while True:
        # Largest change of any state value during this sweep.
        delta = 0.0
        for state in range(env.nS):
            # Expected return of `state`: average over the policy's actions
            # and over the possible outcomes of each action.
            value = 0.0
            for action, action_prob in enumerate(policy[state]):
                for prob, next_state, reward, _done in env.P[state][action]:
                    value += action_prob * prob * (reward + discount_factor * V[next_state])
            delta = max(delta, np.abs(value - V[state]))
            # In-place update: later states in the sweep see the new value.
            V[state] = value
        if delta < theta:
            break
    return np.array(V)

In [17]:
def policy_improvement(env, policy_eval_fn = policy_eval, discount_factor=1.0):
    """
    Policy Improvement Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
            
    Args:
        env: The OpenAI environment.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor.
        discount_factor: gamma discount factor.
        
    Returns:
        A tuple (policy, V). 
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
        
    """

    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        action_values = np.zeros(env.nA)
        for action in range(env.nA):
            for prob, next_state, reward, _done in env.P[state][action]:
                action_values[action] += prob * (reward + discount_factor * V[next_state])
        return action_values

    # Start with a uniform random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    while True:
        # Evaluate the current policy ...
        V = policy_eval_fn(policy, env, discount_factor)

        # ... then act greedily with respect to its value function.
        policy_stable = True
        for state in range(env.nS):
            chosen_action = np.argmax(policy[state])
            best_action = np.argmax(one_step_lookahead(state, V))
            if chosen_action != best_action:
                policy_stable = False
            policy[state] = np.eye(env.nA)[best_action]

        # No state changed its action: the policy is greedy w.r.t. its own
        # values, so we are done.
        if policy_stable:
            return policy, V