In [1]:
import gymnasium as gym
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import random
import os
from tensorflow.keras.models import load_model

In [2]:
# 定义代理类
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = []
        self.gamma = 0.95    # 折扣率
        self.epsilon = 0.1   # 探索率
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.005
        self.model = self._build_model()

    def _build_model(self):
        # 构建并返回神经网络架构
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=0.001))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # 返回最大Q值的动作

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    

In [3]:
def one_hot_state(state, state_size):
    one_hot = np.zeros(state_size)
    one_hot[state] = 1
    return np.reshape(one_hot, [1, state_size])

# 实验执行函数
def run_experiment(agent, env, episodes, batch_size, max_steps):
    steps = 0
    for e in range(episodes):
        observation = env.reset()
        state = observation[0]
        state = one_hot_state(state, env.observation_space.n)
        done = False
        total_reward = 0
        
        while not done:
            action = agent.act(state)
            next_state, reward, done, *_ = env.step(action)
            next_state = one_hot_state(next_state, env.observation_space.n)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            steps += 1  # 增加步数计数器

            # 训练代理
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)

            # 如果达到最大步数，则强制结束回合
            if steps >= max_steps:
                break

        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward}")

In [4]:
# 评估训练过的代理
def evaluate_agent(agent, env, episodes):
    total_rewards = 0
    max_steps = 99  # set a maximum of 99 steps per episode.
    for _ in range(episodes):
        observation = env.reset()
        state = observation[0]
        state = one_hot_state(state, env.observation_space.n)
        done = False
        total_reward = 0
        for step in range(max_steps):
            if done:
                break
            else:
                action = np.argmax(agent.model.predict(state)[0])  # 选择最佳动作
                next_state, reward, done, _ = env.step(action)
                state = one_hot_state(next_state, env.observation_space.n)
                total_reward += reward
            total_rewards += total_reward
        avg_reward = total_rewards / episodes
    print(f"Average Reward after {episodes} episodes: {avg_reward}")

In [5]:

# 主函数
def main():
    env = gym.make('Taxi-v3')
    state_size = env.observation_space.n
    action_size = env.action_space.n
    episodes = 1000
    batch_size = 32
    max_steps = 99  # 设置最大步数以避免无限循环

    # 创建代理实例
    agent = DQNAgent(state_size, action_size)

    # 运行实验
    run_experiment(agent, env, episodes, batch_size, max_steps)

    # 保存训练后的代理模型
    agent.model.save('trained_taxi_agent.h5')
    print("Trained model saved as trained_taxi_agent.h5")

# 调用主函数
if __name__ == "__main__":
    main()




Episode 1/1000, Total Reward: -153
Episode 2/1000, Total Reward: -1
Episode 3/1000, Total Reward: -1
Episode 4/1000, Total Reward: -1
Episode 5/1000, Total Reward: -1
Episode 6/1000, Total Reward: -1
Episode 7/1000, Total Reward: -1
Episode 8/1000, Total Reward: -1
Episode 9/1000, Total Reward: -1
Episode 10/1000, Total Reward: -1
Episode 11/1000, Total Reward: -1
Episode 12/1000, Total Reward: -1
Episode 13/1000, Total Reward: -1
Episode 14/1000, Total Reward: -1
Episode 15/1000, Total Reward: -1
Episode 16/1000, Total Reward: -1
Episode 17/1000, Total Reward: -1
Episode 18/1000, Total Reward: -1
Episode 19/1000, Total Reward: -1
Episode 20/1000, Total Reward: -1
Episode 21/1000, Total Reward: -1
Episode 22/1000, Total Reward: -1
Episode 23/1000, Total Reward: -1
Episode 24/1000, Total Reward: -1
Episode 25/1000, Total Reward: -1
Episode 26/1000, Total Reward: -1
Episode 27/1000, Total Reward: -1
Episode 28/1000, Total Reward: -1
Episode 29/1000, Total Reward: -1
Episode 30/1000, Tota

KeyboardInterrupt: 

In [None]:
# 设置环境
env = gym.make('Taxi-v3', render_mode = 'human')
episodes = 100  # 评估的回合数

# 加载训练后的模型
trained_model = load_model('trained_taxi_agent.h5')
trained_model.epsilon = 0

# 评估代理
evaluate_agent(trained_model, env, episodes)


print(f"Episode {episode + 1}: Total Reward = {total_reward}")