In [4]:
%load_ext autoreload
import datetime
# import ete3
import itertools
import json
import Levenshtein
import logging
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
# import re
import xml.etree.ElementTree as ET
# import xml.dom.minidom
# import xmlschema

In [2]:
%autoreload
from maxes.xes_loader2 import XesLoader, XesLog
# from maxes.xes_file import XesFile
# from maxes.analyze_xes import AnalyzeXes
# import maxes.analyze_sequence
import maxes.serialization.serialize
# import maxes.graphs
import maxes.utils
from maxes.generators.xes_generator.xes_generator1 import XesGenerator1

In [None]:
maxes.utils.init_notebook() # RUN ONLY ONCE

In [11]:
def generate_xes_for_file(source_file_path, destination_file_path):
    loader = XesLoader()

    logging.info("Loading")
    log = loader.load(source_file_path)

    if len(loader.errors):
        raise RuntimeError("Errors while loading XES")

    logging.info("Fitting")
    generator = XesGenerator1(debug=True).fit(log)

    logging.info("Generating")
    generated_log = generator.generate()

    logging.info("Serializing")
    generated_log_ET = maxes.serialization.serialize.Serializer().serialize(generated_log, xml_log_skeleton=log.loader.xml_log_skeleton)

    logging.info("Formatting XML")
    ET.indent(generated_log_ET)

    logging.info("Writing XML")
    ET.register_namespace("", "http://www.xes-standard.org")
    with open(destination_file_path, "w") as file:
        generated_log_ET.write(file, encoding="unicode")

In [None]:
# generate_xes_for_file(
#     source_file_path="/vt/md/maxes/maxes/data/Activities of daily living of several individuals_1_all/data/activitylog_uci_detailed_labour.xes/activitylog_uci_detailed_labour.xes",
#     destination_file_path="/vt/md/maxes/maxes/output/generated_test.xes")

# generate_xes_for_file(
#     source_file_path="/vt/md/maxes/maxes/data/BPI Challenge 2013, open problems_1_all/BPI_Challenge_2013_open_problems.xes/BPI_Challenge_2013_open_problems.xes",
#     destination_file_path="/vt/md/maxes/maxes/output/generated_bpi_2013_open_problems.xes")

# TODO: Extend trace range
# generate_xes_for_file(
#     source_file_path="/vt/md/maxes/maxes/data/NASA Crew Exploration Vehicle (CEV) Software Event Log_1_all/data/nasa-cev-complete-single-trace.xes/nasa-cev-complete-single-trace.xes",
#     destination_file_path="/vt/md/maxes/maxes/output/generated_bpi_2013_open_problems.xes")

generate_xes_for_file(
    source_file_path="/vt/md/maxes/maxes/data/Conformance Checking Challenge 2019 (CCC19)_1_all/data/CCC19 - Log XES.xes",
    destination_file_path="/vt/md/maxes/maxes/output/gemerated_ccc19.xes")


In [None]:
# file_path = "/vt/md/maxes/maxes/data/BPI Challenge 2013, open problems_1_all/BPI_Challenge_2013_open_problems.xes/BPI_Challenge_2013_open_problems.xes"
# file_path = "/vt/md/maxes/maxes/data/NASA Crew Exploration Vehicle (CEV) Software Event Log_1_all/data/nasa-cev-complete-single-trace.xes/nasa-cev-complete-single-trace.xes"
file_path="/vt/md/maxes/maxes/data/Conformance Checking Challenge 2019 (CCC19)_1_all/data/CCC19 - Log XES.xes"

log = XesLoader().load(file_path)

generator = XesGenerator1(debug=True).fit(log)

logging.info("Generating")
generated_log = generator.generate()

In [13]:
len(log.df)

1394

In [14]:
len(log.traces)

20

In [15]:
a = list(log.traces[0].df["concept:name"])
b = list(generated_log.traces[0].df["concept:name"])

Levenshtein.ratio(a, b)

0.44318181818181823

