diff --git a/.gitignore b/.gitignore index 5448251..3e5e47d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ src/algorithms/__pycache__/ src/policies/__pycache__/ src/apps/__pycache__/ src/.coverage +src/maths/__pycache__/ diff --git a/docs/source/API/a2c.rst b/docs/source/API/a2c.rst new file mode 100644 index 0000000..08232e2 --- /dev/null +++ b/docs/source/API/a2c.rst @@ -0,0 +1,13 @@ +a2c +=== +.. automodule:: a2c +.. autoclass:: A2CNetBase + :members: __init__, forward + +.. autoclass:: A2CNet + :members: __init__, forward + +.. autoclass:: A2CConfig + +.. autoclass:: A2C + :members: update_parameters, __init__, share_memory, parameters, on_episode, _do_train diff --git a/docs/source/API/column_type.rst b/docs/source/API/column_type.rst new file mode 100644 index 0000000..b1dd713 --- /dev/null +++ b/docs/source/API/column_type.rst @@ -0,0 +1,15 @@ +column\_type +============ + +.. automodule:: column_type + +.. autoclass:: ColumnType + + + + + + + + + diff --git a/docs/source/API/discrete_state_environment.rst b/docs/source/API/discrete_state_environment.rst new file mode 100644 index 0000000..93d2f97 --- /dev/null +++ b/docs/source/API/discrete_state_environment.rst @@ -0,0 +1,31 @@ +discrete\_state\_environment +============================ + +.. automodule:: discrete_state_environment + + + + + + + + + + + + .. rubric:: Classes + + .. autosummary:: + + DiscreteEnvConfig + DiscreteStateEnvironment + MultiprocessEnv + + + + + + + + + diff --git a/docs/source/API/exceptions.rst b/docs/source/API/exceptions.rst new file mode 100644 index 0000000..35743f4 --- /dev/null +++ b/docs/source/API/exceptions.rst @@ -0,0 +1,30 @@ +exceptions +========== + +.. automodule:: exceptions + +.. autoclass:: Error + :members: __init__, __str__ + +.. autoclass:: IncompatibleVectorSizesException + :members: __init__, __str__ + +.. autoclass:: InvalidDataTypeException + :members: __init__, __str__ + +.. autoclass:: InvalidFileFormat + :members: __init__, __str__ + +.. autoclass:: InvalidParamValue + :members: __init__, __str__ + +.. autoclass:: InvalidSchemaException + :members: __init__, __str__ + +.. autoclass:: InvalidStateException + :members: __init__, __str__ + + + + + diff --git a/docs/source/API/optimizer_type.rst b/docs/source/API/optimizer_type.rst new file mode 100644 index 0000000..1802e6b --- /dev/null +++ b/docs/source/API/optimizer_type.rst @@ -0,0 +1,6 @@ +optimizer\_type +=============== + +.. automodule:: optimizer_type + +.. autoclass:: OptimizerType diff --git a/docs/source/API/pytorch_optimizer_builder.rst b/docs/source/API/pytorch_optimizer_builder.rst new file mode 100644 index 0000000..a7b2741 --- /dev/null +++ b/docs/source/API/pytorch_optimizer_builder.rst @@ -0,0 +1,5 @@ +pytorch\_optimizer\_builder +=========================== + +.. automodule:: pytorch_optimizer_builder + :members: pytorch_optimizer_builder diff --git a/docs/source/API/replay_buffer.rst b/docs/source/API/replay_buffer.rst new file mode 100644 index 0000000..0a3f01b --- /dev/null +++ b/docs/source/API/replay_buffer.rst @@ -0,0 +1,7 @@ +replay\_buffer +============== + +.. automodule:: replay_buffer + +.. autoclass:: ReplayBuffer + :members: __init__, __len__, __getitem__, add, sample, get_item_as_torch_tensor, reinitialize diff --git a/docs/source/API/tiled_environment.rst b/docs/source/API/tiled_environment.rst new file mode 100644 index 0000000..79785d5 --- /dev/null +++ b/docs/source/API/tiled_environment.rst @@ -0,0 +1,44 @@ +tiled\_environment +================== + +.. 
automodule:: tiled_environment + +.. autoclass:: TiledEnvConfig + +.. autoclass:: Tile + :members: __init__, build, search + +.. autoclass:: Layer + :members: n_tiles_per_action, __init__, __len__, build_tiles, get_global_tile_index, _do_build_tile, _do_build_three_columns + + +.. autoclass:: Tiles + :members: __init__, __getitem__, __len__, build + + +.. autoclass:: TiledEnv + :members: from_options, __init__, action_space, n_actions, n_states, config, step, reset, get_state_action_tile_matrix, get_action, save_current_dataset, create_tiles, get_aggregated_state, initialize_column_counts, all_columns_visited, initialize_distances, apply_action, total_current_distortion, featurize_state_action, featurize_raw_state, _create_column_scales, _validate + + + + + + + + + + .. rubric:: Classes + + .. autosummary:: + + TiledEnv + TiledEnvConfig + + + + + + + + + diff --git a/docs/source/Examples/a2c_three_columns.rst b/docs/source/Examples/a2c_three_columns.rst new file mode 100644 index 0000000..bd9c741 --- /dev/null +++ b/docs/source/Examples/a2c_three_columns.rst @@ -0,0 +1,23 @@ +A2C algorithm on three columns data set +======================================= + + +A2C algorithm +------------- + +Both the Q-learning algorithm we used in `Q-learning on a three columns dataset `_ and the SARSA algorithm in +`Semi-gradient SARSA on a three columns data set`_ are value-based methods; that is, they estimate value functions, specifically the state-action function +:math:`Q`. By knowing :math:`Q` we can construct a policy to follow; for example, choose the action that maximizes the state-action function at the given state, +i.e. :math:`argmax_{\alpha}Q(s_t, \alpha)`, which is a greedy policy. + +However, the true objective of reinforcement learning is to directly learn a policy :math:`\pi`. + + +The main advantage of learning a parametrized policy is that it can be any learnable function, e.g. a linear model or a deep neural network. + +The A2C algorithm falls under the umbrella of actor-critic methods [REF]. In these methods, we estimate both a parametrized policy (the actor) +and a parametrized value function (the critic). + + +Specifically, we will use a weight-sharing model.
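+
+Concretely, the loss optimized in ``A2C.update_parameters`` combines both estimates: with discounted returns
+:math:`R_t = r_t + \gamma R_{t+1}`, the actor term is :math:`-\log\pi(a_t|s_t)(R_t - v(s_t))`, the critic term is
+:math:`(v(s_t) - R_t)^2`, and the total loss is the actor term plus :math:`\tau` times the critic term.
+
+The snippet below is a minimal sketch of how the weight-sharing network is assembled; it mirrors the example
+driver ``src/examples/a2c_three_columns.py``, where the common body maps the three column distortions of the
+state to the five available actions.
+
+.. code-block:: python
+
+   import torch.nn as nn
+   from src.algorithms.a2c import A2CNetBase, A2CNet
+
+   # body shared by the actor and the critic heads
+   common_net = A2CNetBase(architecture=nn.Sequential(nn.Linear(in_features=3, out_features=5)))
+
+   # actor head: a discrete probability distribution over the actions
+   policy_net = A2CNetBase(architecture=nn.Softmax(dim=0))
+
+   # critic head: a single scalar state value
+   value_net = A2CNetBase(architecture=nn.Linear(5, 1))
+
+   net = A2CNet(common_net, policy_net, value_net)
+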
Moreover, the environment is a multi-process class that gathers samples from multiple +emvironments at once diff --git a/docs/source/conf.py b/docs/source/conf.py index 8e650cb..e4c9874 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -18,6 +18,8 @@ sys.path.append(os.path.abspath("../../src/exceptions/")) sys.path.append(os.path.abspath("../../src/spaces/")) sys.path.append(os.path.abspath("../../src/policies/")) +sys.path.append(os.path.abspath("../../src/maths/")) +sys.path.append(os.path.abspath("../../src/utils/")) print(sys.path) diff --git a/docs/source/examples.rst b/docs/source/examples.rst index d45647a..2eff455 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -8,3 +8,4 @@ Some examples can be found below Examples/qlearning_three_columns Examples/semi_gradient_sarsa_three_columns + Examples/a2c_three_columns diff --git a/docs/source/modules.rst b/docs/source/modules.rst index 3a20fac..31f55ae 100644 --- a/docs/source/modules.rst +++ b/docs/source/modules.rst @@ -12,11 +12,12 @@ API API/epsilon_greedy_q_estimator API/q_learning API/trainer - generated/q_estimator - generated/exceptions - generated/action_space - generated/column_type - generated/discrete_state_environment - generated/observation_space - generated/tiled_environment + API/optimizer_type + API/pytorch_optimizer_builder + API/replay_buffer + API/a2c + API/exceptions + API/column_type + API/discrete_state_environment + API/tiled_environment diff --git a/src/algorithms/a2c.py b/src/algorithms/a2c.py index 715233f..4370a68 100644 --- a/src/algorithms/a2c.py +++ b/src/algorithms/a2c.py @@ -1,13 +1,15 @@ import numpy as np -from typing import TypeVar, Generic +from typing import TypeVar, Generic, Any, Callable import torch import torch.nn as nn import torch.nn.functional as F from dataclasses import dataclass + from src.utils.experience_buffer import unpack_batch from src.utils.episode_info import EpisodeInfo from src.utils.function_wraps import time_func_wrapper +from src.utils.replay_buffer import ReplayBuffer Env = TypeVar("Env") Optimizer = TypeVar("Optimizer") @@ -18,8 +20,8 @@ class A2CNetBase(nn.Module): - """ - Base class for A2C networks + """Base class for A2C networks + """ def __init__(self, architecture): @@ -54,9 +56,8 @@ class A2CConfig(object): gamma: float = 0.99 tau: float = 1.2 - n_workers: int = 1 n_iterations_per_episode: int = 100 - optimizer: Optimizer = None + action_sampler: Callable = None loss_function: LossFunction = None batch_size: int = 0 device: str = 'cpu' @@ -64,102 +65,68 @@ class A2CConfig(object): class A2C(Generic[Optimizer]): - def __init__(self, config: A2CConfig, a2c_net: A2CNet): - - self.gamma = config.gamma - self.tau = config.tau - self.n_workers = config.n_workers - self.n_iterations_per_episode = config.n_iterations_per_episode - self.batch_size = config.batch_size - self.optimizer = config.optimizer - self.device = config.device - self.loss_function = config.loss_function - self.a2c_net = a2c_net - self.rewards = [] - self.memory = [] - self.name = "A2C" + @staticmethod + def update_parameters(optimizer: Optimizer, episode_info: EpisodeInfo, *, config: A2CConfig): + """Update the parameters - def select_action(self, env: Env, observation: State) -> Action: - """ - Select an action - :param env: The environment over which the agent is trained - :param observation: The current observation of the environment - :return: Returns an Action type - """ - return env.sample_action() + Parameters + ---------- - def update_policy_network(self): - """ - 
Update the policy network - :return: - """ - pass + optimizer: The optimizer instance used in training + episode_info: The episode info + config: The training configuration - def calculate_loss(self): - """ - Calculate the loss - :return: - """ - pass + Returns + ------- - def accummulate_batch(self): - """ - Accumulate the memory items - :return: """ - pass - """ - def train(self, env: Env) -> None: - - Train the agent on the given environment - :param env: - :return: - - - # reset the environment and obtain the - # the time step - time_step: TimeStep = env.reset() + # unroll the batch; note the flip, we traverse the episode in reverse + rewards = torch.Tensor(episode_info.info["replay_buffer"].to_numpy("reward")).flip(dims=(0,)).view(-1) + logprobs = torch.stack(episode_info.info["logprobs"]).flip(dims=(0,)).view(-1) + values = torch.stack(episode_info.info["values"]).flip(dims=(0,)).view(-1) - observation = time_step.observation + returns = [] + ret_ = torch.Tensor([0]) - # the batch to process - batch = [] + # Loop through the rewards in reverse order to generate + # the returns R = r_i + γ * R + for r in range(rewards.shape[0]): # B + ret_ = rewards[r] + config.gamma * ret_ + returns.append(ret_) - # learn over the episode - for iteration in range(1, self.n_iterations_per_episode + 1): + returns = torch.stack(returns).view(-1) + returns = F.normalize(returns, dim=0) - # select an action - action = self.select_action(env=env, observation=observation) + # compute the actor loss. + # Minimize the actor loss: -1 * γ_t * (R - v(s_t)) * π(a|s) + actor_loss = -1 * logprobs * (returns - values.detach()) # C - # step in the environment according - # to the selected action - next_time_step = env.step(action=action) + # compute the critic loss. Minimize the critic loss: (R - v)^2. + critic_loss = torch.pow(values - returns, 2) # D + loss = actor_loss.sum() + config.tau * critic_loss.sum() # E + loss.backward() + optimizer.step() - batch.append(next_time_step.observation) + def __init__(self, config: A2CConfig, a2c_net: A2CNet): - if len(batch) < self.batch_size: - continue + self.config: A2CConfig = config + self.a2c_net = a2c_net + self.name = "A2C" - # unpack the batch in order to process it - states_v, actions_t, vals_ref = unpack_batch(batch=batch, net=self.a2c_net, device=self.device) - batch.clear() + def share_memory(self) -> None: + """Instruct the underlying network to + set up what is needed to share memory - self.optimizer.zero_grad() - # we reached the end of the episode - #if next_time_step.last(): - # break + Returns + ------- - #next_state = next_time_step.observation - policy_val, v_val = self.a2c_net.forward(x=states_v) + None + """ + self.a2c_net.share_memory() - self.optimizer.zero_grad() - - # claculate loss - loss = self.calculate_loss() - loss.backward() - self.optimizer.step() - """ + def parameters(self) -> Any: + return self.a2c_net.parameters() def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: """Train the algorithm on the episode @@ -182,8 +149,9 @@ def on_episode(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: return episode_info @time_func_wrapper(show_time=False) - def _do_train(self, env: Env, episode_idx: int, **option) -> EpisodeInfo: - """Train the algorithm on the episode + def _do_train(self, env: Env, episode_idx: int, **options) -> EpisodeInfo: + """Train the algorithm on the episode.
In fact this method simply + plays the environment to collect batches Parameters ---------- @@ -203,63 +171,59 @@ def _do_train(self, env: Env, episode_idx: int, **option) -> EpisodeInfo: episode_iterations = 0 total_distortion = 0 - episode_info = EpisodeInfo(episode_score=episode_score, total_distortion=total_distortion, episode_itrs=episode_iterations) - return episode_info + buffer = ReplayBuffer(options["buffer_size"]) - def actions_before_training(self, env: Env, **options) -> None: - """Any actions before training begins + time_step = env.reset() + state = torch.from_numpy(time_step.observation.to_numpy()).float() - Parameters - ---------- + # represent the state function values + values = [] - env: The environment that training occurs - options: Any options passed by the client code + # represent the probabilities under the + # policy + logprobs = [] - Returns - ------- - None - """ + for itr in range(self.config.n_iterations_per_episode): - """ - if not isinstance(self.config.policy, WithQTableMixinBase): - raise InvalidParamValue(param_name="policy", param_value=str(self.config.policy)) + # policy and critic values + policy, value = self.a2c_net(state) - for state in range(1, env.n_states): - for action in range(env.n_actions): - self.q_table[state, action] = 0.0 - """ + # + logits = policy.view(-1) - def actions_before_episode_begins(self, env: Env, episode_idx, **options) -> None: - """Execute any actions the algorithm needs before - the episode ends + values.append(value) - Parameters - ---------- + # choose the action this should be + action = self.config.action_sampler(logits) - env: The environment that training occurs - episode_idx: The episode index - options: Any options passed by the client code + action_applied = action + if isinstance(action, torch.Tensor): + action_applied = env.get_action(action.item()) - Returns - ------- + # the log probabilities of the policy + logprob_ = policy.view(-1)[action] + logprobs.append(logprob_) - None - """ + time_step = env.step(action_applied) - def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> None: - """Execute any actions the algorithm needs after - the episode ends + episode_score += time_step.reward + total_distortion += time_step.info["total_distortion"] - Parameters - ---------- - env: The environment that training occurs - episode_idx: The episode index - options: Any options passed by the client code + state = torch.from_numpy(time_step.observation.to_numpy()).float() - Returns - ------- - None + buffer.add(state=state, action=action, reward=time_step.reward, next_state=time_step.observation, + done=time_step.done) + + if time_step.done: + break + + episode_iterations += 1 + + episode_info = EpisodeInfo(episode_score=episode_score, + total_distortion=total_distortion, episode_itrs=episode_iterations, + info={"replay_buffer": buffer, + "logprobs": logprobs, + "values": values}) + return episode_info - """ - #self.config.policy.actions_after_episode(episode_idx) diff --git a/src/algorithms/pytorch_multi_process_trainer.py b/src/algorithms/pytorch_multi_process_trainer.py new file mode 100644 index 0000000..345e2c2 --- /dev/null +++ b/src/algorithms/pytorch_multi_process_trainer.py @@ -0,0 +1,277 @@ +"""Module pytorch_multi_process_trainer. Specifies a trainer +for PyTorch-based models. 
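+The trainer spawns n_procs worker processes; every worker builds its own
+environment instance (through the env_loader callable) and its own optimizer,
+while the agent's network weights are shared across the workers via
+share_memory().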
+ +""" + +import numpy as np +import torch.nn as nn +from typing import TypeVar, Any +from dataclasses import dataclass + +import torch.multiprocessing as mp + +from src.utils import INFO +from src.utils.function_wraps import time_func, time_func_wrapper +from src.utils.episode_info import EpisodeInfo +from src.maths.optimizer_type import OptimizerType +from src.maths.pytorch_optimizer_builder import pytorch_optimizer_builder +from src.utils import INFO + +Env = TypeVar("Env") +Agent = TypeVar("Agent") +EnvLoader = TypeVar('EnvLoader') + + +@dataclass(init=True, repr=True) +class OptimizerConfig(object): + """Configuration class for the optimizer + + """ + optimizer_type: OptimizerType = OptimizerType.ADAM + optimizer_learning_rate: float = 0.01 + optimizer_eps = 1.0e-5 + optimizer_betas: tuple = (0.9, 0.999) + optimizer_weight_decay: float = 0 + optimizer_amsgrad: bool = False + optimizer_maximize = False + + def as_dict(self) -> dict: + return {"optimizer_type": self.optimizer_type, + "learning_rate": self.optimizer_learning_rate, + "eps": self.optimizer_eps, + "betas": self.optimizer_betas, + "weight_decay": self.optimizer_weight_decay, + "amsgrad": self.optimizer_amsgrad, + "maximize": self.optimizer_maximize} + + +@dataclass(init=True, repr=True) +class PyTorchMultiProcessTrainerConfig(object): + """Configuration for PyTorchMultiProcessTrainer + + """ + n_procs: int = 1 + n_episodes: int = 100 + optimizer_config: OptimizerConfig = OptimizerConfig() + env_loader: EnvLoader = None + buffer_size: int = 1000 + master_process: int = 0 + + +@dataclass(init=True, repr=True) +class WorkerResult(object): + worker_idx: int + + +class TorchProcsHandler(object): + """The TorchProcsHandler class. Utility + class to handle PyTorch processe + + """ + + def __init__(self, n_procs: int) -> None: + """Constructor + + Parameters + ---------- + n_procs: The number of processes to handle + + """ + self.n_procs = n_procs + self.processes = [] + + def create_and_start(self, target: Any, *args) -> None: + for i in range(self.n_procs): + p = mp.Process(target=target, args=args) + p.start() + self.processes.append(p) + + def create_process_and_start(self, target: Any, args) -> None: + p = mp.Process(target=target, args=args) + p.start() + self.processes.append(p) + + def join(self) -> None: + for p in self.processes: + p.join() + + def terminate(self) -> None: + for p in self.processes: + p.terminate() + + def join_and_terminate(self): + self.join() + self.terminate() + + +def worker(worker_idx: int, worker_model: nn.Module, params: dir): + """Executes the process work + + Parameters + ---------- + + worker_idx: The id of the worker + worker_model: The model the worker is using + params: Parameters needed + + Returns + ------- + + """ + + # load the environment. 
Every worker has a distinct + # copy of the environment + env = params["env_loader"]() + + # create the optimizer + optimizer = pytorch_optimizer_builder(opt_type=params["optimizer_config"]["optimizer_type"], + model_params=worker_model.parameters(), + **params["optimizer_config"]) + + for episode in range(params["n_episodes"]): + + if worker_idx == params["master_process"]: + print("{0} On episode {1}/{2}".format(INFO, episode, params["n_episodes"])) + + optimizer.zero_grad() + + # run the episode + episode_info = worker_model.on_episode(env=env, episode_idx=episode, buffer_size=params["buffer_size"]) + + if worker_idx == params["master_process"]: + print("{0} Episode {1} finished in {2} secs".format(INFO, episode, episode_info.total_execution_time)) + print("{0} Episode score={1}, episode total avg distortion {2}".format(INFO, episode_info.episode_score, + episode_info.total_distortion / episode_info.episode_itrs)) + + print("{0} Episode finished after {1} iterations".format(INFO, episode_info.episode_itrs)) + + # update the parameters this function is moel dependent + # so should be more generic + params["update_params_functor"](optimizer, episode_info, config=worker_model.config) #**params) + + +class PyTorchMultiProcessTrainer(object): + """The class PyTorchMultiProcessTrainer. Trainer + for multiprocessing with PyTorch + + """ + + def __init__(self, agent: Agent, config: PyTorchMultiProcessTrainerConfig) -> None: + """Constructor. Initialize a trainer by passing the training environment + instance the agent to train and configuration dictionary + + Parameters + ---------- + + agent: The agent to train + config: Configuration parameters for the trainer + + """ + + self.agent = agent + self.configuration = config + # monitor performance + self.total_rewards: np.array = np.zeros(self.configuration.n_episodes) + self.iterations_per_episode = [] + self.total_distortions = [] + + def avg_rewards(self) -> np.array: + """ + Returns the average reward per episode + :return: + """ + avg = np.zeros(self.configuration['n_episodes']) + + for i in range(self.total_rewards.shape[0]): + avg[i] = self.total_rewards[i] / self.iterations_per_episode[i] + return avg + + def avg_distortion(self) -> np.array: + """ + Returns the average reward per episode + :return: + """ + avg = np.zeros(self.configuration['n_episodes']) + + for i in range(len(self.total_distortions)): + avg[i] = self.total_distortions[i] / self.iterations_per_episode[i] + return avg + + def actions_before_training(self) -> None: + """Any actions to perform before training begins + + Returns + ------- + + None + """ + + self.agent.share_memory() + + def actions_before_episode_begins(self, env: Env, episode_idx: int, **options) -> None: + """Perform any actions necessary before the training begins + + Parameters + ---------- + env: The environment to train on + episode_idx: The training episode index + options: Any options passed by the client code + + Returns + ------- + + None + + """ + self.agent.actions_before_episode_begins(env, episode_idx, **options) + + def actions_after_episode_ends(self, env: Env, episode_idx: int, **options) -> None: + """Any actions after the training episode ends + + Parameters + ---------- + + env: The environment to train on + episode_idx: The training episode index + options: Any options passed by the client code + + Returns + ------- + + None + """ + self.agent.actions_after_episode_ends(env, episode_idx, **options) + + if episode_idx % self.configuration['output_msg_frequency'] == 0: + if 
self.env.config.distorted_set_path is not None: + self.env.save_current_dataset(episode_idx) + + @time_func_wrapper(show_time=True) + def train(self): + + print("{0} Training agent {1}".format(INFO, self.agent.name)) + print("{0} Number of training episodes {1}".format(INFO, self.configuration.n_episodes)) + print("{0} Number of processes {1}".format(INFO, self.configuration.n_procs)) + + # any actions needed before training starts + self.actions_before_training() + + # create the processes by attaching the worker + + process_handler = TorchProcsHandler(n_procs=self.configuration.n_procs) + + for p in range(self.configuration.n_procs): + # worker_idx: int, worker_model: nn.Module, params: dir + process_handler.create_process_and_start(target=worker, + args=(p, self.agent, + {"optimizer_config": self.configuration.optimizer_config.as_dict(), + "n_episodes": self.configuration.n_episodes, + "update_params_functor": self.agent.update_parameters, + "env_loader": self.configuration.env_loader, + "buffer_size": self.configuration.buffer_size, + "master_process": self.configuration.master_process})) + + process_handler.join_and_terminate() + + + diff --git a/src/examples/a2c_three_columns.py b/src/examples/a2c_three_columns.py new file mode 100644 index 0000000..eb12725 --- /dev/null +++ b/src/examples/a2c_three_columns.py @@ -0,0 +1,224 @@ +import random +from pathlib import Path +import numpy as np +import torch +import torch.nn as nn + +from src.algorithms.a2c import A2C, A2CConfig, A2CNet, A2CNetBase +from src.utils.serial_hierarchy import SerialHierarchy +from src.spaces.tiled_environment import TiledEnv, TiledEnvConfig, Layer +from src.spaces.discrete_state_environment import DiscreteStateEnvironment +from src.datasets.datasets_loaders import MockSubjectsLoader, MockSubjectsData +from src.spaces.action_space import ActionSpace +from src.spaces.actions import ActionIdentity, ActionStringGeneralize, ActionNumericBinGeneralize +from src.policies.epsilon_greedy_policy import EpsilonDecayOption +from src.algorithms.epsilon_greedy_q_estimator import EpsilonGreedyQEstimatorConfig, EpsilonGreedyQEstimator +from src.utils.distortion_calculator import DistortionCalculationType, DistortionCalculator +from src.utils.numeric_distance_type import NumericDistanceType +from src.utils.string_distance_calculator import StringDistanceType +from src.utils.reward_manager import RewardManager +from src.utils.plot_utils import plot_running_avg +from src.algorithms.pytorch_multi_process_trainer import PyTorchMultiProcessTrainer, PyTorchMultiProcessTrainerConfig, OptimizerConfig +from src.utils import INFO + + +N_LAYERS = 1 +N_BINS = 10 +N_EPISODES = 1000 +OUTPUT_MSG_FREQUENCY = 100 +GAMMA = 0.99 +ALPHA = 0.1 +N_ITRS_PER_EPISODE = 30 +EPS = 1.0 +EPSILON_DECAY_OPTION = EpsilonDecayOption.CONSTANT_RATE #.INVERSE_STEP +EPSILON_DECAY_FACTOR = 0.01 +MAX_DISTORTION = 0.7 +MIN_DISTORTION = 0.3 +OUT_OF_MAX_BOUND_REWARD = -1.0 +OUT_OF_MIN_BOUND_REWARD = -1.0 +IN_BOUNDS_REWARD = 5.0 +N_ROUNDS_BELOW_MIN_DISTORTION = 10 +SAVE_DISTORTED_SETS_DIR = "/home/alex/qi3/drl_anonymity/src/examples/semi_grad_sarsa/distorted_set" +REWARD_FACTOR = 0.95 +PUNISH_FACTOR = 2.0 +ACTION_SPACE_SIZE = 5 + + +def get_ethinicity_hierarchy(): + ethnicity_hierarchy = SerialHierarchy(values={}) + + ethnicity_hierarchy["Mixed White/Asian"] = "White/Asian" + ethnicity_hierarchy["White/Asian"] = "Mixed" + + ethnicity_hierarchy["Chinese"] = "Asian" + ethnicity_hierarchy["Indian"] = "Asian" + ethnicity_hierarchy["Mixed White/Black African"] = "White/Black" 
+ ethnicity_hierarchy["White/Black"] = "Mixed" + + ethnicity_hierarchy["Black African"] = "African" + ethnicity_hierarchy["African"] = "Black" + ethnicity_hierarchy["Asian other"] = "Asian" + ethnicity_hierarchy["Black other"] = "Black" + ethnicity_hierarchy["Mixed White/Black Caribbean"] = "White/Black" + ethnicity_hierarchy["White/Black"] = "Mixed" + + ethnicity_hierarchy["Mixed other"] = "Mixed" + ethnicity_hierarchy["Arab"] = "Asian" + ethnicity_hierarchy["White Irish"] = "Irish" + ethnicity_hierarchy["Irish"] = "European" + ethnicity_hierarchy["Not stated"] = "Not stated" + ethnicity_hierarchy["White Gypsy/Traveller"] = "White" + ethnicity_hierarchy["White British"] = "British" + ethnicity_hierarchy["British"] = "European" + ethnicity_hierarchy["Bangladeshi"] = "Asian" + ethnicity_hierarchy["White other"] = "White" + ethnicity_hierarchy["Black Caribbean"] = "Caribbean" + ethnicity_hierarchy["Caribbean"] = "Black" + ethnicity_hierarchy["Pakistani"] = "Asian" + + ethnicity_hierarchy["European"] = "European" + ethnicity_hierarchy["Mixed"] = "Mixed" + ethnicity_hierarchy["Asian"] = "Asian" + ethnicity_hierarchy["Black"] = "Black" + ethnicity_hierarchy["White"] = "White" + return ethnicity_hierarchy + + +def load_mock_subjects() -> MockSubjectsLoader: + + mock_data = MockSubjectsData(FILENAME=Path("../../data/mocksubjects.csv"), + COLUMNS_TYPES={"ethnicity": str, "salary": float, "diagnosis": int}, + FEATURES_DROP_NAMES=["NHSno", "given_name", + "surname", "dob"] + ["preventative_treatment", + "gender", "education", "mutation_status"], + NORMALIZED_COLUMNS=["salary"]) + + ds = MockSubjectsLoader(mock_data) + + assert ds.n_columns == 3, "Invalid number of columns {0} not equal to 3".format(ds.n_columns) + + return ds + + +def load_discrete_env() -> DiscreteStateEnvironment: + + mock_ds = load_mock_subjects() + + # create bins for the salary generalization + unique_salary = mock_ds.get_column_unique_values(col_name="salary") + unique_salary.sort() + + # modify slightly the max value because + # we get out of bounds for the maximum salary + bins = np.linspace(unique_salary[0], unique_salary[-1] + 1, N_BINS) + + action_space = ActionSpace(n=ACTION_SPACE_SIZE) + action_space.add_many(ActionIdentity(column_name="ethnicity"), + ActionStringGeneralize(column_name="ethnicity", + generalization_table=get_ethinicity_hierarchy()), + ActionIdentity(column_name="salary"), + ActionNumericBinGeneralize(column_name="salary", generalization_table=bins), + ActionIdentity(column_name="diagnosis")) + + action_space.shuffle() + + discrete_env = DiscreteStateEnvironment.from_options(data_set=mock_ds, + action_space=action_space, + distortion_calculator=DistortionCalculator( + numeric_column_distortion_metric_type=NumericDistanceType.L2_AVG, + string_column_distortion_metric_type=StringDistanceType.COSINE_NORMALIZE, + dataset_distortion_type=DistortionCalculationType.SUM), + reward_manager=RewardManager(bounds=(MIN_DISTORTION, MAX_DISTORTION), + out_of_max_bound_reward=OUT_OF_MAX_BOUND_REWARD, + out_of_min_bound_reward=OUT_OF_MIN_BOUND_REWARD, + in_bounds_reward=IN_BOUNDS_REWARD), + gamma=GAMMA, + reward_factor=REWARD_FACTOR, + punish_factor=PUNISH_FACTOR, + min_distortion=MIN_DISTORTION, max_distortion=MAX_DISTORTION, + n_rounds_below_min_distortion=N_ROUNDS_BELOW_MIN_DISTORTION, + distorted_set_path=Path(SAVE_DISTORTED_SETS_DIR), + n_states=N_LAYERS * Layer.n_tiles_per_action(N_BINS, + mock_ds.n_columns)) + + # establish the configuration for the Tiled environment + tiled_env_config = 
TiledEnvConfig(n_layers=N_LAYERS, n_bins=N_BINS, + env=discrete_env, + column_ranges={"ethnicity": [0.0, 1.0], + "salary": [0.0, 1.0], + "diagnosis": [0.0, 1.0]}) + # create the Tiled environment + tiled_env = TiledEnv(tiled_env_config) + tiled_env.create_tiles() + + return tiled_env + +def action_sampler(logits) -> torch.Tensor: + + action_dist = torch.distributions.Categorical(logits=logits) + action = action_dist.sample() + return action + + +if __name__ == '__main__': + + # set the seed for random engine + random.seed(42) + + # in_features is the number of columns in the data set + # out_features is the number of actions in the environment + common_net = A2CNetBase(architecture=nn.Sequential(nn.Linear(in_features=3, out_features=ACTION_SPACE_SIZE))) + + # dim is the dimension along which Softmax will be computed (so every slice along dim will sum to 1) + # this model simply outputs a discrete probability distribution + # over the ACTION_SPACE_SIZE possible actions + policy_net = A2CNetBase(architecture=nn.Softmax(dim=0)) + + # The critic or value network outputs a single number representing the state value + value_net = A2CNetBase(architecture=nn.Linear(ACTION_SPACE_SIZE, 1)) + net = A2CNet(common_net, policy_net, value_net) + + # agent configuration + a2c_config = A2CConfig(action_sampler=action_sampler, n_iterations_per_episode=1000) + + # create the agent + agent = A2C(a2c_config, net) + + # create a trainer to train the Qlearning agent + configuration = PyTorchMultiProcessTrainerConfig(n_episodes=N_EPISODES, + env_loader=load_discrete_env, + optimizer_config=OptimizerConfig()) + + trainer = PyTorchMultiProcessTrainer(agent=agent, config=configuration) + + # train the agent + trainer.train() + + # avg_rewards = trainer.avg_rewards() + """ + avg_rewards = trainer.total_rewards + plot_running_avg(avg_rewards, steps=100, + xlabel="Episodes", ylabel="Reward", + title="Running reward average over 100 episodes") + + avg_episode_dist = np.array(trainer.total_distortions) + print("{0} Max/Min distortion {1}/{2}".format(INFO, np.max(avg_episode_dist), np.min(avg_episode_dist))) + + plot_running_avg(avg_episode_dist, steps=100, + xlabel="Episodes", ylabel="Distortion", + title="Running distortion average over 100 episodes") + + print("=============================================") + print("{0} Generating distorted dataset".format(INFO)) + """ + + """ + # Let's play + env.reset() + + stop_criterion = IterationControl(n_itrs=10, min_dist=MIN_DISTORTION, max_dist=MAX_DISTORTION) + agent.play(env=env, stop_criterion=stop_criterion) + env.save_current_dataset(episode_index=-2, save_index=False) + """ + print("{0} Done....".format(INFO)) + print("=============================================") diff --git a/src/maths/__init__.py b/src/maths/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/maths/optimizer_type.py b/src/maths/optimizer_type.py new file mode 100644 index 0000000..006482f --- /dev/null +++ b/src/maths/optimizer_type.py @@ -0,0 +1,14 @@ +"""Module optimizer_type. Specifies an +enumeration for various PyTorch optimizers + +""" +import enum + + +class OptimizerType(enum.IntEnum): + + INVALID = -1 + GD = 0 + SGD = 1 + BatchGD = 2 + ADAM = 3 diff --git a/src/maths/pytorch_optimizer_builder.py b/src/maths/pytorch_optimizer_builder.py new file mode 100644 index 0000000..54beaeb --- /dev/null +++ b/src/maths/pytorch_optimizer_builder.py @@ -0,0 +1,34 @@ +"""Module pytorch_optimizer_builder. 
Specifies +a simple factory for building PyTorch optimizers + +""" +from typing import Any +import torch.optim as optim +from src.maths.optimizer_type import OptimizerType + +TORCH_OPTIMIZER_TYPES = [OptimizerType.ADAM] + + +def pytorch_optimizer_builder(opt_type: OptimizerType, model_params: Any, **options) -> optim.Optimizer: + """ Factory method for building PyTorch optimizers + + Parameters + ---------- + + opt_type: The type of the optimizer + model_params: Model parameters to optimize on + options: Options for the optimizer + + Returns + ------- + + A concrete instance of the optim.Optimizer class + """ + + if opt_type not in TORCH_OPTIMIZER_TYPES: + raise ValueError("Invalid PyTorch optimizer type. Type {0} not in {1}".format(opt_type.name, + TORCH_OPTIMIZER_TYPES)) + if opt_type == OptimizerType.ADAM: + return optim.Adam(params=model_params, lr=options["learning_rate"], + eps=options["eps"], betas=options["betas"], weight_decay=options["weight_decay"], + amsgrad=options["amsgrad"]) \ No newline at end of file diff --git a/src/spaces/discrete_state_environment.py b/src/spaces/discrete_state_environment.py index fb612bf..773c9aa 100644 --- a/src/spaces/discrete_state_environment.py +++ b/src/spaces/discrete_state_environment.py @@ -8,7 +8,7 @@ from pathlib import Path from typing import TypeVar, List from dataclasses import dataclass -import multiprocessing as mp +import torch from src.spaces.actions import ActionBase, ActionType from src.spaces.time_step import TimeStep, StepType @@ -131,12 +131,20 @@ def get_action(self, aidx: int) -> ActionBase: return self.config.action_space[aidx] def save_current_dataset(self, episode_index: int, save_index: bool = False) -> None: + """Save the current distorted dataset for the given episode index + + Parameters + ---------- + + episode_index: The epsidoe index + save_index: Flad indicating if the row index should be output as well + + Returns + ------- + + None """ - Save the current distorted datase for the given episode index - :param episode_index: - :param save_index: - :return: - """ + self.distorted_data_set.save_to_csv( filename=Path(str(self.config.distorted_set_path) + "_" + str(episode_index)), save_index=save_index) @@ -351,49 +359,3 @@ def step(self, action: ActionBase) -> TimeStep: return self.current_time_step - -class MultiprocessEnv(object): - - def __init__(self, make_env_fn, make_env_kargs, seed, n_workers): - self.make_env_fn = make_env_fn - self.make_env_kargs = make_env_kargs - self.seed = seed - self.n_workers = n_workers - self.pipes = [mp.Pipe() for rank in range(self.n_workers)] - self.workers = [ - mp.Process(target=self.work, - args=(rank, self.pipes[rank][1])) for rank in range(self.n_workers)] - [w.start() for w in self.workers] - - def work(self, rank, worker_end): - env = self.make_env_fn(**self.make_env_kargs, seed=self.seed + rank) - while True: - cmd, kwargs = worker_end.recv() - if cmd == 'reset': - worker_end.send(env.reset(**kwargs)) - elif cmd == 'step': - worker_end.send(env.step(**kwargs)) - elif cmd == '_past_limit': - # Another way to check time limit truncation - worker_end.send(env._elapsed_steps >= env._max_episode_steps) - else: - env.close(**kwargs) - del env - worker_end.close() - break - - def step(self, actions): - assert len(actions) == self.n_workers - [self.send_msg(('step', {'action': actions[rank]}), rank) \ - for rank in range(self.n_workers)] - results = [] - for rank in range(self.n_workers): - parent_end, _ = self.pipes[rank] - o, r, d, _ = parent_end.recv() - if d: - 
self.send_msg(('reset', {}), rank) - o = parent_end.recv() - results.append((o, - np.array(r, dtype=np.float), - np.array(d, dtype=np.float), _)) - return [np.vstack(block) for block in np.array(results).T] diff --git a/src/spaces/state.py b/src/spaces/state.py index 50223ad..a6f2d55 100644 --- a/src/spaces/state.py +++ b/src/spaces/state.py @@ -109,6 +109,6 @@ def to_numpy(self) -> np.array: """ - vals = self.column_distortions.values() + vals = list(self.column_distortions.values()) return np.array(vals) diff --git a/src/spaces/tiled_environment.py b/src/spaces/tiled_environment.py index 5f1a6db..d5538c7 100644 --- a/src/spaces/tiled_environment.py +++ b/src/spaces/tiled_environment.py @@ -163,7 +163,6 @@ def build_tiles(self, next_tile_global_idx: int) -> int: """ return next_tile_global_idx - def _do_build_tile(self, action: int, next_local_tile_idx: int, next_tile_global_idx: int) -> tuple: @@ -611,12 +610,7 @@ def featurize_raw_state(self, state: RawState) -> TiledState: tiled_state = np.zeros(self.n_layers * self.n_actions * self.n_bins ** (len(self.column_ranges))) - found = False for layer in range(self.n_layers): - - #if found: - # break - for action in range(self.n_actions): global_idx = self.tiles[layer].get_global_tile_index(raw_state=state, action=action) if global_idx != INVALID_ID: diff --git a/src/utils/replay_buffer.py b/src/utils/replay_buffer.py new file mode 100644 index 0000000..bcdd5bb --- /dev/null +++ b/src/utils/replay_buffer.py @@ -0,0 +1,156 @@ +import random +from typing import Any, List +from collections import namedtuple, deque +import numpy as np +import torch + +from src.exceptions.exceptions import InvalidParamValue + +ExperienceTuple = namedtuple("ExperienceTuple", field_names=["state", "action", + "reward", "next_state", "done", "info"]) + + +class ReplayBuffer(object): + """The ReplayBuffer class. + Models a fixed size replay buffer. + The buffer is represented by using a deque from Python’s built-in collections library. + This is basically a list that we can set a maximum size. If we try to add a new element whilst the list + is already full, it will remove the first item in the list and add the new item to the end of the list. + Hence new experiences replace the oldest experiences. + The experiences themselves are tuples of (state1, reward, action, state2, done) that we append to the replay deque + and they are represented via the named tuple ExperienceTuple + """ + + TUPLE_NAMES = ["state", "action", "reward", "next_state", "done", "info"] + + def __init__(self, buffer_size: int): + """Constructor + + Parameters + ---------- + + buffer_size: The maximum capacity of the buffer + + """ + + self.capacity: int = buffer_size + self._memory = deque(maxlen=buffer_size) + + def __len__(self) -> int: + """ Return the current size of the internal memory. 
+ + Returns + ------- + + """ + return len(self._memory) + + def __getitem__(self, name_attr: str) -> List: + """Return the full batch of the name_attr attribute + + Parameters + ---------- + name_attr: The name of the attribute to collect the + batch values + + Returns + ------- + + A list + """ + + if name_attr not in ReplayBuffer.TUPLE_NAMES: + raise InvalidParamValue(param_name=name_attr, param_value=name_attr) + + batch = [] + for item in self._memory: + + if name_attr == "action": + batch.append(item.action) + elif name_attr == "state": + batch.append(item.state) + elif name_attr == "next_state": + batch.append(item.next_state) + elif name_attr == "reward": + batch.append(item.reward) + elif name_attr == "done": + batch.append(item.done) + elif name_attr == "info": + batch.append(item.info) + + return batch + + def to_numpy(self, name_attr: str) -> np.array: + return np.array(self[name_attr]) + + def add(self, state: Any, action: Any, reward: float, + next_state: Any, done: bool, info: dict = {}) -> None: + """Add a new experience tuple in the buffer + + Parameters + ---------- + + state: The current state + action: The action taken + reward: The reward observed + next_state: The next state observed + done: Whether the episode is done + info: Any other info needed + + Returns + ------- + None + + """ + + e = ExperienceTuple(state, action, reward, next_state, done, info) + self._memory.append(e) + + def sample(self, batch_size: int) -> List[ExperienceTuple]: + """Randomly sample a batch of experiences from memory. + + Parameters + ---------- + + batch_size: The batch size we want to sample + + Returns + ------- + + A list of ExperienceTuple + """ + + return random.sample(self._memory, k=batch_size) + + def get_item_as_torch_tensor(self, name_attr) -> torch.Tensor: + """ Returns a torch.Tensor representation of the + the named item + + Parameters + ---------- + + name_attr: The name of the attribute + + Returns + ------- + + An instance of torch.Tensor + """ + + items = self[name_attr] + + # convert to np.array to avoid pytorch warning + return torch.Tensor(np.array(items)) + + def reinitialize(self) -> None: + """Reinitialize the internal buffer + + Returns + ------- + + None + + """ + + self._memory = deque(maxlen=self.capacity) +
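+
+
+if __name__ == "__main__":
+    # Minimal usage sketch (illustration only): the plain numbers used for
+    # state/action/next_state below stand in for whatever the environment
+    # actually produces.
+    buffer = ReplayBuffer(buffer_size=100)
+    buffer.add(state=0, action=1, reward=1.0, next_state=2, done=False)
+
+    experiences = buffer.sample(batch_size=1)             # a list of ExperienceTuple
+    rewards = buffer.get_item_as_torch_tensor("reward")   # tensor of the stored rewards
+    print(len(buffer), experiences[0].reward, rewards)
+
+    buffer.reinitialize()                                 # empty the buffer again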