In [None]:
# Dustminer Implementation
# Loading the normal data as Good pile and Faulty data as Bad pile

In [None]:
# Importing necessary libraries
import os
import numpy as np
import pandas as pd
from libraries.utils import get_paths, read_traces, read_json, mapint2var, is_consistent

In [None]:
# Configuration: which application / run variant to analyse.
CODE = 'theft_protection'                              # application under test: theft_protection, mamba2, lora_ducy
BEHAVIOUR_FAULTY = 'faulty_data/diag_subseq/subseq/'   # sub-folder holding the faulty ("bad") traces
BEHAVIOUR_NORMAL = 'normal/'                           # sub-folder holding the normal ("good") traces
THREAD = 'single'                                      # threading variant: single, multi
VER = 4                                                # data-collection format version

# Root of the data tree (the author notes 'csv', 'exe_plot', 'histogram' as alternatives).
base_dir = './trace_data'
normalbase_path = f'{base_dir}/{CODE}/{THREAD}_thread/version_{VER}/{BEHAVIOUR_NORMAL}'
faultybase_path = f'{base_dir}/{CODE}/{THREAD}_thread/version_{VER}/{BEHAVIOUR_FAULTY}'

print("Normal base path:", normalbase_path)
print("Faulty base path:", faultybase_path)

In [None]:
# Locate the training ("good") traces and the faulty ("bad") test traces on disk.
train_base_path = os.path.join(normalbase_path, 'train_data')  # alternative used earlier: 'diag_refsamples500'
print("Train base path:", train_base_path)

print("Current working directory:", os.getcwd())


def _clean_sorted(paths):
    """Return `paths` without macOS '.DS_Store' entries, sorted lexicographically.

    os.listdir returns entries in arbitrary order, so sorting keeps every run
    of the notebook working on the files in the same order.
    """
    return sorted(p for p in paths if '.DS_Store' not in p)


train_data_path = _clean_sorted(os.path.join(train_base_path, x) for x in os.listdir(train_base_path))
train_varlist_path = _clean_sorted(
    os.path.join(normalbase_path, x) for x in os.listdir(normalbase_path) if 'varlist' in x
)

######### get paths #######################
paths_log, paths_traces, varlist_path, paths_label = get_paths(faultybase_path)

paths_log = _clean_sorted(paths_log)
paths_traces = _clean_sorted(paths_traces)
varlist_path = _clean_sorted(varlist_path)
paths_label = _clean_sorted(paths_label)

# The faulty traces and their labels form the test set.
test_data_path = paths_traces
test_label_path = paths_label

In [None]:
train_data_path

In [None]:
test_label_path

In [None]:
# # Check consistency
# if VER == 3:
#     check_con, _ = is_consistent([train_varlist_path[0]] + varlist_path)
#     if check_con:
#         to_number = read_json(varlist_path[0])
#         from_number = mapint2var(to_number)
#     else:
#         to_number = read_json(train_varlist_path[0])
#         from_number = mapint2var(to_number)

# sorted_keys = list(from_number.keys())
# sorted_keys.sort()
# var_list = [from_number[key] for key in sorted_keys]

In [None]:
def load_data(file_paths, reader=None):
    """
    Load one event-ID sequence per trace file.

    Two trace layouts are handled:
      - a list with one or two entries: the first entry is taken as the sequence as-is;
      - a list with more than two entries: every entry that is a list with at least
        two items contributes int(entry[0]) to the sequence.
    An empty list yields an empty sequence.

    Parameters:
        file_paths - iterable of paths passed to the reader
        reader - optional callable(path) returning the parsed trace; defaults to read_traces

    Returns:
        data - list with one sequence per input file, in input order.

    Raises:
        ValueError if a file does not parse to a list. Previously this case either
        failed with an unbound-variable error or silently re-used the sequence of
        the previous file.
    """
    if reader is None:
        reader = read_traces

    data = []
    for file in file_paths:
        traces = reader(file)
        if not isinstance(traces, list):
            raise ValueError(
                f"Unsupported trace format in {file!r}: expected a list, got {type(traces).__name__}"
            )

        if not traces:
            # Nothing recorded: keep a placeholder so data stays aligned with file_paths.
            id_sequence = []
        elif len(traces) <= 2:
            id_sequence = traces[0]
            print("id_sequence:", id_sequence)
        else:
            id_sequence = [int(trace[0]) for trace in traces if isinstance(trace, list) and len(trace) >= 2]

        data.append(id_sequence)

    return data


In [None]:
from collections import defaultdict
MIN_SUP = 2                 # minimum support; presumably meant for the `min_sup` argument of the mining functions - TODO confirm
SEGMENT_WIDTH = 50          # presumably the `width` for segment_fixed_width - TODO confirm against the later cells
TOP_K_SEGMENTS = 5          # presumably how many top-scoring segments to report - TODO confirm
NORMALIZE_SUPPORT = True    # NOTE(review): not referenced in the cells visible here - confirm where it is consumed

# "Good pile" = training traces from the normal run, "bad pile" = faulty test traces.
good_log_directory = train_data_path
bad_log_directory = test_data_path

In [None]:
good_log_directory

In [None]:
bad_log_directory

In [None]:
good_sequences = load_data(good_log_directory)


In [None]:
len(good_sequences)

In [None]:
bad_sequences = load_data(bad_log_directory)

In [None]:
len(bad_sequences)

In [None]:
import math
from collections import defaultdict

def get_max_length(seq, event_id):
    """
    Largest distance between two consecutive occurrences of event_id in seq.

    Parameters:
        seq - list of event IDs
        event_id - the event to look for; must occur in seq (list.index raises ValueError otherwise)

    Returns:
        int - the maximum index difference between neighbouring occurrences,
        or 0 when event_id occurs only once.
    """
    first = seq.index(event_id)
    occurrences = [first]
    occurrences.extend(i for i in range(first + 1, len(seq)) if seq[i] == event_id)
    return max((later - earlier for earlier, later in zip(occurrences, occurrences[1:])), default=0)


def max_gap_two_lists(sequences):
    """
    Largest distance between two consecutive occurrences of the same event, over all sequences.

    Each item of `sequences` is either a flat sequence of event IDs or a list of
    such sequences (detected by its first element being a list). Gaps are
    measured inside one flat sequence only, never across sequences.

    The result is computed in one pass per sequence by remembering where each
    event was last seen, instead of rescanning the sequence for every element.

    Parameters:
        sequences - list of sequences (or of lists of sequences) of hashable event IDs

    Returns:
        int - the maximum gap found, 0 if no event repeats.
    """
    max_length = 0
    for seq in sequences:
        if seq and isinstance(seq[0], list):
            inner_sequences = seq
        else:
            inner_sequences = [seq]

        for inner_seq in inner_sequences:
            last_seen = {}  # event ID -> index of its most recent occurrence
            for position, event in enumerate(inner_seq):
                if event in last_seen:
                    max_length = max(max_length, position - last_seen[event])
                last_seen[event] = position
    return max_length

