# Retail Data Wrangling and Analytics

In [0]:
file_location = "/FileStore/tables/retail.csv"

retail_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_location)

retail_df.printSchema()


root
 |-- invoice_no: string (nullable = true)
 |-- stock_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- country: string (nullable = true)



In [0]:
retail_df.show()

+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|    489434|     85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00|      6.95|      13085|United Kingdom|
|    489434|    79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00|      6.75|      13085|United Kingdom|
|    489434|    79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00|      6.75|      13085|United Kingdom|
|    489434|     22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|       2.1|      13085|United Kingdom|
|    489434|     21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00|      1.25|      13085|United Kingdom|
|    489434|     22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00|      1.65|      13085|United K

# Total Invoice Amount Distribution

In [0]:
from pyspark.sql.functions import col, round, sum as _sum

# Calculate amount
amount_df = retail_df.withColumn("amount", col("quantity") * col("unit_price"))

# Group by invoice_no and round + alias 
amount_df = amount_df.groupBy("invoice_no").agg(
    round(_sum("amount"), 2).alias("total_amount")
).filter(col("total_amount") > 0)

amount_df = amount_df.orderBy(col("invoice_no").asc())

quantile_85 = amount_df.approxQuantile("total_amount", [0.85], 0.01)[0]

amount_df_85 = amount_df.filter(col("total_amount") < quantile_85)


In [0]:
from pyspark.sql.functions import min, max, median, mode, mean

def show_distribution(amounts):
    min_value = amounts.select(min("total_amount").alias("min_amount")).collect()[0]["min_amount"]
    max_value = amounts.select(max("total_amount").alias("max_amount")).collect()[0]["max_amount"]
    median_value = amounts.select(median("total_amount").alias("median")).collect()[0]["median"]
    mode_value = amounts.select(mode("total_amount").alias("mode")).collect()[0]["mode"]
    mean_value = amounts.select(mean("total_amount").alias("mean")).collect()[0]["mean"] 

    print('Minimum:{:.2f}\nMean:{:.2f}\nMedian:{:.2f}\nMode:{:.2f}\nMaximum:{:.2f}\n'.format(min_value,mean_value,median_value,mode_value,max_value))

    display(amounts)

total_amount = amount_df.select("total_amount")

show_distribution(total_amount)

Minimum:0.19
Mean:523.30
Median:304.31
Mode:15.00
Maximum:168469.60



total_amount
505.3
145.8
630.33
310.75
2286.24
426.3
50.4
344.34
382.37
285.06


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
total_amount = amount_df_85.select("total_amount")

show_distribution(total_amount)

Minimum:0.19
Mean:266.91
Median:252.96
Mode:15.00
Maximum:692.84



total_amount
505.3
145.8
630.33
310.75
426.3
50.4
344.34
382.37
285.06
141.0


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

# Monthly Placed and Canceled Orders

In [0]:
from pyspark.sql.functions import col, countDistinct, date_format

cancelation_df = retail_df.withColumn("is_canceled", col("invoice_no").startswith('C'))
cancelation_df = cancelation_df.withColumn("ym", date_format(col("invoice_date"), "yyyyMM"))
# Calculate amount
amount_df = retail_df.withColumn("amount", col("quantity") * col("unit_price"))
amount_df = amount_df.withColumn("ym", date_format(col("invoice_date"), "yyyyMM"))

# Filter out placed invoices and count distinct invoice_no
monthly_placed_df = cancelation_df.filter(col("is_canceled") == False)
monthly_placed_df = monthly_placed_df.groupBy("ym").agg(countDistinct("invoice_no").alias("monthly_placed"))

# Filter out canceled invoices and count distinct invoice_no
cancelation_df = cancelation_df.filter(col("is_canceled") == True)
cancelation_df = cancelation_df.groupBy("ym").agg(countDistinct("invoice_no").alias("monthly_canceled"))

# Concatenate (join) the two DataFrames on the "ym" column
final_df = monthly_placed_df.join(cancelation_df, "ym", "outer")

#final_df.show()
display(final_df)


ym,monthly_placed,monthly_canceled
200912,1929,401
201001,1333,300
201002,1729,240
201003,1960,407
201004,1588,304
201005,2011,407
201006,1859,357
201007,1673,344
201008,1604,273
201009,2004,371


Databricks visualization. Run in Databricks to view.

# Monthly Sales

In [0]:
from pyspark.sql.functions import sum

# Calculate amount
amount_df = retail_df.withColumn("amount", col("quantity") * col("unit_price"))
amount_df = amount_df.withColumn("ym", date_format(col("invoice_date"), "yyyyMM"))

amount_df = amount_df.groupBy("ym").agg(
    round(sum("amount"), 2).alias("Sales(Million)")
)

