# Resumen de la actividad

Una empresa de e-commerce, en nuestro rol de Data Scientist, nos ha encargado diseñar e implementar una solución predictiva de Machine Learning escalable usando Apache Spark MLib. En particular, se nos ha solicitado desarrollar todo el proceso desde la preparación de los datos hasta la evaluación del modelo.

# Dataset

Para esta actividad se usará el dataset disponible en [este link](https://www.kaggle.com/datasets/adilshamim8/online?resource=download). Este recopila información sobre la intención de compra de diversos usuarios, obtenido de las métricas de sesión web de Google Analytics.

En nuestro caso particular, la variable objetivo es "Revenue", que indica si un usuario realizó o no una compra en la página.

# Desarrollo

Comenzamos importando las librerías necesarias.

In [1]:
import findspark
findspark.init()

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    BooleanType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType
)
from pyspark.sql.functions import isnan, isnull, when, count, col, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
spark = SparkSession.builder.appName("RevenueEstimator").config("spark.driver.bindAddress", "localhost").master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

Ahora cargamos el archivo como un dataframe, especificando el esquema de nuestro conjunto de datos.

In [4]:
path = os.path.join(".", "data", "online_shoppers_intention.csv")
schema = StructType([
    StructField('Administrative', IntegerType(), True),
    StructField('Administrative_Duration', FloatType(), True),
    StructField('Informational', IntegerType(), True),
    StructField('Informational_Duration', FloatType(), True),
    StructField('ProductRelated', IntegerType(), True),
    StructField('ProductRelated_Duration', FloatType(), True),
    StructField('BounceRates', FloatType(), True),
    StructField('ExitRates', FloatType(), True),
    StructField('PageValues', FloatType(), True),
    StructField('SpecialDay', FloatType(), True),
    StructField('Month', StringType(), True),
    StructField('OperatingSystems', IntegerType(), True),
    StructField('Browser', IntegerType(), True),
    StructField('Region', IntegerType(), True),
    StructField('TrafficType', IntegerType(), True),
    StructField('VisitorType', StringType(), True),
    StructField('Weekend', BooleanType(), True),
    StructField('Revenue', StringType(), True),
])
df = spark.read.csv(path, header=True, schema=schema)
df = df.replace(["True", "False"], ["1", "0"], subset=["Revenue"]).withColumn("Revenue", col("Revenue").cast("float"))

In [5]:
(df.count(), len(df.columns))

(12330, 18)

In [6]:
df

Administrative,Administrative_Duration,Informational,Informational_Duration,ProductRelated,ProductRelated_Duration,BounceRates,ExitRates,PageValues,SpecialDay,Month,OperatingSystems,Browser,Region,TrafficType,VisitorType,Weekend,Revenue
0,0.0,0,0.0,6,1156.5,0.0,0.03333333,0.0,0.0,Nov,2,2,1,20,Returning_Visitor,False,0.0
4,52.0,1,7.0,46,3087.0,0.003773585,0.021383649,16.946438,0.0,Mar,2,2,3,8,Returning_Visitor,False,1.0
4,106.5,0,0.0,12,806.25,0.0125,0.029166667,0.0,0.0,Dec,2,2,1,2,Returning_Visitor,False,0.0
9,497.16666,0,0.0,20,1170.1666,0.012121212,0.023484848,0.0,0.0,Nov,3,2,3,2,Returning_Visitor,False,0.0
0,0.0,0,0.0,3,17.0,0.0,0.03333333,0.0,0.0,Mar,1,1,1,3,Returning_Visitor,False,0.0
0,0.0,0,0.0,12,363.0,0.016666668,0.047222223,0.0,0.0,Nov,4,1,4,2,Returning_Visitor,False,0.0
1,11.5,1,94.0,98,6212.7793,0.024162985,0.033991575,0.0,0.0,Nov,3,2,4,10,Returning_Visitor,True,0.0
9,174.20833,2,383.75,151,9018.124,0.012820513,0.02459296,0.0,0.0,Nov,1,2,1,1,Returning_Visitor,True,0.0
0,0.0,0,0.0,9,165.0,0.0,0.022222223,0.0,1.0,May,2,2,4,4,Returning_Visitor,True,0.0
0,0.0,0,0.0,6,118.333336,0.03333333,0.044444446,0.0,0.0,Nov,3,2,1,3,Returning_Visitor,True,0.0


