In [1]:
from typing import NamedTuple, List, Union, Dict, Any, Optional
import torch as th
from stable_baselines3.common.vec_env import VecNormalize
from stable_baselines3.common.buffers import RolloutBuffer
from gym import spaces
import numpy as np
from copy import deepcopy

class AuxiliaryBufferSamples(NamedTuple):
    """
    One batch drawn from an ``AuxiliaryBuffer``.

    The first six fields are forwarded positionally from the samples returned
    by the parent buffer's ``_get_samples``; ``infos`` is added on top of them.
    """
    # Fields forwarded unchanged from the parent buffer's sample tuple.
    observations: th.Tensor
    actions: th.Tensor
    old_values: th.Tensor
    old_log_prob: th.Tensor
    advantages: th.Tensor
    returns: th.Tensor
    # One entry per sampled transition: the info dicts stored for that
    # transition's environment, from the sampled step to the end of the buffer.
    infos: List[List[Dict[str, Any]]]

class AuxiliaryBuffer(RolloutBuffer):
    """
    Rollout buffer used in on-policy algorithms like A2C/PPO,
        with an added tracking of information dictionaries.
    It corresponds to ``buffer_size`` transitions collected
    using the current policy.
    This experience will be discarded after the policy update.
    In order to use PPO objective, we also store the current value of each state
    and the log probability of each taken action.
    The term rollout here refers to the model-free notion and should not
    be used with the concept of rollout used in model-based RL or planning.
    Hence, it is only involved in policy and value function training but not action selection.

    In addition to the parent's arrays, ``self.infos[env][step]`` holds a deep copy
    of the info dictionary passed to :meth:`add` for that environment and step.

    :param buffer_size: Max number of element in the buffer
    :param observation_space: Observation space
    :param action_space: Action space
    :param device: PyTorch device
    :param gae_lambda: Factor for trade-off of bias vs variance for Generalized Advantage Estimator
        Equivalent to classic advantage when set to 1.
    :param gamma: Discount factor
    :param n_envs: Number of parallel environments
    """
    def __init__(self,
            buffer_size: int,
            observation_space: spaces.Space,
            action_space: spaces.Space,
            device: Union[th.device, str] = "auto",
            gae_lambda: float = 1,
            gamma: float = 0.99,
            n_envs: int = 1):
        super().__init__(buffer_size, observation_space, action_space, device,
            gae_lambda=gae_lambda, gamma=gamma, n_envs=n_envs)

        # Upper (exclusive) flat-index bound of every environment's segment.
        # No longer needed by ``_get_samples``; kept so that code reading this
        # attribute keeps working.
        lengths = [buffer_size] * n_envs
        self.info_bins = np.cumsum(np.array(lengths))
        self.reset()

    def reset(self) -> None:
        """Clear the stored info dictionaries, then reset the parent buffer."""
        # One independent list of ``buffer_size`` slots per environment.
        self.infos = [[None] * self.buffer_size for __ in range(self.n_envs)]
        super().reset()

    def add(self,
            obs: np.ndarray,
            action: np.ndarray,
            reward: np.ndarray,
            episode_start: np.ndarray,
            value: th.Tensor,
            log_prob: th.Tensor,
            infos: List[Dict[str, Any]],
            ) -> None:
        """
        Store one step for every environment.

        :param obs: Observation
        :param action: Action
        :param reward:
        :param episode_start: Start of episode signal.
        :param value: estimated value of the current state
            following the current policy.
        :param log_prob: log probability of the action
            following the current policy.
        :param infos: the information dictionaries returned by the environment,
            one per environment (index ``i`` is stored for environment ``i``)
        """
        # Infos must be written before delegating: they are stored at the
        # current ``self.pos``, which the parent's ``add`` is expected to advance.
        for i, info in enumerate(infos):
            # Deep copy so later mutation by the environment cannot alter stored data.
            self.infos[i][self.pos] = deepcopy(info)
        super().add(obs, action, reward, episode_start, value, log_prob)

    def _get_samples(self, batch_inds: np.ndarray, env: Optional[VecNormalize] = None) -> AuxiliaryBufferSamples:
        """
        Build a batch for the given flat indices.

        :param batch_inds: Flat indices into the flattened buffer
        :param env: Forwarded unchanged to the parent's ``_get_samples``
        :return: The parent's samples extended with ``infos``: for every index,
            the info dictionaries of its environment from the sampled step up to
            the end of the buffer (so consumers can look ahead several steps).
        """
        rollout_samples = super()._get_samples(batch_inds, env)
        # Flat index -> (environment, step). Assumes the flattened buffer is laid
        # out environment-major, i.e. flat = env * buffer_size + step; this is the
        # same layout ``info_bins`` encodes. Using divmod keeps both indices
        # non-negative instead of relying on negative-index wraparound for env 0.
        env_indices, step_indices = np.divmod(np.asarray(batch_inds), self.buffer_size)
        # NOTE(review): the look-ahead slice is not cut at episode boundaries;
        # confirm consumers are fine with windows that span a reset.
        infos = [
            self.infos[env_idx][step_idx:]
            for env_idx, step_idx in zip(env_indices, step_indices)
        ]
        return AuxiliaryBufferSamples(*rollout_samples, infos=infos)

    
        


