In [1]:
# using ray 1.9 to run
# python 3.9

from ray.rllib.agents.ppo.ppo_torch_policy import PPOTorchPolicy
from ray.rllib.agents.a3c.a3c_torch_policy import A3CTorchPolicy
from ray.rllib.agents.a3c.a2c import A2CTrainer
from ray.rllib.agents.ppo import PPOTrainer
import gym
import ray.tune as tune
from torch.nn import functional as F
from typing import Optional, Dict
import torch.nn as nn
import ray
from collections import deque
#from ray.rllib.agents.ppo.ppo_torch_policy import ValueNetworkMixin
from ray.rllib.evaluation.episode import MultiAgentEpisode
from ray.rllib.evaluation.postprocessing import compute_gae_for_sample_batch, \
    Postprocessing
from ray.rllib.models.action_dist import ActionDistribution
from ray.rllib.models.modelv2 import ModelV2
#from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.policy_template import build_policy_class
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import Deprecated
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.torch_ops import apply_grad_clipping, sequence_mask
from ray.rllib.utils.typing import TrainerConfigDict, TensorType, \
    PolicyID, LocalOptimizer
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
import copy
import numpy as np
import sys
sys.path.append('../src')
torch, nn = try_import_torch()
from cache_guessing_game_env_impl import *




In [2]:
from cache_guessing_game_env_impl import *

def u_init(policy: Policy, obs_space: gym.spaces.Space, 
              action_space: gym.spaces.Space, config: TrainerConfigDict)->None:
        policy.past_len = 5        
        policy.past_models = deque(maxlen =policy.past_len)
        policy.timestep = 0
    

In [3]:
# need a customized model that implemented __deepcopy__ properly
import torch
import torch.nn as nn
import torch.nn.functional as F

from typing import Optional
from ray.rllib.models import ModelCatalog

class ResidualBlock(nn.Module):
    def __init__(self, dim: int) -> None:
        super(ResidualBlock, self).__init__()

        self.dim = dim
        layers = []
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.dim, self.dim))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.dim, self.dim))
        self.layers = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.layers(x)

