# 案例5-1：使用Sarsa算法处理金币问题

In [2]:
import random

# 定义状态空间states和动作空间actions
states = [1, 2, 3, 4, 5, 6, 7, 8]
actions = ['n', 'e', 's', 'w']

def greedy(Q,state):
    amax = 0
    key = "%d_%s" % (state, actions[0])

    qmax = Q[key]
    for i in range(len(actions)):
        key = "%d_%s" % (state, actions[i])
        q = Q[key]
        if qmax < q:
            qmax = q
            amax = i
    return actions[amax]

这段代码实现了ε-贪婪策略，在每个状态下以 1 - epsilon 的概率选择当前Q值最大的动作，以 epsilon 的概率随机选择其他动作，从而在探索和利用之间取得平衡。

In [None]:

def epsilon_greedy(Q, state, epsilon):
    """
    Q: Q函数，存储每个状态-动作对的价值。
    state: 当前状态。
    epsilon: ε值，用于控制探索与利用的平衡。
    return: 根据 ε-greedy 策略选择的动作。
    """

    # 找到当前状态下的最佳动作
    amax = 0
    key = "%d_%s"%(state, actions[0])
    qmax = Q[key]
    for i in range(len(actions)):
        key = "%d_%s"%(state, actions[i]) # 遍历所有动作，找到在当前状态下具有最大Q值的动作索引 amax。
        q = Q[key]
        if qmax < q:
            qmax = q
            amax = i
    
    # 计算每个动作的选择概率
    pro = [0.0 for i in range(len(actions))]
    pro[amax] += 1-epsilon # 将最佳动作的选择概率设置为 1-epsilon。
    for i in range(len(actions)): # 将其他动作的选择概率均分 epsilon/len(actions)。
        pro[i] += epsilon/len(actions)

    # 根据概率选择动作
    r = random.random()
    s = 0.0
    for i in range(len(actions)): # 遍历所有动作，根据选择概率选择动作。
        s += pro[i]
        if s>= r: return actions[i] # 当随机数 r 落在某个动作的选择概率区间内时，选择该动作。
    return actions[len(actions)-1]

Q函数（Q-Function）是价值函数的一种。它表示在给定状态和动作下，智能体在未来所能获得的期望回报。具体来说，Q函数 \( Q(s, a) \) 表示在状态 \( s \) 下采取动作 \( a \) 后，智能体在未来所能获得的累积回报的期望值。

In [5]:
from gym.envs.registration import register
import gym_gold.envs
import gym

env = gym.make('Gold-v0')
alpha=0.1
gamma=0.5
epsilon=0.1
random.seed(0)

Q = dict()
# 通过遍历状态空间和动作空间，初始化Q函数，全部设置为0
for s in states:
    for a in actions:
        key = "%d_%s"%(s,a)
        Q[key]=0
Q


{'1_n': 0,
 '1_e': 0,
 '1_s': 0,
 '1_w': 0,
 '2_n': 0,
 '2_e': 0,
 '2_s': 0,
 '2_w': 0,
 '3_n': 0,
 '3_e': 0,
 '3_s': 0,
 '3_w': 0,
 '4_n': 0,
 '4_e': 0,
 '4_s': 0,
 '4_w': 0,
 '5_n': 0,
 '5_e': 0,
 '5_s': 0,
 '5_w': 0,
 '6_n': 0,
 '6_e': 0,
 '6_s': 0,
 '6_w': 0,
 '7_n': 0,
 '7_e': 0,
 '7_s': 0,
 '7_w': 0,
 '8_n': 0,
 '8_e': 0,
 '8_s': 0,
 '8_w': 0}

In [6]:
gold=0
bad=0
good=0

for episode in range(1000):
    # 每次学习开始的时候，随机设置当前状态s0，并且根据∈-贪婪算法获得对应的动作a0
    s0 = env.reset()
    a0 = epsilon_greedy(Q,s0,epsilon)

    #狗屎运 初始化就拿到金币了
    if s0 == 7 :
        good+=1
        continue

    if s0 == 6 or s0 == 8 :
        bad+=1
        continue

    # 设置一个学习周期内学习步长为20，因为游戏确实比较简单，一次学习过程走20步足够了
    for t in range(20):
        observation, reward, done, info = env.step(a0)
        s1=observation
        
        # 贪婪算法
        # a1 = greedy(Q, s1)

        # epsilon贪婪算法
        a1 = epsilon_greedy(Q,s1,epsilon)

        # 根据Sarsa算法更新Q函数，并重新设置s0和a0的值
        key0 = "%d_%s" % (s0, a0)
        key1 = "%d_%s" % (s1, a1)
        # 更新Q函数
        Q[key0] = Q[key0] + alpha * (reward + gamma * Q[key1] - Q[key0])
        a0=a1
        s0=s1
        if done and s1==7 :
            print("Get Gold {}th Episode finished after {} timesteps ".format(episode,t+1))
            gold+=1
            break
        # 如果最新状态表明游戏已经结束，完成本次循环
        if done :
            print("Episode finished after {} timesteps ".format(t + 1))
            break

