## Analytic Functions

Analytic functions let us compare the current record with a previous or a following record.
* `lead` and `lag` are the main functions.
* We can also compare each day of one week with the corresponding day of another week.
* `lead` and `lag` serve the same purpose. Depending on the requirement and on how the data is sorted, we can use either of them.
* The examples here use `lead`. The same results can be achieved using `lag`; however, while defining the spec we have to sort the data within the window in descending order.
* We can also use the `first` and `last` functions to get the first or last value within each group or partition, based on the sorting criteria. They are typically used to get details from other fields (for example, the name or id of the employee earning the highest or lowest salary within a department).

Let us start the Spark context for this notebook so that we can execute the code provided.

If you want to use the terminal for practice, here is the command to use.

```
spark2-shell \
  --master yarn \
  --name "Windowing Functions" \
  --conf spark.ui.port=0
```

In [None]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    appName("Windowing Functions").
    master("yarn").
    getOrCreate()

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [None]:
import spark.implicits._
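
The `first` and `last` functions mentioned at the start of this section can be tried on a small in-memory data set. The following is a minimal sketch, not part of the original material: the `employees` data, its column names, and the names `session` and `deptSpec` are made up for illustration. It assumes a Spark session is already running and picks it up through `getOrCreate`. The explicit `rowsBetween` frame matters because, with an `orderBy` in the spec, the default frame ends at the current row, so `last` would not see the rest of the partition.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

// Picks up the session that is already running in the notebook or shell
val session = SparkSession.builder.getOrCreate()

// Hypothetical sample data: (employee_id, department, salary)
val employees = session.createDataFrame(Seq(
  (1, "Sales", 3000),
  (2, "Sales", 4500),
  (3, "IT", 5000),
  (4, "IT", 4000),
  (5, "IT", 6000)
)).toDF("employee_id", "department", "salary")

// Sort each department by salary, highest first, and let the frame
// cover the whole partition so that last() can reach the final row
val deptSpec = Window.
    partitionBy("department").
    orderBy(col("salary").desc).
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val result = employees.
    withColumn("highest_paid_id", first(col("employee_id")).over(deptSpec)).
    withColumn("lowest_paid_id", last(col("employee_id")).over(deptSpec))

result.orderBy("department", "employee_id").show
```

Every row keeps its own fields and additionally carries the id of the highest and lowest paid employee of its department.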

### Using LEAD

In [None]:
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
val airlines = spark.
  read.
  parquet(airlines_path)

In [None]:
import org.apache.spark.sql.functions.{col, lit, lpad, concat}

In [None]:
import org.apache.spark.sql.functions.lead

In [None]:
import org.apache.spark.sql.expressions.Window

In [None]:
// One window per flight date and origin airport,
// ordered by scheduled departure time
val spec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy(col("CRSDepTime"))

In [None]:
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("LeadUniqueCarrier", lead($"UniqueCarrier", 1).over(spec)).
    withColumn("LeadFlightNum", lead($"FlightNum", 1).over(spec)).
    withColumn("LeadCRSDepTime", lead($"CRSDepTime", 1).over(spec)).
    withColumn("LeadDepDelay", lead($"DepDelay", 1).over(spec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show
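
As stated at the start of this section, `lag` gives the same result as `lead` when the window is sorted in descending order. The following is a minimal sketch on made-up data: the `flights` rows and the names `session`, `ascSpec`, `descSpec`, `NextByLead` and `NextByLag` are assumptions for illustration only. It assumes the sort key has no ties within a partition; with ties the row order, and therefore the result, is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// Picks up the session that is already running in the notebook or shell
val session = SparkSession.builder.getOrCreate()

// Hypothetical sample data: (Origin, CRSDepTime, FlightNum)
val flights = session.createDataFrame(Seq(
  ("ORD", 600, "AA10"),
  ("ORD", 715, "UA20"),
  ("ORD", 830, "DL30"),
  ("JFK", 900, "B640")
)).toDF("Origin", "CRSDepTime", "FlightNum")

// Same partitioning, opposite sort direction
val ascSpec = Window.partitionBy("Origin").orderBy(col("CRSDepTime"))
val descSpec = Window.partitionBy("Origin").orderBy(col("CRSDepTime").desc)

val compared = flights.
    // next departure, looking forward in ascending order
    withColumn("NextByLead", lead(col("FlightNum"), 1).over(ascSpec)).
    // next departure, looking backward in descending order
    withColumn("NextByLag", lag(col("FlightNum"), 1).over(descSpec))

compared.orderBy("Origin", "CRSDepTime").show
```

Both columns hold the flight number of the next scheduled departure from the same origin, and `null` for the last departure of each origin.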

### Using LEAD with an Offset of 7

In [None]:
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
val airlines = spark.
    read.
    parquet(airlines_path)

In [None]:
import org.apache.spark.sql.functions.{col, lit, lpad, concat}

In [None]:
import org.apache.spark.sql.functions.{sum, lead, substring}

In [None]:
import org.apache.spark.sql.expressions.Window

In [None]:
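// One window per month (first 6 characters of FlightDate) and origin.
// After the aggregation there is one row per date and origin,
// so an offset of 7 rows corresponds to 7 days when no date is missing.
val spec = Window.
    partitionBy(substring($"FlightDate", 1, 6), $"Origin").
    orderBy($"FlightDate", $"TotalDepDelay".desc)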

In [None]:
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    // Keep only ORD to make the output easier to read
    filter("Origin = 'ORD'").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show

In [None]:
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    // Keep only the first week for ORD; the second week is in the Lead* columns
    filter("Origin = 'ORD' AND FlightDate BETWEEN 20080101 AND 20080107").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show

In [None]:
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    // Keep the first week for all six origins
    filter("FlightDate BETWEEN 20080101 AND 20080107").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show
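
Once each row carries the value from 7 rows ahead, the week-over-week change is a simple column subtraction. The following is a minimal sketch on made-up totals, not on the airlines data: the `dailyDelays` values and the names `session`, `weekSpec`, `NextWeekFlightDate`, `NextWeekTotalDepDelay` and `DelayChange` are assumptions for illustration. It relies on there being exactly one row per day in each partition; if a day is missing, an offset of 7 rows no longer lines up with the same weekday.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

// Picks up the session that is already running in the notebook or shell
val session = SparkSession.builder.getOrCreate()

// Hypothetical totals: one row per day from 20080101 to 20080114,
// with made-up delays of 100, 110, ..., 230 minutes
val dailyDelays = session.createDataFrame(
  (1 to 14).map(day => (f"200801$day%02d", "ORD", 90 + day * 10))
).toDF("FlightDate", "Origin", "TotalDepDelay")

val weekSpec = Window.partitionBy("Origin").orderBy(col("FlightDate"))

val weekOverWeek = dailyDelays.
    withColumn("NextWeekFlightDate", lead(col("FlightDate"), 7).over(weekSpec)).
    withColumn("NextWeekTotalDepDelay", lead(col("TotalDepDelay"), 7).over(weekSpec)).
    // positive values mean the delay grew from one week to the next
    withColumn("DelayChange", col("NextWeekTotalDepDelay") - col("TotalDepDelay")).
    // rows of the second week have nothing 7 rows ahead, so drop them
    filter(col("NextWeekFlightDate").isNotNull)

weekOverWeek.orderBy("FlightDate").show
```

Filtering on the lead column being non-null plays the same role as the `BETWEEN 20080101 AND 20080107` filter used above, without hard-coding the dates.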