In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports edited modules
# before each cell runs, so changes to the project code are picked up live.
%load_ext autoreload
%autoreload 2

In [2]:
from extract.locm2 import AP, Hypothesis, LOCM2
from traces import *
from observation import ActionObservation
from extract.model import *

In [3]:
def get_example_obs_bw_2traces(print_trace=False):
    """Build the hand-written blocksworld example and tokenize it.

    Despite the "2traces" in the name, the example is made of three plans
    over the blocks A, B and C:

        unstack(A,B), putdown(A), pickup(B), stack(B,C), unstack(B,C), stack(B,C), pickup(A), stack(A,B)
        unstack(C,B), putdown(C), unstack(B,A), putdown(B), pickup(B), stack(B,C), pickup(A), stack(A,B)
        unstack(C,B), stack(C,A), pickup(B), putdown(B), pickup(B), stack(B,C)

    Every step is created with an empty ``State({})``; only the actions carry
    information in this example.

    Args:
        print_trace: when true, print the trace list with the "details" view
            before tokenizing.

    Returns:
        The result of ``traces.tokenize(ActionObservation)``.
    """
    blocks = ("A", "B", "C")
    objects = {name: PlanningObject("unknown", name) for name in blocks}

    # One shared Action instance per grounded action, keyed like "stackAB" /
    # "pickupC"; plans below refer to these keys.
    actions = {}
    for verb in ("unstack", "stack"):
        for top in blocks:
            for bottom in blocks:
                if top != bottom:
                    actions[f"{verb}{top}{bottom}"] = Action(
                        verb, [objects[top], objects[bottom]]
                    )
    for verb in ("putdown", "pickup"):
        for block in blocks:
            actions[f"{verb}{block}"] = Action(verb, [objects[block]])

    plans = [
        [
            "unstackAB", "putdownA", "pickupB", "stackBC",
            "unstackBC", "stackBC", "pickupA", "stackAB",
        ],
        [
            "unstackCB", "putdownC", "unstackBA", "putdownB",
            "pickupB", "stackBC", "pickupA", "stackAB",
        ],
        [
            "unstackCB", "stackCA", "pickupB", "putdownB",
            "pickupB", "stackBC",
        ],
    ]

    # Step indices are 1-based within each trace.
    traces = TraceList(
        [
            Trace(
                [
                    Step(State({}), actions[key], index)
                    for index, key in enumerate(plan, start=1)
                ]
            )
            for plan in plans
        ]
    )

    if print_trace:
        # "detail" is not a valid view name: it only triggered the
        # 'Invalid view ... Defaulting to "details"' warning.
        traces.print("details")

    obs = traces.tokenize(ActionObservation)
    return obs

