### <div class="alert alert-success" style="background:#2C3E50;color:white">Data Frame Operations - Window Functions - Overview & APIs</div>

This section will show advanced operations on Data Frames such as -
* Windowing functions

Windowing functions comprise APIs for -
* Aggregations
* Rankings
* Analytics

<p style="background:#F1C40F"><b>Example Problem : </b>To compute and compare individual salary with department wise salary expense. </p>

<p style="background:#AED6F1"><b>Windowing Aggregation Example</p>

<p style="background :#d0d5db"><b> Approaches:</b></p>

* First approach requires self join.
    - Compute department wise expense using groupBy and agg.
    - Join with employees again on department_id.

<p style="background :#d0d5db"><b> Example</p>

In [None]:
>>> employeesDF = spark.read. \
...                     format('csv'). \
...                     option('sep', '\t'). \
...                     schema('''emp_id int,
...                             fname string,
...                             lname string,
...                             email string,
...                             phone string,
...                             hiredt string,
...                             job_id string,
...                             salary float,
...                             commission_pct string,
...                             manager_id string,
...                             department_id int'''). \
...                     load('/user/monahadoop/hr_db/employees')

In [None]:
>>> employeesDF.show()

In [None]:
>>> employeesDF.select('emp_id', 'department_id', 'salary'). \
...             groupBy('department_id'). \
...             agg(round(sum('salary'),2).alias('salary_expense')).show()

* <p style="background :#d0d5db"><b> Second approach Aggreagtion with windowing functions.</p>

**However, using the first approach is not very efficient and also overly complicated. Windowing functions actually simplify the logic and also runs efficiently.**

The details related to Windowing functions are as follows:
- Main package pyspark.sql.window
- It has classes such as Window and WindowSpec
- Window have APIs such as partitionBy, orderBy etc
- These APIs (such as partitionBy) return WindowSpec object. We can pass WindowSpec object to over on functions such as rank(), dense_rank(), sum() etc
 - Syntax:<code>sum().over(spec) where spec = Window.partitionBy('ColumnName')</code>

The following is the lst of different windowing functions -


<table align=left><tr><td>Functions</td><td>API or Function</td></tr>
<tr><td>Aggregate Functions</td><td>sum</td></tr>
<tr><td></td><td>avg</td></tr>
<tr><td></td><td>min</td></tr>
<tr><td></td><td>max</td></tr>
<tr><td>Ranking Functions</td><td>rank</td></tr>   
<tr><td></td><td>dense_rank</td></tr>
<tr><td></td><td>percent_rank</td></tr>
<tr><td></td><td>row_number</td></tr>
<tr><td></td><td>ntile</td></tr>
<tr><td>Analytic Functions</td><td></td></tr>
<tr><td></td><td>cume_dist</td></tr>
<tr><td></td><td>first</td></tr>
<tr><td></td><td>last</td></tr>
<tr><td></td><td>lead</td></tr>
<tr><td></td><td>lag</td></tr>    
</table>

<p style="background :#d0d5db"><b> Example</p>

In [None]:
>>> from pyspark.sql.window import *

In [None]:
>>> from pyspark.sql.functions import *

In [None]:
>>> employeesDF = spark.read. \
...                      format('csv'). \
...                      option('sep', '\t'). \
...                      schema('''emp_id int,
...                              fname string,
...                              lname string,
...                              email string,
...                              phone string,
...                              hiredt string,
...                              job_id string,
...                              salary float,
...                              commission_pct string,
...                              manager_id string,
...                             department_id string'''). \
...                     load('/user/monahadoop/hr_db/employees')
>>> employeesDF.show()

In [None]:
>>> spec = Window.partitionBy('department_id')

In [None]:
>>> employeesDF.select('emp_id', 'salary', 'department_id').\
...             withColumn('salary_expense', sum('salary').over(spec)). \
...             show()

<p style="background :#d0d5db"><b>Windowing Aggregation Example using spark sql</b> </p>

In [None]:
>>> employeesDF.createOrReplaceTempView('empvw')

