In [1]:
%%time
import numpy as np
import ast
from datetime import datetime
import glob
import ipaddress
import os
import pandas as pd
from pandarallel import pandarallel
from joblib import Parallel, delayed
from local_utils import *
import sys
import logging

# Log file for this run, opened for writing with `buffering=10` (third positional argument).
# The handle is never closed explicitly; it stays open for the lifetime of the kernel.
so = open('cbl.log', 'w', 10)
# NOTE(review): presumably relies on the kernel's stdout/stderr stream objects honouring an
# `echo` attribute to mirror notebook output into the log file -- confirm against the
# installed ipykernel version.
sys.stdout.echo = so
sys.stderr.echo = so

# Point the first handler of the IPython application logger at the same file and
# lower that logger's threshold to INFO.
get_ipython().log.handlers[0].stream = so
get_ipython().log.setLevel(logging.INFO)

# Initialise pandarallel with progress bars enabled.
pandarallel.initialize(progress_bar=True)

INFO: Pandarallel will run on 60 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.
CPU times: user 323 ms, sys: 63.8 ms, total: 386 ms
Wall time: 389 ms


In [2]:
# Root directory that get_file_location_dict searches for Spamhaus dump files.
spamhaus_dir = '/data/data-ec/data/data.spamhaus/'
# Source CSV for the (currently commented-out) in-notebook parsing step.
# NOTE(review): appears unused by the active cells shown here -- confirm before removing.
org_ranges_file = 'csv/all_ranges_over_time_gaps_filled.csv'
# CSV read into the `exploded` frame.
exploded_ranges_file = 'csv/all_ranges_exploded.csv'

In [3]:
# %%time
# def process_cols(row):
#     row['ripe_names'] = ast.literal_eval(row['ripe_names'])
#     row['Ranges'] = ast.literal_eval(row['Ranges'])
#     return row

# df = pd.read_csv(org_ranges_file)
# df = df.parallel_apply(process_cols, axis=1)
# df

In [4]:
%%time

# Read the ranges table from disk. An earlier variant built it in-notebook from `df`
# (dropping `ripe_ranges`, then set_index(cols) + pd.Series.explode); that path is disabled.
exploded = pd.read_csv(exploded_ranges_file)

# `cols` = every column except the two that are collapsed back into lists later on;
# it is reused as the group-by key when the features are imploded.
cols = exploded.columns.tolist()
cols.remove('Ranges')
cols.remove('ripe_names')

exploded

In [5]:
def get_file_location_dict(name, base_dir=None):
    """Map snapshot dates to the blocklist dump file recorded for that date.

    Two on-disk layouts are searched below the data directory:

    * flat files ``<base_dir>/<name>*``: the key is built from the last 10
      characters of the path, split on ``_``, reversed and joined with ``-``
      (presumably ``DD_MM_YYYY`` -> ``YYYY-MM-DD``; confirm against the data);
    * dated sub-directories ``<base_dir>/<dir>/<name>``: the key is the first
      10 characters of ``<dir>``. Paths containing ``'sample'`` are ignored.

    When both layouts yield the same key, the sub-directory file wins.

    Parameters
    ----------
    name : str
        Dump file name (or prefix). Any name containing ``'cbl'`` is replaced
        by ``'cbl.diagnostics'``.
    base_dir : str, optional
        Directory to search. Defaults to the module-level ``spamhaus_dir``.

    Returns
    -------
    dict[str, str]
        Date key -> file path.
    """
    if base_dir is None:
        base_dir = spamhaus_dir
    if 'cbl' in name:
        name = 'cbl.diagnostics'

    flat_files = glob.glob(os.path.join(base_dir, f'{name}*'))
    file_location_dict = {
        '-'.join(path[-10:].split('_')[::-1]): path for path in flat_files
    }

    dated_files = [
        path for path in glob.glob(os.path.join(base_dir, '*', name))
        if 'sample' not in path
    ]
    # Take the key from the parent directory name instead of a fixed character
    # offset into the path, so it no longer depends on the length of base_dir.
    file_location_dict.update({
        os.path.basename(os.path.dirname(path))[:10]: path for path in dated_files
    })
    return file_location_dict

