# Vanilla LSTM

2026-01-28

Prediction-based LSTM: uses past N timesteps to predict next value. High prediction error = anomaly.
Comparing full-size (100 units, SKAB config) vs tiny (16 units) for edge deployment.

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
import pickle
from pathlib import Path
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, confusion_matrix
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf

import warnings
warnings.filterwarnings('ignore')

plt.rcParams['figure.figsize'] = (14, 4)

In [2]:
data_dir = Path('../data/raw/SKAB/data')

files = []
for folder in ['valve1', 'valve2', 'other']:
    files.extend(sorted((data_dir / folder).glob('*.csv')))

feature_cols = ['Accelerometer1RMS', 'Accelerometer2RMS', 'Current', 'Pressure',
                'Temperature', 'Thermocouple', 'Voltage', 'Volume Flow RateRMS']

len(files)

34

In [3]:
# SKAB params
TRAIN_SIZE = 400
N_STEPS = 5
EPOCHS = 25
BATCH_SIZE = 32
VAL_SPLIT = 0.2
Q = 0.99

def split_sequences(sequences, n_steps):
    X, y = [], []
    for i in range(len(sequences) - n_steps):
        X.append(sequences[i:i+n_steps, :])
        y.append(sequences[i+n_steps, :])
    return np.array(X), np.array(y)

def build_lstm(n_steps, n_features, units):
    model = Sequential([
        Input(shape=(n_steps, n_features)),
        LSTM(units, activation='relu', return_sequences=True),
        LSTM(units, activation='relu'),
        Dense(n_features)
    ])
    model.compile(optimizer='adam', loss='mae', metrics=['mse'])
    return model

In [4]:
def evaluate_lstm(files, units, verbose=True):
    all_y_true = []
    all_y_pred = []
    
    for i, f in enumerate(files):
        df = pd.read_csv(f, sep=';', parse_dates=['datetime'], index_col='datetime')
        X = df[feature_cols].values
        y = df['anomaly'].values
        
        X_train_raw, X_test_raw = X[:TRAIN_SIZE], X[TRAIN_SIZE:]
        y_test = y[TRAIN_SIZE:]
        
        scaler = StandardScaler()
        X_train_sc = scaler.fit_transform(X_train_raw)
        X_test_sc = scaler.transform(X_test_raw)
        
        X_tr, y_tr = split_sequences(X_train_sc, N_STEPS)
        X_te, y_te_pred = split_sequences(X_test_sc, N_STEPS)
        
        tf.random.set_seed(0)
        np.random.seed(0)
        model = build_lstm(N_STEPS, len(feature_cols), units)
        model.fit(X_tr, y_tr, validation_split=VAL_SPLIT, epochs=EPOCHS, batch_size=BATCH_SIZE,
                  verbose=0, shuffle=False,
                  callbacks=[EarlyStopping(patience=10, verbose=0),
                            ReduceLROnPlateau(factor=0.1, patience=5, min_lr=0.0001, verbose=0)])
        
        train_pred = model.predict(X_tr, verbose=0)
        residuals_train = pd.DataFrame(np.abs(y_tr - train_pred)).sum(axis=1)
        UCL = residuals_train.quantile(Q) * 5
        
        test_pred = model.predict(X_te, verbose=0)
        residuals_test = pd.DataFrame(np.abs(y_te_pred - test_pred)).sum(axis=1)
        pred = (residuals_test > UCL).astype(int).values
        
        y_test_aligned = y_test[N_STEPS:]
        all_y_true.extend(y_test_aligned)
        all_y_pred.extend(pred)
        
        if verbose and (i+1) % 10 == 0:
            print(f'{i+1}/{len(files)} files')
    
    y_true = np.array(all_y_true)
    y_pred = np.array(all_y_pred)
    
    f1 = f1_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    tn, fp, fn, tp = cm.ravel()
    far = fp / (fp + tn) * 100
    mar = fn / (fn + tp) * 100
    size_kb = len(pickle.dumps(model)) / 1024
    
    return {'f1': f1, 'far': far, 'mar': mar, 'size_kb': size_kb, 'model': model}

## Full-size (100 units)