display(amount_df)

ym,Sales(Million)
201001,624032.89
201002,533091.43
200912,799847.11
201003,765848.76
201007,575236.36
201004,590580.43
201006,679786.61
201005,615322.83
201103,683267.08
201010,1045168.35


Databricks visualization. Run in Databricks to view.

# Monthly Sales Growth


In [0]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Define a window specification to calculate previous row
window_spec = Window.orderBy("ym")

# Add the previous row's 'Sales(Million)' using lag function
amount_df = amount_df.withColumn("previous_amount", lag("Sales(Million)").over(window_spec))

# Calculate the percentage growth
amount_df = amount_df.withColumn("pct_growth", 
                                 (col("Sales(Million)") - col("previous_amount")) / col("previous_amount") * 100)

display(amount_df)

ym,Sales(Million),previous_amount,pct_growth
200912,799847.11,,
201001,624032.89,799847.11,-21.98097833972295
201002,533091.43,624032.89,-14.57318379484773
201003,765848.76,533091.43,43.66180300441144
201004,590580.43,765848.76,-22.88550157083233
201005,615322.83,590580.43,4.189505568276264
201006,679786.61,615322.83,10.47641609527149
201007,575236.36,679786.61,-15.379863101451791
201008,656776.34,575236.36,14.175039282982734
201009,853650.43,656776.34,29.97581946998884


Databricks visualization. Run in Databricks to view.

# Monthly Active Users

In [0]:
user_df = retail_df.withColumn("ym", date_format(col("invoice_date"), "yyyyMM"))
user_df = user_df.groupBy("ym").agg(countDistinct("customer_id"))
display(user_df)

ym,count(customer_id)
201103,1020
201001,786
201002,807
201010,1577
201009,1202
201104,899
201106,1051
201102,798
201110,1425
201008,964


Databricks visualization. Run in Databricks to view.

# New and Existing Users



In [0]:
from pyspark.sql.functions import col, min, date_format, countDistinct

# Add 'ym' to the main retail_df first
retail_df = retail_df.withColumn("ym", date_format(col("invoice_date"), "yyyyMM"))

# Then create the first purchase DataFrame
first_purchase_df = retail_df.groupBy("customer_id").agg(
    min("ym").alias("first_purchase_ym")
)

# Join the first purchase info back to the full dataset (retail_df already has 'ym')
retail_df_with_first_purchase = retail_df.join(
    first_purchase_df, on="customer_id", how="left"
)

# Identify new users: ym == first_purchase_ym
new_user_df = retail_df_with_first_purchase.filter(col("ym") == col("first_purchase_ym"))
new_user_count = new_user_df.groupBy("ym").agg(countDistinct("customer_id").alias("new_user_count"))

# Identify existing users: ym > first_purchase_ym
ex_user_df = retail_df_with_first_purchase.filter(col("ym") > col("first_purchase_ym"))
ex_user_count = ex_user_df.groupBy("ym").agg(countDistinct("customer_id").alias("ex_user_count"))

# Merge new and existing user counts
user_count_df = new_user_count.join(ex_user_count, on="ym", how="outer").fillna(0)

display(user_count_df)


ym,new_user_count,ex_user_count
200912,1045,0
201001,394,392
201002,363,444
201003,436,675
201004,291,707
201005,254,808
201006,269,826
201007,183,805
201008,158,806
201009,242,960


Databricks visualization. Run in Databricks to view.

## Finding RFM