In [6]:
def get_ip_set_xbl(filepath):
    """Read an XBL dump and return its entries as a set of strings.

    Each line is stripped of surrounding whitespace. Blank lines and lines
    whose first non-blank character is ``#`` or ``:`` are skipped; every
    other line is added verbatim.

    Parameters
    ----------
    filepath : str
        Path of the dump file to read.

    Returns
    -------
    set[str]
        The distinct non-comment entries of the file.
    """
    ip_set = set()
    with open(filepath, 'r') as f:
        # Stream the file instead of materialising every line with readlines().
        for raw_line in f:
            entry = raw_line.strip()
            # Skipping empties keeps '' out of the result set.
            if not entry or entry[0] in ('#', ':'):
                continue
            ip_set.add(entry)
    return ip_set

def get_ip_range_set_pbl(filepath):
    """Read a PBL dump and return its range entries as a set of strings.

    Each line is stripped. Blank lines and lines starting with ``#``, ``:``
    or ``!`` are skipped, so ``!`` exclusion entries are ignored rather than
    applied. The last four characters of every remaining line are dropped
    before it is stored.

    NOTE(review): the fixed 4-character suffix presumably matches the dump
    format in use -- confirm against a sample file.

    Parameters
    ----------
    filepath : str
        Path of the dump file to read.

    Returns
    -------
    set[str]
        The distinct truncated entries of the file.
    """
    ranges = set()
    with open(filepath, 'r') as f:
        for raw_line in f:
            entry = raw_line.strip()
            # Skipping empties keeps '' out of the result set for blank lines.
            if not entry or entry.startswith(('#', ':', '!')):
                continue
            ranges.add(entry[:-4])
    return ranges

def get_ip_set_cbl(filepath):
    """Read a CBL diagnostics dump and return the first field of each record.

    Each line is stripped. Blank lines and lines starting with ``#`` are
    skipped. For every other line, the text before the first comma is kept;
    a line without any comma is kept whole instead of raising ``ValueError``.
    Undecodable bytes are replaced while reading (``errors='replace'``).

    Parameters
    ----------
    filepath : str
        Path of the dump file to read.

    Returns
    -------
    set[str]
        The distinct first fields found in the file.
    """
    ips = set()
    with open(filepath, 'r', errors='replace') as f:
        for raw_line in f:
            record = raw_line.strip()
            if not record or record.startswith('#'):
                continue
            # partition() never raises, so a stray blank or truncated line
            # cannot abort a long-running job.
            ips.add(record.partition(',')[0])
    return ips

def get_ip_set(filepath):
    """Pick the parser for a dump file from substrings of its path.

    The path is checked for ``'pbl'``, then ``'xbl'``, then ``'cbl'``; the
    first match decides which reader is called. The match is made against the
    whole path, not just the file name. Returns ``None`` when none of the
    markers occurs in the path.
    """
    parsed = None
    if 'pbl' in filepath:
        parsed = get_ip_range_set_pbl(filepath)
    elif 'xbl' in filepath:
        parsed = get_ip_set_xbl(filepath)
    elif 'cbl' in filepath:
        parsed = get_ip_set_cbl(filepath)
    return parsed


In [7]:
def extract_features_group_cbl(group, file_location_dict):
    """Compute the CBL feature columns for all rows that share one date.

    The date is read from the first row of ``group``. If no dump file is
    known for that date, a copy of ``group`` is returned with ``cbl_ips`` set
    to an empty list and ``cbl_count`` set to ``<NA>`` (nullable ``Int64``)
    on every row. Otherwise the dump is parsed once and
    ``extract_features_cbl`` is applied to each row.

    The input frame is never modified; the result is always a new frame.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows for a single date; must contain the ``date`` column (and
        ``Ranges`` when a dump exists for that date).
    file_location_dict : dict
        Date key -> dump file path.

    Returns
    -------
    pandas.DataFrame
        ``group`` with ``cbl_ips`` and ``cbl_count`` columns added.
    """
    date = group['date'].iloc[0]

    if date not in file_location_dict:
        row_count = len(group)
        # assign() builds a new frame, so the caller's group (often a groupby
        # slice) is left untouched and no SettingWithCopy warning is triggered.
        return group.assign(
            cbl_ips=pd.Series([[] for _ in range(row_count)], index=group.index),
            cbl_count=pd.Series(pd.array([pd.NA] * row_count, dtype="Int64"),
                                index=group.index),
        )

    ip_set = get_ip_set(file_location_dict[date])
    return group.apply(extract_features_cbl, args=(ip_set,), axis=1)
    
