# Test Service

Intended to test the `service.py` evaluator (`flatland.evaluators.service`).
Runs the evaluator service as a subprocess together with a simple client driven by a random controller.

Setup
---

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import PIL
from flatland.utils.rendertools import RenderTool
import imageio
import os

In [None]:
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.schedule_generators import sparse_schedule_generator
from flatland.envs.malfunction_generators import malfunction_from_file, no_malfunction_generator
from flatland.envs.rail_generators import rail_from_file
from flatland.envs.schedule_generators import schedule_from_file
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.envs.persistence import RailEnvPersister
from flatland.evaluators.client import FlatlandRemoteClient, TimeoutException
import flatland.evaluators.service as fes

In [None]:
import pickle
import redis
import subprocess as sp
import shlex
import time
import pkg_resources as pr
import importlib_resources as ir
import sys, os
import pandas as pd

In [None]:
# Root folder of the test environments. It is passed to the evaluator service
# (--test_folder) and to the client (test_envs_root), and start_client() loads
# Test_0/Level_0.pkl from it. Machine-specific absolute path: adjust before running.
sDirRoot = "/Users/flaurent/Sites/flatland/neurips2020-flatland-starter-kit/scratch/test-neurips2020-round2-v0"

In [None]:
# List processes whose `ps -ef` line mentions both "python" and
# "flatland.evaluators.service", i.e. evaluator services that are still running.
!ps -ef | grep -i python | grep -i flatland.evaluators.service

In [None]:
def kill_evaluator():
    """Kill every running evaluator service process.

    Runs a shell pipeline through the IPython shell: `ps -ef` is filtered for
    lines mentioning both "python" and "flatland.evaluators.service", the
    second column (the PID) is extracted with awk and handed to `kill`.
    Must be called from within an IPython/Jupyter session.
    """
    # Explicit form of the notebook `!<command>` shorthand.
    get_ipython().system(
        "ps -ef | grep -i python | grep -i flatland.evaluators.service | awk '{print $2}' | xargs kill"
    )

In [None]:
def cleanup():
    """Reset the state left behind by a previous evaluator run.

    Deletes every Redis key matching ``flatland*`` (printing each key as it is
    removed), removes ``/tmp/output.csv`` and finally kills any running
    evaluator service via `kill_evaluator`.
    Must be called from within an IPython/Jupyter session.
    """
    redis_conn = redis.Redis()
    for stale_key in redis_conn.keys("flatland*"):
        print("Deleting:", stale_key)
        redis_conn.delete(stale_key)

    # Explicit form of the notebook `!<command>` shorthand.
    get_ipython().system("rm -f /tmp/output.csv")

    kill_evaluator()

In [None]:
def configure_env(overall=8*60*60, planning=5*60, step=10, successive=10, mean_percentage=0.25):
    """Build the environment-variable mapping for the evaluator subprocess.

    A copy of the current process environment is extended with the evaluator
    settings below; `os.environ` itself is left untouched. All values are
    converted with `str()`.

    Args:
        overall: Value for ``FLATLAND_OVERALL_TIMEOUT``.
        planning: Value for ``FLATLAND_INITIAL_PLANNING_TIMEOUT``.
        step: Value for ``FLATLAND_PER_STEP_TIMEOUT``.
        successive: Value for ``FLATLAND_MAX_SUCCESSIVE_TIMEOUTS``.
        mean_percentage: Value for ``TEST_MIN_PERCENTAGE_COMPLETE_MEAN``.

    Returns:
        dict suitable for the `env` argument of `subprocess.Popen`.

    Note: the timeouts are presumably in seconds - confirm against the service.
    """
    settings = {
        "FLATLAND_OVERALL_TIMEOUT": overall,
        "FLATLAND_PER_STEP_TIMEOUT": step,
        "FLATLAND_INITIAL_PLANNING_TIMEOUT": planning,
        "FLATLAND_MAX_SUCCESSIVE_TIMEOUTS": successive,
        "TEST_MIN_PERCENTAGE_COMPLETE_MEAN": mean_percentage,
    }
    child_env = dict(os.environ)
    child_env.update((name, str(value)) for name, value in settings.items())
    return child_env

In [None]:
def start_evaluator(env):
    """Launch the evaluator service (`flatland.evaluators.service`) as a subprocess.

    The service is started with ``--test_folder <sDirRoot> --pickle`` and is
    not waited for; the command line is printed before launching.

    Args:
        env: Environment mapping handed to the child process, e.g. the dict
            returned by `configure_env`.

    Returns:
        The `subprocess.Popen` handle of the started service. Callers that
        ignore the return value are unaffected.
    """
    # Build argv directly instead of formatting a string and re-splitting it
    # with shlex: a test folder containing whitespace or quotes would otherwise
    # be broken up into several arguments.
    lsCmd = [
        "python", "-m", "flatland.evaluators.service",
        "--test_folder", str(sDirRoot),
        "--pickle",
        # "--verbose",
    ]
    print(" ".join(lsCmd))

    oPipe = sp.Popen(lsCmd, env=env)
    # Non-blocking status check; the service keeps running in the background.
    oPipe.poll()
    return oPipe

