In [1]:
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from extract import LOCM
from utils import read_plan, read_json_file
from evaluator import ExecutabilityEvaluator
from collections import defaultdict

In [2]:
def test_exe(domain_name, traces, test_traces=None):
    try:
        locm = LOCM(state_param=True, viz=False)
        model = locm.extract_model(traces)
        
        pddl_domain = model.to_pddl_domain(domain_name)
        evaluator = ExecutabilityEvaluator(pddl_domain, debug=True)
        if test_traces is not None:
            exe = evaluator.get_overall_executability('l', test_traces[0], set(), set())
        else:
            exe = evaluator.get_overall_executability('l', traces[0], set(), set())
        if exe < 1:
            print(f"Warning! domain: {domain_name} - executability: {exe}")
        return exe
    except Exception as e:
        print(f"Error processing domain {domain_name}: {e}")
        return 0

In [3]:
def test_acceptance(domain_name,train_traces, test_traces):
    debug = {}
    try:
        locm = LOCM(state_param=True, viz=False, debug=debug)
        model = locm.extract_model(train_traces)
        learned_domain = model.to_pddl_domain(domain_name)

        evaluator = ExecutabilityEvaluator(learned_domain, debug=True)
        res = []
        for trace in test_traces:
            valid_acceptance, _ = evaluator.get_acceptance_rate(trace)
            res.append(valid_acceptance)
        if len(res) == 0:
            return 0
        return sum(res) / len(res)
    except Exception as e:
        print(f"Error processing domain {domain_name}: {e}")
        return 0,1
    

In [4]:
# Smoke test: two hand-written blocksworld plans. Each step is written as
# "(action arg?type ...)" and steps are comma-separated; read_plan parses the
# string into a trace whose steps expose an `.action` attribute (printed below).
plan= "(unstack b3?object b2?object),(putdown b3?object),(unstack b2?object b1?object),(putdown b2?object),(pickup b2?object),(stack b2?object b3?object),(pickup b1?object),(stack b1?object b2?object)"
plan2 = "(pickup b2?object),(stack b2?object b1?object),(pickup b3?object),(stack b3?object b2?object)"

trace = read_plan(plan)
trace2 = read_plan(plan2)

# Show the parsed actions of the first plan as a sanity check on the parser.
for step in trace:
    print(step.action)

traces = [trace, trace2]
# exe = test_exe("blocksworld", traces)
# print(exe)
# Self-acceptance: the same traces are used for learning and for evaluation.
acc = test_acceptance("blocksworld", traces,traces)
print(acc)
# test_exe("test_bw", read_plan(plan2))

unstack b3 b2
putdown b3
unstack b2 b1
putdown b2
pickup b2
stack b2 b3
pickup b1
stack b1 b2
1.0


In [5]:
# Learn from partially ordered versions of two plans, then evaluate on the
# original (totally ordered) first plan.
test_plan = "(pickup b2?object),(stack b2?object b1?object),(pickup b3?object),(stack b3?object b2?object)"
test_trace = read_plan(test_plan)
# NOTE(review): the numeric argument presumably controls how much ordering is
# relaxed -- confirm against to_partial_ordered_trace's definition.
po_trace = test_trace.to_partial_ordered_trace(0.9)

test_plan2 = "(pickup b2?object),(stack b2?object b1?object),(unstack b3?object b4?object),(putdown b3?object)"
test_trace2 = read_plan(test_plan2)
po_trace2 = test_trace2.to_partial_ordered_trace(0.3)

# test_exe("plan1", trace)
# Last expression of the cell, so its value is displayed as the cell output.
test_acceptance("p2b", [po_trace, po_trace2], [test_trace])

1.0

In [5]:
# Group the learning objects from the JSON file by their "domain" field:
# train_traces maps domain name -> list of learning objects for that domain.
train_traces = defaultdict(list)
# Path is relative to the notebook's working directory.
data = read_json_file("../../data/training_data/traces_plan_po_r10.json")
for learning_obj in data:
    domain_name = learning_obj["domain"]
    train_traces[domain_name].append(learning_obj)

In [6]:
# For every learning object, learn a model from its traces and evaluate the
# model on those same traces ("self acceptance"), writing one CSV row per item.
with open("./locm_self_acceptance_rate.csv", "w") as f:
    # Header columns match the five values written per row below.
    f.write("ID, Domain, len, len%, acceptance_rate\n")
    for domain, items in train_traces.items():
        print(f"Testing domain: {domain}")
        for item in items:
            traces = item["traces"]

            try:
                acceptance_rate = test_acceptance(domain, traces, traces)
            except Exception as e:
                # Skip this item but keep processing the remaining ones.
                print(f"Error in balanced executability for domain {domain}: {e}")
                continue

            # Record the result and flush immediately so partial results
            # survive an interrupted run.
            f.write(f"{item['id']}, {domain}, {item['total_length']}, {item['len%']}, {acceptance_rate}\n")
            f.flush()
            print(f"Balanced Executability for {domain}: {acceptance_rate}")

Testing domain: blocksworld
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{2, 3}, {1, 4, 5}, {6, 7}, {8}]
{unstack.0: (1-2), putdown.0: (3-4), pickup.0: (5-6), stack.0: (7-8)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unstack.0: (1-2), putdown.0: (3-4)}
0
Balanced Executability for blocksworld: 1.0
[{1, 4}, {2, 3}]
{unsta

In [7]:
# tmp = None
# for learning_obj in data:
#     if learning_obj['domain'] == 'childsnack':
#         if learning_obj['id'] == 5:
#             if learning_obj['total_length'] == 96:
#                 tmp = learning_obj
#                 break

In [None]:
# traces = tmp['traces']
# print(traces)
# locm = LOCM(state_param=True, viz=False,debug = {})
# model = locm.extract_model(traces)
# pddl_domain = model.to_pddl_domain("childsnack")
# pddl_domain.dump()