In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.\
    builder.\
    enableHiveSupport().\
    appName("Spark SQL -Windowing Functions").\
    master("yarn").\
    getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
%colors LightBG

In [4]:
%load_ext sparksql_magic

In [5]:
%load_ext sql

In [5]:
%%sparksql

DROP DATABASE if exists kevin_hr CASCADE

In [6]:
%%sparksql

CREATE DATABASE if not exists kevin_hr

In [6]:
%%sparksql

USE kevin_hr

In [7]:
%%sparksql

select current_database()

                                                                                

0
current_database()
kevin_hr


In [9]:
%%sparksql
CREATE TABLE employees (
  employee_id     int,
  first_name      varchar(20),
  last_name       varchar(25),
  email           varchar(25),
  phone_number    varchar(20),
  hire_date       date,
  job_id          varchar(10),
  salary          decimal(8,2),
  commission_pct  decimal(2,2),
  manager_id      int,
  department_id   int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

In [10]:
%%sparksql

LOAD DATA LOCAL INPATH '/home/hadoop/data/data/hr_db/employees' 
INTO TABLE employees

In [11]:
%%sparksql

SELECT * FROM employees LIMIT 5

0,1,2,3,4,5,6,7,8,9,10
employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,commission_pct,manager_id,department_id
100,Steven,King,SKING,515.123.4567,1987-06-17,AD_PRES,24000.00,,,90
101,Neena,Kochhar,NKOCHHAR,515.123.4568,1989-09-21,AD_VP,17000.00,,100,90
102,Lex,De Haan,LDEHAAN,515.123.4569,1993-01-13,AD_VP,17000.00,,100,90
103,Alexander,Hunold,AHUNOLD,590.423.4567,1990-01-03,IT_PROG,9000.00,,102,60
104,Bruce,Ernst,BERNST,590.423.4568,1991-05-21,IT_PROG,6000.00,,103,60


In [12]:
%%sparksql

SELECT employee_id, department_id, salary FROM employees LIMIT 5

0,1,2
employee_id,department_id,salary
100,90,24000.00
101,90,17000.00
102,90,17000.00
103,60,9000.00
104,60,6000.00


In [13]:
%%sparksql

SELECT count(1) FROM employees

0
count(1)
107


### Overview of Windowing Functions

These are also known as Analytic Functions in Databases like Oracle.

- Aggregate Functions **(sum, min, max, avg)**
- Window Functions **(lead, lag, first_value, last_value)**
- Rank Functions **(rank, dense_rank, row_number etc)**
- For all the functions we use **OVER** clause.
- For aggregate functions we typically use **PARTITION BY**
- For ranking and windowing functions we might use **ORDER BY** sorting_column or **PARTITION BY** partition_column **ORDER BY** sorting_column.

With LAG(), you must specify an ORDER BY in the OVER clause, with a column or a list of columns by which the rows should be sorted.


In [14]:
%%sparksql

SELECT employee_id, department_id, salary FROM employees LIMIT 5

0,1,2
employee_id,department_id,salary
100,90,24000.00
101,90,17000.00
102,90,17000.00
103,60,9000.00
104,60,6000.00


In [16]:
%%sparksql

SELECT employee_id, department_id, salary,
    count(1) OVER (PARTITION BY department_id) AS employee_count,
    rank() OVER (ORDER BY salary DESC) AS rnk,
    lead(employee_id) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_id,
    lead(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_sal
FROM employees
ORDER BY employee_id
limit 10

0,1,2,3,4,5,6
employee_id,department_id,salary,employee_count,rnk,lead_emp_id,lead_emp_sal
100,90,24000.00,3,1,101,17000.00
101,90,17000.00,3,2,102,17000.00
102,90,17000.00,3,2,,
103,60,9000.00,5,24,104,6000.00
104,60,6000.00,5,56,105,4800.00
105,60,4800.00,5,59,106,4800.00
106,60,4800.00,5,59,107,4200.00
107,60,4200.00,5,62,,
108,100,12000.00,6,7,109,9000.00


### Aggregations using Windowing Functions
Aggregations with in a partition or group using Windowing/Analytics Functions.

- For simple aggregations where we have to get grouping key and aggregated results we can use **GROUP BY**.
- If we want to get the raw data along with aggregated results, then using *GROUP BY* is not possible or overly complicated.
- Using aggregate functions with **OVER** Clause not only simplifies the process of writing query, but also better with respect to performance.
- Let us take an example of getting employee salary percentage when compared to department salary expense.


>getting individual employee salary

In [17]:
%%sparksql

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary
LIMIT 10

0,1,2
employee_id,department_id,salary
178,,7000.00
200,10,4400.00
202,20,6000.00
201,20,13000.00
119,30,2500.00
118,30,2600.00
117,30,2800.00
116,30,2900.00
115,30,3100.00


In [22]:
%%sparksql

SELECT count(employee_id),department_id,
       sum(salary) AS department_salary_expense,
        round(avg(salary),2) AS department_avg_salary
FROM employees
GROUP BY department_id
ORDER BY department_id

0,1,2,3
count(employee_id),department_id,department_salary_expense,department_avg_salary
1,,7000.00,7000.00
1,10,4400.00,4400.00
2,20,19000.00,9500.00
6,30,24900.00,4150.00
1,40,6500.00,6500.00
45,50,156400.00,3475.56
5,60,28800.00,5760.00
1,70,10000.00,10000.00
34,80,304500.00,8955.88


Below **self-join** to get individual salaries along with departmentwise expense

In [10]:
%%time
%%sparksql

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            round(avg(salary),2) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY department_id, salary
limit 5

CPU times: user 0 ns, sys: 19.8 ms, total: 19.8 ms
Wall time: 2.03 s


0,1,2,3,4
employee_id,department_id,salary,department_salary_expense,avg_salary_expense
200,10,4400.00,4400.00,4400.00
202,20,6000.00,19000.00,9500.00
201,20,13000.00,19000.00,9500.00
119,30,2500.00,24900.00,4150.00
118,30,2600.00,24900.00,4150.00


>Let us see how we can get above using Analytics/Windowing Functions.
>We can use all standard aggregate functions such as **count, sum, min, max, avg etc.**


In [29]:
%%time
%%sparksql


SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) OVER (PARTITION BY e.department_id) AS department_salary_expense,
       round(avg(e.salary)  OVER(PARTITION BY e.department_id),2) as av_salary_de
        
FROM employees e
ORDER BY e.department_id
limit 5

CPU times: user 3.68 ms, sys: 15.2 ms, total: 18.9 ms
Wall time: 1.27 s


0,1,2,3,4
employee_id,department_id,salary,department_salary_expense,av_salary_de
178,,7000.00,7000.00,7000.00
200,10,4400.00,4400.00,4400.00
201,20,13000.00,19000.00,9500.00
202,20,6000.00,19000.00,9500.00
114,30,11000.00,24900.00,4150.00


In [31]:
%%sparksql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    round(avg(e.salary) OVER (PARTITION BY e.department_id),2) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id

only showing top 20 row(s)


0,1,2,3,4,5,6,7
employee_id,department_id,salary,sum_sal_expense,avg_sal_expense,min_sal_expense,max_sal_expense,cnt_sal_expense
178,,7000.00,7000.00,7000.00,7000.00,7000.00,1
200,10,4400.00,4400.00,4400.00,4400.00,4400.00,1
201,20,13000.00,19000.00,9500.00,6000.00,13000.00,2
202,20,6000.00,19000.00,9500.00,6000.00,13000.00,2
114,30,11000.00,24900.00,4150.00,2500.00,11000.00,6
119,30,2500.00,24900.00,4150.00,2500.00,11000.00,6
115,30,3100.00,24900.00,4150.00,2500.00,11000.00,6
116,30,2900.00,24900.00,4150.00,2500.00,11000.00,6
118,30,2600.00,24900.00,4150.00,2500.00,11000.00,6


### Create tables to get daily revenue

- We have ORDERS and ORDER_ITEMS tables.
- Let us take care of computing daily revenue as well as daily product revenue.
- As we will be using same data set several times, let us create the tables to pre compute the data.
- daily_revenue will have the order_date and revenue, where data is aggregated using order_date as partition key.
- daily_product_revenue will have order_date, order_item_product_id and revenue. In this case data is aggregated using order_date and order_item_product_id as partition keys.


In [5]:
%%sparksql

USE kevin_retail

In [6]:
%%sparksql
show tables

0,1,2
namespace,tableName,isTemporary
kevin_retail,daily_product_revenue,False
kevin_retail,daily_revenue,False
kevin_retail,dual,False
kevin_retail,order_items,False
kevin_retail,order_items_stage,False
kevin_retail,orders,False
kevin_retail,orders_part,False
kevin_retail,orders_single_column,False
kevin_retail,sales,False


In [35]:
%%sparksql

DROP TABLE IF EXISTS daily_revenue

In [40]:
##%%sparksql
##describe order_items

Creating table to compute daily revenue

In [36]:
%%sparksql

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date

                                                                                

In [7]:
%%sparksql

SELECT * 
FROM daily_revenue
ORDER BY order_date
LIMIT 5

0,1
order_date,revenue
2013-07-25 00:00:00.0,31547.23
2013-07-26 00:00:00.0,54713.23
2013-07-27 00:00:00.0,48411.48
2013-07-28 00:00:00.0,35672.03
2013-07-29 00:00:00.0,54579.7


 create table to compute daily product revenue

In [41]:
%%sparksql

DROP TABLE IF EXISTS daily_product_revenue

In [42]:
%%sparksql

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date, oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
    
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id

                                                                                

In [43]:
%%sparksql

SELECT * 
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 5

0,1,2
order_date,order_item_product_id,revenue
2013-07-25 00:00:00.0,24,319.96
2013-07-25 00:00:00.0,93,74.97
2013-07-25 00:00:00.0,134,100.0
2013-07-25 00:00:00.0,191,5099.49
2013-07-25 00:00:00.0,226,599.99


### Getting LEAD and LAG values

In [44]:
%%sparksql

SELECT * FROM daily_revenue
ORDER BY order_date DESC
LIMIT 5

0,1
order_date,revenue
2014-07-24 00:00:00.0,50885.19
2014-07-23 00:00:00.0,38795.23
2014-07-22 00:00:00.0,36717.24
2014-07-21 00:00:00.0,51427.7
2014-07-20 00:00:00.0,60047.45


In [45]:
%%sparksql

SELECT t.*,
  lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10

                                                                                

0,1,2,3
order_date,revenue,prior_date,prior_revenue
2014-07-24 00:00:00.0,50885.19,2014-07-23 00:00:00.0,38795.23
2014-07-23 00:00:00.0,38795.23,2014-07-22 00:00:00.0,36717.24
2014-07-22 00:00:00.0,36717.24,2014-07-21 00:00:00.0,51427.7
2014-07-21 00:00:00.0,51427.7,2014-07-20 00:00:00.0,60047.45
2014-07-20 00:00:00.0,60047.45,2014-07-19 00:00:00.0,38420.99
2014-07-19 00:00:00.0,38420.99,2014-07-18 00:00:00.0,43856.6
2014-07-18 00:00:00.0,43856.6,2014-07-17 00:00:00.0,36384.77
2014-07-17 00:00:00.0,36384.77,2014-07-16 00:00:00.0,43011.92
2014-07-16 00:00:00.0,43011.92,2014-07-15 00:00:00.0,53480.23


passing number of rows as well as default values for nulls as arguments.

In [46]:
%%sparksql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue

FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10

0,1,2,3
order_date,revenue,prior_date,prior_revenue
2014-07-24 00:00:00.0,50885.19,2014-07-17 00:00:00.0,36384.77
2014-07-23 00:00:00.0,38795.23,2014-07-16 00:00:00.0,43011.92
2014-07-22 00:00:00.0,36717.24,2014-07-15 00:00:00.0,53480.23
2014-07-21 00:00:00.0,51427.7,2014-07-14 00:00:00.0,29937.52
2014-07-20 00:00:00.0,60047.45,2014-07-13 00:00:00.0,40410.99
2014-07-19 00:00:00.0,38420.99,2014-07-12 00:00:00.0,38449.77
2014-07-18 00:00:00.0,43856.6,2014-07-11 00:00:00.0,29596.32
2014-07-17 00:00:00.0,36384.77,2014-07-10 00:00:00.0,47826.02
2014-07-16 00:00:00.0,43011.92,2014-07-09 00:00:00.0,36929.91


Passing 0 where there is null and **nodate** where no date

In [48]:
%%sparksql

SELECT t.*,

  lead(order_date, 7, 'nodate') OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7, 0) OVER (ORDER BY order_date DESC) AS prior_revenue

