# 1. Import Libraries

In [8]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import random
from collections import deque
import os

In [9]:
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# 2. Exploratory Data Analysis

## Background Information of The Environment

### Description
The inverted pendulum swingup problem is based on the classic problem in control theory. 

The system consists of a pendulum attached at one end to a fixed point, and the other end being free.

The pendulum starts in a random position and the goal is to apply torque on the free end to swing it into an upright position, with its center of gravity right above the fixed point.

### Actions
The agent can apply a torque of $[-2.0, 2.0]$ to the pendulum. Torque is measured in $Nm$.

The direction of a positive torque acts in the counter-clockwise direction while the direction of a negative torque acts in the clockwise direction.

This is a set of continuous action space (infinite possible values of torque within fixed range that can be applied on the pendulum).

### Observations
There are 3 observations in the environment:

1. x: The x-coordinate of the pendulum's free-end, measured in $m$. Derived by $x = \cos(\theta)$ with a range of $[-1.0, 1.0]$.
2. y: The y-coordinate of the pendulum's free-end, measured in $m$. Derived by $y = \sin(\angle)$ with a range of $[-1.0, 1.0]$.
3. Angular Velocity: The angular velocity of the pendulum's free-end. The range is $[-8.0, 8.0]$.

### Rewards
After a torque is applied to the pendulum, the agent receives a reward for that action. The reward is calculated as follows:

$r = -(\theta^2 + 0.1 * \theta \_dt^2 + 0.001 * torque^2)$, where $\theta$ is the angle of the pendulum normalized between $[-\pi, \pi]$ (with 0 being in the upright position), $\theta \_dt$ is the angular velocity of the pendulum, and $torque$ is the torque applied to the pendulum.

The minimum reward that can be obtained is $-(\pi^2 + 0.1 * 82 + 0.001 * 22) = -16.2736044$, while the maximum reward is zero (pendulum is upright with zero velocity and no torque applied).

Hence $r$ is between $[-16.2736044, 0]$

### Starting State

The pendulum starts in a random angle in $[-\pi, \pi]$ with a random angular velocity in $[-1, 1]$.

### Episode Truncation

The episode is truncated after 200 steps.

# 3. Model Training

In [10]:
class DQN:
    def __init__(self,
                 InputShape = 4,
                 NActions = 2,
                 Gamma = 1,
                 ReplayMemorySize = 10000,
                 MinReplayMemory = 1000,
                 UpdateTargetEveryThisEpisodes = 1,
                 IntermediateSize = 64,
                 BatchSize = 32):
        
        # Hyperparameters. #
        
        self.InputShape = InputShape
        self.NActions = NActions
        self.Gamma = Gamma
        self.ReplayMemorySize = ReplayMemorySize
        self.MinReplayMemory = MinReplayMemory
        self.UpdateTargetEveryThisEpisodes = UpdateTargetEveryThisEpisodes
        self.IntermediateSize = IntermediateSize
        self.BatchSize = BatchSize
        
        # Main model. #
        
        self.Main = self.CreateModel('Main')
        self.Optimiser = Adam()
        
        # Target model. #
        
        self.Target = self.CreateModel('Target')
        self.Target.set_weights(self.Main.get_weights())
        
        # Replay memory. #
        
        self.ReplayMemory = deque(maxlen = ReplayMemorySize)
        
        # Target network update counter. #
        
        self.TargetUpdateCounter = 0
    
    def CreateModel(self, Type):
        inputs = Input(shape = (self.InputShape,), name = 'Input')
        x = Dense(self.IntermediateSize, activation = 'relu', name = '1stHiddenLayer')(inputs)
        x = Dense(self.IntermediateSize, activation = 'relu', name = '2ndHiddenLayer')(x)
        outputs = Dense(self.NActions, activation = 'linear', name = 'Output')(x)
        
        NN = Model(inputs, outputs, name = f'{Type}')
        NN.summary()
        
        return NN
    
    def UpdateReplayMemory(self, Information): # Information = (S, A, R, SNext, Done)
        self.ReplayMemory.append(Information)

    def Train(self, EndOfEpisode):
        
        # Only train if replay memory has enough data. #
        
        if len(self.ReplayMemory) < self.MinReplayMemory:
            print(f'DID NOT TRAIN..., replay memory = {len(self.ReplayMemory)}')
            return
        
        # Get batch of data for training. #
        
        TrainingData = random.sample(self.ReplayMemory, self.BatchSize)
        
        # Get states from training data, then get corresponding Q values. #
        
        ListOfS = np.array([element[0] for element in TrainingData])
        ListOfQ = np.array(self.Main(ListOfS))
        
        # Get future states from training data, then get corresponding Q values. #
        
        ListOfSNext = np.array([element[3] for element in TrainingData])
        ListOfQNext = self.Target(ListOfSNext)
        
        # Build actual training data for neural network. #
        
        X = []
        Y = []
        for index, (S, A, R, SNext, Done) in enumerate(TrainingData):
            if not Done:
                MaxQNext = np.max(ListOfQNext[index])
                QNext = R + self.Gamma * MaxQNext
            else:
                QNext = R
            Q = ListOfQ[index]
            Q[A] = QNext
        
            X.append(S)
            Y.append(Q)
        
        # Train model using tf.GradientTape(), defined below.
    
        self.GTfit(X, Y)
                
        # Update target network every episode. #
        
        if EndOfEpisode:
            self.TargetUpdateCounter += 1
        
        # Update target if counter is full. #
        
        if self.TargetUpdateCounter >= self.UpdateTargetEveryThisEpisodes:
            self.Target.set_weights(self.Main.get_weights())
            self.TargetUpdateCounter = 0
    
    # This is the tf.GradientTape() which significantly speeds up training of neural networks.
    
    @tf.function
    def GTfit(self, X, Y):
        
        # Train the neural network with this batch of data. #
        
        with tf.GradientTape() as tape:
            Predictions = self.Main(tf.convert_to_tensor(X), training = True)
            Loss = tf.math.reduce_mean(tf.math.square(tf.convert_to_tensor(Y) - Predictions))
        Grad = tape.gradient(Loss, self.Main.trainable_variables)
        self.Optimiser.apply_gradients(zip(Grad, self.Main.trainable_variables))

