# SUMMARY
---

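## Purpose
This notebook does the following:
- Loads the InstaCart CSV files (aisles, departments, orders, products, and the prior/train order-product tables) into Spark DataFrames.
- Inspects each table: schema, head, unique values and descriptive statistics.
- Joins and pivots the tables into analysis tables, writing the combined store and order tables to parquet.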

# SETUP
---

## import packages

In [1]:
import findspark
import pyspark
import numpy as np

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import isnan, when, count, col

## sparkcontext | sqlcontext

In [2]:
# Create (or reuse) the SparkSession, the unified entry point for DataFrame and SQL
# functionality; it wraps the underlying SparkContext (available as spark.sparkContext).
spark = SparkSession.builder.appName('InstaCart').getOrCreate()

# Create an SQLContext bound to the same session; later cells run queries via sql.sql().
# SQLContext takes the SparkContext as its first argument (and optionally the session),
# so pass spark.sparkContext rather than the SparkSession object itself.
sql = SQLContext(spark.sparkContext, spark)

## define constants

In [3]:
# notebook-wide flags:
#   DEBUG         -> when True, the pivot cells show intermediate results / spot checks
#   WRITE_TO_FILE -> when True, the joined tables are written to parquet
DEBUG, WRITE_TO_FILE = False, True
print('--- constants initialized ---')

--- constants initialized ---


## load data

### define schemas

In [4]:
# Define an explicit schema for each file so column types are applied on read.
# NOTE: spark.read.csv treats every field of a user-supplied schema as nullable
# (see the printSchema outputs below, all `nullable = true`), so the nullable flags
# here document intent rather than being enforced.

# define schema for aisles.csv
schema_aisles = StructType([StructField('aisle_id', IntegerType(), False), 
                            StructField('aisle', StringType(), False)])
                           
# define schema for departments.csv
schema_departments = StructType([StructField('department_id', IntegerType(), False), 
                                 StructField('department', StringType(), False)])
                          
# define schema for orders.csv
# days_since_prior_order is NULL for a user's first order, so it is declared nullable
schema_orders = StructType([StructField('order_id', IntegerType(), False), 
                            StructField('user_id', IntegerType(), False), 
                            StructField('eval_set', StringType(), False), 
                            StructField('order_number', IntegerType(), False), 
                            StructField('order_dow', IntegerType(), False), 
                            StructField('order_hour_of_day', IntegerType(), False), 
                            StructField('days_since_prior_order', FloatType(), True)])
                           
# define schema for products.csv
schema_products = StructType([StructField('product_id', IntegerType(), False), 
                              StructField('product_name', StringType(), False), 
                              StructField('aisle_id', IntegerType(), False), 
                              StructField('department_id', IntegerType(), False)])
                             
# define schema shared by order_products__prior.csv / order_products__train.csv
schema_order_products = StructType([StructField('order_id', IntegerType(), False), 
                                    StructField('product_id', IntegerType(), False), 
                                    StructField('add_to_cart_order', IntegerType(), False), 
                                    StructField('reordered', IntegerType(), False)])

### create spark dataframes

In [5]:
# read each CSV with its explicit schema; every file has a header row
csv_options = dict(header=True)
df_aisles = spark.read.csv('aisles.csv', schema=schema_aisles, **csv_options)
df_departments = spark.read.csv('departments.csv', schema=schema_departments, **csv_options)
df_orders = spark.read.csv('orders.csv', schema=schema_orders, **csv_options)
df_products = spark.read.csv('products.csv', schema=schema_products, **csv_options)
# prior and train order-product files share one schema
df_order_products = spark.read.csv('order_products__prior.csv', schema=schema_order_products, **csv_options)
df_order_products2 = spark.read.csv('order_products__train.csv', schema=schema_order_products, **csv_options)

# DATA INSPECTION
---

## aisles

### aisles schema

In [6]:
print('--- aisles: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_aisles.printSchema()

--- aisles: schema ---
root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)

None


### aisles head

In [63]:
print('--- aisles: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_aisles.show(5)

--- aisles: head ---
+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
|       3| energy granola bars|
|       4|       instant foods|
|       5|marinades meat pr...|
+--------+--------------------+
only showing top 5 rows

None


### aisles statistics

In [8]:
# collect the distinct aisle ids once and reuse them for both the count and the listing
aisle_ids = sorted(int(row['aisle_id']) for row in df_aisles.select('aisle_id').distinct().collect())
print('--- aisles: unique aisle_id ---')
print('num unique:', len(aisle_ids))
print('unique values:', aisle_ids)

--- aisles: unique aisle_id ---
num unique: 134
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134]


### aisles summary
- There are 134 unique aisle_id values, numbered from 1 to 134.

## departments

### departments schema

In [9]:
print('--- departments: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_departments.printSchema()

--- departments: schema ---
root
 |-- department_id: integer (nullable = true)
 |-- department: string (nullable = true)

None


### departments head

In [10]:
print('--- departments: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_departments.show(5)

--- departments: head ---
+-------------+----------+
|department_id|department|
+-------------+----------+
|            1|    frozen|
|            2|     other|
|            3|    bakery|
|            4|   produce|
|            5|   alcohol|
+-------------+----------+
only showing top 5 rows

None


### departments statistics

In [11]:
# collect the distinct department ids once and reuse them for both the count and the listing
department_ids = sorted(int(row['department_id']) for row in df_departments.select('department_id').distinct().collect())
print('--- departments: unique department_id ---')
print('num unique:', len(department_ids))
print('unique values:', department_ids)

--- department: unique department_id ---
num unique: 21
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]


### departments summary
- There are 21 unique department_id values, numbered from 1 to 21.

## orders

### orders schema

In [12]:
print('--- orders: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_orders.printSchema()

--- orders: schema ---
root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: float (nullable = true)

None


### orders head

In [13]:
print('--- orders: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_orders.show(5)

--- orders: head ---
+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|                8|                  null|
| 2398795|      1|   prior|           2|        3|                7|                  15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|
| 2254736|      1|   prior|           4|        4|                7|                  29.0|
|  431534|      1|   prior|           5|        4|               15|                  28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing top 5 rows

None


### orders statistics

In [14]:
print('--- orders: unique order_id ---')
# count on the cluster instead of collecting ~3.4M ids to the driver just to take len()
print('num unique:', df_orders.select('order_id').distinct().count())

--- orders: unique order_id ---
num unique: 3421083


In [15]:
print('--- orders: unique user_id ---')
# count on the cluster instead of collecting every distinct id to the driver
print('num unique:', df_orders.select('user_id').distinct().count())

--- orders: unique user_id ---
num unique: 206209


In [16]:
# collect the distinct order numbers once and reuse them for both the count and the listing
order_numbers = sorted(int(row['order_number']) for row in df_orders.select('order_number').distinct().collect())
print('--- orders: unique order_number ---')
print('num unique:', len(order_numbers))
print('unique values:', order_numbers)

--- orders: unique order_number ---
num unique: 100
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]


In [17]:
# A user's latest order is their highest order_number. Run the aggregation and collect
# once, then compute every statistic from the same array (the original ran it 4 times).
latest_order = np.array(
    df_orders.groupBy('user_id').max('order_number').select('max(order_number)').collect()
)
print('--- descriptive statistics by user_id latest order ---')
print('min latest order:', np.min(latest_order))
print('mean latest order:', np.mean(latest_order))
print('med latest order:', np.median(latest_order))
print('max latest order:', np.max(latest_order))

--- descriptive statistics by user_id latest order ---
min latest order: 4
mean latest order: 16.590367054784224
med latest order: 10.0
max latest order: 100


In [18]:
print('--- total number of orders per day of week ---')
# groupBy().count() already yields exactly (order_dow, count); order by day number ascending
orders_per_dow = df_orders.groupBy('order_dow').count().orderBy('order_dow')
orders_per_dow.show()

--- total number of orders per day of week ---
+---------+------+
|order_dow| count|
+---------+------+
|        0|600905|
|        1|587478|
|        2|467260|
|        3|436972|
|        4|426339|
|        5|453368|
|        6|448761|
+---------+------+



In [19]:
print('--- total number of orders per hour of the day ---')
# one row per hour, ordered ascending; show up to 30 rows so every hour is visible
orders_per_hour = df_orders.groupBy('order_hour_of_day').count().orderBy('order_hour_of_day')
orders_per_hour.show(30)

--- total number of orders per hour of the day ---
+-----------------+------+
|order_hour_of_day| count|
+-----------------+------+
|                0| 22758|
|                1| 12398|
|                2|  7539|
|                3|  5474|
|                4|  5527|
|                5|  9569|
|                6| 30529|
|                7| 91868|
|                8|178201|
|                9|257812|
|               10|288418|
|               11|284728|
|               12|272841|
|               13|277999|
|               14|283042|
|               15|283639|
|               16|272553|
|               17|228795|
|               18|182912|
|               19|140569|
|               20|104292|
|               21| 78109|
|               22| 61468|
|               23| 40043|
+-----------------+------+



### orders summary
- There are 3,421,083 unique order_id values.
- There are 206,209 unique user_id values.
- There are 100 unique order_number values, numbered from 1 to 100.
- Users placed between 4 and 100 orders (inclusive) with InstaCart, with a mean of 16.6 orders and a median of 10 orders.
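- Day 0 has the greatest number of orders placed (600,905), while day 4 has the smallest (426,339). The dataset does not document which weekday each `order_dow` value denotes; an earlier reading took day 0 as Saturday and day 4 as Wednesday — TODO confirm.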
- The busiest part of the day is from 10:00 am to 05:00 pm: hours 10 through 16 each have more than 270,000 orders.
- The time period of the day with the smallest number of orders placed is before 06:00 am or after 08:00 pm.

## products

### products schema

In [20]:
print('--- products: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_products.printSchema()

--- products: schema ---
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)

None


### products head

In [21]:
print('--- products: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_products.show(5)

--- products: head ---
+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
|         3|Robust Golden Uns...|      94|            7|
|         4|Smart Ones Classi...|      38|            1|
|         5|Green Chile Anyti...|       5|           13|
+----------+--------------------+--------+-------------+
only showing top 5 rows

None


### products statistics

In [22]:
print('--- products: unique product_id ---')
# count on the cluster instead of collecting every distinct id to the driver
print('num unique:', df_products.select('product_id').distinct().count())

--- products: unique product_id ---
num unique: 49688


In [23]:
# Collect the distinct aisle ids once. The count includes a NULL aisle_id (if any) as its
# own value, while the listing omits it so the values can be sorted.
aisle_ids_with_null = [row['aisle_id'] for row in df_products.select('aisle_id').distinct().collect()]
print('--- products: unique aisle_id ---')
print('num unique:', len(aisle_ids_with_null))
print('unique values:', sorted(a for a in aisle_ids_with_null if a is not None))

--- products: unique aisle_id ---
num unique: 135
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134]


In [24]:
# locate the products row(s) whose aisle_id is NULL
null_aisle_rows = df_products.filter(df_products.aisle_id.isNull())
null_aisle_rows.show()

+----------+------------+--------+-------------+
|product_id|product_name|aisle_id|department_id|
+----------+------------+--------+-------------+
|      null|        null|    null|         null|
+----------+------------+--------+-------------+



In [25]:
# drop rows containing any NULL (the all-NULL row found above)
df_products = df_products.na.drop()

# double check results: collect each distinct set once
product_count = df_products.select('product_id').distinct().count()
aisle_ids = sorted(int(row['aisle_id']) for row in df_products.select('aisle_id').distinct().collect())
print('--- products: unique product_id ---')
print('num unique:', product_count)
print('--- products: unique aisle_id ---')
print('num unique:', len(aisle_ids))
print('unique values:', aisle_ids)

--- products: unique product_id ---
num unique: 49687
--- products: unique aisle_id ---
num unique: 134
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134]


In [26]:
# this inspects df_products (not aisles); collect the distinct ids once and reuse them
department_ids = sorted(int(row['department_id']) for row in df_products.select('department_id').distinct().collect())
print('--- products: unique department_id ---')
print('num unique:', len(department_ids))
print('unique values:', department_ids)

--- aisles: unique department_id ---
num unique: 21
unique values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]


### products summary
- There are 49,687 unique product_id values (after dropping one NULL row).
- There are 134 unique aisle_id values, numbered from 1 to 134 (after dropping one NULL row).
- There are 21 unique department_id values, numbered from 1 to 21 (after dropping one NULL row).

## order_products

### order_products schema

In [27]:
print('--- order_products: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_order_products.printSchema()

--- order_products: schema ---
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)

None


### order_products head

In [28]:
print('--- order_products: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_order_products.show(5)

--- order_products: head ---
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       2|     33120|                1|        1|
|       2|     28985|                2|        1|
|       2|      9327|                3|        0|
|       2|     45918|                4|        1|
|       2|     30035|                5|        0|
+--------+----------+-----------------+---------+
only showing top 5 rows

None


### order_products statistics

In [29]:
print('--- order_products: unique order_id ---')
# count on the cluster instead of collecting ~3.2M ids to the driver
print('num unique:', df_order_products.select('order_id').distinct().count())

--- order_products: unique order_id ---
num unique: 3214874


In [30]:
print('--- order_products: unique product_id ---')
# count on the cluster instead of collecting every distinct id to the driver
print('num unique:', df_order_products.select('product_id').distinct().count())

--- order_products: unique product_id ---
num unique: 49677


In [31]:
# Items per order, taken as the highest add_to_cart_order in the order (assumes the
# positions are numbered 1..n without gaps — TODO confirm). Aggregate and collect once.
items_in_cart = np.array(
    df_order_products.groupBy('order_id').max('add_to_cart_order').select('max(add_to_cart_order)').collect()
)
print('--- descriptive statistics by number of items in cart ---')
print('min num items:', np.min(items_in_cart))
print('mean num items:', np.mean(items_in_cart))
print('med num items:', np.median(items_in_cart))
print('max num items:', np.max(items_in_cart))

--- descriptive statistics by number of items in cart ---
min num items: 1
mean num items: 10.088883421247614
med num items: 8.0
max num items: 145


In [32]:
# Re-ordered items per order = sum of the 0/1 `reordered` flag; aggregate and collect once.
items_reordered = np.array(
    df_order_products.groupBy('order_id').sum('reordered').select('sum(reordered)').collect()
)
print('--- descriptive statistics by number of items reordered ---')
print('min num reordered:', np.min(items_reordered))
print('mean num reordered:', np.mean(items_reordered))
print('med num reordered:', np.median(items_reordered))
print('max num reordered:', np.max(items_reordered))

--- descriptive statistics by number of items reordered ---
min latest order: 0
mean latest order: 5.949388996271704
med latest order: 4.0
max latest order: 130


### order_products summary
- There are 3,214,874 unique order_id values (there were 3,421,083 in orders.csv)
- There are 49,677 unique product_id values (there were 49,687 in products.csv)
- Orders contained between 1 and 145 items (inclusive), with a mean of 10.1 items and a median of 8 items per order.
- Orders contained between 0 and 130 re-ordered items (inclusive), with a mean of 5.9 items and a median of 4 items per order.

## order_products2

### order_products2 schema

In [33]:
print('--- order_products2: schema ---')
# printSchema() prints the tree itself and returns None; wrapping it in print() adds a stray "None"
df_order_products2.printSchema()

--- order_products: schema ---
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)

None


### order_products2 head

In [34]:
print('--- order_products2: head ---')
# show() prints the table itself and returns None, so it is not wrapped in print()
df_order_products2.show(5)

--- order_products: head ---
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
|       1|     10246|                3|        0|
|       1|     49683|                4|        0|
|       1|     43633|                5|        1|
+--------+----------+-----------------+---------+
only showing top 5 rows

None


### order_products2 statistics

In [35]:
print('--- order_products2: unique order_id ---')
# count on the cluster instead of collecting every distinct id to the driver
print('num unique:', df_order_products2.select('order_id').distinct().count())

--- order_products: unique order_id ---
num unique: 131209


In [36]:
print('--- order_products2: unique product_id ---')
# count on the cluster instead of collecting every distinct id to the driver
print('num unique:', df_order_products2.select('product_id').distinct().count())

--- order_products: unique product_id ---
num unique: 39123


In [37]:
# Items per order, taken as the highest add_to_cart_order in the order (assumes the
# positions are numbered 1..n without gaps — TODO confirm). Aggregate and collect once.
items_in_cart2 = np.array(
    df_order_products2.groupBy('order_id').max('add_to_cart_order').select('max(add_to_cart_order)').collect()
)
print('--- descriptive statistics by number of items in cart ---')
print('min num items:', np.min(items_in_cart2))
print('mean num items:', np.mean(items_in_cart2))
print('med num items:', np.median(items_in_cart2))
print('max num items:', np.max(items_in_cart2))

--- descriptive statistics by number of items in cart ---
min num items: 1
mean num items: 10.552759338155157
med num items: 9.0
max num items: 80


In [38]:
# Re-ordered items per order = sum of the 0/1 `reordered` flag; aggregate and collect once.
items_reordered2 = np.array(
    df_order_products2.groupBy('order_id').sum('reordered').select('sum(reordered)').collect()
)
print('--- descriptive statistics by number of items reordered ---')
print('min num reordered:', np.min(items_reordered2))
print('mean num reordered:', np.mean(items_reordered2))
print('med num reordered:', np.median(items_reordered2))
print('max num reordered:', np.max(items_reordered2))

--- descriptive statistics by number of items reordered ---
min latest order: 0
mean latest order: 6.316822778925226
med latest order: 5.0
max latest order: 71


### order_products2 summary
- There are 131,209 unique order_id values (there were 3,214,874 in order_products__prior.csv and 3,421,083 in orders.csv)
- There are 39,123 unique product_id values (there were 49,677 in order_products__prior.csv and 49,687 in products.csv)
- Orders contained between 1 and 80 items (inclusive), with a mean of 10.6 items and a median of 9 items per order.
- Orders contained between 0 and 71 re-ordered items (inclusive), with a mean of 6.3 items and a median of 5 items per order.

---
# DATA TRANSFORMATION
---

## merge aisle | department | products

### join code

In [39]:
# Register views for the SQL joins. createOrReplaceTempView replaces the deprecated
# registerTempTable and can be re-run safely.
df_products.createOrReplaceTempView('prod')
df_aisles.createOrReplaceTempView('aisles')

# attach the aisle name to every product (LEFT JOIN keeps products with no matching aisle)
sqlQuery = '''
SELECT prod.*, aisle
FROM prod LEFT JOIN aisles
ON prod.aisle_id = aisles.aisle_id
'''

df_tmp1 = sql.sql(sqlQuery)

# register the intermediate result and the departments table
df_tmp1.createOrReplaceTempView('prod_aisle')
df_departments.createOrReplaceTempView('depts')

# attach the department name (LEFT JOIN keeps products with no matching department)
sqlQuery = '''
SELECT prod_aisle.*, department
FROM prod_aisle LEFT JOIN depts
ON prod_aisle.department_id = depts.department_id
'''

df_store_info = sql.sql(sqlQuery)

print('--- cell execution complete ---')

--- cell execution complete ---


In [40]:
# place each id column directly before its human-readable label
column_order = ['product_id', 'product_name', 'aisle_id', 'aisle', 'department_id', 'department']
df_store_info = df_store_info.select(*column_order)

print('--- cell execution complete ---')

--- cell execution complete ---


### join result head

In [41]:
df_store_info.show(n=5)  # preview the first five joined rows

+----------+--------------------+--------+--------------------+-------------+----------+
|product_id|        product_name|aisle_id|               aisle|department_id|department|
+----------+--------------------+--------+--------------------+-------------+----------+
|         1|Chocolate Sandwic...|      61|       cookies cakes|           19|    snacks|
|         2|    All-Seasons Salt|     104|   spices seasonings|           13|    pantry|
|         3|Robust Golden Uns...|      94|                 tea|            7| beverages|
|         4|Smart Ones Classi...|      38|        frozen meals|            1|    frozen|
|         5|Green Chile Anyti...|       5|marinades meat pr...|           13|    pantry|
+----------+--------------------+--------+--------------------+-------------+----------+
only showing top 5 rows



### write to file

In [42]:
if WRITE_TO_FILE:
    # overwrite so Restart & Run All does not fail when the output directory already exists
    df_store_info.write.mode('overwrite').parquet('instacart_store_info.parquet')
else:
    print('skipping write to file: instacart_store_info.parquet')

## merge order_products and order_products2

### union code

In [43]:
# Each count() is a full scan, so compute every row count once and reuse it
# (the original ran six scans over ~34M rows).
prior_rows = df_order_products.count()
train_rows = df_order_products2.count()

# print shape of individual dataframes
print('--- df_order_products shape ---')
print(prior_rows, len(df_order_products.columns), '\n')
print('--- df_order_products2 shape ---')
print(train_rows, len(df_order_products2.columns), '\n')

# stack dataframes; union matches columns by position, which is safe here because
# both frames were read with the same schema_order_products
df_order_products_union = df_order_products.union(df_order_products2)
union_rows = df_order_products_union.count()

# print shape of resulting dataframe
print('--- df_order_products_union shape ---')
print(union_rows, len(df_order_products_union.columns), '\n')

# sanity check: union must neither drop nor duplicate rows
print('--- sanity check row count ---')
print(prior_rows + train_rows == union_rows)



--- df_order_products shape ---
32434489 4 

--- df_order_products2 shape ---
1384617 4 

--- df_order_products_union shape ---
33819106 4 

--- sanity check row count ---
True


### union result head

In [44]:
df_order_products_union.show(n=5)  # preview the first five stacked rows

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       2|     33120|                1|        1|
|       2|     28985|                2|        1|
|       2|      9327|                3|        0|
|       2|     45918|                4|        1|
|       2|     30035|                5|        0|
+--------+----------+-----------------+---------+
only showing top 5 rows



## merge department_id into order_products_union

### join code

In [45]:
# Register views for the SQL join. createOrReplaceTempView replaces the deprecated
# registerTempTable and can be re-run safely.
df_order_products_union.createOrReplaceTempView('opu')
df_products.createOrReplaceTempView('prods')

# LEFT JOIN keeps every order line; a product_id absent from df_products gets a NULL department_id
sqlQuery = '''
SELECT order_id, opu.product_id, department_id, add_to_cart_order, reordered
FROM opu LEFT JOIN prods
ON opu.product_id = prods.product_id
'''

# execute sql query and assign to new dataframe
df_order_products_deptid = sql.sql(sqlQuery)

print('--- cell execution complete ---')

--- cell execution complete ---


### join result head

In [46]:
df_order_products_deptid.show(n=5)  # preview the first five rows with department_id attached

+--------+----------+-------------+-----------------+---------+
|order_id|product_id|department_id|add_to_cart_order|reordered|
+--------+----------+-------------+-----------------+---------+
|       2|     33120|           16|                1|        1|
|       2|     28985|            4|                2|        1|
|       2|      9327|           13|                3|        0|
|       2|     45918|           13|                4|        1|
|       2|     30035|           13|                5|        0|
+--------+----------+-------------+-----------------+---------+
only showing top 5 rows



## merge department and product info into order_products_deptid

### join code

In [47]:
# Register views for the SQL joins. createOrReplaceTempView replaces the deprecated
# registerTempTable and can be re-run safely.
df_order_products_deptid.createOrReplaceTempView('opd')
df_products.createOrReplaceTempView('prods')

# attach product_name to every order line (LEFT JOIN keeps unmatched lines)
sqlQuery = '''
SELECT order_id, opd.product_id, product_name, opd.department_id, add_to_cart_order, reordered
FROM opd LEFT JOIN prods
ON opd.product_id = prods.product_id
'''

# execute sql query and assign to new dataframe
df_tmp = sql.sql(sqlQuery)

# register the intermediate result and the departments table
df_tmp.createOrReplaceTempView('tmp')
df_departments.createOrReplaceTempView('depts')

# attach the department name (LEFT JOIN keeps lines with a NULL department_id)
sqlQuery = '''
SELECT order_id, product_id, product_name, tmp.department_id, department, add_to_cart_order, reordered
FROM tmp LEFT JOIN depts
ON tmp.department_id = depts.department_id
'''

df_order_info = sql.sql(sqlQuery)

print('--- cell execution complete ---')

--- cell execution complete ---


### join results head

In [48]:
df_order_info.show(n=5)  # preview the first five fully joined order lines

+--------+----------+--------------------+-------------+----------+-----------------+---------+
|order_id|product_id|        product_name|department_id|department|add_to_cart_order|reordered|
+--------+----------+--------------------+-------------+----------+-----------------+---------+
|       2|     33120|  Organic Egg Whites|           16|dairy eggs|                1|        1|
|       2|     28985|Michigan Organic ...|            4|   produce|                2|        1|
|       2|      9327|       Garlic Powder|           13|    pantry|                3|        0|
|       2|     45918|      Coconut Butter|           13|    pantry|                4|        1|
|       2|     30035|   Natural Sweetener|           13|    pantry|                5|        0|
+--------+----------+--------------------+-------------+----------+-----------------+---------+
only showing top 5 rows



### write to file

In [60]:
if WRITE_TO_FILE:
    # overwrite so Restart & Run All does not fail when the output directory already exists
    df_order_info.write.mode('overwrite').parquet('instacart_order_info.parquet')
else:
    print('skipping write to file: instacart_order_info.parquet')

## pivot order_products_deptid

### pivot code

In [50]:
# function to rename pivoted columns, adds a prefix
def rename_pivot_cols(columns, prefix='prefix'):
    """Return a new list of column names with ``prefix`` prepended to each one.

    Parameters
    ----------
    columns : iterable
        Column names to rename, e.g. the value columns produced by a Spark pivot.
        Non-string values are formatted with ``str``.
    prefix : str, default 'prefix'
        Text prepended to every name.

    Returns
    -------
    list of str
        The prefixed names, in the same order as ``columns``.
    """
    return [f'{prefix}{c}' for c in columns]

print('--- rename_pivot_cols() function defined ---')

--- rename_pivot_cols() function defined ---


In [51]:
# Reshape the order/product rows into one row per order_id:
#   df_tmp1 -> one count column per distinct department_id value
#   df_tmp2 -> one count column per distinct reordered value
df_tmp1 = (df_order_products_deptid
           .groupBy('order_id')
           .pivot('department_id')
           .count())
df_tmp2 = (df_order_products_deptid
           .groupBy('order_id')
           .pivot('reordered')
           .count())

if not DEBUG:
    print('--- cell execution complete ---')
else:
    # show a few rows of each pivot for inspection
    print('--- pivot results: pivot on department_id ---')
    df_tmp1.show(5)
    print('--- pivot results: pivot on reordered ---')
    df_tmp2.show(5)

--- cell execution complete ---


In [52]:
if not DEBUG:
    print('--- cell execution skipped ---')
else:
    # spot-check a handful of orders against the pivot counts shown above
    for sample_order_id in (1645, 19984, 210661):
        df_order_products_deptid.filter(df_order_products_deptid.order_id == sample_order_id).show()

--- cell execution skipped ---


In [53]:
# Rename pivoted columns for the tmp1 dataframe.
# Spark names each pivot column after its pivot value, and a null department_id yields a
# column literally named 'null'. Rename from the actual names, in their existing order,
# instead of assuming position 1 is 'null'. With that assumption, every d-label shifts
# silently if no null department is present. toDF renames positionally, so order is preserved.
new_names = ['order_id'] + [c if c == 'null' else f'd{c}' for c in df_tmp1.columns[1:]]
df_tmp1 = df_tmp1.toDF(*new_names)

# Rename pivoted columns for the tmp2 dataframe from the real pivot values, rather than
# hard-coding ['0', '1']. The hard-coded list makes toDF fail on any other value set.
new_names = ['order_id'] + rename_pivot_cols(df_tmp2.columns[1:], 'reord')
df_tmp2 = df_tmp2.toDF(*new_names)

# print results
print('--- tmp1 renamed columns ---')
print(df_tmp1.columns)
print('--- tmp2 renamed columns')
print(df_tmp2.columns)

--- tmp1 renamed columns ---
['order_id', 'null', 'd1', 'd2', 'd3', 'd4', 'd5', 'd6', 'd7', 'd8', 'd9', 'd10', 'd11', 'd12', 'd13', 'd14', 'd15', 'd16', 'd17', 'd18', 'd19', 'd20', 'd21']
--- tmp2 renamed columns
['order_id', 'reord0', 'reord1']


In [54]:
# Register both pivots as temp views (createOrReplaceTempView replaces the
# deprecated registerTempTable).
df_tmp1.createOrReplaceTempView('tmp1')
df_tmp2.createOrReplaceTempView('tmp2')

# Keep every department-count column from tmp1 and add only the reordered-item count
# (reord1) from tmp2. LEFT JOIN keeps orders missing from tmp2 (reord1 becomes NULL).
sqlQuery = '''
SELECT tmp1.*, tmp2.reord1
FROM tmp1 LEFT JOIN tmp2
ON tmp1.order_id = tmp2.order_id
'''

# execute sql query and assign to new dataframe
df_order_products_pivot = sql.sql(sqlQuery)

print('--- cell execution complete ---')

--- cell execution complete ---


### pivot result head

In [55]:
df_order_products_pivot.show(n=5)

+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+
|order_id|null|  d1|  d2|  d3|  d4|  d5|  d6|  d7|  d8|  d9| d10| d11| d12| d13| d14| d15| d16| d17| d18| d19| d20| d21|reord1|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+
|     148|null|   2|null|null|   4|null|null|null|null|   1|null|   1|null|   1|null|null|   4|null|null|   1|null|null|    10|
|     463|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|   1|null|null|null|null|  null|
|     471|null|null|null|null|   1|null|null|null|null|null|null|null|   1|   2|null|null|   3|null|null|   1|null|null|     1|
|     496|null|null|null|null|   1|null|null|null|null|null|null|null|   1|   1|null|null|null|null|null|null|null|null|     1|
|     833|null|null|null|null|   1|null|null|   2|null|null|null|   1|null|   1|null|null|   1|null|null

## merge order_products_pivot and orders

### join code

In [56]:
# Register the inputs as temp views (createOrReplaceTempView replaces the
# deprecated registerTempTable).
df_order_products_pivot.createOrReplaceTempView('opp')
df_orders.createOrReplaceTempView('orders')

# Take every pivot column except opp.order_id. An order with no row in opp (a LEFT JOIN
# miss) would otherwise get a NULL order_id from `opp.*`, which the later na.fill(0)
# step turns into 0 and loses the id. order_id now comes from the orders side, in the
# same column position that `opp.*` used to place it. Backticks quote names like `null`.
opp_cols = ', '.join(f'opp.`{c}`' for c in df_order_products_pivot.columns if c != 'order_id')

sqlQuery = f'''
SELECT user_id, eval_set, order_number, order_dow, order_hour_of_day AS order_hour,
       orders.order_id, {opp_cols}, days_since_prior_order AS days_elapsed
FROM orders LEFT JOIN opp
ON orders.order_id = opp.order_id
'''

# execute sql query and assign to new dataframe
df_final = sql.sql(sqlQuery)

print('--- cell execution complete ---')

--- cell execution complete ---


In [57]:
# Drop the pivot column counting products whose department_id was null; it is not one of
# the d<department_id> feature columns.
df_final = df_final.drop('null')

# Replace nulls with 0. This applies to every numeric column: the d* counts, reord1 and
# days_elapsed.
# NOTE(review): if days_since_prior_order contains nulls (presumably a user's first order),
# this makes them indistinguishable from a genuine 0-day gap. Confirm that is intended.
df_final = df_final.na.fill(0)

# Department count columns (d1, d2, ...), found by name and kept in pivot order, instead
# of positional slicing. A positional slice silently sums the wrong columns if the
# upstream column order changes.
dept_cols = [c for c in df_final.columns if c.startswith('d') and c[1:].isdigit()]

# add total number of items across departments
df_final = df_final.withColumn('num_items', sum(df_final[c] for c in dept_cols))

# rearrange columns
df_final = df_final.select(['order_id', 'user_id', 'eval_set', 'order_number', 'order_dow', 'order_hour',
                            *dept_cols, 'days_elapsed', 'reord1', 'num_items'])

print('--- cell execution complete ---')

print('--- cell execution complete ---')

--- cell execution complete ---


### join result head

In [58]:
# printSchema() prints the schema tree itself and returns None. Wrapping it in print()
# only appended a stray "None" line to the output.
df_final.printSchema()
df_final.show(5)

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour: integer (nullable = true)
 |-- d1: long (nullable = true)
 |-- d2: long (nullable = true)
 |-- d3: long (nullable = true)
 |-- d4: long (nullable = true)
 |-- d5: long (nullable = true)
 |-- d6: long (nullable = true)
 |-- d7: long (nullable = true)
 |-- d8: long (nullable = true)
 |-- d9: long (nullable = true)
 |-- d10: long (nullable = true)
 |-- d11: long (nullable = true)
 |-- d12: long (nullable = true)
 |-- d13: long (nullable = true)
 |-- d14: long (nullable = true)
 |-- d15: long (nullable = true)
 |-- d16: long (nullable = true)
 |-- d17: long (nullable = true)
 |-- d18: long (nullable = true)
 |-- d19: long (nullable = true)
 |-- d20: long (nullable = true)
 |-- d21: long (nullable = true)
 |-- days_elapsed: float (nullable = false)
 |-- reord1: lon

### write to file

In [61]:
# Persist the per-order feature table as parquet.
# mode='overwrite' keeps the cell re-runnable: the default save mode ('errorifexists')
# fails on the next Run All because the output directory already exists.
order_pivot_path = 'instacart_order_pivot.parquet'
if WRITE_TO_FILE:
    df_final.write.parquet(order_pivot_path, mode='overwrite')
else:
    # report the path that would actually have been written
    print(f'skipping write to file: {order_pivot_path}')

In [None]:
# Inspect orders whose days_elapsed is 0. Without an action, the bare expression only
# displays the lazy DataFrame's schema repr, never any rows.
df_final.select('order_id', 'days_elapsed').filter(df_final.days_elapsed == 0).show(5)

In [65]:
df_departments.show(n=39)

+-------------+---------------+
|department_id|     department|
+-------------+---------------+
|            1|         frozen|
|            2|          other|
|            3|         bakery|
|            4|        produce|
|            5|        alcohol|
|            6|  international|
|            7|      beverages|
|            8|           pets|
|            9|dry goods pasta|
|           10|           bulk|
|           11|  personal care|
|           12|   meat seafood|
|           13|         pantry|
|           14|      breakfast|
|           15|   canned goods|
|           16|     dairy eggs|
|           17|      household|
|           18|         babies|
|           19|         snacks|
|           20|           deli|
|           21|        missing|
+-------------+---------------+

