In [1]:
from pyspark import SparkContext, SparkConf #spark
import pydoop.hdfs as hdfs #for saving local file as hdfs 
from pyspark.sql import SQLContext # for data frame
import urllib #for imitating hdfs auto escape string
import re #for escape string 


In [3]:
# Spark configuration for this notebook; every helper below reads files through `sc`.
conf = SparkConf()
conf.setAppName("error_machine_part_2")
sc = SparkContext(conf=conf)

In [4]:
def Count_Lines(df_path):
    '''Return the number of lines in the text file at `df_path`, read through `sc.textFile`.'''
    return sc.textFile(df_path).count()

In [18]:
def Word_Count_Each_Text(df_path):
    '''
    Count how often every token occurs in the text file at `df_path`.

    Each line is split on punctuation, spaces and digits. Empty strings produced
    by adjacent separators are counted as a token too.

    Returns an RDD of (token, count) pairs sorted by count, descending, e.g.
    [(u'', 483648),
     (u'virtual', 11546),
     (u'jerry', 11365),
     (u'Jan', 11364),
     (u'machine', 11364),
     (u'NetworkManager', 4390),
     (u'info', 4327)]
    '''
    rdd_hdfs = sc.textFile(df_path)
    # Split by special characters and digits. flatMap takes no second argument
    # here: a positional one would be RDD.flatMap's preservesPartitioning flag,
    # not re.split's maxsplit.
    rdd_split_map = rdd_hdfs.flatMap(
        lambda x: re.split(r'[`\- =~!@#$%^&*()_+\[\]{};\'\\:"|<,./<>?0-9]', x))
    # (token, 1) pairs summed per token.
    res_reduce = rdd_split_map.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    # Sort by count, descending.
    return res_reduce.sortBy(lambda c: c[1], False)

In [19]:
def Pick_Top_Text(rdd_wc_reduce, count_lines, deviation=2):
    '''
    Return the tokens whose count is at least (almost) the number of lines.

    rdd_wc_reduce: RDD of (token, count) pairs, e.g. from Word_Count_Each_Text.
    count_lines:   number of lines in the file the counts were taken from.
    deviation:     how far below `count_lines` a count may fall and still be
                   kept (default 2).

    Returns a Python list of the (token, count) pairs with
    count >= count_lines - deviation, in the RDD's order.
    '''
    threshold = count_lines - deviation
    # collect() is a Spark action: call it exactly once so the upstream job
    # (including any sort) is executed only once.
    return rdd_wc_reduce.filter(lambda x: x[1] >= threshold).collect()

In [153]:
def Map_Filter_Top_Texts(line, list_tops):
    '''
    Drop empty tokens and "top" tokens from one split line.

    line:      list of string tokens for one line (as produced by re.split).
    list_tops: (token, count) pairs whose tokens must be removed, e.g. the
               list returned by Pick_Top_Text.

    Returns the remaining tokens joined with ',' in their original order
    ('' when nothing is left).
    '''
    # Build a set once so each token is an O(1) lookup instead of a scan of list_tops.
    top_words = set(top[0] for top in list_tops)
    # Tokens are compared and joined as-is, without str(): under Python 2,
    # sc.textFile yields unicode and str() on a non-ASCII token raises
    # UnicodeEncodeError.
    kept = [text for text in line if text != "" and text not in top_words]
    return ",".join(kept)

In [122]:
def Gen_Relative_Table(col_rdd):
    '''
    Build a table of which tokens appear directly next to each other.

    col_rdd: iterable of comma-joined token lines.

    For the line "A,B,C":
        relative[A] = [B]
        relative[B] = [A, C]
        relative[C] = [B]

    Every token seen gets a key. A token alone on a line contributes no
    neighbours from that line. Neighbour lists are de-duplicated and their
    order is unspecified.
    '''
    neighbours = {}
    for str_line in col_rdd:
        line = str_line.split(',')
        last = len(line) - 1
        for i, now in enumerate(line):
            seen = neighbours.setdefault(now, set())
            # Bounds-checked on both sides: line[i - 1] at i == 0 would wrap
            # to the last element, making a single-token line list itself
            # as its own neighbour.
            if i > 0:
                seen.add(line[i - 1])
            if i < last:
                seen.add(line[i + 1])
    return {text: list(found) for text, found in neighbours.items()}