Episode finished after 9 timesteps 
Episode finished after 19 timesteps 
Get Gold 3th Episode finished after 12 timesteps 
Get Gold 5th Episode finished after 7 timesteps 
Get Gold 6th Episode finished after 5 timesteps 
Get Gold 7th Episode finished after 3 timesteps 
Get Gold 8th Episode finished after 3 timesteps 
Get Gold 9th Episode finished after 3 timesteps 
Get Gold 10th Episode finished after 3 timesteps 
Get Gold 12th Episode finished after 3 timesteps 
Get Gold 13th Episode finished after 3 timesteps 
Get Gold 14th Episode finished after 3 timesteps 
Get Gold 15th Episode finished after 3 timesteps 
Get Gold 16th Episode finished after 3 timesteps 
Get Gold 17th Episode finished after 3 timesteps 
Get Gold 18th Episode finished after 3 timesteps 
Get Gold 19th Episode finished after 3 timesteps 
Get Gold 20th Episode finished after 4 timesteps 
Get Gold 21th Episode finished after 3 timesteps 
Get Gold 22th Episode finished after 3 timesteps 
Get Gold 23th Episode finished a

  "Future gym versions will require that `Env.reset` can be passed a `seed` instead of using `Env.seed` for resetting the environment random number generator."
  "Future gym versions will require that `Env.reset` can be passed `options` to allow the environment initialisation to be passed additional information."
  f"The result returned by `env.reset()` was not a tuple of the form `(obs, info)`, where `obs` is a observation and `info` is a dictionary containing additional information. Actual type: `{type(result)}`"
  "Core environment is written in old step API which returns one bool instead of two. "


In [7]:

"episode:{} get gold:{} bad luck:{} good luck:{} lose:{}".format(episode,gold,bad,good,episode-gold-good-bad)

'episode:999 get gold:975 bad luck:0 good luck:0 lose:24'

# 案例5-2：使用Q Learning算法处理金币问题

In [8]:
import random

states = [1, 2, 3, 4, 5, 6, 7, 8]
actions = ['n', 'e', 's', 'w']

def greedy(Q,state):
    amax = 0
    key = "%d_%s" % (state, actions[0])

    qmax = Q[key]
    for i in range(len(actions)):
        key = "%d_%s" % (state, actions[i])
        q = Q[key]
        if qmax < q:
            qmax = q
            amax = i
    return actions[amax]

def epsilon_greedy(Q, state, epsilon):
    amax = 0
    key = "%d_%s"%(state, actions[0])
    qmax = Q[key]
    for i in range(len(actions)):
        key = "%d_%s"%(state, actions[i])
        q = Q[key]
        if qmax < q:
            qmax = q
            amax = i

    pro = [0.0 for i in range(len(actions))]
    pro[amax] += 1-epsilon
    for i in range(len(actions)):
        pro[i] += epsilon/len(actions)


    r = random.random()
    s = 0.0
    for i in range(len(actions)):
        s += pro[i]
        if s>= r: return actions[i]
    return actions[len(actions)-1]

In [None]:

import gym

env = gym.make('Gold-v0')
alpha=0.1
gamma=0.5
epsilon=0.1
random.seed(0)

Q = dict()

for s in states:
    for a in actions:
        key = "%d_%s"%(s,a)
        Q[key]=0

gold=0
bad=0
good=0