In [23]:
def get_example_obs(print_trace=False, ex=1):
    """Build one of two hand-written container/jack/wrench traces and tokenize it.

    Each step pairs a state with the action taken in that state, so
    ``states_true[i]`` lists the fluents that hold right before ``plan[i]``.

    Args:
        print_trace: when true, print the trace list with the "details" view
            before tokenizing.
        ex: ``1`` selects the first example; any other value selects the
            second one.

    Returns:
        The result of ``traces.tokenize(ActionObservation)``.
    """
    objects = {
        "c1": PlanningObject("container", "c1"),
        "c2": PlanningObject("container", "c2"),
        "c3": PlanningObject("container", "c3"),
        "j1": PlanningObject("jack", "j1"),
        "j2": PlanningObject("jack", "j2"),
        "wr1": PlanningObject("wrench", "wr1"),
        "wr2": PlanningObject("wrench", "wr2"),
    }
    fluents = {
        "open1": Fluent("open", [objects["c1"]]),
        "open2": Fluent("open", [objects["c2"]]),
        "open3": Fluent("open", [objects["c3"]]),
        "j1in": Fluent("in", [objects["j1"], objects["c1"]]),
        "j2in": Fluent("in", [objects["j2"], objects["c2"]]),
        "wr1in": Fluent("in", [objects["wr1"], objects["c1"]]),
        "wr2in": Fluent("in", [objects["wr2"], objects["c2"]]),
    }
    actions = {
        "open1": Action("open", [objects["c1"]]),
        "open2": Action("open", [objects["c2"]]),
        "open3": Action("open", [objects["c3"]]),
        "close1": Action("close", [objects["c1"]]),
        "close2": Action("close", [objects["c2"]]),
        "close3": Action("close", [objects["c3"]]),
        "fetchj1": Action("fetch_jack", [objects["j1"], objects["c1"]]),
        "fetchj2": Action("fetch_jack", [objects["j2"], objects["c2"]]),
        "putj1": Action("putaway_jack", [objects["j1"], objects["c1"]]),
        "putj2": Action("putaway_jack", [objects["j2"], objects["c2"]]),
        "fetchwr1": Action("fetch_wrench", [objects["wr1"], objects["c1"]]),
        "fetchwr2": Action("fetch_wrench", [objects["wr2"], objects["c2"]]),
        "putwr1": Action("putaway_wrench", [objects["wr1"], objects["c1"]]),
        "putwr2": Action("putaway_wrench", [objects["wr2"], objects["c2"]]),
        # "close" applied to a wrench; not used by either plan below.
        "closewr": Action("close", [objects["wr1"]]),
    }

    if ex == 1:
        # open(c1); fetch jack(j1,c1); fetch wrench(wr1,c1); close(c1);
        # open(c2); fetch wrench(wr2,c2); fetch jack(j2,c2); close(c2);
        # close(c3); open(c3)   -- c3 starts out open, so it is closed first.
        # The trailing None is a final step that records the end state only.
        plan = [
            "open1", "fetchj1", "fetchwr1", "close1",
            "open2", "fetchwr2", "fetchj2", "close2",
            "close3", "open3", None,
        ]
        states_true = [
            ["open3", "j1in", "j2in", "wr1in", "wr2in"],
            ["open3", "open1", "j1in", "j2in", "wr1in", "wr2in"],
            ["open3", "open1", "j2in", "wr1in", "wr2in"],
            ["open3", "open1", "j2in", "wr2in"],
            ["open3", "j2in", "wr2in"],
            ["open3", "open2", "j2in", "wr2in"],
            ["open3", "open2", "j2in"],
            ["open3", "open2"],
            ["open3"],  # before close(c3)
            [],         # before open(c3)
            ["open3"],
        ]
    else:
        # open(c1); putaway jack(j1,c1); close(c1); open(c2); putaway jack(j2,c2);
        # open(c1); fetch jack(j1,c1); fetch wrench(wr1,c1);
        # fetch jack(j2,c2); close(c1);
        plan = [
            "open1", "putj1", "close1", "open2", "putj2",
            "open1", "fetchj1", "fetchwr1", "fetchj2", "close1",
        ]
        # One more state than actions: the last entry is the end state and is
        # not attached to any step in this example.
        states_true = [
            ["wr1in", "wr2in"],
            ["open1", "wr1in", "wr2in"],
            ["open1", "wr1in", "wr2in", "j1in"],
            ["wr1in", "wr2in", "j1in"],
            ["open2", "wr1in", "wr2in", "j1in"],
            ["open2", "wr1in", "wr2in", "j1in", "j2in"],
            ["open1", "open2", "wr1in", "wr2in", "j1in", "j2in"],
            ["open1", "open2", "wr1in", "wr2in", "j2in"],
            ["open1", "open2", "wr2in", "j2in"],
            ["open1", "open2", "wr2in"],
            ["open2", "wr2in"],
        ]

    # Every state assigns a truth value to every fluent.
    states = [
        State({fluent: name in state_true for name, fluent in fluents.items()})
        for state_true in states_true
    ]

    # Pair the i-th state with the i-th action; step indices are 1-based.
    steps = [
        Step(state, None if key is None else actions[key], index)
        for index, (state, key) in enumerate(zip(states, plan), start=1)
    ]
    traces = TraceList([Trace(steps)])

    if print_trace:
        traces.print("details")

    obs = traces.tokenize(ActionObservation)
    return obs

