<a href="https://colab.research.google.com/github/jun-wei-lin/NCHU/blob/main/Deep%20Reinforcement%20Learning/HW4_DQN%20Variants/HW4_3%20Enhance%20DQN%20for%20random%20mode%20WITH%20Training%20Tips/dqn_keras_random.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from collections import deque

# Mock "random mode" environment: the start state and every transition are random.
class RandomGridEnv:
    """Stand-in environment whose observations, rewards and terminations are random.

    Attributes:
        state_size: Length of each observation vector (4).
        action_size: Number of discrete actions (2).
    """

    def __init__(self):
        self.state_size, self.action_size = 4, 2

    def _random_observation(self):
        """Draw an observation of length ``state_size`` via ``np.random.rand``."""
        return np.random.rand(self.state_size)

    def reset(self):
        """Start a new episode and return its initial observation."""
        return self._random_observation()

    def step(self, action):
        """Advance one step; ``action`` is accepted but does not affect the outcome.

        Returns:
            A ``(next_state, reward, done, info)`` tuple where ``reward`` is
            1.0 when a uniform draw exceeds 0.3 (else 0.0), ``done`` is True
            when a second uniform draw exceeds 0.9, and ``info`` is an empty
            dict.
        """
        observation = self._random_observation()
        # Draw order matters for reproducibility under a fixed seed:
        # reward first, termination second.
        reward = 0.0 if random.random() <= 0.3 else 1.0
        done = not (random.random() <= 0.9)
        return observation, reward, done, {}

# Build the Q-network
def create_q_model(state_size, action_size):
    """Build an uncompiled fully connected Q-network.

    Args:
        state_size: Number of features in one state vector.
        action_size: Number of discrete actions; one linear output per action.

    Returns:
        A ``keras.Model`` with two hidden ``Dense(64, relu)`` layers followed
        by a linear ``Dense(action_size)`` output layer.
    """
    state_input = keras.Input(shape=(state_size,))

    # Two identical hidden layers stacked on top of the input.
    hidden = state_input
    for _ in range(2):
        hidden = layers.Dense(64, activation='relu')(hidden)

    q_out = layers.Dense(action_size, activation='linear')(hidden)
    return keras.Model(inputs=state_input, outputs=q_out)

