In [6]:
import concurrent.futures
import dask.dataframe as dd
import json
import numpy as np
import pandas as pd

from ast import literal_eval
from dask.diagnostics import ProgressBar
from datetime import datetime
from pandas.api.types import CategoricalDtype
from path import Path
from tqdm import tqdm  # Gives us a nice progress bar

PROJECT_DIR = Path.getcwd().parent  # Could assert some things here to check we got the right path
CACHE_OLD_DIR = Path.joinpath(PROJECT_DIR, 'cache')  # Where the originally downloaded files are
CLEAN_DIR = Path.joinpath(PROJECT_DIR, 'clean_3000')  # Where the parquet files are going to live
CACHE_NEW_DIR = Path.joinpath(PROJECT_DIR, 'cache_new.parquet')  # Where the parquet files are going to live

N_PARTITIONS = 2999  # This was decided after trial and error

In [2]:
operation_categorical_type = CategoricalDtype(categories=["get", "set", "call", "set (failed)"])

symbol_counts = pd.read_csv('symbol_counts.csv', names=['symbol', 'count'])
symbol_categorical_type = CategoricalDtype(categories=symbol_counts.symbol.values)

## Make even(ish) partitions based on call counts.

This still doesn't make them even due to "value" field being so variable. But it's better than just slicing up the list of files.

In [3]:
index_with_counts = pd.read_csv('file_index_with_counts.csv.gz')
index_with_counts = index_with_counts.rename(columns=dict(crawl_id='call_count'))
index_with_counts.head()

Unnamed: 0,file_name,call_count
0,1_00001314358470f0c99914d5c7af0cd89248e54883ac...,6
1,1_000014b53a60c645e3ac9bde6bae020430c930b3cc59...,30
2,1_00003e3765a73da45db5265de2b22424e025d61380f7...,4
3,1_00004636d8310609e710934f194bfb41a5f0ac7ed5e0...,78
4,1_00004b8315fd1954f06dd80b85ebc61f7ab006785cd3...,64


In [4]:
index_with_counts['cum_call_count'] = index_with_counts.call_count.cumsum()
index_with_counts.head(10)

Unnamed: 0,file_name,call_count,cum_call_count
0,1_00001314358470f0c99914d5c7af0cd89248e54883ac...,6,6
1,1_000014b53a60c645e3ac9bde6bae020430c930b3cc59...,30,36
2,1_00003e3765a73da45db5265de2b22424e025d61380f7...,4,40
3,1_00004636d8310609e710934f194bfb41a5f0ac7ed5e0...,78,118
4,1_00004b8315fd1954f06dd80b85ebc61f7ab006785cd3...,64,182
5,1_00004cab8dbbf5a10ec8f4fe3d7a816c69a69ff6dcfb...,32,214
6,1_000052c9c1a2bcf00536c9b3f4132222339f74379067...,10,224
7,1_000055875d91ca961b32d7e535548ed87e50de528399...,27,251
8,1_00005baacbbbd2ce9442b6afa0c914506329f669a5b0...,102,353
9,1_00006011493ed94fb8010cead84ee610cdbece5de961...,3,356


In [5]:
total_calls = index_with_counts.cum_call_count.max()
n_per_partition = total_calls / N_PARTITIONS
print('Total calls: {:,}'.format(total_calls))
print('Calls per partition: ~{:,.0f}'.format(n_per_partition))

Total calls: 113,790,736
Calls per partition: ~37,943


In [7]:
index_with_counts['planned_partition'] = np.floor_divide(index_with_counts.cum_call_count, n_per_partition)
index_with_counts.head()

Unnamed: 0,file_name,call_count,cum_call_count,planned_partition
0,1_00001314358470f0c99914d5c7af0cd89248e54883ac...,6,6,0.0
1,1_000014b53a60c645e3ac9bde6bae020430c930b3cc59...,30,36,0.0
2,1_00003e3765a73da45db5265de2b22424e025d61380f7...,4,40,0.0
3,1_00004636d8310609e710934f194bfb41a5f0ac7ed5e0...,78,118,0.0
4,1_00004b8315fd1954f06dd80b85ebc61f7ab006785cd3...,64,182,0.0


