In [None]:
from typing import Optional
from gymnasium.envs.classic_control.pendulum import PendulumEnv

import numpy as np

def angle_normalize(x):
    """Wrap an angle (radians) into the interval [-pi, pi).

    Works on scalars and NumPy arrays alike, since only ``+``, ``%`` and ``-``
    are applied to ``x``.
    """
    full_turn = 2 * np.pi
    # Shift so the wrap point sits at 0, wrap, then shift back.
    shifted = x + np.pi
    wrapped = shifted % full_turn
    return wrapped - np.pi

class MyPendulumEnv(PendulumEnv):
    """Pendulum environment with custom mass, length, damping and dynamics.

    Overrides ``PendulumEnv.step`` with the update

        thdot' = thdot + (u / (m l^2) - (g / l) sin(th) - b thdot / (m l^2)) * dt
        th'    = th + thdot' * dt

    and ends the episode when the angle comes within 0.1 rad of +/-pi.
    Observation construction, reset and rendering are inherited unchanged.
    """

    def __init__(self, render_mode: Optional[str] = None, g=10.0):
        """Create the environment, then override the parent's physical constants.

        Args:
            render_mode: Forwarded to ``PendulumEnv``.
            g: Gravity constant, forwarded to ``PendulumEnv``.
        """
        super().__init__(render_mode, g)
        self.m = 1.5  # mass term used in the dynamics
        self.l = 0.8  # length term used in the dynamics
        self.b = 0.2  # damping coefficient applied to the angular velocity

    def step(self, u):
        """Advance the simulation by one time step ``self.dt``.

        Args:
            u: Array-like torque command; only its first element is used,
                after clipping to ``[-max_torque, max_torque]``.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where the
            reward is the negated cost, ``truncated`` is always ``False`` and
            ``info`` is an empty dict.
        """
        th, thdot = self.state  # th := theta

        g = self.g
        m = self.m
        l = self.l
        b = self.b
        dt = self.dt

        u = np.clip(u, -self.max_torque, self.max_torque)[0]
        self.last_u = u  # for rendering
        # Cost is evaluated on the state *before* the update.
        costs = angle_normalize(th) ** 2 + 0.1 * thdot**2 + 0.001 * (u**2)

        # Velocity is updated first and the new velocity is used to move the angle.
        newthdot = thdot + (u / (m * l**2) - g / l * np.sin(th) - b * thdot / (m * l**2)) * dt
        newthdot = np.clip(newthdot, -self.max_speed, self.max_speed)
        newth = th + newthdot * dt

        # angle_normalize() returns values in [-pi, pi), so the angle opposite
        # to th == 0 appears both just below +pi and just above -pi. Measure
        # the distance to that wrap point symmetrically; comparing against
        # +pi alone misses every angle on the negative side of the wrap.
        terminated = bool(np.pi - np.abs(angle_normalize(th)) < 0.1)

        self.state = np.array([newth, newthdot])
        return self._get_obs(), -costs, terminated, False, {}

In [None]:
# Delete the video folders and the TensorBoard log directory written by the
# cells below, so each session starts from a clean slate.
!rm -rf videos-sb videos-my ppo-pendulum-sb

In [None]:
from gymnasium.envs.registration import register

# Expose the custom environment under the id used by make_vec_env() below.
register(
    id="Pendulum-v2",
    entry_point=MyPendulumEnv,
)

In [None]:
import os
import shutil

# Tell imageio which ffmpeg binary to use for video encoding.
# - A value already present in the environment is left untouched.
# - Otherwise use the ffmpeg found on PATH, so the notebook is not tied to one machine.
# - Fall back to the Homebrew location only when ffmpeg is not on PATH.
os.environ.setdefault(
    "IMAGEIO_FFMPEG_EXE",
    shutil.which("ffmpeg") or "/opt/homebrew/bin/ffmpeg",
)

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecVideoRecorder
from datetime import datetime

# Single-environment VecEnv wrapped in a recorder that starts a new video
# whenever the step counter is a multiple of 10240.
vec_env = VecVideoRecorder(
    venv=make_vec_env("Pendulum-v2", n_envs=1),
    video_folder="./videos-sb",
    name_prefix="pendulum",
    record_video_trigger=lambda step: step % 10240 == 0,
)

# Stable-Baselines3 PPO; device="mps" selects PyTorch's Apple Metal backend.
model = PPO(
    "MlpPolicy",
    vec_env,
    n_steps=1024,
    verbose=1,
    tensorboard_log="./ppo-pendulum-sb/",
    device="mps",
)

