# Transformers for Peak Detection

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import math
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
from tensorflow.keras.optimizers import Adam

from sklearn.metrics import accuracy_score, f1_score
plt.style.use("seaborn-v0_8")
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Base signal names; every one is paired with every suffix in `_SUFFIXES` to
# form a "<NAME>|<suffix>" column name (the suffix is presumably an axis/drive
# number -- confirm against the data dictionary).
# Entries that were commented out of the hand-written list and therefore stay
# excluded here: 'CURRENT|1..3', 'POWER|1,2,3,6' and 'A_DBD|0'.
_SIGNAL_NAMES = (
    'LOAD',
    'ENC_POS',
    'CTRL_DIFF2',
    'TORQUE',
    'DES_POS',
    'CTRL_DIFF',
    'CTRL_POS',
    'VEL_FFW',
    'CONT_DEV',
    'CMD_SPEED',
    'TORQUE_FFW',
    'ENC1_POS',
    'ENC2_POS',
)
_SUFFIXES = (1, 2, 3, 6)

# Name-major ordering: all suffixes of the first name, then the second, ...
not_non_features = [
    f"{name}|{suffix}"
    for name in _SIGNAL_NAMES
    for suffix in _SUFFIXES
]

# Column that the rest of the notebook reconstructs.
target = 'CURRENT|6'

## Model Implementation

In [None]:
# 🧠 hyperparameters
FEATS = 1                  # Input feature dimension (univariate series)
# Sequence length. This used to be 10 while `window_size` below was 30; only
# `window_size` is consumed by the model and the windowing code, so the
# effective value (30) is kept and both names now share one source of truth.
WINDOW_SIZE = 30
D_MODEL = 8                # Width of the input projection; the transformer runs at D_MODEL * 2
DIM_FEEDFORWARD = 64       # Hidden units in FFN
NUM_HEADS = 1              # Attention heads (must divide the transformer width, D_MODEL * 2)
NUM_ENCODER_LAYERS = 1     # Transformer encoder layers
NUM_DECODER_LAYERS = 1     # Transformer decoder layers
DROPOUT = 0.1              # Dropout rate

# ----------- 1. Load CSV and Config ----------- #
csv_path = "/content/dataset_20.csv"         # 🔁 Change this to your file
target_col = "CURRENT|6"               # 🔁 Column name for univariate time series
window_size = WINDOW_SIZE              # Alias kept for the cells that use the lowercase name
batch_size = 128
num_epochs = 100
lr = 0.001

print("\nDONE - Load CSV and Config")



DONE - Load CSV and Config


