# 1. Funciones definidas por el usuario #

User defined functions: UDFs

### Spark SQL UDFs ###

In [0]:
from pyspark.sql.types import LongType

# Creamos la función cúbica
def cubed(s):
    return s * s * s
# Registramos la UDF
spark.udf.register("cubed",cubed,LongType())
#Generamos una vista temporal
spark.range(1,9).createOrReplaceTempView("udf_test")

spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



Remarcamos que Spark SQL no garantiza el order de evaluación de las subexpresiones. Así, para hacer bien el *null checking*, se recomienda:
1. Hacer la propia UDF *null-aware* y que haga el null-checking dentro de la UDF.
2. Usar IF o CASE WHEN para hacer el null check e invocar la UDF en una rama condicional

#### Acelerando y distribuyendo Pyspark UDFs con Pandas UDFs ####

Panda UDF: UDF vectorizado.

El siguiente ejemplo es de un Pand UDF escalar para Spark 3.0.:

In [0]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# Create the pandas UDF for the cubed function
cubed_udf=pandas_udf(cubed,returnType=LongType())

# Este código declara una función llamada cubed() que performa una operación *cubed*. 


Empecemos con una simple Panda Series y luego aplicar la función local cubed() para el cálculo cube.

In [0]:
# Create a Pandas Series
x=pd.Series([1,2,3])

# La función para una pandas_udf ejecutada con datos locales Pandas
print(cubed(x))

0     1
1     8
2    27
dtype: int64


Cambiemos a un Spark DataFrame. Podemos ejecutar esta función como una Spark UDF vectorizada como sigue:

In [0]:
from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .appName("AuthorsAges")
        .getOrCreate())

# Creamos un Spark DF

df=spark.range(1,4)

# Ejecutamos la función como una Spark UDF vectorizada

df.select("id",cubed_udf(col("id"))).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



Opuestamente a una función local, usar una UDF vectorizada resultará en la ejecución de Spark jobs; la función local previa es una función Pandas ejecutada sólo en el driver de Spark.

# 2. Funciones high-order en DataFrames y Spark SQL #

Hay dos soluciones típicas para manipular tipos de datos complejos:
- Explotar la estructura anidada en filas individuales, aplicando alguna función y luego recreando la estructura.
- Creando una función user-defined

### Opción 1: Explotar y recoger ###
En esta declaración SQL anidada, primero explotamos los values, creando así una nueva fila (con el id) para cada elemento (value) en values:

```
En SQL:
SELECT id, collect_list(value+1) AS values
FROM (SELECT id, EXPLODE(values) AS value
        FROM table) x
GROUP BY id
```

In [0]:
# Create an array dataset
arrayData = [[1, (1, 2, 3)], [2, (2, 3, 4)], [3, (3, 4, 5)]]

# Create schema
from pyspark.sql.types import *
arraySchema = (StructType([
      StructField("id", IntegerType(), True), 
      StructField("values", ArrayType(IntegerType()), True)
      ]))

# Create DataFrame
df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
df.createOrReplaceTempView("table")
df.printSchema()
df.show()

root
 |-- id: integer (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+---+---------+
| id|   values|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 3, 4]|
|  3|[3, 4, 5]|
+---+---------+



In [0]:
spark.sql("""
SELECT id, collect_list(value+1) AS values 
    FROM (SELECT id, EXPLODE(values) AS value
        FROM table) x 
    GROUP BY id""").show()

+---+---------+
| id|   values|
+---+---------+
|  1|[2, 3, 4]|
|  2|[3, 4, 5]|
|  3|[4, 5, 6]|
+---+---------+



### Opción 2: Funciones definidas por usuario ###

Para ejecutar la misma tarea, podemos crear una UDF que use map() para iterar a través de cada elemento (value) y ejecutar la operación suma.
Mientras que esto es mejor que usar explode() y collect_list(), los procesos de serialización y deserialización puede ser costoso. También recalcar que collect_list() puede causa que los ejecutores pasen por problemas out-of-memory para datasets grandes, mientras que usando UDFs podríamos aliviar esas cuestiones.

