🛍️ RFM Analysis for Customer Segmentation
Objective:
Identify and categorize customers based on Recency, Frequency, and Monetary metrics to improve marketing strategies and retention.

Dataset:

Retail transactions dataset

Columns include: Invoice, StockCode, Description, Quantity, InvoiceDate, Price, Customer ID, Country (Revenue is derived later as Quantity × Price)

Cleaned & preprocessed before analysis

Approach:

Data cleaning and preparation

Calculate Recency, Frequency, and Monetary values

Assign RFM scores using quintiles (1–5)

Combine scores to create RFM segments

Visualize customer distribution by segment

Provide actionable insights

In [1]:
import os

# Set PySpark environment variables to use Anaconda Python (customize these paths as needed)

os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python'


from pyspark.sql import SparkSession

# Start a new Spark session for this retail project
spark=SparkSession.builder.appName('Retail_Project1').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 12:19:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
import pandas as pd

# Read the raw retail data from an Excel file (pandas reads only the first sheet by default)
df_excel=pd.read_excel("online_retail_II.xlsx")

# Convert the Excel sheet to CSV so Spark can easily consume it
df_excel.to_csv("retail_project_data.csv", index=False)

In [None]:
# Load CSV data into Spark DataFrame
df=spark.read.csv("retail_project_data.csv",header=True)

# Show the first 5 rows and print the schema (without a schema, every column is read as a string)
df.show(5)
df.printSchema()

In [None]:
# The default schema treats many fields as strings: we need to specify actual data types

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("Invoice", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", TimestampType(), True),
    StructField("Price", DoubleType(), True),
    StructField("Customer ID", StringType(), True),
    StructField("Country", StringType(), True)
])

df=spark.read.csv("retail_project_data.csv",header=True,schema=schema)      
df.show(5)
df.printSchema()

In [None]:
# Summarize dataset: count, mean, stddev, min, max for numeric and string columns
df.describe().show()

# Check number of nulls in each column
from pyspark.sql.functions import col, sum as _sum
df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

In [None]:
# Remove problematic rows according to business rules:
# 1. Remove transactions with missing Customer ID.
# 2. Remove cancellation invoices (Invoice starting with 'C').
# 3. Remove rows with negative/zero Quantity or Price.

df=df.filter(col('Customer ID').isNotNull())
df=df.filter(~col('Invoice').startswith('C'))          # Tilde (~): NOT operator in PySpark
df=df.filter(df.Quantity>0).filter(df.Price>0) 

In [None]:
'''
This way:

Null Customer ID → dropped

Cancellation invoices → dropped

Negative/zero quantities → dropped

Negative/zero prices → dropped
'''
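
The same four rules can be read as a single row-level predicate. The following plain-Python sketch is for illustration only and is not the Spark code used in this notebook; the helper name `keep_row` and the sample rows are hypothetical.

```python
def keep_row(row):
    """Return True if a transaction row passes all cleaning rules."""
    if row.get("Customer ID") is None:              # rule 1: missing customer
        return False
    if str(row["Invoice"]).startswith("C"):         # rule 2: cancellation invoice
        return False
    if row["Quantity"] <= 0 or row["Price"] <= 0:   # rule 3: non-positive quantity or price
        return False
    return True


# Hypothetical sample rows, one per rule plus one valid row
sample_rows = [
    {"Invoice": "489434", "Customer ID": "13085", "Quantity": 12, "Price": 6.95},
    {"Invoice": "C489449", "Customer ID": "16321", "Quantity": -1, "Price": 2.55},
    {"Invoice": "489464", "Customer ID": None, "Quantity": 5, "Price": 1.25},
    {"Invoice": "489500", "Customer ID": "12345", "Quantity": 3, "Price": 0.0},
]

kept = [r for r in sample_rows if keep_row(r)]
print(len(kept), "of", len(sample_rows), "rows kept")
```

Only the first sample row survives; each of the others violates exactly one rule.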

In [None]:
# filter() accepts either column-reference style:
'''

df.filter(col...)
or
df.filter(df.col_name....)

'''

In [None]:
df.count()

In [None]:
# Nearly 10k rows were dropped by the filters above

In [None]:
# Calculate Revenue for each row (i.e., revenue = quantity * price)

df=df.withColumn('Revenue',df.Price * df.Quantity)

In [None]:
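# Aggregate: total revenue per Country and InvoiceDate
# groupBy().sum('Revenue') names its output column 'sum(Revenue)', so that is the name to rename
df.groupBy('Country','InvoiceDate').sum('Revenue').withColumnRenamed('sum(Revenue)','total_Rev').show()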

In [None]:
from pyspark.sql.functions import to_date, month, year

# Extract year and month for time-based analysis
# InvoiceDate is already a timestamp, so to_date() needs no format string
df=df.withColumn('InvoiceDate',to_date(col('InvoiceDate')))
df=df.withColumn('month',month(col('InvoiceDate')))
df=df.withColumn('year',year(col('InvoiceDate')))


# Aggregate revenue over year, month, and country
# groupBy().sum('Revenue') names its output column 'sum(Revenue)'
df.groupBy('year','month','Country').sum('Revenue').withColumnRenamed('sum(Revenue)','total_Rev').orderBy('year','month','Country').show()
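
As a small illustration of what the year/month/country aggregation computes, here is a plain-Python sketch on a handful of made-up rows. It is not the Spark code above; the sample rows and variable names are hypothetical.

```python
from collections import defaultdict
from datetime import date

# Hypothetical rows: (country, invoice_date, revenue)
rows = [
    ("United Kingdom", date(2010, 1, 4), 20.0),
    ("United Kingdom", date(2010, 1, 20), 5.5),
    ("France", date(2010, 1, 7), 12.0),
    ("United Kingdom", date(2010, 2, 1), 8.0),
]

# Sum revenue per (year, month, country) key
totals = defaultdict(float)
for country, day, revenue in rows:
    totals[(day.year, day.month, country)] += revenue

# Print in the same order as orderBy('year','month','Country')
for key in sorted(totals):
    print(key, totals[key])
```

The two United Kingdom rows from January 2010 collapse into a single total, while February stays separate.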

In [None]:
# Save cleaned DataFrame and aggregated DataFrames as Parquet for efficient storage/reloading
df.write.mode('overwrite').parquet('cleaned_DataFrame')

# The aggregated column is named 'sum(Revenue)' by default; rename it to 'total_Rev'
agg_df_RbyCC=df.groupBy('Country','InvoiceDate').sum('Revenue').withColumnRenamed('sum(Revenue)','total_Rev')

agg_df_RbyMC=df.groupBy('year','month','Country').sum('Revenue').withColumnRenamed('sum(Revenue)','total_Rev').orderBy('year','month','Country')

agg_df_RbyCC.write.mode('overwrite').parquet('RbyCC')
agg_df_RbyMC.write.mode('overwrite').parquet('RbyMC')

In [None]:
# Find top 10 customers by total revenue

df_cleaned=spark.read.parquet('cleaned_DataFrame')
# Keep the DataFrame in the variable; show() returns None, so call it separately
top_N_cust=df_cleaned.groupBy('Customer ID').sum('Revenue').orderBy(col('sum(Revenue)').desc()).limit(10)
top_N_cust.show()

In [None]:
#monthly_trend=df_cleaned.groupBy("year", "month").sum("Revenue").orderBy(col("year").desc(), col("month").desc()).show()



In [None]:
df_cleaned.printSchema()

In [None]:
# RFM: Key metrics for customer segmentation

# Import max under an alias so it does not shadow Python's built-in max()
from pyspark.sql.functions import max as _max, datediff, lit, countDistinct

# Find the most recent InvoiceDate in the dataset to define "today"
max_date = df_cleaned.select(_max("InvoiceDate")).collect()[0][0]

# Recency: Days since last purchase for each customer
recency_df = df_cleaned.groupBy("Customer ID").agg(datediff(lit(max_date), _max("InvoiceDate")).alias("Recency")).orderBy(col('Recency').desc())

# Frequency: Count of unique invoices per customer
frequency_df = df_cleaned.groupBy("Customer ID").agg(countDistinct("Invoice").alias("Frequency")).orderBy(col('Frequency'))

# Monetary: Total revenue per customer
monetary_df=df_cleaned.groupBy('Customer ID').sum('Revenue').withColumnRenamed('sum(Revenue)','Monetary').orderBy(col('Monetary').desc())
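
To make the three definitions concrete, the following plain-Python sketch computes Recency, Frequency, and Monetary for a few made-up transactions. It is an illustration under assumed data, not the Spark code above; the tuples and variable names are hypothetical.

```python
from collections import defaultdict
from datetime import date

# Hypothetical transactions: (customer_id, invoice_no, invoice_date, revenue)
transactions = [
    ("A", "1001", date(2011, 1, 5), 50.0),
    ("A", "1001", date(2011, 1, 5), 25.0),   # second line item of the same invoice
    ("A", "1002", date(2011, 3, 1), 10.0),
    ("B", "1003", date(2011, 2, 10), 200.0),
]

# Reference date: the latest invoice date in the data
reference_date = sorted(t[2] for t in transactions)[-1]

last_purchase = {}
invoices = defaultdict(set)
monetary = defaultdict(float)
for customer, invoice, day, revenue in transactions:
    if customer not in last_purchase or day > last_purchase[customer]:
        last_purchase[customer] = day
    invoices[customer].add(invoice)      # distinct invoices, not line items
    monetary[customer] += revenue

rfm = {
    c: {
        "Recency": (reference_date - last_purchase[c]).days,
        "Frequency": len(invoices[c]),
        "Monetary": monetary[c],
    }
    for c in last_purchase
}
print(rfm)
```

Customer A has three line items but only two distinct invoices, which is why Frequency counts invoices rather than rows.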

In [None]:
# Join R, F, M metrics into a single DataFrame
RFM_df=recency_df.join(frequency_df, 'Customer ID').join(monetary_df,'Customer ID')
RFM_df.show(5)

In [None]:
from pyspark.sql.functions import ntile,concat_ws
from pyspark.sql.window import Window

# Score: assign a 1-5 bucket to each metric, where 5 is always best
# (most recent purchase, highest frequency, highest spend), so that a high
# total score matches the "Champions" end of the segmentation below.
# Note: a Window without partitionBy moves all rows into a single partition.

# Add the three score columns to one DataFrame instead of joining three copies,
# which would duplicate the Recency/Frequency/Monetary columns
RFM_score=(RFM_df
    .withColumn('recency_score', ntile(5).over(Window.orderBy(col("Recency").desc())))
    .withColumn('frequency_score', ntile(5).over(Window.orderBy(col("Frequency").asc())))
    .withColumn('monetary_score', ntile(5).over(Window.orderBy(col("Monetary").asc()))))


# Concatenated and total RFM score
RFM_score_combined=RFM_score.withColumn('RFM_score_combined',concat_ws("",col('recency_score'),col('frequency_score'),col('monetary_score')))
RFM_score_total=RFM_score.withColumn('RFM_score_total',col('recency_score')+col('frequency_score')+col('monetary_score'))
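
How the sort direction inside `Window.orderBy` decides which customers land in bucket 5 can be illustrated with a plain-Python imitation of NTILE. This is a sketch of the bucketing rule, not Spark's implementation; `ntile_buckets` and the sample values are hypothetical.

```python
def ntile_buckets(row_count, n):
    """Bucket numbers (1..n) for row_count ordered rows, following SQL NTILE:
    when rows do not divide evenly, the earliest buckets get one extra row."""
    base, extra = divmod(row_count, n)
    buckets = []
    for bucket in range(1, n + 1):
        size = base + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * size)
    return buckets


# Hypothetical days since last purchase for seven customers
recency_days = [2, 10, 35, 90, 200, 365, 400]

ascending = sorted(recency_days)
descending = sorted(recency_days, reverse=True)

# Map each value to the bucket it receives under each ordering
score_if_ascending = dict(zip(ascending, ntile_buckets(len(ascending), 5)))
score_if_descending = dict(zip(descending, ntile_buckets(len(descending), 5)))

print(score_if_ascending[2], score_if_descending[2])
```

With ascending order the most recent customer (2 days) gets bucket 1; with descending order the same customer gets bucket 5. Because NTILE assigns buckets by row position, customers with identical values can end up in different buckets.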

In [None]:
# Segment analysis
from pyspark.sql.functions import when

# Assign each customer to a segment based on their total RFM score
RFM_segmented = RFM_score_total.withColumn(
    "Segment",
    when(col("RFM_score_total") >= 13, "Champions") \
    .when((col("RFM_score_total") >= 10) & (col("RFM_score_total") <= 12), "Loyal Customers") \
    .when((col("RFM_score_total") >= 7) & (col("RFM_score_total") <= 9), "Potential Loyalist") \
    .when((col("RFM_score_total") >= 4) & (col("RFM_score_total") <= 6), "Needs Attention") \
    .otherwise("At Risk")
)
RFM_segmented.show(20)

In [None]:
RFM_segmented.groupBy("Segment").count().orderBy(col("count").desc()).show()
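
To see how the thresholds partition the possible scores, the sketch below enumerates all 125 combinations of three 1-5 scores and counts how many fall into each segment. It mirrors the `when` chain in plain Python; the function name `segment_for` is hypothetical.

```python
from collections import Counter
from itertools import product


def segment_for(total_score):
    """Map a total RFM score (3..15) to a segment, using the same thresholds as the when() chain."""
    if total_score >= 13:
        return "Champions"
    if total_score >= 10:
        return "Loyal Customers"
    if total_score >= 7:
        return "Potential Loyalist"
    if total_score >= 4:
        return "Needs Attention"
    return "At Risk"


# Count score combinations (not customers) per segment
combo_counts = Counter(
    segment_for(r + f + m) for r, f, m in product(range(1, 6), repeat=3)
)
for segment, count in combo_counts.most_common():
    print(segment, count)
```

Under these thresholds only the combination 1-1-1 (total 3) reaches the `otherwise` branch, so At Risk is by construction the narrowest band of score combinations. The actual customer counts per segment still depend on the data.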


In [None]:
import matplotlib.pyplot as plt


# Convert Spark DataFrame to Pandas for plotting
segment_counts = RFM_segmented.groupBy("Segment").count().toPandas()
# Sort by count descending
segment_counts = segment_counts.sort_values(by="count", ascending=False)


plt.figure(figsize=(10,6))
plt.bar(segment_counts["Segment"], segment_counts["count"], color="skyblue")
plt.xticks(rotation=45)
plt.xlabel("Customer Segment")
plt.ylabel("Number of Customers")
plt.title("RFM Customer Segmentation Distribution")
plt.tight_layout()
plt.show()


📌 Conclusion
In this project, we performed an RFM (Recency, Frequency, Monetary) analysis on transactional data using PySpark. The workflow involved:

ETL & Data Cleaning: Removed rows with a missing Customer ID, cancellation invoices, and non-positive quantities/prices, then calculated Revenue.

RFM Calculation: Computed Recency, Frequency, and Monetary metrics for each customer.

Scoring: Assigned RFM scores using ntile() for segmentation ranking.

Segmentation: Categorized customers into meaningful groups such as Champions, Loyal Customers, At Risk, etc.

Visualization: Created bar charts to show distribution of customer segments.

Key Insights
Champions and Loyal Customers represent the largest segments, indicating a strong base of repeat buyers.

At Risk customers should be targeted with retention campaigns before they churn.

Potential Loyalists could be converted into long-term buyers with personalized offers.

The distribution of segments shows opportunities for both retention and growth marketing.

Business Implications
Retention Strategies: Special offers, loyalty programs, and personalized communication for At Risk and Needs Attention segments.

Upselling/Cross-selling: Focus on Champions and Loyal Customers with premium products or bundles.

Nurturing New Buyers: Engage Potential Loyalists to increase purchase frequency.

Limitations & Next Steps
The dataset size is relatively small; results may differ with larger, more diverse data.

Segmentation was based only on RFM; additional behavioral and demographic data could improve targeting.

Future work could integrate time series trend analysis, predictive modeling (e.g., churn prediction), or recommendation systems.