# Mining Specifications from Execution Traces

* The content of this notebook corresponds to the analysis done in section 4.3 in the paper.
* As mentioned in the paper, it took 3 days to mine sequences of 27 projects.
* In this notebook we use one project as an example, but it still takes more than 10 minutes to complete (run with care).

## Loading data

In [1]:
## We use this project (p13) as an example
import json

## Load the recorded call sequence of the example project.
## The encoding is pinned to UTF-8 (the JSON interchange encoding) so decoding
## does not depend on the platform's default locale.
with open("/DyPyBench/callgraph_seq/p13.json", encoding="utf-8") as dseq:
    seq_calls = json.load(dseq)

## Extracting and Preprocessing callers/callees sequences

In [4]:
## Parse the trace into callers and, for each caller, the sequence of its callees
## The format is something like:
    ## -- caller
    ##      -- callee
    ##      -- callee
    ##      -- callee ...
def split_by_callers(calls_sequence):
    """Split a flat trace of call markers into consecutive groups.

    Entries beginning with ``#START#`` open a call and entries beginning with
    ``#END#`` close one; a depth counter tracks the nesting.  The text after
    the last ``##`` of the first entry is taken as the current caller key.
    Whenever a ``#START#`` entry is met at depth 0 and its key differs from
    the remembered caller, the group collected so far is closed and a new one
    is started with that entry.

    Entries that start with neither marker are ignored.

    Args:
        calls_sequence: sequence of marker strings (may be empty).

    Returns:
        A list of groups, each a list of marker strings.  An empty input
        yields an empty list; any non-empty input yields at least one group.
    """
    if not calls_sequence:
        return []

    first_entry = calls_sequence[0]
    caller = first_entry.split("##")[-1]
    depth = 1  # the first entry is assumed to open a call
    current_group = [first_entry]
    groups = []

    for entry in calls_sequence[1:]:
        if entry.startswith("#START#"):
            if depth == 0 and entry.split("##")[-1] != caller:
                groups.append(current_group)
                current_group = [entry]
                # NOTE(review): the whole entry (not its "##" suffix) is stored
                # here, so for entries containing "##" the comparison above can
                # never be equal again and every later depth-0 START opens a new
                # group.  Kept as-is to preserve existing results; confirm intent.
                caller = entry
            else:
                current_group.append(entry)
            depth += 1
        elif entry.startswith("#END#"):
            depth -= 1
            current_group.append(entry)

    # The group still being collected when the trace ends is always emitted.
    groups.append(current_group)
    return groups

## Split the loaded trace into its top-level groups.
## (`splitted_seqs` is not referenced again in the cells shown in this notebook.)
splitted_seqs = split_by_callers(seq_calls)

In [6]:
## Extract the callee sequences of every caller, one nesting level per pass:
## each pass splits all pending sequences by caller, records the groups, and
## queues the interior (first and last marker removed) of every group longer
## than two entries for the next pass.
## NOTE: `seq_calls` is rebound here and is an empty list once the loop ends.
final_list = []
seq_calls = [seq_calls]
while seq_calls:
    temp_seq = [
        group
        for pending in seq_calls
        for group in split_by_callers(pending)
    ]
    final_list += temp_seq
    seq_calls = [group[1:-1] for group in temp_seq if len(group) > 2]

## Keep only the part of each marker that precedes the first "##"
simple_final_list = [
    [marker.split("##")[0] for marker in group]
    for group in final_list
]

In [8]:
## Number of sequences collected over all nesting levels
len(simple_final_list)

4831

In [10]:
## Filtering sequences of calls that were interrupted with an exception or sudden termination.
## For every sequence, a call name is kept each time two adjacent markers carry
## the same name (the name is the third "#"-separated field of a marker).
filtered_list = []
for l in simple_final_list:
    ## Drop leading "#END#" markers; stop safely if nothing is left
    ## (previously an all-"#END#" sequence raised IndexError on l[0]).
    while l and l[0].startswith("#END#"):
        l = l[1:]
    if not l:
        continue
    names = [c.split("#")[2] for c in l]
    new_l = [prev for prev, cur in zip(names, names[1:]) if prev == cur]
    filtered_list.append(new_l)
