## Prerequisites for spark and arrow

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType, DoubleType, IntegerType
import pyspark.pandas as ps
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

In [2]:
# setup for local testing - comment in case of databricks
builder = SparkSession.builder.master("local[4]").appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "1g") \
    .config("spark.ui.enabled", "false")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
spark = builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/01 17:50:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
LENGTH = 500000

## Evaluate how to get pypsark schema for pandas pyspark dataframes

Note that DataFrame.spark.schema() is not listed in the [API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html)

In [4]:
psdf = ps.DataFrame({"id": range(LENGTH), "value": range(10, LENGTH+10), "a": range(LENGTH,LENGTH*2)})

In [5]:
print(psdf.spark.schema())

StructType([StructField('id', LongType(), False), StructField('value', LongType(), False), StructField('a', LongType(), False)])


In [6]:
sdf = psdf.to_spark()
sdf.printSchema()

root
 |-- id: long (nullable = false)
 |-- value: long (nullable = false)
 |-- a: long (nullable = false)





### Test how to enforce nullable on schema

In [7]:
schema_new = StructType(
    [StructField("id", LongType(), True), StructField("value", LongType(), True), StructField('a', LongType(), False)]
)
sdf_new_schema = spark.createDataFrame(sdf.rdd, schema_new)

In [8]:
sdf_new_schema.printSchema()

root
 |-- id: long (nullable = true)
 |-- value: long (nullable = true)
 |-- a: long (nullable = false)



In [9]:
psdf_new_schema = sdf_new_schema.pandas_api()
psdf_new_schema.spark.schema()

StructType([StructField('id', LongType(), True), StructField('value', LongType(), True), StructField('a', LongType(), False)])

In [10]:
# psdf_new_schema.info()

## Select and filter columns to pandas on pyspark df

The goal is to compare the performance of three different approaches: native pyspark, pandas on pyspark with vectorized function and pyspark with scalar function. For this we do the same transformation in each approach and check the execution plan as well as the time taken to execute (on a larger dataset on databricks).

The transformations should be two filters which could be optimized and one complex filter.

First we generally investigate how pandas dataframes are working in pyspark and see that we need to be careful how functions are applied


In [11]:
ps.DataFrame({'A': range(1001)}).apply(lambda col: col.max())

                                                                                

A     249
A     499
A     749
A    1000
dtype: int64

### Scalar UDFS vs pandas UDFs 

