In [None]:
import gym
import  numpy as np

# Print NumPy floats with 3 digits of precision in the cells below.
np.set_printoptions(precision=3)

In [None]:
# Create the FrozenLake environment with the slippery (stochastic) dynamics
# switched on via is_slippery=True.
env = gym.make('FrozenLake-v1', is_slippery=True)

Пришла зима. Вы с друзьями бросали фрисби в парке, и после неудачного броска диск оказался посреди озера. Озеро почти полностью замёрзло, но на нём есть несколько лунок, где лёд растаял. Если вы наступите в одну из этих лунок, то провалитесь в ледяную воду. В мире сейчас дефицит фрисби, поэтому совершенно необходимо пересечь озеро и забрать диск. Однако лёд скользкий, поэтому вы не всегда будете двигаться в том направлении, в котором хотите.

Эпизод заканчивается, когда вы достигаете цели или проваливаетесь в лунку. За достижение цели вы получаете вознаграждение 1, во всех остальных случаях — 0.

In [None]:
# Start a new episode, print what reset() returned, and draw the board.
init_state = env.reset()
print(init_state)
env.render()

Обозначения
<pre class="literal-block">SFFF       (S: starting point, safe)
FHFH       (F: frozen surface, safe)
FFFH       (H: hole, fall to your doom)
HFFG       (G: goal, where the frisbee is located)
</pre>

Действия:
<pre class="literal-block">
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3
</pre>

In [None]:
# Arrow glyph for each action index, in the order LEFT, DOWN, RIGHT, UP.
action_to_symbol = dict(enumerate(['\u2190', '\u2193', '\u2192', '\u2191']))

# Print the legend, one "index : arrow" pair per line.
for a, symbol in action_to_symbol.items():
    print(a, ':', symbol)

In [None]:
# Sample one action from the action space, apply it, and show the transition
# the environment returned.
action = env.action_space.sample()
print("action=",action)
next_state, reward, done, info = env.step(action)
print("next_state, reward, done =",next_state, reward, done)

In [None]:
# Play one full episode with sampled actions, drawing the board after each
# step, until the environment reports done.
init_state = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    next_state, reward, done, info =env.step(action)
    env.render()

In [None]:
# Look up the environment's transition table entry for one (state, action) pair.
state = 8
action = 2  # RIGHT in the action legend above
print(env.unwrapped.P[state][action])

<pre class="literal-block">
`is_slippery`: True/False. If True, the agent moves in the intended direction
with probability 1/3; otherwise it moves in one of the two perpendicular
directions, each with probability 1/3.
    For example, if action is left and is_slippery is True, then:
    - P(move left)=1/3
    - P(move up)=1/3
    - P(move down)=1/3
</pre>

In [None]:
class MDP:
    """Tabular description of a discrete environment.

    Attributes
    ----------
    states : np.ndarray
        Indices ``0 .. env.observation_space.n - 1``.
    actions : np.ndarray
        Indices ``0 .. env.action_space.n - 1``.
    P : object
        The transition table taken from ``env.unwrapped.P``; the notebook
        indexes it as ``P[state][action]``.
    """

    def __init__(self, env):
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        self.states = np.arange(n_states)
        self.actions = np.arange(n_actions)
        self.P = env.unwrapped.P

In [None]:
# Build the tabular MDP wrapper around the environment.
mdp = MDP(env)

In [None]:
# Inspect the wrapper: state indices, action indices, and the transition list
# stored for one (state, action) pair.
print(mdp.states)
print(mdp.actions)
state = 8
action = 2
print(mdp.P[state][action])

In [None]:
# Turn the list of (prob, next_state, reward, done) tuples into four parallel
# arrays, one entry per possible outcome of (state, action).
p_col, s_col, r_col, d_col = zip(*mdp.P[state][action])
probs, rewards, dones = (
    np.array(column, dtype=np.float32) for column in (p_col, r_col, d_col)
)
next_states = np.array(s_col, dtype=np.int32)
probs, next_states, rewards, dones

In [None]:
# One-step expected return for the outcomes unpacked above, starting from an
# all-zero value table. Outcomes flagged done contribute only their reward.
V = np.zeros(mdp.states.size, dtype=np.float32)
continuing = 1 - dones
q_value = (probs * (rewards + continuing * V[next_states])).sum()
q_value