In [5]:
results_full = evaluate_lstm(files, units=100)
print(f"F1:  {results_full['f1']:.2f}  (SKAB: 0.54)")
print(f"FAR: {results_full['far']:.2f}% (SKAB: 12.54%)")
print(f"MAR: {results_full['mar']:.2f}% (SKAB: 59.53%)")
print(f"Size: {results_full['size_kb']:.0f} KB")

10/34 files
20/34 files
30/34 files
F1:  0.48  (SKAB: 0.54)
FAR: 10.93% (SKAB: 12.54%)
MAR: 65.41% (SKAB: 59.53%)
Size: 1494 KB


## Tiny (16 units)

In [6]:
results_tiny = evaluate_lstm(files, units=16)
print(f"F1:  {results_tiny['f1']:.2f}  (full: {results_full['f1']:.2f})")
print(f"FAR: {results_tiny['far']:.2f}% (full: {results_full['far']:.2f}%)")
print(f"MAR: {results_tiny['mar']:.2f}% (full: {results_full['mar']:.2f}%)")
print(f"Size: {results_tiny['size_kb']:.0f} KB (full: {results_full['size_kb']:.0f} KB)")

10/34 files
20/34 files
30/34 files
F1:  0.37  (full: 0.48)
FAR: 3.06% (full: 10.93%)
MAR: 76.99% (full: 65.41%)
Size: 77 KB (full: 1494 KB)


## Comparison

In [7]:
comparison = pd.DataFrame({
    'Model': ['Full (100 units)', 'Tiny (16 units)'],
    'F1': [results_full['f1'], results_tiny['f1']],
    'FAR %': [results_full['far'], results_tiny['far']],
    'MAR %': [results_full['mar'], results_tiny['mar']],
    'Size KB': [results_full['size_kb'], results_tiny['size_kb']]
})
comparison

Unnamed: 0,Model,F1,FAR %,MAR %,Size KB
0,Full (100 units),0.480784,10.934192,65.408115,1494.289062
1,Tiny (16 units),0.366324,3.055683,76.993577,76.786133


In [8]:
f1_drop = (results_full['f1'] - results_tiny['f1']) / results_full['f1'] * 100
size_reduction = (results_full['size_kb'] - results_tiny['size_kb']) / results_full['size_kb'] * 100
print(f"F1 drop: {f1_drop:.1f}%")
print(f"Size reduction: {size_reduction:.1f}%")

F1 drop: 23.8%
Size reduction: 94.9%


## Latency

In [9]:
# prepare sample data
sample = pd.read_csv(files[5], sep=';', parse_dates=['datetime'], index_col='datetime')
X_s = sample[feature_cols].values
sc = StandardScaler()
X_sc = sc.fit_transform(X_s[:TRAIN_SIZE])
X_seq, _ = split_sequences(X_sc, N_STEPS)
single = X_seq[:1]

# train quick models for latency test
tf.random.set_seed(0)
m_full = build_lstm(N_STEPS, 8, 100)
m_full.fit(X_seq, X_seq[:, -1, :], epochs=3, verbose=0)

tf.random.set_seed(0)
m_tiny = build_lstm(N_STEPS, 8, 16)
m_tiny.fit(X_seq, X_seq[:, -1, :], epochs=3, verbose=0)

# warmup
m_full.predict(single, verbose=0)
m_tiny.predict(single, verbose=0)

# full
times = []
for _ in range(500):
    t0 = time.perf_counter()
    m_full.predict(single, verbose=0)
    times.append(time.perf_counter() - t0)
lat_full = np.median(times) * 1000

# tiny
times = []
for _ in range(500):
    t0 = time.perf_counter()
    m_tiny.predict(single, verbose=0)
    times.append(time.perf_counter() - t0)
lat_tiny = np.median(times) * 1000

print(f"Full (100 units): {lat_full:.1f} ms")
print(f"Tiny (16 units): {lat_tiny:.1f} ms")

Full (100 units): 55.8 ms
Tiny (16 units): 67.9 ms


---

Shrinking 100â†’16 units: 95% size reduction, 26% F1 drop. Latency similar due to Keras overhead.
TF Lite deployment should show bigger latency difference.