### Load Data From Table

In [0]:
retail_df = spark.read.table("retail_csv")
retail_df.printSchema()

In [0]:
print("Partitions: " + str(retail_df.rdd.getNumPartitions()))

In [0]:
display(retail_df.select("*").limit(10))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
570668,21181,PLEASE ONE PERSON METAL SIGN,5,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom
570668,21166,COOK WITH WINE METAL SIGN,3,2011-10-11T14:34:00.000+0000,2.08,17837.0,United Kingdom
570668,21903,MAN FLU METAL SIGN,2,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom
570668,82494L,WOODEN FRAME ANTIQUE WHITE,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,82482,WOODEN PICTURE FRAME WHITE FINISH,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,22659,LUNCH BOX I LOVE LONDON,4,2011-10-11T14:34:00.000+0000,1.95,17837.0,United Kingdom
570668,22796,PHOTO FRAME 3 CLASSIC HANGING,1,2011-10-11T14:34:00.000+0000,9.95,17837.0,United Kingdom
570668,84706F,RED PEONY TABLE CLOCK,2,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,20749,ASSORTED COLOUR MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom
570668,20750,RED RETROSPOT MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom


In [0]:
%sql
SELECT * FROM retail_csv limit 10

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
570668,21181,PLEASE ONE PERSON METAL SIGN,5,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom
570668,21166,COOK WITH WINE METAL SIGN,3,2011-10-11T14:34:00.000+0000,2.08,17837.0,United Kingdom
570668,21903,MAN FLU METAL SIGN,2,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom
570668,82494L,WOODEN FRAME ANTIQUE WHITE,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,82482,WOODEN PICTURE FRAME WHITE FINISH,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,22659,LUNCH BOX I LOVE LONDON,4,2011-10-11T14:34:00.000+0000,1.95,17837.0,United Kingdom
570668,22796,PHOTO FRAME 3 CLASSIC HANGING,1,2011-10-11T14:34:00.000+0000,9.95,17837.0,United Kingdom
570668,84706F,RED PEONY TABLE CLOCK,2,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom
570668,20749,ASSORTED COLOUR MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom
570668,20750,RED RETROSPOT MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom


### Total Invoice Amount Distribution

remove negetive amounts rows

In [0]:
from pyspark.sql.functions import *
invoice_amount_df = retail_df.withColumn("amount", expr("quantity * unit_price"))
invoice_amount_df = invoice_amount_df.select("invoice_no", "amount").where("amount > 0")

Get Total amounts

In [0]:
total_amounts_df = invoice_amount_df.groupBy("invoice_no").sum("amount")
total_amounts_df = total_amounts_df.withColumnRenamed("sum(amount)", "total_amount")

In [0]:
display(total_amounts_df.orderBy("invoice_no").limit(10))

invoice_no,total_amount
489434,505.30000000000007
489435,145.79999999999998
489436,630.33
489437,310.75
489438,2286.24
489439,426.3
489440,50.4
489441,344.34000000000003
489442,382.37000000000006
489443,285.06


### Distribution

In [0]:
display(total_amounts_df.describe("total_amount"))

summary,total_amount
count,40078.0
mean,523.3037611158245
stddev,1517.351645669805
min,0.19
max,168469.6


### Distribution without outliers
remove outliers

In [0]:
quantile = total_amounts_df.approxQuantile("total_amount", [0.85], 0)
remove_outliers = quantile[0]
print(remove_outliers)

In [0]:
no_outliers_df = total_amounts_df.filter(col("total_amount") < remove_outliers)
display(no_outliers_df.describe("total_amount"))

summary,total_amount
count,34066.0
mean,271.68389129924265
stddev,174.9935861934174
min,0.19
max,724.2499999999999


In [0]:
display(no_outliers_df.orderBy("invoice_no").limit(10))

invoice_no,total_amount
489434,505.30000000000007
489435,145.79999999999998
489436,630.33
489437,310.75
489439,426.3
489440,50.4
489441,344.34000000000003
489442,382.37000000000006
489443,285.06
489444,141.0


