In [None]:
import pm4py
import pandas

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.obj import EventLog

# Step 1: Import both halves of the road-traffic-fine log and merge them.
log_part_1 = xes_importer.apply(
    "Road_Traffic_Fine_Management_Process_Data/Road_Traffic_Fine_Management_Process_Part1.xes"
)
log_part_2 = xes_importer.apply(
    "Road_Traffic_Fine_Management_Process_Data/Road_Traffic_Fine_Management_Process_Part2.xes"
)

# The merged log is built from the traces of part 1 followed by the traces of part 2.
log = EventLog([*log_part_1, *log_part_2])

# Tabular (DataFrame) representation of the merged log.
log_df = pm4py.convert_to_dataframe(log)


In [None]:
# Step 2: Inspect the loaded log — size, first trace, first event, start/end activities.
print(f"Number of traces in the log: {len(log)}\n\n")
print(f"Details of the first log (log[0]):\n{log[0]}\n\n")
print(f"Details of the first event in the first log (log[0][0]):\n{log[0][0]}\n\n")
print(f"Starting activities and their count for all traces: \n{pm4py.get_start_activities(log)}\n\n")

end_activities = pm4py.get_end_activities(log)
print(f"End activities and their count for all traces: \n{end_activities}\n\n")

# Number of distinct end activities, and the sum of their per-activity counts.
iterations = len(end_activities)
end_activities_counter = sum(end_activities.values())
print(f'There is a total number of {iterations} end activities all activities end as the end activities are called {end_activities_counter} times which is the same number as the numbers of traces the log holds')

In [None]:
# Discover a BPMN model from the complete log with the inductive miner
# (no noise threshold is passed in this call) and render it.
bpmn_model = pm4py.discover_bpmn_inductive(log)
pm4py.view_bpmn(bpmn_model)

# Discover a Petri net from the same log. The call returns three values: the net
# and two markings (presumably initial and final marking — confirm against the pm4py docs).
# Only the net itself is passed to the viewer here; the markings are not.
petri_net, marking_1, marking_2 = pm4py.discover_petri_net_inductive(log)
pm4py.view_petri_net(petri_net)

In [None]:
# Same discovery as on the full log, but with a 2.5% noise threshold passed to the
# inductive miner (given by keyword so the meaning of the number is visible).
bpm_model_975 = pm4py.discover_bpmn_inductive(log, noise_threshold=0.025)
pm4py.view_bpmn(bpm_model_975)

In [None]:
# Map the traces according to their length (number of events per trace).
print("Traces according to their length:")
trace_list_dict = {}
for entry in log:
    trace_list_dict.setdefault(len(entry), []).append(entry)

# Re-order the length groups so that the group holding the most traces comes first.
sorted_trace_list_dict = dict(
    sorted(trace_list_dict.items(), key=lambda item: len(item[1]), reverse=True)
)

# Walk the groups in that order and report the running share of the log covered so far.
total_traces = len(log)
trace_count = 0
for key, value in sorted_trace_list_dict.items():
    trace_count += len(value)
    # ':.2%' scales the fraction by 100 and appends '%', so the value matches its label
    # (the raw fraction was printed before, e.g. 0.37 under a "%" heading).
    print(f'Number of activities per trace: {key}, number of traces: {len(value)}, % of traces accumulated: {trace_count/total_traces:.2%}')

print("\n98.9% of all traces are either 2, 5, 6, or 3 event long. Using these to build a petri net we get...")

# The four lengths named in the message above are hard-coded; indexing raises KeyError
# if one of them does not occur in the log.
accumulated_traces_list = list(sorted_trace_list_dict[2]) + list(sorted_trace_list_dict[5]) + list(sorted_trace_list_dict[6]) +list(sorted_trace_list_dict[3])
accumulated_trace_log = EventLog(accumulated_traces_list)

# Discover a Petri net from the reduced log, then measure alignment-based fitness
# of that net against the complete log.
petri_net_98, m1, m2 = pm4py.discover_petri_net_inductive(accumulated_trace_log)
pm4py.view_petri_net(petri_net_98)
print(f'Evaluating the petri net; its fittness is: {pm4py.fitness_alignments(log, petri_net_98, m1, m2)}')

In [None]:
# Using the trace list dict to find happy path
# Build a sub-log from the traces that are exactly two events long.
# Note: dict.get returns None when the key is absent, so EventLog would then receive None.
log_len_2 = EventLog(trace_list_dict.get(2))
# Discover both a BPMN model and a Petri net (plus its two markings) from that sub-log.
bpm_model_2 = pm4py.discover_bpmn_inductive(log_len_2)
petri_model_2, m1_2, m2_2 = pm4py.discover_petri_net_inductive(log_len_2)
# Show which activities end these traces, then render both models.
print(pm4py.get_end_activities(log_len_2))
pm4py.view_bpmn(bpm_model_2)
pm4py.view_petri_net(petri_model_2)