def extract_features_cbl(row, ip_set):
    """Record which listed addresses fall inside the row's IP range.

    ``row['Ranges']`` holds a single IPv4 network. Every address of that
    network is enumerated and intersected with ``ip_set``; the size of the
    intersection is stored in ``row['cbl_count']`` and its members (in no
    particular order) in ``row['cbl_ips']``. The same ``row`` object is
    returned after being updated.
    """
    network = ipaddress.IPv4Network(row['Ranges'])
    # Expand the network into the string form of each address it contains.
    addresses = {address.compressed for address in network}
    # Addresses that are both inside the range and on the blocklist for this date.
    listed = addresses & ip_set
    row['cbl_count'] = len(listed)
    row['cbl_ips'] = list(listed)
    return row

In [8]:
file_location_dict = get_file_location_dict('cbl')
print('got locations')

# One task per date: each worker receives that date's rows plus the date -> file map,
# and the per-date result frames are stitched back together afterwards.
cbl_features = pd.concat(
    Parallel(n_jobs=20, verbose=1, backend='multiprocessing')(
        delayed(extract_features_group_cbl)(date_group, file_location_dict)
        for _, date_group in exploded.groupby('date')
    )
)


got locations


[Parallel(n_jobs=20)]: Using backend MultiprocessingBackend with 20 concurrent workers.
[Parallel(n_jobs=20)]: Done  10 tasks      | elapsed:    4.0s
[Parallel(n_jobs=20)]: Done 280 tasks      | elapsed:    5.1s
[Parallel(n_jobs=20)]: Done 780 tasks      | elapsed:    7.9s
[Parallel(n_jobs=20)]: Done 1200 tasks      | elapsed:   20.0s
[Parallel(n_jobs=20)]: Done 1650 tasks      | elapsed:   32.4s
[Parallel(n_jobs=20)]: Done 2320 tasks      | elapsed:   46.6s
[Parallel(n_jobs=20)]: Done 3590 tasks      | elapsed: 137.6min
[Parallel(n_jobs=20)]: Done 4379 tasks      | elapsed: 238.6min
[Parallel(n_jobs=20)]: Done 4551 out of 4551 | elapsed: 240.1min finished


In [9]:
# Show only the rows with a known, strictly positive CBL count.
cbl_features.loc[cbl_features['cbl_count'].notna() & (cbl_features['cbl_count'] > 0)]

In [10]:
# Order rows by date, customer and range. Ranges are parsed into IPv4Network objects
# for the sort so they are compared as networks rather than as text, then turned
# back into strings.
cbl_features = (
    cbl_features
    .assign(Ranges=lambda frame: frame['Ranges'].map(ipaddress.IPv4Network))
    .sort_values(['date', 'customer', 'Ranges'])
    .assign(Ranges=lambda frame: frame['Ranges'].map(str))
)

def aggregate_to_list(group):
    """Collapse the values of one grouped column into a plain Python list."""
    return group.tolist()

# Collapse the exploded rows back to one row per `cols` key, gathering the per-range
# values into lists. The aggregated columns must not be part of the group-by key.
imploded = (
    cbl_features
    .groupby(cols)
    .agg({
        column: aggregate_to_list
        for column in ('Ranges', 'ripe_names', 'cbl_ips', 'cbl_count')
    })
    .reset_index()
)

imploded

In [11]:
# Create the output directory first so the export cannot fail on a missing folder
# after the long feature computation. List-valued columns are written as their
# string representation.
os.makedirs('csv/features', exist_ok=True)
imploded.to_csv('csv/features/cbl_features.csv', index=False)

In [12]:
# Completion marker: signals that the notebook reached its final cell.
print('done')

done
