# LTL-f BASED-TRACE ALIGNMENT

From constraint to LTL-f: http://www.diag.uniroma1.it/degiacom/papers/2014/AAAI14.pdf

From LTL-f to DFA: http://ltlf2dfa.diag.uniroma1.it/

From LTL-f to automaton: https://github.com/whitemech/logaut

LTL2DFA library: https://github.com/whitemech/LTLf2DFA/

In [1]:
import re
from typing import Match, cast #, Tuple

from ltlf2dfa.parser.ltlf import LTLfParser
from logaut.backends.common.process_mona_output import (
    parse_automaton,
    parse_mona_output,
)

'''from logaut import ltl2dfa
from pylogics.parsers import parse_ltl'''

'from logaut import ltl2dfa\nfrom pylogics.parsers import parse_ltl'

## Constraint automaton

**CONSTRAINTS**

- [x] Chain precedence activity 16 - 17
- [x] Existence activity 1
- [x] Precedence activity 9 -10
- [x] Responded existence activity 5 - 6
- [x] Chain response activity 14 - 15
- [x] Not co-existence activity 19 -20
- [x] Not succession activity 20 -21
- [x] Not chain succession activity 22 - 23
- [x] Response activity 11 - 12
- [x] Absence2 activity 2


In [2]:
def postprocess_output(output: str) -> str:
    """
    Post-process MONA output.
    Capture the output related to the MONA DFA transitions.
    :param: the raw output from the LTLf2DFA tool.
    :return: the output associated to the DFA.
    """
    regex = re.compile(
        r".*(?=\nFormula is (valid|unsatisfiable)|A counter-example)",
        flags=re.MULTILINE | re.DOTALL,
    )
    match = regex.search(output)
    if match is None:
        raise Exception("cannot find automaton description in MONA output.")
    return cast(Match, regex.search(output)).group(0)

In [3]:
parser = LTLfParser()

'''
constraint_formulas = {"existence":"F(a)", 
                       "absence2":"!(F((b & X(F(b)))))", 
                       "response":"G(k -> F(l))", 
                       "precedence":"((!(j) U i) | G(!(j)))",
                       "chain_response":"G((n -> X(o)))", 
                       "chain_precedence": "(!(q) & G((X(q) -> p)))",
                       "responded_existence":"(F(e) -> F(f))", 
                       "not_coexistence":"!((F(s) & F(t)))", 
                       "not_succession":"G((t -> !(F(u))))", 
                       "not_chain_succession":"G((v <-> !(X(w))))",
                       "formula_inventata":"(F(a) & F(b)) -> G(c)"}
'''
constraint_formulas = {"existence":"F(a)", 
                       "absence2":"!(F((b & X(F(b)))))"}

In [4]:
#constraint_formulas = {"existence":"F(a)","response":"G(k -> F(l))"}
index = 1

all_automata = {}

for type_constr,f in constraint_formulas.items():
    build_automaton = {}
    build_automaton["name"] = type_constr
    build_automaton["formula"] = f

    all_states_constr = []
    final_states_constr = []
    init_states_constr = []
    rejecting_states_constr = []
    rho_constr_basic = []
    rho_tobe_negated = []

    ## Parser + dfa #####################################################
    formula = parser(f)       # returns an LTLfFormula
    dfa = formula.to_dfa(mona_dfa_out=True)
    mona_output = parse_mona_output(postprocess_output(dfa))
    automaton = parse_automaton(mona_output)
    #automaton.to_graphviz().render("automata/"+type_constr+".dfa", view=True)

    ## save all what is needed ##########################################
    init_states_constr = 's1'
    
    states = set.union(mona_output.rejecting_states, mona_output.accepting_states)
    states.remove(0)
    states = list(states)
    for elem in states:
        all_states_constr.append('s'+str(elem))

    for elem in list(mona_output.accepting_states):
        final_states_constr.append('s'+str(elem))

    alphabet = mona_output.variable_names

    for key,elem in mona_output.transitions.items():
        s_start = key
        if(s_start != 0):
            for k,e in elem.items():
                s_end = k
                sum = 0
                if(s_end != s_start):
                    comb = list(e)[0]
                    for char in comb:
                        if(char!='X'):
                            sum += int(char)
                    if(sum > 1):
                        continue
                    elif(sum == 1):
                        index1 = comb.find('1')
                        rho_constr_basic.append("s"+str(s_start)+" "+alphabet[index1]+" s"+str(s_end))
                    elif(sum == 0):
                        indeces0 = [i for i in range(len(comb)) if comb[i] in '0']
                        res_list = [alphabet[i] for i in indeces0]
                        rho_tobe_negated.append([s_start,s_end,res_list])
                        


    build_automaton["all_states"] = all_states_constr
    build_automaton["final_states"] = final_states_constr
    build_automaton["init_state"] = init_states_constr
    build_automaton["transitions"] = rho_constr_basic
    build_automaton["symbols_constr"] = alphabet
    build_automaton["negated_transitions"] = rho_tobe_negated

    all_automata["a"+str(index)] = build_automaton

    index += 1

