In [8]:
'''
!pip install numpy
!pip install pandas
!pip install sklearn
!pip install tensorflow
!pip install keras
!pip install matplotlib
!pip install seaborn
!pip install plotly
!pip install tqdm
'''

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras.models import Sequential

from keras.layers import LSTM, GRU,SimpleRNN
from keras.layers.core import Dense, Activation, Dropout
from keras.layers import Embedding
from keras.layers import BatchNormalization
from keras.utils import np_utils
from sklearn import preprocessing, decomposition, model_selection, metrics, pipeline
from keras.layers import GlobalMaxPooling1D, Conv1D, MaxPooling1D, Flatten, Bidirectional, SpatialDropout1D
from keras.preprocessing import sequence, text
from keras.callbacks import EarlyStopping


import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from plotly import graph_objs as go
import plotly.express as px
import plotly.figure_factory as ff

In [9]:
# get [log key, delta time] as input for deeplog
input_dir  = '../Datasets/hdfs/'
output_dir = '../output/hdfs/'  # The output directory of parsing results
log_file   = "HDFS.log"  # The input log file name

log_structured_file = output_dir + log_file + "_structured.csv"
log_templates_file = output_dir + log_file + "_templates.csv"
log_sequence_file = output_dir + "hdfs_sequence.csv"

### Configuring TPU's as we will be using Bert 

In [10]:
# Detect hardware, return appropriate distribution strategy
try:
    # TPU detection. No parameters necessary if TPU_NAME environment variable is
    # set: this is always the case on Kaggle.
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Running on TPU ', tpu.master())
except ValueError:
    tpu = None

if tpu:
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
else:
    # Default distribution strategy in Tensorflow. Works on CPU and single GPU.
    strategy = tf.distribute.get_strategy()

print("REPLICAS: ", strategy.num_replicas_in_sync)

REPLICAS:  1


In [12]:

#train = logFile2DataFrame('../Datasets/HDFS/HDFS.log')
#validation = pd.read_csv('../Datasets/Sentiment/validation.csv')
#test = pd.read_csv('../Datasets/Sentiment/test.csv')

In [13]:
import re
import os

def load_data(log_format):
    headers, regex = generate_logformat_regex(log_format)
    df_log = log_to_dataframe(os.path.join(input_dir, log_file), regex, headers, log_format)
    return df_log

def log_to_dataframe( log_file, regex, headers, logformat):
    """ Function to transform log file to dataframe
    """
    log_messages = []
    linecount = 0
    cnt = 0
    with open(log_file, 'r') as fin:
        for line in fin.readlines():
            cnt += 1
            try:
                match = regex.search(line.strip())
                message = [match.group(header) for header in headers]
                log_messages.append(message)
                linecount += 1
            except Exception as e:
                # print("\n", line)
                # print(e)
                pass
    print("Total size after encoding is", linecount, cnt)
    logdf = pd.DataFrame(log_messages, columns=headers)
    logdf.insert(0, 'LineId', None)
    logdf['LineId'] = [i + 1 for i in range(linecount)]
    return logdf

def generate_logformat_regex( logformat):
    """ Function to generate regular expression to split log messages
    """
    headers = []
    splitters = re.split(r'(<[^<>]+>)', logformat)
    regex = ''
    for k in range(len(splitters)):
        if k % 2 == 0:
            splitter = re.sub(' +', '\\\s+', splitters[k])
            regex += splitter
        else:
            header = splitters[k].strip('<').strip('>')
            regex += '(?P<%s>.*?)' % header
            headers.append(header)
    regex = re.compile('^' + regex + '$')
    return headers, regex




In [14]:
log_format = '<Date> <Time> <Pid> <Level> <Component>: <Content>'  # HDFS log format
train = load_data(log_format)
train.head()


Total size after encoding is 3746105 3746106


Unnamed: 0,LineId,Date,Time,Pid,Level,Component,Content
0,1,81109,203518,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
1,2,81109,203518,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...
2,3,81109,203519,143,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
3,4,81109,203519,145,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...
4,5,81109,203519,145,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...


In [55]:
import json
from collections import defaultdict

def hdfs_sampling(df, window='session'):
    assert window == 'session', "Only window=session is supported for HDFS dataset."
    df['BlockId']= "NULL"
    #for idx, row in tqdm(df.iterrows()):
    #    df['Label'][idx] =  re.findall(r'(blk_-?\d+)', row['Content'])
    
    data_dict = defaultdict(list) #preserve insertion order of items
    for idx, row in tqdm(df.iterrows()):
        blkId_list = re.findall(r'(blk_-?\d+)', row['Content'])
        blkId_set = set(blkId_list)
        for blk_Id in blkId_set:
            data_dict[blk_Id].append(row["Content"])

    data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])  
    
    return data_df

