<div style="text-align: center; font-size: 40px;">
    <b>Final Project</b>
    <br>
    Jarry Guillaume
    <br>
    
</div>


In [27]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import arff
import pandas as pd
import kmedoids 
import os 
import glob

For this project, we will use the categorical datasets from the ADRepository-Anomaly-detection-datasets GitHub repository, which is available here:

- https://github.com/GuansongPang/ADRepository-Anomaly-detection-datasets?tab=readme-ov-file

Since our article focuses on anomaly detection for discrete time series, these datasets will allow us to deploy some of the techniques showcased in the article. Let's start!

In [3]:
# Folder holding the categorical ARFF files of the ADRepository checkout.
folder_path = "ADRepository-Anomaly-detection-datasets/categorical data/"
# Each entry is the (data, meta) pair returned by scipy's ARFF loader.
datasets = []

for filepath in glob.glob(os.path.join(folder_path, "*")):
    try:
        data, meta = arff.loadarff(filepath)
    except Exception as error:
        # Best effort: skip files that cannot be parsed, but say why.
        # `Exception` (not a bare `except`) keeps Ctrl-C / SystemExit working.
        print(f"Error while parsing file : {filepath} ({type(error).__name__}: {error})")
    else:
        datasets.append((data, meta))

## Intro : Generating Synthetic Data :

Since the discrete time-series data we found online was rarely labeled, we propose to generate synthetic data so that we can implement and test as many algorithms as possible for our discrete anomaly detection library. We can then test our algorithms on real data, for which fewer labels are available.

### Markovian models :     

The class below generates synthetic discrete sequences from a Markov chain or, optionally, from a hidden Markov model.

In [4]:
# Pool of symbols for the synthetic sequences: MarkovianSequences uses the first
# `n_symbols` letters, so a single model can emit at most 26 distinct symbols.
ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

