In [1]:
from itertools import product
import heapq
import json

In [2]:
class PlanningDomain:

    def __init__(self, actions=()):
        self.actions = tuple(actions)

    def generate_groundings(self, objects):
        grounded_actions = list()
        for action in self.actions:
            parameters = [objects[type_] for type_ in action.types]
            combinations = set()

            for param_set in product(*parameters):
                param_set_frozen = frozenset(param_set)

                # Skip if unique and not all parameters are distinct
                if action.unique and len(param_set_frozen) != len(param_set):
                    continue

                # Skip if same and parameters set is already used
                if action.same and param_set_frozen in combinations:
                    continue

                combinations.add(param_set_frozen)
                grounded_actions.append(action.generate_grounding(*param_set))

        return grounded_actions

In [3]:
class PlanningAction:

    def __init__(self, name, parameters=(), preconditions=(), effects=(), unique=False, same=False):
        self.name = name
        if len(parameters) > 0:
            self.types, self.arg_names = zip(*parameters)
        else:
            self.types = tuple()
            self.arg_names = tuple()
        self.preconditions = preconditions
        self.effects = effects
        self.unique = unique
        self.same = same

    def generate_grounding(self, *args):
        return GroundedPlanningAction(self, *args)

    def __str__(self):
        arg_list = ','.join(['%s' % pair for pair in zip(self.arg_names)])
        return '%s(%s)' % (self.name, arg_list)

In [4]:
def flatten_nested_list(nested_list):
    flat_list = []
    for item in nested_list:
        if isinstance(item, list):
            flat_list.extend(flatten_nested_list(item))
        else:
            flat_list.append(item)
    return flat_list

In [5]:
def create_planning_domain(data):
    actions = list()

    for action, params in data["action"].items():
        parameters = list()
        for type_, variable_list in params["parameters"].items():
            for variable in variable_list:
                parameters.append((type_, variable))

        preconditions = list()
        for name, predicates in params["precondition"].items():
            preconditions.append(tuple(flatten_nested_list([name, predicates])))

        effects = list()
        for name, predicates in params["effect"].items():
            effects.append(tuple(flatten_nested_list([name, predicates])))

        actions.append(PlanningAction(action, parameters, preconditions, effects))

    return PlanningDomain(actions)

In [6]:
class PlanningTask:

    def __init__(self, planning_domain, objects, initial_state=(), goal_state=()):
        self.grounded_actions = planning_domain.generate_groundings(objects)

        predicates = list(initial_state)
        functions = dict()
        self.initial_state = PlanningState(predicates, functions)

        self.goal_state = list(goal_state)

In [7]:
def apply_grounding(arg_names, args):
    name_arg_mapping = dict(zip(arg_names, args))

    def substitute_predicate(predicate):
        return predicate[0:1] + tuple(name_arg_mapping.get(arg, arg) for arg in predicate[1:])

    return substitute_predicate

In [8]:
class GroundedPlanningAction:

    def __init__(self, action, *args):
        self.name = action.name
        ground = apply_grounding(action.arg_names, args)

        self.written = ground((self.name,) + action.arg_names)

        self.preconditions = [ground(pre) for pre in action.preconditions]
        self.effects = [ground(effect) for effect in action.effects]

    def __str__(self):
        arg_list = ','.join(map(str, self.written[1:]))
        return '%s(%s)' % (self.written[0], arg_list)

In [9]:
def create_planning_problem(planning_domain, data):
    objects = dict()

    for type_, variables in data["objects"].items():
        objects[type_] = variables

    initial_state = list()
    for name, object_list in data["init"].items():
        for obj in object_list:
            initial_state.append(tuple(flatten_nested_list([name, obj])))

    goal_state = list()
    for name, object_list in data["goal"].items():
        for obj in object_list:
            goal_state.append(tuple(flatten_nested_list([name, obj])))

    return PlanningTask(planning_domain, objects, initial_state, goal_state)

