In [1]:
import numpy as np
import pandas as pd

# Motivations

We're on the stage where we're looking for correlation between trace similarity and exhibiting similar LTL expressions. Here, we combine the code we used in the two previous sections in order to further develop sub-conversation exploration/discovery from notions of similarity based on the modified edit-distance/substitution and insertion scores.

# Functions

Below we'll store the functions used, written originally and copied over from the other files.

In [2]:
#helper function to count the occurrences of violations to the label
def count_label (sub, labels):
    assert type(sub) == list
    assert type(labels) == list
    
    count = 0
    for item in sub:
        if item not in labels:
            count += 1
    return count

#until-N definition
def until_N (trace, x, y, N):
    
    assert type(trace) == list
    assert type(x) == list
    assert type(y) == list
    
    sol = []
    current = N
    s = -1
    e = -1
    for i in range(len(trace)):
        
        if (trace[i] in x) and e == -1 and s == -1: #finding first instance of x
            s = i
        
        if (s != -1) and (trace[i] not in x) and (trace[i] not in y): #started count and violates Until
            
            if current <= 0: #no more N to give
                s = -1
                e = -1
                current = N
                continue #search for next
                
            else: #more N to give, decrement
                current -= 1
        
        if s != -1 and (trace[i] in y): #found instance of y and x
            e = i
            sol.append((s,e, count_label(trace[s:e],x), e-s)) #append starting and ending index, with number of appearances of x
            s = -1
            e = -1
    
    return sol

In [3]:
#STEPS for Substitution Score

#STEP 1: Define the symbols in the list of traces
def define_symbols (traces):
    assert type(traces) == list
    symbols = []
    for item in traces:
        symbols.append(set(item))
    x = symbols[0]
    for i in range(len(symbols)):
        x = x.union(symbols[i])
    
    return list(x)


#STEP 2: Define the set of all 3-grams in the logs and their frequencies
def three_grams (traces):
    assert type(traces) == list
    g3 = []
    g3_freq = {}
    for trace in traces:
        for i in range(len(trace)-2):
            g3.append(", ".join(list(trace[i:i+3])))
            try:
                g3_freq[", ".join(list(trace[i:i+3]))] += 1
            except:
                g3_freq[", ".join(list(trace[i:i+3]))] = 1
    return list(set(g3)), g3_freq


#STEP 3: Define the context for symbol a
def define_context(grams):
    
    assert type(grams) == list
    
    context = {}
    for gram in grams:
        x,a,y = gram.split(", ")
        try:
            context[a].append("{0}, {1}".format(x,y))
        except:
            context[a] = []
            context[a].append("{0}, {1}".format(x,y))
            
    #clear dups
    for k in list(context.keys()):
        context[k] = list(set(context[k]))
    
    return context


#STEP 4: define pairs of context
def context_pairs (context):
    
    assert type(context) == dict
    
    context_pairs = {}
    for a in list(context.keys()):
        for b in list(context.keys()):
            if a != b:
                context_pairs["{0}, {1}".format(a, b)] = list(set(context[a]).intersection(set(context[b])))
    
    return context_pairs


#STEP 5: define co-occurrence combinations
def define_cooccurrence(symbols, context_pairs, gram_freq):
    
    assert type(context_pairs) == dict
    assert type(gram_freq) == dict
    assert type(symbols) == list
    
    co_occur = {}
    for k in list(context_pairs.keys()):
        for item in context_pairs[k]:
            for a in symbols:
                for b in symbols:
                    x,y = item.split(", ")[0], item.split(", ")[1]
                    if a == b:
                        try:
                            n = gram_freq["{0}, {1}, {2}".format(x,a,y)]
                            co_occur["{0}, {1}({2}, {3})".format(x,y,a,b)] = (n*(n-1))/2
                        except:
                            co_occur["{0}, {1}({2}, {3})".format(x,y,a,b)] = 0.0
                        
                    elif a != b:
                        try:
                            n_i = gram_freq["{0}, {1}, {2}".format(x,a,y)]
                            n_j = gram_freq["{0}, {1}, {2}".format(x,b,y)]
                            co_occur["{0}, {1}({2}, {3})".format(x,y,a,b)] = n_i*n_j
                        except:
                            co_occur["{0}, {1}({2}, {3})".format(x,y,a,b)] = 0.0
    
    return co_occur


#STEP 6: Define the count of co-occurrences for symbols a,b for all contexts
def co_occur_combos(symbols, con_pairs, co_occurs):
    assert type(symbols) == list
    assert type(con_pairs) == dict
    assert type(co_occurs) == dict
    
    co_occur_combos = {}
    for a in symbols:
        for b in symbols:
            total = 0.0
            for k in list(con_pairs.keys()):
                for item in con_pairs[k]:
                    total += co_occurs["{0}({1}, {2})".format(item,a,b)]
            co_occur_combos["{0}, {1}".format(a,b)] = total
    
    return co_occur_combos


