Load datasets

In [10]:
import pandas as pd

dfTrain=pd.read_csv("./Data Sample Centralized/Train.csv")
dfTest=pd.read_csv("./Data Sample Centralized/Test.csv")
dfValidation=pd.read_csv("./Data Sample Centralized/Validation.csv")





Columns to be analyzed

In [11]:
columns=['EngSpeed', 'EngOilPress', 'EngCoolantTemp', 'EngFuelRate', 'BatteryPotential_PowerInput1']


train the model and determine threshold values

In [12]:
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score, confusion_matrix, accuracy_score, precision_score, recall_score, roc_auc_score, average_precision_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Define the number of input features (columns in the dataset)
input_dim = len(columns)  # Number of features
encoding_dim = 128  # Number of neurons in the encoding layer

# Columns that require modifications based on specific conditions

# Define the Autoencoder model
autoencoder = keras.Sequential([
    layers.Input(shape=(input_dim,)),  # Input layer with input dimension
    layers.Dense(encoding_dim, activation='tanh'),  # First hidden layer
    layers.Dense(2, activation='tanh'),  # Bottleneck layer (compressed representation)
    layers.Dense(encoding_dim, activation='tanh'),  # Expansion layer
    layers.Dense(input_dim, activation='tanh')  # Output layer (reconstructing input)
    # layers.Dense(input_dim, activation='sigmoid')  # Alternative activation
])

# Compile the model using Adam optimizer and mean squared error loss
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Train the autoencoder on the training dataset
autoencoder.fit(
    dfTrain[columns], dfTrain[columns], 
    epochs=100, batch_size=64, shuffle=True, verbose=False,
    validation_data=(dfValidation[columns], dfValidation[columns])
)

# Extract validation data for evaluation
x_val = dfValidation[columns]

# Generate predictions using the trained autoencoder
predictions = autoencoder.predict(x_val)

# Compute the Mean Squared Error (MSE) for each feature
# mse = pd.DataFrame(np.power(x_val - predictions, 2), columns=columns)


mse = np.mean(np.power(x_val - predictions, 2), axis=1)

# Dictionary to store the best threshold for each feature
best_thresholds = {}

# Iterate over each feature to find the best threshold using F1-score

# mse_column = mse  # Get MSE values for the column
best_f1 = 0  # Initialize best F1-score
best_threshold = 0  # Initialize best threshold value

# Try different percentile-based thresholds (from 50% to 100%)
for threshold_value in range(50, 101):
    threshold = np.percentile(mse, threshold_value)  # Compute percentile threshold
    anomaly_labels = (mse > threshold)  # Identify anomalies
    y_pred_bool = ~anomaly_labels  # Convert to boolean normal/abnormal labels
    
    # Apply additional condition for specific columns

    
    y_test = dfValidation["normal_label"]  # Get actual labels
    
    # Compute F1-score
    f1 = f1_score(y_test, y_pred_bool)
    
    # Update the best threshold if F1-score improves
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold
        best_threshold_value = threshold_value





Evaluate with test dataset

In [13]:
import numpy as np
import pandas as pd
from sklearn.metrics import (f1_score, confusion_matrix, accuracy_score, 
                             precision_score, recall_score, roc_auc_score, average_precision_score)



# Extract test data based on selected columns
x_test = dfTest[columns]

# Predict results using the autoencoder
results = autoencoder.predict(x_test)

# Compute Mean Squared Error (MSE) for each column
mse = np.mean(np.power(x_test - results, 2), axis=1)


# Iterate over columns to determine anomalies

anomaly_labels = (mse > best_threshold)
y_pred_bool = ~anomaly_labels


dfTest["normal_label_pred"] = y_pred_bool



# Extract actual and predicted labels for evaluation
y_test_global = dfTest["normal_label"]
y_pred_global = dfTest['normal_label_pred']

# Compute evaluation metrics
confusion = confusion_matrix(y_test_global, y_pred_global)
accuracy = accuracy_score(y_test_global, y_pred_global)
precision = precision_score(y_test_global, y_pred_global)
recall = recall_score(y_test_global, y_pred_global)
f1 = f1_score(y_test_global, y_pred_global)
roc_auc = roc_auc_score(y_test_global, y_pred_global)
pr_auc = average_precision_score(y_test_global, y_pred_global)

# Extract values from confusion matrix
TP = confusion[1, 1]
TN = confusion[0, 0]
FP = confusion[0, 1]
FN = confusion[1, 0]

# Compute additional evaluation metrics
TPR = TP / (TP + FN)  # Sensitivity
FNR = FN / (FN + TP)  # False Negative Rate
TNR = TN / (TN + FP)  # Specificity
FPR = FP / (FP + TN)  # False Positive Rate

# Create DataFrame to store results without additional statistics
df_resultados_globales = pd.DataFrame([{
    "Accuracy": accuracy,
    "Precision": precision,
    "Recall": recall,
    "F1 Score": f1,
    "ROC AUC": roc_auc,
    "PR AUC": pr_auc,
    "TP": TP,
    "TN": TN,
    "FP": FP,
    "FN": FN,
    "TPR": TPR,
    "FNR": FNR,
    "TNR": TNR,
    "FPR": FPR
}])

# Format percentage-based metrics properly
columns_to_exclude = ["TP", "TN", "FP", "FN"]
for col in df_resultados_globales.columns:
    if col not in columns_to_exclude:
        df_resultados_globales[col] = df_resultados_globales[col].apply(lambda x: f"{x*100:.2f}%")

# Print and save the results
print(df_resultados_globales)
df_resultados_globales.to_csv("results.csv", index=False)


  Accuracy Precision  Recall F1 Score ROC AUC  PR AUC  TP  TN  FP  FN     TPR  \
0   85.86%    95.40%  89.25%   92.22%  61.29%  95.24%  83   2   4  10  89.25%   

      FNR     TNR     FPR  
0  10.75%  33.33%  66.67%  
