# Window Functions

The purpose of this notebook is to give you a basic understanding of how window functions work in PySpark.

We will start by importing the PySpark machinery and a function that generates a dummy table:

In [1]:
# Spark related machinery
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

# Reuse the active session if there is one, otherwise create it
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [2]:
#Import function
from pyspark_functions import create_sp_table3

In the following block of code we create a table called **data** using the function ```create_sp_table3```. The table has 3 columns: 
* **id**: Numbers from 1 to 5
* **team**: Numbers from 1 to 3 
* **score**: Numbers from 0 to 100

Then, we will print it to screen sorted by the column **id**:

In [3]:
#Create table
data = create_sp_table3()

#Print table sorted by id
data.sort("id").show()

+---+----+-----+
| id|team|score|
+---+----+-----+
|  1|   3|   44|
|  1|   3|   44|
|  1|   2|    8|
|  1|   1|   54|
|  2|   3|   41|
|  2|   1|   67|
|  2|   1|   52|
|  3|   3|   71|
|  3|   3|   38|
|  3|   3|    7|
|  4|   2|   69|
|  4|   1|   61|
|  4|   3|   58|
|  4|   3|   45|
|  4|   1|   91|
|  5|   1|   11|
|  5|   2|   54|
|  5|   3|   83|
|  5|   3|   22|
|  5|   2|   67|
+---+----+-----+
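The helper ```create_sp_table3``` lives in a local module that is not shown in this notebook. As a rough stand-in, the sketch below generates rows with the same three columns and value ranges in plain Python. The name ```make_rows```, the row count, and the seed are assumptions made for illustration, not the original implementation.

```python
import random


def make_rows(n_rows=20, seed=0):
    """Return (id, team, score) tuples with the value ranges described above.

    Hypothetical stand-in for create_sp_table3, whose source is not shown.
    """
    rng = random.Random(seed)  # seeded so repeated runs give the same rows
    return [
        (rng.randint(1, 5), rng.randint(1, 3), rng.randint(0, 100))
        for _ in range(n_rows)
    ]


rows = make_rows()

# With an active SparkSession named `spark`, these rows could be turned into
# a DataFrame with: spark.createDataFrame(rows, ["id", "team", "score"])
```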



In the **id** column we have several rows sharing the same value. In other words, the values of the column **id** are not unique. In the block of code below, using window functions, we will create the following 3 new columns:

* high_score: for each id, the highest score achieved
* low_score: for each id, the lowest score achieved
* mean_score: for each id, the average score achieved


In [4]:
#Define a window that partitions the table by the column id
w = Window.partitionBy("id")

#Create column with the highest score for each id
data = data.withColumn("high_score", F.max("score").over(w))

#Create column with the lowest score for each id
data = data.withColumn("low_score", F.min("score").over(w))

#Create column with the mean score for each id
data = data.withColumn("mean_score", F.mean("score").over(w))

#Print table sorted by id
data.sort("id").show()

+---+----+-----+----------+---------+------------------+
| id|team|score|high_score|low_score|        mean_score|
+---+----+-----+----------+---------+------------------+
|  1|   2|    8|        54|        8|              37.5|
|  1|   1|   54|        54|        8|              37.5|
|  1|   3|   44|        54|        8|              37.5|
|  1|   3|   44|        54|        8|              37.5|
|  2|   3|   41|        67|       41|53.333333333333336|
|  2|   1|   67|        67|       41|53.333333333333336|
|  2|   1|   52|        67|       41|53.333333333333336|
|  3|   3|   71|        71|        7|38.666666666666664|
|  3|   3|    7|        71|        7|38.666666666666664|
|  3|   3|   38|        71|        7|38.666666666666664|
|  4|   1|   91|        91|       45|              64.8|
|  4|   2|   69|        91|       45|              64.8|
|  4|   1|   61|        91|       45|              64.8|
|  4|   3|   58|        91|       45|              64.8|
|  4|   3|   45|        91|    

The first line of code above defines the window, in this case a partition by the column **id**. The following 3 lines of code create the new columns using the same recipe:

1. Create a new column using the method ```.withColumn()```.
2. The method ```.withColumn()``` takes the name of the new column and the function used to create the new column.
3. We use ```max()```, ```min()```, and ```mean()``` as our functions over the window that we defined earlier, in this case over the group of rows sharing the same value for the column **id**.

As expected, all rows sharing the same **id** value also share the same values in the newly created columns.
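To make the recipe concrete, here is a plain Python sketch (not PySpark) of what partitioning by **id** and aggregating over the window amounts to. It uses the rows for ids 1 and 2 from the table above. The variable names are illustrative only, and Spark itself does this work in a distributed way.

```python
from collections import defaultdict
from statistics import mean

# (id, team, score) rows for ids 1 and 2, copied from the table above
rows = [
    (1, 3, 44), (1, 3, 44), (1, 2, 8), (1, 1, 54),
    (2, 3, 41), (2, 1, 67), (2, 1, 52),
]

# Step 1: group the scores by id, like Window.partitionBy("id")
scores_by_id = defaultdict(list)
for row_id, _team, score in rows:
    scores_by_id[row_id].append(score)

# Step 2: keep every input row and attach the aggregates of its partition
result = [
    (
        row_id, team, score,
        max(scores_by_id[row_id]),   # high_score
        min(scores_by_id[row_id]),   # low_score
        mean(scores_by_id[row_id]),  # mean_score
    )
    for row_id, team, score in rows
]

for row in result:
    print(row)
```

Note that the result has as many rows as the input. This is the main difference from a ```groupBy``` aggregation, which would return a single row per **id**.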

# Window Partitioning by One Column and Ordering by Another

In this last example we will show how to create a new column **team_list** where for each **id** we will store all the values in the column **team** as an ordered list.

In [5]:
#Define window
w2 = Window.partitionBy("id").orderBy("team").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

#Create column with the ordered list of teams for each id
data = data.withColumn("team_list", F.collect_list("team").over(w2))

data.sort("id", "team").show()

+---+----+-----+----------+---------+------------------+---------------+
| id|team|score|high_score|low_score|        mean_score|      team_list|
+---+----+-----+----------+---------+------------------+---------------+
|  1|   1|   54|        54|        8|              37.5|   [1, 2, 3, 3]|
|  1|   2|    8|        54|        8|              37.5|   [1, 2, 3, 3]|
|  1|   3|   44|        54|        8|              37.5|   [1, 2, 3, 3]|
|  1|   3|   44|        54|        8|              37.5|   [1, 2, 3, 3]|
|  2|   1|   52|        67|       41|53.333333333333336|      [1, 1, 3]|
|  2|   1|   67|        67|       41|53.333333333333336|      [1, 1, 3]|
|  2|   3|   41|        67|       41|53.333333333333336|      [1, 1, 3]|
|  3|   3|   71|        71|        7|38.666666666666664|      [3, 3, 3]|
|  3|   3|    7|        71|        7|38.666666666666664|      [3, 3, 3]|
|  3|   3|   38|        71|        7|38.666666666666664|      [3, 3, 3]|
|  4|   1|   61|        91|       45|              

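As expected, the first line of code creates the new window following the same format as before. However, now we are ordering by the column **team** (```.orderBy("team")```), and we are adding ```.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)``` to the line of code. This last part allows us to grab all **team** values in the partition and store them in the list. Without it, a window that has an ```orderBy``` defaults to a frame running from the start of the partition up to the current row, so each list would only contain the teams up to the current row's **team** value. Try removing it and running the code again to see this for yourself.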
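The effect of the frame can be sketched in plain Python (not PySpark) using the **team** values of the **id** = 1 partition. The second list comprehension mimics the default frame that applies once an ordering is defined, which is a range from the start of the partition to the current row. The variable names are illustrative assumptions.

```python
# team values of the id = 1 partition, sorted as orderBy("team") would do
teams = [1, 2, 3, 3]

# Frame from unboundedPreceding to unboundedFollowing:
# every row sees the whole partition.
full_frame = [list(teams) for _ in teams]

# Default frame when an ordering is present (unboundedPreceding to
# currentRow, measured as a range): each row sees all values that are
# less than or equal to its own, so tied rows see each other.
default_frame = [[t for t in teams if t <= current] for current in teams]

print(full_frame)
print(default_frame)
```

With the full frame every row gets ```[1, 2, 3, 3]```, matching the **team_list** column above. With the default frame the lists grow row by row, and only the rows with the largest **team** value see the complete list.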

# Final Words
We went over the basics of how to create and use window functions in PySpark. Now it is your turn to start coding. Try creating different windows by partitioning over another column (maybe **team**), or by using other functions to create the new column, such as:
* ```avg()```
* ```count()```
* ```row_number()``` (for this one your window would have to look something like: ```Window.partitionBy("a column").orderBy("another column")```).
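As a starting point for the last suggestion, the plain Python sketch below (not PySpark) mimics what a row number over a window partitioned by **id** and ordered by **score** would produce, using the scores for ids 1 and 2 from the first table. The choice of **score** as the ordering column is an assumption made for illustration.

```python
from collections import defaultdict

# (id, score) pairs for ids 1 and 2, copied from the first table
rows = [(1, 44), (1, 44), (1, 8), (1, 54), (2, 41), (2, 67), (2, 52)]

# Partition the scores by id
partitions = defaultdict(list)
for row_id, score in rows:
    partitions[row_id].append(score)

# Order each partition by score and number its rows starting at 1
numbered = [
    (row_id, score, position)
    for row_id, scores in sorted(partitions.items())
    for position, score in enumerate(sorted(scores), start=1)
]

for row in numbered:
    print(row)
```

The numbering restarts at 1 for each **id**, and tied scores (the two 44s for **id** 1) still receive different numbers. In Spark the relative order of such tied rows is not guaranteed.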