# Window functions with Spark

## What are SQL Window Functions?
Window functions operate on a set of rows and return a single value for each row from the underlying query. The term window describes the set of rows on which the function operates. A window function uses values from the rows in a window to calculate the returned values.
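
A regular aggregation collapses each group into one row. A window aggregate such as `SUM(amount) OVER (PARTITION BY date)` keeps every input row and attaches the group's value to it. The minimal plain-Python sketch below shows the difference. It does not use Spark, and `group_total` and `window_total` are hypothetical helper names used only for illustration.

```python
from collections import defaultdict

# (customer_name, date, amount) rows, same shape as the sample data used later
rows = [
    ("Customer0", "2019-04-01", 60),
    ("Customer1", "2019-04-01", 30),
    ("Customer5", "2019-04-02", 20),
    ("Customer6", "2019-04-02", 10),
]


def group_total(rows):
    """Like GROUP BY date: one output value per date."""
    totals = defaultdict(int)
    for _, date, amount in rows:
        totals[date] += amount
    return dict(totals)


def window_total(rows):
    """Like SUM(amount) OVER (PARTITION BY date): one output row per input row."""
    totals = group_total(rows)
    return [(name, date, amount, totals[date]) for name, date, amount in rows]


print(group_total(rows))   # 2 entries: one per date
print(window_total(rows))  # 4 rows: every input row is kept
```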


## Elements of Window functions

* Partitioning
* Ordering
* Framing

## SQL Expression

```sql
 window_function (expression) 
 OVER (
   [ PARTITION BY expr_list ]
   [ ORDER BY order_list ]
   [ frame_clause ] 
 )
```
where window_function is one of the functions described below, such as AVG(), and expr_list is:
```
expression | column_name [, expr_list ]
```
and order_list is:

```
expression | column_name [ASC | DESC] [ NULLS { FIRST | LAST } ] [, order_list ]
```
and the optional frame_clause is one of the following frames:
```
{ RANGE | ROWS } frame_start
{ RANGE | ROWS } BETWEEN frame_start AND frame_end  
```
where frame_start is one of the following choices:
```
UNBOUNDED PRECEDING
<value> PRECEDING
CURRENT ROW
<value> FOLLOWING
```
and frame_end is one of the following choices:
```
<value> PRECEDING
CURRENT ROW
<value> FOLLOWING
UNBOUNDED FOLLOWING
```
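
The three elements (partitioning, ordering, framing) can be viewed as three successive steps. The plain-Python sketch below illustrates them. It is not how Spark executes a window, and `window_apply` and its `frame` tuple are hypothetical names. The frame is a pair of offsets relative to the current row: negative means PRECEDING, 0 means CURRENT ROW, positive means FOLLOWING, and `None` means UNBOUNDED. This is loosely similar to the offsets accepted by PySpark's `Window.rowsBetween(start, end)`. The sketch models only ROWS frames.

```python
from itertools import groupby


def window_apply(rows, partition_key, order_key, frame, fn, descending=False):
    """Evaluate fn over a ROWS frame for every input row.

    frame is a (start, end) pair of offsets relative to the current row;
    None means UNBOUNDED (PRECEDING for start, FOLLOWING for end).
    Returns (row, value) pairs, partition by partition.
    """
    start, end = frame
    out = []
    # 1. Partitioning: sort by the key so groupby sees each partition once
    for _, part in groupby(sorted(rows, key=partition_key), key=partition_key):
        # 2. Ordering: sort inside the partition (stable, so ties keep input order)
        part = sorted(part, key=order_key, reverse=descending)
        for i, row in enumerate(part):
            # 3. Framing: slice the rows visible from the current row i
            lo = 0 if start is None else max(0, i + start)
            hi = len(part) if end is None else min(len(part), i + end + 1)
            out.append((row, fn(part[lo:hi])))
    return out


rows = [
    ("Customer0", "2019-04-01", 60),
    ("Customer1", "2019-04-01", 30),
    ("Customer2", "2019-04-01", 40),
    ("Customer3", "2019-04-01", 40),
    ("Customer4", "2019-04-01", 20),
    ("Customer5", "2019-04-02", 20),
    ("Customer6", "2019-04-02", 10),
    ("Customer7", "2019-04-02", 60),
]

# SUM(amount) OVER (PARTITION BY date ORDER BY amount DESC
#                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
running = window_apply(
    rows,
    partition_key=lambda r: r[1],
    order_key=lambda r: r[2],
    frame=(None, 0),
    fn=lambda frame_rows: sum(r[2] for r in frame_rows),
    descending=True,
)
for (name, date, amount), total in running:
    print(name, date, amount, total)
```

Changing `frame` to `(-1, 1)` would model `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`, which gives a three-row moving sum.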

## How are window functions classified?
* __Ranking__

    All of the ranking functions depend on the sort ordering specified by the **ORDER BY** clause of the associated window definition. Rows that are not distinct in the ordering are called ___peers___. Except for row_number(), which assigns peers distinct numbers in an arbitrary order, the ranking functions give the same answer for any two peer rows.
    * row_number()
    * rank()
    * dense_rank()
    * percent_rank()
    * ntile(n)

* __Analytic (aka value) functions__
    * cume_dist()
    * first_value()
    * last_value()
    * lag()
    * lead()

* __Aggregate functions__
    * avg()
    * sum()
    * ...
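
`ntile` and `cume_dist` are listed above but not demonstrated in this notebook. In PySpark they are available as `ntile(n)` and `cume_dist()` in `pyspark.sql.functions`. The plain-Python sketch below illustrates their usual definitions on one partition that is already sorted in window order (the 2019-04-01 amounts, descending). The helpers are hypothetical and are not Spark's implementation.
* `ntile(n)` splits the rows into `n` buckets that are as equal in size as possible, and the first buckets receive the extra rows.
* `cume_dist()` is the number of rows ordered at or before the current row, peers included, divided by the partition size.

```python
def ntile(n_rows, n):
    """Bucket numbers 1..n for n_rows ordered rows; earlier buckets take the remainder."""
    size, extra = divmod(n_rows, n)
    buckets = []
    for b in range(1, n + 1):
        count = size + (1 if b <= extra else 0)
        buckets.extend([b] * count)
    return buckets


def cume_dist(values):
    """Share of rows ordered at or before each row, peers included (values pre-sorted)."""
    n = len(values)
    result = []
    for v in values:
        # 1-based position of the last peer of v
        last_peer = max(i for i, w in enumerate(values) if w == v) + 1
        result.append(last_peer / n)
    return result


amounts = [60, 40, 40, 30, 20]  # 2019-04-01, ORDER BY amount DESC
print(ntile(len(amounts), 2))
print(cume_dist(amounts))
```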

## Preparing Spark

```python
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession, the entry point for the DataFrame and SQL APIs
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
```

```python
# Sample data: one purchase amount per customer and date
df = spark.createDataFrame(
    [
        ("Customer0", "2019-04-01", 60),
        ("Customer1", "2019-04-01", 30),
        ("Customer2", "2019-04-01", 40),
        ("Customer3", "2019-04-01", 40),
        ("Customer4", "2019-04-01", 20),
        ("Customer5", "2019-04-02", 20),
        ("Customer6", "2019-04-02", 10),
        ("Customer7", "2019-04-02", 60),
    ],
    ["customer_name", "date", "amount"],
)

# Register the DataFrame as a temporary view so spark.sql() can query it
df.createOrReplaceTempView("customers")
```

## Row number

The ROW_NUMBER window function determines the ordinal number of the current row within its partition. The ORDER BY expression in the OVER clause determines the number. Each value is ordered within its partition. Rows with equal values for the ORDER BY expressions receive different row numbers nondeterministically.
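
The plain-Python sketch below mimics `row_number() OVER (PARTITION BY date ORDER BY amount DESC)` on the sample data. `with_row_number` is a hypothetical helper, not a Spark API. Python's stable sort keeps tied rows in input order. Spark gives no such guarantee, so in Spark the order of tied rows can differ.

```python
from itertools import groupby

rows = [
    ("Customer0", "2019-04-01", 60),
    ("Customer1", "2019-04-01", 30),
    ("Customer2", "2019-04-01", 40),
    ("Customer3", "2019-04-01", 40),
    ("Customer4", "2019-04-01", 20),
    ("Customer5", "2019-04-02", 20),
    ("Customer6", "2019-04-02", 10),
    ("Customer7", "2019-04-02", 60),
]


def with_row_number(rows):
    """Mimic row_number() OVER (PARTITION BY date ORDER BY amount DESC)."""
    out = []
    for _, part in groupby(sorted(rows, key=lambda r: r[1]), key=lambda r: r[1]):
        # Highest amount first; stable sort keeps tied rows in input order
        ordered = sorted(part, key=lambda r: r[2], reverse=True)
        # Numbering restarts at 1 in every partition
        for n, row in enumerate(ordered, start=1):
            out.append(row + (n,))
    return out


for row in with_row_number(rows):
    print(row)
```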


### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, row_number

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Number the rows within each partition
result_row_number = df.withColumn("row", row_number().over(windowF))

result_row_number.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  3|
|    Customer1|2019-04-01|    30|  4|
|    Customer4|2019-04-01|    20|  5|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           row_number() OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  3|
|    Customer1|2019-04-01|    30|  4|
|    Customer4|2019-04-01|    20|  5|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



## Rank()

The RANK() window function determines the rank of a value in a group of values, based on the ORDER BY expression in the OVER clause. Each value is ranked within its partition. Rows with equal values for the ranking criteria receive the same rank, and the following ranks are skipped: the two rows with amount 40 below share rank 2, so the next row gets rank 4.
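
The tie rule can be sketched in plain Python on a single partition whose ordering values are already sorted in window order. `rank` here is a hypothetical helper, not Spark's implementation: a peer repeats the previous rank, and any other row takes its 1-based position, which produces the gap.

```python
def rank(values):
    """rank() over values already sorted in window order."""
    ranks = []
    for i, v in enumerate(values):
        if i > 0 and v == values[i - 1]:
            ranks.append(ranks[-1])  # peer of the previous row: same rank
        else:
            ranks.append(i + 1)      # otherwise the 1-based position, leaving gaps after ties
    return ranks


print(rank([60, 40, 40, 30, 20]))  # 2019-04-01, ORDER BY amount DESC
print(rank([60, 20, 10]))          # 2019-04-02
```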


### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, rank

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Rank the rows within each partition; ties share a rank
result_rank = df.withColumn("row", rank().over(windowF))

result_rank.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  2|
|    Customer1|2019-04-01|    30|  4|
|    Customer4|2019-04-01|    20|  5|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           rank() OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  2|
|    Customer1|2019-04-01|    30|  4|
|    Customer4|2019-04-01|    20|  5|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



## Dense_rank()

The DENSE_RANK() window function determines the rank of a value in a group of values, based on the ORDER BY expression in the OVER clause. Each value is ranked within its partition, and rows with equal values receive the same rank. Unlike RANK(), it leaves no gaps in the sequence of ranked values when two or more rows share a rank.
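
The plain-Python sketch below uses the same hypothetical single-partition setup as a RANK() emulation: the values are already sorted in window order, and this is not Spark's code. The only difference from RANK() is that a new value takes the next consecutive rank instead of its position.

```python
def dense_rank(values):
    """dense_rank() over values already sorted in window order."""
    ranks = []
    for i, v in enumerate(values):
        if i == 0:
            ranks.append(1)              # first row of the partition
        elif v == values[i - 1]:
            ranks.append(ranks[-1])      # peer: same rank as the previous row
        else:
            ranks.append(ranks[-1] + 1)  # new value: next consecutive rank, no gap
    return ranks


print(dense_rank([60, 40, 40, 30, 20]))  # 2019-04-01, ORDER BY amount DESC
```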


### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import dense_rank, desc

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Rank the rows within each partition; ties share a rank, no gaps
result_dense_rank = df.withColumn("row", dense_rank().over(windowF))

result_dense_rank.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  2|
|    Customer1|2019-04-01|    30|  3|
|    Customer4|2019-04-01|    20|  4|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           dense_rank() OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60|  1|
|    Customer2|2019-04-01|    40|  2|
|    Customer3|2019-04-01|    40|  2|
|    Customer1|2019-04-01|    30|  3|
|    Customer4|2019-04-01|    20|  4|
|    Customer7|2019-04-02|    60|  1|
|    Customer5|2019-04-02|    20|  2|
|    Customer6|2019-04-02|    10|  3|
+-------------+----------+------+---+
```



## percent_rank()

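The PERCENT_RANK() window function calculates the percent rank of the current row as `(rank - 1) / (number of rows in the partition - 1)`, where rank is the RANK() of the current row. For example, Customer1 has rank 4 in the five-row 2019-04-01 partition, so its percent rank is (4 - 1) / (5 - 1) = 0.75.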
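
The formula can be checked with a plain-Python sketch on one partition that is already sorted in window order. `percent_rank` here is a hypothetical helper that reuses the RANK() tie rule. A single-row partition would divide by zero, so the sketch returns 0.0 for it. That is the convention Spark and standard SQL follow.

```python
def percent_rank(values):
    """(rank - 1) / (rows in partition - 1), values already sorted in window order."""
    n = len(values)
    if n == 1:
        return [0.0]  # no denominator for a single-row partition: defined as 0.0
    ranks = []
    for i, v in enumerate(values):
        # RANK() rule: peers share a rank, otherwise the 1-based position
        ranks.append(ranks[-1] if i > 0 and v == values[i - 1] else i + 1)
    return [(r - 1) / (n - 1) for r in ranks]


print(percent_rank([60, 40, 40, 30, 20]))  # 2019-04-01
print(percent_rank([60, 20, 10]))          # 2019-04-02
```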


### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, percent_rank

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Relative rank of each row within its partition, between 0.0 and 1.0
result_percent_rank = df.withColumn("row", percent_rank().over(windowF))

result_percent_rank.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount| row|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60| 0.0|
|    Customer2|2019-04-01|    40|0.25|
|    Customer3|2019-04-01|    40|0.25|
|    Customer1|2019-04-01|    30|0.75|
|    Customer4|2019-04-01|    20| 1.0|
|    Customer7|2019-04-02|    60| 0.0|
|    Customer5|2019-04-02|    20| 0.5|
|    Customer6|2019-04-02|    10| 1.0|
+-------------+----------+------+----+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           percent_rank() OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount| row|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60| 0.0|
|    Customer2|2019-04-01|    40|0.25|
|    Customer3|2019-04-01|    40|0.25|
|    Customer1|2019-04-01|    30|0.75|
|    Customer4|2019-04-01|    20| 1.0|
|    Customer7|2019-04-02|    60| 0.0|
|    Customer5|2019-04-02|    20| 0.5|
|    Customer6|2019-04-02|    10| 1.0|
+-------------+----------+------+----+
```



## Exploring Frames

In addition to partitioning and ordering, a window specification can define a frame, which consists of:

- a start boundary,
- an end boundary,
- a frame type.

There are five types of boundaries:

- UNBOUNDED PRECEDING
- UNBOUNDED FOLLOWING
- CURRENT ROW
- *value* PRECEDING
- *value* FOLLOWING

UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING represent the first and the last row of the partition, respectively.

The other three boundaries specify an offset from the current input row. Their exact meaning depends on the frame type.

There are two types of frames:

- ROWS frame: offsets count physical rows before or after the current row.
- RANGE frame: offsets apply to the value of the ORDER BY expression, so rows with equal ordering values (peers) always enter or leave the frame together.

To define a window specification, use the following syntax in SQL:
```sql
OVER (PARTITION BY ... ORDER BY ... frame_type BETWEEN start AND end)
```
Here, frame_type is either ROWS (for a ROWS frame) or RANGE (for a RANGE frame). start can be any of UNBOUNDED PRECEDING, CURRENT ROW, *value* PRECEDING, and *value* FOLLOWING. end can be any of UNBOUNDED FOLLOWING, CURRENT ROW, *value* PRECEDING, and *value* FOLLOWING.
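
The practical difference between the two frame types shows up with peers. The plain-Python sketch below computes a running sum with `BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` on the 2019-04-01 amounts ordered descending. The helper names are hypothetical. Under ROWS, the frame stops at the current row. Under RANGE, CURRENT ROW extends to the current row's last peer, so the two rows with amount 40 see the same frame and get the same sum.

```python
def running_sum_rows(values):
    """SUM ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (values pre-sorted)."""
    return [sum(values[: i + 1]) for i in range(len(values))]


def running_sum_range(values):
    """SUM ... RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (values pre-sorted)."""
    out = []
    for v in values:
        # With RANGE, CURRENT ROW reaches the last peer of the current row
        last_peer = max(j for j, w in enumerate(values) if w == v)
        out.append(sum(values[: last_peer + 1]))
    return out


amounts = [60, 40, 40, 30, 20]  # 2019-04-01, ORDER BY amount DESC
print(running_sum_rows(amounts))
print(running_sum_range(amounts))
```

In PySpark, the two frames correspond to `.rowsBetween(Window.unboundedPreceding, Window.currentRow)` and `.rangeBetween(Window.unboundedPreceding, Window.currentRow)` on the window spec.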





## Lag

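The LAG() window function returns the value of an expression from the row that is a given offset (1 by default) before the current row in its partition. If no such row exists, it returns the default value, which is null unless you supply one.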
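
A plain-Python sketch of this lookup, on one partition already sorted in window order, is shown below. `lag` here is a hypothetical helper that mirrors the `offset` and `default` arguments. In PySpark the equivalent call would be `lag("amount", 1, 0)`, and in SQL `lag(amount, 1, 0)`.

```python
def lag(values, offset=1, default=None):
    """Value from `offset` rows earlier in the ordered partition, else `default`."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


amounts = [60, 40, 40, 30, 20]  # 2019-04-01, ORDER BY amount DESC
print(lag(amounts))             # None plays the role of SQL null
print(lag(amounts, 2, 0))       # two rows back, 0 when there is no such row
```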

### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, lag

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Amount of the previous row in the partition (null for the first row)
result_lag = df.withColumn("lag", lag("amount", 1).over(windowF))

result_lag.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount| lag|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60|null|
|    Customer2|2019-04-01|    40|  60|
|    Customer3|2019-04-01|    40|  40|
|    Customer1|2019-04-01|    30|  40|
|    Customer4|2019-04-01|    20|  30|
|    Customer7|2019-04-02|    60|null|
|    Customer5|2019-04-02|    20|  60|
|    Customer6|2019-04-02|    10|  20|
+-------------+----------+------+----+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           lag(amount, 1) OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount| row|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60|null|
|    Customer2|2019-04-01|    40|  60|
|    Customer3|2019-04-01|    40|  40|
|    Customer1|2019-04-01|    30|  40|
|    Customer4|2019-04-01|    20|  30|
|    Customer7|2019-04-02|    60|null|
|    Customer5|2019-04-02|    20|  60|
|    Customer6|2019-04-02|    10|  20|
+-------------+----------+------+----+
```



## Lead

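The LEAD() window function returns the value of an expression from the row that is a given offset (1 by default) after the current row in its partition. If no such row exists, it returns the default value, which is null unless you supply one.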
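
LEAD() is the mirror image of LAG(). A plain-Python sketch on one partition already sorted in window order is shown below. `lead` is a hypothetical helper with the same `offset` and `default` arguments as PySpark's `lead(col, offset, default)`. A typical use is comparing each row with the next one, for example the gap to the next-lower amount.

```python
def lead(values, offset=1, default=None):
    """Value from `offset` rows later in the ordered partition, else `default`."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]


amounts = [60, 40, 40, 30, 20]  # 2019-04-01, ORDER BY amount DESC
nxt = lead(amounts)
# Gap to the next row; None for the last row, which has no successor
gaps = [a - b if b is not None else None for a, b in zip(amounts, nxt)]
print(nxt)
print(gaps)
```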

### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, lead

# Window spec: one partition per date, rows ordered by amount (highest first)
windowF = Window.partitionBy("date").orderBy(desc("amount"))

# Amount of the next row in the partition (null for the last row)
result_lead = df.withColumn("lead", lead("amount", 1).over(windowF))

result_lead.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount|lead|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60|  40|
|    Customer2|2019-04-01|    40|  40|
|    Customer3|2019-04-01|    40|  30|
|    Customer1|2019-04-01|    30|  20|
|    Customer4|2019-04-01|    20|null|
|    Customer7|2019-04-02|    60|  20|
|    Customer5|2019-04-02|    20|  10|
|    Customer6|2019-04-02|    10|null|
+-------------+----------+------+----+
```



### 2. SQL expression

```python
# Same query written in Spark SQL
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           lead(amount, 1) OVER (PARTITION BY date ORDER BY amount DESC) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount| row|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60|  40|
|    Customer2|2019-04-01|    40|  40|
|    Customer3|2019-04-01|    40|  30|
|    Customer1|2019-04-01|    30|  20|
|    Customer4|2019-04-01|    20|null|
|    Customer7|2019-04-02|    60|  20|
|    Customer5|2019-04-02|    20|  10|
|    Customer6|2019-04-02|    10|null|
+-------------+----------+------+----+
```



## Last

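The LAST_VALUE window function (`last()` in PySpark) returns the value of the specified expression for the last row in the window frame. When a window has an ORDER BY but no explicit frame, Spark uses the default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. The last row of that frame is therefore the current row (or its last peer), not the last row of the partition. To get the partition's last value, the frame must be widened to the entire partition.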
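
The effect of the frame can be sketched in plain Python on the 2019-04-01 partition, already sorted by amount descending. `last_value` here is a hypothetical helper that returns the customer name of the frame's last row. With the default RANGE frame, each row sees itself and its peers, so Customer2 gets Customer3, its last peer in this input order. With the full-partition frame, every row gets Customer4.

```python
def last_value(rows, full_partition):
    """Emulate last(name) OVER (ORDER BY amount DESC [frame]) on one ordered partition.

    rows: (name, amount) pairs already sorted by amount DESC.
    full_partition=False models the default frame
    (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
    True models ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
    """
    out = []
    for _, amount in rows:
        if full_partition:
            end = len(rows) - 1
        else:
            # RANGE ... CURRENT ROW ends at the last peer of the current row
            end = max(j for j, (_, a) in enumerate(rows) if a == amount)
        out.append(rows[end][0])
    return out


partition = [("Customer0", 60), ("Customer2", 40), ("Customer3", 40),
             ("Customer1", 30), ("Customer4", 20)]
print(last_value(partition, full_partition=False))
print(last_value(partition, full_partition=True))
```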

### 1. Spark function

```python
from pyspark.sql import Window
from pyspark.sql.functions import desc, last

# The frame must be set explicitly: with ORDER BY, the default frame stops at
# the current row, so last() would just return the current row's amount
windowF = (
    Window.partitionBy("date")
    .orderBy(desc("amount"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

result_last = df.withColumn("last", last("amount").over(windowF))

result_last.show()
```

```
+-------------+----------+------+----+
|customer_name|      date|amount|last|
+-------------+----------+------+----+
|    Customer0|2019-04-01|    60|  20|
|    Customer2|2019-04-01|    40|  20|
|    Customer3|2019-04-01|    40|  20|
|    Customer1|2019-04-01|    30|  20|
|    Customer4|2019-04-01|    20|  20|
|    Customer7|2019-04-02|    60|  10|
|    Customer5|2019-04-02|    20|  10|
|    Customer6|2019-04-02|    10|  10|
+-------------+----------+------+----+
```



### 2. SQL expression

```python
# Same query written in Spark SQL, with the frame spelled out explicitly
sql_result = spark.sql("""
    SELECT customer_name,
           date,
           amount,
           last(amount) OVER (
               PARTITION BY date
               ORDER BY amount DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
           ) AS `row`
    FROM customers
""")

sql_result.show()
```

```
+-------------+----------+------+---+
|customer_name|      date|amount|row|
+-------------+----------+------+---+
|    Customer0|2019-04-01|    60| 20|
|    Customer2|2019-04-01|    40| 20|
|    Customer3|2019-04-01|    40| 20|
|    Customer1|2019-04-01|    30| 20|
|    Customer4|2019-04-01|    20| 20|
|    Customer7|2019-04-02|    60| 10|
|    Customer5|2019-04-02|    20| 10|
|    Customer6|2019-04-02|    10| 10|
+-------------+----------+------+---+
```



## References
- [Introducing Window Functions in Spark SQL (Databricks blog)](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html)
- [SQL Window Functions Introduction (Apache Drill documentation)](https://drill.apache.org/docs/sql-window-functions-introduction/)
- [Window functions, in Mastering Spark SQL by @jaceklaskowski](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html)