aaa_df= hdfs_sampling(train)
aaa_df.head(20)


3746105it [02:08, 29104.41it/s]


Unnamed: 0,BlockId,EventSequence
0,blk_-1608999687919862906,[Receiving block blk_-1608999687919862906 src:...
1,blk_7503483334202473044,[Receiving block blk_7503483334202473044 src: ...
2,blk_-3544583377289625738,[Receiving block blk_-3544583377289625738 src:...
3,blk_-9073992586687739851,[Receiving block blk_-9073992586687739851 src:...
4,blk_7854771516489510256,[Receiving block blk_7854771516489510256 src: ...
5,blk_1717858812220360316,[Receiving block blk_1717858812220360316 src: ...
6,blk_-2519617320378473615,[Receiving block blk_-2519617320378473615 src:...
7,blk_7063315473424667801,[Receiving block blk_7063315473424667801 src: ...
8,blk_8586544123689943463,[Receiving block blk_8586544123689943463 src: ...
9,blk_2765344736980045501,[Receiving block blk_2765344736980045501 src: ...


In [59]:
aaa_df.to_csv(log_sequence_file, index=None)

In [17]:
def generate_train_test(hdfs_sequence_file, n=None, ratio=0.3):
    blk_label_dict = {}
    blk_label_file = os.path.join(input_dir, "anomaly_label.csv")
    blk_df = pd.read_csv(blk_label_file)
    for _ , row in tqdm(blk_df.iterrows()):
        blk_label_dict[row["BlockId"]] = 1 if row["Label"] == "Anomaly" else 0

    seq = hdfs_sequence_file.copy() #pd.read_csv(hdfs_sequence_file)
    seq["Label"] = seq["BlockId"].apply(lambda x: blk_label_dict.get(x)) #add label to the sequence of each blockid

    #normal_seq = seq[seq["Label"] == 0]["EventSequence"]
    #normal_seq = normal_seq.sample(frac=1, random_state=20) # shuffle normal data

    #abnormal_seq = seq[seq["Label"] == 1]["EventSequence"]
    #normal_len, abnormal_len = len(normal_seq), len(abnormal_seq)
    #train_len = n if n else int(normal_len * ratio)
    #print("normal size {0}, abnormal size {1}, training size {2}".format(normal_len, abnormal_len, train_len))

    #train = seq.iloc[:train_len]
    #test_normal = normal_seq.iloc[train_len:]
    #test_abnormal = abnormal_seq

    #df_to_file(train, output_dir + "train")
    #df_to_file(test_normal, output_dir + "test_normal")
    #df_to_file(test_abnormal, output_dir + "test_abnormal")
    print("generate train test data done")
    return seq


In [18]:
new__df = generate_train_test(train)
new__df.head()

575061it [00:17, 32079.76it/s]


KeyError: 'BlockId'

In [None]:
train.drop(['severe_toxic','obscene','threat','insult','identity_hate'],axis=1,inplace=True)


In [None]:
train.head()

In [None]:
train = train.loc[:12000,:]
train.shape

### Check the maximum number of words in a comment

In [None]:
train['comment_text'].apply(lambda x:len(str(x).split())).max()

In [None]:
#Writing a function for getting auc score for validation

def roc_auc(predictions,target):
    '''
    This methods returns the AUC Score when given the Predictions
    and Labels
    '''
    
    fpr, tpr, thresholds = metrics.roc_curve(target, predictions)
    roc_auc = metrics.auc(fpr, tpr)
    return roc_auc

In [None]:
# Data Preparation

xtrain, xvalid, ytrain, yvalid = train_test_split(train.comment_text.values, train.toxic.values, 
                                                  stratify=train.toxic.values, 
                                                  random_state=42, 
                                                  test_size=0.2, shuffle=True)

### Simple RNN

In [None]:
# using keras tokenizer here
token = text.Tokenizer(num_words=None)
max_len = 1500

token.fit_on_texts(list(xtrain) + list(xvalid))
xtrain_seq = token.texts_to_sequences(xtrain)
xvalid_seq = token.texts_to_sequences(xvalid)

#zero pad the sequences
from tensorflow.keras.preprocessing.sequence import pad_sequences
xtrain_pad = pad_sequences(xtrain_seq, maxlen=max_len)
xvalid_pad = pad_sequences(xvalid_seq, maxlen=max_len)

word_index = token.word_index

In [None]:
%%time
with strategy.scope():
    # A simpleRNN without any pretrained embeddings and one dense layer
    model = Sequential()
    model.add(Embedding(len(word_index) + 1,
                     300,
                     input_length=max_len))
    model.add(SimpleRNN(100))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    
model.summary()

In [None]:
model.fit(xtrain_pad, ytrain, epochs=5, batch_size=64*strategy.num_replicas_in_sync) #Multiplying by Strategy to run on TPU's