# CAiSE 2025 experiments: testing generators

In [None]:
from exptools import *

from pprint import pprint

Parameters for the experiments

In [None]:
from datetime import datetime, timezone
from pathlib import Path
import subprocess


# Experiment grid: each list holds the values explored for one dimension.
# The experiment definitions further down combine them with itertools.product.
MAX_TRACES = [6]     # passed to the generators as 'total_traces'
MAX_EVENTS = [20]    # used for both the minimum and maximum events per trace
NOISE = [0]          # used for both positive and negative noise percentage
THRESHOLDS = [.2]    # 'threshold' argument of the Hamming/Levenshtein generators

# Fixed seed handed to the new generators and to every runner.
SEED = b'\x81\x97u+'

MODEL_NAME = 'caise2025_experimental_model'
MODEL_PATH = Path(f"{MODEL_NAME}.decl")

# Every execution writes into its own directory, named after the current UTC
# time with minute resolution.
# NOTE(review): the ISO timestamp contains ':' characters, which are not valid
# in Windows file names -- confirm the notebook is only run on POSIX systems.
EXPORT_PREFIX = 'tests_caise2025_'
EXPORT_PATH = Path('output', EXPORT_PREFIX + datetime.now(timezone.utc).isoformat(timespec='minutes'))
# exist_ok=False: fail with FileExistsError rather than write into an existing
# results directory.
EXPORT_PATH.mkdir(parents=True, exist_ok=False)

print(f'Results written on directory <{EXPORT_PATH.as_posix()}>')

# Record which revision of the code produced the results. This is best effort:
# a missing git executable or a failing `git describe` must not abort the run.
try:
    # text=True decodes the output to str (check_output returns bytes by
    # default, which cannot be concatenated with a str); stderr is captured so
    # that it can be reported if git fails.
    git_description = subprocess.check_output(
        ['git', 'describe', '--dirty'],
        stderr=subprocess.PIPE,
        text=True,
    )
except subprocess.CalledProcessError as e:
    # git ran but returned a non-zero exit status.
    print(f'Git info not available: {e.stderr.strip()}')
except OSError as e:
    # git could not be started at all (e.g. it is not installed).
    print(f'Git info not available: {e}')
else:
    print('Git describe: ' + git_description.strip())

In [None]:
from Declare4Py.ProcessMiningTasks.LogGenerator.PositionalBased.PositionalBasedLogGenerator import PositionalBasedLogGenerator
from Declare4Py.ProcessMiningTasks.LogGenerator.PositionalBased.PositionalBasedLogGeneratorNG import PBLogGeneratorBaseline, PBLogGeneratorRandom, PBLogGeneratorHamming, PBLogGeneratorLevenshtein, PBLGwrapper, PBLogGeneratorOrig
from Declare4Py.ProcessMiningTasks.LogGenerator.PositionalBased.PositionalBasedModel import PositionalBasedModel


Define the set of experiments to run

In [None]:
# Accumulates one dictionary per experiment; the loops below append to it.
EXPERIMENTS: list[dict] = []

# original code

# One pair of experiments (original generator and its wrapper) for every
# combination of model, number of traces, number of events and noise level.
for model_name, model, traces, events, noise in itertools.product(
        [MODEL_NAME], [MODEL_PATH], MAX_TRACES, MAX_EVENTS, NOISE):
    params = dict(model=model_name, traces=traces, events=events, noise=noise)
    exp_args = dict(
        init=dict(
            total_traces=traces,
            min_events=events,
            max_events=events,
            pb_model=model,
            verbose=False,
        ),
        run=dict(
            equal_rule_split=True,
            high_variability=False,
            generate_negatives_traces=False,
            positive_noise_percentage=noise,
            negative_noise_percentage=noise,
            append_results=False,
        ),
    )
    exp_id = '_{}_{:04}_{:03}_{:02}'.format(model_name, traces, events, noise)

    # Both entries deliberately reference the very same `exp_args` and
    # `params` objects.
    # NOTE(review): the wrapper entry carries the same 'Original code'
    # description as the original generator -- confirm this is intended.
    EXPERIMENTS.extend(
        {
            'id_': prefix + exp_id,
            'class_': generator_class,
            'args': exp_args,
            'model': model,
            'parameters': params,
            'description': 'Original code',
        }
        for prefix, generator_class in (
            ('orig', PositionalBasedLogGenerator),
            ('wrapper', PBLGwrapper),
        )
    )

# new code

