<a href="https://colab.research.google.com/github/zyz314/100-Days-Of-ML-Code/blob/master/Project%207-1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

class QLearner:
    """
    Tabular Q-learning agent with optional replay of stored experience.

    The agent keeps a (num_states, num_actions) table of action values and
    picks actions epsilon-greedily: with probability ``rar`` it acts at
    random, otherwise it takes the highest-valued action for the state.
    ``rar`` is multiplied by ``radr`` after every call to ``query``.
    """

    def __init__(self, num_states, num_actions, alpha, gamma, rar, radr, dyna, verbose):
        """
        Initialize the learner.

        Args:
            num_states (int): Number of states.
            num_actions (int): Number of actions.
            alpha (float): Learning rate.
            gamma (float): Discount rate.
            rar (float): Random action rate.
            radr (float): Random action decay rate.
            dyna (int): Number of replay updates performed per ``query`` call.
            verbose (bool): Whether to print debugging information.
        """
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.rar = rar
        self.radr = radr
        self.dyna = dyna
        self.verbose = verbose

        # Q table of shape (num_states, num_actions), initialized to zero.
        self.q_table = np.zeros((num_states, num_actions))

        # (state, action, next_state, reward) tuples replayed when dyna > 0.
        self.experience = []

        # Give the learner a defined starting point so that query() does not
        # fail with AttributeError when called before querysetstate().
        self.state = 0
        self.action = 0

    def _update_q(self, state, action, next_state, reward):
        """Apply one Q-learning update for a single observed transition."""
        best_next = np.max(self.q_table[next_state])
        self.q_table[state, action] = (
            (1 - self.alpha) * self.q_table[state, action]
            + self.alpha * (reward + self.gamma * best_next)
        )

    def choose_action(self, state):
        """
        Pick an action for ``state``, balancing exploration and exploitation.

        Args:
            state (int): Current state.

        Returns:
            int: The chosen action.
        """
        if np.random.random() < self.rar:
            return int(np.random.randint(self.num_actions))
        # argmax yields a NumPy integer; convert so both branches return int.
        return int(np.argmax(self.q_table[state]))

    def querysetstate(self, state):
        """
        Set the current state to ``state`` and choose an action for it.

        The Q table is not modified.

        Args:
            state (int): Current state.

        Returns:
            int: The chosen action.
        """
        self.state = state
        self.action = self.choose_action(state)
        return self.action

    def query(self, new_state, reward):
        """
        Update the Q table with the observed transition and pick the next action.

        The transition is (stored state, stored action) -> ``new_state`` with
        ``reward``. When ``dyna`` > 0 the transition is also recorded and
        ``dyna`` randomly sampled past transitions are replayed.

        Args:
            new_state (int): The new state.
            reward (float): The reward received for the transition.

        Returns:
            int: The next chosen action.
        """
        self._update_q(self.state, self.action, new_state, reward)

        if self.dyna > 0:
            self.experience.append((self.state, self.action, new_state, reward))
            for _ in range(self.dyna):
                sample = self.experience[np.random.randint(len(self.experience))]
                self._update_q(*sample)

        # Choose the next action before decaying the random action rate.
        self.action = self.choose_action(new_state)
        self.rar *= self.radr
        self.state = new_state

        if self.verbose:
            print("s = {}, a = {}, r = {}".format(new_state, self.action, reward))

        return self.action

    def author(self):
        """
        Return the author's Georgia Tech username.

        Returns:
            str: The username.
        """
        return 'yzheng438'

    def study_group(self):
        """
        Return the Georgia Tech usernames of the study group, comma separated.

        Returns:
            str: The list of usernames.
        """
        return 'yzheng438'

# Create the Q-learner used for the navigation problem
learner = QLearner(
    num_states=100,
    num_actions=4,
    alpha=0.2,
    gamma=0.9,
    rar=0.98,
    radr=0.999,
    dyna=0,
    verbose=False,
)

