In [0]:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,DoubleType, DateType
# File location and type
file_location = "/FileStore/tables/online_retail_II-1.csv"
file_type = "csv"

# CSV options
delimiter = ","

schema = StructType(
  [
    StructField("Invoice", StringType(), False),
    StructField("StockCode", StringType(), False),
    StructField("Description", StringType(), False),
    StructField("Quantity", IntegerType(), False),
    StructField("InvoiceDate", DateType(), False),
    StructField("Price", DoubleType(), False),
    StructField("CustomerId", StringType(), False),
    StructField("Country", StringType(), False),
  ]
)

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", delimiter) \
  .schema(schema)\
  .load(file_location)

display(df)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerId,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01,3.75,13085.0,United Kingdom


# Total Invoice Amount Distribution

In [0]:
# Dispaly the distribution of invoice

# for each invoice, amount =  SUM(Quantity * Price) 
df_amount = df.withColumn("Amount", F.col("Quantity")*F.col("Price"))
df_total = df_amount.groupby("Invoice").sum("Amount").where(F.col("sum(Amount)") > 0).select(F.col("sum(Amount)").alias("Total"))

df_mode_val = df_total.groupby("Total").count().orderBy("count", ascending=False).first()[0]
df_median_val = df_total.approxQuantile("Total", [0.5], 0)[0]

df_distribution_val = df_total\
.select(F.max("Total").alias("max"), F.min("Total").alias("min"), F.mean("Total").alias("mean"))\
.withColumn("median", F.lit(df_median_val))\
.withColumn("mode", F.lit(df_mode_val))

display(df_distribution_val)

max,min,mean,median,mode
168469.6,0.19,523.3037611158244,304.3100000000002,15.0


In [0]:
# 0.85 Total Invoice Amount Distribution

threshold = df_total.approxQuantile("Total", [0.85], 0)[0]
df_total_threshold = df_total.where(F.col("Total") < threshold)

df_mode_threshold_val = df_total_threshold.groupby("Total").count().orderBy("count", ascending=False).first()[0]
df_median_threshold_val = df_total.approxQuantile("Total", [0.5], 0)[0]

df_distribution_threshold_val = df_total_threshold\
.select(F.max("Total").alias("max"), F.min("Total").alias("min"), F.mean("Total").alias("mean"))\
.withColumn("median", F.lit(df_median_threshold_val))\
.withColumn("mode", F.lit(df_mode_threshold_val))

display(df_distribution_threshold_val)

max,min,mean,median,mode
724.2499999999999,0.19,271.6838912992427,304.3100000000002,15.0


# Monthly Placed and Canceled Orders

In [0]:
# Monthly Placed and Canceled Orders

df_amount = df_amount.withColumn("Date", F.date_format(F.col("InvoiceDate"),"YYYY-MM"))
df_group = df_amount.groupby(["Invoice", "Date"]).count().select(["Invoice", "Date"])

df_placement = df_group.groupby("Date").count().select(["Date", "count"]).withColumnRenamed("count", "TotalOrders").sort("Date")
df_canceled = df_group.where(F.col("Invoice").contains("C")).groupby("Date").count().select(["Date", "count"]).withColumnRenamed("count", "CanceledOrders").sort("Date")

df_orders = df_placement.join(df_canceled, [df_placement.Date==df_canceled.Date], 'inner').drop(df_canceled.Date)
df_orders = df_orders.withColumn("PlacedOrders", F.col("TotalOrders") - 2*F.col("CanceledOrders")).select(["Date","PlacedOrders", "CanceledOrders"]).sort("Date")


display(df_orders)

Date,PlacedOrders,CanceledOrders
2009-12,1528,401
2010-01,1033,300
2010-02,1489,240
2010-03,1553,407
2010-04,1284,304
2010-05,1604,407
2010-06,1502,357
2010-07,1329,344
2010-08,1331,273
2010-09,1633,371


# Monthly Sales in Millions

In [0]:
# Monthly Sales in Millions
df_sale = df_amount.where(F.col("Amount") > 0).groupby("Date").sum("Amount").sort("Date")
df_sale_millions = df_sale.withColumn("Sale in Millions", F.col("sum(Amount)")/1000000).drop("sum(Amount)")
display(df_sale_millions)


