In [1]:
import numpy as np
import pandas as pd

### Mining the log template model

In [3]:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
import json
import logging
import os
import sys
import time
from os.path import dirname

def mineModel(f, cfg):
    """Train a Drain3 template miner on the log lines read from ``f``.

    Every line is right-stripped, reduced to the text after the first
    ``": "`` separator, and has ``blk_`` rewritten to ``blk#`` before it is
    handed to the miner. Lines that are empty after this preprocessing are
    skipped. Progress, every cluster change and the final clusters (largest
    first) are logged to stdout.

    Args:
        f: Iterable of log lines, e.g. an open text file. It is consumed
            lazily and is not closed here; the caller owns the handle.
        cfg: Path of the Drain3 configuration file given to
            ``TemplateMinerConfig.load``.

    Returns:
        The trained ``TemplateMiner`` instance.
    """
    config = TemplateMinerConfig()
    config.load(cfg)
    config.profiling_enabled = True
    template_miner = TemplateMiner(config=config)

    line_count = 0
    start_time = time.time()
    batch_start_time = start_time
    batch_size = 10000
    logger = logging.getLogger(__name__)
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')

    # Iterate the input directly instead of readlines(): the logs can hold
    # millions of lines and do not need to be resident in memory at once.
    for line in f:
        line = line.rstrip().partition(": ")[2].replace('blk_', 'blk#')
        if line == '':
            continue
        result = template_miner.add_log_message(line)
        line_count += 1
        if line_count % batch_size == 0:
            time_took = time.time() - batch_start_time
            # A coarse clock can report zero elapsed time; avoid dividing by it.
            rate = batch_size / time_took if time_took > 0 else float('inf')
            logger.info(f"Processing line: {line_count}, rate {rate:.1f} lines/sec, "
                        f"{len(template_miner.drain.clusters)} clusters so far.")
            batch_start_time = time.time()
        if result["change_type"] != "none":
            # Only log lines that created a cluster or changed a template.
            result_json = json.dumps(result)
            logger.info(f"Input ({line_count}): " + line)
            logger.info("Result: " + result_json)

    time_took = time.time() - start_time
    rate = line_count / time_took if time_took > 0 else float('inf')
    logger.info(f"--- Done processing file in {time_took:.2f} sec. Total of {line_count} lines, rate {rate:.1f} lines/sec, "
                f"{len(template_miner.drain.clusters)} clusters")

    # Report the mined clusters, most populated first.
    sorted_clusters = sorted(template_miner.drain.clusters, key=lambda it: it.size, reverse=True)
    for cluster in sorted_clusters:
        logger.info(cluster)

    return template_miner

### Parse log files from Hadoop

#### Move all files to a single directory

In [2]:
import os
import shutil

# NOTE(review): these are absolute, user-specific paths; adjust them before running elsewhere.
source_path = os.path.abspath(r'/Users/shuming/Downloads/Hadoop')     # source directory
target_path = os.path.abspath(r'/Users/shuming/Downloads/merged_hadoop/')    # target directory

# Create the target directory if it doesn't exist.
os.makedirs(target_path, exist_ok=True)

if os.path.isdir(source_path):
    # Flatten the source tree: every file found at any depth is copied
    # straight into target_path (files sharing a base name overwrite each other).
    for root, dirs, files in os.walk(source_path):
        for file in files:
            src_file = os.path.join(root, file)
            shutil.copy(src_file, target_path)
            print(src_file)
    print('copy complete')
else:
    # Previously 'copy complete' was printed even though nothing was copied.
    print('source directory not found: ' + source_path)

/Users/shuming/Downloads/Hadoop/abnormal_label.txt
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000007.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000013.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000012.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000006.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000010.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000004.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000005.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000011.log
/Users/shuming/Downloads/Hadoop/application_1445087491445_0005/container_1445087491445_0005_01_000015

#### Merge all files into one log file

In [4]:
#os module consists of many functions about file and directory processing
import os  
meragefiledir = '/Users/shuming/Downloads/merged_hadoop/'
# The listing is taken before the output file is (re)created, as before.
filenames = os.listdir(meragefiledir)

# target file path; the with-blocks close the output and every input file,
# even if reading one of them fails part-way through.
with open('/Users/shuming/Downloads/merged_hadoop/merged.log', 'w') as file:
    for filename in filenames:
        filepath = os.path.join(meragefiledir, filename)
        # Never copy the output into itself, and skip anything that is not a regular file.
        if filename == 'merged.log' or not os.path.isfile(filepath):
            continue
        with open(filepath) as src:
            file.writelines(src)
        # Keep consecutive files separated even when one lacks a trailing newline.
        file.write('\n')

