## 1. Importing the relavent functions to the code

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import re

%matplotlib inline

### 2. Reading the logs as well as the event IDs

In [2]:
path_to_file = 'HDFS.log'
eid_df = pd.read_csv('HDFS.log_templates.csv')

with open(path_to_file) as input_file:
    # hdfs_head = [next(input_file) for _ in range(300000)]
    hdfs_head = input_file.readlines()
    # hdfs = input_file.readlines()



### 3. Formatting the event IDs for regex

In [3]:

eid_df['regex_pattern'] = (
    eid_df['EventTemplate']
    .str.replace(']', '', regex=False)
    .str.replace('[', '', regex=False)
    .str.replace('*', '(.*?)', regex=False)
)

eid_df

Unnamed: 0,EventId,EventTemplate,regex_pattern
0,E1,[*]Adding an already existing block[*],(.*?)Adding an already existing block(.*?)
1,E2,[*]Verification succeeded for[*],(.*?)Verification succeeded for(.*?)
2,E3,[*]Served block[*]to[*],(.*?)Served block(.*?)to(.*?)
3,E4,[*]Got exception while serving[*]to[*],(.*?)Got exception while serving(.*?)to(.*?)
4,E5,[*]Receiving block[*]src:[*]dest:[*],(.*?)Receiving block(.*?)src:(.*?)dest:(.*?)
5,E6,[*]Received block[*]src:[*]dest:[*]of size[*],(.*?)Received block(.*?)src:(.*?)dest:(.*?)of ...
6,E7,[*]writeBlock[*]received exception[*],(.*?)writeBlock(.*?)received exception(.*?)
7,E8,[*]PacketResponder[*]for block[*]Interrupted[*],(.*?)PacketResponder(.*?)for block(.*?)Interru...
8,E9,[*]Received block[*]of size[*]from[*],(.*?)Received block(.*?)of size(.*?)from(.*?)
9,E10,[*]PacketResponder[*]Exception[*],(.*?)PacketResponder(.*?)Exception(.*?)


### 4. Parsing the log file using regex to create dataframe

In [4]:
df_raw = pd.DataFrame(hdfs_head, columns=['raw_log'])

# # Regex pattern
regex_pattern = r'^(?P<Date>\d{6})\s+(?P<Time>\d{6})\s+(?P<Pid>\d+)\s+(?P<Level>\w+)\s+(?P<Component>[^:]+):\s+(?P<Content>.*)$'

# # Seperating into date, time logs
df_base = df_raw['raw_log'].str.extract(regex_pattern)

df_base.head()

Unnamed: 0,Date,Time,Pid,Level,Component,Content
0,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
1,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...
2,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
3,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
4,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...


### 5. Using Regex to get event ID for each log entry

In [5]:
# 1. Combine all patterns into one giant regex using named groups
# Format: (?P<E1>pattern1)|(?P<E2>pattern2)|...

combined_regex_str = "|".join([f"(?P<{eid}>{pat})" for eid, pat in zip(eid_df['EventId'], eid_df['regex_pattern'])])
combined_pattern = re.compile(combined_regex_str)

# 2. Define a function to extract the name of the group that matched
def get_event_id_fast(content):
    match = combined_pattern.search(content)
    if match:
        # lastgroup returns the name of the named group that participated in the match
        return match.lastgroup
    return None

# 3. Apply to the dataframe
df_base['EventId'] = df_base['Content'].apply(get_event_id_fast)

### 6. Getting the block ID as well as counting number of events per block id

In [6]:
df_base['BlockId'] = df_base['Content'].str.extract(r'(blk_-?\d+)')

# Generate all Events into columns
e_matrix = pd.crosstab(df_base['BlockId'], df_base['EventId'])
all_event_ids = [f'E{i}' for i in range(1, 30)]
e_matrix = e_matrix.reindex(columns=all_event_ids, fill_value=0)

e_matrix = e_matrix.reset_index().fillna(0)

