In [203]:
import findspark
findspark.init()

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import *

from pyspark.sql.functions import col

In [2]:
%cd "C:\Users\khaih\Downloads\Project 3\Data"

C:\Users\khaih\Downloads\Project 3\Data


In [3]:
spark = SparkSession.builder.appName('KMeans_RFM').getOrCreate()

In [4]:
data = spark.read.csv("RFM_data.csv", header=True,
                      inferSchema=True)

In [5]:
data.show(3)

+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+
|customer_id|Recency|Frequency|Monetary|  R|  F|  M|RFM_Segment|RFM_Score|RFM_Level|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+
|       7592|      1|      201|13990.93|  4|  4|  4|        444|       12|    STARS|
|      14048|      0|      217| 8976.33|  4|  4|  4|        444|       12|    STARS|
|       7983|      5|      149| 6973.07|  4|  4|  4|        444|       12|    STARS|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+
only showing top 3 rows



In [6]:
data=data.filter(col("Monetary")>0)

In [7]:
data.count()

23502

In [8]:
data.dropna().count()

23502

In [9]:
vec_assembler = VectorAssembler(inputCols = ["Recency","Frequency","Monetary"], outputCol='features')

In [10]:
final_data = vec_assembler.transform(data)

In [11]:
final_data.show(3)

+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+
|customer_id|Recency|Frequency|Monetary|  R|  F|  M|RFM_Segment|RFM_Score|RFM_Level|            features|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+
|       7592|      1|      201|13990.93|  4|  4|  4|        444|       12|    STARS|[1.0,201.0,13990.93]|
|      14048|      0|      217| 8976.33|  4|  4|  4|        444|       12|    STARS| [0.0,217.0,8976.33]|
|       7983|      5|      149| 6973.07|  4|  4|  4|        444|       12|    STARS| [5.0,149.0,6973.07]|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+
only showing top 3 rows



## Chuẩn hoá dữ liệu trước khi chạy Kmeans

In [12]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')

In [13]:
ScalerModel=scaler.fit(final_data)

In [14]:
final_data =ScalerModel.transform(final_data)

In [15]:
final_data.show(3, truncate=True)

