In [0]:
# import modules 
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# File location and type
file_location = "/FileStore/tables/retail.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
retail_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(retail_df.take(20))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085,United Kingdom


<h2>Total Invoice Distribution</h2>

In [0]:
# function to calculate mean, minimum, maximum, median, mode
def show_distribution(df, col):
  df.select(mean(df[col])).show()
  df.select(min(df[col])).show()
  df.select(max(df[col])).show()
  print(df.approxQuantile(col, [0.5], 0))
  print(df.groupby(col).count().orderBy(desc('count')).first())
  
# creating a Dataframe containing invoice amounts, using the original df and multiplying the according columns
# negative results are not of interest for this (they represent cancellations) so they are filtered out
invoice_amount_df = retail_df.withColumn('amount', retail_df['quantity'] * retail_df['unit_price']).groupBy('invoice_no').agg(sum('amount').alias('amount')).filter("amount > 0.0")
show_distribution(invoice_amount_df, 'amount')

In [0]:
# show distribution of first 85 quantiles, to remove outliers
quantiles = invoice_amount_df.approxQuantile('amount', [0.0, 0.85], 0)
filtered_amount_df = invoice_amount_df.filter(col("amount") > quantiles[0]).filter(col("amount") < quantiles[1])
show_distribution(filtered_amount_df, 'amount')

<h2>Monthly Placed and Canceled Orders</h2>

In [0]:
# create new yyyymm column for grouping
yyyymm_df = retail_df.select('invoice_no', 'invoice_date', year("invoice_date").alias("year"), month("invoice_date").alias("month"))
yyyymm_df = yyyymm_df.withColumn('yyyymm', yyyymm_df['year'] * 100 + yyyymm_df['month'])
yyyymm_df = retail_df.join(yyyymm_df, ['invoice_no','invoice_date'], how='right_outer')
yyyymm_df = yyyymm_df.orderBy('yyyymm')

# get overall orders by month
orders_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('invoice_no').alias('orders'))

# identify canceled orders by month
canceled_df = yyyymm_df.filter(col('invoice_no').startswith('C')).groupBy('yyyymm').agg(countDistinct('invoice_no').alias('canceled'))

# join total and canceled Dataframes, calculate placed orders from canceled orders
orders_df = orders_df.join(canceled_df, 'yyyymm', how='inner')
orders_df = orders_df.withColumn('placed', orders_df['orders'] - 2 * orders_df['canceled'])

display(orders_df.orderBy('yyyymm'))

yyyymm,orders,canceled,placed
200912,2330,401,1528
201001,1633,300,1033
201002,1969,240,1489
201003,2367,407,1553
201004,1892,304,1284
201005,2418,407,1604
201006,2216,357,1502
201007,2017,344,1329
201008,1877,273,1331
201009,2375,371,1633


<h2>Monthly Sales</h2>

In [0]:
monthly_sales_df = yyyymm_df.withColumn('amount', yyyymm_df['quantity'] * yyyymm_df['unit_price']).groupBy("yyyymm").agg(sum("amount").alias('amount'))
display(monthly_sales_df)

yyyymm,amount
200912,77471037.90008375
201001,43357438.2729976
201002,22199180.01702302
201003,38678884.22595475
201004,26545544.75500796
201005,26890716.71998633
201006,33441026.790028084
201007,22964806.359982315
201008,27722660.820028465
201009,32798946.415957283


<h2>Monthly Sales Growth</h2>

In [0]:
#calculate percentage sales growth on a monthly basis
growth_window = Window.orderBy('yyyymm')
sales_growth_df = monthly_sales_df.withColumn('previous', lag(monthly_sales_df['amount']).over(growth_window))
sales_growth_df = sales_growth_df.withColumn('pct_change', when(isnull((sales_growth_df['amount'] - sales_growth_df['previous']) / sales_growth_df['previous']), 0.0).otherwise((sales_growth_df['amount'] - sales_growth_df['previous']) / sales_growth_df['previous']))
display(sales_growth_df)

yyyymm,amount,previous,pct_change
200912,77471037.90008375,,0.0
201001,43357438.2729976,77471037.90008375,-0.4403400361188304
201002,22199180.01702302,43357438.2729976,-0.4879960417115243
201003,38678884.22595475,22199180.01702302,0.7423564382240506
201004,26545544.75500796,38678884.22595475,-0.3136941438141315
201005,26890716.71998633,26545544.75500796,0.01300300928702
201006,33441026.790028084,26890716.71998633,0.2435900142882128
201007,22964806.359982315,33441026.790028084,-0.3132744845373502
201008,27722660.820028465,22964806.359982315,0.2071802559736373
201009,32798946.415957283,27722660.820028465,0.1831096094593277


