# Model development for anomaly detection

Generally, we are trying to estimate the density of the "normal" ("good") scenario and to flag data that falls outside of it.

Two approaches were followed

- Estimating the density by identifying mean and covariance of the "normal/good" distribution. Then using Mahalanobis distance to see how far a data instance is from a distribution.
- Kernel based density estimation

In [None]:
import os
import sys
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import copy
# Append library path
lib_path = os.path.join(os.path.dirname(os.getcwd()), "lib")
sys.path.append(lib_path)
import numpy as np
import boto3
import time
import data_prep, feature_extraction, model_evaluations
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KernelDensity
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn import linear_model
from sklearn.neighbors import LocalOutlierFactor

# Loading the data

Adding all required files

In [None]:
# Folder with the recorded CSV files: the "DATA" directory next to the current working directory's parent
data_loc = os.path.join(os.path.dirname(os.getcwd()), "DATA")

# Recordings in load order; the position in this list is the integer key used in ``file_names``
_recordings = [
    # 0-3: machine turned ON, not referenced, parameter switch enable (start) error
    *(f"machine_ON_no-ref_start-error_{run}.csv" for run in range(1, 5)),
    # 4-13: machine ON, referenced, idling without error
    *(f"machine_ON_ref_no-error_{run}.csv" for run in range(1, 11)),
    # 14-15: machine ON, referenced, overtravel in X (negative, positive)
    "machine_ON_ref_overtravel-error_x_neg_1.csv",
    "machine_ON_ref_overtravel-error_x_pos_1.csv",
    # 16-17: machine ON, not referenced, overtravel in X (negative, positive)
    "machine_ON_no-ref_overtravel-error_x_neg_1.csv",
    "machine_ON_no-ref_overtravel-error_x_pos_1.csv",
    # 18-20: referenced, overtravel in X, "axes-extreme" recordings
    "machine_ON_ref_overtravel-error_x_neg_axes-extreme_1.csv",
    "machine_ON_ref_overtravel-error_x_neg_axes-extreme_2.csv",
    "machine_ON_ref_overtravel-error_x_pos_axes-extreme_1.csv",
    # 21-24: machine ON, referenced, overtravel in Y
    "machine_ON_ref_overtravel-error_y_neg_axes-extreme_1.csv",
    "machine_ON_ref_overtravel-error_y_neg_1.csv",
    "machine_ON_ref_overtravel-error_y_pos_1.csv",
    "machine_ON_ref_overtravel-error_y_pos_axes-extreme_1.csv",
    # 25-28: machine ON, referenced, overtravel in Z
    "machine_ON_ref_overtravel-error_z_neg_1.csv",
    "machine_ON_ref_overtravel-error_z_neg_axes-extreme_1.csv",
    "machine_ON_ref_overtravel-error_z_pos_1.csv",
    "machine_ON_ref_overtravel-error_z_pos_axes-extreme_1.csv",
    # 29-30: machine ON, not referenced
    "machine_ON_no-ref_1.csv",
    "machine_ON_no-ref_2.csv",
]

# file name lookup: integer index -> CSV file name
file_names = dict(enumerate(_recordings))

In [None]:
# Read one recording (chosen by its key in ``file_names``), indexed by its "no" column
index = 5
csv_path = os.path.join(data_loc, file_names[index])
df = pd.read_csv(csv_path, header="infer", index_col="no")

# Wide figure whose single axes fills the whole canvas
fig = plt.figure(figsize=(25, 5))
axs = fig.add_axes([0, 0, 1, 1])

# Quick visual check: the last 120 entries of the "PowerSum" column
df["PowerSum"][-120:].plot(ax=axs)

# Data preparation

- Segment
- Identify the anomalous and non-anomalous class
- Feature extraction
- Generate training data

## Segmentation

In [None]:
# Segment length handed to data_prep.segment_data (the name suggests seconds - confirm in data_prep)
segment_secs = 60
# Value used as "nperseg" in the third entry of freq_args during feature extraction
wavelet_nperseg = 15

In [None]:
# Don't choose "no" and "sample_time" as they will be added later to the beginning
# Chosen - Three different power components for three phases
chosen_cols = ["Power1", "Power2", "Power3", "PowerReac1", "PowerReac2", "PowerReac3", "PowerApp1", "PowerApp2", "PowerApp3"]