FROM daily_revenue t
ORDER BY order_date
LIMIT 10

0,1,2,3
order_date,revenue,prior_date,prior_revenue
2013-07-25 00:00:00.0,31547.23,nodate,0.0
2013-07-26 00:00:00.0,54713.23,nodate,0.0
2013-07-27 00:00:00.0,48411.48,nodate,0.0
2013-07-28 00:00:00.0,35672.03,nodate,0.0
2013-07-29 00:00:00.0,54579.7,nodate,0.0
2013-07-30 00:00:00.0,49329.29,nodate,0.0
2013-07-31 00:00:00.0,59212.49,nodate,0.0
2013-08-01 00:00:00.0,49160.08,2013-07-25 00:00:00.0,31547.23
2013-08-02 00:00:00.0,50688.58,2013-07-26 00:00:00.0,54713.23


>get prior or following records with in a group based on particular order.

>get prior or following records based on PARTITION BY and then ORDER BY Clause.

In [50]:
%%sparksql
SELECT * FROM daily_product_revenue LIMIT 5

0,1,2
order_date,order_item_product_id,revenue
2014-03-28 00:00:00.0,793,59.96
2014-04-09 00:00:00.0,191,6599.34
2014-04-10 00:00:00.0,775,9.99
2014-04-15 00:00:00.0,116,404.91
2014-05-03 00:00:00.0,172,120.0


