In [1]:
# Preprocess the traces
# TODO: extract trace features
from pm4py import read_xes
import numpy as np
import math
from typing import Dict, Iterable, Tuple, List
from pm4py.objects.log.obj import EventLog
import pm4py.util.xes_constants as xes
import numpy as np

In [2]:
import os

# Move the working directory one level up so the relative data path used when
# loading the log resolves. A bare os.chdir('../') climbs one more level every
# time the cell is re-run; anchoring on the directory recorded at first
# execution makes the cell idempotent within a kernel session.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.join(_NOTEBOOK_START_DIR, os.pardir))

In [3]:
def extractTraces(log: EventLog, activityName_key:str=xes.DEFAULT_NAME_KEY)-> np.ndarray:
    """Return the control-flow view of ``log``: one tuple of activity labels per case.

    Args:
        log: Event log to read; it is iterated case by case, and each case
            event by event.
        activityName_key: Event attribute that holds the activity label.

    Returns:
        A 1-D NumPy object array with ``len(log)`` entries; entry ``k`` is the
        tuple of ``activityName_key`` values of the k-th case, in iteration order.
    """
    activity_sequences = np.empty(len(log), dtype=object)
    # Lazy generator: each case is converted only when its slot is filled.
    per_case = (tuple(event[activityName_key] for event in case) for case in log)
    for position, sequence in enumerate(per_case):
        activity_sequences[position] = sequence
    return activity_sequences

In [4]:
def extractTraceswithTime(log: EventLog, activityName_key:str=xes.DEFAULT_NAME_KEY, timestamp_key: str = xes.DEFAULT_TIMESTAMP_KEY)-> np.ndarray:
    """Return every case of ``log`` as a list of ``(activity, timestamp)`` pairs.

    Args:
        log: Event log to read; it is iterated case by case, and each case
            event by event.
        activityName_key: Event attribute that holds the activity label.
        timestamp_key: Event attribute that holds the event timestamp.

    Returns:
        A 1-D NumPy object array with ``len(log)`` entries; entry ``k`` is a
        list with one ``(activity, timestamp)`` tuple per event of the k-th case.
    """
    timed_traces = np.empty(len(log), dtype=object)
    for position, case in enumerate(log):
        timed_events = []
        for event in case:
            # Pair the label with its timestamp, keeping the event order.
            timed_events.append((event[activityName_key], event[timestamp_key]))
        timed_traces[position] = timed_events
    return timed_traces

In [5]:
def calculate_time_since_last(traces_with_time: np.ndarray) -> np.ndarray:
    """
    Compute, per trace, the gap in seconds between each event and its predecessor.

    Args:
        traces_with_time: A NumPy array where each element is a list of
                          (activity, timestamp) tuples describing one trace.

    Returns:
        A NumPy object array of the same length. Each element is a list of
        floats: 0.0 for the first event, then the seconds elapsed since the
        previous event. An empty trace yields an empty list.
    """
    gaps_per_trace = np.empty(len(traces_with_time), dtype=object)

    for position, events in enumerate(traces_with_time):
        if events:
            # Walk consecutive (previous, current) pairs; index 1 of each
            # event tuple is the timestamp. The first event has no
            # predecessor, hence the leading 0.0.
            gaps = [0.0] + [
                (current[1] - previous[1]).total_seconds()
                for previous, current in zip(events, events[1:])
            ]
        else:
            gaps = []
        gaps_per_trace[position] = gaps

    return gaps_per_trace


In [6]:
# Load the XES event log. The path is relative, so it resolves against the
# working directory changed by the earlier os.chdir cell.
log = read_xes('data/sudden_trace_noise0_1000_IOR.xes')

  from .autonotebook import tqdm as notebook_tqdm
parsing log, completed traces :: 100%|████| 1000/1000 [00:00<00:00, 1786.57it/s]


In [57]:
# Derive the per-trace views used below: activity sequences, (activity, timestamp)
# lists, and the seconds elapsed between consecutive events.
traces = extractTraces(log)
traces_times = extractTraceswithTime(log)
transition_times = calculate_time_since_last(traces_times)

In [58]:
# Preview the first trace: its activity sequence, then its inter-event gaps in seconds.
print(traces[0], transition_times[0], sep="\n")


('A', 'B', 'C', 'A', 'D', 'F', 'E', 'G', 'I', 'J', 'K', 'M', 'O')
[0.0, 2010.352, 1422.1, 1781.231, 1779.077, 1953.313, 1627.008, 2083.933, 1779.16, 1402.309, 1983.764, 1771.816, 1994.466]


In [59]:
# ==> Feature extraction for the GAN input.
# Step 1: find the longest trace; the sequences are padded to this length below.
max_length = max(len(trace) for trace in traces)

