In [None]:
import datetime
import pickle
import pprint
import os
import logging
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import ray
from ray import tune
from ray.tune.registry import register_env
from ray.tune.logger import pretty_print
from ray.rllib.agents.ddpg.ddpg import DDPGTrainer

from envs.particle_rllib.environment import ParticleEnv

from callbacks import CustomCallbacks
from logger import info_logger, results_logger

In [None]:
# Environment factory
def create_env_fn(env_context=None):
    """Build a new ParticleEnv from the notebook-level environment settings.

    Args:
        env_context: accepted so the function can be passed to `register_env`
            as an environment creator; it is not read here.

    Returns:
        A ParticleEnv constructed with the module-level `n_listeners`,
        `n_landmarks` and `render_enable` values current at call time.
    """
    env_settings = {
        "n_listeners": n_listeners,
        "n_landmarks": n_landmarks,
        "render_enable": render_enable,
    }
    return ParticleEnv(**env_settings)

# Function that maps an agent id to the name of the policy controlling it
def policy_mapping_fn(agent_id):
    """Return the policy name for `agent_id`.

    Agent ids starting with 'manager' map to "manager_policy"; every other
    agent id maps to "worker_policy".
    """
    is_manager = agent_id.startswith('manager')
    return "manager_policy" if is_manager else "worker_policy"

## Parameters

In [None]:
# pretraining parameters
pretraining_n_epochs = 7 # number of initial epochs run with the pre-training configuration

# training parameters
training_n_epochs = 100 # last epoch index of the training loop (pre-training epochs included)

# common parameters
training_algo = "DDPG"
env_name = "ParticleManagerListeners" # name under which the environment is registered
n_episodes = 70 # number of episodes in one epoch
n_steps = 25 # number of steps in one episode
learning_rate = 0.001 
tau = 0.01 # for updating the target network
gamma = 0.75 # discount factor
replay_buffer_size = 10000000
batch_size = 1024
hidden_layers = [256, 256] # used for both the actor and the critic networks

# environment config
n_listeners = 1 
n_landmarks = 12
render_enable = False

# early stop training parameters
early_stop_enable = True # set to True to enable early stopping based on conditions defined below
min_n_epochs = 0.5 * training_n_epochs # minimum number of epochs to enable early stopping
min_rel_delta_reward = 0.01  # minimum acceptable relative variation of the reward between two epochs
cut_epochs = 5 # minimum number of epochs without significant variation of the reward to stop the training loop

# other settings
savedata_dir = './savedata/' # per-epoch CSV logs directory
checkpoint_dir = './checkpoints/' # checkpoints directory
checkpoint_interval = 2 # number of trainings after which a checkpoint is set
restore_checkpoint_n = 10 # checkpoint number to restore before training, 0 to start from scratch

# Create the checkpoint and savedata directories.
# exist_ok=True makes this safe to re-run and avoids the check-then-create race;
# it still raises if the path exists but is not a directory.
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(savedata_dir, exist_ok=True)

## Environment and trainers configuration

In [None]:
# Register the environment creator under env_name, then build one local
# instance whose only use here is to read the per-agent spaces.
register_env(env_name, create_env_fn)
env = create_env_fn()

action_spaces = env.action_space
observation_spaces = env.observation_space

# According to the environment implementation, every agent has its own action
# and observation space: index 0 belongs to the manager, the following indices
# belong to the workers (index 1 is taken as the worker spaces).
manager_action_space, worker_action_space = action_spaces[0], action_spaces[1]
manager_observation_space, worker_observation_space = observation_spaces[0], observation_spaces[1]

# One policy spec per role: (policy class, observation space, action space, extra config)
manager_policy_spec = (None, manager_observation_space, manager_action_space, {})
worker_policy_spec = (None, worker_observation_space, worker_action_space, {})
policies = dict(
    manager_policy=manager_policy_spec,
    worker_policy=worker_policy_spec,
)

# Multi-agent section of the pre-training configuration: both policies are
# declared, but only the worker policy is listed for training.
multiagent_config = dict(
    policies=policies,
    policy_mapping_fn=policy_mapping_fn,
    policies_to_train=["worker_policy"],
)

pretraining_config = dict(
    lr=learning_rate,
    tau=tau,
    gamma=gamma,
    horizon=n_steps,
    actor_hiddens=hidden_layers,
    critic_hiddens=hidden_layers,
    buffer_size=replay_buffer_size,
    train_batch_size=batch_size,
    multiagent=multiagent_config,
    callbacks=CustomCallbacks,
    log_level="ERROR",
)

## Simulation