In [None]:
# Only the good (normal) sequences are used to derive the bound.
all_sequences_1 = good_sequences

# Largest distance between two consecutive occurrences of the same event ID.
# NOTE(review): the mining call further down derives its k_max from the number of
# distinct event IDs rather than from MAX_PATTERN_LEN - confirm which bound is intended.
MAX_PATTERN_LEN = max_gap_two_lists(all_sequences_1)


In [None]:
MAX_PATTERN_LEN

In [None]:
import re
from pathlib import Path

def get_ground_truth_file(path, ground_truth_files):
    """
    Find the ground-truth file that belongs to a trace.

    A file matches when the stem of `path` occurs in the stem of the file. When
    the trace stem ends in a digit, the match must not be followed by another
    digit, so that 'trace_trial1' no longer matches 'trace_trial10_labels'.

    Parameters:
        path - trace name or path; only its stem is used
        ground_truth_files - iterable of candidate ground-truth file paths

    Returns:
        The matching file path (the last one if several match), or an empty
        list when nothing matches (callers test the result for truthiness).
    """
    trace_stem = Path(path).stem
    # Guard against numeric-prefix collisions (trial1 vs trial10).
    digit_guard = r"(?!\d)" if trace_stem[-1:].isdigit() else ""
    matcher = re.compile(re.escape(trace_stem) + digit_guard)

    filename = []
    for file in ground_truth_files:
        if matcher.search(Path(file).stem):
            print("from function - matched ground truth file is :", file)
            filename = file
    return filename

def get_trace_info(path):
    """
    Parse a test-trace file name of the form '<trace_trialN>_<start>-<end>'.

    Parameters:
        path - path or file name; only the stem (name without extension) is inspected

    Returns:
        (name, start, end, test_data_name) where name is the 'trace_trialN' part,
        start and end are ints, and test_data_name is '<name>_<start>-<end>.json'
        rebuilt from the parsed values.

    Raises:
        ValueError if the stem does not contain the expected pattern.
    """
    stem = Path(path).stem
    found = re.search(r"(trace_trial_?\d+)_(\d+)-(\d+)", stem)
    if found is None:
        raise ValueError("Filename format not recognized")

    name, start_text, end_text = found.groups()
    start, end = int(start_text), int(end_text)
    return name, start, end, f"{name}_{start}-{end}.json"

def find_sequence_ground_truth(test_data_path, ground_truth):
    """
    Read a test trace and pair it with its ground-truth intervals.

    Parameters:
        test_data_path - path of the trace file; its name must follow the format
                         accepted by get_trace_info (ValueError otherwise)
        ground_truth - iterable of label entries whose first two items are the
                       start and end of an interval

    Returns:
        (sequence, gt_start_end_pair): the event IDs of the trace (first item of
        every entry that is a list with at least two items, as int) and the
        list of [start, end] pairs.
    """
    raw_events = read_traces(test_data_path)
    # The parsed parts are not needed here; the call validates the file name.
    _name, _start, _end, _file_name = get_trace_info(test_data_path)

    sequence = []
    for entry in raw_events:
        if isinstance(entry, list) and len(entry) >= 2:
            sequence.append(int(entry[0]))

    gt_start_end_pair = [[interval[0], interval[1]] for interval in ground_truth]
    return sequence, gt_start_end_pair


def create_labels(sequence, gt_start_end_pair, test_data_start_index, test_data_end_index):
    start_index = test_data_start_index
    end_index = test_data_end_index
    event_list = []
    event_id_list = []
    for start, end in gt_start_end_pair:
        event_list = []
        for event_id in range(start_index, end_index):
            if event_id >= start and event_id <= end:
                print("Event ID {} is in ground truth range ({}, {})".format(event_id, start, end))
                print("event_id - start_index:", event_id , start_index)
                event_list.append(sequence[event_id - start_index])
        if event_list:
            event_id_list.append(event_list)

    return event_id_list

In [None]:
from libraries.anomaly_detection import discover_test_files, load_ground_truth_dir, build_labels
import json 
 
# Build, for every test window, the event IDs that fall inside the ground-truth
# intervals of its trace, and store the result as JSON.
gt_path = f"{base_dir}/{CODE}/{THREAD}_thread/version_{VER}/faulty_data/labels"
ground_truth_path = [os.path.join(gt_path, x) for x in os.listdir(gt_path)]

print("ground truth path:", ground_truth_path)

new_label = {}

for test_data in test_data_path:
    print("Test data file:", test_data)

    print("---------------------------------------------------")
    test_data_name_1, test_data_start_index, test_data_end_index, test_data_name = get_trace_info(test_data)
    print("Test data name is : ", test_data_name_1)
    print("Test data start index is : ", test_data_start_index)
    print("Test data end index is : ", test_data_end_index)

    ground_truth_filename = get_ground_truth_file(test_data_name_1, ground_truth_path)
    if not ground_truth_filename:
        print("No matching ground truth file found for test data:", test_data)
        continue

    print("Ground truth file name is : ", ground_truth_filename)

    # The label file is a dict with a 'labels' entry that maps trace names to
    # their intervals; only the first trace name in that mapping is used.
    ground_truth_raw = read_traces(ground_truth_filename)
    ground_truth = ground_truth_raw['labels']

    label_trace_name = list(ground_truth.keys())[0]
    ground_truth = ground_truth[label_trace_name]

    print("ground truth:", ground_truth)

    print("The test data file ", test_data, " is corresponding to ground truth file : ", ground_truth_filename)
    sequence, gt_start_end_pair = find_sequence_ground_truth(test_data, ground_truth)
    print("Event ID sequence is : ", sequence)
    print("Ground truth start-end pairs are : ", gt_start_end_pair)
    labels = create_labels(sequence, gt_start_end_pair, test_data_start_index, test_data_end_index)
    print("Labels are : ", labels)
    new_label[test_data_name] = labels
    print("New label dictionary is : ", new_label)

output_dir = f"{base_dir}/{CODE}/{THREAD}_thread/version_{VER}/faulty_data/"
os.makedirs(output_dir, exist_ok=True)

output_path = os.path.join(output_dir, "gt_test_data_labels.json")

with open(output_path, "w", encoding="utf-8") as f:
    json.dump(new_label, f, indent=4)

print(f"\n Saved to file: {output_path}")


In [None]:

from collections import defaultdict
from math import ceil

def has_substring(pattern, sequence):
    """
    Check whether pattern occurs in sequence as a contiguous run.

    Parameters:
        pattern - list(int) or tuple(int)
        sequence - list(int) or tuple(int)

    Returns:
        True if pattern appears as a contiguous slice of sequence (an empty
        pattern always matches), False otherwise.
    """
    needle = tuple(pattern)
    haystack = tuple(sequence)
    width = len(needle)
    if width == 0:
        return True
    # The range is empty when the pattern is longer than the sequence.
    last_start = len(haystack) - width
    return any(haystack[start:start + width] == needle for start in range(last_start + 1))

