In [2]:
%matplotlib inline
import gym
gym.logger.set_level(40)
import random
import numpy as np
import matplotlib.pyplot as plt
import os, sys
sys.path.insert(0,'../../RL_algorithms')
import A2C
import DDQN
import PPO
import pprint
import ipywidgets as widgets
import Utils
import csv
from IPython import display

import gym_hungry_thirsty
# Id passed to gym.make below.
# NOTE(review): presumably registered by the gym_hungry_thirsty import above - confirm.
env_name = "hungry-thirsty-v0"
# Single environment instance shared by every function in this notebook.
env = gym.make(env_name, size=(4,4))

# Per-trial record of the user's selections, keyed by trial index (filled in by train_agent).
your_attempts = {}
# Trial index that the next call to train_agent will use.
your_attempts_idx = 0

# Per-trial results ("agent", "rewards", "fitness"), keyed by trial index (filled in by train_agent).
perf_tracking = {}

# Output widget reused by select_run_and_show_agent; created on its first call.
show_agent_window = None

In [2]:
def a2c(hyper_params, reward_fn):
    """
    Build an A2C agent on the shared module-level environment.

    hyper_params and reward_fn are forwarded unchanged (reward_fn as
    'user_reward_fn'), with state-visit plotting switched on. The result of
    A2C.create_a2c_agent is returned as-is; train_agent unpacks it as
    (agent, rewards, fitness).
    """
    agent_config = dict(
        env=env,
        hyper_params=hyper_params,
        user_reward_fn=reward_fn,
        plot_state_visit_distribution=True,
    )
    return A2C.create_a2c_agent(**agent_config)

In [1]:
def ppo(hyper_params, reward_fn):
    """
    Build a PPO agent on the shared module-level environment.

    hyper_params and reward_fn are forwarded unchanged (reward_fn as
    'user_reward_fn'), with state-visit plotting switched on. The result of
    PPO.create_ppo_agent is returned as-is; train_agent unpacks it as
    (agent, rewards, fitness).
    """
    agent_config = dict(
        env=env,
        hyper_params=hyper_params,
        user_reward_fn=reward_fn,
        plot_state_visit_distribution=True,
    )
    return PPO.create_ppo_agent(**agent_config)

In [1]:
def ddqn(hyper_params, reward_fn):
    """
    Build a DDQN agent on the shared module-level environment.

    hyper_params and reward_fn are forwarded unchanged (reward_fn as
    'user_reward_fn'), with state-visit plotting switched on. The result of
    DDQN.create_ddqn_agent is returned as-is; train_agent unpacks it as
    (agent, epsilon, rewards, fitness).
    """
    agent_config = dict(
        env=env,
        hyper_params=hyper_params,
        user_reward_fn=reward_fn,
        plot_state_visit_distribution=True,
    )
    return DDQN.create_ddqn_agent(**agent_config)

In [7]:
agent = None    # agent returned by the most recent successful train_agent call
epsilon = None  # epsilon returned by the most recent DDQN run; read by show_agent

