## Pandas UDF

## Apache Arrow in Pyspark
Spark可以使用`Apache Arrow`对python和jvm之间的数据进行传输， 这样会比默认传输方式更加高效。
为了能高效地利用特性和保障兼容性，使用的时候可能需要一点点修改或者配置。

## 为什么使用Arrow作为数据交换中介能够提升性能？
普通的python udf需要经过如下步骤来和jvm交互：
- jvm中一条数据序列化
- 序列化的数据发送到python进程
- 记录被python反序列化
- 记录被python处理
- 结果被python序列化
- 结果被发送到jvm
- jvm反序列化并存储结果到dataframe

所以python udf会比java和scala原生的udf慢。
但是使用pandas udf可以克服数据传输中需要的序列化问题，关键是使用了Arrow. spark使用arrow把JVM中的Dataframe转为可共享的buffer, 然后python也可以把这块共享buffer作为pandas的dataframe, 所以python可以直接在共享内存上操作。
以上，我们总结一下，使用arrow主要有两个好处：
1. 因为直接使用了共享内存，不在需要python和jvm序列化和反序列化数据。
2. pandas有很多使用c实现的方法， 可以直接使用。

## Spark DataFrame和Pandas DataFrame的转化
首先需要配置spark, 设置`spark.sql.execution.arrow.pyspark.enabled`, 默认这个选项是不打开的。
还可以开启`spark.sql.execution.arrow.pyspark.fallback.enabled`来避免如果没有安装`Arrow`或者其它相关错误。
Spark可以使用`toPandas()`方法转化为Pandas DataFrame(这个方法会单机生成，有性能问题）; 而使用`createDataFrame(pandas_df)`把Pandas DataFrame转为Spark DataFrame.

In [1]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("app1") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

In [3]:
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(20, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
result_pdf

Unnamed: 0,0,1,2
0,0.91601,0.922255,0.59619
1,0.128835,0.186798,0.236069
2,0.560804,0.828434,0.056276
3,0.245575,0.437656,0.358806
4,0.643864,0.652498,0.775339
5,0.025681,0.746733,0.531121
6,0.068925,0.182059,0.719574
7,0.329564,0.891549,0.175906
8,0.97099,0.360034,0.852293
9,0.138032,0.807608,0.84957


## Pandas UDF
`Pandas UDF`是用户定义的函数， Spark是用arrow传输数据并用pandas来运行`pandas UDF`， `pandas UDF`使用向量计算，相比于旧版本的`row-at-a-time`python udf, 最多增加100倍的性能. 使用`pandas_udf`修饰器装饰函数，就可以定义一个`pandas UDF`.对spark来说，UDF就是一个普通的pyspark函数。
从spark3.0开始， 推荐使用python类型(`type hint`)来定义pandas udf.
定义类型的时候，`StructType`需要使用`pandas.DataFrame`类型， 其他一律使用`pandas.Series`类型。

In [4]:
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3:pd.DataFrame) -> pd.DataFrame:
    s3['col1'] = 'constant string'
    s3['col2'] = s1 + s2.str.len()
    return s3

df = spark.createDataFrame(
    [[1, 'a string', ('a nested string',)]],
    "long_col long, string_col string, struct_col struct<col1: string>"
)
df.printSchema()
df.select(func('long_col', 'string_col', 'struct_col')).printSchema()
df.select(func('long_col', 'string_col', 'struct_col')).show()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)

+--------------------------------------+
|func(long_col, string_col, struct_col)|
+--------------------------------------+
|                  [constant string, 9]|
+--------------------------------------+



### 定义Series to Series UDF
当类型提示可以被表达为`pandas.Series -> pandas.Series`时，称为`Series to Series`UDF
这种类型的`pandas UDF`的输入和输出必须要有相同的长度， PySpark会把数据按列分成多个batch, 然后对每个batch运行`pandas UDF`, 然后组合各自的结果。

In [5]:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
def multiply_func(a: pd.Series, b:pd.Series) -> pd.Series:
    return a*b

multiply = pandas_udf(multiply_func, returnType=LongType())
x = pd.Series([1,3,4,5])
df = spark.createDataFrame(pd.DataFrame(x, columns=['x']))
df.select(multiply(col('x'), col('x'))).show()

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  9|
|                 16|
|                 25|
+-------------------+



### 定义Series迭代器 -> Series迭代器 UDF
当类型提示可以被表达为`Iterator[pandas.Series] -> Iterator[pandas.Series]`时，称为`Iterator[Series] to Iterator[Series]`UDF. 这种类型在原有一个python函数接受迭代器传参时可用。

In [8]:
from typing import Iterator
pdf = pd.DataFrame([1,3,4], columns=['x'])
df = spark.createDataFrame(pdf)
@pandas_udf('long')
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x+1

df.select(plus_one('x')).show()

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          4|
|          5|
+-----------+



### 定义多个Series迭代器 -> Series迭代器的UDF
当类型提示可以被表达为`Iterator[Tuple[pandas.Series,...]] -> Iterator[pandas.Series]`时，称为`Iterator[Tuple[pandas.Series,...]] to Iterator[Series]`UDF.

In [11]:
from typing import Tuple
@pandas_udf('long')
def multiply_two_cols(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a,b in iterator:
        yield a*b

df.select(multiply_two_cols('x','x')).show()

+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      9|
|                     16|
+-----------------------+



### 定义Series -> Scalar类型的UDF
当类型提示可以被表达为`pandas.Series -> Scalar`时，称为`Series to Scalar`UDF.
Scalar具体的类型必须是原生python类型如int, float等等， 或者是numpy的数据类型如numpy.int64, numpy.float64
这种UDF可以被用于`groupBy(), agg(), pyspark.sql.Window`.

In [18]:
from pyspark.sql import Window
df = spark.createDataFrame([(1,1.0), (1,2.0), (2,3.0),(2,4.0), (2,10.0)], ('id', 'v'))
@pandas_udf('float')
def mean_udf(v: pd.Series) -> float:
    return v.mean()
df.select(mean_udf('v')).show()
df.groupby('id').agg(mean_udf('v')).show()

# 在window的范围内做平均
w = Window.partitionBy('id').rowsBetween(Window.currentRow, 1)
df.withColumn('mean_v', mean_udf('v').over(w)).show()

+-----------+
|mean_udf(v)|
+-----------+
|        4.0|
+-----------+

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|  5.6666665|
+---+-----------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   2.0|
|  2| 3.0|   3.5|
|  2| 4.0|   7.0|
|  2|10.0|  10.0|
+---+----+------+

