# Mountain car
小车位于受重力影响的山谷中, 到达旗帜即算成功。可参考教程: https://www.jiqizhixin.com/articles/2018-04-17-3

In [1]:
from IPython.display import display, HTML

# Inline <img> markup that pins the illustration to 400x300 pixels.
image_html = '<img src="../image/mountain_car.png" width="400" height="300" alt="Mountain Car"/>'

# Wrap the markup as an HTML object first, then hand it to the notebook for rendering.
mountain_car_figure = HTML(image_html)
display(mountain_car_figure)

In [7]:
import gym
import numpy as np
import pandas as pd
import time

# --- Q-learning hyperparameters -------------------------------------------
# Learning rate (alpha).
LEARNING_RATE = 0.1
# Discount factor (gamma).
GAMMA = 0.95
# Epsilon of the epsilon-greedy action selection.
EPSILON = 0.1
# Number of training episodes.
EPISODES = 500
# Number of buckets each observation dimension is divided into.
CHUNK_SIZE = 20

# --- Environment ------------------------------------------------------------
env = gym.make("MountainCar-v0", render_mode="rgb_array")

# Three discrete actions: push left, do nothing, push right.
print(f"动作: {env.action_space}")
# In an observation, element 0 is the x coordinate and element 1 is the car's velocity.
print(f"x轴最大值和速度最大值: {env.observation_space.high}")
print(f"x轴最小值和速度最小值: {env.observation_space.low}")

# --- Discretisation ---------------------------------------------------------
# One bucket count per observation dimension.
DISCRETE_OBSERVATION_SPACE_SIZE = [CHUNK_SIZE for _ in env.observation_space.high]
print(f"离散观测结果集合为{DISCRETE_OBSERVATION_SPACE_SIZE}的二维数组")
# Width of a single bucket along each dimension: the full range divided by the bucket count.
observation_span = env.observation_space.high - env.observation_space.low
discrete_os_win_size = observation_span / DISCRETE_OBSERVATION_SPACE_SIZE
print(f"每两个临近状态之间的差值: {discrete_os_win_size}")

# --- Q-table ----------------------------------------------------------------
# One axis per discretised observation dimension plus a final axis over actions,
# filled with uniform random values between -2 and 0.
q_table_size = [*DISCRETE_OBSERVATION_SPACE_SIZE, env.action_space.n]
q_table = np.random.uniform(-2, 0, q_table_size)
print(f"Q表维度: {q_table.shape}")

动作: Discrete(3)
x轴最大值和速度最大值: [0.6  0.07]
x轴最小值和速度最小值: [-1.2  -0.07]
离散观测结果集合为[20, 20]的二维数组
每两个临近状态之间的差值: [0.09  0.007]
Q表维度: (20, 20, 3)


In [3]:
def get_discrete_state(state):
    """Convert a continuous observation into Q-table bucket indices.

    Each component of ``state`` is shifted by the observation space's lower
    bound and divided by the per-dimension bucket width
    (``discrete_os_win_size``), then truncated to an integer.

    Args:
        state: Observation array as returned by ``env.reset`` / ``env.step``.

    Returns:
        A tuple with one integer index per observation dimension, each within
        ``0 .. CHUNK_SIZE - 1``, usable directly as an index into ``q_table``.
    """
    bucket_indices = ((state - env.observation_space.low) / discrete_os_win_size).astype(int)
    # A component sitting exactly on its upper bound divides to CHUNK_SIZE,
    # which is one past the last valid index. Clamp every dimension (not only
    # the position) so the result can never index outside the Q-table.
    bucket_indices = np.clip(bucket_indices, 0, CHUNK_SIZE - 1)
    return tuple(bucket_indices)

# Reset with a fixed seed and keep only the observation; the second returned value is not used.
initial_observation = env.reset(seed=42)[0]
# Element 0 is the position state (20 in total), element 1 is the velocity state (20 in total).
print(f"初始位置{initial_observation}")
discrete_state = get_discrete_state(initial_observation)
print(f"初始状态{discrete_state}")

初始位置[-0.4452088  0.       ]
初始状态(8, 10)


In [None]:
import time
from gym.wrappers.monitoring.video_recorder import VideoRecorder

# Training starts out very slow: the inner loop only stops once `terminated` is
# set (the truncation flag returned by env.step is discarded), so the first
# episode may take several minutes. After one success the effect snowballs and
# later episodes finish faster and faster.

# Episodes whose frames are written to an mp4 file: the first and the last one.
recorded_episodes = {0, EPISODES - 1}
# Read the goal x-coordinate from the unwrapped environment instead of relying
# on attribute forwarding through whatever wrappers gym.make applied.
goal_position = env.unwrapped.goal_position

for ep in range(EPISODES):
    # Every episode starts from a fresh reset.
    terminated = False
    initial_observation, _ = env.reset(seed=42)
    discrete_state = get_discrete_state(initial_observation)
    # Wall-clock start, used to report how long the episode took.
    start_time = time.time()
    # Only recorded episodes get a recorder; all other episodes leave the
    # environment untouched (it must stay open until training is finished).
    render = ep in recorded_episodes
    video = VideoRecorder(env, f"../video/mountain_car_ep{ep}.mp4") if render else None

    try:
        while not terminated:
            if np.random.random() > EPSILON:
                # Greedy choice: 0 is left, 1 is no-op, 2 is right.
                action = np.argmax(q_table[discrete_state])
            # With probability EPSILON pick a random action instead of the
            # greedy one, to escape local optima; larger EPSILON explores more.
            else:
                action = np.random.randint(0, env.action_space.n)
            new_state, reward, terminated, _, _ = env.step(action)
            new_discrete_state = get_discrete_state(new_state)
            if render:
                # NOTE(review): capture_frame presumably renders the frame
                # itself, which would make this render() call redundant - confirm.
                env.render()
                video.capture_frame()
            if not terminated:
                # Best Q-value reachable from the next state.
                max_future_q = np.max(q_table[new_discrete_state])
                # Current Q-value of the (state, action) pair just taken.
                current_q = q_table[discrete_state + (action, )]
                new_q = current_q + LEARNING_RATE * (reward + GAMMA * max_future_q - current_q)
                # Write the updated estimate back into the Q-table.
                q_table[discrete_state + (action, )] = new_q
            # Goal reached: the Q-value of the final (state, action) is set to 0.
            elif new_state[0] >= goal_position:
                end_time = time.time()
                print(f"在{ep}回合到达目标地点, 用时{(end_time - start_time):.2f}s")
                q_table[discrete_state + (action, )] = 0
            discrete_state = new_discrete_state
    finally:
        # Close the recorder as soon as its episode ends (even on an error)
        # so it is never left open or closed more than once.
        if video is not None:
            video.close()

# Release the environment exactly once, after all episodes are done.
env.close()