In [1]:
import pm4py
import pandas
import dash
import numpy as np
import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.express as px
from plotly.subplots import make_subplots
import statistics
from collections import defaultdict
from dash import Dash, html, dcc, callback, Output, Input
from pm4py.algo.simulation.montecarlo import algorithm as montecarlo_simulation
from pm4py.algo.conformance.tokenreplay.algorithm import Variants
from pm4py.objects.log.util import interval_lifecycle
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.log.util import dataframe_utils



In [2]:
# Import the XES event log line by line and convert it to a pandas DataFrame.
log = xes_importer.apply('review_example_large.xes', variant=xes_importer.Variants.LINE_BY_LINE)
# Alternative input file:
#log = xes_importer.apply('example.xes', variant=xes_importer.Variants.LINE_BY_LINE)
log = log_converter.apply(log, variant=log_converter.Variants.TO_DATA_FRAME)
# NOTE(review): a method object is passed as `timest_format`; presumably a
# format string (or None) is expected here -- confirm against the pm4py docs.
log = dataframe_utils.convert_timestamp_columns_in_df(log, timest_format=pandas.Timestamp.isoformat)

# Convert to an interval-style log (round trip: DataFrame -> event log -> DataFrame).
log = pm4py.convert_to_dataframe(interval_lifecycle.to_interval(pm4py.convert_to_event_log(log)))

# Filter the log down to the top-k variants, with k = 1.
log = pm4py.filter_variants_top_k(log, 1)

# Apply the interval conversion once more, this time to the filtered log.
# NOTE(review): this repeats the conversion above and looks redundant --
# confirm before removing.
log = pm4py.convert_to_dataframe(interval_lifecycle.to_interval(pm4py.convert_to_event_log(log)))



In [3]:
# dfg, sa, ea = pm4py.discover_performance_dfg(log)
# pm4py.view_performance_dfg(dfg, sa, ea)

# net, im, fm = pm4py.convert_to_petri_net(dfg, sa, ea)

# parameters = {    
#     montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.TOKEN_REPLAY_VARIANT : Variants.BACKWARDS,
#     montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.PARAM_CASE_ARRIVAL_RATIO : 20000,
#     montecarlo_simulation.Variants.PETRI_SEMAPH_FIFO.value.Parameters.PARAM_ENABLE_DIAGNOSTICS : False
# }


# simulated_log, res = montecarlo_simulation.apply(log, net, im, fm, parameters=parameters)
# playout_log = pm4py.play_out(net, im, fm)


In [4]:
#print(simulated_log[0][0])
#print(playout_log[0][0])
#print(pm4py.convert_to_event_log(log)[0][0])

In [5]:
# Column names of the interval-style event log and the key under which the raw
# waiting times of a link are stored.
ACTIVITY = "concept:name"
TRACE = "case:concept:name"
TIMESTAMP = "time:timestamp"
END  = "time:timestamp"
START = "start_timestamp"
WAITING = "waiting_times"

# `activities` lists every activity in order of first appearance; `links` maps
# a (previous activity, current activity) pair to its collected waiting times.
activities = []
links = defaultdict(lambda: {WAITING: []})

# Order events by case and, within a case, by timestamp. A single multi-column
# sort replaces groupby(...).apply(sort_values), which depends on the grouping
# column being handed to the applied function (deprecated in recent pandas).
# Rows without a case id are dropped, as groupby would have done.
sorted_log = (
    log.dropna(subset=[TRACE])
    .sort_values([TRACE, TIMESTAMP])
    .reset_index(drop=True)
)

seen_activities = set()  # O(1) membership test; `activities` keeps the order
previous_trace = previous_activity = previous_end = None
event_columns = zip(
    sorted_log[TRACE], sorted_log[ACTIVITY], sorted_log[START], sorted_log[END]
)
for trace_id, activity, start_time, end_time in event_columns:

    # Keep track of the activities
    if activity not in seen_activities:
        seen_activities.add(activity)
        activities.append(activity)

    # Waiting time = start of this event minus end of the previous event of the
    # same case; the first event of every case has no predecessor.
    if previous_trace is not None and previous_trace == trace_id:
        waiting_time = (start_time - previous_end).total_seconds()
        links[(previous_activity, activity)][WAITING].append(waiting_time)

    previous_trace, previous_activity, previous_end = trace_id, activity, end_time


