In [18]:
import pandas as pd
import pm4py

### Importing the log

In [None]:
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.conversion.log import converter
# Location of the XES event log, relative to the notebook's working directory
file_path = "./Road_Traffic_Fine_Management_Process.xes"
# Parse the XES file with pm4py's XES importer
xes_event_log = xes_importer.apply(file_path)

# Convert the imported log to a DataFrame; later cells read its
# 'case:concept:name', 'concept:name' and 'amount' columns
event_log = converter.apply(xes_event_log, variant=converter.Variants.TO_DATA_FRAME)

# Show the converted log as the cell output
event_log

In [None]:
# Ask pm4py for the start and end activities of the log and print both results
start_activities = pm4py.get_start_activities(event_log)
end_activities = pm4py.get_end_activities(event_log)
print("Start activities: {}\nEnd activities: {}".format(start_activities, end_activities))

### Process Discovery
Petri Net (using two different algorithms)

In [None]:
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.visualization.petri_net import visualizer as pn_visualizer


def _render_petri_net(net, initial_marking, final_marking):
    """Render a Petri net on a white background, display it and return the graph object."""
    gviz = pn_visualizer.apply(net, initial_marking, final_marking)
    gviz.graph_attr['bgcolor'] = 'white'
    pn_visualizer.view(gviz)
    return gviz


# Alpha Miner
alpha_net, alpha_initial_marking, alpha_final_marking = alpha_miner.apply(event_log)
alpha_graphviz = _render_petri_net(alpha_net, alpha_initial_marking, alpha_final_marking)
# To keep a copy on disk: pn_visualizer.save(alpha_graphviz, "pn_alpha_miner.png")

# Inductive Miner (ind_net is reused by the decision-point analysis further down)
ind_net, ind_initial_marking, ind_final_marking = inductive_miner.apply(event_log)
ind_graphviz = _render_petri_net(ind_net, ind_initial_marking, ind_final_marking)
# To keep a copy on disk: pn_visualizer.save(ind_graphviz, "pn_inductive_miner.png")

Other possible process models

In [None]:
# Process Tree: discovered with pm4py's inductive process-tree discovery, then displayed
process_tree = pm4py.discover_process_tree_inductive(event_log)
pm4py.view_process_tree(process_tree)

# BPMN Model: converted from the process tree discovered above
bpmn_model = pm4py.convert_to_bpmn(process_tree)
pm4py.view_bpmn(bpmn_model)

# Process Map (Directly Follows Graph)
# NOTE: this rebinds start_activities / end_activities, which an earlier cell also assigned
dfg, start_activities, end_activities = pm4py.discover_dfg(event_log)
pm4py.view_dfg(dfg, start_activities, end_activities)

# Heuristic Miner: discover a heuristics net and display it
heu = pm4py.discover_heuristics_net(event_log)
pm4py.view_heuristics_net(heu)

### Decision points

In [4]:
# For each decision point (place with at least 2 outgoing arcs), gets the labels of the target transitions
# Invisible transitions (in case of the inductive miner) are not taken into account for now: decision nodes with invisible transitions are simply not considered in the analysis

# Maps each decision point (place) to the labels of the visible transitions leaving it
decision_points_and_trans = dict()

def find_decision_point_transitions(dec_point):
    """Register the labelled transitions that directly follow a candidate decision point.

    The labels of the targets of ``dec_point.out_arcs`` are collected, skipping
    targets whose ``label`` is None (invisible transitions). If at least two
    labels remain, they are stored in ``decision_points_and_trans[dec_point]``;
    otherwise any existing entry for ``dec_point`` is removed, because a place
    with fewer than two visible alternatives is not analysed as a decision point.

    The function does not require the caller to create the dictionary entry
    beforehand, and calling it again for the same place replaces the entry
    instead of appending duplicate labels.

    Parameters
    ----------
    dec_point : hashable
        Place-like object exposing ``out_arcs``; each arc must expose
        ``target.label``.
    """
    visible_labels = [
        arc.target.label
        for arc in dec_point.out_arcs
        if arc.target.label is not None
    ]

    if len(visible_labels) < 2:
        # Fewer than two visible options: drop the entry (if the caller created one)
        decision_points_and_trans.pop(dec_point, None)
        return

    decision_points_and_trans[dec_point] = visible_labels

# A place is a candidate decision point when at least two arcs leave it
candidate_places = [p for p in ind_net.places if len(p.out_arcs) >= 2]
for place in candidate_places:
    decision_points_and_trans[place] = []
    find_decision_point_transitions(place)

decision_points_and_trans

{p_7: ['Appeal to Judge',
  'Send Fine',
  'Insert Date Appeal to Prefecture',
  'Payment']}