# Replay buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` discards the oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def add(self, transition):
        """Append one transition to the end of the buffer."""
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Return a list of ``batch_size`` distinct stored transitions.

        Sampling is delegated to ``random.sample``, which raises
        ``ValueError`` when ``batch_size`` exceeds the number of stored items.
        """
        stored = self.buffer
        return random.sample(stored, k=batch_size)

# Training loop
def train_dqn(env, episodes=100, gamma=0.99, epsilon=0.1, batch_size=32, buffer_capacity=1000):
    """Train a DQN agent with experience replay and a periodically synced target network.

    Training aids used here: gradient-norm clipping on the optimizer and an
    exponentially decaying learning rate.

    Args:
        env: Environment exposing ``state_size``, ``action_size``, ``reset()``
            and ``step(action)`` returning ``(next_state, reward, done, info)``.
        episodes: Number of training episodes to run.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Probability of taking a uniformly random action (fixed, not annealed).
        batch_size: Minibatch size; updates start once the buffer holds this many
            transitions.
        buffer_capacity: Maximum number of transitions kept in the replay buffer.

    Returns:
        The trained online Q-network (``keras.Model``).
    """
    state_size = env.state_size
    action_size = env.action_size

    q_model = create_q_model(state_size, action_size)
    target_model = create_q_model(state_size, action_size)
    target_model.set_weights(q_model.get_weights())

    # The optimizer owns the schedule, so decay is driven by the number of
    # gradient updates (the optimizer's own step counter). Feeding the episode
    # index instead would never reach `decay_steps` within the default run,
    # leaving the learning rate constant.
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-3,
        decay_steps=100,
        decay_rate=0.96,
        staircase=True,
    )
    optimizer = keras.optimizers.Adam(learning_rate=lr_schedule, clipnorm=1.0)  # Gradient clipping
    loss_fn = keras.losses.MeanSquaredError()

    replay_buffer = ReplayBuffer(buffer_capacity)

    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            # Epsilon-greedy action selection.
            if random.random() < epsilon:
                action = random.randint(0, action_size - 1)
            else:
                # Call the model directly: `predict` carries per-call overhead
                # that dominates when the input is a single state.
                single_state = np.expand_dims(state, axis=0).astype(np.float32)
                q_values = np.asarray(q_model(single_state, training=False))
                action = int(np.argmax(q_values[0]))

            next_state, reward, done, _ = env.step(action)
            replay_buffer.add((state, action, reward, next_state, done))
            state = next_state
            total_reward += reward

            if len(replay_buffer) >= batch_size:
                batch = replay_buffer.sample(batch_size)
                states, actions, rewards, next_states, dones = zip(*batch)

                states = np.array(states, dtype=np.float32)
                next_states = np.array(next_states, dtype=np.float32)
                actions = np.array(actions, dtype=np.int32)
                rewards = np.array(rewards, dtype=np.float32)
                dones = np.array(dones, dtype=np.float32)

                # Bootstrapped targets come from the target network and are
                # plain NumPy values, so no gradient flows through them.
                next_qs = np.asarray(target_model(next_states, training=False))
                max_next_qs = np.max(next_qs, axis=1)
                # (1 - dones) zeroes the bootstrap term on terminal transitions.
                targets = rewards + (1 - dones) * gamma * max_next_qs

                with tf.GradientTape() as tape:
                    qs = q_model(states)
                    # Keep only the Q-value of the action actually taken.
                    selected_qs = tf.reduce_sum(qs * tf.one_hot(actions, action_size), axis=1)
                    loss = loss_fn(targets, selected_qs)

                grads = tape.gradient(loss, q_model.trainable_variables)
                optimizer.apply_gradients(zip(grads, q_model.trainable_variables))

        # Sync the target network with the online network every 10 episodes.
        if episode % 10 == 0:
            target_model.set_weights(q_model.get_weights())

        print(f"Episode {episode + 1}/{episodes} - Total Reward: {total_reward}")

    return q_model

# Evaluation loop
def test_policy(env, model, episodes=5):
    """Roll out the greedy policy of ``model`` and print each episode's total reward.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)`` returning
            ``(next_state, reward, done, info)``.
        model: Object whose ``predict(batch, verbose=0)`` returns per-action
            values for a batch of states.
        episodes: Number of evaluation episodes to run.
    """
    for ep in range(episodes):
        observation = env.reset()
        total_reward = 0
        finished = False
        while not finished:
            # The model expects a batch, so wrap the single state in one.
            batch = np.expand_dims(observation, axis=0)
            greedy_action = np.argmax(model.predict(batch, verbose=0)[0])
            observation, reward, finished, _ = env.step(greedy_action)
            total_reward += reward
        print(f"Test Episode {ep+1}: Total Reward = {total_reward}")

# Script entry point: train on the random environment, then run the greedy policy.
if __name__ == "__main__":
    env = RandomGridEnv()
    model = train_dqn(env)
    test_policy(env, model)


Episode 1/100 - Total Reward: 4.0
Episode 2/100 - Total Reward: 2.0
Episode 3/100 - Total Reward: 4.0
Episode 4/100 - Total Reward: 1.0
Episode 5/100 - Total Reward: 1.0
Episode 6/100 - Total Reward: 4.0
Episode 7/100 - Total Reward: 3.0
Episode 8/100 - Total Reward: 2.0
Episode 9/100 - Total Reward: 11.0
Episode 10/100 - Total Reward: 5.0
Episode 11/100 - Total Reward: 0.0
Episode 12/100 - Total Reward: 10.0
Episode 13/100 - Total Reward: 3.0
Episode 14/100 - Total Reward: 1.0
Episode 15/100 - Total Reward: 21.0
Episode 16/100 - Total Reward: 1.0
Episode 17/100 - Total Reward: 11.0
Episode 18/100 - Total Reward: 3.0
Episode 19/100 - Total Reward: 15.0
Episode 20/100 - Total Reward: 19.0
Episode 21/100 - Total Reward: 2.0
Episode 22/100 - Total Reward: 16.0
Episode 23/100 - Total Reward: 16.0
Episode 24/100 - Total Reward: 1.0
Episode 25/100 - Total Reward: 8.0
Episode 26/100 - Total Reward: 11.0
Episode 27/100 - Total Reward: 2.0
Episode 28/100 - Total Reward: 8.0
Episode 29/100 - Tot