RFM Analysis
RFM is we can say a method to analyze customer value so that we can focus on customer accordingly(We can group our customer into clusters to focus on a group having same properties similarly)

RFM stands for three dimension:

1)Recency: How recently did customer purchase(Duration since last purchase)

2)Frequency: How often did they purchase(Total number of purchases)

3)Monetary Value: How much did they spent(Total money the customer spent)

In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
spark = SparkSession \
    .builder \
    .appName("RFM Analysis with PySpark") \
    .getOrCreate()

In [6]:
spark

In [7]:
#load dataset 
data = spark.read.format("csv").option("header", "true").option("delimiter",";").load("trans.csv")

data.show()

+--------+----------+------+------+-------------+-------+-------+--------+----+--------+
|trans_id|account_id|  date|  type|    operation| amount|balance|k_symbol|bank| account|
+--------+----------+------+------+-------------+-------+-------+--------+----+--------+
|  695247|      2378|930101|PRIJEM|        VKLAD| 700.00| 700.00|    null|null|    null|
|  171812|       576|930101|PRIJEM|        VKLAD| 900.00| 900.00|    null|null|    null|
|  207264|       704|930101|PRIJEM|        VKLAD|1000.00|1000.00|    null|null|    null|
| 1117247|      3818|930101|PRIJEM|        VKLAD| 600.00| 600.00|    null|null|    null|
|  579373|      1972|930102|PRIJEM|        VKLAD| 400.00| 400.00|    null|null|    null|
|  771035|      2632|930102|PRIJEM|        VKLAD|1100.00|1100.00|    null|null|    null|
|  452728|      1539|930103|PRIJEM|        VKLAD| 600.00| 600.00|    null|null|    null|
|  725751|      2484|930103|PRIJEM|        VKLAD|1100.00|1100.00|    null|null|    null|
|  497211|      1695|

In [8]:
#drop the columns that its not important and contains null
df = data.drop('account', 'bank','k_symbol')
df.show()

+--------+----------+------+------+-------------+-------+-------+
|trans_id|account_id|  date|  type|    operation| amount|balance|
+--------+----------+------+------+-------------+-------+-------+
|  695247|      2378|930101|PRIJEM|        VKLAD| 700.00| 700.00|
|  171812|       576|930101|PRIJEM|        VKLAD| 900.00| 900.00|
|  207264|       704|930101|PRIJEM|        VKLAD|1000.00|1000.00|
| 1117247|      3818|930101|PRIJEM|        VKLAD| 600.00| 600.00|
|  579373|      1972|930102|PRIJEM|        VKLAD| 400.00| 400.00|
|  771035|      2632|930102|PRIJEM|        VKLAD|1100.00|1100.00|
|  452728|      1539|930103|PRIJEM|        VKLAD| 600.00| 600.00|
|  725751|      2484|930103|PRIJEM|        VKLAD|1100.00|1100.00|
|  497211|      1695|930103|PRIJEM|        VKLAD| 200.00| 200.00|
|  232960|       793|930103|PRIJEM|        VKLAD| 800.00| 800.00|
|  505240|      1726|930103|PRIJEM|        VKLAD|1000.00|1000.00|
|  144541|       485|930104|PRIJEM|        VKLAD| 300.00| 300.00|
|  637741|

In [9]:
#see a summary of table
df.describe().show()

+-------+------------------+------------------+------------------+-------+--------------+-----------------+-----------------+
|summary|          trans_id|        account_id|              date|   type|     operation|           amount|          balance|
+-------+------------------+------------------+------------------+-------+--------------+-----------------+-----------------+
|  count|           1056320|           1056320|           1056320|1056320|        873206|          1056320|          1056320|
|   mean|1335310.7043301272|2936.8672902150865| 965674.8198926462|   null|          null|5924.145675834605|38518.33080316599|
| stddev|1227486.5083824112| 2477.345127182346|13945.346734698618|   null|          null| 9522.73537312024|22117.86801259184|
|    min|                 1|                 1|            930101| PRIJEM|PREVOD NA UCET|             0.00|            -1.60|
|    max|            999980|               998|            981231|  VYDAJ|  VYBER KARTOU|          9999.00|         99

In [10]:
#find out the types of each column
df.schema

StructType(List(StructField(trans_id,StringType,true),StructField(account_id,StringType,true),StructField(date,StringType,true),StructField(type,StringType,true),StructField(operation,StringType,true),StructField(amount,StringType,true),StructField(balance,StringType,true)))

In [11]:
from pyspark.sql import functions as F

full_date_var = concat(lit("19"), col("date"))
full_date_func = (F.when(F.col("date") == df['date'], full_date_var)
                  .otherwise(F.col("date")))

