## **Build automata learning**

In [1]:
import os

import pm4py
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.obj import EventLog, Trace, Event

# Location of the XES event log.  The XES_LOG_PATH environment variable
# overrides it, so the notebook can run on another machine without editing
# this cell; when the variable is unset the original path is used unchanged.
LOG_PATH = os.environ.get(
	"XES_LOG_PATH",
	"C:/Users/Simone/Desktop/UNIVERSITA/MAGISTRALE/BIOMEDICAL DECISION SUPPORT SYSTEM/Assignments/2/log-10-percent-noise.xes.gz",
)

log = xes_importer.apply(LOG_PATH)

parsing log, completed traces ::   0%|          | 0/100000 [00:00<?, ?it/s]

In [2]:
# Display the first trace of the imported log as a quick sanity check.
log[0]

{'attributes': {'concept:name': '1'}, 'events': [{'concept:name': 'Triage', 'lifecycle:transition': 'complete', 'priority': 'red', 'time:timestamp': datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc)}, '..', {'concept:name': 'Prepare', 'lifecycle:transition': 'complete', 'time:timestamp': datetime.datetime(1970, 1, 1, 2, 31, tzinfo=datetime.timezone.utc)}]}

In [3]:
from pm4py.statistics.variants.log import get as variants_get
# Compute the variants of the imported log with pm4py and report how many
# there are.
variants = variants_get.get_variants(log)
print("Number of variants: ", len(variants))

Number of variants:  1956


In [5]:
# The first 10 traces are used for discovery; every trace after them is
# held out for testing.
train_log = log[:10]
test_log = log[10:]

def create_event_log(traces_list):
	"""Rebuild an ``EventLog`` from a sequence of traces.

	Each input trace is copied event by event into a fresh ``Trace``; every
	event is wrapped in a new ``Event``.  The last event of every input
	trace is left out (``[:-1]``), and only the events are copied, nothing
	else from the input trace.

	NOTE(review): dropping the final event looks deliberate but the reason
	is not visible here - confirm it is intended.

	Parameters
	----------
	traces_list : iterable of sliceable event sequences

	Returns
	-------
	EventLog
		One rebuilt trace per input trace, in the same order.
	"""
	rebuilt_log = EventLog()

	for source_trace in traces_list:
		rebuilt_trace = Trace()
		# Everything except the final event of the source trace.
		kept_events = source_trace[:-1]
		for raw_event in kept_events:
			rebuilt_trace.append(Event(raw_event))
		rebuilt_log.append(rebuilt_trace)

	return rebuilt_log

# Rebuild both splits as EventLog objects via create_event_log.
train = create_event_log(train_log)
test = create_event_log(test_log)

In [6]:
# Display the first event of the first rebuilt training trace.
train[0][0]

{'concept:name': 'Triage', 'lifecycle:transition': 'complete', 'priority': 'red', 'time:timestamp': datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc)}

In [7]:
# Discover a Petri net, together with its initial and final markings, from
# the training log using pm4py's inductive-miner entry point.
net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(train)
# Optional visual check of the discovered net (disabled):
# pm4py.view_petri_net(net, initial_marking, final_marking)

In [8]:
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments

# Perform alignments of the training log against the discovered net.
# Each result is a dict whose 'alignment' entry is a list of 2-item moves.
aligned_traces = alignments.apply_log(train, net, initial_marking, final_marking)
# Second component (the model side) of every move in the first trace's alignment.
al = [t[1] for t in aligned_traces[0]['alignment']]

aligning log, completed variants ::   0%|          | 0/10 [00:00<?, ?it/s]

In [9]:
# Build the perfect alignment.
#
# Every entry of every trace in `perfect_aligned_log` is a
# (log_move, model_move) tuple, so consumers can always read the model-side
# label with entry[1]:
#   * synchronous moves are kept as they are;
#   * visible model-only moves get a placeholder log move named
#     "<label>_DUMMY" paired with the real model label (previously a bare
#     string was appended, and indexing it with [1] returned a single
#     character instead of the label);
#   * every other move (log-only moves, silent model moves) is skipped.
perfect_aligned_log = []

