#### This code extends the work available at https://github.com/pucrs-automated-planning/pddl-parser. Specific snippets (for heuristics, for informed search algorithms such as Dijkstra, A* and Greedy Best-First, and for uninformed search algorithms such as DFS and BFS) can also be adapted from https://github.com/APLA-Toolbox/PythonPDDL

#### See https://github.com/remykarem/py2pddl for dynamic PDDL-based planning (regenerating dynamic PDDL from dynamic Python)

In [1]:
### Install a pip package (e.g., numpy) in the current Jupyter kernel
#import sys
#!{sys.executable} -m pip install numpy

## PDDL Parser (only):

In [2]:
## running the PDDL.py main file from the terminal using python
#!python PDDL.py # examples/dwr/dwr.pddl examples/dwr/pb1.pddl

## PDDL Planner (baseline BFS planner):

In [3]:
#!python -B planner.py examples/dinner/dinner.pddl examples/dinner/pb1.pddl -v

In [4]:
def pause(variable):
    """Print the type and value of ``variable`` and block until Enter is pressed.

    Debugging aid: the dump is framed by two separator lines and the
    function waits on ``input`` between the dump and the closing separator.
    """
    separator = "********************************"
    print(separator)
    print(f"Debug: \n{type(variable)}\n\n{variable}\n")
    input("Press Enter to continue!")  # the typed text itself is not needed
    print(separator)

In [5]:
def validate_conditions(state, positive, negative):
    """Tell whether ``state`` satisfies a set of conditions.

    Returns True when every element of ``positive`` is contained in
    ``state`` and no element of ``negative`` is; False otherwise.
    """
    if not positive.issubset(state):
        return False
    return negative.isdisjoint(state)

In [6]:
def state_transition(state, positive, negative):
    """Return the successor of ``state`` after applying a set of effects.

    The elements of ``negative`` are removed first, then the elements of
    ``positive`` are added; ``state`` itself is left untouched.
    """
    after_deletions = state.difference(negative)
    return after_deletions.union(positive)

In [7]:
def grounding_all_actions(parser):
    """Collect every grounded instantiation of every action of ``parser``.

    Each entry of ``parser.actions`` is asked to ``groundify`` itself against
    ``parser.objects`` (the instantiated objects, e.g. crane: [a, b, c, d])
    and ``parser.types`` (the declared object types, e.g. crane). The
    results are concatenated, in action order, into a single list.
    """
    return [
        grounded
        for schema in parser.actions
        for grounded in schema.groundify(parser.objects, parser.types)
    ]

In [8]:
def applicable_actions(state, grounded_actions):
    """List the grounded actions whose preconditions hold in ``state``.

    An action is kept when ``validate_conditions`` accepts its
    ``positive_preconditions`` and ``negative_preconditions`` against
    ``state``. The input order is preserved.
    """
    return [
        candidate
        for candidate in grounded_actions
        if validate_conditions(
            state,
            candidate.positive_preconditions,
            candidate.negative_preconditions,
        )
    ]

