In [38]:
%reload_ext autoreload
%autoreload 2
import random
from VLA2Systems.task_data_generator import TaskDataGenerator
# Environments supplied as a flat list (no difficulty grouping).
# Alternative MiniGrid set:
# env_list = ["MiniGrid-DoorKey-16x16-v0", "MiniGrid-MultiRoom-N6-v0"]
env_list = [
    "BabyAI-OneRoomS8-v0",
    "BabyAI-ActionObjDoor-v0",
    "BabyAI-FindObjS5-v0",
    "BabyAI-GoTo-v0",
    "BabyAI-GoToOpen-v0",
    "BabyAI-UnlockToUnlock-v0",
    "BabyAI-Synth-v0",
]

generator = TaskDataGenerator(env_list, verbose=0)

# A fresh random seed is drawn on every run, so each execution of this cell
# resets the generator with a different seed.
seed = random.randint(1, 1000)
generator.reset(seed=seed)

plan = generator.generate_plan()
if not plan:
    print("Planning Failed")
else:
    text = generator.get_output_text(include_all=True)
    print(text)


Reasoning Stage: 
Perform these steps to directly pick up ball which is in Room 0, In order to reach room 0 open grey door at (5, 2) which is currently locked and connect Room 0 and Room 1, In order to unlock grey door at (5, 2) which is in Room 1 pick up grey key which is in Room 2, In order to reach room 2 open yellow door at (10, 2) which is currently locked and connect Room 1 and Room 2, but In order to unlock yellow door at (10, 2) which is in Room 1 pick up yellow key which is in Room 1, which is where the robot is currently in room 1.
End of Reasoning Stage
Execution Stage:
Step 1: Pick up yellow key. Reason is: In order to unlock yellow door at (10, 2) which is in Room 1 pick up yellow key which is in Room 1
Step 2: Open yellow door at (10, 2). Reason is: In order to reach room 2 open yellow door at (10, 2) which is currently locked and connect Room 1 and Room 2
Step 3: Pick up grey key. Reason is: In order to unlock grey door at (5, 2) which is in Room 1 pick up grey key which

In [22]:
from VLA2Systems.reasoning_msgs import directly_action, connecting_action, unlock_door_reason, unblock_reason
def generate_backward_logic(plan, generator):
    """
    Build a goal-first ("backward") explanation of a task plan.

    Every plan entry is a tuple whose last element is the reason message
    for that step:
      (action, object_type, color, position, room_id, reason_msg)

    The paragraph opens with the reason of the final (goal) step and then
    lists the remaining reasons from the second-to-last step back to the
    first. When two or more earlier steps exist, the earliest one is
    introduced with "but". The sentence always closes with the room
    returned for the generator's start location.

    Parameters:
      plan (list of tuples): The task plan, ordered from first to last step.
      generator: Object exposing `planner.get_current_room(...)` and
        `start_location`; used to look up the robot's current room.

    Returns:
      str: The backward reasoning paragraph, or "" when `plan` is empty.
    """
    if not plan:
        return ""

    start_room = generator.planner.get_current_room(generator.start_location)

    # Reason messages with the goal first, followed by the earlier steps
    # walked in reverse order.
    goal_reason, *earlier_reasons = [entry[-1].strip() for entry in reversed(plan)]

    pieces = [goal_reason]
    if earlier_reasons:
        pieces[0] = "Perform these steps to " + goal_reason
        final_idx = len(earlier_reasons) - 1
        for idx, reason in enumerate(earlier_reasons):
            # "but" is reserved for the earliest step, and only when it is
            # not the sole earlier step.
            use_but = final_idx > 0 and idx == final_idx
            pieces.append((", but " if use_but else ", ") + reason)

    pieces.append(f", which is where the robot is currently in room {start_room}.")
    return "".join(pieces)

In [None]:
# Build the backward-reasoning paragraph from the plan stored on `generator`
# (both `generator` and `generate_backward_logic` come from earlier cells)
# and print it.
backward_reasoning = generate_backward_logic(generator.plan, generator)
print(backward_reasoning)

In [4]:
%reload_ext autoreload
%autoreload 2
# Using a difficulty-based dictionary: each key maps a difficulty level to the
# environments the generator may pick from at that level.
env_dict = {
    "easy": ["BabyAI-ActionObjDoor-v0"],
    "intermediate": ["BabyAI-FindObjS5-v0"],
    "hard": ["BabyAI-UnlockToUnlock-v0", "BabyAI-Synth-v0"]
}
index = 0
generator = TaskDataGenerator(env_dict)
generator.reset(difficulty="hard")  # Select from "hard" list
# Keep the plan produced by THIS generator. Without the assignment the check
# below would read a stale `plan` left over from another cell (or raise
# NameError on a fresh kernel).
plan = generator.generate_plan()
# generator.save_env_image(filename=f"Image-{generator.env_name}-{generator.seed}-{index}.png")
if plan:
    input_text = generator.get_input_text(include_robot_current_room=True, include_grid=False)
    output_text = generator.get_output_text()
    print(f"Input text:\n{input_text}")
    print(f"Output text:\n{output_text}")
else:
    print("Planning Failed")


