In [1]:
import torch
import numpy as np
import gym
import gnwrapper
from torch import nn

from stable_baselines3 import A2C
from stable_baselines3.a2c import MlpPolicy
from stable_baselines3.common.env_checker import check_env

from cpprb import ReplayBuffer

from gym_pybullet_drones.utils.Logger import Logger
from gym_pybullet_drones.envs.single_agent_rl.TakeoffAviary import TakeoffAviary
from gym_pybullet_drones.utils.utils import sync, str2bool

In [2]:
torch.cuda.is_available()

False

In [3]:
# ドローンをランダムに行動させ制御対象を把握する
monitor_env = gnwrapper.Monitor(gym.make("takeoff-aviary-v0", gui=True), size=(
    400, 300), directory='.', force=True, video_callable=lambda ep: True)

episode_max_steps = 500

for episode_idx in range(5):
    monitor_env.reset()
    total_rew = 0.
    for _ in range(episode_max_steps):
        _, rew, done, _ = monitor_env.step(monitor_env.action_space.sample())
        total_rew += rew
        if done:
            break
    print("iter={0: 3d} total reward: {1: 4.4f}".format(
        episode_idx, total_rew))
monitor_env.display()
monitor_env.close()

[INFO] BaseAviary.__init__() loaded parameters from the drone's .urdf:
[INFO] m 0.027000, L 0.039700,
[INFO] ixx 0.000014, iyy 0.000014, izz 0.000022,
[INFO] kf 0.000000, km 0.000000,
[INFO] t2w 2.250000, max_speed_kmh 30.000000,
[INFO] gnd_eff_coeff 11.368590, prop_radius 0.023135,
[INFO] drag_xy_coeff 0.000001, drag_z_coeff 0.000001,
[INFO] dw_coeff_1 2267.180000, dw_coeff_2 0.160000, dw_coeff_3 -0.110000
viewMatrix (0.642787516117096, -0.4393851161003113, 0.6275069713592529, 0.0, 0.766044557094574, 0.36868777871131897, -0.5265407562255859, 0.0, -0.0, 0.8191521167755127, 0.5735764503479004, 0.0, 2.384185791015625e-07, 2.384185791015625e-07, -5.000000476837158, 1.0)
projectionMatrix (0.7499999403953552, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, -1.0000200271606445, -1.0, 0.0, 0.0, -0.02000020071864128, 0.0)
iter=  0 total reward: -1047.1226
iter=  1 total reward: -372.8307
iter=  2 total reward: -1098.1368
iter=  3 total reward: -988.0966
iter=  4 total reward: -1168.2801


In [4]:
# 行動空間と状態空間を確認する
env = gym.make("takeoff-aviary-v0")
print("[INFO] Action space:", env.action_space)
print("[INFO] Observation space:", env.observation_space)

print(env.__class__)
print(monitor_env.action_space.sample())
print(type(monitor_env.action_space.sample()))

[INFO] BaseAviary.__init__() loaded parameters from the drone's .urdf:
[INFO] m 0.027000, L 0.039700,
[INFO] ixx 0.000014, iyy 0.000014, izz 0.000022,
[INFO] kf 0.000000, km 0.000000,
[INFO] t2w 2.250000, max_speed_kmh 30.000000,
[INFO] gnd_eff_coeff 11.368590, prop_radius 0.023135,
[INFO] drag_xy_coeff 0.000001, drag_z_coeff 0.000001,
[INFO] dw_coeff_1 2267.180000, dw_coeff_2 0.160000, dw_coeff_3 -0.110000
[INFO] Action space: Box(-1.0, 1.0, (4,), float32)
[INFO] Observation space: Box(-1.0, 1.0, (12,), float32)
<class 'gym_pybullet_drones.envs.single_agent_rl.TakeoffAviary.TakeoffAviary'>
[-0.2694515  -0.26056373  0.85068697 -0.4642893 ]
<class 'numpy.ndarray'>


ドローンの姿勢に関する報酬関数を定義する
- rollとpitch角は0に近ほど良い
- roll,pitch,yawの角速度は0に近いほど良い
- ローターの推力の合計はドローンにかかる重力と釣り合っているほど良い(ローターの回転数を決める環境の場合(act: ActionType=ActionType.RPM)4つのactionを0.0にすることで重力と釣り合う推力になる)
- ドローンの高度が0.1より小さい場合罰を与え、それ以上の場合は何も与えない(ホバリングするようにするため)