Date,Sale in Millions
2009-12,0.8256857600000115
2010-01,0.6527085019999943
2010-02,0.5537133060000008
2010-03,0.833570130999969
2010-04,0.6815289919999798
2010-05,0.6598588599999907
2010-06,0.7522701399999794
2010-07,0.6507129400000102
2010-08,0.6972749099999849
2010-09,0.9243330109999602


In [0]:
# Monthly Sales in Millions
df_sale_percentage = df_sale_millions.withColumn("Sale in Percentage", F.concat(F.round(F.col("Sale in Millions")*100,2), F.lit('%'))).drop("Sale in Millions")
display(df_sale_percentage)

Date,Sale in Percentage
2009-12,82.57%
2010-01,65.27%
2010-02,55.37%
2010-03,83.36%
2010-04,68.15%
2010-05,65.99%
2010-06,75.23%
2010-07,65.07%
2010-08,69.73%
2010-09,92.43%


# Monthly Sales Growth

In [0]:
# Monthly Sales Growth
from pyspark.sql.window import Window
win = Window.orderBy('Date')
df_sale_prev = df_sale.withColumn("prev",  F.lag("sum(Amount)").over(win))
df_sale_growth = df_sale_prev.withColumn("growth", F.round((F.col("sum(Amount)")-F.col("prev"))/F.col("prev"), 2)).drop("sum(Amount)").drop("prev")

display(df_sale_growth)

Date,growth
2009-12,
2010-01,-0.21
2010-02,-0.15
2010-03,0.51
2010-04,-0.18
2010-05,-0.03
2010-06,0.14
2010-07,-0.14
2010-08,0.07
2010-09,0.33


# Monthly Active Users

In [0]:
# Monthly Active Users

df_active_user = df_amount.groupBy(["Date", "CustomerId"]).count().sort("Date").drop("count")
df_active_user = df_active_user.groupBy("Date").count().withColumnRenamed("count", "Active")

display(df_active_user)

Date,Active
2009-12,1046
2010-01,787
2010-02,808
2010-03,1112
2010-04,999
2010-05,1063
2010-06,1096
2010-07,989
2010-08,965
2010-09,1203


# New and Existing Users

In [0]:


df_earlist = df_amount.groupby(["CustomerId"]).agg(F.min("Date").alias("FirstDate"))
df_earlist = df_earlist.groupby(["FirstDate"]).count().sort("FirstDate").withColumnRenamed("count", "New")

df_user = df_active_user.join(df_earlist, [df_earlist.FirstDate==df_active_user.Date], "inner").drop("FirstDate").sort("Date")
df_user = df_user.withColumn("Old", F.col("Active")-F.col("New") ).drop("Active")
display(df_user)


Date,New,Old
2009-12,1046,0
2010-01,394,393
2010-02,363,445
2010-03,436,676
2010-04,291,708
2010-05,254,809
2010-06,269,827
2010-07,183,806
2010-08,158,807
2010-09,242,961


# Finding RFM

In [0]:
from datetime import datetime
today = datetime(2012,1,1)

df_RFM_Monetary = df_amount.groupby("CustomerId").sum("Amount").withColumnRenamed("sum(Amount)", "Monetary").sort("CustomerId").where(F.col("CustomerId") > 0)
df_RFM_Recency = df_amount.groupby('CustomerId').agg(F.datediff(F.lit(today), F.max("Date")).alias("Recency"))
df_RFM_Frequency = df_amount.groupby(['CustomerId', 'Invoice']).count().groupby('CustomerId').count().withColumnRenamed("count", "Frequency")

df_RFM = df_RFM_Monetary.join(df_RFM_Recency, [df_RFM_Recency.CustomerId == df_RFM_Recency.CustomerId], 'inner').drop(df_RFM_Recency.CustomerId)
df_RFM = df_RFM.join(df_RFM_Frequency, [df_RFM_Frequency.CustomerId == df_RFM.CustomerId])\
.drop(df_RFM_Frequency.CustomerId)\
.select(["CustomerId", "Monetary", "Recency", "Frequency"]).sort("CustomerId")

display(df_RFM)


