# SnackChain Big Data Analysis

This notebook analyzes SnackChain sales data using PySpark, SparkSQL, and SparkML.

Spring 2024 Final Project - Dr. Kaushik Dutta 

Group 9: Penelope Tangamu, Jae-Ann Smith, Pooja Radhakrishnan, Amber Anderson

## Step 1: Setup Spark Session

In [0]:
#Ensure cluster is active, re-run periodically to prevent cluster from timing out
spark.version


Out[1]: '3.3.2'

## Step 2: Load Data

In [0]:
# Converted .xlsx file to .csv before uploading to databricks
# Load datasets (after uploading it to /FileStore/)
products = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/FileStore/tables/products.csv')


stores = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/FileStore/tables/stores.csv')

transactions = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/FileStore/tables/transactions.csv')




## Step 3: Data Cleaning and Processing

In [0]:
# Check structure of each table 
products.printSchema()
transactions.printSchema()
stores.printSchema()

products.describe().show()
transactions.describe().show()
stores.describe().show()

root
 |-- UPC: long (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- SUB_CATEGORY: string (nullable = true)
 |-- PRODUCT_SIZE: string (nullable = true)

root
 |-- WEEK_END_DATE: string (nullable = true)
 |-- STORE_NUM: integer (nullable = true)
 |-- UPC: long (nullable = true)
 |-- UNITS: integer (nullable = true)
 |-- VISITS: integer (nullable = true)
 |-- HHS: integer (nullable = true)
 |-- SPEND: double (nullable = true)
 |-- PRICE: double (nullable = true)
 |-- BASE_PRICE: double (nullable = true)
 |-- FEATURE: integer (nullable = true)
 |-- DISPLAY: integer (nullable = true)
 |-- TPR_ONLY: integer (nullable = true)

root
 |-- STORE_ID: integer (nullable = true)
 |-- STORE_NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- MSA: integer (nullable = true)
 |-- SEGMENT: string (nullable = true)
 |-- PARKING: integer (nullable = 

In [0]:
from pyspark.sql.functions import col, count, when
# Check missing values in products
products.select([count(when(col(c).isNull(), c)).alias(c) for c in products.columns]).show()
transactions.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions.columns]).show()
stores.select([count(when(col(c).isNull(), c)).alias(c) for c in stores.columns]).show()
#Results:
    # Product: 16 rows missing columns
    # Transaction: 157k rows  (likely csv converting issue)
    # Stores: PARKING column -- replace missing with 0, one weird/unknown column _c9(likely csv converting issue)


+---+-----------+------------+--------+------------+------------+
|UPC|DESCRIPTION|MANUFACTURER|CATEGORY|SUB_CATEGORY|PRODUCT_SIZE|
+---+-----------+------------+--------+------------+------------+
| 16|         16|          16|      16|          16|          16|
+---+-----------+------------+--------+------------+------------+

+-------------+---------+------+------+------+------+------+------+----------+-------+-------+--------+
|WEEK_END_DATE|STORE_NUM|   UPC| UNITS|VISITS|   HHS| SPEND| PRICE|BASE_PRICE|FEATURE|DISPLAY|TPR_ONLY|
+-------------+---------+------+------+------+------+------+------+----------+-------+-------+--------+
|       157371|   157371|157371|157371|157371|157371|157371|157394|    157556| 157371| 157371|  157371|
+-------------+---------+------+------+------+------+------+------+----------+-------+-------+--------+

+--------+----------+----+-----+---+-------+-------+----+------------------+---+
|STORE_ID|STORE_NAME|CITY|STATE|MSA|SEGMENT|PARKING|SIZE|AVG_WEEKLY

In [0]:
# Drop missing rows from products and transactions
products = products.dropna()
transactions = transactions.dropna()

# Drop stores _c9 column 
stores = stores.drop('_c9')

# handle missing values in PARKING 
from pyspark.sql.functions import mean, round as pyspark_round
parking_mean = stores.select(mean('PARKING')).collect()[0][0]
stores = stores.fillna({'PARKING': parking_mean}) 

In [0]:
# Recheck missing values 
products.select([count(when(col(c).isNull(), c)).alias(c) for c in products.columns]).show()
transactions.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions.columns]).show()
stores.select([count(when(col(c).isNull(), c)).alias(c) for c in stores.columns]).show()


