In [1]:
# 检查依赖包版本是否正确
!pip list | grep paddlepaddle
!pip list | grep parl
!pip list | grep rlschool

paddlepaddle-gpu              1.8.2.post107
parl                          1.3.1
rlschool                      0.3.1


In [2]:
import os
import numpy as np

import parl
from parl import layers
from paddle import fluid
from parl.utils import logger
from parl.utils import action_mapping # 将神经网络输出映射到对应的 实际动作取值范围内
from parl.utils import ReplayMemory # 经验回放
from parl.algorithms import DDPG
from parl.utils.scheduler import LinearDecayScheduler, PiecewiseScheduler

from rlschool import make_env  # 使用 RLSchool 创建飞行器环境

import matplotlib.pyplot as plt

In [3]:
class ActorModel(parl.Model):
    """Deterministic DDPG policy network.

    Every layer, including the output layer, uses a tanh activation, so the
    produced actions lie in [-1, 1]. Weights of every layer are initialised
    from N(0, 0.1).

    model_tag == 1 -> two hidden layers of width 100.
    any other tag  -> three hidden layers of width 100.
    """

    def __init__(self, act_dim, model_tag):
        self.model_tag = model_tag

        def tanh_fc(size):
            # Build a fresh initializer for each layer, as the inline calls did.
            return layers.fc(
                size=size,
                act='tanh',
                param_attr=fluid.initializer.Normal(loc=0.0, scale=0.1))

        width = 100
        # Layers are created in the same order for both variants (fc1, fc2, ...).
        self.fc1 = tanh_fc(width)
        self.fc2 = tanh_fc(width)
        if self.model_tag == 1:
            self.fc3 = tanh_fc(act_dim)
        else:
            self.fc3 = tanh_fc(width)
            self.fc4 = tanh_fc(act_dim)

    def policy(self, obs):
        """Map a batch of observations to a batch of actions in [-1, 1]."""
        if self.model_tag == 1:
            stack = (self.fc1, self.fc2, self.fc3)
        else:
            stack = (self.fc1, self.fc2, self.fc3, self.fc4)
        out = obs
        for layer in stack:
            out = layer(out)
        return out

In [4]:
class CriticModel(parl.Model):
    """DDPG critic estimating Q(s, a).

    model_tag == 1 -> the action is concatenated with the observation at the
                      input, followed by one ReLU layer of width 100.
    any other tag  -> the observation passes through one ReLU layer first, the
                      action is concatenated afterwards, then a second ReLU layer.
    Both variants end in a linear size-1 layer whose trailing axis is squeezed.
    """

    def __init__(self, model_tag):
        self.model_tag = model_tag

        def relu_fc(size):
            # Fresh N(0, 0.1) initializer per hidden layer.
            return layers.fc(
                size=size,
                act='relu',
                param_attr=fluid.initializer.Normal(loc=0.0, scale=0.1))

        width = 100
        self.fc1 = relu_fc(width)
        if self.model_tag == 1:
            self.fc2 = layers.fc(size=1, act=None)
        else:
            self.fc2 = relu_fc(width)
            self.fc3 = layers.fc(size=1, act=None)

    def value(self, obs, act):
        """Return Q(obs, act) with the size-1 output axis squeezed away."""
        if self.model_tag == 1:
            hidden = self.fc1(layers.concat([obs, act], axis=1))
            q = self.fc2(hidden)
        else:
            hidden = self.fc1(obs)
            hidden = self.fc2(layers.concat([hidden, act], axis=1))
            q = self.fc3(hidden)
        return layers.squeeze(q, axes=[1])

In [5]:
class QuadrotorModel(parl.Model):
    """Bundle of the actor and critic networks consumed by parl's DDPG."""

    def __init__(self, act_dim, model_tag):
        self.model_tag = model_tag
        # The actor is constructed before the critic.
        self.actor_model = ActorModel(act_dim, model_tag)
        self.critic_model = CriticModel(model_tag)

    def policy(self, obs):
        """Delegate to the actor network."""
        actor = self.actor_model
        return actor.policy(obs)

    def value(self, obs, act):
        """Delegate to the critic network."""
        critic = self.critic_model
        return critic.value(obs, act)

    def get_actor_params(self):
        """Parameters of the actor only (what the actor optimizer should update)."""
        actor = self.actor_model
        return actor.parameters()

