In [2]:
import gymnasium as gym
from mujoco import viewer
from stable_baselines3 import SAC


  _warn(("h5py is running against HDF5 {0} when it was built against {1}, "


In [3]:
env = gym.make(
    "Ant-v4", ctrl_cost_weight=0.1, use_contact_forces=True, render_mode="human"
)

In [4]:
import torch
from torch import nn
from torch.distributions.normal import Normal
from transformers import DistilBertConfig, DistilBertModel




In [29]:
class CustomActorHead(nn.Module):
    def __init__(self, config, num_action=2):
        super().__init__()
        self.dim_action = num_action
        self.layer_one = nn.Sequential(
            nn.Linear(config.dim, config.num_labels),
            nn.Softmax(dim=1),
            nn.Linear(config.num_labels, self.dim_action * 2),
            nn.LeakyReLU(0.01),
        )

    def forward(self, state):
        # Assuming x is the output from DistilBert
        answer = self.layer_one(state)
        # answer = self.forward(state)
        mu, std = answer[:, : self.dim_action], answer[:, self.dim_action :]
        torch.clamp_(std, min=1e-6, max=1)
        predicted_gauss = Normal(mu, std)

        sample = predicted_gauss.sample()
        prob = predicted_gauss.log_prob(sample)
        return sample, prob


class CustomDistilBertModel(DistilBertModel):
    def __init__(self, config):
        super().__init__(config)
        self.custom_head = CustomActorHead(config)

    def forward(
        self, input_ids, attention_mask=None, head_mask=None, inputs_embeds=None
    ):
        outputs = super().forward(
            input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
        )
        # Assuming the last hidden state is used for the head
        last_hidden_state = outputs[0]
        # Apply your custom head
        logits = self.custom_head(last_hidden_state)
        return logits

In [30]:
bert_config = DistilBertConfig(
    vocab_size=10000,
    hidden_size=16,
    num_hidden_layers=2,
    num_attention_heads=4,
    intermediate_size=100,
    hidden_act="gelu",
    hidden_dropout_prob=0.1,
    attention_probs_dropout_prob=0.1,
    max_position_embeddings=50,
    type_vocab_size=2,
    initializer_range=0.02,
    layer_norm_eps=1e-12,
    pad_token_id=0,
    position_embedding_type="absolute",
    use_cache=True,
    classifier_dropout=None,
)

In [31]:
# config = DistilBertConfig.from_pretrained('distilbert-base-uncased')
model = CustomDistilBertModel(bert_config)


In [32]:
config.dim

768

In [33]:
out = model(
    input_ids=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 1, 1]]),
    attention_mask=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 1, 1]]),
    # **inputs,
)  # type: ignore


RuntimeError: The size of tensor a (2) must match the size of tensor b (3) at non-singleton dimension 1

In [None]:
out

tensor([[[0.2658, 0.1635],
         [0.1348, 0.2985],
         [0.2441, 0.1565],
         [0.2240, 0.1594],
         [0.1313, 0.2221]],

        [[0.2848, 0.1719],
         [0.1523, 0.1742],
         [0.2475, 0.1555],
         [0.2039, 0.1654],
         [0.1115, 0.3331]]], grad_fn=<SoftmaxBackward0>)

In [None]:
class CustomHead(nn.Module):
    def __init__(self, config):
        super(CustomHead, self).__init__()
        self.linear = nn.Linear(config.dim, 2 *16)
        # self.softmax = nn.Softmax(dim=1)
    
    def forward(self, x):
        # Assuming x is the output from DistilBert
        x = self.linear(x)
        # return self.softmax(x)
        return x


class CustomDistilBertModel(DistilBertModel):
    def __init__(self, config):
        super(CustomDistilBertModel, self).__init__(config)
        self.custom_head = CustomHead(config)
    
    def forward(self, input_ids, attention_mask=None, head_mask=None, inputs_embeds=None):
        outputs = super().forward(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds)
        # Assuming the last hidden state is used for the head
        last_hidden_state = outputs[0]
        # Apply your custom head
        logits = self.custom_head(last_hidden_state)
        return logits