class DNNEncoder(nn.Module):
    def __init__(self,
                 input_dim: int,
                 hidden_dim: int,
                 output_dim: int,
                 num_blocks: Optional[int] = 1) -> None:
        super(DNNEncoder, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_blocks = num_blocks

        layers = []
        layers.append(nn.Linear(self.input_dim, self.hidden_dim))
        for _ in range(self.num_blocks):
            layers.append(ResidualBlock(self.hidden_dim))
        layers.append(nn.ReLU())
        layers.append(nn.Linear(self.hidden_dim, self.output_dim))
        self.layers = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layers(x)


class TestModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        hidden_dim = 256 
        super(TestModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)
        self.a_model = nn.Sequential(
            
            DNNEncoder(
                input_dim=int(np.product(self.obs_space.shape)),
                hidden_dim=hidden_dim,
                output_dim=hidden_dim,
            ),
            nn.Linear(hidden_dim, num_outputs)
        )
        self.v_model = nn.Sequential(
            DNNEncoder(
                input_dim=int(np.product(self.obs_space.shape)),
                hidden_dim=hidden_dim,
                output_dim=hidden_dim,
            ),
            nn.Linear(hidden_dim, 1)
        )
        self.obs_space = obs_space
        self.action_space = action_space
        self.num_outputs = num_outputs
        self.model_config = model_config
        self.name = name
        
        self.past_len = 5
        self.past_models = deque(maxlen=self.past_len)
        self.past_mean_rewards = deque(maxlen=self.past_len)
        self._last_flat_in = None
        
    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        copied_model = TestModel(self.obs_space, self.action_space, self.num_outputs, self.model_config, self.name+'_copy')
        copied_model.a_model.load_state_dict(self.a_model.state_dict())
        copied_model.v_model.load_state_dict(self.v_model.state_dict())
        return copied_model
        
    def forward(self, input_dict, state, seq_lens):
        #if obs[-1] > 0.99:
        #    self.recent_model.append((copy.deepcopy(self.a_model), copy.deepcopy(self.v_model)))
        #    if len(self.recent_model) > 5:
        #        self.recent_model.pop()
        obs = input_dict["obs_flat"].float()
        return self._forward(obs, input_dict, state, seq_lens)

    def _forward(self, obs, input_dict, state, seq_lens):
        self._last_flat_in = obs.reshape(obs.shape[0], -1)
        self._output = self.a_model(self._last_flat_in)
        return self._output, state 

    def value_function(self):
        return self.v_model(self._last_flat_in).squeeze(1)
    
ModelCatalog.register_custom_model("test_model", TestModel)
def make_model(policy: Policy, 
               obs_space: gym.spaces.Space,
               action_space: gym.spaces.Space, 
               model_config: TrainerConfigDict):
    new_model = TestModel(obs_space, action_space, np.prod(action_space.shape), model_config, 'test')
    return new_model

In [4]:
def copy_model(model: ModelV2) -> ModelV2:
    copdied_model= TorchModelV2(
        obs_space = model.obs_space,
        action_space = model.action_space, 
        num_outputs = model.num_outputs,
        model_config = model.model_config,
        name = 'copied')
    
    return copied_model

In [5]:
def compute_div_loss(policy: Policy, model: ModelV2,
                      dist_class: ActionDistribution,
                      train_batch: SampleBatch):
    logits, _ = model.from_batch(train_batch)
    values = model.value_function()
    valid_mask = torch.ones_like(values, dtype=torch.bool)
    dist = dist_class(logits, model)
    log_probs = dist.logp(train_batch[SampleBatch.ACTIONS]).reshape(-1)
    
    divs = []
    for idx, past_model in enumerate(policy.past_models):
        logits, _ = past_model.from_batch(train_batch)
        values = past_model.value_function()
        valid_mask = torch.ones_like(values, dtype=torch.bool)
        dist = dist_class(logits, past_model)
        past_log_probs = dist.logp(train_batch[SampleBatch.ACTIONS]).reshape(-1) 
        div = div_metric(log_probs, past_log_probs).sum(1)
        div = div.mean(0)
        divs.append(div)
    
    divs_sort_idx = np.argsort([d.data[0] for d in divs])
    div_loss_orig = 0
    for idx in divs_sort_idx:
        div_loss += divs[idx]
        div_loss_orig += divs[idx]
    
    div_loss = div_loss / self.past_len
    
    return div_loss

In [6]:
import pickle
def custom_loss(policy: Policy, model: ModelV2,
                      dist_class: ActionDistribution,
                      train_batch: SampleBatch) -> TensorType:
    logits, _ = model.from_batch(train_batch)
    values = model.value_function()
    policy.timestep += 1
    #if len(policy.devices) > 1:
        # copy weights of main model (tower-0) to all other towers type
    #if policy.timestep % 10 == 0:
    copied_model = pickle.loads(pickle.dumps(model))
    copied_model.load_state_dict(model.state_dict())
        #policy.past_models.append(copied_model)
        #policy.past_models.append(copy.deepcopy(model))
        #policy.past_models.append(copy.copy(model))
    
    if policy.is_recurrent():
        B = len(train_batch[SampleBatch.SEQ_LENS])
        max_seq_len = logits.shape[0] // B
        mask_orig = sequence_mask(train_batch[SampleBatch.SEQ_LENS],
                                  max_seq_len)
        valid_mask = torch.reshape(mask_orig, [-1])
    else:
        valid_mask = torch.ones_like(values, dtype=torch.bool)
    dist = dist_class(logits, model)
    log_probs = dist.logp(train_batch[SampleBatch.ACTIONS]).reshape(-1)
    
    #print('log_probs')
    #print(log_probs)
    
    pi_err = -torch.sum(
        torch.masked_select(log_probs * train_batch[Postprocessing.ADVANTAGES],
                            valid_mask))
    # Compute a value function loss.
    if policy.config["use_critic"]:
        value_err = 0.5 * torch.sum(
            torch.pow(
                torch.masked_select(
                    values.reshape(-1) -
                    train_batch[Postprocessing.VALUE_TARGETS], valid_mask),
                2.0))
    # Ignore the value function.
    else:
        value_err = 0.0
    entropy = torch.sum(torch.masked_select(dist.entropy(), valid_mask))
    div_loss = compute_div_loss(policy, model, dist_class, train_batch)
    total_loss = (pi_err + value_err * policy.config["vf_loss_coeff"] -
                  entropy * policy.config["entropy_coeff"] - 1000 * div_loss )
    print('pi_err')
    print(pi_err)
    print('value_err')
    print(value_err)
    print('div_loss')
    print(div_loss)
    
    # Store values for stats function in model (tower), such that for
    # multi-GPU, we do not override them during the parallel loss phase.
    model.tower_stats["entropy"] = entropy
    model.tower_stats["pi_err"] = pi_err
    model.tower_stats["value_err"] = value_err
    return total_loss

In [7]:
CustomPolicy = A3CTorchPolicy.with_updates(
    name="MyCustomA3CTorchPolicy",
    loss_fn=custom_loss,
    #make_model= make_model,
    before_init=custom_init)
CustomTrainer = A2CTrainer.with_updates(
    get_policy_class=lambda _: CustomPolicy)

NameError: name 'custom_init' is not defined

In [None]:
#tune.run(CustomTrainer, config={"env": 'Frostbite-v0', "num_gpus":1})

#tune.run(CustomTrainer, config={"env": 'Frostbite-v0', "num_gpus":0})#, 'model': { 'custom_model': 'test_model' }})
tune.register_env("cache_guessing_game_env_fix", CacheGuessingGameEnv)#Fix)
# Two ways of training
# method 2b
config = {
    'env': 'cache_guessing_game_env_fix', #'cache_simulator_diversity_wrapper',
    'env_config': {
        'verbose': 1,
        "force_victim_hit": False,
        'flush_inst': False,
        "allow_victim_multi_access": False,
        "attacker_addr_s": 0,
        "attacker_addr_e": 3,
        "victim_addr_s": 0,
        "victim_addr_e": 1,
        "reset_limit": 1,
        "cache_configs": {
                # YAML config file for cache simulaton
            "architecture": {
              "word_size": 1, #bytes
              "block_size": 1, #bytes
              "write_back": True
            },
            "cache_1": {#required
              "blocks": 2, 
              "associativity": 2,  
              "hit_time": 1 #cycles
            },
            "mem": {#required
              "hit_time": 1000 #cycles
            }
        }
    }, 
    #'gamma': 0.9, 
    'num_gpus': 1, 
    'num_workers': 1, 
    'num_envs_per_worker': 1, 
    #'entropy_coeff': 0.001, 
    #'num_sgd_iter': 5, 
    #'vf_loss_coeff': 1e-05, 
    'model': {
        'custom_model': 'test_model',#'rnn', 
        #'max_seq_len': 20, 
        #'custom_model_config': {
        #    'cell_size': 32
        #   }
    }, 
    'framework': 'torch',
}
tune.run(CustomTrainer, config=config)#config={"env": 'Freeway-v0', "num_gpus":1})

