# Load data into Data frames

In [3]:
import os
import pandas as pd
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc,col,sum
from pyspark.sql import *
import logging

In [6]:
logger=logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter=logging.Formatter('%(asctime)s:%(created)f:%(filename)s:%(message)s:%(message)s')
file_handler=logging.FileHandler('assignment6.log')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)

In [None]:
logger=logging.getLogger(__name__)

In [3]:
conf=SparkConf().setAppName('assessment_6a')
sc=SparkContext(conf=conf)

In [4]:
sql=SQLContext(sc)

# loading csv files onto spark dataframe

In [5]:
orders=sql.read.options(header=True,inferschema=True).csv('orders.csv')

In [6]:
departments=sql.read.options(header=True,inferschema=True).csv('departments.csv')

In [8]:
products=sql.read.options(header=True,inferschema=True).csv('products.csv')

In [9]:
aisles=sql.read.options(header=True,inferschema=True).csv('aisles.csv')

In [10]:
train=sql.read.options(header=True,inferschema=True).csv('order_products__train.csv')

# Another method to read csv into Sparkdataframe-use pandas to read the file and then load in onto sparkdataframe

In [26]:
two=pd.read_csv('/mnt/home/edureka_37986/aisles.csv')
one=pd.read_csv('/mnt/home/edureka_37986/orders.csv')
three=pd.read_csv('/mnt/home/edureka_37986/departments.csv')
four=pd.read_csv('/mnt/home/edureka_37986/products.csv')
six=pd.read_csv('/mnt/home/edureka_37986/order_products__train.csv')
five=pd.read_csv('/mnt/home/edureka_37986/order_products__prior.csv')

# Move python dataframes to pyspark dataframe

In [27]:
aisles_a=sql.createDataFrame(two)
orders_df=sql.createDataFrame(one)
departments=sql.createDataFrame(three)
products=sql.createDataFrame(four)
prior=sql.createDataFrame(five)
train_df=sql.createDataFrame(six)

In [11]:
final=orders.alias('a').join(train.alias('b'),on=["order_id"],how='left_outer').select('a.*','b.product_id','b.add_to_cart_order','b.reordered')            

In [12]:
final2=final.alias('a').join(products.alias('b'),on=["product_id"],how='left_outer').select('a.*','b.product_name','b.aisle_id','b.department_id')

In [13]:
final3=final2.alias('a').join(departments.alias('b'),on=["department_id"],how='left_outer').select('a.*','b.department')

In [14]:
final4=final3.alias('a').join(aisles.alias('b'),on=["aisle_id"],how='left_outer').select('a.*','b.aisle')

# register the final dataframe as a temp table

In [15]:
final4.registerTempTable('hey')

#order_number,reordered,product_name,department,aisle,order_hour_of_day

# Most re-ordered products

In [16]:
t=sql.sql('select product_name,sum(reordered) as Reordered_count from hey group by 1 order by Reordered_count desc ')

+--------------------+---------------+
|        product_name|Reordered_count|
+--------------------+---------------+
|              Banana|          16557|
|Bag of Organic Ba...|          13362|
|Organic Strawberries|           8603|
|Organic Baby Spinach|           8055|
|     Organic Avocado|           6226|
|Organic Hass Avocado|           6042|
|         Large Lemon|           5923|
|        Strawberries|           4786|
| Organic Raspberries|           4279|
|               Limes|           4234|
+--------------------+---------------+
only showing top 10 rows



In [None]:
logger.info(t)

# Most important department and aisle (by number of products) 

In [17]:
tt=sql.sql('select department,aisle,count( distinct product_name)as distinct_count from hey group by department,aisle order by department,aisle,distinct_count desc')

+----------+--------------------+--------------+
|department|               aisle|distinct_count|
+----------+--------------------+--------------+
|      null|                null|             0|
|   alcohol|       beers coolers|           225|
|   alcohol|           red wines|           143|
|   alcohol|specialty wines c...|            56|
|   alcohol|             spirits|           126|
|   alcohol|         white wines|            97|
|    babies|    baby accessories|            22|
|    babies| baby bath body care|            90|
|    babies|   baby food formula|           606|
|    babies|       diapers wipes|           151|
|    bakery|     bakery desserts|           216|
|    bakery|               bread|           505|
|    bakery|    breakfast bakery|           201|
|    bakery|          buns rolls|           171|
|    bakery|tortillas flat bread|           220|
| beverages|   cocoa drink mixes|           160|
| beverages|              coffee|           547|
| beverages|energy s