In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position codes to a batch-first sequence, then apply dropout.

    Args:
        d_model: Size of the last (embedding) dimension of the inputs.
        dropout: Dropout probability applied after the position codes are added.
        max_len: Longest sequence length (dimension 1 of the input) supported.
    """

    def __init__(self, d_model, dropout=DROPOUT, max_len=500):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Create sinusoidal positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Sine fills the even columns, cosine the odd ones. For odd d_model
        # there is one fewer odd column than even columns, so the frequency
        # vector must be cut to d_model // 2 entries; the previous slice
        # (d_model // 2 + 1) kept all of them and failed with a shape mismatch.
        pe[:, 0::2] = torch.sin(pos * div_term)
        pe[:, 1::2] = torch.cos(pos * div_term[: d_model // 2])

        pe = pe.unsqueeze(0)  # shape: (1, max_len, d_model)
        # Buffer: follows the module across devices and is saved with it, but is not trained.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return `x` plus the position codes for its first `x.size(1)` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension is `d_model`.

        Raises:
            ValueError: If the sequence is longer than `max_len`.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds positional encoding max_len {self.pe.size(1)}"
            )
        # Add positional encoding to input tensor
        x = x + self.pe[:, :seq_len]
        return self.dropout(x)


class PeakAD_Transformer(nn.Module):
    """Two-pass, self-conditioned transformer that reconstructs an input window.

    Pass 1 reconstructs the window with an all-zero error signal. Pass 2 feeds
    the squared error of pass 1 back in alongside the input and reconstructs
    again; that second reconstruction is the model output.

    All tensors are batch-first: ``(batch, sequence, FEATS)``. The attention
    layers are therefore created with ``batch_first=True``. With PyTorch's
    default (``batch_first=False``) they would read dimension 0 as the sequence
    axis and attend across the samples of a batch instead of across time steps,
    which also disagrees with the positional encoding (applied along
    dimension 1).
    """

    def __init__(self):
        super().__init__()

        # Encoder/decoder width: projected input and projected error are
        # concatenated on the feature axis, giving twice the projection width.
        model_width = D_MODEL * 2

        # Project input features into higher dimension
        self.input_proj = nn.Linear(FEATS, D_MODEL)

        # Add positional encoding for combined input + error sequence
        self.pos_encoder = PositionalEncoding(model_width, DROPOUT, window_size)

        # Define encoder block
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_width,
            nhead=NUM_HEADS,
            dim_feedforward=DIM_FEEDFORWARD,
            dropout=DROPOUT,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=NUM_ENCODER_LAYERS)

        # Define decoder block
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=model_width,
            nhead=NUM_HEADS,
            dim_feedforward=DIM_FEEDFORWARD,
            dropout=DROPOUT,
            batch_first=True,
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=NUM_DECODER_LAYERS)

        # Output layer to project back to original feature dimension.
        # The sigmoid bounds every reconstructed value to (0, 1).
        self.output_layer = nn.Sequential(
            nn.Linear(model_width, FEATS),
            nn.Sigmoid()
        )

    def encode(self, src, err, tgt):
        """Build the decoder input and the encoder memory for one pass.

        Args:
            src: Input window, ``(batch, sequence, FEATS)``.
            err: Error signal with the same shape as `src`.
            tgt: Decoder target, ``(batch, sequence, FEATS)``.

        Returns:
            ``(tgt_combined, memory)``, both with last dimension ``D_MODEL * 2``.
        """
        # Project source and error (the same linear layer is shared by both)
        src_enc = self.input_proj(src)
        err_enc = self.input_proj(err)

        # Concatenate source and error for self-conditioning
        combined = torch.cat((src_enc, err_enc), dim=2)
        combined = combined * math.sqrt(D_MODEL)

        # Add positional encoding
        encoded = self.pos_encoder(combined)

        # Pass through transformer encoder
        memory = self.encoder(encoded)

        # Project the target and duplicate it so its width matches the memory
        tgt_proj = self.input_proj(tgt)
        tgt_combined = torch.cat((tgt_proj, tgt_proj), dim=2)

        return tgt_combined, memory

    def forward(self, src, tgt):
        """Run both passes and return the second reconstruction.

        Args:
            src: Input window, ``(batch, sequence, FEATS)``.
            tgt: Decoder target with the same shape as `src`.

        Returns:
            Reconstruction with the same shape as `src`.
        """
        # Phase 1: Without anomaly signal
        err = torch.zeros_like(src)
        tgt1, mem1 = self.encode(src, err, tgt)
        x1 = self.output_layer(self.decoder(tgt1, mem1))

        # Phase 2: With self-conditioning using reconstruction error
        err = (x1 - src) ** 2
        tgt2, mem2 = self.encode(src, err, tgt)
        x2 = self.output_layer(self.decoder(tgt2, mem2))

        return x2  # Final reconstructed sequence

## Train Model

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# ----------- 2. Load and Normalize ----------- #
# Read the table and keep the target column as a 2-D (n_rows, 1) array, which
# is the layout the scaler works on.
df = pd.read_csv(csv_path)
y = df[[target_col]].to_numpy()

# Learn the column's min/max, then rescale it with the fitted scaler. The
# scaler is kept so results can be mapped back to original units later.
scaler_y = MinMaxScaler().fit(y)
y_scaled = scaler_y.transform(y)

print("\nDONE - Load and Normalize")

# ----------- 3. Create Sliding Window Data ----------- #
def create_windows(data, window_size):
    """Slice `data` into every overlapping window of `window_size` consecutive rows.

    Args:
        data: Array-like whose first axis is the row/time axis.
        window_size: Number of consecutive rows per window; must be positive.

    Returns:
        Array of shape ``(n_windows, window_size, *data.shape[1:])`` where
        ``n_windows = len(data) - window_size + 1``. When `data` has fewer than
        `window_size` rows the result has zero windows but keeps the trailing
        dimensions.

    Raises:
        ValueError: If `window_size` is not positive.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    data = np.asarray(data)
    # "+ 1" keeps the final window, which ends on the last row; without it the
    # last valid window was silently dropped.
    n_windows = len(data) - window_size + 1
    if n_windows <= 0:
        return np.empty((0, window_size) + data.shape[1:], dtype=data.dtype)
    return np.array([data[i:i + window_size] for i in range(n_windows)])

X_all = create_windows(y_scaled, window_size)
y_all = X_all.copy()  # reconstruction target = input

print("\nDONE - Create Sliding Window")

# ----------- 4. Train-Test Split ----------- #
# Chronological split (no shuffling). Neighbouring windows overlap almost
# entirely, so a random split puts near-copies of the training windows in the
# test set, and it scrambles the test order that the error plots index as
# time. The first 80% of windows train the model; the last 20% are held out.
X_train, X_test = train_test_split(X_all, test_size=0.2, shuffle=False)
# Convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

print("\nDONE - Train-Test Split")


# ----------- 6. Train the Model ----------- #
# Fix the RNG so weight initialisation, dropout and batch order repeat on a
# fresh kernel.
torch.manual_seed(42)

