# 2022/04/30

# Spark SQL

https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#recommended-pandas-and-pyarrow-versions

## Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility. This guide will give a high-level description of how to use Arrow in Spark and highlight any differences when working with Arrow-enabled data.

## Enavling for conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call DataFrame.toPandas() and when creating a Spark DataFrame from a Pandas DataFrame with SparkSession.createDataFrame(). To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true. This is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.pyspark.enabled could fallback automatically to non-Arrow optimization implementation if an error occurs before the actual computation within Spark. This can be controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.

In [3]:
import numpy as np
import pandas as pd

import pyspark.pandas as ps
from pyspark.sql import SparkSession



In [4]:
spark = SparkSession.builder.getOrCreate()

22/04/30 16:36:31 WARN Utils: Your hostname, SuideMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.18.142.168 instead (on interface en0)
22/04/30 16:36:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/30 16:36:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrae using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

                                                                                

### CAUTIONS!

Even with the Arrow optimization making it Pandas is still heavy load. ANDDDD it will fuck up your little baby computer

that's because anytime we convert it to pandas, it will trigger `collect()`

## Pandas UDFS (aka Vectorized UDFs)

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using the pandas_udf() as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.

Before Spark 3.0, Pandas UDFs used to be defined with pyspark.sql.functions.PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred and using pyspark.sql.functions.PandasUDFType will be deprecated in the future release.

Note that the type hint should use pandas.Series in all cases but there is one variant that pandas.DataFrame should be used for its input or output type hint instead when the input or output column is of StructType. The following example shows a Pandas UDF which takes long column, string column and struct column, and outputs a struct column. It requires the function to specify the type hints of pandas.Series and pandas.DataFrame as below:

In [7]:
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# create a spark dataframe that has three columns including a struct column
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

In [9]:
df.show()

                                                                                

+--------+----------+-----------------+
|long_col|string_col|       struct_col|
+--------+----------+-----------------+
|       1|  a string|{a nested string}|
+--------+----------+-----------------+



In [11]:
df.printSchema()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)



In [13]:
view = df.select(func("long_col", "string_col", "struct_col"))
view.show()

                                                                                

+--------------------------------------+
|func(long_col, string_col, struct_col)|
+--------------------------------------+
|                  {a nested string, 9}|
+--------------------------------------+



In [14]:
view.printSchema()

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)



## Series to Series

The type hint can be expressed as pandas.Series, … -> pandas.Series.

By using pandas_udf() with the function having such type hints above, it creates a Pandas UDF where the given function takes one or more pandas.Series and outputs one pandas.Series. The output of the function should always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

The following example shows how to create this Pandas UDF that computes the product of 2 columns.



In [27]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

In [19]:
# The function for a pandas_udf should be able to execute with local Pandas data

x = pd.Series([1, 2, 3])
print(multiply_func(x, x))



0    1
1    4
2    9
dtype: int64


In [20]:
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=['x']))

In [22]:
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [24]:
# it also works with Spark dataframe!!!!!
df.select(multiply(col("x"), col("x"))).show()

                                                                                

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



## Iterator of Series to Iterator of Series

The type hint can be expressed as Iterator[pandas.Series] -> Iterator[pandas.Series].

By using pandas_udf() with the function having such type hints above, it creates a Pandas UDF where the given function takes an iterator of pandas.Series and outputs an iterator of pandas.Series. The length of the entire output from the function should be the same length of the entire input; therefore, it can prefetch the data from the input iterator as long as the lengths are the same. In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator of Series.

It is also useful when the UDF execution requires initializing some states although internally it works identically as Series to Series case. The pseudocode below illustrates the example.



In [29]:
######## Psuedo code, will not work

# from typing import Iterator

# @pandas_udf("long")
# def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
#     # do some expensive initialization with a state
#     state = very_expensive_initialization()
#     for x in iterator:
#         # use that state for whole iterator.
#         yield calculate_with_state(x, state)
    
    
# df.select(calculate("value")).show()

In [30]:
from typing import Iterator

import pandas as pd

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

