# Test Service

Intended to test the service.py evaluator.
Runs the service.py and a simple client.



# Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [3]:
import PIL
from flatland.utils.rendertools import RenderTool
import imageio
import os

In [4]:
from IPython.display import clear_output
from IPython.core import display
import ipywidgets as ipw
display.display(display.HTML("<style>.container { width:95% !important; }</style>"))

In [5]:
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.schedule_generators import sparse_schedule_generator
from flatland.envs.malfunction_generators import malfunction_from_file, no_malfunction_generator
from flatland.envs.rail_generators import rail_from_file
from flatland.envs.schedule_generators import schedule_from_file
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.envs.persistence import RailEnvPersister
from flatland.evaluators.client import FlatlandRemoteClient, TimeoutException
import flatland.evaluators.service as fes

In [6]:
import pickle
import redis
import subprocess as sp
import shlex
import time
import pkg_resources as pr
import importlib_resources as ir
import sys, os
import pandas as pd

In [7]:
!pwd

/home/jeremy/projects/aicrowd/rl-trains/flatland5/notebooks


### Find the real path of the `env_data` package (should be copied by tox)

In [8]:
with ir.path("env_data.tests", "test_001.pkl") as oPath:
    sPath = oPath
print(type(sPath), sPath)

<class 'pathlib.PosixPath'> /home3/jeremy/projects/aicrowd/rl-trains/flatland5/env_data/tests/test_001.pkl


In [9]:
sDirRoot = "/" + "/".join(sPath.parts[1:-1] + ("service_test",""))
sDirRoot

'/home3/jeremy/projects/aicrowd/rl-trains/flatland5/env_data/tests/service_test/'

### Clear any old redis keys

In [10]:
oRedis = redis.Redis()

In [11]:
lKeys = oRedis.keys("flatland*")
lKeys

[b'flatland-rl::FLATLAND_RL_SERVICE_ID::response::9233d209716f4ae78a5dbe124de67e27']

In [12]:
for sKey in lKeys:
    print("Deleting:", sKey)
    oRedis.delete(sKey)

Deleting: b'flatland-rl::FLATLAND_RL_SERVICE_ID::response::9233d209716f4ae78a5dbe124de67e27'


### Remove `/tmp/output.csv`

In [13]:
!rm -f /tmp/output.csv

### kill any old `service.py` process

In [14]:
!ps -ef | grep -i python | grep -i flatland.evaluators.service | awk '{print $2}' | xargs kill

In [15]:
osEnv2 = os.environ.copy()

### Timeouts copied from service.py

In [16]:
#MAX_SUCCESSIVE_TIMEOUTS = int(os.getenv("FLATLAND_MAX_SUCCESSIVE_TIMEOUTS", 10))

# 8 hours (will get debug timeout from env variable if applicable)
#OVERALL_TIMEOUT = int(os.getenv(
#    "FLATLAND_OVERALL_TIMEOUT",
#    8 * 60 * 60))

# 10 mins
#INTIAL_PLANNING_TIMEOUT = int(os.getenv(
#    "FLATLAND_INITIAL_PLANNING_TIMEOUT",
#    10 * 60))

# 10 seconds
#PER_STEP_TIMEOUT = int(os.getenv(
#    "FLATLAND_PER_STEP_TIMEOUT",
#    10))

# 5 min - applies to the rest of the commands
#DEFAULT_COMMAND_TIMEOUT = int(os.getenv(
#    "FLATLAND_DEFAULT_COMMAND_TIMEOUT",
#    5 * 60))

### Set some short timeouts for testing

In [17]:
osEnv2["FLATLAND_OVERALL_TIMEOUT"]="10"
osEnv2["FLATLAND_PER_STEP_TIMEOUT"] = "2"
osEnv2["FLATLAND_MAX_SUCCESSIVE_TIMEOUTS"] = "2"

### Create the python command for `service.py`

In [18]:
#sCmd = "python -m flatland.evaluators.service --test_folder ../env_data/tests/service_test --mergeDir ./tmp/merge --actionDir ./tmp/actions --pickle --missingOnly"
#sCmd = "python -m flatland.evaluators.service --test_folder ../env_data/tests/service_test --pickle" # --verbose"
sCmd = f"python -m flatland.evaluators.service --test_folder {sDirRoot} --pickle" # --verbose"
lsCmd = shlex.split(sCmd)
print(sCmd)
print(lsCmd)

