In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!mkdir -p drive/MyDrive/data/{hdfs_tokenized,hdfs_bert}
!mkdir -p drive/MyDrive/data/torch_dataset/{train,test}


In [None]:
!pip freeze | grep keras

In [None]:
!pip install tensorflow==2.15.0 --upgrade

In [None]:
!pip install transformers[torch] datasets tokenizers evaluate --upgrade

In [1]:
from datasets import load_dataset, load_from_disk
from transformers import BertConfig, BertForMaskedLM, DataCollatorForLanguageModeling, TrainingArguments, Trainer
from transformers import BertTokenizerFast
import numpy as np
from transformers import AutoConfig, AutoModelForMaskedLM
from transformers import TrainingArguments, Trainer
from transformers import AutoTokenizer
from itertools import chain
import pandas as pd
import pathlib
import re
import evaluate
import torch
from transformers import pipeline
import torch.nn.functional as F
from typing import List


2024-03-09 21:52:48.159576: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-03-09 21:52:48.159712: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-03-09 21:52:48.192540: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-03-09 21:52:48.335203: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
IS_COLAB = False
COLAB_PREFIX = 'drive/MyDrive/' if IS_COLAB else ''

MAX_LENGTH = 128
HDFS_UNCLEANED_PATH = f"{COLAB_PREFIX}data/hdfsv1.log"
HDFS_CLEANED_PATH = f"{COLAB_PREFIX}data/hdfsv1_regex.log"
TOKENIZER_PATH = f'{COLAB_PREFIX}data/hdfs_tokenized'

files = [HDFS_CLEANED_PATH]

# Pre-Processing

In [None]:
log = pathlib.Path(HDFS_UNCLEANED_PATH).read_text()

## Regex substitution

In [5]:
ipv4_pattern = '(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])'
ipv6_pattern = '(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)(?:%25(?:[A-Za-z0-9\\-._~]|%[0-9A-Fa-f]{2})+)?'
number_pattern = '-?\d+'
datetime_pattern = '\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}'

In [4]:
def regex_clean(text):
    text = re.sub(ipv4_pattern, 'IP', text)
    text = re.sub(ipv6_pattern, 'IP', text)
    text = re.sub(datetime_pattern, 'DATE', text)
    text = re.sub(number_pattern, 'NUM', text)
    return text

In [None]:
with open('hdfsv1_regex.log', 'w') as f:
    f.write(regex_clean(log))

## Fitting WordPiece tokenizer

In [3]:
# tokenizer = BertWordPieceTokenizer()
# tokenizer.train(files)
# tokenizer.save_model(TOKENIZER_PATH)
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

## Dataset preparation

In [None]:
dataset = load_dataset('text', data_files=['data/hdfsv1_regex.log'], split='train')

In [None]:
train_dataset = dataset.select(range(0, 1000))
test_dataset = dataset.select(range(1000, 2000))

In [None]:
# dataset = dataset.train_test_split(train_size=0.01, test_size=0.01, shuffle=False)

In [None]:
def encode(examples):
    """Mapping function to tokenize the sentences passed without truncation"""
    return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=MAX_LENGTH, return_special_tokens_mask=True)

# tokenizing the train dataset
train_dataset = train_dataset.map(encode, batched=True)
# tokenizing the testing dataset
test_dataset = test_dataset.map(encode, batched=True)

