In [1]:
import os
import sys
from pyspark.sql import SparkSession

os.environ["HADOOP_HOME"] = "C:\\Users\\SkJain\\Downloads\\Compressed\\winutils-master\\hadoop-3.2.2"
sys.path.append('C:\\Users\\SkJain\\Downloads\\Compressed\\winutils-master\\hadoop-3.2.2\\bin')

In [2]:
spark = SparkSession. \
    builder. \
    config("spark.ui.port", "0"). \
    enableHiveSupport(). \
    appName('SparkSql - Window Functions'). \
    master('local'). \
    getOrCreate()

## prepare database

In [4]:
spark.sql("SHOW DATABASEs").show(200, False)

+----------+
|namespace |
+----------+
|default   |
|nysedb    |
|siddhantdb|
+----------+



In [5]:
spark.sql("SELECT current_database()").show(200, False)

+------------------+
|current_database()|
+------------------+
|default           |
+------------------+



In [6]:
spark.sql("USE siddhantdb").show(200, False)

++
||
++
++



In [7]:
spark.sql("SELECT current_database()").show(200, False)

+------------------+
|current_database()|
+------------------+
|siddhantdb        |
+------------------+



In [8]:
spark.sql("SHOW TABLES").show(200, False)

+----------+------------------+-----------+
|database  |tableName         |isTemporary|
+----------+------------------+-----------+
|siddhantdb|orders            |false      |
|siddhantdb|orders_partitioned|false      |
+----------+------------------+-----------+



### Create employee table

In [9]:
create_employee_query = """CREATE TABLE employees (
    employee_id     int,
    first_name      varchar(20),
    last_name       varchar(25),
    email           varchar(25),
    phone_number    varchar(20),
    hire_date       date,
    job_id          varchar(10),
    salary          decimal(8,2),
    commission_pct  decimal(2,2),
    manager_id      int,
    department_id   int
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'"""

spark.sql(create_employee_query).show(200, False)

++
||
++
++



In [10]:
spark.sql("SHOW TABLES").show(200, False)

+----------+------------------+-----------+
|database  |tableName         |isTemporary|
+----------+------------------+-----------+
|siddhantdb|employees         |false      |
|siddhantdb|orders            |false      |
|siddhantdb|orders_partitioned|false      |
+----------+------------------+-----------+



In [15]:
employee_file_path = 'C:/Users/SkJain/Downloads/Compressed/data-master/hr_db/employees'
load_data_query = f"LOAD DATA LOCAL INPATH '{employee_file_path}' INTO TABLE employees"
spark.sql(load_data_query).show(200, False)

++
||
++
++



In [19]:
spark.sql("SELECT * FROM employees LIMIT 10").show(200, False)

+-----------+----------+---------+--------+------------+----------+----------+--------+--------------+----------+-------------+
|employee_id|first_name|last_name|email   |phone_number|hire_date |job_id    |salary  |commission_pct|manager_id|department_id|
+-----------+----------+---------+--------+------------+----------+----------+--------+--------------+----------+-------------+
|100        |Steven    |King     |SKING   |515.123.4567|1987-06-17|AD_PRES   |24000.00|null          |null      |90           |
|101        |Neena     |Kochhar  |NKOCHHAR|515.123.4568|1989-09-21|AD_VP     |17000.00|null          |100       |90           |
|102        |Lex       |De Haan  |LDEHAAN |515.123.4569|1993-01-13|AD_VP     |17000.00|null          |100       |90           |
|103        |Alexander |Hunold   |AHUNOLD |590.423.4567|1990-01-03|IT_PROG   |9000.00 |null          |102       |60           |
|104        |Bruce     |Ernst    |BERNST  |590.423.4568|1991-05-21|IT_PROG   |6000.00 |null          |10

In [20]:
spark.sql("SELECT count(*) FROM employees").show(200, False)

+--------+
|count(1)|
+--------+
|107     |
+--------+



### create daily revenue table

In [49]:
spark.sql("SHOW TABLES").show(200, False)