model = PeakAD_Transformer()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()
# Autoencoder setup: the input window is also the reconstruction target.
train_loader = DataLoader(TensorDataset(X_train_tensor, X_train_tensor), batch_size=batch_size, shuffle=True)

model.train()
for epoch in range(num_epochs):
    total_loss = 0  # sum of the per-batch mean losses for this epoch
    for xb, yb in train_loader:
        optimizer.zero_grad()
        x2 = model(xb, xb)
        loss = criterion(x2, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss:.4f}")

print("\n✅ DONE - Model Trained")




DONE - Load and Normalize

DONE - Create Sliding Window

DONE - Train-Test Split




Epoch 1/100, Loss: 0.0857
Epoch 2/100, Loss: 0.0620
Epoch 3/100, Loss: 0.0483
Epoch 4/100, Loss: 0.0353
Epoch 5/100, Loss: 0.0277
Epoch 6/100, Loss: 0.0210
Epoch 7/100, Loss: 0.0168
Epoch 8/100, Loss: 0.0139
Epoch 9/100, Loss: 0.0109
Epoch 10/100, Loss: 0.0091
Epoch 11/100, Loss: 0.0072
Epoch 12/100, Loss: 0.0058
Epoch 13/100, Loss: 0.0048
Epoch 14/100, Loss: 0.0042
Epoch 15/100, Loss: 0.0037
Epoch 16/100, Loss: 0.0033
Epoch 17/100, Loss: 0.0031
Epoch 18/100, Loss: 0.0027
Epoch 19/100, Loss: 0.0026
Epoch 20/100, Loss: 0.0025
Epoch 21/100, Loss: 0.0024
Epoch 22/100, Loss: 0.0023
Epoch 23/100, Loss: 0.0021
Epoch 24/100, Loss: 0.0021
Epoch 25/100, Loss: 0.0020
Epoch 26/100, Loss: 0.0019
Epoch 27/100, Loss: 0.0018
Epoch 28/100, Loss: 0.0017
Epoch 29/100, Loss: 0.0016
Epoch 30/100, Loss: 0.0015
Epoch 31/100, Loss: 0.0015
Epoch 32/100, Loss: 0.0015
Epoch 33/100, Loss: 0.0015
Epoch 34/100, Loss: 0.0014
Epoch 35/100, Loss: 0.0013
Epoch 36/100, Loss: 0.0012
Epoch 37/100, Loss: 0.0013
Epoch 38/1

## Plot Reconstruction Error (RE)

In [None]:
def plot_reconstruction_errors_per_timestep(model, scaler_y, X_input, threshold_percentile=98):
    """Plot one reconstruction error per window and mark the largest as peaks.

    Only the last time step of each window is scored, so `errors` has one
    entry per window in `X_input`.

    Args:
        model: Reconstruction model called as ``model(X_input, X_input)``; it
            is switched to eval mode.
        scaler_y: Fitted scaler used to map values back to original units.
        X_input: Tensor of windows, indexed as ``[window, step, feature]``.
        threshold_percentile: Percentile of the errors above which an entry
            counts as a peak.

    Returns:
        ``(errors, peaks)``: the absolute errors in original units, and the
        list of indices whose error is strictly above the threshold.
    """
    model.eval()
    with torch.no_grad():
        reconstructed = model(X_input, X_input)  # [B, window, 1]

    def to_original_scale(last_step_values):
        # The scaler expects a single column; flatten back to 1-D afterwards.
        column = last_step_values.cpu().numpy().reshape(-1, 1)
        return scaler_y.inverse_transform(column).flatten()

    # Last time step of every window, reconstructed and observed.
    recon_orig = to_original_scale(reconstructed[:, -1, 0])
    true_orig = to_original_scale(X_input[:, -1, 0])

    errors = np.abs(true_orig - recon_orig)

    # Everything strictly above the chosen percentile is reported as a peak.
    threshold = np.percentile(errors, threshold_percentile)
    is_peak = errors > threshold
    peaks = [idx for idx, flagged in enumerate(is_peak) if flagged]

    plt.figure(figsize=(14, 6))
    plt.plot(errors, label="Reconstruction Error (per timestamp)", alpha=0.8)
    plt.axhline(threshold, color="red", linestyle="--", label=f"{threshold_percentile}th percentile")
    plt.scatter(peaks, errors[is_peak], color='red', s=10, label="Detected Peaks")
    plt.title("TranAD-style Peak Detection (1 error per timestamp)")
    plt.xlabel("Timestamp Index")
    plt.ylabel("Reconstruction Error")
    plt.legend()
    plt.grid(True)
    plt.show()

    return errors, peaks

## To shorten Dataset Length

In [None]:

