In [1]:
import pandas as pd
from pathlib import Path
import re
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
import numpy as np
import seaborn as sns
import pickle
import gc

import dask
import dask.dataframe as dd

from scipy.sparse import csr_matrix as sparce_matrix

from IPython.core.debugger import Pdb
ipdb = Pdb()

tqdm.pandas()

from dask.diagnostics import ProgressBar
ProgressBar().register()

  from pandas import Panel


In [2]:
dataset_version = '4.2'

preprocessed_dir = Path(f'C:/Datasets/CERT_output_v{dataset_version}/')
assert(preprocessed_dir.is_dir())

tmp_dir = Path('C:/Datasets/tmp')
assert(tmp_dir.is_dir())
dask.config.set(temporary_directory=tmp_dir.as_posix())

<dask.config.set at 0x225ea2be1c0>

In [3]:
# from dask.distributed import Client
# client = Client()  # start distributed scheduler locally.  Launch dashboard
# client

## Initial processing

Concatenate the data

In [7]:
logon_df = dd.read_csv(preprocessed_dir/ 'logon_preprocessed.csv',
                      usecols = ['date', 'user', 'is_usual_pc', 'is_work_time', 'subtype'])
logon_df['type'] = 'logon'
print('logon')

http_df = dd.read_csv(preprocessed_dir/ 'http_preprocessed.csv',
                      usecols = ['date', 'user', 'is_usual_pc', 'is_work_time', 'subtype'])
http_df['type'] = 'http'
http_df.subtype = http_df.subtype.map(int)
print('http')

device_df = dd.read_csv(preprocessed_dir/ 'device_preprocessed.csv',
                      usecols = ['date', 'user', 'is_usual_pc', 'is_work_time', 'subtype'])
device_df['type'] = 'device'
print('device')

email_df = dd.read_csv(preprocessed_dir/ 'email_preprocessed.csv',
                      usecols = ['date', 'user', 'is_usual_pc', 'is_work_time', 'subtype'])
email_df['type'] = 'email'
email_df.subtype = email_df.subtype.map({True: 'external', False: 'internal'})
print('email')

file_df = dd.read_csv(preprocessed_dir/ 'file_preprocessed.csv',
                      usecols = ['date', 'user', 'is_usual_pc', 'is_work_time', 'subtype'])
file_df['type'] = 'file'
print('file')

logon
http
device
email
file


In [8]:
logon_df, http_df, device_df, email_df, file_df = dask.persist(logon_df, http_df, device_df, email_df, file_df)

