## Load Data from Table

In [0]:
%sql
USE andres_osoriogr_outlook_com_db

In [0]:
retail_df = spark.read.table("retail_table")
retail_df.printSchema()

In [0]:
print("Partitions: " + str(retail_df.rdd.getNumPartitions()))

In [0]:
display(retail_df.select("*").limit(10))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085,United Kingdom


In [0]:
%sql
SELECT * FROM retail_table LIMIT 10

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085,United Kingdom


## Total Invoice Amount Distribution
- Each order (invoice_no) has multiple items
- Calculate the sum of (quantity * unit_price) for each order
- Some amounts are too high and skew the distribution; therefore, remove outliers

#### Get the amount for each row, get only relevant columns, and remove rows with negative amounts

In [0]:
from pyspark.sql.functions import *

invoice_amount_df = retail_df.withColumn("amount", expr("quantity * unit_price"))
invoice_amount_df = invoice_amount_df.select("invoice_no", "amount")
invoice_amount_df = invoice_amount_df.where("amount > 0")

#### Sum up all amounts for each invoice_no

In [0]:
total_amounts_df = invoice_amount_df.groupBy("invoice_no").sum("amount")
total_amounts_df = total_amounts_df.withColumnRenamed("sum(amount)", "total_amount")

#### Distribution with outliers (a few very big amounts)

In [0]:
display(total_amounts_df.describe("total_amount"))

summary,total_amount
count,40078.0
mean,523.3037611158244
stddev,1517.351645669805
min,0.19
max,168469.6


In [0]:
display(total_amounts_df.orderBy("invoice_no"))

invoice_no,total_amount
489434,505.30000000000007
489435,145.79999999999998
489436,630.33
489437,310.75
489438,2286.24
489439,426.3
489440,50.4
489441,344.34000000000003
489442,382.37000000000006
489443,285.06


#### Distribution without outliers (only consider the first ~60% quantile of original data)

In [0]:
quantile = total_amounts_df.approxQuantile("total_amount", [0.6], 0.1)
cut = quantile[0]
print(cut)

In [0]:
no_outliers_df = total_amounts_df.filter(col("total_amount") < cut)
display(no_outliers_df.describe("total_amount"))

summary,total_amount
count,27980.0
mean,209.93644270907768
stddev,120.53792537422213
min,0.19
max,436.0299999999999


In [0]:
display(no_outliers_df.orderBy("invoice_no"))

invoice_no,total_amount
489435,145.79999999999998
489437,310.75
489439,426.3
489440,50.4
489441,344.34000000000003
489442,382.37000000000006
489443,285.06
489444,141.0
489445,308.44
489447,130.0


## Monthly Placed and Cancelled Orders
- The column "invoice_no" denotes the order numbers
- Each order has multiple items (we only care about one appearance of each for this query)
- Orders that start with a 'C' have been cancelled
- Calculate the number of placed VS cancelled orders for each month

#### Create YYYYMM column, get relevant columns only, and remove duplicate invoice_no

In [0]:
orders_df = retail_df.select("*", (year("invoice_date") * 100 + month("invoice_date")).alias("YYYYMM"))
orders_df = orders_df.select("YYYYMM", "invoice_no")
orders_df = orders_df.dropDuplicates(["invoice_no"])
display(orders_df)

YYYYMM,invoice_no
200912,489677
200912,C491017
200912,491045
200912,491658
200912,C491705
200912,C492541
200912,C493168
201001,493542
201001,493977
201001,C493984


#### Add column with cancelled orders (orders starting with 'C')

In [0]:
placed_cancelled_df = orders_df.withColumn("cancelled", expr("invoice_no LIKE 'C%'"))
display(placed_cancelled_df)

YYYYMM,invoice_no,cancelled
200912,489677,False
200912,C491017,True
200912,491045,False
200912,491658,False
200912,C491705,True
200912,C492541,True
200912,C493168,True
201001,493542,False
201001,493977,False
201001,C493984,True