def count_substring(pattern, sequence):
    """
    Count how often pattern occurs in sequence as a contiguous run.

    Overlapping occurrences are all counted.

    Parameters:
        pattern - list(int) or tuple(int)
        sequence - list(int) or tuple(int)

    Returns:
        count - (int) number of start positions at which pattern matches;
        0 for an empty pattern or a pattern longer than the sequence.
    """
    needle, haystack = tuple(pattern), tuple(sequence)
    width = len(needle)
    if width == 0 or width > len(haystack):
        return 0
    starts = range(len(haystack) - width + 1)
    return sum(1 for start in starts if haystack[start:start + width] == needle)

def is_subsequence(small, big):
    """
    Check whether the items of small appear in big in the same order, gaps allowed.

    Parameters:
        small - iterable of items to find
        big - iterable to search

    Returns:
        True if small is a (not necessarily contiguous) subsequence of big.
    """
    remaining = iter(big)
    for wanted in small:
        # Membership on an iterator consumes it up to and including the match,
        # so the next search resumes right after the current hit.
        if wanted not in remaining:
            return False
    return True

def next_same_index(seq):
    """
    Compute, for every position, where the same element occurs next.

    Parameters:
        seq - list(int)

    Returns:
        nxt - list(int) where nxt[i] is the index of the next occurrence of
        seq[i] after i, or len(seq) if there is none.

    For example: [1,2,3,1,4,2] gives [3,5,6,6,6,6].
    """
    length = len(seq)
    following = [length] * length
    previous_at = {}  # value -> index of its latest occurrence so far
    for index, value in enumerate(seq):
        if value in previous_at:
            # The current position is the "next occurrence" of the previous one.
            following[previous_at[value]] = index
        previous_at[value] = index
    return following

def dynamic_window_sequence(sequences):
    """
    Generate the dynamic windows of every sequence.

    Every position i opens one window that runs from i up to (but excluding) the
    next occurrence of seq[i], or to the end of the sequence when the element
    does not occur again.

    Parameters:
        sequences - list of list(int)

    Returns:
        final_windows - list of list(int) with the windows of all sequences,
        in sequence order and then position order. Empty sequences are skipped.

    For example: [1,2,3,1,4,2] yields
        [1,2,3], [2,3,1,4], [3,1,4,2], [1,4,2], [4,2], [2]
    """
    final_windows = []
    for seq in sequences:
        if not seq:
            continue
        for start, stop in enumerate(next_same_index(seq)):
            if stop > start:
                final_windows.append(seq[start:stop])
    return final_windows

def windows_by_anchor(sequences):
    """
    Group the dynamic windows of all sequences by their anchor (first element).

    Parameters:
        sequences - list of list(int)

    Returns:
        dict where the value for anchor a is the list of windows that start with a,
        in the order dynamic_window_sequence produced them.

    For example: for [1,2,3,1,4,2] the entry for anchor 1 is [[1,2,3],[1,4,2]];
    the other anchors (2, 3, 4) get their own entries.
    """
    grouped = {}
    for win in dynamic_window_sequence(sequences):
        if win:
            grouped.setdefault(win[0], []).append(win)
    return grouped

def candidate_allowed(pattern, x, repeated_event_id):
    """
    Decide whether pattern may be extended on the right with event x.

    Parameters:
        pattern - tuple(int), e.g. (6,7,8)
        x - int, the event ID to append
        repeated_event_id - bool; only the value False forbids x from already
                            occurring anywhere in pattern

    Returns:
        True if appending x is allowed, False otherwise.

        For example: (6,7,8), x=6 - True when repeated_event_id is True, else False
                     (6,7,8), x=8 - always False, adjacent duplicates are never allowed
                     (),      x=8 - always True
    """
    if not pattern:
        return True
    if x == pattern[-1]:
        return False
    repeats_forbidden = repeated_event_id is False
    return not (repeats_forbidden and x in pattern)

def generate_frequent_patterns(sequences, min_sup, max_len=None, repeated_event_id=False):
    """
    Mine frequent contiguous patterns over dynamic windows indexed by anchor.

    For each candidate pattern p = (a, x, y, ...), support is the number of dynamic
    windows that START with anchor 'a' and contain p as a contiguous substring.
    Candidates are generated Apriori-style via right-extensions with frequent
    anchors and filtered by `candidate_allowed`:
      - No adjacent duplicates (always).
      - If `repeated_event_id` is False: no repeated symbols anywhere in p.
      - If `repeated_event_id` is True : repeated symbols allowed if non-adjacent.

    Length-1 patterns (frequent anchors) are always reported. Every longer level,
    including length 2, is only explored while it does not exceed `max_len`.

    Parameters:
        sequences (list[list[int]]): sequences of event IDs.
        min_sup (int): a pattern is kept if its window-support is >= min_sup.
        max_len (int or None): maximum pattern length to explore. If None, expand
            until no further candidates survive.
        repeated_event_id (bool): whether to allow non-adjacent repeats in patterns.

    Returns:
        dict[tuple[int, ...], int]: map from pattern tuple to its window-support.
    """
    by_anchor = windows_by_anchor(sequences)

    # Level 1: every anchor with enough windows is a frequent 1-gram.
    all_patterns = {}
    for a, wins in by_anchor.items():
        c = len(wins)
        if c >= min_sup:
            all_patterns[(a,)] = c
    S1 = {p[0] for p in all_patterns}   # symbols used for right-extensions

    cur_level = dict(all_patterns)
    level = 2
    while cur_level:
        if max_len is not None and level > max_len:
            break

        candidates = {
            p + (x,)
            for p in cur_level
            for x in S1
            if candidate_allowed(p, x, repeated_event_id)
        }
        if not candidates:
            break

        next_level = {}
        for cand in candidates:
            # Only windows anchored at the pattern's first symbol can support it.
            c = sum(1 for w in by_anchor.get(cand[0], []) if has_substring(cand, w))
            # Candidates that never occur are dropped even if min_sup <= 0.
            if c > 0 and c >= min_sup:
                next_level[cand] = c

        if not next_level:
            break

        all_patterns.update(next_level)
        cur_level = next_level
        level += 1

    return all_patterns