## Load xes file
containing the trace

In [5]:
log_path = "dataset/logs/synthetic-logs/10constraints/1-constraint-inverted/log-from-10constr-model-1constr_inverted-1-50.xes"

In [6]:
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
voc = {}
for i in range(len(alphabet)):
    voc[i+1] = alphabet[i]

def convertNumberToChar (val):
    return voc[val]

In [7]:
# Legge dal file .xes, estrapola le tracce, trasforma il valore delle tracce in caratteri,
# infine aggiungile al log sottoforma di stringhe
def readLog (log_path):
    # Inizializzazione variabili
    flag = False;           # Indica quando dobbiamo leggere un evento dal file
    trace = [];             # Lista di eventi sottoforma di interi
    traceChar = [];         # Lista di eventi sottoforma di char
    traceString = "";       # Stringa composta da eventi sottoforma di char
    log = []               # Lista di tracce ognuna delle quali è una stringa di char

    # Apriamo il file e leggiamolo riga per riga
    f = open  (log_path)
    f1 = f.readlines()
    
    # Per ogni riga del file...
    for x in f1:
        # Se c'è un evento, attiviamo la flag
        if (x.__contains__("<event>")):
            flag = True
        # Se flag attiva e siamo sulla riga dove è presente il nome dell'evento,
        # estrapoliamo il nome dell'evento e appendiamolo a trace
        if (flag and x.__contains__('<string key="concept:name"')):
            val = x.split('value="activity ',1)[1]
            val = val.split('"')[0]                
            trace.append(val)
            flag = False
        # Quando non ci sono più eventi possiamo lavorare sulla traccia in questione
        if (x.__contains__("</trace>")):
            for event in trace:
                traceChar.append(convertNumberToChar(int(event)))  # Converti gli eventi in char 
            traceString = "".join(traceChar)                       # Lista di eventi -> stringa
            log.append(traceString)                                # Appendi stringa a log

            # Inizializza nuovamente le variabili
            trace = []
            traceChar = []
            traceString = ""
    return log

In [8]:
log = readLog(log_path) # list of traces

#trace = log[2]
#trace = 'stkai'
#print (trace)

## Trace automaton

In [9]:
#traces_log = log
traces_log = ['ASTABI']

all_traces = []

for t in traces_log:
    trace = {}
    
    symbols_trace = list(set(t))
    
    ## Build trace automaton ###################################
    rho_trace_basic = []
    Q_trace = []
    for i in range(len(t)):
        Q_trace.append('t'+str(i))
        rho_trace_basic.append('t'+str(i)+" "+t[i] +" "+ 't'+str(i+1))

    Q_trace.append('t'+str(len(t)))
    init_state_trace = Q_trace[0]
    final_state_trace = Q_trace[-1]

    trace["symbols_trace"] = symbols_trace
    trace["init_state_trace"] = init_state_trace
    trace["final_state_trace"] = final_state_trace
    trace["Q_trace"] = Q_trace
    trace["rho_trace_basic"] = rho_trace_basic

    all_traces.append(trace)

    ## Now we add the transitions for the negated symbols ######

    for ID,automaton in all_automata.items():
        symb_constr = set(automaton["symbols_constr"])
        symb_trace = set(symbols_trace)
        all_symbs = list(set.union(symb_constr, symb_trace))

        trans = automaton["transitions"]
        for t_neg in automaton["negated_transitions"]:
            symbs = all_symbs.copy()
            for t in t_neg[2]:
                symbs.remove(t)
            for elem in symbs:
                if ("s"+str(t_neg[0])+" "+elem+" s"+str(t_neg[1]) not in trans):
                    trans.append("s"+str(t_neg[0])+" "+elem+" s"+str(t_neg[1]))

        automaton["transitions"] = trans