#STEP 7: Define norm on the count of co-occur combos
def define_norm (co_combos):
    assert type(co_combos) == dict
    norm = 0.0
    for k in list(co_combos.keys()):
        norm += co_combos[k]
    
    return norm


#STEP 8: Define matrix M over A x A
def define_matrix (symbols, co_combos, norm):
    assert type(symbols) == list
    assert type(co_combos) == dict
    assert type(norm) == float
    
    mat_M = {}
    for a in symbols:
        for b in symbols:
            mat_M["{0}, {1}".format(a,b)] = co_combos["{0}, {1}".format(a,b)]/norm
    
    return mat_M


#STEP 9: Define the probability of occurrence
def prob_occur (symbols, mat_M):
    assert type(symbols) == list
    assert type(mat_M) == dict
    
    p = {}
    for a in symbols:
        total = 0
        for b in symbols:
            if a != b:
                total += mat_M["{0}, {1}".format(a,b)]
        total += mat_M["{0}, {1}".format(a,a)]
        p["{0}".format(a)] = total
    
    return p


#STEP 10: Define the expected values
def exp_val (symbols, prob):
    assert type(symbols) == list
    assert type(prob) == dict
    
    e_val = {}
    for a in symbols:
        for b in symbols:
            if a == b:
                e_val["{0}, {1}".format(a,b)] = prob["{0}".format(a)]**2
            else:
                e_val["{0}, {1}".format(a,b)] = 2*prob["{0}".format(a)]*prob["{0}".format(b)]
    
    return e_val


#STEP 11: Define the function for substitution scores
def sub_scores (traces):
    assert type(traces) == list
    
    symbols = define_symbols(traces)
    three_gs, three_gs_freq = three_grams(traces)
    cons = define_context(three_gs)
    con_pairs = context_pairs(cons)
    co_occurs = define_cooccurrence(symbols, con_pairs, three_gs_freq)
    co_combos = co_occur_combos(symbols, con_pairs, co_occurs)
    norm = define_norm(co_combos)
    matM = define_matrix(symbols, co_combos, norm)
    probs = prob_occur(symbols, matM)
    e_val = exp_val(symbols, probs)
    
    sub_costs = {}
    for a in symbols:
        for b in symbols:
            if a!=b:
                try:
                    sub_costs["{0}, {1}".format(a,b)] = np.log2(matM["{0}, {1}".format(a,b)]/e_val["{0}, {1}".format(a,b)])
                except:
                    sub_costs["{0}, {1}".format(a,b)] = -1000
    
    return sub_costs

In [4]:
#STEPS 1-3 are the same for Insertion Score

#STEP 4: Define occurence of 3-gram counts
def occ_count (symbols, cons, grams, gfreq):
    assert type(symbols) == list
    assert type(grams) == list
    assert type(cons) == dict
    
    o_counts = {}
    for a in list(cons.keys()):
        for pair in cons[a]:
            x = pair.split(", ")[0]
            y = pair.split(", ")[1]
            o_counts["{0}, {1}({2})".format(x,y,a)] = gfreq["{0}, {1}, {2}".format(x,a,y)]
    
    return o_counts


#STEP 5: define countRgivenL
def countRgL (symbols, ocounts):
    assert type(symbols) == list
    assert type(ocounts) == dict
    
    rgl_counts = {}
    
    for a in symbols:
        for x in symbols:
            #if a !=x:
            total = 0
            for k in list(ocounts.keys()):
                if k.split("(")[0].split(", ")[0] == x and k.split("(")[1] == "{0})".format(a):
                    total += ocounts[k]
            rgl_counts["{0}/{1}".format(a,x)] = total
    
    return rgl_counts


#STEP 6: define norm(a)
def rgl_norm (symbols, rgl_counts):
    assert type(symbols) == list
    assert type(rgl_counts) == dict
    
    rgl_norms = {}
    
    for a in symbols:
        total = 0
        for x in symbols:
            #if a !=x:
            total += rgl_counts["{0}/{1}".format(a,x)]
        rgl_norms["{0}".format(a)] = total
    
    return rgl_norms


#STEP 7: define the probability of all symbols
def rgl_prob (trace):
    assert type(trace) == list
    
    p = {}
    for item in trace:
        for a in item:
            try:
                p["{0}".format(a)] += 1
            except:
                p["{0}".format(a)] = 1
    
    tot_len = 0
    for item in trace:
        tot_len += len(item)
    
    for k in list(p.keys()):
        p[k] = p[k]/tot_len
    
    return p


