In [None]:
%load_ext autoreload
%autoreload 2
import numpy as np
import pandas as pd
import os
os.chdir(r"..")
from ctuFaultDetector.utils import *
from ctuFaultDetector.visual import *
from ctuFaultDetector.models.deviationClassifier import *


## Data load

First, we need to prepare the data. We have a CSV file with all the signals. The columns of the CSV file are:

idx, label, meas_id, Force_x, Force_y, Force_z, Torque_x, Torque_y, Torque_z

- idx is the identifier of the signal
- label is the boolean or None label of the signal
- meas_id is the identifier of the group of measurement (e.g. day of the measurement) in the form of an int 1 - n
- The other columns are signal feature columns


We create variables:

data, signals, labels, correct_signals, anom_signals

- data: list of tuples in the form of (signal : pd.DataFrame, label : bool)
- signals: list of signals in the form of pd.DataFrame
- labels: list of boolean/None labels
- correct_signals: list of signals with label "False" (correct, non-anomalous processes)
- anom_signals: list of signals with label "True" (anomalous processes)
- unlabeled_signals: list of signals with label "None"



### Loading the data.
We will begin our presentation with loading the data from the dataset. Execute the following cell to do that

In [None]:
# Load the bundled dataset; `id` presumably selects the meas_id groups to keep
# (TODO confirm against load_data).
data = load_data("./ctuFaultDetector/data/dataset.csv", id = [1,2,5,6])

# Split the (signal, label) pairs into two parallel lists.
signals = [sig for sig, _ in data]
labels = [lab for _, lab in data]

# Label False marks a correct process, label True an anomalous one.
# Both groups are converted with transform_pd_to_npy.
correct_signals = [transform_pd_to_npy(sig) for sig, lab in data if lab == False]
anom_signals = [transform_pd_to_npy(sig) for sig, lab in data if lab == True]

# Summary of what was loaded, followed by the first (signal, label) pair.
print("Number of correct: ", len(correct_signals), ", Number of anomalous: ", len(anom_signals))
print()
print("Number of signals:", len(data))
print()
print("Format of a signal:\n\n", data[0])

Now that we have loaded the data, let us see the methods we developed. Let's divide the dataset into a training and testing split!

In [None]:
# Shuffle a *copy* of the dataset with a fixed seed. Shuffling `data` in place
# would re-shuffle the already shuffled list every time this cell is re-run,
# silently changing the train/test split; a copy keeps the cell idempotent.
np.random.seed(42)
shuffled_data = list(data)
np.random.shuffle(shuffled_data)

# Take fold 0 of the split produced by get_n_th_fold.
training_set, testing_set = get_n_th_fold(shuffled_data, 0, small_train=False)

# Per-class views of both splits (label False = correct, label True = anomalous),
# converted with transform_pd_to_npy.
correct_training_signals = [transform_pd_to_npy(sig) for sig, lab in training_set if lab == False]
anom_training_signals = [transform_pd_to_npy(sig) for sig, lab in training_set if lab == True]
correct_testing_signals = [transform_pd_to_npy(sig) for sig, lab in testing_set if lab == False]
anom_testing_signals = [transform_pd_to_npy(sig) for sig, lab in testing_set if lab == True]

print(f"The length of the training set: {len(training_set)}")
print(f"Number of correct/anomalous processes: {len(correct_training_signals)}/{len(anom_training_signals)}")
print("\n")
print(f"The length of the testing set: {len(testing_set)}")
print(f"Number of correct/anomalous processes: {len(correct_testing_signals)}/{len(anom_testing_signals)}")

Ok, now we are all set up to start testing the methods.

# n-$\sigma$ classifier

We set up a deviation classifier for 6 dimensional signals with n = 3.

In [None]:
# Deviation (n-sigma) classifier for 6-dimensional signals with n = 3.
dk = deviationClassifier(6,3)

Let's train it with the prepared training dataset. We choose the accuracy criterion to optimize it for maximum accuracy.

In [None]:
# Supervised fit on the labelled training set; "ACC" selects the accuracy criterion.
dk.fit_whole_supervised_dataset(training_set, "ACC")

Let's see a basic evaluation of a signal from the testing dataset using a supervised n-$\sigma$ classifier and its visualization!