def compress_patterns(patterns):
    """
    Compress frequent patterns by removing redundant shorter ones.

    A pattern P is dropped when an already kept pattern Q has the same support
    and contains P as a CONTIGUOUS substring. Patterns are visited from longest
    to shortest (ties broken by the pattern itself), so Q is always at least as
    long as P.

    Parameters:
        patterns (dict[tuple[int, ...], int]): pattern tuple -> support count.

    Returns:
        dict[tuple[int, ...], int]: the surviving patterns with their supports.

    For example:
        Input  : {(6,7):3, (6,7,8):3, (6,8):2}
        Output : {(6,7,8):3, (6,8):2} because (6,7) is a substring of (6,7,8) with the same support 3.
    """
    longest_first = sorted(patterns.items(), key=lambda entry: (-len(entry[0]), entry[0]))
    compressed_patterns = {}
    for pattern, support in longest_first:
        absorbed = any(
            patterns[kept] == support and has_substring(pattern, kept)
            for kept in compressed_patterns
        )
        if not absorbed:
            compressed_patterns[pattern] = support
    return compressed_patterns

def support_patterns(sequences, patterns):
    """
    Compute per-sequence support for contiguous patterns using dynamic windows.

    For each pattern p (a tuple of event IDs), compute:
      - across: number of sequences in which p occurs at least once as a contiguous
        substring inside a dynamic window anchored at p[0].
      - in_file_avg: average number of such occurrences per sequence, computed ONLY
        over the sequences where p occurs.

    Computing support:
      1) For each sequence, build its dynamic windows and group them by anchor (window[0]).
      2) For a pattern p = (a, ...), look only at windows anchored at 'a' and sum
         count_substring(p, window) over those windows.
      3) A sequence with a summed count > 0 contributes to 'across' and to the average.

    `patterns` is materialised once (duplicates removed, order kept), so one-shot
    iterators such as generators are handled correctly.

    Parameters:
        sequences (list[list[int]]): traces as sequences of event IDs.
        patterns (Iterable[tuple[int, ...]]): patterns to measure.

    Returns:
        dict[tuple[int, ...], dict]: for each pattern p, a dict with:
            {
              "across": int,
              "in_file_avg": float
            }
        Empty patterns are reported with zero support.
    """
    support = {}
    if not patterns:
        return support

    patterns = list(dict.fromkeys(patterns))
    if not patterns:
        return support

    # One anchor -> windows mapping per sequence.
    sequence_window_by_anchor = []
    for seq in sequences:
        grouped = {}
        for w in dynamic_window_sequence([seq]):
            if w:
                grouped.setdefault(w[0], []).append(w)
        sequence_window_by_anchor.append(grouped)

    for p in patterns:
        if p:
            per_sequence = [
                sum(count_substring(p, w) for w in grouped.get(p[0], []))
                for grouped in sequence_window_by_anchor
            ]
        else:
            per_sequence = [0] * len(sequence_window_by_anchor)

        hits = [c for c in per_sequence if c > 0]
        support[p] = {
            "across": len(hits),
            "in_file_avg": (float(sum(hits)) / float(len(hits))) if hits else 0.0,
        }
    return support

def generateFrequentSubSequences(sequences, K, Scommon, min_sup, repeated_event_id=False):
    """
    Generate the frequent contiguous patterns of exact length K, using dynamic windows grouped by anchor.

    Used by the progressive discriminative mining loop: each call mines only the
    K-length patterns by extending (K-1)-length base patterns by one symbol.

    Candidate generation:
      - K == 1: every anchor with at least min_sup windows is a frequent 1-gram.
      - Scommon non-empty: base = the patterns of Scommon that have length K-1.
      - Scommon empty and K == 2: base = all frequent 1-grams.
      - Scommon empty and K > 2: nothing to extend, the result is empty.
      - Each base pattern is extended on the right by one frequent anchor, subject
        to candidate_allowed.

    Support counting:
      - Support(p) = number of dynamic windows that start with anchor p[0] and
        contain p contiguously at least once.
      - Candidates with support >= min_sup (and at least one occurrence) are kept.

    repeated_event_id = False -> no repeated symbols and no adjacent duplicates
    repeated_event_id = True  -> non-adjacent repeats allowed, adjacent duplicates still rejected

    Parameters:
        sequences : list[list[int]] - input event sequences
        K : int - pattern length to mine
        Scommon : set[tuple[int, ...]] - patterns common to both groups from the previous iteration
        min_sup : int - minimum window-support threshold
        repeated_event_id : bool, default=False - whether repeated event IDs are allowed within a pattern

    Returns
    -------
    dict[tuple[int, ...], int] - frequent patterns of length K mapped to their support count.

    For example
    -------
    sequences = [[6,7,8,6,7,8,9]]
    generateFrequentSubSequences(sequences, K=2, Scommon=set(), min_sup=1)
    output - {(6,7): 2, (7,8): 2, (8,6): 1, (8,9): 1}
    """
    by_anchor = windows_by_anchor(sequences)
    anchor_support = {a: len(wins) for a, wins in by_anchor.items() if len(wins) >= min_sup}

    if K == 1:
        return {(a,): c for a, c in anchor_support.items()}

    if Scommon and len(Scommon) > 0:
        base = {pat for pat in Scommon if len(pat) == K - 1}
    elif K == 2:
        base = {(a,) for a in anchor_support}
    else:
        return {}

    S1 = set(anchor_support)
    candidates = {
        p + (x,)
        for p in base
        for x in S1
        if candidate_allowed(p, x, repeated_event_id)
    }
    if not candidates:
        return {}

    result = {}
    for cand in candidates:
        # Only windows anchored at the candidate's first symbol are inspected.
        hits = sum(1 for w in by_anchor.get(cand[0], []) if has_substring(cand, w))
        if hits > 0 and hits >= min_sup:
            result[cand] = hits
    return result

def findCommon(good_pattern_seq, bad_pattern_seq):
    """
    Find the patterns present in both the good and the bad pattern collections.

    The mining loop uses the result as the base for the next, longer candidates.

    Parameters:
        good_pattern_seq : dict[tuple[int, ...], int] - frequent patterns mined from the good sequences.
        bad_pattern_seq : dict[tuple[int, ...], int] - frequent patterns mined from the bad sequences.

    Returns:
        set[tuple[int, ...]] - patterns that are keys of both inputs; empty when
        either input is empty or None.

    For example::
    good_pattern_seq = {(6,7,8): 4, (7,8,9): 3}
    bad_pattern_seq  = {(6,7,8): 2, (8,9,10): 5}
    findCommon(good_pattern_seq, bad_pattern_seq)
    output - {(6,7,8)}
    """
    if not (good_pattern_seq and bad_pattern_seq):
        return set()
    return {p for p in good_pattern_seq if p in bad_pattern_seq}



