In [7]:
import ast
from event_loop.preprocessing.dataframe import *

import pandas as pd
from const import *

from sklearn.metrics import classification_report

# Setup
Load the data and determine the parameter settings.

In [8]:
# Process setting to evaluate: "HR" (default) or "PTP" (Purchase To Pay Stream Pipeline).
SETTING: str = "HR"

In [9]:


# Parse the JSON-like "MessageAttributes" column into Python objects when reading the CSVs.
converters = {"MessageAttributes": ast.literal_eval}

if SETTING == "HR":
    # path to training data file
    train_path = HR_TRAIN_PATH
    # path to interleaved data file for streaming
    il_path = HR_IL_PATH
    # path to ground truth data
    gt_path = HR_GT_PATH
    # path to BPMN reference model
    bpmn_path = HR_BPMN_PATH
    # data attributes for the HR process
    attributes = HR_ATTRIBUTES
    # case ID indicator attribute
    case_id_primary = HR_CASE_ID_PRIMARY

elif SETTING == "PTP":
    train_path = PTP_TRAIN_PATH
    il_path = PTP_IL_PATH
    gt_path = PTP_GT_PATH
    bpmn_path = PTP_BPMN_PATH
    attributes = PTP_ATTRIBUTES
    case_id_primary = PTP_CASE_ID_PRIMARY

else:
    # Fail fast: otherwise the variables above stay undefined and later cells
    # break with a confusing NameError.
    raise ValueError(f'Unknown SETTING {SETTING!r}; expected "HR" or "PTP".')

In [10]:
# Load the training and interleaved (streaming) logs, parsing their JSON-like
# "MessageAttributes" column via `converters`, plus the ground-truth table.
df_train_in, df_il_in = (
    pd.read_csv(path, converters=converters) for path in (train_path, il_path)
)
df_gt = pd.read_csv(gt_path)

# Warm Up Phase
Preprocess the training data and fit the models.

In [11]:
# Preprocess the training data: pre_process -> assign_sequence_number -> mark_start_end,
# applied in that order.
df_train = mark_start_end(assign_sequence_number(pre_process(df_train_in)))

In [12]:
# According to Hadad et al., trace #128 of the PTP training data contains erroneous
# data, so it is excluded only in the PTP setting. Other settings keep every trace
# (isin([]) matches nothing, so the boolean mask still yields a new, complete frame).
ERRONEOUS_TRAIN_TRACES = {"PTP": [128]}
df_train_flt = df_train[~df_train["SequenceNumber"].isin(ERRONEOUS_TRAIN_TRACES.get(SETTING, []))]

In [13]:
from event_loop.event_activity import EventActivityAssignment
from event_loop.activity_type import ActivityTypeClassifier
from event_loop.activity_boundaries import ActivityBoundariesClassifier

# initialize and fit models

# Sets each streamed event's `activity_action` (used in the event loop via classify_event).
# Trained on the trace-filtered training data.
# NOTE(review): meaning of the second argument (None) is not visible here — confirm.
activity_boundaries_classifier = ActivityBoundariesClassifier(df_train_flt, None)

# Sets `activity_type` on each emitted stack (used via classify_stack).
# NOTE(review): trained on the unfiltered df_train, unlike the boundaries classifier
# above — confirm whether the erroneous trace should be excluded here as well.
activity_type_classifier = ActivityTypeClassifier(df_train)

# Matches "between" events to open stacks (attribute checks / pattern matching in the event loop).
# NOTE(review): meaning of the positional argument 10 is not visible here — confirm and name it.
event_activity_model = EventActivityAssignment(df_train,10, attributes)

# Streaming Phase

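Main loop. It takes the raw R1 data as input and, event by event, applies filtering, activity-action classification (start / between / end), assignment of events to activity stacks, and activity-type classification of the emitted stacks.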

In [14]:
# One dict per row of the interleaved stream; replayed event by event in the loop below.
records = df_il_in.to_dict(orient="records")

## Event Loop

In [15]:
from event_loop.event import Event
from event_loop.stack import Stack
from event_loop.event_activity import search_stack_for_request_frame, search_stream_index

import time
from event_loop.preprocessing.event import keep_event

# Parameters for the Event Loop
# Idle event-loop iterations after which a stack whose last event is an uncertain
# "NoAction" / "Activity End" event is emitted.
EVENT_LOOP_CUTOFF_NO_ACTION = 3
EVENT_LOOP_CUTOFF_END_EVENT = 3
# NOTE(review): the three parameters below are not referenced in this cell.
ENTROPY_THRESHOLD = 0.4 #0.5
MAX_WINDOW_SIZE = 10
VERBOSE = False

event_buffer: list[Event] = []        # every kept event, in stream order
attribute_buffer: list[dict] = []
stacks: list[Stack] = []              # currently open stacks
stacks_out: list[Stack] = []          # emitted (classified) stacks, in emission order
unassigned_events: list[Event] = []   # between/end events that arrived while no stack was open
event_loop_index = 0

# variables for logging processing times (seconds) and buffer sizes
processing_times = []
processing_times_filter = []
buffer_sizes = []


def _emit_idle_uncertain_stacks(open_stacks, emitted, current_index, action, cutoff):
    """Emit open stacks that ended on an uncertain `action` event and have been idle too long.

    A stack is emitted when its last event has a falsy ``confidence``, has
    ``activity_action == action`` and was added more than ``cutoff`` event-loop
    iterations before ``current_index``. Emitted stacks are classified with
    ``activity_type_classifier`` and appended to ``emitted`` in their current
    order; ``open_stacks`` is updated in place to keep only the remaining ones.

    Building a new list (instead of ``pop`` inside ``enumerate``) guarantees
    that no stack is skipped when its neighbour is removed.
    """
    still_open = []
    for open_stack in open_stacks:
        last_event = open_stack[-1]
        is_idle = current_index - last_event.event_loop_index > cutoff
        if not last_event.confidence and last_event.activity_action == action and is_idle:
            activity_type_classifier.classify_stack(open_stack)
            emitted.append(open_stack)
        else:
            still_open.append(open_stack)
    open_stacks[:] = still_open


# start of event loop
for event_data in records:
    # perf_counter is monotonic and high-resolution, unlike time.time()
    start_time = time.perf_counter()

    # number of events currently held in open stacks
    buffer_sizes.append(sum(len(stack) for stack in stacks))

    # filter raw event stream; skip filtered events
    if not keep_event(event_data):
        processing_times_filter.append(time.perf_counter() - start_time)
        continue

    # count every kept event for event loop index
    event_loop_index += 1

    # Generate Network Event from Raw Network Package
    event = Event(event_data, event_loop_index, event_buffer, SETTING)
    event_buffer.append(event)

    # Determine Activity Action (start, between, end) of the Event
    activity_boundaries_classifier.classify_event(event)
    activity_action = event.activity_action

    # Activity-Event Assignment
    if activity_action == "Activity Start":
        # Start events initialize a new stack
        stacks.append(Stack(SETTING, event))

    elif activity_action == "NoAction":
        # Between events need to be matched to a currently open stack
        if not stacks:
            # Nothing to attach to: record the event instead of raising IndexError.
            unassigned_events.append(event)

        elif len(stacks) == 1:
            # Only one stack open - no matching necessary
            stacks[0].append_event(event)

        elif event.origin_request_frame:
            # Assign to the stack containing the event's origin request frame
            stack_index = search_stack_for_request_frame(event.origin_request_frame, stacks)
            stacks[stack_index].append_event(event)

        else:
            # stacks whose attributes conflict with the event's attributes
            exclude_indices = event_activity_model.exclude_stacks_by_attribute(stacks, event)

            # check for potential match with event attributes (-1 = no match yet)
            stack_index: int = event_activity_model.check_stack_attributes(stacks, event, exclude_indices)

            if stack_index == -1:
                # assign to stack via pattern matching
                # NOTE(review): meaning of the argument 4 is not visible here — confirm.
                stack_index = event_activity_model.assign_to_sequence(event, stacks, 4, exclude_indices)

            if stack_index == -1:
                # fallback to stream index
                stack_index = search_stream_index(stacks, event, exclude_indices)

            if stack_index == -1:
                # fallback: first confident stack not in exclude indices
                stack_index = next(
                    (
                        candidate_index
                        for candidate_index, candidate in enumerate(stacks)
                        if candidate_index not in exclude_indices and candidate.confidence
                    ),
                    -1,
                )

            # If every fallback failed, index -1 appends to the most recently opened stack.
            stacks[stack_index].append_event(event)

    elif activity_action == "Activity End":
        if not stacks:
            # Nothing to close: record the event instead of raising IndexError.
            unassigned_events.append(event)
        else:
            # end events contain a request frame. Add to stack containing the request frame
            stack_index = search_stack_for_request_frame(event.origin_request_frame, stacks)
            stacks[stack_index].append_event(event)

            # only emit the stack if the end-event classification was confident
            if event.confidence:
                if len(stacks) > 1:
                    stack = stacks.pop(stack_index)
                    activity_type_classifier.classify_stack(stack)
                    stacks_out.append(stack)
                else:
                    # Keep the only open stack; the idle check below emits it once no
                    # event has been added to it for EVENT_LOOP_CUTOFF_END_EVENT loops.
                    event.confidence = False

    # Emit idle stacks ending in an uncertain "NoAction" event (these could be end events) ...
    _emit_idle_uncertain_stacks(stacks, stacks_out, event_loop_index, "NoAction", EVENT_LOOP_CUTOFF_NO_ACTION)
    # ... then those ending in an uncertain "Activity End" event (these could be between events).
    _emit_idle_uncertain_stacks(stacks, stacks_out, event_loop_index, "Activity End", EVENT_LOOP_CUTOFF_END_EVENT)

    processing_times.append(time.perf_counter() - start_time)

# pop all stacks that are still left at the end of the event loop
for stack in stacks:
    activity_type_classifier.classify_stack(stack)
    stacks_out.append(stack)


## Evaluation

In [16]:
# Evaluation frame for the (filtered) interleaved test data, plus the ground-truth
# activity-action labels used as y_true in the activity-boundaries report.
(df_test, test_labels) = create_eval_dataframe(df_gt, df_il_in)

In [17]:
# Predicted activity type of every emitted stack, in emission order.
stack_predictions = list(map(lambda emitted: emitted.activity_type, stacks_out))

In [18]:
# Predicted boundaries of every emitted stack: frame number of its first and last event.
start = [stack[0].frame_number for stack in stacks_out]
end = [stack[-1].frame_number for stack in stacks_out]

res_df = pd.DataFrame({"start_pred": start, "end_pred": end})

# Left-join the predictions onto the ground truth by start frame; ground-truth
# activities without a matching predicted start get -1 in both prediction columns.
eval_df = (
    df_gt[["start", "actual_end"]]
    .merge(res_df, how="left", left_on="start", right_on="start_pred")
    .fillna(-1)
    .astype(int)
)
eval_df["end_pred_true"] = eval_df["actual_end"] == eval_df["end_pred"]
eval_df["start_pred_true"] = eval_df["start"] == eval_df["start_pred"]
# True only when BOTH boundaries are correct (`==` would also be True when both are wrong).
eval_df["start_end_true"] = eval_df["start_pred_true"] & eval_df["end_pred_true"]

display(eval_df)
print(f"Overall matching accuracy: {(eval_df['start_pred_true'].mean() + eval_df['end_pred_true'].mean()) / 2}")

Unnamed: 0,start,actual_end,start_pred,end_pred,end_pred_true,start_pred_true,start_end_true
0,17,325,17,325,True,True,True
1,356,1192,356,1192,True,True,True
2,1212,1520,1212,1520,True,True,True
3,1582,2336,1582,2336,True,True,True
4,2354,2664,2354,2664,True,True,True
5,2708,4461,2708,4461,True,True,True
6,3057,4871,3057,4871,True,True,True
7,4467,4881,4467,4881,True,True,True
8,4939,6164,4939,6164,True,True,True
9,5606,6859,5606,6859,True,True,True


Overall matching accuracy: 1.0


In [19]:
import math


def unique_no_nan(values):
    """Return the distinct entries of `values` in order of first appearance.

    Missing values (None / NaN) and other falsy entries (0, "", False) are dropped.
    ``filter(None, ...)`` alone keeps NaN because ``bool(float("nan"))`` is True,
    so NaN is removed explicitly.
    """
    return [
        value
        for value in pd.unique(values)
        if value and not (isinstance(value, float) and math.isnan(value))
    ]


def first_unique(values):
    """Return the first entry kept by `unique_no_nan` (IndexError if none remain)."""
    return unique_no_nan(values)[0]


def compare_values(x, y):
    """Compare the first elements of two list-like aggregates as integers.

    Used to compare a predicted per-stack id list with the ground-truth ``bp_id``
    list, whose elements may differ in type (e.g. float vs. int) after merging.
    """
    return int(x[0]) == int(y[0])

if SETTING == "PTP":
    # Create dataframe with mapping of frame numbers to event stacks
    # (one row per event of every emitted stack, in emission order).
    frame_numbers = [event.frame_number for idx, stack in enumerate(stacks_out) for event in stack]
    stack_numbers = [idx for idx, stack in enumerate(stacks_out) for event in stack]
    case_id = [stack.case_id["id"] if stack.case_id else -1 for idx, stack in enumerate(stacks_out) for event in stack]
    sniff_time = [event.sniff_time for idx, stack in enumerate(stacks_out) for event in stack]
    sale_order_ids = [event.attributes["sale_order_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    sale_order_line_ids = [event.attributes["sale_order_line_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    purchase_requisition_ids = [event.attributes["purchase_requisition_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    purchase_requisition_line_ids = [event.attributes["purchase_requisition_line_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    purchase_order_ids = [event.attributes["purchase_order_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    sale_order_line_id_case_id = [stack.case_id["sale_order_line_id"] if stack.case_id else -1 for idx, stack in enumerate(stacks_out) for event in stack]

    df_frame_numbers = pd.DataFrame(data={
        "frame.number": frame_numbers,
        "sniff_time": sniff_time,
        "stack_idx": stack_numbers,
        "sale_order_id": sale_order_ids,
        "sale_order_line_id": sale_order_line_ids,
        "sale_order_line_id_case_id": sale_order_line_id_case_id,
        "purchase_requisition_id": purchase_requisition_ids,
        "purchase_requisition_line_id": purchase_requisition_line_ids,
        "purchase_order_id": purchase_order_ids,
        "case_id": case_id,
    })

    # Merge Activity Name from ground truth frame to event sequences for evaluation
    merged_df = df_frame_numbers.merge(df_gt[["activity_name", "start", "bp_id"]], how="left", left_on="frame.number", right_on="start").drop(columns="start")
    # Forward-fill the ground-truth labels within each stack
    merged_df[["activity_name", "bp_id"]] = merged_df.groupby("stack_idx")[["activity_name", "bp_id"]].ffill()

    # Merge with filtered interleaved test data
    merged_df = df_test.merge(merged_df, on="frame.number")

    # One row per stack (index: stack_idx)
    res = merged_df.groupby("stack_idx").agg(
        sale_order_id=("sale_order_id", unique_no_nan),
        sale_order_line_id=("sale_order_line_id", unique_no_nan),
        sale_order_line_id_case_id=("sale_order_line_id_case_id", unique_no_nan),
        purchase_requisition_id=("purchase_requisition_id", unique_no_nan),
        purchase_requisition_line_id=("purchase_requisition_line_id", unique_no_nan),
        purchase_order_id=("purchase_order_id", unique_no_nan),
        case_id=("case_id", first_unique),
        bp_id=("bp_id", unique_no_nan),
        frame_number_min=("frame.number", "min"),
        frame_number_max=("frame.number", "max"),
        sniff_time_min=("sniff_time_x", "min"),
        sniff_time_max=("sniff_time_x", "max"),
        activity_name=("activity_name", lambda x: x.head(1)),
    )

    # Attach the boundary-evaluation flags by each stack's predicted start frame
    # (eval_df["start_pred"]). Matching by row position would mis-align rows
    # whenever the emission order differs from the ground-truth order.
    res["start_pred"] = [stacks_out[idx][0].frame_number for idx in res.index]
    boundary_flags = eval_df.set_index("start_pred")[["start_pred_true", "end_pred_true", "start_end_true"]]
    res = res.join(boundary_flags, on="start_pred").drop(columns="start_pred")
    # Look predictions up by stack index so they stay aligned even if a stack
    # has no frames left after the merge with df_test.
    res["stack_prediction"] = [stack_predictions[idx] for idx in res.index]
    # Compare the predicted 'sale_order_line_id_case_id' with the ground-truth 'bp_id'
    res["bp_true"] = res.apply(lambda x: compare_values(x["sale_order_line_id_case_id"], x["bp_id"]), axis=1)
    res["activity_true"] = res["activity_name"] == res["stack_prediction"]

elif SETTING == "HR":
    # Create dataframe with mapping of frame numbers to event stacks
    # (one row per event of every emitted stack, in emission order).
    frame_numbers = [event.frame_number for idx, stack in enumerate(stacks_out) for event in stack]
    stack_numbers = [idx for idx, stack in enumerate(stacks_out) for event in stack]
    applicant_ids = [event.attributes["applicant_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    activity_ids = [event.attributes["activity_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    mail_ids = [event.attributes["mail_id"] for idx, stack in enumerate(stacks_out) for event in stack]
    sniff_time = [event.sniff_time for idx, stack in enumerate(stacks_out) for event in stack]
    case_id = [stack.case_id["id"] if stack.case_id else -1 for idx, stack in enumerate(stacks_out) for event in stack]

    df_frame_numbers = pd.DataFrame(data={
        "frame.number": frame_numbers,
        "sniff_time": sniff_time,
        "stack_idx": stack_numbers,
        "applicant_id": applicant_ids,
        "activity_id": activity_ids,
        "mail_id": mail_ids,
        "case_id": case_id,
    })

    # Merge Activity Name from ground truth frame to event sequences for evaluation
    merged_df = df_frame_numbers.merge(df_gt[["activity_name", "start", "bp_id"]], how="left", left_on="frame.number", right_on="start").drop(columns="start")
    # Forward-fill the ground-truth labels within each stack
    merged_df[["activity_name", "bp_id"]] = merged_df.groupby("stack_idx")[["activity_name", "bp_id"]].ffill()

    # Merge with filtered interleaved test data
    merged_df = df_test.merge(merged_df, on="frame.number")

    # One row per stack (index: stack_idx)
    res = merged_df.groupby("stack_idx").agg(
        applicant_id=("applicant_id", unique_no_nan),
        activity_id=("activity_id", unique_no_nan),
        mail_id=("mail_id", unique_no_nan),
        case_id=("case_id", first_unique),
        bp_id=("bp_id", unique_no_nan),
        frame_number_min=("frame.number", "min"),
        frame_number_max=("frame.number", "max"),
        sniff_time_min=("sniff_time_x", "min"),
        sniff_time_max=("sniff_time_x", "max"),
        activity_name=("activity_name", lambda x: x.head(1)),
    )
    # Look predictions up by stack index so they stay aligned with `res`
    res["stack_prediction"] = [stack_predictions[idx] for idx in res.index]
    # Compare the stack's 'applicant_id' with the ground-truth 'bp_id'
    res["bp_true"] = res.apply(lambda x: compare_values(x["applicant_id"], x["bp_id"]), axis=1)
    res["activity_true"] = res["activity_name"] == res["stack_prediction"]

### Activity Boundaries

In [20]:
# Rebuild per-frame activity-action predictions from the predicted stack boundaries.
# Start labels are written last, so they take precedence over end labels.
test_frames = df_test["frame.number"]
df_aa_test = df_test[["frame.number", "ActivityAction"]].copy()
df_aa_test["ActivityAction"] = "NoAction"
df_aa_test.loc[test_frames.isin(eval_df["end_pred"]), "ActivityAction"] = "Activity End"
df_aa_test.loc[test_frames.isin(eval_df["start_pred"]), "ActivityAction"] = "Activity Start"
print(classification_report(test_labels, df_aa_test["ActivityAction"]))

                precision    recall  f1-score   support

  Activity End       1.00      1.00      1.00        37
Activity Start       1.00      1.00      1.00        37
      NoAction       1.00      1.00      1.00      1239

      accuracy                           1.00      1313
     macro avg       1.00      1.00      1.00      1313
  weighted avg       1.00      1.00      1.00      1313


### Activity Type 

In [21]:
# Per-activity-type precision/recall of the stack classifier against the ground truth.
print(classification_report(y_true=res["activity_name"], y_pred=res["stack_prediction"]))

                                 precision    recall  f1-score   support

               ContractProposal       1.00      1.00      1.00         1
 GenerateJobApplicationActivity       1.00      1.00      1.00        10
         PerformAnInterviewCall       1.00      1.00      1.00         5
      PerformAnInterviewMeeting       1.00      1.00      1.00         3
           ResumeReviewActivity       1.00      1.00      1.00        10
ScheduleAnInterviewActivityCall       1.00      1.00      1.00         5
     ScheduleAnInterviewMeeting       1.00      1.00      1.00         3

                       accuracy                           1.00        37
                      macro avg       1.00      1.00      1.00        37
                   weighted avg       1.00      1.00      1.00        37


### Case ID

In [22]:
def first_int(values):
    """Return the first element of a list-like per-stack aggregate as an int."""
    return int(values[0])


pred = res[case_id_primary].map(first_int)
true = res["bp_id"].map(first_int)

# presumably -1 marks stacks without an assigned case id (the sentinel used when building res)
assigned = pred != -1

print("------------------ Case ID --------------")
print(classification_report(true[assigned], pred[assigned], zero_division=0.0))

------------------ Activity Type --------------
              precision    recall  f1-score   support

        1224       1.00      1.00      1.00         2
        1225       1.00      1.00      1.00         4
        1226       1.00      1.00      1.00         2
        1227       1.00      1.00      1.00         2
        1228       1.00      1.00      1.00         6
        1229       1.00      1.00      1.00         7
        1230       1.00      1.00      1.00         2
        1231       1.00      1.00      1.00         6
        1232       1.00      1.00      1.00         4
        1233       1.00      1.00      1.00         2

    accuracy                           1.00        37
   macro avg       1.00      1.00      1.00        37
weighted avg       1.00      1.00      1.00        37


In [23]:
# Event log for conformance checking: one row per stack, ordered by its earliest
# sniff time, using the column names mapped in `args` for pm4py.
events_out = (
    res.sort_values(by="sniff_time_min")
    .loc[:, ["sniff_time_min", "stack_prediction", "case_id"]]
    .reset_index(drop=True)
    .set_axis(["timestamp", "activity", "case_id"], axis=1)
)
events_out = events_out.assign(
    timestamp=pd.to_datetime(events_out["timestamp"]),
    case_id=events_out["case_id"].astype(str),
)

### Conformance

In [24]:
import pm4py

# Reference model: read the BPMN diagram and convert it to a Petri net together
# with its initial (im) and final (fm) markings.
bpmn_graph = pm4py.read_bpmn(bpmn_path)
(petri_net, im, fm) = pm4py.convert_to_petri_net(bpmn_graph)

In [25]:
# Column-name mapping of `events_out` passed to the pm4py conformance functions.
args = dict(case_id_key="case_id", activity_key="activity", timestamp_key="timestamp")

In [26]:
# Alignment-based fitness and precision of the reconstructed log against the
# reference model; their mean is reported as the overall conformance.
fitness_result = pm4py.conformance.fitness_alignments(events_out, petri_net, im, fm, **args)
log_fitness = fitness_result["log_fitness"]
log_precision = pm4py.conformance.precision_alignments(events_out, petri_net, im, fm, **args)
conformance_score = (log_fitness + log_precision) / 2

print("Precision:  ", log_precision)
print("Fitness:    ", log_fitness)
print("_______________________________")
print("Conformance:", conformance_score)

aligning log, completed variants ::   0%|          | 0/4 [00:00<?, ?it/s]

computing precision with alignments, completed variants ::   0%|          | 0/6 [00:00<?, ?it/s]

Precision:   1.0
Fitness:     0.9999649135118066
_______________________________
Conformance: 0.9999824567559032


### Efficiency

In [27]:
import sys
import pickle

# Serialized size of each fitted model. len() counts the pickle payload bytes;
# sys.getsizeof() would also include the bytes object's interpreter overhead.
model_size = [
    len(pickle.dumps(activity_boundaries_classifier.model)),
    len(pickle.dumps(activity_type_classifier.model)),
    len(pickle.dumps(event_activity_model.model)),
]
print("model sizes in MB", f"{sum(model_size) / (1024**2):.4f}")

from statistics import mean, stdev
# processing_times holds per-kept-event durations in seconds
print("average processing time:", f"{mean(processing_times)*1000:.4f} ms")
# peak number of events held in open stacks, also as a percentage of all raw records
print("max buffer size:", max(buffer_sizes), f"({max(buffer_sizes) / len(records) * 100:.4f} % of all records)")

model sizes in MB 0.7847
average processing time: 0.3015 ms
max buffer size: 154 (0.2508)
