# Anomaly Detection Model Training for Acoustic Data
#
This notebook trains and evaluates three different models for anomaly detection using your audio dataset:
#
- **K-means Clustering:** An unsupervised method trained on normal data only. Anomaly scores are computed as the distance to the learned cluster center.
- **Convolutional Autoencoder:** An unsupervised deep learning approach where a higher reconstruction error indicates an anomaly.
- **Support Vector Machine (SVM):** A supervised classifier trained on both normal and anomalous features.
- **Gaussian Mixture Model (GMM):** Fits a probabilistic model to normal data. Test samples
   with very low likelihood (log-probability) are flagged as anomalies.
- **LSTM Autoencoder:** Groups consecutive sliding-window feature vectors into sequences. An LSTM
   autoencoder is trained on normal sequences; high reconstruction error on a test sequence suggests an anomaly.
- **GAN-based Anomaly Detection:** A GAN is trained on normal data. After training, the discriminator’s
   output (interpreted as the probability that a sample is normal) is used to compute an anomaly score.
#
Evaluation metrics include accuracy, precision, recall, F1 score, and ROC curves.
#
**Dataset:** We use two recordings—one "normal" and one "anomalous"—and segment each into overlapping windows to extract features.


## 1. Imports and Setup


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import librosa.display

from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                             confusion_matrix, roc_auc_score, roc_curve)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.svm import SVC

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping

from scipy.fft import fft
from matplotlib import ticker

sns.set(style="whitegrid")
plt.rcParams.update({'font.size': 10})



## 2. Data Loading and Feature Extraction
#
We load the "normal" and "anomalous" recordings and then segment each audio signal into overlapping windows.
For each window, we extract features including:
- 13 MFCCs (mean values)
- Spectral features: centroid, rolloff, contrast
- Temporal feature: zero crossing rate
#
These features form the basis for our models.


In [None]:
def load_audio_file(file_path):
    """Load a .wav file and return audio and sample rate."""
    audio, sr = librosa.load(file_path, sr=None)
    return audio, sr


def extract_features(audio, sr, n_mfcc=13):
    """Extract MFCC, spectral, and temporal features from an audio segment."""
    # MFCCs (mean for each coefficient)
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)
    mfccs_mean = np.mean(mfccs, axis=1)

    # Spectral features
    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=sr))
    spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=audio, sr=sr))
    spectral_contrast = np.mean(librosa.feature.spectral_contrast(y=audio, sr=sr))

    # Temporal feature
    zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(y=audio))

    features = np.concatenate([mfccs_mean,
                               [spectral_centroid, spectral_rolloff, spectral_contrast, zero_crossing_rate]])
    return features


def sliding_window_features(audio, sr, window_duration=1.0, hop_duration=0.5, n_mfcc=13):
    """
    Segment the audio signal into overlapping windows and extract features for each window.
    Returns a 2D array of shape (n_samples, n_features).
    """
    window_length = int(window_duration * sr)
    hop_length = int(hop_duration * sr)
    features_list = []
    for start in range(0, len(audio) - window_length, hop_length):
        window = audio[start:start + window_length]
        feat = extract_features(window, sr, n_mfcc=n_mfcc)
        features_list.append(feat)
    return np.array(features_list)


# Paths for the two recordings (update these paths if needed)
path_normal = "../../Data/raw/13_real/Normal_knackgeräusche.wav"
path_anomaly = "../../Data/raw/13_real/Anomaly_knackgeräusche.wav"

# Load recordings
audio_normal, sr_normal = load_audio_file(path_normal)
audio_anomaly, sr_anomaly = load_audio_file(path_anomaly)
assert sr_normal == sr_anomaly, "Sampling rates do not match!"

# Extract features from overlapping windows
features_normal = sliding_window_features(audio_normal, sr_normal, window_duration=1.0, hop_duration=0.5)
features_anomaly = sliding_window_features(audio_anomaly, sr_anomaly, window_duration=1.0, hop_duration=0.5)

print("Normal features shape:", features_normal.shape)
print("Anomaly features shape:", features_anomaly.shape)

