In [60]:
# Kernel: Python 3.10.5
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from functools import reduce

### Data Cleaning + Feature Engineering

#### 1. Setup

In [None]:
# Optional if not already set in system env vars
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-1.8"
os.environ["SPARK_HOME"] = "C:\\spark-3.5.1-bin-hadoop3"

findspark.init()

spark = SparkSession.builder \
    .appName("DataCleaning") \
    .master("local[*]") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()


#### 2. Load data

In [10]:
df = spark.read.csv("OnlineRetail.csv", header=True, inferSchema=True)
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



#### 3. Data Profiling, Anomaly Detection and Data Cleaning

##### 3.1 Check the number of colummns and rows

In [11]:
# Check the number of columns and rows
print("Number of columns: ", len(df.columns))
print("Number of rows: ", df.count())

Number of columns:  8
Number of rows:  541909


##### 3.2 Abnormal 1: Check and correct Data Type

In [12]:
# Check schema
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [13]:
# Convert datatype of InvoiceDate to timestamp
df_cleaned = df.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))

# Re-check schema
df_cleaned.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



##### 3.3 Abnormal 2: Check and handle missing values

In [14]:
# Check for missing values in original data
missing_values = df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns
])
missing_values.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



- Description: 1,454 --> remove since it is hard to guess the product description
- CustomerID: 135,080 --> might keep them, change `null` value into `Unknown`

In [15]:
# Delete the null values in Description
df_cleaned = df_cleaned.filter(df_cleaned.Description.isNotNull())

# Change the value in CustomerID into "Unknown" for null values
df_cleaned = df_cleaned.withColumn("CustomerID", when(df_cleaned.CustomerID.isNull(), "Unknown").otherwise(df_cleaned.CustomerID))

In [16]:
# Re-check for missing values in the cleaned data
missing_values = df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns
])
missing_values.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



##### 3.4 Abnormal 3: Check and handle duplicate rows

In [17]:
# Check the number of duplicate rows
duplicate_count = df_cleaned.count() - df_cleaned.dropDuplicates().count()
print("Number of duplicate rows: ", duplicate_count)

Number of duplicate rows:  5268


In [18]:
# Drop duplicate rows
df_cleaned = df_cleaned.dropDuplicates()

In [19]:
# Re-check the number of duplicate rows
duplicate_count = df_cleaned.count() - df_cleaned.dropDuplicates().count()
print("Number of duplicate rows: ", duplicate_count)

Number of duplicate rows:  0


##### 3.5 Abnormal 4: Handle negative Quantity and UnitPrice

**Check the number of cancelled order:** The InvoiceNo starts with C (has negative Quantity)

In [20]:
cancel = df_cleaned.filter(col("InvoiceNo").startswith("C"))
print("Number of canceled orders: ", cancel.count())
cancel.show(5)

Number of canceled orders:  9251
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|  C536825|    22617|BAKING SET SPACEB...|      -1|2010-12-02 17:27:00|     4.95|     15384|United Kingdom|
|  C537251|    22747|POPPY'S PLAYHOUSE...|      -6|2010-12-06 10:45:00|      2.1|   Unknown|United Kingdom|
|  C537805|    22197|SMALL POPCORN HOLDER|      -1|2010-12-08 13:18:00|     0.72|     15311|United Kingdom|
|  C538103|    22941|CHRISTMAS LIGHTS ...|      -2|2010-12-09 15:13:00|      8.5|     17442|United Kingdom|
|  C538768|    84378|SET OF 3 HEART CO...|     -24|2010-12-14 11:34:00|     1.25|     14829|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+-----------

**Check for invalid values in Quantity and UnitPrice:**
- Quantity < 0
- UnitPrice < 0

In [21]:
# Quantity has negative value
negative_Quantity = df_cleaned.filter(col("Quantity") < 0).count()
print("Number of negative Quantity: ", negative_Quantity)
df_cleaned.filter(col("Quantity") < 0).select("InvoiceNo", "Quantity").show(5)