In [None]:
# Draw one random (signal, label) pair from the testing set.
random_sample = np.random.randint(0, len(testing_set))
signal, label = testing_set[random_sample]

# Classify the complete signal and show the prediction next to the true label.
pred = dk.predict_full_signal(signal)
print(f"Prediction: {pred}", f"Real label: {label}", sep="\n")

Now let's evaluate the whole testing set, this time without visualization.

In [None]:
def evaluate_offline_dk(classifier, testing_set):
    """Print TNR, TPR and accuracy of an offline classifier on a labelled test set.

    Parameters
    ----------
    classifier
        Object exposing ``predict_full_signal(signal, vis=False)``. A truthy
        return value is counted as an "anomalous" prediction.
    testing_set
        Sequence of ``(signal, label)`` pairs. Pairs with label ``False`` are
        treated as correct processes and pairs with label ``True`` as
        anomalous ones; every signal is converted with
        ``transform_pd_to_npy`` before prediction.

    Returns
    -------
    None
        Results are only printed. A rate whose denominator is zero (no
        signals of that class, or an empty test set) is reported as ``nan``
        instead of raising ``ZeroDivisionError``.
    """
    def _rate(hits, total):
        # Guard the empty case so a missing class does not abort the report.
        return hits / total if total else float("nan")

    correct_testing_signals = [transform_pd_to_npy(sig) for sig, lab in testing_set if lab == False]
    anom_testing_signals = [transform_pd_to_npy(sig) for sig, lab in testing_set if lab == True]

    # A correct signal is handled properly when the classifier does NOT flag it.
    correct_perf = sum(
        not classifier.predict_full_signal(sig, vis = False)
        for sig in correct_testing_signals
    )
    # An anomalous signal is handled properly when the classifier flags it.
    anomaly_perf = sum(
        classifier.predict_full_signal(sig, vis = False)
        for sig in anom_testing_signals
    )

    print(f"Correctly predicted successfull signals: {correct_perf}/{len(correct_testing_signals)} --> TNR = {_rate(correct_perf, len(correct_testing_signals))}")
    print(f"Correctly predicted anomalous signals: {anomaly_perf}/{len(anom_testing_signals)} --> TPR =  {_rate(anomaly_perf, len(anom_testing_signals))}")
    print(f"Accuracy: {_rate(correct_perf + anomaly_perf, len(testing_set))}")

In [None]:
# evaluate_offline_dk expects the classifier first and the labelled test set second.
evaluate_offline_dk(dk, testing_set)

Let's train the classifier to reach a minimum TPR of 0.8 on the training data!

In [None]:
# Re-fit the same classifier with the "TPR" criterion and the value 0.8,
# then report its performance on the testing set.
dk.fit_whole_supervised_dataset(training_set, "TPR", 0.8)
print("\n\nTesting results:")
evaluate_offline_dk(dk, testing_set)

Let's see the result for TNR higher than 0.9.

In [None]:
# Re-fit the same classifier with the "TNR" criterion and the value 0.9,
# then report its performance on the testing set.
dk.fit_whole_supervised_dataset(training_set, "TNR", 0.9)
print("\n\nTesting results:")
evaluate_offline_dk(dk, testing_set)

Let's see how the unsupervised classifier performs!

## Unsupervised

We will train the unsupervised deviation classifier with an unsupervised dataset and test the results. Let's remove the labels from the training dataset and fit it to the classifier. The evaluation will be done with labeled testing data from signals that were never seen by the classifier.

In [None]:
# Keep every training signal but replace its label with None.
unlabeled_training_set = [(sig, None) for sig, _ in training_set]

# Show the first pair to confirm that the label is gone.
print(unlabeled_training_set[0])

As you can see, we removed the labels.
Now that we have the dataset without labels, we can train the classifier. Let's assume a failure rate of 19\% (the worst 10\% of the dataset is removed twice, and $1 - 0.9^2 = 0.19$) and train the detector.

#### In dk.fit_whole_unsupervised_dataset:

* You can set the "criterion" parameter to one of ["TPR", "TNR", "ACC", "sACC"] and play around with the "value" parameter to see all possible tunings.

* You can also change the "success_ratio" parameter to remove more or fewer samples.