In [5]:
class MarkovianSequences:
    """Generate synthetic discrete sequences from a Markov chain or a hidden Markov model.

    Without ``hidden_matrix`` the object is a plain Markov chain:
    ``transition_matrix[i]`` is the distribution of the next symbol given that
    the current symbol is ``i``.

    With ``hidden_matrix`` the object is a hidden Markov model:
    ``hidden_matrix[h]`` is the distribution of the next hidden state given
    hidden state ``h`` and ``transition_matrix[h]`` is the emission
    distribution over symbols while in hidden state ``h``.

    Symbols are the first ``n_symbols`` letters of ``ALPHABET``.

    Args:
        transition_matrix: Row-stochastic 2-D array-like (see above).
        hidden_matrix: Optional square row-stochastic 2-D array-like.
        n_sequences: Number of sequences produced by ``generate_all_sequences``.
        sequence_length: Number of symbols per generated sequence.

    Raises:
        ValueError: If a matrix is not 2-D, has negative entries, has a row
            that does not sum to 1, has an inconsistent shape, or needs more
            symbols than ``ALPHABET`` provides.
    """

    def __init__(self, transition_matrix, hidden_matrix=None, n_sequences=100, sequence_length=50):
        # asarray lets callers pass nested lists; ndarray inputs are not copied.
        self.transition_matrix = np.asarray(transition_matrix, dtype=float)
        self.hidden_matrix = None if hidden_matrix is None else np.asarray(hidden_matrix, dtype=float)
        self.sequence_length = sequence_length
        self.n_sequences = n_sequences
        # Always defined so the plain-chain case never hits an AttributeError.
        self.n_hidden = None
        self.check_probabilities()

        # One symbol per column: for a chain the matrix is square, for an HMM
        # the columns of the emission matrix are the observable symbols.
        self.n_symbols = self.transition_matrix.shape[1]
        if self.n_symbols > len(ALPHABET):
            raise ValueError(
                f"At most {len(ALPHABET)} symbols are supported, got {self.n_symbols}."
            )
        self.symbols = ALPHABET[:self.n_symbols]

    @staticmethod
    def _check_stochastic(name, matrix):
        """Raise ValueError unless ``matrix`` is 2-D, non-negative and row-stochastic."""
        if matrix.ndim != 2:
            raise ValueError(f"{name} must be a 2-D array.")
        if np.any(matrix < 0):
            raise ValueError(f"{name} contains negative probabilities.")
        for i, row_sum in enumerate(matrix.sum(axis=1)):
            if not np.isclose(row_sum, 1.0):
                raise ValueError(f"Row {i} of {name} does not sum to 1.")

    def check_probabilities(self):
        """Validate both matrices and set ``n_hidden`` when a hidden matrix is present.

        Raises:
            ValueError: On any malformed or inconsistent matrix.
        """
        self._check_stochastic("transition_matrix", self.transition_matrix)

        if self.hidden_matrix is None:
            # A plain chain indexes rows and columns by the same symbols.
            rows, cols = self.transition_matrix.shape
            if rows != cols:
                raise ValueError("transition_matrix must be square.")
            return

        if self.hidden_matrix.ndim != 2 or self.hidden_matrix.shape[0] != self.hidden_matrix.shape[1]:
            raise ValueError("hidden_matrix must be square.")
        self._check_stochastic("hidden_matrix", self.hidden_matrix)
        self.n_hidden = self.hidden_matrix.shape[0]
        if self.n_hidden != self.transition_matrix.shape[0]:
            raise ValueError("Number of hidden states does not match the dimension of transition_matrix.")

    def generate_sequence(self, initial_state=None):
        """Sample one sequence from the plain Markov chain.

        Args:
            initial_state: Index of the first symbol; drawn uniformly when None.

        Returns:
            A list of symbols (single-character strings).
        """
        if initial_state is None:
            state = np.random.choice(self.n_symbols)
        else:
            state = initial_state

        sequence = [self.symbols[state]]
        for _ in range(self.sequence_length - 1):
            state = np.random.choice(self.n_symbols, p=self.transition_matrix[state])
            sequence.append(self.symbols[state])
        return sequence

    def _emit(self, hidden_state):
        """Draw one symbol from the emission distribution of ``hidden_state``."""
        index = np.random.choice(self.n_symbols, p=self.transition_matrix[hidden_state])
        return self.symbols[index]

    def generate_hidden_sequence(self, initial_state=None):
        """Sample one sequence of emitted symbols from the hidden Markov model.

        Args:
            initial_state: Index of the first hidden state; drawn uniformly when None.

        Returns:
            A list of emitted symbols (the hidden states are not returned).

        Raises:
            ValueError: If the object was built without ``hidden_matrix``.
        """
        if self.hidden_matrix is None:
            raise ValueError("generate_hidden_sequence requires a hidden_matrix.")

        if initial_state is None:
            hidden_state = np.random.choice(self.n_hidden)
        else:
            hidden_state = initial_state

        sequence = [self._emit(hidden_state)]
        for _ in range(self.sequence_length - 1):
            hidden_state = np.random.choice(self.n_hidden, p=self.hidden_matrix[hidden_state])
            sequence.append(self._emit(hidden_state))
        return sequence

    def generate_all_sequences(self):
        """Return ``n_sequences`` sequences, using the HMM sampler when a hidden matrix is set."""
        if self.hidden_matrix is not None:
            sample = self.generate_hidden_sequence
        else:
            sample = self.generate_sequence
        return [sample() for _ in range(self.n_sequences)]

In [6]:
class MarkovianDatasetGenerator:
    """Build one dataset by concatenating sequences from several Markovian models.

    Model ``i`` is a ``MarkovianSequences`` built from ``transition_matrices[i]``
    and ``hidden_matrices[i]`` (``None`` for a plain Markov chain).

    Args:
        transition_matrices: One transition (or emission) matrix per model.
        hidden_matrices: One hidden matrix or ``None`` per model. Passing
            ``None`` for the whole argument means "no hidden layer anywhere".
        n_sequences: Number of sequences generated by each model.
        sequence_length: Length of every generated sequence.

    Raises:
        ValueError: If the two lists do not have the same length.
    """

    def __init__(self, transition_matrices, hidden_matrices=None, n_sequences=100, sequence_length=50):
        self.transition_matrices = list(transition_matrices)
        if hidden_matrices is None:
            hidden_matrices = [None] * len(self.transition_matrices)
        self.hidden_matrices = list(hidden_matrices)
        # zip() would silently drop the unmatched models, so fail loudly instead.
        if len(self.transition_matrices) != len(self.hidden_matrices):
            raise ValueError(
                "transition_matrices and hidden_matrices must have the same length "
                f"(got {len(self.transition_matrices)} and {len(self.hidden_matrices)})."
            )
        self.n_sequences = n_sequences
        self.sequence_length = sequence_length
        self.init_transform()

    def init_transform(self):
        """(Re)create one sequence generator per model and clear the dataset.

        Returns:
            The list of generators, also stored in ``self.generators``.
        """
        self.dataset = []
        self.generators = [
            MarkovianSequences(
                transition_matrix,
                hidden_matrix=hidden_matrix,
                n_sequences=self.n_sequences,
                sequence_length=self.sequence_length,
            )
            for transition_matrix, hidden_matrix in zip(self.transition_matrices, self.hidden_matrices)
        ]
        return self.generators

    def generate(self):
        """Sample a fresh dataset: the sequences of model 0, then model 1, and so on.

        Returns:
            The list of sequences, also stored in ``self.dataset``.
        """
        self.dataset = []
        for generator in self.generators:
            self.dataset.extend(generator.generate_all_sequences())
        return self.dataset