In [None]:
logger.info(tt)

# Get the Top 10 departments by orders counts

In [18]:
ttt=sql.sql('select department,sum(order_number)as Total_orders from hey group by department order by department,Total_orders desc')

+------------+------------+
|  department|Total_orders|
+------------+------------+
|        null|    56509607|
|     alcohol|       85259|
|      babies|      297191|
|      bakery|      831727|
|   beverages|     1954927|
|   breakfast|      492407|
|        bulk|       27455|
|canned goods|      724702|
|  dairy eggs|     3818365|
|        deli|      748747|
+------------+------------+
only showing top 10 rows



In [None]:
logger.info(ttt)

# List top 10 products ordered in the morning (6 AM to 11 AM) 

In [19]:
nine=sql.sql('select product_name,order_hour_of_day,count(order_number)as Order_count from hey group by product_name,order_hour_of_day having order_hour_of_day > 6')

In [35]:
nine.sort(desc('Order_count'),'order_hour_of_day').filter(col('product_name').isNotNull())

+--------------------+-----------------+-----------+
|        product_name|order_hour_of_day|Order_count|
+--------------------+-----------------+-----------+
|              Banana|               14|       1563|
|              Banana|               15|       1519|
|              Banana|               10|       1503|
|              Banana|               11|       1476|
|              Banana|               16|       1467|
|              Banana|               12|       1458|
|              Banana|               13|       1453|
|              Banana|                9|       1372|
|Bag of Organic Ba...|               15|       1334|
|              Banana|               17|       1318|
|Bag of Organic Ba...|               14|       1302|
|Bag of Organic Ba...|               16|       1285|
|Bag of Organic Ba...|               10|       1221|
|Bag of Organic Ba...|               13|       1218|
|Bag of Organic Ba...|               11|       1193|
|Bag of Organic Ba...|               12|      

In [None]:
logger.info(nine)

In [14]:
final4.select('product_name','order_dow','order_number').filter(col('product_name').isNotNull()).show(20)

+--------------------+---------+------------+
|        product_name|order_dow|order_number|
+--------------------+---------+------------+
|Bag of Organic Ba...|        3|          32|
|  Seedless Cucumbers|        3|          32|
|   Organic Mandarins|        3|          32|
|Organic Strawberries|        3|          32|
|Versatile Stain R...|        3|          32|
|    Pink Lady Apples|        3|          32|
|Chicken Apple Sau...|        3|          32|
|          Raw Shrimp|        3|          32|
|       Cracked Wheat|        2|          31|
|Organic Greek Who...|        2|          31|
|       Navel Oranges|        2|          31|
|             Spinach|        2|          31|
|Original Patties ...|        2|          31|
|Cinnamon Multigra...|        2|          31|
|Chewy 25% Low Sug...|        2|          31|
|Uncured Genoa Salami|        2|          31|
|Natural Vanilla I...|        2|          31|
|        Lemon Yogurt|        2|          31|
|Strawberry Rhubar...|        2|  

# Do people usually reorder the same previous ordered products? 

In [21]:
five_one=sql.sql('select user_id,days_since_prior_order,count(reordered) as Total_reordered from hey group by user_id,days_since_prior_order having days_since_prior_order = 1')

In [22]:
five_two=sql.sql('select user_id,count(order_number) as Total_ordered from hey group by user_id')

In [25]:
five_final=five_one.join(five_two,on=['user_id'],how='left_outer').sort(desc('Total_reordered'))

In [29]:
five_final.columns

['user_id', 'days_since_prior_order', 'Total_reordered', 'Total_ordered']

In [35]:
five_final.select('*',(five_final.Total_reordered/five_final.Total_ordered).alias('Percent of re-ordering')).sort(desc('Percent of re-ordering')).show()

+-------+----------------------+---------------+-------------+----------------------+
|user_id|days_since_prior_order|Total_reordered|Total_ordered|Percent of re-ordering|
+-------+----------------------+---------------+-------------+----------------------+
|  94096|                   1.0|             49|           52|    0.9423076923076923|
| 191925|                   1.0|             44|           49|    0.8979591836734694|
|  71369|                   1.0|             34|           38|    0.8947368421052632|
|   2014|                   1.0|             24|           27|    0.8888888888888888|
| 145937|                   1.0|             22|           25|                  0.88|
| 186026|                   1.0|             50|           57|    0.8771929824561403|
| 186069|                   1.0|             19|           22|    0.8636363636363636|
|  86347|                   1.0|             19|           22|    0.8636363636363636|
| 163823|                   1.0|             19|      