In [20]:
import torch
import torch.nn as nn

import transformers

from decision_transformer.models.model import TrajectoryModel
from decision_transformer.models.trajectory_gpt2 import GPT2Model
import math
import numpy as np
import torch.nn.functional as F
from torch import distributions as pyd


class TanhTransform(pyd.transforms.Transform):
    """Bijective transform ``y = tanh(x)`` mapping the reals onto (-1, 1).

    The default ``cache_size=1`` makes the transform remember its latest
    ``(x, y)`` pair, so inverting a value that was just produced by the
    forward call returns the original ``x`` instead of recomputing ``atanh``.
    """

    domain = pyd.constraints.real
    codomain = pyd.constraints.interval(-1.0, 1.0)
    bijective = True
    sign = +1

    def __init__(self, cache_size=1):
        """Create the transform; ``cache_size`` is forwarded to ``Transform``."""
        super().__init__(cache_size=cache_size)

    @staticmethod
    def atanh(x):
        """Return the inverse hyperbolic tangent of tensor ``x``.

        Computed as ``0.5 * (log1p(x) - log1p(-x))``. The input is
        deliberately not clamped: entries equal to +/-1 map to +/-inf and
        entries outside [-1, 1] map to NaN.
        """
        return 0.5 * (x.log1p() - (-x).log1p())

    def __eq__(self, other):
        """Any two ``TanhTransform`` instances compare equal (no parameters)."""
        return isinstance(other, TanhTransform)

    def _call(self, x):
        """Forward pass: element-wise ``tanh``."""
        return x.tanh()

    def _inverse(self, y):
        """Inverse pass: element-wise ``atanh`` without boundary clamping."""
        # We do not clamp to the boundary here as it may degrade the performance of certain algorithms.
        # one should use `cache_size=1` instead
        return self.atanh(y)

    def log_abs_det_jacobian(self, x, y):
        """Return ``log|d tanh(x) / dx| = log(1 - tanh(x)^2)`` element-wise.

        Only ``x`` is used; ``y`` is accepted to satisfy the ``Transform``
        interface.
        """
        # We use a formula that is more numerically stable, see details in the following link
        # https://github.com/tensorflow/probability/commit/ef6bb176e0ebd1cf6e25c6b5cecdd2428c22963f#diff-e120f70e92e6741bca649f04fcd907b7
        return 2.0 * (math.log(2.0) - x - F.softplus(-2.0 * x))


class SquashedNormal(pyd.transformed_distribution.TransformedDistribution):
    """
    Squashed Normal Distribution(s)

    A ``Normal(loc, std)`` base distribution pushed through ``TanhTransform``.

    If loc/std is of size (batch_size, sequence length, d),
    this returns batch_size * sequence length * d
    independent squashed univariate normal distributions.
    """

    def __init__(self, loc, std):
        """Build the tanh-squashed normal from element-wise ``loc`` and ``std``."""
        self.loc = loc
        self.std = std
        self.base_dist = pyd.Normal(loc, std)

        transforms = [TanhTransform()]

        super().__init__(self.base_dist, transforms)

    @property
    def mean(self):
        """Return ``loc`` pushed through every transform (here: ``tanh(loc)``).

        This is the image of the base mean, which is in general not the
        exact expectation of the squashed variable.
        """
        mu = self.loc
        for tr in self.transforms:
            mu = tr(mu)
        return mu

    def entropy(self, N=1):
        """Monte-Carlo entropy estimate from ``N`` reparameterized samples.

        The negative log-probabilities are averaged over the sample
        dimension and then summed over dimension 2, so the per-sample
        log-probability must have at least three dimensions
        (batch_size, context_len, action_dim).
        """
        # sample from the distribution and then compute
        # the empirical entropy:
        samples = self.rsample((N,))
        log_p = self.log_prob(samples)

        # log_p.mean(dim=0): (batch_size, context_len, action_dim)
        return -log_p.mean(dim=0).sum(dim=2)

    def log_likelihood(self, x):
        """Return the log-density of ``tanh(x)``, summed over dimension 2.

        ``x`` is first squashed by the tanh transform, so it is treated as
        a pre-squash value and may lie anywhere on the real line. The
        result of ``log_prob`` (after broadcasting against loc/std) must be
        three-dimensional: (batch_size, context_len, action_dim). The
        returned tensor has shape (batch_size, context_len).

        NOTE(review): a caller passing values that are already inside
        (-1, 1) gets them squashed a second time -- confirm this is the
        intended convention.
        """
        squashed = self.transforms[0](x)
        return self.log_prob(squashed).sum(dim=2)