### Monthly Placed and Cancelled Orders

Create year_month column for YYYYMM

In [0]:
from pyspark.sql.functions import date_format
orders_df = retail_df.select("*", (year("invoice_date") * 100 + month("invoice_date")).alias("YYYYMM"))
orders_df = orders_df.select("YYYYMM", "invoice_no")
orders_df = orders_df.dropDuplicates(["invoice_no"])
display(orders_df.orderBy("YYYYMM").limit(10))

YYYYMM,invoice_no
200912,492051
200912,C489549
200912,491045
200912,489677
200912,C491705
200912,C492541
200912,C493168
200912,491658
200912,C491017
200912,489582


Create a cancelled order column - order starting with "C"

In [0]:
cancelled_df = orders_df.withColumn("cancellation", expr("invoice_no LIKE 'C%'"))
cancelled_df.show()

Count numbers of placed orders for each month

In [0]:
placed_count_df = cancelled_df.filter("cancellation = false")
placed_count_df = placed_count_df.select("YYYYMM", "cancellation").groupBy("YYYYMM").count().orderBy("YYYYMM")
placed_count_df = placed_count_df.withColumnRenamed("count", "placed")
placed_count_df.show()

In [0]:
cancelled_count_df = cancelled_df.filter("cancellation = true")
cancelled_count_df = cancelled_count_df.select("YYYYMM", "cancellation").groupBy("YYYYMM").count().orderBy("YYYYMM")
cancelled_count_df = cancelled_count_df.withColumnRenamed("count", "cancelled")
cancelled_count_df.show()

In [0]:
merge_count_df = placed_count_df.join(cancelled_count_df, "YYYYMM", "left_outer")
display(merge_count_df.orderBy("YYYYMM"))

YYYYMM,placed,cancelled
200912,1929,401
201001,1333,300
201002,1729,240
201003,1960,407
201004,1588,304
201005,2011,407
201006,1859,357
201007,1673,344
201008,1604,273
201009,2004,371


In [0]:
display(merge_count_df.orderBy("YYYYMM"))

YYYYMM,placed,cancelled
200912,1929,401
201001,1333,300
201002,1729,240
201003,1960,407
201004,1588,304
201005,2011,407
201006,1859,357
201007,1673,344
201008,1604,273
201009,2004,371


### Monthly Sales

In [0]:
retail_df = retail_df.withColumn("amount", expr("quantity * unit_price"))

In [0]:
retail_df = retail_df.withColumn("year_month", (year("invoice_date") * 100 + month("invoice_date")))

In [0]:
monthly_sales_df = retail_df.groupBy("year_month").sum("amount")
monthly_sales_df.show()

In [0]:
display(monthly_sales_df.orderBy("year_month"))

year_month,sum(amount)
200912,799847.1100000143
201001,624032.8919999955
201002,533091.4260000042
201003,765848.7610000083
201004,590580.4319999823
201005,615322.8300000005
201006,679786.6099999842
201007,575236.359999999
201008,656776.3399999854
201009,853650.4309999745


### Monthly Sales Growth

In [0]:
monthly_sales_df.head()
monthly_growth_df = monthly_sales_df.orderBy("year_month")
monthly_growth_df = monthly_growth_df.withColumnRenamed("sum(amount)", "amount")
monthly_growth_df.show()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
pre_win = Window.partitionBy().orderBy("year_month")
monthly_growth_df = monthly_growth_df.withColumn("prev_val", F.lag(monthly_growth_df.amount).over(pre_win)) 
monthly_growth_df.show()

In [0]:
monthly_growth_df = monthly_growth_df.withColumn("growth", (monthly_growth_df.amount-monthly_growth_df.prev_val) / monthly_growth_df.prev_val * 100 )
monthly_growth_df.show()

In [0]:
display(monthly_growth_df)

