## Load the Retail CSV Table

In [0]:
# Location of the Online Retail II CSV export and the format used to read it
file_location = "/FileStore/tables/online_retail_II.csv"
file_type = "csv"

# Reader settings (Spark reader options are given as strings here)
infer_schema = "true"          # let Spark infer column types
first_row_is_header = "true"   # take column names from the first line
delimiter = ","

# These options only affect CSV input; other formats ignore them.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# Preview the first rows
display(df.limit(10))

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


In [0]:
# Echo the notebook's SparkSession object as this cell's output
spark

In [0]:
# Expose the raw retail frame to Spark SQL under a temporary view name
rdf = "online_retail_II_csv"
df.createOrReplaceTempView(rdf)

In [0]:
from pyspark.sql.functions import col, avg, min, max

In [0]:
from pyspark.sql import functions as F

In [0]:
# Column names taken from the CSV header (DataFrame.columns is already a list)
df.columns

In [0]:
# Short aliases for the columns of `df` used throughout the analysis
nInvoice = df.Invoice
stockcode = df.StockCode
description = df.Description
qty = df.Quantity
date = df.InvoiceDate
price = df.Price
# "Customer ID" contains a space, so attribute access cannot reach it; index by name instead
customer_id = df["Customer ID"]
country = df.Country

## Total Invoice Amount Distribution

<h5> Distribution (Invoice Amount) </h5>

In [0]:
# Line amount = quantity * unit price; summed per invoice to get invoice totals.
amount_df = df.withColumn('amount', (qty * price))
# Keep only invoices whose total is strictly positive
amount_sum = amount_df.groupby(nInvoice).sum('amount').where(F.col('sum(amount)') > 0)

# Individual single-row statistic frames (the combined summary below computes them again in one pass)
amount_min = amount_sum.agg({'sum(amount)' : 'min'})
amount_max = amount_sum.agg({'sum(amount)' : 'max'})
amount_mean = amount_sum.agg({'sum(amount)' : 'mean'})

# relativeError=0 asks approxQuantile for the exact median
amount_median = amount_sum.approxQuantile('sum(amount)', [0.5], 0)[0]

# Most frequent invoice total. Totals are rounded to cents first so floating-point
# summation noise cannot split equal totals into distinct values, and count ties are
# broken by the smaller total so the reported mode is deterministic.
amount_mode = amount_sum.groupby(F.round('sum(amount)', 2).alias('total'))\
.count()\
.orderBy(F.desc('count'), F.asc('total'))\
.select('total').first()[0]

amount_distribution_invoice = amount_sum.select(F.max('sum(amount)').alias("Maximum"),\
                                               F.min('sum(amount)').alias("Minimum"),\
                                               F.mean('sum(amount)').alias("Mean"))\
.withColumn("Median", F.lit(amount_median))\
.withColumn("Mode", F.lit(amount_mode))

amount_distribution_invoice.show()

<h5> Distribution (Outliers Removed: invoice totals at or above the 85th percentile excluded) </h5>

In [0]:
# Trim the long right tail: keep invoice totals strictly below this quantile.
OUTLIER_CUTOFF = 0.85
outlier_quantile = amount_sum.approxQuantile('sum(amount)', [OUTLIER_CUTOFF], 0)[0]
# NOTE: despite its name, `outlier_amount` holds the retained (below-cutoff) invoices.
outlier_amount = amount_sum.where(F.col('sum(amount)') < outlier_quantile)

# Mode on cent-rounded totals, ties broken by the smaller total so the result is deterministic
outlier_mode = outlier_amount.groupby(F.round('sum(amount)', 2).alias('total'))\
.count()\
.orderBy(F.desc('count'), F.asc('total'))\
.select('total').first()[0]
# relativeError=0 asks approxQuantile for the exact median
outlier_median = outlier_amount.approxQuantile('sum(amount)', [0.5], 0)[0]

distribution_outlier_data = outlier_amount.select(F.max('sum(amount)').alias("Maximum"),\
                                               F.min('sum(amount)').alias("Minimum"),\
                                               F.mean('sum(amount)').alias("Mean"))\