In [8]:
index_with_counts.tail()

Unnamed: 0,file_name,call_count,cum_call_count,planned_partition
2059730,1_9ff1b3f34e786b83451d70a6d88cb279ec46b2d3bcef...,1062,113790582,2998.0
2059731,1_9ff1bda5513936fdecb3148b3584c8f7919e3664636d...,3,113790585,2998.0
2059732,1_9ff1c07c2ffdcd30289f48e40fc71228ed4ea6006350...,6,113790591,2998.0
2059733,1_9ff1ce65db76da369f56da383646fb50de92c9053344...,4,113790595,2998.0
2059734,1_9ff1d64487df95ecc523893aee10d7770bddf8628772...,141,113790736,2998.0


In [9]:
# Check penultimate break
rough_break = index_with_counts.cum_call_count.values[-1] - n_per_partition
index_with_counts[
    (index_with_counts.cum_call_count < rough_break + 300) &
    (index_with_counts.cum_call_count > rough_break - 200)
]

Unnamed: 0,file_name,call_count,cum_call_count,planned_partition
2059020,1_9fdb773b09566b26e533d1b8feef5b01e03e65215b64...,83,113752617,2997.0
2059021,1_9fdb7eb13238d265bd7bb9e2cb9375543237406678a5...,61,113752678,2997.0
2059022,1_9fdb805360ac920ee97207239131fd6f7d16bb3b3056...,1,113752679,2997.0
2059023,1_9fdb85ab7613e9913cf60d07ed35dab8a649adcb0eea...,5,113752684,2997.0
2059024,1_9fdb96cfcc6383a292f892a28fdd1517a804b5c706a1...,3,113752687,2997.0
2059025,1_9fdb98af93156db3998199949c1449b9cdf10807e4e5...,6,113752693,2997.0
2059026,1_9fdbadb39d698da23f286a99e939d507e0faf2b813e6...,3,113752696,2997.0
2059027,1_9fdbc479d4379ce6d290a1b2d572a20eb869a947d040...,145,113752841,2998.0
2059028,1_9fdbc5144abfbafdf1b69dcdf737e152b9062fed8e6d...,2,113752843,2998.0
2059029,1_9fdbda3f578ae9afc4d59272a7c8a1d860ed8105ffd0...,28,113752871,2998.0


## Process data

In [12]:
fix_data = lambda x:  "[" + x[1:-1] + "]"

def get_files_for_split(split_number):
    filtered = index_with_counts[index_with_counts.planned_partition == split_number]
    return filtered.file_name.values
    
def get_data_from_file(file_name):
    fp = Path.joinpath(CACHE_OLD_DIR, file_name)
    with open(fp, 'r') as f:
        raw_data = f.read()
    data = json.loads(fix_data(raw_data))
    return data

def convert_to_dict(item):
    item = item.replace('false', 'False')
    item = item.replace('true', 'True')
    item = item.replace('null', 'None')
    try:
        return literal_eval(item)
    except:
        return {}

def make_df_from_data(data, file_name):
    df = pd.DataFrame.from_records(data)
    df = df.reset_index()
    df['file_name'] = file_name
    # Make a unique call_id
    df = df.rename(columns=dict(index='call_id'))
    call_id_format = '{file_name}__{index:06d}'
    short_file_name = file_name.split('.json')[0]
    df.call_id = df.call_id.apply(lambda x: call_id_format.format(file_name=short_file_name, index=x))
    # Make a value len and initial value
    df['value_len'] = df.value.str.len()
    df['value_1000'] = df.value.str.slice(0, 1000)
    # Make a timestamp
    df['time_stamp'] = pd.to_datetime(df.time_stamp, errors='coerce')
    # Drop the crawl id that's always 1
    df = df.drop('crawl_id', axis=1)
    # Make categorical
    df['operation'] = df.operation.astype(operation_categorical_type)
    df['symbol'] = df.symbol.astype(symbol_categorical_type)
    # Make script_col and script_line numeric, an drop bad values
    df['script_col'] = pd.to_numeric(df.script_col, errors='coerce')
    df['script_line'] = pd.to_numeric(df.script_line, errors='coerce')
    df = df.dropna(subset=['script_col', 'script_line'])
    df['script_col'] = df['script_col'].astype(int)
    df['script_line'] = df['script_line'].astype(int)
    # Parse arguments
    if 'arguments' in df.columns:
        df['arguments'] = df.arguments.astype(str)
        df['_arg_as_dict'] = df.arguments.apply(convert_to_dict)
        df['arguments_n_keys'] = df._arg_as_dict.apply(len)
        # Based on previous computation of max_keys_count
        for n in range(9):
            key = 'argument_{}'.format(n)
            df[key] = df._arg_as_dict.apply(lambda x: x.get(str(n))).astype(str)
        df = df.drop('_arg_as_dict', axis=1)
    return df