In [6]:
# Attach summary statistics (in seconds) to every link, next to its raw
# waiting times. The per-link dict is deliberately NOT bound to the name
# `metrics`: that name is used at module level for the per-metric value lists,
# and a leaking loop variable would overwrite it when this cell is re-run.
for transition, link_stats in links.items():
    waiting_times = link_stats[WAITING]

    link_stats['sum'] = sum(waiting_times)
    link_stats['min'] = min(waiting_times)
    link_stats['max'] = max(waiting_times)
    link_stats['median'] = statistics.median(waiting_times)
    link_stats['mean'] = statistics.mean(waiting_times)
    # statistics.stdev needs at least two samples; fall back to 0 for a single one.
    link_stats['stdev'] = statistics.stdev(waiting_times) if len(waiting_times) > 1 else 0

In [7]:
# Split the (source, target) link keys into two parallel tuples; both stay
# empty when there are no links instead of failing on the unpacking.
source, target = zip(*links.keys()) if links else ((), ())

# dict.fromkeys de-duplicates while keeping first-seen order, so the node
# numbering is identical on every run (a set has no reproducible order).
node_labels = list(dict.fromkeys(source + target))
_node_position = {label: position for position, label in enumerate(node_labels)}

# Node indices of each link's endpoints, in the same order as `links`.
source_nodes = [_node_position[label] for label in source]
target_nodes = [_node_position[label] for label in target]

# Small offset added to every rounded statistic so that no value is exactly 0.
epsilon = 1e-9


def _rounded_statistic(name):
    """Return the rounded value of statistic `name` (plus epsilon) for every link."""
    return [round(link_stats[name]) + epsilon for link_stats in links.values()]


mean_values = _rounded_statistic('mean')
median_values = _rounded_statistic('median')
max_values = _rounded_statistic('max')
min_values = _rounded_statistic('min')
sum_values = _rounded_statistic('sum')
stdev_values = _rounded_statistic('stdev')

# Statistics that can be selected in the dashboard, keyed by display name.
metrics = {"mean": mean_values, "median": median_values, "max":max_values, "min": min_values}

In [18]:
def seconds_to_dhms_string(seconds):
    """Format a duration in seconds as a compact string such as ``"1d 2h 5s"``.

    Fractional seconds are truncated. Units whose value is zero are omitted,
    and a duration shorter than one second is rendered as ``"0s"``. Negative
    durations are formatted by magnitude and prefixed with ``"-"``.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted duration, with no leading, trailing or doubled spaces.
    """
    sign = "-" if seconds < 0 else ""
    # Work on the whole-second magnitude so every unit below is an exact int.
    total_seconds = int(abs(seconds))

    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, secs = divmod(remainder, 60)

    units = ((days, "d"), (hours, "h"), (minutes, "m"), (secs, "s"))
    parts = [f"{amount}{suffix}" for amount, suffix in units if amount]

    if not parts:
        return "0s"
    return sign + " ".join(parts)

def select_custom_tickvals(data, num_ticks=5):
    """Return evenly spaced tick values spanning the range of `data`.

    Args:
        data: Sequence of numbers to cover.
        num_ticks: Number of ticks to produce (default 5).

    Returns:
        A list of floats running from min(data) to max(data). The list is
        empty for empty `data`, and holds the single value min(data) when all
        values are equal or fewer than two ticks are requested.
    """
    if len(data) == 0:
        return []

    min_val = min(data)
    max_val = max(data)

    # A zero-width range (or a single tick) cannot be divided into steps.
    if num_ticks < 2 or max_val == min_val:
        return [float(min_val)]

    # linspace includes both endpoints and always yields exactly num_ticks
    # values, unlike arange with a float step.
    return np.linspace(min_val, max_val, num_ticks).tolist()

def get_colors(values, global_scale):
    """Map every entry of `values` to a colour string via `get_color`.

    The colour range spans min(values)..max(values). When `global_scale` is
    truthy it instead spans the smallest entry of ``metrics['min']`` to the
    largest entry of ``metrics['max']`` (module-level `metrics`).
    """
    lower_bound, upper_bound = min(values), max(values)

    if global_scale:
        lower_bound, upper_bound = min(metrics['min']), max(metrics['max'])

    colors = []
    for value in values:
        colors.append(get_color(value, lower_bound, upper_bound))
    return colors

