![MuJoCo banner](https://raw.githubusercontent.com/google-deepmind/mujoco/main/banner.png)

# <h1><center>チュートリアル  <a href="https://colab.research.google.com/github/google-deepmind/mujoco/blob/main/mjx/training_apg.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" width="140" align="center"/></a></center></h1>

このノートブックは、MuJoCoのJAXベース実装である [**MuJoCo XLA (MJX)**](https://github.com/google-deepmind/mujoco/blob/main/mjx) における微分可能物理を用いたポリシー学習のチュートリアルを提供します。

**GPUアクセラレーション付きのColabランタイムが必要です。** CPU専用ランタイムを使用している場合は、メニュー「ランタイム > ランタイムのタイプを変更」から切り替えることができます。


このノートブックは [Jing Yuan Luo](https://github.com/Andrew-Luo1) によって作成されました。

<!-- Copyright 2021 DeepMind Technologies Limited

     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
     You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-->

## ポリシー学習とポリシー勾配

ここでは、ポリシー学習の概要を振り返り、MJXの微分可能性をポリシー学習にどのように活用できるかを文脈に沿って説明します。以下の概念に馴染みがない場合は、多くの優れたリソースが [オンライン](https://spinningup.openai.com/en/latest/spinningup/rl_intro.html) で利用できます！

ポリシー学習の目標は、ある期間にわたる総報酬 $\sum r_t$ を最大化するアクション $a_t \sim \pi(\cdot| x_t, \theta)$ を出力する制御ポリシー $\pi$ を見つけることです。ここで $r_t$ は時刻tの状態とアクションで評価される報酬関数の略記です：

$$r_t = r(x_t, a_t)$$

$\theta$ はポリシーのパラメータであり、ポリシーがニューラルネットワークの場合は重みに相当します。**ポリシー勾配法** は、重みに関する報酬の勾配を推定し、この値を勾配降下法や [Adam](https://arxiv.org/abs/1412.6980) などの一次最適化アルゴリズムで使用します。ポリシー勾配の推定方法は、想定する状態遷移モデルによって異なります。

#### ゼロ次ポリシー勾配（ZoPG）

`mjx.step` をシミュレーション関数fとして参照し、fの値のみに依存するゼロ次勾配と、そのヤコビアンに依存する一次勾配を区別するために、いくつかの [用語](https://arxiv.org/abs/2202.00817) を借用します。

標準的な [PPO](https://github.com/google/brax/blob/main/brax/training/agents/ppo/train.py) などの強化学習（RL）アルゴリズムは、確率的状態遷移モデル $x_{t+1} \sim P(\cdot | x_t, a_t)$ を仮定します。これにより、以下の形式のZoPGが得られます：

$$
\nabla_\theta J(\pi_\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[ \sum \nabla_\theta \log\pi_\theta (a_t | s_t) R(\tau) \right]
$$

ここで $R(\tau)$ はロールアウト $\tau = \{x_t, a_t\}_{t=0}^{T}$ に依存する何らかの関数です。この手法の人気と精緻化に向けた広範な研究にもかかわらず、勾配の分散が高いという基本的な特性があります。これにより、オプティマイザはポリシー空間を徹底的に探索でき、堅牢でしばしば驚くほど優れたポリシーを実現できます。しかし、分散が高いため、収束には多くのサンプル $(x_t, a_t)$ が必要になります。

#### 一次ポリシー勾配（FoPG）
一方、決定論的状態遷移モデル $x_{t+1} = f(x_t, a_t)$ を仮定すると、一次ポリシー勾配が得られます。他の一般的な名称には、解析的ポリシー勾配（APG）や時間方向逆伝播（BPTT）があります。状態の進化を確率的なブラックボックスとしてモデル化するZoPG法とは異なり、FoPGはシミュレーション関数fのヤコビアンを明示的に含みます。例として、報酬が状態のみに依存する場合の報酬 $r_t$ の勾配を見てみましょう。
$$
\frac{\partial r_t}{\partial \theta} = \frac{\partial r_t}{\partial x_t}\frac{\partial x_t}{\partial \theta} 
$$

$$
\frac{\partial x_t}{\partial \theta} = \color{blue}{\frac{\partial f(x_{t-1}, a_{t-1})}{\partial x_{t-1}}}\frac{\partial x_{t-1}}{\partial \theta} + \color{blue}{\frac{\partial f(x_{t-1}, a_{t-1})}{\partial a_{t-1}}} \frac{\partial a_{t-1}}{\partial \theta}
$$

上式の青色の項はMJXの微分可能性によって可能となるものであり、FoPGとZoPGの主要な違いです。重要な考慮点は、接触点付近でこれらのヤコビアンがどのように見えるかです。ヤコビアン内の特定の勾配が病的になりうる理由を理解するために、硬い球が大理石のブロックに向かって落下する状況を想像してください。地面に触れる直前に、速度は距離に対してどのように変化するでしょうか ($\frac{\partial \dot{z}_t}{\partial z_t}$) ？これは [ハードコンタクト](https://arxiv.org/html/2404.02887v1) による **情報量のない勾配** のケースです。幸い、MuJoCoのデフォルトの接触設定はFoPGによる学習に十分な [ソフト](https://mujoco.readthedocs.io/en/stable/computation/index.html#soft-contact-model) さを持っています。ソフトコンタクトでは、剛体接触のように即座に偏向するのに十分な力を提供するのではなく、ボールが地面に侵入するにつれて増加する力を加えます。

FoPGについて考える有用な方法は、連鎖律と計算グラフを通じたものです。以下は、報酬がアクションに依存しない場合に $r_2$ がポリシー勾配にどのように影響するかを示しています：

<img src="../doc/images/mjx/apg_diagram.png" alt="drawing" width="300"/>

この例では3つの異なる勾配の連鎖があることに注目してください。赤い経路は、直前のアクションが状態にどのように影響したかを考慮します。青い経路は *時間方向逆伝播* という名前の由来を説明し、アクションが下流の報酬にどのように影響するかを捉えます。最も直感的でないのは緑の連鎖かもしれません。これは、報酬がアクションの前のアクションへの依存性にどのように依存するかを示しています。経験的に、 `jax.lax.stop_grad` によってこれら3つの経路の *いずれか* をブロックすると、ポリシー学習が著しく妨げられることがわかっています。 $x_t$ のバックボーンの長さが増すにつれて、 [勾配爆発](https://arxiv.org/abs/2111.05803) が重要な考慮事項になります。実際には、下流の勾配を減衰させるか、定期的に勾配を切り詰めることで解決できます。

**FoPGの注意点**

FoPGは特に [状態空間の次元が増加する](https://arxiv.org/abs/2204.07137) につれて非常にサンプル効率が高いことが示されていますが、根本的な欠点の一つとして、勾配の分散が低いため、FoPGはZoPGよりも探索能力が劣り、問題の定式化においてより明示的であることが求められます。

さらに、ロボットが倒れた時の大きなペナルティなど、不連続な報酬の定式化はRLにおいて至る所に存在します。FoPGはそのようなペナルティを通じて逆伝播できないため、FoPGでロバストなポリシーを設計することは [かなり困難](https://arxiv.org/abs/2403.14864) になる可能性があります。

最後に、サンプル効率にもかかわらず、FoPG法はウォールクロック時間で苦戦する可能性があります。勾配の分散が低いため、 [RL](https://arxiv.org/abs/2109.11978) とは異なり、データ収集の大規模並列化による恩恵が大きくありません。さらに、ポリシー勾配は通常、自動微分を介して計算されます。これはシミュレーションの順方向展開と比べて3〜5倍遅く、メモリを大量に消費します。メモリ要件は $O(m \cdot (m+n) \cdot T)$ でスケーリングされます。ここで、mとnは状態と制御の次元、$m \cdot (m+n)$ はヤコビアンの次元、Tは伝播されるステップ数です。

特定のモデルでは、 `mjx.step` を通じた自動微分が現在 [nan勾配](https://github.com/google-deepmind/mujoco/issues/1517) を引き起こすことに注意してください。現在のところ、メモリ要件と学習時間が倍増するコストを払って、倍精度浮動小数点数を使用することでこの問題に対処しています。


---
**論文**

学術的な文脈でこの研究を使用する場合は、以下の論文を引用してください：

```
@misc{luo2024residual,
  title={Residual Policy Learning for Perceptive Quadruped Control Using Differentiable Simulation},
  author={Luo, Jing Yuan and Song, Yunlong and Klemm, Victor and Shi, Fan and Scaramuzza, Davide and Hutter, Marco},
  year={2024},
  eprint={2410.03076},
  archivePrefix={arXiv},
  primaryClass={cs.RO},
  url={https://doi.org/10.48550/arXiv.2410.03076}
}
```

---

**このチュートリアルの内容**

このチュートリアルでは、Braxのシンプルな APG [アルゴリズム](https://github.com/google/brax/tree/main/brax/training/agents/apg) を使用して、FoPGの2つの使用方法を示します。このアルゴリズムは基本的に、FoPGを使ってポリシーに対するライブ勾配降下を行い、短い時間窓でポリシーを展開し、そのデータを使ってポリシー更新を行い、その後中断した箇所から継続します。

## セットアップ：インポートとインストール

In [None]:
# MuJoCo、MJX、Braxのインストール
!pip install mujoco
!pip install mujoco_mjx
!pip install brax

In [None]:
#@title MuJoCoのインストールが成功したか確認

# GPUレンダリングのセットアップ
from google.colab import files
import distutils.util
import os
import subprocess
if subprocess.run('nvidia-smi').returncode:
  raise RuntimeError(
      'Cannot communicate with GPU. '
      'Make sure you are using a GPU Colab runtime. '
      'Go to the Runtime menu and select Choose runtime type.')

# glvndがNvidia EGLドライバを検出できるようにICDコンフィグを追加します。
# これは通常Nvidiaドライバパッケージの一部としてインストールされますが、Colabの
# カーネルはAPT経由でドライバをインストールしないため、ICDが欠けています。
# (https://github.com/NVIDIA/libglvnd/blob/master/src/EGL/icd_enumeration.md)
NVIDIA_ICD_CONFIG_PATH = '/usr/share/glvnd/egl_vendor.d/10_nvidia.json'
if not os.path.exists(NVIDIA_ICD_CONFIG_PATH):
  with open(NVIDIA_ICD_CONFIG_PATH, 'w') as f:
    f.write("""{
    "file_format_version" : "1.0.0",
    "ICD" : {
        "library_path" : "libEGL_nvidia.so.0"
    }
}
""")

# MuJoCoがEGLレンダリングバックエンドを使用するように設定（GPU必須）
print('Setting environment variable to use GPU rendering:')
%env MUJOCO_GL=egl

# インストールが成功したか確認
try:
  print('Checking that the installation succeeded:')
  import mujoco
  mujoco.MjModel.from_xml_string('<mujoco/>')
except Exception as e:
  raise e from RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.\n'
      'If using a hosted Colab runtime, make sure you enable GPU acceleration '
      'by going to the Runtime menu and selecting "Choose runtime type".')

print('Installation successful.')

# その他のインポートとヘルパー関数
import time
import itertools
import numpy as np

# グラフィックスとプロット
print('Installing mediapy:')
!command -v ffmpeg >/dev/null || (apt update && apt install -y ffmpeg)
!pip install -q mediapy
import mediapy as media
import matplotlib.pyplot as plt

# numpyの出力をより読みやすく
np.set_printoptions(precision=3, suppress=True, linewidth=100)

from IPython.display import clear_output
clear_output()

In [None]:
#@title MuJoCo、MJX、Braxのインポート

import os
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "0.8" # 0.9だとラグが大きすぎる
from datetime import datetime
import functools

# 数学
import jax.numpy as jp
import numpy as np
import jax
from jax import config # 解析的勾配は倍精度でより良く動作する
config.update("jax_debug_nans", True)
config.update("jax_enable_x64", True)
config.update('jax_default_matmul_precision', jax.lax.Precision.HIGH)
from brax import math

# シミュレーション
import mujoco
import mujoco.mjx as mjx

# Brax
from brax import envs
from brax.base import Motion, Transform
from brax.io import mjcf
from brax.envs.base import PipelineEnv, State
from brax.mjx.pipeline import _reformat_contact
from brax.training.acme import running_statistics
from brax.io import model

# アルゴリズム
from brax.training.agents.apg import train as apg
from brax.training.agents.apg import networks as apg_networks
from brax.training.agents.ppo import train as ppo

# サポート
from etils import epath
import mediapy as media
import matplotlib.pyplot as plt
from ml_collections import config_dict
from typing import Any, Dict


#### 四足歩行ロボット環境

In [None]:
!git clone https://github.com/google-deepmind/mujoco_menagerie.git

In [None]:
xml_path = epath.Path('mujoco_menagerie/anybotics_anymal_c/scene_mjx.xml').as_posix()

mj_model = mujoco.MjModel.from_xml_path(xml_path)

if 'renderer' not in dir():
    renderer = mujoco.Renderer(mj_model)

init_q = mj_model.keyframe('standing').qpos

mj_data = mujoco.MjData(mj_model)
mj_data.qpos = init_q
mujoco.mj_forward(mj_model, mj_data)

renderer.update_scene(mj_data)
media.show_image(renderer.render())

In [None]:
# ロールアウトのレンダリング
def render_rollout(reset_fn, step_fn, 
                   inference_fn, env, 
                   n_steps = 200, camera=None,
                   seed=0):
    rng = jax.random.key(seed)
    render_every = 3
    state = reset_fn(rng)
    rollout = [state.pipeline_state]

    for i in range(n_steps):
        act_rng, rng = jax.random.split(rng)
        ctrl, _ = inference_fn(state.obs, act_rng)
        state = step_fn(state, ctrl)
        if i % render_every == 0:
            rollout.append(state.pipeline_state)

    media.show_video(env.render(rollout, camera=camera), 
                     fps=1.0 / (env.dt*render_every),
                     codec='gif')

## スタディ1：キネマティクスの模倣

FoPGは [模倣学習](https://openreview.net/forum?id=06mk-epSwZ) においてうまく機能することが示されています。特に、エージェントの状態が参照状態から離れすぎた場合に参照状態にリセットされる設定で効果的です。このセクションでは、その場でのトロット歩行を学習します。PDコントローラのゲインが限られているため、高速なトロット歩行は決して自明ではありません！

RL環境には3つの報酬があります：
- min_reference_tracking は最小座標での参照モーションからの誤差にペナルティを与えます。これにより、ポリシーの出力がより正確になります。
- reference_tracking は最大座標での誤差にペナルティを与え、学習の安定性を向上させます。
- feet_height は追跡するボディの位置と速度のバランスを調整し、足の *位置* に追加のインセンティブを与えます。

#### 参照キネマティクスの設計

In [None]:
def cos_wave(t, step_period, scale):
    _cos_wave = -jp.cos(((2*jp.pi)/step_period)*t)
    return _cos_wave * (scale/2) + (scale/2)

def dcos_wave(t, step_period, scale):
    """ 
    cos波の導関数（参照速度用）
    """
    return ((scale*jp.pi) / step_period) * jp.sin(((2*jp.pi)/step_period)*t)

def make_kinematic_ref(sinusoid, step_k, scale=0.3, dt=1/50):
    """ 
    12個の脚関節のトロット歩行キネマティクスを作成します。
    step_kは足を上げ下げするのにかかるタイムステップ数です。
    歩行サイクルは 2 * step_k * dt 秒の長さです。
    """
    
    _steps = jp.arange(step_k)
    step_period = step_k * dt
    t = _steps * dt
    
    wave = sinusoid(t, step_period, scale)
    # アクティブな前脚の1ステップ分のコマンド
    fleg_cmd_block = jp.concatenate(
        [jp.zeros((step_k, 1)),
        wave.reshape(step_k, 1),
        -2*wave.reshape(step_k, 1)],
        axis=1
    )
    # 立ち姿勢の設定では前脚と後脚が反転
    h_leg_cmd_bloc = -1 * fleg_cmd_block

    block1 = jp.concatenate([
        jp.zeros((step_k, 3)),
        fleg_cmd_block,
        h_leg_cmd_bloc,
        jp.zeros((step_k, 3))],
        axis=1
    )

    block2 = jp.concatenate([
        fleg_cmd_block,
        jp.zeros((step_k, 3)),
        jp.zeros((step_k, 3)),
        h_leg_cmd_bloc],
        axis=1
    )
    # 1ステップサイクルで、両方のアクティブな脚ペアが非アクティブとアクティブフェーズを持つ
    step_cycle = jp.concatenate([block1, block2], axis=0)
    return step_cycle


In [None]:
poses  = make_kinematic_ref(cos_wave, step_k=25)

frames = []
init_q = mj_model.keyframe('standing').qpos
mj_data.qpos = init_q
default_ap = init_q[7:]

for i in range(len(poses)):
    mj_data.qpos[7:] = poses[i] + default_ap
    mujoco.mj_forward(mj_model, mj_data)
    renderer.update_scene(mj_data)
    frames.append(renderer.render())

media.show_video(frames, fps=50, codec='gif')

#### RL環境

In [None]:
def get_config():
  def get_default_rewards_config():
    default_config = config_dict.ConfigDict(
        dict(
            scales=config_dict.ConfigDict(
              dict(
                min_reference_tracking = -2.5 * 3e-3, # 大きさを均等にするため
                reference_tracking = -1.0,
                feet_height = -1.0
                )
              )
            )
    )
    return default_config

  default_config = config_dict.ConfigDict(
      dict(rewards=get_default_rewards_config(),))

  return default_config

# 数学関数（https://github.com/jiawei-ren/diffmimic より）
def quaternion_to_matrix(quaternions):
    r, i, j, k = quaternions[..., 0], quaternions[..., 1], quaternions[..., 2], quaternions[..., 3]
    two_s = 2.0 / (quaternions * quaternions).sum(-1)

    o = jp.stack(
        (
            1 - two_s * (j * j + k * k),
            two_s * (i * j - k * r),
            two_s * (i * k + j * r),
            two_s * (i * j + k * r),
            1 - two_s * (i * i + k * k),
            two_s * (j * k - i * r),
            two_s * (i * k - j * r),
            two_s * (j * k + i * r),
            1 - two_s * (i * i + j * j),
        ),
        -1,
    )
    return o.reshape(quaternions.shape[:-1] + (3, 3))

def matrix_to_rotation_6d(matrix):
    batch_dim = matrix.shape[:-2]
    return matrix[..., :2, :].reshape(batch_dim + (6,))

def quaternion_to_rotation_6d(quaternion):
    return matrix_to_rotation_6d(quaternion_to_matrix(quaternion))

class TrotAnymal(PipelineEnv):

  def __init__(
      self,
      termination_height: float=0.25,
      **kwargs,
  ):
    step_k = kwargs.pop('step_k', 25)

    physics_steps_per_control_step = 10
    kwargs['n_frames'] = kwargs.get(
        'n_frames', physics_steps_per_control_step)

    mj_model = mujoco.MjModel.from_xml_path(xml_path)
    kp = 230
    mj_model.actuator_gainprm[:, 0] = kp
    mj_model.actuator_biasprm[:, 1] = -kp

    sys = mjcf.load_model(mj_model)

    super().__init__(sys=sys, **kwargs)    
    
    self.termination_height = termination_height
    
    self._init_q = mj_model.keyframe('standing').qpos
    
    self.err_threshold = 0.4 # diffmimic; 論文の値
    
    self._default_ap_pose = mj_model.keyframe('standing').qpos[7:]
    self.reward_config = get_config()

    self.action_loc = self._default_ap_pose
    self.action_scale = jp.array([0.2, 0.8, 0.8] * 4)
    
    self.feet_inds = jp.array([21,28,35,42]) # LF, RF, LH, RH

    #### 模倣参照
    kinematic_ref_qpos = make_kinematic_ref(
      cos_wave, step_k, scale=0.3, dt=self.dt)
    kinematic_ref_qvel = make_kinematic_ref(
      dcos_wave, step_k, scale=0.3, dt=self.dt)
    
    self.l_cycle = jp.array(kinematic_ref_qpos.shape[0])
    
    # 状態空間全体に拡張

    kinematic_ref_qpos += self._default_ap_pose
    ref_qs = np.tile(self._init_q.reshape(1, 19), (self.l_cycle, 1))
    ref_qs[:, 7:] = kinematic_ref_qpos
    self.kinematic_ref_qpos = jp.array(ref_qs)
    
    ref_qvels = np.zeros((self.l_cycle, 18))
    ref_qvels[:, 6:] = kinematic_ref_qvel
    self.kinematic_ref_qvel = jp.array(ref_qvels)

    # JIT時間と学習のウォールクロック時間を大幅に削減できる
    self.pipeline_step = jax.checkpoint(self.pipeline_step, 
      policy=jax.checkpoint_policies.dots_with_no_batch_dims_saveable)
    
  def reset(self, rng: jax.Array) -> State:
    # 決定論的初期化

    qpos = jp.array(self._init_q)
    qvel = jp.zeros(18)
    
    data = self.pipeline_init(qpos, qvel)

    # 地面上に配置
    pen = jp.min(data.contact.dist)
    qpos = qpos.at[2].set(qpos[2] - pen)
    data = self.pipeline_init(qpos, qvel)

    state_info = {
        'rng': rng,
        'steps': 0.0,
        'reward_tuple': {
            'reference_tracking': 0.0,
            'min_reference_tracking': 0.0,
            'feet_height': 0.0
        },
        'last_action': jp.zeros(12), # MJXチュートリアルより
        'kinematic_ref': jp.zeros(19),
    }

    x, xd = data.x, data.xd
    obs = self._get_obs(data.qpos, x, xd, state_info)
    reward, done = jp.zeros(2)
    metrics = {}
    for k in state_info['reward_tuple']:
      metrics[k] = state_info['reward_tuple'][k]
    state = State(data, obs, reward, done, metrics, state_info)
    return jax.lax.stop_gradient(state)
  
  def step(self, state: State, action: jax.Array) -> State:
    action = jp.clip(action, -1, 1) # 生のアクション

    action = self.action_loc + (action * self.action_scale)

    data = self.pipeline_step(state.pipeline_state, action)
    
    ref_qpos = self.kinematic_ref_qpos[jp.array(state.info['steps']%self.l_cycle, int)]
    ref_qvel = self.kinematic_ref_qvel[jp.array(state.info['steps']%self.l_cycle, int)]
    
    # 最大座標の計算
    ref_data = data.replace(qpos=ref_qpos, qvel=ref_qvel)
    ref_data = mjx.forward(self.sys, ref_data)
    ref_x, ref_xd = ref_data.x, ref_data.xd

    state.info['kinematic_ref'] = ref_qpos

    # 観測データ
    x, xd = data.x, data.xd
    obs = self._get_obs(data.qpos, x, xd, state.info)

    # 転倒または落下した場合に終了
    done = 0.0
    done = jp.where(x.pos[0, 2] < self.termination_height, 1.0, done)
    up = jp.array([0.0, 0.0, 1.0])
    done = jp.where(jp.dot(math.rotate(up, x.rot[0]), up) < 0, 1.0, done)

    # 報酬
    reward_tuple = {
        'reference_tracking': (
          self._reward_reference_tracking(x, xd, ref_x, ref_xd)
          * self.reward_config.rewards.scales.reference_tracking
        ),
        'min_reference_tracking': (
          self._reward_min_reference_tracking(ref_qpos, ref_qvel, state)
          * self.reward_config.rewards.scales.min_reference_tracking
        ),
        'feet_height': (
          self._reward_feet_height(data.geom_xpos[self.feet_inds][:, 2]
                                   ,ref_data.geom_xpos[self.feet_inds][:, 2])
          * self.reward_config.rewards.scales.feet_height
        )
    }
    
    reward = sum(reward_tuple.values())

    # 状態管理
    state.info['reward_tuple'] = reward_tuple
    state.info['last_action'] = action # 観測に使用

    for k in state.info['reward_tuple'].keys():
      state.metrics[k] = state.info['reward_tuple'][k]

    state = state.replace(
        pipeline_state=data, obs=obs, reward=reward,
        done=done)
    
    #### 参照から離れすぎた場合に状態を参照にリセット
    error = (((x.pos - ref_x.pos) ** 2).sum(-1)**0.5).mean()
    to_reference = jp.where(error > self.err_threshold, 1.0, 0.0)

    to_reference = jp.array(to_reference, dtype=int) # 出力の型を入力と同じに保つ
    ref_data = self.mjx_to_brax(ref_data)

    data = jax.tree_util.tree_map(lambda x, y: 
                                  jp.array((1-to_reference)*x + to_reference*y, x.dtype), data, ref_data)
    
    x, xd = data.x, data.xd # データが変更された可能性がある
    obs = self._get_obs(data.qpos, x, xd, state.info)
    
    return state.replace(pipeline_state=data, obs=obs)
    
  def _get_obs(self, qpos: jax.Array, x: Transform, xd: Motion,
               state_info: Dict[str, Any]) -> jax.Array:

    inv_base_orientation = math.quat_inv(x.rot[0])
    local_rpyrate = math.rotate(xd.ang[0], inv_base_orientation)

    obs_list = []
    # ヨーレート
    obs_list.append(jp.array([local_rpyrate[2]]) * 0.25)
    # 射影重力
    obs_list.append(
        math.rotate(jp.array([0.0, 0.0, -1.0]), inv_base_orientation))
    # モーター角度
    angles = qpos[7:19]
    obs_list.append(angles - self._default_ap_pose)
    # 前回のアクション
    obs_list.append(state_info['last_action'])
    # キネマティクス参照
    kin_ref = self.kinematic_ref_qpos[jp.array(state_info['steps']%self.l_cycle, int)]
    obs_list.append(kin_ref[7:]) # 最初の7インデックスは固定

    obs = jp.clip(jp.concatenate(obs_list), -100.0, 100.0)

    return obs
  
  def mjx_to_brax(self, data):
    """ 
    コアMJXデータ構造にBraxラッパーを適用します。
    """
    q, qd = data.qpos, data.qvel
    x = Transform(pos=data.xpos[1:], rot=data.xquat[1:])
    cvel = Motion(vel=data.cvel[1:, 3:], ang=data.cvel[1:, :3])
    offset = data.xpos[1:, :] - data.subtree_com[self.sys.body_rootid[1:]]
    offset = Transform.create(pos=offset)
    xd = offset.vmap().do(cvel)
    data = _reformat_contact(self.sys, data)
    return data.replace(q=q, qd=qd, x=x, xd=xd)


  # ------------ 報酬関数 ----------------
  def _reward_reference_tracking(self, x, xd, ref_x, ref_xd):
    """
    慣性フレームのボディ位置に基づく報酬。
    特に、姿勢の高次元表現を使用します。
    """

    f = lambda x, y: ((x - y) ** 2).sum(-1).mean()

    _mse_pos = f(x.pos,  ref_x.pos)
    _mse_rot = f(quaternion_to_rotation_6d(x.rot),
                 quaternion_to_rotation_6d(ref_x.rot))
    _mse_vel = f(xd.vel, ref_xd.vel)
    _mse_ang = f(xd.ang, ref_xd.ang)

    # ほぼ同じ大きさになるように調整
    return _mse_pos      \
      + 0.1 * _mse_rot   \
      + 0.01 * _mse_vel  \
      + 0.001 * _mse_ang

  def _reward_min_reference_tracking(self, ref_qpos, ref_qvel, state):
    """ 
    最小座標を使用。関節角度の追跡精度を向上させます。
    """
    pos = jp.concatenate([
      state.pipeline_state.qpos[:3],
      state.pipeline_state.qpos[7:]])
    pos_targ = jp.concatenate([
      ref_qpos[:3],
      ref_qpos[7:]])
    pos_err = jp.linalg.norm(pos_targ - pos)
    vel_err = jp.linalg.norm(state.pipeline_state.qvel- ref_qvel)

    return pos_err + vel_err

  def _reward_feet_height(self, feet_pos, feet_pos_ref):
    return jp.sum(jp.abs(feet_pos - feet_pos_ref)) # L1ノルムを使って0に近づけようとする

envs.register_environment('trotting_anymal', TrotAnymal)

#### FoPGによる模倣学習
NVIDIA 3060 TI GPUで15分かかります

In [None]:
make_networks_factory = functools.partial(
    apg_networks.make_apg_networks,
    hidden_layer_sizes=(256, 128)
)

epochs = 499

train_fn = functools.partial(apg.train,
                             episode_length=240,
                             policy_updates=epochs,
                             horizon_length=32,
                             num_envs=64,
                             learning_rate=1e-4,
                             num_eval_envs=64,
                             num_evals=10 + 1,
                             use_float64=True,
                             normalize_observations=True,
                             network_factory=make_networks_factory)

In [None]:
x_data = []
y_data = []
ydataerr = []
times = [datetime.now()]

def progress(it, metrics):
  times.append(datetime.now())
  x_data.append(it)
  y_data.append(metrics['eval/episode_reward'])
  ydataerr.append(metrics['eval/episode_reward_std'])

# 各足は毎秒2回地面に接触する
env = envs.get_environment("trotting_anymal", step_k = 13)
eval_env = envs.get_environment("trotting_anymal", step_k = 13)

make_inference_fn, params, _= train_fn(environment=env,
                                       progress_fn=progress,
                                       eval_env=eval_env)

plt.errorbar(x_data, y_data, yerr=ydataerr)

In [None]:
demo_env = envs.training.EpisodeWrapper(env, 
                                        episode_length=1000, 
                                        action_repeat=1)

render_rollout(
  jax.jit(demo_env.reset),
  jax.jit(demo_env.step),
  jax.jit(make_inference_fn(params)),
  demo_env,
  n_steps=200,
  seed=1
)

model_path = '/tmp/trotting_2hz_policy'
model.save_params(model_path, params)

**サンプル効率に関する注意**

上記では、epochs * horizon_length * num_envs = 1.024e6のシミュレータステップを使用して学習しています。PPOで10倍のサンプルを使用した場合の結果を見てみましょう：

In [None]:
train_fn = functools.partial(
    ppo.train, num_timesteps=10_000_000, num_evals=10, reward_scaling=0.1,
    episode_length=1000, normalize_observations=True, action_repeat=1,
    unroll_length=10, num_minibatches=32, num_updates_per_batch=8,
    discounting=0.97, learning_rate=3e-4, entropy_cost=1e-3, num_envs=1024,
    batch_size=1024, seed=0)

x_data = []
y_data = []
ydataerr = []
env = envs.get_environment("trotting_anymal", step_k = 13)

def progress(num_steps, metrics):
  x_data.append(num_steps)
  y_data.append(metrics['eval/episode_reward'])
  ydataerr.append(metrics['eval/episode_reward_std'])

make_inference_fn, params, _= train_fn(environment=env, progress_fn=progress)

plt.errorbar(x_data, y_data, yerr=ydataerr)
plt.xlabel('# environment steps')
plt.ylabel('reward per episode')

PPOがAPGに追いつくには約9e6のシミュレータステップが必要であることがわかります。

## スタディ2：四足歩行ロボットの移動

模倣学習の例で見たように、FoPG法は詳細な報酬信号の恩恵を受けます。移動を教えるために、Raibertヒューリスティックに基づいて足に報酬を与えます。[先行研究](https://arxiv.org/abs/2403.14864) と同様に、対角の脚ペアが固定周波数で同期して動くようにインセンティブを与える歩行スケジュールを使用します。新しいスケジュールされたステップの開始時に、ステップの終了時の足の目標位置を計算します。

各足について、以下を計算します：

$$
p^* = h_0 + \frac{\Delta T}{2} v_0
$$

ここで $p^*$ は足の目標位置のx, y成分、$h_0$ は離地時の対応する股関節のx, y成分、$\Delta T$ はスケジュールされたステップの持続時間、$v_0$ は離地時のベース速度です。

探索能力が限られているため、FoPG法はポリシーの良い「初期推定」を持つことで大きな恩恵を受けます。これはモデル予測制御や軌道最適化ではよく知られた用語です。問題を [残差学習](https://arxiv.org/abs/1512.03385) として定式化します。$\phi$ を既に持っているベースラインポリシーのパラメータとし、$f$ と $g$ をそれぞれ学習されるポリシーとベースラインポリシーのニューラルネットワークとします。$\phi$ を固定し、以下のポリシーのパラメータ $\theta$ を学習します：

$$
a_t = f(g(x_t; \phi), x_t; \theta) + g(x_t; \phi)
$$

前のセクションのその場トロット歩行ポリシーを $\phi$ として使用し、$x_t$ は時刻tの状態を表します。この例では0.75 m/sの速度目標を追跡しますが、より速いトロット歩行の方が安定するため、より速い速度目標に対して $\phi$ を試すこともできます！

単純にパラメータ $\theta$ を $\phi$ として初期化し、ポリシー $a_t = f(x_t; \theta)$ で学習を「ホットスタート」する方がより自然に思えるかもしれませんが、実際には残差法の方がより安定して学習できることがわかっています。

#### RL環境

In [None]:
def axis_angle_to_quaternion(v: jp.ndarray, theta:jp.float_):
    """ 
    軸角度表現：vの周りにthetaだけ回転。
    """    
    return jp.concatenate([jp.cos(0.5*theta).reshape(1), jp.sin(0.5*theta)*v.reshape(3)])

def get_config():
  """anymal四足歩行ロボット環境の報酬設定を返します。"""

  def get_default_rewards_config():
    default_config = config_dict.ConfigDict(
        dict(
            scales=config_dict.ConfigDict(
                dict(
                    tracking_lin_vel = 1.0,
                    orientation = -1.0, # 水平でないベース
                    height = 0.5,
                    lin_vel_z=-1.0, # 自殺ポリシーを防止
                    torque = -0.01,
                    feet_pos = -1, # 不良アクションのハードコーディング
                    feet_height = -1, # 静止を防止
                    joint_velocity = -0.001
                    )
            ),
        )
    )
    return default_config

  default_config = config_dict.ConfigDict(
      dict(rewards=get_default_rewards_config(),))

  return default_config

class FwdTrotAnymal(PipelineEnv):

  def __init__(
      self,
      termination_height: float=0.25,
      **kwargs,
  ):
    
    self.target_vel = kwargs.pop('target_vel', 0.75)
    step_k = kwargs.pop('step_k', 25)
    self.baseline_inference_fn = kwargs.pop("baseline_inference_fn")
    physics_steps_per_control_step = 10
    kwargs['n_frames'] = kwargs.get(
        'n_frames', physics_steps_per_control_step)
    self.termination_height = termination_height

    mj_model = mujoco.MjModel.from_xml_path(xml_path)
    kp = 230
    mj_model.actuator_gainprm[:, 0] = kp
    mj_model.actuator_biasprm[:, 1] = -kp
    self._init_q = mj_model.keyframe('standing').qpos
    self._default_ap_pose = mj_model.keyframe('standing').qpos[7:]
    self.reward_config = get_config()

    self.action_loc = self._default_ap_pose
    self.action_scale = jp.array([0.2, 0.8, 0.8] * 4)
    
    self.target_h = self._init_q[2]

    sys = mjcf.load_model(mj_model)
    super().__init__(sys=sys, **kwargs)
    
    """
    キネマティクス参照は歩行スケジューリングに使用されます。
    """

    kinematic_ref_qpos = make_kinematic_ref(
      cos_wave, step_k, scale=0.3, dt=self.dt)
    self.l_cycle = jp.array(kinematic_ref_qpos.shape[0])
    self.kinematic_ref_qpos = jp.array(kinematic_ref_qpos + self._default_ap_pose)

    """
    足の追跡
    """
    gait_k = step_k * 2
    self.gait_period = gait_k * self.dt

    self.step_k = step_k
    self.feet_inds = jp.array([21,28,35,42]) # LF, RF, LH, RH
    self.hip_inds = self.feet_inds - 6

    self.pipeline_step = jax.checkpoint(self.pipeline_step,
      policy=jax.checkpoint_policies.dots_with_no_batch_dims_saveable)
    
  def reset(self, rng: jax.Array) -> State:
    rng, key_xyz, key_ang, key_ax, key_q, key_qd = jax.random.split(rng, 6)

    qpos = jp.array(self._init_q)
    qvel = jp.zeros(18)
    
    #### ランダム性の追加 ####
  
    r_xyz = 0.2 * (jax.random.uniform(key_xyz, (3,))-0.5)
    r_angle = (jp.pi/12) * (jax.random.uniform(key_ang, (1,)) - 0.5) # 15度の範囲
    r_axis = (jax.random.uniform(key_ax, (3,)) - 0.5)
    r_axis = r_axis / jp.linalg.norm(r_axis)
    r_quat = axis_angle_to_quaternion(r_axis, r_angle)

    r_joint_q = 0.2 * (jax.random.uniform(key_q, (12,)) - 0.5)
    r_joint_qd = 0.1 * (jax.random.uniform(key_qd, (12,)) - 0.5)
  
    qpos = qpos.at[0:3].set(qpos[0:3] + r_xyz)
    qpos = qpos.at[3:7].set(r_quat)
    qpos = qpos.at[7:19].set(qpos[7:19] + r_joint_q)
    qvel = qvel.at[6:18].set(qvel[6:18] + r_joint_qd)
    
    data = self.pipeline_init(qpos, qvel)

    # 地面にめり込まず、地面の上にいることを保証
    pen = jp.min(data.contact.dist)
    qpos = qpos.at[2].set(qpos[2] - pen)
    data = self.pipeline_init(qpos, qvel)

    state_info = {
        'rng': rng,
        'steps': 0.0,
        'reward_tuple': {
            'tracking_lin_vel': 0.0,
            'orientation': 0.0,
            'height': 0.0,
            'lin_vel_z': 0.0,
            'torque': 0.0,
            'joint_velocity': 0.0,
            'feet_pos': 0.0,
            'feet_height': 0.0
        },
        'last_action': jp.zeros(12), # MJXチュートリアルより
        'baseline_action': jp.zeros(12),
        'xy0': jp.zeros((4, 2)),
        'k0': 0.0,
        'xy*': jp.zeros((4, 2))
    }

    x, xd = data.x, data.xd
    _obs = self._get_obs(data.qpos, x, xd, state_info) # 内部観測（トロッターへ）
  
    action_key, key = jax.random.split(state_info['rng'])
    state_info['rng'] = key
    next_action, _ = self.baseline_inference_fn(_obs, action_key)

    obs = jp.concatenate([_obs, next_action])

    reward, done = jp.zeros(2)
    metrics = {}
    for k in state_info['reward_tuple']:
      metrics[k] = state_info['reward_tuple'][k]
    state = State(data, obs, reward, done, metrics, state_info)
    return jax.lax.stop_gradient(state)

  def step(self, state: State, action: jax.Array) -> State:

    action = jp.clip(action, -1, 1)

    cur_base = state.obs[-12:]
    action += cur_base
    state.info['baseline_action'] = cur_base

    action = self.action_loc + (action * self.action_scale)

    data = self.pipeline_step(state.pipeline_state, action)
    
    # 観測データ
    x, xd = data.x, data.xd
    obs = self._get_obs(data.qpos, x, xd, state.info)

    # 転倒または落下した場合に終了
    done = 0.0
    done = jp.where(x.pos[0, 2] < self.termination_height, 1.0, done)
    up = jp.array([0.0, 0.0, 1.0])
    done = jp.where(jp.dot(math.rotate(up, x.rot[0]), up) < 0, 1.0, done)

    #### 足の位置参照の更新 ####

    # 新しいステップの開始を検出
    s = state.info['steps']
    step_num = s // (self.step_k)
    even_step = step_num % 2 == 0
    new_step = (s % self.step_k) == 0
    new_even_step = jp.logical_and(new_step, even_step)
    new_odd_step = jp.logical_and(new_step, jp.logical_not(even_step))

    # Raibertヒューリスティックを適用して、ステップ後の足の目標位置を計算
    hip_xy = data.geom_xpos[self.hip_inds][:,:2] # 4 x 2
    v_body = data.qvel[0:2]
    step_period = self.gait_period/2
    raibert_xy = hip_xy + (step_period/2) * v_body

    # 更新
    cur_tars = state.info['xy*']
    i_RFLH = jp.array([1, 2])
    i_LFRH = jp.array([0, 3])
    feet_xy = data.geom_xpos[self.feet_inds][:,:2]
    
    # トロット歩行では、対角の脚ペアの一方を動かし、
    # もう一方を固定する
    case_c1 = raibert_xy.at[i_LFRH].set(feet_xy[i_LFRH]) 
    case_c2 = raibert_xy.at[i_RFLH].set(feet_xy[i_RFLH])
    xy_tars = jp.where(new_even_step, case_c1, cur_tars)
    xy_tars = jp.where(new_odd_step, case_c2, xy_tars)
    state.info['xy*'] = xy_tars

    # ステップ開始時のタイムステップと位置を保存
    state.info['k0'] = jp.where(new_step,
                                state.info['steps'],
                                state.info['k0'])
    state.info['xy0'] = jp.where(new_step, 
                                 feet_xy,
                                 state.info['xy0'])

    # 報酬
    reward_tuple = {
        'tracking_lin_vel': (
            self._reward_tracking_lin_vel(jp.array([self.target_vel, 0, 0]), x, xd)
            * self.reward_config.rewards.scales.tracking_lin_vel
        ),
        'orientation': (
          self._reward_orientation(x)
          * self.reward_config.rewards.scales.orientation
        ),
        'lin_vel_z': (
            self._reward_lin_vel_z(xd)
            * self.reward_config.rewards.scales.lin_vel_z
        ),
        'height': (
          self._reward_height(data.qpos) 
          * self.reward_config.rewards.scales.height
        ),
        'torque': (
          self._reward_action(data.qfrc_actuator)
          * self.reward_config.rewards.scales.torque
        ),
        'joint_velocity': (
          self._reward_joint_velocity(data.qvel)
          * self.reward_config.rewards.scales.joint_velocity
        ),
        'feet_pos': (
          self._reward_feet_pos(data, state)
          * self.reward_config.rewards.scales.feet_pos
        ),
        'feet_height': (
          self._reward_feet_height(data, state.info)
          * self.reward_config.rewards.scales.feet_height
        )
    }
    
    reward = sum(reward_tuple.values())

    # 状態管理
    state.info['reward_tuple'] = reward_tuple
    state.info['last_action'] = action

    for k in state.info['reward_tuple'].keys():
      state.metrics[k] = state.info['reward_tuple'][k]

    # 次のアクション
    action_key, key = jax.random.split(state.info['rng'])
    state.info['rng'] = key
    next_action, _ = self.baseline_inference_fn(obs, action_key)
    obs = jp.concatenate([obs, next_action])

    state = state.replace(
        pipeline_state=data, obs=obs, reward=reward,
        done=done)
    return state

  def _get_obs(self, qpos: jax.Array, x: Transform, xd: Motion,
               state_info: Dict[str, Any]) -> jax.Array:

    inv_base_orientation = math.quat_inv(x.rot[0])
    local_rpyrate = math.rotate(xd.ang[0], inv_base_orientation)

    obs_list = []
    # ヨーレート
    obs_list.append(jp.array([local_rpyrate[2]]) * 0.25)
    # 射影重力
    obs_list.append(
        math.rotate(jp.array([0.0, 0.0, -1.0]), inv_base_orientation))
    # モーター角度
    angles = qpos[7:19]
    obs_list.append(angles - self._default_ap_pose)
    # 前回のアクション
    obs_list.append(state_info['last_action'])
    # 歩行スケジュール
    kin_ref = self.kinematic_ref_qpos[jp.array(state_info['steps']%self.l_cycle, int)]
    obs_list.append(kin_ref)

    obs = jp.clip(jp.concatenate(obs_list), -100.0, 100.0)

    return obs

  # ------------ 報酬関数 ----------------
  def _reward_tracking_lin_vel(
      self, commands: jax.Array, x: Transform, xd: Motion) -> jax.Array:
    # 線形速度コマンドの追従（xy軸）
    local_vel = math.rotate(xd.vel[0], math.quat_inv(x.rot[0]))
    lin_vel_error = jp.sum(jp.square(commands[:2] - local_vel[:2]))
    lin_vel_reward = jp.exp(-lin_vel_error)
    return lin_vel_reward
  def _reward_orientation(self, x: Transform) -> jax.Array:
    # 水平でないベース姿勢にペナルティ
    up = jp.array([0.0, 0.0, 1.0])
    rot_up = math.rotate(up, x.rot[0])
    return jp.sum(jp.square(rot_up[:2]))
  def _reward_lin_vel_z(self, xd: Motion) -> jax.Array:
    # z軸のベース線形速度にペナルティ
    return jp.clip(jp.square(xd.vel[0, 2]), 0, 10)
  def _reward_joint_velocity(self, qvel):
      return jp.clip(jp.sqrt(jp.sum(jp.square(qvel[6:]))), 0, 100)
  def _reward_height(self, qpos) -> jax.Array:
    return jp.exp(-jp.abs(qpos[2] - self.target_h)) # 1メートル以上の高さにはならない
  def _reward_action(self, action) -> jax.Array:
    return jp.sqrt(jp.sum(jp.square(action)))
  def _reward_feet_pos(self, data, state):        
    dt = (state.info['steps'] - state.info['k0']) * self.dt # スカラー
    step_period = self.gait_period / 2
    xyt = state.info['xy0'] + (state.info['xy*'] - state.info['xy0']) * (dt/step_period)

    feet_pos = data.geom_xpos[self.feet_inds][:, :2]

    rews = jp.sum(jp.square(feet_pos - xyt), axis=1)   
    rews = jp.clip(rews, 0, 10)
    return jp.sum(rews)
  def _reward_feet_height(self, data, state_info):
    """ 
    足の高さが整流正弦波を追跡する
    """
    h_tar = 0.1
    t = state_info['steps'] * self.dt
    offset = self.gait_period/2
    ref1 = jp.sin((2*jp.pi/self.gait_period)*t) # RFとLHの足
    ref2 = jp.sin((2*jp.pi/self.gait_period)*(t - offset)) # LFとRHの足
    
    ref1, ref2 = ref1 * h_tar, ref2 * h_tar
    h_tars = jp.array([ref2, ref1, ref1, ref2])
    h_tars = h_tars.clip(min=0, max=None) + 0.02 # 足の高さオフセット
    
    feet_height = data.geom_xpos[self.feet_inds][:,2]
    errs = jp.clip(jp.square(feet_height - h_tars), 0, 10)
    return jp.sum(errs)
  
envs.register_environment('anymal', FwdTrotAnymal)


#### FoPGによる残差学習
NVIDIA 3060 TI GPUで15分かかります

In [None]:
# トロット歩行の推論関数を再構築
make_networks_factory = functools.partial(
    apg_networks.make_apg_networks,
    hidden_layer_sizes=(256, 128)
)

nets = make_networks_factory(observation_size=1, # observation_size引数はパラメータ初期化にのみ使用されるため重要ではない
                             action_size=12,
                             preprocess_observations_fn=running_statistics.normalize)

make_inference_fn = apg_networks.make_inference_fn(nets)

# 移動学習の設定
make_networks_factory = functools.partial(
    apg_networks.make_apg_networks,
    hidden_layer_sizes=(128, 64)
)

epochs = 499

train_fn = functools.partial(apg.train,
                             episode_length=1000,
                             policy_updates=epochs,
                             horizon_length=32,
                             num_envs=64,
                             learning_rate=1.5e-4,
                             schedule_decay=0.995,
                             num_eval_envs=64,
                             num_evals=10 + 1,
                             use_float64=True,
                             normalize_observations=True,
                             network_factory=make_networks_factory)

model_path = '/tmp/trotting_2hz_policy'
params = model.load_params(model_path)
baseline_inference_fn = make_inference_fn(params)

env_kwargs = dict(target_vel=0.75, step_k=13, 
                  baseline_inference_fn=baseline_inference_fn)

In [None]:
x_data = []
y_data = []
ydataerr = []
times = [datetime.now()]

def progress(it, metrics):
  times.append(datetime.now())
  x_data.append(it)
  y_data.append(metrics['eval/episode_reward'])
  ydataerr.append(metrics['eval/episode_reward_std'])

env = envs.get_environment("anymal", **env_kwargs)
eval_env = envs.get_environment("anymal", **env_kwargs)

make_inference_fn, params, _= train_fn(environment=env,
                                       progress_fn=progress,
                                       eval_env=eval_env)

plt.errorbar(x_data, y_data, yerr=ydataerr)

In [None]:
demo_env = envs.training.EpisodeWrapper(env, 
                                        episode_length=1000, 
                                        action_repeat=1)

render_rollout(
  jax.jit(demo_env.reset),
  jax.jit(demo_env.step),
  jax.jit(make_inference_fn(params)),
  demo_env,
  n_steps=200,
  camera="track"
)

**サンプル効率に関する注意**

PPOで再度1e7サンプルを使用した場合と比較してみましょう：

In [None]:
train_fn = functools.partial(
    ppo.train, num_timesteps=10_000_000, num_evals=10, reward_scaling=0.1,
    episode_length=1000, normalize_observations=True, action_repeat=1,
    unroll_length=10, num_minibatches=32, num_updates_per_batch=8,
    discounting=0.97, learning_rate=3e-4, entropy_cost=1e-3, num_envs=1024,
    batch_size=1024, seed=0)

x_data = []
y_data = []
ydataerr = []

env = envs.get_environment("anymal", **env_kwargs)

def progress(num_steps, metrics):
  x_data.append(num_steps)
  y_data.append(metrics['eval/episode_reward'])
  ydataerr.append(metrics['eval/episode_reward_std'])

make_inference_fn, params, _= train_fn(environment=env, progress_fn=progress)

plt.errorbar(x_data, y_data, yerr=ydataerr)
plt.xlabel('# environment steps')
plt.ylabel('reward per episode')

PPOはこの設定では移動の学習に苦戦し、10倍以上のシミュレータステップを使用しても難しいことがわかります。

これはPPOの欠点を示しているのではなく、FoPG法を効果的に活用するポリシー学習のセットアップを実証しています。良いベースラインからの小さく正確な摂動を学習すること、つまり深い谷のローカルミニマムに最適化することが含まれます。FoPGは、足の配置スプラインのような微妙な報酬信号を使ってこれらの摂動を導くのに十分な精度を持っています。

一方、PPOなどのRLアルゴリズムは、より構造化されていない報酬を持つ [ポリシー学習のセットアップ](https://colab.research.google.com/github/google-deepmind/mujoco/blob/main/mjx/tutorial.ipynb) で恩恵を受けます。FoPG法とは異なり、転倒時の大きなペナルティなどのスパースで非微分可能な報酬から [大きな恩恵](https://www.science.org/doi/abs/10.1126/scirobotics.adg1462) を受けます。