Input text:
Knowledge Base:
Room 0:
  red box is at (3, 6) in Room 0
  blue box is at (6, 6) in Room 0
  grey door is at (4, 7) in Room 0 and is currently closed
Room 1:
  red box is at (8, 5) in Room 1
  purple key is at (11, 3) in Room 1
  purple ball is at (12, 5) in Room 1
  green door is at (14, 3) in Room 1 and is currently closed
Room 2:
  green box is at (16, 2) in Room 2
  green door is at (14, 3) in Room 2 and is currently closed
  blue door is at (16, 7) in Room 2 and is currently closed
Room 3:
  green key is at (2, 8) in Room 3
  grey door is at (4, 7) in Room 3 and is currently closed
  yellow door is at (7, 10) in Room 3 and is currently closed
Room 4:
  grey box is at (8, 8) in Room 4
  blue key is at (9, 13) in Room 4
  blue box is at (11, 13) in Room 4
  yellow door is at (7, 10) in Room 4 and is currently closed
  grey door is at (12, 14) in Room 4 and is currently closed
Room 5:
  blue ball is at (16, 10) in Room 5
  yellow ball is at (20, 11) in Room 5
  blue door 

In [None]:
# from VLA2Systems.task_data_collector import DataCollector
# collector = DataCollector("configs/data_collection_config.yaml")
# collector.collect_data()


In [None]:
import os
from datasets import load_from_disk

def print_dataset_samples(dataset_path, num_samples=5):
    """
    Print the leading input/output pairs of a dataset stored on disk.

    Parameters:
      dataset_path (str): Directory handed to `load_from_disk`.
      num_samples (int): Upper bound on how many samples are printed.

    Returns:
      None. When `dataset_path` does not exist, an error line is printed
      and nothing is loaded.
    """
    path_missing = not os.path.exists(dataset_path)
    if path_missing:
        print(f"Error: Dataset path '{dataset_path}' does not exist.")
        return

    print(f"Loading dataset from {dataset_path}...")
    loaded = load_from_disk(dataset_path)

    print(f"Dataset loaded! Total samples: {len(loaded)}")

    # Never request more rows than the dataset holds.
    shown = min(num_samples, len(loaded))
    print(f"Showing {shown} samples:")

    separator = "-" * 50
    for number, record in enumerate(loaded.select(range(shown)), start=1):
        print(f"\nSample {number}:")
        print(f"Input: \n{record['input']}")
        print(f"Output: \n{record['output']}")
        print(separator)

# Preview samples from the "hard" split directory (num_samples is left at its
# default of 5).
dataset_name = "./datasets/robot_LLM_grid_dataset_10k/hard/"
print_dataset_samples(dataset_name)


# Merge the per-difficulty datasets into one mixed dataset using a fixed sampling split.

In [None]:
import random
from datasets import load_from_disk, Dataset

# Define dataset paths (one saved dataset per difficulty level)
dataset_paths = {
    "easy": "./datasets/robot_LLM_grid_dataset_10k/easy/",
    "intermediate": "./datasets/robot_LLM_grid_dataset_10k/intermediate/",
    "hard": "./datasets/robot_LLM_grid_dataset_10k/hard/"
}

# Fraction of the mixed dataset drawn from each difficulty (must sum to 1.0)
contribution_percentages = {
    "easy": 0.85,         # 85% from easy
    "intermediate": 0.12, # 12% from intermediate
    "hard": 0.03          # 3% from hard
}

# Enforce the "must sum to 1.0" rule so a typo in the split is caught here
# instead of silently producing a dataset of the wrong size.
share_total = sum(contribution_percentages.values())
assert abs(share_total - 1.0) < 1e-9, (
    f"contribution_percentages sum to {share_total}, expected 1.0"
)

# Load datasets
datasets = {key: load_from_disk(path) for key, path in dataset_paths.items()}

# Total number of samples available across all difficulty levels
total_size = sum(len(ds) for ds in datasets.values())
print("Total available samples:", total_size)

# Desired number of samples in the mixed dataset
target_size = 3000

# Calculate samples per dataset. round() avoids losing a sample to
# floating-point truncation (e.g. 0.29 * 100 == 28.999999999999996).
sample_sizes = {
    key: round(target_size * percentage)
    for key, percentage in contribution_percentages.items()
}

# Sample and merge datasets
mixed_data = []
for key, dataset in datasets.items():
    requested = sample_sizes[key]
    available = len(dataset)
    if requested > available:
        # select() would raise IndexError on out-of-range indices; take what
        # exists and make the shortfall visible instead.
        print(
            f"Warning: '{key}' has only {available} samples; "
            f"requested {requested}. Using {available}."
        )
        requested = available
        sample_sizes[key] = requested
    # Shuffle with a fixed seed, then keep the first `requested` rows.
    sampled_data = dataset.shuffle(seed=42).select(range(requested))
    mixed_data.extend(sampled_data)

# Convert to Hugging Face Dataset
mixed_dataset = Dataset.from_list(mixed_data)

# Shuffle final dataset so the difficulty levels are interleaved
mixed_dataset = mixed_dataset.shuffle(seed=42)

# Save the mixed dataset (optional)
mixed_dataset.save_to_disk("./datasets/robot_LLM_grid_dataset_10k/mixed_dataset")

print("Final dataset size:", len(mixed_dataset))