In [9]:
%%sparksql

SELECT t.*,
  LEAD(order_item_product_id) OVER (PARTITION BY order_date ORDER BY revenue DESC) next_product_id,
  LEAD(revenue) OVER (PARTITION BY order_date ORDER BY revenue DESC) next_revenue

FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 10

0,1,2,3,4
order_date,order_item_product_id,revenue,next_product_id,next_revenue
2013-07-25 00:00:00.0,1004,5599.72,191,5099.49
2013-07-25 00:00:00.0,191,5099.49,957,4499.7
2013-07-25 00:00:00.0,957,4499.7,365,3359.44
2013-07-25 00:00:00.0,365,3359.44,1073,2999.85
2013-07-25 00:00:00.0,1073,2999.85,1014,2798.88
2013-07-25 00:00:00.0,1014,2798.88,403,1949.85
2013-07-25 00:00:00.0,403,1949.85,502,1650.0
2013-07-25 00:00:00.0,502,1650.0,627,1079.73
2013-07-25 00:00:00.0,627,1079.73,226,599.99


>passing number of rows as well as default values for nulls as arguments.

In [20]:
%%sparksql

SELECT t.*,
  LEAD(order_item_product_id) OVER (PARTITION BY order_date ORDER BY revenue DESC) next_product_id,
  LEAD(revenue, 1, 0) OVER (PARTITION BY order_date ORDER BY revenue DESC) next_revenue