class DiagGaussianActor(nn.Module):
    """torch.distributions implementation of a diagonal Gaussian policy.

    Two linear heads map a hidden feature vector to the mean and the
    log-standard-deviation of a ``SquashedNormal`` over ``act_dim`` outputs.
    """

    def __init__(self, hidden_dim, act_dim, log_std_bounds=[-5.0, 2.0], transform_type = 'tanh', value_range = [-1.0, 1.0]):
        """Create the two linear heads and initialise their weights.

        Args:
            hidden_dim: size of the last dimension of the input features.
            act_dim: size of the last dimension of the predicted mean/std.
            log_std_bounds: ``[min, max]`` range the log-std is mapped into.
            transform_type: stored on the instance; not read by ``forward``.
            value_range: stored on the instance; not read by ``forward``.
        """
        super().__init__()

        self.mu = torch.nn.Linear(hidden_dim, act_dim)
        self.log_std = torch.nn.Linear(hidden_dim, act_dim)
        # Copy the sequences so instances never share (and cannot mutate)
        # the list objects used as default argument values.
        self.log_std_bounds = list(log_std_bounds)
        self.transform_type = transform_type
        self.value_range = list(value_range)

        def weight_init(m):
            """Custom weight init for Conv2D and Linear layers."""
            if isinstance(m, torch.nn.Linear):
                nn.init.orthogonal_(m.weight.data)
                # bias is None for bias-free layers, which has no `.data`
                if hasattr(m.bias, "data"):
                    m.bias.data.fill_(0.0)

        self.apply(weight_init)

    def forward(self, obs):
        """Return the ``SquashedNormal`` predicted for the features ``obs``."""
        mu, log_std = self.mu(obs), self.log_std(obs)
        log_std = torch.tanh(log_std)
        # log_std is the output of tanh so it will be between [-1, 1]
        # map it to be between [log_std_min, log_std_max]
        log_std_min, log_std_max = self.log_std_bounds
        log_std = log_std_min + 0.5 * (log_std_max - log_std_min) * (log_std + 1.0)
        std = log_std.exp()
        return SquashedNormal(mu, std)

# Smoke test: an actor head over 512-dim features predicting 4-dim outputs.
predict_state = DiagGaussianActor(hidden_dim=512, act_dim=4, transform_type='tanh')
# Random features laid out as (256, 1, 512).
obs = torch.randn(256, 1, 512)
predict_s = predict_state(obs)
# Score a constant (1, 4) target of 2.0; it broadcasts against the
# distribution parameters produced from `obs`.
print(predict_s.log_likelihood(torch.full((1, 4), 2.0)))

tensor([[-5.3787e+00],
        [-1.6671e+02],
        [-1.0803e+04],
        [-4.4501e+04],
        [-2.0007e+04],
        [-8.3776e+04],
        [-4.2887e+03],
        [-3.3194e+01],
        [-3.4077e+03],
        [-3.8216e+04],
        [-4.2032e+03],
        [-1.5372e+05],
        [-1.8730e+04],
        [-9.0873e+00],
        [-4.5264e+04],
        [-5.1000e+04],
        [-5.8880e+03],
        [-6.3658e+04],
        [-6.3684e+03],
        [-7.7166e+03],
        [-1.5558e+04],
        [-3.7459e+02],
        [-1.0210e+02],
        [-2.9861e+03],
        [-8.5973e+03],
        [-7.7735e+03],
        [-3.4323e+03],
        [-1.1128e+03],
        [-7.8199e+03],
        [-2.3401e+04],
        [-1.7543e+04],
        [-1.9214e+02],
        [-2.3474e+03],
        [-1.4360e+05],
        [-6.5525e+02],
        [-2.5245e+01],
        [-1.0918e+02],
        [-3.3024e+03],
        [-2.9143e+03],
        [-2.2422e+02],
        [-7.7976e+03],
        [-1.6789e+01],
        [-3.4572e+04],
        [-1

torch.Size([1, 4])
tensor([[1.1000, 1.1000, 1.1000, 1.1000]])


ValueError: The value argument must be within the support