**UDFs** (User-Defined Functions): in addition to Spark's built-in functions, users can define their own functions and, once registered, apply them to a dataset. A registered UDF exists only while the session is active; when the session is closed it is removed, just like the session's variables.

##### Scala

In [1]:
//UDF example
val cubed = (s: Long) => {
 s * s * s
}

//Register the function
spark.udf.register("cubed", cubed)

//Create a range of values to apply the function to
spark.range(1, 9).createOrReplaceTempView("udf_test")

Intitializing Scala interpreter ...

Spark Web UI available at http://EM2021002836.bosonit.local:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1623753172799)
SparkSession available as 'spark'


cubed: Long => Long = $Lambda$1985/0x0000000801426428@444394e6


In [2]:
//Apply the function
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



##### Python

In [2]:
from pyspark.sql.types import LongType

In [3]:
##Define the function
def cubed(s):
    return s * s * s

##Register the function so that it can be used in SQL
spark.udf.register("cubed", cubed, LongType())

##Create a range of values
spark.range(1, 9).createOrReplaceTempView("udf_test")

<function __main__.cubed(s)>

In [6]:
##Apply the function
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



In [11]:
pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-4.0.1-cp38-cp38-win_amd64.whl (13.3 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-4.0.1
Note: you may need to restart the kernel to use updated packages.


In [13]:
##Example using a Pandas UDF
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

##Declare the function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

##Create the Pandas UDF from the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())

In [15]:
#Define a pandas Series and apply the function to it locally
x = pd.Series([1, 2, 3])
print(cubed(x))

0     1
1     8
2    27
dtype: int64


In [16]:
#Now do the same for a Spark DataFrame
df = spark.range(1, 4)
df.select("id", cubed_udf(col("id"))).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



### Using the Spark SQL Shell    
      
``cd C:\Spark\bin``  
Start the Spark SQL shell:  
``spark-sql``  
Create a table:  
``CREATE TABLE people (name STRING, age int);``  
Before inserting values, exit the spark-sql shell and run the following command so that the warehouse directory is writable:  
``winutils chmod -R 777 spark-warehouse``  
Once this is done, return to spark-sql.  
Insert the data into the table:  
``INSERT INTO people VALUES ("Michael", NULL);``  
``INSERT INTO people VALUES ("Andy", 30);``  
``INSERT INTO people VALUES ("Samantha", 19);``  
List the saved tables:  
``SHOW TABLES;``  
Run queries:  
``SELECT * FROM people WHERE age < 20;``  
``SELECT name FROM people WHERE age IS NULL;``  
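
The two queries above treat Michael's row differently because a comparison with NULL is never true. A minimal sketch of that behavior follows. It uses Python's built-in `sqlite3` module purely as a stand-in for the `spark-sql` shell (an assumption made so the example runs without Spark; both engines follow the standard SQL rule that `NULL < 20` does not evaluate to true).

```python
import sqlite3

# In-memory database standing in for the Spark warehouse (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?)",
    [("Michael", None), ("Andy", 30), ("Samantha", 19)],
)

# age < 20 is not true for a NULL age, so Michael is not returned.
under_20 = [row[0] for row in conn.execute("SELECT name FROM people WHERE age < 20")]

# NULL values have to be matched explicitly with IS NULL.
unknown_age = [row[0] for row in conn.execute("SELECT name FROM people WHERE age IS NULL")]

print(under_20)
print(unknown_age)
```

Rows with a missing age therefore only show up when the query asks for them with `IS NULL`.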

### External Data Sources    
**PostgreSQL**       
*Scala*

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("ClientPL")
.master("local")
.enableHiveSupport()
.getOrCreate()


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@b041b74


In [8]:
// Read Option 1: Loading data from a JDBC source using load method
val jdbcDF1 = spark
 .read
 .format("jdbc")
 .option("url", "jdbc:postgresql://localhost:5432/consumer complaints")
 .option("dbtable", "public.consumer_complaints")
 .option("user", "postgres")
 .option("password", "Ttzz2r2a")
 .option("driver", "org.postgresql.Driver")
 .load()

jdbcDF1: org.apache.spark.sql.DataFrame = [date_received: string, product_name: string ... 16 more fields]


In [None]:
jdbcDF1.schema

In [12]:
jdbcDF1.columns

res7: Array[String] = Array(date_received, product_name, sub_product, issue, sub_issue, consumer_complaint_narrative, company_public_response, company, state_name, zip_code, tags, consumer_consent_provided, submitted_via, date_sent, company_response_to_consumer, timely_response, consumer_disputed, complaint_id)


In [14]:
jdbcDF1.select("date_received", "product_name", "sub_product", "issue").show(5)

+-------------+--------------------+--------------------+--------------------+
|date_received|        product_name|         sub_product|               issue|
+-------------+--------------------+--------------------+--------------------+
|   2013-07-29|       Consumer Loan|        Vehicle loan|Managing the loan...|
|   2013-07-29|Bank account or s...|    Checking account|Using a debit or ...|
|   2013-07-29|Bank account or s...|    Checking account|Account opening, ...|
|   2013-07-29|Bank account or s...|    Checking account|Deposits and with...|
|   2013-07-29|            Mortgage|Conventional fixe...|Loan servicing, p...|
+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [3]:
// Read Option 2: Loading data from a JDBC source using jdbc method
// Create connection properties
import java.util.Properties
val cxnProp = new Properties()
cxnProp.put("user", "postgres")
cxnProp.put("password", "Ttzz2r2a")
// Load data using the connection properties
val jdbcDF2 = spark
 .read
 .jdbc("jdbc:postgresql://localhost:5432/consumer complaints", "public.consumer_complaints", cxnProp)

import java.util.Properties
cxnProp: java.util.Properties = {password=Ttzz2r2a, user=postgres}
jdbcDF2: org.apache.spark.sql.DataFrame = [date_received: string, product_name: string ... 16 more fields]


In [15]:
jdbcDF2.select("date_received", "product_name", "sub_product", "issue").show(5)

+-------------+--------------------+--------------------+--------------------+
|date_received|        product_name|         sub_product|               issue|
+-------------+--------------------+--------------------+--------------------+
|   2013-07-29|       Consumer Loan|        Vehicle loan|Managing the loan...|
|   2013-07-29|Bank account or s...|    Checking account|Using a debit or ...|
|   2013-07-29|Bank account or s...|    Checking account|Account opening, ...|
|   2013-07-29|Bank account or s...|    Checking account|Deposits and with...|
|   2013-07-29|            Mortgage|Conventional fixe...|Loan servicing, p...|
+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [5]:
// Write Option 1: Saving data to a JDBC source using save method
jdbcDF1
 .write
 .format("jdbc")
 .option("url", "jdbc:postgresql://localhost:5432/consumer complaints")
 .option("dbtable", "public.consumer_complaints2")
 .option("user", "postgres")
 .option("password", "Ttzz2r2a")
 .save()

In [6]:
// Write Option 2: Saving data to a JDBC source using jdbc method
jdbcDF2.write
 .jdbc("jdbc:postgresql://localhost:5432/consumer complaints", "public.consumer_complaints3", cxnProp)

*Python*

In [1]:
# Read Option 1: Loading data from a JDBC source using load method
jdbcDF1 = (spark\
 .read\
 .format("jdbc")\
 .option("url", "jdbc:postgresql://localhost:5432/consumer complaints")\
 .option("dbtable", "public.consumer_complaints")\
 .option("user", "postgres")\
 .option("password", "Ttzz2r2a")\
 .load())

In [5]:
jdbcDF1.columns

['date_received',
 'product_name',
 'sub_product',
 'issue',
 'sub_issue',
 'consumer_complaint_narrative',
 'company_public_response',
 'company',
 'state_name',
 'zip_code',
 'tags',
 'consumer_consent_provided',
 'submitted_via',
 'date_sent',
 'company_response_to_consumer',
 'timely_response',
 'consumer_disputed',
 'complaint_id']

In [6]:
jdbcDF1.select("date_received", "product_name", "sub_product", "issue").show(5)

+-------------+--------------------+--------------------+--------------------+
|date_received|        product_name|         sub_product|               issue|
+-------------+--------------------+--------------------+--------------------+
|   2013-07-29|       Consumer Loan|        Vehicle loan|Managing the loan...|
|   2013-07-29|Bank account or s...|    Checking account|Using a debit or ...|
|   2013-07-29|Bank account or s...|    Checking account|Account opening, ...|
|   2013-07-29|Bank account or s...|    Checking account|Deposits and with...|
|   2013-07-29|            Mortgage|Conventional fixe...|Loan servicing, p...|
+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [2]:
# Read Option 2: Loading data from a JDBC source using jdbc method
jdbcDF2 = (spark\
 .read\
 .jdbc("jdbc:postgresql://localhost:5432/consumer complaints", "public.consumer_complaints",\
 properties={"user": "postgres", "password": "Ttzz2r2a"}))

In [8]:
jdbcDF2.select("date_received", "product_name", "sub_product", "issue").show(5)

+-------------+--------------------+--------------------+--------------------+
|date_received|        product_name|         sub_product|               issue|
+-------------+--------------------+--------------------+--------------------+
|   2013-07-29|       Consumer Loan|        Vehicle loan|Managing the loan...|
|   2013-07-29|Bank account or s...|    Checking account|Using a debit or ...|
|   2013-07-29|Bank account or s...|    Checking account|Account opening, ...|
|   2013-07-29|Bank account or s...|    Checking account|Deposits and with...|
|   2013-07-29|            Mortgage|Conventional fixe...|Loan servicing, p...|
+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [3]:
# Write Option 1: Saving data to a JDBC source using save method
(jdbcDF1
 .write\
 .format("jdbc")\
 .option("url", "jdbc:postgresql://localhost:5432/consumer complaints")\
 .option("dbtable", "public.consumer_complaints4")\
 .option("user", "postgres")\
 .option("password", "Ttzz2r2a")\
 .save())

In [4]:
# Write Option 2: Saving data to a JDBC source using jdbc method
(jdbcDF2
 .write\
 .jdbc("jdbc:postgresql://localhost:5432/consumer complaints", "public.consumer_complaints5",\
 properties={"user": "postgres", "password": "Ttzz2r2a"}))

#### MySQL connection - Scala
Loading data from a JDBC source using load 

    val jdbcDF = spark
     .read
     .format("jdbc")
     .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .load()
Saving data to a JDBC source using save 

    jdbcDF
     .write
     .format("jdbc")
     .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .save()  

#### MySQL connection - Python
Loading data from a JDBC source using load 

    jdbcDF = (spark
     .read
     .format("jdbc")
     .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .load())

Saving data to a JDBC source using save 

    (jdbcDF
     .write
     .format("jdbc")
     .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .save())

#### MS SQL Server connection - Scala
Loading data from a JDBC source. First, configure the JDBC URL:

    val jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
Create a `Properties` object to hold the connection parameters.      
Note that this lets you build the JDBC URL without embedding the user and password in it.

    import java.util.Properties
    val cxnProp = new Properties()
    cxnProp.put("user", "[USERNAME]")
    cxnProp.put("password", "[PASSWORD]")
    cxnProp.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
Load data using the connection properties

    val jdbcDF = spark.read.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)
Saving data to a JDBC source
    
    jdbcDF.write.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)

#### MS SQL Server connection - Python
Configure the JDBC URL:

    jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
Loading data from a JDBC source

    jdbcDF = (spark
     .read
     .format("jdbc")
     .option("url", jdbcUrl)
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .load())
Saving data to a JDBC source

    (jdbcDF
     .write
     .format("jdbc")
     .option("url", jdbcUrl)
     .option("dbtable", "[TABLENAME]")
     .option("user", "[USERNAME]")
     .option("password", "[PASSWORD]")
     .save())
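
The read and write calls above repeat the same connection settings. As a rough sketch, they could be collected once in a dictionary and reused. The helper `jdbc_options` and the environment variable name `DB_PASSWORD` are hypothetical names introduced here for illustration; they do not come from Spark or from these notes.

```python
import os


def jdbc_options(url, table, user, password_env="DB_PASSWORD"):
    """Build the option dictionary shared by JDBC reads and writes.

    `password_env` is a hypothetical environment variable name; the password
    is read from it so that it does not have to be typed into the notebook.
    """
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": os.environ.get(password_env, ""),
    }


opts = jdbc_options(
    "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]",
    "[TABLENAME]",
    "[USERNAME]",
)

# With a live SparkSession the same dictionary could be passed to both calls:
#   jdbcDF = spark.read.format("jdbc").options(**opts).load()
#   jdbcDF.write.format("jdbc").options(**opts).save()
print(sorted(opts))
```

Keeping the settings in one place means the URL, table and credentials only have to be changed once when the target database changes.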

### Higher-Order Functions in DataFrames and Spark SQL

**Option 1**    
Explode and collect: `explode` creates a new row for each element of the selected array column, and `collect_list` gathers the values back into a list, keeping duplicates.
    
    spark.sql("""SELECT id, collect_list(value + 1) AS values
    FROM (SELECT id, EXPLODE(values) AS value
    FROM table) x
    GROUP BY id""")
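
What the nested query does can be sketched in plain Python, without Spark. The input rows below are hypothetical, since the notes do not show the contents of `table`.

```python
from collections import defaultdict

# Hypothetical input: one row per id, each holding an array of integers.
table = [(1, [1, 2, 3]), (2, [10, 20])]

# EXPLODE(values): one output row per array element.
exploded = [(row_id, value) for row_id, values in table for value in values]

# GROUP BY id with collect_list(value + 1): rebuild one array per id.
collected = defaultdict(list)
for row_id, value in exploded:
    collected[row_id].append(value + 1)

result = dict(collected)
print(exploded)
print(result)
```

In this sketch the element order is preserved because everything runs in a single loop. In Spark the `GROUP BY` involves a shuffle, so the order of the re-collected array is not guaranteed to match the original.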

**Option 2**    
User-defined function: a UDF (here `plusOneInt`, which must be registered beforehand) uses `map()` to iterate over each element of the array and add one to it.

    spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()

**transform():** takes an array and an anonymous function as input. It builds a new array by applying the function to each element and assigning the result to the output array. This is generally more efficient than the UDF approach.
    
    transform(values, value -> lambda expression)
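
The semantics of `transform()`, and of the related `filter()` and `exists()` functions used in the examples of this section, can be sketched with plain Python lists. This is only an illustration of what each function computes for one array, not how Spark evaluates it.

```python
celsius = [35, 36, 32, 30, 40, 42, 38]

# transform(celsius, t -> ((t * 9) div 5) + 32): apply the lambda to every element.
# Python's // gives the same result as SQL div for the non-negative values used here.
fahrenheit = [((t * 9) // 5) + 32 for t in celsius]

# filter(celsius, t -> t > 38): keep only the elements for which the lambda is true.
high = [t for t in celsius if t > 38]

# exists(celsius, t -> t = 38): true if the lambda holds for at least one element.
threshold = any(t == 38 for t in celsius)

print(fahrenheit)
print(high)
print(threshold)
```

Each function takes the array plus a lambda and returns a new array (`transform`, `filter`) or a boolean (`exists`), so no explode, shuffle or UDF registration is needed.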

*Examples*

*Python*

In [9]:
from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .appName("ejemplo")
        .getOrCreate())

In [10]:
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))]) ##Define the schema of the data
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] ##Two rows, each holding one array
t_c = spark.createDataFrame(t_list, schema) ##Create the DataFrame
t_c.createOrReplaceTempView("tC") ##Register a temporary view
t_c.show() ##Show the DataFrame

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [11]:
##transform() function
spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit FROM tC""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [21]:
35*9/5+32

95.0

In [12]:
##filter() function
spark.sql("""SELECT celsius, filter(celsius, t -> t > 38) as high FROM tC""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [14]:
##exists() function
spark.sql("""SELECT celsius, exists(celsius, t -> t = 38) as threshold FROM tC""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



*Scala*

In [26]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
    .builder
    .appName("ejemplo")
    .getOrCreate()

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@b041b74


In [18]:
//Create the arrays
val t1 = Array(35, 36, 32, 30, 40, 42, 38) 
val t2 = Array(31, 32, 34, 55, 56)
//Convert to a DataFrame
val tC = Seq(t1, t2).toDF("celsius") 
//Temporary view
tC.createOrReplaceTempView("tC")
//Show the DataFrame
tC.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



t1: Array[Int] = Array(35, 36, 32, 30, 40, 42, 38)
t2: Array[Int] = Array(31, 32, 34, 55, 56)
tC: org.apache.spark.sql.DataFrame = [celsius: array<int>]


In [19]:
//transform() function
spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit FROM tC""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [20]:
//filter() function
spark.sql("""SELECT celsius, filter(celsius, t -> t > 38) as high FROM tC""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [22]:
//exists() function
spark.sql("""SELECT celsius, exists(celsius, t -> t = 38) as threshold FROM tC""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



### Common DataFrames and Spark SQL Operations

*Scala*

**Create the datasets used in the examples**

In [1]:
import org.apache.spark.sql.functions._


import org.apache.spark.sql.functions._


In [2]:
// Set file paths
val delaysPath ="C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/departuredelays.csv"
val airportsPath ="C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/airport-codes-na.txt"

delaysPath: String = C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/departuredelays.csv
airportsPath: String = C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/airport-codes-na.txt


In [3]:
// Obtain airports data set
val airports = spark.read
 .option("header", "true")
 .option("inferschema", "true")
 .option("delimiter", "\t")
 .csv(airportsPath)
airports.createOrReplaceTempView("airports_na")

airports: org.apache.spark.sql.DataFrame = [City: string, State: string ... 2 more fields]


In [4]:
// Obtain departure Delays data set
val delays = spark.read
 .option("header","true")
 .csv(delaysPath)
 .withColumn("delay", expr("CAST(delay as INT) as delay"))
 .withColumn("distance", expr("CAST(distance as INT) as distance"))
delays.createOrReplaceTempView("departureDelays")

delays: org.apache.spark.sql.DataFrame = [date: string, delay: int ... 3 more fields]


In [5]:
// Create temporary small table
val foo = delays.filter(
 expr("""origin == 'SEA' AND destination == 'SFO' AND 
 date like '01010%' AND delay > 0"""))
foo.createOrReplaceTempView("foo")

foo: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, delay: int ... 3 more fields]


In [6]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [7]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [8]:
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



**Unions**: combine two tables that have the same columns by appending the rows of one to the end of the other

In [9]:
// Union two tables
val bar = delays.union(foo)
bar.createOrReplaceTempView("bar")

bar: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, delay: int ... 3 more fields]


In [10]:
spark.sql("SELECT * FROM bar LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [11]:
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

spark.sql("""SELECT * FROM bar WHERE origin = 'SEA' AND destination = 'SFO' AND date LIKE '01010%' AND delay > 0""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



**Joins**: combine columns from different DataFrames by matching rows on a key column (PK)

In [14]:
foo.join(airports.as('air), $"air.IATA" === $"origin").select("City", "State", "date", "delay", "distance", "destination").show()
    
spark.sql("""SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination  FROM foo f JOIN airports_na a
ON a.IATA = f.origin""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



**Modifications**: adding new columns, dropping columns, renaming columns, pivoting

In [15]:
foo.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [17]:
//Adding new columns
import org.apache.spark.sql.functions.expr
val foo2 = foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



import org.apache.spark.sql.functions.expr
foo2: org.apache.spark.sql.DataFrame = [date: string, delay: int ... 4 more fields]


In [18]:
//Dropping columns
val foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



foo3: org.apache.spark.sql.DataFrame = [date: string, distance: int ... 3 more fields]


In [19]:
//Renaming columns
val foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



foo4: org.apache.spark.sql.DataFrame = [date: string, distance: int ... 3 more fields]


*Python*

**Create the datasets used in the examples**

In [23]:
from pyspark.sql.functions import expr

In [25]:
tripdelaysFilePath ="C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/departuredelays.csv"
airportsnaFilePath ="C:/Users/nerea.gomez/Documents/Documentacion/Learning Spark/Datasets/airport-codes-na.txt"

In [26]:
# Obtain airports data set
airportsna = (spark.read
 .format("csv")
 .options(header="true", inferSchema="true", sep="\t")
 .load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")

In [27]:
# Obtain departure delays data set
departureDelays = (spark.read
 .format("csv")
 .options(header="true")
 .load(tripdelaysFilePath))

departureDelays = (departureDelays
 .withColumn("delay", expr("CAST(delay as INT) as delay"))
 .withColumn("distance", expr("CAST(distance as INT) as distance")))
departureDelays.createOrReplaceTempView("departureDelays")

In [28]:
# Create temporary small table
foo = (departureDelays
 .filter(expr("""origin == 'SEA' and destination == 'SFO' and 
 date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView("foo")

In [29]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [30]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [31]:
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



**Unions**: combine two tables that have the same columns by appending the rows of one to the end of the other

In [32]:
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

In [37]:
spark.sql("SELECT * FROM bar LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [38]:
# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

spark.sql("""SELECT * FROM bar WHERE origin = 'SEA' AND destination = 'SFO' AND date LIKE '01010%' AND delay > 0""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+
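
The filtered output shows every SEA to SFO row twice because `union()` appends rows without removing duplicates (it behaves like SQL `UNION ALL`). A plain-Python sketch of this, using the three `foo` rows shown above plus a single `departureDelays` row as a stand-in for the full dataset:

```python
# Rows as (date, delay, distance, origin, destination), copied from the foo table.
foo = [
    ("01010710", 31, 590, "SEA", "SFO"),
    ("01010955", 104, 590, "SEA", "SFO"),
    ("01010730", 5, 590, "SEA", "SFO"),
]

# Stand-in for departureDelays: foo was filtered out of it, so it already
# contains these rows. The ABE row is copied from the departureDelays output.
departure_delays = [("01011245", 6, 602, "ABE", "ATL")] + foo

# union(): append the rows of one table to the other, keeping duplicates.
bar = departure_delays + foo

# Same filter as above: SEA to SFO, date starting with 01010, positive delay.
matches = [
    r for r in bar
    if r[3] == "SEA" and r[4] == "SFO" and r[0].startswith("01010") and r[1] > 0
]

# Removing duplicates afterwards leaves each row once (what distinct() would do).
unique_matches = list(dict.fromkeys(matches))

print(len(matches))
print(unique_matches)
```

If SQL-style `UNION` semantics are wanted in Spark, the union can be followed by `distinct()`.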



**Joins**: combine columns from different DataFrames by matching rows on a key column (PK)

In [40]:
foo.join(airportsna, airportsna.IATA == foo.origin).select("City", "State", "date", "delay", "distance", "destination").show()

spark.sql("""SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination  FROM foo f JOIN airports_na a
ON a.IATA = f.origin""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+
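
To make the matching rule explicit, the inner join above can be modeled in plain Python without Spark. This is only a sketch of the semantics, not how Spark executes a join. The flight rows are copied from `foo`; the Seattle airport row matches the join output, while the San Francisco row and the helper name `inner_join` are assumptions added for illustration.

```python
# Flight rows copied from the `foo` DataFrame used in this section.
foo_rows = [
    {"date": "01010710", "delay": 31, "distance": 590, "origin": "SEA", "destination": "SFO"},
    {"date": "01010955", "delay": 104, "distance": 590, "origin": "SEA", "destination": "SFO"},
    {"date": "01010730", "delay": 5, "distance": 590, "origin": "SEA", "destination": "SFO"},
]

# Airport rows. Seattle appears in the join output above; the second row is
# an assumption, included to show that non-matching rows are dropped.
airport_rows = [
    {"City": "Seattle", "State": "WA", "IATA": "SEA"},
    {"City": "San Francisco", "State": "CA", "IATA": "SFO"},
]


def inner_join(left, right, left_key, right_key):
    """Hypothetical helper: keep only the row pairs whose keys are equal."""
    # Index the right side by key so each left row finds its matches directly.
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    # A left row with no match on the right contributes nothing (inner join).
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


joined = inner_join(foo_rows, airport_rows, "origin", "IATA")

# Equivalent of .select("City", "State", "date", "delay", "distance", "destination")
columns = ("City", "State", "date", "delay", "distance", "destination")
result = [tuple(row[c] for c in columns) for row in joined]
for row in result:
    print(row)
```

Every flight departs from SEA, so each one pairs with the Seattle row only; the San Francisco airport row never matches an `origin` and does not appear in the result.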



**Windowing**: a window function uses the values of the rows in a window (a range of input rows) to return a set of values, typically in the form of another row. Window functions make it possible to operate on a group of rows while still returning a single value for every input row. (p. 173)

In [44]:
spark.sql("""DROP TABLE IF EXISTS departureDelaysWindow;""")

DataFrame[]

In [45]:
spark.sql("""CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination;""")

DataFrame[]

In [48]:
spark.sql("""SELECT * FROM departureDelaysWindow""").show(10)

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   SEA|        LAX|       9359|
|   JFK|        SFO|      35619|
|   SFO|        ORD|      27412|
|   JFK|        DEN|       4315|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   JFK|        ATL|      12141|
|   SFO|        ATL|       5091|
+------+-----------+-----------+
only showing top 10 rows



In [53]:
spark.sql("""SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
FROM departureDelaysWindow
WHERE origin = 'JFK'
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
LIMIT 3""").show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        LAX|      35755|
|   JFK|        SFO|      35619|
|   JFK|        ATL|      12141|
+------+-----------+-----------+



In [55]:
spark.sql("""SELECT origin, destination, TotalDelays, rank FROM ( 
 SELECT origin, destination, TotalDelays, dense_rank() 
 OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank 
 FROM departureDelaysWindow) t WHERE rank <= 3""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
+------+-----------+-----------+----+
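
What `dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC)` computes can be sketched in plain Python. This models the semantics only and assumes nothing about Spark's execution; the rows are a subset of the `departureDelaysWindow` values shown above, and the function name `dense_rank_top_n` is hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# (origin, destination, TotalDelays) rows taken from the outputs in this section.
rows = [
    ("JFK", "ORD", 5608),
    ("JFK", "SFO", 35619),
    ("JFK", "DEN", 4315),
    ("JFK", "ATL", 12141),
    ("JFK", "LAX", 35755),
    ("SEA", "LAX", 9359),
    ("SEA", "SFO", 22293),
    ("SEA", "DEN", 13645),
    ("SEA", "ORD", 10041),
]


def dense_rank_top_n(rows, n):
    """Hypothetical helper: rank destinations per origin, keep ranks <= n."""
    ranked = []
    # PARTITION BY origin: sort first so groupby sees each origin contiguously.
    by_origin = sorted(rows, key=itemgetter(0))
    for origin, partition in groupby(by_origin, key=itemgetter(0)):
        rank, previous = 0, None
        # ORDER BY TotalDelays DESC inside the partition.
        for _, destination, total in sorted(partition, key=itemgetter(2), reverse=True):
            # Tied values share a rank; the next distinct value gets rank + 1,
            # so dense_rank leaves no gaps in the numbering.
            if total != previous:
                rank += 1
                previous = total
            # Outer filter: WHERE rank <= n
            if rank <= n:
                ranked.append((origin, destination, total, rank))
    return ranked


top3 = dense_rank_top_n(rows, 3)
for row in top3:
    print(row)
```

Unlike the `GROUP BY ... LIMIT 3` query, which had to be restricted to a single origin, the window version restarts the ranking for every partition and so returns the top three destinations of each origin in one pass.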



**Modifications**: adding new columns, dropping columns, renaming columns, pivoting

In [57]:
foo.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [58]:
##Adding new columns
from pyspark.sql.functions import expr

foo2 = (foo.withColumn("status", expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [59]:
##Dropping columns
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [60]:
##Renaming columns
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+
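
Pivoting, the last modification in the list above, swaps rows for columns: the distinct values of one column (here the month) become new columns, each filled with an aggregate. The plain-Python sketch below models that idea for an average and a maximum per month. It is an illustration under assumptions, not Spark's implementation: the January delays are a few of the SEA values printed in this section, the February rows are made up, and the function name `pivot` is hypothetical.

```python
from collections import defaultdict

# (destination, month, delay) rows.
rows = [
    ("ORD", 1, 92), ("JFK", 1, -7), ("DFW", 1, -5), ("ORD", 1, -10), ("JFK", 1, 4),
    # Hypothetical February rows, added only so the FEB columns have data.
    ("ORD", 2, 20), ("ORD", 2, 40), ("JFK", 2, 12),
]

# Counterpart of "FOR month IN (1 JAN, 2 FEB)": month value -> column prefix.
month_names = {1: "JAN", 2: "FEB"}


def pivot(rows, month_names):
    """Hypothetical helper: one output row per destination, two columns per month."""
    # Collect the delays of every (destination, month) pair named in the IN list.
    delays_by_cell = defaultdict(list)
    for destination, month, delay in rows:
        if month in month_names:
            delays_by_cell[(destination, month)].append(delay)

    table = {}
    for (destination, month), delays in delays_by_cell.items():
        # Start each destination with every pivot column empty (None models NULL).
        out = table.setdefault(
            destination,
            {f"{name}_{agg}": None
             for name in month_names.values()
             for agg in ("AvgDelay", "MaxDelay")},
        )
        name = month_names[month]
        out[f"{name}_AvgDelay"] = round(sum(delays) / len(delays), 2)
        out[f"{name}_MaxDelay"] = max(delays)
    # ORDER BY destination
    return dict(sorted(table.items()))


pivoted = pivot(rows, month_names)
for destination, values in pivoted.items():
    print(destination, values)
```

A destination with no rows for a given month keeps `None` in that month's columns, which corresponds to the NULL cells a pivot leaves for missing combinations.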



In [63]:
##Pivoting
spark.sql("""SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay FROM departureDelays 
WHERE origin = 'SEA'""").show()


spark.sql("""SELECT * 
FROM (SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay FROM departureDelays WHERE origin = 'SEA')
PIVOT (CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay FOR month IN (1 JAN, 2 FEB))
ORDER BY destination""").show()

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
|        ORD|    1|    0|
|        DFW|    1|   23|
|        DFW|    1|   36|
|        ORD|    1|  298|
|        JFK|    1|    4|
|        DFW|    1|    0|
|        MIA|    1|    2|
|        DFW|    1|    0|
|        DFW|    1|    0|
|        ORD|    1|   83|
+-----------+-----+-----+
only showing top 20 rows

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       