def FindDiscriminative(A, B, num_A, num_B, theta=0.8, delta=0.8):
    """
    Identify the patterns of group A that are discriminative with respect to group B.

    A pattern p of A is marked discriminative if any of the following holds:
    1. p has no statistics in B at all.
    2. B shows no evidence of p: its presence rate or its "in_file_avg" is 0.
    3. The ratio of presence rates reaches theta: (across_A / num_A) / (across_B / num_B) >= theta
    4. The ratio of average occurrences reaches delta: in_file_avg_A / in_file_avg_B >= delta

    Parameters:
        A : dict[tuple[int, ...], dict] - support statistics for patterns in group A.
            Each entry has:
                {
                "across": int,         # number of sequences where the pattern appears
                "in_file_avg": float   # avg. number of occurrences per sequence containing it
                }
        B : dict[tuple[int, ...], dict] - the same statistics for group B.
        num_A : int - number of sequences in group A (a presence rate of 0 is used if not positive).
        num_B : int - number of sequences in group B (a presence rate of 0 is used if not positive).
        theta : float, default=0.8 - threshold for the presence-rate ratio.
        delta : float, default=0.8 - threshold for the average-occurrence ratio.

    Returns:
        set[tuple[int, ...]] - patterns that are discriminative for group A compared to B.

    For example::
        A_stats = { (6,7,8): {"across": 5, "in_file_avg": 2.3} }
        B_stats = { (6,7,8): {"across": 1, "in_file_avg": 0.4} }
        FindDiscriminative(A_stats, B_stats, 10, 10, theta=0.8, delta=0.8)
        output - {(6,7,8)}
    """
    discriminative = set()
    for p, stats_a in A.items():
        if p not in B:
            discriminative.add(p)
            continue
        stats_b = B[p]

        rate_a = float(stats_a["across"]) / float(num_A) if num_A > 0 else 0.0
        rate_b = float(stats_b["across"]) / float(num_B) if num_B > 0 else 0.0
        avg_a = stats_a["in_file_avg"]
        avg_b = stats_b["in_file_avg"]

        no_evidence_in_b = rate_b == 0.0 or avg_b == 0.0
        if (
            no_evidence_in_b
            or (rate_b > 0.0 and (rate_a / rate_b) >= theta)
            or (avg_b > 0.0 and (avg_a / avg_b) >= delta)
        ):
            discriminative.add(p)
    return discriminative


def mine_discriminative_patterns_progressive(good_seqs,bad_seqs,min_sup,theta=0.8,delta=0.8,k_start=1,k_max=10,stop_when_found=True,normalize=True,repeated_event_id=False):
    """
    Mine and score discriminative contiguous patterns between two groups of sequences (good vs bad) in two stages.

    Stage 1 - Progressive discriminative mining
      - For K = k_start..k_max:
          - Generate frequent K-grams separately from good and bad sequences via generateFrequentSubSequences
            (both results are printed).
          - Stop if neither group has a frequent K-gram.
          - Compute per-pattern support stats (across sequences and per sequence) in each group.
          - Mark patterns discriminative via FindDiscriminative using theta (across ratio) and delta (in-sequence ratio),
            once in each direction (good vs bad, bad vs good).
          - Update Scommon = frequent K-grams present in both groups, used as the base for the next K.
          - If stop_when_found is True, stop as soon as either discriminative set is non-empty.
      - Only the patterns discriminative for the bad group are carried into Stage 2.

    Stage 2 - Quantitative scoring
      - Determine the longest discriminative length (max_len_found).
      - Build frequent pattern lattices up to max_len_found for each group via generate_frequent_patterns
        to get supports for all prefixes.
      - normalize=True: compute P_bad(p) and P_good(p) with a chain rule over contiguous extensions.
        normalize=False: use the fraction of sequences of each group in which p occurs.
      - Score each pattern with delta = P_bad - P_good and keep only positive scores.

    Parameters:
        good_seqs : list[list[int]] - traces labeled as good (normal data).
        bad_seqs : list[list[int]] - traces labeled as bad (faulty).
        min_sup : int - minimum window-support threshold, used in both stages.
        theta : float, default=0.8 - across-sequence presence ratio threshold (A/B) in FindDiscriminative.
        delta : float, default=0.8 - in-sequence average count ratio threshold (A/B) in FindDiscriminative.
        k_start : int, default=1 - initial pattern length for progressive mining.
        k_max : int, default=10 - maximum pattern length to mine during Stage 1.
        stop_when_found : bool, default=True - if True, stop mining as soon as the good or the bad discriminative set is non-empty.
        normalize : bool, default=True - if True, compute Stage-2 chain probabilities; if False, fall back to across-sequence rates.
        repeated_event_id : bool, default=False - allow non-adjacent repeats if True; adjacent duplicates are always disallowed.

    Returns:
        dict[tuple[int, ...], dict] - mapping each retained pattern p to {"bad": float, "good": float, "delta": float},
        ordered by decreasing delta, then longer patterns first, then by the pattern itself.
        Empty when no pattern is discriminative for the bad group.

    """
    Scommon = set()
    good_discriminative = set()
    bad_discriminative = set()
    K = k_start
    length_good_seq = len(good_seqs)
    length_bad_seq = len(bad_seqs)

    # ---- Stage 1: grow the pattern length K one step at a time ----
    while True:
        if K > k_max:
            break

        k_frequent_good_patterns = generateFrequentSubSequences(good_seqs, K, Scommon, min_sup, repeated_event_id=repeated_event_id)
        k_frequent_bad_patterns = generateFrequentSubSequences(bad_seqs,  K, Scommon, min_sup, repeated_event_id=repeated_event_id)
        print("Good sequence mine patterns at stage : ",K," is : ", k_frequent_good_patterns)
        print("Bad sequence mine patterns at stage :",K, " is :", k_frequent_bad_patterns)

        # Nothing frequent in either group at this length: longer patterns cannot be built.
        if not k_frequent_good_patterns and not k_frequent_bad_patterns:
            break

        # Each group's stats cover only the patterns that are frequent in that group.
        good_seq_support = support_patterns(good_seqs, set(k_frequent_good_patterns.keys()) if k_frequent_good_patterns else set())
        bad_seq_support = support_patterns(bad_seqs,  set(k_frequent_bad_patterns.keys())  if k_frequent_bad_patterns else set())

        if k_frequent_good_patterns:
            discriminative_good = FindDiscriminative(good_seq_support, bad_seq_support, length_good_seq, length_bad_seq, theta=theta, delta=delta)
            for p in discriminative_good:
                if p in k_frequent_good_patterns:
                    good_discriminative.add(p)

        if k_frequent_bad_patterns:
            discriminative_bad = FindDiscriminative(bad_seq_support, good_seq_support, length_bad_seq, length_good_seq, theta=theta, delta=delta)
            for p in discriminative_bad:
                if p in k_frequent_bad_patterns:
                    bad_discriminative.add(p)

        # Patterns frequent in both groups seed the candidates of the next length.
        Scommon = findCommon(k_frequent_good_patterns, k_frequent_bad_patterns)
        K += 1
        if stop_when_found:
            if len(good_discriminative) > 0 or len(bad_discriminative) > 0:
                break

    # ---- Stage 2: score the patterns that are discriminative for the bad group ----
    # good_discriminative only influences the early stop above; it is not scored.
    patterns = set(bad_discriminative)
    if not patterns:
        return {}

    max_len_found = 1
    for p in patterns:
        if len(p) > max_len_found:
            max_len_found = len(p)

    full_bad  = generate_frequent_patterns(bad_seqs,  min_sup, max_len=max_len_found, repeated_event_id=repeated_event_id)
    full_good = generate_frequent_patterns(good_seqs, min_sup, max_len=max_len_found, repeated_event_id=repeated_event_id)

    def prefix_out_sum(full):
        # For every prefix, sum the supports of all its one-symbol extensions found in `full`.
        i = defaultdict(int)
        for q, c in full.items():
            if len(q) >= 2:
                i[q[:-1]] += c
        return i

    bad_out  = prefix_out_sum(full_bad)
    good_out = prefix_out_sum(full_good)

    def prob_chain(pattern, full, out, total_events):
        # support(first symbol) / total_events, multiplied for each further symbol by
        # support(prefix + symbol) / (summed support of all extensions of that prefix).
        # Any missing support makes the whole probability 0.
        if total_events <= 0:
            return 0.0
        if len(pattern) == 1:
            return float(full.get(pattern, 0)) / float(total_events)
        val = float(full.get((pattern[0],), 0)) / float(total_events)
        if val == 0.0:
            return 0.0
        for i in range(1, len(pattern)):
            pref = pattern[:i]
            n = float(full.get(pattern[:i+1], 0))
            d = float(out.get(pref, 0))
            if n == 0.0 or d == 0.0:
                return 0.0
            val *= (n / d)
        return val

    # Total number of events per group, the denominator of the first chain factor.
    total_bad  = sum(len(s) for s in bad_seqs)
    total_good = sum(len(s) for s in good_seqs)

    discriminative = {}
    for p in patterns:
        if normalize:
            b = prob_chain(p, full_bad,  bad_out,  total_bad)
            g = prob_chain(p, full_good, good_out, total_good)
        else:
            # Fallback: fraction of sequences of each group in which p occurs.
            bad_support = support_patterns(bad_seqs,  [p])
            good_support = support_patterns(good_seqs, [p])
            b = float(bad_support.get(p, {"across":0})["across"]) / float(max(1, length_bad_seq))
            g = float(good_support.get(p, {"across":0})["across"]) / float(max(1, length_good_seq))
        d = b - g
        # Keep only patterns that are more likely in the bad group.
        if d > 0:
            discriminative[p] = {"bad": b, "good": g, "delta": d} 

    # Largest delta first; ties go to the longer pattern, then to the smaller tuple.
    discriminative_sorted = dict(sorted(discriminative.items(), key=lambda x: (-x[1]["delta"], -len(x[0]), x[0])))
    return discriminative_sorted