In [9]:
def solve_BFS(parser, grounded_actions):
    """Search for a plan with uninformed breadth-first search (BFS).

    Parameters:
        parser: object exposing ``state`` (initial state), ``positive_goals``
            and ``negative_goals``; the original notes say all three are
            frozensets.
        grounded_actions: iterable of grounded actions exposing
            ``positive_preconditions``, ``negative_preconditions``,
            ``add_effects`` and ``del_effects``.

    Returns:
        ``[]`` when the initial state already satisfies the goal, a list of
        grounded actions leading from the initial state to a goal state, or
        ``None`` when the reachable state space is exhausted without
        reaching the goal.
    """
    # Function-scope import: the notebook cell stays self-contained.
    from collections import deque

    state = parser.state  # initial problem.pddl state
    goal_pos = parser.positive_goals  # goal state positive conditions
    goal_not = parser.negative_goals  # goal state negative conditions

    # Nothing to plan when the initial state is already a goal state.
    if validate_conditions(state, goal_pos, goal_not):
        print('\nInitial state meets the goal conditions!')
        return []

    # The initial state was just tested, so it starts out as visited.
    closed_set = {state}

    # FIFO queue of (state, plan) pairs. ``plan`` is a linked structure
    # ``(last_action, plan_of_parent)`` with ``None`` at the root, so sharing
    # prefixes between sibling nodes costs nothing. A deque gives O(1) pops
    # from the left, whereas ``list.pop(0)`` shifts the whole list.
    frontier = deque([(state, None)])

    while frontier:
        state, plan = frontier.popleft()  # oldest node first => breadth-first

        for act in grounded_actions:
            # Skip actions that are not applicable in the current state.
            if not validate_conditions(state, act.positive_preconditions, act.negative_preconditions):
                continue

            new_state = state_transition(state, act.add_effects, act.del_effects)
            if new_state in closed_set:
                continue  # already generated through another path

            if validate_conditions(new_state, goal_pos, goal_not):
                # Walk the linked plan back to the root, then flip it so the
                # actions come out in execution order.
                full_plan = [act]
                while plan:
                    previous_act, plan = plan
                    full_plan.append(previous_act)
                full_plan.reverse()
                return full_plan

            closed_set.add(new_state)
            frontier.append((new_state, (act, plan)))

    return None  # the goal state has not been reached

In [10]:
def export_plan(plan, verbose = True):
    """Print a plan, or report failure and terminate when there is none.

    Parameters:
        plan: list of actions to print. Any non-list value (e.g. ``None``)
            is treated as "no plan was found".
        verbose: when true each action is printed as-is; otherwise only
            ``act.name`` followed by its space-separated ``act.parameters``.

    Raises:
        SystemExit: with exit code 1 when ``plan`` is not a list.
    """
    # ``exit`` is only injected by the ``site`` module for interactive use;
    # ``sys.exit`` is the programmatic equivalent and is always available.
    import sys

    print('\n----------------------------')
    if not isinstance(plan, list):
        print('No plan was found')
        sys.exit(1)

    print('Plan:')
    for act in plan:
        print(act if verbose else act.name + ' ' + ' '.join(act.parameters))

In [11]:
def export_parser(parser):
    """Print a human-readable dump of the parsed domain and problem.

    Output order: separator, domain name, every action, problem name, then
    the objects, types, initial state and positive/negative goal conditions.
    """
    print('\n----------------------------')
    print('Domain name: ' + parser.domain_name)
    for schema in parser.actions:
        print(schema)
    print('Problem name: ' + parser.problem_name)

    # (label, attribute) pairs printed through str(); attributes are read
    # lazily, one per printed line.
    labelled_attributes = (
        ('Objects: ', 'objects'),
        ('Types: ', 'types'),
        ('Initial State: ', 'state'),
        ('Positive goal conditions: ', 'positive_goals'),
        ('Negative goal conditions: ', 'negative_goals'),
    )
    for label, attribute in labelled_attributes:
        print(label + str(getattr(parser, attribute)))

In [12]:
def generate_plan(parser, solver='BFS'):
    """Ground all actions, run the selected planner and print the result.

    Parameters:
        parser: parsed domain/problem, passed on to ``grounding_all_actions``
            and ``solve_BFS``.
        solver: ``'BFS'`` (default) or ``'DFS'``. No DFS planner exists, so
            ``'DFS'`` prints a notice and falls back to BFS.

    Returns:
        ``(plan, grounded_actions)``: the plan returned by the planner and
        the list of grounded actions that was searched.

    Raises:
        ValueError: when ``solver`` is not a recognised name (this used to
            surface later as an unbound ``plan`` variable).
    """
    import time

    # Reject unknown solvers before doing the grounding work.
    if solver not in ('BFS', 'DFS'):
        raise ValueError(f"Unknown solver {solver!r}; expected 'BFS' or 'DFS'")

    # Uncomment to dump the parsed domain/problem before planning:
    # export_parser(parser)

    start_planner_time = time.time()
    grounded_actions = grounding_all_actions(parser)

    # Planner instantiation
    if solver == 'DFS':
        print('No DFS planner found!\nProceeding with BFS!')
    # Both branches run BFS; the fallback previously omitted
    # ``grounded_actions`` and therefore raised a TypeError.
    plan = solve_BFS(parser, grounded_actions)

    plan_time = time.time()

    export_plan(plan)
    print('Planner Time: ' + str(plan_time-start_planner_time) + 's\n')

    return plan, grounded_actions

