# 応募用ファイルの作成手順

ここでは環境構築から始めて, エージェントを作成して投稿ができるフォーマットに整理し, 実際に対戦を実行して最終的に応募用ファイルを作成するまでの流れの一例を説明する.

## 事前準備

### シミュレータのダウンロード

[コンペティションサイト](https://user.competition.signate.jp/ja/competition/detail/?competition=de1556abda294254b30bdec61520f764)から`simulator_dist.zip`をダウンロードして, 作業ディレクトリに展開しておく. 部門別にダウンロード先は異なるので, 注意. オープン部門は[こちら](https://user.competition.signate.jp/ja/competition/detail/?competition=de1556abda294254b30bdec61520f764&task=58817e1148ad44828cd34018296d09b6&tab=task), ユース部門は[こちら](https://user.competition.signate.jp/ja/competition/detail/?competition=de1556abda294254b30bdec61520f764&task=ddc905b07101400b83e5a43d3e7c7b72&tab=task)のデータタブよりダウンロードする. 展開後, 以下のようなディレクトリ構造のファイル群が作成される.

```bash
simulator_dist
├── Agents                            : 行動判断モジュールを格納したディレクトリ
├── common                            : コンペティションで定義されている戦闘場面などの設定ファイルを格納したディレクトリ
├── dist                              : シミュレータ本体を含めた依存ライブラリのwhlファイルを格納したディレクトリ
├── dockerfiles                       : Dockerによる環境構築に使われるベースイメージファイルを格納したディレクトリ
│   ├── Dockerfile.cpu                : CPU用の環境
│   ├── Dockerfile.gpu                : GPU用の環境
│   └── Dockerfile.macos              : MacOS用の環境
├── docs                              : 説明資料を格納したディレクトリ
│   └── html
│       └── core
│           └── index.html            : 説明資料のトップページ
├── figs                              : 説明図などを格納したディレクトリ
├── simulator                         : シミュレータ本体
├── src                               : 対戦などを行うためのモジュールを実装したプログラム
├── docker-compose.yml                : 仮想環境を構築するための定義ファイル
├── make_agent.ipynb                  : エージェントを作成するための簡単な手順を示したノートブック
├── README.md                         : シミュレータなどに関する解説ドキュメント
├── replay.py                         : 戦闘シーンを動画として作成するプログラム
└── validate.py                       : エージェント同士の対戦を実行するプログラム
```


### ドキュメントリンクの有効化

必要に応じて`tutorial/docs`以下に, 配布されたシミュレータの`simulator_dist/docs`にあるドキュメントをすべてコピーしたうえで, `tutorial/docs/html`のドキュメントリンクを有効化しておく. 例えば, Pythonを使って

```bash
$ python -m http.server 5500
```
としてサーバを起動できる. VSCodeを使用している場合は, 拡張機能[LiveServer](https://marketplace.visualstudio.com/items?itemName=ritwickdey.LiveServer)をインストールし, このファイルがあるディレクトリを開いた状態で画面右下の「Go Live」アイコンをクリックすれば自動的に5500番ポートにサーバが起動する. すると, このノートブックに記載されているリンクが有効になり閲覧することができるようになる.

## 環境構築

`simulator_dist/README.md`の「環境構築」項目にある通りに好きな方法で構築する.

### Dockerによる構築(推奨)

`simulator_dist/docker-compose.yml`の`dev1`-`build`->`dockerfile`でOS別に`dockerfiles`のDockerfileを選択することで各自のOSに合わせて環境構築ができる. コンペティションの評価環境で実行する場合は, `simulator_dist/docker-compose.yml`を下記のように編集する.

```yaml
services:
  dev1:
    build:
      context: .
      dockerfile: dockerfiles/Dockerfile.cpu
    container_name: atla4
    ports:
      - "8080:8080"
    volumes:
      - .:/workspace
    tty: true
```

そして, 以下のコマンドを実行することで立ち上げる.

```bash
$ docker compose up -d
...
```
そして, 以下のコマンドでコンテナの中に入り, 分析や開発を行う.

```bash
$ docker exec -it atla4 bash
...
```

`simulator_dist/README.md`にあるように例えばDev Container拡張機能を導入済みのVSCodeで立ち上げたコンテナに入り, 動作確認などを行う.

```bash
$ cd /path/to/workspace
$ python validate.py --visualize 1
...
```

#### GPUを使う場合

ホスト側にCUDA環境に対応したGPUがある場合, CUDA環境に対応したイメージからコンテナを構築可能. 例えば`simulator_dist/docker-compose.yml`を以下のように編集しておく.

```yaml
services:
  dev1:
    build:
      context: .
      dockerfile: dockerfiles/Dockerfile.gpu
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    container_name: atla4
    ports:
      - "8080:8080"
    volumes:
      - .:/workspace
    tty: true
```

実際にGPUがコンテナ内で有効になっているかどうかは以下のコマンドで確認できる.

```bash
# コンテナに入った後
$ python -c "import torch; print(torch.cuda.is_available())"
True
$ python -c "import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))"
...{GPUデバイスのリスト}...
```

### 注意点

#### Dockerで構築する場合

- 場合によっては`libPython3.10`が導入されていない状態でコンテナが立ち上がってしまう可能性がある. シミュレータを動かしたときに`ImportError: libpython3.10.so.1.0: cannot open shared object file: No such file or directory`のようなエラーが出る場合は`libPython3.10`をあらかじめインストールしておくこと.
  - 現時点ではDockerfileでlibPython3.10がインストールされるように修正してある(例えば`simualtor_dist/dockerfiles/Dockerfile.cpu`の18行目参照).
- VNCが不要であれば, Dockerfileで対応する部分(例えば`simualtor_dist/dockerfiles/Dockerfile.cpu`の21行目～25行目)をコメントアウトなどしておくことでイメージ作成が早くなる.
  - `simulator_dist/README.md`にあるようにVSCodeでコンテナにアタッチしてVSCodeで実行することで動画付きで実行は可能.
    - ホスト側とディスプレイ連携ができているか確認しておく($DISPLAY環境変数の確認, テスト用のX11アプリ(例: xeyes や xclock)を使うなど).
    - 連携できていない場合はXサーバーを別途インストールしておく.
  - `simulator_dist/README.md`にあるようにxvfbにより, 動画として保存しておいて後で確認ということも可能.
  
  ```bash
  $ python validate.py --visualize 0 --replay 1
  ...
  $ xvfb-run python replay.py
  ...
  ```

#### ローカル環境で構築する場合

- `simulator_dist/dist`以下にシミュレータを含んだ必要なライブラリに対応したwhlファイルがOS別に提供されているが, Python3.10の環境下でビルドされているため, それ以外のPythonの環境ではシミュレータが動かない.
  - インストールする場合はvenvなどでPython3.10の環境を作り, そこでインストールするなどして対応すること.
- `simulator_dist/README.md`にある通り, 必要なライブラリをあらかじめインストールしておくこと.

  ```plaintext
  torch==2.7.0, !=2.0.1
  tensorflow==2.13.0
  pandas==2.1.4
  scikit-learn==1.6.1
  lightgbm==4.5.0
  timeout-decorator==0.5.0
  opencv-python-headless==4.11.0.86
  ```

- ソースからビルドする場合(`simulator_dist/simulator/install_all.py`により行う場合)は時間がかかる.
  - 自身でカスタマイズしたい場合など
  - MacOSの場合はCMakeのバージョンは3.30未満のものを使用すること. 詳細は`simulator_dist/README.md`を参照.

- GPUが有効になっているかどうかは`nvidia-smi`コマンドやDockerの場合と同様に以下のコマンドで確認できる.

  ```bash
  # コンテナに入った後
  $ python -c "import torch; print(torch.cuda.is_available())"
  True
  $ python -c "import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))"
  ...{GPUデバイスのリスト}...
  ```

#### その他

- Google Colabで構築する場合, 提供されている`simulator_dist/dist/linux`のwhlによってインストールしてもPythonバージョンが合わないためシミュレータが動かない.
  - 現在のGoogle ColabのPythonのバージョンは3.11なのでGoogle Colabの中で直接ソースからビルドするか, Pythonのバージョンを合わせたうえでビルドしたwhlファイルによってインストールするなどの方法が考えられる.

## エージェントの作成

基本的には`simulator_dist/make_agent.ipynb`などに従うが, observationを作成する`makeObs`メソッドやその型(観測空間)を定義する`observation_space`メソッド, 行動空間を定義する`action_space`メソッドの作成の仕方についていくつか注意点がある.

### `makeObs`メソッドと`observation_space`メソッドの実装

actionを生成するためのobservationを作成する`makeObs`メソッドを実装する.

- 強化学習を実装したい場合は基本的には`makeObs`メソッドで返る値は`dict`とすればよい(policyモデルに渡す想定).
  - `observation_space`メソッドで`shape`を定義する必要がある. あらかじめどのようなobservationを作成するかを設計したうえで定義する.
  - 対戦ログとしてobservationを取得するにはjson形式で保存できることが前提であるが, 今回の場合は`numpy.ndarray`は自動的に`list`に変換される.
  - その他の型を返す場合はデフォルトでjson形式で保存できるような処理を挟む必要がある(`observation_space`メソッドとの整合も考慮).
  - 特にログが欲しくない場合はそのままでよい(保存に失敗した場合, ログは勝敗結果と自分の陣営の色のみの情報になるが対戦結果自体はリーダーボードに反映される).
- 行動判断モデルとして特に機械学習モデルを使用しないアルゴリズムを実装している場合は保存しておきたい情報をjson形式で保存できるように処理したものを返せばよい.
  - そもそもログが必要ないなら適当に`0`などの値を返しておく.

観測可能な主な情報は以下の通り.

1. 自分と味方の機体諸元(位置, 速度, 姿勢, 角速度, 残弾数)
1. 自分と味方の誘導弾諸元(位置, 速度, 目標ID, 誘導状態)
1. 相手の機体諸元(位置と速度のみ)
1. 相手の誘導弾諸元(方向のみ)
1. 戦闘開始からの経過時間

詳細は[「第4回空戦AIチャレンジ向けチュートリアル/Agent が入出力 observables と commands の形式#Agent が受け取ることのできる observables」](http://localhost:5500/docs/html/core/a198024.html#section_r7_contest_agent_observables)を参照されたい. 観測情報は階層構造になっていることに注目して例えば彼機の航跡は以下のような形で取得する.

```Python
parent.observables.at_p("/sensor/track")
```

以下では例として'common'をkeyとした残り時間, 'parent'をkeyとした自機の位置ベクトルと速度ベクトル, 'enemy'をkeyとした検知している彼機の位置ベクトルと速度ベクトル, 'friend_missile'をkeyとした自機の誘導弾の位置ベクトルと速度ベクトル, 'enemy_missile'をkeyとしたmwsで検知している2次元航跡データとしたdictを生成する.

Blue側でもRed側でも同じになるように, 自機座標系でないベクトル量は陣営座標系(自陣営の進行方向が+x方向となるようにz軸まわりに回転させ, 防衛ライン中央が原点となるように平行移動させた座標系)で表現する.

その他, observation_maskとaction_maskも定義している. observation_maskはuse_observation_maskをtrueにしたときのみ生成される, 各要素に有効な値が入っているかどうかを表すマスク. 1が有効, 0が無効を表すものとする. action_maskはuse_action_maskをtrueにしたときのみに生成される, 各parentの各行動空間の有効な選択肢を表すマスク. 1が有効, 0が無効を表すものとする. ここではtargetのみマスクを計算し, それ以外の行動については全て有効(1)を出力する.

observationはステップごとにログとして残すことが可能であるが, json形式で保存できるフォーマットである必要があるが, 前述の通りvalueを`numpy.ndarray`として定義している場合は特に気にすることはない. それ以外の場合はあらかじめPythonプリミティブの型として作成するフラグ(例えばconfigファイルで定義しておいて, インスタンス化したときに定義できるようにしておく)を定義しておいて, 適宜切り替えができるようにしておくとよい.

そしてobservationの形を定義する`observation_space`メソッドを実装する. 各keyに応じて定義した通りの形を返す.

`R7ContestSample`モジュールではより細かいobservationの作成方法が実装されているので, [simulator/sample/modules/R7ContestSample/R7ContestSample/R7ContestPyAgentSamle01.py](http://localhost:5500/docs/html/core/a198025.html)などを参照されたい.

In [None]:
import numpy as np
from ASRCAISim1.core import Agent, getValueFromJsonK, getValueFromJsonKR, getValueFromJsonKRD, MotionState, Track3D, Track2D, Time, TimeSystem, deg2rad, serialize_attr_with_type_info,AltitudeKeeper
from BasicAgentUtility.util import TeamOrigin, sortTrack3DByDistance, sortTrack2DByAngle
from gymnasium import spaces


class SampleAgent(Agent):
    ...
    class ActionInfo:
        #機体に対するコマンドを生成するための変数をまとめた構造体
        def __init__(self):
            self.dstDir=np.array([1.0,0.0,0.0]) #目標進行方向
            self.dstAlt=10000.0 #目標高度
            self.velRecovery=False #下限速度制限からの回復中かどうか
            self.asThrottle=False #加減速についてスロットルでコマンドを生成するかどうか
            self.keepVel=False #加減速について等速(dstAccel=0)としてコマンドを生成するかどうか
            self.dstThrottle=1.0 #目標スロットル
            self.dstV=300 #目標速度
            self.launchFlag=False #射撃するかどうか
            self.target=Track3D() #射撃対象
            self.lastShotTimes={} #各Trackに対する直前の射撃時刻
        def serialize(self, archive):
            serialize_attr_with_type_info(archive, self
                ,"dstDir"
                ,"dstAlt"
                ,"velRecovery"
                ,"asThrottle"
                ,"keepVel"
                ,"dstThrottle"
                ,"dstV"
                ,"launchFlag"
                ,"target"
                ,"lastShotTimes"
            )
        _allow_cereal_serialization_in_cpp = True
        def save(self, archive):
            self.serialize(archive)
        @classmethod
        def static_load(cls, archive):
            ret=cls()
            ret.serialize(archive)
            return ret


    def initialize(self):
        super().initialize()
        self.own = self.getTeam()
        self.common_dim = 1
        self.maxParentNum=getValueFromJsonK(self.modelConfig,"maxParentNum")
        self.maxFriendNum=getValueFromJsonK(self.modelConfig,"maxFriendNum")
        self.maxEnemyNum=getValueFromJsonK(self.modelConfig,"maxEnemyNum")
        self.maxFriendMissileNum=getValueFromJsonK(self.modelConfig,"maxFriendMissileNum")
        self.maxEnemyMissileNum=getValueFromJsonK(self.modelConfig,"maxEnemyMissileNum")
        self.use_observation_mask=getValueFromJsonK(self.modelConfig,"use_observation_mask")
        self.use_action_mask=getValueFromJsonK(self.modelConfig,"use_action_mask")
        self.remaining_time_clipping=getValueFromJsonKR(self.modelConfig,"remaining_time_clipping",self.randomGen)
        self.friend_dim=7
        self.horizontalNormalizer=getValueFromJsonKR(self.modelConfig,"horizontalNormalizer",self.randomGen)
        self.verticalNormalizer=getValueFromJsonKR(self.modelConfig,"verticalNormalizer",self.randomGen)
        self.fgtrVelNormalizer=getValueFromJsonKR(self.modelConfig,"fgtrVelNormalizer",self.randomGen)
        self.enemy_dim=7
        self.friend_missile_dim=7
        self.mslVelNormalizer=getValueFromJsonKR(self.modelConfig,"mslVelNormalizer",self.randomGen)
        self.enemy_missile_dim = 3


        #actionに関するもの
        # 左右旋回に関する設定
        self.dstAz_relative=getValueFromJsonK(self.modelConfig,"dstAz_relative")
        self.turnTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"turnTable")),dtype=np.float64)
        self.turnTable*=deg2rad(1.0)
        self.use_override_evasion=getValueFromJsonK(self.modelConfig,"use_override_evasion")
        if self.use_override_evasion:
            self.evasion_turnTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"evasion_turnTable")),dtype=np.float64)
            self.evasion_turnTable*=deg2rad(1.0)
            assert len(self.turnTable)==len(self.evasion_turnTable)
        else:
            self.evasion_turnTable=self.turnTable

        self.actionInfos={}
        for port,parent in self.parents.items():
            self.actionInfos[parent.getFullName()]=self.ActionInfo()

        # 加減速に関する設定
        self.accelTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"accelTable")),dtype=np.float64)

        #行動制限に関する設定
        # 場外制限に関する設定
        self.dOutLimit=getValueFromJsonKRD(self.modelConfig,"dOutLimit",self.randomGen,5000.0)
        self.dOutLimitThreshold=getValueFromJsonKRD(self.modelConfig,"dOutLimitThreshold",self.randomGen,10000.0)
        self.dOutLimitStrength=getValueFromJsonKRD(self.modelConfig,"dOutLimitStrength",self.randomGen,2e-3)

        #  高度制限に関する設定
        self.altMin=getValueFromJsonKRD(self.modelConfig,"altMin",self.randomGen,2000.0)
        self.altMax=getValueFromJsonKRD(self.modelConfig,"altMax",self.randomGen,15000.0)
        self.altitudeKeeper=AltitudeKeeper(self.modelConfig().get("altitudeKeeper",{}))

        # 同時射撃数の制限に関する設定
        self.maxSimulShot=getValueFromJsonKRD(self.modelConfig,"maxSimulShot",self.randomGen,4)

        # 下限速度の制限に関する設定
        self.minimumV=getValueFromJsonKRD(self.modelConfig,"minimumV",self.randomGen,150.0)
        self.minimumRecoveryV=getValueFromJsonKRD(self.modelConfig,"minimumRecoveryV",self.randomGen,180.0)
        self.minimumRecoveryDstV=getValueFromJsonKRD(self.modelConfig,"minimumRecoveryDstV",self.randomGen,200.0)


    def validate(self):
        #Rulerに関する情報の取得
        rulerObs=self.manager.getRuler()().observables()
        self.dOut=rulerObs["dOut"] # 戦域中心から場外ラインまでの距離
        self.dLine=rulerObs["dLine"] # 戦域中心から防衛ラインまでの距離
        self.teamOrigin=TeamOrigin(self.own==rulerObs["eastSider"],self.dLine) # 陣営座標系変換クラス定義


    def makeObs(self):
        obs = {}
        observation_mask={}
        
        # common(残り時間)
        ret=np.zeros([self.common_dim],dtype=np.float32)
        rulerObs=self.manager.getRuler()().observables
        maxTime=rulerObs['maxTime']()
        ret[0]=min((maxTime-self.manager.getElapsedTime())/60.0, self.remaining_time_clipping)

        obs['common'] = ret

        #味方機(parents→parents以外の順)
        ret = np.zeros([self.maxParentNum, self.friend_dim],dtype=np.float32)
        parent_mask=np.zeros([self.maxParentNum],dtype=np.float32)
        self.ourMotion=[]
        self.ourObservables=[]
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        parentFullNames=set()
        # まずはparents
        for port, parent in self.parents.items():
            parentFullNames.add(parent.getFullName())
            if parent.isAlive():
                self.ourMotion.append(MotionState(parent.observables["motion"]).transformTo(self.getLocalCRS()))
                #残存していればobservablesそのもの
                self.ourObservables.append(parent.observables)
            else:
                self.ourMotion.append(MotionState())
                #被撃墜or墜落済なら本体の更新は止まっているので残存している親が代理更新したものを取得(誘導弾情報のため)
                self.ourObservables.append(
                    firstAlive.observables.at_p("/shared/fighter").at(parent.getFullName()))

        # その後にparents以外
        for fullName,fObs in firstAlive.observables.at_p("/shared/fighter").items():
            if not fullName in parentFullNames:
                if fObs.at("isAlive"):
                    self.ourMotion.append(MotionState(fObs["motion"]).transformTo(self.getLocalCRS()))
                else:
                    self.ourMotion.append(MotionState())

                self.ourObservables.append(fObs)
        fIdx = 0
        for port,parent in self.parents.items():
            if fIdx>=self.maxParentNum:
                break
            fObs=self.ourObservables[fIdx]
            fMotion=self.ourMotion[fIdx]
            if fObs.at("isAlive"):
                parent_mask[fIdx]=1
                pos=self.teamOrigin.relPtoB(fMotion.pos()) #慣性座標系→陣営座標系に変換
                vel=self.teamOrigin.relPtoB(fMotion.vel()) #慣性座標系→陣営座標系に変換
                a=np.zeros([self.friend_dim],dtype=np.float32)
                ofs = 0
                a[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
                ofs += 3
                V=np.linalg.norm(vel)
                a[ofs]=V/self.fgtrVelNormalizer
                ofs+=1
                a[ofs:ofs+3]=vel/max(V, 1e-5)
                ret[fIdx, :] = a
            fIdx+=1

        obs['parent'] = ret
        if self.use_observation_mask:
            observation_mask['parent']=parent_mask

        #彼機(味方の誰かが探知しているもののみ)
        #観測されている航跡を、自陣営の機体に近いものから順にソートしてlastTrackInfoに格納する。
        #lastTrackInfoは行動のdeployでも射撃対象の指定のために参照する。
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        self.lastTrackInfo=[Track3D(t).transformTo(self.getLocalCRS()) for t in firstAlive.observables.at_p("/sensor/track")]
        sortTrack3DByDistance(self.lastTrackInfo,self.ourMotion,True)

        ret=np.zeros([self.maxEnemyNum,self.enemy_dim],dtype=np.float32)
        enemy_mask=np.zeros([self.maxEnemyNum],dtype=np.float32)
        for tIdx,track in enumerate(self.lastTrackInfo):
            if tIdx>=self.maxEnemyNum:
                break
            t=np.zeros([self.enemy_dim],dtype=np.float32)
            ofs=0
            pos=self.teamOrigin.relPtoB(track.pos()) #慣性座標系→陣営座標系に変換
            t[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ofs+=3
            vel=self.teamOrigin.relPtoB(track.vel()) #慣性座標系→陣営座標系に変換
            V=np.linalg.norm(vel)
            t[ofs]=V/self.fgtrVelNormalizer
            ofs+=1
            t[ofs:ofs+3]=vel/max(V, 1e-5)
            ofs+=3
            ret[tIdx,:]=t
            enemy_mask[tIdx]=1

        obs['enemy']=ret
        if self.use_observation_mask:
            observation_mask['enemy']=enemy_mask


        #味方誘導弾(射撃時刻の古い順にソート
        def launchedT(m):
            return Time(m["launchedT"]) if m["isAlive"] and m["hasLaunched"] else Time(np.inf,TimeSystem.TT)
        self.msls=sorted(sum([[m for m in f.at_p("/weapon/missiles")] for f in self.ourObservables],[]),key=launchedT)
        ret=np.zeros([self.maxFriendMissileNum,self.friend_missile_dim],dtype=np.float32)
        friend_missile_mask=np.zeros([self.maxFriendMissileNum],dtype=np.float32)
        for mIdx,mObs in enumerate(self.msls):
            if mIdx>=self.maxFriendMissileNum or not (mObs.at("isAlive") and mObs.at("hasLaunched")):
                break
            a=np.zeros([self.friend_missile_dim],dtype=np.float32)
            ofs=0
            mm=MotionState(mObs["motion"]).transformTo(self.getLocalCRS())
            pos=self.teamOrigin.relPtoB(mm.pos()) #慣性座標系→陣営座標系に変換
            a[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ofs+=3
            vel=self.teamOrigin.relPtoB(mm.vel()) #慣性座標系→陣営座標系に変換
            V=np.linalg.norm(vel)
            a[ofs]=V/self.mslVelNormalizer
            ofs+=1
            a[ofs:ofs+3]=vel/max(V, 1e-5)
            ret[mIdx,:]=a
            friend_missile_mask[mIdx]=1
        
        obs['friend_missile'] = ret
        if self.use_observation_mask:
            observation_mask['friend_missile'] = friend_missile_mask

        #彼側誘導弾(各機の正面に近い順にソート)
        self.mws=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            fObs=self.ourObservables[fIdx]
            self.mws.append([])
            if fObs["isAlive"]:
                if fObs.contains_p("/sensor/mws/track"):
                    for mObs in fObs.at_p("/sensor/mws/track"):
                        self.mws[fIdx].append(Track2D(mObs).transformTo(self.getLocalCRS()))
                sortTrack2DByAngle(self.mws[fIdx],fMotion,np.array([1,0,0]),True)
        ret=np.zeros([self.maxEnemyMissileNum,self.enemy_missile_dim],dtype=np.float32)
        enemy_missile_mask=np.zeros([self.maxEnemyMissileNum],dtype=np.float32)
        allMWS=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            if self.ourObservables[fIdx].at("isAlive"):
                for m in self.mws[fIdx]:
                    angle=np.arccos(np.clip(m.dir().dot(fMotion.dirBtoP(np.array([1,0,0]))),-1,1))
                    allMWS.append([m,angle])
        allMWS.sort(key=lambda x: x[1])
        for mIdx,m in enumerate(allMWS):
            if mIdx>=self.maxEnemyMissileNum:
                break
            a=np.zeros([self.enemy_missile_dim],dtype=np.float32)
            ofs=0
            origin=self.teamOrigin.relPtoB(m[0].origin()) #慣性座標系→陣営座標系に変換
            a[ofs:ofs+3]=origin/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ret[mIdx,:]=a
            enemy_missile_mask[mIdx]=1
        
        obs['enemy_missile'] = ret
        if self.use_observation_mask:
            observation_mask['enemy_missile']=enemy_missile_mask

        if self.use_observation_mask and len(observation_mask)>0:
            obs['observation_mask']=observation_mask

        if self.use_action_mask:
            self.action_mask=self.makeActionMask()
            if not self.action_mask is None:
                obs['action_mask']=self.action_mask


        return obs


    def makeActionMask(self):
        #無効な行動を示すマスクを返す
        #有効な場合は1、無効な場合は0とする。
        if self.use_action_mask:
            #このサンプルでは射撃目標のみマスクする。
            target_mask=np.zeros([1+self.maxEnemyNum],dtype=np.float32)
            target_mask[0]=1#「射撃なし」はつねに有効
            for tIdx,track in enumerate(self.lastTrackInfo):
                if tIdx>=self.maxEnemyNum:
                    break
                target_mask[1+tIdx]=1

            ret=[]
            for port,parent in self.parents.items():
                mask={}
                mask["turn"]=np.full([len(self.turnTable)],1,dtype=np.float32)
                mask["accel"]=np.full([len(self.accelTable)],1,dtype=np.float32)
                mask["target"]=target_mask
                ret.append(mask)
            return ret
        else:
            return None


    def observation_space(self):
        floatLow=np.finfo(np.float32).min
        floatHigh=np.finfo(np.float32).max
        obs_space = {
            'common': spaces.Box(floatLow,floatHigh,
                                 shape=[self.common_dim],
                                 dtype=np.float32),
            'parent': spaces.Box(floatLow,floatHigh,
                                 shape=[self.maxParentNum,self.friend_dim],
                                 dtype=np.float32),
            'enemy': spaces.Box(floatLow,floatHigh,
                                shape=[self.maxEnemyNum,self.enemy_dim],
                                dtype=np.float32),
            'friend_missile': spaces.Box(floatLow,floatHigh,
                                         shape=[self.maxFriendMissileNum,self.friend_missile_dim],
                                         dtype=np.float32),
            'enemy_missile': spaces.Box(floatLow,floatHigh,
                                        shape=[self.maxEnemyMissileNum,self.enemy_missile_dim],
                                        dtype=np.float32)
        }
        if self.use_observation_mask:
            observation_mask = {
                'parent': spaces.Box(floatLow,floatHigh,
                                     shape=[self.maxParentNum],
                                     dtype=np.float32),
                'enemy': spaces.Box(floatLow,floatHigh,
                                    shape=[self.maxEnemyNum],
                                    dtype=np.float32),
                'friend_missile': spaces.Box(floatLow,floatHigh,
                                             shape=[self.maxFriendMissileNum],
                                             dtype=np.float32),
                'enemy_missile': spaces.Box(floatLow,floatHigh,
                                            shape=[self.maxEnemyMissileNum],
                                            dtype=np.float32)
            }
            obs_space['observation_mask'] = spaces.Dict(observation_mask) # type: ignore

        if self.use_action_mask:
            single_action_mask_space_dict = {
                'turn': spaces.Box(floatLow,floatHigh,
                                   shape=[len(self.turnTable)],
                                   dtype=np.float32),
                'accel': spaces.Box(floatLow,floatHigh,
                                    shape=[len(self.accelTable)],
                                    dtype=np.float32),
                'target': spaces.Box(floatLow,floatHigh,
                                     shape=[1+self.maxEnemyNum],
                                     dtype=np.float32)
            }
            single_action_mask_space=spaces.Dict(single_action_mask_space_dict) # type: ignore
            action_mask_space_list=[]
            for port, parent in self.parents.items():
                action_mask_space_list.append(single_action_mask_space)
            obs_space['action_mask'] = spaces.Tuple(action_mask_space_list) # type: ignore

        return spaces.Dict(obs_space) # type: ignore



### `action_space`メソッドの実装

actionは`policy`によって返されて, `deploy`メソッドに渡されて自機の動きや誘導弾発射の意思決定などを行う. 今回のシミュレーション時刻の最小単位(tick)は1.0sである. 行動判断モデルが gymnasium インターフェースの外側で行動判断を行う周期はこれより長くしてもよい.

以下の項目に関連するような行動を定義しておく.

1. 自分と味方の機動
    - ある基準方位を0として目標方位(右を正)で指定する。
    - 基準方位は、dstAz_relativeフラグをTrueとした場合、自機正面となり、Falseとした場合、自陣営の進行すべき方向となる。
    - 目標方位の選択肢はturnTableで与える。
1. 射撃有無と射撃対象
    - 0を射撃なし、1〜maxEnemyNumを対応するlastTrackInfoのTrack3DとしたDiscrete形式で指定する

actionが存在すべき行動空間(このあと実装するpolicyの出力するactionの種類数)を定義する. 各parentのactionを表すDictをparentの数だけ並べたTupleとする. なお, 強化学習を前提としない場合は特に気にすることはなく何らかの適当な値を返すような実装としておいてもよい(`spaces.Discrete()`を返すなど).

[「第4回空戦AIチャレンジ向け強化学習 Agent サンプル/実装の概要/actionの形式」](http://localhost:5500/docs/html/core/a198025.html#section_r7_contest_agent_sample_action)も参照されたい.

`deploy`メソッドの実装例も示してあるので, policyが返した`action`をどのように渡しているか確認. 

In [None]:
from gymnasium import spaces
from math import atan2, cos, sin
from BasicAgentUtility.util import calcRNorm


class SampleAgent(Agent):
    ...
    def action_space(self):
        single_action_space_dict={
            'turn': spaces.Discrete(len(self.turnTable)),
            'target': spaces.Discrete(1+self.maxEnemyNum),
            'accel': spaces.Discrete(len(self.accelTable))
        }
        single_action_space=spaces.Dict(single_action_space_dict) # type: ignore
        action_space_list=[]
        for port, parent in self.parents.items():
            action_space_list.append(single_action_space)
        return spaces.Tuple(action_space_list) # 行動空間の定義
    

    def deploy(self, action):
        #observablesの収集
        #味方機(parents→parents以外の順)
        self.ourMotion=[]
        self.ourObservables=[]
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        parentFullNames=set()
        # まずはparents
        for port, parent in self.parents.items():
            parentFullNames.add(parent.getFullName())
            if parent.isAlive():
                self.ourMotion.append(MotionState(parent.observables["motion"]).transformTo(self.getLocalCRS()))
                #残存していればobservablesそのもの
                self.ourObservables.append(parent.observables)
            else:
                self.ourMotion.append(MotionState())
                #被撃墜or墜落済なら本体の更新は止まっているので残存している親が代理更新したものを取得(誘導弾情報のため)
                self.ourObservables.append(
                    firstAlive.observables.at_p("/shared/fighter").at(parent.getFullName()))

        # その後にparents以外
        for fullName,fObs in firstAlive.observables.at_p("/shared/fighter").items():
            if not fullName in parentFullNames:
                if fObs.at("isAlive"):
                    self.ourMotion.append(MotionState(fObs["motion"]).transformTo(self.getLocalCRS()))
                else:
                    self.ourMotion.append(MotionState())

                self.ourObservables.append(fObs)


        # 彼機情報だけは射撃対象の選択と連動するので更新してはいけない。
        #味方誘導弾(射撃時刻の古い順にソート)
        def launchedT(m):
            return Time(m["launchedT"]) if m["isAlive"] and m["hasLaunched"] else Time(np.inf,TimeSystem.TT)
        self.msls=sorted(sum([[m for m in f.at_p("/weapon/missiles")] for f in self.ourObservables],[]),key=launchedT)

        #彼側誘導弾(各機の正面に近い順にソート)
        self.mws=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            fObs=self.ourObservables[fIdx]
            self.mws.append([])
            if fObs["isAlive"]:
                if fObs.contains_p("/sensor/mws/track"):
                    for mObs in fObs.at_p("/sensor/mws/track"):
                        self.mws[fIdx].append(Track2D(mObs).transformTo(self.getLocalCRS()))
                sortTrack2DByAngle(self.mws[fIdx],fMotion,np.array([1,0,0]),True)

        for pIdx,parent in enumerate(self.parents.values()):
            parentFullName=parent.getFullName()
            if not parent.isAlive():
                continue
            actionInfo=self.actionInfos[parentFullName]
            myMotion=self.ourMotion[pIdx]
            myObs=self.ourObservables[pIdx]
            myMWS=self.mws[pIdx]
            myAction=action[pIdx]

            #左右旋回
            deltaAz=self.turnTable[myAction["turn"]]
            actionInfo.dstDir=self.teamOrigin.relBtoP(np.array([cos(deltaAz),sin(deltaAz),0]))
            dstAz=atan2(actionInfo.dstDir[1],actionInfo.dstDir[0])

            #上昇・下降
            dstPitch=0
            actionInfo.dstDir=np.array([actionInfo.dstDir[0]*cos(dstPitch),actionInfo.dstDir[1]*cos(dstPitch),-sin(dstPitch)])

            #加減速
            V=np.linalg.norm(myMotion.vel())
            actionInfo.asThrottle=False
            accel=self.accelTable[myAction["accel"]]
            actionInfo.dstV=V+accel
            actionInfo.keepVel = accel==0.0

            #下限速度の制限
            if V<self.minimumV:
                actionInfo.velRecovery=True
            if V>=self.minimumRecoveryV:
                actionInfo.velRecovery=False
            if actionInfo.velRecovery:
                actionInfo.dstV=self.minimumRecoveryDstV
                actionInfo.asThrottle=False

            #射撃
            #actionのパース
            shotTarget=myAction["target"]-1

            #射撃可否の判断、射撃コマンドの生成
            flyingMsls=0
            if myObs.contains_p("/weapon/missiles"):
                for msl in myObs.at_p("/weapon/missiles"):
                    if msl.at("isAlive")() and msl.at("hasLaunched")():
                        flyingMsls+=1
            if not (
                shotTarget>=0 and
                shotTarget<len(self.lastTrackInfo) and
                parent.isLaunchableAt(self.lastTrackInfo[shotTarget]) and
                flyingMsls<self.maxSimulShot
            ):
                shotTarget=-1
            if shotTarget>=0:
                actionInfo.launchFlag=True
                actionInfo.target=self.lastTrackInfo[shotTarget]
            else:
                actionInfo.launchFlag=False
                actionInfo.target=Track3D()

            self.observables[parentFullName]["decision"]={
                "Roll":("Don't care"),
                "Fire":(actionInfo.launchFlag,actionInfo.target.to_json())
            }
            if len(myMWS)>0 and self.use_override_evasion:
                self.observables[parentFullName]["decision"]["Horizontal"]=("Az_NED",dstAz)
            else:
                if self.dstAz_relative:
                    self.observables[parentFullName]["decision"]["Horizontal"]=("Az_BODY",deltaAz)
                else:
                    self.observables[parentFullName]["decision"]["Horizontal"]=("Az_NED",dstAz)
            self.observables[parentFullName]["decision"]["Vertical"]=("El",dstPitch)
            if actionInfo.asThrottle:
                self.observables[parentFullName]["decision"]["Throttle"]=("Throttle",actionInfo.dstThrottle)
            else:
                self.observables[parentFullName]["decision"]["Throttle"]=("Vel",actionInfo.dstV)


### 全てをまとめる

以下では定義したactionなどに合わせて`deploy`や`control`メソッドなどを含めたすべてを示してある. `control`メソッドでは場外に出ないような制御をかけている. これを`MyAgent.py`として作成しておく(中央集権方式).

In [None]:
import numpy as np
from ASRCAISim1.core import Agent, getValueFromJsonK, getValueFromJsonKR, getValueFromJsonKRD, LinearSegment, MotionState, Track3D, Track2D, Time, TimeSystem, deg2rad, serialize_attr_with_type_info, StaticCollisionAvoider2D,AltitudeKeeper# type: ignore
from BasicAgentUtility.util import TeamOrigin, sortTrack3DByDistance, sortTrack2DByAngle, calcRNorm # type: ignore
from math import atan2, cos, sin, sqrt
from gymnasium import spaces


class SampleAgent(Agent):
    class ActionInfo:
        #機体に対するコマンドを生成するための変数をまとめた構造体
        def __init__(self):
            self.dstDir=np.array([1.0,0.0,0.0]) #目標進行方向
            self.dstAlt=10000.0 #目標高度
            self.velRecovery=False #下限速度制限からの回復中かどうか
            self.asThrottle=False #加減速についてスロットルでコマンドを生成するかどうか
            self.keepVel=False #加減速について等速(dstAccel=0)としてコマンドを生成するかどうか
            self.dstThrottle=1.0 #目標スロットル
            self.dstV=300 #目標速度
            self.launchFlag=False #射撃するかどうか
            self.target=Track3D() #射撃対象
            self.lastShotTimes={} #各Trackに対する直前の射撃時刻
        def serialize(self, archive):
            serialize_attr_with_type_info(archive, self
                ,"dstDir"
                ,"dstAlt"
                ,"velRecovery"
                ,"asThrottle"
                ,"keepVel"
                ,"dstThrottle"
                ,"dstV"
                ,"launchFlag"
                ,"target"
                ,"lastShotTimes"
            )
        _allow_cereal_serialization_in_cpp = True
        def save(self, archive):
            self.serialize(archive)
        @classmethod
        def static_load(cls, archive):
            ret=cls()
            ret.serialize(archive)
            return ret


    def initialize(self):
        super().initialize()
        self.own = self.getTeam()
        self.common_dim = 1
        self.maxParentNum=getValueFromJsonK(self.modelConfig,"maxParentNum")
        self.maxFriendNum=getValueFromJsonK(self.modelConfig,"maxFriendNum")
        self.maxEnemyNum=getValueFromJsonK(self.modelConfig,"maxEnemyNum")
        self.maxFriendMissileNum=getValueFromJsonK(self.modelConfig,"maxFriendMissileNum")
        self.maxEnemyMissileNum=getValueFromJsonK(self.modelConfig,"maxEnemyMissileNum")
        self.use_observation_mask=getValueFromJsonK(self.modelConfig,"use_observation_mask")
        self.use_action_mask=getValueFromJsonK(self.modelConfig,"use_action_mask")
        self.remaining_time_clipping=getValueFromJsonKR(self.modelConfig,"remaining_time_clipping",self.randomGen)
        self.friend_dim=7
        self.horizontalNormalizer=getValueFromJsonKR(self.modelConfig,"horizontalNormalizer",self.randomGen)
        self.verticalNormalizer=getValueFromJsonKR(self.modelConfig,"verticalNormalizer",self.randomGen)
        self.fgtrVelNormalizer=getValueFromJsonKR(self.modelConfig,"fgtrVelNormalizer",self.randomGen)
        self.enemy_dim=7
        self.friend_missile_dim=7
        self.mslVelNormalizer=getValueFromJsonKR(self.modelConfig,"mslVelNormalizer",self.randomGen)
        self.enemy_missile_dim = 3


        #actionに関するもの
        # 左右旋回に関する設定
        self.dstAz_relative=getValueFromJsonK(self.modelConfig,"dstAz_relative")
        self.turnTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"turnTable")),dtype=np.float64)
        self.turnTable*=deg2rad(1.0)
        self.use_override_evasion=getValueFromJsonK(self.modelConfig,"use_override_evasion")
        if self.use_override_evasion:
            self.evasion_turnTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"evasion_turnTable")),dtype=np.float64)
            self.evasion_turnTable*=deg2rad(1.0)
            assert len(self.turnTable)==len(self.evasion_turnTable)
        else:
            self.evasion_turnTable=self.turnTable

        self.actionInfos={}
        for port,parent in self.parents.items():
            self.actionInfos[parent.getFullName()]=self.ActionInfo()

        # 加減速に関する設定
        self.accelTable=np.array(sorted(getValueFromJsonK(self.modelConfig,"accelTable")),dtype=np.float64)

        #行動制限に関する設定
        # 場外制限に関する設定
        self.dOutLimit=getValueFromJsonKRD(self.modelConfig,"dOutLimit",self.randomGen,5000.0)
        self.dOutLimitThreshold=getValueFromJsonKRD(self.modelConfig,"dOutLimitThreshold",self.randomGen,10000.0)
        self.dOutLimitStrength=getValueFromJsonKRD(self.modelConfig,"dOutLimitStrength",self.randomGen,2e-3)

        #  高度制限に関する設定
        self.altMin=getValueFromJsonKRD(self.modelConfig,"altMin",self.randomGen,2000.0)
        self.altMax=getValueFromJsonKRD(self.modelConfig,"altMax",self.randomGen,15000.0)
        self.altitudeKeeper=AltitudeKeeper(self.modelConfig().get("altitudeKeeper",{}))

        # 同時射撃数の制限に関する設定
        self.maxSimulShot=getValueFromJsonKRD(self.modelConfig,"maxSimulShot",self.randomGen,4)

        # 下限速度の制限に関する設定
        self.minimumV=getValueFromJsonKRD(self.modelConfig,"minimumV",self.randomGen,150.0)
        self.minimumRecoveryV=getValueFromJsonKRD(self.modelConfig,"minimumRecoveryV",self.randomGen,180.0)
        self.minimumRecoveryDstV=getValueFromJsonKRD(self.modelConfig,"minimumRecoveryDstV",self.randomGen,200.0)


    def validate(self):
        #Rulerに関する情報の取得
        rulerObs=self.manager.getRuler()().observables()
        self.dOut=rulerObs["dOut"] # 戦域中心から場外ラインまでの距離
        self.dLine=rulerObs["dLine"] # 戦域中心から防衛ラインまでの距離
        self.teamOrigin=TeamOrigin(self.own==rulerObs["eastSider"],self.dLine) # 陣営座標系変換クラス定義


    def makeObs(self):
        obs = {}
        observation_mask={}
        
        # common(残り時間)
        ret=np.zeros([self.common_dim],dtype=np.float32)
        rulerObs=self.manager.getRuler()().observables
        maxTime=rulerObs['maxTime']()
        ret[0]=min((maxTime-self.manager.getElapsedTime())/60.0, self.remaining_time_clipping)

        obs['common'] = ret

        #味方機(parents→parents以外の順)
        ret = np.zeros([self.maxParentNum, self.friend_dim],dtype=np.float32)
        parent_mask=np.zeros([self.maxParentNum],dtype=np.float32)
        self.ourMotion=[]
        self.ourObservables=[]
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        parentFullNames=set()
        # まずはparents
        for port, parent in self.parents.items():
            parentFullNames.add(parent.getFullName())
            if parent.isAlive():
                self.ourMotion.append(MotionState(parent.observables["motion"]).transformTo(self.getLocalCRS()))
                #残存していればobservablesそのもの
                self.ourObservables.append(parent.observables)
            else:
                self.ourMotion.append(MotionState())
                #被撃墜or墜落済なら本体の更新は止まっているので残存している親が代理更新したものを取得(誘導弾情報のため)
                self.ourObservables.append(
                    firstAlive.observables.at_p("/shared/fighter").at(parent.getFullName()))

        # その後にparents以外
        for fullName,fObs in firstAlive.observables.at_p("/shared/fighter").items():
            if not fullName in parentFullNames:
                if fObs.at("isAlive"):
                    self.ourMotion.append(MotionState(fObs["motion"]).transformTo(self.getLocalCRS()))
                else:
                    self.ourMotion.append(MotionState())

                self.ourObservables.append(fObs)
        fIdx = 0
        for port,parent in self.parents.items():
            if fIdx>=self.maxParentNum:
                break
            fObs=self.ourObservables[fIdx]
            fMotion=self.ourMotion[fIdx]
            if fObs.at("isAlive"):
                parent_mask[fIdx]=1
                pos=self.teamOrigin.relPtoB(fMotion.pos()) #慣性座標系→陣営座標系に変換
                vel=self.teamOrigin.relPtoB(fMotion.vel()) #慣性座標系→陣営座標系に変換
                a=np.zeros([self.friend_dim],dtype=np.float32)
                ofs = 0
                a[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
                ofs += 3
                V=np.linalg.norm(vel)
                a[ofs]=V/self.fgtrVelNormalizer
                ofs+=1
                a[ofs:ofs+3]=vel/max(V, 1e-5)
                ret[fIdx, :] = a
            fIdx+=1

        obs['parent'] = ret
        if self.use_observation_mask:
            observation_mask['parent']=parent_mask

        #彼機(味方の誰かが探知しているもののみ)
        #観測されている航跡を、自陣営の機体に近いものから順にソートしてlastTrackInfoに格納する。
        #lastTrackInfoは行動のdeployでも射撃対象の指定のために参照する。
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        self.lastTrackInfo=[Track3D(t).transformTo(self.getLocalCRS()) for t in firstAlive.observables.at_p("/sensor/track")]
        sortTrack3DByDistance(self.lastTrackInfo,self.ourMotion,True)

        ret=np.zeros([self.maxEnemyNum,self.enemy_dim],dtype=np.float32)
        enemy_mask=np.zeros([self.maxEnemyNum],dtype=np.float32)
        for tIdx,track in enumerate(self.lastTrackInfo):
            if tIdx>=self.maxEnemyNum:
                break
            t=np.zeros([self.enemy_dim],dtype=np.float32)
            ofs=0
            pos=self.teamOrigin.relPtoB(track.pos()) #慣性座標系→陣営座標系に変換
            t[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ofs+=3
            vel=self.teamOrigin.relPtoB(track.vel()) #慣性座標系→陣営座標系に変換
            V=np.linalg.norm(vel)
            t[ofs]=V/self.fgtrVelNormalizer
            ofs+=1
            t[ofs:ofs+3]=vel/max(V, 1e-5)
            ofs+=3
            ret[tIdx,:]=t
            enemy_mask[tIdx]=1

        obs['enemy']=ret
        if self.use_observation_mask:
            observation_mask['enemy']=enemy_mask


        #味方誘導弾(射撃時刻の古い順にソート
        def launchedT(m):
            return Time(m["launchedT"]) if m["isAlive"] and m["hasLaunched"] else Time(np.inf,TimeSystem.TT)
        self.msls=sorted(sum([[m for m in f.at_p("/weapon/missiles")] for f in self.ourObservables],[]),key=launchedT)
        ret=np.zeros([self.maxFriendMissileNum,self.friend_missile_dim],dtype=np.float32)
        friend_missile_mask=np.zeros([self.maxFriendMissileNum],dtype=np.float32)
        for mIdx,mObs in enumerate(self.msls):
            if mIdx>=self.maxFriendMissileNum or not (mObs.at("isAlive") and mObs.at("hasLaunched")):
                break
            a=np.zeros([self.friend_missile_dim],dtype=np.float32)
            ofs=0
            mm=MotionState(mObs["motion"]).transformTo(self.getLocalCRS())
            pos=self.teamOrigin.relPtoB(mm.pos()) #慣性座標系→陣営座標系に変換
            a[ofs:ofs+3]=pos/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ofs+=3
            vel=self.teamOrigin.relPtoB(mm.vel()) #慣性座標系→陣営座標系に変換
            V=np.linalg.norm(vel)
            a[ofs]=V/self.mslVelNormalizer
            ofs+=1
            a[ofs:ofs+3]=vel/max(V, 1e-5)
            ret[mIdx,:]=a
            friend_missile_mask[mIdx]=1
        
        obs['friend_missile'] = ret
        if self.use_observation_mask:
            observation_mask['friend_missile'] = friend_missile_mask

        #彼側誘導弾(各機の正面に近い順にソート)
        self.mws=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            fObs=self.ourObservables[fIdx]
            self.mws.append([])
            if fObs["isAlive"]:
                if fObs.contains_p("/sensor/mws/track"):
                    for mObs in fObs.at_p("/sensor/mws/track"):
                        self.mws[fIdx].append(Track2D(mObs).transformTo(self.getLocalCRS()))
                sortTrack2DByAngle(self.mws[fIdx],fMotion,np.array([1,0,0]),True)
        ret=np.zeros([self.maxEnemyMissileNum,self.enemy_missile_dim],dtype=np.float32)
        enemy_missile_mask=np.zeros([self.maxEnemyMissileNum],dtype=np.float32)
        allMWS=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            if self.ourObservables[fIdx].at("isAlive"):
                for m in self.mws[fIdx]:
                    angle=np.arccos(np.clip(m.dir().dot(fMotion.dirBtoP(np.array([1,0,0]))),-1,1))
                    allMWS.append([m,angle])
        allMWS.sort(key=lambda x: x[1])
        for mIdx,m in enumerate(allMWS):
            if mIdx>=self.maxEnemyMissileNum:
                break
            a=np.zeros([self.enemy_missile_dim],dtype=np.float32)
            ofs=0
            origin=self.teamOrigin.relPtoB(m[0].origin()) #慣性座標系→陣営座標系に変換
            a[ofs:ofs+3]=origin/np.array([self.horizontalNormalizer,self.horizontalNormalizer,self.verticalNormalizer])
            ret[mIdx,:]=a
            enemy_missile_mask[mIdx]=1
        
        obs['enemy_missile'] = ret
        if self.use_observation_mask:
            observation_mask['enemy_missile']=enemy_missile_mask

        if self.use_observation_mask and len(observation_mask)>0:
            obs['observation_mask']=observation_mask

        if self.use_action_mask:
            self.action_mask=self.makeActionMask()
            if not self.action_mask is None:
                obs['action_mask']=self.action_mask


        return obs


    def makeActionMask(self):
        #無効な行動を示すマスクを返す
        #有効な場合は1、無効な場合は0とする。
        if self.use_action_mask:
            #このサンプルでは射撃目標のみマスクする。
            target_mask=np.zeros([1+self.maxEnemyNum],dtype=np.float32)
            target_mask[0]=1#「射撃なし」はつねに有効
            for tIdx,track in enumerate(self.lastTrackInfo):
                if tIdx>=self.maxEnemyNum:
                    break
                target_mask[1+tIdx]=1

            ret=[]
            for port,parent in self.parents.items():
                mask={}
                mask["turn"]=np.full([len(self.turnTable)],1,dtype=np.float32)
                mask["accel"]=np.full([len(self.accelTable)],1,dtype=np.float32)
                mask["target"]=target_mask
                ret.append(mask)
            return ret
        else:
            return None


    def observation_space(self):
        floatLow=np.finfo(np.float32).min
        floatHigh=np.finfo(np.float32).max
        obs_space = {
            'common': spaces.Box(floatLow,floatHigh,
                                 shape=[self.common_dim],
                                 dtype=np.float32),
            'parent': spaces.Box(floatLow,floatHigh,
                                 shape=[self.maxParentNum,self.friend_dim],
                                 dtype=np.float32),
            'enemy': spaces.Box(floatLow,floatHigh,
                                shape=[self.maxEnemyNum,self.enemy_dim],
                                dtype=np.float32),
            'friend_missile': spaces.Box(floatLow,floatHigh,
                                         shape=[self.maxFriendMissileNum,self.friend_missile_dim],
                                         dtype=np.float32),
            'enemy_missile': spaces.Box(floatLow,floatHigh,
                                        shape=[self.maxEnemyMissileNum,self.enemy_missile_dim],
                                        dtype=np.float32)
        }
        if self.use_observation_mask:
            observation_mask = {
                'parent': spaces.Box(floatLow,floatHigh,
                                     shape=[self.maxParentNum],
                                     dtype=np.float32),
                'enemy': spaces.Box(floatLow,floatHigh,
                                    shape=[self.maxEnemyNum],
                                    dtype=np.float32),
                'friend_missile': spaces.Box(floatLow,floatHigh,
                                             shape=[self.maxFriendMissileNum],
                                             dtype=np.float32),
                'enemy_missile': spaces.Box(floatLow,floatHigh,
                                            shape=[self.maxEnemyMissileNum],
                                            dtype=np.float32)
            }
            obs_space['observation_mask'] = spaces.Dict(observation_mask) # type: ignore

        if self.use_action_mask:
            single_action_mask_space_dict = {
                'turn': spaces.Box(floatLow,floatHigh,
                                   shape=[len(self.turnTable)],
                                   dtype=np.float32),
                'accel': spaces.Box(floatLow,floatHigh,
                                    shape=[len(self.accelTable)],
                                    dtype=np.float32),
                'target': spaces.Box(floatLow,floatHigh,
                                     shape=[1+self.maxEnemyNum],
                                     dtype=np.float32)
            }
            single_action_mask_space=spaces.Dict(single_action_mask_space_dict) # type: ignore
            action_mask_space_list=[]
            for port, parent in self.parents.items():
                action_mask_space_list.append(single_action_mask_space)
            obs_space['action_mask'] = spaces.Tuple(action_mask_space_list) # type: ignore

        return spaces.Dict(obs_space) # type: ignore


    def action_space(self):
        single_action_space_dict={
            'turn': spaces.Discrete(len(self.turnTable)),
            'target': spaces.Discrete(1+self.maxEnemyNum),
            'accel': spaces.Discrete(len(self.accelTable))
        }
        single_action_space=spaces.Dict(single_action_space_dict) # type: ignore
        action_space_list=[]
        for port, parent in self.parents.items():
            action_space_list.append(single_action_space)
        return spaces.Tuple(action_space_list) # 行動空間の定義


    def deploy(self, action):
        #observablesの収集
        #味方機(parents→parents以外の順)
        self.ourMotion=[]
        self.ourObservables=[]
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        parentFullNames=set()
        # まずはparents
        for port, parent in self.parents.items():
            parentFullNames.add(parent.getFullName())
            if parent.isAlive():
                self.ourMotion.append(MotionState(parent.observables["motion"]).transformTo(self.getLocalCRS()))
                #残存していればobservablesそのもの
                self.ourObservables.append(parent.observables)
            else:
                self.ourMotion.append(MotionState())
                #被撃墜or墜落済なら本体の更新は止まっているので残存している親が代理更新したものを取得(誘導弾情報のため)
                self.ourObservables.append(
                    firstAlive.observables.at_p("/shared/fighter").at(parent.getFullName()))

        # その後にparents以外
        for fullName,fObs in firstAlive.observables.at_p("/shared/fighter").items():
            if not fullName in parentFullNames:
                if fObs.at("isAlive"):
                    self.ourMotion.append(MotionState(fObs["motion"]).transformTo(self.getLocalCRS()))
                else:
                    self.ourMotion.append(MotionState())

                self.ourObservables.append(fObs)


        # 彼機情報だけは射撃対象の選択と連動するので更新してはいけない。
        #味方誘導弾(射撃時刻の古い順にソート)
        def launchedT(m):
            return Time(m["launchedT"]) if m["isAlive"] and m["hasLaunched"] else Time(np.inf,TimeSystem.TT)
        self.msls=sorted(sum([[m for m in f.at_p("/weapon/missiles")] for f in self.ourObservables],[]),key=launchedT)

        #彼側誘導弾(各機の正面に近い順にソート)
        self.mws=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            fObs=self.ourObservables[fIdx]
            self.mws.append([])
            if fObs["isAlive"]:
                if fObs.contains_p("/sensor/mws/track"):
                    for mObs in fObs.at_p("/sensor/mws/track"):
                        self.mws[fIdx].append(Track2D(mObs).transformTo(self.getLocalCRS()))
                sortTrack2DByAngle(self.mws[fIdx],fMotion,np.array([1,0,0]),True)

        for pIdx,parent in enumerate(self.parents.values()):
            parentFullName=parent.getFullName()
            if not parent.isAlive():
                continue
            actionInfo=self.actionInfos[parentFullName]
            myMotion=self.ourMotion[pIdx]
            myObs=self.ourObservables[pIdx]
            myMWS=self.mws[pIdx]
            myAction=action[pIdx]

            #左右旋回
            deltaAz=self.turnTable[myAction["turn"]]
            actionInfo.dstDir=self.teamOrigin.relBtoP(np.array([cos(deltaAz),sin(deltaAz),0]))
            dstAz=atan2(actionInfo.dstDir[1],actionInfo.dstDir[0])

            #上昇・下降
            dstPitch=0
            actionInfo.dstDir=np.array([actionInfo.dstDir[0]*cos(dstPitch),actionInfo.dstDir[1]*cos(dstPitch),-sin(dstPitch)])

            #加減速
            V=np.linalg.norm(myMotion.vel())
            actionInfo.asThrottle=False
            accel=self.accelTable[myAction["accel"]]
            actionInfo.dstV=V+accel
            actionInfo.keepVel = accel==0.0

            #下限速度の制限
            if V<self.minimumV:
                actionInfo.velRecovery=True
            if V>=self.minimumRecoveryV:
                actionInfo.velRecovery=False
            if actionInfo.velRecovery:
                actionInfo.dstV=self.minimumRecoveryDstV
                actionInfo.asThrottle=False

            #射撃
            #actionのパース
            shotTarget=myAction["target"]-1

            #射撃可否の判断、射撃コマンドの生成
            flyingMsls=0
            if myObs.contains_p("/weapon/missiles"):
                for msl in myObs.at_p("/weapon/missiles"):
                    if msl.at("isAlive")() and msl.at("hasLaunched")():
                        flyingMsls+=1
            if not (
                shotTarget>=0 and
                shotTarget<len(self.lastTrackInfo) and
                parent.isLaunchableAt(self.lastTrackInfo[shotTarget]) and
                flyingMsls<self.maxSimulShot
            ):
                shotTarget=-1
            if shotTarget>=0:
                actionInfo.launchFlag=True
                actionInfo.target=self.lastTrackInfo[shotTarget]
            else:
                actionInfo.launchFlag=False
                actionInfo.target=Track3D()

            self.observables[parentFullName]["decision"]={
                "Roll":("Don't care"),
                "Fire":(actionInfo.launchFlag,actionInfo.target.to_json())
            }
            if len(myMWS)>0 and self.use_override_evasion:
                self.observables[parentFullName]["decision"]["Horizontal"]=("Az_NED",dstAz)
            else:
                if self.dstAz_relative:
                    self.observables[parentFullName]["decision"]["Horizontal"]=("Az_BODY",deltaAz)
                else:
                    self.observables[parentFullName]["decision"]["Horizontal"]=("Az_NED",dstAz)
            self.observables[parentFullName]["decision"]["Vertical"]=("El",dstPitch)
            if actionInfo.asThrottle:
                self.observables[parentFullName]["decision"]["Throttle"]=("Throttle",actionInfo.dstThrottle)
            else:
                self.observables[parentFullName]["decision"]["Throttle"]=("Vel",actionInfo.dstV)


    def control(self):
        #observablesの収集
        #味方機(parents→parents以外の順)
        self.ourMotion=[]
        self.ourObservables=[]
        firstAlive=None
        for port,parent in self.parents.items():
            if parent.isAlive():
                firstAlive=parent
                break

        parentFullNames=set()
        # まずはparents
        for port, parent in self.parents.items():
            parentFullNames.add(parent.getFullName())
            if parent.isAlive():
                self.ourMotion.append(MotionState(parent.observables["motion"]).transformTo(self.getLocalCRS()))
                #残存していればobservablesそのもの
                self.ourObservables.append(parent.observables)
            else:
                self.ourMotion.append(MotionState())
                #被撃墜or墜落済なら本体の更新は止まっているので残存している親が代理更新したものを取得(誘導弾情報のため)
                self.ourObservables.append(
                    firstAlive.observables.at_p("/shared/fighter").at(parent.getFullName()))

        # その後にparents以外
        for fullName,fObs in firstAlive.observables.at_p("/shared/fighter").items():
            if not fullName in parentFullNames:
                if fObs.at("isAlive"):
                    self.ourMotion.append(MotionState(fObs["motion"]).transformTo(self.getLocalCRS()))
                else:
                    self.ourMotion.append(MotionState())

                self.ourObservables.append(fObs)


        # 彼機情報だけは射撃対象の選択と連動するので更新してはいけない。
        #味方誘導弾(射撃時刻の古い順にソート)
        def launchedT(m):
            return Time(m["launchedT"]) if m["isAlive"] and m["hasLaunched"] else Time(np.inf,TimeSystem.TT)
        self.msls=sorted(sum([[m for m in f.at_p("/weapon/missiles")] for f in self.ourObservables],[]),key=launchedT)

        #彼側誘導弾(各機の正面に近い順にソート)
        self.mws=[]
        for fIdx,fMotion in enumerate(self.ourMotion):
            fObs=self.ourObservables[fIdx]
            self.mws.append([])
            if fObs["isAlive"]:
                if fObs.contains_p("/sensor/mws/track"):
                    for mObs in fObs.at_p("/sensor/mws/track"):
                        self.mws[fIdx].append(Track2D(mObs).transformTo(self.getLocalCRS()))
                sortTrack2DByAngle(self.mws[fIdx],fMotion,np.array([1,0,0]),True)

        #Setup collision avoider
        avoider=StaticCollisionAvoider2D()
        #北側
        c={
            "p1":np.array([+self.dOut,-5*self.dLine,0]),
            "p2":np.array([+self.dOut,+5*self.dLine,0]),
            "infinite_p1":True,
            "infinite_p2":True,
            "isOneSide":True,
            "inner":np.array([0.0,0.0]),
            "limit":self.dOutLimit,
            "threshold":self.dOutLimitThreshold,
            "adjustStrength":self.dOutLimitStrength,
        }
        avoider.borders.append(LinearSegment(c))
        #南側
        c={
            "p1":np.array([-self.dOut,-5*self.dLine,0]),
            "p2":np.array([-self.dOut,+5*self.dLine,0]),
            "infinite_p1":True,
            "infinite_p2":True,
            "isOneSide":True,
            "inner":np.array([0.0,0.0]),
            "limit":self.dOutLimit,
            "threshold":self.dOutLimitThreshold,
            "adjustStrength":self.dOutLimitStrength,
        }
        avoider.borders.append(LinearSegment(c))
        #東側
        c={
            "p1":np.array([-5*self.dOut,+self.dLine,0]),
            "p2":np.array([+5*self.dOut,+self.dLine,0]),
            "infinite_p1":True,
            "infinite_p2":True,
            "isOneSide":True,
            "inner":np.array([0.0,0.0]),
            "limit":self.dOutLimit,
            "threshold":self.dOutLimitThreshold,
            "adjustStrength":self.dOutLimitStrength,
        }
        avoider.borders.append(LinearSegment(c))
        #西側
        c={
            "p1":np.array([-5*self.dOut,-self.dLine,0]),
            "p2":np.array([+5*self.dOut,-self.dLine,0]),
            "infinite_p1":True,
            "infinite_p2":True,
            "isOneSide":True,
            "inner":np.array([0.0,0.0]),
            "limit":self.dOutLimit,
            "threshold":self.dOutLimitThreshold,
            "adjustStrength":self.dOutLimitStrength,
        }
        avoider.borders.append(LinearSegment(c))
        for pIdx,parent in enumerate(self.parents.values()):
            parentFullName=parent.getFullName()
            if not parent.isAlive():
                continue
            actionInfo=self.actionInfos[parentFullName]
            myMotion=self.ourMotion[pIdx]
            myObs=self.ourObservables[pIdx]
            originalMyMotion=MotionState(myObs["motion"]) #機体側にコマンドを送る際には元のparent座標系での値が必要

            #戦域逸脱を避けるための方位補正
            actionInfo.dstDir=avoider(myMotion,actionInfo.dstDir)

            #高度方向の補正
            n=sqrt(actionInfo.dstDir[0]*actionInfo.dstDir[0]+actionInfo.dstDir[1]*actionInfo.dstDir[1])
            dstPitch=atan2(-actionInfo.dstDir[2],n)
            #高度下限側
            bottom=self.altitudeKeeper(myMotion,actionInfo.dstDir,self.altMin)
            minPitch=atan2(-bottom[2],sqrt(bottom[0]*bottom[0]+bottom[1]*bottom[1]))
            #高度上限側
            top=self.altitudeKeeper(myMotion,actionInfo.dstDir,self.altMax)
            maxPitch=atan2(-top[2],sqrt(top[0]*top[0]+top[1]*top[1]))
            dstPitch=max(minPitch,min(maxPitch,dstPitch))
            cs=cos(dstPitch)
            sn=sin(dstPitch)
            actionInfo.dstDir=np.array([actionInfo.dstDir[0]/n*cs,actionInfo.dstDir[1]/n*cs,-sn])

            self.commands[parentFullName]={
                "motion":{
                    "dstDir":originalMyMotion.dirAtoP(actionInfo.dstDir,myMotion.pos(),self.getLocalCRS()) #元のparent座標系に戻す
                },
                "weapon":{
                    "launch":actionInfo.launchFlag,
                    "target":actionInfo.target.to_json()
                }
            }
            if actionInfo.asThrottle:
                self.commands[parentFullName]["motion"]["dstThrottle"]=actionInfo.dstThrottle
            elif actionInfo.keepVel:
                self.commands[parentFullName]["motion"]["dstAccel"]=0.0
            else:
                self.commands[parentFullName]["motion"]["dstV"]=actionInfo.dstV
            actionInfo.launchFlag=False


## policyの実装

### Policyモデルを導入する前に事前に定義した行動の出力を確認したいとき

[エージェントの作成](#エージェントの作成)で実装した行動空間の出力を確認したいときは`policy`メソッドで渡される`action_space`で適当にサンプリングしてみるとよい.

例えば下記のように`__init__.py`の中でDummyPolicyを定義してその中でサンプリングする処理を実装する.

```Python
class DummyPolicy(StandalonePolicy):
    def step(self,observation,reward,done,info,agentFullName,observation_space,action_space):
        actions = action_space.sample()
        b = []
        for a in actions:
            d = {k: int(v) for k, v in a.items()}
            b.append(d)
        print(b)
        return b
```

### Policyモデルの定義

observationを渡してactionを返すpolicyモデルを実装する. ここでは`R7ContestSample.R7ContestTorchNNSampleForHandyRL.R7ContestTorchNNSampleForHandyRL`により深層学習モデルを構築する前提とする. 

コンフィグファイル`sample_config.yml`の`policy_config`->`Learner`->`model_config`を適宜編集して深層学習モデルを新たに定義することができる.

```Yaml
actionDistributionClassGetter: actionDistributionClassGetter
use_lstm: false
lstm_cell_size: 256
lstm_num_layers: 1
lstm_dropout: 0.2
common:
    layers:
        - ["Linear",{"out_features": 16}]
        - ["ReLU",{}]
        - ["ResidualBlock",{
            "layers":[
                ["Linear",{"out_features": 16}],
                ["BatchNorm1d",{}]
            ]}]
        - ["ReLU",{}]
        - ["ResidualBlock",{
            "layers":[
                ["Linear",{"out_features": 16}],
                ["BatchNorm1d",{}]
            ]}]
parent:
    layers:
        - ["Linear",{"out_features": 64}]
        - ["ReLU",{}]
        - ["ResidualBlock",{
            "layers":[
                ["Linear",{"out_features": 64}],
                ["BatchNorm1d",{}]
            ]}]
        - ["ReLU",{}]
        - ["ResidualBlock",{
            "layers":[
                ["Linear",{"out_features": 64}],
                ["BatchNorm1d",{}]
            ]}]
...
```
"common", "parent"などの`make_obs`メソッドで生成したobservationのキーごとにネットワーク構造を定義できるようになっている.
`layers`の中で層を増やしたりノード数を増やしたりして深層学習モデルの構造を新たに定義する. 前に設定した`observation_space`や`action_space`によって入力層や出力層が決まる. 

対戦を実行する際は強化学習フレームワークから独立させてPolicyを使用するためのインターフェースによりpolicyを作成する(ここでは提供されている`ASRCAISim1.plugins.HandyRLUtility.StandaloneHandyRLPolicy`を使用). importして呼べるようにしておく.

[HandyRL(の改変版)を用いた強化学習サンプル/yaml で定義可能なニューラルネットワークのサンプル](http://localhost:5500/docs/html/core/a198026.html#section_r7_contest_handyrl_sample_nn)も参照されたい.


強化学習を前提としない場合は投稿プログラム内で適当な値を返すダミーpolicyを実装しておく.

In [None]:
import os, yaml
from R7ContestSample.R7ContestTorchNNSampleForHandyRL import R7ContestTorchNNSampleForHandyRL
from ASRCAISim1.plugins.HandyRLUtility.StandaloneHandyRLPolicy import StandaloneHandyRLPolicy # type: ignore
from ASRCAISim1.plugins.HandyRLUtility.distribution import getActionDistributionClass # type: ignore

model_config=yaml.safe_load(open(os.path.join(os.path.dirname(__file__),"model_config.yaml"),"r"))
weightPath = None
isDeterministic=False #決定論的に行動させたい場合はTrue、確率論的に行動させたい場合はFalseとする
policy = StandaloneHandyRLPolicy(R7ContestTorchNNSampleForHandyRL,model_config,weightPath,getActionDistributionClass,isDeterministic)

### 独自のPolicyモデルを使用したいとき

`R7ContestTorchNNSampleForHandyRL`以外の独自のPolicyモデルを実装したい場合は, どこかにモデルを定義したpyファイルを用意して`main.py`でimportして`custom_classes`の中に登録しておき, `sample_config.yml`の中の`policy_config`->`Learner`->`model_class`の中に対応するkeyを記載すればよい. 以下は実装したモデルが`MyPolicyModel`だった場合の例.

```Python
from policy_model import MyPolicyModel

custom_classes={
    # models
    "R7ContestTorchNNSampleForHandyRL": R7ContestTorchNNSampleForHandyRL,
    "DummyInternalModel": DummyInternalModel,
    "MyPolicyModel": MyPolicyModel,
    # match maker
    "R7ContestTwoTeamCombatMatchMaker": R7ContestTwoTeamCombatMatchMaker,
    "TwoTeamCombatMatchMonitor": TwoTeamCombatMatchMonitor,
    # action distribution class getter
    "actionDistributionClassGetter": getActionDistributionClass,
}

```

[HandyRL(の改変版)を用いた強化学習サンプル/カスタムクラスの使用](http://localhost:5500/docs/html/core/a198026.html#section_r7_contest_handyrl_sample_custom_classes)も参照されたい.

## Factoryへの追加

### 独自のAgentの登録

学習を実行するときに作成したエージェントを呼べるように登録しておく必要がある.

`./sample_config.yml`の"env_args"->"env"の値で実際に環境構築を行うモジュールを選択している(デフォルトでは`sample`という名前になっていて, `handyrl.envs.SampleEnv.sample.py`が利用されることになる.)が, シミュレーション環境構築時(`handyrl.envs.SampleEnv.sample.py`のL33-54の部分)でエージェントの登録を行っている.

```Python
# エージェントの登録
userModelID=args["userModelID"]
userModuleID=args["userModuleID"]
with open(os.path.join(userModuleID, args["modelargs"])) as f:
    model_args = json.load(f)

module = importlib.import_module(userModuleID)
assert hasattr(module, "getUserAgentClass")
assert hasattr(module, "getUserAgentModelConfig")
assert hasattr(module, "isUserAgentSingleAsset")
assert hasattr(module, "getUserPolicy")

agentClass = module.getUserAgentClass(model_args)
addPythonClass("Agent", "Agent_"+userModelID, agentClass)
Factory.addDefaultModel(
    "Agent",
    "Agent_"+userModelID,
    {
        "class": "Agent_"+userModelID,
        "config": module.getUserAgentModelConfig(model_args)
    }
)
```

`userModuleID`という名前のディレクトリに`__init__.py`があって, インポートしている. デフォルトでは`Test`という名前としている(`./Test`以下を実際に確認されたい.).

`./sample_config.yml`の"env_args"の"userModelID"の値`userModelID`が`Agent_{userModelID}`という形でクラス名として登録される. この名前と`R7_contest_learning_config_{M,S}.json`の"AgentConfigDispatcher"に記載の"Learner_e"の"model"の値を一致させることで作成したエージェントによる学習が実行可能となる(デフォルトでは"userModelID"は`Sample`としているので`Agent_Sample`としている. 各自確認されたい.).

ここでは特に扱っていないが, 並列実行する場合は全てのインスタンス上でFactoryへの追加が行われる必要がある(EnvironmentはWorkerごとにインスタンス化される(__init__がWorkerごとに呼ばれる)ため, 全てのインスタンスでFactoryへのモデル追加が独立に行われる).

デフォルトでは中央集権型として扱うため`R7_contest_learning_config_M.json`を自作エージェントに合わせるように編集している.

エージェント登録の別の方法として, R7ContestSampleに独自のクラスを作成しておいて(ビルドして`site-packages`を更新)`configs/R7_contest_agent_ruler_reward_models.json`で"Factory"の"Agent"項目で好きな名前(`新エージェント名`とする)を登録して, その"class"において, 作成した独自のクラス名を記載しておくことも可能. その際は"AgentConfigDispatcher"に記載の"Learner_e"の"model"の値を`新エージェント名`にする.

投稿可能なプログラムを作成するときはクラスを記述する部分を別のpyファイルとして作成してインポートできるようにしたり, `__init__.py`に直接書き込んでその中で呼べるようにしておく必要がある. 以下は`__init__.py`に直接書き込む場合の例.

```Python
def getUserAgentClass(args={}):
    import 独自のクラス名
    return 独自のクラス名

class 独自のクラス名():
    ...
```

### 独自のRewardの登録


シミュレーション環境構築時(`handyrl.envs.SampleEnv.sample.py`のL56-72の部分)で報酬の登録を行っている.

```Python
# 報酬の登録
userRewardModuleID = args["userRewardModuleID"]
with open(args["rewardConfig"]) as f:
    reward_config = json.load(f)

reward_module = importlib.import_module(userRewardModuleID)

rewardClass = reward_module.MyReward
addPythonClass("Reward", userRewardModuleID, rewardClass)
Factory.addDefaultModel(
    "Reward",
    userRewardModuleID,
    {
        "class": "MyReward",
        "config": reward_config
    }
)
```

`MyReward.py`の`MyReward`モジュールを読み込んで`reward_config.json`で記述されている設定ファイルを渡して登録をしている. この実装ではクラス名は`MyReward`で固定する必要がある. `MyReward.py`では, `onInnerStepEnd`メソッドにおいてインナーステップ終了時に残存機数に応じて報酬を与えるシンプルなものとなっている. 適宜改修されたい. また, [独自 Reward の実装方法](http://localhost:5500/docs/html/core/a198019.html)も参照されたい. `R7ContestSample`にも`R7ContestPyRewardSample01.py`などに報酬の実装例があるため, こちらも参照すること.

またエージェントの場合と同様に, R7ContestSampleに独自のクラスを作成しておいて(ビルドして`site-packages`を更新)`configs/R7_contest_agent_ruler_reward_models.json`で"Factory"の"Reward"項目で好きな名前(`新報酬名`とする)を登録して, その"class"において, 作成した独自のクラス名を記載しておくことも可能.

学習設定ファイル`configs/R7_contest_learning_config_{M/S}.json`の"Reward"で登録する報酬を以下のようにリストで渡すとその合計値を実際の報酬として返す. ここで実装している`MyReward.py`を報酬として学習させたい場合, `sample_config.yml`の"env_args"の"userRewardModuleID"の値を以下のように"model"の値として記述しておく. "target"を以下のように"All"とすると場に存在する者すべての陣営及びAgentが計算対象となる.

```json
[
    {"model":"MyReward","target":"All"},
    {"model":"MyWinLoseReward","target":"All"}
]
```

以下は`MyReward.py`の実装例.

In [None]:
from ASRCAISim1.core import TeamReward, nljson, Fighter


class MyReward(TeamReward):
    """
    チーム全体で共有する報酬は TeamReward を継承し、
    個別の Agent に与える報酬は AgentReward を継承する。
    """
    def __init__(self, modelConfig: nljson, instanceConfig: nljson):
        super().__init__(modelConfig, instanceConfig)
        if(self.isDummy):
            return #Factory によるダミー生成のために空引数でのインスタンス化に対応させる


    def onEpisodeBegin(self):
        """
        エピソード開始時の処理(必要に応じてオーバーライド)
        基底クラスにおいて config に基づき報酬計算対象の設定等が行われるため、
        それ以外の追加処理や設定の上書きを行いたい場合のみオーバーライドする。
        """
        super().onEpisodeBegin()


    def onStepBegin(self):
        """
        step 開始時の処理(必要に応じてオーバーライド)
        基底クラスにおいて reward(step 報酬)を 0 にリセットしているため、
        オーバーライドする場合、基底クラスの処理を呼び出すか、同等の処理が必要。
        """
        super().onEpisodeBegin()
    
    
    def onInnerStepBegin(self):
        """
        インナーステップ開始時の処理(必要に応じてオーバーライド)
        デフォルトでは何も行わないが、より細かい報酬計算が必要な場合に使用可能。
        """
        pass
    
    
    def onInnerStepEnd(self):
        """
        インナーステップ終了時の処理(必要に応じてオーバーライド)
        一定周期で呼び出されるため、極力この関数で計算する方が望ましい。
        """
        for team in self.reward: # team に属している Asset(Fighter)を取得する例
            for f in self.manager.getAssets(lambda a:a.getTeam()==team and isinstance(a,Fighter)):
                if(f().isAlive()):
                    self.reward[team] += 0.1 #例えば、残存数に応じて報酬を与える場合



## 学習の実行

作成したエージェントを学習する. 学習に使用する環境やモデルや学習条件などが`./sample_config.yml`として与えられている. 特に"env_args"の"userModuleID"と"model_args"にはそれぞれ実装したエージェント(`MyAgent.py`)が保存されるディレクトリ(デフォルトでは`Test`)とその引数ファイル(`args.json`)を設定する(`./sample_config.yml`参照). その他設定などについては[HandyRL(の改変版)を用いた強化学習サンプル/yaml の記述方法](http://localhost:5500/docs/html/core/a198026.html#section_r7_contest_handyrl_sample_yaml_format)を参照されたい. なお, `./sample_config.yml`ではオープン部門用(中央集権方式)の設定としている.


例えば学習条件としてエポック数を変えたい場合は`sample_config.yml`の`train_args`->`Learner`で`epochs`を変えればよい(-1に設定すると上限なしとなる.).

```Yaml
train_args:
    Learner:
...
        epochs: 5 # エポック数を5にしたい場合
```

詳細は[HandyRL(の改変版)を用いた強化学習サンプル/yaml の記述方法](http://localhost:5500/docs/html/core/a198026.html#section_r7_contest_handyrl_sample_yaml_format)を参照されたい

エージェント(`MyAgent.py`)とその引数ファイル(`args.json`), エージェントに対する設定ファイル(`agent_config.json`)を`./Test`以下に格納しておく. すると`./Test`は以下のようなディレクトリ構造になる.

```bash
Test
├─ __init__.py
├─ agent_config.json
├─ args.json
└─ MyAgent.py
```

そして以下のコマンドを実行する.

In [None]:
# /path/to/tutorialで実行
!python main.py sample_config.yml --train

学習済みモデルは`./results/Open/Multi/YYYYmmddHHMMSS/policies/checkpoints`以下に保存される(pthファイル).

## 投稿可能なプログラム一式としてまとめる

学習済みモデル(例えば`Learner-latest.pth`)を`./Test`以下に格納する. `args.json`の"weightPath"の値を`Learner-latest.pth`としておく. また, 学習時に使用したモデルの設定ファイルと同等のものとして`model_config.yaml`を格納する(`sample_config.yml`の"policy_config"などの内容と整合することを確認しておく.). そして最終的に以下のようなディレクトリ構造となることを確認.

```bash
Test
├─ __init__.py
├─ agent_config.json
├─ args.json
├─ Learner-latest.pth
├─ model_config.yaml
└─ MyAgent.py
```

## 対戦を実行する

作成したエージェントをサンプルルールベースモデルと戦わせる. デフォルトではオープン部門の条件(`--youth`が0)で`./Test`で作ったエージェントモジュールと`./BenchMark`で与えられるサンプルルールベースモデルの対戦となる.

In [None]:
# /path/to/tutorialで実行
!python validate.py

適宜`--color`を"Blue"や"Red"に変えて陣営の種類に応じた行動が取れているかなどを確認する(実際の対戦では陣営の色はランダムに決まる). また, `--replay`や`--visualize`を`1`にして実際にどのように動いているかを確認するなりログを参考にしてobservationの作成方法に工夫を施したり`sample_config.yml`で使用するモデルを変えたり学習の仕方に工夫を施すなどしてアルゴリズムをよりよくする.

## 応募用ファイルを作成する

作成したプログラムをzipファイルとして圧縮する.

In [None]:
# /path/to/tutorialで実行
!zip -r submit ./Test