In [1]:
from pyspark.sql import SparkSession
from pyspark.mllib.clustering import KMeans
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

In [2]:
spark = SparkSession.builder.appName('olistRFM').getOrCreate()

In [3]:
dfOrders = spark.read.csv('olist_orders_dataset.csv', header=True, inferSchema=True)

In [4]:
dfOrders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [5]:
dfOrdersPayment = spark.read.csv('olist_order_payments_dataset.csv', header=True, inferSchema=True)

In [6]:
dfOrdersPayment.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [7]:
dfOrders.count()

99441

In [8]:
dfOrdersPayment.count()

103886

In [9]:
dfOrders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [10]:
dfOrdersPayment.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [11]:
df = dfOrders.join(dfOrdersPayment,on='order_id',how='inner')

In [12]:
df.count()

103886

In [13]:
df.select('order_id').distinct().count()

99440

In [14]:
orderFrequency = df.groupBy('order_id').count().orderBy('count', ascending = False).select('count')

In [15]:
type(orderFrequency)

pyspark.sql.dataframe.DataFrame

In [16]:
data = orderFrequency.collect()

In [17]:
df = df.drop("order_delivered_carrier_date","order_approved_at","order_delivered_customer_date","order_estimated_delivery_date","payment_installments","payment_sequential")

In [18]:
custumerOrdersCount = df.select("customer_id").groupby("customer_id").count().orderBy("count",ascending=False)

In [19]:
custumerOrdersPayment = df.groupby("customer_id").max("payment_value")

In [20]:
joinResult = custumerOrdersCount.join(custumerOrdersPayment, on ="customer_id", how="inner")

In [21]:
joinResult=joinResult.withColumnRenamed('count','frequency')

In [22]:
import numpy as np
import datetime
from pyspark.sql.functions import year, month, dayofmonth

df = df.withColumn("order_purchase_timestamp",df["order_purchase_timestamp"].cast("double"))
OrderRecent = df.groupBy("customer_id").max("order_purchase_timestamp")
OrderRecent=OrderRecent.withColumn("max(order_purchase_timestamp)",OrderRecent["max(order_purchase_timestamp)"].cast("timestamp"))
OrderRecent = OrderRecent.withColumnRenamed("max(order_purchase_timestamp)","time_stamp")
OrderRecent.show()

+--------------------+-------------------+
|         customer_id|         time_stamp|
+--------------------+-------------------+
|f54a9f0e6b351c431...|2017-01-23 18:29:09|
|2a1dfb647f32f4390...|2018-06-01 12:23:13|
|4f28355e5c17a4a42...|2017-05-18 13:55:47|
|4632eb5a8f175f6fe...|2017-11-30 22:02:15|
|843ff05b30ce4f75b...|2017-11-13 10:07:36|
|a4156bb8aff5d6722...|2017-09-26 11:08:38|
|1099d033c74a027a7...|2018-01-27 22:09:48|
|7a3bd3b37285f0ab2...|2017-03-09 16:43:40|
|5fff39f1b59dc4d2f...|2017-05-01 19:13:37|
|18f6ca10777417c93...|2018-01-18 14:40:57|
|f1e46939e6408b3e6...|2017-06-01 14:58:11|
|a8695124db570d100...|2017-11-22 16:56:05|
|545b9a267af9ba134...|2018-02-14 10:48:33|
|90d7075599361b694...|2017-07-28 17:03:33|
|a340ce6c3570e68d4...|2018-04-18 14:29:17|
|ce0681e1c3f70e145...|2017-12-19 16:58:22|
|ee905ec97794ec6e9...|2017-06-18 17:55:19|
|b74ca180d63f9ae04...|2018-01-30 09:43:45|
|0de46efc7d10114ac...|2017-03-23 13:25:32|
|8bdae6b4ff9bc7f4c...|2017-11-24 23:30:13|
+----------

In [23]:
OrderRecent.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)



In [24]:
joinResult.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- frequency: long (nullable = false)
 |-- max(payment_value): double (nullable = true)



In [38]:
finalDf = joinResult.join(OrderRecent, on = 'customer_id')

In [39]:
finalDf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- frequency: long (nullable = false)
 |-- max(payment_value): double (nullable = true)
 |-- time_stamp: timestamp (nullable = true)



In [40]:
finalDf = finalDf.withColumnRenamed('max(payment_value)','monetary')
finalDf = finalDf.withColumnRenamed('time_stamp','recency')

