<h1>
  Predicción de Churn de Clientes con MLlib de PySpark
</h1>

Una agencia de marketing ofrece a muchas compañías un servicio para producir anuncios en páginas webs. La agencia ha detectado que muchas de estas compañías acaban dándose de baja del servicio, por lo que interesa mejorar su índice de retención.

Para ello, el personal de la agencia ha pensado en aplicar algoritmos de Machine Learning que predigan los clientes que acabarán prescindiendo de sus servicios. De esta forma, podrán identificar cuáles son las compañías a las que les deberían asignar un ejecutivo de cuentas (Account Manager) para evitar que se marchen.



In [1]:
from pyspark.sql import SparkSession
import os

# Elegir el máster de Spark dependiendo de si se ha definido la variable de entorno HADOOP_CONF_DIR o YARN_CONF_DIR
SPARK_MASTER: str = 'yarn' if 'HADOOP_CONF_DIR' in os.environ or 'YARN_CONF_DIR' in os.environ else 'local[*]'

spark = SparkSession.builder \
    .appName('MLlibNaïveBayesProject') \
    .master(SPARK_MASTER) \
    .getOrCreate()

In [2]:
data = spark.read.csv('customer_churn.csv', header = True, inferSchema=True)

In [3]:
# Forzamos la redistribución del DataFrame en más particiones
data = data.repartition(2)
print("Número de particiones:", data.rdd.getNumPartitions())

Número de particiones: 2


En este proyecto usaremos modelos supervisados. Como nuestro DataFrame estará dividido en varias particiones necesitamos barajar los registros. Así evitaremos que todos los registros de una clase acaben en una única partición.

In [4]:
import pyspark.sql.functions as F

# Equivalente a una función shuffle()
data = data.orderBy(F.rand())

In [5]:
data.summary()

DataFrame[summary: string, Names: string, Age: string, Total_Purchase: string, Account_Manager: string, Years: string, Num_Sites: string, Location: string, Company: string, Churn: string]

## Información de las columnas

**Name**: Nombre del último contacto de la compañía

**Age**: Edad del cliente

**Total_Purchase**: Número de anuncios comprados

**Account_Manager**: 0 sin manager, 1 con manager asignado

**Years**: Número de años como cliente

**Num_sites**: Número de sitios web usando el servicio

**Onboard_date**: Fecha en la que el contacto más reciente de una empresa fue registrado en el sistema.

**Location**: Dirección de la sede central de la compañía.

**Company**: Nombre de la compañía.

**Churn**: Si el cliente se da de baja del servicio





In [6]:
# Imprimimos el esquema del dataframe
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [7]:
data.take(1)

[Row(Names='Tammy Pope', Age=45.0, Total_Purchase=9576.07, Account_Manager=1, Years=7.01, Num_Sites=8.0, Onboard_date=datetime.datetime(2007, 6, 7, 6, 17, 8), Location='9490 Kathryn Tunnel Suite 480 Shaneland, GA 26165-9694', Company='Hays, Henson and Shelton', Churn=0)]

In [8]:
from pyspark.sql.functions import col

# Comprobamos cuantos clientes han abandonado y cuantos han seguido
data.groupBy(col('Churn')).count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



**¡IMPORTANTE!**

El dataset está desbalanceado. Será necesario aplicar algún tipo de estratificación para compensar la falta de representación de clientes que abandonan el servicio.

Los dataframe tienen implementada una función RandomSplit para separar el conjunto de datos de entrenamiento del de test. Sin embargo, no asegura que exista la misma representación de etiquetas en ambos conjuntos. Por ello, vamos a implementar nuestra propia función que segregue los datos.

In [9]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def stratified_split(df, label_col, train_frac=0.8, seed=42):
    """
    Realiza una partición estratificada en conjuntos de entrenamiento y prueba.

    Argumentos:
        df (DataFrame): El DataFrame original.
        label_col (str): Nombre de la columna de clase (por ejemplo, "Churn").
        train_frac (float): Fracción de datos que irá al conjunto de entrenamiento.
        seed (int): Semilla para la aleatoriedad.

    Devuelve:
        Tuple[DataFrame, DataFrame]: (train_df, test_df)
    """

    # Obtenemos los valores únicos de la clase (por ejemplo 0 y 1)
    labels = [row[label_col] for row in df.select(label_col).distinct().collect()]

    # Listas donde iremos guardando los datos segregados
    train_parts = []
    test_parts = []

    for label in labels:
        # Filtramos por etiqueta
        subset = df.filter(col(label_col) == label)

        # Del conjunto del dataframe filtrado muestreamos una proporción -> conjunto train
        train_subset = subset.sample(withReplacement=False, fraction=train_frac, seed=seed)

        # El resto del conjunto filtrado va para test
        test_subset = subset.subtract(train_subset)

        train_parts.append(train_subset)
        test_parts.append(test_subset)

    # Unimos los subconjuntos estratificados
    train_df = train_parts[0].unionByName(train_parts[1])
    test_df = test_parts[0].unionByName(test_parts[1])

    return train_df, test_df


