In [6]:
import os
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

Matplotlib is building the font cache; this may take a moment.


In [9]:
class LSTMAutoencoder:
    """LSTM autoencoder that flags windows with high reconstruction error.

    Workflow: call ``train`` on data considered normal, which fits the scaler,
    trains the network and derives an error threshold from the validation
    split; then call ``detect`` on new data to get one boolean flag per
    sliding window.

    Args:
        timesteps: Number of consecutive rows in each input window.
        features: Number of columns (features) in each row.
    """

    def __init__(self, timesteps=10, features=18):
        self.timesteps = timesteps
        self.features = features
        self.scaler = StandardScaler()
        self.model = None      # Keras model, created by train()
        self.threshold = None  # reconstruction-error cut-off, set by train()

    def create_sequences(self, data):
        """Build overlapping sliding windows (stride 1) over the rows of ``data``.

        Args:
            data: Array-like indexed by row, e.g. shape (n_rows, n_features).

        Returns:
            Array of shape (n_rows - timesteps + 1, timesteps, ...). When
            ``data`` has fewer than ``timesteps`` rows, an empty array with
            the same trailing dimensions is returned instead of a shapeless
            one, so callers can rely on the rank.
        """
        data = np.asarray(data)
        n_windows = len(data) - self.timesteps + 1
        if n_windows <= 0:
            return np.empty((0, self.timesteps) + data.shape[1:], dtype=data.dtype)
        return np.array([data[i:i + self.timesteps] for i in range(n_windows)])

    def build_model(self):
        """Define and compile the LSTM autoencoder.

        Encoder: LSTM(24) -> LSTM(6) producing one vector per window.
        Decoder: the vector is repeated ``timesteps`` times and passed through
        LSTM(6) -> LSTM(24) -> per-timestep Dense(features).

        Returns:
            A compiled ``keras.Model`` (Adam optimizer, MAE loss).
        """
        inputs = keras.Input(shape=(self.timesteps, self.features))
        x = layers.LSTM(24, activation='tanh', return_sequences=True)(inputs)
        encoded = layers.LSTM(6, activation='tanh', return_sequences=False)(x)
        x = layers.RepeatVector(self.timesteps)(encoded)
        x = layers.LSTM(6, activation='tanh', return_sequences=True)(x)
        x = layers.LSTM(24, activation='tanh', return_sequences=True)(x)
        decoded = layers.TimeDistributed(layers.Dense(self.features))(x)

        model = keras.Model(inputs, decoded)
        model.compile(optimizer='adam', loss='mae')
        return model

    def preprocess(self, data, fit=True):
        """Scale ``data`` and convert it into overlapping windows.

        Args:
            data: DataFrame or 2-D array of shape (n_rows, n_features).
            fit: When True (default, the historical behaviour) the scaler is
                fitted on ``data`` before transforming. Pass False to reuse
                the statistics learned during training, which is what
                inference must do so that new data is scaled consistently
                with the data the threshold was computed on.

        Returns:
            Array of windows as produced by ``create_sequences``.
        """
        if isinstance(data, pd.DataFrame):
            data = data.values
        scale = self.scaler.fit_transform if fit else self.scaler.transform
        return self.create_sequences(scale(data))

    def _reconstruction_errors(self, sequences):
        """Return the mean absolute reconstruction error of each window."""
        reconstructed = self.model.predict(sequences)
        return np.abs(reconstructed - sequences).mean(axis=(1, 2))

    def train(self, data, epochs=50, batch=64, val_split=0.2, threshold_percentile=95):
        """Fit the scaler and the autoencoder, then derive the anomaly threshold.

        Args:
            data: DataFrame or 2-D array of normal (non-anomalous) samples.
            epochs: Maximum number of training epochs.
            batch: Mini-batch size.
            val_split: Fraction of windows held out for validation. The split
                is not shuffled, so the last windows form the validation set.
            threshold_percentile: Percentile of the validation reconstruction
                errors used as the anomaly threshold.

        Raises:
            ValueError: If ``data`` has fewer rows than ``timesteps``.
        """
        X = self.preprocess(data, fit=True)
        if len(X) == 0:
            raise ValueError(
                f"Need at least {self.timesteps} rows to build one sequence."
            )
        X_train, X_val = train_test_split(X, test_size=val_split, shuffle=False)

        self.model = self.build_model()

        # Stop when validation loss stalls for 5 epochs and keep the best weights.
        early_stop = keras.callbacks.EarlyStopping(
            monitor='val_loss', patience=5, restore_best_weights=True
        )

        start = time.time()
        history = self.model.fit(
            X_train, X_train,
            epochs=epochs, batch_size=batch,
            validation_data=(X_val, X_val),
            callbacks=[early_stop], shuffle=True,
        )
        print(f"Training Time: {time.time() - start:.2f}s")

        # Percentile-based threshold instead of the usual "mean + 3*std".
        val_errors = self._reconstruction_errors(X_val)
        self.threshold = np.percentile(val_errors, threshold_percentile)

        # Plot training and validation loss per epoch.
        fig, ax = plt.subplots()
        ax.plot(history.history['loss'], label='Train Loss')
        ax.plot(history.history['val_loss'], label='Val Loss')
        ax.set(xlabel='Epoch', ylabel='MAE loss', title='LSTM autoencoder training')
        ax.legend()
        plt.show()

    def detect(self, data):
        """Flag windows whose reconstruction error exceeds the trained threshold.

        The scaler fitted during ``train`` is reused; it is not re-fitted on
        ``data``.

        Args:
            data: DataFrame or 2-D array with the same columns used in training.

        Returns:
            Boolean array with one entry per window
            (``len(data) - timesteps + 1`` entries); True marks an anomaly.

        Raises:
            RuntimeError: If called before ``train``.
            ValueError: If ``data`` has fewer rows than ``timesteps``.
        """
        if self.model is None or self.threshold is None:
            raise RuntimeError("Train the model first.")

        sequences = self.preprocess(data, fit=False)
        if len(sequences) == 0:
            raise ValueError(
                f"Need at least {self.timesteps} rows to build one sequence."
            )
        return self._reconstruction_errors(sequences) > self.threshold