+---+-----------+------------+--------+------------+------------+
|UPC|DESCRIPTION|MANUFACTURER|CATEGORY|SUB_CATEGORY|PRODUCT_SIZE|
+---+-----------+------------+--------+------------+------------+
|  0|          0|           0|       0|           0|           0|
+---+-----------+------------+--------+------------+------------+

+-------------+---------+---+-----+------+---+-----+-----+----------+-------+-------+--------+
|WEEK_END_DATE|STORE_NUM|UPC|UNITS|VISITS|HHS|SPEND|PRICE|BASE_PRICE|FEATURE|DISPLAY|TPR_ONLY|
+-------------+---------+---+-----+------+---+-----+-----+----------+-------+-------+--------+
|            0|        0|  0|    0|     0|  0|    0|    0|         0|      0|      0|       0|
+-------------+---------+---+-----+------+---+-----+-----+----------+-------+-------+--------+

+--------+----------+----+-----+---+-------+-------+----+------------------+
|STORE_ID|STORE_NAME|CITY|STATE|MSA|SEGMENT|PARKING|SIZE|AVG_WEEKLY_BASKETS|
+--------+----------+----+-----+---+---

In [0]:
# Join tables
# Join transactions with products on UPC
trans_products = transactions.join(products, on='UPC', how='inner')

# Rename STORE_ID in stores to STORE_NUM (to match transactions)
stores = stores.withColumnRenamed('STORE_ID', 'STORE_NUM')

# Join the result with stores on STORE_NUM
df = trans_products.join(stores, on='STORE_NUM', how='inner')

# Cache dataset
df.cache()

df.printSchema()
df.show(5)