## Discard sequences in which no call was kept
filtered_list = [f for f in filtered_list if f]

In [12]:
## Number of valid (non-empty after filtering) sequences
len(filtered_list)

4213

In [10]:
## Transform from tree format to linear format (continuous 1D array)
def inline_sequence(calls_list):
    """Cut a flat marker sequence into consecutive top-level groups.

    The first entry of a group is its opener.  Every later entry is added to
    the group; an entry equal to the opener raises the depth, an entry equal
    to ``"#END#" + name`` (the opener with its ``#START#``/``#END#`` text
    removed) lowers it.  When the depth returns to zero the group is complete
    and the next entry opens a new group.

    Args:
        calls_list: list or tuple of marker strings.

    Returns:
        A list of groups.  A two-element input is returned as
        ``[calls_list]`` (the input object itself, not a copy); longer inputs
        yield newly built lists.  Inputs with fewer than two entries yield an
        empty list, and a trailing group that is never closed is dropped.
    """
    # Fewer than two entries cannot form a complete group; this used to raise
    # IndexError.
    if len(calls_list) < 2:
        return []
    if len(calls_list) == 2:
        return [calls_list]

    groups = []
    current = []
    opener = closer = None
    depth = 0
    for item in calls_list:
        if depth == 0:
            # Start a new group; the opener itself is not matched against
            # the open/close markers.
            opener = item
            closer = "#END#" + item.replace("#START#", "").replace("#END#", "")
            current = [item]
            depth = 1
            continue
        current.append(item)
        if item == opener:
            depth += 1
        elif item == closer:
            depth -= 1
        if depth == 0:
            groups.append(current)
    return groups
## NOTE(review): the sequence-extraction cell earlier in this notebook rebinds
## `seq_calls` and leaves it as an empty list, so if the cells are run top to
## bottom this call receives [] rather than the loaded trace -- confirm which
## value is intended (reloading the JSON first may be required).
inlined_seqs = inline_sequence(seq_calls)

In [12]:
def produce_sub_sequences(inlined_seqs):
    """Collect every nested sub-sequence reachable from *inlined_seqs*.

    Starting from the given sequences (as tuples), the interior of every
    sequence whose length is not 2 is split with ``inline_sequence`` and the
    resulting groups are added to the collection.  This repeats until a pass
    adds nothing new.

    Returns:
        A set of tuples containing every collected sequence whose length is
        not 2.
    """
    known = {tuple(seq) for seq in inlined_seqs}
    while True:
        expanded = set(known)
        for seq in known:
            if len(seq) != 2:
                # Strip the enclosing pair and split what is inside.
                expanded.update(
                    tuple(inner) for inner in inline_sequence(seq[1:-1])
                )
        if expanded == known:
            return {seq for seq in expanded if len(seq) != 2}
        known = expanded
## Expand the inlined sequences into the set of all nested sub-sequences
## (sequences of exactly two markers are excluded from the result).
sub_sequences = produce_sub_sequences(inlined_seqs)

In [15]:
## Map each caller to the list of its direct-callee sequences.
## For every sub-sequence the first marker names the caller and the markers
## between the first and the last one are scanned for direct callees.
callers_sequences = {}

for seq in sub_sequences:
    caller = seq[0].replace("#START#", "")
    dirty_callees = seq[1:-1]
    clean_callees = []
    open_marker = close_marker = None
    depth = 0
    for entry in dirty_callees:
        if depth == 0:
            ## This entry opens the next direct callee
            open_marker = entry
            close_marker = entry.replace("#START#", "#END#")
            depth = 1
        elif entry == close_marker:
            depth -= 1
            if depth == 0:
                clean_callees.append(open_marker.replace("#START#", ""))
        elif entry == open_marker:
            ## Same callee opened again before it closed (recursion): track the
            ## depth so the outer call is closed by its own END marker.  Without
            ## this the inner END closed the outer call and the leftover END was
            ## then treated as a callee or ran the scan past the end of the list.
            depth += 1
    ## A callee that never closes is dropped instead of raising IndexError
    callers_sequences.setdefault(caller, []).append(clean_callees)


