In [None]:
import gym
import random
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from collections import deque
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential, load_model, Model


## Build Model
<!-- <h3>Base Model</h3> -->
- Epsilon = 1
- Epsilon_min = 0.01
- Epsilon_decay = 0.99
- Discount_rate = 0.8
- Train_start = 1000
- Batch_size = 64

In [None]:
class DQN:
    """Deep Q-Network agent that drives a continuous torque through discrete bins.

    Two identically shaped dense networks are kept: ``Main`` is trained on
    mini-batches sampled from a replay buffer, while ``Target`` provides the
    bootstrap values and is periodically overwritten with ``Main``'s weights.
    Action indices ``0 .. NActions - 1`` are mapped linearly onto the torque
    interval ``[MinTorque, MaxTorque]``.
    """

    # Torque interval covered by the discretised actions (the converters
    # previously hard-coded this same [-2, 2] range).
    MinTorque = -2.0
    MaxTorque = 2.0

    def __init__(self, InputShape=4,
                 Epsilon=0.1,
                 Epsilon_Decay=0.98,
                 NActions=2,
                 UpdatePerEp=1,
                 Dense=64,
                 BatchSize=32,
                 learning_rate=0.01,
                 weights=None,
                 Discount_Rate=0.8):
        """Build the main/target networks and the replay buffer.

        Args:
            InputShape: Number of features in one observation.
            Epsilon: Probability of taking a random action.
            Epsilon_Decay: Factor applied to ``Epsilon`` by :meth:`decay`.
            NActions: Number of discrete torque bins (network outputs).
            UpdatePerEp: Number of finished episodes between target-network
                synchronisations.
            Dense: Units in each of the two hidden layers.
            BatchSize: Replay samples used per training step.
            learning_rate: Step size handed to the Adam optimiser.
            weights: Optional path of a weights file loaded into ``Main``.
            Discount_Rate: Discount factor applied to the bootstrapped
                next-state value in the TD target.
        """
        # Preset Parameters. #
        self.Epsilon = Epsilon
        self.InputShape = InputShape
        self.ReplayMemorySize = 10000
        self.MinReplayMemory = 1000

        # Hyperparameters. # (Tuneable)
        self.NActions = NActions
        self.NoUpdate = UpdatePerEp
        self.Dense = Dense
        self.BatchSize = BatchSize
        self.Memory = deque(maxlen=self.ReplayMemorySize)
        self.Epsilon_Decay = Epsilon_Decay
        # Kept separate from Epsilon: exploration rate and discount factor are
        # unrelated quantities and must be tunable independently.
        self.Discount_Rate = Discount_Rate

        # Build Target And Main Model
        self.Main = self.CreateModel("Main")
        self.Target = self.CreateModel("Target")

        # Load weights if a path was supplied, then start both nets in sync.
        if weights is not None:
            self.Main.load_weights(weights)
        self.Target.set_weights(self.Main.get_weights())
        # Honour the caller's learning rate (it used to be silently ignored).
        self.Optimiser = Adam(learning_rate=learning_rate)

        # Target network update counter. #
        self.TargetUpdateCounter = 0

    def CreateModel(self, Type):
        """Return a two-hidden-layer dense Q-network named ``Type``.

        The network maps an observation of ``InputShape`` features to one
        linear output per discrete action. Its summary is printed on creation.
        """
        # shape must be a tuple; "(x)" without the trailing comma is just x.
        inputs = Input(shape=(self.InputShape,), name='Input')
        x = Dense(self.Dense, activation='relu', name='1stHiddenLayer')(inputs)
        x = Dense(self.Dense, activation='relu', name='2ndHiddenLayer')(x)
        outputs = Dense(self.NActions, activation='linear', name='Output')(x)

        NN = Model(inputs, outputs, name=f'{Type}')
        NN.summary()

        return NN

    def UpdateMemory(self, info):
        """Append one ``(S, A, R, SNext, Done)`` transition to the replay buffer."""
        self.Memory.append(info)

    def pls_save_weights(self, name):
        """Save both networks' weights as ``DQN_weights/Main{name}.h5`` and ``DQN_weights/Target{name}.h5``."""
        import os

        # Saving fails if the directory is missing, so create it on demand.
        os.makedirs("DQN_weights", exist_ok=True)
        self.Main.save_weights(f"DQN_weights/Main{name}.h5")
        self.Target.save_weights(f"DQN_weights/Target{name}.h5")

    def PendulumActionConverter(self, A):
        """Map a discrete action index to its torque.

        Index ``0`` maps to ``MinTorque`` and index ``NActions - 1`` to
        ``MaxTorque``, which makes this the exact inverse of
        :meth:`PendulumInverseActionConverter`.
        """
        if self.NActions < 2:
            # A single bin has no span to interpolate over; use the midpoint.
            return (self.MinTorque + self.MaxTorque) / 2
        Span = self.MaxTorque - self.MinTorque
        return self.MinTorque + Span * A / (self.NActions - 1)

    def PendulumInverseActionConverter(self, A):
        """Map a torque to the index of the nearest discrete action.

        The result is always a plain ``int`` inside ``[0, NActions - 1]`` so
        it can be used directly as an array index.
        """
        Span = self.MaxTorque - self.MinTorque
        Index = int(round((float(A) - self.MinTorque) * (self.NActions - 1) / Span))
        return min(max(Index, 0), self.NActions - 1)

    def Get_Action(self, state, env):
        """Choose an action epsilon-greedily.

        Args:
            state: Q-values predicted by ``Main`` for the current observation.
            env: Unused; kept so existing callers keep working.

        Returns:
            ``(action, a)`` where ``action`` is a length-1 float32 array
            holding the torque to execute and ``a`` is the matching action
            index. The executed torque always corresponds exactly to ``a``,
            so the replay buffer records the action that was really taken.
        """
        if np.random.rand() < self.Epsilon:
            a = int(np.random.randint(self.NActions))
        else:
            a = int(np.argmax(state))
        action = np.array([self.PendulumActionConverter(a)], dtype=np.float32)
        return action, a

    def decay(self):
        """Multiply ``Epsilon`` by ``Epsilon_Decay``."""
        self.Epsilon = self.Epsilon * self.Epsilon_Decay

    def Train(self, EndOfEpisode):
        """Run one gradient step on a random replay batch.

        Does nothing until the buffer holds ``MinReplayMemory`` transitions.
        When ``EndOfEpisode`` is true the episode counter advances, and once
        it reaches ``NoUpdate`` the target network is synchronised with the
        main network.
        """
        if len(self.Memory) < self.MinReplayMemory:
            return

        Batch = random.sample(self.Memory, self.BatchSize)
        States = np.array([t[0] for t in Batch], dtype=np.float32)
        Actions = np.array([t[1] for t in Batch], dtype=np.int64)
        Rewards = np.array([t[2] for t in Batch], dtype=np.float32)
        NextStates = np.array([t[3] for t in Batch], dtype=np.float32)
        Dones = np.array([t[4] for t in Batch], dtype=bool)

        # Start from the current predictions so only the taken action's
        # output contributes to the loss. np.array() makes a writable copy.
        Targets = np.array(self.Main(States))
        MaxQNext = np.max(np.array(self.Target(NextStates)), axis=1)

        # TD target: R for terminal transitions, R + gamma * max_a' Q_target
        # otherwise. The discount factor is Discount_Rate, not Epsilon.
        Bootstrap = np.where(Dones, 0.0, self.Discount_Rate * MaxQNext)
        Targets[np.arange(len(Batch)), Actions] = Rewards + Bootstrap

        # Pass fixed-shape tensors so the tf.function is traced only once.
        self.GTfit(tf.convert_to_tensor(States), tf.convert_to_tensor(Targets))

        # Count finished episodes; sync the target net every NoUpdate episodes. #
        if EndOfEpisode:
            self.TargetUpdateCounter += 1

        if self.TargetUpdateCounter >= self.NoUpdate:
            self.Target.set_weights(self.Main.get_weights())
            self.TargetUpdateCounter = 0

    @tf.function
    def GTfit(self, X, Y):
        """Apply one Adam step minimising the mean squared error between ``Main(X)`` and ``Y``."""
        with tf.GradientTape() as tape:
            Predictions = self.Main(tf.convert_to_tensor(X), training=True)
            Loss = tf.math.reduce_mean(
                tf.math.square(tf.convert_to_tensor(Y) - Predictions)
            )
        Grad = tape.gradient(Loss, self.Main.trainable_variables)
        self.Optimiser.apply_gradients(zip(Grad, self.Main.trainable_variables))