CustomerId,Monetary,Recency,Frequency
12346.0,-64.67999999999981,365,17
12347.0,5633.320000000001,31,8
12348.0,2019.4,122,5
12349.0,4404.54,61,5
12350.0,334.40000000000003,334,1
12351.0,300.93,426,1
12352.0,1889.21,61,13
12353.0,406.75999999999993,245,2
12354.0,1079.4,275,1
12355.0,947.61,245,2


# RFM Segmentation

In [0]:
from pyspark.sql.functions import when


recency_q = df_RFM.approxQuantile('Recency', [0.2,0.4,0.6,0.8], 0)
frequency_q = df_RFM.approxQuantile('Frequency', [0.2,0.4,0.6,0.8], 0)
monetary_q = df_RFM.approxQuantile('Monetary', [0.2,0.4,0.6,0.8], 0)

df_RFM = df_RFM.\
          withColumn("RecencyScore",  
            when(df_RFM.Recency > recency_q[3] , '5').
            when(df_RFM.Recency > recency_q[2] , '4').
            when(df_RFM.Recency > recency_q[1] , '3').
            when(df_RFM.Recency > recency_q[0] , '2').
            otherwise(1)).\
          withColumn("FrequencyScore",  
            when(df_RFM.Frequency > frequency_q[3] , '5').
            when(df_RFM.Frequency > frequency_q[2] , '4').
            when(df_RFM.Frequency > frequency_q[1] , '3').
            when(df_RFM.Frequency > frequency_q[0] , '2').
            otherwise(1)).\
          withColumn("MonetaryScore",  
            when(df_RFM.Monetary > monetary_q[3] , '5').
            when(df_RFM.Monetary > monetary_q[2] , '4').
            when(df_RFM.Monetary > monetary_q[1] , '3').
            when(df_RFM.Monetary > monetary_q[0] , '2').
            otherwise(1))

display(df_RFM)



CustomerId,Monetary,Recency,Frequency,RecencyScore,FrequencyScore,MonetaryScore
12346.0,-64.67999999999981,365,17,4,5,1
12347.0,5633.320000000001,31,8,1,4,5
12348.0,2019.4,122,5,3,3,4
12349.0,4404.54,61,5,1,3,5
12350.0,334.40000000000003,334,1,4,1,2
12351.0,300.93,426,1,4,1,2
12352.0,1889.21,61,13,1,5,4
12353.0,406.75999999999993,245,2,4,2,2
12354.0,1079.4,275,1,4,1,3
12355.0,947.61,245,2,4,2,3


In [0]:
import re
def helper(x):
  seg_map = {
    r'[1-2][1-2]': 'Hibernating',
    r'[1-2][3-4]': 'At Risk',
    r'[1-2]5': 'Can\'t Lose',
    r'3[1-2]': 'About to Sleep',
    r'33': 'Need Attention',
    r'[3-4][4-5]': 'Loyal Customers',
    r'41': 'Promising',
    r'51': 'New Customers',
    r'[4-5][2-3]': 'Potential Loyalists',
    r'5[4-5]': 'Champions'
  }
  for key in seg_map.keys():
    x = re.sub(key, seg_map[key], x)
  return x

helper_udf = F.udf(helper, StringType())
df_RFM = df_RFM.withColumn('SegmentScore', F.concat(F.col("RecencyScore"), F.col("FrequencyScore")))
df_RFM = df_RFM.withColumn('Segment', helper_udf("SegmentScore"))
display(df_RFM)

CustomerId,Monetary,Recency,Frequency,RecencyScore,FrequencyScore,MonetaryScore,SegmentScore,Segment
12346.0,-64.67999999999981,365,17,4,5,1,45,Loyal Customers
12347.0,5633.320000000001,31,8,1,4,5,14,At Risk
12348.0,2019.4,122,5,3,3,4,33,Need Attention
12349.0,4404.54,61,5,1,3,5,13,At Risk
12350.0,334.40000000000003,334,1,4,1,2,41,Promising
12351.0,300.93,426,1,4,1,2,41,Promising
12352.0,1889.21,61,13,1,5,4,15,Can't Lose
12353.0,406.75999999999993,245,2,4,2,2,42,Potential Loyalists
12354.0,1079.4,275,1,4,1,3,41,Promising
12355.0,947.61,245,2,4,2,3,42,Potential Loyalists
