# Table of Contents
  
* [Import Packages](#ip)
* [Read Data From Azure Data Lake Storage Gen2](#rd)
* [Data Cleaning](#dc)
    * [Orders](#o)
    * [People](#p)
    * [Returns](#r)
* [Create Customer Summary Table](#cust)
* [Acquisition and Churn Analysis](#ac_churn)
* [Customer Segmentation using K-means Clustering and RFM Analysis](#cluster)
    * [Preprocessing](#preprocess)
    * [Model Selection and Model Fitting](#model)
* [Load Transformed Data into Delta Lake](#load)

# Import Packages <a id="ip"></a>

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np

# Read Data From Azure Data Lake Storage Gen2 <a id="rd"></a>

In [0]:
# Connect to Azure Data Lake Storage Gen2 with OAuth client credentials.
# The client secret is read from the "key-vault-secret" secret scope instead of being hard-coded.
service_credential = dbutils.secrets.get(scope="key-vault-secret", key="etl")

# Every setting is keyed by "<setting prefix>.<storage account host>".
adls_host = "customeranalysis.dfs.core.windows.net"
adls_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "887234cf-3091-413a-8188-77ef57a713d8",
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c8097509-77c9-43b0-98e5-fcf963e5156f/oauth2/token",
}
for setting_prefix, setting_value in adls_oauth_settings.items():
    spark.conf.set(f"{setting_prefix}.{adls_host}", setting_value)

In [0]:
# read the three raw CSV files from the data lake
raw_data_root = "abfss://data-source@customeranalysis.dfs.core.windows.net/raw-data"


def _load_raw_csv(file_name):
    """Load one CSV from the raw-data folder with a header row, '"' as escape char and inferred types."""
    csv_reader = spark.read.format('csv').options(header=True, escape='"', inferSchema=True)
    return csv_reader.load(f"{raw_data_root}/{file_name}")


orders = _load_raw_csv("orders.csv")
people = _load_raw_csv("people.csv")
returns = _load_raw_csv("returns.csv")

# Data Cleaning <a id="dc"></a>

In [0]:
def check_missing_value(df):
    """Report how many missing values each column of a Spark DataFrame contains.

    A value counts as missing when it is NULL or, for float/double columns,
    NaN (``isNull`` alone does not flag NaN).

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        None, after printing 'No missing value', when no column has a missing
        value; otherwise a Row with the number of missing values per column.
    """
    column_types = dict(df.dtypes)

    def is_missing(c):
        # NULL check for every column; NaN check only where the type can hold NaN
        missing = df[c].isNull()
        if column_types[c] in ('float', 'double'):
            missing = missing | F.isnan(df[c])
        return missing

    # F.count ignores the NULLs produced by a non-matching F.when, so it counts the
    # missing rows and returns 0 (not NULL) when the DataFrame has no rows at all.
    temp = df.select([F.count(F.when(is_missing(c), 1)).alias(c) for c in df.columns]).collect()[0]
    if temp.count(0) == len(df.columns):
        print('No missing value')
    else:
        return temp

## Orders <a id="o"></a>

In [0]:
# inspect the schema that inferSchema produced for orders
orders.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State/Province: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
# drop the 'Row ID' column, then print the schema to confirm it is gone
orders = orders.drop('Row ID')
orders.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State/Province: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
# check missing values in orders: prints 'No missing value' or returns the per-column counts
check_missing_value(orders)

No missing value


In [0]:
# Drop duplicated rows, i.e. rows that are identical across all remaining columns
orders = orders.dropDuplicates()

In [0]:
# drop 'Product Name'; it is not referenced by any later cell shown in this notebook
orders = orders.drop('Product Name')

## People <a id="p"></a>

In [0]:
# people has only 4 rows, with no duplicates and no missing values (see the output below)
people.printSchema()
people.show(5)

root
 |-- Regional Manager: string (nullable = true)
 |-- Region: string (nullable = true)

+-----------------+-------+
| Regional Manager| Region|
+-----------------+-------+
|  Sadie Pawthorne|   West|
|      Chuck Magee|   East|
|Roxanne Rodriguez|Central|
|      Fred Suzuki|  South|
+-----------------+-------+



## Returns <a id="r"></a>

In [0]:
# inspect the schema and the first rows of returns
returns.printSchema()
returns.show(5)

root
 |-- Returned: string (nullable = true)
 |-- Order ID: string (nullable = true)

+--------+--------------+
|Returned|      Order ID|
+--------+--------------+
|     Yes|US-2020-100762|
|     Yes|US-2020-100762|
|     Yes|US-2020-100762|
|     Yes|US-2020-100762|
|     Yes|US-2020-100867|
+--------+--------------+
only showing top 5 rows



In [0]:
# convert 'Returned' to an integer indicator variable: every row is set to 1
# NOTE(review): the original value is overwritten unconditionally, so a row whose
# 'Returned' is anything other than 'Yes' would also become 1 -- confirm the file
# only ever contains 'Yes'.
returns = returns.withColumn('Returned', F.lit(1))
returns.printSchema()

root
 |-- Returned: integer (nullable = false)
 |-- Order ID: string (nullable = true)



In [0]:
# check missing values in returns: prints 'No missing value' or returns the per-column counts
check_missing_value(returns)

No missing value


In [0]:
# Drop duplicated rows so that each distinct (Returned, Order ID) pair is kept once
returns = returns.dropDuplicates()

# Create Customer Summary Table <a id="cust"></a>

In [0]:
# per-customer summary: first and last order date, revenue, profit and distinct order count
customer_metrics = [
    F.min('Order Date').alias('first_date'),
    F.max('Order Date').alias('last_date'),
    F.sum('Sales').alias('total_revenue'),
    F.sum('Profit').alias('total_profit'),
    F.count_distinct('Order ID').alias('total_num_orders'),
]
customers = orders.groupBy('Customer ID').agg(*customer_metrics)
customers.show(10)

+-----------+----------+----------+------------------+------------------+----------------+
|Customer ID|first_date| last_date|     total_revenue|      total_profit|total_num_orders|
+-----------+----------+----------+------------------+------------------+----------------+
|   VW-21775|2020-05-11|2023-12-02|6134.0380000000005|-874.6645000000001|              10|
|   PB-19210|2020-11-23|2022-02-11|           132.738|            21.897|               2|
|   RR-19315|2022-11-14|2023-12-11| 615.9319999999999|-73.83499999999997|               2|
|   MS-17530|2020-11-04|2022-10-22|           475.656| 84.02679999999995|               4|
|   EM-13960|2021-10-01|2023-08-01| 933.7040000000001|102.29600000000002|               5|
|   MY-17380|2021-10-26|2023-12-01|          2254.285|319.12109999999996|               7|
|   AH-10690|2020-04-02|2022-11-15| 7888.293999999999|1298.0165999999992|              12|
|   BD-11500|2020-02-01|2023-11-25|          4411.243|1142.1225000000002|               6|

In [0]:
# find the date of the most recent transaction in the dataset
orders.agg(F.max('Order Date')).show()

+---------------+
|max(Order Date)|
+---------------+
|     2023-12-30|
+---------------+



In [0]:
# duration_in_days: customer lifespan, counting both the first and the last order day
# days_since_last_order: transaction recency measured from a fixed snapshot date
# Recency would normally be measured against F.current_date(); the snapshot is pinned to
# 2023-12-31 because the dataset only contains records up to 2023-12-30.
snapshot_date = F.lit('2023-12-31').cast('date')
customers = (
    customers
    .withColumn('duration_in_days', F.date_diff('last_date', 'first_date') + 1)
    .withColumn('days_since_last_order', F.date_diff(snapshot_date, 'last_date'))
)
customers.show(10)

+-----------+----------+----------+------------------+------------------+----------------+----------------+---------------------+
|Customer ID|first_date| last_date|     total_revenue|      total_profit|total_num_orders|duration_in_days|days_since_last_order|
+-----------+----------+----------+------------------+------------------+----------------+----------------+---------------------+
|   VW-21775|2020-05-11|2023-12-02|6134.0380000000005|-874.6645000000001|              10|            1301|                   29|
|   PB-19210|2020-11-23|2022-02-11|           132.738|            21.897|               2|             446|                  688|
|   RR-19315|2022-11-14|2023-12-11| 615.9319999999999|-73.83499999999997|               2|             393|                   20|
|   MS-17530|2020-11-04|2022-10-22|           475.656| 84.02679999999995|               4|             718|                  435|
|   EM-13960|2021-10-01|2023-08-01| 933.7040000000001|102.29600000000002|               5|

In [0]:
# distinct orders per customer and calendar year
num_orders_per_customer_and_year = (
    orders
    .groupBy('Customer ID', F.year('Order Date').alias('order_year'))
    .agg(F.count_distinct('Order ID').alias('num_orders'))
)
# mean yearly order count per customer; only years with at least one order contribute a row
avg_num_order = (
    num_orders_per_customer_and_year
    .groupBy('Customer ID')
    .agg(F.avg('num_orders').alias('avg_num_orders_per_year'))
)
# sanity check: largest and smallest average across customers
avg_num_order.agg(F.max('avg_num_orders_per_year'), F.min('avg_num_orders_per_year')).show()

+----------------------------+----------------------------+
|max(avg_num_orders_per_year)|min(avg_num_orders_per_year)|
+----------------------------+----------------------------+
|                         5.0|                         1.0|
+----------------------------+----------------------------+



In [0]:
# attach the yearly order average to the summary table and shorten the key column name
customers = (
    customers
    .join(avg_num_order, on='Customer ID', how='inner')
    .withColumnRenamed('Customer ID', 'cust_id')
)
customers.show(10)

+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+
| cust_id|first_date| last_date|     total_revenue|      total_profit|total_num_orders|duration_in_days|days_since_last_order|avg_num_orders_per_year|
+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+
|VW-21775|2020-05-11|2023-12-02|6134.0380000000005|-874.6645000000001|              10|            1301|                   29|                    2.5|
|PB-19210|2020-11-23|2022-02-11|           132.738|            21.897|               2|             446|                  688|                    1.0|
|RR-19315|2022-11-14|2023-12-11| 615.9319999999999|-73.83499999999997|               2|             393|                   20|                    1.0|
|MS-17530|2020-11-04|2022-10-22|           475.656| 84.02679999999995|               4|       

# Acquisition and Churn Analysis <a id="ac_churn"></a>

In [0]:
# a customer is considered churned after <churn_window> days of inactivity, i.e. no transaction;
# churned_at stays NULL when that date would fall after the 2023-12-31 snapshot
churn_window = 365
churn_date = F.date_add(F.col('last_date'), churn_window)
snapshot_date = F.lit('2023-12-31').cast('date')
customers = customers.withColumn('churned_at', F.when(churn_date <= snapshot_date, churn_date))
customers.show(10)

+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+----------+
| cust_id|first_date| last_date|     total_revenue|      total_profit|total_num_orders|duration_in_days|days_since_last_order|avg_num_orders_per_year|churned_at|
+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+----------+
|VW-21775|2020-05-11|2023-12-02|6134.0380000000005|-874.6645000000001|              10|            1301|                   29|                    2.5|      NULL|
|PB-19210|2020-11-23|2022-02-11|           132.738|            21.897|               2|             446|                  688|                    1.0|2023-02-11|
|RR-19315|2022-11-14|2023-12-11| 615.9319999999999|-73.83499999999997|               2|             393|                   20|                    1.0|      NULL|
|MS-17530|2020-11-04|2022-10

In [0]:
# keep only the columns needed for computing acquisition and churn in the cells below
temp = customers.select('cust_id', 'first_date', 'last_date', 'churned_at')
temp.show(10)

+--------+----------+----------+----------+
| cust_id|first_date| last_date|churned_at|
+--------+----------+----------+----------+
|VW-21775|2020-05-11|2023-12-02|      NULL|
|PB-19210|2020-11-23|2022-02-11|2023-02-11|
|RR-19315|2022-11-14|2023-12-11|      NULL|
|MS-17530|2020-11-04|2022-10-22|2023-10-22|
|EM-13960|2021-10-01|2023-08-01|      NULL|
|MY-17380|2021-10-26|2023-12-01|      NULL|
|BD-11500|2020-02-01|2023-11-25|      NULL|
|SW-20275|2020-03-10|2023-11-30|      NULL|
|AH-10690|2020-04-02|2022-11-15|2023-11-15|
|KH-16630|2020-11-01|2023-11-02|      NULL|
+--------+----------+----------+----------+
only showing top 10 rows



In [0]:
# monthly churn counts: customers without a churn date are dropped, the rest are
# bucketed by the first day of the month in which they churned
churn_month = F.date_trunc('month', 'churned_at').cast('date').alias('year_month')
churn = (
    temp.na.drop()
    .groupBy(churn_month)
    .agg(F.count('cust_id').alias('num_churned'))
    .orderBy('year_month')
)
churn.show(10)

+----------+-----------+
|year_month|num_churned|
+----------+-----------+
|2021-10-01|          1|
|2021-11-01|          1|
|2021-12-01|          1|
|2022-03-01|          1|
|2022-04-01|          1|
|2022-08-01|          1|
|2022-09-01|          2|
|2022-10-01|          2|
|2022-11-01|          7|
|2022-12-01|          3|
+----------+-----------+
only showing top 10 rows



In [0]:
# monthly acquisition counts: customers bucketed by the first day of the month of their first order
acquisition_month = F.date_trunc('month', 'first_date').cast('date').alias('year_month')
acquisition = (
    temp
    .groupBy(acquisition_month)
    .agg(F.count('cust_id').alias('num_acquired'))
    .orderBy('year_month')
)
acquisition.show(10)

+----------+------------+
|year_month|num_acquired|
+----------+------------+
|2020-01-01|          33|
|2020-02-01|          24|
|2020-03-01|          66|
|2020-04-01|          55|
|2020-05-01|          57|
|2020-06-01|          49|
|2020-07-01|          44|
|2020-08-01|          52|
|2020-09-01|          68|
|2020-10-01|          43|
+----------+------------+
only showing top 10 rows



In [0]:
# compute net customer gain, number of customers, and churn rate for each year_month
# NOTE: these windows have no partitionBy, so Spark evaluates them on a single partition.
# That is acceptable here because the table has only one row per month.
month_order = Window.orderBy('year_month')
window = month_order.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# customer base at the end of the previous month; NULL for the first month
prev_num_cust = F.lag('num_cust', 1).over(month_order)

ts = (
    acquisition.join(churn, 'year_month', 'full')
    # months that appear on only one side of the full join have no count on the other side
    .fillna(0, subset=['num_acquired', 'num_churned'])
    .withColumn('net_cust_gain', F.col('num_acquired') - F.col('num_churned'))
    # running total of net gains = customers at the end of the month
    .withColumn('num_cust', F.sum(F.col('net_cust_gain')).over(window))
    # churn rate relative to the previous month's customer base; left NULL when there is
    # no previous month or the base is zero, so the division can never fail
    .withColumn('churn_rate_in_percent',
                F.when(prev_num_cust > 0,
                       F.round(F.col('num_churned') / prev_num_cust * 100, 2)))
    # sort last: an orderBy placed before the window steps does not guarantee the final order
    .orderBy('year_month')
)
ts.show(50)

+----------+------------+-----------+-------------+--------+---------------------+
|year_month|num_acquired|num_churned|net_cust_gain|num_cust|churn_rate_in_percent|
+----------+------------+-----------+-------------+--------+---------------------+
|2020-01-01|          33|          0|           33|      33|                 NULL|
|2020-02-01|          24|          0|           24|      57|                  0.0|
|2020-03-01|          66|          0|           66|     123|                  0.0|
|2020-04-01|          55|          0|           55|     178|                  0.0|
|2020-05-01|          57|          0|           57|     235|                  0.0|
|2020-06-01|          49|          0|           49|     284|                  0.0|
|2020-07-01|          44|          0|           44|     328|                  0.0|
|2020-08-01|          52|          0|           52|     380|                  0.0|
|2020-09-01|          68|          0|           68|     448|                  0.0|
|202

# Customer Segmentation using K-means Clustering and RFM Analysis <a id="cluster"></a>

## Preprocessing <a id="preprocess"></a>

In [0]:
# select the features we need for customer segmentation:
#   recency -> days_since_last_order, frequency -> avg_num_orders_per_year,
#   monetary value -> total_profit
rfm_data = customers.select('cust_id', 'days_since_last_order', 'avg_num_orders_per_year', 'total_profit')
rfm_data.show(10)

+--------+---------------------+-----------------------+------------------+
| cust_id|days_since_last_order|avg_num_orders_per_year|      total_profit|
+--------+---------------------+-----------------------+------------------+
|VW-21775|                   29|                    2.5|-874.6645000000001|
|RR-19315|                   20|                    1.0|-73.83499999999995|
|PB-19210|                  688|                    1.0|            21.897|
|MS-17530|                  435|     1.3333333333333333| 84.02679999999998|
|EM-13960|                  152|                    2.5|102.29600000000002|
|MY-17380|                   30|     2.3333333333333335|319.12109999999996|
|BD-11500|                   36|                    2.0|         1142.1225|
|AH-10690|                  411|                    4.0|1298.0165999999995|
|SW-20275|                   31|                    2.0|          332.8725|
|KH-16630|                   59|                   2.25|          727.3786|
+--------+--

In [0]:
# feature processing: pack the three RFM columns into a single vector column 'features'
rfm_feature_cols = ['days_since_last_order', 'avg_num_orders_per_year', 'total_profit']
vecAssembler = VectorAssembler(inputCols=rfm_feature_cols, outputCol='features')
rfm_data = vecAssembler.transform(rfm_data)
rfm_data.show(10)

+--------+---------------------+-----------------------+------------------+--------------------+
| cust_id|days_since_last_order|avg_num_orders_per_year|      total_profit|            features|
+--------+---------------------+-----------------------+------------------+--------------------+
|VW-21775|                   29|                    2.5|-874.6645000000001|[29.0,2.5,-874.66...|
|RR-19315|                   20|                    1.0|-73.83499999999995|[20.0,1.0,-73.834...|
|PB-19210|                  688|                    1.0|            21.897|  [688.0,1.0,21.897]|
|MS-17530|                  435|     1.3333333333333333| 84.02679999999998|[435.0,1.33333333...|
|EM-13960|                  152|                    2.5|102.29600000000002|[152.0,2.5,102.29...|
|MY-17380|                   30|     2.3333333333333335|319.12109999999996|[30.0,2.333333333...|
|BD-11500|                   36|                    2.0|         1142.1225|[36.0,2.0,1142.1225]|
|AH-10690|                  41

In [0]:
# feature scaling: divide each feature by its standard deviation, without mean-centering
# (withMean=False and withStd=True are the StandardScaler defaults, spelled out here)
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaled_features',
    withMean=False,
    withStd=True,
)
model = scaler.fit(rfm_data)
rfm_data = model.transform(rfm_data)
rfm_data.show(10)

+--------+---------------------+-----------------------+------------------+--------------------+--------------------+
| cust_id|days_since_last_order|avg_num_orders_per_year|      total_profit|            features|     scaled_features|
+--------+---------------------+-----------------------+------------------+--------------------+--------------------+
|VW-21775|                   29|                    2.5|-874.6645000000001|[29.0,2.5,-874.66...|[0.15630755307589...|
|RR-19315|                   20|                    1.0|-73.83499999999995|[20.0,1.0,-73.834...|[0.10779831246613...|
|PB-19210|                  688|                    1.0|            21.897|  [688.0,1.0,21.897]|[3.70826194883514...|
|MS-17530|                  435|     1.3333333333333333| 84.02679999999998|[435.0,1.33333333...|[2.34461329613849...|
|EM-13960|                  152|                    2.5|102.29600000000002|[152.0,2.5,102.29...|[0.81926717474264...|
|MY-17380|                   30|     2.3333333333333335|

## Model Selection and Model Fitting <a id="model"></a>

In [0]:
# decide the number of clusters using the Silhouette score
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='scaled_features',
                                metricName='silhouette')
scores = []
# repartition for efficiency and cache the result: the loop below fits and scores one
# model per candidate k, and without caching every pass would recompute the upstream
# aggregations, joins and feature transformations from scratch
rfm_data = rfm_data.repartition(10).cache()
# set random seed
seed = 12345
# list of candidate cluster numbers; 1 and 2 are discarded as they are not very informative
ks = list(range(3, 10))

for k in ks:
    kmeans = KMeans(featuresCol='scaled_features', k=k, seed=seed)
    model = kmeans.fit(rfm_data)
    preds = model.transform(rfm_data)
    score = evaluator.evaluate(preds)
    scores.append(score)
    print(f'Silhouette Score for k = {k} is {score}')

# record the best number of clusters (the k with the highest Silhouette score)
best_k = ks[np.argmax(scores)]

Silhouette Score for k = 3 is 0.4794399225833959
Silhouette Score for k = 4 is 0.41584983809771275
Silhouette Score for k = 5 is 0.5238644562175873
Silhouette Score for k = 6 is 0.48980065185434263
Silhouette Score for k = 7 is 0.47560947340326315
Silhouette Score for k = 8 is 0.45034967395821696
Silhouette Score for k = 9 is 0.47110414310160775


In [0]:
# refit K-means with the selected number of clusters and label every customer
final_kmeans_params = dict(featuresCol='scaled_features', k=best_k, seed=seed)
kmeans = KMeans(**final_kmeans_params)
model = kmeans.fit(rfm_data)
preds = model.transform(rfm_data)
preds.show(10)

+--------+---------------------+-----------------------+------------------+--------------------+--------------------+----------+
| cust_id|days_since_last_order|avg_num_orders_per_year|      total_profit|            features|     scaled_features|prediction|
+--------+---------------------+-----------------------+------------------+--------------------+--------------------+----------+
|JF-15565|                   55|                    2.0|         1073.2088|[55.0,2.0,1073.2088]|[0.29644535928187...|         1|
|SC-20050|                   41|                   2.25|1011.7442999999996|[41.0,2.25,1011.7...|[0.22098654055558...|         0|
|LB-16795|                  153|     2.6666666666666665|-40.13220000000007|[153.0,2.66666666...|[0.82465709036595...|         0|
|JK-15625|                   62|     2.3333333333333335| 750.9484999999999|[62.0,2.333333333...|[0.33417476864502...|         0|
|JD-16060|                  301|                    1.0|          111.1141|[301.0,1.0,111.1141]|[

In [0]:
# attach each customer's cluster label to the customer summary table
cluster_labels = preds.select('cust_id', F.col('prediction').alias('cluster'))
customers = customers.join(cluster_labels, on='cust_id', how='inner')
customers.show(10)

+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+----------+-------+
| cust_id|first_date| last_date|     total_revenue|      total_profit|total_num_orders|duration_in_days|days_since_last_order|avg_num_orders_per_year|churned_at|cluster|
+--------+----------+----------+------------------+------------------+----------------+----------------+---------------------+-----------------------+----------+-------+
|VW-21775|2020-05-11|2023-12-02|6134.0380000000005|-874.6645000000001|              10|            1301|                   29|                    2.5|      NULL|      0|
|PB-19210|2020-11-23|2022-02-11|           132.738|            21.897|               2|             446|                  688|                    1.0|2023-02-11|      4|
|RR-19315|2022-11-14|2023-12-11| 615.9319999999999|-73.83499999999997|               2|             393|                   20|                    1.0|

# Load Transformed Data into Delta Lake <a id="load"></a>

In [0]:
# final adjustment to make column names nicely formatted
def rename_columns(df):
    """Return ``df`` with every column renamed to a lower-case, underscore-separated name.

    Each column name is lower-cased and every space, '/' and '-' in it is
    replaced by '_'. Column order is preserved.
    """
    separators_to_underscore = str.maketrans({' ': '_', '/': '_', '-': '_'})
    renamed = []
    for c in df.columns:
        renamed.append(df[c].alias(c.lower().translate(separators_to_underscore)))
    return df.select(renamed)

# normalise the column names of every table that gets written out
orders, people, returns, ts, customers = (
    rename_columns(frame) for frame in (orders, people, returns, ts, customers)
)

In [0]:
# write each transformed DataFrame to a Delta table of the same name, replacing existing contents
delta_tables = {
    "orders": orders,
    "people": people,
    "returns": returns,
    "ts": ts,
    "customers": customers,
}
for table_name, table_df in delta_tables.items():
    table_df.write.format("delta").mode('overwrite').saveAsTable(table_name)