def train_agent(alg_and_reward_params, hyper_params, study_id):
    """
    Record the user's selections as a new trial, then train the chosen agent.

    Parameters
    ----------
    alg_and_reward_params : dict
        Must hold "Algorithm Choice" ("A2C", "PPO" or "DDQN") and the four
        "Reward for state: ..." weights. No value may be None.
    hyper_params : dict
        User-selected hyperparameters; "reward_scaling_factor" is read when a
        reward is computed. No value may be None. The dict is NOT modified:
        the fixed extra settings are added to a private copy.
    study_id : str
        The trial is appended to "user_tests/<study_id>.csv" (the directory is
        created if it does not exist).

    Returns
    -------
    None. If any input is unset a message is printed and nothing is recorded.
    Otherwise the selections are stored in your_attempts[trial] and logged to
    the CSV, and on successful training perf_tracking[trial] holds the agent
    with its "rewards" and "fitness". For an unrecognised algorithm the trial
    stays recorded with the placeholder entry (agent None, empty lists).
    """
    global agent, epsilon, your_attempts, your_attempts_idx, perf_tracking

    # do some error checking to make sure the hyperparameters are set before proceeding
    if hyper_params is None:
        print ("No hyper-parameters are set. Please set these before training.")
        return
    if alg_and_reward_params is None:
        print ("No algorithm or reward parameters are set. Please set these before training.")
        return
    not_set = [param for param in alg_and_reward_params if alg_and_reward_params[param] is None]
    not_set += [param for param in hyper_params if hyper_params[param] is None]
    if not_set:
        print ("The following hyperparameters are not set: {}.\nPlease make a selection and try again.".format(not_set))
        return

    # save a copy of the hyperparams set by the user
    user_specified_hyper_params = hyper_params.copy()
    # and add some other hyperparams which may be necessary; this goes into a
    # separate copy so the caller's dict (often reused for the next trial) stays clean
    agent_hyper_params = hyper_params.copy()
    agent_hyper_params.update({
        "max_steps": 200,
        "sync_frequency": 5,
        "plot_update_freq": 100,
        "neural_net_hidden_size": 144,
        "neural_net_extra_layers": 0,
        "exp_replay_size": 5000,
        "K_epochs": 80,
    })

    def compute_reward(state, action, new_state):
        """
        Using the user-provided weights, compute the reward function.

        Only state["hungry"] and state["thirsty"] are consulted; action and
        new_state are accepted but unused.
        """
        hungry = "hungry" if state["hungry"] else "not hungry"
        thirsty = "thirsty" if state["thirsty"] else "not thirsty"
        weight = alg_and_reward_params["Reward for state: {} AND {}".format(hungry, thirsty)]
        return agent_hyper_params["reward_scaling_factor"] * weight

    alg = alg_and_reward_params["Algorithm Choice"]
    trial_idx = your_attempts_idx

    # place food and water anew; the locations are stored so show_agent can replay this layout
    env.reset(new_water_food_loc=True)

    # save params and performance to dicts ('your_attempts' and 'perf_tracking')
    your_attempts[trial_idx] = {"alg choice": alg,
                                "reward": {"hungry and thirsty": alg_and_reward_params["Reward for state: hungry AND thirsty"],
                                           "hungry and not thirsty": alg_and_reward_params["Reward for state: hungry AND not thirsty"],
                                           "not hungry and thirsty": alg_and_reward_params["Reward for state: not hungry AND thirsty"],
                                           "not hungry and not thirsty": alg_and_reward_params["Reward for state: not hungry AND not thirsty"]},
                                "hyper_params": user_specified_hyper_params,
                                "food_loc": env.food_loc,
                                "water_loc": env.water_loc,
                               }

    # placeholder until training finishes; show_agent treats agent None as "not trained"
    perf_tracking[trial_idx] = {"agent": None,
                                "rewards": [],
                                "fitness": []}

    fields = ['trial', 'alg', 'reward_fn', 'hyper_params']
    csv_filename = "user_tests/" + study_id + ".csv"
    os.makedirs(os.path.dirname(csv_filename), exist_ok=True)
    csv_file_exists = os.path.isfile(csv_filename)

    # newline='' is what the csv module requires of files it writes to
    with open(csv_filename, 'a', newline='') as f:
        writer = csv.writer(f)
        if not csv_file_exists:
            writer.writerow(fields)

        writer.writerow([trial_idx,
                         your_attempts[trial_idx]['alg choice'],
                         your_attempts[trial_idx]['reward'],
                         your_attempts[trial_idx]['hyper_params']])

    your_attempts_idx += 1

    if alg == "A2C":
        agent, rewards, fitness = a2c(agent_hyper_params, reward_fn=compute_reward)
    elif alg == "PPO":
        agent, rewards, fitness = ppo(agent_hyper_params, reward_fn=compute_reward)
    elif alg == "DDQN":
        agent, epsilon, rewards, fitness = ddqn(agent_hyper_params, reward_fn=compute_reward)
    else:
        print ("ERROR, {} not implemented yet".format(alg))
        # nothing was trained: keep the placeholder entry instead of failing
        # on the unbound rewards/fitness below
        return

    perf_tracking[trial_idx] = {"agent": agent,
                                "rewards": rewards,
                                "fitness": fitness}