In [2]:
# Toy spaces for a smoke test of AuxiliaryBuffer: 3-dim observations and
# 1-dim actions, both bounded to [0, 1] in float64.
obs_space = spaces.Box(
    low=np.zeros((3,), dtype=np.float64),
    high=np.ones((3,), dtype=np.float64),
    dtype=np.float64,
)
action_space = spaces.Box(
    low=np.zeros(1, dtype=np.float64),
    high=np.ones(1, dtype=np.float64),
    dtype=np.float64,
)

# 10 steps for each of 3 parallel environments, kept on the CPU.
buffer = AuxiliaryBuffer(
    buffer_size=10,
    observation_space=obs_space,
    action_space=action_space,
    device="cpu",
    n_envs=3,
)

In [3]:
# Fill the buffer with random transitions; the loop count (10) and n_env (3)
# mirror the buffer_size / n_envs used when the buffer was created.
n_env = 3
for i in range(10):
    obs = np.random.randn(n_env, 3)
    act = np.random.randn(n_env, 1)
    reward = np.random.randn(n_env)
    dones = np.random.randn(n_env) > 0.5
    value = th.randn(n_env)
    log_prob = th.randn(n_env)
    # Tag every info dict with its step (i) and environment (j) so the
    # sampled infos can be checked by eye.
    infos = [dict(i=i, j=j) for j in range(n_env)]
    buffer.add(obs, act, reward, dones, value, log_prob, infos)
    

In [4]:
# Iterable of batches; with batch_size=None all the data is delivered in one go.
batch = buffer.get(batch_size=None)

In [None]:
# Show the look-ahead info lists attached to every sampled transition,
# then clear the buffer (and its stored infos) again.
for data in batch:
    print(data.infos)
buffer.reset()

In [4]:
from typing import List, Dict, Any
import torch as th
from abc import ABC