In [None]:
# Gym environment id that Main() passes to gym.make().
EnvName = "Pendulum-v0"

# Module-level observation size; Main() has its own InputShape parameter.
InputShape = 3
def Main(episodes = 300, 
         filename = "base", 
         InputShape=3, 
         NActions=40, 
         Dense=64, 
         Epsilon=0.1, 
         Epsilon_Decay=0.98, 
         UpdatePerEp=3):
    """Train a DQN agent on the ``EnvName`` environment and return its learning curves.

    Args:
        episodes: Number of episodes to run.
        filename: Suffix handed to ``DQN.pls_save_weights`` whenever an
            episode matches or beats the best score so far.
        InputShape: Number of features in one observation.
        NActions: Number of discrete torque bins.
        Dense: Units per hidden layer of the Q-network.
        Epsilon: Probability of taking a random action.
        Epsilon_Decay: Forwarded to ``DQN``; only has an effect if
            ``dqn.decay()`` is called (see the note in the loop).
        UpdatePerEp: Finished episodes between target-network syncs.

    Returns:
        ``(MovingAverage, ListOfScores)``: per-episode mean of the last 100
        scores, and the raw per-episode scores.
    """
    # Instantiate DQN
    dqn = DQN(InputShape=InputShape, 
              NActions=NActions, 
              Dense=Dense, 
              Epsilon=Epsilon, 
              Epsilon_Decay=Epsilon_Decay, 
              UpdatePerEp=UpdatePerEp)
    # Store Scores and Moving Average
    MovingAverage = []
    ListOfScores = []
    ShowEvery = 25

    # One environment serves every episode; reset() starts each new one.
    env = gym.make(EnvName)
    try:
        for i in range(episodes):
            print("episode", i)
            Done = False
            S = env.reset()
            score = 0  # Return accumulated over this episode

            while not Done:
                # Q-values for the current observation, as a batch of one.
                Q = dqn.Main(S.reshape(-1, S.shape[0]))
                action, A = dqn.Get_Action(Q, env)

                # Render every ShowEvery-th episode once training has begun.
                if not i % ShowEvery and len(dqn.Memory) >= dqn.MinReplayMemory:
                    env.render()

                SNext, R, Done, _ = env.step(action)

                dqn.UpdateMemory((S, A, R, SNext, Done))
                dqn.Train(Done)
                score += R
                S = SNext

            # NOTE(review): dqn.decay() is not called, so Epsilon stays
            # constant and Epsilon_Decay has no effect -- confirm intended.
            ListOfScores.append(score)
            if score >= max(ListOfScores):
                dqn.pls_save_weights(filename)
            MovingAverage.append(np.mean(ListOfScores[-100:]))
            # Two separate prints: nesting one print() inside the other
            # emitted the lines out of order followed by a stray "None".
            print(f'Finished! Return: {score}')
            print(f'DQN Mem: {len(dqn.Memory)} DQN Ep : {dqn.Epsilon}')
    finally:
        # Release the environment (and any render window) even on error.
        env.close()
    return MovingAverage, ListOfScores
