In [1]:
import sys
from unified_planning.shortcuts import *
from collections import defaultdict
from unified_planning.io import PDDLWriter

In [2]:
# Domain vocabulary: two user-defined object types ...
Location = UserType('Location')
Robot = UserType('Robot')

# ... and three boolean fluents over them.
# at(robot, location): the robot is currently at the location.
at = Fluent(
    'at',
    BoolType(),
    robot=Robot,
    location=Location,
)
# visited(robot, location): set to true by `move` for its destination.
visited = Fluent(
    'visited',
    BoolType(),
    robot=Robot,
    location=Location,
)
# connected(l_from, l_to): directed edge that `move` requires to travel l_from -> l_to.
connected = Fluent(
    'connected',
    BoolType(),
    l_from=Location,
    l_to=Location,
)

In [3]:
# move(robot, l_from, l_to): relocate a robot along one directed connection.
move = InstantaneousAction('move', robot=Robot, l_from=Location, l_to=Location)

# Handles to the action parameters, used to build the expressions below.
robot, l_from, l_to = (
    move.parameter(param_name) for param_name in ('robot', 'l_from', 'l_to')
)

# Applicable only when the robot stands at the origin and the edge exists.
move.add_precondition(at(robot, l_from))
move.add_precondition(connected(l_from, l_to))

# The robot leaves the origin, arrives at the destination, and the
# destination is recorded as visited by this robot.
move.add_effect(at(robot, l_from), False)
move.add_effect(at(robot, l_to), True)
move.add_effect(visited(robot, l_to), True)

In [4]:
# Objects: a single robot and NLOC locations named l0 .. l{NLOC-1}.
NLOC = 10
r1 = Object('r1', Robot)
locations = [Object('l%s' % i, Location) for i in range(NLOC)]

# Problem skeleton: every fluent defaults to False unless set explicitly below.
problem = Problem('robot_with_simulated_effects')
problem.add_fluent(at, default_initial_value=False)
problem.add_fluent(visited, default_initial_value=False)
problem.add_fluent(connected, default_initial_value=False)
problem.add_action(move)

problem.add_object(r1)
problem.add_objects(locations)

# The robot starts at l0, which therefore already counts as visited.
problem.set_initial_value(at(r1, locations[0]), True)
problem.set_initial_value(visited(r1, locations[0]), True)

# Directed chain l0 -> l1 -> ... -> l{NLOC-1} ...
for src, dst in zip(locations, locations[1:]):
    problem.set_initial_value(connected(src, dst), True)
# ... plus one directed shortcut l4 -> l8 that skips l5, l6 and l7.
problem.set_initial_value(connected(locations[4], locations[8]), True)

In [5]:
# Soft goals handed to the Oversubscription metric, each mapped to a numeric
# value: visiting l5 carries a negative value, visiting l7 and l9 positive ones.
# Alternative value that was tried for l9: 20.
goals = {
    visited(r1, locations[5]): -5,
    visited(r1, locations[7]): 6,
    visited(r1, locations[9]): 10,
}

problem.add_quality_metric(up.model.metrics.Oversubscription(goals))

In [6]:
# One-shot solve with the engine registered as "oversubscription[symk]".
# `result` and `plan` stay bound at notebook level for later inspection.
with OneshotPlanner(name="oversubscription[symk]") as planner:
    result = planner.solve(problem)
    plan = result.plan
    # Fail loudly in the notebook if the engine produced no plan.
    assert plan is not None
    print("%s returned: %s" % (planner.name, plan))

OversubscriptionPlanner[SymK] returned: SequentialPlan:
    move(r1, l0, l1)
    move(r1, l1, l2)
    move(r1, l2, l3)
    move(r1, l3, l4)
    move(r1, l4, l5)
    move(r1, l5, l6)
    move(r1, l6, l7)
    move(r1, l7, l8)
    move(r1, l8, l9)


In [8]:
# One-shot solve with the plain "symk" engine on the same problem, for
# comparison with the "oversubscription[symk]" run.
with OneshotPlanner(name="symk") as planner:
    result = planner.solve(problem)
    plan = result.plan
    # Fail loudly in the notebook if the engine produced no plan.
    assert plan is not None
    print("%s returned: %s" % (planner.name, plan))

[96m[1mNOTE: To disable printing of planning engine credits, add this line to your code: `up.shortcuts.get_environment().credits_stream = None`
[0m[96m  *** Credits ***
[0m[96m  * In operation mode `OneshotPlanner` at line 1 of `/tmp/ipykernel_78959/2499456800.py`, [0m[96myou are using the following planning engine:
[0m[96m  * Engine name: SymK
  * Developers:  David Speck (cf. https://github.com/speckdavid/symk/blob/master/README.md )
[0m[96m  * Description: [0m[96mSymK is a state-of-the-art domain-independent optimal and top-k planner.[0m[96m
[0m[96m
[0mSymK returned: SequentialPlan:
    move(r1, l0, l1)
    move(r1, l1, l2)
    move(r1, l2, l3)
    move(r1, l3, l4)
    move(r1, l4, l5)
    move(r1, l5, l6)
    move(r1, l6, l7)
    move(r1, l7, l8)
    move(r1, l8, l9)


In [9]:
# Same "symk" engine, now passing the engine parameter plan_cost_bound=7.
# NOTE(review): presumably this restricts the search to plans whose cost is
# below the bound -- confirm against the SymK engine documentation.
with OneshotPlanner(name="symk", params={"plan_cost_bound": 7}) as planner:
    result = planner.solve(problem)
    plan = result.plan
    # Fail loudly in the notebook if the engine produced no plan.
    assert plan is not None
    print("%s returned: %s" % (planner.name, plan))

[96m  *** Credits ***
[0m[96m  * In operation mode `OneshotPlanner` at line 1 of `/tmp/ipykernel_78959/2446649554.py`, [0m[96myou are using the following planning engine:
[0m[96m  * Engine name: SymK
  * Developers:  David Speck (cf. https://github.com/speckdavid/symk/blob/master/README.md )
[0m[96m  * Description: [0m[96mSymK is a state-of-the-art domain-independent optimal and top-k planner.[0m[96m
[0m[96m
[0mSymK returned: SequentialPlan:
    move(r1, l0, l1)
    move(r1, l1, l2)
    move(r1, l2, l3)
    move(r1, l3, l4)
    move(r1, l4, l8)
    move(r1, l8, l9)


In [None]:
# One-shot solve with the "symk-opt" engine and plan_cost_bound=200; the
# engine's own log is forwarded to the notebook through output_stream.
with OneshotPlanner(name="symk-opt", params={"plan_cost_bound": 200}) as planner:
    result = planner.solve(problem, output_stream=sys.stdout)
    plan = result.plan
    # Fail loudly in the notebook if the engine produced no plan.
    assert plan is not None
    print("%s returned: %s" % (planner.name, plan))

In [None]:
# Enumerate plans with the "symk" anytime engine (number_of_plans=3) and group
# them by the value the validator assigns under the problem's first quality metric.
plans_by_utility = defaultdict(list)
pv = PlanValidator(problem_kind=problem.kind)

# Number of plans actually validated and stored. Counted explicitly rather
# than derived from the loop index, so the progress and summary messages never
# drift from the contents of `plans_by_utility`.
num_plans = 0

with AnytimePlanner(name='symk', params={"number_of_plans": 3}) as planner:
    for result in planner.get_solutions(problem, output_stream=sys.stdout):
        if result.status == up.engines.PlanGenerationResultStatus.INTERMEDIATE:
            # Each intermediate result carries one plan: score it and file it.
            pv_res = pv.validate(problem, result.plan)
            utility = pv_res.metric_evaluations[problem.quality_metrics[0]]
            plans_by_utility[utility].append(result.plan)
            num_plans += 1
            print(f"{planner.name} found {num_plans} plans...")
        elif result.status in unified_planning.engines.results.POSITIVE_OUTCOMES:
            # Terminal success: summarise what was collected above. The plan
            # attached to this final result is not stored, as before.
            print()
            print(f"{planner.name} found {num_plans} plans!")
            for utility, plans in plans_by_utility.items():
                print(f"{planner.name} found {len(plans)} plans with utilites {utility}.")
        else:
            # Any status that is neither intermediate nor a positive outcome.
            print("No plan found.")

In [None]:
# Enumerate plans with the "symk-opt" anytime engine (number_of_plans=3) and group
# them by the value the validator assigns under the problem's first quality metric.
plans_by_utility = defaultdict(list)
pv = PlanValidator(problem_kind=problem.kind)

# Number of plans actually validated and stored. Counted explicitly so the
# final total cannot include the terminal result (whose plan is never stored)
# and always agrees with the per-utility breakdown.
num_plans = 0

with AnytimePlanner(name='symk-opt', params={"number_of_plans": 3}) as planner:
    for result in planner.get_solutions(problem, output_stream=sys.stdout):
        if result.status == up.engines.PlanGenerationResultStatus.INTERMEDIATE:
            # Each intermediate result carries one plan: score it and file it.
            pv_res = pv.validate(problem, result.plan)
            utility = pv_res.metric_evaluations[problem.quality_metrics[0]]
            plans_by_utility[utility].append(result.plan)
            num_plans += 1
            print(f"{planner.name} found {num_plans} plans...")
        elif result.status in unified_planning.engines.results.POSITIVE_OUTCOMES:
            # Terminal success: summarise what was collected above.
            print()
            print(f"{planner.name} found {num_plans} plans!")
            for utility, plans in plans_by_utility.items():
                print(f"{planner.name} found {len(plans)} plans with utilites {utility}.")
        else:
            # Any status that is neither intermediate nor a positive outcome.
            print("No plan found.")

In [None]:
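# Validator (disabled): re-scores the most recent `result.plan` by sequential
# plan length instead of the oversubscription metric. Note that uncommenting
# this replaces the quality metrics stored on `problem`.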
# problem.clear_quality_metrics()
# problem.add_quality_metric(MinimizeSequentialPlanLength())
# pv = PlanValidator(problem_kind=problem.kind)
# pv_res = pv.validate(problem, result.plan)
# pv_res.metric_evaluations[problem.quality_metrics[0]]