year_month,amount,prev_val,growth
200912,799847.1100000143,,
201001,624032.8919999955,799847.1100000143,-21.980978089677127
201002,533091.4260000042,624032.8919999955,-14.573184709627766
201003,765848.7610000083,533091.4260000042,43.66180426994944
201004,590580.4319999823,765848.7610000083,-22.88550141037887
201005,615322.8300000005,590580.4319999823,4.18950521544184
201006,679786.6099999842,615322.8300000005,10.476416095268814
201007,575236.359999999,679786.6099999842,-15.379863101449958
201008,656776.3399999854,575236.359999999,14.17503928298039
201009,853650.4309999745,656776.3399999854,29.975819622246664


### Monthly Active Users

In [0]:
monthly_active_users_df = retail_df.select("year_month", "customer_id")
monthly_active_users_df = monthly_active_users_df.groupBy("year_month").agg(countDistinct("customer_id").alias("active_users"))
display(monthly_active_users_df.orderBy("year_month"))

year_month,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


In [0]:
display(monthly_active_users_df.orderBy("year_month"))

year_month,active_users
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


### New and Existing Users

In [0]:
users_df = retail_df.select("*", (year("invoice_date") * 100 + month("invoice_date")).alias("YYYYMM"))
users_df = users_df.select("YYYYMM", "customer_id")
new_users_df = users_df.dropDuplicates(["customer_id"])
display(new_users_df.orderBy("YYYYMM"))

YYYYMM,customer_id
200912,13672.0
200912,16329.0
200912,16104.0
200912,17172.0
200912,16451.0
200912,17268.0
200912,14627.0
200912,17978.0
200912,12443.0
200912,13042.0


In [0]:
existing_users = users_df.subtract(new_users_df)

In [0]:
monthly_new_user_df = new_users_df.groupBy("YYYYMM").count()
monthly_new_user_df = monthly_new_user_df.withColumnRenamed("count", "new_users")
display(monthly_new_user_df.orderBy("YYYYMM"))

YYYYMM,new_users
200912,492
201001,277
201002,254
201003,296
201004,206
201005,189
201006,196
201007,125
201008,111
201009,172


In [0]:
monthly_existing_user = existing_users.groupBy("YYYYMM").count()
monthly_existing_user = monthly_existing_user.withColumnRenamed("count", "existing_users")
display(monthly_existing_user.orderBy("YYYYMM"))

YYYYMM,existing_users
200912,554
201001,510
201002,554
201003,816
201004,793
201005,874
201006,900
201007,864
201008,854
201009,1031


In [0]:
monthly_users_df = monthly_new_user_df.join(monthly_existing_user, "YYYYMM", "left_outer")
display(monthly_users_df.orderBy("YYYYMM"))

YYYYMM,new_users,existing_users
200912,492,554
201001,277,510
201002,254,554
201003,296,816
201004,206,793
201005,189,874
201006,196,900
201007,125,864
201008,111,854
201009,172,1031


### RFM Segmentation

In [0]:
display(retail_df.limit(10))


invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,amount,year_month
570668,21181,PLEASE ONE PERSON METAL SIGN,5,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom,10.5,201110
570668,21166,COOK WITH WINE METAL SIGN,3,2011-10-11T14:34:00.000+0000,2.08,17837.0,United Kingdom,6.24,201110
570668,21903,MAN FLU METAL SIGN,2,2011-10-11T14:34:00.000+0000,2.1,17837.0,United Kingdom,4.2,201110
570668,82494L,WOODEN FRAME ANTIQUE WHITE,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom,17.700000000000003,201110
570668,82482,WOODEN PICTURE FRAME WHITE FINISH,6,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom,17.700000000000003,201110
570668,22659,LUNCH BOX I LOVE LONDON,4,2011-10-11T14:34:00.000+0000,1.95,17837.0,United Kingdom,7.8,201110
570668,22796,PHOTO FRAME 3 CLASSIC HANGING,1,2011-10-11T14:34:00.000+0000,9.95,17837.0,United Kingdom,9.95,201110
570668,84706F,RED PEONY TABLE CLOCK,2,2011-10-11T14:34:00.000+0000,2.95,17837.0,United Kingdom,5.9,201110
570668,20749,ASSORTED COLOUR MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom,15.9,201110
570668,20750,RED RETROSPOT MINI CASES,2,2011-10-11T14:34:00.000+0000,7.95,17837.0,United Kingdom,15.9,201110