#STEP 8: define rglNorm
def normed_counts (symbols, rgl, norms):
    assert type(symbols) == list
    assert type(rgl) == dict
    assert type(norms) == dict
    
    normed_rgls = {}
    
    for a in symbols:
        for b in symbols:
            normed_rgls["{0}/{1}".format(a,b)] = rgl["{0}/{1}".format(a,b)]/norms["{0}".format(a)]
    
    return normed_rgls


#STEP 9: Define the function for insertion score
def insert_scores (traces):
    assert type(traces) == list
    
    symbols = define_symbols(traces)
    grams, freq = three_grams(traces)
    cons = define_context(grams)
    oc = occ_count(symbols, cons, grams, freq)
    rgl = countRgL(symbols, oc)
    norms = rgl_norm(symbols, rgl)
    probs = rgl_prob(traces)
    norm_rgls = normed_counts(symbols ,rgl, norms)
    
    scores = {}
    for a in symbols:
        for b in symbols:
            scores["{0}/{1}".format(a,b)] = np.log2(norm_rgls["{0}/{1}".format(a,b)]/probs["{0}".format(a)]*probs["{0}".format(b)])
    
    #replace -inf
    for k in list(scores.keys()):
        if scores[k] == -np.inf:
            scores[k] = -1000
    
    return scores

In [5]:
#function definition for calculating similarity
def calc_similarity(trace1, trace2, sub_cost, ins_cost, probs):
    
    assert type(trace1) == type(trace2) == list
    
    #pad traces
    trace1 = ["_"] + trace1
    trace2 = ["_"] + trace2
    
    #set shorter one as tr1
    if len(trace1) > len(trace2):
        copy = trace1
        trace1 = trace2
        trace2 = copy

    M = len(trace1)
    N = len(trace2)
    sim_table = np.zeros((M,N)) #establish table
    s_score = sub_cost #get substitution score
    ins_score = ins_cost #get insertion score
    p = probs #get probabilities
    
    #fill table, horizontal -> vertical
    for i in range(M):
        for j in range(N):
            
            #original fill horizontal
            if i == 0:
                if j == 0: #first fill
                    sim_table[i][j] = 1000
                elif j == 1: #first insert
                    sim_table[i][j] = p["{0}".format(trace2[j])]
                else: #rest fill, base insert scores
                    sim_table[i][j] = ins_score["{0}/{1}".format(trace2[j], trace2[j-1])] + sim_table[i][j-1]
            
            #original fill vertical
            elif j == 0:
                if i == 0:#first fill
                    sim_table[i][j] = 1000
                elif i == 1:
                    sim_table[i][j] = p["{0}".format(trace1[i])]
                else: #rest fill, base is the opposite of insert scores
                    sim_table[i][j] = -1*ins_score["{0}/{1}".format(trace1[i], trace1[i-1])] + sim_table[i-1][j]
            
            elif trace1[i] == trace2[j]: #no changes
                sim_table[i][j] = sim_table[i-1][j-1]
            
            else: #substitution, insertion or deletion
                
                #determine the min
                op = np.argmax([sim_table[i-1][j], sim_table[i][j-1], sim_table[i-1][j-1]]) #in order, removal, insertion, substitution
                if op == 0:
                    sim_table[i][j] = -1 + sim_table[i-1][j]#-1*ins_score["{0}/{1}".format(trace2[j],trace1[i])] + sim_table[i-1][j] #removal
                elif op == 1:
                    sim_table[i][j] = ins_score["{0}/{1}".format(trace2[j],trace1[i])] + sim_table[i][j-1] #insertion
                elif op == 2:
                    sim_table[i][j] = s_score["{0}, {1}".format(trace1[i],trace2[j])] + sim_table[i-1][j-1] #substitution
                
    return sim_table[i][j] #final score

In [44]:
#define the function to find all relevant sub-conversations in a trace
def find_sub_conversations (trace, labels, c_cap, l):
    assert type(trace) == list
    assert type(labels) == list
    assert type(c_cap) == float
    assert type(l) == int
    
    #counts
    sub_convos = {}
    
    #cycle through label pairs
    for i in range(len(labels)):
        for j in range(len(labels)):
            
            if i != j: #no repeats
                candidate = until_N(trace, [labels[i]], [labels[j]], 50) #cap at 50
                
                for item in candidate:
                    if item[3] > 10 and item [3] < l and item[2] < item[3]*c_cap:
                        try:
                            sub_convos["{0} -> {1}".format(labels[i], labels[j])] += 1
                        except:
                            sub_convos["{0} -> {1}".format(labels[i], labels[j])] = 1
    
    return sub_convos

