In [1]:
import gymnasium as gym
import numpy as np
from collections import defaultdict

In [2]:
class AcrobotAgent:
    """Tabular, epsilon-greedy Q-learning agent.

    Observations are continuous, so using the raw observation tuple as a
    dictionary key means almost every step creates a brand-new table row and
    nothing is ever learned about a state that is visited again.  The agent
    therefore maps every observation onto a coarse grid (``n_bins`` cells per
    dimension, spanning ``env.observation_space.low`` .. ``high``) and uses the
    grid cell as the Q-table key.
    """

    def __init__(self,
                 env: "gym.Env",
                 learning_rate: float,
                 initial_epsilon: float,
                 epsilon_decay: float,
                 final_epsilon: float,
                 discount_factor: float = 0.95,
                 n_bins: int = 10):

        """
        Initialize a Q-learning agent here.

        Inputs:

        1. env: the acrobot environment
        2. learning_rate: learning rate of the Q-value function
        3. initial_epsilon: initial rate of exploration
        4. epsilon_decay: amount subtracted from epsilon by each decay_epsilon() call
        5. final_epsilon: final (minimum) value of epsilon
        6. discount_factor: used to compute returns: tradeoff between long term and short term rewards
        7. n_bins: number of grid cells per observation dimension used as the
           Q-table key. A falsy value (0 / None) disables discretization and
           keys the table by the raw observation tuple. Discretization is also
           skipped when the environment does not expose finite, non-degenerate
           observation bounds.

        """

        self.env = env

        # Capture the action count once so the default factory does not keep
        # calling into the environment.
        n_actions = int(env.action_space.n)
        self.q_values = defaultdict(lambda: np.zeros(n_actions))

        self.lr = learning_rate
        self.initeps = initial_epsilon  # NOTE: holds the *current* epsilon; decay_epsilon() mutates it
        self.epsdecay = epsilon_decay
        self.fineps = final_epsilon
        self.disfac = discount_factor
        self.training_error = []

        # Discretization grid (left as None when discretization is disabled).
        self.n_bins = n_bins
        self._obs_low = None
        self._obs_span = None
        space = getattr(env, "observation_space", None)
        low = getattr(space, "low", None)
        high = getattr(space, "high", None)
        if n_bins and low is not None and high is not None:
            low = np.asarray(low, dtype=float).ravel()
            span = np.asarray(high, dtype=float).ravel() - low
            # Only bin when every dimension has a finite, positive width.
            if np.all(np.isfinite(low)) and np.all(np.isfinite(span)) and np.all(span > 0):
                self._obs_low = low
                self._obs_span = span

    def _state_key(self, obs) -> tuple:
        """Return the hashable Q-table key for an observation."""
        if self._obs_low is None:
            return tuple(obs)
        scaled = (np.asarray(obs, dtype=float) - self._obs_low) / self._obs_span * self.n_bins
        # Clip so values on or beyond the bounds land in the outermost cells.
        cells = np.clip(np.floor(scaled), 0, self.n_bins - 1).astype(int)
        return tuple(cells.tolist())

    def choose_action(self, obs: tuple[float, float, float, float, float, float]) -> int:
        """Pick an action epsilon-greedily: random with probability epsilon, else the argmax of Q."""

        if np.random.random() < self.initeps:
            return self.env.action_space.sample()
        return int(np.argmax(self.q_values[self._state_key(obs)]))

    def update_state(self,
                     obs: tuple[float, float, float, float, float, float],
                     action: int,
                     reward: float,
                     terminated: bool,
                     next_obs: tuple[float, float, float, float, float, float],
                    ):
        """Apply one Q-learning backup for the transition (obs, action, reward, next_obs).

        The temporal-difference error of the step is appended to
        ``self.training_error``.
        """

        state = self._state_key(obs)

        # current Q value of the state/action pair that was just executed
        current_q = self.q_values[state][action]

        # best value obtainable from the next state; a terminal state has no
        # future, so it contributes nothing (and is not inserted into the table)
        if terminated:
            best_next_q = 0.0
        else:
            best_next_q = np.max(self.q_values[self._state_key(next_obs)])

        # Bellman target and the error of the current estimate against it
        target = reward + self.disfac * best_next_q
        error = target - current_q

        # move the estimate towards the target by the learning rate
        self.q_values[state][action] = current_q + self.lr * error

        self.training_error.append(error)

    def decay_epsilon(self):
        """Linearly reduce epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""

        self.initeps = max(self.fineps, self.initeps - self.epsdecay)

# Train the Agent Now

In [11]:
# ---- Training hyper-parameters ----
n_episodes = 1000      # number of episodes to train for
learning_rate = 0.01   # step size of each Q-value update

# Exploration schedule: epsilon starts at start_epsilon and drops by
# epsilon_decay after every episode. At this rate it would reach zero halfway
# through training, but the agent never lets it fall below final_epsilon.
start_epsilon = 1.0
final_epsilon = 0.1
epsilon_decay = (start_epsilon)/(n_episodes/2)

# ---- Environment ----
# RecordEpisodeStatistics keeps per-episode statistics for the last
# `buffer_length` episodes, i.e. for the whole training run here.
env = gym.wrappers.RecordEpisodeStatistics(
    gym.make("Acrobot-v1"),
    buffer_length=n_episodes,
)
# Presumably switches the underlying Acrobot to the 'nips' variant of its
# dynamics instead of the default -- confirm against the Gymnasium Acrobot docs.
env.unwrapped.book_or_nips = 'nips'

# ---- Agent ----
agent = AcrobotAgent(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

In [12]:
from tqdm import tqdm

# Each pass of the outer loop plays one complete episode.
for episode in tqdm(range(n_episodes)):

    obs, info = env.reset()

    done = False
    while not done:
        # act epsilon-greedily from the current observation
        action = agent.choose_action(tuple(obs))
        next_obs, reward, terminated, truncated, info = env.step(action)

        # Learn from the transition. Only `terminated` is passed on, so an
        # episode cut short by truncation still bootstraps from next_obs.
        agent.update_state(tuple(obs), action, reward, terminated, tuple(next_obs))

        # the episode ends on either termination or truncation
        obs, done = next_obs, (terminated or truncated)

    # explore a little less in the next episode
    agent.decay_epsilon()

100%|██████████| 1000/1000 [00:25<00:00, 39.84it/s]


# Visualize Results