In [0]:
tmp_retail_df = retail_df.select("invoice_no", "quantity", "unit_price", "customer_id", to_date(retail_df.invoice_date).alias("invoice_date"), to_date(lit("2012-01-01")).alias("today"), expr("ROUND(unit_price * quantity) AS total_price"))
display(tmp_retail_df.limit(10))

invoice_no,quantity,unit_price,customer_id,invoice_date,today,total_price
570668,5,2.1,17837.0,2011-10-11,2012-01-01,11.0
570668,3,2.08,17837.0,2011-10-11,2012-01-01,6.0
570668,2,2.1,17837.0,2011-10-11,2012-01-01,4.0
570668,6,2.95,17837.0,2011-10-11,2012-01-01,18.0
570668,6,2.95,17837.0,2011-10-11,2012-01-01,18.0
570668,4,1.95,17837.0,2011-10-11,2012-01-01,8.0
570668,1,9.95,17837.0,2011-10-11,2012-01-01,10.0
570668,2,2.95,17837.0,2011-10-11,2012-01-01,6.0
570668,2,7.95,17837.0,2011-10-11,2012-01-01,16.0
570668,2,7.95,17837.0,2011-10-11,2012-01-01,16.0


Remove negative values and missing values

In [0]:
tmp_retail_df = tmp_retail_df.filter(tmp_retail_df.quantity > 0).filter(tmp_retail_df.unit_price > 0).filter(tmp_retail_df.total_price > 0)
tmp_retail_df = tmp_retail_df.na.drop()
tmp_retail_df.count()

In [0]:
recency_df = tmp_retail_df.groupBy("customer_id").agg(max("invoice_date").alias("max_date"), max("today").alias("today"))
recency_df = recency_df.select("customer_id", datediff(col("today"), col("max_date")).alias("recency"))
recency_df.head()

#### RECENCY

In [0]:
recency_df.show()

#### Frequency

In [0]:
frequency_df = tmp_retail_df.groupBy("customer_id", "invoice_no").count()
frequency_df = frequency_df.groupBy("customer_id").count()
display(frequency_df.orderBy("customer_id"))

customer_id,count
12346.0,12
12347.0,8
12348.0,5
12349.0,4
12350.0,1
12351.0,1
12352.0,10
12353.0,2
12354.0,1
12355.0,2


In [0]:
monetary_df = retail_df.select("customer_id", "amount").groupBy("customer_id").agg({'amount':'sum'})
monetary_df = monetary_df.filter(monetary_df.customer_id.isNotNull())
monetary_df = monetary_df.withColumnRenamed("sum(amount)", "monetary")
display(monetary_df.orderBy("customer_id"))

customer_id,monetary
12346.0,-64.68
12347.0,5633.319999999999
12348.0,2019.4
12349.0,4404.54
12350.0,334.40000000000003
12351.0,300.93
12352.0,1889.21
12353.0,406.75999999999993
12354.0,1079.4
12355.0,947.61


### Create RFM table

In [0]:
rfm_tmp_df = recency_df.join(frequency_df, "customer_id", "inner")
rfm_df = rfm_tmp_df.join(monetary_df, "customer_id", "inner")
rfm_df = rfm_df.withColumnRenamed("count", "frequency")
display(rfm_df.orderBy("customer_id").limit(10))

