**Spark SQL UDFs**

In [2]:
// Cubes a Long by multiplying it by itself twice (plain Long arithmetic, no overflow check).
val cubed: Long => Long = n => n * n * n

// Register the function with Spark SQL under the name "cubed" so SQL queries can call cubed(...).
spark.udf.register("cubed", cubed)

cubed: Long => Long = $Lambda$2105/0x00000008015d0260@61c6fdcf
res1: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2105/0x00000008015d0260@61c6fdcf,LongType,List(Some(class[value[0]: bigint])),Some(class[value[0]: bigint]),Some(cubed),false,true)


In [4]:
// Publish the ids 1 through 8 (the upper bound 9 is exclusive) as the temp view "udf_test".
spark.range(1, 9).createOrReplaceTempView("udf_test")

In [10]:
// Call the registered cubed UDF from SQL on every id of the udf_test view.
spark.sql("SELECT id, cubed(id) as id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



In [2]:
// Two arrays of Celsius readings; each array becomes one row of the array-typed column "celsius".
val t1: Array[Int] = Array(35, 36, 32, 30, 40, 42, 38)
val t2: Array[Int] = Array(31, 32, 34, 55, 56)
val tC = Seq(t1, t2).toDF("celsius")
// Make the DataFrame queryable from SQL as the temp view "tC".
tC.createOrReplaceTempView("tC")

t1: Array[Int] = Array(35, 36, 32, 30, 40, 42, 38)
t2: Array[Int] = Array(31, 32, 34, 55, 56)
tC: org.apache.spark.sql.DataFrame = [celsius: array<int>]


In [4]:
// Pass truncate = false so the arrays are printed in full rather than cut off.
tC.show(false)

+----------------------------+
|celsius                     |
+----------------------------+
|[35, 36, 32, 30, 40, 42, 38]|
|[31, 32, 34, 55, 56]        |
+----------------------------+



In [6]:
// transform() function
// Convert every Celsius reading in the array to Fahrenheit. `div` is integral division, so each
// result is truncated (e.g. 32 -> 89 rather than 89.6).
// NOTE(review): the alias "farenheit" is a misspelling of "fahrenheit"; it is left untouched here
// because it is the column name that the query emits.
spark.sql("""SELECT celsius,
    transform(celsius, t -> ((t * 9) div 5) + 32) as farenheit FROM tC""").show(false)

+----------------------------+-------------------------------+
|celsius                     |farenheit                      |
+----------------------------+-------------------------------+
|[35, 36, 32, 30, 40, 42, 38]|[95, 96, 89, 86, 104, 107, 100]|
|[31, 32, 34, 55, 56]        |[87, 89, 93, 131, 132]         |
+----------------------------+-------------------------------+



In [7]:
// filter() function
// Keep only the readings strictly greater than 38 in each array.
spark.sql("""SELECT celsius,
    filter(celsius, t-> t > 38) as high FROM tC""").show(false)

+----------------------------+--------+
|celsius                     |high    |
+----------------------------+--------+
|[35, 36, 32, 30, 40, 42, 38]|[40, 42]|
|[31, 32, 34, 55, 56]        |[55, 56]|
+----------------------------+--------+



In [8]:
// exists() function
// Flag each row with true when at least one reading in its array equals 38.
spark.sql("""SELECT celsius,
    exists(celsius, t -> t = 38) as threshold FROM tC""").show(false)

+----------------------------+---------+
|celsius                     |threshold|
+----------------------------+---------+
|[35, 36, 32, 30, 40, 42, 38]|true     |
|[31, 32, 34, 55, 56]        |false    |
+----------------------------+---------+



In [36]:
// aggregate() function (the array "reduce" higher-order function)
// Sum each array starting from 0, then turn the sum into an average and convert it to Fahrenheit.
// The merge lambda receives the accumulator first and the current element second, so the
// parameters are named (acc, t) to match.
// NOTE(review): `div` truncates the Celsius average before the conversion, so avgFahrenheit is an
// integer approximation rather than an exact average.
spark.sql("""SELECT celsius,
    aggregate(celsius, 0, (acc, t) -> acc + t, acc -> (acc div size(celsius) * 9 div 5)+32) as avgFahrenheit
    FROM tC""").show(false)

+----------------------------+-------------+
|celsius                     |avgFahrenheit|
+----------------------------+-------------+
|[35, 36, 32, 30, 40, 42, 38]|96           |
|[31, 32, 34, 55, 56]        |105          |
+----------------------------+-------------+



In [13]:
// Relative paths of the two input files.
val delaysPath = "../data/departuredelays.csv"
val airportsPath = "../data/airport-codes-na.txt"

// Airport reference data: tab-delimited with a header row; column types are inferred by Spark.
val airports = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .csv(airportsPath)
airports.createOrReplaceTempView("airports_na")

// Departure delays: read with a header row and without schema inference, then cast the two
// numeric columns explicitly. withColumn already names the resulting column, so the CAST
// expressions need no "as <name>" alias of their own.
val delays = spark.read
    .option("header", "true")
    .csv(delaysPath)
    .withColumn("delay", expr("CAST(delay AS INT)"))
    .withColumn("distance", expr("CAST(distance AS INT)"))
delays.createOrReplaceTempView("departureDelays")

delaysPath: String = ../data/departuredelays.csv
airportsPath: String = ../data/airport-codes-na.txt
airports: org.apache.spark.sql.DataFrame = [City: string, State: string ... 2 more fields]
delays: org.apache.spark.sql.DataFrame = [date: string, delay: int ... 3 more fields]


In [14]:
// Positive-delay flights from SEA to SFO whose date string starts with "01010",
// kept as a small DataFrame and exposed to SQL as the temp view "foo".
val foo = delays.where(expr(
    """origin == 'SEA' AND destination == 'SFO' AND date like '01010%' AND delay > 0"""))
foo.createOrReplaceTempView("foo")

foo: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, delay: int ... 3 more fields]


