## Using rowsBetween and rangeBetween

We can get cumulative aggregations using `rowsBetween` or `rangeBetween`.

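* We can use `rowsBetween` to define the frame as a set of rows relative to the current row, for example the previous 3 rows and the current row.
* We can use `rangeBetween` to define the frame as a range of values of the `orderBy` column relative to the current row's value.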

Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.spark.com/plans) to learn Spark SQL using our unique integrated LMS.

In [1]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

If you are going to use CLIs, you can launch Spark using one of the following 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '2')

In [3]:
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

In [4]:
airtraffic = spark. \
  read. \
  parquet(airtraffic_path)

In [5]:
from pyspark.sql.window import Window

* Let us get the cumulative departure delay at each airport for each day, using scheduled departure time as the sorting criterion.

In [6]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [7]:
from pyspark.sql.functions import lpad, concat, col, sum

In [8]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        264|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        278|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        312|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        487|
|  20080101|   ABI|  
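
To make the frame concrete, here is a minimal plain-Python sketch of what `rowsBetween(Window.unboundedPreceding, Window.currentRow)` computes for a single partition. It only emulates the semantics and is not how Spark evaluates the window. The `dep_delays` list is an illustrative name holding the `DepDelay` values of the `ABE` rows shown above.

```python
from itertools import accumulate

# DepDelay values for Origin = ABE on 20080101, already ordered by
# CRSDepTime (copied from the output above).
dep_delays = [1, 70, 137, 22, 34, 14, 34, 175]

# Unbounded preceding to current row: each row sees every row from the
# start of its partition up to and including itself, i.e. a running sum.
running_sum = list(accumulate(dep_delays))

print(running_sum)  # [1, 71, 208, 230, 264, 278, 312, 487]
```

The printed values match the `DepDelaySum` column for `ABE`. The sum starts again from the first row of the next partition (`ABI`).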

* We can also compute aggregations over a moving window. In this case, each row's sum covers the previous 3 records and the current record within its partition.

In [16]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(-3, Window.currentRow)

In [17]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        263|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        207|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        104|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        257|
|  20080101|   ABI|  
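
The moving frame can be emulated in plain Python as a sketch of the semantics only. `rows_frame_sum` is a hypothetical helper written for this illustration and is not part of PySpark.

```python
# DepDelay values for Origin = ABE on 20080101, ordered by CRSDepTime
# (copied from the output above).
dep_delays = [1, 70, 137, 22, 34, 14, 34, 175]


def rows_frame_sum(values, preceding):
    """Sum each value together with up to `preceding` rows before it."""
    result = []
    for i in range(len(values)):
        # The frame is clipped at the start of the partition, so the
        # first rows see fewer than `preceding` earlier rows.
        start = max(0, i - preceding)
        result.append(sum(values[start:i + 1]))
    return result


moving_sum = rows_frame_sum(dep_delays, 3)
print(moving_sum)  # [1, 71, 208, 230, 263, 207, 104, 257]
```

The first four values equal the running sum because fewer than 3 preceding rows exist. From the fifth row onwards, the oldest row drops out of the frame.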

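* We can also use `rangeBetween` to get the cumulative delay at each airport using scheduled departure time as the sorting criterion. With `Window.unboundedPreceding` and `Window.currentRow`, the result is the same as with `rowsBetween` as long as the `CRSDepTime` values within a partition are distinct, as they are for the rows shown here. When several rows share the same `CRSDepTime`, `rangeBetween` treats them as peers and gives all of them the same cumulative value.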

In [11]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rangeBetween(Window.unboundedPreceding, Window.currentRow)

In [12]:
from pyspark.sql.functions import lpad, concat, col, sum

In [13]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        264|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        278|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        312|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        487|
|  20080101|   ABI|  
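
The two frame types give identical results above because every `CRSDepTime` in the rows shown is distinct. The following plain-Python sketch illustrates how they differ once two rows share an ordering value. The `flights` data is made up for this illustration and does not come from the airtraffic data set.

```python
# Hypothetical (CRSDepTime, DepDelay) pairs, ordered by CRSDepTime.
# Two flights share the scheduled departure time 1215.
flights = [(720, 1), (1215, 70), (1215, 30), (1230, 137)]

# ROWS frame (unbounded preceding to current row): everything up to and
# including the current physical row.
rows_sum = [sum(d for _, d in flights[:i + 1]) for i in range(len(flights))]

# RANGE frame (unbounded preceding to current row): everything whose
# ordering value is <= the current row's value, so rows with an equal
# CRSDepTime (peers) all receive the same result.
range_sum = [sum(d for t, d in flights if t <= cur_t) for cur_t, _ in flights]

print(rows_sum)   # [1, 71, 101, 238]
print(range_sum)  # [1, 101, 101, 238]
```

With a rows-based frame, the order in which tied rows are processed is not guaranteed in Spark, so their individual running sums can vary between runs. The range-based frame avoids this by including all peers.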

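* `rangeBetween` considers values rather than rows. With the spec below, the frame for each row contains only the rows whose `CRSDepTime` lies between the current row's `CRSDepTime` minus 100 and the current row's `CRSDepTime`, inclusive. As `CRSDepTime` appears to be stored as a number in `hhmm` form, an offset of 100 corresponds to the preceding hour.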

In [14]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rangeBetween(-100, Window.currentRow)

In [15]:
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show(50)

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         70|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        207|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|         22|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|         34|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|         14|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|         48|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        223|
|  20080101|   ABI|  
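
As a sketch of the value-based frame, the following plain Python reproduces the `DepDelaySum` column for the `ABE` rows shown above. `range_frame_sum` is a hypothetical helper written for this illustration. It emulates the semantics of `rangeBetween(-100, Window.currentRow)` for a single partition and is not Spark's implementation.

```python
# (CRSDepTime, DepDelay) pairs for Origin = ABE on 20080101
# (copied from the output above).
flights = [(720, 1), (1215, 70), (1230, 137), (1410, 22),
           (1615, 34), (1720, 14), (1740, 34), (1741, 175)]


def range_frame_sum(rows, preceding):
    """For each row, sum the delays of the rows whose time lies in
    [current time - preceding, current time]."""
    result = []
    for cur_time, _ in rows:
        lower = cur_time - preceding
        result.append(sum(d for t, d in rows if lower <= t <= cur_time))
    return result


range_sum = range_frame_sum(flights, 100)
print(range_sum)  # [1, 70, 207, 22, 34, 14, 48, 223]
```

For example, the flight scheduled at 1741 picks up the flights at 1720 and 1740 because both fall within 1641 to 1741, giving 14 + 34 + 175 = 223. The flight at 1615 has no other flight in 1515 to 1615, so its sum is just its own delay.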