# 1. Window functions in Spark

A window function performs a calculation across a set of rows (a.k.a. a frame). The built-in
window functions provided by Spark SQL fall into two categories, and ordinary aggregation functions can also be applied over a window:
- Ranking functions
- Analytic functions


## 1.1 Window specification
To use window functions, we need to create a **window specification**. A window specification defines which rows
are included in the frame associated with a given input row. In other words, the window specification defines
the default frame of a window. Window specifications can be classified into three categories:

1. PartitionBy specification:
   - Created with Window.partitionBy on one or more columns.
   - All rows that have the same value in the partitionBy column(s) are in the same frame.
   - Aggregation functions can be applied to each frame.
   - Window functions that depend on row order (ranking functions, lag, lead, etc.) cannot be applied.


2. Ordered specification:
   - Created by using a partitionBy specification, followed by an orderBy specification.
   - The frame is not static; it moves as we iterate over the rows. By default, the frame contains
     all previous rows and the current row (rows that tie with the current row on the orderBy column are included as well).
   - Window functions can be applied to each moving frame (i.e. currentRow + allPreviousRows).
   - Aggregation functions can be applied to each moving frame. As each row has a different
     frame, the result of the aggregation can differ from row to row. With a partitionBy-only
     specification, by contrast, all rows in the same partition get the same result.

3. Custom range frame specification (see exp4):
   - Created by using a partitionBy specification,
   - usually followed by an orderBy specification,
   - then followed by "rangeBetween" or "rowsBetween".
   - Each row has a corresponding frame which is controlled by rangeBetween or rowsBetween. For example,
     rowsBetween(-3, Window.currentRow) defines a frame that includes the current input row and the three
     rows appearing before it.
   - Aggregation functions can be applied to each frame.


In Spark SQL, the partition specification is defined with the keyword "partitionBy", and the ordering specification is defined with the
keyword "orderBy".
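To make the difference between the first two specification types concrete, here is a small plain-Python sketch (not Spark code) that mimics `sum("price")` over a partition-only frame and over an ordered frame. It assumes that the default frame of an ordered specification is a range frame from unboundedPreceding to currentRow, so rows that tie with the current row on the ordering column share its frame. The helper names `partition_sum` and `running_sum_with_peers` are made up for this illustration.

```python
# Alex's prices from the sample data, already sorted by price
prices = [5, 5, 10, 20, 20, 30, 40, 80]


def partition_sum(values):
    """Partition-only frame: every row sees the whole partition."""
    total = sum(values)
    return [total for _ in values]


def running_sum_with_peers(sorted_values):
    """Ordered frame (assumed default): all rows whose ordering value
    is <= the current row's value, so ties share the same frame."""
    return [sum(v for v in sorted_values if v <= current) for current in sorted_values]


print(partition_sum(prices))           # [210, 210, 210, 210, 210, 210, 210, 210]
print(running_sum_with_peers(prices))  # [10, 10, 20, 60, 60, 90, 130, 210]
```

Run the sketch in a fresh Python session: inside this notebook, `sum`, `min`, `max` and `round` are shadowed by the imports from `pyspark.sql.functions`.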


## 1.2 Window functions can be divided into the following categories

1. Ranking functions:
   - rank: returns the rank of rows within a window partition. Rows that tie share a rank, and the ranks that follow skip ahead, leaving gaps.
   - dense_rank: returns the rank of rows within a window partition, without any gaps. For example, if you were ranking a competition using dense_rank and had three people tie for second place, all three would be in second place and the next person would come in third. With rank, the person who came in after the ties would register as coming in fifth.
   - percent_rank: returns the relative rank (i.e. percentile) of rows within a window partition, computed as (rank - 1) / (number of rows in the partition - 1).
   - ntile(n: Int): returns the ntile group id (from 1 to n inclusive) in an ordered window partition. For example, if n is 4, the first quarter of the rows will get value 1, the second quarter will get 2, the third quarter will get 3, and the last quarter will get 4. It also works if there are fewer than n rows.
   - row_number: returns a sequential number starting at 1 within a window partition.

2. Analytic functions:
   - cume_dist: returns the cumulative distribution of values within a window partition, i.e. the fraction of rows that are at or below the current row. With N = total number of rows in the partition, cumeDist(x) = number of values before (and including) x / N. Similar to percent_rank().
   - first()
   - last()
   - lag(e: Column, offset: Int, defaultValue: Object): returns the value that is offset rows before the current row, and null if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition. The defaultValue is optional; when given, it is returned instead of null.
   - lead(e: Column, offset: Int): returns the value that is offset rows after the current row, and null if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
   - currentRow(): returns the special frame boundary that represents the current row in
     the window partition.
3. Aggregation functions: all the aggregation functions that we showed in S03_GroupByAndAggregation can be used here.
   - sum(e: Column): returns the sum of the selected column for each partition.
   - first(e: Column): returns the first value within each partition.
   - last(e: Column): returns the last value within each partition.
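The difference between row_number, rank and dense_rank is easiest to see on a list with ties. The following is a plain-Python sketch of their semantics (it does not call Spark, and the function bodies are an illustration rather than Spark's implementation), applied to Alex's prices in ascending order.

```python
def row_number(sorted_values):
    """Sequential position starting at 1; tied rows still get different numbers."""
    return list(range(1, len(sorted_values) + 1))


def rank(sorted_values):
    """1 + number of rows strictly before the current value; ties share a rank and leave gaps."""
    return [1 + sum(1 for v in sorted_values if v < current) for current in sorted_values]


def dense_rank(sorted_values):
    """Position of the value among the distinct values; ties share a rank, no gaps."""
    distinct = sorted(set(sorted_values))
    return [distinct.index(current) + 1 for current in sorted_values]


prices = [5, 5, 10, 20, 20, 30, 40, 80]  # Alex's prices, ascending
print(row_number(prices))  # [1, 2, 3, 4, 5, 6, 7, 8]
print(rank(prices))        # [1, 1, 3, 4, 4, 6, 7, 8]
print(dense_rank(prices))  # [1, 1, 2, 3, 3, 4, 5, 6]
```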

In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import row_number, rank, dense_rank, percent_rank, ntile, cume_dist, lag, lead, col, avg, \
    min, max, sum, round, count, datediff, unix_timestamp, stddev, collect_list, element_at, size, sort_array, \
    broadcast, spark_partition_id, lit, coalesce

from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder.master("local[2]").appName("Windows functions").getOrCreate()
data = [('Alex', '2018-10-10', 'Paint', 80),
        ('Alex', '2018-04-02', 'Ladder', 20),
        ('Alex', '2018-06-22', 'Stool', 20),
        ('Alex', '2018-12-09', 'Vacuum', 40),
        ('Alex', '2018-07-12', 'Bucket', 5),
        ('Alex', '2018-02-18', 'Gloves', 5),
        ('Alex', '2018-03-03', 'Brushes', 30),
        ('Alex', '2018-09-26', 'Sandpaper', 10),
        ('Bob', '2018-12-09', 'Vacuum', 40),
        ('Bob', '2018-07-12', 'Bucket', 5),
        ('Bob', '2018-02-18', 'Gloves', 5),
        ('Bob', '2018-03-03', 'Brushes', 30),
        ('Bob', '2018-09-26', 'Sandpaper', 10)]

df = spark.createDataFrame(data, schema=['name', 'date', 'product', 'price'])
print("source data frame: ")
df.printSchema()
df.show(truncate=False)


source data frame: 
root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)



+----+----------+---------+-----+
|name|date      |product  |price|
+----+----------+---------+-----+
|Alex|2018-10-10|Paint    |80   |
|Alex|2018-04-02|Ladder   |20   |
|Alex|2018-06-22|Stool    |20   |
|Alex|2018-12-09|Vacuum   |40   |
|Alex|2018-07-12|Bucket   |5    |
|Alex|2018-02-18|Gloves   |5    |
|Alex|2018-03-03|Brushes  |30   |
|Alex|2018-09-26|Sandpaper|10   |
|Bob |2018-12-09|Vacuum   |40   |
|Bob |2018-07-12|Bucket   |5    |
|Bob |2018-02-18|Gloves   |5    |
|Bob |2018-03-03|Brushes  |30   |
|Bob |2018-09-26|Sandpaper|10   |
+----+----------+---------+-----+



                                                                                

In [5]:
# Window specifications used throughout the notebook

# First, create a window specification partitioned by the "name" column
win_name = Window.partitionBy("name")

# The second specification takes the first one and orders each partition by the "price" column
win_name_ordered = win_name.orderBy("price")

# Same partition by name, but this time each partition is ordered by the "date" column
win_name_ordered_by_date = win_name.orderBy("date")

## 1.3 Ranking function example
In this example, we show how to use a window specification to create a window. Then we apply the following ranking functions to an ordered frame:
- row_number
- rank
- dense_rank
- percent_rank
- ntile

Note that all of the above window functions require the frame to be ordered. You can try
replacing win_name_ordered with win_name and see what happens.

In [3]:
# First, create a window specification partitioned by the "name" column
win_name = Window.partitionBy("name")
# The second specification takes the first one and orders it by the "price" column
win_name_ordered = win_name.orderBy("price")
# The final specification contains two partitions, "Alex" and "Bob"; each partition is ordered by price in ascending order.

# Create a new column by calling the row_number() ranking function.
# Two things to note:
# 1. To invoke a window function, we use "function_name().over(specification)"
# 2. The row number restarts from 1 for Bob, because his rows are in a new partition
df1 = df.withColumn("row_number", row_number().over(win_name_ordered))
   
df1.printSchema()
df1.show()

df1.withColumn("partition_id", spark_partition_id()).show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- row_number: integer (nullable = true)



                                                                                

+----+----------+---------+-----+----------+
|name|      date|  product|price|row_number|
+----+----------+---------+-----+----------+
|Alex|2018-07-12|   Bucket|    5|         1|
|Alex|2018-02-18|   Gloves|    5|         2|
|Alex|2018-09-26|Sandpaper|   10|         3|
|Alex|2018-04-02|   Ladder|   20|         4|
|Alex|2018-06-22|    Stool|   20|         5|
|Alex|2018-03-03|  Brushes|   30|         6|
|Alex|2018-12-09|   Vacuum|   40|         7|
|Alex|2018-10-10|    Paint|   80|         8|
| Bob|2018-07-12|   Bucket|    5|         1|
| Bob|2018-02-18|   Gloves|    5|         2|
| Bob|2018-09-26|Sandpaper|   10|         3|
| Bob|2018-03-03|  Brushes|   30|         4|
| Bob|2018-12-09|   Vacuum|   40|         5|
+----+----------+---------+-----+----------+

+----+----------+---------+-----+----------+------------+
|name|date      |product  |price|row_number|partition_id|
+----+----------+---------+-----+----------+------------+
|Alex|2018-07-12|Bucket   |5    |1         |74          |
|A

In [4]:
# Create a new column with the rank ranking function.
# Note that within each partition there is no rank 2: two items tie at rank 1, so the third item gets
# rank 3. If you want compact rank numbers, use dense_rank.
df2 = df.withColumn("rank", rank().over(win_name_ordered))
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- rank: integer (nullable = true)

+----+----------+---------+-----+----+
|name|      date|  product|price|rank|
+----+----------+---------+-----+----+
|Alex|2018-07-12|   Bucket|    5|   1|
|Alex|2018-02-18|   Gloves|    5|   1|
|Alex|2018-09-26|Sandpaper|   10|   3|
|Alex|2018-04-02|   Ladder|   20|   4|
|Alex|2018-06-22|    Stool|   20|   4|
|Alex|2018-03-03|  Brushes|   30|   6|
|Alex|2018-12-09|   Vacuum|   40|   7|
|Alex|2018-10-10|    Paint|   80|   8|
| Bob|2018-07-12|   Bucket|    5|   1|
| Bob|2018-02-18|   Gloves|    5|   1|
| Bob|2018-09-26|Sandpaper|   10|   3|
| Bob|2018-03-03|  Brushes|   30|   4|
| Bob|2018-12-09|   Vacuum|   40|   5|
+----+----------+---------+-----+----+



In [5]:
# Create a column with dense_rank.
# Note that for the Alex partition, even though two items tie at rank 1, the third item gets
# rank 2, not 3.
df3 = df.withColumn("dense_rank", dense_rank().over(win_name_ordered))
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- dense_rank: integer (nullable = true)

+----+----------+---------+-----+----------+
|name|      date|  product|price|dense_rank|
+----+----------+---------+-----+----------+
|Alex|2018-07-12|   Bucket|    5|         1|
|Alex|2018-02-18|   Gloves|    5|         1|
|Alex|2018-09-26|Sandpaper|   10|         2|
|Alex|2018-04-02|   Ladder|   20|         3|
|Alex|2018-06-22|    Stool|   20|         3|
|Alex|2018-03-03|  Brushes|   30|         4|
|Alex|2018-12-09|   Vacuum|   40|         5|
|Alex|2018-10-10|    Paint|   80|         6|
| Bob|2018-07-12|   Bucket|    5|         1|
| Bob|2018-02-18|   Gloves|    5|         1|
| Bob|2018-09-26|Sandpaper|   10|         2|
| Bob|2018-03-03|  Brushes|   30|         3|
| Bob|2018-12-09|   Vacuum|   40|         4|
+----+----------+---------+-----+----------+



In [6]:
# Create a column with percent_rank; the value is calculated as (rank - 1) / (total_item_number - 1)
df4 = df.withColumn("percent_rank", percent_rank().over(win_name_ordered))
df4.printSchema()
df4.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- percent_rank: double (nullable = true)

+----+----------+---------+-----+-------------------+
|name|      date|  product|price|       percent_rank|
+----+----------+---------+-----+-------------------+
|Alex|2018-07-12|   Bucket|    5|                0.0|
|Alex|2018-02-18|   Gloves|    5|                0.0|
|Alex|2018-09-26|Sandpaper|   10| 0.2857142857142857|
|Alex|2018-04-02|   Ladder|   20|0.42857142857142855|
|Alex|2018-06-22|    Stool|   20|0.42857142857142855|
|Alex|2018-03-03|  Brushes|   30| 0.7142857142857143|
|Alex|2018-12-09|   Vacuum|   40| 0.8571428571428571|
|Alex|2018-10-10|    Paint|   80|                1.0|
| Bob|2018-07-12|   Bucket|    5|                0.0|
| Bob|2018-02-18|   Gloves|    5|                0.0|
| Bob|2018-09-26|Sandpaper|   10|                0.5|
| Bob|2018-03-03|  Brushes|   30|               

In [7]:
# Create a column with ntile.
# Here we set n=3, which means we divide each window into 3 parts. The rows in the 1st part get 1 as ntile_rank,
# the rows in the 2nd part get 2, etc.
df4 = df.withColumn("ntile_rank", ntile(3).over(win_name_ordered))
df4.printSchema()
df4.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- ntile_rank: integer (nullable = true)

+----+----------+---------+-----+----------+
|name|      date|  product|price|ntile_rank|
+----+----------+---------+-----+----------+
|Alex|2018-07-12|   Bucket|    5|         1|
|Alex|2018-02-18|   Gloves|    5|         1|
|Alex|2018-09-26|Sandpaper|   10|         1|
|Alex|2018-04-02|   Ladder|   20|         2|
|Alex|2018-06-22|    Stool|   20|         2|
|Alex|2018-03-03|  Brushes|   30|         2|
|Alex|2018-12-09|   Vacuum|   40|         3|
|Alex|2018-10-10|    Paint|   80|         3|
| Bob|2018-07-12|   Bucket|    5|         1|
| Bob|2018-02-18|   Gloves|    5|         1|
| Bob|2018-09-26|Sandpaper|   10|         2|
| Bob|2018-03-03|  Brushes|   30|         2|
| Bob|2018-12-09|   Vacuum|   40|         3|
+----+----------+---------+-----+----------+
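
The bucket sizes in the output above (3, 3, 2 for Alex's eight rows and 2, 2, 1 for Bob's five rows) follow from how the rows are split into n groups. Below is a plain-Python sketch of that split, assuming the usual SQL rule that when the row count is not divisible by n, the earlier buckets receive one extra row each. The helper name `ntile_ids` is made up for this illustration.

```python
def ntile_ids(row_count, n):
    """Bucket id (1..n) for each row position of an ordered partition."""
    base, extra = divmod(row_count, n)
    ids = []
    for bucket_id in range(1, n + 1):
        # the first `extra` buckets get one more row than the others
        size = base + (1 if bucket_id <= extra else 0)
        ids.extend([bucket_id] * size)
    return ids


print(ntile_ids(8, 3))  # [1, 1, 1, 2, 2, 2, 3, 3]
print(ntile_ids(5, 3))  # [1, 1, 2, 2, 3]
print(ntile_ids(2, 4))  # [1, 2]
```

The last call shows the case with fewer rows than buckets: each row gets its own bucket and the remaining bucket ids stay unused.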



## 1.4 Analytic function examples
In this example, we show how to use the following analytic functions on an ordered frame:
- cume_dist
- lag
- lead

Note that all of the above window functions require the **frame to be ordered**.
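cume_dist is closely related to percent_rank from the previous section, and both reduce to a short formula. The sketch below is plain Python (not Spark) and assumes the textbook definitions: percent_rank = (rank - 1) / (N - 1) and cume_dist = (number of rows with a value <= the current value) / N, where N is the number of rows in the partition.

```python
def percent_rank(sorted_values):
    """(rank - 1) / (N - 1); rank - 1 is the number of strictly smaller values."""
    n = len(sorted_values)
    if n == 1:
        return [0.0]
    return [sum(1 for v in sorted_values if v < cur) / (n - 1) for cur in sorted_values]


def cume_dist(sorted_values):
    """Fraction of rows whose value is <= the current value."""
    n = len(sorted_values)
    return [sum(1 for v in sorted_values if v <= cur) / n for cur in sorted_values]


prices = [5, 5, 10, 20, 20, 30, 40, 80]  # Alex's prices, ascending
print(percent_rank(prices))
print(cume_dist(prices))  # [0.25, 0.25, 0.375, 0.625, 0.625, 0.75, 0.875, 1.0]
```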

In [8]:
# create a new column that shows the cumulative_distribution 
df_cume_dist = df.withColumn("cumulative_distribution", cume_dist().over(win_name_ordered))
df_cume_dist.printSchema()
df_cume_dist.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- cumulative_distribution: double (nullable = true)

+----+----------+---------+-----+-----------------------+
|name|      date|  product|price|cumulative_distribution|
+----+----------+---------+-----+-----------------------+
|Alex|2018-07-12|   Bucket|    5|                   0.25|
|Alex|2018-02-18|   Gloves|    5|                   0.25|
|Alex|2018-09-26|Sandpaper|   10|                  0.375|
|Alex|2018-04-02|   Ladder|   20|                  0.625|
|Alex|2018-06-22|    Stool|   20|                  0.625|
|Alex|2018-03-03|  Brushes|   30|                   0.75|
|Alex|2018-12-09|   Vacuum|   40|                  0.875|
|Alex|2018-10-10|    Paint|   80|                    1.0|
| Bob|2018-07-12|   Bucket|    5|                    0.4|
| Bob|2018-02-18|   Gloves|    5|                    0.4|
| Bob|2018-09-26|Sandpaper|   10|      

In [9]:
# Create a new column that shows the lagged value of the price column.
# The lag function takes a column name (e.g. price) and an offset (here 3).
# With an offset of 2, the first two rows of each partition get null, and the third row gets the first row's
# price. With an offset of 3, the first three rows get null, and the fourth row gets the first row's
# price.
df_lag = df.withColumn("lag", lag("price", 3).over(win_name_ordered))
df_lag.printSchema()
df_lag.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- lag: long (nullable = true)

+----+----------+---------+-----+----+
|name|      date|  product|price| lag|
+----+----------+---------+-----+----+
|Alex|2018-07-12|   Bucket|    5|null|
|Alex|2018-02-18|   Gloves|    5|null|
|Alex|2018-09-26|Sandpaper|   10|null|
|Alex|2018-04-02|   Ladder|   20|   5|
|Alex|2018-06-22|    Stool|   20|   5|
|Alex|2018-03-03|  Brushes|   30|  10|
|Alex|2018-12-09|   Vacuum|   40|  20|
|Alex|2018-10-10|    Paint|   80|  20|
| Bob|2018-07-12|   Bucket|    5|null|
| Bob|2018-02-18|   Gloves|    5|null|
| Bob|2018-09-26|Sandpaper|   10|null|
| Bob|2018-03-03|  Brushes|   30|   5|
| Bob|2018-12-09|   Vacuum|   40|   5|
+----+----------+---------+-----+----+



In [10]:
# Create a new column that shifts the price column up by 3 rows.
# With an offset of 2, the last two rows of each partition get null, and the third-to-last row gets the
# last row's price. With an offset of 3, the last three rows get null, and the fourth-to-last
# row gets the last row's price.
df_lead = df.withColumn("lead", lead("price", 3).over(win_name_ordered))
df_lead.printSchema()
df_lead.show()

root
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product: string (nullable = true)
 |-- price: long (nullable = true)
 |-- lead: long (nullable = true)

+----+----------+---------+-----+----+
|name|      date|  product|price|lead|
+----+----------+---------+-----+----+
|Alex|2018-07-12|   Bucket|    5|  20|
|Alex|2018-02-18|   Gloves|    5|  20|
|Alex|2018-09-26|Sandpaper|   10|  30|
|Alex|2018-04-02|   Ladder|   20|  40|
|Alex|2018-06-22|    Stool|   20|  80|
|Alex|2018-03-03|  Brushes|   30|null|
|Alex|2018-12-09|   Vacuum|   40|null|
|Alex|2018-10-10|    Paint|   80|null|
| Bob|2018-07-12|   Bucket|    5|  30|
| Bob|2018-02-18|   Gloves|    5|  40|
| Bob|2018-09-26|Sandpaper|   10|null|
| Bob|2018-03-03|  Brushes|   30|null|
| Bob|2018-12-09|   Vacuum|   40|null|
+----+----------+---------+-----+----+
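
The lag and lead columns above are simply the price column shifted down or up inside each partition. A plain-Python sketch of that shift (not Spark code, and assuming a non-negative offset) reproduces the values for the Alex partition:

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when there is none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` when there is none."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


prices = [5, 5, 10, 20, 20, 30, 40, 80]  # Alex's prices, ascending
print(lag(prices, 3))   # [None, None, None, 5, 5, 10, 20, 20]
print(lead(prices, 3))  # [20, 20, 30, 40, 80, None, None, None]
```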



### Quiz 1.
Could you show the days from the previous purchase? Or the days before next purchase?
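
Before turning to Spark, the expected numbers for Alex can be worked out with the standard library alone. This is only a sanity-check sketch: it sorts Alex's purchase dates and takes the difference between neighbouring dates, which is what a lag or lead with offset 1 combined with datediff is expected to produce.

```python
from datetime import date

# Alex's purchase dates from the sample data
alex_dates = ['2018-10-10', '2018-04-02', '2018-06-22', '2018-12-09',
              '2018-07-12', '2018-02-18', '2018-03-03', '2018-09-26']

purchases = sorted(date.fromisoformat(d) for d in alex_dates)

# Gap in days between each purchase and the one before it
gaps = [(cur - prev).days for prev, cur in zip(purchases, purchases[1:])]

days_from_previous = [None] + gaps  # first purchase has no predecessor
days_before_next = gaps + [None]    # last purchase has no successor

print(days_from_previous)  # [None, 13, 30, 81, 20, 76, 14, 60]
print(days_before_next)    # [13, 30, 81, 20, 76, 14, 60, None]
```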

In [14]:
# Here we apply lag to the date column with offset 1, so each row gets the date of the previous row. Then we
# apply the datediff function to the current row's date and this value to get the days since the previous
# purchase.
# The same logic with lead gives the days before the next purchase. If we set the offset to 2, we get
# the days before the purchase after next.

# Step 1: create a new column called lag_date, which is the date of the previous purchase
df_lag_day = df.withColumn("lag_date",lag('date',1).over(win_name.orderBy(col('date'))))
df_lag_day.show()

# Step 2: create the column "days_from_previous_purchase" by applying datediff to "date" and "lag_date"
df_diff=df_lag_day.withColumn("days_from_previous_purchase",datediff("date","lag_date"))
df_diff.show()

# If you use lead
# Step 1: create a new column called lead_date, which is the date of the next purchase
df_lead_day = df.withColumn("lead_date",lead('date',1).over(win_name.orderBy(col('date'))))
df_lead_day.show()

# Step 2: create the column "days_before_next_purchase" by applying datediff to "lead_date" and "date"
df_diff=df_lead_day.withColumn("days_before_next_purchase",datediff("lead_date","date"))
df_diff.show()

+----+----------+---------+-----+----------+
|name|      date|  product|price|  lag_date|
+----+----------+---------+-----+----------+
|Alex|2018-02-18|   Gloves|    5|      null|
|Alex|2018-03-03|  Brushes|   30|2018-02-18|
|Alex|2018-04-02|   Ladder|   20|2018-03-03|
|Alex|2018-06-22|    Stool|   20|2018-04-02|
|Alex|2018-07-12|   Bucket|    5|2018-06-22|
|Alex|2018-09-26|Sandpaper|   10|2018-07-12|
|Alex|2018-10-10|    Paint|   80|2018-09-26|
|Alex|2018-12-09|   Vacuum|   40|2018-10-10|
| Bob|2018-02-18|   Gloves|    5|      null|
| Bob|2018-03-03|  Brushes|   30|2018-02-18|
| Bob|2018-07-12|   Bucket|    5|2018-03-03|
| Bob|2018-09-26|Sandpaper|   10|2018-07-12|
| Bob|2018-12-09|   Vacuum|   40|2018-09-26|
+----+----------+---------+-----+----------+

+----+----------+---------+-----+----------+---------------------------+
|name|      date|  product|price|  lag_date|days_from_previous_purchase|
+----+----------+---------+-----+----------+---------------------------+
|Alex|2018-02-1

In [15]:
# You can do it in one line by putting lag function as parameter in datediff
df_final=df.withColumn('days_from_last_purchase', datediff('date', lag('date', 1).over(win_name.orderBy(col('date'))))) \
        .withColumn('days_before_next_purchase', datediff(lead('date', 1).over(win_name.orderBy(col('date'))), 'date'))
df_final.show()

+----+----------+---------+-----+-----------------------+-------------------------+
|name|      date|  product|price|days_from_last_purchase|days_before_next_purchase|
+----+----------+---------+-----+-----------------------+-------------------------+
|Alex|2018-02-18|   Gloves|    5|                   null|                       13|
|Alex|2018-03-03|  Brushes|   30|                     13|                       30|
|Alex|2018-04-02|   Ladder|   20|                     30|                       81|
|Alex|2018-06-22|    Stool|   20|                     81|                       20|
|Alex|2018-07-12|   Bucket|    5|                     20|                       76|
|Alex|2018-09-26|Sandpaper|   10|                     76|                       14|
|Alex|2018-10-10|    Paint|   80|                     14|                       60|
|Alex|2018-12-09|   Vacuum|   40|                     60|                     null|
| Bob|2018-02-18|   Gloves|    5|                   null|                   

## 1.5 Aggregation function example 

This example shows aggregation functions on an ordered frame and on a basic partitionBy frame:
- avg/mean
- min
- max
- sum

In df1, we use a partition-only window specification, so the result is the same for all rows that are in the same partition.

In df2, we use an ordered window specification, so the result can differ from row to row.

In [6]:


# We apply the aggregation functions over the window specification that only has a partition, so the
# result is the same for all rows that are in the same partition
df1 = df.withColumn("avg", avg(col("price")).over(win_name)) \
        .withColumn("sum", sum(col("price")).over(win_name)) \
        .withColumn("min", min(col("price")).over(win_name)) \
        .withColumn("max", max(col("price")).over(win_name)) \
        .withColumn("item_number", count("*").over(win_name)) \
        .withColumn("item_list", collect_list(col("product")).over(win_name))
   
df1.show(truncate=False)

# If we apply aggregation functions over an ordered window specification, we get a cumulative result for each row
df2 = df.withColumn('avg_to_date', round(avg('price').over(win_name_ordered_by_date), 2)) \
        .withColumn('sum_to_date', sum('price').over(win_name_ordered_by_date)) \
        .withColumn('max_to_date', max('price').over(win_name_ordered_by_date)) \
        .withColumn('min_to_date', min('price').over(win_name_ordered_by_date)) \
        .withColumn('item_number_to_date', count('*').over(win_name_ordered_by_date)) \
        .withColumn("item_list_to_date", collect_list(col("product")).over(win_name_ordered_by_date))


df2.show(truncate=False)

+----+----------+---------+-----+-----+---+---+---+-----------+------------------------------------------------------------------+
|name|date      |product  |price|avg  |sum|min|max|item_number|item_list                                                         |
+----+----------+---------+-----+-----+---+---+---+-----------+------------------------------------------------------------------+
|Alex|2018-10-10|Paint    |80   |26.25|210|5  |80 |8          |[Paint, Ladder, Stool, Vacuum, Bucket, Gloves, Brushes, Sandpaper]|
|Alex|2018-04-02|Ladder   |20   |26.25|210|5  |80 |8          |[Paint, Ladder, Stool, Vacuum, Bucket, Gloves, Brushes, Sandpaper]|
|Alex|2018-06-22|Stool    |20   |26.25|210|5  |80 |8          |[Paint, Ladder, Stool, Vacuum, Bucket, Gloves, Brushes, Sandpaper]|
|Alex|2018-12-09|Vacuum   |40   |26.25|210|5  |80 |8          |[Paint, Ladder, Stool, Vacuum, Bucket, Gloves, Brushes, Sandpaper]|
|Alex|2018-07-12|Bucket   |5    |26.25|210|5  |80 |8          |[Paint, Ladder, Stoo



+----+----------+---------+-----+-----------+-----------+-----------+-----------+-------------------+------------------------------------------------------------------+
|name|date      |product  |price|avg_to_date|sum_to_date|max_to_date|min_to_date|item_number_to_date|item_list_to_date                                                 |
+----+----------+---------+-----+-----------+-----------+-----------+-----------+-------------------+------------------------------------------------------------------+
|Alex|2018-02-18|Gloves   |5    |5.0        |5          |5          |5          |1                  |[Gloves]                                                          |
|Alex|2018-03-03|Brushes  |30   |17.5       |35         |30         |5          |2                  |[Gloves, Brushes]                                                 |
|Alex|2018-04-02|Ladder   |20   |18.33      |55         |30         |5          |3                  |[Gloves, Brushes, Ladder]                             

## 1.6 Range window specifications example

A range window specification creates a sub-window (frame) inside the main window created by Window.partitionBy. **Only aggregation functions can be applied over a custom frame. Ranking functions and analytic functions such as lag and lead will raise errors**. To build range window specifications, we use one of the two following functions:
- rowsBetween(start: Long, end: Long) -> WindowSpec: start and end are row offsets relative to the current row; -1 means one row before the current row, 1 means one row after the current row.
- rangeBetween(start: Long, end: Long) -> WindowSpec: the start and end boundaries are offsets relative to the current row's value in the orderBy column. The constants used as boundaries are defined as:
   - Window.currentRow = 0
   - Window.unboundedPreceding = Long.MinValue
   - Window.unboundedFollowing = Long.MaxValue

The start and end boundaries are both inclusive. Their values can be
- Window.unboundedPreceding
- Window.unboundedFollowing
- Window.currentRow
- or a value relative to Window.currentRow, either negative or positive.

Some examples of rowsBetween:
- rowsBetween(Window.currentRow, 2): From current row to the next 2 rows 
- rowsBetween(-3, Window.currentRow): From the previous 3 rows to the current row. 
- rowsBetween(-1, 2): Frame contains previous row, current row and the next 2 rows 
- rowsBetween(Window.currentRow, Window.unboundedFollowing): From current row to all next rows 
- rowsBetween(Window.unboundedPreceding, Window.currentRow): From all previous rows to the current row. 
- rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing): all rows in the window. 
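
The rowsBetween examples above can be replayed in plain Python to see exactly which rows end up in each frame. This is a sketch of the semantics only (not Spark code); the constants are stand-ins that mirror the values listed earlier, and the helper name `rows_between` is made up for this illustration.

```python
# Stand-ins for the Window constants described in this section
UNBOUNDED_PRECEDING = -(1 << 63)
UNBOUNDED_FOLLOWING = (1 << 63) - 1
CURRENT_ROW = 0


def rows_between(values, start, end):
    """For each row of an ordered partition, return the values in its row-based frame.

    start and end are inclusive row offsets relative to the current row;
    offsets that fall outside the partition are clipped to its edges.
    """
    last = len(values) - 1
    frames = []
    for i in range(len(values)):
        lo = max(0, i + start)
        hi = min(last, i + end)
        frames.append(values[lo:hi + 1])
    return frames


prices = [5, 5, 10, 20, 20, 30, 40, 80]  # Alex's prices, ascending

# rowsBetween(-1, Window.currentRow): previous row and current row
print([max(f) for f in rows_between(prices, -1, CURRENT_ROW)])
# [5, 5, 10, 20, 20, 30, 40, 80]

# rowsBetween(-1, 1): previous row, current row and next row
print(rows_between([10, 20, 30, 40], -1, 1))
# [[10, 20], [10, 20, 30], [20, 30, 40], [30, 40]]

# rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing): the whole partition
print(rows_between([10, 20, 30], UNBOUNDED_PRECEDING, UNBOUNDED_FOLLOWING)[0])
# [10, 20, 30]
```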

In [7]:
# we have 86400 seconds in a day
def day_to_seconds(day_num: int):
    return day_num * 86400

### 1.6.1  rowsBetween example

rowsBetween uses current row as base index (i.e. 0), and offset to specify start or end.
- -1: one row before current row
- 0: current row
- 1: one row after current row

In [15]:
# Frame with the last 2 rows: the current row and the row before it
last2 = win_name_ordered.rowsBetween(-1, Window.currentRow)
df.withColumn("max_of_last2", max("price").over(last2)).show(truncate=False)

# Frame with the previous row, the current row, and the next row
previous_to_next = win_name_ordered.rowsBetween(-1, 1)
df.withColumn("max_from_previous_to_next", max("price").over(previous_to_next)).show(truncate=False)

# Max over the current row and all following rows
# The frame below takes the current row and all the rows after it that are in the same partition.
following = win_name_ordered.rowsBetween(Window.currentRow, Window.unboundedFollowing)
df.withColumn("max_of_following", max("price").over(following)).show(truncate=False)

+----+----------+---------+-----+------------+
|name|date      |product  |price|max_of_last2|
+----+----------+---------+-----+------------+
|Alex|2018-07-12|Bucket   |5    |5           |
|Alex|2018-02-18|Gloves   |5    |5           |
|Alex|2018-09-26|Sandpaper|10   |10          |
|Alex|2018-04-02|Ladder   |20   |20          |
|Alex|2018-06-22|Stool    |20   |20          |
|Alex|2018-03-03|Brushes  |30   |30          |
|Alex|2018-12-09|Vacuum   |40   |40          |
|Alex|2018-10-10|Paint    |80   |80          |
|Bob |2018-07-12|Bucket   |5    |5           |
|Bob |2018-02-18|Gloves   |5    |5           |
|Bob |2018-09-26|Sandpaper|10   |10          |
|Bob |2018-03-03|Brushes  |30   |30          |
|Bob |2018-12-09|Vacuum   |40   |40          |
+----+----------+---------+-----+------------+

+----+----------+---------+-----+-------------------------+
|name|date      |product  |price|max_from_previous_to_next|
+----+----------+---------+-----+-------------------------+
|Alex|2018-07-12|Buc

### 1.6.2  rangeBetween example

rangeBetween uses the current **row value** (the value in the orderBy column) as the base (i.e. 0), and offsets to specify the start or end boundary. Both boundaries are inclusive.
- -666: as a start boundary, includes all rows whose value is >= current_row_value - 666
- 0: the current row value
- 666: as an end boundary, includes all rows whose value is <= current_row_value + 666
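
The difference between a row-based and a value-based frame shows up as soon as the ordering values are unevenly spaced. The following plain-Python sketch (not Spark code; both helper names are made up for this illustration) builds the frames for the same offsets (-2, 0) in both styles.

```python
def rows_frames(sorted_values, start, end):
    """Row-based frame: offsets count rows before/after the current row."""
    last = len(sorted_values) - 1
    return [sorted_values[max(0, i + start):min(last, i + end) + 1]
            for i in range(len(sorted_values))]


def range_frames(sorted_values, start, end):
    """Value-based frame: rows whose ordering value lies in
    [current + start, current + end], both ends inclusive."""
    return [[v for v in sorted_values if cur + start <= v <= cur + end]
            for cur in sorted_values]


values = [1, 2, 4, 8]

print(rows_frames(values, -2, 0))   # [[1], [1, 2], [1, 2, 4], [2, 4, 8]]
print(range_frames(values, -2, 0))  # [[1], [1, 2], [2, 4], [8]]
```

For the last row, the row-based frame always reaches two rows back, while the value-based frame only accepts values between 6 and 8, so it contains the current row alone.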

In [16]:
# convert string date column to unix timestamp

df1 = df.withColumn("unix_date", unix_timestamp("date", "yyyy-MM-dd"))
    
df1.show(5, truncate=False)

+----+----------+-------+-----+----------+
|name|date      |product|price|unix_date |
+----+----------+-------+-----+----------+
|Alex|2018-10-10|Paint  |80   |1539122400|
|Alex|2018-04-02|Ladder |20   |1522620000|
|Alex|2018-06-22|Stool  |20   |1529618400|
|Alex|2018-12-09|Vacuum |40   |1544310000|
|Alex|2018-07-12|Bucket |5    |1531346400|
+----+----------+-------+-----+----------+
only showing top 5 rows



### Quiz 2: how to get the average price of the products sold in the last 30 days

In [17]:
# We create a specific range window: the end is the date of the current row, the start is the date 30 days
# before the current row.
# Here 0 stands for the unix_date of the current row, so the frame is rangeBetween(-day_to_seconds(30), 0).
# For example, for row "Alex|2018-02-18|Gloves |5 |1518908400|" the frame is (1518908400-(30*86400),1518908400). All rows that
# have a unix_date column value within these boundaries (inclusive) are included in the frame.
range_30 = win_name.orderBy(col("unix_date")).rangeBetween(-day_to_seconds(30), 0)
df2 = df1.withColumn("30day_moving_avg", avg("price").over(range_30))
print("Exp4 create a column that shows last 30 day avg before current row date")
df2.show(10, truncate=False)

Exp4 create a column that shows last 30 day avg before current row date
+----+----------+---------+-----+----------+----------------+
|name|date      |product  |price|unix_date |30day_moving_avg|
+----+----------+---------+-----+----------+----------------+
|Alex|2018-02-18|Gloves   |5    |1518908400|5.0             |
|Alex|2018-03-03|Brushes  |30   |1520031600|17.5            |
|Alex|2018-04-02|Ladder   |20   |1522620000|25.0            |
|Alex|2018-06-22|Stool    |20   |1529618400|20.0            |
|Alex|2018-07-12|Bucket   |5    |1531346400|12.5            |
|Alex|2018-09-26|Sandpaper|10   |1537912800|10.0            |
|Alex|2018-10-10|Paint    |80   |1539122400|45.0            |
|Alex|2018-12-09|Vacuum   |40   |1544310000|40.0            |
|Bob |2018-02-18|Gloves   |5    |1518908400|5.0             |
|Bob |2018-03-03|Brushes  |30   |1520031600|17.5            |
+----+----------+---------+-----+----------+----------------+
only showing top 10 rows
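
The Alex rows of the output above can be checked by hand. The following plain-Python sketch (an illustration, not Spark code) recomputes the 30-day moving average from the unix_date and price values shown in the table. The one-line `day_to_seconds` here is an assumption based on the `30*86400` in the code comment above; `moving_avg` is a hypothetical helper.

```python
def day_to_seconds(days):
    # Assumed definition: 86400 seconds per day
    return days * 86400


# (unix_date, price) pairs for Alex, copied from the table above
alex = [
    (1518908400, 5), (1520031600, 30), (1522620000, 20), (1529618400, 20),
    (1531346400, 5), (1537912800, 10), (1539122400, 80), (1544310000, 40),
]


def moving_avg(rows, start, end):
    """Average price over rows whose unix_date is within [date + start, date + end]."""
    result = []
    for date, _ in rows:
        frame = [p for d, p in rows if date + start <= d <= date + end]
        result.append(sum(frame) / len(frame))
    return result


print(moving_avg(alex, -day_to_seconds(30), 0))
# [5.0, 17.5, 25.0, 20.0, 12.5, 10.0, 45.0, 40.0]
```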



### Quiz3, how to get the avg over the 30 days before and the 15 days after the current row date

In [18]:
# Note that stddev returns null for some rows, because at least two
# observations are required to calculate a standard deviation.
range_45 = win_name.orderBy("unix_date").rangeBetween(-day_to_seconds(30), day_to_seconds(15))
df3 = df1.withColumn("45day_moving_avg", avg("price").over(range_45)) \
        .withColumn("45day_moving_std", stddev("price").over(range_45))
df3.show(10, truncate=False)

+----+----------+---------+-----+----------+----------------+------------------+
|name|date      |product  |price|unix_date |45day_moving_avg|45day_moving_std  |
+----+----------+---------+-----+----------+----------------+------------------+
|Alex|2018-02-18|Gloves   |5    |1518908400|17.5            |17.67766952966369 |
|Alex|2018-03-03|Brushes  |30   |1520031600|17.5            |17.67766952966369 |
|Alex|2018-04-02|Ladder   |20   |1522620000|25.0            |7.0710678118654755|
|Alex|2018-06-22|Stool    |20   |1529618400|20.0            |null              |
|Alex|2018-07-12|Bucket   |5    |1531346400|12.5            |10.606601717798213|
|Alex|2018-09-26|Sandpaper|10   |1537912800|45.0            |49.49747468305833 |
|Alex|2018-10-10|Paint    |80   |1539122400|45.0            |49.49747468305833 |
|Alex|2018-12-09|Vacuum   |40   |1544310000|40.0            |null              |
|Bob |2018-02-18|Gloves   |5    |1518908400|17.5            |17.67766952966369 |
|Bob |2018-03-03|Brushes  |3
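
The null values in 45day_moving_std come from frames that contain a single row. As a plain-Python illustration (not Spark code), the sample standard deviation divides by n - 1, so it is undefined for one observation. The helper `sample_std` is hypothetical; the frames are read off the Alex rows above.

```python
import math


def sample_std(values):
    """Sample standard deviation (n - 1 denominator); None when fewer than two values."""
    n = len(values)
    if n < 2:
        return None
    mean = sum(values) / n
    return math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))


print(sample_std([5, 30]))   # frame of the 2018-02-18 row: about 17.68
print(sample_std([30, 20]))  # frame of the 2018-04-02 row: about 7.07
print(sample_std([20]))      # frame of the 2018-06-22 row: None
```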

### Quiz4, how to get the median of a window

The mean (avg) and the median are commonly used in statistics.

- The mean is cheap to calculate, but outliers can have a large effect on it. For example, take the income of a population: if 9 people each have 10 dollars and 1 person has 1010 dollars, the mean is 1100/10 = 110. It does not represent either group's income.
- The median is expensive to calculate, because the values must be sorted. But in certain cases the median is more robust than the mean, since it is much less sensitive to outlier values. In the previous example, the median is 10 dollars, which represents the income of the larger group.
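
The income example can be checked with Python's standard `statistics` module. This is only a quick sanity check of the numbers above, not Spark code.

```python
from statistics import mean, median

incomes = [10] * 9 + [1010]  # 9 people with 10 dollars, 1 person with 1010 dollars

print(mean(incomes))    # 110
print(median(incomes))  # 10.0
```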

In [20]:
win1=Window.partitionBy('name')
win2=Window.partitionBy('name').orderBy('price')


# In this example, we calculate a rolling median: because win2 is ordered by price, the price_list grows a little for each row.
df1=df.withColumn("price_list",collect_list('price').over(win2)) \
      .withColumn("rolling_median",element_at("price_list",(size("price_list")/2+1).cast("int")))

df1.show(truncate=False)

+----+----------+---------+-----+------------------------------+--------------+
|name|date      |product  |price|price_list                    |rolling_median|
+----+----------+---------+-----+------------------------------+--------------+
|Alex|2018-07-12|Bucket   |5    |[5, 5]                        |5             |
|Alex|2018-02-18|Gloves   |5    |[5, 5]                        |5             |
|Alex|2018-09-26|Sandpaper|10   |[5, 5, 10]                    |5             |
|Alex|2018-04-02|Ladder   |20   |[5, 5, 10, 20, 20]            |10            |
|Alex|2018-06-22|Stool    |20   |[5, 5, 10, 20, 20]            |10            |
|Alex|2018-03-03|Brushes  |30   |[5, 5, 10, 20, 20, 30]        |20            |
|Alex|2018-12-09|Vacuum   |40   |[5, 5, 10, 20, 20, 30, 40]    |20            |
|Alex|2018-10-10|Paint    |80   |[5, 5, 10, 20, 20, 30, 40, 80]|20            |
|Bob |2018-07-12|Bucket   |5    |[5, 5]                        |5             |
|Bob |2018-02-18|Gloves   |5    |[5, 5] 
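
Note that the first two Alex rows already show price_list = [5, 5]. With an orderBy and no explicit frame, the default frame is a range frame from the start of the partition to the current row, so rows that tie with the current row on price are included. The plain-Python sketch below (an illustration, not Spark code) models that behavior together with the index expression used above; `rolling_median` is a hypothetical helper.

```python
def rolling_median(prices):
    """For each price, collect all prices <= it (ties included) and pick element size/2 + 1."""
    ordered = sorted(prices)
    result = []
    for current in ordered:
        frame = [p for p in ordered if p <= current]
        index = int(len(frame) / 2 + 1)  # mirrors (size/2+1).cast("int"), 1-based
        result.append((frame, frame[index - 1]))
    return result


alex_prices = [80, 20, 20, 40, 5, 5, 30, 10]
for frame, med in rolling_median(alex_prices):
    print(frame, med)
```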

In [22]:
# In this example, we calculate a global median for each partition.
# Note: as the window specification only has a partition and no order, the collected list needs to be sorted.

df2=df.withColumn("price_list",sort_array(collect_list('price').over(win1))) \
      .withColumn("global_median",element_at("price_list",(size("price_list")/2+1).cast("int")))
  
df2.show()

+----+----------+---------+-----+--------------------+-------------+
|name|      date|  product|price|          price_list|global_median|
+----+----------+---------+-----+--------------------+-------------+
|Alex|2018-10-10|    Paint|   80|[5, 5, 10, 20, 20...|           20|
|Alex|2018-04-02|   Ladder|   20|[5, 5, 10, 20, 20...|           20|
|Alex|2018-06-22|    Stool|   20|[5, 5, 10, 20, 20...|           20|
|Alex|2018-12-09|   Vacuum|   40|[5, 5, 10, 20, 20...|           20|
|Alex|2018-07-12|   Bucket|    5|[5, 5, 10, 20, 20...|           20|
|Alex|2018-02-18|   Gloves|    5|[5, 5, 10, 20, 20...|           20|
|Alex|2018-03-03|  Brushes|   30|[5, 5, 10, 20, 20...|           20|
|Alex|2018-09-26|Sandpaper|   10|[5, 5, 10, 20, 20...|           20|
| Bob|2018-12-09|   Vacuum|   40|  [5, 5, 10, 30, 40]|           10|
| Bob|2018-07-12|   Bucket|    5|  [5, 5, 10, 30, 40]|           10|
| Bob|2018-02-18|   Gloves|    5|  [5, 5, 10, 30, 40]|           10|
| Bob|2018-03-03|  Brushes|   30| 

In [32]:
# We can also use groupBy

df3=df.groupBy("name").agg(sort_array(collect_list("price")).alias("price_list"))
df3.show()

df4=df3.select("name","price_list",(element_at("price_list", (size("price_list")/2+1).cast("int")).alias("median")))
df4.show()

+----+--------------------+
|name|          price_list|
+----+--------------------+
|Alex|[5, 5, 10, 20, 20...|
| Bob|  [5, 5, 10, 30, 40]|
+----+--------------------+

+----+--------------------+------+
|name|          price_list|median|
+----+--------------------+------+
|Alex|[5, 5, 10, 20, 20...|    20|
| Bob|  [5, 5, 10, 30, 40]|    10|
+----+--------------------+------+
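
One caveat with the index expression `size/2+1`: for a list with an even number of elements it returns the upper of the two middle values rather than their average. This makes no difference for the data above, but it can for other inputs. A plain-Python sketch of the difference (`upper_median` is a hypothetical helper that mirrors the expression, not a Spark function):

```python
from statistics import median


def upper_median(sorted_values):
    """Mirror of element_at(list, (size/2+1).cast("int")) on a sorted list (1-based index)."""
    index = int(len(sorted_values) / 2 + 1)
    return sorted_values[index - 1]


print(upper_median([5, 5, 10, 30, 40]), median([5, 5, 10, 30, 40]))  # odd length: both 10
print(upper_median([1, 2, 3, 4]), median([1, 2, 3, 4]))              # even length: 3 vs 2.5
```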



In [33]:
# The pyspark.sql.functions.broadcast(df) marks a DataFrame as small enough for use in broadcast joins.
df.join(broadcast(df4), "name", "inner").show(truncate=False)

+----+----------+---------+-----+------------------------------+------+
|name|date      |product  |price|price_list                    |median|
+----+----------+---------+-----+------------------------------+------+
|Alex|2018-10-10|Paint    |80   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-04-02|Ladder   |20   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-06-22|Stool    |20   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-12-09|Vacuum   |40   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-07-12|Bucket   |5    |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-02-18|Gloves   |5    |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-03-03|Brushes  |30   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Alex|2018-09-26|Sandpaper|10   |[5, 5, 10, 20, 20, 30, 40, 80]|20    |
|Bob |2018-12-09|Vacuum   |40   |[5, 5, 10, 30, 40]            |10    |
|Bob |2018-07-12|Bucket   |5    |[5, 5, 10, 30, 40]            |10    |
|Bob |2018-02-18|Gloves   |5    |[5, 5, 10, 30, 40]            |