In [None]:
def load_csv(filepath, columns):
    """Load a CSV file and return only the requested columns.

    Args:
        filepath: Path of the CSV file to read.
        columns: Iterable of column names to extract, in the desired order.

    Returns:
        DataFrame containing exactly ``columns``.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        ValueError: If any requested column is absent from the file. This
            replaces a silent ``None`` return, which only surfaced later as
            an unrelated error in the caller.
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File {filepath} not found.")

    df = pd.read_csv(filepath)
    columns = list(columns)
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise ValueError(f"File {filepath} is missing required columns: {missing}")
    return df[columns]

# NOTE(review): these are absolute paths at the filesystem root — confirm this
# matches where the CSV files actually live in the runtime environment.
NORMAL_DATA_PATH = '/normal_data.csv'
ANOMALY_DATA_PATH = '/anomaly_data.csv'

# Three signal groups x 6 indices = 18 input columns.
# Presumably per-joint position (q), velocity (qd) and current — TODO confirm.
feature_columns = (
    [f'actual_q_{i}' for i in range(6)]
    + [f'actual_qd_{i}' for i in range(6)]
    + [f'actual_current_{i}' for i in range(6)]
)

# Keyword names must match LSTMAutoencoder.__init__(timesteps, features).
detector = LSTMAutoencoder(
    timesteps=10,
    features=len(feature_columns),
)

try:
    # Load and train on normal data
    normal_data = load_csv(NORMAL_DATA_PATH, feature_columns)
    detector.train(normal_data)

    # Score the data suspected to contain anomalies
    anomaly_data = load_csv(ANOMALY_DATA_PATH, feature_columns)
    anomalies = detector.detect(anomaly_data)

    print(f"Detected {anomalies.sum()} anomalies out of {len(anomalies)} sequences")

except Exception as e:
    print(f"An error occurred: {e}")