In [2]:
def get_calls_set(calls):
    """Parse a token list into ``(function_name, recorded_calls)`` pairs.

    The list is only processed when its first token is exactly ``"#START#"``;
    otherwise an empty list is returned.  The token right after it is used as
    the function name.  The remaining tokens are scanned while counting
    ``"#START#"`` and ``"#END#"`` tokens; each time both counts are equal and
    non-zero the tokens buffered so far are parsed recursively and the counts
    and the buffer are reset.

    Returns:
        The pairs produced by the recursive calls, followed by the pair for
        this function.

    NOTE(review): a token is recorded as a call whenever exactly one
    ``"#START#"`` has been counted and the token is not ``"#END#"``.  The
    check runs before the counts are updated, so a ``"#START#"`` token seen
    in that state is recorded as well.  The first buffered group also begins
    with the function-name token, so its recursive parse returns nothing.
    Behaviour kept unchanged; confirm whether it is intended.
    """
    if calls[0] != "#START#":
        return []

    func_name = calls[1:-1][0]
    recorded = []
    results = []
    buffered = []
    starts = ends = 0
    for token in calls[1:]:
        is_start = token == "#START#"
        is_end = token == "#END#"
        if starts == 1 and not is_end:
            recorded.append(token)
        buffered.append(token)
        if is_start:
            starts += 1
        elif is_end:
            ends += 1
        if starts != 0 and starts == ends:
            results.extend(get_calls_set(buffered))
            buffered = []
            starts = ends = 0

    results.append((func_name, recorded))
    return results

In [15]:
## Keep only sequences with at least 3 elements
filtered_list = [f for f in filtered_list if len(f) >= 3]

## Finally, mining sequence patterns

In [17]:
## Here we use the package Seq2Pat
## for more information check: https://github.com/fidelity/seq2pat
from sequential.seq2pat import Seq2Pat, Attribute

## Only the first 10 filtered sequences are passed to the miner here.
seq2pat = Seq2Pat(sequences=filtered_list[:10])


## min_frequency=3: presumably a pattern must occur in at least 3 of the
## sequences (integer = absolute count) -- confirm against the Seq2Pat docs.
patterns = seq2pat.get_patterns(min_frequency=3)

In [18]:
## A view of the list of patterns
## (in the output below each entry appears to be the calls of a pattern
## followed by its frequency as the last element)
patterns

[['builtins.str', 'bytes.join', 10],
 ['builtins.str', 'str.encode', 10],
 ['builtins.str', 'str.encode', 'bytes.join', 10],
 ['str.encode', 'bytes.join', 10],
 ['http.HTTPStatus', 'builtins.str', 9],
 ['http.HTTPStatus', 'builtins.str', 'bytes.join', 9],
 ['http.HTTPStatus', 'builtins.str', 'str.encode', 9],
 ['http.HTTPStatus', 'builtins.str', 'str.encode', 'bytes.join', 9],
 ['http.HTTPStatus', 'bytes.join', 9],
 ['http.HTTPStatus', 'str.encode', 9],
 ['http.HTTPStatus', 'str.encode', 'builtins.str', 9],
 ['http.HTTPStatus', 'str.encode', 'builtins.str', 'bytes.join', 9],
 ['http.HTTPStatus', 'str.encode', 'builtins.str', 'str.encode', 9],
 ['http.HTTPStatus',
  'str.encode',
  'builtins.str',
  'str.encode',
  'bytes.join',
  9],
 ['http.HTTPStatus', 'str.encode', 'bytes.join', 9],
 ['http.HTTPStatus', 'str.encode', 'str.encode', 9],
 ['http.HTTPStatus', 'str.encode', 'str.encode', 'bytes.join', 9],
 ['str.encode', 'builtins.str', 9],
 ['str.encode', 'builtins.str', 'bytes.join', 9