#### Count the number of placed orders (cancelled = false) for each month

In [0]:
placed_count_df = placed_cancelled_df.filter("cancelled = false")
placed_count_df = placed_count_df.select("YYYYMM", "cancelled")
placed_count_df = placed_count_df.groupBy("YYYYMM").count()
placed_count_df = placed_count_df.withColumnRenamed("count", "placed")
display(placed_count_df)

YYYYMM,placed
201108,1459
201011,3093
201101,1216
201004,1588
201003,1960
201103,1665
201112,869
201012,1699
201001,1333
201005,2011


#### Count the number of cancelled orders (cancelled = true) for each month

In [0]:
cancelled_count_df = placed_cancelled_df.filter("cancelled = true")
cancelled_count_df = cancelled_count_df.select("YYYYMM", "cancelled")
cancelled_count_df = cancelled_count_df.groupBy("YYYYMM").count()
cancelled_count_df = cancelled_count_df.withColumnRenamed("count", "cancelled")
display(cancelled_count_df)

YYYYMM,cancelled
201108,278
201011,576
201101,260
201004,304
201003,407
201103,318
201112,146
201012,326
201001,300
201005,407


#### Merge placed and cancelled order counts for each month

In [0]:
placed_cancelled_count_df = placed_count_df.join(cancelled_count_df, "YYYYMM", "left_outer")
display(placed_cancelled_count_df.orderBy("YYYYMM"))

YYYYMM,placed,cancelled
200912,1929,401
201001,1333,300
201002,1729,240
201003,1960,407
201004,1588,304
201005,2011,407
201006,1859,357
201007,1673,344
201008,1604,273
201009,2004,371


In [0]:
display(placed_cancelled_count_df.orderBy("YYYYMM"))

YYYYMM,placed,cancelled
200912,1929,401
201001,1333,300
201002,1729,240
201003,1960,407
201004,1588,304
201005,2011,407
201006,1859,357
201007,1673,344
201008,1604,273
201009,2004,371


## Monthly Sales
- A sale is a placed order, therefore we can reuse the dataframe from the last query (placed_count_df)
- Find the maximum number of placed orders and use it to normalize the data on a scale of 0 to 1

In [0]:
display(placed_count_df.describe("placed"))

summary,placed
count,25.0
mean,1813.44
stddev,509.0469919368938
min,869.0
max,3093.0


In [0]:
sales_df = placed_count_df.select("YYYYMM", (col("placed") / 3093).alias("scaled_sales"))
display(sales_df.orderBy("YYYYMM"))

YYYYMM,scaled_sales
200912,0.623666343355965
201001,0.4309731652117685
201002,0.5590042030391206
201003,0.6336889751050759
201004,0.5134173941157453
201005,0.6501778208858713
201006,0.6010345942450696
201007,0.5408988037504041
201008,0.5185903653410928
201009,0.6479146459747818


In [0]:
display(sales_df.orderBy("YYYYMM"))

YYYYMM,scaled_sales
200912,0.623666343355965
201001,0.4309731652117685
201002,0.5590042030391206
201003,0.6336889751050759
201004,0.5134173941157453
201005,0.6501778208858713
201006,0.6010345942450696
201007,0.5408988037504041
201008,0.5185903653410928
201009,0.6479146459747818


## Monthly Sales Growth
- Calculate the percetange of sales growth for each month
- We can reuse the placed_count_df from before since it has the number of placed orders (sales) for each month
- There's no percetange change function in PySpark; use the lag function to create a new column with the previous value of the number of placed orders and calculate the percetange change this way

In [0]:
from pyspark.sql.window import Window

growth_df = placed_count_df
g_window = Window.partitionBy().orderBy("YYYYMM")
growth_df = growth_df.withColumn("prev_value", lag(growth_df.placed).over(g_window))
growth_df = growth_df.withColumn("growth%", when(isnull(growth_df.placed - growth_df.prev_value), 0)
                              .otherwise((growth_df.placed - growth_df.prev_value) / growth_df.prev_value))
