# CNN-LSTM Model for Anomaly Detection in Time Series Data

This notebook implements a hybrid CNN-LSTM model for detecting anomalies in time series data. The model combines convolutional layers to capture local patterns with LSTM layers to capture temporal dependencies.

The architecture is designed specifically for multi-sensor time series data from discharge experiments, where anomalies need to be detected as early as possible.

In [28]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Dense, LSTM, GlobalMaxPooling1D, BatchNormalization, Input, Conv1D,
    Activation, concatenate, MaxPooling1D, SpatialDropout1D, Attention, Bidirectional
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.regularizers import l2

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np
import re
import json
import pickle
from enum import Enum
from typing import Any, Optional

## Define Data Classes and Helper Functions

The cell below defines the Signal, Discharge, SignalType, and DisruptionClass classes, adapted from the original implementation, along with helper functions for normalization, padding, and building the model inputs.
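The normalization helpers scale every signal to [0, 1] using the minimum and maximum across *all* signals of the same type, so amplitudes remain comparable between discharges. A minimal standalone sketch of that rule, using `(type, values)` pairs and a hypothetical `min_max_by_type` helper instead of the `Signal` class; the guard for constant signals is an assumption added here:

```python
from typing import Dict, List, Tuple

def min_max_by_type(signals: List[Tuple[str, List[float]]]) -> List[List[float]]:
    """Scale each (signal_type, values) pair to [0, 1] using the min/max of its type.

    Hypothetical helper mirroring normalize_vec: the bounds are pooled over all
    signals that share a type, not computed per signal.
    """
    lo: Dict[str, float] = {}
    hi: Dict[str, float] = {}
    for sig_type, values in signals:
        lo[sig_type] = min(lo.get(sig_type, min(values)), min(values))
        hi[sig_type] = max(hi.get(sig_type, max(values)), max(values))

    scaled = []
    for sig_type, values in signals:
        span = hi[sig_type] - lo[sig_type]
        # Assumed guard: a constant type has no range, so map it to zeros
        scaled.append([(v - lo[sig_type]) / span if span else 0.0 for v in values])
    return scaled

signals = [("Densidad", [2.0, 4.0]), ("Densidad", [6.0, 10.0]), ("ModeLock", [1.0, 1.0])]
print(min_max_by_type(signals))
# [[0.0, 0.25], [0.5, 1.0], [0.0, 0.0]]
```

Both `Densidad` signals are scaled against the pooled range [2, 10], which is why the first one never reaches 1.0.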

In [29]:
class SignalType(Enum):
    """Enum for signal types. Values match the sensor ID in the signal file name."""
    # No trailing commas: `CorrientePlasma = 1,` would make the value the tuple (1,)
    CorrientePlasma = 1
    ModeLock = 2
    Inductancia = 3
    Densidad = 4
    DerivadaEnergiaDiamagnetica = 5
    PotenciaRadiada = 6
    PotenciaDeEntrada = 7

def get_signal_type(signal_type: int) -> Optional[SignalType]:
    """
    Get the signal type from an integer.
    :param signal_type: The signal type as an integer.
    :return: The matching SignalType, or None if the integer is not a known type.
    """
    return SignalType(signal_type) if signal_type in [s.value for s in SignalType] else None

class DisruptionClass(Enum):
    """Enum for disruption classes. The value is used as the training label."""
    Normal = 0
    Anomaly = 1
    Unknown = 2

class Signal:
    """Class representing a signal."""

    def __init__(self, label: str, times: list[float], values: list[float], signal_type: SignalType, disruption_class=DisruptionClass.Unknown):
        """
        Initialize a Signal object.
        :param label: The label of the signal. It is the file name
        :param times: The time values of the signal.
        :param values: The values of the signal.
        :param signal_type: The type of the signal.
        :param disruption_class: The class of the disruption.
        """
        self.label = label
        self.times = times
        self.values = values
        self.disruption_class = disruption_class
        self.signal_type = signal_type
        self.min = min(values) 
        self.max = max(values)

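    def normalize(self, min_of_its_type: Optional[float] = None, max_of_its_type: Optional[float] = None):
        """
        Normalize the signal values to the range [0, 1] in place.
        If min/max values for the signal type are given, normalize against them;
        otherwise use this signal's own min and max.
        :param min_of_its_type: The minimum value across signals of this type.
        :param max_of_its_type: The maximum value across signals of this type.
        """
        min_val = min_of_its_type if min_of_its_type is not None else self.min
        max_val = max_of_its_type if max_of_its_type is not None else self.max
        value_range = max_val - min_val

        # A constant signal has no range; map it to zeros instead of dividing by zero
        if value_range == 0:
            self.values = [0.0 for _ in self.values]
            return

        self.values = [(value - min_val) / value_range for value in self.values]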

class Discharge:
    """Class representing a discharge."""

    def __init__(self, signals: list[Signal], disruption_class=DisruptionClass.Unknown):
        self.signals = signals
        self.disruption_class = disruption_class
        self.is_padded = False
        self.is_normalized = False

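    def generate_similar_discharges(self, n: int):
        """
        Create n augmented copies of this discharge by adding Gaussian noise
        (mean 0, standard deviation 0.1) to every signal value. The noise is in
        whatever units the values currently have (raw units when called from
        preprocess_data, which augments before normalizing).
        :param n: The number of copies to generate.
        :return: A list of new Discharge objects with the same disruption class.
        """
        similar_discharges = []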
        for _ in range(n):
            new_signals = []
            for signal in self.signals:
                new_signal = Signal(signal.label, signal.times, signal.values.copy(), signal.signal_type, signal.disruption_class)
                new_signal.values = np.random.normal(new_signal.values, 0.1).tolist()
                new_signals.append(new_signal)

            similar_discharges.append(Discharge(new_signals, self.disruption_class))
        return similar_discharges
    
    def generate_windows(self, window_size: int, step: int = 1, overlap: float = 0.5):
        """
        Split the discharge into windows cut at the same positions in every signal.
        :param window_size: The number of samples in each window.
        :param step: The spacing between consecutive samples within a window (1 = every sample).
        :param overlap: The fraction of a window's span shared with the next window, in [0, 1).
        :return: A list of Discharge objects, one per window position, each holding one window of every signal.
        """
        windowed_discharges = []
    
        # Number of original samples covered by window_size elements taken every `step` samples
        total_span = (window_size - 1) * step + 1
        
        # Advance by the span minus the overlapping part; always move forward at least one sample
        backtrack = int(total_span * overlap)
        stride = max(1, total_span - backtrack)
        
        # Every signal has the same length, so we can use the first one to calculate the max position
        min_length = len(self.signals[0].values)
        max_pos = min_length - total_span
        
        # Generate windows for all signals at the same positions
        pos = 0
        while pos <= max_pos:
            window_signals = []
            
            # Create a window for each signal type at the same position
            for signal in self.signals:
                window_times = []
                window_values = []
                
                for i in range(window_size):
                    idx = pos + i * step
                    window_times.append(signal.times[idx])
                    window_values.append(signal.values[idx])
                
                # Create window signal of the same type
                window = Signal(
                    signal.label,
                    window_times,
                    window_values,
                    signal.signal_type,
                    signal.disruption_class
                )
                window_signals.append(window)
            
            # Create a discharge containing all signal types
            windowed_discharges.append(Discharge(window_signals, self.disruption_class))
            
            # Move to next window start position with overlap
            pos += stride
        
        return windowed_discharges
    
    def shape(self) -> tuple[int, int]:
        """
        Get the shape of the discharge.
        :return: The shape of the discharge as a tuple (number of signals, number of values per signal).
        """
        return (len(self.signals), len(self.signals[0].values)) if self.signals else (0, 0)

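def normalize_vec(list_values: list[Signal]) -> list[Signal]:
    """
    Normalize a list of signals in place to [0, 1], per signal type.
    The min and max of each type are taken across all signals in the list.
    :param list_values: The list of signals to normalize.
    :return: The same signals, normalized.
    """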
    signals_normalized = []
    min_by_type = {}
    max_by_type = {}

    for signal in list_values:
        if signal.signal_type not in min_by_type:
            min_by_type[signal.signal_type] = signal.min
            max_by_type[signal.signal_type] = signal.max
        else:
            min_by_type[signal.signal_type] = min(min_by_type[signal.signal_type], signal.min)
            max_by_type[signal.signal_type] = max(max_by_type[signal.signal_type], signal.max)

    for signal in list_values:
        signal.normalize(min_by_type[signal.signal_type], max_by_type[signal.signal_type])
        signals_normalized.append(signal)

    return signals_normalized

def normalize(discharges: list[Discharge]) -> list[Discharge]:
    """
    Normalize the signals in a list of discharges per signal type, using the
    min and max across all discharges in the list.
    :param discharges: The list of discharges to normalize.
    :return: The normalized discharges.
    """
    all_signals = []
    for discharge in discharges:
        all_signals += discharge.signals

    # normalize_vec mutates the Signal objects, which are the same objects the
    # discharges hold, so no values need to be copied back
    normalize_vec(all_signals)

    for discharge in discharges:
        discharge.is_normalized = True

    return discharges

def are_normalized(discharges: list[Discharge]) -> bool:
    return all([discharge.is_normalized for discharge in discharges])

def pad(discharges: list[Discharge]) -> list[Discharge]:
    """
    Pad the signals with zeros in a list of discharges to the same length.
    :param discharges: The list of discharges to pad.
    :return: The padded discharges.
    """
    max_length = max([len(signal.values) for discharge in discharges for signal in discharge.signals])
    for discharge in discharges:
        for signal in discharge.signals:
            if len(signal.values) < max_length:
                signal.values += [0] * (max_length - len(signal.values))
        discharge.is_padded = True
    return discharges

def are_padded(discharges: list[Discharge]) -> bool:
    """
    Check if the signals in a list of discharges are padded.
    :param discharges: The list of discharges to check.
    :return: True if the signals are padded, False otherwise.
    """
    return all([discharge.is_padded for discharge in discharges])

def get_X_y(discharges: list[Discharge]) -> tuple[list[list[list[float]]], list[int]]:
    """
    Get the X and y values from a list of discharges, normalizing them first if needed.
    They are parallel lists: X[i] holds the value lists of every signal in discharge i,
    and y[i] is that discharge's disruption class value.
    :param discharges: The list of discharges to get the X and y values from.
    :return: The X and y values.
    """
    if not are_normalized(discharges):
        discharges = normalize(discharges)
    
    X = [[signal.values for signal in discharge.signals] for discharge in discharges]
    y = [discharge.disruption_class.value for discharge in discharges]

    return X, y

# Pattern for signal file names; the second number is the sensor ID
PATTERN = r"DES_(\d+)_(\d+)"

def get_sensor_id(filename: str) -> str:
    """Extract sensor ID from signal file name"""
    match = re.match(PATTERN, filename)
    if match:
        return match.group(2)
    else:
        raise ValueError(f"Invalid signal file name format: {filename}")

## Data Loading from JSON

This cell loads discharge data from a JSON file instead of receiving it via HTTP requests.
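From the fields `load_data_from_json` reads, the input file is expected to look roughly like the payload below. The file names and numbers are made up, and `sensor_id` and `label` are hypothetical helpers that restate the loader's two rules: the sensor ID is the second number in `fileName`, and a discharge is an anomaly when `anomalyTime` is truthy.

```python
import json
import re

# Illustrative payload: the keys mirror what load_data_from_json reads,
# the file names and values are invented
payload = json.loads("""
{
  "discharges": [
    {"anomalyTime": 0.42,
     "signals": [{"fileName": "DES_12345_1", "times": [0.0, 0.1], "values": [1.0, 2.0]}]},
    {"times": [0.0, 0.1],
     "signals": [{"fileName": "DES_12346_4", "values": [3.0, 4.0]}]}
  ]
}
""")

PATTERN = r"DES_(\d+)_(\d+)"

def sensor_id(file_name: str) -> int:
    """Second number in the file name, as extracted by get_sensor_id."""
    match = re.match(PATTERN, file_name)
    if match is None:
        raise ValueError(f"Invalid signal file name format: {file_name}")
    return int(match.group(2))

def label(discharge: dict) -> str:
    """Same rule as the loader: a truthy anomalyTime marks an anomaly."""
    return "Anomaly" if discharge.get("anomalyTime") else "Normal"

for d in payload["discharges"]:
    print(label(d), [sensor_id(s["fileName"]) for s in d["signals"]])
# Anomaly [1]
# Normal [4]
```

Because the rule tests truthiness, an `anomalyTime` of `0` or `0.0` would be labeled Normal; if an anomaly at time zero is possible in your data, checking `discharge.get('anomalyTime') is not None` would be safer.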

In [None]:
def load_data_from_json(json_path):
    """
    Load discharge data from a JSON file.
    Expects {"discharges": [{"signals": [{"fileName", "values", optional "times"}],
    optional "times", optional "anomalyTime"}]}.
    """
    with open(json_path, 'r') as f:
        data = json.load(f)
    
    discharges = []
    
    for discharge_data in data['discharges']:
        # A discharge with an anomalyTime is labeled as an anomaly
        disruption_class = DisruptionClass.Anomaly if discharge_data.get('anomalyTime') else DisruptionClass.Normal
        signals = []
        
        for signal_data in discharge_data['signals']:
            values = signal_data['values']
            # Signal-level times take precedence over discharge-level times.
            # Assumption: fall back to sample indices so windowing never indexes an empty list
            times = signal_data.get('times', discharge_data.get('times')) or list(range(len(values)))
            signals.append(
                Signal(
                    label=signal_data['fileName'],
                    times=times,
                    values=values,
                    signal_type=get_signal_type(int(get_sensor_id(signal_data['fileName']))),
                    disruption_class=disruption_class
                )
            )
            
        discharges.append(Discharge(signals=signals, disruption_class=disruption_class))
    
    return discharges


## Data Preprocessing

Preprocessing applies optional augmentation, windowing, per-type min-max normalization, and a stratified train/validation split, and returns arrays shaped `(windows, time_steps, features)` as the CNN-LSTM model expects.
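`generate_windows` covers `(window_size - 1) * step + 1` samples per window and advances by that span minus `int(span * overlap)`. A small sketch reproducing that arithmetic (the `window_starts` name is hypothetical) makes it easy to check how many windows a discharge yields before running the full pipeline:

```python
def window_starts(length: int, window_size: int, step: int = 1, overlap: float = 0.5) -> list[int]:
    """Start indices produced by the same arithmetic as Discharge.generate_windows."""
    total_span = (window_size - 1) * step + 1
    stride = total_span - int(total_span * overlap)
    if stride <= 0:
        raise ValueError("overlap must be < 1 so that windows advance")
    # Last valid start is length - total_span; shorter signals yield no windows
    return list(range(0, length - total_span + 1, stride))

# A 2000-sample discharge with the defaults window_size=500, step=1, overlap=0.5
starts = window_starts(2000, 500)
print(len(starts), starts)
# 7 [0, 250, 500, 750, 1000, 1250, 1500]
```

With `step=2` the same 500-element window spans 999 samples, so the discharge yields only 3 windows; a signal shorter than one span yields none.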

In [26]:
def preprocess_data(discharges, window_size=500, step=1, overlap=0.5, augment=False, augment_factor=None):
    """
    Preprocess discharge data for model training:
    1. Optional data augmentation (noisy copies, only for fewer than 10 discharges)
    2. Windowing
    3. Per-type min-max normalization
    4. Stratified train/validation split (80/20)
    
    Returns X_train, X_val, y_train, y_val, with X shaped (windows, window_size, signals).
    """
    if augment and augment_factor is None:
        raise ValueError("augment_factor must be set when augment=True")

    # Work on a copy so the caller's list is not extended in place
    discharges = list(discharges)

    # Data augmentation (optional); iterate over a snapshot of the originals
    if augment and len(discharges) < 10:
        for d in list(discharges):
            discharges.extend(d.generate_similar_discharges(augment_factor))
    
    # Generate windowed data
    windowed_discharges = []
    for discharge in discharges:
        windowed_discharges.extend(
            discharge.generate_windows(
                window_size=window_size, 
                step=step, 
                overlap=overlap
            )
        )
    
    discharges = windowed_discharges
    print(f"Number of windows: {len(discharges)}")
    
    # Convert to model input format (normalizes if needed)
    X, y = get_X_y(discharges)
    
    # Each entry of X is (signals, time_steps); transpose to (time_steps, signals)
    X = np.array([np.array(signals).T for signals in X], dtype=np.float32)
    y = np.array(y)
    
    print(f"X shape: {X.shape}, y shape: {y.shape}")
    classes, counts = np.unique(y, return_counts=True)
    print(f"Class counts: {dict(zip(classes.tolist(), counts.tolist()))}")
    
    # Stratified split keeps the class ratio in both sets
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, shuffle=True, stratify=y
    )
    
    return X_train, X_val, y_train, y_val

## Model Architecture

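Define the CNN-LSTM architecture: parallel convolutions with different kernel sizes, a bidirectional LSTM followed by dot-product self-attention (the Keras `Attention` layer with the LSTM output as both query and value), and a sigmoid output layer.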
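A quick way to sanity-check the architecture is to count its weights by hand. The sketch below applies the standard formulas (Conv1D: `kernel * c_in * filters + filters`; LSTM: `4 * ((c_in + units) * units + units)`; BatchNormalization: 4 values per channel, of which the moving mean and variance are not trained) to the layer sizes in `create_model`. Pooling, dropout, and a default `Attention()` layer add no weights. Treat the result as a cross-check against `model.summary()`, not an authoritative figure.

```python
def conv1d_params(kernel: int, c_in: int, filters: int) -> int:
    # Kernel weights plus one bias per filter
    return kernel * c_in * filters + filters

def lstm_params(c_in: int, units: int) -> int:
    # 4 gates, each with input weights, recurrent weights and a bias
    return 4 * ((c_in + units) * units + units)

n_features = 7                       # one input channel per SignalType
conv_channels = 3 * 64               # three parallel Conv1D branches, concatenated
conv = sum(conv1d_params(k, n_features, 64) for k in (3, 5, 7))
batch_norm = 4 * conv_channels       # gamma, beta, moving mean, moving variance
bilstm = 2 * lstm_params(conv_channels, 32)   # forward + backward LSTM
merged = conv_channels + 2 * 32      # pooled CNN features + pooled attention output
dense = merged + 1                   # single sigmoid unit: weights + bias

total = conv + batch_norm + bilstm + dense
trainable = total - 2 * conv_channels   # moving statistics are not trained
print(conv, batch_norm, bilstm, dense, total, trainable)
# 6912 768 57600 257 65537 65153
```

The bidirectional LSTM holds almost 90% of the weights, so its `units` value is the main lever on model size.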

In [31]:
def create_model():
    """
    Create a CNN-LSTM model for anomaly detection
    
    Architecture overview:
    1. Input layer accepting time series of any length with 7 signal types as channels
    2. Parallel convolutional layers with kernel sizes 3, 5, and 7 to capture patterns at different scales
    3. Batch normalization, activation, spatial dropout, and max pooling (halves the time axis)
    4. Bidirectional LSTM whose output sequence is fed to dot-product self-attention
    5. Concatenation of globally max-pooled CNN features and attention features
    6. Dense output layer with sigmoid activation for binary classification
    
    Returns compiled model ready for training
    """
    # Input layer: (time_steps, 7), one channel per SignalType; time_steps is variable
    inputs = Input(shape=(None, 7), name="input_layer")
    
    # Parallel convolutional layers with different kernel sizes
    c3 = Conv1D(64, kernel_size=3, padding='same', activation='relu')(inputs)
    c5 = Conv1D(64, kernel_size=5, padding='same', activation='relu')(inputs)
    c7 = Conv1D(64, kernel_size=7, padding='same', activation='relu')(inputs)
    cnn = concatenate([c3, c5, c7], axis=-1, name="concat_conv")  # 192 channels
    
    # Batch normalization, activation, dropout and pooling.
    # The convolutions already apply ReLU; this second ReLU clips values that
    # batch normalization shifted below zero.
    cnn = BatchNormalization(name="bn1")(cnn)
    cnn = Activation('relu', name="act1")(cnn)
    cnn = SpatialDropout1D(0.3, name="spatial_dropout1")(cnn)
    cnn = MaxPooling1D(pool_size=2, name="maxpool1")(cnn)
    
    # Global features from CNN branch
    cnn_branch = GlobalMaxPooling1D(name="global_max_pooling")(cnn)
    
    # Bidirectional LSTM layer (32 units per direction, 64 features per time step).
    # recurrent_dropout > 0 prevents TensorFlow from using the fused cuDNN kernel on GPU.
    lstm = Bidirectional(
        LSTM(
            units=32,
            dropout=0.2,
            recurrent_dropout=0.2,
            kernel_regularizer=l2(1e-4),
            return_sequences=True,
            name="lstm_layer"
        ), name="lstm_bidir"
    )(cnn)
    
    # Attention mechanism
    context = Attention()([lstm, lstm])
    attn_out = GlobalMaxPooling1D()(context)
    
    # Merge CNN and attention features
    merged = concatenate([cnn_branch, attn_out], name="concat")
    
    # Output layer
    outputs = Dense(1, activation='sigmoid', name="output_layer")(merged)
    
    # Create and compile model
    model = Model(inputs, outputs, name="CNN_LSTM_Model")
    model.compile(
        optimizer=Adam(learning_rate=0.001, clipnorm=1.0),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# Create the model
model = create_model()
model.summary()

## Training Configuration

Set up training parameters and callbacks: early stopping on validation loss, learning-rate reduction when validation loss plateaus, and checkpointing of the best model.
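Both callbacks watch `val_loss`, with a learning-rate patience of 5 epochs and an early-stopping patience of 10. To see how they interact, the sketch below re-implements their counting logic in simplified form (EarlyStopping with `min_delta=0`; ReduceLROnPlateau with its default `min_delta=1e-4`, `cooldown=0`, `min_lr=0`). `simulate_callbacks` is a hypothetical helper, not part of Keras.

```python
def simulate_callbacks(val_losses, lr=1e-3, es_patience=10, lr_patience=5,
                       factor=0.5, lr_min_delta=1e-4):
    """Return (last epoch index run, learning rate used in each epoch).

    Simplified model of EarlyStopping and ReduceLROnPlateau monitoring the
    same validation loss.
    """
    best_es = best_lr = float("inf")
    es_wait = lr_wait = 0
    lrs = []
    for epoch, loss in enumerate(val_losses):
        lrs.append(lr)
        # ReduceLROnPlateau: an improvement must beat the best by more than lr_min_delta
        if loss < best_lr - lr_min_delta:
            best_lr, lr_wait = loss, 0
        else:
            lr_wait += 1
            if lr_wait >= lr_patience:
                lr *= factor
                lr_wait = 0
        # EarlyStopping: any strict improvement resets the counter
        if loss < best_es:
            best_es, es_wait = loss, 0
        else:
            es_wait += 1
            if es_wait >= es_patience:
                return epoch, lrs
    return len(val_losses) - 1, lrs

# Validation loss improves for three epochs, then stays flat
stop_epoch, lrs = simulate_callbacks([0.9, 0.7, 0.6] + [0.6] * 20)
print(stop_epoch, lrs.count(1e-3), len(lrs) - lrs.count(1e-3))
# 12 8 5
```

On a flat plateau the model therefore gets five epochs at half the learning rate before early stopping fires, after which `restore_best_weights=True` rolls back to the weights from epoch index 2.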

In [32]:
def configure_training(model_path="cnn_lstm_model.keras"):
    """
    Configure training parameters and callbacks
    
    Returns a dictionary with training configuration
    """
    # Callbacks for training
    callbacks = [
        # Stop training when validation loss stops improving
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        
        # Reduce learning rate when validation loss plateaus
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            verbose=1
        ),
        
        # Save the best model
        ModelCheckpoint(
            model_path,
            monitor='val_loss',
            save_best_only=True,
            verbose=1
        )
    ]
    
    # Training parameters
    training_config = {
        'epochs': 100,
        'batch_size': 2,
        'shuffle': True,
        'callbacks': callbacks,
        'model_path': model_path
    }
    
    return training_config

# Get training configuration
training_config = configure_training()
print("Training configuration:")
for k, v in training_config.items():
    if k != 'callbacks':
        print(f"  {k}: {v}")
print("  callbacks: EarlyStopping, ReduceLROnPlateau, ModelCheckpoint")

Training configuration:
  epochs: 100
  batch_size: 2
  shuffle: True
  model_path: cnn_lstm_model.keras
  callbacks: EarlyStopping, ReduceLROnPlateau, ModelCheckpoint


## Model Training

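Train the model on the preprocessed data with `model.fit`, using `validation_data=(X_val, y_val)` and the settings from `training_config`, then plot the loss and accuracy curves stored in the returned `history`.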
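The training cell itself is not part of this export, although the evaluation cell relies on `history`, `X_val`, and `y_val`. One detail matters when writing it: `training_config` cannot be unpacked directly into `model.fit`, because it also holds `model_path`, which `fit` does not accept. A minimal sketch with a hypothetical `fit_kwargs` helper; the data path in the commented usage is a placeholder:

```python
def fit_kwargs(training_config: dict) -> dict:
    """Keep only the entries of training_config that model.fit accepts.

    'model_path' is consumed by ModelCheckpoint, not by fit(), so it is dropped.
    """
    fit_keys = {"epochs", "batch_size", "shuffle", "callbacks"}
    return {k: v for k, v in training_config.items() if k in fit_keys}

# Intended use in the notebook (hypothetical path):
# discharges = load_data_from_json("path/to/discharges.json")
# X_train, X_val, y_train, y_val = preprocess_data(discharges)
# history = model.fit(X_train, y_train, validation_data=(X_val, y_val),
#                     **fit_kwargs(training_config))

example = {"epochs": 100, "batch_size": 2, "shuffle": True, "callbacks": [],
           "model_path": "cnn_lstm_model.keras"}
print(fit_kwargs(example))
# {'epochs': 100, 'batch_size': 2, 'shuffle': True, 'callbacks': []}
```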

## Visualization and Evaluation

Evaluate the model performance on validation data and create visualizations to understand the model's behavior.
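Since missed anomalies are the costly error here, recall for the Anomaly class is worth reading alongside accuracy. A short sketch deriving the main metrics from a 2x2 confusion matrix in scikit-learn's layout (rows are true labels, columns are predictions, order Normal then Anomaly); `binary_metrics` is a hypothetical helper and the counts are invented:

```python
def binary_metrics(cm):
    """Accuracy, plus precision/recall/F1 for the Anomaly class, from [[tn, fp], [fn, tp]]."""
    (tn, fp), (fn, tp) = cm
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    accuracy = (tp + tn) / (tn + fp + fn + tp)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

# Invented counts: 40 normal windows and 10 anomalous ones
metrics = binary_metrics([[38, 2], [4, 6]])
print(metrics)
```

Here accuracy is 0.88 while recall is only 0.6: four of the ten anomalous windows go undetected, which the accuracy figure alone hides.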

In [None]:
# Plot training history
plt.figure(figsize=(14, 5))

# Plot training & validation loss
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# Plot training & validation accuracy
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

# Evaluate model on validation data
val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=1)
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_accuracy:.4f}")

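# Make predictions; flatten the (n, 1) sigmoid output to a 1-D vector
y_pred_proba = model.predict(X_val).ravel()
y_pred = (y_pred_proba > 0.5).astype("int32")

# Confusion matrix (rows: true label, columns: predicted label)
from sklearn.metrics import confusion_matrix, classification_report

# Fix the label order so the matrix stays 2x2 even if a class is missing from y_val
cm = confusion_matrix(y_val, y_pred, labels=[0, 1])
print("Confusion Matrix:")
print(cm)

# Classification report
print("\nClassification Report:")
print(classification_report(y_val, y_pred, labels=[0, 1], target_names=['Normal', 'Anomaly'], zero_division=0))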

# Plot confusion matrix
plt.figure(figsize=(8, 6))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
classes = ['Normal', 'Anomaly']
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes)
plt.yticks(tick_marks, classes)

# Add text annotations to confusion matrix
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, format(cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
plt.show()

## Save Model

Save the trained model in the native Keras format (`.keras`) for later use.

In [8]:
model_path = training_config['model_path']
model.save(model_path)
print(f"Model saved to {model_path}")

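# Function to load the model later
def load_model(model_path='cnn_lstm_model.keras'):
    """
    Load a saved model.
    Files ending in '_pickle.h5' are unpickled; anything else is loaded with Keras.
    Pickled Keras models depend on the library versions that created them,
    so the native .keras format is the safer choice.
    """
    if model_path.endswith('_pickle.h5'):
        # Load using pickle (only unpickle files from trusted sources)
        with open(model_path, 'rb') as f:
            loaded_model = pickle.load(f)
    else:
        # Load using Keras
        loaded_model = tf.keras.models.load_model(model_path)
    
    return loaded_model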


Model saved to cnn_lstm_model.keras


## Conclusion

This notebook builds and trains a CNN-LSTM model for anomaly detection in time series data from discharge experiments. The model combines:

1. Convolutional layers with multiple kernel sizes to capture local patterns at different scales
2. A bidirectional LSTM with an attention mechanism to capture temporal dependencies
3. Spatial dropout, LSTM dropout, L2 weight regularization, and batch normalization to reduce overfitting

Once its validation results are satisfactory, the saved model can be used to flag anomalies in new discharge data, provided that data is normalized and windowed the same way as the training data. For production use, you might want to:

1. Perform hyperparameter tuning to optimize performance
2. Implement cross-validation for more robust evaluation
3. Explore augmentation beyond the Gaussian-noise copies made by `generate_similar_discharges`
4. Add explainability methods to understand model decisions
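One caveat for cross-validation in this pipeline: windows overlap by 50% by default, and augmented copies are noisy duplicates of their source discharge, so a random window-level split can put nearly identical samples in both training and validation sets and overstate validation scores. Splitting by discharge avoids this. A minimal sketch, assuming you keep a per-window discharge ID (hypothetical: `preprocess_data` does not currently return one); scikit-learn's `GroupKFold` implements the same idea with a tested API.

```python
def group_kfold(groups, k):
    """Yield (train_idx, val_idx) pairs so that all windows of one discharge share a fold.

    groups[i] is the ID of the discharge that window i was cut from.
    Discharges are assigned to folds round-robin in sorted ID order.
    """
    unique = sorted(set(groups))
    fold_of = {g: i % k for i, g in enumerate(unique)}
    for fold in range(k):
        val = [i for i, g in enumerate(groups) if fold_of[g] == fold]
        train = [i for i, g in enumerate(groups) if fold_of[g] != fold]
        yield train, val

groups = ["d1", "d1", "d2", "d2", "d3", "d3"]
for train, val in group_kfold(groups, 3):
    print(train, val)
# [2, 3, 4, 5] [0, 1]
# [0, 1, 4, 5] [2, 3]
# [0, 1, 2, 3] [4, 5]
```

This assignment ignores class balance; with few anomalous discharges, you may need to place them across folds by hand or use a stratified group splitter.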