# AI in Games, _Reinforcement Learning_<br>Assignment 2

In [70]:
import numpy as np
import contextlib

# Configures numpy print options
@contextlib.contextmanager
def _printoptions(*args, **kwargs):
    original = np.get_printoptions()
    np.set_printoptions(*args, **kwargs)
    try:
        yield
    finally:
        np.set_printoptions(**original)

## Question 1

General abstract class for defining the framework of environment models...

In [71]:
class EnvironmentModel:
    def __init__(self, n_states, n_actions, seed=None):
        self.n_states = n_states
        self.n_actions = n_actions
        self.random_state = np.random.RandomState(seed)

    def p(self, next_state, state, action):
        raise NotImplementedError()

    def r(self, next_state, state, action):
        raise NotImplementedError()

    def draw(self, state, action):
        # Obtaining the probability distribution over state transitions:
        p = [self.p(ns, state, action) for ns in range(self.n_states)]

        # Obtaining the next state & state transition reward:
        next_state = self.random_state.choice(self.n_states, p=p)
        reward = self.r(next_state, state, action)

        return next_state, reward

General class for defining an environment...

In [72]:
class Environment(EnvironmentModel):
    def __init__(self, n_states, n_actions, max_steps, pi, seed=None):
        EnvironmentModel.__init__(self, n_states, n_actions, seed)
        # Maximum number of steps an agent can take:
        self.max_steps = max_steps

        # Probability distribution over initial states:
        self.pi = pi
        if self.pi is None:
          # Defaults to uniform distribution:
          self.pi = np.full(n_states, 1./n_states)

        # Initialising the starting state:
        self.reset()

    def reset(self):
        self.n_steps = 0
        self.state = self.random_state.choice(self.n_states, p=self.pi)
        return self.state

    def step(self, action):
        if action < 0 or action >= self.n_actions:
            raise Exception('Invalid action.')

        # Updating step statistics:
        self.n_steps += 1
        done = (self.n_steps >= self.max_steps)

        # Transitioning the state:
        self.state, reward = self.draw(self.state, action)

        # Returning the new state, reward & whether the agent should stop:
        return self.state, reward, done

    def render(self, policy=None, value=None):
        raise NotImplementedError()

The frozen lake environment class...

In [73]:
class FrozenLake(Environment):
    def __init__(self, lake, slip, max_steps, seed=None):
        """
        lake: A matrix that represents the lake. For example:
        lake =  [['&', '.', '.', '.'],
                ['.', '#', '.', '#'],
                ['.', '.', '.', '#'],
                ['#', '.', '.', '$']]
        slip: The probability that the agent will slip
        max_steps: The maximum number of time steps in an episode
        seed: A seed to control the random number generator (optional)
        """
        # Representations: start:'&', frozen:'.', hole:'#', goal:'$'
        self.lake = np.array(lake)
        self.lake_flat = self.lake.reshape(-1) # Row major representation

        # Probability of "slipping", i.e. picking a random new direction:
        self.slip = slip

        # Number of states to consider
        n_states = self.lake.size + 1
        # NOTE: To see why 1 was added, see the implementation of function `p`

        # Number of actions possible:
        n_actions = 4 # 0 ==> up, 1 ==> left, 2 ==> down, 3 ==> right

        # pi ==> Probability distribution over initial states
        pi = np.zeros(n_states, dtype=float)
        pi[np.where(self.lake_flat == '&')[0]] = 1.0
        # NOTE: In this case, we can only start at the "start" state

        # Setting the absorbing state:
        self.absorbing_state = n_states - 1
        # NOTE `n_states-1` is 1 step out of the range of other valid states

        # Initialising inherited parameters:
        Environment.__init__(self, n_states, n_actions, max_steps, pi, seed=seed)
        # NOTE: The starting state is initialised in the above constructor

    def step(self, action):
        # Slipping with `self.slip` chance:
        if np.random.rand() <= self.slip:
            action = np.random.choice(range(self.n_actions))
        '''
        WHAT SLIPPING MEANS:
        To slip here means to randomly pick a new direction which could be the
        same as the intended direction. This is equivalent to randomly picking
        a new action which could be the same as the intended action.

        NOTE: "Slipping" does not consider whether the intended or altered
        direction leads outside the grid or not.
        '''

        state, reward, done = Environment.step(self, action)
        done = (state == self.absorbing_state) or done
        return state, reward, done

    # Probability of transitioning from `state` to `next_state` given `action`:
    def p(self, next_state, state, action):
        # CASE 1: At or next to absorbing state
        '''
        CHECKING IF WE NEED TO MOVE TO THE ABSORBING STATE:
        If we are at a hole or a goal, then any action takes us to the
        absorbing state. Furthermore, any action from the absorbing state leads
        to the absorbing state. This is why we added 1 to the number of states
        in the instantiation of the environment; this enables iteration upto
        the absorbing state, allowing us to validate the following.
        '''
        if state == self.absorbing_state:
          if next_state == self.absorbing_state: return 1
          return 0

        if self.lake_flat[state] in "#$":
          # NOTE: '#' ==> hole, '$' ==> goal
          if next_state == self.absorbing_state: return 1
          return 0

        #================================================

        # CASE 2: Not at or next to absorbing state
        nRows, nCols = self.lake.shape # Only `nCols` is used here
        # Checking if `action` leads to `next_state`:
        '''
        HOW ACTIONS ARE APPLIED:
        We apply actions as increments or decrements to the index in the row
        major (i.e. flattened array) representation of the frozen lake:

        0 (up) ==> next = cur-nCols
        1 (left) ==> next = cur-1
        2 (down) ==> next = cur+nCols
        3 (right) ==> next = cur+1

        NOTE: `nCols` is defined at the start of this function. `next` denotes
        the next state the action will lead to (unless the action leads out of
        the grid). `cur` denotes the current state. Both `next` and `cur` must
        be grasped as indices of the row major (flattened array) representation
        of frozen lake.
        ------------------------------------------------
        WHEN AN ACTION LEADS THE AGENT OUTSIDE THE GRID:
        Sometimes, an action would lead the agent outside the grid; in such
        cases, the state remains unchanged. Such cases can be identified as
        follows:

        0 (up): When next < 0
        1 (left): When next % nCols > cur % nCols
        2 (down): When next >= n_states
        3 (right): When next % nCols < cur % nCols

        NOTE: In such a case, 1 is returned only if `next_state` == `state`
        '''
        # Obtaining the next state:
        next = self.state + {0:-nCols, 1:-1, 2:nCols, 3:1}[action]
        # Checking if action leads out of the state:
        if {0:next < 0,
            1:next % nCols > self.state % nCols,
            2:next >= self.n_states,
            3:next % nCols < self.state % nCols}[action]:
            # If true, no transition; return 1 only if `next_state` == `state`:
            if next_state == state: return 1
            return 0

        # Given the next state obtained is valid, is it the given next state?
        if next == next_state: return 1 # Yes
        return 0                        # No

    # Reward for transitioning from `state` to `next_state` given `action`:
    def r(self, next_state, state, action):
        # Reward of 1 is obtained only by taking an action out of a goal:
        if self.lake_flat[state] == '$': return 1

        # In all other cases, no reward is obtained:
        return 0

    def render(self, policy=None, value=None):
        if policy is None:
            lake = np.array(self.lake_flat)
            if self.state < self.absorbing_state: lake[self.state] = '@'
            print(lake.reshape(self.lake.shape))
        else:
            # UTF-8 arrows look nicer, but cannot be used in LaTeX
            # https://www.w3schools.com/charsets/ref_utf_arrows.asp
            actions = ['^', '<', '_', '>']
            print('Lake:')
            print(self.lake)
            print('Policy:')
            policy = np.array([actions[a] for a in policy[:-1]])
            print(policy.reshape(self.lake.shape))
            print('Value:')
            with _printoptions(precision=3, suppress=True):
                print(value[:-1].reshape(self.lake.shape))

