# Customer Segmentation with PySpark on the FLO Dataset

📌 In this section, we perform customer segmentation on the FLO dataset using PySpark.

# Business Problem

📌 FLO, an online shoe store, wants to segment its customers and define a marketing strategy for each segment. To do this, customer behaviors will be defined and customers will be grouped according to the clusters found in those behaviors.

# Dataset Story

📌 The dataset contains information on the past shopping behavior of customers who made their last purchases in 2020-2021 as OmniChannel customers (shopping both online and offline).

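📌 The original dataset has 20,000 observations and 13 variables. The version loaded in this notebook (flo_100k.csv) has 100,000 rows and 17 columns: the 13 variables below plus platform_type, online_product_group_amount_top_name_12, offline_product_group_name_12 and last_order_date_new.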

master_id: Unique customer number

order_channel: The channel of the shopping platform used (Android, iOS, Desktop, Mobile, Offline)

last_order_channel: The channel where the last purchase was made

first_order_date: The date of the customer's first purchase

last_order_date: The date of the customer's last purchase

last_order_date_online: The date of the customer's last purchase on the online platform

last_order_date_offline: The date of the customer's last purchase on the offline platform

order_num_total_ever_online: The total number of purchases made by the customer on the online platform

order_num_total_ever_offline: The total number of purchases made by the customer offline

customer_value_total_ever_offline: The total price paid by the customer for offline purchases

customer_value_total_ever_online: The total price paid by the customer for online purchases

interested_in_categories_12: List of categories the customer has shopped in during the last 12 months

store_type: Represents 3 different companies. If a customer shopped at both company A and company B, the value is written as A, B.


# Create a Spark Session

In [2]:
!pip install findspark
import findspark
findspark.init(r"C:\spark")  # raw string so the backslash is not read as an escape sequence
from pyspark.sql import SparkSession

# local session; a small number of shuffle partitions (default is 200) suits a small local job
spark = SparkSession.builder \
    .master("local") \
    .appName("Flo_Segmentation") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()
sc = spark.sparkContext



In [3]:
sc

# Import Necessary Libraries

In [82]:
import pandas as pd
pd.set_option("display.max_columns",None)
pd.set_option("display.max_rows", None)
pd.set_option("display.width", 500)
pd.set_option("display.float_format", lambda x: '%.4f' % x)
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Import Dataset

In [42]:
df = (spark.read.format("csv")
      .option("header", True)
      .option("delimiter", "|")
      .option("inferSchema", True)
      .load("flo_100k.csv")
)
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: timestamp, store_type: string]

In [43]:
df.limit(5).toPandas()

| | master_id | order_channel | platform_type | last_order_channel | first_order_date | last_order_date | last_order_date_online | last_order_date_offline | order_num_total_ever_online | order_num_total_ever_offline | customer_value_total_ever_offline | customer_value_total_ever_online | interested_in_categories_12 | online_product_group_amount_top_name_12 | offline_product_group_name_12 | last_order_date_new | store_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | Offline | Offline | Offline | 2019-02-23 12:59:17 | 2019-02-23 12:59:17 | NaT | 2019-02-23 12:59:17 |  | 1.0 | 212.98 | 0.0 |  |  |  | 2019-02-23 | A |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | Offline | OmniChannel | Offline | 2019-12-01 16:48:09 | 2019-12-01 16:48:09 | NaT | 2019-12-01 16:48:09 |  | 1.0 | 199.98 | 0.0 |  |  |  | 2019-12-01 | A |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | Offline | Offline | Offline | 2020-07-24 15:49:47 | 2020-07-24 15:49:47 | NaT | 2020-07-24 15:49:47 |  | 1.0 | 140.49 | 0.0 | [ERKEK] |  | ERKEK | 2020-07-24 | A |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | Mobile | Online | Mobile | 2018-12-31 07:22:07 | 2018-12-31 07:22:07 | 2018-12-31 07:22:07 | NaT | 1.0 |  | 0.0 | 174.99 |  |  |  | 2018-12-31 | A |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | Desktop | Online | Desktop | 2021-05-05 21:07:02 | 2021-05-05 22:39:36 | 2021-05-05 22:39:36 | NaT | 2.0 |  | 0.0 | 283.95 | [] |  |  | 2021-05-05 | A |


In [44]:
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: timestamp (nullable = true)
 |-- store_type: string (nullable = true)



# Missing Value Analysis

In [11]:
def null_count(dataframe, col_name):
    nc = dataframe.select(col_name).filter(
        (F.col(col_name) == "NA")|
        (F.col(col_name) == "") |
        (F.col(col_name).isNull())
    ).count()
    return nc

In [12]:
null_count(df, "order_num_total_ever_online")

70784

In [15]:
def show_all_null(dataframe):
    total = dataframe.count()  # count rows once instead of once per column
    for col_name, _ in dataframe.dtypes:
        nc = null_count(dataframe, col_name)
        if nc > 0:
            print("{} ===> {} , Ratio: {:.2f}".format(col_name, nc, nc / total * 100))

In [16]:
show_all_null(df)

last_order_date_online ===> 70784 , Ratio: 70.78
last_order_date_offline ===> 21703 , Ratio: 21.70
order_num_total_ever_online ===> 70784 , Ratio: 70.78
order_num_total_ever_offline ===> 21703 , Ratio: 21.70
interested_in_categories_12 ===> 56590 , Ratio: 56.59
online_product_group_amount_top_name_12 ===> 88295 , Ratio: 88.30
offline_product_group_name_12 ===> 77209 , Ratio: 77.21
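The missing-value rule used by null_count (a null, an empty string, or the literal string "NA") can be checked on a small sample in plain Python, without a Spark session. This is a minimal sketch over a list of row dictionaries; the sample rows and the helper names `is_missing` and `missing_report` are hypothetical, and only the rule itself comes from the cells above.

```python
def is_missing(value):
    # Same rule as null_count: null, empty string, or the literal "NA"
    return value is None or value == "" or value == "NA"


def missing_report(rows):
    """Return {column: (missing_count, missing_percent)} for columns that have missing values."""
    total = len(rows)
    columns = rows[0].keys() if rows else []
    report = {}
    for col in columns:
        count = sum(1 for row in rows if is_missing(row.get(col)))
        if count > 0:
            report[col] = (count, count / total * 100)
    return report


# Hypothetical sample rows (not taken from the real dataset)
sample = [
    {"master_id": "a", "order_num_total_ever_online": None, "order_num_total_ever_offline": 1.0},
    {"master_id": "b", "order_num_total_ever_online": 2.0, "order_num_total_ever_offline": "NA"},
    {"master_id": "c", "order_num_total_ever_online": "", "order_num_total_ever_offline": 3.0},
    {"master_id": "d", "order_num_total_ever_online": 1.0, "order_num_total_ever_offline": 2.0},
]

for col, (count, ratio) in missing_report(sample).items():
    print("{} ===> {} , Ratio: {:.2f}".format(col, count, ratio))
```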


In [48]:
# Assigning zeros to the null values of the required variables
df = df.na.fill(value=0, subset=["order_num_total_ever_online","order_num_total_ever_offline",
                                 "customer_value_total_ever_offline","customer_value_total_ever_online"])

In [49]:
show_all_null(df)

last_order_date_online ===> 70784 , Ratio: 70.78
last_order_date_offline ===> 21703 , Ratio: 21.70
interested_in_categories_12 ===> 56590 , Ratio: 56.59
online_product_group_amount_top_name_12 ===> 88295 , Ratio: 88.30
offline_product_group_name_12 ===> 77209 , Ratio: 77.21


In [50]:
# keep only rows with non-negative order counts and customer values
df = df.filter((df.order_num_total_ever_online >= 0) &
                (df.order_num_total_ever_offline >= 0) &
                (df.customer_value_total_ever_offline >= 0) &
                (df.customer_value_total_ever_online >= 0))

In [52]:
df.limit(5).toPandas()

| | master_id | order_channel | platform_type | last_order_channel | first_order_date | last_order_date | last_order_date_online | last_order_date_offline | order_num_total_ever_online | order_num_total_ever_offline | customer_value_total_ever_offline | customer_value_total_ever_online | interested_in_categories_12 | online_product_group_amount_top_name_12 | offline_product_group_name_12 | last_order_date_new | store_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | Offline | Offline | Offline | 2019-02-23 12:59:17 | 2019-02-23 12:59:17 | NaT | 2019-02-23 12:59:17 | 0.0 | 1.0 | 212.98 | 0.0 |  |  |  | 2019-02-23 | A |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | Offline | OmniChannel | Offline | 2019-12-01 16:48:09 | 2019-12-01 16:48:09 | NaT | 2019-12-01 16:48:09 | 0.0 | 1.0 | 199.98 | 0.0 |  |  |  | 2019-12-01 | A |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | Offline | Offline | Offline | 2020-07-24 15:49:47 | 2020-07-24 15:49:47 | NaT | 2020-07-24 15:49:47 | 0.0 | 1.0 | 140.49 | 0.0 | [ERKEK] |  | ERKEK | 2020-07-24 | A |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | Mobile | Online | Mobile | 2018-12-31 07:22:07 | 2018-12-31 07:22:07 | 2018-12-31 07:22:07 | NaT | 1.0 | 0.0 | 0.0 | 174.99 |  |  |  | 2018-12-31 | A |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | Desktop | Online | Desktop | 2021-05-05 21:07:02 | 2021-05-05 22:39:36 | 2021-05-05 22:39:36 | NaT | 2.0 | 0.0 | 0.0 | 283.95 | [] |  |  | 2021-05-05 | A |


# General Information About the Dataset

In [24]:
print((df.count(), len(df.columns)))

(100000, 17)


In [21]:
def distinct_df(dataframe, col_name):
    if dataframe.select(col_name).distinct().count() == dataframe.count():
        print("Yes, users dataframe is distinct")
    else:
        print("No, users dataframe is not distinct")

In [22]:
distinct_df(df, "master_id")

Yes, users dataframe is distinct


In [23]:
# number of customers for each platform type and order channel combination
df.groupby(["platform_type","order_channel"]).count().show()

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|  OmniChannel|      Offline| 4793|
|       Online|      Ios App| 3008|
|      Offline|      Offline|65991|
|       Online|       Mobile| 6451|
|       Online|      Desktop| 3253|
|       Online|  Android App| 8728|
|  OmniChannel|       Mobile| 2061|
|  OmniChannel|  Android App| 3261|
|  OmniChannel|      Ios App|  956|
|  OmniChannel|      Desktop| 1498|
+-------------+-------------+-----+



In [53]:
# total number of orders and total spend across the online and offline channels
df = df.withColumn("order_num_total", df.order_num_total_ever_online + df.order_num_total_ever_offline)
df = df.withColumn("customer_value_total", df.customer_value_total_ever_offline + df.customer_value_total_ever_online)

In [56]:
# customer count, average total orders and average total spend by order channel
df.groupby("order_channel").agg({"master_id": "count",
                                "order_num_total": "mean",
                                "customer_value_total": "mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|order_channel|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           70784|        218.1639484913738|  1.6003475361663653|
|      Ios App|            3964|        568.4640312815278|   3.377648839556004|
|       Mobile|            8512|        391.7418761748324|   2.798637218045113|
|      Desktop|            4751|       376.91355504103205|   2.538623447695222|
|  Android App|           11989|         532.846284093796|    3.50971724080407|
+-------------+----------------+-------------------------+--------------------+



In [57]:
# customer count, average total orders and average total spend by platform type
df.groupby("platform_type").agg({"master_id": "count",
                                "order_num_total": "mean",
                                "customer_value_total": "mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|platform_type|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           65991|       215.73030686020934|  1.5837917291751906|
|  OmniChannel|           12569|        500.7937512929094|  3.4947092051873656|
|       Online|           21440|        404.7896576493302|  2.6207089552238805|
+-------------+----------------+-------------------------+--------------------+



# Create RFM (Recency, Frequency & Monetary) Metrics

In [58]:
# create RFM 
rfm = df[["master_id","last_order_date_new","order_num_total","customer_value_total"]]
rfm.show(5)

+--------------------+-------------------+---------------+--------------------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|
+--------------------+-------------------+---------------+--------------------+
|b3ace094-a17f-11e...|2019-02-23 00:00:00|            1.0|              212.98|
|c57d7c4c-a950-11e...|2019-12-01 00:00:00|            1.0|              199.98|
|602897a6-cdac-11e...|2020-07-24 00:00:00|            1.0|              140.49|
|388e4c4e-af86-11e...|2018-12-31 00:00:00|            1.0|              174.99|
|80664354-adf0-11e...|2021-05-05 00:00:00|            2.0|              283.95|
+--------------------+-------------------+---------------+--------------------+
only showing top 5 rows



In [68]:
# find the most recent order date in the dataset
last_order_date = df.agg({"last_order_date":"max"}).collect()[0][0]
last_order_date.date()

datetime.date(2021, 5, 30)

In [72]:
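# Recency: days from each customer's last order to the analysis date
# (2021-06-01, two days after the most recent order date of 2021-05-30)
rfm = rfm.withColumn("Recency", F.expr("datediff('2021-06-01', last_order_date_new)"))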
rfm.show(5)

+--------------------+-------------------+---------------+--------------------+-------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|Recency|
+--------------------+-------------------+---------------+--------------------+-------+
|b3ace094-a17f-11e...|2019-02-23 00:00:00|            1.0|              212.98|    829|
|c57d7c4c-a950-11e...|2019-12-01 00:00:00|            1.0|              199.98|    548|
|602897a6-cdac-11e...|2020-07-24 00:00:00|            1.0|              140.49|    312|
|388e4c4e-af86-11e...|2018-12-31 00:00:00|            1.0|              174.99|    883|
|80664354-adf0-11e...|2021-05-05 00:00:00|            2.0|              283.95|     27|
+--------------------+-------------------+---------------+--------------------+-------+
only showing top 5 rows
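Spark's datediff(end, start) returns the number of whole days from start to end, and the hardcoded analysis date 2021-06-01 is two days after the most recent order (2021-05-30), so every customer gets a positive Recency. A plain-Python sketch of the same calculation with datetime.date follows. Deriving the analysis date as "most recent order + 2 days" is an assumption about how the hardcoded date was chosen, `recency_days` is a hypothetical helper, and the customer IDs are shortened prefixes from the output above.

```python
from datetime import date, timedelta

# Most recent order date in the dataset (result of the max() aggregation above)
max_order_date = date(2021, 5, 30)
# Assumption: analysis date = most recent order date + 2 days, i.e. 2021-06-01
analysis_date = max_order_date + timedelta(days=2)


def recency_days(last_order, today=analysis_date):
    # Like Spark's datediff(today, last_order): whole days between the two dates
    return (today - last_order).days


# Last order dates of the first five customers (IDs shortened)
last_orders = {
    "b3ace094": date(2019, 2, 23),
    "c57d7c4c": date(2019, 12, 1),
    "602897a6": date(2020, 7, 24),
    "388e4c4e": date(2018, 12, 31),
    "80664354": date(2021, 5, 5),
}

for customer, last_order in last_orders.items():
    print(customer, recency_days(last_order))
```

The printed values should match the Recency column of the Spark output for the same customers.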



In [76]:
# rename the totals to the RFM names Frequency and Monetary
rfm = (rfm.withColumnRenamed("order_num_total", "Frequency")
          .withColumnRenamed("customer_value_total", "Monetary"))
rfm.show(5)

+--------------------+-------------------+---------+--------+-------+
|           master_id|last_order_date_new|Frequency|Monetary|Recency|
+--------------------+-------------------+---------+--------+-------+
|b3ace094-a17f-11e...|2019-02-23 00:00:00|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|2019-12-01 00:00:00|      1.0|  199.98|    548|
|602897a6-cdac-11e...|2020-07-24 00:00:00|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|2018-12-31 00:00:00|      1.0|  174.99|    883|
|80664354-adf0-11e...|2021-05-05 00:00:00|      2.0|  283.95|     27|
+--------------------+-------------------+---------+--------+-------+
only showing top 5 rows



In [77]:
# drop last_order_date_new
rfm = rfm.drop("last_order_date_new")
rfm.show(5)

+--------------------+---------+--------+-------+
|           master_id|Frequency|Monetary|Recency|
+--------------------+---------+--------+-------+
|b3ace094-a17f-11e...|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|      1.0|  199.98|    548|
|602897a6-cdac-11e...|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|      1.0|  174.99|    883|
|80664354-adf0-11e...|      2.0|  283.95|     27|
+--------------------+---------+--------+-------+
only showing top 5 rows
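Putting the steps together, each customer's RFM row is derived from the raw columns. Missing order counts become 0 (as with df.na.fill above), Frequency is online plus offline orders, Monetary is online plus offline spend, and Recency is the day difference to the analysis date. The plain-Python sketch below uses two of the sample rows shown earlier; `build_rfm` is a hypothetical helper, not part of the notebook.

```python
from datetime import date

ANALYSIS_DATE = date(2021, 6, 1)


def build_rfm(row, analysis_date=ANALYSIS_DATE):
    # Treat missing counts/values as 0, mirroring df.na.fill(value=0, ...)
    def num(key):
        value = row.get(key)
        return 0.0 if value is None else float(value)

    frequency = num("order_num_total_ever_online") + num("order_num_total_ever_offline")
    monetary = num("customer_value_total_ever_online") + num("customer_value_total_ever_offline")
    recency = (analysis_date - row["last_order_date_new"]).days
    return {"master_id": row["master_id"], "Frequency": frequency,
            "Monetary": monetary, "Recency": recency}


# Two sample customers from the dataset preview (IDs shortened)
raw_rows = [
    {"master_id": "b3ace094", "last_order_date_new": date(2019, 2, 23),
     "order_num_total_ever_online": None, "order_num_total_ever_offline": 1.0,
     "customer_value_total_ever_offline": 212.98, "customer_value_total_ever_online": 0.0},
    {"master_id": "80664354", "last_order_date_new": date(2021, 5, 5),
     "order_num_total_ever_online": 2.0, "order_num_total_ever_offline": None,
     "customer_value_total_ever_offline": 0.0, "customer_value_total_ever_online": 283.95},
]

for rfm_row in map(build_rfm, raw_rows):
    print(rfm_row)
```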



# Create Model

In [79]:
rfm_col = ["Recency", "Frequency", "Monetary"]
assembler = VectorAssembler().setHandleInvalid("skip").setInputCols(rfm_col).setOutputCol("unscaled_features")
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")
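With its default settings (withStd=True, withMean=False), Spark's StandardScaler does not center the data; it divides each feature by its standard deviation, which the Spark documentation describes as the corrected (n - 1) sample standard deviation. This is visible in the transform output: a Frequency of 2.0 is scaled to exactly twice the value of a Frequency of 1.0 (0.6554... vs 0.3277...). A plain-Python sketch of that transformation, where `scale_by_std` is a hypothetical helper and returning 0.0 for a zero-variance feature is this sketch's own choice to avoid dividing by zero:

```python
from statistics import stdev


def scale_by_std(rows):
    """Divide every feature by its sample standard deviation (no mean centering)."""
    columns = list(zip(*rows))
    stds = [stdev(col) for col in columns]
    # Avoid division by zero: a zero-variance feature is mapped to 0.0 in this sketch
    return [[value / s if s != 0 else 0.0 for value, s in zip(row, stds)] for row in rows]


# Columns: Recency, Frequency, Monetary (three customers from the RFM table)
unscaled = [
    [829.0, 1.0, 212.98],
    [548.0, 1.0, 199.98],
    [27.0, 2.0, 283.95],
]
for row in scale_by_std(unscaled):
    print([round(v, 4) for v in row])
```

The numbers differ from the Spark output because the standard deviations here come from three rows instead of the full dataset, but ratios between customers are preserved in the same way.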

In [80]:
# create and fit pipeline model
pipeline_obj = Pipeline().setStages([assembler, scaler])
pipeline_model = pipeline_obj.fit(rfm)

In [81]:
# show transform pipeline dataframe
transform_df = pipeline_model.transform(rfm)
transform_df.limit(5).toPandas()

| | master_id | Frequency | Monetary | Recency | unscaled_features | features |
|---|---|---|---|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | 1.0 | 212.98 | 829 | [829.0, 1.0, 212.98] | [3.2479381193526153, 0.3277012075046895, 0.459... |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | 1.0 | 199.98 | 548 | [548.0, 1.0, 199.98] | [2.147008551755408, 0.3277012075046895, 0.4318... |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | 1.0 | 140.49 | 312 | [312.0, 1.0, 140.49] | [1.2223844309264367, 0.3277012075046895, 0.303... |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | 1.0 | 174.99 | 883 | [883.0, 1.0, 174.99] | [3.4595046554744986, 0.3277012075046895, 0.377... |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | 2.0 | 283.95 | 27 | [27.0, 2.0, 283.95] | [0.10578326806094163, 0.655402415009379, 0.613... |


In [83]:
# compute k-means model for k clusters
def compute_kmeans_model(dataframe, k):
    kmeans_object = KMeans() \
        .setSeed(123) \
        .setK(k)
    
    return kmeans_object.fit(dataframe)
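KMeans.fit clusters the scaled features iteratively: assign every point to its nearest centroid, move each centroid to the mean of its assigned points, and repeat until the centroids stop moving. The plain-Python sketch below shows that loop (Lloyd's algorithm) on a few 2-D points. It is illustrative only: Spark's KMeans uses k-means|| initialization by default along with its own iteration and tolerance settings, whereas this sketch simply picks k random input points as starting centroids. `kmeans` and `squared_distance` are hypothetical helpers.

```python
import random


def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans(points, k, seed=123, max_iter=20):
    """Lloyd's algorithm: return (centroids, labels)."""
    rng = random.Random(seed)
    # Simple initialization: k input points chosen at random
    centroids = [list(p) for p in rng.sample(points, k)]
    labels = [0] * len(points)
    for _ in range(max_iter):
        # Assignment step: nearest centroid by squared Euclidean distance
        labels = [min(range(k), key=lambda c: squared_distance(p, centroids[c])) for p in points]
        # Update step: move each centroid to the mean of its assigned points
        new_centroids = []
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:
                new_centroids.append([sum(dim) / len(members) for dim in zip(*members)])
            else:
                new_centroids.append(centroids[c])  # keep an empty cluster's centroid in place
        if new_centroids == centroids:
            break  # converged: centroids no longer move
        centroids = new_centroids
    return centroids, labels


points = [(0.0, 0.0), (0.2, 0.1), (0.1, 0.3), (5.0, 5.0), (5.2, 4.9), (4.9, 5.1)]
centroids, labels = kmeans(points, k=2)
print(labels)
```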

In [84]:
# create evaluator object
evaluator = ClusteringEvaluator()

In [87]:
# find Silhouette score
score_k_list = []
for k in range(2, 11):
    kmeans_model = compute_kmeans_model(transform_df, k)
    transformed_df = kmeans_model.transform(transform_df)
    score_k = evaluator.evaluate(transformed_df)
    score_k_list.append(score_k)
    print("k: {}, score: {:.3f}".format(k, score_k))

k: 2, score: 0.440
k: 3, score: 0.655
k: 4, score: 0.672
k: 5, score: 0.502
k: 6, score: 0.542
k: 7, score: 0.549
k: 8, score: 0.572
k: 9, score: 0.570
k: 10, score: 0.550
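ClusteringEvaluator defaults to the silhouette metric with squared Euclidean distance. For each point, a is its mean distance to the other members of its own cluster and b is the lowest mean distance to the members of any other cluster; the point's score is (b - a) / max(a, b), and the evaluator reports the mean over all points, so values closer to 1 indicate tighter, better-separated clusters. The exact O(n²) sketch below is meant for small toy data only; `silhouette` is a hypothetical helper, it scores points in singleton clusters as 0 (a common convention), and it needs at least two clusters.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels):
    """Mean silhouette score using squared Euclidean distance (requires >= 2 clusters)."""
    clusters = set(labels)
    scores = []
    for i, (p, own) in enumerate(zip(points, labels)):
        if labels.count(own) == 1:
            scores.append(0.0)  # convention for a point alone in its cluster
            continue
        mean_dist = {}
        for c in clusters:
            dists = [squared_distance(p, q)
                     for j, (q, lab) in enumerate(zip(points, labels))
                     if lab == c and j != i]
            mean_dist[c] = sum(dists) / len(dists)
        a = mean_dist[own]
        b = min(d for c, d in mean_dist.items() if c != own)
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


points = [(0.0,), (1.0,), (10.0,), (11.0,)]
print(round(silhouette(points, [0, 0, 1, 1]), 4))  # well-separated clusters
print(round(silhouette(points, [0, 1, 0, 1]), 4))  # poor grouping, negative score
```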


In [98]:
# find the k (from 2 to k_max - 1) with the highest silhouette score
def find_optimum_k(dataframe, k_max):
    score_k_list = []
    for k in range(2, k_max):
        kmeans_model = compute_kmeans_model(dataframe, k)
        transformed_df = kmeans_model.transform(dataframe)
        score_k = evaluator.evaluate(transformed_df)
        score_k_list.append(score_k)
    best_score = max(score_k_list)
    optimum_k = score_k_list.index(best_score) + 2  # list position 0 corresponds to k = 2
    print("Best silhouette score: {:.3f} at k = {}".format(best_score, optimum_k))
    return optimum_k

In [99]:
optimum_k = find_optimum_k(transform_df, 11)

Best silhouette score: 0.672 at k = 4
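Since the silhouette scores for k = 2 to 10 were already printed by the earlier loop, the best k can also be read off without refitting any models. A minimal sketch using those printed scores (rounded to three decimals):

```python
# Silhouette scores printed by the k-means loop above (rounded to 3 decimals)
scores = {2: 0.440, 3: 0.655, 4: 0.672, 5: 0.502, 6: 0.542,
          7: 0.549, 8: 0.572, 9: 0.570, 10: 0.550}

# Pick the k whose silhouette score is highest
best_k = max(scores, key=scores.get)
print("Best silhouette score: {:.3f} at k = {}".format(scores[best_k], best_k))
```

Reading score_k_list from the earlier loop the same way (k = list position + 2) would avoid fitting every model a second time.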


In [100]:
# optimum_k = 4
kmeans_model = compute_kmeans_model(transform_df, optimum_k)

In [101]:
# prediction
transformed_df = kmeans_model.transform(transform_df)
transformed_df.limit(5).toPandas()

| | master_id | Frequency | Monetary | Recency | unscaled_features | features | prediction |
|---|---|---|---|---|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | 1.0 | 212.98 | 829 | [829.0, 1.0, 212.98] | [3.2479381193526153, 0.3277012075046895, 0.459... | 0 |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | 1.0 | 199.98 | 548 | [548.0, 1.0, 199.98] | [2.147008551755408, 0.3277012075046895, 0.4318... | 0 |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | 1.0 | 140.49 | 312 | [312.0, 1.0, 140.49] | [1.2223844309264367, 0.3277012075046895, 0.303... | 1 |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | 1.0 | 174.99 | 883 | [883.0, 1.0, 174.99] | [3.4595046554744986, 0.3277012075046895, 0.377... | 0 |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | 2.0 | 283.95 | 27 | [27.0, 2.0, 283.95] | [0.10578326806094163, 0.655402415009379, 0.613... | 1 |


In [102]:
# shift cluster labels from 0-3 to 1-4
transformed_df = transformed_df.withColumn("prediction", transformed_df["prediction"] + 1)
transformed_df.limit(5).toPandas()

| | master_id | Frequency | Monetary | Recency | unscaled_features | features | prediction |
|---|---|---|---|---|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | 1.0 | 212.98 | 829 | [829.0, 1.0, 212.98] | [3.2479381193526153, 0.3277012075046895, 0.459... | 1 |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | 1.0 | 199.98 | 548 | [548.0, 1.0, 199.98] | [2.147008551755408, 0.3277012075046895, 0.4318... | 1 |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | 1.0 | 140.49 | 312 | [312.0, 1.0, 140.49] | [1.2223844309264367, 0.3277012075046895, 0.303... | 2 |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | 1.0 | 174.99 | 883 | [883.0, 1.0, 174.99] | [3.4595046554744986, 0.3277012075046895, 0.377... | 1 |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | 2.0 | 283.95 | 27 | [27.0, 2.0, 283.95] | [0.10578326806094163, 0.655402415009379, 0.613... | 2 |


In [103]:
transformed_df.select("master_id", "prediction").limit(10).toPandas()

| | master_id | prediction |
|---|---|---|
| 0 | b3ace094-a17f-11e9-a2fc-000d3a38a36f | 1 |
| 1 | c57d7c4c-a950-11e9-a2fc-000d3a38a36f | 1 |
| 2 | 602897a6-cdac-11ea-b31f-000d3a38a36f | 2 |
| 3 | 388e4c4e-af86-11e9-a2fc-000d3a38a36f | 1 |
| 4 | 80664354-adf0-11eb-8f64-000d3a299ebf | 2 |
| 5 | 47511f36-aeb4-11e9-a2fc-000d3a38a36f | 1 |
| 6 | 77f7c318-3407-11eb-9a15-000d3a38a36f | 2 |
| 7 | 399d6dd2-ecf1-11ea-9369-000d3a38a36f | 2 |
| 8 | b3d4a6f2-a368-11e9-a2fc-000d3a38a36f | 2 |
| 9 | 42fdccd4-f1d1-11ea-bde9-000d3a38a36f | 2 |


In [104]:
transformed_df.select("prediction").distinct().count()

4

In [108]:
# descriptive statistics
transformed_df.groupby("prediction").agg(F.count("Monetary").alias("count"),
                                        F.mean("Monetary").alias("avg_monetary"),
                                        F.mean("Recency").alias("avg_recency"),
                                        F.mean("Frequency").alias("avg_frequency")).sort(F.desc("avg_monetary")).show()

+----------+-----+------------------+------------------+------------------+
|prediction|count|      avg_monetary|       avg_recency|     avg_frequency|
+----------+-----+------------------+------------------+------------------+
|         3|    8|26985.673750000005|           324.875|           196.875|
|         4| 3586|1653.0847044060242|191.23006134969324|10.237032905744563|
|         2|42227| 315.6817353826911|173.59634830795463| 2.082790631586426|
|         1|54179|179.68744015223677| 615.3929566806327|1.4469443880470292|
+----------+-----+------------------+------------------+------------------+

