In [1]:
%cd ..

/home/juanhevia/IDIL


In [3]:
from collections import defaultdict
import pandas as pd 
import numpy as np
import torch
import matplotlib.pyplot as plt 
import os 
from idil_algs.IDIL.agent.mental_iql import MentalIQL
from munch import Munch
import yaml
import idil_gym
from idil_algs.baselines.IQLearn.dataset.expert_dataset import ExpertDataset
from idil_algs.baselines.IQLearn.utils.utils import make_env
import gym


# Root directory of stored run results, relative to the current working
# directory. get_run_path() joins it with the environment name, 'idil' and the
# run id to locate a run.
RESULTS_PATH="./idil_train/result/"


def load_expert_data_w_labels(demo_path, num_trajs, n_labeled, seed):
    """
    Load expert demonstrations and build one latent-label entry per trajectory.

    Trajectories whose index is below `n_labeled` keep the latents stored in the
    dataset (or None when the dataset has no "latents" entry); all remaining
    trajectories are given None.

    `demo_path`, `num_trajs`, the literal 1 and `seed + 42` are forwarded
    positionally to ExpertDataset; see that class for their exact meaning.

    Returns:
        (expert_dataset, traj_labels, cnt_label), where `traj_labels` has
        `num_trajs` entries and `cnt_label` is the number of trajectory
        indices below `n_labeled`.
    """
    expert_dataset = ExpertDataset(demo_path, num_trajs, 1, seed + 42)
    print(f'--> Expert memory size: {len(expert_dataset)}')

    traj_labels = []
    cnt_label = 0
    for idx in range(num_trajs):
        # The stored latents are read for every index, labeled or not.
        stored = (expert_dataset.trajectories["latents"][idx]
                  if "latents" in expert_dataset.trajectories else None)

        is_labeled = idx < n_labeled
        traj_labels.append(stored if is_labeled else None)
        if is_labeled:
            cnt_label += 1

    print(f"num_labeled: {cnt_label} / {num_trajs}, num_samples: ",
        len(expert_dataset))
    return expert_dataset, traj_labels, cnt_label

def compute_sequence_accuracy(pred_latents, true_latents):
    """
    Return the fraction of positions where predicted and true latents agree.

    Both inputs are converted to numpy arrays and compared element-wise; the
    number of matches is divided by `len(true_latents)`.

    Args:
        pred_latents: sequence of predicted latent values.
        true_latents: sequence of reference latent values.

    Returns:
        Number of matching elements divided by the length of `true_latents`.
    """
    pred = np.asarray(pred_latents)
    true = np.asarray(true_latents)

    # A flat sequence compared with a column-shaped one (e.g. (T,) vs (T, 1))
    # would silently broadcast to a (T, T) comparison and inflate the match
    # count. When the shapes differ only by singleton axes, align them first.
    if pred.shape != true.shape and np.squeeze(pred).shape == np.squeeze(true).shape:
        pred = pred.reshape(true.shape)

    return np.sum(pred == true) / len(true)

# Agent loading

def get_run_path(env_name: str, run_id: str):
    """
    Get the path where we store 'model' and 'log' data for a given run.

    The run directory is RESULTS_PATH/<env_name>/idil/<run_id>; the data lives
    one level below it, inside a date directory.

    Args:
        env_name: environment name used as the first path component.
        run_id: identifier of the run.

    Returns:
        Path of the date directory inside the run directory. If several
        sub-directories exist, the first one in sorted name order is used.

    Raises:
        FileNotFoundError: if the run directory does not exist or contains no
            sub-directory.
    """
    _path = os.path.join(RESULTS_PATH, env_name, 'idil', run_id)

    # os.listdir returns entries in arbitrary order and may include stray
    # files, so keep only directories and sort for a deterministic choice.
    _date_folders = sorted(
        _entry for _entry in os.listdir(_path)
        if os.path.isdir(os.path.join(_path, _entry))
    )
    if not _date_folders:
        raise FileNotFoundError(f"No date directory found under '{_path}'")

    return os.path.join(_path, _date_folders[0])

