In [1]:
import gymnasium as gym
import jax
import jax.numpy as jnp
import numpy as np

# Monte Carlo

The Monte Carlo method estimates the value function for a given policy by estimating it from "experience", that is to simply averaging the returns observed after (first) visit to that state.

Suppose we wish to estimate $v_\pi(s)$, the first-visit MC method estimates $v_\pi(s)$ as the average of the returns following first visits to $s$ (note that $s$ can be visited multiple times in the same episode).

Here is the pseudocode:
1. Initialize:
- $\pi$ $\leftarrow$ policy to be evaluated
- $V$ $\leftarrow$ an arbitrary state-value function
- $Returns(s)$ $\leftarrow$ an ampty list, for all $s \in S$

2. Repeat forever:
- Generate an episode using $\pi$
- For each state $s$ appearing in the epsiode:
  - $G$ $\leftarrow$ return following the first occurence of $s$
  - Append $G$ to $Returns(s)$
  - $V(s)$ $\leftarrow$ average($Returns(s)$)

For Generalized Policy Iteration (GPI), we simply update the policy functuon $\pi$ after each episode by performing $\pi \leftarrow argmax_aQ(s,a)$ where $Q(s,a)$ is the action-value function.

However, note that to store all the returns and compute the average can be memory heavy and slow, hence implementing the MC method, we often use the following:

$$Q(s,a)_{new} ← Q(s,a)_{old} + \alpha (G_t - Q(s,a)_{old}) $$

Note that if we set $\alpha$ to be $\frac{1}{N(s,a)}$ where $N(s,a)$ is the number of visits for the state $s$ and action $a$, then we obtain the running average update, which is more memory efficient.

However, in practice, the $\alpha$ can be chosen to be other number. In fact, it is a hyperparameter that can be tweaked to enhance the learning. If $\alpha$ is small, the learning process is slower but more stable, as it moves towards the target slower, and vice versa if it is big.

Lastly, to avoid the algorithm from always choosing the same action (due to argmax), we can use the epsilon-greedy algoritm. This ensures that we have a balance of exploration and exploitation. We can do this by ensuring that for a set $\epsilon$ probability, we make sure to randomly choose the action $a$ from the action space given state $s$.

In [2]:
gamma = 1.0 #no discount factor
alpha = 0.1 #learning rate
epsilon = 0.1 #epsilon greedy algo
num_episodes = 500000

In [3]:
env = gym.make("Blackjack-v1", sab=True)

In [4]:
Q = {}
def get_Q(state, action):
    return Q.get((state, action), 0.0)

In [5]:
def policy(state):
    if np.random.rand() < epsilon:
        return env.action_space.sample()
    q_vals = [get_Q(state, a) for a in range(env.action_space.n)]
    return int(np.argmax(q_vals))