#modify the function to find all relevant sub-conversations in a trace
def compare_sub_conversations (trace1, trace2, labels, c_cap, l):
    assert type(trace1) == list
    assert type(trace2) == list
    assert type(labels) == list
    assert type(c_cap) == float
    assert type(l) == int
    
    #counts
    sub_convos1 = {}
    sub_convos2 = {}
    
    #cycle through label pairs
    for i in range(len(labels)):
        for j in range(len(labels)):
            
            if i != j: #no repeats
                candidate = until_N(trace1, [labels[i]], [labels[j]], 50)
                sub_convos1["{0} -> {1}".format(labels[i], labels[j])] = 0
                
                for item in candidate:
                    if item[3] > 10 and item [3] < l and item[2] < item[3]*c_cap:
                        sub_convos1["{0} -> {1}".format(labels[i], labels[j])] += 1
    
    #do for second trace
    for i in range(len(labels)):
        for j in range(len(labels)):

            if i != j: #no repeats
                candidate = until_N(trace2, [labels[i]], [labels[j]], 50)
                sub_convos2["{0} -> {1}".format(labels[i], labels[j])] = 0

                for item in candidate:
                    if item[3] > 10 and item [3] < l and item[2] < item[3]*c_cap:
                        sub_convos2["{0} -> {1}".format(labels[i], labels[j])] += 1
    
    #compare and give summary report
    report = {}
    report_total = 0
    for key in list(sub_convos1.keys()):
        report[key] = np.abs(sub_convos1[key]-sub_convos2[key])
        report_total += report[key]
    
    return report, report_total

# Overview

The goal is to draw a correlation between similarity of traces (derived from substitution and insertion scores) and the LTL sequences exhibited. To do so, we'll have to address the following considerations:

- Similarity measure in the paper originally was done via using event logs from a consistent process (healthcare). Our event logs are not from a consistent process. This could have effects on the similarity (derived from effects on the scores obtained from the eventlogs as "training"), so we need to consider the types of processes we pick for our traces.
- We need to establish first a way to find LTL sequences that have interesting properties (sequence length and count for violations of N), as those sub-sequences are what we can consider "sub-conversations" and are the LTL sequences that are worth looking at in a frequency-based analysis, as well as critical in drawing the correlation we want in this part.

# Clustering

Presumably, one of the ways we can do this correlation is by looking at the results of clustering, to see if there are correlations between the clusters made by

- similarity values alone (Case A)
- weighting values alone (Case B)
- similarity and weighting values (Case C)

In [13]:
#using sklearn
from sklearn.cluster import AgglomerativeClustering

#sample 3 by 3 clustering
dis_mat = np.array([[0,1,2],[1,0,3],[2,3,0]])

clusters = AgglomerativeClustering(affinity="precomputed", linkage="average").fit(dis_mat)
clusters.labels_

array([0, 0, 1])

In [39]:
a = ["a","b","b","a","b","b","a","c"]
b = ["b","a","a","b","a","a","b","c"]
#c = ["c","c","c","c","c","c","a","c"]
c = ["a","a","a","b","b","b","c","a"]
scost = sub_scores([a,b,c])
icost = insert_scores([a,b,c])
prob = rgl_prob([a,b,c])
t_run = [a,b,c]
dis_mat = np.zeros((len(t_run),len(t_run)))

for i in range(len(t_run)):
    for j in range(len(t_run)):
        dis_mat[i][j] = 1/(1000+calc_similarity(t_run[i],t_run[j], scost, icost, prob))



In [40]:
clusters = AgglomerativeClustering(affinity="precomputed", linkage="average").fit(dis_mat)
clusters.labels_

array([0, 1, 0])

In [41]:
dis_mat

array([[0.0005    , 0.0005007 , 0.00050052],
       [0.0005007 , 0.0005    , 0.00100173],
       [0.00050061, 0.00050072, 0.0005    ]])

In [42]:
#sample data
dat1 = pd.read_csv('./graham.norton.s22.e08_data.csv')
dat2 = pd.read_csv('./graham.norton.s22.e12_data.csv')
dat3 = pd.read_csv('./blackpink_data.csv')
test1 = list(dat1.L)
test2 = list(dat2.L)
test3 = list(dat3.L)

In [43]:
eventlog = [test1,test2,test3]
scost = sub_scores(eventlog)
icost = insert_scores(eventlog)
prob = rgl_prob(eventlog)
dis_mat = np.zeros((len(eventlog), len(eventlog)))

for i in range(len(eventlog)):
    for j in range(len(eventlog)):
        dis_mat[i][j] = 1/(1000+calc_similarity(eventlog[i],eventlog[j], scost, icost, prob))

print(dis_mat)
clusters = AgglomerativeClustering(affinity="precomputed", linkage="average").fit(dis_mat)
print(clusters.labels_)



[[0.0005     0.0007274  0.00131518]
 [0.0007274  0.0005     0.00191831]
 [0.00131518 0.00191831 0.0005    ]]
[0 0 1]


In [None]:
#weighting values