In [13]:
def check_goal(parser, plan, step):
    """Check whether the state reached while executing ``plan`` is a goal.

    Called after each executed plan step. When ``parser.state`` satisfies
    the goal conditions, the reached state is printed and every plan entry
    after index ``step`` (i.e. the steps that were never executed) is
    removed from ``plan`` in place.

    Returns:
        ``(plan, flag)`` where ``flag`` is True when the goal is met.
    """
    goal_met = validate_conditions(parser.state, parser.positive_goals, parser.negative_goals)
    if not goal_met:
        return plan, False

    print('\nReached state meets the goal state conditions:')
    print(parser.state)

    # Discard, from the tail, the steps that did not need to be executed.
    unexecuted_steps = len(plan) - (step + 1)
    for _ in range(unexecuted_steps):
        plan.pop()

    return plan, True

In [14]:
def facts_check(parser, facts):
    """Return True when ``facts`` is a member of ``parser.state``.

    Per the original notes the state is a frozenset and a fact is a tuple
    such as ``('on', 'cc', 'cb')``.
    """
    # To try it out on an existing fact:
    #   from random import sample
    #   facts = sample(parser.state, 1)[0]
    return facts in parser.state

In [15]:
def facts_modify(parser, facts, mod_action):
    """Remove facts from, or add facts to, ``parser.state``.

    Parameters:
        parser: object whose ``state`` attribute is a (frozen)set of facts.
        facts: either a single fact given as a tuple of strings, e.g.
            ``('on', 'cc', 'cb')``, or an iterable of such facts.
        mod_action: ``'rem'`` to discard the facts, ``'add'`` to insert
            them. Any other value leaves the state unchanged.

    Returns:
        The same ``parser`` object, with ``parser.state`` rebound to the
        updated state.
    """
    if mod_action not in ('rem', 'add'):
        return parser

    # A lone fact is itself a tuple of strings; wrap it so the set operation
    # treats it as one element rather than iterating over its terms.
    is_single_fact = (
        isinstance(facts, tuple)
        and len(facts) > 0
        and all(isinstance(term, str) for term in facts)
    )
    changed_facts = {facts} if is_single_fact else set(facts)

    # difference/union return a NEW set, so the result must be assigned back;
    # the result was previously dropped and the state never changed.
    if mod_action == 'rem':
        parser.state = parser.state.difference(changed_facts)
    else:
        parser.state = parser.state.union(changed_facts)

    return parser

In [16]:
def exrtact_parser_objects(parser):
    """Extract the objects registered in ``parser.objects``.

    Returns:
        ``(keys, objects, objects_set)``: the key view of the
        ``parser.objects`` dictionary, a flat list with the objects of every
        key concatenated in key order, and the set of those objects.
    """
    keys = parser.objects.keys()
    objects = [name for members in parser.objects.values() for name in members]
    return keys, objects, set(objects)

In [17]:
def objects_check(parser, items):
    """Return True when every element of ``items`` is a declared object.

    The declared objects are the set returned by
    ``exrtact_parser_objects(parser)``; ``items`` is converted to a set
    before the comparison.
    """
    # To try it out on an existing object:
    #   from random import sample
    #   items = sample(objects, 1)
    _, _, declared_objects = exrtact_parser_objects(parser)
    return set(items).issubset(declared_objects)