root
 |-- STORE_NUM: integer (nullable = true)
 |-- UPC: long (nullable = true)
 |-- WEEK_END_DATE: string (nullable = true)
 |-- UNITS: integer (nullable = true)
 |-- VISITS: integer (nullable = true)
 |-- HHS: integer (nullable = true)
 |-- SPEND: double (nullable = true)
 |-- PRICE: double (nullable = true)
 |-- BASE_PRICE: double (nullable = true)
 |-- FEATURE: integer (nullable = true)
 |-- DISPLAY: integer (nullable = true)
 |-- TPR_ONLY: integer (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- MANUFACTURER: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- SUB_CATEGORY: string (nullable = true)
 |-- PRODUCT_SIZE: string (nullable = true)
 |-- STORE_NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- MSA: integer (nullable = true)
 |-- SEGMENT: string (nullable = true)
 |-- PARKING: integer (nullable = true)
 |-- SIZE: integer (nullable = true)
 |-- AVG_WEEKLY_BASKETS: integer (nullable 

## Step 4: Exploratory Data Analysis (EDA)

In [0]:
# Top 10 Best-Selling Prouducts by units
top_products = df.groupBy('DESCRIPTION')\
    .sum('UNITS')\
    .orderBy('sum(UNITS)', ascending=False)

top_products.show(10)

# Top 10 Busiest Stores by  # of Transactions
busiest_stores = df.groupBy('STORE_NAME')\
    .count()\
    .orderBy('count', ascending=False)

busiest_stores.show(10)

# Weekly Sales Trend
weekly_sales = df.groupBy('WEEK_END_DATE')\
    .sum('SPEND')\
    .orderBy('WEEK_END_DATE')

weekly_sales.show(10)



+--------------------+----------+
|         DESCRIPTION|sum(UNITS)|
+--------------------+----------+
|         GM CHEERIOS|    907571|
|GM HONEY NUT CHEE...|    828200|
|PL MINI TWIST PRE...|    687797|
| KELL FROSTED FLAKES|    579036|
|   PL PRETZEL STICKS|    547771|
|PL BT SZ FRSTD SH...|    477461|
|    KELL FROOT LOOPS|    471884|
|KELL BITE SIZE MI...|    407179|
|      PL RAISIN BRAN|    377635|
|QKER CAP N CRUNCH...|    344551|
+--------------------+----------+
only showing top 10 rows

+------------------+-----+
|        STORE_NAME|count|
+------------------+-----+
|           HOUSTON|26531|
|        MIDDLETOWN|14986|
|          ROCKWALL|13812|
|      FLOWER MOUND|13574|
|ANDERSON TOWNE CTR| 8045|
|         HYDE PARK| 8007|
|          BLUE ASH| 7968|
|        CINCINNATI| 7934|
|       LIBERTY TWP| 7895|
|           LEBANON| 7865|
+------------------+-----+
only showing top 10 rows

+-------------+------------------+
|WEEK_END_DATE|        sum(SPEND)|
+-------------+---------

In [0]:
# Visualizations
display(top_products)
display(busiest_stores)
display(weekly_sales)

DESCRIPTION,sum(UNITS)
GM CHEERIOS,907571
GM HONEY NUT CHEERIOS,828200
PL MINI TWIST PRETZELS,687797
KELL FROSTED FLAKES,579036
PL PRETZEL STICKS,547771
PL BT SZ FRSTD SHRD WHT,477461
KELL FROOT LOOPS,471884
KELL BITE SIZE MINI WHEAT,407179
PL RAISIN BRAN,377635
QKER CAP N CRUNCH BERRIES,344551


STORE_NAME,count
HOUSTON,26531
MIDDLETOWN,14986
ROCKWALL,13812
FLOWER MOUND,13574
ANDERSON TOWNE CTR,8045
HYDE PARK,8007
BLUE ASH,7968
CINCINNATI,7934
LIBERTY TWP,7895
LEBANON,7865


WEEK_END_DATE,sum(SPEND)
1-Apr-09,172916.45999999996
1-Dec-10,161237.46999999924
1-Jul-09,163263.78999999972
1-Jun-11,162016.9299999996
1-Sep-10,169089.7300000004
10-Aug-11,181691.0799999996
10-Feb-10,202372.6700000001
10-Jun-09,182451.07000000036
10-Mar-10,198871.7799999997
10-Nov-10,197864.8799999997


## Step 5: SparkSQL Queries

In [0]:
# Create df as a SQL table 
df.createOrReplaceTempView('sales_data')



In [0]:
# Run SQL Queries
# Most profitable products by spend value
spark.sql("""
SELECT DESCRIPTION, SUM(SPEND) AS total_spend
FROM sales_data
GROUP BY DESCRIPTION
ORDER BY total_spend DESC
LIMIT 10
""").show()


# Category that sells the most physically 
spark.sql("""
SELECT CATEGORY, SUM(UNITS) AS total_units_sold
FROM sales_data
GROUP BY CATEGORY
ORDER BY total_units_sold DESC
LIMIT 10
""").show()

# Biggest Weeks for total sales
spark.sql("""
SELECT WEEK_END_DATE, SUM(SPEND) AS weekly_revenue
FROM sales_data
GROUP BY WEEK_END_DATE
ORDER BY weekly_revenue DESC
LIMIT 10
""").show()


+--------------------+------------------+
|         DESCRIPTION|       total_spend|
+--------------------+------------------+
|         GM CHEERIOS| 2889043.689999998|
|GM HONEY NUT CHEE...|2110348.3599999994|
| KELL FROSTED FLAKES|1519959.8999999994|
|    DIGRN PEPP PIZZA|1475513.5799999996|
|KELL BITE SIZE MI...|        1249213.25|
|    KELL FROOT LOOPS|1185958.5700000005|
| DIGRN SUPREME PIZZA|1078161.3399999996|
|PL BT SZ FRSTD SH...|1011252.7899999996|
|POST HNY BN OTS H...| 984546.0300000001|
|PL MINI TWIST PRE...|         932626.46|
+--------------------+------------------+

+--------------------+----------------+
|            CATEGORY|total_units_sold|
+--------------------+----------------+
|         COLD CEREAL|         5978605|
|          BAG SNACKS|         2672353|
|        FROZEN PIZZA|         1382068|
|ORAL HYGIENE PROD...|          534511|
+--------------------+----------------+

+-------------+------------------+
|WEEK_END_DATE|    weekly_revenue|
+-------------+-----

## Step 6: Feature Engineering

In [0]:
# Convert categorical columns (Category, Store) to numeric
from pyspark.ml.feature import StringIndexer

category_index = StringIndexer(inputCol='CATEGORY', outputCol='CATEGORY_INDEX')
store_index = StringIndexer(inputCol='STORE_NAME', outputCol='STORE_INDEX')

# Apply indexers to df
df = category_index.fit(df).transform(df)
df = store_index.fit(df).transform(df)



In [0]:
# new engineered features 
df = df.withColumn('DISCOUNT_PCT', (col('BASE_PRICE') - col('PRICE')) / col('BASE_PRICE'))

# create promotion score feature
# identify products that have no promotion, single promotion (featured or displayed), or both
df = df.withColumn('PROMOTION_SCORE', col('FEATURE') + col('DISPLAY'))

In [0]:
# assemble features 
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[ 
        'STORE_INDEX',
        'DISCOUNT_PCT', # inclues base_price and price 
        'PROMOTION_SCORE' #includes feature and display 
    
    ],
    outputCol='features'
)

# Transform the final dataset
df = assembler.transform(df)


In [0]:
# check features
df.select('features').show(5, truncate=False)

+------------------------------+
|features                      |
+------------------------------+
|[63.0,0.11464968152866252,0.0]|
|[63.0,0.0,0.0]                |
|[63.0,0.0,0.0]                |
|[63.0,0.2204899777282851,0.0] |
|[63.0,0.0,0.0]                |
+------------------------------+
only showing top 5 rows



## Step 7: Machine Learning Models (SparkML)

In [0]:
# split data set
train_data, test_data = df.randomSplit([0.7, 0.3])



In [0]:
# M1 - Logistic Regression
from pyspark.ml.classification import LogisticRegression

# initialize 
lr = LogisticRegression(featuresCol='features', labelCol='CATEGORY_INDEX', maxIter=10)

# train 
lr_model = lr.fit(train_data)

# predict
lr_predictions = lr_model.transform(test_data)

# test/show predictions 
lr_predictions.select('CATEGORY_INDEX', 'prediction').show(50)

+--------------+----------+
|CATEGORY_INDEX|prediction|
+--------------+----------+
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       2.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           2.0|       2.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       2.0|
|           2.0|       2.0|
|           2.0|       0.0|
|           2.0|       2.0|
|           2.0|       0.0|
|           2.0|    

In [0]:
# Test M1
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol='CATEGORY_INDEX', predictionCol='prediction', metricName='accuracy')

