In [None]:
# Notebook-wide configuration.
ENV_NAME = "Acrobot-v1"   # Gymnasium environment id handed to gym.make
N_ITERATIONS = 3          # number of episodes rendered by the annotated demo run
TRAIN_STEPS = 100_000     # total_timesteps passed to model.learn
ENV_VAR = []              # not referenced by any of the cells shown here

In [None]:
"""
Values in an observation

cos(theta1) — cosine of the first joint angle

sin(theta1) — sine of the first joint angle

cos(theta2) — cosine of the second joint angle

sin(theta2) — sine of the second joint angle

theta1_dot — angular velocity of the first joint

theta2_dot — angular velocity of the second joint

"""

In [None]:
#### Implemented on python. No wrapper needed

#!apt-get install -y swig

#!pip install imageio box2d-py gymnasium[box2d]

!pip install stable-baselines3[extra]

In [None]:
# import libraries
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import imageio
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from IPython.display import Image, display,clear_output
from PIL import Image as PILImage, ImageDraw, ImageFont
from gymnasium.wrappers import TimeLimit
from collections import Counter

import os, builtins, math

In [None]:
# List every registered Gymnasium environment whose action space is Discrete
# (DQN only handles discrete actions).

# Iterate over a snapshot: gym.make may import modules that register more
# environments, which would otherwise mutate the registry while we loop over it.
for env_spec in list(gym.registry.values()):
    if not hasattr(env_spec, "entry_point"):
        continue
    try:
        # Use a dedicated name so the probe never overwrites the global `env`.
        probe_env = gym.make(env_spec.id)
        try:
            if isinstance(probe_env.action_space, gym.spaces.Discrete):
                print(env_spec.id)
        finally:
            # Release the resources of every environment we instantiated.
            probe_env.close()
    except Exception:
        # Environments with missing optional dependencies cannot be built; skip
        # them. `Exception` (not a bare except) keeps Ctrl-C / kernel interrupt working.
        continue

In [None]:
# The environment stack built by make_env() looks something like this:
"""
DummyVecEnv
└── Monitor
    └── RewardShapingWrapper
        └── TimeLimit
            └── AcrobotEnv
"""

#Custom reward system wrapper

class RewardShapingWrapper(gym.Wrapper):
    """Hook for custom reward shaping; currently a transparent pass-through."""

    def __init__(self, env):
        super().__init__(env)

    def step(self, action):
        """Forward the action to the wrapped env and return its transition unchanged."""
        transition = self.env.step(action)
        obs, reward, terminated, truncated, info = transition
        # Placeholder: the reward is handed back exactly as the wrapped env produced it.
        return obs, reward, terminated, truncated, info

#Custom Live Render Function

class LiveRenderCallback(BaseCallback):
    """Training callback that shows a frame of the training env every `render_freq` calls.

    Args:
        render_freq: A frame is displayed whenever the callback's call counter
            (`n_calls`) is a multiple of this value.
        verbose: Verbosity level forwarded to `BaseCallback`.
    """

    def __init__(self, render_freq=1000, verbose=0):
        super().__init__(verbose)
        self.render_freq = render_freq

    def _on_step(self) -> bool:
        """Display the current frame when due; always returns True so training continues."""
        if self.n_calls % self.render_freq != 0:
            return True

        shot = self.training_env.render(mode="rgb_array")
        if shot is None:
            # The vectorized env may hand back no frame (e.g. the requested render
            # mode is unavailable); skip drawing instead of letting imshow(None)
            # raise and abort the whole training run.
            return True

        # Replace the previously displayed frame rather than stacking outputs.
        clear_output(wait=True)
        plt.imshow(shot)
        plt.axis("off")
        plt.show()
        return True


# We need to unwrap the structure until we reach the Monitor instance
def unwrap_monitor(env):
    """Walk down the `.env` chain and return the first Monitor encountered.

    Raises:
        ValueError: if a layer without an `env` attribute is reached before
            any Monitor was found.
    """
    layer = env
    while True:
        if isinstance(layer, Monitor):
            return layer
        if not hasattr(layer, 'env'):
            raise ValueError("Monitor wrapper not found in the environment stack.")
        layer = layer.env

