In [0]:
from pyspark.sql import SparkSession

# 1) Initialize Spark session
# SparkSession is the entry point for working with Spark DataFrames
spark = SparkSession.builder.appName("ECommerceData").getOrCreate()

# 2) Load raw CSV dataset
# header=True ensures column names are inferred from first row
# inferSchema=True automatically detects column data types
df_raw = spark.read.csv(
    "/Volumes/workspace/default/online_retail_dataset/online_retail.csv", 
    header=True, 
    inferSchema=True
)

# 3) Inspect schema and first few rows to understand data structure
df_raw.printSchema()
df_raw.show(5)

# 4) Save raw dataset as Parquet for faster future access
# Parquet is an efficient columnar storage format widely used in industry
df_raw.write.mode("overwrite").parquet(
    "/Volumes/workspace/default/online_retail_dataset/online_retail_raw.parquet"
)


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365| 

In [0]:
df_raw.count()

541909

In [0]:
# Remove duplicate records to ensure data integrity
# This prevents double-counting in subsequent analyses
df_clean = df_raw.dropDuplicates()
df_clean.count()


536641

In [0]:
# Verify remaining missing values per column
# This helps ensure that all critical columns are complete before further analysis
from pyspark.sql.functions import col, sum

df_clean.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_clean.columns]).show()



+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135037|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [0]:
## Handle Missing Values
# 1) Fill missing product descriptions with "Unknown" to retain the row while marking incomplete data
df_clean = df_clean.fillna({"Description": "Unknown"})

# 2) Drop rows without CustomerID since these cannot be linked to any customer for analysis
df_clean = df_clean.na.drop(subset=["CustomerID"])

# 3) Verify remaining missing values per column after handling
df_clean.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_clean.columns]).show()




+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [0]:
df_clean.count()

401604

In [0]:
# Inspect missing (null) values in Quantity and UnitPrice columns
# This helps identify incomplete transactions that may need handling before analysis
from pyspark.sql.functions import col,count,when

from pyspark.sql.functions import col, when, count

df_clean.select(
    count(when(col("Quantity").isNull(), True)).alias("Missing_Quantity"),
    count(when(col("UnitPrice").isNull(), True)).alias("Missing_UnitPrice")
).show()


+----------------+-----------------+
|Missing_Quantity|Missing_UnitPrice|
+----------------+-----------------+
|               0|                0|
+----------------+-----------------+



In [0]:
# Check for negative values in Quantity and UnitPrice columns 
# to identify any invalid transactions before cleaning the data

from pyspark.sql.functions import count,col,when

df_clean.select(
    count(when(col("Quantity") < 0, True)).alias("Negative_Quantity"),
    count(when(col("UnitPrice") < 0, True)).alias("Negative_UnitPrice")
).show()




+-----------------+------------------+
|Negative_Quantity|Negative_UnitPrice|
+-----------------+------------------+
|             8872|                 0|
+-----------------+------------------+



In [0]:
# Drop rows with negative Quantity values to ensure data integrity, 
# as negative quantities are invalid for sales transaction analysis
from pyspark.sql.functions import col
df_clean=df_clean.filter(col("Quantity")>=0)
df_clean.count()

392732

In [0]:
# Convert CustomerID to string type to ensure consistent data type for grouping and analysis,
# which is important for accurate aggregation and downstream processing

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

df_clean=df_clean.withColumn("CustomerID",col("CustomerID").cast(StringType()))
df_clean.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
## Duplicate Count
duplicate_count = df_clean.count() - df_clean.dropDuplicates().count()
print(f"Duplicate Count : {duplicate_count}")

Duplicate Count : 0


In [0]:
# 1) Aggregated total transaction value per customer by summing Quantity * UnitPrice,
# providing insights into customer-level revenue contribution and enabling customer segmentation analysis

from pyspark.sql.functions import sum,col
total_transactions = df_clean.groupBy("CustomerID")\
                     .agg(sum(col("Quantity")* col("UnitPrice")).alias("Total_Transactions_Per_Customer"))
total_transactions.show(5)