In [6]:
class QuadrotorAgent(parl.Agent):
    """PARL agent driving a DDPG algorithm for the Quadrotor task.

    Holds two fluid programs: ``pred_program`` (obs -> action) and
    ``learn_program`` (one DDPG update on a batch, fetching the critic loss).
    """

    def __init__(self, algorithm, obs_dim, act_dim=4):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        # build_program() reads these; presumably it is invoked from
        # parl.Agent.__init__, so they must be set before super().__init__.
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(QuadrotorAgent, self).__init__(algorithm)

        # Start with the target networks fully synchronised with the online ones.
        self.alg.sync_target(decay=0)

    def build_program(self):
        """Declare the prediction and learning programs."""
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.pred_act = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            act = layers.data(
                name='act', shape=[self.act_dim], dtype='float32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            _, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,
                                                 terminal)

    def predict(self, obs):
        """Return the deterministic action batch for ``obs``.

        Accepts a single observation (1-D) or an already batched one
        (``(batch, obs_dim)``); a batch axis is added only when missing.
        Callers in this notebook pass ``(1, obs_dim)`` arrays, which previously
        got a second, spurious batch axis.
        """
        obs = np.asarray(obs)
        if obs.ndim == 1:
            obs = np.expand_dims(obs, axis=0)
        act = self.fluid_executor.run(
            self.pred_program, feed={'obs': obs},
            fetch_list=[self.pred_act])[0]
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        """Run one DDPG update on a sampled batch and return the critic loss.

        After the update the target networks are soft-updated via
        ``sync_target()`` with the algorithm's default decay.
        """
        feed = {
            'obs': obs,
            'act': act,
            'reward': reward,
            'next_obs': next_obs,
            'terminal': terminal
        }
        critic_cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]
        self.alg.sync_target()

        return critic_cost

In [7]:
def run_episode(env, agent, rpm, noise_std=1.0):
    """Play one training episode with Gaussian exploration and learn online.

    The replay memory stores the *normalized* action in [-1, 1] (what the
    tanh actor outputs and what the critic is evaluated on). Only the copy sent
    to the environment is mapped to the env's action range. Storing the mapped
    action would train the critic on a different action scale than the actor
    produces.

    Learning starts once the replay memory holds more than MEMORY_WARMUP_SIZE
    transitions; rewards are stored scaled by REWARD_SCALE.

    Args:
        env: RLSchool environment.
        agent: QuadrotorAgent.
        rpm: parl ReplayMemory.
        noise_std: std-dev of the Gaussian exploration noise added to the
            normalized action before clipping to [-1, 1].

    Returns:
        (total unscaled episode reward, number of environment steps).
    """
    obs = env.reset()
    total_reward, steps = 0, 0
    while True:
        steps += 1
        batch_obs = np.expand_dims(obs, axis=0)
        action = agent.predict(batch_obs.astype('float32'))
        action = np.squeeze(action)

        # Exploration noise; keep the result inside the actor's [-1, 1] range.
        action = np.clip(np.random.normal(action, noise_std), -1.0, 1.0)
        # Map to the environment's real action range only for env.step().
        env_action = action_mapping(action, env.action_space.low[0],
                                    env.action_space.high[0])

        next_obs, reward, done, _ = env.step(env_action)
        rpm.append(obs, action, REWARD_SCALE * reward, next_obs, done)

        if rpm.size() > MEMORY_WARMUP_SIZE:
            (batch_obs, batch_action, batch_reward, batch_next_obs,
             batch_terminal) = rpm.sample_batch(BATCH_SIZE)
            agent.learn(batch_obs, batch_action, batch_reward,
                        batch_next_obs, batch_terminal)

        obs = next_obs
        total_reward += reward

        if done:
            break
    return total_reward, steps

def evaluate(env, agent, render=False, episodes=5):
    """Run noise-free episodes and return the mean total (unscaled) reward.

    Args:
        env: RLSchool environment.
        agent: QuadrotorAgent.
        render: call ``env.render()`` after every step.
        episodes: number of evaluation episodes to average (default 5).

    Raises:
        ValueError: if ``episodes`` < 1 (the mean would be undefined).
    """
    if episodes < 1:
        raise ValueError('episodes must be >= 1, got {}'.format(episodes))
    eval_reward = []
    for _ in range(episodes):
        obs = env.reset()
        total_reward = 0
        while True:
            batch_obs = np.expand_dims(obs, axis=0)
            action = agent.predict(batch_obs.astype('float32'))
            # The actor's output should already be in [-1, 1]; clip defensively.
            action = np.clip(np.squeeze(action), -1.0, 1.0)
            action = action_mapping(action, env.action_space.low[0],
                                    env.action_space.high[0])

            obs, reward, done, _ = env.step(action)
            total_reward += reward

            if render:
                env.render()
            if done:
                break
        eval_reward.append(total_reward)
    return np.mean(eval_reward)