customer_id,recency,frequency,monetary
12346.0,348,12,-64.68
12347.0,25,8,5633.319999999999
12348.0,98,5,2019.4
12349.0,41,4,4404.54
12350.0,333,1,334.40000000000003
12351.0,398,1,300.93
12352.0,59,10,1889.21
12353.0,227,2,406.75999999999993
12354.0,255,1,1079.4
12355.0,237,2,947.61


### RFM Segmentation

In [0]:
from pyspark.ml.feature import QuantileDiscretizer
qd = QuantileDiscretizer(numBuckets = 5, inputCol="recency", outputCol="recency_score")
rfm_df = qd.fit(rfm_df).transform(rfm_df)

qd = QuantileDiscretizer(numBuckets = 5, inputCol="monetary", outputCol="monetary_score")
rfm_df = qd.fit(rfm_df).transform(rfm_df)

qd = QuantileDiscretizer(numBuckets = 5, inputCol="frequency", outputCol="frequency_score")
rfm_df = qd.fit(rfm_df).transform(rfm_df)

display(rfm_df.orderBy("customer_id").limit(10))

customer_id,recency,frequency,monetary,recency_score,monetary_score,frequency_score
12346.0,348,12,-64.68,3.0,0.0,4.0
12347.0,25,8,5633.319999999999,0.0,4.0,4.0
12348.0,98,5,2019.4,2.0,3.0,3.0
12349.0,41,4,4404.54,0.0,4.0,3.0
12350.0,333,1,334.40000000000003,3.0,1.0,1.0
12351.0,398,1,300.93,3.0,1.0,1.0
12352.0,59,10,1889.21,1.0,3.0,4.0
12353.0,227,2,406.75999999999993,3.0,1.0,2.0
12354.0,255,1,1079.4,3.0,2.0,1.0
12355.0,237,2,947.61,3.0,2.0,2.0


In [0]:
rfm_df = rfm_df.select("customer_id", "recency", "frequency", "monetary", abs(rfm_df.recency_score-5).alias("recency_score"), (rfm_df.frequency_score + 1).alias("frequency_score"), (rfm_df.monetary_score + 1).alias("monetary_score"))
display(rfm_df.orderBy("customer_id").limit(10))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score
12346.0,348,12,-64.68,2.0,5.0,1.0
12347.0,25,8,5633.319999999999,5.0,5.0,5.0
12348.0,98,5,2019.4,3.0,4.0,4.0
12349.0,41,4,4404.54,5.0,4.0,5.0
12350.0,333,1,334.40000000000003,2.0,2.0,2.0
12351.0,398,1,300.93,2.0,2.0,2.0
12352.0,59,10,1889.21,4.0,5.0,4.0
12353.0,227,2,406.75999999999993,2.0,3.0,2.0
12354.0,255,1,1079.4,2.0,2.0,3.0
12355.0,237,2,947.61,2.0,3.0,3.0


### RFM Scores

In [0]:
rfm_df = rfm_df.withColumn("rfm_score", concat(col("recency_score").cast("int"), col("frequency_score").cast("int"), col("monetary_score").cast("int")))
display(rfm_df.orderBy("customer_id").limit(10))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score
12346.0,348,12,-64.68,2.0,5.0,1.0,251
12347.0,25,8,5633.319999999999,5.0,5.0,5.0,555
12348.0,98,5,2019.4,3.0,4.0,4.0,344
12349.0,41,4,4404.54,5.0,4.0,5.0,545
12350.0,333,1,334.40000000000003,2.0,2.0,2.0,222
12351.0,398,1,300.93,2.0,2.0,2.0,222
12352.0,59,10,1889.21,4.0,5.0,4.0,454
12353.0,227,2,406.75999999999993,2.0,3.0,2.0,232
12354.0,255,1,1079.4,2.0,2.0,3.0,223
12355.0,237,2,947.61,2.0,3.0,3.0,233


In [0]:
rfm_df = rfm_df.drop(col("segment"))
rfm_df.show()

In [0]:
rfm_df = rfm_df.withColumn("segment", concat(col("recency_score").cast("int"), col("frequency_score").cast("int"), col("monetary_score").cast("int")))

