In [1]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Format output tables better

In [3]:
spark

In [4]:
df = spark.read.csv("data/ecommerce.csv", header=True)

In [5]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

## Exploratory data analysis

In [6]:
df.count()

541909

How many unique customers are present in dataset?

In [7]:
df.select('CustomerID').distinct().count()

4373

Wich country do most purchases come from?

df.groupBy('Country').agg(F.countDistinct('CustomerID').alias('country_count')).orderBy(F.desc('country_count')).show()

When was the most recent purchase made by a customer in the e-commerce platform?

In [8]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

key,value
spark.sql.legacy....,LEGACY


In [9]:
df = df.withColumn('date', F.to_timestamp("InvoiceDate", "MM/dd/yy HH:mm"))

In [10]:
df.select(F.max("date")).show()

+-------------------+
|          max(date)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



When was the earliest purchase made by a customer in the e-commerce platform?

In [11]:
df.select(F.min("date")).show()

+-------------------+
|          min(date)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



## Data Pre-processing

In [12]:
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|               date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
+---------+-----

We will create new features to better capture customer behaviour. 

**RFM** is commonly used in marketing to evalute a client's value based on their:

1. RECENCY: How recently has each customer made a purchase?
2. FRECUENCY: How often they bought something?
3. MONEARY VALUE: How much money do they spend on average when making purchases?

### Recency

We will substract every date in the dataframe from the earliest date. This will tell us how recently a customer was seen in the data.

In [13]:
df = df.withColumn("from_date", F.lit("2010-12-01 08:26:00"))

In [14]:
## Parse date column
df = df.withColumn("from_date", F.to_timestamp("from_date", "yy-MM-dd HH:mm"))

In [15]:
df2 = df.withColumn("from_date", F.to_timestamp(F.col("from_date"))) \
     .withColumn("recency", F.col("date").cast("long") - F.col("date").cast("long"))

In [17]:
df2.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|recency|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      0|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      0|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      0|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United 

In [19]:
# Select the most recent purchase
df2 = df2.join(df2.groupby("CustomerID") \
                   .agg(F.max("recency").alias("recency")), on="recency", how='leftsemi')

In [20]:
df2.show(5)

+-------+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+
|recency|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|               date|          from_date|
+-------+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+
|      0|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|      0|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|      0|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|      0|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850

### Frequency

How often a customer bought something on the platform.

In [22]:
df_freq = df2.groupBy('CustomerID').agg(F.count("InvoiceDate").alias("frequency"))

In [23]:
df_freq.show()

+----------+---------+
|CustomerID|frequency|
+----------+---------+
|     16250|       24|
|     15574|      168|
|     15555|      925|
|     15271|      275|
|     17714|       10|
|     17686|      286|
|     13865|       30|
|     14157|       49|
|     13610|      228|
|     17757|      742|
|     17551|       43|
|     13187|       37|
|     16549|      981|
|     12637|      394|
|     15052|       30|
|     15448|       28|
|     13985|      353|
|     12888|        7|
|     14525|      298|
|     18283|      756|
+----------+---------+
only showing top 20 rows



In [24]:
# Join back to the original dataframe
df3 = df2.join(df_freq, on="CustomerID", how="inner")

In [25]:
df3.show()

+----------+-------+---------+---------+--------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+
|CustomerID|recency|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|       Country|               date|          from_date|frequency|
+----------+-------+---------+---------+--------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+
|     17850|      0|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      312|
|     17850|      0|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      312|
|     17850|      0|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|      312|
|     17850|      0|   536365|   8

## Monetary value

We calculate the total amount spent by each customer.

In [26]:
m_val = df3.withColumn("TotalAmount", F.col("Quantity") * F.col("UnitPrice"))

In [28]:
m_val = m_val.groupBy("CustomerID").agg(F.sum("TotalAmount").alias("monetary_value"))

In [30]:
final_df = m_val.join(df3, on="CustomerID", how="inner")

In [31]:
# Select only the required columns
final_df = final_df.select(["recency", "frequency", "monetary_value", "CustomerID"]).distinct()

In [32]:
final_df.show()

+-------+---------+------------------+----------+
|recency|frequency|    monetary_value|CustomerID|
+-------+---------+------------------+----------+
|      0|       24|389.44000000000005|     16250|
|      0|      168| 702.2500000000002|     15574|
|      0|      925|4758.2000000000035|     15555|
|      0|      275|           2485.82|     15271|
|      0|       10|             153.0|     17714|
|      0|      286|           5739.46|     17686|
|      0|       30|501.56000000000006|     13865|
|      0|       49| 400.4300000000001|     14157|
|      0|      228|1115.4299999999996|     13610|
|      0|      742| 5585.490000000003|     17757|
|      0|       43|            306.84|     17551|
|      0|       37|236.01999999999995|     13187|
|      0|      981|           4154.64|     16549|
|      0|      394| 5953.250000000001|     12637|
|      0|       30|            215.78|     15052|
|      0|       28|494.64000000000004|     15448|
|      0|      353| 7024.529999999999|     13985|


## Train model

Before training the model, we standarize the data to ensure all variables have the same scale.

In [34]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [37]:
assemble = VectorAssembler(
    inputCols=[
        "recency", "frequency", 
        "monetary_value"
    ], outputCol="features"
)

In [38]:
assembled_data = assemble.transform(final_df)

In [39]:
scale = StandardScaler(inputCol="features", outputCol="standardized")
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)

In [40]:
# check how the standardied vector looks like
data_scale_output.select("standardized").show(2, truncate=False)

+---------------------------------------------+
|standardized                                 |
+---------------------------------------------+
|[0.0,0.10323841364177115,0.04738090362627903]|
|[0.0,0.7226688954923981,0.08543868008307944] |
+---------------------------------------------+
only showing top 2 rows