df_1 = df.withColumn("full_date", full_date_func)
df_1.show()

+--------+----------+------+------+-------------+-------+-------+---------+
|trans_id|account_id|  date|  type|    operation| amount|balance|full_date|
+--------+----------+------+------+-------------+-------+-------+---------+
|  695247|      2378|930101|PRIJEM|        VKLAD| 700.00| 700.00| 19930101|
|  171812|       576|930101|PRIJEM|        VKLAD| 900.00| 900.00| 19930101|
|  207264|       704|930101|PRIJEM|        VKLAD|1000.00|1000.00| 19930101|
| 1117247|      3818|930101|PRIJEM|        VKLAD| 600.00| 600.00| 19930101|
|  579373|      1972|930102|PRIJEM|        VKLAD| 400.00| 400.00| 19930102|
|  771035|      2632|930102|PRIJEM|        VKLAD|1100.00|1100.00| 19930102|
|  452728|      1539|930103|PRIJEM|        VKLAD| 600.00| 600.00| 19930103|
|  725751|      2484|930103|PRIJEM|        VKLAD|1100.00|1100.00| 19930103|
|  497211|      1695|930103|PRIJEM|        VKLAD| 200.00| 200.00| 19930103|
|  232960|       793|930103|PRIJEM|        VKLAD| 800.00| 800.00| 19930103|
|  505240|  

In [18]:
from pyspark.sql.functions import *

# convert string(date) to date(date) in new column
df_trans = df_1.withColumn("new_full_date", to_date(col("full_date"), "yyyyMMdd"))

In [19]:
from pyspark.sql.functions import mean, min, max, sum, datediff, to_date

# compute last date for every account
date_max = df_trans.select(max('new_full_date')).toPandas()

# compute today
current = current_date()

# Calculatre Duration
df_trans = df_trans.withColumn('Duration', datediff(lit(current), 'new_full_date'))

In [20]:
df_trans.show(5)

+--------+----------+------+------+---------+-------+-------+---------+-------------+--------+
|trans_id|account_id|  date|  type|operation| amount|balance|full_date|new_full_date|Duration|
+--------+----------+------+------+---------+-------+-------+---------+-------------+--------+
|  695247|      2378|930101|PRIJEM|    VKLAD| 700.00| 700.00| 19930101|   1993-01-01|   10343|
|  171812|       576|930101|PRIJEM|    VKLAD| 900.00| 900.00| 19930101|   1993-01-01|   10343|
|  207264|       704|930101|PRIJEM|    VKLAD|1000.00|1000.00| 19930101|   1993-01-01|   10343|
| 1117247|      3818|930101|PRIJEM|    VKLAD| 600.00| 600.00| 19930101|   1993-01-01|   10343|
|  579373|      1972|930102|PRIJEM|    VKLAD| 400.00| 400.00| 19930102|   1993-01-02|   10342|
+--------+----------+------+------+---------+-------+-------+---------+-------------+--------+
only showing top 5 rows



In [24]:
# recency = df.groupBy("account_id").agg(min("date").alias("Recency"))
# frequency = df.groupBy("account_id","trans_id").count().groupBy("account_id").agg(count("*").alias("Frequency"))
# # Frequency = df_trans.groupby('account_id').agg(f.count('trans_id').alias('count')) 
# monetary = df.groupBy("account_id").agg(round(sum("amount"), 2).alias("Monetary"))
# rfm = recency.join(frequency,"account_id",how ="inner").join(monetary,"account_id",how ="inner")
import pyspark.sql.functions as f
rfm_table = df_trans.groupBy("account_id")\
                        .agg(f.count('trans_id').alias("Frequency"), \
                             count("Duration").alias("recency"), \
                             sum("amount").alias("Monetary"))

In [25]:
rfm_table.printSchema()

root
 |-- account_id: string (nullable = true)
 |-- Frequency: long (nullable = false)
 |-- recency: long (nullable = false)
 |-- Monetary: double (nullable = true)



In [26]:
rfm_table.show(5)

+----------+---------+-------+------------------+
|account_id|Frequency|recency|          Monetary|
+----------+---------+-------+------------------+
|      4937|      560|    560| 5179029.400000004|
|      3210|      360|    360| 3561189.000000001|
|      2088|      348|    348|          783122.5|
|       467|      415|    415|1493660.1999999997|
|      1512|      337|    337|1833007.0999999994|
+----------+---------+-------+------------------+
only showing top 5 rows



In [28]:
def describe_pd(df_in, columns, deciles=False):
    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [25, 50, 75]
    percs = np.transpose([np.percentile(df_in.select(x).collect(),percentiles) for x in columns])
    percs = pd.DataFrame(percs,columns=columns)
    percs["summary"] = [str(p) + "%"for p in percentiles]
    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True)
    new_df = new_df.round(2)
    return new_df[["summary"] + columns]

