### Log Anomaly Detection on HDFS Dataset using LSTM based models
This is a running example of log anomaly detection on the public HDFS dataset.

In `logai/applications/openset/anomaly_detection/openset_anomaly_detection_workflow.py` you can find an automated way to run the end-to-end log anomaly detection pipeline on some of the standard open log datasets (HDFS, BGL), under a variety of experimental configurations and with different log anomaly detection models.


In [1]:
import os 
from logai.applications.openset.anomaly_detection.openset_anomaly_detection_workflow import OpenSetADWorkflowConfig, validate_config_dict
from logai.utils.file_utils import read_file
from logai.utils.dataset_utils import split_train_dev_test_for_anomaly_detection
import logging 
from logai.dataloader.data_loader import FileDataLoader
from logai.preprocess.hdfs_preprocessor import HDFSPreprocessor
from logai.information_extraction.log_parser import LogParser
from logai.preprocess.openset_partitioner import OpenSetPartitioner
from logai.analysis.nn_anomaly_detector import NNAnomalyDetector
from logai.information_extraction.log_vectorizer import LogVectorizer
from logai.utils import constants

### Loading config from yaml 
LogAI allows loading the config from a YAML file. Check out the `hdfs_lstm_unsupervised_parsed_sequential_config.yaml` file to see the entire workflow config.

This notebook follows the configuration where we apply log parsing, take sequential log features, and apply unsupervised anomaly detection using LSTM models.

Other configurations for the same dataset (a parse-free workflow, semantic log features, supervised anomaly detection, or another neural model such as CNN, Transformer, or LogBERT) work with this same workflow code. The only change needed is the config YAML file. Check out some of the other config files provided in the `configs` dir for those settings:
- `hdfs_lstm_unsupervised_nonparsed_sequential_config.yaml`
- `hdfs_lstm_unsupervised_parsed_semantic_config.yaml`
- `hdfs_lstm_supervised_parsed_sequential_config.yaml`
- `hdfs_logbert_config.yaml`

Since the processing is a little heavy and this notebook is mostly for illustration purposes, we also dump its intermediate outputs in the `output_dir` mentioned in the config YAML file (by default it would be a `temp_dir` in this folder). You can purge the directory whenever you need.
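
Purging the intermediate outputs can be scripted with the standard library. The following is a minimal sketch; the helper name `purge_output_dir` is hypothetical and not part of LogAI, and the path you pass should be whatever `output_dir` your config YAML specifies.

```python
import shutil
from pathlib import Path


def purge_output_dir(output_dir):
    """Delete output_dir and everything under it.

    Hypothetical helper (not a LogAI function). Returns True if a
    directory was removed and False if there was nothing to remove.
    """
    path = Path(output_dir)
    if not path.is_dir():
        return False
    shutil.rmtree(path)  # removes the directory tree recursively
    return True
```

Calling it with `config.output_dir` after the notebook has run would clear the dumped CSV files and model checkpoints, so the next run starts from scratch.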

In [2]:
config_path = "configs/hdfs_lstm_unsupervised_parsed_sequential_config.yaml"
config_parsed = read_file(config_path)
config_dict = config_parsed["workflow_config"]
validate_config_dict(config_dict)
config = OpenSetADWorkflowConfig.from_dict(config_dict)

### Load Data using FileDataLoader 
FileDataLoader takes a file input (.csv, .log, or another unspecified format) and loads the data as a LogRecordObject. Most of the remaining components in the pipeline use this LogRecordObject as input/output.

In [3]:
dataloader = FileDataLoader(config.data_loader_config)
logrecord = dataloader.load_data()
print (logrecord.body[constants.LOGLINE_NAME])

0       Receiving block blk_-1608999687919862906 src: ...
1       BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...
2       Receiving block blk_-1608999687919862906 src: ...
3       Receiving block blk_-1608999687919862906 src: ...
4       PacketResponder 1 for block blk_-1608999687919...
                              ...                        
4514    Deleting block blk_-2126554733521224025 file /...
4515    Deleting block blk_-66330728533676520 file /mn...
4516    Deleting block blk_872694497849122755 file /mn...
4517    Deleting block blk_3947106522258141922 file /m...
4518    Deleting block blk_-774246298521956028 file /m...
Name: logline, Length: 4519, dtype: object


### Preprocess loaded data using dataset specific Preprocessor (HDFSPreprocessor)
Each dataset has its own Preprocessor class (e.g. HDFSPreprocessor for HDFS), which inherits from `logai.preprocess.preprocessor.Preprocessor`.

Similarly, if you want to add a custom dataset, you also need to add a corresponding custom Preprocessor class. The main public method of the preprocessor is `clean_log`, whose input and output are both a LogRecordObject.

This is the only step in the entire pipeline that is dataset-specific.

In [4]:
preprocessor = HDFSPreprocessor(config.preprocessor_config, config.label_filepath)
preprocessed_filepath = os.path.join(config.output_dir, 'HDFS_5k_processed.csv')            
logrecord = preprocessor.clean_log(logrecord)
logrecord.save_to_csv(preprocessed_filepath)
print (logrecord.body[constants.LOGLINE_NAME])

0           Receiving block BLOCK src IP INT dest IP INT 
1       BLOCK NameSystem.allocateBlock /mnt/hadoop/map...
2           Receiving block BLOCK src IP INT dest IP INT 
3           Receiving block BLOCK src IP INT dest IP INT 
4         PacketResponder INT for block BLOCK terminating
                              ...                        
4514    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4515    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4516    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4517    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4518    Deleting block BLOCK file /mnt/hadoop/dfs/data...
Name: logline, Length: 4519, dtype: object
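
The output above shows block ids, IP addresses, and numbers replaced by the placeholders `BLOCK`, `IP`, and `INT`. The following is a rough sketch of that kind of masking using plain regular expressions. The patterns and the function name `mask_logline` are assumptions made for illustration; they are not the rules `HDFSPreprocessor` actually uses, and the sample lines in the comments are synthetic.

```python
import re

# Illustrative masking rules (assumed, not taken from HDFSPreprocessor).
# Order matters: block ids and IPs are masked before bare integers.
MASKING_RULES = [
    (re.compile(r"blk_-?\d+"), " BLOCK "),                   # HDFS block ids
    (re.compile(r"/?(?:\d{1,3}\.){3}\d{1,3}"), " IP "),      # IPv4 addresses
    (re.compile(r"[:*]"), " "),                              # stray punctuation
    (re.compile(r"\b\d+\b"), " INT "),                       # remaining integers
]


def mask_logline(line):
    """Replace variable fields with placeholders and normalize whitespace."""
    for pattern, replacement in MASKING_RULES:
        line = pattern.sub(replacement, line)
    return " ".join(line.split())


# Synthetic example lines (made-up ids and addresses):
# mask_logline("Receiving block blk_-123 src: /10.0.0.1:54106 dest: /10.0.0.2:50010")
#   -> "Receiving block BLOCK src IP INT dest IP INT"
# mask_logline("PacketResponder 1 for block blk_456 terminating")
#   -> "PacketResponder INT for block BLOCK terminating"
```

Masking the variable fields first means that lines differing only in ids or addresses become identical strings, which is what lets the later parsing and vectorization steps treat them as the same event.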


### Parsing unstructured log data using LogParser
If the `parse_logline` field of the OpenSetADWorkflowConfig object is True, the preprocessed log data is parsed using the parsing algorithm and hyperparameters specified in the config YAML file.

In the cell below, the LogParser takes the logline column of the LogRecordObject as input, and the parsed loglines are written back into the same LogRecordObject.

In [5]:
parser = LogParser(config.log_parser_config)
parsed_result = parser.parse(logrecord.body[constants.LOGLINE_NAME])
logrecord.body[constants.LOGLINE_NAME] = parsed_result[constants.PARSED_LOGLINE_NAME]
parsed_filepath = os.path.join(config.output_dir, 'HDFS_5k_parsed.csv')
logrecord.save_to_csv(parsed_filepath)
print (logrecord.body[constants.LOGLINE_NAME])

0            Receiving block BLOCK src IP INT dest IP INT
1       BLOCK NameSystem.allocateBlock /mnt/hadoop/map...
2            Receiving block BLOCK src IP INT dest IP INT
3            Receiving block BLOCK src IP INT dest IP INT
4         PacketResponder INT for block BLOCK terminating
                              ...                        
4514    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4515    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4516    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4517    Deleting block BLOCK file /mnt/hadoop/dfs/data...
4518    Deleting block BLOCK file /mnt/hadoop/dfs/data...
Name: logline, Length: 4519, dtype: object
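
Since parsing maps log lines onto a small set of templates, counting the distinct parsed loglines is a quick sanity check. The sketch below uses a plain Python list as a stand-in for the `logrecord.body[constants.LOGLINE_NAME]` column; the function name `template_counts` is hypothetical.

```python
from collections import Counter


def template_counts(parsed_loglines):
    """Return (template, count) pairs, most frequent template first."""
    return Counter(parsed_loglines).most_common()


# Stand-in data copied from the first few parsed loglines printed above.
sample = [
    "Receiving block BLOCK src IP INT dest IP INT",
    "Receiving block BLOCK src IP INT dest IP INT",
    "Receiving block BLOCK src IP INT dest IP INT",
    "PacketResponder INT for block BLOCK terminating",
]

for template, count in template_counts(sample):
    print(count, template)
```

A very large number of distinct templates relative to the number of log lines would suggest that the preprocessing or the parser hyperparameters need another look.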


### Partitioning or segmenting the log data using the OpenSetPartitioner
The next step in the log anomaly detection pipeline involves partitioning the log-lines into segments. 

This is particularly important for sequence-based anomaly detection models, which take sequences of loglines as input and learn representations at the sequence level in order to detect anomaly patterns at the level of individual loglines as well as log sequences.

In [6]:
partitioner = OpenSetPartitioner(config.open_set_partitioner_config)
partitioned_filepath = os.path.join(config.output_dir, 'HDFS_5k_parsed_sliding10.csv')
logrecord = partitioner.partition(logrecord)
logrecord.save_to_csv(partitioned_filepath)
print (logrecord.body[constants.LOGLINE_NAME])


0       Receiving block BLOCK src IP INT dest IP INT[S...
1       Receiving block BLOCK src IP INT dest IP INT[S...
2       BLOCK NameSystem.allocateBlock /mnt/hadoop/map...
3       Receiving block BLOCK src IP INT dest IP INT[S...
4       Receiving block BLOCK src IP INT dest IP INT[S...
                              ...                        
3417    IP INT Got exception while serving BLOCK to IP...
3418    IP INT Got exception while serving BLOCK to IP...
3419    IP INT Got exception while serving BLOCK to IP...
3420    IP INT Got exception while serving BLOCK to IP...
3421    IP INT Got exception while serving BLOCK to IP...
Name: logline, Length: 3422, dtype: object
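
To make the idea of partitioning concrete, here is a minimal sketch of sliding-window segmentation within groups of log lines. It is not the `OpenSetPartitioner` implementation: the function names, the choice of a stride of 1, the handling of groups shorter than the window, and the use of a group key (for example a block id) are all assumptions made for illustration.

```python
from collections import defaultdict


def sliding_windows(items, size, stride=1):
    """Split a list into overlapping windows of `size` consecutive items.

    Assumption: a non-empty list shorter than `size` yields one short window.
    """
    if not items:
        return []
    if len(items) <= size:
        return [list(items)]
    return [list(items[i:i + size])
            for i in range(0, len(items) - size + 1, stride)]


def partition_by_group(records, size, stride=1):
    """Group (key, logline) pairs by key, then window each group separately."""
    groups = defaultdict(list)
    for key, logline in records:
        groups[key].append(logline)  # preserves the original order per group
    return {key: sliding_windows(lines, size, stride)
            for key, lines in groups.items()}
```

For example, `sliding_windows(["a", "b", "c", "d"], size=3)` gives `[["a", "b", "c"], ["b", "c", "d"]]`. Each window can then be joined into a single string with a separator token to form one sequence-level log record.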


### Creating Train, Dev, Test splits
This step splits the log dataset (in the form of a LogRecordObject) and generates a LogRecordObject for each of the train, dev, and test splits.

In [7]:
train_filepath = os.path.join(config.output_dir, 'HDFS_5k_parsed_sliding10_unsupervised_train.csv')
dev_filepath = os.path.join(config.output_dir, 'HDFS_5k_parsed_sliding10_unsupervised_dev.csv')
test_filepath = os.path.join(config.output_dir, 'HDFS_5k_parsed_sliding10_unsupervised_test.csv')

(train_data, dev_data, test_data) = split_train_dev_test_for_anomaly_detection(
    logrecord,
    training_type=config.training_type,
    test_data_frac_neg_class=config.test_data_frac_neg,
    test_data_frac_pos_class=config.test_data_frac_pos,
    shuffle=config.train_test_shuffle,
)

train_data.save_to_csv(train_filepath)
dev_data.save_to_csv(dev_filepath)
test_data.save_to_csv(test_filepath)
print ('Train/Dev/Test Anomalous', len(train_data.labels[train_data.labels[constants.LABELS]==1]), 
                                   len(dev_data.labels[dev_data.labels[constants.LABELS]==1]), 
                                   len(test_data.labels[test_data.labels[constants.LABELS]==1]))
print ('Train/Dev/Test Normal', len(train_data.labels[train_data.labels[constants.LABELS]==0]), 
                                   len(dev_data.labels[dev_data.labels[constants.LABELS]==0]), 
                                   len(test_data.labels[test_data.labels[constants.LABELS]==0]))

indices_train/dev/test:  2242 252 984
Train/Dev/Test Anomalous 0 0 424
Train/Dev/Test Normal 2242 252 560
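
The counts above show the pattern of an unsupervised split: train and dev contain only normal sequences, while all anomalous sequences end up in test. The sketch below reproduces that pattern on a plain list of labels. It is not the implementation of `split_train_dev_test_for_anomaly_detection`; the function name, the parameters `test_frac_neg`, `dev_frac`, and `seed`, and their default values are hypothetical.

```python
import random


def split_unsupervised(labels, test_frac_neg=0.2, dev_frac=0.1, seed=0):
    """Return (train, dev, test) index lists for unsupervised training.

    labels: list of 0 (normal) / 1 (anomalous).
    Train and dev receive only normal indices; test receives a fraction
    of the normal indices plus every anomalous index.
    """
    rng = random.Random(seed)
    normal = [i for i, y in enumerate(labels) if y == 0]
    anomalous = [i for i, y in enumerate(labels) if y == 1]
    rng.shuffle(normal)

    n_test = int(len(normal) * test_frac_neg)
    test = normal[:n_test] + anomalous
    remaining = normal[n_test:]

    n_dev = int(len(remaining) * dev_frac)
    dev, train = remaining[:n_dev], remaining[n_dev:]
    return train, dev, test
```

Keeping anomalies out of train and dev matches the assumption of the unsupervised detector that it only sees normal behaviour during training.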


### Vectorizing Log data into Log Features using LogVectorizer
This step transforms the raw log (text) data into machine-readable vectors. 

For most of the neural anomaly detectors in this library, the exact log vectorization to apply may be tightly bound to the type of anomaly detector (e.g. the LogBERT model requires the corresponding LogBERT vectorizer).

For the more traditional statistical ML based anomaly detectors (which typically take the input features in the form of a pandas DataFrame object), you can apply any of the available vectorizers, or implement a custom converter that adapts the vectorizer's output before giving it to the anomaly detector.

For neural anomaly detection models (LSTM, Transformer, CNN) there are two types of LogVectorizers available, Sequential and Semantic, which respectively represent log sequences as sequences of log ids (i.e. ids of loglines) or sequences of token ids.

- The `fit` method of LogVectorizer tailors the vectorizer model to the given log training dataset (e.g. a Word2Vec or FastText based vectorizer might need to be trained on the given log training data, or a vocabulary for the log data might simply be pruned and constructed from a standard pretrained Word2Vec model).

- The `transform` method of LogVectorizer transforms the raw log data (LogRecordObject) into log features, whose data type is governed by the vectorizer class (e.g. the LogBERT based vectorizer generates an output of type HFDataset, a HuggingFace Dataset object).

In [8]:
vectorizer = LogVectorizer(config.log_vectorizer_config)
vectorizer.fit(train_data)
train_features = vectorizer.transform(train_data)
dev_features = vectorizer.transform(dev_data)
test_features = vectorizer.transform(test_data)
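
The `fit`/`transform` split described above can be illustrated with a toy sequential vectorizer that maps each distinct logline to an integer id. This is only a sketch under stated assumptions: the class name `SequentialVectorizerSketch`, the reserved id 0 for unseen loglines, and the use of plain lists instead of a LogRecordObject are all choices made for this example, not LogAI's API.

```python
class SequentialVectorizerSketch:
    """Toy vectorizer: one integer id per distinct logline."""

    UNKNOWN_ID = 0  # assumed id for loglines never seen during fit

    def __init__(self):
        self.vocab = {}

    def fit(self, train_sequences):
        """Build the logline -> id vocabulary from the training sequences."""
        for sequence in train_sequences:
            for logline in sequence:
                if logline not in self.vocab:
                    self.vocab[logline] = len(self.vocab) + 1
        return self

    def transform(self, sequences):
        """Map every sequence of loglines to a sequence of ids."""
        return [[self.vocab.get(logline, self.UNKNOWN_ID) for logline in sequence]
                for sequence in sequences]
```

The vocabulary is built only from the training split, so loglines that first appear in dev or test data are mapped to the unknown id instead of silently extending the vocabulary.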

### Training Neural Anomaly Detection model on log features using NNAnomalyDetector
This step constructs a neural anomaly detector model (e.g. an LSTM based sequence anomaly detector, which is a type of forecast-based neural anomaly detector). It can be used in two modes:

- Primarily as an unsupervised learning model, where it learns to forecast the id of the next log-line given an input log sequence. During inference, the model infers whether the input is anomalous based on its confidence in the forecast for that log sequence. In the unsupervised mode, it assumes that the training data consists entirely of normal log sequences. The presence of anomalous log sequences is permissible but will degrade the model performance.

- It can also be used in the standard supervised learning mode, where, given a log sequence as input, the model learns to directly predict the label (anomalous or not). In this mode the model needs to see both normal and anomalous logs during training. In many situations the supervised model can actually perform somewhat worse than the unsupervised version because of overfitting or bias towards the anomaly patterns seen during training, especially if anomalies occur only rarely in the training data.

In this example we use the unsupervised mode of the LSTM based anomaly detector.

In [9]:
anomaly_detector = NNAnomalyDetector(config=config.nn_anomaly_detection_config)
anomaly_detector.fit(train_features, dev_features)

INFO:root:Start training on 561 batches with cpu.
INFO:root:Batch 100, training loss : 2.5444133508205415
INFO:root:Batch 200, training loss : 2.2127846387028693
INFO:root:Batch 300, training loss : 1.8919825654228528
INFO:root:Batch 400, training loss : 1.657002859003842
INFO:root:Batch 500, training loss : 1.4728448408097028
INFO:root:Epoch 1/10, training loss: 1.3847237395225573 [4.745373964309692s]
INFO:root:Evaluating dev data.
INFO:root:Dev Loss: 0.6193237191154843
INFO:root:Dev acc @ top-1: 0.8611111111111112  correct: 217 out of 252
INFO:root:Saving model to temp_output/HDFS_5k_parsed_session_supervised_AD/model_lstm/model.ckpt
INFO:root:Batch 100, training loss : 0.8119356613606215
INFO:root:Batch 200, training loss : 0.6999477495253086
INFO:root:Batch 300, training loss : 0.6303683702647686
INFO:root:Batch 400, training loss : 0.5877456868812442
INFO:root:Batch 500, training loss : 0.5587526946216822
INFO:root:Epoch 2/10, training loss: 0.5479684943422914 [5.022781848907471s]
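
One common decision rule for forecast-based detectors is a top-k check: a sequence is flagged as anomalous when the log id that actually came next is not among the k ids the model considered most likely. The sketch below shows that rule on a plain list of scores. It is an illustration of the general technique, not the code inside `NNAnomalyDetector`; the function names and the parameter `k` are assumptions.

```python
def top_k_ids(scores, k):
    """Return the indices of the k highest scores, best first."""
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    return ranked[:k]


def is_anomalous(scores, true_next_id, k=1):
    """Flag the sequence if the observed next id is outside the top-k forecast.

    scores: one score per candidate log id (index = log id), e.g. softmax
    probabilities produced by a forecasting model for the next position.
    """
    return true_next_id not in top_k_ids(scores, k)


# With scores [0.1, 0.6, 0.3], ids 1 and 2 form the top-2 forecast,
# so observing id 0 next is flagged while observing id 2 is not.
scores = [0.1, 0.6, 0.3]
print(is_anomalous(scores, true_next_id=0, k=2))  # True
print(is_anomalous(scores, true_next_id=2, k=2))  # False
```

A larger k makes the rule more tolerant of legitimate variation in normal logs, at the cost of missing anomalies that the model ranks moderately high.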

### Predicting Anomalous Log Sequences using the trained NNAnomalyDetector 
This step simply runs the `predict` method of the NNAnomalyDetector, which takes the test log features and returns the prediction results.

Currently the prediction results are raw Python objects. In this case, it returns a dict that holds some standard metrics (F1, precision, recall) as well as a vector of instance-wise predictions and true labels. For other NNAnomalyDetector models, the data type of the predict_results object might be different (e.g. for the LogBERT model, the predict_results object returned is a pandas DataFrame consisting of the prediction results and evaluation metrics).

In [10]:
predict_results = anomaly_detector.predict(test_features)
print (predict_results)

INFO:root:Evaluating test data.
INFO:root:Finish inference. [0.36537694931030273s]
INFO:root:Calculating acc sum.
INFO:root:Finish generating store_df.
INFO:root:Finish counting [0.06994795799255371s]
INFO:root:Best result: f1: 1.0 rc: 1.0 pc: 1.0


{'f1': 1.0, 'rc': 1.0, 'pc': 1.0, 'pred': 0     1
1     1
2     1
3     1
4     1
5     0
6     0
7     0
8     0
9     0
10    0
11    0
12    0
13    0
14    0
15    0
16    1
17    0
18    0
19    0
20    0
21    0
22    0
23    0
24    1
25    1
26    0
27    1
28    1
29    0
Name: window_pred_anomaly_10, dtype: int64, 'true': 0     1
1     1
2     1
3     1
4     1
5     0
6     0
7     0
8     0
9     0
10    0
11    0
12    0
13    0
14    0
15    0
16    1
17    0
18    0
19    0
20    0
21    0
22    0
23    0
24    1
25    1
26    0
27    1
28    1
29    0
Name: window_anomalies, dtype: int64}
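
The `pred` and `true` vectors in the returned dict are enough to recompute the headline metrics yourself. The following sketch computes precision, recall, and F1 from two equal-length lists of 0/1 values, reusing the dict keys `pc`, `rc`, and `f1` seen in the output above. The function name is hypothetical, and returning 0.0 when a denominator is zero is a convention chosen for this example.

```python
def precision_recall_f1(pred, true):
    """Compute precision, recall and F1 for binary anomaly labels (1 = anomalous)."""
    pairs = list(zip(pred, true))
    tp = sum(1 for p, t in pairs if p == 1 and t == 1)  # correctly flagged
    fp = sum(1 for p, t in pairs if p == 1 and t == 0)  # false alarms
    fn = sum(1 for p, t in pairs if p == 0 and t == 1)  # missed anomalies

    pc = tp / (tp + fp) if (tp + fp) else 0.0
    rc = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * pc * rc / (pc + rc) if (pc + rc) else 0.0
    return {"f1": f1, "rc": rc, "pc": pc}
```

If `predict_results["pred"]` and `predict_results["true"]` are pandas Series as printed above, passing `list(predict_results["pred"])` and `list(predict_results["true"])` should work with this helper. When the two vectors are identical and contain at least one anomaly, as in this run, all three metrics come out as 1.0.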