+----------+-------------------------------+
|CustomerID|Total_Transactions_Per_Customer|
+----------+-------------------------------+
|   14165.0|             120.44000000000003|
|   17376.0|                          203.2|
|   17928.0|                         212.54|
|   16527.0|                         228.06|
|   13721.0|                         524.14|
+----------+-------------------------------+
only showing top 5 rows


In [0]:
# 2) Calculated total transaction value for each invoice by multiplying UnitPrice with Quantity,
# enabling analysis of individual transaction contributions to overall revenue

from pyspark.sql.functions import col

total_transactions_value= df_clean.withColumn("total_transactions",col("UnitPrice") * col ("Quantity"))
total_transactions_value.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|total_transactions|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|   536615|    22634|CHILDS BREAKFAST ...|       2|2010-12-02 10:09:00|     9.95|   14047.0|United Kingdom|              19.9|
|   536638|    22297|HEART IVORY TRELL...|      24|2010-12-02 11:41:00|     1.25|   16244.0|United Kingdom|              30.0|
|   536745|    22737|RIBBON REEL CHRIS...|      10|2010-12-02 13:38:00|     1.65|   17235.0|United Kingdom|              16.5|
|   536412|   85049E|SCANDINAVIAN REDS...|       3|2010-12-01 11:49:00|     1.25|   17920.0|United Kingdom|              3.75|
|   536415|    22594|CHRISTMAS GINGHAM...|       5|2010-12-01 11:57:00|     0.85|   12838.0|United Kingdom|    

In [0]:
# 3) Calculated average transaction value per customer to assess customer purchasing behavior and support segmentation strategies
from pyspark.sql.functions import avg

avg_transactional_value=total_transactions_value.groupBy("CustomerID").avg("total_transactions").alias("avg_transactions")
avg_transactional_value.show(5)

+----------+-----------------------+
|CustomerID|avg(total_transactions)|
+----------+-----------------------+
|   14165.0|      5.018333333333334|
|   17376.0|      8.466666666666667|
|   17928.0|      9.240869565217391|
|   16527.0|                  16.29|
|   13721.0|     18.719285714285714|
+----------+-----------------------+
only showing top 5 rows


In [0]:
# 4) Aggregated transactional sales data to calculate total revenue per country, providing insights for regional performance analysis
from pyspark.sql.functions import sum,col

total_revenue_per_country=df_clean.groupBy("Country")\
    .agg(sum(col("UnitPrice")* col("Quantity")).alias("Total_Revenue_Per_Country"))
total_revenue_per_country.orderBy(col("Total_Revenue_Per_Country").desc()).show(5)

+--------------+-------------------------+
|       Country|Total_Revenue_Per_Country|
+--------------+-------------------------+
|United Kingdom|        7285024.643999968|
|   Netherlands|                285446.34|
|          EIRE|       265262.46000000014|
|       Germany|        228678.3999999999|
|        France|       208934.30999999985|
+--------------+-------------------------+
only showing top 5 rows


In [0]:
# 1) Calculating Recency for each customer:
# Recency measures the number of days since a customer's last purchase.
# This metric is critical in industry to identify active vs. inactive customers.
# Lower recency indicates more recent engagement, which can inform retention campaigns
# and targeted marketing strategies.
from pyspark.sql.functions import max,col,datediff, lit
# Step 1: Find the latest purchase date in the dataset
max_date= df_clean.select(max("InvoiceDate")).collect()[0][0]
max_date
# Step 2(a): Find the most recent purchase date for each customer
last_purchase_df = df_clean.groupBy("CustomerID").agg(max("InvoiceDate").alias("LastPurchaseDate"))
last_purchase_df.show(5)
# Step 2()b:Calculate Recency: days since last purchase
recency_df =last_purchase_df.withColumn("Recency",datediff(lit(max_date),col("LastPurchaseDate")))
recency_df .show(5)



