## Load CSV into Dataframe

In [0]:
from pyspark.sql.types import *
# File location and type
file_location = "/FileStore/tables/retail_data_demo.csv"
file_type = "csv"


# CSV options
infer_schema = "true"
first_row_is_header = "false"
delimiter = ","
schema = StructType(
  [
    StructField("invoice_no", StringType(), False),
    StructField("stockcode", StringType(), False),
    StructField("description", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("invoice_date", DateType(), False),
    StructField("unit_price", DoubleType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("country", StringType(), False),
  ]
)

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(schema) \
  .load(file_location)
df.count()

## Total Invoice Amount Distribution

In [0]:

from pyspark.sql.functions import col,min,max,mean,lit
from pyspark.sql.types import StructType




##calcaulate the Invoice
df_calculate = df.withColumn("Invoice", col("quantity")* col("unit_price") )
##group by the invoice_no and sum together
df_invoice = df_calculate.filter(df_calculate["Invoice"] > 0).groupBy("invoice_no").sum()
## get the row we want
df_sum = df_invoice.select("sum(Invoice)")
##rename the column to Invoice
df_sum_invoice = df_sum.withColumnRenamed("sum(Invoice)", "Invoice")
##calcaulate the mode
df_mode = df_sum_invoice.groupBy("Invoice").count().orderBy("count", ascending=False)
df_mode = df_mode.select("Invoice").limit(1).withColumnRenamed("Invoice","mode")
df_mode_new = df_mode.withColumn("new",lit(1))
##calculate the mean,max, and min.
df_mean = df_sum_invoice.select(mean(col('Invoice')).alias("mean"))
df_mean_new = df_mean.withColumn("new",lit(1))

df_max = df_sum_invoice.select(max(col('Invoice')).alias("max"))
df_max_new = df_max.withColumn("new",lit(1))

df_min = df_sum_invoice.select(min(col('Invoice')).alias("min"))
df_min_new = df_min.withColumn("new",lit(1))


df_median = spark.createDataFrame(df_sum_invoice.approxQuantile("Invoice", [0.5], 0), FloatType())
df_median_one = df_median.withColumnRenamed("value","median")
df_median_new = df_median_one.withColumn("new",lit(1))

##join those columns and select we want to display
df_result_one = df_mean_new.crossJoin(df_min_new)
df_result_two = df_result_one.crossJoin(df_max_new)
df_result_three =  df_result_two.crossJoin(df_median_new)
df_result_four = df_result_three.crossJoin(df_mode_new)
df_final = df_result_four.select("min","max","mode","median","mean")
display(df_final)




min,max,mode,median,mean
0.19,168469.6,15.0,304.31,523.303760866311


## Total Invoice Amount Distribution for first 85 quantiles

In [0]:
from pyspark.sql.functions import col,min,max,mean,lit
from pyspark.sql.types import StructType




##calcaulate the Invoice
df_calculate = df.withColumn("Invoice", col("quantity")* col("unit_price") )
##group by the invoice_no and sum together
df_invoice = df_calculate.filter(df_calculate["Invoice"] > 0).groupBy("invoice_no").sum()
## get the row we want
df_sum = df_invoice.select("sum(Invoice)")
##rename the column to Invoice
df_sum_invoice = df_sum.withColumnRenamed("sum(Invoice)", "Invoice")
df_sum_invoice_first85 = df_sum_invoice.filter(df_sum_invoice.Invoice < df_sum_invoice.approxQuantile("Invoice", [0.85], 0)[0])
##calcaulate the mode
df_mode = df_sum_invoice_first85.groupBy("Invoice").count().orderBy("count", ascending=False)
df_mode = df_mode.select("Invoice").limit(1).withColumnRenamed("Invoice","mode")
df_mode_new = df_mode.withColumn("new",lit(1))
##calculate the mean,max, and min.
df_mean = df_sum_invoice_first85.select(mean(col('Invoice')).alias("mean"))
df_mean_new = df_mean.withColumn("new",lit(1))

df_max = df_sum_invoice_first85.select(max(col('Invoice')).alias("max"))
df_max_new = df_max.withColumn("new",lit(1))

df_min = df_sum_invoice_first85.select(min(col('Invoice')).alias("min"))
df_min_new = df_min.withColumn("new",lit(1))


df_median = spark.createDataFrame(df_sum_invoice_first85.approxQuantile("Invoice", [0.5], 0), FloatType())
df_median_one = df_median.withColumnRenamed("value","median")
df_median_new = df_median_one.withColumn("new",lit(1))

##join those columns and select we want to display
df_result_one = df_mean_new.crossJoin(df_min_new)
df_result_two = df_result_one.crossJoin(df_max_new)
df_result_three =  df_result_two.crossJoin(df_median_new)
df_result_four = df_result_three.crossJoin(df_mode_new)
df_final = df_result_four.select("min","max","mode","median","mean")
display(df_final)




## Monthly Placed and Canceled Orders

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct
import pyspark.sql.functions as func

##parese the date into a new integer column
df_new_date = df.withColumn("date",year('invoice_date') * 100 + month('invoice_date'))

## get the canceled orders
cancel_orders = df_new_date.filter(col("invoice_no").contains("C"))

##sum the canceled orders by month
monthly_cancel_orders = cancel_orders.groupBy("date").agg(countDistinct("invoice_no")).orderBy("date",asending = False)

## sum the total orders by the month
total_orders = df_new_date.groupBy("date").agg(countDistinct("invoice_no")).orderBy("date",asending = False)

## rename the columns
total_orders_new = total_orders.withColumnRenamed("count(DISTINCT invoice_no)","total_orders")
monthly_cancel_orders_new =  monthly_cancel_orders.withColumnRenamed("count(DISTINCT invoice_no)","cancel_orders")

##join them together
new_df = monthly_cancel_orders_new.join(total_orders_new,"date","inner")

##calculate the placed order for each month
total_order_each_month = new_df.withColumn("placedorders",col("total_orders") - 2* col("cancel_orders")).orderBy("date",asending = False)

##display the result
display(total_order_each_month)

date,cancel_orders,total_orders,placedorders
200912,401,2330,1528
201001,300,1633,1033
201002,240,1969,1489
201003,407,2367,1553
201004,304,1892,1284
201005,407,2418,1604
201006,357,2216,1502
201007,344,2017,1329
201008,273,1877,1331
201009,371,2375,1633


## Monthly Sales

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct
import pyspark.sql.functions as func
df_calculate = df.withColumn("Invoice", col("quantity")* col("unit_price") )
##group by the invoice_no and sum together
df_invoice = df_calculate.filter(df_calculate["quantity"] > 0)
## get the row we want
df_new_date = df_invoice.withColumn("date",year('invoice_date') * 100 + month('invoice_date'))
df_monthly_sales = df_new_date.groupBy("date").sum("Invoice").orderBy("date",asending = False)
answer = df_monthly_sales.withColumnRenamed("sum(Invoice)","monthly_sales")
display(answer)

date,monthly_sales
200912,825685.7600000115
201001,652708.5019999943
201002,553713.3060000008
201003,833570.1309999689
201004,627934.5919999797
201005,659858.8599999907
201006,752270.1499999794
201007,606681.1400000101
201008,697274.9099999849
201009,924333.0109999602


## Monthly Sales Growth

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct,lag
import pyspark.sql.functions as func
from pyspark.sql.window import Window

## calculate the invoice
df_calculate = df.withColumn("Invoice", col("quantity")* col("unit_price") )

##filter the quantity which is less than zero
df_invoice = df_calculate.filter(df_calculate["quantity"] > 0)

## parese the date change it into integer
df_new_date = df_invoice.withColumn("date",year('invoice_date') * 100 + month('invoice_date'))

## groupby the month and sum the invoice
df_monthly_sales = df_new_date.groupBy("date").sum("Invoice").orderBy("date",asending = False)

##rename the column
answer = df_monthly_sales.withColumnRenamed("sum(Invoice)","monthly_sales")

##define a window range
win = Window.orderBy('date')

##calculate the monthly rate changes
answer_df = answer.withColumn("sales_change (%)", (answer.monthly_sales - lag(answer.monthly_sales).over(win))/ lag(answer.monthly_sales).over(win) * 100).orderBy("date",asending = False)

## display the result
display(answer_df)


date,monthly_sales,sales_change (%)
200912,825685.7600000115,
201001,652708.5019999943,-20.94952660925324
201002,553713.3060000008,-15.166831088710769
201003,833570.1309999689,50.541827687263066
201004,627934.5919999797,-24.669254733648422
201005,659858.8599999907,5.084011679995489
201006,752270.1499999794,14.00470549110912
201007,606681.1400000101,-19.353288177122707
201008,697274.9099999849,14.93268275983876
201009,924333.0109999602,32.563641362053346


## Monthly Active Users

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct,lag
import pyspark.sql.functions as func
from pyspark.sql.window import Window

## parese the month
df_new_date = df.withColumn("date",year('invoice_date') * 100 + month('invoice_date'))

## get the monthly active user and sort by ascending orders
df_active_users = df_new_date.groupBy("date").agg(countDistinct("customer_id").alias("active_user")).orderBy("date",ascending = True)

##show
display(df_active_users)


date,active_user
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


## New and Existing Users

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct,lag
import pyspark.sql.functions as func
from pyspark.sql.window import Window

## parese the month
df_new_date = df.withColumn("date",year('invoice_date') * 100 + month('invoice_date'))

## get the monthly active user and sort by ascending orders
df_active_users = df_new_date.groupBy("date").agg(countDistinct("customer_id").alias("active_user")).orderBy("date",ascending = True)

## get each users's earlist perchase date
df_earlist = df_new_date.groupBy("customer_id").agg(min("date").alias("earlist"))

##join the dataframe back to the original one to merge all the data
df_earlist_users = df_earlist.join(df_new_date,"customer_id", "inner")

##group by the earlist date to get each month's new users
df_new_users = df_earlist_users.groupBy("earlist").agg(countDistinct("customer_id").alias("new_users")).orderBy("earlist")
df_new_users_rename = df_new_users.withColumnRenamed("earlist","date")

## join the new users dataframe with the active users and subtract the active one with the new one to get the existing one.
df_total = df_new_users_rename.join(df_active_users, "date", "inner").orderBy("date")
df_answer = df_total.withColumn("existing_user",df_total.active_user - df_total.new_users)

display(df_answer)



date,new_users,active_user,existing_user
200912,1045,1045,0
201001,394,786,392
201002,363,807,444
201003,436,1111,675
201004,291,998,707
201005,254,1062,808
201006,269,1095,826
201007,183,988,805
201008,158,964,806
201009,242,1202,960


## Finding RFM

In [0]:
from pyspark.sql.functions import year,month,col,countDistinct,lag,datediff,max
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from datetime import datetime
today = datetime(2012,1,1)

## get the invoice calculated
invoice_total = df.withColumn("Invoice", col("quantity")* col("unit_price") )

## calculate the Monetary for each customer
Monetary = invoice_total.groupBy("customer_id").sum("Invoice")
Monetary = Monetary.withColumnRenamed("sum(Invoice)","Monetary").orderBy("customer_id")

## calculate the Recency for each customer
Recency = invoice_total.groupBy("customer_id").agg(datediff(lit(today),max("invoice_date")).alias("Recency")).orderBy("customer_id")

## calculate the Frequency for each customer
Frequency = invoice_total.groupBy("customer_id").agg(countDistinct("invoice_no").alias("Frequency")).orderBy("customer_id")

## merge the result together
Merge = Monetary.join(Recency,"customer_id","inner")
Merge = Merge.join(Frequency,"customer_id","inner").orderBy("customer_id")
display(Merge)

customer_id,Monetary,Recency,Frequency
12346,-64.67999999999981,348,17
12347,5633.320000000001,25,8
12348,2019.4,98,5
12349,4404.54,41,5
12350,334.40000000000003,333,1
12351,300.93,398,1
12352,1889.21,59,13
12353,406.75999999999993,227,2
12354,1079.4,255,1
12355,947.61,237,2


## RFM Segmentation

In [0]:
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from datetime import datetime

## define the quantile range
recency_q = Merge.approxQuantile('Recency', [0.2,0.4,0.6,0.8], 0)
frequency_q = Merge.approxQuantile('Frequency', [0.2,0.4,0.6,0.8], 0)
monetary = Merge.approxQuantile('Monetary', [0.2,0.4,0.6,0.8], 0)

## replace the value in each of the range
Merge = Merge.withColumn("R_Quartile", \
                                 when(col("Recency") >= recency_q[2] , 1).\
                                 when(col("Recency") >= recency_q[1] , 2).\
                                 when(col("Recency") >= recency_q[0] , 3).\
                                 otherwise(4))

Merge = Merge.withColumn("F_Quartile", \
                                 when(col("Frequency") > frequency_q[2] , 4).\
                                 when(col("Frequency") > frequency_q[1] , 3).\
                                 when(col("Frequency") > frequency_q[0] , 2).\
                                 otherwise(1))

Merge = Merge.withColumn("M_Quartile", \
                                 when(col("Monetary") >= monetary[2] , 4).\
                                 when(col("Monetary") >= monetary[1] , 3).\
                                 when(col("Monetary") >= monetary[0] , 2).\
                                 otherwise(1))

rfm_table = Merge.withColumn("RFM_Score", concat(col("R_Quartile"), col("F_Quartile"), col("M_Quartile")))
display(rfm_table)


customer_id,Monetary,Recency,Frequency,R_Quartile,F_Quartile,M_Quartile,RFM_Score
12346,-64.67999999999981,348,17,1,4,1,141
12347,5633.320000000001,25,8,4,4,4,444
12348,2019.4,98,5,2,3,4,234
12349,4404.54,41,5,3,3,4,334
12350,334.40000000000003,333,1,1,1,2,112
12351,300.93,398,1,1,1,2,112
12352,1889.21,59,13,3,4,4,344
12353,406.75999999999993,227,2,1,2,2,122
12354,1079.4,255,1,1,1,3,113
12355,947.61,237,2,1,2,3,123


In [0]:
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from itertools import chain
from pyspark.sql.window import Window
from datetime import datetime

## define the segment map
seg_map = {
    r'[1-2][1-2]': 'Hibernating',
    r'[1-2][3-4]': 'At Risk',
    r'[1-2]5': 'Can\'t Lose',
    r'3[1-2]': 'About to Sleep',
    r'33': 'Need Attention',
    r'[3-4][4-5]': 'Loyal Customers',
    r'41': 'Promising',
    r'51': 'New Customers',
    r'[4-5][2-3]': 'Potential Loyalists',
    r'5[4-5]': 'Champions'
}

##concat the recency and frequency value together
rfm_table= rfm_table.withColumn("segment", concat(rfm_table['R_Quartile'].cast(StringType()) ,rfm_table['F_Quartile'].cast(StringType())))

##iterate the key set and replace the segment value with the value in dictionary
for number in seg_map.keys():
  rfm_table = rfm_table.withColumn('segment', regexp_replace('segment', number, seg_map[number]))
display(rfm_table)

customer_id,Monetary,Recency,Frequency,R_Quartile,F_Quartile,M_Quartile,RFM_Score,segment
12346,-64.67999999999981,348,17,1,4,1,141,At Risk
12347,5633.320000000001,25,8,4,4,4,444,Loyal Customers
12348,2019.4,98,5,2,3,4,234,At Risk
12349,4404.54,41,5,3,3,4,334,Need Attention
12350,334.40000000000003,333,1,1,1,2,112,Hibernating
12351,300.93,398,1,1,1,2,112,Hibernating
12352,1889.21,59,13,3,4,4,344,Loyal Customers
12353,406.75999999999993,227,2,1,2,2,122,Hibernating
12354,1079.4,255,1,1,1,3,113,Hibernating
12355,947.61,237,2,1,2,3,123,Hibernating
