<a href="https://colab.research.google.com/github/vinitakawale/LogAnalysis/blob/main/HDFS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
!git clone https://github.com/salesforce/logai.git

fatal: destination path 'logai' already exists and is not an empty directory.


In [2]:
%cd logai

/content/logai




In [18]:
import os

from logai.dataloader.openset_data_loader import OpenSetDataLoader, OpenSetDataLoaderConfig

# File configuration: HDFS_2000.log is a sample dataset in the cloned LogAI repo
filepath = os.path.join("examples", "datasets", "HDFS_2000.log")

dataset_name = "hdfs"
data_loader = OpenSetDataLoader(
    OpenSetDataLoaderConfig(
        dataset_name=dataset_name,
        filepath=filepath)
)

logrecord = data_loader.load_data()
print("logrecord", logrecord)

logrecord.to_dataframe().head(5)


logrecord LogRecordObject(timestamp=               timestamp
0    2008-11-09 20:35:18
1    2008-11-09 20:35:18
2    2008-11-09 20:35:19
3    2008-11-09 20:35:19
4    2008-11-09 20:35:19
...                  ...
1996 2008-11-09 20:35:46
1997 2008-11-09 20:35:46
1998 2008-11-09 20:35:46
1999 2008-11-09 20:35:46
2000 2008-11-09 20:35:46

[2001 rows x 1 columns], attributes=     Level
0     INFO
1     INFO
2     INFO
3     INFO
4     INFO
...    ...
1996  INFO
1997  INFO
1998  INFO
1999  INFO
2000  INFO

[2001 rows x 1 columns], resource=Empty DataFrame
Columns: []
Index: [], trace_id=Empty DataFrame
Columns: []
Index: [], span_id=     span_id
0        143
1         35
2        143
3        145
4        145
...      ...
1996     165
1997     165
1998     166
1999     166
2000     167

[2001 rows x 1 columns], severity_text=Empty DataFrame
Columns: []
Index: [], severity_number=Empty DataFrame
Columns: []
Index: [], body=                                                logline
0     dfs.Data

,logline,timestamp,Level,span_id
0,dfs.DataNode$DataXceiver: Receiving block blk_...,2008-11-09 20:35:18,INFO,143
1,dfs.FSNamesystem: BLOCK* NameSystem.allocateBl...,2008-11-09 20:35:18,INFO,35
2,dfs.DataNode$DataXceiver: Receiving block blk_...,2008-11-09 20:35:19,INFO,143
3,dfs.DataNode$DataXceiver: Receiving block blk_...,2008-11-09 20:35:19,INFO,145
4,dfs.DataNode$PacketResponder: PacketResponder ...,2008-11-09 20:35:19,INFO,145


Preprocess
In the preprocessing step, users can find and replace arbitrary regex patterns to clean the raw loglines. This helps information extraction from the unstructured part of the logs, and can also produce additional structured attributes based on domain knowledge.

In this example, we use the regex below to find IP addresses and replace them with an <IP> tag.

In [19]:
from logai.preprocess.preprocessor import PreprocessorConfig, Preprocessor
from logai.utils import constants
loglines = logrecord.body[constants.LOGLINE_NAME]
# attributes = logrecord.attributes

preprocessor_config = PreprocessorConfig(
    custom_replace_list=[
        [r"\d+\.\d+\.\d+\.\d+", "<IP>"],   # retrieve all IP addresses and replace with <IP> tag in the original string.
    ]
)

preprocessor = Preprocessor(preprocessor_config)

clean_logs, custom_patterns = preprocessor.clean_log(
    loglines
)
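The replacement itself is ordinary regex substitution. Below is a minimal sketch with Python's re module, assuming the preprocessor applies each [pattern, replacement] pair roughly like re.sub and also collects the matched substrings (our guess at what custom_patterns holds). replace_ips is a hypothetical helper, not part of LogAI, and the sample line is made up in the HDFS format. Note that the pattern is loose: it matches any four dot-separated digit runs and does not check that octets are at most 255.

```python
import re
from typing import List, Tuple

# Same pattern as the custom_replace_list entry above: four dot-separated digit runs.
IP_PATTERN = re.compile(r"\d+\.\d+\.\d+\.\d+")


def replace_ips(line: str, tag: str = "<IP>") -> Tuple[str, List[str]]:
    """Return the line with IPv4-like substrings replaced by `tag`, plus the matches."""
    matches = IP_PATTERN.findall(line)
    return IP_PATTERN.sub(tag, line), matches


# Hypothetical HDFS-style line used only for illustration.
cleaned, found = replace_ips("src: /10.250.19.102:54106 dest: /10.250.19.102:50010")
print(cleaned)
print(found)
```

The port numbers survive because they are separated from the address by ":" rather than ".", which is why the parsed templates later contain strings like <IP>:50010.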

Parsing
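After preprocessing, we run an automatic log parsing algorithm on the cleaned loglines. Here we use Drain: sim_th is the similarity threshold for matching a logline to an existing template, and depth is the depth of Drain's parse tree. Variable tokens in each template are replaced by the * wildcard, as the output below shows.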

In [20]:
from logai.information_extraction.log_parser import LogParser, LogParserConfig
from logai.algorithms.parsing_algo.drain import DrainParams

# parsing
parsing_algo_params = DrainParams(
    sim_th=0.5, depth=5
)

log_parser_config = LogParserConfig(
    parsing_algorithm="drain",
    parsing_algo_params=parsing_algo_params
)

parser = LogParser(log_parser_config)
parsed_result = parser.parse(clean_logs)

parsed_loglines = parsed_result['parsed_logline']
print("Parsed loglines:")
print(parsed_loglines)

Parsed loglines:
0       dfs.DataNode$DataXceiver: Receiving block * sr...
1       dfs.FSNamesystem: BLOCK* NameSystem.allocateBl...
2       dfs.DataNode$DataXceiver: Receiving block * sr...
3       dfs.DataNode$DataXceiver: Receiving block * sr...
4       dfs.DataNode$PacketResponder: PacketResponder ...
                              ...                        
1996    dfs.DataNode$DataXceiver: Receiving block * sr...
1997    dfs.DataNode$DataXceiver: Receiving block * sr...
1998    dfs.DataNode$DataXceiver: Receiving block * sr...
1999    dfs.DataNode$DataXceiver: Receiving block * sr...
2000    dfs.DataNode$DataXceiver: Receiving block * sr...
Name: parsed_logline, Length: 2001, dtype: object
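To see where the * wildcards come from, here is a heavily simplified, Drain-style sketch in pure Python. It keeps only Drain's core idea: a logline is compared position by position with existing templates of the same token count, and if the fraction of matching tokens reaches sim_th it joins that template, with disagreeing positions turned into *. Real Drain first routes lines through a fixed-depth parse tree (the depth parameter), which this sketch omits, and LogAI's implementation will differ; similarity and drain_like_parse are hypothetical names.

```python
from typing import List, Optional


def similarity(template: List[str], tokens: List[str]) -> float:
    """Fraction of positions where template and line agree; wildcards are not counted."""
    if not tokens:
        return 1.0
    same = sum(1 for t, w in zip(template, tokens) if t == w and t != "*")
    return same / len(tokens)


def drain_like_parse(lines: List[str], sim_th: float = 0.5) -> List[str]:
    """Assign each line to a template, widening templates with '*' as they absorb lines."""
    templates: List[List[str]] = []
    assignment: List[int] = []
    for line in lines:
        tokens = line.split()
        best: Optional[int] = None
        best_sim = -1.0
        for i, tpl in enumerate(templates):
            # Only templates with the same number of tokens are candidates.
            if len(tpl) != len(tokens):
                continue
            s = similarity(tpl, tokens)
            if s > best_sim:
                best, best_sim = i, s
        if best is not None and best_sim >= sim_th:
            # Merge: keep agreeing tokens, replace disagreeing ones with a wildcard.
            templates[best] = [t if t == w else "*" for t, w in zip(templates[best], tokens)]
            assignment.append(best)
        else:
            templates.append(tokens)
            assignment.append(len(templates) - 1)
    # Report each line's final (possibly widened) template.
    return [" ".join(templates[i]) for i in assignment]


print(drain_like_parse([
    "Receiving block blk_1 src /<IP>",
    "Receiving block blk_2 src /<IP>",
    "Deleting block blk_3 file /tmp/a",
]))
```

A lower sim_th merges more aggressively (fewer, more generic templates); a higher one keeps more lines as separate templates.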


Time-series Anomaly Detection
Here we show how to run time-series anomaly detection on the parsed logs.

Feature Extraction
After parsing the logs into log templates, we can extract time-series features by converting the parsed loglines into counter vectors, i.e. the number of loglines per log template in each time window (15 minutes in the config below).

In [21]:
!pip install salesforce-merlion



In [22]:
import pandas as pd
from logai.information_extraction.feature_extractor import FeatureExtractorConfig, FeatureExtractor

# Count loglines per 15-minute window for each parsed log template
config = FeatureExtractorConfig(
    group_by_time="15min",
    group_by_category=['parsed_logline'],
)

feature_extractor = FeatureExtractor(config)
attributes = logrecord.attributes

# Convert the timestamp strings to pandas datetimes
timestamps = pd.to_datetime(logrecord.timestamp["timestamp"], format="%Y-%m-%d %H:%M:%S")
parsed_loglines = parsed_result['parsed_logline']
counter_vector = feature_extractor.convert_to_counter_vector(
    log_pattern=parsed_loglines,
    attributes=attributes,
    timestamps=timestamps
)

counter_vector.head(5)


,parsed_logline,timestamp,Level,event_index,counts
0,dfs.DataNode$DataTransfer: <IP>:50010:Transmit...,2008-11-09 20:30:00,"[INFO, INFO, INFO, INFO]","[24, 72, 79, 525]",4
1,dfs.DataNode$DataXceiver: <IP>:50010 Served bl...,2008-11-09 20:30:00,"[INFO, INFO, INFO, INFO, INFO, INFO, INFO, INF...","[46, 47, 64, 69, 70, 71, 80, 81, 82, 83, 84, 8...",404
2,dfs.DataNode$DataXceiver: Received block blk_-...,2008-11-09 20:30:00,"[INFO, INFO, INFO, INFO, INFO, INFO, INFO]","[16, 25, 65, 66, 77, 118, 428]",7
3,dfs.DataNode$DataXceiver: Receiving block * sr...,2008-11-09 20:30:00,"[INFO, INFO, INFO, INFO, INFO, INFO, INFO, INF...","[0, 2, 3, 13, 14, 17, 18, 19, 26, 37, 38, 41, ...",1150
4,dfs.DataNode$PacketResponder: PacketResponder ...,2008-11-09 20:30:00,"[INFO, INFO, INFO, INFO, INFO, INFO, INFO, INF...","[4, 5, 8, 20, 22, 27, 39, 42, 44, 54, 56, 58]",12
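To make the counter vector concrete, here is a minimal stdlib sketch of the same idea: floor each timestamp to the start of its 15-minute window and count loglines per (window, template) pair. It illustrates the grouping under that assumption and is not LogAI's convert_to_counter_vector; floor_time and count_by_window are hypothetical names, and the real output additionally carries the grouped attributes and event indices shown above.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, Tuple


def floor_time(ts: datetime, minutes: int = 15) -> datetime:
    """Round a timestamp down to the start of its `minutes`-wide window (aligned to the hour)."""
    return ts.replace(second=0, microsecond=0) - timedelta(minutes=ts.minute % minutes)


def count_by_window(
    templates: Iterable[str], timestamps: Iterable[datetime], minutes: int = 15
) -> Counter:
    """Count loglines per (window start, template) pair."""
    keys: Iterable[Tuple[datetime, str]] = (
        (floor_time(ts, minutes), tpl) for tpl, ts in zip(templates, timestamps)
    )
    return Counter(keys)
```

Applied to this notebook's timestamps (20:35:18 to 20:35:46 on 2008-11-09), every logline falls into the window starting at 20:30:00, which matches the single timestamp in the table above. Each template therefore yields a one-point series here, which is presumably why the per-attribute loop in the next step scores nothing and the anomalies table comes back empty.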


Anomaly Detection
With the generated counter_vector, we can use AnomalyDetector to detect time-series anomalies. Here we use DynamicBaseline (algo_name='dbl'), an algorithm from the Merlion library, fitting it on the first 30% of each attribute's series and scoring the rest.
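The fit/predict contract of a baseline-style detector can be illustrated with a plain z-score: learn the mean and spread of the counts on the training part, then score each test count by how many standard deviations it lies from that baseline. This is a deliberately simplified stand-in, not Merlion's DynamicBaseline, which builds its baseline over a training window and can also account for periodic trends (for example daily or weekly); ZScoreBaseline is a hypothetical class.

```python
from statistics import mean, pstdev
from typing import List, Sequence


class ZScoreBaseline:
    """Toy static baseline: score = deviation from the training mean in std units."""

    def fit(self, train: Sequence[float]) -> "ZScoreBaseline":
        self.mu = mean(train)
        # Fall back to 1.0 for a constant training series to avoid dividing by zero.
        self.sigma = pstdev(train) or 1.0
        return self

    def predict(self, test: Sequence[float]) -> List[float]:
        return [(x - self.mu) / self.sigma for x in test]


detector = ZScoreBaseline().fit([8, 12, 8, 12])
print(detector.predict([10, 16]))
```

A score near 0 means the count looks like the training data; large positive scores flag bursts of a given log template.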

In [23]:
!pip install datasets



In [24]:
from logai.analysis.anomaly_detector import AnomalyDetector, AnomalyDetectionConfig
from sklearn.model_selection import train_test_split
import pandas as pd

# Build one "attribute" key per row from every column except counts, timestamp and event index
counter_vector["attribute"] = counter_vector.drop(
    [
        constants.LOG_COUNTS,
        constants.LOG_TIMESTAMPS,
        constants.EVENT_INDEX
    ],
    axis=1
).apply(
    lambda x: "-".join(x.astype(str)), axis=1
)

attr_list = counter_vector["attribute"].unique()
anomaly_detection_config = AnomalyDetectionConfig(
    algo_name='dbl'
)

scores = []
print("attr_list", attr_list)
for attr in attr_list:
    print("attr", attr)
    temp_df = counter_vector[counter_vector["attribute"] == attr]
    print("temp_df", temp_df)
    # Only series with at least MIN_TS_LENGTH points are split and scored
    if temp_df.shape[0] >= constants.MIN_TS_LENGTH:
        train, test = train_test_split(
            temp_df[[constants.LOG_TIMESTAMPS, constants.LOG_COUNTS]],
            shuffle=False,
            train_size=0.3
        )
        print('train', train.shape)
        print('test', test.shape)
        anomaly_detector = AnomalyDetector(anomaly_detection_config)

        anomaly_detector.fit(train)
        anom_score = anomaly_detector.predict(test)
        scores.append(anom_score)

# DataFrame.append was removed in pandas 2.0, so concatenate the collected scores instead
res = pd.concat(scores) if scores else pd.DataFrame()


attr_list ["dfs.DataNode$DataTransfer: <IP>:50010:Transmitted block blk_-1608999687919862906 to /<IP>:50010-['INFO', 'INFO', 'INFO', 'INFO']"
 "dfs.DataNode$DataXceiver: <IP>:50010 Served block * to /<IP>-['INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'INFO', 'I

In [25]:
# Get anomalous datapoints
anomalies = counter_vector.iloc[res[res>0].index]
anomalies.head(5)

,parsed_logline,timestamp,Level,event_index,counts,attribute



Semantic Anomaly Detection
We can also use the log templates for semantic anomaly detection. In this approach, we extract semantic features from the logs in two parts: vectorizing the unstructured log templates and encoding the structured log attributes.

Vectorization for unstructured loglines
Here we use word2vec to vectorize the unstructured part of the logs. The output is a list of numeric vectors representing the semantic features of the log templates.

In [26]:
import nltk
nltk.download('punkt')
from logai.information_extraction.log_vectorizer import VectorizerConfig, LogVectorizer

vectorizer_config = VectorizerConfig(
    algo_name = "word2vec"
)

vectorizer = LogVectorizer(
    vectorizer_config
)

# Train vectorizer
vectorizer.fit(parsed_loglines)

# Transform the loglines into features
log_vectors = vectorizer.transform(parsed_loglines)
log_vectors.to_csv('../log_vectors.csv')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
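Word2vec produces one vector per token, so a per-logline vector needs a pooling step. A common choice is to average the token vectors, sketched below with a hand-written toy vocabulary in place of a trained model. That LogAI pools exactly this way is an assumption, the vector values are hypothetical, and the sketch splits on whitespace whereas the notebook downloads nltk's punkt tokenizer; average_vector is a hypothetical helper.

```python
from typing import Dict, List


def average_vector(line: str, word_vectors: Dict[str, List[float]], dim: int) -> List[float]:
    """Mean of the vectors of known tokens; all zeros if no token is in the vocabulary."""
    tokens = [t for t in line.split() if t in word_vectors]
    if not tokens:
        return [0.0] * dim
    return [sum(word_vectors[t][i] for t in tokens) / len(tokens) for i in range(dim)]


# Toy 2-dimensional vocabulary (hypothetical values, not from a trained model).
toy_vectors = {"Receiving": [1.0, 0.0], "block": [0.0, 1.0]}
print(average_vector("Receiving block *", toy_vectors, dim=2))
```

Out-of-vocabulary tokens such as the * wildcard are simply skipped, so templates that differ only in their wildcards end up with the same vector.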


Categorical Encoding for log attributes
We also apply categorical encoding to the log attributes, converting their string values into numeric representations.

In [27]:
from logai.information_extraction.categorical_encoder import CategoricalEncoderConfig, CategoricalEncoder

encoder_config = CategoricalEncoderConfig(name="label_encoder")

encoder = CategoricalEncoder(encoder_config)

attributes_encoded = encoder.fit_transform(attributes)
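Label encoding simply maps each distinct string to an integer. The sketch below assigns codes in sorted order, as scikit-learn's LabelEncoder does; that LogAI's "label_encoder" wraps scikit-learn is an assumption, and label_encode is a hypothetical helper.

```python
from typing import List, Sequence, Tuple


def label_encode(values: Sequence[str]) -> Tuple[List[int], List[str]]:
    """Map each distinct value to its index in the sorted list of classes."""
    classes = sorted(set(values))
    mapping = {v: i for i, v in enumerate(classes)}
    return [mapping[v] for v in values], classes


codes, classes = label_encode(["INFO", "WARN", "INFO"])
print(codes, classes)
```

In the rows printed earlier, every Level value is INFO, so in this sample the encoded Level column is likely to carry little information.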

Feature Extraction
Then we extract and concatenate the semantic features of the unstructured and structured parts of the logs.
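Conceptually, the combined feature for each logline is its template vector followed by its encoded attribute values. The sketch below shows only that row-wise concatenation; it does not model the max_feature_len setting or the timestamp handling in LogAI's convert_to_feature_vector, and concat_features is a hypothetical helper.

```python
from typing import List, Sequence


def concat_features(
    log_vectors: Sequence[Sequence[float]], encoded_attrs: Sequence[Sequence[int]]
) -> List[List[float]]:
    """Append each logline's encoded attributes to its template vector."""
    if len(log_vectors) != len(encoded_attrs):
        raise ValueError("each logline needs one vector and one attribute row")
    return [
        [float(x) for x in vec] + [float(a) for a in attrs]
        for vec, attrs in zip(log_vectors, encoded_attrs)
    ]


print(concat_features([[0.5, 0.5], [1.0, 0.0]], [[0], [1]]))
```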

In [28]:
from logai.information_extraction.feature_extractor import FeatureExtractorConfig, FeatureExtractor

timestamps = logrecord.timestamp['timestamp']

config = FeatureExtractorConfig(
    max_feature_len=100
)

feature_extractor = FeatureExtractor(config)

_, feature_vector = feature_extractor.convert_to_feature_vector(log_vectors, attributes_encoded, timestamps)



Anomaly Detection
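With the extracted semantic feature set, we can run anomaly detection to find abnormal loglines. Here we use an isolation forest (algo_name='isolation_forest') as an example, trained on a random 70% of the feature vectors and applied to the remaining 30%.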
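The intuition behind an isolation forest is that anomalies are easier to isolate: random axis-aligned splits separate an outlier from the rest in few steps. Below is a toy pure-Python re-implementation of that idea, not the library code LogAI calls (its IsolationForestParams names match scikit-learn's IsolationForest, so it presumably wraps that). Each tree splits a random feature at a random value; the anomaly score is 2^(-E[h(x)]/c(psi)), where h(x) is the path length of x and c normalises by the expected path length for psi samples. ToyIsolationForest and its helpers are hypothetical names.

```python
import math
import random
from typing import Sequence

EULER_GAMMA = 0.5772156649


def c(n: int) -> float:
    """Expected path length of an unsuccessful BST search over n points."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    return 2.0 * (math.log(n - 1) + EULER_GAMMA) - 2.0 * (n - 1) / n


def build_tree(points, depth, max_depth, rng):
    """Split on a random feature at a random value until points are isolated."""
    if depth >= max_depth or len(points) <= 1:
        return ("leaf", len(points))
    dim = rng.randrange(len(points[0]))
    lo = min(p[dim] for p in points)
    hi = max(p[dim] for p in points)
    if lo == hi:
        return ("leaf", len(points))
    split = rng.uniform(lo, hi)
    left = [p for p in points if p[dim] < split]
    right = [p for p in points if p[dim] >= split]
    return ("node", dim, split,
            build_tree(left, depth + 1, max_depth, rng),
            build_tree(right, depth + 1, max_depth, rng))


def path_length(tree, x: Sequence[float], depth: int = 0) -> float:
    """Depth of the leaf x falls into, plus the expected extra depth for that leaf's size."""
    if tree[0] == "leaf":
        return depth + c(tree[1])
    _, dim, split, left, right = tree
    return path_length(left if x[dim] < split else right, x, depth + 1)


class ToyIsolationForest:
    def __init__(self, n_estimators: int = 100, sample_size: int = 256, seed: int = 0):
        self.n_estimators = n_estimators
        self.sample_size = sample_size
        self.seed = seed

    def fit(self, X):
        rng = random.Random(self.seed)
        self.psi = min(self.sample_size, len(X))
        max_depth = math.ceil(math.log2(max(self.psi, 2)))
        self.trees = [build_tree(rng.sample(list(X), self.psi), 0, max_depth, rng)
                      for _ in range(self.n_estimators)]
        return self

    def score(self, x: Sequence[float]) -> float:
        """Score in (0, 1]; values close to 1 mean x is easy to isolate."""
        mean_h = sum(path_length(t, x) for t in self.trees) / len(self.trees)
        return 2.0 ** (-mean_h / (c(self.psi) or 1.0))
```

For example, fitting on a tight cluster of points near the origin plus one point at (10, 10) gives the far point a clearly higher score than any cluster member.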

In [29]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(feature_vector, train_size=0.7, test_size=0.3)

from logai.algorithms.anomaly_detection_algo.isolation_forest import IsolationForestParams
from logai.analysis.anomaly_detector import AnomalyDetectionConfig, AnomalyDetector

algo_params = IsolationForestParams(
    n_estimators=10,
    max_features=100
)
config = AnomalyDetectionConfig(
    algo_name='isolation_forest',
    algo_params=algo_params
)

anomaly_detector = AnomalyDetector(config)
anomaly_detector.fit(train)
res = anomaly_detector.predict(test)
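# Obtain the anomalous datapoints. Note: scikit-learn's IsolationForest.predict labels
# outliers as -1 and inliers as 1, so check which convention LogAI's wrapper returns
# before relying on this filter.
anomalies = res[res==1]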



In [30]:
# Check the corresponding loglines
df = loglines.iloc[anomalies.index]
df.to_csv('../anomaliesHDFS.csv')

In [31]:
# Check the corresponding attributes
attributes.iloc[anomalies.index].head()

,Level
1316,INFO
1222,INFO
409,INFO
429,INFO
1987,INFO
