# Seminar help functions

In [3]:
from IPython.display import HTML
from IPython.display import clear_output
from time import sleep

# most of this code was politely stolen from https://github.com/berkeleydeeprlcourse/homework/
# all creadit goes to https://github.com/abhishekunique (if i got the author right)
import sys
import random
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch, Circle
%matplotlib inline

def weighted_choice(v, p):
   total = sum(p)
   r = random.uniform(0, total)
   upto = 0
   for c, w in zip(v,p):
      if upto + w >= r:
         return c
      upto += w
   assert False, "Shouldn't get here"

class MDP:
    def __init__(self, transition_probs, rewards, initial_state=None):
        """
        Defines an MDP. Compatible with gym Env.
        :param transition_probs: transition_probs[s][a][s_next] = P(s_next | s, a)
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> prob]
            For each state and action, probabilities of next states should sum to 1
            If a state has no actions available, it is considered terminal
        :param rewards: rewards[s][a][s_next] = r(s,a,s')
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> reward]
            The reward for anything not mentioned here is zero.
        :param get_initial_state: a state where agent starts or a callable() -> state
            By default, picks initial state at random.

        States and actions can be anything you can use as dict keys, but we recommend that you use strings or integers

        Here's an example from MDP depicted on http://bit.ly/2jrNHNr
        transition_probs = {
              's0':{
                'a0': {'s0': 0.5, 's2': 0.5},
                'a1': {'s2': 1}
              },
              's1':{
                'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
                'a1': {'s1': 0.95, 's2': 0.05}
              },
              's2':{
                'a0': {'s0': 0.4, 's1': 0.6},
                'a1': {'s0': 0.3, 's1': 0.3, 's2':0.4}
              }
            }
        rewards = {
            's1': {'a0': {'s0': +5}},
            's2': {'a1': {'s0': -1}}
        }
        """
        self._check_param_consistency(transition_probs, rewards)
        self._transition_probs = transition_probs
        self._rewards = rewards
        self._initial_state = initial_state
        self.n_states = len(transition_probs)
        self.reset()

    def get_all_states(self):
        """ return a tuple of all possiblestates """
        return tuple(self._transition_probs.keys())

    def get_possible_actions(self, state):
        """ return a tuple of possible actions in a given state """
        return tuple(self._transition_probs.get(state, {}).keys())

    def is_terminal(self, state):
        """ return True if state is terminal or False if it isn't """
        return len(self.get_possible_actions(state)) == 0

    def get_next_states(self, state, action):
        """ return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._transition_probs[state][action]

    def get_transition_prob(self, state, action, next_state):
        """ return P(next_state | state, action) """
        return self.get_next_states(state, action).get(next_state, 0.0)

    def get_reward(self, state, action, next_state):
        """ return the reward you get for taking action in state and landing on next_state"""
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._rewards.get(state, {}).get(action, {}).get(next_state, 0.0)

    def reset(self):
        """ reset the game, return the initial state"""
        if self._initial_state is None:
            self._current_state = random.choice(tuple(self._transition_probs.keys()))
        elif self._initial_state in self._transition_probs:
            self._current_state = self._initial_state
        elif callable(self._initial_state):
            self._current_state = self._initial_state()
        else:
            raise ValueError("initial state %s should be either a state or a function() -> state" % self._initial_state)
        return self._current_state

    def step(self, action):
        """ take action, return next_state, reward, is_done, empty_info """
        possible_states, probs = zip(*self.get_next_states(self._current_state, action).items())
        next_state = weighted_choice(possible_states, p=probs)
        reward = self.get_reward(self._current_state, action, next_state)
        is_done = self.is_terminal(next_state)
        self._current_state = next_state
        return next_state, reward, is_done, {}

    def render(self):
        print("Currently at %s" % self._current_state)

    def _check_param_consistency(self, transition_probs, rewards):
        for state in transition_probs:
            assert isinstance(transition_probs[state], dict), "transition_probs for %s should be a dictionary " \
                                                              "but is instead %s" % (
                                                              state, type(transition_probs[state]))
            for action in transition_probs[state]:
                assert isinstance(transition_probs[state][action], dict), "transition_probs for %s, %s should be a " \
                                                                          "a dictionary but is instead %s" % (
                                                                              state, action,
                                                                              type(transition_probs[state, action]))
                next_state_probs = transition_probs[state][action]
                assert len(next_state_probs) != 0, "from state %s action %s leads to no next states" % (state, action)
                sum_probs = sum(next_state_probs.values())
                assert abs(sum_probs - 1) <= 1e-10, "next state probabilities for state %s action %s " \
                                                    "add up to %f (should be 1)" % (state, action, sum_probs)
        for state in rewards:
            assert isinstance(rewards[state], dict), "rewards for %s should be a dictionary " \
                                                     "but is instead %s" % (state, type(transition_probs[state]))
            for action in rewards[state]:
                assert isinstance(rewards[state][action], dict), "rewards for %s, %s should be a " \
                                                                 "a dictionary but is instead %s" % (
                                                                 state, action, type(transition_probs[state, action]))
        msg = "The Enrichment Center once again reminds you that Android Hell is a real place where" \
              " you will be sent at the first sign of defiance. "
        assert None not in transition_probs, "please do not use None as a state identifier. " + msg
        assert None not in rewards, "please do not use None as an action identifier. " + msg


Matplotlib is building the font cache; this may take a moment.


In [44]:
# Итерации по полезностям

def get_action_value(
        mdp, state_values, state, action, gamma
    ):
    """ Вычислеям Q(s,a) по формуле выше """
    # Q =
    ####### Здесь ваш код ########
    Q = 0
    values = []
    for next_state in mdp.get_next_states(state, action):
        r = mdp.get_reward(state, action, next_state)
        p = mdp.get_transition_prob(state, action, next_state)
        # values.append(...)
        ####### Здесь ваш код ########
        Q += (p * (r + gamma * state_values[next_state]))
        ##############################
    ##############################
    return Q


def get_new_state_value(mdp, state_values, state, gamma):
    """ Считаем следующее V(s) по формуле выше."""
    if mdp.is_terminal(state):
        return 0
    # V =
    ####### Здесь ваш код ########
    v = []
    for action in mdp.get_possible_actions(state):
      v.append(get_action_value(mdp, state_values, state, action, gamma))
    ##############################
    V = max(v)
    return V


def value_iteration(
        mdp, state_values=None, gamma = 0.9, num_iter = 1000,
        min_difference = 1e-5
    ):
    """ выполняет num_iter шагов итерации по значениям"""
    # инициализируем V(s)
    state_values = state_values or \
    {s : 0 for s in mdp.get_all_states()}

    for i in range(num_iter):
        new_state_values = {
            s : get_new_state_value(mdp, state_values, s, gamma)
            for s in mdp.get_all_states()
        }

        # Считаем разницу
        diff =  max(abs(
            new_state_values[s] - state_values[s]
        ) for s in mdp.get_all_states())

        # print("iter %4i | diff: %6.5f | V(start): %.3f "%(i, diff, new_state_values[mdp._initial_state]))

        state_values = new_state_values
        if diff < min_difference:
            print("Принято! Алгоритм сходится!")
            break

    return state_values

In [32]:
# Итерации по стратегиям

def compute_vpi(mdp, policy, gamma):
    """
    Считаем V^pi(s) для всех состояний, согласно стратегии.
    :param policy: словарь состояние->действие {s : a}
    :returns: словарь {state : V^pi(state)}
    """
    states = mdp.get_all_states()
    # Будем решать уравнение вида Ax = b
    A, b = [], []
    for i, state in enumerate(states):
        if state in policy:
            a = policy[state]
            A.append([(
                1 - gamma * mdp.get_transition_prob(state, a, next_state)
                if i == j
                else - gamma * mdp.get_transition_prob(state, a, next_state)
            ) for j, next_state in enumerate(states)])
            b.append(sum([
                prob * mdp.get_reward(state, a, next_state)
                for next_state, prob in mdp.get_next_states(state, a).items()
            ]))

        else:
            A.append([(1 if i == j else 0) for j, s in enumerate(states)])
            b.append(0)

    A = np.array(A)
    b = np.array(b)
    values = np.linalg.solve(A, b)
    state_values = {
        states[i] : values[i] for i in range(len(states))
    }
    return state_values


def compute_new_policy(mdp, vpi, gamma):
    """
    Рассчитываем новую стратегию
    :param vpi: словарь {state : V^pi(state) }
    :returns: словарь {state : оптимальное действие}
    """
    Q = {}
    for state in mdp.get_all_states():
        Q[state] = {}
        for a in mdp.get_possible_actions(state):
            values = []
            for next_state in mdp.get_next_states(state, a):
                r = mdp.get_reward(state, a, next_state)
                p = mdp.get_transition_prob(state, a, next_state)
                # values.append(...)
                ####### Здесь ваш код ########
                values.append(p * (r + gamma * vpi[next_state]))
                ##############################

            Q[state][a] = sum(values)
    return Q

# HW functions

In [52]:
import json

def main(solution_f):
    # Список словарей содержит данные о каждом МППР, который определяется
    # состояниями, действиями, вероятностями перехода и вознаграждениями.
    with open('mdps.json', "r") as f:
        mdps = json.load(f)
    # {'transition_probs': {'s0': {'a0': {'s1': 0.84, 's2': 0.16},
    #                              'a1': {'s1': 0.53, 's2': 0.47}},
    #                       's1': {'a0': {'s0': 0.19, 's2': 0.81},
    #                              'a1': {'s0': 0.25, 's2': 0.75}},
    #                       's2': {'a0': {'s0': 0.12, 's1': 0.88},
    #                              'a1': {'s0': 0.65, 's1': 0.35}}},
    # 'reward_function': {'s0': {'a0': {'s1': -0.05811640535549212, 's2': -0.5935041149106424},
    #                            'a1': {'s1': -0.6179274398384225, 's2': -0.4369088027162966}},
    #                     's1': {'a0': {'s0': 0.7274441514167771, 's2': 0.6107444418118435},
    #                            'a1': {'s0': -0.6202851758369194, 's2': 0.9679911637843441}},
    #                     's2': {'a0': {'s0': -0.5921735314515975, 's1': 0.2501293709049084},
    #                            'a1': {'s0': 0.7976150574969298, 's1': 0.9495275614143339}}}}
    Qs = []
    for mdp in mdps:
        Qs.append(solution_f(mdp, gamma=0.9))
    with open('submit.json', "w") as f:
        json.dump(Qs, f)

In [2]:
def solution(mdp, gamma):
    # Заполняем табличку нулями
    q_table = {
        state: {
            action: 0 for action in actions
        } for state, actions in mdp['transition_probs'].items()
    }
    return q_table

# Realization

In [9]:
with open("mdps.json", "r") as f:
    mdps = json.load(f)

In [50]:
def get_solution_for_mdp(mdp_dict, gamma):
    mdp = MDP(mdp_dict['transition_probs'], mdp_dict['reward_function'], initial_state=min(mdp_dict['transition_probs'].keys()))
    state_values = {s : 0 for s in mdp.get_all_states()}
    
    state_values = value_iteration(
        mdp, state_values, gamma
    )
    
    state_action_q = {}
    for state in mdp.get_all_states():
        state_action_q[state] = {}
        for action in mdp.get_possible_actions(state):
            state_action_q[state][action] = get_action_value(mdp, state_values, state, action, gamma)
    
    q_table = {
        state: {
            action: state_action_q[state][action] for action in actions
        } for state, actions in mdp_dict['transition_probs'].items()
    }
    return q_table

In [53]:
main(get_solution_for_mdp)

Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сходится!
Принято! Алгоритм сх