# Segment every recording and store the result under its file name.
# Only the file names are needed here, so the integer keys of ``file_names`` are not
# iterated (this also keeps the ``index`` chosen in the loading cell untouched).
segmented_data = {}
for file_name in file_names.values():
    segments = data_prep.segment_data(
        file_name=os.path.join(data_loc, file_name),
        col_names=chosen_cols,
        segment_secs=segment_secs,
    )
    # Remove the sample_time col (index 0 along axis 1)
    # NOTE(review): the comment at the top says both "no" and "sample_time" are prepended,
    # yet only one leading column is dropped here - confirm against data_prep.segment_data.
    segmented_data[file_name] = segments[:, 1:, :]

In [None]:
# Sanity check: report the array shape obtained for every segmented recording
for file_name, segment_array in segmented_data.items():
    sys.stdout.write(f"For the file-{file_name} the shape-{segment_array.shape}\n")

## Determine classes

- Anomaly - 1
- Not Anomaly - 0


In [None]:
# Which recordings belong to which machine condition (class) in this study
class_file_association = {
    # Referenced machine idling without error.
    # Left out on purpose: "machine_ON_no-ref_1.csv", "machine_ON_no-ref_2.csv"
    "on-ref": [f"machine_ON_ref_no-error_{run}.csv" for run in range(1, 11)],

    # Not referenced, error at start
    "on-noref-error": [f"machine_ON_no-ref_start-error_{run}.csv" for run in range(1, 5)],

    # Overtravel along X
    "overtravel-x": [
        "machine_ON_ref_overtravel-error_x_neg_1.csv",
        "machine_ON_ref_overtravel-error_x_pos_1.csv",
        "machine_ON_no-ref_overtravel-error_x_neg_1.csv",
        "machine_ON_no-ref_overtravel-error_x_pos_1.csv",
        "machine_ON_ref_overtravel-error_x_neg_axes-extreme_1.csv",
        "machine_ON_ref_overtravel-error_x_neg_axes-extreme_2.csv",
        "machine_ON_ref_overtravel-error_x_pos_axes-extreme_1.csv",
    ],

    # Overtravel along Y
    "overtravel-y": [
        "machine_ON_ref_overtravel-error_y_neg_1.csv",
        "machine_ON_ref_overtravel-error_y_pos_1.csv",
        "machine_ON_ref_overtravel-error_y_neg_axes-extreme_1.csv",
        "machine_ON_ref_overtravel-error_y_pos_axes-extreme_1.csv",
    ],

    # Overtravel along Z.
    # Left out on purpose: "machine_ON_ref_overtravel-error_z_pos_axes-extreme_1.csv"
    "overtravel-z": [
        "machine_ON_ref_overtravel-error_z_neg_1.csv",
        "machine_ON_ref_overtravel-error_z_pos_1.csv",
        "machine_ON_ref_overtravel-error_z_neg_axes-extreme_1.csv",
    ],
}

In [None]:
# Merge the segmented recordings of every class into one array per class.
# All files of a class are joined along the last axis in a single np.concatenate call,
# which avoids re-copying the growing array once per file and needs no special case
# for the first file.
class_segmented_data = {
    class_instance: np.concatenate(
        [segmented_data[file_name] for file_name in recordings], axis=-1
    )
    for class_instance, recordings in class_file_association.items()
}

In [None]:
# Reverse the axis order of every class array: (a, b, c) -> (c, b, a)
for class_instance, merged in class_segmented_data.items():
    class_segmented_data[class_instance] = np.transpose(merged, axes=(2, 1, 0))

In [None]:
# Sanity check: report the shape of the merged array of every class
for class_instance, class_array in class_segmented_data.items():
    sys.stdout.write(f"The class-{class_instance} has the shape-{class_array.shape}\n")

# Feature Extraction

- Extract all the features

In [None]:
# Arguments for the feature extractor. They are the same for every signal, so they are
# built once instead of once per signal. (The DynamoDB query cell of this notebook
# likewise reuses a single set of argument dicts across all columns.)
freq_args = [{"axis": 0}, {"axis": 0}, {"axis": 0, "nperseg": wavelet_nperseg}]
freq_time_args = [{"wavelet": "db1"}, {"wavelet": "db1"}, {"wavelet": "db1"}]