And now let us generate a dataset mixing a hidden Markov model and two Markov models, wrapped up into the same dataset. We will also add an anomaly dataset, generated by another Markov model with different probabilities.

In [12]:
# Model 1 is the hidden Markov model: this matrix is paired with
# hidden_matrix1 below, so row h is the emission distribution of hidden state h.
transition_matrix1 = np.array(
    [[0.7, 0.2, 0.1], [0.1, 0.7, 0.2], [0.2, 0.1, 0.7]]
)

# Hidden-state dynamics of model 1: row h is the distribution of the next
# hidden state given the current hidden state h.
hidden_matrix1 = np.array(
    [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1], [0.2, 0.2, 0.6]]
)

# Model 2 is a plain Markov chain (its hidden matrix is None below): row i is
# the distribution of the next symbol given the current symbol i.
transition_matrix2 = np.array(
    [[0.6, 0.3, 0.1], [0.2, 0.5, 0.3], [0.1, 0.2, 0.7]]
)

# Model 3 is a random 5-symbol Markov chain: draw a random matrix, then divide
# every row by its sum so that each row is a probability distribution.
N = 5
transition_matrix3 = np.random.rand(N, N)
row_sums = np.sum(transition_matrix3, axis=1, keepdims=True)
transition_matrix3 = np.divide(transition_matrix3, row_sums)

# Entry k of both lists describes model k; None means "no hidden layer".
transition_matrices = [transition_matrix1, transition_matrix2, transition_matrix3]
hidden_matrices = [hidden_matrix1, None, None]

In [14]:
# Training set: 200 sequences of 50 symbols from each model listed in
# transition_matrices / hidden_matrices, concatenated model after model.
generator = MarkovianDatasetGenerator(
    transition_matrices,
    hidden_matrices,
    n_sequences=200,
    sequence_length=50,
)
train_dataset = generator.generate()

In [15]:
# Anomalous model: a random 3-symbol Markov chain that is not part of the
# training mix. Rows are normalised so that each one sums to 1.
N = 3
transition_matrix4 = np.random.rand(N, N)
row_sums = np.sum(transition_matrix4, axis=1, keepdims=True)
transition_matrix4 = np.divide(transition_matrix4, row_sums)

# Test set: the first 20 sequences come from the unseen model
# (transition_matrix4), the last 20 from transition_matrix3, which was also
# used to build the training set.
test_generator = MarkovianDatasetGenerator(
    [transition_matrix4, transition_matrix3],
    [None, None],
    n_sequences=20,
    sequence_length=50,
)
test_dataset = test_generator.generate()

# I) Kernel-Based Techniques

