<div style="text-align: center;">
    <img src="images/basket-equation.png" alt="Support, Confidence, and Lift metrics" width="882" height="446">
</div>

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, expr
from pyspark.ml.fpm import FPGrowth
from wordcloud import WordCloud

In [None]:
# Build (or reuse) the notebook's Spark session with Hive support enabled and
# 3 GB of memory requested for both the driver and each executor.
session_builder = SparkSession.builder.appName("basket-analysis").enableHiveSupport()
for conf_key, conf_value in (
    ("spark.driver.memory", "3g"),
    ("spark.executor.memory", "3g"),
):
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

In [None]:
# Directory (relative path) that the CSV-loading cell below reads the Instacart files from.
INSTACART_DATA = "../data/instacart"

In [None]:
def _read_instacart_csv(table_name):
    """Load ``<INSTACART_DATA>/<table_name>.csv`` with a header row and inferred column types."""
    return spark.read.csv(f"{INSTACART_DATA}/{table_name}.csv", header=True, inferSchema=True)


# One DataFrame per source file; note the double underscore in the order_products file names.
aisles = _read_instacart_csv("aisles")
departments = _read_instacart_csv("departments")
order_products_prior = _read_instacart_csv("order_products__prior")
order_products_train = _read_instacart_csv("order_products__train")
orders = _read_instacart_csv("orders")
products = _read_instacart_csv("products")