print(e_matrix.head())

EventId                   BlockId  E1  E2  E3  E4  E5  E6  E7  E8  E9  ...  \
0        blk_-1000002529962039464   0   0   0   0   3   0   0   0   3  ...   
1         blk_-100000266894974466   0   0   6   3   3   0   0   0   3  ...   
2        blk_-1000007292892887521   0   0   0   0   3   0   0   0   3  ...   
3        blk_-1000014584150379967   0   1   6   3   3   0   0   0   3  ...   
4        blk_-1000028658773048709   0   0   0   0   3   0   0   0   3  ...   

EventId  E20  E21  E22  E23  E24  E25  E26  E27  E28  E29  
0          0    0    1    0    0    0    3    0    0    0  
1          0    3    1    3    0    0    3    0    0    0  
2          0    0    1    0    0    0    3    0    0    0  
3          0    3    1    3    0    0    3    0    0    0  
4          0    3    1    3    0    0    3    0    0    0  

[5 rows x 30 columns]


In [8]:
# Get Highest Level per blockId
block_severity = df_base.groupby('BlockId', as_index=False)['Level'].max()
block_severity.rename(columns={'Level': 'MaxLevel'}, inplace=True)

print(block_severity.head())

custom_map = {'INFO': 1, 'WARN': 2, 'ERROR': 3, 'FATAL': 100}
block_severity['MaxLevelNum'] = block_severity['MaxLevel'].map(custom_map)
print(block_severity.head())

# Merge with highest level per block
e_matrix = pd.merge(e_matrix, block_severity, on='BlockId', how='inner')

e_matrix.to_csv('test_occurrence.csv')

print(e_matrix.head())

                    BlockId MaxLevel
0  blk_-1000002529962039464     INFO
1   blk_-100000266894974466     WARN
2  blk_-1000007292892887521     INFO
3  blk_-1000014584150379967     WARN
4  blk_-1000028658773048709     INFO
                    BlockId MaxLevel  MaxLevelNum
0  blk_-1000002529962039464     INFO            1
1   blk_-100000266894974466     WARN            2
2  blk_-1000007292892887521     INFO            1
3  blk_-1000014584150379967     WARN            2
4  blk_-1000028658773048709     INFO            1
                    BlockId  E1  E2  E3  E4  E5  E6  E7  E8  E9  ...  E24  \
0  blk_-1000002529962039464   0   0   0   0   3   0   0   0   3  ...    0   
1   blk_-100000266894974466   0   0   6   3   3   0   0   0   3  ...    0   
2  blk_-1000007292892887521   0   0   0   0   3   0   0   0   3  ...    0   
3  blk_-1000014584150379967   0   1   6   3   3   0   0   0   3  ...    0   
4  blk_-1000028658773048709   0   0   0   0   3   0   0   0   3  ...    0   

   E25  E26  E2

### 7. Training and testing the cleaned data provided to have a model

In [14]:
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

# declaring some var
scaled = StandardScaler()
tree = DecisionTreeClassifier(max_depth=5)

train_df = pd.read_csv('Event_occurrence_matrix.csv')

# splitting data
x = train_df.drop(['BlockId','Label','Type'],axis=1)
y = train_df.Label
xtrain, xtest, ytrain, ytest = train_test_split(x,y,test_size=0.2,random_state=33)

# Training data
tree.fit(xtrain, ytrain)
y_pred_tree = tree.predict(xtest)

# Testing data
print(confusion_matrix(ytest, y_pred_tree))
print(classification_report(ytest, y_pred_tree))

[[  3387     15]
 [    18 111593]]
              precision    recall  f1-score   support

        Fail       0.99      1.00      1.00      3402
     Success       1.00      1.00      1.00    111611

    accuracy                           1.00    115013
   macro avg       1.00      1.00      1.00    115013
weighted avg       1.00      1.00      1.00    115013



In [19]:
from sklearn.tree import export_text