display(growth_df)

YYYYMM,placed,prev_value,growth%
200912,1929,,0.0
201001,1333,1929.0,-0.3089683773976153
201002,1729,1333.0,0.2970742685671418
201003,1960,1729.0,0.1336032388663967
201004,1588,1960.0,-0.1897959183673469
201005,2011,1588.0,0.2663727959697733
201006,1859,2011.0,-0.0755842864246643
201007,1673,1859.0,-0.1000537923614846
201008,1604,1673.0,-0.0412432755528989
201009,2004,1604.0,0.2493765586034912


In [0]:
display(growth_df)

YYYYMM,placed,prev_value,growth%
200912,1929,,0.0
201001,1333,1929.0,-0.3089683773976153
201002,1729,1333.0,0.2970742685671418
201003,1960,1729.0,0.1336032388663967
201004,1588,1960.0,-0.1897959183673469
201005,2011,1588.0,0.2663727959697733
201006,1859,2011.0,-0.0755842864246643
201007,1673,1859.0,-0.1000537923614846
201008,1604,1673.0,-0.0412432755528989
201009,2004,1604.0,0.2493765586034912


## Monthly Active Users
- Calculate the number of distinct customer_ids for each month

#### Create 'YYYYMM' column, select relevant columns only, and do the aggregation operation

In [0]:
customers_df = retail_df.select("*", (year("invoice_date") * 100 + month("invoice_date")).alias("YYYYMM"))
customers_df = customers_df.select("YYYYMM", "customer_id")
customers_count_df = customers_df.groupBy("YYYYMM").agg(countDistinct("customer_id").alias("active_users"))
display(customers_count_df.orderBy("YYYYMM"))

YYYYMM,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


In [0]:
display(customers_count_df.orderBy("YYYYMM"))

YYYYMM,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


## New VS Exisiting Users per Month
- New users are customer_ids that appear for the first time in the whole dataset
- Existing users are customer_ids that have already appeared in previous months 
- We can reuse customers_df from the previous query since it contains all the customer_ids for each month

In [0]:
display(customers_df.orderBy("YYYYMM"))

YYYYMM,customer_id
200912,17804.0
200912,15998.0
200912,17804.0
200912,17592.0
200912,15998.0
200912,16393.0
200912,17804.0
200912,12636.0
200912,17592.0
200912,16329.0


#### New users: dropping duplicate customer_ids and only keeping the first copy gives us the new users for each month

In [0]:
new_users_df = customers_df.dropDuplicates(["customer_id"])

#### Old users: substracting new_users_df from the original customers_df gives us the existing users for each month

In [0]:
old_users_df = customers_df.subtract(new_users_df)

#### Count the number of new/existing users for each month

In [0]:
new_users_count_df = new_users_df.groupBy("YYYYMM").count()
new_users_count_df = new_users_count_df.withColumnRenamed("count", "new_users")
display(new_users_count_df.orderBy("YYYYMM"))

YYYYMM,new_users
200912,1046
201001,394
201002,363
201003,436
201004,291
201005,254
201006,269
201007,183
201008,158
201009,242


In [0]:
old_users_count_df = old_users_df.groupBy("YYYYMM").count()
old_users_count_df = old_users_count_df.withColumnRenamed("count", "existing_users")
display(old_users_count_df.orderBy("YYYYMM"))

YYYYMM,existing_users
201001,393
201002,445
201003,676
201004,708
201005,809
201006,827
201007,806
201008,807
201009,961
201010,1199


#### Merge new and existing users count for each month

In [0]:
new_old_users_count_df = new_users_count_df.join(old_users_count_df, "YYYYMM", "left_outer")
display(new_old_users_count_df.orderBy("YYYYMM"))

YYYYMM,new_users,existing_users
200912,1046,
201001,394,393.0
201002,363,445.0
201003,436,676.0
201004,291,708.0
201005,254,809.0
201006,269,827.0
201007,183,806.0
201008,158,807.0
201009,242,961.0