In [None]:
# Expose every loaded DataFrame to Spark SQL under the same name as its Python variable.
for view_name, frame in (
    ("aisles", aisles),
    ("departments", departments),
    ("order_products_prior", order_products_prior),
    ("order_products_train", order_products_train),
    ("orders", orders),
    ("products", products),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Preview the first five rows of `orders`.
orders.show(5)

In [None]:
# Preview the first five rows of `products`.
products.show(5)

In [None]:
# Preview the first five rows of `aisles`.
aisles.show(5)

In [None]:
# Preview the first five rows of `departments`.
departments.show(5)

In [None]:
# Preview the first five rows of `order_products_train`.
order_products_train.show(5)

In [None]:
# Preview the first five rows of `order_products_prior`.
order_products_prior.show(5)

In [None]:
# Count orders for each distinct order_hour_of_day (exposed as `hour`),
# sorted by hour so the result can be plotted left-to-right as a daily curve.
query = """
select count(order_id) as total_orders, order_hour_of_day as hour 
from orders 
group by order_hour_of_day 
order by order_hour_of_day
"""
orders_by_hour = spark.sql(query)
# Print only the first 10 rows as a sanity check.
orders_by_hour.show(10)

In [None]:
# Bring the small per-hour aggregate to the driver as a pandas DataFrame for plotting.
orders_by_hour_plot = orders_by_hour.toPandas()

# Line chart of order volume per hour, drawn with seaborn on a matplotlib figure.
sns.set(style="whitegrid")

plt.figure(figsize=(10, 6))
sns.lineplot(x='hour', y='total_orders', data=orders_by_hour_plot, marker='o', markersize=6)

# Highlight every data point with a hollow red circle. A single vectorized
# scatter call draws all of them at once, instead of one call per row with
# label-based `df[col][i]` lookups.
plt.scatter(orders_by_hour_plot['hour'], orders_by_hour_plot['total_orders'],
            s=200, facecolors='none', edgecolors='r')

plt.title('Total Orders by Hour of the Day')
plt.xlabel('Hour of the Day')
plt.ylabel('Total Orders')
plt.xticks(range(24))  # Assuming the hours are in the range 0-23
plt.show()

In [None]:
# Count orders for each distinct days_since_prior_order value, sorted by that value.
# NOTE(review): rows with a NULL days_since_prior_order, if any, form their own
# group — confirm how the downstream plot should treat them.
query = """
select days_since_prior_order, count(order_id) as total_orders
from orders 
group by days_since_prior_order 
order by days_since_prior_order
"""
days_since_prior_order = spark.sql(query)
# Print only the first 10 rows as a sanity check.
days_since_prior_order.show(10)

In [None]:
# Pull the per-gap order counts to the driver as a pandas DataFrame.
days_since_prior_order_plot = days_since_prior_order.toPandas()

# Bar chart: one bar per days_since_prior_order value, coloured via 'viridis'.
sns.set(style="whitegrid")

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(
    data=days_since_prior_order_plot,
    x='days_since_prior_order',
    y='total_orders',
    hue='days_since_prior_order',  # hue mirrors x so each bar gets its own palette colour
    palette='viridis',
    dodge=False,
    legend=False,
    ax=ax,
)

ax.set_title('Total Orders by Days Since Prior Order')
ax.set_xlabel('Days Since Prior Order')
ax.set_ylabel('Total Orders')
plt.show()

In [None]:
# Count orders per order_dow and translate the numeric code into a weekday name
# (0 -> Sunday ... 6 -> Saturday), busiest day first.
# NOTE(review): the 0 = Sunday mapping is what this query assumes; confirm it
# against the dataset's documentation.
query = """
select count(order_id) as total_orders, 
  (case 
     when order_dow = '0' then 'Sunday'
     when order_dow = '1' then 'Monday'
     when order_dow = '2' then 'Tuesday'
     when order_dow = '3' then 'Wednesday'
     when order_dow = '4' then 'Thursday'
     when order_dow = '5' then 'Friday'
     when order_dow = '6' then 'Saturday'              
   end) as day_of_week 
  from orders  
 group by order_dow 
 order by total_orders desc
"""
order_by_weekday = spark.sql(query)
# Print the whole result (at most one row per order_dow value).
order_by_weekday.show()

In [None]:
# Pull the per-weekday order counts to the driver as a pandas DataFrame.
order_by_weekday_plot = order_by_weekday.toPandas()

# Bar chart of order volume per weekday, coloured with the 'viridis' palette.
# Bars appear in the DataFrame's row order (the query sorts by total_orders desc).
sns.set(style="whitegrid")

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(
    data=order_by_weekday_plot,
    x='day_of_week',
    y='total_orders',
    hue='day_of_week',  # hue mirrors x so each bar gets its own palette colour
    palette='viridis',
    dodge=False,
    legend=False,
    ax=ax,
)

ax.set_title('Total Orders by Day of the Week')
ax.set_xlabel('Day of the Week')
ax.set_ylabel('Total Orders')
plt.show()

In [None]:
# Drop any previous copy first so this cell can be re-run without a "table exists" error.
spark.sql("DROP TABLE IF EXISTS order_items_temp")

# Materialise one denormalised table of order lines: the train and prior order
# lines stacked together, each enriched with product name, aisle id,
# department id and department name.
# NOTE(review): a bare `union` in Spark SQL removes duplicate rows (UNION DISTINCT).
# If the two files can never contain identical rows, `union all` would give the
# same result without the de-duplication step — confirm before changing.
query = """
create table order_items_temp as
(select op.*, p.product_name, p.aisle_id, p.department_id, d.department from
 (select * from order_products_train 
 union
 select * from order_products_prior) as op
 inner join products as p
 on op.product_id = p.product_id
 inner join departments as d
 on p.department_id = d.department_id)
"""
spark.sql(query)

In [None]:
# Basket size: number of product lines recorded for each order_id.
query = """
select order_id, count(product_id) as total_items
from order_items_temp 
group by order_id
"""
items_by_order = spark.sql(query)
# Print only the first 10 rows as a sanity check.
items_by_order.show(10)

In [None]:
# Distribution of basket sizes: the inner query computes items per order, the
# outer query counts how many orders share each basket size, sorted by size.
query = """
select total_items, count(order_id) as num_orders
from (
    select order_id, count(product_id) as total_items
    from order_items_temp 
    group by order_id
) as items_by_order
group by total_items
order by total_items
"""
items_by_order_aggregated = spark.sql(query)
# Print only the first 10 rows as a sanity check.
items_by_order_aggregated.show(10)

In [None]:
# Pull the basket-size distribution to the driver as a pandas DataFrame.
items_by_order_plot = items_by_order_aggregated.toPandas()

sns.set(style="whitegrid")


def _format_thousands(x, pos):
    """Render a y tick value in whole thousands, e.g. 250000 -> '250k'."""
    return f'{int(x/1000)}k'


# Wide figure: one bar per distinct basket size.
fig, bar_plot = plt.subplots(figsize=(18, 6))
sns.barplot(
    data=items_by_order_plot,
    x='total_items',
    y='num_orders',
    hue='total_items',  # hue mirrors x so each bar gets its own palette colour
    palette='viridis',
    dodge=False,
    legend=False,
    ax=bar_plot,
)

# Thin out the x ticks (one every 4 positions) and abbreviate the y tick labels.
bar_plot.xaxis.set_major_locator(ticker.MultipleLocator(4))
bar_plot.yaxis.set_major_formatter(ticker.FuncFormatter(_format_thousands))

bar_plot.set_title('Number of Orders by Total Items')
bar_plot.set_xlabel('Total Items')
bar_plot.set_ylabel('Number of Orders')
plt.show()

In [None]:
# Top 10 departments by number of rows in order_items_temp (one row per ordered
# product line, so `orders_count` counts order lines rather than distinct orders).
query = """
select department, count(*) as orders_count from order_items_temp
group by department
order by orders_count desc
limit 10
"""
orders_by_department = spark.sql(query)
# Print the full (10-row) result.
orders_by_department.show()

In [None]:
# Pull the top-10 department counts to the driver as a pandas DataFrame.
orders_by_department_plot = orders_by_department.toPandas()
department_names = orders_by_department_plot['department']

# Pie chart: each wedge is a department's share of the top-10 total.
fig, ax = plt.subplots(figsize=(10, 7))
ax.pie(
    orders_by_department_plot['orders_count'],
    labels=department_names,
    autopct='%1.1f%%',
    startangle=140,
)

# Legend placed relative to an anchor just outside the right edge of the axes.
ax.legend(department_names, title="Departments", bbox_to_anchor=(1.05, 1), loc='best')

ax.set_title('Top 10 Departments by Order Count')
plt.show()

In [None]:
# Top 200 product names by number of rows in order_items_temp
# (one row per ordered product line).
query = """
select product_name, count(*) as orders_count from order_items_temp
group by product_name
order by orders_count desc
limit 200
"""
product_by_order = spark.sql(query)
# Print only the first 10 rows as a sanity check.
product_by_order.show(10)

In [None]:
# Names of the 200 most frequently ordered products (same ranking as the
# previous cell, keeping only the product_name column).
query = """
SELECT product_name
FROM (
  SELECT product_name, count(*) AS orders_count
  FROM order_items_temp
  GROUP BY product_name
  ORDER BY orders_count DESC
  LIMIT 200
)
"""
words_df = spark.sql(query)

# Collect the single column straight from the DataFrame (no RDD round-trip),
# skipping NULL names because ' '.join would fail on them.
words = [row.product_name for row in words_df.collect() if row.product_name is not None]

# WordCloud tokenises one big string, so multi-word product names contribute
# each of their words separately.
words_str = ' '.join(words)

# Fixed random_state keeps the layout and colours reproducible across runs.
word_cloud = WordCloud(background_color="white", random_state=42).generate(words_str)

In [None]:
# Render the word cloud image without axes or ticks.
plt.figure(figsize=(14, 10))
plt.imshow(word_cloud, interpolation='bilinear')
plt.axis("off")
plt.show()

In [None]:
# One row per (product name, order id) for every product line in the *train*
# order set; this is the raw input for building baskets.
# The join condition sits in an explicit ON clause, so the statement is a true
# equi-join that does not rely on a WHERE filter to constrain it.
query = """
SELECT products.product_name, order_products.order_id
FROM products
INNER JOIN order_products_train AS order_products
  ON order_products.product_id = products.product_id
"""

raw_data = spark.sql(query)
# Show full (untruncated) product names for the first 5 rows.
raw_data.show(5, truncate=False)

In [None]:
# Collapse each order into a single row holding the set of distinct product
# names it contains, exposed as the `items` array column.
unique_products = collect_set('product_name').alias('items')
baskets = raw_data.groupBy('order_id').agg(unique_products)

# Make the baskets queryable from Spark SQL and preview five of them in full.
baskets.createOrReplaceTempView('baskets')
baskets.show(5, truncate=False)

In [None]:
# Keep only the `items` array column from the baskets view.
query = """
SELECT items from baskets
""" 

# Cast every array element to STRING before handing the column to FPGrowth.
# NOTE(review): presumably a no-op if product_name is already a string column — confirm.
baskets_items = spark.sql(query).withColumn('items', expr('TRANSFORM(items, x -> CAST(x AS STRING))'))
# Show the first 5 baskets without truncating the arrays.
baskets_items.show(5, truncate=False)

In [None]:
# Configure FP-Growth over the `items` column: an itemset must occur in at least
# 0.1% of baskets to count as frequent, and a minimum confidence of 0 keeps
# every generated association rule.
fpgrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.0)

