This notebook processes the 2 million+ JSON files into a cleaned dataset.

It uses the following steps:

1. For every file in the ucosp bucket, download the JSON file from S3 and then:
  1. Correct the raw_data so it can be parsed as rows of JSON
  1. Validate each row of JSON against the defined JSON Schema
  1. For bad rows, save the reason for the failure along with the data, the file name, and the row number
  1. For good rows, do further processing
1. For good data, process columns to give clean dtypes such as int, bool, timestamp
1. Compute derived columns, e.g. the length of the value field
1. Save the bad and cleaned datasets as parquet on S3 for future retrieval

### Imports and constants

In [1]:
from __future__ import unicode_literals
from __future__ import division

import json
import pandas as pd

from ast import literal_eval
from jinja2 import Template
from jsonschema import Draft4Validator
from pyspark.sql.functions import length
from pyspark.sql.types import TimestampType

In [2]:
# S3 location holding the raw JSON blobs (placeholder value: set it before running).
# It is used three ways below:
# - as the input path for sc.wholeTextFiles;
# - as the prefix stripped from each file path to form file_name;
# - as the prefix of the output parquet paths.
# It should therefore presumably end with a '/' separator.
BUCKET_NAME = 'Location of bucket where all json blobs are'

### Setup the schema

The schema holds an enum of allowed values for `symbol`. Those values are stored in a CSV (`symbol_counts.csv`), so we inject them into the schema template at runtime using Jinja.

The schema is similar to validate_data_util.py but has some differences:


Things that validate_data_util does that this validation does not:

1. This validation does not check each per-line entry of call_stack. Reason: I could not capture that check in a regex pattern.
1. This validation allows any value for function name. Reason: the dataset contains many kinds of invalid function names, not just names that start with a digit, so excluding only those that start with a digit seemed inconsistent.


Additional things that this validation does:

1. It includes an additional enum for operation `set (failed)`
1. It includes enums for symbol
1. It adds some alternative regexes for timestamps, as some were being thrown out unnecessarily

In [3]:
# Prepare schema.
# The template takes the list of allowed `symbol` values as `list_of_symbols`;
# those values come from symbol_counts.csv and are injected with Jinja.
symbol_counts = pd.read_csv('symbol_counts.csv', names=['symbol', 'count'])
allowed_symbols = list(symbol_counts['symbol'].values)

with open('raw_data_schema.template', 'r') as template_file:
    schema_template = Template(template_file.read())

# The rendered text is parsed with literal_eval, so it must be a valid Python
# literal. Presumably the Jinja-rendered symbol list uses Python list syntax,
# which is why json.loads is not used here.
rendered_schema = schema_template.render(list_of_symbols=allowed_symbols)
schema = literal_eval(rendered_schema)

Pretty print it for reference.

You can copy-paste this into a JSON viewer such as http://jsonviewer.stack.hu/ to see it in a more structured form.

In [4]:
# Pretty-print the rendered schema. sort_keys gives a stable, alphabetical
# layout, so the printed output can be compared between runs.
print(json.dumps(schema, indent=4, sort_keys=True))

