In [480]:
# Most of this code is adapted from https://github.com/berkeleydeeprlcourse/homework/

#The following function generates a sample from a probability distribution. You may choose to ignore the logic. Just see how to use it.

import sys
import random
import numpy as np
def weighted_choice(v, p):
   total = sum(p)
   r = random.uniform(0, total)
   upto = 0
   for c, w in zip(v,p):
      if upto + w >= r:
         return c
      upto += w
   assert False, "Shouldn't get here"

In [481]:
#Using the above function to sample from a distribution

#6-faced die with equal probabilities
Sample_Space=[1,2,3,4,5,6]
Prob_Values=[1/6, 1/6, 1/6, 1/6, 1/6, 1/6]

#Generating a sample
weighted_choice(Sample_Space, Prob_Values)

4

In [482]:
#Creating a class for MDP environment definition that takes Transition Probability Matrix p(s'|s,a) and reward E[R_{t+1} | s,a,s'] as inputs

class MDP:
    def __init__(self, transition_probs, rewards, initial_state=None):
        """
        Defines an MDP. Compatible with gym Env.
        :param transition_probs: transition_probs[s][a][s1] = P(s1 | s, a)
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> prob]
            For each state and action, probabilities of next states should sum to 1
            If a state has no actions available, it is considered terminal
        :param rewards: rewards[s][a][s1] = r(s,a,s')
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> reward]
            The reward for anything not mentioned here is zero.
        :param get_initial_state: a state where agent starts or a callable() -> state
            By default, picks initial state at random.

        States and actions can be anything you can use as dict keys, but we recommend that you use strings or integers

        Here's an example from MDP depicted on http://bit.ly/2jrNHNr
        transition_probs = {
              's0':{
                'a0': {'s0': 0.5, 's2': 0.5},
                'a1': {'s2': 1}
              },
              's1':{
                'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
                'a1': {'s1': 0.95, 's2': 0.05}
              },
              's2':{
                'a0': {'s0': 0.4, 's1': 0.6},
                'a1': {'s0': 0.3, 's1': 0.3, 's2':0.4}
              }
            }
        rewards = {
            's1': {'a0': {'s0': +5}},
            's2': {'a1': {'s0': -1}}
        }
        """
        self._check_param_consistency(transition_probs, rewards)
        self._transition_probs = transition_probs
        self._rewards = rewards
        self._initial_state = initial_state
        self.n_states = len(transition_probs)
        self.reset()

    def get_all_states(self):
        """ return a tuple of all possible states """
        return tuple(self._transition_probs.keys())

    def get_possible_actions(self, state):
        """ return a tuple of possible actions in a given state """
        return tuple(self._transition_probs.get(state, {}).keys())

    def is_terminal(self, state):
        """ return True if state is terminal or False if it isn't """
        return len(self.get_possible_actions(state)) == 0

    def get_next_states(self, state, action):
        """ return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._transition_probs[state][action]

    def get_transition_prob(self, state, action, next_state):
        """ return P(next_state | state, action) """
        return self.get_next_states(state, action).get(next_state, 0.0)

    def get_reward(self, state, action, next_state):
        """ return the reward you get for taking action in state and landing on next_state"""
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._rewards.get(state, {}).get(action, {}).get(next_state, 0.0)

    def reset(self):
        """ reset the game, return the initial state"""
        if self._initial_state is None:
            self._current_state = random.choice(tuple(self._transition_probs.keys()))
        elif self._initial_state in self._transition_probs:
            self._current_state = self._initial_state
        elif callable(self._initial_state):
            self._current_state = self._initial_state()
        else:
            raise ValueError("initial state %s should be either a state or a function() -> state" % self._initial_state)
        return self._current_state

    def step(self, action):
        """ take action, return next_state, reward, is_done, empty_info """
        possible_states, probs = zip(*self.get_next_states(self._current_state, action).items())
        next_state = weighted_choice(possible_states, p=probs)
        reward = self.get_reward(self._current_state, action, next_state)
        is_done = self.is_terminal(next_state)
        self._current_state = next_state
        return next_state, reward, is_done, {}

    def render(self):
        print("Currently at %s" % self._current_state)

    def _check_param_consistency(self, transition_probs, rewards):
        for state in transition_probs:
            assert isinstance(transition_probs[state], dict), "transition_probs for %s should be a dictionary " \
                                                              "but is instead %s" % (
                                                              state, type(transition_probs[state]))
            for action in transition_probs[state]:
                assert isinstance(transition_probs[state][action], dict), "transition_probs for %s, %s should be a " \
                                                                          "a dictionary but is instead %s" % (
                                                                              state, action,
                                                                              type(transition_probs[state, action]))
                next_state_probs = transition_probs[state][action]
                assert len(next_state_probs) != 0, "from state %s action %s leads to no next states" % (state, action)
                sum_probs = sum(next_state_probs.values())
                assert abs(sum_probs - 1) <= 1e-10, "next state probabilities for state %s action %s " \
                                                    "add up to %f (should be 1)" % (state, action, sum_probs)
        for state in rewards:
            assert isinstance(rewards[state], dict), "rewards for %s should be a dictionary " \
                                                     "but is instead %s" % (state, type(transition_probs[state]))
            for action in rewards[state]:
                assert isinstance(rewards[state][action], dict), "rewards for %s, %s should be a " \
                                                                 "a dictionary but is instead %s" % (
                                                                 state, action, type(transition_probs[state, action]))
        msg = "The Enrichment Center once again reminds you that Android Hell is a real place where" \
              " you will be sent at the first sign of defiance. "
        assert None not in transition_probs, "please do not use None as a state identifier. " + msg
        assert None not in rewards, "please do not use None as an action identifier. " + msg

In [483]:
#Creating a MDP shown in this Figure https://upload.wikimedia.org/wikipedia/commons/2/21/Markov_Decision_Process_example.png

transition_probs = {'s0':{'a0': {'s0': 0.5, 's2': 0.5},'a1': {'s2': 1}},
                    's1':{'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},'a1': {'s1': 0.95, 's2': 0.05}},
                    's2':{'a0': {'s0': 0.4, 's1': 0.6},'a1': {'s0': 0.3, 's1': 0.3, 's2':0.4}}
                   }
rewards = {'s1': {'a0': {'s0': +5}},'s2': {'a1': {'s0': -1}}}

In [484]:
#Intialize an environment
env=MDP(transition_probs, rewards)

In [485]:
#To know the current state
env.render()

Currently at s1


In [486]:
#To take an action and observe the next state, reward, 'whether terminal state reached or not', any other description.
env.step('a1')

('s1', 0.0, False, {})

## Assignment 2 - Part 1

In [487]:
transition_probs = {'s0':{'A_U': {'s0': 0.9, 's1': 0.1}, 'A_D': {'s4': 0.8, 's0': 0.1, 's1': 0.1}, 'A_L': {'s0': 0.9, 's4': 0.1}, 'A_R': {'s1': 0.8, 's0': 0.1, 's4': 0.1}},
                    's1':{'A_U': {'s1': 0.8, 's2': 0.1, 's0': 0.1}, 'A_D': {'s1': 0.8, 's2': 0.1, 's0': 0.1},   'A_L': {'s0': 0.8, 's1': 0.2}, 'A_R': {'s2': 0.8, 's1': 0.2}},
                    's2':{'A_U': {'s2': 0.8, 's3': 0.1, 's1': 0.1}, 'A_D': {'s6': 0.8, 's1': 0.1, 's3': 0.1}, 'A_L': {'s1': 0.8, 's6': 0.1, 's2': 0.1}, 'A_R': {'s3': 0.8, 's2': 0.1, 's6': 0.1}},
                    's3':{},
                    's4':{'A_U': {'s0': 0.8, 's4': 0.2}, 'A_D': {'s8': 0.8, 's4': 0.2}, 'A_L': {'s4': 0.8, 's8': 0.1, 's0': 0.1}, 'A_R': {'s4': 0.8, 's8': 0.1, 's0': 0.1}},
                    's5':{},
                    's6':{'A_U': {'s2': 0.8, 's6': 0.1, 's7': 0.1}, 'A_D': {'s10': 0.8, 's6': 0.1, 's7': 0.1}, 'A_L': {'s6': 0.8, 's2': 0.1, 's10': 0.1}, 'A_R': {'s7': 0.8, 's10': 0.1, 's2': 0.1}},
                    's7':{},
                    's8':{'A_U': {'s4': 0.8, 's8': 0.1, 's9': 0.1}, 'A_D': {'s8': 0.9, 's9': 0.1}, 'A_L': {'s8': 0.9, 's4': 0.1}, 'A_R': {'s9': 0.8, 's8': 0.1, 's4': 0.1}},
                    's9':{'A_U': {'s9': 0.8, 's8': 0.1, 's10': 0.1}, 'A_D': {'s9': 0.8, 's8': 0.1, 's10': 0.1}, 'A_L': {'s8': 0.8, 's9': 0.2}, 'A_R': {'s10': 0.8, 's9': 0.2}},
                    's10':{'A_U': {'s6': 0.8, 's9': 0.1, 's11': 0.1}, 'A_D': {'s10': 0.8, 's9': 0.1, 's11': 0.1}, 'A_L': {'s9': 0.8, 's10': 0.1, 's6': 0.1}, 'A_R': {'s11': 0.8, 's10': 0.1, 's6': 0.1}},
                    's11':{'A_U': {'s7': 0.8, 's10': 0.1, 's11': 0.1}, 'A_D': {'s11': 0.9, 's10': 0.1}, 'A_L': {'s10': 0.8, 's11': 0.1, 's7': 0.1}, 'A_R': {'s11': 0.9, 's7': 0.1}}
                    }

rewards = {'s0':{'A_U': {'s0': -0.01, 's1': -0.01}, 'A_D': {'s4': -0.01, 's0': -0.01, 's1': -0.01}, 'A_L': {'s0': -0.01, 's4': -0.01}, 'A_R': {'s1': -0.01, 's0': -0.01, 's4': -0.01}},
            's1':{'A_U': {'s1': -0.01, 's2': -0.01, 's0': -0.01}, 'A_D': {'s1': -0.01, 's2': -0.01, 's0': -0.01}, 'A_L': {'s0': -0.01, 's1': -0.01}, 'A_R': {'s2': -0.01, 's1': -0.01}},
            's2':{'A_U': {'s2': -0.01, 's3': 1, 's1': -0.01}, 'A_D': {'s6': -0.01, 's1': -0.01, 's3': 1}, 'A_L': {'s1': -0.01, 's6': -0.01, 's2': -0.01}, 'A_R': {'s3': 1, 's2': -0.01, 's6': -0.01}},
            's3':{}, 
            's4':{'A_U': {'s0': -0.01, 's4': -0.01}, 'A_D': {'s8': -0.01, 's4': -0.01}, 'A_L': {'s4': -0.01, 's8': -0.01, 's0': -0.01}, 'A_R': {'s4': -0.01, 's8': -0.01, 's0': -0.01}},
            's5':{},
            's6':{'A_U': {'s2': -0.01, 's6': -0.01, 's7': -1}, 'A_D': {'s10': -0.01, 's6': -0.01, 's7': -1}, 'A_L': {'s6': -0.01, 's2': -0.01, 's10': -0.01}, 'A_R': {'s7': -1, 's10': -0.01, 's2': -0.01}},
            's7':{},
            's8':{'A_U': {'s4': -0.01, 's8': -0.01, 's9': -0.01}, 'A_D': {'s8': -0.01, 's9': -0.01}, 'A_L': {'s8': -0.01, 's4': -0.01}, 'A_R': {'s9': -0.01, 's8': -0.01, 's4': -0.01}},
            's9':{'A_U': {'s9': -0.01, 's8': -0.01, 's10': -0.01}, 'A_D': {'s9': -0.01, 's8': -0.01, 's10': -0.01}, 'A_L': {'s8': -0.01, 's9': -0.01}, 'A_R': {'s10': -0.01, 's9': -0.01}},
            's10':{'A_U': {'s6': -0.01, 's9': -0.01, 's11': -0.01}, 'A_D': {'s10': -0.01, 's9': -0.01, 's11': -0.01}, 'A_L': {'s9': -0.01, 's10': -0.01, 's6': -0.01}, 'A_R': {'s11': -0.01, 's10': -0.01, 's6': -0.01}},
            's11':{'A_U': {'s7': -1, 's10': -0.01, 's11': -0.01}, 'A_D': {'s11': -0.01, 's10': -0.01}, 'A_L': {'s10': -0.01, 's11': -0.01, 's7': -1}, 'A_R': {'s11': -0.01, 's7': -1}}
            }

initial_state = 's0'


In [488]:
env1=MDP(transition_probs, rewards, initial_state)

In [489]:
env1.get_all_states()

('s0', 's1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11')

In [490]:
env1.get_possible_actions('s6')

('A_U', 'A_D', 'A_L', 'A_R')

In [491]:
env1.is_terminal('s3')

True

In [492]:
env1.get_next_states('s2','A_R')

{'s3': 0.8, 's2': 0.1, 's6': 0.1}

In [493]:
env1.get_transition_prob('s2','A_R','s6')

0.1

In [494]:
env1.get_reward('s2','A_R','s6')

-0.01

In [495]:
Actions=['A_U','A_L','A_D','A_R']
Prob_Values=[1/4, 1/4, 1/4, 1/4]

#Generating a sample
Policy = weighted_choice(Actions, Prob_Values)

In [496]:
is_done = 0
for i in range(100):
    if(is_done != 1):
        state, reward, is_done,_ = env1.step(weighted_choice(Actions, Prob_Values))
        print(i,weighted_choice(Actions, Prob_Values))
        print(state,reward,is_done)

0 A_R
s0 -0.01 False
1 A_R
s0 -0.01 False
2 A_D
s0 -0.01 False
3 A_U
s0 -0.01 False
4 A_L
s0 -0.01 False
5 A_U
s4 -0.01 False
6 A_R
s8 -0.01 False
7 A_R
s8 -0.01 False
8 A_R
s4 -0.01 False
9 A_U
s4 -0.01 False
10 A_D
s4 -0.01 False
11 A_R
s4 -0.01 False
12 A_R
s4 -0.01 False
13 A_R
s0 -0.01 False
14 A_U
s0 -0.01 False
15 A_D
s1 -0.01 False
16 A_R
s1 -0.01 False
17 A_D
s1 -0.01 False
18 A_D
s1 -0.01 False
19 A_U
s2 -0.01 False
20 A_D
s1 -0.01 False
21 A_R
s1 -0.01 False
22 A_L
s1 -0.01 False
23 A_L
s0 -0.01 False
24 A_U
s0 -0.01 False
25 A_R
s1 -0.01 False
26 A_U
s0 -0.01 False
27 A_R
s4 -0.01 False
28 A_U
s4 -0.01 False
29 A_U
s4 -0.01 False
30 A_D
s4 -0.01 False
31 A_L
s0 -0.01 False
32 A_R
s4 -0.01 False
33 A_U
s0 -0.01 False
34 A_L
s1 -0.01 False
35 A_D
s1 -0.01 False
36 A_R
s1 -0.01 False
37 A_R
s1 -0.01 False
38 A_R
s0 -0.01 False
39 A_U
s0 -0.01 False
40 A_L
s4 -0.01 False
41 A_L
s4 -0.01 False
42 A_L
s0 -0.01 False
43 A_D
s4 -0.01 False
44 A_U
s8 -0.01 False
45 A_L
s8 -0.01 Fals

## Assignment 2 - Part 2


In [497]:
actions = ['A_U','A_D','A_L', 'A_R'] # set of actions 
state_reward = [-0.01, -0.01,-0.01,1,-0.01,-0.01,-0.01,-1,-0.01,-0.01,-0.01,-0.01] # reward at every state
states = env1.get_all_states() # set of all states

In [498]:
import numpy as np

# Converting the dictionery transition_probs to matrix form
transition_matrix = []

for s in states:   
    x = transition_probs[s]
    # print(x)
    action_matrix = []
    if not bool(x):
        # print(x)
        transition_matrix.append(np.zeros((len(actions), len(states))))
    else:
        for a in actions:
            y = x[a]
            # print(y)
            action_state_matrix = []
            for i in states:
                if i in y.keys():
                    action_state_matrix.append(y[i])
                else:
                    action_state_matrix.append(0)
            # print(action_state_matrix)
            action_matrix.append(action_state_matrix)
        transition_matrix.append(action_matrix)
        


In [499]:
transition_matrix

[[[0.9, 0.1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  [0.1, 0.1, 0, 0, 0.8, 0, 0, 0, 0, 0, 0, 0],
  [0.9, 0, 0, 0, 0.1, 0, 0, 0, 0, 0, 0, 0],
  [0.1, 0.8, 0, 0, 0.1, 0, 0, 0, 0, 0, 0, 0]],
 [[0.1, 0.8, 0.1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  [0.1, 0.8, 0.1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  [0.8, 0.2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
  [0, 0.2, 0.8, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
 [[0, 0.1, 0.8, 0.1, 0, 0, 0, 0, 0, 0, 0, 0],
  [0, 0.1, 0, 0.1, 0, 0, 0.8, 0, 0, 0, 0, 0],
  [0, 0.8, 0.1, 0, 0, 0, 0.1, 0, 0, 0, 0, 0],
  [0, 0, 0.1, 0.8, 0, 0, 0.1, 0, 0, 0, 0, 0]],
 array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]),
 [[0.8, 0, 0, 0, 0.2, 0, 0, 0, 0, 0, 0, 0],
  [0, 0, 0, 0, 0.2, 0, 0, 0, 0.8, 0, 0, 0],
  [0.1, 0, 0, 0, 0.8, 0, 0, 0, 0.1, 0, 0, 0],
  [0.1, 0, 0, 0, 0.8, 0, 0, 0, 0.1, 0, 0, 0]],
 array([[0., 0., 0., 0., 0., 0., 0., 0

### 1. Policy iteration

In [500]:
policy_iter = 10000 
value_iter = 10000 

p = ['A_L' for s in states] # initilaising a policy 
V = [0 for s in states] # initialising V0

gamma = 1
epsilon = 1e-500 # error margin

In [501]:
for i in range(0, policy_iter):
    flag = True

    # Policy evaluation - computing v(pi)
    for j in range(0, value_iter):
        m = 0 
        V1 = np.zeros(len(states))

        for s in range(0, len(states)):
            v = state_reward[s] 
            for s1 in range(0, len(states)):
                v = v + transition_matrix[s][actions.index(p[s])][s1] * (gamma * V[s1])

            if v - V[s] > m:
                m = v - V[s]

            V[s] = v  

        if m < epsilon:
            break

    # Policy improvement
    for s in range(0, len(states)):

        max_v = V[s]
        for a in range(0, len(actions)):
            v = state_reward[s] 
            for s1 in range(0, len(states)):
                v = v + transition_matrix[s][a][s1] * (gamma * V[s1]) 

            if v > max_v and actions.index(p[s]) != a:
                p[s] = actions[a]
                max_v = v
                flag = False

    if flag:
        # print(p)
        break

for s in states:
    if env1.is_terminal(s):
        p[states.index(s)] = ' '

print(p) # Printing the policy obtained by  policy evaluation

['A_R', 'A_R', 'A_R', ' ', 'A_U', ' ', 'A_L', ' ', 'A_U', 'A_L', 'A_L', 'A_D']


### 2. Value iteration

In [502]:
iter = 10000 # max number of value iterations
epsilon = 1e-500 # error margin 

p = ['A_L' for s in states] # initialising some policy p
V = [0 for s in states] # initilising V0

In [503]:
for i in range(0, iter):
    m = 0  
    V1 = np.zeros(len(states))  
    for s in range(0, len(states)):
        max_v = 0
        for a in range(0, len(actions)):

            v = state_reward[s]
            for s1 in range(0, len(states)):
                v = v + transition_matrix[s][a][s1] * (gamma * V[s1])

            if v > max_v:
                max_v = v
            
            if V[s] >= v:
                continue
            
            p[s] = actions[a] # Updating policy

        V1[s] = max_v

        if V[s] - V1[s] > m:
            m = V[s] - V1[s]

    V = V1

    if m < epsilon:
        # print(p)
        break

for s in states:
    if env1.is_terminal(s):
        p[states.index(s)] = ' '

print(p) # Printing the policy obtained by  value iteration

['A_R', 'A_R', 'A_R', ' ', 'A_U', ' ', 'A_L', ' ', 'A_U', 'A_L', 'A_L', 'A_D']


 The policy obtained by policy iteration is :  
 ['A_R', 'A_R', 'A_R', ' ', 'A_U', ' ', 'A_L', ' ', 'A_U', 'A_L', 'A_L', 'A_D']

 And 
 The policy obtainde by value iteration is:  
 ['A_R', 'A_R', 'A_R', ' ', 'A_U', ' ', 'A_L', ' ', 'A_U', 'A_L', 'A_L', 'A_D']
 
 Both value iteration (VI) and policy iteration (PI) algorithms converge to the policy.