class_dataset_features = {}
for class_instance, class_segments in class_segmented_data.items():
    dataset_features = []
    for row in class_segments:
        # One flat feature vector per row: the features of all of its signals, concatenated
        computed_features = []
        for col in row:
            # Extract all features
            computed_features += feature_extraction.compute_all_features(col, freq_args=freq_args, freq_time_args=freq_time_args)

        # Append to a list
        dataset_features.append(computed_features)

    # Add to class instance
    class_dataset_features[class_instance] = np.array(dataset_features)

In [None]:
# Report the shape of the feature matrix of every class
sys.stdout.write("After feature extraction process\n\n")
for class_instance, feature_matrix in class_dataset_features.items():
    sys.stdout.write(f'For the class-{class_instance} , the extracted features has the shape={feature_matrix.shape}\n')

# Model development

- Estimating the mean and covariance of non-anomalous condition
- Using kernels to estimate density

In [None]:
# Training data: the features of the "on-ref" class only
X_train = class_dataset_features["on-ref"]

# Test data: one entry per remaining (known) class
X_test = {}
for class_name, class_features in class_dataset_features.items():
    if class_name == "on-ref":
        continue
    X_test[class_name] = class_features

# Optional - uncomment to hold out part of the training data as an unseen normal class
# (kept below the creation of X_test, which the last line needs)
# X_train, X_train_test = train_test_split(X_train, test_size=0.2, random_state=42)
# X_test["normal-test"] = X_train_test


## Custom DynamoDB Query

- Select any data from table and make a query

In [None]:
from data_loader_dynamodb import DynamoDBDataLoader

# Query window: the last 75 seconds. The clock is read once so that both
# bounds are derived from the same instant.
end_time = int(time.time())
start_time = end_time - 75

# Instance
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
ddb = DynamoDBDataLoader(table_name="robonano1_energy_wn", region="us-east-1", dynamodb=dynamodb)

# Query
ddb.query_data(sample_time_range=(start_time, end_time))
# Get the dataframe
ddb.get_dataframe()
# get data
data = ddb.data

# Choose appropriate columns
chosen_cols = ["Power1", "Power2", "Power3", "PowerReac1", "PowerReac2", "PowerReac3", "PowerApp1", "PowerApp2", "PowerApp3"]
data = data[chosen_cols]
# Keep at most the first 60 rows.
# NOTE(review): 60 presumably mirrors segment_secs at one sample per second - confirm.
# If the query returns fewer rows, the feature vector is built from a shorter signal.
data = data.to_numpy()[0:60, :]

# Feature extraction
# The arguments must match the ones used for the training features, otherwise the query
# features are not comparable with what the models were fitted on; hence nperseg is taken
# from wavelet_nperseg instead of a separate literal.
freq_args = [{"axis": 0}, {"axis": 0}, {"axis": 0, "nperseg": wavelet_nperseg}]
freq_time_args = [{"wavelet": "db1"}, {"wavelet": "db1"}, {"wavelet": "db1"}]
computed_features = []
# Apply col by col (the rows of data.T are the columns of data)
for signal in data.T:
    computed_features += feature_extraction.compute_all_features(signal, freq_args, freq_time_args)
# Convert from list to numpy array with a leading sample axis: shape (1, n_features)
query_pred_data = np.array(computed_features)[np.newaxis, :]

## PCA and Mahalanobis distance Anomaly detection

- Reduce the dimension using PCA
- Use mahalanobis distance as the metric to identify if things go out of distribution



### Estimating mean and covariance

In [None]:
from sklearn.base import BaseEstimator, ClassifierMixin

