In [None]:
import os  # noqa
import sys  # noqa

# Put the post's local code directory at the front of the import path so the
# `environment` and `simple_env` imports below can be resolved from it.
# NOTE(review): the path is relative to the current working directory —
# presumably the site/repository root; confirm before running elsewhere.
module_path = os.path.abspath(os.path.join("posts/monkey-banana-mdp/code"))
# Alternative path, presumably for running from inside the post's own directory:
# module_path = os.path.abspath(os.path.join("./code"))
sys.path.insert(0, module_path)
from environment import LineWorldEnv  # noqa
from simple_env import SimpleLineWorldEnv  # noqa
from IPython.display import Image  # noqa
import numpy as np  # noqa
import pygame  # noqa

import gymnasium as gym  # noqa
from gymnasium import spaces  # noqa

In Monte Carlo methods, unlike in Dynamic Programming, the agent is not given a model of the environment; instead, it learns from its own experience by sampling repeated episodes. We use the on-policy first-visit Monte Carlo control algorithm shown below, from Sutton & Barto's "Reinforcement Learning: An Introduction".


In [None]:
# Show the pseudocode figure of the algorithm referenced in the text above.
# The file name is relative, so the image must sit in the working directory.
Image(filename="on-policy-first-visit-algo.png")

In [None]:
# On-policy first-visit Monte Carlo control with an epsilon-soft policy.
#   q_values[s][a] : running average of the first-visit returns seen for (s, a)
#   n[s][a]        : number of returns averaged into q_values[s][a]
#   policy[s][a]   : probability of taking action a in state s
env = LineWorldEnv(size=3)
# env = LineWorldEnv(render_mode="human", size=5)
all_states = env.get_all_states()
q_values = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
n = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
epsilon = 0.1
gamma = 0.4

# Epsilon-soft policy (equal probability for all actions)
policy = {
    s: {a: 1 / len(env.get_possible_actions(s)) for a in env.get_possible_actions(s)}
    for s in all_states
}


for i in range(1000):
    print(f"Episode: {i}")

    # Generate episode.
    # The environment is reset with the same seed every time; starting from the
    # same state each episode is fine because the policy itself explores.
    seed = 10
    obs, info = env.reset(seed=seed)
    episode = []

    while True:
        # Choose action according to stochastic policy:
        # policy[state] maps each action to its probability, so sample from it.
        state = env.flatten_obs(obs)
        action_probabilities = policy[state]
        action = np.random.choice(
            list(action_probabilities.keys()), p=list(action_probabilities.values())
        )

        obs, reward, terminated, truncated, info = env.step(action)
        episode.append((state, action, reward))

        if terminated or truncated:
            break

    # Update q-value estimates based on episode generated.
    # Walk the episode backwards so the discounted return can be built incrementally.
    returns = 0
    for j in range(len(episode) - 1, -1, -1):
        state, action, reward = episode[j]

        # G <- gamma * G + R  (plain assignment: `+=` would add the old return twice)
        returns = gamma * returns + reward
        exists = any((s, a) == (state, action) for s, a, r in episode[:j])

        # We only update q-value for first-visit
        if not exists:
            n[state][action] += 1
            # Incremental mean of the returns observed for this (state, action)
            q_values[state][action] += (returns - q_values[state][action]) / n[state][
                action
            ]

            # Update policy with the epsilon-max q-value action.
            # A separate loop variable is used so the episode's `action` is not shadowed.
            best_action = max(q_values[state], key=q_values[state].get)
            n_actions = len(policy[state])
            for candidate in policy[state].keys():
                if candidate == best_action:
                    policy[state][candidate] = 1 - epsilon + epsilon / n_actions
                else:
                    policy[state][candidate] = epsilon / n_actions

print(policy)
print(q_values)

In [None]:
# Play one rendered episode, sampling every action from the learned
# action-probability table `policy` built in the training cell.
env = LineWorldEnv(render_mode="human", size=3)
obs, info = env.reset()

done = False
while not done:
    action_probabilities = policy[env.flatten_obs(obs)]
    print("State: ", env.flatten_obs(obs))
    print("Action probabilities: ", action_probabilities)

    # Split the table into parallel lists of actions and their probabilities
    candidate_actions = list(action_probabilities.keys())
    candidate_weights = list(action_probabilities.values())
    action = np.random.choice(candidate_actions, p=candidate_weights)

    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

# The episode has ended: release the environment
env.close()

In [None]:
# First-visit Monte Carlo control with a deterministic (greedy) policy.
#   q_values[s][a] : running average of the first-visit returns seen for (s, a)
#   n[s][a]        : number of returns averaged into q_values[s][a]
#   policy[s]      : the single action taken in state s (None if s has no actions)
env = LineWorldEnv(size=3)
# env = LineWorldEnv(render_mode="human", size=5)
all_states = env.get_all_states()
q_values = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
n = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
gamma = 0.9
# A deterministic policy can loop forever, so episodes are cut off after this many steps
max_episode_len = 100

# Deterministic initial policy: one randomly chosen action per state
policy = {}
for s in all_states:
    possible_actions = env.get_possible_actions(s)
    if len(possible_actions) > 0:  # Check if the list of possible actions is not empty
        policy[s] = np.random.choice(possible_actions)
    else:
        policy[s] = None  # states with no possible actions get no action


