# Sequential Event Feature Extraction

### Hadoop Distributed File System
The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on low-cost commodity hardware. It resembles other distributed file systems in many respects, but it also differs from them significantly, in particular by being highly fault-tolerant.

Each log line follows this structure:

Log Structure | Description
--- | ---
yyMMdd | date
HHmmss | time (24-hour)
_ _ _ _ | a 2-4 digit code, likely a process or thread ID (the Loghub log format for HDFS names this field `Pid`)
INFO/WARN/DEBUG/ERROR/etc. | type of the logging event and priority
dfs.____:_____ | general source of logging event
\_____\<blk id\>_____ | log message

Example:
- `081109 203807 222 INFO dfs.DataNode$PacketResponder: PacketResponder 0 for block blk_-6952295868487656571 terminating`
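
The structure in the table can be checked against the example line with a small regular expression. This is a minimal sketch, not part of the pipeline in this notebook: the pattern and the field names (`date`, `time`, `code`, `level`, `source`, `message`) are assumptions made for illustration.

```python
import re

# Hypothetical field names; the pattern follows the log structure table.
LOG_LINE = re.compile(
    r'^(?P<date>\d{6}) (?P<time>\d{6}) (?P<code>\d+) '
    r'(?P<level>[A-Z]+) (?P<source>[^:]+): (?P<message>.*)$'
)

def parse_line(line):
    """Return a dict of fields, or None if the line does not match."""
    match = LOG_LINE.match(line)
    return match.groupdict() if match else None

example = ('081109 203807 222 INFO dfs.DataNode$PacketResponder: '
           'PacketResponder 0 for block blk_-6952295868487656571 terminating')
fields = parse_line(example)

print(fields['level'], fields['source'])
print(fields['message'])
```

Returning `None` for lines that do not match makes it easy to count malformed lines before any feature is built.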


### Import Libraries

In [1]:
# parsing and wrangling
import pandas as pd
import numpy as np
import regex as re

# misc
from datetime import datetime as dt
import itertools

## HDFS Feature Engineering

The time elapsed between sequential log events is not included yet. It may be a useful feature for improving anomaly detection and is a candidate for future work.
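
A time-between-events feature of this kind could be sketched as follows. This is only an illustration under stated assumptions: the three log lines are made-up toy data with shortened block ids, and the column names `timestamp` and `seconds_since_prev` are hypothetical.

```python
import pandas as pd

# Made-up toy lines in the HDFS log format (block ids shortened).
lines = pd.Series([
    '081109 203615 148 INFO dfs.DataNode$PacketResponder: PacketResponder 1 for block blk_1 terminating',
    '081109 203807 222 INFO dfs.DataNode$PacketResponder: PacketResponder 0 for block blk_2 terminating',
    '081109 203815 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: blk_1 size 91178',
])

events = pd.DataFrame({
    # the first 13 characters hold the date and time, e.g. '081109 203615'
    'timestamp': pd.to_datetime(lines.str.slice(0, 13), format='%y%m%d %H%M%S'),
    # first block id mentioned on the line
    'blk_ID': lines.str.extract(r'(blk_-?\d+)')[0],
})

# seconds since the previous event of the same block (NaN for a block's first event)
events['seconds_since_prev'] = (
    events.groupby('blk_ID')['timestamp'].diff().dt.total_seconds()
)

print(events)
```

The deltas could then be stored per block next to the event codes, giving one list of codes and one list of gaps for each block.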

In [7]:
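## import raw log data

# full set
log_path = 'All-Data/HDFS/Raw/HDFS.log'

# sample set
#log_path = '../All-Data/HDFS/Raw/HDFS_2k.log'

# one Series element per log line; read with plain file I/O because
# recent pandas versions reject sep='\n' in read_csv
with open(log_path) as f:
    raw = pd.Series([line.rstrip('\n') for line in f], name=0)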

raw.head()

0    081109 203615 148 INFO dfs.DataNode$PacketResp...
1    081109 203807 222 INFO dfs.DataNode$PacketResp...
2    081109 204005 35 INFO dfs.FSNamesystem: BLOCK*...
3    081109 204015 308 INFO dfs.DataNode$PacketResp...
4    081109 204106 329 INFO dfs.DataNode$PacketResp...
Name: 0, dtype: object

In [22]:
## import anomaly labels

labels = pd.read_csv('../All-Data/HDFS/Raw/anomaly_label.csv')
labels.Label = [1 if x == "Anomaly" else 0 for x in labels.Label]

length = len(labels)
anomalies = len(labels[labels.Label == 1])

print('Length labels: ', length)
print('Anomalies: ', anomalies)
print('% Anomalous: ', round(anomalies/length*100, 2),'%')

labels.head()

Length labels:  575061
Anomalies:  16838
% Anomalous:  2.93 %


index | BlockId | Label
--- | --- | ---
0 | blk_-1608999687919862906 | 0
1 | blk_7503483334202473044 | 0
2 | blk_-3544583377289625738 | 1
3 | blk_-9073992586687739851 | 0
4 | blk_7854771516489510256 | 0


In [27]:
## parse for block IDs

# extract ids from log in the correct event order
# (keeps the first block id on each line; assumes every line mentions at least one block)
blocks_in_order = raw.str.findall(r'(blk_-?\d+)')
unlist_vectorized = np.vectorize(lambda x: x[0])
blocks_in_order = pd.Series(unlist_vectorized(blocks_in_order))

# label the extracted block ids via a conversion dictionary
binarizer = dict(zip(labels.BlockId, labels.Label))
binarizer_vectorized = np.vectorize(lambda x: binarizer[x])
blocks_binarized = pd.Series(binarizer_vectorized(blocks_in_order))

labeled_blks = pd.DataFrame({'blkID':blocks_in_order, 'anomaly':blocks_binarized})
# checkpoint
#labeled_blks.to_feather('Data/labeled_blks.feather')

print('Unique blocks in Log File: ', len(blocks_in_order.unique()))
print('Anomalous blocks in the sample Log File: ', binarizer_vectorized(blocks_in_order.unique()).sum()) 

labeled_blks.head()


Unique blocks in Log File:  1994
Anomalous blocks in the sample Log File:  68


index | blkID | anomaly
--- | --- | ---
0 | blk_38865049064139660 | 0
1 | blk_-6952295868487656571 | 0
2 | blk_7128370237687728475 | 0
3 | blk_8229193803249955061 | 0
4 | blk_-6670958622368987959 | 0


In [28]:
## extract raw messages

full_msg = raw.str.extract(r'((?<=:\s).*)')[0]
full_msg.head()

# checkpoint
#full_msg.to_csv('Data/full_msg.csv', index=None, header=None)
#full_msg = pd.read_csv('Data/full_msg.csv', index_col=False, header=None).squeeze('columns')


0    PacketResponder 1 for block blk_38865049064139...
1    PacketResponder 0 for block blk_-6952295868487...
2    BLOCK* NameSystem.addStoredBlock: blockMap upd...
3    PacketResponder 2 for block blk_82291938032499...
4    PacketResponder 2 for block blk_-6670958622368...
Name: 0, dtype: object

In [30]:
## remove all unique event identifiers (block ids) to obtain a general message structure

# split log into chunks for RAM friendly processing
n_chunks = 200
chunked_msgs = np.array_split(full_msg, n_chunks)

# vectorized function to join key words from list to str
toSentence = np.vectorize(lambda x: " ".join(x))

# for storing generalized sentences, one Series per chunk
sentence_chunks = []

for chunk in chunked_msgs:
    # extract and join key words without IDs
    sentences = pd.Series(toSentence(chunk.str.findall(r'([A-Za-z]+)')))
    sentence_chunks.append(sentences)

# concatenate once instead of growing a Series inside the loop
str_msgs = pd.concat(sentence_chunks, ignore_index=True)

print('Messages extracted: ', len(str_msgs))

Messages extracted:  2000
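
To see what the generalization step does to a single message, the same regular expression can be applied to the example log message from the top of this notebook. This is a minimal sketch; the helper name `generalize` is hypothetical and is not used elsewhere in the notebook.

```python
import re

def generalize(message):
    """Keep alphabetic tokens only, dropping ids, counters and other numbers."""
    return ' '.join(re.findall(r'[A-Za-z]+', message))

msg = 'PacketResponder 0 for block blk_-6952295868487656571 terminating'
template = generalize(msg)

print(template)  # PacketResponder for block blk terminating
```

Note that the `blk` prefix survives because it consists of letters. It is the same for every block, so it does not split one message type into several.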


In [31]:
## convert general message structure to numeric

coded_msgs = pd.Categorical(str_msgs).codes 

print('Unique Message Types: ', len(pd.Series(coded_msgs).unique()))

coded_msgs

Unique Message Types:  19


array([12, 12,  0, ..., 15, 12, 15], dtype=int8)
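
The integer codes produced by `pd.Categorical` are positions in the sorted list of distinct messages, so they depend on which messages are present in the data. The sketch below illustrates this on made-up toy templates and shows one possible way to keep a code-to-message lookup; the variable names are assumptions for illustration.

```python
import pandas as pd

# Made-up toy templates standing in for generalized messages.
templates = pd.Series([
    'Receiving block blk src dest',
    'PacketResponder for block blk terminating',
    'Receiving block blk src dest',
])

cat = pd.Categorical(templates)
codes = cat.codes

# categories are sorted, and each code is an index into them
code_to_template = dict(enumerate(cat.categories))

print(list(codes))  # [1, 0, 1]
print(code_to_template)
```

Because of this, codes computed on the sample set and on the full set are not guaranteed to refer to the same message types. Saving a lookup like `code_to_template` alongside the feature keeps the codes interpretable.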

In [36]:
## create final sequential feature

# earlier checkpoint
#labeled_blks = pd.read_feather('Data/labeled_blks.feather')

# block ID and event codes frame
blk_events = pd.DataFrame({'blk_ID': labeled_blks.blkID, 'msg_code':coded_msgs})

# group by block ID to create event sequences
blk_event_sequences = blk_events.groupby('blk_ID')['msg_code'].apply(list).reset_index(name='sequence')

# add anomaly labels using a vectorized labelling dictionary
blk_key = dict(labels.values)
vectorized_anom_labeler = np.vectorize(lambda x: blk_key[x])
blk_event_sequences['anomaly'] = pd.Series(vectorized_anom_labeler(blk_event_sequences.blk_ID))

# export the feature
blk_event_sequences.to_csv('../Data/blk_event_sequences.csv', index = False, header = True)

blk_event_sequences.head()

index | blk_ID | sequence | anomaly
--- | --- | --- | ---
0 | blk_-1030832046197982436 | [4] | 0
1 | blk_-1046472716157313227 | [10] | 0
2 | blk_-1049340855430710153 | [12] | 0
3 | blk_-1055254430948037872 | [0] | 0
4 | blk_-1067234447809438340 | [12] | 0
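
In the sample output above every sequence shown has length one, which fits the 1994 unique blocks found in 2000 log lines. How the grouping behaves when a block appears several times can be seen on toy data. This is a minimal sketch with made-up block ids and codes.

```python
import pandas as pd

# Made-up events in log order: blk_1 appears three times, blk_2 twice.
toy = pd.DataFrame({
    'blk_ID': ['blk_1', 'blk_2', 'blk_1', 'blk_1', 'blk_2'],
    'msg_code': [12, 12, 0, 15, 3],
})

# same grouping call as in the cell above; row order within each block is preserved
sequences = (
    toy.groupby('blk_ID')['msg_code']
       .apply(list)
       .reset_index(name='sequence')
)

print(sequences)
```

Each block ends up with its codes in the order they occurred in the log, which is what makes the feature sequential.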


In [41]:
## perform a quick check

labels = pd.read_csv('../All-Data/HDFS/Raw/anomaly_label.csv')
labels.Label = [1 if x == "Anomaly" else 0 for x in labels.Label]

# how many block IDs in the final dataset?
target_row_count = len(labels)
final_row_count = len(blk_event_sequences)

# how many anomalies in the final dataset?
original_anomaly_count = len(labels[labels.Label==1])
final_anomaly_count = len(blk_event_sequences[blk_event_sequences.anomaly==1])


# if the numbers don't match, one of two things may be happening:
# 1) something went wrong earlier
# 2) you're using the sample dataset

print(f'Total blocks present {final_row_count} of {target_row_count}')
print(f'Total anomalies present {final_anomaly_count} of {original_anomaly_count}')

Total blocks present 1994 of 575061
Total anomalies present 68 of 16838