In [None]:
>>> salaryExpense = spark.sql("select _c0, _c7, _c10, sum(_c7)
                              over(partition by _c10) salary_exp 
                              from empvw order by _c10, _c7 desc")

<p style="background:#AED6F1"><b>Data Frame Operations - Creating Window Spec</b>

<p style="background:#F1C40F"><b>Window spec :</b> is a specification which contains based on what columns you want to perform operations such as aggregation, ranking, windowing, analytics etc.</p>

* The class Window in spark has APIs such a partitionBy and orderBy. 
* For aggregation, we can define the group by using partitionBy.
* For ranking and windowing we need to use partitionBy and orderBy.<br>
  partitionBy is to group the data based on the specified column <br>
  orderBy   is sort the data to assign rank.
* partitionBy and orderBy return object of class WindowSpec.
* WindowSpec object needs to be passed in over() clause for ranking and       aggregation.

In [None]:
>>> from pyspark.sql import window

In [None]:
>>> help(window)

In [None]:
>>> spec = Window.\
...             partitionBy('department_id'). \
...             orderBy(employeesDF.salary.desc())

<p style="background:#AED6F1"><b>Data Frame Operations - Performing Aggregations using sum, avg etc</b>

In [None]:
>>> employeesDF = spark.read. \
...                      format('csv'). \
...                      option('sep', '\t'). \
...                      schema('''emp_id int,
...                              fname string,
...                              lname string,
...                              email string,
...                              phone string,
...                              hiredt string,
...                              job_id string,
...                              salary float,
...                              commission_pct string,
...                              manager_id string,
...                             department_id string'''). \
...                     load('/user/monahadoop/hr_db/employees')

In [None]:
>>> from pyspark.sql.window import *
>>> spec = Window.partitionBy('department_id')

In [None]:
>>> employeesDF.select('emp_id', 'salary', 'department_id').\
...             withColumn('salary_expense', sum('salary').over(spec)). \
...             sort('department_id').show()

In [None]:
>>> employeesDF.select('emp_id','department_id','salary').\
...             withColumn('salary_expense', sum('salary').over(spec)). \
...             withColumn('least_salary', min('salary').over(spec)). \
...             withColumn('highest_salary', max('salary').over(spec)). \
...             withColumn('average_salary', avg('salary').over(spec)). \
...             sort('department_id').show()

In [None]:
>>> employeesDF.select('emp_id','department_id','salary').\
...             withColumn('salary_expense', sum('salary').over(spec)). \
...             withColumn('salary_pct', employeesDF.salary/col('salary_expense')). \
...             show()

In [None]:
>>> employeesDF.select('emp_id','department_id','salary').\
...             withColumn('salary_expense', sum('salary').over(spec)). \
...             withColumn('salary_pct', round((employeesDF.salary/col('salary_expense') * 100), 2)). \ 
...             show()

<p style="background:#AED6F1"><b>Exercises</b></p>

<p style="background:#F1C40F">Get the average Salary for each department and details of all employees who earn more than the average salary .</p>

In [None]:
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.window import *

In [None]:
>>> employees = spark.read. \
...                     format('csv'). \
...                     option('sep', '\t'). \
...                     schema('''emp_id int,
...                             fname string,
...                             lname string,
...                             email string,
...                             phone string,
...                             hiredt string,
...                             job_id string,
...                             salary float,
...                             com_pct string,
...                             mgr_id string,
...                             dept_id string'''). \
...                     load('/user/monahadoop/hr_db/employees')

>>> employees.printSchema()

In [None]:
>>> spec = Window.partitionBy('dept_id')

In [None]:
>>> avgSalaryByDept = employees.\
...                     select('emp_id', 'fname', 'lname', 'salary', 'dept_id'). \
...                     withColumn('average_salary', avg('salary').over(spec))

In [None]:
>>> avgSalaryByDept.show()

In [None]:
>>> avgSalaryByDept = employees.\
...                     select('emp_id', 'fname', 'lname', 'salary', 'dept_id'). \
...                     withColumn('average_salary', round(avg('salary').over(spec), 2)). \
...                     where(col('average_salary') < employees.salary)

In [None]:
>>> avgSalaryByDept.show()

In [None]:
>>> avgSalaryByDept.show(50)

<p style="background :#d0d5db"><b>Solving using spark sql</b> </p>

In [None]:
>>> employees.createOrReplaceTempView('empvw')

In [None]:
>>> avgSalaryByDept_sql = spark.sql('select * from empvw')

In [None]:
>>> avgSalaryByDept_sql.show(5)

In [None]:
>>> avgSalaryByDept_sql = spark.sql('''select emp_id,
...                                             fname,
...                                             lname,
...                                             email,
...                                             salary,
...                                             dept_id,
...                                             round(avg(salary) over(partition by dept_id), 2) as average_salary
...                                     from empvw ''')

In [None]:
>>> avgSalaryByDept_sql.show(6)

In [None]:
>>> avgSalaryByDept_sql.where(col('salary') > col('average_salary')).show()

In [None]:
>>> avgSalaryByDept_sql.where(col('dept_id') == 30).show()

In [None]:
>>> avgSalaryByDept_sql.where((col('salary') > col('average_salary')) & (col('dept_id') == 30)).show()

<p style="background:#F1C40F">Get the average revenue for each day and the details of orders like order_id and their total revenue, that have more revenue than the average.</p>

In [None]:
>>> orders = spark.read. \
...             format('csv'). \
...             schema('''
...                     order_id int,
...                     order_date string,
...                     customer_id int,
...                     order_status string
...                     '''). \
...             load('/public/retail_db/orders')
>>> 
>>> orders.show(5)

In [None]:
>>> orderItems = spark.read. \
...                     format('csv'). \
...                     schema('''
...                             oi_item_id int,
...                             oi_order_id int,
...                             oi_product_id int,
...                             oi_qty int,
...                             oi_subtotal float,
...                             oi_product_price float
...                             '''). \
...                     load('/public/retail_db/order_items')
>>> orderItems.show(5)

In [None]:
>>> ordersJoin = orders.where('order_status in ("CLOSED", "COMPLETE")'). \
...                     join(orderItems, orders.order_id == orderItems.oi_order_id)
>>> ordersJoin.show()

In [None]:
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.window import *

In [None]:
>>> spec = Window.partitionBy(ordersJoin.order_date)

In [None]:
>>> dailyRevenue = ordersJoin. \
...                     select('order_id', 'order_date', 'oi_subtotal'). \
...                     withColumn('avg_revenue', round(avg('oi_subtotal').over(spec), 2)). \
...                     withColumn('total_revenue', round(sum('oi_subtotal').over(Window.partitionBy('order_id', 'order_date')), 2))

In [None]:
>>> dailyRevenue.show()

In [None]:
>>> dailyRevenue.where('total_revenue > avg_revenue').show()

In [None]:
>>> dailyRevenue.where('total_revenue > avg_revenue and order_id = 3390').show()

<p style="background:#F1C40F">Get the highest order revenue and the details of the orders that have more than 75% of the revenue .</p>

<p style="background:#AED6F1"><b>Data Frame Operations - Time Series Functions such as Lead, Lag etc</b>

<p style="background :#d0d5db"><b>Lead Windowing Function example : </b> <br>Getting Employee details like<br>
 - emp_id, department_id, salary, next_emp_id, next_emp_salary. (next_emp_id -> second_highest_grosser, next_emp_salary -> second highest salary)<br>
 - Difference between highest and second highest salaries.</p>

In [None]:
>>> employeesDF = spark.read. \
...                      format('csv'). \
...                      option('sep', '\t'). \
...                      schema('''emp_id int,
...                              fname string,
...                              lname string,
...                              email string,
...                              phone string,
...                              hiredt string,
...                              job_id string,
...                              salary float,
...                              commission_pct string,
...                              manager_id string,
...                              department_id int'''). \
...                      load('/user/monahadoop/hr_db/employees')  
>>> 
>>> employeesDF.show(5)

In [None]:
>>> from pyspark.sql.window import *

In [None]:
>>> spark.conf.set('spark.sql.shuffle.partitions', '2')

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary')

In [None]:
>>> employeesDF.show()

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary').sort('department_id')

In [None]:
>>> employeesDF.show()

In [None]:
>>> spec = Window.\
...             partitionBy('department_id'). \
...             orderBy(employeesDF.salary.desc())

In [None]:
>>> from pyspark.sql.functions import *

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                     withColumn('next_emp_id', lead('emp_id').over(spec)). \
...                     sort('department_id', employeesDF.salary.desc())
>>>employeesDF.show()

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                     withColumn('next_highest_grosser', lead('emp_id').over(spec)). \
...                     withColumn('next_highest_salary', lead('salary').over(spec)). \
...                     sort('department_id', employeesDF.salary.desc())

In [None]:
>>> employeesDF.show()

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                     withColumn('next_highest_grosser', lead('emp_id').over(spec)). \
...                     withColumn('next_highest_salary', lead('salary').over(spec)). \
...                     withColumn('salary_difference', employeesDF.salary - col('next_highest_salary')). \
...                     sort('department_id', employeesDF.salary.desc())

In [None]:
>>> employeesDF.show()

<p style="background :#d0d5db"><b>Using Lead Windowing Function : </b> To find next to next highest salary</p>

In [None]:
>>> employeesDF = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                      withColumn('next_to_next_highest_grosser', lead('emp_id', 2).over(spec)). \
...                      withColumn('next_to_next_highest_salary', lead('salary', 2).over(spec)). \
...                      sort('department_id', employeesDF.salary.desc())

In [None]:
>>> employeesDF.show()

<p style="background:#AED6F1"><b>Exercises</b></p>

<p style="background:#F1C40F">Get the salary difference between current and previous employee within each department.</p>

<p style="background :#d0d5db"><b>Using Lag Windowing Function : </b></p>

In [None]:
>>> employeesLag = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                      withColumn('prev_emp', lag('emp_id').over(spec)). \
...                      withColumn('prev_salary', lag('salary').over(spec)). \
...                      sort('department_id', employeesDF.salary.desc())

In [None]:
>>> employeesLag.show()

In [None]:
>>> employeesLag = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                      withColumn('prev_to_prev_emp', lag('emp_id', 2).over(spec)). \
...                      withColumn('prev_to_prev_salary', lag('salary', 2).over(spec)). \
...                      sort('department_id', employeesDF.salary.desc())

In [None]:
>>> employeesLag.show()

<p style="background :#d0d5db"><b>Using Last Windowing Function : </b></p>

To find the department-wise last salary (in this case least too need to check if last always means least or last in the order)->
* last doesn't run normally like lead, lag and first
* a rangeBetween clause needs to set in the Window specification, which takes pre-set values named unboundedPreceding and unboundedFollowing, then the last() works as desired, here in this case givng last salary of each department.

In [None]:
>>> employeesDF = spark.read. \
...                      format('csv'). \
...                      option('sep', '\t'). \
...                      schema('''emp_id int,
...                              fname string,
...                              lname string,
...                              email string,
...                              phone string,
...                              hiredt string,
...                              job_id string,
...                              salary float,
...                              commission_pct string,
...                              manager_id string,
...                              department_id int'''). \
...                      load('/user/monahadoop/hr_db/employees')  
>>> 
>>> employeesDF.show(5)

In [None]:
>>> spec = Window.partitionBy('department_id'). \
...             orderBy(employeesDF.salary.desc()). \
...             rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

In [None]:
>>> employeesLast = employeesDF.select('emp_id', 'department_id', 'salary'). \
...                             withColumn('last_salary', last(employeesDF.salary, False).over(spec)).\
...                             orderBy(employeesDF.department_id, employeesDF.salary.desc())

In [None]:
>>> employeesLast.show()

<p style="background:#F1C40F">Time Series - Revenue comparison between two windows.</p>

<p style="background:#FA8072;border-style:solid;"><b>TO BE DONE</b></p>

<p style="background:#AED6F1"><b>Data Frame Operations - Ranking Functions - rank, dense_rank, row_number etc</b>

In Ranking functions data is partitioned by a key (such as department id) and then sorted by some other key (such as salary).
* Ranking functions are - rank, dense_rank, row_number etc.
* In spark we need to create a WindowSpec object using partitionBy and orderBy for most of the ranking functions.
* Some use cases are -
    - Assign rank to employees based on salary within each department.
    - Assign ranks to products based on revenue each day or month.

In [None]:
>>> employees = spark.read. \
...                      format('csv'). \
...                      option('sep', '\t'). \
...                      schema('''emp_id int,
...                              fname string,
...                              lname string,
...                              email string,
...                              phone string,
...                              hiredt string,
...                              job_id string,
...                              salary float,
...                              commission_pct string,
...                              manager_id string,
...                              department_id int'''). \
...                      load('/user/monahadoop/hr_db/employees') 

>>> employees.show()

In [None]:
>>> employees = employees.select('emp_id', 'department_id', 'salary'). \
...                     orderBy('department_id', employees.salary.desc())

In [None]:
>>> employees.show()

In [None]:
>>> from pyspark.sql.window import *

In [None]:
>>> from pyspark.sql.functions import *

In [None]:
>>> spec = Window.partitionBy('department_id').\
...             orderBy(employees.salary.desc())

<p style="background :#d0d5db"><b>Using rank() Function : </b></p>

In [None]:
>>> employees. \
...     withColumn('emp_rank', rank().over(spec)). \
...     orderBy('department_id', employees.salary.desc()).\
...     show()

<p style="background :#d0d5db"><b>Using dense_rank() Function : </b></p>

In [None]:
>>> employees. \
...     withColumn('emp_rank', dense_rank().over(spec)). \
...     orderBy('department_id', employees.salary.desc()).\
...     show(200)

<p style="background :#d0d5db"><b>Using row_number() Function : </b></p>

In [None]:
>>> employees. \
...     withColumn('emp_rank', rank().over(spec)). \
...     withColumn('emp_denserank', dense_rank().over(spec)). \
...     withColumn('emp_num', row_number().over(spec)). \
...     orderBy('department_id', employees.salary.desc()).\
...     show(200)

<p style="background:#AED6F1"><b>Define Problem Statement - Get Top N Daily Products</b>

To develop a code, which will calculate the revenue for each product on a daily basis and display top N products for that day (by date).

**Assuming here, we need top 5 products by date.**

The datasets required for this problem are - 
* Orders Data File (orders.csv)
* Order Items Data File (orderItems.csv)
* Products Data File (products.csv)

<p style="background:#AED6F1"><b> Design - Get Top N Daily Products</p>

<p style="background:#FA8072;border-style:solid;"><b>This is to be corrected</b></p>

The design of the problem is as follows -

1. orders.csv read in orders data frame.
2. order_items.csv read in orderItems data frame.
3. products.csv read in products data frame.
4. Filter orders DF, orderItems DF and products DF to get desired columns.
5. Filtered orders DF and orderItems DF joined into ordersJoin DF, on order_id as key column.
6. Create window spec specifying partitionBy order_date and product_id and orderBy order_date and product_id 
7. In ordersJoin DF order_item_subtotal column aggregated using sum function over spec, creating revenue column and data ranked over spec.
8. ordersJoin DF and products DF joined on product_id as key column and a result data frame  created to show product_name and ranked revenue to display top 5 products by date.

<p style="background :#d0d5db"><b>Creating DF</b> </p>

<p style="background :#d0d5db"><b>Filtering DFs</b> </p>

<p style="background :#d0d5db"><b>Joining Order DFs</b> </p>

In [None]:
>>> dailyRevenue = orders.where('order_status in ("COMPLETE","CLOSED")'). \
...                     join(orderItems, orders.order_id == orderItems.oi_order_id).\
...                     groupBy('order_date','oi_product_id').\
...                     agg(round(sum('subtotal'), 2).alias('revenue')).\
...                     orderBy('order_date',orderItems.oi_product_id.desc())

In [None]:
dailyProdRevenue = spark.sql('''select order_date, oi_product_id, round(sum(oi_subtotal), 2) as revenue
            from ordersvw o, orderItemsvw oi
            where o.order_id = oi.oi_order_id
            and
            o.order_status in ("CLOSED", "COMPLETE")
            group by o.order_date, oi.oi_product_id 
            order by o.order_date, oi.oi_product_id desc''')


<p style="background :#d0d5db"><b>Creating Spec, Importing functions and Window objects</b> </p>

In [None]:
dailyProdRevenue = spark.sql('''select order_date, oi_product_id, sum(oi_subtotal)
            from ordersvw, orderItemsvw
            where order_id = oi_order_id
            and
            order_status in ("CLOSED", "COMPLETE")
            group by order_date, oi_product_id
            ''')

In [None]:
select order_date, order_item_product_id, sum(order_item_subtotal)
            from orders, order_items
            where order_id = order_item_order_id
            and
            order_status in ("CLOSED", "COMPLETE")
            group by order_date, order_item_product_id;

<p style="background :#d0d5db"><b>Aggregating and Ranking in DFs</b> </p>

<p style="background :#d0d5db"><b>Joining aggregated order and product DFs</b> </p>

<p style="background :#d0d5db"><b>END</b> </p>