# Create labels: normal = 0, anomaly = 1
labels_normal = np.zeros(features_normal.shape[0])
labels_anomaly = np.ones(features_anomaly.shape[0])

# Combine the data
X = np.vstack([features_normal, features_anomaly])
y = np.concatenate([labels_normal, labels_anomaly])


## 3. Data Preprocessing
#
We standardize the feature set and split the data into training and test sets.
For the unsupervised models (K-means and autoencoder), we use only normal data for training.


In [None]:
# Standardize the features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split into training and test sets (stratified by label)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42, stratify=y)

# For unsupervised training on normal data:
X_train_normal = X_train[y_train == 0]

print("Training set shape:", X_train.shape)
print("Test set shape:", X_test.shape)
print("Normal training set shape:", X_train_normal.shape)


## 4. Model 1: K-means Clustering (Unsupervised)
#
We train K-means clustering on the normal data (assuming a single cluster).
The anomaly score is computed as the minimum distance to the cluster center.
A threshold is set (e.g., 95th percentile of training scores) to classify anomalies.


In [None]:
# Train K-means on normal training data
n_clusters = 1  # Assume normal data is unimodal
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
kmeans.fit(X_train_normal)

# Compute anomaly scores (distance to cluster center) for training and test sets
train_scores = kmeans.transform(X_train_normal).min(axis=1)
test_scores = kmeans.transform(X_test).min(axis=1)

# Set threshold based on the 95th percentile of training anomaly scores
threshold_kmeans = np.percentile(train_scores, 95)
print("K-means threshold:", threshold_kmeans)

# Predict anomalies: if score > threshold, label as anomaly (1)
y_pred_kmeans = (test_scores > threshold_kmeans).astype(int)

# Evaluate K-means performance
accuracy_kmeans = accuracy_score(y_test, y_pred_kmeans)
precision_kmeans = precision_score(y_test, y_pred_kmeans)
recall_kmeans = recall_score(y_test, y_pred_kmeans)
f1_kmeans = f1_score(y_test, y_pred_kmeans)

print("K-means Performance:")
print("Accuracy:", accuracy_kmeans)
print("Precision:", precision_kmeans)
print("Recall:", recall_kmeans)
print("F1 Score:", f1_kmeans)


## 5. Model 2: Convolutional Autoencoder (Unsupervised)
#
We build a convolutional autoencoder that learns to reconstruct normal data.
At test time, a high reconstruction error indicates an anomaly.
#
The input is reshaped to (samples, features, 1) to be compatible with 1D convolution layers.


In [None]:
# Reshape data for the autoencoder
X_train_normal_ae = X_train_normal.reshape(-1, X_train_normal.shape[1], 1)
X_test_ae = X_test.reshape(-1, X_test.shape[1], 1)
input_shape = X_train_normal_ae.shape[1:]

# Build a simple convolutional autoencoder
input_layer = layers.Input(shape=input_shape)
x = layers.Conv1D(32, 3, activation='relu', padding='same')(input_layer)
x = layers.MaxPooling1D(2, padding='same')(x)
x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
encoded = layers.MaxPooling1D(2, padding='same')(x)

x = layers.Conv1D(16, 3, activation='relu', padding='same')(encoded)
x = layers.UpSampling1D(2)(x)
x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
x = layers.UpSampling1D(2)(x)
decoded = layers.Conv1D(1, 3, activation='sigmoid', padding='same')(x)

autoencoder = models.Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
autoencoder.summary()

# Train the autoencoder on normal data
early_stopping = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
history = autoencoder.fit(X_train_normal_ae, X_train_normal_ae,
                          epochs=50, batch_size=32,
                          validation_split=0.1, callbacks=[early_stopping], verbose=1)

# Compute reconstruction error on test set
X_test_pred = autoencoder.predict(X_test_ae)
reconstruction_error = np.mean(np.abs(X_test_pred - X_test_ae), axis=(1, 2))