# Print the rules the model is using
m = tree.predict(e_matrix.drop(['BlockId','MaxLevel','MaxLevelNum'],axis=1))

tree_rules = export_text(tree, feature_names=list(x.columns))
print(tree_rules)

|--- E9 <= 2.50
|   |--- class: Fail
|--- E9 >  2.50
|   |--- E20 <= 0.50
|   |   |--- E26 <= 3.50
|   |   |   |--- E27 <= 0.50
|   |   |   |   |--- E13 <= 1.50
|   |   |   |   |   |--- class: Success
|   |   |   |   |--- E13 >  1.50
|   |   |   |   |   |--- class: Fail
|   |   |   |--- E27 >  0.50
|   |   |   |   |--- E4 <= 9.50
|   |   |   |   |   |--- class: Fail
|   |   |   |   |--- E4 >  9.50
|   |   |   |   |   |--- class: Success
|   |   |--- E26 >  3.50
|   |   |   |--- E23 <= 1.00
|   |   |   |   |--- E29 <= 0.50
|   |   |   |   |   |--- class: Success
|   |   |   |   |--- E29 >  0.50
|   |   |   |   |   |--- class: Fail
|   |   |   |--- E23 >  1.00
|   |   |   |   |--- E2 <= 3.50
|   |   |   |   |   |--- class: Fail
|   |   |   |   |--- E2 >  3.50
|   |   |   |   |   |--- class: Success
|   |--- E20 >  0.50
|   |   |--- E3 <= 7.50
|   |   |   |--- E23 <= 4.50
|   |   |   |   |--- class: Fail
|   |   |   |--- E23 >  4.50
|   |   |   |   |--- class: Success
|   |   |--- E3 >  7

### 8.Miscellaneous

### 9.New Way to format the data

In [50]:
# format data to have timestamp column and interval

df_base['Timestamp'] = pd.to_datetime(
    df_base['Date'] + ' ' + df_base['Time'], 
    format='%y%m%d %H%M%S'
)

df_base.head()

Unnamed: 0,Date,Time,Pid,Level,Component,Content,EventId,BlockId,Timestamp
0,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,blk_-1608999687919862906,2008-11-09 20:35:18
1,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,E22,blk_-1608999687919862906,2008-11-09 20:35:18
2,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,blk_-1608999687919862906,2008-11-09 20:35:19
3,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,E5,blk_-1608999687919862906,2008-11-09 20:35:19
4,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,E11,blk_-1608999687919862906,2008-11-09 20:35:19


### 9. Formatting the data before export by comparing with provided dataset

In [51]:
# 2. Sort by BlockId and then Timestamp to ensure the sequence is chronological
df_base = df_base.sort_values(['BlockId', 'Timestamp'])

# 3. Aggregate EventIds into a list and calculate time intervals
def get_sequences(group):
    # Get the sequence of Event IDs
    features = group['EventId'].tolist()
    
    # Calculate difference between consecutive timestamps in seconds
    intervals = group['Timestamp'].diff().dt.total_seconds().fillna(0).tolist()
    
    # Sum of all intervals
    latency = sum(intervals)
    
    return pd.Series({
        'Features': features,
        'TimeIntervals': intervals,
        'Latency': latency
    })

# 4. Apply to get the final result
block_seq = df_base.groupby('BlockId').apply(get_sequences).reset_index()

block_seq.to_csv('test_exp_traces.csv')

print(block_seq.head())

  block_seq = df_base.groupby('BlockId').apply(get_sequences).reset_index()


                    BlockId  \
0  blk_-1000002529962039464   
1   blk_-100000266894974466   
2  blk_-1000007292892887521   
3  blk_-1000014584150379967   
4  blk_-1000028658773048709   

                                            Features  \