In [None]:
# Training driver: runs the pre-training epochs followed by the training epochs,
# saving checkpoints and a per-epoch CSV log, with optional early stopping.
# Ray is always shut down on exit, whether the loop completes or raises.
try:

    curr_epoch = 1 # Current epoch
    checkpoint_counter = 0 # Counter to know when to save a new checkpoint

    # Initialize Ray (shutting down any instance that is already running first)
    ray.shutdown()
    ray.init()

    # Initialize trainer, start with pre-training configuration
    trainer = DDPGTrainer(env=env_name,
                          config=pretraining_config)

    # Restore a checkpoint
    if(restore_checkpoint_n != 0):
        trainer.restore(checkpoint_dir + 'checkpoint_{n}/checkpoint-{n}'.format(n=restore_checkpoint_n))
        # resume from the epoch the restored episode count falls in
        curr_epoch = trainer._episodes_total // n_episodes + 1

        if curr_epoch > pretraining_n_epochs:
            # the checkpoint is already past pre-training: train every policy
            # NOTE(review): `_setup` is a private trainer method; it may rebuild the
            # policies and drop the weights just restored - TODO confirm against the
            # installed RLlib version.
            training_config = trainer.get_config()
            training_config['multiagent']['policies_to_train'] = list(policies.keys())
            trainer._setup(training_config)

        info_logger.info("Restored checkpoint {}".format(restore_checkpoint_n))
    else:
        info_logger.info("Initializing pre-training mode")

    countdown_enabled = False # flag for early stopping
    countdown = cut_epochs # epochs left without significant reward variation before stopping

    # Print the current configuration
    pp = pprint.PrettyPrinter(indent=4)
    print("Current configiguration\n-----------------------")
    pp.pprint(trainer.get_config())
    print("-----------------------\n")

    # columns of the per-epoch CSV log
    savedata_columns = ["episodes_total","episode_len_mean", "worker_reward_mean", "manager_reward_mean", "prob_correct_goal"]

    while curr_epoch <= training_n_epochs:

        # loop for training_n_epochs

        info_logger.info("Current epoch: {}".format(curr_epoch))

        # initialize iteration data saving log
        savedata_file_name = '{}-epoch.csv'.format(curr_epoch)
        savedata_file_path = os.path.join(savedata_dir, savedata_file_name)
        savedata_rows = [] # one record per call to trainer.train() in this epoch

        episode_mean_rewards = [] # mean reward of the episode in time

        # after pretraining_n_epochs epochs, reset the configuration to the training one
        if curr_epoch == pretraining_n_epochs + 1:
            # NOTE(review): as above, `_setup` may re-create the policies and discard
            # what was learned during pre-training - TODO confirm.
            training_config = trainer.get_config()
            training_config['multiagent']['policies_to_train'] = list(policies.keys())
            trainer._setup(training_config)
            info_logger.info("Switch to training mode")
            print("Training configiguration\n-----------------------")
            pp.pprint(trainer.get_config())
            print("-----------------------\n")

        curr_episode = 1 # Current episode; reset so that at least one train() call runs per epoch

        # the epoch ends once the cumulative episode count exceeds n_episodes * curr_epoch
        while curr_episode <= n_episodes * curr_epoch:

            # loop for n_episodes

            result = trainer.train()
            curr_episode = result['episodes_total']
            episode_mean_reward = result['episode_reward_mean']
            episode_mean_len = result['episode_len_mean']
            prob_correct_goal = result['custom_metrics']['prob_correct_goal_mean']

            print(pretty_print(result))
            episode_mean_rewards.append(episode_mean_reward)
            plt.plot(episode_mean_rewards)
            plt.show()

            checkpoint_counter += 1
            # save a checkpoint every checkpoint_interval trains
            if(checkpoint_counter == checkpoint_interval):
                trainer.save(checkpoint_dir)
                info_logger.info("Checkpoint saved (iteration {})".format(result['training_iteration']))
                checkpoint_counter = 0

            # update the log; the record is keyed by column name so that every value
            # ends up under the header that describes it
            policy_reward_mean = result['policy_reward_mean']
            savedata_rows.append({
                "episodes_total": curr_episode,
                "episode_len_mean": episode_mean_len,
                "worker_reward_mean": policy_reward_mean["worker_policy"],
                "manager_reward_mean": policy_reward_mean["manager_policy"],
                "prob_correct_goal": prob_correct_goal,
            })

        # print results on file (the frame is built once per epoch instead of row by row)
        savedata = pd.DataFrame(savedata_rows, columns=savedata_columns)
        savedata.to_csv(savedata_file_path)

        # compute epoch's results (taken from the last training iteration of the epoch)
        curr_epoch_mean_reward = result['episode_reward_mean']
        curr_epoch_prob_correct_goal = result['custom_metrics']['prob_correct_goal_mean']

        results_logger.info("Epoch: {}".format(curr_epoch))
        results_logger.info("\tmean reward = {}".format(curr_epoch_mean_reward))
        results_logger.info("\tprobability of correct goal = {}".format(curr_epoch_prob_correct_goal))

        # check early stopping conditions
        if(early_stop_enable):

            # 1st stopping criterion: the minimum number of epochs to enable early stop has been reached
            if(curr_epoch >= min_n_epochs):

                if(countdown_enabled):

                    # compute the minimum acceptable variation of the reward with respect to the previous epoch;
                    # abs() keeps the threshold non-negative when the mean reward is negative, otherwise
                    # every variation would count as significant and the countdown would never advance
                    min_delta_reward = min_rel_delta_reward * abs(prev_epoch_mean_reward)
                    # compute the actual variation of the reward with respect to the previous epoch
                    delta_reward = abs(curr_epoch_mean_reward - prev_epoch_mean_reward)

                    # check that the variation of the reward is significant
                    if(delta_reward > min_delta_reward):

                        # reset the countdown
                        countdown = cut_epochs

                    else:

                        countdown -= 1

                        # 2nd stopping criterion: there has not been any significant variation for cut_epochs consecutive epochs
                        if(countdown == 0):
                            info_logger.info("Early stopping at epoch {}".format(curr_epoch))
                            break

                else:
                    # this branch is executed only the first time the program reaches the minimum number of epochs, start the countdown
                    countdown_enabled = True

                prev_epoch_mean_reward = curr_epoch_mean_reward

        curr_epoch +=1

finally:
    ray.shutdown()