+----------+------------------+-----------+
|database  |tableName         |isTemporary|
+----------+------------------+-----------+
|siddhantdb|employees         |false      |
|siddhantdb|orders            |false      |
|siddhantdb|orders_partitioned|false      |
+----------+------------------+-----------+



In [51]:
create_order_item_query = """ CREATE TABLE IF NOT EXISTS order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_prod_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_prod_price FLOAT
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
"""

spark.sql(create_order_item_query)
orderItemsFilePath = 'datasets/order_items/*'
load_data_query = f"LOAD DATA LOCAL INPATH '{orderItemsFilePath}' INTO TABLE order_items"
spark.sql(load_data_query)

DataFrame[]

In [52]:
spark.sql("SHOW TABLES").show(200, False)

+----------+------------------+-----------+
|database  |tableName         |isTemporary|
+----------+------------------+-----------+
|siddhantdb|employees         |false      |
|siddhantdb|order_items       |false      |
|siddhantdb|orders            |false      |
|siddhantdb|orders_partitioned|false      |
+----------+------------------+-----------+



In [53]:
spark.sql("SELECT * FROM order_items LIMIT 10").show(200, False)

+-------------+-------------------+------------------+-------------------+-------------------+---------------------+
|order_item_id|order_item_order_id|order_item_prod_id|order_item_quantity|order_item_subtotal|order_item_prod_price|
+-------------+-------------------+------------------+-------------------+-------------------+---------------------+
|1            |1                  |957               |1                  |299.98             |299.98               |
|2            |2                  |1073              |1                  |199.99             |199.99               |
|3            |2                  |502               |5                  |250.0              |50.0                 |
|4            |2                  |403               |1                  |129.99             |129.99               |
|5            |4                  |897               |2                  |49.98              |24.99                |
|6            |4                  |365               |5         

In [54]:
### daily revenue and daily product revenue table

In [55]:
daily_revenue_query = """
CREATE TABLE daily_revenue
AS
    SELECT o.order_date,
           round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date
"""

spark.sql(daily_revenue_query)

DataFrame[]

In [59]:
daily_product_revenue = """
CREATE TABLE daily_product_revenue
    AS
    SELECT o.order_date,
           oi.order_item_prod_id,
           round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date, oi.order_item_prod_id
"""
spark.sql(daily_product_revenue)

DataFrame[]

In [60]:
spark.sql("SHOW TABLES").show(200, False)

+----------+---------------------+-----------+
|database  |tableName            |isTemporary|
+----------+---------------------+-----------+
|siddhantdb|daily_product_revenue|false      |
|siddhantdb|daily_revenue        |false      |
|siddhantdb|employees            |false      |
|siddhantdb|order_items          |false      |
|siddhantdb|orders               |false      |
|siddhantdb|orders_partitioned   |false      |
+----------+---------------------+-----------+



In [61]:
spark.sql("SELECT * FROM daily_revenue LIMIT 10").show(200, False)

+---------------------+--------+
|order_date           |revenue |
+---------------------+--------+
|2013-08-13 00:00:00.0|17956.88|
|2013-10-12 00:00:00.0|35698.85|
|2013-11-15 00:00:00.0|34443.22|
|2014-03-19 00:00:00.0|32967.69|
|2014-04-26 00:00:00.0|53644.32|
|2013-09-16 00:00:00.0|29117.35|
|2013-09-20 00:00:00.0|29575.36|
|2013-12-31 00:00:00.0|52308.99|
|2013-09-06 00:00:00.0|61976.1 |
|2014-06-15 00:00:00.0|26046.93|
+---------------------+--------+



In [62]:
spark.sql("SELECT * FROM daily_product_revenue LIMIT 10").show(200, False)