#### Train Drain3 model with existing logs

In [14]:
# Open the training log in a with-block so the handle is closed once mining
# finishes (it was previously opened inline and never closed).
with open("../data/merged_hadoop/merged.log") as f:
    mine_template = mineModel(f=f, cfg='../data/drain3.ini')

Starting Drain3 template miner
Input (1): loaded properties from hadoop-metrics2.properties
Result: {"change_type": "cluster_created", "cluster_id": 1, "cluster_size": 1, "template_mined": "loaded properties from hadoop-metrics2.properties", "cluster_count": 1}
Input (2): Scheduled snapshot period at 10 second(s).
Result: {"change_type": "cluster_created", "cluster_id": 2, "cluster_size": 1, "template_mined": "Scheduled snapshot period at <:NUM:> second(s).", "cluster_count": 2}
Input (3): MapTask metrics system started
Result: {"change_type": "cluster_created", "cluster_id": 3, "cluster_size": 1, "template_mined": "MapTask metrics system started", "cluster_count": 3}
Input (4): Executing with tokens:
Result: {"change_type": "cluster_created", "cluster_id": 4, "cluster_size": 1, "template_mined": "Executing with tokens:", "cluster_count": 4}
Input (5): Kind: mapreduce.job, Service: job_1445076437777_0005, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@666adef3)
R

#### Get templates

In [5]:
# Application ids whose log files receive label 0 when sequences are built;
# every other file is labelled 1.
# Each entry needs its own trailing comma: adjacent string literals without a
# comma are silently concatenated into one (wrong) id.
normal_log = [
    '1445087491445_0005', '1445087491445_0007',
    '1445175094696_0005', '1445062781478_0011', '1445062781478_0016', '1445062781478_0019',
    '1445076437777_0002', '1445076437777_0005', '1445144423722_0021', '1445144423722_0024',
    '1445182159119_0012',
]

In [17]:
line = 'Successfully connected to /10.190.173.170:50010 for BP-1347369012-10.190.173.170-1444972147527:blk_1073742826_2022'
# Apply the same blk_ -> blk# rewrite used for training and for sequence
# building, so the cluster learned here is one the normalised lines can match.
line = line.replace('blk_', 'blk#')
# Teach the miner this message, then look up the cluster it now maps to.
mine_template.add_log_message(line)
match = mine_template.match(line)
match.cluster_id

total          : took    12.06 s (100.00%),    193,633 samples,   62.29 ms / 1000 samples,       16,052.84 hz
mask           : took     7.40 s ( 61.36%),    193,633 samples,   38.22 ms / 1000 samples,       26,162.14 hz
drain          : took     3.46 s ( 28.67%),    193,633 samples,   17.86 ms / 1000 samples,       55,999.23 hz
tree_search    : took     1.37 s ( 11.36%),    193,633 samples,    7.08 ms / 1000 samples,      141,287.14 hz
cluster_exist  : took     0.92 s (  7.63%),    193,332 samples,    4.76 ms / 1000 samples,      210,105.46 hz
create_cluster : took     0.01 s (  0.06%),        301 samples,   25.32 ms / 1000 samples,       39,495.87 hz


31

In [11]:
label = []
events = []
identifier = []

# open files one by one
meragefiledir = '../data/merged_hadoop/'
filenames = os.listdir(meragefiledir)

for filename in filenames:
    # merged.log is the concatenation of all other files (it is excluded when
    # merging too), so it must not become a labelled sample of its own.
    if filename == 'merged.log':
        continue
    filepath = os.path.join(meragefiledir, filename)

    # Characters 10..27 of the file name are compared with the normal ids;
    # presumably this skips a "container_" prefix -- TODO confirm for every
    # file kept in this directory (e.g. a label text file would get label 1).
    label.append(0 if filename[10:28] in normal_log else 1)

    tokens = []
    with open(filepath) as log_file:
        for line in log_file:
            # Same preprocessing as training: strip, drop the prefix up to
            # the first ": ", rewrite blk_ to blk#.
            line = line.rstrip().partition(": ")[2].replace('blk_', 'blk#')
            if line == '':
                continue
            match = mine_template.match(line)
            if match is None:
                # No known template for this line: report it and stop reading
                # this file, as the previous catch-all handler did.
                # NOTE(review): this truncates the sequence at the first
                # unmatched line -- confirm that is intended.
                print(line)
                break
            tokens.append('E' + str(match.cluster_id) + ' ')
    sequence = ''.join(tokens)
    identifier.append(filename)
    events.append(sequence)


