# Start

In [1]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import json
import pandas as pd
import os
import tldextract
import hashlib


  data = yaml.load(f.read()) or {}


All sub-samples and new samples with new columns/data are saved under the "DIR" directory to keep things organized.
For that reason, the functions "save_parquet" and "read_parquet" prepend this directory to every parquet name, and I use these functions instead of calling dd.read_parquet/dd.to_parquet directly to ensure the same read and write settings across the notebook.

NOTE: each section appends its name to 'FILE_NAME' and saves the new parquet under that name. Because of this, you can run the sections in any order you like to get the output you need.

In [2]:
# Initializing client / distributed (optional, left disabled)
# client = Client()
# client

# Folder where every derived parquet is written/read, and the base file name
# that later sections extend with their own suffix.
DIR = 'sample0_prep/'
FILE_NAME = 's0'

# exist_ok=True replaces the exists()-then-makedirs() pair: there is no window
# between the check and the creation, and re-running this cell is a no-op.
os.makedirs(DIR, exist_ok=True)

If "recalculate_partition" is not passed, save_parquet does not recalculate the partitions. Recalculating is not mandatory, but it is a good idea when you are significantly reducing the size of the data.

In [3]:
# Save a DF to a parquet
def save_parquet(df, name, recalculate_partition=False, partition_bytes=100 * 1000 * 1000):
    """Write ``df`` to ``DIR + name + '.parquet'`` using the pyarrow engine.

    Parameters
    ----------
    df : dask DataFrame
        Frame to persist.
    name : str
        File name without the directory prefix or the '.parquet' extension.
    recalculate_partition : bool, default False
        When True, measure the frame's memory usage (this triggers a compute)
        and repartition to ``1 + total_bytes // partition_bytes`` partitions
        before writing.
    partition_bytes : int, default 100 * 1000 * 1000
        Target number of bytes per partition used when
        ``recalculate_partition`` is True.

    Raises
    ------
    ValueError
        If ``partition_bytes`` is not a positive number.
    """
    if partition_bytes <= 0:
        raise ValueError("partition_bytes must be positive")

    target = DIR + name + '.parquet'
    with ProgressBar():
        if recalculate_partition:
            # Sizing recipe copied from:
            # https://stackoverflow.com/questions/44657631/strategy-for-partitioning-dask-dataframes-efficiently
            total_bytes = df.memory_usage(deep=True).sum().compute()
            # int() keeps npartitions a plain Python int rather than a numpy scalar
            n = 1 + int(total_bytes) // partition_bytes
            print("Npartition: ", n)
            df = df.repartition(npartitions=n)
        df.to_parquet(target, engine="pyarrow")
        
        
def read_parquet(name):
    """Lazily open ``DIR + name + '.parquet'`` with the pyarrow engine."""
    source = DIR + name + '.parquet'
    return dd.read_parquet(source, engine='pyarrow')

# Data
Using 10% sample and self produced samples
 - 10% sample has 11292867 rows
 - Filtered by value_len > df.mean() has 499805 rows

In [4]:
# Original sample: load only the columns this notebook works with.
sample_columns = [
    'value_1000',
    'value',
    'value_len',
    'symbol',
    'script_url',
    'location',
    'operation',
]
df = dd.read_parquet('sample_0.parquet', columns=sample_columns, engine='pyarrow')

df.columns

Index(['value_1000', 'value', 'value_len', 'symbol', 'script_url', 'location',
       'operation'],
      dtype='object')

## DF overview
Some overview about the sample: 
- Mean: 1356.97,
- Min: 0,
- Max: 4496861
- Std: 26310.62

In [5]:
# Describe the five value_len reductions lazily, then evaluate them together
# in one dd.compute call.
value_len = df['value_len']
with ProgressBar():
    df_mean, df_min, df_max, df_std, df_len = dd.compute(
        value_len.mean(),
        value_len.min(),
        value_len.max(),
        value_len.std(),
        value_len.count(),
    )
    print(df_mean, df_min, df_max, df_std, df_len)

