## Data Preprocessing
#experimental_data_exploration

In [6]:
# Location of the CSV that is loaded with pd.read_csv further down.
# NOTE: this is an absolute, machine-specific path.
PATH = (
    '/home/darshana/CYRIL_Charly_121923/PROSPEKT/'
    'PMS0001_28+0__050623_181214/Spectra/'
    'PST_001__050623_181214_det2.csv'
)


In [15]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tensorflow.python.keras import layers, Model

In [17]:

# Read the CSV at PATH without treating any row as a header, then expose the
# table as a plain NumPy array for the slicing done in the next step.
dataframe = pd.read_csv(PATH, header=None)
raw_data = dataframe.to_numpy()


In [None]:

# The last column is taken as the label; every column before it is a feature.
data, labels = raw_data[:, :-1], raw_data[:, -1]

# Hold out 20% of the rows for testing (fixed seed so the split is repeatable).
train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, random_state=21, test_size=0.2
)

# Min-max scale both splits using the extrema of the training split only,
# then store them as float32 tensors.
min_val, max_val = tf.reduce_min(train_data), tf.reduce_max(train_data)
value_range = max_val - min_val
train_data, test_data = (
    tf.cast((split - min_val) / value_range, tf.float32)
    for split in (train_data, test_data)
)

# Boolean labels double as row masks below.
train_labels, test_labels = train_labels.astype(bool), test_labels.astype(bool)

# Rows with a True label form the "normal" sets; the remaining rows form the
# "anomalous" sets.
normal_train_data, anomalous_train_data = (
    train_data[train_labels],
    train_data[~train_labels],
)
normal_test_data, anomalous_test_data = (
    test_data[test_labels],
    test_data[~test_labels],
)

# Define the autoencoder model
class AnomalyDetector(tf.keras.Model):
    """Dense autoencoder: an encoder/decoder pair that reconstructs its input.

    The encoder is a stack of Dense layers with 32, 16 and 8 units (ReLU).
    The decoder mirrors it with 16 and 32 units (ReLU) and ends in a sigmoid
    Dense layer with ``output_dim`` units.

    The base class and all layers are taken from the public ``tf.keras``
    namespace so that they belong to the same Keras implementation as
    ``tf.keras.Sequential``. Combining ``tf.keras.Sequential`` with classes
    from the private ``tensorflow.python.keras`` package is unsupported and
    is a known source of layer-validation / type errors.

    Args:
        output_dim: Number of units in the final decoder layer. When omitted,
            it falls back to ``train_data.shape[1]`` read from module scope,
            which is what the class did before this parameter existed.
    """

    def __init__(self, output_dim=None):
        super().__init__()

        if output_dim is None:
            # Backward-compatible default: width of the module-level
            # training tensor.
            output_dim = train_data.shape[1]

        dense = tf.keras.layers.Dense

        self.encoder = tf.keras.Sequential([
            dense(32, activation="relu"),
            dense(16, activation="relu"),
            dense(8, activation="relu"),
        ])

        self.decoder = tf.keras.Sequential([
            dense(16, activation="relu"),
            dense(32, activation="relu"),
            dense(output_dim, activation="sigmoid"),
        ])

    def call(self, x):
        """Return the decoder's reconstruction of ``x`` after encoding it."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# Instantiate the model; optimise with Adam against mean absolute error.
autoencoder = AnomalyDetector()
autoencoder.compile(loss='mae', optimizer='adam')

# Fit on the normal training rows only, using each row as its own target.
# The complete test split is passed as validation data.
history = autoencoder.fit(
    x=normal_train_data,
    y=normal_train_data,
    validation_data=(test_data, test_data),
    batch_size=512,
    epochs=20,
    shuffle=True,
)

# Plot the loss recorded by fit() for the training and validation data.
import matplotlib.pyplot as plt

for history_key, curve_label in (
    ("loss", "Training Loss"),
    ("val_loss", "Validation Loss"),
):
    plt.plot(history.history[history_key], label=curve_label)
plt.legend()
plt.show()

def _plot_reconstruction(original, reconstruction):
    """Draw one input, its reconstruction, and the shaded gap between them."""
    plt.plot(original, 'b')
    plt.plot(reconstruction, 'r')
    plt.fill_between(
        np.arange(data.shape[1]), reconstruction, original, color='lightcoral'
    )
    plt.legend(labels=["Input", "Reconstruction", "Error"])
    plt.show()


# Push the normal and anomalous test rows through the encoder and then the
# decoder, keeping the intermediate and final results as NumPy arrays.
encoded_normal_data = autoencoder.encoder(normal_test_data).numpy()
decoded_normal_data = autoencoder.decoder(encoded_normal_data).numpy()

encoded_anomalous_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_anomalous_data = autoencoder.decoder(encoded_anomalous_data).numpy()

# First normal test row versus its reconstruction
_plot_reconstruction(normal_test_data[0], decoded_normal_data[0])

# First anomalous test row versus its reconstruction
_plot_reconstruction(anomalous_test_data[0], decoded_anomalous_data[0])

def _plot_loss_histogram(losses, xlabel):
    """Show a 50-bin histogram of ``losses`` with the given x-axis label."""
    plt.hist(losses[None, :], bins=50)
    plt.xlabel(xlabel)
    plt.ylabel("No of examples")
    plt.show()


# Mean absolute error between the normal training rows and their
# reconstructions, shown as a histogram.
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)
_plot_loss_histogram(train_loss, "Train loss")

# Anomaly threshold: one standard deviation above the mean training error.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

# Same error measure and histogram for the anomalous test rows.
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)
_plot_loss_histogram(test_loss, "Test loss")

# Threshold-based classifier built on the reconstruction error
def predict(model, data, threshold):
    """Report which inputs are reconstructed with an error below ``threshold``.

    Args:
        model: Callable that maps ``data`` to its reconstruction.
        data: Inputs to score.
        threshold: Exclusive upper bound on the mean absolute error.

    Returns:
        The result of ``tf.math.less(error, threshold)``: True where the mean
        absolute error between the reconstruction and the input is strictly
        smaller than ``threshold``.
    """
    reconstruction_error = tf.keras.losses.mae(model(data), data)
    return tf.math.less(reconstruction_error, threshold)

# Report accuracy, precision and recall for a set of predictions
def print_stats(predictions, labels):
    """Print accuracy, precision and recall of ``predictions`` against ``labels``.

    Args:
        predictions: Predicted labels.
        labels: Ground-truth labels, passed to each scorer as the first
            argument.
    """
    report = (
        ("Accuracy = {}", accuracy_score),
        ("Precision = {}", precision_score),
        ("Recall = {}", recall_score),
    )
    for template, scorer in report:
        print(template.format(scorer(labels, predictions)))

# Classify every test row with the computed threshold, then print the scores
# against the boolean test labels.
preds = predict(model=autoencoder, data=test_data, threshold=threshold)
print_stats(predictions=preds, labels=test_labels)