In [16]:
for i in range(len(log.traces)):
    a = list(log.traces[i].df["concept:name"])
    b = list(generated_log.traces[i].df["concept:name"])

    distance = Levenshtein.distance(a, b)
    ratio = Levenshtein.ratio(a, b)

    print(f"{i}: distance = {distance}, ratio = {ratio}")

0: distance = 51, ratio = 0.484375
1: distance = 48, ratio = 0.5
2: distance = 65, ratio = 0.5032258064516129
3: distance = 54, ratio = 0.49230769230769234
4: distance = 34, ratio = 0.631578947368421
5: distance = 56, ratio = 0.4444444444444444
6: distance = 47, ratio = 0.6119402985074627
7: distance = 40, ratio = 0.5892857142857143
8: distance = 68, ratio = 0.592964824120603
9: distance = 58, ratio = 0.48571428571428577
10: distance = 38, ratio = 0.5423728813559322
11: distance = 44, ratio = 0.5137614678899083
12: distance = 74, ratio = 0.49689440993788825
13: distance = 62, ratio = 0.4383561643835616
14: distance = 58, ratio = 0.5
15: distance = 54, ratio = 0.4347826086956522
16: distance = 60, ratio = 0.5540540540540541
17: distance = 43, ratio = 0.4864864864864865
18: distance = 49, ratio = 0.38383838383838387
19: distance = 54, ratio = 0.5620915032679739


In [35]:
distances = []

def df_to_sequence(df: pd.DataFrame, keys: list[str]):
    # Returns list of tuples: [(), (), (), ...]
    return list(df[keys].itertuples(index=False, name=None))

def mean_levenstein_distance(original_log, generated_log, keys=['concept:name', 'lifecycle:transition']):
    for original_trace, generated_trace in zip(original_log.traces, generated_log.traces):
        original_sequence = df_to_sequence(original_trace.df, keys=keys)
        generated_sequence = df_to_sequence(generated_trace.df, keys=keys)

        distance = Levenshtein.ratio(original_sequence, generated_sequence)

        distances.append(distance)

    return np.array(distances).mean()

print(mean_levenstein_distance(log, generated_log, keys=['concept:name', 'lifecycle:transition']))
print(mean_levenstein_distance(log, generated_log, keys=['concept:name']))

0.5124237486555038
0.5124237486555039


In [31]:
list(log.traces[0].df[['concept:name', 'lifecycle:transition']].itertuples(index=False, name=None))

