In [9]:
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize
from mutation import MyMutation
from crossover import TraceCrossover
from encoder import Encoder
from sampling import MySampling
from callback import UpdatePopulationCallback
import numpy as np
from tools import Tools
from survival import MySurvival
from Declare4Py.ProcessModels.DeclareModel import DeclareModel
from Declare4Py.D4PyEventLog import D4PyEventLog
import warnings
import random
from Declare4Py.ProcessMiningTasks.ConformanceChecking.MPDeclareResultsBrowser import MPDeclareResultsBrowser
from Declare4Py.ProcessMiningTasks.ConformanceChecking.MPDeclareAnalyzer import MPDeclareAnalyzer
import pandas as pd
import logging
from terminator import MyTermination
from problem import Problem_single_ElementWise, MyProblem_Problem, MyProblem_Problem2, Problem_multi_ElementWise, Problem_single_ElementWise_noConstraints
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.termination.default import DefaultSingleObjectiveTermination
import testSetup
from pymoo.operators.mutation.pm import PolynomialMutation
from pymoo.operators.crossover.sbx import SBX
from mutation import IntegerPolynomialMutation
from testSetup import Setup




logging.getLogger('matplotlib').setLevel(logging.WARNING)
warnings.filterwarnings("ignore", ".*feasible.*")


# create random pop
n_traces = 100
trace_length = 50
variable_boundaries = [1] * trace_length

trace = [
    "Return ER", "LacticAcid", "Return ER", "IV Liquid", "ER Triage",
    "ER Sepsis Triage", "IV Antibiotics", "LacticAcid", "Admission NC",
    "Release B", "IV Liquid", "LacticAcid", "ER Registration",
    "ER Sepsis Triage", "Admission NC", "Admission IC", "LacticAcid",
    "ER Sepsis Triage", "LacticAcid", "Release A"
]

declare = DeclareModel().parse_from_file("../declare_models/model1.decl")

activities_name = declare.get_model_activities()

timestamps = Tools.generate_random_timestamps(trace_length)

data = {
    'case:concept:name': ['1'] * trace_length,
    'concept:name': ['1'] * trace_length,
    'timestamp': pd.to_datetime(timestamps),
}

dataframe = pd.DataFrame(data)

event_log = D4PyEventLog()
event_log.log = dataframe
event_log.timestamp_key = "timestamp"
event_log.activity_key = "concept:name"

event_log.to_eventlog()

model_constraints = declare.get_decl_model_constraints()

encoder = Encoder(activities_name)

print(activities_name)


['Release B', 'ER Triage', 'ER Registration', 'ER Sepsis Triage', 'Leucocytes', 'CRP', 'LacticAcid', 'IV Antibiotics', 'Admission NC', 'IV Liquid', 'Release A', 'Return ER', 'Admission IC']


In [10]:




from pymoo.termination.default import DefaultMultiObjectiveTermination




# initial_population = [[random.choice(activities_name) for _ in range(trace_length)] for _ in range(n_traces)]

# Load CSV file with correct column names
df = pd.read_csv("../declare_models/model1_initial_pop.csv", usecols=['Case ID', 'Activity'])

# Group activities by case, filter cases with at least 50 activities, and select the first 10 cases
initial_population = [
    activities[:50] for activities in
    (df.groupby('Case ID')['Activity'].apply(list).values) if len(activities) >= 50
][:10]




print(initial_population)
initial_encoded_pop = [encoder.encode(trace) for trace in initial_population]
print(initial_encoded_pop)


lower_bounds = 0
upper_bounds = len(activities_name) - 1



mutation = IntegerPolynomialMutation(prob=0.4,eta=20) # low eta (5–10) → More exploration - high eta (15–50) → More exploitation

crossover = SBX(prob=0.9, eta=15)
sampling = MySampling(initial_population=initial_encoded_pop)
pop_size = 3000
survival = MySurvival(n_children_survive=50)