In [None]:
# The number of distinct event IDs bounds the pattern length explored in Stage 1.
S1 = {e for seq in (good_sequences + bad_sequences) for e in seq}
k_max_1 = len(S1)

print("MAX PATTERN LENGTH:", k_max_1)


discriminative_patterns = mine_discriminative_patterns_progressive(
    good_sequences,
    bad_sequences,
    min_sup=MIN_SUP,            # shared constant from the configuration cell (value 2)
    theta=0.8,
    delta=0.8,
    k_start=1,
    k_max=k_max_1,
    stop_when_found=False,      # keep mining all lengths instead of stopping at the first hit
    normalize=True,
    repeated_event_id=True,
)

# discriminative_patterns = mine_discriminative_patterns_progressive(
#     [[6,7,8,9,6,7,8,9,10,11,12],[6,7,8,9,10,11,12,13,6,7,8,6,7,8,9]],
#     [[6,7,8,9,6,7,8,7,9,10],[6,7,8,9,10,11,12]],
#     min_sup=2,
#     theta=0.8,
#     delta=0.8,
#     k_start=1,
#     k_max=10,
#     stop_when_found=False,
#     normalize=True,
#     repeated_event_id=True,
# )

In [None]:
discriminative_patterns

In [None]:
len(discriminative_patterns)

In [None]:
from collections import defaultdict
from math import ceil

def segment_fixed_width(sequence, width, overlap=False):
    """
    Cut a sequence into windows of at most `width` elements.

    Every window is reported as (start_index, end_index, subsequence), where the
    indices are positions in the original sequence and end_index is exclusive.
    The last window is shorter than `width` when the sequence length is not an
    exact fit.

    Parameters:
        sequence : list or tuple - input sequence
        width : int - window size. If width <= 0 an empty list is returned.
        overlap : bool - If False, windows are disjoint (step = width).
        If True, the step is max(1, width - 1), so consecutive windows share
        exactly one element (for width >= 2). Windowing stops as soon as a
        window reaches the end of the sequence.

    Returns:
        list[tuple[int, int, list]] - one (start_index, end_index, subsequence)
        tuple per window, in order of increasing start_index.

    for eg:
        sequence = [1, 2, 3, 4, 5, 6]
        segment_fixed_width(sequence, width=3, overlap=False)
        output - [(0, 3, [1, 2, 3]), (3, 6, [4, 5, 6])]

        segment_fixed_width(sequence, width=3, overlap=True)
        output - [(0, 3, [1, 2, 3]), (2, 5, [3, 4, 5]), (4, 6, [5, 6])]
    """
    if width <= 0:
        return []

    total = len(sequence)
    step = max(1, width - 1) if overlap else width

    windows = []
    for start in range(0, total, step):
        stop = min(start + width, total)
        windows.append((start, stop, sequence[start:stop]))
        # Once a window touches the end, any further start would only repeat
        # the tail (overlap mode) or lie past the end (disjoint mode).
        if stop == total:
            break
    return windows

def score_segment(seg_seq, discrinimative_keys):
    """
    Score a segment by how many discriminative patterns it contains.

    Each pattern in `discrinimative_keys` is tested against `seg_seq` with
    `is_subsequence` (defined elsewhere in this notebook); every pattern that
    matches adds one to the score, regardless of how often it occurs.

    Parameters:
        seg_seq : list[int] - segment
        discrinimative_keys : iterable[tuple[int]] - discriminative patterns, each a tuple of event IDs.

    Returns:
        int - number of patterns for which is_subsequence(pattern, seg_seq) is true.
    """
    return sum(1 for pattern in discrinimative_keys if is_subsequence(pattern, seg_seq))

