In [13]:
import gym
import collections
from tensorboardX import SummaryWriter

# Gym environment id used to build both the training and the test environment.
ENV_NAME = "FrozenLake-v0"
# Discount factor applied to next-state values in the Bellman update.
GAMMA = 0.9
# Number of test episodes averaged to score the agent after each iteration.
TEST_EPISODES = 20

class Agent:
    """Tabular value-iteration agent for a gym environment with discrete spaces.

    All knowledge is kept in three tables filled from observed experience:

    * ``rewards``  -- ``(state, action, next_state)`` -> last immediate reward seen
    * ``transits`` -- ``(state, action)`` -> ``Counter`` of observed next states
    * ``values``   -- ``state`` -> current state-value estimate
    """

    def __init__(self, env_name=None, gamma=None):
        """Create the environment and the (initially empty) tables.

        Args:
            env_name: id handed to ``gym.make``; ``None`` means the module-level
                ``ENV_NAME``.
            gamma: discount factor for the Bellman update; ``None`` means the
                module-level ``GAMMA``.
        """
        self.env = gym.make(ENV_NAME if env_name is None else env_name)
        self.gamma = GAMMA if gamma is None else gamma
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        """Take ``count`` random actions in ``self.env`` and record what happened.

        Each step updates the reward table and the transition counts, so the
        tables are usable after the very first step rather than only after a
        batch of complete episodes. The environment is reset whenever an
        episode ends; ``self.state`` always holds the current state.
        """
        for _ in range(count):
            action = self.env.action_space.sample()
            # Assumes the classic gym API where step() returns a 4-tuple.
            new_state, reward, is_done, _ = self.env.step(action)
            # Reward table: immediate reward for this exact transition.
            self.rewards[(self.state, action, new_state)] = reward
            # Transition table: how often this action led to new_state.
            self.transits[(self.state, action)][new_state] += 1
            self.state = self.env.reset() if is_done else new_state

    def calc_action_value(self, state, action):
        """Estimate the value of taking ``action`` in ``state``.

        Uses the empirical transition frequencies as probabilities and applies
        the Bellman equation over every observed next state.

        The tables are read with ``.get`` so that querying a pair that was
        never observed does not insert phantom entries into the defaultdicts.

        Returns:
            The estimated action value, or ``0.0`` when the pair is unseen.
        """
        target_counts = self.transits.get((state, action))
        if not target_counts:
            return 0.0
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards.get((state, action, tgt_state), 0.0)
            next_value = self.values.get(tgt_state, 0.0)
            # Bellman equation, weighted by the observed transition frequency.
            action_value += (count / total) * (reward + self.gamma * next_value)
        return action_value

    def select_action(self, state):
        """Return the action with the highest estimated value in ``state``.

        Ties go to the lowest action index (``max`` keeps the first maximum).
        Returns ``None`` if the action space is empty.
        """
        return max(
            range(self.env.action_space.n),
            key=lambda action: self.calc_action_value(state, action),
            default=None,
        )

    def play_episode(self, env):
        """Play one greedy episode on ``env`` and return its total reward.

        ``env`` is a separate environment instance so the state of
        ``self.env`` is not disturbed. Transitions seen here are also written
        to the reward and transition tables.
        """
        total_reward = 0.
        state = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, _ = env.step(action)
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        """Do one in-place sweep over all states, refreshing ``self.values``.

        Each state's value becomes the maximum action value computed from the
        current transition and reward tables. Because the update is in place,
        states later in the sweep already see the new values of earlier ones.
        """
        for state in range(self.env.observation_space.n):
            self.values[state] = max(
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            )

In [14]:
# Training driver: alternate random exploration, one value-iteration sweep and
# a greedy evaluation until the average test reward exceeds REWARD_GOAL.

# Average test reward that must be exceeded for the task to count as solved.
REWARD_GOAL = 0.8

test_env = gym.make(ENV_NAME)
agent = Agent()
writer = SummaryWriter(comment="-v-learning")
iter_no = 0
best_reward = 0.
try:
    while True:
        iter_no += 1
        # Gather fresh experience, then refresh the value table from it.
        agent.play_n_random_steps(100)
        agent.value_iteration()

        # Score the current greedy policy as the mean reward over test episodes.
        reward = 0.
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES
        writer.add_scalar("reward", reward, iter_no)

        if reward > best_reward:
            print(f"Best reward updated {round(best_reward,3)} -> {round(reward,3)}")
            best_reward = reward
        if reward > REWARD_GOAL:
            print(f"Solved in {iter_no} iterations!")
            break
finally:
    # The loop has no iteration cap and is often interrupted by hand; close the
    # writer on every exit path so it is not left open.
    writer.close()

Best reward updated 0.0 -> 0.2
Best reward updated 0.2 -> 0.25
Best reward updated 0.25 -> 0.4
Best reward updated 0.4 -> 0.5
Best reward updated 0.5 -> 0.8
Best reward updated 0.8 -> 0.85
Solved in 20 iterations!
