# 環境作成

In [1]:
import numpy as np
import gymnasium
from gymnasium.wrappers import TimeLimit
from imitation.data import rollout
from imitation.policies.serialize import load_policy
from imitation.util.util import make_vec_env
from imitation.data.wrappers import RolloutInfoWrapper
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
from stable_baselines3.common.policies import ActorCriticCnnPolicy

from gymnasium.spaces import Box
from gymnasium.envs.registration import register
import racing_gym
import os
import json
from PIL import Image
import torch
import time


  from .autonotebook import tqdm as notebook_tqdm


## 走行データをロードする関数

In [2]:
import os
import json
import numpy as np
from PIL import Image

def load_expert_data_directory(parent_directory):
    # parent_directory内のすべてのディレクトリを取得
    subdirectories = [os.path.join(parent_directory, d) for d in os.listdir(parent_directory) if os.path.isdir(os.path.join(parent_directory, d))]
    
    all_expert_data = []  # すべてのディレクトリのexpert_dataを格納するリスト

    for data_path in subdirectories:
        # ディレクトリごとにexpert_dataの初期化
        expert_data = {'images': [], 'actions': []}

        # 画像ファイルに対応するJSONファイルを取得
        json_file_list = [json_file for json_file in os.listdir(data_path) if json_file.startswith('record_') and json_file.endswith('.json')]

        for json_file in json_file_list:
            # レコードのファイルパスを構築
            json_path = os.path.join(data_path, json_file)

            # レコードの読み込み
            try:
                with open(json_path, 'r') as json_file:
                    record_data = json.load(json_file)
            except FileNotFoundError:
                print(f"エラー：{json_path} でJSONファイルが見つかりませんでした。")
                continue
            except json.JSONDecodeError:
                print(f"エラー：{json_path} のJSONファイルのデコードに失敗しました。")
                continue

            # 画像データの読み込み
            image_file = record_data.get('cam/image_array', '')  # 画像ファイル名をJSONから取得
            image_path = os.path.join(data_path, image_file)
            try:
                image_data = np.array(Image.open(image_path))
            except FileNotFoundError:
                print(f"エラー：{image_path} で画像ファイルが見つかりませんでした。")
                continue

            # expert_dataに追加
            expert_data['images'].append(image_data)
            expert_data['actions'].append([record_data.get('user/angle', 0), record_data.get('user/throttle', 0)])
        
        # すべてのディレクトリのexpert_dataをリストに追加
        all_expert_data.append(expert_data)


    return all_expert_data


走行データのロード

In [3]:
all_expert_data = load_expert_data_directory('../../autorace/data/T/')

In [4]:
print(len(all_expert_data))
print(len(all_expert_data[0]['images']))

21
1310


画像の型を変換

In [5]:
reshaped_all_expert_data = all_expert_data.copy()
# すべてのディレクトリに対して
for i in range(len(reshaped_all_expert_data)):
    # すべての画像を変換
    for j in range(len(reshaped_all_expert_data[i]['images'])):
        # 画像の軸の順序を変更
        reshaped_image = np.transpose(reshaped_all_expert_data[i]['images'][j], (2, 0, 1))
        
        # expert_dataに変更を反映
        reshaped_all_expert_data[i]['images'][j] = reshaped_image.copy()

# 形状を確認
print(reshaped_all_expert_data[0]['images'][0].shape)

for i in range(len(reshaped_all_expert_data)):
    for j in range(len(reshaped_all_expert_data[i]['images'])):
        if reshaped_all_expert_data[i]['images'][j].shape != (3, 224, 224):
            print(i, j)

print(all_expert_data[0]['images'][0].shape)
print(reshaped_all_expert_data[0]['images'][0].shape)

(3, 224, 224)
(3, 224, 224)
(3, 224, 224)


In [6]:
# 訓練用とテスト用のデータを初期化
train_expert_data = {'images': [], 'actions': []}
test_expert_data = {'images': [], 'actions': []}