Number of negative Quantity:  9725
+---------+--------+
|InvoiceNo|Quantity|
+---------+--------+
|  C536825|      -1|
|  C537251|      -6|
|  C537805|      -1|
|  C538103|      -2|
|  C538768|     -24|
+---------+--------+
only showing top 5 rows



In [22]:
# UnitPrice has negative value
negative_UnitPrice = df_cleaned.filter(col("UnitPrice") < 0).count()
print("Number of negative UnitPrice: ", negative_UnitPrice)
df_cleaned.filter(col("UnitPrice") < 0).select("InvoiceNo", "UnitPrice").show(5)

Number of negative UnitPrice:  2
+---------+---------+
|InvoiceNo|UnitPrice|
+---------+---------+
|  A563186|-11062.06|
|  A563187|-11062.06|
+---------+---------+



- We can see that cancelled orders also have the negative Quantity.
- The dataset contains 9,251 cancelled orders, while 9,725 orders have a negative quantity.
- Therefore, in the Data Cleaning section, when we remove orders with negative quantities, we also remove the cancelled orders at the same time.

**Remove abnormal Quantity and UnitPrice**

In [23]:
# Filter out negative Quantity and UnitPrice
df_cleaned = df_cleaned.filter((col("Quantity") > 0) & (col("UnitPrice") > 0))

# Check negative Quantity and UnitPrice again
print("Negative Quantity count:", df_cleaned.filter(col("Quantity") < 0).count())
print("Negative UnitPrice count:", df_cleaned.filter(col("UnitPrice") < 0).count())

# Check number of cancelled orders again
cancel = df_cleaned.filter(col("InvoiceNo").startswith("C"))
print("Number of cancelled orders: ", cancel.count())

Negative Quantity count: 0
Negative UnitPrice count: 0
Number of cancelled orders:  0


##### 3.6 Abnormal 5: Identify abnormal `StockCode`-`Description` pairs that are not actual products

**Check abnormal StockCode**

In [24]:
excluded_stockcodes = ["POST", "DOT", "M", "C2", "BANK CHARGES","S", "B", "AMAZONFEE",
                       "gift_0001_10", "gift_0001_20","gift_0001_30","gift_0001_40","gift_0001_50"]

# Identify rows with exclued StockCode
df_excluded = df_cleaned.filter(col("StockCode").isin(excluded_stockcodes))

# Show distinct excluded StockCode - Description pairs
df_excluded.select("StockCode", "Description").distinct().show(truncate=False)

+------------+----------------------------------+
|StockCode   |Description                       |
+------------+----------------------------------+
|POST        |POSTAGE                           |
|DOT         |DOTCOM POSTAGE                    |
|gift_0001_40|Dotcomgiftshop Gift Voucher �40.00|
|C2          |CARRIAGE                          |
|gift_0001_30|Dotcomgiftshop Gift Voucher �30.00|
|BANK CHARGES|Bank Charges                      |
|M           |Manual                            |
|AMAZONFEE   |AMAZON FEE                        |
|gift_0001_50|Dotcomgiftshop Gift Voucher �50.00|
|gift_0001_20|Dotcomgiftshop Gift Voucher �20.00|
|gift_0001_10|Dotcomgiftshop Gift Voucher �10.00|
|S           |SAMPLES                           |
|B           |Adjust bad debt                   |
+------------+----------------------------------+



**Handle abnormal StockCode and Description pairs that are not actual products**

In [25]:
df_cleaned = df_cleaned.filter(~col("StockCode").isin(excluded_stockcodes))

In [26]:
# Re-check the abnormal stock code
df_excluded = df_cleaned.filter(col("StockCode").isin(excluded_stockcodes))
df_excluded.select("StockCode", "Description").distinct().show(truncate=False)

+---------+-----------+
|StockCode|Description|
+---------+-----------+
+---------+-----------+



#### 4. Data cleaning results

In [27]:
# The number of rows before cleaning
rows_before_cleaning = df.count()
print(f"Number of rows before cleaning: {rows_before_cleaning}")

Number of rows before cleaning: 541909


