In [1]:
# import pandas as pd
import databricks.koalas as kpd
import sklearn as sk
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import plotly.offline as pyoff
import plotly.graph_objs as go
from plotly.offline import plot
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("customer_segmentation") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#inititate Plotly
pyoff.init_notebook_mode()
# pd.options.mode.chained_assignment = None

df = kpd.read_csv("/FileStore/tables/master_table.csv")

print(df.dtypes)
print(df.head(5))

In [2]:
# Remove index column and change order date to datetime
df = df.drop(columns=['_c0'])

df['Order_Date_Id'] = kpd.to_datetime(df['Order_Date_Id'])
print(df.dtypes)
print(df.shape)

##### RFM Feature engineering #####

# Get only rows with a valid sales price value
df = df.query('Sales_Price != 0')
print(df.shape)

# Drop dupes
df = df.drop_duplicates()
print(df.shape)

In [3]:
### Recency ###

# Create df with unique customer IDs
df_cust = kpd.DataFrame(df['Customer_Id'].unique())
df_cust.columns = ['Customer_Id']
print(df_cust)

In [4]:
# Get the most recent date for each customer in seperate df
df_mp = df.groupby('Customer_Id').Order_Date_Id.max().reset_index()
df_mp.columns = ['Customer_Id','MostRecent']
print(df_mp.dtypes)

In [5]:
# Take most recent date in dataset to find recency for each observation
df_mp['MostRecentMax'] = df_mp.MostRecent.max()
print(df_mp['MostRecentMax'])

df_mp['Recency'] = (df_mp['MostRecentMax'] - df_mp['MostRecent'])/86400
df_mp['Recency'] = df_mp['Recency'].astype('int64')
print(df_mp)


# Merge dataframes
df_cust = kpd.merge(df_cust, df_mp[['Customer_Id','Recency']], on='Customer_Id')

df_cust.head()

Unnamed: 0,Customer_Id,Recency
0,833,7
1,148,304
2,471,516
3,496,7
4,463,236


In [6]:
# Recency histogram
print(df_cust['Recency'].plot.hist())

In [7]:
# Find optimal clusters for Recency score, using elbow method
# from sklearn.cluster import KMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Convert Koala dataframe to Spark dataframe
df_rec = df_cust.to_spark()

# Prepare for KMeans algo
rec_feat = ['Recency']
vecAssembler = VectorAssembler(inputCols=rec_feat, outputCol="features")
df_kmeansR = vecAssembler.transform(df_rec).select('Customer_Id','features')
cost = np.zeros(10)

In [8]:
# Fit and plot Elbow curve
for k in range(2,10):
    kmeansR = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    modelR = kmeansR.fit(df_kmeansR.sample(False,0.1, seed=42))
    cost[k] = modelR.computeCost(df_kmeansR)
    
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),cost[2:10])
ax.set_xlabel('k')
ax.set_ylabel('cost')

In [9]:
# Build 3 clusters (elbow method) for Recency and add it to df
kpd.set_option('compute.ops_on_diff_frames', True)

# Predict clusters in spark dataframe, convert to koalas, and grab predictions
kmeansR = KMeans(k=3)
modelR = kmeansR.fit(df_kmeansR)
k_means_recency = modelR.transform(df_kmeansR).select('Customer_Id', 'prediction')
rows = k_means_recency.collect()
df_recency_pred = spark.createDataFrame(data=rows)
df_recency_pred = kpd.DataFrame(df_recency_pred)
df_cust['RecencyCluster'] = df_recency_pred['prediction']

kpd.reset_option('compute.ops_on_diff_frames')