# 評価用とテスト用のデータに7:3にランダムで分割
np.random.shuffle(all_expert_data)
split_index = int(len(all_expert_data) * 0.8)
print(split_index)

# 訓練用とテスト用のデータに分割
for i in range(split_index):
    train_expert_data['images'] += all_expert_data[i]['images']
    train_expert_data['actions'] += all_expert_data[i]['actions']

for i in range(split_index, len(all_expert_data)):
    test_expert_data['images'] += all_expert_data[i]['images']
    test_expert_data['actions'] += all_expert_data[i]['actions']

    
print(len(train_expert_data['actions']))
print(len(test_expert_data['actions']))

16
17734
4956


## 自作環境の宣言

In [11]:
# 学習用の環境を登録
env_id1 = 'RacingEnv-v3'  # あなたの環境の名前に変更してください
try:
    env_train = gymnasium.make(env_id1, expert_data=train_expert_data)
    print(f"Environment '{env_id1}' is successfully registered.")
except gymnasium.error.Error as e:
    print(f"Error: {e}")

# テスト用の環境を登録
env_id2 = 'RacingEnv-v3'
try:
    env_test = gymnasium.make(env_id2, expert_data=test_expert_data)
    print(f"Environment '{env_id2}' is successfully registered.")
except gymnasium.error.Error as e:
    print(f"Error: {e}")

Environment 'RacingEnv-v3' is successfully registered.
Environment 'RacingEnv-v3' is successfully registered.


  logger.deprecation(


venvの宣言

In [10]:
def _make_env():
    """Helper function to create a single environment. Put any logic here, but make sure to return a RolloutInfoWrapper."""
    _env = gymnasium.make("RacingEnv-v3", expert_data=train_expert_data)
    _env = RolloutInfoWrapper(_env)
    return _env

venv_train = DummyVecEnv([_make_env for _ in range(4)])

def _make_env():
    """Helper function to create a single environment. Put any logic here, but make sure to return a RolloutInfoWrapper."""
    _env1 = gymnasium.make("RacingEnv-v3", expert_data=test_expert_data)
    _env1 = RolloutInfoWrapper(_env1)
    return _env1

venv_test = DummyVecEnv([_make_env for _ in range(4)])

  logger.deprecation(


PPOアルゴリズムによる事前学習

In [13]:
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.ppo import CnnPolicy
from stable_baselines3.common.evaluation import evaluate_policy


expert = PPO(
    policy=CnnPolicy,
    env=env_train,
    seed=0,
    batch_size=64,
    ent_coef=0.0,
    learning_rate=0.0003,
    n_epochs=10,
    n_steps=64,
)
reward, _ = evaluate_policy(expert, env_test, 10, device='cuda:1')
print(f"Reward before training: {reward}")


# Note: if you followed step 2a, i.e. registered the environment, you can use the environment name directly

# expert = PPO(
#     policy=MlpPolicy,
#     env="custom/ObservationMatching-v0",
#     seed=0,
#     batch_size=64,
#     ent_coef=0.0,
#     learning_rate=0.0003,
#     n_epochs=10,
#     n_steps=64,
# )
expert.learn(100000)  # Note: set to 100000 to train a proficient expert
reward, _ = evaluate_policy(expert, expert.get_env(), 10, device='cuda:1')
print(f"Expert reward: {reward}")
reward, _ = evaluate_policy(expert, env_test, 10, device='cuda:1')
print(f"Expert reward: {reward}")

OutOfMemoryError: CUDA out of memory. Tried to allocate 72.00 MiB. GPU 0 has a total capacty of 10.91 GiB of which 95.00 MiB is free. Process 161064 has 10.12 GiB memory in use. Including non-PyTorch memory, this process has 266.00 MiB memory in use. Of the allocated memory 80.86 MiB is allocated by PyTorch, and 13.14 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [None]:
rng = np.random.default_rng()
rollouts = rollout.rollout(
    expert,
    venv1,
    rollout.make_sample_until(min_timesteps=None, min_episodes=50),
    rng=rng,
)
transitions = rollout.flatten_trajectories(rollouts)