In [None]:
class KernellBase:
    """Kernel-based anomaly scoring of sequences against a reference dataset.

    A test sequence is scored by the inverse of its similarity to the training
    data, so a higher score means "more anomalous". Two strategies are offered:
    similarity to the k-th nearest training sequence, and similarity to the
    closest k-medoids cluster centre.

    Args:
        dataset: Indexable collection of training sequences.
        similarity_metric: Callable ``(seq_a, seq_b) -> similarity``. The
            distance matrix is computed as ``1 - similarity``.
    """

    def __init__(self, dataset, similarity_metric):
        self.dataset = dataset
        self.similarity_metric = similarity_metric
        self.similarity_matrix = None
        # Defined up front so compute_kemedoids can detect "not computed yet".
        self.distance_matrix = None
        self.medoids = None

    def compute_similarity_matrix(self):
        """Fill the pairwise similarity matrix and the matching distance matrix.

        The metric is evaluated once per unordered pair and mirrored, i.e. it is
        treated as symmetric.

        Returns:
            The ``(n, n)`` similarity matrix, also stored on the instance.
        """
        n = len(self.dataset)
        similarity = np.zeros((n, n))
        for i in range(n):
            for j in range(i, n):
                value = self.similarity_metric(self.dataset[i], self.dataset[j])
                similarity[i, j] = value
                similarity[j, i] = value

        self.similarity_matrix = similarity
        self.distance_matrix = 1 - similarity
        return self.similarity_matrix

    def compute_kemedoids(self, kmax=10, kmin=1):
        """Cluster the training set with ``kmedoids.dynmsc`` and store the medoid sequences.

        Args:
            kmax: Maximum number of clusters handed to ``dynmsc``.
            kmin: Minimum number of clusters handed to ``dynmsc``.
                NOTE(review): confirm that the installed kmedoids version
                accepts a minimum of 1.

        Returns:
            The list of medoid sequences, also stored in ``self.medoids``.
        """
        if self.distance_matrix is None:
            # The original crashed here when called before the matrix was built.
            self.compute_similarity_matrix()
        km = kmedoids.dynmsc(self.distance_matrix, kmax, kmin)
        self.medoids = [self.dataset[medoid] for medoid in km.medoids]
        return self.medoids

    @staticmethod
    def _inverse_similarity(similarity):
        """Turn a similarity into an anomaly score; zero similarity maps to infinity."""
        if similarity == 0:
            return float("inf")
        return 1 / similarity

    def knearest_predict(self, test_sequence, k_nearest=5):
        """Score ``test_sequence`` by the inverse similarity to its k-th nearest neighbour.

        Args:
            test_sequence: Sequence to score.
            k_nearest: Rank (1-based) of the neighbour to use. Clamped to the
                dataset size when the dataset holds fewer sequences.

        Returns:
            The anomaly score (``inf`` when the selected similarity is 0).

        Raises:
            ValueError: If ``k_nearest`` is below 1 or the dataset is empty.
        """
        if k_nearest < 1:
            raise ValueError("k_nearest must be at least 1.")

        similarities = sorted(
            (self.similarity_metric(test_sequence, sequence) for sequence in self.dataset),
            reverse=True,
        )
        if not similarities:
            raise ValueError("Cannot score a sequence against an empty dataset.")

        # Descending order: the k-th nearest neighbour sits at index k - 1.
        rank = min(k_nearest, len(similarities)) - 1
        return self._inverse_similarity(similarities[rank])

    def clustering_predict(self, test_sequence, kmax=10, kmin=1):
        """Score ``test_sequence`` by the inverse similarity to its closest medoid.

        The medoids are computed on first use with ``compute_kemedoids``.

        Args:
            test_sequence: Sequence to score.
            kmax: Forwarded to ``compute_kemedoids`` when medoids are missing.
            kmin: Forwarded to ``compute_kemedoids`` when medoids are missing.

        Returns:
            The anomaly score (``inf`` when no medoid has positive similarity).
        """
        if self.medoids is None:
            self.compute_kemedoids(kmax=kmax, kmin=kmin)

        # Similarities are floored at 0, as in the original running maximum.
        max_similarity = 0
        for medoid in self.medoids:
            max_similarity = max(max_similarity, self.similarity_metric(test_sequence, medoid))

        return self._inverse_similarity(max_similarity)

Now let us try our kernel-based methods with the normalized longest common subsequence (nLCS) kernel suggested in the article.

In [22]:
def LCS_length(seq1, seq2):
    """Return the length of the longest common subsequence of two sequences.

    Classic dynamic programme; only the previous and the current row of the
    table are kept. Elements are compared with ``==``.
    """
    previous_row = [0] * (len(seq2) + 1)
    for left in seq1:
        current_row = [0]
        for col, right in enumerate(seq2, start=1):
            if left == right:
                # Extend the best subsequence that ends before both elements.
                current_row.append(previous_row[col - 1] + 1)
            else:
                # Drop one element from either side and keep the better option.
                current_row.append(max(previous_row[col], current_row[col - 1]))
        previous_row = current_row
    return previous_row[-1]

def nLCS(seq1, seq2):
    """Return the normalised LCS similarity ``LCS / sqrt(len(seq1) * len(seq2))``.

    Empty inputs used to raise ``ZeroDivisionError``. They are now handled
    explicitly: two empty sequences are identical (similarity 1.0), and an empty
    sequence shares nothing with a non-empty one (similarity 0.0).
    """
    len1, len2 = len(seq1), len(seq2)
    if len1 == 0 and len2 == 0:
        return 1.0
    if len1 == 0 or len2 == 0:
        return 0.0
    return LCS_length(seq1, seq2) / ((len1 * len2) ** 0.5)