# Set threshold based on normal training error (95th percentile)
X_train_pred = autoencoder.predict(X_train_normal_ae)
train_error = np.mean(np.abs(X_train_pred - X_train_normal_ae), axis=(1, 2))
threshold_ae = np.percentile(train_error, 95)
print("Autoencoder threshold:", threshold_ae)

# Predict anomalies based on reconstruction error
y_pred_ae = (reconstruction_error > threshold_ae).astype(int)

# Evaluate autoencoder performance
accuracy_ae = accuracy_score(y_test, y_pred_ae)
precision_ae = precision_score(y_test, y_pred_ae)
recall_ae = recall_score(y_test, y_pred_ae)
f1_ae = f1_score(y_test, y_pred_ae)

print("Autoencoder Performance:")
print("Accuracy:", accuracy_ae)
print("Precision:", precision_ae)
print("Recall:", recall_ae)
print("F1 Score:", f1_ae)


## 6. Model 3: Support Vector Machine (SVM) – Supervised
#
We train an SVM classifier on the standardized features.
This supervised model uses labels to learn the distinction between normal and anomalous windows.


In [None]:
# Train the SVM classifier
svm = SVC(kernel='rbf', probability=True, random_state=42)
svm.fit(X_train, y_train)

# Predict on the test set
y_pred_svm = svm.predict(X_test)

# Evaluate SVM performance
accuracy_svm = accuracy_score(y_test, y_pred_svm)
precision_svm = precision_score(y_test, y_pred_svm)
recall_svm = recall_score(y_test, y_pred_svm)
f1_svm = f1_score(y_test, y_pred_svm)

print("SVM Performance:")
print("Accuracy:", accuracy_svm)
print("Precision:", precision_svm)
print("Recall:", recall_svm)
print("F1 Score:", f1_svm)


## 7. Model 4: Gaussian Mixture Model (GMM)
#
The GMM is trained on normal data only. For each sample, the log-likelihood is computed.
Samples with log-likelihood below a threshold (set from the training distribution) are flagged as anomalies.


In [None]:
# Train a GMM on normal data
gmm = GaussianMixture(n_components=1, covariance_type='full', random_state=42)
gmm.fit(X_train_normal)

# Compute log-likelihood scores for training and test data
train_ll = gmm.score_samples(X_train_normal)  # Higher is better
test_ll = gmm.score_samples(X_test)

# Set threshold as the 5th percentile (i.e. very low likelihood indicates anomaly)
threshold_gmm = np.percentile(train_ll, 5)
print("GMM log-likelihood threshold:", threshold_gmm)

# Predict anomalies: if log-likelihood is below threshold, label as anomaly (1)
y_pred_gmm = (test_ll < threshold_gmm).astype(int)

# Evaluate GMM
accuracy_gmm = accuracy_score(y_test, y_pred_gmm)
precision_gmm = precision_score(y_test, y_pred_gmm)
recall_gmm = recall_score(y_test, y_pred_gmm)
f1_gmm = f1_score(y_test, y_pred_gmm)

print("GMM Performance:")
print("Accuracy:", accuracy_gmm)
print("Precision:", precision_gmm)
print("Recall:", recall_gmm)
print("F1 Score:", f1_gmm)



## 8. Model 5: LSTM Autoencoder
#
Since an LSTM is designed for sequential data, we group consecutive sliding-window feature vectors into sequences.
For example, with a sequence length of 10, each training sample is a 10-step sequence.
#
The LSTM autoencoder is trained on normal sequences. At test time, a high reconstruction error is an indicator of an anomaly.


In [None]:
def create_sequences(data, sequence_length=10):
    """Reshape 2D data (samples, features) into 3D sequences (n_sequences, sequence_length, features)."""
    sequences = []
    for i in range(len(data) - sequence_length + 1):
        sequences.append(data[i:i + sequence_length])
    return np.array(sequences)


sequence_length = 10
X_lstm_train = create_sequences(X_train_normal, sequence_length)
X_lstm_test = create_sequences(X_test, sequence_length)

print("LSTM training sequences shape:", X_lstm_train.shape)
print("LSTM test sequences shape:", X_lstm_test.shape)

