In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so the project folder (and the
# SavedModels/ directory inside it) is reachable from this Colab runtime.
drive.mount('/content/drive')  # optionally pass force_remount=True as a second argument

In [None]:
# Work from the project folder on the mounted Drive: Evaluator.loadModels()
# resolves SavedModels/ relative to the current working directory.
%cd /content/drive/My Drive/Github/RLProject/

In [None]:
# TensorFlow 1.x is required by the Stable Baselines release pinned below
%tensorflow_version 1.x
!apt-get install ffmpeg freeglut3-dev xvfb;  # For visualization
!pip install stable-baselines[mpi]==2.10.2;
# Box2D physics bindings needed by Gym's LunarLander-v2 environment
# NOTE(review): this package is not version-pinned — consider pinning it for reproducibility
!pip install box2d-py;

In [None]:
# Sanity check: display the Stable Baselines version that is actually importable
# in this runtime (the install cell pins 2.10.2).
import stable_baselines
stable_baselines.__version__

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time
import gym 
import os, sys

from stable_baselines import A2C,PPO2,DQN

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import model_from_json

import pandas as pd
from dataclasses import make_dataclass

In [None]:
class Evaluator():
  """
  Loads four trained LunarLander-v2 agents and compares their mean episode
  reward and mean prediction time.

  Attributes
  ----------
  homemadeDQN: <class 'tensorflow.python.keras.engine.sequential.Sequential'>
    Imported trained Keras DQN model

  A2C: <class 'stable_baselines.a2c.a2c.A2C'>
    Imported trained Stable Baselines (SB) A2C model

  PPO2: <class 'stable_baselines.ppo2.ppo2.PPO2'>
    Imported trained Stable Baselines (SB) PPO2 model

  DQN: <class 'stable_baselines.deepq.dqn.DQN'>
    Imported trained Stable Baselines (SB) DQN model

  env: <class 'gym.wrappers.time_limit.TimeLimit'>
    LunarLander-v2 OpenAI Gym environment, seeded with 42

  testEpisodes: int
    Number of episodes to test each model for

  Methods
  -------
  loadModels()
    Loads the trained models from <cwd>/SavedModels/ and assigns them to the
    instance attributes.

  testModel(model)
    Calculates mean reward and mean prediction time over testEpisodes episodes.

  comparePerformance(testEpisodes)
    Runs testModel() for each model and returns a results dataframe.
  """
  def __init__(self, testEpisodes=10):
    """
    Creates the seeded environment; the models stay None until loadModels().

    Parameters
    ----------
    testEpisodes: int, optional
      Number of episodes testModel() runs per model (default 10).
      comparePerformance() overwrites this value with its own argument.
    """
    self.homemadeDQN = None
    self.A2C = None
    self.PPO2 = None
    self.DQN = None
    self.testEpisodes = testEpisodes
    self.env = gym.make('LunarLander-v2')
    self.env.seed(42)

  def loadModels(self):
    """
    Loads all four trained models from the SavedModels/ folder located in the
    current working directory.

    SB models are loaded with the load function of their own model class.
    The Keras model is stored as a .json architecture file with the weights
    saved separately, so the architecture is rebuilt first and the weights are
    loaded into it afterwards.
    """
    # Change the working directory (or this path) if the models are not found
    modelDir = os.path.join(os.getcwd(), 'SavedModels')
    self.A2C = A2C.load(os.path.join(modelDir, 'finalA2CModel'))
    self.PPO2 = PPO2.load(os.path.join(modelDir, 'finalPPO2Model'))
    self.DQN = DQN.load(os.path.join(modelDir, 'finalDQNModel'))
    # Rebuild the Keras DQN from its JSON description; the context manager
    # guarantees the file handle is closed even if reading fails
    with open(os.path.join(modelDir, 'model.json'), 'r') as jsonFile:
      DQNjson = jsonFile.read()
    loadedDQN = model_from_json(DQNjson)
    # Load the trained weights into the rebuilt model
    loadedDQN.load_weights(os.path.join(modelDir, 'DQNWeights.tf'))
    self.homemadeDQN = loadedDQN

  @staticmethod
  def _isStableBaselinesModel(model):
    """Returns True when the model's class is defined in the stable_baselines package."""
    # Checking the defining package is stricter than searching the class repr
    # for the substring "stable", which unrelated class names could contain
    return type(model).__module__.split('.')[0] == 'stable_baselines'

  def testModel(self, model):
    """
    Runs self.testEpisodes episodes on self.env with the given model and
    averages the episode rewards and the per-step prediction times.

    Only the call that produces the action (model.predict plus the action
    selection) is timed; the environment step is deliberately excluded so the
    figure reflects the model and not the simulator.

    Parameters
    ----------
    model:
      A Stable Baselines model (predict returns (actions, state)) or a Keras
      model (predict returns one row of per-action values).

    Returns
    -------
    meanReward: float
      Mean total reward per episode over the test episodes

    meanPredictionTime: float
      Mean time in seconds taken to make a prediction

    Both values are NaN when no episode was run (testEpisodes == 0).
    """
    usesStableBaselines = self._isStableBaselinesModel(model)
    rewardSum = 0.0
    predictionTimeSum = 0.0
    predictions = 0
    for _episode in range(self.testEpisodes):
      obs = self.env.reset()
      done = False
      episodeReward = 0.0
      while not done:
        # Both model families are fed a batch holding a single observation
        batchedObs = np.reshape(obs, (1, obs.shape[0]))
        # NOTE(review): predict() is called with its default arguments, so
        # whether actions are sampled or greedy depends on each library's
        # defaults — confirm this is the intended evaluation mode.
        t1 = time.perf_counter()
        if usesStableBaselines:
          # SB returns a batch of actions plus a recurrent state (unused here)
          actions, _state = model.predict(batchedObs)
          action = actions[0]
        else:
          # Keras DQN returns one value per action; act greedily
          actionValues = model.predict(batchedObs)
          action = np.argmax(actionValues)
        predictionTimeSum += time.perf_counter() - t1
        predictions += 1
        obs, reward, done, _info = self.env.step(action)
        episodeReward += reward
      rewardSum += episodeReward
    if predictions == 0:
      # Nothing was run, so a mean is undefined; avoid a ZeroDivisionError
      return float('nan'), float('nan')
    meanReward = rewardSum / self.testEpisodes
    meanPredictionTime = predictionTimeSum / predictions
    return meanReward, meanPredictionTime

  def comparePerformance(self, testEpisodes):
    """
    Compares the performance of each model by calling testModel and creating a
    pandas dataframe of the results.

    Parameters
    ----------
    testEpisodes: int
      Number of episodes to test each model on (stored in self.testEpisodes)

    Returns
    -------
    resultsFrame: pandas dataframe
      One row per model with the columns 'meanRewards', 'meanPredictionTime'
      and 'models', in the order A2C, PPO2, DQN, homemadeDQN
    """
    self.testEpisodes = testEpisodes
    # Load the models
    self.loadModels()
    candidates = [
      ('A2C', self.A2C),
      ('PPO2', self.PPO2),
      ('DQN', self.DQN),
      ('homemadeDQN', self.homemadeDQN),
    ]
    rows = []
    for modelName, model in candidates:
      meanReward, meanPredictionTime = self.testModel(model)
      rows.append({
        'meanRewards': meanReward,
        'meanPredictionTime': meanPredictionTime,
        'models': modelName,
      })
    # Create pandas results frame
    resultFrame = pd.DataFrame(rows, columns=['meanRewards', 'meanPredictionTime', 'models'])
    return resultFrame

In [None]:
if __name__=='__main__':
  evaluator = Evaluator()
  # Evaluate every model on 100 episodes and collect the per-model means
  resultsFrame = evaluator.comparePerformance(testEpisodes=100)
  z = resultsFrame  # legacy alias kept so existing references to `z` keep working
  # Show the comparison table; without this the cell produces no visible result
  print(resultsFrame)

# Plotting training scores
## Don't run

In [None]:
# Disabled reference code (do not run): plots the mean evaluation reward of each
# agent against training progress, reading the evaluations.npz logs and
# DQNReward.csv stored under SavedModels/.
# NOTE(review): np.log() of a non-positive mean reward yields NaN/-inf, so the
# curves would have gaps wherever a mean reward is <= 0 — confirm before re-enabling.
# NOTE(review): `results` is assigned three times below but never read.
# plt.rc('font',size=14)
# plt.rcParams['font.family'] = 'serif'
# plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']
# x=np.load("/content/drive/My Drive/Github/RLProject/SavedModels/PPO2Logs/evaluations.npz")
# results = x['results'].squeeze()
# meanResultsPPO = x['results'].squeeze().mean(axis=1)
# timestepsPPO=x['timesteps']
# x=np.load("/content/drive/My Drive/Github/RLProject/SavedModels/A2CLogs/evaluations.npz")
# results = x['results'].squeeze()
# meanResultsA2C = x['results'].squeeze().mean(axis=1)
# timestepsA2C=x['timesteps']
# x=np.load("/content/drive/My Drive/Github/RLProject/SavedModels/DQNLogs/evaluations.npz")
# results = x['results'].squeeze()
# meanResultsDQN = x['results'].squeeze().mean(axis=1)
# timestepsDQN=x['timesteps']
# KerasDQNData = pd.read_csv('/content/drive/My Drive/Github/RLProject/SavedModels/DQNReward.csv')
# KerasDQNData = KerasDQNData.iloc[0:30]
# meanResultsKerasDQN = KerasDQNData['meanReward']
# timestepsKerasDQN = KerasDQNData['episodeNumber']
# A2Cline,=plt.plot(timestepsA2C,np.log(meanResultsA2C),label='A2C')
# PPOline,=plt.plot(timestepsPPO,np.log(meanResultsPPO),label='PPO2')
# DQNline,=plt.plot(timestepsDQN,np.log(meanResultsDQN),label='DQN')
# KerasDQNline,=plt.plot(timestepsKerasDQN,np.log(meanResultsKerasDQN),label='KerasDQN')
# # plt.yscale('log')
# plt.legend(handles=[A2Cline, DQNline, PPOline, KerasDQNline])
# plt.xlabel('Steps')
# plt.ylabel('Mean reward on validation \n test')
# # plt.savefig('/content/drive/My Drive/Github/RLProject/SavedModels/trainingresults.png', bbox_inches="tight")