# Window LAG

## COVID-19 Data
Notes on the data: This data was assembled based on work done by [Rodrigo Pombo](https://github.com/pomber/covid19) based on [John Hopkins University](https://systems.jhu.edu/research/public-health/ncov/), based on [World Health Organisation](https://www.who.int/health-topics/coronavirus). The data was assembled 21st April 2020 - there are no plans to keep this data set up to date.

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.4.0`

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val spark = {
    NotebookSparkSession.builder()
    .progress(false)
    .appName("app09+")
    // .master("spark://192.168.31.31:7077")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", 
            "hdfs://192.168.31.31:9000/user/hive/warehouse") 
    .config("spark.cores.max", "4") 
    .config("spark.executor.instances", "1") 
    .config("spark.executor.cores", "2") 
    .config("spark.executor.memory", "10g") 
    .config("spark.shuffle.service.enabled", "false") 
    .config("spark.dynamicAllocation.enabled", "false") 
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.driver.allowMultipleContexts", "true")
    .getOrCreate()
}

Loading spark-stubs, spark-hive
Adding Hive conf dir /opt/hive/conf to classpath
Creating SparkSession


SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.


[32mimport [39m[36m$ivy.$                                  

[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.expressions.Window

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@3023096e

In [2]:
import spark.implicits._
def sc = spark.sparkContext
val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)

[32mimport [39m[36mspark.implicits._
[39m
defined [32mfunction[39m [36msc[39m
[36mhiveCxt[39m: [32msql[39m.[32mhive[39m.[32mHiveContext[39m = org.apache.spark.sql.hive.HiveContext@193516e9

In [3]:
// Credit to Aivean
implicit class RichDF(val ds:DataFrame) {
    def showHTML(limit: Int = 50, truncate: Int = 100) = {
        import xml.Utility.escape
        val data = ds.take(limit)
        val header = ds.schema.fieldNames.toSeq        
        val rows: Seq[Seq[String]] = data.map { row =>
          row.toSeq.map {cell =>
            val str = cell match {
              case null => "null"
              case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
              case array: Array[_] => array.mkString("[", ", ", "]")
              case seq: Seq[_] => seq.mkString("[", ", ", "]")
              case _ => cell.toString
            }
            if (truncate > 0 && str.length > truncate) {
              // do not show ellipses for strings shorter than 4 characters.
              if (truncate < 4) str.substring(0, truncate)
              else str.substring(0, truncate - 3) + "..."
            } else {
              str
            }
          }: Seq[String]
        }
    publish.html(s""" <table>
                <tr>
                 ${header.map(h => s"<th>${escape(h)}</th>").mkString}
                </tr>
                ${rows.map {row =>
                  s"<tr>${row.map{c => s"<td>${escape(c)}</td>" }.mkString}</tr>"
                }.mkString}
            </table>
        """)
    }
}

defined [32mclass[39m [36mRichDF[39m

In [4]:
val covid = hiveCxt.table("sqlzoo.covid")
val world = hiveCxt.table("sqlzoo.world")

[36mcovid[39m: [32mDataFrame[39m = [name: string, whn: string ... 3 more fields]
[36mworld[39m: [32mDataFrame[39m = [name: string, continent: string ... 6 more fields]

## Window Function
The SQL Window functions include LAG, LEAD, RANK and NTILE. These functions operate over a "window" of rows - typically these are rows in the table that are in some sense adjacent.

## 1. Introducing the `covid` table

The example uses a WHERE clause to show the cases in 'Italy' in March.

**Modify the query to show data from Spain**

In [5]:
(covid.withColumn("day", dayofmonth(covid("whn")))
    .filter((covid("name")==="Spain") && (month(covid("whn"))===3))
    .select("name", "day", "confirmed", "deaths", "recovered")
    .orderBy("day")
    .showHTML())

name,day,confirmed,deaths,recovered
Spain,1,84,0,2
Spain,2,120,0,2
Spain,3,165,1,2
Spain,4,222,2,2
Spain,5,259,3,2
Spain,6,400,5,2
Spain,7,500,10,30
Spain,8,673,17,30
Spain,9,1073,28,32
Spain,10,1695,35,32


## 2. Introducing the LAG function

The LAG function is used to show data from the preceding row or the table. When lining up rows the data is partitioned by country name and ordered by the data whn. That means that only data from Italy is considered.

**Modify the query to show confirmed for the day before.**

In [6]:
(covid.withColumn("day", dayofmonth(covid("whn")))
     .withColumn("mo", month(covid("whn")))
     .filter((covid("name")==="Italy") && 
             (month(covid("whn"))===3))
     .withColumn("lag_cfrm", lag(col("confirmed"), 1)
                 .over(Window.orderBy("day").partitionBy("name")))
     .select("name", "day", "confirmed", "mo", "lag_cfrm")
     .showHTML())

name,day,confirmed,mo,lag_cfrm
Italy,1,1694,3,
Italy,2,2036,3,1694.0
Italy,3,2502,3,2036.0
Italy,4,3089,3,2502.0
Italy,5,3858,3,3089.0
Italy,6,4636,3,3858.0
Italy,7,5883,3,4636.0
Italy,8,7375,3,5883.0
Italy,9,9172,3,7375.0
Italy,10,10149,3,9172.0


### LAG operation

Here is the correct query showing the cases for the day before:

```sql
SELECT name, DAY(whn), confirmed,
   LAG(confirmed, 1) OVER (partition by name ORDER BY whn) AS lag
 FROM covid
WHERE name = 'Italy'
AND MONTH(whn) = 3
ORDER BY whn
```

Notice how the values in the LAG column match the value of the row diagonally above and to the left.

name | DAY(whn) | confirmed | dbf
------|---|------|----------
Italy | 1 | **1694** | null
Italy | 2 | 2036 | **1694**
Italy | 3 | 2502 | 2036
Italy | 4 | 3089 | 2502
Italy | 5 | **3858** | 3089
Italy | 6 | 4636 | **3858**
Italy | 7 | 5883 | 4636
Italy | 8 | 7375 | 5883
Italy | 9 | 9172 | 7375
Italy | 10 | 10149 | 9172
... | | |

## 3. Number of new cases

The number of confirmed case is cumulative - but we can use LAG to recover the number of new cases reported for each day.

**Show the number of new cases for each day, for Italy, for March.**

In [7]:
(covid.withColumn("day", dayofmonth(covid("whn")))
     .filter((covid("name")==="Italy") && 
             (month(covid("whn"))===3))
     .withColumn("new", col("confirmed") - lag(col("confirmed"), 1).over(
         Window.orderBy("day").partitionBy("name")))
     .select("name", "day", "new")
     .showHTML())

name,day,new
Italy,1,
Italy,2,342.0
Italy,3,466.0
Italy,4,587.0
Italy,5,769.0
Italy,6,778.0
Italy,7,1247.0
Italy,8,1492.0
Italy,9,1797.0
Italy,10,977.0


## 4. Weekly changes

The data gathered are necessarily estimates and are inaccurate. However by taking a longer time span we can mitigate some of the effects.

You can filter the data to view only Monday's figures **WHERE WEEKDAY(whn) = 0**.

**Show the number of new cases in Italy for each week - show Monday only.**

In [8]:
(covid.filter((covid("name")==="Italy") && 
              (dayofweek(covid("whn"))===1))
     .withColumn("new", col("confirmed")-lag(col("confirmed"), 1)
                 .over(Window.orderBy("whn").partitionBy("name")))
     .select("name", "whn", "new")
     .showHTML())

name,whn,new
Italy,2020-01-26,
Italy,2020-02-02,2.0
Italy,2020-02-09,1.0
Italy,2020-02-16,0.0
Italy,2020-02-23,152.0
Italy,2020-03-01,1539.0
Italy,2020-03-08,5681.0
Italy,2020-03-15,17372.0
Italy,2020-03-22,34391.0
Italy,2020-03-29,38551.0


## 5. LAG using a JOIN

You can JOIN a table using DATE arithmetic. This will give different results if data is missing.

**Show the number of new cases in Italy for each week - show Monday only.**

In the sample query we JOIN this week tw with last week lw using the DATE_ADD function.

In [23]:
val a = covid.filter((covid("name")==="Italy") && 
                     (dayofweek(covid("whn"))===1))

(a.select("whn", "name", "confirmed")
     .join(a
           .withColumn("whn", to_date($"whn" + lit("7 days").cast(CalendarIntervalType)))
           .withColumnRenamed("confirmed", "confirmed2")
           .select("whn", "name", "confirmed2"), 
           Seq("whn", "name"), joinType="left")
     .withColumn("new", col("confirmed")-col("confirmed2"))
     .select("name", "whn", "new")
     .showHTML())

name,whn,new
Italy,2020-01-26,
Italy,2020-02-02,2.0
Italy,2020-02-09,1.0
Italy,2020-02-16,0.0
Italy,2020-02-23,152.0
Italy,2020-03-01,1539.0
Italy,2020-03-08,5681.0
Italy,2020-03-15,17372.0
Italy,2020-03-22,34391.0
Italy,2020-03-29,38551.0


[36ma[39m: [32mDataset[39m[[32mRow[39m] = [name: string, whn: string ... 3 more fields]

## 6. RANK()

The query shown shows the number of confirmed cases together with the world ranking for cases.

United States has the highest number, Spain is number 2...

Notice that while Spain has the second highest confirmed cases, Italy has the second highest number of deaths due to the virus.

**Include the ranking for the number of deaths in the table. Only include countries with a population of at least 10 million.**

In [24]:
(covid.join(world.select("name", "population"), Seq("name"))
    .filter((col("whn")==="2020-04-20") && 
            (col("population")>=1e7))
    .withColumn("rc1", rank().over(Window.orderBy(col("confirmed").desc)
                                .partitionBy("whn")))
    .withColumn("rc2", rank().over(Window.orderBy(col("deaths").desc)
                                  .partitionBy("whn")))
    .select("name", "confirmed", "rc1", "deaths", "rc2")
    .orderBy(col("confirmed").desc)
    .showHTML())

name,confirmed,rc1,deaths,rc2
United States,784326,1,42094,1
Spain,200210,2,20852,3
Italy,181228,3,24114,2
France,156480,4,20292,4
Germany,147065,5,4862,8
United Kingdom,125856,6,16550,5
Turkey,90980,7,2140,12
China,83817,8,4636,9
Iran,83505,9,5209,7
Russia,47121,10,405,23


## 7. Infection rate

The query shown includes a JOIN t the world table so we can access the total population of each country and calculate infection rates (in cases per 100,000).

**Show the infect rate ranking for each country. Only include countries with a population of at least 10 million.**

In [9]:
# a was obtained in #6
(a.withColumn("r_inf", round(1e5*a("confirmed")/a("population")))
     .withColumn("rc", rank().over(Window.orderBy(col("r_inf"))
                                  .partitionBy("whn")))
     .orderBy(col("population").desc())
     .select("name", "r_inf", "rc")
     .showHTML())

Unnamed: 0,name,r_inf,rc
0,China,6.0,50
1,India,1.0,23
2,United States,238.0,87
3,Indonesia,3.0,35
4,Pakistan,4.0,41
...,...,...,...
85,Jordan,4.0,41
86,Dominican Republic,48.0,73
87,Sweden,143.0,81
88,Portugal,203.0,85


## 8. Turning the corner

For each country that has had at last 1000 new cases in a single day, show the date of the peak number of new cases.

In [25]:
(covid.withColumn("new", (covid("confirmed")-lag(col("confirmed"), 1).over(
    Window.partitionBy("name").orderBy("whn"))))
     .na.fill(0, Array("new"))
     .withColumn("rc", rank().over(Window.partitionBy("name").orderBy(col("new").desc)))
     .filter((col("rc")===1) && (col("new")>1000))
     .select("name", "whn", "new")
     .orderBy("whn", "new")
     .showHTML())

name,whn,new
China,2020-02-13,15136
Italy,2020-03-21,6557
Switzerland,2020-03-23,1321
Israel,2020-03-25,1131
Spain,2020-03-25,9630
Austria,2020-03-26,1321
Germany,2020-03-27,6933
Iran,2020-03-30,3186
Canada,2020-04-05,2778
Netherlands,2020-04-10,1346


In [26]:
spark.stop()