<a href="https://colab.research.google.com/github/rohitgunasekaran/monte-carlo-control/blob/main/MonteCarloControlExp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
import warnings ; warnings.filterwarnings('ignore')

import gym
import numpy as np

import random
import warnings

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)

In [27]:
pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk

Collecting gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-install-ht3dn00t/gym-walk_8cf7fa9beaee4806a141f9007239cc89
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-install-ht3dn00t/gym-walk_8cf7fa9beaee4806a141f9007239cc89
  Resolved https://github.com/mimoralea/gym-walk to commit b915b94cf2ad16f8833a1ad92ea94e88159279f5
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [28]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k:v for k,v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [29]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [40]:
env, _ = gym.make('FrozenLake-v1', render_mode='ansi', new_step_api=True, is_slippery=False)
P = env.env.P
init_state, _ = env.reset()
#goal_state = 6
#LEFT, RIGHT = range(2)

In [31]:
P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

Exponentially decaying schedule


In [32]:
def decay_schedule(
    init_value, min_value, decay_ratio,
    max_steps, log_start = -2, log_base=10):

  decay_steps = int(max_steps * decay_ratio)
  rem_steps = max_steps - decay_steps
  values = np.logspace(log_start, 0, decay_steps, base=log_base)
  values = (init_value - min_value) * values + min_value
  values = np.pad(values, (0, rem_steps), 'edge')

  return values

Exploratory Policy Trajectories

In [47]:
from itertools import count
import numpy as np

def generate_trajectory(
    select_action, Q, epsilon,
    env, max_steps=200):
  done, trajectory = False, []

  state = env.reset()
  for t in count():
      action = select_action(state, Q, epsilon)
      next_state, reward, done, _ = env.step(action)
      trajectory.append((state, action, reward, next_state))
      state = next_state
      if isinstance(done, np.bool_):
        done = bool(done)
      if done or t >= max_steps - 1:
          break

  return np.array(trajectory, np.object)

Monte Carlo control

In [49]:
def mc_control (env, gamma = 1.0,
                init_alpha = 0.5,min_alpha = 0.01, alpha_decay_ratio = 0.5,
                init_epsilon = 1.0, min_epsilon = 0.1, epsilon_decay_ratio = 0.9,
                n_episodes = 3000, max_steps = 200, first_visit = True):
  # Ensure env is the environment object, not a tuple
  if isinstance(env, tuple):
      env = env[0]

  nS, nA = env.observation_space.n, env.action_space.n

  Q = np.zeros((nS, nA))
  V = np.zeros(nS)
  pi = np.zeros(nS, dtype=int)

  returns = {(s, a): [] for s in range(nS) for a in range(nA)}

  alpha_schedule = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
  epsilon_schedule = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

  def select_action(state, Q, epsilon):
      if random.random() > epsilon:
          return np.argmax(Q[state])
      else:
          return random.randrange(nA)

  for i in range(n_episodes):
      epsilon = epsilon_schedule[i]
      alpha = alpha_schedule[i]

      trajectory = generate_trajectory(select_action, Q, epsilon, env, max_steps)
      states, actions, rewards, next_states = trajectory[:,0], trajectory[:,1], trajectory[:,2], trajectory[:,3]

      G = 0
      for t in reversed(range(len(trajectory))):
          s, a, r, ns = states[t], actions[t], rewards[t], next_states[t]
          G = gamma * G + r

          if first_visit and (s, a) not in [(states[i], actions[i]) for i in range(t)]:
              returns[(s, a)].append(G)
              Q[s, a] = np.mean(returns[(s, a)])
              V[s] = np.max(Q[s])
              pi[s] = np.argmax(Q[s])
          elif not first_visit:
              returns[(s, a)].append(G)
              Q[s, a] = np.mean(returns[(s, a)])
              V[s] = np.max(Q[s])
              pi[s] = np.argmax(Q[s])


  #return Q, V, pi, Q_track, pi_track
  return Q, V, pi

In [52]:
optimal_Q, optimal_V, optimal_pi = mc_control(env)
print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='State-value function:')
print_policy(optimal_pi, P)

State-value function:
| 00    0.0 | 01    0.0 | 02    0.0 | 03    0.0 |
| 04   0.01 |           | 06    0.0 |           |
| 08   0.02 | 09   0.08 | 10   0.33 |           |
|           | 13   0.21 | 14    1.0 |           |
Policy:
| 00      v | 01      ^ | 02      < | 03      < |
| 04      v |           | 06      < |           |
| 08      > | 09      > | 10      v |           |
|           | 13      > | 14      > |           |


In [51]:
import numpy as np

# Monkey-patch temporarily to avoid crash and log usage
if not hasattr(np, "object"):
    np.object = object
    print("⚠️ Patched np.object → object (temporary fix)")


⚠️ Patched np.object → object (temporary fix)