+---------------------+------------------+-------+
|order_date           |order_item_prod_id|revenue|
+---------------------+------------------+-------+
|2013-07-27 00:00:00.0|703               |39.98  |
|2013-07-29 00:00:00.0|793               |44.97  |
|2013-08-12 00:00:00.0|627               |3199.2 |
|2013-08-15 00:00:00.0|926               |15.99  |
|2013-09-04 00:00:00.0|957               |3599.76|
|2013-09-07 00:00:00.0|235               |104.97 |
|2013-09-17 00:00:00.0|792               |14.99  |
|2013-09-25 00:00:00.0|44                |239.96 |
|2013-09-27 00:00:00.0|276               |31.99  |
|2013-10-04 00:00:00.0|792               |44.97  |
+---------------------+------------------+-------+



## Windowing functions overview
- Three types:
    - Aggregate Functions (sum, min, max, avg)
    - Window Functions (lead, lag, first_value, last_value)
    - Ranking Functions (rank, dense_rank, row_number etc.)
- for all aggregations we use OVER clause
- with aggregate functions we typically use PARTITION BY
- with ranking and windowing functions we might use ORDER BY or a combination of both PARTITION BY and ORDER BY 

In [21]:
spark.sql("SELECT employee_id, department_id, salary from employees LIMIT 10").show(200, False)

+-----------+-------------+--------+
|employee_id|department_id|salary  |
+-----------+-------------+--------+
|100        |90           |24000.00|
|101        |90           |17000.00|
|102        |90           |17000.00|
|103        |60           |9000.00 |
|104        |60           |6000.00 |
|105        |60           |4800.00 |
|106        |60           |4800.00 |
|107        |60           |4200.00 |
|108        |100          |12000.00|
|109        |100          |9000.00 |
+-----------+-------------+--------+



### Windowing functions

In [22]:
spark.sql("""SELECT employee_id, department_id, salary,
count(1) OVER(PARTITION BY  department_id) as employees_in_dept,
rank() OVER(ORDER BY salary DESC) as salaryRank,
lead(employee_id) OVER(PARTITION BY department_id ORDER BY salary DESC) as lead_emp_id,
lag(employee_id) OVER(PARTITION BY department_id ORDER BY salary DESC) as lag_emp_id,
lead(salary) OVER(PARTITION BY department_id ORDER BY salary DESC) as lead_emp_sal
from employees LIMIT 10""").show(200, False)

+-----------+-------------+--------+-----------------+----------+-----------+----------+------------+
|employee_id|department_id|salary  |employees_in_dept|salaryRank|lead_emp_id|lag_emp_id|lead_emp_sal|
+-----------+-------------+--------+-----------------+----------+-----------+----------+------------+
|178        |null         |7000.00 |1                |45        |null       |null      |null        |
|200        |10           |4400.00 |1                |61        |null       |null      |null        |
|201        |20           |13000.00|2                |6         |202        |null      |6000.00     |
|202        |20           |6000.00 |2                |56        |null       |201       |null        |
|114        |30           |11000.00|6                |11        |115        |null      |3100.00     |
|115        |30           |3100.00 |6                |78        |116        |114       |2900.00     |
|116        |30           |2900.00 |6                |84        |117        |115  

### aggregate functions

In [23]:
spark.sql("SELECT employee_id, department_id, salary from employees ORDER BY department_id, salary  LIMIT 10").show(200, False)

+-----------+-------------+--------+
|employee_id|department_id|salary  |
+-----------+-------------+--------+
|178        |null         |7000.00 |
|200        |10           |4400.00 |
|202        |20           |6000.00 |
|201        |20           |13000.00|
|119        |30           |2500.00 |
|118        |30           |2600.00 |
|117        |30           |2800.00 |
|116        |30           |2900.00 |
|115        |30           |3100.00 |
|114        |30           |11000.00|
+-----------+-------------+--------+



In [25]:
# total salary per department
spark.sql("SELECT department_id, sum(salary) from employees GROUP BY department_id ORDER BY department_id LIMIT 10").show(200, False)

+-------------+-----------+
|department_id|sum(salary)|
+-------------+-----------+
|null         |7000.00    |
|10           |4400.00    |
|20           |19000.00   |
|30           |24900.00   |
|40           |6500.00    |
|50           |156400.00  |
|60           |28800.00   |
|70           |10000.00   |
|80           |304500.00  |
|90           |58000.00   |
+-------------+-----------+



