## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import * 
from pyspark.sql.window import Window

# File location and type
file_location = "/FileStore/tables/online_retail_II.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df.take(10))

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


In [0]:
retail_df = df.cache()
display(retail_df.take(5))

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom


### Total Invoice Distribution

In [0]:
# function to calculate mean, minimum, maximum, median, mode
def show_distribution(df, col):
  df.select(min(df[col])).show()
  df.select(max(df[col])).show()
  df.select(mean(df[col])).show()
  print(df.approxQuantile(col,[0.5],0))
  print(df.groupby(col).count().orderBy(desc('count')).first())
  
# creating a Dataframe containing invoice amounts, using the original df and multiplying the according columns
# negative results are not of interest for this (they represent cancellations) so they are filtered out

invoice_amount_df = retail_df.withColumn('Amount', retail_df['Quantity'] * retail_df['Price']).groupBy('Invoice').agg(sum('Amount').alias('Amount')).filter("Amount > 0.0")
show_distribution(invoice_amount_df, 'Amount')

In [0]:
# show distribution of first 85 quantiles, to remove outliers
quantiles = invoice_amount_df.approxQuantile('Amount', [0.0, 0.85], 0)
filtered_amount_df = invoice_amount_df.filter(col("Amount") > quantiles[0]).filter(col("Amount") < quantiles[1])
show_distribution(filtered_amount_df, 'Amount')

### Monthly Placed and canceled Orders

In [0]:
# new colum for "yyyymm"
orders_df = retail_df
yyyymm_df = orders_df.select('Invoice', 'InvoiceDate', year("InvoiceDate").alias("Year"), month("InvoiceDate").alias("Month"))
yyyymm_df = yyyymm_df.withColumn('yyyymm', yyyymm_df['Year']*100+yyyymm_df['Month'])
yyyymm_df = orders_df.join(yyyymm_df,['Invoice','InvoiceDate'], how ='right_outer')
yyyymm_df = yyyymm_df.orderBy('yyyymm')
orders_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('Invoice').alias('Orders'))

#get canceled order for each month
canceled_df = yyyymm_df.filter(col('Invoice').startswith('C')).groupBy('yyyymm').agg(countDistinct('Invoice').alias('Canceled'))

#calculated placed orders
orders_df = orders_df.join(canceled_df, 'yyyymm', how = 'inner')
orders_df = orders_df.withColumn('placed', orders_df['Orders']-2*orders_df['Canceled'])

display(orders_df.orderBy('yyyymm'))

yyyymm,Orders,Canceled,placed
200912,2330,401,1528
201001,1633,300,1033
201002,1969,240,1489
201003,2367,407,1553
201004,1892,304,1284
201005,2418,407,1604
201006,2216,357,1502
201007,2017,344,1329
201008,1877,273,1331
201009,2375,371,1633


###Monthly Sales

In [0]:
monthly_sales_df = yyyymm_df.withColumn('Amount', yyyymm_df['Quantity'] * yyyymm_df['Price']).groupBy("yyyymm").agg(sum("Amount").alias('Amount'))
monthly_sales_df = monthly_sales_df.withColumn('Amount',round(monthly_sales_df['Amount'], 2))
display(monthly_sales_df)

yyyymm,Amount
200912,77471037.9
201001,43357438.27
201002,22199180.02
201003,38678884.23
201004,26545544.76
201005,26890716.72
201006,33441026.79
201007,22964806.36
201008,27722660.82
201009,32798946.42


###Monthly Sales Growth

In [0]:
monthly_growth_window = Window.orderBy('yyyymm')
sale_growth_df = monthly_sales_df.withColumn('previous', lag(monthly_sales_df['Amount']).over(monthly_growth_window))
sale_growth_df = sale_growth_df.withColumn('growth',when(isnull((sale_growth_df['Amount']-sale_growth_df['previous'])/sale_growth_df['previous']),0.0).otherwise((sale_growth_df['Amount']-sale_growth_df['previous'])/sale_growth_df['previous']))

sale_growth_df = sale_growth_df.withColumn('previous',round(sale_growth_df['previous'],2))
sale_growth_df = sale_growth_df.withColumn('growth',round(sale_growth_df['growth'],2))
display(sale_growth_df)