# Three experiments (rewritten original, baseline, random) for every
# combination of model, number of traces, number of events and noise level.
for model_name, model, traces, events, noise in itertools.product(
        [MODEL_NAME], [MODEL_PATH], MAX_TRACES, MAX_EVENTS, NOISE):
    params = dict(model=model_name, traces=traces, events=events, noise=noise)
    exp_args = dict(
        init=dict(
            total_traces=traces,
            min_event=events,
            max_event=events,
            process_model=model,
            log=None,
            verbose=True,
            seed=SEED,
        ),
        run=dict(
            equal_rule_split=True,
            high_variability=False,
            generate_negatives_traces=False,
            positive_noise_percentage=noise,
            negative_noise_percentage=noise,
            append_results=False,
        ),
    )
    exp_id = '_{}_{:04}_{:03}_{:02}'.format(model_name, traces, events, noise)

    # All three entries share the same `exp_args` and `params` objects and
    # differ only in id prefix, generator class and description.
    EXPERIMENTS.extend(
        {
            'id_': prefix + exp_id,
            'class_': generator_class,
            'args': exp_args,
            'model': model,
            'parameters': params,
            'description': description,
        }
        for prefix, generator_class, description in (
            ('old', PBLogGeneratorOrig,
             'Rewritten original code'),
            ('baseline', PBLogGeneratorBaseline,
             'No attempt to introduce variability in the generated logs'),
            ('random', PBLogGeneratorRandom,
             'Uses clingo randomisation to generate different models'),
        )
    )

# Distance-based generators: one Hamming and one Levenshtein experiment for
# every combination of model, traces, events, noise, distance threshold and
# the `randomise` flag (both False and True are explored).
for model_name, model, traces, events, noise, threshold, randomise in itertools.product([MODEL_NAME], [MODEL_PATH], MAX_TRACES, MAX_EVENTS, NOISE, THRESHOLDS, [False, True]):
    params = {
        'model': model_name,
        'traces': traces,
        'events': events,
        'noise': noise,
        'threshold': threshold,
        'randomise': randomise
    }
    exp_args = {
        'init': {
            'total_traces': traces,
            'min_event': events,
            'max_event': events,
            'process_model': model,
            'log': None,
            'verbose': True,
            'seed': SEED,
            'threshold': threshold,
            'randomise': randomise},
        'run': {
            'equal_rule_split': True,
            'high_variability': False,
            'generate_negatives_traces': False,
            'positive_noise_percentage': noise,
            'negative_noise_percentage': noise,
            'append_results': False}}
    # The threshold is encoded in the id as a percentage. round() is used
    # instead of int() because the float product can fall just below the
    # intended integer (0.29 * 100 == 28.999999999999996): truncating would
    # mislabel the experiment and could make two different thresholds share
    # the same id, and therefore the same output files.
    exp_id = f'_{model_name}_{traces:04}_{events:03}_{noise:02}_{round(threshold * 100):02}_{randomise}'

    # Both entries share the same `exp_args` and `params` objects.
    EXPERIMENTS.append(dict(
        id_='hamming' + exp_id,
        class_=PBLogGeneratorHamming,
        args=exp_args,
        model=model,
        parameters=params,
        description='Hamming distance threshold implemented in ASP'
    ))
    EXPERIMENTS.append(dict(
        id_='levenshtein' + exp_id,
        class_=PBLogGeneratorLevenshtein,
        args=exp_args,
        model=model,
        parameters=params,
        description='Levenshtein distance threshold implemented in ASP'
    ))


# Store the experiment definitions alongside the results. Values the json
# module cannot encode natively are written as their repr().
with (EXPORT_PATH / 'experiments.json').open('w') as fp:
    json.dump(EXPERIMENTS, fp, indent=2, default=repr)

pprint(EXPERIMENTS)

In [None]:
# Statistics of every experiment, in execution order.
RESULTS: list[dict] = []

# The generator expression creates each runner only when the loop reaches it.
for runner in (Experiment.new(**exp_d).runner() for exp_d in EXPERIMENTS):
    # Banner padded with dashes up to 72 columns (longer banners are printed
    # unpadded).
    header = f'----- {runner.id} [{runner.experiment.class_.__name__}] '
    print(header.ljust(72, '-'))

    # Run the experiment inside the log_to_file context, targeting a
    # per-experiment log file.
    with log_to_file(EXPORT_PATH / f'{runner.id}.log.json', level=logging.DEBUG):
        runner.run(seed=SEED)

    # Export the generator's results as CSV.
    runner.generator.get_results_as_dataframe().to_csv(EXPORT_PATH / f'{runner.id}.csv')

    # Collect the statistics and write them to a per-experiment file right
    # away, before the next experiment starts.
    run_stats = runner.stats(normalise=True, columns=['resource'])
    RESULTS.append(run_stats)
    with (EXPORT_PATH / f'{runner.id}_results.json').open('w') as fp:
        json.dump(run_stats, fp)

# Aggregated statistics of all experiments, as JSON and as a flattened CSV.
with (EXPORT_PATH / 'results.json').open('w') as fp:
    json.dump(RESULTS, fp)

df = pd.json_normalize(RESULTS)
df.to_csv(EXPORT_PATH / 'results.csv')
# Bare expression: shown as the cell output when run in a notebook.
df