for episode in range(1000):
    s0 = env.reset()
    a0 = epsilon_greedy(Q,s0,epsilon)

    #狗屎运 初始化就拿到金币了
    if s0 == 7 :
        good+=1
        continue

    if s0 == 6 or s0 == 8 :
        bad+=1
        continue

    for t in range(20):
        # 使用a0作用于环境，转移到状态s1，得到奖励值reward
        observation, reward, done, info = env.step(a0)
        s1=observation
        # 【*****变化点********】根据贪婪算法获得s1对应的动作a1，注意这里使用的是贪婪算法而不是∈-贪婪算法
        a1 = greedy(Q, s1)
        # 【******************】
        key0 = "%d_%s" % (s0, a0)
        key1 = "%d_%s" % (s1, a1)
        # 更新Q函数
        Q[key0] = Q[key0] + alpha * (reward + gamma * Q[key1] - Q[key0])

        # 【*****变化点********】根据∈-贪婪算法获得s1对应的动作a1，并重新设置s0和a0。
        a1 = epsilon_greedy(Q,s1,epsilon)
        # 【******************】可以看出Q Learning算法最终选择执行的操作还是根据∈-贪婪算法，但是更新Q值使用的是贪婪算法对应的值
        a0=a1
        s0=s1
        if done and s1==7 :
            print("Get Gold {}th Episode finished after {} timesteps ".format(episode,t+1))
            gold+=1
            break
        if done :
            print("Episode finished after {} timesteps ".format(t + 1))
            break

Episode finished after 9 timesteps 
Episode finished after 19 timesteps 
Episode finished after 13 timesteps 
Episode finished after 3 timesteps 
Episode finished after 2 timesteps 
Episode finished after 2 timesteps 
Episode finished after 10 timesteps 
Episode finished after 1 timesteps 
Episode finished after 9 timesteps 
Episode finished after 7 timesteps 
Episode finished after 19 timesteps 
Get Gold 40th Episode finished after 15 timesteps 
Episode finished after 18 timesteps 
Episode finished after 12 timesteps 
Episode finished after 19 timesteps 
Episode finished after 20 timesteps 
Episode finished after 3 timesteps 
Episode finished after 12 timesteps 
Get Gold 55th Episode finished after 12 timesteps 
Get Gold 56th Episode finished after 9 timesteps 
Get Gold 57th Episode finished after 3 timesteps 
Get Gold 58th Episode finished after 3 timesteps 
Get Gold 59th Episode finished after 5 timesteps 
Get Gold 60th Episode finished after 4 timesteps 
Get Gold 61th Episode finis

In [12]:
"episode:{} get gold:{} bad luck:{} good luck:{} lose:{}".format(episode,gold,bad,good,episode-gold-good-bad)

'episode:999 get gold:920 bad luck:0 good luck:0 lose:79'

In [None]:
# pip install -e /d:/Documents/天池竞赛/WebSecMLLearnNote/Code3

# 案例5-3：使用DQN算法处理CartPole问题

In [24]:
import random
import gym
import numpy as np
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.layers import Dense, Input
import os
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
tf.get_logger().setLevel('ERROR')
tf.autograph.set_verbosity(0)

这段代码实现了一个基于深度 Q 网络（DQN）的智能体类，用于强化学习任务。智能体通过与环境交互，存储经验数据，并使用这些数据进行模型训练和更新，以学习最优的决策策略。同时，还提供了模型的保存和加载功能，以及可视化模型结构的操作。