In [None]:
# Main data processing function that will concatenate all texts from our dataset and generate chunks of
# max_seq_length.
# grabbed from: https://github.com/huggingface/transformers/blob/main/examples/pytorch/language-modeling/run_mlm.py
def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
    # customize this part to your needs.
    if total_length >= MAX_LENGTH:
        total_length = (total_length // MAX_LENGTH) * MAX_LENGTH
    # Split by chunks of max_len.
    result = {
        k: [t[i : i + MAX_LENGTH] for i in range(0, total_length, MAX_LENGTH)]
        for k, t in concatenated_examples.items()
    }
    return result

# Note that with `batched=True`, this map processes 1,000 texts together, so group_texts throws away a
# remainder for each of those groups of 1,000 texts. You can adjust that batch_size here but a higher value
# might be slower to preprocess.
#
# To speed up this part, we use multiprocessing. See the documentation of the map method for more information:
# https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.map
train_dataset = train_dataset.map(group_texts, batched=True,
                                desc=f"Grouping texts in chunks of {MAX_LENGTH}")
test_dataset = test_dataset.map(group_texts, batched=True,
                                desc=f"Grouping texts in chunks of {MAX_LENGTH}")


In [None]:
# convert them from lists to torch tensors
train_dataset.set_format("torch")
test_dataset.set_format("torch")

In [None]:
train_dataset.save_to_disk('drive/MyDrive/data/torch_dataset/train')
test_dataset.save_to_disk('drive/MyDrive/data/torch_dataset/test')

# Training LAnoBERT

First, we'll load all our saved datasets and trained tokenizer.

In [None]:
train_dataset = load_from_disk('data/torch_dataset/train')
test_dataset = load_from_disk('data/torch_dataset/test')

In [None]:
from transformers import DataCollatorForLanguageModeling

data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15)

In [None]:
# model_config = BertConfig(vocab_size=tokenizer.vocab_size, max_position_embeddings=MAX_LENGTH)
# model = BertForMaskedLM(config=model_config)

config = AutoConfig.from_pretrained("google-bert/bert-base-uncased")
model = AutoModelForMaskedLM.from_config(config)

training_args = TrainingArguments(
    output_dir='data/hdfs_bert',
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    weight_decay=0.01,
    num_train_epochs =1,
    eval_accumulation_steps=2,
    # per_device_train_batch_size=5,
    # gradient_accumulation_steps=8,  # accumulating the gradients before updating the weights
    # per_device_eval_batch_size=64,
    logging_steps=1,             # evaluate, log and save model checkpoints every 1000 step
    save_strategy='epoch',
    load_best_model_at_end=True,  # whether to load the best model (in terms of loss) at the end of training
    save_total_limit=3,           # whether you don't have much space so you let only 3 model weights saved in the disk
)

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

In [None]:
trainer.train()

# Manual Inference

In [6]:
MODEL_PATH = 'data/hdfs_bert/checkpoint-99'
model = BertForMaskedLM.from_pretrained(MODEL_PATH)

In [None]:
mask_filler = pipeline('fill-mask', model, tokenizer=tokenizer)

## LAnoBERT inference algorithm

- Input: $l_n$
- Output: $abnormal_{loss}$, $abnormal_{prob}$

$s_{len}$: Sequence length of a log

$d$: Embedding dimension

$l_n$: An individual log sequence $\in \mathbb{R}^{s_{len} \times d}$

LAnoBERT: Proposed model

TOPK: Top-k aggregation functions

In [7]:
from torch.nn import CrossEntropyLoss

# Caching
KEY = set()
DICT = {}