In [None]:
# Sub-log of the traces that are exactly five events long
# (dict.get yields None if no such traces were grouped).
log_len_5 = EventLog(trace_list_dict.get(5))
bpm_model_5 = pm4py.discover_bpmn_inductive(log_len_5)
# Print the end activities of these traces before rendering the BPMN model.
print(pm4py.get_end_activities(log_len_5))
pm4py.view_bpmn(bpm_model_5)

# Petri net (and its two markings) for the same sub-log; only the net is rendered.
petri_model_5, m1_5, m2_5 = pm4py.discover_petri_net_inductive(log_len_5)
pm4py.view_petri_net(petri_model_5)

In [None]:
# Sub-log of the traces that are exactly six events long. The key must be 6 to match
# the variable names and the 2/5/6/3 breakdown used elsewhere in this notebook;
# it previously read the group for length 12.
log_len_6 = EventLog(trace_list_dict.get(6))
bpm_model_6 = pm4py.discover_bpmn_inductive(log_len_6)
pm4py.view_bpmn(bpm_model_6)

In [None]:
# Sub-log of the traces that are exactly three events long
# (dict.get yields None if no such traces were grouped), with its BPMN model rendered.
log_len_3 = EventLog(trace_list_dict.get(3))
bpm_model_3 = pm4py.discover_bpmn_inductive(log_len_3)
pm4py.view_bpmn(bpm_model_3)

In [None]:
# BPMN model for the traces that are exactly twenty events long.
# NOTE(review): trace_list_dict.get(20) is None when no trace has that length — confirm
# that such traces exist in the log before relying on this cell.
bpm_model_20 = pm4py.discover_bpmn_inductive(EventLog(trace_list_dict.get(20)))
pm4py.view_bpmn(bpm_model_20)

In [None]:
# Split the five-event sub-log into its variants and draw one BPMN model per variant.
variants = pm4py.stats.get_variants(log_len_5)
for variant_traces in variants.values():
    # Each variant's value is wrapped in its own EventLog so the miner sees that variant only.
    temp_bpmn = pm4py.discover_bpmn_inductive(EventLog(variant_traces))
    pm4py.view_bpmn(temp_bpmn)

# Number of variants, followed by the container type as the cell's displayed value.
print(len(variants))
type(variants)

In [None]:
from pm4py.objects.log.obj import EventLog, Trace
from pm4py.stats import get_variants
import pm4py

# Get unique variants of traces in the log with their frequencies
variants = get_variants(log)
counter_trace_list = {str(key): len(value) for key, value in variants.items()}

# Sort counter_trace_list by occurrences in descending order.
# The keys stay stringified because the following cell reads this dict in that form.
sorted_counter_trace_list = dict(sorted(counter_trace_list.items(), key=lambda item: item[1], reverse=True))

accumulated_traces = 0
total_number_of_traces = len(log)

# Same frequency ordering, but over the original variant keys, so the activity
# sequence never has to be rebuilt with eval() from its string form.
# Assumes each variant key is a sequence of activity names — the previous
# eval(str(key)) round trip required the same.
variants_by_frequency = sorted(variants.items(), key=lambda item: len(item[1]), reverse=True)

# Discover and view BPMN for each unique variant
for variant, variant_traces in variants_by_frequency:
    trace_sequence = str(variant)
    count = len(variant_traces)

    # Create an EventLog holding a single synthetic trace with the variant's activities
    variant_log = EventLog()
    trace = Trace()
    for activity in variant:
        trace.append({'concept:name': activity})
    variant_log.append(trace)

    # Discover BPMN for the variant trace
    temp_bpmn = pm4py.discover_bpmn_inductive(variant_log)
    pm4py.view_bpmn(temp_bpmn)
    accumulated_traces += count
    # ':.2%' renders the shares as real percentages (raw fractions were printed before).
    print(f"Variant {trace_sequence} appears \n{count} times.\nPercentage of trace occurence in log:  {count/total_number_of_traces:.2%}\nAccumulated percentage of trace occurence: {accumulated_traces/total_number_of_traces:.2%}")

print(len(variants))

In [None]:
import ast

# Build a log from the 10 most frequent variants (per the original note these
# account for about 98.3% of the log — not re-checked in this cell).
top_10_variants = list(sorted_counter_trace_list.items())[:10]
log_98 = EventLog()

for trace_sequence, count in top_10_variants:
    # The keys are stringified variant sequences. Parsing them as Python literals recovers
    # the exact activity names for both "(...)" and "[...]" forms; stripping brackets and
    # splitting on ", " left parentheses attached to the first and last activity and would
    # break on names containing a comma or quote.
    activities = ast.literal_eval(trace_sequence)

    trace = Trace()
    for activity in activities:
        trace.append({'concept:name': activity})

    # Append the trace multiple times to reflect its frequency
    # (the same Trace object is referenced `count` times).
    for _ in range(count):
        log_98.append(trace)

