## Imports

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

## Load Data

In [2]:
X = np.load("./data/numpy/dataset_X.npy")

X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)

print("Train shape:", X_train.shape)
print("Test shape:", X_test.shape)

Train shape: (1415678, 100)
Test shape: (353920, 100)


## Dataset Class (Unsupervised)

In [3]:
class SequenceDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.LongTensor(self.sequences[idx])

## LSTM Autoencoder Model

In [4]:
class LSTMAutoencoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim=64, hidden_dim=128):
        super(LSTMAutoencoder, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Encoder
        self.encoder = nn.LSTM(
            embedding_dim,
            hidden_dim,
            batch_first=True
        )

        # Decoder
        self.decoder = nn.LSTM(
            hidden_dim,
            embedding_dim,
            batch_first=True
        )

        self.output_layer = nn.Linear(embedding_dim, vocab_size)

    def forward(self, x):
        embedded = self.embedding(x)

        # Encode
        _, (hidden, cell) = self.encoder(embedded)

        # Repeat hidden state across sequence length
        seq_len = x.size(1)
        repeated_hidden = hidden[-1].unsqueeze(1).repeat(1, seq_len, 1)

        # Decode
        decoded, _ = self.decoder(repeated_hidden)

        output = self.output_layer(decoded)

        return output

## Create DataLoaders

In [5]:
train_dataset = SequenceDataset(X_train)
test_dataset = SequenceDataset(X_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

## Initialize Model

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = int(np.max(X) + 1)

model = LSTMAutoencoder(vocab_size).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

## Training Loop (Reconstruction Learning)

In [7]:
epochs = 10

for epoch in range(epochs):
    model.train()
    total_loss = 0

    for sequences in train_loader:
        sequences = sequences.to(device)

        optimizer.zero_grad()

        outputs = model(sequences)

        # reshape for cross entropy
        outputs = outputs.view(-1, vocab_size)
        targets = sequences.view(-1)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}")

Epoch [1/10], Loss: 1.0402
Epoch [2/10], Loss: 0.8254
Epoch [3/10], Loss: 0.8360
Epoch [4/10], Loss: 0.6820
Epoch [5/10], Loss: 0.6380
Epoch [6/10], Loss: 0.6408
Epoch [7/10], Loss: 0.6856
Epoch [8/10], Loss: 0.7357
Epoch [9/10], Loss: 0.6801
Epoch [10/10], Loss: 0.6287


## Compute Reconstruction Error (Anomaly Score)

In [8]:
def compute_reconstruction_error(model, loader):
    model.eval()
    errors = []

    with torch.no_grad():
        for sequences in loader:
            sequences = sequences.to(device)
            outputs = model(sequences)

            loss = criterion(
                outputs.view(-1, vocab_size),
                sequences.view(-1)
            )

            errors.append(loss.item())

    return np.array(errors)

## Detect Anomalies

In [9]:
train_errors = compute_reconstruction_error(model, train_loader)
test_errors = compute_reconstruction_error(model, test_loader)

# Set threshold (example: mean + 2*std)
threshold = train_errors.mean() + 2 * train_errors.std()

anomalies = test_errors > threshold

print("Threshold:", threshold)
print("Number of anomalies detected:", anomalies.sum())

Threshold: 0.6919593113021225
Number of anomalies detected: 186