In [60]:
# Integer-encode every trace. Codes start at 1 so that 0 stays free for padding.
unique_activities = sorted({activity for trace in traces for activity in trace})
activity_to_int = {activity: code for code, activity in enumerate(unique_activities, start=1)}
# Inverse lookup, built from the forward map so the two can never disagree.
int_to_activity = {code: activity for activity, code in activity_to_int.items()}
integer_encoded_traces = []
for trace in traces:
    encoded_trace = [activity_to_int[name] for name in trace]
    integer_encoded_traces.append(encoded_trace)

In [79]:
def safe_padding(traces, max_length, pad_value=0.0):
    """Truncate or right-pad every trace to exactly ``max_length`` items.

    Args:
        traces: Iterable of sequences (lists, tuples, ...). Each one is copied;
            the inputs are never modified in place.
        max_length: Target length of every returned trace (expected to be
            non-negative).
        pad_value: Filler appended to traces shorter than ``max_length``.
            Defaults to ``0.0``, which suits float features. Pass ``0`` when
            padding integer-encoded traces so the result stays integer-typed
            and can be used as an index array.

    Returns:
        A list of lists, one per input trace, each holding the first
        ``max_length`` items of the trace followed by as many ``pad_value``
        entries as needed to reach ``max_length``.
    """
    padded_traces = []
    for trace in traces:
        clipped = list(trace)[:max_length]
        # A non-positive multiplier yields an empty list, so full-length
        # traces are left untouched.
        clipped.extend([pad_value] * (max_length - len(clipped)))
        padded_traces.append(clipped)
    return padded_traces

In [80]:
# Pad/truncate the integer-encoded traces to max_length.
# NOTE(review): safe_padding fills with 0.0 by default, so padded rows mix ints and floats.
paddedTraces=safe_padding(integer_encoded_traces, max_length)

In [81]:
# Pad/truncate the per-event time gaps to the same max_length as the activity codes.
padded_time = safe_padding(transition_times, max_length)

In [82]:
from sklearn.preprocessing import MinMaxScaler

In [83]:
# Scaler used to normalise the padded time gaps.
# NOTE(review): with default arguments MinMaxScaler presumably rescales each column to [0, 1] — confirm against the scikit-learn docs.
scaler = MinMaxScaler()

In [94]:
padded_time = np.array(padded_time, dtype=np.float32)
# safe_padding fills with 0.0, so the encoded traces arrive as a mix of ints and
# floats and NumPy would infer a float dtype. Cast back to integers: this array
# is used as an index into the one-hot lookup table, and NumPy rejects
# non-integer index arrays with an IndexError.
padded_trace = np.asarray(paddedTraces).astype(np.int64)

In [85]:
# Fit the scaler on the padded time matrix and transform it in one step.
# NOTE(review): scaling is presumably per column (i.e. per event position) and the
# 0.0 padding takes part in each column's min/max — confirm this is intended.
normalized_time_features = scaler.fit_transform(padded_time)

In [90]:
# Sanity check: display the shape of the scaled time matrix.
normalized_time_features.shape

(1000, 37)

In [96]:
# Sanity check: display the shape of the padded activity-code matrix.
padded_trace.shape

(1000, 37)

In [102]:
# One slot per activity code (codes start at 1) plus slot 0, which is the padding value.
vocab_size = len(unique_activities) + 1

In [103]:
# One-hot encode by row lookup: row k of the identity matrix is the one-hot vector for code k.
# NOTE(review): this indexing requires padded_trace to have an integer dtype.
control_flow_matrix = np.eye(vocab_size)[padded_trace]
print(f"Shape after one-hot encoding: {control_flow_matrix.shape}")

Shape after one-hot encoding: (1000, 37, 17)


In [104]:
# Give the time gaps a trailing feature axis so they can be concatenated with
# the one-hot activity matrix.
# - The number of traces is read from the data rather than hard-coded, so the
#   cell works for logs of any size.
# - The min-max-scaled gaps are used (not the raw seconds in `padded_time`), so
#   the time channel is on a scale comparable to the 0/1 one-hot channels.
reshaped_time_features = np.reshape(
    normalized_time_features,
    (len(normalized_time_features), max_length, 1),
)
print(f"Shape of time features after reshape: {reshaped_time_features.shape}")

Shape of time features after reshape: (1000, 37, 1)


In [105]:
# Join the one-hot activity channels and the single time channel along the last (feature) axis.
final_features = np.concatenate([control_flow_matrix, reshaped_time_features], axis=2)

In [106]:
# Sanity check: display the shape of the combined feature tensor.
final_features.shape

(1000, 37, 18)