#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 29 19:49:01 2018
@author: aidanrocke
"""
import tensorflow as tf
import numpy as np
class policy_gradients:

    def __init__(self, diameter, seed, horizon, num_paths, B):
        self.diameter = diameter ## diameter of the pendulum in the inverted pendulum formalism
        self.horizon = horizon ## length of the policy gradient rollouts
        self.num_paths = num_paths ## number of rollouts
        self.pv = tf.placeholder(tf.float32, [None, 2]) ## angular momenta
        self.theta = tf.placeholder(tf.float32, [None, 1])
        self.seed = seed ## the random seed
        self.action_bound, self.variance_bound = np.pi, np.e ## bounds on the action mean and the log standard deviation

        ## define the height of the pendulum, which serves as the reward:
        self.height = self.diameter*tf.cos(self.theta)

        ## define output of policy network:
        self.mu, self.log_sigma = self.controller()

        ## define what is necessary for the loss:
        self.action = self.sample_action()
        self.reinforce_loss = self.reinforce_loss()
        self.value_estimate = self.value_estimator()
        self.baseline = self.baseline() if B else tf.reduce_mean(self.constant_baseline())

        ## policy loss plus a squared-error loss for the value estimator:
        self.average_loss = -1.0*tf.reduce_mean(tf.subtract(self.reinforce_loss, self.baseline)) + \
                            0.5*tf.reduce_mean(tf.square(self.value_estimate - self.height))

        ## collect trainable variables:
        #self.TV = tf.get_collection(key = tf.GraphKeys.TRAINABLE_VARIABLES)
        self.TV = tf.trainable_variables()

        ## define training operations that accumulate gradients across rollouts
        ## and then apply them in a single update:
        self.optimizer = tf.train.AdagradOptimizer(0.01)

        self.accum_vars = [tf.Variable(tf.zeros_like(tv.initialized_value()), trainable=False) for tv in self.TV]
        self.zero_ops = [tv.assign(tf.zeros_like(tv)) for tv in self.accum_vars]
        self.gvs = self.optimizer.compute_gradients(self.average_loss, self.TV)
        self.accum_ops = [self.accum_vars[i].assign_add(gv[0]) for i, gv in enumerate(self.gvs)]
        self.train_step = self.optimizer.apply_gradients([(self.accum_vars[i], gv[1]) for i, gv in enumerate(self.gvs)])
        #self.train_step = self.optimizer.apply_gradients([(self.accum_vars[i].assign(self.accum_vars[i]), gv[1])
        #                                                   for i, gv in enumerate(self.gvs)])
    def init_weights(self, shape, var_name):
        """
        Xavier initialisation of neural network weights.
        """
        initializer = tf.contrib.layers.xavier_initializer(seed=self.seed)

        return tf.Variable(initializer(shape), name=var_name)
    def two_layer_net(self, X, w_h, w_h2, w_o, bias_1, bias_2):
        """
        A generic method for creating two-layer networks.

        input: data tensor and layer weights/biases
        output: output tensor of the network
        """
        h = tf.nn.elu(tf.add(tf.matmul(X, w_h), bias_1))
        h2 = tf.nn.elu(tf.add(tf.matmul(h, w_h2), bias_2))

        return tf.matmul(h2, w_o)
    def controller(self):
        """
        The policy gradient model is a neural network that
        parametrises a conditional Gaussian.

        input: state (i.e. angular momenta)
        output: action to be taken, i.e. the appropriate horizontal acceleration
        """
        with tf.variable_scope("policy_net"):
            tf.set_random_seed(self.seed)

            W_h = self.init_weights([2, 100], "W_h")
            W_h2 = self.init_weights([100, 50], "W_h2")
            W_o = self.init_weights([50, 10], "W_o")

            # define bias terms:
            bias_1 = self.init_weights([100], "bias_1")
            bias_2 = self.init_weights([50], "bias_2")

            eta_net = self.two_layer_net(self.pv, W_h, W_h2, W_o, bias_1, bias_2)

            W_mu = self.init_weights([10, 1], "W_mu")
            W_sigma = self.init_weights([10, 1], "W_sigma")

            ## bounded mean and log standard deviation of the Gaussian policy:
            self.mu = tf.multiply(tf.nn.tanh(tf.matmul(eta_net, W_mu)), self.action_bound)
            self.log_sigma = tf.multiply(tf.nn.tanh(tf.matmul(eta_net, W_sigma)), self.variance_bound)

        return self.mu, self.log_sigma
    def value_estimator(self):
        """
        This value function is used to approximate the expected value of each state.

        input: state
        output: value estimate
        """
        with tf.variable_scope("value_estimate"):
            tf.set_random_seed(self.seed)

            w_h = self.init_weights([2, 10], "w_h")
            w_o = self.init_weights([10, 1], "w_o")

            ### bias terms:
            bias_1 = self.init_weights([10], "bias_1")
            bias_2 = self.init_weights([1], "bias_2")

            h = tf.nn.elu(tf.add(tf.matmul(self.pv, w_h), bias_1))

        return tf.add(tf.matmul(h, w_o), bias_2)
    def sample_action(self):
        """
        Samples an action from the stochastic controller which happens
        to be a conditional Gaussian.
        """
        dist = tf.contrib.distributions.Normal(self.mu, tf.exp(self.log_sigma))

        return dist.sample()
    def reinforce_loss(self):
        """
        The REINFORCE loss without subtracting a baseline.
        """
        dist = tf.contrib.distributions.Normal(self.mu, tf.exp(self.log_sigma))

        ## log-density of the policy evaluated at its mean, scaled by the reward (the height):
        return dist.log_prob(self.mu)*self.height
    def constant_baseline(self):
        """
        Constant baseline computed using the average reward.
        """
        dist = tf.contrib.distributions.Normal(self.mu, tf.exp(self.log_sigma))

        return dist.log_prob(self.mu)*tf.reduce_mean(self.height)
    def baseline(self):
        """
        A state-dependent baseline calculated using the value estimator V(s).
        """
        dist = tf.contrib.distributions.Normal(self.mu, tf.exp(self.log_sigma))

        return dist.log_prob(self.mu)*self.value_estimate
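
## ---------------------------------------------------------------------------
## A minimal usage sketch (not part of the original class): it shows how the
## gradient-accumulation ops defined in __init__ (zero_ops -> accum_ops ->
## train_step) might be driven from a TF 1.x session. The rollout data below
## is random placeholder data standing in for a real simulator, so the feed
## values are illustrative only.
## ---------------------------------------------------------------------------
if __name__ == "__main__":
    agent = policy_gradients(diameter=1.0, seed=42, horizon=100, num_paths=10, B=True)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        for episode in range(10):
            ## reset the gradient accumulators:
            sess.run(agent.zero_ops)

            ## accumulate gradients over several rollouts:
            for _ in range(agent.num_paths):
                ## stand-in rollout data; a real run would feed simulator states
                pv = np.random.randn(agent.horizon, 2).astype(np.float32)
                theta = pv[:, :1]
                sess.run(agent.accum_ops, feed_dict={agent.pv: pv, agent.theta: theta})

            ## apply the accumulated gradients in a single update:
            sess.run(agent.train_step)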