In [27]:
# -*- coding: utf-8 -*-
"""
Script integrado: construção de log/modelo, execução do Token-Based Replay (TBR)
e impressão de resultados de forma formatada (resumo, detalhado e Markdown).
"""

from typing import Any, Dict, Iterable, List, Tuple

from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.utils import petri_utils
from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay

# ---------------------------------------------------------------------------
# 1) UTILITÁRIOS DE FORMATAÇÃO
# ---------------------------------------------------------------------------


def _fmt_scalar(x: Any) -> str:
    if x is None:
        return "—"
    if isinstance(x, float):
        return f"{x:.6g}"
    return str(x)


def _fmt_iter(xs: Iterable[Any]) -> str:
    try:
        return ", ".join(_fmt_scalar(x) for x in xs)
    except TypeError:
        return _fmt_scalar(xs)


def _take(d: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
    return {k: d.get(k) for k in keys}


def pretty_print_tbr(replay_result: List[Dict[str, Any]], title: str = "") -> None:
    """Print a summary table followed by a detailed block for each trace.

    Args:
        replay_result: one dict per trace; the keys read here are the six
            metric keys and the four detail keys listed below.
        title: optional heading printed before the report.

    Prints "Resultado vazio." and returns when ``replay_result`` is empty.
    """
    if title:
        print(f"\n=== {title} ===")
    if not replay_result:
        print("Resultado vazio.")
        return

    # Single source of truth for the metrics shown in both sections.
    metric_keys = [
        "trace_is_fit",
        "trace_fitness",
        "missing_tokens",
        "remaining_tokens",
        "consumed_tokens",
        "produced_tokens",
    ]
    # Collection-valued entries, printed only when present in a result.
    detail_keys = [
        "reached_marking",
        "enabled_transitions_in_marking",
        "activated_transitions",
        "transitions_with_problems",
    ]

    # --- Summary table ---
    header = ["trace", *metric_keys]
    rows = [
        # The trace index stays an int so the format spec right-aligns it.
        [i, *(_fmt_scalar(r.get(k)) for k in metric_keys)]
        for i, r in enumerate(replay_result, start=1)
    ]
    # Size each column to its widest cell so long values cannot break alignment.
    widths = [
        max(len(h), 5, *(len(str(row[col])) for row in rows))
        for col, h in enumerate(header)
    ]
    fmt_row = " | ".join(f"{{:{w}}}" for w in widths)
    sep = "-+-".join("-" * w for w in widths)

    print("\n--- RESUMO ---")
    print(fmt_row.format(*header))
    print(sep)
    for row in rows:
        print(fmt_row.format(*row))

    # --- Detailed section ---
    # Width comes from the longest label; a fixed 22 was shorter than two of
    # the detail keys and left their colons out of line with the others.
    label_width = max(len(k) for k in metric_keys + detail_keys)

    print("\n--- DETALHADO ---")
    for i, r in enumerate(replay_result, start=1):
        print(f"\nTrace {i}")
        for k in metric_keys:
            print(f"  {k:>{label_width}}: {_fmt_scalar(r.get(k))}")
        for k in detail_keys:
            if k in r:
                print(f"  {k:>{label_width}}: {_fmt_iter(r[k])}")


def tbr_to_markdown_table(replay_result: List[Dict[str, Any]]) -> str:
    """Build a Markdown table with the main replay metrics, one row per trace.

    The first column is the 1-based trace index; the remaining columns are
    read from each result dict and formatted with ``_fmt_scalar``.
    """
    metric_keys = (
        "trace_is_fit",
        "trace_fitness",
        "missing_tokens",
        "remaining_tokens",
        "consumed_tokens",
        "produced_tokens",
    )
    table = [
        "| trace | trace_is_fit | trace_fitness | missing | remaining | consumed | produced |",
        "|---:|:---:|---:|---:|---:|---:|---:|",
    ]
    for index, result in enumerate(replay_result, start=1):
        cells = [str(index)]
        cells.extend(_fmt_scalar(result.get(key)) for key in metric_keys)
        table.append("| " + " | ".join(cells) + " |")
    return "\n".join(table)


def save_markdown_table(replay_result: List[Dict[str, Any]], path: str) -> None:
    """Write the Markdown metrics table to ``path`` (UTF-8) and print a confirmation."""
    # Render first so a formatting failure never truncates an existing file.
    table_text = tbr_to_markdown_table(replay_result)
    with open(path, mode="w", encoding="utf-8") as out_file:
        out_file.write(table_text)
    print(f"Tabela Markdown salva em: {path}")


# ---------------------------------------------------------------------------
# 2) CONSTRUÇÃO DO LOG SINTÉTICO
# ---------------------------------------------------------------------------


def build_tiny_log() -> EventLog:
    """
    Build an event log with two traces:
      - Trace 1: A -> B -> C (conforms to the linear model)
      - Trace 2: A -> C      (deviation: skips B)
    Each event carries only the "concept:name" attribute.
    """
    variants = (
        ("A", "B", "C"),
        ("A", "C"),
    )

    log = EventLog()
    for variant in variants:
        trace = Trace()
        for activity in variant:
            trace.append(Event({"concept:name": activity}))
        log.append(trace)
    return log


# ---------------------------------------------------------------------------
# 3) CONSTRUÇÃO DO MODELO (REDE DE PETRI)
# ---------------------------------------------------------------------------


def build_linear_abc(
    use_tau_and_pend: bool = False,
) -> Tuple[PetriNet, Marking, Marking]:
    """
    Build the linear Petri net:
        p_start -> A -> p_a -> B -> p_b -> C -> p_c [-> tau_end -> p_end (optional)]

    With ``use_tau_and_pend`` False the final marking is {p_c: 1}.
    With ``use_tau_and_pend`` True an extra transition "tau_end" (label None)
    and a place "p_end" are appended after p_c, and the final marking is
    {p_end: 1}.

    Returns the tuple (net, initial_marking, final_marking); the initial
    marking is always {p_start: 1}.
    """
    net = PetriNet("Exemplo")

    # Places and labelled transitions of the main chain, keyed by name.
    places = {
        name: PetriNet.Place(name) for name in ("p_start", "p_a", "p_b", "p_c")
    }
    visible = {
        label: PetriNet.Transition(label, label) for label in ("A", "B", "C")
    }
    net.places.update(places.values())
    net.transitions.update(visible.values())

    # Connect consecutive nodes: place -> transition -> place -> ...
    chain = [
        places["p_start"],
        visible["A"],
        places["p_a"],
        visible["B"],
        places["p_b"],
        visible["C"],
        places["p_c"],
    ]
    for source, target in zip(chain, chain[1:]):
        petri_utils.add_arc_from_to(source, target, net)

    sink = places["p_c"]
    if use_tau_and_pend:
        # Extend the chain with the unlabelled closing transition and p_end.
        closing = PetriNet.Transition("tau_end", None)
        net.transitions.add(closing)
        p_end = PetriNet.Place("p_end")
        net.places.add(p_end)
        petri_utils.add_arc_from_to(sink, closing, net)
        petri_utils.add_arc_from_to(closing, p_end, net)
        sink = p_end

    return net, Marking({places["p_start"]: 1}), Marking({sink: 1})


# ---------------------------------------------------------------------------
# 4) EXECUÇÃO DO TBR E IMPRESSÃO
# ---------------------------------------------------------------------------


def run_and_report(
    log: EventLog, net: PetriNet, im: Marking, fm: Marking, title: str
) -> List[Dict[str, Any]]:
    """Run token-based replay, print the formatted report and the Markdown table, and return the replay result."""
    results = token_based_replay.apply(log, net, im, fm)
    pretty_print_tbr(results, title=title)

    print("\n--- TABELA (Markdown) ---")
    markdown = tbr_to_markdown_table(results)
    print(markdown)
    return results


# ---------------------------------------------------------------------------
# 5) MAIN
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Demo driver: replay the same two-trace log against both model variants.
    log = build_tiny_log()

    # Variant 1: no τ; final marking on p_c (trace A-B-C is expected to have fitness = 1)
    net1, im1, fm1 = build_linear_abc(use_tau_and_pend=False)
    run_and_report(
        log, net1, im1, fm1, title="VARIANTE 1 — Sem τ, final_marking = {p_c:1}"
    )

    # Variant 2: with τ and p_end; final marking on p_end (the result depends on
    # whether the replay fires τ when closing the trace)
    net2, im2, fm2 = build_linear_abc(use_tau_and_pend=True)
    run_and_report(
        log,
        net2,
        im2,
        fm2,
        title="VARIANTE 2 — Com τ explícita, final_marking = {p_end:1}",
    )

replaying log with TBR, completed traces :: 100%|██████████| 2/2 [00:00<00:00, 9565.12it/s]



=== VARIANTE 1 — Sem τ, final_marking = {p_c:1} ===

--- RESUMO ---
trace | trace_is_fit | trace_fitness | missing_tokens | remaining_tokens | consumed_tokens | produced_tokens
------+--------------+---------------+----------------+------------------+-----------------+----------------
    1 | True         | 1             | 0              | 0                | 4               | 4              
    2 | False        | 0.666667      | 1              | 1                | 3               | 3              

--- DETALHADO ---

Trace 1
            trace_is_fit: True
           trace_fitness: 1
          missing_tokens: 0
        remaining_tokens: 0
         consumed_tokens: 4
         produced_tokens: 4
         reached_marking: p_c
  enabled_transitions_in_marking: 
   activated_transitions: (A, 'A'), (B, 'B'), (C, 'C')
  transitions_with_problems: 

Trace 2
            trace_is_fit: False
           trace_fitness: 0.666667
          missing_tokens: 1
        remaining_tokens: 1
         consu

replaying log with TBR, completed traces :: 100%|██████████| 2/2 [00:00<00:00, 8516.35it/s]


=== VARIANTE 2 — Com τ explícita, final_marking = {p_end:1} ===

--- RESUMO ---
trace | trace_is_fit | trace_fitness | missing_tokens | remaining_tokens | consumed_tokens | produced_tokens
------+--------------+---------------+----------------+------------------+-----------------+----------------
    1 | True         | 1             | 0              | 0                | 5               | 5              
    2 | False        | 0.75          | 1              | 1                | 4               | 4              

--- DETALHADO ---

Trace 1
            trace_is_fit: True
           trace_fitness: 1
          missing_tokens: 0
        remaining_tokens: 0
         consumed_tokens: 5
         produced_tokens: 5
         reached_marking: p_end
  enabled_transitions_in_marking: 
   activated_transitions: (A, 'A'), (B, 'B'), (C, 'C'), (tau_end, None)
  transitions_with_problems: 

Trace 2
            trace_is_fit: False
           trace_fitness: 0.75
          missing_tokens: 1
        remaini




In [2]:
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Dict, Iterable, List, Tuple, Union, Mapping, Sequence
from datetime import datetime, timedelta, timezone

from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.log.exporter.xes import exporter as xes_exporter

ActivitySeq = Union[str, Sequence[str]]
FreqTable = Union[
    Mapping[Tuple[str, ...], int],  # {('a','c','d'): 10, ...}
    Iterable[Tuple[int, ActivitySeq]],  # [(10, "a,c,d"), (5, ('a','b')),...]
]


def _parse_trace(trace: ActivitySeq) -> Tuple[str, ...]:
    """Aceita 'a,c,d' | '(a, c, d, e, h)' | ['a','c','d'] e devolve ('a','c','d')."""
    if isinstance(trace, (list, tuple)):
        return tuple(str(x).strip() for x in trace)
    s = str(trace).strip()
    # remove parênteses e espaços, separa por vírgula
    if s.startswith("(") and s.endswith(")"):
        s = s[1:-1]
    parts = [p.strip() for p in s.split(",") if p.strip()]
    return tuple(parts)


def _normalize_freq_table(table: FreqTable) -> List[Tuple[Tuple[str, ...], int]]:
    """Converte qualquer formato aceito para lista padronizada [(('a','b'), 3), ...]."""
    if isinstance(table, Mapping):
        return [(tuple(k), int(v)) for k, v in table.items()]
    out: List[Tuple[Tuple[str, ...], int]] = []
    for freq, tr in table:
        out.append((_parse_trace(tr), int(freq)))
    return out


def build_xes_from_frequencies(
    freq_table: FreqTable,
    out_path: str,
    *,
    # as in the original table: a,b,c,d,e,f,g,h
    activity_labels: Mapping[str, str] | None = None,
    # optional generation of synthetic timestamps
    add_timestamps: bool = True,
    base_time: datetime | None = None,
    delta_between_cases: timedelta = timedelta(minutes=3),
    delta_between_events: timedelta = timedelta(seconds=15),
    # case naming policy
    case_prefix: str = "σ",
    keep_activity_letters_in_concept_name: bool = True,
) -> EventLog:
    """
    Build an event log from a trace-frequency table and export it as XES.

    - freq_table: mapping {trace: freq} or iterable of (freq, trace) pairs.
    - out_path: path handed to the XES exporter.
    - activity_labels: optional map such as 'a' -> 'register request'. When
      given (and non-empty), every event also gets an 'activity:label'
      attribute.
    - add_timestamps: if True, each event gets a synthetic 'time:timestamp'
      and 'lifecycle:transition' = 'complete'.
    - base_time: start of the first case; defaults to the current UTC time
      truncated to whole seconds.
    - delta_between_cases / delta_between_events: spacing between case starts
      and between consecutive events of one case.
    - case_prefix: prefix of the trace-level 'concept:name'; a running case
      number is appended.
    - keep_activity_letters_in_concept_name: if True the event 'concept:name'
      is the raw activity symbol; otherwise it is the mapped label when one
      is available.

    Each trace is repeated ``freq`` times. Returns the EventLog created.
    """
    rows = _normalize_freq_table(freq_table)
    log = EventLog()

    # Time base: only resolved when timestamps are requested.
    if add_timestamps and base_time is None:
        base_time = datetime.now(timezone.utc).replace(microsecond=0)
    case_start = base_time if add_timestamps else None

    case_number = 0
    for activities, freq in rows:
        for _ in range(freq):
            case_number += 1
            trace = Trace()
            # Case id lives at trace level (XES: concept:name on the trace).
            trace.attributes["concept:name"] = f"{case_prefix}{case_number}"

            for position, activity in enumerate(activities):
                if activity_labels:
                    label = activity_labels.get(activity, activity)
                else:
                    label = activity

                # By default the event name stays the raw symbol.
                if keep_activity_letters_in_concept_name:
                    event_name = activity
                else:
                    event_name = label
                event = Event({"concept:name": event_name})

                # Additional semantic label (optional).
                if activity_labels:
                    event["activity:label"] = label

                if add_timestamps:
                    event["time:timestamp"] = (
                        case_start + position * delta_between_events
                    )
                    event["lifecycle:transition"] = "complete"

                trace.append(event)

            log.append(trace)

            if add_timestamps:
                # The next case instance starts one case-delta later.
                case_start += delta_between_cases

    # Export to disk.
    xes_exporter.apply(log, out_path)
    return log

In [3]:
# Human-readable label for each single-letter activity symbol.
ACT_LABELS = {
    "a": "register request",
    "b": "examine thoroughly",
    "c": "examine casually",
    "d": "check ticket",
    "e": "decide",
    "f": "reinitiate request",
    "g": "pay compensation",
    "h": "reject request",
}

# (frequency, trace) pairs; traces may be written with or without parentheses.
freqs = [
    (455, "(a, c, d, e, h)"),
    (191, "a, b, d, e, g"),
    (177, "a, d, e, g"),
    (144, "a, b, d, e, h"),
    # ... add the remaining rows here if desired
]

log = build_xes_from_frequencies(
    freqs,
    out_path="Lfull.xes",
    activity_labels=ACT_LABELS,  # optional (only used as a legend)
    add_timestamps=True,  # generate synthetic timestamps
    keep_activity_letters_in_concept_name=True,  # keep 'a','b',... in concept:name
)
print(f"XES salvo em: Lfull.xes  - casos: {len(log)}")

  from .autonotebook import tqdm as notebook_tqdm
exporting log, completed traces :: 100%|██████████| 967/967 [00:00<00:00, 22175.83it/s]

XES salvo em: Lfull.xes  - casos: 967