class MahalanobisDistanceClassifer(BaseEstimator, ClassifierMixin):
    """Flag samples that lie too far, in Mahalanobis distance, from the training data.

    ``fit`` estimates the mean and covariance of the training samples and sets the
    decision threshold to ``mean(d) + threshold_level * std(d)``, where ``d`` are the
    Mahalanobis distances of the training samples themselves. ``predict`` returns
    0 for samples closer than the threshold and 1 for all others.

    Parameters
    ----------
    threshold_level : float
        Number of standard deviations added to the mean training distance.
    **kwargs
        Only ``rowvar`` is read; it is forwarded to ``numpy.cov``. It defaults to
        ``False`` (rows are samples, columns are features) so that the covariance
        agrees with the mean, which is always taken over ``axis=0``.
    """

    def __init__(self, threshold_level, **kwargs):

        # Get the arguments
        self.kwargs = kwargs

        # Training threshold level
        self.threshold_level = threshold_level

        # Training parameters (filled in by ``fit``)
        self.covariance = None
        self.inv_covariance = None
        self.mean = None
        self.trained_threshold = None

    def fit(self, X, y=None):
        """Estimate mean, covariance and the distance threshold from ``X``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : ignored

        Returns
        -------
        self
            The fitted estimator, so that calls can be chained.

        Raises
        ------
        numpy.linalg.LinAlgError
            If the estimated covariance matrix cannot be inverted.
        """
        X = np.asarray(X)

        # Covariance between the features; atleast_2d keeps the single-feature case invertible
        rowvar = self.kwargs.get("rowvar", False)
        self.covariance = np.atleast_2d(np.cov(X, rowvar=rowvar))
        self.inv_covariance = np.linalg.inv(self.covariance)
        # Centroid of the distribution
        self.mean = np.mean(X, axis=0)

        # Determine the threshold from the distances of the training samples
        training_distances = self.__distance_distribution(X, self.mean, self.inv_covariance)
        self.trained_threshold = self.__compute_threshold(training_distances, level=self.threshold_level)

        return self

    def predict(self, X, y=None, distance_type="mahalanobis"):
        """Label every row of ``X``: 0 if its distance is below the threshold, else 1.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : ignored
        distance_type : str
            Only ``"mahalanobis"`` is implemented.

        Returns
        -------
        numpy.ndarray of shape (n_samples,)

        Raises
        ------
        NotImplementedError
            If ``distance_type`` is not ``"mahalanobis"``.
        """
        if distance_type != "mahalanobis":
            raise NotImplementedError("Distance metric not implemented")

        distances = np.array(self.__distance_distribution(X, self.mean, self.inv_covariance))

        return np.where(distances < self.trained_threshold, 0, 1)

    @staticmethod
    def __compute_threshold(distances, level):
        """Return ``mean(distances) + level * std(distances)``."""
        return np.mean(distances) + (level * np.std(distances))

    @staticmethod
    def __distance_metric(X, mean, inv_cov, metric_type="mahalanobis"):
        """Return the distance of every row of the 2-D array ``X`` from ``mean``.

        Raises
        ------
        ValueError
            If ``X`` is not two-dimensional.
        NotImplementedError
            If ``metric_type`` is not ``"mahalanobis"``.
        """
        if X.ndim != 2:
            raise ValueError("The X for prediction must be an array, and not a vector")
        if metric_type != "mahalanobis":
            raise NotImplementedError("Metric not defined")

        # Row-wise quadratic form (x - mean)^T * inv_cov * (x - mean), computed for
        # all rows at once without building the full n x n product
        difference = X - mean
        squared = np.sum(difference.dot(inv_cov) * difference, axis=1)

        return np.sqrt(squared)

    def __distance_distribution(self, X, mean, inv_cov, metric_type="mahalanobis"):
        """Return the distances of all rows of ``X`` as a list of floats."""
        distances = self.__distance_metric(np.asarray(X), mean, inv_cov, metric_type=metric_type)

        return distances.tolist()

    def compute_distributions(self, X):
        """Return the Mahalanobis distance of every row of ``X`` as a list of floats."""
        # Compute the distances for different data
        return self.__distance_distribution(X, self.mean, self.inv_covariance)


In [None]:
# PCA with 140 components; the projection is learned from the training data only
pca = PCA(n_components=140, svd_solver="full")
X_train_PCA = pca.fit_transform(X_train)

# Apply the already fitted projection to every test class
X_test_PCA = {}
for class_name, class_features in X_test.items():
    X_test_PCA[class_name] = pca.transform(class_features)

### Mahalanobis distance distribution
- Distance from the center of the distribution for the non-anomalous data