In [18]:
def objects_modify(parser, items, mod_action):
    """Remove or add ``items`` in the object lists of ``parser.objects``.

    Nothing is changed unless ``objects_check(parser, items)`` succeeds,
    i.e. every item is already a declared object.

    Parameters:
        parser: object whose ``objects`` attribute maps a key to a list of
            object names.
        items: iterable of object names.
        mod_action: ``'rem'`` removes the items from the list of every key;
            ``'add'`` appends the items to the list of every key. Any other
            value leaves ``parser.objects`` unchanged.

    Returns:
        The same ``parser`` object, modified in place.

    NOTE(review): ``'add'`` registers each item under EVERY key and, because
    of the ``objects_check`` gate, only for items that already exist
    somewhere. That mirrors the previous behaviour but looks unintended;
    confirm the desired semantics.
    """
    if not objects_check(parser, items):
        return parser

    # Lists are rebuilt with dict.fromkeys so duplicates are collapsed (as
    # the old set round-trip did) while the original object order is kept;
    # going through a set scrambled the order on every call.
    if mod_action == 'rem':
        removed = set(items)
        for key in parser.objects:
            kept = (name for name in parser.objects[key] if name not in removed)
            parser.objects[key] = list(dict.fromkeys(kept))

    elif mod_action == 'add':
        for key in parser.objects:
            parser.objects[key] = list(dict.fromkeys([*parser.objects[key], *items]))

    return parser  # returning the modified parser

## Generating the initial plan

In [19]:
from PDDL import PDDL_Parser
import time

# History of the plans generated in this session; the initial plan is appended
# at the end of this cell.
all_plans = []
# PDDL domain and problem files handed to the parser below.
domain = "examples/dwr/dwr.pddl"
problem = "examples/dwr/pb1.pddl"

# Measure how long parsing the domain and the problem takes.
start_time = time.time()
# Parser instantiation
parser = PDDL_Parser()
parser.parse_domain(domain)
parser.parse_problem(problem)
parse_time = time.time()
print('Parse Time: ' + str(parse_time-start_time) + 's\n')

# Generate plan (generate_plan defaults to solver='BFS' and returns both the
# plan and the grounded actions it searched over).
active_plan, grounded_actions = generate_plan(parser)
all_plans.append(active_plan)

Parse Time: 0.0016760826110839844s


----------------------------
Plan:
action: take
  parameters: ('k1', 'cc', 'cb', 'p1', 'l1')
  positive_preconditions: [['empty', 'k1'], ['in', 'cc', 'p1'], ['on', 'cc', 'cb'], ['top', 'cc', 'p1'], ['attached', 'p1', 'l1'], ['belong', 'k1', 'l1']]
  negative_preconditions: [['equal', 'cc', 'cb'], ['equal', 'cc', 'pallet']]
  add_effects: [['holding', 'k1', 'cc'], ['top', 'cb', 'p1']]
  del_effects: [['empty', 'k1'], ['in', 'cc', 'p1'], ['on', 'cc', 'cb'], ['top', 'cc', 'p1']]

action: load
  parameters: ('k1', 'r1', 'cc', 'l1')
  positive_preconditions: [['holding', 'k1', 'cc'], ['unloaded', 'r1'], ['at', 'r1', 'l1'], ['belong', 'k1', 'l1']]
  negative_preconditions: [['equal', 'cc', 'pallet']]
  add_effects: [['empty', 'k1'], ['loaded', 'r1', 'cc']]
  del_effects: [['holding', 'k1', 'cc'], ['unloaded', 'r1']]

action: move
  parameters: ('r1', 'l1', 'l2')
  positive_preconditions: [['at', 'r1', 'l1'], ['adjacent', 'l1', 'l2']]
  negative_preconditi

## Creating a dynamic re-planning framework