In [0]:
display(new_old_users_count_df.orderBy("YYYYMM"))

YYYYMM,new_users,existing_users
200912,1046,
201001,394,393.0
201002,363,445.0
201003,436,676.0
201004,291,708.0
201005,254,809.0
201006,269,827.0
201007,183,806.0
201008,158,807.0
201009,242,961.0


## RFM Segmentation
RFM Segmentation is a segmentation technique that helps determine marketing and sales strategies based on buying habits of customers.
- RFM = Recency, Frequency, Monetary
- Recency: Time since customer's last purchase
- Frequency: Total number of purchases
- Monetary: Total spending by the customer
- Segment customers into meaningful groups depending on their values for Recency, Frequency, and Monetary
- This segmentation is used to gain insights into customer value and develop targeted marketting strategies for important segments

In [0]:
display(retail_df)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


#### Change the invoice_date type to date and make a new column with a literal date of 'January 1, 2012' to facilitate the Recency measure; 
#### Make a column with amount = (unit_price * quantity) to faciliate Monetary measure

In [0]:
rfm_tmp = retail_df.select("invoice_no", "quantity", "unit_price", "customer_id", to_date(retail_df.invoice_date).alias("invoice_date"), to_date(lit("2012-01-01")).alias("today"), 
                           expr("ROUND(unit_price * quantity) AS total_price"))
display(rfm_tmp)

invoice_no,quantity,unit_price,customer_id,invoice_date,today,total_price
489434,12,6.95,13085.0,2009-12-01,2012-01-01,83.0
489434,12,6.75,13085.0,2009-12-01,2012-01-01,81.0
489434,12,6.75,13085.0,2009-12-01,2012-01-01,81.0
489434,48,2.1,13085.0,2009-12-01,2012-01-01,101.0
489434,24,1.25,13085.0,2009-12-01,2012-01-01,30.0
489434,24,1.65,13085.0,2009-12-01,2012-01-01,40.0
489434,24,1.25,13085.0,2009-12-01,2012-01-01,30.0
489434,10,5.95,13085.0,2009-12-01,2012-01-01,60.0
489435,12,2.55,13085.0,2009-12-01,2012-01-01,31.0
489435,12,3.75,13085.0,2009-12-01,2012-01-01,45.0


#### Remove entries with negavtive values and drop missing values to faciliate evaluation

In [0]:
rfm_tmp = rfm_tmp.filter(rfm_tmp.quantity > 0).filter(rfm_tmp.unit_price > 0).filter(rfm_tmp.total_price > 0)
rfm_tmp = rfm_tmp.na.drop()
rfm_tmp.count()

#### Find the Recency and Monetary values

In [0]:
df_tmp = rfm_tmp.groupBy("customer_id").agg(sum("total_price").alias("sum_price"), max("invoice_date").alias("max_date"), max("today").alias("today"))
df_x = df_tmp.select("customer_id", "sum_price", datediff(col("today"), col("max_date")).alias("recency"))
display(df_x)

customer_id,sum_price,recency
13623,2596.0,53
17679,3579.0,75
17389,57252.0,23
18051,2283.0,657
13289,310.0,746
17753,392.0,487
15727,9535.0,39
16574,1298.0,94
14832,323.0,653
15447,496.0,353


#### Find the Frequency value

In [0]:
df_y = rfm_tmp.groupBy("customer_id", "invoice_no").agg(sum("total_price").alias("sum_price2"))
df_z = df_y.groupBy("customer_id").agg(count("sum_price2").alias("sum_price2"))
display(df_z)

customer_id,sum_price2
16574,3
15727,15
17389,61
15447,5
15619,1
18051,7
14450,7
15846,1
15790,1
13285,6


#### Create the RFM table
The table can be read as: the customer with the customer_id 12346 has spent a total of £77,551; the last purchase he made was about 1 year ago (348 days); and the number of purchases he has made is 12.

