### Interface planner with sim_env

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
import numpy as np

from unified_planning.shortcuts import *
from unified_planning.io import PDDLReader

from PIL import Image

from igibson.action_primitives.fetch_robot_semantic_actions_env import FetchRobotSemanticActionEnv
from igibson.custom_utils import get_env_config, print_properties
import igibson.render_utils as render_utils

import warnings
warnings.filterwarnings("ignore")

In [None]:
def print_symbolic_state(state):
    print("-"*76)
    print("Symbolic state: \n")
    for k in state:
        true_states = {}
        for key in state[k].keys():
            val = state[k][key] # bool
            if val: 
                true_states[key] = val
                
        print(k, true_states)
    print("-"*76)

def execute_plan(sim_env, plan):
    image, symbolic_state = sim_env.get_state_and_image()

    image.show()
    print_symbolic_state(symbolic_state)
    
    for action in plan:
        try:
            success, image, symbolic_state = sim_env.apply_action(action)
            print(f'Action {action} executed. Success: {success}')

            image.show()
            print_symbolic_state(symbolic_state)
            
            legal = True
            
        except (ValueError, AssertionError) as e:
            print(f"Action {action} failed: {e}")
            success = False
            legal = False

def translate_str_to_dict(action):
    action_name, action_args = action.split('(')

    # Remove trailing ')'
    action_args = action_args.split(')')[0] 
    
    if ',' in action_args:
        # Multiple arguments: split by comma, remove extra white spaces
        action_args = [s.strip() for s in action_args.split(',')]
    else:
        # Single arg
        action_args = [action_args]

    return {'action':action_name, 'params':action_args}

In [None]:
# Disable print of credits
up.shortcuts.get_env().credits_stream = None 

# PDDL configs
pddl_dir = 'pddl' 
domain_file = os.path.join(pddl_dir,'domain.pddl')
pddl_problem_file = 'pddl/easy/cleaning_out_drawers_nextto.pddl'
reader = PDDLReader()

# Env config
task = "cleaning_out_drawers"
scene_id =  "Benevolence_1_int"

# Init env
sim_env = FetchRobotSemanticActionEnv(task, scene_id, verbose=False)

In [None]:
# Parse problem and domain
problem = reader.parse_problem(domain_file, pddl_problem_file)
    
# Make a plan for the problem
with OneshotPlanner(problem_kind=problem.kind) as planner:
    result = planner.solve(problem)

# Check if planner found a plan
if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
    print("Plan found.")
    # Prepare plan in the right format
    plan = [translate_str_to_dict(str(action)) for action in result.plan.actions]
    for action in plan:
        print(action)
else:
    print("No plan found.")
    plan = []

execute_plan(sim_env, plan)

### Run on all problems of a split

In [None]:
import os

In [None]:
from unified_planning.shortcuts import *
from unified_planning.io import PDDLReader

In [None]:
reader = PDDLReader()

pddl_dir = 'pddl' 
split_dir = os.path.join(pddl_dir, 'easy')
domain_file = os.path.join(pddl_dir,'domain.pddl')
test_activity_files = [file for file in os.listdir(split_dir) if file!='.ipynb_checkpoints']

for test_activity_file in test_activity_files:
    test_activity_file = os.path.join(split_dir, test_activity_file)
    print(f"\nLooking for plan for {test_activity_file}")
    problem = reader.parse_problem(domain_file, test_activity_file)
    
    # Make a plan for the problem
    with OneshotPlanner(problem_kind=problem.kind) as planner:
        result = planner.solve(problem)
        
        # print(result.log_messages)
        
    if result.status == up.engines.PlanGenerationResultStatus.SOLVED_SATISFICING:
        print("Plan found.")
        for action in result.plan.actions:
            print(action)
    else:
        print("No plan found.")

### Exploration of bddl

In [None]:
import os
import bddl

