In [309]:
## import all necessary libraries ##

import numpy as np
import random
import math
from datetime import datetime
import matplotlib.pyplot as plt
from sklearn import model_selection as ms

#########################

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.filtering.log.attributes import attributes_filter
from pm4py.algo.filtering.log.variants import variants_filter
from pm4py.statistics.traces.log import case_statistics
from pm4py.objects.log.util import insert_classifier

#########################

import pomegranate as pom

In [326]:
class log:
    
    ### SETUP ###
    
    ## load dataset, generate mapping and generate strings
    def __init__(self, path):
        self.strings = []
        self.transl = {}
        
        self.log = xes_importer.apply(path)

        
        try:
            # generate custom activity classifier
            self.log, activity_key = insert_classifier.insert_activity_classifier_attribute(self.log, "Activity classifier")
            for trace in self.log:
                for event in trace:
                    custom_classifier = ""
                    for activity_classifier in self.log.classifiers["Activity classifier"]:
                        custom_classifier = custom_classifier + event[activity_classifier] + "+"
                    custom_classifier = custom_classifier[:-1]
                    event["customClassifier"] = custom_classifier
        except:
            for trace in self.log:
                for event in trace:
                    event["customClassifier"] = event["concept:name"]
    
        self.clear_caches()
        self.gen_mapping()
        self.read_lof()
        
    def clear_caches(self):
        self.Nk_res_dict = {} # N_k result cache
    
    ## generate mapping from activity to char
    def gen_mapping(self):
        ## generate mapping from activities to chars ##
        activities = list(attributes_filter.get_attribute_values(self.log, "customClassifier").keys())
        
        for i, a in enumerate(activities):
            self.transl[a] = str(i+1)
    
    def read_lof(self):
        self.lof = np.genfromtxt(path + ".csv")
        self.anomaly = []
        
        cutoff = np.percentile(self.lof, 90)
        
        for i, l in enumerate(self.lof):
            if l >= cutoff:
                self.anomaly.append(["anomaly"] * len(self.log[i]))
            else:
                self.anomaly.append(["no anomaly"] * len(self.log[i]))

                
    def gen_lists(self):
        lists = []
        for trace in self.log:
            tlist = np.empty(len(trace))
            for i, event in enumerate(trace):
                tlist[i] = self.transl[event["customClassifier"]]
            lists.append(tlist)
        return lists

## Setup data

In [370]:
path = "Datasets/BPIC13.xes"

# import log
tlog = log(path)

lists = tlog.gen_lists()
is_anomaly = tlog.anomaly

# split data into training and test
lists_train, lists_test, is_anomaly_train, is_anomaly_test = ms.train_test_split(lists, is_anomaly, test_size = 0.2, random_state = 2)

parsing log, completed traces ::   0%|          | 0/1487 [00:00<?, ?it/s]

## Setup model

In [371]:
model = pom.HiddenMarkovModel.from_samples(
    pom.DiscreteDistribution, 
    n_components=2,
    X=lists_train, 
    labels=is_anomaly_train,
    #state_names=["anomaly", "no anomaly", "unknown"],
    state_names=["anomaly", "no anomaly"],
    algorithm="labeled")
model.bake()

## Evaluate

In [372]:
false_positive = 0 # anomaly incorrectly detected
false_negative = 0 # anomaly incrrectly not detected
true_positive = 0 # anomaly correctly detectd
true_negative = 0 # anomaly correclty not detected

for i, t in enumerate(lists_test):
    prediction = model.predict(t)
    if prediction[-1] == 0 and is_anomaly_test[i][-1] == "no anomaly":
        true_negative = true_negative + 1
    if prediction[-1] == 0 and is_anomaly_test[i][-1] == "anomaly":
        false_negative = false_negative + 1
    if prediction[-1] == 1 and is_anomaly_test[i][-1] == "no anomaly":
        false_positive = false_positive + 1
    if prediction[-1] == 1 and is_anomaly_test[i][-1] == "anomaly":
        true_positive = true_positive + 1

TPR = true_positive / (true_positive + false_negative)
TNR = true_negative / (true_negative + false_positive)
        
FPR = false_positive / (false_positive + true_negative)
FNR = false_negative / (false_negative + true_positive)

In [373]:
print("FPR\t\t\t" + str(round(FPR * 100, 1)) + "%")
print("FNR\t\t\t" + str(round(FNR * 100, 1)) + "%")
print("\n")
print("TNR (sensitivity)\t" + str(round(TNR * 100, 1)) + "%")
print("TPR (specificity)\t" + str(round(TPR * 100, 1)) + "%")

FPR			0.8%
FNR			90.6%


TNR (sensitivity)	99.2%
TPR (specificity)	9.4%
