In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import numpy as np
spark = SparkSession.builder \
.appName("Retail Data Analytics with PySpark") \
.config("congif.option", "value").getOrCreate()

## Load CSV File Into Notebook

In [0]:
# File location and type
file_location = "/FileStore/tables/retail.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df.limit(10))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2009-12-01 07:45:00,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01 07:45:00,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01 07:45:00,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01 07:45:00,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01 07:46:00,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01 07:46:00,3.75,13085,United Kingdom


## Total Invoice Amount Distribution

In [0]:
invoice_pos = df.select((df.quantity * df.unit_price).alias("cost"), df.invoice_no).groupBy(df.invoice_no).sum().filter(col("sum(cost)") > 0).orderBy('invoice_no', ascending=True)
display(invoice_pos.limit(10))

invoice_no,sum(cost)
489434,505.30000000000007
489435,145.79999999999998
489436,630.33
489437,310.75
489438,2286.24
489439,426.3
489440,50.4
489441,344.34000000000003
489442,382.37000000000006
489443,285.06


## Monthly Placed and Cancelled Orders

In [0]:
#organize by year and month numerically
df2 = df.withColumn('yyyymm', (df.invoice_date.substr(0,4) * 100 + df.invoice_date.substr(6,2)).cast('int'))

#total orders
monthly_orders = df2.select(df2.yyyymm, df2.invoice_no).distinct().groupBy(df2.yyyymm).count().withColumnRenamed("count", "total_orders")

#cancelled orders
monthly_cancelled_orders = df2.select(df2.yyyymm, df2.invoice_no).filter(df2.invoice_no.startswith('C')).distinct().groupBy(df2.yyyymm).count().withColumnRenamed("count", "cancelled_orders")

#full join both dataframes
monthly_orders = monthly_orders.join(monthly_cancelled_orders, 'yyyymm', how='full').orderBy('yyyymm', ascending=True)

#get monthly placed orders: monthly_orders - 2*monthly_cancelled_orders
monthly_orders = monthly_orders.withColumn('placed_orders', monthly_orders.total_orders - 2 * monthly_orders.cancelled_orders)
display(monthly_orders.limit(10))

yyyymm,total_orders,cancelled_orders,placed_orders
200912,2330,401,1528
201001,1633,300,1033
201002,1969,240,1489
201003,2367,407,1553
201004,1892,304,1284
201005,2418,407,1604
201006,2216,357,1502
201007,2017,344,1329
201008,1877,273,1331
201009,2375,371,1633


## Monthly Sales

In [0]:
monthly_sales = df2.select(df2.yyyymm, (df2.quantity * df2.unit_price).alias("cost")).groupBy(df2.yyyymm).sum("cost").orderBy('yyyymm', ascending=True).withColumnRenamed("sum(cost)", "sales")
display(monthly_sales.limit(10))

yyyymm,sales
200912,799847.1100000143
201001,624032.8919999955
201002,533091.4260000042
201003,765848.7609999765
201004,590580.4319999823
201005,615322.8300000005
201006,679786.6099999842
201007,575236.3600000095
201008,656776.3399999854
201009,853650.4309999745


## Monthly Sales Growth

In [0]:
#get new column with sales column shifted down by 1
monthly_growth = monthly_sales.withColumn("prev_sales", lag(monthly_sales.sales).over(Window.partitionBy().orderBy("yyyymm")))

#calculate monthly sales growth with previous month: (sales - prev_sales) / prev_sales
monthly_growth = monthly_growth.withColumn('growth', when(isnull(monthly_growth.sales - monthly_growth.prev_sales), 0).otherwise((monthly_growth.sales - monthly_growth.prev_sales) / monthly_growth.prev_sales))
display(monthly_growth.limit(10))

yyyymm,sales,prev_sales,growth
200912,799847.1100000143,,0.0
201001,624032.8919999955,799847.1100000143,-0.2198097808967712
201002,533091.4260000042,624032.8919999955,-0.1457318470962776
201003,765848.7609999765,533091.4260000042,0.4366180426994347
201004,590580.4319999823,765848.7609999765,-0.2288550141037566
201005,615322.8300000005,590580.4319999823,0.0418950521544184
201006,679786.6099999842,615322.8300000005,0.1047641609526881
201007,575236.3600000095,679786.6099999842,-0.1537986310144841
201008,656776.3399999854,575236.3600000095,0.1417503928297831
201009,853650.4309999745,656776.3399999854,0.2997581962224666


## Monthly Active Users

In [0]:
monthly_users = df2.select(df2.yyyymm, df2.customer_id).distinct().groupBy(df2.yyyymm).count().orderBy('yyyymm', ascending=True).withColumnRenamed("count", "active_users")
display(monthly_users.limit(10))

yyyymm,active_users
200912,1046
201001,787
201002,808
201003,1112
201004,999
201005,1063
201006,1096
201007,989
201008,965
201009,1203


## New and Existing Users

In [0]:
#first customer purchase date count
first_user = df2.select(df2.customer_id, df2.yyyymm).groupBy(df2.customer_id).min().withColumnRenamed("min(yyyymm)", "yyyymm").groupBy(col("yyyymm")).count().withColumnRenamed("count", "new_users")

#full join both dataframes
monthly_users = monthly_users.join(first_user, 'yyyymm', how='full').orderBy('yyyymm', ascending=True)
monthly_users = monthly_users.withColumn('existing_users', monthly_users.active_users - monthly_users.new_users)
display(monthly_users.limit(10))

yyyymm,active_users,new_users,existing_users
200912,1046,1046,0
201001,787,394,393
201002,808,363,445
201003,1112,436,676
201004,999,291,708
201005,1063,254,809
201006,1096,269,827
201007,989,183,806
201008,965,158,807
201009,1203,242,961


## Finding RFM & RFM Segmentation

In [0]:
#recency: customer's most recent purchase date
customer_recency = df2.select(df2.customer_id, df2.yyyymm).groupBy(df2.customer_id).max().withColumnRenamed("max(yyyymm)", "recency")

#frequency: customer's total number of purchases
customer_frequency = df2.select(df2.customer_id, df2.yyyymm).groupBy(df2.customer_id).count().withColumnRenamed("count", "frequency")

#monetary: customer's total cost of purchases
customer_monetary = df2.select(df2.customer_id, (df2.quantity * df2.unit_price).alias("cost")).groupBy(df2.customer_id).sum().withColumnRenamed("sum(cost)", "monetary")

rfm = customer_recency.join(customer_frequency, 'customer_id', how='inner')
rfm = rfm.join(customer_monetary, 'customer_id', how='inner').orderBy('customer_id', ascending=True)
display(rfm.limit(10))

customer_id,recency,frequency,monetary
12346,201101,48,-64.67999999999981
12347,201112,253,5633.320000000001
12348,201109,51,2019.4
12349,201111,180,4404.54
12350,201102,17,334.40000000000003
12351,201011,21,300.93
12352,201111,113,1889.21
12353,201105,24,406.75999999999993
12354,201104,58,1079.4
12355,201105,35,947.61