In [None]:
activities = sorted(
        [
            item
            for item in os.listdir(os.path.join(os.path.dirname(bddl.__file__), "activity_definitions"))
            if item != "domain_igibson.bddl"
        ]
    )

assert len(activities)==100, f"Only {len(activities)} activities found."

In [None]:
os.path.join(os.path.dirname(bddl.__file__), "activity_definitions")

In [None]:
activities

In [None]:
domain_file = os.path.join(os.path.dirname(bddl.__file__), "activity_definitions", "domain_igibson.bddl")
with open(domain_file, "r") as f:
    domain = f.read()
    
print(domain)

In [None]:
predicates_start = domain.find(":predicates")
all_predicates = []
for i, line in enumerate(domain[predicates_start:].split("\n")[1:]):
    if line.strip() == ")":
        break
    predicate = line.strip().split()[0][1:]
    print(f"Predicate {i+1}: {predicate}")
    all_predicates.append(predicate)
    
print(f"Total predicates: {len(all_predicates)}")

In [None]:
from bddl.parsing import parse_problem, parse_predicates

In [None]:
def parse_predicates_custom(state, all_predicates, found_predicates=None):
    if found_predicates is None:
        found_predicates = {'known': set(), 'unknown': set()}
        
    for item in state:
        # print(item)
        if item in ['and', 'or', 'not', 'forall', 'exists', 'forpairs', 'forn']:
            continue
        if type(item) == list and len(item) == 1:
            continue
        if item[0].startswith("?"): # Ignore variables (type)
            continue
        if type(item[0]) == str and type(item[1]) == str:
            if item[0] in all_predicates:
                found_predicates['known'].add(item[0])
            else:
                found_predicates['unknown'].add(item[0])
        else:
            parse_predicates_custom(item, all_predicates, found_predicates)
            
    return found_predicates

In [None]:
activity = activities[5]
print(activity)

problem, objects, initial_state, goal_state = parse_problem(activity, "0", domain_name="igibson")
parse_predicates_custom(goal_state, all_predicates)

In [None]:
predicates_for_activity = {}

for activity in activities:
    problem, objects, initial_state, goal_state = parse_problem(activity, "0", domain_name="igibson")
    found_predicates = parse_predicates_custom(initial_state, all_predicates)
    found_predicates_for_goal = parse_predicates_custom(goal_state, all_predicates)
    found_predicates['known'] = found_predicates['known'].union(found_predicates_for_goal['known'])
    found_predicates['unknown'] = found_predicates['unknown'].union(found_predicates_for_goal['unknown'])
    predicates_for_activity[activity] = found_predicates

In [None]:
# Print a global list of predicates (known and unknown), count how many activities they appear in

all_predicates = set()
all_predicates_known = set()


for activity, predicates in predicates_for_activity.items():
    all_predicates.update(predicates['known'])
    all_predicates_known.update(predicates['known'])
    all_predicates.update(predicates['unknown'])
    
predicate_count = {predicate: 0 for predicate in all_predicates}
for activity, predicates in predicates_for_activity.items():
    for predicate in predicates['known']:
        predicate_count[predicate] += 1
    for predicate in predicates['unknown']:
        predicate_count[predicate] += 1
        
predicate_count = {k: v for k, v in sorted(predicate_count.items(), key=lambda item: item[1], reverse=True)}

for predicate, count in predicate_count.items():
    print(f"{predicate} ({'known' if predicate in all_predicates_known else 'unknown'}): {count}")

In [None]:
predicate_activities = {predicate: [] for predicate in all_predicates}
for activity, predicates in predicates_for_activity.items():
    for predicate in predicates['known']:
        predicate_activities[predicate].append(activity)
    for predicate in predicates['unknown']:
        predicate_activities[predicate].append(activity)
        
for predicate, acts in predicate_activities.items():
    print(f"{predicate}: {acts}")

In [None]:
from bddl.config import get_definition_filename

In [None]:
test_activity = activities[5]
print(test_activity)