# Window Functions in PySpark, Pandas and MySQL
Author: Yuan Huang

## Introduction
This notebook shows how to use and implement window functions in PySpark SQL, pandas, and MySQL. The database tables, the sample data, and instructions for installing a MySQL server and loading the sample database into it can be obtained from [the following link](http://www.mysqltutorial.org/). Many of the MySQL window function queries in this notebook can be found at [the following link](http://www.mysqltutorial.org/mysql-window-functions/), which serves as a reference for comparing them with the pandas and PySpark SQL versions. The pandas and PySpark SQL implementations were written by the author.

This notebook demonstrates commonly used window functions: rank, dense rank, mean/average/sum/count/min/max, nth_value, lead, lag, first_value, last_value, row number, and ntile. Most queries are implemented in the following three ways, with matching results:
+ **By SQL query against the MySQL database:**
   SQL window functions in the 'over(partition by, order by, window frame)' format are sent to the MySQL server and executed via the pandas read_sql_query command. Query results are fetched and stored as pandas dataframes.
+ **By pandas functions:**
     pandas uses **rolling** and **expanding** to define the window frame, **groupby** to define the partition, **sort_values** to define the ordering, and **apply or transform** to generate the window function result columns. The original MySQL tables are read into pandas dataframes before any data manipulation.
+ **By Spark SQL:**
     the same window function queries are run with PySpark SQL. The original MySQL tables are first read into pandas dataframes with the pandas.read_sql_query command and then converted to Spark dataframes with the spark.createDataFrame command.
  
First, let's import the packages used in this notebook and establish the MySQL connection. The pandas read_sql_query command uses this connection to query the MySQL tables and fetch the results into pandas dataframes.

In [1]:
import pymysql
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
%matplotlib inline
import os

mysql_user=os.environ["Mysql_User"]
mysql_password=os.environ["Mysql_Password"]
my_connection=pymysql.connect(host='localhost',user=mysql_user, passwd=mysql_password,db='classicmodels')
spark=SparkSession.builder.appName("windowFunction").getOrCreate()

## Data Sources
All the data in this notebook come from the MySQL database tables used in the [mysql window function tutorial](http://www.mysqltutorial.org/mysql-window-functions/mysql-percent_rank-function/) and the related links. Pandas dataframes are created through the pymysql connection, and the corresponding Spark dataframes are created from the pandas dataframes. The following section creates all of them. Each MySQL table corresponds to one pandas dataframe and one Spark dataframe.

### Pandas Dataframes:

In [168]:
orderdetails_pdf=pd.read_sql_query('select * from orderdetails',my_connection)
basic_pay_pdf=pd.read_sql_query('select * from basic_pays',my_connection)
orders_pdf=pd.read_sql_query('select * from orders', my_connection)
products_pdf=pd.read_sql_query('select * from products',my_connection)
sales_pdf=pd.read_sql_query('select * from sales',my_connection)
customers_pdf=pd.read_sql_query('select * from customers',my_connection)

In addition to these tables, this notebook uses another table, 'productLineSales'. It can be created in the MySQL database from the tables listed above using the following SQL DDL command ( [following link](http://www.mysqltutorial.org/mysql-window-functions/mysql-percent_rank-function/) ).
<br> <font color=blue> CREATE TABLE productLineSales
<br> SELECT
<br>   productLine,
<br>    YEAR(orderDate) orderYear,
<br>    quantityOrdered * priceEach orderValue
<br>FROM
<br>    orderDetails
<br>        INNER JOIN
<br>    orders USING (orderNumber)
<br>        INNER JOIN
<br>    products USING (productCode)
<br>GROUP BY
<br>    productLine ,
<br>    YEAR(orderDate);
</font>    
In pandas, we can easily create a dataframe corresponding to this table using the pandas merge command, as shown in the following cell:

In [4]:
prolinesale=(orderdetails_pdf.merge(orders_pdf,on=['orderNumber'])
              .merge(products_pdf,on=["productCode"])
            .assign(orderValue=lambda x: x.quantityOrdered*x.priceEach)
            .assign(orderYear=lambda x: pd.to_datetime(x.orderDate).dt.year)    
            [["productLine", "orderYear","orderValue"]])

We can also create this table directly on the MySQL server by executing the SQL command above and then reading the table into a pandas dataframe. The following code creates the pandas dataframe from the MySQL table:

In [5]:
productLineSales_pdf=pd.read_sql_query('select * from productLineSales',my_connection)

### Spark Dataframes:
We create the Spark dataframes and register each one as a temp view (named with an `_sdf` suffix) so that Spark SQL commands can query it.

In [188]:
spark.createDataFrame(orderdetails_pdf).createOrReplaceTempView("orderdetails_sdf")
spark.createDataFrame(basic_pay_pdf).createOrReplaceTempView("basic_pays_sdf")
spark.createDataFrame(orders_pdf).createOrReplaceTempView("orders_sdf")
spark.createDataFrame(products_pdf).createOrReplaceTempView("products_sdf")
spark.createDataFrame(productLineSales_pdf).createOrReplaceTempView("productLineSales_sdf")
spark.createDataFrame(sales_pdf).createOrReplaceTempView("sales_sdf")
spark.createDataFrame(customers_pdf).createOrReplaceTempView("customers_sdf")

### 1. Percent Rank with CTE (Common Table Expressions)

We will run the percent rank code on productLineSales table. Let's first have a look at this table:

In [21]:
productLineSales_pdf.head()

Unnamed: 0,productLine,orderYear,orderValue
0,Vintage Cars,2003,4080.0
1,Vintage Cars,2003,2754.5
2,Vintage Cars,2003,1660.12
3,Vintage Cars,2003,1729.21
4,Vintage Cars,2003,2701.5


#### 1.1 Using standard sql in Mysql database for percent rank
The percent rank query is first executed on a local MySQL server, and the results are fetched into a pandas dataframe. As the following SQL code shows, the query first uses a CTE to compute the total orderValue for each productLine, then gets the percent rank of each row with the **percent_rank()** window function.

In [45]:
sql="""
with t AS (
     SELECT 
         productLine,
         SUM(orderValue) orderValue
     FROM
         productLineSales
     GROUP BY
         productLine
 )
 
 SELECT
     productLine,
     orderValue,
     ROUND(
         PERCENT_RANK() OVER (
             ORDER BY orderValue
         )
         ,2) percentile_rank
FROM
t;
"""

pd.read_sql_query(sql,my_connection)

Unnamed: 0,productLine,orderValue,percentile_rank
0,Trains,188532.92,0.0
1,Ships,663998.34,0.17
2,Planes,954637.54,0.33
3,Trucks and Buses,1024113.57,0.5
4,Motorcycles,1121426.12,0.67
5,Vintage Cars,1797559.63,0.83
6,Classic Cars,3853922.49,1.0
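
For readers without a MySQL server at hand, the window step of the query above can be tried with Python's built-in sqlite3 module. This is only a sketch under stated assumptions: SQLite stands in for MySQL (window functions need SQLite 3.25 or later), the table name `t` mirrors the CTE, and the totals are copied from the result table above rather than aggregated from productLineSales.

```python
import sqlite3

# Per-productLine totals copied from the MySQL result shown above.
totals = [
    ("Trains", 188532.92),
    ("Ships", 663998.34),
    ("Planes", 954637.54),
    ("Trucks and Buses", 1024113.57),
    ("Motorcycles", 1121426.12),
    ("Vintage Cars", 1797559.63),
    ("Classic Cars", 3853922.49),
]

# In-memory SQLite database used as a stand-in for the MySQL server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (productLine TEXT, orderValue REAL)")
conn.executemany("INSERT INTO t VALUES (?, ?)", totals)

query = """
SELECT productLine,
       ROUND(PERCENT_RANK() OVER (ORDER BY orderValue), 2) AS percentile_rank
FROM t
ORDER BY orderValue
"""
rows = conn.execute(query).fetchall()
conn.close()

for row in rows:
    print(row)
```

The percentile_rank column should line up with the MySQL output above, since both engines compute (rank - 1) / (rows - 1).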


#### 1.2 Using pandas for percent rank
In pandas, we build the equivalent of the CTE by grouping by 'productLine', selecting the 'orderValue' column, and summing it for each productLine. The reset_index() command then restores 'productLine' and 'orderValue' as dataframe columns, and the assign() function creates the pct_rank column. The lambda function receives the entire CTE dataframe as x and computes the percent rank of each row as (rank - 1) / (number of rows - 1). The results are consistent with those obtained from the standard SQL query on MySQL.

In [46]:
(productLineSales_pdf
 .groupby('productLine')['orderValue']
 .sum().reset_index()
 .sort_values('orderValue')
 .assign(pct_rank=lambda x:round((x['orderValue'].rank()-1)/(len(x)-1),2)))

Unnamed: 0,productLine,orderValue,pct_rank
4,Trains,188532.92,0.0
3,Ships,663998.34,0.17
2,Planes,954637.54,0.33
5,Trucks and Buses,1024113.57,0.5
1,Motorcycles,1121426.12,0.67
6,Vintage Cars,1797559.63,0.83
0,Classic Cars,3853922.49,1.0
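
The pandas expression above relies on the default `rank()` method, which averages the ranks of tied rows. The totals here contain no ties, so it agrees with SQL, but with ties SQL's PERCENT_RANK is based on RANK(), which gives tied rows the lowest rank of the tie. The following sketch, on hypothetical totals invented for illustration, shows the difference and how `method="min"` may be used to follow the SQL behaviour.

```python
import pandas as pd

# Hypothetical totals with a tie between B and C (not from the tutorial data).
totals = pd.DataFrame({
    "productLine": ["A", "B", "C", "D"],
    "orderValue": [100.0, 200.0, 200.0, 300.0],
})

n = len(totals)
# method="min" gives tied rows the lowest rank in the tie, as SQL RANK() does.
sql_like = (totals["orderValue"].rank(method="min") - 1) / (n - 1)
# The default method="average" splits the difference between tied ranks.
averaged = (totals["orderValue"].rank() - 1) / (n - 1)

result = totals.assign(
    pct_rank_min=sql_like.round(2),
    pct_rank_avg=averaged.round(2),
)
print(result)
```

For B and C the `min` variant gives 0.33 while the `average` variant gives 0.5.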


#### 1.3 Using Pyspark sql for percent rank
In PySpark SQL, we execute the same standard SQL command as on the MySQL server. The query runs against the temp view productLineSales_sdf, which was created from the productLineSales table.

In [47]:
sql_pct_spark="""
with t AS (
     SELECT 
         productLine,
         SUM(orderValue) orderValue
     FROM
         productLineSales_sdf
     GROUP BY
         productLine
 )
 
 SELECT
     productLine,
     orderValue,
     ROUND(
         PERCENT_RANK() OVER (
             ORDER BY orderValue
         )
         ,2) percentile_rank
FROM
t
"""

In [48]:
spark.sql(sql_pct_spark).show()

+----------------+------------------+---------------+
|     productLine|        orderValue|percentile_rank|
+----------------+------------------+---------------+
|          Trains|         188532.92|            0.0|
|           Ships| 663998.3400000001|           0.17|
|          Planes|         954637.54|           0.33|
|Trucks and Buses|1024113.5699999998|            0.5|
|     Motorcycles|1121426.1199999999|           0.67|
|    Vintage Cars|1797559.6300000004|           0.83|
|    Classic Cars|        3853922.49|            1.0|
+----------------+------------------+---------------+



### 2. NTILE
We run NTILE on the productLineSales table. The following standard SQL commands are executed on the MySQL server.

#### 2.1 Standard sql on Mysql server for NTILE

In [212]:
sql="""SELECT
    productline, 
    orderyear, 
    ordervalue,
    NTILE(3) OVER (
        PARTITION BY orderyear
        ORDER BY ordervalue DESC
    ) product_line_group
FROM 
    productLineSales
    order by productline, 
    orderyear, 
    ordervalue
    """

pd.read_sql_query(sql,my_connection).head(5)

Unnamed: 0,productline,orderyear,ordervalue,product_line_group
0,Classic Cars,2003,675.78,3
1,Classic Cars,2003,687.2,3
2,Classic Cars,2003,687.36,3
3,Classic Cars,2003,697.83,3
4,Classic Cars,2003,830.75,3


#### 2.2 Pyspark sql for NTILE

In [208]:
sql="""SELECT
    productline, 
    orderyear, 
    ordervalue,
    NTILE(3) OVER (
        PARTITION BY orderyear
        ORDER BY ordervalue DESC
    ) product_line_group
FROM 
    productLineSales_sdf
    order by productline, 
    orderyear, 
    ordervalue
    """
spark.sql(sql).show(5)

+------------+---------+----------+------------------+
| productline|orderyear|ordervalue|product_line_group|
+------------+---------+----------+------------------+
|Classic Cars|     2003|    675.78|                 3|
|Classic Cars|     2003|     687.2|                 3|
|Classic Cars|     2003|    687.36|                 3|
|Classic Cars|     2003|    697.83|                 3|
|Classic Cars|     2003|    830.75|                 3|
+------------+---------+----------+------------------+
only showing top 5 rows
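
This section covers NTILE in MySQL and Spark SQL only. As a rough pandas counterpart, the sketch below reproduces the NTILE bucketing rule (when rows do not divide evenly, the first buckets receive one extra row). The helper name `ntile` and the sample data are hypothetical and chosen for illustration; this is not part of the original notebook.

```python
import pandas as pd

def ntile(row_number: pd.Series, group_size: pd.Series, n: int) -> pd.Series:
    """Bucket 1-based row numbers into n groups following the SQL NTILE(n) rule."""
    q = group_size // n          # base bucket size
    r = group_size % n           # number of buckets that get one extra row
    cutoff = r * (q + 1)         # rows covered by the larger buckets
    in_large = row_number <= cutoff
    large = (row_number - 1) // (q + 1) + 1
    # q is 0 when a partition has fewer than n rows; those rows are all "large",
    # so the divisor is replaced by 1 only to avoid dividing by zero.
    small = r + (row_number - cutoff - 1) // q.where(q > 0, 1) + 1
    return large.where(in_large, small).astype(int)

# Hypothetical sample: 5 rows in 2003 and 3 rows in 2004.
df = pd.DataFrame({
    "orderYear": [2003] * 5 + [2004] * 3,
    "orderValue": [50.0, 40.0, 30.0, 20.0, 10.0, 9.0, 8.0, 7.0],
})

# PARTITION BY orderYear ORDER BY orderValue DESC
ordered = df.sort_values(["orderYear", "orderValue"], ascending=[True, False])
grp = ordered.groupby("orderYear")["orderValue"]
rn = grp.cumcount() + 1          # row number within the partition
size = grp.transform("size")     # partition size repeated on every row
ordered["product_line_group"] = ntile(rn, size, 3)
print(ordered)
```

With ties in the ordering column, SQL engines may assign tied rows to buckets in any order, so row-level agreement with MySQL or Spark is only expected when the ordering is unambiguous.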



### 3. Nth_Value
We apply the nth_value window function to the basic_pays table and attach to each row the name of the employee with the second-highest salary in that row's department.

#### 3.1. Standard sql on Mysql server for Nth_Value

In [206]:
sql='''SELECT
 employee_name,
 department,
 salary,
 NTH_VALUE(employee_name, 2) OVER  (
 PARTITION BY department
 ORDER BY salary DESC
 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
 ) second_highest_salary
FROM
 basic_pays
 order by
 UPPER(department),
 salary desc,
 employee_name
 '''

pd.read_sql_query(sql,my_connection)

Unnamed: 0,employee_name,department,salary,second_highest_salary
0,Gerard Bondur,Accounting,11472,Mary Patterson
1,Mary Patterson,Accounting,9998,Mary Patterson
2,Jeff Firrelli,Accounting,8992,Mary Patterson
3,William Patterson,Accounting,8870,Mary Patterson
4,Diane Murphy,Accounting,8435,Mary Patterson
5,Anthony Bow,Accounting,6627,Mary Patterson
6,Leslie Jennings,IT,8113,Leslie Thompson
7,Leslie Thompson,IT,5186,Leslie Thompson
8,George Vanauf,Sales,10563,Steve Patterson
9,Steve Patterson,Sales,9441,Steve Patterson


#### 3.2. Pandas for Nth_Value

In [204]:
(basic_pay_pdf.assign(second_highest=basic_pay_pdf
                 .sort_values("salary", ascending=False)
                 .groupby('department')['employee_name']
                 .transform(lambda x : x.iloc[1] if len(x)>1 else x.iloc[0] ))
                 .assign(department_upper=lambda x:x['department'].str.upper())
                 .sort_values(["department_upper","salary","employee_name"],ascending=[True,False,True])
                 .drop(columns=['department_upper']))
                 

Unnamed: 0,employee_name,department,salary,second_highest
5,Gerard Bondur,Accounting,11472,Mary Patterson
13,Mary Patterson,Accounting,9998,Mary Patterson
7,Jeff Firrelli,Accounting,8992,Mary Patterson
16,William Patterson,Accounting,8870,Mary Patterson
2,Diane Murphy,Accounting,8435,Mary Patterson
0,Anthony Bow,Accounting,6627,Mary Patterson
10,Leslie Jennings,IT,8113,Leslie Thompson
11,Leslie Thompson,IT,5186,Leslie Thompson
4,George Vanauf,Sales,10563,Steve Patterson
15,Steve Patterson,Sales,9441,Steve Patterson
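
One difference worth noting: SQL's NTH_VALUE returns NULL when a partition has fewer than n rows, whereas the pandas cell above falls back to the first row. The following sketch shows one possible way to keep the NULL behaviour in pandas. The pay table is hypothetical and made up for illustration.

```python
import pandas as pd

# Hypothetical pay table; "Legal" has a single employee.
pay = pd.DataFrame({
    "employee_name": ["Ann", "Bo", "Cy", "Di"],
    "department": ["IT", "IT", "IT", "Legal"],
    "salary": [5000, 7000, 6000, 8000],
})

ordered = pay.sort_values(["department", "salary"], ascending=[True, False])
# 1-based position of each row inside its department, highest salary first.
position = ordered.groupby("department").cumcount() + 1
# One name per department: the row at position 2, if there is one.
second = ordered.loc[position == 2].set_index("department")["employee_name"]
# Departments without a second row map to NaN, mirroring SQL's NULL.
result = pay.assign(second_highest=pay["department"].map(second))
print(result)
```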


### 4 Mean/Sum of Rows in Window
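This section uses the basic_pays table. Each row gets the average of its own salary and the two preceding salaries in its department, ordered by ascending salary.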

#### 4.1. Standard sql for aggregation values of a window (mean/sum/max/count etc.)

In [203]:
sql='''SELECT
 employee_name,
 department,
 salary,
AVG(salary) OVER  (
 PARTITION BY department
 ORDER BY salary
 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
 ) ave_mean
FROM
 basic_pays
 order by
 UPPER(department),
 salary desc,
 employee_name
 '''

pd.read_sql_query(sql,my_connection)

Unnamed: 0,employee_name,department,salary,ave_mean
0,Gerard Bondur,Accounting,11472,10154.0
1,Mary Patterson,Accounting,9998,9286.6667
2,Jeff Firrelli,Accounting,8992,8765.6667
3,William Patterson,Accounting,8870,7977.3333
4,Diane Murphy,Accounting,8435,7531.0
5,Anthony Bow,Accounting,6627,6627.0
6,Leslie Jennings,IT,8113,6649.5
7,Leslie Thompson,IT,5186,5186.0
8,George Vanauf,Sales,10563,9728.3333
9,Steve Patterson,Sales,9441,8427.3333


#### 4.2. Pandas for aggregation values of a window (mean/sum/max/count etc.)

In [201]:
(basic_pay_pdf.assign(ave_mean=basic_pay_pdf.sort_values('salary')
                 .groupby('department')['salary']
                 # transform keeps the original row index, so assign() can align the result
                 .transform(lambda x: x.rolling(3,min_periods=1).mean().round(2)))
                 .assign(department_upper=lambda x:x['department'].str.upper())
                 .sort_values(["department_upper","salary","employee_name"],ascending=[True,False,True])
                 .drop(columns=['department_upper'])) 

Unnamed: 0,employee_name,department,salary,ave_mean
5,Gerard Bondur,Accounting,11472,10154.0
13,Mary Patterson,Accounting,9998,9286.67
7,Jeff Firrelli,Accounting,8992,8765.67
16,William Patterson,Accounting,8870,7977.33
2,Diane Murphy,Accounting,8435,7531.0
0,Anthony Bow,Accounting,6627,6627.0
10,Leslie Jennings,IT,8113,6649.5
11,Leslie Thompson,IT,5186,5186.0
4,George Vanauf,Sales,10563,9728.33
15,Steve Patterson,Sales,9441,8427.33


#### 4.3. Spark sql for aggregation values of a window (mean/sum/max/count etc.)

In [202]:
sql='''SELECT
 employee_name,
 department,
 salary,
 AVG(salary) OVER  (
 PARTITION BY department
 ORDER BY salary
 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
 ) ave_mean
FROM
 basic_pays_sdf
 order by
 UPPER(department),
 salary desc,
 employee_name
 '''
spark.sql(sql).show()

+-----------------+----------+------+------------------+
|    employee_name|department|salary|          ave_mean|
+-----------------+----------+------+------------------+
|    Gerard Bondur|Accounting| 11472|           10154.0|
|   Mary Patterson|Accounting|  9998| 9286.666666666666|
|    Jeff Firrelli|Accounting|  8992| 8765.666666666666|
|William Patterson|Accounting|  8870| 7977.333333333333|
|     Diane Murphy|Accounting|  8435|            7531.0|
|      Anthony Bow|Accounting|  6627|            6627.0|
|  Leslie Jennings|        IT|  8113|            6649.5|
|  Leslie Thompson|        IT|  5186|            5186.0|
|    George Vanauf|     Sales| 10563| 9728.333333333334|
|  Steve Patterson|     Sales|  9441| 8427.333333333334|
|   Julie Firrelli|     Sales|  9181|            7920.5|
|   Foon Yue Tseng|     Sales|  6660|            6660.0|
|       Larry Bott|       SCM| 11798|           11229.0|
|  Pamela Castillo|       SCM| 11303|10779.333333333334|
|      Barry Jones|       SCM| 

### 5 Row Number
Here we use the basic_pays table. We show only the rows with row number 1 or 2, where row numbers are assigned in descending order of salary within each department.

#### 5.1 Standard sql for row number

In [199]:
sql="""
SELECT 
  t.employee_name,
  t.department,
  t.salary
FROM(
  SELECT
  *,
  ROW_NUMBER() OVER(PARTITION BY department ORDER BY salary DESC) as rn 
  FROM basic_pays
  ) t
 WHERE t.rn<3
 ORDER BY
  UPPER(t.department) asc,
  t.salary desc,
  t.employee_name asc
"""
pd.read_sql_query(sql,my_connection)

Unnamed: 0,employee_name,department,salary
0,Gerard Bondur,Accounting,11472
1,Mary Patterson,Accounting,9998
2,Leslie Jennings,IT,8113
3,Leslie Thompson,IT,5186
4,George Vanauf,Sales,10563
5,Steve Patterson,Sales,9441
6,Larry Bott,SCM,11798
7,Pamela Castillo,SCM,11303


#### 5.2 Pandas for row number

In [198]:
(basic_pay_pdf
 .assign(n=basic_pay_pdf.groupby('department')['salary']
 .rank(method='first',ascending=False))
 .query('n<3')[["employee_name","department","salary"]]
 .assign(department_upper=lambda x:x['department'].str.upper())
 .sort_values(["department_upper","salary","employee_name"],ascending=[True,False,True])
 .drop(columns=['department_upper']))

Unnamed: 0,employee_name,department,salary
5,Gerard Bondur,Accounting,11472
13,Mary Patterson,Accounting,9998
10,Leslie Jennings,IT,8113
11,Leslie Thompson,IT,5186
4,George Vanauf,Sales,10563
15,Steve Patterson,Sales,9441
9,Larry Bott,SCM,11798
14,Pamela Castillo,SCM,11303
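
An alternative way to get row numbers in pandas is to sort first and then call `cumcount()` on the grouped frame. It returns integers directly and makes it easy to add tie-breaking sort columns. The sketch below uses a small hypothetical pay table rather than basic_pays.

```python
import pandas as pd

# Hypothetical pay table, made up for illustration.
pay = pd.DataFrame({
    "employee_name": ["Ann", "Bo", "Cy", "Di", "Ed"],
    "department": ["IT", "IT", "IT", "Sales", "Sales"],
    "salary": [5000, 7000, 6000, 8000, 9000],
})

# PARTITION BY department ORDER BY salary DESC
ordered = pay.sort_values(["department", "salary"], ascending=[True, False])
# cumcount() numbers rows 0, 1, 2, ... within each group in their current order.
ordered["rn"] = ordered.groupby("department").cumcount() + 1
top_two = ordered.loc[ordered["rn"] < 3, ["employee_name", "department", "salary"]]
print(top_two)
```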


#### 5.3 Spark sql for row number

In [197]:
sql="""
SELECT 
  t.employee_name,
  t.department,
  t.salary
FROM(
  SELECT
  *,
  ROW_NUMBER() OVER(PARTITION BY department ORDER BY salary DESC) as rn 
  FROM basic_pays_sdf
  ) t
 WHERE t.rn<3
 ORDER BY
  UPPER(t.department) asc,
  t.salary desc,
  t.employee_name asc
"""
spark.sql(sql).show()

+---------------+----------+------+
|  employee_name|department|salary|
+---------------+----------+------+
|  Gerard Bondur|Accounting| 11472|
| Mary Patterson|Accounting|  9998|
|Leslie Jennings|        IT|  8113|
|Leslie Thompson|        IT|  5186|
|  George Vanauf|     Sales| 10563|
|Steve Patterson|     Sales|  9441|
|     Larry Bott|       SCM| 11798|
|Pamela Castillo|       SCM| 11303|
+---------------+----------+------+



### 6 Rank
The RANK() function is applied to the sales table.

#### 6.1. Standard sql for rank

In [127]:
sql="""
  SELECT
    sales_employee,
    fiscal_year,
    sale,
    RANK() OVER (PARTITION BY
                     fiscal_year
                 ORDER BY
                     sale DESC
                ) sales_rank
FROM
    sales
ORDER BY fiscal_year,sale desc, sales_employee    
  """
pd.read_sql_query(sql,my_connection)

Unnamed: 0,sales_employee,fiscal_year,sale,sales_rank
0,John,2016,200.0,1
1,Alice,2016,150.0,2
2,Bob,2016,100.0,3
3,Bob,2017,150.0,1
4,John,2017,150.0,1
5,Alice,2017,100.0,3
6,John,2018,250.0,1
7,Alice,2018,200.0,2
8,Bob,2018,200.0,2


#### 6.2. Pandas for rank

In [128]:
(sales_pdf.assign(sales_rank=sales_pdf
                  .groupby('fiscal_year')['sale'].rank(method="min",ascending=False))
                  .sort_values(["fiscal_year","sale","sales_employee"],ascending=[True,False,True]))

Unnamed: 0,sales_employee,fiscal_year,sale,sales_rank
6,John,2016,200.0,1.0
0,Alice,2016,150.0,2.0
3,Bob,2016,100.0,3.0
4,Bob,2017,150.0,1.0
7,John,2017,150.0,1.0
1,Alice,2017,100.0,3.0
8,John,2018,250.0,1.0
2,Alice,2018,200.0,2.0
5,Bob,2018,200.0,2.0


#### 6.3. Spark for rank

In [132]:
sql="""
  SELECT
    sales_employee,
    fiscal_year,
    sale,
    RANK() OVER (PARTITION BY
                     fiscal_year
                 ORDER BY
                     sale DESC
                ) sales_rank
FROM
    sales_sdf
ORDER BY fiscal_year,sale desc, sales_employee    
  """
spark.sql(sql).show()

+--------------+-----------+-----+----------+
|sales_employee|fiscal_year| sale|sales_rank|
+--------------+-----------+-----+----------+
|          John|       2016|200.0|         1|
|         Alice|       2016|150.0|         2|
|           Bob|       2016|100.0|         3|
|           Bob|       2017|150.0|         1|
|          John|       2017|150.0|         1|
|         Alice|       2017|100.0|         3|
|          John|       2018|250.0|         1|
|         Alice|       2018|200.0|         2|
|           Bob|       2018|200.0|         2|
+--------------+-----------+-----+----------+



### 7 Dense Rank
This section applies DENSE_RANK() to the same sales table.

#### 7.1. Standard sql for dense rank

In [133]:
sql="""
  SELECT
    sales_employee,
    fiscal_year,
    sale,
    DENSE_RANK() OVER (PARTITION BY
                     fiscal_year
                 ORDER BY
                     sale DESC
                ) sales_rank
FROM
    sales
ORDER BY fiscal_year,sale desc, sales_employee    
  """
pd.read_sql_query(sql,my_connection)

Unnamed: 0,sales_employee,fiscal_year,sale,sales_rank
0,John,2016,200.0,1
1,Alice,2016,150.0,2
2,Bob,2016,100.0,3
3,Bob,2017,150.0,1
4,John,2017,150.0,1
5,Alice,2017,100.0,2
6,John,2018,250.0,1
7,Alice,2018,200.0,2
8,Bob,2018,200.0,2


#### 7.2 Pandas for dense rank

In [134]:
(sales_pdf.assign(sales_rank=sales_pdf
                  .groupby('fiscal_year')['sale'].rank(method="dense",ascending=False))
                  .sort_values(["fiscal_year","sale","sales_employee"],ascending=[True,False,True]))

Unnamed: 0,sales_employee,fiscal_year,sale,sales_rank
6,John,2016,200.0,1.0
0,Alice,2016,150.0,2.0
3,Bob,2016,100.0,3.0
4,Bob,2017,150.0,1.0
7,John,2017,150.0,1.0
1,Alice,2017,100.0,2.0
8,John,2018,250.0,1.0
2,Alice,2018,200.0,2.0
5,Bob,2018,200.0,2.0
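
The pandas `rank()` method returns floats (1.0, 2.0, ...), while the SQL queries return integers. As a small sketch, the two rank variants can be computed side by side and cast to integers. The sales rows below are copied from the outputs shown above, and the column name `sales_dense_rank` is introduced here only for illustration.

```python
import pandas as pd

# Rows copied from the sales output shown above.
sales = pd.DataFrame({
    "sales_employee": ["Alice", "Alice", "Alice", "Bob", "Bob", "Bob",
                       "John", "John", "John"],
    "fiscal_year": [2016, 2017, 2018] * 3,
    "sale": [150.0, 100.0, 200.0, 100.0, 150.0, 200.0, 200.0, 150.0, 250.0],
})

by_year = sales.groupby("fiscal_year")["sale"]
ranked = sales.assign(
    # RANK(): ties share the lowest rank and the next rank is skipped.
    sales_rank=by_year.rank(method="min", ascending=False).astype(int),
    # DENSE_RANK(): ties share a rank and no rank is skipped.
    sales_dense_rank=by_year.rank(method="dense", ascending=False).astype(int),
).sort_values(["fiscal_year", "sale", "sales_employee"],
              ascending=[True, False, True])
print(ranked)
```

The two columns differ only in 2017, where Alice follows a two-way tie for first place: RANK() gives her 3 and DENSE_RANK() gives her 2.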


#### 7.3 Spark sql for dense rank

In [136]:
sql="""
  SELECT
    sales_employee,
    fiscal_year,
    sale,
    DENSE_RANK() OVER (PARTITION BY
                     fiscal_year
                 ORDER BY
                     sale DESC
                ) sales_rank
FROM
    sales_sdf
ORDER BY fiscal_year,sale desc, sales_employee    
  """
spark.sql(sql).show()

+--------------+-----------+-----+----------+
|sales_employee|fiscal_year| sale|sales_rank|
+--------------+-----------+-----+----------+
|          John|       2016|200.0|         1|
|         Alice|       2016|150.0|         2|
|           Bob|       2016|100.0|         3|
|           Bob|       2017|150.0|         1|
|          John|       2017|150.0|         1|
|         Alice|       2017|100.0|         2|
|          John|       2018|250.0|         1|
|         Alice|       2018|200.0|         2|
|           Bob|       2018|200.0|         2|
+--------------+-----------+-----+----------+



### 8 First_Value
For each row in the basic_pays table, attach the first salary value of its department, based on descending order of salary.

#### 8.1. Standard sql for First_Value

In [196]:
sql="""SELECT
  employee_name,
  department,
  salary,
  FIRST_VALUE(salary) OVER(PARTITION BY department ORDER BY salary DESC
  RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as 1st_depart_salary
  FROM basic_pays
  ORDER BY
  
  UPPER(department),
  salary DESC,
  employee_name
  """
pd.read_sql_query(sql,my_connection)

Unnamed: 0,employee_name,department,salary,1st_depart_salary
0,Gerard Bondur,Accounting,11472,11472
1,Mary Patterson,Accounting,9998,11472
2,Jeff Firrelli,Accounting,8992,11472
3,William Patterson,Accounting,8870,11472
4,Diane Murphy,Accounting,8435,11472
5,Anthony Bow,Accounting,6627,11472
6,Leslie Jennings,IT,8113,8113
7,Leslie Thompson,IT,5186,8113
8,George Vanauf,Sales,10563,10563
9,Steve Patterson,Sales,9441,10563


#### 8.2. Pandas for First_Value (Range Between Unbounded Preceding and Unbounded Following)

In [195]:
(basic_pay_pdf.assign(first_dept_salary=basic_pay_pdf
                      .sort_values('salary',ascending=False)
                      .groupby('department')['salary']
                      .transform(lambda x:x.iloc[0]))
                      .assign(department_upper=lambda x:x['department'].str.upper())
                       .sort_values(["department_upper","salary","employee_name"],ascending=[True,False,True])
                      .drop(columns=['department_upper']))                    

Unnamed: 0,employee_name,department,salary,first_dept_salary
5,Gerard Bondur,Accounting,11472,11472
13,Mary Patterson,Accounting,9998,11472
7,Jeff Firrelli,Accounting,8992,11472
16,William Patterson,Accounting,8870,11472
2,Diane Murphy,Accounting,8435,11472
0,Anthony Bow,Accounting,6627,11472
10,Leslie Jennings,IT,8113,8113
11,Leslie Thompson,IT,5186,8113
4,George Vanauf,Sales,10563,10563
15,Steve Patterson,Sales,9441,10563


#### 8.3. Spark sql for First_Value

In [194]:
sql="""SELECT
  employee_name,
  department,
  salary,
  FIRST_VALUE(salary) OVER(PARTITION BY department ORDER BY salary DESC
  RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as 1st_depart_salary
  FROM basic_pays_sdf
  ORDER BY
  
  UPPER(department),
  salary DESC,
  employee_name
  """
spark.sql(sql).show()

+-----------------+----------+------+-----------------+
|    employee_name|department|salary|1st_depart_salary|
+-----------------+----------+------+-----------------+
|    Gerard Bondur|Accounting| 11472|            11472|
|   Mary Patterson|Accounting|  9998|            11472|
|    Jeff Firrelli|Accounting|  8992|            11472|
|William Patterson|Accounting|  8870|            11472|
|     Diane Murphy|Accounting|  8435|            11472|
|      Anthony Bow|Accounting|  6627|            11472|
|  Leslie Jennings|        IT|  8113|             8113|
|  Leslie Thompson|        IT|  5186|             8113|
|    George Vanauf|     Sales| 10563|            10563|
|  Steve Patterson|     Sales|  9441|            10563|
|   Julie Firrelli|     Sales|  9181|            10563|
|   Foon Yue Tseng|     Sales|  6660|            10563|
|       Larry Bott|       SCM| 11798|            11798|
|  Pamela Castillo|       SCM| 11303|            11798|
|      Barry Jones|       SCM| 10586|           

### 9 Last_Value
For each row in the basic_pays table, attach the last salary value of its department, based on descending order of salary.

#### 9.1. Standard sql for Last_Value

In [154]:
sql="""SELECT
  employee_name,
  department,
  salary,
  LAST_VALUE(salary) OVER(PARTITION BY department ORDER BY salary DESC
  RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_depart_salary
  FROM basic_pays
  ORDER BY
  
  UPPER(department),
  salary DESC,
  employee_name
  """
pd.read_sql_query(sql,my_connection)

Unnamed: 0,employee_name,department,salary,last_depart_salary
0,Gerard Bondur,Accounting,11472,6627
1,Mary Patterson,Accounting,9998,6627
2,Jeff Firrelli,Accounting,8992,6627
3,William Patterson,Accounting,8870,6627
4,Diane Murphy,Accounting,8435,6627
5,Anthony Bow,Accounting,6627,6627
6,Leslie Jennings,IT,8113,5186
7,Leslie Thompson,IT,5186,5186
8,George Vanauf,Sales,10563,6660
9,Steve Patterson,Sales,9441,6660


#### 9.2 Pandas for Last_Value (Range Between Unbounded Preceding and Unbounded Following)

In [193]:
(basic_pay_pdf.assign(last_dept_salary=basic_pay_pdf
                      .sort_values('salary',ascending=False)
                      .groupby('department')['salary']
                      .transform(lambda x:x.iloc[-1]))
                      .assign(department_upper=lambda x:x['department'].str.upper())
                       .sort_values(["department_upper","salary","employee_name"],ascending=[True,False,True])
                      .drop(columns=['department_upper']))

Unnamed: 0,employee_name,department,salary,last_dept_salary
5,Gerard Bondur,Accounting,11472,6627
13,Mary Patterson,Accounting,9998,6627
7,Jeff Firrelli,Accounting,8992,6627
16,William Patterson,Accounting,8870,6627
2,Diane Murphy,Accounting,8435,6627
0,Anthony Bow,Accounting,6627,6627
10,Leslie Jennings,IT,8113,5186
11,Leslie Thompson,IT,5186,5186
4,George Vanauf,Sales,10563,6660
15,Steve Patterson,Sales,9441,6660
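
For the whole-partition frame used in the First_Value and Last_Value sections, pandas also accepts the built-in aggregation names `"first"` and `"last"` in `transform`, which may read more directly than a lambda with `iloc`. The sketch below uses a hypothetical pay table. One caveat: `"first"` and `"last"` skip missing values, whereas `iloc[0]` and `iloc[-1]` do not, so the two approaches can differ when the column contains NaN.

```python
import pandas as pd

# Hypothetical pay table, made up for illustration.
pay = pd.DataFrame({
    "employee_name": ["Ann", "Bo", "Cy", "Di", "Ed"],
    "department": ["IT", "IT", "IT", "Sales", "Sales"],
    "salary": [5000, 7000, 6000, 8000, 9000],
})

# PARTITION BY department ORDER BY salary DESC, frame = the whole partition
by_dept = pay.sort_values("salary", ascending=False).groupby("department")["salary"]
result = pay.assign(
    first_dept_salary=by_dept.transform("first"),  # highest salary per department
    last_dept_salary=by_dept.transform("last"),    # lowest salary per department
)
print(result)
```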


### 10 Lead
Use the orders table to find the next order date for each row.

#### 10.1. Standard sql for Lead()

In [192]:
sql="""
SELECT 
    customerName,
    orderDate,
    LEAD(orderDate,1) OVER (
        PARTITION BY customerNumber
        ORDER BY orderDate ) nextOrderDate
FROM 
    orders
INNER JOIN customers USING (customerNumber)
ORDER BY
UPPER(customerName), orderDate
;
"""

pd.read_sql_query(sql,my_connection).head()

Unnamed: 0,customerName,orderDate,nextOrderDate
0,Alpha Cognac,2003-07-04,2003-11-08
1,Alpha Cognac,2003-11-08,2005-03-28
2,Alpha Cognac,2005-03-28,
3,Amica Models & Co.,2004-08-17,2004-09-09
4,Amica Models & Co.,2004-09-09,


#### 10.2. Pandas for Lead()

In [185]:
(orders_pdf.assign(nextOrderDate=orders_pdf
                   .sort_values('orderDate')
                   .groupby('customerNumber')['orderDate'].shift(-1))
                   .merge(customers_pdf,on=['customerNumber'])[["customerName","orderDate","nextOrderDate"]]
                   .assign(customerName_upper=lambda x:x['customerName'].str.upper())
                   .sort_values(["customerName_upper","orderDate"])
                   .drop(columns=["customerName_upper"])).head()

Unnamed: 0,customerName,orderDate,nextOrderDate
150,Alpha Cognac,2003-07-04,2003-11-08
151,Alpha Cognac,2003-11-08,2005-03-28
152,Alpha Cognac,2005-03-28,
319,Amica Models & Co.,2004-08-17,2004-09-09
320,Amica Models & Co.,2004-09-09,


#### 10.3. Spark sql for Lead()

In [191]:
sql="""
SELECT 
    customerName,
    orderDate,
    LEAD(orderDate,1) OVER (
        PARTITION BY customerNumber
        ORDER BY orderDate ) nextOrderDate
FROM 
    orders_sdf
INNER JOIN customers_sdf USING (customerNumber)
ORDER BY
UPPER(customerName), orderDate

"""
spark.sql(sql).show(5)

+------------------+----------+-------------+
|      customerName| orderDate|nextOrderDate|
+------------------+----------+-------------+
|      Alpha Cognac|2003-07-04|   2003-11-08|
|      Alpha Cognac|2003-11-08|   2005-03-28|
|      Alpha Cognac|2005-03-28|         null|
|Amica Models & Co.|2004-08-17|   2004-09-09|
|Amica Models & Co.|2004-09-09|         null|
+------------------+----------+-------------+
only showing top 5 rows



### 11 Lag
Use the orders table to find the previous order date for each row.

#### 11.1. Standard sql for Lag()

In [213]:
sql="""
SELECT 
    customerName,
    orderDate,
    LAG(orderDate,1) OVER (
        PARTITION BY customerNumber
        ORDER BY orderDate ) preOrderDate
FROM 
    orders
INNER JOIN customers USING (customerNumber)
ORDER BY
UPPER(customerName), orderDate
;
"""

pd.read_sql_query(sql,my_connection).head()

Unnamed: 0,customerName,orderDate,preOrderDate
0,Alpha Cognac,2003-07-04,
1,Alpha Cognac,2003-11-08,2003-07-04
2,Alpha Cognac,2005-03-28,2003-11-08
3,Amica Models & Co.,2004-08-17,
4,Amica Models & Co.,2004-09-09,2004-08-17


#### 11.2. Pandas for Lag()

In [216]:
(orders_pdf.assign(preOrderDate=orders_pdf
                   .sort_values('orderDate')
                   .groupby('customerNumber')['orderDate'].shift(1))
                   .merge(customers_pdf,on=['customerNumber'])[["customerName","orderDate","preOrderDate"]]
                   .assign(customerName_upper=lambda x:x['customerName'].str.upper())
                   .sort_values(["customerName_upper","orderDate"])
                   .drop(columns=["customerName_upper"])).head()

Unnamed: 0,customerName,orderDate,preOrderDate
150,Alpha Cognac,2003-07-04,
151,Alpha Cognac,2003-11-08,2003-07-04
152,Alpha Cognac,2005-03-28,2003-11-08
319,Amica Models & Co.,2004-08-17,
320,Amica Models & Co.,2004-09-09,2004-08-17
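
A typical use of lag and lead values is computing the gap between consecutive rows. The sketch below applies `shift(1)` and `shift(-1)` within each customer and derives the number of days since the previous order. The order dates are borrowed from the output above, while the customer numbers and the `days_since_prev` column are placeholders introduced for illustration.

```python
import pandas as pd

# Dates taken from the output above; customer numbers are placeholders.
orders = pd.DataFrame({
    "customerNumber": [1, 1, 1, 2, 2],
    "orderDate": pd.to_datetime(
        ["2003-07-04", "2003-11-08", "2005-03-28", "2004-08-17", "2004-09-09"]
    ),
})

# PARTITION BY customerNumber ORDER BY orderDate
ordered = orders.sort_values(["customerNumber", "orderDate"])
by_customer = ordered.groupby("customerNumber")["orderDate"]
result = ordered.assign(
    preOrderDate=by_customer.shift(1),    # LAG(orderDate, 1)
    nextOrderDate=by_customer.shift(-1),  # LEAD(orderDate, 1)
)
# The first order of each customer has no predecessor, so the gap is NaN there.
result["days_since_prev"] = (result["orderDate"] - result["preOrderDate"]).dt.days
print(result)
```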


#### 11.3. Spark sql for Lag()

In [219]:
sql="""
SELECT 
    customerName,
    orderDate,
    LAG(orderDate,1) OVER (
        PARTITION BY os.customerNumber
        ORDER BY orderDate ) preOrderDate
FROM 
    orders_sdf os
INNER JOIN customers_sdf cs ON (os.customerNumber=cs.customerNumber)
ORDER BY
UPPER(customerName), orderDate

"""
spark.sql(sql).show(5)

+------------------+----------+------------+
|      customerName| orderDate|preOrderDate|
+------------------+----------+------------+
|      Alpha Cognac|2003-07-04|        null|
|      Alpha Cognac|2003-11-08|  2003-07-04|
|      Alpha Cognac|2005-03-28|  2003-11-08|
|Amica Models & Co.|2004-08-17|        null|
|Amica Models & Co.|2004-09-09|  2004-08-17|
+------------------+----------+------------+
only showing top 5 rows



### Conclusions:
1. Commonly used window functions were applied to MySQL database tables, pandas dataframes, and Spark dataframes using standard SQL, pandas functions, and Spark SQL.
2. Most of the window functions can be implemented with all three techniques.
3. Spark SQL queries are very similar to MySQL queries and generate matching results, so Spark SQL provides a convenient way to manipulate data in Spark.
4. Pandas offers very flexible ways to build window functions, but the choice of window object matters: rolling objects define a fixed-size window ending at the current row, expanding objects define unbounded preceding to current row, and using groupby directly (then selecting the specific column) defines unbounded preceding to unbounded following. In addition, a good understanding of the apply, transform, and assign functions in pandas is necessary to combine the different use cases of window functions as demonstrated in this notebook.
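
The three window shapes described in point 4 can be compared side by side. The sketch below uses a small hypothetical dataframe and a sum aggregate. The SQL frame clause that each pandas expression is meant to correspond to is given in the comments.

```python
import pandas as pd

# Hypothetical data, made up for illustration.
df = pd.DataFrame({
    "department": ["IT", "IT", "IT", "IT", "Sales", "Sales"],
    "salary": [10, 20, 30, 40, 5, 15],
})

# PARTITION BY department ORDER BY salary
ordered = df.sort_values(["department", "salary"])
by_dept = ordered.groupby("department")["salary"]

frames = ordered.assign(
    # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    rolling_sum=by_dept.transform(lambda s: s.rolling(3, min_periods=1).sum()),
    # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    expanding_sum=by_dept.transform(lambda s: s.expanding().sum()),
    # ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    partition_sum=by_dept.transform("sum"),
)
print(frames)
```

Using `transform` rather than `apply` keeps the result indexed like the input rows, so `assign` can align it without extra index handling.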