In [2]:
import gym
import numpy as np


# Create the MountainCar-v0 environment that the training and evaluation cells below use.
env = gym.make('MountainCar-v0')

Q-learning 實現

初期隨機猜測動作
逐漸建立Q表
後期再從Q表選擇動作

由於位置與速度都是連續數據,
會導致有無限多種狀態需要記錄,
因此將連續值映射(離散化)至整數空間內。

In [106]:
# Upper end of the integer range that each observation component is mapped onto.
scale = 100

# Lower and upper bounds of the observation space, unpacked as (position, speed).
min_pos, min_speed = env.observation_space.low
max_pos, max_speed = env.observation_space.high

def rescale(observation):
    """Discretize an observation onto the integer grid used as Q-table keys.

    Each component is mapped linearly so that the environment's lower bound
    lands on 0 and its upper bound on ``scale``, then rounded.

    Args:
        observation: a two-element sequence ``(pos, speed)``.

    Returns:
        A ``(pos, speed)`` tuple of rounded grid coordinates.
    """
    def to_grid(value, low, high):
        # Linear map [low, high] -> [0, scale], rounded to the nearest grid point.
        return round(scale * (value - low) / (high - low))

    raw_pos, raw_speed = observation
    return to_grid(raw_pos, min_pos, max_pos), to_grid(raw_speed, min_speed, max_speed)

In [20]:
# Q-learning hyperparameters.
lr = .9        # learning rate: weight of the new target in each update
discount = .9  # discount applied to the value of the next state

episodes = 10000  # number of training episodes (the loop runs one extra so the final report prints)
explore_episodes = 100  # initial episodes in which every action is sampled at random

from collections import defaultdict
# Q table: discretized (pos, speed) -> list of three action values, created as zeros on first access.
Q = defaultdict(lambda: [0, 0, 0])

max_score = 0

for episode in range(episodes + 1):
    observation = rescale(env.reset())

    # Score starts at 200 and accumulates every step's reward.
    # NOTE(review): presumably the reward is negative per step, so a higher
    # score means a faster finish - confirm against the environment docs.
    score = 200

    done = False
    while not done:
        if episode < explore_episodes:
            # Early in training, pick actions at random to populate the table.
            action = env.action_space.sample()
        else:
            # Afterwards act greedily with respect to the current table.
            action = np.argmax(Q[observation])

        next_observation, reward, done, info = env.step(action)
        next_observation = rescale(next_observation)

        # A genuinely terminal transition has no future return, so it must not
        # bootstrap from the next state's value. An episode cut off by the time
        # limit is not terminal for the task, so it still bootstraps. Gym's
        # TimeLimit wrapper (4-tuple step API) flags that case in `info`.
        truncated = info.get('TimeLimit.truncated', False)
        if done and not truncated:
            future_value = 0
        else:
            future_value = max(Q[next_observation])

        Q[observation][action] = (1-lr)*Q[observation][action] + lr*(reward+discount*future_value)

        observation = next_observation
        score += reward

    # Episode finished: track the best score and report every 500 episodes.
    max_score = max(max_score, score)
    if episode % 500 == 0:
        print(f"episode: {episode} max_score:{max_score}")

episode: 0 max_score:0
episode: 500 max_score:0
episode: 1000 max_score:0
episode: 1500 max_score:0
episode: 2000 max_score:38.0
episode: 2500 max_score:39.0
episode: 3000 max_score:44.0
episode: 3500 max_score:47.0
episode: 4000 max_score:48.0
episode: 4500 max_score:49.0
episode: 5000 max_score:49.0
episode: 5500 max_score:79.0
episode: 6000 max_score:79.0
episode: 6500 max_score:84.0
episode: 7000 max_score:84.0
episode: 7500 max_score:84.0
episode: 8000 max_score:84.0
episode: 8500 max_score:84.0
episode: 9000 max_score:87.0
episode: 9500 max_score:88.0
episode: 10000 max_score:88.0


In [101]:
import time


# Play one rendered episode, acting greedily from the learned Q table.
observation = env.reset()

# Score starts at 200 and accumulates every step's reward.
score = 200

try:
    while True:
        state = rescale(observation)

        # Look the state up without inserting it. Q is a defaultdict, so Q[state]
        # would add an all-zero row for every unseen state, and any later use of
        # the table (such as exporting it as training data) would include those
        # rows. An all-zero row yields argmax 0, so the chosen action is unchanged.
        action = np.argmax(Q.get(state, (0, 0, 0)))
        observation, reward, done, _ = env.step(action)

        score += reward

        env.render()
        time.sleep(0.01)  # slow the loop down so the rendering is watchable

        if done:
            print(f"score: {int(score)}/200")
            break
finally:
    # Close the render window even if the episode is interrupted.
    env.close()

score: 74/200


DQN實現

將上面的Q表
從字典改為神經網路,
可改進有無限種狀態需要記錄的問題。

In [151]:
import tensorflow as tf


# Q-value network: takes the 2-component observation and outputs 3 values,
# one per action, through two sigmoid layers with batch normalization.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Dense(2, activation='sigmoid', input_shape=(2, )))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dense(128, activation='sigmoid'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dense(3))  # no activation: raw value estimates

# NOTE(review): 'acc' is a classification metric; with an MSE regression loss
# it is probably not informative - confirm whether it is wanted.
model.compile(
    loss='mse',
    optimizer='adam',
    metrics=['acc'],
)

In [150]:
def back_rescale(observation):
    """Map grid coordinates in 0..scale back to the environment's value range.

    This is the linear inverse of the discretization: 0 maps to the lower
    bound and ``scale`` to the upper bound of each component.

    Args:
        observation: a two-element sequence ``(pos, speed)`` in grid units.

    Returns:
        A ``(pos, speed)`` tuple in the environment's original units.
    """
    def from_grid(value, low, high):
        # Linear map [0, scale] -> [low, high].
        return value * (high - low) / scale + low

    grid_pos, grid_speed = observation
    return from_grid(grid_pos, min_pos, max_pos), from_grid(grid_speed, min_speed, max_speed)

In [128]:
# Turn the Q table into a supervised training set.
# Inputs are the table's states mapped back to environment units; targets are
# the three action values stored for each state. Both are built as explicit
# float32 arrays of shape (n_states, 2) and (n_states, 3): a plain Python list
# passed to Keras can be read as "one entry per model input", and its handling
# otherwise depends on the scalar types the list happens to contain.
x = np.asarray([back_rescale(observation) for observation in Q], dtype=np.float32)
y = np.asarray(list(Q.values()), dtype=np.float32)

In [152]:
# Fit the network on x (inputs) and y (targets) for 5 epochs.
model.fit(x, y, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x1594cf2c100>

In [153]:
import time


# Play one rendered episode, choosing the action with the highest predicted value.
observation = env.reset()
score = 200  # starting score; every step's reward is added to it
done = False

while not done:
    # Predict the action values for the current observation as a batch of one.
    q_values = model.predict(observation.reshape(1,2))
    action = q_values.argmax()

    observation, reward, done, _ = env.step(action)
    score += reward

    env.render()
    time.sleep(0.01)  # slow the loop down so the rendering is watchable

print(f"score: {int(score)}/200")
env.close()

score: 83/200
