In [1]:
import sparkhub.client as sh

spark = sh.cluster(name="thirst-quenching-tail", max_workers=50)

sh.ui()

In [23]:
from numba import jit
import numpy as np
import pandas as pd
from pyspark.sql import functions as F, types as T

In [None]:
# code snippets used in the slides

# # monotonically increasing ids
# mono_ids = data\
#     .select('col').distinct()\
#     .withColumn('col_id', F.monotonically_increasing_id())
# data\
#     .join(F.broadcast(mono_ids), on='col')\
#     .drop('col')

# # unique values counts
# data\
#     .groupby('id', 'value').count()\
#     .withColumnRenamed('count', 'value_count')\
#     .withColumn('value_count_pair', F.struct('value', 'value_count'))\
#     .groupby('id')\
#     .agg(F.collect_list('value_count_pair').alias('value_count_pairs'))

# # time series data
# data\
#     .withColumn('time_transaction_pair', F.struct('time', 'transaction'))\
#     .groupby('id')\
#     .agg(F.collect_list('time_transaction_pair').alias('time_transaction_pairs'))

At quantcast, one of our most expensive queries look into pairwise interactions between pairs of events. A lot of the queries roughly look like
* Aggregating all of the data
* Determining some `partition_key` that splits the data into sufficiently small parititions such that looking at pairwise interactions is computationally feasible.
* Look at all of the pairwise interactions between events for each `partition_key`
* Extract some useful information from each of these pair of events. Sometimes with machine learning models

### Overview of our query evolution
In this section we show:
* An example of processing these pairwise interactions at scale on sanitized data
* Examples of how we applied the following optimizations:
    * Using efficient python packages to improve over our pandas implementation by 10x
    * Redesigning our query to use scalar UDFs instead of grouped map UDFs to improve our implementation by 12x

