<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Zadanie-1---Ocena-strategii-(Policy-evaluation)" data-toc-modified-id="Zadanie-1---Ocena-strategii-(Policy-evaluation)-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Zadanie 1 - Ocena strategii (<em>Policy evaluation</em>)</a></span></li><li><span><a href="#Zadanie-2---Iteracyjne-doskonalenie-strategii-(Policy-iteration)" data-toc-modified-id="Zadanie-2---Iteracyjne-doskonalenie-strategii-(Policy-iteration)-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Zadanie 2 - Iteracyjne doskonalenie strategii (<em>Policy iteration</em>)</a></span></li><li><span><a href="#Zadanie-3---Iteracyjne-obliczanie-funkcji-wartości-(Value-iteration)" data-toc-modified-id="Zadanie-3---Iteracyjne-obliczanie-funkcji-wartości-(Value-iteration)-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Zadanie 3 - Iteracyjne obliczanie funkcji wartości (<em>Value iteration</em>)</a></span></li></ul></div>

# Laboratorium 1

Celem pierwszego laboratorium jest zapoznanie się oraz zaimplementowanie algorytmów uczenia pasywnego. Zaimplementowane algorytmy będą testowane z wykorzystaniem wcześniej przygotowanego środowiska przedstawionego na schemacie poniżej:
![MDP, Markov Decision Process](assets/images/mdp.png)

Dołączenie biblioteki ze środowiskiem

In [22]:
from env.simpleMDP import simpleMDP
import numpy as np
import random

Zapoznanie się z przygotowanym środowiskiem

In [23]:
mdp = simpleMDP()

states = mdp.get_all_states()
print(states)

for s in states:
    actions = mdp.get_possible_actions(s)
    for a in actions:
        next_states = mdp.get_next_states(s, a)
        print("State: " + s + " action: " + a + " " + "list of possible next states: ", next_states)

['s0', 's1', 's2']
State: s0 action: a0 list of possible next states:  {'s0': 0.5, 's2': 0.5}
State: s0 action: a1 list of possible next states:  {'s2': 1}
State: s1 action: a0 list of possible next states:  {'s0': 0.7, 's1': 0.1, 's2': 0.2}
State: s1 action: a1 list of possible next states:  {'s1': 0.95, 's2': 0.05}
State: s2 action: a0 list of possible next states:  {'s0': 0.4, 's2': 0.6}
State: s2 action: a1 list of possible next states:  {'s0': 0.3, 's1': 0.3, 's2': 0.4}


## Zadanie 1 - Ocena strategii (*Policy evaluation*)

