In [3]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import gym
import time
import random

from IPython import display

In [2]:
class ReplayBuffer:
    """Fixed-capacity store of experience tuples with random sampling.

    While fewer than ``max_length`` experiences are stored, new ones are
    appended. Once the store is full, each new experience overwrites an
    existing slot, cycling through the slots in order starting at slot 0.
    """

    def __init__(self, max_length=10000):
        self._max_length = max_length
        self._buffer = []
        # Write cursor; only consulted once the buffer has reached capacity.
        self._index = 0

    def append(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` tuple."""
        transition = (state, action, reward, next_state, done)

        # Still filling up: grow the list and leave the cursor untouched.
        if len(self._buffer) < self._max_length:
            self._buffer.append(transition)
            return

        # Full: wrap the cursor to slot 0 if it has run past the end,
        # overwrite that slot, then advance the cursor by one.
        slot = 0 if self._index >= self._max_length else self._index
        self._buffer[slot] = transition
        self._index = slot + 1

    def sample(self, batch_size):
        """Return ``batch_size`` stored tuples drawn without replacement."""
        return random.sample(self._buffer, batch_size)

In [None]:
class MLP(tf.keras.Sequential):
    """Stack of fully connected layers with ReLU activations.

    Args:
        hidden_dims: iterable of layer widths; one ``Dense`` layer with a
            ReLU activation is added per entry, in order.
    """

    def __init__(self, hidden_dims):
        # The Keras base initializer must run before any layer is added;
        # without it ``self.add`` fails on a half-constructed model.
        super().__init__()
        for dim in hidden_dims:
            self.add(tf.keras.layers.Dense(dim, activation='relu'))

In [None]:
class CriticNetwork(tf.keras.Model):
    """Critic that scores a (state, action) pair with a single output unit.

    Args:
        hidden_dims: hidden layer widths forwarded to ``MLP``.
    """

    def __init__(self, hidden_dims):
        super().__init__()

        # Hidden trunk: dense layers with ReLU activations
        self.mlp = MLP(hidden_dims)

        # Output head: one linear unit producing the value
        self.value = tf.keras.layers.Dense(1)

    def call(self, state, action):
        """Join ``state`` and ``action`` along axis 1 and return the value head's output."""
        joint_input = tf.concat([state, action], axis=1)
        hidden = self.mlp(joint_input)
        return self.value(hidden)

In [None]:
class ActorNetwork(tf.keras.Model):
    """Policy network whose tanh output is rescaled to the action bounds.

    Args:
        hidden_units: iterable of hidden layer widths forwarded to ``MLP``.
        action_space: object exposing ``shape``, ``low`` and ``high``
            (e.g. a gym ``Box``). Assumed to be one-dimensional, i.e.
            ``shape == (n,)`` -- TODO confirm for other environments.
    """

    def __init__(self, hidden_units, action_space):
        super().__init__()

        # Add some dense layers with ReLU activations. The widths come from
        # the ``hidden_units`` argument rather than any notebook-level global.
        self.mlp = MLP(hidden_units)

        # Add a layer for the output action. ``Dense`` needs an integer unit
        # count, so use the first entry of the shape tuple, not the tuple.
        self.action = tf.keras.layers.Dense(action_space.shape[0], activation='tanh')

        # tanh produces values in [-1, 1]; multiplying by ``scale`` and adding
        # ``shift`` maps that interval onto [low, high].
        self.scale = (action_space.high - action_space.low) / 2
        self.shift = (action_space.high + action_space.low) / 2

    def call(self, x):
        """Return the action for input ``x``, mapped into [low, high]."""
        x = self.mlp(x)
        x = self.action(x)
        x *= self.scale
        x += self.shift
        return x

In [None]:
class DDPG:
    """Deep Deterministic Policy Gradient agent.

    Holds an online actor and critic plus a target copy of each. The target
    copies provide the bootstrapped TD target and track the online networks
    through a soft (Polyak) update after every training step.

    Args:
        env: environment exposing ``observation_space`` and ``action_space``.
            Both are assumed to have one-dimensional shapes -- TODO confirm
            for environments other than the one used in this notebook.
        hidden_dims: hidden layer widths shared by all four networks.
        discount: factor applied to the bootstrapped next-state value.
        tau: interpolation weight of the online variables in the soft
            target update.
    """

    def __init__(self, env, hidden_dims, discount=0.99, tau=1e-2):
        self.critic = CriticNetwork(hidden_dims)
        self.actor = ActorNetwork(hidden_dims, env.action_space)
        self.discount = discount
        self.tau = tau

        self.target_critic = CriticNetwork(hidden_dims)
        self.target_actor = ActorNetwork(hidden_dims, env.action_space)

        # Create optimizers
        self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
        self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

        # Parameters for minibatching
        self.batch_size = 64

        self._initialize_targets(env)

    def _initialize_targets(self, env):
        """Build all four networks and copy the online weights into the targets."""
        dummy_state = tf.zeros((1, env.observation_space.shape[0]))
        dummy_action = tf.zeros((1, env.action_space.shape[0]))

        # Subclassed Keras models only create their variables on first call,
        # so run one dummy forward pass through each network.
        for actor in (self.actor, self.target_actor):
            actor(dummy_state)
        for critic in (self.critic, self.target_critic):
            critic(dummy_state, dummy_action)

        # Start the targets as exact copies of the online networks.
        self.target_actor.set_weights(self.actor.get_weights())
        self.target_critic.set_weights(self.critic.get_weights())

    def action(self, state):
        """Return the actor's action for the first row of the batched ``state``."""
        return self.actor(state).numpy()[0]

    def train(self, experience):
        """Run one critic update, one actor update and a soft target update.

        Args:
            experience: ``(state, action, reward, next_state, done)`` batch.
                ``reward`` and ``done`` may be rank-1 or ``(batch, 1)``.
        """
        state, action, reward, next_state, done = experience

        # The critic outputs shape (batch, 1). Force reward/done into float
        # column vectors so the arithmetic below is element-wise instead of
        # broadcasting a rank-1 input to (batch, batch).
        reward = tf.reshape(tf.cast(reward, tf.float32), (-1, 1))
        not_done = 1.0 - tf.reshape(tf.cast(done, tf.float32), (-1, 1))

        # TD target, bootstrapped from the *target* networks so that it does
        # not move with every update of the online critic.
        next_action = self.target_actor(next_state)
        next_value = self.target_critic(next_state, next_action)
        target = tf.stop_gradient(reward + self.discount * next_value * not_done)

        # Critic update: minimise the mean squared TD error
        with tf.GradientTape() as tape:
            td_error = self.critic(state, action) - target
            critic_loss = tf.reduce_mean(td_error ** 2)

        # Get gradients and apply them to the model's parameters
        critic_vars = self.critic.trainable_weights
        critic_grads = tape.gradient(critic_loss, critic_vars)
        self.critic_optimizer.apply_gradients(zip(critic_grads, critic_vars))

        # Actor update: maximise the critic's value of the actor's own action.
        # The loss is reduced to a scalar mean so the gradient scale does not
        # grow with the batch size.
        with tf.GradientTape() as tape:
            policy_action = self.actor(state)
            value = self.critic(state, policy_action)
            actor_loss = -tf.reduce_mean(value)

        # Get gradients and apply them to the model's parameters
        actor_vars = self.actor.trainable_weights
        actor_grads = tape.gradient(actor_loss, actor_vars)
        self.actor_optimizer.apply_gradients(zip(actor_grads, actor_vars))

        self.update_targets()

    def update_targets(self):
        """Move each target variable a fraction ``tau`` towards its online counterpart."""
        network_pairs = (
            (self.critic, self.target_critic),
            (self.actor, self.target_actor),
        )
        for online, target in network_pairs:
            for var, target_var in zip(online.trainable_variables, target.trainable_variables):
                target_var.assign(var * self.tau + target_var * (1 - self.tau))

In [5]:
# Create the "Pendulum-v1" gym environment; the next cell inspects its action space.
env = gym.make("Pendulum-v1")

In [8]:
# Inspect the lower bound of the action space (recorded output: array([-2.], dtype=float32)).
env.action_space.low

array([-2.], dtype=float32)