In [10]:
import numpy as np
import sys
from gym.envs.toy_text import discrete
from pprint import pprint

In [7]:

UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3


class CliffWalkingEnv(discrete.DiscreteEnv):
    """
    This is a simple implementation of the Gridworld Cliff
    reinforcement learning task.
    Adapted from Example 6.6 (page 145) from Reinforcement Learning: An Introduction
    by Sutton and Barto:
    http://people.inf.elte.hu/lorincz/Files/RL_2006/SuttonBook.pdf
    
    With inspiration from:
    https://github.com/dennybritz/reinforcement-learning/blob/master/lib/envs/cliff_walking.py
    The board is a 4x12 matrix, with (using Numpy matrix indexing):
        [3, 0] as the start at bottom-left
        [3, 11] as the goal at bottom-right
        [3, 1..10] as the cliff at bottom-center
    Each time step incurs -1 reward, and stepping into the cliff incurs -100 reward 
    and a reset to the start. An episode terminates when the agent reaches the goal.
    """
    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self):
        self.shape = (4, 12)
        self.start_state_index = np.ravel_multi_index((3, 0), self.shape)

        nS = np.prod(self.shape)
        nA = 4

        # Cliff Location
        self._cliff = np.zeros(self.shape, dtype=np.bool)
        self._cliff[3, 1:-1] = True

        # Calculate transition probabilities and rewards
        P = {}
        for s in range(nS):
            position = np.unravel_index(s, self.shape)
            P[s] = {a: [] for a in range(nA)}
            P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
            P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
            P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
            P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])

        # Calculate initial state distribution
        # We always start in state (3, 0)
        isd = np.zeros(nS)
        isd[self.start_state_index] = 1.0

        super(CliffWalkingEnv, self).__init__(nS, nA, P, isd)

    def _limit_coordinates(self, coord):
        """
        Prevent the agent from falling out of the grid world
        :param coord: 
        :return: 
        """
        coord[0] = min(coord[0], self.shape[0] - 1)
        coord[0] = max(coord[0], 0)
        coord[1] = min(coord[1], self.shape[1] - 1)
        coord[1] = max(coord[1], 0)
        return coord

    def _calculate_transition_prob(self, current, delta):
        """
        Determine the outcome for an action. Transition Prob is always 1.0. 
        :param current: Current position on the grid as (row, col) 
        :param delta: Change in position for transition
        :return: (1.0, new_state, reward, done)
        """
        new_position = np.array(current) + np.array(delta)
        new_position = self._limit_coordinates(new_position).astype(int)
        new_state = np.ravel_multi_index(tuple(new_position), self.shape)
        if self._cliff[tuple(new_position)]:
            return [(1.0, self.start_state_index, -100, False)]

        terminal_state = (self.shape[0] - 1, self.shape[1] - 1)
        is_done = tuple(new_position) == terminal_state
        return [(1.0, new_state, -1, is_done)]

    def render(self, mode='human'):
        outfile = sys.stdout

        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            if self.s == s:
                output = " x "
            # Print terminal state
            elif position == (3, 11):
                output = " T "
            elif self._cliff[position]:
                output = " C "
            else:
                output = " o "

            if position[1] == 0:
                output = output.lstrip()
            if position[1] == self.shape[1] - 1:
                output = output.rstrip()
                output += '\n'

            outfile.write(output)
        outfile.write('\n')

In [8]:
env = CliffWalkingEnv()

In [11]:
pprint(env.P)