0  [E5, E5, E5, E22, E11, E9, E11, E9, E26, E26, ...   
1  [E22, E5, E5, E5, E26, E26, E26, E11, E9, E11,...   
2  [E5, E5, E22, E5, E11, E9, E11, E9, E11, E9, E...   
3  [E5, E22, E5, E5, E26, E26, E26, E11, E9, E11,...   
4  [E5, E5, E5, E22, E11, E9, E11, E9, E11, E9, E...   

                                       TimeIntervals  Latency  
0  [0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, ...      3.0  
1  [0.0, 0.0, 0.0, 0.0, 35.0, 0.0, 0.0, 0.0, 0.0,...  30958.0  
2  [0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...      2.0  
3  [0.0, 1.0, 1.0, 1.0, 38.0, 0.0, 0.0, 0.0, 0.0,...  31460.0  
4  [0.0, 0.0, 0.0, 0.0, 17.0, 0.0, 0.0, 0.0, 0.0,...  24078.0  


In [43]:
block_seq.Features.apply(tuple).nunique()

834

In [29]:
files = ['anomaly_label.csv','Event_occurrence_matrix.csv','Event_traces.csv','HDFS.log_templates.csv']

pandas_list = []

for file in files:
    pandas_list.append(pd.read_csv(file))

pandas_list[2].head()

Unnamed: 0,BlockId,Label,Type,Features,TimeInterval,Latency
0,blk_-1608999687919862906,Success,,"[E5,E22,E5,E5,E11,E11,E9,E9,E11,E9,E26,E26,E26...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",3802
1,blk_7503483334202473044,Success,,"[E5,E5,E22,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",3802
2,blk_-3544583377289625738,Fail,21.0,"[E5,E22,E5,E5,E11,E9,E11,E9,E11,E9,E3,E26,E26,...","[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",3797
3,blk_-9073992586687739851,Success,,"[E5,E22,E5,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",50448
4,blk_7854771516489510256,Success,,"[E5,E5,E22,E5,E11,E9,E11,E9,E11,E9,E26,E26,E26...","[0.0, 0.0, 1.0, 48.0, 0.0, 0.0, 0.0, 0.0, 0.0,...",50583


In [35]:
pandas_list[2].Features.nunique()

18373

## 10. HDFS V2 Logs

In [4]:
import os

hdfs2_folder = './HDFS_v2_node_logs'
hdfs2_csv_folder = hdfs2_folder + '/hdfsv2_csv'
hdfs2_files = [a for a in os.listdir(hdfs2_folder) if '.log' in a]

with open(hdfs2_folder+'/'+hdfs2_files[0],'r') as reader:
    hdfs2_df0 = reader.readlines()

# Check how the log is formatted before parsing and cleaning

for i,line in enumerate(hdfs2_df0[:20]):
    print(i,line)

0 2015-12-03 14:37:47,611 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: STARTUP_MSG: 

1 /************************************************************

2 STARTUP_MSG: Starting DataNode

3 STARTUP_MSG:   host = mesos-master-1/10.10.34.11

4 STARTUP_MSG:   args = []

5 STARTUP_MSG:   version = 2.7.1

6 STARTUP_MSG:   classpath = /usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/commons-digester-1.8.jar:/usr/local/hadoop/share/hadoop/common/lib/activation-1.1.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-math3-3.1.1.jar:/usr/local/hadoop/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/usr/local/hadoop/share/hadoop/common/lib/hamcrest-core-1.3.jar:/usr/local/hadoop/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/usr/local/hadoop/share/hadoop/common/lib/slf4j-api-1.7.10.jar:/usr/local/hadoop/share/hadoop/common/lib/jetty-util-6.1.26.jar:/usr/local/hadoop/share/hadoop/common/lib/jersey-co

In [5]:
# Omitting the front part of the dataset since it does not follow rest of the log format
df_raw = pd.DataFrame(hdfs2_df0[10:],columns=['raw'])
log_reg_pattern = r'^(?P<Date>\d{4}-\d{2}-\d{2})\s+(?P<Time>\d{2}:\d{2}:\d{2},\d{3})\s+(?P<Level>[A-Z]+)\s+(?P<Component>[\w\.]+):\s+(?P<Content>.*)$'

# Seperating the values to columns in dataframe
df_base = df_raw['raw'].str.extract(log_reg_pattern)

df_base.head()

Unnamed: 0,Date,Time,Level,Component,Content
0,2015-12-03,"14:37:47,618",INFO,org.apache.hadoop.hdfs.server.datanode.DataNode,"registered UNIX signal handlers for [TERM, HUP..."
1,2015-12-03,"14:37:48,253",INFO,org.apache.hadoop.metrics2.impl.MetricsConfig,loaded properties from hadoop-metrics2.properties
2,2015-12-03,"14:37:48,315",INFO,org.apache.hadoop.metrics2.impl.MetricsSystemImpl,Scheduled snapshot period at 10 second(s).
3,2015-12-03,"14:37:48,315",INFO,org.apache.hadoop.metrics2.impl.MetricsSystemImpl,DataNode metrics system started
4,2015-12-03,"14:37:48,319",INFO,org.apache.hadoop.hdfs.server.datanode.BlockSc...,Initialized block scanner with targetBytesPerS...


### 10.1 Event Identification and storing

In [6]:
print('Size of dataset:',df_base.shape)
print('Database describe:', df_base.describe())
print('Database info:', df_base.info())
print('Empty values:', df_base.isna().sum())
print('Number of unique content:',df_base.Content.nunique())

Size of dataset: (2614796, 5)
Database describe:               Date          Time    Level  \
count      2611375       2611375  2611375   
unique         579       1015724        3   
top     2016-10-06  21:44:05,989     INFO   
freq        456424           232  2610681   

                                              Component  \
count                                           2611375   
unique                                               19   
top     org.apache.hadoop.hdfs.server.datanode.DataNode   
freq                                            1060635   

                                                  Content  
count                                             2611375  
unique                                            2594943  
top     Got finalize command for block pool BP-1088411...  
freq                                                 2300  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2614796 entries, 0 to 2614795
Data columns (total 5 columns):
 #   Column     D

In [7]:
#  Drop null values
df_base.dropna()

Unnamed: 0,Date,Time,Level,Component,Content
0,2015-12-03,"14:37:47,618",INFO,org.apache.hadoop.hdfs.server.datanode.DataNode,"registered UNIX signal handlers for [TERM, HUP..."
1,2015-12-03,"14:37:48,253",INFO,org.apache.hadoop.metrics2.impl.MetricsConfig,loaded properties from hadoop-metrics2.properties
2,2015-12-03,"14:37:48,315",INFO,org.apache.hadoop.metrics2.impl.MetricsSystemImpl,Scheduled snapshot period at 10 second(s).
3,2015-12-03,"14:37:48,315",INFO,org.apache.hadoop.metrics2.impl.MetricsSystemImpl,DataNode metrics system started
4,2015-12-03,"14:37:48,319",INFO,org.apache.hadoop.hdfs.server.datanode.BlockSc...,Initialized block scanner with targetBytesPerS...
...,...,...,...,...,...
2614791,2017-07-31,"12:32:00,003",INFO,org.apache.hadoop.hdfs.server.datanode.DataNode,Got finalize command for block pool BP-1088411...
2614792,2017-07-31,"13:16:34,202",INFO,org.apache.hadoop.hdfs.server.datanode.Directo...,BlockPool BP-108841162-10.10.34.11-14400743609...
2614793,2017-07-31,"18:32:00,024",INFO,org.apache.hadoop.hdfs.server.datanode.DataNode,Successfully sent block report 0x633af28a2a98c...
2614794,2017-07-31,"18:32:00,025",INFO,org.apache.hadoop.hdfs.server.datanode.DataNode,Got finalize command for block pool BP-1088411...


In [19]:
# Considering content has alot of unique data and there is no event list to reference off, it is best to try and create one



Index(['Date', 'Time', 'Level', 'Component', 'Content'], dtype='object')

In [21]:
print('1234567890'[1:3])

23
