In [1]:
import numpy as np
import time

from env import Env
from dqn import DQN

In [2]:
# 这里我们定义多个游戏地图，后面可以使用不同的游戏地图观察agent的行为
def game_map_1(environment):
    """Lay out game map 1 on ``environment``.

    The environment is reset first, then three items are added in a fixed
    order: two pickable ``'yellow_star'`` items (credit 1000 each) at
    (3, 3) and (0, 7), followed by a terminal ``'red_ball'`` labelled
    "Exit" at (5, 6).
    """
    environment.reset()

    # Both stars share the same settings; only the position differs.
    for star_position in ((3, 3), (0, 7)):
        environment.add_item('yellow_star', star_position, credit=1000, pickable=True)

    # The terminal item.
    environment.add_item('red_ball', (5, 6), terminal=True, label="Exit")

In [3]:
# Set up the environment shared by every later cell.
# NOTE(review): (8, 8) is presumably the grid size (rows, columns) and
# (160, 90) a per-square display size -- confirm against env.Env.
env = Env((8, 8), (160, 90), default_rewards=0)

In [4]:
# Select a game map. game_map_1 resets `env` before placing its items.
game_map_1(env)

In [5]:
# Smoke test: take one random action and print what the environment returns.
for _ in range(1):
    action = env.action_space.sample()
    print(action)
    # Named `next_location` rather than `next`: binding `next` at notebook
    # level would shadow the builtin for every cell executed afterwards.
    reward, next_location, end = env.step(action)
    print(reward, next_location, end)
    time.sleep(0.2)

# Reset again so the probe step above does not leak into later cells.
env.reset()

N
0 (0, 0) False


In [6]:
# 下面两个函数将位置信息转换为状态信息
def location_one_hot(location, map_dimension):
    """Encode a single grid location as a flat one-hot list.

    Args:
        location: ``(row, column)`` pair, zero-based.
        map_dimension: ``(total_rows, total_columns)`` of the map.

    Returns:
        A list of ``total_rows * total_columns`` ints that is all 0 except
        for a 1 at index ``row * total_columns + column``.

    Raises:
        AssertionError: if the location lies outside the map, including
            negative coordinates.
    """
    row, column = location
    total_rows, total_columns = map_dimension

    # The lower bound matters: a negative coordinate would otherwise yield a
    # negative list index, which Python wraps around and which would mark the
    # wrong square without any error. Raised explicitly (instead of `assert`)
    # so the check also runs under `python -O`; the exception type is unchanged.
    if not (0 <= row < total_rows and 0 <= column < total_columns):
        raise AssertionError(
            "location {} is outside a map of dimension {}".format(location, map_dimension))

    # Merge row and column into a single id used as the one-hot index.
    location_id = row * total_columns + column

    one_hot = [0] * (total_rows * total_columns)
    one_hot[location_id] = 1

    return one_hot

def location_multi_hot(locations, map_dimension):
    """Encode several grid locations as one flat multi-hot list.

    Args:
        locations: iterable of ``(row, column)`` pairs, zero-based. May be
            empty, in which case the result is all zeros.
        map_dimension: ``(total_rows, total_columns)`` of the map.

    Returns:
        A list of ``total_rows * total_columns`` ints with a 1 at index
        ``row * total_columns + column`` for every given location and 0
        everywhere else.

    Raises:
        AssertionError: if any location lies outside the map, including
            negative coordinates.
    """
    total_rows, total_columns = map_dimension
    multi_hot = [0] * (total_rows * total_columns)

    for loc in locations:
        row, column = loc

        # The lower bound matters: a negative coordinate would otherwise yield
        # a negative list index, which Python wraps around and which would mark
        # the wrong square without any error. Raised explicitly (instead of
        # `assert`) so the check also runs under `python -O`.
        if not (0 <= row < total_rows and 0 <= column < total_columns):
            raise AssertionError(
                "location {} is outside a map of dimension {}".format(loc, map_dimension))

        # Merge row and column into a single id used as the list index.
        multi_hot[row * total_columns + column] = 1

    return multi_hot

