In [31]:
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
from collections import defaultdict
from mpl_toolkits.mplot3d import axes3d
import gym

def generate_episode(env, policy, pi):
    """Roll out one episode in `env`, choosing actions with `policy(state, pi)`.

    Returns:
        (trajectory, last_index) where `trajectory` is a list of
        `(state, reward, done, action)` tuples and `last_index` is the index of
        the final (terminal) entry. The first entry carries `reward=None`
        because no transition has happened yet; each later entry carries the
        reward and done flag returned by the step that led into its state.
        An action is also sampled for the terminal state, although it is never
        executed.
    """
    observation = env.reset()
    reward, done = None, False
    trajectory = []
    while True:
        action = policy(observation, pi)
        trajectory.append((observation, reward, done, action))
        if done:
            break
        # Old-style Gym API: step returns a 4-tuple; `info` is not used.
        observation, reward, done, info = env.step(action)
    return trajectory, len(trajectory) - 1

def argmax_rand(arr):
    """Return the index of a maximal entry of `arr`, picking randomly among ties.

    The index refers to the flattened input.
    """
    values = np.asarray(arr)
    is_best = values == values.max()
    candidates = np.flatnonzero(is_best)
    return np.random.choice(candidates)

def on_policy_mc_control(env, n_episodes, gamma, epsilon):
    """On-policy first-visit Monte Carlo control with an epsilon-greedy policy.

    Args:
        env: environment exposing `reset()`, `step(action)` and
            `action_space.n`; actions are the integers `0 .. n-1`.
        n_episodes: number of episodes to sample.
        gamma: discount factor applied when accumulating the return G.
        epsilon: exploration parameter; after an update the greedy action of a
            state gets probability `1 - epsilon + epsilon/n`, every other
            action gets `epsilon/n`.

    Returns:
        (Q, pi): two defaultdicts keyed by `(state, action)`. `Q` holds the
        sample-average return observed after first visits of the pair; `pi`
        holds the action probabilities (uniform `1/n` for unseen pairs).
    """
    n_actions = env.action_space.n

    def policy(state, pi):
        # Sample an action index from the current probabilities of `state`.
        # Uses action_space.n so no extra attribute is needed on the env.
        return np.random.choice(n_actions, p=[pi[(state, a)] for a in range(n_actions)])

    pi = defaultdict(lambda: 1 / n_actions)
    Q = defaultdict(float)
    visit_counts = defaultdict(int)  # number of returns averaged into each Q entry

    for _ in range(n_episodes):
        episode_steps, terminal_state_index = generate_episode(env, policy, pi)

        # Index of the first occurrence of every (state, action) pair, computed
        # once per episode instead of rescanning the prefix at every step.
        first_visit = {}
        for t in range(terminal_state_index):
            state, _reward, _done, action = episode_steps[t]
            first_visit.setdefault((state, action), t)

        G = 0
        for t in range(terminal_state_index - 1, -1, -1):
            state, _reward, _done, action = episode_steps[t]
            # The reward for acting at step t is stored with step t+1.
            reward_1 = episode_steps[t + 1][1]
            G = gamma * G + reward_1

            pair = (state, action)
            if first_visit[pair] != t:
                continue  # not the first visit of this pair in the episode

            # Incremental sample mean: same estimate as averaging the full
            # list of returns, without storing or re-averaging that list.
            visit_counts[pair] += 1
            Q[pair] += (G - Q[pair]) / visit_counts[pair]

            # Epsilon-greedy improvement for the visited state.
            A_star = argmax_rand([Q[(state, a)] for a in range(n_actions)])
            for a in range(n_actions):
                if a == A_star:
                    pi[(state, a)] = 1 - epsilon + epsilon / n_actions
                else:
                    pi[(state, a)] = epsilon / n_actions

    return Q, pi


def test_performance(policy, nb_episodes=100):
    sum_returns = 0
    for i in range(nb_episodes):
        state  = env.reset()
        done = False
        while not done:
            action = np.argmax([pi[(state, a)] for a in range(env.action_space.n)])
            state, reward, done, info = env.step(action)
            if done:
                sum_returns += reward
    return sum_returns/nb_episodes

# Create the FrozenLake environment with slippery transitions enabled.
env = gym.make("FrozenLake-v0", is_slippery = True)
# Attach a list of the four action indices as `env.act_space` when the
# environment object does not already provide that attribute.
if not hasattr(env, 'act_space'): env.act_space = [0, 1, 2, 3]

# Clear earlier cell output, then train with on-policy Monte Carlo control.
clear_output()
print("Calculating reward ...")
# NOTE(review): gamma=0.095 is an unusually small discount factor — possibly a
# typo for 0.95; confirm before relying on the learned values.
Q, pi = on_policy_mc_control(env, n_episodes=10000, gamma=0.095, epsilon=0.3)  
# Evaluate the learned policy and print the full (state, action) -> probability table.
print("Mean reward:",test_performance(pi))
print("policy : \n ", pi)

Calculating reward ...


KeyboardInterrupt: 

In [30]:
def follow_policy(pi):
    """Run one episode in the notebook-level `env`, acting greedily w.r.t. `pi`.

    In each state the action with the largest `pi[(state, action)]` is taken,
    and the environment is rendered after every step. The state returned by
    `reset()` itself is not rendered. Returns nothing.
    """
    current_state = env.reset()
    finished = False
    while not finished:
        preferences = [pi[(current_state, a)] for a in range(env.action_space.n)]
        greedy_action = np.argmax(preferences)
        current_state, reward, finished, info = env.step(greedy_action)
        env.render()
            
# Clear earlier output of this cell, then render one rollout of `pi`.
# Requires `env` and `pi` to already exist in the kernel (the training cell creates them).
clear_output()
follow_policy(pi)

(Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Down)
SFFF
FHFH
FFFH
[41mH[0mFFG