### Observation instances
Now that for every decision point we have the list of observed transitions, we need to build the observation instances.
The observation instances of a decision point are the pairs (x, t), where x is the vector of attribute values observed before the transition fires and t is the observed transition.
So, every time we see a transition t in the event log, we retrieve the values x of the attributes before the transition happens, and we add the instance to the observation instances for that decision point.

In [None]:
import math
# Scratch store: transition label -> list of 'amount' values collected for that transition
amount = dict()
# Events grouped by case identifier, so the rows of each case can be looked up by key
gb = event_log.groupby(['case:concept:name'])

# ONLY 1 DECISION POINT FOR NOW, NEEDS TO BE GENERALIZED (MULTI-DIMENSIONAL MATRIX?)
def create_dataframe_for_dp(dp, log=None, transitions=None):
    """Build the observation instances of one decision point.

    For every event whose activity is one of the decision point's transitions,
    one instance is produced holding the most recent non-missing 'amount'
    recorded in the same case up to and including that event. Events whose
    case has no 'amount' up to that point are skipped.

    The look-back is done with a per-case forward fill, so it never reads a
    value from a different case and does not depend on the DataFrame index
    being a 0..n-1 range.

    Parameters
    ----------
    dp : object
        The decision point. Stored in the 'decision point' column and, when
        ``transitions`` is not given, used as key into the notebook-level
        ``decision_points_and_trans``.
    log : pandas.DataFrame, optional
        Event log with the columns 'case:concept:name', 'concept:name' and
        'amount'. Defaults to the notebook-level ``event_log``.
    transitions : iterable of str, optional
        Activity labels to collect. Defaults to
        ``decision_points_and_trans[dp]``.

    Returns
    -------
    pandas.DataFrame
        Columns ['amount', 'decision point', 'transition']. Rows are grouped
        per transition (in the order of ``transitions``); inside a group they
        are ordered by case identifier and then by position in the log. Each
        group keeps its own 0-based index.
    """
    uses_notebook_log = log is None
    if uses_notebook_log:
        log = event_log
    if transitions is None:
        transitions = decision_points_and_trans[dp]

    case_col = 'case:concept:name'
    activity_col = 'concept:name'

    # Last known amount at every event, carried forward inside its own case only
    carried_amount = log.groupby(case_col)['amount'].ffill()
    observed = (
        log[[case_col, activity_col]]
        .assign(amount=carried_amount)
        .dropna(subset=['amount'])
        # Stable sort: cases in sorted order, events of a case in log order
        .sort_values(case_col, kind='mergesort')
    )

    trans_dataframes = []
    for trans in transitions:
        values = observed.loc[observed[activity_col] == trans, 'amount'].tolist()
        if uses_notebook_log:
            # Keep the notebook-level scratch dict filled, as this cell always did
            amount[trans] = values
        df_trans = pd.DataFrame(values, columns=['amount'])
        df_trans['decision point'] = dp
        df_trans['transition'] = trans
        trans_dataframes.append(df_trans)

    if not trans_dataframes:
        # pd.concat refuses an empty list; return an empty table with the usual columns
        return pd.DataFrame(columns=['amount', 'decision point', 'transition'])

    return pd.concat(trans_dataframes)

# For each decision point found, it creates a dataframe containing the attribute values for each event of interest, according to the observed transitions
# One observation-instance table per decision point found earlier
dp_dataframes = [create_dataframe_for_dp(dp) for dp in decision_points_and_trans.keys()]

# Stack them into a single table holding [attribute values, decision point, observed transition]
# for every event of interest
final_df = pd.concat(dp_dataframes)

final_df

In [None]:
# Show the combined observation-instance table again
final_df

### Decision Tree

In [56]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split

# Single input feature for now; the target is the transition observed at the decision point
features = ['amount']
classes = final_df['transition'].unique()

# Fixed random_state so the split is the same on every run
X_train, X_test, Y_train, Y_test = train_test_split(
    final_df['amount'],
    final_df['transition'],
    random_state=0,
)

In [None]:
# With a single attribute the training data is 1-D, so turn it into column arrays first
X_train_np = X_train.to_numpy().reshape(-1, 1)
Y_train_np = Y_train.to_numpy().reshape(-1, 1)

# Shallow tree (depth 2) with a fixed seed
tree_clf = DecisionTreeClassifier(random_state=0, max_depth=2)
tree_clf.fit(X_train_np, Y_train_np)

In [None]:
from sklearn import tree
import matplotlib.pyplot as plt

# Draw the fitted tree on an explicit Axes and save the figure as 'tree.png'
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (4,4), dpi=300)
tree.plot_tree(tree_clf,
               feature_names = features,
               # plot_tree pairs class names with the classifier's classes in ascending
               # order, so take them from tree_clf.classes_ rather than from the order
               # in which the labels first appear in the data
               class_names = [str(label) for label in tree_clf.classes_],
               filled = True,
               ax = axes)

fig.savefig('tree.png')