In [35]:
#GET PERCENTAGE SALARY BY DEPARTMENT WITHOUT WINDOW FUNCTIONS
query = """ 
SELECT e1.department_id, e1.employee_id, round((salary/totalSalary)*100,2) as SalPercentage
from employees e1
LEFT OUTER JOIN 
(SELECT department_id, sum(salary) as totalSalary from employees GROUP BY department_id) e2
ON e1.department_id=e2.department_id
ORDER BY e1.department_id
LIMIT 10
"""

spark.sql(query).show(200, False)

+-------------+-----------+-------------+
|department_id|employee_id|SalPercentage|
+-------------+-----------+-------------+
|null         |178        |null         |
|10           |200        |100.00       |
|20           |201        |68.42        |
|20           |202        |31.58        |
|30           |114        |44.18        |
|30           |115        |12.45        |
|30           |116        |11.65        |
|30           |117        |11.24        |
|30           |118        |10.44        |
|30           |119        |10.04        |
+-------------+-----------+-------------+



In [47]:
query = """
SELECT department_id, employee_id,
round((salary / sum(salary) OVER(PARTITION BY department_id))*100,2) SalPercentage,
avg(salary) OVER(PARTITION BY department_id) dep_avg_sal,
min(salary) OVER(PARTITION BY department_id) dep_min_sal,
max(salary) OVER(PARTITION BY department_id) dep_max_sal,
count(salary) OVER(PARTITION BY department_id) dep_cnt_sal
FROM employees 
ORDER BY department_id
LIMIT 10
"""

spark.sql(query).show(200, False)

+-------------+-----------+-------------+-----------+-----------+-----------+-----------+
|department_id|employee_id|SalPercentage|dep_avg_sal|dep_min_sal|dep_max_sal|dep_cnt_sal|
+-------------+-----------+-------------+-----------+-----------+-----------+-----------+
|null         |178        |100.00       |7000.000000|7000.00    |7000.00    |1          |
|10           |200        |100.00       |4400.000000|4400.00    |4400.00    |1          |
|20           |202        |31.58        |9500.000000|6000.00    |13000.00   |2          |
|20           |201        |68.42        |9500.000000|6000.00    |13000.00   |2          |
|30           |114        |44.18        |4150.000000|2500.00    |11000.00   |6          |
|30           |115        |12.45        |4150.000000|2500.00    |11000.00   |6          |
|30           |116        |11.65        |4150.000000|2500.00    |11000.00   |6          |
|30           |117        |11.24        |4150.000000|2500.00    |11000.00   |6          |
|30       

### LEAD and LAG functions
- to get prior or following record respectively

In [63]:
query = """select dr.*,
lead(order_date) OVER(ORDER BY order_date desc) AS prior_date,
lead(revenue) OVER(ORDER BY order_date desc) AS prior_revenue
from daily_revenue as dr
order by order_date desc
limit 10 """

spark.sql(query).show(200, False)

+---------------------+--------+---------------------+-------------+
|order_date           |revenue |prior_date           |prior_revenue|
+---------------------+--------+---------------------+-------------+
|2014-07-24 00:00:00.0|50885.19|2014-07-23 00:00:00.0|38795.23     |
|2014-07-23 00:00:00.0|38795.23|2014-07-22 00:00:00.0|36717.24     |
|2014-07-22 00:00:00.0|36717.24|2014-07-21 00:00:00.0|51427.7      |
|2014-07-21 00:00:00.0|51427.7 |2014-07-20 00:00:00.0|60047.45     |
|2014-07-20 00:00:00.0|60047.45|2014-07-19 00:00:00.0|38420.99     |
|2014-07-19 00:00:00.0|38420.99|2014-07-18 00:00:00.0|43856.6      |
|2014-07-18 00:00:00.0|43856.6 |2014-07-17 00:00:00.0|36384.77     |
|2014-07-17 00:00:00.0|36384.77|2014-07-16 00:00:00.0|43011.92     |
|2014-07-16 00:00:00.0|43011.92|2014-07-15 00:00:00.0|53480.23     |
|2014-07-15 00:00:00.0|53480.23|2014-07-14 00:00:00.0|29937.52     |
+---------------------+--------+---------------------+-------------+