FROM daily_product_revenue t
LIMIT 100

0,1,2,3,4
order_date,order_item_product_id,revenue,next_product_id,next_revenue
2013-07-25 00:00:00.0,1004,5599.72,191,5099.49
2013-07-25 00:00:00.0,191,5099.49,957,4499.7
2013-07-25 00:00:00.0,957,4499.7,365,3359.44
2013-07-25 00:00:00.0,365,3359.44,1073,2999.85
2013-07-25 00:00:00.0,1073,2999.85,1014,2798.88
2013-07-25 00:00:00.0,1014,2798.88,403,1949.85
2013-07-25 00:00:00.0,403,1949.85,502,1650.0
2013-07-25 00:00:00.0,502,1650.0,627,1079.73
2013-07-25 00:00:00.0,627,1079.73,226,599.99


In [19]:
%config SparkSql.limit

100

In [18]:
%config SparkSql.limit=100

### Getting first and last values
>get first and last value based on the criteria. We can also use min or max as well.

In [23]:
%%sparksql
select current_database()

0
current_database()
kevin_retail


In [26]:
%%sparksql
SELECT t.*,
  first_value(order_item_product_id) OVER (PARTITION BY order_date ORDER BY revenue DESC) first_product_id,
  first_value(revenue) OVER (PARTITION BY order_date ORDER BY revenue DESC) first_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