In [7]:
df.select([
    count(
        when(isnan(c) | isnull(c), c)
    ).alias(c) for (c,c_type) in df.dtypes if c_type not in ('timestamp', 'string', 'date', 'boolean', 'binary')
])

Administrative,Administrative_Duration,Informational,Informational_Duration,ProductRelated,ProductRelated_Duration,BounceRates,ExitRates,PageValues,SpecialDay,OperatingSystems,Browser,Region,TrafficType,Revenue
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Continuamos definiendo el pipeline que usaremos para preprocesar nuestro conjunto de datos y alimentar nuestro modelo de clasificación. En este caso, usaremos un modelo de Random Forest.

Dado que tenemos un par de columnas de tipo string, las convertiremos en columnas categóricas usando un StringIndexer, y luego las convertiremos en variables numéricas usando OneHot Encoder.

Cabe señalar que, como los modelos basados en árboles de decisión no son sensibles a la escala de los datos, en nuestro caso no será necesario ejecutar un reescalamiento, por lo que este será el único paso de preprocesamiento necesario.

Además, como no hay valores faltantes en nuestro DataFrame, tampoco es necesario imputar los datos.

In [8]:
month_indexer = StringIndexer(inputCol='Month', outputCol='IndexedMonth')
visitor_type_indexer = StringIndexer(inputCol='VisitorType', outputCol='IndexedVisitorType')

month_ohe = OneHotEncoder(inputCol=month_indexer.getOutputCol(), outputCol='MonthOneHot')
visitor_type_ohe = OneHotEncoder(inputCol=visitor_type_indexer.getOutputCol(), outputCol='VisitorTypeOneHot')


input_cols = df.columns[:-1]
input_cols.remove("Month")
input_cols.remove("VisitorType")
assembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="features"
)

rf = RandomForestClassifier(featuresCol="features", labelCol='Revenue')
pipeline = Pipeline(
    stages=[
        month_indexer,
        visitor_type_indexer,
        month_ohe,
        visitor_type_ohe,
        assembler,
        rf
    ]
)

param_grid = ParamGridBuilder().baseOn(
    {rf.seed: 0}
).addGrid(
    rf.maxDepth, [10, 15, 20]
).addGrid(
    rf.impurity, ['gini', 'entropy']
).addGrid(
    rf.numTrees, [20, 40]
).build()

evaluator = BinaryClassificationEvaluator(labelCol="Revenue", metricName='areaUnderROC')

validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=os.cpu_count() if os.cpu_count() is not None else 1
)

Ahora procedemos a separar el dataframe en un conjunto de entrenamiento y prueba, refinar los hiperparámetros del modelo, entrenarlo y evaluarlo. Como nuestro conjunto de datos es relativamente grande (alrededor de 12.000 observaciones) lo dividiremos en una proporción 60%-40%.

In [9]:
train_data = df.sampleBy("Revenue", fractions={0.0: 0.6, 1.0: 0.6}, seed=10)
test_data = df.subtract(train_data)   

validator_model = validator.fit(train_data)

In [10]:
evaluator.evaluate(validator_model.transform(train_data))

0.9668735679963163

In [11]:
evaluator.evaluate(validator_model.transform(test_data))

0.9154664692193775

Los resultados obtenidos son prometedores, con un valor bastante alto para la métrica ROC-AUC. A pesar de ello, podemos notar que el modelo se encuentra un poco sobreajustado, dado que el desempeño en el conjunto de entrenamiento es significativamente mejor que en el conjunto de prueba.

Dado lo anterior, se recomienda analizar las variables utilizadas para eliminar cualquier caso de multicolinearidad, además de optimizar los hiperparámetros del modelo sobre una colección mayor de parámetros, o usar spark en combinación con un optimizador más eficiente, como [optuna](https://optuna.org/).