## Imports

In [0]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import round 

## Data Loading

In [0]:
file_path = "/FileStore/tables/retail.csv"

df = spark.read.option("header", True) \
               .option("inferSchema", True) \
               .csv(file_path)

display(df)


invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


In [0]:
df.printSchema()



root
 |-- invoice_no: string (nullable = true)
 |-- stock_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- country: string (nullable = true)



In [0]:
df.describe().show()

+-------+------------------+------------------+--------------------+-----------------+------------------+------------------+-----------+
|summary|        invoice_no|        stock_code|         description|         quantity|        unit_price|       customer_id|    country|
+-------+------------------+------------------+--------------------+-----------------+------------------+------------------+-----------+
|  count|           1067371|           1067371|             1062989|          1067371|           1067371|            824364|    1067371|
|   mean| 537608.1499316233|29011.161534536903|            21848.25|  9.9388984711033| 4.649387502564253| 15324.63850435002|       null|
| stddev|26662.450446906063|18822.942866189165|   922.9197780233488|172.7057940767532|123.55308764985611|1697.4644503793108|       null|
|    min|            489434|             10002|  DOORMAT UNION J...|           -80995|          -53594.4|             12346|  Australia|
|    max|           C581569|             

In [0]:
# renaming columns
df = df.withColumnRenamed('invoice_no', 'invoice') \
       .withColumnRenamed('StockCode', 'stock_code') \
       .withColumnRenamed('Description', 'description') \
       .withColumnRenamed('Quantity', 'quantity') \
       .withColumnRenamed('InvoiceDate', 'invoice_date') \
       .withColumnRenamed('unit_price', 'price') \
       .withColumnRenamed('Customer ID', 'customer_id') \
       .withColumnRenamed('Country', 'country')


In [0]:


df = df.withColumn("invoice", col("invoice").cast(StringType())) \
       .withColumn("stock_code", col("stock_code").cast(StringType())) \
       .withColumn("description", col("description").cast(StringType())) \
       .withColumn("quantity", col("quantity").cast(IntegerType())) \
       .withColumn("invoice_date", col("invoice_date").cast(TimestampType())) \
       .withColumn("price", col("price").cast(FloatType())) \
       .withColumn("customer_id", col("customer_id").cast(FloatType())) \
       .withColumn("country", col("country").cast(StringType()))



df = df.withColumn("total_amount", expr("quantity * price"))


## Total Amount Invoice Distribution

In [0]:
## removing the outliers and bad invoices


In [0]:
from pyspark.sql.functions import col

# Filter out bad invoices (non-numeric invoice numbers and zero/negative total_amount)
filtered_df = df.filter((col("total_amount") > 0) & (col("invoice").rlike("^\d+$")))


In [0]:
# Group by invoice and sum the total_amount
invoice_total_df = filtered_df.groupBy("invoice") \
                              .sum("total_amount") \
                              .withColumnRenamed("sum(total_amount)", "total_amount")


In [0]:
# Compute the 85th percentile (quantile)
quantile_value = invoice_total_df.approxQuantile("total_amount", [0.85], 0.01)[0]

# Filter out outliers
invoice_total_df = invoice_total_df.withColumn('total_amount', round('total_amount', 2))

In [0]:
# Display the cleaned data
display(invoice_total_df)


invoice,total_amount
489677,192.0
491045,303.2
491658,155.06
493542,118.75
493977,275.95
494244,6711.0
494277,1335.92
495185,2507.06
495783,48.96
496171,199.3


## Monthly Placed and Canceled Orders

In [0]:
from pyspark.sql import functions as F

# Step 1: Add a column for the month (formatted as YYYYMM)
df = df.withColumn("date", F.date_format("invoice_date", "yyyyMM"))

# Step 2: Monthly Cancellations - Filter invoices containing 'C'
monthly_cancels = (df.filter(F.col("invoice").contains("C"))
                   .dropDuplicates(["invoice"])
                   .groupBy("date")
                   .agg(F.count("invoice").alias("cancellations")))

# Step 3: Monthly Orders - Count distinct invoices
monthly_orders = (df.dropDuplicates(["invoice"])
                  .groupBy("date")
                  .agg(F.count("invoice").alias("placements")))

# Step 4: Combine the cancellations and placements
monthly_orders = monthly_orders.join(monthly_cancels, on="date", how="left_outer").fillna(0)

