## Task 2: Create Enriched Tables

This notebook applies the enrichment functions from `src/load_enriched_table.py` to the raw customer, order, and product data to build enriched customer and product tables.

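### Enrichments Covered:
- **Customer Enrichments:**
  - Total Sales, Profit, and Orders (`total_sales`, `total_profit`, `total_orders`)
  - Average Order Value (`average_order_value`)
  - Product Diversity (`unique_products_purchased`)
  - Churn Risk Score (`days_since_last_order`, `churn_risk_score`)
- **Product Enrichments:**
  - Price Positioning (`price_tier`, `price_position`)
  - Inventory Velocity (`total_quantity_sold`, `inventory_velocity`)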

### 0. Environment Setup and Data Loading

In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Add project root to path for module imports
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from src.spark_session import get_spark_session
from src.load_source_data import load_customer_data, load_orders_data, load_products_data

# Initialize Spark session
spark = get_spark_session("EnrichmentNotebook")

# Load Raw Data
customers_df = load_customer_data(spark, os.path.join(PROJECT_ROOT, "data", "Customer.xlsx"))
orders_df = load_orders_data(spark, os.path.join(PROJECT_ROOT, "data", "Orders.json"))
products_df = load_products_data(spark, os.path.join(PROJECT_ROOT, "data", "Products.csv"))

print("Data loaded.")

Checking file at: /Users/kushalsenlaskar/Documents/E-commerce Sales Data/data/Customer.xlsx
File found. Loading Excel data using Spark...
Customer data loaded successfully

Checking file at: /Users/kushalsenlaskar/Documents/E-commerce Sales Data/data/Orders.json
File found. Loading JSON data using Spark...
Orders data loaded successfully

Checking file at: /Users/kushalsenlaskar/Documents/E-commerce Sales Data/data/Products.csv
File found. Loading CSV data using Spark...
Products data loaded successfully
Data loaded.


### 1. Customer Enrichment

In [2]:
import importlib
import src.load_enriched_table
importlib.reload(src.load_enriched_table)

from src.load_enriched_table import (
    summarize_customer_spending,
    calculate_average_basket_size,
    measure_customer_product_variety,
    identify_at_risk_customers,
    classify_product_price_level,
    identify_fast_and_slow_sellers
)

# Create Enriched Customer Table
print("Creating Enriched Customer Table...")

# Columns added: total_sales, total_profit, total_orders.
# Description: Summarizes lifetime value and engagement.
enriched_customers_df = summarize_customer_spending(customers_df, orders_df)

print(f"\nEnriched customers table created with {enriched_customers_df.count()} rows\n")
enriched_customers_df.printSchema()
enriched_customers_df.take(5)

Creating Enriched Customer Table...

--- Cleaning Customers Data for Enrichment ---
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing duplicate Customer IDs...
   Records with duplicate Customer ID removed: 0
Customers data cleaning completed
  Original records: 793, After cleaning: 793
  Total records removed: 0

--- Cleaning Orders Data for Enrichment ---
Removing records with negative Profit...
   Records with negative Profit: 1870
Removing records with NULL Order ID...
   Records with NULL Order ID: 0
   Removing duplicate Order IDs...
   Records with duplicate Order ID removed: 3697
Validating date formats...
   Records with invalid date format: 0
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Orders data cleaning completed
  Original records: 9994, After cleaning: 4427
  Total records removed: 5567

Enriched customers table created with 793 rows

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string 

[Row(Customer ID='AA-10315', Customer Name='Alex Avila', email='josephrice131@gmail.com', phone='680-261-2092', address='91773 Miller Shoal\nDiaztown, FL 38841', Segment='Consumer', Country='United States', City='Round Rock', State='Texas', Postal Code='78664', Region='Central', total_sales=499.15999999999997, total_profit=124.56, total_orders=5),
 Row(Customer ID='AA-10375', Customer Name='Allen Armold', email='garymoore386@gmail.com', phone='221.945.4191x8872', address='6450 John Lodge\nTerriton, KY 95945', Segment='Consumer', Country='United States', City='Atlanta', State='Georgia', Postal Code='30318', Region='South', total_sales=744.0500000000002, total_profit=178.07999999999998, total_orders=9),
 Row(Customer ID='AA-10480', Customer Name='Andrew Allen', email='johnwalker944@gmail.com', phone='388-142-4883x5370', address='27265 Murray Island\nKevinfort, PA 63231', Segment='Consumer', Country='United States', City='Concord', State='North Carolina', Postal Code='28027', Region='Sout
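
The three columns added in this step are per-customer aggregates over the cleaned orders. As a rough, Spark-free illustration of that aggregation (not the implementation in `load_enriched_table.py`), the sketch below groups order rows by customer in plain Python. The `summarize_spending` helper, the `Sales` field name, the zero defaults for customers without orders, and the sample rows are all assumptions made for this example; `Customer ID`, `Order ID`, and `Profit` are taken from the cleaning log above.

```python
from collections import defaultdict


def summarize_spending(customer_ids, orders):
    """Return {customer_id: totals} with sales, profit, and order count.

    Hypothetical helper: `orders` is a list of dicts with the keys
    "Customer ID", "Order ID", "Sales" (assumed name), and "Profit".
    """
    sales = defaultdict(float)
    profit = defaultdict(float)
    order_ids = defaultdict(set)

    for row in orders:
        cid = row["Customer ID"]
        sales[cid] += row["Sales"]
        profit[cid] += row["Profit"]
        order_ids[cid].add(row["Order ID"])

    # Keep every customer, even those without orders (like a left join).
    # Defaulting to zero is an assumption; the real module may use nulls.
    return {
        cid: {
            "total_sales": sales.get(cid, 0.0),
            "total_profit": profit.get(cid, 0.0),
            "total_orders": len(order_ids.get(cid, ())),
        }
        for cid in customer_ids
    }


# Made-up sample rows, not taken from the dataset.
orders = [
    {"Customer ID": "C1", "Order ID": "O1", "Sales": 100.0, "Profit": 20.0},
    {"Customer ID": "C1", "Order ID": "O2", "Sales": 50.0, "Profit": 5.0},
    {"Customer ID": "C2", "Order ID": "O3", "Sales": 30.0, "Profit": 3.0},
]
summary = summarize_spending(["C1", "C2", "C3"], orders)
print(summary["C1"])
```

Keeping customers that have no orders matches the row count reported above, where all 793 customers survive the enrichment.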

In [3]:
from pyspark.sql import functions as F  # alias avoids shadowing Python's built-in round()

print("Calculating average basket size for each customer...")

# Columns added: average_order_value.
# Description: Calculates the average spending per order.
customer_basket_size_df = calculate_average_basket_size(enriched_customers_df, orders_df)

# Show the five customers with the highest average order value
customer_basket_size_df.select(
    F.col("Customer ID"),
    F.col("Customer Name"),
    F.round(F.col("average_order_value"), 2).alias("average_order_value")
).orderBy(F.desc("average_order_value")).take(5)

Calculating average basket size for each customer...

--- Cleaning Orders Data for Enrichment ---
Removing records with negative Profit...
   Records with negative Profit: 1870
Removing records with NULL Order ID...
   Records with NULL Order ID: 0
   Removing duplicate Order IDs...
   Records with duplicate Order ID removed: 3697
Validating date formats...
   Records with invalid date format: 0
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Orders data cleaning completed
  Original records: 9994, After cleaning: 4427
  Total records removed: 5567


[Row(Customer ID='TC-20980', Customer Name='Tamara Chand', average_order_value=3533.39),
 Row(Customer ID='DW-13585', Customer Name='             Dorothy Wardle', average_order_value=3043.21),
 Row(Customer ID='HL-15040', Customer Name='Hunter Lopez', average_order_value=1839.54),
 Row(Customer ID='AB-10105', Customer Name='Adrian Barton', average_order_value=1463.22),
 Row(Customer ID='TB-21400', Customer Name='Tom Boeckenhauer', average_order_value=1395.75)]
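
In the rows of the final customer table printed in cell 6, `average_order_value` equals `total_sales / total_orders` (for customer AA-10315, 499.16 / 5 = 99.832). The sketch below reproduces that arithmetic in plain Python. The `average_order_value` function and its handling of customers with no orders are assumptions for illustration, not the code in `load_enriched_table.py`.

```python
def average_order_value(total_sales, total_orders):
    """Average spend per order; None when the customer has no orders."""
    if not total_orders:
        # Avoid dividing by zero for customers without any orders.
        return None
    return total_sales / total_orders


# Values for customer AA-10315 as shown in this notebook's output.
aov = average_order_value(499.16, 5)
print(round(aov, 2))
```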

In [4]:
from pyspark.sql.functions import col, desc

print("Analyzing product variety per customer...")

# Columns added: unique_products_purchased.
# Description: Counts the number of unique products purchased.
customer_product_variety_df = measure_customer_product_variety(customer_basket_size_df, orders_df)

customer_product_variety_df.select(
    col("Customer ID"),
    col("Customer Name"),
    col("unique_products_purchased")
).orderBy(desc("unique_products_purchased")).take(5)

Analyzing product variety per customer...

--- Cleaning Orders Data for Enrichment ---
Removing records with negative Profit...
   Records with negative Profit: 1870
Removing records with NULL Order ID...
   Records with NULL Order ID: 0
   Removing duplicate Order IDs...
   Records with duplicate Order ID removed: 3697
Validating date formats...
   Records with invalid date format: 0
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Orders data cleaning completed
  Original records: 9994, After cleaning: 4427
  Total records removed: 5567


[Row(Customer ID='EH-13765', Customer Name='Edward Hooks', unique_products_purchased=12),
 Row(Customer ID='SH-19975', Customer Name='NaN', unique_products_purchased=12),
 Row(Customer ID='EP-13915', Customer Name='Emily Phan', unique_products_purchased=12),
 Row(Customer ID='KL-16645', Customer Name='Ken Lonsdale', unique_products_purchased=12),
 Row(Customer ID='CS-12250', Customer Name='Chris Selesnick', unique_products_purchased=12)]
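
Counting unique products is a distinct count of `Product ID` per `Customer ID`. A minimal sketch of that logic in plain Python, assuming order rows are dicts with those two keys; the `count_unique_products` helper and the sample rows are hypothetical and only illustrate the idea.

```python
from collections import defaultdict


def count_unique_products(orders):
    """Return {customer_id: number of distinct products purchased}."""
    products = defaultdict(set)
    for row in orders:
        # A set ignores repeat purchases of the same product.
        products[row["Customer ID"]].add(row["Product ID"])
    return {cid: len(items) for cid, items in products.items()}


# Made-up sample rows, not taken from the dataset.
sample_orders = [
    {"Customer ID": "C1", "Product ID": "P1"},
    {"Customer ID": "C1", "Product ID": "P1"},
    {"Customer ID": "C1", "Product ID": "P2"},
    {"Customer ID": "C2", "Product ID": "P3"},
]
variety = count_unique_products(sample_orders)
print(variety)
```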

In [5]:
from pyspark.sql.functions import col, desc

print("Identifying at-risk customers based on purchase order...")

# Columns added: days_since_last_order, churn_risk_score.
# Description: Identifies customers at risk of churning.
at_risk_customers_df = identify_at_risk_customers(customer_product_variety_df, orders_df)

at_risk_customers_df.select(
    col("Customer ID"),
    col("Customer Name"),
    col("days_since_last_order"),
    col("churn_risk_score")
).where(col("days_since_last_order").isNotNull()).orderBy(desc("days_since_last_order")).take(5)

Identifying at-risk customers based on purchase order...

--- Cleaning Orders Data for Enrichment ---
Removing records with negative Profit...
   Records with negative Profit: 1870
Removing records with NULL Order ID...
   Records with NULL Order ID: 0
   Removing duplicate Order IDs...
   Records with duplicate Order ID removed: 3697
Validating date formats...
   Records with invalid date format: 0
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Orders data cleaning completed
  Original records: 9994, After cleaning: 4427
  Total records removed: 5567


[]
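
The empty list means that no row passed the `isNotNull()` filter, so `days_since_last_order` is null for every customer; the sample of the final table in cell 6 shows the same (`days_since_last_order=None` with `churn_risk_score='Low Risk'`). The notebook does not show why. As a hedged sketch of how recency-based scoring could work, the plain-Python example below computes days since the last order and labels missing recency explicitly. The date format, the thresholds, the reference date, and every label other than `'Low Risk'` are assumptions, and both helper functions are hypothetical.

```python
from datetime import date, datetime


def days_since_last_order(order_dates, as_of, fmt="%d/%m/%Y"):
    """Days between the most recent parseable order date and `as_of`.

    Returns None when no date parses, mirroring a null in Spark.
    The default `fmt` is an assumption about the source data.
    """
    parsed = []
    for text in order_dates:
        try:
            parsed.append(datetime.strptime(text, fmt).date())
        except (TypeError, ValueError):
            continue  # skip missing or malformed dates
    if not parsed:
        return None
    return (as_of - max(parsed)).days


def churn_risk(days, medium_after=90, high_after=180):
    """Map recency to a label; the thresholds are hypothetical."""
    if days is None:
        # Avoid silently treating missing recency as low risk.
        return "Unknown"
    if days > high_after:
        return "High Risk"
    if days > medium_after:
        return "Medium Risk"
    return "Low Risk"


as_of = date(2018, 1, 1)  # assumed reference date
recent = days_since_last_order(["21/8/2017", "1/12/2017"], as_of)
print(recent, churn_risk(recent))

missing = days_since_last_order(["2017-12-01"], as_of)  # does not match fmt
print(missing, churn_risk(missing))
```

Returning a separate label for missing recency makes a fully null column visible in the output instead of blending it into the low-risk group.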

In [6]:
# Final enriched customers DataFrame
final_enriched_customers_df = at_risk_customers_df

final_enriched_customers_df.printSchema()
final_enriched_customers_df.take(5)

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- total_sales: double (nullable = true)
 |-- total_profit: double (nullable = true)
 |-- total_orders: long (nullable = true)
 |-- average_order_value: double (nullable = true)
 |-- unique_products_purchased: long (nullable = true)
 |-- days_since_last_order: integer (nullable = true)
 |-- churn_risk_score: string (nullable = true)



[Row(Customer ID='AA-10315', Customer Name='Alex Avila', email='josephrice131@gmail.com', phone='680-261-2092', address='91773 Miller Shoal\nDiaztown, FL 38841', Segment='Consumer', Country='United States', City='Round Rock', State='Texas', Postal Code='78664', Region='Central', total_sales=499.15999999999997, total_profit=124.56, total_orders=5, average_order_value=99.832, unique_products_purchased=5, days_since_last_order=None, churn_risk_score='Low Risk'),
 Row(Customer ID='AA-10375', Customer Name='Allen Armold', email='garymoore386@gmail.com', phone='221.945.4191x8872', address='6450 John Lodge\nTerriton, KY 95945', Segment='Consumer', Country='United States', City='Atlanta', State='Georgia', Postal Code='30318', Region='South', total_sales=744.0500000000002, total_profit=178.07999999999998, total_orders=9, average_order_value=82.67222222222225, unique_products_purchased=9, days_since_last_order=None, churn_risk_score='Low Risk'),
 Row(Customer ID='AA-10480', Customer Name='Andrew

In [7]:
from src.load_enriched_table import validate_enriched_customer_data

print("Running data quality checks on enriched customer data...")
validate_enriched_df = validate_enriched_customer_data(final_enriched_customers_df)
validate_enriched_df.take(5)
print("\nCustomer data validation complete.")

Running data quality checks on enriched customer data...

Data quality checks for enriched customer data passed successfully.

Customer data validation complete.
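
The notebook does not show which checks `validate_enriched_customer_data` runs. As an illustration only, the sketch below applies a few plausible checks to rows represented as plain dicts: a present and unique `Customer ID`, non-negative totals, and an `average_order_value` consistent with `total_sales / total_orders`. The `validate_customers` helper and the choice of checks are assumptions, not the module's actual rules.

```python
def validate_customers(rows):
    """Return a list of problems; an empty list means all checks passed."""
    problems = []
    seen = set()
    for row in rows:
        cid = row.get("Customer ID")
        if cid is None:
            problems.append("missing Customer ID")
            continue
        if cid in seen:
            problems.append(f"duplicate Customer ID: {cid}")
        seen.add(cid)
        if row["total_orders"] < 0 or row["total_sales"] < 0:
            problems.append(f"negative totals for {cid}")
        if row["total_orders"] > 0:
            # The average must agree with the totals it is derived from.
            expected = row["total_sales"] / row["total_orders"]
            if abs(row["average_order_value"] - expected) > 1e-6:
                problems.append(f"inconsistent average_order_value for {cid}")
    return problems


# First row uses values printed in this notebook; the second is made up.
good = {"Customer ID": "AA-10315", "total_sales": 499.16,
        "total_orders": 5, "average_order_value": 99.832}
bad = {"Customer ID": "AA-10315", "total_sales": 100.0,
       "total_orders": 4, "average_order_value": 30.0}

print(validate_customers([good]))
print(validate_customers([good, bad]))
```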


### 2. Product Enrichment

In [10]:
print("Classifying product price levels...")

# Columns added: price_position.
# Description: Classifies product price as Premium, Mid-Range, or Budget within its category.
product_price_level_df = classify_product_price_level(products_df)

product_price_level_df.take(5)

Classifying product price levels...

--- Cleaning Products Data for Enrichment ---
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Removing duplicate Product IDs...
   Records with duplicate Product ID removed: 33
Products data cleaning completed
  Original records: 1851, After cleaning: 1818
  Total records removed: 33


[Row(Product ID='FUR-FU-10000260', Category='Furniture', Sub-Category='Furnishings', Product Name='"6"" Cubicle Wall Clock', State=' Black"', Price per product=None, price_tier=1, price_position='Budget'),
 Row(Product ID='FUR-FU-10000305', Category='Furniture', Sub-Category='Furnishings', Product Name='"Tenex V2T-RE Standard Weight Series Chair Mat, 45"" x 53""', State=' Lip 25"" x 12"""', Price per product=None, price_tier=1, price_position='Budget'),
 Row(Product ID='FUR-FU-10001475', Category='Furniture', Sub-Category='Furnishings', Product Name='Contract Clock, 14"', State=' Brown"', Price per product=None, price_tier=1, price_position='Budget'),
 Row(Product ID='FUR-FU-10001488', Category='Furniture', Sub-Category='Furnishings', Product Name='"Tenex 46"" x 60"" Computer Anti-Static Chairmat', State=' Rectangular Shaped"', Price per product=None, price_tier=1, price_position='Budget'),
 Row(Product ID='FUR-FU-10001602', Category='Furniture', Sub-Category='Furnishings', Product Nam
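
The output shows an integer `price_tier` next to `price_position`, and the cell comment says the classification is made within each category. One way to produce such tiers is quartile ranking per category, similar to `ntile(4)` over a window partitioned by `Category`; whether the module does exactly this is not shown here. The plain-Python sketch below illustrates that approach. The tier-to-label mapping for tier 2, the decision to skip rows without a price, and both helper functions are assumptions.

```python
from collections import defaultdict


def ntile(row_count, buckets):
    """Bucket numbers (1-based) for `row_count` ordered rows.

    Follows the usual NTILE rule: bucket sizes differ by at most one,
    and the larger buckets come first.
    """
    base, extra = divmod(row_count, buckets)
    tiles = []
    for bucket in range(1, buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        tiles.extend([bucket] * size)
    return tiles


# Assumed mapping: only tiers 1, 3, and 4 are visible in the notebook output.
POSITION_BY_TIER = {1: "Budget", 2: "Mid-Range", 3: "Mid-Range", 4: "Premium"}


def classify_price_levels(products):
    """Add price_tier and price_position per category.

    Rows without a price are skipped in this sketch.
    """
    by_category = defaultdict(list)
    for row in products:
        if row["Price per product"] is not None:
            by_category[row["Category"]].append(row)

    result = []
    for rows in by_category.values():
        rows.sort(key=lambda r: r["Price per product"])
        for row, tier in zip(rows, ntile(len(rows), 4)):
            result.append({**row, "price_tier": tier,
                           "price_position": POSITION_BY_TIER[tier]})
    return result


# Made-up sample rows, not taken from the dataset.
sample_products = [
    {"Product ID": "A", "Category": "Furniture", "Price per product": 40.0},
    {"Product ID": "B", "Category": "Furniture", "Price per product": 10.0},
    {"Product ID": "C", "Category": "Furniture", "Price per product": 30.0},
    {"Product ID": "D", "Category": "Furniture", "Price per product": 20.0},
    {"Product ID": "E", "Category": "Furniture", "Price per product": None},
]
classified = classify_price_levels(sample_products)
for row in classified:
    print(row["Product ID"], row["price_tier"], row["price_position"])
```

In the notebook output above, rows with `Price per product=None` still receive `price_tier=1` and `'Budget'`, so the real function evidently does not skip them the way this sketch does.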

In [11]:
print("Identifying fast and slow-selling products...")

# Columns added: total_quantity_sold, inventory_velocity.
# Description: Classifies products as Fast, Medium, or Slow moving.
product_sales_velocity_df = identify_fast_and_slow_sellers(product_price_level_df, orders_df)

product_sales_velocity_df.take(5)

Identifying fast and slow-selling products...

--- Cleaning Products Data for Enrichment ---
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Removing duplicate Product IDs...
   Records with duplicate Product ID removed: 0
Products data cleaning completed
  Original records: 1818, After cleaning: 1818
  Total records removed: 0

--- Cleaning Orders Data for Enrichment ---
Removing records with negative Profit...
   Records with negative Profit: 1870
Removing records with NULL Order ID...
   Records with NULL Order ID: 0
   Removing duplicate Order IDs...
   Records with duplicate Order ID removed: 3697
Validating date formats...
   Records with invalid date format: 0
Removing records with NULL Customer ID...
   Records with NULL Customer ID: 0
Removing records with NULL Product ID...
   Records with NULL Product ID: 0
Orders data cleaning completed
  Original records: 9994, After cleaning: 4427
  Total records removed: 5567


[Row(Product ID='FUR-BO-10000112', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Birmingham Collection Bookcase, Dark Cherry', State='Illinois', Price per product=91.686, price_tier=3, price_position='Mid-Range', total_quantity_sold=0, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000330', Category='Furniture', Sub-Category='Bookcases', Product Name='Sauder Camden County Barrister Bookcase, Planked Cherry Finish', State='Louisiana', Price per product=120.98, price_tier=3, price_position='Mid-Range', total_quantity_sold=2, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000362', Category='Furniture', Sub-Category='Bookcases', Product Name='Sauder Inglewood Library Bookcases', State='California', Price per product=145.333, price_tier=4, price_position='Premium', total_quantity_sold=3, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000468', Category='Furniture', Sub-Category='Bookcases', Product Name="O'Sullivan 2-Shelf Heavy-
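
The velocity label is derived from `total_quantity_sold`, which is 0 for products with no matching cleaned orders. The sketch below shows that two-step logic in plain Python: sum the quantity per product with a zero default, then bucket the total. The `Quantity` field name, the thresholds, the helper functions, and the `'Fast Moving'` and `'Medium Moving'` labels are assumptions; only `'Slow Moving'` appears in the output above.

```python
from collections import defaultdict


def total_quantity_by_product(product_ids, orders):
    """Sum sold quantity per product; products without orders get 0."""
    sold = defaultdict(int)
    for row in orders:
        sold[row["Product ID"]] += row["Quantity"]
    return {pid: sold.get(pid, 0) for pid in product_ids}


def velocity_label(quantity, medium_from=10, fast_from=50):
    """Bucket a sold quantity; the thresholds are hypothetical."""
    if quantity >= fast_from:
        return "Fast Moving"
    if quantity >= medium_from:
        return "Medium Moving"
    return "Slow Moving"


# Made-up sample rows, not taken from the dataset.
sample_orders = [
    {"Product ID": "P1", "Quantity": 30},
    {"Product ID": "P1", "Quantity": 25},
    {"Product ID": "P2", "Quantity": 3},
]
totals = total_quantity_by_product(["P1", "P2", "P3"], sample_orders)
labels = {pid: velocity_label(qty) for pid, qty in totals.items()}
print(totals)
print(labels)
```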

In [12]:
final_enriched_product_df = product_sales_velocity_df

final_enriched_product_df.printSchema()
final_enriched_product_df.take(5)

root
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Price per product: double (nullable = false)
 |-- price_tier: integer (nullable = false)
 |-- price_position: string (nullable = false)
 |-- total_quantity_sold: long (nullable = true)
 |-- inventory_velocity: string (nullable = false)



[Row(Product ID='FUR-BO-10000112', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Birmingham Collection Bookcase, Dark Cherry', State='Illinois', Price per product=91.686, price_tier=3, price_position='Mid-Range', total_quantity_sold=0, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000330', Category='Furniture', Sub-Category='Bookcases', Product Name='Sauder Camden County Barrister Bookcase, Planked Cherry Finish', State='Louisiana', Price per product=120.98, price_tier=3, price_position='Mid-Range', total_quantity_sold=2, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000362', Category='Furniture', Sub-Category='Bookcases', Product Name='Sauder Inglewood Library Bookcases', State='California', Price per product=145.333, price_tier=4, price_position='Premium', total_quantity_sold=3, inventory_velocity='Slow Moving'),
 Row(Product ID='FUR-BO-10000468', Category='Furniture', Sub-Category='Bookcases', Product Name="O'Sullivan 2-Shelf Heavy-

In [None]:
from src.load_enriched_table import validate_enriched_product_data

print("Running data quality checks on enriched product data...")
validate_enriched_product_data(final_enriched_product_df)
print("\nProduct data validation complete.")

### 3. Creation of Enriched Views

In [13]:
# Create temporary views
final_enriched_customers_df.createOrReplaceTempView("enriched_customers_view")
final_enriched_product_df.createOrReplaceTempView("enriched_products_view")

print("Temporary views 'enriched_customers_view' and 'enriched_products_view' created.")

Temporary views 'enriched_customers_view' and 'enriched_products_view' created.


### 4. Displaying Data from Enriched Views

In [14]:
print("Enriched Customer View...\n")
spark.sql("SELECT * FROM enriched_customers_view LIMIT 5").show()

print("\nEnriched Product View...")
spark.sql("SELECT * FROM enriched_products_view LIMIT 5").show()

Enriched Customer View...

+-----------+-------------+--------------------+-----------------+--------------------+--------+-------------+----------+--------------+-----------+-------+------------------+------------------+------------+-------------------+-------------------------+---------------------+----------------+
|Customer ID|Customer Name|               email|            phone|             address| Segment|      Country|      City|         State|Postal Code| Region|       total_sales|      total_profit|total_orders|average_order_value|unique_products_purchased|days_since_last_order|churn_risk_score|
+-----------+-------------+--------------------+-----------------+--------------------+--------+-------------+----------+--------------+-----------+-------+------------------+------------------+------------+-------------------+-------------------------+---------------------+----------------+
|   AA-10315|   Alex Avila|josephrice131@gma...|     680-261-2092|91773 Miller Shoa...|Consume