In [45]:
# Fit the kernel-based detector on the synthetic training set with the nLCS kernel.
kernell_based = KernellBase(dataset=train_dataset, similarity_metric=nLCS)
# NOTE(review): despite its name, this variable receives the *similarity*
# matrix returned by compute_similarity_matrix(); the distances (1 - similarity)
# are stored on kernell_based.distance_matrix.
distance_matrix = kernell_based.compute_similarity_matrix()
# Medoid sequences chosen by k-medoids with the method's default kmax / kmin.
medoids = kernell_based.compute_kemedoids()

2.0833333333333335

# II) Window Based Techniques :

In [None]:
from sklearn.svm import OneClassSVM

class WindowUnsupervisedSVM:
    """One-class SVM over one-hot encoded, fixed-length symbol sequences.

    Each sequence becomes an ``(n_symbols, length)`` one-hot matrix which is
    flattened into a single feature vector, because the SVM expects a 2-D
    ``(n_samples, n_features)`` input. All sequences must therefore have the
    same length.

    Args:
        dataset: Iterable of training sequences (iterables of symbols).
        n_symbols: Size of the symbol alphabet.
        symbols: Optional ordered collection of the ``n_symbols`` symbols.
            Defaults to the first ``n_symbols`` letters of ``ALPHABET``.

    Raises:
        ValueError: If ``symbols`` does not hold exactly ``n_symbols`` entries,
            or if the dataset cannot be encoded (see ``one_hot_encoding``).
    """

    def __init__(self, dataset, n_symbols, symbols=None):
        self.n_symbols = n_symbols
        if symbols is None:
            symbols = ALPHABET[:n_symbols]
        if len(symbols) != n_symbols:
            raise ValueError(f"Expected {n_symbols} symbols, got {len(symbols)}.")
        # Row index of every symbol in the one-hot matrix.
        self.symbol_to_idx = {symbol: idx for idx, symbol in enumerate(symbols)}
        self.svm = None
        self.dataset = self.one_hot_encoding(dataset)

    def one_hot_encoding(self, dataset):
        """Encode sequences as flattened one-hot matrices.

        Args:
            dataset: Iterable of equal-length sequences of known symbols.

        Returns:
            Integer array of shape ``(n_sequences, n_symbols * length)``. Row
            ``r`` is the row-major flattening of the ``(n_symbols, length)``
            one-hot matrix of sequence ``r``.

        Raises:
            ValueError: On an unknown symbol or on sequences of unequal length.
        """
        sequences = list(dataset)
        if not sequences:
            return np.zeros((0, 0), dtype=int)

        length = len(sequences[0])
        encoded = np.zeros((len(sequences), self.n_symbols, length), dtype=int)
        for row, seq in enumerate(sequences):
            if len(seq) != length:
                raise ValueError(
                    f"Sequence {row} has length {len(seq)}, expected {length}."
                )
            for pos, symbol in enumerate(seq):
                try:
                    s_idx = self.symbol_to_idx[symbol]
                except KeyError:
                    raise ValueError(f"Unknown symbol {symbol!r} in sequence {row}.") from None
                encoded[row, s_idx, pos] = 1

        return encoded.reshape(len(sequences), -1)

    def train_classifier(self):
        """Fit a ``OneClassSVM(gamma='auto')`` on the encoded training set and return it."""
        self.svm = OneClassSVM(gamma='auto').fit(self.dataset)
        return self.svm

    def predict(self, test):
        """Encode ``test`` and return the SVM predictions, training first if needed.

        Test sequences must have the same length as the training sequences so
        that the feature vectors line up.
        """
        if self.svm is None:
            self.train_classifier()
        return self.svm.predict(self.one_hot_encoding(test))

In [None]:
from sklearn.svm import SVC

