# Apache Arrow
Apache Arrow is a development platform for in-memory analytics. It contains a set of technologies that enable big data systems to process and move data fast. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware.
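
To make the columnar idea concrete, here is a toy sketch that uses only the Python standard library. It is an illustration of row-oriented versus column-oriented layout, not Arrow's actual memory format (which also carries validity bitmaps and metadata); the sample values are made up.

```python
from array import array

# Row-oriented layout: one Python tuple per record.
rows = [(0, 0.25), (1, 0.50), (2, 0.75)]

# Column-oriented layout: one contiguous, typed buffer per field.
columns = {
    "id": array("q", (r[0] for r in rows)),  # 64-bit signed integers
    "x": array("d", (r[1] for r in rows)),   # 64-bit floats
}

# An analytic operation such as a mean reads a single buffer...
mean_x = sum(columns["x"]) / len(columns["x"])

# ...whereas the row layout has to visit every record to pull out one field.
mean_x_rows = sum(r[1] for r in rows) / len(rows)

print(mean_x, mean_x_rows)
```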

<img src="arrow_desc.png"/>

<img src="columnar_storage.png"/>

In [1]:
import os
from pyspark.sql import SparkSession

key = os.environ["AWS_ACCESS_KEY_ID"]
secret = os.environ["AWS_SECRET_ACCESS_KEY"]

spark = (
        SparkSession
        .builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2,com.amazonaws:aws-java-sdk-pom:1.10.34")
        .getOrCreate()
    )
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", key)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret)

## Converting to Pandas is expensive
Without Arrow, `toPandas()` in PySpark collects all rows to the Spark driver, serializes each row into Python’s pickle format (row by row), and sends them to a Python worker process. At the end of the conversion, it unpickles each row into a massive list of tuples, which is then turned into a Pandas DataFrame.
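
The cost of the row-by-row path can be sketched without Spark. The snippet below is a standard-library analogy, not PySpark's internal code: it contrasts one pickle payload per row with one typed buffer per column. The row count `n` and the variable names are arbitrary choices for the illustration.

```python
import pickle
from array import array

n = 1000
rows = [(i, i / n) for i in range(n)]

# Row-by-row path: one pickle payload per row, each unpickled back into a tuple.
payloads = [pickle.dumps(row) for row in rows]
restored_rows = [pickle.loads(p) for p in payloads]

# Columnar path: each column travels as a single typed buffer.
ids = array("q", (r[0] for r in rows))
xs = array("d", (r[1] for r in rows))
id_bytes, x_bytes = ids.tobytes(), xs.tobytes()

restored_ids = array("q")
restored_ids.frombytes(id_bytes)
restored_xs = array("d")
restored_xs.frombytes(x_bytes)

# Both paths carry the same data; they differ in how many objects are handled.
assert restored_rows == list(zip(restored_ids, restored_xs))
print(len(payloads), "row payloads vs 2 column buffers")
```

The row path creates and parses one Python object per row, while the columnar path handles two buffers regardless of the row count.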

In [2]:
from pyspark.sql.functions import rand

df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
df.printSchema()

root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)



In [3]:
%time pdf = df.toPandas()

CPU times: user 11.4 s, sys: 930 ms, total: 12.3 s
Wall time: 15.7 s


In [4]:
pdf.describe()

Unnamed: 0,id,x
count,4194304.0,4194304.0
mean,2097152.0,0.4999261
std,1210791.0,0.2886626
min,0.0,3.20827e-07
25%,1048576.0,0.2500795
50%,2097152.0,0.4998171
75%,3145727.0,0.7500119
max,4194303.0,0.9999998


### Now enable Arrow

In [24]:
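# Note: getOrCreate() returns the session that is already running, so
# arrow_spark refers to the same SparkSession as spark, and settings
# applied through arrow_spark.conf affect both names.
arrow_spark = (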
        SparkSession
        .builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2,com.amazonaws:aws-java-sdk-pom:1.10.34")
        .getOrCreate()
    )
arrow_spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", key)
arrow_spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret)

In [25]:
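# Setting name used by Spark 2.3/2.4; Spark 3.0 deprecates it in favor of
# spark.sql.execution.arrow.pyspark.enabled.
arrow_spark.conf.set("spark.sql.execution.arrow.enabled", "true")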

In [26]:
arrow_df = arrow_spark.range(1 << 22).toDF("id").withColumn("x", rand())
arrow_df.printSchema()

root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)



In [11]:
%time pdf = arrow_df.toPandas()

CPU times: user 239 ms, sys: 117 ms, total: 356 ms
Wall time: 840 ms


In [12]:
pdf.describe()

Unnamed: 0,id,x
count,4194304.0,4194304.0
mean,2097152.0,0.5003103
std,1210791.0,0.2886706
min,0.0,1.304829e-07
25%,1048576.0,0.2502151
50%,2097152.0,0.5004541
75%,3145727.0,0.7503815
max,4194303.0,0.9999999


## Also from Pandas to Spark

In [5]:
import pandas as pd
from pandas import util

pd.util.testing.N = 10**6
test_df = util.testing.makeDataFrame()
test_df.describe()

Unnamed: 0,A,B,C,D
count,1000000.0,1000000.0,1000000.0,1000000.0
mean,-0.001093,0.00117,0.000832,0.002214
std,1.000809,0.999986,0.999185,0.999939
min,-4.720892,-4.530946,-4.749366,-4.922817
25%,-0.676655,-0.674001,-0.674134,-0.672791
50%,-0.001742,0.001121,0.001066,0.00191
75%,0.674406,0.676717,0.674769,0.676591
max,5.499123,4.70264,4.966137,4.516064


In [6]:
%time sdf = spark.createDataFrame(test_df)

CPU times: user 25.2 s, sys: 203 ms, total: 25.4 s
Wall time: 25 s


In [7]:
sdf.printSchema()

root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)



In [15]:
test_df.describe()

Unnamed: 0,A,B,C,D
count,1000000.0,1000000.0,1000000.0,1000000.0
mean,-0.001093,0.00117,0.000832,0.002214
std,1.000809,0.999986,0.999185,0.999939
min,-4.720892,-4.530946,-4.749366,-4.922817
25%,-0.676655,-0.674001,-0.674134,-0.672791
50%,-0.001742,0.001121,0.001066,0.00191
75%,0.674406,0.676717,0.674769,0.676591
max,5.499123,4.70264,4.966137,4.516064


In [13]:
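# Compatibility switch for PyArrow >= 0.15 with Spark 2.3.x/2.4.x,
# which expect the older Arrow IPC format.
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'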
%time asdf = arrow_spark.createDataFrame(test_df)

CPU times: user 199 ms, sys: 25.6 ms, total: 224 ms
Wall time: 251 ms


In [14]:
asdf.printSchema()

root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)



## But...
Why would we convert a massive Spark DataFrame to Pandas? We would lose Spark's parallelism, and all computations would be ridiculously slow anyway. That's what Koalas is for, isn't it?

## I'm glad you asked!
Enter... **Pandas User Defined Functions (UDFs)**, a.k.a. vectorized UDFs
>Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data. A Pandas UDF is defined using the keyword pandas_udf as a decorator or to wrap the function, no additional configuration is required. Currently, there are two types of Pandas UDF: Scalar and Grouped Map.
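
The body of a scalar Pandas UDF is an ordinary function from `pandas.Series` to `pandas.Series`. Here is a minimal sketch, shown without the `pandas_udf` decorator so that it runs with Pandas alone; the function name `plus_one` and the sample batch are hypothetical.

```python
import pandas as pd

def plus_one(v: pd.Series) -> pd.Series:
    # Operates on a whole batch of values at once instead of one row per call.
    return v + 1

batch = pd.Series([1.0, 2.0, 3.0])
print(plus_one(batch).tolist())  # [2.0, 3.0, 4.0]
```

In Spark 2.3/2.4 a function like this would typically be wrapped with `pandas_udf("double", PandasUDFType.SCALAR)` and applied to a column; Spark then feeds it Arrow-backed batches rather than single rows.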

In [28]:
from pyspark.sql.types import IntegerType
new_df = arrow_df.withColumn("group_id", (rand() * 10).cast(IntegerType()))
new_df.show(5)

+---+-------------------+--------+
| id|                  x|group_id|
+---+-------------------+--------+
|  0| 0.7835014960581982|       9|
|  1| 0.6304803124378475|       8|
|  2| 0.8438476250450363|       4|
|  3| 0.5897122285716005|       6|
|  4|0.20807418193297245|       3|
+---+-------------------+--------+
only showing top 5 rows



## Grouped aggregate
Defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series` represents a column within the group or window.
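
What the aggregation does per group can be previewed locally. The sketch below uses plain Pandas with made-up data; `mean_of` is a hypothetical stand-in for the UDF body, and `groupby(...).agg(...)` plays the role Spark plays when it hands each group's column to the function.

```python
import pandas as pd

pdf = pd.DataFrame({
    "group_id": [0, 0, 1, 1, 1],
    "x": [0.2, 0.4, 0.1, 0.5, 0.9],
})

def mean_of(val: pd.Series) -> float:
    # One pandas.Series in, one scalar out.
    return val.mean()

# One scalar per distinct group_id.
result = pdf.groupby("group_id")["x"].agg(mean_of)
print(result.to_dict())
```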

In [3]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(val):
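    # Set inside the UDF so the Python worker process sees it as well
    # (PyArrow >= 0.15 compatibility with Spark 2.3.x/2.4.x).
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'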
    return val.mean()

In [40]:
%time new_df.groupby("group_id").agg(mean_udf(new_df['x'])).show()

+--------+-------------------+
|group_id|        mean_udf(x)|
+--------+-------------------+
|       1| 0.5003724296015791|
|       6| 0.5005115712065953|
|       3| 0.5006386456346672|
|       5|0.49991454914227007|
|       9| 0.4999234648309748|
|       4| 0.5000941894368167|
|       8| 0.5005127133164889|
|       7| 0.5007053770330026|
|       2| 0.5000558271576734|
|       0| 0.5003753971571799|
+--------+-------------------+

CPU times: user 26.4 ms, sys: 12.8 ms, total: 39.1 ms
Wall time: 2.57 s


In [32]:
from pyspark.sql.types import IntegerType

slow_df = df.withColumn("group_id", (rand() * 10).cast(IntegerType()))
slow_df.show(5)
%time slow_df.groupby("group_id").agg(mean_udf(slow_df['x'])).show()

+---+------------------+--------+
| id|                 x|group_id|
+---+------------------+--------+
|  0|0.4695762895238291|       5|
|  1|0.8155342295932761|       1|
|  2|0.7769013623312877|       2|
|  3|0.7413768141953538|       1|
|  4|0.9841248543797412|       4|
+---+------------------+--------+
only showing top 5 rows

+--------+-------------------+
|group_id|        mean_udf(x)|
+--------+-------------------+
|       1|0.49963746215395277|
|       6| 0.4992933200746855|
|       3| 0.5005319476083719|
|       5|0.49978673289073505|
|       9| 0.5005034392982204|
|       4| 0.5004049064645534|
|       8| 0.5001358157567577|
|       7|0.49955033307267627|
|       2| 0.4999911395441541|
|       0| 0.4992863812864833|
+--------+-------------------+

CPU times: user 18.1 ms, sys: 10.4 ms, total: 28.5 ms
Wall time: 3.41 s


## Grouped map
Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

- Split the data into groups by using `DataFrame.groupBy`.
- Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The input data contains all the rows and columns for each group.
- Combine the results into a new Spark `DataFrame`.

To use `groupBy().apply()`, the user needs to define the following:

- A Python function that defines the computation for each group.
- A StructType object or a string that defines the schema of the output DataFrame.

The column labels of the returned `pandas.DataFrame` must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices.

Arrow **is supposed to** speed up the steps where each group is turned into a `pandas.DataFrame` and where all the resulting `pandas.DataFrame` objects are combined into a single Spark `DataFrame`.
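
The three steps can be sketched in plain Pandas, without Spark or Arrow. This is an illustration of the pattern only, using made-up data; in Spark the split and combine steps happen across executors, and only the function in the middle is user code.

```python
import pandas as pd

pdf = pd.DataFrame({
    "id": [0, 1, 2, 3],
    "x": [0.1, 0.3, 0.6, 1.0],
    "group_id": [1, 1, 2, 2],
})

def subtract_mean(group: pd.DataFrame) -> pd.DataFrame:
    # Input and output are both pandas.DataFrame, as in a grouped map UDF.
    return group.assign(x=group.x - group.x.mean())

# Split: one pandas.DataFrame per distinct group_id.
pieces = [group for _, group in pdf.groupby("group_id")]
# Apply: run the function on each piece independently.
applied = [subtract_mean(piece) for piece in pieces]
# Combine: stitch the per-group results back into a single frame.
combined = pd.concat(applied, ignore_index=True)
print(combined)
```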

In [21]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id integer, x double, group_id integer", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    os.environ['ARROW_PRE_0_15_IPC_FORMAT']='1'
    val = pdf.x
    return pdf.assign(x=val - val.mean())

@pandas_udf("id integer, x double, group_id integer", PandasUDFType.GROUPED_MAP)
def make_new(pdf):
    os.environ['ARROW_PRE_0_15_IPC_FORMAT']='1'
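    rows = []
    # Row-by-row loop; pdf.loc[i] relies on the default 0..n-1 integer index.
    for i in range(len(pdf)):
        # to_dict() on a mixed-dtype row yields floats for every field.
        row = pdf.loc[i].to_dict()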
        if row["x"] * 10 > row["group_id"]:
            new_row = {
                "id": row["id"],
                "x": row["group_id"] - row["x"] * 10,
                "group_id": row["group_id"]
            }
            rows.append(new_row)
    return pd.DataFrame(rows)

In [22]:
%time slow_df.groupby("group_id").apply(subtract_mean).show()


+---+--------------------+--------+
| id|                   x|group_id|
+---+--------------------+--------+
|  1| 0.31490414362579955|       1|
| 40|  0.4258973148723636|       1|
| 62| 0.06216563732985847|       1|
| 81| -0.2658185064388757|       1|
|100|  0.2423298675086637|       1|
|111|-0.16079115290063883|       1|
|124|-0.46484840242143033|       1|
|126| 0.07271004548102722|       1|
|131|  0.4577507022900106|       1|
|136|-0.11222963663258179|       1|
|145| -0.2587136345611961|       1|
|146|  0.2935153178233523|       1|
|158|-0.21808939746794265|       1|
|161| -0.2760088691222756|       1|
|192|  0.3573616534146574|       1|
|201|  0.3899767922051003|       1|
|206| -0.4839786325336748|       1|
|222| -0.3764642765977415|       1|
|230| 0.18714294966433942|       1|
|232| 0.05346919440626552|       1|
+---+--------------------+--------+
only showing top 20 rows

CPU times: user 20.5 ms, sys: 9.41 ms, total: 29.9 ms
Wall time: 1.97 s


In [30]:
%time new_df.groupby("group_id").apply(subtract_mean).show()

+---+--------------------+--------+
| id|                   x|group_id|
+---+--------------------+--------+
| 39|  0.3941321966706638|       1|
| 49|  0.2800457317561573|       1|
| 51|  0.4723183196682934|       1|
| 56| 0.17780227668203763|       1|
| 65|-0.29535415278073907|       1|
| 79| 0.13679756438091062|       1|
| 80| -0.3112120679074296|       1|
| 90|-0.06858036938887069|       1|
| 98| -0.4072505871917307|       1|
|106|-0.40478885596122094|       1|
|141| 0.41695642373563724|       1|
|151|-0.02222740680657942|       1|
|170| 0.34704178936104824|       1|
|171|-0.18128532357799942|       1|
|187| -0.2400998967551562|       1|
|196|-0.06369698353156272|       1|
|205|-0.05135405091547163|       1|
|207|  0.3435252598696463|       1|
|214|-0.04771468292684111|       1|
|222|    -0.3478538644075|       1|
+---+--------------------+--------+
only showing top 20 rows

CPU times: user 13.6 ms, sys: 7.6 ms, total: 21.2 ms
Wall time: 2.66 s


In [23]:
%time slow_df.groupby("group_id").apply(make_new).show()

+---+--------------------+--------+
| id|                   x|group_id|
+---+--------------------+--------+
|  1|  -7.155342295932762|       1|
| 40|  -8.265274008398402|       1|
| 62|  -4.627957232973351|       1|
| 81| -1.3481157952860086|       1|
|100|  -6.429599534761403|       1|
|111|  -2.398389330668378|       1|
|126|  -4.733401314485038|       1|
|131|  -8.583807882574872|       1|
|136| -2.8840044933489484|       1|
|145|  -1.419164514062805|       1|
|146|  -6.941454037908288|       1|
|158| -1.8254068849953393|       1|
|161| -1.2462121684520095|       1|
|192|   -7.57991739382134|       1|
|201|  -7.906068781725768|       1|
|222|-0.24165809369735092|       1|
|230|   -5.87773035631816|       1|
|232|  -4.540992803737421|       1|
|234| -3.9221715265684747|       1|
|264|  -7.990430338852571|       1|
+---+--------------------+--------+
only showing top 20 rows

CPU times: user 27.4 ms, sys: 13.9 ms, total: 41.3 ms
Wall time: 2min 13s


In [31]:
%time new_df.groupby("group_id").apply(make_new).show()

+---+-------------------+--------+
| id|                  x|group_id|
+---+-------------------+--------+
| 39| -7.943939928444344|       1|
| 49| -6.803075279299279|       1|
| 51| -8.725801158420639|       1|
| 56| -5.780640728558081|       1|
| 65|-1.0490764339303147|       1|
| 79| -5.370593605546811|       1|
| 80|-0.8904972826634092|       1|
| 90| -3.316814267848999|       1|
|141| -8.172182199094078|       1|
|151| -3.780343893671911|       1|
|170| -7.473035855348188|       1|
|171| -2.189764725957711|       1|
|187|-1.6016189941861434|       1|
|196|-3.3656481264220783|       1|
|205| -3.489077452582989|       1|
|207| -7.437870560434169|       1|
|214|-3.5254711324692938|       1|
|222|-0.5240793176627054|       1|
|225|-1.7866853540597125|       1|
|239| -6.250230750907675|       1|
+---+-------------------+--------+
only showing top 20 rows

CPU times: user 19.3 ms, sys: 11.6 ms, total: 31 ms
Wall time: 2min 23s


In [18]:
xxx = pd.DataFrame({
    "id": [1,2,3,4,5],
    "x": [0.5, 0.4, 0.6, 0.2, 0.7],
    "group_id": [1,5,1,5,1]
})

def pd_make_new(pdf):
    rows = []
    for i in range(len(pdf)):
        row = pdf.loc[i].to_dict()
        if row["x"] * 10 > row["group_id"]:
            new_row = {
                "id": row["id"],
                "x": row["group_id"] - row["x"] * 10,
                "group_id": row["group_id"]
            }
            rows.append(new_row)
    return pd.DataFrame(rows)

pd_make_new(xxx)

Unnamed: 0,group_id,id,x
0,1.0,1.0,-4.0
1,1.0,3.0,-5.0
2,1.0,5.0,-6.0


In [17]:
row = xxx.loc[0]
row.to_dict()

{'id': 1.0, 'x': 0.5, 'group_id': 1.0}
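
The timings above suggest that the row-by-row loop, rather than data transfer, dominates `make_new`. Below is a hedged sketch of a vectorized equivalent; `make_new_vectorized` is a hypothetical name that does not appear in the notebook, and it has not been timed on the Spark data used above.

```python
import pandas as pd

def make_new_vectorized(pdf: pd.DataFrame) -> pd.DataFrame:
    # A boolean mask replaces the per-row `if`.
    keep = pdf["x"] * 10 > pdf["group_id"]
    out = pdf.loc[keep, ["id", "x", "group_id"]]
    # Column arithmetic replaces the per-row dict construction.
    return out.assign(x=out["group_id"] - out["x"] * 10).reset_index(drop=True)

xxx = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "x": [0.5, 0.4, 0.6, 0.2, 0.7],
    "group_id": [1, 5, 1, 5, 1],
})
result = make_new_vectorized(xxx)
print(result)
```

Unlike the loop, this version keeps `id` and `group_id` as integers and still returns the three expected columns when no row passes the filter.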