[('Hand washing', 'start'),
 ('Hand washing', 'complete'),
 ('Ultrasound configuration', 'start'),
 ('Ultrasound configuration', 'complete'),
 ('Anatomic identification', 'start'),
 ('Anatomic identification', 'complete'),
 ('Compression identification', 'start'),
 ('Compression identification', 'complete'),
 ('Gel in probe', 'start'),
 ('Gel in probe', 'complete'),
 ('Get in sterile clothes', 'start'),
 ('Get in sterile clothes', 'complete'),
 ('Hand washing', 'start'),
 ('Hand washing', 'complete'),
 ('Get in sterile clothes', 'start'),
 ('Get in sterile clothes', 'complete'),
 ('Clean puncture area', 'start'),
 ('Clean puncture area', 'complete'),
 ('Drap puncture area', 'start'),
 ('Drap puncture area', 'complete'),
 ('Cover probe', 'start'),
 ('Cover probe', 'complete'),
 ('Put sterile gel', 'start'),
 ('Put sterile gel', 'complete'),
 ('Anatomic identification', 'start'),
 ('Anatomic identification', 'complete'),
 ('Ultrasound configuration', 'start'),
 ('Ultrasound configuration

In [None]:
# Approaches:

# First best distance match
# TODO: Mean best distance match
# Mean best distance match with withdrawing
# TODO: Absolute recursive permutations withdrawing

In [4]:
# Helpers

class TraceSequence:
    index: int
    trace: any
    sequence: list[list[str]]

    def __init__(self, index, trace, sequence):
        self.index = index
        self.trace = trace
        self.sequence = sequence

def df_to_sequence(df: pd.DataFrame, keys: list[str] = ['concept:name', 'lifecycle:transition']):
    # Returns list of tuples: [(), (), (), ...]
    return list(df[keys].itertuples(index=False, name=None))

def _extract_sequences(log, keys: list[str] = ['concept:name', 'lifecycle:transition']) -> list[TraceSequence]:
    sequences = []

    for index, trace in enumerate(log.traces):
        sequence = df_to_sequence(trace.df, keys)
        trace_sequence = TraceSequence(index, trace, sequence)
        sequences.append(trace_sequence)

    return sequences


In [5]:
# First best distance match

def first_best_distance_match(original_log, generated_log):
    original_sequences = _extract_sequences(original_log)
    generated_sequences = _extract_sequences(generated_log)

    best_score = 0
    best_pair = ()

    for original_sequence, generated_sequence in itertools.product(original_sequences, generated_sequences):
        score = Levenshtein.ratio(original_sequence.sequence, generated_sequence.sequence)

        if score > best_score:
            best_score = score
            best_pair = (original_sequence, generated_sequence)

    return best_score, best_pair

score, pair = first_best_distance_match(log, generated_log)

print(score)
print(pair[0].index, pair[1].index)

0.7457627118644068
1 12


In [7]:
print("Double-check:")
ratio = Levenshtein.ratio(
    df_to_sequence(log.traces[4].df),
    df_to_sequence(generated_log.traces[15].df),
)
print(ratio)
distance = Levenshtein.distance(
    df_to_sequence(log.traces[4].df),
    df_to_sequence(generated_log.traces[15].df),
)
print(distance)

Double-check:
0.5470085470085471
45


In [8]:
# Mean best distance match with withdrawing

def mean_best_distance_match(original_log, generated_log, keys: list[str] = ['concept:name', 'lifecycle:transition']):
    original_sequences = _extract_sequences(original_log, keys=keys)
    generated_sequences = _extract_sequences(generated_log, keys=keys)

    pairs_scores: list[list[list[TraceSequence], int]] = []

    for original_sequence, generated_sequence in itertools.product(original_sequences, generated_sequences):
        score = Levenshtein.ratio(original_sequence.sequence, generated_sequence.sequence)

        item = {
            "original_sequence": original_sequence,
            "generated_sequence": generated_sequence,
            "score": score
        }

        pairs_scores.append(item)

    best_pairs_scores = []

    while len(pairs_scores) > 0:
        # find best scored pair
        best_pair = max(pairs_scores, key=lambda item: item["score"])
        best_pairs_scores.append(best_pair)

        # Remove all pairs containing first or second sequence

        pairs_scores = [
            pair_and_score
            for pair_and_score in pairs_scores
            if pair_and_score["original_sequence"].index != best_pair["original_sequence"].index and\
                pair_and_score["generated_sequence"].index != best_pair["generated_sequence"].index
        ]

    return best_pairs_scores

best_pairs_distances = mean_best_distance_match(log, generated_log)

for item in best_pairs_distances:
    original_sequence = item["original_sequence"]
    generated_sequence = item["generated_sequence"]
    score = item["score"]

    print(f"{original_sequence.index} <-> {generated_sequence.index}: {score}")

scores = [item["score"] for item in best_pairs_distances]
mean = np.array(scores).mean()
print(f"mean: {mean}")

1 <-> 12: 0.7457627118644068
5 <-> 11: 0.6719999999999999
9 <-> 5: 0.6666666666666667
17 <-> 16: 0.653061224489796
6 <-> 19: 0.6524822695035462
18 <-> 18: 0.6495726495726496
10 <-> 0: 0.6491228070175439
4 <-> 8: 0.6349206349206349
3 <-> 4: 0.6333333333333333
19 <-> 14: 0.6164383561643836
16 <-> 13: 0.6153846153846154
0 <-> 17: 0.6015037593984962
8 <-> 10: 0.5921787709497206
15 <-> 6: 0.5563909774436091
14 <-> 7: 0.5521472392638036
11 <-> 15: 0.5504587155963303
7 <-> 3: 0.5324675324675325
2 <-> 1: 0.5263157894736843
13 <-> 9: 0.4752475247524752
12 <-> 2: 0.42553191489361697
mean: 0.6000493746578422


In [1]:
l1 = ['a1', 'a2', 'a3']
l2 = ['b1', 'b2', 'b3']

# list(itertools.product(l1, l2))

import itertools

[list(zip(x, l2)) for x in itertools.permutations(l1, len(l2))]

[[('a1', 'b1'), ('a2', 'b2'), ('a3', 'b3')],
 [('a1', 'b1'), ('a3', 'b2'), ('a2', 'b3')],
 [('a2', 'b1'), ('a1', 'b2'), ('a3', 'b3')],
 [('a2', 'b1'), ('a3', 'b2'), ('a1', 'b3')],
 [('a3', 'b1'), ('a1', 'b2'), ('a2', 'b3')],
 [('a3', 'b1'), ('a2', 'b2'), ('a1', 'b3')]]

In [5]:
import typing

T = typing.TypeVar('T')

def all_combinations_of_list_mappings(
        l1: typing.Sequence[T],
        l2: typing.Sequence[T]
        ) -> typing.Sequence[typing.Sequence[tuple[T, T]]]:
    return (zip(l1_item, l2) for l1_item in itertools.permutations(l1, len(l2)))
    # return [list(zip(l1_item, l2)) for l1_item in itertools.permutations(l1, len(l2))]

# print(len(list(all_combinations_of_list_mappings(log.traces, generated_log.traces))))

In [9]:
import math

def count_generator(gen):
    return len(list(gen))

l1 = [0, 1, 2, 3, 4, 5]
l2 = [0, 1, 2, 3, 4, 5]

print(count_generator(all_combinations_of_list_mappings(l1, l2)))
print(math.factorial(20))


720
2432902008176640000


In [11]:
def count_generator(gen):
    return len(list(gen))

l1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
l2 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# print(count_generator(all_combinations_of_list_mappings(l1, l2)))

%timeit count_generator(all_combinations_of_list_mappings(l1, l2))

2.28 s ± 39.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
def count_generator2(gen):
    return sum(1 for _ in gen)

%timeit count_generator2(all_combinations_of_list_mappings(l1, l2))

1.08 s ± 26.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [13]:
import collections

consumeall = collections.deque(maxlen=0).extend

def count_generator3(gen):
    cnt = itertools.count()
    consumeall(zip(gen, cnt))
    return next(cnt)

%timeit count_generator3(all_combinations_of_list_mappings(l1, l2))

1.17 s ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:

def mean_best_distance_match2(original_log, generated_log, keys: list[str] = ['concept:name', 'lifecycle:transition']):
    original_sequences = _extract_sequences(original_log, keys=keys)
    generated_sequences = _extract_sequences(generated_log, keys=keys)

    pairs_scores: dict[tuple[int, int], float] = {}

    for original_sequence, generated_sequence in itertools.product(original_sequences, generated_sequences):
        score = Levenshtein.ratio(original_sequence.sequence, generated_sequence.sequence)

        pair_key = (original_sequence.index, generated_sequence.index)
        pairs_scores[pair_key] = score

    best_mean_score = 0
    best_combination = None
    best_pairs_scores = []

    for pairs in all_combinations_of_list_mappings(original_sequences, generated_sequences):
        all_pairs_score = 0

        for pair in pairs:
            original_sequence, generated_sequence = pair
            pair_key = (original_sequence.index, generated_sequence.index)
            pair_score = pairs_scores[pair_key]
            all_pairs_scores += pair_score

        mean_score = float(all_pairs_score) / float(len(pairs))

        if mean_score > best_mean_score:
            best_mean_score = mean_score
            best_combination = pairs

    return best_mean_score, best_combination