<a href="https://colab.research.google.com/github/sczopek/spaceInvadersAtariRl/blob/main/recordVideoSpaceInvaderAgentPlaythrough.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Documentation for the two libraries this notebook depends on:
# stable-baselines3 = 'https://stable-baselines3.readthedocs.io/en/master/index.html'
# gymnasium = 'https://gymnasium.farama.org/'

# Install stable-baselines3 together with its "extra" optional dependencies,
# capped at version 2.3.1.
!pip install "stable-baselines3[extra]<=2.3.1"
# Install gymnasium (no version constraint is given here).
!pip install gymnasium



In [None]:
from google.colab import drive

# Mount Google Drive so the saved model checkpoints under MyDrive can be read
# and the evaluation results can be copied back.
DRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(DRIVE_MOUNT_POINT)

  and should_run_async(code)


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Decide which version of Space Invaders to use: the NoFrameskip version (`SpaceInvadersNoFrameskip-v4`) or the default frame-skipping version (`SpaceInvaders-v4`).  With the NoFrameskip version, not every sprite is captured in the recording.

Some sprites won't be recorded when noFrameSkip is used.  Either the defender's laser sprite will be missing or the invaders' bomb sprites will be missing.  Using SpaceInvaders-v4 is recommended.


Special care is needed to extract a single video from the agent model.  For training purposes 8 environments are being played simultaneously (n_envs=8).  Later in the code, video from a single environment will be extracted.

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.vec_env import VecFrameStack

# make_atari_env is an existing environment generator that makes and wraps
# Atari environments correctly, so no manual wrapping is needed here.
# Alternative environment id: "SpaceInvadersNoFrameskip-v4"
ENV_ID = "SpaceInvaders-v4"
N_ENVS = 8

# Checkpoint to evaluate.
# Earlier checkpoint:
#   "/content/gdrive/MyDrive/models/PPO/ppo_spaceInvadersLvl4Train_63955088_steps"
MODEL_PATH = "/content/gdrive/MyDrive/models/PPO_try2/ppo_spaceInvadersFrameSkipWinningParams_178798848_steps"

env = make_atari_env(ENV_ID, n_envs=N_ENVS, seed=0)
# Stack 4 frames
vec_env = VecFrameStack(env, n_stack=4)

model = PPO.load(
    MODEL_PATH,
    verbose=1,
    force_reset=False,
)

  th_object = th.load(file_content, map_location=device)


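The cell below is a modified copy of Stable-Baselines3's `evaluate_policy()` function.  In addition to the episode rewards and episode lengths, it also returns the recorded frames of each episode, from which a video can be made.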

In [None]:
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import gymnasium as gym
import numpy as np

from stable_baselines3.common import type_aliases
from stable_baselines3.common.vec_env import DummyVecEnv, VecEnv, VecMonitor, is_vecenv_wrapped


def evaluate_policy_andReturnRecording(
    model: "type_aliases.PolicyPredictor",
    env: Union[gym.Env, VecEnv],
    n_eval_episodes: int = 10,
    deterministic: bool = True,
    render: bool = False,
    callback: Optional[Callable[[Dict[str, Any], Dict[str, Any]], None]] = None,
    reward_threshold: Optional[float] = None,
    return_episode_rewards: bool = False,
    warn: bool = True,
) -> Union[
    Tuple[float, float, List[List[Any]]],
    Tuple[List[float], List[int], List[List[Any]]],
]:
    """
    Runs policy for ``n_eval_episodes`` episodes and returns average reward
    together with the recorded frames of every evaluated episode.

    This is a modified copy of Stable-Baselines3's ``evaluate_policy``; the only
    addition is the frame recording.

    If a vector env is passed in, this divides the episodes to evaluate onto the
    different elements of the vector env. This static division of work is done to
    remove bias. See https://github.com/DLR-RM/stable-baselines3/issues/402 for more
    details and discussion.

    .. note::
        If environment has not been wrapped with ``Monitor`` wrapper, reward and
        episode lengths are counted as it appears with ``env.step`` calls. If
        the environment contains wrappers that modify rewards or episode lengths
        (e.g. reward scaling, early episode reset), these will affect the evaluation
        results as well. You can avoid this by wrapping environment with ``Monitor``
        wrapper before anything else.

    .. note::
        All frames of all evaluated episodes are kept in memory until the
        function returns, which can be very large for long evaluations.

    :param model: The RL agent you want to evaluate. This can be any object
        that implements a `predict` method, such as an RL algorithm (``BaseAlgorithm``)
        or policy (``BasePolicy``).
    :param env: The gym environment or ``VecEnv`` environment.
    :param n_eval_episodes: Number of episode to evaluate the agent
    :param deterministic: Whether to use deterministic or stochastic actions
    :param render: Whether to render the environment or not
    :param callback: callback function to do additional checks,
        called after each step. Gets locals() and globals() passed as parameters.
    :param reward_threshold: Minimum expected reward per episode,
        this will raise an error if the performance is not met
    :param return_episode_rewards: If True, a list of rewards and episode lengths
        per episode will be returned instead of the mean.
    :param warn: If True (default), warns user about lack of a Monitor wrapper in the
        evaluation environment.
    :return: ``(mean_reward, std_reward, episode_img)``, or
        ``(episode_rewards, episode_lengths, episode_img)`` when
        ``return_episode_rewards`` is True. ``episode_img[j]`` is the list of
        frames (as returned by ``env.get_images()``) recorded during the episode
        whose reward is ``episode_rewards[j]`` and whose length is
        ``episode_lengths[j]``.
    """
    is_monitor_wrapped = False
    # Avoid circular import
    from stable_baselines3.common.monitor import Monitor

    if not isinstance(env, VecEnv):
        env = DummyVecEnv([lambda: env])  # type: ignore[list-item, return-value]

    is_monitor_wrapped = is_vecenv_wrapped(env, VecMonitor) or env.env_is_wrapped(Monitor)[0]

    if not is_monitor_wrapped and warn:
        warnings.warn(
            "Evaluation environment is not wrapped with a ``Monitor`` wrapper. "
            "This may result in reporting modified episode lengths and rewards, if other wrappers happen to modify these. "
            "Consider wrapping environment first with ``Monitor`` wrapper.",
            UserWarning,
        )

    n_envs = env.num_envs
    episode_rewards = []
    episode_lengths = []

    # episode_img[j] holds all frames of finished episode j, in the same order
    # as episode_rewards / episode_lengths.
    episode_img = []
    # imgRecPerEnv[i] accumulates the frames of the episode currently running
    # in sub-environment i; it is moved into episode_img and replaced by a
    # fresh list when that episode really ends.
    imgRecPerEnv = [[] for _ in range(n_envs)]

    episode_counts = np.zeros(n_envs, dtype="int")
    # Divides episodes among different sub environments in the vector as evenly as possible
    episode_count_targets = np.array([(n_eval_episodes + i) // n_envs for i in range(n_envs)], dtype="int")

    current_rewards = np.zeros(n_envs)
    current_lengths = np.zeros(n_envs, dtype="int")
    observations = env.reset()
    states = None
    episode_starts = np.ones((env.num_envs,), dtype=bool)
    while (episode_counts < episode_count_targets).any():
        actions, states = model.predict(
            observations,  # type: ignore[arg-type]
            state=states,
            episode_start=episode_starts,
            deterministic=deterministic,
        )
        new_observations, rewards, dones, infos = env.step(actions)
        current_rewards += rewards
        current_lengths += 1

        # Fetch the frames of all sub-environments once per step instead of
        # once per sub-environment.
        images = env.get_images()

        for i in range(n_envs):
            if episode_counts[i] < episode_count_targets[i]:
                # unpack values so that the callback can access the local variables
                reward = rewards[i]
                done = dones[i]
                info = infos[i]
                episode_starts[i] = done

                # capture this step's frame for sub-environment i
                img = images[i]
                imgRecPerEnv[i].append(img)

                if callback is not None:
                    callback(locals(), globals())

                if dones[i]:
                    episode_finished = True
                    if is_monitor_wrapped:
                        # Atari wrapper can send a "done" signal when
                        # the agent loses a life, but it does not correspond
                        # to the true end of episode
                        if "episode" in info.keys():
                            # Do not trust "done" with episode endings.
                            # Monitor wrapper includes "episode" key in info if environment
                            # has been wrapped with it. Use those rewards instead.
                            episode_rewards.append(info["episode"]["r"])
                            episode_lengths.append(info["episode"]["l"])
                        else:
                            # Only a life was lost: keep recording the same episode.
                            episode_finished = False
                    else:
                        episode_rewards.append(current_rewards[i])
                        episode_lengths.append(current_lengths[i])

                    if episode_finished:
                        # Store the frames of the completed episode and start a
                        # fresh buffer. Doing this for both branches keeps
                        # episode_img aligned with episode_rewards.
                        episode_img.append(imgRecPerEnv[i])
                        imgRecPerEnv[i] = []
                        # Only increment at the real end of an episode
                        episode_counts[i] += 1

                    current_rewards[i] = 0
                    current_lengths[i] = 0

        observations = new_observations

        if render:
            env.render()

    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    if reward_threshold is not None:
        assert mean_reward > reward_threshold, "Mean reward below threshold: " f"{mean_reward:.2f} < {reward_threshold:.2f}"
    if return_episode_rewards:
        return episode_rewards, episode_lengths, episode_img
    return mean_reward, std_reward, episode_img

  and should_run_async(code)


In [None]:
# Evaluate the model and record the videos, along with ep rew and ep lens.

import pickle

ep_reward, ep_len, imgRecPerEnv = evaluate_policy_andReturnRecording(model, vec_env, n_eval_episodes=100, return_episode_rewards=True)

f_out = open('ep_reward.pkl', 'wb')
pickle.dump(ep_reward, f_out)
f_out.close()

f_out = open('ep_len.pkl', 'wb')
pickle.dump(ep_len, f_out)
f_out.close()

# This data object can be large > 10 GB
# omit saving
# f_out = open('imgRecPerEnv.pkl', 'wb')
# pickle.dump(imgRecPerEnv, f_out)
# f_out.close()


# copy it there
!cp ep_reward.pkl ./gdrive/MyDrive/models/evalPPO/ep_reward_100_noFrameSkip_try2.pkl
!cp ep_len.pkl ./gdrive/MyDrive/models/evalPPO/ep_len_100_noFrameSkip_try2.pkl
# !cp ep_len.pkl ./gdrive/MyDrive/models/evalPPO/ep_recording_100_noFrameSkip_try2.pkl

  logger.warn(


In [None]:
# Index of the episode with the highest total reward (the first one on ties),
# used later to pick which recording to export.
maxScoreIdx = max(range(len(ep_reward)), key=ep_reward.__getitem__)
maxScore = ep_reward[maxScoreIdx]
print("Max Score: ", maxScore)


Max Score:  24935.0


In [None]:
import imageio
import numpy as np

# Display time of each frame: 1000 ms * 1/fps, so 31.25 ms corresponds to 32 fps.
# NOTE(review): this assumes the installed imageio GIF writer interprets
# `duration` in milliseconds (older versions used seconds) -- confirm.
FRAME_DURATION_MS = 31.25

# Frames of the best-scoring episode, converted to arrays for the GIF writer.
best_episode_frames = [np.array(img) for img in imgRecPerEnv[maxScoreIdx]]
imageio.mimsave("space_invaders_ppo.gif", best_episode_frames, duration=FRAME_DURATION_MS)