## Transformación de la columna `Onboard_date`

La columna almacena la fecha en la que se registró el contacto. Sería más interesante calcular el tiempo que ha pasado desde entonces, porque podríamos establecer una relación entre esta cantidad de tiempo con el hecho de si el cliente abandona el servicio.

In [10]:
data.select('Onboard_date').show()

+-------------------+
|       Onboard_date|
+-------------------+
|2015-09-04 11:32:49|
|2009-09-21 12:50:42|
|2014-10-11 14:08:10|
|2016-07-29 08:39:49|
|2013-02-14 03:05:33|
|2011-09-01 16:47:27|
|2014-06-20 05:10:09|
|2009-04-13 22:54:21|
|2006-11-15 14:58:19|
|2007-11-25 17:41:30|
|2016-03-05 13:19:04|
|2012-03-26 06:23:51|
|2010-04-23 00:50:01|
|2010-04-21 15:26:30|
|2008-03-25 15:58:18|
|2006-08-15 20:48:58|
|2014-02-17 08:26:10|
|2012-08-02 14:55:41|
|2011-05-07 04:27:00|
|2010-11-05 09:22:13|
+-------------------+
only showing top 20 rows



In [11]:
import datetime

colOnboard = col('Onboard_date')

today_date = datetime.datetime.today()
difDayfn = F.udf(lambda x: (today_date-x).days)
data = data.withColumn('Days_Since_Last_Contact', difDayfn(colOnboard).cast('int'))
data.select('Days_Since_Last_Contact').printSchema()

root
 |-- Days_Since_Last_Contact: integer (nullable = true)



## Preprocesamiento de datos
A nuestro algoritmo Naïve Bayes de la librería MLlib solo le podemos aportar los predictores a través de una única columna (no 6 columnas). Es por ello que usaremos el transformador VectorAssembler, el cual reunirá la información de las 5 columnas en una columna que almacena vectores.

Las variables que usaremos para predecir si un cliente prescinde de los servicios de la agencia serán `Age`, `Total_Purchase`, `Account_Manager`, `Years` y `Num_Sites`. Todas son numéricas, a excepción de `Account_Manager`. Aunque `Account_Manager` no sigue una distribución normal si no Bernoulli, voy a probar experimentalmente si incluirla en nuestra clasificador Naïve Bayes Gaussiano mejora los resultados de la clasificación.

In [12]:
# Renombramos la variable Churn como label (la etiqueta que queremos predecir)
data = data.withColumnRenamed("Churn", "label")

In [13]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'Days_Since_Last_Contact']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
data_transformed = assembler.transform(data)

In [14]:
data_transformed.select('features_vec').show(truncate=False)

+--------------------------------+
|features_vec                    |
+--------------------------------+
|[34.0,9779.12,5.89,9.0,5675.0]  |
|[54.0,8092.55,4.55,10.0,4369.0] |
|[34.0,7324.32,6.69,10.0,3774.0] |
|[46.0,14664.0,6.54,8.0,4581.0]  |
|[51.0,6114.85,4.7,9.0,4175.0]   |
|[45.0,9034.21,6.89,9.0,5049.0]  |
|[41.0,9552.57,3.81,8.0,3723.0]  |
|[50.0,13247.31,3.48,6.0,5408.0] |
|[48.0,10963.5,5.89,9.0,6249.0]  |
|[45.0,9951.97,4.4,6.0,3667.0]   |
|[51.0,12780.22,6.06,7.0,3665.0] |
|[46.0,13725.55,5.09,9.0,6983.0] |
|[44.0,12155.91,4.11,5.0,5738.0] |
|[36.0,8972.54,5.23,8.0,5783.0]  |
|[48.0,11316.41,4.74,10.0,6115.0]|
|[35.0,10801.37,5.64,11.0,3786.0]|
|[47.0,7222.35,6.41,11.0,3581.0] |
|[32.0,10716.75,5.12,8.0,6775.0] |
|[45.0,11486.53,5.5,7.0,5608.0]  |
|[54.0,13184.51,6.04,8.0,4409.0] |
+--------------------------------+
only showing top 20 rows