In [10]:
# Function for ordering cluster numbers (0 cluster number is low recency, 2 is high recency)
def order_cluster(cluster_field_name, target_field_name, df, ascending):
    #new_cluster_field_name = 'new_' + cluster_field_name
    df_new = df.groupby(cluster_field_name)[target_field_name].mean().reset_index()
    df_new = df_new.sort_values(by = target_field_name ,ascending = ascending).reset_index(drop=True) 
    index = df_new.index.to_numpy()
    df_index = kpd.DataFrame(data = index, columns = ['index'])
    df_new = kpd.concat([df_new, df_index], axis = 1)
    # Take index of predictions 
    df_final = kpd.merge(df, df_new[[cluster_field_name,'index']], on = cluster_field_name)
    df_final = df_final.drop([cluster_field_name], axis=1)
    df_final = df_final.rename(columns = {"index":cluster_field_name})
    return df_final

df_cust = order_cluster('RecencyCluster', 'Recency', df_cust, False)
df_cust.describe()

Unnamed: 0,Customer_Id,Recency,RecencyCluster
count,542.0,542.0,542.0
mean,508.258303,155.725092,1.555351
std,287.91668,138.01839,0.657446
min,1.0,0.0,0.0
25%,255.0,76.0,1.0
50%,505.0,96.0,2.0
75%,762.0,238.0,2.0
max,999.0,567.0,2.0


In [11]:
### Frequency ###

# Order counts per customer
df_freq = df.groupby('Customer_Id').Order_Date_Id.count().reset_index()
df_freq.columns = ['Customer_Id','Frequency']

# Add to main df
df_cust = kpd.merge(df_cust, df_freq, on='Customer_Id')
print(df_cust)

In [12]:
# Frequency histogram
print(df_cust['Frequency'].plot.hist())

In [13]:
# Find optimal clusters for Frequency score, using elbow method

# Convert Koala dataframe to Spark dataframe
df_freq = df_cust.to_spark()

freq_feat = ['Frequency']
vecAssemblerF = VectorAssembler(inputCols=freq_feat, outputCol="features")
df_kmeansF = vecAssemblerF.transform(df_freq).select('Customer_Id','features')
cost = np.zeros(10)

In [14]:
# Fit and plot Elbow curve
for k in range(2,10):
    kmeansF = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    modelF = kmeansF.fit(df_kmeansF.sample(False,0.1, seed=42))
    cost[k] = modelF.computeCost(df_kmeansF)
    
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),cost[2:10])
ax.set_xlabel('k')
ax.set_ylabel('cost')

In [15]:
# Build 3 clusters (elbow method) for Frequency and add it to df
kpd.set_option('compute.ops_on_diff_frames', True)

# Predict clusters in spark dataframe, convert to koalas, and grab predictions
kmeans = KMeans(k=3)
modelF = kmeans.fit(df_kmeansF)
k_means_frequency = modelF.transform(df_kmeansF).select('Customer_Id', 'prediction')
rows = k_means_frequency.collect()
df_frequency_pred = spark.createDataFrame(rows)
df_frequency_pred = kpd.DataFrame(df_frequency_pred)
df_cust['FrequencyCluster'] = df_frequency_pred['prediction']

kpd.reset_option('compute.ops_on_diff_frames')

# Order Frequency clusters
df_cust = order_cluster('FrequencyCluster', 'Frequency', df_cust, True)
print(df_cust)

# Summary about Frequency Clusters
df_cust.describe()

Unnamed: 0,Customer_Id,Recency,RecencyCluster,Frequency,FrequencyCluster
count,542.0,542.0,542.0,542.0,542.0
mean,508.258303,155.725092,1.555351,1.461255,0.444649
std,287.91668,138.01839,0.657446,0.699467,0.643235
min,1.0,0.0,0.0,1.0,0.0
25%,255.0,76.0,1.0,1.0,0.0
50%,505.0,96.0,2.0,1.0,0.0
75%,762.0,238.0,2.0,2.0,1.0
max,999.0,567.0,2.0,5.0,2.0


In [16]:
### Revenue ###

# Find revenue for each customer
df['Revenue'] = df['Sales_Price'] * df['Quantity']
df_rev = df.groupby('Customer_Id').Revenue.sum().reset_index()