In [None]:
# Model initialization and fitting
# threshold_level=3 -> decision threshold = mean + 3 * std of the training distances;
# rowvar=False is forwarded to np.cov, so the rows of X_train_PCA are treated as samples
md = MahalanobisDistanceClassifer(threshold_level=3, rowvar=False)
# Fit the model
md.fit(X_train_PCA)

In [None]:
# Mahalanobis distances of the training data ...
distances_training = md.compute_distributions(X_train_PCA)
# ... and of every test class
distances_testing = {}
for class_name, class_projection in X_test_PCA.items():
    distances_testing[class_name] = md.compute_distributions(class_projection)

# One dictionary for plotting: a copy of all test classes plus the training data as "normal"
distributions = {**copy.deepcopy(distances_testing), "normal": distances_training}

In [None]:
# Figure with a single axes that fills the whole canvas
fig = plt.figure(figsize=(10, 8))
axs = fig.add_axes([0, 0, 1, 1])

# One filled density curve per entry of ``distributions``
dist_plot = sns.kdeplot(data=distributions, fill=True, ax=axs, palette="pastel")

# Title, axis label and visible distance range
dist_plot.set_title("Mahalanobis distance distribution")
dist_plot.set_xlabel("Mahalanobis Distance")
dist_plot.set_xlim([0, 250])

In [None]:
# Pool the distances of all test classes into one "anomaly" group
distances_testing_combined = []
for class_distances in distances_testing.values():
    distances_testing_combined.extend(class_distances)

# Two groups to compare: training data ("normal") against everything else ("anomaly")
distributions = {
    "normal": distances_training,
    "anomaly": distances_testing_combined,
}

In [None]:
# Figure with a single axes that fills the whole canvas
fig = plt.figure(figsize=(10, 8))
axs = fig.add_axes([0, 0, 1, 1])

# Filled density curves of the "normal" and the pooled "anomaly" distances
dist_plot = sns.kdeplot(data=distributions, fill=True, ax=axs, palette="pastel")

# Title, axis label and visible distance range
dist_plot.set_title("Mahalanobis distance distribution")
dist_plot.set_xlabel("Mahalanobis Distance")
dist_plot.set_xlim([0, 250])

### Determine Threshold

- Generally 3$\sigma$ from the mean

In [None]:
# Dimension reduction with PCA (140 components), learned from the training data only
pca = PCA(n_components=140, svd_solver="full")
X_train_PCA = pca.fit_transform(X_train)

# Apply the already fitted projection to every test class
X_test_PCA = {}
for class_name, class_features in X_test.items():
    X_test_PCA[class_name] = pca.transform(class_features)


### Test the threshold

- Apply and compare performance

In [None]:
# Metrics to compute; each maps to its (here empty) extra keyword arguments
kwargs = {
    "accuracy_score": {},
    "balanced_accuracy_score": {},
    "f1_score": {},
    "recall_score": {},
    "precision_score": {},

}

# Evaluate one classifier per threshold level on the PCA-reduced data
threshold_levels = [1, 2, 3, 4]
evaluations = {}
for threshold_level in threshold_levels:
    # Model initialization and fitting
    md = MahalanobisDistanceClassifer(threshold_level=threshold_level, rowvar=False)
    md.fit(X_train_PCA)

    # Training data is the normal condition -> true label 0
    y_pred1 = md.predict(X_train_PCA)
    y_true1 = np.repeat(0, len(y_pred1))
    # All test classes are anomalies -> true label 1; predict per class and join once
    y_pred2 = np.concatenate([md.predict(class_projection) for class_projection in X_test_PCA.values()])
    y_true2 = np.repeat(1, len(y_pred2))

    # Performance metrics, stored under the fitted distance threshold (rounded)
    model_eval = model_evaluations.ModelEval()
    metrics = model_eval.compute_all_metrics(np.concatenate([y_true1, y_true2]), np.concatenate([y_pred1, y_pred2]), kwargs=kwargs)
    evaluations[round(md.trained_threshold, 2)] = copy.deepcopy(metrics)

# Get it as dataFrame: one row per threshold, one column per metric
evaluations = pd.DataFrame(evaluations)
evaluations = evaluations.T.copy()

In [None]:
# Display the evaluations
evaluations