# ✅ Gym environment → RewardShapingWrapper → Monitor (logs per-episode rewards/metrics) → DummyVecEnv
def make_env():
    """Build a single-environment DummyVecEnv around a monitored ENV_NAME instance."""

    def _build_monitored_env():
        # rgb_array rendering lets frames be captured as arrays.
        base_env = gym.make(ENV_NAME, render_mode="rgb_array")
        shaped_env = RewardShapingWrapper(base_env)
        return Monitor(shaped_env)

    return DummyVecEnv([_build_monitored_env])

def test_agent(env=None, model=None, iterations=1, env_name="env", annotate_fn=None, font=None):
    """Roll out the agent, save one annotated GIF per episode and plot diagnostics.

    Args:
        env: Vectorized environment holding a single sub-environment (only index 0
            of the batched step results is read). Falls back to the global `env`.
        model: Object exposing `predict(obs, deterministic=True)`. Falls back to
            the global `model`.
        iterations: Number of episodes to roll out.
        env_name: Label used (lower-cased) in the GIF file name.
        annotate_fn: Optional callable `(obs, step, total_reward, info)` returning
            the text lines to draw on each frame; a step/reward overlay is used
            when omitted.
        font: Optional PIL font for the overlay text.

    Raises:
        ValueError: if no env or model is passed and none exists as a global.

    Side effects:
        Writes `videos/<env_name>_episode_<i>_agent.gif` (creating `videos/` if
        needed), displays it, prints episode statistics and shows two plots.
        The environment is left open: it belongs to the caller, who may keep
        using it after this function returns.
    """
    if env is None:
        env = globals().get("env")
    if model is None:
        model = globals().get("model")

    if env is None or model is None:
        raise ValueError("env and model must be defined or passed explicitly")

    if font is None:
        try:
            font = ImageFont.truetype("DejaVuSans.ttf", 14)
        except Exception:
            # Font file not available on this system: use PIL's built-in font.
            font = ImageFont.load_default()

    # Make sure the output folder exists even if the setup cell was not run.
    os.makedirs("videos", exist_ok=True)

    for i in range(iterations):
        obs = env.reset()
        step = 0
        total_reward = 0
        action_counter = Counter()
        frames = []
        done = False

        while not done:
            frame = env.render()

            img = PILImage.fromarray(frame)
            draw = ImageDraw.Draw(img)

            action, _ = model.predict(obs, deterministic=True)
            obs, rewards, dones, infos = env.step(action)
            done = dones[0]  # Since it's a single environment inside DummyVecEnv
            reward = rewards[0]
            info = infos[0]

            total_reward += reward
            action_counter[action.item()] += 1
            step += 1

            if done and "terminal_observation" in info:
                # A vectorized env resets automatically when an episode ends, so
                # `obs` already belongs to the next episode; annotate the last
                # frame with the real final observation instead.
                obs_values = info["terminal_observation"]
            elif isinstance(obs, np.ndarray) and obs.ndim == 2:
                # Drop the batch dimension added by the vectorized env.
                obs_values = obs[0]
            else:
                obs_values = obs

            # ✍️ Annotation via custom function or fallback
            if annotate_fn:
                text_lines = annotate_fn(obs_values, step, total_reward, info)
            else:
                text_lines = [
                    f"Step: {step}",
                    f"Total Reward: {total_reward:.2f}"
                ]

            y_offset = 10
            for line in text_lines:
                draw.text((10, y_offset), line, fill=(0, 0, 0), font=font)
                y_offset += 15

            frames.append(np.array(img))

        # Save and display GIF
        gif_path = f"videos/{env_name.lower()}_episode_{i}_agent.gif"
        imageio.mimsave(gif_path, frames, fps=30)
        display(Image(filename=gif_path))

        # Episode rewards recorded so far by the Monitor wrapper of the sub-environment.
        monitor_env = unwrap_monitor(env.envs[0])
        results = monitor_env.get_episode_rewards()

        # NOTE: the env is deliberately not closed here. Closing it inside the
        # loop meant later iterations (and later cells) ran on a closed env.

        print(f"\n▶️ Episode {i}")
        print(f"Total reward: {total_reward:.2f}")
        print(f"Episode length: {step} steps")
        print("Action distribution:", dict(action_counter))

        plt.figure(figsize=(12, 6))
        plt.plot(results)
        plt.xlabel("Episode")
        plt.ylabel("Reward")
        plt.title("Training Rewards per Episode")
        plt.show()

        plt.bar(action_counter.keys(), action_counter.values())
        plt.xlabel("Action")
        plt.ylabel("Frequency")
        plt.title("Action Distribution")
        plt.show()