def get_df(file_name):
    data = get_data_from_file(file_name)
    df = make_df_from_data(data, file_name)
    return df

### Test single write and read

In [13]:
%%time
dfs = []
for file_name in get_files_for_split(100):
    dfs.append(get_df(file_name))

CPU times: user 10.4 s, sys: 245 ms, total: 10.7 s
Wall time: 10.9 s


In [14]:
%%time
all_df = pd.concat(dfs, ignore_index=True)
all_df.arguments_n_keys = all_df.arguments_n_keys.fillna(value=0).astype(int)

CPU times: user 1.52 s, sys: 16 ms, total: 1.54 s
Wall time: 1.53 s


In [15]:
len(all_df)

37872

In [16]:
all_df.dtypes

argument_0                  object
argument_1                  object
argument_2                  object
argument_3                  object
argument_4                  object
argument_5                  object
argument_6                  object
argument_7                  object
argument_8                  object
arguments                   object
arguments_n_keys             int64
call_id                     object
call_stack                  object
file_name                   object
func_name                   object
in_iframe                     bool
location                    object
operation                 category
script_col                   int64
script_line                  int64
script_loc_eval             object
script_url                  object
symbol                    category
time_stamp          datetime64[ns]
value                       object
value_1000                  object
value_len                    int64
dtype: object

In [17]:
all_df.head()

Unnamed: 0,argument_0,argument_1,argument_2,argument_3,argument_4,argument_5,argument_6,argument_7,argument_8,arguments,...,operation,script_col,script_line,script_loc_eval,script_url,symbol,time_stamp,value,value_1000,value_len
0,,,,,,,,,,,...,get,585,131,,https://pagead2.googlesyndication.com/pagead/o...,window.navigator.userAgent,2017-12-16 04:41:01.150,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
1,,,,,,,,,,,...,get,405,163,,https://apis.google.com/_/scs/apps-static/_/js...,window.name,2017-12-16 04:41:01.150,,,0
2,,,,,,,,,,,...,get,898,101,,https://kitchen.juicer.cc/?color=LyfEvcP/mFg=,window.navigator.userAgent,2017-12-16 04:41:01.171,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
3,,,,,,,,,,,...,get,926,101,,https://kitchen.juicer.cc/?color=LyfEvcP/mFg=,window.navigator.language,2017-12-16 04:41:01.171,en-US,en-US,5
4,,,,,,,,,,,...,get,953,101,,https://kitchen.juicer.cc/?color=LyfEvcP/mFg=,window.screen.colorDepth,2017-12-16 04:41:01.171,24,24,2


In [18]:
%%time
all_df.to_parquet(Path.joinpath(CLEAN_DIR, 'sample_1.pqt'))

CPU times: user 1.26 s, sys: 115 ms, total: 1.37 s
Wall time: 1.39 s


In [19]:
%%time
df = dd.read_parquet(Path.joinpath(CLEAN_DIR, 'sample_1.pqt'))

CPU times: user 13.4 ms, sys: 1.95 ms, total: 15.3 ms
Wall time: 14.8 ms


In [20]:
df.tail()

