# rl.py
import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque
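
# Deep Q-learning on CartPole-v1: an epsilon-greedy policy with experience
# replay, trained by regressing Q(s, a) toward the one-step Bellman target.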
num_episodes = 500               # total number of training episodes
num_exploration_episodes = 100   # number of episodes over which exploration is annealed
max_len_episode = 1000           # maximum number of steps per episode
batch_size = 32                  # batch size
learning_rate = 1e-3             # learning rate
gamma = 1.                       # discount factor
initial_epsilon = 1.             # exploration rate at the start of annealing
final_epsilon = 0.01             # exploration rate after annealing ends
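

# Q-network: maps a state observation to one Q-value per action
# (CartPole's action space has two discrete actions: push left / push right).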
class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x
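
    # greedy policy: pick the action with the largest predicted Q-value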
    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)


if __name__ == '__main__':
    env = gym.make('CartPole-v1')        # instantiate the game environment; the argument is its name
    model = QNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    replay_buffer = deque(maxlen=10000)  # a deque serves as the experience replay buffer for Q-learning
    epsilon = initial_epsilon
    for episode_id in range(num_episodes):
        state = env.reset()              # reset the environment and obtain the initial state
        epsilon = max(                   # compute the current exploration rate (linear annealing)
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        for t in range(max_len_episode):
            env.render()                 # render the current frame to the screen
            if random.random() < epsilon:    # epsilon-greedy: with probability epsilon, pick a random action
                action = env.action_space.sample()   # random action (exploration)
            else:
                action = model.predict(np.expand_dims(state, axis=0)).numpy()  # action with the largest predicted Q-value
                action = action[0]
            # step the environment: get the next state, the reward, whether the episode is over, and extra info
            next_state, reward, done, info = env.step(action)
            # give a large negative reward on game over
            reward = -10. if done else reward
            # store the (state, action, reward, next_state) tuple, plus a done flag, in the replay buffer
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            # update the current state
            state = next_state
            if done:                     # episode over: exit the inner loop and start the next episode
                print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
                break
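
            # once the buffer holds at least one batch of transitions, run a training step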
            if len(replay_buffer) >= batch_size:
                # sample a random batch of tuples from the replay buffer and convert each field to a NumPy array
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                batch_action = np.array(batch_action, dtype=np.int32)

                q_value = model(batch_next_state)
                # Bellman target: y = r + gamma * max_a' Q(s', a'), with the bootstrap term masked out on terminal states
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(  # minimize the distance between y and the predicted Q-value
                        y_true=y,
                        y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))  # compute gradients and update parameters
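
# Note: this script targets the classic Gym API, where env.reset() returns the
# initial state alone and env.step() returns a 4-tuple. Gym >= 0.26 and
# Gymnasium instead return (state, info) from reset() and a 5-tuple from step()
# with separate `terminated` and `truncated` flags, so it would need small
# changes to run there.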