In [65]:
#last value will be null in case of lead and first value for lag 
#by default it leads ore lag 1 position (1 record next or 1 record prev)
# since here in over we have ordered in desc that means first date will have null value

query = """select dr.*,
lead(order_date) OVER(ORDER BY order_date desc) AS prior_date,
lead(revenue) OVER(ORDER BY order_date desc) AS prior_revenue
from daily_revenue as dr
order by order_date
limit 10 """

spark.sql(query).show(200, False)

+---------------------+--------+---------------------+-------------+
|order_date           |revenue |prior_date           |prior_revenue|
+---------------------+--------+---------------------+-------------+
|2013-07-25 00:00:00.0|31547.23|null                 |null         |
|2013-07-26 00:00:00.0|54713.23|2013-07-25 00:00:00.0|31547.23     |
|2013-07-27 00:00:00.0|48411.48|2013-07-26 00:00:00.0|54713.23     |
|2013-07-28 00:00:00.0|35672.03|2013-07-27 00:00:00.0|48411.48     |
|2013-07-29 00:00:00.0|54579.7 |2013-07-28 00:00:00.0|35672.03     |
|2013-07-30 00:00:00.0|49329.29|2013-07-29 00:00:00.0|54579.7      |
|2013-07-31 00:00:00.0|59212.49|2013-07-30 00:00:00.0|49329.29     |
|2013-08-01 00:00:00.0|49160.08|2013-07-31 00:00:00.0|59212.49     |
|2013-08-02 00:00:00.0|50688.58|2013-08-01 00:00:00.0|49160.08     |
|2013-08-03 00:00:00.0|43416.74|2013-08-02 00:00:00.0|50688.58     |
+---------------------+--------+---------------------+-------------+



In [66]:
# lead or lag by sevaral steps

query = """select dr.*,
lead(order_date, 7) OVER(ORDER BY order_date desc) AS prior_date,
lead(revenue, 7) OVER(ORDER BY order_date desc) AS prior_revenue
from daily_revenue as dr
order by order_date
limit 10 """

spark.sql(query).show(200, False)

+---------------------+--------+---------------------+-------------+
|order_date           |revenue |prior_date           |prior_revenue|
+---------------------+--------+---------------------+-------------+
|2013-07-25 00:00:00.0|31547.23|null                 |null         |
|2013-07-26 00:00:00.0|54713.23|null                 |null         |
|2013-07-27 00:00:00.0|48411.48|null                 |null         |
|2013-07-28 00:00:00.0|35672.03|null                 |null         |
|2013-07-29 00:00:00.0|54579.7 |null                 |null         |
|2013-07-30 00:00:00.0|49329.29|null                 |null         |
|2013-07-31 00:00:00.0|59212.49|null                 |null         |
|2013-08-01 00:00:00.0|49160.08|2013-07-25 00:00:00.0|31547.23     |
|2013-08-02 00:00:00.0|50688.58|2013-07-26 00:00:00.0|54713.23     |
|2013-08-03 00:00:00.0|43416.74|2013-07-27 00:00:00.0|48411.48     |
+---------------------+--------+---------------------+-------------+



In [67]:
# replace the null values which we get at start or end with something else (pass as third arg)

query = """select dr.*,
lead(order_date, 7, 'NA') OVER(ORDER BY order_date desc) AS prior_date,
lead(revenue, 7, 0) OVER(ORDER BY order_date desc) AS prior_revenue
from daily_revenue as dr
order by order_date
limit 10 """

spark.sql(query).show(200, False)