## Preliminary steps for PDDL

In [10]:
import random
trace = all_traces[random.randint(0,len(all_traces))]

#type_constraints = ['existence', 'absence2', 'response', 'precedence', 'chain_response','responded_existence', 'chain_precedence', 'not_coexistence', 'not_succession', 'not_chain_succession']
#type_constraints = ["existence","response"]

In [11]:
M = 0
for ID,a in all_automata.items():
    i = len(a['all_states'])
    if (i >= M):
        M = i
        index_max = ID
all_states_constr_max = all_automata[index_max]['all_states'] 

symbols_tot = []
for ID,a in all_automata.items():
    for s in a["symbols_constr"]:
        symbols_tot.append(s)
symbols_tot = list(set(symbols_tot).union(set(trace["symbols_trace"])))

## Planning domain D

In [12]:
domain_name = "domain_multi"
problem_name = "problem_multi"

In [13]:
pddl_domain_initial = "(define (domain "+domain_name+") "\
                "(:requirements :strips :typing :action-costs) "\
                "(:types trace_state automaton_state - state activity automaton_name) "

pddl_domain_predicates = "(:predicates (trace ?t1 - trace_state "\
                            "?e - activity "\
                            "?t2 - trace_state) "\
                            "(automaton ?a1 - automaton_name "\
                            "?s1 - automaton_state "\
                            "?e - activity "\
                            "?s2 - automaton_state) "\
                            "(cur_state ?a1 - automaton_name ?s - state) "\
                            "(final_state ?a1 - automaton_name ?s - state)) "


pddl_domain_actions =  "(:action sync "\
                        ":parameters (?t1 - trace_state ?e - activity ?t2 - trace_state) "\
                        ":precondition (and (cur_state ?a1 ?t1) (trace ?t1 ?e ?t2)) "\
                        ":effect(and (not (cur_state ?a1 ?t1)) (cur_state ?a1 ?t2) "\
                        "(forall (?a1 - automaton_name) "\
                        "(forall (?s1 ?s2 - automaton_state) "\
                        "(when (and (cur_state ?a1 ?s1) "\
                        "(automaton ?a1 ?s1 ?e ?s2)) "\
                        "(and (not (cur_state ?a1 ?s1)) "\
                        "(cur_state ?a1 ?s2))))))) "\
                        "(:action add "\
                        ":parameters (?e - activity) "\
                        ":effect (and (increase (total-cost) 1) "\
                        "(forall (?a1 - automaton_name) "\
                        "(forall (?s1 ?s2 - automaton_state) "\
                        "(when (and (cur_state ?a1 ?s1) "\
                        "(automaton ?a1 ?s1 ?e ?s2)) "\
                        "(and (not (cur_state ?a1 ?s1)) "\
                        "(cur_state ?a1 ?s2))))))) "\
                        "(:action del "\
                        ":parameters (?t1 - trace_state ?e - activity "\
                        "?t2 - trace_state) "\
                        ":precondition (and (cur_state ?a1 ?t1) (trace ?t1 ?e ?t2)) "\
                        ":effect (and (increase (total-cost) 1) "\
                        "(not (cur_state ?a1 ?t1)) (cur_state ?a1 ?t2))))"

pddl_domain = pddl_domain_initial + pddl_domain_predicates + pddl_domain_actions

print (pddl_domain)