In [6]:
for ep in range(num_episodes):
    episode = []
    state, _ = env.reset()
    done = False

    # Generate full episode
    while not done:
        action = policy(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        episode.append((state, action, reward))
        state = next_state
        done = terminated or truncated

    # First-visit MC updates
    G = 0
    visited = set()
    for t in reversed(range(len(episode))):
        s, a, r = episode[t]
        G = gamma * G + r
        if (s, a) not in visited:
            visited.add((s, a))
            old_q = get_Q(s, a)
            Q[(s, a)] = old_q + alpha * (G - old_q)

print("Learned Q for state (20, 10, False):")
for a in range(env.action_space.n):
    print(a, get_Q((20, 10, False), a))

Learned Q for state (20, 10, False):
0 0.6873336141764251
1 -0.5051139654361626


# Temporal Difference Learning

Instead of running the full episode in Monte Carlo methodm, temporal difference (TD) learning wait only until the next time step, that is, after $t+1$ they immediately form a target and make an useful update using the observed reward $R_{t+1}$ and the estimate $V(S_{t+1})$. This is the simplest TD method, known as $TD(0)$, which can be adjusted to include more time steps up to $t+n$, called $n$ step TD learning. The update is as follows

$$
Q(s,a) \leftarrow Q(s,a) + \alpha (R_{t+1} + \gamma Q(s', a') - Q(s,a)) $$

The above is the SARSA method, which is an on-policy method. It learns the value of the policy it is currently following, including the exploration steps. The update rule for SARSA uses the Q-value of the next action actually taken by the agent's current policy (which includes exploration, e.g., using an epsilon-greedy strategy).

Whereas for Q-leaning method, it is an off-policy method. It learns the value of the optimal policy regardless of the agent's current exploration strategy. The update rule for Q-learning uses the Q-value of the greedy action (the action with the maximum Q-value) in the next state, effectively learning the optimal policy even while exploring sub-optimally. The update rule is as follows:

$$
Q(s,a) \leftarrow Q(s,a) + \alpha (R_{t+1} + \gamma \max_a Q(s', a') - Q(s,a)) $$


In [1]:
import gymnasium as gym
import random
import numpy as np
from collections import defaultdict
from tqdm import trange

def make_epsilon_greedy_policy(Q, nA, epsilon):
    """Returns a function that given a state returns an action using epsilon-greedy."""
    def policy_fn(state):
        if random.random() < epsilon: # epsilon-greedy
            return random.randrange(nA)
        else:
            # choose greedy action (tie-break randomly)
            qvals = [Q[(state, a)] for a in range(nA)]
            max_q = max(qvals)
            max_actions = [a for a, q in enumerate(qvals) if q == max_q]
            return random.choice(max_actions)
    return policy_fn

def run_agent(env_name="Blackjack-v1",
              method="sarsa",           # "sarsa" or "q_learning"
              num_episodes=200000,
              alpha=0.1,
              gamma=1.0,
              epsilon=0.1,
              eval_every=None):
    """
    Train a tabular SARSA or Q-learning agent on Blackjack and return learned Q and a history of evaluation rewards.
    """
    env = gym.make(env_name, sab=False)  # sab=False for standard Blackjack rules
    nA = env.action_space.n # number of possible actions (Action space)

    # Q-value store with default 0.0
    Q = defaultdict(float) # initialize

    eval_history = []

    for ep in trange(num_episodes, desc=f"Training {method}"):
        state, _ = env.reset()
        # initial action for SARSA (on-policy) must be sampled from behavior policy
        policy = make_epsilon_greedy_policy(Q, nA, epsilon)
        action = policy(state)

        done = False
        while not done:
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            if method == "sarsa":
                # choose next action via epsilon-greedy from next_state
                next_action = policy(next_state) if not done else None # epsilon greedy
                target = reward + (gamma * Q[(next_state, next_action)] if not done else 0.0)
                # update
                old_q = Q[(state, action)]
                Q[(state, action)] = old_q + alpha * (target - old_q)

                # move to next step
                state = next_state
                action = next_action

            elif method == "q_learning":
                # compute target using max_a' Q(s', a')
                if done:
                    target = reward
                else:
                    qvals_next = [Q[(next_state, a)] for a in range(nA)]
                    target = reward + gamma * max(qvals_next)

                old_q = Q[(state, action)]
                Q[(state, action)] = old_q + alpha * (target - old_q)

                # choose next action for behavior (ε-greedy) to continue exploring
                state = next_state
                action = policy(state) if not done else None

            else:
                raise ValueError("method must be 'sarsa' or 'q_learning'")

            # optional: N[(state, action)] += 1  (if tracking visits)

        # optional evaluation during training
        if eval_every is not None and (ep + 1) % eval_every == 0:
            avg_reward = evaluate_policy(env, Q, nA, num_episodes=5000)
            eval_history.append((ep + 1, avg_reward))

    env.close()
    return Q, eval_history

def evaluate_policy(env, Q, nA, num_episodes=5000):
    """Evaluate the greedy policy derived from Q by running episodes and returning mean reward."""
    total_reward = 0.0
    for _ in range(num_episodes):
        state, _ = env.reset()
        done = False
        while not done:
            # greedy action (break ties randomly)
            qvals = [Q[(state, a)] for a in range(nA)]
            max_q = max(qvals)
            max_actions = [a for a, q in enumerate(qvals) if q == max_q]
            action = random.choice(max_actions)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
            state = next_state
    return total_reward / num_episodes

if __name__ == "__main__":
    # hyperparameters
    NUM_EPISODES = 200000
    ALPHA = 0.1
    GAMMA = 1.0
    EPSILON = 0.1

    # Train SARSA
    Q_sarsa, eval_hist_sarsa = run_agent(method="sarsa",
                                         num_episodes=NUM_EPISODES,
                                         alpha=ALPHA,
                                         gamma=GAMMA,
                                         epsilon=EPSILON,
                                         eval_every=50000)

    # Train Q-learning
    Q_qlearn, eval_hist_q = run_agent(method="q_learning",
                                      num_episodes=NUM_EPISODES,
                                      alpha=ALPHA,
                                      gamma=GAMMA,
                                      epsilon=EPSILON,
                                      eval_every=50000)

    # Final evaluation
    env = gym.make("Blackjack-v1", sab=False)
    nA = env.action_space.n
    avg_reward_sarsa = evaluate_policy(env, Q_sarsa, nA, num_episodes=20000)
    avg_reward_qlearn = evaluate_policy(env, Q_qlearn, nA, num_episodes=20000)
    env.close()

    print(f"\nFinal greedy-policy average reward (SARSA):   {avg_reward_sarsa:.4f}")
    print(f"Final greedy-policy average reward (Q-learning): {avg_reward_qlearn:.4f}")

    # Example: show Q for one state
    example_state = (20, 10, False)  # player's sum=20, dealer showing 10, usable ace False
    print("\nQ-values for state", example_state)
    print(" SARSA:", [Q_sarsa[(example_state, a)] for a in range(nA)])
    print(" Q-Learn:", [Q_qlearn[(example_state, a)] for a in range(nA)])


Training sarsa: 100%|██████████| 200000/200000 [00:33<00:00, 5937.57it/s]
Training q_learning: 100%|██████████| 200000/200000 [00:27<00:00, 7164.32it/s]



Final greedy-policy average reward (SARSA):   -0.0862
Final greedy-policy average reward (Q-learning): -0.0682

Q-values for state (20, 10, False)
 SARSA: [0.5618423406970627, -0.82055376955104]
 Q-Learn: [0.2508874480609621, -0.9807600251803228]