def acrobot_annotate_fn(obs, step, total_reward, info):
    """Build the overlay text lines for one Acrobot frame.

    `obs` is indexed as [cos(theta1), sin(theta1), cos(theta2), sin(theta2),
    theta1_dot, theta2_dot]; `info` is accepted for signature compatibility
    and not used.
    """
    cos1, sin1, cos2, sin2, vel1, vel2 = (obs[k] for k in range(6))

    # atan2(sin, cos) recovers each joint angle; degrees are easier to read.
    angle1_deg, angle2_deg = (
        math.degrees(math.atan2(s, c)) for s, c in ((sin1, cos1), (sin2, cos2))
    )

    return [
        f"Step: {step}",
        f"Total Reward: {total_reward:.2f}",
        f"Theta1: {angle1_deg:.1f}°",
        f"Theta2: {angle2_deg:.1f}°",
        f"Theta1 dot: {vel1:.2f}",
        f"Theta2 dot: {vel2:.2f}",
    ]

In [None]:
# Create and wrap environment (Monitor helps log rewards)

env = make_env()

# Create eval environment (for callback evaluation)
eval_env = make_env()

# Create directory to save models and videos
os.makedirs("models", exist_ok=True)
os.makedirs("videos", exist_ok=True)

# Create the DQN agent
model = DQN(
    "MlpPolicy",
    env,
    verbose=1,
    learning_rate=1e-3,
    buffer_size=10000,
    learning_starts=1000,
    batch_size=32,
    gamma=0.99,
    train_freq=4,
    target_update_interval=1000,
    exploration_fraction=0.1,
    exploration_final_eps=0.02,
    # Name the TensorBoard run after the environment actually being trained
    # (the directory used to be labelled "cartpole").
    tensorboard_log=f"./dqn_{ENV_NAME.lower()}_tensorboard/",
    seed=42,
)

# Setup evaluation callback to save best model during training
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="./models/best_model",
    log_path="./models/",
    eval_freq=5000,
    deterministic=True,
    render=False,
)

# Train the model (you can increase TRAIN_STEPS for better results)
model.learn(total_timesteps=TRAIN_STEPS, callback=[eval_callback, LiveRenderCallback()])

# Save the final model
model.save(f"models/dqn_{ENV_NAME}_final")

# Load best saved model (written by eval_callback under best_model_save_path)
model = DQN.load("./models/best_model/best_model.zip", env=env)

# Pass env and model explicitly instead of relying on the globals lookup.
test_agent(env=env, model=model)

In [None]:
# Roll out N_ITERATIONS episodes with the current model and overlay the decoded
# joint angles / velocities from acrobot_annotate_fn on every frame of the GIFs.
test_agent(env=env, model=model, iterations=N_ITERATIONS, env_name="Acrobot", annotate_fn=acrobot_annotate_fn)

In [None]:
"""
+-------------+ (step)   +-----------------+          +--------------------+
|             | action   |                 | new obs  |                    |
|   Agent     +--------->+   Environment   +--------->+     Agent again    |
|  (predict)  |          |                 | reward   |                    |
+-------------+          +-----------------+  done?   +--------------------+
                                  ^                               |
                                  |<-------------[loop]-----------+

"""

## Hyperparameter tuning with manual search

import itertools

def train_dqn_with_params(env, params):
    """Train a fresh DQN agent for 10000 timesteps and return it.

    `params` must provide the keys "learning_rate", "batch_size" and "gamma";
    a missing key raises KeyError before any model is built.
    """
    tuned_keys = ("learning_rate", "batch_size", "gamma")
    hyperparams = {key: params[key] for key in tuned_keys}

    agent = DQN("MlpPolicy", env, verbose=0, **hyperparams)
    agent.learn(total_timesteps=10000)
    return agent

# Candidate values tried for each tuned hyperparameter.
search_space = dict(
    learning_rate=[1e-3, 5e-4],
    batch_size=[32, 64],
    gamma=[0.95, 0.99],
)

