In [1]:
import gym
import tensorflow as tf
import numpy as np
import random
from collections import deque

In [2]:
GAMMA = 0.95  # discount factor applied to future rewards
LEARNING_RATE = 0.01  # step size handed to the Adam optimizer


class Policy_Gradient():
    """Monte Carlo policy-gradient (REINFORCE) agent for a discrete action space.

    The agent buffers one episode of (state, action, reward) transitions via
    ``store_transition`` and performs a single optimizer step on that episode
    when ``learn`` is called.
    """

    def __init__(self, env):
        """Build the policy network and start a TensorFlow session.

        Args:
            env: environment object; ``env.observation_space.shape[0]`` gives the
                state size and ``env.action_space.n`` the number of actions.
        """
        # init some parameters
        self.time_step = 0
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        # Per-episode buffers: observations, actions and rewards.
        self.ep_obs, self.ep_as, self.ep_rs = [], [], []
        self.create_softmax_network()

        # Init session (the graph must exist before variables are initialised).
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())

    def create_softmax_network(self):
        """Create placeholders, a two-hidden-layer softmax policy, the loss and the train op."""
        # Input placeholders: batch of states, chosen action indices, and the
        # per-step weights (normalised discounted returns) fed in by ``learn``.
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
        self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")

        w_initializer = tf.random_normal_initializer(0., 0.3)
        b_initializer = tf.constant_initializer(0.1)

        # Two ReLU hidden layers of 16 units each, then one logit per action.
        h1 = tf.layers.dense(inputs=self.state_input, units=16, activation=tf.nn.relu,
                             kernel_initializer=w_initializer, bias_initializer=b_initializer)
        h2 = tf.layers.dense(inputs=h1, units=16, activation=tf.nn.relu,
                             kernel_initializer=w_initializer, bias_initializer=b_initializer)
        self.softmax_input = tf.layers.dense(inputs=h2, units=self.action_dim,
                                             kernel_initializer=w_initializer,
                                             bias_initializer=b_initializer)

        # Softmax output: probability of each action.
        self.all_act_prob = tf.nn.softmax(self.softmax_input, name='act_prob')
        # Cross entropy against the taken action is the negative log-probability of that action.
        self.neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=self.softmax_input, labels=self.tf_acts)
        self.loss = tf.reduce_mean(self.neg_log_prob * self.tf_vt)  # reward guided loss

        self.train_op = tf.train.AdamOptimizer(LEARNING_RATE).minimize(self.loss)

    def choose_action(self, observation):
        """Sample an action index from the current policy for a single observation.

        Args:
            observation: 1-D array-like state; a batch axis is prepended before
                it is fed to the network.

        Returns:
            The sampled action index.
        """
        batch = np.asarray(observation)[np.newaxis, :]
        prob_weights = self.session.run(self.all_act_prob, feed_dict={self.state_input: batch})
        # select action w.r.t the actions prob
        action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())
        return action

    def store_transition(self, s, a, r):
        """Append one (state, action, reward) step to the current episode buffers."""
        self.ep_obs.append(s)
        self.ep_as.append(a)
        self.ep_rs.append(r)

    def learn(self):
        """Run one REINFORCE update on the buffered episode, then clear the buffers.

        Discounted returns are computed backwards through the episode, centred,
        and scaled to unit standard deviation before being used as per-step
        weights. Does nothing when no transition has been stored.
        """
        # Monte Carlo policy gradient (REINFORCE)
        if not self.ep_rs:
            # Nothing to train on; np.vstack([]) below would raise.
            return

        # Explicit float buffer: np.zeros_like on integer rewards would yield an
        # int array, truncating the returns and breaking the in-place ops below.
        discounted_ep_rs = np.zeros(len(self.ep_rs), dtype=np.float64)
        running_add = 0.0
        for t in reversed(range(len(self.ep_rs))):
            running_add = running_add * GAMMA + self.ep_rs[t]
            discounted_ep_rs[t] = running_add

        discounted_ep_rs -= np.mean(discounted_ep_rs)
        std = np.std(discounted_ep_rs)
        if std > 0:
            # A zero std (single-step or constant-return episode) would turn the
            # weights into NaN; in that case the centred zeros are kept as-is.
            discounted_ep_rs /= std

        # train on episode
        self.session.run(self.train_op, feed_dict={
            self.state_input: np.vstack(self.ep_obs),
            self.tf_acts: np.array(self.ep_as),
            self.tf_vt: discounted_ep_rs,
        })

        self.ep_obs, self.ep_as, self.ep_rs = [], [], []    # empty episode data

    def close(self):
        """Release the TensorFlow session owned by this agent."""
        self.session.close()

In [3]:
# CartPole environment and training hyperparameters
ENV_NAME = "CartPole-v0"
# Number of training episodes to run
EPISODE = 3000
# Upper bound on the number of steps within a single episode
STEP = 3000
# Number of evaluation episodes averaged at each test (every 100 training episodes)
TEST = 10

In [4]:
def main(render=True):
    """Train a ``Policy_Gradient`` agent on ``ENV_NAME`` and periodically evaluate it.

    Runs ``EPISODE`` training episodes of at most ``STEP`` steps each. After
    every episode the agent is updated once on the collected trajectory. Every
    100 episodes the average total reward over ``TEST`` evaluation episodes is
    printed.

    Args:
        render: when True (the default, matching the previous behaviour) every
            evaluation step is drawn with ``env.render()``; pass False to run
            without a display.
    """
    env = gym.make(ENV_NAME)
    agent = None
    try:
        agent = Policy_Gradient(env)

        for episode in range(EPISODE):
            state = env.reset()
            for step in range(STEP):
                # Select an action according to the policy's probabilities.
                action = agent.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                # Record every time step of the episode.
                agent.store_transition(state, action, reward)
                state = next_state
                if done:
                    break

            # Monte Carlo update: train once per episode on the whole trajectory.
            # This also covers episodes cut off by the STEP limit, which would
            # otherwise leave their transitions in the buffer and mix them into
            # the next episode.
            if agent.ep_rs:
                agent.learn()

            # Test every 100 episodes
            if episode % 100 == 0:
                total_reward = 0
                for i in range(TEST):
                    state = env.reset()
                    for j in range(STEP):
                        if render:
                            env.render()
                        # Actions are still sampled from the policy during evaluation.
                        action = agent.choose_action(state)
                        state, reward, done, _ = env.step(action)
                        total_reward += reward
                        if done:
                            break
                ave_reward = total_reward / TEST
                print ('episode: ',episode,'Evaluation Average Reward:',ave_reward)
    finally:
        # Release the render window and the TF session even when training is
        # interrupted (e.g. KeyboardInterrupt from the notebook).
        env.close()
        if agent is not None:
            agent.session.close()

# Entry point: start training when this module is executed as the main program.
if __name__ == "__main__":
    main()

Instructions for updating:
Use keras.layers.dense instead.
Instructions for updating:
Colocations handled automatically by placer.
episode:  0 Evaluation Average Reward: 25.9
episode:  100 Evaluation Average Reward: 200.0


KeyboardInterrupt: 