In [None]:
def compute_q_value(transitions, V, gamma=1.0):
    """Expected one-step return of a single (state, action) pair.

    Parameters
    ----------
    transitions : sequence of (prob, next_state, reward, done)
        Every possible outcome of taking the action in the state.
    V : np.ndarray
        Current value estimate, indexed by state.
    gamma : float, optional
        Discount applied to the next-state value. The default 1.0 gives the
        undiscounted backup, so two-argument calls behave as before.

    Returns
    -------
    np.float64
        ``sum_i prob_i * (reward_i + gamma * (1 - done_i) * V[next_state_i])``.
    """
    probs, next_states, rewards, dones = zip(*transitions)
    # Keep everything in float64: a float32 cast turns 1/3 into 0.33333334, so
    # the outcome weights sum to more than 1 and the backup carries a ~1e-8
    # error, far above the 1e-10 tolerances used later in the notebook.
    probs = np.array(probs, dtype=np.float64)
    next_states = np.array(next_states, dtype=np.intp)
    rewards = np.array(rewards, dtype=np.float64)
    # done -> 1.0 masks the bootstrap term for terminal outcomes.
    dones = np.array(dones, dtype=np.float64)
    return np.sum(probs * (rewards + gamma * (1 - dones) * V[next_states]))

DYNAMIC PROGRAMMING

In [None]:
# Run a fixed number of synchronous Bellman-optimality sweeps (no discount),
# printing the value table as a 4x4 grid after every sweep.
steps_number = 100
V = np.zeros(len(mdp.states), dtype=np.float64)
for _ in np.arange(steps_number):
    # Each state's new value is the best one-step return over all actions,
    # computed from the previous sweep's table.
    V_next = np.array(
        [
            np.max([compute_q_value(mdp.P[s][a], V) for a in mdp.actions])
            for s in mdp.states
        ],
        dtype=np.float64,
    )
    V = V_next
    print(V.reshape((4, 4)))

VALUE ITERATION

In [None]:
def compute_q_value(transitions, V, gamma=1.0):
    """Expected discounted one-step return of a single (state, action) pair.

    Parameters
    ----------
    transitions : sequence of (prob, next_state, reward, done)
        Every possible outcome of taking the action in the state.
    V : np.ndarray
        Current value estimate, indexed by state.
    gamma : float, optional
        Discount applied to the next-state value. Defaults to 1.0 so that
        two-argument calls (as in the fixed-sweep cell earlier in the
        notebook) keep working after this cell redefines the function.

    Returns
    -------
    np.float64
        ``sum_i prob_i * (reward_i + gamma * (1 - done_i) * V[next_state_i])``.
    """
    probs, next_states, rewards, dones = zip(*transitions)
    # Keep everything in float64: a float32 cast turns 1/3 into 0.33333334, so
    # the outcome weights sum to more than 1 and the backup carries a ~1e-8
    # error, far above the 1e-10 convergence tolerance used with this function.
    probs = np.array(probs, dtype=np.float64)
    next_states = np.array(next_states, dtype=np.intp)
    rewards = np.array(rewards, dtype=np.float64)
    # done -> 1.0 masks the bootstrap term for terminal outcomes.
    dones = np.array(dones, dtype=np.float64)
    return np.sum(probs * (rewards + gamma * (1 - dones) * V[next_states]))

In [None]:
# Value iteration: repeat synchronous Bellman-optimality sweeps until the
# largest per-state change drops to epsilon or below.
gamma = 0.999
epsilon = 1e-10

V = np.zeros(len(mdp.states), dtype=np.float64)
errors = []        # max-norm change of every sweep, plotted further down
steps_number = 0
while True:
    # New value of a state = best discounted one-step return over all actions.
    V_next = np.array(
        [
            np.max([compute_q_value(mdp.P[s][a], V, gamma) for a in mdp.actions])
            for s in mdp.states
        ],
        dtype=np.float64,
    )
    error = np.max(np.abs(V - V_next))
    errors.append(error)
    V = V_next
    steps_number += 1
    if error <= epsilon:
        break

print(steps_number)
print(V.reshape((4, 4)))

In [None]:
from matplotlib import pyplot as plt

In [None]:
# Plot the per-sweep max-norm change recorded in `errors`. A log y-axis is used
# because the change shrinks by orders of magnitude and is unreadable on a
# linear scale; the trailing semicolon hides the repr of the last call.
fig, ax = plt.subplots()
ax.plot(errors)
ax.set_yscale('log')
ax.set_xlabel('sweep')
ax.set_ylabel('max |V_next - V|')
ax.set_title('Value iteration convergence');

Нахождение оптимальной стратегии

In [None]:
def get_q_fn(mdp, V, gamma):
    """Build the action-value function induced by a value table.

    Returns a callable ``q_fn(state, action) -> q_value`` that evaluates
    ``compute_q_value`` on ``mdp.P[state][action]`` with the given ``V`` and
    ``gamma``.
    """
    def q_fn(state, action):
        return compute_q_value(mdp.P[state][action], V, gamma)

    return q_fn

In [None]:
def get_policy(mdp, V, gamma):
    """Build the greedy policy for a value table.

    Returns a callable ``policy(state) -> action`` that picks the index of the
    largest q-value over ``mdp.actions`` (``np.argmax``, so the first maximum
    wins on ties).
    """
    q_fn = get_q_fn(mdp, V, gamma)

    def greedy_action(state):
        action_values = [q_fn(state, action) for action in mdp.actions]
        return np.argmax(action_values)

    return greedy_action