In [5]:
def angle_normalize(x):
    return ((x + np.pi) % (2 * np.pi)) - np.pi

def reward_fn(obses, acts):
    pos, ang, vel, gyr = obses[:, 0:3], angle_normalize(obses[:, 3:6]), obses[:, 6:9], angle_normalize(obses[:, 9:12])
    cost = np.zeros((pos.shape[0], ))
    cost += np.sum(ang[:, 0:2] ** 2, axis=1)
    cost += np.sum(gyr ** 2, axis=1) * 0.1
    cost += np.sum(acts ** 2, axis=1) * 0.001
    cost += (pos[:, 2] < 0.1).astype(np.float32)
    return -cost

ダイナミクスモデルの実装を2層のMLPで実装する

現在の状態と次の状態の差分を予測する

In [6]:
class DynamicsModel(nn.Module):
    def __init__(self, input_dim, output_dim, units=(32, 32)):
        super().__init__()
        self.model = torch.nn.Sequential(
            torch.nn.Linear(input_dim, units[0]),
            torch.nn.ReLU(),
            torch.nn.Linear(units[0], units[1]),
            torch.nn.ReLU(),
            torch.nn.Linear(units[1], output_dim)
        )

        self.input_dim = input_dim

        self._loss_fn = torch.nn.MSELoss(reduction='mean')
        self._optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    def predict(self, inputs):
        assert inputs.shape[1] == self.input_dim
        return self.model(inputs)

    def fit(self, inputs, labels):
        predicts = self.predict(inputs)
        loss = self._loss_fn(predicts, labels)
        self._optimizer.zero_grad()
        loss.backward()
        self._optimizer.step()
        return loss.data.numpy()

In [7]:
obs_dim = 6  # 実機での適用に備え、姿勢制御に使える情報はオイラー角と角速度のみとした
act_dim = env.action_space.high.size  # 各ローターのrpm(4)
dynamics_model = DynamicsModel(input_dim=obs_dim+act_dim, output_dim=obs_dim)

In [8]:
print(dynamics_model.model)

Sequential(
  (0): Linear(in_features=10, out_features=32, bias=True)
  (1): ReLU()
  (2): Linear(in_features=32, out_features=32, bias=True)
  (3): ReLU()
  (4): Linear(in_features=32, out_features=6, bias=True)
)


In [9]:
def extract_ang_and_gyr(obses): 
    return obses[:, 3:6], obses[:, 9:12]

def format_output(diff_obses):
    # print(diff_obses.shape)
    formatted = np.zeros((diff_obses.shape[0], 12), dtype=np.float32)
    formatted[:, 3:6] += diff_obses[:, 0:3]
    formatted[:, 9:12] += diff_obses[:, 3:6]
    return formatted

def predict_next_state(obses, acts):
    inputs = np.concatenate([*extract_ang_and_gyr(obses), acts], axis=1)
    assert inputs.shape[1] == obs_dim + act_dim
    inputs = torch.from_numpy(inputs).float()
    diff_obses = dynamics_model.predict(inputs).data.numpy()
    next_obses = obses + format_output(diff_obses)
    return next_obses

In [10]:
# 10Kデータ分 (s, a, s') を保存できるリングバッファを用意します
rb_dict = {
    "size": 10000,
    "default_dtype": np.float32,
    "env_dict": {
        "obs": {"shape": env.observation_space.shape},
        "next_obs": {"shape": env.observation_space.shape},
        "act": {"shape": env.action_space.shape}}}
dynamics_buffer = ReplayBuffer(**rb_dict)

RSの実装

In [11]:
class RandomPolicy:
    def __init__(self, max_action, act_dim):
        self._max_action = max_action  # action の最大値
        self._act_dim = act_dim  # action の次元数

    def get_actions(self, batch_size):
        # 一様分布からバッチサイズ分ランダムにサンプリング
        return np.random.uniform(
            low=-self._max_action,
            high=self._max_action,
            size=(batch_size, self._act_dim))

In [12]:
policy = RandomPolicy(
    max_action=env.action_space.high[0],
    act_dim=env.action_space.high.size)