# Each run gets a timestamped TensorBoard name so runs do not overwrite each other.
run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
model.learn(total_timesteps=20480, tb_log_name="run-" + run_stamp)
model.save("ppo_pendulum")

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecVideoRecorder

# Fresh recorded environment for playing back the saved policy.
vec_env = VecVideoRecorder(
    venv=make_vec_env("Pendulum-v2", n_envs=1),
    video_folder="./videos-sb",
    name_prefix="pendulum",
    record_video_trigger=lambda step: step % 10240 == 0,
)

# Reload the policy written by model.save("ppo_pendulum").
model = PPO.load("ppo_pendulum")

# Roll the policy out for 1024 steps while recording.
obs = vec_env.reset()
vec_env.start_video_recorder()
for _ in range(1024):
    action, _states = model.predict(obs)
    obs, rewards, dones, info = vec_env.step(action)
    vec_env.render(mode='rgb_array')

# Finalize the video file before shutting the environment down.
vec_env.close_video_recorder()
vec_env.close()

In [None]:
# Load the TensorBoard notebook extension and open it on the directory that
# the Stable-Baselines3 PPO run uses as its tensorboard_log.
%load_ext tensorboard
%tensorboard --logdir ./ppo-pendulum-sb/

<video src="videos-sb/pendulum-step-10240-to-step-10440.mp4" controls autoplay loop></video>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal

import matplotlib.pyplot as plt

from dataclasses import dataclass

class ActorCritic(nn.Module):
    """Separate actor and critic MLPs for a continuous (Gaussian) policy.

    Both networks are two hidden layers of 64 tanh units. The actor outputs
    the action mean; the log standard deviation is a learned parameter that
    does not depend on the observation.
    """

    def __init__(self, env: MyPendulumEnv):
        """Size the networks from ``env.observation_space`` and ``env.action_space``."""
        super().__init__()
        obs_dim = np.prod(env.observation_space.shape)
        act_dim = np.prod(env.action_space.shape)

        # Critic is built before the actor; keep this order so layer
        # initialization draws random numbers in the same sequence.
        self.critic = nn.Sequential(
            self.layer_init(nn.Linear(obs_dim, 64)),
            nn.Tanh(),
            self.layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            self.layer_init(nn.Linear(64, 1), std=1.0),
        )
        self.actor = nn.Sequential(
            self.layer_init(nn.Linear(obs_dim, 64)),
            nn.Tanh(),
            self.layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            # Small gain on the output layer of the action-mean head.
            self.layer_init(nn.Linear(64, act_dim), std=0.01),
        )
        self.actor_logstd = nn.Parameter(torch.zeros(1, act_dim))

    def layer_init(self, layer, std=np.sqrt(2), bias=0):
        """Orthogonally initialize ``layer.weight`` with gain ``std``, fill the bias, return the layer."""
        nn.init.orthogonal_(layer.weight, std)
        nn.init.constant_(layer.bias, bias)
        return layer

    def get_value(self, x):
        """Return the critic's value estimate for observations ``x``."""
        return self.critic(x)

    def get_action_and_value(self, x, action=None):
        """Evaluate the policy and the critic on observations ``x``.

        Args:
            x: Batch of observations.
            action: Actions to score. When ``None``, actions are sampled from
                the current policy.

        Returns:
            ``(action, log_prob, entropy, value)``; ``log_prob`` and
            ``entropy`` are summed over dimension 1 (the action dimension).
        """
        mean = self.actor(x)
        std = torch.exp(self.actor_logstd.expand_as(mean))
        dist = Normal(mean, std)
        if action is None:
            action = dist.sample()
        log_prob = dist.log_prob(action).sum(1)
        entropy = dist.entropy().sum(1)
        value = self.critic(x)
        return action, log_prob, entropy, value

