# 自作シミュレーション環境におけるドローンの飛行制御


In [None]:
import os
import random
from time import time

import numpy as np
import pandas as pd
import torch
from tqdm import tqdm

import gymnasium as gym
from stable_baselines3 import PPO  # , A2C # , SAC, DDPG 
from stable_baselines3.ppo.policies import MlpPolicy
from stable_baselines3.common.evaluation import evaluate_policy

from env.ground_for_propeller import PropellerDroneFlight, FlightRecorder
from pybullet_sim_vis import SimCamera
from parapara.generator import to_numpy, play_anim, save_as_mp4

from utils.paths import OUTPUT_PATH, APP_ROOT_DIR


## 1. 共通モジュール


In [None]:
def set_random_seed(seed: int, using_cuda: bool = False) -> None:
    """
    Seed the Python, NumPy and PyTorch random generators.

    :param seed: Seed value applied to every generator.
    :param using_cuda: When True, additionally put cuDNN into deterministic
        mode and turn off its benchmark mode. This may impact performance.
    """
    # Seed python RNG
    random.seed(seed)
    # Seed numpy RNG
    np.random.seed(seed)
    # seed the RNG for all devices (both CPU and CUDA)
    torch.manual_seed(seed)

    if using_cuda:
        # Deterministic operations for CuDNN, it may impact performances
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False



def play_drone_with_model(model, specified_goal, gravity_enabled=True, complement_force=False, debug_mode=False) -> tuple:
    """
    Roll out one episode of ``PropellerDroneFlight`` driven by ``model``.

    The environment is created in "rgb_array" render mode and stepped for at
    most 400 steps, or until the episode is terminated or truncated.

    :param model: Object exposing ``predict(obs)`` that returns
        ``(action, state)``.
    :param specified_goal: Forwarded to the environment as ``specified_goal``.
    :param gravity_enabled: Forwarded to the environment.
    :param complement_force: Forwarded to the environment.
    :param debug_mode: Forwarded to the environment as ``debug_mode``.
    :return: ``(frames, fr, hist)`` where ``frames`` is the list of values
        returned by ``env.render()`` after each step, ``fr`` is the
        ``FlightRecorder`` fed with every step, and ``hist`` is a float32
        array whose row 0 is the initial observation (with a zero action) and
        whose row ``k`` is ``[action, observation]`` of step ``k``. Only rows
        that were actually written are returned.
    """
    max_steps = 400
    frames = []
    env = PropellerDroneFlight(
        "rgb_array", weight_approach_reward=20, weight_force_penalty=0,
        gravity_enabled=gravity_enabled, complement_force=complement_force,
        specified_goal=specified_goal, debug_mode=debug_mode,
    )

    try:
        # Buffer for the action/observation history: one row per step plus
        # one leading row for the initial state.
        n_actions = env.action_space.shape[0]
        n_obs = env.observation_space.shape[0]
        hist = np.empty((max_steps + 1, n_actions + n_obs), dtype=np.float32)

        obs, _ = env.reset()
        # np.concatenate is used instead of np.concat, which only exists in
        # NumPy >= 2.0.
        hist[0] = np.concatenate([np.zeros(n_actions), obs[:]])

        fr = FlightRecorder(goal=env.goal)
        steps_taken = 0
        for i in tqdm(range(max_steps)):
            action, _states = model.predict(obs)  # model inference
            obs, reward, terminated, truncated, info = env.step(action)
            # An episode is over on either flag; stepping past a truncated
            # episode without reset is not valid.
            done = terminated or truncated

            hist[i + 1] = np.concatenate([action, obs[:]])
            steps_taken = i + 1

            fr.record_step(obs, action, reward, done, info)
            frames.append(env.render())
            if done:
                print("Done in {} frame.".format(i))
                break
    finally:
        # Release the environment even when the rollout raises.
        env.close()

    # Drop the rows that were never filled (np.empty leaves them
    # uninitialised) when the episode ended early.
    return frames, fr, hist[:steps_taken + 1]


# Seed the Python, NumPy and PyTorch generators with a fixed value (42).
set_random_seed(42)