# Discover BPMN for the top 10 variants
bpmn_98 = pm4py.discover_bpmn_inductive(log_98)
pm4py.view_bpmn(bpmn_98)

# Discover Petri net for the top 10 variants and evaluate its fitness against the full log
petri_net_top10, m1_top10, m2_top10 = pm4py.discover_petri_net_inductive(log_98)
pm4py.view_petri_net(petri_net_top10)
print(f'Evaluating the petri net; its fittness is: {pm4py.fitness_alignments(log, petri_net_top10,m1_top10,m2_top10)}')

In [None]:
# Analysis by time (find longest traces)
# Quick look at what pm4py returns for the case durations of the whole log.
variants_durations = pm4py.get_all_case_durations(log,)
print(type(variants_durations))
print(variants_durations[1])
print(type(variants_durations[0]))
# Printed explicitly: a bare expression in the middle of a cell is not displayed.
print(len(variants_durations))

# Create a dictionary to map case IDs to their durations
trace_duration_map = dict()

# Iterate through the log to populate the trace-duration map
for trace in log:
    # Use the trace attributes to get the case ID
    case_id = trace.attributes.get('concept:name')
    # Skip traces without a case id, and empty traces (no first/last event to read).
    if not case_id or len(trace) == 0:
        continue
    # Duration = timestamp of the last event minus timestamp of the first event, in seconds
    start_time = trace[0]['time:timestamp']
    end_time = trace[-1]['time:timestamp']
    trace_duration_map[case_id] = (end_time - start_time).total_seconds()

# Display a slice of the trace-duration map (the positions are hard-coded;
# the slice is simply empty if the map has fewer entries).
duration_slice = dict(list(trace_duration_map.items())[150365:150370])
print(f' slice of trace_duration map:\n{duration_slice}')

print(f'Number of traces: {len(trace_duration_map)}')



In [None]:
import matplotlib.pyplot as plt
# Gather every recorded case duration (in seconds) from the duration map.
times = list(trace_duration_map.values())

# Plot the distribution over 20 equally wide bins.
bins = 20
hist, bin_edges, patches = plt.hist(times, bins=bins, edgecolor='black')

# Write each bar's case count just above it, horizontally centred on the bar.
for count, left_edge, right_edge in zip(hist, bin_edges[:-1], bin_edges[1:]):
    bar_center = left_edge + (right_edge - left_edge) / 2
    plt.text(bar_center, count + 1, int(count), ha='center', fontsize=8)

# Format x-axis with custom labels
def format_duration(seconds):
    """Render a duration given in seconds as ``"<months>m <days>d"``.

    A month is counted as 30 days. The days part is what is left after the
    whole months have been removed; both parts are truncated to integers.
    """
    seconds_per_day = 24 * 3600
    seconds_per_month = 30 * seconds_per_day
    whole_months, leftover = divmod(seconds, seconds_per_month)
    whole_days = leftover // seconds_per_day
    return f"{int(whole_months)}m {int(whole_days)}d"

# One "<months>m <days>d" label per bin edge, placed at the edge positions.
bin_labels = list(map(format_duration, bin_edges))
plt.xticks(ticks=bin_edges, labels=bin_labels, rotation=45, fontsize=8)

# Title, axis captions, layout, then render the figure.
plt.title("Trace Duration Distribution")
plt.xlabel("Duration (Months and Days)")
plt.ylabel("Frequency")
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Keep only durations of at most 60 months, with a month counted as 30 days.
threshold_seconds = 60 * 30 * 24 * 3600
filtered_times = [duration for duration in times if duration <= threshold_seconds]
excluded_count = len(times) - len(filtered_times)

# Bin edges every 6 months from 0 up to and including the threshold.
bin_edges = np.arange(0, threshold_seconds + 1, 6 * 30 * 24 * 3600)

# Histogram of the retained durations, drawn in orange.
hist, _, patches = plt.hist(filtered_times, bins=bin_edges, edgecolor='black', color='orange')

# Write each bar's case count just above it, horizontally centred on the bar.
for count, left_edge, right_edge in zip(hist, bin_edges[:-1], bin_edges[1:]):
    bar_center = left_edge + (right_edge - left_edge) / 2
    plt.text(bar_center, count + 1, int(count), ha='center', fontsize=8)

# Format x-axis with custom labels
def format_duration(seconds):
    """Render a duration given in seconds as ``"<n> months"``.

    A month is counted as 30 days and the result is truncated to a whole number.
    This notebook defines ``format_duration`` in several cells; the most recently
    executed definition is the one in effect.
    """
    seconds_per_month = 30 * 24 * 3600
    whole_months = int(seconds / seconds_per_month)
    return "{} months".format(whole_months)