In [8]:
def draw_results(score_list, title='', path='./fig_dir/'):
    """Plot the per-episode training reward history and save it as PNG.

    The figure is written to ``os.path.join(path, title + '.png')``, then
    shown and closed. Each call uses its own figure, so repeated calls (e.g.
    in long-running worker processes) neither accumulate curves nor leak memory.

    Args:
        score_list: sequence of episode rewards.
        title: suffix for the plot title and the output file name.
        path: output directory, created if missing; '' writes to the CWD.
    """
    fig, ax = plt.subplots()
    ax.plot(score_list, color='green', label='train')
    ax.set_title('Reward History {}'.format(title))
    ax.set_xlabel('episode')
    ax.set_ylabel('reward')
    ax.legend()
    if path:
        # exist_ok avoids a race when several pool workers save at once.
        os.makedirs(path, exist_ok=True)
    fig.savefig(os.path.join(path, title + '.png'))
    plt.show()
    plt.close(fig)

In [9]:
def _ckpt_prefix(total_steps, evaluate_reward, actor_lr, critic_lr, model_tag):
    """Checkpoint path prefix; find_best()/test_best() parse this exact layout."""
    return 'model_dir/steps_{}_evaluate_reward_{}_ACTOR_LR_{}_CRITIC_LR_{}_model_tag_{}'.format(
        total_steps, int(evaluate_reward), actor_lr, critic_lr, model_tag)


def _strip_ckpt_suffix(path):
    """Turn a saved file name (``.ckpt`` / ``.rpm.npz`` / ``.rpm``) back into its prefix."""
    for suffix in ('.ckpt', '.rpm.npz', '.rpm'):
        if path.endswith(suffix):
            return path[:-len(suffix)]
    return path


def _save_checkpoint(agent, rpm, ckpt):
    """Save agent params to ``<ckpt>.ckpt`` and the replay memory to ``<ckpt>.rpm``."""
    os.makedirs(os.path.dirname(ckpt) or '.', exist_ok=True)
    agent.save(ckpt + '.ckpt')
    rpm.save(ckpt + '.rpm')
    logger.info('Current actor_lr: {}  critic_lr: {}  Pid {} ckpt {}'.format(
        agent.alg.actor_lr, agent.alg.critic_lr, os.getpid(), ckpt))