class WindowBased:
    """Window-based anomaly scoring for discrete sequences.

    Every sequence is cut into consecutive, non-overlapping windows of
    ``window_length`` symbols (a shorter trailing remainder is dropped). Two
    detectors are available, each trained on the windows of the training set:

    * lookahead pairs: the set of ``(symbol, symbol k steps later)`` pairs
      seen inside training windows;
    * normal dictionary: how often each whole window occurs in training.

    Both ``test_*`` methods return one score per test sequence in ``[0, 1]``:
    the fraction of its pairs / windows that never occurred in training.

    Args:
        dataset: Iterable of training sequences (lists or strings of symbols).
        window_length: Number of symbols per window, at least 1.

    Raises:
        ValueError: If ``window_length`` is below 1.
    """

    def __init__(self, dataset, window_length):
        if window_length < 1:
            raise ValueError("window_length must be a positive integer.")
        self.window_length = window_length
        # Filled by the train_* methods; None means "not trained yet".
        self.lookahead_dict = None
        self.k_look_ahead = None
        self.frequency_dictionary = None
        self.dataset = self.partition(dataset)

    def partition(self, dataset):
        """Cut every sequence into consecutive non-overlapping windows.

        Returns:
            One list of windows per input sequence, in input order.
        """
        size = self.window_length
        return [
            [sequence[start:start + size] for start in range(0, len(sequence) - size + 1, size)]
            for sequence in dataset
        ]

    @staticmethod
    def _lookahead_pairs(window, k_look_ahead):
        """Yield every ``(window[i], window[i + k_look_ahead])`` pair of a window."""
        return zip(window, window[k_look_ahead:])

    def train_lookahead_pair(self, k_look_ahead=5):
        """Count the lookahead pairs occurring inside the training windows.

        Pairs never span two windows, so ``k_look_ahead`` must be smaller than
        ``window_length`` for any pair to be recorded.

        Args:
            k_look_ahead: Distance between the two symbols of a pair, at least 1.

        Returns:
            Dict mapping each pair to its count, also stored in ``lookahead_dict``.

        Raises:
            ValueError: If ``k_look_ahead`` is below 1.
        """
        if k_look_ahead < 1:
            raise ValueError("k_look_ahead must be a positive integer.")
        self.k_look_ahead = k_look_ahead
        self.lookahead_dict = {}
        for windows in self.dataset:
            for window in windows:
                for pair in self._lookahead_pairs(window, k_look_ahead):
                    self.lookahead_dict[pair] = self.lookahead_dict.get(pair, 0) + 1
        return self.lookahead_dict

    def test_lookahead_pairs(self, test_dataset):
        """Score test sequences by their share of lookahead pairs unseen in training.

        Returns:
            One float per test sequence; 0.0 when a sequence yields no pair.

        Raises:
            RuntimeError: If ``train_lookahead_pair`` has not been called.
        """
        if self.lookahead_dict is None:
            raise RuntimeError("Call train_lookahead_pair() before test_lookahead_pairs().")

        anomaly_score = []
        for windows in self.partition(test_dataset):
            total = 0
            unseen = 0
            for window in windows:
                for pair in self._lookahead_pairs(window, self.k_look_ahead):
                    total += 1
                    if pair not in self.lookahead_dict:
                        unseen += 1
            anomaly_score.append(unseen / total if total else 0.0)
        return anomaly_score

    def train_normal_dictionary(self):
        """Count how often each window (as a tuple) occurs in the training set.

        Returns:
            Dict mapping each window to its count, also stored in
            ``frequency_dictionary``.
        """
        self.frequency_dictionary = {}
        for windows in self.dataset:
            for window in windows:
                key = tuple(window)
                self.frequency_dictionary[key] = self.frequency_dictionary.get(key, 0) + 1
        return self.frequency_dictionary

    def process_anomaly(self, anomalies):
        """Aggregate the training frequencies of a sequence's windows into one score.

        Args:
            anomalies: Training frequency of each window of one test sequence.

        Returns:
            Fraction of windows with frequency 0; 0.0 for an empty list.
        """
        if not anomalies:
            return 0.0
        return sum(1 for frequency in anomalies if frequency == 0) / len(anomalies)

    def test_normal_dictionary(self, test_dataset):
        """Score test sequences by their share of windows unseen in training.

        Returns:
            One float per test sequence.

        Raises:
            RuntimeError: If ``train_normal_dictionary`` has not been called.
        """
        if self.frequency_dictionary is None:
            raise RuntimeError("Call train_normal_dictionary() before test_normal_dictionary().")

        anomaly_scores = []
        for windows in self.partition(test_dataset):
            frequencies = [self.frequency_dictionary.get(tuple(window), 0) for window in windows]
            anomaly_scores.append(self.process_anomaly(frequencies))
        return anomaly_scores

    def t_side(self):
        """Placeholder for a detector that was never written.

        The original body returned an undefined name and always failed with a
        NameError. The failure is now explicit.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("WindowBased.t_side is not implemented.")