In [28]:
# Check the number of rows after cleaning
rows_after_cleaning = df_cleaned.count()
print(f"Number of rows after cleaning: {rows_after_cleaning}")

Number of rows after cleaning: 522541


#### 5. Feature Engineering

In [29]:
# Convert InvoiceDate data type to DateType
df_fe = df_cleaned.withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yyyy H:mm"))
df_fe.printSchema()
df_fe.show(5)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536384|    84755|COLOUR GLASS T-LI...|      48| 2010-12-01|     0.65|     18074|United Kingdom|
|   536385|    22168|ORGANISER WOOD AN...|       2| 2010-12-01|      8.5|     17420|United Kingdom|
|   536399|    22632|HAND WARMER RED P...|       6| 2010-12-01|     1.85|     17850|United Kingdom|
|   536401|    22767|TRIPLE PHOTO FRAM...|       2| 2010-12-01|  

In [30]:
# Get the max date of the dataset
max_date = df_fe.agg(max("InvoiceDate")).collect()[0][0]
max_date

datetime.date(2011, 12, 9)

In [31]:
# Calculate recency
recency_df = df_fe.groupBy("CustomerID").agg(
    datediff(lit(max_date), max("InvoiceDate")).alias("Recency")
)
recency_df.show(5)

+----------+-------+
|CustomerID|Recency|
+----------+-------+
|     16250|    261|
|     15574|    177|
|     15555|     12|
|     15271|      7|
|     17757|      1|
+----------+-------+
only showing top 5 rows



In [32]:
# Calculate frequency (number of transactions per customer)
frequency_df = df_fe.groupBy("CustomerID").agg(
    count("InvoiceNo").alias("Frequency")
)
frequency_df.show(5)

+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|     15574|      152|
|     15555|      885|
|     15271|      264|
|     17686|      286|
|     17714|       10|
+----------+---------+
only showing top 5 rows



In [33]:
# Calculate Monetary (Total money spent by customer)
monetary_df = df_fe.withColumn("TotalPrice", col("Quantity") * col("UnitPrice")) \
    .groupBy("CustomerID") \
    .agg(round(sum("TotalPrice"), 3).alias("Monetary"))
monetary_df.show(5)

+----------+--------+
|CustomerID|Monetary|
+----------+--------+
|     15574|  675.64|
|     15555| 4791.87|
|     15271| 2493.34|
|     17686| 5739.46|
|     17714|   153.0|
+----------+--------+
only showing top 5 rows



In [34]:
# Join all RFM features
dfs = [recency_df, frequency_df, monetary_df]
rfm_df = reduce(lambda df1, df2: df1.join(df2, "CustomerID"), dfs)
rfm_df.show(5)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     15574|    177|      152|  675.64|
|     15555|     12|      885| 4791.87|
|     15271|      7|      264| 2493.34|
|     17686|      7|      286| 5739.46|
|     17714|    320|       10|   153.0|
+----------+-------+---------+--------+
only showing top 5 rows



In [35]:
# Calculate the average of Recency, Frequency, and Monetary
rfm_df.select(
    mean("Recency").alias("Mean_Recency"),
    mean("Frequency").alias("Mean_Frequency"),
    mean("Monetary").alias("Mean_Monetary")
).show()


+-----------------+------------------+-----------------+
|     Mean_Recency|    Mean_Frequency|    Mean_Monetary|
+-----------------+------------------+-----------------+
|92.20530565167243|120.54002306805074|2363.834342099193|
+-----------------+------------------+-----------------+



Based on the mean recency, I chose 95 days as the threshold for labeling churn.
However, customers who haven’t returned for over 95 days but have high frequency (i.e., they’re loyal) shouldn’t be classified as churned based on recency alone.
Therefore, I also used the mean frequency, setting frequency = 120 as a second threshold to more accurately determine whether a customer is churned.

In [36]:
# Define thresholds based on the mean values
recency_threshold = 95
frequency_threshold = 120

# Create a new column 'Churn' based on the thresholds
rfm_df = rfm_df.withColumn(
    "Churn",
    when((col("Recency") > recency_threshold) & (col("Frequency") <= frequency_threshold), 1).otherwise(0)
)