def merge_consecutive_segments(segs_1):
    """
    Merge back-to-back positive-score segments of the same file into larger segments.

    Segments are processed in (file, start) order. A segment is folded into the
    running one only when all of the following hold:
      - both belong to the same file,
      - it starts exactly where the running segment ends (start == end),
      - both the running segment and the new one have a score > 0.
    On a merge, 'end' is moved to the new segment's end, 'seq' is concatenated and
    'score' is summed. Any other key keeps the value of the first segment of the run.

    Because adjacency is tested with strict equality, overlapping windows
    (where the next start is smaller than the previous end) are never merged.

    The input list is not modified: ordering is done on a sorted copy and every
    output dictionary is a new (shallow) copy.

    Parameters:
        segs_1 : list[dict] - List of segment dictionaries. Each dictionary must contain the keys
        "file", "start", "end", "seq" and "score".

    Returns:
        list[dict] - List of merged segment dictionaries, ordered by (file, start).

    for eg:
        segs_1 = [{"file": "A", "start": 0, "end": 3, "seq": [6,7,8], "score": 2},{"file": "A", "start": 3, "end": 6, "seq": [9,10,11], "score": 1},
        {"file": "A", "start": 6, "end": 9, "seq": [12,13,14], "score": 0},
        {"file": "B", "start": 0, "end": 3, "seq": [1,2,3], "score": 1}]
        merge_consecutive_segments(segs_1)
        output - [ {"file": "A", "start": 0, "end": 6, "seq": [6,7,8,9,10,11], "score": 3}, {"file": "A", "start": 6, "end": 9, "seq": [12,13,14], "score": 0},
        {"file": "B", "start": 0, "end": 3, "seq": [1,2,3], "score": 1}]
    """
    if not segs_1:
        return []

    # sorted() returns a new list, so the caller's list keeps its original order
    # (the previous in-place .sort() silently reordered the argument).
    ordered = sorted(segs_1, key=lambda d: (d["file"], d["start"]))

    merged = []
    current = None
    for seg in ordered:
        extends_current = (
            current is not None
            and seg["file"] == current["file"]
            and seg["start"] == current["end"]
            and current["score"] > 0
            and seg["score"] > 0
        )
        if extends_current:
            current["end"] = seg["end"]
            # '+' builds a new sequence, so the input segment's 'seq' is left untouched.
            current["seq"] = current["seq"] + seg["seq"]
            current["score"] = current["score"] + seg["score"]
        else:
            if current is not None:
                merged.append(current)
            # Shallow copy so later updates never write into the caller's dict.
            current = dict(seg)

    merged.append(current)
    return merged


def stage2_mining(bad_seqs,discriminative_patterns,seg_width=50,top_k=5,max_len_stage2=None,overlap=False):
    """
    Stage-2 mining: mine patterns inside the faulty regions that score highest
    against the Stage-1 discriminative patterns, and keep those shared by every
    selected region.

    Steps:
      1. Cut each bad sequence into windows with segment_fixed_width.
      2. Score each window with score_segment (count of matching discriminative patterns).
      3. Merge back-to-back positive-score windows of the same sequence
         with merge_consecutive_segments.
      4. Rank merged segments by (score, length), highest first, and select:
           - all of them when top_k is None or <= 0;
           - otherwise the first top_k with score > 0, or, if none has a
             positive score, simply the first top_k of the ranking.
      5. Run generate_frequent_patterns (min_sup=1, max_len=max_len_stage2) followed by
         compress_patterns on each selected segment individually.
      6. Keep the patterns that occur in the result of every selected segment,
         summing their per-segment counts, and pass the result through
         compress_patterns once more.

    Parameters:
        bad_seqs : list[list[int]] - List of faulty (bad) sequences.
        discriminative_patterns : dict or iterable[tuple[int]] - Stage-1 patterns; for a dict the keys are used.
        seg_width : int, default=50 - Window size used for segmentation.
        top_k : int or None, default=5 - Maximum number of merged segments to mine (see step 4).
        max_len_stage2 : int or None, default=None - Forwarded as max_len to generate_frequent_patterns.
        overlap : bool, default=False - Forwarded to segment_fixed_width.

    Returns:
        dict - pattern -> summed count for patterns present in all selected segments
        (as returned by the final compress_patterns call). An empty dict is returned
        when there are no discriminative patterns or no segments.
    """
    # Accept either the Stage-1 dict (its keys are the patterns) or a plain iterable.
    if isinstance(discriminative_patterns, dict):
        pattern_source = discriminative_patterns.keys()
    else:
        pattern_source = discriminative_patterns
    discriminative_keys = list(pattern_source)
    if not discriminative_keys:
        return {}

    # Steps 1-2: window every bad sequence and score each window.
    windows = [
        {
            "file": file_idx,
            "start": lo,
            "end": hi,
            "seq": chunk,
            "score": score_segment(chunk, discriminative_keys),
        }
        for file_idx, trace in enumerate(bad_seqs)
        for (lo, hi, chunk) in segment_fixed_width(trace, seg_width, overlap=overlap)
    ]
    if not windows:
        return {}

    # Step 3: merge adjacent positive-score windows.
    merged = merge_consecutive_segments(windows)
    if not merged:
        return {}

    # Step 4: rank (stable sort, so ties keep their merged order) and select.
    ranked = sorted(merged, key=lambda seg: (seg["score"], len(seg["seq"])), reverse=True)

    if top_k is None or top_k <= 0:
        selected = list(ranked)
    else:
        selected = []
        for seg in ranked:
            if seg["score"] > 0:
                selected.append(seg)
                if len(selected) >= top_k:
                    break
        if not selected:
            # No segment matched any pattern: fall back to the head of the ranking.
            selected = list(ranked[:min(top_k, len(ranked))])

    if not selected:
        return {}

    # Step 5: mine each selected segment on its own.
    per_seg_fp = [
        compress_patterns(
            generate_frequent_patterns([seg["seq"]], min_sup=1, max_len=max_len_stage2)
        )
        for seg in selected
    ]
    if not per_seg_fp:
        return {}

    # Step 6: a pattern must show up in every selected segment.
    need = len(per_seg_fp)
    appear_in = {}
    support_sum = {}
    for seg_patterns in per_seg_fp:
        for pattern, count in seg_patterns.items():
            appear_in[pattern] = appear_in.get(pattern, 0) + 1
            support_sum[pattern] = support_sum.get(pattern, 0) + count

    final_seq = {
        pattern: support_sum[pattern]
        for pattern, hits in appear_in.items()
        if hits >= need
    }
    return compress_patterns(final_seq)

In [None]:
# Stage 2: mine patterns inside the best-scoring windows of the faulty traces.
stage2_patterns = stage2_mining(
    bad_sequences,
    discriminative_patterns,
    seg_width=50, top_k=5, max_len_stage2=None, overlap=False,
)

In [None]:
# Display the Stage-2 result (pattern -> aggregated support) returned by stage2_mining.
stage2_patterns

In [None]:
from IPython.display import display

