In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=0af26c0baf61cc74459fe99ae731eeddaa5fc49791c64cdb6efd38e5d7ef0174
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [3]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.types import *

In [5]:
from google.colab import files
files.upload()

Saving online_retail_II.csv to online_retail_II.csv


In [6]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('RFM segmentation with spark').getOrCreate()

In [7]:
df = spark.read.csv('online_retail_II.csv',header=True,sep = ',',inferSchema=True)
df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|2009-12-01 07:45:00| 1.2

In [8]:
df.printSchema()

root
 |-- Invoice: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Price: double (nullable = true)
 |-- Customer ID: double (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
df.toPandas().shape

(1067371, 8)

# **LIMPIEZA Y MANIPULACION DE DATOS**

In [10]:
#Realizamos una funcion que cuenta la cantidad de valores NO NULOS en cada columna del dataframde

import pyspark.sql.functions as F

def check_null(data):
    data.agg(*[F.count(c).alias(c)
             for c in data.columns]).show()

In [11]:
check_null(df)

+-------+---------+-----------+--------+-----------+-------+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|  Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-------+-----------+-------+
|1067371|  1067371|    1062989| 1067371|    1067371|1067371|     824364|1067371|
+-------+---------+-----------+--------+-----------+-------+-----------+-------+



In [12]:
#Eliminamos todas las filas que tengan al menos un valor nulo
df = df.dropna(how='any')
check_null(df)

+-------+---------+-----------+--------+-----------+------+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate| Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+------+-----------+-------+
| 824364|   824364|     824364|  824364|     824364|824364|     824364| 824364|
+-------+---------+-----------+--------+-----------+------+-----------+-------+



In [13]:
#Quitamos las invoice canceladas

df=df.filter(~F.col('Invoice').contains('C'))
df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|2009-12-01 07:45:00| 1.2

In [14]:
df.toPandas().shape

(805620, 8)

In [15]:
#Creamos una funcion en SPark para cambiar la coma por punto
#La funcion a crear hace lo siguiente
#def make_float(value):
  #updated_str=value.replace(',','.')
  #updated_float=float(updated_str)
  #return updated_str

make_float_udf = F.udf(lambda value: float(str(value.replace(',', '.'))), FloatType())

In [16]:
#No aplicamos los funcion por el valor ya es flotante, pero en caso de que sea string, hariamos lo siguiente
#df = df.withColumn('Price', make_float_udf(F.col("Price")))
#df.show(5)

In [17]:
#Calculamos la columna Total Price
df=df.withColumn('TotalPrice',F.round(df.Price*df.Quantity,2))
df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|TotalPrice|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|      83.4|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|     100.8|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|      30.0|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|U

In [18]:
#Creamos una funcion en SPark para cambiar InvoiceDate a data time type
#La funcion a crear hace lo siguiente
#def make_time(value):
  #updated_time= pd.to_datetime(value, format = '%d.%m.%Y %H:%M')
  #return  updated_time

make_time_udf = F.udf(lambda value: pd.to_datetime(value, format = '%Y-%m-%d %H:%M:%S'), TimestampType())


In [19]:
df = df.withColumn('InvoiceDate', make_time_udf(F.col("InvoiceDate")))
df.show(5)

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|TotalPrice|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|      83.4|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|     100.8|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|      30.0|
+-------+---------+--------------------+--------+-------------------+-----+-----------+-

In [20]:
#Obtenemos la fecha maxima y lo guardamos en un dataframe de pandas
date_max = df.select(F.max(df.InvoiceDate).alias('max_date')).toPandas() #toPandas lo guarda como un dataframe de pandas
date_max


Unnamed: 0,max_date
0,2011-12-09 12:50:00


In [21]:
# Se calcula la diferencia entre la fecha maxima y la fecha de la factura
df = df.withColumn('Duration', F.datediff(F.lit(date_max.iloc[0][0]), 'InvoiceDate')) #lit crea una columna con un valor literal constante
df.show(5)

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+--------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|TotalPrice|Duration|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+----------+--------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|      83.4|     738|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|     738|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|      81.0|     738|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|     100.8|     738|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|      30.0|     738|
+-------+-------

In [22]:
#Renombramos columnas

df=df.withColumnRenamed('Invoice','invoice').withColumnRenamed('Customer ID', 'customer_id').withColumnRenamed('Duration', 'duration').withColumnRenamed('TotalPrice', 'total_price')
df.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+-----------+--------+
|invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|customer_id|       Country|total_price|duration|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+-----------+--------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|       83.4|     738|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|       81.0|     738|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|       81.0|     738|
| 489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|      100.8|     738|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|       30.0|     738|
| 489434

# **MODELO RFM**

RECENCY

In [26]:
recency = df.groupby('customer_id').agg(F.min('duration').alias('recency'))

recency.show(5)

+-----------+-------+
|customer_id|recency|
+-----------+-------+
|    17884.0|      3|
|    14285.0|     21|
|    16596.0|     15|
|    16822.0|    686|
|    17072.0|    625|
+-----------+-------+
only showing top 5 rows



FREQUENCY

In [28]:
frequency = df.groupby('customer_id','invoice').count()

frequency.orderBy(frequency.customer_id.desc()).show(5)

+-----------+-------+-----+
|customer_id|invoice|count|
+-----------+-------+-----+
|    18287.0| 523290|    2|
|    18287.0| 508581|   54|
|    18287.0| 523289|   19|
|    18287.0| 534346|   10|
|    18287.0| 554065|   29|
+-----------+-------+-----+
only showing top 5 rows



In [29]:
frequency = frequency.groupby('customer_id').agg(F.count('*').alias('frequency'))

frequency.orderBy(frequency.frequency.desc()).show(5)

+-----------+---------+
|customer_id|frequency|
+-----------+---------+
|    14911.0|      398|
|    12748.0|      337|
|    17841.0|      211|
|    15311.0|      208|
|    13089.0|      203|
+-----------+---------+
only showing top 5 rows



MONETARY

In [30]:
monetary = df.groupby('customer_id').agg(F.round(F.sum('total_price'),2).alias('monetary_value'))

monetary.orderBy(F.col('monetary_value').desc()).show(5)

+-----------+--------------+
|customer_id|monetary_value|
+-----------+--------------+
|    18102.0|     608821.65|
|    14646.0|     528602.52|
|    14156.0|     313946.37|
|    14911.0|     295972.63|
|    17450.0|     246973.09|
+-----------+--------------+
only showing top 5 rows



RFM DATAFRAME

In [31]:
#Juntamos todos los dataframes individuales

rfm = recency.join(frequency,on='customer_id',how='inner').join(monetary,on='customer_id',how='inner')

rfm.show(5)

+-----------+-------+---------+--------------+
|customer_id|recency|frequency|monetary_value|
+-----------+-------+---------+--------------+
|    17884.0|      3|       20|       3072.89|
|    14285.0|     21|        8|       3284.42|
|    16596.0|     15|        4|        579.63|
|    16822.0|    686|        1|        181.39|
|    17072.0|    625|        1|        282.05|
+-----------+-------+---------+--------------+
only showing top 5 rows



In [32]:
rfm.describe().show()

+-------+------------------+------------------+------------------+-----------------+
|summary|       customer_id|           recency|         frequency|   monetary_value|
+-------+------------------+------------------+------------------+-----------------+
|  count|              5881|              5881|              5881|             5881|
|   mean|15314.674205067166| 200.9928583574222|6.2871960550926715|3017.076884883534|
| stddev|1715.4297590182236|209.48965104407029| 13.01287905904238|14734.12861679375|
|    min|           12346.0|                 0|                 1|              0.0|
|    max|           18287.0|               738|               398|        608821.65|
+-------+------------------+------------------+------------------+-----------------+



# **RFM SEGMENTATION**

In [23]:
#Definimos la funcion los scores

#Funcion Recency Score

def r_score(r_value):
    if r_value <= 14:    # 2 weeks
        return 1
    elif r_value <= 31:   # 1 month
        return 2
    elif r_value <= 93:    # 3 month
        return 3
    else:
        return 4

#Funcion Frequency Score

def f_score(f_value):
    if f_value <= 3:
        return 4
    elif f_value <= 18:
        return 3
    elif f_value <= 36:
        return 2
    else:
        return 1

#Funcion Monetary Score

def m_score(m_value):
    if m_value <= 10:
        return 4
    elif m_value <= 100:
        return 3
    elif m_value <= 1000:
        return 2
    else:
        return 1

In [24]:
#Creamos las funciones udf para aplicar en el dataframe

r_udf = F.udf(lambda r_value: r_score(r_value), StringType())
f_udf = F.udf(lambda f_value: f_score(f_value), StringType())
m_udf = F.udf(lambda m_value: m_score(m_value), StringType())


In [33]:
#Agregamos los scores a cada cliente

rfm_seg = rfm.withColumn('r_score', r_udf(F.col('recency')))
rfm_seg = rfm_seg.withColumn('f_score', f_udf(F.col('frequency')))
rfm_seg = rfm_seg.withColumn('m_score', m_udf(F.col('monetary_value')))
rfm_seg.show(5)

+-----------+-------+---------+--------------+-------+-------+-------+
|customer_id|recency|frequency|monetary_value|r_score|f_score|m_score|
+-----------+-------+---------+--------------+-------+-------+-------+
|    17884.0|      3|       20|       3072.89|      1|      2|      1|
|    14285.0|     21|        8|       3284.42|      2|      3|      1|
|    16596.0|     15|        4|        579.63|      2|      3|      2|
|    16822.0|    686|        1|        181.39|      4|      4|      2|
|    17072.0|    625|        1|        282.05|      4|      4|      2|
+-----------+-------+---------+--------------+-------+-------+-------+
only showing top 5 rows



In [34]:
#Juntamos r, f y m en una sola columna

rfm_seg = rfm_seg.withColumn('rfm_score',F.concat(F.col('r_score'), F.col('f_score'), F.col('m_score')))

rfm_seg.sort(F.col('rfm_score').asc()).show(5)

+-----------+-------+---------+--------------+-------+-------+-------+---------+
|customer_id|recency|frequency|monetary_value|r_score|f_score|m_score|rfm_score|
+-----------+-------+---------+--------------+-------+-------+-------+---------+
|    17675.0|      1|       70|       38259.6|      1|      1|      1|      111|
|    17811.0|      4|       47|        9609.8|      1|      1|      1|      111|
|    12471.0|      2|       79|      39963.79|      1|      1|      1|      111|
|    16746.0|      4|       42|       18066.4|      1|      1|      1|      111|
|    15311.0|      0|      208|     116771.16|      1|      1|      1|      111|
+-----------+-------+---------+--------------+-------+-------+-------+---------+
only showing top 5 rows



**RESUMEN** **ESTADISTICO**

In [35]:
#Muestro los valores promedios de cada segmento
rfm_agg =rfm_seg.groupby('rfm_score').agg({'recency': 'mean','frequency': 'mean','monetary_value': 'mean'}).sort(F.col('rfm_score'))
rfm_agg.show()

+---------+-------------------+------------------+------------------+
|rfm_score|avg(monetary_value)|      avg(recency)|    avg(frequency)|
+---------+-------------------+------------------+------------------+
|      111|  60970.98999999999|             3.775|            76.575|
|      121| 11485.007103448283|4.9862068965517246| 25.83448275862069|
|      131| 3901.8938875878257| 6.437939110070258| 9.456674473067915|
|      132|  729.5262264150944| 6.943396226415095| 5.113207547169812|
|      141| 5872.5364285714295| 6.571428571428571|2.5476190476190474|
|      142|  479.8154777070064| 6.598726114649682|2.0127388535031847|
|      143|             81.435|              7.25|              1.25|
|      144|                0.0|              14.0|               1.0|
|      211|  30281.53363636364|22.181818181818183| 60.63636363636363|
|      221| 12865.928999999998|              20.9|23.983333333333334|
|      231| 3920.9593654822356| 22.35532994923858| 9.208121827411167|
|      232|  764.968

In [36]:
rfm_agg.select('rfm_score',
               (F.round('avg(recency)', 0).alias('recency_mean')),
               (F.round('avg(frequency)', 0).alias('frequency_mean')),
               (F.round('avg(monetary_value)', 0).alias('monetaryValue_mean'))).show(5)

+---------+------------+--------------+------------------+
|rfm_score|recency_mean|frequency_mean|monetaryValue_mean|
+---------+------------+--------------+------------------+
|      111|         4.0|          77.0|           60971.0|
|      121|         5.0|          26.0|           11485.0|
|      131|         6.0|           9.0|            3902.0|
|      132|         7.0|           5.0|             730.0|
|      141|         7.0|           3.0|            5873.0|
+---------+------------+--------------+------------------+
only showing top 5 rows