In [29]:
import numpy as np

In [31]:
# cols = ["Recency","Frequency","Monetary"]
# rfm.select(cols).describe().show()

In [40]:
#Using the quantile for defining the R,F,M values between 1 and 4
#According to the magnitudes we have assigned values between 1 to 4 to the attributes

def RScore(x):
  #Smaller value of x(Recency) tells us that the particular customer has done some activity(like buying something or using some product) recently and contrary larger the value of x will give some inference that customer wasn't involved in activity from a long time
  if x <= 16:
    return 1
  elif x<= 50:
    return 2
  elif x<= 143:
    return 3
  else:
    return 4

def FScore(x):
  #Smaller the value of x(Frequency) tell that the customer is not involved in activities frequently and for customer with high value of x denotes that customer is involved in Frequent activities
  if x <= 1:
    return 4
  elif x <= 3:
    return 3
  elif x <= 5:
    return 2
  else:
    return 1

def MScore(x):
  #Smaller the value of x(Monetary value) tells us that customer activities cost is not much(has not spent much money on buying some product etc) and contrary higher value of x denotes that customer has spent a lot of money on activities
  if x <= 293:
    return 4
  elif x <= 648:
    return 3
  elif x <= 1611:
    return 2
  else:
    return 1

#A customer can have any of the permutation of these values corresponding to their activities
  

#For each and every value of R,F,M we will pass them through the lambda function in corresponding R_udf,F_udf,M_udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())

#RFM segamentation
from pyspark.sql.functions import concat

rfm_seg=rfm_table.withColumn("r_seg", R_udf("Recency"))
rfm_seg=rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg=rfm_seg.withColumn("m_seg", M_udf("Monetary"))
#Display is inbuilt function Databricks environment to show the dataframe
display(rfm_seg.head(5))

[Row(account_id='4937', Frequency=560, recency=560, Monetary=5179029.400000004, r_seg='4', f_seg='1', m_seg='1'),
 Row(account_id='3210', Frequency=360, recency=360, Monetary=3561189.000000001, r_seg='4', f_seg='1', m_seg='1'),
 Row(account_id='2088', Frequency=348, recency=348, Monetary=783122.5, r_seg='4', f_seg='1', m_seg='1'),
 Row(account_id='467', Frequency=415, recency=415, Monetary=1493660.1999999997, r_seg='4', f_seg='1', m_seg='1'),
 Row(account_id='1512', Frequency=337, recency=337, Monetary=1833007.0999999994, r_seg='4', f_seg='1', m_seg='1')]

In [41]:
col_list=["r_seg","f_seg","m_seg"]

#RFM score is nothing but the concatenated R,F,M values
rfm_seg=rfm_seg.withColumn("RFMScore",concat(*col_list))
display(rfm_seg.sort("RFMScore").head(5))

[Row(account_id='2892', Frequency=13, recency=13, Monetary=40700.0, r_seg='1', f_seg='1', m_seg='1', RFMScore='111'),
 Row(account_id='1563', Frequency=12, recency=12, Monetary=33400.0, r_seg='1', f_seg='1', m_seg='1', RFMScore='111'),
 Row(account_id='2300', Frequency=15, recency=15, Monetary=37700.0, r_seg='1', f_seg='1', m_seg='1', RFMScore='111'),
 Row(account_id='727', Frequency=11, recency=11, Monetary=35400.0, r_seg='1', f_seg='1', m_seg='1', RFMScore='111'),
 Row(account_id='869', Frequency=16, recency=16, Monetary=41300.0, r_seg='1', f_seg='1', m_seg='1', RFMScore='111')]

In [42]:
#Statistical summary for each RFM score(Mapping of RFM score against average R,F,M values)
display(rfm_seg.groupBy("RFMScore").agg({"Recency":"mean","Frequency":"mean","Monetary":"mean"} ).sort(["RFMScore"]).head(5))

[Row(RFMScore='111', avg(Recency)=13.0, avg(Monetary)=37169.230769230766, avg(Frequency)=13.0),
 Row(RFMScore='211', avg(Recency)=35.96551724137931, avg(Monetary)=132443.56666666665, avg(Frequency)=35.96551724137931),
 Row(RFMScore='311', avg(Recency)=101.73720136518772, avg(Monetary)=554965.3031569968, avg(Frequency)=101.73720136518772),
 Row(RFMScore='411', avg(Recency)=289.2769516728625, avg(Monetary)=1733385.512608427, avg(Frequency)=289.2769516728625)]