## Install required dependencies:

In [None]:
# Environment API imported as `gym` by the training cell.
%pip install gymnasium
# Supplies PPO plus the policy, feature-extractor and callback base classes.
%pip install stable-baselines3
# NOTE(review): presumably required to build the Box2D bindings installed on
# the next line -- confirm before reordering or removing.
%pip install swig
%pip install 'gymnasium[box2d]'
%pip install numpy
%pip install torch

## Custom PPO Setup

Uses a custom LSTM + PPO neural network architecture written in PyTorch for RL self-play (inspired by OpenAI's Dota 2 paper), and trains the agent with Stable-Baselines3's (SB3) PPO implementation.

In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
# from pettingzoo.butterfly import cooperative_pong_v5
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecEnv, DummyVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import BaseCallback

class MultiAgentPPO(gym.Env):
    """Placeholder for the custom multi-agent PPO environment.

    Meant to use an LSTM + PPO network for self-play; nothing is
    implemented here yet.
    """

class CustomNN(BaseFeaturesExtractor):
    """Feature extractor: linear projection -> single-layer LSTM -> MLP head.

    Args:
        obs_space: Observation space. Only ``obs_space.shape[0]`` is read, so
            observations are expected to be flat 1-D vectors.
        features_dim: Size of the feature vector returned by ``forward``.
    """

    def __init__(self, obs_space: gym.spaces.Box, features_dim: int):
        super().__init__(obs_space, features_dim)
        obs_dim = obs_space.shape[0]

        # Layers (attribute names are kept so saved state_dict keys stay valid).
        self.input = nn.Linear(in_features=obs_dim, out_features=obs_dim)
        self.LSTM = nn.LSTM(input_size=obs_dim, hidden_size=512, num_layers=1, batch_first=True)
        self.hidden_1 = nn.Linear(in_features=512, out_features=128)
        self.relu = nn.ReLU()  # shared by both hidden layers in forward()
        self.hidden_2 = nn.Linear(in_features=128, out_features=64)
        self.output = nn.Linear(in_features=64, out_features=features_dim)

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Map observations to a ``features_dim``-sized feature vector.

        Args:
            observations: Either ``(batch, obs_dim)`` or
                ``(batch, seq_len, obs_dim)``.

        Returns:
            Tensor of shape ``(batch, features_dim)``.
        """
        x = self.input(observations)

        # nn.LSTM treats a 2-D tensor as ONE unbatched sequence, which would
        # turn the batch axis into the time axis and leak state across
        # samples. Add an explicit sequence axis of length 1 to the tensor
        # that is actually fed to the LSTM.
        if x.dim() == 2:
            x = x.unsqueeze(1)

        # The returned (h, c) state is discarded, so no recurrent state is
        # carried from one call to the next.
        x, _ = self.LSTM(x)

        # Keep only the last time step; unlike squeeze(1) this is also
        # correct when seq_len > 1.
        x = x[:, -1, :]

        x = self.relu(self.hidden_1(x))
        x = self.relu(self.hidden_2(x))
        return self.output(x)

class CustomPolicy(ActorCriticPolicy):
    """Actor-critic policy that defaults to ``CustomNN`` as features extractor.

    All positional and keyword arguments are forwarded unchanged to
    ``ActorCriticPolicy``.
    """

    def __init__(self, *args, **kwargs):
        # Use setdefault rather than hard-coded keywords after **kwargs:
        # the latter raises "got multiple values for keyword argument" as soon
        # as a caller supplies either key itself.
        extractor_cls = kwargs.setdefault("features_extractor_class", CustomNN)
        if extractor_cls is CustomNN:
            # features_dim=64 is only meaningful for CustomNN, so it is not
            # forced onto a caller-supplied extractor class.
            kwargs.setdefault("features_extractor_kwargs", dict(features_dim=64))
        super().__init__(*args, **kwargs)
class Logger(BaseCallback):
    """Callback that records rewards (and losses, when present) over time.

    Attributes are plain lists that grow by at most one entry per call to
    ``_on_step``:

    * ``rewards`` -- the ``"rewards"`` entry of ``self.locals`` at each step.
    * ``losses``  -- the ``"loss"`` entry of ``self.locals`` at each step.
    """

    def __init__(self):
        super().__init__()
        self.rewards = []
        self.losses = []

    def _on_step(self) -> bool:
        """Store the current step's reward/loss; always continue training."""
        rewards = self.locals.get("rewards")
        # NOTE(review): "loss" does not appear to be among the locals exposed
        # while rollouts are collected, so `losses` may stay empty -- confirm
        # against the installed SB3 version.
        loss = self.locals.get("loss")

        # Explicit None/size checks instead of truthiness: `if array:` raises
        # ValueError for arrays with more than one element (several envs) and
        # would silently drop a legitimate reward or loss of exactly 0.
        if rewards is not None and np.size(rewards) > 0:
            self.rewards.append(rewards)
        if loss is not None:
            self.losses.append(loss)

        return True

if __name__ == '__main__':
    # Currently testing on Lunar Lander
    # NOTE(review): newer Gymnasium releases may only register
    # "LunarLander-v3" -- confirm against the installed version.
    env = gym.make("LunarLander-v2")

    # Verbose set to zero to prevent excessive output logging
    model = PPO(policy=CustomPolicy, env=env, verbose=0)
    logger = Logger()
    model.learn(total_timesteps=100000, callback=logger)
    model.save("custom_ppo_lunar_lander")

    # Roll out one episode with the trained policy.
    # The loop below uses the VecEnv API (reset() -> obs, step() -> 4-tuple of
    # batched values), so it must run on the vectorized env SB3 wrapped around
    # `env`. The raw Gymnasium env returns (obs, info) from reset() and a
    # 5-tuple from step(), which breaks predict() and the unpacking.
    vec_env = model.get_env()
    obs = vec_env.reset()
    done = False
    while not done:
        actions, states = model.predict(obs, deterministic=True)
        obs, rewards, dones, info = vec_env.step(actions)
        # NOTE(review): `env` is created without a render_mode, so this call
        # likely draws nothing -- pass render_mode to gym.make if visual
        # output is wanted.
        vec_env.render()
        done = any(dones)

## Plotting

In [None]:
# Visualize the rewards and losses
import matplotlib.pyplot as plt


def _plot_series(values, title, xlabel, ylabel):
    """Draw one line plot of `values` with the given title and axis labels."""
    plt.plot(values)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()


# Logger appends from `_on_step`, i.e. once per environment step, so the
# x-axis counts timesteps rather than episodes.
_plot_series(logger.rewards, "Rewards over Timesteps", "Timesteps", "Rewards")
_plot_series(logger.losses, "Policy Losses over Timesteps", "Timesteps", "Loss")