# 下面函数将环境的全部信息转换成状态信息
def state_from_environment(environment):
    """Build the flat state vector for the current environment.

    The returned list is ``[n_items] + agent_one_hot + [steps]`` where
    ``n_items`` is ``len(environment.agent.bag_of_objects)``,
    ``agent_one_hot`` is the agent position encoded with
    ``location_one_hot`` and ``steps`` is ``environment.steps``. Its length
    is therefore ``n_rows * n_columns + 2``.

    The items on the map are still inspected so that a misconfigured map
    fails immediately, but their positions are not part of the state, so no
    encodings are built for them.

    Raises:
        AssertionError: if the map holds an item that is neither pickable
            nor terminal, or if it holds no terminal (exit) item at all.
    """
    # Map size.
    dimension = (environment.map.n_rows, environment.map.n_columns)

    # Agent position.
    agent_state = location_one_hot(environment.agent.at, dimension)

    # Validate the map contents. A boolean flag is used instead of comparing
    # a stored location against None with `!=`.
    has_exit = False
    for item in environment.map.all_items:
        if item.pickable:
            continue
        if item.terminal:
            has_exit = True
        else:
            raise AssertionError("Unknown item in the environment")

    # The agent must have an exit to reach.
    if not has_exit:
        raise AssertionError("You must have a exit point for agent!")

    n_items = len(environment.agent.bag_of_objects)

    return [n_items] + agent_state + [environment.steps]

In [7]:
# Hyperparameters. Both values are passed positionally to DQN(...) further
# down; presumably learning rate and discount factor -- confirm in dqn.DQN.
lr, gamma = 1e-3, 0.7

# Initial episode count; the training cell assigns its own value before use.
training_episodes = 1

In [8]:
# The "+2" mirrors state_from_environment(), which returns the agent one-hot
# plus two scalars (bag item count and step counter).
# NOTE(review): assumes env.map.n_squares == n_rows * n_columns and that the
# first DQN argument is the state-vector length -- confirm in env / dqn.
dqn = DQN(env.map.n_squares+2, env.action_space.n_actions, lr, gamma, experience_limit=4000)

In [9]:
# Debug: print the mapping returned by the action space.
action_dict = env.action_space.dict_from_actions()
print(action_dict)
episode = 0

# Generate experience with random actions so there is data to train on later.
while True:
    env.reset()
    env.show = False

    # State right after the reset.
    state = state_from_environment(env)
    episode += 1

    end = False
    while not end:
        # Pick a random action and apply it.
        action = env.action_space.sample()
        reward, next_location, end = env.step(action)

        # State after the agent has taken the step.
        next_state = state_from_environment(env)

        # Store the transition; the DQN works with numeric action ids.
        action_id = env.action_space.action_id(action)
        dqn.fill_experience((state, action_id, reward, next_state, end))

        state = next_state

    # Stop once the experience store is full. This is only checked between
    # episodes, so the episode in progress is always played to its end.
    if dqn.experience.is_full:
        break

{'N': 2, 'E': 0, 'W': 1, 'S': 3}


In [10]:
# 打印Action-Value信息
def show_action_values(env):
    """Draw the DQN's action values for the current state at the agent's square.

    Reads the notebook-level ``dqn``. The current state is obtained from
    ``state_from_environment``, passed to ``dqn.action_values`` as a batch of
    one, and each resulting value is rounded to two decimals and shown as
    text next to the action it belongs to via ``env.draw_text``.
    """
    agent_location = env.agent.at

    # Add a leading axis so the single state is passed as a 1-row matrix.
    state_vector = np.array(state_from_environment(env))
    batch_of_one = state_vector[np.newaxis, ...]
    values = dqn.action_values(batch_of_one)[0]

    # Map each action (translated from its numeric id) to its rounded value.
    labels = {
        env.action_space.action_from_id(idx): str(np.round(value, 2))
        for idx, value in enumerate(values)
    }

    env.draw_text(agent_location, labels)