In [218]:
def Count_Each_Line_Score(col_rdd, relative_table):
    '''
    Score how well each line's token order matches a relative table.

    For every adjacent pair (token, following token) on a line, add how many
    entries of relative_table[token] equal the following token. Tokens that
    are not keys add nothing. The line score is that total divided by
    (number of tokens - 1). Lines with fewer than two tokens score 0.0.

    Returns (scores, zero_line_numbers, zero_line_contents). The last two list
    the 1-based position and raw text of every line that scored 0.0.
    '''
    scores = []
    zero_numbers = []
    zero_contents = []
    for number, str_line in enumerate(col_rdd, 1):
        tokens = str_line.split(',')
        total = 0.0
        if len(tokens) > 1:
            for current, following in zip(tokens, tokens[1:]):
                if current in relative_table:
                    total += sum(1 for relative in relative_table[current]
                                 if following == relative)
            total = total / (len(tokens) - 1)
        scores.append(total)
        if total == 0.0:
            zero_numbers.append(number)
            zero_contents.append(str_line)
    return (scores, zero_numbers, zero_contents)

            
        

In [142]:
def Delete_Top_Text_for_Each_Line(df_path, top_collect):
    '''
    Read `df_path` and strip the "top" tokens from every line.

    Each line is split on punctuation, spaces and digits. Then
    Map_Filter_Top_Texts removes empty tokens and every token listed in
    `top_collect` ((token, count) pairs, e.g. from Pick_Top_Text).

    Returns an RDD with one comma-joined string per input line.
    '''
    rdd_hdfs = sc.textFile(df_path)
    # Split by special characters and digits. map() gets no second argument:
    # a positional one would be RDD.map's preservesPartitioning flag, not
    # re.split's maxsplit.
    rdd_sp_map = rdd_hdfs.map(
        lambda x: re.split(r'[`\- =~!@#$%^&*()_+\[\]{};\'\\:"|<,./<>?0-9]', x))
    # Filter all the top text.
    return rdd_sp_map.map(lambda x: Map_Filter_Top_Texts(x, top_collect))

In [125]:
def Gen_Relative_Table_by_Filter_Map(rdd_fil_map):
    '''Collect `rdd_fil_map` to the driver and return Gen_Relative_Table of the collected lines (a dict).'''
    return Gen_Relative_Table(rdd_fil_map.collect())

In [158]:
def Main_Normal_Case(input_path):
    '''
    Build the relative table for a "normal" log file.

    Pipeline:
      1. Count lines.
      2. Word count.
      3. Pick tokens whose count is close to the line count.
      4. Strip those tokens from every line.
      5. Build the neighbour table of what is left.

    Returns the dict produced by Gen_Relative_Table_by_Filter_Map.
    '''
    total_lines = Count_Lines(input_path)
    word_counts = Word_Count_Each_Text(input_path)
    stripped_lines = Delete_Top_Text_for_Each_Line(
        input_path, Pick_Top_Text(word_counts, total_lines))
    return Gen_Relative_Table_by_Filter_Map(stripped_lines)

In [219]:
def Main_Compare_Case(input_path, normal_path):
    '''
    Score every line of `input_path` against the relative table of `normal_path`.

    The input file gets the same top-token stripping as in Main_Normal_Case.
    The normal file's relative table is then built, and each stripped input
    line is scored with Count_Each_Line_Score.

    Returns (scores, zero_line_numbers, zero_line_contents).
    '''
    total_lines = Count_Lines(input_path)
    word_counts = Word_Count_Each_Text(input_path)
    top_tokens = Pick_Top_Text(word_counts, total_lines)
    stripped_lines = Delete_Top_Text_for_Each_Line(input_path, top_tokens)
    normal_table = Main_Normal_Case(normal_path)
    scores, zero_numbers, zero_contents = Count_Each_Line_Score(
        stripped_lines.collect(), normal_table)
    return scores, zero_numbers, zero_contents

In [160]:
# HDFS path of the reference ("normal") log.
inputPath_normal = "/user/spark/input/log_normal"

In [161]:
# HDFS path of the log to be compared against the normal one.
inputPath_compare = "/user/spark/input/log_error"

