# Atlantis - Reinforcement Learning - A2C

In [None]:
import gymnasium as gym
from stable_baselines3 import A2C, PPO
from sb3_contrib import RecurrentPPO
import numpy as np
import os

## Train and save the models with different timesteps

In [None]:
# Output locations: saved checkpoints and training logs.
# NOTE(review): the checkpoint folder is named "A2C" although the model
# created below is a RecurrentPPO -- confirm which algorithm is intended.
models_dir = "models/A2C"
logdir = "logs"

# exist_ok=True creates the directory only when missing, without the
# check-then-create race of a separate os.path.exists() test.
for _directory in (models_dir, logdir):
    os.makedirs(_directory, exist_ok=True)

# Create the environment (grayscale frames, off-screen rendering)
env = gym.make('ALE/Atlantis-v5', render_mode="rgb_array", obs_type="grayscale")
env.reset()

# Initialize the model
model = RecurrentPPO("MlpLstmPolicy", env, verbose=0, device="cuda")



In [None]:
def print_stats(locals):
    """
    Print a short summary of the episode described by a callback's locals.

    :param locals: mapping that provides ``'iteration'`` and ``'infos'``;
        ``infos[0]`` must hold ``'lives'`` and an ``'episode'`` mapping with
        the keys ``'r'``, ``'l'`` and ``'t'``.
    """
    print(f"Episode: {locals['iteration']}")

    # Only the first entry of ``infos`` is reported.
    first_info = locals['infos'][0]
    print(f"Lives: {first_info['lives']}")

    episode = first_info['episode']
    for label, key in (("Reward", "r"), ("L", "l"), ("T", "t")):
        print(f"{label}: {episode[key]}")

In [None]:
from stable_baselines3.common.callbacks import BaseCallback


class CustomCallback(BaseCallback):
    """
    Callback that records one statistics entry each time the first
    environment reports a finished episode.

    The base class supplies the attributes used here (``self.locals``) as
    well as ``self.model``, ``self.training_env``, ``self.n_calls``,
    ``self.num_timesteps``, ``self.globals``, ``self.logger`` and
    ``self.parent``.

    :param verbose: Verbosity level: 0 for no output, 1 for info messages, 2 for debug messages
    """

    def __init__(self, verbose: int = 0):
        super().__init__(verbose)
        # One dict per finished episode, filled in by ``_on_step``.
        self.episodes_stats = []

    def _on_training_start(self) -> None:
        """
        Hook invoked before the first rollout starts (no-op).
        """
        pass

    def _on_rollout_start(self) -> None:
        """
        Hook invoked before new samples are collected with the current
        policy (no-op).
        """
        pass

    def _on_step(self) -> bool:
        """
        Called by the model after each call to `env.step()`.

        When the first environment's ``dones`` flag is set, the episode
        summary is printed and appended to ``self.episodes_stats``.

        :return: Always True; returning False would abort training early.
        """
        # Guard clause: nothing to record while the episode is still running.
        if not self.locals['dones'][0]:
            return True

        print_stats(self.locals)

        first_info = self.locals['infos'][0]
        finished = first_info['episode']
        self.episodes_stats.append({
            "episode": self.locals['iteration'],
            "lives": first_info['lives'],
            "reward": finished['r'],
            "l": finished['l'],
            "t": finished['t'],
        })
        return True

    def _on_rollout_end(self) -> None:
        """
        Hook invoked before the policy is updated (no-op).
        """
        pass

    def _on_training_end(self) -> None:
        """
        Hook invoked before exiting the `learn()` method (no-op).
        """
        pass

In [None]:
# Bare expression: as the last statement of a notebook cell it displays the model object.
model

In [None]:
# NOTE(review): this callback is instantiated but never passed to
# model.learn(), so it does not collect any statistics as written.
custom = CustomCallback()

TIME_STEPS = 100   # timesteps requested per learn() call and max steps per evaluation
MAX_ITERS = 1000   # number of train / evaluate / save rounds
iters = 0

# The model's vectorized wrapper around `env`; it does not change between rounds.
vec_env = model.get_env()

for iters in range(1, MAX_ITERS + 1):
    # reset_num_timesteps=True is kept on purpose: the evaluation below steps
    # the same underlying env, so the learner should start each round from a
    # fresh reset instead of a stale observation (per SB3's learn() setup).
    model.learn(total_timesteps=TIME_STEPS, reset_num_timesteps=True)

    # Short evaluation rollout: accumulate the reward of at most TIME_STEPS steps.
    total_reward = 0
    obs = vec_env.reset()[0]

    # Recurrent policies need their hidden state carried from one predict()
    # call to the next; episode_start tells the policy when to reset it.
    lstm_states = None
    episode_starts = np.ones((1,), dtype=bool)

    for _step in range(TIME_STEPS):
        action, lstm_states = model.predict(
            obs, state=lstm_states, episode_start=episode_starts
        )
        episode_starts = np.zeros((1,), dtype=bool)

        # Convert action to integer if it's in array form
        if isinstance(action, np.ndarray):
            action = action.item()

        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        # An episode ends on termination OR truncation (time limit).
        if terminated or truncated:
            break

    print(f"Total reward at iteration {iters}: {total_reward}")

    # Save the model; the file name is the nominal cumulative timestep count.
    model.save(f"{models_dir}/{TIME_STEPS*iters}.zip")

env.close()


## Test the different models

In [None]:
iters = 0

# Create the environment
env = gym.make('ALE/Atlantis-v5', render_mode="human", obs_type="grayscale")
env.reset()

# Number of episodes played with every saved checkpoint.
episodes = 5

try:
    while iters < MAX_ITERS:

        iters += 1

        model_path = f"{models_dir}/{TIME_STEPS*iters}.zip"
        # The checkpoints in this notebook are written by a RecurrentPPO
        # model, so they are loaded with the same algorithm class.
        model = RecurrentPPO.load(model_path, env=env)
        vec_env = model.get_env()

        for ep in range(episodes):
            obs = vec_env.reset()[0]

            # Carry the recurrent hidden state across predict() calls and
            # flag the first step of each episode so the state is reset.
            lstm_states = None
            episode_starts = np.ones((1,), dtype=bool)

            done = False
            while not done:
                action, lstm_states = model.predict(
                    obs, state=lstm_states, episode_start=episode_starts
                )
                episode_starts = np.zeros((1,), dtype=bool)

                # Convert action to integer if it's in array form
                if isinstance(action, np.ndarray):
                    action = action.item()

                obs, rewards, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                env.render()
finally:
    # Close the env exactly once, after every checkpoint has been replayed
    # (closing it inside the loop left later checkpoints with a closed env).
    env.close()