python -m flatland.evaluators.service --test_folder /home3/jeremy/projects/aicrowd/rl-trains/flatland5/env_data/tests/service_test/ --pickle
['python', '-m', 'flatland.evaluators.service', '--test_folder', '/home3/jeremy/projects/aicrowd/rl-trains/flatland5/env_data/tests/service_test/', '--pickle']


### Run the command with Popen (output goes to jupyter stdout not notebook)

In [19]:
oPipe = sp.Popen(lsCmd, env=osEnv2)

In [20]:
oPipe.poll()

In [21]:
#oFRC = FlatlandRemoteClient(test_envs_root="../env_data/tests/service_test/", verbose=False, use_pickle=True)
oFRC = FlatlandRemoteClient(test_envs_root=sDirRoot, verbose=False, use_pickle=True)

In [22]:
#env, env_dict = RailEnvPersister.load_new("../env_data/tests/service_test/Test_0/Level_0.pkl") # env_file)
env, env_dict = RailEnvPersister.load_new(f"{sDirRoot}/Test_0/Level_0.pkl") # env_file)
ldActions = env_dict["actions"]

In [23]:
def expert_controller(obs, _env):
    return ldActions[_env._elapsed_steps]

def random_controller(obs, _env):
    dAct = {}
    for iAg in range(len(_env.agents)):
        dAct[iAg] = np.random.randint(0, 5)
    return dAct

In [24]:
oObsB = DummyObservationBuilder()

In [25]:
oObsB.get()

True

In [26]:
def run_submission(slow_ep=1, delay=2):
    episode = 0
    obs = True
    while obs:
        obs, info = oFRC.env_create(obs_builder_object=oObsB)
        if not obs:
            print("null observation - all envs completed!")
            break
        print(f"Episode : {episode}")
        

        print(oFRC.env.dones['__all__'])

        while True:
            if episode < 3:
                action = expert_controller(obs, oFRC.env)
            else:
                action = random_controller(obs, oFRC.env)
            
            time_start = time.time()
            
            if (episode == slow_ep) and (oFRC.env._elapsed_steps > 10):
                time.sleep(2)
                
            try:
                observation, all_rewards, done, info = oFRC.env_step(action)
                time_diff = time.time() - time_start
                print(".", end="")
                if done['__all__']:
                    print("\nCompleted Episode : ", episode)
                    print("Reward : ", sum(list(all_rewards.values())))
                    break
            except TimeoutException as err:
                print("Timeout: ", err)
                break
            
        episode += 1
        
    print(f"Evaluation Complete - episodes={episode} - send submit message...")
    print(oFRC.submit())
    print("All done.")
        

In [27]:
try:
    run_submission()
except TimeoutException as timeoutException:
    print("Timed out.")
    print(timeoutException)
    

DEPRECATED - use FileMalfunctionGen instead of malfunction_from_file
DEPRECATED - RailEnv arg: malfunction_and_process_data - use malfunction_generator
Episode : 0
False
...........................................................................................................................................................................
Completed Episode :  0
Reward :  10.0
DEPRECATED - use FileMalfunctionGen instead of malfunction_from_file
DEPRECATED - RailEnv arg: malfunction_and_process_data - use malfunction_generator
Episode : 1
False
...........Error received:  {'type': 'FLATLAND_RL.ENV_STEP_TIMEOUT'}
Timeout:  FLATLAND_RL.ENV_STEP_TIMEOUT
null observation - all envs completed!
Evaluation Complete - episodes=2 - send submit message...
## Client Performance Stats
	 - env_creation_wait_time	 => min: 0.0015025138854980469 || mean: 0.004898786544799805 || max: 0.008767366409301758
	 - internal_env_reset_time	 => min: 0.0014307498931884766 || mean: 0.0017570257186889648 || max: 0.

### Kill the evaluator process we started earlier

In [28]:
!ps -ef | grep -i python | grep -i flatland.evaluators.service | awk '{print $2}' | xargs kill

In [29]:
pd.read_csv("/tmp/output.csv").T

Unnamed: 0,0,1
filename,Test_0/Level_0.pkl,Test_0/Level_1.pkl
test_id,Test_0,Test_0
env_id,Level_0,Level_1
n_agents,5,5
x_dim,25,25
y_dim,25,25
n_cities,2,2
max_rails_in_city,3,3
malfunction_interval,50,50
n_envs_run,50,50
