In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [None]:
!hadoop fs -cat /public/trendytech/datasets/windowdata.csv

In [2]:
orders_df = spark.read \
.format("csv") \
.option('header','true') \
.option('inferSchema','true') \
.load("/public/trendytech/datasets/windowdata.csv")

In [3]:
orders_df.sort("country").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       346.1|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|      838.65|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     49|

In [4]:
orders_df.createOrReplaceTempView('data')

In [5]:
spark.sql("""
select country, sum(invoicevalue) as total_invoice from data group by country order by total_invoice
""").show()

+---------------+------------------+
|        country|     total_invoice|
+---------------+------------------+
|         Israel|           -227.44|
|        Bahrain|            205.74|
|         Poland|            248.16|
|        Austria|            257.04|
|Channel Islands|            363.53|
|        Iceland|            711.79|
|          Italy|             794.5|
|        Finland|             892.8|
|      Australia|            1005.1|
|        Denmark|            1281.5|
|    Switzerland|           1304.92|
|         Cyprus|           1590.82|
|      Lithuania|           1661.06|
|        Belgium|1809.9099999999999|
|          Spain|           1843.73|
|       Portugal|           2380.12|
|         Sweden|            2646.3|
|         Norway|           3787.12|
|          Japan|           7705.07|
|    Netherlands|           8784.48|
+---------------+------------------+
only showing top 20 rows



### Make window based on these 3 parameters:
#### 1. Partition Based on country
#### 2. Sort based on Week Number
#### 3. Window Size

In [7]:
orders_df.sort("country").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       346.1|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|      838.65|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     49|

In [8]:
from pyspark.sql import *

In [9]:
mywindw = Window.partitionBy("country") \
.orderBy("weeknum") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [10]:
result = orders_df.withColumn("running_total", sum("invoicevalue").over(mywindw))

In [11]:
result.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|     running_total|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|     3309.75|           3309.75|
|Germany|     49|         12|         1852|     4521.39|           7831.14|
|Germany|     50|         15|         1973|     5065.79|          12896.93|
|Germany|     51|          5|         1103|     1665.91|          14562.84|
| France|     48|          4|         1299|     2808.16|           2808.16|
| France|     49|          9|         2303|     4527.01|           7335.17|
| France|     50|          6|          529|      537.32|           7872.49|
| France|     51|          5|          847|     1702.87|           9575.36|
|Belgium|     48|          1|          528|       346.1|             346.1|
|Belgium|   

## More Windowing functions
#### 1) Rank : Skipping Rank when score repeats
#### 2) DenseRank : Don't Skip Rank even if score repeats
#### 3) RowNumber : Similar to  Sr No, ranks arent repeated and there's random selection to it.
#### 4) Lead 
#### 5) Lag

In [15]:
orders_df = spark.read \
.format("csv") \
.option('header','true') \
.option('inferSchema','true') \
.load("/public/trendytech/datasets/windowdatamodified.csv")

In [16]:
new_window = Window.partitionBy("country") \
.orderBy(desc("invoicevalue"))

## Rank

In [17]:
rank_df = orders_df.withColumn("Rank", rank().over(new_window))

In [18]:
rank_df.show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|Rank|
+-------+-------+-----------+-------------+------------+----+
| Sweden|     50|          3|         3714|      2646.3|   1|
|Germany|     49|         12|         1852|      1800.0|   1|
|Germany|     50|         15|         1973|      1800.0|   1|
|Germany|     48|         11|         1795|      1600.0|   3|
|Germany|     51|          5|         1103|      1600.0|   3|
| France|     50|          6|          529|      537.32|   1|
| France|     51|          5|          847|       500.0|   2|
| France|     49|          9|         2303|       500.0|   2|
| France|     48|          4|         1299|       500.0|   2|
|Belgium|     48|          1|          528|       800.0|   1|
|Belgium|     51|          2|          942|       800.0|   1|
|Belgium|     50|          2|          285|      625.16|   3|
|Finland|     50|          1|         1254|       892.8|   1|
|  India

## Dense Rank

In [21]:
dense_df = orders_df.withColumn("dense_rank", dense_rank().over(new_window))

In [22]:
dense_df.show()

