Test PPO model

In [None]:
from google.colab import drive
# Mount Google Drive at /content/gdrive; the model checkpoint and log paths
# used later in this notebook live under /content/gdrive/MyDrive.
drive.mount('/content/gdrive')

In [None]:
import os

# Directory on the mounted Drive that holds the PPO checkpoints and test logs.
path = "/content/gdrive/MyDrive/colab_model/rocket/PPO"
# exist_ok=True keeps the cell safe to re-run and avoids the
# check-then-create race of testing os.path.exists() first.
os.makedirs(path, exist_ok=True)

In [None]:
# Install the Python dependencies of the test script (ptan, tensorboardX, Box2D).
!pip install ptan tensorboardX box2D box2D-py
# Remove the installed gym and pyglet packages, then pin pyglet to 1.3.2.
# NOTE(review): presumably the repository cloned in the next cell ships its own
# `gym` package that replaces the uninstalled one — confirm.
!pip uninstall gym -y
!pip uninstall pyglet -y
!pip install pyglet==v1.3.2

In [None]:
# Fetch the `paper-training` branch of the project and make the checkout the
# working directory, then list its contents as a quick sanity check.
# NOTE(review): `%cd` is relative, so re-running this cell from inside the
# checkout would clone a nested copy — confirm this cell is run only once.
!git clone -b paper-training https://github.com/naufalhisyam/SpaceXReinforcementLearning.git
%cd SpaceXReinforcementLearning
!ls

In [None]:
# System packages plus the Python wrapper used for headless rendering:
# OpenGL bindings, ffmpeg, the Xvfb virtual X server and pyvirtualdisplay.
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip3 install pyvirtualdisplay

# Virtual display
from pyvirtualdisplay import Display
import glob
import io
import base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

# Start a hidden (visible=0) 1400x900 virtual display.
# NOTE(review): presumably required so the environment can render frames on a
# machine without a physical screen — confirm.
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

def show_video(video_dir='rocket_videos'):
    """Embed a recorded MP4 from *video_dir* in the notebook output.

    The ``*.mp4`` files directly inside *video_dir* are sorted by name and the
    first one is displayed as a base64-encoded, looping HTML5 ``<video>``
    element. Sorting makes the choice deterministic; ``glob`` alone returns
    files in arbitrary order.

    Args:
        video_dir: Directory to search for ``*.mp4`` files. Defaults to
            ``'rocket_videos'``, the directory this function always used.

    Returns:
        None. Prints ``"Could not find video"`` when no MP4 file is present.
    """
    mp4list = sorted(glob.glob(f"{video_dir}/*.mp4"))
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode is sufficient, and the context manager closes the
    # handle deterministically instead of leaking it.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

In [None]:
#!/usr/bin/env python3
import os
import gym
from gym import wrappers
from lib import model
import numpy as np
import torch
import ptan
import pandas as pd

NAME = "Testing"  # Test run name; becomes part of the Excel log filename
ENV_ID = "RocketLanderTest-v0"  # Environment id passed to gym.make()
# Actor weights loaded into the network; a falsy value skips loading.
MODEL_TO_LOAD = "/content/gdrive/MyDrive/colab_model/rocket/PPO/ppo-test/actor_-2.634_500_episodes_133513_steps.dat"
RECORD_RUN = './rocket_videos'  # Video recording directory; a falsy value disables recording
SAVE_RUN = True  # Save states & actions into an Excel file
SIMULATE_WIND = True  # Apply random x-axis disturbances during the episode
X_FORCE = 1000  # Wind x force passed to apply_random_x_disturbance()

def _as_frame(log):
    """Turn a ``{column name: list of values}`` log into a DataFrame, one row per step."""
    return pd.DataFrame(list(zip(*log.values())), columns=list(log))


def _save_run_logs(sheets, total_reward, total_steps):
    """Write one episode's logs to an Excel workbook and copy them to Drive.

    Args:
        sheets: Mapping of sheet name to the ``pandas.DataFrame`` stored in
            that sheet; sheets are written in the mapping's iteration order.
        total_reward: Episode return; rounded to 3 decimals in the filename.
        total_steps: Number of environment steps taken; used in the filename.

    Raises:
        OSError: If a directory cannot be created or a file cannot be copied.
    """
    import shutil  # local import: only needed when a run is actually saved

    log_dir = "/content/SpaceXReinforcementLearning/excel_logs/states-actions"
    drive_dir = "/content/gdrive/MyDrive/colab_model/rocket/PPO/test_logs"

    # Create exactly the directory the workbook is written to, so saving does
    # not depend on the current working directory.
    os.makedirs(log_dir, exist_ok=True)
    workbook = f"{log_dir}/ppo_states-acts_{NAME}_{round(total_reward, 3)}_{total_steps}.xlsx"
    with pd.ExcelWriter(workbook) as writer:
        for sheet_name, frame in sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name)

    # Mirror every log file to Drive with the standard library rather than an
    # IPython `!cp` magic: this is valid Python and a failed copy raises
    # instead of being silently ignored.
    os.makedirs(drive_dir, exist_ok=True)
    for entry in os.listdir(log_dir):
        source = os.path.join(log_dir, entry)
        if os.path.isfile(source):
            shutil.copy2(source, drive_dir)