+----------+-------------------+
|CustomerID|   LastPurchaseDate|
+----------+-------------------+
|   13502.0|2011-11-23 10:39:00|
|   13064.0|2011-11-27 12:31:00|
|   16554.0|2011-10-28 14:51:00|
|   15160.0|2010-12-17 14:15:00|
|   13225.0|2011-12-06 13:29:00|
+----------+-------------------+
only showing top 5 rows
+----------+-------------------+-------+
|CustomerID|   LastPurchaseDate|Recency|
+----------+-------------------+-------+
|   13502.0|2011-11-23 10:39:00|     16|
|   13064.0|2011-11-27 12:31:00|     12|
|   16554.0|2011-10-28 14:51:00|     42|
|   15160.0|2010-12-17 14:15:00|    357|
|   13225.0|2011-12-06 13:29:00|      3|
+----------+-------------------+-------+
only showing top 5 rows


In [0]:
# 2) Aggregated customer purchase data to calculate transaction frequency per customer, 
# providing insights into customer loyalty and engagement patterns for targeted retention strategies.
from pyspark.sql.functions import count, col
frequency_df = df_clean.groupBy("CustomerID") \
                       .agg(count("InvoiceNo").alias("Frequency"))
# Show top 5 customers by frequency
frequency_df.orderBy(col("Frequency").desc()).show(5)

+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|   17841.0|     7676|
|   14911.0|     5672|
|   14096.0|     5111|
|   12748.0|     4413|
|   14606.0|     2677|
+----------+---------+
only showing top 5 rows


In [0]:
# 3) Calculated Monetary Value (total revenue generated by each customer).
# This metric is widely used in industry to measure customer lifetime value,
# helping businesses identify high-revenue customers and prioritize retention/upselling strategies.

from pyspark.sql.functions import sum,col

monetary_df=df_clean.groupBy("CustomerID")\
    .agg(sum(col("Quantity") * col("UnitPrice")).alias("total_revenue_per_customer"))
monetary_df.show(5)

+----------+--------------------------+
|CustomerID|total_revenue_per_customer|
+----------+--------------------------+
|   14165.0|        120.44000000000003|
|   17376.0|                     203.2|
|   17928.0|                    212.54|
|   16527.0|                    228.06|
|   13721.0|                    524.14|
+----------+--------------------------+
only showing top 5 rows


In [0]:
## Step 1: Combine R, F, M into one dataframe

rfm_df=recency_df.join(frequency_df,on="CustomerID",how="inner")\
    .join(monetary_df,on="CustomerID",how="inner")
rfm_df.show(5)

+----------+-------------------+-------+---------+--------------------------+
|CustomerID|   LastPurchaseDate|Recency|Frequency|total_revenue_per_customer|
+----------+-------------------+-------+---------+--------------------------+
|   14165.0|2011-03-09 12:39:00|    275|       24|        120.44000000000003|
|   17376.0|2011-09-30 12:36:00|     70|       24|                     203.2|
|   17928.0|2011-10-25 13:52:00|     45|       23|                    212.54|
|   16527.0|2011-09-19 16:21:00|     81|       14|                    228.06|
|   13721.0|2011-11-03 14:05:00|     36|       28|                    524.14|
+----------+-------------------+-------+---------+--------------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile, col

recency_window=Window.orderBy(col("Recency").desc())
rfm_df =rfm_df.withColumn("R_Score",ntile(5).over(recency_window))
rfm_df.show(5)


frequency_window=Window.orderBy(col("Frequency").asc())
rfm_df =rfm_df.withColumn("F_Score",ntile(5).over(frequency_window))
rfm_df.show(5) 

monetary_window=Window.orderBy(col("total_revenue_per_customer").asc())
rfm_df =rfm_df.withColumn("M_Score",ntile(5).over(monetary_window))
rfm_df.show(5) 




+----------+-------------------+-------+---------+--------------------------+-------+
|CustomerID|   LastPurchaseDate|Recency|Frequency|total_revenue_per_customer|R_Score|
+----------+-------------------+-------+---------+--------------------------+-------+
|   17908.0|2010-12-01 11:45:00|    373|       54|        232.02999999999997|      1|
|   16048.0|2010-12-01 15:28:00|    373|        8|        256.44000000000005|      1|
|   17968.0|2010-12-01 12:23:00|    373|       81|        265.09999999999997|      1|
|   14729.0|2010-12-01 12:43:00|    373|       71|                    313.49|      1|
|   18011.0|2010-12-01 17:35:00|    373|       28|                    102.79|      1|
+----------+-------------------+-------+---------+--------------------------+-------+
only showing top 5 rows
+----------+-------------------+-------+---------+--------------------------+-------+-------+
|CustomerID|   LastPurchaseDate|Recency|Frequency|total_revenue_per_customer|R_Score|F_Score|
+----------+--