In [0]:
rfm_df = df_x.join(df_z, "customer_id", "inner")
rfm_df = rfm_df.withColumnRenamed("sum_price", "monetary").withColumnRenamed("sum_price2", "frequency")
display(rfm_df.orderBy("customer_id"))

customer_id,monetary,recency,frequency
12346,77551.0,348,12
12347,5641.0,25,8
12348,2016.0,98,5
12349,4454.0,41,4
12350,334.0,333,1
12351,304.0,398,1
12352,2855.0,59,10
12353,409.0,227,2
12354,1086.0,255,1
12355,950.0,237,2


#### RFM scores

In [0]:
from pyspark.ml.feature import QuantileDiscretizer

qds = QuantileDiscretizer(numBuckets = 6, inputCol="recency", outputCol="recency_score")
rfm_df = qds.fit(rfm_df).transform(rfm_df)

qds = QuantileDiscretizer(numBuckets = 6, inputCol="frequency", outputCol="frequency_score")
rfm_df = qds.fit(rfm_df).transform(rfm_df)

qds = QuantileDiscretizer(numBuckets = 6, inputCol = "monetary", outputCol = "monetary_score")
rfm_df = qds.fit(rfm_df).transform(rfm_df)

display(rfm_df.orderBy("customer_id"))

customer_id,monetary,recency,frequency,recency_score,frequency_score,monetary_score
12346,77551.0,348,12,4.0,5.0,5.0
12347,5641.0,25,8,0.0,4.0,5.0
12348,2016.0,98,5,2.0,4.0,4.0
12349,4454.0,41,4,1.0,3.0,5.0
12350,334.0,333,1,4.0,1.0,1.0
12351,304.0,398,1,4.0,1.0,1.0
12352,2855.0,59,10,1.0,5.0,4.0
12353,409.0,227,2,3.0,2.0,1.0
12354,1086.0,255,1,3.0,1.0,3.0
12355,950.0,237,2,3.0,2.0,3.0


#### Reverse the recency_score value; the lower the recency value, the higher the score should be (QuantileDiscretizer gives a high score to high values)

In [0]:
rfm_df = rfm_df.select("customer_id", "recency", "frequency", "monetary", abs(rfm_df.recency_score - 5).alias("recency_score"), "frequency_score", "monetary_score")
display(rfm_df.orderBy("customer_id"))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score
12346,348,12,77551.0,1.0,5.0,5.0
12347,25,8,5641.0,5.0,4.0,5.0
12348,98,5,2016.0,3.0,4.0,4.0
12349,41,4,4454.0,4.0,3.0,5.0
12350,333,1,334.0,1.0,1.0,1.0
12351,398,1,304.0,1.0,1.0,1.0
12352,59,10,2855.0,4.0,5.0,4.0
12353,227,2,409.0,2.0,2.0,1.0
12354,255,1,1086.0,2.0,1.0,3.0
12355,237,2,950.0,2.0,2.0,3.0


#### Unify RFM scores

In [0]:
rfm_df = rfm_df.withColumn("rfm_score", concat(col("recency_score").cast("int"), col("frequency_score").cast("int"), col("monetary_score").cast("int")))
display(rfm_df.orderBy("customer_id"))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score
12346,348,12,77551.0,1.0,5.0,5.0,155
12347,25,8,5641.0,5.0,4.0,5.0,545
12348,98,5,2016.0,3.0,4.0,4.0,344
12349,41,4,4454.0,4.0,3.0,5.0,435
12350,333,1,334.0,1.0,1.0,1.0,111
12351,398,1,304.0,1.0,1.0,1.0,111
12352,59,10,2855.0,4.0,5.0,4.0,454
12353,227,2,409.0,2.0,2.0,1.0,221
12354,255,1,1086.0,2.0,1.0,3.0,213
12355,237,2,950.0,2.0,2.0,3.0,223


#### Create segments based on RFM scores; use regular expressions to replace RFM scores with corresponding segments