# Show the resulting DataFrame with Churn column
rfm_df.select("CustomerID", "Recency", "Frequency", "Monetary", "Churn").show()

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|     15574|    177|      152|  675.64|    0|
|     15555|     12|      885| 4791.87|    0|
|     15271|      7|      264| 2493.34|    0|
|     17686|      7|      286| 5739.46|    0|
|     17714|    320|       10|   153.0|    1|
|     13865|     58|       30|  501.56|    0|
|     14157|     19|       46|  424.89|    0|
|     13610|     12|      215| 1082.33|    0|
|     16250|    261|       24|  389.44|    1|
|     13282|     18|       37| 1132.14|    0|
|     16320|    172|       56| 1038.46|    1|
|     13192|     95|       63|  911.94|    0|
|     12394|     63|       25| 1080.48|    0|
|     13772|     33|      173| 1122.63|    0|
|     17506|     75|       14|   302.2|    0|
|     18130|     15|       70| 1059.39|    0|
|     16504|     25|       83|  465.88|    0|
|     15634|     17|       15|  243.55|    0|
|     12847|     22|       84|  84

**Final Dataframe**

In [37]:
# Join churn label into transaction-level cleaned data
final_df = df_cleaned.join(rfm_df.select("CustomerID", "Churn"), on="CustomerID", how="left")
final_df.show()

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|Churn|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----+
|     12431|   536389|   35004G|SET OF 3 GOLD FLY...|       4|2010-12-01 10:03:00|     6.35|     Australia|    0|
|     12431|   536389|    21791|VINTAGE HEADS AND...|      12|2010-12-01 10:03:00|     1.25|     Australia|    0|
|     12433|   536532|    22551|PLASTERS IN TIN S...|      24|2010-12-01 13:24:00|     1.65|        Norway|    0|
|     12433|   536532|    21980|PACK OF 12 RED RE...|      48|2010-12-01 13:24:00|     0.29|        Norway|    0|
|     12433|   536532|    22544|MINI JIGSAW SPACEBOY|      24|2010-12-01 13:24:00|     0.42|        Norway|    0|
|     12433|   536532|    22198|LARGE POPCORN HOL...|      48|2010-12-01 13:24:00|     1

### Data Analysis

In [38]:
from pyspark.sql import functions as F

#### 1. Churn rate

In [48]:
# Total churn rate
churn_rate = final_df.groupBy("Churn").agg(
    F.count("*").alias("Count"),
    (F.count("*") / final_df.count() * 100).alias("Percentage")
)
churn_rate.show()

+-----+------+-----------------+
|Churn| Count|       Percentage|
+-----+------+-----------------+
|    1| 35735|6.838697824668303|
|    0|486806| 93.1613021753317|
+-----+------+-----------------+



Examine the churn rate by Country

In [None]:
# Churn rate by Country
churn_by_country = final_df.groupBy("Country") \
    .agg(
        F.count("*").alias("total"),
        F.sum("churn").alias("churned")
    ) \
    .withColumn("churn_rate", F.col("churned") / F.col("total")) \
    .orderBy(F.col("churn_rate").desc())
churn_by_group.show()

+--------------------+-----+-------+-------------------+
|             Country|total|churned|         churn_rate|
+--------------------+-----+-------+-------------------+
|              Brazil|   32|     32|                1.0|
|  European Community|   57|     57|                1.0|
|           Lithuania|   35|     35|                1.0|
|        Saudi Arabia|    9|      9|                1.0|
|United Arab Emirates|   67|     67|                1.0|
|             Lebanon|   45|     45|                1.0|
|             Bahrain|   18|     17| 0.9444444444444444|
|              Greece|  142|    107| 0.7535211267605634|
|               Malta|  109|     45|0.41284403669724773|
|               Japan|  321|     97|0.30218068535825543|
|         Unspecified|  442|    110|  0.248868778280543|
|             Austria|  384|     87|          0.2265625|
|              Poland|  325|     61|0.18769230769230769|
|               Italy|  741|    114|0.15384615384615385|
|            Portugal| 1455|   

