# Student Name: Zhongyue Xing

# Preface

The Environment below has three states: 1, 2, and 3. Possible transitions are: (1) 1->1, 1->2; (2) 2->1, 2->2, 2->3; and (3) 3->2, 3->3.

Actions of the Agent are encoded as -1, 0, and +1, which correspond to its intention to move left, stay, and move right, respectively. The Environment, however, does not always follow these intentions exactly: there is a 10% chance that action 0 results in a move to the left (if moving to the left is admissible) and that action +1 results in staying. In other words, there is an "east wind" (please see get_reward() of the Environment).

Further, we assume that the number of steps, T, is infinite and whenever the process enters state 3, the Environment generates reward = 1. In all other cases the reward is 0. For example, transition 2->3 will result in reward 1, transition 3->3 will result in reward 1, transition 3->2 will result in reward 0, transition 2->2 will result in reward 0, etc.

Let’s use $\gamma=0.9$. Currently, the Agent always selects action 0 (its intention is to stay). Please notice that without the wind, the state-values for this policy would be $[0, 0, 1/(1-\gamma)] = [0, 0, 10]$. With the wind, however, the value of state 3 becomes only 4.74, because the transition 3->2 happens sooner or later with probability 1, and the Agent makes no attempt to return to state 3: its intention is to stay.
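The value 4.74 can be checked by hand. As a short sketch, assuming the dynamics described above (under action 0 the process stays with probability 0.9 and is blown one state to the left with probability 0.1, while state 1 cannot be left under this policy), the Bellman equations for the three states give:

$$
\begin{aligned}
V(1) &= \gamma V(1) &&\Rightarrow\; V(1) = 0,\\
V(2) &= 0.9\,\gamma V(2) + 0.1\,\gamma V(1) &&\Rightarrow\; V(2) = 0,\\
V(3) &= 0.9\,\bigl(1 + \gamma V(3)\bigr) + 0.1\,\gamma V(2) &&\Rightarrow\; V(3) = \frac{0.9}{1 - 0.9\gamma} = \frac{0.9}{0.19} \approx 4.74.
\end{aligned}
$$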

Function reward_cumulative(T=10, S0=1, gamma=1) returns the observed cumulative discounted reward over T steps when the process starts in state S0. Please notice that with $\gamma=0.9$, truncating at T=100 changes the estimate very little compared with an infinite horizon, because $\gamma^T$ is of order $10^{-5}$.
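The size of the truncation error can be made concrete with a few lines of arithmetic. The sketch below assumes only that every reward is at most 1, so the discounted rewards dropped after step T sum to at most $\gamma^T/(1-\gamma)$; the variable names are illustrative and not part of the assignment code.

```python
gamma, T = 0.9, 100

# Discount weight of the first reward that falls outside the T-step window.
first_dropped_weight = gamma**T

# Rewards are at most 1, so the dropped tail is bounded by the geometric series
# gamma^T + gamma^(T+1) + ... = gamma^T / (1 - gamma).
tail_bound = gamma**T / (1 - gamma)

print(f"gamma^T    = {first_dropped_weight:.2e}")
print(f"tail bound = {tail_bound:.2e}")
```

Both numbers are far below the two-decimal precision used when printing the state-values.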

Function V_estimate(T=10, S0=1, gamma=0.9, n_trials=10) calls reward_cumulative() n_trials times and estimates the state-value as the average return over these n_trials paths.
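V_estimate() computes this average incrementally, updating the running estimate by `(G - estimate) / i` after the i-th path. A minimal sketch of why that update reproduces the ordinary sample mean, using random numbers as a stand-in for simulated returns (the array `returns` is an assumption for illustration only):

```python
import numpy as np

rng = np.random.default_rng(0)
returns = rng.random(1000)  # stand-in for the returns of 1000 simulated paths

estimate = 0.0
for i, G in enumerate(returns, start=1):
    # Move the running estimate toward the new sample by a step of size 1/i.
    estimate += (G - estimate) / i

# The incremental estimate agrees with the batch mean up to rounding error.
print(np.isclose(estimate, returns.mean()))
```

The incremental form avoids storing all returns, which is convenient when n_trials is large.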

In [117]:
import random
from matplotlib import pyplot as plt 
import numpy as np

class Environment:
    def __init__(self, S0 = 1):
        self.time = 0
        self.state = S0

    def admissible_actions(self):
        A = list((-1,0,1))
        if self.state == 1: A.remove(-1)
        if self.state == 3: A.remove(1)
        return A
    
    def check_state(self):
        return self.state

    def get_reward(self, action):
        self.time += 1
        move = action
        if self.state > 1 and move > -1:
            move = np.random.choice([move-1, move],p=[0.1,0.9])
        self.state += move
        if self.state == 3:
            reward = 1
        else:
            reward = 0
        return reward


class Agent:
    def __init__(self):
        self.current_reward = 0.0

    def step(self, env):
        #actions = env.admissible_actions()
        action_selected = 0
        reward = env.get_reward(action_selected)            
        self.current_reward = reward
        
def reward_cumulative(T=10, S0=1, gamma=1):
    env = Environment(S0)
    agent = Agent()
    G = 0
    while env.time < T:  # run exactly T steps
        agent.step(env)
        G += gamma**(env.time-1)*agent.current_reward
    return G

def V_estimate(T=10, S0=1, gamma=0.9, n_trials=10):
    V_estimate = 0
    for i in range(1,n_trials+1):
        #V_estimate = (V_estimate*(i-1) + reward_cumulative(T, S0, gamma))/i
        V_estimate = V_estimate+(reward_cumulative(T, S0, gamma)-V_estimate)/i 
    return V_estimate

In [116]:
T = 100
V = np.array([V_estimate(T, S0 = s, gamma=0.9, n_trials=10000) for s in range(1,4)])

np.set_printoptions(precision=2)
print("state-value function:")
print(V)

state-value function:
[0.   0.   4.78]
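The simulated value of state 3 differs slightly from the 4.74 quoted in the Preface because a Monte Carlo average carries sampling noise. The sketch below estimates that noise; it is a vectorized re-implementation written for illustration (the function `simulate_returns` and its seed are assumptions, not the notebook's Environment class), covering only the always-stay policy in the three-state chain.

```python
import numpy as np

def simulate_returns(n_trials=2000, T=100, gamma=0.9, s0=3, seed=0):
    """Discounted returns of n_trials paths under the always-stay policy."""
    rng = np.random.default_rng(seed)
    state = np.full(n_trials, s0)
    G = np.zeros(n_trials)
    for t in range(T):
        # The wind pushes a path one state to the left with probability 0.1,
        # except in state 1, where no left move is admissible.
        blown = (rng.random(n_trials) < 0.1) & (state > 1)
        state = state - blown.astype(int)
        # Reward 1 whenever the process enters (or remains in) state 3.
        G += gamma**t * (state == 3)
    return G

G = simulate_returns()
mean = G.mean()
std_err = G.std(ddof=1) / np.sqrt(len(G))
print(f"V(3) estimate: {mean:.2f} +/- {std_err:.2f}")
```

The standard error shrinks as 1/sqrt(n_trials), so a difference of a few hundredths between the simulated and the exact value is expected at n_trials=10000.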


## Problem 1 (5 points)

Please add states 4 and 5 to the Environment. Entering states 4 and 5 yields no reward. Please keep the current reward for entering state 3 as is, i.e., 2->3, 3->3, and 4->3 result in reward = 1. All other transitions yield 0 reward.

For the current actions of the Agent (always action 0), keep $\gamma=0.9$ and estimate the state-values using the V_estimate() function with T = 100 and n_trials=10000. Print the result for states 1, 2, 3, 4, 5. What value of state 4 do you observe?


In [2]:
from matplotlib import pyplot as plt 
import numpy as np

class Environment:
    def __init__(self, S0 = 1):
        self.time = 0
        self.state = S0

    def admissible_actions(self):
        A = list((-1,0,1))
        if self.state == 1: A.remove(-1)
        if self.state == 5: A.remove(1)
        return A
    
    def check_state(self):
        return self.state

    def get_reward(self, action):
        self.time += 1
        move = action
        if self.state > 1 and move > -1:
            move = np.random.choice([move-1, move],p=[0.1,0.9])
        self.state += move
        if self.state == 3:
            reward = 1
        else:
            reward = 0
        return reward


class Agent:
    def __init__(self):
        self.current_reward = 0.0

    def step(self, env):
        #actions = env.admissible_actions()
        action_selected = 0
        reward = env.get_reward(action_selected)            
        self.current_reward = reward
        
def reward_cumulative(T=10, S0=1, gamma=1):
    env = Environment(S0)
    agent = Agent()
    G = 0
    while env.time < T:  # run exactly T steps
        agent.step(env)
        G += gamma**(env.time-1)*agent.current_reward
    return G

def V_estimate(T=10, S0=1, gamma=0.9, n_trials=10):
    V_estimate = 0
    for i in range(1,n_trials+1):
        #V_estimate = (V_estimate*(i-1) + reward_cumulative(T, S0, gamma))/i
        V_estimate = V_estimate+(reward_cumulative(T, S0, gamma)-V_estimate)/i 
    return V_estimate

T = 100
V = np.array([V_estimate(T, S0 = s, gamma=0.9, n_trials=10000) for s in range(1,6)])

np.set_printoptions(precision=2)
print("state-value function:")
print(V)

state-value function:
[0.   0.   4.76 2.78 1.31]
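These simulated values can be cross-checked without simulation. As a sketch, assuming the same always-stay dynamics as in the code above (stay with probability 0.9, pushed one state left with probability 0.1, no wind in state 1), the Bellman equation is the linear system $(I - \gamma P)V = r$, which NumPy can solve directly. The function name `stay_policy_values` is hypothetical.

```python
import numpy as np

def stay_policy_values(n_states=5, gamma=0.9, p_wind=0.1, reward_state=3):
    """Exact state-values for the always-stay policy from (I - gamma*P) V = r."""
    # P[i, j] = probability of moving from state i+1 to state j+1.
    P = np.zeros((n_states, n_states))
    for s in range(1, n_states + 1):
        if s == 1:
            P[0, 0] = 1.0                  # no left move is admissible from state 1
        else:
            P[s - 1, s - 1] = 1 - p_wind   # the intention to stay is carried out
            P[s - 1, s - 2] = p_wind       # the wind pushes one state to the left
    # Expected one-step reward = probability of entering the rewarding state.
    r = P[:, reward_state - 1]
    return np.linalg.solve(np.eye(n_states) - gamma * P, r)

V_exact = stay_policy_values()
print(np.round(V_exact, 2))
```

Under these assumptions the exact values are approximately [0, 0, 4.74, 2.77, 1.31], which agrees with the simulated estimates up to Monte Carlo noise.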


## Problem 2 (5 points)

Using the Environment you developed in Problem 1, change the actions of the Agent to the optimal ones, that is, from states 1 and 2 it will want to move right, in state 3 it will want to stay, and from states 4 and 5 it will want to move left.

For these actions of the Agent and $\gamma=0.9$, estimate the state-values using V_estimate() function with T = 100 and n_trials=10000. Print the result for states 1, 2, 3, 4, 5.

In [70]:
from matplotlib import pyplot as plt 
import numpy as np

class Environment:
    def __init__(self, S0 = 1):
        self.time = 0
        self.state = S0

    def admissible_actions(self):
        A = list((-1,0,1))
        if self.state == 1: A.remove(-1)
        if self.state == 5: A.remove(1)
        return A
    
    def check_state(self):
        return self.state

    def get_reward(self, action):
        self.time += 1
        assert action in self.admissible_actions()
        move = action
        if self.state > 1 and move > -1 or self.state == 1 and action == 1:
            move = np.random.choice([move-1, move],p=[0.1,0.9])
        self.state += move
        if self.state == 3:
            reward = 1
        else:
            reward = 0
        return reward


class Agent:
    def __init__(self):
        self.current_reward = 0.0

    def step(self, env):
        actions = env.admissible_actions()
        if env.state < 3:
            action_selected = 1
        elif env.state > 3:
            action_selected = -1
        else:
            action_selected = 0
        reward = env.get_reward(action_selected)            
        self.current_reward = reward
        
def reward_cumulative(T=10, S0=1, gamma=1):
    env = Environment(S0)
    agent = Agent()
    G = 0
    while env.time < T:  # run exactly T steps
        agent.step(env)
        G += gamma**(env.time-1)*agent.current_reward
    return G

def V_estimate(T=10, S0=1, gamma=0.9, n_trials=10):
    V_estimate = 0
    for i in range(1,n_trials+1):
        #V_estimate = (V_estimate*(i-1) + reward_cumulative(T, S0, gamma))/i
        V_estimate = V_estimate+(reward_cumulative(T, S0, gamma)-V_estimate)/i 
    return V_estimate


T = 100
V = np.array([V_estimate(T, S0 = s, gamma=0.9, n_trials=10000) for s in range(1,6)])

np.set_printoptions(precision=2)
print("state-value function:")
print(V)

state-value function:
[8.01 9.   9.   9.11 8.19]


## Problem 3 (15 points)
For the Environment and Agent you developed in Problem 2, please obtain the state-value for this policy in an alternative way without simulations by solving the Bellman equation (eq. 4.5) numerically: use the iterative policy evaluation algorithm on p.75 of "Reinforcement Learning" by Sutton and Barto.

Please notice that for these actions the policy $\pi(a|s)$ is<br>
$\pi(-1|1)=0, \pi(0|1)=0, \pi(+1|1)=1$,<br>
$\pi(-1|2)=0, \pi(0|2)=0, \pi(+1|2)=1$,<br>
$\pi(-1|3)=0, \pi(0|3)=1, \pi(+1|3)=0$,<br>
$\pi(-1|4)=1, \pi(0|4)=0, \pi(+1|4)=0$,<br>
etc.


The non-zero transition probabilities $p(s^\prime,r|s,a)$ are<br>

$p(s^\prime=1,r=0|s=1,a=0)=1$,<br>
$p(s^\prime=1,r=0|s=1,a=+1)=0.1,p(s^\prime=2,r=0|s=1,a=+1)=0.9$,<br>

$p(s^\prime=1,r=0|s=2,a=-1)=1$,<br>
$p(s^\prime=1,r=0|s=2,a=0)=0.1,p(s^\prime=2,r=0|s=2,a=0)=0.9$,<br>
$p(s^\prime=2,r=0|s=2,a=+1)=0.1,p(s^\prime=3,r=1|s=2,a=+1)=0.9$,<br>

$p(s^\prime=2,r=0|s=3,a=-1)=1$,<br>
$p(s^\prime=2,r=0|s=3,a=0)=0.1,p(s^\prime=3,r=1|s=3,a=0)=0.9$,<br>
$p(s^\prime=3,r=1|s=3,a=+1)=0.1,p(s^\prime=4,r=0|s=3,a=+1)=0.9$,<br>

etc.
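The pattern behind the list above can be written as one small function that covers the cases abbreviated by "etc.". This is a sketch under the assumption that the same wind rule applies in states 4 and 5 as in states 2 and 3; the function name `transitions` and its parameters are illustrative.

```python
def transitions(s, a, n_states=5, p_wind=0.1, reward_state=3):
    """Return {(s_next, r): probability} for taking action a in state s."""
    if (a == -1 and s == 1) or (a == 1 and s == n_states):
        raise ValueError(f"action {a} is not admissible in state {s}")

    def reward(s_next):
        return 1 if s_next == reward_state else 0

    intended = s + a
    # The wind acts only on "stay" and "move right", and only when the state
    # one step to the left of the intended one exists.
    if a == -1 or intended - 1 < 1:
        return {(intended, reward(intended)): 1.0}
    return {
        (intended, reward(intended)): 1 - p_wind,
        (intended - 1, reward(intended - 1)): p_wind,
    }

# Print every non-zero p(s', r | s, a) for the admissible state-action pairs.
for s in range(1, 6):
    for a in (-1, 0, 1):
        if (s, a) not in ((1, -1), (5, 1)):
            print(f"s={s}, a={a:+d}: {transitions(s, a)}")
```

For every admissible pair the probabilities sum to 1, which is a quick sanity check on any hand-written transition table.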




In [1]:
from matplotlib import pyplot as plt
from collections import defaultdict
import numpy as np

class Environment:
    def __init__(self, S0 = 1):
        self.time = 0
        self.state = S0

    def admissible_actions(self, state):
        """returns possible actions of a given state"""
        A = list((-1,0,1))
        if state == 1: A.remove(-1)
        if state == 5: A.remove(1)
        return A
    
    def possible_outcomes(self, state, action):
        """ 
        returns the list of possible (outcome state, reward) pairs after
        taking an action in a given state
        
        """

        assert action in self.admissible_actions(state), "action {} at state {}!".format(action, state)
        if state > 1 and action > -1 or state == 1 and action == 1:
            outcome_states = [state+action, state+action-1]
        else:
            outcome_states = [state+action]

        outcome_pair = []
        for outcome_state in outcome_states:
            if outcome_state == 3:
                outcome_pair.append((outcome_state, 1))
            else:
                outcome_pair.append((outcome_state, 0))
        return outcome_pair


    def transition_prob(self, new_state, reward, cur_state, action):
        """
        returns transition probabilities for given:
            new_state (s'), reward (r), current state (s), and action (a)
        """
        # valid actions
        assert action in self.admissible_actions(cur_state)

        # valid reward
        if (reward == 1 and new_state == 3) or (reward == 0 and new_state != 3):
            # the wind has no effect on a left move or on staying in state 1
            no_wind = action == -1 or (action == 0 and cur_state == 1)
            # valid new state with no wind effect
            if new_state == cur_state + action:
                return 1 if no_wind else 0.9
            # valid new state with wind effect (only when the wind can act)
            elif not no_wind and new_state == cur_state + (action - 1):
                return 0.1


        # remaining combinations are invalid
        return 0

    def all_states(self):
        return list(range(1, 6))



def policy(action, state):
    if (state < 3 and action == 1) or (
        state > 3 and action == -1) or(
        state == 3 and action == 0):
        return 1

    else:
        return 0

def iterative_policy_eval(env, pi, gamma = 0.9, theta = 0.01):
    """
    Takes a policy function pi(action, state) and the environment, and
    estimates the state-values under pi by iterative policy evaluation.
    Returns a default dictionary mapping state -> expected value
    
    """

    # start with an all-zero value function
    V = defaultdict(float)

    while True:
        delta = 0
        for state in env.all_states():
            v = V[state]
            sumV = 0

            # summation (refer to textbook pg. 75)
            for action in env.admissible_actions(state):
                action_prob = pi(action, state)
                for new_state, reward in env.possible_outcomes(state, action):
                    transition_prob = env.transition_prob(new_state, reward, state, action)
                    #print(state, action, new_state, reward, action_prob, transition_prob)
                    sumV += action_prob * transition_prob *(reward + gamma*V[new_state])
            V[state] = sumV

            delta = max(delta, abs(v - V[state]))

        if delta < theta:
            break
    return V

print("state-value function:")
V = iterative_policy_eval(
    env = Environment(), 
    pi = policy,
    gamma = 0.9, theta = 0.0000000000000000000000001
)
print([round(value, 2) for _, value in sorted(V.items())])

state-value function:
[8.01, 9.0, 9.0, 9.1, 8.19]