.withColumn("Median", F.lit(outlier_median))\
.withColumn("Mode", F.lit(outlier_mode))

distribution_outlier_data.show()

## Monthly Placed and Canceled Orders

In [0]:
# Calendar year-month key such as "200912". Lowercase "yyyy" is the calendar year;
# uppercase "YYYY" is the week-based year, which differs from the calendar year for
# some late-December dates and is unsupported by Spark 3's default datetime formatter.
orders_df = amount_df.withColumn("Date", F.date_format(date, "yyyyMM"))

# One row per (invoice, month)
orders_total = orders_df.groupby("Invoice", "Date").count().select("Invoice", "Date", "count")

# All invoices per month (cancellation invoices included)
orders_placed = orders_total.groupby("Date").count().select("Date", "count").orderBy("Date", ascending=True)

# Invoices whose number contains "C" are treated as cancellations
orders_canceled = orders_total.where(F.col("Invoice").contains("C"))\
.groupby("Date").count().select("Date", "count").orderBy("Date", ascending=True)

# Left join so months with no cancellations are kept with a count of 0 instead of being dropped
orders_placed_canceled = orders_placed.select("Date", F.col("count").alias("Placement"))\
.join(orders_canceled, "Date", how="left")\
.fillna(0, subset=["count"])

orders_displacement = orders_placed_canceled.select("Date", "Placement", F.col("count").alias("Cancellation"))\
.orderBy("Date", ascending=True)

orders_displacement.show()

## Monthly Sales

In [0]:
# Monthly revenue summed over invoice lines with a positive amount only
sales_df = orders_df\
.where(F.col('amount') > 0)\
.groupby("Date")\
.sum('amount')\
.orderBy("Date", ascending=True)

# Same monthly totals, expressed in millions
sales_monthly = sales_df\
.withColumn("Sales(Millions)", F.col('sum(amount)') / 1000000)\
.select("Date", "Sales(Millions)")

sales_monthly.show()

## Monthly Sales Growth

In [0]:
from pyspark.sql.window import Window

# Month-over-month sales growth in percent:
#   (this month's sales - previous month's sales) / previous month's sales * 100
# The earliest month has no predecessor, so its growth is null.
# Window.orderBy without a partition key moves every row into a single partition;
# acceptable here because sales_df holds one row per month.
month_window = Window.orderBy("Date")
previous_sales = F.lag("sum(amount)").over(month_window)

growth_df = sales_df.withColumn("Growth %", (F.col("sum(amount)") - previous_sales) / previous_sales * 100)\
.orderBy("Date", ascending=True)\
.drop("sum(amount)")

growth_df.show()

## Monthly Active Users

In [0]:
# Distinct customers with at least one invoice line in each month.
# Rows with a missing Customer ID (if any) are excluded; otherwise distinct() keeps a
# (Date, null) row and null would be counted as one extra "user" every month.
monthlyUsers_df = orders_df.where(F.col("Customer ID").isNotNull())\
.select("Date", "Customer ID").distinct()\
.groupBy("Date").count().orderBy("Date", ascending=True)
users_monthly = monthlyUsers_df.select("Date", F.col("count").alias("# of Active Users"))

users_monthly.show()

## New and Existing Users

In [0]:
# Only rows with a known customer can be attributed to a user.
known_customers = orders_df.where(F.col("Customer ID").isNotNull())

# A customer's signup month is the earliest month they appear in; "Date" is a
# year-month digit string, so its lexical minimum is the earliest month.
user_earliest = known_customers.groupby("Customer ID").agg(F.min("Date").alias("SignupDate"))\
.groupby("SignupDate").count().sort("SignupDate").withColumnRenamed("count", "New Users")

# Active users per month, counted from the same filtered rows so both counts share one definition
active_per_month = known_customers.select("Date", "Customer ID").distinct()\
.groupBy("Date").count().withColumnRenamed("count", "# of Active Users")