In [151]:
# Main_Compare_Case(input_path, normal_path) needs two arguments and returns
# score lists, not an RDD, so build the filtered compare-log RDD directly.
compare_line_count = Count_Lines(inputPath_compare)
compare_top = Pick_Top_Text(Word_Count_Each_Text(inputPath_compare), compare_line_count)
rdd_map_filiter_top_com = Delete_Top_Text_for_Each_Line(inputPath_compare, compare_top)

In [154]:
# Preview the first 7 filtered compare-log lines.
rdd_map_filiter_top_com.take(num=7)

['anacron,Job,cron,daily,terminated',
 'anacron,Normal,exit,job,run',
 'dhclient,DHCPREQUEST,of,on,ens,to,port,xid,x,ce,aa',
 'dhclient,DHCPACK,of,from',
 'NetworkManager,info,address',
 'NetworkManager,info,plen',
 'NetworkManager,info,gateway']

In [220]:
# Score the error log against the normal log's relative table.
(score_lines, list_zero_number, list_zero_line_content) = Main_Compare_Case(
    input_path=inputPath_compare, normal_path=inputPath_normal)

In [212]:
print(score_lines)

[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.3, 1.0, 1.0,

In [217]:
print(list_zero_number)

[456, 457, 459, 460, 461, 462, 463, 464, 465, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475, 487, 488, 489, 490, 491, 492, 526, 534, 535, 536, 537, 541, 545, 553, 559, 560, 564, 565, 566, 567, 568, 569, 570, 571, 573, 574, 578, 579, 580, 581, 582, 591, 592, 593, 594, 599, 600, 601, 602, 614, 649, 653, 657, 658, 660, 661, 662, 666, 673, 675, 677, 678, 681, 682, 684, 744, 746, 749, 750, 757, 769, 771, 773, 776, 777, 778, 780, 782, 784, 785, 787, 788, 789, 790, 791, 794, 795, 796, 798, 799, 800, 801, 802, 805, 807, 809, 814, 815, 816, 817, 818, 819, 820, 822, 823, 824, 825, 826, 827, 828, 829, 830, 831, 832, 833, 838, 840, 842, 843, 844, 847, 848, 849, 850, 854, 857, 858, 859, 860, 861, 862, 863, 869, 871, 872, 873]


In [221]:
print(list_zero_line_content)

['kernel,btrfs,xor', 'kernel,xor', 'kernel,raid,pq', 'kernel,raid,pq', 'kernel,btrfs,xor,raid,pq,ufs,qnx', 'kernel,ufs,ufs,qnx', 'kernel,qnx,hfsplus', 'kernel,hfsplus,hfs', 'kernel,hfs,minix', 'kernel,minix', 'kernel,ntfs,msdos', 'kernel,ntfs,hfsplus,hfs', 'kernel,jfs', 'kernel,minix', 'kernel,msdos', 'kernel,ntfs', 'kernel,jfs,xfs', 'kernel,msdos,jfs,xfs', 'kernel,xfs,libcrc,c', 'kernel,vhost,vhost,vhost', 'kernel,macvtap,macvtap', 'kernel,macvtap', 'kernel,macvlan', 'kernel,macvlan', 'kernel,macvlan', 'kernel,coretemp,coretemp', 'kernel,snd,snd', 'kernel,kvm,kvm', 'kernel,soundcore', 'kernel,soundcore', 'kernel,irqbypass,joydev', 'kernel,nfit,nfit', 'kernel,ppdev', 'kernel,vmxnet', 'kernel,vmxnet', 'kernel,usbhid', 'kernel,usbhid', 'kernel,usbhid', 'kernel,hid', 'kernel,hid', 'kernel,hid', 'kernel,vmwgfx', 'kernel,vmwgfx', 'kernel,ttm', 'kernel,ttm', 'kernel,psmouse', 'kernel,psmouse', 'kernel,psmouse', 'kernel,syscopyarea', 'kernel,syscopyarea', 'kernel,drm', 'kernel,mii', 'kernel,m

In [222]:
# Print each zero-score line number followed by its stripped content.
len_list = len(list_zero_line_content)
for i, content in enumerate(list_zero_line_content):
    print(list_zero_number[i])
    print(content)

456
kernel,btrfs,xor
457
kernel,xor
459
kernel,raid,pq
460
kernel,raid,pq
461
kernel,btrfs,xor,raid,pq,ufs,qnx
462
kernel,ufs,ufs,qnx
463
kernel,qnx,hfsplus
464
kernel,hfsplus,hfs
465
kernel,hfs,minix
466
kernel,minix
467
kernel,ntfs,msdos
468
kernel,ntfs,hfsplus,hfs
469
kernel,jfs
470
kernel,minix
471
kernel,msdos
472
kernel,ntfs
473
kernel,jfs,xfs
474
kernel,msdos,jfs,xfs
475
kernel,xfs,libcrc,c
487
kernel,vhost,vhost,vhost
488
kernel,macvtap,macvtap
489
kernel,macvtap
490
kernel,macvlan
491
kernel,macvlan
492
kernel,macvlan
526
kernel,coretemp,coretemp
534
kernel,snd,snd
535
kernel,kvm,kvm
536
kernel,soundcore
537
kernel,soundcore
541
kernel,irqbypass,joydev
545
kernel,nfit,nfit
553
kernel,ppdev
559
kernel,vmxnet
560
kernel,vmxnet
564
kernel,usbhid
565
kernel,usbhid
566
kernel,usbhid
567
kernel,hid
568
kernel,hid
569
kernel,hid
570
kernel,vmwgfx
571
kernel,vmwgfx
573
kernel,ttm
574
kernel,ttm
578
kernel,psmouse
579
kernel,psmouse
580
kernel,psmouse
581
kernel,syscopyarea
582
kernel,

In [10]:
# NOTE(review): same value as the earlier inputPath_normal cell.
inputPath_normal = "/user/spark/input/log_normal"

In [11]:
#red_wordcount = Main_Normal_Case(inputPath_normal)

In [16]:
# red_wordcount is otherwise only assigned in a commented-out cell; compute it
# here so this cell runs on a fresh kernel.
red_wordcount = Word_Count_Each_Text(inputPath_normal)
red_wordcount.take(7)

[(u'', 483648),
 (u'virtual', 11546),
 (u'jerry', 11365),
 (u'Jan', 11364),
 (u'machine', 11364),
 (u'NetworkManager', 4390),
 (u'info', 4327)]

In [25]:
# Main_Normal_Case returns the relative-table dict, but the following cells
# call .take()/.collect() on this name, so build the filtered RDD itself.
normal_line_count = Count_Lines(inputPath_normal)
normal_top = Pick_Top_Text(Word_Count_Each_Text(inputPath_normal), normal_line_count)
rdd_map_filiter_top = Delete_Top_Text_for_Each_Line(inputPath_normal, normal_top)

In [26]:
# Preview the first 7 filtered normal-log lines.
rdd_map_filiter_top.take(num=7)

['anacron,Job,cron,daily,terminated',
 'anacron,Normal,exit,job,run',
 'dhclient,DHCPREQUEST,of,on,ens,to,port,xid,x,da,d,b',
 'dhclient,DHCPACK,of,from',
 'NetworkManager,info,address',
 'NetworkManager,info,plen',
 'NetworkManager,info,gateway']

In [82]:
# Collect the filtered normal-log lines to the driver as a Python list.
# NOTE(review): requires rdd_map_filiter_top to be an RDD; Main_Normal_Case returns a dict.
col_rdd = rdd_map_filiter_top.collect()

In [121]:
# Neighbour table of the collected normal-log lines. Reuse Gen_Relative_Table
# rather than a pasted copy of its loop, so the two cannot drift apart.
dist_text = Gen_Relative_Table(col_rdd)
print(dist_text['info'])

['audit', 'domain', 'level', 'wins', 'address', 'msg', 'devices', 'server', 'policy', 'manager', 'dhcp', 'dns', 'plen', 'device', 'NetworkManager', 'nameserver', 'keyfile', 'gateway', 'lease']


In [123]:
# gen_relative_table_by_each_word is not defined anywhere in this notebook;
# the collect-then-build-table helper is Gen_Relative_Table_by_Filter_Map.
table = Gen_Relative_Table_by_Filter_Map(rdd_map_filiter_top)

In [189]:
# Neighbours recorded for the token 'gateway' (KeyError if it never appeared).
table['gateway']

['info']

In [172]:
# Collect the filtered compare-log lines to the driver.
# NOTE(review): this reuses the name col_rdd and overwrites the normal-log lines collected earlier.
col_rdd = rdd_map_filiter_top_com.collect()

In [173]:
# Alias the normal-log relative table under the name Count_Each_Line_Score uses for its parameter.
relative_table = table