In [None]:
from abc import ABC, abstractmethod


class EpsilonAnneal(ABC):
    @abstractmethod
    def anneal(self):
        pass


class Constant(EpsilonAnneal):
    def __init__(self, start):
        self.val = start

    def anneal(self):
        pass


class LinearAnneal(EpsilonAnneal):
    """Linear Annealing Schedule.

    Args:
        start:      The initial value of epsilon.
        end:        The final value of epsilon.
        duration:   The number of anneals from start value to end value.

    """

    def __init__(self, start: float, end: float, duration: int):
        self.val = start
        self.min = end
        self.duration = duration

    def anneal(self):
        self.val = max(self.min, self.val - (self.val - self.min) / self.duration)

eps = LinearAnneal(1.0, 0.1, 2000000 // 10)
eps.min

0.1

In [None]:
eps.anneal()
eps.val

0.999424182841602

In [None]:
import torch

history_len = 5

torch.triu(torch.ones(history_len, history_len), diagonal=1)

tensor([[0., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1.],
        [0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0.]])

In [None]:
nn.GELU()

GELU(approximate='none')

In [None]:
from torch import nn

embed_size = 512
num_heads = 8
dropout = 0

attention = nn.MultiheadAttention(
    embed_dim=embed_size,
    num_heads=num_heads,
    dropout=dropout,
    batch_first=True,
)

attention

MultiheadAttention(
  (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
)

In [None]:
import torch

# path = '/data/kimgh/workspace/DTQN/policies/DTQN-test-layers_8/gv_memory.9x9.yaml/model=DTQN_envs=gv_memory.9x9.yaml_obs_embed=8_a_embed=0_in_embed=128_context=100_heads=8_layers=8_batch=32_gate=res_identity=False_history=100_pos=learned_bag=0_seed=1'
# path = '/data/kimgh/workspace/DTQN/policies/DTQN-test-steps_4M/gv_memory.5x5.yaml/model=DTQN_envs=gv_memory.5x5.yaml_obs_embed=8_a_embed=0_in_embed=128_context=50_heads=8_layers=2_batch=32_gate=res_identity=False_history=50_pos=learned_bag=0_seed=1'
path = '/data/kimgh/workspace/DTQN/policies/DTQN-test-custom/gv_memory.21x21.yaml/model=DTQN_envs=gv_memory.21x21.yaml_obs_embed=8_a_embed=0_in_embed=256_context=200_heads=16_layers=16_batch=64_gate=res_identity=False_history=200_pos=learned_bag=0_seed=1'

weight = torch.load(path)

for i, (k, v) in enumerate(weight.items()):
    print(i, k, v.shape)
    
# 총 가중치 개수 계산
total_params = sum(param.numel() for param in weight.values())
print('')
print(total_params)

  weight = torch.load(path)


0 obs_embedding.observation_embedding.0.weight torch.Size([23, 8])
1 obs_embedding.observation_embedding.2.weight torch.Size([256, 48])
2 obs_embedding.observation_embedding.2.bias torch.Size([256])
3 position_embedding.position_encoding torch.Size([1, 200, 256])
4 transformer_layers.0.attn_mask torch.Size([200, 200])
5 transformer_layers.0.layernorm1.weight torch.Size([256])
6 transformer_layers.0.layernorm1.bias torch.Size([256])
7 transformer_layers.0.layernorm2.weight torch.Size([256])
8 transformer_layers.0.layernorm2.bias torch.Size([256])
9 transformer_layers.0.attention.in_proj_weight torch.Size([768, 256])
10 transformer_layers.0.attention.in_proj_bias torch.Size([768])
11 transformer_layers.0.attention.out_proj.weight torch.Size([256, 256])
12 transformer_layers.0.attention.out_proj.bias torch.Size([256])
13 transformer_layers.0.ffn.0.weight torch.Size([1024, 256])
14 transformer_layers.0.ffn.0.bias torch.Size([1024])
15 transformer_layers.0.ffn.2.weight torch.Size([256, 1024

In [None]:
import torch.nn.functional as F

F.gelu(torch.tensor([1.0, 2.0, 3.0]))

tensor([0.8413, 1.9545, 2.9959])

In [None]:
import torch.nn.functional as F

F.gelu(torch.tensor([1.0, 2.0, 3.0]))

tensor([0.8413, 1.9545, 2.9959])

: 

In [1]:
import gym
from gym import spaces
from gym.wrappers.time_limit import TimeLimit
import numpy as np
from typing import Union

try:
    from gym_gridverse.gym import GymEnvironment
    from gym_gridverse.envs.yaml.factory import factory_env_from_yaml
    from gym_gridverse.outer_env import OuterEnv
    from gym_gridverse.representations.observation_representations import (
        make_observation_representation,
    )
    from gym_gridverse.representations.state_representations import (
        make_state_representation,
    )
except ImportError:
    print(
        f"WARNING: ``gym_gridverse`` is not installed. This means you cannot run an experiment with the `gv_*` domains."
    )
    GymEnvironment = None
from envs.gv_wrapper import GridVerseWrapper
import os
from enum import Enum
from typing import Tuple

from utils.random import RNG


def make_env(id_or_path: str) -> GymEnvironment:
    """Makes a GV gym environment."""
    try:
        print("Loading using gym.make")
        env = gym.make(id_or_path)

    except gym.error.Error:
        print(f"Environment with id {id_or_path} not found.")
        print("Loading using YAML")
        inner_env = factory_env_from_yaml(
            os.path.join(os.getcwd(), "envs", "gridverse", id_or_path)
        )
        state_representation = make_state_representation(
            "default", inner_env.state_space
        )
        observation_representation = make_observation_representation(
            "default", inner_env.observation_space
        )
        outer_env = OuterEnv(
            inner_env,
            state_representation=state_representation,
            observation_representation=observation_representation,
        )
        env = GymEnvironment(outer_env)
        env = TimeLimit(GridVerseWrapper(env), max_episode_steps=250)

    return env


class ObsType(Enum):
    DISCRETE = 0
    CONTINUOUS = 1
    IMAGE = 2


def get_env_obs_type(env: gym.Env) -> int:
    obs_space = env.observation_space
    sample_obs = env.reset()
    # Check for image first
    if (
        (isinstance(sample_obs, np.ndarray) and len(sample_obs.shape) == 3)
        and isinstance(obs_space, spaces.Box)
        and np.all(obs_space.low == 0)
        and np.all(obs_space.high == 255)
    ):
        return ObsType.IMAGE
    elif isinstance(
        obs_space, (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)
    ):
        return ObsType.DISCRETE
    else:
        return ObsType.CONTINUOUS


def get_env_obs_length(env: gym.Env) -> int:
    """Gets the length of the observations in an environment"""
    if get_env_obs_type(env) == ObsType.IMAGE:
        return env.reset().shape
    elif isinstance(env.observation_space, gym.spaces.Discrete):
        return 1
    elif isinstance(env.observation_space, (gym.spaces.MultiDiscrete, gym.spaces.Box)):
        if len(env.observation_space.shape) != 1:
            raise NotImplementedError(f"We do not yet support 2D observation spaces")
        return env.observation_space.shape[0]
    elif isinstance(env.observation_space, spaces.MultiBinary):
        return env.observation_space.n
    else:
        raise NotImplementedError(f"We do not yet support {env.observation_space}")


def get_env_obs_mask(env: gym.Env) -> Union[int, np.ndarray]:
    """Gets the number of observations possible (for discrete case).
    For continuous case, please edit the -5 to something lower than
    lowest possible observation (while still being finite) so the
    network knows it is padding.
    """
    # Check image first
    if get_env_obs_type(env) == ObsType.IMAGE:
        return 0
    if isinstance(env.observation_space, gym.spaces.Discrete):
        return env.observation_space.n
    elif isinstance(env.observation_space, gym.spaces.MultiDiscrete):
        return max(env.observation_space.nvec) + 1
    elif isinstance(env.observation_space, gym.spaces.Box):
        # If you would like to use DTQN with a continuous action space, make sure this value is
        #       below the minimum possible observation. Otherwise it will appear as a real observation
        #       to the network which may cause issues. In our case, Car Flag has min of -1 so this is
        #       fine.
        return -5
    else:
        raise NotImplementedError(f"We do not yet support {env.observation_space}")


def get_env_max_steps(env: gym.Env) -> Union[int, None]:
    """Gets the maximum steps allowed in an episode before auto-terminating"""
    try:
        return env._max_episode_steps
    except AttributeError:
        try:
            return env.max_episode_steps
        except AttributeError:
            return None


  import pkg_resources




In [44]:
env = make_env('gv_memory.5x5.yaml')

Loading using gym.make
Environment with id gv_memory.5x5.yaml not found.
Loading using YAML


  deprecation(


In [53]:
env.state_space

Dict('agent': Box([-1. -1.  0.  0.  0.  0.], 1.0, (6,), float64), 'agent_id_grid': Box(0, 1, (5, 5), int64), 'grid': Box(0, [[[10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]]

 [[10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]]

 [[10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]]

 [[10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]]

 [[10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]
  [10  1  4]]], (5, 5, 3), int64), 'item': Box(0, [10  1  4], (3,), int64))

In [54]:
import inspect

obj = env  # 실제 객체 할당
methods = inspect.getmembers(obj, predicate=inspect.ismethod)
for name, method in methods:
    print(f"Method name: {name}, Method: {method}")


Method name: __class_getitem__, Method: <bound method Generic.__class_getitem__ of <class 'gym.wrappers.time_limit.TimeLimit'>>
Method name: __enter__, Method: <bound method Env.__enter__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: __exit__, Method: <bound method Env.__exit__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: __getattr__, Method: <bound method Wrapper.__getattr__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: __init__, Method: <bound method TimeLimit.__init__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: __init_subclass__, Method: <bound method Env.__init_subclass__ of <class 'gym.wrappers.time_limit.TimeLimit'>>
Method name: __repr__, Method: <bound method Wrapper.__repr__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: __str__, Method: <bound method Wrapper.__str__ of <TimeLimit<GridVerseWrapper<GymEnvironment instance>>>>
Method name: class_nam

In [56]:
help(env)

Help on TimeLimit in module gym.wrappers.time_limit object:

class TimeLimit(gym.core.Wrapper)
 |  TimeLimit(env: gym.core.Env, max_episode_steps: Optional[int] = None, new_step_api: bool = False)
 |  
 |  This wrapper will issue a `truncated` signal if a maximum number of timesteps is exceeded.
 |  
 |  If a truncation is not defined inside the environment itself, this is the only place that the truncation signal is issued.
 |  Critically, this is different from the `terminated` signal that originates from the underlying environment as part of the MDP.
 |  
 |  (deprecated)
 |  This information is passed through ``info`` that is returned when `done`-signal was issued.
 |  The done-signal originates from the time limit (i.e. it signifies a *truncation*) if and only if
 |  the key `"TimeLimit.truncated"` exists in ``info`` and the corresponding value is ``True``. This will be removed in favour
 |  of only issuing a `truncated` signal in future versions.
 |  
 |  Example:
 |     >>> from