In [22]:
import pandas as pd
import numpy as np
import re 
import copy

from tqdm.notebook import tqdm_notebook

import warnings
warnings.filterwarnings("ignore")

%load_ext autoreload
%autoreload 2

# My packages
from source import parse_mxml as pm
from source import log_representation as lr
from source import plots as plts
from source import drift_detection as dd
from source import drift_localization as dl
from source import offline_streaming_clustering as off_sc
from source import online_streaming_clustering as on_sc

from sklearn.model_selection import ParameterGrid
from joblib import Parallel, delayed, parallel_backend

import random
random.seed(42)

import os
import glob

import gc
gc.enable()

from scipy.spatial import distance
from sklearn.base import clone as sk_clone 

from copy import deepcopy

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
def insensitive_glob(pattern):
    def either(c):
        return '[%s%s]' % (c.lower(), c.upper()) if c.isalpha() else c
    return glob.glob(''.join(map(either, pattern)))

def if_any(string, lista):
    # If the string contains any of the values
    # from the list 'lista'
    for l in lista:
        if l in string:
            return True
    return False

In [4]:
# List log files
# logs = insensitive_glob("../process_mining_datasets/*/*k.MXML")
logs = [x.replace('..\\', '../') for x in logs if "2.5" not in x]

In [5]:
# reference objects and map them to strings in dict 
# used in further methods
objects = {
    "representation": {
        "binary": lambda x: lr.get_binary_representation(lr.get_traces_as_tokens(x)),
        "frequency": lambda x: lr.get_frequency_representation(lr.get_traces_as_tokens(x)),
        "transitions": lambda x: lr.get_binary_transitions_representation(lr.get_traces_as_tokens(x))
    }
}

In [6]:
# change patterns and they supported representations

binary_support = ["cb", "cf",  "cm", "rp", "pm", "fr"]
frequency_support = binary_support + ['lp', 'cp', ]
transitions_support = frequency_support + ["pl", "cd", "sw"]

### Pipeline Streaming Clustering

In [43]:
OUTPUT_PATH = "../LoanApplications_Streaming/" 

In [45]:
def my_split(s):
    return list(filter(None, re.split(r'(\d+)', s)))

def parse_name_info(name):
    x = name.split('\\')[-1].split('/')[-1]
    x_ = my_split(x)
    return {
        "tipo_mudanca": x_[0].replace('new_', ''),
        "log_size": int(x_[1]) * 1000 if int(x_[1]) != 7 else 7500,
        'file_ref': x
    }

def read_file_and_run_streaming_cluster_pipeline(args, return_result=False):
    """
    Read an event log file, represent it into a feature vector space and
    run the trace clustering method over windows. This method outputs results
    as gzip csv files into the "OUTPUT_PATH" folder, or return the result 
    as DataFrame when return_result = True.
    
    Parameters:
    -----------
        args (dict): Dictionary with the parameters and the log_file path
            requiring the following keys:
                example = {
                    'log': <PATH TO LOG_FILE>,
                    'representation': <KEY TO REPRESENTATIONS IN 'objects'>,
                    'parameters': [{
                        'model': <KEY TO MODEL IN 'objects'>, 
                        'sliding_window': <WHETHER TO USE SLIDING WINDOW>,
                        'window_size': <SIZE OF TRACE WINDOW TO USE>,
                        'sliding_step': <STEP OF SLIDING WINDOW>
                    }
        return_result (bool): Whether to return the result as DataFrame
            
    """
    
    df = pm.all_prep(open(args['log']))
    df_ = objects['representation'][args['representation']](df)
    
    resp = parse_name_info(args['log'])

    resp['input_log'] = args['log']
    resp['representation'] = args['representation']
    
    output = OUTPUT_PATH + resp['tipo_mudanca'] + '/'

    try:
        os.makedirs(output)
    except:
        pass
    
    for comb in args['parameters']:
        r = copy.deepcopy(resp)
        r.update(comb)
        
        p = {}
        for k in comb:
            if k.startswith('model_parameters__'):
                p[k.replace('model_parameters__', '')] = comb[k]

        output_file = output + '__'.join([
            r['tipo_mudanca'],
            str(r['log_size']),
            r['representation'],
            r['model']
        ] + [str(key)+"="+str(p[key]) for key in p])
        
        if os.path.isfile(output_file + '--results.pickle.gzip'):
            print("Already exists")
            continue
        
        print(output_file + "...")
        ret = on_sc.run_online_streaming_clustering(
            model_name=comb['model'],
            model_parameters=p,
            df=df_,
            use_tqdm=False,
            get_macro_clusters=True
        )
        
        ft = on_sc.extract_features_dataframe(ret, use_tqdm=False)
        
        for key in r:
            ret[key] = r[key]
        
        ret.to_pickle(output_file + '--results.pickle.gzip', compression="gzip")
        ft.to_pickle(output_file + '--features.pickle.gzip', compression="gzip")

        if return_result:
            return ret

