diff --git a/docs/source/API/a2c.rst b/docs/source/API/a2c.rst deleted file mode 100644 index 08232e2..0000000 --- a/docs/source/API/a2c.rst +++ /dev/null @@ -1,13 +0,0 @@ -a2c -=== -.. automodule:: a2c -.. autoclass:: A2CNetBase - :members: __init__, forward - -.. autoclass:: A2CNet - :members: __init__, forward - -.. autoclass:: A2CConfig - -.. autoclass:: A2C - :members: update_parameters, __init__, share_memory, parameters, on_episode, _do_train diff --git a/docs/source/API/algorithms/a2c.rst b/docs/source/API/algorithms/a2c.rst new file mode 100644 index 0000000..c31a12b --- /dev/null +++ b/docs/source/API/algorithms/a2c.rst @@ -0,0 +1,11 @@ +a2c +=== +.. automodule:: a2c + :members: create_discounts_array, calculate_discounted_returns + +.. autoclass:: A2CConfig + +.. autoclass:: _ActResult + +.. autoclass:: A2C + :members: __init__, share_memory, parameters, on_episode, default_action_sampler, from_path, _do_train diff --git a/docs/source/API/q_learning.rst b/docs/source/API/algorithms/q_learning.rst similarity index 100% rename from docs/source/API/q_learning.rst rename to docs/source/API/algorithms/q_learning.rst diff --git a/docs/source/API/epsilon_greedy_q_estimator.rst b/docs/source/API/epsilon_greedy_q_estimator.rst deleted file mode 100644 index b58d229..0000000 --- a/docs/source/API/epsilon_greedy_q_estimator.rst +++ /dev/null @@ -1,9 +0,0 @@ -epsilon\_greedy\_q\_estimator -============================= - -.. automodule:: epsilon_greedy_q_estimator - -.. autoclass:: EpsilonGreedyQEstimatorConfig - -.. autoclass:: EpsilonGreedyQEstimator - :members: __init__, q_hat_value, update_weights, on_state diff --git a/docs/source/API/epsilon_greedy_policy.rst b/docs/source/API/policies/epsilon_greedy_policy.rst similarity index 100% rename from docs/source/API/epsilon_greedy_policy.rst rename to docs/source/API/policies/epsilon_greedy_policy.rst diff --git a/docs/source/API/trainers/pytorch_trainer.rst b/docs/source/API/trainers/pytorch_trainer.rst new file mode 100644 index 0000000..683462e --- /dev/null +++ b/docs/source/API/trainers/pytorch_trainer.rst @@ -0,0 +1,10 @@ +pytorch\_trainer +================ + +.. automodule:: pytorch_trainer + :members: worker + +.. autoclass:: PyTorchTrainerConfig + +.. autoclass:: PyTorchTrainer + :members: __init__, avg_rewards, avg_distortion, actions_before_training, actions_before_episode_begins, actions_after_episode_ends, train, actions_after_training diff --git a/docs/source/API/trainer.rst b/docs/source/API/trainers/trainer.rst similarity index 100% rename from docs/source/API/trainer.rst rename to docs/source/API/trainers/trainer.rst diff --git a/docs/source/conf.py b/docs/source/conf.py index 94aa0a3..8ddf75a 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -22,6 +22,7 @@ sys.path.append(os.path.abspath("../../src/utils/")) sys.path.append(os.path.abspath("../../src/datasets/")) sys.path.append(os.path.abspath("../../src/networks/")) +sys.path.append(os.path.abspath("../../src/trainers/")) print(sys.path) diff --git a/docs/source/modules.rst b/docs/source/modules.rst index fed6296..57a2ad9 100644 --- a/docs/source/modules.rst +++ b/docs/source/modules.rst @@ -4,15 +4,17 @@ API .. 
toctree:: :maxdepth: 4 - API/epsilon_greedy_policy - API/epsilon_greedy_q_estimator - API/q_learning - API/trainer + API/algorithms/epsilon_greedy_q_estimator + API/algorithms/a2c + API/algorithms/q_learning + API/trainers/trainer + API/trainers/pytorch_trainer API/datasets/column_type API/exceptions/exceptions API/maths/optimizer_type API/maths/pytorch_optimizer_builder API/networks/a2c_networks + API/policies/epsilon_greedy_policy API/spaces/actions API/spaces/action_space API/spaces/state @@ -20,7 +22,7 @@ API API/spaces/tiled_environment API/spaces/time_step API/replay_buffer - API/a2c + diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 9c2fe82..3d38035 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -1,16 +1,20 @@ import numpy as np -from typing import TypeVar, Generic, Any, Callable +from typing import TypeVar, Generic, Any, Callable, List from pathlib import Path import torch import torch.nn as nn import torch.nn.functional as F from dataclasses import dataclass - -from src.utils.experience_buffer import unpack_batch +from src.utils import INFO from src.utils.episode_info import EpisodeInfo from src.utils.function_wraps import time_func_wrapper from src.utils.replay_buffer import ReplayBuffer +from src.spaces.time_step import VectorTimeStep +from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig +from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder +from src.maths.loss_functions import mse +from src.exceptions.exceptions import InvalidParamValue Env = TypeVar("Env") Optimizer = TypeVar("Optimizer") @@ -21,35 +25,6 @@ Criteria = TypeVar('Criteria') -class A2CNetBase(nn.Module): - """Base class for A2C networks - - """ - - def __init__(self, architecture): - super(A2CNetBase, self).__init__() - self.architecture = architecture - - def forward(self, x): - return self.architecture(x) - - -class A2CNet(nn.Module): - - def __init__(self, common_net: A2CNetBase, policy_net: A2CNetBase, value_net: A2CNetBase): - super(A2CNet, self).__init__() - self.common_net = common_net - self.policy_net = policy_net - self.value_net = value_net - - def forward(self, x): - x = self.common_net(x) - - pol_out = self.policy_net(x) - val_out = self.value_net(x) - return pol_out, val_out - - @dataclass(init=True, repr=True) class A2CConfig(object): """Configuration for A2C algorithm @@ -58,59 +33,80 @@ class A2CConfig(object): gamma: float = 0.99 tau: float = 1.2 + beta: float = 1.0 + policy_loss_weight: float = 1.0 + value_loss_weight: float = 1.0 + max_grad_norm: float = 1.0 n_iterations_per_episode: int = 100 + n_workers: int = 1 action_sampler: Callable = None - loss_function: LossFunction = None + value_function: LossFunction = None + policy_loss: LossFunction = None batch_size: int = 0 device: str = 'cpu' a2cnet: nn.Module = None save_model_path: Path = None + optimizer_config: PyTorchOptimizerConfig = None -class A2C(Generic[Optimizer]): +@dataclass(init=True, repr=True) +class _ActResult(object): + logprobs: torch.Tensor + values: torch.Tensor + actions: torch.Tensor + entropies: torch.Tensor - @staticmethod - def update_parameters(optimizer: Optimizer, episode_info: EpisodeInfo, *, config: A2CConfig): - """Update the parameters - Parameters - ---------- +def create_discounts_array(end: int, base: float, start=0, endpoint=False): + """ - optimizer: The optimizer instance used in training - episode_info: The episode info - config: The training configuration + Parameters + ---------- + end + base + start + endpoint - Returns - ------- 
+ Returns + ------- - """ + """ + return np.logspace(start, end, num=end, base=base, endpoint=endpoint) - # unroll the batch notice the flip we go in reverse - rewards = rewards = torch.Tensor(episode_info.info["replay_buffer"].to_numpy("reward")).flip(dims=(0,)).view(-1) - logprobs = torch.stack(episode_info.info["logprobs"]).flip(dims=(0,)).view(-1) - values = torch.stack(episode_info.info["values"]).flip(dims=(0,)).view(-1) - returns = [] - ret_ = torch.Tensor([0]) +def calculate_discounted_returns(rewards: np.array, discounts: np.array, n_workers: int = 1) -> np.array: + """Calculate the discounted returns from the episode rewards - # Loop through the rewards in reverse order to generate - # R = r i + γ * R - for r in range(rewards.shape[0]): # B - ret_ = rewards[r] + config.gamma * ret_ - returns.append(ret_) + Parameters + ---------- + rewards: The list of rewards + discounts: The discount factor + n_workers: The number of workers - returns = torch.stack(returns).view(-1) - returns = F.normalize(returns, dim=0) + Returns + ------- - # compute the actor loss. - # Minimize the actor loss: –1 * γ t * (R – v(s t )) * π (a ⏐ s) - actor_loss = -1 * logprobs * (returns - values.detach()) # C + """ - # compute the critic loss. Minimize the critic loss: (R – v) 2 . - critic_loss = torch.pow(values - returns, 2) # D - loss = actor_loss.sum() + config.tau * critic_loss.sum() # E - loss.backward() - optimizer.step() + # T + total_time = len(rewards) + + # Return numbers spaced evenly on a log scale. + # In linear space, the sequence starts at base ** start + # (base to the power of start) and ends with base ** stop (see endpoint below). + # The return is the sum of discounted rewards from step until the + # final step T + returns = np.array([[np.sum(discounts[: total_time - t] * rewards[t:, w]) for t in range(total_time)] for w in range(n_workers)]) + return returns + + +class A2C(Generic[Optimizer]): + + @staticmethod + def default_action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: + + action_dist = torch.distributions.Categorical(logits=logits) + return action_dist @classmethod def from_path(cls, config: A2CConfig, path: Path): @@ -134,6 +130,7 @@ def from_path(cls, config: A2CConfig, path: Path): def __init__(self, config: A2CConfig): self.config: A2CConfig = config + self.optimizer: Optimizer = None self.name = "A2C" @property @@ -143,17 +140,6 @@ def a2c_net(self) -> nn.Module: def __call__(self, x: torch.Tensor): return self.a2c_net(x) - def share_memory(self) -> None: - """Instruct the underlying network to - set up what is needed to share memory - - Returns - ------- - - None - """ - self.a2c_net.share_memory() - def parameters(self) -> Any: """The parameters of the underlying model @@ -163,75 +149,60 @@ def parameters(self) -> Any: """ return self.a2c_net.parameters() - def set_train_mode(self) -> None: - """Set the model to a training mode + def actions_before_training_begins(self, env: Env, **options) -> None: - Returns - ------- - None - """ - self.a2c_net.train() + if env.n_workers != self.config.n_workers: + raise InvalidParamValue(param_name="self.config.n_workers", + param_value=str(self.config.n_workers) + " not equal to " + str(env.n_workers)) - def set_test_mode(self) -> None: - """Set the model to a testing mode + # build the optimizer we need in order to train the model + self.optimizer = pytorch_optimizer_builder(opt_type=self.config.optimizer_config.optimizer_type, + model_params=self.parameters(), + **self.config.optimizer_config.as_dict()) - 
Returns - ------- - None - """ - self.a2c_net.eval() + if self.config.action_sampler is None: + self.config.action_sampler = A2C.default_action_sampler - def save_model(self, path: Path) -> None: - """Save the model on a file at the given path + def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: + self.set_train_mode() + self.optimizer.zero_grad() - Parameters - ---------- - path: The path to save the model + def actions_after_training(self) -> None: + """Any actions the agent needs to perform after training Returns ------- None """ - torch.save(self.a2c_net.state_dict(), Path(str(path) + "/" + self.name + ".pt")) - def play(self, env: Env, criteria: Criteria): - """Play the agent on the environment + if self.config.save_model_path is not None: + self.save_model(path=self.config.save_model_path) + + def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> None: + """Actions the agent applies after the episode ends Parameters ---------- - env: The environment to test/play the agent - criteria: The criteria to stop the game + env + episode_idx + options Returns ------- """ - time_step = env.reset() + episode_info: EpisodeInfo = options["episode_info"] - while criteria.continue_itrs(): - state = time_step.observation.to_numpy() - state = torch.from_numpy(state).float() - logits, values = self(state) + buffer: ReplayBuffer = episode_info.info["buffer"] - # select action - action = None - time_step = env.step(action) + reward = buffer.get_item_as_torch_tensor("reward") - if time_step.done: - time_step = env.reset() - - def actions_after_training(self) -> None: - """Any actions the agent needs to perform after training - - Returns - ------- - - """ - - if self.config.save_model_path is not None: - self.save_model(path=self.config.save_model_path) + self._optimize_model(rewards=buffer.get_item_as_torch_tensor("reward"), + logprobs=buffer.get_torch__tensor_info_item_as_torch_tensor("logprobs"), + values=buffer.get_torch__tensor_info_item_as_torch_tensor("values"), + entropies=buffer.get_torch__tensor_info_item_as_torch_tensor("entropies")) def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: """Train the algorithm on the episode @@ -276,59 +247,257 @@ def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: episode_iterations = 0 total_distortion = 0 - buffer = ReplayBuffer(options["buffer_size"]) + time_step: VectorTimeStep = env.reset() + states = time_step.stack_observations() - time_step = env.reset() - state = torch.from_numpy(time_step.observation.to_numpy()).float() - - # represent the state function values - values = [] - - # represent the probabilities under the - # policy - logprobs = [] + buffer = ReplayBuffer(buffer_size=self.config.n_iterations_per_episode) for itr in range(self.config.n_iterations_per_episode): - # policy and critic values - policy, value = self.a2c_net(state) - - # - logits = policy.view(-1) - - values.append(value) - - # choose the action this should be - action = self.config.action_sampler(logits) - - action_applied = action - if isinstance(action, torch.Tensor): - action_applied = env.get_action(action.item()) + act_result: _ActResult = self._act(states) + time_step: VectorTimeStep = env.step(act_result.actions) + next_states = time_step.stack_observations() - # the log probabilities of the policy - logprob_ = policy.view(-1)[action] - logprobs.append(logprob_) + reward = time_step.stack_rewards() - time_step = env.step(action_applied) + episode_score +=
np.mean(reward) - episode_score += time_step.reward - total_distortion += time_step.info["total_distortion"] + # append the roll outs + buffer.add(state=states, next_state=next_states, + reward=time_step.stack_rewards(), + action=act_result.actions, + done=time_step.stack_dones(), + info={"values": act_result.values, + "entropies": act_result.entropies, + "logprobs": act_result.logprobs}) - state = torch.from_numpy(time_step.observation.to_numpy()).float() - - buffer.add(state=state, action=action, reward=time_step.reward, next_state=time_step.observation, - done=time_step.done) + states = next_states if time_step.done: break episode_iterations += 1 + # what do we do with this? + last_states = states + episode_info = EpisodeInfo(episode_score=episode_score, total_distortion=total_distortion, episode_itrs=episode_iterations, - info={"replay_buffer": buffer, - "logprobs": logprobs, - "values": values}) + info={"buffer": buffer, "last_states": last_states}) return episode_info + def _act(self, state) -> _ActResult: + """The agent acts on the presented state by + choosing the actions + + Parameters + ---------- + state + + Returns + ------- + + """ + + if not isinstance(state, torch.Tensor): + torch_state = torch.Tensor(state) + else: + torch_state = state + + # policy and critic values. The policy + # values are assumed raw + logits, values = self.a2c_net(torch_state) + + # log_softmax may not sum up to one + # and can be negative as well + # get the logprobs for all batches? + #logprobs = F.log_softmax(logits.view(-1), dim=0) + # choose the action. Typically this will be Categorical + # but we leave it open for the application + # We don't call logits.view(-1) so that we get + # as many actions as in the logits rows. + # Each logit row is expected to corrspond to an + # environment worker + action_sampler_dist = self.config.action_sampler(logits) + actions = action_sampler_dist.sample() + log_probs = action_sampler_dist.log_prob(actions) + entropies = action_sampler_dist.entropy().unsqueeze(-1) + + full_pass_result = _ActResult(logprobs=log_probs, actions=actions, + values=values, entropies=entropies) + + return full_pass_result + + def _compute_advantages(self, rewards: np.array, values: np.array) -> np.array: + """Computes an estimate of the advantage function + + Parameters + ---------- + rewards: The rewards + values: The value estimates on the rollout + + Returns + ------- + + A numpy array representing the advantage estimate + """ + # T + total_time = len(rewards) + + # (gamma*tau)^t + tau_discounts = np.logspace(0, total_time - 1, num=total_time - 1, + base=self.config.gamma * self.config.tau, endpoint=False) + + rewards_ = rewards.squeeze() + values_ = values.squeeze() + + # create TD errors: R_t + gamma*V_{t+1} - V_t for t=0 to T + advantages = rewards_[:-1] + self.config.gamma * values_[1:] - values_[: -1] + + # create the GAES by multiplying the tau discounts times the TD errors + gaes = np.array( + [[np.sum(tau_discounts[: total_time - 1 - t] * advantages[t:, w]) for t in range(total_time - 1 )] for w in + range(self.config.n_workers)]) + return gaes + + def _compute_loss_function(self, advantages: torch.Tensor, logprobs: torch.Tensor, + returns: torch.Tensor, values: torch.Tensor, entropies: torch.Tensor) -> torch.Tensor: + """compute the loss mixture function + + Parameters + ---------- + + advantages: The advantage estimates + logprobs: The log probabilities + returns: The discounted returns + values: The value function + entropies: The entropies + + Returns + ------- + + A 
tensor representing the mixed loss function + """ + + value_loss_function = mse(returns=returns, values=values) + policy_loss = - (advantages.T * logprobs).mean() + + # compute a total loss function to minimize + if self.config.beta is not None: + + # add entropy loss + entropy_loss = -entropies.mean() + + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + \ + self.config.beta * entropy_loss + else: + loss = self.config.policy_loss_weight * policy_loss + \ + self.config.value_loss_weight * value_loss_function + + return loss + + @time_func_wrapper(show_time=False) + def _optimize_model(self, logprobs: torch.Tensor, entropies: torch.Tensor, values: torch.Tensor, + rewards: torch.Tensor) -> None: + """Optimize the model + + Parameters + ---------- + logprobs + entropies + values + rewards + + Returns + ------- + + """ + + print("{0} optimizing model={1}".format(INFO, self.name)) + + discounts: np.array = create_discounts_array(end=len(rewards), + base=self.config.gamma, start=0, endpoint=False) + + # get the discounted returns + discounted_returns: np.array = calculate_discounted_returns(rewards.numpy(), + discounts, + n_workers=self.config.n_workers) + + advantages: np.array = self._compute_advantages(rewards=rewards.numpy(), + values=values.detach().numpy()) + + loss: torch.Tensor = self._compute_loss_function(advantages=torch.from_numpy(advantages), values=values, + entropies=entropies, + returns=torch.from_numpy(discounted_returns), + logprobs=logprobs[:-1]) + + self.optimizer.zero_grad() + loss.backward() + + # clip the grad if needed + torch.nn.utils.clip_grad_norm_(self.parameters(), + self.config.max_grad_norm) + self.optimizer.step() + + print("{0} Finished optimization step....".format(INFO)) + + def set_train_mode(self) -> None: + """Set the model to a training mode + + Returns + ------- + None + """ + self.a2c_net.train() + + def set_test_mode(self) -> None: + """Set the model to a testing mode + + Returns + ------- + None + """ + self.a2c_net.eval() + + def save_model(self, path: Path) -> None: + """Save the model on a file at the given path + + Parameters + ---------- + path: The path to save the model + + Returns + ------- + + None + """ + torch.save(self.a2c_net.state_dict(), Path(str(path) + "/" + self.name + ".pt")) + + def play(self, env: Env, criteria: Criteria): + """Play the agent on the environment + + Parameters + ---------- + env: The environment to test/play the agent + criteria: The criteria to stop the game + + Returns + ------- + + """ + + time_step = env.reset() + + while criteria.continue_itrs(): + state = time_step.observation.to_ndarray() + state = torch.from_numpy(state).float() + logits, values = self(state) + + # select action + action = None + time_step = env.step(action) + + if time_step.done: + time_step = env.reset() diff --git a/src/apps/qlearning_on_mock.py b/src/apps/qlearning_on_mock.py index ad66fa3..eb673f5 100644 --- a/src/apps/qlearning_on_mock.py +++ b/src/apps/qlearning_on_mock.py @@ -2,7 +2,7 @@ import numpy as np from src.algorithms.q_learning import QLearning, QLearnConfig -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.maths.string_distance_calculator import StringDistanceType from src.spaces.actions import ActionSuppress, ActionIdentity, ActionStringGeneralize from src.spaces.discrete_state_environment import Environment, EnvConfig diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py index 
f60f9ee..00af006 100644 --- a/src/examples/a2c_three_columns.py +++ b/src/examples/a2c_three_columns.py @@ -6,23 +6,27 @@ from src.algorithms.a2c import A2C, A2CConfig from src.networks.a2c_networks import A2CNetSimpleLinear from src.utils.serial_hierarchy import SerialHierarchy +from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData from src.spaces.tiled_environment import TiledEnv, TiledEnvConfig, Layer from src.spaces.discrete_state_environment import DiscreteStateEnvironment -from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData +from src.spaces.multiprocess_env import MultiprocessEnv from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize from src.policies.epsilon_greedy_policy import EpsilonDecayOption from src.utils.distortion_calculator import DistortionCalculationType, DistortionCalculator from src.maths.numeric_distance_type import NumericDistanceType from src.maths.string_distance_calculator import StringDistanceType +from src.maths.pytorch_optimizer_config import PyTorchOptimizerConfig +from src.maths.optimizer_type import OptimizerType from src.utils.reward_manager import RewardManager -from src.algorithms.pytorch_multi_process_trainer import PyTorchMultiProcessTrainer, PyTorchMultiProcessTrainerConfig, OptimizerConfig +from src.trainers.pytorch_trainer import PyTorchTrainer, PyTorchTrainerConfig from src.utils import INFO N_LAYERS = 1 N_BINS = 10 N_EPISODES = 10 +N_WORKERS = 2 OUTPUT_MSG_FREQUENCY = 100 GAMMA = 0.99 ALPHA = 0.1 @@ -98,7 +102,7 @@ def load_mock_subjects() -> MockSubjectsLoader: return ds -def load_discrete_env() -> DiscreteStateEnvironment: +def load_discrete_env(options) -> DiscreteStateEnvironment: mock_ds = load_mock_subjects() @@ -118,7 +122,7 @@ def load_discrete_env() -> DiscreteStateEnvironment: ActionNumericBinGeneralize(column_name="salary", generalization_table=bins), ActionIdentity(column_name="diagnosis")) - action_space.shuffle() + action_space.shuffle(seed=options["rank"] + 42) discrete_env = DiscreteStateEnvironment.from_options(data_set=mock_ds, action_space=action_space, @@ -152,11 +156,10 @@ def load_discrete_env() -> DiscreteStateEnvironment: return tiled_env -def action_sampler(logits) -> torch.Tensor: +def action_sampler(logits: torch.Tensor) -> torch.distributions.Distribution: action_dist = torch.distributions.Categorical(logits=logits) - action = action_dist.sample() - return action + return action_dist if __name__ == '__main__': @@ -164,24 +167,35 @@ def action_sampler(logits) -> torch.Tensor: # set the seed for random engine random.seed(42) + # this the A2C network net = A2CNetSimpleLinear(n_columns=3, n_actions=ACTION_SPACE_SIZE) # agent configuration a2c_config = A2CConfig(action_sampler=action_sampler, n_iterations_per_episode=N_ITRS_PER_EPISODE, - a2cnet=net, save_model_path=Path("./a2c_three_columns_output/")) + a2cnet=net, save_model_path=Path("./a2c_three_columns_output/"), + n_workers=N_WORKERS, + optimizer_config=PyTorchOptimizerConfig(optimizer_type=OptimizerType.ADAM)) # create the agent agent = A2C(a2c_config) # create a trainer to train the Qlearning agent - configuration = PyTorchMultiProcessTrainerConfig(n_episodes=N_EPISODES, - env_loader=load_discrete_env, - optimizer_config=OptimizerConfig()) + configuration = PyTorchTrainerConfig(n_episodes=N_EPISODES) + + env = MultiprocessEnv(env_builder=load_discrete_env, env_args={}, n_workers=N_WORKERS) + + try: + + env.make(agent=agent) + 
trainer = PyTorchTrainer(env=env, agent=agent, config=configuration) - trainer = PyTorchMultiProcessTrainer(agent=agent, config=configuration) + # train the agent + trainer.train() - # train the agent - trainer.train() + except Exception as e: + print("An excpetion was thrown...{0}".format(str(e))) + finally: + env.close() # avg_rewards = trainer.avg_rewards() """ diff --git a/src/examples/nstep_semi_grad_sarsa_three_columns.py b/src/examples/nstep_semi_grad_sarsa_three_columns.py index 8509857..0f52396 100644 --- a/src/examples/nstep_semi_grad_sarsa_three_columns.py +++ b/src/examples/nstep_semi_grad_sarsa_three_columns.py @@ -4,7 +4,7 @@ from src.algorithms.n_step_semi_gradient_sarsa import SARSAnConfig, SARSAn from src.algorithms.q_estimator import QEstimator -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.datasets.datasets_loaders import MockSubjectsLoader from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize diff --git a/src/examples/qlearning_three_columns.py b/src/examples/qlearning_three_columns.py index c765837..d11fe51 100644 --- a/src/examples/qlearning_three_columns.py +++ b/src/examples/qlearning_three_columns.py @@ -7,7 +7,7 @@ from pathlib import Path from src.algorithms.q_learning import QLearning, QLearnConfig -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.datasets.datasets_loaders import MockSubjectsLoader from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize diff --git a/src/examples/semi_gradient_sarsa.py b/src/examples/semi_gradient_sarsa.py index e28c558..21698da 100644 --- a/src/examples/semi_gradient_sarsa.py +++ b/src/examples/semi_gradient_sarsa.py @@ -9,7 +9,7 @@ from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData from src.spaces.action_space import ActionSpace from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize -from src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.policies.epsilon_greedy_policy import EpsilonDecayOption from src.algorithms.epsilon_greedy_q_estimator import EpsilonGreedyQEstimatorConfig, EpsilonGreedyQEstimator from src.utils.distortion_calculator import DistortionCalculationType, DistortionCalculator diff --git a/src/maths/loss_functions.py b/src/maths/loss_functions.py new file mode 100644 index 0000000..56559d6 --- /dev/null +++ b/src/maths/loss_functions.py @@ -0,0 +1,13 @@ +"""Module loss_functions. Implements basic loss functions +geared towards using PyTorch + +""" + +import torch + + +def mse(returns: torch.Tensor, values: torch.Tensor) -> torch.Tensor: + + value_error = returns - values + loss = value_error.pow(2).mul(0.5).mean() + return loss diff --git a/src/maths/pytorch_optimizer_config.py b/src/maths/pytorch_optimizer_config.py new file mode 100644 index 0000000..358d2f3 --- /dev/null +++ b/src/maths/pytorch_optimizer_config.py @@ -0,0 +1,30 @@ +"""Module pytorch_optimizer_configuration. 
Specifies a +data class for configuring PyTorch optimizers + +""" + +from dataclasses import dataclass +from src.maths.optimizer_type import OptimizerType + + +@dataclass(init=True, repr=True) +class PyTorchOptimizerConfig(object): + """Configuration class for the optimizer + + """ + optimizer_type: OptimizerType = OptimizerType.ADAM + optimizer_learning_rate: float = 0.01 + optimizer_eps = 1.0e-5 + optimizer_betas: tuple = (0.9, 0.999) + optimizer_weight_decay: float = 0 + optimizer_amsgrad: bool = False + optimizer_maximize = False + + def as_dict(self) -> dict: + return {"optimizer_type": self.optimizer_type, + "learning_rate": self.optimizer_learning_rate, + "eps": self.optimizer_eps, + "betas": self.optimizer_betas, + "weight_decay": self.optimizer_weight_decay, + "amsgrad": self.optimizer_amsgrad, + "maximize": self.optimizer_maximize} \ No newline at end of file diff --git a/src/networks/a2c_networks.py b/src/networks/a2c_networks.py index 836ed7f..6ffad93 100644 --- a/src/networks/a2c_networks.py +++ b/src/networks/a2c_networks.py @@ -23,6 +23,8 @@ def __init__(self, n_columns: int, n_actions: int): """ super(A2CNetSimpleLinear, self).__init__() + self.n_columns = n_columns + self.n_actions = n_actions self.linear_l1 = nn.Linear(in_features=n_columns, out_features=n_actions) self.actor = nn.Linear(in_features=n_actions, out_features=n_actions) self.critic = nn.Linear(n_actions, 1) @@ -41,7 +43,7 @@ def forward(self, x: torch.Tensor) -> tuple: # activate y = F.relu(self.linear_l1(x)) - actor = F.log_softmax(self.actor(y), dim=0) # C - critic = torch.tanh(self.critic(y)) # D + actor = self.actor(y) #F.log_softmax(self.actor(y), dim=0) # C + critic = self.critic(y) #torch.tanh(self.critic(y)) # D return actor, critic diff --git a/src/parallel/processes_manager.py b/src/parallel/processes_manager.py index 67ef927..3407b7e 100644 --- a/src/parallel/processes_manager.py +++ b/src/parallel/processes_manager.py @@ -20,6 +20,16 @@ def __init__(self, n_procs: int) -> None: self.n_procs = n_procs self.processes = [] + def __len__(self) -> int: + """The number of workers handled by this + instance + + Returns + ------- + + """ + return len(self.processes) + def create_and_start(self, target: Callable, *args) -> None: for i in range(self.n_procs): p = mp.Process(target=target, args=args) @@ -41,4 +51,4 @@ def terminate(self) -> None: def join_and_terminate(self): self.join() - self.terminate() \ No newline at end of file + self.terminate() diff --git a/src/spaces/__init__.py b/src/spaces/__init__.py index a2934e7..24b86ec 100644 --- a/src/spaces/__init__.py +++ b/src/spaces/__init__.py @@ -1,2 +1,2 @@ from src.spaces.time_step import TimeStep, StepType, VectorTimeStep -from src.spaces.multiprocess_env import MultiprocessEnv \ No newline at end of file +from src.spaces.multiprocess_env import MultiprocessEnv diff --git a/src/spaces/action_space.py b/src/spaces/action_space.py index 8f082bb..c516dc9 100644 --- a/src/spaces/action_space.py +++ b/src/spaces/action_space.py @@ -69,7 +69,7 @@ def __setitem__(self, idx: int, action: ActionBase) -> None: def __len__(self) -> int: return len(self.actions) - def shuffle(self) -> None: + def shuffle(self, seed: int = 42) -> None: """Shuffles the action list Returns @@ -77,6 +77,7 @@ def shuffle(self) -> None: None """ + random.seed(seed) random.shuffle(self.actions) # fix the ids of the actions to diff --git a/src/spaces/multiprocess_env.py b/src/spaces/multiprocess_env.py index d743033..c35af2f 100644 --- a/src/spaces/multiprocess_env.py +++ 
b/src/spaces/multiprocess_env.py @@ -1,6 +1,9 @@ """Module multiprocess_env. Specifies -a vectorsized environment where each instance -of the environment is run independently +a vectorized environment where each instance +of the environment is run independently. The implementation +of the environment is taken from the book +Grokking Deep Reinforcement Learning Algorithms +by Manning publications """ @@ -11,7 +14,7 @@ from src.spaces import TimeStep, VectorTimeStep from src.parallel import TorchProcsHandler - +Agent = TypeVar('Agent') ActionVector = TypeVar('ActionVector') @@ -23,8 +26,19 @@ def __init__(self, env_builder: Callable, env_args: dict, n_workers: int): self.n_workers = n_workers self.workers = TorchProcsHandler(n_procs=n_workers) self.pipes = [mp.Pipe() for _ in range(self.n_workers)] + self.is_made: bool = False + + def __len__(self) -> int: + """The number of workers handled by this + instance + + Returns + ------- - def make(self): + """ + return len(self.workers) + + def make(self, agent: Agent): """Create the workers Returns @@ -33,11 +47,15 @@ def make(self): """ for w in range(self.n_workers): + env_args = self.env_args + env_args["rank"] = w self.workers.create_process_and_start(target=self.work, args=(w, self.env_builder, - self.env_args, + env_args, agent, self.pipes[w][1])) - def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: + self.is_made = True + + def work(self, rank, env_builder: Callable, env_args: dict, agent: Agent, pipe_end) -> None: """The worker function Parameters @@ -63,7 +81,8 @@ def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: if cmd == 'reset': pipe_end.send(env.reset(**kwargs)) elif cmd == 'step': - pipe_end.send(env.step(**kwargs)) + time_step: TimeStep = env.step(**kwargs) + pipe_end.send(time_step) elif cmd == '_past_limit': pipe_end.send(env._elapsed_steps >= env._max_episode_steps) else: @@ -73,12 +92,39 @@ def work(self, rank, env_builder: Callable, env_args: dict, pipe_end) -> None: pipe_end.close() break - def reset(self) -> TimeStep: - pass + def reset(self, rank=None, **kwargs) -> VectorTimeStep: + + if not self.is_made: + raise ValueError("Environment is not created. Did you call make()?") + + time_step = VectorTimeStep() + if rank is not None: + parent_end, _ = self.pipes[rank] + self._send_msg(('reset', {}), rank) + o = parent_end.recv() + time_step.append(o) + return time_step + + # if not reset for a specific worker + # then all workers should reset + self._broadcast_msg(('reset', kwargs)) + + # collect all the timesteps from the + # workers + for rank in range(self.n_workers): + parent_end, _ = self.pipes[rank] + process_time_step = parent_end.recv() + time_step.append(process_time_step) + + return time_step def step(self, actions: ActionVector) -> VectorTimeStep: - assert len(actions) == self.n_workers + if not self.is_made: + raise ValueError("Environment is not created. 
Did you call make()?") + + if len(actions) != self.n_workers: + raise ValueError("Number of actions is not equal to the number of workers") # send the messages to the workers [self._send_msg(('step', {'action': actions[rank]}), rank) for rank in range(self.n_workers)] @@ -90,8 +136,14 @@ def step(self, actions: ActionVector) -> VectorTimeStep: for rank in range(self.n_workers): parent_end, _ = self.pipes[rank] process_time_step = parent_end.recv() + + # if on this step the local environment + # finished then reset + if process_time_step.done: + self.reset(rank=rank, **{}) + time_step.append(process_time_step) - """ + """ o, r, d, i = parent_end.recv() results.append((o, np.array(r, dtype=np.float), @@ -101,6 +153,9 @@ def step(self, actions: ActionVector) -> VectorTimeStep: """ return time_step + def close(self, **kwargs): + self._close(**kwargs) + def _close(self, **kwargs): self._broadcast_msg(('close', kwargs)) @@ -121,4 +176,14 @@ def _send_msg(self, msg: Any, rank: int): parent_end.send(msg) def _broadcast_msg(self, msg): + """Broadcast the message to all workers + + Parameters + ---------- + msg + + Returns + ------- + + """ [parent_end.send(msg) for parent_end, _ in self.pipes] diff --git a/src/spaces/state.py b/src/spaces/state.py index a6f2d55..f7ffeb9 100644 --- a/src/spaces/state.py +++ b/src/spaces/state.py @@ -100,7 +100,7 @@ def __getitem__(self, name: str) -> float: """ return self.column_distortions[name] - def to_numpy(self) -> np.array: + def to_ndarray(self) -> np.array: """Returns the self.column_distortions values as numpy array Returns @@ -108,7 +108,14 @@ def to_numpy(self) -> np.array: np.array """ + return np.array(self.to_list()) - vals = list(self.column_distortions.values()) - return np.array(vals) + def to_list(self) -> list: + """Returns the self.column_distortions values as numpy array + + Returns + ------- + A list of floats + """ + return list(self.column_distortions.values()) diff --git a/src/spaces/tiled_environment.py b/src/spaces/tiled_environment.py index d5538c7..ef905bf 100644 --- a/src/spaces/tiled_environment.py +++ b/src/spaces/tiled_environment.py @@ -3,9 +3,12 @@ """ import copy +import numpy as np +import torch from typing import TypeVar, List, Any from dataclasses import dataclass +import numpy import numpy as np from src.extern.tile_coding import IHT, tiles @@ -350,7 +353,10 @@ def n_states(self) -> int: def config(self) -> Config: return self.env.config - def step(self, action: ActionBase, **options) -> TimeStep: + def close(self, **options) -> None: + pass + + def step(self, action: Any, **options) -> TimeStep: """Execute the action in the environment and return a new state for observation @@ -365,6 +371,12 @@ def step(self, action: ActionBase, **options) -> TimeStep: """ + # choose the action + if isinstance(action, int) or isinstance(action, numpy.int64): + action = self.get_action(aidx=action) + elif isinstance(action, torch.Tensor): + action = self.get_action(aidx=action.item()) + raw_time_step = self.env.step(action) # a state wrapper to communicate diff --git a/src/spaces/time_step.py b/src/spaces/time_step.py index 2d2ecea..b515818 100644 --- a/src/spaces/time_step.py +++ b/src/spaces/time_step.py @@ -6,6 +6,7 @@ import copy import enum from typing import NamedTuple, Generic, Optional, TypeVar +import numpy as np _Reward = TypeVar('_Reward') _Discount = TypeVar('_Discount') @@ -90,8 +91,53 @@ class VectorTimeStep(object): def __init__(self): self.time_steps = [] + def __len__(self) -> int: + """Returns the number of time-steps + + 
Returns + ------- + + """ + return len(self.time_steps) + + def __getitem__(self, idx) -> TimeStep: + """Returns the idx-th time step in this + VectorTimeStep + + Parameters + ---------- + idx: The index of the time step to return + + Returns + ------- + + """ + return self.time_steps[idx] + + @property + def done(self) -> bool: + + for step in self.time_steps: + if not step.done: + return False + + return True + def append(self, time_step: TimeStep) -> None: self.time_steps.append(time_step) + def stack_observations(self) -> np.ndarray: + return np.vstack([time_step.observation.to_list() for time_step in self.time_steps]) + + def stack_rewards(self) -> np.ndarray: + return np.vstack([time_step.reward for time_step in self.time_steps]) + + def stack_step_type(self) -> list: + return [time_step.step_type for time_step in self.time_steps] + + def stack_dones(self): + return [time_step.done for time_step in self.time_steps] + + diff --git a/src/trainers/__init__.py b/src/trainers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/algorithms/pytorch_multi_process_trainer.py b/src/trainers/pytorch_trainer.py similarity index 72% rename from src/algorithms/pytorch_multi_process_trainer.py rename to src/trainers/pytorch_trainer.py index 7a55acf..5b78e53 100644 --- a/src/algorithms/pytorch_multi_process_trainer.py +++ b/src/trainers/pytorch_trainer.py @@ -23,39 +23,12 @@ @dataclass(init=True, repr=True) -class OptimizerConfig(object): - """Configuration class for the optimizer - - """ - optimizer_type: OptimizerType = OptimizerType.ADAM - optimizer_learning_rate: float = 0.01 - optimizer_eps = 1.0e-5 - optimizer_betas: tuple = (0.9, 0.999) - optimizer_weight_decay: float = 0 - optimizer_amsgrad: bool = False - optimizer_maximize = False - - def as_dict(self) -> dict: - return {"optimizer_type": self.optimizer_type, - "learning_rate": self.optimizer_learning_rate, - "eps": self.optimizer_eps, - "betas": self.optimizer_betas, - "weight_decay": self.optimizer_weight_decay, - "amsgrad": self.optimizer_amsgrad, - "maximize": self.optimizer_maximize} - - -@dataclass(init=True, repr=True) -class PyTorchMultiProcessTrainerConfig(object): +class PyTorchTrainerConfig(object): """Configuration for PyTorchMultiProcessTrainer """ n_procs: int = 1 n_episodes: int = 100 - optimizer_config: OptimizerConfig = OptimizerConfig() - env_loader: EnvLoader = None - buffer_size: int = 1000 - master_process: int = 0 @dataclass(init=True, repr=True) @@ -110,13 +83,13 @@ def worker(worker_idx: int, worker_model: nn.Module, params: dir): params["update_params_functor"](optimizer, episode_info, config=worker_model.config) #**params) -class PyTorchMultiProcessTrainer(object): +class PyTorchTrainer(object): """The class PyTorchMultiProcessTrainer. Trainer for multiprocessing with PyTorch """ - def __init__(self, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> None: + def __init__(self, env: Env, agent: Agent, config: PyTorchTrainerConfig) -> None: """Constructor. 
Initialize a trainer by passing the training environment instance the agent to train and configuration dictionary @@ -128,6 +101,7 @@ def __init__(self, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> No """ + self.env: Env = env self.agent = agent self.configuration = config # monitor performance @@ -166,7 +140,14 @@ def actions_before_training(self) -> None: None """ - self.agent.share_memory() + if self.env is None: + raise ValueError("Environment has not been specified") + + if self.agent is None: + raise ValueError("Agent has not been specified") + + self.env.reset() + self.agent.actions_before_training_begins(self.env) def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: """Perform any actions necessary before the training begins @@ -218,20 +199,20 @@ def train(self): # create the processes by attaching the worker - process_handler = TorchProcsHandler(n_procs=self.configuration.n_procs) + for episode in range(0, self.configuration.n_episodes): + print("{0} On episode {1}/{2}".format(INFO, episode, self.configuration.n_episodes)) + + self.actions_before_episode_begins(self.env, episode) - for p in range(self.configuration.n_procs): - # worker_idx: int, worker_model: nn.Module, params: dir - process_handler.create_process_and_start(target=worker, - args=(p, self.agent, - {"optimizer_config": self.configuration.optimizer_config.as_dict(), - "n_episodes": self.configuration.n_episodes, - "update_params_functor": self.agent.update_parameters, - "env_loader": self.configuration.env_loader, - "buffer_size": self.configuration.buffer_size, - "master_process": self.configuration.master_process})) + # train for a number of iterations + episode_info: EpisodeInfo = self.agent.on_episode(self.env, episode) + + print("{0} Episode {1} finished in {2} secs".format(INFO, episode, episode_info.total_execution_time)) + print("{0} Episode score={1}, episode total " + "avg distortion {2}".format(INFO, episode_info.episode_score, + episode_info.total_distortion / episode_info.episode_itrs)) - process_handler.join_and_terminate() + self.agent.actions_after_episode_ends(self.env, episode, **{"episode_info": episode_info}) self.actions_after_training() diff --git a/src/algorithms/trainer.py b/src/trainers/trainer.py similarity index 100% rename from src/algorithms/trainer.py rename to src/trainers/trainer.py diff --git a/src/utils/replay_buffer.py b/src/utils/replay_buffer.py index bcdd5bb..e4e8432 100644 --- a/src/utils/replay_buffer.py +++ b/src/utils/replay_buffer.py @@ -83,8 +83,39 @@ def __getitem__(self, name_attr: str) -> List: def to_numpy(self, name_attr: str) -> np.array: return np.array(self[name_attr]) - def add(self, state: Any, action: Any, reward: float, - next_state: Any, done: bool, info: dict = {}) -> None: + def get_item_as_torch_tensor(self, name_attr: str) -> torch.Tensor: + """ Returns a torch.Tensor representation of the + the named item + + Parameters + ---------- + + name_attr: The name of the attribute + + Returns + ------- + + An instance of torch.Tensor + """ + + items = self[name_attr] + + # convert to np.array to avoid pytorch warning + return torch.Tensor(np.array(items)) + + def get_torch__tensor_info_item_as_torch_tensor(self, name_attr: str) -> torch. 
Tensor: + + vals = [] + for item in self._memory: + info = item.info + + if name_attr in info: + vals.append(info[name_attr]) + + return torch.stack(vals) + + def add(self, state: Any, action: Any, reward: Any, + next_state: Any, done: Any, info: dict = {}) -> None: """Add a new experience tuple in the buffer Parameters @@ -122,25 +153,7 @@ def sample(self, batch_size: int) -> List[ExperienceTuple]: return random.sample(self._memory, k=batch_size) - def get_item_as_torch_tensor(self, name_attr) -> torch.Tensor: - """ Returns a torch.Tensor representation of the - the named item - Parameters - ---------- - - name_attr: The name of the attribute - - Returns - ------- - - An instance of torch.Tensor - """ - - items = self[name_attr] - - # convert to np.array to avoid pytorch warning - return torch.Tensor(np.array(items)) def reinitialize(self) -> None: """Reinitialize the internal buffer @@ -154,3 +167,5 @@ self._memory = deque(maxlen=self.capacity) + + diff --git a/tests/test_multiprocess_env.py b/tests/test_multiprocess_env.py index c6ae747..776c50e 100644 --- a/tests/test_multiprocess_env.py +++ b/tests/test_multiprocess_env.py @@ -1,13 +1,77 @@ import unittest import pytest -from src.spaces import MultiprocessEnv +from src.spaces import MultiprocessEnv, TimeStep, StepType + +class DummyEnv(object): + + def __init__(self, **options): + pass + + def close(self, **kwargs): + pass + + def step(self, **kwargs) -> TimeStep: + print("Action executed={0}".format(kwargs["action"])) + time_step = TimeStep(step_type=StepType.FIRST if kwargs["action"] == 1 else StepType.LAST, + reward=0.0, observation=1.0, info={}, discount=0.0) + return time_step + + def reset(self, **kwargs) -> TimeStep: + time_step = TimeStep(step_type=StepType.FIRST, + reward=0.0, observation=1.0, info={}, discount=0.0) + return time_step class TestMultiprocessEnv(unittest.TestCase): + @staticmethod + def make_environment(options): + return DummyEnv(**options) + def test_make(self): - pass + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + try: + multiproc_env.make(agent=None) + multiproc_env.close() + except Exception as e: + print("Test failed due to exception {0}".format(str(e))) + multiproc_env.close() + + def test_step_fail(self): + + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + with pytest.raises(ValueError) as e: + multiproc_env.make(agent=None) + multiproc_env.step([]) + + multiproc_env.close() + self.assertEqual("Number of actions is not equal to the number of workers", str(e.value)) + + def test_step(self): + + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + multiproc_env.make(agent=None) + time_step = multiproc_env.step([1, 2]) + multiproc_env.close() + self.assertEqual(len(multiproc_env), len(time_step)) + self.assertEqual(StepType.FIRST, time_step[0].step_type) + self.assertEqual(StepType.LAST, time_step[1].step_type) + + def test_reset(self): + options = {} + multiproc_env = MultiprocessEnv(TestMultiprocessEnv.make_environment, options, n_workers=2) + + multiproc_env.make(agent=None) + time_step = multiproc_env.reset() + multiproc_env.close() + self.assertEqual(len(multiproc_env), len(time_step)) if __name__ == '__main__': diff --git a/tests/test_trainer.py b/tests/test_trainer.py index 4ec772a..c7c2dc1 100644 --- a/tests/test_trainer.py +++ b/tests/test_trainer.py @@ -4,7 +4,7 @@ import unittest import pytest -from
src.algorithms.trainer import Trainer +from src.trainers.trainer import Trainer from src.algorithms.n_step_semi_gradient_sarsa import SARSAnConfig, SARSAn from src.spaces.tiled_environment import TiledEnv
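The new A2C._act method and default_action_sampler treat the actor output as raw logits and wrap them in torch.distributions.Categorical, one row of logits per environment worker; this is also why A2CNetSimpleLinear no longer applies log_softmax or tanh to its heads. A minimal sketch of that sampling step, with a random tensor standing in for the network output (the sizes below are illustrative, not taken from the example script):

import torch

n_workers, n_actions = 2, 5
logits = torch.randn(n_workers, n_actions)   # raw actor outputs, one row per worker

dist = torch.distributions.Categorical(logits=logits)
actions = dist.sample()                      # shape (n_workers,): one action index per worker
log_probs = dist.log_prob(actions)           # shape (n_workers,)
entropies = dist.entropy().unsqueeze(-1)     # shape (n_workers, 1), as stored by _act

print(actions, log_probs, entropies)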
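create_discounts_array leans on numpy's logspace: with start=0, num=end and endpoint=False the exponents are 0, 1, ..., end-1, so the result is [gamma^0, gamma^1, ..., gamma^(T-1)], and calculate_discounted_returns then forms, for every worker w and step t, the sum of gamma^k * r_{t+k,w}. A self-contained sketch of the same computation; the reward values and worker count are made up for illustration:

import numpy as np

def create_discounts_array(end: int, base: float, start: int = 0, endpoint: bool = False) -> np.ndarray:
    # exponents are 0, 1, ..., end - 1, so this is [base**0, base**1, ..., base**(end - 1)]
    return np.logspace(start, end, num=end, base=base, endpoint=endpoint)

def calculate_discounted_returns(rewards: np.ndarray, discounts: np.ndarray, n_workers: int = 1) -> np.ndarray:
    # rewards has shape (T, n_workers); returns[w, t] = sum_k discounts[k] * rewards[t + k, w]
    total_time = len(rewards)
    return np.array([[np.sum(discounts[: total_time - t] * rewards[t:, w])
                      for t in range(total_time)]
                     for w in range(n_workers)])

gamma = 0.99
rewards = np.ones((5, 2))                                  # T=5 steps, 2 workers, unit rewards
discounts = create_discounts_array(end=len(rewards), base=gamma)
print(discounts)                                           # [1.0, 0.99, 0.99**2, 0.99**3, 0.99**4]
print(calculate_discounted_returns(rewards, discounts, n_workers=2))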
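_compute_advantages is a generalized-advantage-estimation style calculation: one-step TD errors delta_t = r_t + gamma*V_{t+1} - V_t are weighted by (gamma*tau)^k and summed over the rollout. A rough standalone version for a batch of workers; the shapes, the random inputs and the tau value here are assumptions for illustration, not the config defaults from the diff:

import numpy as np

def gae_advantages(rewards: np.ndarray, values: np.ndarray,
                   gamma: float = 0.99, tau: float = 0.95) -> np.ndarray:
    # rewards and values have shape (T, n_workers); the result has shape (n_workers, T - 1)
    total_time = len(rewards)
    # (gamma * tau)**k for k = 0 .. T - 2
    tau_discounts = np.logspace(0, total_time - 1, num=total_time - 1,
                                base=gamma * tau, endpoint=False)
    # one-step TD errors: r_t + gamma * V_{t+1} - V_t, for t = 0 .. T - 2
    td_errors = rewards[:-1] + gamma * values[1:] - values[:-1]
    n_workers = rewards.shape[1]
    return np.array([[np.sum(tau_discounts[: total_time - 1 - t] * td_errors[t:, w])
                      for t in range(total_time - 1)]
                     for w in range(n_workers)])

rewards = np.random.rand(6, 2)                 # T=6 steps, 2 workers
values = np.random.rand(6, 2)                  # critic estimates for the same steps
print(gae_advantages(rewards, values).shape)   # (2, 5)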
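_compute_loss_function mixes a policy-gradient term weighted by the advantages, the half-MSE value loss from src/maths/loss_functions.py, and an optional entropy bonus scaled by beta. A hedged sketch of that mixture with dummy tensors; the single-worker shapes below are only illustrative and are kept mutually consistent for the sake of the example:

import torch

def mse(returns: torch.Tensor, values: torch.Tensor) -> torch.Tensor:
    # half mean squared error, as in src/maths/loss_functions.py
    return (returns - values).pow(2).mul(0.5).mean()

def a2c_loss(advantages, logprobs, returns, values, entropies,
             policy_loss_weight=1.0, value_loss_weight=1.0, beta=1.0):
    policy_loss = -(advantages.T * logprobs).mean()     # policy-gradient term
    value_loss = mse(returns=returns, values=values)    # critic term
    loss = policy_loss_weight * policy_loss + value_loss_weight * value_loss
    if beta is not None:
        loss = loss + beta * (-entropies.mean())        # entropy bonus: reward exploration
    return loss

T = 6
advantages = torch.randn(1, T - 1)    # (n_workers, T - 1), one worker here
logprobs = torch.randn(T - 1, 1)      # log-probabilities of the sampled actions
returns = torch.randn(T, 1)           # discounted returns
values = torch.randn(T, 1)            # critic values
entropies = torch.rand(T, 1)          # per-step entropies
print(a2c_loss(advantages, logprobs, returns, values, entropies))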
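pytorch_optimizer_builder is fed PyTorchOptimizerConfig.as_dict(), so it is presumably responsible for translating keys such as "learning_rate" into the corresponding torch.optim arguments (torch.optim.Adam expects lr, eps, betas, weight_decay, amsgrad and, on recent torch versions, maximize). The builder below is only a guess at that mapping for the ADAM case, not the actual src/maths/pytorch_optimizer_builder implementation:

import torch
import torch.nn as nn

def build_adam_optimizer(model_params, **options) -> torch.optim.Optimizer:
    # hypothetical mapping from the as_dict() keys to torch.optim.Adam arguments
    return torch.optim.Adam(model_params,
                            lr=options.get("learning_rate", 0.01),
                            eps=options.get("eps", 1.0e-5),
                            betas=options.get("betas", (0.9, 0.999)),
                            weight_decay=options.get("weight_decay", 0),
                            amsgrad=options.get("amsgrad", False),
                            maximize=options.get("maximize", False))

model = nn.Linear(3, 5)   # stand-in for A2CNetSimpleLinear
optimizer = build_adam_optimizer(model.parameters(),
                                 learning_rate=0.01, eps=1.0e-5, betas=(0.9, 0.999),
                                 weight_decay=0, amsgrad=False, maximize=False)
print(optimizer)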
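MultiprocessEnv drives each worker over an mp.Pipe with small (command, kwargs) tuples such as 'reset', 'step' and 'close', and the parent then collects one TimeStep per worker into a VectorTimeStep. A stripped-down sketch of that message loop with a toy environment in place of the project's DiscreteStateEnvironment; the ToyEnv class and the work helper here are illustrative stand-ins, not the module's code:

import multiprocessing as mp

class ToyEnv:
    def reset(self, **kwargs):
        return 0.0                        # pretend observation
    def step(self, action, **kwargs):
        return float(action)              # pretend time step
    def close(self, **kwargs):
        pass

def work(rank, env_builder, env_args, pipe_end):
    env = env_builder(**env_args)
    while True:
        cmd, kwargs = pipe_end.recv()
        if cmd == "reset":
            pipe_end.send(env.reset(**kwargs))
        elif cmd == "step":
            pipe_end.send(env.step(**kwargs))
        else:                             # 'close' or anything unknown ends the worker
            env.close(**kwargs)
            pipe_end.close()
            break

if __name__ == "__main__":
    n_workers = 2
    pipes = [mp.Pipe() for _ in range(n_workers)]
    procs = [mp.Process(target=work, args=(w, ToyEnv, {}, pipes[w][1]))
             for w in range(n_workers)]
    for p in procs:
        p.start()
    for parent_end, _ in pipes:
        parent_end.send(("step", {"action": 1}))
    print([parent_end.recv() for parent_end, _ in pipes])   # one result per worker
    for parent_end, _ in pipes:
        parent_end.send(("close", {}))
    for p in procs:
        p.join()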