## Spark SQL and DataFrames: Interacting with External Data Sources

This notebook shows the code examples from Chapter 5.

#### Creating a function, registering the UDF, and creating a temporary view

In [0]:
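from pyspark.sql.types import LongType

# Plain Python function that cubes its input
def cubed(s):
    return s * s * s

# Register it as a UDF callable from Spark SQL, declaring a LongType result
spark.udf.register("cubed", cubed, LongType())

# Temporary view with ids 1..8 (the end of spark.range is exclusive)
spark.range(1, 9).createOrReplaceTempView("udf_test")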

In [0]:
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+
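Spark SQL does not guarantee the order in which subexpressions are evaluated, so a `WHERE id IS NOT NULL` clause does not reliably keep NULL input away from a UDF. The Spark documentation recommends making the UDF itself null-aware, or calling it inside an `IF`/`CASE WHEN` branch. A minimal sketch of a null-aware variant (the name `cubed_null_safe` is hypothetical, not part of the notebook):

```python
from typing import Optional


def cubed_null_safe(s: Optional[int]) -> Optional[int]:
    """Cube an integer, passing SQL NULL (Python None) through unchanged."""
    if s is None:
        return None
    return s * s * s


# Plain Python check before registering the function with Spark
print([cubed_null_safe(v) for v in [1, 2, None, 8]])  # [1, 8, None, 512]

# Registration would mirror the cell above (requires an active `spark` session):
# spark.udf.register("cubed_null_safe", cubed_null_safe, LongType())
```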



## Speeding up and distributing PySpark UDFs with Pandas UDFs

In [0]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

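# Vectorized function: receives and returns a pandas Series
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# Wrap it as a Pandas UDF whose result column has type LongType
cubed_udf = pandas_udf(cubed, returnType=LongType())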

#### Calling the function locally on a pandas Series

In [0]:
x = pd.Series([1, 2, 3])

print(cubed(x))

0     1
1     8
2    27
dtype: int64


#### Using the Pandas UDF on a Spark DataFrame

In [0]:
df = spark.range(1, 4)

df.select("id", cubed_udf(col("id"))).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+
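Spark invokes a Series-to-Series Pandas UDF once per Arrow batch, not once for the whole column (the batch size is governed by `spark.sql.execution.arrow.maxRecordsPerBatch`). The function must therefore compute each output row from its own input row only. A pandas-only sketch of that contract, assuming an arbitrary batch size of 2; the helpers `apply_in_batches` and `centered` are hypothetical:

```python
import pandas as pd


def cubed(a: pd.Series) -> pd.Series:
    return a * a * a


def centered(a: pd.Series) -> pd.Series:
    # NOT batch-safe: the result depends on which rows share the batch
    return a - a.mean()


def apply_in_batches(func, series: pd.Series, batch_size: int) -> pd.Series:
    """Simulate Spark splitting a column into batches and concatenating the results."""
    parts = [func(series.iloc[i:i + batch_size]) for i in range(0, len(series), batch_size)]
    return pd.concat(parts)


x = pd.Series([1, 2, 3])

# Element-wise: batching does not change the result
print(apply_in_batches(cubed, x, 2).equals(cubed(x)))        # True
# Depends on the whole batch: batching changes the result
print(apply_in_batches(centered, x, 2).equals(centered(x)))  # False
```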



## Higher-Order Functions in DataFrames and Spark SQL

In [0]:
from pyspark.sql.types import *

# Each row holds an id and an array of integers
arrayData = [[1, (1, 2, 3)], [2, (2, 3, 4)], [3, (3, 4, 5)]]

arraySchema = (StructType([
    StructField("id", IntegerType(), True),
    StructField("values", ArrayType(IntegerType()), True)
    ]))

df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
# Register the DataFrame as the temporary view `table` used by the SQL queries below
df.createOrReplaceTempView("table")
df.printSchema()
df.show()

root
 |-- id: integer (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+---+---------+
| id|   values|
+---+---------+
|  1|[1, 2, 3]|
|  2|[2, 3, 4]|
|  3|[3, 4, 5]|
+---+---------+



#### Option 1: Explode and Collect

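In this nested SQL statement, the inner query first calls `explode(values)`, which creates a new row (carrying the `id`) for each element (`value`) in `values`. The outer query then groups by `id` and uses `collect_list()` to gather the incremented values back into an array.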

In [0]:
spark.sql("""SELECT id, collect_list(value + 1) AS newValues FROM (
             SELECT id, explode(values) AS value FROM table
             ) x
             GROUP BY id""").show()

+---+---------+
| id|newValues|
+---+---------+
|  1|[2, 3, 4]|
|  2|[3, 4, 5]|
|  3|[4, 5, 6]|
+---+---------+
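The two steps of the nested query can be modeled in plain Python: explode each `(id, values)` row into one `(id, value)` row per element, then group by `id` and collect `value + 1`. In Spark the regrouping requires a shuffle, and `collect_list` does not guarantee element order; the sketch below keeps the input order only because it runs in a single process.

```python
from collections import defaultdict

array_data = [(1, [1, 2, 3]), (2, [2, 3, 4]), (3, [3, 4, 5])]

# Step 1: explode -> one (id, value) row per array element
exploded = [(row_id, value) for row_id, values in array_data for value in values]

# Step 2: group by id and collect value + 1 (like collect_list)
collected = defaultdict(list)
for row_id, value in exploded:
    collected[row_id].append(value + 1)

new_values = sorted(collected.items())
print(new_values)  # [(1, [2, 3, 4]), (2, [3, 4, 5]), (3, [4, 5, 6])]
```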



#### Option 2: User-Defined Function

To perform the same task (adding 1 to each element of `values`), we can also create a user-defined function (UDF) that iterates over each element of `values` (here with a list comprehension) and performs the addition.

In [0]:
from pyspark.sql.types import ArrayType, IntegerType

# Add 1 to every element of the input array
def addOne(values):
    return [value + 1 for value in values]

# Register the function as a SQL UDF that returns array<int>
spark.udf.register("plusOneIntPy", addOne, ArrayType(IntegerType()))

spark.sql("SELECT id, plusOneIntPy(values) AS values FROM table").show()

+---+---------+
| id|   values|
+---+---------+
|  1|[2, 3, 4]|
|  2|[3, 4, 5]|
|  3|[4, 5, 6]|
+---+---------+
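The same per-element logic can also be written with Python's built-in `map`. Because the schema above declares `containsNull = true`, this sketch (the name `add_one_null_safe` is hypothetical) also passes through a NULL array and NULL elements, matching SQL semantics where `NULL + 1` is NULL, instead of raising `TypeError`:

```python
from typing import List, Optional


def add_one_null_safe(values: Optional[List[Optional[int]]]) -> Optional[List[Optional[int]]]:
    """Add 1 to every element, keeping NULL arrays and NULL elements as None."""
    if values is None:
        return None
    return list(map(lambda v: None if v is None else v + 1, values))


print(add_one_null_safe([1, 2, 3]))     # [2, 3, 4]
print(add_one_null_safe([2, None, 4]))  # [3, None, 5]
print(add_one_null_safe(None))          # None

# Hypothetical registration under a new name (requires an active `spark` session):
# spark.udf.register("plusOneNullSafe", add_one_null_safe, ArrayType(IntegerType()))
```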



## Higher-Order Functions

In addition to the built-in functions mentioned earlier, there are higher-order functions that take anonymous lambda functions as arguments.

In [0]:
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")

t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



#### Transform

`transform(array<T>, function<T, U>): array<U>`

The `transform` function produces an array by applying a function to each element of the input array (similar to a `map` function).

In [0]:
spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit FROM tC""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
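A plain-Python model of what `transform` computes for each row is a quick way to check the lambda's arithmetic, including the values hidden behind `...` in the output above. For the non-negative integers in this table, Python's `//` gives the same result as Spark SQL's integer-division operator `div`.

```python
t_list = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]


def to_fahrenheit(t: int) -> int:
    # Mirrors the SQL lambda: t -> ((t * 9) div 5) + 32
    return (t * 9) // 5 + 32


# transform: apply the function to every element, keeping the array shape
fahrenheit = [[to_fahrenheit(t) for t in celsius] for celsius in t_list]
print(fahrenheit)
# [[95, 96, 89, 86, 104, 107, 100], [87, 89, 93, 131, 132]]
```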



#### Filter

`filter(array<T>, function<T, Boolean>): array<T>`

The `filter` function produces an array containing only the elements of the input array for which the Boolean function is true.

In [0]:
spark.sql("""SELECT celsius, filter(celsius, t -> t > 38) as high FROM tC""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+
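The same predicate can be checked in plain Python with the built-in `filter`, which keeps the elements for which the function returns true, just as the SQL version does per row:

```python
t_list = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]

# Mirrors filter(celsius, t -> t > 38): keep only elements where the predicate holds
high = [list(filter(lambda t: t > 38, celsius)) for celsius in t_list]
print(high)  # [[40, 42], [55, 56]]
```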



#### Exists

`exists(array<T>, function<T, Boolean>): Boolean`

The `exists` function returns true if the Boolean function holds for at least one element of the input array.

In [0]:
spark.sql("""SELECT celsius, exists(celsius, t -> t = 38) as threshold FROM tC""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+
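In plain Python, `exists` corresponds to `any()` over the array. This sketch assumes arrays without NULL elements; it does not model how Spark treats NULLs inside `exists`.

```python
t_list = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]

# Mirrors exists(celsius, t -> t = 38): true if any element satisfies the predicate
threshold = [any(t == 38 for t in celsius) for celsius in t_list]
print(threshold)  # [True, False]
```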



#### Reduce

`reduce(array<T>, B, function<B, T, B>, function<B, R>)`

The `reduce` function reduces the elements of the array to a single value by merging them into a buffer `B` with `function<B, T, B>`, then applying a finishing function `function<B, R>` to the final buffer.

In [0]:
spark.sql("""SELECT celsius, reduce(
          celsius,
          0,
          (acc, t) -> acc + t,
          acc -> (acc div size(celsius) * 9 div 5) + 32) as avgFahrenheit FROM tC""").show()

+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+
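A plain-Python model of the same fold, using `functools.reduce` for the merge step and a separate finishing step, reproduces `avgFahrenheit`. It also shows that the integer `div` steps truncate twice: for the first row the exact average is about 97.1 °F, but the integer result is 96.

```python
from functools import reduce

t_list = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]


def avg_fahrenheit(celsius):
    # merge function<B, T, B>: fold the elements into an integer buffer starting at 0
    total = reduce(lambda acc, t: acc + t, celsius, 0)
    # finish function<B, R>: integer average, then Celsius -> Fahrenheit
    return (total // len(celsius)) * 9 // 5 + 32


print([avg_fahrenheit(c) for c in t_list])  # [96, 105]
```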



## Common Relational Operations in DataFrames and Spark SQL

In this section, we focus on the following operations:
* Unions and joins
* Windowing
* Modifications

In [0]:
from pyspark.sql.functions import expr

delays_path = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airports_path = "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

# Airport codes: tab-separated file with a header row
airports = spark.read.options(header="true", inferSchema="true", sep="\t").csv(airports_path)
airports.createOrReplaceTempView("airports_na")

# Departure delays: read as strings, then cast delay and distance to INT
delays = spark.read.options(header="true").csv(delays_path)
delays = (delays
          .withColumn("delay", expr("CAST(delay as INT) as delay"))
          .withColumn("distance", expr("CAST(distance as INT) as distance")))
delays.createOrReplaceTempView("departureDelays")

# Small sample: delayed SEA -> SFO flights whose date starts with '01010'
foo = delays.filter(expr("""
            origin == 'SEA' AND
            destination == 'SFO' AND
            date like '01010%' AND
            delay > 0"""))
foo.createOrReplaceTempView("foo")

In [0]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [0]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [0]:
spark.sql("SELECT * FROM foo LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



## Unions

In [0]:
bar = delays.union(foo)
bar.createOrReplaceTempView("bar")
bar.filter(expr("origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [0]:
spark.sql("""SELECT * FROM bar
             WHERE origin = 'SEA'
             AND destination = 'SFO'
             AND date LIKE '01010%'
             AND delay > 0""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+
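`DataFrame.union` behaves like SQL `UNION ALL`: it keeps duplicates and matches columns by position, not by name (`unionByName` matches by name). That is why each of the three SEA -> SFO rows appears twice above: once from `delays` and once from `foo`. A plain-Python sketch with rows as tuples, restricted to the rows that pass the filter:

```python
foo_rows = [
    ("01010710", 31, 590, "SEA", "SFO"),
    ("01010955", 104, 590, "SEA", "SFO"),
    ("01010730", 5, 590, "SEA", "SFO"),
]

# UNION ALL (DataFrame.union): concatenate, keeping duplicates
union_all = foo_rows + foo_rows
# SQL UNION (union followed by distinct()): drop duplicate rows, keeping first occurrences
union_distinct = list(dict.fromkeys(union_all))

print(len(union_all), len(union_distinct))  # 6 3
```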



## Joins

In [0]:
foo.join(airports, airports.IATA == foo.origin).select("City", "State", "date", "delay", "distance", "destination").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [0]:
spark.sql("""SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination FROM foo f
             JOIN airports_na a
             ON a.IATA = f.origin""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+
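The logical result of an inner equi-join such as `airports.IATA == foo.origin` can be modeled as a hash join: index one side by its join key, then probe with each row of the other side. Spark chooses the physical strategy itself (for example, a broadcast hash join when one side is below `spark.sql.autoBroadcastJoinThreshold`); this plain-Python sketch only illustrates the result, using a few airport rows and the `foo` rows from the outputs above.

```python
airports = [
    ("Abbotsford", "BC", "Canada", "YXX"),
    ("Seattle", "WA", "USA", "SEA"),
    ("Albany", "NY", "USA", "ALB"),
]
foo_rows = [
    ("01010710", 31, 590, "SEA", "SFO"),
    ("01010955", 104, 590, "SEA", "SFO"),
    ("01010730", 5, 590, "SEA", "SFO"),
]

# Build phase: index airports by IATA code (the join key)
by_iata = {}
for city, state, country, iata in airports:
    by_iata.setdefault(iata, []).append((city, state))

# Probe phase: emit one output row per (flight, matching airport) pair
joined = [
    (city, state, date, delay, distance, destination)
    for date, delay, distance, origin, destination in foo_rows
    for city, state in by_iata.get(origin, [])
]
print(joined[0])  # ('Seattle', 'WA', '01010710', 31, 590, 'SFO')
```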



## Window Functions

In [0]:
spark.sql("DROP TABLE IF EXISTS departureDelaysWindow")
# Total delay per (origin, destination) pair for a handful of major airports
spark.sql("""CREATE TABLE departureDelaysWindow AS
             SELECT origin, destination, sum(delay) AS TotalDelays FROM departureDelays
             WHERE origin IN ('SEA', 'SFO', 'JFK')
             AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
             GROUP BY origin, destination""")

spark.sql("""SELECT * FROM departureDelaysWindow""").show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   JFK|        SFO|      35619|
|   JFK|        DEN|       4315|
|   JFK|        ATL|      12141|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SEA|        LAX|       9359|
|   SFO|        ORD|      27412|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



What are the three destinations with the highest total delays for each origin airport (SEA, SFO, and JFK)?

In [0]:
spark.sql("""SELECT origin, destination, TotalDelays, rank FROM (
             SELECT origin, destination, TotalDelays, dense_rank() OVER(
             PARTITION BY origin ORDER BY TotalDelays DESC) as rank FROM departureDelaysWindow
             ) t WHERE rank <= 3""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+
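`dense_rank()` numbers the rows within each `origin` partition by descending `TotalDelays`, giving tied rows the same rank without leaving gaps (unlike `rank()`, which skips ranks after a tie). A plain-Python sketch over the aggregated rows shown earlier reproduces the top-3 result:

```python
from itertools import groupby

rows = [
    ("JFK", "ORD", 5608), ("JFK", "SFO", 35619), ("JFK", "DEN", 4315),
    ("JFK", "ATL", 12141), ("JFK", "SEA", 7856), ("JFK", "LAX", 35755),
    ("SEA", "LAX", 9359), ("SFO", "ORD", 27412), ("SFO", "DEN", 18688),
    ("SFO", "SEA", 17080), ("SEA", "SFO", 22293), ("SFO", "ATL", 5091),
    ("SEA", "DEN", 13645), ("SEA", "ATL", 4535), ("SEA", "ORD", 10041),
    ("SFO", "JFK", 24100), ("SFO", "LAX", 40798), ("SEA", "JFK", 4667),
]


def top_n_dense_rank(rows, n):
    result = []
    # PARTITION BY origin ORDER BY TotalDelays DESC
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    for origin, group in groupby(ordered, key=lambda r: r[0]):
        rank, previous = 0, None
        for _, destination, total in group:
            if total != previous:  # dense_rank: advance by 1 only when the value changes
                rank += 1
                previous = total
            if rank <= n:  # WHERE rank <= n
                result.append((origin, destination, total, rank))
    return result


for row in top_n_dense_rank(rows, 3):
    print(row)
```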



## Modifications

#### Adding new columns

In [0]:
foo2 = foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [0]:
spark.sql("""SELECT *, CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END AS status FROM foo""").show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



#### Dropping columns

In [0]:
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



#### Renaming columns

In [0]:
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



#### Pivoting

In [0]:
spark.sql("""SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay FROM departureDelays WHERE origin = 'SEA'""").show(10)

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
+-----------+-----+-----+
only showing top 10 rows
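The query takes the first two characters of `date` as the month. Assuming the `date` strings use an `MMddHHmm` layout (which values such as `01011245` suggest, but which the notebook does not state), a hypothetical helper could split them as below. Spark's `SUBSTRING` positions are 1-based, while Python slices are 0-based. Under the same assumption, the earlier filter `date LIKE '01010%'` selects flights on January 1 departing before 10:00.

```python
from typing import NamedTuple


class FlightTime(NamedTuple):
    month: int
    day: int
    hour: int
    minute: int


def parse_mmddhhmm(date: str) -> FlightTime:
    # Assumed layout MM dd HH mm; the slice [0:2] corresponds to SQL SUBSTRING(date, 1, 2)
    return FlightTime(int(date[0:2]), int(date[2:4]), int(date[4:6]), int(date[6:8]))


print(parse_mmddhhmm("01011245"))
# FlightTime(month=1, day=1, hour=12, minute=45)
```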



In [0]:
spark.sql("""SELECT * FROM (
             SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay FROM departureDelays
             WHERE origin = 'SEA'
             ) PIVOT (
             CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
             FOR month IN (1 JAN, 2 FEB, 3 MAR)
             ) ORDER BY destination""").show()

+-----------+------------+------------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|MAR_AvgDelay|MAR_MaxDelay|
+-----------+------------+------------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|       11.47|          74|
|        ANC|        4.44|         149|        7.90|         141|        5.10|         187|
|        ATL|       11.98|         397|        7.73|         145|        6.53|         109|
|        AUS|        3.48|          50|       -0.21|          18|        4.03|          61|
|        BOS|        7.84|         110|       14.58|         152|        7.78|         119|
|        BUR|       -2.03|          56|       -1.89|          78|        2.01|         108|
|        CLE|       16.00|          27|        null|        null|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|        5.16|  
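`PIVOT` turns each listed `month` value into its own set of aggregate columns, and a destination with no flights in a given month gets NULL there (as CLE does for February). A plain-Python sketch of that reshaping over only the ten sample rows from the preview query, so its numbers are not the full-table values; rounding uses Python's `round`, which can differ from Spark's decimal cast in half-way cases:

```python
from collections import defaultdict

# (destination, month, delay): the ten sample rows from the preview query above
sample = [
    ("ORD", 1, 92), ("JFK", 1, -7), ("DFW", 1, -5), ("MIA", 1, -3), ("DFW", 1, -3),
    ("DFW", 1, 1), ("ORD", 1, -10), ("DFW", 1, -6), ("DFW", 1, -2), ("ORD", 1, -3),
]
months = {1: "JAN", 2: "FEB"}


def pivot(rows, months):
    # Group delays by (destination, month)
    groups = defaultdict(list)
    for destination, month, delay in rows:
        groups[(destination, month)].append(delay)

    table = {}
    for destination in sorted({r[0] for r in rows}):  # ORDER BY destination
        row = {}
        for month, label in months.items():
            delays = groups.get((destination, month))
            # Missing (destination, month) combinations become None, like SQL NULL
            row[f"{label}_AvgDelay"] = round(sum(delays) / len(delays), 2) if delays else None
            row[f"{label}_MaxDelay"] = max(delays) if delays else None
        table[destination] = row
    return table


for destination, row in pivot(sample, months).items():
    print(destination, row)
```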

In [0]:
spark.sql("""SELECT * FROM (
             SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay FROM departureDelays
             WHERE origin = 'SEA'
             ) PIVOT (
             CAST(AVG(delay) AS DECIMAL(4, 2)) as AvgDelay, MAX(delay) as MaxDelay
             FOR month IN (1 JAN, 2 FEB)
             )
             ORDER BY destination""").show()

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D