for aligned in aligned_traces:
	aligned_trace = []
	for move_log, move_model in aligned['alignment']:
		log_is_skip = move_log in (None, ">>")
		model_is_skip = move_model in (None, ">>")

		# keep synchronized moves
		if not log_is_skip and not model_is_skip:
			aligned_trace.append((move_log, move_model))

		# insert dummy log move for visible model moves (if present)
		elif log_is_skip and not model_is_skip and move_model != "tau":
			aligned_trace.append((f"{move_model}_DUMMY", move_model))

		# skip all other cases (trace-only moves or tau)

	perfect_aligned_log.append(aligned_trace)



In [10]:
# Show one raw alignment result (index 4) and one filtered alignment (index 1).
print("Aligned traces:" , aligned_traces[4])
print("Perfect aligned traces:" , perfect_aligned_log[1])

# Model-side label of the first move in the first trace's alignment.
print("Alignment of the first trace:" , al[0])

# List every (position, move) pair of the first trace's alignment.
for indexed_move in enumerate(aligned_traces[0]['alignment']):
	print(indexed_move)

# Number of aligned traces.
total_traces = len(aligned_traces)
print("Total traces: ", total_traces)

Aligned traces: {'alignment': [('Triage', 'Triage'), ('Register', 'Register'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('>>', None), ('Visit', 'Visit'), ('Check', 'Check'), ('>>', None), ('Check', 'Check'), ('Final Visit', 'Final Visit'), ('>>', None), ('>>', None)], 'cost': 4, 'visited_states': 12, 'queued_states': 44, 'traversed_arcs': 46, 'lp_solved': 1, 'fitness': 1.0, 'bwc': 140002}
Perfect aligned traces: [('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('Visit', 'Visit'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('Final Visit', 'Final Visit')]
Alignment of the first trace: Triage
(0, ('Triage', 'Triage'))
(1, ('Register', 'Register'))
(2, ('Check', 'Check'))
(3, ('>>', None))
(4, ('X-Ray', 'X-Ray'))
(5, ('Check', 'Check'))
(6, ('>>', None))
(7, ('Check', 'Check'))
(8, ('>>', None))
(9, ('Visit', 'Visit'))
(10, ('Final Visit', 'Final Visit'))
(11, ('Check', 'Check'))
(12, ('>>', None))
(13, ('>>', None))
Total traces:  10


In [11]:
# Dump every filtered alignment for inspection.
print(perfect_aligned_log)

[[('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('Check', 'Check'), ('Visit', 'Visit'), ('Final Visit', 'Final Visit'), ('Check', 'Check')], [('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('Visit', 'Visit'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('Final Visit', 'Final Visit')], [('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('X-Ray', 'X-Ray'), ('Visit', 'Visit'), ('Check', 'Check'), ('Check', 'Check'), ('Final Visit', 'Final Visit'), ('Prepare', 'Prepare')], [('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('X-Ray', 'X-Ray'), ('Visit', 'Visit'), ('Check', 'Check'), ('Check', 'Check'), ('Final Visit', 'Final Visit')], [('Triage', 'Triage'), ('Register', 'Register'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('Visit', 'Visit'), ('Check', 'Check'), ('Check', 'Check'), ('Final Visit', 'Final Visit')], [('Triage', 'Triage'), ('Register', 'Register'), ('Visit', 'Visit

#### **Build automata learning**

In [12]:
from pm4py.objects.petri_net import semantics
import random 
random.seed(42)

class AutomataLearningWrapper:
	"""Replay one alignment on a Petri net, one move per ``step()`` call.

	Parameters
	----------
	random_aligned : sequence
		Alignment to replay.  Each entry is indexed with ``[1]`` to obtain
		the label of the model transition to fire, so entries are expected
		to be ``(log_move, model_move)`` pairs.
	net :
		Petri net; its ``transitions`` are searched by ``label`` and it is
		passed to the ``semantics`` helpers.
	initial_marking :
		Marking the replay starts from.
	final_marking :
		Marking reported by ``step()`` once the alignment is exhausted.
	verbose : bool, default True
		When true, print the chosen transition and the enabled transitions
		at every step (the original behaviour).
	"""

	def __init__(self, random_aligned, net, initial_marking, final_marking, verbose=True):
		self.alignment = random_aligned
		self.net = net
		self.current_marking = initial_marking
		self.current_index = 0
		self.final_marking = final_marking
		self.verbose = verbose

	def _find_transition(self, label, enabled):
		"""Return the transition labelled ``label``, preferring an enabled one.

		Raises ``ValueError`` when the net has no transition with that label.
		"""
		candidates = [t for t in self.net.transitions if t.label == label]
		if not candidates:
			raise ValueError(
				f"No transition labelled {label!r} in the net "
				f"(alignment index {self.current_index})"
			)
		# Several transitions may share a label; pick one that can actually
		# fire in the current marking, otherwise fall back to the first match.
		for candidate in candidates:
			if candidate in enabled:
				return candidate
		return candidates[0]

	def step(self):
		"""Advance the replay by one alignment entry.

		Returns
		-------
		tuple
			``(transition, marking)``:

			* ``(None, final_marking)`` when the alignment is exhausted.
			  This is the marking given at construction, not necessarily
			  the marking actually reached;
			* ``(None, current_marking)`` when the requested transition is
			  not enabled - the entry is skipped and the marking is left
			  unchanged;
			* ``(transition, new_marking)`` after firing the transition.

		Raises
		------
		ValueError
			If no transition of the net carries the requested label.  The
			index is not advanced in that case.
		"""
		if self.current_index >= len(self.alignment):
			self.current_index += 1
			return None, self.final_marking

		# second position of the entry is the model/net transition label
		model_transition_name = self.alignment[self.current_index][1]
		enabled = semantics.enabled_transitions(self.net, self.current_marking)
		transition = self._find_transition(model_transition_name, enabled)

		if self.verbose:
			print("Transition to fire:", transition)
			print("Enabled transitions:", enabled)

		# The entry is consumed whether or not the transition can fire.
		self.current_index += 1

		if transition not in enabled:
			return None, self.current_marking

		# fire the transition
		self.current_marking = semantics.execute(transition, self.net, self.current_marking)
		return transition, self.current_marking

# Test automata learning wrapper
# Smoke-test the wrapper: replay one randomly chosen filtered alignment for
# a fixed number of steps and print what each step returns.
random_aligned = random.choice(perfect_aligned_log)
print("Randomly selected aligned trace for testing:", random_aligned)
print("-----")

automata_learning = AutomataLearningWrapper(
	random_aligned,
	net,
	initial_marking,
	final_marking,
)

for step_no in range(20):
	transition, current_marking = automata_learning.step()
	print(f"Step {step_no}: Transition={transition}, Marking={current_marking}")
	print("-----")


Randomly selected aligned trace for testing: [('Triage', 'Triage'), ('Register', 'Register'), ('Check', 'Check'), ('Visit', 'Visit'), ('X-Ray', 'X-Ray'), ('Check', 'Check'), ('Final Visit', 'Final Visit')]
-----
Transition to fire: (b4ff1be5-6c8c-4fe0-bc08-e943bd6a3e7b, 'Triage')
Enabled transitions: {(b4ff1be5-6c8c-4fe0-bc08-e943bd6a3e7b, 'Triage')}
Step 0: Transition=(b4ff1be5-6c8c-4fe0-bc08-e943bd6a3e7b, 'Triage'), Marking=['p_3:1']
-----
Transition to fire: (548226c0-1c80-4b5b-ae88-97f936b6e64e, 'Register')
Enabled transitions: {(548226c0-1c80-4b5b-ae88-97f936b6e64e, 'Register')}
Step 1: Transition=(548226c0-1c80-4b5b-ae88-97f936b6e64e, 'Register'), Marking=['p_14:1', 'p_18:1', 'p_8:1']
-----
Transition to fire: (fa27f118-fb17-4d81-a83b-90d82aa25931, 'Check')
Enabled transitions: {(3e3074d6-d883-4886-9e98-4cacf28a160a, 'X-Ray'), (fa27f118-fb17-4d81-a83b-90d82aa25931, 'Check'), (c24662ff-ad4a-4a53-828c-89dab722c0e9, 'Visit')}
Step 2: Transition=(fa27f118-fb17-4d81-a83b-90d82aa25931,