# Adjust for cancellations
monthly_orders = monthly_orders.withColumn("placements", F.col("placements") - 2 * F.col("cancellations"))

# Step 5: Show the result
display(monthly_orders)


date,placements,cancellations
201103,1347,318
201001,1033,300
201002,1489,240
201010,2013,476
201009,1633,371
201104,1264,240
201106,1354,329
201102,955,219
201110,1913,362
201008,1331,273


## Monthly Sales

In [0]:
from pyspark.sql.functions import col, month, year, sum as _sum

# Add a 'date' column formatted as 'YYYYMM' (if it's not already in that format)
df = df.withColumn('date', df['invoice_date'].substr(1, 7))  # This gives 'YYYY-MM'

# Filter rows with valid invoices (no 'D' in invoice) and calculate the total sales per month
monthly_sales = (df.filter(~df['invoice'].rlike('\D'))  # Filter to keep only numerical invoice numbers
                 .groupBy('date')  # Group by month (formatted date)
                 .agg(_sum('total_amount').alias('total_sales')))  # Sum the total_amount per group

# Display the result
display(monthly_sales)


date,total_sales
2010-02,553339.734475787
2010-03,833570.1277497764
2010-01,652708.4992080962
2009-12,825685.7573565356
2010-04,681528.9893587043
2010-06,752270.1472830437
2010-07,650712.9387189075
2010-05,659858.8580619022
2010-08,697274.908798039
2010-09,924333.0085303576


## Monthly Sales Growth

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order monthly sales by 'date' to calculate growth
monthly_sales = monthly_sales.orderBy('date')

# Calculate the percentage change in sales from the previous month
monthly_sales = monthly_sales.withColumn('monthly_growth', 
                                         (F.col('total_sales') - F.lag('total_sales').over(Window.orderBy('date'))) / 
                                         F.lag('total_sales').over(Window.orderBy('date')) * 100)

# Display the result
display(monthly_sales)


date,total_sales,monthly_growth
2009-12,825685.7573565356,
2010-01,652708.4992080962,-20.949526694299863
2010-02,553339.734475787,-15.224064778207898
2010-03,833570.1277497764,50.64346111696983
2010-04,681528.9893587043,-18.23975372072261
2010-05,659858.8580619022,-3.179634562161903
2010-06,752270.1472830437,14.0047054142103
2010-07,650712.9387189075,-13.500098193571544
2010-08,697274.908798039,7.155531619027032
2010-09,924333.0085303576,32.563641236384235


## Monthly Active Users

In [0]:
from pyspark.sql import functions as F

# Group by 'date' and 'customer_id' to count unique active users per month
monthly_active = (df
                  .dropDuplicates(['date', 'customer_id'])
                  .groupby('date')
                  .agg(F.count('customer_id').alias('active_users')))

# Show the result
display(monthly_active)


date,active_users
2010-08,964
2010-11,1683
2010-02,807
2010-04,998
2011-05,1079
2010-03,1111
2010-01,786
2010-09,1202
2010-06,1095
2011-03,1020


## New and Existing Users

In [0]:
from pyspark.sql import functions as F

# Step 1: Format the date as yyyyMM
df = df.withColumn('date', F.date_format('invoice_date', 'yyyyMM'))

# Step 2: Find the first purchase date for each customer
first_purchase = (df
                  .groupBy('customer_id')
                  .agg(F.min('date').alias('first_purchase_date')))

# Step 3: Join first purchase date with the main DataFrame
df_with_first_purchase = df.join(first_purchase, on='customer_id', how='left')

# Step 4: Identify new users (those whose first purchase is in a given month)
new_user_count = (df_with_first_purchase
                  .filter(df_with_first_purchase['first_purchase_date'] == df_with_first_purchase['date'])
                  .groupBy('date')
                  .agg(F.count('customer_id').alias('new_users')))

# Step 5: Identify old users (those who made purchases after their first purchase)
old_user_count = (df_with_first_purchase
                  .filter(df_with_first_purchase['first_purchase_date'] != df_with_first_purchase['date'])
                  .groupBy('date')
                  .agg(F.count('customer_id').alias('old_users')))

# Step 6: Merge New and Old User Counts
final_user_count = new_user_count.join(old_user_count, on='date', how='left').fillna(0)

# Step 7: Display the result (no plot)
display(final_user_count)


