# HDFS Anomaly Detection Screener

This notebook loads the pre-trained AllLinLog model and runs inference on the HDFS test set for anomaly screening.

## Requirements
- `torch`
- `numpy`
- `pandas`
- `scikit-learn`
- `tqdm`
- `tiktoken`
- `linformer`

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm
import tiktoken
from linformer import Linformer
from datetime import datetime
import time
import random
import os
import re

# Configuration
# Input artifacts: raw HDFS log, per-block anomaly labels, and the trained checkpoint.
LOG_FILE = "./logs/HDFS.log"
LABEL_FILE = "./logs/anomaly_label_HDFS.csv"
MODEL_PATH = "./best_model_HDFS/best_model_HDFS20250804_201746.pth"
# NOTE(review): WINDOW is not referenced by any other cell visible in this notebook.
WINDOW = 'session'  # HDFS uses session-based windowing
# Fractions used to rebuild the train/val/test partition in the data-loading cell.
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
TEST_RATIO = 0.15
# Passed to set_seed(); the train_test_split calls use their own literal random_state.
SEED = 42
BATCH_SIZE = 8
# Initial truncation cap for tokenization. The data-loading cell rebinds this name to
# the longest tokenized session, and the model cell then uses it as max_seq_len.
MAX_TOKEN_LENGTH = 18000  # Will be updated after data loading

# Model hyperparameters (must match training)
CL100K_VOCAB_SIZE = 100264  # GPT4 BPE
EMBEDDING_DIM = 128
FF_HIDDEN_DIM = 128
NUM_LAYERS = 1
NUM_HEADS = 4
K = 32  # Linformer projection dimension
DROPOUT = 0.5
# Size of the segment-embedding table; segment IDs are clamped to MAX_SEGMENT_LENGTHS - 1.
MAX_SEGMENT_LENGTHS = 298  # Must match trained model checkpoint

def set_seed(seed=42):
    """Seed the NumPy, Python and PyTorch RNGs and request deterministic cuDNN.

    Args:
        seed: Integer seed handed to every generator.

    Prints a confirmation line; returns nothing.
    """
    # CPU-side generators: NumPy, the stdlib ``random`` module, then PyTorch.
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators are only seeded when a GPU is actually present.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # Disable autotuning and request deterministic cuDNN kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    print(f"Random seed set to {seed}")

set_seed(SEED)

# Backend preference: CUDA first, then Apple MPS, otherwise plain CPU.
# The MPS probe only runs when CUDA is unavailable.
if torch.cuda.is_available():
    _device_name = "cuda"
elif torch.backends.mps.is_available():
    _device_name = "mps"
else:
    _device_name = "cpu"
device = torch.device(_device_name)
print(f"Using device: {device}")

Random seed set to 42
Using device: cuda


In [3]:
# Dataset class
class LogDataset(Dataset):
    """Map-style dataset over a sequence of pre-tokenized session dicts.

    Each item exposes only the three fields the collate function reads;
    any other key on the stored session (e.g. ``block_id``) is left out.
    """

    # Keys copied from a stored session into the returned item, in this order.
    _FIELDS = ('input_ids', 'segment_ids', 'session_label')

    def __init__(self, sessions):
        """Keep a reference to ``sessions`` (no copy is made)."""
        self.sessions = sessions

    def __len__(self):
        """Number of stored sessions."""
        return len(self.sessions)

    def __getitem__(self, idx):
        """Return a new dict with the model-facing fields of session ``idx``."""
        record = self.sessions[idx]
        return {field: record[field] for field in self._FIELDS}


def load_gpt4_tokenizer():
    """Announce the load on stdout and return tiktoken's ``cl100k_base`` encoding."""
    encoding_name = "cl100k_base"
    print("Loading cl100k_base (GPT-4) tokenizer...")
    return tiktoken.get_encoding(encoding_name)