In [26]:
# Choose the example fed to the rest of the notebook: the blocksworld traces
# are active; swap the two lines to use the container/jack/wrench example.
# NOTE(review): the outputs recorded in the cells below mention containers,
# jacks and wrenches, so they appear to come from an earlier get_example_obs(...)
# run rather than from this call -- restart the kernel and run all to refresh.
obs_tracelist = get_example_obs_bw_2traces(True)
# obs_tracelist = get_example_obs(True, ex=2)


  warn(f'Invalid view {view}. Defaulting to "details".')











In [6]:
def test_get_sort(obs_tracelist, debug=False):
   

    sorts = LOCM2._get_sorts(obs_tracelist, debug)

    return sorts

In [7]:
# Compute the sorts for the selected observations (debug output off) and show
# them; the recorded output is a mapping from object name to a sort number.
sorts = test_get_sort(obs_tracelist, False)
print(sorts)

{'c2': 1, 'c1': 1, 'c3': 1, 'j2': 2, 'j1': 2, 'wr2': 3, 'wr1': 3}


In [8]:
# LOCM2 step 1: produces `AML` and `obj_traces_overall` from the observations
# and sorts. The recorded output shows one "Sort.N AML" table per sort;
# presumably the trailing True is a debug flag that enables it -- confirm.
AML, obj_traces_overall = LOCM2._locm2_step1(obs_tracelist, sorts, True)

Sort.0 AML:


Unnamed: 0,open.0,fetch_jack.0,fetch_wrench.0,close.0
open.0,0,1,1,0
fetch_jack.0,0,0,1,1
fetch_wrench.0,0,1,0,1
close.0,2,0,0,1


Sort.1 AML:


Unnamed: 0,open.1,fetch_jack.2,fetch_wrench.2,close.1
open.1,0,1,1,0
fetch_jack.2,0,0,1,1
fetch_wrench.2,0,1,0,1
close.1,1,0,0,0


Sort.2 AML:


Unnamed: 0,fetch_jack.1
fetch_jack.1,0


Sort.3 AML:


Unnamed: 0,fetch_wrench.1
fetch_wrench.1,0


In [9]:
# LOCM2 step 2: derives `AML_with_holes` from `AML`; in the recorded output
# some matrix cells are replaced by the marker 'hole'.
AML_with_holes = LOCM2._locm2_step2(AML, True)

Sort.1 AML with holes:


  df1.iloc[idx2,col] = 'hole'
  df1.iloc[idx1, col] = 'hole'
  df1.iloc[idx2,col] = 'hole'


Unnamed: 0,open.1,fetch_jack.2,fetch_wrench.2,close.1
open.1,0,1,1,hole
fetch_jack.2,0,hole,1,1
fetch_wrench.2,0,1,hole,1
close.1,1,0,0,0


Sort.2 AML with holes:


Unnamed: 0,fetch_jack.1
fetch_jack.1,0


Sort.3 AML with holes:


Unnamed: 0,fetch_wrench.1
fetch_wrench.1,0


In [10]:
# LOCM2 step 3: collects `H_per_sort` from the holed matrices; the recorded
# output reports the number of holes found for each sort.
H_per_sort = LOCM2._locm2_step3(AML_with_holes, True)

#holes in Sort.1: 3
#holes in Sort.2: 0
#holes in Sort.3: 0


In [11]:
# LOCM2 steps 4 and 5: both read `AML_with_holes` and produce the per-sort
# transitions and the per-sort consecutive transitions consumed by step 6.
transitions_per_sort = LOCM2._locm2_step4(AML_with_holes)
consecutive_transitions_per_sort = LOCM2._locm2_step5(AML_with_holes)

In [12]:
# LOCM2 step 6: combines the matrices, holes and (consecutive) transitions
# into `S`. The recorded output walks through the holes of each sort and ends
# each sort with a "Final transition set list".
S = LOCM2._locm2_step6(AML, H_per_sort, transitions_per_sort, consecutive_transitions_per_sort)

### Sort.1

no holes
[]

Removed redundancy transition set list
[]