[########################################] | 100% Completed | 35.6s
[########################################] | 100% Completed | 35.7s


In [9]:
df = dd.concat([logon_df, http_df, device_df, email_df, file_df], axis=0)

In [10]:
del logon_df
del http_df
del file_df
del device_df
del email_df

gc.collect()

839

In [11]:
df = df.persist()

[########################################] | 100% Completed |  7.0s
[########################################] | 100% Completed |  7.1s


Turn categorical columns into their codes

In [12]:
df = df.categorize(columns=['subtype', 'type', 'is_usual_pc', 'is_work_time', 'user'])

[########################################] | 100% Completed |  6.6s
[########################################] | 100% Completed |  6.6s


In [13]:
df['type'] = df['type'].cat.codes
df['subtype'] = df['subtype'].cat.codes
df['is_usual_pc'] = df['is_usual_pc'].cat.codes
df['is_work_time'] = df['is_work_time'].cat.codes
df = df.persist()

[########################################] | 100% Completed | 20.1s
[########################################] | 100% Completed | 20.2s


Construct single combined categorical column

In [14]:
df['action_id'] = df.is_usual_pc * 32 + df.is_work_time * 16 + df.subtype
df = df.persist()

[########################################] | 100% Completed |  0.2s
[########################################] | 100% Completed |  0.3s


Get datetime columns

In [15]:
df['date'] = dd.to_datetime(df.date, format='%Y/%m/%d %H:%M:%S')
df['day'] = df.date.dt.floor('D')
df = df.persist()

[########################################] | 100% Completed | 24.9s
[########################################] | 100% Completed | 24.9s


In [16]:
df['day_minutes'] = df['date'].dt.hour * 60 + df['date'].dt.minute
df = df.persist()

[########################################] | 100% Completed |  1.6s
[########################################] | 100% Completed |  1.7s


In [17]:
df = df[['day', 'user', 'action_id', 'day_minutes']]
df = df.persist()

[########################################] | 100% Completed |  1.2s
[########################################] | 100% Completed |  1.2s


In [18]:
memory_usage = df.memory_usage().sum().compute() / 1024**2
f'{memory_usage} MB'

[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.4s


'595.1946811676025 MB'

In [19]:
df.shape[0].compute()

[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.2s


32770222

In [20]:
df.to_parquet(tmp_dir / 'df.parquet')

[########################################] | 100% Completed |  2.9s
[########################################] | 100% Completed |  2.9s


## Group by

In [47]:
df = dd.read_parquet(tmp_dir / 'df.parquet')

In [48]:
action_id_lists = df.groupby('user')\
    .apply(pd.DataFrame.sort_values, ['day', 'day_minutes'])\
    .reset_index(drop=True)\
    .groupby(['user', 'day'])[['action_id', 'day_minutes']]\
    .apply(
        lambda x: [list(x['action_id']), list(x['day_minutes'])],
            meta=pd.Series(dtype=object),
            )

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  action_id_lists = df.groupby('user')\


In [49]:
# This should NOT be ran in distributed fashion
action_id_lists = action_id_lists.compute() # takes about 20-30 min
# action_id_lists = action_id_lists.dropna().reset_index()
# action_id_lists = action_id_lists.rename({0: 'action_id'}, axis=1)

action_id_lists = action_id_lists.dropna()
action_id_lists = action_id_lists.progress_apply(pd.Series)
action_id_lists = action_id_lists.reset_index()
action_id_lists.columns = ['user', 'day', 'action_id', 'day_minutes']

[########################################] | 100% Completed |  4min 29.8s


100%|████████████████████████████████████████████████████████████████████████| 330452/330452 [02:05<00:00, 2627.61it/s]


In [50]:
action_id_lists

Unnamed: 0,user,day,action_id,day_minutes
0,NGF0157,2010-01-04,"[0, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 3, 2, 2, ...","[421, 432, 433, 434, 446, 450, 451, 467, 467, ..."
1,NGF0157,2010-02-24,"[0, 9, 2, 2, 2, 2, 2, 2, 2, 2, 2, 24, 18, 18, ...","[423, 440, 441, 443, 446, 448, 451, 452, 452, ..."
2,NGF0157,2010-04-01,"[0, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 9, 2, ...","[416, 417, 419, 426, 431, 434, 437, 440, 440, ..."
3,NGF0157,2010-06-10,"[0, 9, 2, 2, 2, 2, 9, 9, 2, 9, 8, 8, 2, 9, 2, ...","[410, 411, 412, 412, 412, 412, 412, 412, 413, ..."
4,NGF0157,2010-06-25,"[1, 0, 2, 2, 2, 2, 2, 2, 2, 18, 18, 18, 18, 18...","[51, 417, 420, 422, 431, 436, 441, 476, 479, 4..."
...,...,...,...,...
330447,RHY0079,2010-07-19,"[16, 18, 18, 25, 25, 18, 24, 18, 18, 24, 18, 1...","[564, 565, 565, 565, 567, 569, 569, 570, 571, ..."
330448,RHY0079,2010-09-22,"[16, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 1...","[546, 547, 549, 549, 551, 557, 561, 564, 568, ..."
330449,RHY0079,2010-12-14,"[16, 18, 24, 18, 18, 18, 18, 18, 24, 24, 18, 1...","[540, 545, 548, 552, 552, 565, 566, 567, 573, ..."
330450,RHY0079,2011-01-21,"[16, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 1...","[557, 559, 560, 561, 561, 561, 561, 561, 561, ..."


In [51]:
action_id_lists.to_parquet(tmp_dir / 'df_aggregated.parquet')

In [52]:
df = action_id_lists

In [53]:
assert (df['action_id'].apply(len) == df['day_minutes'].apply(len)).all()

In [60]:
# check if day_minutes lists sorted in ascended order
for ls in tqdm(df.day_minutes):
    assert all(ls[i] <= ls[i+1] for i in range(len(ls)-1))

100%|███████████████████████████████████████████████████████████████████████| 330452/330452 [00:05<00:00, 59734.51it/s]


## Answers

In [61]:
answers_dir = Path(r"C:\Datasets\CERT\answers")
assert answers_dir.is_dir()

insider_files = list(answers_dir.glob(f'**/r{dataset_version}*.csv'))

answers_df_ls = []

for filename in insider_files:
    answers_df = pd.read_csv(filename, names=list(range(13)))
    answers_df_ls.append(answers_df)
    
answers_df = pd.concat(answers_df_ls, axis=0, ignore_index=True)

In [62]:
answers_csv = Path("C:/Datasets/CERT/answers/insiders.csv")
assert(answers_csv.is_file())

main_df = pd.read_csv(answers_csv)
main_df = main_df[main_df['dataset'].astype(str) == str(dataset_version)] \
    .drop(['dataset', 'details'], axis=1)

main_df['start'] = pd.to_datetime(main_df['start'], format='%m/%d/%Y %H:%M:%S')
main_df['end'] = pd.to_datetime(main_df['end'], format='%m/%d/%Y %H:%M:%S')

In [63]:
meta_df = pd.DataFrame(columns=['user', 'day'])
meta_df['user'] = meta_df['user'].astype('category')
meta_df['day'] = meta_df['day'].astype('datetime64[ns]')

In [64]:
main_df['user'] = main_df['user'].astype('category')
df = dd.merge(df, main_df, on='user', how='left')

df['malicious'] = (df.day >= df.start) & (df.day <= df.end)
df = df.drop(['start', 'end', 'scenario'], axis=1)

# df.to_parquet(tmp_dir / 'final_df.parquet')

In [66]:
df.to_parquet(preprocessed_dir / f'final_{dataset_version}.parquet')

In [67]:
df.malicious.mean()

0.003915848595257405

In [68]:
df.shape[0]

330452