In [0]:
rfm_df = rfm_df.withColumn("segment_tmp", concat(col("recency_score").cast("int"), col("frequency_score").cast("int")))

In [0]:
rfm_seg_df = rfm_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment_tmp"), r'[0-2][0-2]', "Hibernating").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'[0-2][3-4]', "At Risk").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'[0-2]5', "Can\'t Lose").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'3[0-2]', "About to Sleep").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'33', "Need Attention").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'[3-4][4-5]', "Loyal Customers").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'4[0-1]', "Promising").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'5[0-1]', "New Customers").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'[4-5][2-3]', "Potential Loyalists").alias("segment"))
rfm_seg_df = rfm_seg_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", 
                                                      regexp_replace(col("segment"), r'5[4-5]', "Champions").alias("segment"))
display(rfm_seg_df.orderBy("customer_id"))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score,segment
12346,348,12,77551.0,1.0,5.0,5.0,155,Can't Lose
12347,25,8,5641.0,5.0,4.0,5.0,545,Champions
12348,98,5,2016.0,3.0,4.0,4.0,344,Loyal Customers
12349,41,4,4454.0,4.0,3.0,5.0,435,Potential Loyalists
12350,333,1,334.0,1.0,1.0,1.0,111,Hibernating
12351,398,1,304.0,1.0,1.0,1.0,111,Hibernating
12352,59,10,2855.0,4.0,5.0,4.0,454,Loyal Customers
12353,227,2,409.0,2.0,2.0,1.0,221,Hibernating
12354,255,1,1086.0,2.0,1.0,3.0,213,Hibernating
12355,237,2,950.0,2.0,2.0,3.0,223,Hibernating


#### Get the RFM mean and count for each segment

In [0]:
seg_df = rfm_seg_df.groupBy("segment").agg(mean("recency").cast("int").alias("recency_mean"), count("recency").alias("recency_count"),
                                            mean("frequency").cast("int").alias("frequency_mean"), count("frequency").alias("frequency_count"),
                                            mean("monetary").cast("int").alias("monetary_mean"), count("monetary").alias("monetary_count")) 
display(seg_df.orderBy("segment"))

segment,recency_mean,recency_count,frequency_mean,frequency_count,monetary_mean,monetary_count
About to Sleep,87,334,1,334,544,334
At Risk,314,948,4,948,1613,948
Can't Lose,239,148,15,148,7077,148
Champions,29,689,21,689,12299,689
Hibernating,442,1853,1,1853,457,1853
Loyal Customers,65,955,11,955,4877,955
Need Attention,85,228,3,228,1264,228
New Customers,31,56,1,56,383,56
Potential Loyalists,41,557,2,557,1298,557
Promising,50,110,1,110,312,110


In [0]:
seg_complex_df = seg_df.select("segment", struct("recency_mean", "recency_count").alias("recency"),
                                          struct("frequency_mean", "frequency_count").alias("frequency"),
                                          struct("monetary_mean", "monetary_count").alias("monetary"))
                     
display(seg_complex_df.orderBy("segment"))

segment,recency,frequency,monetary
About to Sleep,"List(87, 334)","List(1, 334)","List(544, 334)"
At Risk,"List(314, 948)","List(4, 948)","List(1613, 948)"
Can't Lose,"List(239, 148)","List(15, 148)","List(7077, 148)"
Champions,"List(29, 689)","List(21, 689)","List(12299, 689)"
Hibernating,"List(442, 1853)","List(1, 1853)","List(457, 1853)"
Loyal Customers,"List(65, 955)","List(11, 955)","List(4877, 955)"
Need Attention,"List(85, 228)","List(3, 228)","List(1264, 228)"
New Customers,"List(31, 56)","List(1, 56)","List(383, 56)"
Potential Loyalists,"List(41, 557)","List(2, 557)","List(1298, 557)"
Promising,"List(50, 110)","List(1, 110)","List(312, 110)"