In [9]:
# 1.定义DQNAgent类
class DQNAgent:
    def __init__(self, state_size, action_size):
        """
        初始化 DQNAgent 类的实例

        :param state_size: 环境状态的维度大小
        :param action_size: 动作空间的大小，即可能的动作数量
        """
        self.state_size = state_size
        self.action_size = action_size
        # 创建一个双端队列作为记忆库，最大长度为 2000，用于存储智能体的经验
        self.memory = deque(maxlen=2000)
        # 折扣因子，用于计算未来奖励的折扣值，0.95 表示未来奖励的重要性相对当前奖励有所降低
        self.gamma = 0.95
        # 探索率，初始值为 0.4，表示智能体有 40%的概率进行随机探索
        self.epsilon = 0.4
        self.epsilon_min = 0.01  # 探索率的最小值，当探索率降低到这个值时，不再继续降低
        self.epsilon_decay = 0.995  # 探索率的衰减率，每次更新后，探索率乘以这个值
        self.learning_rate = 0.001  # 模型学习率，用于调整模型权重的更新步长

        # 构建深度 Q 网络模型
        self.model = self._build_model()
    def _plot_model(self):
        """
        使用 plot_model 函数可视化深度 Q 学习模型结构
        """
        # 可视化 MLP 结构，并将可视化结果保存为 'dqn-cartpole-v0-mlp.png' 文件，不显示形状信息
        plot_model(self.model, to_file='dqn-cartpole-v0-mlp.png', show_shapes=True)

    def _build_model(self):
        """
        构建深度 Q 学习模型

        :return: 构建好的 Keras 模型
        """
        # 创建一个顺序模型
        model = Sequential()
        # 添加输入层，输入形状为 (state_size,)，即根据环境状态的维度确定输入形状
        model.add(Input(shape=(self.state_size,)))
        # 添加第一个隐藏层，有 24 个神经元，使用 relu 激活函数
        model.add(Dense(24, activation='relu'))
        # 添加第二个隐藏层，同样有 24 个神经元，使用 relu 激活函数
        model.add(Dense(24, activation='relu'))
        # 添加输出层，神经元数量为 action_size，使用线性激活函数，输出每个动作的 Q 值估计
        model.add(Dense(self.action_size, activation='linear'))
        # 编译模型，使用均方误差（MSE）作为损失函数，Adam 优化器，并传入学习率
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """
        将智能体的经验（状态、动作、奖励、下一个状态、是否结束）存储到记忆库中

        :param state: 当前状态
        :param action: 采取的动作
        :param reward: 获得的奖励
        :param next_state: 下一个状态
        :param done: 是否完成当前 episode
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """
        根据当前状态决定智能体的动作

        :param state: 当前状态
        :return: 智能体要采取的动作
        """
        # 实现∈-贪婪算法，以 epsilon 的概率进行随机探索，选择一个随机动作
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # 否则，使用模型预测当前状态下每个动作的 Q 值，并选择 Q 值最大的动作
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """
        从记忆库中随机采样一批经验数据，并进行模型训练更新

        :param batch_size: 每次采样的经验数据数量
        """
        # 从记忆库中随机抽取 batch_size 个经验数据
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            # 计算目标 Q 值，如果 episode 未结束，则目标 Q 值为当前奖励加上未来最大 Q 值的折扣值
            target = reward
            if not done:
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))
            # 预测当前状态下的 Q 值
            target_f = self.model.predict(state)
            # 更新目标 Q 值中采取的动作对应的 Q 值为计算得到的目标 Q 值
            target_f[0][action] = target
            # 使用更新后的目标 Q 值对模型进行一次训练，不显示训练过程的详细信息
            self.model.fit(state, target_f, epochs=1, verbose=0)
        # 如果探索率大于最小值，则按照衰减率进行衰减
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """
        加载预训练的模型权重

        :param name: 权重文件的名称
        """
        self.model.load_weights(name)

    def save(self, name):
        """
        保存当前模型的权重

        :param name: 保存权重的文件名称
        """
        self.model.save_weights(name)

In [25]:
env = gym.make('CartPole-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

state_size
#print action_size

  f"The environment {id} is out of date. You should consider "


4

In [26]:

agent = DQNAgent(state_size, action_size)

done = False
batch_size = 32
avg=0
agent._plot_model()

In [28]:
EPISODES = 20
for e in range(EPISODES):
    state = env.reset()
    # print(state)
    state = np.reshape(state[0], [1, state_size])
    for time in range(100):
        # env.render()
        action = agent.act(state)
        next_state, reward, done,_,_ = env.step(action)
        # 如果游戏结束，需要把奖励reward设置为-10
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        # 将当前的状态state，动作action，奖励reward保存到记忆体中
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print("episode: {}/{}, score: {}, e: {:.2}" .format(e, EPISODES, time, agent.epsilon))
            avg+=time
            break
    if len(agent.memory) > batch_size:
        agent.replay(batch_size)

# print "Avg score:{}".format(avg/1000)

episode: 0/20, score: 8, e: 0.4
episode: 1/20, score: 10, e: 0.4
episode: 2/20, score: 10, e: 0.4
episode: 3/20, score: 11, e: 0.39
episode: 4/20, score: 8, e: 0.39
episode: 5/20, score: 9, e: 0.39
episode: 6/20, score: 10, e: 0.39
episode: 7/20, score: 8, e: 0.39
episode: 8/20, score: 9, e: 0.38
episode: 9/20, score: 7, e: 0.38
episode: 10/20, score: 9, e: 0.38
episode: 11/20, score: 11, e: 0.38
episode: 12/20, score: 10, e: 0.38
episode: 13/20, score: 9, e: 0.37
episode: 14/20, score: 8, e: 0.37
episode: 15/20, score: 11, e: 0.37
episode: 16/20, score: 13, e: 0.37
episode: 17/20, score: 11, e: 0.37
episode: 18/20, score: 8, e: 0.37
episode: 19/20, score: 9, e: 0.36


In [None]:
"Avg score:{}".format(avg/20)