# Build the LSTM Autoencoder
timesteps = X_lstm_train.shape[1]
feature_dim = X_lstm_train.shape[2]

input_seq = layers.Input(shape=(timesteps, feature_dim))
encoded = layers.LSTM(64, activation='relu', return_sequences=False)(input_seq)
decoded = layers.RepeatVector(timesteps)(encoded)
decoded = layers.LSTM(64, activation='relu', return_sequences=True)(decoded)
decoded = layers.TimeDistributed(layers.Dense(feature_dim))(decoded)

lstm_autoencoder = models.Model(input_seq, decoded)
lstm_autoencoder.compile(optimizer='adam', loss='mse')
lstm_autoencoder.summary()

# Train the autoencoder on normal sequences
early_stop = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
lstm_history = lstm_autoencoder.fit(X_lstm_train, X_lstm_train,
                                    epochs=50,
                                    batch_size=32,
                                    validation_split=0.1,
                                    callbacks=[early_stop],
                                    verbose=1)

# Compute reconstruction error on test sequences
X_lstm_test_pred = lstm_autoencoder.predict(X_lstm_test)
recon_error = np.mean(np.abs(X_lstm_test_pred - X_lstm_test), axis=(1, 2))

# Set threshold from the training reconstruction errors
X_lstm_train_pred = lstm_autoencoder.predict(X_lstm_train)
train_recon_error = np.mean(np.abs(X_lstm_train_pred - X_lstm_train), axis=(1, 2))
threshold_lstm = np.percentile(train_recon_error, 95)
print("LSTM AE threshold:", threshold_lstm)

# For the test set, we need labels for each sequence.
# For simplicity, assume that if any window in a sequence was anomalous, the sequence is anomalous.
# Here we approximate by taking the mode from the overlapping segments.
# (In practice, you might recompute sequences for the labeled data.)
y_test_seq = y_test[sequence_length - 1:]  # This is a rough approximation

# Predict anomalies from reconstruction error
y_pred_lstm = (recon_error > threshold_lstm).astype(int)

# Since lengths may not match exactly, evaluate on the available sequences
accuracy_lstm = accuracy_score(y_test_seq, y_pred_lstm[:len(y_test_seq)])
precision_lstm = precision_score(y_test_seq, y_pred_lstm[:len(y_test_seq)])
recall_lstm = recall_score(y_test_seq, y_pred_lstm[:len(y_test_seq)])
f1_lstm = f1_score(y_test_seq, y_pred_lstm[:len(y_test_seq)])

print("LSTM AE Performance:")
print("Accuracy:", accuracy_lstm)
print("Precision:", precision_lstm)
print("Recall:", recall_lstm)
print("F1 Score:", f1_lstm)


## 9. Model 6: GAN-based Anomaly Detection
#
We build a simple GAN with fully-connected networks.
The GAN is trained on normal features only. After training, the discriminator's output is used as
an anomaly score (lower probability indicates anomaly).
#
Here we use a simple iterative training loop.


In [None]:
# GAN parameters
latent_dim = 10
feature_dim = X_train_normal.shape[1]
batch_size = 32
epochs = 1000  # For demonstration; increase for better convergence


# Build Generator: maps latent vector to feature space
def build_generator():
    model = models.Sequential([
        layers.Dense(32, activation='relu', input_dim=latent_dim),
        layers.Dense(64, activation='relu'),
        layers.Dense(feature_dim, activation='linear')
    ])
    return model