+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+--------------------+
|customer_id|Recency|Frequency|Monetary|  R|  F|  M|RFM_Segment|RFM_Score|RFM_Level|            features|      scaledFeatures|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+--------------------+
|       7592|      1|      201|13990.93|  4|  4|  4|        444|       12|    STARS|[1.0,201.0,13990.93]|[0.00551544588297...|
|      14048|      0|      217| 8976.33|  4|  4|  4|        444|       12|    STARS| [0.0,217.0,8976.33]|[0.0,45.761555134...|
|       7983|      5|      149| 6973.07|  4|  4|  4|        444|       12|    STARS| [5.0,149.0,6973.07]|[0.02757722941486...|
+-----------+-------+---------+--------+---+---+---+-----------+---------+---------+--------------------+--------------------+
only showing top 3 rows



## Chạy model Kmeans

In [None]:
k_list = []
silhouette_list = []
sil_str= ""
for k in range(2,11):
    kmeans = KMeans(featuresCol='scaledFeatures', k=k)
    model = kmeans.fit(final_data)
    k_list.append(k)

  #silhouette
    predictions = model.transform(final_data)

  #evaluate clustering by computing Silhouette score
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    silhouette_list.append(silhouette)

    sil_str = sil_str + "With k = " +str(k) +" - Silhouette = " +str(silhouette) + "\n"

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
plt.plot(k_list, silhouette_list)
plt.show()

In [None]:
print(sil_str)

### Chọn k=5 để chạy model

In [None]:
kmeans = KMeans(featuresCol='scaledFeatures', k=5)
model = kmeans.fit(final_data)

In [None]:
predictions = model.transform(final_data)

  #evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette = ", str(silhouette))

In [None]:
#Show result
centers = model.clusterCenters()
print("Cluster Center: ")
for center in centers:
  print(center)

In [None]:
predictions = model.transform(final_data)

In [None]:
predictions.groupby('prediction').count().show()

In [None]:
from pyspark.ml.functions import *

In [None]:
temp = final_data.select("features").rdd.map(lambda x: x[0].toArray().tolist()).toDF()

In [None]:
temp.show(3)

In [None]:
temp= temp.select(col('_1').alias('x_scale'), col('_2').alias('y_scale'), col('_3').alias('z_scale'))

In [None]:
temp.show(3)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id as mi
#combined temp to final
final_data=final_data.withColumn('row_index', mi())
temp=temp.withColumn('row_index', mi())
final_data = final_data.join(temp, on=['row_index']).sort('row_index')

data_result = predictions.select('prediction')
data_result = data_result.withColumn('row_index', mi())
final_data = final_data.join(data_result, on=['row_index']).sort('row_index').drop('row_index')

In [None]:
final_data.show(3)

In [None]:
df = final_data[['x_scale', 'y_scale', 'z_scale', 'prediction']].toPandas()

In [None]:
center_df = pd.DataFrame(centers)
center_df = center_df.rename(columns={0: 'x_center', 1: 'y_center', 2: 'z_center'})
center_df.head()

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

In [None]:
fig  = plt.figure(figsize=(12,10))
ax = fig.add_subplot(projection='3d')

ax.scatter(df.x_scale, df.y_scale,df.z_scale,c=df.prediction)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
plt.show()

In [None]:
from pyspark.sql import functions as F
analysis = predictions\
    .groupBy('prediction')\
    .agg(F.round(avg('Recency'),2).alias('Avg Recency'),
         F.round(avg('Frequency'),2).alias('Avg Frequency'),
         F.round(avg('Monetary'),2).alias('Avg Monetary'))
         F.round(count('Monetary'),2).alias('Avg Monetary'))
analysis.show()

In [259]:
def cluster_function(prediction):
    if prediction ==0:
        return "Loyal"
    elif prediction ==1:
        return "Lost Cheap"
    elif prediction ==2:
        return "Big Spender"
    elif prediction ==3:
        return "Regular"
    return "Star"

cluster_function_udf = udf(cluster_function, StringType())
analysis=analysis.withColumn("type", cluster_function_udf(analysis['prediction']))
analysis.show()

+----------+-----------+-------------+------------+-----------+
|prediction|Avg Recency|Avg Frequency|Avg Monetary|       type|
+----------+-----------+-------------+------------+-----------+
|         1|     484.63|         1.37|       42.82| Lost Cheap|
|         3|     136.65|         4.36|      141.74|    Regular|
|         4|        2.0|        189.0|     9980.11|       Star|
|         2|      50.81|        42.08|     2449.32|Big Spender|
|         0|      67.27|        13.83|      602.57|      Loyal|
+----------+-----------+-------------+------------+-----------+



### Nhận xét:

- Cluster 0(Loyal - KH có mức chi tiêu trung bình, tần suất mua hàng vừa phải): AVG Recency: 67.22, AVG Frequency: 13.83, AVG Monetary: 602.2

- Cluster 1(Lost Cheap - KH chi tiêu ít, tần suất mua hàng thấp, thời gian mua hàng xa - Không tiêu xài nhiều cho nhóm KH này): AVG Recency: 484.63, AVG Frequency: 1.37, AVG Monetary: 42.82
- Cluster 2(Big Spender - KH chi nhiều tiền và có tần suất mua hàng cao): AVG Recency: 50.81, AVG Frequency: 42.08, AVG Monetary: 2449.32
- Cluster 3(Regular - KH chi tiêu bình thường và có tần suất mua hàng tạm ổn): AVG Recency: 136.66, AVG Frequency: 4.36, AVG Monetary: 141.74
- Cluster 4(Star - KH mua gần đây, mua nhiều): AVG Recency: 2.0, AVG Frequency: 289.0, AVG Monetary: 9980.11