In [20]:
def progress_system_state(parser, plan, grounded_actions, simulation_step, apply_plan_steps_to_replan=1):
    """Advance the system state by executing the first steps of ``plan``.

    Parameters:
        parser: object whose ``state`` is replaced by each successor state.
        plan: list of actions (the currently active plan). Anything that is
            not a list is treated as "nothing to execute".
        grounded_actions: all grounded actions; not used by the active code
            path (kept for the random-action variant noted below).
        simulation_step: running step counter, incremented per applied action.
        apply_plan_steps_to_replan: number of plan steps to apply; clamped
            to ``len(plan)``.

    Returns:
        ``(parser, simulation_step, used_plan)``. ``used_plan`` is the plan
        as returned by ``check_goal`` when the goal was reached during
        execution, and ``[]`` otherwise.
    """
    # Guard first: calling len() on a missing plan (None) would raise.
    if not isinstance(plan, list):
        return parser, simulation_step, []

    steps_to_apply = min(apply_plan_steps_to_replan, len(plan))

    for step in range(steps_to_apply):
        act = plan[step]  # applying the "optimal" action (according to the currently active plan)
        # Alternative: select a random applicable action instead, e.g.
        # act = sample(applicable_actions(parser.state, grounded_actions), 1)[0]

        simulation_step += 1

        print('\n**************************')
        print(f'Simulation Step: {simulation_step}')
        print('**************************\n')
        print('Applying Planned Action:')
        print(act)
        parser.state = state_transition(parser.state, act.add_effects, act.del_effects)

        # Stop as soon as the goal is met, even if plan steps remain.
        plan, goal_flag = check_goal(parser, plan, step)
        if goal_flag:
            return parser, simulation_step, plan

    print('\nReached state:')
    print(parser.state)

    return parser, simulation_step, []

In [21]:
def impose_stochastic_disturbance(parser, simulation_step, type_of_disturbance='none'):
    """Interactively select a disturbance to impose on the planning problem.

    With ``type_of_disturbance == 'none'`` nothing is asked and nothing is
    changed. Otherwise the user is prompted for an object key, an adjustment
    type (``rem``/``add``) and, for ``rem``, an object of that key. Applying
    the selected adjustment to the parser is still TODO.

    Parameters:
        parser: parsed problem; ``parser.objects`` is read for the prompts.
        simulation_step: current simulation step (currently unused).
        type_of_disturbance: ``'none'`` disables the disturbance entirely.

    Returns:
        ``(parser, new_change_applied)`` where ``new_change_applied`` is 0
        for ``type_of_disturbance == 'none'`` and 1 otherwise.

    NOTE(review): 1 is also returned when the user answers "none" at a
    prompt, which triggers a re-plan although nothing changed -- confirm
    whether that is intended.
    """
    if type_of_disturbance == 'none':
        return parser, 0
    new_change_applied = 1

    ## Design notes: iterate the solve(domain, problem) function over the optimal plan states / actions / steps,
    ## changing different aspects of the deterministic environment evolution, e.g.:
    ## - different initial state (consider the refined current state to be valid with a specific likelihood)
    ##   -> use a random sampler to choose between alternative state_transition(...) outcomes;
    ## - different number of same-type parser.objects (simulate robot failures or new obstacles/blockades)
    ##   -> extend parser.objects entries appropriately;
    ## - different goals (simulate a dynamically changing goal)
    ##   -> extend / refine the goal_pos and goal_not conditions appropriately.

    # Ask which object key to adjust.
    while True:
        selected_key = input(f'\nSelect key to adjust (or "none" to do nothing): {parser.objects.keys()}\n')
        if selected_key == 'none':
            return parser, new_change_applied
        if selected_key in parser.objects:
            break
        print('Wrong input. Try again!')

    objects_in_key = parser.objects.get(selected_key)

    # Ask for the kind of adjustment. The prompt used to interpolate an
    # undefined name, and `x == 'rem' or 'add'` was always true, so any
    # answer was accepted.
    while True:
        object_adjustment_type = input('\nSelect to Remove, Add objects or Continue [rem/add/none]:\n')
        if object_adjustment_type == 'none':
            return parser, new_change_applied
        if object_adjustment_type in ('rem', 'add'):
            break
        print('Wrong input. Try again!')

    if object_adjustment_type == 'rem':
        while True:
            object_to_remove = input(f'\nSelect objects to remove: {objects_in_key}\n')
            if object_to_remove in objects_in_key:
                break
            print('Wrong input. Try again!')

        # TODO: search all objects and facts (initial state, current state, goal state)
        # and remove any references that relate to "object_to_remove".

    else:  # 'add'
        # TODO: search all objects and facts (initial state, current state, goal state)
        # and add any additional facts that relate to the new object, e.g., if a new
        # 'container' is added then declaring its location is also needed.
        pass

    return parser, new_change_applied