#### Final transition set list

[{fetch_wrench.0, open.0, fetch_jack.0, close.0}]


### Sort.2

3 holes


#### Hole 1: {fetch_jack.2}

Checking candidate set *{fetch_wrench.2, fetch_jack.2}* of **Sort.2** for well formedness and Validity

Unnamed: 0,fetch_wrench.2,fetch_jack.2
fetch_wrench.2,0,1
fetch_jack.2,1,0


0 1
This subset is well-formed.
This subset is valid.
Adding this subset {fetch_wrench.2, fetch_jack.2} to the locm2 transition set.
Hole that is covered now:
[fetch_jack.2]


#### Hole 2: {open.1, close.1}

Checking candidate set *{fetch_jack.2, open.1, close.1}* of **Sort.2** for well formedness and Validity

Unnamed: 0,fetch_jack.2,open.1,close.1
fetch_jack.2,0,0,1
open.1,1,0,0
close.1,0,1,0


0 1
0 2
1 2
This subset is well-formed.
This subset is valid.
Adding this subset {fetch_jack.2, open.1, close.1} to the locm2 transition set.
Hole that is covered now:
[open.1, close.1]


#### Hole 3: {fetch_wrench.2}

Hole {fetch_wrench.2} is already covered.

[{fetch_wrench.2, fetch_jack.2}, {fetch_jack.2, open.1, close.1}]

Removed redundancy transition set list
[{fetch_wrench.2, fetch_jack.2}, {fetch_jack.2, open.1, close.1}]


#### Final transition set list

[{fetch_wrench.2, fetch_jack.2}, {fetch_jack.2, open.1, close.1}, {fetch_wrench.2, fetch_jack.2, open.1, close.1}]


### Sort.3

no holes
[]

Removed redundancy transition set list
[]


#### Final transition set list

[{fetch_jack.1}]


### Sort.4

no holes
[]

Removed redundancy transition set list
[]


#### Final transition set list

[{fetch_wrench.1}]


In [13]:
# Step 1 of the follow-up pipeline: returns `TS_overall`, `ap_state_pointers`
# and `OS`, which the later steps take as input. The recorded output prints
# TS_overall and ap_state_pointers (presumably because of the trailing True).
TS_overall, ap_state_pointers, OS = LOCM2._step1(obj_traces_overall, sorts, S, AML, True)

TS_overall: 
 [{FSM.0 @ Sort.0: {zero zero: [open.0, fetch_jack.0, fetch_wrench.0, close.0, open.0, fetch_wrench.0, fetch_jack.0, close.0, close.0, open.0]}, FSM.0 @ Sort.1: {container c1: [], container c2: [], container c3: []}, FSM.1 @ Sort.1: {container c1: [open.1, fetch_jack.2, close.1], container c2: [open.1, fetch_jack.2, close.1], container c3: [close.1, open.1]}, FSM.2 @ Sort.1: {container c1: [open.1, fetch_jack.2, fetch_wrench.2, close.1], container c2: [open.1, fetch_wrench.2, fetch_jack.2, close.1], container c3: [close.1, open.1]}, FSM.0 @ Sort.2: {jack j1: [], jack j2: []}, FSM.1 @ Sort.2: {jack j1: [], jack j2: []}, FSM.2 @ Sort.2: {jack j1: [], jack j2: []}, FSM.0 @ Sort.3: {wrench wr1: [fetch_wrench.1], wrench wr2: [fetch_wrench.1]}, FSM.1 @ Sort.3: {wrench wr1: [], wrench wr2: []}, FSM.2 @ Sort.3: {wrench wr1: [], wrench wr2: []}}]
