In [1]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from collections import deque

In [2]:
# Environment plus the Q-network that scores each action for a given state.
env = gym.make('CartPole-v0')
input_shape = [4]  # presumably == env.observation_space.shape — TODO confirm
n_outputs = 2      # presumably == env.action_space.n — TODO confirm

# Two hidden ELU layers followed by a linear layer that emits one Q-value
# per action.
model = keras.models.Sequential([
    keras.layers.Dense(32, activation='elu', input_shape=input_shape),
    keras.layers.Dense(32, activation='elu'),
    # Sized from n_outputs so the network width and the action count used
    # elsewhere (e.g. the one-hot mask in training) cannot drift apart.
    keras.layers.Dense(n_outputs)
])

In [3]:

def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action for ``state`` with an epsilon-greedy rule.

    With probability ``epsilon`` a random action (0 or 1) is returned;
    otherwise the action whose Q-value, as predicted by the module-level
    ``model``, is largest.

    Args:
        state: A single observation. A leading batch axis is added before it
            is handed to ``model.predict``.
        epsilon: Exploration probability in [0, 1]. The default of 0 makes
            the policy purely greedy.

    Returns:
        The index of the selected action.
    """
    # rand() is uniform on [0, 1), so this branch is taken with probability
    # epsilon. randn() (standard normal) would explore roughly half of the
    # time even when epsilon is 0.
    if np.random.rand() < epsilon:
        return np.random.randint(2)
    Q_value = model.predict(state[np.newaxis])
    return np.argmax(Q_value[0])
# replay_buffer holds at most maxlen 5-tuples of
# (state, action, reward, next_state, done); because the deque is bounded,
# appending to a full buffer discards the oldest entry.
replay_buffer = deque(maxlen=2000)


In [4]:

def sample_experiences(batch_size):
    """Draw ``batch_size`` transitions from ``replay_buffer`` uniformly, with replacement.

    Returns:
        Five NumPy arrays ``(states, actions, rewards, nexts, dones)``, each
        stacking the corresponding field of the sampled 5-tuples.
    """
    # Random positions into the buffer; the same position may repeat.
    picks = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[pos] for pos in picks]

    # Transpose the list of 5-tuples into one array per field.
    columns = []
    for field in range(5):
        columns.append(np.array([transition[field] for transition in batch]))

    states, actions, rewards, nexts, dones = columns
    return states, actions, rewards, nexts, dones


In [5]:
def play_one_step(env, state, epsilon):
    """Act once in ``env`` from ``state`` and record the transition.

    The action comes from ``epsilon_greedy_policy``; the resulting
    ``(state, action, reward, nextstate, done)`` tuple is appended to
    ``replay_buffer``.

    Returns:
        ``(nextstate, reward, done, info)`` as produced by ``env.step``.
    """
    chosen = epsilon_greedy_policy(state, epsilon)
    outcome = env.step(chosen)
    nextstate, reward, done, info = outcome

    transition = (state, chosen, reward, nextstate, done)
    replay_buffer.append(transition)

    return nextstate, reward, done, info


In [6]:
# Training hyper-parameters.
batch_size = 32          # transitions sampled per gradient step
discount_factor = 0.95   # gamma in the Q-learning target
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and is rejected by newer Keras releases.
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = keras.losses.mean_squared_error


In [11]:

def training_step(batch_size):
    """Run one gradient update of ``model`` on a mini-batch from the replay buffer.

    The regression target is the Q-learning target
    ``r + gamma * (1 - done) * max_a' Q(s', a')`` and the loss is the mean
    squared error between that target and ``Q(s, a)`` for the action that
    was actually taken.

    Args:
        batch_size: Number of transitions to sample for this update.
    """
    states, actions, rewards, nexts, dones = sample_experiences(batch_size)

    # Q(s', a') is computed outside the tape: it is treated as a fixed
    # target, so no gradient should flow through it.
    next_Q_values = np.max(model.predict(nexts), axis=1)
    # (1 - dones) removes the bootstrap term for terminal transitions, where
    # there is no future return to estimate.
    Q_Target = rewards + (1 - dones) * discount_factor * next_Q_values
    # Make the target a column so it lines up with the (batch, 1) prediction
    # below; a flat (batch,) target would broadcast against it into a
    # (batch, batch) error matrix.
    Q_Target = Q_Target.reshape(-1, 1)

    # One-hot mask times the Q-values, summed over the action axis, keeps
    # only the Q-value of the action taken in each transition.
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        # Call the model directly rather than model.predict so that the tape
        # records the forward pass and can differentiate through it.
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(Q_Target, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))


In [8]:
import os
# Every run's summaries are written beneath ./my_logs.
root_logdir = os.path.join(os.curdir, 'my_logs')


def get_run_logdir():
    """Return a log directory path under ``root_logdir`` named after the current local time."""
    from time import strftime
    return os.path.join(root_logdir, strftime("run_%Y_%m_%d-%H_%M_%S"))


rurn_logdir = get_run_logdir()

In [13]:
# Smoke test: write one of each summary type (scalar, histogram, image,
# text, audio) for 1000 steps so the logging setup can be checked.
test_logdir = get_run_logdir()
writer = tf.summary.create_file_writer(test_logdir)
with writer.as_default():
    for step in range(1, 1000+1):
        tf.summary.scalar("my_scalar", np.sin(step / 10), step=step)
        data = (np.random.randn(100) + 2) * step / 100
        tf.summary.histogram("my_hist", data, buckets=50, step=step)
        # Uniform samples in [0, 1): float images are clipped to [0, 1], so
        # normally distributed pixels would lose every negative value.
        images = np.random.rand(2, 32, 32, 3)
        tf.summary.image("my_image", images * step / 1000, step=step)
        texts = ['the step is ' + str(step) , "its ssquare is " + str(step**2)]
        tf.summary.text('my_text', texts, step=step)
        # A tone whose frequency is `step` Hz, sampled at 48 kHz
        # (12000 samples).
        sine_wave = tf.math.sin(tf.range(12000) / 48000 * 2 * np.pi * step)
        # Reshaped to a single clip with one channel.
        audio = tf.reshape(tf.cast(sine_wave, tf.float32), [1, -1, 1])
        tf.summary.audio('my_audio', audio, sample_rate=48000, step=step)