# LSTM-based Anomaly Detection for Time Series Data

This notebook demonstrates the end-to-end process of building an optimized anomaly detection system using an LSTM Autoencoder.

## 1. Setup and Data Generation

In [1]:
import numpy as np
import pandas as pd
import os

def generate_synthetic_cmapss(filename, num_units=10, max_cycles=200):
    np.random.seed(42)
    data = []
    
    for unit_id in range(1, num_units + 1):
        # Each unit has a random lifespan between 150 and max_cycles
        lifespan = np.random.randint(150, max_cycles + 1)
        
        for cycle in range(1, lifespan + 1):
 
            s1 = 518.67  # constant
 
            s2 = 641.81 - 5 * (cycle / lifespan) + np.random.normal(0, 0.1)

            s3 = 1589.70 + np.random.normal(0, 0.5) # noise
 
            trend = (cycle / lifespan) ** 2
 
            s4 = 1400.60 + 40 * trend + np.random.normal(0, 0.2)
   
            s5 = 14.62
 
            s6 = 21.61 + 2 * trend + np.random.normal(0, 0.01)
 
            set1 = np.random.normal(0, 0.001)
            set2 = np.random.normal(0, 0.001)
            set3 = 100.0
            
            row = [unit_id, cycle, set1, set2, set3, s1, s2, s3, s4, s5, s6]
            data.append(row)
            
    columns = ['unit_id', 'cycle', 'setting1', 'setting2', 'setting3', 
               's1', 's2', 's3', 's4', 's5', 's6']
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(filename, sep=' ', index=False, header=False)
    print(f"Generated {filename}")

if __name__ == "__main__":
    os.makedirs('data/cmapss', exist_ok=True)
    generate_synthetic_cmapss('data/cmapss/train_FD001.txt', num_units=100)
    generate_synthetic_cmapss('data/cmapss/test_FD001.txt', num_units=20)

print("Data generation complete.")

Generated data/cmapss/train_FD001.txt
Generated data/cmapss/test_FD001.txt
Data generation complete.


## 2. Preprocessing
We split the data into sequences. Crucially, for training, we only use the first 80% of each unit's life to ensure the model learns 'healthy' patterns only.

In [2]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import joblib
import os

def load_data(file_path):
    columns = ['unit_id', 'cycle', 'setting1', 'setting2', 'setting3', 
               's1', 's2', 's3', 's4', 's5', 's6']
    df = pd.read_csv(file_path, sep=' ', header=None, names=columns)
    return df

def preprocess_data(train_df, test_df):
    # Select features (excluding unit_id and cycle for training input)
    features = ['setting1', 'setting2', 'setting3', 's1', 's2', 's3', 's4', 's5', 's6']
    
    scaler = MinMaxScaler()
    train_df[features] = scaler.fit_transform(train_df[features])
    test_df[features] = scaler.transform(test_df[features])
    
    # Save scaler for later use in dashboard
    os.makedirs('models', exist_ok=True)
    joblib.dump(scaler, 'models/scaler.joblib')
    
    return train_df, test_df, features

def create_sequences(data, seq_length, training_mode=False):
    sequences = []
    for unit_id in data['unit_id'].unique():
        unit_data_df = data[data['unit_id'] == unit_id]
        
        # If training, only use the first 80% of data (assumed healthy)
        if training_mode:
            cutoff = int(len(unit_data_df) * 0.80)
            unit_data = unit_data_df.iloc[:cutoff, 2:].values
        else:
            unit_data = unit_data_df.iloc[:, 2:].values
            
        if len(unit_data) >= seq_length:
            for i in range(len(unit_data) - seq_length + 1):
                sequences.append(unit_data[i:i+seq_length])
    return np.array(sequences)

if __name__ == "__main__":
    train_df = load_data('data/cmapss/train_FD001.txt')
    test_df = load_data('data/cmapss/test_FD001.txt')
    
    train_df, test_df, features = preprocess_data(train_df, test_df)
    
    SEQ_LENGTH = 10
    # Enable training_mode=True for X_train to learn only from healthy data
    X_train = create_sequences(train_df, SEQ_LENGTH, training_mode=True)
    X_test = create_sequences(test_df, SEQ_LENGTH, training_mode=False)
    
    print(f"X_train shape: {X_train.shape}")
    print(f"X_test shape: {X_test.shape}")
    
    np.save('data/X_train.npy', X_train)
    np.save('data/X_test.npy', X_test)


X_train shape: (13061, 10, 9)
X_test shape: (3396, 10, 9)


## 3. Model Training
We train an LSTM Autoencoder with Dropout layers to prevent overfitting.

In [3]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, RepeatVector, TimeDistributed, Dropout

def build_lstm_autoencoder(seq_length, num_features):
    model = Sequential([
        # Encoder
        LSTM(64, activation='relu', input_shape=(seq_length, num_features), return_sequences=True),
        Dropout(0.2),
        LSTM(32, activation='relu', return_sequences=False),
        Dropout(0.2),
        RepeatVector(seq_length),
        # Decoder
        LSTM(32, activation='relu', return_sequences=True),
        Dropout(0.2),
        LSTM(64, activation='relu', return_sequences=True),
        Dropout(0.2),
        TimeDistributed(Dense(num_features))
    ])
    model.compile(optimizer='adam', loss='mae')
    return model