In [10]:
class PlanningState:

    def __init__(self, predicates, functions, predecessor=None):
        self.predicates = frozenset(predicates)
        self.functions = tuple(functions.items())
        self.f_dict = functions
        self.predecessor = predecessor

    def apply(self, action):
        new_predicates = set(self.predicates)
        new_predicates |= set(action.effects)
        new_predicates -= set(action.preconditions)

        new_functions = dict()
        new_functions.update(self.functions)

        return PlanningState(new_predicates, new_functions, (self, action))

    def extract_plan(self):
        plan = list()
        current_state = self
        while current_state.predecessor is not None:
            plan.append(current_state.predecessor[1])
            current_state = current_state.predecessor[0]

        plan.reverse()
        return plan

    def __hash__(self):
        return hash((self.predicates, self.functions))

    def __eq__(self, other):
        return ((self.predicates, self.functions) ==
                (other.predicates, other.functions))

    def __lt__(self, other):
        return hash(self) < hash(other)

In [11]:
def null_heuristic():
    return 0

In [12]:
def is_goal_state_reached(state, goal_state):
    return all(pred in state.predicates for pred in goal_state) and sum(['holding' in item[0] for item in state.predicates]) < 3

In [13]:
def h_g_heuristic(state, new_state, goal_state):
    h_g_value = 0
    for pred in new_state.predicates:
        if pred in state.predicates:
            h_g_value += 1
        if pred in goal_state:
            h_g_value += 1
    return h_g_value


def astar(planning_problem, heuristic=h_g_heuristic, initial_state=None, goal_state=None):
    if heuristic is None:
        heuristic = null_heuristic
    if initial_state is None:
        initial_state = planning_problem.initial_state
    if goal_state is None:
        goal_state = tuple(planning_problem.goal_state)

    closed_states = set()
    fringe = [(heuristic(initial_state, PlanningState(list(), dict()), goal_state), initial_state)]
    heapq.heapify(fringe)

    while True:
        if len(fringe) == 0:
            return None

        current_heuristic, current_state = heapq.heappop(fringe)

        if is_goal_state_reached(current_state, goal_state):
            plan = current_state.extract_plan()
            return plan

        if current_state not in closed_states:
            closed_states.add(current_state)
            successor_states = set(current_state.apply(action)
                                   for action in planning_problem.grounded_actions
                                   if is_goal_state_reached(current_state, action.preconditions))

            for successor_state in successor_states:
                if successor_state not in closed_states:
                    heapq.heappush(fringe, (heuristic(current_state, successor_state, goal_state), successor_state))


In [14]:
# Load domain and task data from JSON files
with open('domain.json', 'r') as domain_file:
    domain_data = json.load(domain_file)

with open('Task1.json', 'r') as task_file:
    task_data = json.load(task_file)

In [15]:
# Create planning domain and problem
planning_domain = create_planning_domain(domain_data)
planning_problem = create_planning_problem(planning_domain, task_data)

In [16]:
# Print actions and action combinations
print("Actions:")
[print(action.__str__()) for action in planning_domain.actions]

Actions:
pick-up(x)
put-down(x)
stack(x,y)
unstack(x,y)


[None, None, None, None]

In [17]:
print("\nAction Combinations:")
[print(grounded_action.__str__()) for grounded_action in planning_problem.grounded_actions]


Action Combinations:
pick-up(D)
pick-up(B)
pick-up(A)
pick-up(C)
put-down(D)
put-down(B)
put-down(A)
put-down(C)
stack(D,D)
stack(D,B)
stack(D,A)
stack(D,C)
stack(B,D)
stack(B,B)
stack(B,A)
stack(B,C)
stack(A,D)
stack(A,B)
stack(A,A)
stack(A,C)
stack(C,D)
stack(C,B)
stack(C,A)
stack(C,C)
unstack(D,D)
unstack(D,B)
unstack(D,A)
unstack(D,C)
unstack(B,D)
unstack(B,B)
unstack(B,A)
unstack(B,C)
unstack(A,D)
unstack(A,B)
unstack(A,A)
unstack(A,C)
unstack(C,D)
unstack(C,B)
unstack(C,A)
unstack(C,C)


[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [18]:
print("\nPlan using A*:")
[print(action.__str__()) for action in astar(planning_problem)]


Plan using A*:
pick-up(B)
pick-up(C)
stack(B,A)
stack(C,B)
pick-up(D)
stack(D,C)


[None, None, None, None, None, None]