Commit
Merge pull request #32 from lucasosouza/backprop-improvements
Makes auxiliary task independent of dynamic models
lucasosouza committed Jun 23, 2021
2 parents eea0d0d + 16ccace commit a561eed
Showing 6 changed files with 223 additions and 297 deletions.
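In essence, this PR moves the shared auxiliary (feature) task out of the individual dynamics models and into the agent itself. A rough sketch of the intent, using simplified, hypothetical class shapes rather than the real signatures:

```python
# Simplified, hypothetical sketch of the refactor's intent (not the actual classes).
# Before: the agent reached the shared auxiliary task indirectly, e.g. via
# dynamics_list[0].auxiliary_task, so each dynamics model carried a reference to it.
# After: the agent owns the auxiliary task and the dynamics models only consume
# the features it produces.

class PpoOptimizer:
    def __init__(self, dynamics_list, auxiliary_task, use_disagreement, **kwargs):
        self.dynamics_list = dynamics_list      # dynamics models, feature consumers
        self.auxiliary_task = auxiliary_task    # shared feature/auxiliary model
        self.use_disagreement = use_disagreement
```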
175 changes: 104 additions & 71 deletions nupic/embodied/agents/curious_ppo_agent.py
@@ -30,16 +30,12 @@
from nupic.embodied.envs.vec_env import ShmemVecEnv as VecEnv
from nupic.embodied.utils.model_parts import flatten_dims
from nupic.embodied.utils.mpi import mpi_moments
from nupic.embodied.utils.torch import convert_log_to_numpy, to_numpy
from nupic.embodied.utils.utils import (
RunningMeanStd,
explained_variance,
get_mean_and_std,
)
from nupic.embodied.utils.torch import (
convert_log_to_numpy,
to_numpy,
to_tensor,
)


class PpoOptimizer(object):
@@ -90,6 +86,10 @@ class PpoOptimizer(object):
After how many steps should a video of the training be logged.
dynamics_list : [Dynamics]
List of dynamics models to use for internal reward calculation.
use_disagreement : bool
If True, the loss is the variance, across the dynamics models, of their predicted
next-state features. If False, it is the variance, across the models, of each
model's error in predicting the next-state features.
dyn_loss_weight: float
Weighting of the dynamic loss in the total loss.
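To make the use_disagreement distinction concrete, here is a hedged, self-contained sketch of the two modes; the tensor names and shapes are illustrative stand-ins, not the repository's exact variables:

```python
import torch

def intrinsic_reward(predictions, next_features=None):
    # predictions: [num_dynamics, num_envs, n_steps_per_seg, feature_dim]
    if next_features is None:
        # use_disagreement=True: variance across models of the predicted features
        disagreement = torch.var(predictions, dim=0)   # [envs, steps, feature_dim]
        return disagreement.mean(dim=-1)               # [envs, steps]
    # use_disagreement=False: variance across models of each model's prediction error
    errors = ((predictions - next_features) ** 2).mean(dim=-1)  # [models, envs, steps]
    return torch.var(errors, dim=0)                    # [envs, steps]

# Example shapes: 5 models, 2 envs, 16 steps, 32-dim features
preds = torch.randn(5, 2, 16, 32)
print(intrinsic_reward(preds).shape)  # torch.Size([2, 16])
```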
@@ -129,6 +129,8 @@ def __init__(
debugging,
dynamics_list,
dyn_loss_weight,
auxiliary_task,
use_disagreement,
backprop_through_reward=False,
):
self.dynamics_list = dynamics_list
@@ -156,25 +158,25 @@ def __init__(
self.debugging = debugging
self.time_trained_so_far = 0
self.dyn_loss_weight = dyn_loss_weight
self.auxiliary_task = auxiliary_task
self.use_disagreement = use_disagreement
self.backprop_through_reward = backprop_through_reward

def start_interaction(self, env_fns, dynamics_list, nlump=1):
def start_interaction(self, env_fns, nlump=1):
"""Set up environments and initialize everything.
Parameters
----------
env_fns : [envs]
List of environments (functions), optionally with wrappers etc.
dynamics_list : [Dynamics]
List of dynamics models.
nlump : int
..
"""
# Specify parameters that should be optimized
# The auxiliary task params are the same for all dynamics models
policy_param_list = [*self.policy.param_list]
dynamics_param_list = [*self.dynamics_list[0].auxiliary_task.param_list]
dynamics_param_list = [*self.auxiliary_task.param_list]
for dynamic in self.dynamics_list:
dynamics_param_list.extend(dynamic.param_list)
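For context, a minimal runnable sketch of this parameter wiring, with nn.Linear stand-ins in place of the real auxiliary task and dynamics models:

```python
import torch
import torch.nn as nn

# Stand-ins only: the shared auxiliary task's parameters are added once, then each
# dynamics model contributes its own parameters to the same optimizer.
aux_task = nn.Linear(8, 4)                           # stand-in for the auxiliary task
dynamics_list = [nn.Linear(4, 4) for _ in range(3)]  # stand-ins for the dynamics models

dynamics_params = list(aux_task.parameters())
for dyn in dynamics_list:
    dynamics_params.extend(dyn.parameters())
optimizer = torch.optim.Adam(dynamics_params, lr=1e-4)
```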

@@ -211,7 +213,7 @@ def start_interaction(self, env_fns, dynamics_list, nlump=1):
policy=self.policy,
int_rew_coeff=self.int_coeff,
ext_rew_coeff=self.ext_coeff,
dynamics_list=dynamics_list,
dynamics_list=self.dynamics_list,
)

def empty_tensor(shape):
@@ -370,7 +372,7 @@ def collect_rewards(self, normalize=True):
def load_returns(self, idxs):
return self.buf_advantages[idxs], self.buf_returns[idxs]

def update(self):
def learn(self):
"""Calculate losses and update parameters based on current rollout.
Returns
@@ -437,10 +439,14 @@ def update_step_ppo(
):
"""Regular update step in exploration by disagreement using PPO"""

features, next_features = self.update_auxiliary_task(
acs, obs, last_obs, return_next_features=True
)

self.optimizer.zero_grad()
# TODO: the auxiliary loss could be skipped when no auxiliary task is used
aux_loss, aux_loss_info = self.auxiliary_loss(acs, obs, last_obs)
dyn_loss, dyn_loss_info = self.dynamics_loss() # forward
aux_loss, aux_loss_info = self.auxiliary_loss()
dyn_loss, dyn_loss_info = self.dynamics_loss(acs, features, next_features)
policy_loss, loss_info = self.ppo_loss(
acs, neglogprobs, advantages, returns
) # forward
@@ -458,17 +464,26 @@ def update_step_backprop(self, acs, obs, last_obs):
"""
Update when using backpropagation through rewards instead of PPO.
Alternates between a step on the dynamics loss and a step on the policy loss.
TODO: do we need two auxiliary-task updates in this 2-stage training loop?
"""

features, next_features = self.update_auxiliary_task(
acs, obs, last_obs, return_next_features=True
)

self.dynamics_optimizer.zero_grad()
aux_loss, aux_loss_info = self.auxiliary_loss(acs, obs, last_obs)
dyn_loss, dyn_loss_info = self.dynamics_loss()
aux_loss, aux_loss_info = self.auxiliary_loss()
dyn_loss, dyn_loss_info = self.dynamics_loss(acs, features, next_features)
total_dyn_loss = aux_loss + dyn_loss
total_dyn_loss.backward()
self.dynamics_optimizer.step()

features, next_features = self.update_auxiliary_task(
acs, obs, last_obs, return_next_features=not self.use_disagreement
)

self.policy_optimizer.zero_grad()
policy_loss, loss_info = self.backprop_loss(acs, obs, last_obs)
policy_loss, loss_info = self.backprop_loss(acs, features, next_features)
policy_loss.backward()
self.policy_optimizer.step()
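The method above follows a standard alternating two-optimizer pattern; a generic, hedged sketch of that pattern, where the loss closures and optimizers are placeholders rather than the repository's objects:

```python
def alternating_update(dyn_optimizer, policy_optimizer, dyn_loss_fn, policy_loss_fn):
    # Stage 1: fit the dynamics models (and auxiliary task) on the rollout data
    dyn_optimizer.zero_grad()
    dyn_loss = dyn_loss_fn()
    dyn_loss.backward()
    dyn_optimizer.step()

    # Stage 2: update the policy by backpropagating through the recomputed reward
    policy_optimizer.zero_grad()
    policy_loss = policy_loss_fn()
    policy_loss.backward()
    policy_optimizer.step()
    return dyn_loss.detach(), policy_loss.detach()
```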

@@ -508,76 +523,88 @@ def log_post_update(self, info):

return info

def auxiliary_loss(self, acs, obs, last_obs):
# Update features of the policy network to minibatch obs and acs
# TODO: No need to update features if there is no aux task. When there is a task
# I think we also don't need to update the policy features, just policy.ac and
policy.ob (at least as long as we don't share features with policy). This saves a
# forward pass.
self.policy.update_features(obs, acs)

# Update features of the auxiliary network to minibatch obs and acs
# Using first element in dynamics list is sufficient bc all dynamics
# models have the same auxiliary task model and features
# TODO: should the feature model be independent of dynamics?
self.dynamics_list[0].auxiliary_task.update_features(obs, last_obs)
def auxiliary_loss(self):
# Get the loss and variance of the feature model
aux_loss = torch.mean(
self.dynamics_list[0].auxiliary_task.get_loss()
)
aux_loss = torch.mean(self.auxiliary_task.get_loss())
# Take variance over steps -> [feature_dim] vars -> average
# This is the average variance in a feature over time
feature_var = torch.mean(
torch.var(self.dynamics_list[0].auxiliary_task.features, [0, 1])
)
feature_var_2 = torch.mean(
torch.var(self.dynamics_list[0].auxiliary_task.features, [2])
)
feature_var = torch.mean(torch.var(self.auxiliary_task.features, [0, 1]))
feature_var_2 = torch.mean(torch.var(self.auxiliary_task.features, [2]))
return aux_loss, {
"phi/feature_var_ax01": feature_var,
"phi/feature_var_ax2": feature_var_2,
"loss/auxiliary_task": aux_loss
}

def backprop_loss(self, acs, obs, last_obs):
pred_features = []
# Get output from all dynamics models (featurewise)
# shape=[num_dynamics, num_envs, n_steps_per_seg, feature_dim]
def update_auxiliary_task(self, acs, obs, last_obs, return_next_features=True):
# Update the auxiliary task
self.auxiliary_task.update_features(acs, obs, last_obs)

# Gather the data from auxiliary task
features = self.auxiliary_task.features.detach()
next_features = None
if return_next_features:
next_features = self.auxiliary_task.next_features.detach()

return features, next_features

# Forward pass per dynamic model
# TODO: parallelize this loop! Can use Ray, torch.mp, etc
# TODO: This is what takes longer. Could we just store the disagreement in the
# buffer during rollout?
def calculate_disagreement(self, acs, features, next_features):
""" If next features is defined, return prediction error.
Otherwise returns predictions i.e. dynamics model last layer output
"""

# Get predictions
predictions = []
# Get output from all dynamics models (featurewise)
for idx, dynamics in enumerate(self.dynamics_list):
print(f"Running dynamics model: {idx+1}/{len(self.dynamics_list)}")
pred_features.append(
dynamics.predict_features_mb(obs=obs, last_obs=last_obs, acs=acs)
)
# If using disagreement, the prediction is the predicted next state
# shape=[num_dynamics, num_envs, n_steps_per_seg, feature_dim]
prediction = dynamics.get_predictions(ac=acs, features=features)
if next_features is not None:
# If not using disagreement, prediction is the error
# shape=[num_dynamics, num_envs, n_steps_per_seg]
prediction = dynamics.get_prediction_error(prediction, next_features)
predictions.append(prediction)

# Get variance over dynamics models
disagreement = torch.var(torch.stack(pred_features), axis=0)
disagreement = torch.var(torch.stack(predictions), axis=0)
return disagreement

def calculate_rewards(self, acs, obs, last_obs):
"""Calculates the reward from the output of the dynamics models and the external
rewards.
"""

features, next_features = self.update_auxiliary_task(
acs, obs, last_obs, return_next_features=not self.use_disagreement
)

disagreement = self.calculate_disagreement(acs, features, next_features)
disagreement_reward = torch.mean(disagreement, axis=-1)
return disagreement_reward

def backprop_loss(self, acs, features, next_features):
"""
TODO: parallelize this loop! Can use Ray, torch.mp, etc
TODO: This is what takes longer. Could we just store the disagreement in the
buffer during rollout?
"""

disagreement = self.calculate_disagreement(acs, features, next_features)
# Loss is minimized, and we need to maximize variance, so using the inverse
loss = 1 / torch.mean(disagreement)
# loss = self.rollout.calculate_backprop_loss()
return loss, {"loss/backprop_through_reward_loss": loss}
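A tiny numerical illustration of the inverse-variance objective above (hypothetical values): larger disagreement between the models yields a smaller loss, so minimizing the loss steers the policy toward states where the models disagree.

```python
import torch

low = torch.full((4, 16), 0.01)   # models mostly agree
high = torch.full((4, 16), 1.00)  # models disagree strongly
print(1 / low.mean())   # tensor(100.)
print(1 / high.mean())  # tensor(1.)
```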

def dynamics_loss(self):
def dynamics_loss(self, acs, features, next_features):
dyn_prediction_loss = 0
for idx, dynamic in enumerate(self.dynamics_list):
print(
f"Getting dynamics model prediction error: {idx+1}/{len(self.dynamics_list)}"
)
# Get the features of the observations in the dynamics model (just
# gets features from the auxiliary model)
dynamic.update_features()
# Put features into dynamics model and get loss
# (if use_disagreement just returns features, therefore the
# partial loss is used for optimizing and logging)
# disagreement.append(torch.mean(np.var(dynamic.get_loss(),axis=0)))

for idx, dynamic in enumerate(self.dynamics_list):
print(f"Getting dynamics model prediction error: {idx+1}/{len(self.dynamics_list)}") # noqa: E501
# Put features into dynamics model and get partial loss (dropout)
dyn_prediction_loss += torch.mean(dynamic.get_predictions_partial())
dyn_prediction_loss += torch.mean(dynamic.get_predictions_partial(
acs, features, next_features
))

# Divide by number of models so the number of dynamic models doesn't impact
# the total loss. Multiply by a weight that can be defined by the user
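A hedged sketch of the normalization described in this comment, with illustrative names: average the per-model losses so adding models does not inflate the objective, then scale by the user-defined weight.

```python
import torch

def weighted_dynamics_loss(per_model_losses, dyn_loss_weight):
    # per_model_losses: one scalar loss tensor per dynamics model
    loss = torch.stack(per_model_losses).mean()  # divide by the number of models
    return loss * dyn_loss_weight
```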
@@ -660,16 +687,22 @@ def step(self):
"""
# Collect rollout
print("Collecting Rollout")
self.rollout.collect_rollout()
print("--------------------collect rollout------------------------------------")
acs, obs, last_obs = self.rollout.collect_rollout()

# Calculate reward or loss
update_info = self.update()
# convert_to_numpy
convert_log_to_numpy(update_info)
print("--------------------calculate reward-----------------------------------")
disagreement_reward = self.calculate_rewards(acs, obs, last_obs)
self.rollout.update_buffer_pre_step(disagreement_reward)

# Update stepcount
print("--------------------gradient steps-------------------------------------")
update_info = self.learn()

print("---------------------log statistics-----------------------------------")
convert_log_to_numpy(update_info)
self.step_count = self.start_step + self.rollout.step_count

print("-------------------------done------------------------------------------")

# Return the update statistics for logging
return {"update": update_info}
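Put together, one training iteration after this change reads as an outline like the following (illustrative only; `agent` stands for a PpoOptimizer whose rollout is already set up):

```python
def training_iteration(agent):
    # 1. Interact with the environments and fill the rollout buffers
    acs, obs, last_obs = agent.rollout.collect_rollout()
    # 2. Compute the disagreement reward outside the Rollout class and store it
    disagreement_reward = agent.calculate_rewards(acs, obs, last_obs)
    agent.rollout.update_buffer_pre_step(disagreement_reward)
    # 3. Take the gradient steps (PPO or backprop-through-reward)
    return agent.learn()
```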

70 changes: 8 additions & 62 deletions nupic/embodied/envs/rollout.py
@@ -27,6 +27,7 @@

from nupic.embodied.utils.torch import env_output_to_tensor, to_numpy


class Rollout(object):
"""Collect rollouts of experiences in the environments and process them.
@@ -170,10 +171,7 @@ def collect_rollout(self):
self.ep_infos_new = []
for _ in range(self.nsteps):
self.rollout_step()
print("--------------------calculate reward-----------------------------------")
self.calculate_reward()
print("-------------------------done------------------------------------------")
self.update_info()
return self.buf_acs, self.buf_obs, self.buf_obs_last

def load_from_buffer(self, idxs):
"""
@@ -187,64 +185,7 @@ def load_from_buffer(self, idxs):
last_obs = self.buf_obs_last[idxs]
return acs, rews, neglogprobs, obs, last_obs

def calculate_backprop_loss(self):
"""
Calculates the reward from the output of the dynamics models and the external
rewards.
"""

pred_features = []
# Get output from all dynamics models (featurewise)
# shape=[num_dynamics, num_envs, n_steps_per_seg, feature_dim]

# Forward pass per dynamic model
# TODO: parallelize this loop! Can use Ray, torch.mp, etc
# TODO: update buffer to minibatch content?
# This is what takes longer. Could we just store the disagreement in the buffer
# during rollout?
for idx, dynamics in enumerate(self.dynamics_list):
print(f"Running dynamics model: {idx+1}/{len(self.dynamics_list)}")
pred_features.append(
dynamics.predict_features(
obs=self.buf_obs, last_obs=self.buf_obs_last, acs=self.buf_acs
)
)

# Get variance over dynamics models
# shape pre var = n_dynamic_models, n_envs, n_steps_per_seg, feature_dim
# shape post var = n_envs, n_steps_per_seg, feature_dim
disagreement = torch.var(torch.stack(pred_features), axis=0)

# Loss is minimized, and we need to maximize variance, so using the inverse
loss = 1 / torch.mean(disagreement)
return loss

def calculate_reward(self):
"""Calculates the reward from the output of the dynamics models and the external
rewards.
"""
net_output = []
# Get output from all dynamics models (featurewise)
# shape=[num_dynamics, num_envs, n_steps_per_seg, feature_dim]

# If use_disagreement=False this will be the prediction error and the reward
# is then the variance over prediction error.
for idx, dynamics in enumerate(self.dynamics_list):
print(f"Running dynamics model: {idx+1}/{len(self.dynamics_list)}")
net_output.append(
dynamics.predict_features(
obs=self.buf_obs, last_obs=self.buf_obs_last, acs=self.buf_acs
)
)
# Get variance over dynamics models
# shape=[n_envs, n_steps_per_seg, feature_dim]
net_output = torch.stack(net_output)
disagreement = torch.var(net_output, axis=0)
# Get reward by mean along features
# shape=[n_envs, n_steps_per_seg]
disagreement_reward = torch.mean(disagreement, axis=-1)
def update_buffer_rewards(self, disagreement_reward):

# Fill reward buffer with the new rewards
self.buf_rewards[:] = self.reward_function(
@@ -305,6 +246,11 @@ def rollout_step(self):
)
self.buf_vpred_last[sli] = next_vpreds

def update_buffer_pre_step(self, disagreement_reward):
"""All post step update buffer activities"""
self.update_buffer_rewards(disagreement_reward)
self.update_info()

def update_info(self):
"""If there is episode info (like stats at the end of an episode) save them."""
all_ep_infos = self.ep_infos_new