model = build_lstm_autoencoder(10, 9)
model.summary()


  super().__init__(**kwargs)


In [4]:
def train():
    X_train = np.load('data/X_train.npy')
    
    seq_length = X_train.shape[1]
    num_features = X_train.shape[2]
    
    model = build_lstm_autoencoder(seq_length, num_features)
    
    # Early stopping to prevent overfitting
    early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, mode='min')
    
    history = model.fit(
        X_train, X_train,
        epochs=50,
        batch_size=32,
        validation_split=0.1,
        callbacks=[early_stop]
    )
    
    os.makedirs('models', exist_ok=True)
    model.save('models/lstm_autoencoder.keras')
    print("Model saved to models/lstm_autoencoder.keras")
    
    return history

if __name__ == "__main__":
    train()


Epoch 1/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 17ms/step - loss: 0.1378 - val_loss: 0.0404
Epoch 2/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 15ms/step - loss: 0.0483 - val_loss: 0.0386
Epoch 3/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.0444 - val_loss: 0.0370
Epoch 4/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.0428 - val_loss: 0.0382
Epoch 5/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.0418 - val_loss: 0.0376
Epoch 6/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.0411 - val_loss: 0.0388
Epoch 7/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.0406 - val_loss: 0.0380
Epoch 8/50
[1m368/368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.0402 - val_loss: 0.0384
Model saved to models/lstm_auto

## 4. Anomaly Detection
We calculate the reconstruction error and determine a threshold based on the 99th percentile of the training error.

In [5]:
import numpy as np
import tensorflow as tf
import os

def calculate_anomaly_scores(model, data):
    reconstructions = model.predict(data)
    # Mean Absolute Error per sequence
    mse = np.mean(np.abs(reconstructions - data), axis=(1, 2))
    return mse

def find_threshold(model, train_data, percentile=99):
    mse = calculate_anomaly_scores(model, train_data)
    threshold = np.percentile(mse, percentile)
    return threshold

if __name__ == "__main__":
    X_train = np.load('data/X_train.npy')
    X_test = np.load('data/X_test.npy')
    
    model = tf.keras.models.load_model('models/lstm_autoencoder.keras')
    
    threshold = find_threshold(model, X_train)
    print(f"Calculated threshold: {threshold}")
    
    test_mse = calculate_anomaly_scores(model, X_test)
    anomalies = test_mse > threshold
    print(f"Number of anomalies detected in test set: {np.sum(anomalies)} out of {len(X_test)}")
    
    np.save('models/threshold.npy', np.array([threshold]))


[1m409/409[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 6ms/step
Calculated threshold: 0.05056601769378456
[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step
Number of anomalies detected in test set: 661 out of 3396


## 5. Evaluation
Evaluating Precision, Recall, and F1 Score on the test set.

In [8]:
import numpy as np
import pandas as pd
from sklearn.metrics import precision_score, recall_score, f1_score
import tensorflow as tf

def evaluate():
    X_test = np.load('data/X_test.npy')
    threshold = np.load('models/threshold.npy')[0]
    model = tf.keras.models.load_model('models/lstm_autoencoder.keras')
    
    reconstructions = model.predict(X_test)
    mse = np.mean(np.abs(reconstructions - X_test), axis=(1, 2))
    predictions = mse > threshold
    
    # Define ground truth: For synthetic data, let's say the last 20% of cycles for each unit are anomalies
    # We need to map sequences back to their units to do this properly, 
    # but for a simple evaluation, we can estimate.
    
    # Load original test data to get unit info
    columns = ['unit_id', 'cycle', 'setting1', 'setting2', 'setting3', 
               's1', 's2', 's3', 's4', 's5', 's6']
    test_df = pd.read_csv('data/cmapss/test_FD001.txt', sep=' ', header=None, names=columns)
    
    seq_length = 10
    gt_anomalies = []
    
    for unit_id in test_df['unit_id'].unique():
        unit_data = test_df[test_df['unit_id'] == unit_id]
        lifespan = len(unit_data)
        # Mark last 15% as anomalous
        anomaly_start = int(lifespan * 0.85)
        
        unit_gt = [1 if i >= anomaly_start else 0 for i in range(lifespan)]
        # sequences start from 0 to lifespan-seq_length
        if lifespan >= seq_length:
            for i in range(lifespan - seq_length + 1):
                # sequence is anomalous if its last point is in the anomaly zone
                gt_anomalies.append(unit_gt[i + seq_length - 1])
                
    gt_anomalies = np.array(gt_anomalies)
    
    # Align lengths if necessary (should match if logic is correct)
    min_len = min(len(predictions), len(gt_anomalies))
    predictions = predictions[:min_len]
    gt_anomalies = gt_anomalies[:min_len]
    
    precision = precision_score(gt_anomalies, predictions)
    recall = recall_score(gt_anomalies, predictions)
    f1 = f1_score(gt_anomalies, predictions)
    
    print(f"\nPrecision Value: {precision:.4f}")
    print(f"Recall Value: {recall:.4f}")
    print(f"F1 Score Value: {f1:.4f}")

if __name__ == "__main__":
    evaluate()


[1m107/107[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 13ms/step

Precision Value: 0.8260
Recall Value: 0.9964
F1 Score Value: 0.9032