## 2. PPO での学習実行

項目|値
--|--:
重力加速度|9.8 m/(s^2)
機体重量|4.0 kg
Agent アルゴリズム|PPO
学習ステップ|3000000


In [None]:
# Train a PPO agent on PropellerDroneFlight, report the wall-clock time spent
# in training, then evaluate the trained policy over 20 episodes.
start = time()

# Training
env = PropellerDroneFlight(
    "human", weight_approach_reward=20, weight_force_penalty=0, gravity_enabled=True,
    specified_goal=(5.0, 5.0, 5.0), debug_mode=True
)

# APP_ROOT_DIR / ... yields a path object; convert it so the value matches
# the declared str type before it is handed to PPO as tensorboard_log.
log_path: str = str(APP_ROOT_DIR / "logs" / "sb3_ppo")
model = PPO(MlpPolicy, env, tensorboard_log=log_path)
# NOTE(review): the original inline note next to this call read "300000",
# while the value passed is 3000000 — confirm the intended step count.
model.learn(total_timesteps=3000000, progress_bar=True)
print("training finished")

print(time() - start, "sec")

# Evaluation
# With return_episode_rewards=True, evaluate_policy returns two per-episode
# lists (rewards and episode lengths) rather than a mean and a std.
episode_rewards, episode_lengths = evaluate_policy(
    model, env, n_eval_episodes=20, return_episode_rewards=True
)

# Last expression of the cell: shown as the cell output.
pd.DataFrame({"episode_reward": episode_rewards, "episode_length": episode_lengths})



## 3. 学習したモデルパラメータで推論

In [None]:
# Run the trained model once towards the goal (5, 5, 5) and keep the rendered
# frames, the flight recorder and the raw action/observation history.
frames, fr, hist = play_drone_with_model(
    model,
    specified_goal=(5.0, 5.0, 5.0),
    debug_mode=False,
)

# The observation returned by reset() is used only for its field names here.
obs, _ = env.reset()
columns = [
    *(f"prop_rot_{i}" for i in range(env.action_space.shape[0])),
    *obs._fields,
]

# Label the history: action columns first, then the observation fields.
df_hist = pd.DataFrame(hist, columns=columns)

# Expand the abbreviated rotation/position column names.
df_hist_renamed = df_hist.rename(
    columns={
        "rot_x": "rotation_x",
        "rot_y": "rotation_y",
        "rot_z": "rotation_z",
        "pos_x": "position_x",
        "pos_y": "position_y",
        "pos_z": "position_z",
    }
)
df_hist_renamed


## 4. 推論結果の可視化

In [None]:
# Hand the rendered frames to play_anim; the bare `disp` expression makes the
# returned object the output of this cell.
disp = play_anim(frames)
disp


In [None]:
# Save the rendered frames under OUTPUT_PATH with fps=10.
# NOTE(review): the helper is named save_as_mp4 but the target file name ends
# in ".gif" — confirm which format is actually written.
save_as_mp4(frames, str(OUTPUT_PATH / "sb3_ppo_3_million_ep_dots.gif"), fps=10)


In [None]:
# Number of rows in the renamed history table.
len(df_hist_renamed)

In [None]:
# Convert the recorded history into images with SimCamera, using the drone
# URDF under APP_ROOT_DIR/data, then preview the result.
# NOTE: this rebinds `frames`, replacing the frames from the earlier rollout.
urdf_path = str(APP_ROOT_DIR / "data" / "drone.urdf")
frames = SimCamera(main_urdf_path=urdf_path, save_frequency=1).convert_to_images(df_hist_renamed)

# The bare `anim` expression makes the animation the output of this cell.
anim = play_anim(frames)
anim


In [None]:
# Number of entries in `frames` (here the result of SimCamera.convert_to_images).
len(frames)

In [None]:
# Save the SimCamera images under OUTPUT_PATH with fps=10.
# NOTE(review): the helper is named save_as_mp4 but the target file name ends
# in ".gif" — confirm which format is actually written.
save_as_mp4(frames, str(OUTPUT_PATH / "sb3_ppo_3_million_ep_real.gif"), fps=10)