def get_run_config(run_path:str):
    """
    Read the run's YAML configuration and return it as a Munch dictionary object.

    The file is expected at <run_path>/log/config.yaml.
    """
    config_file = os.path.join(run_path, 'log', 'config.yaml')

    with open(config_file, "r") as handle:
        parsed = yaml.load(handle, Loader=yaml.FullLoader)

    # Wrap the parsed mapping so its keys can be read as attributes.
    return Munch(parsed)

def get_agent(run_path: str, run_conf: Munch):
    """
    Build a MentalIQL agent for the run's environment and load its saved weights.

    Args:
        run_path: run directory containing a 'model' sub-directory.
        run_conf: run configuration; `env_name` and `dim_c` are read here and
            the whole object is passed to MentalIQL as `config`.

    Returns:
        The MentalIQL agent after `load` has been called on the checkpoint prefix.

    Raises:
        FileNotFoundError: if the 'model' directory has no entry containing "_pi".
    """
    # load env
    env = make_env(run_conf.env_name)

    # Discrete spaces expose their size as `.n`; otherwise the first entry of
    # `.shape` is used as the dimension.
    _discrete_obs = isinstance(env.observation_space, gym.spaces.Discrete)
    _discrete_act = isinstance(env.action_space, gym.spaces.Discrete)
    _obs_space_dim = env.observation_space.n if _discrete_obs else env.observation_space.shape[0]
    _act_space_dim = env.action_space.n if _discrete_act else env.action_space.shape[0]

    miql_agent = MentalIQL(config=run_conf,
                           obs_dim=_obs_space_dim,
                           action_dim=_act_space_dim,
                           lat_dim=run_conf.dim_c,
                           discrete_obs=_discrete_obs,
                           discrete_act=_discrete_act)

    # The checkpoint prefix is whatever precedes "_pi" in a saved file name.
    # os.listdir order is arbitrary and an entry without "_pi" would yield its
    # whole name as the prefix, so only "_pi" entries are considered, sorted
    # for a deterministic choice.
    _model_dir = os.path.join(run_path, 'model')
    _pi_entries = sorted(_name for _name in os.listdir(_model_dir) if "_pi" in _name)
    if not _pi_entries:
        raise FileNotFoundError(f"No '_pi' checkpoint found under '{_model_dir}'")
    prefix = _pi_entries[0].split("_pi")[0]

    # NOTE(review): presumably `load` appends its own suffixes to this prefix
    # -- confirm against MentalIQL.load.
    miql_agent.load(os.path.join(_model_dir, prefix))
    return miql_agent

# Actions accuracy

In [None]:
def backtest_action_trajectory(expert_dataset, agent):
    """
    Replay the dataset's states and latents through the agent's policy.

    For every trajectory, each (state, latent) pair is passed to
    `agent.choose_policy_action` and the returned actions are collected.

    Returns:
        A list with one list of agent actions per trajectory in
        `expert_dataset.trajectories["states"]`.
    """
    replayed = []

    for idx, states in enumerate(expert_dataset.trajectories["states"]):
        latents = expert_dataset.trajectories["latents"][idx]

        # zip stops at the shorter of the two sequences.
        replayed.append([
            agent.choose_policy_action(state, latent)
            for state, latent in zip(states, latents)
        ])

    return replayed

def compute_action_accuracy(expert_dataset, agent):
    """
    Compare the agent's replayed actions with the expert's, per trajectory.

    The agent actions come from backtest_action_trajectory. For each trajectory
    the number of element-wise matches is divided by the number of expert
    actions.

    Returns:
        A list with one accuracy value per trajectory.
    """
    replayed = backtest_action_trajectory(expert_dataset, agent)

    per_traj_acc = []
    for idx, agent_actions in enumerate(replayed):
        expert_arr = np.array(expert_dataset.trajectories["actions"][idx])
        agent_arr = np.array(agent_actions)

        n_matches = np.sum(expert_arr == agent_arr)
        per_traj_acc.append(n_matches / len(expert_arr))

    return per_traj_acc