Monthly Active Users

In [0]:
active_users_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('customer_id').alias('active_users'))
display(active_users_df)

yyyymm,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


<h2>New and Existing Users</h2>

In [0]:
#get earliest yyyymm value for each user, then count unique values for each month 
new_users_df = yyyymm_df.groupby('customer_id').agg(first('yyyymm').alias('yyyymm')).groupby('yyyymm').agg(countDistinct('customer_id').alias('new_users'))

# compare all monthly users against new_users to find returning users for each month
total_users_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('customer_id').alias('total_users'))
total_users_df = total_users_df.join(new_users_df, 'yyyymm', 'left')
total_users_df = total_users_df.withColumn('returning_users', total_users_df['total_users'] - total_users_df['new_users'])

display(total_users_df)

yyyymm,total_users,new_users,returning_users
201108,980,106,874
201011,1683,322,1361
201101,783,71,712
201004,998,291,707
201003,1111,436,675
201103,1020,178,842
201112,686,28,658
201012,948,77,871
201001,786,394,392
201005,1062,254,808


<h2>Finding RFM</h2>

In [0]:
# find most recent invoice date for each customer, then find days since the invoice was processed
today = pd.to_datetime('today')
retail_df = retail_df.orderBy('invoice_date')
recency_df = retail_df.groupBy('customer_id').agg(datediff(lit(today), max('invoice_date')).alias('recency'))

# count invoices associated with each customer
frequency_df = retail_df.groupBy('customer_id').agg(countDistinct('invoice_no').alias('frequency'))

# calculate how much each user spent
monetary_df = retail_df.withColumn('amount', retail_df['quantity'] * retail_df['unit_price']).groupBy('customer_id').agg(sum('amount').alias('amount'))

# join results
rfm_df = recency_df.join(frequency_df, 'customer_id', 'left')
rfm_df = rfm_df.join(monetary_df, 'customer_id', 'left')
display(rfm_df.orderBy('customer_id').take(20))

customer_id,recency,frequency,amount
,3274,,
12346.0,3599,17.0,-64.68
12347.0,3276,8.0,5633.32
12348.0,3349,5.0,2019.4
12349.0,3292,5.0,4404.54
12350.0,3584,1.0,334.40000000000003
12351.0,3649,1.0,300.93
12352.0,3310,13.0,1889.21
12353.0,3478,2.0,406.75999999999993
12354.0,3506,1.0,1079.4


<h2>RFM Segmentation</h2>

In [0]:
# calculating invoice amounts again, and filtering invalid values
rfm_segment_df = retail_df
rfm_segment_df = rfm_segment_df.withColumn('amount', retail_df['quantity'] * retail_df['unit_price'])
rfm_segment_df = rfm_segment_df.filter("amount > 0.0").filter("quantity > 0.0")
rfm_segment_df = rfm_segment_df.dropna()

# calculating rfm scores again, and joining results
recency_df = rfm_segment_df.groupBy('customer_id').agg(datediff(lit(today), max('invoice_date')).alias('recency'))
monetary_df = rfm_segment_df.withColumn('amount', rfm_segment_df['quantity'] * rfm_segment_df['unit_price']).groupBy('customer_id').agg(sum('amount').alias('monetary'))
rfm_segment_x = recency_df.join(monetary_df, 'customer_id', 'left')
rfm_segment_z = rfm_segment_df.groupBy('customer_id').agg(countDistinct('invoice_no').alias('frequency'))

rfm_table = rfm_segment_x.join(rfm_segment_z, 'customer_id', 'left')

# calculating quantile values
r_quantile = rfm_table.approxQuantile('recency', [0.2,0.4,0.6,0.8], 0)
f_quantile = rfm_table.approxQuantile('frequency', [0.2,0.4,0.6,0.8], 0)
m_quantile = rfm_table.approxQuantile('monetary', [0.2,0.4,0.6,0.8], 0)

# bucketing our results
rfm_table = rfm_table.withColumn('r_quantile', \
                                when(col('recency') >= r_quantile[3], 1).\
                                when(col('recency') >= r_quantile[2], 2).\
                                when(col('recency') >= r_quantile[1], 3).\
                                when(col('recency') >= r_quantile[0], 4).\
                                otherwise(5))

