# Retail Data Analysis using Apache Spark on Databricks Community Edition
In this notebook, I demonstrate core Apache Spark concepts such as transformations and actions. The examples use Spark's RDD (Resilient Distributed Dataset) API.

The notebook is written and executed on Databricks Community Edition. A guide to getting started with Databricks Community Edition can be found <a href="https://docs.databricks.com/user-guide/index.html">here</a>, and an introduction to Apache Spark with Databricks can be found <a href="https://docs.databricks.com/spark/latest/training/index.html">here</a>.

### Datasets

1. <a href="https://github.com/shwetabhartia/data/tree/master/retail_db/orders.csv"><b>orders.csv</b></a> - Contains the list of orders. The attributes present in the dataset are:
   1. Order ID: The unique identifier for each order (INTEGER)
   2. Order Date: The date and time when the order was placed (DATETIME)
   3. Customer ID: The ID of the customer who placed the order (INTEGER)
   4. Order Status: The status associated with the order (VARCHAR)
2. <a href="https://github.com/shwetabhartia/data/tree/master/retail_db/order_items.csv"> <b>order_items.csv</b></a> - Contains the details about each item in each order:
   1. Order Item ID: The unique identifier of each order item (INTEGER)
   2. Order Item Order ID: The identifier of the order the item belongs to (INTEGER)
   3. Order Item Product ID: The ID of the product (INTEGER)
   4. Order Item Quantity: The quantity ordered for the item (INTEGER)
   5. Order Item Subtotal: The total price for the ordered quantity (FLOAT)
   6. Order Item Product Price: The price associated with the product (FLOAT)
3. <a href="https://github.com/shwetabhartia/data/tree/master/retail_db/products.csv"> <b>products.csv</b></a> - Contains the details about each product:
   1. Product ID: The unique identifier for each product (INTEGER)
   2. Product Category ID: The identifier of the category to which the product belongs (INTEGER)
   3. Product Description: The description associated with the product (VARCHAR)
   4. Product Price: The price of the product (FLOAT)
   5. Product Image: The URL of the image associated with the product (VARCHAR)
4. <a href="https://github.com/shwetabhartia/data/tree/master/retail_db/categories.csv"> <b>categories.csv</b></a> - Contains the details about each category:
   1. Category ID: The unique identifier for each category (INTEGER)
   2. Category Department ID: The identifier of the department to which the category belongs (INTEGER)
   3. Category Name: The name of the category (VARCHAR)

### Downloading the datasets

Download the datasets into the `/tmp` directory using the `wget` shell command. The cell below fetches the raw files from `raw.githubusercontent.com`; the GitHub pages for the datasets are:
1. orders.csv : https://github.com/shwetabhartia/data/tree/master/retail_db/orders.csv
2. order_items.csv : https://github.com/shwetabhartia/data/tree/master/retail_db/order_items.csv
3. products.csv : https://github.com/shwetabhartia/data/tree/master/retail_db/products.csv
4. categories.csv : https://github.com/shwetabhartia/data/tree/master/retail_db/categories.csv

```
%sh
wget -P /tmp "https://raw.githubusercontent.com/shwetabhartia/data/master/retail_db/orders.csv"
wget -P /tmp "https://raw.githubusercontent.com/shwetabhartia/data/master/retail_db/order_items.csv"
wget -P /tmp "https://raw.githubusercontent.com/shwetabhartia/data/master/retail_db/products.csv"
wget -P /tmp "https://raw.githubusercontent.com/shwetabhartia/data/master/retail_db/categories.csv"
```

### Uploading the datasets into Databricks file system

The Databricks File System (DBFS) is a distributed file system layered on top of Amazon S3. We will copy the data from the driver's local file system into DBFS. The Python cell below copies the files into the `datasets` folder of DBFS for your cluster.

Note: The local files are referenced using `file:/` and DBFS files are referenced using `dbfs:/`

```python
localOrderFilePath = "file:/tmp/orders.csv"
localOrderItemFilePath = "file:/tmp/order_items.csv"
localProductFilePath = "file:/tmp/products.csv"
localCategoriesFilePath = "file:/tmp/categories.csv"
dbutils.fs.mkdirs("dbfs:/datasets")
# Copy each file to an explicit destination path inside dbfs:/datasets
dbutils.fs.cp(localOrderFilePath, "dbfs:/datasets/orders.csv")
dbutils.fs.cp(localOrderItemFilePath, "dbfs:/datasets/order_items.csv")
dbutils.fs.cp(localProductFilePath, "dbfs:/datasets/products.csv")
dbutils.fs.cp(localCategoriesFilePath, "dbfs:/datasets/categories.csv")
# Display the files present in the DBFS datasets folder of your cluster
display(dbutils.fs.ls("dbfs:/datasets"))
```

### Lambda Function

Python supports the creation of anonymous functions (functions that are not bound to a name) at runtime, using a construct called `lambda`. The RDD transformations in this notebook take lambdas as arguments.

More information on <a href="http://www.secnetix.de/olli/Python/lambda_functions.hawk">lambda functions</a>.
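As a small illustration, the two functions below are equivalent; the lambda form is what gets passed inline to `map`, `filter` and `reduce` later on. The sample record is made up to match the orders.csv column order and is not taken from the dataset. The last part shows why the code cells use `kv[0]` / `kv[1]`: in Python 3 a lambda cannot unpack a tuple in its parameter list (`lambda (x, y): ...` is a syntax error).

```python
# A named function and an equivalent anonymous (lambda) function
def split_line(line):
    return line.split(",")

split_line_lambda = lambda line: line.split(",")

# Made-up line shaped like a row of orders.csv
record = "1,2013-07-25 00:00:00.0,11599,CLOSED"
print(split_line(record) == split_line_lambda(record))  # True

# Python 3 lambdas cannot unpack tuples in their parameters,
# so a (key, value) pair is accessed by index instead
swap = lambda kv: (kv[1], kv[0])
print(swap(("2013-07-25", 100.0)))  # (100.0, '2013-07-25')
```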

### Creating RDD for orders table

```python
ordersRDD = sc.textFile("dbfs:/datasets/orders.csv")
for i in ordersRDD.take(10): print(i)
```

### Creating RDD for the order_items table

```python
orderItemsRDD = sc.textFile("dbfs:/datasets/order_items.csv")
for i in orderItemsRDD.take(10): print(i)
```

### Creating RDD for the products table

```python
productsRDD = sc.textFile("dbfs:/datasets/products.csv")
for i in productsRDD.take(10): print(i)
```

### Creating RDD for the categories table

```python
categoryRDD = sc.textFile("dbfs:/datasets/categories.csv")
for i in categoryRDD.take(10): print(i)
```
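`sc.textFile` yields each line of the file as a plain string, so every analysis below starts by splitting on commas and converting fields by hand. A minimal pure-Python sketch of that parsing step for orders.csv, assuming the column order listed in the Datasets section; the sample lines and the `parse_order` helper are hypothetical.

```python
# Made-up lines in the column order listed for orders.csv above
order_lines = [
    "1,2013-07-25 00:00:00.0,11599,CLOSED",
    "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT",
]

def parse_order(line):
    """Split one orders.csv line and convert the numeric fields (hypothetical helper)."""
    order_id, order_date, customer_id, order_status = line.split(",")
    return (int(order_id), order_date, int(customer_id), order_status)

parsed = [parse_order(line) for line in order_lines]
for row in parsed:
    print(row)
```

On the cluster, the same function could be passed to a transformation, e.g. `ordersRDD.map(parse_order)`. Unpacking into exactly four names raises a `ValueError` on a line with a different number of fields, which surfaces malformed rows early instead of silently shifting columns.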

### Getting the revenue from order_items on a daily basis
1. "Map" through the orders RDD and stored (order ID, order date) pairs
2. "Map" through the order items RDD and stored (order ID, order item subtotal) pairs
3. Joined both RDDs on order ID
4. Summed the subtotals for each date and sorted the dates by revenue in ascending order
5. Printed the first 10 (date, revenue) pairs, i.e. the 10 days with the lowest revenue

```python
# (order_id, order_date)
ordersMapRDD = ordersRDD.map(lambda x: x.split(",")).map(lambda x: (x[0], x[1]))
# (order_id, order_item_subtotal)
orderItemsMapRDD = orderItemsRDD.map(lambda x: x.split(",")).map(lambda x: (x[1], float(x[4])))
# (order_id, (order_date, order_item_subtotal))
ordersJoin = ordersMapRDD.join(orderItemsMapRDD)
# Sum the subtotals per date, then sort by revenue (ascending)
revenuePerDay = (ordersJoin.map(lambda x: (x[1][0], x[1][1]))
                 .reduceByKey(lambda acc, val: acc + val)
                 .sortBy(lambda x: x[1]))
for i in revenuePerDay.take(10): print(i)
```
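To make the join and aggregation concrete, here is a pure-Python sketch (no Spark needed) of what the cell computes on a few made-up records. It assumes order IDs are unique in orders.csv, so a dictionary lookup can stand in for Spark's inner `join`.

```python
from collections import defaultdict

# Made-up (order_id, order_date) and (order_id, subtotal) pairs
orders = [("1", "2013-07-25"), ("2", "2013-07-25"), ("3", "2013-07-26")]
order_items = [("1", 100.0), ("1", 50.0), ("2", 20.0), ("3", 80.0)]

# join: emit (order_id, (order_date, subtotal)) for every matching order ID
order_dates = dict(orders)
joined = [(oid, (order_dates[oid], subtotal))
          for oid, subtotal in order_items if oid in order_dates]

# reduceByKey: sum the subtotals per date
revenue_per_day = defaultdict(float)
for _, (date, subtotal) in joined:
    revenue_per_day[date] += subtotal

# sortBy revenue, ascending
result = sorted(revenue_per_day.items(), key=lambda kv: kv[1])
print(result)  # [('2013-07-26', 80.0), ('2013-07-25', 170.0)]
```

To list the highest-revenue days first in the Spark cell, `sortBy` also accepts `ascending=False`.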


### Get the number of orders on a daily basis
1. "Map" through the orders RDD and stored the order date
2. Created a tuple of the order date and 1
3. Added up the orders for each date and sorted by date
4. Printed the first 10 (date, order count) pairs

```python
ordersPerDay = (ordersRDD.map(lambda x: x.split(",")[1])
                .map(lambda x: (x, 1))
                .reduceByKey(lambda acc, value: acc + value)
                .sortByKey())
for i in ordersPerDay.take(10): print(i)
```
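The `(date, 1)` plus `reduceByKey` pattern is a per-key count. The pure-Python sketch below mirrors it on made-up dates and checks it against `collections.Counter`. In PySpark, `ordersRDD.map(lambda x: x.split(",")[1]).countByValue()` would return similar counts, but as an unsorted dictionary collected on the driver rather than as an RDD.

```python
from collections import Counter

# Made-up order dates (the second field of each orders.csv line)
order_dates = [
    "2013-07-25 00:00:00.0",
    "2013-07-26 00:00:00.0",
    "2013-07-25 00:00:00.0",
]

# map(lambda x: (x, 1)) followed by reduceByKey(lambda acc, value: acc + value)
counts = {}
for date, one in ((d, 1) for d in order_dates):
    counts[date] = counts.get(date, 0) + one

# sortByKey() orders the (date, count) pairs by date
orders_per_day = sorted(counts.items())
print(orders_per_day)

# A Counter yields the same per-date counts in one step
print(dict(Counter(order_dates)) == counts)  # True
```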

### Get total revenue from order_items
1. "Map" through the order items RDD and stored the order item subtotal
2. Added up all the subtotals
3. Printed the total revenue

```python
totalRevenue = orderItemsRDD.map(lambda x: float(x.split(",")[4])).reduce(lambda acc, value: acc + value)
print(totalRevenue)
```
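`reduce` folds the values pairwise. Spark applies the function within each partition and then across the partial results, so the function must be associative and commutative; addition qualifies, although floating-point rounding can make the last digits vary between runs. A pure-Python sketch on made-up subtotals comparing `functools.reduce` with the built-in `sum`; PySpark's `RDD.sum()` action (e.g. `orderItemsRDD.map(lambda x: float(x.split(",")[4])).sum()`) is the shorter equivalent.

```python
from functools import reduce
import math

# Made-up order item subtotals (the fifth field of each order_items.csv line)
subtotals = [199.99, 250.0, 129.99]

# Same reducing function as the cell above
total_reduce = reduce(lambda acc, value: acc + value, subtotals)
# Built-in equivalent
total_sum = sum(subtotals)

# Compare with a tolerance rather than ==, since float sums can differ in the last digits
print(math.isclose(total_reduce, total_sum))  # True
```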

### Get the max priced product in the products table
1. "Map" through the products RDD, split each record into fields and compared records pairwise with reduce, keeping the one with the higher price
2. Printed the max priced product

```python
# Keep whichever of two records has the higher price (field index 3)
maxPricedProduct = productsRDD.map(lambda x: x.split(",")).reduce(lambda rec1, rec2: rec1 if float(rec1[3]) > float(rec2[3]) else rec2)
print(maxPricedProduct)
```
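The pairwise "keep the larger" reduction is the same idea as Python's `max` with a key function. A pure-Python sketch on made-up product rows (column order as in the Datasets section). In PySpark, `productsRDD.map(lambda x: x.split(",")).max(key=lambda rec: float(rec[3]))` should express the same thing through `RDD.max`'s `key` argument.

```python
from functools import reduce

# Made-up parsed product rows: [product_id, category_id, description, price, image_url]
products = [
    ["1", "2", "Product A", "59.98", "http://example.com/a.jpg"],
    ["2", "2", "Product B", "129.99", "http://example.com/b.jpg"],
    ["3", "3", "Product C", "89.99", "http://example.com/c.jpg"],
]

# Pairwise comparison, as in the cell above: keep the record with the higher price
max_product = reduce(lambda rec1, rec2: rec1 if float(rec1[3]) > float(rec2[3]) else rec2, products)
print(max_product)

# max() with a key function expresses the same idea
print(max(products, key=lambda rec: float(rec[3])) == max_product)  # True
```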

### Computing average revenue per order
1. "Map" through the order items RDD, extracted each subtotal and summed them to get the total revenue
2. "Map" through the orders RDD and counted the number of distinct order IDs
3. Divided the total revenue by the number of orders
4. Printed the average revenue

```python
# Sum of all order item subtotals
totalRevenue = orderItemsRDD.map(lambda x: float(x.split(",")[4])).reduce(lambda acc, value: acc + value)
# Number of distinct order IDs in the orders table
totalDistinctOrders = ordersRDD.map(lambda x: x.split(",")[0]).distinct().count()
averageRevenue = totalRevenue / totalDistinctOrders
print(averageRevenue)
```
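The choice of denominator matters: the cell divides by every distinct order ID in orders.csv, so any order without items lowers the average. A pure-Python sketch on made-up data comparing that with dividing by only the orders that have items.

```python
# Made-up (order_id, subtotal) pairs from order_items and order IDs from orders
order_item_subtotals = [("1", 100.0), ("1", 50.0), ("2", 30.0)]
order_ids = ["1", "2", "3"]  # order "3" has no items

total_revenue = sum(subtotal for _, subtotal in order_item_subtotals)
# Denominator used in the cell above: distinct order IDs in the orders table
average_over_all_orders = total_revenue / len(set(order_ids))
# Alternative denominator: distinct order IDs that actually have items
average_over_orders_with_items = total_revenue / len({oid for oid, _ in order_item_subtotals})

print(average_over_all_orders)  # 60.0
print(average_over_orders_with_items)  # 90.0
```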


### Number of orders by status - Using countByKey
1. "Map" through the orders RDD and stored order status
2. Computed total orders by status using countByKey
3. Printed number of orders by status

```python
# countByKey works on a pair RDD, so each status is paired with a dummy value (None); only the keys are counted
ordersByStatus = ordersRDD.map(lambda x: x.split(",")).map(lambda x: (x[3], None)).countByKey()
for i in ordersByStatus.items(): print(i)
```
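The dummy `None` value is only there to satisfy the pair-RDD requirement. A pure-Python sketch on made-up statuses showing that counting keys of `(status, None)` pairs gives the same result as counting the bare statuses. In PySpark, the second form corresponds to `ordersRDD.map(lambda x: x.split(",")[3]).countByValue()`. Both are actions that return a dictionary to the driver, which is fine here because there are only a handful of distinct statuses.

```python
from collections import Counter

# Made-up order statuses (the fourth field of each orders.csv line)
statuses = ["CLOSED", "PENDING", "COMPLETE", "CLOSED"]

# countByKey on (status, None) pairs: only the keys are counted
by_key = Counter(key for key, _ in ((s, None) for s in statuses))
# countByValue on the bare statuses: each element is counted directly
by_value = Counter(statuses)

print(by_key == by_value)  # True
print(by_key["CLOSED"])  # 2
```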

### Number of orders by status - Using groupByKey
1. "Map" through the orders RDD and stored order status
2. Computed total orders by status using groupByKey
3. Printed number of orders by status

```python
# Group the 1s for each status, then sum each group
ordersGrouped = ordersRDD.map(lambda x: (x.split(",")[3], 1)).groupByKey().mapValues(sum)
for i in ordersGrouped.take(10): print(i)
```
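`groupByKey` collects every value for a key before anything is aggregated. The pure-Python sketch below, on made-up pairs, shows the intermediate groups that the cell builds before `mapValues(sum)` runs. The Spark programming guide recommends `reduceByKey` or `aggregateByKey` over `groupByKey` when the goal is a per-key aggregation such as a sum, because all values are shuffled and held per key.

```python
from collections import defaultdict

# Made-up (status, 1) pairs, as produced by the map step above
pairs = [("CLOSED", 1), ("PENDING", 1), ("CLOSED", 1)]

# groupByKey: collect every value for a key before any aggregation
grouped = defaultdict(list)
for status, one in pairs:
    grouped[status].append(one)

# mapValues(sum): aggregate each fully materialized group
counts = {status: sum(ones) for status, ones in grouped.items()}
print(dict(grouped))  # {'CLOSED': [1, 1], 'PENDING': [1]}
print(counts)  # {'CLOSED': 2, 'PENDING': 1}
```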

### Number of orders by status - Using reduceByKey
1. "Map" through the orders RDD and stored order status
2. Computed total orders by status using reduceByKey
3. Printed number of orders by status

```python
ordersByStatus = ordersRDD.map(lambda x: (x.split(",")[3], 1)).reduceByKey(lambda acc, val: acc + val)
for i in ordersByStatus.take(10): print(i)
```
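Unlike `groupByKey`, `reduceByKey` combines values within each partition before the shuffle. The sketch below is a simplified pure-Python model of that two-stage process over two hypothetical partitions of made-up pairs; it illustrates the idea and is not Spark's actual implementation.

```python
# Simplified model of reduceByKey over two partitions (not Spark's actual code)
def combine(pairs, func):
    """Merge values that share a key using func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Made-up (status, 1) pairs split across two hypothetical partitions
partitions = [
    [("CLOSED", 1), ("PENDING", 1), ("CLOSED", 1)],
    [("CLOSED", 1), ("COMPLETE", 1)],
]
add = lambda acc, val: acc + val

# 1. Map-side combine: reduce within each partition first
partials = [combine(p, add) for p in partitions]
shuffled = sum(len(p) for p in partials)  # pairs that would cross the network

# 2. Merge the partial results for each key
result = combine([kv for p in partials for kv in p.items()], add)

print(result)  # {'CLOSED': 3, 'PENDING': 1, 'COMPLETE': 1}
print(shuffled, "partial results instead of", sum(len(p) for p in partitions), "raw pairs")
```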

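### Get customer_id with max revenue for each day
1. "Map" through the orders RDD and stored order ID as key and (order date, customer ID) as value
2. "Map" through the order items RDD and stored order ID and subtotal
3. Joined both RDDs on order ID
4. Summed the revenue for each (order date, customer ID) pair
5. Compared the totals to find the customer ID with the maximum revenue for each day
6. Printed the customer with max revenue for each day

In [32]:
```python
# (order_id, (order_date, customer_id))
ordersMapRDD = ordersRDD.map(lambda x: x.split(",")).map(lambda x: (x[0], (x[1], int(x[2]))))
# (order_id, order_item_subtotal)
orderItemsMapRDD = orderItemsRDD.map(lambda x: x.split(",")).map(lambda x: (x[1], float(x[4])))
# (order_id, ((order_date, customer_id), order_item_subtotal))
ordersJoin = ordersMapRDD.join(orderItemsMapRDD)
# Total revenue per (order_date, customer_id)
revenuePerCustomerPerDay = ordersJoin.map(lambda x: (x[1][0], x[1][1])).reduceByKey(lambda acc, val: acc + val)
# Re-key by date: (order_date, (revenue, customer_id))
perDayRDD = revenuePerCustomerPerDay.map(lambda x: (x[0][0], (x[1], x[0][1])))
# Keep the (revenue, customer_id) pair with the highest revenue for each date
custIdMaxRev = perDayRDD.reduceByKey(lambda x, y: x if x[0] > y[0] else y)
for i in custIdMaxRev.take(10): print(i)
```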
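Summing per customer before taking the maximum is what makes this a per-customer answer rather than a "largest single order item" answer. A pure-Python sketch on made-up data shows the difference: customer 22 has the single largest item (100.0), but customer 11 has the highest daily total (110.0).

```python
from collections import defaultdict

# Made-up orders: order_id -> (order_date, customer_id)
orders = {"1": ("2013-07-25", 11), "2": ("2013-07-25", 22), "3": ("2013-07-25", 11)}
# Made-up order items: (order_id, subtotal)
order_items = [("1", 60.0), ("2", 100.0), ("3", 50.0)]

# join + reduceByKey: total revenue per (order_date, customer_id)
revenue = defaultdict(float)
for order_id, subtotal in order_items:
    if order_id in orders:  # inner join keeps only matching order IDs
        revenue[orders[order_id]] += subtotal

# Re-key by date and keep the (revenue, customer_id) pair with the highest revenue
best = {}
for (order_date, customer_id), total in revenue.items():
    if order_date not in best or total > best[order_date][0]:
        best[order_date] = (total, customer_id)

print(best)  # {'2013-07-25': (110.0, 11)}
```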

### Get all the orders with status COMPLETE
1. "Map" through the orders RDD and kept only the orders with COMPLETE status
2. Printed the first matching order

```python
completedOrders = ordersRDD.map(lambda x: x.split(",")).filter(lambda x: x[3] == "COMPLETE")
for i in completedOrders.take(1): print(i)
```
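`take(1)` returns only the first match; the number of matches comes from the `count()` action, e.g. `completedOrders.count()`. A pure-Python sketch of the split-then-filter step on made-up orders.csv lines, showing both.

```python
# Made-up orders.csv lines
lines = [
    "1,2013-07-25 00:00:00.0,11599,CLOSED",
    "2,2013-07-25 00:00:00.0,256,COMPLETE",
    "3,2013-07-25 00:00:00.0,12111,COMPLETE",
]

# map(split) then filter on the status field, as in the cell above
completed = [row for row in (line.split(",") for line in lines) if row[3] == "COMPLETE"]

print(completed[0])  # first matching order, like take(1)
print(len(completed))  # number of matching orders, like count()
```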

### Get all the orders where status contains the word PENDING
1. "Map" through the orders RDD and kept only the orders whose status contains PENDING
2. Printed the first matching order

```python
pendingInOrders = ordersRDD.map(lambda x: x.split(",")).filter(lambda x: "PENDING" in x[3])
for i in pendingInOrders.take(1): print(i)
```
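The `in` operator performs a substring test, so this filter matches every status that contains the word, not only an exact `PENDING`. A pure-Python sketch on made-up values; `PENDING_PAYMENT` is assumed here only to illustrate the difference.

```python
# Made-up status values; PENDING_PAYMENT is assumed only to show the difference
statuses = ["PENDING", "PENDING_PAYMENT", "COMPLETE"]

exact_match = [s for s in statuses if s == "PENDING"]
substring_match = [s for s in statuses if "PENDING" in s]

print(exact_match)  # ['PENDING']
print(substring_match)  # ['PENDING', 'PENDING_PAYMENT']
```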

### Get all the orders where order_id > 100 or order_status is in one of the pending states
1. "Map" through the orders RDD and kept the orders with order ID > 100 or a status containing PENDING
2. Printed the first matching order

```python
filteredRDD = ordersRDD.map(lambda x: x.split(",")).filter(lambda x: int(x[0]) > 100 or "PENDING" in x[3])
for i in filteredRDD.take(1): print(i)
```
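The `int()` conversion in the predicate matters: the split fields are strings, and string comparison is lexicographic, so `"99" > "100"` is `True`. A pure-Python sketch of the combined predicate on made-up rows; `keep_order` is a hypothetical helper name.

```python
def keep_order(row):
    """True when order_id > 100 or the status contains PENDING (hypothetical helper)."""
    return int(row[0]) > 100 or "PENDING" in row[3]

# Made-up parsed orders.csv rows
rows = [
    ["5", "2013-07-25 00:00:00.0", "11318", "COMPLETE"],  # neither condition
    ["8", "2013-07-25 00:00:00.0", "2911", "PENDING"],  # status matches
    ["101", "2013-07-25 00:00:00.0", "5116", "CLOSED"],  # ID matches
]

filtered = [row for row in rows if keep_order(row)]
print([row[0] for row in filtered])  # ['8', '101']
```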

### Check if there are cancelled orders with an amount greater than $1000
1. "Map" through the orders RDD, kept the orders with CANCELED status and stored (order ID, order status)
2. "Map" through the order items RDD and stored (order ID, subtotal)
3. Summed the subtotals per order ID and kept the orders whose total amount is greater than $1000
4. Joined the cancelled orders with these high-value orders on order ID
5. Printed the first matching order

```python
# (order_id, order_status) for cancelled orders
cancelledOrders = ordersRDD.map(lambda x: x.split(",")).filter(lambda x: x[3] == "CANCELED").map(lambda x: (x[0], x[3]))
# (order_id, order_total) for orders whose total exceeds 1000
itemGreater = (orderItemsRDD.map(lambda x: x.split(","))
               .map(lambda x: (x[1], float(x[4])))
               .reduceByKey(lambda acc, value: acc + value)
               .filter(lambda x: x[1] > 1000))
answerRDD = cancelledOrders.join(itemGreater)
for i in answerRDD.take(1): print(i)
```
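The inner join acts as an intersection: an order survives only if it is both cancelled and above the threshold. A pure-Python sketch of the same three steps on made-up data; order "9" is high value but not cancelled, so it is dropped by the join.

```python
from collections import defaultdict

# Made-up (order_id, order_status) pairs for CANCELED orders
cancelled_orders = [("4", "CANCELED"), ("7", "CANCELED")]
# Made-up (order_id, subtotal) pairs from order_items
order_items = [("4", 600.0), ("4", 500.0), ("7", 300.0), ("9", 1500.0)]

# reduceByKey: total amount per order, then keep totals above 1000
totals = defaultdict(float)
for order_id, subtotal in order_items:
    totals[order_id] += subtotal
high_value = {oid: total for oid, total in totals.items() if total > 1000}

# join: keep only cancelled orders that are also high value
answer = [(oid, (status, high_value[oid])) for oid, status in cancelled_orders if oid in high_value]
print(answer)  # [('4', ('CANCELED', 1100.0))]
```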


### Find the top 10 maximum priced products
1. "Map" through the products RDD and created (product price, product row) pairs
2. Printed the 10 most expensive products using the top action

```python
# (product_price, product_row)
productsSorted = productsRDD.map(lambda x: (float(x.split(",")[3]), x))
# top returns the largest elements in descending order, so no sortByKey is needed
for i in productsSorted.top(10): print(i)
```
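`top(n)` behaves like Python's `heapq.nlargest`: it returns the n largest elements, largest first. Because tuples compare element by element, the (price, row) pairs are ordered by price, with the row string only breaking ties. A pure-Python sketch on made-up rows. PySpark's `top` also accepts a `key` argument, e.g. `productsRDD.top(10, key=lambda x: float(x.split(",")[3]))`, which avoids building the pairs.

```python
import heapq

# Made-up (price, product_row) pairs, as built by the map step above
products_by_price = [
    (59.98, "1,2,Product A,59.98,http://example.com/a.jpg"),
    (129.99, "2,2,Product B,129.99,http://example.com/b.jpg"),
    (89.99, "3,3,Product C,89.99,http://example.com/c.jpg"),
]

# Largest n elements in descending order, analogous to top(2)
top_two = heapq.nlargest(2, products_by_price)
for price, row in top_two:
    print(price, row)
```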

### Sort products by price within each category
1. "Map" through the products RDD, stored (category ID, product price) pairs and grouped them by category ID
2. Sorted the prices in each category in descending order and printed the first two categories

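```python
# (category_id, product_price) pairs grouped by category ID
productsParsed = productsRDD.map(lambda x: x.split(",")).map(lambda x: (x[1], float(x[3]))).groupByKey()
# Sort the prices within each category from highest to lowest
sortedProducts = productsParsed.mapValues(lambda prices: sorted(prices, reverse=True))
for i in sortedProducts.take(2): print(i)
```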
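A pure-Python sketch of the group-then-sort step on made-up (category ID, price) pairs. Note that the cell keeps only the prices, so the output no longer says which product each price belongs to; keeping the whole row as the value and sorting with `key=lambda rec: float(rec[3])` would preserve that, at the cost of holding full rows per category in memory.

```python
from collections import defaultdict

# Made-up (category_id, price) pairs, as built by the map step above
category_prices = [("2", 59.98), ("3", 89.99), ("2", 129.99), ("3", 19.99)]

# groupByKey: collect prices per category
grouped = defaultdict(list)
for category_id, price in category_prices:
    grouped[category_id].append(price)

# mapValues(sorted, reverse=True): highest price first within each category
sorted_products = {cid: sorted(prices, reverse=True) for cid, prices in grouped.items()}
print(sorted_products)  # {'2': [129.99, 59.98], '3': [89.99, 19.99]}
```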