#### Run pipeline for specific case(s)

In [46]:
all_metrics = read_file_and_run_streaming_cluster_pipeline({
    'log': logs[0],
    'representation': 'binary',
    'parameters': [{
        'model': 'MOACluStream',
        'model_parameters__h': 150,
        'model_parameters__m': 30,
        'model_parameters__t': 2,
        'model_parameters__k_macro': 3,
    }]
}, return_result=True)

LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3...


In [47]:
all_metrics.head(2)

Unnamed: 0_level_0,col_names,micro,micro_ids,micro_radius,micro_weights,outliers,outliers_ids,outliers_radius,outliers_weights,macro,...,tipo_mudanca,log_size,file_ref,input_log,representation,model,model_parameters__h,model_parameters__m,model_parameters__t,model_parameters__k_macro
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,"[Appraise_property, Approve_application, Asses...","[[1, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1,...",[0],[2e-25],[1.0],,[],,,"[[1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0,...",...,pm,10000,pm10k.mxml,/Users/rafaelgaspar/Library/CloudStorage/Googl...,binary,MOACluStream,150,30,2,3
1,"[Appraise_property, Approve_application, Asses...","[[1, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 1,...","[0, 1]","[2e-25, 2e-25]","[1.0, 1.0]",,[],,,"[[1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0,...",...,pm,10000,pm10k.mxml,/Users/rafaelgaspar/Library/CloudStorage/Googl...,binary,MOACluStream,150,30,2,3


### Run Experiments with several parameters combinations

In [10]:
grid_logs = list(ParameterGrid([
    {
        "log": logs,
        "representation": ['binary', 'frequency', 'transitions']
    }
]))

grid_parameters = list(ParameterGrid({
    'model': ['MOACluStream'],
    'model_parameters__h': [50, 150, 200],
    'model_parameters__m': [30, 50],
    'model_parameters__t': [2],
    'model_parameters__k_macro': [2, 3],
}))

combs = []
for x in grid_logs:
    dic = x.copy()
    dic['parameters'] = grid_parameters
    
    combs.append(dic)
len(combs), len(grid_parameters), len(combs) * len(grid_parameters)

(75, 18, 1350)

#### Run parallely

In [None]:
final_resp = Parallel(n_jobs=3)(
    delayed(read_file_and_run_clustering_pipeline)(comb) for comb in tqdm_notebook(combs)
)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=75.0), HTML(value='')))

### Detection Pipeline

In [48]:
# Drift detection parameters
drift_config = list(ParameterGrid([
    {
        "rolling_window": [50, 100, 150, 200],
        "std_tolerance": [1.25, 1.5, 1.75, 2],
        'min_tol': [0.01, 0.03] 
    }
]))

In [49]:
# List all files obtained after the trace clustering pipeline

clusterizacoes = glob.glob(OUTPUT_PATH + "*/*.pickle.gzip")

In [50]:
clusterizacoes

['LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3--features.pickle.gzip',
 'LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3--results.pickle.gzip']

In [51]:
# Combinations to run

combs_deteccao = []
for log in clusterizacoes:
    combs_deteccao.append({
        'input': log,
        'combinations': drift_config
    })

In [52]:
# Replace part of the OUTPUT_PATH to create a new folder 
# for the detection results