In [6]:
def select_run_and_show_agent():
    """
    Build the "pick a trial and watch its agent" widget.

    Returns a VBox holding a trial dropdown (most recent trial first), a
    "See Agent" button and the shared output area 'show_agent_window'.
    Clicking the button clears that area and runs show_agent for the selected
    trial inside it. If no trial has been recorded yet, a message is printed
    and None is returned.
    """
    global your_attempts, show_agent_window

    if not your_attempts:
        print ("You have not yet finished training an agent.")
        return

    # one output area is shared across calls: create it once, wipe it afterwards
    if show_agent_window is None:
        show_agent_window = widgets.Output(layout={'border': '1px solid black'})
    else:
        show_agent_window.clear_output()

    trial_choices = [
        ("Trial: {}; Alg: {}".format(trial, your_attempts[trial]["alg choice"]), trial)
        for trial in reversed(list(your_attempts.keys()))
    ]

    select_run = widgets.Dropdown(
        options=trial_choices,
        description='Select trial:',
        disabled=False,
    )
    button = widgets.Button(description="See Agent")

    def on_button_clicked(b):
        show_agent_window.clear_output()
        with show_agent_window:
            chosen_alg = your_attempts[select_run.value]['alg choice']
            chosen_agent = perf_tracking[select_run.value]["agent"]
            show_agent(agent=chosen_agent, alg=chosen_alg, trial_id=select_run.value)

    button.on_click(on_button_clicked)

    controls = widgets.Box(children=[select_run, button])
    return widgets.VBox(children=[controls, show_agent_window])

def show_agent(agent, alg, trial_id):
    """
    Visualize the agent's performance.

    Prints the recorded settings of the trial, resets the shared environment
    to the food/water locations stored for that trial, and runs one rendered
    episode with the given agent, drawing into an Image widget.

    Parameters
    ----------
    agent : the trained agent stored in perf_tracking for this trial, or None.
    alg : "A2C", "PPO" or "DDQN"; None or the string "None" means no selection.
    trial_id : key of the trial in your_attempts.

    Returns None. If no algorithm is selected or agent is None, a message is
    printed and nothing is run.
    """
    # NOTE(review): epsilon is the module-level value left by the most recent
    # DDQN training run, which is not necessarily the run that produced
    # `agent` - confirm whether it should be stored per trial.
    global epsilon

    # identity checks: a custom __eq__ on the agent must not make a trained
    # agent look like "no agent"
    if alg is None or alg == "None":
        print ("You have not yet selected an algorithm (A2C, PPO, or DDQN).")
        return
    if agent is None:
        print ("You have not yet finished training an agent.")
        return

    view_training_runs(trial_id=trial_id)
    food_loc = your_attempts[trial_id]["food_loc"]
    water_loc = your_attempts[trial_id]["water_loc"]

    # replay on the same layout the trial was recorded with
    env.reset(food_loc=food_loc, water_loc=water_loc)

    wIm = widgets.Image()
    display.display(wIm)

    if alg == "A2C":
        A2C.run_episode(env=env,
                        actor=agent,
                        render=True,
                        jupyter=True,
                        canvas=wIm)
    elif alg == "PPO":
        PPO.run_episode(env=env,
                        ppo_agent=agent,
                        render=True,
                        jupyter=True,
                        canvas=wIm)
    elif alg == "DDQN":
        DDQN.run_episode(env=env,
                         agent=agent,
                         epsilon=epsilon,
                         render=True,
                         jupyter=True,
                         canvas=wIm)

In [None]:
# review your selections for each of your training runs
def view_training_runs(trial_id=None):
    """
    Print a formatted summary of the trials stored in 'your_attempts'.

    With trial_id=None every recorded trial is printed; otherwise only the
    given one. For each trial the algorithm, the reward weights (2 decimals)
    and the user-chosen hyper-parameters (5 decimals) are listed. If nothing
    has been recorded, a notice is printed instead.
    """
    global your_attempts

    if not your_attempts:
        print ("You have not yet trained an agent.")
        return

    selected_trials = [trial_id] if trial_id is not None else your_attempts.keys()

    for trial in selected_trials:
        record = your_attempts[trial]

        print ("Trial {}-----------------------------------------".format(trial))
        print ("     {:10s}: {}".format('Algorithm', record['alg choice']))

        print ("     {:10s}".format('Reward Function:'))
        for state_name, weight in record['reward'].items():
            print("          {:10s}: {:1.2f}".format("r(" + state_name + ")", weight))

        print ("     {:10s}".format('Hyper-parameters:'))
        for param_name, param_value in record['hyper_params'].items():
            print("          {:10s}: {:1.5f}".format(param_name, param_value))

        print ("------------------------------------------------\n\n")