[########################################] | 100% Completed | 58.7s
1356.9776628910975 0 4496861 26310.62140481331 11292867


# Add Column: Domains
The following code is copied from this same project: ~/analyses/hello_world.ipynb

It uses the data saved from the last section
This section is dedicated to extract the domain of the columns "location" and "script_url" and add it as new columns "location_domain" and "script_domain"

In [6]:
# Tag the output name with this section's suffix (each run appends it again).
FILE_NAME = FILE_NAME + '_domains'
print('Notebook name: ', FILE_NAME)

Notebook name:  s0_domains


In [7]:
def extract_domain(url):
    """Return '<domain>.<suffix>' for ``url`` as split by tldextract.

    Any exception raised while extracting is swallowed and reported as the
    sentinel string 'ERROR' so a single bad row cannot abort the whole apply.
    """
    try:
        parts = tldextract.extract(url)
        return '%s.%s' % (parts.domain, parts.suffix)
    except Exception:
        return 'ERROR'

In [8]:
# NOTE(review): this cell used to start with a bare `df.astype({...})` call.
# astype returns a new frame and that result was never assigned, so no cast
# was ever applied; the dead statement is removed to stop implying otherwise.
# If the casts are wanted, assign the result (`df = df.astype({...})`) after
# confirming each column's dtype.
df['location_domain'] = df.location.apply(extract_domain, meta='O')
df['script_domain'] = df.script_url.apply(extract_domain, meta='O')

In [9]:
# Persist the frame, including the two domain columns, under FILE_NAME.
save_parquet(df, FILE_NAME)

[########################################] | 100% Completed |  6min 23.0s


In [10]:
# Reload the saved parquet and preview each URL next to its extracted domain.
df = read_parquet(name=FILE_NAME)
domain_preview = ['location_domain', 'location', 'script_domain', 'script_url']
df[domain_preview].head()

Unnamed: 0,location_domain,location,script_domain,script_url
0,vk.com,https://vk.com/widget_comments.php?app=2297596...,vk.com,https://vk.com/js/api/xdm.js?1449919642
1,vk.com,https://vk.com/widget_comments.php?app=2297596...,vk.com,https://vk.com/js/api/xdm.js?1449919642
2,vk.com,https://vk.com/widget_comments.php?app=2297596...,vk.com,https://vk.com/js/al/aes_light.js?592436914
3,baidu.com,https://pos.baidu.com/s?hei=70&wid=670&di=u313...,baidustatic.com,https://cpro.baidustatic.com/cpro/ui/noexpire/...
4,serienjunkies.org,http://serienjunkies.org/smilf/smilf-season-1-...,google.com,https://apis.google.com/js/plusone.js?_=151338...


# Add Column:  is_json

After an initial manual analysis, I suspect that the huge values are JSON-structured. To validate that, I added a new boolean column that holds the result of a JSON validity check.

After simple validation of value is a json or not, boolean value will be saved on a new column named "is_json"


In [11]:
# Tag the output name with this section's suffix (each run appends it again).
FILE_NAME = FILE_NAME + '_isjson'
print('Notebook name: ', FILE_NAME)

Notebook name:  s0_domains_isjson


In [12]:
def is_json(myjson):
    """Return True only when ``myjson`` parses to a non-empty JSON object.

    Parameters
    ----------
    myjson : str (or anything json.loads accepts)
        Candidate JSON text.

    Returns
    -------
    bool
        False for invalid JSON, for non-string input such as None or NaN,
        for valid JSON that is not an object (arrays, numbers, strings, ...),
        and for an empty object however it is spelled ('{}', '{ }', ...).
    """
    try:
        parsed = json.loads(myjson)
    except (ValueError, TypeError, RecursionError):
        # ValueError: malformed JSON; TypeError: input is not str/bytes;
        # RecursionError: nesting too deep for the parser.
        return False
    # Eliminate false positives: only objects count, and an empty object
    # carries no data so it is treated as "not JSON" as well.
    return isinstance(parsed, dict) and len(parsed) > 0

In [13]:
# Flag every row whose value parses as a JSON object (meta=False as in the original call).
df = df.assign(is_json=df.value.apply(is_json, meta=False))

In [14]:
# Persist the frame, now carrying the is_json column, under FILE_NAME.
save_parquet(df, FILE_NAME)

[########################################] | 100% Completed |  4min 21.6s


In [15]:
# Reload the saved parquet and preview the truncated value beside its flag.
df = read_parquet(name=FILE_NAME)
json_preview = ['value_1000', 'is_json']
df[json_preview].head()

Unnamed: 0,value_1000,is_json
0,fXDcab74,False
1,fXDcab74,False
2,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,False
3,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,False
4,_ga=GA1.2.1529583939.1513387469; _gid=GA1.2.17...,False


# Add Column:  value_md5
Adds a new column called "value_md5" that holds the md5 of the "value" column.

In [16]:
# Tag the output name with this section's suffix (each run appends it again).
FILE_NAME = FILE_NAME + '_md5'
print('Notebook name: ', FILE_NAME)

Notebook name:  s0_domains_isjson_md5


In [17]:
def md5(value):
    """Return the hexadecimal MD5 digest of ``value`` encoded as UTF-8."""
    hasher = hashlib.md5()
    hasher.update(value.encode('utf-8'))
    return hasher.hexdigest()

In [18]:
# Hash every value; the result is an object (string) column.
df = df.assign(value_md5=df.value.apply(md5, meta='O'))

In [19]:
# Persist the frame, now carrying the value_md5 column, under FILE_NAME.
save_parquet(df, FILE_NAME)

[########################################] | 100% Completed |  2min 45.9s


In [20]:
# Reload the saved parquet and preview the truncated value beside its hash.
df = read_parquet(name=FILE_NAME)
md5_preview = ['value_1000', 'value_md5']
df[md5_preview].head()

Unnamed: 0,value_1000,value_md5
0,fXDcab74,7df64196939a8b6ff11482ed6df4b25a
1,fXDcab74,7df64196939a8b6ff11482ed6df4b25a
2,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,bc0aac3569031babbd73e069947a4b12
3,Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko...,bc0aac3569031babbd73e069947a4b12
4,_ga=GA1.2.1529583939.1513387469; _gid=GA1.2.17...,324dd29b8c6438bc700ac2d85e33f12d


# Saving other possibly useful filtered samples for future analyses

## value_len > df_mean
1356 is the value_len mean

To narrow the data down to something more interesting for this task, I decided to work only with values that are above the mean.

All values above the mean add up to 499805 rows. That is only about 4.4% of the whole sample, and a lot easier to work with.

In [21]:
# Output name for the above-mean subset; FILE_NAME itself is left unchanged.
name = '{}_above_mean'.format(FILE_NAME)
print('Notebook name: ', name)

Notebook name:  s0_domains_isjson_md5_above_mean


In [22]:
# Keep only the rows whose value_len exceeds the sample mean, and let
# save_parquet recompute the partition count for the smaller subset.
above_mean = df[df.value_len > df_mean]
save_parquet(above_mean, name, recalculate_partition=True)

[########################################] | 100% Completed | 50.5s
Npartition:  245
[########################################] | 100% Completed |  1min 38.3s


In [23]:
# Reload the above-mean subset; from here on `df` refers to this subset.
df = read_parquet(name=name)
df.columns

Index(['value_1000', 'value', 'value_len', 'symbol', 'script_url', 'location',
       'operation', 'location_domain', 'script_domain', 'is_json',
       'value_md5'],
      dtype='object')

## Filter to parquet containing only JSON 

In [24]:
# Output name for the JSON-only subset; FILE_NAME itself is left unchanged.
name = '{}_JSON_ONLY'.format(FILE_NAME)
print('Notebook name: ', name)

Notebook name:  s0_domains_isjson_md5_JSON_ONLY


In [25]:
# is_json is a boolean column, so it can be used directly as the row mask.
# `df` is whatever the preceding cells left bound; in notebook order that is
# the above-mean subset, not the full sample.
json_rows = df[df['is_json']]
save_parquet(json_rows, name, recalculate_partition=True)

[########################################] | 100% Completed | 28.9s
Npartition:  233
[########################################] | 100% Completed |  1min  5.0s


In [26]:
# Read all_json_above_mean back and preview the truncated value beside its flag.
df = read_parquet(name=name)
json_only_preview = ['value_1000', 'is_json']
df[json_only_preview].head()

Unnamed: 0,value_1000,is_json
0,"{""im-settings"":""{\""val\"":{\""settings\"":{\""Site...",True
1,"{""APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c...",True
2,"{""APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c...",True
3,"{""dueljs_channel_comm"":""[{\""id\"":4734405521279...",True
4,"{""dueljs_channel_comm"":""[{\""id\"":4734405521279...",True


### Add json keys and schema columns

Extract the top level keys, sort them and add as a list into another column named 'json_keys'
Will be using "https://github.com/rnd0101/json_schema_inferencer" to guess the json schema and save it into another column called "json_schema"

In [27]:
# Extend the current subset name with this step's suffix (each run appends it again).
name = name + '_schema_keys'
print('Notebook name: ', name)

Notebook name:  s0_domains_isjson_md5_JSON_ONLY_schema_keys


In [28]:
from json_schema_inferencer.guess_json_schema import guess_schema

def jsonSchema(myjson):
    """Return the sorted property names of the schema guessed for a JSON object.

    Parameters
    ----------
    myjson : str
        JSON text expected to decode to an object.

    Returns
    -------
    list
        Sorted names found under the guessed schema's 'properties' entry, or
        an empty list when the text is not valid JSON, is not a string, does
        not decode to an object, or guess_schema raises ValueError.
    """
    try:
        dct = json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON; TypeError: non-string input such as None.
        return []
    if not isinstance(dct, dict):
        # Only objects are handed to the inferencer; arrays and scalars have no
        # top-level properties to list (same result jsonKeys gives for them).
        return []
    try:
        schema = guess_schema(dct)
    except ValueError:
        return []
    return sorted(schema['properties'])
    
def jsonKeys(myjson):
    """Return the sorted top-level keys of a JSON object.

    Parameters
    ----------
    myjson : str
        JSON text expected to decode to an object.

    Returns
    -------
    list
        Sorted top-level keys, or an empty list when the text is not valid
        JSON, is not a string, or decodes to something other than an object.
    """
    try:
        parsed = json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON; TypeError: non-string input such as None.
        return []
    if not isinstance(parsed, dict):
        # Arrays and scalars have no keys; previously this raised AttributeError.
        return []
    return sorted(parsed)

In [29]:
# Derive both list-valued columns from the raw value, then persist the result.
df = df.assign(
    json_keys=df['value'].apply(jsonKeys, meta='O'),
    json_schema=df['value'].apply(jsonSchema, meta='O'),
)
save_parquet(df, name)


[########################################] | 100% Completed |  4min 18.1s


In [30]:
# Reload the saved parquet and preview the two derived columns.
df = read_parquet(name=name)
schema_preview = ['value_1000', 'json_keys', 'json_schema']
df[schema_preview].head()

Unnamed: 0,value_1000,json_keys,json_schema
0,"{""im-settings"":""{\""val\"":{\""settings\"":{\""Site...",[im-settings],[im-settings]
1,"{""APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c...",[APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c],[APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c]
2,"{""APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c...",[APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c],[APLUS_S_CORE_0.17.12_20171214163401_2ee09a0c]
3,"{""dueljs_channel_comm"":""[{\""id\"":4734405521279...","[LastSearch, LastSearch_e, dueljs_channel_comm...","[LastSearch, LastSearch_e, dueljs_channel_comm..."
4,"{""dueljs_channel_comm"":""[{\""id\"":4734405521279...","[LastSearch, LastSearch_e, dueljs_channel_comm...","[LastSearch, LastSearch_e, dueljs_channel_comm..."


## All NON json above the mean

In [37]:
# Start again from the parquet saved under FILE_NAME and name the non-JSON subset.
# NOTE(review): the section heading says "above the mean", but the frame read
# here is FILE_NAME without the '_above_mean' suffix — confirm which input is intended.
df = read_parquet(FILE_NAME)
name = '{}_nonJSON_ONLY'.format(FILE_NAME)
print('Notebook name: ', name)

Notebook name:  s0_domains_isjson_md5_nonJSON_ONLY


In [38]:
# is_json is a boolean column, so its negation selects the non-JSON rows.
non_json_rows = df[~df['is_json']]
save_parquet(non_json_rows, name, recalculate_partition=True)

[########################################] | 100% Completed |  1min 54.5s
Npartition:  116
[########################################] | 100% Completed |  1min 13.1s


In [35]:
# Reload the subset saved under `name` and preview its first rows.
df = read_parquet(name=name)
df.head()



Unnamed: 0,value_1000,value,value_len,symbol,script_url,location,operation,location_domain,script_domain,is_json,value_md5,json_keys,json_schema