In [13]:
def random_shooting(init_obs, n_mpc_episodes=64, horizon=20):
    init_actions = policy.get_actions(batch_size=n_mpc_episodes)
    returns = np.zeros(shape=(n_mpc_episodes, ))
    obses = np.tile(init_obs, (n_mpc_episodes, 1))

    for i in range(horizon):
        if i == 0:
            acts = np.copy(init_actions)
        else:
            acts = policy.get_actions(batch_size=n_mpc_episodes)
        
        next_obses = predict_next_state(obses, acts)

        rewards = reward_fn(obses, acts)

        returns += rewards
        obses = next_obses

    return init_actions[np.argmax(returns)] 
        

RSの実行

In [14]:
batch_size = 100
n_episodes = 100

def fit_dynamics(n_iter=50):
    mean_loss = 0.
    for _ in range(n_iter):
        samples = dynamics_buffer.sample(batch_size)
        inputs = np.concatenate([*extract_ang_and_gyr(samples['obs']), samples['act']], axis=1)
        labels = np.concatenate([*extract_ang_and_gyr(samples['next_obs'])], axis=1) - np.concatenate([*extract_ang_and_gyr(samples['obs'])], axis=1)
        mean_loss += dynamics_model.fit(
            torch.from_numpy(inputs).float(),
            torch.from_numpy(labels).float()
        )
    return mean_loss

total_steps = 0

for _ in range(10):
    obs = env.reset()
    for _ in range(200):
        total_steps += 1
        act = env.action_space.sample()
        next_obs, _, done, _ = env.step(act)
        dynamics_buffer.add(obs=obs, act=act, next_obs=next_obs)
        obs = next_obs
        if done:
            break

fit_dynamics(n_iter=1000)

for episode_idx in range(n_episodes):
    total_rew = 0.

    obs = env.reset()
    for _ in range(episode_max_steps):
        total_steps += 1

        act = random_shooting(obs)
        next_obs, _, done, _ = env.step(act)

        rew = reward_fn(obs.reshape(1, obs.shape[0]), act.reshape(1, act.shape[0])) 

        dynamics_buffer.add(obs=obs, act=act, next_obs=next_obs)

        total_rew += float(rew[0])
        if done:
            break
        obs = next_obs

    mean_loss = fit_dynamics(n_iter=100)
    if episode_idx % 5 == 0:
        print("iter={0: 3d} total steps: {1: 5d} total reward: {2: 4.4f} mean loss: {3:.6f}".format(
            episode_idx, total_steps, total_rew, mean_loss))

iter=  0 total steps:  2500 total reward: -550.6097 mean loss: 1.384046
iter=  5 total steps:  5000 total reward: -550.6085 mean loss: 3.122018
iter= 10 total steps:  7500 total reward: -550.5868 mean loss: 2.944312
iter= 15 total steps:  10000 total reward: -550.8588 mean loss: 2.932097
iter= 20 total steps:  12500 total reward: -550.7419 mean loss: 2.977425
iter= 25 total steps:  15000 total reward: -550.6530 mean loss: 2.392183
iter= 30 total steps:  17500 total reward: -550.7344 mean loss: 2.233169
iter= 35 total steps:  20000 total reward: -550.7420 mean loss: 2.339706
iter= 40 total steps:  22500 total reward: -550.7538 mean loss: 2.439366
iter= 45 total steps:  25000 total reward: -550.6807 mean loss: 2.611433
iter= 50 total steps:  27500 total reward: -550.8711 mean loss: 2.839083
iter= 55 total steps:  30000 total reward: -551.1702 mean loss: 2.901275
iter= 60 total steps:  32500 total reward: -550.8875 mean loss: 2.731098
iter= 65 total steps:  35000 total reward: -550.6219 m

In [15]:
for episode_idx in range(3):
    obs = monitor_env.reset()
    total_rew = 0.
    for _ in range(episode_max_steps):
        act = random_shooting(obs)
        next_obs, rew, done, _ = monitor_env.step(act)
        total_rew += rew
        if done:
            break
        obs = next_obs
    print("iter={0: 3d} total reward: {1: 4.4f}".format(episode_idx, total_rew))

monitor_env.display(reset=True)

iter=  0 total reward: -496.6952
iter=  1 total reward: -482.5822
iter=  2 total reward: -419.0153