In [11]:
training_episodes = 50000
batch_size = 400             # loop-invariant, so set once instead of per episode
max_steps_per_episode = 300  # an episode is cut off after this many steps
total_losses = 0

for episode in range(1, training_episodes+1):
    env.reset()  # reset the environment
    env.show = False

    # State of the freshly reset environment.
    state = state_from_environment(env)

    end = False  # whether this episode has ended
    while not end:
        # Show the action values of the current state.
        show_action_values(env)

        # Ask the DQN for the next action. The DQN works with actions encoded
        # as consecutive integers starting at 0, while the environment uses
        # its own representation, so the id has to be converted.
        action_id = dqn.next_action(state)
        action = env.action_space.action_from_id(action_id)

        # Take one step; the environment returns the reward, the agent's new
        # location and whether the agent has reached the exit.
        reward, next_location, end = env.step(action)

        # State after the step.
        next_state = state_from_environment(env)

        dqn.fill_experience((state, action_id, reward, next_state, end))

        state = next_state

        # Give up on episodes that run too long; the last transition has
        # already been stored above with `end` still False.
        if env.steps >= max_steps_per_episode:
            break

    # Train the DQN once per episode.
    loss = dqn.train_batch_states(batch_size)

    total_losses += loss

    # NOTE(review): the batch_size == 0 branch reports the loss unscaled;
    # presumably 0 selects a mode of train_batch_states without a fixed
    # batch -- confirm in dqn.DQN.
    if batch_size == 0:
        print("trained episodes {}: current loss is {:.4f}, total avg loss is {:.4f}".format(
            episode,
            loss,
            total_losses/episode))
    else:
        print("trained episodes {}: current batch avg loss is {:.4f}, total avg loss is {:.4f}".format(
            episode,
            loss / batch_size,
            total_losses/(episode*batch_size)))

trained episodes 1: current batch avg loss is 37.3205, total avg loss is 37.3205
trained episodes 2: current batch avg loss is 12.6816, total avg loss is 25.0011
trained episodes 3: current batch avg loss is 61.2673, total avg loss is 37.0898
trained episodes 4: current batch avg loss is 12.5149, total avg loss is 30.9461
trained episodes 5: current batch avg loss is 18.6236, total avg loss is 28.4816
trained episodes 6: current batch avg loss is 6.0420, total avg loss is 24.7417
trained episodes 7: current batch avg loss is 42.4022, total avg loss is 27.2646
trained episodes 8: current batch avg loss is 6.2449, total avg loss is 24.6371
trained episodes 9: current batch avg loss is 55.7594, total avg loss is 28.0952
trained episodes 10: current batch avg loss is 36.8559, total avg loss is 28.9712
trained episodes 11: current batch avg loss is 36.9632, total avg loss is 29.6978
trained episodes 12: current batch avg loss is 0.0650, total avg loss is 27.2284
trained episodes 13: current

In [12]:
# ---- Test ----
# Reset the environment and turn the display on.
env.reset()
env.show = True

# Same cut-off as the training loop. Without it, a greedy policy that never
# reaches the exit (for example one that keeps choosing the same blocked
# move) would keep this cell running forever.
max_test_steps = 300

end = False  # whether this episode has ended
while not end:
    # Debug: show the action values of the current state.
    show_action_values(env)

    # Current state of the environment.
    state = state_from_environment(env)

    # Ask the DQN for its best action for this state.
    action_id = dqn.best_action(state)
    action = env.action_space.action_from_id(action_id)

    # Let the agent execute the action.
    reward, next_location, end = env.step(action)
    time.sleep(0.2)

    if not end and env.steps >= max_test_steps:
        print("stopped after {} steps without reaching the exit".format(env.steps))
        break