date,new_users,old_users
201103,4654,23168
201001,9625,12814
201002,9689,14217
201010,11232,39329
201009,6247,29139
201104,2557,20641
201106,2958,24878
201102,2762,17601
201110,7880,42815
201008,4746,22196


## RFM

In [0]:
from pyspark.sql import functions as F
from datetime import datetime

# Step 1: Get today's date (or specify the reference date)
today = datetime(2012, 1, 1)

# Step 2: Convert the today date to a PySpark-compatible format
today_date = F.lit(today)

# Step 3: Recency calculation: Find the most recent purchase for each customer
recency_df = df.groupBy('customer_id').agg(
    F.datediff(today_date, F.max('invoice_date')).alias('recency')
)

# Step 4: Frequency calculation: Count the number of purchases per customer
frequency_df = df.groupBy('customer_id').agg(
    F.count('invoice').alias('frequency')
)

# Step 5: Monetary calculation: Sum the total spending per customer
monetary_df = df.groupBy('customer_id').agg(
    F.sum('total_amount').alias('monetary')
)

# Step 6: Join the recency, frequency, and monetary DataFrames
rfm_df = recency_df.join(frequency_df, on='customer_id', how='left') \
                   .join(monetary_df, on='customer_id', how='left')

# Step 7: Display the RFM DataFrame
display(rfm_df)


customer_id,recency,frequency,monetary
12636.0,761,1.0,141.0
17804.0,399,93.0,380.12999856472015
14204.0,25,232.0,1062.9599931240082
17323.0,38,340.0,1553.510007083416
16156.0,31,280.0,5933.590007066727
12714.0,32,696.0,14093.709978342056
13999.0,33,475.0,9438.009954929352
17854.0,208,120.0,1071.449997395277
13883.0,24,336.0,6342.740051031113
15682.0,622,26.0,316.9900002479553


## RFM Segmentation

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Window specs
w_recency = Window.orderBy("recency")
w_frequency = Window.orderBy("frequency")
w_monetary = Window.orderBy("monetary")

# Add RFM scores
rfm_scored = rfm_df \
    .withColumn("recency_score", F.ntile(5).over(w_recency)) \
    .withColumn("frequency_score", F.ntile(5).over(w_frequency)) \
    .withColumn("monetary_score", F.ntile(5).over(w_monetary))


In [0]:
rfm_scored = rfm_scored.withColumn(
    "segment_code",
    F.concat_ws("", F.col("recency_score").cast("string"), F.col("frequency_score").cast("string"))
)


In [0]:
from pyspark.sql.functions import when
#map segement codes to segment the labels
rfm_labeled = rfm_scored.withColumn(
    "segment",
    when(F.col("segment_code").rlike("^[1-2][1-2]"), "Hibernating")
    .when(F.col("segment_code").rlike("^[1-2][3-4]"), "At Risk")
    .when(F.col("segment_code").rlike("^[1-2]5"), "Can't Lose")
    .when(F.col("segment_code").rlike("^3[1-2]"), "About to Sleep")
    .when(F.col("segment_code") == "33", "Need Attention")
    .when(F.col("segment_code").rlike("^[3-4][4-5]"), "Loyal Customers")
    .when(F.col("segment_code") == "41", "Promising")
    .when(F.col("segment_code") == "51", "New Customers")
    .when(F.col("segment_code").rlike("^[4-5][2-3]"), "Potential Loyalists")
    .when(F.col("segment_code").rlike("^5[4-5]"), "Champions")
    .otherwise("Other")
)


In [0]:
rfm_summary = rfm_labeled.groupBy("segment").agg(
    F.round(F.mean("recency"), 2).alias("avg_recency"),
    F.round(F.mean("frequency"), 2).alias("avg_frequency"),
    F.round(F.mean("monetary"), 2).alias("avg_monetary"),
    F.count("*").alias("count")
).orderBy("segment")

display(rfm_summary)


segment,avg_recency,avg_frequency,avg_monetary,count
About to Sleep,131.95,18.42,542.1,427
At Risk,45.92,97.82,2046.45,999
Can't Lose,39.79,534.89,11481.3,890
Champions,505.47,142.16,1877.24,116
Hibernating,48.25,20.0,632.14,489
Loyal Customers,204.08,196.95,3064.2,765
Need Attention,130.54,55.92,1348.35,278
New Customers,595.83,7.08,41.24,491
Potential Loyalists,448.79,37.29,653.72,1187
Promising,339.38,8.38,374.24,301