def get_color(value, min_value, max_value):
    """Return an HSL colour string for `value` on a green-to-red scale.

    `min_value` maps to hue 120 (green) and `max_value` to hue 0 (red).
    Values outside ``[min_value, max_value]`` are clamped to the nearest end
    of the scale so the hue never leaves the 0..120 range. A zero-width range
    maps to the red end.

    Args:
        value: Number to colour.
        min_value: Lower end of the scale.
        max_value: Upper end of the scale.

    Returns:
        A string of the form ``'hsl(<hue>, 50%, 50%)'``.
    """
    if max_value == min_value:
        normalized_value = 1
    else:
        normalized_value = (value - min_value) / (max_value - min_value)
        # Clamp: the scale bounds may not enclose every value passed in.
        normalized_value = min(1, max(0, normalized_value))

    hue = 120 - int(120 * normalized_value)  # Hue from 120 (green) to 0 (red)
    saturation = 50  # Reduced saturation for subdued colors
    lightness = 50   # Medium lightness for a pastel effect
    return f'hsl({hue}, {saturation}%, {lightness}%)'

def generate_scatter(node, metrics, color_scale_global):
    """Build a scatter figure (as a dict) of one link's waiting times.

    Args:
        node: The hovered link; only checked against None here.
        metrics: Dict of that link holding its waiting times under `WAITING`.
        color_scale_global: Truthy to colour the markers on the global scale
            (forwarded to `get_colors`).

    Returns:
        A figure dict with 'data' and 'layout'. When `node` or `metrics` is
        None, only a layout carrying a hint title is returned.
    """
    if node is None or metrics is None:
        return {'layout': go.Layout(title='Hover over Link for information')}

    y_axis = metrics[WAITING]
    x_axis = list(range(len(y_axis)))  # position of each observation

    # Ticks are placed in seconds but labelled as d/h/m/s strings.
    y_ticks = select_custom_tickvals(y_axis)

    scatter_trace = go.Scatter(
        x=x_axis,
        y=y_axis,
        mode='markers',
        marker=dict(color=get_colors(y_axis, color_scale_global)),
    )
    layout = go.Layout(
        xaxis=dict(title='Observation'),
        yaxis=dict(
            title='Waiting time',
            tickvals=y_ticks,
            ticktext=[seconds_to_dhms_string(s) for s in y_ticks],
        ),
    )
    return {'data': [scatter_trace], 'layout': layout}

def generate_histogramm(node, color_scale_global):
    """Build a histogram figure of the waiting times recorded for link `node`.

    Args:
        node: Key into the module-level `links` mapping.
        color_scale_global: Truthy to colour on the global scale (forwarded to
            `get_colors`).

    Returns:
        A `go.Figure` with a single histogram trace (at most 20 bins).
    """
    # Waiting times truncated to whole seconds.
    data = [int(v) for v in links[node][WAITING]]

    # NOTE(review): the colour list has one entry per sample, while a histogram
    # draws one bar per bin -- presumably the colours do not line up with the
    # bars; confirm how plotly applies an array-valued marker.color here.
    trace = go.Histogram(
        x=data,
        nbinsx=20,
        opacity=0.7,
        marker=dict(color=get_colors(data, color_scale_global)),
    )
    layout = go.Layout(
        xaxis=dict(title='Value'),
        yaxis=dict(title='Frequency'),
        bargap=0.05
    )
    return go.Figure(data=[trace], layout=layout)
   

def generate_sankey(metric, color_scale_global=False):
    """Build the Sankey figure whose link widths follow the chosen statistic.

    `metric` selects one of the value lists in the module-level `metrics`
    dict. Those values set the link widths, the link colours (through
    `get_colors`) and the hover text (through `seconds_to_dhms_string`).
    """
    link_values = metrics[metric]

    node_spec = dict(
        pad=50,
        thickness=10,
        line=dict(width=0),
        label=node_labels,
    )
    link_spec = dict(
        source=source_nodes,
        target=target_nodes,
        value=link_values,
        color=get_colors(link_values, color_scale_global),
        # Pre-formatted durations shown in the hover box instead of raw seconds.
        customdata=[seconds_to_dhms_string(v) for v in link_values],
        hovertemplate=metric + ": %{customdata}",
    )

    sankey_trace = go.Sankey(
        arrangement="snap",
        valuesuffix="s",
        node=node_spec,
        link=link_spec,
    )
    return go.Figure(sankey_trace)

