 $ Understanding \ Tabular \ Learning \ and \ the \ Bellman \ Equation. $
####
- *Solving FrozenLake*

##### $ Classes \ and \ Functions $

In [3]:
import time
import collections
import numpy as np
import gymnasium as gym
from tensorboardX import SummaryWriter

class AgentClass:
    """ This agent use Bellman Equation to run n scenario and find the best 
    and fastes solution using `V(s) = max_a { sum_s' { P(s, a, s') [R(s, a, s') + gamma * V(s')] } }`
    Parameters:
    - env_name : str, environment name 
    - kwargs: dict, other environment parameters"""

    def __init__(self, env_name:str, gamma:float=0.95, **kwargs:dict):
        self.env = gym.make(id=env_name, **kwargs)
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float) # keys: source-state , action, target-state
        self.transits = collections.defaultdict(collections.Counter) # experience transition keys: state, action
        self.values = collections.defaultdict(float)
        self.GAMMA = gamma

    def play_n_random_steps (self, count:int):

        # Initiate the Environment
        for _ in range(count):
            action = self.env.action_space.sample()  #0: Move left, 1: Move down, 2: Move right, 3: Move up
            new_state, reward, is_done, _ , _ = self.env.step(action)

            # Adding reward and state of each posible scenario 
            self.rewards[f"{(self.state, action, new_state)}"] = reward  # this is the part that capture P(s, a, s')
            self.transits[f"{(self.state, action)}"][new_state] += 1 # if the pattern P(s, a, s') is found and the state is found again +1 reward is added
            self.state = self.env.reset() if is_done else new_state

    def calculate_action_value (self, state:tuple, action:int):
        target_counts = self.transits[f"{(self.state, action)}"]
        total = sum(target_counts.values())
        action_value = 0.0

        for tgt_state, count in target_counts.items():
            reward = self.rewards[f"{(self.state, action, tgt_state)}"]
            val = reward + self.GAMMA * self.values[tgt_state] # P(s, a, s') [R(s, a, s') + gamma * V(s')]
            action_value += (count/total) * val 
        return action_value
    
        # ========================== Note - Important ===========================
        # The reason why we weight the values by their probability of occurrence 
        # is to ensure that we are giving more importance to the states and 
        # actions that have been more successful in the past, 
        # and less importance to the ones that have been less successful.
        # =======================================================================
        
        
    def select_action(self,state):
        """ Filter and select the best action from the previous n-scenario previously run"""
        best_action, best_value = None, None
        for action in range(self.env.action_space.n): # Take one action Up Down Left Right
            action_value = self.calculate_action_value(state=state, action=action) # Filter the best scenario using that action
            if best_action is None or best_value < action_value: # Update the best action and value
                best_value = action_value
                best_action = action

        return best_action 
    

    def play_episode (self, env_t : gym.make):
        """ Run Iteration on a new environment to find the best solution
        Parameteres:
        - env_t : gym.make eviroment"""

        total_reward = 0.0
        state = env_t.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, _, _ = env_t.step(action)
            self.rewards[f"{(self.state, action, new_state)}"] = reward  
            self.transits[f"{(self.state, action)}"][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward
    
    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [self.calculate_action_value(state, action)for action in range(self.env.action_space.n)]
            self.values[state] = max(state_values)

    def close(self):
        self.env.close()


#### $ Running \ Experiments $

In [None]:
# Intiate Variables & Enviroments
ENV_NAME = 'FrozenLake-v1'
RENDER_MODE = None
NUM_EPISODE = 200

# Initiate Agent 
Agent = AgentClass(env_name=ENV_NAME)
tst_env = gym.make(id=ENV_NAME, render_mode = RENDER_MODE)
writer = SummaryWriter(comment="-v-iteration")

iter_no = 0
best_reward = 0.0
while True:
    
    # Run Scenario & Check Best Result
    iter_no += 1
    Agent.play_n_random_steps(50)
    Agent.value_iteration()

    reward = 0.0

    # Run the Test Environment 
    for _ in range(NUM_EPISODE):
        reward += Agent.play_episode(tst_env)
        reward /= NUM_EPISODE
        writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"Best reward updated {best_reward} -> {reward}")
        best_reward = reward

    if reward > 0.9:
        print(f"Solved in {iter_no} interations!")
        break
    writer.close()
    tst_env.close()
    Agent.close()


Best reward updated 0.0 -> 0.25


In [11]:
tst_env.close()
Agent.close()

In [None]:
Skip to content
Search or jump to…
Pulls
Issues
Codespaces
Marketplace
Explore
 
@raudez77 
PacktPublishing
/
Deep-Reinforcement-Learning-Hands-On-Second-Edition
Public
Fork your own copy of PacktPublishing/Deep-Reinforcement-Learning-Hands-On-Second-Edition
Code
Issues
13
Pull requests
8
Actions
Security
Insights
Deep-Reinforcement-Learning-Hands-On-Second-Edition/Chapter05/01_frozenlake_v_iteration.py /
@Shmuma
Shmuma Reformat and plots
Latest commit c4d5255 on Nov 16, 2019
 History
 1 contributor
Executable File  96 lines (84 sloc)  3.19 KB
 

#!/usr/bin/env python3
import gym
import collections
from tensorboardX import SummaryWriter

ENV_NAME = "FrozenLake-v0"
#ENV_NAME = "FrozenLake8x8-v0"      # uncomment for larger version
GAMMA = 0.9
TEST_EPISODES = 20


class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(
            collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
            new_state, reward, is_done, _ = self.env.step(action)
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            self.state = self.env.reset() \
                if is_done else new_state

    def calc_action_value(self, state, action):
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            val = reward + GAMMA * self.values[tgt_state]
            action_value += (count / total) * val
        return action_value

    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, _ = env.step(action)
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.values[state] = max(state_values)


if __name__ == "__main__":
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-v-iteration")

    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        agent.play_n_random_steps(100)
        agent.value_iteration()

        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
            reward /= TEST_EPISODES
            writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" % (
                best_reward, reward))
            best_reward = reward
        if reward > 0.80:
            print("Solved in %d iterations!" % iter_no)
            break
    writer.close()
Footer
© 2023 GitHub, Inc.
Footer navigation
Terms
Privacy
Security
Status
Docs
Contact GitHub
Pricing
API
Training
Blog
About
