<a href="https://colab.research.google.com/github/pstorniolo/Master2021/blob/main/2021_10_23_PyArrow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Apache Arrow in PySpark

Apache Arrow è un formato di dati a colonne in memoria utilizzato in Spark per trasferire in modo efficiente i dati tra i processi *JVM* e *Python*. Questo è attualmente molto vantaggioso per gli utenti Python che lavorano con dati *Pandas/NumPy*. Il suo utilizzo non è automatico e potrebbe richiedere alcune modifiche minori alla configurazione o al codice per trarre il massimo vantaggio e garantire la compatibilità.

In [None]:
# Install Spark 3.2.0
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!rm -f *.tgz*

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

!pip install -q findspark
!pip install -q pyspark==3.2.0

[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[K     |████████████████████████████████| 198 kB 14.5 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

from datetime import datetime, date
import numpy as np
import pandas as pd
import pyarrow

spark=SparkSession.builder.appName("local[*]").getOrCreate()

##Enabling for Conversion to/from Pandas

Arrow è disponibile come ottimizzazione durante la conversione di Spark DataFrame in Pandas DataFrame usando la chiamata **DataFrame.toPandas()** e durante la creazione di Spark DataFrame da Pandas DataFrame con **SparkSession.createDataFrame()**. Per usare Arrow durante l'esecuzione di queste chiamate, si deve prima impostare la configurazione Spark *spark.sql.execution.arrow.pyspark.enabled* su *true*. Questa impostazione è disabilitata per default.

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")

'true'

L'utilizzo delle ottimizzazioni con Arrow produrrà gli stessi risultati di quando Arrow non è abilitato se non abilitato nell'ambiente Python. La conversione sarà meno efficiente.

In [None]:
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

%time
result_pdf

CPU times: user 4 µs, sys: 1 µs, total: 5 µs
Wall time: 8.58 µs


Unnamed: 0,0,1,2
0,0.292554,0.895397,0.918625
1,0.071531,0.947871,0.334770
2,0.119521,0.593665,0.055347
3,0.775464,0.308434,0.807798
4,0.787438,0.206828,0.729282
...,...,...,...
995,0.560107,0.684574,0.118798
996,0.702867,0.945057,0.778028
997,0.597309,0.823100,0.379332
998,0.380332,0.171202,0.754277


##Pandas UDFs (a.k.a. Vectorized UDFs)

Le UDF di Panda sono funzioni definite dall'utente che vengono eseguite da Spark utilizzando Arrow per trasferire dati e Panda per lavorare con i dati, il che consente operazioni vettorializzate. Una Pandas UDF viene definita utilizzando **pandas_udf()** come wrapper per la funzione e non è richiesta alcuna configurazione aggiuntiva. Una Pandas UDF si comporta come una normale API di PySpark.

In [None]:
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()


df.select(func("long_col", "string_col", "struct_col")).printSchema()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)



###Series to Series

Usando **pandas_udf()** si crea una UDF Pandas in cui la funzione data prende uno o più *pandas.Series* e restituisce un *pandas.Series*. L'output della funzione deve essere sempre della stessa lunghezza dell'input. Internamente, PySpark eseguirà una UDF Pandas suddividendo le colonne in batch e chiamando la funzione per ogni batch come sottoinsieme dei dati, quindi concatenando i risultati insieme.

In [None]:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()


0    1
1    4
2    9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



##Iterator of Series to Iterator of Series

In [None]:
from typing import Iterator

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+



##Iterator of Multiple Series to Iterator of Series

In [None]:
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
df.show()

# Declare the function and create the UDF
@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+

+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      4|
|                      9|
+-----------------------+



##Series to Scalar

In [None]:
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()

df.groupby("id").agg(mean_udf(df['v'])).show()

w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



##Grouped Map

In [None]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



##Map

Le operazioni sono supportate da **DataFrame.mapInPandas()** che mappa un iteratore di *pandas.DataFrames* in un altro iteratore di *pandas.DataFrames* che rappresenta l'attuale PySpark DataFrame e restituisce il risultato come PySpark DataFrame.

In [None]:
df = spark.createDataFrame([(1, 21), (2, 30), (3, 55)], ("id", "age"))
df.show()

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()

+---+---+
| id|age|
+---+---+
|  1| 21|
|  2| 30|
|  3| 55|
+---+---+

+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+



##Co-grouped Map

In [None]:
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df1.show()

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))
df2.show()

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



##Usage Notes

###Setting Arrow **self_destruct** for memory savings

Dalla versione Spark 3.2, la configurazione *spark.sql.execution.arrow.pyspark.selfDestruct.enabled* può essere utilizzata per abilitare la funzione *self_destruct* di PyArrow, che può risparmiare memoria durante la creazione di un DataFrame Panda tramite toPandas liberando la memoria allocata da Arrow durante la creazione del DataFrame stesso.

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled","true")
spark.conf.get("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")

'true'