# Proyecto

En este notebook se explorarán los datos del dataset de **Retail Data Analytics** propuesta en [Kaggle](https://www.kaggle.com/manjeetsingh/retaildataset?select=Features+data+set.csv) mediante el uso de funciones de Spark.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType, BooleanType, DoubleType, DateType
from pyspark.sql.functions import to_date, col, datediff, lit
import pyspark.sql.functions as sf

## Leyendo csv's:

### Explorando file system

In [0]:
#display(dbutils.fs.ls("mnt/etlp1a-vicmacbec1@hotmail.com-si/retail-org"))
display(dbutils.fs.ls("FileStore/tables"))

path,name,size
dbfs:/FileStore/tables/Features_data_set-1.csv,Features_data_set-1.csv,600478
dbfs:/FileStore/tables/Features_data_set.csv,Features_data_set.csv,600478
dbfs:/FileStore/tables/sales_data_set-1.csv,sales_data_set-1.csv,13264115
dbfs:/FileStore/tables/sales_data_set.csv,sales_data_set.csv,13264115
dbfs:/FileStore/tables/stores_data_set-1.csv,stores_data_set-1.csv,577
dbfs:/FileStore/tables/stores_data_set.csv,stores_data_set.csv,577


### Features data set.csv

In [0]:
%fs head FileStore/tables/Features_data_set.csv

In [0]:
# File location and type
file_location = "/FileStore/tables/Features_data_set.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
features = (spark.read.format(file_type)
            .option("inferSchema", infer_schema)
            .option("header", first_row_is_header)
            .option("sep", delimiter)
            .load(file_location))

display(features)

Store,Date,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,05/02/2010,42.31,2.572,,,,,,211.0963582,8.106,False
1,12/02/2010,38.51,2.548,,,,,,211.2421698,8.106,True
1,19/02/2010,39.93,2.514,,,,,,211.2891429,8.106,False
1,26/02/2010,46.63,2.561,,,,,,211.3196429,8.106,False
1,05/03/2010,46.5,2.625,,,,,,211.3501429,8.106,False
1,12/03/2010,57.79,2.667,,,,,,211.3806429,8.106,False
1,19/03/2010,54.58,2.72,,,,,,211.215635,8.106,False
1,26/03/2010,51.45,2.732,,,,,,211.0180424,8.106,False
1,02/04/2010,62.27,2.719,,,,,,210.8204499,7.808,False
1,09/04/2010,65.86,2.77,,,,,,210.6228574,7.808,False


In [0]:
features.printSchema()

Se observa que Date, MarkDown*, CPI y Unemployment están mal inferidos, por lo que se procede a declarar el esquema.

In [0]:
featuresSchema = StructType([
  StructField("Store", IntegerType(), True), 
  StructField("Date", StringType(), True),
  StructField("Temperature", DoubleType(), True),
  StructField("Fuel_Price", DoubleType(), True),
  StructField("MarkDown1", DoubleType(), True),
  StructField("MarkDown2", DoubleType(), True),
  StructField("MarkDown3", DoubleType(), True),
  StructField("MarkDown4", DoubleType(), True),
  StructField("MarkDown5", DoubleType(), True),
  StructField("CPI", DoubleType(), True),
  StructField("Unemployment", DoubleType(), True),
  StructField("IsHoliday", BooleanType(), True)
])

features2 = (spark.read
             .format(file_type)
             .option("header", first_row_is_header)
             .option("sep", delimiter)
             .schema(featuresSchema)
             .load(file_location))

display(features2)

Store,Date,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,05/02/2010,42.31,2.572,,,,,,211.0963582,8.106,False
1,12/02/2010,38.51,2.548,,,,,,211.2421698,8.106,True
1,19/02/2010,39.93,2.514,,,,,,211.2891429,8.106,False
1,26/02/2010,46.63,2.561,,,,,,211.3196429,8.106,False
1,05/03/2010,46.5,2.625,,,,,,211.3501429,8.106,False
1,12/03/2010,57.79,2.667,,,,,,211.3806429,8.106,False
1,19/03/2010,54.58,2.72,,,,,,211.215635,8.106,False
1,26/03/2010,51.45,2.732,,,,,,211.0180424,8.106,False
1,02/04/2010,62.27,2.719,,,,,,210.8204499,7.808,False
1,09/04/2010,65.86,2.77,,,,,,210.6228574,7.808,False


Cambiando el tipo de Date (String -> Date)

In [0]:
features3 = features2.withColumn("Date", to_date(col("Date"),"dd/MM/yyyy"))
#changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))
display(features3)

Store,Date,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,2010-02-05,42.31,2.572,,,,,,211.0963582,8.106,False
1,2010-02-12,38.51,2.548,,,,,,211.2421698,8.106,True
1,2010-02-19,39.93,2.514,,,,,,211.2891429,8.106,False
1,2010-02-26,46.63,2.561,,,,,,211.3196429,8.106,False
1,2010-03-05,46.5,2.625,,,,,,211.3501429,8.106,False
1,2010-03-12,57.79,2.667,,,,,,211.3806429,8.106,False
1,2010-03-19,54.58,2.72,,,,,,211.215635,8.106,False
1,2010-03-26,51.45,2.732,,,,,,211.0180424,8.106,False
1,2010-04-02,62.27,2.719,,,,,,210.8204499,7.808,False
1,2010-04-09,65.86,2.77,,,,,,210.6228574,7.808,False


In [0]:
features3.printSchema()

In [0]:
features3.count()

### sale data-set.csv

In [0]:
%fs head FileStore/tables/sales_data_set.csv

In [0]:
# File location and type
file_location = "/FileStore/tables/sales_data_set.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
sales = (spark.read.format(file_type)
         .option("inferSchema", infer_schema)
         .option("header", first_row_is_header)
         .option("sep", delimiter)
         .load(file_location))

display(sales)

Store,Dept,Date,Weekly_Sales,IsHoliday
1,1,05/02/2010,24924.5,False
1,1,12/02/2010,46039.49,True
1,1,19/02/2010,41595.55,False
1,1,26/02/2010,19403.54,False
1,1,05/03/2010,21827.9,False
1,1,12/03/2010,21043.39,False
1,1,19/03/2010,22136.64,False
1,1,26/03/2010,26229.21,False
1,1,02/04/2010,57258.43,False
1,1,09/04/2010,42960.91,False


In [0]:
sales.printSchema()

Se observa que Date está mal inferido, por lo que se procede a declarar el esquema.

In [0]:
salesSchema = StructType([
  StructField("Store", IntegerType(), True), 
  StructField("Dept", IntegerType(), True),
  StructField("Date", StringType(), True),
  StructField("Weekly_Sales", DoubleType(), True),
  StructField("IsHoliday", BooleanType(), True)
])

sales2 = (spark.read
          .format(file_type)
          .option("header", first_row_is_header)
          .option("sep", delimiter)
          .schema(salesSchema)
          .load(file_location))

display(sales2)

Store,Dept,Date,Weekly_Sales,IsHoliday
1,1,05/02/2010,24924.5,False
1,1,12/02/2010,46039.49,True
1,1,19/02/2010,41595.55,False
1,1,26/02/2010,19403.54,False
1,1,05/03/2010,21827.9,False
1,1,12/03/2010,21043.39,False
1,1,19/03/2010,22136.64,False
1,1,26/03/2010,26229.21,False
1,1,02/04/2010,57258.43,False
1,1,09/04/2010,42960.91,False


Cambiando el tipo de Date (String -> Date)

In [0]:
sales3 = sales2.withColumn("Date", to_date(col("Date"),"dd/MM/yyyy"))
#changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))
display(sales3)

Store,Dept,Date,Weekly_Sales,IsHoliday
1,1,2010-02-05,24924.5,False
1,1,2010-02-12,46039.49,True
1,1,2010-02-19,41595.55,False
1,1,2010-02-26,19403.54,False
1,1,2010-03-05,21827.9,False
1,1,2010-03-12,21043.39,False
1,1,2010-03-19,22136.64,False
1,1,2010-03-26,26229.21,False
1,1,2010-04-02,57258.43,False
1,1,2010-04-09,42960.91,False


In [0]:
sales3.printSchema()

In [0]:
sales3.count()

### stores data-set.csv

In [0]:
%fs head FileStore/tables/stores_data_set.csv

In [0]:
# File location and type
file_location = "/FileStore/tables/stores_data_set.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
stores = (spark.read.format(file_type)
          .option("inferSchema", infer_schema)
          .option("header", first_row_is_header)
          .option("sep", delimiter)
          .load(file_location))

display(stores)

Store,Type,Size
1,A,151315
2,A,202307
3,B,37392
4,A,205863
5,B,34875
6,A,202505
7,B,70713
8,A,155078
9,B,125833
10,B,126512


In [0]:
stores.printSchema()

Se observa que se inferió correctamente el esquema.

In [0]:
stores.count()

## Joinning datasets

In [0]:
# features3, sales3, stores
featuresSales = features3.join(sales3, on=['Store','Date','IsHoliday'], how='full') # Store, Date, IsHoliday

display(featuresSales)

Store,Date,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Dept,Weekly_Sales
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,1.0,14539.79
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,2.0,45341.92
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,3.0,32148.93
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,4.0,37449.94
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,5.0,21320.93
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,6.0,3124.83
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,7.0,14088.28
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,8.0,31061.21
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,9.0,18708.23
1,2011-08-12,False,90.76,3.638,,,,,,215.6057878,7.962,10.0,31383.62


In [0]:
featuresSales.count()

Dados los conteos de los registros de las tablas features3 (8,190) y sales3 (421,570), se observa que al hacer outerjoin de estas tablas, se obtienen (423,325) registros, lo que indica que hay registros en cada tabla que no tengan el id correspondiente entre sí.

In [0]:
featuresSalesStores = featuresSales.join(stores, on=['Store'], how='full') # Store

display(featuresSalesStores)

Store,Date,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Dept,Weekly_Sales,Type,Size
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,1.0,15169.49,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,2.0,58356.34,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,3.0,7289.98,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,4.0,31998.55,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,5.0,20564.76,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,6.0,3020.31,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,7.0,22689.32,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,8.0,26618.32,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,9.0,14553.3,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,10.0,19984.26,A,203750


In [0]:
featuresSalesStores.count()

## Explorando data

### Mostrando tabla

In [0]:
display(featuresSalesStores)

Store,Date,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Dept,Weekly_Sales,Type,Size
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,1.0,15169.49,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,2.0,58356.34,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,3.0,7289.98,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,4.0,31998.55,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,5.0,20564.76,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,6.0,3020.31,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,7.0,22689.32,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,8.0,26618.32,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,9.0,14553.3,A,203750
31,2010-06-18,False,86.18,2.637,,,,,,211.1096543,8.2,10.0,19984.26,A,203750


### Análisis estadístico descriptivo

In [0]:
featuresSalesStores.describe().display()

summary,Store,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Dept,Weekly_Sales,Type,Size
count,423325.0,423325.0,423325.0,152433.0,112532.0,138658.0,136466.0,153187.0,422740.0,422740.0,421570.0,421570.0,423325,423325.0
mean,22.20385991850233,60.06713962085865,3.361933245142616,7246.604247374274,3337.5972891266547,1449.0989315437912,3382.019834097878,4618.743782827524,171.21886315838344,7.957296073236492,44.26031738501317,15981.258123467032,,136701.2157940117
stddev,12.78624408303602,18.452598879854165,0.4580381068432779,8319.906707191132,9461.265420282907,9674.70523428954,6306.728467945709,6258.233495654179,39.16708815989315,1.8634333014166284,30.49205401578584,22711.18351916323,,60990.97780085184
min,1.0,-7.29,2.472,-2781.45,-265.76,-179.26,0.22,-185.17,126.064,3.684,1.0,-4988.94,A,34875.0
max,45.0,101.95,4.468,103184.98,104519.54,149483.31,67474.85,771448.1,228.9764563,14.313,99.0,693099.36,C,219622.0


### Realizando algunas agrupaciones

#### Groupby type

In [0]:
(featuresSalesStores
 .groupBy("Type")
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn("Weekly_Sales_Sum", sf.round(col("Weekly_Sales_Sum"), 2))
 .sort('Type')
 .display())

Type,Weekly_Sales_Sum
A,4331014722.75
B,2000700736.82
C,405503527.54


In [0]:
# featuresSalesStores.groupBy("type").agg({'Weekly_Sales':'avg'}).display()
(featuresSalesStores
 .groupBy("Type")
 .agg(sf.avg('Weekly_Sales').alias('Weekly_Sales_Avg'))
 .withColumn("Weekly_Sales_Avg", sf.round(col("Weekly_Sales_Avg"), 2))
 .sort('Type')
 .display())

Type,Weekly_Sales_Avg
A,20099.57
B,12237.08
C,9519.53


In [0]:
(featuresSalesStores
 .groupBy("Type")
 .count()
 .sort('Type')
 .display())

Type,count
A,216336
B,164158
C,42831


#### Groupby type & Date

In [0]:
(featuresSalesStores
 .groupBy(['Date','Type'])
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn('Weekly_Sales_Sum', sf.round(col('Weekly_Sales_Sum'), 2))
 .sort('Date','Type')
 .display())

Date,Type,Weekly_Sales_Sum
2010-02-05,A,32144126.25
2010-02-05,B,14775498.62
2010-02-05,C,2831115.63
2010-02-12,A,30982570.79
2010-02-12,B,14439178.59
2010-02-12,C,2914928.25
2010-02-19,A,31000072.76
2010-02-19,B,14540038.62
2010-02-19,C,2736882.4
2010-02-26,A,28008995.99


#### Groupby type & dept

In [0]:
(featuresSalesStores
 .groupBy(['Dept','Type'])
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn('Weekly_Sales_Sum', sf.round(col('Weekly_Sales_Sum'), 2))
 .sort('Dept','Type')
 .display())

Dept,Type,Weekly_Sales_Sum
,A,
,B,
,C,
1.0,A,72222369.29
1.0,B,43735819.94
1.0,C,7680587.31
2.0,A,163575247.15
2.0,B,104659404.51
2.0,C,12376522.77
3.0,A,43669730.25


### Plots

#### Type ~ Date

In [0]:
(featuresSalesStores
 .groupBy(['Date','Type'])
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn('Weekly_Sales_Sum', sf.round(col('Weekly_Sales_Sum'), 2))
 .sort('Date','Type')
 .display())

Date,Type,Weekly_Sales_Sum
2010-02-05,A,32144126.25
2010-02-05,B,14775498.62
2010-02-05,C,2831115.63
2010-02-12,A,30982570.79
2010-02-12,B,14439178.59
2010-02-12,C,2914928.25
2010-02-19,A,31000072.76
2010-02-19,B,14540038.62
2010-02-19,C,2736882.4
2010-02-26,A,28008995.99


#### Weekly Sales ~ Date

Analizando ventas semanales de departamentos. Se observa que las ventas del departamento 2 son constantes en el año y son las más altas, sin embargo los departamentos 4 y 7 son los que más venden en diciembre.

In [0]:
(featuresSalesStores
 .groupBy(['Date','Dept'])
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn('Weekly_Sales_Sum', sf.round(col('Weekly_Sales_Sum'), 2))
 .sort('Date','Dept')
 .display())

Date,Dept,Weekly_Sales_Sum
2010-02-05,1,881833.41
2010-02-05,2,1997831.89
2010-02-05,3,484368.9
2010-02-05,4,1205801.77
2010-02-05,5,1116952.54
2010-02-05,6,206793.28
2010-02-05,7,764967.96
2010-02-05,8,1527438.17
2010-02-05,9,535727.56
2010-02-05,10,773682.9


#### Boxplot

Analizando las ventas semanales por cada departamento. Se observa que el departamento 16 tiene una gran varianza de ventas semanales.

In [0]:
(featuresSalesStores
 .groupBy(['Date','Dept'])
 .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales_Sum'))
 .withColumn('Weekly_Sales_Sum', sf.round(col('Weekly_Sales_Sum'), 2))
 .sort('Date','Dept')
 .display())

Date,Dept,Weekly_Sales_Sum
2010-02-05,1,881833.41
2010-02-05,2,1997831.89
2010-02-05,3,484368.9
2010-02-05,4,1205801.77
2010-02-05,5,1116952.54
2010-02-05,6,206793.28
2010-02-05,7,764967.96
2010-02-05,8,1527438.17
2010-02-05,9,535727.56
2010-02-05,10,773682.9


## RFM Análisis (Recency, Frequency, Monetary)

Primero hay que seleccionar la información de interes:
- Store
- Date
- Weekly_Sales

Después hay que quitar nulos y agrupar los datos por Date y Store  y hacer la suma de Weekly_Sales.

In [0]:
RFM0 = (featuresSalesStores
       .select('Store', 'Date', 'Weekly_Sales')
       .dropna('any')
       .groupby(['Store','Date'])
       .agg(sf.sum('Weekly_Sales').alias('Weekly_Sales'))
       .withColumn('Weekly_Sales', sf.round(col('Weekly_Sales'), 2))
       )

RFM0.display()

Store,Date,Weekly_Sales
31,2011-06-24,1375962.46
28,2011-08-26,1166479.51
28,2011-05-13,1253316.3
28,2011-09-23,1109105.92
26,2011-09-02,1040143.14
27,2012-02-10,1651605.35
27,2011-04-15,1727175.61
1,2010-04-02,1594968.28
1,2010-03-05,1554806.68
1,2011-12-16,1881176.67


### Recency

Primero se obtiene la última fecha a analizar, en este caso será la útlima reportada.

In [0]:
max_date = RFM0.agg(sf.max('Date'))
max_date.display()

maxDate = max_date.groupBy('max(Date)').max().collect()[0] # Para tomar el valor del DataFrame

max(Date)
2012-10-26


In [0]:
max_date.printSchema()

Luego se revisa la última fecha registrada por cada tienda. En este caso se observa que todas las tiendas han comprado en la misma fecha que la fecha máxima global.

In [0]:
storesMaxDate = RFM0.groupby(['Store']).agg(sf.max('Date'))
display(storesMaxDate)

Store,max(Date)
31,2012-10-26
34,2012-10-26
28,2012-10-26
26,2012-10-26
27,2012-10-26
44,2012-10-26
12,2012-10-26
22,2012-10-26
1,2012-10-26
13,2012-10-26


In [0]:
storesMaxDate.printSchema()

Obteniendo la diferencia entre la máxima fecha global y la máxima fecha de cada tienda:

In [0]:
r = (RFM0
     .join(storesMaxDate, on = 'Store')
     .withColumn('Recency', datediff(to_date(lit(maxDate[0])), col('max(Date)')))
    )

display(r)
#recency.count()

Store,Date,Weekly_Sales,max(Date),Recency
31,2011-06-24,1375962.46,2012-10-26,0
31,2012-08-24,1379783.21,2012-10-26,0
31,2012-07-27,1303732.36,2012-10-26,0
31,2012-09-28,1279080.58,2012-10-26,0
31,2010-05-28,1309437.17,2012-10-26,0
31,2012-10-05,1363365.05,2012-10-26,0
31,2011-12-23,2026176.14,2012-10-26,0
31,2012-08-10,1386472.59,2012-10-26,0
31,2010-04-16,1367448.28,2012-10-26,0
31,2011-09-23,1347607.74,2012-10-26,0


### Frequency

Se obtiene la frecuencia de compras que hubo en el periodo dado. En este caso se observa como todas las tiendas tuvieron ventas todos los días.

In [0]:
f = (RFM0
    .groupby(['Store'])
    .count()
    .withColumn('Frequency',col('count'))
    )

f.display()

Store,count,Frequency
31,143,143
34,143,143
28,143,143
26,143,143
27,143,143
44,143,143
12,143,143
22,143,143
1,143,143
13,143,143


### Monetary

In [0]:
m = (RFM0
    .groupby('Store')
    .agg(sf.sum('Weekly_Sales').alias('Monetary'))
    .withColumn('Monetary', sf.round(col('Monetary'), 2))
    )

m.display()

Store,Monetary
31,199613905.5
34,138249763.0
28,189263680.58
26,143416393.79
27,253855916.88
44,43293087.84
12,144287230.15
22,147075648.57
1,222402808.85
13,286517703.8


### RFM

Juntando los DataFrames r, f, m con las tiendas.

In [0]:
RFM1 = (r.select('Store', 'Recency').distinct()
       .join(f.select('Store', 'Frequency'), on = 'Store')
       .join(m, on = 'Store')
       )

RFM1.display()

Store,Recency,Frequency,Monetary
31,0,143,199613905.5
34,0,143,138249763.0
28,0,143,189263680.58
26,0,143,143416393.79
27,0,143,253855916.88
44,0,143,43293087.84
12,0,143,144287230.15
22,0,143,147075648.57
1,0,143,222402808.85
13,0,143,286517703.8


## Saving data

In [0]:
#%sh
#pwd
#ls 
display(dbutils.fs.ls("FileStore/tables"))

path,name,size
dbfs:/FileStore/tables/Features_data_set-1.csv,Features_data_set-1.csv,600478
dbfs:/FileStore/tables/Features_data_set.csv,Features_data_set.csv,600478
dbfs:/FileStore/tables/sales_data_set-1.csv,sales_data_set-1.csv,13264115
dbfs:/FileStore/tables/sales_data_set.csv,sales_data_set.csv,13264115
dbfs:/FileStore/tables/stores_data_set-1.csv,stores_data_set-1.csv,577
dbfs:/FileStore/tables/stores_data_set.csv,stores_data_set.csv,577


In [0]:
workingDir = "FileStore/tables"
targetPath = f"{workingDir}/RFM"

RFM1.write.mode("OVERWRITE").parquet(targetPath)

In [0]:
display(dbutils.fs.ls(targetPath))

path,name,size
dbfs:/FileStore/tables/RFM/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/RFM/_committed_7903083045350027116,_committed_7903083045350027116,4002
dbfs:/FileStore/tables/RFM/_started_7903083045350027116,_started_7903083045350027116,0
dbfs:/FileStore/tables/RFM/part-00000-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43074-1-c000.snappy.parquet,part-00000-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43074-1-c000.snappy.parquet,572
dbfs:/FileStore/tables/RFM/part-00002-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43053-1-c000.snappy.parquet,part-00002-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43053-1-c000.snappy.parquet,1184
dbfs:/FileStore/tables/RFM/part-00011-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43067-1-c000.snappy.parquet,part-00011-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43067-1-c000.snappy.parquet,1183
dbfs:/FileStore/tables/RFM/part-00014-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43045-1-c000.snappy.parquet,part-00014-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43045-1-c000.snappy.parquet,1183
dbfs:/FileStore/tables/RFM/part-00019-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43042-1-c000.snappy.parquet,part-00019-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43042-1-c000.snappy.parquet,1260
dbfs:/FileStore/tables/RFM/part-00021-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43050-1-c000.snappy.parquet,part-00021-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43050-1-c000.snappy.parquet,1184
dbfs:/FileStore/tables/RFM/part-00024-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43046-1-c000.snappy.parquet,part-00024-tid-7903083045350027116-f21a6edd-d7a5-4227-b95c-ab5fd242fd80-43046-1-c000.snappy.parquet,1184