# Stage-1 table: one row per discriminative pattern, largest delta first.
df1 = pd.DataFrame(
    [
        {
            'Pattern': pattern,
            'Bad Support': stats['bad'],
            'Good Support': stats['good'],
            'Difference': stats['delta'],
        }
        for pattern, stats in sorted(
            discriminative_patterns.items(),
            key=lambda item: item[1]['delta'],
            reverse=True,
        )
    ]
)

# Stage-2 table: one row per pattern, highest aggregated support first.
df2 = pd.DataFrame(
    [
        {'Pattern': pattern, 'Support': support}
        for pattern, support in sorted(
            stage2_patterns.items(),
            key=lambda item: item[1],
            reverse=True,
        )
    ]
)

print("Stage 1: Discriminative Patterns")
display(df1)

print("\nStage 2: Infrequent Root-Cause Patterns (from top-K segments)")
display(df2)

In [None]:
# Rank Stage-2 patterns: highest support first, then longer patterns, then
# natural tuple order as the final tie-breaker; keep the first five.
stage2_pattern_sorted = list(stage2_patterns.items())
stage2_pattern_sorted.sort(key=lambda item: (-item[1], -len(item[0]), item[0]))
top5_stage2_patterns = stage2_pattern_sorted[:5]

In [None]:
# Display the five best-ranked (pattern, support) pairs from Stage 2.
top5_stage2_patterns

In [None]:
import json
from math import ceil

# Ground-truth label file, derived from the configuration cell (base_dir, CODE,
# THREAD, VER) instead of a hard-coded path, so the evaluation follows the
# selected application/version. With the current configuration this evaluates to
# './trace_data/theft_protection/single_thread/version_4/faulty_data/gt_test_data_labels.json'.
# NOTE(review): assumes every application keeps its labels under
# 'faulty_data/gt_test_data_labels.json' - confirm for other CODE values.
LABEL_PATH = f'{base_dir}/{CODE}/{THREAD}_thread/version_{VER}/faulty_data/gt_test_data_labels.json'

# Minimum fraction of a ground-truth sequence's length that a pattern must
# reach to be accepted as a match (used by compare_gt via ceil(min_cov * len)).
MIN_COVERAGE = 0.60

def normalize_patterns(discrinimative_obj):
    """
    Turn a pattern container into a list of tuples.

    Parameters:
        discrinimative_obj : dict, iterable or None - for a dict the keys are
        taken as the patterns; any other iterable is used as-is.

    Returns:
        list[tuple] - tuple(p) for every entry that is a list, tuple or set, in
        iteration order. Entries of any other type are dropped. None yields [].
    """
    if discrinimative_obj is None:
        return []

    candidates = (
        discrinimative_obj.keys()
        if isinstance(discrinimative_obj, dict)
        else discrinimative_obj
    )
    return [tuple(entry) for entry in candidates if isinstance(entry, (list, tuple, set))]

def is_continous_subsequence(pattern, seq):
    """
    Check whether `pattern` occurs in `seq` as one contiguous run.

    Both arguments are compared as tuples, so lists and tuples can be mixed.

    Returns:
        bool - True if some slice of `seq` equals `pattern`; False for an empty
        pattern or a pattern longer than `seq`.
    """
    if not pattern:
        return False

    needle, haystack = tuple(pattern), tuple(seq)
    width = len(needle)
    last_start = len(haystack) - width
    if last_start < 0:
        return False
    return any(haystack[k:k + width] == needle for k in range(last_start + 1))

def compare_gt(gt_seq, patterns, min_cov):
    """
    Select the patterns that match one ground-truth sequence.

    A pattern matches when its length is at least ceil(min_cov * len(gt_seq))
    and is_continous_subsequence(pattern, gt_seq) is true.

    Parameters:
        gt_seq : sequence - ground-truth event sequence; an empty one yields [].
        patterns : iterable[tuple] - candidate patterns.
        min_cov : float - minimum fraction of len(gt_seq) a pattern must cover.

    Returns:
        list - matching patterns, in the order they appear in `patterns`.
    """
    if not gt_seq:
        return []

    need_len = ceil(min_cov * len(gt_seq))
    return [
        P for P in patterns
        if len(P) >= need_len and is_continous_subsequence(P, gt_seq)
    ]

def normalize_gt_sequences(gt_seq_list):
    """
    Bring a ground-truth label entry into list-of-sequences form.

    Parameters:
        gt_seq_list : list or None - either a flat list of ints (one sequence)
        or a list of sequences.

    Returns:
        list - [] for an empty/None input; [gt_seq_list] when its first element
        is an int; otherwise only the elements that are non-empty lists.
    """
    if not gt_seq_list:
        return []

    head = gt_seq_list[0]
    if isinstance(head, int):
        # A flat list of event IDs is a single ground-truth sequence.
        return [gt_seq_list]
    return [item for item in gt_seq_list if isinstance(item, list) and len(item) > 0]

# Load the ground-truth labels: a mapping of file name -> labelled sequence(s).
with open(LABEL_PATH, "r") as f:
    label_map = json.load(f)

# Stage-2 patterns as a list of tuples (the name is historical).
discriminative_patterns_seq = normalize_patterns(stage2_patterns)

tp = fp = fn = tn = 0
matched_patterns = set()

for fname, raw_gt in label_map.items():
    gt_seqs = normalize_gt_sequences(raw_gt)
    actual_positive = bool(gt_seqs)
    predicted_positive = False

    # A file counts as detected as soon as one of its ground-truth sequences
    # is matched; the remaining sequences of that file are not examined.
    for gt_seq in gt_seqs:
        matched_here = compare_gt(gt_seq, discriminative_patterns_seq, MIN_COVERAGE)
        if matched_here:
            matched_patterns.update(matched_here)
            predicted_positive = True
            break

    # NOTE(review): predicted_positive can only become True for files that have
    # ground truth, so fp never increases and precision is either 0.0 or 1.0.
    if actual_positive:
        if predicted_positive:
            tp += 1
        else:
            fn += 1
    elif predicted_positive:
        fp += 1
    else:
        tn += 1

precision = tp / (tp + fp) if tp + fp > 0 else 0.0
recall = tp / (tp + fn) if tp + fn > 0 else 0.0
f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"TP={tp} FP={fp} FN={fn} TN={tn}")
print(f"precision={precision:.3f}  recall={recall:.3f}  f1={f1:.3f}")

# Shortest patterns first, ties broken by tuple order.
matched_patterns = sorted(matched_patterns, key=lambda x: (len(x), x))
print(f"\nMatched {len(matched_patterns)} discriminative patterns with GT sequences:")

for p in matched_patterns:
    print(p)

# Mean length of the matched patterns (0 when nothing matched).
avg_value_length_tp_discriminative = (
    sum(map(len, matched_patterns)) / len(matched_patterns) if matched_patterns else 0
)



In [None]:
# Mean length of the Stage-2 patterns that matched a ground-truth sequence (0 if none matched).
avg_value_length_tp_discriminative