### Plotting results

In [None]:
fig = plt.figure(figsize=(10, 8))
ax = fig.add_axes([0, 0, 1, 1])

# Evaluation labels - raw strings, because "\s" is not a valid escape sequence
labels = [r"1$\sigma$", r"2$\sigma$", r"3$\sigma$", r"4$\sigma$"]
width = 0.15
# One group of bars per label
x = np.arange(len(labels))

# Plotting: four bars per group, placed side by side around the tick position
bar_specs = [
    (-0.225, "accuracy_score", "Accuracy Score"),
    (-0.075, "f1_score", "F1-Score"),
    (0.075, "precision_score", "Precision Score"),
    (0.225, "recall_score", "Recall Score"),
]
for offset, metric, legend_label in bar_specs:
    ax.bar(x + offset, evaluations[metric], width, label=legend_label)

ax.set_xlabel("Thresholds")
ax.set_ylabel("Proportion")
ax.set_title("")
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.legend()

### Single shot prediction

- Visualize the outputs from prediction

In [None]:
# Do a single shot prediction
md.predict(X_train_PCA[0][np.newaxis, :])

### Pipeline testing

- Create and test pipelines

In [None]:
from sklearn.pipeline import Pipeline

# Steps of the estimator pipeline: the fitted PCA followed by the fitted distance classifier
md_estimator = [("reduce_dim", pca), ("clf", md)]
md_pipeline = Pipeline(steps=md_estimator)

In [None]:
md_pipeline.predict(X_train[0][np.newaxis, :])

### Custom DynamoDB Test

- Test a custom query

In [None]:
md_pipeline.predict(query_pred_data)

In [None]:
md.predict(pca.transform(query_pred_data))

The above two cells should give the same result.

## Kernel density estimation

- Estimate the density of the normal class
- Score and evaluate to determine if there is an anomaly in the data

scikit-learn's `KernelDensity` does not have a `predict` method, so a subclass that adds one is implemented below.

In [None]:
class KDEAnomalyDetector(KernelDensity):
    """Kernel density estimator with a quantile-based ``predict`` method.

    ``fit`` fits the underlying ``KernelDensity`` and stores, as threshold, the
    ``quantile_threshold`` quantile of the log-density scores of the training
    samples. ``predict`` returns 1 for samples scoring below that threshold and
    0 for all others.

    Parameters
    ----------
    quantile_threshold : float
        Quantile (passed as ``q`` to ``numpy.quantile``) of the training scores
        that is used as decision threshold.
    **kwargs
        Forwarded unchanged to ``KernelDensity.__init__``.
        NOTE(review): options passed this way are not named in this ``__init__``
        signature, so ``get_params()`` / ``sklearn.base.clone`` presumably do not
        carry them over - confirm before using this class in a parameter search.
    """

    def __init__(self, quantile_threshold, **kwargs):
        super().__init__(**kwargs)

        # Thresholds (``trained_threshold`` is filled in by ``fit``)
        self.trained_threshold = None
        self.quantile_threshold = quantile_threshold

    def fit(self, X, y=None, sample_weight=None):
        """Fit the density on ``X`` and derive the score threshold from it.

        Returns
        -------
        self
            The fitted estimator, like ``KernelDensity.fit`` does.
        """
        # Fit the super class to the data
        super().fit(X, y, sample_weight)

        # Scores of the training samples under the fitted density
        normal_scores = super().score_samples(X)
        # Compute threshold from normal scores
        self.trained_threshold = np.quantile(normal_scores, q=self.quantile_threshold)

        return self

    def predict(self, X):
        """Return 1 for every row of ``X`` scoring below the threshold, else 0."""
        # Score the sample using the super class
        scores = super().score_samples(X)

        # Relative to threshold - make predictions
        return np.where(scores < self.trained_threshold, 1, 0)


In [None]:
# PCA with 140 components; the projection is learned from the training data only
pca = PCA(n_components=140, svd_solver="full")
X_train_PCA = pca.fit_transform(X_train)

# Apply the already fitted projection to every test class
X_test_PCA = {}
for class_name, class_features in X_test.items():
    X_test_PCA[class_name] = pca.transform(class_features)