The following function `play` can be used to interactively test the environment...

In [74]:
def play(env):
    actions = ['w', 'a', 's', 'd']
    state = env.reset()
    env.render()
    done = False
    while not done:
        c = input('\nMove: ')
        if c == 'x': break
        if c not in actions:
            raise Exception('Invalid action')
        state, r, done = env.step(actions.index(c))
        env.render()
        print('Reward: {0}.'.format(r))

Play test...

In [75]:
# The lake environment display:
lake = [['&', '.', '.', '.'],
        ['.', '#', '.', '#'],
        ['.', '.', '.', '#'],
        ['#', '.', '.', '$']]
play(FrozenLake(lake=lake, slip=0.1, max_steps=10))

[['@' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]

Move: d
[['&' '@' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Reward: 0.

Move: d
[['&' '.' '@' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Reward: 0.

Move: s
[['&' '.' '.' '.']
 ['.' '#' '@' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Reward: 0.

Move: s
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '@' '#']
 ['#' '.' '.' '$']]
Reward: 0.

Move: s
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '@' '$']]
Reward: 0.

Move: d
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '@']]
Reward: 0.

Move: w
[['&' '.' '.' '.']
 ['.' '#' '.' '#']
 ['.' '.' '.' '#']
 ['#' '.' '.' '$']]
Reward: 1.


### Viewing the given data on state transition
Obtaining the stored array of state-transition probabilities...

In [None]:
from google.colab import files
files.upload()
data_npy_file = 'p.npy'

# Loading data from .npy file
data = np.load(data_npy_file, allow_pickle=True)

Saving p.npy to p.npy


In [None]:
from pandas import DataFrame

# Restructuring data a bit:
DATA = []
for i in range(data.shape[0]):
    for j in range(data.shape[1]):
        DATA.append(np.concatenate((np.array([i, j]), data[i,j])))
DATA = np.array(DATA)

# Presenting data in a table:
DF = {"fromState":DATA[:,0], "toState":DATA[:,1]}
for i in range(data.shape[2]):
    DF[f"action{i+2}"] = DATA[:,i+2]
DataFrame(data=DF)

Unnamed: 0,fromState,toState,action2,action3,action4,action5
0,0.0,0.0,0.950,0.950,0.050,0.050
1,0.0,1.0,0.025,0.925,0.025,0.025
2,0.0,2.0,0.000,0.000,0.000,0.000
3,0.0,3.0,0.000,0.000,0.000,0.000
4,0.0,4.0,0.925,0.025,0.025,0.025
...,...,...,...,...,...,...
284,16.0,12.0,1.000,1.000,1.000,1.000
285,16.0,13.0,0.000,0.000,0.000,0.000
286,16.0,14.0,0.000,0.000,0.000,0.000
287,16.0,15.0,1.000,1.000,1.000,1.000