In [31]:
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [33]:
# Declare the function and create the UDF

@pandas_udf('long')
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

In [34]:
df.select(plus_one("x")).show()

                                                                                

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+



## Iterator of Multiple Series to Iterator of Series

The type hint can be expressed as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].

By using pandas_udf() with the function having such type hints above, it creates a Pandas UDF where the given function takes an iterator of a tuple of multiple pandas.Series and outputs an iterator of pandas.Series. In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as Iterator of Series to Iterator of Series case.

The following example shows how to create this Pandas UDF:

In [36]:
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF

@pandas_udf("long")
def multiply_two_cols(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

In [37]:
df.select(multiply_two_cols("x", "x")).show()

[Stage 15:>                                                         (0 + 1) / 1]                                                                                

+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      4|
|                      9|
+-----------------------+



## Series to Scalar

The type hint can be expressed as pandas.Series, … -> Any.

By using pandas_udf() with the function having such type hints above, it creates a Pandas UDF similar to PySpark’s aggregate functions. The given function takes pandas.Series and returns a scalar value. The return type should be a primitive data type, and the returned scalar can be either a python primitive type, e.g., int or float or a numpy data type, e.g., numpy.int64 or numpy.float64. Any should ideally be a specific scalar type accordingly.

This UDF can be also used with GroupedData.agg() and Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window.

Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also, only unbounded window is supported with Grouped aggregate Pandas UDFs currently. The following example shows how to use this type of UDF to compute mean with a group-by and window operations:

In [38]:
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
        [(1, 1.0), 
         (1, 2.0), 
         (2, 3.0), 
         (2, 5.0), 
         (2, 10.0)],
        ("id", "v")
)

df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [40]:
# declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

In [41]:
df.select(mean_udf(df['v'])).show()

+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+



In [42]:
# groupby with an aggregation function
df.groupby("id").agg(mean_udf(df['v'])).show()

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+



Adding a groupby back to the frame in one move

THIS IS BEAUTIFUL!!!

In [43]:

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



[Stage 31:>                                                         (0 + 1) / 1]                                                                                

# Pandas Function APIs

Pandas Function APIs can directly apply a Python native function against the whole DataFrame by using Pandas instances. Internally it works similarly with Pandas UDFs by using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function API behaves as a regular API under PySpark DataFrame instead of Column, and Python type hints in Pandas Functions APIs are optional and do not affect how it works internally at this moment although they might be required in the future.

From Spark 3.0, grouped map pandas UDF is now categorized as a separate Pandas Function API, DataFrame.groupby().applyInPandas(). It is still possible to use it with pyspark.sql.functions.PandasUDFType and DataFrame.groupby().apply() as it was; however, it is preferred to use DataFrame.groupby().applyInPandas() directly. Using pyspark.sql.functions.PandasUDFType will be deprecated in the future.

## Group Map

Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas() which requires a Python function that takes a pandas.DataFrame and return another pandas.DataFrame. It maps each group to each pandas.DataFrame in the Python function.

This API implements the “split-apply-combine” pattern which consists of three steps:

Split the data into groups by using DataFrame.groupBy().

Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.

Combine the results into a new PySpark DataFrame.

To use DataFrame.groupBy().applyInPandas(), the user needs to define the following:

A Python function that defines the computation for each group.

A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.

The following example shows how to use DataFrame.groupby().applyInPandas() to subtract the mean from each value in the group.



In [44]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v")
)

df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [45]:
def subtract_mean(pdf):
    #pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

In [47]:
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

[Stage 37:>                                                         (0 + 1) / 1]

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



                                                                                

## Co-grouped Map

Co-grouped map operations with Pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas() which allows two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. It consists of the following steps:

Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.

Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.

Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), the user needs to define the following:

A Python function that defines the computation for each cogroup.

A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame. on how to label columns when constructing a pandas.DataFrame.

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.

The following example shows how to use DataFrame.groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

In [48]:
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

df1.show()
df2.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+



In [50]:
def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()


[Stage 48:>                                                         (0 + 1) / 1]

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



                                                                                