termination_multi = DefaultMultiObjectiveTermination(
    xtol=1e-8,
    cvtol=1e-6,
    ftol=0.0025,
    period=30,
    n_max_gen=1000,
    n_max_evals=100000
)

termination_single = DefaultSingleObjectiveTermination(
    xtol=1e-8,
    cvtol=1e-6,
    ftol=1e-6,
    period=20,
    n_max_gen=1000,
    n_max_evals=100000
)


termination = MyTermination(n_required=pop_size * 1.3)
# termination = get_termination("n_gen", 10)
# termination1 = termination
# termination2 = termination
print('\n----------------------------------------------\n')
print(len(trace))
print('\n----------------------------------------------\n')
print(trace_length)

for x in initial_population:
    dataframe['concept:name'] = pd.DataFrame(x)
    event_log.log = dataframe
    event_log.to_eventlog()

    basic_checker = MPDeclareAnalyzer(log=event_log, declare_model=declare, consider_vacuity=False)
    conf_check_res: MPDeclareResultsBrowser = basic_checker.run()
    metric_state = conf_check_res.get_metric(trace_id=0, metric="state")
    metric_num_violation = conf_check_res.get_metric(trace_id=0, metric="num_violations")

    print("Solution Decoded:")
    print(metric_num_violation)
    print("-------------------------------------------")