+---------------------+--------+---------------------+-------------+
|order_date           |revenue |prior_date           |prior_revenue|
+---------------------+--------+---------------------+-------------+
|2013-07-25 00:00:00.0|31547.23|NA                   |0.0          |
|2013-07-26 00:00:00.0|54713.23|NA                   |0.0          |
|2013-07-27 00:00:00.0|48411.48|NA                   |0.0          |
|2013-07-28 00:00:00.0|35672.03|NA                   |0.0          |
|2013-07-29 00:00:00.0|54579.7 |NA                   |0.0          |
|2013-07-30 00:00:00.0|49329.29|NA                   |0.0          |
|2013-07-31 00:00:00.0|59212.49|NA                   |0.0          |
|2013-08-01 00:00:00.0|49160.08|2013-07-25 00:00:00.0|31547.23     |
|2013-08-02 00:00:00.0|50688.58|2013-07-26 00:00:00.0|54713.23     |
|2013-08-03 00:00:00.0|43416.74|2013-07-27 00:00:00.0|48411.48     |
+---------------------+--------+---------------------+-------------+



### use lead lag with partition by

In [69]:
query = """select dr.*,
lead(order_item_prod_id) OVER(PARTITION BY order_date ORDER BY revenue desc) AS next_prod_id,
lead(revenue) OVER(PARTITION BY order_date ORDER BY revenue desc) AS next_revenue
from daily_product_revenue as dr
order by order_date, revenue desc
limit 10 """

spark.sql(query).show(200, False)

#last value in each partion will be null
#can be resolved the same way as we did on top, by passing 3 args

+---------------------+------------------+-------+------------+------------+
|order_date           |order_item_prod_id|revenue|next_prod_id|next_revenue|
+---------------------+------------------+-------+------------+------------+
|2013-07-25 00:00:00.0|1004              |5599.72|191         |5099.49     |
|2013-07-25 00:00:00.0|191               |5099.49|957         |4499.7      |
|2013-07-25 00:00:00.0|957               |4499.7 |365         |3359.44     |
|2013-07-25 00:00:00.0|365               |3359.44|1073        |2999.85     |
|2013-07-25 00:00:00.0|1073              |2999.85|1014        |2798.88     |
|2013-07-25 00:00:00.0|1014              |2798.88|403         |1949.85     |
|2013-07-25 00:00:00.0|403               |1949.85|502         |1650.0      |
|2013-07-25 00:00:00.0|502               |1650.0 |627         |1079.73     |
|2013-07-25 00:00:00.0|627               |1079.73|226         |599.99      |
|2013-07-25 00:00:00.0|226               |599.99 |24          |319.96      |

### getting first and last values

In [71]:
query = """select dr.*,
first_value(order_item_prod_id) OVER(PARTITION BY order_date ORDER BY revenue desc) AS first_prod_id,
first_value(revenue) OVER(PARTITION BY order_date ORDER BY revenue desc) AS first_revenue
from daily_product_revenue as dr
order by order_date, revenue desc
limit 100 """

spark.sql(query).show(200, False)

+---------------------+------------------+--------+-------------+-------------+
|order_date           |order_item_prod_id|revenue |first_prod_id|first_revenue|
+---------------------+------------------+--------+-------------+-------------+
|2013-07-25 00:00:00.0|1004              |5599.72 |1004         |5599.72      |
|2013-07-25 00:00:00.0|191               |5099.49 |1004         |5599.72      |
|2013-07-25 00:00:00.0|957               |4499.7  |1004         |5599.72      |
|2013-07-25 00:00:00.0|365               |3359.44 |1004         |5599.72      |
|2013-07-25 00:00:00.0|1073              |2999.85 |1004         |5599.72      |
|2013-07-25 00:00:00.0|1014              |2798.88 |1004         |5599.72      |
|2013-07-25 00:00:00.0|403               |1949.85 |1004         |5599.72      |
|2013-07-25 00:00:00.0|502               |1650.0  |1004         |5599.72      |
|2013-07-25 00:00:00.0|627               |1079.73 |1004         |5599.72      |
|2013-07-25 00:00:00.0|226              

#### last_value is little different
- with query we need to mention 'ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING'
- because by defautt it uses 'ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW'
- other it will return the record itself
- since at the time this record is accessed it is the last record