(define (domain domain_multi) (:requirements :strips :typing :action-costs) (:types trace_state automaton_state - state activity automaton_name) (:predicates (trace ?t1 - trace_state ?e - activity ?t2 - trace_state) (automaton ?a1 - automaton_name ?s1 - automaton_state ?e - activity ?s2 - automaton_state) (cur_state ?a1 - automaton_name ?s - state) (final_state ?a1 - automaton_name ?s - state)) (:action sync :parameters (?t1 - trace_state ?e - activity ?t2 - trace_state) :precondition (and (cur_state ?a1 ?t1) (trace ?t1 ?e ?t2)) :effect(and (not (cur_state ?a1 ?t1)) (cur_state ?a1 ?t2) (forall (?a1 - automaton_name) (forall (?s1 ?s2 - automaton_state) (when (and (cur_state ?a1 ?s1) (automaton ?a1 ?s1 ?e ?s2)) (and (not (cur_state ?a1 ?s1)) (cur_state ?a1 ?s2))))))) (:action add :parameters (?e - activity) :effect (and (increase (total-cost) 1) (forall (?a1 - automaton_name) (forall (?s1 ?s2 - automaton_state) (when (and (cur_state ?a1 ?s1) (automaton ?a1 ?s1 ?e ?s2)) (and (not (cur_sta

## PDDL problem

In [14]:
pddl_problem_initial = "(define (problem "+problem_name+") (:domain "+domain_name+") "
pddl_problem_objects = "(:objects "

for q in trace["Q_trace"]:
    pddl_problem_objects += q+" "
pddl_problem_objects += "- trace_state "


for q in all_states_constr_max:
    pddl_problem_objects += q+" "
pddl_problem_objects += "- automaton_state "


for ID, a in all_automata.items():
    pddl_problem_objects += ID+" "
pddl_problem_objects += "- automaton_name "


for s in symbols_tot:
    pddl_problem_objects += s+" "
pddl_problem_objects += "- activity"

pddl_problem_objects += ") "


pddl_problem_init = "(:init (= (total-cost) 0) (cur_state "+trace["init_state_trace"]+") "
for t in trace["rho_trace_basic"]:
    pddl_problem_init += "(trace "+t+") "
pddl_problem_init += "(final_state "+trace["final_state_trace"]+") "

for ID, a in all_automata.items():
    pddl_problem_init += "(cur_state "+ID+" "+a["init_state"]+") " 
    for t in a["transitions"]:
        pddl_problem_init += "(automaton "+ID+" "+t+") "

    for i in range(len(a["final_states"])):
        pddl_problem_init += "(final_state "+ID+" "+a["final_states"][i]+") "
pddl_problem_init += ") "


pddl_problem_goal = "(:goal (forall (?a - automaton_name) (forall (?s - state) "\
                    "(imply (cur_state ?a ?s) (final_state ?a ?s))))) "
pddl_problem_metric = "(:metric minimize (total-cost)))"
pddl_problem = pddl_problem_initial + pddl_problem_objects + pddl_problem_init + pddl_problem_goal + pddl_problem_metric
print (pddl_problem)

(define (problem problem_multi) (:domain domain_multi) (:objects t0 t1 t2 t3 t4 t5 t6 - trace_state s1 s2 s3 - automaton_state a1 a2 - automaton_name B T I S A - activity) (:init (= (total-cost) 0) (cur_state t0) (trace t0 A t1) (trace t1 S t2) (trace t2 T t3) (trace t3 A t4) (trace t4 B t5) (trace t5 I t6) (final_state t6) (cur_state a1 s1) (automaton a1 s1 A s2) (final_state a1 s2) (cur_state a2 s1) (automaton a2 s1 B s2) (automaton a2 s2 B s3) (final_state a2 s1) (final_state a2 s2) ) (:goal (forall (?a - automaton_name) (forall (?s - state) (imply (cur_state ?a ?s) (final_state ?a ?s))))) (:metric minimize (total-cost)))


## Save .pddl files

In [15]:
file1 = open("PDDL/"+domain_name+".pddl", "w")
file1.write(pddl_domain)
file1.close()

file2 = open("PDDL/"+problem_name+".pddl", "w")
file2.write(pddl_problem)
file2.close()

Fast-downward planner:
`./fast-downward.py domain_multi.pddl problem_multi.pddl --search "astar(blind())"`