# Writes the file that `csv_path` points at, so this cell has to run before the
# cells that read `csv_path`.
df = pd.read_csv("/content/dataset.csv")

# Take 20% of the data as one contiguous block from the start of the file.
# Random row sampling (`df.sample`) shuffles the rows, so sliding windows built
# from the subset would no longer contain consecutive measurements.
n_keep = int(round(len(df) * 0.2))
df_20 = df.iloc[:n_keep].reset_index(drop=True)

# Save to new file
df_20.to_csv("/content/dataset_20.csv", index=False)

print("✅ New file 'dataset_20.csv' created with 20% of the data.")

✅ New file 'dataset_20.csv' created with 20% of the data.


### Dataset Info

In [None]:
# Summarise the 20% subset: row count, column names, non-null counts and dtypes.
df_20.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8403 entries, 0 to 8402
Data columns (total 92 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   CYCLE         8403 non-null   int64  
 1   TORQUE|1      8403 non-null   float64
 2   TORQUE|2      8403 non-null   float64
 3   TORQUE|3      8403 non-null   float64
 4   TORQUE|4      8403 non-null   float64
 5   TORQUE|5      8403 non-null   float64
 6   TORQUE|6      8403 non-null   float64
 7   CTRL_DIFF2|1  8403 non-null   float64
 8   CTRL_DIFF2|2  8403 non-null   float64
 9   CTRL_DIFF2|3  8403 non-null   float64
 10  CTRL_DIFF2|4  8403 non-null   float64
 11  CTRL_DIFF2|5  8403 non-null   float64
 12  CTRL_DIFF2|6  8403 non-null   float64
 13  DES_POS|1     8403 non-null   float64
 14  DES_POS|2     8403 non-null   float64
 15  DES_POS|3     8403 non-null   float64
 16  DES_POS|4     8403 non-null   float64
 17  DES_POS|5     8403 non-null   float64
 18  DES_POS|6     8403 non-null 

In [None]:
# NOTE(review): `df` is assigned by more than one cell in this notebook (the
# full dataset and the 20% file), and the saved output below has the same row
# count that `df_20.info()` reports -- confirm whether `df_20.shape` was meant.
df.shape

(8403, 92)

## Important Code Snippets

In [None]:
# 8. Plot Reconstruction Errors (per timestamp)
def plot_reconstruction_errors_per_timestep(model, scaler_y, X_input, threshold_percentile=98):
    """Plot the reconstruction error of every value in every window and mark peaks.

    NOTE(review): this reuses the name of the function defined in the
    "Plot RE" section, so whichever definition runs last is the one in effect.
    Unlike that version, every time step of every window is scored here: the
    inputs are flattened, so `errors` has one entry per (window, step) pair
    and the plot index is a position in that flattened order.

    Args:
        model: Reconstruction model called as ``model(X_input, X_input)``; it
            is switched to eval mode.
        scaler_y: Fitted scaler used to map values back to original units.
        X_input: Tensor of windows passed to the model.
        threshold_percentile: Percentile of the errors above which an entry
            counts as a peak.

    Returns:
        ``(errors, peaks)``: the flattened absolute errors in original units,
        and the list of indices whose error is strictly above the threshold.
    """
    model.eval()
    with torch.no_grad():
        reconstructed = model(X_input, X_input)

    def to_original_scale(tensor):
        # Flatten everything into one column for the scaler, then back to 1-D.
        column = tensor.cpu().numpy().reshape(-1, 1)
        return scaler_y.inverse_transform(column).flatten()

    recon_orig = to_original_scale(reconstructed)
    true_orig = to_original_scale(X_input)
    errors = np.abs(true_orig - recon_orig)

    # Everything strictly above the chosen percentile is reported as a peak.
    threshold = np.percentile(errors, threshold_percentile)
    is_peak = errors > threshold
    peaks = [idx for idx, flagged in enumerate(is_peak) if flagged]

    # Plot
    plt.figure(figsize=(12, 6))
    plt.plot(errors, label="Reconstruction Error (per timestamp)")
    plt.axhline(threshold, color="red", linestyle="--", label=f"{threshold_percentile}th percentile")
    plt.scatter(peaks, errors[is_peak], color='red', s=10, label="Detected Peaks")
    plt.title("TranAD-style Peak Detection (Transformer)")
    plt.xlabel("Timestamp Index")
    plt.ylabel("Reconstruction Error")
    plt.legend()
    plt.grid(True)
    plt.show()

    return errors, peaks

# 9. Run and Plot
# Score the held-out windows with the trained model. `peaks` holds the indices
# into `errors` that exceed the function's default 98th-percentile threshold.
errors, peaks = plot_reconstruction_errors_per_timestep(model, scaler_y, X_test_tensor)
print(f"\n✅ DONE - {len(peaks)} Peaks Detected")