yyyymm,Amount,previous,growth
200912,77471037.9,,0.0
201001,43357438.27,77471037.9,-0.44
201002,22199180.02,43357438.27,-0.49
201003,38678884.23,22199180.02,0.74
201004,26545544.76,38678884.23,-0.31
201005,26890716.72,26545544.76,0.01
201006,33441026.79,26890716.72,0.24
201007,22964806.36,33441026.79,-0.31
201008,27722660.82,22964806.36,0.21
201009,32798946.42,27722660.82,0.18


### Monthly Active Users

In [0]:
#get monthly active users
active_users_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('Customer ID').alias('Active_user'))
display(active_users_df)

yyyymm,Active_user
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


### New and Existing Users

In [0]:
#get the new user
new_user_df = yyyymm_df.groupBy('Customer ID').agg(first('yyyymm').alias('yyyymm')).groupby('yyyymm').agg(countDistinct('Customer ID').alias('New_user'))
# get the total user
total_user_df = yyyymm_df.groupBy('yyyymm').agg(countDistinct('Customer ID').alias('Total_user'))
total_user_df=total_user_df.join(new_user_df,'yyyymm',how ='left_outer')
total_user_df = total_user_df.withColumn('Return_user', total_user_df['Total_user']-total_user_df['New_user'])

display(total_user_df)

yyyymm,Total_user,New_user,Return_user
201108,980,106,874
201011,1683,322,1361
201101,783,71,712
201004,998,291,707
201003,1111,436,675
201103,1020,178,842
201112,686,28,658
201012,948,77,871
201001,786,394,392
201005,1062,254,808


### Finding RFM

In [0]:
# find most recent invoice date for each customer, then find days since the invoice was processed
Today = pd.to_datetime('today')
retail_df = retail_df.orderBy('InvoiceDate')
recency_df = retail_df.groupBy('Customer Id').agg(datediff(lit(Today), max('InvoiceDate')).alias('Recency'))

# count invoices associated with each customer
frequency_df = retail_df.groupBy('Customer Id').agg(countDistinct('Invoice').alias('Frequency'))

# calculate how much each user spent
monetary_df = retail_df.withColumn('Amount', retail_df['Quantity'] * retail_df['Price']).groupBy('Customer Id').agg(sum('Amount').alias('Amount'))
monetary_df = monetary_df.withColumn('Monetary',round(monetary_df['Amount'],2))

# join results
rfm_df = recency_df.join(frequency_df, 'Customer Id',how = 'left_outer')
rfm_df = rfm_df.join(monetary_df, 'Customer Id',how = 'left_outer')
display(rfm_df.orderBy('Customer Id').take(20))

Customer Id,Recency,Frequency,Amount,Monetary
,3328,,,
12346.0,3653,17.0,-64.68,-64.68
12347.0,3330,8.0,5633.32,5633.32
12348.0,3403,5.0,2019.4,2019.4
12349.0,3346,5.0,4404.54,4404.54
12350.0,3638,1.0,334.40000000000003,334.4
12351.0,3703,1.0,300.93,300.93
12352.0,3364,13.0,1889.21,1889.21
12353.0,3532,2.0,406.75999999999993,406.76
12354.0,3560,1.0,1079.4,1079.4


### RFM Segmentation

In [0]:
#
r_quartile = rfm_df.approxQuantile("Recency", [0.2, 0.4, 0.6, 0.8], 0)
f_quartile = rfm_df.approxQuantile("Frequency", [0.2, 0.4, 0.6, 0.8], 0)
m_quartile = rfm_df.approxQuantile("Monetary", [0.2, 0.4, 0.6, 0.8], 0)

rfm_df = rfm_df.withColumn("R_Quartile", \
                                 when(col("Recency") >= r_quartile[3] , 1).\
                                 when(col("Recency") >= r_quartile[2] , 2).\
                                 when(col("Recency") >= r_quartile[1] , 3).\
                                 when(col("Recency") >= r_quartile[0] , 4).\
                                 otherwise(4))

rfm_df = rfm_df.withColumn("F_Quartile", \
                                 when(col("Frequency") > f_quartile[3] , 5).\
                                 when(col("Frequency") > f_quartile[2] , 4).\
                                 when(col("Frequency") > f_quartile[1] , 3).\
                                 when(col("Frequency") > f_quartile[0] , 2).\
                                 otherwise(1))