In [0]:
seg_map_df = rfm_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'[1-2][1-2][1-5]', "Hibernating").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'[1-2][3-4][1-5]', "At Risk").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'[1-2]5[1-5]', "Can\'t Lose").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'3[1-2][1-5]', "About to Sleep").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'33[1-5]', "Need Attention").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'[3-4][4-5][1-5]', "Loyal Customers").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'41[1-5]', "Promising").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'51[1-5]', "New Customers").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'[4-5][2-3][1-5]', "Potential Loyalists").alias("segment"))

seg_map_df = seg_map_df.select("customer_id", "recency", "frequency", "monetary", "recency_score", "frequency_score", "monetary_score", "rfm_score", regexp_replace(col("segment"), r'5[4-5][1-5]', "Champions").alias("segment"))

display(seg_map_df.orderBy("customer_id").limit(10))

customer_id,recency,frequency,monetary,recency_score,frequency_score,monetary_score,rfm_score,segment
12346.0,348,12,-64.68,2.0,5.0,1.0,251,Can't Lose
12347.0,25,8,5633.319999999999,5.0,5.0,5.0,555,Champions
12348.0,98,5,2019.4,3.0,4.0,4.0,344,Loyal Customers
12349.0,41,4,4404.54,5.0,4.0,5.0,545,Champions
12350.0,333,1,334.40000000000003,2.0,2.0,2.0,222,Hibernating
12351.0,398,1,300.93,2.0,2.0,2.0,222,Hibernating
12352.0,59,10,1889.21,4.0,5.0,4.0,454,Loyal Customers
12353.0,227,2,406.75999999999993,2.0,3.0,2.0,232,At Risk
12354.0,255,1,1079.4,2.0,2.0,3.0,223,Hibernating
12355.0,237,2,947.61,2.0,3.0,3.0,233,At Risk


### RFM mean for each segment

In [0]:
seg_mean_df = seg_map_df.groupBy("segment").agg(mean("recency").cast("int").alias("recency_mean"), count("recency").alias("recency_count"), 
                                                mean("frequency").cast("int").alias("frequency_mean"), count("frequency").alias("frequency_count"),
                                                mean("monetary").cast("int").alias("monetary_mean"), count("monetary").alias("monetary_count"))
display(seg_mean_df.orderBy("segment"))

segment,recency_mean,recency_count,frequency_mean,frequency_count,monetary_mean,monetary_count
About to Sleep,127,246,1,246,389,246
At Risk,406,1122,3,1122,1018,1122
Can't Lose,359,90,14,90,5768,90
Champions,30,882,18,882,10046,882
Hibernating,499,1140,1,1140,305,1140
Loyal Customers,90,1309,9,1309,3766,1309
Need Attention,135,326,2,326,890,326
Potential Loyalists,49,763,2,763,684,763


In [0]:
seg_comp_df = seg_mean_df.select("segment", struct("recency_mean", "recency_count").alias("recency"),
                                            struct("frequency_mean", "frequency_count").alias("frequency"),
                                            struct("monetary_mean", "monetary_count").alias("monetary"))
display(seg_comp_df.orderBy("segment"))

segment,recency,frequency,monetary
About to Sleep,"List(127, 246)","List(1, 246)","List(389, 246)"
At Risk,"List(406, 1122)","List(3, 1122)","List(1018, 1122)"
Can't Lose,"List(359, 90)","List(14, 90)","List(5768, 90)"
Champions,"List(30, 882)","List(18, 882)","List(10046, 882)"
Hibernating,"List(499, 1140)","List(1, 1140)","List(305, 1140)"
Loyal Customers,"List(90, 1309)","List(9, 1309)","List(3766, 1309)"
Need Attention,"List(135, 326)","List(2, 326)","List(890, 326)"
Potential Loyalists,"List(49, 763)","List(2, 763)","List(684, 763)"