# Default x-axis for the learning-curve plots. Its length must equal the
# number of episodes Main() runs by default (300); a shorter axis makes the
# plotting calls fail on mismatched x/y lengths.
episode_number = np.arange(300)

## Base_Model

In [None]:
# Number of training episodes for the baseline run.
EPISODES = 300

# Shared x-axis for the plots: one integer index per episode, starting at 0.
episode_number = np.arange(0, EPISODES)

In [8]:
# Train the baseline configuration (every other Main() argument left at its
# default) and keep both learning curves for plotting.
(Base_MA_Scores, Base_Scores) = Main(
    episodes=EPISODES,
)

In [None]:
# Baseline learning curves: raw score on top, moving average below.
plt.figure(figsize=(10, 8))

# NOTE(review): Base_Scores holds each episode's summed reward, so the
# "Average Reward" wording of this title may be misleading -- confirm.
score_ax = plt.subplot(2, 1, 1)
sns.lineplot(x=episode_number, y=Base_Scores, label="Score", ax=score_ax)
score_ax.set_title('Average Reward Per Episode')

average_ax = plt.subplot(2, 1, 2)
sns.lineplot(x=episode_number, y=Base_MA_Scores, label="Average Score", ax=average_ax)
average_ax.set_title('Moving Average Score')