In [None]:
# config = DistilBertConfig.from_pretrained('distilbert-base-uncased')
model = CustomDistilBertModel(bert_config)


In [None]:
out = model(
    input_ids=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 1, 1]]),
    attention_mask=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 1, 1]]),
    # **inputs,
)  # type: ignore


In [None]:
out

tensor([[[-9.2073e-02, -4.5514e-02,  3.4229e-01,  6.7587e-01,  3.6829e-01,
          -5.3154e-01,  9.8220e-02,  7.2635e-01, -1.2696e+00,  4.3517e-01,
           8.1557e-01, -9.3974e-01,  2.1697e-01,  6.5389e-01,  2.9961e-01,
           3.5622e-01,  2.0894e-01, -7.0037e-01, -5.0135e-01, -1.4976e-01,
           1.6277e-01,  5.0670e-01,  8.8279e-01, -5.1181e-01,  8.6238e-01,
          -1.1180e+00,  7.6160e-01,  1.8453e-01,  4.6870e-01,  4.4835e-01,
          -4.1612e-01, -1.1636e+00],
         [-5.0892e-02, -2.8898e-01,  9.4392e-01,  3.0855e-01,  4.2375e-01,
           5.3946e-01, -8.1204e-01,  4.4666e-01, -5.9119e-01,  5.8833e-01,
           1.1808e-01, -6.2132e-01,  4.9074e-01,  1.0295e+00, -2.3458e-01,
           7.0849e-01, -9.1251e-01, -3.7691e-01,  5.5013e-01,  1.1712e-01,
           1.4917e-02,  8.2145e-01,  6.3168e-01, -9.0018e-01,  7.5450e-02,
          -1.3444e+00,  1.5304e+00,  9.6531e-01,  1.1018e+00, -2.8351e-01,
           8.8587e-02, -1.2422e+00],
         [ 6.9276e-01, -6.

In [None]:
class CustomModel(nn.Module):
    def __init__(self, config, action_space):
        super(CustomModel, self).__init__()
        self.distilbert = DistilBertModel(config)
        self.fc = nn.Linear(config.dim, 2 * action_space)
    
    def forward(self, input_ids, attention_mask=None, head_mask=None, inputs_embeds=None):
        outputs = self.distilbert(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds)
        # Assuming the last hidden state is used for the head
        last_hidden_state = outputs[0]
        # Apply the fully connected layer
        logits = self.fc(last_hidden_state)
        return logits


In [None]:
config = DistilBertConfig.from_pretrained('distilbert-base-uncased')
action_space = 10 # Example action space size
model = CustomModel(config, action_space)


In [None]:
out = model(
    input_ids=torch.tensor([[1, 1, 1, 1, 1]]),
    attention_mask=torch.tensor([[1, 1, 1, 1, 1]]),
    # **inputs,
)  # type: ignore


RuntimeError: The size of tensor a (2) must match the size of tensor b (3) at non-singleton dimension 1

In [9]:
out.size()

NameError: name 'out' is not defined

In [144]:
# Custom actor (pi) and value function (vf) networks
# of two layers of size 32 each with Relu activation function
# Note: an extra linear layer will be added on top of the pi and the vf nets, respectively

config = DistilBertConfig.from_pretrained("distilbert-base-uncased")
policy_kwargs = {
    "features_extractor_class": DistilBertModel(config),
    # features_extractor_kwargs=dict(config),
    "activation_fn": nn.ReLU,
    "net_arch": {"pi": [32, 32], "qf": [32, 32]},
}
# Create the agent
env = gym.make(
    "Ant-v4", ctrl_cost_weight=0.1, use_contact_forces=True, render_mode="human"
)
model = SAC("MlpPolicy", env, policy_kwargs=policy_kwargs)  # , verbose=1
# Retrieve the environment
# env = model.get_env()
# Train the agent
model.learn(total_timesteps=20_000)
# Save the agent
model.save("ppo_cartpole")

# del model
# the policy_kwargs are automatically loaded
# model = SAC.load("ppo_cartpole", env=env)

TypeError: 'Box' object is not subscriptable

In [56]:
class CustomModel(nn.Module):
    def __init__(self):
        self.bert_config = DistilBertConfig(
            vocab_size=10000,
            hidden_size=16,
            num_hidden_layers=2,
            num_attention_heads=4,
            intermediate_size=100,
            hidden_act="gelu",
            hidden_dropout_prob=0.1,
            attention_probs_dropout_prob=0.1,
            max_position_embeddings=50,
            type_vocab_size=2,
            initializer_range=0.02,
            layer_norm_eps=1e-12,
            pad_token_id=0,
            position_embedding_type="absolute",
            use_cache=True,
            classifier_dropout=None,
        )
        super(CustomModel, self).__init__()
        self.distilbert_1 = DistilBertModel(self.bert_config)
        self.distilbert_2 = DistilBertModel(self.bert_config)

    def forward(
        self, input_ids, num_struct_elements=9, attention_mask=None, components_mask=None
    ):
        outputs_1 = self.distilbert_1(
            input_ids,
            attention_mask=attention_mask,
        )

        last_hidden_state_1 = outputs_1['last_hidden_state']

        input_2 = torch.sum(last_hidden_state_1, axis=2) 
        input_2.mul_(components_mask)# summing through columns
        input_2 = torch.sum(input_2, axis=0)
        ones_vector = torch.ones(num_struct_elements, 1)
        input_2 = ones_vector @ input_2.view(1, input_2.size()[0])
        outputs_2 = self.distilbert_2(
            input_ids,
            attention_mask=attention_mask,
        )
        last_hidden_state_2 = outputs_2['last_hidden_state']

        input_2 = torch.sum(last_hidden_state_2, axis=2)
        input_2.mul_(components_mask)
         # summing through columns
        input_2 = torch.sum(input_2, axis=0)
        return input_2



In [73]:
class QHead(nn.Module):
    def __init__(self, size_input, size_action):
        super(QHead, self).__init__()
        self.size_input = size_input
        self.size_action = size_action
        self.layer = nn.Sequential(
            nn.Linear(self.size_input, self.size_action),
            nn.LeakyReLU(0.01),
            nn.Linear(self.size_action, 1),
        )

    def forward(self, input):
        return self.layer(input)


class PiHead(nn.Module):
    def __init__(self, size_input, size_action):
        super(PiHead, self).__init__()
        self.size_input = size_input
        self.size_action = size_action
        self.layer = nn.Sequential(
            nn.Linear(self.size_input, self.size_action),
            nn.LeakyReLU(0.01),
        )

        self.last_lin = nn.Linear(1, 2)

    def forward(self, input):
        output_1 = self.layer(input)
        # ones_vector = torch.ones(self.size_action, 1)
        # output_1 = ones_vector @ output_1.view(1, self.size_action)
        return self.last_lin(output_1.view(self.size_action, 1))


In [74]:
v = CustomModel()

In [75]:
out = v(
    input_ids=torch.tensor([[190, 1, 80, 50, 1300], [190, 1, 80, 50, 1300]]),
    num_struct_elements=5,
    attention_mask=torch.tensor([[1, 1, 1, 1, 1], [1, 1, 1, 1, 1]]),
    components_mask=torch.tensor([[1, 1, 1, 0, 0], [0, 0, 0, 1, 1]]),
)  # type: ignore


In [76]:
out

tensor([ 5.9605e-07,  4.1723e-07, -4.2468e-07,  1.1921e-07, -2.3842e-07],
       grad_fn=<SumBackward1>)

In [77]:
b = PiHead(5, 3)
n = b(out)
n

tensor([[-0.8198, -0.7539],
        [-0.8623, -0.8571],
        [-0.7876, -0.6756]], grad_fn=<AddmmBackward0>)