# Greedy policy with respect to the current V and gamma.
policy = get_policy(mdp, V, gamma)

In [None]:
# Action chosen by `policy` in every state, shown on the 4x4 grid.
best_actions = np.array(list(map(policy, mdp.states)))
best_actions.reshape(4, 4)

In [None]:
# Draw the board, then print the chosen action for each cell as an arrow.
env.reset()
env.render()
print('Решение:')
print(np.vectorize(action_to_symbol.get)(best_actions.reshape( (4,4))))

POLICY ITERATION

In [None]:
# Pick one random action per state and expose the table as a
# state -> action callable.
# NOTE(review): no random seed is set in the visible cells, so this policy
# differs between runs.
init_policy_dict = {s: np.random.choice(mdp.actions) for s in mdp.states} 
init_policy = lambda s: init_policy_dict[s]

In [None]:
# Show the random initial policy as one action index per state.
print([init_policy(s) for s in mdp.states])

In [None]:
def policy_eval(policy, mdp, gamma, epsilon):
    """Iteratively evaluate a fixed policy.

    Starting from an all-zero table, repeatedly replaces every state's value
    with the q-value of the action the policy picks there, until the largest
    per-state change is at most ``epsilon``. Prints the number of sweeps.

    Parameters
    ----------
    policy : callable
        ``policy(state) -> action``.
    mdp : MDP
        Provides ``states`` and the transition table ``P``.
    gamma : float
        Discount factor passed to ``compute_q_value``.
    epsilon : float
        Stop once the max-norm change between sweeps is <= this value.

    Returns
    -------
    np.ndarray
        Value estimate for every state, dtype float64.
    """
    V = np.zeros(len(mdp.states), dtype=np.float64)
    steps_number = 0
    while True:
        # Synchronous sweep: every update reads the previous table V.
        V_next = np.zeros(len(mdp.states), dtype=np.float64)
        for s in mdp.states:
            V_next[s] = compute_q_value(mdp.P[s][policy(s)], V, gamma)
        error = np.max(np.abs(V - V_next))
        V = V_next
        steps_number += 1
        if error <= epsilon:
            break
    print("Steps=", steps_number)
    return V

In [None]:
def policy_improvement(V, mdp, policy, gamma=None):
    """Return the greedy policy with respect to the value table ``V``.

    Parameters
    ----------
    V : np.ndarray
        Value estimate of the policy being improved.
    mdp : MDP
        Tabular model handed to ``get_policy``.
    policy : callable
        The policy that produced ``V``. Not used by the computation; kept so
        existing calls keep working.
    gamma : float, optional
        Discount factor for the greedy one-step lookahead. Pass the same value
        that was used to compute ``V``. When omitted, the notebook-level
        ``gamma`` is used, which is what three-argument calls have always done.

    Returns
    -------
    callable
        ``new_policy(state) -> action``.
    """
    if gamma is None:
        # The parameter shadows the module-level name, hence the explicit lookup.
        gamma = globals()["gamma"]
    return get_policy(mdp, V, gamma)

In [None]:
# Evaluate the policy currently bound to `policy`, overwriting V.
# NOTE(review): when the notebook is run top to bottom, `policy` here is still
# the greedy policy assigned in the value-iteration section, not `init_policy`
# — confirm that this is the intended input.
# NOTE(review): the discount 0.99 differs from the notebook-level gamma = 0.999.
V = policy_eval(policy, mdp, 0.99, 0.1**10)

In [None]:
# Draw a fresh random policy (one action per state) as the starting point for
# policy iteration, and show it.
# NOTE(review): no random seed is set in the visible cells, so the starting
# policy differs between runs.
init_policy_dict = {s: np.random.choice(mdp.actions) for s in mdp.states} 
init_policy = lambda s: init_policy_dict[s]
print([init_policy(s) for s in mdp.states])

In [None]:
# Policy iteration: alternate policy evaluation and greedy improvement until
# the improved policy picks the same action in every state.
policy = init_policy
while True:
    # Evaluate with the notebook-level `gamma` and `epsilon`.
    # policy_improvement() builds its greedy policy with that same `gamma`, so
    # a different literal here would evaluate and improve the policy under two
    # different discount factors.
    V = policy_eval(policy, mdp, gamma, epsilon)
    new_policy = policy_improvement(V, mdp, policy)

    # Policies are callables, so compare them by the actions they choose.
    policy_actions = [policy(s) for s in mdp.states]
    new_policy_actions = [new_policy(s) for s in mdp.states]
    policy = new_policy
    if policy_actions == new_policy_actions:
        break

In [None]:
# Action chosen by the final `policy` in every state, shown on the 4x4 grid.
best_actions = np.array(list(map(policy, mdp.states)))
best_actions.reshape(4, 4)

In [None]:
# Draw the board, then print the chosen action for each cell as an arrow.
env.reset()
env.render()
print('Решение:')
print(np.vectorize(action_to_symbol.get)(best_actions.reshape( (4,4))))