# Windowing Functions

In this section, we will primarily talk about Windowing Functions, which are also known as Analytic Functions in databases such as Oracle.

In [1]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/wkYC9crqHH8?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* Prepare HR Database
* Overview of Windowing Functions
* Aggregations using Windowing Functions
* Getting LEAD and LAG values
* Getting first and last values
* Ranking using Windowing Functions
* Understanding order of execution of SQL
* Overview of Nested Sub Queries
* Filtering - Window Function Results

In [None]:
import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate

In [None]:
%%sql

SET spark.sql.shuffle.partitions=2

## Prepare HR Database

Let us prepare the HR database with the **EMPLOYEES** table. We will use it for some of the examples as well as for the exercises related to Window Functions.

In [2]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/ooxBTw_UU3U?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* Create Database **itversity_hr** (replace itversity with your OS User Name)
* Create table **employees** in **itversity_hr** database.
* Load data into the table.

Let us start by creating the database.

In [None]:
%%sql

DROP DATABASE IF EXISTS itversity_hr CASCADE

In [None]:
%%sql

CREATE DATABASE itversity_hr

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT current_database()

Now that the database is created, let us go ahead and add a table to it.

In [None]:
%%sql

CREATE TABLE employees (
  employee_id     int,
  first_name      varchar(20),
  last_name       varchar(25),
  email           varchar(25),
  phone_number    varchar(20),
  hire_date       date,
  job_id          varchar(10),
  salary          decimal(8,2),
  commission_pct  decimal(2,2),
  manager_id      int,
  department_id   int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

Let us load the data and validate the table.

In [None]:
%%sql

LOAD DATA LOCAL INPATH '/data/hr_db/employees' 
INTO TABLE employees

In [None]:
%%sql

SELECT * FROM employees LIMIT 10

In [None]:
%%sql

SELECT employee_id, department_id, salary FROM employees LIMIT 10

In [None]:
%%sql

SELECT count(1) FROM employees

## Overview of Windowing Functions

Let us get an overview of Analytics or Windowing Functions in Spark SQL.

In [3]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/psc34WIg3ew?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* Aggregate Functions (`sum`, `min`, `max`, `avg`)
* Window Functions (`lead`, `lag`, `first_value`, `last_value`)
* Rank Functions (`rank`, `dense_rank`, `row_number`, etc.)
* For all of these functions, we use the `OVER` clause.
* For aggregate functions, we typically use `PARTITION BY`.
* For global ranking and windowing, we can use `ORDER BY sorting_column`; for ranking and windowing within a partition or group, we can use `PARTITION BY partition_column ORDER BY sorting_column`.
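To make the difference between `GROUP BY` and an aggregate with `OVER (PARTITION BY ...)` concrete, here is a minimal sketch using plain Scala collections, without Spark. The `Emp` case class and the function names are hypothetical, `department_id` is modeled as a non-null `Int` for simplicity, and the code only mimics what `count(1) OVER (PARTITION BY department_id)` returns, not how Spark evaluates it.

```scala
object WindowOverview {
  // Hypothetical row type with the three columns used in the query above
  final case class Emp(employeeId: Int, departmentId: Int, salary: BigDecimal)

  // GROUP BY department_id: one output entry per department
  def countByDept(emps: Seq[Emp]): Map[Int, Int] =
    emps.groupBy(_.departmentId).map { case (dept, rows) => dept -> rows.size }

  // count(1) OVER (PARTITION BY department_id): one output row per employee,
  // each carrying the size of its department's partition
  def countOverDept(emps: Seq[Emp]): Seq[(Emp, Int)] = {
    val counts = countByDept(emps)
    emps.map(e => (e, counts(e.departmentId)))
  }
}
```

`countByDept` collapses the rows to one entry per department, while `countOverDept` returns one row per employee, which is why window functions can sit next to ordinary columns such as `salary` in the same **SELECT**.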

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT employee_id, department_id, salary FROM employees LIMIT 10

In [None]:
%%sql

SELECT employee_id, department_id, salary,
    count(1) OVER (PARTITION BY department_id) AS employee_count,
    rank() OVER (ORDER BY salary DESC) AS rnk,
    lead(employee_id) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_id,
    lead(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_sal
FROM employees
ORDER BY employee_id

## Aggregations using Windowing Functions

Let us see how we can perform aggregations within a partition or group using Windowing/Analytic Functions.

In [4]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/peDMzBredoU?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* For simple aggregations, where we only need the grouping key and the aggregated results, we can use **GROUP BY**.
* If we want the raw data along with the aggregated results, **GROUP BY** alone is not enough: we have to join the aggregated results back to the raw data, which makes the query more complicated.
* Using aggregate functions with the **OVER** clause not only simplifies the query, but typically also performs better, as it avoids that extra join.
* Let us take the example of getting each employee's salary as a percentage of the department salary expense.
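The percentage calculation itself can be sketched with plain Scala collections (no Spark) as below. It is an illustration under assumptions: the `Emp` case class is hypothetical, `department_id` is a non-null `Int`, and rounding to 2 decimals with `HALF_UP` is our own choice. In Spark SQL, the department total corresponds to `sum(salary) OVER (PARTITION BY department_id)`.

```scala
object SalaryShare {
  // Hypothetical row type; department_id is kept as a plain Int for simplicity
  final case class Emp(employeeId: Int, departmentId: Int, salary: BigDecimal)

  // Similar in spirit to:
  //   salary / sum(salary) OVER (PARTITION BY department_id) * 100
  // Returns (employee_id, department_id, salary, department_salary_expense, pct)
  def salaryShare(emps: Seq[Emp]): Seq[(Int, Int, BigDecimal, BigDecimal, BigDecimal)] = {
    // Pass 1: total salary per department (what GROUP BY would give us)
    val deptTotals: Map[Int, BigDecimal] =
      emps.groupBy(_.departmentId).map { case (d, rows) => d -> rows.map(_.salary).sum }
    // Pass 2: attach the department total to every row, keeping all rows
    emps.map { e =>
      val total = deptTotals(e.departmentId)
      val pct = (e.salary / total * BigDecimal(100)).setScale(2, BigDecimal.RoundingMode.HALF_UP)
      (e.employeeId, e.departmentId, e.salary, total, pct)
    }
  }
}
```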

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary
LIMIT 10

> Let us write the query using `GROUP BY` approach.

In [None]:
%%sql

SELECT department_id,
       sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            avg(salary) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY department_id, salary

> Let us see how we can get it using Analytics/Windowing Functions. 

* We can use all standard aggregate functions such as `count`, `sum`, `min`, `max`, `avg` etc.

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) 
         OVER (PARTITION BY e.department_id)
         AS department_salary_expense
FROM employees e
ORDER BY e.department_id

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id

### Create tables to get daily revenue

Let us create a couple of tables which will be used for the demonstrations of Windowing and Ranking functions.

* We have **ORDERS** and **ORDER_ITEMS** tables.
* Let us take care of computing daily revenue as well as daily product revenue.
* As we will be using the same data set several times, let us create tables with the pre-computed data.
* **daily_revenue** will have **order_date** and **revenue**, where data is aggregated using **order_date** as the grouping key.
* **daily_product_revenue** will have **order_date**, **order_item_product_id** and **revenue**. In this case, data is aggregated using **order_date** and **order_item_product_id** as grouping keys.

Let us create table to compute daily revenue.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DROP TABLE IF EXISTS daily_revenue

In [None]:
%%sql

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date

In [None]:
%%sql

SELECT * 
FROM daily_revenue
ORDER BY order_date
LIMIT 10

Let us create table to compute daily product revenue.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DROP TABLE IF EXISTS daily_product_revenue

In [None]:
%%sql

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id

In [None]:
%%sql

SELECT * 
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 10

## Cumulative or Moving Aggregations

Let us understand how we can take care of cumulative or moving aggregations using Spark SQL.
* With Windowing or Analytic Functions, we can also specify the window frame using the `ROWS BETWEEN` clause.
* We can leverage `ROWS BETWEEN` for cumulative aggregations or moving aggregations.
* Here is an example of cumulative sum.
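The running total computed by `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` can be sketched with `scanLeft` on plain Scala collections. This is a hypothetical illustration, not Spark code: `DailyRevenue` is an assumed case class, and `order_date` is assumed to be a string starting with `yyyy-MM-dd`, so its first 7 characters play the role of `date_format(order_date, 'yyyy-MM')`.

```scala
object CumulativeSum {
  // Hypothetical row type mirroring daily_revenue
  final case class DailyRevenue(orderDate: String, revenue: BigDecimal)

  // Mimics:
  //   sum(revenue) OVER (
  //     PARTITION BY date_format(order_date, 'yyyy-MM')
  //     ORDER BY order_date
  //     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
  def cumulativeByMonth(rows: Seq[DailyRevenue]): Seq[(DailyRevenue, BigDecimal)] =
    rows
      .groupBy(_.orderDate.take(7)) // partition key: "yyyy-MM"
      .toSeq
      .sortBy(_._1)                 // emit months in order for readable output
      .flatMap { case (_, monthRows) =>
        val sorted = monthRows.sortBy(_.orderDate) // ORDER BY order_date
        // running total: each element sums the first row up to the current row
        val running = sorted.map(_.revenue).scanLeft(BigDecimal(0))(_ + _).tail
        sorted.zip(running)
      }
}
```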

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (
        PARTITION BY e.department_id
        ORDER BY e.salary DESC
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS sum_sal_expense
FROM employees e
ORDER BY e.department_id, e.salary DESC

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
    round(sum(t.revenue) OVER (
        PARTITION BY date_format(order_date, 'yyyy-MM')
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ), 2) AS cumulative_daily_revenue
FROM daily_revenue t
ORDER BY date_format(order_date, 'yyyy-MM'),
    order_date

In [None]:
spark.sql("""
SELECT t.*,
    round(sum(t.revenue) OVER (
        PARTITION BY date_format(order_date, 'yyyy-MM')
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ), 2) AS cumulative_daily_revenue
FROM daily_revenue t
ORDER BY date_format(order_date, 'yyyy-MM'), 
    order_date
""").
    show(100, false)

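* Here is an example of a moving sum. Note that `ROWS BETWEEN 3 PRECEDING AND CURRENT ROW` covers up to four rows (the current row plus the three rows before it), and it counts rows rather than calendar days; use `2 PRECEDING` for a window of three rows.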
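A moving sum can be sketched the same way. In this hypothetical helper (plain Scala, not Spark), `n` plays the role of `n PRECEDING`, the input is assumed to be already sorted by `order_date`, and the frame is counted in rows, so `n = 3` sums up to four rows.

```scala
object MovingSum {
  // Mimics: sum(revenue) OVER (ORDER BY order_date ROWS BETWEEN n PRECEDING AND CURRENT ROW)
  // `revenues` must already be sorted by order_date; the frame counts rows, not calendar days.
  def movingSum(revenues: Seq[BigDecimal], n: Int): Seq[BigDecimal] =
    revenues.indices.map { i =>
      // frame = current row plus up to n rows before it (fewer at the start)
      revenues.slice(math.max(0, i - n), i + 1).sum
    }
}
```

For inputs 1 to 6 with `n = 3`, the first three results are partial sums (1, 3, 6) because fewer than three preceding rows exist yet.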

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
    round(sum(t.revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    ), 2) AS moving_3day_revenue
FROM daily_revenue t
ORDER BY order_date

In [None]:
spark.sql("""
    SELECT t.*,
        round(sum(t.revenue) OVER (
            ORDER BY order_date
            ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
        ), 2) AS moving_3day_revenue
    FROM daily_revenue t
    ORDER BY order_date
""").
    show(30, false)

In [None]:
%%sql

SELECT t.*,
    round(sum(t.revenue) OVER (
        PARTITION BY date_format(order_date, 'yyyy-MM')
        ORDER BY order_date
        ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    ), 2) AS moving_3day_revenue
FROM daily_revenue t
ORDER BY date_format(order_date, 'yyyy-MM'),
    order_date

In [None]:
spark.sql("""
SELECT t.*,
    round(sum(t.revenue) OVER (
        PARTITION BY date_format(order_date, 'yyyy-MM')
        ORDER BY order_date
        ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    ), 2) AS moving_3day_revenue
FROM daily_revenue t
ORDER BY date_format(order_date, 'yyyy-MM'), 
    order_date
""").
    show(100, false)

## Using LEAD or LAG

Let us understand LEAD and LAG functions to get column values from following or prior records.

In [5]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/BhWIslXAebo?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

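Here is an example where we get prior or following records based on **ORDER BY** within the **OVER** clause. Note that `lead` over `ORDER BY order_date DESC` returns the same values as `lag` over `ORDER BY order_date`, as the query using both `lead` and `lag` shows.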

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT * FROM daily_revenue
ORDER BY order_date DESC
LIMIT 10

In [None]:
%%sql

SELECT t.*,
  lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue,
  lag(order_date) OVER (ORDER BY order_date) AS lag_prior_date,
  lag(revenue) OVER (ORDER BY order_date) AS lag_prior_revenue
FROM daily_revenue AS t
ORDER BY order_date DESC
LIMIT 10

In [None]:
%%sql

SELECT t.*,
  lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue AS t
ORDER BY order_date
LIMIT 10

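We can also pass the offset (number of rows) and a default value as arguments. The default value is returned when there is no row at the given offset (for example, near the end of the window); it does not replace null values that exist in the data.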
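The following plain Scala sketch mimics `lead` and `lag` with an offset and a default, using `Option` to stand in for SQL `NULL`. The helper names are hypothetical and the input is assumed to be already sorted in window order. As documented for Spark SQL, the default is returned only when there is no row at the given offset; a `NULL` found at that offset is returned as `NULL`.

```scala
object LeadLag {
  // Mimics lead(col, offset, default) over rows already sorted in window order
  def lead[A](values: IndexedSeq[Option[A]], offset: Int, default: Option[A]): IndexedSeq[Option[A]] =
    values.indices.map { i =>
      val j = i + offset
      // a None stored at position j is returned as-is; default only when j is out of range
      if (j < values.length) values(j) else default
    }

  // Mimics lag(col, offset, default): same idea, looking backwards
  def lag[A](values: IndexedSeq[Option[A]], offset: Int, default: Option[A]): IndexedSeq[Option[A]] =
    values.indices.map { i =>
      val j = i - offset
      if (j >= 0) values(j) else default
    }
}
```

Reversing the input, calling `lead` and reversing the result gives the same values as `lag`, which is the same reason `lead` over a descending order behaves like `lag` over an ascending one.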

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10

In [None]:
%%sql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10

In [None]:
%%sql

SELECT t.*,
  lead(order_date, 7, 'NA') OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7, 0) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10

Let us see how we can get prior or following records within a group based on a particular order.

Here is an example where we get prior or following records based on **PARTITION BY** and then **ORDER BY** within the **OVER** clause.
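Within a partition, `LEAD` never looks across partition boundaries, so the last product of each day gets `NULL`. Here is a hypothetical plain Scala sketch of `LEAD(order_item_product_id) OVER (PARTITION BY order_date ORDER BY revenue DESC)`; the `ProductRevenue` case class is an assumption, and tied revenues keep their input order here, whereas Spark gives no such guarantee.

```scala
object PartitionedLead {
  // Hypothetical row type mirroring daily_product_revenue
  final case class ProductRevenue(orderDate: String, productId: Int, revenue: BigDecimal)

  // Output is ordered by order_date, revenue DESC; the last row of each date gets None
  def nextProductId(rows: Seq[ProductRevenue]): Seq[(ProductRevenue, Option[Int])] =
    rows
      .groupBy(_.orderDate) // PARTITION BY order_date
      .toSeq
      .sortBy(_._1)
      .flatMap { case (_, dayRows) =>
        val sorted = dayRows.sortBy(_.revenue)(Ordering[BigDecimal].reverse) // ORDER BY revenue DESC
        // shift product ids up by one row within the partition; nothing follows the last row
        val next = sorted.drop(1).map(r => Option(r.productId)) :+ None
        sorted.zip(next)
      }
}
```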

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DESCRIBE daily_product_revenue

In [None]:
%%sql

SELECT * FROM daily_product_revenue 
ORDER BY order_date, revenue DESC
LIMIT 10

In [None]:
%%sql

SELECT t.*,
  LEAD(order_item_product_id) OVER (
    PARTITION BY order_date 
    ORDER BY revenue DESC
  ) next_product_id,
  LEAD(revenue) OVER (
    PARTITION BY order_date 
    ORDER BY revenue DESC
  ) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

In [None]:
spark.sql("""
    SELECT t.*,
      LEAD(order_item_product_id) OVER (
        PARTITION BY order_date 
        ORDER BY revenue DESC
      ) next_product_id,
      LEAD(revenue) OVER (
        PARTITION BY order_date 
        ORDER BY revenue DESC
      ) next_revenue
    FROM daily_product_revenue t
    ORDER BY order_date, revenue DESC
""").
    show(100, false)

As before, we can also pass the offset and a default value; here `LEAD(revenue, 1, 0)` returns 0 instead of null for the last product of each day.

In [None]:
%%sql

SELECT t.*,
  LEAD(order_item_product_id) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) next_product_id,
  LEAD(revenue, 1, 0) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

In [None]:
spark.sql("""
    SELECT t.*,
      LEAD(order_item_product_id) OVER (
        PARTITION BY order_date ORDER BY revenue DESC
      ) next_product_id,
      LEAD(revenue, 1, 0) OVER (
        PARTITION BY order_date ORDER BY revenue DESC
      ) next_revenue
    FROM daily_product_revenue t
    ORDER BY order_date, revenue DESC
    LIMIT 100
""").
    show(100, false)

## Getting first and last values

Let us see how we can get the first and last values based on a given order.

In [6]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/0kJ-ZdOJnKQ?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

Here is the example of using first_value.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
  first_value(order_item_product_id) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) first_product_id,
  first_value(revenue) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) first_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

In [None]:
spark.sql("""
    SELECT t.*,
      first_value(order_item_product_id) OVER (
        PARTITION BY order_date ORDER BY revenue DESC
      ) first_product_id,
      first_value(revenue) OVER (
        PARTITION BY order_date ORDER BY revenue DESC
      ) first_revenue
    FROM daily_product_revenue t
    ORDER BY order_date, revenue DESC
""").
    show(100, false)

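Let us see an example with last_value. While using last_value, we need to specify a frame such as **ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING**.
* When **ORDER BY** is specified without a frame, the default frame is `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
* Within a frame that ends at the current row, the last value is the current record (with `RANGE`, the last of the records that tie with the current record).
* To get the last value of the whole partition, we have to change the frame to `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING` (or `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`).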
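To see why the frame matters, here is a plain Scala sketch (no Spark) that evaluates `first_value` and `last_value` of `product_id` over a single partition ordered by `revenue`, under different frames. The `Row` type and the function names are hypothetical, and the partition is assumed to be already sorted by revenue. With ties, which tied row Spark treats as the last peer depends on the physical order of the rows, so this sketch simply takes the last one in the input.

```scala
object FirstLastValue {
  // One partition (e.g. one order_date) of (product_id, revenue) pairs,
  // already sorted as in ORDER BY revenue (ascending)
  type Row = (Int, BigDecimal)

  // first_value(product_id): the frame always starts at the first row of the partition
  def firstValue(rows: IndexedSeq[Row]): IndexedSeq[Int] =
    rows.map(_ => rows.head._1)

  // last_value(product_id) ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
  // the frame ends at the current row, so the "last" value is the row's own value
  def lastValueRowsToCurrent(rows: IndexedSeq[Row]): IndexedSeq[Int] =
    rows.map(_._1)

  // last_value(product_id) with ORDER BY but no frame clause, i.e. the default
  // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: the frame ends at the last
  // peer (the last row with the same revenue), so ties change the result
  def lastValueDefaultFrame(rows: IndexedSeq[Row]): IndexedSeq[Int] =
    rows.map { case (_, revenue) => rows(rows.lastIndexWhere(_._2 == revenue))._1 }

  // last_value(product_id) ... ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING:
  // the frame ends at the last row of the partition
  def lastValueToEnd(rows: IndexedSeq[Row]): IndexedSeq[Int] =
    rows.map(_ => rows.last._1)
}
```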

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
    last_value(order_item_product_id) OVER (
        PARTITION BY order_date ORDER BY revenue
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) last_product_id,
    last_value(revenue) OVER (
        PARTITION BY order_date ORDER BY revenue
    ) last_revenue
FROM daily_product_revenue AS t
ORDER BY order_date, revenue DESC
LIMIT 100

In [None]:
%%sql

SELECT t.*,
    last_value(order_item_product_id) OVER (
        PARTITION BY order_date ORDER BY revenue
        ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
    ) last_product_id,
    last_value(revenue) OVER (
        PARTITION BY order_date ORDER BY revenue
        ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
    ) last_revenue
FROM daily_product_revenue AS t
ORDER BY order_date, revenue DESC
LIMIT 100

In [None]:
spark.sql("""
    SELECT t.*,
      last_value(order_item_product_id) OVER (
        PARTITION BY order_date ORDER BY revenue
        ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
      ) last_product_id,
      last_value(revenue) OVER (
        PARTITION BY order_date ORDER BY revenue
        ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
      )  last_revenue
    FROM daily_product_revenue AS t
    ORDER BY order_date, revenue DESC
""").
    show(100, false)

## Ranking using Windowing Functions

Let us see how we can assign ranks using different **rank** functions.

In [7]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/NsqHuZbUrAw?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* If we have to assign ranks globally, we just need to specify **ORDER BY**.
* If we have to assign ranks within a key, then we need to specify **PARTITION BY** and then **ORDER BY**.
* By default, **ORDER BY** sorts the data in ascending order. We can change the order by passing **DESC** after the sorting column.
* We have 3 main functions to assign ranks: `rank`, `dense_rank` and `row_number`. We will see the difference between the 3 in a moment.

Here is an example to assign sparse ranks within each day based on revenue, using the table daily_product_revenue. We use the `rank` function to assign sparse ranks.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT t.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

```{note}
Here is another example to assign ranks within each department using the employees data set. We can also use other functions such as `dense_rank` and `row_number` to assign ranks.
```

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT
  employee_id,
  department_id,
  salary,
  rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rnk,
  dense_rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) drnk,
  row_number() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rn
FROM employees
ORDER BY department_id, salary DESC

In [None]:
spark.sql("""
SELECT
  employee_id,
  department_id,
  salary,
  rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rnk,
  dense_rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) drnk,
  row_number() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC, employee_id
  ) rn
FROM employees
ORDER BY department_id, salary DESC
""").
    show(100, false)

In [None]:
%%sql

SELECT * FROM employees ORDER BY salary LIMIT 10

```{note}
Here is an example of a global rank without the `PARTITION BY` clause.
```

In [None]:
%%sql

SELECT employee_id, salary,
    dense_rank() OVER (ORDER BY salary DESC) AS drnk
FROM employees

Let us understand the difference between **rank**, **dense_rank** and **row_number**.

* We can use any of these functions to generate ranks when there are no duplicates in the column based on which ranks are assigned.
* When that column has duplicates, `row_number` should not be used for ranking, as it still generates a unique number for each record within the partition. For duplicate values, the row numbers need not be the same across multiple runs unless a tie-breaker column is added to **ORDER BY**.
* **rank** skips ranks after ties (for example, 1, 2, 2, 4), while **dense_rank** does not skip any ranks (1, 2, 2, 3), regardless of how many times a value is repeated.
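The difference between the three functions can be reproduced with a small plain Scala sketch (no Spark). The `ranks` helper is hypothetical: it assumes the values are already sorted in window order (for example, salaries in descending order) and treats equal values as ties.

```scala
object RankFunctions {
  // For values already sorted in window order (e.g. ORDER BY salary DESC), returns
  // (value, rank, dense_rank, row_number) for each row. Ties are detected with ==.
  def ranks[A](sorted: Seq[A]): Seq[(A, Int, Int, Int)] =
    sorted.zipWithIndex.foldLeft(Vector.empty[(A, Int, Int, Int)]) {
      case (acc, (value, i)) =>
        val rowNumber = i + 1 // row_number: always unique, even for ties
        val (rank, denseRank) = acc.lastOption match {
          // tie with the previous row: both rank and dense_rank repeat
          case Some((prev, prevRank, prevDense, _)) if prev == value => (prevRank, prevDense)
          // new value: rank jumps to the row position (leaving gaps after ties),
          // dense_rank moves to the next integer (no gaps)
          case Some((_, _, prevDense, _)) => (rowNumber, prevDense + 1)
          // first row of the partition
          case None => (1, 1)
        }
        acc :+ ((value, rank, denseRank, rowNumber))
    }
}
```

For salaries 24000, 17000, 17000 and 14000, `rank` gives 1, 2, 2, 4, `dense_rank` gives 1, 2, 2, 3 and `row_number` gives 1, 2, 3, 4.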

## Order of execution of SQL

Let us review the order of execution of SQL. First let us review the order of writing the query.

In [8]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/UNCJNFMyr6c?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

1. **SELECT**
2. **FROM**
3. **JOIN** or **OUTER JOIN** with **ON**
4. **WHERE**
5. **GROUP BY** and optionally **HAVING**
6. **ORDER BY**

Let us come up with a query which computes daily revenue using COMPLETE or CLOSED orders and sorts the results by order_date.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT o.order_date,
  round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
ORDER BY o.order_date
LIMIT 10

In [None]:
%%sql

SELECT o.order_date,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
    HAVING round(sum(oi.order_item_subtotal), 2) >= 50000
ORDER BY order_date
LIMIT 10

However, the logical order of execution is typically as follows.

1. **FROM**
2. **JOIN** or **OUTER JOIN** with **ON**
3. **WHERE**
4. **GROUP BY** and optionally **HAVING**
5. **SELECT**
6. **ORDER BY**

Because **SELECT** is executed after every other clause except **ORDER BY**, most traditional databases only allow aliases defined in the **SELECT** clause to be referenced in **ORDER BY**. Spark SQL additionally allows those aliases to be used in **HAVING**, but not in **WHERE**.
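The logical order of execution can also be mirrored with plain Scala collection operations, as in the hypothetical sketch below (no Spark involved). The `Order` and `OrderItem` case classes keep only the columns used in the daily revenue query, and rounding uses `HALF_UP` to match Spark's `round`. Note that `revenue` only exists after the grouping step, which is why a filter on it cannot be placed in the `WHERE` step.

```scala
object ExecutionOrder {
  // Hypothetical, simplified row types for the orders and order_items tables
  final case class Order(orderId: Int, orderDate: String, orderStatus: String)
  final case class OrderItem(orderItemOrderId: Int, orderItemSubtotal: BigDecimal)

  private val wantedStatuses = Set("COMPLETE", "CLOSED")

  def dailyRevenue(orders: Seq[Order], items: Seq[OrderItem], minRevenue: BigDecimal): Seq[(String, BigDecimal)] = {
    val byOrderId = orders.map(o => o.orderId -> o).toMap
    // 1-2. FROM orders JOIN order_items ON o.order_id = oi.order_item_order_id (inner join)
    val joined = items.flatMap(oi => byOrderId.get(oi.orderItemOrderId).map(o => (o, oi)))
    // 3. WHERE o.order_status IN ('COMPLETE', 'CLOSED'); revenue does not exist yet
    val filtered = joined.filter { case (o, _) => wantedStatuses.contains(o.orderStatus) }
    // 4. GROUP BY o.order_date, computing round(sum(oi.order_item_subtotal), 2)
    val grouped = filtered.groupBy(_._1.orderDate).map { case (date, rows) =>
      date -> rows.map(_._2.orderItemSubtotal).sum.setScale(2, BigDecimal.RoundingMode.HALF_UP)
    }
    // 4. HAVING revenue >= minRevenue, applied to the aggregated rows
    val having = grouped.filter { case (_, revenue) => revenue >= minRevenue }
    // 5. SELECT order_date, revenue, then 6. ORDER BY order_date
    having.toSeq.sortBy(_._1)
  }
}
```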

```{error}
This will fail, as `revenue` is an alias defined in **SELECT** and cannot be used in **WHERE**.
```

In [None]:
%%sql

SELECT o.order_date,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND revenue >= 50000
GROUP BY o.order_date
ORDER BY order_date
LIMIT 10

```{note}
This will work, as `revenue`, an alias defined in **SELECT**, can be used in **HAVING** as well as **ORDER BY** in Spark SQL.
```

In [None]:
%%sql

SELECT o.order_date,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
    HAVING revenue >= 50000
ORDER BY order_date,
    revenue DESC
LIMIT 10

## Overview of Sub Queries

Let us recap Sub Queries.

In [9]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/6AOlttTRG48?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

* We typically use Sub Queries in the **FROM** clause.
* In Spark SQL, we need not provide an alias for Sub Queries in the **FROM** clause. In earlier versions, you might have to provide an alias for the Sub Query.
* We use Sub Queries quite often on top of queries using Analytic/Windowing Functions, for example to filter on their results.

In [None]:
%%sql

SELECT * FROM (SELECT current_date)

In [None]:
%%sql

SELECT * FROM (SELECT current_date) AS q

Let us see a few more examples of Sub Queries.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT * FROM (
    SELECT order_date, count(1) AS order_count
    FROM orders
    GROUP BY order_date
) q
LIMIT 10

```{note}
Here is an example of how we can filter based on derived columns using a sub query. However, this can also be achieved with a direct query using `HAVING`.
```

In [None]:
%%sql

SELECT * FROM (
    SELECT order_date, count(1) AS order_count
    FROM orders
    GROUP BY order_date
) q
WHERE q.order_count > 10

In [None]:
%%sql

SELECT order_date, count(1) AS order_count
FROM orders
GROUP BY order_date
    HAVING count(1) > 10

## Filtering - Window Function Results

Let us understand how to filter on top of results of Window Functions.

In [10]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/8ncS4CCdVA0?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

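* We can use **Window Functions** in the **SELECT** clause, but not in **WHERE** or **HAVING**, because those clauses are evaluated before the window functions are computed.
* If we have to filter based on Window Function results, then we need to use Sub Queries.
* Once the query with window functions is defined as a sub query, we can apply the filter in the outer query using the aliases provided for the Window Functions.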
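Here is a plain Scala sketch (no Spark) of the same two-step pattern: the inner step computes `dense_rank` per `order_date`, and only then can the outer step filter on it. The `ProductRevenue` case class and the `topN` helper are hypothetical, and tied revenues keep their input order.

```scala
object TopNPerGroup {
  // Hypothetical row type mirroring daily_product_revenue
  final case class ProductRevenue(orderDate: String, productId: Int, revenue: BigDecimal)

  // Mimics:
  //   SELECT * FROM (
  //     SELECT t.*, dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
  //     FROM daily_product_revenue t
  //   ) q WHERE q.drnk <= n
  def topN(rows: Seq[ProductRevenue], n: Int): Seq[(ProductRevenue, Int)] =
    rows
      .groupBy(_.orderDate) // PARTITION BY order_date
      .toSeq
      .sortBy(_._1)
      .flatMap { case (_, dayRows) =>
        // inner query: dense_rank within the partition, by revenue descending
        val sorted = dayRows.sortBy(_.revenue)(Ordering[BigDecimal].reverse)
        val distinctRevenues = sorted.map(_.revenue).distinct
        val ranked = sorted.map(r => (r, distinctRevenues.indexOf(r.revenue) + 1))
        // outer query: the filter can only see drnk after it has been computed
        ranked.filter { case (_, drnk) => drnk <= n }
      }
}
```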

Here is an example where we filter data based on the results of Window Functions.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (
      PARTITION BY order_date
      ORDER BY revenue DESC
    ) AS drnk
  FROM daily_product_revenue t
) q
WHERE q.drnk <= 5
ORDER BY q.order_date, q.revenue DESC
LIMIT 100

In [None]:
spark.sql("""SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (
      PARTITION BY order_date
      ORDER BY revenue DESC
    ) AS drnk
  FROM daily_product_revenue t
) q
WHERE q.drnk <= 5
ORDER BY q.order_date, q.revenue DESC
""").
    show(100, false)

### Ranking and Filtering - Recap

Let us recap the procedure to get the top 5 products by revenue for each day.

* We have our original data in **orders** and **order_items**.
* We can pre-compute the data and store it in a table, or create a view with the logic to generate **daily product revenue**.
* Then, we have to use the view, the table or even a sub query to compute the rank.
* We can use the query with ranks as a sub query to filter, so that we get the top 5 products by revenue.
* Let us see the overall process in action.

Let us come up with the query to compute daily product revenue.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DESCRIBE orders

In [None]:
%%sql

DESCRIBE order_items

In [None]:
%%sql

SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
ORDER BY o.order_date, revenue DESC
LIMIT 100

Let us compute the rank for each product within each date using revenue as the criteria.

In [None]:
%%sql

SELECT q.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q
ORDER BY order_date, revenue DESC
LIMIT 35

Now let us see how we can filter the data.

In [None]:
%%sql

SELECT * FROM (SELECT q.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q) q1
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35

In [None]:
spark.sql("DESCRIBE daily_product_revenue").show(false)

In [None]:
%%sql

SELECT * FROM (SELECT dpr.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM daily_product_revenue AS dpr)
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35

## Exercises - Windowing Functions

Let us take care of the exercises related to windowing or analytic functions. We will be using the HR database for the first three exercises and the retail database for the last two.

* Get all the employees who are making more than the average salary within their department.
* Get cumulative salary within each department for the Finance and IT departments, along with the department name.
* Get top 3 paid employees within each department by salary (use dense_rank).
* Get top 3 products sold in January 2014 by revenue.
* Get top 3 products in each of the selected categories sold in January 2014 by revenue.

### Exercise 1

Get all the employees who are making more than the average salary within their department.

* Use HR database employees and department tables for this problem.
* Compute the average salary expense for each department and get the details of the employees whose salary is greater than the average salary of their department.
* Make sure average salary expense per department is rounded off to 2 decimals.
* Output should contain employee_id, department_name, salary and avg_salary_expense (derived field).
* Data should be sorted in ascending order by department_id and descending order by salary.

|employee_id|department_name|salary|avg_salary_expense|
|---|---|---|---|
|201|Marketing|13000.00|9500.00|
|114|Purchasing|11000.00|4150.00|
|121|Shipping|8200.00|3475.56|
|120|Shipping|8000.00|3475.56|
|122|Shipping|7900.00|3475.56|
|123|Shipping|6500.00|3475.56|
|124|Shipping|5800.00|3475.56|
|184|Shipping|4200.00|3475.56|
|185|Shipping|4100.00|3475.56|
|192|Shipping|4000.00|3475.56|
|193|Shipping|3900.00|3475.56|
|188|Shipping|3800.00|3475.56|
|137|Shipping|3600.00|3475.56|
|189|Shipping|3600.00|3475.56|
|141|Shipping|3500.00|3475.56|
|103|IT|9000.00|5760.00|
|104|IT|6000.00|5760.00|
|145|Sales|14000.00|8955.88|
|146|Sales|13500.00|8955.88|
|147|Sales|12000.00|8955.88|
|168|Sales|11500.00|8955.88|
|148|Sales|11000.00|8955.88|
|174|Sales|11000.00|8955.88|
|149|Sales|10500.00|8955.88|
|162|Sales|10500.00|8955.88|
|156|Sales|10000.00|8955.88|
|150|Sales|10000.00|8955.88|
|169|Sales|10000.00|8955.88|
|170|Sales|9600.00|8955.88|
|163|Sales|9500.00|8955.88|
|151|Sales|9500.00|8955.88|
|157|Sales|9500.00|8955.88|
|158|Sales|9000.00|8955.88|
|152|Sales|9000.00|8955.88|
|100|Executive|24000.00|19333.33|
|108|Finance|12000.00|8600.00|
|109|Finance|9000.00|8600.00|
|205|Accounting|12000.00|10150.00|

### Exercise 2

Get cumulative salary within each department for the Finance and IT departments, along with the department name.

* Use HR database employees and department tables for this problem.
* Compute cumulative salary expense for **Finance** as well as **IT** departments with in respective departments.
* Make sure cumulative salary expense per department is rounded off to 2 decimals.
* Output should contain employee_id, department_name, salary and cum_salary_expense (derived field).
* Data should be sorted in ascending order by department_name and then salary.

|employee_id|department_name|salary|cum_salary_expense|
|---|---|---|---|
|113|Finance|6900.00|6900.00|
|111|Finance|7700.00|14600.00|
|112|Finance|7800.00|22400.00|
|110|Finance|8200.00|30600.00|
|109|Finance|9000.00|39600.00|
|108|Finance|12000.00|51600.00|
|107|IT|4200.00|4200.00|
|106|IT|4800.00|9000.00|
|105|IT|4800.00|13800.00|
|104|IT|6000.00|19800.00|
|103|IT|9000.00|28800.00|

### Exercise 3

Get top 3 paid employees within each department by salary (use dense_rank).

* Use HR database employees and department tables for this problem.
* Highest paid employee should be ranked first.
* Output should contain employee_id, department_id, department_name, salary and employee_rank (derived field).
* Data should be sorted in ascending order by department_id and then in descending order by salary.

|employee_id|department_id|department_name|salary|employee_rank|
|---|---|---|---|---|
|200|10|Administration|4400.00|1|
|201|20|Marketing|13000.00|1|
|202|20|Marketing|6000.00|2|
|114|30|Purchasing|11000.00|1|
|115|30|Purchasing|3100.00|2|
|116|30|Purchasing|2900.00|3|
|203|40|Human Resources|6500.00|1|
|121|50|Shipping|8200.00|1|
|120|50|Shipping|8000.00|2|
|122|50|Shipping|7900.00|3|
|103|60|IT|9000.00|1|
|104|60|IT|6000.00|2|
|105|60|IT|4800.00|3|
|106|60|IT|4800.00|3|
|204|70|Public Relations|10000.00|1|
|145|80|Sales|14000.00|1|
|146|80|Sales|13500.00|2|
|147|80|Sales|12000.00|3|
|100|90|Executive|24000.00|1|
|101|90|Executive|17000.00|2|
|102|90|Executive|17000.00|2|
|108|100|Finance|12000.00|1|
|109|100|Finance|9000.00|2|
|110|100|Finance|8200.00|3|
|205|110|Accounting|12000.00|1|
|206|110|Accounting|8300.00|2|

### Exercise 4

Get top 3 products sold in January 2014 by revenue.

* Use retail database tables such as orders, order_items and products.
* Highest revenue generating product should come at top.
* Output should contain product_id, product_name, revenue, product_rank. **revenue** and **product_rank** are derived fields.
* Data should be sorted in descending order by revenue.

|product_id|product_name|revenue|product_rank|
|---|---|---|---|
|1004|Field & Stream Sportsman 16 Gun Fire Safe|250787.46|1|
|365|Perfect Fitness Perfect Rip Deck|151474.75|2|
|957|Diamondback Women's Serene Classic Comfort Bi|148190.12|3|

### Exercise 5

Get top 3 products sold in January 2014 under selected categories by revenue. The categories are **Cardio Equipment** and **Strength Training**.

* Use retail database tables such as orders, order_items, products as well as categories.
* Highest revenue generating product should come at top.
* Output should contain category_id, category_name, product_id, product_name, revenue, product_rank. revenue and product_rank are derived fields.
* Data should be sorted in ascending order by category_id and descending order by revenue.

|category_id|category_name|product_id|product_name|revenue|product_rank|
|---|---|---|---|---|---|
|9|Cardio Equipment|191|Nike Men's Free 5.0+ Running Shoe|132286.77|1|
|9|Cardio Equipment|172|Nike Women's Tempo Shorts|870.00|2|
|10|Strength Training|208|SOLE E35 Elliptical|1999.99|1|
|10|Strength Training|203|GoPro HERO3+ Black Edition Camera|1199.97|2|
|10|Strength Training|216|Yakima DoubleDown Ace Hitch Mount 4-Bike Rack|189.00|3|