def main(ACTOR_LR=0.0002, CRITIC_LR=0.001, model_tag=1, load_model=False, go_steps=1, f_best=''):
    """Train a DDPG agent on the RLSchool Quadrotor velocity-control task.

    Behaviour:
    - Evaluates roughly every TEST_EVERY_STEPS environment steps.
    - Checkpoints whenever the evaluation reward beats the best so far, or at
      every evaluation when DEBUG is set.
    - Stops early once past 20% of TRAIN_TOTAL_STEPS if 5 consecutive
      evaluations fail to beat the previous one.
    - Always saves a final checkpoint and reward plot.

    Args:
        ACTOR_LR, CRITIC_LR: initial DDPG learning rates.
        model_tag: network variant (1 = shallow, any other value = deeper).
        load_model: resume from ``f_best`` before training.
        go_steps: scheduler steps to fast-forward when resuming.
        f_best: checkpoint prefix, or any file saved for it (``.ckpt`` or
            ``.rpm.npz``), e.g. the path returned by find_best().
    """
    # Velocity-control task: yellow arrow = expected velocity, orange = actual.
    env = make_env("Quadrotor", task="velocity_control")
    env.reset()
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    model = QuadrotorModel(act_dim, model_tag=model_tag)
    algorithm = DDPG(model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
    agent = QuadrotorAgent(algorithm, obs_dim, act_dim)
    rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)

    logger.info('Params: ACTOR_LR={}, CRITIC_LR={}, model_tag={}, Pid {}'.format(ACTOR_LR, CRITIC_LR, model_tag, os.getpid()))
    test_flag = 0
    total_steps = 0
    early_stop = 0
    last_reward = -1e9
    best_reward = -1e9
    evaluate_reward = None
    # Decay linearly over the first 90% of training; below, the value is floored
    # at 1% of the initial rate for the remainder.
    actor_lr_scheduler = LinearDecayScheduler(ACTOR_LR, int(TRAIN_TOTAL_STEPS * 0.9))
    critic_lr_scheduler = LinearDecayScheduler(CRITIC_LR, int(TRAIN_TOTAL_STEPS * 0.9))
    score_list = []

    if load_model:
        # find_best() returns a concrete file name, so normalise to the prefix
        # before appending the suffixes we actually saved.
        ckpt_prefix = _strip_ckpt_suffix(f_best)
        if os.path.exists(ckpt_prefix + '.ckpt'):
            agent.restore(ckpt_prefix + '.ckpt')
            if os.path.exists(ckpt_prefix + '.rpm.npz'):
                rpm.load(ckpt_prefix + '.rpm.npz')
            actor_lr_scheduler.step(step_num=go_steps)
            critic_lr_scheduler.step(step_num=go_steps)
            logger.info('load model success. Pid {}'.format(os.getpid()))
        else:
            logger.warning('No checkpoint found for {!r}; training from scratch. Pid {}'.format(
                f_best, os.getpid()))

    while total_steps < TRAIN_TOTAL_STEPS:
        train_reward, steps = run_episode(env, agent, rpm)
        total_steps += steps
        score_list.append(train_reward)

        # NOTE(review): PARL's fluid DDPG appears to hand actor_lr/critic_lr to its
        # optimizers when the learn program is built, so reassigning them here may
        # not change the effective learning rate -- confirm before relying on it.
        agent.alg.actor_lr = max(actor_lr_scheduler.step(step_num=steps), ACTOR_LR / 100)
        agent.alg.critic_lr = max(critic_lr_scheduler.step(step_num=steps), CRITIC_LR / 100)

        if total_steps // TEST_EVERY_STEPS >= test_flag:
            # Next evaluation at the following multiple of TEST_EVERY_STEPS
            # (one long episode may jump over several multiples).
            test_flag = int(total_steps // TEST_EVERY_STEPS) + 1

            evaluate_reward = evaluate(env, agent)
            logger.info('Steps {}, Test reward: {}, Pid {}'.format(total_steps, evaluate_reward, os.getpid()))

            # Save on improvement (or always in DEBUG). Velocity-control rewards are negative.
            if evaluate_reward > best_reward or DEBUG:
                _save_checkpoint(agent, rpm, _ckpt_prefix(
                    total_steps, evaluate_reward, ACTOR_LR, CRITIC_LR, model_tag))
                draw_results(score_list, '_'.join([str(ACTOR_LR), str(CRITIC_LR), str(model_tag), str(total_steps)]))
                # max() so that DEBUG saves never lower the best reward seen.
                best_reward = max(best_reward, evaluate_reward)

            # Early stop: past 20% of training and 5 evaluations in a row that
            # did not beat the previous evaluation.
            if evaluate_reward > last_reward:
                early_stop = 0
            else:
                early_stop += 1
            last_reward = evaluate_reward
            if total_steps > TRAIN_TOTAL_STEPS / 5 and early_stop >= 5:
                logger.info('No good results, stop training. Params: ACTOR_LR={}, CRITIC_LR={}, model_tag={}, Pid {}'.format(ACTOR_LR, CRITIC_LR, model_tag, os.getpid()))
                break

    if evaluate_reward is None:
        # Only happens when the training loop never ran (TRAIN_TOTAL_STEPS <= 0).
        evaluate_reward = evaluate(env, agent)
    # Training finished: plot the reward curve and save the final model.
    draw_results(score_list, '_'.join([str(ACTOR_LR), str(CRITIC_LR), str(model_tag), str(total_steps)]))
    _save_checkpoint(agent, rpm, _ckpt_prefix(
        total_steps, evaluate_reward, ACTOR_LR, CRITIC_LR, model_tag))
    

In [10]:
def parallel(num_cores=2, num_gpus=0,
             actor_lrs=(0.0001, 0.0002, 0.0005, 0.001, 0.002),
             critic_lrs=(0.001, 0.005, 0.01),
             model_tags=(1, 2)):
    """Grid-search main() over learning rates and model variants in a process pool.

    GPUs are hidden from all workers (CUDA_VISIBLE_DEVICES=''). Exceptions raised
    inside workers are collected and reported after the pool finishes instead of
    being silently discarded.

    Args:
        num_cores: number of worker processes (> 0).
        num_gpus: unused; kept for backward compatibility.
        actor_lrs, critic_lrs, model_tags: grid values; every combination is run.
    """
    assert isinstance(num_cores, int)
    assert num_cores > 0
    from multiprocessing import Pool

    # Multi-process training cannot share the GPU, so force CPU.
    os.environ['CUDA_VISIBLE_DEVICES'] = ''

    print('Parent process %s.' % os.getpid())

    p = Pool(num_cores)
    jobs = []
    for actor_lr in actor_lrs:
        for critic_lr in critic_lrs:
            for model_tag in model_tags:
                result = p.apply_async(main, args=(actor_lr, critic_lr, model_tag))
                jobs.append(((actor_lr, critic_lr, model_tag), result))

    print('Waiting for all subprocesses done...')
    p.close()
    p.join()

    # AsyncResult.get() re-raises a worker's exception; surface each failure.
    for params, result in jobs:
        try:
            result.get()
        except Exception as exc:
            print('Run failed: ACTOR_LR={}, CRITIC_LR={}, model_tag={}: {!r}'.format(*params, exc))
    print('All subprocesses done.')


def one(ACTOR_LR=0.0002, CRITIC_LR=0.001, model_tag=2, load_model=False, go_steps=1, f_best='', gpu=''):
    """Run a single main() training job in the current process.

    ``gpu`` is written to CUDA_VISIBLE_DEVICES before training. The default ''
    hides every GPU (CPU only); on the author's server the GPU path failed with
    "ExternalError: Cublas error, CUBLAS_STATUS_NOT_INITIALIZED".
    The remaining arguments are forwarded unchanged to main().
    """
    os.environ['CUDA_VISIBLE_DEVICES'] = gpu
    main(ACTOR_LR=ACTOR_LR,
         CRITIC_LR=CRITIC_LR,
         model_tag=model_tag,
         load_model=load_model,
         go_steps=go_steps,
         f_best=f_best)


def find_best(model_dir='model_dir'):
    """Return the path of the saved agent checkpoint with the highest evaluation reward.

    Only ``.ckpt`` files named like
    ``steps_<N>_evaluate_reward_<R>_ACTOR_LR_<a>_CRITIC_LR_<c>_model_tag_<t>.ckpt``
    (the layout main() saves) are considered. Any other file, such as the
    companion ``.rpm.npz`` replay dumps or unrelated files, is ignored instead
    of crashing the integer parse. Ties go to the name that sorts last.

    Args:
        model_dir: directory to scan (not recursively).

    Returns:
        ``os.path.join(model_dir, <best file>)``. If nothing matches (or the
        directory is missing) this is ``os.path.join(model_dir, '')``, i.e.
        ``'model_dir/'`` by default.
    """
    import re
    pattern = re.compile(r'_evaluate_reward_(-?\d+)_ACTOR_LR_.*\.ckpt$')
    best_reward = None
    best_f = ''
    if os.path.isdir(model_dir):
        for f in sorted(os.listdir(model_dir)):
            match = pattern.search(f)
            if match is None:
                continue
            reward = int(match.group(1))
            if best_reward is None or reward >= best_reward:
                best_reward = reward
                best_f = f
    return os.path.join(model_dir, best_f)


def fine_tune(ACTOR_LR=0.0002, CRITIC_LR=0.005, episodes=10, model_tag=None):
    """Repeatedly resume training from the best checkpoint found so far.

    Each round calls find_best() and then runs a full training job via one()
    with ``load_model=True``.

    Args:
        ACTOR_LR, CRITIC_LR: learning rates for the resumed runs.
        episodes: number of resume rounds (each is a complete main() run).
        model_tag: network variant to build. None (default) reads it from the
            checkpoint name, so a model_tag=1 checkpoint is not restored into a
            model_tag=2 network; falls back to 2 if the name has no tag.
    """
    import re
    for _ in range(episodes):
        f_best = find_best()
        logger.info('Current best: {}, finetune it...'.format(f_best))
        tag = model_tag
        if tag is None:
            match = re.search(r'model_tag_(\d+)', f_best)
            tag = int(match.group(1)) if match else 2
        # TODO: derive go_steps from f_best so the LR schedule resumes where it stopped.
        one(ACTOR_LR=ACTOR_LR, CRITIC_LR=CRITIC_LR, model_tag=tag, load_model=True, go_steps=1, f_best=f_best)

In [11]:
def test(total_steps, evaluate_reward, ACTOR_LR=0.0002, CRITIC_LR=0.001, model_tag=2, render=False):
    """Load one saved checkpoint and measure its mean evaluation reward.

    The first five arguments rebuild the file name main() used when saving.
    ``evaluate_reward`` is the reward recorded in that name, not a new result.

    Returns:
        The mean reward from evaluate(), or -1 if the checkpoint file does not exist.
    """
    ckpt = 'model_dir/steps_{}_evaluate_reward_{}_ACTOR_LR_{}_CRITIC_LR_{}_model_tag_{}.ckpt'.format(
        total_steps, int(evaluate_reward), ACTOR_LR, CRITIC_LR, model_tag)
    # Check first so a missing file does not pay for building the env and agent.
    if not os.path.exists(ckpt):
        logger.info('No Model file {}'.format(ckpt))
        return -1

    env = make_env("Quadrotor", task="velocity_control")
    env.reset()
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    model = QuadrotorModel(act_dim, model_tag=model_tag)
    algorithm = DDPG(model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
    agent = QuadrotorAgent(algorithm, obs_dim, act_dim)
    agent.restore(ckpt)
    logger.info('Test Model file {}'.format(ckpt))

    mean_reward = evaluate(env, agent, render=render)
    logger.info('Evaluate reward: {}'.format(mean_reward))
    return mean_reward

In [12]:
def test_best():
    """Evaluate the highest-reward checkpoint in model_dir (without rendering).

    Reuses find_best() and parses the checkpoint name with a regex rather than
    fixed ``split('_')`` positions. With no checkpoint available it logs and
    returns -1 instead of raising IndexError.

    Returns:
        Whatever test() returns for the best checkpoint, or -1 if none exists.
    """
    import re
    best_f = os.path.basename(find_best())
    match = re.match(
        r'steps_(\d+)_evaluate_reward_(-?\d+)_ACTOR_LR_(.+?)_CRITIC_LR_(.+?)_model_tag_(\d+)',
        best_f)
    if match is None:
        logger.info('No checkpoint found in model_dir')
        return -1
    steps, reward, actor_lr, critic_lr, tag = match.groups()
    return test(steps, reward, float(actor_lr), float(critic_lr), int(tag), render=False)

In [13]:
# ---- Learning rates ----
# NOTE(review): main()/parallel()/fine_tune()/test() take their learning rates
# as arguments, so these two module-level values appear not to be read anywhere.
ACTOR_LR = 0.0002        # actor network learning rate
CRITIC_LR = 0.005        # critic network learning rate

# ---- DDPG ----
GAMMA = 0.99             # reward discount factor (typically 0.9 - 0.999)
TAU = 0.001              # soft-update coefficient for syncing target_model with model

# ---- Replay memory ----
MEMORY_SIZE = 2e6        # replay capacity; larger uses more RAM
MEMORY_WARMUP_SIZE = 2e4 # transitions to collect before learning starts
REWARD_SCALE = 0.01      # multiplier applied to rewards stored in the replay memory
BATCH_SIZE = 512         # transitions sampled per learning step

# ---- Schedule ----
TRAIN_TOTAL_STEPS = 1e6  # total environment steps per training run
TEST_EVERY_STEPS = 1e4   # evaluate (mean of 5 episodes) every N steps

In [None]:
# When True, main() saves a checkpoint and reward plot at every evaluation,
# not only when the evaluation reward improves.
DEBUG = False
%matplotlib inline
# Launch the learning-rate / model_tag grid search on 15 worker processes.
parallel(15)

Parent process 1127.
Waiting for all subprocesses done...
[32m[06-24 14:58:45 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.0001, CRITIC_LR=0.001, model_tag=2, Pid 2091
[32m[06-24 14:58:47 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.002, CRITIC_LR=0.001, model_tag=2, Pid 2104
[32m[06-24 14:58:48 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.0005, CRITIC_LR=0.01, model_tag=2, Pid 2100
[32m[06-24 14:58:48 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.0002, CRITIC_LR=0.01, model_tag=2, Pid 2097
[32m[06-24 14:58:48 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.0001, CRITIC_LR=0.005, model_tag=2, Pid 2092
[32m[06-24 14:58:49 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.0005, CRITIC_LR=0.005, model_tag=2, Pid 2099
[32m[06-24 14:58:49 MainThread @<ipython-input-9-8259d0fb9623>:17][0m Params: ACTOR_LR=0.002, CRITIC_LR=0.01, model_tag=2, Pid 