{0: {0: [(1.0, 0, -1, False)],
     1: [(1.0, 1, -1, False)],
     2: [(1.0, 12, -1, False)],
     3: [(1.0, 0, -1, False)]},
 1: {0: [(1.0, 1, -1, False)],
     1: [(1.0, 2, -1, False)],
     2: [(1.0, 13, -1, False)],
     3: [(1.0, 0, -1, False)]},
 2: {0: [(1.0, 2, -1, False)],
     1: [(1.0, 3, -1, False)],
     2: [(1.0, 14, -1, False)],
     3: [(1.0, 1, -1, False)]},
 3: {0: [(1.0, 3, -1, False)],
     1: [(1.0, 4, -1, False)],
     2: [(1.0, 15, -1, False)],
     3: [(1.0, 2, -1, False)]},
 4: {0: [(1.0, 4, -1, False)],
     1: [(1.0, 5, -1, False)],
     2: [(1.0, 16, -1, False)],
     3: [(1.0, 3, -1, False)]},
 5: {0: [(1.0, 5, -1, False)],
     1: [(1.0, 6, -1, False)],
     2: [(1.0, 17, -1, False)],
     3: [(1.0, 4, -1, False)]},
 6: {0: [(1.0, 6, -1, False)],
     1: [(1.0, 7, -1, False)],
     2: [(1.0, 18, -1, False)],
     3: [(1.0, 5, -1, False)]},
 7: {0: [(1.0, 7, -1, False)],
     1: [(1.0, 8, -1, False)],
     2: [(1.0, 19, -1, False)],
     3: [(1.0, 6, -1, Fa

In [40]:
import numpy as np
import sys
from six import StringIO, b

from gym import utils
from gym.envs.toy_text import discrete

# actions
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

MAPS = {
    "3x4": [
        "EEEG",
        "EXEH",
        "SEEE",
    ],
    "4x4": [
        "GEEE",
        "EEEE",
        "EEEE",
        "EEEG",
    ],
    "5x4": [
        "GEEE",
        "EEEE",
        "EEEE",
        "EEEG",
        "XEXX",
    ]
}


class ClassicGridEnv(discrete.DiscreteEnv):
    """
    Example Gridworld:

        EEEG
        EXEH
        SEEE

    S : starting cell
    E : empty cell
    H : hole
    G : goal
    X : obstacle

    The episode ends when the agent reaches a terminal state (goal or pit). The agent
    receives a reward when transitioning in the environment. See below for details.

    Rewards r(s, a, s'):
        3x4:
            r(goal,any action, s') = 1
            r(pit, any action, s') = -1,
            r(nonterminal state, any action, s') =  -0.1

        4x4 or 5x4:
            r(nonterminal state, any action, s') =  -1
    """

    metadata = {'render.modes': ['human', 'ansi']}


    def __init__(self, desc=None, map_name=None, is_noisy=False, nrew=-0.1):
        if desc is None and map_name is None:
            raise ValueError('Must provide either desc or map_name')
        elif desc is None:
            desc = MAPS[map_name]
        self.desc = desc = np.asarray(desc, dtype='c')
        self.nrow, self.ncol = nrow, ncol = desc.shape
        self.reward_range = (-1, 1)

        nA = 4
        nS = nrow * ncol

        #initial state distribution
        isd = np.array(desc == b'S').astype('float64').ravel()
        isd /= isd.sum()

        # Empty dict of dict of lists for environment dynamics
        P = {s: {a : [] for a in range(nA)} for s in range(nS)}

        def to_s(row, col):
            return row*ncol + col

        def inc(row, col, a):
            # left
            if a == 0:
                col = max(col-1, 0)
            # down
            if a == 1:
                row = min(row+1, nrow-1)
            # right
            if a == 2:
                col = min(col+1, ncol-1)
            if a == 3:
                row = max(row-1, 0)
            return (row, col)

        # create state-transition probabilities
        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                for a in range(nA):
                    li = P[s][a]
                    letter = desc[row, col]
                    if letter == b'G':
                        li.append((1.0, s, 0, True))
                    elif letter == b'H':
                        li.append((1.0, s, 0, True))
                    elif letter == b'X':
                        li.append((0.0, s, 0, False))
                    else:
                        if is_noisy:
                            for b in [(a-1)%4, a, (a+1)%4]:
                                newrow, newcol = inc(row, col, b)
                                newletter = desc[newrow, newcol]
                                if newletter == b'X':
                                    newrow, newcol = row, col
                                    newletter = desc[newrow, newcol]
                                newstate = to_s(newrow, newcol)
                                done = bytes(newletter) in b'GH'
                                rew = nrew
#                                 if done:
#                                     rew = float(newletter == b'G') - float(newletter == b'H')
#                                 else:
#                                     rew = nrew
                                # rew = float(newletter == b'G') - float(newletter == b'H') + nrew * (float(newletter != b'G') * float(newletter != b'H'))
                                # rew = -0.02
                                # rew = float(newletter == b'G') - float(newletter == b'H') - 0.02
                                #
                                # if map_name == "3x4":
                                #     rew = float(newletter == b'G') - float(newletter == b'H') + nrew * (float(newletter != b'G') * float(newletter != b'H'))
                                #     # rew = nrew
                                # else:
                                #     rew = float(newletter == b'G') - 1
                                li.append((0.8 if b == a else 0.1, newstate, rew, done))
                        else:
                            newrow, newcol = inc(row, col, a)
                            newletter = desc[newrow, newcol]
                            if newletter == b'X':
                                newrow, newcol = row, col
                                newletter = desc[newrow, newcol]
                            newstate = to_s(newrow, newcol)
                            newletter = desc[newrow, newcol]
                            done = bytes(newletter) in b'GH'
                            if map_name == "3x4":
                                rew = float(newletter == b'G') - float(newletter == b'H') + nrew * (float(newletter != b'G') * float(newletter != b'H'))
                            else:
                                rew = float(newletter == b'G') - 1
                            li.append((1.0, newstate, rew, done))

        super(ClassicGridEnv, self).__init__(nS, nA, P, isd)

    def render(self, mode='human'):
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(["Left","Down","Right","Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        if mode != 'human':
            return outfile


class ClassicGridEnv3x4(ClassicGridEnv):
    def __init__(self, nrew):
        super(ClassicGridEnv3x4, self).__init__(map_name="3x4", is_noisy=True, nrew=nrew)

class ClassicGridEnv4x4(ClassicGridEnv):
    def __init__(self):
        super(ClassicGridEnv4x4, self).__init__(map_name="4x4", is_noisy=False)

class ClassicGridEnv5x4Static(ClassicGridEnv):
    """
    Do not allow agent to move from cell 13 to cell 15 (17 in array).
    """
    def __init__(self):
        super(ClassicGridEnv5x4Static, self).__init__(map_name="5x4", is_noisy=False)
        self.P[13][1] =  [(1.0, 13, -1.0, False)]
        # State 15 is state 17
        self.P[17] = {
        0: [(1.0, 12, -1.0, False)],
        1: [(1.0, 17, -1.0, False)],
        2: [(1.0, 14, -1.0, False)],
        3: [(1.0, 13, -1.0, False)]
        }

class ClassicGridEnv5x4Dynamic(ClassicGridEnv):
    """
    Allow agent to move from cell 13 to cell 15 (17 in array).
    """
    def __init__(self):
        super(ClassicGridEnv5x4Dynamic, self).__init__(map_name="5x4", is_noisy=False)
        self.P[17] = {
        0: [(1.0, 12, -1.0, False)],
        1: [(1.0, 17, -1.0, False)],
        2: [(1.0, 14, -1.0, False)],
        3: [(1.0, 13, -1.0, False)]
        }


# if __name__ =='__main__':
#     import pdb
#     from copy import deepcopy
#     from collections import defaultdict
#     from pprint import pprint
#     import sys
#     if "../" not in sys.path:
#         sys.path.append("../")

#     import numpy as np



#     env = ClassicGridEnv3x4(nrew=-0.04)
#     optimal_3x4_policy = np.array([
#         [0.0, 0.0, 1.0, 0.0],
#         [0.0, 0.0, 1.0, 0.0],
#         [0.0, 0.0, 1.0, 0.0],
#         [0.0, 0.0, 1.0, 0.0],

#         [0.0, 0.0, 0.0, 1.0],
#         [0.0, 0.0, 0.0, 0.0],
#         [0.0, 0.0, 0.0, 1.0],
#         [0.0, 0.0, 0.0, 1.0],

#         [0.0, 0.0, 0.0, 1.0],
#         [1.0, 0.0, 0.0, 0.0],
#         [0.0, 0.0, 1.0, 0.0],
#         [0.0, 0.0, 0.0, 1.0],
#     ])
    
#     print(policy_evaluation(optimal_3x4_policy, env, discount_factor=1.0).reshape((3,4)))


In [43]:
def policy_evaluation(policy, env, discount_factor=1.0, theta=0.00001):
    # Start with a random (all 0) value function
    V = np.zeros(env.nS)
    V[3] = 1
    V[7] = -1
    while True:
        delta = 0
        # For each state, perform a "full backup"
        for s in range(env.nS):
            if s==3 or s==7:
                continue
            v = 0
            # Look at the possible next actions
            for a, action_prob in enumerate(policy[s]):
                # For each action, look at the possible next states...
                for  prob, next_state, reward, done in env.P[s][a]:
                    # Calculate the expected value
                    v += action_prob * prob * (reward + discount_factor * V[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Stop evaluating once our value function change is below a threshold
        if delta < theta:
            break
    return np.array(V)

env2 = ClassicGridEnv3x4(nrew=-0.04)
optimal_policy = np.array([
    [0.0, 0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0, 0.0],

    [0.0, 0.0, 0.0, 1.0],
    [0.0, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 1.0],
    [0.0, 0.0, 0.0, 1.0],

    [0.0, 0.0, 0.0, 1.0],
    [1.0, 0.0, 0.0, 0.0],
    [0.0, 0.0, 1.0, 0.0],
    [1.0, 0.0, 0.0, 0.0],
])
print('State 2')
pprint(env2.P[2])
print()
print('State 3: Goal')
pprint(env2.P[3])
print()
pprint(env2.P[6])
print()
print('State 7: Hole')
pprint(env2.P[7])
print()
print('State 11')
pprint(env2.P[11])



State 2
{0: [(0.1, 2, -0.04, False), (0.8, 1, -0.04, False), (0.1, 6, -0.04, False)],
 1: [(0.1, 1, -0.04, False), (0.8, 6, -0.04, False), (0.1, 3, -0.04, True)],
 2: [(0.1, 6, -0.04, False), (0.8, 3, -0.04, True), (0.1, 2, -0.04, False)],
 3: [(0.1, 3, -0.04, True), (0.8, 2, -0.04, False), (0.1, 1, -0.04, False)]}

State 3: Goal
{0: [(1.0, 3, 0, True)],
 1: [(1.0, 3, 0, True)],
 2: [(1.0, 3, 0, True)],
 3: [(1.0, 3, 0, True)]}

{0: [(0.1, 2, -0.04, False), (0.8, 6, -0.04, False), (0.1, 10, -0.04, False)],
 1: [(0.1, 6, -0.04, False), (0.8, 10, -0.04, False), (0.1, 7, -0.04, True)],
 2: [(0.1, 10, -0.04, False), (0.8, 7, -0.04, True), (0.1, 2, -0.04, False)],
 3: [(0.1, 7, -0.04, True), (0.8, 2, -0.04, False), (0.1, 6, -0.04, False)]}

State 7: Hole
{0: [(1.0, 7, 0, True)],
 1: [(1.0, 7, 0, True)],
 2: [(1.0, 7, 0, True)],
 3: [(1.0, 7, 0, True)]}

State 11
{0: [(0.1, 7, -0.04, True), (0.8, 10, -0.04, False), (0.1, 11, -0.04, False)],
 1: [(0.1, 10, -0.04, False), (0.8, 11, -0.04, Fals

In [44]:
print(policy_evaluation(optimal_policy, env2).reshape((3,4))) 

[[ 0.81155822  0.86780822  0.91780822  1.        ]
 [ 0.76155822  0.          0.66027397 -1.        ]
 [ 0.70530822  0.65530822 -0.52098982 -0.61865674]]


In [46]:
pprint(env2.P[10])

{0: [(0.1, 6, -0.04, False), (0.8, 9, -0.04, False), (0.1, 10, -0.04, False)],
 1: [(0.1, 9, -0.04, False), (0.8, 10, -0.04, False), (0.1, 11, -0.04, False)],
 2: [(0.1, 10, -0.04, False), (0.8, 11, -0.04, False), (0.1, 6, -0.04, False)],
 3: [(0.1, 11, -0.04, False), (0.8, 6, -0.04, False), (0.1, 9, -0.04, False)]}


In [47]:
pprint(env2.P[11])

{0: [(0.1, 7, -0.04, True), (0.8, 10, -0.04, False), (0.1, 11, -0.04, False)],
 1: [(0.1, 10, -0.04, False), (0.8, 11, -0.04, False), (0.1, 11, -0.04, False)],
 2: [(0.1, 11, -0.04, False), (0.8, 11, -0.04, False), (0.1, 7, -0.04, True)],
 3: [(0.1, 11, -0.04, False), (0.8, 7, -0.04, True), (0.1, 10, -0.04, False)]}