In [22]:
from random import sample

# Number of steps of the currently active plan that are executed before the
# next (potential) re-planning; re-planning starts from the state reached.
apply_steps_until_next_replan = 1
# Upper bound on the number of loop iterations (re-planning attempts).
number_of_replanning_attempts = 100
simulation_step = 0                 # initial value for the total simulation step

new_change_applied = 0            # flag that indicates a change has been imposed at the current timestep (compared to previous timestep)
# Cap the number of attempts at the length of the initial plan.
number_of_replanning_attempts = min(number_of_replanning_attempts, len(active_plan))

for replans in range(number_of_replanning_attempts):
    
    if active_plan:  # if active_plan is not empty
        
        # Execute the head of the active plan. The third return value is non-empty
        # only when the goal was reached during execution.
        parser, simulation_step, active_plan_head_used = progress_system_state(parser, active_plan, grounded_actions, simulation_step, apply_steps_until_next_replan)
        # With type_of_disturbance='none' no disturbance is imposed and
        # new_change_applied comes back as 0.
        parser, new_change_applied = impose_stochastic_disturbance(parser, simulation_step, type_of_disturbance='none')
        
        # Example (disabled): remove selected items from the objects already
        # declared/registered in the parser.
        #items = ['r1', 'l1']
        #objects_modify(parser, items, 'rem')
        
        # Example (disabled): remove or negate all facts that relate to the
        # removed items in the parser.
        # NOTE(review): facts_check / facts_modify take the parser itself as their
        # first argument, not parser.state -- adjust before enabling.
        #facts = sample(parser.state, 1)[0]
        #if facts_check(parser.state, facts):
        #    facts_modify(parser.state, facts, mod_action = 'add')
        
        if active_plan_head_used: # non-empty => the goal state has been reached
            all_plans.pop(-1) # removing the entire plan that was most recently generated
            all_plans.append(active_plan_head_used) # to replace it with its head/fraction that was actually used to reach goal state
            break
            
        # print(f'Attempt to replan: {replans}')
        
        # Re-plan from the reached state only when a change has been imposed;
        # otherwise keep following the current plan.
        if new_change_applied:
            active_plan, grounded_actions = generate_plan(parser)
            all_plans.append(active_plan) # record the newly generated plan
        else:
            # Drop the step that was just executed.
            # NOTE(review): only one step is dropped, which matches the number of
            # executed steps only while apply_steps_until_next_replan == 1.
            active_plan.pop(0)
            


**************************
Simulation Step: 1
**************************

Applying Planned Action:
action: take
  parameters: ('k1', 'cc', 'cb', 'p1', 'l1')
  positive_preconditions: [['empty', 'k1'], ['in', 'cc', 'p1'], ['on', 'cc', 'cb'], ['top', 'cc', 'p1'], ['attached', 'p1', 'l1'], ['belong', 'k1', 'l1']]
  negative_preconditions: [['equal', 'cc', 'cb'], ['equal', 'cc', 'pallet']]
  add_effects: [['holding', 'k1', 'cc'], ['top', 'cb', 'p1']]
  del_effects: [['empty', 'k1'], ['in', 'cc', 'p1'], ['on', 'cc', 'cb'], ['top', 'cc', 'p1']]


Reached state:
frozenset({('adjacent', 'l2', 'l1'), ('unloaded', 'r1'), ('in', 'cb', 'p1'), ('in', 'ce', 'q1'), ('belong', 'k2', 'l2'), ('equal', 'cd', 'cd'), ('equal', 'cc', 'cc'), ('on', 'cb', 'ca'), ('equal', 'pallet', 'pallet'), ('equal', 'cf', 'cf'), ('belong', 'k1', 'l1'), ('top', 'pallet', 'q2'), ('attached', 'q1', 'l1'), ('on', 'cd', 'pallet'), ('equal', 'ce', 'ce'), ('in', 'ca', 'p1'), ('occupied', 'l1'), ('holding', 'k1', 'cc'), ('attache