# Read the navigation map file and locate the start cell
def read_map_file(filename):
    """
    Read a comma-separated grid map and find its start position.

    Each non-blank line becomes one row of integers. Blank lines (for
    example a trailing empty line at the end of the file) are skipped.

    Args:
        filename (str): Path to the map file.

    Returns:
        tuple: ``(map_data, start_position)`` where ``map_data`` is a list of
        rows of ints and ``start_position`` is the ``(row, col)`` of the first
        cell whose value is 2.

    Raises:
        ValueError: If a cell is not an integer, or no cell with value 2
            exists in the map.
    """
    map_data = []
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            map_data.append([int(cell) for cell in line.split(',')])

    for row_index, row in enumerate(map_data):
        if 2 in row:
            return map_data, (row_index, row.index(2))

    # Fail with a clear message instead of leaking StopIteration.
    raise ValueError(
        "no start cell (value 2) found in map file: {}".format(filename)
    )

# Load the map, then flatten the start cell (row, col) into row * 10 + col
map_data, start_position = read_map_file('map_file.csv')
state = 10 * start_position[0] + start_position[1]

# Tell the learner its initial state and get the first action
action = learner.querysetstate(state)

# Reward lookup for a single map cell
def compute_reward(state, map_data):
    """
    Return the reward for occupying the cell at ``state``.

    Args:
        state (tuple): Current state as (row, col).
        map_data (list): The map, indexed as ``map_data[row][col]``.

    Returns:
        int: -1 for a cell value of 0, -100 for 5, 1 for 3, and 0 for any
        other value.
    """
    row, col = state
    cell = map_data[row][col]
    # (cell value, reward) pairs, checked in order.
    for cell_value, cell_reward in ((0, -1), (5, -100), (3, 1)):
        if cell == cell_value:
            return cell_reward
    return 0

# Move one cell in the direction given by the action
def update_state(state, action):
    """
    Return the position reached by taking ``action`` from ``state``.

    No bounds checking is done; the result may lie outside the map.

    Args:
        state (tuple): Current state as (row, col).
        action (int): Action (0: north, 1: east, 2: south, 3: west).

    Returns:
        tuple: New state as (row, col), or None when ``action`` is not one
        of the four known actions.
    """
    row, col = state
    # Offsets are listed in action order: north, east, south, west.
    offsets = ((-1, 0), (0, 1), (1, 0), (0, -1))
    for code, (d_row, d_col) in enumerate(offsets):
        if action == code:
            return row + d_row, col + d_col
    return None

# Repeat learning until the Q table stops changing (or a step budget runs out)
MAX_STEPS = 200000        # hard cap so the loop always terminates
Q_TOLERANCE = 1e-4        # largest per-episode Q change counted as "no change"
STABLE_EPISODES = 10      # consecutive quiet episodes required to stop

steps = 0
stable_episodes = 0
q_snapshot = learner.q_table.copy()
converged = False
while not converged and steps < MAX_STEPS:
    steps += 1

    # Apply the current action to the current position.
    row, col = divmod(state, 10)
    next_state = update_state((row, col), action)

    # A move that would leave the map (or break the row * 10 + col encoding)
    # leaves the agent where it is, instead of wrapping around through
    # negative indices or raising IndexError.
    inside_map = (
        0 <= next_state[0] < len(map_data)
        and 0 <= next_state[1] < min(10, len(map_data[next_state[0]]))
    )
    if not inside_map:
        next_state = (row, col)
    next_state_flat = next_state[0] * 10 + next_state[1]

    # Reward for the cell the agent ends up in.
    reward = compute_reward(next_state, map_data)

    # Update the Q table with the transition and get the next action.
    action = learner.query(next_state_flat, reward)

    if map_data[next_state[0]][next_state[1]] == 3:
        # Goal reached: restart from the start cell. The learner must be told
        # about the jump, otherwise its next update would be credited to a
        # transition out of the goal state with an action chosen for the goal.
        state = start_position[0] * 10 + start_position[1]
        action = learner.querysetstate(state)

        # Convergence check, once per completed episode: the largest Q-value
        # change since the previous episode must stay below the tolerance
        # for several episodes in a row.
        max_delta = np.max(np.abs(learner.q_table - q_snapshot))
        q_snapshot = learner.q_table.copy()
        if max_delta < Q_TOLERANCE:
            stable_episodes += 1
        else:
            stable_episodes = 0
        converged = stable_episodes >= STABLE_EPISODES
    else:
        state = next_state_flat