{
    "$schema": "http://json-schema.org/draft-04/schema#", 
    "description": "The schema for a row of the raw data in the crawl catalog. (The final dataset as additional derived columns)", 
    "properties": {
        "arguments": {
            "description": "Any arguments passed to the javascript call. When present takes the form of an object with numeric string keys e.g. '0', '1', up to a max of '9'. Validator does not check for this yet as couldn't find a satisfactory regex", 
            "type": "string"
        }, 
        "call_stack": {
            "description": "69% of calls have no call_stack. Where there is a call_stack, it appears you can: split on '\n' and get the same values that are in func_name, script_url, script_col, script_line - func_name@script_url:script_line:script_col", 
            "pattern": "^$|^(?!undefined).*$|", 
            "type": "string"
        }, 
        "crawl_id": {
            "const": 1, 
            "description": "The ID for this crawl"
  

In [5]:
# Broadcast the schema to all the workers.
# validate_and_process_file reads it on the workers via `schema.value`.
# Guarded so that re-running this cell is a no-op. After the first run `schema`
# is already a Broadcast; broadcasting it again would wrap it a second time, and
# `schema.value` would then no longer be the schema dict.
if isinstance(schema, dict):
    schema = sc.broadcast(schema)

### Process the data

In [6]:
# Column order of the DataFrame built from the validated row dicts.
# The positional argument columns argument_0 .. argument_8 come first.
# NOTE(review): the schema description allows `arguments` keys up to '9', but
# only argument_0 .. argument_8 are listed here. Any argument_9 written by
# process_arguments is dropped when rows are projected onto COLUMNS; confirm
# this is intended.
COLUMNS = tuple('argument_{}'.format(i) for i in range(9)) + (
    'arguments',
    'arguments_n_keys',
    'call_id',
    'call_stack',
    'crawl_id',
    'file_name',
    'func_name',
    'in_iframe',
    'location',
    'operation',
    'script_col',
    'script_line',
    'script_loc_eval',
    'script_url',
    'symbol',
    'time_stamp',
    'value',
    'value_1000',
    'value_len',
    # Columns added by the validator.
    'valid',
    'errors',
)

In [7]:
def get_rows_from_raw_data(raw_data):
    """Parse one file's raw text into a list of row dicts.

    The first and last characters of ``raw_data`` are replaced with ``[`` and
    ``]`` so that the (presumably comma-separated) JSON objects inside parse as
    a single JSON array.

    Args:
        raw_data: the file contents, as read by ``sc.wholeTextFiles``.

    Returns:
        The parsed rows. If parsing fails, the result is a single placeholder
        row ``{'errors': <reason>}``, so the failure is recorded rather than
        raised.
    """
    try:
        return json.loads("[" + raw_data[1:-1] + "]")
    except ValueError as e:
        # Format the exception itself: this works on Python 2 and 3, whereas
        # e.message is Python-2-only and would raise AttributeError here.
        return [{'errors': 'Error converting raw_data into json: {}'.format(e)}]
    except Exception:
        # For example, raw_data is not a string. Catch Exception rather than
        # using a bare except so that KeyboardInterrupt/SystemExit still propagate.
        return [{'errors': 'Unknown error when reading data.'}]


def get_json_schema_errors(row, validator):
    """Summarise every JSON Schema violation in ``row`` as one string.

    For each error yielded by ``validator.iter_errors(row)``, one entry
    terminated by ``' ||'`` and a newline is appended:

    * field-level errors: ``'<field>: <bad value> not valid'``, where the
      bad value is formatted as text and truncated to 75 characters;
    * errors with an empty path (for example, a missing required property):
      the validator's own error message, truncated the same way.

    Args:
        row: the parsed row dict that is being validated.
        validator: a jsonschema validator exposing ``iter_errors``.

    Returns:
        The concatenated entries, or ``''`` if there are no errors.
    """
    def _truncate(value, limit=75):
        # Format first, so that non-string instances (ints, bools, dicts...)
        # can be sliced instead of raising TypeError.
        text = '{}'.format(value)
        return text[:limit] + (text[limit:] and '...')

    messages = []
    for error in validator.iter_errors(row):
        try:
            if error.path:
                messages.append('{}: {} not valid ||\n'.format(
                    error.path[0], _truncate(error.instance)))
            else:
                # Root-level error: there is no single field to name, and the
                # instance is the whole row, so report the message instead.
                messages.append('{} ||\n'.format(_truncate(error.message)))
        except Exception:
            messages.append('Could not parse JSON Schema error. ||\n')
    return ''.join(messages)


def convert_to_dict(item):
    """Parse an ``arguments`` string into a dict.

    ``item`` is parsed as JSON first. If that fails, the function falls back
    to rewriting the JSON literals ``false``/``true``/``null`` as their Python
    spellings and calling ``ast.literal_eval``. The fallback also accepts
    Python-repr style input, such as single-quoted keys.

    Args:
        item: the raw ``arguments`` text.

    Returns:
        The parsed dict, or ``{}`` if ``item`` cannot be parsed or does not
        describe a dict.
    """
    try:
        parsed = json.loads(item)
    except Exception:
        # The text replacement below also rewrites those words inside string
        # values (e.g. "nullable" -> "Noneable"), so it is only a fallback for
        # input that is not valid JSON.
        python_literal = (item.replace('false', 'False')
                              .replace('true', 'True')
                              .replace('null', 'None'))
        try:
            parsed = literal_eval(python_literal)
        except Exception:
            return {}
    # Callers take len() and .get() of the result, so anything that is not a
    # dict (a list, a number, a string...) is treated as "no arguments".
    return parsed if isinstance(parsed, dict) else {}


def process_arguments(row):
    """Expand ``row['arguments']`` into positional ``argument_<n>`` fields.

    Sets ``row['arguments_n_keys']`` to the number of parsed keys. For ``n`` in
    ``0 .. n_keys - 1``, sets ``row['argument_<n>']`` to the value stored under
    key ``str(n)``, or ``''`` when that key is absent. A missing ``arguments``
    field is recorded as ``'{}'``.

    Only argument_0 .. argument_8 appear in COLUMNS, so any argument_9 and above
    set here are dropped when rows are projected onto COLUMNS.

    Args:
        row: a validated row dict; it is modified in place.

    Returns:
        The same ``row`` object.
    """
    if 'arguments' not in row:
        row['arguments'] = '{}'
    parsed = convert_to_dict(row['arguments'])
    if not isinstance(parsed, dict):
        # For example, arguments == '5' or '[1, 2]'. Without this guard,
        # len()/.get() would raise and fail the whole Spark task.
        parsed = {}
    n_args = len(parsed)
    row['arguments_n_keys'] = n_args
    for n in range(n_args):
        row['argument_{}'.format(n)] = parsed.get(str(n), "")
    return row


def validate_and_process_file(whole_text_file_row):
    """Parse, validate and annotate every row of one input file.

    Args:
        whole_text_file_row: a ``(path, contents)`` pair, as produced by
            ``sc.wholeTextFiles``.

    Returns:
        The list of row dicts from the file. Every row gets:

        * ``file_name``;
        * ``call_id``, in the form ``<file_name>__<row index>``;
        * ``valid``;
        * ``errors``.

        Valid rows are additionally expanded by ``process_arguments``.
    """
    # Strip the bucket prefix from the path. maxsplit=1 plus [-1] keeps the
    # whole path (instead of raising IndexError) if the prefix is not present.
    file_name = whole_text_file_row[0].split(BUCKET_NAME, 1)[-1]
    raw_data = whole_text_file_row[1]

    validator = Draft4Validator(schema.value)
    rows = get_rows_from_raw_data(raw_data)

    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            # An array element that is not a JSON object (e.g. a bare number)
            # would make row.get() raise and fail the whole Spark task.
            # Record it as a bad row instead.
            row = {'errors': 'Row is not a JSON object: {}'.format(json.dumps(row)[:75])}
            rows[i] = row

        # A row that already carries 'errors' is invalid without schema
        # validation. This covers the placeholder from get_rows_from_raw_data
        # and the non-object case above.
        errors = row.get('errors')
        if errors is None:
            valid = validator.is_valid(row)
            errors = '' if valid else get_json_schema_errors(row, validator)
        else:
            valid = False

        row['file_name'] = '{}'.format(file_name)
        row['call_id'] = '{}__{}'.format(file_name, i)
        row['valid'] = valid
        row['errors'] = errors

        if valid:
            # process_arguments mutates the row in place.
            process_arguments(row)

    return rows

In [11]:
%%time
# Read every input file and turn it into a DataFrame of validated rows.
# These lines take a long time when there are lots of files
# (30s for 100k files, ~30min for 2m+ files).
# NOTE(review): the RDD transformations are lazy. The wall time presumably
# comes from createDataFrame sampling the RDD for schema inference, which
# requires listing every file under BUCKET_NAME. Confirm.

# One (path, contents) record per file, with a suggested minimum of 3000 partitions.
filesRDD = sc.wholeTextFiles(BUCKET_NAME, minPartitions=3000)

# Each file expands into its list of annotated row dicts.
validated_rows = filesRDD.flatMap(validate_and_process_file)

# Project each row dict onto COLUMNS, in order; fields a row lacks become "".
# NOTE(review): bad rows may lack fields such as arguments_n_keys / in_iframe,
# which are inferred as bigint / boolean (see the dtypes output below). "" then
# lands in non-string columns. Confirm how this Spark version converts it
# (likely to null); an explicit StructType with None placeholders would be safer.
validated_rows_mapped = validated_rows.map(lambda x : tuple(x.get(col, "") for col in COLUMNS))

# Column names come from COLUMNS; column types are inferred by Spark.
# NOTE(review): validated_df is not cached, so each action on it (e.g. the two
# parquet writes below) presumably recomputes the full read + validation.
validated_df = spark.createDataFrame(validated_rows_mapped, schema=COLUMNS)

CPU times: user 248 ms, sys: 96 ms, total: 344 ms
Wall time: 32min


In [12]:
# Keep only the rows that passed JSON Schema validation.
good_rows = validated_df[validated_df.valid == True]

# value_len: character length of `value`.
good_rows = good_rows.withColumn('value_len', length(good_rows.value))

# value_1000: the first 1000 characters of `value`. Spark's substr is 1-based.
# A start of 0 is also treated as the first character, but 1 is unambiguous.
good_rows = good_rows.withColumn('value_1000', good_rows.value.substr(1, 1000))

# Parse time_stamp into a TimestampType column. The cast does not drop any rows:
# strings Spark cannot parse become null (with ANSI mode off, the default).
good_rows = good_rows.withColumn('time_stamp', good_rows.time_stamp.cast(TimestampType()))

# Reduce to at most 3000 partitions (coalesce: no shuffle).
good_rows = good_rows.coalesce(3000)

# Let's look at the dtypes
good_rows.dtypes

[('argument_0', 'string'),
 ('argument_1', 'string'),
 ('argument_2', 'string'),
 ('argument_3', 'string'),
 ('argument_4', 'string'),
 ('argument_5', 'string'),
 ('argument_6', 'string'),
 ('argument_7', 'string'),
 ('argument_8', 'string'),
 ('arguments', 'string'),
 ('arguments_n_keys', 'bigint'),
 ('call_id', 'string'),
 ('call_stack', 'string'),
 ('crawl_id', 'bigint'),
 ('file_name', 'string'),
 ('func_name', 'string'),
 ('in_iframe', 'boolean'),
 ('location', 'string'),
 ('operation', 'string'),
 ('script_col', 'string'),
 ('script_line', 'string'),
 ('script_loc_eval', 'string'),
 ('script_url', 'string'),
 ('symbol', 'string'),
 ('time_stamp', 'timestamp'),
 ('value', 'string'),
 ('value_1000', 'string'),
 ('value_len', 'int'),
 ('valid', 'boolean'),
 ('errors', 'string')]

In [13]:
# Now set up bad_rows: every row that failed parsing or schema validation.
bad_rows = validated_df[validated_df.valid == False]

# Write bad_rows as 100 partitions (we expect far fewer bad rows than good ones).
# Use repartition (a shuffle) rather than coalesce. coalesce(100) adds no shuffle
# boundary, so it would squeeze the entire upstream stage (reading and validating
# every input file) into just 100 tasks. With repartition, the read, validation
# and filter still run at full parallelism, and only the small set of bad rows
# is shuffled into 100 partitions.
bad_rows = bad_rows.repartition(100)

### Save data

In [None]:
# Output locations for the cleaned and invalid datasets.
# NOTE(review): both paths sit under BUCKET_NAME, the same location that
# sc.wholeTextFiles reads from. A re-run of this notebook would therefore
# presumably see these parquet outputs among its inputs (or fail on them).
# Consider a separate output prefix.
good_location = BUCKET_NAME + "clean.parquet"
bad_location = BUCKET_NAME + "invalid.parquet"

In [14]:
%%time
# Write the cleaned rows. No mode is given, so Spark's default save mode
# applies: it raises if good_location already exists.
# NOTE(review): validated_df is not cached, so this write presumably re-reads
# and re-validates every input file.
good_rows.write.parquet(good_location)

CPU times: user 1.22 s, sys: 556 ms, total: 1.78 s
Wall time: 4h 22min 13s


In [15]:
%%time
# Write the rows that failed parsing or validation. The default save mode
# raises if bad_location already exists.
# NOTE(review): validated_df is not cached, so this write presumably re-reads
# and re-validates every input file again.
bad_rows.write.parquet(bad_location)

CPU times: user 500 ms, sys: 212 ms, total: 712 ms
Wall time: 2h 9min 23s