<p style='text-align: justify;'>
Celem ćwiczenie jest zaimplementowanie algorytmu oceny strategii. Algorytm wyznacza funkcję oceny (wartości) strategii wykorzystując równanie Bellmana. Odbywa się to w sposób iteracyjny zgodnie ze wzorem:
\begin{equation}
	        V_{k + 1}(s) = \sum_a \pi(a | s) \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V_k(s')]
\end{equation}
</p>

Pierwszym krokiem jest zainicjalizowanie strategii, która będzie podlegała ocenie. W tym zadaniu ocenie będzie podlegała strategia stochastyczna, w której będzie prawdopodobieństwo wyboru każdej z możliwych opcji będzie takie samo.

In [24]:
policy = dict()

for s in states:
    actions = mdp.get_possible_actions(s)
    action_prob = 1 / len(actions)
    policy[s] = dict()
    for a in actions:
        policy[s][a] = action_prob

print(policy)

{'s0': {'a0': 0.5, 'a1': 0.5}, 's1': {'a0': 0.5, 'a1': 0.5}, 's2': {'a0': 0.5, 'a1': 0.5}}


Implementacja algorytmu oceny strategii - podejście z dwiema tablicami

In [25]:
def policy_eval_two_arrays(mdp, policy, gamma, theta):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state

    'policy' - the stochastic policy (action probability for each state), for the given mdp, too evaluate
    'gamma' - discount factor for MDP
    'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is smaller
              than theta
    """
    V = {s: random.random() for s in mdp.get_all_states()}
    V_check = {s: 0 for s in mdp.get_all_states()}
    
    while True:
        V_check = V.copy()
        delta = 0
        for s in mdp.get_all_states():
            v = 0
            for a in mdp.get_possible_actions(s):
                for s_next, s_next_probability in mdp.get_next_states(s, a).items():
                    v += policy[s][a] * s_next_probability * (mdp.get_reward(s, a, s_next) + gamma * V_check[s_next])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        
        if delta < theta:
            break
                                                                       
    return V

Sprawdzenie poprawności zaimplementowanego algorytmu

In [29]:
V = policy_eval_two_arrays(mdp, policy, 0.9, 0.0001)

assert np.isclose(V['s0'], 1.46785443374683)
assert np.isclose(V['s1'], 4.55336594491180)
assert np.isclose(V['s2'], 1.68544141660991)

print("Yay!")

Yay!


Implementacja algorytmu oceny strategii - obliczenia w miejscu

In [30]:
def policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state

    'policy' - the stochastic policy (action probability for each state), for the given mdp, too evaluate.
    'gamma' - discount factor for MDP
    'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is smaller
              than theta
    """
    V = {s: 0 for s in mdp.get_all_states()}
    
    while True:
        delta = 0
        for s in mdp.get_all_states():
            v = 0
            for a in mdp.get_possible_actions(s):
                for s_next, s_next_probability in mdp.get_next_states(s, a).items():
                    v += policy[s][a] * s_next_probability * (mdp.get_reward(s, a, s_next) + gamma * V[s_next])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        if delta < theta:
            break
    
    return V

Sprawdzenie poprawności zaimplementowanego algorytmu

In [31]:
V = policy_eval_in_place(mdp, policy, 0.9, 0.0001)

assert np.isclose(V['s0'], 1.4681508097651)
assert np.isclose(V['s1'], 4.5536768132712)
assert np.isclose(V['s2'], 1.6857723431614)

print("Yay!")

Yay!


## Zadanie 2 - Iteracyjne doskonalenie strategii (*Policy iteration*)

<p style='text-align: justify;'>
Celem ćwiczenia jest zaimplementowanie algorytmu iteracyjnego doskonalenia strategii. Zadaniem algorytmu jest wyznaczenie optymalnej strategii dla danego środowiska. Pierwszym krokiem algorytmu jest zainicjalizowanie strategii losowymi akcjami, następnie jej ocenienie oraz poprawa, zgodnie ze wzorem:
\begin{equation}
    \pi(s) \leftarrow  \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')]
\end{equation}
Jeżeli poprawiona strategia będzie się różniła od poprzedniej, należy ją ponownie ocenić i poprawić. Algorytm kończy działanie w momencie kiedy poprawiona strategia będzie dokładnie taka sama, jak ta poddawana poprawie.
</p>

Zaimplementuj funkcję do oceny strategii, tylko tym razem przyjmij, że strategia przekazywana na wejściu będzie deteministyczna 

In [32]:
policy = dict()

for s in states:
    actions = mdp.get_possible_actions(s)
    action_prob = 1
    policy[s] = dict()
    for a in actions:
        policy[s][a] = action_prob
        action_prob = 0

print(policy)

{'s0': {'a0': 1, 'a1': 0}, 's1': {'a0': 1, 'a1': 0}, 's2': {'a0': 1, 'a1': 0}}


In [33]:
def deterministic_policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified deterministic policy for the specified MDP:

        'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state

        'policy' - the deterministic policy (action probability for each state), for the given mdp, too evaluate
        'gamma' - discount factor for MDP
        'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is
                  smaller than theta
    """
    V = dict()

    for s in mdp.get_all_states():
        V[s] = 0
 
    while True:
        delta = 0
        for s in mdp.get_all_states():
            v = 0
            for a in mdp.get_possible_actions(s):
                for s_next, s_next_probability in mdp.get_next_states(s, a).items():
                    v += policy[s][a] * s_next_probability * (mdp.get_reward(s, a, s_next) + gamma * V[s_next])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        if delta < theta:
            break

    return V

Zaimplementuj funkcję do poprawy strategii `policy` na podstawie funkcji oceny `value_function` wyznaczonej dla niej 

In [34]:
def policy_improvement(mdp, policy, value_function, gamma):
    """
            This function improves specified deterministic policy for the specified MDP using value_function:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state

           'policy' - the deterministic policy (action for each state), for the given mdp, too improve.
           'value_function' - the value function, for the given policy.
            'gamma' - discount factor for MDP

           Function returns True if policy was improved or False otherwise
       """

    policy_stable = True
    out_policy = {s: {a: 0 for a in mdp.get_possible_actions(s)} for s in mdp.get_all_states()}

    for s in mdp.get_all_states():
        for a in mdp.get_possible_actions(s):
            v = 0
            for s_next, s_next_probability in mdp.get_next_states(s, a).items():
                out_policy[s][a] += s_next_probability * (mdp.get_reward(s, a, s_next) + gamma * value_function[s_next])
                
    # reonstructing new policy
    new_values = [np.max(list(s_dict.values())) for s_dict in out_policy.values()]
    for i, s in enumerate(mdp.get_all_states()):
        for a in mdp.get_possible_actions(s):
            out_policy[s][a] = 1 if out_policy[s][a] == new_values[i] and 1 not in out_policy[s].values() else 0
            
    if out_policy != policy:
        policy = out_policy
        policy_stable = False

    return policy_stable, policy

Zaimplementuj funkcję do iteracyjnego doskonalenia strategii

In [35]:
def policy_iteration(mdp, gamma, theta):

    """
            This function calculate optimal policy for the specified MDP:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state

           'gamma' - discount factor for MDP
           'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is smaller
                      than theta
           Function returns optimal policy and value function for the policy
       """

    policy = dict()

    for s in states:
        actions = mdp.get_possible_actions(s)
        action_prob = 1
        policy[s] = dict()
        for a in actions:
            policy[s][a] = action_prob
            action_prob = 0

    policy_stable = False

    while not policy_stable:
        V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)
        policy_stable, new_policy = policy_improvement(mdp, policy, V, gamma)
        
        if not policy_stable:
            V_new = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

            new_values = [val for val in V_new.values()]
            current_values = [val for val in V.values()]

            if all([a >= b for a, b in zip(new_values, current_values)]):
                policy = new_policy
                V = V_new
                
    # Get best action for state s
    actions = [max(x, key=x.get) for x in policy.values()]
    for i, s in enumerate(mdp.get_all_states()):
        policy[s] = actions[i]

    return policy, V

Sprawdzenie poprawności zaimplementowanego algorytmu

In [36]:
optimal_policy, optimal_value = policy_iteration(mdp, 0.9, 0.001)
print(optimal_policy, optimal_value)

assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

print("Yay!")

{'s0': 'a1', 's1': 'a0', 's2': 'a1'} {'s0': 3.785366128142998, 's1': 7.298653645273432, 's2': 4.206831790079636}
Yay!


## Zadanie 3 - Iteracyjne obliczanie funkcji wartości (*Value iteration*)

<p style='text-align: justify;'>
Celem ćwiczenia jest zaimplementowanie algorytmu iteracyjnego obliczania funkcji wartości. Algorytm ten łączy w sobie dwa wyżej wspomniane podejścia oceny i wyznaczania optymalnej strategii. Najpierw wyznaczana jest optymalna funkcja wartości stanu zgodnie ze wzorem:
\begin{equation}
    V(s) \leftarrow  \max_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
Po wyznaczeniu optymalnej funkcji wartości dla każdego stanu określana jest strategia postępowania w każdym możliwym stanie zgodnie ze wzorem:
\begin{equation}
    \pi(s) = \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
</p>

Implementacja algorytmu do iteracyjnego obliczania funkcji wartości

In [37]:
def value_iteration(mdp, gamma, theta):
    """
            This function calculate optimal policy for the specified MDP using Value Iteration approach:

            'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state

            'gamma' - discount factor for MDP
            'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is
                      smaller than theta
            Function returns optimal policy and value function for the policy
       """
    V = dict()
    policy = dict()

    # init with a policy with first avail action for each state
    for current_state in mdp.get_all_states():
        V[current_state] = 0
        policy[current_state] = actions[0]
                
    while True:
        delta = 0
        for s in mdp.get_all_states():
            v = V[s]
            V_s_a = {a: 0 for a in mdp.get_possible_actions(s)}
            for a in mdp.get_possible_actions(s):
                for s_next, s_next_probability in mdp.get_next_states(s, a).items():
                    V_s_a[a] += s_next_probability * (mdp.get_reward(s, a, s_next) + gamma * V[s_next])
            V[s] = max(list(V_s_a.values()))
            delta = max(delta, np.abs(v - V[s])) # previous vs new
            policy[s] = max(V_s_a.keys(), key=(lambda k: V_s_a[k]))
                
        if delta < theta:
            break
                    
    return policy, V

Sprawdzenie poprawności zaimplementowanego algorytmu

In [38]:
optimal_policy, optimal_value = value_iteration(mdp, 0.9, 0.001)

assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

print("Yay!")

Yay!