In [None]:
def review_past_run():
    """
    Build the "pick a trial and see its learning curves" widget.

    Returns a VBox holding a trial dropdown (most recent trial first), a
    "See Trial Metrics" button and an output area. Clicking the button clears
    the area, draws one subplot per tracked metric that has more than one data
    point (not-hungry count and undiscounted return) and prints the trial's
    settings. If no trial has been recorded yet, a message is printed and None
    is returned.
    """
    global your_attempts

    if not your_attempts:
        print ("You have not yet trained an agent.")
        return

    out = widgets.Output(layout={'border': '1px solid black'})

    trial_choices = [
        ("Trial: {}; Alg: {}".format(trial, your_attempts[trial]["alg choice"]), trial)
        for trial in reversed(list(your_attempts.keys()))
    ]

    select_run = widgets.Dropdown(
        options=trial_choices,
        description='Select trial:',
        disabled=False,
    )
    button = widgets.Button(description="See Trial Metrics")

    # one entry per subplot, in axis order:
    # (title, key in perf_tracking[trial], (x label, y label), draw scaling lines?)
    panels = (
        ("Not Hungry Count Per Episode\n" +
         r"$\Sigma_{(s, a, s') \in \tau} \mathbb{1}(s\mathrm{[is\_hungry]=False)}$",
         "fitness", ("Episode", "Not Hungry Count"), True),
        ("Undiscounted Return\n" +
         r"Summed Reward Per Episode: $\Sigma_{(s, a, s') \in \tau} r'(s)$",
         "rewards", ("Episode", "Return"), False),
    )

    def on_button_clicked(b):
        out.clear_output()
        with out:
            fig = Utils.InteractiveLearningCurvePlot(num_axes=len(panels))

            for axis_id, (title, series_name, labels, draw_scaling_lines) in enumerate(panels):
                series = perf_tracking[select_run.value][series_name]
                if len(series) <= 1:
                    # a single point (or none) is left unplotted
                    continue
                fig.update_subplot(axis_id=axis_id,
                                   title=title,
                                   learning_performance=series,
                                   labels=labels,
                                   draw_scaling_lines=draw_scaling_lines)

            view_training_runs(trial_id=select_run.value)

    button.on_click(on_button_clicked)

    controls = widgets.Box(children=[select_run, button])
    return widgets.VBox(children=[controls, out])

In [None]:
def submit_agent():
    """
    Build the "submit your final agent" widget.

    Returns an HBox holding a trial dropdown (oldest trial first) and a
    "Submit" button. Clicking the button overwrites
    "user_tests/<study_id>.txt" with the selected trial's algorithm, reward
    weights and hyper-parameters, then prints the selection. If no trial has
    been recorded yet, a message is printed and None is returned.
    """
    global your_attempts

    if not your_attempts:
        print ("You have not yet trained an agent.")
        return

    options = [
        ("Trial: {}; Alg: {}".format(trial, your_attempts[trial]["alg choice"]), trial)
        for trial in list(your_attempts.keys())
    ]

    select_run = widgets.Dropdown(
        options=options,
        description='Select trial:',
        disabled=False,
    )

    button = widgets.Button(description="Submit")

    final_selection = widgets.HBox(children=[select_run, button])

    def on_button_clicked(b):
        selected_trial = select_run.value
        record = your_attempts[selected_trial]
        # NOTE(review): study_id is read as a module-level name that is not
        # defined in this file - confirm the calling notebook sets it.
        # `with` makes sure the submission is flushed and the handle closed.
        with open("user_tests/" + study_id + ".txt", "w") as f:
            f.write("Selected agent: {}\n".format(selected_trial))
            f.write("ALG: {}\n".format(record['alg choice']))
            f.write("REWARD: {}\n".format(record['reward']))
            f.write("HYPER_PARAMS: {}".format(record['hyper_params']))
        print ("You selected trial: {}".format(selected_trial))
        view_training_runs(trial_id=selected_trial)

    button.on_click(on_button_clicked)

    return final_selection

In [3]:
%%html
<!-- Hide the default buttons and the dialog title bars rendered inside cell output wrappers. -->
<style>
.output_wrapper button.btn.btn-default,
.output_wrapper .ui-dialog-titlebar {
  display: none;
}
</style>