In [0]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType

# Create UDF
def addOne(values):
  return [value + 1 for value in values]

# Register UDF
spark.udf.register("plusOneInt", addOne, ArrayType(IntegerType())) 

Out[12]: <function __main__.addOne(values)>

In [0]:
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()

+---+---------+
| id|   values|
+---+---------+
|  1|[2, 3, 4]|
|  2|[3, 4, 5]|
|  3|[4, 5, 6]|
+---+---------+



### Funciones de orden alto ###

Hay funciones de orden alto que toma funciones lambda anónimas como argumentos. Un ejemplo de función de orden alto es la siguiente:
```
spark.sql("transform(values,value->lambda expression)")
```
donde transform() coge el array values y una función anónima (lambda expression) como entrada. La función crea un nuevo array aplicando la función anónima a cada elemento, y luego asignando el resultado al array de salida.

Creamos a continuación un dataset para ver algunos ejemplos.

In [0]:
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")

# Show the DataFrame
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



#### Transform() ####

Produce un array aplicando la función a cada elemento del array de entrada (similar a map())

In [0]:
# Calcular Fahrenheit a Celsius para un array de temperaturas

spark.sql("""
SELECT celsius, transform(celsius, t->((t*9) div 5) +32) as fahrenheit
    FROM tC
""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



#### Filter() ####

Produce un array consistente en sólo elementos del array de entrada para los cuales la función booleana es true:

In [0]:
# Filtramos las temperaturas mayores de 38ºC

spark.sql("""
SELECT celsius,
    filter(celsius,t->t>38) as high
    FROM tC
""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



#### Exists() ####

Devuelve true si la función booleana se mantiene para algún elemento del array de entrada:

In [0]:
# Hay alguna temperatura de 38ºC?

spark.sql("""
SELECT celsius,
       exists(celsius,t->t=38) as threshold
FROM tC
""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



#### Reduce() ####

Reduce los elementos del array a un valor único fusionando los elementos a un buffer B usando function<B,T,B> y aplicando una función finalizadora function<B,R> en el buffer final:

In [0]:
# Calcular la temperatura media y convertir a F

spark.sql("""
SELECT celsius,
       reduce(
         celsius, / 
         0,
         (t,acc)->t+acc,
         acc-> (acc div size(celsius)*9 div 5) +32
         ) as avgFahrenheit
FROM tC
""").show()

+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+



# 3. Operaciones comunes de DataFrames y SparkSQL #

Vamos a preparar datos. Así:
1. Importamos dos archivos y creamos dos DataFrames, uno para información de aeropuerto y otro para retrasos en los vuelos de US.
2. Usando expr(), convertimos las columnas delay y distance de STRING a INT.
3. Creamos una tabla más pequeña, foo, en la que podemos centrarnos en nuestros ejemplos demo; contiene la información sólo de 3 vuelos desde SEA a SFO.

In [0]:
from pyspark.sql.functions import expr
tripdelaysFilePath="/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airportsnaFilePath="/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

airportsna=(spark.read.format("csv").options(header="true",inferSchema="true",sep="\t").load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")

departureDelays=(spark.read.format("csv").options(header="true").load(tripdelaysFilePath))
departureDelays=(departureDelays.withColumn("delay", expr("CAST(delay as INT) as delay")).withColumn("distance",expr("CAST(distance as INT) as distance")))
departureDelays.createOrReplaceTempView("departureDelays")

foo=(departureDelays.filter(expr("""origin == 'SEA' and destination == 'SFO' and date like '01010%' and delay>0""")))
foo.createOrReplaceTempView("foo")

spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



### Uniones ###

Unamos dos tablas. El dataframe 'bar' es la unión de foo con delays.

In [0]:
bar=departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

bar.filter(expr("""origin=='SEA' AND destination=='SFO' AND date like '01010%' and delay>0""")).show()


+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



### Joins ###

Por defecto, Spark SQL hace el join como un inner join, con las opciones inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi y left_anti.
El siguiente ejemplo ejecuta una inner join entre los dataframes airportsna y foo

In [0]:
# Join foo con airport info.

foo.join(
    airportsna,
    airportsna.IATA == foo.origin
).select("City","State","date","delay","distance","destination").show()

# En SQL:
spark.sql("""
SELECT a.City,a.State,f.date,f.delay,f.distance,f.destination
FROM foo f
JOIN airports_na a 
    ON a.IATA=f.origin
""").show(10)

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



### Windowing ###

Una función ventana usa valores de las filas en una ventana para devolver un conjunto de valores, tipicamente en forma de otra fila. Hacen posible operar en un grupo de filas mientras que aún está devolviendo un único valor para cada fila de entrada.
Veremos en esta sección como usar dense_rank().

Empecemos con una review de TotalDelays (calculado por sum(Delay)) experimentado por vuelos de origen en SEA, SFO y JFK y yendo a un conjunto específico de destinos, como notamos en la siguiente consulta:

In [0]:
spark.sql("DROP TABLE IF EXISTS departureDelaysWindow")

spark.sql("""
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
    FROM departureDelays
    WHERE origin IN ('SEA','SFO','JFK')
        AND DESTINATION IN ('SEA','SFO','JFK','DEN','ORD','LAX','ATL')
    GROUP BY origin, destination;
""")

spark.sql("SELECT * FROM departureDelaysWindow").show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   JFK|        SFO|      35619|
|   JFK|        DEN|       4315|
|   JFK|        ATL|      12141|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SEA|        LAX|       9359|
|   SFO|        ORD|      27412|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



Si quisiéramos ver para cada uno de los aeropuertos de origen los 3 destinos que experimentan más retrasos:

In [0]:
spark.sql("""
SELECT origin,destination, TotalDelays, rank
    FROM (
        SELECT origin, destination,TotalDelays,dense_rank()
        OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
        FROM departureDelaysWindow
) t
WHERE rank <=3
""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+



### Modificaciones ###

Mientras que los DataFrames por sí mismos son inmutables, podemos modificarlos mediante operaciones que creen nuevos y diferentes DataFrames, con distintas columnas, por ejemplo.

#### Añadir nuevas columnas ####

Usamos el comando withColumn()

In [0]:
from pyspark.sql.functions import expr
foo2=(foo.withColumn(
          "status",
           expr("CASE WHEN delay<=10 THEN 'On-time' ELSE 'Delayed' END")
))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



#### Dropeando columnas ####

Usamos el método drop().

In [0]:
foo3=foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



#### Renombrando columnas ####

Usando rename()

In [0]:
foo4=foo3.withColumnRenamed("status","flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



#### Pivotando ####

Cuando trabajamos con datos, a veces necesitaremos intercambiar las columnas por las filas (i.e. pivotar los datos).

In [0]:
spark.sql("""
SELECT destination, CAST(SUBSTRING(date,0,2) AS int) AS month, delay
FROM departureDelays
WHERE origin='SEA'""").show()

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
|        ORD|    1|    0|
|        DFW|    1|   23|
|        DFW|    1|   36|
|        ORD|    1|  298|
|        JFK|    1|    4|
|        DFW|    1|    0|
|        MIA|    1|    2|
|        DFW|    1|    0|
|        DFW|    1|    0|
|        ORD|    1|   83|
+-----------+-----+-----+
only showing top 20 rows



Pivotar nos permite poner nombres en la columna month así como ejecutar cálculos de agregación (avg y max, por ejemplo).

In [0]:
spark.sql("""
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date,0,2) AS int) AS month, delay
    FROM departureDelays WHERE origin='SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4,2)) AS AvgDelay, MAX(delay) AS MaxDelay
FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination""").show()

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D