[['LacticAcid', 'CRP', 'ER Sepsis Triage', 'IV Antibiotics', 'IV Liquid', 'Leucocytes', 'Release A', 'Leucocytes', 'ER Registration', 'Admission NC', 'Release B', 'ER Triage', 'Admission NC', 'Release B', 'Release A', 'IV Liquid', 'Admission IC', 'IV Liquid', 'Admission NC', 'Release B', 'Admission IC', 'LacticAcid', 'Admission IC', 'ER Registration', 'Leucocytes', 'LacticAcid', 'LacticAcid', 'LacticAcid', 'Leucocytes', 'Admission IC', 'Admission IC', 'Leucocytes', 'Admission IC', 'CRP', 'ER Registration', 'Admission IC', 'CRP', 'LacticAcid', 'LacticAcid', 'ER Registration', 'LacticAcid', 'Leucocytes', 'Release A', 'CRP', 'Admission NC', 'Release B', 'CRP', 'Release A', 'CRP', 'Admission IC'], ['Release A', 'ER Registration', 'ER Sepsis Triage', 'IV Antibiotics', 'ER Registration', 'ER Registration', 'Admission NC', 'Release B', 'Admission NC', 'Release B', 'IV Liquid', 'Admission IC', 'CRP', 'CRP', 'Leucocytes', 'Admission NC', 'Release B', 'ER Registration', 'ER Registration', 'ER Re

In [11]:


# Step 1: Read the CSV file
file_path = "results/encoded_traces_2025-02-05/ID_3_run_1_multi_yes_constraints.csv"  # Change this to your actual CSV file path

# Read CSV assuming events are separated by semicolons
df = pd.read_csv(file_path, header=None, delimiter=";")

# Convert each row into a list of event sequences
traces = df.values.tolist()

encoded_traces = [encoder.encode(trace) for trace in traces]



# Step 3: Select a trace (e.g., the first one)
selected_trace = encoded_traces[0]

# Step 4: Compute diversity (average Hamming distance)
population = np.array(encoded_traces)  # Convert to NumPy array
trace_array = np.array(selected_trace)  # Convert trace to NumPy

# Compute Hamming distance
diversity_score = Tools.calculate_diversity(trace_array,population)

# Step 5: Print Results
print(f"Selected Trace (Encoded): {selected_trace}")
print(f"Population Trace (Encoded): {population}")
print(f"Diversity Score: {diversity_score}")
print(f"Diversity Score: {diversity_score/50}")




Selected Trace (Encoded): [9, 9, 10, 4, 8, 0, 5, 10, 1, 9, 10, 4, 10, 9, 3, 7, 10, 10, 5, 2, 4, 8, 0, 10, 4, 6, 6, 4, 11, 4, 5, 2, 4, 10, 5, 4, 10, 9, 4, 10, 2, 8, 0, 10, 9, 8, 0, 8, 0, 9]
Population Trace (Encoded): [[ 9  9 10 ...  8  0  9]
 [ 9  9 10 ...  8  0  9]
 [ 9  9 10 ...  8  0 10]
 ...
 [ 4  9 10 ...  8  0 11]
 [ 9  9  9 ...  8  0  9]
 [ 9  9 10 ...  8  0 10]]
Diversity Score: 10.318
Diversity Score: 0.20636
Diversity Score: 0.816


In [12]:
import csv
from testSetup import Setup
from problem2 import ProblemSingleElementWise, ProblemMultiNoConstElementWise, ProblemMultiElementWise

# (encoder, declare, event_log, dataframe, activities_name) = Setup.initialize_shared_components("../declare_models/model1.decl", trace_length)


multi_problem = Problem_multi_ElementWise(
    trace_length=trace_length,
    encoder=encoder,
    d4py=declare,
    initial_population=initial_encoded_pop,
    xl=lower_bounds,xu=upper_bounds,
    event_log=event_log,
    dataframe=dataframe
)

single_problem = ProblemSingleElementWise(
    trace_length=trace_length,
    encoder=encoder,
    d4py=declare,
    initial_population=initial_encoded_pop,
    xl=lower_bounds,xu=upper_bounds,
    event_log=event_log,
    dataframe=dataframe
)


multi_algorithm = NSGA2(
    problem=multi_problem,
    pop_size=pop_size,
    sampling=sampling,
    crossover=crossover,
    mutation=mutation,
    callback=UpdatePopulationCallback(),
    eliminate_duplicates=False,
)

single_algorithm = GA(
    problem=single_problem,
    pop_size=pop_size,
    sampling=sampling,
    crossover=crossover,
    mutation=mutation,
    callback=UpdatePopulationCallback(),
    eliminate_duplicates=False,
)


result = minimize(multi_problem, multi_algorithm, termination=termination_multi, seed=1, verbose=True)
data = result.algorithm.callback.get_data()
diversity_scores = data.get("diversity_history", None)
constraint_scores = data.get("constraint_history", None)
n_violations_scores = data.get("n_violations_history", None)
n_generations = data.get("generations", None)
print(n_generations)
print(constraint_scores)


Setup.plot_progress(diversity_scores=diversity_scores, constraint_scores=n_violations_scores, n_violations_scores=None, n_generations=n_generations)

G = np.array([individual.G for individual in result.pop])
F = np.array([individual.F[1] for individual in result.pop])
traces = [individual.X.tolist() for individual in result.pop]


data = result.algorithm.callback.get_data()
diversity_scores = data.get("diversity_history", None)
print(diversity_score)
# save the feasible traces
for i, (trace, g) in enumerate(zip(traces, F)):
    if g == 0:
        decoded_trace =  encoder.decode(trace)

        dataframe['concept:name'] = pd.DataFrame(decoded_trace)
        event_log.log = dataframe
        event_log.to_eventlog()

        basic_checker = MPDeclareAnalyzer(log=event_log, declare_model=declare, consider_vacuity=False)
        conf_check_res: MPDeclareResultsBrowser = basic_checker.run()
        print(f"Feasible Trace {i + 1}: {trace}")

        # print("Solution Decoded:", encoder.decode(trace))
        print(conf_check_res.get_metric(trace_id=0, metric="num_violations"))
        print("-------------------------------------------")

        with open("simple_run_decoded_traces.csv", "w") as f:
            # Convert the trace into event;event;event format
            encoded_trace = ";".join(map(str, trace))
            f.write(f"{encoded_trace}\n")


Supposed trace[ 6  5  3  7  9  4 10  4  2  8  0  1  8  0 10  9 12  9  8  0 12  6 12  2
  4  6  6  6  4 12 12  4 12  5  2 12  5  6  6  2  6  4 10  5  8  0  5 10
  5 12]
Current population10
Diversity score-40.8
Population [[6, 5, 3, 7, 9, 4, 10, 4, 2, 8, 0, 1, 8, 0, 10, 9, 12, 9, 8, 0, 12, 6, 12, 2, 4, 6, 6, 6, 4, 12, 12, 4, 12, 5, 2, 12, 5, 6, 6, 2, 6, 4, 10, 5, 8, 0, 5, 10, 5, 12], [10, 2, 3, 7, 2, 2, 8, 0, 8, 0, 9, 12, 5, 5, 4, 8, 0, 2, 2, 2, 2, 12, 9, 12, 1, 2, 4, 12, 9, 5, 8, 0, 5, 8, 0, 9, 6, 11, 12, 12, 12, 5, 6, 6, 2, 12, 8, 0, 12, 10], [6, 10, 5, 2, 10, 3, 7, 12, 2, 4, 6, 11, 8, 0, 5, 4, 5, 9, 11, 9, 5, 1, 11, 2, 2, 10, 8, 0, 4, 2, 8, 0, 6, 6, 4, 12, 8, 0, 2, 9, 6, 5, 2, 10, 2, 2, 5, 2, 9, 12], [2, 11, 6, 12, 10, 10, 9, 3, 7, 10, 1, 12, 2, 12, 5, 9, 2, 11, 12, 10, 5, 6, 6, 9, 11, 2, 2, 5, 5, 10, 6, 12, 9, 9, 11, 2, 4, 5, 9, 2, 8, 0, 2, 4, 8, 0, 11, 5, 12, 9], [4, 12, 3, 7, 9, 2, 10, 8, 0, 4, 11, 11, 8, 0, 8, 0, 11, 5, 9, 6, 2, 12, 6, 9, 12, 5, 11, 8, 0, 9, 2, 8, 0, 10, 11, 1, 9

KeyboardInterrupt: 

In [5]:



problem = Problem_single_ElementWise(
    trace_length=trace_length,
    encoder=encoder,
    d4py=declare,
    initial_population=initial_encoded_pop,
    xl=lower_bounds,xu=upper_bounds,
    event_log=event_log,
    dataframe=dataframe
)

callback = UpdatePopulationCallback(problem=problem, plot=0)

algorithm = GA(
    problem=problem,
    pop_size=pop_size,
    sampling=sampling,
    crossover=crossover,
    mutation=mutation,
    callback=callback,
    termination=termination,
    eliminate_duplicates=False,
)


result = minimize(problem, algorithm, termination=termination2, seed=1, verbose=True)

G = np.array([individual.G for individual in result.pop])
traces = [individual.X.tolist() for individual in result.pop]

# Print the feasible traces
for i, (trace, g) in enumerate(zip(traces, G)):
    if g == 0:
        print(f"Feasible Trace {i + 1}: {trace}")

print("Best Solution Encoded:", result.X)
# print("Best Solution Decoded:", [encoder.decode(solution) for solution in result.X])
print("Objective Values:", result.F)
print("Constraint Values:", result.G)


TypeError: UpdatePopulationCallback.__init__() got an unexpected keyword argument 'problem'

only few millisecond faster than the 2 obj algorithm

In [8]:



problem = MyProblem_Problem(
    trace_length=trace_length,
    encoder=encoder,
    d4py=declare,
    initial_population=initial_encoded_pop,
    xl=lower_bounds,xu=upper_bounds,
    event_log=event_log,
    dataframe=dataframe
)

callback = UpdatePopulationCallback(problem=problem, plot=0)


algorithm = NSGA2(
    problem=problem,
    pop_size=pop_size,
    sampling=sampling,
    crossover=crossover,
    mutation=mutation,
    callback=callback,
    termination=termination,
    eliminate_duplicates=False,
)

result = minimize(problem, algorithm, termination=termination1, seed=1, verbose=True)


print("Best Solution Encoded:", result.X)
print("Best Solution Decoded:", [encoder.decode(solution) for solution in result.X])
print("Objective Values:", result.F)
print("Constraint Values:", result.G)

n_gen  |  n_eval  | n_nds  |     cv_min    |     cv_avg    |      eps      |   indicator  
     1 |       10 |      1 |  0.3333333333 |  0.7666666667 |             - |             -
     2 |     2010 |      1 |  0.000000E+00 |  0.6755833333 |             - |             -
     3 |     4010 |      1 |  0.000000E+00 |  0.5208333333 |  0.4000000000 |         ideal
     4 |     6010 |      1 |  0.000000E+00 |  0.4137500000 |  0.000000E+00 |             f
     5 |     8010 |      2 |  0.000000E+00 |  0.3443333333 |  0.000000E+00 |             f
     6 |    10010 |      1 |  0.000000E+00 |  0.2790000000 |  0.2000000000 |         ideal
     7 |    12010 |      5 |  0.000000E+00 |  0.2414166667 |  0.1000000000 |         ideal
     8 |    14010 |      1 |  0.000000E+00 |  0.1887500000 |  0.4000000000 |         ideal
     9 |    16010 |      1 |  0.000000E+00 |  0.1365000000 |  0.1000000000 |         ideal
    10 |    18010 |      1 |  0.000000E+00 |  0.1208333333 |  0.000000E+00 |             f

Seems a little slower than with ElementalWise problem object

In [9]:



problem = MyProblem_Problem2(
    trace_length=trace_length,
    encoder=encoder,
    d4py=declare,
    initial_population=initial_encoded_pop,
    xl=lower_bounds,xu=upper_bounds,
    event_log=event_log,
    dataframe=dataframe
)

callback = UpdatePopulationCallback(problem=problem, plot=0)

algorithm = GA(
    problem=problem,
    pop_size=pop_size,
    sampling=sampling,
    crossover=crossover,
    mutation=mutation,
    callback=callback,
    eliminate_duplicates=False,
)

result = minimize(problem, algorithm, termination=termination2, seed=1, verbose=True)

print("Best Solution Encoded:", result.X)
# print("Best Solution Decoded:", [encoder.decode(solution) for solution in result.X])
print("Objective Values:", result.F)
print("Constraint Values:", result.G)


n_gen  |  n_eval  |     cv_min    |     cv_avg    |     f_avg     |     f_min    
     1 |       10 |  0.3333333333 |  0.7666666667 |             - |             -
     2 |     2010 |  0.1666666667 |  0.6750833333 |             - |             -
     3 |     4010 |  0.000000E+00 |  0.5258333333 | -1.790000E+01 | -1.790000E+01
     4 |     6010 |  0.000000E+00 |  0.4196666667 | -1.773333E+01 | -1.790000E+01
     5 |     8010 |  0.000000E+00 |  0.3510000000 | -1.774211E+01 | -1.820000E+01
     6 |    10010 |  0.000000E+00 |  0.2821666667 | -1.785918E+01 | -1.850000E+01
     7 |    12010 |  0.000000E+00 |  0.2451666667 | -1.788387E+01 | -1.860000E+01
     8 |    14010 |  0.000000E+00 |  0.1990000000 | -1.793148E+01 | -1.900000E+01
     9 |    16010 |  0.000000E+00 |  0.1422500000 | -1.797440E+01 | -1.900000E+01
    10 |    18010 |  0.000000E+00 |  0.1280833333 | -1.801793E+01 | -1.900000E+01
    11 |    20010 |  0.000000E+00 |  0.1108333333 | -1.807164E+01 | -1.900000E+01
    12 |    2201