In [24]:
from rllib.agent import MPCAgent
from rllib.model import AbstractModel
from rllib.reward.utilities import tolerance
from rllib.environment import SystemEnvironment
from rllib.environment.systems import InvertedPendulum, GaussianNoiseSystem
from rllib.util.rollout import rollout_agent
import numpy as np
import torch
from torch.distributions import MultivariateNormal

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Define the reward models and the dynamical model.

In [25]:
class PendulumSparseReward(AbstractModel):
    """Sparse reward model for the inverted pendulum.

    The reward multiplies a `tolerance` term on the cosine of the angle by a
    `tolerance` term on the angular velocity, and adds an action term scaled
    by `action_cost`.
    """

    def __init__(self, action_cost=0):
        super().__init__(dim_state=(2,), dim_action=(1,), model_kind="rewards")
        self.action_cost = action_cost
        self.reward_offset = 0

    def forward(self, state, action, next_state):
        """Evaluate the reward of a state-action pair.

        Parameters
        ----------
        state:
            Tensor (or array-like) whose last axis holds (angle, velocity).
        action:
            Tensor (or array-like); only index 0 of the last axis is used.
        next_state:
            Accepted for interface compatibility; not used.

        Returns
        -------
        tuple
            The reward with a trailing singleton dimension, and `torch.zeros(1)`.
        """
        dtype = torch.get_default_dtype()
        state = state if isinstance(state, torch.Tensor) else torch.tensor(state, dtype=dtype)
        action = action if isinstance(action, torch.Tensor) else torch.tensor(action, dtype=dtype)

        angle, velocity = state[..., 0], state[..., 1]

        # State term: both the angle and the velocity terms must be high.
        upright = tolerance(torch.cos(angle), lower=0.95, upper=1.0, margin=0.1)
        slow = tolerance(velocity, lower=-0.5, upper=0.5, margin=0.5)

        # Action term: `action_cost * (tolerance - 1)`.
        small_action = tolerance(action[..., 0], lower=-0.1, upper=0.1, margin=0.1)

        reward = upright * slow + self.action_cost * (small_action - 1)
        return reward.unsqueeze(-1), torch.zeros(1)


class PendulumDenseReward(AbstractModel):
    """Dense reward model for the inverted pendulum.

    The reward is the negative of `(1 - cos(angle)) ** 2` plus `action_cost`
    times the squared action summed over its last axis.
    """

    def __init__(self, action_cost=0.0):
        super().__init__(dim_state=(2,), dim_action=(1,), model_kind="rewards")
        self.action_cost = action_cost
        self.reward_offset = 0

    def forward(self, state, action, next_state):
        """Evaluate the reward of a state-action pair.

        Parameters
        ----------
        state:
            Tensor (or array-like); index 0 of the last axis is the angle.
        action:
            Tensor (or array-like); squared and summed over the last axis.
        next_state:
            Accepted for interface compatibility; not used.

        Returns
        -------
        tuple
            The reward and a zero-valued scalar tensor.
        """
        dtype = torch.get_default_dtype()
        state = state if isinstance(state, torch.Tensor) else torch.tensor(state, dtype=dtype)
        action = action if isinstance(action, torch.Tensor) else torch.tensor(action, dtype=dtype)

        angle_error = 1 - torch.cos(state[..., 0])
        effort = self.action_cost * (action ** 2).sum(-1)

        # NOTE(review): PendulumSparseReward appends a trailing reward dimension
        # and returns `torch.zeros(1)`; this class does neither. Confirm which
        # shape callers expect.
        return -(effort + angle_error ** 2), torch.tensor(0.0)


class PendulumModel(AbstractModel):
    """Analytic pendulum dynamics.

    Torch implementation of a pendulum that advances the state by one forward
    Euler step of size `step_size`.
    """

    def __init__(
        self, mass, length, friction, step_size=1 / 80, noise: MultivariateNormal = None
    ):
        super().__init__(dim_state=(2,), dim_action=(1,))
        self.mass = mass
        self.length = length
        self.friction = friction
        self.step_size = step_size
        self.noise = noise

    def forward(self, state, action):
        """Predict the next state.

        Parameters
        ----------
        state:
            Tensor whose last axis holds (angle, angular velocity).
        action:
            Tensor; clamped to [-1, 1] before use.

        Returns
        -------
        tuple
            Without noise: the next state and `torch.zeros(1)`.
            With noise: the next state shifted by the noise mean, and the
            noise covariance matrix.
        """
        torque = torch.clamp(action, -1.0, 1.0)
        gravity = 9.81
        inertia = self.mass * self.length ** 2
        dt = self.step_size

        theta, omega = torch.split(state, 1, dim=-1)

        # Angular acceleration from gravity, the applied action and friction.
        acceleration = (
            (gravity / self.length) * torch.sin(theta)
            + torque * (1 / inertia)
            - (self.friction / inertia) * omega
        )

        # Forward Euler: both updates use the values from the current step.
        next_state = torch.cat(
            (theta + dt * omega, omega + dt * acceleration), dim=-1
        )

        if self.noise is not None:
            # NOTE(review): the second output is a covariance matrix; confirm
            # whether callers expect a covariance or a Cholesky factor.
            return next_state + self.noise.mean, self.noise.covariance_matrix
        return next_state, torch.zeros(1)


## Define parameters.

In [26]:
# Seed the random number generators so that repeated "Restart & Run All"
# executions draw the same random numbers.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

# Experiment settings.
action_cost = 0.1
sparse_reward = False  # True selects PendulumSparseReward, False PendulumDenseReward.
solver = "CEMShooting"

# Both reward models share the same action cost; `sparse_reward` picks one.
sparse_reward_model = PendulumSparseReward(action_cost=action_cost)
dense_reward_model = PendulumDenseReward(action_cost=action_cost)
reward_model = sparse_reward_model if sparse_reward else dense_reward_model

# Analytic dynamics model of the pendulum.
dynamical_model = PendulumModel(mass=0.3, length=0.5, friction=0.005, step_size=1 / 80)

## Define environment.

In [27]:
# Every episode starts at angle pi with zero angular velocity. The lower and
# upper bounds coincide, so the distribution is degenerate; argument validation
# is disabled because `Uniform` otherwise rejects `low >= high` on PyTorch
# versions that validate distribution arguments by default.
initial_distribution = torch.distributions.Uniform(
    torch.tensor([np.pi, -0.0]),
    torch.tensor([np.pi, +0.0]),
    validate_args=False,
)
# Pendulum system that the environment wraps.
pendulum_system = InvertedPendulum(
    mass=0.3, length=0.5, friction=0.005, step_size=1 / 80
)

# The environment scores transitions with `reward_model` and obtains initial
# states from `initial_distribution.sample`.
environment = SystemEnvironment(
    pendulum_system,
    reward=reward_model,
    initial_state=initial_distribution.sample,
)

## Define agent.

In [32]:
# Build an MPC agent that plans with the analytic dynamical and reward models.
# The solver name is taken from the `solver` setting in the parameters cell
# rather than a hard-coded string, so the configured solver is the one that runs.
agent = MPCAgent.default(
    environment=environment,
    mpc_solver_name=solver,
    dynamical_model=dynamical_model,
    reward_model=reward_model,
    exploration_episodes=0,
    horizon=50,
)

# Roll out a single rendered episode of at most 400 steps.
rollout_agent(
    agent=agent, environment=environment, max_steps=400, num_episodes=1, render=True
)

100%|██████████| 1/1 [00:16<00:00, 16.15s/it]