for i in range(1000):
    print(f"Episode: {i}")

    # Generate episode.
    # The reset seed is drawn at random each episode — presumably so that the
    # start state varies, since the policy itself does not explore.
    obs, info = env.reset(seed=np.random.randint(0, 1000))
    episode = []

    episode_len = 0
    while True:
        episode_len += 1

        state = env.flatten_obs(obs)
        action = policy[state]

        obs, reward, terminated, truncated, info = env.step(action)
        episode.append((state, action, reward))

        if terminated or truncated:
            break
        if episode_len > max_episode_len:
            break
    # Episodes that did not terminate (cut off or truncated) are not learned from
    if not terminated:
        continue

    # Update q-value estimates based on episode generated.
    # Walk the episode backwards so the discounted return can be built incrementally.
    returns = 0
    for j in range(len(episode) - 1, -1, -1):
        state, action, reward = episode[j]

        # G <- gamma * G + R  (plain assignment: `+=` would add the old return twice)
        returns = gamma * returns + reward
        exists = any((s, a) == (state, action) for s, a, r in episode[:j])

        # We only update q-value for first-visit
        if not exists:
            n[state][action] += 1
            # Incremental mean of the returns observed for this (state, action)
            q_values[state][action] += (returns - q_values[state][action]) / n[state][action]

            # Make the policy greedy with respect to the updated q-values
            best_action = max(q_values[state], key=q_values[state].get)
            policy[state] = best_action

print(policy)
print(q_values)

In [None]:
# Play one rendered episode following the deterministic `policy`
# (state -> action) learned in the training cell.
env = LineWorldEnv(render_mode="human", size=3)
obs, info = env.reset()

# A deterministic policy can cycle between states without ever terminating,
# so the rollout is bounded instead of looping unconditionally.
max_steps = 100
for _ in range(max_steps):
    state = env.flatten_obs(obs)
    print("State: ", state)
    action = policy[state]
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        break
else:
    # Only reached when the step budget ran out without the episode ending
    print(f"Stopped after {max_steps} steps without the episode ending")

# Release the environment on both exits (episode ended or step budget exhausted)
env.close()

### Simple environment to debug Monte Carlo


In [None]:
# Smoke-test the simplified environment: run one rendered episode with
# actions sampled from the environment's action space.
env = SimpleLineWorldEnv(render_mode="human", size=5)
obs, info = env.reset()

episode_over = False
while not episode_over:
    # Sample random action
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    episode_over = terminated or truncated

# The episode has ended: release the environment
env.close()

In [None]:
# Debug run of on-policy first-visit Monte Carlo control on the simplified
# environment: only a few episodes, with every policy change printed.
#   q_values[s][a] : running average of the first-visit returns seen for (s, a)
#   n[s][a]        : number of returns averaged into q_values[s][a]
#   policy[s][a]   : probability of taking action a in state s
env = SimpleLineWorldEnv(size=5)
# env = LineWorldEnv(render_mode="human", size=5)
all_states = env.get_all_states()
q_values = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
n = {s: {a: 0 for a in env.get_possible_actions(s)} for s in all_states}
epsilon = 0.1
gamma = 0.9

# Epsilon-soft policy (equal probability for all actions)
policy = {
    s: {a: 1 / len(env.get_possible_actions(s)) for a in env.get_possible_actions(s)}
    for s in all_states
}

# for i in range(1000):
for i in range(10):
    print(f"Episode: {i}")

    # Generate episode.
    # The environment is reset with the same seed every time; starting from the
    # same state each episode is fine because the policy itself explores.
    seed = 10
    obs, info = env.reset(seed=seed)
    episode = []

    while True:
        # Choose action according to stochastic policy:
        # policy[state] maps each action to its probability, so sample from it.
        state = env.flatten_obs(obs)
        action_probabilities = policy[state]
        action = np.random.choice(
            list(action_probabilities.keys()), p=list(action_probabilities.values())
        )

        obs, reward, terminated, truncated, info = env.step(action)
        episode.append((state, action, reward))

        if terminated or truncated:
            break

    # Update q-value estimates based on episode generated.
    # Walk the episode backwards so the discounted return can be built incrementally.
    returns = 0
    for j in range(len(episode) - 1, -1, -1):
        state, action, reward = episode[j]

        # G <- gamma * G + R  (plain assignment: `+=` would add the old return twice)
        returns = gamma * returns + reward
        exists = any((s, a) == (state, action) for s, a, r in episode[:j])

        # We only update q-value for first-visit
        if not exists:
            n[state][action] += 1
            # Incremental mean of the returns observed for this (state, action)
            q_values[state][action] += (returns - q_values[state][action]) / n[state][action]

            # Update policy with the epsilon-max q-value action.
            # Snapshot this state's action probabilities first so a change can be reported.
            best_action = max(q_values[state], key=q_values[state].get)
            original_policy = policy[state].copy()
            n_actions = len(policy[state])
            for candidate in policy[state].keys():
                if candidate == best_action:
                    policy[state][candidate] = 1 - epsilon + epsilon / n_actions
                else:
                    policy[state][candidate] = epsilon / n_actions

            # Check if policy has changed. `original_policy` is already the
            # per-state dict, so it is compared directly (not indexed by state).
            if original_policy != policy[state]:
                print("State: ", state)
                print("Original state-policy", original_policy)
                print("New state-policy", policy[state])

    print(policy)

print(policy)
print(q_values)

In [None]:
# Sanity check: dictionaries compare by content, not identity, so two
# separately built dicts holding the same items are equal.
one = dict.fromkeys((1, 2), 0.5)

# Independent copy with identical key/value pairs
two = dict(one)

one == two