In [1]:
import torch
import torch.nn as nn

class LSTMAutoencoder(nn.Module):
    """
    LSTM Autoencoder for multivariate time series anomaly detection.
    
    Architecture:
        Encoder: Input -> LSTM -> Hidden State (bottleneck)
        Decoder: Hidden State -> Repeat -> LSTM -> Reconstructed Output
    
    Args:
        input_dim (int): Number of features (e.g., 1 for univariate, 5 for multivariate)
        hidden_dim (int): Hidden size of LSTM (bottleneck dimension)
        num_layers (int): Number of LSTM layers (default: 1)
        dropout (float): Dropout rate (default: 0.0)
    """
    def __init__(self, input_dim: int, hidden_dim: int = 64, num_layers: int = 1, dropout: float = 0.0):
        super().__init__()
        
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # Encoder: compress time series into a latent vector
        self.encoder = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        
        # Decoder: reconstruct the sequence from latent vector
        self.decoder = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        
        # Output projection: map decoder output to original feature space
        self.fc_out = nn.Linear(hidden_dim, input_dim)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass: reconstruct input sequence.
        
        Args:
            x (torch.Tensor): Input time series of shape [B, T, D]
                B = batch size
                T = sequence length
                D = number of features (input_dim)
                
        Returns:
            recon_x (torch.Tensor): Reconstructed sequence of shape [B, T, D]
        """
        B, T, D = x.shape
        
        # Encode: get final hidden state
        _, (h_n, c_n) = self.encoder(x)  # h_n: [num_layers, B, hidden_dim]
        
        # Use last layer's hidden state as bottleneck
        latent = h_n[-1]  # [B, hidden_dim]
        
        # Repeat latent vector to match sequence length
        decoder_input = latent.unsqueeze(1).repeat(1, T, 1)  # [B, T, hidden_dim]
        
        # Decode
        decoder_output, _ = self.decoder(decoder_input)
        
        # Project to original feature space
        recon_x = self.fc_out(decoder_output)  # [B, T, D]
        
        return recon_x

In [None]:
import torch
import numpy as np

# Simulate 1000 sequences of 50-timestep CPU usage (normal + some anomalies)
np.random.seed(42)
#生成一个模拟的“正常”时间序列数据集,950条时间序列.每一个序列50个时间步.每一个时间步有一个变量.共47500个浮点数
# 从均值0.5,标准差0.1的正太分布中随机采样
# [ 
#  seq0_t0_f0, seq0_t1_f0, ..., seq0_t49_f0,
#  seq1_t0_f0, seq1_t1_f0, ..., seq1_t49_f0,
#  ...
#  seq949_t0_f0, ..., seq949_t49_f0
# ]
# seqX 表示第 X 条时间序列（共 950 条）
# tY 表示第 Y 个时间步（0 到 49）
# f0 表示第 0 个特征（因为只有 1 个特征）
normal_data = np.random.normal(loc=0.5, scale=0.1, size=(950, 50, 1)) 
anomaly_data = np.random.normal(loc=0.9, scale=0.2, size=(50, 50, 1))  # high CPU
# normal_data:   [样本0, 样本1, ..., 样本949]     → 950 个样本
# anomaly_data:  [样本0, 样本1, ..., 样本49]      → 50 个样本
# ------------------------------------------------------------
# data:          [样本0～949 (正常), 样本0～49 (异常)] → 1000 个样本
# 这是个拼接动作,np.concatenate([normal_data, anomaly_data], axis=0) 时，除了拼接轴（axis=0）之外，其他所有维度的大小必须完全一致。
# 合并两个NumPy 数组,类型为NumPy ndarray . 形状(1000,50,1), 数据类型float64. 存储在RAM中,由NumPy管理
data = np.concatenate([normal_data, anomaly_data], axis=0)
# 生成一个torch.tensor , 类型为torch.float32 , 保存在cpu上.
data = torch.tensor(data, dtype=torch.float32)

# Create model
model = LSTMAutoencoder(input_dim=1, hidden_dim=32)
criterion = nn.MSELoss(reduction='none')  # per-timestep loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training loop (unsupervised)
model.train()
for epoch in range(50):
    optimizer.zero_grad()
    recon = model(data)
    loss = criterion(recon, data).mean()
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")

# Anomaly scoring: reconstruction error per sequence
model.eval()
with torch.no_grad():
    recon = model(data)
    seq_errors = torch.mean((recon - data) ** 2, dim=[1, 2])  # [1000]

# Threshold: top 5% as anomalies
threshold = torch.quantile(seq_errors, 0.95)
anomaly_pred = seq_errors > threshold

print(f"Detected {anomaly_pred.sum().item()} anomalies (expected ～50)")

Epoch 0, Loss: 0.338000
Epoch 10, Loss: 0.182544
Epoch 20, Loss: 0.037992
Epoch 30, Loss: 0.032330
Epoch 40, Loss: 0.025277
Detected 50 anomalies (expected ～50)


In [7]:
type(normal_data)

numpy.ndarray