RFM is a method used for analyzing customer value. It is commonly used in database marketing and direct marketing and has received particular attention in the retail and professional services industries. ([wikipedia](https://en.wikipedia.org/wiki/RFM_(market_research)))


RFM stands for three dimensions:

- Recency – How recently did the customer purchase?

- Frequency – How often do they purchase?

- Monetary Value – How much do they spend?


In [0]:
from pyspark.sql.functions import max, datediff, lit, to_date, count

amount_df = retail_df.withColumn("amount", col("quantity") * col("unit_price"))

current_date = to_date(lit("2025-02-05"))

# Get last purchase date per customer
recency_df = amount_df.groupBy("customer_id").agg(
    max("invoice_date").alias("last_purchase_date")
)

# Calculate Recency
recency_df = recency_df.withColumn(
    "recency", datediff(current_date, col("last_purchase_date"))
)
recency_df = recency_df.select("customer_id", "recency")

# Calculate Frequency
frequency_df = retail_df.groupBy("customer_id").agg(
    countDistinct("invoice_no").alias("frequency")
)

# Calculate Monetary 
monetary_df = amount_df.groupBy("customer_id").agg(round(
    sum("amount"),2).alias("monetary"))

rfm_df = recency_df.join(frequency_df, on="customer_id", how="inner")
rfm_df = rfm_df.join(monetary_df, on="customer_id", how="inner")

display(rfm_df)


customer_id,recency,frequency,monetary
18051,5441,8,2275.98
13623,4837,15,2446.36
14832,5437,3,-274.63
17389,4807,77,54587.03
15447,5137,6,484.93
15727,4823,15,9445.51
17753,5271,5,388.06
17679,4859,11,3166.56
13285,4830,6,3364.59
13289,5530,1,307.95


# RFM Segmentation

In [0]:
from pyspark.sql.functions import ntile, concat_ws

recency_window = Window.orderBy(col("recency").asc())
frequency_window = Window.orderBy(col("frequency").desc())
monetary_window = Window.orderBy(col("monetary").desc())

rfm_df = rfm_df.withColumn("recency_score", ntile(5).over(recency_window))
rfm_df = rfm_df.withColumn("frequency_score", ntile(5).over(frequency_window))
rfm_df = rfm_df.withColumn("monetary_score", ntile(5).over(monetary_window))

# Concatenate scores to get RFM score
rfm_df = rfm_df.withColumn(
    "rfm_score",
    concat_ws("", col("recency_score").cast("string"),
                  col("frequency_score").cast("string"),
                  col("monetary_score").cast("string"))
)

display(rfm_df)

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score
18102,4807,153,598215.22,1,1,1,111
14646,4808,164,523342.07,1,1,1,111
14156,4816,202,296564.69,1,1,1,111
14911,4808,510,270248.53,1,1,1,111
17450,4815,61,233579.39,1,1,1,111
13694,4810,164,190825.52,1,1,1,111
17511,4809,85,171885.98,1,1,1,111
12415,4831,33,143269.29,2,1,1,211
16684,4811,65,141502.25,1,1,1,111
15061,4810,138,136391.48,1,1,1,111


In [0]:
from pyspark.sql.functions import concat_ws, udf
from pyspark.sql.types import StringType

rfm_df = rfm_df.withColumn(
    "rf_segment", concat_ws("", col("recency_score").cast("string"), col("frequency_score").cast("string"))
)

seg_map = {
    "11": "Hibernating", "12": "Hibernating", "21": "Hibernating", "22": "Hibernating",
    "13": "At Risk", "14": "At Risk", "23": "At Risk", "24": "At Risk",
    "25": "Can't Lose",
    "31": "About to Sleep", "32": "About to Sleep",
    "33": "Need Attention",
    "34": "Loyal Customers", "35": "Loyal Customers",
    "44": "Loyal Customers", "45": "Loyal Customers",
    "41": "Promising",
    "51": "New Customers",
    "42": "Potential Loyalists", "43": "Potential Loyalists",
    "52": "Potential Loyalists", "53": "Potential Loyalists",
    "54": "Champions", "55": "Champions"
}

def segment_lookup(rf_segment):
    return seg_map.get(rf_segment, "Other")

segment_udf = udf(segment_lookup, StringType())

rfm_df = rfm_df.withColumn("segment", segment_udf(col("rf_segment")))

display(rfm_df)

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score,rf_segment,segment
18102,4807,153,598215.22,1,1,1,111,11,Hibernating
14646,4808,164,523342.07,1,1,1,111,11,Hibernating
14156,4816,202,296564.69,1,1,1,111,11,Hibernating
14911,4808,510,270248.53,1,1,1,111,11,Hibernating
17450,4815,61,233579.39,1,1,1,111,11,Hibernating
13694,4810,164,190825.52,1,1,1,111,11,Hibernating
17511,4809,85,171885.98,1,1,1,111,11,Hibernating
12415,4831,33,143269.29,2,1,1,211,21,Hibernating
16684,4811,65,141502.25,1,1,1,111,11,Hibernating
15061,4810,138,136391.48,1,1,1,111,11,Hibernating


In [0]:
from pyspark.sql.functions import avg

agg_df = rfm_df.groupBy("segment").agg(
    round(avg("recency")).alias("avg_recency"),
    round(avg("frequency")).alias("avg_frequency"),
    round(avg("monetary"),2).alias("avg_monetary"),
    count("segment").alias("count")
)

display(agg_df)

segment,avg_recency,avg_frequency,avg_monetary,count
Champions,5374.0,1.0,287.52,939
Promising,5100.0,19.0,6944.67,58
At Risk,4832.0,2.0,726.91,844
About to Sleep,4912.0,10.0,3175.23,551
Hibernating,4826.0,19.0,7787.51,1534
Potential Loyalists,5168.0,5.0,1233.72,747
Loyal Customers,5049.0,1.0,430.4,997
Need Attention,4921.0,4.0,1026.08,261
New Customers,5277.0,21.0,4355.25,11