0,1,2,3,4
order_date,order_item_product_id,revenue,first_product_id,first_revenue
2013-07-25 00:00:00.0,1004,5599.72,1004,5599.72
2013-07-25 00:00:00.0,191,5099.49,1004,5599.72
2013-07-25 00:00:00.0,957,4499.7,1004,5599.72
2013-07-25 00:00:00.0,365,3359.44,1004,5599.72
2013-07-25 00:00:00.0,1073,2999.85,1004,5599.72
2013-07-25 00:00:00.0,1014,2798.88,1004,5599.72
2013-07-25 00:00:00.0,403,1949.85,1004,5599.72
2013-07-25 00:00:00.0,502,1650.0,1004,5599.72
2013-07-25 00:00:00.0,627,1079.73,1004,5599.72


>While using last_value we need to specify ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING/PRECEEDING. 

In [27]:
%%sparksql

SELECT t.*,
  last_value(order_item_product_id) OVER (PARTITION BY order_date ORDER BY revenue
    ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) last_product_id,
  last_value(revenue) OVER (PARTITION BY order_date ORDER BY revenue
    ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)  last_revenue
FROM daily_product_revenue AS t
ORDER BY order_date, revenue DESC
LIMIT 100

0,1,2,3,4
order_date,order_item_product_id,revenue,last_product_id,last_revenue
2013-07-25 00:00:00.0,1004,5599.72,1004,5599.72
2013-07-25 00:00:00.0,191,5099.49,1004,5599.72
2013-07-25 00:00:00.0,957,4499.7,1004,5599.72
2013-07-25 00:00:00.0,365,3359.44,1004,5599.72
2013-07-25 00:00:00.0,1073,2999.85,1004,5599.72
2013-07-25 00:00:00.0,1014,2798.88,1004,5599.72
2013-07-25 00:00:00.0,403,1949.85,1004,5599.72
2013-07-25 00:00:00.0,502,1650.0,1004,5599.72
2013-07-25 00:00:00.0,627,1079.73,1004,5599.72


### Ranking using Windowing Functions

- If we have to get ranks globally, we just need to specify **ORDER BY**
- If we have to get ranks with in a key then we need to specify **PARTITION BY** and then **ORDER BY**.
- By default ***ORDER BY*** will sort the data in ascending order. We can change the order by passing ***DESC*** after ***order by***.


In [30]:
%%sparksql

SELECT t.*,
  rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS rnk
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 10

0,1,2,3
order_date,order_item_product_id,revenue,rnk
2013-07-25 00:00:00.0,1004,5599.72,1
2013-07-25 00:00:00.0,191,5099.49,2
2013-07-25 00:00:00.0,957,4499.7,3
2013-07-25 00:00:00.0,365,3359.44,4
2013-07-25 00:00:00.0,1073,2999.85,5
2013-07-25 00:00:00.0,1014,2798.88,6
2013-07-25 00:00:00.0,403,1949.85,7
2013-07-25 00:00:00.0,502,1650.0,8
2013-07-25 00:00:00.0,627,1079.73,9


In [6]:
%%sparksql
use kevin_hr

#### Understand the difference between **rank**, **dense_rank** and **row_number**.

- We can either of the functions to generate ranks when the rank field does not have duplicates.
- When rank field have duplicates then row_number should not be used as it generate unique number for each record with in the partition.
- rank will skip the ranks in between if multiple people get the same rank while dense_rank continue with the next number

In [7]:
%%sparksql

SELECT
  employee_id,department_id,salary,
  rank() OVER (PARTITION BY department_id ORDER BY salary DESC) rnk,
  dense_rank() OVER (PARTITION BY department_id ORDER BY salary DESC) drnk,
  row_number() OVER (PARTITION BY department_id ORDER BY salary DESC) rn
FROM employees
ORDER BY department_id, salary DESC

[Stage 2:>                                                          (0 + 1) / 1]

only showing top 20 row(s)


                                                                                

0,1,2,3,4,5
employee_id,department_id,salary,rnk,drnk,rn
178,,7000.00,1,1,1
200,10,4400.00,1,1,1
201,20,13000.00,1,1,1
202,20,6000.00,2,2,2
114,30,11000.00,1,1,1
115,30,3100.00,2,2,2
116,30,2900.00,3,3,3
117,30,2800.00,4,4,4
118,30,2600.00,5,5,5


### Understanding order of execution of SQL

- order of writing the query.

>SELECT

>FROM

>JOIN or OUTER JOIN with ON

>WHERE

>GROUP BY and optionally HAVING

>ORDER BY


