# Basic Transformations

## Overview of Basic Transformations

We will cover filtering, aggregations and sorting as part of this module and look into joins and ranking in subsequent modules.

Let us define a couple of problem statements and work out their solutions to learn more about the Data Frame APIs.
* Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival. 
 * Output should contain 3 columns - **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
* Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day. 
 * Output should contain 4 columns - **FlightDate**, **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
 * **FlightDate** should be of **yyyy-MM-dd** format.
 * Data should be **sorted** in ascending order by **FlightDate**.

## Starting Spark Session

Let us create a Spark session for this notebook so that we can run the code provided.

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    appName("Basic Transformations").
    //master("yarn").
    getOrCreate

spark = org.apache.spark.sql.SparkSession@155577d6

## Overview of Filtering
Let us understand a few important details related to filtering before we get into the solution.

In [2]:
val airlines_path = "flightmonth=200801"

airlines_path = flightmonth=200801

In [4]:
import sys.process._

// On the cluster, list the HDFS directory instead:
// "hdfs dfs -ls /public/airlines_all/airlines-part/flightmonth=200801".!
"ls -ltr flightmonth=200801".!

total 28648
-rw-r--r--@ 1 sethangavel  staff  14666683 Nov  9  2019 part-00252-9f6f50c3-9ea4-459e-8b66-c0f1699e7475.c000.snappy.parquet

0

In [5]:
val airlines = spark.read.parquet(airlines_path)

airlines = [Year: int, Month: int ... 29 more fields]

In [6]:
airlines.printSchema

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

* Filtering can be done using either `filter` or `where`. `where` is an alias of `filter`, so the two are interchangeable.
* The condition can be passed either in **SQL Style** (as a string) or in **Data Frame Style** (as a `Column` expression).
* Example for SQL Style - `airlines.filter("IsArrDelayed = 'YES'").show()` or `airlines.where("IsArrDelayed = 'YES'").show()`
* Example for Data Frame Style - `airlines.filter(airlines("IsArrDelayed") === "YES").show()` or `airlines.filter($"IsArrDelayed" === "YES").show()`. We can also use `where` instead of `filter`.
* Here are the other operations we can use to filter the data - `!=` (`=!=` in Data Frame Style), `>`, `<`, `>=`, `<=`, `LIKE` (`like`) and `BETWEEN` ... `AND` (`between`).
* If we have to validate against multiple columns, then we need to combine the conditions with boolean operators such as `AND` and `OR` (`and`/`&&` and `or`/`||` in Data Frame Style).
* If we have to compare a column against multiple values, then we can use the `IN` operator (`isin` in Data Frame Style).
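
The points above can be checked side by side on a tiny in-memory Data Frame. This is a minimal sketch. It assumes Spark SQL is on the classpath (for example in `spark-shell` or this notebook), and the rows are made up for illustration only. It shows that `filter` and `where` accept the same conditions. It also shows that the SQL-string operators (`!=`, `IN`, `OR`) correspond to the Column operators `=!=`, `isin` and `||`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for experimentation; getOrCreate reuses an existing session if there is one.
val spark = SparkSession.builder.master("local[*]").appName("FilterSketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows mimicking a few of the airline columns.
val flights = Seq(
  ("ORD", "YES", "NO"),
  ("SFO", "YES", "YES"),
  ("JFK", "NO", "YES"),
  ("ATL", "NO", "NO")
).toDF("Origin", "IsDepDelayed", "IsArrDelayed")

// Same condition, SQL Style with filter and Data Frame Style with where.
val sqlStyle = flights.filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'").count()
val apiStyle = flights.where(col("IsDepDelayed") === "YES" && col("IsArrDelayed") === "NO").count()

// Not equal: != in a SQL string, =!= on a Column.
val notSfoSql = flights.filter("Origin != 'SFO'").count()
val notSfoApi = flights.filter($"Origin" =!= "SFO").count()

// Multiple values: IN in a SQL string, isin on a Column.
val inSql = flights.filter("Origin IN ('ORD', 'ATL')").count()
val inApi = flights.filter($"Origin".isin("ORD", "ATL")).count()

// OR across columns: || on Columns.
val eitherDelayed = flights.filter($"IsDepDelayed" === "YES" || $"IsArrDelayed" === "YES").count()
```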

### Tasks

Let us perform some tasks to understand filtering in detail. Solve all the problems by passing conditions using both SQL Style and API (Data Frame) Style.

* Read the data for January 2008.

In [8]:
val airlines_path = "flightmonth=200801"

airlines_path = flightmonth=200801

In [9]:
val airlines_all = spark.
    read.
    parquet(airlines_path)

airlines_all = [Year: int, Month: int ... 29 more fields]

In [10]:
airlines_all.printSchema

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [None]:
val airlines = airlines_all.
    select("Year", "Month", "DayOfMonth",
           "DepDelay", "ArrDelay", "UniqueCarrier", 
           "FlightNum", "IsArrDelayed", "IsDepDelayed"
          )

In [None]:
airlines.show

#### Get the count of flights that departed late from the origin and reached the destination early or on time.


In [None]:
airlines.show

* SQL Style

In [None]:
airlines.
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'").
    count

* API Style

In [None]:
import org.apache.spark.sql.functions.col

In [None]:
airlines.
    filter(col("IsDepDelayed") === "YES" and col("IsArrDelayed") === "NO").
    count

In [None]:
import spark.implicits._

In [None]:
airlines.
    filter($"IsDepDelayed" === "YES" and $"IsArrDelayed" === "NO").
    count

In [None]:
airlines.
    filter(airlines("IsDepDelayed") === "YES" and airlines("IsArrDelayed") === "NO").
    count

#### Get the count of flights that departed more than 60 minutes late from the origin.


* SQL Style

In [None]:
airlines.
    filter("DepDelay > 60").
    count


* API Style

In [None]:
import org.apache.spark.sql.functions.col

In [None]:
airlines.
    filter(col("DepDelay") > 60).
    count

In [None]:
import spark.implicits._

In [None]:
airlines.
    filter($"DepDelay" > 60).
    count

In [None]:
airlines.
    filter(airlines("DepDelay") > 60).
    count
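
In the schema above, `DepDelay` (like `ArrDelay`) is a string column. The conditions `DepDelay > 60` and `col("DepDelay") > 60` therefore rely on Spark implicitly casting it for the comparison. The following is a minimal sketch of making that cast explicit, assuming Spark SQL is on the classpath. The sample values are invented. Non-numeric strings are deliberately left out, because how a failed cast behaves depends on the Spark version and on whether ANSI mode is enabled.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.master("local[*]").appName("CastSketch").getOrCreate()
import spark.implicits._

// Hypothetical delays stored as strings, as DepDelay is in the airlines schema.
val delays = Seq("75", "61", "60", "5", "-3").toDF("DepDelay")

// Cast explicitly so the comparison is numeric rather than relying on implicit coercion.
val lateBy60 = delays.filter(col("DepDelay").cast("int") > 60).count()
```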

#### Get the count of flights that departed early or on time but arrived at least 15 minutes late.


* SQL Style

In [None]:
airlines.
    filter("IsDepDelayed = 'NO' AND ArrDelay >= 15").
    count

* API Style

In [None]:
import org.apache.spark.sql.functions.col

In [None]:
airlines.
    filter(col("IsDepDelayed") === "NO" and col("ArrDelay") >= 15).
    count

In [None]:
import spark.implicits._

In [None]:
airlines.
    filter($"IsDepDelayed" === "NO" and $"ArrDelay" >= 15).
    count

In [None]:
airlines.
    filter(airlines("IsDepDelayed") === "NO" and airlines("ArrDelay") >= 15).
    count

#### Get the count of flights that departed from the following major airports - ORD, DFW, ATL, LAX, SFO.

* SQL Style

In [None]:
airlines_all.
    filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')").
    select("Origin").
    distinct.
    show

In [None]:
airlines_all.
    filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')").
    count

* API Style

In [None]:
import org.apache.spark.sql.functions.col

In [None]:
airlines_all.
    filter(col("Origin") isin ("ORD", "DFW", "ATL", "LAX", "SFO")).
    count

In [None]:
import spark.implicits._

In [None]:
airlines_all.
    filter($"Origin" isin ("ORD", "DFW", "ATL", "LAX", "SFO")).
    count

In [None]:
airlines_all.
    filter(airlines_all("Origin") isin ("ORD", "DFW", "ATL", "LAX", "SFO")).
    count
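
When the list of airports lives in a Scala collection rather than being typed inline, it can be expanded into the varargs of `isin` with `: _*`. This is a minimal sketch with made-up origins, assuming Spark SQL is on the classpath. `majorAirports` is a hypothetical name used only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.master("local[*]").appName("IsinSketch").getOrCreate()
import spark.implicits._

// Hypothetical origins; in the notebook these come from airlines_all.
val origins = Seq("ORD", "DFW", "BOS", "SFO", "ORD", "SEA").toDF("Origin")

// Keep the airport list in a collection and expand it into isin's varargs.
val majorAirports = Seq("ORD", "DFW", "ATL", "LAX", "SFO")
val majorCount = origins.filter(col("Origin").isin(majorAirports: _*)).count()
```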

#### Get the count of flights that departed late between January 1 and January 9, 2008, using FlightDate.
* FlightDate should be in `yyyyMMdd` format.


In [None]:
airlines.show

* Add a column FlightDate by using Year, Month and DayOfMonth. Format should be `yyyyMMdd`.


In [None]:
import org.apache.spark.sql.functions.{lpad, concat, col}

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    show
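
The reason for `lpad` is that `Month` and `DayOfMonth` are integers, so January 5 would otherwise concatenate to `200815`. Padding gives every `FlightDate` the fixed-width `yyyyMMdd` form that the `LIKE` and `BETWEEN` filters below depend on. This is a minimal sketch on two invented rows, assuming Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lpad}

val spark = SparkSession.builder.master("local[*]").appName("FlightDateSketch").getOrCreate()
import spark.implicits._

// Hypothetical rows with integer date parts, like Year, Month and DayofMonth in the schema.
val parts = Seq((2008, 1, 5), (2008, 1, 15)).toDF("Year", "Month", "DayOfMonth")

// concat and lpad cast the integers to strings; lpad turns 1 into "01" and 5 into "05".
val withDate = parts.withColumn("FlightDate",
  concat(col("Year"), lpad(col("Month"), 2, "0"), lpad(col("DayOfMonth"), 2, "0")))

val dates = withDate.select("FlightDate").as[String].collect().toList
```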

* SQL Style

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter("FlightDate LIKE '2008010%' AND IsDepDelayed = 'YES'").
    count

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter("FlightDate BETWEEN '20080101' AND '20080109' AND IsDepDelayed = 'YES'").
    count

* API Style

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter($"FlightDate" like "2008010%" and $"IsDepDelayed" === "YES").
    count

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter($"FlightDate" between ("20080101", "20080109") and $"IsDepDelayed" === "YES").
    count
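
The string comparisons above work only because `FlightDate` is zero-padded and fixed width, so lexical order matches chronological order. An alternative is to convert to a real date with `to_date` and compare dates. This is a minimal sketch on invented rows, assuming Spark SQL is on the classpath. Both bounds of `between` are inclusive.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, to_date}

val spark = SparkSession.builder.master("local[*]").appName("DateRangeSketch").getOrCreate()
import spark.implicits._

// Hypothetical rows using the yyyyMMdd FlightDate built above.
val flights = Seq(
  ("20080101", "YES"), ("20080109", "YES"), ("20080110", "YES"), ("20080105", "NO")
).toDF("FlightDate", "IsDepDelayed")

// Parse the string into a DateType column and compare against date bounds.
val lateEarlyJan = flights.
  filter(to_date(col("FlightDate"), "yyyyMMdd").between(to_date(lit("2008-01-01")), to_date(lit("2008-01-09"))) &&
         col("IsDepDelayed") === "YES").
  count()
```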

#### Get the number of flights that departed late on Sundays.

In [None]:
val l = List("X")

In [None]:
import spark.implicits._

In [None]:
val df = l.toDF("dummy")

In [None]:
import org.apache.spark.sql.functions.current_date

In [None]:
df.select(current_date()).show

In [None]:
import org.apache.spark.sql.functions.date_format

In [None]:
df.select(current_date(), date_format(current_date(), "EEEE")).show

* SQL Style

In [None]:
import org.apache.spark.sql.functions.to_date

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter("date_format(to_date(FlightDate, 'yyyyMMdd'), 'EEEE') = 'Sunday' AND IsDepDelayed = 'YES'").
    count

* API Style

In [None]:
airlines.
    withColumn("FlightDate", 
               concat(col("Year"), 
                      lpad(col("Month"), 2, "0"), 
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ).
    filter(date_format(to_date($"FlightDate", "yyyyMMdd"), "EEEE") === "Sunday" and $"IsDepDelayed" === "YES").
    count
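
Matching on the `EEEE` day name works. Another option is the `dayofweek` function, which numbers days from 1 (Sunday) to 7 (Saturday) and avoids comparing against a day-name string. This is a minimal sketch, assuming Spark SQL is on the classpath. The rows are invented; 2008-01-06 was a Sunday and 2008-01-07 a Monday.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format, dayofweek, to_date}

val spark = SparkSession.builder.master("local[*]").appName("SundaySketch").getOrCreate()
import spark.implicits._

// Hypothetical rows: two flights on Sunday 2008-01-06, one on Monday 2008-01-07.
val flights = Seq(("20080106", "YES"), ("20080106", "NO"), ("20080107", "YES")).
  toDF("FlightDate", "IsDepDelayed")
val flightDate = to_date(col("FlightDate"), "yyyyMMdd")

// Day name via the EEEE pattern, as in the cells above.
val byName = flights.filter(date_format(flightDate, "EEEE") === "Sunday" && col("IsDepDelayed") === "YES").count()

// Day number via dayofweek, where 1 means Sunday.
val byNumber = flights.filter(dayofweek(flightDate) === 1 && col("IsDepDelayed") === "YES").count()
```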

## Overview of Aggregations

Let us go through the details related to aggregations using Spark.

* We can perform total aggregations directly on a Data Frame, or we can perform aggregations after grouping by one or more keys.
* Here are the APIs which we typically use to group the data using a key.
 * `groupBy`
 * `rollup`
 * `cube`
* Here are the functions which we typically use to perform aggregations.
 * `count`
 * `sum`, `avg`
 * `min`, `max`
* If we want to give aliases to the aggregated columns, then we have to use `agg` after `groupBy`.
* Let us get the count of flights for each day in January 2008 (`flightmonth=200801`).
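
The list above can be exercised on a few invented rows. This is a minimal sketch, assuming Spark SQL is on the classpath. It shows a total aggregation with no key, a `groupBy` followed by `agg` with aliased `count`, `avg`, `min` and `max`, and a `rollup`. The `rollup` adds a grand-total row whose `FlightDate` is null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, lit, max, min, sum}

val spark = SparkSession.builder.master("local[*]").appName("AggSketch").getOrCreate()
import spark.implicits._

// Hypothetical rows: two flights on January 1, one on January 2.
val flights = Seq(("20080101", 10), ("20080101", 30), ("20080102", 5)).toDF("FlightDate", "ArrDelay")

// Total aggregation over the whole Data Frame (no grouping key).
val totals = flights.agg(count(lit(1)).alias("FlightCount"), sum("ArrDelay").alias("TotalArrDelay")).first()

// Grouped aggregation: agg after groupBy lets each result column get an alias.
val daily = flights.
  groupBy("FlightDate").
  agg(count(lit(1)).alias("FlightCount"),
      avg("ArrDelay").alias("AvgArrDelay"),
      min("ArrDelay").alias("MinArrDelay"),
      max("ArrDelay").alias("MaxArrDelay")).
  orderBy("FlightDate").
  collect()

// rollup returns one row per FlightDate plus a grand-total row.
val rolledRows = flights.rollup("FlightDate").agg(count(lit(1)).alias("FlightCount")).count()
```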

In [None]:
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
val airlines = spark.
    read.
    parquet(airlines_path)

In [None]:
import org.apache.spark.sql.functions.{lpad, concat, count, lit}

In [None]:
import spark.implicits._

In [None]:
airlines.
    groupBy(concat(
        $"Year", 
        lpad($"Month", 2, "0"), 
        lpad($"DayOfMonth", 2, "0")).alias("FlightDate")
    ).
    agg(count(lit(1)).alias("FlightCount")).
    show

## Overview of Sorting

Let us understand how to sort the data in a Data Frame.
* We can use `orderBy` or `sort` to sort the data; `orderBy` is an alias of `sort`.
* We can perform composite sorting by passing multiple columns or expressions.
* By default, data is sorted in ascending order. We can change it to descending by calling `.desc` on the column or expression, or by using the `desc` function.
* Let us sort the daily flight counts for January 2008 in descending order of count.
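
The sorting options above can be tried on a small invented Data Frame. This is a minimal sketch, assuming Spark SQL is on the classpath. It shows a composite sort, descending by count and then ascending by date to break ties, written once with `orderBy` and `.desc` and once with `sort` and the `desc` function.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

val spark = SparkSession.builder.master("local[*]").appName("SortSketch").getOrCreate()
import spark.implicits._

// Hypothetical daily counts with a tie on FlightCount.
val daily = Seq(("20080101", 5), ("20080102", 9), ("20080103", 9)).toDF("FlightDate", "FlightCount")

// Descending by count, then ascending by date for rows with equal counts.
val sorted = daily.orderBy(col("FlightCount").desc, col("FlightDate")).
  select("FlightDate").as[String].collect().toList

// Same ordering using sort and the desc function.
val sameOrder = daily.sort(desc("FlightCount"), col("FlightDate").asc).
  select("FlightDate").as[String].collect().toList
```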

In [None]:
val flightCountDaily = airlines.
    groupBy(concat(
        $"Year", 
        lpad($"Month", 2, "0"), 
        lpad($"DayOfMonth", 2, "0")).alias("FlightDate")
    ).
    agg(count(lit(1)).alias("FlightCount"))

In [None]:
flightCountDaily.orderBy($"FlightCount".desc).show

## Solutions - Problem 1
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival. 
* Output should contain 3 columns - **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**

### Reading airlines data

In [None]:
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
val airlines_all = spark.
    read.
    parquet(airlines_path)

In [None]:
val airlines = airlines_all.
    select("Year", "Month", "DayOfMonth",
           "DepDelay", "ArrDelay", "UniqueCarrier", 
           "FlightNum", "IsArrDelayed", "IsDepDelayed"
          )

In [None]:
airlines.printSchema

In [None]:
airlines.show

### Get flights with delayed arrival

In [None]:
//SQL Style
airlines.filter("IsArrDelayed = 'YES'").show

In [None]:
// API Style
airlines.filter(airlines("IsArrDelayed") === "YES").show

In [None]:
import org.apache.spark.sql.functions.col

In [None]:
airlines.filter(col("IsArrDelayed") === "YES").show

In [None]:
import spark.implicits._

In [None]:
airlines.filter($"IsArrDelayed" === "YES").show

### Get delayed counts

In [None]:
// Departure Delayed Count
airlines.
    filter(airlines("IsDepDelayed") === "YES").
    count

In [None]:
// Arrival Delayed Count
airlines.
    filter(airlines("IsArrDelayed") === "YES").
    count

### Final Solution

In [None]:
import org.apache.spark.sql.functions.{col, lit, count, sum, expr}

In [None]:
airlines.agg(count(lit(1)).alias("FlightCount"),
             sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
             sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
            ).show
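
The `CASE WHEN` expressions above can also be written with the Column API. `when(...).otherwise(...)` corresponds to `CASE WHEN ... THEN ... ELSE ... END`. Alternatively, `count(when(...))` works because `when` without `otherwise` yields null for non-matching rows and `count` skips nulls. This is a minimal sketch on invented rows, assuming Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, sum, when}

val spark = SparkSession.builder.master("local[*]").appName("ConditionalCountSketch").getOrCreate()
import spark.implicits._

// Hypothetical rows covering every combination of departure and arrival delay.
val flights = Seq(("YES", "YES"), ("YES", "NO"), ("NO", "YES"), ("NO", "NO")).
  toDF("IsDepDelayed", "IsArrDelayed")

val counts = flights.agg(
  count(lit(1)).alias("FlightCount"),
  // Column API equivalent of CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END.
  sum(when(col("IsDepDelayed") === "YES", 1).otherwise(0)).alias("DepDelayedCount"),
  // Null for non-matching rows, which count ignores.
  count(when(col("IsArrDelayed") === "YES", 1)).alias("ArrDelayedCount")
).first()
```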

## Solutions - Problem 2

Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day. 

* Output should contain 4 columns - **FlightDate**, **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
* **FlightDate** should be of **yyyy-MM-dd** format.
* Data should be **sorted** in ascending order by **FlightDate**.

In [None]:
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
val airlines_all = spark.
    read.
    parquet(airlines_path)

In [None]:
val airlines = airlines_all.
    select("Year", "Month", "DayOfMonth",
           "DepDelay", "ArrDelay", "UniqueCarrier", 
           "FlightNum", "IsArrDelayed", "IsDepDelayed"
          )

In [None]:
airlines.printSchema

In [None]:
airlines.show

### Grouping Data by Flight Date

In [None]:
import org.apache.spark.sql.functions.{lit, col, lpad, concat}

In [None]:
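// groupBy on its own returns a RelationalGroupedDataset rather than a Data Frame;
// an aggregation such as count or agg is needed to get a Data Frame back.
airlines.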
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  )
           )

### Getting Counts by Flight Date

In [None]:
airlines.
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate")
           ).
    count.
    show

In [None]:
airlines.
    filter("IsDepDelayed = 'YES'").
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate")
           ).
    count.
    show

In [None]:
airlines.
    filter("IsArrDelayed = 'YES'").
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate")
           ).
    count.
    show

### Getting total as well as delayed counts for each day

In [None]:
import org.apache.spark.sql.functions.{sum, count, expr}

In [None]:
airlines.
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate")
           ).
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ).
    show

### Sorting Data By FlightDate

In [None]:
airlines.
    groupBy(concat($"Year", 
                   lit("-"),
                   lpad($"Month", 2, "0"), 
                   lit("-"),
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate")
           ).
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ).
    orderBy("FlightDate").
    show(31)
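
The solution above keeps `FlightDate` as a `yyyy-MM-dd` string, which meets the stated requirement and sorts correctly because it is zero-padded. The variation below instead builds the string with `concat_ws` and converts it with `to_date`, so `FlightDate` becomes a `DateType` column. Spark displays such a column as `yyyy-MM-dd` and sorts it chronologically. Which form is preferable depends on how the output is consumed. This is a minimal sketch on invented rows, assuming Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws, count, lit, lpad, sum, to_date, when}

val spark = SparkSession.builder.master("local[*]").appName("DailyDelaysSketch").getOrCreate()
import spark.implicits._

// Hypothetical rows: one flight on January 10, two on January 2.
val flights = Seq(
  (2008, 1, 10, "YES", "NO"),
  (2008, 1, 2, "YES", "YES"),
  (2008, 1, 2, "NO", "NO")
).toDF("Year", "Month", "DayOfMonth", "IsDepDelayed", "IsArrDelayed")

// concat_ws inserts the "-" separator; to_date parses the yyyy-MM-dd string into a date.
val flightDate = to_date(concat_ws("-",
  col("Year").cast("string"), lpad(col("Month"), 2, "0"), lpad(col("DayOfMonth"), 2, "0")))

val daily = flights.
  groupBy(flightDate.alias("FlightDate")).
  agg(count(lit(1)).alias("FlightCount"),
      sum(when(col("IsDepDelayed") === "YES", 1).otherwise(0)).alias("DepDelayedCount"),
      sum(when(col("IsArrDelayed") === "YES", 1).otherwise(0)).alias("ArrDelayedCount")).
  orderBy("FlightDate")

// Render each row as (date string, FlightCount, DepDelayedCount, ArrDelayedCount).
val rows = daily.collect().map(r => (r.get(0).toString, r.getLong(1), r.getLong(2), r.getLong(3))).toList
```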