In [1]:
import os
import sys
import git
import pathlib

# Request a restrictive TensorFlow C++ log level ahead of the library imports
# in the following cells ('3' is presumably the quietest setting - TODO confirm).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Locate the enclosing git working tree (searching upwards from the current
# directory) and expose it both as a Path and as a plain string.
PROJ_ROOT_PATH = pathlib.Path(
    git.Repo('.', search_parent_directories=True).working_tree_dir
)
PROJ_ROOT = os.fspath(PROJ_ROOT_PATH)

# Make project-local packages importable; the guard keeps re-running this cell
# from appending the same entry to sys.path again.
if PROJ_ROOT not in sys.path:
    sys.path.append(PROJ_ROOT)

In [2]:
import numpy as np
import imageio
import ipyplot

In [3]:
import gymnasium as gym

In [None]:
from stable_baselines3 import A2C
from stable_baselines3.common.atari_wrappers import AtariWrapper
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.vec_env import VecFrameStack

In [None]:
from IPython.display import Image

In [None]:
from lib.folder_paths import makeget_logging_dir, get_exp_name_tag, deconstruct_exp_name
from lib.env_utils import AtariWrapper_NoisyFrame, AtariWrapper_Compressed, make_atari_env_Custom_VecFrameStack

In [None]:
# RL algorithm class; saved checkpoints are restored later through `algorithm.load(...)`.
algorithm = A2C

In [None]:
# Experiment configuration.
# env_id:                 Gymnasium id of the Atari environment.
# exp_param_type:         which observation perturbation is studied; the evaluation
#                         loop maps "compress"/"noisy" to the matching custom wrapper.
# exp_param_value_LIST:   parameter values used to build the experiment names.
# eval_param_value_LIST:  parameter values applied to the environment at evaluation time.
# model_type_LIST:        which checkpoint of each run to evaluate ("best" or "last").
env_id = "BreakoutNoFrameskip-v4"
exp_param_type = "compress"
exp_param_value_LIST = [0.0, 0.1, 0.2, 0.3, 0.4]
eval_param_value_LIST = [0.0, 0.1, 0.2, 0.3, 0.4]
model_type_LIST = ["best", "last"]

# Collect one experiment name per parameter value (printed for reference).
exp_name_LIST = []
for exp_param_value in exp_param_value_LIST:
    # Get names and tags of experiment
    exp_name, exp_metaname, exp_tag = get_exp_name_tag(env_id, exp_param_type, exp_param_value)
    print(exp_name)
    exp_name_LIST.append(exp_name)

In [None]:
# For every experiment, run, checkpoint type and evaluation parameter value:
# build a single evaluation environment, roll the loaded policy out for a fixed
# number of steps and save every other rendered frame as a GIF in `gif_dir`.

# Animation settings (identical for every rollout, so defined once).
duration = 10 #sec
fps = 240 #fps
no_of_frames = duration*fps*2 #only every other frame is used for animation

for exp_name in exp_name_LIST:
    # Get names and tags of experiment
    env_id, exp_param_type, exp_param_value, exp_metaname = deconstruct_exp_name(exp_name)

    # Get directories
    models_dir, log_dir, gif_dir, image_dir = makeget_logging_dir(exp_name)

    # get all subfolders in log_dir
    folder_list = os.listdir(log_dir)

    # list of folders whose name contains "run" (sorted lexicographically)
    runs = sorted(folder for folder in folder_list if "run" in folder)

    # Run identifier = the complete trailing number of the folder name, so that
    # e.g. "run_10" yields "10" instead of only its last character "0".
    # Falls back to the last character when the name does not end in a digit.
    run_nos = []
    for run in runs:
        trailing_digits = run[len(run.rstrip("0123456789")):]
        run_nos.append(trailing_digits or run[-1])
    NO_OF_RUNS = len(runs)

    for run_no in run_nos:
        for model_type in model_type_LIST:
            for eval_param_value in eval_param_value_LIST:
                # Pick the observation wrapper that matches the experiment type
                if exp_param_type == "compress":
                    wrapper = AtariWrapper_Compressed
                    wrapper_kwargs = {"compress_ratio": float(eval_param_value)}
                elif exp_param_type == "noisy":
                    wrapper = AtariWrapper_NoisyFrame
                    wrapper_kwargs = {"noise": float(eval_param_value)}
                else:
                    # Plain Stable-Baselines3 Atari preprocessing
                    wrapper = AtariWrapper
                    wrapper_kwargs = None

                # Make vector environment
                trial_env = make_atari_env_Custom_VecFrameStack(env_id=env_id,
                                                                n_envs=1,
                                                                monitor_dir=None,
                                                                seed=161803,
                                                                wrapper_class=wrapper,
                                                                wrapper_kwargs=wrapper_kwargs)

                # try/finally guarantees the environment is released even when
                # loading the model or stepping/rendering raises.
                try:
                    # Load RL model
                    if model_type == "best":
                        model_file = f"{models_dir}/{exp_name}-run_{run_no}-best"
                    else:
                        model_file = f"{models_dir}/{exp_name}-run_{run_no}"
                    model = algorithm.load(model_file)

                    # Create animation: step the policy and keep only the
                    # even-indexed frames, so skipped frames are never buffered.
                    images = []
                    obs = trial_env.reset()
                    img = trial_env.render(mode="rgb_array")
                    for i in range(no_of_frames):
                        if i % 2 == 0:
                            images.append(np.array(img))
                        action, _ = model.predict(obs)
                        obs, reward, done, info = trial_env.step(action)
                        img = trial_env.render(mode="rgb_array")
                finally:
                    trial_env.close()

                # Convert frames to animation
                eval_param_type = exp_param_type
                gif_file = f"{gif_dir}/{exp_name}-run_{run_no}--eval_{model_type}-{eval_param_type}_{eval_param_value}.gif"
                # NOTE(review): `duration` here is the intended clip length (10), but
                # imageio appears to interpret this keyword as a per-frame duration
                # whose unit depends on the imageio version - confirm the playback speed.
                imageio.mimsave(gif_file, images, duration=duration)