+-------+-------+-----------+-------------+------------+----------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|dense_rank|
+-------+-------+-----------+-------------+------------+----------+
| Sweden|     50|          3|         3714|      2646.3|         1|
|Germany|     49|         12|         1852|      1800.0|         1|
|Germany|     50|         15|         1973|      1800.0|         1|
|Germany|     48|         11|         1795|      1600.0|         2|
|Germany|     51|          5|         1103|      1600.0|         2|
| France|     50|          6|          529|      537.32|         1|
| France|     51|          5|          847|       500.0|         2|
| France|     49|          9|         2303|       500.0|         2|
| France|     48|          4|         1299|       500.0|         2|
|Belgium|     48|          1|          528|       800.0|         1|
|Belgium|     51|          2|          942|       800.0|         1|
|Belgium|     50|          2|          285|     

## Row Number

In [24]:
row_numdf = orders_df.withColumn("Row", row_number().over(new_window))

In [25]:
row_numdf.show()

+-------+-------+-----------+-------------+------------+---+
|country|weeknum|numinvoices|totalquantity|invoicevalue|Row|
+-------+-------+-----------+-------------+------------+---+
| Sweden|     50|          3|         3714|      2646.3|  1|
|Germany|     49|         12|         1852|      1800.0|  1|
|Germany|     50|         15|         1973|      1800.0|  2|
|Germany|     48|         11|         1795|      1600.0|  3|
|Germany|     51|          5|         1103|      1600.0|  4|
| France|     50|          6|          529|      537.32|  1|
| France|     51|          5|          847|       500.0|  2|
| France|     49|          9|         2303|       500.0|  3|
| France|     48|          4|         1299|       500.0|  4|
|Belgium|     48|          1|          528|       800.0|  1|
|Belgium|     51|          2|          942|       800.0|  2|
|Belgium|     50|          2|          285|      625.16|  3|
|Finland|     50|          1|         1254|       892.8|  1|
|  India|     49|       

# LEAD AND LAG Functions

In [26]:
newwindow = Window.partitionBy("country") \
.orderBy("weeknum")

In [27]:
orders_df.orderBy("country","weeknum").show(50)

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     48|          1|          107|      358.25|
|      Australia|     49|          1|          214|       258.9|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       800.0|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|       800.0|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     48|

In [28]:
lag_df = orders_df.withColumn("previous_week",lag("invoicevalue").over(newwindow))

In [29]:
lag_df.show()

+-------+-------+-----------+-------------+------------+-------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|
+-------+-------+-----------+-------------+------------+-------------+
| Sweden|     50|          3|         3714|      2646.3|         null|
|Germany|     48|         11|         1795|      1600.0|         null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|
| France|     48|          4|         1299|       500.0|         null|
| France|     49|          9|         2303|       500.0|        500.0|
| France|     50|          6|          529|      537.32|        500.0|
| France|     51|          5|          847|       500.0|       537.32|
|Belgium|     48|          1|          528|       800.0|         null|
|Belgium|     50|          2|          285|      625.16|        800.0|
|Belgi

In [30]:
invoice_diff = lag_df.withColumn("invoice_diff", expr("invoicevalue - previous_week"))

In [31]:
invoice_diff.show()

+-------+-------+-----------+-------------+------------+-------------+-------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|       invoice_diff|
+-------+-------+-----------+-------------+------------+-------------+-------------------+
| Sweden|     50|          3|         3714|      2646.3|         null|               null|
|Germany|     48|         11|         1795|      1600.0|         null|               null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|              200.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|                0.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|             -200.0|
| France|     48|          4|         1299|       500.0|         null|               null|
| France|     49|          9|         2303|       500.0|        500.0|                0.0|
| France|     50|          6|          529|      537.32|        500.0|  37.32000000000005|

## To find total sales across all the weeks

In [32]:
newwin = Window.partitionBy("country")

In [33]:
result_df = orders_df.withColumn("total_invoice_value", sum("invoicevalue").over(newwin))

In [34]:
result_df.show()

+-------+-------+-----------+-------------+------------+-------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|total_invoice_value|
+-------+-------+-----------+-------------+------------+-------------------+
| Sweden|     50|          3|         3714|      2646.3|             2646.3|
|Germany|     48|         11|         1795|      1600.0|             6800.0|
|Germany|     49|         12|         1852|      1800.0|             6800.0|
|Germany|     50|         15|         1973|      1800.0|             6800.0|
|Germany|     51|          5|         1103|      1600.0|             6800.0|
| France|     51|          5|          847|       500.0| 2037.3200000000002|
| France|     49|          9|         2303|       500.0| 2037.3200000000002|
| France|     48|          4|         1299|       500.0| 2037.3200000000002|
| France|     50|          6|          529|      537.32| 2037.3200000000002|
|Belgium|     48|          1|          528|       800.0|            2225.16|