rfm_df = rfm_df.withColumn("M_Quartile", \
                                 when(col("Monetary") >= m_quartile[3] , 5).\
                                 when(col("Monetary") >= m_quartile[2] , 4).\
                                 when(col("Monetary") >= m_quartile[1] , 3).\
                                 when(col("Monetary") >= m_quartile[0] , 2).\
                                 otherwise(1))

rfm_df = rfm_df.withColumn("RFM_Score", concat(col("R_Quartile"), col("F_Quartile"), col("M_Quartile")))
display(rfm_df.take(20))

Customer Id,Recency,Frequency,Amount,Monetary,R_Quartile,F_Quartile,M_Quartile,RFM_Score
12467.0,3714,2,-2.842170943040401e-14,0.0,2,2,1,221
12493.0,3493,3,416.79,416.79,3,2,2,322
12671.0,3934,1,2622.481000000001,2622.48,1,1,4,114
12737.0,3826,2,3710.5,3710.5,1,2,5,125
13094.0,3349,17,2214.6600000000003,2214.66,4,5,4,454
13533.0,3510,3,270.79,270.79,3,2,2,322
13607.0,3368,3,1060.6099999999997,1060.61,4,2,3,423
13918.0,3377,2,1212.84,1212.84,4,2,4,424
13956.0,3333,5,1026.42,1026.42,4,3,3,433
13973.0,3615,1,264.7,264.7,2,1,2,212


In [0]:
# creating a segmentation map to help us categorize customers by recency and frequency scores
seg_map = {
    r'[1-2][1-2]': 'Hibernating',
    r'[1-2][3-4]': 'At Risk',
    r'[1-2]5': 'Can\'t Lose',
    r'3[1-2]': 'About to Sleep',
    r'33': 'Need Attention',
    r'[3-4][4-5]': 'Loyal Customers',
    r'41': 'Promising',
    r'51': 'New Customers',
    r'[4-5][2-3]': 'Potential Loyalists',
    r'5[4-5]': 'Champions'
}

# casting score values to string
rfm_table = rfm_df.withColumn('R_Quartile', rfm_df['R_Quartile'].cast("string"))
rfm_table = rfm_df.withColumn('M_Quartile', rfm_df['M_Quartile'].cast("string"))
rfm_table = rfm_df.withColumn('segment', concat(rfm_df['R_Quartile'], rfm_df['M_Quartile']))

# replacing scores with categories
for key in seg_map:
  rfm_table = rfm_table.withColumn('segment', regexp_replace('segment', key, seg_map[key]))

rfm_table = rfm_table.na.replace(seg_map)
rfm_result_df = rfm_table.select("Customer Id","RFM_Score","segment")
display(rfm_result_df.take(20))

Customer Id,RFM_Score,segment
12467.0,221,Hibernating
12493.0,322,About to Sleep
12671.0,114,At Risk
12737.0,125,Can't Lose
13094.0,454,Loyal Customers
13533.0,322,About to Sleep
13607.0,423,Potential Loyalists
13918.0,424,Loyal Customers
13956.0,433,Potential Loyalists
13973.0,212,Hibernating


In [0]:
# show final chart
rfm_chart = rfm_table.groupBy('segment').agg(mean('Recency'), mean('Frequency'), mean('Monetary'), count('Customer Id'))
display(rfm_chart)

segment,avg(Recency),avg(Frequency),avg(Monetary),count(Customer Id)
Promising,3354.7783783783784,1.701086956521739,158.56885869565218,184
At Risk,3703.814717477004,4.320630749014454,1180.8356898817342,761
About to Sleep,3437.961139896373,2.033678756476684,301.61707253886016,386
Hibernating,3792.261356155365,1.6504279131007242,187.84763001974983,1519
Potential Loyalists,3352.248633879781,3.5355191256830603,692.426475409836,732
Loyal Customers,3368.8055415617127,16.581360201511334,6983.152775818643,1985
Need Attention,3440.938181818182,4.218181818181818,865.815127272727,275
Can't Lose,3673.86,13.21,7122.214000000002,100