# Left join keeps months in which nobody signed up (0 new users) instead of dropping them
user_list = active_per_month.join(user_earliest, user_earliest.SignupDate == active_per_month.Date, how="left")\
.fillna(0, subset=["New Users"])\
.withColumn("Existing Users", F.col("# of Active Users") - F.col("New Users"))\
.orderBy("Date", ascending=True)\
.drop("SignupDate")

user_list.show()

## RFM Metrics (Recency, Frequency, Monetary)

In [0]:
from datetime import datetime

# Reference ("snapshot") date for recency: the day after the last invoice in the data,
# so the most recent buyers get a recency of 1 day. A fixed calendar date far after the
# data would inflate every recency by the same offset and push customers past the
# day-based recency cut-offs used when scoring.
today = orders_df.agg(F.date_add(F.max('InvoiceDate'), 1)).first()[0]

orders_df = orders_df.withColumn('TotalPrice', F.round(orders_df.Quantity * orders_df.Price, 2 ))
# Days between each invoice and the snapshot date
orders_df = orders_df.withColumn('Duration', F.datediff(F.lit(today), 'InvoiceDate'))

# RFM is per customer, so rows without a Customer ID cannot be attributed and are left out
customers_df = orders_df.where(F.col('Customer ID').isNotNull())

# Recency: days since the customer's latest invoice
recency = customers_df.groupBy('Customer ID').agg(F.min('Duration').alias('Recency'))
# Frequency: number of distinct invoices per customer (any cancellation invoices included)
frequency = customers_df.groupBy('Customer ID').agg(F.countDistinct('Invoice').alias('Frequency'))
# Monetary: net spend per customer, rounded to cents
monetary = customers_df.groupBy('Customer ID').agg(F.round(F.sum('TotalPrice'), 2).alias('Monetary'))

rfm = recency.join(frequency, 'Customer ID', how='inner').join(monetary, 'Customer ID', how='inner')

rfm.show()

## RFM Segmentation

In [0]:
def RScore(x):
    """Map a recency value to an RFM band; smaller x gives a lower band number.

    x <= 16 -> 1, x <= 50 -> 2, x <= 143 -> 3, anything else -> 4.
    """
    cutoffs = (16, 50, 143)
    for band, cutoff in enumerate(cutoffs, start=1):
        if x <= cutoff:
            return band
    return len(cutoffs) + 1

def FScore(x):
    """Map a purchase-frequency value to an RFM band; larger x gives a lower band number.

    x <= 1 -> 4, x <= 3 -> 3, x <= 5 -> 2, anything else -> 1.
    """
    for cutoff, band in ((1, 4), (3, 3), (5, 2)):
        if x <= cutoff:
            return band
    return 1

def MScore(x):
    """Map a monetary value to an RFM band; larger x gives a lower band number.

    x <= 293 -> 4, x <= 648 -> 3, x <= 1611 -> 2, anything else -> 1.
    """
    bands = ((293, 4), (648, 3), (1611, 2))
    return next((band for cutoff, band in bands if x <= cutoff), 1)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType


def _as_score_udf(score_fn):
    """Wrap a numeric scoring function as a Spark UDF that returns the score as a string.

    The UDF is declared StringType, so the integer score is converted explicitly instead
    of relying on Spark's implicit int -> string coercion. A null input yields null rather
    than raising a TypeError (None <= number) inside the Python worker.
    """
    return F.udf(lambda x: None if x is None else str(score_fn(x)), StringType())


R_udf = _as_score_udf(RScore)
F_udf = _as_score_udf(FScore)
M_udf = _as_score_udf(MScore)

rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))\
.withColumn("f_seg", F_udf("Frequency"))\
.withColumn("m_seg", M_udf("Monetary"))

# Concatenated score such as "111"; band 1 marks the lowest recency, highest frequency
# and highest monetary value respectively.
rfm_seg = rfm_seg.withColumn('RFMScore', F.concat(F.col('r_seg'), F.col('f_seg'), F.col('m_seg'))).sort(F.col('RFMScore'))

rfm_seg.show()