Unnamed: 0,argument_0,argument_1,argument_2,argument_3,argument_4,argument_5,argument_6,argument_7,argument_8,arguments,...,operation,script_col,script_line,script_loc_eval,script_url,symbol,time_stamp,value,value_1000,value_len
37867,,,,,,,,,,,...,get,8423,1,,https://tpc.googlesyndication.com/pagead/js/r2...,window.navigator.userAgent,2017-12-16 21:09:23.560,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
37868,,,,,,,,,,,...,get,89,17,,https://googleads.g.doubleclick.net/pagead/ads...,window.navigator.userAgent,2017-12-16 21:09:23.598,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
37869,,,,,,,,,,,...,get,7772,1,,https://tpc.googlesyndication.com/pagead/js/r2...,window.navigator.userAgent,2017-12-16 21:09:23.640,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
37870,,,,,,,,,,,...,get,6589,1,,https://tpc.googlesyndication.com/pagead/js/r2...,window.navigator.userAgent,2017-12-16 21:09:23.657,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68
37871,,,,,,,,,,,...,get,71642,1,,https://pagead2.googlesyndication.com/pagead/j...,window.navigator.userAgent,2017-12-16 11:35:07.436,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,68


In [21]:
df.dtypes

argument_0                  object
argument_1                  object
argument_2                  object
argument_3                  object
argument_4                  object
argument_5                  object
argument_6                  object
argument_7                  object
argument_8                  object
arguments                   object
arguments_n_keys             int64
call_id                     object
call_stack                  object
file_name                   object
func_name                   object
in_iframe                     bool
location                    object
operation                 category
script_col                   int64
script_line                  int64
script_loc_eval             object
script_url                  object
symbol                    category
time_stamp          datetime64[ns]
value                       object
value_1000                  object
value_len                    int64
dtype: object

## Run all

Don't forget to delete old sample_1.pqt before we get started

In [22]:
def process_chunk(split_number):
    dfs = []
    for file_name in get_files_for_split(split_number):
        dfs.append(get_df(file_name))
    all_df = pd.concat(dfs, ignore_index=True)
    all_df.to_parquet(Path.joinpath(CLEAN_DIR, 'sample_{}.pqt'.format(split_number)))

In [23]:
errored = []
future_map = {}
# My machine has 8 cores. Choose a number that's appropriate for your machine. Each worker
# is using about 1GB of RAM on my machine, so that's also something to think about.
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    for n in range(N_PARTITIONS):
        future = executor.submit(process_chunk, n)
        future_map[future] = n
    for future in tqdm(concurrent.futures.as_completed(future_map), total=N_PARTITIONS):
        n = future_map[future]
        try:
            future.result()
        except:
            errored.append(n)

100%|██████████| 2999/2999 [2:19:47<00:00,  2.80s/it]  


Finally we reopen and resave with dask so that dask stores meta data. In addition, we need to change arguments_n_keys column to be int

In [24]:
ddf = dd.read_parquet('../clean_3000/sample_*.pqt')  # Note we sadly can't use Path, which is good practice, because of * syntax
ddf.dtypes

argument_0                  object
argument_1                  object
argument_2                  object
argument_3                  object
argument_4                  object
argument_5                  object
argument_6                  object
argument_7                  object
argument_8                  object
arguments                   object
arguments_n_keys           float64
call_id                     object
call_stack                  object
file_name                   object
func_name                   object
in_iframe                     bool
location                    object
operation                 category
script_col                   int64
script_line                  int64
script_loc_eval             object
script_url                  object
symbol                    category
time_stamp          datetime64[ns]
value                       object
value_1000                  object
value_len                    int64
dtype: object

In [25]:
# If necessary repartition
# ddf = ddf.repartition(n_partitions=3000)

with ProgressBar():
    ddf.to_parquet(CACHE_NEW_DIR, compression='snappy')

[########################################] | 100% Completed | 30min 27.8s


Clean-up behind you by deleting the extra directory of data.

In [None]:
# Single threaded

for split_number in range(2000):
    print('Iteration {}'.format(split_number))
    print('Timestamp: {:%b-%d %H:%M:%S}'.format(datetime.now()))
    dfs = []
    for file_name in get_files_for_split(split_number):
        dfs.append(get_df(file_name))
    all_df = pd.concat(dfs, ignore_index=True)
    all_df.to_parquet(Path.joinpath(CACHE_NEW_DIR, 'sample_{}.pqt'.format(split_number)))