# Mine the frequent itemsets from the baskets.
model = fpgrowth.fit(baskets_items)

In [None]:
# Frequent itemsets found by the fitted model (queried below via their `items`
# and `freq` columns), registered as a temp view for Spark SQL.
most_popular_item_in_basket = model.freqItemsets
most_popular_item_in_basket.createOrReplaceTempView("most_popular_item_in_basket")

In [None]:
# Association rules produced by the fitted model (queried below via their
# antecedent, consequent, confidence and lift columns), registered as a temp view.
if_then = model.associationRules
if_then.createOrReplaceTempView("if_then")

In [None]:
# The 20 most frequent itemsets that contain more than two items
# (SIZE(items) > 2, i.e. at least three products bought together).
query = """
SELECT items, freq
FROM most_popular_item_in_basket 
WHERE SIZE(items) > 2 
ORDER BY freq desc
LIMIT 20
"""

items_freq = spark.sql(query)
# Show the top 5 of those 20 without truncating the item arrays.
items_freq.show(5, truncate=False)

In [None]:
# The 20 association rules with the highest confidence; the antecedent and
# consequent columns are renamed to read as "if" / "then".
query = """
SELECT antecedent as `antecedent (if)`, consequent as `consequent (then)`, confidence 
FROM if_then 
ORDER BY confidence DESC
LIMIT 20
"""

confidence = spark.sql(query)
# Show the top 5 of those 20 without truncating the item arrays.
confidence.show(5, truncate=False)

In [None]:
# All association rules whose lift is greater than 1, highest lift first.
query = """
SELECT * FROM if_then 
WHERE lift > 1
ORDER BY lift DESC
"""

lift = spark.sql(query)
# Show the 5 highest-lift rules without truncating the item arrays.
lift.show(5, truncate=False)