def tokenize_and_construct_input(log_sequence, tokenizer, max_len=18000):
    """Flatten one session's log lines into token IDs plus per-token line indices.

    Args:
        log_sequence: Iterable of log-line strings, already in the desired order.
        tokenizer: Object exposing ``encode(text, allowed_special=...)`` that
            returns a list of integer token IDs.
        max_len: Maximum number of tokens kept; anything beyond is cut off.

    Returns:
        ``(input_ids, segment_ids)``: two equally long lists. ``segment_ids[j]``
        is the zero-based index of the log line that produced ``input_ids[j]``.

    The first line is prefixed with a start marker and every line is followed
    by an end marker.
    NOTE(review): only the first ID of each marker's encoding is used; whether
    the tokenizer maps "<|startoftext|>" to a single special token is not
    visible from this code -- confirm against the training pipeline.
    """
    specials = {"<|startoftext|>", "<|endoftext|>"}

    def first_id(marker):
        # Marker ID = first token the tokenizer emits for the marker string.
        return tokenizer.encode(marker, allowed_special=specials)[0]

    start_id = first_id("<|startoftext|>")
    end_id = first_id("<|endoftext|>")

    input_ids, segment_ids = [], []
    for line_no, message in enumerate(log_sequence):
        pieces = tokenizer.encode(message, allowed_special=specials)
        if line_no == 0:
            pieces = [start_id] + pieces
        pieces = pieces + [end_id]
        input_ids += pieces
        segment_ids += [line_no] * len(pieces)

    # Drop the tail in place when the session exceeds the cap.
    if len(input_ids) > max_len:
        del input_ids[max_len:]
        del segment_ids[max_len:]

    return input_ids, segment_ids


def create_sessions_with_segment_ids(log_data, tokenizer, label_file=None, max_len=18000):
    """Group raw HDFS log lines by block ID and tokenize each group as one session.

    Args:
        log_data: Iterable of raw log-line strings.
        tokenizer: Tokenizer forwarded to ``tokenize_and_construct_input``.
        label_file: Optional CSV path with ``BlockId`` and ``Label`` columns.
            A block counts as anomalous only when its label is exactly
            ``"Anomaly"``; blocks missing from the file are labelled 0.
            Without a label file every session is labelled 0.
        max_len: Token cap forwarded to ``tokenize_and_construct_input``.

    Returns:
        List of dicts with keys ``block_id``, ``input_ids``, ``segment_ids``
        and ``session_label``, in order of each block's first appearance.

    Lines are skipped when they have fewer than two whitespace-separated
    fields, when the first two fields do not parse as ``%y%m%d %H%M%S``, or
    when they do not mention exactly one distinct ``blk_<id>``.
    """
    block_pattern = r'(blk_-?\d+)'

    # Pass 1: bucket (timestamp, line) pairs under the single block each line names.
    events_by_block = {}
    for raw_line in tqdm(log_data, desc="Grouping logs by session"):
        fields = raw_line.split()
        if len(fields) < 2:
            continue
        try:
            stamp = datetime.strptime(" ".join(fields[:2]), '%y%m%d %H%M%S').timestamp()
        except Exception:
            # Best effort: lines with an unusable timestamp are dropped.
            continue

        distinct_blocks = set(re.findall(block_pattern, raw_line))
        if len(distinct_blocks) != 1:
            continue
        (block,) = distinct_blocks
        events_by_block.setdefault(block, []).append((stamp, raw_line))

    # Optional BlockId -> Label lookup.
    label_mapping = {}
    if label_file:
        label_mapping = (
            pd.read_csv(label_file, engine='c', na_filter=False)
            .set_index("BlockId")["Label"]
            .to_dict()
        )

    # Pass 2: order each block's lines by timestamp (stable sort) and tokenize.
    sessions = []
    for block, events in tqdm(events_by_block.items(), desc="Processing sessions"):
        ordered_lines = [text for _, text in sorted(events, key=lambda pair: pair[0])]
        is_anomaly = bool(label_file) and label_mapping.get(block, "Normal") == "Anomaly"
        input_ids, segment_ids = tokenize_and_construct_input(ordered_lines, tokenizer, max_len)
        sessions.append({
            "block_id": block,
            "input_ids": input_ids,
            "segment_ids": segment_ids,
            "session_label": 1 if is_anomaly else 0,
        })

    return sessions

