# PySpark: Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format that Spark uses to transfer data efficiently between JVM and Python processes. It is currently most beneficial to Python users who work with Pandas/NumPy data. Arrow is not used automatically; it may require minor changes to configuration or code to take full advantage of it and to ensure compatibility. This guide gives a high-level description of how to use Arrow in Spark and highlights the differences when working with Arrow-enabled data.

## Ensure PyArrow Is Installed

> pip install "pyspark[sql]"
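
One way to confirm the installation before starting Spark is to check whether the `pyarrow` module can be located. The following is a minimal sketch using only the standard library; the helper name `has_module` is made up for illustration and is not part of PySpark or PyArrow.

```python
from importlib import util


def has_module(name: str) -> bool:
    """Return True if the named top-level module can be located (hypothetical helper)."""
    # find_spec locates a top-level module without importing it
    return util.find_spec(name) is not None


print("pyarrow available:", has_module("pyarrow"))
```

If this prints `False`, the install command above probably needs to be run in the same environment as the notebook kernel.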

## Importing libraries

In [1]:
import numpy as np
import pandas as pd

from typing import Iterator, Tuple
from collections.abc import Iterable

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, col, udf
from pyspark.sql.types import LongType

## Connect to Spark

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
# Enable eager evaluation so that PySpark DataFrames are rendered automatically in notebooks
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

## Enabling Arrow for Conversion to/from Pandas

In [4]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

print((df.count(), len(df.columns)))
df

(100, 3)


0,1,2
0.098747111482863,0.893175805710203,0.0476443091548728
0.9567366946948238,0.3398680130169368,0.6447115632156324
0.0330841386420428,0.1609302668691218,0.5050091336423522
0.7367932835868325,0.7755342205961568,0.9842201067808896
0.3689169304338334,0.704545853193771,0.2697110625143786
0.3322482487964862,0.4724419452427658,0.7760579952377767
0.1284218412484928,0.0314053424950141,0.9252385775188006
0.3784426798669348,0.033588235797431,0.9923117758080666
0.8789964656028638,0.2119259333641431,0.3388224974077995
0.6220994893016789,0.6239021274755167,0.6232632692887881


In [5]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

print(f"Pandas DataFrame result statistics:\n{result_pdf.describe()}\n")

Pandas DataFrame result statistics:
                0           1           2
count  100.000000  100.000000  100.000000
mean     0.462936    0.458167    0.497645
std      0.281031    0.306636    0.288140
min      0.012886    0.005309    0.001311
25%      0.243387    0.173176    0.269334
50%      0.411746    0.401460    0.434134
75%      0.711391    0.718346    0.777430
max      0.965285    0.994722    0.993816



## Pandas UDFs (a.k.a. Vectorized UDFs)

### Series to Series

In [6]:
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())  # type: ignore[call-overload]

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

0    1
1    4
2    9
dtype: int64


In [7]:
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



### Iterator of Series to Iterator of Series

In [8]:
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [9]:
# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+
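
As with the Series to Series case, the body of an iterator UDF can be tried on local Pandas data before handing it to Spark. The sketch below is an illustration only: `plus_one_local` is a hypothetical undecorated copy of `plus_one`, and the two hand-made Series stand in for the batches Spark would supply. The real batch boundaries are chosen by Spark and will generally differ.

```python
from typing import Iterator

import pandas as pd


def plus_one_local(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Same body as plus_one, without the pandas_udf decorator
    for x in iterator:
        yield x + 1


# Two hand-made batches (an assumption; Spark decides the real batching)
batches = [pd.Series([1, 2]), pd.Series([3])]

# The function yields one output Series per input batch
results = list(plus_one_local(iter(batches)))
combined = pd.concat(results, ignore_index=True)
print(combined.tolist())
```

Each input batch produces exactly one output batch of the same length, which is what the iterator form of a Pandas UDF needs to preserve.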



### Iterator of Multiple Series to Iterator of Series

In [10]:
pdf = pd.DataFrame([(1, 1), (2, 4), (3, 9)], columns=["x", "y"])
df = spark.createDataFrame(pdf)
df

x,y
1,1
2,4
3,9


In [11]:
# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "y")).show()

+-----------------------+
|multiply_two_cols(x, y)|
+-----------------------+
|                      1|
|                      8|
|                     27|
+-----------------------+



### Series to Scalar

In [12]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v")
)
df

id,v
1,1.0
1,2.0
2,3.0
2,5.0
2,10.0


In [13]:
# Declare the function and create the UDF
@pandas_udf("double")  # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()

+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+



In [14]:
df.groupby("id").agg(mean_udf(df['v'])).show()

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+



In [15]:
w = (Window.partitionBy('id')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



## Pandas Function APIs

### Grouped Map

In [16]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

df.groupby("id").agg(mean_udf(df['v'])).show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+



In [17]:
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+
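
The grouped map pattern can be approximated locally to check the function's logic: split a Pandas DataFrame by key, call the function once per group, and concatenate the pieces. This is only a sketch of the idea in plain Pandas, not a description of how Spark executes `applyInPandas` internally; the variable names `local`, `pieces`, and `result` are illustrative.

```python
import pandas as pd


def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())


# Local copy of the example data
local = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# Call the function once per group, then stitch the results back together
pieces = [subtract_mean(group) for _, group in local.groupby("id")]
result = pd.concat(pieces, ignore_index=True)
print(result)
```

The values in `result` should match the Spark output above, although Spark does not guarantee the order of the rows.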



### Map

In [18]:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
df.show()

+---+---+
| id|age|
+---+---+
|  1| 21|
|  2| 30|
+---+---+



In [19]:
def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()

+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+



### Co-grouped Map

In [20]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

df1.show()
df2.show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+



In [21]:
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_ordered(left, right)

(df1.groupby("id")
    .cogroup(df2.groupby("id"))
    .applyInPandas(merge_ordered, schema="time int, id int, v1 double, v2 string")).show()

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+
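
To see what the function receives for a single key, the call can be reproduced in plain Pandas. The sketch below hand-builds the `id == 1` rows of both inputs (an illustrative assumption about how the co-group for that key looks) and applies `pd.merge_ordered` to them directly.

```python
import pandas as pd

# Rows with id == 1 from each side of the co-group
left = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
right = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})

# merge_ordered performs an ordered outer join on the shared columns by default
merged = pd.merge_ordered(left, right)
print(merged)
```

The row for `20000102` has no match on the right side, so `v2` is missing there, which Spark displays as `NULL`.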



## Arrow Python UDFs

Arrow Python UDFs are user-defined functions that are executed row by row while using Arrow for efficient batch data transfer and serialization. To define an Arrow Python UDF, use udf() as a decorator or wrap the function with udf(), and set the useArrow parameter to True. Additionally, you can enable Arrow optimization for Python UDFs throughout the entire SparkSession by setting the Spark configuration spark.sql

In [22]:
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.show()

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|John Doe| 21|
+---+--------+---+



In [23]:
@udf(returnType='int')  # A default, pickled Python UDF
def slen(s):  # type: ignore[no-untyped-def]
    return len(s)

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s):  # type: ignore[no-untyped-def]
    return len(s)

df.select(slen("name"), arrow_slen("name")).show()

+----------+----------------+
|slen(name)|arrow_slen(name)|
+----------+----------------+
|         8|               8|
+----------+----------------+



## Close session

In [24]:
spark.stop()