# One "<n> months" label per 6-month bin edge, placed at the edge positions.
bin_labels = list(map(format_duration, bin_edges))
plt.xticks(ticks=bin_edges, labels=bin_labels, rotation=45, fontsize=8)

# Title, axis captions, layout, then render the figure.
plt.title("Trace Duration Distribution (Up to 60 Months)")
plt.xlabel("Duration (Months)")
plt.ylabel("Frequency")
plt.tight_layout()
plt.show()

# Report how many cases are shown and how many exceeded the 60-month cut-off.
print(f"Total cases included in the histogram: {len(filtered_times)}")
print(f"Number of cases longer than 60 months: {excluded_count}")

In [None]:
# List the event attributes that occur in the log.
event_attributes = pm4py.get_event_attributes(log)

print(event_attributes)



# Discover the Directly Follows Graph (DFG) together with start and end activities.
dfg, start_activities, end_activities = pm4py.discover_dfg(log)
print(type(dfg))
print(len(dfg))

# Visualize the complete (unfiltered) DFG first.
pm4py.view_dfg(dfg, start_activities=start_activities, end_activities=end_activities)

# Keep only edges whose frequency exceeds 2.5% of the number of traces in the log.
threshold = len(log)*0.025
print(threshold)
filtered_dfg = {}
for edge, frequency in dfg.items():
    if frequency > threshold:
        filtered_dfg[edge] = frequency

# Now visualize the filtered DFG (start/end activities are passed through unfiltered).
pm4py.view_dfg(filtered_dfg, start_activities=start_activities, end_activities=end_activities, max_num_edges=100)

# Print the event attributes again, this time with a caption.
print("Event Attributes:", event_attributes)

# For every event attribute, print what pm4py reports as its values in the log.
for event_attribute in event_attributes:
    event_attributes_values = pm4py.get_event_attribute_values(log, event_attribute)
    print(event_attributes_values)


In [None]:
import pandas as pd
import pm4py

# Step 1: Get all possible consecutive event pairs (Directly Follows)
# discover_dfg is unpacked into three values, as elsewhere in this notebook;
# binding the whole result to `dfg` would replace the graph dict with a tuple.
dfg, start_activities, end_activities = pm4py.discover_dfg(log)

# Step 2: Collect the timestamps for consecutive events
# For every pair of neighbouring events in a trace, record the pair of activity
# names and the time elapsed between the two events.
event_pairs = []

for trace in log:
    events = list(trace)
    # zip pairs each event with its successor; traces with fewer than two events yield nothing
    for current_event, next_event in zip(events, events[1:]):
        # Time difference between the two events, in seconds
        time_diff = (next_event['time:timestamp'] - current_event['time:timestamp']).total_seconds()

        # Append the event pair and time difference
        event_pairs.append(((current_event['concept:name'], next_event['concept:name']), time_diff))

# Convert the event pairs list to a DataFrame for better analysis
df = pd.DataFrame(event_pairs, columns=['Event Pair', 'Time Diff (seconds)'])

# Step 3: Compute statistical measures for each event pair
# Function to convert seconds into years, months, and days
def format_duration(seconds):
    """Render a duration given in seconds as ``"<years>y <months>m <days>d"``.

    Approximations: 1 year = 365 days, 1 month = 30 days. Each unit is taken from
    what remains after the larger units have been removed, so exactly 365 days is
    "1y 0m 0d". Because 365 is not a multiple of 30, the months part can reach 12
    (e.g. 364 days is "0y 12m 4d").

    Args:
        seconds: Non-NaN duration in seconds (int or float).

    Returns:
        The formatted string, with every part truncated to an integer.
    """
    seconds_per_day = 24 * 3600
    years, remainder = divmod(seconds, 365 * seconds_per_day)
    months, remainder = divmod(remainder, 30 * seconds_per_day)
    # Days come from the remainder after years AND months, not from seconds % month.
    days = remainder // seconds_per_day
    return f"{int(years)}y {int(months)}m {int(days)}d"

# Calculate statistical measures (in seconds) for each event pair
stats = df.groupby('Event Pair')['Time Diff (seconds)'].agg(['mean', 'median', 'min', 'max', 'std'])

# A pair that was observed only once has no standard deviation (NaN). Dropping
# those rows would also discard their valid mean/median/min/max, so the rows are
# kept and any NaN cell is shown as "n/a" instead of being passed to the formatter.
for stat in ['mean', 'median', 'min', 'max', 'std']:
    stats[stat] = stats[stat].map(lambda x: format_duration(x) if pd.notna(x) else "n/a")

# Print the statistics for each event pair
print(stats)
