# Step 1: Data Loading

In [1]:
val aislesDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/aisles.csv")
val departmentsDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/departments.csv")
val productsDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/products.csv")
val ordersDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/orders.csv")

val priorsDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/order_products__prior.csv")
val trainDF = spark.read.option("header", "true").option("inferSchema", "true").csv("instacart/order_products__train.csv")
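
The six reads above repeat the same option chain. A minimal sketch of a shared helper follows; `readCsv` is a hypothetical name that is not part of the original notebook, and it assumes the same `header` and `inferSchema` settings are wanted for every file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: wraps the reader options used in the cell above
// so that every file is loaded with identical settings.
def readCsv(spark: SparkSession, path: String): DataFrame =
  spark.read
    .option("header", "true")      // first line holds the column names
    .option("inferSchema", "true") // Spark scans the data to infer column types
    .csv(path)
```

With this in place a load would read, for example, `val aislesDF = readCsv(spark, "instacart/aisles.csv")`.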

In [2]:
z.show(aislesDF)
z.show(departmentsDF)
z.show(productsDF)
z.show(ordersDF)

z.show(priorsDF)
z.show(trainDF)

# Step 2: Data Profiling

1. __Aisles__: Contains a one-to-one mapping between a unique Aisle ID and the Aisle Name
2. __Departments__: Contains a one-to-one mapping between a unique Department ID and the Department Name
3. __Products__: Contains a one-to-one mapping between a unique Product ID and the Product Name. Each Product also has an Aisle ID and a Department ID

The Aisle Name and Department Name act as two categorization fields for each Product

In [4]:
println("Aisles DF")
aislesDF.printSchema()

println("Departments DF")
departmentsDF.printSchema()

println("Products DF")
productsDF.printSchema()

4. __Orders__: Contains information about each order. Fields include:
    - *User ID*: Unique ID of an Instacart user
    - *Order Number*: The order's sequence number for that user
    - *Order DOW*: The day of the week on which the order was placed
    - *Order Hour of Day*: The hour of the day at which the order was placed
    - *Days Since Prior Order*: The number of days between this order and the user's previous order

In [6]:
println("Orders DF")
ordersDF.printSchema()

5. __Priors__: Contains Order History of every user
6. __Train__: Training Data provided by Instacart

Both datasets have the same columns:

- *Order ID*: Unique ID of the order
- *Product ID*: ID of a product purchased in the order
- *Add to cart order*: The position in which the product was added to the cart
- *Reordered*: Whether the product is a reorder (0/1)

In [8]:
println("Priors DF")
priorsDF.printSchema()

println("Train DF")
trainDF.printSchema()

### 1. Dataset Size

Count the rows in each dataset and check whether every order ID in `ordersDF` is unique

In [10]:
val aislesCount = aislesDF.count()
val departmentsCount = departmentsDF.count()
val productsCount = productsDF.count()
val ordersCount = ordersDF.count()

val priorsCount = priorsDF.count()
val trainCount = trainDF.count()

In [11]:
println(s"Number of aisles: ${aislesCount}")
println(s"Number of departments: ${departmentsCount}")
println(s"Number of products: ${productsCount}")
println(s"Number of orders: ${ordersCount}")

println(s"Number of order history records: ${priorsCount}")
println(s"Number of training order records: ${trainCount}")

In [12]:
val uniqueOrderCount = ordersDF.select("order_id").distinct().count()

In [13]:
println(s"Are all orders unique? ${ordersCount == uniqueOrderCount}")
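
If the check above ever printed `false`, the next question would be which order IDs repeat. A minimal sketch of how that could be answered; `duplicateKeys` and the `occurrences` column are hypothetical names introduced here for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, lit}

// Hypothetical helper: returns each value of `keyColumn` that appears
// more than once, together with how many times it appears.
def duplicateKeys(df: DataFrame, keyColumn: String): DataFrame =
  df.groupBy(col(keyColumn))
    .agg(count(lit(1)).as("occurrences"))
    .filter(col("occurrences") > 1)
```

In this notebook it could be called as `z.show(duplicateKeys(ordersDF, "order_id"))`; an empty result agrees with the uniqueness check.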

### 2. Null Checks

Check every column of each dataset for null values

In [15]:
val nullAisles = aislesDF.filter($"aisle_id".isNull || $"aisle".isNull).count()
val nullDepartments = departmentsDF.filter($"department_id".isNull || $"department".isNull).count()
val nullProducts = productsDF.filter($"product_id".isNull || $"product_name".isNull || $"aisle_id".isNull || $"department_id".isNull).count()

The __Orders__ DataFrame is expected to contain nulls in `days_since_prior_order` for orders that have no previous order (`days_since_prior_order = null`)

In [17]:
val firstTimeOrders = ordersDF.filter($"days_since_prior_order".isNull).count

In [18]:
// Exclude the column that is allowed to be null, then count rows with a null in any other column
val orderColumns = ordersDF.columns.filter(_ != "days_since_prior_order")
val nullOrders = ordersDF.filter(orderColumns.map(c => col(c).isNull).reduce(_ || _)).count

In [19]:
val nullPriors = priorsDF.filter(priorsDF.columns.map(c => col(c).isNull).reduce(_ || _)).count
val nullTrains = trainDF.filter(trainDF.columns.map(c => col(c).isNull).reduce(_ || _)).count
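
The cells above return one number per dataset: the count of rows with a null in any checked column. When that number is non-zero it helps to see which column is responsible. A minimal sketch, assuming column names without dots or other special characters; `nullCountsPerColumn` is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum, when}

// Hypothetical helper: produces a single row holding, for every column,
// the number of null values in that column (computed in one pass).
def nullCountsPerColumn(df: DataFrame): DataFrame =
  df.select(
    df.columns.map(c => sum(when(col(c).isNull, 1).otherwise(0)).as(c)): _*
  )
```

For example, `z.show(nullCountsPerColumn(ordersDF))` should report nulls only under `days_since_prior_order` if the checks above hold.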

### 3. Column Statistics

Simple summary statistics for each column of the __Orders__ DataFrame

In [21]:
z.show(ordersDF.summary())

# Step 3: Data Preprocessing

The data from the different sources needs to be preprocessed and merged before further profiling

### 1. Join the smaller tables to get Product Details

Create the `productDetailsDF` by joining `productsDF`, `aislesDF` and `departmentsDF`

We notice that one of the products does not have a corresponding Aisle and Department name

In [24]:
// Left outer joins keep every product, even when no aisle or department matches
val productAislesDF = productsDF.join(broadcast(aislesDF), Seq("aisle_id"), "left_outer")
val productDetailsDF = productAislesDF.join(broadcast(departmentsDF), Seq("department_id"), "left_outer")

In [25]:
z.show(productDetailsDF.filter($"aisle".isNull || $"department".isNull))
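
The filter above finds unmatched products after the join has run. The same question can be asked of the inputs directly with an anti join, which keeps only the left-hand rows that have no partner on the right. A minimal sketch; `unmatchedRows` is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: returns the rows of `left` whose `key` value has no
// match in `right`. Rows whose key is null are returned too, because a null
// key never matches anything.
def unmatchedRows(left: DataFrame, right: DataFrame, key: String): DataFrame =
  left.join(right, Seq(key), "left_anti")
```

Here that would be `z.show(unmatchedRows(productsDF, aislesDF, "aisle_id"))` and the equivalent call with `departmentsDF` and `"department_id"`.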

### 2. Merge the __Prior__ and the __Train__ datasets

Since it does not matter here whether an order_products record comes from `train.csv` or `prior.csv`, we can merge the two datasets into a single larger dataset

In [27]:
val orderProductsDF = priorsDF.union(trainDF)

In [28]:
println(s"Are all rows in the merged dataset distinct? ${orderProductsDF.count == orderProductsDF.distinct.count}")
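
`union` pairs columns by position, not by name, so it is only safe because both files list their columns in the same order. A minimal sketch of a more defensive merge, assuming Spark 2.3 or later (where `unionByName` is available); `unionChecked` is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: fails fast if the two inputs do not share the same
// column names, then unions them by name, not by position.
def unionChecked(a: DataFrame, b: DataFrame): DataFrame = {
  require(
    a.columns.toSet == b.columns.toSet,
    s"Column mismatch: ${a.columns.mkString(",")} vs ${b.columns.mkString(",")}"
  )
  a.unionByName(b)
}
```

The merge above would then read `val orderProductsDF = unionChecked(priorsDF, trainDF)`.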

### 3. Remove nulls from Orders

`ordersDF` has null values in the `days_since_prior_order` column for orders with no previous order. These can be filled with -1, a sentinel that cannot occur as a real day count.

In [30]:
val updatedOrdersDF = ordersDF.na.fill(Map("days_since_prior_order" -> -1))
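
Once the nulls are replaced, the -1 sentinel takes part in any later average or minimum over `days_since_prior_order`. One option is to record the fact in a separate column before filling. A minimal sketch under that assumption; `flagAndFillFirstOrders` and the `is_first_order` column are hypothetical names that the original notebook does not use.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: marks orders without a previous order in a boolean
// column, then fills the nulls with the same -1 sentinel used above.
def flagAndFillFirstOrders(orders: DataFrame): DataFrame =
  orders
    .withColumn("is_first_order", col("days_since_prior_order").isNull)
    .na.fill(Map("days_since_prior_order" -> -1))
```

Later statistics could then filter on `is_first_order` instead of testing for the value -1.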

### 4. Join the remaining tables

Join `updatedOrdersDF` and `productDetailsDF` onto `orderProductsDF` to form a single denormalized dataset

In [32]:
val joinedDF = orderProductsDF.join(updatedOrdersDF, Seq("order_id"), "left_outer").join(productDetailsDF, Seq("product_id"), "left_outer")

joinedDF.cache()

In [33]:
z.show(joinedDF)

### 5. Drop Null

Drop every row that still contains a null value in any column

In [35]:
val nonNullDF = joinedDF.na.drop()

### 6. Drop ID columns

Drop the `eval_set` column and all the `id` columns

In [37]:
val columns = nonNullDF.columns.filter(c => c.endsWith("_id") || c.equals("eval_set"))

In [38]:
val cleanedDF = nonNullDF.drop(columns: _*)

In [39]:
z.show(cleanedDF)

### 7. Save the cleaned dataframe

Save `cleanedDF` in parquet format

In [41]:
val outputPath = "instacart/instacart-clean.parquet"

cleanedDF.write.mode("overwrite").parquet(outputPath)
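
After a write it can be worth confirming that the file reads back as expected. A minimal sketch of such a check, comparing only column names and row counts (not the cell values); `roundTripMatches` is a hypothetical helper name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: reloads the Parquet output and compares its column
// names and row count with the DataFrame that was written.
def roundTripMatches(spark: SparkSession, written: DataFrame, path: String): Boolean = {
  val reloaded = spark.read.parquet(path)
  reloaded.columns.toSet == written.columns.toSet &&
    reloaded.count() == written.count()
}
```

In this notebook the call would be `roundTripMatches(spark, cleanedDF, outputPath)`.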

# Step 4: Basket Analysis/Profiling

### 1. Most popular products

In [44]:
val orderCountsDF = cleanedDF.groupBy("product_name").agg(count("product_name") as "order_count").orderBy(desc("order_count"))

In [45]:
z.show(orderCountsDF)

### 2. Peak Order Hour

In [47]:
val orderHourDF = cleanedDF.groupBy("order_hour_of_day").agg(count("order_hour_of_day") as "order_count").orderBy(desc("order_count"))

In [48]:
z.show(orderHourDF)

### 3. Peak Order Day

In [50]:
val orderDayDF = cleanedDF.groupBy("order_dow").agg(count("order_dow") as "order_count").orderBy(desc("order_count"))

In [51]:
z.show(orderDayDF)
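
The three cells in this step share one pattern: group by a column, count, and sort in descending order. A minimal sketch of a shared helper follows; `countBy` is a hypothetical name. Note that `cleanedDF` holds one row per product in an order, so these counts measure products purchased, not distinct orders. Applying the same helper to `ordersDF` would count orders instead.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, desc}

// Hypothetical helper: counts the non-null values of `column` per group
// and sorts the groups from most to least frequent.
def countBy(df: DataFrame, column: String): DataFrame =
  df.groupBy(col(column))
    .agg(count(col(column)).as("order_count"))
    .orderBy(desc("order_count"))
```

For example, `z.show(countBy(cleanedDF, "order_hour_of_day"))` reproduces the peak-hour cell, and `z.show(countBy(ordersDF, "order_hour_of_day"))` gives the per-order view.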