In [2]:
# Mapping of object names to their types
objects = ["can", 'door']
object_areas = ['pick', 'drop', 'activate', 'lightswitch']
grippers = ['gripper']
types = {"location":"physobj", "object":"physobj", "gripper":"physobj"}  # Types of entities in the environment
obj_types = {obj: "object" for obj in objects}
obj_types.update({area: "location" for area in object_areas})
obj_types.update({gripper: "gripper" for gripper in grippers})

In [3]:
predicates_type = {'at(can,activate)': 'num', 'at(can,drop)': 'num', 'at(can,lightswitch)': 'num', 'at(can,pick)': 'num', 'grasped(can)':'bool'}

In [10]:
predicates_type = {'at': 'num', 'grasped':'bool', 'picked':'bool', 'dropped':'bool', 'activated':'bool', 'lightoff':'bool', 'locked':'bool'}

In [5]:
!pip install antlr4-python3-runtime

Collecting antlr4-python3-runtime
  Downloading antlr4_python3_runtime-4.13.1-py3-none-any.whl (144 kB)
[K     |████████████████████████████████| 144 kB 3.0 MB/s eta 0:00:01
[?25hInstalling collected packages: antlr4-python3-runtime
Successfully installed antlr4-python3-runtime-4.13.1


In [5]:
!pip install tarski

Collecting tarski
  Downloading tarski-0.8.2-py3-none-any.whl (213 kB)
[K     |████████████████████████████████| 213 kB 826 kB/s eta 0:00:01
[?25hCollecting multipledispatch
  Downloading multipledispatch-1.0.0-py3-none-any.whl (12 kB)
Processing /home/lorangpi/.cache/pip/wheels/63/93/52/480c17789fbf685f5d0e33e83c311f4a844cd6e2f5cf036c03/antlr4_python3_runtime-4.7.2-py3-none-any.whl
[31mERROR: pddlpy 0.4.4 has requirement antlr4-python3-runtime==4.13.1, but you'll have antlr4-python3-runtime 4.7.2 which is incompatible.[0m
Installing collected packages: multipledispatch, antlr4-python3-runtime, tarski
Successfully installed antlr4-python3-runtime-4.7.2 multipledispatch-1.0.0 tarski-0.8.2


In [11]:
#import pddlpy
import re
from collections import namedtuple

class Action:
    def __init__(self, parameters, preconditions, effects, numerical_preconditions=None, numerical_effects=None, function_effects=None, cost=None, name=None):
        self.parameters = parameters
        self.preconditions = preconditions
        self.effects = effects
        self.numerical_preconditions = numerical_preconditions if numerical_preconditions is not None else {}
        self.numerical_effects = numerical_effects if numerical_effects is not None else {}
        self.function_effects = function_effects if function_effects is not None else {}
        self.cost = cost
        self.name = name if name != None else "a0"
        self._to_pddl()

    # def __eq__(self, other):
    #     if isinstance(other, Action):
    #         if (other.numerical_effects == {} and other.function_effects == {} and other.effects != {}) or (self.numerical_effects == {} and self.function_effects == {} and self.effects != {}):
    #             # Compare preconditions, effects, numerical_effects and numerical_preconditions
    #             equal = (self.preconditions == other.preconditions and
    #                     self.effects == other.effects and
    #                     self.numerical_effects == other.numerical_effects and
    #                     self.numerical_preconditions == other.numerical_preconditions)
    #         else:
    #             # Compare preconditions, effects, and numerical_effects only
    #             equal = (self.preconditions == other.preconditions and
    #                     self.effects == other.effects and
    #                     self.numerical_effects == other.numerical_effects)

    #         # Compare function_effects, ignoring 'total-cost'
    #         self_function_effects = {k: v for k, v in self.function_effects.items() if k != 'total-cost'}
    #         other_function_effects = {k: v for k, v in other.function_effects.items() if k != 'total-cost'}
    #         equal = equal and self_function_effects == other_function_effects

    #         return equal

    #     return False

    def __eq__(self, other):
        if isinstance(other, Action):
            # get a list of the types of a0.used_parameters
            self_types = [type for param, type in self.used_parameters]
            other_types = [type for param, type in other.used_parameters]
            # convert it to a string separated by spaces
            self_grounding = self.name + ' ' + ' '.join(self_types)
            other_grounding = other.name + ' ' + ' '.join(other_types)
            # ground the actions
            self_ground_with_types = self.ground_action(action_string=self_grounding)
            other_ground_with_types = other.ground_action(action_string=other_grounding)
            # Compare preconditions and effects
            equal = self_ground_with_types.preconditions == other_ground_with_types.preconditions
            equal = equal and self.effects == other.effects
            
            # Compare numerical_effects, ignoring 'total-cost'
            #self_numerical_effects = {k: v for k, v in self.numerical_effects.items() if k != 'total-cost'}
            #other_numerical_effects = {k: v for k, v in other.numerical_effects.items() if k != 'total-cost'}
            #equal = equal and self_numerical_effects == other_numerical_effects

            # Compare function_effects, ignoring 'total-cost'
            self_ground_with_types_function_effects = {k: v for k, v in self_ground_with_types.function_effects.items() if k != 'total-cost'}
            other_ground_with_types_function_effects = {k: v for k, v in other_ground_with_types.function_effects.items() if k != 'total-cost'}
            equal = equal and self_ground_with_types_function_effects == other_ground_with_types_function_effects

            return equal

        return False

    def _cheaper_cost_(self, other):
        if other == self:
            if self.function_effects['total-cost'] < other.function_effects['total-cost']:
                self.function_effects = other.function_effects
                return True
        return False
    
    def _same_effects_(self, other):
        same_effects = True
        # Check whether the effects, numerical_effects and function_effects are the same
        if self.effects != other.effects or self.numerical_effects != other.numerical_effects or self.function_effects != other.function_effects:
            same_effects = False
        return same_effects
    
    def _is_weaker_(self, other):
        # Check whether the preconditions are less constraining (weaker) than the other action
        weaker = True
        for predicate, value in self.preconditions.items():
            if predicate not in other.preconditions.keys():
                weaker = False
                break
            elif value != other.preconditions[predicate]:
                weaker = False
                break
        return weaker

    def _to_pddl(self):
        # Transform to PDDL format
        preconditions = []
        for predicate, value in self.preconditions.items():
            if "(" in predicate and ")" in predicate:
                formatted_predicate = f'({predicate.split("(")[0]} {" ".join(predicate.split("(")[1][:-1].split(",")).replace("  ", " ")})'
            else:
                formatted_predicate = f'({predicate})'
            if value:
                preconditions.append(formatted_predicate)
            else:
                preconditions.append(f'(not {formatted_predicate})')
        preconditions = ' '.join(preconditions)

        effects = []
        for predicate, value in self.effects.items():
            if "(" in predicate and ")" in predicate:
                formatted_predicate = f'({predicate.split("(")[0]} {" ".join(predicate.split("(")[1][:-1].split(",")).replace("  ", " ")})'
            else:
                formatted_predicate = f'({predicate})'
            if value:
                effects.append(formatted_predicate)
            else:
                effects.append(f'(not {formatted_predicate})')
        effects = ' '.join(effects)

        if self.numerical_preconditions != {}:
            numerical_preconditions = ' '.join([f'(= ({predicate.replace(",", " ").replace("(", " ").replace(")", " ")}) {value})' for predicate, value in self.numerical_preconditions.items()])
        else:
            numerical_preconditions = ''
        if self.numerical_effects != {}:
            numerical_effects = ' '.join([f'(= ({predicate.replace(",", " ").replace("(", " ").replace(")", " ")}) {value})' for predicate, value in self.numerical_effects.items()])
        else:
            numerical_effects = ''
        if self.function_effects != {}:
            function_effects = ' '.join([f'(increase ({predicate.replace(",", " ").replace("(", " ").replace(")", " ")}) {value})' if int(value) > 0 else f'(decrease ({predicate.replace(",", " ").replace("(", " ").replace(")", " ")}) {-value})' for predicate, value in self.function_effects.items()])
        else:
            function_effects = ''

        # FOR NOW it does not account for numerical preconditions (so that the agent does not need to know location requirements for actions)
        if self.effects == {}:# and self.numerical_effects == {}:
            numerical_preconditions = ''
        
        used_parameters = []
        for param, type in self.parameters:
            param_str = f'?{param}'
            if param_str in preconditions or param_str in effects or param_str in numerical_preconditions or param_str in numerical_effects or param_str in function_effects:
                used_parameters.append((param, type))
        self.used_parameters = used_parameters
        parameters = ' '.join([f'?{param} - {type}' for param, type in used_parameters])


        return f'(:action {self.name}\n  :parameters ({parameters})\n  :precondition (and {preconditions} {numerical_preconditions})\n  :effect (and {effects} {function_effects} {numerical_effects}))\n'

    def ground_action(self, action_string):
        action_parts = action_string.split(' ')
        name = action_parts[0]
        parameters = [(part, type) for part, type in zip(action_parts[1:], [param[1] for param in self.used_parameters])]

        # Create a mapping from parameters to placeholders
        param_to_placeholder = {f'?{placeholder[0]}': param[0] for param, placeholder in zip(parameters, self.used_parameters)}
        # print("\nDEBUGGING")
        # print(self._to_pddl())
        # print(action_parts)
        # print(parameters)
        # print(self.used_parameters)
        # print(self.parameters)
        # print(param_to_placeholder)
        # Ground the preconditions and effects
        grounded_preconditions = {self._replace_placeholders(predicate, param_to_placeholder): value
                                for predicate, value in self.preconditions.items()}
        grounded_numerical_preconditions = {self._replace_placeholders(predicate, param_to_placeholder): value for predicate, value in self.numerical_preconditions.items()}
        grounded_effects = {self._replace_placeholders(predicate, param_to_placeholder): value
                            for predicate, value in self.effects.items()}
        grounded_numerical_effects = {self._replace_placeholders(predicate, param_to_placeholder): value for predicate, value in self.numerical_effects.items()}
        grounded_function_effects = {self._replace_placeholders(predicate, param_to_placeholder): value for predicate, value in self.function_effects.items()}

        return Action(parameters, grounded_preconditions, grounded_effects, numerical_preconditions=grounded_numerical_preconditions ,numerical_effects=grounded_numerical_effects, function_effects=grounded_function_effects, name=name)

    def _replace_placeholders(self, predicate, param_to_placeholder):
        for placeholder, param in param_to_placeholder.items():
            predicate = predicate.replace(placeholder, param)
        return predicate.replace(' ', '')

    def __str__(self):
        return self._to_pddl()

def load_action_from_file(domain_file, problem_file, numerical):

    actions = []
    try:
        domprob = pddlpy.DomainProblem(domain_file, problem_file)
        for operator in domprob.operators():
            operator_details = domprob.domain.operators[operator]
            parameters = list(operator_details.variable_list.items())
            parameters = [(param[0].replace('?', ''), param[1]) for param in parameters]
            preconditions = operator_details.precondition_pos
            neg_preconditions = operator_details.precondition_neg
            effects = operator_details.effect_pos
            neg_effects = operator_details.effect_neg
            effects = [atom.predicate for atom in list(effects)]
            neg_effects = [atom.predicate for atom in list(neg_effects)]
            preconditions = [atom.predicate for atom in list(preconditions )]
            neg_preconditions = [atom.predicate for atom in list(neg_preconditions )]
            effects = {f'{effect[0]}({", ".join(effect[1:])})': True for effect in list(effects)}
            effects.update({f'{effect[0]}({", ".join(effect[1:])})': False for effect in neg_effects})
            pre = {f'{pre[0]}({", ".join(pre[1:])})': True for pre in preconditions}
            pre.update({f'{pre[0]}({", ".join(pre[1:])})': False for pre in neg_preconditions})
            actions.append(Action(parameters, pre, effects, name=operator))
    except Exception as e:
        print("Error loading actions from domain and problem files: ", e)
    return actions
    
def restrict_action(actions, grounded_action):
    # Parse the grounded action
    grounded_action_name = grounded_action.split('(')[0]
    grounded_action_params = grounded_action.split('(')[1][:-1].split(', ')

    # Create a new list of actions
    new_actions = []

    # Iterate over the actions
    for action in actions:
        # If the action name matches the grounded action name
        if action.name == grounded_action_name:

            # Iterate over the parameters of the action
            for i, (param, type) in enumerate(action.parameters):
                # Create a copy of the action
                new_action = Action(action.parameters.copy(), action.preconditions.copy(), action.effects.copy(), name=action.name + str(i))

                # Add a new precondition that the parameter cannot have that value
                new_action.preconditions[f'= ?{param} {grounded_action_params[i]}'] = False

                # Add the new action to the list of new actions
                new_actions.append(new_action)
        else:
            # If the action name does not match the grounded action name, add the action to the list of new actions without modifying it
            new_actions.append(action)

    # Return the new list of actions
    return new_actions

def replace_actions_in_domain(file, new_file_path, actions):
    with open(file, 'r') as f:
        domain = f.read()
    #print(domain)
    start, rest = domain.split('(:predicates', 1)
    start, constants = start.rsplit('(:constants', 1)
    predicates = '(:predicates' + rest
    constants = '(:constants' + constants

    domain = start + constants  + predicates

    try:
        start, end = domain.split('(:action', 1)
        end = ')'+end.rsplit(')', 1)[1]
    except:
        start, end  = domain.split('()', 1)
        end = ')'+end.rsplit(')', 1)[1]

    new_actions = '\n'.join([action._to_pddl() for action in actions])
    #print("New Actions = ", new_actions)

    new_file = new_file_path
    with open(new_file, 'w') as f:
        f.write(start + new_actions + end)

def extract_objects_from_predicate(predicate):
    if '(' in predicate and ')' in predicate:
        objects = predicate.split('(')[1].split(')')[0].split(',')
        return [obj.strip() for obj in objects if obj.strip()]  # remove empty strings and strip whitespace
    else:
        return []  # return an empty list if the predicate does not take any objects
    
def replace_whole_word(text, old_word, new_word):
    return re.sub(r'\b' + old_word + r'\b', new_word, text)

def operator_learner_minimal_effects_ungrounded(predicates_t, predicates_t1, type_mapping, name=None):
    parameters = []
    preconditions = {}
    effects = {}

    for predicate, value in predicates_t.items():
        objects = extract_objects_from_predicate(predicate)
        parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        for obj in objects:
            parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
        preconditions[parametrized_predicate] = value

    for predicate, value in predicates_t1.items():
        if predicate not in predicates_t or predicates_t1[predicate] != predicates_t[predicate]:
            objects = extract_objects_from_predicate(predicate)
            parameters.extend([(obj, type_mapping[obj]) for obj in objects])
            parametrized_predicate = predicate
            for obj in objects:
                parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
            effects[parametrized_predicate] = value

    parameters = list(set(parameters))  # remove duplicates

    return Action(parameters, preconditions, effects, name=name)

def operator_learner_minimal_effects_grounded(predicates_t, predicates_t1, type_mapping, name=None):
    parameters = []
    preconditions = {}
    effects = {}

    for predicate, value in predicates_t.items():
        objects = extract_objects_from_predicate(predicate)
        #parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        #for obj in objects:
        #    parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
        preconditions[parametrized_predicate] = value

    for predicate, value in predicates_t1.items():
        if predicate not in predicates_t or predicates_t1[predicate] != predicates_t[predicate]:
            objects = extract_objects_from_predicate(predicate)
            parameters.extend([(obj, type_mapping[obj]) for obj in objects])
            parametrized_predicate = predicate
            #for obj in objects:
            #    parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
            effects[parametrized_predicate] = value

    parameters = list(set(parameters))  # remove duplicates

    return Action(parameters, preconditions, effects, name=name)

def operator_learner_nothing_grounded(predicates_t, predicates_t1, type_mapping, name=None):
    parameters = []
    preconditions = {}
    effects = {}

    for predicate, value in predicates_t.items():
        objects = extract_objects_from_predicate(predicate)
        parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        for obj in objects:
            parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
        preconditions[parametrized_predicate] = value

    for predicate, value in predicates_t1.items():
        #if predicate not in predicates_t or predicates_t1[predicate] != predicates_t[predicate]:
        objects = extract_objects_from_predicate(predicate)
        parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        for obj in objects:
            parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
        effects[parametrized_predicate] = value

    parameters = list(set(parameters))  # remove duplicates

    return Action(parameters, preconditions, effects, name=name)

def operator_learner_minimal_effects_grounded_preconditions(predicates_t, predicates_t1, type_mapping, name=None):
    parameters = []
    preconditions = {}
    effects = {}

    for predicate, value in predicates_t.items():
        objects = extract_objects_from_predicate(predicate)
        #parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        #for obj in objects:
        #    parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
        preconditions[parametrized_predicate] = value

    for predicate, value in predicates_t1.items():
        if predicate not in predicates_t or predicates_t1[predicate] != predicates_t[predicate]:
            objects = extract_objects_from_predicate(predicate)
            parameters.extend([(obj, type_mapping[obj]) for obj in objects])
            parametrized_predicate = predicate
            for obj in objects:
                parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj)
            effects[parametrized_predicate] = value

    parameters = list(set(parameters))  # remove duplicates

    return Action(parameters, preconditions, effects, name=name)


def numerical_operator_learner(s1, s2, type_mapping, predicates_type, name=None, negatives=False):
    parameters = []
    preconditions = {}
    effects = {}
    numerical_preconditions = {}
    numerical_effects = {}
    function_effects = {}
    type_id = {type: 0 for type in type_mapping.values()}
    obj_id = {obj: None for obj in type_mapping.keys()}

    for predicate, value in s1.items():
        predicate_name = predicate.split('(')[0]
        objects = extract_objects_from_predicate(predicate)
        #parameters.extend([(obj, type_mapping[obj]) for obj in objects])
        parametrized_predicate = predicate
        for obj in objects:
            if obj_id[obj] is None:
                obj_id[obj] = type_mapping[obj] + str(type_id[type_mapping[obj]])
                type_id[type_mapping[obj]] += 1
            parameters.extend([(obj_id[obj], type_mapping[obj])])
            parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj_id[obj])
        if predicates_type[predicate_name] == 'num':
            numerical_preconditions[parametrized_predicate] = value
        elif predicates_type[predicate_name] == 'bool':
            #if value:
            preconditions[parametrized_predicate] = bool(value)
            #elif predicate in s2.items() and s2[predicate]:
            #    preconditions[parametrized_predicate] = bool(value)

    # If any bool predicate in the precondition, remove all numerical preconditions that do no evaluate to 0
    if any([predicates_type[predicate.split('(')[0]] == 'bool' for predicate in preconditions.keys()]):
        numerical_preconditions = {k: v for k, v in numerical_preconditions.items() if v <= 3}

    for predicate, value in s2.items():
        predicate_name = predicate.split('(')[0]
        if predicates_type[predicate_name] == 'num':
            if predicate in s1:
                objects = extract_objects_from_predicate(predicate)
                #parameters.extend([(obj, type_mapping[obj]) for obj in objects])
                parametrized_predicate = predicate
                for obj in objects:
                    if obj_id[obj] is None:
                        obj_id[obj] = type_mapping[obj] + str(type_id[type_mapping[obj]])
                        type_id[type_mapping[obj]] += 1
                    #parameters.extend([(obj_id[obj], type_mapping[obj])])
                    parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj_id[obj])
                    # Check if all objects in the predicate have been parametrized (with a ?)
                change = value - s1[predicate]
                if change != 0:
                    function_effects[parametrized_predicate] = change
            else:
                objects = extract_objects_from_predicate(predicate)
                #parameters.extend([(obj, type_mapping[obj]) for obj in objects])
                parametrized_predicate = predicate
                for obj in objects:
                    if obj_id[obj] is None:
                        obj_id[obj] = type_mapping[obj] + str(type_id[type_mapping[obj]])
                        type_id[type_mapping[obj]] += 1
                    #parameters.extend([(obj_id[obj], type_mapping[obj])])
                    parametrized_predicate = replace_whole_word(parametrized_predicate, obj, '?' + obj_id[obj])
                    numerical_effects[parametrized_predicate] = value
        elif predicates_type[predicate_name] == 'bool':
            if predicate not in s1 or s1[predicate] != value:
                objects = extract_objects_from_predicate(predicate)
                parametrized_predicate = predicate
                for obj in objects:
                    if obj_id[obj] is None:
                        obj_id[obj] = type_mapping[obj] + str(type_id[type_mapping[obj]])
                        type_id[type_mapping[obj]] += 1
                    #parameters.extend([(obj_id[obj], type_mapping[obj])])
                    parametrized_predicate = replace_whole_word(predicate, obj, '?' + obj_id[obj])
                effects[parametrized_predicate] = bool(value)

    parameters = list(set(parameters))  # remove duplicates

    return Action(parameters, preconditions, effects, numerical_preconditions, numerical_effects, function_effects, name=name)

def merge_actions(a0, a1):
    a0_function_effects = {k: v for k, v in a0.function_effects.items() if k != 'total-cost'}
    a1_function_effects = {k: v for k, v in a1.function_effects.items() if k != 'total-cost'}
    ray_of_effect = 3
    # Check if the actions are of the same type and have the same effects
    if a0.effects == a1.effects and a0.numerical_effects == a1.numerical_effects and a0_function_effects==a1_function_effects:
        # Merge the preconditions
        merging_a0_a1 = all(item in a1.preconditions.items() for item in a0.preconditions.items()) and all(item in a1.numerical_preconditions.items() for item in a0.numerical_preconditions.items())
        merging_a1_a0 = all(item in a0.preconditions.items() for item in a1.preconditions.items()) and all(item in a0.numerical_preconditions.items() for item in a1.numerical_preconditions.items())
        if merging_a0_a1:
            merged_preconditions = a0.preconditions
            merged_numerical_preconditions = a0.numerical_preconditions
        elif merging_a1_a0:
            merged_preconditions = a1.preconditions
            merged_numerical_preconditions = a1.numerical_preconditions
        else:
            # Remove precondition that are not in both actions or that have different values
            merged_preconditions = {k: v for k, v in a0.preconditions.items() if k in a1.preconditions and a1.preconditions[k] == v}
            merged_numerical_preconditions = {k: v for k, v in a0.numerical_preconditions.items() if k in a1.numerical_preconditions and a1.numerical_preconditions[k] == v}
            #merged_preconditions = {**a0.preconditions, **a1.preconditions}
            #merged_numerical_preconditions = {**a0.numerical_preconditions, **a1.numerical_preconditions}
        merged_action = Action(a0.parameters, merged_preconditions, a0.effects, merged_numerical_preconditions, a0.numerical_effects, a0.function_effects, name=a0.name)
        return merged_action
    else:
        return False

def split_action(action):
    # Split the action into a list of actions with one effect each
    # Create a list of actions to store the split actions 
    actions = []
    cost = action.function_effects['total-cost']
    counter = 0
    # Iterate over the function effects
    for effect, value in action.function_effects.items():
        if effect == 'total-cost':
            continue
        # Create a copy of the action
        new_action = Action(action.parameters.copy(), action.preconditions.copy(), {}, name=action.name + '_' + str(counter))
        # Add the effect to the new action
        new_action.function_effects = {effect: value, 'total-cost': cost}
        # Add the new action to the list of actions
        actions.append(new_action)
        counter += 1
    for effect, value in action.numerical_effects.items():
        # Create a copy of the action
        new_action = Action(action.parameters.copy(), action.preconditions.copy(), {}, function_effects={'total-cost':cost}, name=action.name + '_' + str(counter))
        # Add the effect to the new action
        new_action.numerical_effects = {effect: value}
        # Add the new action to the list of actions
        actions.append(new_action)
        counter += 1
    for effect, value in action.effects.items():
        # Create a copy of the action
        new_action = Action(action.parameters.copy(), action.preconditions.copy(), {}, function_effects={'total-cost':cost}, name=action.name + '_' + str(counter))
        # Add the effect to the new action
        new_action.effects = {effect: value}
        # Add the new action to the list of actions
        actions.append(new_action)
        counter += 1
    return actions

def update_action_cost(action, cost):
    if type(action) == list:
        for a in action:
            a.function_effects.update({'total-cost': cost})
    else:
        action.function_effects.update({'total-cost': cost})
    return action


In [22]:
s1 = {'at(can,activate)': 1, 'at(can,drop)': 0, 'at(can,lightswitch)': 2, 'at(can,pick)': 4, 'grasped(can)':1.0, 'picked(can)':0.0, 'locked(door)':True}
s2 = {'at(can,activate)': 2, 'at(can,drop)': 0, 'at(can,lightswitch)': 2, 'at(can,pick)': 5, 'grasped(can)':0.0, 'picked(can)':0.0, 'locked(door)':False}
a0 = numerical_operator_learner(s1, s2, obj_types, predicates_type, name="a0")

In [23]:
print(a0.parameters)
print(a0.preconditions)
print(a0.numerical_preconditions)
print(a0.effects)
a0.function_effects.update({'total-cost':1})
print(a0.numerical_effects)
print(a0.function_effects, '\n')
print(a0._to_pddl())

[('location1', 'location'), ('location0', 'location'), ('object1', 'object'), ('object0', 'object'), ('location3', 'location'), ('location2', 'location')]
{'grasped(?object0)': True, 'picked(?object0)': False, 'locked(?object1)': True}
{'at(?object0,?location0)': 1, 'at(?object0,?location1)': 0}
{'grasped(?object0)': False, 'locked(?object1)': False}
{}
{'at(?object0,?location0)': 1, 'at(?object0,?location3)': 1, 'total-cost': 1} 

(:action a0
  :parameters (?location1 - location ?location0 - location ?object1 - object ?object0 - object ?location3 - location)
  :precondition (and (grasped ?object0) (not (picked ?object0)) (locked ?object1) (= (at ?object0 ?location0 ) 1) (= (at ?object0 ?location1 ) 0))
  :effect (and (not (grasped ?object0)) (not (locked ?object1)) (increase (at ?object0 ?location0 ) 1) (increase (at ?object0 ?location3 ) 1) (increase (total-cost) 1) ))



In [31]:
sa = {'at(can,activate)': 2, 'at(can,drop)': 0, 'at(can,lightswitch)': 4, 'at(can,pick)': 5, 'grasped(can)':1.0, 'picked(can)':0.0, 'locked(door)':True}
sb = {'at(can,activate)': 3, 'at(can,drop)': 0, 'at(can,lightswitch)': 4, 'at(can,pick)': 6, 'grasped(can)':0.0, 'picked(can)':0.0, 'locked(door)':False}
a1 = numerical_operator_learner(sa, sb, obj_types, predicates_type, name="a1")

In [32]:
print(a1.parameters)
print(a1.preconditions)
print(a1.numerical_preconditions)
print(a1.effects)
a1.function_effects.update({'total-cost':2})
print(a1.numerical_effects)
print(a1.function_effects)

[('location1', 'location'), ('location0', 'location'), ('object1', 'object'), ('object0', 'object'), ('location3', 'location'), ('location2', 'location')]
{'grasped(?object0)': True, 'picked(?object0)': False, 'locked(?object1)': True}
{'at(?object0,?location1)': 0}
{'grasped(?object0)': False, 'locked(?object1)': False}
{}
{'at(?object0,?location0)': 1, 'at(?object0,?location3)': 1, 'total-cost': 2}


In [33]:
print(a0._to_pddl())
print(a1._to_pddl())

(:action a0
  :parameters (?location1 - location ?location0 - location ?object1 - object ?object0 - object ?location3 - location)
  :precondition (and (grasped ?object0) (not (picked ?object0)) (locked ?object1) (= (at ?object0 ?location0 ) 1) (= (at ?object0 ?location1 ) 0))
  :effect (and (not (grasped ?object0)) (not (locked ?object1)) (increase (at ?object0 ?location0 ) 1) (increase (at ?object0 ?location3 ) 1) (increase (total-cost) 1) ))

(:action a1
  :parameters (?location1 - location ?location0 - location ?object1 - object ?object0 - object ?location3 - location)
  :precondition (and (grasped ?object0) (not (picked ?object0)) (locked ?object1) (= (at ?object0 ?location1 ) 0))
  :effect (and (not (grasped ?object0)) (not (locked ?object1)) (increase (at ?object0 ?location0 ) 1) (increase (at ?object0 ?location3 ) 1) (increase (total-cost) 2) ))



In [34]:
a0 == a1

True

In [35]:
# get a list of the types of a0.used_parameters
types = [type for param, type in a0.used_parameters]
# convert it to a string separated by spaces
types = a0.name + ' ' + ' '.join(types)
ground_with_types = a0.ground_action(action_string=types)
print(types)
ground_with_types = a0.ground_action(action_string=types)
print(ground_with_types._to_pddl())

a0 location location object object location
(:action a0
  :parameters ()
  :precondition (and (grasped object) (not (picked object)) (locked object) (= (at object location ) 0))
  :effect (and (not (grasped object)) (not (locked object)) (increase (at object location ) 1) (increase (total-cost) 1) ))



In [36]:
# get a list of the types of a0.used_parameters
types = [type for param, type in a1.used_parameters]
# convert it to a string separated by spaces
types = a1.name + ' ' + ' '.join(types)
ground_with_types = a1.ground_action(action_string=types)
print(types)
ground_with_types = a1.ground_action(action_string=types)
print(ground_with_types._to_pddl())

a1 location location object object location
(:action a1
  :parameters ()
  :precondition (and (grasped object) (not (picked object)) (locked object) (= (at object location ) 0))
  :effect (and (not (grasped object)) (not (locked object)) (increase (at object location ) 1) (increase (total-cost) 2) ))



In [37]:
a_merged = merge_actions(a0, a1)
print(a_merged)

(:action a0
  :parameters (?location1 - location ?location0 - location ?object1 - object ?object0 - object ?location3 - location)
  :precondition (and (grasped ?object0) (not (picked ?object0)) (locked ?object1) (= (at ?object0 ?location1 ) 0))
  :effect (and (not (grasped ?object0)) (not (locked ?object1)) (increase (at ?object0 ?location0 ) 1) (increase (at ?object0 ?location3 ) 1) (increase (total-cost) 1) ))



In [29]:
print(a0 == a1)
print(a0._ischeaper_(a1))

True


AttributeError: 'Action' object has no attribute '_ischeaper_'

In [164]:
# Use the function to merge the actions
merged_action = merge_actions(a0, a1)
print(merged_action._to_pddl())

True
{'at(?can,?activate)': 2, 'at(?can,?drop)': 0, 'at(?can,?lightswitch)': 2, 'at(?can,?pick)': 5}
(:action a0
  :parameters (?activate - location ?can - object ?drop - location ?lightswitch - location ?pick - location)
  :precondition (and (grasped ?can) (at(?can,?activate) 2) (at(?can,?drop) 0) (at(?can,?lightswitch) 2) (at(?can,?pick) 5))
  :effect (and (not (grasped ?can)) (at(can,activate) (increase (at(can,activate)) 1)) (total-cost 1)))



In [156]:
actions2 = load_action_from_file("./PDDL_files/domain.pddl", "./PDDL_files/problem.pddl")
for a in actions2:
    print(a.effects)

{('at_gripper', '?g', '?from')}
{('at_gripper', '?g', '?from'), ('at', '?o', '?from'), ('picked_up', '?o')}
{('at', '?obj', '?location'), ('at_gripper', '?gripper', '?location')}
{('at_gripper', '?gripper', '?location'), ('picked_up', '?obj')}
{'at_gripper(?g, ?to)': True, 'at_gripper(?g, ?from)': False}
{'at_gripper(?g, ?to)': True, 'at(?o, ?to)': True, 'at(?o, ?from)': False, 'at_gripper(?g, ?from)': False}
{'picked_up(?obj)': True}
{'picked_up(?obj)': False}


In [133]:
actions2.append(merged_action)

In [135]:
replace_actions_in_domain("./PDDL_files/domain.pddl", actions2)

(:action move
  :parameters (?from - location ?to - location ?g - gripper)
  :precondition (and (at_gripper ?g ?from) )
  :effect (and (at_gripper ?g ?to) (not (at_gripper ?g ?from)) ))

(:action move_obj
  :parameters (?from - location ?to - location ?g - gripper ?o - object)
  :precondition (and (at_gripper ?g ?from) (at ?o ?from) (picked_up ?o) )
  :effect (and (at_gripper ?g ?to) (at ?o ?to) (not (at ?o ?from)) (not (at_gripper ?g ?from)) ))

(:action pick
  :parameters (?obj - object ?location - location ?gripper - gripper)
  :precondition (and (at_gripper ?gripper ?location) (at ?obj ?location) (not (picked_up ?obj)) )
  :effect (and (picked_up ?obj) ))

(:action drop
  :parameters (?obj - object ?location - location ?gripper - gripper)
  :precondition (and (at_gripper ?gripper ?location) (picked_up ?obj) )
  :effect (and (not (picked_up ?obj)) ))

(:action a0
  :parameters (?activate - location ?can - object ?drop - location ?lightswitch - location ?pick - location)
  :precondit

In [165]:
state = {'at(can,activate)': 36, 'at(can,drop)': 14, 'at(can,lightswitch)': 67, 'at(can,pick)': 50, 'at_grab_level(gripper,can)': 1, 'at_gripper(gripper,activate)': 36, 'at_gripper(gripper,drop)': 18, 'at_gripper(gripper,lightswitch)': 66, 'at_gripper(gripper,pick)': 50, 'door_collision': 0, 'dropped_off': 0, 'grasped(can)': 100, 'light_off': 0, 'locked(door)': 0, 'open(door)': 0, 'open_gripper(gripper)': 0, 'over(gripper,can)': 0, 'picked_up(can)': 15}

In [230]:
import tempfile
from tarski.io.pddl.parser import PDDLparser
from tarski.io.pddl.instance import AssignmentEffectData
from tarski.syntax import CompoundFormula, Formula, Atom
from tarski.syntax.terms import CompoundTerm
from tarski.syntax.function import Function

# Extract preconditions
def extract_preconditions(formula):
    preconditions = {}
    if isinstance(formula, CompoundFormula):
        for subformula in formula.subformulas:
            preconditions.update(extract_preconditions(subformula))
    elif isinstance(formula, Atom):
        if formula.predicate.symbol == "=":
            preconditions[formula.terms[0].symbol] = formula.terms[1].value == 1
    return preconditions

def convert_action_data_to_action(action_data):
    # Extract parameters
    parameters = [(param.symbol, param.sort.name) for param in action_data.parameters]

    preconditions = extract_preconditions(action_data.pre)

    # Extract effects
    effects = {}
    numerical_effects = {}
    function_effects = {}
    for eff in action_data.post:
        if type(eff) == AssignmentEffectData:
            if isinstance(eff.lhs, str):
                lhs_symbol = eff.lhs
            else:
                lhs_symbol = eff.lhs.symbol.name
            if type(eff.rhs) == CompoundTerm:
                #print(type(eff.rhs.symbol.name.value))
                if eff.rhs.symbol.name.value in ['+', '-']:
                    print("True ", eff.rhs)
                    print("True ", eff.rhs.subterms[0].subterms)
                    try:
                        function_effects[eff.rhs.subterms[0].subterms.symbol] = int(eff.rhs.subterms[1].name)
                    except:
                        function_effects[eff.rhs.subterms[0].subterms] = int(eff.rhs.subterms[1].name)
            else:
                print(eff.rhs)
                effects[lhs_symbol] = eff.rhs
        else:
            numerical_effects[eff.lhs.symbol] = eff.rhs

    # Create Action object
    action = Action(parameters, preconditions, effects, numerical_preconditions=None, numerical_effects=numerical_effects, function_effects=function_effects, name=action_data.name)

    return action

def load_action_from_file(domain_file, problem_file, numerical=False):
    actions = []
    parser = PDDLparser(debug=False)
    with tempfile.NamedTemporaryFile() as f:
        parser.build(logfile=f.name)

        pddl_data = ""
        with open(domain_file) as instream:
            pddl_data += instream.read()
        with open(problem_file) as instream:
            pddl_data += instream.read()

        parser.parse(pddl_data)
        instance = parser.instance

        print("PDDL Types", instance.types)
        print("Functions", instance.functions)
        print("Actions", instance.actions)

        for action in instance.actions:
            structured_action = convert_action_data_to_action(action)
            actions.append(structured_action)

    return actions


In [231]:
actions_learned = load_action_from_file("../PDDL_files/domain_numerical.pddl", "../PDDL_files/problem_numerical.pddl")

Domain Parsed
PDDL Types OrderedDict([('object', Sort(object)), ('Object', Sort(Object)), ('item', Sort(item)), ('location', Sort(location)), ('gripper', Sort(gripper))])
Functions OrderedDict([('total_cost', total_cost/0), ('at-gripper', at-gripper/2), ('at', at/2), ('at-grab-level', at-grab-level/2), ('open', open/1)])
Actions [ActionData(name='a0', parameters=[?item0 (item), ?door0 (item)], pre=(=(door-collision(),0) and (=(dropped-off(),0) and (=(grasped(?item0),1) and (=(light-off(),0) and (=(locked(?door0),1) and =(picked-up(?item0),1)))))), post=[AssignmentEffectData(lhs=picked-up(?item0), rhs=0), AssignmentEffectData(lhs=total_cost(), rhs=+(total_cost(), 1))]), ActionData(name='a1', parameters=[?location1 (location), ?location2 (location), ?gripper0 (gripper), ?item0 (item), ?door0 (item)], pre=(=(door-collision(),0) and (=(dropped-off(),0) and (=(grasped(?item0),1) and (=(light-off(),0) and (=(locked(?door0),1) and =(picked-up(?item0),1)))))), post=[AssignmentEffectData(lhs=at

TypeError: unhashable type: 'Variable'

In [223]:
for act in actions_learned:
    print(act.function_effects)

{'total_cost': 1}
{'at': 1, 'at-gripper': 1, 'total_cost': 4}


In [195]:
a01 = Action({}, {'door_collision': False, 'dropped_off': False, 'grasped(?object0)': False, 'light_off': False, 'locked(?door0)': False, 'picked_up(?object0)': False}, {}, function_effects={'at_gripper(?gripper0,?location1)': -1, 'total-cost': 1})
a02 = Action({}, {'door_collision': False, 'dropped_off': False, 'grasped(?object0)': False, 'light_off': False, 'locked(?door0)': False, 'picked_up(?object0)': False}, {}, function_effects={'at_gripper(?gripper0,?location1)': -1, 'total-cost': 1})


In [196]:
a01 == a02

True