# Packages to be Upgraded

In [None]:
# Install with Conda
conda install -c conda-forge pyarrow

# Install PyArrow with Python
pip install pyarrow==0.15.0

# Install Py4j with Python
pip install py4j==0.10.9

# Install pyspark with Python
pip install pyspark==3.0.0

# Spark Session Configurations

In [1]:
import pyarrow
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session used throughout the notebook.
# The two spark.sql.execution.arrow.pyspark.* options switch on the Arrow
# path for Spark <-> pandas conversion and its fallback setting; the two
# Parquet options disable schema merging and summary-metadata files.
spark = (
    SparkSession.builder
    .appName("PandasUDF_with_PyArrow")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
    .config("spark.sql.parquet.mergeSchema", "false")
    .config("spark.hadoop.parquet.enable.summary-metadata", "false")
    .getOrCreate()
)


In [2]:
# Spark Session Details: leaving the session object as the last expression of
# the cell makes the notebook display it.
spark

In [None]:
# Spark Session All Assigned Configurations: fetch everything set on the
# configuration of this session's SparkContext.
spark.sparkContext.getConf().getAll()

# Parquet to Arrow

In [3]:
import pandas as pd
import numpy as np

# Importing PyArrow Parquet
import pyarrow.parquet as pq

# Link to data:
# https://www.kaggle.com/yassinealouini/m5-sales-hierarchy-dataset

# Relative path to the Parquet file that is loaded below
path = 'dataset/dimension.parquet'

# Read the Parquet file with PyArrow, then convert the result to a pandas DataFrame
data_frame = pq.read_table(path).to_pandas()

In [4]:
# Display the DataFrame loaded from the Parquet file
data_frame

Unnamed: 0,location,department,category,id,store_id,item_id
0,CA,HOBBIES_1,HOBBIES,HOBBIES_1_001_CA_1_validation,CA_1,HOBBIES_1_001
154,CA,HOBBIES_1,HOBBIES,HOBBIES_1_002_CA_1_validation,CA_1,HOBBIES_1_002
416,CA,HOBBIES_1,HOBBIES,HOBBIES_1_003_CA_1_validation,CA_1,HOBBIES_1_003
541,CA,HOBBIES_1,HOBBIES,HOBBIES_1_004_CA_1_validation,CA_1,HOBBIES_1_004
818,CA,HOBBIES_1,HOBBIES,HOBBIES_1_005_CA_1_validation,CA_1,HOBBIES_1_005
...,...,...,...,...,...,...
6840006,WI,FOODS_3,FOODS,FOODS_3_823_WI_3_validation,WI_3,FOODS_3_823
6840288,WI,FOODS_3,FOODS,FOODS_3_824_WI_3_validation,WI_3,FOODS_3_824
6840570,WI,FOODS_3,FOODS,FOODS_3_825_WI_3_validation,WI_3,FOODS_3_825
6840852,WI,FOODS_3,FOODS,FOODS_3_826_WI_3_validation,WI_3,FOODS_3_826


# Pandas DataFrame to Arrow to Parquet

In [5]:
import pandas as pd
import numpy as np

import pyarrow as pa
import pyarrow.parquet as pq

# Build a tiny two-row pandas frame, convert it to an Arrow table and write
# that table out as a Parquet file.
pandas_df = pd.DataFrame(
    data={
        'column_1': [1, 2],
        'column_2': [3, 4],
        'column_3': [5, 6],
    }
)

# preserve_index=True carries the pandas index into the Arrow table as well
table = pa.Table.from_pandas(
    pandas_df,
    preserve_index=True,
)

pq.write_table(table, 'pandas_dataframe.parquet')

In [6]:
# Display the source pandas DataFrame
pandas_df

Unnamed: 0,column_1,column_2,column_3
0,1,3,5
1,2,4,6


In [7]:
# Display the Arrow table built from pandas_df (schema plus attached metadata)
table

pyarrow.Table
column_1: int64
column_2: int64
column_3: int64
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"na'
            b'me": null, "field_name": null, "pandas_type": "unicode", "numpy_'
            b'type": "object", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"name": "column_1", "field_name": "column_1", "pandas_type": '
            b'"int64", "numpy_type": "int64", "metadata": null}, {"name": "col'
            b'umn_2", "field_name": "column_2", "pandas_type": "int64", "numpy'
            b'_type": "int64", "metadata": null}, {"name": "column_3", "field_'
            b'name": "column_3", "pandas_type": "int64", "numpy_type": "int64"'
            b', "metadata": null}, {"name": null, "field_name": "__index_level'
            b'_0__", "pandas_type": "int64", "numpy_type": "int64", "metadata"'
            b': null}], "creator": {"library": "pyarrow", "version": "0.15.1"}'
            b',