--> We see that Brazil, European Community, Lithuania, Saudi Arabia, United Arab Emirates has the 100% churn rate. Let's dive into each country to see what happened

##### Brazil

In [None]:
brazil_df = final_df.filter((col("Churn") == 1) & (col("Country") == "Brazil"))
brazil_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|12769     |550201   |
+----------+---------+



In [61]:
# Let's check the RFM features of the customers
brazil_rfm_df = rfm_df.filter(col("CustomerID")=="12769")
brazil_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|12769     |238    |32       |1143.6  |1    |
+----------+-------+---------+--------+-----+



--> There is only one customer from Brazil. They purchased 32 items and then left 238 days ago. That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.

##### European Community


In [None]:
EC_df = final_df.filter(col("Country") == "European Community")
EC_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|15108     |551013   |
|15108     |555542   |
|15108     |560783   |
+----------+---------+



In [87]:
# Let's check the RFM features of the customers
EC_rfm_df = rfm_df.filter(col("CustomerID")=="15108")
EC_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|15108     |141    |57       |1159.25 |1    |
+----------+-------+---------+--------+-----+



--> There is only one customer from European Community. They purchased 57 items and then left 141 days ago. That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.

##### Lithuania

In [85]:
lithuania_df = final_df.filter(col("Country") == "Lithuania")
lithuania_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|15332     |537081   |
|15332     |537090   |
|15332     |537086   |
|15332     |537827   |
+----------+---------+



In [67]:
# Let's check the RFM features of the customers
lithuania_rfm_df = rfm_df.filter(col("CustomerID")=="15332")
lithuania_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|15332     |366    |35       |1661.06 |1    |
+----------+-------+---------+--------+-----+



--> There is only one customer from Lithuania. They purchased 35 items and then left 366 days ago. That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.

##### Saudi Arabia

In [84]:
SA_df = final_df.filter(col("Country") == "Saudi Arabia")
SA_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|12565     |544838   |
+----------+---------+



In [69]:
# Let's check the RFM features of the customers
SA_rfm_df = rfm_df.filter(col("CustomerID")=="12565")
SA_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|12565     |288    |9        |145.92  |1    |
+----------+-------+---------+--------+-----+



--> There is only one customer from Saudo Arabia. They purchased 9 items and then left 288 days ago. That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.

##### United Arab Emirates

In [None]:
UAE_df = final_df.filter(col("Country") == "United Arab Emirates")
UAE_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|17829     |543911   |
|12739     |565218   |
+----------+---------+



In [73]:
# Let's check the RFM features of the customers
SA_rfm_df = rfm_df.filter(col("CustomerID").isin("12739", "17829"))
SA_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|12739     |99     |37       |975.54  |1    |
|17829     |298    |30       |889.24  |1    |
+----------+-------+---------+--------+-----+



--> There are two customers from United Arab Emirates:
- 12739 purchases 37 items and leaves 99 days ago.
- 17829 purchases 298 items and leaves 298 days ago.

That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.

##### Lebanon

In [None]:
lebanon_df = final_df.filter(col("Country") == "Lebanon")
lebanon_df.select("CustomerID","InvoiceNo").distinct().show(truncate=False)

+----------+---------+
|CustomerID|InvoiceNo|
+----------+---------+
|12764     |542276   |
+----------+---------+



In [77]:
# Let's check the RFM features of the customers
lebanon_rfm_df = rfm_df.filter(col("CustomerID")=="12764")
lebanon_rfm_df.show(truncate=False)

+----------+-------+---------+--------+-----+
|CustomerID|Recency|Frequency|Monetary|Churn|
+----------+-------+---------+--------+-----+
|12764     |316    |45       |1693.88 |1    |
+----------+-------+---------+--------+-----+



--> There is only one customer from Lebanon. They purchased 45 items and then left 316 days ago. That’s why their RFM features don’t meet the churn threshold, with recency > 90 and frequency < 120.