From 9dd16a42df83cf554a20042bc21f1143d7ffe311 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 28 Mar 2022 16:00:55 +0100 Subject: [PATCH 01/13] API updates to fix A2C algorithm --- src/algorithms/a2c.py | 145 ++++++++++++++---- src/apps/qlearning_on_mock.py | 2 +- src/examples/a2c_three_columns.py | 2 +- .../nstep_semi_grad_sarsa_three_columns.py | 2 +- src/examples/qlearning_three_columns.py | 2 +- src/examples/semi_gradient_sarsa.py | 2 +- src/parallel/processes_manager.py | 12 +- src/spaces/__init__.py | 2 +- src/spaces/multiprocess_env.py | 57 ++++++- src/spaces/time_step.py | 27 ++++ .../pytorch_trainer.py} | 39 ++--- src/{algorithms => trainers}/trainer.py | 0 tests/test_multiprocess_env.py | 68 +++++++- tests/test_trainer.py | 2 +- 14 files changed, 296 insertions(+), 66 deletions(-) rename src/{algorithms/pytorch_multi_process_trainer.py => trainers/pytorch_trainer.py} (88%) rename src/{algorithms => trainers}/trainer.py (100%) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 9c2fe82..64f780d 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -11,6 +11,9 @@ from src.utils.episode_info import EpisodeInfo from src.utils.function_wraps import time_func_wrapper from src.utils.replay_buffer import ReplayBuffer +from src.spaces.time_step import VectorTimeStep +from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig +from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder Env = TypeVar("Env") Optimizer = TypeVar("Optimizer") @@ -65,6 +68,7 @@ class A2CConfig(object): device: str = 'cpu' a2cnet: nn.Module = None save_model_path: Path = None + optimizer_config: PyTorchOptimizerConfig = None class A2C(Generic[Optimizer]): @@ -134,6 +138,7 @@ def from_path(cls, config: A2CConfig, path: Path): def __init__(self, config: A2CConfig): self.config: A2CConfig = config + self.optimizer: Optimizer = None self.name = "A2C" @property @@ -222,6 +227,61 @@ def play(self, env: Env, criteria: Criteria): if time_step.done: time_step = env.reset() + def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: + logpas = torch.stack(logpas).squeeze() + entropies = torch.stack(entropies).squeeze() + values = torch.stack(values).squeeze() + + T = len(rewards) + discounts = np.logspace(0, T, num=T, base=self.config.gamma, endpoint=False) + rewards = np.array(rewards).squeeze() + returns = np.array([[np.sum(discounts[:T - t] * rewards[t:, w]) for t in range(T)] + for w in range(n_workers)]) + + np_values = values.data.numpy() + tau_discounts = np.logspace(0, T - 1, num=T - 1, base=self.config.gamma * self.config.tau, endpoint=False) + advs = rewards[:-1] + self.config.gamma * np_values[1:] - np_values[:-1] + gaes = np.array([[np.sum(tau_discounts[:T - 1 - t] * advs[t:, w]) for t in range(T - 1)] + for w in range(n_workers)]) + discounted_gaes = discounts[:-1] * gaes + + values = values[:-1, ...].view(-1).unsqueeze(1) + logpas = logpas.view(-1).unsqueeze(1) + entropies = entropies.view(-1).unsqueeze(1) + returns = torch.FloatTensor(returns.T[:-1]).view(-1).unsqueeze(1) + discounted_gaes = torch.FloatTensor(discounted_gaes.T).view(-1).unsqueeze(1) + + T -= 1 + T *= n_workers + + assert returns.size() == (T, 1) + assert values.size() == (T, 1) + assert logpas.size() == (T, 1) + assert entropies.size() == (T, 1) + + value_error = returns.detach() - values + value_loss = value_error.pow(2).mul(0.5).mean() + policy_loss = -(discounted_gaes.detach() * logpas).mean() + entropy_loss = -entropies.mean() + loss = self.policy_loss_weight * 
policy_loss + self.value_loss_weight * value_loss + self.entropy_loss_weight * entropy_loss + + self.optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_norm_(self.parameters(), + self.ac_model_max_grad_norm) + self.optimizer.step() + + def actions_before_training_begins(self, env: Env, **options) -> None: + + # build the optimizer we need in order to train the model + self.optimizer = pytorch_optimizer_builder(opt_type=self.config.optimizer_config.optimizer_type, + model_params=self.parameters(), + **self.config.optimizer_config.as_dict()) + + def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: + self.set_train_mode() + self.optimizer.zero_grad() + def actions_after_training(self) -> None: """Any actions the agent needs to perform after training @@ -233,6 +293,10 @@ def actions_after_training(self) -> None: if self.config.save_model_path is not None: self.save_model(path=self.config.save_model_path) + def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> None: + + self.optimize_model(env.n_workers()) + def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: """Train the algorithm on the episode @@ -278,46 +342,30 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: buffer = ReplayBuffer(options["buffer_size"]) - time_step = env.reset() - state = torch.from_numpy(time_step.observation.to_numpy()).float() - - # represent the state function values - values = [] + time_step: VectorTimeStep = env.reset() + states = time_step.stack_observations() #torch.from_numpy(time_step.observation.to_numpy()).float() # represent the probabilities under the # policy logprobs = [] - + entropies = [] + rewards = [] + values = [] for itr in range(self.config.n_iterations_per_episode): - # policy and critic values - policy, value = self.a2c_net(state) - - # - logits = policy.view(-1) - - values.append(value) + # make a full pass on the model + action, is_exploratory, logprob, entropy, value = self._full_pass(state=states) - # choose the action this should be - action = self.config.action_sampler(logits) - - action_applied = action - if isinstance(action, torch.Tensor): - action_applied = env.get_action(action.item()) - - # the log probabilities of the policy - logprob_ = policy.view(-1)[action] - logprobs.append(logprob_) - - time_step = env.step(action_applied) + # step with the given actions + time_step: VectorTimeStep = env.step(action) episode_score += time_step.reward total_distortion += time_step.info["total_distortion"] - state = torch.from_numpy(time_step.observation.to_numpy()).float() + state = time_step.stack_observations() - buffer.add(state=state, action=action, reward=time_step.reward, next_state=time_step.observation, - done=time_step.done) + buffer.add(state=state, action=action, reward=time_step.reward, + next_state=time_step.observation, done=time_step.done) if time_step.done: break @@ -332,3 +380,44 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: return episode_info + def _full_pass(self, state) -> tuple: + + # policy and critic values + policy, value = self.a2c_net(state) + + logits = policy.view(-1) + + ##dist = torch.distributions.Categorical(logits=logits) + ##action = dist.sample() + + # choose the action. 
Typically this will be Categorical + # but we leave it open for the application + action_sampler_dist = self.config.action_sampler(logits) + action = action_sampler_dist.sample() + + # the log probabilities of the policy + logprob = action_sampler_dist.log_prob(action).unsqueeze(-1)#policy.view(-1)[action] + entropy = action_sampler_dist.entropy().unsqueeze(-1) + #logprobs.append(logprob_) + + #logpa = dist.log_prob(action).unsqueeze(-1) + #entropy = dist.entropy().unsqueeze(-1) + + action = action.item() if len(action) == 1 else action.data.numpy() + is_exploratory = action != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) + return action, is_exploratory, logprob, entropy, value + + def _interaction_step(self, states, env: Env): + actions, is_exploratory, logpas, entropies, values = self.ac_model.full_pass(states) + new_states, rewards, is_terminals, _ = env.step(actions) + + self.logpas.append(logpas); + self.entropies.append(entropies) + self.rewards.append(rewards); + self.values.append(values) + + self.running_reward += rewards + self.running_timestep += 1 + self.running_exploration += is_exploratory[:, np.newaxis].astype(np.int) + + return new_states, is_terminals \ No newline at end of file diff --git a/src/apps/qlearning_on_mock.py b/src/apps/qlearning_on_mock.py index ad66fa3..eb673f5 100644 --- a/src/apps/qlearning_on_mock.py +++ b/src/apps/qlearning_on_mock.py @@ -2,7 +2,7 @@ import numpy as np from src.algorithms.q_learning import QLearning, QLearnConfig -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.maths.string_distance_calculator import StringDistanceType from src.spaces.actions import ActionSuppress, ActionIdentity, ActionStringGeneralize from src.spaces.discrete_state_environment import Environment, EnvConfig diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py index f60f9ee..33bce39 100644 --- a/src/examples/a2c_three_columns.py +++ b/src/examples/a2c_three_columns.py @@ -16,7 +16,7 @@ from src.maths.numeric_distance_type import NumericDistanceType from src.maths.string_distance_calculator import StringDistanceType from src.utils.reward_manager import RewardManager -from src.algorithms.pytorch_multi_process_trainer import PyTorchMultiProcessTrainer, PyTorchMultiProcessTrainerConfig, OptimizerConfig +from src.trainers.pytorch_trainer import PyTorchMultiProcessTrainer, PyTorchMultiProcessTrainerConfig, OptimizerConfig from src.utils import INFO diff --git a/src/examples/nstep_semi_grad_sarsa_three_columns.py b/src/examples/nstep_semi_grad_sarsa_three_columns.py index 8509857..0f52396 100644 --- a/src/examples/nstep_semi_grad_sarsa_three_columns.py +++ b/src/examples/nstep_semi_grad_sarsa_three_columns.py @@ -4,7 +4,7 @@ from src.algorithms.n_step_semi_gradient_sarsa import SARSAnConfig, SARSAn from src.algorithms.q_estimator import QEstimator -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.datasets.datasets_loaders import MockSubjectsLoader from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize diff --git a/src/examples/qlearning_three_columns.py b/src/examples/qlearning_three_columns.py index c765837..d11fe51 100644 --- a/src/examples/qlearning_three_columns.py +++ b/src/examples/qlearning_three_columns.py @@ -7,7 +7,7 @@ from pathlib import Path from src.algorithms.q_learning import QLearning, QLearnConfig -from src.algorithms.trainer import 
Trainer +from src.trainers.trainer import Trainer from src.datasets.datasets_loaders import MockSubjectsLoader from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize diff --git a/src/examples/semi_gradient_sarsa.py b/src/examples/semi_gradient_sarsa.py index e28c558..21698da 100644 --- a/src/examples/semi_gradient_sarsa.py +++ b/src/examples/semi_gradient_sarsa.py @@ -9,7 +9,7 @@ from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.policies.epsilon_greedy_policy import EpsilonDecayOption from src.algorithms.epsilon_greedy_q_estimator import EpsilonGreedyQEstimatorConfig, EpsilonGreedyQEstimator from src.utils.distortion_calculator import DistortionCalculationType, DistortionCalculator diff --git a/src/parallel/processes_manager.py b/src/parallel/processes_manager.py index 67ef927..3407b7e 100644 --- a/src/parallel/processes_manager.py +++ b/src/parallel/processes_manager.py @@ -20,6 +20,16 @@ def __init__(self, n_procs: int) -> None: self.n_procs = n_procs self.processes = [] + def __len__(self) -> int: + """The number of workers handled by this + instance + + Returns + ------- + + """ + return len(self.processes) + def create_and_start(self, target: Callable, *args) -> None: for i in range(self.n_procs): p = mp.Process(target=target, args=args) @@ -41,4 +51,4 @@ def terminate(self) -> None: def join_and_terminate(self): self.join() - self.terminate() \ No newline at end of file + self.terminate() diff --git a/src/spaces/__init__.py b/src/spaces/__init__.py index a2934e7..24b86ec 100644 --- a/src/spaces/__init__.py +++ b/src/spaces/__init__.py @@ -1,2 +1,2 @@ from src.spaces.time_step import TimeStep, StepType, VectorTimeStep -from src.spaces.multiprocess_env import MultiprocessEnv \ No newline at end of file +from src.spaces.multiprocess_env import MultiprocessEnv diff --git a/src/spaces/multiprocess_env.py b/src/spaces/multiprocess_env.py index d743033..8d4900e 100644 --- a/src/spaces/multiprocess_env.py +++ b/src/spaces/multiprocess_env.py @@ -1,6 +1,9 @@ """Module multiprocess_env. Specifies -a vectorsized environment where each instance -of the environment is run independently +a vectorized environment where each instance +of the environment is run independently. 
The implementation +of the environment is taken from the book +Grokking Deep Reinforcement Learning Algorithms +by Manning publications """ @@ -24,6 +27,16 @@ def __init__(self, env_builder: Callable, env_args: dict, n_workers: int): self.workers = TorchProcsHandler(n_procs=n_workers) self.pipes = [mp.Pipe() for _ in range(self.n_workers)] + def __len__(self) -> int: + """The number of workers handled by this + instance + + Returns + ------- + + """ + return len(self.workers) + def make(self): """Create the workers @@ -73,12 +86,33 @@ def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: pipe_end.close() break - def reset(self) -> TimeStep: - pass + def reset(self, rank=None, **kwargs) -> VectorTimeStep: + + time_step = VectorTimeStep() + if rank is not None: + parent_end, _ = self.pipes[rank] + self._send_msg(('reset', {}), rank) + o = parent_end.recv() + time_step.append(0) + return time_step + + # if not reset for a specific worker + # then all workers should reset + self._broadcast_msg(('reset', kwargs)) + + # collect all the timesteps from the + # workers + for rank in range(self.n_workers): + parent_end, _ = self.pipes[rank] + process_time_step = parent_end.recv() + time_step.append(process_time_step) + + return time_step def step(self, actions: ActionVector) -> VectorTimeStep: - assert len(actions) == self.n_workers + if len(actions) != self.n_workers: + raise ValueError("Number of actions is not equal to the number of workers") # send the messages to the workers [self._send_msg(('step', {'action': actions[rank]}), rank) for rank in range(self.n_workers)] @@ -101,6 +135,9 @@ def step(self, actions: ActionVector) -> VectorTimeStep: """ return time_step + def close(self, **kwargs): + self._close(**kwargs) + def _close(self, **kwargs): self._broadcast_msg(('close', kwargs)) @@ -121,4 +158,14 @@ def _send_msg(self, msg: Any, rank: int): parent_end.send(msg) def _broadcast_msg(self, msg): + """Broadcast the message to all workers + + Parameters + ---------- + msg + + Returns + ------- + + """ [parent_end.send(msg) for parent_end, _ in self.pipes] diff --git a/src/spaces/time_step.py b/src/spaces/time_step.py index 2d2ecea..1341f4d 100644 --- a/src/spaces/time_step.py +++ b/src/spaces/time_step.py @@ -6,6 +6,7 @@ import copy import enum from typing import NamedTuple, Generic, Optional, TypeVar +import numpy as np _Reward = TypeVar('_Reward') _Discount = TypeVar('_Discount') @@ -90,8 +91,34 @@ class VectorTimeStep(object): def __init__(self): self.time_steps = [] + def __len__(self) -> int: + """Returns the number of time-steps + + Returns + ------- + + """ + return len(self.time_steps) + + def __getitem__(self, idx) -> TimeStep: + """Returns the idx-th time step in this + VectorTimeStep + + Parameters + ---------- + idx: The index of the time step to return + + Returns + ------- + + """ + return self.time_steps[idx] + def append(self, time_step: TimeStep) -> None: self.time_steps.append(time_step) + def stack_observations(self): + return np.vstack([time_step.observation for time_step, in self.time_steps]) + diff --git a/src/algorithms/pytorch_multi_process_trainer.py b/src/trainers/pytorch_trainer.py similarity index 88% rename from src/algorithms/pytorch_multi_process_trainer.py rename to src/trainers/pytorch_trainer.py index 7a55acf..4ec3f0a 100644 --- a/src/algorithms/pytorch_multi_process_trainer.py +++ b/src/trainers/pytorch_trainer.py @@ -12,7 +12,7 @@ from src.utils import INFO from src.utils.function_wraps import time_func, time_func_wrapper -from 
src.utils.episode_info import EpisodeInfo +from src.utils.episode_info import EpisodeInfo, from src.maths.optimizer_type import OptimizerType from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder from src.utils import INFO @@ -22,27 +22,7 @@ EnvLoader = TypeVar('EnvLoader') -@dataclass(init=True, repr=True) -class OptimizerConfig(object): - """Configuration class for the optimizer - """ - optimizer_type: OptimizerType = OptimizerType.ADAM - optimizer_learning_rate: float = 0.01 - optimizer_eps = 1.0e-5 - optimizer_betas: tuple = (0.9, 0.999) - optimizer_weight_decay: float = 0 - optimizer_amsgrad: bool = False - optimizer_maximize = False - - def as_dict(self) -> dict: - return {"optimizer_type": self.optimizer_type, - "learning_rate": self.optimizer_learning_rate, - "eps": self.optimizer_eps, - "betas": self.optimizer_betas, - "weight_decay": self.optimizer_weight_decay, - "amsgrad": self.optimizer_amsgrad, - "maximize": self.optimizer_maximize} @dataclass(init=True, repr=True) @@ -116,7 +96,7 @@ class PyTorchMultiProcessTrainer(object): """ - def __init__(self, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> None: + def __init__(self, env: Env, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> None: """Constructor. Initialize a trainer by passing the training environment instance the agent to train and configuration dictionary @@ -128,6 +108,7 @@ def __init__(self, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> No """ + self.env: Env = env self.agent = agent self.configuration = config # monitor performance @@ -166,7 +147,7 @@ def actions_before_training(self) -> None: None """ - self.agent.share_memory() + self.env.reset() def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: """Perform any actions necessary before the training begins @@ -218,6 +199,18 @@ def train(self): # create the processes by attaching the worker + for episode in range(0, self.configuration["n_episodes"]): + print("{0} On episode {1}/{2}".format(INFO, episode, self.configuration["n_episodes"])) + + self.actions_before_episode_begins(self.env, episode) + + # train for a number of iterations + episode_info: EpisodeInfo = self.agent.on_episode(self.env, episode) + + print("{0} Episode {1} finished in {2} secs".format(INFO, episode, episode_info.total_execution_time)) + print("{0} Episode score={1}, episode total avg distortion {2}".format(INFO, episode_info.episode_score, + episode_info.total_distortion / episode_info.episode_itrs)) + process_handler = TorchProcsHandler(n_procs=self.configuration.n_procs) for p in range(self.configuration.n_procs): diff --git a/src/algorithms/trainer.py b/src/trainers/trainer.py similarity index 100% rename from src/algorithms/trainer.py rename to src/trainers/trainer.py diff --git a/tests/test_multiprocess_env.py b/tests/test_multiprocess_env.py index c6ae747..776c50e 100644 --- a/tests/test_multiprocess_env.py +++ b/tests/test_multiprocess_env.py @@ -1,13 +1,77 @@ import unittest import pytest -from src.spaces import MultiprocessEnv +from src.spaces import MultiprocessEnv, TimeStep, StepType + +class DummyEnv(object): + + def __init__(self, **options): + pass + + def close(self, **kwargs): + pass + + def step(self, **kwargs) -> TimeStep: + print("Action executed={0}".format(kwargs["action"])) + time_step = TimeStep(step_type=StepType.FIRST if kwargs["action"] == 1 else StepType.LAST, + reward=0.0, observation=1.0, info={}, discount=0.0) + return time_step + + def reset(self, **kwargs) -> TimeStep: 
+ time_step = TimeStep(step_type=StepType.FIRST, + reward=0.0, observation=1.0, info={}, discount=0.0) + return time_step class TestMultiprocessEnv(unittest.TestCase): + @staticmethod + def make_environment(options): + return DummyEnv(**options) + def test_make(self): - pass + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + try: + multiproc_env.make() + multiproc_env.close() + except Exception as e: + print("Test failed due to excpetion {0}".format(str(e))) + multiproc_env.close() + + def test_step_fail(self): + + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + with pytest.raises(ValueError) as e: + multiproc_env.make() + multiproc_env.step([]) + + multiproc_env.close() + self.assertEqual("Number of actions is not equal to the number of workers", str(e.value)) + + def test_step(self): + + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + multiproc_env.make() + time_step = multiproc_env.step([1, 2]) + multiproc_env.close() + self.assertEqual(len(multiproc_env), len(time_step)) + self.assertEqual(StepType.FIRST, time_step[0].step_type) + self.assertEqual(StepType.LAST, time_step[1].step_type) + + def test_reset(self): + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + multiproc_env.make() + time_step = multiproc_env.reset() + multiproc_env.close() + self.assertEqual(len(multiproc_env), len(time_step)) if __name__ == '__main__': diff --git a/tests/test_trainer.py b/tests/test_trainer.py index 4ec772a..c7c2dc1 100644 --- a/tests/test_trainer.py +++ b/tests/test_trainer.py @@ -4,7 +4,7 @@ import unittest import pytest -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.algorithms.n_step_semi_gradient_sarsa import SARSAnConfig, SARSAn from src.spaces.tiled_environment import TiledEnv From 2962f52f3c686a01dec5ff2aaea50b14aa8ad71a Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 30 Mar 2022 11:38:31 +0100 Subject: [PATCH 02/13] Bug fixes with A2C --- src/algorithms/a2c.py | 231 ++++++++++++++++++++++++------ src/examples/a2c_three_columns.py | 40 ++++-- src/networks/a2c_networks.py | 6 +- src/spaces/action_space.py | 3 +- src/spaces/multiprocess_env.py | 15 +- src/spaces/state.py | 13 +- src/spaces/tiled_environment.py | 11 +- src/spaces/time_step.py | 2 +- src/trainers/pytorch_trainer.py | 46 +++--- 9 files changed, 272 insertions(+), 95 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 64f780d..5bab8c3 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -1,5 +1,5 @@ import numpy as np -from typing import TypeVar, Generic, Any, Callable +from typing import TypeVar, Generic, Any, Callable, List from pathlib import Path import torch import torch.nn as nn @@ -14,6 +14,7 @@ from src.spaces.time_step import VectorTimeStep from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder +from src.maths.loss_functions import mse Env = TypeVar("Env") Optimizer = TypeVar("Optimizer") @@ -53,6 +54,85 @@ def forward(self, x): return pol_out, val_out +def create_discounts_array(end: int, base: float, start=0, endpoint=False): + """ + + Parameters + ---------- + end + base + start + endpoint + + Returns + ------- + + """ + return np.logspace(start, end, num=end, base=base, endpoint=endpoint) + + +def 
discounted_returns(rewards: List[float], discounts: List[float], gamma: float, n_workers: int = 1) -> np.array: + """Calculate the discounted returns from the episode rewards + + Parameters + ---------- + rewards: The list of rewards + gamma: The discount factor + n_workers: The number of workers + + Returns + ------- + + """ + + # T + total_time = len(rewards) + + # Return numbers spaced evenly on a log scale. + # In linear space, the sequence starts at base ** start + # (base to the power of start) and ends with base ** stop (see endpoint below). + #discounts = np.logspace(0, total_time, num=total_time, base=gamma, endpoint=False) + + # The return is the sum of discounted rewards from step until the + # final step T + returns = np.array([[np.sum(discounts[: total_time - t] * rewards[t:, w]) for t in range(total_time)] for w in range(n_workers)]) + return returns + + +def generalized_advantage_estimate(rewards: List[float], + values: List[float], + gamma: float, tau: float, + n_workers: int) -> np.array: + """Computes an estimate of the advantage funcion + + Parameters + ---------- + rewards + values + gamma + tau + n_workers: The number of workers + + Returns + ------- + + """ + + # T + total_time = len(rewards) + + # (gamma*tau)^t + tau_discounts = np.logspace(0, total_time - 1, num=total_time-1, + base=gamma*tau, endpoint=False) + + # create TD errors: R_t + gamma*V_{t+1} - V_t for t=0 to T + advantages = rewards[:-1] + gamma * values[1:] - values[: -1] + + # create the GAES by multiplying the tau discounts times the TD errors + gaes = np.array([[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:]) for t in range(total_time)] for w in range(n_workers)]) + return gaes + + @dataclass(init=True, repr=True) class A2CConfig(object): """Configuration for A2C algorithm @@ -61,9 +141,15 @@ class A2CConfig(object): gamma: float = 0.99 tau: float = 1.2 + beta: float = 1.0 + policy_loss_weight: float = 1.0 + value_loss_weight: float = 1.0 + max_grad_norm: float = 1.0 n_iterations_per_episode: int = 100 + n_workers: int = 1 action_sampler: Callable = None - loss_function: LossFunction = None + value_function: LossFunction = None + policy_loss: LossFunction = None batch_size: int = 0 device: str = 'cpu' a2cnet: nn.Module = None @@ -71,8 +157,22 @@ class A2CConfig(object): optimizer_config: PyTorchOptimizerConfig = None +@dataclass(init=True, repr=True) +class _InteractionResult(object): + logpas: float + entropies = None + rewards = None + values = None + + class A2C(Generic[Optimizer]): + @staticmethod + def default_action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: + + action_dist = torch.distributions.Categorical(logits=logits) + return action_dist + @staticmethod def update_parameters(optimizer: Optimizer, episode_info: EpisodeInfo, *, config: A2CConfig): """Update the parameters @@ -90,7 +190,7 @@ def update_parameters(optimizer: Optimizer, episode_info: EpisodeInfo, *, config """ # unroll the batch notice the flip we go in reverse - rewards = rewards = torch.Tensor(episode_info.info["replay_buffer"].to_numpy("reward")).flip(dims=(0,)).view(-1) + rewards = rewards = torch.Tensor(episode_info.info["replay_buffer"].to_ndarray("reward")).flip(dims=(0,)).view(-1) logprobs = torch.stack(episode_info.info["logprobs"]).flip(dims=(0,)).view(-1) values = torch.stack(episode_info.info["values"]).flip(dims=(0,)).view(-1) @@ -148,17 +248,6 @@ def a2c_net(self) -> nn.Module: def __call__(self, x: torch.Tensor): return self.a2c_net(x) - def share_memory(self) -> None: - 
"""Instruct the underlying network to - set up what is needed to share memory - - Returns - ------- - - None - """ - self.a2c_net.share_memory() - def parameters(self) -> Any: """The parameters of the underlying model @@ -216,7 +305,7 @@ def play(self, env: Env, criteria: Criteria): time_step = env.reset() while criteria.continue_itrs(): - state = time_step.observation.to_numpy() + state = time_step.observation.to_ndarray() state = torch.from_numpy(state).float() logits, values = self(state) @@ -228,20 +317,62 @@ def play(self, env: Env, criteria: Criteria): time_step = env.reset() def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: + + discounts = create_discounts_array(end=len(rewards), base=self.config.gamma, start=0, endpoint=False) + + # get the discounted returns + disreturns = discounted_returns(rewards, discounts, gamma=self.config.gamma, n_workers=self.config.n_workers) + + # get the gaes + gaes = generalized_advantage_estimate(rewards=rewards, gamma=self.config.gamma, values=values, + tau=self.config.tau, n_workers=self.config.n_workers) + + # discounted gaes + discounted_gaes = discounts[:-1] * gaes + + # the loss function for the critic network + value_loss_function = mse(returns=disreturns, values=values) + policy_loss = - (discounted_gaes * logpas).mean() + + # compute a total loss function to minimize + if self.config.beta is not None: + + # add entropy loss + entropy_loss = -entropies.mean() + + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + \ + self.config.beta * entropy_loss + else: + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + + self.optimizer.zero_grad() + loss.backward() + + # clip the grad if needed + torch.nn.utils.clip_grad_norm_(self.parameters(), + self.config.max_grad_norm) + self.optimizer.step() + + + """ logpas = torch.stack(logpas).squeeze() entropies = torch.stack(entropies).squeeze() values = torch.stack(values).squeeze() - T = len(rewards) - discounts = np.logspace(0, T, num=T, base=self.config.gamma, endpoint=False) + # T + total_episode_time = len(rewards) + + discounts = np.logspace(0, total_episode_time, num=total_episode_time, base=self.config.gamma, endpoint=False) rewards = np.array(rewards).squeeze() - returns = np.array([[np.sum(discounts[:T - t] * rewards[t:, w]) for t in range(T)] + returns = np.array([[np.sum(discounts[:T - t] * rewards[t:, w]) for t in range(total_episode_time)] for w in range(n_workers)]) np_values = values.data.numpy() - tau_discounts = np.logspace(0, T - 1, num=T - 1, base=self.config.gamma * self.config.tau, endpoint=False) + tau_discounts = np.logspace(0, total_episode_time - 1, num=total_episode_time - 1, base=self.config.gamma * self.config.tau, endpoint=False) advs = rewards[:-1] + self.config.gamma * np_values[1:] - np_values[:-1] - gaes = np.array([[np.sum(tau_discounts[:T - 1 - t] * advs[t:, w]) for t in range(T - 1)] + gaes = np.array([[np.sum(tau_discounts[:total_episode_time - 1 - t] * advs[t:, w]) for t in range(total_episode_time - 1)] for w in range(n_workers)]) discounted_gaes = discounts[:-1] * gaes @@ -251,25 +382,19 @@ def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: returns = torch.FloatTensor(returns.T[:-1]).view(-1).unsqueeze(1) discounted_gaes = torch.FloatTensor(discounted_gaes.T).view(-1).unsqueeze(1) - T -= 1 - T *= n_workers + total_episode_time -= 1 + total_episode_time *= n_workers - assert returns.size() 
== (T, 1) - assert values.size() == (T, 1) - assert logpas.size() == (T, 1) - assert entropies.size() == (T, 1) + assert returns.size() == (total_episode_time, 1) + assert values.size() == (total_episode_time, 1) + assert logpas.size() == (total_episode_time, 1) + assert entropies.size() == (total_episode_time, 1) value_error = returns.detach() - values value_loss = value_error.pow(2).mul(0.5).mean() policy_loss = -(discounted_gaes.detach() * logpas).mean() entropy_loss = -entropies.mean() - loss = self.policy_loss_weight * policy_loss + self.value_loss_weight * value_loss + self.entropy_loss_weight * entropy_loss - - self.optimizer.zero_grad() - loss.backward() - torch.nn.utils.clip_grad_norm_(self.parameters(), - self.ac_model_max_grad_norm) - self.optimizer.step() + """ def actions_before_training_begins(self, env: Env, **options) -> None: @@ -278,6 +403,9 @@ def actions_before_training_begins(self, env: Env, **options) -> None: model_params=self.parameters(), **self.config.optimizer_config.as_dict()) + if self.config.action_sampler is None: + self.config.action_sampler = A2C.default_action_sampler + def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: self.set_train_mode() self.optimizer.zero_grad() @@ -340,7 +468,7 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: episode_iterations = 0 total_distortion = 0 - buffer = ReplayBuffer(options["buffer_size"]) + #buffer = ReplayBuffer(options["buffer_size"]) time_step: VectorTimeStep = env.reset() states = time_step.stack_observations() #torch.from_numpy(time_step.observation.to_numpy()).float() @@ -353,6 +481,11 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: values = [] for itr in range(self.config.n_iterations_per_episode): + # interact with the environemnt + interaction_result = self._interaction_step(states=states, env=env) + + + """ # make a full pass on the model action, is_exploratory, logprob, entropy, value = self._full_pass(state=states) @@ -366,6 +499,10 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: buffer.add(state=state, action=action, reward=time_step.reward, next_state=time_step.observation, done=time_step.done) + + """ + + # check if we finished ma if time_step.done: break @@ -382,10 +519,16 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: def _full_pass(self, state) -> tuple: - # policy and critic values - policy, value = self.a2c_net(state) + if not isinstance(state, torch.Tensor): + torch_state = torch.Tensor(state) + else: + torch_state = state - logits = policy.view(-1) + # policy and critic values. 
The policy + # values are assumed raw + policy, value = self.a2c_net(torch_state) + + logits = policy #.view(-1) ##dist = torch.distributions.Categorical(logits=logits) ##action = dist.sample() @@ -407,10 +550,15 @@ def _full_pass(self, state) -> tuple: is_exploratory = action != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) return action, is_exploratory, logprob, entropy, value - def _interaction_step(self, states, env: Env): - actions, is_exploratory, logpas, entropies, values = self.ac_model.full_pass(states) - new_states, rewards, is_terminals, _ = env.step(actions) + def _interaction_step(self, states, env: Env) -> _InteractionResult: + actions, is_exploratory, logpas, entropies, values = self._full_pass(states) + time_step = env.step(actions) + #new_states, rewards, is_terminals, _ = env.step(actions) + interaction_result = _InteractionResult(logpas=logpas, values=values, + entropies=entropies, rewards=rewards) + return interaction_result + """ self.logpas.append(logpas); self.entropies.append(entropies) self.rewards.append(rewards); @@ -420,4 +568,5 @@ def _interaction_step(self, states, env: Env): self.running_timestep += 1 self.running_exploration += is_exploratory[:, np.newaxis].astype(np.int) - return new_states, is_terminals \ No newline at end of file + return new_states, is_terminals + """ \ No newline at end of file diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py index 33bce39..4017934 100644 --- a/src/examples/a2c_three_columns.py +++ b/src/examples/a2c_three_columns.py @@ -6,17 +6,20 @@ from src.algorithms.a2c import A2C, A2CConfig from src.networks.a2c_networks import A2CNetSimpleLinear from src.utils.serial_hierarchy import SerialHierarchy +from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData from src.spaces.tiled_environment import TiledEnv, TiledEnvConfig, Layer from src.spaces.discrete_state_environment import DiscreteStateEnvironment -from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData +from src.spaces.multiprocess_env import MultiprocessEnv from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize from src.policies.epsilon_greedy_policy import EpsilonDecayOption from src.utils.distortion_calculator import DistortionCalculationType, DistortionCalculator from src.maths.numeric_distance_type import NumericDistanceType from src.maths.string_distance_calculator import StringDistanceType +from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig +from src.maths.optimizer_type import OptimizerType from src.utils.reward_manager import RewardManager -from src.trainers.pytorch_trainer import PyTorchMultiProcessTrainer, PyTorchMultiProcessTrainerConfig, OptimizerConfig +from src.trainers.pytorch_trainer import PyTorchTrainer, PyTorchTrainerConfig from src.utils import INFO @@ -98,7 +101,7 @@ def load_mock_subjects() -> MockSubjectsLoader: return ds -def load_discrete_env() -> DiscreteStateEnvironment: +def load_discrete_env(options) -> DiscreteStateEnvironment: mock_ds = load_mock_subjects() @@ -118,7 +121,7 @@ def load_discrete_env() -> DiscreteStateEnvironment: ActionNumericBinGeneralize(column_name="salary", generalization_table=bins), ActionIdentity(column_name="diagnosis")) - action_space.shuffle() + action_space.shuffle(seed=options["rank"] + 42) discrete_env = DiscreteStateEnvironment.from_options(data_set=mock_ds, action_space=action_space, @@ -152,11 +155,10 @@ def 
load_discrete_env() -> DiscreteStateEnvironment: return tiled_env -def action_sampler(logits) -> torch.Tensor: +def action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: action_dist = torch.distributions.Categorical(logits=logits) - action = action_dist.sample() - return action + return action_dist if __name__ == '__main__': @@ -164,24 +166,34 @@ def action_sampler(logits) -> torch.Tensor: # set the seed for random engine random.seed(42) + # this the A2C network net = A2CNetSimpleLinear(n_columns=3, n_actions=ACTION_SPACE_SIZE) # agent configuration a2c_config = A2CConfig(action_sampler=action_sampler, n_iterations_per_episode=N_ITRS_PER_EPISODE, - a2cnet=net, save_model_path=Path("./a2c_three_columns_output/")) + a2cnet=net, save_model_path=Path("./a2c_three_columns_output/"), + optimizer_config=PyTorchOptimizerConfig(optimizer_type=OptimizerType.ADAM)) # create the agent agent = A2C(a2c_config) # create a trainer to train the Qlearning agent - configuration = PyTorchMultiProcessTrainerConfig(n_episodes=N_EPISODES, - env_loader=load_discrete_env, - optimizer_config=OptimizerConfig()) + configuration = PyTorchTrainerConfig(n_episodes=N_EPISODES) + + env = MultiprocessEnv(env_builder=load_discrete_env, env_args={}, n_workers=2) + + try: + + env.make() + trainer = PyTorchTrainer(env=env, agent=agent, config=configuration) - trainer = PyTorchMultiProcessTrainer(agent=agent, config=configuration) + # train the agent + trainer.train() - # train the agent - trainer.train() + except Exception as e: + print("An excpetion was thrown...{0}".format(str(e))) + finally: + env.close() # avg_rewards = trainer.avg_rewards() """ diff --git a/src/networks/a2c_networks.py b/src/networks/a2c_networks.py index 836ed7f..6ffad93 100644 --- a/src/networks/a2c_networks.py +++ b/src/networks/a2c_networks.py @@ -23,6 +23,8 @@ def __init__(self, n_columns: int, n_actions: int): """ super(A2CNetSimpleLinear, self).__init__() + self.n_columns = n_columns + self.n_actions = n_actions self.linear_l1 = nn.Linear(in_features=n_columns, out_features=n_actions) self.actor = nn.Linear(in_features=n_actions, out_features=n_actions) self.critic = nn.Linear(n_actions, 1) @@ -41,7 +43,7 @@ def forward(self, x: torch.Tensor) -> tuple: # activate y = F.relu(self.linear_l1(x)) - actor = F.log_softmax(self.actor(y), dim=0) # C - critic = torch.tanh(self.critic(y)) # D + actor = self.actor(y) #F.log_softmax(self.actor(y), dim=0) # C + critic = self.critic(y) #torch.tanh(self.critic(y)) # D return actor, critic diff --git a/src/spaces/action_space.py b/src/spaces/action_space.py index 8f082bb..c516dc9 100644 --- a/src/spaces/action_space.py +++ b/src/spaces/action_space.py @@ -69,7 +69,7 @@ def __setitem__(self, idx: int, action: ActionBase) -> None: def __len__(self) -> int: return len(self.actions) - def shuffle(self) -> None: + def shuffle(self, seed: int = 42) -> None: """Shuffles the action list Returns @@ -77,6 +77,7 @@ def shuffle(self) -> None: None """ + random.seed(seed) random.shuffle(self.actions) # fix the ids of the actions to diff --git a/src/spaces/multiprocess_env.py b/src/spaces/multiprocess_env.py index 8d4900e..ecd1503 100644 --- a/src/spaces/multiprocess_env.py +++ b/src/spaces/multiprocess_env.py @@ -26,6 +26,7 @@ def __init__(self, env_builder: Callable, env_args: dict, n_workers: int): self.n_workers = n_workers self.workers = TorchProcsHandler(n_procs=n_workers) self.pipes = [mp.Pipe() for _ in range(self.n_workers)] + self.is_made: bool = False def __len__(self) -> int: """The number of 
workers handled by this @@ -46,10 +47,14 @@ def make(self): """ for w in range(self.n_workers): + env_args = self.env_args + env_args["rank"] = w self.workers.create_process_and_start(target=self.work, args=(w, self.env_builder, - self.env_args, + env_args, self.pipes[w][1])) + self.is_made = True + def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: """The worker function @@ -88,12 +93,15 @@ def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: def reset(self, rank=None, **kwargs) -> VectorTimeStep: + if not self.is_made: + raise ValueError("Environment is not created. Did you call make()?") + time_step = VectorTimeStep() if rank is not None: parent_end, _ = self.pipes[rank] self._send_msg(('reset', {}), rank) o = parent_end.recv() - time_step.append(0) + time_step.append(o) return time_step # if not reset for a specific worker @@ -111,6 +119,9 @@ def reset(self, rank=None, **kwargs) -> VectorTimeStep: def step(self, actions: ActionVector) -> VectorTimeStep: + if not self.is_made: + raise ValueError("Environment is not created. Did you call make()?") + if len(actions) != self.n_workers: raise ValueError("Number of actions is not equal to the number of workers") diff --git a/src/spaces/state.py b/src/spaces/state.py index a6f2d55..f7ffeb9 100644 --- a/src/spaces/state.py +++ b/src/spaces/state.py @@ -100,7 +100,7 @@ def __getitem__(self, name: str) -> float: """ return self.column_distortions[name] - def to_numpy(self) -> np.array: + def to_ndarray(self) -> np.array: """Returns the self.column_distortions values as numpy array Returns @@ -108,7 +108,14 @@ def to_numpy(self) -> np.array: np.array """ + return np.array(self.to_list()) - vals = list(self.column_distortions.values()) - return np.array(vals) + def to_list(self) -> list: + """Returns the self.column_distortions values as numpy array + + Returns + ------- + A list of floats + """ + return list(self.column_distortions.values()) diff --git a/src/spaces/tiled_environment.py b/src/spaces/tiled_environment.py index d5538c7..8aa43d7 100644 --- a/src/spaces/tiled_environment.py +++ b/src/spaces/tiled_environment.py @@ -3,9 +3,11 @@ """ import copy +import numpy as np from typing import TypeVar, List, Any from dataclasses import dataclass +import numpy import numpy as np from src.extern.tile_coding import IHT, tiles @@ -350,7 +352,10 @@ def n_states(self) -> int: def config(self) -> Config: return self.env.config - def step(self, action: ActionBase, **options) -> TimeStep: + def close(self, **options) -> None: + pass + + def step(self, action: Any, **options) -> TimeStep: """Execute the action in the environment and return a new state for observation @@ -365,6 +370,10 @@ def step(self, action: ActionBase, **options) -> TimeStep: """ + # choose the action + if isinstance(action, int) or isinstance(action, numpy.int64): + action = self.get_action(aidx=action) + raw_time_step = self.env.step(action) # a state wrapper to communicate diff --git a/src/spaces/time_step.py b/src/spaces/time_step.py index 1341f4d..65d94e3 100644 --- a/src/spaces/time_step.py +++ b/src/spaces/time_step.py @@ -118,7 +118,7 @@ def append(self, time_step: TimeStep) -> None: self.time_steps.append(time_step) def stack_observations(self): - return np.vstack([time_step.observation for time_step, in self.time_steps]) + return np.vstack([time_step.observation.to_list() for time_step in self.time_steps]) diff --git a/src/trainers/pytorch_trainer.py b/src/trainers/pytorch_trainer.py index 4ec3f0a..e64b05e 100644 --- 
a/src/trainers/pytorch_trainer.py +++ b/src/trainers/pytorch_trainer.py @@ -12,7 +12,7 @@ from src.utils import INFO from src.utils.function_wraps import time_func, time_func_wrapper -from src.utils.episode_info import EpisodeInfo, +from src.utils.episode_info import EpisodeInfo from src.maths.optimizer_type import OptimizerType from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder from src.utils import INFO @@ -22,20 +22,13 @@ EnvLoader = TypeVar('EnvLoader') - - - @dataclass(init=True, repr=True) -class PyTorchMultiProcessTrainerConfig(object): +class PyTorchTrainerConfig(object): """Configuration for PyTorchMultiProcessTrainer """ n_procs: int = 1 n_episodes: int = 100 - optimizer_config: OptimizerConfig = OptimizerConfig() - env_loader: EnvLoader = None - buffer_size: int = 1000 - master_process: int = 0 @dataclass(init=True, repr=True) @@ -90,13 +83,13 @@ def worker(worker_idx: int, worker_model: nn.Module, params: dir): params["update_params_functor"](optimizer, episode_info, config=worker_model.config) #**params) -class PyTorchMultiProcessTrainer(object): +class PyTorchTrainer(object): """The class PyTorchMultiProcessTrainer. Trainer for multiprocessing with PyTorch """ - def __init__(self, env: Env, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> None: + def __init__(self, env: Env, agent: Agent, config: PyTorchTrainerConfig) -> None: """Constructor. Initialize a trainer by passing the training environment instance the agent to train and configuration dictionary @@ -147,7 +140,14 @@ def actions_before_training(self) -> None: None """ + if self.env is None: + raise ValueError("Environment has not been specified") + + if self.agent is None: + raise ValueError("Agent has not been specified") + self.env.reset() + self.agent.actions_before_training_begins(self.env) def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: """Perform any actions necessary before the training begins @@ -199,8 +199,8 @@ def train(self): # create the processes by attaching the worker - for episode in range(0, self.configuration["n_episodes"]): - print("{0} On episode {1}/{2}".format(INFO, episode, self.configuration["n_episodes"])) + for episode in range(0, self.configuration.n_episodes): + print("{0} On episode {1}/{2}".format(INFO, episode, self.configuration.n_episodes)) self.actions_before_episode_begins(self.env, episode) @@ -208,23 +208,9 @@ def train(self): episode_info: EpisodeInfo = self.agent.on_episode(self.env, episode) print("{0} Episode {1} finished in {2} secs".format(INFO, episode, episode_info.total_execution_time)) - print("{0} Episode score={1}, episode total avg distortion {2}".format(INFO, episode_info.episode_score, - episode_info.total_distortion / episode_info.episode_itrs)) - - process_handler = TorchProcsHandler(n_procs=self.configuration.n_procs) - - for p in range(self.configuration.n_procs): - # worker_idx: int, worker_model: nn.Module, params: dir - process_handler.create_process_and_start(target=worker, - args=(p, self.agent, - {"optimizer_config": self.configuration.optimizer_config.as_dict(), - "n_episodes": self.configuration.n_episodes, - "update_params_functor": self.agent.update_parameters, - "env_loader": self.configuration.env_loader, - "buffer_size": self.configuration.buffer_size, - "master_process": self.configuration.master_process})) - - process_handler.join_and_terminate() + print("{0} Episode score={1}, episode total " + "avg distortion {2}".format(INFO, episode_info.episode_score, + 
episode_info.total_distortion / episode_info.episode_itrs)) self.actions_after_training() From 23bcd3930bd2fa18eccdd59750629b8a36ccdd27 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 30 Mar 2022 11:39:15 +0100 Subject: [PATCH 03/13] Refactor optimizer configuration for PyTorch --- src/maths/pytorch_optimizer_config.py | 30 +++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 src/maths/pytorch_optimizer_config.py diff --git a/src/maths/pytorch_optimizer_config.py b/src/maths/pytorch_optimizer_config.py new file mode 100644 index 0000000..358d2f3 --- /dev/null +++ b/src/maths/pytorch_optimizer_config.py @@ -0,0 +1,30 @@ +"""Module pytorch_optimizer_configuration. Specifies a +data class for configuring PyTorch optimizers + +""" + +from dataclasses import dataclass +from src.maths.optimizer_type import OptimizerType + + +@dataclass(init=True, repr=True) +class PyTorchOptimizerConfig(object): + """Configuration class for the optimizer + + """ + optimizer_type: OptimizerType = OptimizerType.ADAM + optimizer_learning_rate: float = 0.01 + optimizer_eps = 1.0e-5 + optimizer_betas: tuple = (0.9, 0.999) + optimizer_weight_decay: float = 0 + optimizer_amsgrad: bool = False + optimizer_maximize = False + + def as_dict(self) -> dict: + return {"optimizer_type": self.optimizer_type, + "learning_rate": self.optimizer_learning_rate, + "eps": self.optimizer_eps, + "betas": self.optimizer_betas, + "weight_decay": self.optimizer_weight_decay, + "amsgrad": self.optimizer_amsgrad, + "maximize": self.optimizer_maximize} \ No newline at end of file From 3bf7dbd8e5693714d0b77e9fa5b3a274a7d49209 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 30 Mar 2022 13:15:23 +0100 Subject: [PATCH 04/13] Fix bugs with A2C API --- src/algorithms/a2c.py | 35 ++++++++++++++++++++++--------- src/examples/a2c_three_columns.py | 2 +- src/spaces/multiprocess_env.py | 8 +++---- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 5bab8c3..f74957a 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -165,6 +165,12 @@ class _InteractionResult(object): values = None +@dataclass(init=True, repr=True) +class _FullPassResult(object): + logprobs: torch.Tensor + values: torch.Tensor + + class A2C(Generic[Optimizer]): @staticmethod @@ -468,8 +474,6 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: episode_iterations = 0 total_distortion = 0 - #buffer = ReplayBuffer(options["buffer_size"]) - time_step: VectorTimeStep = env.reset() states = time_step.stack_observations() #torch.from_numpy(time_step.observation.to_numpy()).float() @@ -517,7 +521,7 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: return episode_info - def _full_pass(self, state) -> tuple: + def _full_pass(self, state) -> _FullPassResult: if not isinstance(state, torch.Tensor): torch_state = torch.Tensor(state) @@ -526,29 +530,40 @@ def _full_pass(self, state) -> tuple: # policy and critic values. The policy # values are assumed raw - policy, value = self.a2c_net(torch_state) + logits, value = self.a2c_net(torch_state) + + # log_softmax may not sum up to one + # and can be negative as well + # get the logprobs for all batches? + logprobs = F.log_softmax(logits.view(-1), dim=0) - logits = policy #.view(-1) + #logits = policy #.view(-1) ##dist = torch.distributions.Categorical(logits=logits) ##action = dist.sample() # choose the action. 
Typically this will be Categorical # but we leave it open for the application + # We don't call logits.view(-1) so that we get + # as many actions as in the logits rows. + # Each logit row is expected to corrspond to an + # environment worker action_sampler_dist = self.config.action_sampler(logits) - action = action_sampler_dist.sample() + actions = action_sampler_dist.sample() # the log probabilities of the policy - logprob = action_sampler_dist.log_prob(action).unsqueeze(-1)#policy.view(-1)[action] + #logprob = action_sampler_dist.log_prob(action).unsqueeze(-1)#policy.view(-1)[action] entropy = action_sampler_dist.entropy().unsqueeze(-1) #logprobs.append(logprob_) #logpa = dist.log_prob(action).unsqueeze(-1) #entropy = dist.entropy().unsqueeze(-1) - action = action.item() if len(action) == 1 else action.data.numpy() - is_exploratory = action != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) - return action, is_exploratory, logprob, entropy, value + #actions = actions.item() if len(action) == 1 else action.data.numpy() + is_exploratory = actions != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) + + full_pass_result = _FullPassResult(logprobs=logprobs, actions=actions) + return full_pass_result #action, is_exploratory, logprob, entropy, value def _interaction_step(self, states, env: Env) -> _InteractionResult: actions, is_exploratory, logpas, entropies, values = self._full_pass(states) diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py index 4017934..99a7993 100644 --- a/src/examples/a2c_three_columns.py +++ b/src/examples/a2c_three_columns.py @@ -184,7 +184,7 @@ def action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: try: - env.make() + env.make(agent=agent) trainer = PyTorchTrainer(env=env, agent=agent, config=configuration) # train the agent diff --git a/src/spaces/multiprocess_env.py b/src/spaces/multiprocess_env.py index ecd1503..140d872 100644 --- a/src/spaces/multiprocess_env.py +++ b/src/spaces/multiprocess_env.py @@ -14,7 +14,7 @@ from src.spaces import TimeStep, VectorTimeStep from src.parallel import TorchProcsHandler - +Agent = TypeVar('Agent') ActionVector = TypeVar('ActionVector') @@ -38,7 +38,7 @@ def __len__(self) -> int: """ return len(self.workers) - def make(self): + def make(self, agent: Agent): """Create the workers Returns @@ -50,12 +50,12 @@ def make(self): env_args = self.env_args env_args["rank"] = w self.workers.create_process_and_start(target=self.work, args=(w, self.env_builder, - env_args, + env_args, agent, self.pipes[w][1])) self.is_made = True - def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: + def work(self, rank, env_builder: Callable, env_args: dict, agent: Agent, pipe_end) -> None: """The worker function Parameters From edd1a04b9e6092fccf3ce81665a0f442321212a4 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 1 Apr 2022 11:37:42 +0100 Subject: [PATCH 05/13] Update A2C implementation --- src/algorithms/a2c.py | 89 +++++++++++++--------------------- src/spaces/multiprocess_env.py | 11 ++++- 2 files changed, 42 insertions(+), 58 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index f74957a..c0a885e 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -24,11 +24,11 @@ TimeStep = TypeVar("TimeStep") Criteria = TypeVar('Criteria') - +""" class A2CNetBase(nn.Module): - """Base class for A2C networks + Base class for A2C networks - """ + def __init__(self, architecture): super(A2CNetBase, self).__init__() @@ -52,6 
+52,7 @@ def forward(self, x): pol_out = self.policy_net(x) val_out = self.value_net(x) return pol_out, val_out +""" def create_discounts_array(end: int, base: float, start=0, endpoint=False): @@ -71,7 +72,7 @@ def create_discounts_array(end: int, base: float, start=0, endpoint=False): return np.logspace(start, end, num=end, base=base, endpoint=endpoint) -def discounted_returns(rewards: List[float], discounts: List[float], gamma: float, n_workers: int = 1) -> np.array: +def calculate_discounted_returns(rewards: List[float], discounts: List[float], gamma: float, n_workers: int = 1) -> np.array: """Calculate the discounted returns from the episode rewards Parameters @@ -169,6 +170,8 @@ class _InteractionResult(object): class _FullPassResult(object): logprobs: torch.Tensor values: torch.Tensor + actions: torch.Tensor + entropies: torch.Tensor class A2C(Generic[Optimizer]): @@ -327,7 +330,7 @@ def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: discounts = create_discounts_array(end=len(rewards), base=self.config.gamma, start=0, endpoint=False) # get the discounted returns - disreturns = discounted_returns(rewards, discounts, gamma=self.config.gamma, n_workers=self.config.n_workers) + discounted_returns = calculate_discounted_returns(rewards, discounts, gamma=self.config.gamma, n_workers=self.config.n_workers) # get the gaes gaes = generalized_advantage_estimate(rewards=rewards, gamma=self.config.gamma, values=values, @@ -337,7 +340,7 @@ def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: discounted_gaes = discounts[:-1] * gaes # the loss function for the critic network - value_loss_function = mse(returns=disreturns, values=values) + value_loss_function = mse(returns=discounted_returns, values=values) policy_loss = - (discounted_gaes * logpas).mean() # compute a total loss function to minimize @@ -361,47 +364,6 @@ def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: self.config.max_grad_norm) self.optimizer.step() - - """ - logpas = torch.stack(logpas).squeeze() - entropies = torch.stack(entropies).squeeze() - values = torch.stack(values).squeeze() - - # T - total_episode_time = len(rewards) - - discounts = np.logspace(0, total_episode_time, num=total_episode_time, base=self.config.gamma, endpoint=False) - rewards = np.array(rewards).squeeze() - returns = np.array([[np.sum(discounts[:T - t] * rewards[t:, w]) for t in range(total_episode_time)] - for w in range(n_workers)]) - - np_values = values.data.numpy() - tau_discounts = np.logspace(0, total_episode_time - 1, num=total_episode_time - 1, base=self.config.gamma * self.config.tau, endpoint=False) - advs = rewards[:-1] + self.config.gamma * np_values[1:] - np_values[:-1] - gaes = np.array([[np.sum(tau_discounts[:total_episode_time - 1 - t] * advs[t:, w]) for t in range(total_episode_time - 1)] - for w in range(n_workers)]) - discounted_gaes = discounts[:-1] * gaes - - values = values[:-1, ...].view(-1).unsqueeze(1) - logpas = logpas.view(-1).unsqueeze(1) - entropies = entropies.view(-1).unsqueeze(1) - returns = torch.FloatTensor(returns.T[:-1]).view(-1).unsqueeze(1) - discounted_gaes = torch.FloatTensor(discounted_gaes.T).view(-1).unsqueeze(1) - - total_episode_time -= 1 - total_episode_time *= n_workers - - assert returns.size() == (total_episode_time, 1) - assert values.size() == (total_episode_time, 1) - assert logpas.size() == (total_episode_time, 1) - assert entropies.size() == (total_episode_time, 1) - - value_error = returns.detach() - values - 
value_loss = value_error.pow(2).mul(0.5).mean() - policy_loss = -(discounted_gaes.detach() * logpas).mean() - entropy_loss = -entropies.mean() - """ - def actions_before_training_begins(self, env: Env, **options) -> None: # build the optimizer we need in order to train the model @@ -422,6 +384,7 @@ def actions_after_training(self) -> None: Returns ------- + None """ if self.config.save_model_path is not None: @@ -476,6 +439,7 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: time_step: VectorTimeStep = env.reset() states = time_step.stack_observations() #torch.from_numpy(time_step.observation.to_numpy()).float() + actions = self._full_pass(states) # represent the probabilities under the # policy @@ -483,10 +447,20 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: entropies = [] rewards = [] values = [] + + buffer = ReplayBuffer(buffer_size=self.config.n_iterations_per_episode) for itr in range(self.config.n_iterations_per_episode): - # interact with the environemnt - interaction_result = self._interaction_step(states=states, env=env) + time_step: VectorTimeStep = env.step(actions) + next_states = time_step.stack_observations() + full_pass: _FullPassResult = self._network_pass(next_states) + + # if we finished the episode we go to the optimization + # else step on the envifonment + + + + """ @@ -520,8 +494,7 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: "values": values}) return episode_info - - def _full_pass(self, state) -> _FullPassResult: + def _network_pass(self, state) -> _FullPassResult: if not isinstance(state, torch.Tensor): torch_state = torch.Tensor(state) @@ -530,7 +503,7 @@ def _full_pass(self, state) -> _FullPassResult: # policy and critic values. The policy # values are assumed raw - logits, value = self.a2c_net(torch_state) + logits, values = self.a2c_net(torch_state) # log_softmax may not sum up to one # and can be negative as well @@ -553,7 +526,7 @@ def _full_pass(self, state) -> _FullPassResult: # the log probabilities of the policy #logprob = action_sampler_dist.log_prob(action).unsqueeze(-1)#policy.view(-1)[action] - entropy = action_sampler_dist.entropy().unsqueeze(-1) + entropies = action_sampler_dist.entropy().unsqueeze(-1) #logprobs.append(logprob_) #logpa = dist.log_prob(action).unsqueeze(-1) @@ -562,9 +535,12 @@ def _full_pass(self, state) -> _FullPassResult: #actions = actions.item() if len(action) == 1 else action.data.numpy() is_exploratory = actions != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) - full_pass_result = _FullPassResult(logprobs=logprobs, actions=actions) + full_pass_result = _FullPassResult(logprobs=logprobs, actions=actions, + values=values, entropies=entropies) + return full_pass_result #action, is_exploratory, logprob, entropy, value + """ def _interaction_step(self, states, env: Env) -> _InteractionResult: actions, is_exploratory, logpas, entropies, values = self._full_pass(states) time_step = env.step(actions) @@ -573,7 +549,8 @@ def _interaction_step(self, states, env: Env) -> _InteractionResult: interaction_result = _InteractionResult(logpas=logpas, values=values, entropies=entropies, rewards=rewards) return interaction_result - """ + """ + """ self.logpas.append(logpas); self.entropies.append(entropies) self.rewards.append(rewards); @@ -584,4 +561,4 @@ def _interaction_step(self, states, env: Env) -> _InteractionResult: self.running_exploration += is_exploratory[:, np.newaxis].astype(np.int) return new_states, is_terminals - """ \ No 
newline at end of file + """ \ No newline at end of file diff --git a/src/spaces/multiprocess_env.py b/src/spaces/multiprocess_env.py index 140d872..c35af2f 100644 --- a/src/spaces/multiprocess_env.py +++ b/src/spaces/multiprocess_env.py @@ -81,7 +81,8 @@ def work(self, rank, env_builder: Callable, env_args: dict, agent: Agent, pipe_e if cmd == 'reset': pipe_end.send(env.reset(**kwargs)) elif cmd == 'step': - pipe_end.send(env.step(**kwargs)) + time_step: TimeStep = env.step(**kwargs) + pipe_end.send(time_step) elif cmd == '_past_limit': pipe_end.send(env._elapsed_steps >= env._max_episode_steps) else: @@ -135,8 +136,14 @@ def step(self, actions: ActionVector) -> VectorTimeStep: for rank in range(self.n_workers): parent_end, _ = self.pipes[rank] process_time_step = parent_end.recv() + + # if on this step the local environment + # finished then reset + if process_time_step.done: + self.reset(rank=rank, **{}) + time_step.append(process_time_step) - """ + """ o, r, d, i = parent_end.recv() results.append((o, np.array(r, dtype=np.float), From 1c0bee13cb8611433ba7c45edce58217afeb60c4 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 1 Apr 2022 14:08:05 +0100 Subject: [PATCH 06/13] API update... --- src/algorithms/a2c.py | 204 +++++++------------------------- src/spaces/tiled_environment.py | 3 + src/spaces/time_step.py | 21 +++- src/trainers/pytorch_trainer.py | 2 + src/utils/replay_buffer.py | 51 +++++--- 5 files changed, 103 insertions(+), 178 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index c0a885e..5a36e2a 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -6,8 +6,7 @@ import torch.nn.functional as F from dataclasses import dataclass - -from src.utils.experience_buffer import unpack_batch +from src.utils import INFO from src.utils.episode_info import EpisodeInfo from src.utils.function_wraps import time_func_wrapper from src.utils.replay_buffer import ReplayBuffer @@ -24,36 +23,6 @@ TimeStep = TypeVar("TimeStep") Criteria = TypeVar('Criteria') -""" -class A2CNetBase(nn.Module): - Base class for A2C networks - - - - def __init__(self, architecture): - super(A2CNetBase, self).__init__() - self.architecture = architecture - - def forward(self, x): - return self.architecture(x) - - -class A2CNet(nn.Module): - - def __init__(self, common_net: A2CNetBase, policy_net: A2CNetBase, value_net: A2CNetBase): - super(A2CNet, self).__init__() - self.common_net = common_net - self.policy_net = policy_net - self.value_net = value_net - - def forward(self, x): - x = self.common_net(x) - - pol_out = self.policy_net(x) - val_out = self.value_net(x) - return pol_out, val_out -""" - def create_discounts_array(end: int, base: float, start=0, endpoint=False): """ @@ -72,7 +41,7 @@ def create_discounts_array(end: int, base: float, start=0, endpoint=False): return np.logspace(start, end, num=end, base=base, endpoint=endpoint) -def calculate_discounted_returns(rewards: List[float], discounts: List[float], gamma: float, n_workers: int = 1) -> np.array: +def calculate_discounted_returns(rewards: List[float], discounts: List[float], n_workers: int = 1) -> np.array: """Calculate the discounted returns from the episode rewards Parameters @@ -182,49 +151,6 @@ def default_action_sampler(logits: torch.Tensor) -> torch.distributions.Distribu action_dist = torch.distributions.Categorical(logits=logits) return action_dist - @staticmethod - def update_parameters(optimizer: Optimizer, episode_info: EpisodeInfo, *, config: A2CConfig): - """Update the parameters - - Parameters - ---------- 
- - optimizer: The optimizer instance used in training - episode_info: The episode info - config: The training configuration - - Returns - ------- - - """ - - # unroll the batch notice the flip we go in reverse - rewards = rewards = torch.Tensor(episode_info.info["replay_buffer"].to_ndarray("reward")).flip(dims=(0,)).view(-1) - logprobs = torch.stack(episode_info.info["logprobs"]).flip(dims=(0,)).view(-1) - values = torch.stack(episode_info.info["values"]).flip(dims=(0,)).view(-1) - - returns = [] - ret_ = torch.Tensor([0]) - - # Loop through the rewards in reverse order to generate - # R = r i + γ * R - for r in range(rewards.shape[0]): # B - ret_ = rewards[r] + config.gamma * ret_ - returns.append(ret_) - - returns = torch.stack(returns).view(-1) - returns = F.normalize(returns, dim=0) - - # compute the actor loss. - # Minimize the actor loss: –1 * γ t * (R – v(s t )) * π (a ⏐ s) - actor_loss = -1 * logprobs * (returns - values.detach()) # C - - # compute the critic loss. Minimize the critic loss: (R – v) 2 . - critic_loss = torch.pow(values - returns, 2) # D - loss = actor_loss.sum() + config.tau * critic_loss.sum() # E - loss.backward() - optimizer.step() - @classmethod def from_path(cls, config: A2CConfig, path: Path): """Load the A2C model parameters from the given path @@ -325,16 +251,19 @@ def play(self, env: Env, criteria: Criteria): if time_step.done: time_step = env.reset() - def optimize_model(self, logpas, entropies, values, rewards, n_workers) -> None: + @time_func_wrapper(show_time=False) + def optimize_model(self, logpas, entropies, values, rewards, n_workers: int) -> None: + print("{0} optimizing model={1}".format(INFO, self.name)) discounts = create_discounts_array(end=len(rewards), base=self.config.gamma, start=0, endpoint=False) # get the discounted returns - discounted_returns = calculate_discounted_returns(rewards, discounts, gamma=self.config.gamma, n_workers=self.config.n_workers) + discounted_returns = calculate_discounted_returns(rewards.detach().numpy(), discounts, n_workers=n_workers) # get the gaes - gaes = generalized_advantage_estimate(rewards=rewards, gamma=self.config.gamma, values=values, - tau=self.config.tau, n_workers=self.config.n_workers) + gaes = generalized_advantage_estimate(rewards=rewards.detach().numpy(), gamma=self.config.gamma, + values=values.detach().numpy(), + tau=self.config.tau, n_workers=self.config.n_workers) # discounted gaes discounted_gaes = discounts[:-1] * gaes @@ -391,8 +320,28 @@ def actions_after_training(self) -> None: self.save_model(path=self.config.save_model_path) def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> None: + """Actions the agent applis after the episode ends + + Parameters + ---------- + env + episode_idx + options + + Returns + ------- + + """ + + episode_info: EpisodeInfo = options["episode_info"] + + buffer: ReplayBuffer = episode_info.info["buffer"] - self.optimize_model(env.n_workers()) + self.optimize_model(rewards=buffer.get_item_as_torch_tensor("reward"), + logpas=buffer.get_torch__tensor_info_item_as_torch_tensor("logprobs"), + values=buffer.get_torch__tensor_info_item_as_torch_tensor("values"), + entropies=buffer.get_torch__tensor_info_item_as_torch_tensor("entropies"), + n_workers=env.n_workers) def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: """Train the algorithm on the episode @@ -438,49 +387,26 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: total_distortion = 0 time_step: VectorTimeStep = env.reset() - states 
= time_step.stack_observations() #torch.from_numpy(time_step.observation.to_numpy()).float() - actions = self._full_pass(states) - - # represent the probabilities under the - # policy - logprobs = [] - entropies = [] - rewards = [] - values = [] + states = time_step.stack_observations() + #torch.from_numpy(time_step.observation.to_numpy()).float() buffer = ReplayBuffer(buffer_size=self.config.n_iterations_per_episode) + for itr in range(self.config.n_iterations_per_episode): - time_step: VectorTimeStep = env.step(actions) + full_pass: _FullPassResult = self._network_pass(states) + time_step: VectorTimeStep = env.step(full_pass.actions) next_states = time_step.stack_observations() - full_pass: _FullPassResult = self._network_pass(next_states) - - # if we finished the episode we go to the optimization - # else step on the envifonment - - - - - - """ - # make a full pass on the model - action, is_exploratory, logprob, entropy, value = self._full_pass(state=states) + buffer.add(state=states, next_state=next_states, + reward=time_step.stack_rewards(), + action=full_pass.actions, + done=time_step.stack_dones(), + info={"values": full_pass.values, + "entropies": full_pass.entropies, + "logprobs": full_pass.logprobs}) - # step with the given actions - time_step: VectorTimeStep = env.step(action) - - episode_score += time_step.reward - total_distortion += time_step.info["total_distortion"] - - state = time_step.stack_observations() - - buffer.add(state=state, action=action, reward=time_step.reward, - next_state=time_step.observation, done=time_step.done) - - """ - - # check if we finished ma + states = next_states if time_step.done: break @@ -489,9 +415,7 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: episode_info = EpisodeInfo(episode_score=episode_score, total_distortion=total_distortion, episode_itrs=episode_iterations, - info={"replay_buffer": buffer, - "logprobs": logprobs, - "values": values}) + info={"buffer": buffer}) return episode_info def _network_pass(self, state) -> _FullPassResult: @@ -510,11 +434,6 @@ def _network_pass(self, state) -> _FullPassResult: # get the logprobs for all batches? logprobs = F.log_softmax(logits.view(-1), dim=0) - #logits = policy #.view(-1) - - ##dist = torch.distributions.Categorical(logits=logits) - ##action = dist.sample() - # choose the action. 
Typically this will be Categorical # but we leave it open for the application # We don't call logits.view(-1) so that we get @@ -523,42 +442,9 @@ def _network_pass(self, state) -> _FullPassResult: # environment worker action_sampler_dist = self.config.action_sampler(logits) actions = action_sampler_dist.sample() - - # the log probabilities of the policy - #logprob = action_sampler_dist.log_prob(action).unsqueeze(-1)#policy.view(-1)[action] entropies = action_sampler_dist.entropy().unsqueeze(-1) - #logprobs.append(logprob_) - - #logpa = dist.log_prob(action).unsqueeze(-1) - #entropy = dist.entropy().unsqueeze(-1) - - #actions = actions.item() if len(action) == 1 else action.data.numpy() - is_exploratory = actions != np.argmax(logits.detach().numpy(), axis=int(len(state) != 1)) full_pass_result = _FullPassResult(logprobs=logprobs, actions=actions, values=values, entropies=entropies) - return full_pass_result #action, is_exploratory, logprob, entropy, value - - """ - def _interaction_step(self, states, env: Env) -> _InteractionResult: - actions, is_exploratory, logpas, entropies, values = self._full_pass(states) - time_step = env.step(actions) - #new_states, rewards, is_terminals, _ = env.step(actions) - - interaction_result = _InteractionResult(logpas=logpas, values=values, - entropies=entropies, rewards=rewards) - return interaction_result - """ - """ - self.logpas.append(logpas); - self.entropies.append(entropies) - self.rewards.append(rewards); - self.values.append(values) - - self.running_reward += rewards - self.running_timestep += 1 - self.running_exploration += is_exploratory[:, np.newaxis].astype(np.int) - - return new_states, is_terminals - """ \ No newline at end of file + return full_pass_result diff --git a/src/spaces/tiled_environment.py b/src/spaces/tiled_environment.py index 8aa43d7..ef905bf 100644 --- a/src/spaces/tiled_environment.py +++ b/src/spaces/tiled_environment.py @@ -4,6 +4,7 @@ import copy import numpy as np +import torch from typing import TypeVar, List, Any from dataclasses import dataclass @@ -373,6 +374,8 @@ def step(self, action: Any, **options) -> TimeStep: # choose the action if isinstance(action, int) or isinstance(action, numpy.int64): action = self.get_action(aidx=action) + elif isinstance(action, torch.Tensor): + action = self.get_action(aidx=action.item()) raw_time_step = self.env.step(action) diff --git a/src/spaces/time_step.py b/src/spaces/time_step.py index 65d94e3..b515818 100644 --- a/src/spaces/time_step.py +++ b/src/spaces/time_step.py @@ -114,11 +114,30 @@ def __getitem__(self, idx) -> TimeStep: """ return self.time_steps[idx] + @property + def done(self) -> bool: + + for step in self.time_steps: + if not step.done: + return False + + return True + def append(self, time_step: TimeStep) -> None: self.time_steps.append(time_step) - def stack_observations(self): + def stack_observations(self) -> np.ndarray: return np.vstack([time_step.observation.to_list() for time_step in self.time_steps]) + def stack_rewards(self) -> np.ndarray: + return np.vstack([time_step.reward for time_step in self.time_steps]) + + def stack_step_type(self) -> list: + return [time_step.step_type for time_step in self.time_steps] + + def stack_dones(self): + return [time_step.done for time_step in self.time_steps] + + diff --git a/src/trainers/pytorch_trainer.py b/src/trainers/pytorch_trainer.py index e64b05e..5b78e53 100644 --- a/src/trainers/pytorch_trainer.py +++ b/src/trainers/pytorch_trainer.py @@ -212,6 +212,8 @@ def train(self): "avg distortion {2}".format(INFO, 
episode_info.episode_score, episode_info.total_distortion / episode_info.episode_itrs)) + self.agent.actions_after_episode_ends(self.env, episode, **{"episode_info": episode_info}) + self.actions_after_training() def actions_after_training(self) -> None: diff --git a/src/utils/replay_buffer.py b/src/utils/replay_buffer.py index bcdd5bb..3b7c40d 100644 --- a/src/utils/replay_buffer.py +++ b/src/utils/replay_buffer.py @@ -83,6 +83,37 @@ def __getitem__(self, name_attr: str) -> List: def to_numpy(self, name_attr: str) -> np.array: return np.array(self[name_attr]) + def get_item_as_torch_tensor(self, name_attr: str) -> torch.Tensor: + """ Returns a torch.Tensor representation of the + the named item + + Parameters + ---------- + + name_attr: The name of the attribute + + Returns + ------- + + An instance of torch.Tensor + """ + + items = self[name_attr] + + # convert to np.array to avoid pytorch warning + return torch.Tensor(np.array(items)) + + def get_torch__tensor_info_item_as_torch_tensor(self, name_attr: str) -> torch. Tensor: + + vals = [] + for item in self._memory: + info = item.info + + if name_attr in info: + vals.append(info[name_attr]) + + return torch.stack(vals) + def add(self, state: Any, action: Any, reward: float, next_state: Any, done: bool, info: dict = {}) -> None: """Add a new experience tuple in the buffer @@ -122,25 +153,7 @@ def sample(self, batch_size: int) -> List[ExperienceTuple]: return random.sample(self._memory, k=batch_size) - def get_item_as_torch_tensor(self, name_attr) -> torch.Tensor: - """ Returns a torch.Tensor representation of the - the named item - Parameters - ---------- - - name_attr: The name of the attribute - - Returns - ------- - - An instance of torch.Tensor - """ - - items = self[name_attr] - - # convert to np.array to avoid pytorch warning - return torch.Tensor(np.array(items)) def reinitialize(self) -> None: """Reinitialize the internal buffer @@ -154,3 +167,5 @@ def reinitialize(self) -> None: self._memory = deque(maxlen=self.capacity) + + From 0a6b6b6d53a59db963ab92d7b7a8d492b1f0e02e Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 14 Apr 2022 07:23:35 +0100 Subject: [PATCH 07/13] NOP --- src/trainers/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/trainers/__init__.py diff --git a/src/trainers/__init__.py b/src/trainers/__init__.py new file mode 100644 index 0000000..e69de29 From c4b553efdecf064f5ed07ae00ca7e227e7ed248d Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 14 Apr 2022 07:24:30 +0100 Subject: [PATCH 08/13] Add loss functions --- src/maths/loss_functions.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 src/maths/loss_functions.py diff --git a/src/maths/loss_functions.py b/src/maths/loss_functions.py new file mode 100644 index 0000000..56559d6 --- /dev/null +++ b/src/maths/loss_functions.py @@ -0,0 +1,13 @@ +"""Module loss_functions. 
Implements basic loss functions +geared towards using PyTorch + +""" + +import torch + + +def mse(returns: torch.Tensor, values: torch.Tensor) -> torch.Tensor: + + value_error = returns - values + loss = value_error.pow(2).mul(0.5).mean() + return loss From 8e50ba33866787c7b68a253a03fdabc8d11a49b1 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 14 Apr 2022 07:25:02 +0100 Subject: [PATCH 09/13] Fix bugs --- src/algorithms/a2c.py | 106 +++++++++++++++++------------- src/examples/a2c_three_columns.py | 4 +- 2 files changed, 62 insertions(+), 48 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 5a36e2a..4d050a9 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -24,6 +24,38 @@ Criteria = TypeVar('Criteria') +@dataclass(init=True, repr=True) +class A2CConfig(object): + """Configuration for A2C algorithm + + """ + + gamma: float = 0.99 + tau: float = 1.2 + beta: float = 1.0 + policy_loss_weight: float = 1.0 + value_loss_weight: float = 1.0 + max_grad_norm: float = 1.0 + n_iterations_per_episode: int = 100 + n_workers: int = 1 + action_sampler: Callable = None + value_function: LossFunction = None + policy_loss: LossFunction = None + batch_size: int = 0 + device: str = 'cpu' + a2cnet: nn.Module = None + save_model_path: Path = None + optimizer_config: PyTorchOptimizerConfig = None + + +@dataclass(init=True, repr=True) +class _FullPassResult(object): + logprobs: torch.Tensor + values: torch.Tensor + actions: torch.Tensor + entropies: torch.Tensor + + def create_discounts_array(end: int, base: float, start=0, endpoint=False): """ @@ -99,50 +131,10 @@ def generalized_advantage_estimate(rewards: List[float], advantages = rewards[:-1] + gamma * values[1:] - values[: -1] # create the GAES by multiplying the tau discounts times the TD errors - gaes = np.array([[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:]) for t in range(total_time)] for w in range(n_workers)]) + gaes = np.array([[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:, w]) for t in range(total_time - 1)] for w in range(n_workers)]) return gaes -@dataclass(init=True, repr=True) -class A2CConfig(object): - """Configuration for A2C algorithm - - """ - - gamma: float = 0.99 - tau: float = 1.2 - beta: float = 1.0 - policy_loss_weight: float = 1.0 - value_loss_weight: float = 1.0 - max_grad_norm: float = 1.0 - n_iterations_per_episode: int = 100 - n_workers: int = 1 - action_sampler: Callable = None - value_function: LossFunction = None - policy_loss: LossFunction = None - batch_size: int = 0 - device: str = 'cpu' - a2cnet: nn.Module = None - save_model_path: Path = None - optimizer_config: PyTorchOptimizerConfig = None - - -@dataclass(init=True, repr=True) -class _InteractionResult(object): - logpas: float - entropies = None - rewards = None - values = None - - -@dataclass(init=True, repr=True) -class _FullPassResult(object): - logprobs: torch.Tensor - values: torch.Tensor - actions: torch.Tensor - entropies: torch.Tensor - - class A2C(Generic[Optimizer]): @staticmethod @@ -255,28 +247,48 @@ def play(self, env: Env, criteria: Criteria): def optimize_model(self, logpas, entropies, values, rewards, n_workers: int) -> None: print("{0} optimizing model={1}".format(INFO, self.name)) + + logpas_ = logpas #torch.stack(logpas).squeeze() + entropies_ = entropies #torch.stack(entropies).squeeze() + values_ = values #torch.stack(values).squeeze() + discounts = create_discounts_array(end=len(rewards), base=self.config.gamma, start=0, endpoint=False) + rewards_ = np.array(rewards).squeeze() + # 
get the discounted returns - discounted_returns = calculate_discounted_returns(rewards.detach().numpy(), discounts, n_workers=n_workers) + discounted_returns = calculate_discounted_returns(rewards_, + discounts, + n_workers=self.config.n_workers) + + np_values = values.data.numpy().squeeze() # get the gaes - gaes = generalized_advantage_estimate(rewards=rewards.detach().numpy(), gamma=self.config.gamma, - values=values.detach().numpy(), + gaes = generalized_advantage_estimate(rewards=rewards_, gamma=self.config.gamma, + values=np_values, tau=self.config.tau, n_workers=self.config.n_workers) # discounted gaes discounted_gaes = discounts[:-1] * gaes + values_ = values_[:-1, ...].view(-1).unsqueeze(1) + #logpas_ = logpas_.view(-1).unsqueeze(1) + entropies_ = entropies_.view(-1).unsqueeze(1) + returns_ = torch.FloatTensor(discounted_returns.T[:-1]).view(-1).unsqueeze(1) + discounted_gaes = torch.FloatTensor(discounted_gaes.T)#.view(-1).unsqueeze(1) + # the loss function for the critic network - value_loss_function = mse(returns=discounted_returns, values=values) - policy_loss = - (discounted_gaes * logpas).mean() + #if not isinstance(discounted_returns, torch.Tensor): + # discounted_returns = torch.from_numpy(discounted_returns) + + value_loss_function = mse(returns=returns_.detach(), values=values_) + policy_loss = - (discounted_gaes.detach() * logpas_).mean() # compute a total loss function to minimize if self.config.beta is not None: # add entropy loss - entropy_loss = -entropies.mean() + entropy_loss = -entropies_.mean() loss = self.config.policy_loss_weight * policy_loss + \ self.config.value_loss_weight * value_loss_function + \ diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py index 99a7993..00af006 100644 --- a/src/examples/a2c_three_columns.py +++ b/src/examples/a2c_three_columns.py @@ -26,6 +26,7 @@ N_LAYERS = 1 N_BINS = 10 N_EPISODES = 10 +N_WORKERS = 2 OUTPUT_MSG_FREQUENCY = 100 GAMMA = 0.99 ALPHA = 0.1 @@ -172,6 +173,7 @@ def action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: # agent configuration a2c_config = A2CConfig(action_sampler=action_sampler, n_iterations_per_episode=N_ITRS_PER_EPISODE, a2cnet=net, save_model_path=Path("./a2c_three_columns_output/"), + n_workers=N_WORKERS, optimizer_config=PyTorchOptimizerConfig(optimizer_type=OptimizerType.ADAM)) # create the agent @@ -180,7 +182,7 @@ def action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: # create a trainer to train the Qlearning agent configuration = PyTorchTrainerConfig(n_episodes=N_EPISODES) - env = MultiprocessEnv(env_builder=load_discrete_env, env_args={}, n_workers=2) + env = MultiprocessEnv(env_builder=load_discrete_env, env_args={}, n_workers=N_WORKERS) try: From 3e99824eb31a57ffe902b6c5b70376464590eb24 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 Apr 2022 09:47:49 +0100 Subject: [PATCH 10/13] #79 Refactor API --- src/utils/replay_buffer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/replay_buffer.py b/src/utils/replay_buffer.py index 3b7c40d..e4e8432 100644 --- a/src/utils/replay_buffer.py +++ b/src/utils/replay_buffer.py @@ -114,8 +114,8 @@ def get_torch__tensor_info_item_as_torch_tensor(self, name_attr: str) -> torch. 
return torch.stack(vals) - def add(self, state: Any, action: Any, reward: float, - next_state: Any, done: bool, info: dict = {}) -> None: + def add(self, state: Any, action: Any, reward: Any, + next_state: Any, done: Any, info: dict = {}) -> None: """Add a new experience tuple in the buffer Parameters From 17850555765d173270a12766a955d5234c61e9c8 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 Apr 2022 10:02:14 +0100 Subject: [PATCH 11/13] #79 Refactor implementation --- src/algorithms/a2c.py | 395 +++++++++++++++++++++++------------------- 1 file changed, 218 insertions(+), 177 deletions(-) diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 4d050a9..3d38035 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -14,6 +14,7 @@ from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder from src.maths.loss_functions import mse +from src.exceptions.exceptions import InvalidParamValue Env = TypeVar("Env") Optimizer = TypeVar("Optimizer") @@ -49,7 +50,7 @@ class A2CConfig(object): @dataclass(init=True, repr=True) -class _FullPassResult(object): +class _ActResult(object): logprobs: torch.Tensor values: torch.Tensor actions: torch.Tensor @@ -73,13 +74,13 @@ def create_discounts_array(end: int, base: float, start=0, endpoint=False): return np.logspace(start, end, num=end, base=base, endpoint=endpoint) -def calculate_discounted_returns(rewards: List[float], discounts: List[float], n_workers: int = 1) -> np.array: +def calculate_discounted_returns(rewards: np.array, discounts: np.array, n_workers: int = 1) -> np.array: """Calculate the discounted returns from the episode rewards Parameters ---------- rewards: The list of rewards - gamma: The discount factor + discounts: The discount factor n_workers: The number of workers Returns @@ -93,48 +94,12 @@ def calculate_discounted_returns(rewards: List[float], discounts: List[float], n # Return numbers spaced evenly on a log scale. # In linear space, the sequence starts at base ** start # (base to the power of start) and ends with base ** stop (see endpoint below). 
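The discounted return computation documented just above is easier to check with a small, self contained example. The sketch below mirrors what create_discounts_array and calculate_discounted_returns do for a vector environment; the name discounted_returns_sketch, the (T, n_workers) reward layout and the toy numbers are illustrative assumptions read off this patch series, not repository code.

import numpy as np


def discounted_returns_sketch(rewards: np.ndarray, gamma: float) -> np.ndarray:
    # rewards is assumed to be shaped (T, n_workers): one row of rewards per time step
    T, n_workers = rewards.shape

    # [gamma^0, gamma^1, ..., gamma^(T-1)], the vector create_discounts_array builds
    discounts = np.logspace(0, T, num=T, base=gamma, endpoint=False)

    # G_t(w) = sum_k gamma^k * r_{t+k}(w), computed independently for every worker w
    return np.array([[np.sum(discounts[:T - t] * rewards[t:, w]) for t in range(T)]
                     for w in range(n_workers)])  # shape (n_workers, T)


# toy rollout with T = 3 steps and 2 workers; with gamma close to 1 the
# first column approaches each worker's plain reward sum
rewards = np.array([[1.0, 0.5],
                    [0.0, 0.5],
                    [2.0, 0.5]])
print(discounted_returns_sketch(rewards, gamma=0.99))

Keeping this step in NumPy and converting to a torch tensor only when the loss is assembled appears to be the intent of the refactor further down.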
- #discounts = np.logspace(0, total_time, num=total_time, base=gamma, endpoint=False) - # The return is the sum of discounted rewards from step until the # final step T returns = np.array([[np.sum(discounts[: total_time - t] * rewards[t:, w]) for t in range(total_time)] for w in range(n_workers)]) return returns -def generalized_advantage_estimate(rewards: List[float], - values: List[float], - gamma: float, tau: float, - n_workers: int) -> np.array: - """Computes an estimate of the advantage funcion - - Parameters - ---------- - rewards - values - gamma - tau - n_workers: The number of workers - - Returns - ------- - - """ - - # T - total_time = len(rewards) - - # (gamma*tau)^t - tau_discounts = np.logspace(0, total_time - 1, num=total_time-1, - base=gamma*tau, endpoint=False) - - # create TD errors: R_t + gamma*V_{t+1} - V_t for t=0 to T - advantages = rewards[:-1] + gamma * values[1:] - values[: -1] - - # create the GAES by multiplying the tau discounts times the TD errors - gaes = np.array([[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:, w]) for t in range(total_time - 1)] for w in range(n_workers)]) - return gaes - - class A2C(Generic[Optimizer]): @staticmethod @@ -184,129 +149,12 @@ def parameters(self) -> Any: """ return self.a2c_net.parameters() - def set_train_mode(self) -> None: - """Set the model to a training mode - - Returns - ------- - None - """ - self.a2c_net.train() - - def set_test_mode(self) -> None: - """Set the model to a testing mode - - Returns - ------- - None - """ - self.a2c_net.eval() - - def save_model(self, path: Path) -> None: - """Save the model on a file at the given path - - Parameters - ---------- - path: The path to save the model - - Returns - ------- - - None - """ - torch.save(self.a2c_net.state_dict(), Path(str(path) + "/" + self.name + ".pt")) - - def play(self, env: Env, criteria: Criteria): - """Play the agent on the environment - - Parameters - ---------- - env: The environment to test/play the agent - criteria: The criteria to stop the game - - Returns - ------- - - """ - - time_step = env.reset() - - while criteria.continue_itrs(): - state = time_step.observation.to_ndarray() - state = torch.from_numpy(state).float() - logits, values = self(state) - - # select action - action = None - time_step = env.step(action) - - if time_step.done: - time_step = env.reset() - - @time_func_wrapper(show_time=False) - def optimize_model(self, logpas, entropies, values, rewards, n_workers: int) -> None: - - print("{0} optimizing model={1}".format(INFO, self.name)) - - logpas_ = logpas #torch.stack(logpas).squeeze() - entropies_ = entropies #torch.stack(entropies).squeeze() - values_ = values #torch.stack(values).squeeze() - - discounts = create_discounts_array(end=len(rewards), base=self.config.gamma, start=0, endpoint=False) - - rewards_ = np.array(rewards).squeeze() - - # get the discounted returns - discounted_returns = calculate_discounted_returns(rewards_, - discounts, - n_workers=self.config.n_workers) - - np_values = values.data.numpy().squeeze() - - # get the gaes - gaes = generalized_advantage_estimate(rewards=rewards_, gamma=self.config.gamma, - values=np_values, - tau=self.config.tau, n_workers=self.config.n_workers) - - # discounted gaes - discounted_gaes = discounts[:-1] * gaes - - values_ = values_[:-1, ...].view(-1).unsqueeze(1) - #logpas_ = logpas_.view(-1).unsqueeze(1) - entropies_ = entropies_.view(-1).unsqueeze(1) - returns_ = torch.FloatTensor(discounted_returns.T[:-1]).view(-1).unsqueeze(1) - discounted_gaes = 
torch.FloatTensor(discounted_gaes.T)#.view(-1).unsqueeze(1) - - # the loss function for the critic network - #if not isinstance(discounted_returns, torch.Tensor): - # discounted_returns = torch.from_numpy(discounted_returns) - - value_loss_function = mse(returns=returns_.detach(), values=values_) - policy_loss = - (discounted_gaes.detach() * logpas_).mean() - - # compute a total loss function to minimize - if self.config.beta is not None: - - # add entropy loss - entropy_loss = -entropies_.mean() - - loss = self.config.policy_loss_weight * policy_loss + \ - self.config.value_loss_weight * value_loss_function + \ - self.config.beta * entropy_loss - else: - loss = self.config.policy_loss_weight * policy_loss + \ - self.config.value_loss_weight * value_loss_function - - self.optimizer.zero_grad() - loss.backward() - - # clip the grad if needed - torch.nn.utils.clip_grad_norm_(self.parameters(), - self.config.max_grad_norm) - self.optimizer.step() - def actions_before_training_begins(self, env: Env, **options) -> None: + if env.n_workers != self.config.n_workers: + raise InvalidParamValue(param_name="self.config.n_workers", + param_value=str(self.config.n_workers) + " not equal to " + str(env.n_workers)) + # build the optimizer we need in order to train the model self.optimizer = pytorch_optimizer_builder(opt_type=self.config.optimizer_config.optimizer_type, model_params=self.parameters(), @@ -349,11 +197,12 @@ def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> N buffer: ReplayBuffer = episode_info.info["buffer"] - self.optimize_model(rewards=buffer.get_item_as_torch_tensor("reward"), - logpas=buffer.get_torch__tensor_info_item_as_torch_tensor("logprobs"), - values=buffer.get_torch__tensor_info_item_as_torch_tensor("values"), - entropies=buffer.get_torch__tensor_info_item_as_torch_tensor("entropies"), - n_workers=env.n_workers) + reward = buffer.get_item_as_torch_tensor("reward"), + + self._optimize_model(rewards=buffer.get_item_as_torch_tensor("reward"), + logprobs=buffer.get_torch__tensor_info_item_as_torch_tensor("logprobs"), + values=buffer.get_torch__tensor_info_item_as_torch_tensor("values"), + entropies=buffer.get_torch__tensor_info_item_as_torch_tensor("entropies")) def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: """Train the algorithm on the episode @@ -400,23 +249,27 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: time_step: VectorTimeStep = env.reset() states = time_step.stack_observations() - #torch.from_numpy(time_step.observation.to_numpy()).float() buffer = ReplayBuffer(buffer_size=self.config.n_iterations_per_episode) for itr in range(self.config.n_iterations_per_episode): - full_pass: _FullPassResult = self._network_pass(states) - time_step: VectorTimeStep = env.step(full_pass.actions) + act_result: _ActResult = self._act(states) + time_step: VectorTimeStep = env.step(act_result.actions) next_states = time_step.stack_observations() + reward = time_step.stack_rewards() + + episode_score += np.mean(reward) + + # append the roll outs buffer.add(state=states, next_state=next_states, reward=time_step.stack_rewards(), - action=full_pass.actions, + action=act_result.actions, done=time_step.stack_dones(), - info={"values": full_pass.values, - "entropies": full_pass.entropies, - "logprobs": full_pass.logprobs}) + info={"values": act_result.values, + "entropies": act_result.entropies, + "logprobs": act_result.logprobs}) states = next_states @@ -425,12 +278,26 @@ def _do_train(self, env: Env, 
episode_idx: int, **options) -> EpisodeInfo: episode_iterations += 1 + # what do we do with this? + last_states = states + episode_info = EpisodeInfo(episode_score=episode_score, total_distortion=total_distortion, episode_itrs=episode_iterations, - info={"buffer": buffer}) + info={"buffer": buffer, "last_states": last_states}) return episode_info - def _network_pass(self, state) -> _FullPassResult: + def _act(self, state) -> _ActResult: + """The agent acts on the presented state by + choosing the actions + + Parameters + ---------- + state + + Returns + ------- + + """ if not isinstance(state, torch.Tensor): torch_state = torch.Tensor(state) @@ -444,7 +311,7 @@ def _network_pass(self, state) -> _FullPassResult: # log_softmax may not sum up to one # and can be negative as well # get the logprobs for all batches? - logprobs = F.log_softmax(logits.view(-1), dim=0) + #logprobs = F.log_softmax(logits.view(-1), dim=0) # choose the action. Typically this will be Categorical # but we leave it open for the application @@ -454,9 +321,183 @@ def _network_pass(self, state) -> _FullPassResult: # environment worker action_sampler_dist = self.config.action_sampler(logits) actions = action_sampler_dist.sample() + log_probs = action_sampler_dist.log_prob(actions) entropies = action_sampler_dist.entropy().unsqueeze(-1) - full_pass_result = _FullPassResult(logprobs=logprobs, actions=actions, - values=values, entropies=entropies) + full_pass_result = _ActResult(logprobs=log_probs, actions=actions, + values=values, entropies=entropies) return full_pass_result + + def _compute_advantages(self, rewards: np.array, values: np.array) -> np.array: + """Computes an estimate of the advantage function + + Parameters + ---------- + rewards: The rewards + values: The value estimates on the rollout + + Returns + ------- + + A numpy array representing the advantage estimate + """ + # T + total_time = len(rewards) + + # (gamma*tau)^t + tau_discounts = np.logspace(0, total_time - 1, num=total_time - 1, + base=self.config.gamma * self.config.tau, endpoint=False) + + rewards_ = rewards.squeeze() + values_ = values.squeeze() + + # create TD errors: R_t + gamma*V_{t+1} - V_t for t=0 to T + advantages = rewards_[:-1] + self.config.gamma * values_[1:] - values_[: -1] + + # create the GAES by multiplying the tau discounts times the TD errors + gaes = np.array( + [[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:, w]) for t in range(total_time - 1 )] for w in + range(self.config.n_workers)]) + return gaes + + def _compute_loss_function(self, advantages: torch.Tensor, logprobs: torch.Tensor, + returns: torch.Tensor, values: torch.Tensor, entropies: torch.Tensor) -> torch.Tensor: + """compute the loss mixture function + + Parameters + ---------- + + advantages: The advantage estimates + logprobs: The log probabilities + returns: The discounted returns + values: The value function + entropies: The entropies + + Returns + ------- + + A tensor representing the mixed loss function + """ + + value_loss_function = mse(returns=returns, values=values) + policy_loss = - (advantages.T * logprobs).mean() + + # compute a total loss function to minimize + if self.config.beta is not None: + + # add entropy loss + entropy_loss = -entropies.mean() + + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + \ + self.config.beta * entropy_loss + else: + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + + return loss + + 
@time_func_wrapper(show_time=False) + def _optimize_model(self, logprobs: torch.Tensor, entropies: torch.Tensor, values: torch.Tensor, + rewards: torch.Tensor) -> None: + """Optimize the model + + Parameters + ---------- + logprobs + entropies + values + rewards + + Returns + ------- + + """ + + print("{0} optimizing model={1}".format(INFO, self.name)) + + discounts: np.array = create_discounts_array(end=len(rewards), + base=self.config.gamma, start=0, endpoint=False) + + # get the discounted returns + discounted_returns: np.array = calculate_discounted_returns(rewards.numpy(), + discounts, + n_workers=self.config.n_workers) + + advantages: np.array = self._compute_advantages(rewards=rewards.numpy(), + values=values.detach().numpy()) + + loss: torch.Tensor = self._compute_loss_function(advantages=torch.from_numpy(advantages), values=values, + entropies=entropies, + returns=torch.from_numpy(discounted_returns), + logprobs=logprobs[:-1]) + + self.optimizer.zero_grad() + loss.backward() + + # clip the grad if needed + torch.nn.utils.clip_grad_norm_(self.parameters(), + self.config.max_grad_norm) + self.optimizer.step() + + print("{0} Finished optimization step....".format(INFO)) + + def set_train_mode(self) -> None: + """Set the model to a training mode + + Returns + ------- + None + """ + self.a2c_net.train() + + def set_test_mode(self) -> None: + """Set the model to a testing mode + + Returns + ------- + None + """ + self.a2c_net.eval() + + def save_model(self, path: Path) -> None: + """Save the model on a file at the given path + + Parameters + ---------- + path: The path to save the model + + Returns + ------- + + None + """ + torch.save(self.a2c_net.state_dict(), Path(str(path) + "/" + self.name + ".pt")) + + def play(self, env: Env, criteria: Criteria): + """Play the agent on the environment + + Parameters + ---------- + env: The environment to test/play the agent + criteria: The criteria to stop the game + + Returns + ------- + + """ + + time_step = env.reset() + + while criteria.continue_itrs(): + state = time_step.observation.to_ndarray() + state = torch.from_numpy(state).float() + logits, values = self(state) + + # select action + action = None + time_step = env.step(action) + + if time_step.done: + time_step = env.reset() From 940ccb2c7c1db08d1f1855086b93f2f552510ba5 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 Apr 2022 10:22:51 +0100 Subject: [PATCH 12/13] #79 Refactor documentation --- docs/source/API/{ => algorithms}/a2c.rst | 0 docs/source/API/{ => algorithms}/q_learning.rst | 0 .../API/{ => policies}/epsilon_greedy_policy.rst | 0 docs/source/API/trainers/pytorch_trainer.rst | 10 ++++++++++ docs/source/API/{ => trainers}/trainer.rst | 0 docs/source/modules.rst | 13 ++++++++----- 6 files changed, 18 insertions(+), 5 deletions(-) rename docs/source/API/{ => algorithms}/a2c.rst (100%) rename docs/source/API/{ => algorithms}/q_learning.rst (100%) rename docs/source/API/{ => policies}/epsilon_greedy_policy.rst (100%) create mode 100644 docs/source/API/trainers/pytorch_trainer.rst rename docs/source/API/{ => trainers}/trainer.rst (100%) diff --git a/docs/source/API/a2c.rst b/docs/source/API/algorithms/a2c.rst similarity index 100% rename from docs/source/API/a2c.rst rename to docs/source/API/algorithms/a2c.rst diff --git a/docs/source/API/q_learning.rst b/docs/source/API/algorithms/q_learning.rst similarity index 100% rename from docs/source/API/q_learning.rst rename to docs/source/API/algorithms/q_learning.rst diff --git a/docs/source/API/epsilon_greedy_policy.rst 
b/docs/source/API/policies/epsilon_greedy_policy.rst similarity index 100% rename from docs/source/API/epsilon_greedy_policy.rst rename to docs/source/API/policies/epsilon_greedy_policy.rst diff --git a/docs/source/API/trainers/pytorch_trainer.rst b/docs/source/API/trainers/pytorch_trainer.rst new file mode 100644 index 0000000..683462e --- /dev/null +++ b/docs/source/API/trainers/pytorch_trainer.rst @@ -0,0 +1,10 @@ +pytorch\_trainer +================ + +.. automodule:: pytorch_trainer + :members: worker + +.. autoclass:: PyTorchTrainerConfig + +.. autoclass:: PyTorchTrainer + :members: __init__, avg_rewards, avg_distortion, actions_before_training, actions_before_episode_begins, actions_after_episode_ends, train, actions_after_training diff --git a/docs/source/API/trainer.rst b/docs/source/API/trainers/trainer.rst similarity index 100% rename from docs/source/API/trainer.rst rename to docs/source/API/trainers/trainer.rst diff --git a/docs/source/modules.rst b/docs/source/modules.rst index fed6296..4b0e26c 100644 --- a/docs/source/modules.rst +++ b/docs/source/modules.rst @@ -4,15 +4,18 @@ API .. toctree:: :maxdepth: 4 - API/epsilon_greedy_policy - API/epsilon_greedy_q_estimator - API/q_learning - API/trainer + + API/algorithms/epsilon_greedy_q_estimator + API/algorithms/a2c + API/algorithms/q_learning + API/trainers/trainer + API/trainers/pytorch_trainer API/datasets/column_type API/exceptions/exceptions API/maths/optimizer_type API/maths/pytorch_optimizer_builder API/networks/a2c_networks + API/policies/epsilon_greedy_policy API/spaces/actions API/spaces/action_space API/spaces/state @@ -20,7 +23,7 @@ API API/spaces/tiled_environment API/spaces/time_step API/replay_buffer - API/a2c + From 34e2e08f81ad50a781863a6966241664ada06c91 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 Apr 2022 10:32:45 +0100 Subject: [PATCH 13/13] #79 Update documentation API --- docs/source/API/algorithms/a2c.rst | 12 +++++------- docs/source/API/epsilon_greedy_q_estimator.rst | 9 --------- docs/source/conf.py | 1 + docs/source/modules.rst | 1 - 4 files changed, 6 insertions(+), 17 deletions(-) delete mode 100644 docs/source/API/epsilon_greedy_q_estimator.rst diff --git a/docs/source/API/algorithms/a2c.rst b/docs/source/API/algorithms/a2c.rst index 08232e2..c31a12b 100644 --- a/docs/source/API/algorithms/a2c.rst +++ b/docs/source/API/algorithms/a2c.rst @@ -1,13 +1,11 @@ a2c === .. automodule:: a2c -.. autoclass:: A2CNetBase - :members: __init__, forward - -.. autoclass:: A2CNet - :members: __init__, forward - + :members: create_discounts_array, calculate_discounted_returns + .. autoclass:: A2CConfig + +.. autoclass:: _ActResult .. autoclass:: A2C - :members: update_parameters, __init__, share_memory, parameters, on_episode, _do_train + :members: __init__, share_memory, parameters, on_episode, default_action_sampler, from_path, _do_train diff --git a/docs/source/API/epsilon_greedy_q_estimator.rst b/docs/source/API/epsilon_greedy_q_estimator.rst deleted file mode 100644 index b58d229..0000000 --- a/docs/source/API/epsilon_greedy_q_estimator.rst +++ /dev/null @@ -1,9 +0,0 @@ -epsilon\_greedy\_q\_estimator -============================= - -.. automodule:: epsilon_greedy_q_estimator - -.. autoclass:: EpsilonGreedyQEstimatorConfig - -.. 
autoclass:: EpsilonGreedyQEstimator
-   :members: __init__, q_hat_value, update_weights, on_state
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 94aa0a3..8ddf75a 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -22,6 +22,7 @@
 sys.path.append(os.path.abspath("../../src/utils/"))
 sys.path.append(os.path.abspath("../../src/datasets/"))
 sys.path.append(os.path.abspath("../../src/networks/"))
+sys.path.append(os.path.abspath("../../src/trainers/"))
 print(sys.path)
diff --git a/docs/source/modules.rst b/docs/source/modules.rst
index 4b0e26c..57a2ad9 100644
--- a/docs/source/modules.rst
+++ b/docs/source/modules.rst
@@ -4,7 +4,6 @@ API
 .. toctree::
    :maxdepth: 4
-   API/algorithms/epsilon_greedy_q_estimator
    API/algorithms/a2c
    API/algorithms/q_learning
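Read together, patches 09 through 11 reduce the A2C update to three steps: stack the per step log probabilities, values and entropies collected in the ReplayBuffer, estimate advantages with a generalized advantage estimate, and minimise a weighted sum of policy, value and entropy losses. The sketch below condenses that logic so the refactor is easier to follow; the function names, the default weights and the assumption that rewards and values arrive as (T, n_workers) arrays are illustrative choices inferred from the diffs, not code taken from the repository.

import numpy as np
import torch


def gae_sketch(rewards: np.ndarray, values: np.ndarray, gamma: float, tau: float) -> np.ndarray:
    # rewards and values are assumed to be (T, n_workers) arrays from one rollout
    T, n_workers = rewards.shape

    # (gamma * tau)^k weights for k = 0 .. T-2
    tau_discounts = np.logspace(0, T - 1, num=T - 1, base=gamma * tau, endpoint=False)

    # one step TD errors: r_t + gamma * V_{t+1} - V_t
    td_errors = rewards[:-1] + gamma * values[1:] - values[:-1]

    # discounted sums of the TD errors, per worker
    return np.array([[np.sum(tau_discounts[:T - 1 - t] * td_errors[t:, w]) for t in range(T - 1)]
                     for w in range(n_workers)])


def a2c_loss_sketch(advantages: torch.Tensor, logprobs: torch.Tensor, returns: torch.Tensor,
                    values: torch.Tensor, entropies: torch.Tensor,
                    policy_weight: float = 1.0, value_weight: float = 1.0,
                    beta: float = 0.001) -> torch.Tensor:
    # critic term: 0.5 * (G_t - V_t)^2, the same form as maths.loss_functions.mse
    value_loss = (returns.detach() - values).pow(2).mul(0.5).mean()

    # actor term: advantages are treated as constants, only log pi(a|s) carries gradients
    policy_loss = -(advantages.detach() * logprobs).mean()

    # entropy bonus encourages exploration; beta scales its contribution
    entropy_loss = -entropies.mean()

    return policy_weight * policy_loss + value_weight * value_loss + beta * entropy_loss

With these two pieces, a single loss.backward() followed by gradient clipping and an optimizer step, issued once per episode from actions_after_episode_ends, trains both heads of the shared actor critic network.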