@dataclass
class PPOConfig:
    """Hyperparameters for the hand-written PPO trainer ``MyPPO``.

    ``MyPPO.learn`` sizes its rollout buffers from ``num_steps * num_envs``
    but indexes them with ``batch_size`` and ``minibatch_size``; keep
    ``batch_size == num_steps * num_envs`` when changing any of them.
    ``n_steps`` and ``num_minibatches`` are not read by ``MyPPO.learn``.
    """

    n_steps: int = 2048 # Number of steps to run for each environment per update
    learning_rate: float = 3e-4 # Learning rate
    eps: float = 1e-5 # Adam epsilon
    gamma: float = 0.99 # Discount factor
    num_envs: int = 1 # Number of environments
    num_steps: int = 2048 # Number of steps
    gae_lambda: float = 0.95 # Lambda for GAE
    clip_coef: float = 0.2 # Clip parameter for PPO
    vf_coef: float = 0.5 # Value function coefficient
    ent_coef: float = 0.01 # Entropy coefficient
    num_minibatches: int = 64 # Number of minibatches
    total_timesteps: int = 102400 # Total number of steps
    batch_size: int = 2048 # Batch size
    update_epochs: int = 10 # Number of epochs
    max_grad_norm: float = 0.5 # Maximum gradient norm
    # Annotated so that it is a real dataclass field (settable through the
    # constructor and shown in repr). Declared last so the positional order of
    # the fields above is unchanged.
    minibatch_size: int = 32 # Minibatch size

