# Retail Data Analytics Using Databricks
[In a previous project](https://github.com/jarviscanada/jarvis_data_eng_KevinShimotakahara/tree/master/python_data_wrangling), we performed analytics on two years' worth of historical transaction data belonging to the e-commerce store London Gift Shop. That project aimed to inform business decisions with data by producing statistics on the company's revenue and customer activity, including a formal Recency-Frequency-Magnitude (RFM) breakdown of London Gift Shop's clientele, which can be leveraged to design targeted marketing campaigns.

In this notebook, we perform the same [analysis previously done using Python/Pandas in a Jupyter notebook](https://github.com/jarviscanada/jarvis_data_eng_KevinShimotakahara/blob/master/python_data_wrangling/retail_data_analytics_wrangling.ipynb), but this time we use PySpark, the Python API for Apache Spark, a computational engine designed for high-performance big data analytics. This notebook was developed using the Microsoft Azure Databricks interface.

## Getting Started: Importing CSV Data
First, we must upload the .csv file containing the data we will be working with, and then construct a PySpark DataFrame from it so we can begin running analytics methods. The .csv file was uploaded manually through the Databricks UI. There are two options for constructing the PySpark DataFrame: use the Databricks UI to generate a table schema manually (which is stored in a Hive metastore and can then be accessed by all Databricks notebooks in your workspace), or read the .csv data programmatically with PySpark to generate a DataFrame for it. We take the first approach in the following paragraphs.

### Switch to the database the manually-generated table schema was stored in:

In [0]:
%sql
USE kshim017_uottawa_ca_db

### We named the table "retail_csv" when creating it with the Databricks UI; confirm it exists:

In [0]:
%sql
SHOW CREATE TABLE retail_csv

createtab_stmt
"CREATE TABLE `retail_csv` (`invoice_no` INT, `stock_code` STRING, `description` STRING, `quantity` INT, `invoice_date` TIMESTAMP, `unit_price` FLOAT, `customer_id` INT, `country` STRING) USING com.databricks.spark.csv OPTIONS (  `multiLine` 'false',  `escape` '""',  `header` 'false',  `delimiter` ',',  path 'dbfs:/FileStore/tables/retail-1.csv' )"


### Assign the contents of retail_csv to a PySpark DataFrame:

In [0]:
retail_csv = spark.sql("select * from retail_csv")
retail_csv.printSchema()

### Confirm successful import:

In [0]:
retail_csv.show(5)

## The Data at a Glance
First, we would like to inspect the properties of the purchases made by London Gift Shop's customers, e.g. how much customers spend on average per visit. However, as can be seen in the output of the previous paragraph, a single invoice number (indicated by the `invoice_no` field) has multiple records, one for each different item purchased. Moreover, although not shown here, some records have negative `quantity` or `unit_price` fields, which are used to undo orders that have since been cancelled. Finally, we are interested in the grand total of each invoice in the table, but the table doesn't even have a subtotal field that multiplies `unit_price` by `quantity`.

What all of this means is that we need to filter out the records with negative values, create a new column that multiplies `unit_price` by `quantity`,
and aggregate this new column, grouping by the `invoice_no` field. 

This is done in the following paragraph:

In [0]:
from pyspark.sql.functions import *
stat_summary_df = (retail_csv.filter((col("unit_price") > 0) & (col("quantity") > 0))
                   .withColumn("total_cost",col("unit_price")*col("quantity"))
                   .groupBy("invoice_no")
                   .sum()
                   .select("invoice_no","sum(total_cost)")
                   .orderBy("sum(total_cost)")
                   .cache()
                  )
display(stat_summary_df)

invoice_no,sum(total_cost)
528127,0.1899999976158142
570554,0.3799999952316284
567869,0.3999999761581421
529767,0.4199999868869781
502731,0.4199999868869781
539441,0.4199999868869781
507293,0.4199999868869781
518991,0.4199999868869781
532608,0.5
573589,0.550000011920929
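
As a sanity check on the logic above, the same filter, multiply, and aggregate steps can be sketched in plain Python on a handful of rows. The rows and the `invoice_totals` helper are hypothetical and are not part of the notebook; the sketch only mirrors what the PySpark chain is meant to do.

```python
from collections import defaultdict

# Hypothetical line items: (invoice_no, quantity, unit_price)
rows = [
    (1001, 2, 1.50),
    (1001, 1, 4.00),
    (1002, 3, 2.00),
    (1002, -3, 2.00),  # negative record that undoes a cancelled order
    (1003, 5, 0.0),    # zero price, also excluded by the filter
]

def invoice_totals(rows):
    """Sum quantity * unit_price per invoice, keeping only strictly positive rows."""
    totals = defaultdict(float)
    for invoice_no, quantity, unit_price in rows:
        if quantity > 0 and unit_price > 0:
            totals[invoice_no] += quantity * unit_price
    # Sort by total in ascending order, like orderBy("sum(total_cost)")
    return sorted(totals.items(), key=lambda kv: kv[1])

totals = invoice_totals(rows)
print(totals)
```

Note that filtering out the negative record does not remove the original positive record of a cancelled order, so invoice 1002 still contributes its full amount here.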


## Calculating Summary Statistics of Purchase Amounts
The following paragraphs use the DataFrame just created to find the mean, max, min, median, and mode of the grand totals of each invoice in the data:

In [0]:
#Mean, max, min
mean_max_min = (stat_summary_df.select(mean(col("sum(total_cost)")),
                        max(col("sum(total_cost)")),
                        min(col("sum(total_cost)"))))
display(mean_max_min)

avg(sum(total_cost)),max(sum(total_cost)),min(sum(total_cost))
523.0445263888447,168469.59375,0.1899999976158142


### Two Ways to Compute Median
The first way is to use the `approxQuantile` method, which returns the approximate value that corresponds to the indicated quantile of the data set. Since the median is the 0.5 quantile (the 50th percentile), we can calculate it with this method. Note that when the relative error argument is set to 0, as it is in the next paragraph, `approxQuantile` computes the exact quantile at a higher computational cost, but it still returns a single element of the data set rather than the average of the two middle values. The other way (shown in the paragraph after next) computes the median by hand: sort the DataFrame, keep the lower half plus one record, reverse the order, and take the first record (or the first two, to be averaged, if the data set has an even number of records).

In [0]:
#approximate median value
stat_summary_df.approxQuantile("sum(total_cost)", [0.5], 0)[0]

In [0]:
#precise median value
from pyspark.sql.functions import *
#stat_summary_df.count()
numRows = 40076
firstIndex = 20037
secondIndex = 20038

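#keep the lower half of the sorted records plus the upper-middle record
#(the indices above are 0-based, so secondIndex + 1 rows are needed to include row secondIndex)
first_half_plus_one = stat_summary_df.orderBy("sum(total_cost)").limit(secondIndex + 1)

#invert the order and show the first two values; with an even row count, the median is their average
first_half_plus_one.orderBy(desc("sum(total_cost)")).show(2)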
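
The exact-median procedure can be checked on a small list in plain Python. This is a minimal sketch, assuming 0-based positions: for `n` sorted values the upper-middle value sits at position `n // 2`, so the first `n // 2 + 1` values must be kept before reading from the top. The `exact_median` helper and its inputs are hypothetical.

```python
def exact_median(values):
    """Exact median: sort, keep the lower half plus the upper-middle value, read from the top."""
    ordered = sorted(values)
    n = len(ordered)
    # Keep everything up to and including 0-based position n // 2
    head = ordered[: n // 2 + 1]
    if n % 2 == 1:
        # Odd count: the last kept value is the single middle value
        return head[-1]
    # Even count: average the two middle values, which are the last two kept
    return (head[-1] + head[-2]) / 2

odd = exact_median([5, 1, 3])
even = exact_median([4, 1, 3, 2])
print(odd, even)
```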

### Calculate mode by counting the number of occurrences of each invoice amount, and seeing which count is largest

In [0]:
#mode... is a max count
counts = stat_summary_df.groupBy("sum(total_cost)").count()
record_with_mode = counts.orderBy(desc("count")).show(1)
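
The count-then-take-the-largest idea is the same one behind `collections.Counter` in plain Python. The sketch below uses hypothetical invoice totals and is only meant to illustrate the technique, not to reproduce the notebook's result.

```python
from collections import Counter

# Hypothetical invoice totals; 15.0 occurs most often
invoice_totals = [15.0, 20.5, 15.0, 7.25, 20.5, 15.0]

# Count occurrences of each amount and keep the most frequent one
mode_value, mode_count = Counter(invoice_totals).most_common(1)[0]
print(mode_value, mode_count)
```

Because the amounts are floating-point values, two totals only count as the same occurrence if they are exactly equal, which is also true of grouping by a float column.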

## Repeating for Transactions Falling Below the 85th Percentile
In the previous project, we observed that some outliers (very large purchases) distorted the summary statistics to the extent that they were not representative of typical customer behaviour. To remedy this, we repeat the analysis we just did, but this time we filter out the invoices that make up the top 15% of purchases by amount. This is done with the help of the `approxQuantile` method previously described.

In [0]:
#repeat for lower 85th quantile
cutoff = stat_summary_df.approxQuantile("sum(total_cost)", [0.85], 0)[0]
stat_summary_df_85 = stat_summary_df.filter(col("sum(total_cost)") < cutoff)

#Mean, max, min
mean_max_min_85 = (stat_summary_df_85.select(mean(col("sum(total_cost)")),
                        max(col("sum(total_cost)")),
                        min(col("sum(total_cost)"))))
display(mean_max_min_85)

avg(sum(total_cost)),max(sum(total_cost)),min(sum(total_cost))
271.6676137544557,724.1299993991852,0.1899999976158142


In [0]:
#approximate median value
stat_summary_df_85.approxQuantile("sum(total_cost)", [0.5], 0)[0]

In [0]:
#mode... is a max count
counts = stat_summary_df_85.groupBy("sum(total_cost)").count()
counts.orderBy(desc("count")).show(1)
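
To illustrate why trimming the largest purchases matters, here is a small plain-Python sketch on hypothetical totals. The cutoff used is a crude stand-in (the value 85% of the way through the sorted list) and is not the algorithm `approxQuantile` uses.

```python
# Hypothetical invoice totals with one very large purchase
totals = [10.0, 12.0, 15.0, 18.0, 20.0, 22.0, 25.0, 30.0, 40.0, 5000.0]

def mean(values):
    return sum(values) / len(values)

ordered = sorted(totals)
# Crude stand-in for an 85th-percentile cutoff (an assumption for illustration)
cutoff = ordered[int(0.85 * len(ordered))]
# Keep only totals strictly below the cutoff, as the filter above does
trimmed = [t for t in totals if t < cutoff]

mean_all = mean(totals)
mean_trimmed = mean(trimmed)
print(cutoff, mean_all, mean_trimmed)
```

A single outlier pulls the overall mean far above what a typical invoice looks like, while the trimmed mean stays close to the bulk of the data.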

## A Blunder, but Also an Opportunity for Learning
It turned out that the manually configured table schema we have been working with was not suited to the upcoming analysis. The `invoice_no` field was typed as an integer when it should have been a string, since the invoice numbers of the negative cancelled-order records are prefixed with the letter "C".

For the sake of learning, the table was re-created, this time using the other (programmatic) approach mentioned previously.

In [0]:
retail_csv.printSchema()

### Manually write schema object, then use spark to read the .csv file and produce the table again

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
#Accidentally set up invoice_no as integer when it should have been string; need to re-import csv
csvSchema = StructType([
  StructField("invoice_no", StringType(), False),
  StructField("stock_code", StringType(), False),
  StructField("description", StringType(), False),
  StructField("quantity", IntegerType(), False),
  StructField("invoice_date", TimestampType(), False),
  StructField("unit_price", DoubleType(), False),
  StructField("customer_id", StringType(), False),
  StructField("country", StringType(), False)
])

file_location = "/FileStore/tables/retail.csv"

# CSV options
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read \
  .schema(csvSchema) \
  .option("sep", delimiter) \
  .csv(file_location)

#now, cancelled invoices should be present in data
df.filter(col("invoice_no").like("C%")).show(5)
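
The reason the integer type loses information can be shown in plain Python. In this sketch the invoice numbers are hypothetical, and the `parse_invoice_as_int` helper assumes that a value which cannot be parsed as an integer is simply lost (represented as `None`); how the original table actually handled such values is not shown in this notebook.

```python
def parse_invoice_as_int(raw):
    """Assumed behaviour of an integer-typed field: unparseable values become None."""
    try:
        return int(raw)
    except ValueError:
        return None

# Hypothetical invoice numbers; the "C" prefix marks a cancelled order
raw_invoice_numbers = ["489434", "C489449", "489435"]

as_int = [parse_invoice_as_int(v) for v in raw_invoice_numbers]
# Keeping the field as a string preserves the prefix, so cancellations can be found
cancelled = [v for v in raw_invoice_numbers if v.startswith("C")]
print(as_int, cancelled)
```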

## Looking at Monthly Time Series Data
In this section of the notebook, we look at the following month-by-month performance metrics of the business:
  - Monthly Placed and Cancelled Orders
  - Monthly Sales
  - Monthly Sales Growth
  - New and Existing Users

### Monthly Placed and Cancelled Orders
First, we add a new field to the table, namely the month and year of each record in a `yyyymm` format.

In [0]:
retail_csv = df.withColumn("yyyymm",date_format("invoice_date","yyyyMM").cast("integer"))
retail_csv.show(5)

### Cache result for faster queries in the future

In [0]:
retail_csv.cache()

## What is meant by "gross_orders"
Gross orders includes placed and cancelled orders.
For bookkeeping purposes, there are two records that correspond to a cancelled order:
  - The first is the regular order record initially placed
  - The second is an order with a "C" prefix on its invoice_no, and is a negative order to counterbalance the initial order

Thus, gross_orders is the sum of all placed orders and the negative counterbalance orders.

In [0]:
gross_orders = retail_csv.groupby("yyyymm").agg(expr('count(distinct invoice_no)').alias('gross_orders'))
gross_orders = gross_orders.orderBy("yyyymm")
display(gross_orders)

yyyymm,gross_orders
200912,2330
201001,1633
201002,1969
201003,2367
201004,1892
201005,2418
201006,2216
201007,2017
201008,1877
201009,2375


### To find cancelled orders per month, do the same thing as gross orders, but apply a filter first

In [0]:
cancelled_orders = retail_csv.filter(col("invoice_no").like("C%"))
cancelled_orders = cancelled_orders.groupby("yyyymm").agg(expr('count(distinct invoice_no)').alias('cancelled_orders'))
cancelled_orders = cancelled_orders.orderBy("yyyymm")
display(cancelled_orders)

yyyymm,cancelled_orders
200912,401
201001,300
201002,240
201003,407
201004,304
201005,407
201006,357
201007,344
201008,273
201009,371


### Calculate a new field that counts the number of orders that actually went through each month, display final results

In [0]:
monthly_orders = gross_orders.join(cancelled_orders, (cancelled_orders.yyyymm == gross_orders.yyyymm))
monthly_orders = monthly_orders.withColumn("completed_orders",monthly_orders.gross_orders - 2*monthly_orders.cancelled_orders)
display(monthly_orders.select(gross_orders.yyyymm,"gross_orders","cancelled_orders","completed_orders").orderBy(gross_orders.yyyymm))

yyyymm,gross_orders,cancelled_orders,completed_orders
200912,2330,401,1528
201001,1633,300,1033
201002,1969,240,1489
201003,2367,407,1553
201004,1892,304,1284
201005,2418,407,1604
201006,2216,357,1502
201007,2017,344,1329
201008,1877,273,1331
201009,2375,371,1633
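
The arithmetic behind `completed_orders` can be verified by hand on the first few rows of the table above. In this short sketch, the `completed_orders` helper is hypothetical; the figures are copied from the displayed output.

```python
# (yyyymm, gross_orders, cancelled_orders), copied from the first rows of the table above
monthly = [(200912, 2330, 401), (201001, 1633, 300), (201002, 1969, 240)]

def completed_orders(gross, cancelled):
    # Each cancellation appears twice in gross_orders:
    # once as the original order and once as its "C"-prefixed counterpart
    return gross - 2 * cancelled

completed = {month: completed_orders(g, c) for month, g, c in monthly}
print(completed)
```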


## Monthly Sales

In [0]:
retail_csv = retail_csv.withColumn("total_revenue",col("quantity") * col("unit_price"))
rev_df = retail_csv.select("yyyymm","total_revenue")
rev_df = rev_df.groupBy(rev_df.yyyymm).sum()
rev_df = rev_df.drop("sum(yyyymm)")
display(rev_df.orderBy("yyyymm"))

yyyymm,sum(total_revenue)
200912,799847.1100000143
201001,624032.8919999955
201002,533091.4260000042
201003,765848.7609999765
201004,590580.4319999823
201005,615322.8300000005
201006,679786.6099999842
201007,575236.3600000095
201008,656776.3399999854
201009,853650.4309999745


## Monthly Sales Growth
This metric is defined as the percentage change in total sales going from the previous month to the current month. The calculation is ultimately the current month's sales less the previous month's sales, divided by the previous month's sales.

In [0]:
from pyspark.sql.window import Window
my_window = Window.partitionBy().orderBy("yyyymm")

rev_df = rev_df.withColumn("prev_value", lag(col("sum(total_revenue)")).over(my_window))
rev_df = rev_df.withColumn("diff", when(isnull(col("sum(total_revenue)") - rev_df.prev_value), 0)
                              .otherwise(col("sum(total_revenue)") - rev_df.prev_value))

rev_df = rev_df.withColumn("percent_diff",col("diff")/col("prev_value"))
display(rev_df)

yyyymm,sum(total_revenue),prev_value,diff,percent_diff
200912,799847.1100000143,,0.0,
201001,624032.8919999955,799847.1100000143,-175814.21800001885,-0.2198097808967712
201002,533091.4260000042,624032.8919999955,-90941.46599999128,-0.1457318470962776
201003,765848.7609999765,533091.4260000042,232757.3349999724,0.4366180426994347
201004,590580.4319999823,765848.7609999765,-175268.3289999942,-0.2288550141037566
201005,615322.8300000005,590580.4319999823,24742.398000018205,0.0418950521544184
201006,679786.6099999842,615322.8300000005,64463.77999998361,0.1047641609526881
201007,575236.3600000095,679786.6099999842,-104550.24999997462,-0.1537986310144841
201008,656776.3399999854,575236.3600000095,81539.97999997588,0.1417503928297831
201009,853650.4309999745,656776.3399999854,196874.09099998907,0.2997581962224666
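
The growth calculation can be reproduced in plain Python from the first three months of the table above. The `monthly_growth` helper is hypothetical and only mirrors the lag-and-divide logic; the sales figures are rounded copies of the displayed values.

```python
def monthly_growth(sales):
    """Return (current - previous) / previous per month; None for the first month."""
    growth = [None]  # the first month has no previous month to compare against
    for prev, curr in zip(sales, sales[1:]):
        growth.append((curr - prev) / prev)
    return growth

# First three monthly sales totals from the table above
sales = [799847.11, 624032.892, 533091.426]
growth = monthly_growth(sales)
print(growth)
```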


## Monthly New Vs. Existing Users
In this analysis, we observe how many new and existing users shop at London Gift Shop each month. A customer who makes their first-ever purchase in a given month is considered a "new" user for that month; in any later month in which they make a purchase, they are an "existing" user. This is a simple concept, but it is not trivial to determine how many "new" users there are each month given the current state of the data. First, we create a table that counts the number of distinct customer ids grouped by the `yyyymm` field. Then, we create another table containing all unique customer ids and the `yyyymm_starting` values that correspond to the year and month of their first purchase, and join it back to our main table. After that, we create a table of new users by keeping the records of the main table where `yyyymm` equals the newly created `yyyymm_starting` field, and a table of existing users by keeping the records where the two fields differ. For completeness, we join the new and existing users tables so we can observe both data sets side by side. Referring back to the table we first created, which contains the total number of active users each month, we can verify that the values add up.

In [0]:
#check out how many active users there are each month.
active_users_df = retail_csv.select("yyyymm","customer_id")
active_users_df = active_users_df.groupby("yyyymm").agg(expr('count(distinct customer_id)').alias('num_active_customers'))
display(active_users_df.orderBy("yyyymm"))

yyyymm,num_active_customers
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


In [0]:
#Figure out year and month of each customer's first purchase
firstUserTimes = retail_csv.select("yyyymm","customer_id")
firstUserTimes = firstUserTimes.groupBy(firstUserTimes.customer_id).min()
firstUserTimes = firstUserTimes.withColumnRenamed("min(yyyymm)","yyyymm_starting")
firstUserTimes = firstUserTimes.withColumnRenamed("customer_id","customer_id_temp")
firstUserTimes.orderBy("customer_id_temp").show(5)

In [0]:
#add this new data to the main table
retail_csv = retail_csv.join(firstUserTimes, (retail_csv.customer_id == firstUserTimes.customer_id_temp))

In [0]:
#need to scroll to the right to see new data
display(retail_csv.orderBy("yyyymm").limit(5))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,yyyymm,total_revenue,customer_id_temp,yyyymm_starting
491468,21396,RED SPOTTY EGG CUP,24,2009-12-11T12:01:00.000+0000,1.25,12957,United Kingdom,200912,30.0,12957,200912
491468,21704,BAG 250g SWIRLY MARBLES,24,2009-12-11T12:01:00.000+0000,0.85,12957,United Kingdom,200912,20.4,12957,200912
491468,20699,MOUSEY LONG LEGS SOFT TOY,6,2009-12-11T12:01:00.000+0000,2.55,12957,United Kingdom,200912,15.3,12957,200912
491468,22139,RETRO SPOT TEA SET CERAMIC 11 PC,6,2009-12-11T12:01:00.000+0000,4.95,12957,United Kingdom,200912,29.700000000000003,12957,200912
491468,20970,PINK FLORAL FELTCRAFT SHOULDER BAG,8,2009-12-11T12:01:00.000+0000,3.75,12957,United Kingdom,200912,30.0,12957,200912


### Query the new and existing users tables and join them together

In [0]:
users_monthly_existing = retail_csv.where(col("yyyymm") != col("yyyymm_starting"))
users_monthly_existing = users_monthly_existing.groupBy("yyyymm").agg(expr('count(distinct customer_id) AS num_existing_users'))
users_monthly_existing = users_monthly_existing.withColumnRenamed("yyyymm","yyyymm_extra")

users_monthly_new = retail_csv.where(col("yyyymm") == col("yyyymm_starting"))
users_monthly_new = users_monthly_new.groupBy("yyyymm").agg(expr('count(distinct customer_id) AS num_new_users'))

users_monthly = users_monthly_new.join(users_monthly_existing,users_monthly_new.yyyymm == users_monthly_existing.yyyymm_extra,"left")
users_monthly = users_monthly.drop("yyyymm_extra")
display(users_monthly.select("yyyymm","num_new_users","num_existing_users").orderBy("yyyymm"))

yyyymm,num_new_users,num_existing_users
200912,1045,
201001,394,392.0
201002,363,444.0
201003,436,675.0
201004,291,707.0
201005,254,808.0
201006,269,826.0
201007,183,805.0
201008,158,806.0
201009,242,960.0
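
The new-versus-existing split can be sketched in plain Python on a few hypothetical purchases. The customer ids and months below are made up for illustration; the logic mirrors the steps described above (find each customer's first month, then compare it with the month of each purchase).

```python
# Hypothetical purchases: (yyyymm, customer_id)
purchases = [
    (200912, "A"), (200912, "B"),
    (201001, "A"), (201001, "C"),
    (201002, "B"), (201002, "C"), (201002, "D"),
]

# Month of each customer's first purchase (the yyyymm_starting value)
first_month = {}
for month, customer in purchases:
    first_month[customer] = min(month, first_month.get(customer, month))

# Distinct customers per month, split by whether this is their first month
new_users, existing_users = {}, {}
for month, customer in purchases:
    bucket = new_users if month == first_month[customer] else existing_users
    bucket.setdefault(month, set()).add(customer)

# month -> (number of new users, number of existing users)
summary = {
    month: (len(new_users.get(month, set())), len(existing_users.get(month, set())))
    for month in sorted({m for m, _ in purchases})
}
print(summary)
```

As in the table above, the first month has no existing users, and the two counts for any month add up to that month's number of distinct active customers.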


## RFM Analysis
Recency, Frequency, Magnitude (RFM) analysis ranks customers based on the time elapsed since their last purchase (Recency), how often they make purchases (Frequency), and how large their purchases are (Magnitude). In this notebook, we also refer to the "Magnitude" measure as "Monetary", which is just a more specific instance of "Magnitude".

All scores are normalized to a scale from 1-5, with 5 being the best score. Recency (before normalization) is measured in terms of days since the end of 2011 (the latest time at which our data set bears records); Frequency is measured by the total number of purchases made by a customer; and Monetary is measured in terms of the total amount of money a customer has spent at the store.

We then group customers with similar RFM values, labelling each group in terms of their value to the business and how they should be approached by marketing initiatives. Once grouped, we investigate how many customers there are in each group, while reporting their mean pre-normalized RFM metrics.

### Drop any records with null values, check out some summary statistics

In [0]:
retail_csv = retail_csv.na.drop()
retail_csv = retail_csv.withColumn("total_cost",col("unit_price")*col("quantity"))
stat_summary = retail_csv.select("quantity","unit_price","total_cost")
stat_summary.summary().show()

### Finding Recency, Frequency, and Monetary (RFM) values

In [0]:
rfm_table = (retail_csv.groupby('customer_id')
                  .agg(sum(retail_csv.total_cost).alias("monetary"),
                       max(retail_csv.invoice_date).alias("most_recent_purchase_date"),
                       expr('count(distinct invoice_no) AS frequency'))
       )
#subtracting 1 so output is consistent with Python project RFM analysis on same data
#invoice_date is already a timestamp, so to_date needs no format string here
rfm_table = (rfm_table.withColumn("recency",datediff(to_date(lit("2012-01-01")), to_date("most_recent_purchase_date"))-1)
                       .orderBy("customer_id")
            )
rfm_table = rfm_table.drop("most_recent_purchase_date")
rfm_table.show(5)
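
The three raw RFM values can be illustrated in plain Python on a few hypothetical records. The customers, invoices, and amounts below are made up; the reference date and the "subtract 1" step follow the paragraph above.

```python
from datetime import date

# Hypothetical records: (customer_id, invoice_no, invoice_date, total_cost)
records = [
    ("A", "1", date(2011, 12, 1), 10.0),
    ("A", "1", date(2011, 12, 1), 5.0),
    ("A", "2", date(2011, 12, 20), 20.0),
    ("B", "3", date(2011, 6, 30), 7.5),
]
reference = date(2012, 1, 1)

# Accumulate spend, distinct invoices, and latest purchase date per customer
per_customer = {}
for customer, invoice, day, cost in records:
    entry = per_customer.setdefault(
        customer, {"monetary": 0.0, "invoices": set(), "last": day}
    )
    entry["monetary"] += cost
    entry["invoices"].add(invoice)
    entry["last"] = max(entry["last"], day)

rfm = {
    c: {
        "recency": (reference - e["last"]).days - 1,  # days before the end of 2011
        "frequency": len(e["invoices"]),              # distinct invoices
        "monetary": e["monetary"],                    # total spend
    }
    for c, e in per_customer.items()
}
print(rfm)
```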

### Normalizing RFM Values
To map the RFM values to a scale from 1-5, we bin them into 5 disjoint, monotonically increasing quantile intervals. For example, the customers whose frequency values are among the lowest 20% in the population all get a Frequency score of 1, those between the lowest 20%-40% all get a Frequency score of 2, and so on.

PySpark offers an API called `QuantileDiscretizer`, which does this automatically for us.

In [0]:
#bucketing stuff
from pyspark.ml.feature import QuantileDiscretizer
bucketer_monetary = QuantileDiscretizer().setNumBuckets(5).setInputCol("monetary").setOutputCol("monetary_score").setRelativeError(0.0)
fitted_bucketer_monetary = bucketer_monetary.fit(rfm_table)
rfm_table = fitted_bucketer_monetary.transform(rfm_table)

bucketer_frequency = QuantileDiscretizer().setNumBuckets(5).setInputCol("frequency").setOutputCol("frequency_score").setRelativeError(0.0)
fitted_bucketer_frequency = bucketer_frequency.fit(rfm_table)
rfm_table = fitted_bucketer_frequency.transform(rfm_table)

bucketer_recency = QuantileDiscretizer().setNumBuckets(5).setInputCol("recency").setOutputCol("recency_score").setRelativeError(0.0)
fitted_bucketer_recency = bucketer_recency.fit(rfm_table)
rfm_table = fitted_bucketer_recency.transform(rfm_table)

display(rfm_table)

customer_id,monetary,frequency,recency,monetary_score,frequency_score,recency_score
12346,-64.67999999999364,17,347,0.0,4.0,3.0
12347,5633.32,8,24,4.0,3.0,0.0
12348,2019.4,5,97,3.0,3.0,2.0
12349,4404.539999999999,5,40,4.0,3.0,1.0
12350,334.40000000000003,1,332,1.0,1.0,3.0
12351,300.93,1,397,1.0,1.0,3.0
12352,1889.21,13,58,3.0,4.0,1.0
12353,406.75999999999993,2,226,1.0,1.0,3.0
12354,1079.4,1,254,2.0,1.0,3.0
12355,947.61,2,236,2.0,1.0,3.0
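
The idea of binning by quintile can be sketched in plain Python. This is a simplified illustration, not the algorithm `QuantileDiscretizer` uses: the `quintile_bins` helper and its way of picking cut points are assumptions, and the input values are hypothetical.

```python
import bisect

def quintile_bins(values):
    """Assign each value a bin from 0 to 4 using cut points at the 20/40/60/80% ranks."""
    ordered = sorted(values)
    n = len(ordered)
    # Four cut points split the sorted values into five groups
    splits = [ordered[(n * k) // 5] for k in range(1, 5)]
    # bisect_right places a value equal to a cut point into the higher bin
    return [bisect.bisect_right(splits, v) for v in values]

frequencies = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
bins = quintile_bins(frequencies)
print(bins)
```

Like the output above, the bins are numbered from 0 to 4 rather than 1 to 5.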


## Need to Massage Quantile Discretizer Outputs to Produce Proper RFM Scores
Although the customers are now binned by quantile, the labels given to each bin are not the same as the RFM scores we want. Firstly, the QuantileDiscretizer outputs range from 0-4, when we would like them to range from 1-5. Secondly, for recency, there is an inverse relationship between the quantile rank and its RFM score (i.e. larger quantile label = relatively higher recency value = lower recency score).

This means we need to transform the RFM scores by adding new columns that are functions of the original ones generated by QuantileDiscretizer.

In [0]:
#rename the fields produced by QuantileDiscretizer
rfm_table = (rfm_table.withColumnRenamed("monetary_score","old_monetary_score")
                      .withColumnRenamed("frequency_score","old_frequency_score")
                      .withColumnRenamed("recency_score","old_recency_score"))

#transform and drop old score values
rfm_table = rfm_table.withColumn("monetary_score",col("old_monetary_score")+1).drop("old_monetary_score")
rfm_table = rfm_table.withColumn("frequency_score",col("old_frequency_score")+1).drop("old_frequency_score")
#recency is inverted: the highest quantile bucket (4) maps to the lowest score (1)
rfm_table = rfm_table.withColumn("recency_score",lit(5) - col("old_recency_score")).drop("old_recency_score")
rfm_table.show()
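
The mapping from bucket labels to scores is simple enough to check by hand. The sketch below applies it to the bucket labels of the first two customers shown in the discretizer output earlier; the `to_scores` helper is hypothetical.

```python
def to_scores(monetary_bin, frequency_bin, recency_bin):
    """Map 0-4 bucket labels to 1-5 scores; recency is inverted so recent buyers score high."""
    return monetary_bin + 1, frequency_bin + 1, 5 - recency_bin

# Bucket labels (monetary, frequency, recency) for customers 12346 and 12347
scores_12346 = to_scores(0, 4, 3)
scores_12347 = to_scores(4, 3, 0)
print(scores_12346, scores_12347)
```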

### Concatenating RFM scores
As a precursory step to mapping RFM scores to meaningful labels, we create a new field that represents all three scores as a 3 character string. This way, in future steps, we can match the values of this field with regex patterns that map RFM scores to our desired labels.

In [0]:
rfm_table = (rfm_table.withColumn("recency_score",rfm_table.recency_score.cast(IntegerType()))
                      .withColumn("frequency_score",rfm_table.frequency_score.cast(IntegerType()))
                      .withColumn("monetary_score",rfm_table.monetary_score.cast(IntegerType())))

rfm_table = (rfm_table.withColumn("rfm_score",
                                  concat(rfm_table.recency_score.cast(StringType()),
                                         rfm_table.frequency_score.cast(StringType()),
                                         rfm_table.monetary_score.cast(StringType()))
                                 )
            )

rfm_table.show(10)

### Mapping RFM scores to Insight Labels
The labels we give to the different RFM groupings are shown in the `lookups` list of tuples below. Because each pattern is anchored at the start of the string and matches only the first two characters (the Recency and Frequency scores), the list also reveals that we don't actually use the Monetary score as a factor in labelling our customers.

The paragraph below builds a Spark User Defined Function (UDF) from the `lookup` function so that it can be applied to PySpark DataFrame columns, which lets us feed it the RFM scores we just finished building.

In [0]:
import re

def lookup(s):
    lookups  = [
        ('^[1-2][1-2]','Hibernating'),
        ('^[1-2][3-4]','At Risk'),
        ('^[1-2]5','Can\'t Lose'),
        ('^3[1-2]','About to Sleep'),
        ('^33','Need Attention'),
        ('^[3-4][4-5]','Loyal Customers'),
        ('^41','Promising'),
        ('^51','New Customers'),
        ('^[4-5][2-3]','Potential Loyalists'),
        ('^5[4-5]','Champions')
    ]
    for pattern, value in lookups:
        if re.search(pattern, s):
            return value
    return None
  
lookup_udf = udf(lookup, StringType())

### Create new "segment" field with our labels based on rfm_score

In [0]:
#add new segment column
rfm_table = rfm_table.withColumn("segment",lookup_udf(rfm_table.rfm_score))
display(rfm_table.limit(5))

customer_id,monetary,frequency,recency,monetary_score,frequency_score,recency_score,rfm_score,segment
12346,-64.67999999999364,17,347,1,5,2,251,Can't Lose
12347,5633.32,8,24,5,4,5,545,Champions
12348,2019.4,5,97,4,4,3,344,Loyal Customers
12349,4404.539999999999,5,40,5,4,4,445,Loyal Customers
12350,334.40000000000003,1,332,2,2,2,222,Hibernating
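
Since the segment lookup is ordinary Python, it can be exercised without Spark. The sketch below repeats the pattern list from the notebook in a module-level constant (the name `LOOKUPS` is an assumption) and checks that every combination of Recency and Frequency scores maps to some label.

```python
import re

# Same patterns as the notebook's lookups list; only the first two digits are matched
LOOKUPS = [
    ('^[1-2][1-2]', 'Hibernating'),
    ('^[1-2][3-4]', 'At Risk'),
    ('^[1-2]5', "Can't Lose"),
    ('^3[1-2]', 'About to Sleep'),
    ('^33', 'Need Attention'),
    ('^[3-4][4-5]', 'Loyal Customers'),
    ('^41', 'Promising'),
    ('^51', 'New Customers'),
    ('^[4-5][2-3]', 'Potential Loyalists'),
    ('^5[4-5]', 'Champions'),
]

def lookup(s):
    """Return the label of the first pattern that matches the RFM score string."""
    for pattern, value in LOOKUPS:
        if re.search(pattern, s):
            return value
    return None

# Every (recency, frequency) pair from 1-5 should map to a label; monetary is ignored
unmatched = [
    f"{r}{f}" for r in range(1, 6) for f in range(1, 6)
    if lookup(f"{r}{f}1") is None
]
print(lookup("251"), lookup("545"), unmatched)
```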


### Summary aggregations on RFM classes

In [0]:
final_rfm_results = (rfm_table.groupby("segment")
                              .agg(avg(rfm_table.recency).alias("recency_mean"),  
                                   avg(rfm_table.frequency).alias("frequency_mean"),
                                   avg(rfm_table.monetary).alias("monetary_mean"),
                                   count(rfm_table.monetary).alias("count")
                                   )
                    )
final_rfm_results.show()