In [1]:
import gymnasium as gym
import tensorflow as tf
import numpy as np
import random
import collections

### Huber loss

Huber 损失是一种用于稳健回归的损失函数：当误差的绝对值不超过 $\delta$ 时按平方（二次）计算，超过 $\delta$ 时按线性计算，因此它对离群值不像均方误差那样敏感。

\begin{equation}
L_\delta(y, f(x)) =
\begin{cases}
\frac{1}{2} (y-f(x))^{2} & \text{if } |y-f(x)| \le \delta \\
\delta \cdot \left(|y-f(x)| - \frac{1}{2}\delta\right) & \text{otherwise}
\end{cases}
\end{equation}

References: 
> https://en.wikipedia.org/wiki/Huber_loss 

> https://www.tensorflow.org/api_docs/python/tf/losses/huber_loss



In [2]:
class DQNAgent:
    """Deep Q-Network (DQN) agent with a target network and epsilon-greedy exploration.

    Two identically shaped Keras networks are kept: ``model`` is trained on
    replayed transitions, while ``target_model`` supplies the bootstrap
    Q-values and only changes when ``update_target_model`` copies the weights
    of ``model`` into it.
    """

    def __init__(self, state_size, action_size):
        """Create the replay buffer, the hyper-parameters and both networks.

        Args:
            state_size: Number of features in one observation vector.
            action_size: Number of discrete actions.
        """
        self.state_size = state_size
        self.action_size = action_size
        # Replay buffer; once 2000 transitions are stored the oldest are dropped.
        self.memory = collections.deque(maxlen=2000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01  # floor for the exploration rate
        self.epsilon_decay = 0.99  # multiplicative decay applied once per replay() call
        self.learning_rate = 0.001
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _huber_loss(self, y_true, y_pred, clip_delta=1.0):
        """Mean Huber loss between targets and predictions.

        Per element, with ``err = y_true - y_pred``::

            0.5 * err**2                               if |err| <= clip_delta
            clip_delta * (|err| - 0.5 * clip_delta)    otherwise

        Args:
            y_true: Target Q-values.
            y_pred: Predicted Q-values.
            clip_delta: Threshold at which the loss switches from quadratic
                to linear.

        Returns:
            A scalar tensor: the mean of the element-wise loss.
        """
        error = y_true - y_pred
        abs_error = tf.abs(error)

        squared_loss = 0.5 * tf.square(error)
        # Linear branch; algebraically equal to
        # 0.5 * delta**2 + delta * (|err| - delta).
        linear_loss = clip_delta * (abs_error - 0.5 * clip_delta)

        return tf.reduce_mean(
            tf.where(abs_error <= clip_delta, squared_loss, linear_loss))

    def _build_model(self):
        """Build and compile the Q-network.

        Architecture: ``state_size`` inputs -> Dense(24, relu) ->
        Dense(24, relu) -> Dense(``action_size``, linear), i.e. one Q-value
        per action. Compiled with the Huber loss and the Adam optimizer.

        Returns:
            The compiled ``tf.keras.Sequential`` model.
        """
        model = tf.keras.Sequential([
            tf.keras.Input(shape=(self.state_size,)),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear'),
        ])
        model.compile(loss=self._huber_loss,
                      optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def update_target_model(self):
        """Copy the weights of ``model`` into ``target_model``."""
        self.target_model.set_weights(self.model.get_weights())

    def memorize(self, state, action, reward, next_state, done):
        """Append one transition ``(state, action, reward, next_state, done)``
        to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value.

        Args:
            state: Observation batch passed to ``model.predict``. Only row 0
                of the prediction is used, so a single-observation batch is
                expected.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0: newer Keras versions otherwise print a progress bar per call.
        act_values = self.model.predict(state, verbose=0)
        # Cast so both branches return the same type (argmax yields np.int64).
        return int(np.argmax(act_values[0]))

    def replay(self, batch_size):
        """Train ``model`` on a random minibatch drawn from the replay buffer.

        Each sampled transition is fitted individually. The target equals the
        current prediction, except for the taken action, whose entry becomes
        ``reward`` for terminal transitions and
        ``reward + gamma * max_a Q_target(next_state, a)`` otherwise.
        Afterwards ``epsilon`` is decayed once, never below ``epsilon_min``.

        Does nothing when fewer than ``batch_size`` transitions are stored.

        Args:
            batch_size: Number of transitions to sample.
        """
        if len(self.memory) < batch_size:
            # Not enough experience yet; random.sample would raise ValueError.
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state, verbose=0)
            if done:
                target[0][action] = reward
            else:
                # Vanilla DQN target: the target network both selects and
                # evaluates the next action. (Double DQN would select the
                # action with `self.model` and evaluate it with the target
                # network.)
                next_q = self.target_model.predict(next_state, verbose=0)[0]
                target[0][action] = reward + self.gamma * np.amax(next_q)
            self.model.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            # Clamp so the final decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def load(self, name):
        """Load weights from ``name`` into ``model`` and sync ``target_model``.

        Args:
            name: Path handed to ``model.load_weights``.
        """
        self.model.load_weights(name)
        # Without this the target network would keep its previous weights
        # until the next explicit update_target_model() call.
        self.update_target_model()

    def save(self, name):
        """Save the weights of ``model`` to ``name``.

        Args:
            name: Path handed to ``model.save_weights``.
                NOTE(review): recent Keras releases appear to require a
                ``.weights.h5`` suffix here; confirm against the installed
                version.
        """
        self.model.save_weights(name)


In [21]:
EPISODES = 5
MAX_STEPS = 50  # upper bound on environment steps per episode

# Create the environment.
env = gym.make('CartPole-v1')
# Dimension of one observation vector.
state_size = env.observation_space.shape[0]
# Number of discrete actions.
action_size = env.action_space.n
# Create the agent.
agent = DQNAgent(state_size, action_size)
# Optionally resume from previously saved weights.
# agent.load("./save/cartpole-ddqn.h5")

# gym.make() returns the environment wrapped; the position/angle limits are
# attributes of the underlying CartPole environment, so read them through
# `unwrapped` instead of relying on wrapper attribute forwarding.
x_threshold = env.unwrapped.x_threshold
theta_threshold = env.unwrapped.theta_threshold_radians

done = False

batch_size = 32

for episode in range(EPISODES):
    # Start a fresh episode.
    state, _ = env.reset()
    state = np.reshape(state, [1, state_size])

    print("\n state:", state, np.shape(state))

    # Run at most MAX_STEPS actions in this episode.
    for step in range(MAX_STEPS):
        # Pick an action for the current state.
        action = agent.act(state)
        # Apply it; Gymnasium reports termination and truncation separately.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Shaped reward (replaces the environment's reward): larger when the
        # cart is near the centre and the pole is near upright.
        x, _, theta, _ = next_state
        r1 = (x_threshold - abs(x)) / x_threshold - 0.8
        r2 = (theta_threshold - abs(theta)) / theta_threshold - 0.5
        reward = r1 + r2

        next_state = np.reshape(next_state, [1, state_size])
        # Store `terminated` rather than `done`: a time-limit truncation is
        # not a terminal state, so its target should still bootstrap.
        agent.memorize(state, action, reward, next_state, terminated)
        # Advance to the next state.
        state = next_state

        print('.', end='')

        if done:
            agent.update_target_model()
            print("episode: {}/{}, score: {}, e: {:.2}"
                    .format(episode, EPISODES, step, agent.epsilon))
            break

        if len(agent.memory) > batch_size:
            agent.replay(batch_size)
    else:
        # The step cap was reached without the episode ending: still sync the
        # target network and report the episode.
        agent.update_target_model()
        print("episode: {}/{}, score: {}, e: {:.2}"
                .format(episode, EPISODES, step, agent.epsilon))
    # Optional periodic checkpoint:
    # if episode % 10 == 0:
    #     agent.save("./save/cartpole-ddqn.h5")


 state: [[-0.04790396  0.00167995  0.03117294  0.04469715]] (1, 4)
..........episode: 0/5, score: 9, e: 1.0

 state: [[-0.00307816  0.02904799  0.04880029  0.01994588]] (1, 4)
.....................................episode: 1/5, score: 36, e: 0.87

 state: [[-0.03839378 -0.03257278  0.01113296 -0.00751979]] (1, 4)
........................episode: 2/5, score: 23, e: 0.69

 state: [[0.00925898 0.01000441 0.03320426 0.01908784]] (1, 4)
...........episode: 3/5, score: 10, e: 0.62

 state: [[-0.0227491  -0.00637077  0.02658374 -0.04080787]] (1, 4)
.......................episode: 4/5, score: 22, e: 0.5