Successfully connected to /10.190.173.170:50010 for BP-1347369012-10.190.173.170-1444972147527:blk_1073742826_2022
Successfully connected to /10.190.173.170:50010 for BP-1347369012-10.190.173.170-1444972147527:blk_1073742829_2025
DFSOutputStream ResponseProcessor exception  for block BP-1347369012-10.190.173.170-1444972147527:blk_1073743028_2240
DFSOutputStream ResponseProcessor exception  for block BP-1347369012-10.190.173.170-1444972147527:blk_1073743509_2728
Abandoning BP-1347369012-10.190.173.170-1444972147527:blk_1073743056_2272
Successfully connected to /10.190.173.170:50010 for BP-1347369012-10.190.173.170-1444972147527:blk_1073742826_2022
Abandoning BP-1347369012-10.190.173.170-1444972147527:blk_1073743517_2738
Abandoning BP-1347369012-10.190.173.170-1444972147527:blk_1073742931_2130
Abandoning BP-1347369012-10.190.173.170-1444972147527:blk_1073742903_2102
DFSOutputStream ResponseProcessor exception  for block BP-1347369012-10.190.173.170-1444972147527:blk_1073742860_2056
Succe

### Parse log files from HDFS
#### Train model

In [5]:
# Open the HDFS log in a with-block so the handle is closed once mining
# finishes (it was previously opened inline and never closed).
# NOTE(review): "../DATA" differs in case from the "../data" used for the
# config file; confirm the directory name on case-sensitive filesystems.
with open("../DATA/HDFS.log") as f:
    mine_template = mineModel(f=f, cfg='../data/drain3_hdfs.ini')

Input (1): Receiving block blk#-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010
Result: {"change_type": "cluster_created", "cluster_id": 1, "cluster_size": 1, "template_mined": "Receiving block <:BLOCKID:> src: /10.250.19.102:54106 dest: /10.250.19.102:50010", "cluster_count": 1}
Input (2): BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk#-1608999687919862906
Result: {"change_type": "cluster_created", "cluster_id": 2, "cluster_size": 1, "template_mined": "BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job 200811092030 0001/job.jar. <:BLOCKID:>", "cluster_count": 2}
Input (3): Receiving block blk#-1608999687919862906 src: /10.250.10.6:40524 dest: /10.250.10.6:50010
Result: {"change_type": "cluster_template_changed", "cluster_id": 1, "cluster_size": 2, "template_mined": "Receiving block <:BLOCKID:> src: <:*:> dest: <:*:>", "cluster_count": 2}
Input (5): PacketResponder 1 for block blk#-1608999687919862906 t

In [6]:
# Sanity check on one HDFS message: normalise the block id, look up its
# template, then list every parameter the template extracts from the line.
line = 'Received block blk_-1608999687919862906 src: /10.250.14.224:35754 dest: /10.250.14.224:50010 of size 91178'.replace('blk_', 'blk#')
print(line)

match = mine_template.match(line)
matched_template = match.get_template()
print(matched_template)

extracted = mine_template.extract_parameters(matched_template, line, False)
for p in extracted:
    print(p)

Received block blk#-1608999687919862906 src: /10.250.14.224:35754 dest: /10.250.14.224:50010 of size 91178
Received block <:BLOCKID:> src: <:*:> dest: <:*:> of size <:*:>
ExtractedParameter(value='blk#-1608999687919862906', mask_name='BLOCKID')
ExtractedParameter(value='/10.250.14.224:35754', mask_name='*')
ExtractedParameter(value='/10.250.14.224:50010', mask_name='*')
ExtractedParameter(value='91178', mask_name='*')


#### Get Templates

In [27]:
event = []
templates = []
identifier = []

# Stream the log (it holds millions of lines) and close it when done.
with open("../DATA/HDFS.log") as f:
    for line in f:
        # Same preprocessing as training: strip, drop the prefix up to the
        # first ': ', rewrite blk_ to blk#, skip lines left empty.
        line = line.rstrip().partition(': ')[2].replace('blk_', 'blk#')
        if line == '':
            continue

        match = mine_template.match(line)
        if match is None:
            # No known template: report the line and move on.
            print(line)
            continue

        template = match.get_template()
        ps = mine_template.extract_parameters(template, line, False) or []
        # The first BLOCKID parameter identifies the block this line belongs to.
        block_id = next((p.value for p in ps if p.mask_name == 'BLOCKID'), None)
        if block_id is None:
            print(line)
            continue

        # Append to all three lists together so they always stay the same
        # length; previously a line without a BLOCKID left `identifier` short
        # and the DataFrame construction below would fail.
        event.append('E' + str(match.cluster_id))
        templates.append(template)
        identifier.append(block_id)