def get_sorce_target_from_hover_data(data):
    """Translate hover data into the (source, target) pair of the hovered link.

    Only the first entry of ``data['points']`` is inspected. Returns None when
    `data` is None or when that entry carries a 'group' key (presumably a node
    rather than a link -- verify against the hover payload). Otherwise the
    entry's 'index' selects the pair from the module-level `source` and
    `target` tuples.
    """
    if data is None:
        return None

    first_point = data['points'][0]
    if 'group' in first_point:
        return None

    link_position = first_point['index']
    return (source[link_position], target[link_position])
  


In [19]:
app = dash.Dash(__name__)

# Sankey diagram, taking roughly two thirds of the first row.
sankey_div = html.Div([dcc.Graph(id='sankey-plot')], style={'width': '66%', 'padding': '20px'})

# Filter div with 33% width
filter_div = html.Div(
    children=[
        # Statistic that drives the Sankey link widths.
        dcc.Dropdown(
            id='metric-dropdown',
            options=[{'label': key, 'value': key} for key in metrics.keys()],
            value='median',
        ),
        # The value is an empty list while unchecked and [True] while checked.
        dcc.Checklist(
            id='global-color-scale',
            options=[{'label': 'Use global color scale', 'value': True}],
            value=[]
        ),
        html.Div([html.Plaintext(id='hover-details', children="Hover over a Sankey element to see details:")])
    ],
    style={'width': '33%', 'padding': '20px'}
)

# Details div with children having even spacing horizontally.
# Style keys are camelCase ('flexWrap', not 'flex-wrap'), as Dash passes the
# dict to React as an inline style object.
details_div = html.Div([
    html.Div([dcc.Graph(id='scatter-plot', figure={})], style={'flex': 1}),
    html.Div([dcc.Graph(id='hist-plot', figure={})], style={'flex': 1}),
], style={'display': 'flex', 'flexWrap': 'wrap'})

# Overall layout
app.layout = html.Div(
    style={'backgroundColor': 'white', 'zoom': '50%'},
    children=[
        html.Div(style={'display': 'flex'}, children=[sankey_div, filter_div]),  # First row
        details_div,  # Second row
    ]
)

@callback(
    Output('hover-details', 'children'),
    Output('scatter-plot', 'figure'),
    Output('hist-plot', 'figure'),
    Input('sankey-plot', 'hoverData'),
    Input('global-color-scale', 'value')
)
def update_details(hover_data, color_scale_global):
    """Refresh the details text, scatter plot and histogram for the hovered link.

    Returns an empty text and two empty figures when the hover data does not
    resolve to a link.
    """
    node = get_sorce_target_from_hover_data(hover_data)
    if node is None:
        return "", {}, {}

    link_stats = links[node]

    # One "name : duration" line per statistic; the raw samples are skipped.
    summary_lines = []
    for stat_name, stat_value in link_stats.items():
        if stat_name == "waiting_times":
            continue
        summary_lines.append(f"{stat_name} : {seconds_to_dhms_string(stat_value)}")
    summary_text = "\n".join(summary_lines)

    scatter_figure = generate_scatter(node, link_stats, color_scale_global)
    histogram_figure = generate_histogramm(node, color_scale_global)
    return summary_text, scatter_figure, histogram_figure

# Filter Sankey
@callback(
    Output('sankey-plot', 'figure'),
    Input('metric-dropdown', 'value'),
    Input('global-color-scale', 'value')
)
def update_sankey(metric, color_scale_global):
    """Redraw the Sankey diagram for the selected statistic and colour scale.

    When the dropdown holds no known statistic (for example after it has been
    cleared, which yields None), the current figure is left untouched instead
    of failing on the lookup.
    """
    if metric not in metrics:
        return dash.no_update
    return generate_sankey(metric, color_scale_global)
    

# Run the app in Jupyter notebook inline mode.
# `jupyter_mode` is the keyword Dash's own `run` accepts for this; `mode=` is
# the keyword of the separate JupyterDash class, and `run_server` is the
# legacy alias of `run`.
if __name__ == '__main__':
    app.run(jupyter_mode="inline")