def main():
    """Run one test episode with the PPO actor and report the outcome.

    Builds the environment named by ``ENV_ID`` (wrapped in a video-recording
    ``Monitor`` when ``RECORD_RUN`` is set), loads the actor weights from
    ``MODEL_TO_LOAD`` and steps the environment until it reports ``done``.
    Actions are sampled from a Gaussian centred on the network output with
    standard deviation ``exp(logstd)`` and clipped to ``[-1, 1]``.

    When ``SAVE_RUN`` is set, per-step states and actions (and wind values
    when ``SIMULATE_WIND`` is set) are written to an Excel workbook that is
    then copied to Google Drive. The environment is always closed, even if
    the episode fails, so the recording is finalised. The recorded video is
    displayed only when ``RECORD_RUN`` is set, so a stale video from an
    earlier run is never shown for an unrecorded episode.
    """
    device = torch.device("cpu")
    env = gym.make(ENV_ID)
    if RECORD_RUN:
        env = wrappers.Monitor(env, RECORD_RUN, force=True)

    total_reward = 0.0
    total_steps = 0
    try:
        net = model.ModelActor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
        if MODEL_TO_LOAD:
            net.load_state_dict(torch.load(MODEL_TO_LOAD, map_location=device))

        obs = env.reset()
        # Direction flag for the wind disturbance, drawn once per episode.
        left_or_right_movement = np.random.randint(0, 2)
        # Velocity columns are logged only for the 10-element observation.
        has_velocities = len(obs) == 10

        if SAVE_RUN:
            state_log = {'x_pos': [], 'y_pos': [], 'theta': []}
            if has_velocities:
                state_log.update({'vel_x': [], 'vel_y': [], 'ang_vel': []})
            act_log = {'gimbal': [], 'throttle': [], 'side_thruster': []}
            wind_log = {'x_wind force': [], 'y_wind force': []}

        done = False
        while not done:
            obs_v = ptan.agent.float32_preprocessor([obs]).to(device)
            # States are read before stepping, so each logged row pairs the
            # pre-step state with the action chosen for it.
            states = env.get_states_value()
            mu = net(obs_v)[0].squeeze(dim=0).data.cpu().numpy()
            logstd = net.logstd.data.cpu().numpy()
            noise = np.random.normal(size=logstd.shape)
            action = np.clip(mu + np.exp(logstd) * noise, -1, 1)

            if SAVE_RUN:
                state_log['x_pos'].append(states[0])
                state_log['y_pos'].append(states[1])
                state_log['theta'].append(states[2])

                act_log['gimbal'].append(states[4])
                act_log['throttle'].append(states[3])
                act_log['side_thruster'].append(action[2])

                if has_velocities:
                    state_log['vel_x'].append(states[7])
                    state_log['vel_y'].append(states[8])
                    state_log['ang_vel'].append(states[9])

            if np.isscalar(action):
                action = [action]
            obs, reward, done, _ = env.step(action)
            total_reward += reward
            total_steps += 1

            # Simulate wind only while states[5] and states[6] are both zero.
            # NOTE(review): presumably these are the leg ground-contact flags,
            # i.e. wind is applied only while airborne — confirm against the env.
            if SIMULATE_WIND and states[5] == 0 and states[6] == 0:
                env.apply_random_x_disturbance(epsilon=0.005, left_or_right=left_or_right_movement, x_force=X_FORCE)
                if SAVE_RUN:
                    winds = env.get_winds_value()
                    wind_log['x_wind force'].append(winds[0])
                    wind_log['y_wind force'].append(winds[1])

        if SAVE_RUN:
            sheets = {"state": _as_frame(state_log), "action": _as_frame(act_log)}
            if SIMULATE_WIND:
                sheets["winds"] = _as_frame(wind_log)
            _save_run_logs(sheets, total_reward, total_steps)
            print("Test logs saved")
    finally:
        # Closing the environment also lets the Monitor finish the recording.
        env.close()

    if RECORD_RUN:
        show_video()
    print("In %d steps we got %.3f reward" % (total_steps, total_reward))

In [None]:
# Run a single test episode using the configuration constants defined above.
main()