# Data Analysis of a Retail Store using Apache Spark (Scala)

These tables come from the Cloudera installation and describe a fictitious retail store: customers, departments, categories, products, orders and order_items. This notebook works through Spark SQL and Spark DataFrame API transformations and actions, ranging from simple to fairly complex.
 

`@author: Amarnath Mukherjee | @email: amarnath.muk@gmail.com | VISA, Inc.`  
`@author: Anindya Saha | @email: mail.anindya@gmail.com | NetApp, Inc.`

## 1. Understanding the Data Set:
![](assets/cloudera-retail-db.png)

Picture Source: https://www.cloudera.com/developers/get-started-with-hadoop-tutorial/exercise-1.html

```
mysql> describe customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+

mysql> describe departments;
+-----------------+-------------+------+-----+---------+----------------+
| Field           | Type        | Null | Key | Default | Extra          |
+-----------------+-------------+------+-----+---------+----------------+
| department_id   | int(11)     | NO   | PRI | NULL    | auto_increment |
| department_name | varchar(45) | NO   |     | NULL    |                |
+-----------------+-------------+------+-----+---------+----------------+

mysql> describe categories;
+------------------------+-------------+------+-----+---------+----------------+
| Field                  | Type        | Null | Key | Default | Extra          |
+------------------------+-------------+------+-----+---------+----------------+
| category_id            | int(11)     | NO   | PRI | NULL    | auto_increment |
| category_department_id | int(11)     | NO   |     | NULL    |                |
| category_name          | varchar(45) | NO   |     | NULL    |                |
+------------------------+-------------+------+-----+---------+----------------+

mysql> describe products;
+---------------------+--------------+------+-----+---------+----------------+
| Field               | Type         | Null | Key | Default | Extra          |
+---------------------+--------------+------+-----+---------+----------------+
| product_id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| product_category_id | int(11)      | NO   |     | NULL    |                |
| product_name        | varchar(45)  | NO   |     | NULL    |                |
| product_description | varchar(255) | NO   |     | NULL    |                |
| product_price       | float        | NO   |     | NULL    |                |
| product_image       | varchar(255) | NO   |     | NULL    |                |
+---------------------+--------------+------+-----+---------+----------------+

mysql> describe orders;
+-------------------+-------------+------+-----+---------+----------------+
| Field             | Type        | Null | Key | Default | Extra          |
+-------------------+-------------+------+-----+---------+----------------+
| order_id          | int(11)     | NO   | PRI | NULL    | auto_increment |
| order_date        | datetime    | NO   |     | NULL    |                |
| order_customer_id | int(11)     | NO   |     | NULL    |                |
| order_status      | varchar(45) | NO   |     | NULL    |                |
+-------------------+-------------+------+-----+---------+----------------+

mysql> describe order_items;
+--------------------------+------------+------+-----+---------+----------------+
| Field                    | Type       | Null | Key | Default | Extra          |
+--------------------------+------------+------+-----+---------+----------------+
| order_item_id            | int(11)    | NO   | PRI | NULL    | auto_increment |
| order_item_order_id      | int(11)    | NO   |     | NULL    |                |
| order_item_product_id    | int(11)    | NO   |     | NULL    |                |
| order_item_quantity      | tinyint(4) | NO   |     | NULL    |                |
| order_item_subtotal      | float      | NO   |     | NULL    |                |
| order_item_product_price | float      | NO   |     | NULL    |                |
+--------------------------+------------+------+-----+---------+----------------+
```

## 2. Creating the Spark Session:

In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

In [2]:
val spark = (SparkSession
         .builder
         .master("local[*]")
         .appName("retail_database_analysis_scala")
         .getOrCreate())

spark = org.apache.spark.sql.SparkSession@58d3061d


In [3]:
val sc = spark.sparkContext
val sqlContext = spark.sqlContext

sc = org.apache.spark.SparkContext@107efb5b
sqlContext = org.apache.spark.sql.SQLContext@52174c1b



In [4]:
import sqlContext.implicits._

## 3. Load the Data From Files Into DataFrames:

In [5]:
val CUSTOMERS_DATA   = "data/customers.csv"
val DEPARTMENTS_DATA = "data/departments.csv"
val CATEGORIES_DATA  = "data/categories.csv"
val PRODUCTS_DATA    = "data/products.csv"
val ORDERS_DATA      = "data/orders.csv"
val ORDER_ITEMS_DATA = "data/order_items.csv"

CUSTOMERS_DATA = data/customers.csv
DEPARTMENTS_DATA = data/departments.csv
CATEGORIES_DATA = data/categories.csv
PRODUCTS_DATA = data/products.csv
ORDERS_DATA = data/orders.csv
ORDER_ITEMS_DATA = data/order_items.csv



In [6]:
val customers_schema = StructType(Array(
    StructField("customer_id", IntegerType, true),
    StructField("customer_fname", StringType, true),
    StructField("customer_lname", StringType, true),
    StructField("customer_email", StringType, true),
    StructField("customer_password", StringType, true),
    StructField("customer_street", StringType, true),
    StructField("customer_city", StringType, true),
    StructField("customer_state", StringType, true),
    StructField("customer_zipcode", StringType, true)))

customers_schema = StructType(StructField(customer_id,IntegerType,true), StructField(customer_fname,StringType,true), StructField(customer_lname,StringType,true), StructField(customer_email,StringType,true), StructField(customer_password,StringType,true), StructField(customer_street,StringType,true), StructField(customer_city,StringType,true), StructField(customer_state,StringType,true), StructField(customer_zipcode,StringType,true))



In [7]:
val departments_schema = StructType(Array(
    StructField("department_id", IntegerType, true),
    StructField("department_name", StringType, true)))

departments_schema = StructType(StructField(department_id,IntegerType,true), StructField(department_name,StringType,true))



In [8]:
val categories_schema = StructType(Array(
    StructField("category_id", IntegerType, true),
    StructField("category_department_id", IntegerType, true),
    StructField("category_name", StringType, true)))

categories_schema = StructType(StructField(category_id,IntegerType,true), StructField(category_department_id,IntegerType,true), StructField(category_name,StringType,true))



In [9]:
val products_schema = StructType(Array(
    StructField("product_id", IntegerType, true),
    StructField("product_category_id", IntegerType, true),
    StructField("product_name", StringType, true),
    StructField("product_description", StringType, true),
    StructField("product_price", FloatType, true),
    StructField("product_image", StringType, true)))

products_schema = StructType(StructField(product_id,IntegerType,true), StructField(product_category_id,IntegerType,true), StructField(product_name,StringType,true), StructField(product_description,StringType,true), StructField(product_price,FloatType,true), StructField(product_image,StringType,true))



In [10]:
val orders_schema = StructType(Array(
    StructField("order_id", IntegerType, true),
    StructField("order_date", StringType, true),
    StructField("order_customer_id", IntegerType, true),
    StructField("order_status", StringType, true)))

orders_schema = StructType(StructField(order_id,IntegerType,true), StructField(order_date,StringType,true), StructField(order_customer_id,IntegerType,true), StructField(order_status,StringType,true))



In [11]:
val order_items_schema = StructType(Array(
    StructField("order_item_id", IntegerType, true),
    StructField("order_item_order_id", IntegerType, true),
    StructField("order_item_product_id", IntegerType, true),
    StructField("order_item_quantity", IntegerType, true),
    StructField("order_item_subtotal", FloatType, true),
    StructField("order_item_product_price", FloatType, true)))

order_items_schema = StructType(StructField(order_item_id,IntegerType,true), StructField(order_item_order_id,IntegerType,true), StructField(order_item_product_id,IntegerType,true), StructField(order_item_quantity,IntegerType,true), StructField(order_item_subtotal,FloatType,true), StructField(order_item_product_price,FloatType,true))



In [12]:
val customers_df   = spark.read.format("csv").schema(customers_schema).load(CUSTOMERS_DATA).cache()
val departments_df = spark.read.format("csv").schema(departments_schema).load(DEPARTMENTS_DATA).cache()
val categories_df  = spark.read.format("csv").schema(categories_schema).load(CATEGORIES_DATA).cache()
val products_df    = spark.read.format("csv").schema(products_schema).load(PRODUCTS_DATA).cache()
val orders_df      = spark.read.format("csv").schema(orders_schema).load(ORDERS_DATA).cache()
val order_items_df = spark.read.format("csv").schema(order_items_schema).load(ORDER_ITEMS_DATA).cache()

customers_df = [customer_id: int, customer_fname: string ... 7 more fields]
departments_df = [department_id: int, department_name: string]
categories_df = [category_id: int, category_department_id: int ... 1 more field]
products_df = [product_id: int, product_category_id: int ... 4 more fields]
orders_df = [order_id: int, order_date: string ... 2 more fields]
order_items_df = [order_item_id: int, order_item_order_id: int ... 4 more fields]



### 3.1. Register all the DataFrames as Temporary Views:

In [13]:
customers_df.createOrReplaceTempView("customers")

In [14]:
customers_df.select(col("customer_id"), $"customer_fname", $"customer_city", col("customer_state")).show(5)

+-----------+--------------+-------------+--------------+
|customer_id|customer_fname|customer_city|customer_state|
+-----------+--------------+-------------+--------------+
|          1|       Richard|  Brownsville|            TX|
|          2|          Mary|    Littleton|            CO|
|          3|           Ann|       Caguas|            PR|
|          4|          Mary|   San Marcos|            CA|
|          5|        Robert|       Caguas|            PR|
+-----------+--------------+-------------+--------------+
only showing top 5 rows



In [15]:
departments_df.createOrReplaceTempView("departments")

In [16]:
departments_df.show(5)

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
|            5|           Golf|
|            6|       Outdoors|
+-------------+---------------+
only showing top 5 rows



In [17]:
categories_df.createOrReplaceTempView("categories")

In [18]:
categories_df.show(5)

+-----------+----------------------+-------------------+
|category_id|category_department_id|      category_name|
+-----------+----------------------+-------------------+
|          1|                     2|           Football|
|          2|                     2|             Soccer|
|          3|                     2|Baseball & Softball|
|          4|                     2|         Basketball|
|          5|                     2|           Lacrosse|
+-----------+----------------------+-------------------+
only showing top 5 rows



In [19]:
products_df.createOrReplaceTempView("products")

In [20]:
products_df.show(5)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|               null|        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|               null|       129.99|http://images.acm...|
|         3|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         4|                  2|Under Armour Men'...|               null|        89.99|http://images.acm...|
|         5|                  2|Riddell Youth Rev...|               null|       199.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 5 rows

In [21]:
orders_df.createOrReplaceTempView("orders")

In [22]:
orders_df.show(5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [23]:
order_items_df.createOrReplaceTempView("order_items")

In [24]:
order_items_df.show(5)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
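+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
only showing top 5 rows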

## 4. Data Analysis:

### 4.1 Get How many Orders were placed:

**SQL:**

In [25]:
spark.sql("select count(1) from orders").show()

+--------+
|count(1)|
+--------+
|   68883|
+--------+



**DF API:**

In [26]:
orders_df.count()

68883

### 4.2 Get Average Revenue Per Order:

**SQL:**

In [27]:
spark.sql(
        """SELECT sum(oi.order_item_subtotal) / count(distinct oi.order_item_order_id) as avg_rev_per_order
          |FROM orders o JOIN order_items oi 
          |    ON o.order_id = oi.order_item_order_id
        """.stripMargin).show()

+-----------------+
|avg_rev_per_order|
+-----------------+
|597.6322996016944|
+-----------------+



**DF API:**

In [28]:
(orders_df.join(order_items_df,orders_df("order_id") === order_items_df("order_item_order_id"))
    .select("order_item_subtotal","order_item_order_id")
    .agg((sum(col("order_item_subtotal"))/countDistinct(col("order_item_order_id"))).alias("avg_rev_per_order"))
    .show())

+-----------------+
|avg_rev_per_order|
+-----------------+
|597.6322996016944|
+-----------------+
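
The metric above divides total item revenue by the number of distinct orders, not by the number of order-item rows. As a minimal plain-Scala sketch of the same arithmetic, using a few hypothetical `(order_item_order_id, order_item_subtotal)` pairs instead of the real data (the object and function names are illustrative only, not part of the notebook):

```scala
object AvgRevenuePerOrder {
  // Hypothetical (order_item_order_id, order_item_subtotal) rows; order 2 has two items.
  val items: Seq[(Int, Double)] = Seq((1, 300.0), (2, 200.0), (2, 250.0), (4, 150.0))

  // Mirrors sum(order_item_subtotal) / count(distinct order_item_order_id).
  def avgRevPerOrder(rows: Seq[(Int, Double)]): Double =
    rows.map(_._2).sum / rows.map(_._1).distinct.size

  // Dividing by the row count instead gives the average per line item, a different metric.
  def avgRevPerItem(rows: Seq[(Int, Double)]): Double =
    rows.map(_._2).sum / rows.size
}
```

With these rows the total of 900.0 is spread over 3 orders but 4 item rows, so the two functions return 300.0 and 225.0 respectively.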



### 4.3 Get Average Revenue Per Day:

**SQL:**

In [29]:
spark.sql(
        """SELECT o.order_date, sum(oi.order_item_subtotal) / count(distinct oi.order_item_order_id) as avg_rev_per_day
          |FROM orders o JOIN order_items oi 
          |    ON o.order_id = oi.order_item_order_id
          |GROUP BY o.order_date 
          |ORDER BY o.order_date
        """.stripMargin).show(false)

+---------------------+-----------------+
|order_date           |avg_rev_per_day  |
+---------------------+-----------------+
|2013-07-25 00:00:00.0|587.5330286848134|
|2013-07-26 00:00:00.0|585.9234878147109|
|2013-07-27 00:00:00.0|577.5676682063512|
|2013-07-28 00:00:00.0|551.4119109020958|
|2013-07-29 00:00:00.0|635.5883909684641|
|2013-07-30 00:00:00.0|564.5363838698838|
|2013-07-31 00:00:00.0|630.9955146643533|
|2013-08-01 00:00:00.0|608.4982189502356|
|2013-08-02 00:00:00.0|587.8871075517388|
|2013-08-03 00:00:00.0|599.1628419048382|
|2013-08-04 00:00:00.0|594.3201416863335|
|2013-08-05 00:00:00.0|592.8305590897799|
|2013-08-06 00:00:00.0|579.68106844792  |
|2013-08-07 00:00:00.0|583.906170096101 |
|2013-08-08 00:00:00.0|588.4743191939134|
|2013-08-09 00:00:00.0|629.4593056380147|
|2013-08-10 00:00:00.0|586.3113241756664|
|2013-08-11 00:00:00.0|551.5472206441007|
|2013-08-12 00:00:00.0|612.4790563343757|
|2013-08-13 00:00:00.0|604.1594044945457|
+---------------------+-----------------+
only showing top 20 rows

**DF API:**

In [30]:
orders_df.join(order_items_df,orders_df("order_id") === order_items_df("order_item_order_id"))

[order_id: int, order_date: string ... 8 more fields]

In [31]:
val avg_rev_per_day = orders_df.join(order_items_df,orders_df("order_id") === order_items_df("order_item_order_id"))
     .select("order_date","order_item_subtotal","order_item_order_id")
     .groupBy("order_date")
     .agg((sum(col("order_item_subtotal"))/countDistinct(col("order_item_order_id"))).alias("avg_rev_per_day"))
     .orderBy("order_date").cache()

avg_rev_per_day = [order_date: string, avg_rev_per_day: double]



In [32]:
avg_rev_per_day.show(false)

+---------------------+-----------------+
|order_date           |avg_rev_per_day  |
+---------------------+-----------------+
|2013-07-25 00:00:00.0|587.5330286848134|
|2013-07-26 00:00:00.0|585.9234878147109|
|2013-07-27 00:00:00.0|577.5676682063512|
|2013-07-28 00:00:00.0|551.4119109020958|
|2013-07-29 00:00:00.0|635.5883909684641|
|2013-07-30 00:00:00.0|564.5363838698838|
|2013-07-31 00:00:00.0|630.9955146643533|
|2013-08-01 00:00:00.0|608.4982189502356|
|2013-08-02 00:00:00.0|587.8871075517388|
|2013-08-03 00:00:00.0|599.1628419048382|
|2013-08-04 00:00:00.0|594.3201416863335|
|2013-08-05 00:00:00.0|592.8305590897799|
|2013-08-06 00:00:00.0|579.68106844792  |
|2013-08-07 00:00:00.0|583.906170096101 |
|2013-08-08 00:00:00.0|588.4743191939134|
|2013-08-09 00:00:00.0|629.4593056380147|
|2013-08-10 00:00:00.0|586.3113241756664|
|2013-08-11 00:00:00.0|551.5472206441007|
|2013-08-12 00:00:00.0|612.4790563343757|
|2013-08-13 00:00:00.0|604.1594044945457|
+---------------------+-----------------+
only showing top 20 rows

### 4.3.1 Get Average Revenue Per Month:

**DF API:**

In [33]:
val avg_rev_per_month = avg_rev_per_day
                        .select(month(col("order_date")).alias("month"),col("avg_rev_per_day"))
                        .groupBy("month")
                        .agg(avg(col("avg_rev_per_day")).alias("avg_rev_per_month"))
                        .orderBy("month").cache()                    

avg_rev_per_month = [month: int, avg_rev_per_month: double]



In [34]:
avg_rev_per_month.show(false)

+-----+-----------------+
|month|avg_rev_per_month|
+-----+-----------------+
|1    |595.4252200140596|
|2    |594.3819554505748|
|3    |601.5593062028504|
|4    |594.360451299625 |
|5    |606.5245105647007|
|6    |611.6376611446879|
|7    |593.4468831474544|
|8    |597.588355427047 |
|9    |604.5177239484814|
|10   |590.8111000351574|
|11   |597.1851199455583|
|12   |596.4810251733772|
+-----+-----------------+
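
Two properties of the cell above are worth keeping in mind. It averages the per-day values, so every day counts equally regardless of how many orders it had, and it groups by `month(order_date)` alone, so July 2013 and July 2014 fall into the same row. The sketch below, in plain Scala with hypothetical numbers (none of these names come from the notebook), shows how the average of daily averages can differ from a pooled average:

```scala
object MeanOfDailyMeans {
  // Hypothetical days: (total revenue, number of distinct orders).
  val days: Seq[(Double, Int)] = Seq((900.0, 9), (300.0, 1))

  // What the cell above does within each month: a plain average of the per-day averages.
  def meanOfDailyMeans(ds: Seq[(Double, Int)]): Double =
    ds.map { case (revenue, orders) => revenue / orders }.sum / ds.size

  // Alternative: pool all revenue and all orders first, so busier days weigh more.
  def pooledMean(ds: Seq[(Double, Int)]): Double =
    ds.map(_._1).sum / ds.map(_._2).sum
}
```

Here the daily averages are 100.0 and 300.0, giving a mean of 200.0, while the pooled figure is 1200.0 / 10 = 120.0. Which one is appropriate depends on the question being asked.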



### 4.3.2 Get Total Revenue Per Month Per Year:

**SQL:**

In [35]:
spark.sql(
        """SELECT year(o.order_date) as order_year, month(o.order_date) as order_month, sum(oi.order_item_subtotal) tot_revenue 
          |FROM orders o JOIN order_items oi 
          |    ON o.order_id = oi.order_item_order_id
          |GROUP BY order_year, order_month 
          |ORDER BY order_year, order_month
        """.stripMargin).show(false)

+----------+-----------+------------------+
|order_year|order_month|tot_revenue       |
+----------+-----------+------------------+
|2013      |7          |764782.2047252655 |
|2013      |8          |2828658.754573822 |
|2013      |9          |2934527.3265972137|
|2013      |10         |2624600.6605644226|
|2013      |11         |3168656.0921707153|
|2013      |12         |2932964.327445984 |
|2014      |1          |2924447.0670757294|
|2014      |2          |2778663.7149181366|
|2014      |3          |2862492.265932083 |
|2014      |4          |2807789.8547916412|
|2014      |5          |2753078.2738227844|
|2014      |6          |2703463.491306305 |
|2014      |7          |2238496.5645008087|
+----------+-----------+------------------+



**DF API:**

In [36]:
val tot_rev_per_month_per_year = orders_df.join(order_items_df,orders_df("order_id") === order_items_df("order_item_order_id"))
                                .select(year(col("order_date")).alias("order_year"),
                                        month(col("order_date")).alias("order_month"),
                                        col("order_item_subtotal"))
                                .groupBy("order_year", "order_month")
                                .agg(sum(col("order_item_subtotal")).alias("tot_revenue"))
                                .orderBy("order_year", "order_month")
                                .cache()

tot_rev_per_month_per_year = [order_year: int, order_month: int ... 1 more field]



In [37]:
tot_rev_per_month_per_year.show()

+----------+-----------+------------------+
|order_year|order_month|       tot_revenue|
+----------+-----------+------------------+
|      2013|          7| 764782.2047252655|
|      2013|          8| 2828658.754573822|
|      2013|          9|2934527.3265972137|
|      2013|         10|2624600.6605644226|
|      2013|         11|3168656.0921707153|
|      2013|         12| 2932964.327445984|
|      2014|          1|2924447.0670757294|
|      2014|          2|2778663.7149181366|
|      2014|          3| 2862492.265932083|
|      2014|          4|2807789.8547916412|
|      2014|          5|2753078.2738227844|
|      2014|          6| 2703463.491306305|
|      2014|          7|2238496.5645008087|
+----------+-----------+------------------+
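
Grouping by both year and month keeps the same calendar month from different years in separate rows. As a rough illustration of that grouping outside Spark, the sketch below parses date strings in the format shown in the `orders` output and sums hypothetical subtotals per `(year, month)` key. The object name, the sample rows, and the use of `java.time` are assumptions made for this example; the notebook itself relies on Spark's `year` and `month` functions.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object RevenueByYearMonth {
  // Matches the order_date strings shown earlier, e.g. "2013-07-25 00:00:00.0".
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")

  // Hypothetical (order_date, order_item_subtotal) rows.
  val rows: Seq[(String, Double)] = Seq(
    ("2013-07-25 00:00:00.0", 100.0),
    ("2013-07-26 00:00:00.0", 50.0),
    ("2014-07-01 00:00:00.0", 25.0))

  // Key each row by (year, month), then sum the subtotals within each key.
  def totals(rs: Seq[(String, Double)]): Map[(Int, Int), Double] =
    rs.map { case (date, amount) =>
        val t = LocalDateTime.parse(date, fmt)
        ((t.getYear, t.getMonthValue), amount)
      }
      .groupBy(_._1)
      .map { case (key, grouped) => key -> grouped.map(_._2).sum }
}
```

The two July 2013 rows collapse into one entry and July 2014 stays separate, which is the behaviour the `GROUP BY order_year, order_month` clause provides.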



### 4.4 Top Performing Departments:

**SQL:**

In [38]:
spark.sql(
        """SELECT d.department_name, year(o.order_date) as order_year, sum(oi.order_item_subtotal) as tot_revenue
          |FROM orders o 
          |    INNER JOIN order_items oi 
          |        ON o.order_id = oi.order_item_order_id
          |    INNER JOIN products p
          |        ON oi.order_item_product_id = p.product_id
          |    INNER JOIN categories c
          |        ON c.category_id = p.product_category_id
          |    INNER JOIN departments d
          |        ON c.category_department_id = d.department_id
          |WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'
          |GROUP BY d.department_name, order_year
          |ORDER BY d.department_name, order_year
        """.stripMargin).show(false)

+---------------+----------+------------------+
|department_name|order_year|tot_revenue       |
+---------------+----------+------------------+
|Apparel        |2013      |3090985.6535224915|
|Apparel        |2014      |3917585.841217041 |
|Fan Shop       |2013      |7290831.879999161 |
|Fan Shop       |2014      |9095735.77280426  |
|Fitness        |2013      |119526.58082199097|
|Fitness        |2014      |150509.1409931183 |
|Footwear       |2013      |1711492.5186824799|
|Footwear       |2014      |2122339.649032593 |
|Golf           |2013      |1967396.959728241 |
|Golf           |2014      |2440585.2815055847|
|Outdoors       |2013      |420317.9507675171 |
|Outdoors       |2014      |532437.6709976196 |
+---------------+----------+------------------+



**DF API:**

In [39]:
val df = orders_df
      .filter((col("order_status") =!= "CANCELED") && (col("order_status") =!= "SUSPECTED_FRAUD"))
      .join(order_items_df,orders_df("order_id") === order_items_df("order_item_order_id"))
      .join(products_df,order_items_df("order_item_product_id") === products_df("product_id"))
      .join(categories_df,products_df("product_category_id") === categories_df("category_id"))
      .join(departments_df,categories_df("category_department_id") === departments_df("department_id"))
      .select(col("department_name"),year(col("order_date")).alias("order_year"),col("order_item_subtotal"))
      .groupBy("department_name","order_year")
      .agg(sum("order_item_subtotal").alias("tot_revenue"))
      .orderBy("department_name","order_year").cache()

df = [department_name: string, order_year: int ... 1 more field]



In [40]:
df.show()

+---------------+----------+------------------+
|department_name|order_year|       tot_revenue|
+---------------+----------+------------------+
|        Apparel|      2013|3090985.6535224915|
|        Apparel|      2014| 3917585.841217041|
|       Fan Shop|      2013| 7290831.879999161|
|       Fan Shop|      2014|  9095735.77280426|
|        Fitness|      2013|119526.58082199097|
|        Fitness|      2014| 150509.1409931183|
|       Footwear|      2013|1711492.5186824799|
|       Footwear|      2014| 2122339.649032593|
|           Golf|      2013| 1967396.959728241|
|           Golf|      2014|2440585.2815055847|
|       Outdoors|      2013| 420317.9507675171|
|       Outdoors|      2014| 532437.6709976196|
+---------------+----------+------------------+



### 4.5 Get Highest Priced Product:

**SQL:**

In [41]:
spark.sql(
        """SELECT p.* 
          |FROM products p
          |WHERE p.product_price = (SELECT max(q.product_price) FROM products q)
        """.stripMargin).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+



**SQL Using rank() Window Function:**

In [42]:
spark.sql(
        """SELECT * 
          |FROM ( 
          |    SELECT *, 
          |         rank() OVER (ORDER BY product_price DESC) as rank
          |    FROM products) tmp
          |WHERE rank <= 1
        """.stripMargin).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|rank|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|   1|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+



**DF API:**

In [43]:
products_df.filter(col("product_price") === products_df.agg(max("product_price")).head().getFloat(0)).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+



**DF API Using Window Function:**

In [44]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.orderBy($"product_price".desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@45002821



In [45]:
products_df.withColumn("rank", row_number().over(windowSpec))
.filter(col("rank") <= 1).show()

+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|product_id|product_category_id|       product_name|product_description|product_price|       product_image|rank|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
|       208|                 10|SOLE E35 Elliptical|               null|      1999.99|http://images.acm...|   1|
+----------+-------------------+-------------------+-------------------+-------------+--------------------+----+
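
The SQL version uses `rank()` while the DataFrame version uses `row_number()`. Both return the same single row here because only one product has the highest price, but they differ when prices tie: `rank()` gives tied rows the same rank, whereas `row_number()` numbers them consecutively. The plain-Scala sketch below imitates the two numbering schemes on hypothetical prices (stored as integer cents; all names are illustrative and not part of the Spark API):

```scala
object RankVsRowNumber {
  // Hypothetical prices in cents; two products tie for the highest price.
  val priceCents: Seq[Int] = Seq(199999, 99999, 199999)

  // Like row_number(): position in the descending ordering, starting at 1.
  def rowNumbers(xs: Seq[Int]): Seq[(Int, Int)] =
    xs.sortBy(c => -c).zipWithIndex.map { case (price, i) => (price, i + 1) }

  // Like rank(): 1 + number of rows with a strictly higher price, so ties share a rank.
  def ranks(xs: Seq[Int]): Seq[(Int, Int)] =
    xs.sortBy(c => -c).map(price => (price, 1 + xs.count(_ > price)))
}
```

Filtering on `<= 1` keeps two rows under the `rank`-style numbering and exactly one under the `row_number`-style numbering, so the choice matters whenever all top-priced products should be returned.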



### 4.6 Get Highest Revenue Earning Products:

**SQL:**

In [46]:
spark.sql(
        """SELECT p.*, r.product_revenue
          |FROM products p, (SELECT oi.order_item_product_id, sum(CAST(oi.order_item_subtotal as float)) as product_revenue
          |                    FROM order_items oi 
          |                    GROUP BY order_item_product_id 
          |                    ORDER BY product_revenue DESC 
          |                    LIMIT 1) r
          |WHERE product_id = r.order_item_product_id
        """).show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+-----------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|  product_revenue|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+-----------------+
|      1004|                 45|Field & Stream Sp...|               null|       399.98|http://images.acm...|6929653.690338135|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+-----------------+
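
The long fractional tail in `product_revenue` (`6929653.690338135`) is what we would expect when single-precision values are widened to double precision and then summed. The sketch below reproduces the effect with plain Scala and no Spark; the object name, helper names and the `399.98` amount are assumptions chosen for illustration.

```scala
object FloatSumDemo {
  // Parse as single precision, then widen to double precision.
  def widened(amount: String): Double = amount.toFloat.toDouble

  // Exact decimal sum of the same amount repeated n times.
  def exactSum(amount: String, n: Int): BigDecimal =
    Seq.fill(n)(BigDecimal(amount)).sum

  def main(args: Array[String]): Unit = {
    val subtotal = "399.98" // hypothetical order_item_subtotal
    println(widened(subtotal))                     // close to, but not exactly, 399.98
    println(Seq.fill(1000)(widened(subtotal)).sum) // the error accumulates in the sum
    println(exactSum(subtotal, 1000))              // exact decimal arithmetic
  }
}
```

In Spark SQL, one common way to avoid this noise is to round the aggregate with `ROUND(..., 2)` or to cast the column to a decimal type such as `DECIMAL(10,2)` instead of `float`.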



**SQL:**

In [47]:
// Top 10 revenue-generating products. Unlike the query above, this one excludes CANCELED and SUSPECTED_FRAUD orders.
spark.sql(
        """SELECT p.product_id, p.product_category_id, p.product_name, r.product_revenue
          |FROM products p INNER JOIN
          |                    (SELECT oi.order_item_product_id, round(sum(CAST(oi.order_item_subtotal as float)), 2) as product_revenue
          |                     FROM order_items oi INNER JOIN orders o 
          |                         ON oi.order_item_order_id = o.order_id
          |                     WHERE o.order_status <> 'CANCELED'
          |                     AND o.order_status <> 'SUSPECTED_FRAUD'
          |                     GROUP BY oi.order_item_product_id) r
          |ON p.product_id = r.order_item_product_id
          |ORDER BY r.product_revenue DESC
          |LIMIT 10
        """).show(truncate=false)

+----------+-------------------+---------------------------------------------+---------------+
|product_id|product_category_id|product_name                                 |product_revenue|
+----------+-------------------+---------------------------------------------+---------------+
|1004      |45                 |Field & Stream Sportsman 16 Gun Fire Safe    |6637668.28     |
|365       |17                 |Perfect Fitness Perfect Rip Deck             |4233794.37     |
|957       |43                 |Diamondback Women's Serene Classic Comfort Bi|3946837.0      |
|191       |9                  |Nike Men's Free 5.0+ Running Shoe            |3507549.21     |
|502       |24                 |Nike Men's Dri-FIT Victory Golf Polo         |3011600.0      |
|1073      |48                 |Pelican Sunstream 100 Kayak                  |2967851.68     |
|1014      |46                 |O'Brien Men's Neoprene Life Vest             |2765543.31     |
|403       |18                 |Nike Men's CJ Elit

**DF API:**

In [48]:
/*
 1. Sum the revenue of each product by grouping the order_items table on order_item_product_id
 2. Sort the result in descending order of revenue
 3. Keep only the first row of the sorted result using the limit() function
 4. Join with the products table to get the product details
*/
order_items_df.select("order_item_product_id","order_item_subtotal")
     .groupBy("order_item_product_id")
     .agg(sum("order_item_subtotal").alias("product_revenue"))
     .orderBy(desc("product_revenue"))
     .limit(1)
     .join(products_df, order_items_df("order_item_product_id") === products_df("product_id"))
     .select("product_id", "product_category_id", "product_name", "product_revenue")
     .show()

+----------+-------------------+--------------------+-----------------+
|product_id|product_category_id|        product_name|  product_revenue|
+----------+-------------------+--------------------+-----------------+
|      1004|                 45|Field & Stream Sp...|6929653.690338135|
+----------+-------------------+--------------------+-----------------+



### 4.7 Top 5 Highest Revenue Earning Products Per Month Per Year:

**SQL:**

In [49]:
// To use a UDF in SQL, it must first be registered with sqlContext

sqlContext.udf.register("udfmonTomonth", ((mon: Int) => { 
    val monthmap = Map(1 -> "Jan",2 -> "Feb", 3 -> "Mar",4 -> "Apr",5 -> "May",6 -> "Jun", 7 -> "Jul",8 -> "Aug",9 -> "Sep",10 -> "Oct",11 -> "Nov",12 -> "Dec")
        monthmap.get(mon)
    }))

UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))
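
Note that `monthmap.get(mon)` returns an `Option[String]`, not a `String`. The registration output above reports `StringType`, so Spark evidently unwraps the option; to our understanding a `None` then surfaces as a SQL `NULL`. The plain-Scala sketch below (no Spark; the object and method names are hypothetical) shows what the lookup itself returns.

```scala
object MonthNameDemo {
  private val monthNames: Map[Int, String] = Map(
    1 -> "Jan", 2 -> "Feb", 3 -> "Mar", 4 -> "Apr", 5 -> "May", 6 -> "Jun",
    7 -> "Jul", 8 -> "Aug", 9 -> "Sep", 10 -> "Oct", 11 -> "Nov", 12 -> "Dec")

  // Same lookup as the UDF body: Some(name) for 1 to 12, None otherwise.
  def monthName(mon: Int): Option[String] = monthNames.get(mon)

  def main(args: Array[String]): Unit = {
    println(monthName(8))  // Some(Aug)
    println(monthName(13)) // None
  }
}
```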

In [50]:
val df = spark.sql(
        """SELECT q.* 
          |FROM (
          |     SELECT r.*, DENSE_RANK() OVER (PARTITION by order_year, order_month ORDER BY product_revenue DESC) as dense_rank
          |     FROM (
          |          SELECT YEAR(o.order_date) as order_year, udfmonTomonth(MONTH(o.order_date)) as order_month, p.product_name, ROUND(SUM(CAST(oi.order_item_subtotal as float)), 2) as product_revenue
          |          FROM order_items oi 
          |              INNER JOIN orders o 
          |                  ON oi.order_item_order_id = o.order_id
          |              INNER JOIN products p
          |                  ON oi.order_item_product_id = p.product_id
          |              WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'
          |              GROUP BY order_year, order_month, p.product_name ) r ) q
          |WHERE q.dense_rank <= 5
          |ORDER BY q.order_year, q.order_month, q.dense_rank
        """).show()

+----------+-----------+--------------------+---------------+----------+
|order_year|order_month|        product_name|product_revenue|dense_rank|
+----------+-----------+--------------------+---------------+----------+
|      2013|        Aug|Field & Stream Sp...|      540772.97|         1|
|      2013|        Aug|Perfect Fitness P...|      349861.69|         2|
|      2013|        Aug|Diamondback Women...|      319778.69|         3|
|      2013|        Aug|Nike Men's Free 5...|      279172.08|         4|
|      2013|        Aug|Nike Men's Dri-FI...|       247700.0|         5|
|      2013|        Dec|Field & Stream Sp...|      595570.24|         1|
|      2013|        Dec|Perfect Fitness P...|      342842.86|         2|
|      2013|        Dec|Diamondback Women...|      336277.59|         3|
|      2013|        Dec|Nike Men's Free 5...|      298370.16|         4|
|      2013|        Dec|Pelican Sunstream...|      249987.51|         5|
|      2013|        Jul|Field & Stream Sp...|      

df: Unit = ()
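
Because `order_month` holds the month abbreviation, `ORDER BY q.order_month` sorts alphabetically, which is why `Aug` and `Dec` come before `Jul` in the output above. If chronological order is wanted, one option is to sort on the numeric month and keep the name for display only. The sketch below illustrates the difference with plain Scala collections; the object and helper names are assumptions, not part of the notebook.

```scala
object MonthOrderDemo {
  private val calendar: Seq[String] =
    Seq("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

  // Alphabetical order: what sorting on the abbreviation gives.
  def alphabetical(months: Seq[String]): Seq[String] = months.sorted

  // Chronological order: sort on the position of the month in the calendar.
  def chronological(months: Seq[String]): Seq[String] =
    months.sortBy(m => calendar.indexOf(m))

  def main(args: Array[String]): Unit = {
    val months2013 = Seq("Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    println(alphabetical(months2013))  // List(Aug, Dec, Jul, Nov, Oct, Sep)
    println(chronological(months2013)) // List(Jul, Aug, Sep, Oct, Nov, Dec)
  }
}
```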


**DF API:**

In [51]:
def udfmonTomonth = udf((mon: Int) => { 
    val monthmap = Map(1 -> "Jan",2 -> "Feb", 3 -> "Mar",4 -> "Apr",5 -> "May",6 -> "Jun", 7 -> "Jul",8 -> "Aug",9 -> "Sep",10 -> "Oct",11 -> "Nov",12 -> "Dec")
    monthmap.get(mon)
})

udfmonTomonth: org.apache.spark.sql.expressions.UserDefinedFunction


In [52]:
val rev_per_month_per_year_per_product = orders_df
                    .select(year($"order_date").alias("order_year"), udfmonTomonth(month($"order_date")).alias("order_month"), $"order_id", $"order_status")
                    .filter((col("order_status") =!= "CANCELED") && (col("order_status") =!= "SUSPECTED_FRAUD"))
                    .join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id"))
                    .join(products_df, order_items_df("order_item_product_id") === products_df("product_id"))
                    .select("order_year","order_month","product_name", "order_item_subtotal")
                    .groupBy("order_year","order_month","product_name")
                    .agg(round(sum("order_item_subtotal"),2).alias("product_revenue"))
                    .cache()

rev_per_month_per_year_per_product = [order_year: int, order_month: string ... 2 more fields]


[order_year: int, order_month: string ... 2 more fields]

In [53]:
rev_per_month_per_year_per_product.show(5,false)

+----------+-----------+---------------------------------------------+---------------+
|order_year|order_month|product_name                                 |product_revenue|
+----------+-----------+---------------------------------------------+---------------+
|2013      |Nov        |Under Armour Women's Micro G Skulpt Running S|3792.93        |
|2013      |Oct        |Polar Loop Activity Tracker                  |329.85         |
|2014      |Jan        |Bushnell Pro X7 Jolt Slope Rangefinder       |599.99         |
|2013      |Aug        |Hirzl Men's Hybrid Golf Glove                |1064.29        |
|2014      |Feb        |Titleist Small Wheeled Travel Cover          |249.99         |
+----------+-----------+---------------------------------------------+---------------+
only showing top 5 rows



In [54]:
val windowSpec = Window.partitionBy(rev_per_month_per_year_per_product("order_year"),rev_per_month_per_year_per_product("order_month"))
.orderBy(rev_per_month_per_year_per_product("product_revenue").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@22cdda9


org.apache.spark.sql.expressions.WindowSpec@22cdda9

In [55]:
val top_prod_per_month_per_year_by_rev = rev_per_month_per_year_per_product
                           .withColumn("dense_rank", dense_rank().over(windowSpec))
                           .filter(col("dense_rank") <= 5)
                           .orderBy("order_year", "order_month", "dense_rank")
                           .cache()

top_prod_per_month_per_year_by_rev = [order_year: int, order_month: string ... 3 more fields]


[order_year: int, order_month: string ... 3 more fields]

In [56]:
top_prod_per_month_per_year_by_rev.show(false)

+----------+-----------+---------------------------------------------+---------------+----------+
|order_year|order_month|product_name                                 |product_revenue|dense_rank|
+----------+-----------+---------------------------------------------+---------------+----------+
|2013      |Aug        |Field & Stream Sportsman 16 Gun Fire Safe    |540772.97      |1         |
|2013      |Aug        |Perfect Fitness Perfect Rip Deck             |349861.69      |2         |
|2013      |Aug        |Diamondback Women's Serene Classic Comfort Bi|319778.69      |3         |
|2013      |Aug        |Nike Men's Free 5.0+ Running Shoe            |279172.08      |4         |
|2013      |Aug        |Nike Men's Dri-FIT Victory Golf Polo         |247700.0       |5         |
|2013      |Dec        |Field & Stream Sportsman 16 Gun Fire Safe    |595570.24      |1         |
|2013      |Dec        |Perfect Fitness Perfect Rip Deck             |342842.86      |2         |
|2013      |Dec     

### 4.8 Get the most popular Categories:

**SQL:**

In [57]:
spark.sql(
        """SELECT c.category_name, count(order_item_quantity) as order_count 
          |FROM order_items oi 
          |INNER JOIN products p on oi.order_item_product_id = p.product_id 
          |INNER JOIN categories c on c.category_id = p.product_category_id 
          |GROUP BY c.category_name 
          |ORDER BY order_count DESC 
          |LIMIT 10 
        """).show()

+--------------------+-----------+
|       category_name|order_count|
+--------------------+-----------+
|              Cleats|      24551|
|      Men's Footwear|      22246|
|     Women's Apparel|      21035|
|Indoor/Outdoor Games|      19298|
|             Fishing|      17325|
|        Water Sports|      15540|
|    Camping & Hiking|      13729|
|    Cardio Equipment|      12487|
|       Shop By Sport|      10984|
|         Electronics|       3156|
+--------------------+-----------+



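**DF API:**

Note that this version aggregates with `sum("order_item_quantity")` (units sold), whereas the SQL query above uses `count(order_item_quantity)` (number of order-item rows), so the two results differ for categories where items are ordered in quantities greater than 1.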

In [58]:
val pop_cat = order_items_df
 .join(products_df, order_items_df("order_item_product_id") === products_df("product_id"))
 .join(categories_df, categories_df("category_id") === products_df("product_category_id"))
 .groupBy("category_name")
 .agg(sum("order_item_quantity").alias("order_count"))
 .orderBy(desc("order_count"))
 .limit(10).cache()

pop_cat = [category_name: string, order_count: bigint]


[category_name: string, order_count: bigint]

In [59]:
pop_cat.show()

+--------------------+-----------+
|       category_name|order_count|
+--------------------+-----------+
|              Cleats|      73734|
|     Women's Apparel|      62956|
|Indoor/Outdoor Games|      57803|
|    Cardio Equipment|      37587|
|       Shop By Sport|      32726|
|      Men's Footwear|      22246|
|             Fishing|      17325|
|        Water Sports|      15540|
|    Camping & Hiking|      13729|
|         Electronics|       9436|
+--------------------+-----------+
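
The gap between the two outputs (for example `Cleats`: 24551 versus 73734) comes down to counting rows versus adding up quantities. A minimal plain-Scala sketch of the two aggregations, using made-up `(category, quantity)` pairs and hypothetical helper names:

```scala
object CountVsSumDemo {
  // Hypothetical order items: (category_name, order_item_quantity)
  val items: Seq[(String, Int)] = Seq(("Cleats", 3), ("Cleats", 2), ("Fishing", 1))

  // count(order_item_quantity): number of order-item rows per category.
  def rowsPerCategory(xs: Seq[(String, Int)]): Map[String, Int] =
    xs.groupBy(_._1).map { case (category, rows) => category -> rows.size }

  // sum(order_item_quantity): number of units per category.
  def unitsPerCategory(xs: Seq[(String, Int)]): Map[String, Int] =
    xs.groupBy(_._1).map { case (category, rows) => category -> rows.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(rowsPerCategory(items))  // Cleats -> 2, Fishing -> 1
    println(unitsPerCategory(items)) // Cleats -> 5, Fishing -> 1
  }
}
```

Categories whose items always have quantity 1 (such as `Fishing` in the outputs above) get the same number either way.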



### 4.9 Get the revenue for each Category Per Year Per Quarter:

This use case is inspired by https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

**Reshaping Data with Pivot in Apache Spark:**

`pivot` is applied after a `groupBy` operation. It pivots on a `pivotColumn`, i.e. it adds one new column per distinct value in `pivotColumn`, and the aggregated values from the `groupBy` operation become the values in the corresponding new columns.
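
To make the reshaping concrete, here is a small model of group-then-pivot written with plain Scala collections. It is only a sketch of the semantics, not how Spark implements `pivot`; the rows, the object name and the `pivot` helper are hypothetical.

```scala
object PivotDemo {
  // Hypothetical input rows: (category, quarter, amount)
  val rows: Seq[(String, String, Double)] = Seq(
    ("Fishing", "Q1", 100.0),
    ("Fishing", "Q1", 50.0),
    ("Fishing", "Q2", 70.0),
    ("Cleats", "Q1", 30.0)
  )

  // Models groupBy(category).pivot(quarter, quarters).sum(amount):
  // one output row per category, one column per quarter, None where no data exists.
  def pivot(data: Seq[(String, String, Double)],
            quarters: Seq[String]): Map[String, Seq[Option[Double]]] =
    data.groupBy(_._1).map { case (category, rs) =>
      val sums = rs.groupBy(_._2).map { case (q, qs) => q -> qs.map(_._3).sum }
      category -> quarters.map(sums.get)
    }

  def main(args: Array[String]): Unit =
    pivot(rows, Seq("Q1", "Q2")).foreach(println)
}
```

The `None` entries play the role of the `null` cells that appear in a pivoted DataFrame when a group has no rows for a given pivot value.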

**SQL + DF API**  
Spark SQL versions before 2.4 have no `PIVOT` clause, so here the join is written in SQL and the pivot step is done through the DF API.

In [60]:
val rev_cat_df = spark.sql(
            """SELECT c.category_name, year(o.order_date) as order_year, concat('Q', quarter(o.order_date)) as order_quarter, order_item_subtotal
              |FROM orders o 
              |INNER JOIN order_items oi on order_item_order_id = o.order_id
              |INNER JOIN products p on oi.order_item_product_id = p.product_id 
              |INNER JOIN categories c on p.product_category_id = c.category_id
              |WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'""")
            .groupBy("category_name","order_year")
            .pivot("order_quarter",Seq("Q1", "Q2", "Q3", "Q4"))  
// Listing the pivot values explicitly (when they are known) makes execution faster; .pivot("order_quarter") alone would also work.
            .agg(round(sum("order_item_subtotal"), 2))
            .withColumn("total_sales", round(coalesce(col("Q1"), lit(0)) + coalesce(col("Q2"), lit(0)) + coalesce(col("Q3"), lit(0)) + coalesce(col("Q4"), lit(0)), 2))
            .orderBy(desc("total_sales")) 
            .cache()

rev_cat_df.show()

+--------------------+----------+----------+----------+----------+----------+-----------+
|       category_name|order_year|        Q1|        Q2|        Q3|        Q4|total_sales|
+--------------------+----------+----------+----------+----------+----------+-----------+
|             Fishing|      2014|1673916.35| 1594720.3| 415179.25|      null|  3683815.9|
|             Fishing|      2013|      null|      null|1248337.61|1705514.77| 2953852.38|
|              Cleats|      2014|1080480.24|1027808.94| 270734.96|      null| 2379024.14|
|    Camping & Hiking|      2014| 981834.58| 935637.65| 273281.79|      null| 2190754.02|
|    Cardio Equipment|      2014| 855664.98| 873333.24| 223277.91|      null| 1952276.13|
|              Cleats|      2013|      null|      null| 806026.14|1059543.73| 1865569.87|
|    Camping & Hiking|      2013|      null|      null| 745750.31|1010332.68| 1756082.99|
|     Women's Apparel|      2014|  756700.0|  728450.0|  196000.0|      null|  1681150.0|
|        W

rev_cat_df = [category_name: string, order_year: int ... 5 more fields]


[category_name: string, order_year: int ... 5 more fields]

**Validate the above calculation for a specific category:**

In [61]:
spark.sql(
        """SELECT c.category_name, YEAR(o.order_date) as order_year, QUARTER(o.order_date) as order_quarter, ROUND(SUM(oi.order_item_subtotal), 2) as order_total 
          |FROM orders o 
          |INNER JOIN order_items oi on order_item_order_id = o.order_id
          |INNER JOIN products p on oi.order_item_product_id = p.product_id 
          |INNER JOIN categories c on p.product_category_id = c.category_id
          |WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'
          |AND c.category_name = 'Fishing'
          |GROUP BY c.category_name, order_year, order_quarter
          |ORDER BY order_total DESC
        """).show()

+-------------+----------+-------------+-----------+
|category_name|order_year|order_quarter|order_total|
+-------------+----------+-------------+-----------+
|      Fishing|      2013|            4| 1705514.77|
|      Fishing|      2014|            1| 1673916.35|
|      Fishing|      2014|            2|  1594720.3|
|      Fishing|      2013|            3| 1248337.61|
|      Fishing|      2014|            3|  415179.25|
+-------------+----------+-------------+-----------+



### 4.10 Get Number of Orders By Status:

**SQL:**

In [62]:
spark.sql(
        """SELECT order_status, count(1) as total
          |FROM orders o
          |GROUP BY o.order_status
        """).show()

+---------------+-----+
|   order_status|total|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



**DF API:**

In [63]:
val df = orders_df.groupBy("order_status").count().withColumnRenamed("count", "total").cache()

df = [order_status: string, total: bigint]


[order_status: string, total: bigint]

In [64]:
df.show()

+---------------+-----+
|   order_status|total|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



### 4.11 Get Number of Orders By Order Date and Order Status:

**SQL:**

In [65]:
spark.sql(
        """SELECT order_date, order_status, count(1) as total
          |FROM orders o
          |GROUP BY order_date, o.order_status
        """).show(false)

+---------------------+---------------+-----+
|order_date           |order_status   |total|
+---------------------+---------------+-----+
|2013-08-16 00:00:00.0|COMPLETE       |43   |
|2013-08-30 00:00:00.0|CLOSED         |17   |
|2013-09-10 00:00:00.0|COMPLETE       |80   |
|2013-10-05 00:00:00.0|SUSPECTED_FRAUD|4    |
|2013-12-02 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-09 00:00:00.0|ON_HOLD        |9    |
|2013-12-20 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-23 00:00:00.0|PAYMENT_REVIEW |2    |
|2014-01-02 00:00:00.0|CLOSED         |15   |
|2014-02-11 00:00:00.0|CANCELED       |3    |
|2014-02-14 00:00:00.0|ON_HOLD        |11   |
|2014-02-21 00:00:00.0|PROCESSING     |25   |
|2014-05-13 00:00:00.0|SUSPECTED_FRAUD|3    |
|2014-06-27 00:00:00.0|PENDING        |26   |
|2014-07-16 00:00:00.0|ON_HOLD        |3    |
|2013-08-16 00:00:00.0|PENDING_PAYMENT|30   |
|2013-08-29 00:00:00.0|PROCESSING     |31   |
|2013-09-10 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-09-25 00:00:00.0|CLOSED     

**DF API:**

In [66]:
orders_df
 .groupBy("order_date", "order_status")
 .count().withColumnRenamed("count", "total")
 .show(false)

+---------------------+---------------+-----+
|order_date           |order_status   |total|
+---------------------+---------------+-----+
|2013-08-16 00:00:00.0|COMPLETE       |43   |
|2013-08-30 00:00:00.0|CLOSED         |17   |
|2013-09-10 00:00:00.0|COMPLETE       |80   |
|2013-10-05 00:00:00.0|SUSPECTED_FRAUD|4    |
|2013-12-02 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-09 00:00:00.0|ON_HOLD        |9    |
|2013-12-20 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-12-23 00:00:00.0|PAYMENT_REVIEW |2    |
|2014-01-02 00:00:00.0|CLOSED         |15   |
|2014-02-11 00:00:00.0|CANCELED       |3    |
|2014-02-14 00:00:00.0|ON_HOLD        |11   |
|2014-02-21 00:00:00.0|PROCESSING     |25   |
|2014-05-13 00:00:00.0|SUSPECTED_FRAUD|3    |
|2014-06-27 00:00:00.0|PENDING        |26   |
|2014-07-16 00:00:00.0|ON_HOLD        |3    |
|2013-08-16 00:00:00.0|PENDING_PAYMENT|30   |
|2013-08-29 00:00:00.0|PROCESSING     |31   |
|2013-09-10 00:00:00.0|SUSPECTED_FRAUD|3    |
|2013-09-25 00:00:00.0|CLOSED     

### 4.12 Get all CANCELED orders with amount of at least \$1000:

**SQL:**

In [67]:
spark.sql(
    """SELECT q.* 
      |FROM (SELECT o.order_id, o.order_date, o.order_customer_id, o.order_status, sum(oi.order_item_subtotal) as order_total 
      |      FROM orders o INNER JOIN order_items oi 
      |          ON o.order_id = oi.order_item_order_id 
      |      WHERE o.order_status = 'CANCELED' 
      |      GROUP BY o.order_id, o.order_date, o.order_customer_id, o.order_status) q 
      |WHERE q.order_total >= 1000 
      |ORDER BY q.order_id
    """).show(false)

+--------+---------------------+-----------------+------------+------------------+
|order_id|order_date           |order_customer_id|order_status|order_total       |
+--------+---------------------+-----------------+------------+------------------+
|753     |2013-07-29 00:00:00.0|5094             |CANCELED    |1129.75           |
|2012    |2013-08-04 00:00:00.0|5165             |CANCELED    |1499.8600311279297|
|2144    |2013-08-05 00:00:00.0|7932             |CANCELED    |1099.900032043457 |
|2189    |2013-08-06 00:00:00.0|6829             |CANCELED    |1029.9400253295898|
|2271    |2013-08-06 00:00:00.0|7603             |CANCELED    |1229.9300231933594|
|2754    |2013-08-09 00:00:00.0|8946             |CANCELED    |1109.9500274658203|
|3551    |2013-08-14 00:00:00.0|5363             |CANCELED    |1299.8700408935547|
|4354    |2013-08-20 00:00:00.0|7268             |CANCELED    |1047.9000244140625|
|4801    |2013-08-23 00:00:00.0|11630            |CANCELED    |1016.9500217437744|
|533

**DF API:**

In [68]:
val cancelled_orders = orders_df
                 .filter(col("order_status") === "CANCELED")
                 .join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id"))
                 .groupBy("order_id", "order_date", "order_customer_id", "order_status")
                 .agg(sum("order_item_subtotal").alias("order_total"))
                 .filter(col("order_total") >= 1000)
                 .orderBy("order_id").cache()

cancelled_orders = [order_id: int, order_date: string ... 3 more fields]


[order_id: int, order_date: string ... 3 more fields]

In [69]:
cancelled_orders.show(false)

+--------+---------------------+-----------------+------------+------------------+
|order_id|order_date           |order_customer_id|order_status|order_total       |
+--------+---------------------+-----------------+------------+------------------+
|753     |2013-07-29 00:00:00.0|5094             |CANCELED    |1129.75           |
|2012    |2013-08-04 00:00:00.0|5165             |CANCELED    |1499.8600311279297|
|2144    |2013-08-05 00:00:00.0|7932             |CANCELED    |1099.900032043457 |
|2189    |2013-08-06 00:00:00.0|6829             |CANCELED    |1029.9400253295898|
|2271    |2013-08-06 00:00:00.0|7603             |CANCELED    |1229.9300231933594|
|2754    |2013-08-09 00:00:00.0|8946             |CANCELED    |1109.9500274658203|
|3551    |2013-08-14 00:00:00.0|5363             |CANCELED    |1299.8700408935547|
|4354    |2013-08-20 00:00:00.0|7268             |CANCELED    |1047.9000244140625|
|4801    |2013-08-23 00:00:00.0|11630            |CANCELED    |1016.9500217437744|
|533

### 4.13 Sort Products by Category and Price:

**SQL:**

In [70]:
spark.sql(
        """SELECT p.product_id, p.product_category_id, p.product_name, p.product_price
          |FROM products p
          |ORDER BY p.product_category_id ASC, p.product_price DESC
        """).show(false)

+----------+-------------------+---------------------------------------------+-------------+
|product_id|product_category_id|product_name                                 |product_price|
+----------+-------------------+---------------------------------------------+-------------+
|16        |2                  |Riddell Youth 360 Custom Football Helmet     |299.99       |
|11        |2                  |Fitness Gear 300 lb Olympic Weight Set       |209.99       |
|5         |2                  |Riddell Youth Revolution Speed Custom Footbal|199.99       |
|14        |2                  |Quik Shade Summit SX170 10 FT. x 10 FT. Canop|199.99       |
|12        |2                  |Under Armour Men's Highlight MC Alter Ego Fla|139.99       |
|23        |2                  |Under Armour Men's Highlight MC Alter Ego Hul|139.99       |
|6         |2                  |Jordan Men's VI Retro TD Football Cleat      |134.99       |
|20        |2                  |Under Armour Men's Highlight MC Footba

**DF API:**

In [71]:
products_df
 .select("product_id", "product_category_id", "product_name", "product_price")
 .orderBy(asc("product_category_id"), desc("product_price"))
 .show(false)

+----------+-------------------+---------------------------------------------+-------------+
|product_id|product_category_id|product_name                                 |product_price|
+----------+-------------------+---------------------------------------------+-------------+
|16        |2                  |Riddell Youth 360 Custom Football Helmet     |299.99       |
|11        |2                  |Fitness Gear 300 lb Olympic Weight Set       |209.99       |
|5         |2                  |Riddell Youth Revolution Speed Custom Footbal|199.99       |
|14        |2                  |Quik Shade Summit SX170 10 FT. x 10 FT. Canop|199.99       |
|12        |2                  |Under Armour Men's Highlight MC Alter Ego Fla|139.99       |
|23        |2                  |Under Armour Men's Highlight MC Alter Ego Hul|139.99       |
|6         |2                  |Jordan Men's VI Retro TD Football Cleat      |134.99       |
|20        |2                  |Under Armour Men's Highlight MC Footba

### 4.14 Sort Products by Price within Each Category:

**SQL:**

In [72]:
spark.sql(
        """SELECT p.product_id, p.product_category_id, p.product_name, p.product_price
          |FROM products p 
          |DISTRIBUTE BY p.product_category_id 
          |SORT BY p.product_price DESC
        """).sample(false, fraction=0.1, seed=23).show(false)

+----------+-------------------+----------------------------------------------+-------------+
|product_id|product_category_id|product_name                                  |product_price|
+----------+-------------------+----------------------------------------------+-------------+
|681       |31                 |Boccieri Golf EL C2-M Counterbalance Putter   |119.99       |
|1192      |53                 |Nike Men's Kobe IX Elite Low Basketball Shoe  |199.99       |
|747       |34                 |Ogio City Spiked Golf Shoes                   |149.99       |
|758       |34                 |TRUE linkswear Vegas Golf Shoes               |99.99        |
|595       |27                 |TYR Girls' Phoenix Maxfit Back Swimsuit       |75.99        |
|549       |26                 |Lotto Men's Zhero Gravity V 700 TF Soccer Cle |59.99        |
|551       |26                 |Lotto Men's Zhero Gravity V 700 TF Soccer Cle |59.99        |
|572       |26                 |TYR Boys' Team Digi Jammer  

**DF API:**

In [73]:
products_df
 .repartition($"product_category_id")
 .sortWithinPartitions(desc("product_price"))
 .sample(false, fraction=0.1, seed=23)
 .select("product_id", "product_category_id", "product_name", "product_price")
 .show(false)

+----------+-------------------+----------------------------------------------+-------------+
|product_id|product_category_id|product_name                                  |product_price|
+----------+-------------------+----------------------------------------------+-------------+
|681       |31                 |Boccieri Golf EL C2-M Counterbalance Putter   |119.99       |
|1192      |53                 |Nike Men's Kobe IX Elite Low Basketball Shoe  |199.99       |
|747       |34                 |Ogio City Spiked Golf Shoes                   |149.99       |
|758       |34                 |TRUE linkswear Vegas Golf Shoes               |99.99        |
|595       |27                 |TYR Girls' Phoenix Maxfit Back Swimsuit       |75.99        |
|549       |26                 |Lotto Men's Zhero Gravity V 700 TF Soccer Cle |59.99        |
|551       |26                 |Lotto Men's Zhero Gravity V 700 TF Soccer Cle |59.99        |
|572       |26                 |TYR Boys' Team Digi Jammer  
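
One caveat about the two cells above: `DISTRIBUTE BY` / `repartition` only guarantees that all rows of a category land in the same partition, and `SORT BY` / `sortWithinPartitions` orders the rows inside each partition. If a partition happens to receive more than one category, sorting on price alone interleaves those categories. The plain-Scala model below illustrates this; the modulo partitioner, the `Product` class and the sample rows are assumptions for illustration and do not reflect Spark's actual hash partitioning.

```scala
object SortWithinPartitionsDemo {
  final case class Product(categoryId: Int, price: Double)

  // Stand-in for DISTRIBUTE BY: all rows of a category share a partition.
  // The modulo is a hypothetical partitioner used only for illustration.
  def distribute(rows: Seq[Product], numPartitions: Int): Map[Int, Seq[Product]] =
    rows.groupBy(_.categoryId % numPartitions)

  // SORT BY price DESC: orders rows inside one partition and ignores the category.
  def sortByPrice(partition: Seq[Product]): Seq[Product] =
    partition.sortWith(_.price > _.price)

  // SORT BY category, price DESC: keeps each category contiguous inside the partition.
  def sortByCategoryThenPrice(partition: Seq[Product]): Seq[Product] =
    partition.sortWith { (a, b) =>
      a.categoryId < b.categoryId || (a.categoryId == b.categoryId && a.price > b.price)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Product(2, 100.0), Product(4, 300.0), Product(2, 200.0),
                   Product(4, 50.0), Product(3, 75.0))
    val partitions = distribute(rows, numPartitions = 2)
    println(sortByPrice(partitions(0)))             // categories 2 and 4 interleaved
    println(sortByCategoryThenPrice(partitions(0))) // category 2 first, then category 4
  }
}
```

In Spark terms, the second variant would correspond to `SORT BY p.product_category_id, p.product_price DESC`, or to `.sortWithinPartitions(col("product_category_id"), desc("product_price"))` in the DF API.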

### 4.15 Get the topmost 5 products overall sorted by Price Highest to Lowest:
**sortN: get top 5 products by price overall; globalSorting**

**SQL:**

In [74]:
spark.sql(
        """SELECT product_id, product_category_id, product_name, product_price
          |FROM products
          |ORDER BY product_price DESC
          |LIMIT 5
        """).show(false)

+----------+-------------------+------------------------------------------------+-------------+
|product_id|product_category_id|product_name                                    |product_price|
+----------+-------------------+------------------------------------------------+-------------+
|208       |10                 |SOLE E35 Elliptical                             |1999.99      |
|66        |4                  |SOLE F85 Treadmill                              |1799.99      |
|199       |10                 |SOLE F85 Treadmill                              |1799.99      |
|496       |22                 |SOLE F85 Treadmill                              |1799.99      |
|1048      |47                 |"Spalding Beast 60"" Glass Portable Basketball "|1099.99      |
+----------+-------------------+------------------------------------------------+-------------+



**DF API:**

In [75]:
products_df
    .select("product_id", "product_category_id", "product_name", "product_price")
     .orderBy(desc("product_price"))
     .limit(5)
     .show(false)

+----------+-------------------+------------------------------------------------+-------------+
|product_id|product_category_id|product_name                                    |product_price|
+----------+-------------------+------------------------------------------------+-------------+
|208       |10                 |SOLE E35 Elliptical                             |1999.99      |
|66        |4                  |SOLE F85 Treadmill                              |1799.99      |
|199       |10                 |SOLE F85 Treadmill                              |1799.99      |
|496       |22                 |SOLE F85 Treadmill                              |1799.99      |
|1048      |47                 |"Spalding Beast 60"" Glass Portable Basketball "|1099.99      |
+----------+-------------------+------------------------------------------------+-------------+



### 4.16 Get the topmost 5 products in each category where the products are sorted by Price Highest to Lowest:
**sort: sortingByKey, sort() by price per category**

**SQL:**

In [76]:
spark.sql(
        """SELECT product_category_id, product_id, product_name, product_price, row_num
          |FROM ( 
          |     SELECT q.*, row_number() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price desc) as row_num 
          |     FROM products q)
          |WHERE row_num <= 5 
          |ORDER BY product_category_id, row_num
          |      
        """).show(false)

+-------------------+----------+------------------------------------------------+-------------+-------+
|product_category_id|product_id|product_name                                    |product_price|row_num|
+-------------------+----------+------------------------------------------------+-------------+-------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1      |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2      |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3      |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |4      |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5      |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |1      |
|3                  |32        |PUMA Men's evoPOWER 1 Tricks FG 

**DF API:**

In [77]:
val windowSpec = Window.partitionBy(products_df("product_category_id")).orderBy(products_df("product_price").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@1c0e655c


org.apache.spark.sql.expressions.WindowSpec@1c0e655c

In [78]:
products_df.withColumn("row_num", row_number().over(windowSpec))
    .select("product_category_id", "product_id", "product_name", "product_price","row_num")
    .filter(col("row_num") <= 5)
    .orderBy("product_category_id", "row_num")
    .show(false)

+-------------------+----------+------------------------------------------------+-------------+-------+
|product_category_id|product_id|product_name                                    |product_price|row_num|
+-------------------+----------+------------------------------------------------+-------------+-------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1      |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2      |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3      |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |4      |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5      |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |1      |
|3                  |32        |PUMA Men's evoPOWER 1 Tricks FG 

### RANK and DENSE_RANK

**RANK** gives the rank of each row within its ordered partition. Tied rows are assigned the same rank, and the rank(s) that would have followed are skipped. So, if 3 items tie at rank 2, the next rank assigned is 5.

**DENSE_RANK** also gives the rank within the ordered partition, but the ranks are consecutive: no ranks are skipped after a tie.
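
To make the difference concrete, here is a minimal plain-Scala sketch (no Spark involved) of how `row_number()`, `rank()` and `dense_rank()` number the rows of one ordered partition. The object and method names are illustrative and are not part of the Spark API; the prices are the category 2 prices that appear in the outputs of this notebook.

```scala
// Plain-Scala sketch of the three ranking functions over one ordered partition.
// RankingSketch and its methods are hypothetical helpers, not Spark functions.
object RankingSketch {
  // Category 2 prices from this notebook's outputs, already sorted highest first.
  val prices: Seq[Double] = Seq(299.99, 209.99, 199.99, 199.99, 139.99, 139.99)

  // row_number: position in the ordered partition; tied rows get different numbers.
  def rowNumber(sorted: Seq[Double]): Seq[Int] = sorted.indices.map(_ + 1)

  // rank: 1 + the number of rows with a strictly higher price, so gaps follow ties.
  def rank(sorted: Seq[Double]): Seq[Int] =
    sorted.map(p => sorted.count(_ > p) + 1)

  // dense_rank: 1 + the number of distinct strictly higher prices, so there are no gaps.
  def denseRank(sorted: Seq[Double]): Seq[Int] =
    sorted.map(p => sorted.distinct.count(_ > p) + 1)
}
```

For these prices, `RankingSketch.rank(RankingSketch.prices)` yields 1, 2, 3, 3, 5, 5 and `RankingSketch.denseRank(RankingSketch.prices)` yields 1, 2, 3, 3, 4, 4, while `rowNumber` simply counts 1 to 6.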

The following 3 examples use the rank(), dense_rank() and row_number() functions.

### 4.17 Get topN products by price in each category:
**topN: For each product category, get the products ranked in the top 5 by price. Products with the same price share a rank, so the products returned are all distinct but their prices may not be: the number of distinct prices in the top 5 is <= 5. The top 5 ranked products are not necessarily exactly 5 products. Ties can push the count above 5, and a category with fewer than 5 products returns fewer.**

**SQL:**

In [79]:
spark.sql(
        """SELECT product_category_id, product_id, product_name, product_price, rank
          |FROM ( 
          |     SELECT q.*, rank() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price DESC) as rank 
          |     FROM products q)
          |WHERE rank <= 5 
          |ORDER BY product_category_id, rank
          |      
        """).show(false)

+-------------------+----------+------------------------------------------------+-------------+----+
|product_category_id|product_id|product_name                                    |product_price|rank|
+-------------------+----------+------------------------------------------------+-------------+----+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1   |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2   |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3   |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3   |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |5   |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5   |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99    

**DF API:**

In [80]:
val windowSpec = Window.partitionBy(products_df("product_category_id")).orderBy(products_df("product_price").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@46fece5a


org.apache.spark.sql.expressions.WindowSpec@46fece5a

In [81]:
val top_five_per_ctg = products_df.withColumn("rank", rank().over(windowSpec))
    .select("product_category_id", "product_id", "product_name", "product_price","rank")
    .filter(col("rank") <= 5)
    .orderBy("product_category_id", "rank")
    .cache()

top_five_per_ctg = [product_category_id: int, product_id: int ... 3 more fields]


[product_category_id: int, product_id: int ... 3 more fields]

In [82]:
top_five_per_ctg.show(false)

+-------------------+----------+------------------------------------------------+-------------+----+
|product_category_id|product_id|product_name                                    |product_price|rank|
+-------------------+----------+------------------------------------------------+-------------+----+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1   |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2   |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3   |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3   |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |5   |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |5   |
|3                  |40        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99    

### 4.18 Get 'topN priced' products in each category:
**topDenseN: For each category, get every product whose price is among the top 5 distinct prices. For example, if 10 products share the top 5 distinct prices, the DF should give us all 10 products: the products are all distinct, and exactly 5 distinct prices appear among them.**
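
The selection described above can be sketched in plain Scala, without Spark, to show why the result may contain more than 5 products. This is only an illustration under stated assumptions: `TopDenseNSketch`, `Product` and `topDenseN` are hypothetical names, and the data is made up (12 products, two per price, with whole-dollar prices).

```scala
// Plain-Scala sketch of the "dense_rank() <= n" selection within one category.
object TopDenseNSketch {
  // Hypothetical product record; price is in whole dollars to keep the example simple.
  final case class Product(id: Int, price: Int)

  // Keep every product whose price is among the n highest distinct prices,
  // then list the survivors from most to least expensive.
  def topDenseN(products: Seq[Product], n: Int): Seq[Product] = {
    val topPrices = products.map(_.price).distinct.sortBy(p => -p).take(n).toSet
    products.filter(p => topPrices.contains(p.price)).sortBy(p => -p.price)
  }

  // Hypothetical category: ids 1 to 12, prices 100 to 600, each price shared by two products.
  val products: Seq[Product] =
    (1 to 12).map(id => Product(id, 100 * ((id + 1) / 2)))
}
```

With this made-up data, `TopDenseNSketch.topDenseN(TopDenseNSketch.products, 5)` returns 10 products that cover exactly 5 distinct prices; only the two cheapest products are dropped.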

**SQL:**

In [83]:
spark.sql(
        """SELECT product_category_id, product_id, product_name, product_price, dense_rank
          |FROM ( 
          |     SELECT q.*, dense_rank() OVER (PARTITION BY q.product_category_id ORDER BY q.product_price DESC) as dense_rank 
          |     FROM products q)
          |WHERE dense_rank <= 5 
          |ORDER BY product_category_id, dense_rank
          |      
        """).show(false)

+-------------------+----------+------------------------------------------------+-------------+----------+
|product_category_id|product_id|product_name                                    |product_price|dense_rank|
+-------------------+----------+------------------------------------------------+-------------+----------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1         |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2         |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3         |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3         |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |4         |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |4         |
|2                  |6         |Jorda

**DF API:**

In [84]:
val windowSpec = Window.partitionBy(products_df("product_category_id")).orderBy(products_df("product_price").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@2a45a3fa


org.apache.spark.sql.expressions.WindowSpec@2a45a3fa

In [85]:
products_df.withColumn("dense_rank", dense_rank().over(windowSpec))
    .select("product_category_id", "product_id", "product_name", "product_price","dense_rank")
    .filter(col("dense_rank") <= 5)
    .orderBy("product_category_id", "dense_rank")
    .show(false)

+-------------------+----------+------------------------------------------------+-------------+----------+
|product_category_id|product_id|product_name                                    |product_price|dense_rank|
+-------------------+----------+------------------------------------------------+-------------+----------+
|2                  |16        |Riddell Youth 360 Custom Football Helmet        |299.99       |1         |
|2                  |11        |Fitness Gear 300 lb Olympic Weight Set          |209.99       |2         |
|2                  |14        |Quik Shade Summit SX170 10 FT. x 10 FT. Canop   |199.99       |3         |
|2                  |5         |Riddell Youth Revolution Speed Custom Footbal   |199.99       |3         |
|2                  |12        |Under Armour Men's Highlight MC Alter Ego Fla   |139.99       |4         |
|2                  |23        |Under Armour Men's Highlight MC Alter Ego Hul   |139.99       |4         |
|2                  |6         |Jorda

### 4.19 Get the Customer Id with max revenue on Daily basis:

**SQL:**

In [86]:
/*
# 1. Join orders and order_items and group by order_date and customer_id to calculate the revenue per customer per day
# 2. Rank the rows within each day by revenue, highest first, using the rank() function
# 3. Select only rows with rank = 1, that will give the customer_id with max revenue
# 4. Join with customers table to get the customer details
*/

spark.sql(
        """SELECT a. order_date, a.order_customer_id, concat_ws(' ', c.customer_fname, c.customer_lname) as customer_name, a.order_total
          |FROM ( 
          |     SELECT *, rank() OVER (PARTITION BY b.order_date ORDER BY b.order_total DESC) as rank 
          |     FROM ( 
          |          SELECT o.order_date, o.order_customer_id, round(sum(oi.order_item_subtotal), 2) as order_total 
          |          FROM orders o INNER JOIN order_items oi 
          |              ON o.order_id = oi.order_item_order_id  
          |          WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD' 
          |          GROUP BY o.order_date, o.order_customer_id) b 
          |     ) a INNER JOIN customers c
          |           ON a.order_customer_id = c.customer_id
          |WHERE rank = 1 
          |ORDER BY order_date
          |      
        """).show(false)

+---------------------+-----------------+----------------+-----------+
|order_date           |order_customer_id|customer_name   |order_total|
+---------------------+-----------------+----------------+-----------+
|2013-07-25 00:00:00.0|11941            |Jeffrey Pugh    |1649.8     |
|2013-07-26 00:00:00.0|32               |Alice Smith     |2009.75    |
|2013-07-27 00:00:00.0|11491            |David Smith     |1379.88    |
|2013-07-28 00:00:00.0|5738             |Mildred Taylor  |1499.87    |
|2013-07-29 00:00:00.0|2632             |John Smith      |1389.86    |
|2013-07-29 00:00:00.0|5182             |Thomas Morgan   |1389.86    |
|2013-07-30 00:00:00.0|10029            |Mary Silva      |1529.92    |
|2013-07-31 00:00:00.0|1175             |Mary Gray       |1699.91    |
|2013-08-01 00:00:00.0|9151             |Aaron Smith     |1709.82    |
|2013-08-02 00:00:00.0|5548             |Michael Crawford|1594.92    |
|2013-08-03 00:00:00.0|9572             |Mary Nelson     |1569.79    |
|2013-

**DF API:**

In [87]:
val rev_per_day_per_cust = orders_df
                    .select($"order_date", $"order_id", $"order_customer_id", $"order_status")
                    .filter((col("order_status") =!= "CANCELED") && (col("order_status") =!= "SUSPECTED_FRAUD"))
                    .join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id"))
                    .select("order_date","order_customer_id","order_item_subtotal")
                    .groupBy("order_date","order_customer_id")
                    .agg(round(sum("order_item_subtotal"),2).alias("order_total"))
                    .cache()

rev_per_day_per_cust = [order_date: string, order_customer_id: int ... 1 more field]


[order_date: string, order_customer_id: int ... 1 more field]

In [88]:
rev_per_day_per_cust.show(5,false)

+---------------------+-----------------+-----------+
|order_date           |order_customer_id|order_total|
+---------------------+-----------------+-----------+
|2013-07-26 00:00:00.0|7710             |1199.82    |
|2013-07-27 00:00:00.0|1180             |1129.94    |
|2013-07-30 00:00:00.0|5511             |319.97     |
|2013-07-31 00:00:00.0|12018            |347.94     |
|2013-08-03 00:00:00.0|6698             |709.94     |
+---------------------+-----------------+-----------+
only showing top 5 rows



In [89]:
val windowSpec = Window.partitionBy(rev_per_day_per_cust("order_date")).orderBy(rev_per_day_per_cust("order_total").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@3502bc96


org.apache.spark.sql.expressions.WindowSpec@3502bc96

In [90]:
val top_cust_per_day_by_rev = rev_per_day_per_cust
                           .withColumn("rank",rank().over(windowSpec))
                           .filter(col("rank") === 1)
                           .orderBy("order_date").cache()

top_cust_per_day_by_rev = [order_date: string, order_customer_id: int ... 2 more fields]


[order_date: string, order_customer_id: int ... 2 more fields]

In [91]:
top_cust_per_day_by_rev.show(5,false)

+---------------------+-----------------+-----------+----+
|order_date           |order_customer_id|order_total|rank|
+---------------------+-----------------+-----------+----+
|2013-07-25 00:00:00.0|11941            |1649.8     |1   |
|2013-07-26 00:00:00.0|32               |2009.75    |1   |
|2013-07-27 00:00:00.0|11491            |1379.88    |1   |
|2013-07-28 00:00:00.0|5738             |1499.87    |1   |
|2013-07-29 00:00:00.0|2632             |1389.86    |1   |
+---------------------+-----------------+-----------+----+
only showing top 5 rows



In [92]:
top_cust_per_day_by_rev
 .join(customers_df, top_cust_per_day_by_rev("order_customer_id") === customers_df("customer_id"))
 .select(col("order_date"),col("order_customer_id"), concat($"customer_fname",lit(" "),$"customer_lname").alias("customer_name"),col("order_total"))
 .orderBy("order_date")
 .show(false)

+---------------------+-----------------+----------------+-----------+
|order_date           |order_customer_id|customer_name   |order_total|
+---------------------+-----------------+----------------+-----------+
|2013-07-25 00:00:00.0|11941            |Jeffrey Pugh    |1649.8     |
|2013-07-26 00:00:00.0|32               |Alice Smith     |2009.75    |
|2013-07-27 00:00:00.0|11491            |David Smith     |1379.88    |
|2013-07-28 00:00:00.0|5738             |Mildred Taylor  |1499.87    |
|2013-07-29 00:00:00.0|2632             |John Smith      |1389.86    |
|2013-07-29 00:00:00.0|5182             |Thomas Morgan   |1389.86    |
|2013-07-30 00:00:00.0|10029            |Mary Silva      |1529.92    |
|2013-07-31 00:00:00.0|1175             |Mary Gray       |1699.91    |
|2013-08-01 00:00:00.0|9151             |Aaron Smith     |1709.82    |
|2013-08-02 00:00:00.0|5548             |Michael Crawford|1594.92    |
|2013-08-03 00:00:00.0|9572             |Mary Nelson     |1569.79    |
|2013-

### 4.20 Get the top 3 Max Revenue Generating Customers Per Month in 2013:

**SQL:**

In [93]:
// To use a UDF from SQL, it must be registered with the sqlContext

sqlContext.udf.register("udfmonTomonth", ((mon: Int) => { 
    val monthmap = Map(1 -> "Jan",2 -> "Feb", 3 -> "Mar",4 -> "Apr",5 -> "May",6 -> "Jun", 7 -> "Jul",8 -> "Aug",9 -> "Sep",10 -> "Oct",11 -> "Nov",12 -> "Dec")
        monthmap.get(mon)
    }))

UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))
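
One detail of the UDF above is worth noting: `monthmap.get(mon)` returns an `Option[String]` rather than a `String`. As far as we understand, Spark maps the `Option` result of a Scala UDF to a nullable column, which is consistent with the `StringType` shown in the registration output, so a month number outside 1 to 12 would give null rather than an error. The plain-Scala sketch below (no Spark involved; `MonthLookupSketch` and `monthName` are illustrative names) shows only the lookup itself.

```scala
// Plain-Scala sketch of the lookup performed inside the UDF.
object MonthLookupSketch {
  // Same month-number to month-name mapping as in the UDF.
  val monthmap: Map[Int, String] = Map(
    1 -> "Jan", 2 -> "Feb", 3 -> "Mar", 4 -> "Apr", 5 -> "May", 6 -> "Jun",
    7 -> "Jul", 8 -> "Aug", 9 -> "Sep", 10 -> "Oct", 11 -> "Nov", 12 -> "Dec")

  // Map.get returns Some(name) for a known key and None for anything else.
  def monthName(mon: Int): Option[String] = monthmap.get(mon)
}
```

Here `MonthLookupSketch.monthName(7)` is `Some("Jul")`, while `MonthLookupSketch.monthName(13)` is `None`.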

In [94]:
/*# 1. Join orders and order_items and group by order_month and customer_id to calculate the revenue per customer per month
# 2. Rank the rows within each month by revenue, highest first, using the dense_rank() function
# 3. Join with customers table to get the customer details
# 4. Select only rows with dense_rank <= 3, that will give the top 3 customers with max revenue
*/
val df = spark.sql(
        """SELECT a.order_month, concat_ws(' ', c.customer_fname, c.customer_lname) as customer_name, a.order_total, a.dense_rank
          |FROM ( 
          |     SELECT *, dense_rank() OVER (PARTITION BY order_month ORDER BY order_total DESC) as dense_rank 
          |     FROM ( 
          |          SELECT udfmonTomonth(MONTH(o.order_date)) as order_month, o.order_customer_id, ROUND(SUM(oi.order_item_subtotal), 2) as order_total 
          |          FROM orders o INNER JOIN order_items oi 
          |              ON o.order_id = oi.order_item_order_id  
          |          WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD' 
          |          AND YEAR(o.order_date) = 2013
          |          GROUP BY order_month, o.order_customer_id) 
          |     ) a  INNER JOIN customers c
          |         ON a.order_customer_id = c.customer_id
          |WHERE dense_rank <= 3 
          |ORDER BY order_month, dense_rank
          |      
        """).cache()

df = [order_month: string, customer_name: string ... 2 more fields]


[order_month: string, customer_name: string ... 2 more fields]

In [95]:
df.show(false)

+-----------+-----------------+-----------+----------+
|order_month|customer_name    |order_total|dense_rank|
+-----------+-----------------+-----------+----------+
|Aug        |Victoria Smith   |4229.84    |1         |
|Aug        |Shirley Whitehead|3649.66    |2         |
|Aug        |Mary Smith       |3571.73    |3         |
|Dec        |Mary Olson       |4029.61    |1         |
|Dec        |Ann Smith        |3497.69    |2         |
|Dec        |Janet Smith      |3179.68    |3         |
|Jul        |Michelle Callahan|2781.73    |1         |
|Jul        |William Smith    |2059.75    |2         |
|Jul        |Alice Smith      |2009.75    |3         |
|Nov        |David Smith      |3129.72    |1         |
|Nov        |Rachel Smith     |3019.76    |2         |
|Nov        |Robert Williams  |2989.74    |3         |
|Oct        |Diana Smith      |3479.64    |1         |
|Oct        |Mary Smith       |2959.77    |2         |
|Oct        |Mary Smith       |2859.75    |3         |
|Sep      

**DF API:**

In [96]:
// Define a UDF
def udfmonTomonth = udf((mon: Int) => { 
    val monthmap = Map(1 -> "Jan",2 -> "Feb", 3 -> "Mar",4 -> "Apr",5 -> "May",6 -> "Jun", 7 -> "Jul",8 -> "Aug",9 -> "Sep",10 -> "Oct",11 -> "Nov",12 -> "Dec")
    monthmap.get(mon)
})

udfmonTomonth: org.apache.spark.sql.expressions.UserDefinedFunction


In [97]:
val rev_per_month_per_cust = orders_df
                    .select(udfmonTomonth(month($"order_date")).alias("order_month"), $"order_id", $"order_customer_id",$"order_status")
                    .filter((col("order_status") =!= "CANCELED") && (col("order_status") =!= "SUSPECTED_FRAUD") && (year($"order_date") === 2013))
                    .join(order_items_df, orders_df("order_id") === order_items_df("order_item_order_id"))
                    .select("order_month","order_customer_id", "order_item_subtotal")
                    .groupBy("order_month","order_customer_id")
                    .agg(round(sum("order_item_subtotal"),2).alias("order_total"))
                    .cache()

rev_per_month_per_cust = [order_month: string, order_customer_id: int ... 1 more field]


[order_month: string, order_customer_id: int ... 1 more field]

In [98]:
rev_per_month_per_cust.show(5,false)

+-----------+-----------------+-----------+
|order_month|order_customer_id|order_total|
+-----------+-----------------+-----------+
|Jul        |4840             |129.99     |
|Jul        |8504             |1279.65    |
|Jul        |7436             |399.96     |
|Jul        |3752             |599.95     |
|Jul        |9639             |129.99     |
+-----------+-----------------+-----------+
only showing top 5 rows



In [99]:
val windowSpec = Window.partitionBy(rev_per_month_per_cust("order_month")).orderBy(rev_per_month_per_cust("order_total").desc)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@68d4bea5


org.apache.spark.sql.expressions.WindowSpec@68d4bea5

In [100]:
val top_cust_per_month_by_rev = rev_per_month_per_cust
                           .withColumn("dense_rank",dense_rank().over(windowSpec))
                           .filter(col("dense_rank") <= 3)
                           .orderBy("order_month")
                           .cache()

top_cust_per_month_by_rev = [order_month: string, order_customer_id: int ... 2 more fields]


[order_month: string, order_customer_id: int ... 2 more fields]

In [101]:
top_cust_per_month_by_rev.show(5,false)

+-----------+-----------------+-----------+----------+
|order_month|order_customer_id|order_total|dense_rank|
+-----------+-----------------+-----------+----------+
|Aug        |9515             |4229.84    |1         |
|Aug        |5047             |3649.66    |2         |
|Aug        |791              |3571.73    |3         |
|Dec        |9586             |4029.61    |1         |
|Dec        |10291            |3497.69    |2         |
+-----------+-----------------+-----------+----------+
only showing top 5 rows



In [102]:
top_cust_per_month_by_rev
    .join(customers_df, top_cust_per_month_by_rev("order_customer_id") === customers_df("customer_id"))
    .select(col("order_month"),col("order_customer_id"), concat($"customer_fname",lit(" "),$"customer_lname").alias("customer_name"),col("order_total"),col("dense_rank"))
    .show(false)

+-----------+-----------------+-----------------+-----------+----------+
|order_month|order_customer_id|customer_name    |order_total|dense_rank|
+-----------+-----------------+-----------------+-----------+----------+
|Aug        |9515             |Victoria Smith   |4229.84    |1         |
|Aug        |5047             |Shirley Whitehead|3649.66    |2         |
|Aug        |791              |Mary Smith       |3571.73    |3         |
|Dec        |9586             |Mary Olson       |4029.61    |1         |
|Dec        |10291            |Ann Smith        |3497.69    |2         |
|Dec        |4781             |Janet Smith      |3179.68    |3         |
|Jul        |5293             |Michelle Callahan|2781.73    |1         |
|Jul        |4257             |William Smith    |2059.75    |2         |
|Jul        |32               |Alice Smith      |2009.75    |3         |
|Nov        |7305             |David Smith      |3129.72    |1         |
|Nov        |5683             |Rachel Smith     |30

### 4.21 Get All Distinct Pairs of Products that occurred in Orders where the order total was at least 300:

This problem can be solved with a self join, but there are some corner cases:
+ Pairs (a, b) and (b, a) are the same, so they should not be reported twice.
+ What happens if an item is ordered more than once in the same order, i.e. product a appears more than once in the line items of the order id?

Let's check our fictitious dataset from Cloudera of order items.

In [103]:
spark.sql("""SELECT order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal
          |FROM order_items oi
          |ORDER BY order_item_order_id, order_item_product_id
        """).show(10)

+-------------------+---------------------+-------------------+-------------------+
|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+-------------------+---------------------+-------------------+-------------------+
|                  1|                  957|                  1|             299.98|
|                  2|                  403|                  1|             129.99|
|                  2|                  502|                  5|              250.0|
|                  2|                 1073|                  1|             199.99|
|                  4|                  365|                  5|             299.95|
|                  4|                  502|                  3|              150.0|
|                  4|                  897|                  2|              49.98|
|                  4|                 1014|                  4|             199.92|
|                  5|                  365|                  5|             

**Corner Case:**

Let's check an order where the same item is ordered twice in a single order. Here, in order_id 148, product_id 502 is ordered twice, albeit with different quantities.

In [104]:
spark.sql("""SELECT order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal
          |FROM order_items oi 
          |WHERE oi.order_item_order_id = 148
          |ORDER BY order_item_product_id
        """).show()

+-------------------+---------------------+-------------------+-------------------+
|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+-------------------+---------------------+-------------------+-------------------+
|                148|                  403|                  1|             129.99|
|                148|                  502|                  2|              100.0|
|                148|                  502|                  5|              250.0|
+-------------------+---------------------+-------------------+-------------------+



**SQL:**

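Join the per-order totals, filtered to orders where order_total >= 300, with two copies of the order_items table (t1 and t2). The condition *`AND t1.order_item_product_id < t2.order_item_product_id`* ensures that the pairs (a, b) and (b, a) are treated as the same pair and are not reported twice. It also prevents a product from being paired with itself, and `SELECT DISTINCT` collapses the duplicate pairs produced when a product appears on more than one line of an order, as in order 148.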
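
Before the Spark version, the pairing logic for a single order can be sketched in plain Scala on the line items of order 148 shown above. This is only an illustration of the idea, not the query itself: `ProductPairsSketch` and `distinctPairs` are hypothetical names, and the minimum total is passed in as a parameter.

```scala
// Plain-Scala sketch of "distinct unordered product pairs" for one order.
object ProductPairsSketch {
  // (product_id, subtotal) line items of order 148, copied from the output above.
  val order148: Seq[(Int, Double)] = Seq((403, 129.99), (502, 100.0), (502, 250.0))

  // Distinct product pairs of one order, returned only if the order total reaches minTotal.
  def distinctPairs(items: Seq[(Int, Double)], minTotal: Double): Seq[(Int, Int)] = {
    val total = items.map(_._2).sum
    if (total < minTotal) Seq.empty
    else {
      val ids = items.map(_._1)
      val pairs = for {
        a <- ids
        b <- ids
        if a < b // keeps (a, b); drops (b, a) and a product paired with itself
      } yield (a, b)
      pairs.distinct // product 502 appears twice, so (403, 502) is generated twice
    }
  }
}
```

For order 148, whose total is above 300, `ProductPairsSketch.distinctPairs(ProductPairsSketch.order148, 300.0)` returns the single pair `(403, 502)` even though product 502 appears on two lines.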

In [105]:
spark.sql("""SELECT distinct a.order_item_order_id as txn_id, t1.order_item_product_id as prd_a, t2.order_item_product_id as prd_b, ROUND(a.order_total, 2) as txn_total
          |FROM (
          |    SELECT order_item_order_id, SUM(order_item_subtotal) as order_total
          |    FROM order_items
          |    GROUP BY order_item_order_id) a, 
          |    order_items t1,
          |    order_items t2
          |WHERE a.order_total >= 300
          |AND t1.order_item_order_id = a.order_item_order_id
          |AND t2.order_item_order_id = a.order_item_order_id
          |AND t1.order_item_product_id < t2.order_item_product_id
          |ORDER BY txn_id, prd_a
        """).show()

+------+-----+-----+---------+
|txn_id|prd_a|prd_b|txn_total|
+------+-----+-----+---------+
|     2|  403|  502|   579.98|
|     2|  403| 1073|   579.98|
|     2|  502| 1073|   579.98|
|     4|  365|  502|   699.85|
|     4|  365| 1014|   699.85|
|     4|  365|  897|   699.85|
|     4|  502|  897|   699.85|
|     4|  502| 1014|   699.85|
|     4|  897| 1014|   699.85|
|     5|  365|  403|  1129.86|
|     5|  365| 1014|  1129.86|
|     5|  365|  957|  1129.86|
|     5|  403| 1014|  1129.86|
|     5|  403|  957|  1129.86|
|     5|  957| 1014|  1129.86|
|     7|  926|  957|   579.92|
|     7|  926| 1073|   579.92|
|     7|  957| 1073|   579.92|
|     8|  365| 1014|   729.84|
|     8|  365|  502|   729.84|
+------+-----+-----+---------+
only showing top 20 rows



**DF API:**

The same query as above, this time using the DataFrame API instead of SQL.

Note the use of the `alias` method on the DataFrame itself, which lets the self join refer to each copy of order_items by name.

In [106]:
val t1 = order_items_df.alias("t1")
val t2 = order_items_df.alias("t2")

t1 = [order_item_id: int, order_item_order_id: int ... 4 more fields]
t2 = [order_item_id: int, order_item_order_id: int ... 4 more fields]


[order_item_id: int, order_item_order_id: int ... 4 more fields]

In [107]:
val df = (order_items_df
      .groupBy(order_items_df("order_item_order_id"))
      .agg(sum(order_items_df("order_item_subtotal")).alias("order_total"))
      .filter(col("order_total") >= 300)
      .join(t1, $"t1.order_item_order_id" === order_items_df("order_item_order_id"))
      .join(t2, $"t2.order_item_order_id" === order_items_df("order_item_order_id"))
      .where($"t1.order_item_product_id" < $"t2.order_item_product_id")
      .select(order_items_df("order_item_order_id").alias("txn_id"), 
              $"t1.order_item_product_id".alias("prd_a"), 
              $"t2.order_item_product_id".alias("prd_b"),
              round(col("order_total"), 2).alias("txn_total"))
      .distinct()
      .orderBy("txn_id", "prd_a")
      .cache())

df = [txn_id: int, prd_a: int ... 2 more fields]


[txn_id: int, prd_a: int ... 2 more fields]

In [108]:
df.show()

+------+-----+-----+---------+
|txn_id|prd_a|prd_b|txn_total|
+------+-----+-----+---------+
|     2|  403|  502|   579.98|
|     2|  403| 1073|   579.98|
|     2|  502| 1073|   579.98|
|     4|  365| 1014|   699.85|
|     4|  365|  897|   699.85|
|     4|  365|  502|   699.85|
|     4|  502|  897|   699.85|
|     4|  502| 1014|   699.85|
|     4|  897| 1014|   699.85|
|     5|  365|  957|  1129.86|
|     5|  365| 1014|  1129.86|
|     5|  365|  403|  1129.86|
|     5|  403|  957|  1129.86|
|     5|  403| 1014|  1129.86|
|     5|  957| 1014|  1129.86|
|     7|  926|  957|   579.92|
|     7|  926| 1073|   579.92|
|     7|  957| 1073|   579.92|
|     8|  365| 1014|   729.84|
|     8|  365|  502|   729.84|
+------+-----+-----+---------+
only showing top 20 rows



In [109]:
spark.stop()