OLD_prefix = "LoanApplications_Streaming"
NEW_prefix = "LoanApplications_Streaming__DETECTION"

In [83]:
def drift_detect_pipeline(args, return_results=False):
    all_metrics = pd.read_pickle(args["input"], compression='gzip')
#     all_metrics['sum_diff_centroids'] = all_metrics['diff_centroids'].apply(
#         lambda x: np.abs(x.sum(axis=0)).sum() if not x is np.nan and x.shape[0] > 1 else np.nan
#     )
    
    path_file = args["input"].replace("--results.pickle.gzip", "").split('\\')
    
    args.update({
        "tipo_mudanca": path_file[-1].split('__')[0],
        "log_size": int(path_file[-1].split('__')[1]),
        "model": path_file[-1].split('__')[3],
        "representation": path_file[-1].split('__')[2],
    })
    
    #unpack model parameters
    for model_p in path_file[-1].split('__')[4:]:
        args[model_p.split('=')[0]] = model_p.split('=')[1]
    
    
    for combination in args['combinations']:
        c = deepcopy(combination)
        c.update(args)
        
        __drift_detect_pipeline(
            all_metrics, c, return_results=return_results
        )
    

def __drift_detect_pipeline(all_metrics, args, return_results=False):     
    base_name = args["input"].replace(".pickle.gzip", "")
    base_name = base_name.replace("EXEC__LoanApplications_STREAMING", "EXEC__LoanApplications_STREAMING__DETECTION")
    
    to_string = [
        str(args["rolling_window"]),
        str(args["std_tolerance"]).replace(".", "-"), 
        str(args["min_tol"]).replace(".", "-")
    ]
    
    if "smooth_factor" in args:
        to_string += [str(args["smooth_factor"]).replace(".", "-")]
    
    try:
        os.makedirs(base_name)
    except:
        pass
    
    final_name = base_name.replace("\\", "/") + "/" + "_".join(to_string) + ".pickle.gzip"
    
    if os.path.isfile(final_name):
        try:
            x = pd.read_pickle(final_name, compression='gzip')
            print("Already exists")
            return
        except:
            pass
    
    # all_metrics = pd.read_csv(args["input"], index_col=0)
    
    # print(info)
    # Monta o vetor de resposta
    y_true = [x for x in range(int(args['log_size']/10), args['log_size'], int(args['log_size']/10))]
    
    results = []
    for col in all_metrics.select_dtypes(include=np.number).columns:
        if (col not in ["k"] and not col.startswith("diff") ) or col in ["diff_centroids"]:
            r = deepcopy(args)
            r["measure"] = col

            # print(col)
            # print(all_metrics[col].dtype)

            detected_drifts, extra = dd.detect_concept_drift(
                all_metrics, 
                col,
                args["rolling_window"],
                args["std_tolerance"],
                args["min_tol"]
            )

            metrics_results = dd.get_metrics(
                detected_drifts,
                y_true,
                int(args["h"])
            )

            r.update(args)
            r.update(metrics_results)

            results.append(r)

            gc.collect()
    
    print(final_name)
    ret = pd.DataFrame(results)
    ret.to_pickle(
        final_name,
        compression="gzip"
    )

    if return_results:
        return ret
    # print(col, len(results))

In [85]:
detection_results = drift_detect_pipeline({
    'input': 'LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3--features.pickle.gzip',
    'combinations': [{
       'min_tol': 0.01,
       'rolling_window': 3,
       'std_tolerance': 1.25
    }, {
       'min_tol': 0.02,
       'rolling_window': 3,
       'std_tolerance': 1.25
    }]
}, return_results=True)

LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3--features/3_1-25_0-01.pickle.gzip
LoanApplications_Streaming/pm/pm__10000__binary__MOACluStream__h=150__m=30__t=2__k_macro=3--features/3_1-25_0-02.pickle.gzip


In [88]:
detection_results.sort_values('F1', ascending=False).head(2)

#### Run parallely

In [None]:
final_resp = Parallel(n_jobs=3)(
    delayed(drift_detect_pipeline)(comb_d) for comb_d in tqdm_notebook(combs_deteccao)
)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=54.0), HTML(value='')))