In [None]:
# Density estimate of the training data with a Gaussian kernel; the decision
# threshold is the 0.02 quantile of the training scores
# kde = KernelDensity(kernel="gaussian")
kde = KDEAnomalyDetector(quantile_threshold=0.02, kernel="gaussian")
kde.fit(X_train_PCA)

# Predictions for the training data and for every test class
normal_predictions = kde.predict(X_train_PCA)
anomaly_predictions = {}
for class_name, class_projection in X_test_PCA.items():
    anomaly_predictions[class_name] = kde.predict(class_projection)

### Determine threshold

- The threshold will be determined internally using the modified class

### Test the threshold

In [None]:
# Metrics to compute; each maps to its (here empty) extra keyword arguments
kwargs = {
    "accuracy_score": {},
    "balanced_accuracy_score": {},
    "f1_score": {},
    "recall_score": {},
    "precision_score": {},

}

# Training data is the normal condition -> true label 0
y_pred1 = normal_predictions
y_true1 = np.repeat(0, X_train_PCA.shape[0])
# All anomaly classes combined in one step -> true label 1
y_pred2 = np.concatenate(list(anomaly_predictions.values()))
y_true2 = np.repeat(1, y_pred2.shape[0])

# Performance metrics
model_eval = model_evaluations.ModelEval()
evaluations = model_eval.compute_all_metrics(np.concatenate([y_true1, y_true2]), np.concatenate([y_pred1, y_pred2]), kwargs=kwargs)


In [None]:
# Print out the evaluations
evaluations

### Single shot prediction
- To understand how the results look like
- Prediction is usually a vector (n,)

In [None]:
kde.predict(X_train_PCA[0][np.newaxis, :])

### Inference
- The performance was 100%. Needs to be tested with more new-normal scenarios
    - The new normal does not work. The threshold is too tight
- Have to modify the threshold in some way
- Maybe sample all possible errors (overtravel-x, y, z etc.,) compute something that provides more space to the new normal

**A big issue here that needs to be rectified**

### Pipeline testing

- Testing if the pipeline works
- Create and predict on the training data

In [None]:
from sklearn.pipeline import Pipeline

# Steps of the estimator pipeline: the fitted PCA followed by the fitted KDE detector
kde_estimator = [("reduce_dim", pca), ("clf", kde)]
kde_pipeline = Pipeline(steps=kde_estimator)

In [None]:
kde_pipeline.predict(X_train[0][np.newaxis, :])

### Custom DynamoDB Test

- Test a custom query

In [None]:
kde_pipeline.predict(query_pred_data)

In [None]:
# NOTE(review): ``score`` is inherited from KernelDensity and returns a log-likelihood,
# not a 0/1 label; use ``kde.predict(pca.transform(query_pred_data))`` to cross-check
# the pipeline prediction of the previous cell.
kde.score(pca.transform(query_pred_data))

## Isolation Forest - Outlier Estimation

- Using this because of High Dimensionality of the input dataset
- Application of PCA is optional - Depends on model's performance

In [None]:
# PCA with 140 components; the projection is learned from the training data only
pca = PCA(n_components=140, svd_solver="full")

# Training data (use the second line instead to skip PCA)
X_train_isoforest = pca.fit_transform(X_train)
# X_train_isoforest = X_train

# Testing data, one entry per class (use the second line instead to skip PCA)
X_test_isoforest = {}
for class_name, class_features in X_test.items():
    X_test_isoforest[class_name] = pca.transform(class_features)
    # X_test_isoforest[class_name] = class_features

In [None]:
# Fit only on the good state of the machine.
# random_state is fixed so that the forest - and with it the scores reported below -
# is the same on every run of the notebook.
iso_forest = IsolationForest(n_estimators=1000, bootstrap=False, contamination=0.05, random_state=42)
# Fit the training dataset
iso_forest.fit(X_train_isoforest)

### Prediction

- Calling the predict method

In [None]:
# Metrics to compute; defined here as well so that this cell does not depend on
# an earlier evaluation cell having been run
kwargs = {
    "accuracy_score": {},
    "balanced_accuracy_score": {},
    "f1_score": {},
    "recall_score": {},
    "precision_score": {},
}