def calculate_abnormal_scores(logs: List[str]):
    """
    Inputs
    --------
    logs: list[str] - List of log sequences
    s_len: int - sequence length of every log
    model: BERT

    Outputs
    -------
    List of anomaly loss and probability score pairs for every log seq
    """

    def top_k(tensor: List[int], k: int, strategy = 'max'):
        return torch.topk(torch.tensor(tensor), k, dim = -1, largest = strategy == 'max').values.mean().item()


    # Top-k parameter
    K = 5
    scores = [] # Output

    for log in logs:
        log = regex_clean(log)
        if log not in DICT:
            log_tokens = tokenizer([log], return_tensors='pt')
            input_ids = log_tokens['input_ids']
            batch_labels = []
            batch_tokens = []
            for i in range(1, log_tokens['input_ids'].shape[1] - 1):
                labels = input_ids.detach().clone()
                curr_mask_i = input_ids.detach().clone()
                curr_mask_i[0, i] = tokenizer.mask_token_id
                labels[curr_mask_i != tokenizer.mask_token_id] = -100

                batch_tokens.append(curr_mask_i)
                batch_labels.append(labels)

            s_len = log_tokens['input_ids'].shape[1] # number of tokens

            batch_size = 50
            batch_tokens = torch.concat(batch_tokens).split(batch_size)
            batch_labels = torch.concat(batch_labels).split(batch_size)

            P, L = [], []
            for b in range(len(batch_tokens)):
                l = batch_labels[b]
                curr_batch_size = l.shape[0]
                attention_mask = torch.ones((curr_batch_size, s_len), dtype=torch.long)
                token_type_ids = torch.zeros(curr_batch_size, s_len, dtype=torch.long)

                output = model(input_ids=batch_tokens[b], attention_mask=attention_mask, token_type_ids=token_type_ids, labels = l)
                logits, loss = output.logits, output.loss

                lfn = CrossEntropyLoss(ignore_index=-100, reduction='none')
                loss = lfn(logits.view(-1, 30522), l.view(-1)).view(logits.size(0), -1).sum(dim=1)
                probabilities = F.softmax(logits, dim = -1)
                values, indices = torch.max(l, dim = 1)
                prob = probabilities[torch.arange(indices.size(0)), indices, values]

                P.extend(prob.tolist())
                L.extend(loss.tolist())

            abnormal_loss = top_k(L, K)
            abnormal_prob = top_k(P, K, 'min')

            KEY.add(log)
            DICT[log] = (abnormal_loss, abnormal_prob)
            scores.append((abnormal_loss, abnormal_prob))
        else:
            abnormal_loss, abnormal_prob = DICT[log]
            scores.append((abnormal_loss, abnormal_prob))
    return scores

# Evaluation on HDFS

In [8]:
true_labels = pd.read_csv('./data/preprocessed/anomaly_label.csv')
true_labels['Label'] = (true_labels['Label'] == 'Anomaly').astype('int64')

In [9]:
log = pathlib.Path(HDFS_UNCLEANED_PATH).read_text().splitlines()
test_logs = log[5000:5500]

In [10]:
scores = calculate_abnormal_scores(test_logs)

In [18]:
eval_df = pd.DataFrame()
eval_df['block_id'] = [re.findall('blk_-?\d+', l)[0] for l in test_logs]
eval_df['seq'] = test_logs


In [21]:
eval_df = pd.merge(eval_df, true_labels, left_on='block_id', right_on='BlockId').drop(columns=['BlockId'])

In [23]:
scores = np.array(scores)
eval_df['loss'] = scores[:, 0]
eval_df['prob'] = scores[:, 1]

In [27]:
eval_df = eval_df.rename(columns={'Label': 'is_anomaly'})

In [29]:
eval_df.sort_values(by='is_anomaly', ascending=False)

Unnamed: 0,block_id,seq,is_anomaly,loss,prob
491,blk_7523193419675083274,081109 203627 190 INFO dfs.DataNode$DataXceive...,1,3.004169,0.051241
490,blk_7523193419675083274,081109 203627 183 INFO dfs.DataNode$DataXceive...,1,3.004169,0.051241
497,blk_-6689614308409336057,081109 203627 190 INFO dfs.DataNode$DataXceive...,1,3.004169,0.051241
23,blk_8729162853545308118,081109 203624 178 INFO dfs.DataNode$DataXceive...,1,3.004169,0.051241
118,blk_7956543127401791181,081109 203624 30 INFO dfs.FSNamesystem: BLOCK*...,1,10.620078,0.000025
...,...,...,...,...,...
163,blk_-2864044183052157382,081109 203625 154 INFO dfs.DataNode$PacketResp...,0,3.004169,0.051241
162,blk_-2983481749155087856,081109 203625 34 INFO dfs.FSNamesystem: BLOCK*...,0,3.004169,0.051241
161,blk_-2983481749155087856,081109 203625 32 INFO dfs.FSNamesystem: BLOCK*...,0,10.620078,0.000025
160,blk_-2983481749155087856,081109 203625 30 INFO dfs.FSNamesystem: BLOCK*...,0,10.620078,0.000025