In [11]:
EnvName = 'Pendulum-v0'
IntermediateSize = 64
Epsilon = 0.1
ShowEvery = 10
InputShape = 3
NActions = 40

In [12]:
def PendulumActionConverter(A, NActions=NActions):
    ActualTorque = (A / NActions - 0.5) * 4
    return ActualTorque

def PendulumInverseActionConverter(A, NActions=NActions):
    ActualA = round((A + 2) * (NActions - 1) / 4)
    return(ActualA)

def OneEpisode():
    env = gym.make(f'{EnvName}')
    S = env.reset()
    ListOfRewards = []
    Done = False
    while not Done:
        Q = DQN.Main(S.reshape(-1, S.shape[0]))
        if np.random.rand() < Epsilon:
            AStep = env.action_space.sample()
            A = PendulumInverseActionConverter(AStep[0])
        else:
            A = np.argmax(Q)
            A = PendulumActionConverter(A)
            AStep = np.array([A])
            A = PendulumInverseActionConverter(A)
        if not _ % ShowEvery and len(DQN.ReplayMemory) >= DQN.MinReplayMemory:
            env.render()
        SNext, R, Done, Info = env.step(AStep)
        DQN.UpdateReplayMemory((S, A, R, SNext, Done))
        DQN.Train(Done)
        ListOfRewards.append(R)
        if Done:
            print(f'Finished! Return: {np.sum(ListOfRewards)}')
            returns.append(np.sum(ListOfRewards))
            env.close()
            return
        S = SNext

def ReturnPlot():
    plt.figure(figsize=(10, 5))

    plt.plot(returns)
    plt.xlabel("Episodes")
    plt.ylabel("Returns")
    plt.legend()

    if not os.path.exists('plots'):
        os.makedirs('plots')
    plt.savefig("plots/returns.png")
    plt.show()

In [13]:
import time
STARTTIME = time.time()

returns = []
DQN = DQN(InputShape = InputShape, NActions = NActions)
for _ in range(150):
    print(f'Episode {_}')
    OneEpisode()

print(f'Total time taken: {time.time() - STARTTIME} seconds ...')

ReturnPlot()

Episode 0
DID NOT TRAIN..., replay memory = 1
DID NOT TRAIN..., replay memory = 2
DID NOT TRAIN..., replay memory = 3
DID NOT TRAIN..., replay memory = 4
DID NOT TRAIN..., replay memory = 5
DID NOT TRAIN..., replay memory = 6
DID NOT TRAIN..., replay memory = 7
DID NOT TRAIN..., replay memory = 8
DID NOT TRAIN..., replay memory = 9
DID NOT TRAIN..., replay memory = 10
DID NOT TRAIN..., replay memory = 11
DID NOT TRAIN..., replay memory = 12
DID NOT TRAIN..., replay memory = 13
DID NOT TRAIN..., replay memory = 14
DID NOT TRAIN..., replay memory = 15
DID NOT TRAIN..., replay memory = 16
DID NOT TRAIN..., replay memory = 17
DID NOT TRAIN..., replay memory = 18
DID NOT TRAIN..., replay memory = 19
DID NOT TRAIN..., replay memory = 20
DID NOT TRAIN..., replay memory = 21
DID NOT TRAIN..., replay memory = 22
DID NOT TRAIN..., replay memory = 23
DID NOT TRAIN..., replay memory = 24
DID NOT TRAIN..., replay memory = 25
DID NOT TRAIN..., replay memory = 26
DID NOT TRAIN..., replay memory = 27


KeyboardInterrupt: 