# Imitation Learning Dataset Generation Notebook

This notebook generates additional demonstration datasets for imitation learning. It builds the default Isaac Lab launch configuration, overrides key parameters (such as the task, the number of trials, and the number of parallel environments), launches the simulator, creates the environment, and then runs the dataset generation workflow using an annotated source dataset as input.


In [None]:
# nest_asyncio is applied first, before any other import in this cell.
# NOTE(review): presumably this lets the asyncio calls made later in the notebook
# nest inside the kernel's already-running event loop — confirm against nest_asyncio docs.
import nest_asyncio
nest_asyncio.apply()

from argparse import ArgumentParser, Namespace
from isaaclab.app import AppLauncher

def get_default_config():
    """Return a namespace holding the default value of every supported option.

    The data-generation options are registered first, followed by whatever
    ``AppLauncher.add_app_launcher_args`` contributes. Parsing is done against an
    explicit empty argument list, so no command-line input is consumed.
    """
    arg_parser = ArgumentParser(description="Generate demonstrations for Isaac Lab environments.")

    # (flag, add_argument keyword arguments), registered in the order listed.
    option_specs = (
        ("--task", {"type": str, "default": None, "help": "Name of the task."}),
        (
            "--generation_num_trials",
            {"type": int, "default": 10, "help": "Number of demos to generate."},
        ),
        (
            "--num_envs",
            {"type": int, "default": 1, "help": "Number of environments to instantiate."},
        ),
        (
            "--input_file",
            {
                "type": str,
                "default": "path/to/your/input_file.hdf5",
                "help": "Path to the source dataset file.",
            },
        ),
        (
            "--output_file",
            {
                "type": str,
                "default": "./datasets/output_dataset.hdf5",
                "help": "Path to export episodes.",
            },
        ),
        (
            "--pause_subtask",
            {"action": "store_true", "help": "Pause after every subtask during generation."},
        ),
    )
    for flag, options in option_specs:
        arg_parser.add_argument(flag, **options)

    AppLauncher.add_app_launcher_args(arg_parser)

    # Parse an explicit empty list rather than the process arguments.
    return arg_parser.parse_args([])

# Begin with the parser defaults; the overrides below are layered on top.
args_cli = get_default_config()

# ===================== MODIFY PARAMETERS HERE =====================
# Edit the values below to match your run
custom_overrides = {
    # Task name
    "task": "Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0",
    # Number of parallel environments
    "num_envs": 9,
    # Number of demonstrations to generate
    "generation_num_trials": 10,
    # Source dataset path
    "input_file": "datasets/annotated_dataset.hdf5",
    # Output dataset path
    "output_file": "datasets/generated_dataset.hdf5",
}
# ===============================================================

# Merge defaults and overrides (overrides win) and rebuild the namespace
args_dict = {**vars(args_cli), **custom_overrides}
args_cli = Namespace(**args_dict)

# Launch the simulator only once the configuration is final
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app


# Configure environment

In [None]:
import asyncio
import gymnasium as gym
import numpy as np
import random
import torch

import isaaclab_mimic.envs  # noqa: F401
from isaaclab_mimic.datagen.generation import env_loop, setup_env_config, setup_async_generation
from isaaclab_mimic.datagen.utils import get_env_name_from_dataset, setup_output_paths, interactive_update_randomizable_params

import isaaclab_tasks  # noqa: F401
num_envs = args_cli.num_envs

# Split the requested output file into the directory and file name used below.
output_dir, output_file_name = setup_output_paths(args_cli.output_file)

# Prefer the explicitly configured task; otherwise ask the input dataset for the name.
env_name = args_cli.task
if not env_name:
    env_name = get_env_name_from_dataset(args_cli.input_file)

# Build the environment configuration and the success term used during generation.
env_cfg, success_term = setup_env_config(
    env_name=env_name,
    output_dir=output_dir,
    output_file_name=output_file_name,
    num_envs=num_envs,
    device=args_cli.device,
    generation_num_trials=args_cli.generation_num_trials,
)

# Instantiate the environment from that configuration.
env = gym.make(env_name, cfg=env_cfg)

# Seed Python, NumPy and PyTorch (in that order) with the datagen seed.
datagen_seed = env.unwrapped.cfg.datagen_config.seed
for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    seed_fn(datagen_seed)

# Reset once before generation starts.
env.reset()

In [None]:
# Visit every "reset" event term; those carrying a non-None ``randomizable_params``
# are passed to interactive_update_randomizable_params.
for event_term in env.unwrapped.event_manager._mode_term_cfgs["reset"]:
    randomizable = getattr(event_term, "randomizable_params", None)
    if randomizable is None:
        continue
    print(f"Updating parameters for event: {event_term.func.__name__}")
    interactive_update_randomizable_params(event_term, randomizable, env=env)

# Start data generation workflow

In [None]:
# Prepare the async generation components, then drive the environment loop
generation_kwargs = {
    "env": env,
    "num_envs": args_cli.num_envs,
    "input_file": args_cli.input_file,
    "success_term": success_term,
    "pause_subtask": args_cli.pause_subtask,
}
async_components = setup_async_generation(**generation_kwargs)

try:
    # Schedule all generation tasks as one gathered future, then run the env loop.
    gathered_tasks = asyncio.gather(*async_components['tasks'])
    asyncio.ensure_future(gathered_tasks)
    env_loop(
        env,
        async_components['action_queue'],
        async_components['info_pool'],
        async_components['event_loop'],
    )
except asyncio.CancelledError:
    print("Tasks were cancelled.")

# NOTE(review): reached only when env_loop returns or a CancelledError is caught;
# any other exception propagates and skips this call.
simulation_app.close()