plt.tight_layout()
plt.show()

## Tune Model

In [None]:
# Candidate values for each hyperparameter sweep.
number_of_actions = [
    30,
    40,
    50,
]  # discrete torque bins (NActions)
Dense_tune = [
    40,
    64,
    88,
]  # units per hidden layer (Dense)
Epsilon = [
    0.1,
    0.2,
    0.3,
]  # exploration probability
Epsilon_Decay = [
    0.98,
    0.97,
    0.96,
    0.95,
]  # multiplicative epsilon decay
UpdateNo = [
    1,
    2,
    3,
]  # episodes between target-network syncs (UpdatePerEp)

### Number of Actions/Binning

In [None]:
# Sweep the number of torque bins; one training run per candidate value.
NAction_average_score = []  # one moving-average curve per bin count
NAaction_Scores = []        # one raw-score curve per bin count

for n_bins in number_of_actions:
    sweep_result = Main(NActions=n_bins, filename=f"NAction{n_bins}")
    bins_scores_ma, bins_res_scores = sweep_result
    NAction_average_score.append(bins_scores_ma)
    NAaction_Scores.append(bins_res_scores)

In [None]:
# Compare the bin-count sweep: raw scores on top, moving averages below.
plt.figure(figsize=(10, 8))

# Raw score per episode, one line per bin count
score_ax = plt.subplot(2, 1, 1)
for idx, n_bins in enumerate(number_of_actions):
    sns.lineplot(x=episode_number, y=NAaction_Scores[idx], label=n_bins, ax=score_ax)
score_ax.set_title('Score')

# Moving average score, one line per bin count
average_ax = plt.subplot(2, 1, 2)
for idx, n_bins in enumerate(number_of_actions):
    sns.lineplot(x=episode_number, y=NAction_average_score[idx], label=n_bins, ax=average_ax)
average_ax.set_title('Moving Average Score')

plt.tight_layout()
plt.show()

In [None]:
# NOTE(review): STOP is not defined in the visible cells, so evaluating it
# presumably raises NameError and halts "Run All" before the sweeps below --
# confirm this is the intent.
STOP

### Number of Dense Units

In [None]:
# Sweep the hidden-layer width; one training run per candidate value.
Dense_average_score = [] # One moving-average curve per hidden-layer width
Dense_Scores = [] # One raw-score curve per hidden-layer width

for e in Dense_tune:
    # The keyword is NActions; the misspelt "NAaction" raised TypeError.
    Dense_scores_ma, Dense_res_scores, = \
        Main(NActions = 40, Dense=e, filename=f"Dense{e}")
    Dense_average_score.append(Dense_scores_ma)
    Dense_Scores.append(Dense_res_scores)

In [None]:
# Compare the hidden-layer-width sweep: raw scores on top, moving averages below.
plt.figure(figsize=(10, 8))

# Raw score per episode (the two series were previously swapped, so each
# panel showed the data named in the other panel's title).
plt.subplot(2,1,1)
for i in range(len(Dense_tune)):
    sns.lineplot(x=episode_number, y=Dense_Scores[i], label=Dense_tune[i])
plt.title('Score')

# Moving Average Score
plt.subplot(2, 1, 2)
for i in range (len(Dense_tune)):
    sns.lineplot(x=episode_number, y=Dense_average_score[i], label=Dense_tune[i])
plt.title('Moving Average Score')

plt.tight_layout()
plt.show()

In [None]:
# NOTE(review): STOP is not defined in the visible cells, so evaluating it
# presumably raises NameError and halts "Run All" before the cells below --
# confirm this is the intent.
STOP

## UpdateNo