class AuxiliaryObjective(ABC):
    """
    Base class for auxiliary objectives.

    Subclasses override :meth:`calculate_loss`; the base implementation
    raises ``NotImplementedError``.
    """

    def __init__(self):
        # Cooperative call so the class also works in multiple-inheritance setups.
        super().__init__()

    def calculate_loss(
        self,
        obs: th.Tensor,
        features: th.Tensor,
        actions: th.Tensor,
        returns: th.Tensor,
        infos: List[List[Dict[str, Any]]],
    ) -> th.Tensor:
        """
        Compute the auxiliary loss for one batch.

        :param obs: The observations
        :param features: The features (embeddings)
        :param actions: The actions
        :param returns: The returns
        :param infos: For every sample, a list of info dictionaries
        :return: The loss induced by the inputs
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError


In [5]:
from stable_baselines3 import A2C
from typing import Any, Dict, Optional, Type, Union, Tuple
from stable_baselines3.common.policies import (
    ActorCriticPolicy, BasePolicy, 
    ActorCriticCnnPolicy)
from stable_baselines3.common.type_aliases import GymEnv, Schedule
from stable_baselines3.common.vec_env import VecEnv
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.utils import obs_as_tensor, explained_variance
from gym import spaces
from torch.nn import functional as F

    
class AuxiliaryPolicy(ActorCriticPolicy):
    """Actor-critic policy whose ``evaluate_actions`` also returns the extracted features."""

    def evaluate_actions(self, obs: th.Tensor, actions: th.Tensor) -> Tuple[th.Tensor, th.Tensor, Optional[th.Tensor], th.Tensor]:
        """
        Score ``actions`` under the current policy for the given observations.

        :param obs: The observations
        :param actions: The actions to evaluate
        :return: A 4-tuple of estimated values, log likelihood of the actions,
            entropy of the action distribution, and the features produced by
            ``extract_features``.
        """
        embedding = self.extract_features(obs)
        pi_latent, vf_latent = self.mlp_extractor(embedding)
        action_dist = self._get_action_dist_from_latent(pi_latent)
        action_log_prob = action_dist.log_prob(actions)
        state_values = self.value_net(vf_latent)
        # The features are returned last so an auxiliary objective can reuse them.
        return state_values, action_log_prob, action_dist.entropy(), embedding

class AuxiliaryCnnPolicy(ActorCriticCnnPolicy):
    """CNN actor-critic policy whose ``evaluate_actions`` also returns the extracted features."""

    def evaluate_actions(self, obs: th.Tensor, actions: th.Tensor) -> Tuple[th.Tensor, th.Tensor, Optional[th.Tensor], th.Tensor]:
        """
        Score ``actions`` under the current policy for the given observations.

        :param obs: The observations
        :param actions: The actions to evaluate
        :return: A 4-tuple of estimated values, log likelihood of the actions,
            entropy of the action distribution, and the features produced by
            ``extract_features``.
        """
        embedding = self.extract_features(obs)
        pi_latent, vf_latent = self.mlp_extractor(embedding)
        action_dist = self._get_action_dist_from_latent(pi_latent)
        action_log_prob = action_dist.log_prob(actions)
        state_values = self.value_net(vf_latent)
        # The features are returned last so an auxiliary objective can reuse them.
        return state_values, action_log_prob, action_dist.entropy(), embedding

class AuxiliaryA2C(A2C):
    """
    Advantage Actor Critic (A2C) with the ability to add the loss of an auxiliary objective
    to the training loss.
    Paper: https://arxiv.org/abs/1602.01783
    Code: This implementation borrows code from https://github.com/ikostrikov/pytorch-a2c-ppo-acktr-gail
    and Stable Baselines (https://github.com/hill-a/stable-baselines)
    Introduction to A2C: https://hackernoon.com/intuitive-rl-intro-to-advantage-actor-critic-a2c-4ff545978752

    If the auxiliary objective is a ``torch.nn.Module``, its trainable parameters are added
    to the policy optimizer as an extra parameter group, so they are updated together with
    the policy. Their gradients are clipped separately (same ``max_grad_norm``) so that the
    clipping of the policy gradients is unaffected.

    Note that, unlike ``A2C``, ``env`` comes before ``policy`` in the signature.

    :param env: The environment to learn from (if registered in Gym, can be str)
    :param policy: The policy model to use (AuxiliaryMlpPolicy, AuxiliaryCnnPolicy, ...)
    :param learning_rate: The learning rate, it can be a function
        of the current progress remaining (from 1 to 0)
    :param n_steps: The number of steps to run for each environment per update
        (i.e. batch size is n_steps * n_env where n_env is number of environment copies running in parallel)
    :param gamma: Discount factor
    :param gae_lambda: Factor for trade-off of bias vs variance for Generalized Advantage Estimator
        Equivalent to classic advantage when set to 1.
    :param ent_coef: Entropy coefficient for the loss calculation
    :param vf_coef: Value function coefficient for the loss calculation
    :param max_grad_norm: The maximum value for the gradient clipping
    :param rms_prop_eps: RMSProp epsilon. It stabilizes square root computation in denominator
        of RMSProp update
    :param use_rms_prop: Whether to use RMSprop (default) or Adam as optimizer
    :param use_sde: Whether to use generalized State Dependent Exploration (gSDE)
        instead of action noise exploration (default: False)
    :param sde_sample_freq: Sample a new noise matrix every n steps when using gSDE
        Default: -1 (only sample at the beginning of the rollout)
    :param normalize_advantage: Whether to normalize or not the advantage
    :param tensorboard_log: the log location for tensorboard (if None, no logging)
    :param policy_kwargs: additional arguments to be passed to the policy on creation
    :param verbose: Verbosity level: 0 for no output, 1 for info messages (such as device or wrappers used), 2 for
        debug messages
    :param seed: Seed for the pseudo random generators
    :param device: Device (cpu, cuda, ...) on which the code should be run.
        Setting it to auto, the code will be run on the GPU if possible.
    :param _init_setup_model: Whether or not to build the network at the creation of the instance
    :param auxiliary_objective: The auxiliary objective, whose loss to add to training
        (``None`` disables the auxiliary term)
    :param auxiliary_coef: Auxiliary function coefficient for the loss calculation
    """
    policy_aliases: Dict[str, Type[BasePolicy]] = {
        "AuxiliaryMlpPolicy": AuxiliaryPolicy,
        "AuxiliaryCnnPolicy": AuxiliaryCnnPolicy,
    }

    def __init__(
            self,
            env: Union[GymEnv, str],
            policy: Union[str, Type[AuxiliaryPolicy]],
            learning_rate: Union[float, Schedule] = 7e-4,
            n_steps: int = 5,
            gamma: float = 0.99,
            gae_lambda: float = 1.0,
            ent_coef: float = 0.0,
            vf_coef: float = 0.5,
            max_grad_norm: float = 0.5,
            rms_prop_eps: float = 1e-5,
            use_rms_prop: bool = True,
            use_sde: bool = False,
            sde_sample_freq: int = -1,
            normalize_advantage: bool = False,
            tensorboard_log: Optional[str] = None,
            policy_kwargs: Optional[Dict[str, Any]] = None,
            verbose: int = 0,
            seed: Optional[int] = None,
            device: Union[th.device, str] = "auto",
            _init_setup_model: bool = True,
            auxiliary_objective: Optional[AuxiliaryObjective] = None,
            auxiliary_coef: float = 0.5,
            ):

        # Stored before the parent constructor runs so they are available to
        # ``_setup_model`` whenever it is invoked.
        self.auxiliary_objective = auxiliary_objective
        # NOTE: the attribute name is misspelled ("auxiliay"); it is kept as-is
        # because it is the name under which the value has always been stored.
        self.auxiliay_coef = auxiliary_coef

        super().__init__(
            policy = policy,
            env = env,
            learning_rate= learning_rate,
            n_steps= n_steps,
            gamma = gamma,
            gae_lambda= gae_lambda,
            ent_coef = ent_coef,
            vf_coef = vf_coef,
            max_grad_norm = max_grad_norm,
            rms_prop_eps = rms_prop_eps,
            use_rms_prop = use_rms_prop,
            use_sde = use_sde,
            sde_sample_freq = sde_sample_freq,
            normalize_advantage = normalize_advantage,
            tensorboard_log = tensorboard_log,
            policy_kwargs = policy_kwargs,
            verbose = verbose,
            seed = seed,
            device = device,
            # Model setup is deferred so that our own ``_setup_model`` builds it.
            _init_setup_model = False
        )

        if _init_setup_model:
            self._setup_model()

    def _auxiliary_parameters(self) -> list:
        """Return the trainable parameters of the auxiliary objective (empty if it has none)."""
        objective = self.auxiliary_objective
        if isinstance(objective, th.nn.Module):
            return [param for param in objective.parameters() if param.requires_grad]
        return []

    def _register_auxiliary_parameters(self) -> None:
        """Add the auxiliary objective's parameters to the policy optimizer (once)."""
        optimizer = self.policy.optimizer
        already_optimized = {
            id(param) for group in optimizer.param_groups for param in group["params"]
        }
        missing = [param for param in self._auxiliary_parameters() if id(param) not in already_optimized]
        if missing:
            # Hyper-parameters not given here are filled from the optimizer defaults.
            optimizer.add_param_group({"params": missing})

    def _setup_model(self) -> None:
        """Create the learning-rate schedule, the ``AuxiliaryBuffer`` and the policy."""
        self._setup_lr_schedule()
        self.set_random_seed(self.seed)

        self.rollout_buffer = AuxiliaryBuffer(
            self.n_steps,
            self.observation_space,
            self.action_space,
            device=self.device,
            gamma=self.gamma,
            gae_lambda=self.gae_lambda,
            n_envs=self.n_envs,
        )
        self.policy = self.policy_class(  # pytype:disable=not-instantiable
            self.observation_space,
            self.action_space,
            self.lr_schedule,
            use_sde=self.use_sde,
            **self.policy_kwargs  # pytype:disable=not-instantiable
        )
        self.policy = self.policy.to(self.device)
        # Without this the auxiliary networks would never be updated: the policy
        # optimizer only knows about the policy's own parameters.
        self._register_auxiliary_parameters()

    def collect_rollouts(
            self,
            env: VecEnv,
            callback: BaseCallback,
            rollout_buffer: AuxiliaryBuffer,
            n_rollout_steps: int,
        ) -> bool:
        """
        Collect experiences using the current policy and fill a ``AuxiliaryBuffer``.
        The term rollout here refers to the model-free notion and should not
        be used with the concept of rollout used in model-based RL or planning.
        :param env: The training environment
        :param callback: Callback that will be called at each step
            (and at the beginning and end of the rollout)
        :param rollout_buffer: Buffer to fill with rollouts
        :param n_rollout_steps: Number of experiences to collect per environment
        :return: True if function returned with at least `n_rollout_steps`
            collected, False if callback terminated rollout prematurely.
        """
        assert self._last_obs is not None, "No previous observation was provided"
        # Switch to eval mode (this affects batch norm / dropout)
        self.policy.set_training_mode(False)

        n_steps = 0
        rollout_buffer.reset()
        # Sample new weights for the state dependent exploration
        if self.use_sde:
            self.policy.reset_noise(env.num_envs)

        callback.on_rollout_start()

        while n_steps < n_rollout_steps:
            if self.use_sde and self.sde_sample_freq > 0 and n_steps % self.sde_sample_freq == 0:
                # Sample a new noise matrix
                self.policy.reset_noise(env.num_envs)

            with th.no_grad():
                # Convert to pytorch tensor or to TensorDict
                obs_tensor = obs_as_tensor(self._last_obs, self.device)
                actions, values, log_probs = self.policy(obs_tensor)
            actions = actions.cpu().numpy()

            # Rescale and perform action
            clipped_actions = actions
            # Clip the actions to avoid out of bound error
            if isinstance(self.action_space, spaces.Box):
                clipped_actions = np.clip(actions, self.action_space.low, self.action_space.high)

            new_obs, rewards, dones, infos = env.step(clipped_actions)

            self.num_timesteps += env.num_envs

            # Give access to local variables
            callback.update_locals(locals())
            if callback.on_step() is False:
                return False

            self._update_info_buffer(infos)
            n_steps += 1

            if isinstance(self.action_space, spaces.Discrete):
                # Reshape in case of discrete action
                actions = actions.reshape(-1, 1)

            # Handle timeout by bootstraping with value function
            # see GitHub issue #633
            for idx, done in enumerate(dones):
                if (
                    done
                    and infos[idx].get("terminal_observation") is not None
                    and infos[idx].get("TimeLimit.truncated", False)
                ):
                    terminal_obs = self.policy.obs_to_tensor(infos[idx]["terminal_observation"])[0]
                    with th.no_grad():
                        terminal_value = self.policy.predict_values(terminal_obs)[0]
                    rewards[idx] += self.gamma * terminal_value

            # Same as the standard rollout step, plus the info dicts of this step.
            rollout_buffer.add(self._last_obs, actions, rewards, self._last_episode_starts, values, log_probs, infos)
            self._last_obs = new_obs
            self._last_episode_starts = dones

        with th.no_grad():
            # Compute value for the last timestep
            values = self.policy.predict_values(obs_as_tensor(new_obs, self.device))

        rollout_buffer.compute_returns_and_advantage(last_values=values, dones=dones)

        callback.on_rollout_end()

        return True

    def train(self) -> None:
        """
        Update policy using the currently gathered
        rollout buffer (one gradient step over whole data).

        The total loss is the policy loss plus the entropy and value terms
        weighted by their coefficients, plus the auxiliary loss weighted by the
        auxiliary coefficient when an auxiliary objective is set.
        """
        # Switch to train mode (this affects batch norm / dropout)
        self.policy.set_training_mode(True)

        # Update optimizer learning rate
        self._update_learning_rate(self.policy.optimizer)

        # This will only loop once (get all data in one go)
        for rollout_data in self.rollout_buffer.get(batch_size=None):

            actions = rollout_data.actions
            if isinstance(self.action_space, spaces.Discrete):
                # Convert discrete action from float to long
                actions = actions.long().flatten()

            values, log_prob, entropy, features = self.policy.evaluate_actions(rollout_data.observations, actions)
            values = values.flatten()

            # Normalize advantage (not present in the original implementation)
            advantages = rollout_data.advantages
            if self.normalize_advantage:
                advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            # Policy gradient loss
            policy_loss = -(advantages * log_prob).mean()

            # Value loss using the TD(gae_lambda) target
            value_loss = F.mse_loss(rollout_data.returns, values)

            # Entropy loss favor exploration
            if entropy is None:
                # Approximate entropy when no analytical form
                entropy_loss = -th.mean(-log_prob)
            else:
                entropy_loss = -th.mean(entropy)

            # Auxiliary loss; receives the unconverted buffer actions, not the
            # long/flattened version used for the policy evaluation above.
            if self.auxiliary_objective is not None:
                auxiliary_loss = self.auxiliary_objective.calculate_loss(
                    obs = rollout_data.observations, features=features,
                    actions = rollout_data.actions, returns = rollout_data.returns,
                    infos = rollout_data.infos
                )
            else:
                auxiliary_loss = None

            loss = policy_loss + self.ent_coef * entropy_loss + self.vf_coef * value_loss
            if auxiliary_loss is not None:
                loss = loss + self.auxiliay_coef * auxiliary_loss

            # Optimization step
            self.policy.optimizer.zero_grad()
            loss.backward()

            # Clip grad norm
            th.nn.utils.clip_grad_norm_(self.policy.parameters(), self.max_grad_norm)
            # Auxiliary parameters are clipped on their own so they do not
            # change the norm used to clip the policy gradients.
            auxiliary_params = self._auxiliary_parameters()
            if auxiliary_params:
                th.nn.utils.clip_grad_norm_(auxiliary_params, self.max_grad_norm)
            self.policy.optimizer.step()

        explained_var = explained_variance(self.rollout_buffer.values.flatten(), self.rollout_buffer.returns.flatten())

        self._n_updates += 1
        self.logger.record("train/n_updates", self._n_updates, exclude="tensorboard")
        self.logger.record("train/explained_variance", explained_var)
        self.logger.record("train/entropy_loss", entropy_loss.item())
        self.logger.record("train/policy_loss", policy_loss.item())
        self.logger.record("train/value_loss", value_loss.item())
        if auxiliary_loss is not None:
            self.logger.record("train/auxiliary_loss", auxiliary_loss.item())
        if hasattr(self.policy, "log_std"):
            self.logger.record("train/std", th.exp(self.policy.log_std).mean().item())
    
        

In [20]:
from torch import nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
import numpy as np
from typing import Callable, Tuple, Dict
from torch.distributions import Normal


def discount_n_step(x: np.ndarray, n_step: int, gamma: float) -> np.ndarray:
    """
    Compute, for every position, the discounted sum of the next ``n_step`` entries.

    Adapted from RLlib: https://github.com/ray-project/ray/blob/66650cdadbbc19735d7be4bc613b9c3de30a44da/rllib/evaluation/postprocessing.py#L21

    ``result[i] = sum(gamma ** j * x[i + j] for j in range(n_step) if i + j < len(x))``,
    i.e. exactly ``n_step`` terms (including ``x[i]`` itself), truncated at the
    end of the array. The input is not modified.

    Args:
        x: The array of rewards (discounting runs along the first axis)
        n_step: The number of terms in each discounted sum.
        gamma: The discount factor.

    Returns:
        A new float64 array with the same shape as ``x``.

    Examples:
        n_step=3, gamma=0.9, x=[r0, r1, r2, r3]
        0: r0 + 0.9*r1 + 0.9^2*r2
        1: r1 + 0.9*r2 + 0.9^2*r3
        2: r2 + 0.9*r3
        3: r3
    """
    rewards = np.array(x, copy=True, dtype=np.float64)
    returns = rewards.copy()
    len_ = returns.shape[0]
    # One vectorised pass per look-ahead offset; offsets beyond the array
    # length contribute nothing, so they are skipped.
    for offset in range(1, min(n_step, len_)):
        returns[: len_ - offset] += (gamma ** offset) * rewards[offset:]
    return returns

def discount_n_step_2d(rewards: np.ndarray, n_step: int, gamma: float) -> np.ndarray:
    """
    Discounted sum of the first ``n_step`` columns of every row.

    Args:
        rewards: Array of shape (n_samples, >= n_step); row ``i`` holds the
            consecutive rewards of sample ``i``. An empty array (no samples)
            is accepted.
        n_step: Number of leading columns to include in each sum.
        gamma: The discount factor; column ``j`` is weighted by ``gamma ** j``.

    Returns:
        1-D array with one discounted sum per row (empty if there are no samples).
    """
    rewards = np.asarray(rewards)
    # ``np.array([])`` is 1-D, so the 2-D slicing below would raise for it.
    if rewards.size == 0:
        return np.zeros(0, dtype=np.float64)
    gammas = gamma ** np.arange(n_step)
    returns = rewards[:, :n_step] @ gammas
    return returns

def compute_discounted_threshold(reward: float, n_step: int, gamma: float) -> float:
    """
    Discounted n-step sum obtained when the same reward is received at every step.

    Args:
        reward: The undiscounted per-step reward.
        n_step: Number of steps the reward is received for.
        gamma: The discount factor.

    Returns:
        The first entry of ``discount_n_step`` applied to a constant
        sequence of ``n_step + 1`` copies of ``reward``.
    """
    constant_rewards = np.array([reward for _ in range(n_step + 1)])
    discounted = discount_n_step(constant_rewards, n_step, gamma)
    return discounted[0]


def create_minmax_scaler(scale: float, max: float) -> Tuple[Callable, Callable]:
    """
    Build a pair of functions mapping raw values to a clipped unit range and back.

    Args:
        scale: Factor the raw value is multiplied by before normalisation.
        max: Value the scaled quantity is divided by.

    Returns:
        ``(normalizer, unnormalizer)``. The normalizer computes
        ``value * scale / max`` clipped to ``[1e-10, 1]``; the unnormalizer
        computes ``value * max / scale`` (it cannot undo the clipping).
    """
    def _normalize(value):
        scaled = (value * scale) / max
        # Lower bound is strictly positive for numerical stability.
        return np.clip(scaled, 1e-10, 1.)

    def _unnormalize(value):
        return (value * max) / scale

    return _normalize, _unnormalize


class PredictorTail(nn.Module):
    """
    MLP trunk followed by two scalar heads that parameterise a Normal distribution.

    The trunk is a stack of Linear + ELU layers sized by ``net_arch``; the mean
    head is a single Linear layer and the standard-deviation head is a Linear
    layer followed by Softplus.

    :param input_dim: Size of the last dimension of the input
    :param net_arch: Widths of the trunk layers (an empty list gives an empty trunk)
    """

    def __init__(self, input_dim: int, net_arch: List[int]):
        super().__init__()
        widths = [input_dim, *net_arch]
        trunk_layers = []
        for in_features, out_features in zip(widths[:-1], widths[1:]):
            trunk_layers += [nn.Linear(in_features, out_features), nn.ELU()]
        self.shared_net = nn.Sequential(*trunk_layers)

        trunk_dim = widths[-1]
        self.mean_predictor = nn.Sequential(nn.Linear(trunk_dim, 1))
        self.std_predictor = nn.Sequential(nn.Linear(trunk_dim, 1), nn.Softplus())

    def forward(self, data: th.Tensor) -> Normal:
        """Return ``Normal(mean, std)`` predicted from ``data``."""
        hidden = self.shared_net(data)
        predicted_mean = self.mean_predictor(hidden)
        predicted_std = self.std_predictor(hidden)
        return Normal(predicted_mean, predicted_std)


class DecomposedReturnsObjective(AuxiliaryObjective, nn.Module):
    """
    Auxiliary objective that predicts normalised n-step discounted sums of
    selected info-dictionary values, one Gaussian predictor per key.
    """
    def __init__(self, keys_to_predict: List[str],
            scales: Dict[str, float],
            maximums: Dict[str, float],
            n_step: int,
            gamma: float,
            feature_dim: int,
            action_dim: int,
            shared_net_arch: List[int],
            predictor_net_arch: List[int],
            loss_weights: Dict[str, float],
            device: str = "cpu"):
        """
        A unified implementation of an auxiliary loss that predicts decomposed returns.
        Args:
            keys_to_predict: The keys of the values in the information
                dictionary (dict returned by the environment) to predict
            scales: The scalers (specified as a dictionary from keys to scaler)
                to normalize the values with
            maximums: The maximum values of the keys to normalize with
                (specified as a dictionary from keys to max values)
            n_step: Number of consecutive steps whose values are discounted and
                summed into each target; samples with fewer than ``n_step``
                info dictionaries available are skipped
            gamma: The discount factor to use for the calculation of returns
            feature_dim: Feature dim of the features (used to init the shared network)
            action_dim: Dimension of the actions (used to init the shared network)
            shared_net_arch: The shared network arch (specified as ints of MLP layer size)
            predictor_net_arch: The network arch used by each predictor
                (specified as ints of MLP layer size)
            loss_weights: The weights for each predictor in the final loss calculation
                (specified as dict from key to float)
            device: The torch device to place the internal networks on

        """
        super().__init__()
        self.keys_to_predict = keys_to_predict
        self.scales = scales
        self.maximums = maximums
        self.n_step = n_step
        self.gamma = gamma
        self.loss_weights = loss_weights
        self.device = device
        self.create_networks(feature_dim = feature_dim, action_dim= action_dim,
            shared_net_arch= shared_net_arch, predictor_net_arch= predictor_net_arch)
        self.create_normalizers()

    def create_normalizers(self):
        """Create one (normalizer, unnormalizer) pair per predicted key."""
        self.normalizers = {}
        self.unnormalizers = {}
        for key in self.keys_to_predict:
            # The per-step maximum is turned into the maximum of an n-step
            # discounted sum, so normalised targets stay within the clip range.
            threshold = compute_discounted_threshold(self.maximums[key], self.n_step, self.gamma)
            normalizer, unnormalizer = create_minmax_scaler(self.scales[key], threshold)
            self.normalizers[key] = normalizer
            self.unnormalizers[key] = unnormalizer

    def create_networks(self, feature_dim: int, action_dim: int,
            shared_net_arch: List[int], predictor_net_arch: List[int]):
        """
        Build the shared MLP and one ``PredictorTail`` per key.

        The shared MLP consumes features concatenated with actions; every tail
        consumes the shared output concatenated with the actions again. Tails
        are registered as ``<key>_tail`` attributes.
        """
        arch = [feature_dim + action_dim] + shared_net_arch
        predictor_input_dim = shared_net_arch[-1] + action_dim
        modules = []
        for idx in range(len(arch) - 1):
            modules.append(nn.Linear(arch[idx], arch[idx + 1]))
            modules.append(nn.ELU())
        self.shared_net = nn.Sequential(*modules)
        self.shared_net = self.shared_net.to(device = self.device)
        for key in self.keys_to_predict:
            tail = PredictorTail(input_dim= predictor_input_dim, net_arch=predictor_net_arch)
            tail = tail.to(device = self.device)
            setattr(self, f"{key}_tail", tail)

    def forward(self, features: th.Tensor, actions: th.Tensor) -> Dict[str, Normal]:
        """Return the predicted distribution for every key."""
        output = {}
        shared_features = self.shared_net(th.cat((features, actions), dim=1))
        shared_features = th.cat((shared_features, actions), dim=1)
        for key in self.keys_to_predict:
            tail = getattr(self, f"{key}_tail")
            pred = tail(shared_features)
            output[key] = pred
        return output

    def calculate_loss(self, obs: th.Tensor,
            features: th.Tensor,
            actions: th.Tensor,
            returns: th.Tensor,
            infos: List[List[Dict[str, Any]]]) -> th.Tensor:
        """
        Weighted sum over keys of the mean negative log-likelihood of the
        normalised n-step discounted targets under the predicted distributions.

        Only samples whose info list holds at least ``n_step`` entries take
        part. ``obs`` and ``returns`` are accepted for interface compatibility
        and not used.

        :return: The scalar loss; a zero tensor when no sample has enough
            info dictionaries; ``None`` if ``keys_to_predict`` is empty
            and at least one sample is usable.
        """
        usable = np.array([len(info_samples) >= self.n_step for info_samples in infos], dtype=bool)
        if not usable.any():
            # No complete n-step window: nothing to predict. Return a neutral
            # loss instead of failing on empty arrays further down.
            return features.new_zeros(())

        mask = th.from_numpy(usable)
        masked_features = features[mask]
        masked_actions = actions[mask]
        pred = self.forward(features = masked_features, actions = masked_actions)

        # The first n_step info dicts of every usable sample (shared by all keys).
        windows = [
            info_samples[:self.n_step]
            for info_samples, keep in zip(infos, usable) if keep
        ]

        loss = None
        for key in self.keys_to_predict:
            rewards = [[info.get(key) for info in window] for window in windows]
            facet_returns = discount_n_step_2d(rewards = np.array(rewards),
                n_step=self.n_step, gamma=self.gamma)
            target = th.from_numpy(self.normalizers[key](facet_returns))
            target = target.to(device= self.device)
            # Column vector to match the (n, 1) shape of the predicted mean/std.
            target = target.reshape(-1, 1)
            facet_loss = -th.mean(pred[key].log_prob(target))
            weighted = self.loss_weights[key] * facet_loss
            loss = weighted if loss is None else loss + weighted

        return loss
    

In [21]:
import cc_rl

class MlpExtractor(BaseFeaturesExtractor):
    """
    Features extractor that flattens the observation and applies a single
    Linear + ReLU layer.

    :param observation_space: Space whose ``shape`` determines the input size
    :param output_dim: Number of features produced
    """

    def __init__(self, observation_space, output_dim: int = 256):
        super().__init__(observation_space, output_dim)
        flat_dim = np.prod(observation_space.shape)
        layers = [
            nn.Flatten(),
            nn.Linear(flat_dim, output_dim),
            nn.ReLU(),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, observations):
        """Map a batch of observations to a batch of feature vectors."""
        return self.mlp(observations)

# --- Auxiliary objective configuration -------------------------------------
# Info-dictionary keys whose n-step discounted sums are predicted.
KEYS_TO_PREDICT = ["throughput", "latency", "loss"]

# Per-key multiplier applied before normalisation.
# NOTE(review): the normalizer built by create_minmax_scaler clips to
# [1e-10, 1]; with a negative scale any positive raw value ends up at 1e-10.
# Confirm the sign convention of "latency" and "loss" in the env infos.
SCALES = dict(
    throughput=1 / 10,
    latency=-1 / 1000,
    loss=-1 / 2000,
)
# Per-key maximum of a single-step value.
MAXIMUMS = dict(
    throughput=500,
    latency=5,
    loss=1.0,
)
# Per-key weight in the auxiliary loss.
LOSS_WEIGHTS = dict(
    throughput=0.3,
    latency=0.3,
    loss=0.4,
)
N_STEP = 5
GAMMA = 0.95
FEATURE_DIM = 256  # same value as MlpExtractor's default output_dim
ACTION_DIM = 1  # presumably the env's action dimension; verify against 'cc-rl-v0'
SHARED_NET_ARCH = [128, 128]
PREDICTOR_NET_ARCH = [64]


objective = DecomposedReturnsObjective(
    keys_to_predict=KEYS_TO_PREDICT,
    scales=SCALES,
    maximums=MAXIMUMS,
    n_step=N_STEP,
    gamma=GAMMA,
    feature_dim=FEATURE_DIM,
    action_dim=ACTION_DIM,
    shared_net_arch=SHARED_NET_ARCH,
    predictor_net_arch=PREDICTOR_NET_ARCH,
    loss_weights=LOSS_WEIGHTS,
    device="cpu",
)


# 20 rollout steps per update, so most sampled steps have at least N_STEP
# following info dicts available for the auxiliary targets.
model = AuxiliaryA2C(
    env='cc-rl-v0',
    policy="AuxiliaryMlpPolicy",
    auxiliary_objective=objective,
    policy_kwargs=dict(features_extractor_class=MlpExtractor),
    n_steps=20,
    verbose=1,
    auxiliary_coef=2,
)

Using cpu device
Creating environment from the given name 'cc-rl-v0'
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [27]:
# Train the model; the argument is the training budget passed to learn().
model.learn(3000)

-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 100       |
|    ep_rew_mean        | 102       |
| time/                 |           |
|    fps                | 209       |
|    iterations         | 100       |
|    time_elapsed       | 9         |
|    total_timesteps    | 2000      |
| train/                |           |
|    auxiliary_loss     | 0.575     |
|    entropy_loss       | -1.41     |
|    explained_variance | -0.000474 |
|    learning_rate      | 0.0007    |
|    n_updates          | 399       |
|    policy_loss        | 2.74      |
|    std                | 0.997     |
|    value_loss         | 8.81      |
-------------------------------------


<__main__.AuxiliaryA2C at 0x7f4bbe2581c0>