In [26]:
if True:

    # parameters for sanitized data
    n = 1000000
    partition_size = 20
    d = 20

    # function that generates random numbers in the range of [-1, 1)
    def generate_random_vector(size):
        return (2 * np.random.random(size)) - 1

    # generate sample data
    sample_data_1M = pd.DataFrame([[i // partition_size, i] for i in range(n)], columns=['partition_key', 'event_id'])
    sample_data_1M['feature_vector'] = generate_random_vector((n, d)).tolist()

    sample_data_1M.head()


In [27]:
 sample_data_1M.head()

Unnamed: 0,partition_key,event_id,feature_vector
0,0,0,"[-0.5165411328143239, -0.7697248833258343, 0.2..."
1,0,1,"[-0.3785918054897246, -0.8154872951239256, 0.6..."
2,0,2,"[-0.6884332215238746, -0.532683039200122, 0.30..."
3,0,3,"[0.8309149840909955, -0.896161813570687, 0.511..."
4,0,4,"[0.7258664613993673, -0.1955426784967571, -0.8..."


In [98]:
# sample machine learning model implementation that operates on these pairs of events
hidden_layer_size = 10
first_layer = generate_random_vector((d * 2, hidden_layer_size))
final_layer = generate_random_vector(hidden_layer_size)
sigmoid = lambda x: 1 / (1 + np.exp(-x))
def score_feature_pair(feature_vector1, feature_vector2):
    hidden_layer = np.dot(np.concatenate([feature_vector1, feature_vector2]), first_layer)
    non_linear_result = np.maximum(hidden_layer, 0)
    return sigmoid(np.dot(non_linear_result, final_layer))



In [99]:
# create a spark dataframe for this data
sample_data_1M_df = spark.createDataFrame(sample_data_1M)

In [115]:
# v1 of our code. Strategy was to use pandas grouped map UDFs
# on our small sample this takes 20 min CPU time

    # schema of our UDF
    process_partition_schema = T.StructType([
        T.StructField('partition_key', T.IntegerType()),
        T.StructField('event_id1', T.IntegerType()),
        T.StructField('event_id2', T.IntegerType()),
        T.StructField('model_score', T.DoubleType()),
    ])

    # processes a single partition_key of data
    def process_partition(data):
        output = []

        # uses pandas iterrows to do the double for loop and score everything
        partition_key = data.iloc[0]['partition_key']
        for i, row1 in data.iterrows():
            event_id1 = row1['event_id']
            for j, row2 in data[i+1:].iterrows():
                event_id2 = row2['event_id']
                model_score = score_feature_pair(row1['feature_vector'], row2['feature_vector'])
                output.append([partition_key, event_id1, event_id2, model_score])

        return pd.DataFrame(output, columns=process_partition_schema.names)

    sample_data_1M_df.groupby('partition_key').applyInPandas(process_partition, process_partition_schema)\
        .coalesce(1)\
        .write.mode('overwrite').parquet('/qfs/tmp/mtong/spark_demo_base')
    



DataFrame[partition_key: int, event_id1: int, event_id2: int, model_score: double]

In [116]:
# Sample scoring function
# Note how we are using numpy types instead of pandas types.
# This already provides us a 6x speedup over this dataset
process_partition_schema = T.StructType([
    T.StructField('partition_key', T.IntegerType()),
    T.StructField('event_id1', T.IntegerType()),
    T.StructField('event_id2', T.IntegerType()),
    T.StructField('model_score', T.DoubleType()),
])

def process_partition(data):
    output = []

    # easy way to convert pandas series to numpy vectors and matrices
    event_ids = data['event_id'].values
    # convert to numpy matrix so each individual element is a numpy vector
    feature_matrix = np.array(data['feature_vector'].values.tolist())

    # use of enumerate and zip instead of using pandas iterrows
    partition_key = data.iloc[0]['partition_key']
    for i, (event_id1, feature_vector1) in enumerate(zip(event_ids, feature_matrix)):
        for event_id2, feature_vector2 in zip(event_ids[i+1:], feature_matrix[i+1:]):
            model_score = score_feature_pair(feature_vector1, feature_vector2)
            output.append([partition_key, event_id1, event_id2, model_score])

    return pd.DataFrame(output, columns=process_partition_schema.names)

sample_data_1M_df.groupby('partition_key').applyInPandas(process_partition, process_partition_schema)\
    .coalesce(1)\
    .write.mode('overwrite').parquet('/qfs/tmp/mtong/spark_demo_numpy')

In [117]:
# v3 of our code.
# Main insight is that we can process multiple partition_keys at once with pandas udfs
# This runs in about 10x faster than the previous version of our solution


    process_partition_schema = T.StructType([
        T.StructField('partition_key', T.ArrayType(T.IntegerType())),
        T.StructField('event_id1', T.ArrayType(T.IntegerType())),
        T.StructField('event_id2', T.ArrayType(T.IntegerType())),
        T.StructField('model_score', T.ArrayType(T.DoubleType())),
    ])

    # our first iteration of attempting pandas UDFs was to write a function for processing each row
    def process_partition(partition_key, event_ids, feature_matrix):
        feature_matrix = np.array(feature_matrix.tolist())
        output = []
        for i, (event_id1, feature_vector1) in enumerate(zip(event_ids, feature_matrix)):
            for event_id2, feature_vector2 in zip(event_ids[i+1:], feature_matrix[i+1:]):
                model_score = score_feature_pair(feature_vector1, feature_vector2)
                output.append([partition_key, event_id1, event_id2, model_score])
        # massage columns so they are arrays of fields
        return list(zip(*output))

    # and a separate function that could process all rows
    def process_partition_batch(partition_key_series, event_ids_series, feature_matrix_series):
        results = [process_partition(*args) for args in zip(partition_key_series, event_ids_series, feature_matrix_series)]
        return pd.DataFrame(results, columns=process_partition_schema.names)

    process_partition_udf = F.pandas_udf(process_partition_batch, process_partition_schema)

# with pandas UDFs that means we now have to write our queries such
# that each partition_key occupies a single row
# we also need to massage the data format a bit

    sample_data_1M_df\
        .withColumn('event_id_feature_pair', F.struct('event_id', 'feature_vector'))\
        .groupby('partition_key')\
        .agg(F.collect_list('event_id_feature_pair').alias('event_id_feature_pairs'))\
        .withColumn('partition_scores', process_partition_udf(
            'partition_key', 'event_id_feature_pairs.event_id', 'event_id_feature_pairs.feature_vector'))\
        .repartition(1)\
        .withColumn('partition_scores_zipped', F.arrays_zip(*[f'partition_scores.{col}' for col in process_partition_schema.names]))\
        .withColumn('partition_scores_exploded', F.explode('partition_scores_zipped'))\
        .select(*[F.col(f'partition_scores_exploded.{i}').alias(col) for i, col in enumerate(process_partition_schema.names)])\
        .write.mode('overwrite').parquet('/qfs/tmp/mtong/spark_demo_scalar_udf')
    


### Overview of individual function improvements
In this section we show:
* How scoring things in batches improves our overall scoring algorithm
* An example of how we use jit to improve our model scoring


In [139]:
# intialize some data
# typically in each pandas_udf batch we score ~100k rows at a time
feature_matrix1 = np.random.random((100000, d))
feature_matrix2 = np.random.random((100000, d))

In [154]:
# baseline method, score each row individually
sigmoid = lambda x: 1 / (1 + np.exp(-x))
def score_feature_pair(feature_vector1, feature_vector2):
    hidden_layer = np.dot(np.concatenate([feature_vector1, feature_vector2]), first_layer)
    non_linear_result = np.maximum(hidden_layer, 0)
    return sigmoid(np.dot(non_linear_result, final_layer))

%timeit _ = [score_feature_pair(*args) for args in zip(feature_matrix1, feature_matrix2)]

837 ms ± 23.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [155]:
# by writing scoring as a batch operation, we can take advantage of numpy optimizations
# this makes the function 
def score_feature_matrix(feature_matrix1, feature_matrix2):
    hidden_layer = np.dot(np.hstack([feature_matrix1, feature_matrix2]), first_layer)
    non_linear_result = np.maximum(hidden_layer, 0)
    return sigmoid(np.dot(non_linear_result, final_layer))

%timeit _ = score_feature_matrix(feature_matrix1, feature_matrix2)

101 ms ± 11.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [163]:
# one of the downsides to using jit is that not all numpy functions are supported
# so sometimes you have to do workarounds
# however there is almost always a way to rewrite numpy functions to faster jit versions
@jit(nopython=True)
def sigmoid_jit(vector):
    return 1 / (1 + np.exp(-vector))

@jit(nopython=True)
def score_feature_matrix_jit(stacked_matrix):
    hidden_layer = np.dot(stacked_matrix, first_layer)
    non_linear_result = np.maximum(hidden_layer, 0)
    return sigmoid_jit(np.dot(non_linear_result, final_layer))

# compile the jit function
_ = score_feature_matrix_jit(np.hstack([feature_matrix1, feature_matrix2]))
%timeit _ = score_feature_matrix_jit(np.hstack([feature_matrix1, feature_matrix2]))

70 ms ± 5.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