hdfs_templates = pd.DataFrame({'event': event, 'template': templates, 'identifier': identifier})
# Restore the original block-id spelling; this is a literal replacement, not a regex.
hdfs_templates['identifier'] = hdfs_templates['identifier'].str.replace('blk#', 'blk_', regex=False)
hdfs_templates.to_csv('../data/hdfs_templates.csv', index=False)
   

In [28]:
# Display the per-line table of event id, template and block identifier.
hdfs_templates

Unnamed: 0,event,template,identifier
0,E1,Receiving block <:BLOCKID:> src: <:*:> dest: <...,blk_-1608999687919862906
1,E2,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,blk_-1608999687919862906
2,E1,Receiving block <:BLOCKID:> src: <:*:> dest: <...,blk_-1608999687919862906
3,E1,Receiving block <:BLOCKID:> src: <:*:> dest: <...,blk_-1608999687919862906
4,E3,PacketResponder <:*:> for block <:BLOCKID:> <:*:>,blk_-1608999687919862906
...,...,...,...
11175624,E15,Verification succeeded for <:BLOCKID:>,blk_-6171368032583208892
11175625,E15,Verification succeeded for <:BLOCKID:>,blk_6195025276114316035
11175626,E15,Verification succeeded for <:BLOCKID:>,blk_-3339773404714332088
11175627,E15,Verification succeeded for <:BLOCKID:>,blk_1037231945509285002


#### Generate template sequences, grouped by identifier

In [29]:
# One row per block identifier: its events, in file order, joined by spaces.
events_per_block = hdfs_templates.groupby('identifier')['event']
hdfs_log_df = events_per_block.apply(
    lambda block_events: block_events.str.cat(sep=' ')
).reset_index()

#### Merge labels

In [30]:
label_df = pd.read_csv('../data/anomaly_label.csv')
# Attach the ground-truth label to each block's event sequence (inner join,
# so blocks present on only one side are dropped).
hdfs_log_df = label_df.merge(hdfs_log_df, left_on='BlockId', right_on='identifier')
# Encode the labels numerically by column name. The previous version used
# index labels as positions and a hard-coded column position, and its loop
# variable overwrote the `label` list built for the Hadoop logs.
# Any value other than "Normal"/"Anomaly" becomes NaN.
hdfs_log_df['Label'] = hdfs_log_df['Label'].map({'Normal': 0, 'Anomaly': 1})

In [33]:
# Display the labelled per-block event sequences.
hdfs_log_df

Unnamed: 0,BlockId,Label,identifier,event
0,blk_-1608999687919862906,0,blk_-1608999687919862906,E1 E2 E1 E1 E3 E3 E4 E4 E3 E4 E5 E5 E5 E6 E1 E...
1,blk_7503483334202473044,0,blk_7503483334202473044,E1 E1 E2 E1 E3 E4 E3 E4 E3 E4 E5 E5 E5 E10 E15...
2,blk_-3544583377289625738,1,blk_-3544583377289625738,E1 E2 E1 E1 E3 E4 E3 E4 E3 E4 E10 E5 E5 E5 E10...
3,blk_-9073992586687739851,0,blk_-9073992586687739851,E1 E11 E1 E1 E3 E4 E3 E4 E3 E4 E5 E5 E5 E15 E1...
4,blk_7854771516489510256,0,blk_7854771516489510256,E1 E1 E12 E1 E3 E4 E3 E4 E3 E4 E5 E5 E5 E15 E1...
...,...,...,...,...
575056,blk_1019720114020043203,0,blk_1019720114020043203,E1 E12 E1 E1 E5 E3 E4 E3 E4 E3 E4 E5 E5 E34 E3...
575057,blk_-2683116845478050414,0,blk_-2683116845478050414,E1 E12 E1 E1 E3 E4 E5 E3 E4 E3 E4 E5 E5 E34 E3...
575058,blk_5595059397348477632,0,blk_5595059397348477632,E1 E1 E1 E12 E3 E4 E3 E4 E3 E4 E5 E5 E5 E34 E3...
575059,blk_1513937873877967730,0,blk_1513937873877967730,E1 E1 E1 E12 E3 E4 E3 E4 E3 E4 E5 E5 E5 E34 E3...


In [32]:
# Persist the labelled sequences. index=False is not passed here, so the
# DataFrame index is written as the first, unnamed CSV column.
hdfs_log_df.to_csv("../data/hdfs_log.csv")