In [4]:
# Model Architecture
class EmbeddingLayer(nn.Module):
    """Input embedding: element-wise sum of token, segment and position lookups.

    Args:
        vocab_size: Rows in the token table.
        max_seq_len: Rows in the position table (largest usable position + 1).
        segment_vocab_size: Rows in the segment table.
        embedding_dim: Width of every embedding vector.
    """

    def __init__(self, vocab_size, max_seq_len, segment_vocab_size, embedding_dim=128):
        super().__init__()
        # Attribute names and creation order are kept as-is: they determine the
        # state_dict keys and the order in which the RNG is consumed at init.
        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.segment_embedding = nn.Embedding(segment_vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(max_seq_len, embedding_dim)

    def forward(self, input_ids, segment_ids, position_ids=None):
        """Embed a batch; positions default to ``0..seq_len-1`` for every row."""
        if position_ids is None:
            batch_size, seq_len = input_ids.size(0), input_ids.size(1)
            position_ids = torch.arange(seq_len, device=input_ids.device).repeat(batch_size, 1)
        return (
            self.token_embedding(input_ids)
            + self.segment_embedding(segment_ids)
            + self.position_embedding(position_ids)
        )


class LinformerEncoderLayer(nn.Module):
    """One encoder block: Linformer sublayer, then a feed-forward sublayer.

    Each sublayer's output goes through dropout, is added to its input, and the
    sum is layer-normalized (normalization after the residual add).

    Args:
        embedding_dim: Feature width of the input and output.
        seq_len: Sequence length the Linformer module is constructed for
            (cast to ``int`` before use).
        num_heads: Passed to Linformer as ``heads``.
        ff_hidden_dim: Hidden width of the feed-forward sublayer.
        k: Passed to Linformer as ``k``.
        dropout: Probability for the single shared ``nn.Dropout`` module.

    NOTE(review): the attribute names below become state_dict keys, so they
    must stay in sync with any saved checkpoint.
    """

    def __init__(self, embedding_dim, seq_len, num_heads=2, ff_hidden_dim=128, k=128, dropout=0.1):
        super(LinformerEncoderLayer, self).__init__()
        # NOTE(review): the third-party `Linformer` class may bundle more than
        # bare attention (e.g. its own feed-forward/normalization) -- confirm
        # against the linformer package before changing this block.
        self.self_attention = Linformer(
            dim=embedding_dim,
            seq_len=int(seq_len),
            depth=1,
            heads=num_heads,
            k=k,
            one_kv_head=True,
            share_kv=True
        )
        self.norm1 = nn.LayerNorm(embedding_dim)
        # Position-wise feed-forward: embedding_dim -> ff_hidden_dim -> embedding_dim.
        self.ffn = nn.Sequential(
            nn.Linear(embedding_dim, ff_hidden_dim),
            nn.ReLU(),
            nn.Linear(ff_hidden_dim, embedding_dim)
        )
        self.norm2 = nn.LayerNorm(embedding_dim)
        # One dropout module is reused after both sublayers.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply both sublayers to ``x`` and return a tensor of the same width.

        No attention mask is accepted here, so every position of ``x`` is
        passed to the Linformer module.
        presumably ``x`` is (batch, seq, embedding_dim) -- verify against caller.
        """
        attention_output = self.self_attention(x)
        # Residual add followed by LayerNorm.
        x = self.norm1(x + self.dropout(attention_output))
        ffn_output = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_output))
        return x


class LinformerTransformerEncoder(nn.Module):
    """Stack of ``num_layers`` identically configured ``LinformerEncoderLayer`` blocks.

    All constructor arguments after ``num_layers`` are forwarded positionally
    to every block. With ``num_layers == 0`` the encoder is the identity.
    """

    def __init__(self, num_layers, embedding_dim, seq_len, num_heads=2, ff_hidden_dim=128, k=128, dropout=0.1):
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(
                LinformerEncoderLayer(embedding_dim, seq_len, num_heads, ff_hidden_dim, k, dropout)
            )
        # Stored under ``layers`` so the state_dict keys stay ``layers.<i>.*``.
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed ``x`` through each block in order and return the final output."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return hidden


class AllLinLog(nn.Module):
    """Session classifier: embeddings -> Linformer encoder -> mean pool -> linear head.

    Args:
        vocab_size: Rows in the token-embedding table.
        max_seq_len: Rows in the position-embedding table and the sequence
            length the encoder is constructed for.
        segment_vocab_size: Kept for interface compatibility only. The segment
            table is sized by ``max_segment_lengths``; a warning is emitted
            when the two values disagree.
        embedding_dim: Feature width used throughout the model.
        num_layers: Number of encoder blocks.
        num_heads: Attention heads per block.
        ff_hidden_dim: Hidden width of each block's feed-forward sublayer.
        k: Linformer projection size.
        num_classes: Number of output logits.
        dropout: Dropout probability inside the encoder blocks.
        max_segment_lengths: Rows in the segment-embedding table.
    """

    def __init__(self, vocab_size, max_seq_len, segment_vocab_size, embedding_dim=128,
                 num_layers=1, num_heads=2, ff_hidden_dim=128, k=128, num_classes=2, dropout=0.1, max_segment_lengths=100):
        super(AllLinLog, self).__init__()
        if segment_vocab_size != max_segment_lengths:
            # The segment table has always been sized from max_segment_lengths
            # (changing that would break existing checkpoints), so surface the
            # mismatch instead of silently discarding segment_vocab_size.
            import warnings
            warnings.warn(
                f"AllLinLog ignores segment_vocab_size={segment_vocab_size}; the segment "
                f"embedding is sized by max_segment_lengths={max_segment_lengths}.",
                stacklevel=2,
            )
        self.embedding_layer = EmbeddingLayer(vocab_size, max_seq_len, segment_vocab_size=max_segment_lengths, embedding_dim=embedding_dim)
        self.encoder = LinformerTransformerEncoder(num_layers, embedding_dim, max_seq_len, num_heads, ff_hidden_dim, k, dropout)
        self.fc = nn.Linear(embedding_dim, num_classes)

    def forward(self, input_ids, segment_ids, position_ids=None, attention_mask=None):
        """Return classification logits, one row per input sequence.

        Args:
            input_ids: Token IDs.
            segment_ids: Segment (log-line index) IDs aligned with ``input_ids``.
            position_ids: Optional explicit positions; ``None`` lets the
                embedding layer generate ``0..seq_len-1``.
            attention_mask: Accepted but not used. Every position, padding
                included, contributes to the mean pooling below.
        """
        embeddings = self.embedding_layer(input_ids, segment_ids, position_ids)
        encoder_output = self.encoder(embeddings)
        # Unmasked average over the sequence dimension.
        pooled_output = torch.mean(encoder_output, dim=1)
        logits = self.fc(pooled_output)
        return logits

In [5]:
# Load and prepare data (same split as training for reproducibility)
print("Loading logs from:", LOG_FILE)
start_time = time.time()

# Read the whole log into memory, one stripped line per entry.
with open(LOG_FILE, mode="r", encoding='utf8') as f:
    logs = [x.strip() for x in tqdm(f, desc="Reading Logs")]

print(f"Loaded {len(logs)} logs in {time.time() - start_time:.2f} seconds.")

tokenizer = load_gpt4_tokenizer()
# MAX_TOKEN_LENGTH still holds its configured value here and acts as the truncation cap.
all_sessions = create_sessions_with_segment_ids(logs, tokenizer, label_file=LABEL_FILE, max_len=MAX_TOKEN_LENGTH)

# Calculate max token length
token_lengths = [len(session["input_ids"]) for session in all_sessions]
# NOTE: this rebinds the module-level MAX_TOKEN_LENGTH. The model cell passes it as
# max_seq_len, so it has to equal the value the checkpoint was trained with
# (TODO confirm against the training run). On a re-run of this cell the rebound
# value is also what gets passed as max_len above.
MAX_TOKEN_LENGTH = max(token_lengths)
print(f"Max tokens in sessions: {MAX_TOKEN_LENGTH}")
print(f"Number of sessions: {len(all_sessions)}")

# Perform the same stratified split as training
session_labels = [s["session_label"] for s in all_sessions]

# First split: train and temp (val+test)
# random_state is the literal 42 here, independent of the SEED constant.
train_sessions, temp_sessions, train_labels, temp_labels = train_test_split(
    all_sessions,
    session_labels,
    test_size=(1 - TRAIN_RATIO),
    stratify=session_labels,
    random_state=42
)

# Second split: val and test
# Share of the held-out pool that goes to validation.
val_relative = VAL_RATIO / (VAL_RATIO + TEST_RATIO)
# Recomputed from temp_sessions; the first split already returned temp_labels.
temp_labels = [s["session_label"] for s in temp_sessions]
val_sessions, test_sessions, val_labels, test_labels = train_test_split(
    temp_sessions,
    temp_labels,
    test_size=(1 - val_relative),
    stratify=temp_labels,
    random_state=42
)

print(f"\nDataset Split:")
print(f"Train sessions: {len(train_sessions)} | Val sessions: {len(val_sessions)} | Test sessions: {len(test_sessions)}")

# Test set statistics
test_normal = sum(s['session_label'] == 0 for s in test_sessions)
test_anomalous = sum(s['session_label'] == 1 for s in test_sessions)
print(f"\nTest set => Normal: {test_normal} | Anomalous: {test_anomalous}")
# Divides by the test-set size; an empty test set would raise ZeroDivisionError.
print(f"Anomalous ratio: {test_anomalous/(test_anomalous + test_normal):.2%}")

Loading logs from: ./logs/HDFS.log


Reading Logs: 11175629it [00:04, 2713327.46it/s]


Loaded 11175629 logs in 4.12 seconds.
Loading cl100k_base (GPT-4) tokenizer...


Grouping logs by session: 100%|██████████| 11175629/11175629 [00:54<00:00, 205719.94it/s]
Processing sessions: 100%|██████████| 575061/575061 [03:14<00:00, 2960.53it/s] 


Max tokens in sessions: 15166
Number of sessions: 575061

Dataset Split:
Train sessions: 402542 | Val sessions: 86259 | Test sessions: 86260

Test set => Normal: 83734 | Anomalous: 2526
Anomalous ratio: 2.93%


In [6]:
# Collate function for DataLoader
def collate_fn(batch):
    """Turn a list of session items into padded batch tensors.

    Args:
        batch: List of dicts with ``input_ids``, ``segment_ids`` and
            ``session_label`` entries.

    Returns:
        ``(input_ids, segment_ids, attention_masks, session_labels)`` where the
        first three are ``(batch, longest_in_batch)`` long tensors cut to at
        most ``MAX_TOKEN_LENGTH`` columns, and the labels are a 1-D long tensor.

    Sequences are right-padded with 0. Segment IDs are clamped into
    ``[0, MAX_SEGMENT_LENGTHS - 1]`` and the mask is 1 wherever the padded
    token ID is non-zero.
    """
    def pad_field(field):
        # Tensorize one field per item, right-pad with zeros, cap the width.
        rows = [torch.tensor(item[field], dtype=torch.long) for item in batch]
        return pad_sequence(rows, batch_first=True, padding_value=0)[:, :MAX_TOKEN_LENGTH]

    padded_input_ids = pad_field("input_ids")
    # Keep segment indices inside the segment-embedding table.
    padded_segment_ids = pad_field("segment_ids").clamp(0, MAX_SEGMENT_LENGTHS - 1)

    attention_masks = (padded_input_ids != 0).long()
    session_labels = torch.tensor([item["session_label"] for item in batch], dtype=torch.long)

    return padded_input_ids, padded_segment_ids, attention_masks, session_labels


# Create test dataloader
test_dataset = LogDataset(test_sessions)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    # Evaluation order must stay aligned with test_sessions, so never shuffle.
    shuffle=False,
    collate_fn=collate_fn,
    # Pinned host memory only helps CUDA transfers; requesting it on MPS/CPU
    # just makes PyTorch emit a warning.
    pin_memory=(device.type == "cuda"),
)

print(f"Test DataLoader created with {len(test_loader)} batches")

Test DataLoader created with 10783 batches


In [7]:
# Load the pre-trained model
print(f"Loading model from: {MODEL_PATH}")

# Constructor arguments collected in one place; each one has to agree with the
# architecture stored in the checkpoint or load_state_dict will reject it.
model_kwargs = dict(
    vocab_size=CL100K_VOCAB_SIZE,
    max_seq_len=MAX_TOKEN_LENGTH,
    segment_vocab_size=MAX_SEGMENT_LENGTHS,
    embedding_dim=EMBEDDING_DIM,
    num_layers=NUM_LAYERS,
    num_heads=NUM_HEADS,
    ff_hidden_dim=FF_HIDDEN_DIM,
    k=K,
    num_classes=2,
    dropout=DROPOUT,
    max_segment_lengths=MAX_SEGMENT_LENGTHS,
)
model = AllLinLog(**model_kwargs).to(device)

# weights_only=True restricts unpickling to tensors and plain containers.
state_dict = torch.load(MODEL_PATH, map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.eval()

total_params = sum(param.numel() for param in model.parameters())
# Size estimate counts 4 bytes per parameter.
size_mb = total_params * 4 / (1024**2)
print("Model loaded successfully!")
print(f"Total parameters: {total_params:,}")
print(f"Model size: {size_mb:.2f} MB")

Loading model from: ./best_model_HDFS/best_model_HDFS20250804_201746.pth
Model loaded successfully!
Total parameters: 15,501,506
Model size: 59.13 MB


In [18]:
# Run inference on test set
def evaluate_test_set(model, test_loader, device):
    """Run the model over every batch of ``test_loader`` without gradients.

    Args:
        model: Module called as ``model(input_ids, segment_ids,
            position_ids=None, attention_mask=...)`` and returning logits.
        test_loader: Iterable yielding ``(input_ids, segment_ids,
            attention_masks, labels)`` tensor tuples.
        device: Device every batch tensor is moved to before the forward pass.

    Returns:
        ``(predictions, labels, probabilities)`` as NumPy arrays: arg-max class
        per sample, the labels from the loader, and the softmax over the logits.
    """
    model.eval()
    pred_rows, label_rows, prob_rows = [], [], []

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Running Inference"):
            input_ids, segment_ids, attention_masks, labels = (t.to(device) for t in batch)
            logits = model(input_ids, segment_ids, position_ids=None, attention_mask=attention_masks)

            # Accumulate per-sample results on the host.
            pred_rows.extend(logits.argmax(dim=1).cpu().numpy())
            label_rows.extend(labels.cpu().numpy())
            prob_rows.extend(torch.softmax(logits, dim=1).cpu().numpy())

    return np.array(pred_rows), np.array(label_rows), np.array(prob_rows)


# Run evaluation
banner = "=" * 60
print(f"\n{banner}")
print("RUNNING INFERENCE ON TEST SET")
print(banner)

# Full pass over the test loader; results feed the reporting cell.
predictions, labels, probabilities = evaluate_test_set(model, test_loader, device)


RUNNING INFERENCE ON TEST SET


Running Inference: 100%|██████████| 10783/10783 [00:28<00:00, 376.04it/s]


In [19]:
# Generate and display results
target_names = ["Normal", "Anomalous"]
# Pin the class order so the report and the 2x2 matrix keep both classes even if
# one of them is absent from the labels/predictions (otherwise target_names would
# not line up and the four-way unpack of cm.ravel() below would fail).
class_ids = [0, 1]

print("\n" + "="*60)
print("CLASSIFICATION REPORT")
print("="*60)
print(classification_report(
    labels,
    predictions,
    labels=class_ids,
    target_names=target_names,
    digits=5,
    zero_division=0,
))

print("\n" + "="*60)
print("CONFUSION MATRIX")
print("="*60)
cm = confusion_matrix(labels, predictions, labels=class_ids)
cm_df = pd.DataFrame(cm, index=target_names, columns=[f"Pred_{n}" for n in target_names])
print(cm_df)

# Calculate key metrics
accuracy = (predictions == labels).mean()
# Row = actual class, column = predicted class, so ravel() yields tn, fp, fn, tp.
tn, fp, fn, tp = cm.ravel()

print("\n" + "="*60)
print("SUMMARY METRICS")
print("="*60)
print(f"Accuracy: {accuracy:.4f}")
print(f"True Positives (Anomalies detected): {tp}")
print(f"True Negatives (Normal correctly identified): {tn}")
print(f"False Positives (False alarms): {fp}")
print(f"False Negatives (Missed anomalies): {fn}")


CLASSIFICATION REPORT
              precision    recall  f1-score   support

      Normal    0.99952   0.99992   0.99972     83734
   Anomalous    0.99719   0.98416   0.99064      2526

    accuracy                        0.99946     86260
   macro avg    0.99836   0.99204   0.99518     86260
weighted avg    0.99945   0.99946   0.99945     86260


CONFUSION MATRIX
           Pred_Normal  Pred_Anomalous
Normal           83727               7
Anomalous           40            2486

SUMMARY METRICS
Accuracy: 0.9995
True Positives (Anomalies detected): 2486
True Negatives (Normal correctly identified): 83727
False Positives (False alarms): 7
False Negatives (Missed anomalies): 40


In [22]:
# Screener function for new HDFS log sessions
def screen_hdfs_logs(log_messages, model, tokenizer, device, max_len=18000, max_segment=298):
    """
    Screen a sequence of HDFS log messages for anomalies.

    Args:
        log_messages: Iterable of log message strings (from one block/session),
            already in the order they should be fed to the model.
        model: Trained AllLinLog model
        tokenizer: GPT-4 tokenizer
        device: torch device
        max_len: Maximum token length (for truncation only). When the model
            exposes ``embedding_layer.position_embedding``, the effective cap is
            additionally limited to that table's size so that long sessions are
            truncated instead of indexing past the position table.
        max_segment: Size of the segment-embedding table; segment IDs are
            clamped to ``max_segment - 1``.

    Returns:
        dict with keys ``prediction`` ("Anomalous" or "Normal"),
        ``anomaly_probability``, ``normal_probability``, ``confidence``
        (probability of the predicted class) and ``sequence_length`` (number of
        tokens actually fed to the model).

    Raises:
        ValueError: If ``log_messages`` is empty, or if truncation leaves no
            tokens. A zero-length input would otherwise reach the model as an
            empty tensor and come back as a meaningless "Normal" verdict.
    """
    # Materialize once so emptiness can be checked even for one-shot iterables.
    log_messages = list(log_messages)
    if not log_messages:
        raise ValueError("log_messages must contain at least one log line")

    model.eval()

    # Never tokenize past what the model's position table can index.
    position_table = getattr(getattr(model, "embedding_layer", None), "position_embedding", None)
    model_capacity = getattr(position_table, "num_embeddings", None)
    if model_capacity is not None:
        max_len = min(max_len, model_capacity)

    # Tokenize
    input_ids, segment_ids = tokenize_and_construct_input(log_messages, tokenizer, max_len)
    if not input_ids:
        raise ValueError("no tokens left to score after truncation; check max_len")

    # Clamp segment_ids to valid range
    segment_ids = [min(s, max_segment - 1) for s in segment_ids]

    # Convert to tensors - DO NOT pad to max_len (causes signal dilution in mean pooling)
    input_ids_tensor = torch.tensor([input_ids], dtype=torch.long).to(device)
    segment_ids_tensor = torch.tensor([segment_ids], dtype=torch.long).to(device)

    # Forwarded for interface parity; AllLinLog.forward in this notebook does not read it.
    attention_mask = (input_ids_tensor != 0).long()

    with torch.no_grad():
        logits = model(input_ids_tensor, segment_ids_tensor, position_ids=None, attention_mask=attention_mask)
        probs = torch.softmax(logits, dim=1)
        pred = logits.argmax(dim=1).item()

    return {
        "prediction": "Anomalous" if pred == 1 else "Normal",
        "anomaly_probability": probs[0, 1].item(),
        "normal_probability": probs[0, 0].item(),
        "confidence": probs[0, pred].item(),
        "sequence_length": len(input_ids)
    }


# Example usage
for message in (
    "\nScreener function ready!",
    "Usage: result = screen_hdfs_logs(log_messages_list, model, tokenizer, device)",
):
    print(message)


Screener function ready!
Usage: result = screen_hdfs_logs(log_messages_list, model, tokenizer, device)


In [25]:
# Demo: Screen sample sessions from test set
print("\n" + "=" * 60)
print("DEMO: Screening sample sessions from test set")
print("=" * 60)

# Get a few samples (normal and anomalous)
normal_samples = [sess for sess in test_sessions if sess["session_label"] == 0][:2]
anomalous_samples = [sess for sess in test_sessions if sess["session_label"] == 1][11:13]

for i, sample in enumerate(normal_samples + anomalous_samples):
    block_id = sample.get("block_id", "Unknown")
    if sample["session_label"] == 0:
        actual = "Normal"
    else:
        actual = "Anomalous"

    # Feed the session at its own length as a batch of one (no padding to
    # MAX_TOKEN_LENGTH); segment IDs are clamped into the embedding table.
    token_ids = sample["input_ids"]
    input_ids = torch.tensor([token_ids], dtype=torch.long).to(device)
    segment_ids = torch.tensor([sample["segment_ids"]], dtype=torch.long).to(device)
    segment_ids = segment_ids.clamp(0, MAX_SEGMENT_LENGTHS - 1)
    attention_mask = (input_ids != 0).long()

    with torch.no_grad():
        logits = model(input_ids, segment_ids, position_ids=None, attention_mask=attention_mask)
        probs = torch.softmax(logits, dim=1)
        predicted_class = logits.argmax(dim=1).item()

    pred = "Anomalous" if predicted_class == 1 else "Normal"
    status = "✓" if pred == actual else "✗"
    anomaly_probability = probs[0, 1].item()

    print(f"\nSample {i+1} (Block: {block_id}): Actual={actual}, Predicted={pred} {status}")
    print(f"  Anomaly probability: {anomaly_probability:.4f}")
    print(f"  Sequence length: {len(token_ids)} tokens")


DEMO: Screening sample sessions from test set

Sample 1 (Block: blk_-7247421860583020514): Actual=Normal, Predicted=Normal ✓
  Anomaly probability: 0.0000
  Sequence length: 904 tokens

Sample 2 (Block: blk_4609193020729643794): Actual=Normal, Predicted=Normal ✓
  Anomaly probability: 0.0000
  Sequence length: 904 tokens

Sample 3 (Block: blk_-7082627475978321318): Actual=Anomalous, Predicted=Anomalous ✓
  Anomaly probability: 1.0000
  Sequence length: 213 tokens

Sample 4 (Block: blk_-221433238572346863): Actual=Anomalous, Predicted=Anomalous ✓
  Anomaly probability: 1.0000
  Sequence length: 701 tokens
