# Setup

Actions performed: 
- Installed pyspark
- Updated kernel to Python 3.13
- Created project directory and initial project structure

**Note: This ETL pipeline was originally designed to export partitioned Parquet files. On Windows, however, Spark's Parquet writer depends on Hadoop's native bindings (`NativeIO$Windows.access0`), and the JVM raised system-level errors on every write, even with all recommended workarounds applied (winutils, config overrides, and coalesced writes). To keep the project moving, I switched the final export to pandas: the cleaned Spark DataFrame is converted with `toPandas()` and written with `DataFrame.to_csv()`. This write succeeds on this machine and leaves the cleaned data unchanged.**
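One way to keep the Parquet path available on machines where it works, while still producing output on Windows, is to attempt the Spark write first and fall back to pandas only on failure. The sketch below is an assumption about how that could look, not the code used in this notebook: the function name `export_with_fallback`, the `store_location` partition column, and the broad `except Exception` are illustrative choices.

```python
def export_with_fallback(df, parquet_path, csv_path, partition_col="store_location"):
    """Try a partitioned Parquet write; fall back to a single CSV via pandas.

    `df` is expected to be a Spark DataFrame. Returns "parquet" or "csv"
    depending on which export succeeded.
    """
    try:
        (
            df.write
            .mode("overwrite")
            .partitionBy(partition_col)
            .parquet(parquet_path)
        )
        return "parquet"
    except Exception as exc:  # deliberately broad: JVM errors surface through Py4J
        print(f"Parquet export failed ({type(exc).__name__}); falling back to CSV")
        # toPandas() collects every row to the driver, so this only suits
        # datasets that fit in driver memory.
        df.toPandas().to_csv(csv_path, index=False)
        return "csv"
```

Because the fallback collects the whole DataFrame on the driver, it is reasonable for a small log like this one but would need a different strategy for large data.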

In [59]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.





In [60]:
!mkdir retail_etl_project\data
!mkdir retail_etl_project\notebooks
!mkdir retail_etl_project\scripts
!mkdir retail_etl_project\output

A subdirectory or file retail_etl_project\data already exists.
A subdirectory or file retail_etl_project\notebooks already exists.
A subdirectory or file retail_etl_project\scripts already exists.
A subdirectory or file retail_etl_project\output already exists.
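The "already exists" messages above appear because `mkdir` is re-run on every execution of the cell. A minimal sketch of an idempotent, OS-independent alternative using `pathlib` follows; the function name `create_project_dirs` is hypothetical, and the directory names are taken from the cell above.

```python
from pathlib import Path

def create_project_dirs(root="retail_etl_project"):
    """Create the project sub-directories, skipping any that already exist."""
    created = []
    for name in ("data", "notebooks", "scripts", "output"):
        path = Path(root) / name
        # parents=True creates `root` if needed; exist_ok=True makes re-runs silent
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created
```

Calling `create_project_dirs()` repeatedly leaves the same four directories in place without printing warnings.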


In [61]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RetailETL") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") \
    .config("spark.hadoop.fs.file.impl.disable.cache", "true") \
    .config("spark.hadoop.util.NativeCodeLoader", "false") \
    .getOrCreate()

# Data

For the sake of example, I have generated realistic CSV files that simulate:
- 🛍️ E-commerce purchase log

- 🛒 Grocery/retail scan log

- 🧾 Bank transaction log

## Purchase Log

Actions performed:
- CSV ingestion
    - Created SparkSession
- Data exploration
- Data cleaning
- Data transformation
    - Feature engineering
    - Aggregations
- Saved the cleaned data as CSV via pandas (originally planned as partitioned Parquet; see the note in Setup)

In [62]:
from pyspark.sql import SparkSession
# getOrCreate() reuses the session configured in Setup if it is still running
spark = SparkSession.builder.appName("RetailETL").getOrCreate()

In [63]:
df = spark.read.csv(
    "../data/retail_purchase_log.csv",
    header=True,
    inferSchema=True
)

In [64]:
df.show(5)

+--------------------+--------------------+----------+----------------+--------+------+-------------------+--------------+
|      transaction_id|         customer_id|product_id|product_category|quantity| price|      purchase_date|store_location|
+--------------------+--------------------+----------+----------------+--------+------+-------------------+--------------+
|1666d603-e608-486...|911c1078-542d-483...|      1654|       Groceries|       2|320.68|2025-03-29 04:15:51|         Miami|
|8e6ca660-f89a-444...|cb651de8-76f0-49d...|      1114|           Books|       2|467.28|2025-02-15 05:14:24|         Miami|
|9814bd8b-0b39-483...|0ea06e13-d8a5-41b...|      1025|       Groceries|       1|456.81|2025-03-13 13:39:22|         Miami|
|553d2bce-bd8f-4dd...|863cc4d3-9b75-473...|      1759|           Books|       3|331.35|2025-03-23 09:44:30|       Chicago|
|6bfdc49f-33ee-4d4...|985ba266-f893-43d...|      1281|       Groceries|       1| 12.79|2025-03-14 12:56:30|       Chicago|
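+--------------------+--------------------+----------+----------------+--------+------+-------------------+--------------+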

In [65]:
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- purchase_date: timestamp (nullable = true)
 |-- store_location: string (nullable = true)
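With `inferSchema=True`, Spark has to read the file an extra time to guess the column types. Since the types are now known from the output above, they could be pinned explicitly. The sketch below passes a DDL-formatted schema string to `spark.read.csv`; the constant `PURCHASE_LOG_SCHEMA` and the function `read_purchase_log` are hypothetical names, and the types are copied from the `printSchema()` output as an assumption about what the file should always contain.

```python
# Column types copied from the printSchema() output above
PURCHASE_LOG_SCHEMA = (
    "transaction_id STRING, "
    "customer_id STRING, "
    "product_id INT, "
    "product_category STRING, "
    "quantity INT, "
    "price DOUBLE, "
    "purchase_date TIMESTAMP, "
    "store_location STRING"
)

def read_purchase_log(spark, path="../data/retail_purchase_log.csv"):
    """Read the purchase log with a fixed schema instead of inferSchema=True."""
    # DataFrameReader.csv accepts a DDL-formatted string for `schema`
    return spark.read.csv(path, header=True, schema=PURCHASE_LOG_SCHEMA)
```

Usage would be `df = read_purchase_log(spark)`. A fixed schema also means a malformed value shows up as a null in that column rather than silently changing the inferred type of the whole column.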



In [66]:
df.count()

1000

In [67]:
df = df.dropDuplicates()

**Dropping rows with Critical Nulls**

Rows missing any of the critical columns (`transaction_id`, `customer_id`, `product_id`, `quantity`, or `price`) are dropped.

In [68]:
from pyspark.sql.functions import col

df = df.dropna(subset=["transaction_id", "customer_id", "product_id", "quantity", "price"])
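To see how many rows a cleaning step actually removes, the row count can be compared before and after. The helper below is a small sketch under that idea; `dropna_with_report` and `CRITICAL_COLUMNS` are hypothetical names, and the column list mirrors the `dropna` call above.

```python
CRITICAL_COLUMNS = ("transaction_id", "customer_id", "product_id", "quantity", "price")

def dropna_with_report(df, subset=CRITICAL_COLUMNS):
    """Drop rows with nulls in `subset` and report how many rows were removed."""
    before = df.count()
    cleaned = df.dropna(subset=list(subset))
    removed = before - cleaned.count()
    print(f"dropna removed {removed} of {before} rows")
    return cleaned, removed
```

Usage would be `df, removed = dropna_with_report(df)`. Each `count()` triggers a Spark job, which is negligible at this data size but worth keeping in mind for larger inputs.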

In [69]:
from pyspark.sql.functions import to_timestamp

# purchase_date was already inferred as a timestamp; this keeps the type explicit
df = df.withColumn("purchase_date", to_timestamp("purchase_date"))

**Feature Engineering**

Created a new column, `total_amount`, calculated as `quantity` × `price`.

In [70]:
from pyspark.sql.functions import expr
df = df.withColumn("total_amount", expr("quantity * price"))

In [71]:
df.select("quantity", "price", "total_amount").summary().show()

+-------+------------------+-----------------+-----------------+
|summary|          quantity|            price|     total_amount|
+-------+------------------+-----------------+-----------------+
|  count|              1000|             1000|             1000|
|   mean|             3.068|252.4036700000002|770.6666200000004|
| stddev|1.3983316242334398|141.6850235117269|586.8167205856818|
|    min|                 1|             5.89|             6.89|
|    25%|                 2|           127.84|           295.74|
|    50%|                 3|           251.95|           619.98|
|    75%|                 4|           375.39|           1163.4|
|    max|                 5|            498.9|           2494.5|
+-------+------------------+-----------------+-----------------+



**Transformations & Aggregations**
- 📊 Total Spend per Customer
- 🏆 Top Products by Revenue
- 🏙️ Revenue by Store Location
- 📅 Purchases Over Time


In [72]:
print("Total Spend Per Customer")
df.groupBy("customer_id") \
  .sum("total_amount") \
  .withColumnRenamed("sum(total_amount)", "total_spent") \
  .orderBy("total_spent", ascending=False) \
  .show(10)

Total Spend Per Customer
+--------------------+------------------+
|         customer_id|       total_spent|
+--------------------+------------------+
|2cace57f-add5-4d4...|            2494.5|
|3da85db8-918f-4d2...|           2485.35|
|fb9ebc4b-e4c7-481...|           2468.75|
|e655bbb3-5f9a-49d...|2436.4500000000003|
|5eccf26a-9f26-4c0...|            2373.0|
|693201f8-af73-42c...|           2361.95|
|6369b06e-5998-4a3...|           2358.85|
|4b2da249-7042-43b...|           2357.15|
|a1fd4499-64af-4aa...|           2328.45|
|7bde36be-75ad-4ac...|            2315.8|
+--------------------+------------------+
only showing top 10 rows
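Values such as `2436.4500000000003` are binary floating-point artifacts from summing `double` columns. One option is to round inside the aggregate expression. The sketch below builds the SQL expression string in the same style the notebook passes to `expr`; the helper name `rounded_sum_expr` is hypothetical.

```python
def rounded_sum_expr(column, alias, scale=2):
    """Build a Spark SQL expression string: round(sum(column), scale) as alias."""
    return f"round(sum({column}), {scale}) as {alias}"

# Usage with the notebook's DataFrame (requires an active Spark session):
# from pyspark.sql.functions import expr
# df.groupBy("customer_id") \
#   .agg(expr(rounded_sum_expr("total_amount", "total_spent"))) \
#   .orderBy("total_spent", ascending=False) \
#   .show(10)
```

For monetary values, casting `price` to a fixed-precision decimal type before aggregating may be preferable to rounding afterwards, since it avoids the artifacts at the source.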



In [73]:
print("Top Products by Revenue")
df.groupBy("product_id", "product_category") \
  .agg(expr("sum(total_amount) as total_revenue")) \
  .orderBy("total_revenue", ascending=False) \
  .show(10)

Top Products by Revenue
+----------+----------------+------------------+
|product_id|product_category|     total_revenue|
+----------+----------------+------------------+
|      1215|     Electronics|           4554.47|
|      1685|       Groceries|4023.8500000000004|
|      1885|       Groceries|           3659.91|
|      1517|           Books|           3508.92|
|      1194|           Books|           3412.87|
|      1025|       Groceries|           3288.85|
|      1388|        Clothing|            3260.9|
|      1429|            Toys|           3166.29|
|      1773|           Books|           3161.41|
|      1669|        Clothing|           3160.99|
+----------+----------------+------------------+
only showing top 10 rows



In [74]:
print("Revenue by Store Location")
df.groupBy("store_location") \
  .agg(expr("sum(total_amount) as revenue_by_location")) \
  .orderBy("revenue_by_location", ascending=False) \
  .show()

Revenue by Store Location
+--------------+-------------------+
|store_location|revenue_by_location|
+--------------+-------------------+
|       Chicago| 162427.71999999994|
|         Miami| 159687.56999999998|
|       Houston| 153963.75000000003|
|      New York| 149034.31000000003|
|   Los Angeles| 145553.27000000002|
+--------------+-------------------+



In [75]:
from pyspark.sql.functions import to_date

print("Purchases Over Time")

df_by_day = df.withColumn("purchase_day", to_date("purchase_date"))

df_by_day.groupBy("purchase_day") \
  .agg(expr("sum(total_amount) as daily_revenue")) \
  .orderBy("purchase_day") \
  .show(10)

Purchases Over Time
+------------+------------------+
|purchase_day|     daily_revenue|
+------------+------------------+
|  2025-02-14| 6090.590000000001|
|  2025-02-15|          16982.95|
|  2025-02-16|          13026.42|
|  2025-02-17|10978.729999999998|
|  2025-02-18|           9691.99|
|  2025-02-19|           7009.83|
|  2025-02-20|12598.120000000003|
|  2025-02-21|          17216.03|
|  2025-02-22|          10362.88|
|  2025-02-23|17908.120000000003|
+------------+------------------+
only showing top 10 rows



In [80]:
df_pd = df.toPandas()
df_pd.to_csv("../output/retail_purchase_cleaned.csv", index=False)
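A quick way to check the exported file is to read it back with the standard library and confirm the header and row count. This is a sketch, not part of the original pipeline: `verify_export` and `EXPECTED_COLUMNS` are hypothetical names, and the expected column order assumes the eight source columns followed by the derived `total_amount`.

```python
import csv

EXPECTED_COLUMNS = [
    "transaction_id", "customer_id", "product_id", "product_category",
    "quantity", "price", "purchase_date", "store_location", "total_amount",
]

def verify_export(path, expected_columns=EXPECTED_COLUMNS):
    """Check the CSV header and return the number of data rows."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader)
        if header != list(expected_columns):
            raise ValueError(f"Unexpected header: {header}")
        # Count the remaining (data) rows without loading them all into memory
        return sum(1 for _ in reader)
```

The value returned by `verify_export("../output/retail_purchase_cleaned.csv")` should match `df.count()` for the cleaned DataFrame.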