ap_state_pointers: 
 defaultdict(<class 'dict'>, {FSM.0 @ Sort.0: {fetch_wrench.0: (1 -> 2), open.0: (3 -> 4), fetch_jack.0: (5 -> 6), cl

In [14]:
# Step 3: produces `HS`; the recorded output prints "Learned HS:" followed by
# nothing for the recorded run.
HS = LOCM2._step3(TS_overall, ap_state_pointers, OS, sorts, AML, True)

Learned HS:


In [15]:
# Step 4: derives `Bindings` from `HS`; the recorded run printed an empty
# defaultdict.
Bindings = LOCM2._step4(HS, debug=True)

defaultdict(<class 'dict'>, {})


In [16]:
# Step 5: rebinds `Bindings` to the result of `_step5`.
# NOTE(review): because the name is reused, re-running only this cell feeds the
# already-processed bindings back in -- confirm `_step5` tolerates that.
Bindings = LOCM2._step5(HS, Bindings, True)

In [17]:
# Step 7: produces the `fluents` and `actions` later passed to `Model`; the
# recorded output prints the bindings, the fluents and the actions.
fluents, actions = LOCM2._step7(OS, ap_state_pointers, sorts,Bindings,{}, debug=True)

bindings:
{}

fluents:
{(fsmFSM.1 @ Sort.1_state2 sort1),
 (fsmFSM.0 @ Sort.1_state0 sort1),
 (fsmFSM.1 @ Sort.1_state0 sort1),
 (fsmFSM.2 @ Sort.1_state1 sort1),
 (fsmFSM.0 @ Sort.3_state0 sort3),
 (fsmFSM.0 @ Sort.3_state1 sort3),
 (fsmFSM.0 @ Sort.2_state1 sort2),
 (fsmFSM.2 @ Sort.1_state0 sort1),
 (fsmFSM.0 @ Sort.2_state0 sort2),
 (fsmFSM.0 @ Sort.1_state1 sort1),
 (fsmFSM.1 @ Sort.1_state1 sort1)}

actions:
{(open sort1),
 (close sort1),
 (fetch_wrench sort3 sort1),
 (fetch_jack sort2 sort1)}



In [18]:
# Get the state machines and render each one. `view=True` presumably opens
# the rendered file in an external viewer -- confirm against the type
# returned by `get_state_machines`.
state_machines = LOCM2.get_state_machines(ap_state_pointers, OS, Bindings)
for sm in state_machines:
    sm.render(view=True)

In [19]:
# Wrap the fluents and actions returned by `_step7` in a Model.
model = Model(fluents, actions)

In [20]:
# Export the model as PDDL under the name 'locm'; the recorded output lists,
# per action, the fluents involved and their parameter indices.
model.to_pddl('locm')

close
{(fsmFSM.1 @ Sort.1_state2 sort1), (fsmFSM.2 @ Sort.1_state1 sort1)}
[x0 (sort1)]
(fsmFSM.1 @ Sort.1_state2 sort1)
[0]
(fsmFSM.2 @ Sort.1_state1 sort1)
[0]
fetch_wrench
{(fsmFSM.2 @ Sort.1_state1 sort1), (fsmFSM.0 @ Sort.1_state0 sort1), (fsmFSM.0 @ Sort.3_state0 sort3)}
[x0 (sort3), x1 (sort1)]
(fsmFSM.2 @ Sort.1_state1 sort1)
[1]
(fsmFSM.0 @ Sort.1_state0 sort1)
[1]
(fsmFSM.0 @ Sort.3_state0 sort3)
[0]
fetch_jack
{(fsmFSM.2 @ Sort.1_state1 sort1), (fsmFSM.1 @ Sort.1_state0 sort1), (fsmFSM.0 @ Sort.1_state1 sort1), (fsmFSM.0 @ Sort.2_state0 sort2)}
[x0 (sort2), x1 (sort1)]
(fsmFSM.2 @ Sort.1_state1 sort1)
[1]
(fsmFSM.1 @ Sort.1_state0 sort1)
[1]
(fsmFSM.0 @ Sort.1_state1 sort1)
[1]
(fsmFSM.0 @ Sort.2_state0 sort2)
[0]
open
{(fsmFSM.2 @ Sort.1_state0 sort1), (fsmFSM.1 @ Sort.1_state1 sort1)}
[x0 (sort1)]
(fsmFSM.2 @ Sort.1_state0 sort1)
[0]
(fsmFSM.1 @ Sort.1_state1 sort1)
[0]