In [15]:
data_transformed.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- Days_Since_Last_Contact: integer (nullable = true)
 |-- features_vec: vector (nullable = true)



## Segregación de datos

In [16]:
train_data, test_data = stratified_split(data_transformed, label_col='label', seed=42)

## Creación y entrenamiento del modelo

In [17]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(featuresCol='features_vec', labelCol='label', modelType="gaussian")
model = nb.fit(train_data)


## Evaluación del modelo

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


Accuracy: 0.9172413793103448


### Ahora probamos incluyendo `Account_Manager`

In [19]:
# Incluimos la variable binaria Account_Manager
feature_cols = ['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites', 'Days_Since_Last_Contact']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")

data_transformed = assembler.transform(data)
data_transformed.select('features_vec').show(truncate=False)

# Segregamos los datos en un conjunto de entrenamiento y test
train_data, test_data = stratified_split(data_transformed, label_col='label', seed=42)

# Entrenamos el modelo Naïve Bayes
nb = NaiveBayes(featuresCol='features_vec', labelCol='label', modelType="gaussian")
model2 = nb.fit(train_data)

# Realizamos las predicciones
predictions = model2.transform(test_data)

# Evaluamos el modelo
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")



+------------------------------------+
|features_vec                        |
+------------------------------------+
|[45.0,8595.24,0.0,6.4,10.0,4542.0]  |
|[32.0,9472.72,1.0,1.0,10.0,5865.0]  |
|[41.0,7777.37,0.0,4.81,12.0,4104.0] |
|[45.0,7351.38,0.0,5.76,11.0,3113.0] |
|[42.0,9635.87,1.0,2.43,9.0,4145.0]  |
|[39.0,13532.85,1.0,5.63,9.0,5268.0] |
|[47.0,13359.2,1.0,4.55,8.0,3183.0]  |
|[45.0,11170.06,1.0,7.76,10.0,5193.0]|
|[48.0,10241.32,1.0,7.25,7.0,3822.0] |
|[44.0,9723.71,0.0,4.34,9.0,4673.0]  |
|[50.0,10850.78,1.0,5.63,9.0,5416.0] |
|[32.0,9036.27,0.0,7.14,11.0,5405.0] |
|[43.0,6065.64,1.0,4.86,7.0,5194.0]  |
|[49.0,6283.67,0.0,6.35,11.0,6674.0] |
|[39.0,10741.9,0.0,5.48,8.0,4421.0]  |
|[34.0,8772.26,0.0,5.78,9.0,4079.0]  |
|[44.0,11331.58,1.0,5.23,11.0,3054.0]|
|[48.0,9722.23,1.0,5.98,6.0,4133.0]  |
|[47.0,7364.12,1.0,3.65,9.0,4849.0]  |
|[55.0,8907.17,1.0,7.6,11.0,3386.0]  |
+------------------------------------+
only showing top 20 rows

Accuracy: 0.9103448275862069


Finalmente, añadir la variable binaria no ha supuesto ninguna mejora de la calidad del modelo. Lo cierto es que usar un modelo Naïve Bayes suele funcionar bien cuando los tipos de datos son homogéneos. En su lugar, podríamos optar por usar un modelo de regresión logística, el cual sí tolera esta hetoregeneidad en los datos.

# Uso del modelo de regresión logística

Esta vez plantearemos el modelo a partir de un pipeline. Los stages estarán formados por el VectorAssembler, el StandardScaler y el modelo de regresión logística.

In [20]:
# Escalamos los datos
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features_vec", outputCol="features_scaled")

In [21]:
# Creamos el modelo de regresión logística
from pyspark.ml.classification import LogisticRegression
logReg = LogisticRegression(featuresCol="features_scaled", labelCol="label")


In [22]:
# Creamos el pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[scaler,logReg])

In [23]:
model = pipeline.fit(train_data)

# Realizamos las predicciones
predictions = model.transform(test_data)

# Evaluamos el modelo
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.9103448275862069


**Conclusión**

El modelo Gaussiano de Naïve Bayes nos ha aportado un mayor rendimiento que el modelo de regresión logística. Esto podría deberser al gran número de variables numéricas con las que contaba nuestro dataset, aunque algunas fueran discretas y binarias.