see [this post](https://gist.github.com/BryanCutler/0b0c820c1beb5ffc40618c462912195f) for more

Short excurse to pandas vs python udf

In [12]:
df = spark.range(1 << 24, numPartitions=16).toDF("id") \
        .withColumn("p1", F.rand()).withColumn("p2", F.rand())

In [13]:
from math import log, exp

def scalar_func(p1, p2):
    w = 0.5
    return exp(log(p1) + log(p2) - log(w))

In [14]:
my_udf = F.udf(scalar_func, DoubleType())

result = df.withColumn("p", my_udf(F.col("p1"), F.col("p2")))

%timeit result.filter("p < 1.0").count()



4.34 s ± 146 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

In [15]:
def vect_func(p1, p2):
    w = 0.5
    return np.exp(np.log(p1) + np.log(p2) - np.log(w))

In [16]:
my_udf = F.pandas_udf(vect_func, DoubleType())

result = df.withColumn("p", my_udf(F.col("p1"), F.col("p2")))

%timeit result.filter("p < 1.0").count()



1.36 s ± 52.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

### Start with execution of different when statements

In [17]:
# scalar udf
def scalar_when(x: int) -> int: 
    return 100 if x < LENGTH/20 else (50 if x < LENGTH/2 else 0)

In [18]:
# vectorized udf returning a pandas Series
def vectorized_when_pd(x: pd.Series) -> pd.Series: 
  return pd.Series(np.where(x < LENGTH/20, 100, np.where(x < LENGTH/2, 50, 0)))
    

In [19]:
# vectorized udf returning a numpy array
def vectorized_when(x): 
  return np.where(x < LENGTH/20, 100, np.where(x < LENGTH/2, 50, 0))
    

In [20]:
def generate_dataset(len: int = LENGTH) -> ps.DataFrame:
  return ps.DataFrame({"id": range(len), "value": range(10, len+10), "a": range(len,len*2)})

### With native pyspark pandas

In [21]:
psdf_native = generate_dataset()
psdf_native = psdf_native.loc[psdf_native['id'] < (LENGTH - LENGTH/10)]
psdf_native["new_value"] = psdf_native["id"].where(psdf_native["id"] < LENGTH/20, 100).where(psdf_native["id"] < LENGTH/2, 50).where(psdf_native["id"] >= LENGTH/2, 0)
psdf_native = psdf_native.loc[psdf_native['id'] < (LENGTH - LENGTH/5)]
psdf_native = psdf_native.loc[:, ['new_value', 'id']]

In [22]:
psdf_native.spark.explain(mode="formatted")

== Physical Plan ==
LocalTableScan (1)


(1) LocalTableScan
Output [3]: [__index_level_0__#287L, new_value#340L, id#288L]
Arguments: [__index_level_0__#287L, new_value#340L, id#288L]




In [23]:
%timeit psdf_native.count()

24/08/01 17:51:14 WARN TaskSetManager: Stage 51 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:15 WARN TaskSetManager: Stage 54 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:17 WARN TaskSetManager: Stage 57 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:19 WARN TaskSetManager: Stage 60 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:20 WARN TaskSetManager: Stage 63 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:22 WARN TaskSetManager: Stage 66 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:23 WARN TaskSetManager: Stage 69 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.


1.65 s ± 63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


24/08/01 17:51:25 WARN TaskSetManager: Stage 72 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.


### Use pandas (vectorized) udf for pyspark

The [apply and transform](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/transform_apply.html) functions are working on pandas series and thus have the same effect as using pandas udfs.

In [24]:
# this runs a spark job - in case output type is not defined - since: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. (https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html)

psdf_scalar = generate_dataset()
psdf_scalar = psdf_scalar.loc[psdf_scalar['id'] < (LENGTH - LENGTH/10)]
psdf_scalar = psdf_scalar.assign(new_value=psdf_scalar["id"].transform(vectorized_when))
psdf_scalar = psdf_scalar.loc[psdf_scalar['id'] < (LENGTH - LENGTH/5)]
psdf_scalar = psdf_scalar.loc[:, ['new_value', 'id']]

In [25]:
psdf_scalar.spark.explain(mode='formatted')

== Physical Plan ==
* Project (4)
+- ArrowEvalPython (3)
   +- * Project (2)
      +- * LocalTableScan (1)


(1) LocalTableScan [codegen id : 1]
Output [5]: [__index_level_0__#693L, id#694L, value#695L, a#696L, __natural_order__#719L]
Arguments: [__index_level_0__#693L, id#694L, value#695L, a#696L, __natural_order__#719L]

(2) Project [codegen id : 1]
Output [2]: [__index_level_0__#693L, id#694L]
Input [5]: [__index_level_0__#693L, id#694L, value#695L, a#696L, __natural_order__#719L]

(3) ArrowEvalPython
Input [2]: [__index_level_0__#693L, id#694L]
Arguments: [pudf(__index_level_0__#693L, id#694L)#747L], [pythonUDF0#775L], 200

(4) Project [codegen id : 2]
Output [3]: [__index_level_0__#693L, pythonUDF0#775L AS new_value#750L, id#694L]
Input [3]: [__index_level_0__#693L, id#694L, pythonUDF0#775L]




In [26]:
%timeit psdf_scalar.count()

24/08/01 17:51:29 WARN TaskSetManager: Stage 76 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:31 WARN TaskSetManager: Stage 79 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:33 WARN TaskSetManager: Stage 82 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:34 WARN TaskSetManager: Stage 85 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:36 WARN TaskSetManager: Stage 88 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:38 WARN TaskSetManager: Stage 91 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:39 WARN TaskSetManager: Stage 94 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/

1.72 s ± 55.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Use pandas (scalar) udf for pyspark

In [27]:
psdf_vect = generate_dataset()
psdf_vect = psdf_vect.loc[psdf_vect['id'] < (LENGTH - LENGTH/10)]
psdf_vect = psdf_vect.assign(new_value=psdf_vect["id"].transform(scalar_when))
psdf_vect = psdf_vect.loc[psdf_vect['id'] < (LENGTH - LENGTH/5)]
psdf_vect = psdf_vect.loc[:, ['new_value', 'id']]

In [28]:
psdf_vect.spark.explain(mode='formatted')

== Physical Plan ==
* Project (4)
+- ArrowEvalPython (3)
   +- * Project (2)
      +- * LocalTableScan (1)


(1) LocalTableScan [codegen id : 1]
Output [5]: [__index_level_0__#1144L, id#1145L, value#1146L, a#1147L, __natural_order__#1170L]
Arguments: [__index_level_0__#1144L, id#1145L, value#1146L, a#1147L, __natural_order__#1170L]

(2) Project [codegen id : 1]
Output [2]: [__index_level_0__#1144L, id#1145L]
Input [5]: [__index_level_0__#1144L, id#1145L, value#1146L, a#1147L, __natural_order__#1170L]

(3) ArrowEvalPython
Input [2]: [__index_level_0__#1144L, id#1145L]
Arguments: [pudf(__index_level_0__#1144L, id#1145L)#1178L], [pythonUDF0#1206L], 200

(4) Project [codegen id : 2]
Output [3]: [__index_level_0__#1144L, pythonUDF0#1206L AS new_value#1181L, id#1145L]
Input [3]: [__index_level_0__#1144L, id#1145L, pythonUDF0#1206L]




In [29]:
%timeit psdf_vect.count()

24/08/01 17:51:45 WARN TaskSetManager: Stage 100 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:46 WARN TaskSetManager: Stage 103 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:48 WARN TaskSetManager: Stage 106 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:50 WARN TaskSetManager: Stage 109 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:51 WARN TaskSetManager: Stage 112 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:53 WARN TaskSetManager: Stage 115 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:51:55 WARN TaskSetManager: Stage 118 contains a task of very large size (6355 KiB). The maximum recommended task size is 1000 KiB.

1.63 s ± 104 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Native pyspark

In [30]:
sdf_native = generate_dataset().to_spark()
sdf_native = sdf_native.filter(F.col("id") < LENGTH - LENGTH/10)
sdf_native = sdf_native.withColumn("new_value", F.when(F.col("id") < 40, 100).when(F.col("id") < 60, 50).otherwise(0))
sdf_native = sdf_native.filter(F.col("id") < LENGTH - LENGTH/5)
sdf_native = sdf_native.select("id", "new_value")



In [31]:
sdf_native.explain(mode='formatted')

== Physical Plan ==
LocalTableScan (1)


(1) LocalTableScan
Output [2]: [id#1576L, new_value#1592]
Arguments: [id#1576L, new_value#1592]




In [32]:
%timeit sdf_native.count()

24/08/01 17:51:59 WARN TaskSetManager: Stage 124 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:00 WARN TaskSetManager: Stage 127 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:01 WARN TaskSetManager: Stage 130 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:01 WARN TaskSetManager: Stage 133 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:02 WARN TaskSetManager: Stage 136 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:03 WARN TaskSetManager: Stage 139 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.
24/08/01 17:52:04 WARN TaskSetManager: Stage 142 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.

976 ms ± 21.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


24/08/01 17:52:05 WARN TaskSetManager: Stage 145 contains a task of very large size (1667 KiB). The maximum recommended task size is 1000 KiB.


### scalar udf

In [33]:
scalar_when_udf = F.udf(scalar_when, LongType())

In [34]:
sdf_sclar = generate_dataset().to_spark()
sdf_sclar = sdf_sclar.filter(F.col("id") < 90)
sdf_sclar = sdf_sclar.withColumn("new_value", scalar_when_udf(F.col("id")))
sdf_sclar = sdf_sclar.filter(F.col("id") < 80)
sdf_sclar = sdf_sclar.select("id", "new_value")



In [35]:
sdf_sclar.explain(mode='formatted')

== Physical Plan ==
* Project (4)
+- BatchEvalPython (3)
   +- * Project (2)
      +- * LocalTableScan (1)


(1) LocalTableScan [codegen id : 1]
Output [3]: [id#1656L, value#1657L, a#1658L]
Arguments: [id#1656L, value#1657L, a#1658L]

(2) Project [codegen id : 1]
Output [1]: [id#1656L]
Input [3]: [id#1656L, value#1657L, a#1658L]

(3) BatchEvalPython
Input [1]: [id#1656L]
Arguments: [scalar_when(id#1656L)#1672L], [pythonUDF0#1680L]

(4) Project [codegen id : 2]
Output [2]: [id#1656L, pythonUDF0#1680L AS new_value#1673L]
Input [2]: [id#1656L, pythonUDF0#1680L]




In [36]:
%timeit sdf_sclar.count()

384 ms ± 13.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### vect udf

In [37]:
vect_when_udf = F.pandas_udf(vectorized_when_pd, IntegerType())

In [38]:
sdf_vect = generate_dataset().to_spark()
sdf_vect = sdf_vect.filter(F.col("id") < 90)
sdf_vect = sdf_vect.withColumn("new_value", vect_when_udf(F.col("id")))
sdf_vect = sdf_vect.filter(F.col("id") < 80)
sdf_vect = sdf_vect.select("id", "new_value")



In [39]:
sdf_vect.explain(mode='formatted')

== Physical Plan ==
* Project (4)
+- ArrowEvalPython (3)
   +- * Project (2)
      +- * LocalTableScan (1)


(1) LocalTableScan [codegen id : 1]
Output [3]: [id#1738L, value#1739L, a#1740L]
Arguments: [id#1738L, value#1739L, a#1740L]

(2) Project [codegen id : 1]
Output [1]: [id#1738L]
Input [3]: [id#1738L, value#1739L, a#1740L]

(3) ArrowEvalPython
Input [1]: [id#1738L]
Arguments: [vectorized_when_pd(id#1738L)#1754], [pythonUDF0#1762], 200

(4) Project [codegen id : 2]
Output [2]: [id#1738L, pythonUDF0#1762 AS new_value#1755]
Input [2]: [id#1738L, pythonUDF0#1762]




In [40]:
%timeit sdf_vect.count()

383 ms ± 24.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