In [None]:
def start_client():
    """Create the remote client used to talk to the evaluator.

    After the client is constructed, the first test level
    (``Test_0/Level_0.pkl`` under `sDirRoot`) is loaded with
    `RailEnvPersister.load_new`; the loaded environment is not used afterwards.

    Returns:
        The `FlatlandRemoteClient` built on `sDirRoot` with `use_pickle=True`
        and `verbose=False`.
    """
    remote_client = FlatlandRemoteClient(
        test_envs_root=sDirRoot,
        verbose=False,
        use_pickle=True,
    )
    first_level_path = f"{sDirRoot}/Test_0/Level_0.pkl"
    _rail_env, _rail_env_dict = RailEnvPersister.load_new(first_level_path)
    return remote_client

In [None]:
def random_controller(obs, _env):
    """Pick an action in ``[0, 5)`` for every agent of `_env`.

    The global NumPy RNG is re-seeded with 0 on every call, so for a given
    number of agents every call returns the same action dict.

    Args:
        obs: Current observation; not used.
        _env: Object exposing an `agents` sequence; only its length is used.

    Returns:
        dict mapping agent index to an action drawn with
        `np.random.randint(0, 5)`.
    """
    np.random.seed(0)
    n_agents = len(_env.agents)
    return {handle: np.random.randint(0, 5) for handle in range(n_agents)}

In [None]:
def run_submission(oFRC, slow_ep=-1, debug=False):
    """Play every environment served by the evaluator with `random_controller`.

    Environments are requested one after another with `oFRC.env_create` until
    it returns a falsy observation. Each episode is stepped until
    ``done['__all__']`` is set or `oFRC.env_step` raises `TimeoutException`,
    in which case the episode is abandoned and the next one is requested.

    Args:
        oFRC: Remote client providing `env_create`, `env_step`, `env` and
            `current_env_path` (e.g. the object returned by `start_client`).
        slow_ep: Index of an episode to slow down artificially: once that
            episode has more than 10 elapsed steps, each further step is
            preceded by a 2 second sleep. ``-1`` disables this.
        debug: When true, progress messages are printed.

    Returns:
        None.
    """
    def log(*args, **kwargs):
        # Forward everything to print() unchanged, so that both
        # `log("label", value)` and `log(".", end="")` behave like print().
        if debug:
            print(*args, **kwargs)

    dummy_obs = DummyObservationBuilder()
    episode = 0

    while True:
        obs, info = oFRC.env_create(obs_builder_object=dummy_obs)
        log(oFRC.current_env_path)
        log(f"Episode : {episode}")

        if not obs:
            log("None observation - all envs completed!")
            break

        while True:
            action = random_controller(obs, oFRC.env)

            # Make one specific episode artificially slow.
            if slow_ep != -1 and episode == slow_ep and oFRC.env._elapsed_steps > 10:
                time.sleep(2)

            try:
                # Keep `obs` current so the controller sees the latest
                # observation on the next iteration.
                obs, all_rewards, done, info = oFRC.env_step(action)
            except TimeoutException as err:
                log("Timeout: ", err)
                break

            #log(".", end="")
            if done['__all__']:
                log("\nCompleted Episode : ", episode)
                log("Reward : ", sum(all_rewards.values()))
                break

        episode += 1

    # Submitting the results is left disabled:
    #print(f"Evaluation Complete - episodes={episode} - send submit message...")
    #print(oFRC.submit())
    #print("All done.")

Tests
---

In [None]:
%%time

# Normal submission: start from a clean state, launch the evaluator with the
# default timeout values of configure_env (only mean_percentage is lowered to
# 0.1), create the client and let it play through all environments.
cleanup()
config = configure_env(overall=8*60*60, planning=5*60, step=10, successive=10, mean_percentage=0.1)
start_evaluator(config)
client = start_client()
run_submission(client)

# Alternative invocation that prints an exception instead of aborting the cell:
#try:
#    run_submission(client)
#except Exception as e:
#    print("== EXCEPTION! ===")
#    print(e)

In [None]:
%%time

# Overall timeout: same flow as the normal submission, but with
# FLATLAND_OVERALL_TIMEOUT set to 3 (presumably seconds - confirm against the
# service) and mean_percentage=0.0.
cleanup()
config = configure_env(overall=3, planning=5*60, step=10, successive=10, mean_percentage=0.0)
start_evaluator(config)
client = start_client()
run_submission(client, debug=False)

In [None]:
%%time

# Step timeout
# FIXME fails!
# The test is disabled by wrapping it in a string literal, so nothing below runs.
# NOTE(review): as written it keeps overall=3 (same as the overall-timeout test)
# and step=10, while run_submission(slow_ep=2) only sleeps 2 before each slow
# step - possibly why the step timeout is not what triggers; confirm before
# re-enabling.
"""
cleanup()
config = configure_env(overall=3, planning=5*60, step=10, successive=10, mean_percentage=0.0)
start_evaluator(config)
client = start_client()
run_submission(client, slow_ep=2, debug=True)
"""

In [None]:
#pd.read_csv("/tmp/output.csv").T

Cleanup
---

In [None]:
# Stop any evaluator service process that is still running after the tests.
kill_evaluator()