In [41]:
finalDf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- frequency: long (nullable = false)
 |-- monetary: double (nullable = true)
 |-- recency: timestamp (nullable = true)



In [42]:
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import Vectors

In [43]:
finalDf.select('recency').show(1)

+-------------------+
|            recency|
+-------------------+
|2017-11-29 00:11:03|
+-------------------+
only showing top 1 row



In [44]:
finalDf

DataFrame[customer_id: string, frequency: bigint, monetary: double, recency: timestamp]

In [45]:
finalDf.head()

Row(customer_id=u'01d190d14b00073f76e0a5ec46166352', frequency=1, monetary=258.74, recency=datetime.datetime(2017, 11, 29, 0, 11, 3))

In [46]:
def returnEpoch(dt):
    return (dt - datetime.datetime(1970, 1, 1,0,0,0)).total_seconds()
def returnRows(x):
    return float(x[1]),float(x[2]),float(returnEpoch(x[3])),str(x[0])

In [47]:
finalDfrdd = finalDf.rdd.map(lambda x: returnRows(x))

In [48]:
finalDfrdd.take(3)

[(1.0, 258.74, 1511914263.0, '01d190d14b00073f76e0a5ec46166352'),
 (1.0, 198.1, 1529248545.0, '03a7750fc7a7bfbd7a84b2f4f26b92f1'),
 (1.0, 265.04, 1524412304.0, '04495037fc6899faffa41ba3bc4272b4')]

In [50]:
finalDf = finalDfrdd.toDF()

In [52]:
finalDf.show(1)

+---+------+-------------+--------------------+
| _1|    _2|           _3|                  _4|
+---+------+-------------+--------------------+
|1.0|258.74|1.511914263E9|01d190d14b00073f7...|
+---+------+-------------+--------------------+
only showing top 1 row



In [55]:
finalDf = finalDf.withColumnRenamed('_1','frequency')
finalDf = finalDf.withColumnRenamed('_2','monetary')
finalDf = finalDf.withColumnRenamed('_3','recency')
finalDf = finalDf.withColumnRenamed('_4','customer_id')

In [56]:
finalDf.head()

Row(frequency=1.0, monetary=258.74, recency=1511914263.0, customer_id=u'01d190d14b00073f76e0a5ec46166352')

In [57]:
finalDf.describe().show()

+-------+------------------+------------------+--------------------+--------------------+
|summary|         frequency|          monetary|             recency|         customer_id|
+-------+------------------+------------------+--------------------+--------------------+
|  count|             99440|             99440|               99440|               99440|
|   mean|1.0447103781174578|159.47379143201914|1.5147102027885962E9|                null|
| stddev|0.3811656306568635|220.36965405933074|1.3276456021717167E7|                null|
|    min|               1.0|               0.0|       1.473023719E9|00012a2ce6f8dcda2...|
|    max|              29.0|          13664.08|       1.539797418E9|ffffe8b65bbe3087b...|
+-------+------------------+------------------+--------------------+--------------------+



In [58]:
clusterData = finalDf.select('frequency','monetary','recency')

In [60]:
clusterData.head()

Row(frequency=1.0, monetary=258.74, recency=1511914263.0)

In [61]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [62]:
clusterData.columns

['frequency', 'monetary', 'recency']

In [63]:
vec_assembler = VectorAssembler(inputCols = clusterData.columns, outputCol='features')

In [65]:
final_data = vec_assembler.transform(clusterData)

In [67]:
final_data.head()

Row(frequency=1.0, monetary=258.74, recency=1511914263.0, features=DenseVector([1.0, 258.74, 1511914263.0]))

In [69]:
#Scaling

In [70]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [71]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

In [72]:
# Normalize each feature to have unit standard deviation.
final_data = scalerModel.transform(final_data)

In [73]:
#Train the Model and Evaluate

In [74]:
# Trains a k-means model.
kmeans = KMeans(featuresCol='scaledFeatures',k=3)
model = kmeans.fit(final_data)

In [75]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(final_data)
print("Within Set Sum of Squared Errors = " + str(wssse))

Within Set Sum of Squared Errors = 176660.1325


In [76]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[   2.70035637    0.72145197  114.81397409]
[  20.18575923    0.23344198  113.7706518 ]
[   2.72518651    0.72860253  113.11919632]


In [78]:
model.transform(final_data).select('prediction').distinct().show()

+----------+
|prediction|
+----------+
|         1|
|         2|
|         0|
+----------+

