In [None]:
import sys
USING_COLAB = 'google.colab' in sys.modules

if USING_COLAB:
    !apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
    !pip install -U renderlab
    !pip install -U colabgymrender
    !pip install -U moviepy==0.2.3.5
    !pip install imageio==2.4.1
    !pip install --upgrade AutoROM
    !AutoROM --accept-license
    !pip install gymnasium
    !pip install flappy-bird-gymnasium
    !pip install gym[classic_control] > /dev/null 2>&1
    !pip install stable_baselines3

import numpy as np
import gymnasium as gym
import random
import matplotlib.pyplot as plt
from copy import deepcopy
import flappy_bird_gymnasium

from torch.utils.data import DataLoader
from torch import nn
import torch

from tqdm import tqdm, trange

seed = 24
data_seed = 700

## Helper functions

In [None]:
def reseed(seed, env=None):
    '''
        Sets the seed for reproducibility

        When @param env is provided, also sets the
        random number generataor of the gym environment
        to this particular seed
    '''
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    if env is not None:
        env.unwrapped._np_random = gym.utils.seeding.np_random(seed)[0]

reseed(seed)

In [None]:
def visualize(env_name='FlappyBird-v0', algorithm=None, video_name="test", env_args={}):
    """Visualize a policy network for a given algorithm on a single episode

        Args:
            env_name: Name of the gym environment to roll out `algorithm` in, it will be instantiated using gym.make or make_vec_env
            algorithm (PPOActor): Actor whose policy network will be rolled out for the episode. If
            no algorithm is passed in, a random policy will be visualized.
            video_name (str): Name for the mp4 file of the episode that will be saved (omit .mp4). Only used
            when running on local machine.
    """

    def get_action(obs):
        if not algorithm:
            return env.action_space.sample()
        else:
            return algorithm.select_action(obs)

    if USING_COLAB:
        from renderlab import RenderFrame

        directory = './video'
        env_args['render_mode'] = 'rgb_array'
        env = gym.make(env_name, **env_args)
        env = RenderFrame(env, directory)
        obs, info = env.reset()

        for i in range(1000):
            action = get_action(obs)
            obs, reward, done, truncate, info = env.step(action)

            if done:
                break

        env.play()
    else:
        import cv2

        video = cv2.VideoWriter(f"{video_name}.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 24, (600,400))

        env_args['render_mode'] = 'rgb_array'
        env = gym.make(env_name, **env_args)
        obs, info = env.reset()

        for i in range(500):
            action = get_action(obs)
            res = env.step(action)
            obs, reward, done, truncate, info = res

            if done:
                break

            im = env.render()
            im = im[:,:,::-1]

            video.write(im)

        video.release()
        env.close()
        print(f"Video saved as {video_name}.mp4")

In [None]:
flappy_env_name = "FlappyBird-v0"
visualize(env_name=flappy_env_name)

In [None]:
def evaluate_policy(actor, environment, num_episodes=100, progress=True):
    '''
        Returns the mean trajectory reward of rolling out `actor` on `environment

        Parameters
        - actor: PPOActor instance, defined in Part 1
        - environment: classstable_baselines3.common.vec_env.VecEnv instance
        - num_episodes: total number of trajectories to collect and average over
    '''
    total_rew = 0

    iterate = (trange(num_episodes) if progress else range(num_episodes))
    for _ in iterate:
        obs = environment.reset()
        done = False

        while not done:
            action = actor.select_action(obs)

            next_obs, reward, done, info = environment.step(action)
            total_rew += reward

            obs = next_obs

    return (total_rew / num_episodes).item()

## PPO algorithm

In [None]:
# Dependencies:

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env.base_vec_env import VecEnv

hyperparameters = {
    "n_steps": 10000,
    "policy_kwargs": {
        "net_arch": {
            "pi": [32, 32],
            "vf": [32, 32],
            "activation_fn": "tanh",
        }
    },
}

In [None]:
from stable_baselines3.common.env_util import make_vec_env

real_vec_env_3 = make_vec_env(flappy_env_name, n_envs = 3)
real_vec_env_1 = make_vec_env(flappy_env_name, n_envs = 1)

In [None]:
class PPOActor():
    def __init__(self, ckpt: str=None, environment: VecEnv=None, model=None):
        '''
          Requires environment to be a 1-vectorized environment

          The `ckpt` is a .zip file path that leads to the checkpoint you want
          to use for this particular actor.

          If the `model` variable is provided, then this constructor will store
          that as the internal representing model instead of loading one from the
          checkpoint path

        '''
        assert ckpt is not None or model is not None
        if model is not None:
            self.model = model
            return

        # TODO: MODIFY
        self.model = PPO.load(ckpt, env = environment)
        # End TODO


    def select_action(self, obs):
        '''
          Gives the action prediction of this particular actor
        '''
        # TODO:
        action, states = self.model.predict(obs)
        return action
        # END TODO

In [None]:
class PPOCallback(BaseCallback):
    def __init__(self, verbose=0, save_path='default', eval_env=None):
        super(PPOCallback, self).__init__(verbose)
        self.rewards = []

        self.save_freq = 120000
        self.min_reward = -np.inf
        self.actor = None
        self.eval_env = eval_env

        self.save_path = save_path

        self.eval_steps = []
        self.eval_rewards = []

    def _init_callback(self) -> None:
        pass

    def _on_training_start(self) -> None:
        """
        This method is called before the first rollout starts.
        """

        self.actor = PPOActor(model=self.model)

    def _on_rollout_start(self) -> None:
        """
        A rollout is the collection of environment interaction
        using the current policy.
        This event is triggered before collecting new samples.
        """
        pass

    def _on_rollout_end(self) -> None:
        """
        This event is triggered before updating the policy.
        """

        episode_info = self.model.ep_info_buffer
        rewards = [ep_info['r'] for ep_info in episode_info]
        mean_rewards = np.mean(rewards)

        self.rewards.append(mean_rewards)


    def _on_step(self) -> bool:
        """
        This method will be called by the model after each call to `env.step()`.

        For child callback (of an `EventCallback`), this will be called
        when the event is triggered.

        :return: If the callback returns False, training is aborted early.
        """
        if self.eval_env is None:
            return True

        if self.num_timesteps % self.save_freq == 0 and self.num_timesteps != 0:
            mean_reward = evaluate_policy(self.actor, environment=self.eval_env, num_episodes=20)
            print(f'evaluating {self.num_timesteps=}, {mean_reward=}=======')

            self.eval_steps.append(self.num_timesteps)
            self.eval_rewards.append(mean_reward)
            if mean_reward > self.min_reward:
                self.min_reward = mean_reward
                self.model.save(self.save_path)
                print(f'model saved on eval reward: {self.min_reward}')

        return True

    def _on_training_end(self) -> None:
        """
        This event is triggered before exiting the `learn()` method.
        """
        print(f'model saved on eval reward: {self.min_reward}')

        plt.plot(self.eval_steps, self.eval_rewards, c='red')
        plt.xlabel('Episodes')
        plt.ylabel('Rewards')
        plt.title('Rewards over Episodes')

        plt.show()
        plt.close()

In [None]:
ckpt_path = 'expert'
total_steps = 1500000

reseed(seed)
expert_callback = PPOCallback(save_path=ckpt_path, eval_env=real_vec_env_1)

# TODO
expert = PPO('MlpPolicy', env = real_vec_env_3, **hyperparameters)
expert.learn(total_timesteps=total_steps, callback=expert_callback)
# END TODO

In [None]:
expert_actor = PPOActor(ckpt="expert.zip", environment = real_vec_env_1)
reward_ppo = evaluate_policy(expert_actor, real_vec_env_1)
print(reward_ppo)

In [None]:
visualize(algorithm=expert_actor, video_name='expert')

## A2C

In [None]:
from stable_baselines3 import A2C
from stable_baselines3.common.vec_env import DummyVecEnv

In [None]:
class A2CActor():
    def __init__(self, ckpt: str=None, environment: VecEnv=None, model=None):
        '''
          Requires environment to be a 1-vectorized environment

          The `ckpt` is a .zip file path that leads to the checkpoint you want
          to use for this particular actor.

          If the `model` variable is provided, then this constructor will store
          that as the internal representing model instead of loading one from the
          checkpoint path

        '''
        assert ckpt is not None or model is not None
        if model is not None:
            self.model = model
            return

        # TODO: MODIFY
        self.model = A2C.load(ckpt, env = environment)
        # End TODO


    def select_action(self, obs):
        '''
          Gives the action prediction of this particular actor
        '''
        # TODO:
        action, states = self.model.predict(obs)
        return action
        # END TODO

In [None]:
class A2CCallback(BaseCallback):
    def __init__(self, verbose=0, save_path='default', eval_env=None):
        super(A2CCallback, self).__init__(verbose)
        self.rewards = []

        self.save_freq = 10000
        self.min_reward = -np.inf
        self.actor = None
        self.eval_env = eval_env

        self.save_path = save_path

        self.eval_steps = []
        self.eval_rewards = []

    def _init_callback(self) -> None:
        pass

    def _on_training_start(self) -> None:
        """
        This method is called before the first rollout starts.
        """

        self.actor = A2CActor(model=self.model)

    def _on_rollout_start(self) -> None:
        """
        A rollout is the collection of environment interaction
        using the current policy.
        This event is triggered before collecting new samples.
        """
        pass

    def _on_rollout_end(self) -> None:
        """
        This event is triggered before updating the policy.
        """

        episode_info = self.model.ep_info_buffer
        rewards = [ep_info['r'] for ep_info in episode_info]
        mean_rewards = np.mean(rewards)

        self.rewards.append(mean_rewards)


    def _on_step(self) -> bool:
        """
        This method will be called by the model after each call to `env.step()`.

        For child callback (of an `EventCallback`), this will be called
        when the event is triggered.

        :return: If the callback returns False, training is aborted early.
        """
        if self.eval_env is None:
            return True

        if self.num_timesteps % self.save_freq == 0 and self.num_timesteps != 0:
            mean_reward = evaluate_policy(self.actor, environment=self.eval_env, num_episodes=20)
            print(f'evaluating {self.num_timesteps=}, {mean_reward=}=======')

            self.eval_steps.append(self.num_timesteps)
            self.eval_rewards.append(mean_reward)
            if mean_reward > self.min_reward:
                self.min_reward = mean_reward
                self.model.save(self.save_path)
                print(f'model saved on eval reward: {self.min_reward}')

        return True

    def _on_training_end(self) -> None:
        """
        This event is triggered before exiting the `learn()` method.
        """
        print(f'model saved on eval reward: {self.min_reward}')

        plt.plot(self.eval_steps, self.eval_rewards, c='red')
        plt.xlabel('Episodes')
        plt.ylabel('Rewards')
        plt.title('Rewards over Episodes')

        plt.show()
        plt.close()

In [None]:
ckpt_path = 'expert'
total_steps = 1500000
reseed(seed)
expert_callback = A2CCallback(save_path=ckpt_path, eval_env=real_vec_env_1)

# TODO
expert = A2C('MlpPolicy', env = real_vec_env_3, **hyperparameters)
expert.learn(total_timesteps=total_steps, callback=expert_callback)
# END TODO

In [None]:
expert_actor_a2c = A2CActor(ckpt="expert.zip", environment = real_vec_env_1)
reward_a2c = evaluate_policy(expert_actor_a2c, real_vec_env_1)
print(reward_a2c)

In [None]:
visualize(algorithm=expert_actor_a2c, video_name='expert_a2c')