# Calculating Script Processing Time

In [8]:
import time
# Processor Time: CPU time used by the current process, in seconds
time.process_time()

1.390625

In [9]:
# Wall-Clock Time: high-resolution performance counter, in seconds. The
# reference point is arbitrary, so only the difference between two readings
# is meaningful.
time.perf_counter()

192.0849444

In [10]:
# Current time as seconds since the epoch
time.time()

1596023702.1228762

In [11]:
# Monotonic clock reading in seconds; like perf_counter, only differences
# between readings are meaningful
time.monotonic()

543412.968

# PySpark Pandas UDFs (Vectorized UDFs)

# 1. SCALAR Pandas UDF

In [13]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

# Small demo frame of (index, weight) rows; index values 2 and 3 occur on
# several rows, which the grouped examples further down rely on.
dataframe = spark.createDataFrame(
    [(1, 5), (2, 7), (2, 8), (2, 10), (3, 18), (3, 22), (4, 36)],
    ("index", "weight"))

# The function definition and the UDF creation
# NOTE(review): the UDF is registered with return type "int" while the Python
# hint says `-> float`. The recorded outputs show whole numbers (15 for a mean
# of 106 / 7), so the fractional part appears to be dropped -- confirm whether
# "double" was intended.
@pandas_udf("int")
def weight_avg_udf(weight: pd.Series) -> float:
    """Return the mean of the ``weight`` values received as a pandas Series.

    NOTE(review): the Series -> float signature reduces many input rows to a
    single value, which looks like an aggregating pandas UDF rather than a
    row-wise scalar one, although this section is titled "SCALAR" -- confirm
    against the PySpark pandas_udf documentation.
    """
    return weight.mean()

# Apply the UDF to the whole "weight" column; the recorded output is one row
dataframe.select(weight_avg_udf(dataframe['weight'])).show()

+----------------------+
|weight_avg_udf(weight)|
+----------------------+
|                    15|
+----------------------+



# 2. GROUPED_AGG Pandas UDF

In [14]:
# Aggregation Process on Pandas UDF: one averaged weight per distinct "index"
dataframe.groupby("index").agg(weight_avg_udf(dataframe['weight'])).show()

# Window covering every row that shares the same "index" value (unbounded in
# both directions); used by the windowed example in the next cell.
w = (
    Window
    .partitionBy('index')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

+-----+----------------------+
|index|weight_avg_udf(weight)|
+-----+----------------------+
|    1|                     5|
|    3|                    20|
|    2|                     8|
|    4|                    36|
+-----+----------------------+



In [15]:
# Print the windowed results: each row keeps its own weight and gains an
# 'avg_weight' column holding the UDF result evaluated over window `w`
# (all rows with the same index).
dataframe.withColumn('avg_weight', weight_avg_udf(dataframe['weight']).over(w)).show()

+-----+------+----------+
|index|weight|avg_weight|
+-----+------+----------+
|    1|     5|         5|
|    3|    18|        20|
|    3|    22|        20|
|    2|     7|         8|
|    2|     8|         8|
|    2|    10|         8|
|    4|    36|        36|
+-----+------+----------+



# 3. GROUPED_MAP Pandas UDF

In [16]:
import pandas as pd
import numpy as np

# Pandas DataFrame generation: 200 rows x 4 columns of random floats.
# NOTE(review): nothing visible in this section reads this frame -- the UDF
# parameter below has the same name and shadows it inside the function. No
# random seed is set, so its contents differ between runs.
pandas_dataframe = pd.DataFrame(np.random.rand(200, 4))

def weight_map_udf(pandas_dataframe):
    """Centre the ``weight`` column of a frame on its own mean.

    Args:
        pandas_dataframe: pandas DataFrame that has a ``weight`` column.

    Returns:
        A new DataFrame with the same columns, where ``weight`` has been
        replaced by ``weight - mean(weight)``; the input frame is not modified.
    """
    weights = pandas_dataframe["weight"]
    centered = weights - weights.mean()
    return pandas_dataframe.assign(weight=centered)

# Apply weight_map_udf to the rows of each "index" group and show the result.
# NOTE(review): the centred weights are floats, but the schema declares
# `weight int`; the recorded output shows whole numbers, so the fractional
# part appears to be dropped -- confirm this is intended.
dataframe.groupby("index").applyInPandas(weight_map_udf, schema="index int, weight int").show()

+-----+------+
|index|weight|
+-----+------+
|    1|     0|
|    3|    -2|
|    3|     2|
|    2|    -1|
|    2|     0|
|    2|     1|
|    4|     0|
+-----+------+