# Build Discriminator: maps feature vector to probability of being "real" (normal)
def build_discriminator():
    model = models.Sequential([
        layers.Dense(64, activation='relu', input_dim=feature_dim),
        layers.Dense(32, activation='relu'),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=optimizers.Adam(learning_rate=0.0002), loss='binary_crossentropy')
    return model


generator = build_generator()
discriminator = build_discriminator()

# Freeze discriminator when training the GAN (generator+discriminator)
discriminator.trainable = False
gan_input = layers.Input(shape=(latent_dim,))
generated_features = generator(gan_input)
gan_output = discriminator(generated_features)
gan = models.Model(gan_input, gan_output)
gan.compile(optimizer=optimizers.Adam(learning_rate=0.0002), loss='binary_crossentropy')

# Prepare training data for GAN (normal features only)
X_gan = X_train_normal
real_labels = np.ones((X_gan.shape[0], 1))
fake_labels = np.zeros((X_gan.shape[0], 1))

# Training loop for GAN
import tqdm

n_batches = int(X_gan.shape[0] / batch_size)

for epoch in range(epochs):
    for _ in range(n_batches):
        # Train discriminator
        idx = np.random.randint(0, X_gan.shape[0], batch_size)
        real_data = X_gan[idx]

        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        fake_data = generator.predict(noise)

        d_loss_real = discriminator.train_on_batch(real_data, np.ones((batch_size, 1)))
        d_loss_fake = discriminator.train_on_batch(fake_data, np.zeros((batch_size, 1)))
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # Train generator via GAN
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        g_loss = gan.train_on_batch(noise, np.ones((batch_size, 1)))

    if epoch % 200 == 0:
        print(f"Epoch {epoch} / {epochs} - D loss: {d_loss:.4f}, G loss: {g_loss:.4f}")

# After training, use the discriminator's output as an anomaly score.
# For a given sample x, a low D(x) indicates that the sample does not resemble normal data.
# Compute anomaly scores on the test set.
d_scores = discriminator.predict(X_test)  # Probability of being normal
# Define anomaly score as: score = 1 - D(x)
anomaly_scores = 1 - d_scores.flatten()

# Set threshold based on the 95th percentile of anomaly scores for normal training data.
d_scores_train = discriminator.predict(X_train_normal)
train_anomaly_scores = 1 - d_scores_train.flatten()
threshold_gan = np.percentile(train_anomaly_scores, 95)
print("GAN-based threshold:", threshold_gan)

y_pred_gan = (anomaly_scores > threshold_gan).astype(int)

# Evaluate GAN-based anomaly detection
accuracy_gan = accuracy_score(y_test, y_pred_gan)
precision_gan = precision_score(y_test, y_pred_gan)
recall_gan = recall_score(y_test, y_pred_gan)
f1_gan = f1_score(y_test, y_pred_gan)

print("GAN-based Model Performance:")
print("Accuracy:", accuracy_gan)
print("Precision:", precision_gan)
print("Recall:", recall_gan)
print("F1 Score:", f1_gan)

## 7. Additional Evaluation: ROC Curves
#
We plot ROC curves for each model.
- For K-means, we use the distance score (higher means more anomalous).
- For the autoencoder, we use the reconstruction error.
- For the SVM, we use the probability estimates for the positive class.


In [None]:
def plot_roc(y_true, scores, model_name):
    fpr, tpr, _ = roc_curve(y_true, scores)
    auc = roc_auc_score(y_true, scores)
    plt.plot(fpr, tpr, label=f'{model_name} (AUC = {auc:.2f})')


plt.figure(figsize=(8, 6))

# K-means: use the distance scores (directly, higher is more anomalous)
plot_roc(y_test, test_scores, "K-means")
# Autoencoder: use reconstruction error
plot_roc(y_test, reconstruction_error, "Autoencoder")
# SVM: use predicted probabilities for the anomaly class
svm_probs = svm.predict_proba(X_test)[:, 1]
plot_roc(y_test, svm_probs, "SVM")

plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curves")
plt.legend()
plt.tight_layout()
plt.show()


## 8. Conclusions
#
- **K-means Clustering:** Uses a simple unsupervised approach based on the distance from the learned normal cluster.
- **Convolutional Autoencoder:** Learns a compressed representation of normal data, with reconstruction error serving as an anomaly score.
- **SVM:** Leverages labeled data in a supervised framework.
#
Evaluation metrics (accuracy, precision, recall, F1 score, ROC curves) provide insights into model performance.
#
Future work could include hyperparameter tuning, additional feature engineering, and exploring ensemble approaches to further improve detection performance.