accuracy = evaluator.evaluate(lr_predictions)
print(f"M1 Test Accuracy = {accuracy}")

#Results: m1 Test Accuracy = 1.0 = 100%, most likely due to strong features

M1 Test Accuracy = 0.3275738074867636


In [0]:
# M2 - Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='CATEGORY_INDEX', maxBins=128)
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

dt_predictions.select('CATEGORY_INDEX', 'prediction').show(50)


+--------------+----------+
|CATEGORY_INDEX|prediction|
+--------------+----------+
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           2.0|       1.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       1.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       0.0|
|           2.0|       2.0|
|           2.0|       0.0|
|           2.0|       2.0|
|           2.0|       0.0|
|           2.0|    

In [0]:
# Test M2


In [0]:
# M3 - Random Forest
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features', labelCol='CATEGORY_INDEX', numTrees=20, maxBins=128)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

rf_predictions.select('CATEGORY_INDEX', 'prediction').show(5)


+--------------+----------+
|CATEGORY_INDEX|prediction|
+--------------+----------+
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
+--------------+----------+
only showing top 5 rows



In [0]:
# M4 - K Cluster
from pyspark.ml.clustering import KMeans

# Note: KMeans doesn't use label
kmeans = KMeans(featuresCol='features', k=5, seed=42)
kmeans_model = kmeans.fit(train_data)
kmeans_predictions = kmeans_model.transform(test_data)

kmeans_predictions.select('prediction').show(5)


+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 5 rows



## Step 8: Model Evaluations

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol='CATEGORY_INDEX', predictionCol='prediction', metricName='accuracy')

# Logistic Regression
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"M1 Test Accuracy = {lr_accuracy}")

# Decision Tree
dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"M2 Test Accuracy = {dt_accuracy}")

# Random Forest
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"M3 Test Accuracy = {rf_accuracy}")

# KMeans - no labels = no accuracy 
kmeans_predictions.select('prediction').show(5)





M1 Test Accuracy = 0.3275738074867636
M2 Test Accuracy = 0.3726766605909559
M3 Test Accuracy = 0.372434872100088
+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 5 rows