In [15]:
// Preview up to 10 rows of the airports_na view.
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [16]:
// Preview up to 10 rows of the departureDelays view.
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [17]:
// Show the rows of the foo view.
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [18]:
// UNIONS
// Append the rows of foo to delays. union keeps duplicates, so every flight that is already
// in delays appears twice once the same SEA -> SFO filter is applied to the combined data.
val bar = delays.union(foo)
bar.createOrReplaceTempView("bar")
bar
    .where(expr("""origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0"""))
    .show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



bar: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, delay: int ... 3 more fields]


In [21]:
// JOINS
// DataFrame API: pair each foo row with its origin airport through the IATA code.
foo
    .join(airports.as("air"), $"air.IATA" === $"origin")
    .select("City", "State", "date", "delay", "distance", "destination")
    .show()

// SQL: the same join expressed against the foo and airports_na temp views.
spark.sql("""SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
    FROM foo f
    JOIN airports_na a
    ON a.IATA = f.origin""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [23]:
// Total delay per (origin, destination) pair, restricted to three origins and seven destinations.
// The result is displayed and then registered as the temp view "departureDelaysWindow".
val dWindow = spark.sql("""SELECT origin, destination, SUM(delay) as TotalDelays
    FROM departureDelays 
    WHERE origin IN ('SEA','SFO','JFK') AND destination IN('SEA','SFO','JFK','DEN','ORD','LAX','ATL')
    GROUP BY origin, destination""")
dWindow.show()
dWindow.createOrReplaceTempView("departureDelaysWindow")

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   SEA|        LAX|       9359|
|   JFK|        SFO|      35619|
|   SFO|        ORD|      27412|
|   JFK|        DEN|       4315|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   JFK|        ATL|      12141|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



dWindow: org.apache.spark.sql.DataFrame = [origin: string, destination: string ... 1 more field]


In [24]:
// WINDOW FUNCTION
// dense_rank()
// Rank the destinations of each origin by TotalDelays (highest first) and keep ranks 1 to 3.
// The ranking is computed in a subquery so that the outer WHERE can filter on the rank column.
spark.sql("""SELECT origin, destination, TotalDelays, rank
    FROM (
        SELECT origin, destination, TotalDelays, dense_rank()
            OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
            FROM departureDelaysWindow
    ) t
    WHERE rank <= 3""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
+------+-----------+-----------+----+



In [27]:
// ADDING NEW COLUMNS
// Add a "status" column: 'On-time' when delay is 10 or less, 'Delayed' otherwise.

val foo2 = foo.withColumn("status",expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



foo2: org.apache.spark.sql.DataFrame = [date: string, delay: int ... 4 more fields]


In [28]:
// DROPPING COLUMNS
// Produce a new DataFrame without the "delay" column; foo2 itself is not modified.
val foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



foo3: org.apache.spark.sql.DataFrame = [date: string, distance: int ... 3 more fields]


In [29]:
// RENAMING COLUMNS
// Produce a new DataFrame in which the "status" column is called "flight_status".
val foo4 = foo3.withColumnRenamed("status","flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



foo4: org.apache.spark.sql.DataFrame = [date: string, distance: int ... 3 more fields]


In [30]:
// PIVOTING
// For flights leaving SEA, pivot the month (the first two characters of the date string) into
// columns and report the average and maximum delay per destination for January and February.
// SUBSTRING positions are 1-based in Spark SQL, so the month is read starting at position 1.
// NOTE(review): DECIMAL(4, 2) holds at most 99.99; presumably a larger average would not fit — confirm.
spark.sql("""SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay
 FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
 CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
 FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination""").show()



+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D