class MyPPO:
    """Minimal PPO trainer: fixed-length rollouts, GAE, clipped policy and value losses.

    The environment is driven through a vectorized-environment interface:
    ``reset()`` returns a batch of observations and ``step(actions)`` returns
    ``(obs, rewards, dones, infos)``. In this notebook it is a
    Stable-Baselines3 ``VecEnv`` wrapper, which resets finished
    sub-environments on its own.

    Attributes:
        n_epochs: Value passed to the constructor. It is stored only;
            ``learn`` takes the number of optimization epochs from
            ``config.update_epochs``.
        env: The environment to train on.
        device: Torch device used for all tensors (CPU).
        config: ``PPOConfig`` with the hyperparameters.
        rewards: One entry per environment step, appended during ``learn``.
        agent: The ``ActorCritic`` created by the most recent ``learn`` call,
            or ``None`` before training.
    """

    def __init__(self, n_epochs, env: MyPendulumEnv):
        self.n_epochs = n_epochs
        self.env = env
        self.device = torch.device("cpu")
        self.config = PPOConfig()
        self.rewards = []
        self.agent = None

    def learn(self):
        """Train a new ``ActorCritic`` on ``self.env``.

        Runs ``total_timesteps // batch_size`` updates. Each update collects
        ``num_steps`` transitions, computes GAE advantages and returns, then
        runs ``update_epochs`` passes of minibatch gradient steps. The trained
        network is left in ``self.agent`` and per-step rewards are appended to
        ``self.rewards``.
        """
        cfg = self.config
        device = self.device
        agent = ActorCritic(self.env).to(device)
        # Keep the policy reachable after training instead of dropping it
        # when this method returns.
        self.agent = agent
        print(agent)
        optimizer = optim.Adam(agent.parameters(), lr=cfg.learning_rate, eps=cfg.eps)

        obs_shape = self.env.observation_space.shape
        act_shape = self.env.action_space.shape

        # Rollout storage, indexed [step, env].
        obs = torch.zeros((cfg.num_steps, cfg.num_envs) + obs_shape).to(device)
        actions = torch.zeros((cfg.num_steps, cfg.num_envs) + act_shape).to(device)
        logprobs = torch.zeros((cfg.num_steps, cfg.num_envs)).to(device)
        rewards = torch.zeros((cfg.num_steps, cfg.num_envs)).to(device)
        dones = torch.zeros((cfg.num_steps, cfg.num_envs)).to(device)
        values = torch.zeros((cfg.num_steps, cfg.num_envs)).to(device)

        next_obs = torch.Tensor(self.env.reset()).to(device)
        next_done = torch.zeros(cfg.num_envs).to(device)
        num_updates = int(cfg.total_timesteps // cfg.batch_size)

        for update in range(1, num_updates + 1):
            # Linearly anneal the learning rate from its initial value towards 0.
            frac = 1.0 - (update - 1.0) / num_updates
            lrnow = frac * cfg.learning_rate
            optimizer.param_groups[0]["lr"] = lrnow

            # ---- Rollout collection ----
            for step in range(0, cfg.num_steps):
                obs[step] = next_obs
                dones[step] = next_done

                with torch.no_grad():
                    action, logprob, _, value = agent.get_action_and_value(next_obs)
                    values[step] = value.flatten()
                actions[step] = action
                logprobs[step] = logprob

                # No manual reset after a finished episode: the vectorized
                # environment has already reset itself and `next_obs` is the
                # first observation of the new episode. Resetting again here
                # would leave the policy acting on an observation that no
                # longer matches the environment's state.
                next_obs, reward, done, _ = self.env.step(action.cpu().numpy())
                rewards[step] = torch.tensor(reward).to(device).view(-1)
                next_obs, next_done = torch.Tensor(next_obs).to(device), torch.Tensor(done).to(device)

                self.rewards.append(reward)

            # ---- Generalized advantage estimation (computed backwards in time) ----
            with torch.no_grad():
                next_value = agent.get_value(next_obs).reshape(1, -1)
                advantages = torch.zeros_like(rewards).to(device)
                lastgaelam = 0
                for t in reversed(range(cfg.num_steps)):
                    if t == cfg.num_steps - 1:
                        # Bootstrap from the observation following the rollout.
                        nextnonterminal = 1.0 - next_done
                        nextvalues = next_value
                    else:
                        nextnonterminal = 1.0 - dones[t + 1]
                        nextvalues = values[t + 1]
                    # `nextnonterminal` zeroes the bootstrap across episode boundaries.
                    delta = rewards[t] + cfg.gamma * nextvalues * nextnonterminal - values[t]
                    advantages[t] = lastgaelam = delta + cfg.gamma * cfg.gae_lambda * nextnonterminal * lastgaelam
                returns = advantages + values

            # Flatten [step, env] into a single batch dimension.
            b_obs = obs.reshape((-1,) + obs_shape)
            b_logprobs = logprobs.reshape(-1)
            b_actions = actions.reshape((-1,) + act_shape)
            b_advantages = advantages.reshape(-1)
            b_returns = returns.reshape(-1)
            b_values = values.reshape(-1)

            # ---- Policy and value optimization ----
            b_inds = np.arange(cfg.batch_size)
            for _ in range(cfg.update_epochs):
                np.random.shuffle(b_inds)
                for start in range(0, cfg.batch_size, cfg.minibatch_size):
                    end = start + cfg.minibatch_size
                    mb_inds = b_inds[start:end]

                    _, newlogprob, entropy, newvalue = agent.get_action_and_value(b_obs[mb_inds], b_actions[mb_inds])
                    logratio = newlogprob - b_logprobs[mb_inds]
                    ratio = logratio.exp()

                    mb_advantages = b_advantages[mb_inds]

                    # Clipped surrogate objective (as a loss, hence the minus signs and max).
                    pg_loss1 = -mb_advantages * ratio
                    pg_loss2 = -mb_advantages * torch.clamp(ratio, 1 - cfg.clip_coef, 1 + cfg.clip_coef)
                    pg_loss = torch.max(pg_loss1, pg_loss2).mean()

                    # Value loss, with the new prediction clipped around the rollout-time value.
                    newvalue = newvalue.view(-1)
                    v_loss_unclipped = (newvalue - b_returns[mb_inds]) ** 2
                    v_clipped = b_values[mb_inds] + torch.clamp(
                        newvalue - b_values[mb_inds],
                        -cfg.clip_coef,
                        cfg.clip_coef,
                    )
                    v_loss_clipped = (v_clipped - b_returns[mb_inds]) ** 2
                    v_loss_max = torch.max(v_loss_unclipped, v_loss_clipped)
                    v_loss = 0.5 * v_loss_max.mean()

                    entropy_loss = entropy.mean()
                    loss = pg_loss - cfg.ent_coef * entropy_loss + v_loss * cfg.vf_coef

                    optimizer.zero_grad()
                    loss.backward()
                    nn.utils.clip_grad_norm_(agent.parameters(), cfg.max_grad_norm)
                    optimizer.step()

    def plot_rewards(self):
        """Print and plot the rewards recorded by ``learn`` (one value per environment step)."""
        print(f"rewards={self.rewards}")
        plt.plot(self.rewards)
        plt.title("Pendulum-v2 rewards")
        # `self.rewards` gets one entry per environment step, not per episode.
        plt.xlabel("Step")
        plt.ylabel("Reward")
        plt.show()

In [None]:
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecVideoRecorder


# Recorded single-environment VecEnv for the hand-written PPO; videos go to a
# separate folder from the Stable-Baselines3 run.
vec_env = VecVideoRecorder(
    venv=make_vec_env("Pendulum-v2", n_envs=1),
    video_folder="./videos-my",
    name_prefix="pendulum",
    record_video_trigger=lambda step: step % 10240 == 0,
)

# Train the custom implementation on that environment.
model = MyPPO(n_epochs=4, env=vec_env)
model.learn()

In [None]:
# Show the rewards collected while training the custom PPO above.
model.plot_rewards()