def evaluate_model(model, env, n_episodes=5):
    """Return the mean undiscounted episode return of `model` over `n_episodes`.

    Works with both step conventions:
      * vectorized envs, whose `step` returns four batched values
        `(obs, rewards, dones, infos)` — only sub-environment 0 is read;
      * single Gymnasium envs, whose `step` returns five values
        `(obs, reward, terminated, truncated, info)` and whose `reset`
        returns an `(obs, info)` tuple.

    Args:
        model: Object exposing `predict(obs, deterministic=True)` that returns
            `(action, state)`.
        env: Environment to evaluate on.
        n_episodes: Number of episodes to average over.

    Returns:
        The average of the per-episode reward sums, as a float.
    """
    episode_returns = []
    for _ in range(n_episodes):
        reset_result = env.reset()
        # Gymnasium envs return (obs, info); vectorized envs return obs only.
        obs = reset_result[0] if isinstance(reset_result, tuple) else reset_result

        done = False
        episode_return = 0.0
        while not done:
            action, _state = model.predict(obs, deterministic=True)
            step_result = env.step(action)
            if len(step_result) == 5:
                obs, reward, terminated, truncated, _info = step_result
                done = bool(terminated or truncated)
            else:
                # Batched results: unpacking five values here used to raise
                # ValueError for the DummyVecEnv built by this notebook.
                obs, rewards, dones, _infos = step_result
                reward = rewards[0]
                done = bool(dones[0])
            episode_return += float(reward)
        episode_returns.append(episode_return)
    return sum(episode_returns) / n_episodes

# Exhaustive grid search: train one agent per combination in search_space and
# keep the combination with the highest average evaluation reward.
best_params = None
best_reward = -float('inf')
results = []  # (params, avg_reward) for every combination tried

for values in itertools.product(*search_space.values()):
    params = dict(zip(search_space.keys(), values))
    print(f"Training with params: {params}")
    # Use a separate name so the search does not overwrite the global `model`.
    candidate_model = train_dqn_with_params(env, params)
    # Score the candidate; this call was missing, which left a (model, env)
    # tuple in avg_reward and made the comparison below raise TypeError.
    avg_reward = evaluate_model(candidate_model, env)
    print(f"Average reward: {avg_reward}")

    results.append((params, avg_reward))
    if avg_reward > best_reward:
        best_reward = avg_reward
        best_params = params

print(f"Best params: {best_params} with average reward {best_reward}")

In [None]:
## Hyperparameter tuning with random search

import random

def random_search(env, search_space, n_trials=10):
    """Train `n_trials` agents on randomly sampled hyperparameter combinations.

    For each trial one value per key is drawn with `random.choice` from
    `search_space`, and a model is trained with `train_dqn_with_params`.

    Returns:
        A list of `(params, model)` tuples, one per trial. The models are not
        evaluated here.
    """
    outcomes = []
    for _trial in range(n_trials):
        sampled = {}
        for name, candidates in search_space.items():
            sampled[name] = random.choice(candidates)
        print(f"Training with params: {sampled}")
        trained = train_dqn_with_params(env, sampled)
        # Evaluate model here
        outcomes.append((sampled, trained))
    return outcomes

# Use same search_space as above.
# NOTE(review): this rebinds `results`; it now holds the (params, model) pairs
# returned by random_search (default of 10 trials), and those models are unscored.
results = random_search(env, search_space)


In [None]:
!pip install optuna

In [None]:
## Hyperparameter tuning automated with optuna

import optuna
from stable_baselines3.common.evaluation import evaluate_policy

def objective(trial):
    """Optuna objective: train a DQN with sampled hyperparameters and score it.

    Samples a learning rate (log scale, 1e-5 to 1e-3), a batch size (32, 64 or
    128) and a discount factor gamma (0.9 to 0.999), trains for 10000 timesteps
    on the global `env`, and evaluates on that same env.

    Args:
        trial: The Optuna trial used to draw the hyperparameters.

    Returns:
        Mean episode reward over 5 evaluation episodes (higher is better).
    """
    params = {
        # suggest_float replaces the deprecated suggest_loguniform /
        # suggest_uniform; log=True keeps the log-scale sampling.
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True),
        "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
        "gamma": trial.suggest_float("gamma", 0.9, 0.999),
    }
    model = DQN("MlpPolicy", env, **params, verbose=0)
    model.learn(total_timesteps=10000)
    mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
    return mean_reward

# Run 20 trials, maximizing the value returned by objective().
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

# Hyperparameters of the best-scoring trial.
print("Best params:", study.best_params)