* Or adjust the "n_of_filtration_steps" parameter to run the removal process multiple times.

In [None]:
# Unsupervised fit on the label-free training set: "sACC" criterion,
# success_ratio of 0.9 and two filtration steps.
dk.fit_whole_unsupervised_dataset(
    unlabeled_training_set,
    criterion = "sACC",
    success_ratio = 0.9,
    n_of_filtration_steps = 2,
)

# Evaluation still uses the labelled testing set.
print("\n\nTesting results:")
evaluate_offline_dk(dk, testing_set)

Let's explore the online variation of this method, which is capable of evaluating partial signals.

# Online detector

The online training takes more time than the offline training. Run the next cell.

In [None]:
# Online fit on the labelled training set with the "sACC" criterion and value 3;
# print_progress = True requests progress output while fitting.
dk.online_fit(training_set, criterion = "sACC", value = 3, print_progress = True)

Let's test a random sample from the testing dataset with visualization

In [None]:
# Draw one random (signal, label) pair from the testing set.
random_sample = np.random.randint(0, len(testing_set))
signal, label = testing_set[random_sample]

# Convert the signal once. The slice below is taken from the converted signal,
# so it is passed to the detector directly, as in the evaluation loop that
# follows, rather than being converted a second time.
signal = transform_pd_to_npy(signal)

# Classify only the first 700 samples to emulate a partially observed signal.
pred = dk.predict_partial_signal(signal[:700, :])
print(f"Prediction: {pred}")
print(f"Real label: {label}")

Now that we have trained the classifier, we will measure its performance.

In [None]:
# Evaluate the online detector on growing prefixes of every testing signal.
# NOTE(review): the first timestamp is 0, so the first prefix of each signal is
# empty -- confirm that predict_partial_signal is meant to handle that.
timestamps = np.linspace(0, 1000, 10)
counter = 0   # total number of prefix predictions made
positive = 0  # positive predictions made on correct signals
totally_wrong_anom_signals = 0  # anomalous signals never flagged at any prefix

# Correct signals: every positive prediction on a prefix is a false positive.
for signal in correct_testing_signals:
    for timestamp in timestamps:
        counter += 1
        positive += dk.predict_partial_signal(signal[:int(timestamp), :], vis = False)

# Built once after the loop (not inside it) so the counts are also defined
# when there are no correct testing signals.
TP_pos, TN_pos, FP_pos, FN_pos = 0, counter - positive, positive, 0
TP_anom, TN_anom, FP_anom, FN_anom = 0,0,0,0

# Anomalous signals: prefixes before the first detection count as true
# negatives, unless the signal is never flagged at all.
for signal in anom_testing_signals:
    lastpred = False  # becomes truthy once any prefix has been flagged
    for timestamp in timestamps:
        counter += 1
        prediction = dk.predict_partial_signal(signal[:int(timestamp), :], vis = False)
        if prediction:
            # Flagged at this prefix, whether or not it was flagged before.
            TP_anom += 1
        elif lastpred:
            # Was flagged earlier, but the detector withdrew the alarm.
            FN_anom += 1
        else:
            # Not flagged so far.
            if timestamp == timestamps[-1]:
                # Never flagged up to the final prefix: re-book all of this
                # signal's prefixes as false negatives. Together with the
                # increment below, the net TN contribution of the signal is 0.
                TN_anom -= len(timestamps)
                FN_anom += len(timestamps)
                totally_wrong_anom_signals += 1
            TN_anom += 1
        lastpred = prediction or lastpred

# Combine both groups into one confusion matrix and report the metrics.
TP, TN, FP, FN = TP_pos + TP_anom, TN_pos + TN_anom, FP_pos + FP_anom, FN_pos + FN_anom
tpr = TP/(TP+FN)
fpr = 1-(TN/(FP+TN))
print(f"\n\n\nTP: {TP}, TN: {TN}, FP: {FP}, FN: {FN}\n\n")
print(f"Accuracy {(TP+TN)/(TP+TN+FP+FN)}")
print(f"TPR: {tpr}, FPR: {fpr}")
print(f"TP+FN: {TP+FN}, TN+FP: {TN+FP}")
print(f"Poorly predicted anom signals as a whole: {totally_wrong_anom_signals}")