# Window Functions

* Window functions perform a calculation across a set of rows related to the current row and return one value for each row, instead of collapsing the rows into a single result the way GROUP BY does. The term Window describes the set of rows on which the function operates.

* We define the window (the set of rows on which the function operates) using an OVER() clause.

### Types of Window functions

* Aggregate window functions: SUM(), MAX(), MIN(), AVG(), COUNT()
* Ranking window functions: RANK(), DENSE_RANK(), ROW_NUMBER(), NTILE()
* Value window functions: LAG(), LEAD(), FIRST_VALUE(), LAST_VALUE()
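LAG() and LEAD() are not demonstrated elsewhere in this notebook. As a rough plain-Python model of their semantics (not how Spark implements them), they read the value a fixed number of rows before or after the current row inside an ordered window, and return NULL (`None` here) or a supplied default when that row does not exist. The helper names `lag` and `lead` are hypothetical; the amounts are the GuildFord orders from the sample Orders data used later, ordered by order date.

```python
from typing import List, Optional


def lag(values: List[int], offset: int = 1, default: Optional[int] = None) -> List[Optional[int]]:
    """Value `offset` rows earlier in an already-ordered window, else `default`."""
    return [values[i - offset] if i - offset >= 0 else default for i in range(len(values))]


def lead(values: List[int], offset: int = 1, default: Optional[int] = None) -> List[Optional[int]]:
    """Value `offset` rows later in an already-ordered window, else `default`."""
    return [values[i + offset] if i + offset < len(values) else default for i in range(len(values))]


# GuildFord order amounts, ordered by order_date
guildford = [10000, 15000, 25000, 500]
print(lag(guildford))   # [None, 10000, 15000, 25000]
print(lead(guildford))  # [15000, 25000, 500, None]
```

In Spark SQL the equivalent would be `LAG(order_amount) OVER (PARTITION BY city ORDER BY order_date)`.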

#### OVER
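* Specifies the window for an aggregate, ranking, or value function: how rows are partitioned (PARTITION BY), how they are ordered within each partition (ORDER BY), and optionally which rows around the current row are included (a frame such as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).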

#### PARTITION BY partition_list
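* Divides the rows into partitions by the listed columns; the window function is computed separately for each partition. Without PARTITION BY, the whole result set is treated as a single partition.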
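To contrast PARTITION BY with GROUP BY, here is a minimal plain-Python model (an illustration only, not Spark code) using the (order_id, city, order_amount) values from the sample Orders data used later. GROUP BY returns one row per city, while `SUM(order_amount) OVER (PARTITION BY city)` keeps all ten rows and attaches each city's total to every row in that city.

```python
from collections import defaultdict

# (order_id, city, order_amount) values from the sample Orders data
orders = [
    ("1001", "GuildFord", 10000), ("1002", "Arlington", 20000),
    ("1003", "Shalford", 5000), ("1004", "GuildFord", 15000),
    ("1005", "Shalford", 7000), ("1006", "GuildFord", 25000),
    ("1007", "Arlington", 15000), ("1008", "Arlington", 2000),
    ("1009", "Shalford", 1000), ("1010", "GuildFord", 500),
]

# GROUP BY city: one result per city
totals = defaultdict(int)
for _, city, amount in orders:
    totals[city] += amount

# SUM(order_amount) OVER (PARTITION BY city): every input row is kept,
# and each row carries the total of its own partition
windowed = [(order_id, city, amount, totals[city]) for order_id, city, amount in orders]

print(dict(totals))   # {'GuildFord': 50500, 'Arlington': 37000, 'Shalford': 13000}
print(len(windowed))  # 10
print(windowed[0])    # ('1001', 'GuildFord', 10000, 50500)
```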

### Useful links

[Good example in SQL Server](https://www.sqlshack.com/use-window-functions-sql-server/)

[Window functions in interview questions](https://towardsdatascience.com/sql-window-function-demonstrated-with-real-interview-questions-from-leetcode-e83e28edaabc)



```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("Python Spark SQL Window Example").getOrCreate()
spark.version
```

```
'3.0.1'
```

### Create Spark Sample Dataframe

```python
data = [('1001', '2017-04-01', 'David Smith', 'GuildFord', 10000),
        ('1002', '2017-04-02', 'David Jones', 'Arlington', 20000),
        ('1003', '2017-04-03', 'John Smith', 'Shalford', 5000),
        ('1004', '2017-04-04', 'Michael Smith', 'GuildFord', 15000),
        ('1005', '2017-04-05', 'David Williams', 'Shalford', 7000),
        ('1006', '2017-04-06', 'Paum Smith', 'GuildFord', 25000),
        ('1007', '2017-04-10', 'Andrew Smith', 'Arlington', 15000),
        ('1008', '2017-04-11', 'David Brown', 'Arlington', 2000),
        ('1009', '2017-04-20', 'Robert Smith', 'Shalford', 1000),
        ('1010', '2017-04-25', 'Peter Smith', 'GuildFord', 500)]

schema = "order_id STRING, order_date STRING, customer_name STRING, city STRING, order_amount INT"

ordersDF = spark.createDataFrame(data, schema)
# "MM" is month-of-year; lowercase "mm" means minutes and would put every date in January.
# to_date already returns a DATE column, so no extra cast is needed.
ordersDF = ordersDF.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
ordersDF.createOrReplaceTempView("Orders")

# SQL Server equivalent: SELECT city, SUM(order_amount) total_order_amount FROM [dbo].[Orders] GROUP BY city
sumDF = spark.sql("select city, sum(order_amount) as total_order_amount from Orders group by city")
sumDF.show(10)
```

```
+---------+------------------+
|     city|total_order_amount|
+---------+------------------+
|GuildFord|             50500|
|Arlington|             37000|
| Shalford|             13000|
+---------+------------------+
```



```python
# SUM over each city's partition: every order row is kept and carries its city's total
sumDF = spark.sql("""
    select order_id, order_date, customer_name, city, order_amount,
           sum(order_amount) over (partition by city) as grand_total
    from Orders""")
sumDF.show(10)

# AVG over each (city, month) partition
avgDF = spark.sql("""
    select order_id, order_date, customer_name, city, order_amount,
           avg(order_amount) over (partition by city, month(order_date)) as avg_order_amount
    from Orders""")
avgDF.show(10)

# max(order_amount), min(order_amount) and count(order_amount) work the same way
```

```
+--------+----------+--------------+---------+------------+-----------+
|order_id|order_date| customer_name|     city|order_amount|grand_total|
+--------+----------+--------------+---------+------------+-----------+
|    1001|2017-04-01|   David Smith|GuildFord|       10000|      50500|
|    1004|2017-04-04| Michael Smith|GuildFord|       15000|      50500|
|    1006|2017-04-06|    Paum Smith|GuildFord|       25000|      50500|
|    1010|2017-04-25|   Peter Smith|GuildFord|         500|      50500|
|    1002|2017-04-02|   David Jones|Arlington|       20000|      37000|
|    1007|2017-04-10|  Andrew Smith|Arlington|       15000|      37000|
|    1008|2017-04-11|   David Brown|Arlington|        2000|      37000|
|    1003|2017-04-03|    John Smith| Shalford|        5000|      13000|
|    1005|2017-04-05|David Williams| Shalford|        7000|      13000|
|    1009|2017-04-20|  Robert Smith| Shalford|        1000|      13000|
+--------+----------+--------------+---------+------------+-----------+
```

```python
data = [("Lisa", "Sales", 10000, 35),
        ("Evan", "Sales", 32000, 38),
        ("Fred", "Engineering", 21000, 28),
        ("Alex", "Sales", 30000, 33),
        ("Tom", "Engineering", 23000, 33),
        ("Jane", "Marketing", 29000, 28),
        ("Jeff", "Marketing", 35000, 38),
        ("Paul", "Engineering", 29000, 23),
        ("Chloe", "Engineering", 23000, 25)]

df = spark.createDataFrame(data, "name STRING, dept STRING, salary INT, age INT")
df.printSchema()
df.show(10)
df.createOrReplaceTempView("employees")
```

```
root
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)

+-----+-----------+------+---+
| name|       dept|salary|age|
+-----+-----------+------+---+
| Lisa|      Sales| 10000| 35|
| Evan|      Sales| 32000| 38|
| Fred|Engineering| 21000| 28|
| Alex|      Sales| 30000| 33|
|  Tom|Engineering| 23000| 33|
| Jane|  Marketing| 29000| 28|
| Jeff|  Marketing| 35000| 38|
| Paul|Engineering| 29000| 23|
|Chloe|Engineering| 23000| 25|
+-----+-----------+------+---+
```



### RANK()

* The RANK() function assigns a rank to each row based on a specified value, for example salary or order amount. With PARTITION BY, the ranking restarts at 1 in each partition.

* If two rows have the same value, RANK() gives both the same rank and then skips the next rank. For example, if two rows tie at rank 2, both get rank 2, rank 3 is skipped, and the next row gets rank 4.

```python
df2_rank = spark.sql("SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS rank FROM employees")
df2_rank.show(10)
```

```
+-----+-----------+------+----+
| name|       dept|salary|rank|
+-----+-----------+------+----+
| Evan|      Sales| 32000|   1|
| Alex|      Sales| 30000|   2|
| Lisa|      Sales| 10000|   3|
| Paul|Engineering| 29000|   1|
|  Tom|Engineering| 23000|   2|
|Chloe|Engineering| 23000|   2|
| Fred|Engineering| 21000|   4|
| Jeff|  Marketing| 35000|   1|
| Jane|  Marketing| 29000|   2|
+-----+-----------+------+----+
```



In the output above, Tom and Chloe (both in Engineering with a salary of 23,000) share rank 2; rank 3 is then skipped and Fred gets rank 4.
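The tie-and-skip rule can be modeled in a few lines of plain Python. This is a sketch of the semantics only, assuming the input is one partition already sorted in the window's ORDER BY order; the helper name `rank` is hypothetical.

```python
from typing import List


def rank(sorted_values: List[int]) -> List[int]:
    """RANK() for one partition whose values are already in ORDER BY order."""
    ranks: List[int] = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(i + 1)      # new value: rank is the 1-based position, so ranks skip after ties
    return ranks


# Engineering salaries, ORDER BY salary DESC (Paul, Tom, Chloe, Fred)
print(rank([29000, 23000, 23000, 21000]))  # [1, 2, 2, 4]
```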

### DENSE_RANK()

The DENSE_RANK() function is identical to RANK() except that it does not skip any ranks. If two rows tie, DENSE_RANK() assigns them the same rank, and the next row gets the next consecutive rank.

Let’s see how this works in practice.

```python
# No ROWS BETWEEN frame is needed: ranking functions are computed from the window's ORDER BY
df3_dense_rank = spark.sql("SELECT name, dept, salary, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dense_rank FROM employees")
df3_dense_rank.show(10)
```

```
+-----+-----------+------+----------+
| name|       dept|salary|dense_rank|
+-----+-----------+------+----------+
| Evan|      Sales| 32000|         1|
| Alex|      Sales| 30000|         2|
| Lisa|      Sales| 10000|         3|
| Paul|Engineering| 29000|         1|
|  Tom|Engineering| 23000|         2|
|Chloe|Engineering| 23000|         2|
| Fred|Engineering| 21000|         3|
| Jeff|  Marketing| 35000|         1|
| Jane|  Marketing| 29000|         2|
+-----+-----------+------+----------+
```
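For comparison with RANK(), here is a plain-Python sketch of the DENSE_RANK() rule, again assuming one partition already sorted in ORDER BY order; `dense_rank` is a hypothetical helper name, not a Spark API. The only change from RANK() is that a new value gets the previous rank plus one instead of its position.

```python
from typing import List


def dense_rank(sorted_values: List[int]) -> List[int]:
    """DENSE_RANK() for one partition whose values are already in ORDER BY order."""
    ranks: List[int] = []
    for i, value in enumerate(sorted_values):
        if i == 0:
            ranks.append(1)               # first row of the partition
        elif value == sorted_values[i - 1]:
            ranks.append(ranks[-1])       # tie: same rank as the previous row
        else:
            ranks.append(ranks[-1] + 1)   # new value: next consecutive rank, no gaps
    return ranks


# Engineering salaries, ORDER BY salary DESC (Paul, Tom, Chloe, Fred)
print(dense_rank([29000, 23000, 23000, 21000]))  # [1, 2, 2, 3]
```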



### ROW_NUMBER()
#### ROW_NUMBER() without PARTITION BY

```python
rownumDF = spark.sql("SELECT order_id, order_date, customer_name, city, order_amount, ROW_NUMBER() OVER (ORDER BY order_id) AS row_number FROM Orders")
rownumDF.show(10)
```

```
+--------+----------+--------------+---------+------------+----------+
|order_id|order_date| customer_name|     city|order_amount|row_number|
+--------+----------+--------------+---------+------------+----------+
|    1001|2017-04-01|   David Smith|GuildFord|       10000|         1|
|    1002|2017-04-02|   David Jones|Arlington|       20000|         2|
|    1003|2017-04-03|    John Smith| Shalford|        5000|         3|
|    1004|2017-04-04| Michael Smith|GuildFord|       15000|         4|
|    1005|2017-04-05|David Williams| Shalford|        7000|         5|
|    1006|2017-04-06|    Paum Smith|GuildFord|       25000|         6|
|    1007|2017-04-10|  Andrew Smith|Arlington|       15000|         7|
|    1008|2017-04-11|   David Brown|Arlington|        2000|         8|
|    1009|2017-04-20|  Robert Smith| Shalford|        1000|         9|
|    1010|2017-04-25|   Peter Smith|GuildFord|         500|        10|
+--------+----------+--------------+---------+------------+----------+
```



#### ROW_NUMBER() with PARTITION BY



```python
rnPar = spark.sql("SELECT order_id, order_date, customer_name, city, order_amount, ROW_NUMBER() OVER (PARTITION BY city ORDER BY order_amount DESC) AS row_number FROM Orders")
rnPar.show(10)
```

```
+--------+----------+--------------+---------+------------+----------+
|order_id|order_date| customer_name|     city|order_amount|row_number|
+--------+----------+--------------+---------+------------+----------+
|    1006|2017-04-06|    Paum Smith|GuildFord|       25000|         1|
|    1004|2017-04-04| Michael Smith|GuildFord|       15000|         2|
|    1001|2017-04-01|   David Smith|GuildFord|       10000|         3|
|    1010|2017-04-25|   Peter Smith|GuildFord|         500|         4|
|    1002|2017-04-02|   David Jones|Arlington|       20000|         1|
|    1007|2017-04-10|  Andrew Smith|Arlington|       15000|         2|
|    1008|2017-04-11|   David Brown|Arlington|        2000|         3|
|    1005|2017-04-05|David Williams| Shalford|        7000|         1|
|    1003|2017-04-03|    John Smith| Shalford|        5000|         2|
|    1009|2017-04-20|  Robert Smith| Shalford|        1000|         3|
+--------+----------+--------------+---------+------------+----------+
```
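Unlike RANK() and DENSE_RANK(), ROW_NUMBER() never repeats a number within a partition, even for ties. The Orders data has no ties inside a city, so this plain-Python sketch (a model of the semantics, not Spark code) applies `ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC)` to the employees data instead, where Tom and Chloe both earn 23,000. The helper name `row_number_by_dept` is hypothetical. Python's sort is stable, so tied rows keep their input order here; Spark does not guarantee which tied row gets the lower number.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str, int]  # (name, dept, salary)

# (name, dept, salary) values from the employees data
employees: List[Row] = [
    ("Lisa", "Sales", 10000), ("Evan", "Sales", 32000),
    ("Fred", "Engineering", 21000), ("Alex", "Sales", 30000),
    ("Tom", "Engineering", 23000), ("Jane", "Marketing", 29000),
    ("Jeff", "Marketing", 35000), ("Paul", "Engineering", 29000),
    ("Chloe", "Engineering", 23000),
]


def row_number_by_dept(rows: List[Row]) -> Dict[str, int]:
    """Model of ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC)."""
    partitions: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)  # PARTITION BY dept
    numbers: Dict[str, int] = {}
    for members in partitions.values():
        # ORDER BY salary DESC; stable sort keeps tied rows in input order
        ordered = sorted(members, key=lambda r: r[2], reverse=True)
        for n, (name, _dept, _salary) in enumerate(ordered, start=1):
            numbers[name] = n  # 1, 2, 3, ... with no repeats, even for ties
    return numbers


numbers = row_number_by_dept(employees)
print(numbers["Tom"], numbers["Chloe"], numbers["Fred"])  # 2 3 4
```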



### NTILE()

* NTILE(n) divides the ordered rows of each partition into n groups (tiles) of as nearly equal size as possible and returns the tile number of each row. It helps you identify which quartile (or percentile, or any other subdivision) a given row falls into.

* For example, if you have 100 rows and want 4 quartiles based on a value column, NTILE(4) assigns each row a number from 1 to 4, and you can then see how many rows fall into each quartile.

* In the query below, we create four quartiles based on order amount and then see how many orders fall into each quartile.

```python
# Alias the result as quartile: NTILE returns a tile number, not a row number
ntilePar = spark.sql("SELECT order_id, order_date, customer_name, city, order_amount, NTILE(4) OVER (ORDER BY order_amount) AS quartile FROM Orders")
ntilePar.show(10)
```

```
+--------+----------+--------------+---------+------------+--------+
|order_id|order_date| customer_name|     city|order_amount|quartile|
+--------+----------+--------------+---------+------------+--------+
|    1010|2017-04-25|   Peter Smith|GuildFord|         500|       1|
|    1009|2017-04-20|  Robert Smith| Shalford|        1000|       1|
|    1008|2017-04-11|   David Brown|Arlington|        2000|       1|
|    1003|2017-04-03|    John Smith| Shalford|        5000|       2|
|    1005|2017-04-05|David Williams| Shalford|        7000|       2|
|    1001|2017-04-01|   David Smith|GuildFord|       10000|       2|
|    1004|2017-04-04| Michael Smith|GuildFord|       15000|       3|
|    1007|2017-04-10|  Andrew Smith|Arlington|       15000|       3|
|    1002|2017-04-02|   David Jones|Arlington|       20000|       4|
|    1006|2017-04-06|    Paum Smith|GuildFord|       25000|       4|
+--------+----------+--------------+---------+------------+--------+
```



* NTILE sizes the tiles using the following formula:

* Number of rows in each tile = number of rows in the result set (or partition) / number of tiles specified

* In our example there are 10 rows in total and 4 tiles are specified, so each tile would hold 2.5 (10/4) rows. Because a tile must contain a whole number of rows, the SQL engine assigns 3 rows to each of the first two tiles and 2 rows to each of the remaining two tiles.
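The sizing rule above can be written with integer division: every tile gets `rows // tiles` rows, and the first `rows % tiles` tiles get one extra row. This plain-Python sketch models the tile assignment for one ordered partition; the helper name `ntile` is hypothetical and this is not Spark's implementation.

```python
from typing import List


def ntile(n_rows: int, n_tiles: int) -> List[int]:
    """Tile number (1-based) for each of n_rows already-ordered rows, NTILE(n_tiles) style."""
    base, extra = divmod(n_rows, n_tiles)  # 10 rows, 4 tiles -> base 2, extra 2
    tiles: List[int] = []
    for tile in range(1, n_tiles + 1):
        size = base + (1 if tile <= extra else 0)  # the first `extra` tiles take one more row
        tiles.extend([tile] * size)
    return tiles


print(ntile(10, 4))  # [1, 1, 1, 2, 2, 2, 3, 3, 4, 4]
```

The result matches the quartile column of the 10-row Orders example: tiles 1 and 2 hold 3 orders each, tiles 3 and 4 hold 2 each.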

[Datascience](https://towardsdatascience.com/sql-window-function-demonstrated-with-real-interview-questions-from-leetcode-e83e28edaabc)