- However order of execution is different.

    FROM

    JOIN or OUTER JOIN with ON

    WHERE

    GROUP BY and optionally HAVING

    SELECT

    ORDER BY

>As SELECT is executed before ORDER BY Clause, we will not be able to refer the aliases in SELECT in other clauses except for ORDER BY.


In [8]:
%%sparksql
use  kevin_retail

In [9]:
%%sparksql

SELECT o.order_date,round(sum(oi.order_item_order_id), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
ORDER BY o.order_date
LIMIT 10

                                                                                

0,1
order_date,revenue
2013-07-25 00:00:00.0,3472888
2013-07-26 00:00:00.0,1798683
2013-07-27 00:00:00.0,3040190
2013-07-28 00:00:00.0,1128298
2013-07-29 00:00:00.0,2574603
2013-07-30 00:00:00.0,1849067
2013-07-31 00:00:00.0,3443867
2013-08-01 00:00:00.0,1128103
2013-08-02 00:00:00.0,1505612


In [41]:
%%sparksql

SELECT o.order_date,round(sum(oi.order_item_order_id), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
    HAVING revenue >= 2000000
ORDER BY order_date
LIMIT 10

0,1
order_date,revenue
2013-07-25 00:00:00.0,3472888
2013-07-27 00:00:00.0,3040190
2013-07-29 00:00:00.0,2574603
2013-07-31 00:00:00.0,3443867
2013-08-03 00:00:00.0,2248798
2013-08-04 00:00:00.0,2844419
2013-08-07 00:00:00.0,3403251
2013-08-08 00:00:00.0,2197269
2013-08-09 00:00:00.0,2219000


### Overview of Nested Sub Queries

- We typically have Nested Sub Queries in **FROM** Clause.
- We need to provide **alias** to the Nested Sub Queries in **FROM** Clause in *Hive*.
- We use nested queries quite often over queries using Analytics/Windowing Functions
- We can achieve using HAVING clause (no need to be nested to filter)


In [10]:
%%sparksql

SELECT * FROM (SELECT current_date) AS q

0
current_date()
2023-04-03


In [12]:
%%sparksql

SELECT * FROM (
  SELECT order_date, count(1) AS order_count
  FROM orders
  GROUP BY order_date
) q
LIMIT 10

                                                                                

0,1
order_date,order_count
2013-08-13 00:00:00.0,73
2014-03-19 00:00:00.0,130
2014-04-26 00:00:00.0,251
2013-10-12 00:00:00.0,162
2013-11-15 00:00:00.0,135
2013-09-16 00:00:00.0,121
2013-09-20 00:00:00.0,139
2013-12-31 00:00:00.0,266
2014-06-15 00:00:00.0,128


In [18]:
%%sparksql

SELECT * FROM (
  SELECT order_date, count(1) AS order_count
  FROM orders
  GROUP BY order_date
) q
WHERE q.order_count > 0
limit 5

0,1
order_date,order_count
2013-08-13 00:00:00.0,73
2013-10-12 00:00:00.0,162
2013-11-15 00:00:00.0,135
2013-09-16 00:00:00.0,121
2013-09-20 00:00:00.0,139


### Filtering - Window Function Results

- We can use *Window Functions* only in *SELECT* Clause.
- If we have to filter based on Window Function results, then we need to use Nested Sub Queries.
- Once the query is nested, we can apply filter using aliases of the Window Functions.


In [20]:
%%sparksql

SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
  FROM daily_product_revenue t
) q
WHERE drnk <= 5
ORDER BY q.order_date, q.revenue DESC

[Stage 31:>                                                         (0 + 1) / 1]

only showing top 20 row(s)


                                                                                

0,1,2,3
order_date,order_item_product_id,revenue,drnk
2013-07-25 00:00:00.0,1004,5599.72,1
2013-07-25 00:00:00.0,191,5099.49,2
2013-07-25 00:00:00.0,957,4499.7,3
2013-07-25 00:00:00.0,365,3359.44,4
2013-07-25 00:00:00.0,1073,2999.85,5
2013-07-26 00:00:00.0,1004,10799.46,1
2013-07-26 00:00:00.0,365,7978.67,2
2013-07-26 00:00:00.0,957,6899.54,3
2013-07-26 00:00:00.0,191,6799.32,4


### Ranking and Filtering - Recap

- We have our original data in orders and order_items
- We can pre-compute the data or create a view with the logic to generate daily product revenue
- Then, we have to use the view or table or even nested query to compute rank
- Once the ranks are computed, we need to nest it to filter based up on our requirement.


In [21]:
%%sparksql

USE kevin_retail

In [22]:
%%sparksql

DESCRIBE orders

0,1,2
col_name,data_type,comment
order_id,int,
order_date,string,
order_customer_id,int,
order_status,string,


In [23]:
%%sparksql

DESCRIBE order_items

0,1,2
col_name,data_type,comment
order_item_id,int,
order_item_order_id,int,
order_item_product_id,int,
order_item_quantity,int,
order_item_subtotal,float,
order_item_product_price,float,


the query to compute daily product revenue.

In [24]:
%%sparksql

SELECT o.order_date,oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
ORDER BY o.order_date, revenue DESC
LIMIT 100



only showing top 20 row(s)


                                                                                

0,1,2
order_date,order_item_product_id,revenue
2013-07-25 00:00:00.0,1004,5599.72
2013-07-25 00:00:00.0,191,5099.49
2013-07-25 00:00:00.0,957,4499.7
2013-07-25 00:00:00.0,365,3359.44
2013-07-25 00:00:00.0,1073,2999.85
2013-07-25 00:00:00.0,1014,2798.88
2013-07-25 00:00:00.0,403,1949.85
2013-07-25 00:00:00.0,502,1650.0
2013-07-25 00:00:00.0,627,1079.73


computing the rank for each product with in each date using revenue as criteria.

In [25]:
%%sparksql

SELECT q.*,
  rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS rnk
FROM (SELECT o.order_date, oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q
ORDER BY order_date, revenue DESC
LIMIT 35

[Stage 39:>                                                         (0 + 1) / 1]

only showing top 20 row(s)


                                                                                

0,1,2,3
order_date,order_item_product_id,revenue,rnk
2013-07-25 00:00:00.0,1004,5599.72,1
2013-07-25 00:00:00.0,191,5099.49,2
2013-07-25 00:00:00.0,957,4499.7,3
2013-07-25 00:00:00.0,365,3359.44,4
2013-07-25 00:00:00.0,1073,2999.85,5
2013-07-25 00:00:00.0,1014,2798.88,6
2013-07-25 00:00:00.0,403,1949.85,7
2013-07-25 00:00:00.0,502,1650.0,8
2013-07-25 00:00:00.0,627,1079.73,9


filtering the data.

In [26]:
%%sparksql

SELECT * FROM (SELECT q.*,
  dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
FROM (SELECT o.order_date, oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q) q1
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35

[Stage 46:>                                                         (0 + 1) / 1]

only showing top 20 row(s)


                                                                                

0,1,2,3
order_date,order_item_product_id,revenue,drnk
2013-07-25 00:00:00.0,1004,5599.72,1
2013-07-25 00:00:00.0,191,5099.49,2
2013-07-25 00:00:00.0,957,4499.7,3
2013-07-25 00:00:00.0,365,3359.44,4
2013-07-25 00:00:00.0,1073,2999.85,5
2013-07-26 00:00:00.0,1004,10799.46,1
2013-07-26 00:00:00.0,365,7978.67,2
2013-07-26 00:00:00.0,957,6899.54,3
2013-07-26 00:00:00.0,191,6799.32,4


In [27]:
%%sparksql

SELECT * FROM (SELECT dpr.*,
  dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
FROM daily_product_revenue AS dpr)
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35



only showing top 20 row(s)


                                                                                

0,1,2,3
order_date,order_item_product_id,revenue,drnk
2013-07-25 00:00:00.0,1004,5599.72,1
2013-07-25 00:00:00.0,191,5099.49,2
2013-07-25 00:00:00.0,957,4499.7,3
2013-07-25 00:00:00.0,365,3359.44,4
2013-07-25 00:00:00.0,1073,2999.85,5
2013-07-26 00:00:00.0,1004,10799.46,1
2013-07-26 00:00:00.0,365,7978.67,2
2013-07-26 00:00:00.0,957,6899.54,3
2013-07-26 00:00:00.0,191,6799.32,4


23/04/03 21:36:11 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.EOFException: End of File Exception between local host is: "master/192.168.56.50"; destination host is: "master":9000; : java.io.EOFException; For more details see:  http://wiki.apache.org/hadoop/EOFException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:913)
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:862)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1616)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.Protobu