# Training data is the normal condition -> true label 0
y_pred1 = iso_forest.predict(X_train_isoforest)
y_true1 = np.repeat(0, X_train_isoforest.shape[0])
# All test classes are anomalies -> true label 1; predict per class and join once
y_pred2 = np.concatenate([iso_forest.predict(class_projection) for class_projection in X_test_isoforest.values()])
y_true2 = np.repeat(1, y_pred2.shape[0])

# Replace -1/+1: a prediction of +1 becomes 0, everything else becomes 1
y_pred1 = np.where(y_pred1 == 1, 0, 1)
y_pred2 = np.where(y_pred2 == 1, 0, 1)

# Evaluation
# Performance metrics
model_eval = model_evaluations.ModelEval()
evaluations = model_eval.compute_all_metrics(np.concatenate([y_true1, y_true2]), np.concatenate([y_pred1, y_pred2]), kwargs=kwargs)

In [None]:
# Print the evaluations
evaluations

Performed reasonably well. Not as good as the Kernel Density Estimation, but reasonable.

### Single shot prediction

- To see the shape of the final output

In [None]:
# Make a single prediction
iso_forest.predict(X_train_isoforest[100][np.newaxis, :])

### Custom DynamoDB Test

- Test a custom query

In [None]:
# PCA transform if required
query_transformed_data = pca.transform(query_pred_data)
# query_transformed_data =  query_pred_data

# Make predictions
# NOTE: this is the raw predict output (-1/+1); it is not remapped to the 0/1 coding
# that the evaluation cell of this section applies with np.where
iso_forest.predict(query_transformed_data)

## LOF (Local Outlier Factor)

- Neighbors of 50
- with novelty=True

In [None]:
# PCA with 140 components; the projection is learned from the training data only
pca = PCA(n_components=140, svd_solver="full")

# Training data (use the second line instead to skip PCA)
X_train_lof = pca.fit_transform(X_train)
# X_train_lof = X_train

# Testing data, one entry per class (use the second line instead to skip PCA)
X_test_lof = {}
for class_name, class_features in X_test.items():
    X_test_lof[class_name] = pca.transform(class_features)
    # X_test_lof[class_name] = class_features

In [None]:
# Initialize the classifier
# novelty=True: the model is fitted on the training data and later asked to
# predict on data it has not seen (see the prediction cells below)
lof = LocalOutlierFactor(n_neighbors=50, novelty=True, contamination=0.05, leaf_size=100)
# Fit on the training data - with or without PCA reduction
lof.fit(X_train_lof)


### Prediction

In [None]:
# Metrics to compute; each maps to its (here empty) extra keyword arguments
kwargs = {
    "accuracy_score": {},
    "balanced_accuracy_score": {},
    "f1_score": {},
    "recall_score": {},
    "precision_score": {},

}

# Training data is the normal condition -> true label 0
y_pred1 = lof.predict(X_train_lof)
y_true1 = np.repeat(0, X_train_lof.shape[0])
# All test classes are anomalies -> true label 1; predict per class and join once
y_pred2 = np.concatenate([lof.predict(class_projection) for class_projection in X_test_lof.values()])
y_true2 = np.repeat(1, y_pred2.shape[0])

# Replace -1/+1: a prediction of +1 becomes 0, everything else becomes 1
y_pred1 = np.where(y_pred1 == 1, 0, 1)
y_pred2 = np.where(y_pred2 == 1, 0, 1)

# Evaluation
# Performance metrics
model_eval = model_evaluations.ModelEval()
evaluations = model_eval.compute_all_metrics(np.concatenate([y_true1, y_true2]), np.concatenate([y_pred1, y_pred2]), kwargs=kwargs)

In [None]:
evaluations

The performance is equivalent to Isolation Forest

### Single shot Prediction

In [None]:
# Make a single prediction
lof.predict(X_train_lof[100][np.newaxis, :])

### Custom DynamoDB Test

- Test a custom query

In [None]:
# PCA transform if required
query_transformed_data = pca.transform(query_pred_data)
# query_transformed_data =  query_pred_data

# Make predictions
# NOTE: this is the raw predict output (-1/+1); it is not remapped to the 0/1 coding
# that the evaluation cell of this section applies with np.where
lof.predict(query_transformed_data)