# Add to main df
df_cust = kpd.merge(df_cust, df_rev, on='Customer_Id')
print(df_cust)

In [17]:
# Monetary Value histogram
print(df_cust['Revenue'].plot.hist())

In [18]:
# Find optimal clusters for Revenue score, using elbow method

# Convert Koala dataframe to Spark dataframe
df_rev = df_cust.to_spark()

rev_feat = ['Revenue']
vecAssemblerM = VectorAssembler(inputCols=rev_feat, outputCol="features")
df_kmeansM = vecAssemblerM.transform(df_rev).select('Customer_Id','features')
cost = np.zeros(10)

In [19]:
# Fit and plot Elbow curve
for k in range(2,10):
    kmeansM = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    modelM = kmeansM.fit(df_kmeansM.sample(False,0.1, seed=42))
    cost[k] = modelM.computeCost(df_kmeansM)
    
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),cost[2:10])
ax.set_xlabel('k')
ax.set_ylabel('cost')

In [20]:
# Build 3 clusters (elbow method) for Revenue and add it to df
kpd.set_option('compute.ops_on_diff_frames', True)

# Predict clusters in spark dataframe, convert to koalas, and grab predictions
kmeansM = KMeans(k=3)
modelM = kmeansM.fit(df_kmeansM)
k_means_revenue = modelM.transform(df_kmeansM).select('Customer_Id', 'prediction')
rows = k_means_revenue.collect()
df_revenue_pred = spark.createDataFrame(rows)
df_revenue_pred = kpd.DataFrame(df_revenue_pred)
df_cust['RevenueCluster'] = df_revenue_pred['prediction']

kpd.reset_option('compute.ops_on_diff_frames')

# Order Revenue clusters
df_cust = order_cluster('RevenueCluster', 'Revenue', df_cust, True)
print(df_cust)

# Summary about Revenue Clusters
df_cust.describe()

Unnamed: 0,Customer_Id,Recency,RecencyCluster,Frequency,FrequencyCluster,Revenue,RevenueCluster
count,542.0,542.0,542.0,542.0,542.0,542.0,542.0
mean,508.258303,155.725092,1.555351,1.461255,0.444649,2931.773801,0.012915
std,287.91668,138.01839,0.657446,0.699467,0.643235,12481.113026,0.142006
min,1.0,0.0,0.0,1.0,0.0,80.0,0.0
25%,255.0,76.0,1.0,1.0,0.0,460.0,0.0
50%,505.0,96.0,2.0,1.0,0.0,849.0,0.0
75%,762.0,238.0,2.0,2.0,1.0,1733.0,0.0
max,999.0,567.0,2.0,5.0,2.0,175281.0,2.0


In [21]:
# Get Overall Score and look at mean
df_cust['OverallScore'] = df_cust['RecencyCluster'] + df_cust['FrequencyCluster'] + df_cust['RevenueCluster']
RFM = ['Recency','Frequency','Revenue']
df_cust.groupby('OverallScore')[RFM].mean()

Unnamed: 0_level_0,Recency,Frequency,Revenue
OverallScore,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,500.765957,1.0,4325.061702
6,75.0,3.0,175281.0
5,82.0,5.0,115592.2
1,251.454545,1.018182,1039.069364
3,68.579365,2.015873,3862.03254
2,101.068807,1.133028,1681.51055
4,60.923077,3.179487,3266.340256


In [22]:
# Name buckets
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

df_cust = df_cust.to_spark()

def func(OverallScore):
    if OverallScore < 2:
        return 'Low-Value'
    if OverallScore >= 2 and OverallScore < 4:
        return 'Mid-Value'
    if OverallScore >= 4:  
        return 'High-Value'

func_udf = udf(func, StringType())
df_cust = df_cust.withColumn('Segment',func_udf(df_cust['OverallScore']))

df_cust = kpd.DataFrame(df_cust)

print(df_cust['Segment'].value_counts())