In [0]:
# Combine into a segment string
from pyspark.sql.functions import concat_ws
rfm_df = rfm_df.withColumn("RFM_Segment", concat_ws("-", col("R_Score"), col("F_Score"), col("M_Score")))

rfm_df.select("CustomerID", "Recency", "Frequency", "total_revenue_per_customer",
              "R_Score", "F_Score", "M_Score", "RFM_Segment").show(10)



+----------+-------+---------+--------------------------+-------+-------+-------+-----------+
|CustomerID|Recency|Frequency|total_revenue_per_customer|R_Score|F_Score|M_Score|RFM_Segment|
+----------+-------+---------+--------------------------+-------+-------+-------+-----------+
|   13256.0|     14|        1|                       0.0|      4|      1|      1|      4-1-1|
|   16738.0|    297|        1|                      3.75|      1|      1|      1|      1-1-1|
|   14792.0|     63|        2|                       6.2|      3|      1|      1|      3-1-1|
|   16454.0|     44|        2|                       6.9|      3|      1|      1|      3-1-1|
|   17956.0|    249|        1|                     12.75|      1|      1|      1|      1-1-1|
|   16878.0|     84|        3|                      13.3|      2|      1|      1|      2-1-1|
|   15823.0|    372|        1|                      15.0|      1|      1|      1|      1-1-1|
|   17763.0|    263|        1|                      15.0|   

In [0]:
from pyspark.sql.functions import col, expr

# Define weights (industry often prioritizes Recency a bit higher)
weight_R = 0.5
weight_F = 0.3
weight_M = 0.2

# Calculate weighted RFM score
rfm_df = rfm_df.withColumn(
    "RFM_Score",
    (col("R_Score") * weight_R) +
    (col("F_Score") * weight_F) +
    (col("M_Score") * weight_M)
)

# Show top 10 customers by RFM_Score
rfm_df.select("CustomerID", "R_Score", "F_Score", "M_Score", "RFM_Segment", "RFM_Score") \
      .orderBy(col("RFM_Score").desc()) \
      .show(10, truncate=False)




+----------+-------+-------+-------+-----------+---------+
|CustomerID|R_Score|F_Score|M_Score|RFM_Segment|RFM_Score|
+----------+-------+-------+-------+-----------+---------+
|13755.0   |5      |5      |5      |5-5-5      |5.0      |
|13126.0   |5      |5      |5      |5-5-5      |5.0      |
|16407.0   |5      |5      |5      |5-5-5      |5.0      |
|12856.0   |5      |5      |5      |5-5-5      |5.0      |
|17861.0   |5      |5      |5      |5-5-5      |5.0      |
|13571.0   |5      |5      |5      |5-5-5      |5.0      |
|12518.0   |5      |5      |5      |5-5-5      |5.0      |
|14696.0   |5      |5      |5      |5-5-5      |5.0      |
|18044.0   |5      |5      |5      |5-5-5      |5.0      |
|12935.0   |5      |5      |5      |5-5-5      |5.0      |
+----------+-------+-------+-------+-----------+---------+
only showing top 10 rows


In [0]:
# Register RFM DataFrame as a temporary SQL view
rfm_df.createOrReplaceTempView("rfm_table")

# Example 1: Count of customers per RFM segment
rfm_count = spark.sql("""
SELECT RFM_Segment, COUNT(*) AS CustomerCount
FROM rfm_table
GROUP BY RFM_Segment
ORDER BY CustomerCount DESC
""")
rfm_count.show(10)




+-----------+-------------+
|RFM_Segment|CustomerCount|
+-----------+-------------+
|      5-5-5|          309|
|      1-1-1|          260|
|      4-5-5|          161|
|      1-2-2|          160|
|      4-4-4|          120|
|      2-1-1|          119|
|      2-3-3|          111|
|      2-2-2|          102|
|      3-3-3|           99|
|      3-4-4|           92|
+-----------+-------------+
only showing top 10 rows