rfm_table = rfm_table.withColumn('f_quantile', \
                                when(col('frequency') >= f_quantile[3], 5).\
                                when(col('frequency') >= f_quantile[2], 4).\
                                when(col('frequency') >= f_quantile[1], 3).\
                                when(col('frequency') >= f_quantile[0], 2).\
                                otherwise(1))

rfm_table = rfm_table.withColumn('m_quantile', \
                                when(col('monetary') >= m_quantile[3], 5).\
                                when(col('monetary') >= m_quantile[2], 4).\
                                when(col('monetary') >= m_quantile[1], 3).\
                                when(col('monetary') >= m_quantile[0], 2).\
                                otherwise(1))

display(rfm_table.take(20))

customer_id,recency,monetary,frequency,r_quantile,f_quantile,m_quantile
12799,3803,219.35,1,1,2,1
12940,3328,913.5400000000002,2,4,3,3
13285,3297,3364.59,6,4,4,5
13289,3997,307.95,1,1,2,2
13623,3304,2566.49,10,4,5,4
13832,3293,613.6899999999999,2,4,3,2
13840,3690,651.4000000000003,1,1,2,3
14450,3454,1128.4399999999998,7,3,4,3
14570,3554,613.7499999999999,3,2,3,2
14832,3904,322.69,1,1,2,2


In [0]:
# creating a segmentation map to help us categorize customers by recency and frequency scores
seg_map = {
    r'[1-2][1-2]': 'Hibernating',
    r'[1-2][3-4]': 'At Risk',
    r'[1-2]5': 'Can\'t Lose',
    r'3[1-2]': 'About to Sleep',
    r'33': 'Need Attention',
    r'[3-4][4-5]': 'Loyal Customers',
    r'41': 'Promising',
    r'51': 'New Customers',
    r'[4-5][2-3]': 'Potential Loyalists',
    r'5[4-5]': 'Champions'
}

# casting score values to string and merging to one column, to make regexp_replace possible
rfm_table = rfm_table.withColumn('r_quantile', rfm_table['r_quantile'].cast("string"))
rfm_table = rfm_table.withColumn('m_quantile', rfm_table['m_quantile'].cast("string"))
rfm_table = rfm_table.withColumn('segment', concat(rfm_table['r_quantile'], rfm_table['m_quantile']))

# replacing scores with categories
for key in seg_map:
  rfm_table = rfm_table.withColumn('segment', regexp_replace('segment', key, seg_map[key]))

rfm_table = rfm_table.na.replace(seg_map)
display(rfm_table.take(20))

customer_id,recency,monetary,frequency,r_quantile,f_quantile,m_quantile,segment
12799,3803,219.35,1,1,2,1,Hibernating
12940,3328,913.5400000000002,2,4,3,3,Potential Loyalists
13285,3297,3364.59,6,4,4,5,Loyal Customers
13289,3997,307.95,1,1,2,2,Hibernating
13623,3304,2566.49,10,4,5,4,Loyal Customers
13832,3293,613.6899999999999,2,4,3,2,Potential Loyalists
13840,3690,651.4000000000003,1,1,2,3,At Risk
14450,3454,1128.4399999999998,7,3,4,3,Need Attention
14570,3554,613.7499999999999,3,2,3,2,Hibernating
14832,3904,322.69,1,1,2,2,Hibernating


In [0]:
# show final chart
rfm_chart = rfm_table.groupBy('segment').agg(mean('recency'), mean('frequency'), mean('monetary'), count('customer_id'))
display(rfm_chart)

segment,avg(recency),avg(frequency),avg(monetary),count(customer_id)
Champions,3281.6997487437184,19.73994974874372,11682.594981155771,796
Promising,3310.267175572519,1.4274809160305344,176.56679389312978,131
At Risk,3650.599469496021,3.5623342175066317,1240.617348806366,754
About to Sleep,3383.6081424936388,1.8651399491094147,322.689465648855,393
Hibernating,3730.875588433087,1.433086751849361,278.67173570948216,1487
Potential Loyalists,3298.5774058577404,3.1352859135285915,731.9812705718269,717
Loyal Customers,3339.367558239862,9.531492666091458,4497.51810698878,1159
Need Attention,3383.8195488721803,3.582706766917293,907.3443308270676,266
New Customers,3283.875,1.515625,182.91015625,64
Can't Lose,3636.0,10.576576576576576,8592.901909909913,111
