# Laboratorio 11 - Data Science
## Proyecto de Consultoría Regresión Logística
- Nelson García Bravatti
- Christian Echeverría


In [17]:
!pip -q install pyspark

In [18]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator


import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("Lab DF — Abandono").getOrCreate()
print("Spark version:", spark.version)

Spark version: 3.5.1


In [19]:
file_path = 'abandono_clientes.csv'

df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(file_path)
)

# Ver las primeras filas del dataframe
df.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

In [20]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [21]:
file_path2 = 'clientes_nuevos.csv'

test_df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(file_path2)
)

# Ver las primeras filas del dataframe
test_df.show(5)

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
|Megan Ferguson|32.0|        6487.5|              0|  9.4|     14.0|2016-10-28 05:32:13|922 Wright Branch...|   Sexton-Golden|
|  Taylor Young|32.0|      13147.71|              1| 10.0|      8.0|2012-03-20 00:36:46|Unit 0789 Box 073...|  

In [22]:
test_df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



### Análisis del Dataset:

In [23]:
# ---------- 1) Dimensión, duplicados ----------
n_rows = df.count()
n_cols = len(df.columns)
print(f"Filas: {n_rows}  |  Columnas: {n_cols}")

dup_company = (df.groupBy("Company").count()
                 .filter(F.col("count") > 1)
                 .count())
print(f"Empresas duplicadas por 'Company': {dup_company}")

Filas: 900  |  Columnas: 10
Empresas duplicadas por 'Company': 23


In [24]:
# ---------- 2) Nulos y tipos ----------
nulls = []
for c in df.columns:
    nulls.append((c, df.filter(F.col(c).isNull()).count()))
spark.createDataFrame(nulls, ["columna", "nulos"]).show(n=100, truncate=False)

# Rangos rápidos (numéricas)
num_cols = ["Age","Total_Purchase","Years","Num_Sites"]
(
  df.select([F.min(c).alias(f"min_{c}") for c in num_cols] +
            [F.max(c).alias(f"max_{c}") for c in num_cols] +
            [F.avg(c).alias(f"avg_{c}") for c in num_cols])
).show(truncate=False)



+---------------+-----+
|columna        |nulos|
+---------------+-----+
|Names          |0    |
|Age            |0    |
|Total_Purchase |0    |
|Account_Manager|0    |
|Years          |0    |
|Num_Sites      |0    |
|Onboard_date   |0    |
|Location       |0    |
|Company        |0    |
|Churn          |0    |
+---------------+-----+

+-------+------------------+---------+-------------+-------+------------------+---------+-------------+-----------------+------------------+----------------+-----------------+
|min_Age|min_Total_Purchase|min_Years|min_Num_Sites|max_Age|max_Total_Purchase|max_Years|max_Num_Sites|avg_Age          |avg_Total_Purchase|avg_Years       |avg_Num_Sites    |
+-------+------------------+---------+-------------+-------+------------------+---------+-------------+-----------------+------------------+----------------+-----------------+
|22.0   |100.0             |1.0      |3.0          |65.0   |18026.01          |9.15     |14.0         |41.81666666666667|10062.82403333

In [25]:
# ---------- 3) Balance de clases ----------
class_balance = (df.groupBy("Churn").count()
                   .withColumn("prop", F.col("count")/n_rows))
class_balance.show()


+-----+-----+-------------------+
|Churn|count|               prop|
+-----+-----+-------------------+
|    1|  150|0.16666666666666666|
|    0|  750| 0.8333333333333334|
+-----+-----+-------------------+



In [26]:
# ---------- 4) Stats por clase ----------
stats_by_churn = (
    df.groupBy("Churn")
      .agg(
          F.count("*").alias("n"),
          *[F.mean(c).alias(f"mean_{c}") for c in num_cols],
          *[F.expr(f"percentile_approx({c}, 0.5)").alias(f"p50_{c}") for c in num_cols],
          *[F.expr(f"percentile_approx({c}, array(0.25,0.75))").alias(f"IQR_{c}") for c in num_cols]
      )
)
stats_by_churn.show(truncate=False)

+-----+---+-----------------+-------------------+------------------+-----------------+-------+------------------+---------+-------------+------------+-------------------+------------+-------------+
|Churn|n  |mean_Age         |mean_Total_Purchase|mean_Years        |mean_Num_Sites   |p50_Age|p50_Total_Purchase|p50_Years|p50_Num_Sites|IQR_Age     |IQR_Total_Purchase |IQR_Years   |IQR_Num_Sites|
+-----+---+-----------------+-------------------+------------------+-----------------+-------+------------------+---------+-------------+------------+-------------------+------------+-------------+
|1    |150|42.99333333333333|10192.179933333337 |5.8835999999999995|10.66            |43.0   |10271.19          |5.79     |11.0         |[38.0, 47.0]|[8563.24, 11758.69]|[5.12, 6.68]|[10.0, 12.0] |
|0    |750|41.58133333333333|10036.952853333332 |5.1510666666666625|8.173333333333334|41.0   |9993.5            |5.08     |8.0          |[37.0, 46.0]|[8475.8, 11764.35] |[4.36, 5.99]|[7.0, 9.0]   |
+-----+---

In [27]:
# ---------- 5) Ingeniería de fecha: Tenure desde Onboard_date ----------
max_date = df.agg(F.max("Onboard_date").alias("maxd")).collect()[0]["maxd"]
df2 = df.withColumn("Tenure_years_from_onboard",
                    (F.datediff(F.lit(max_date), F.col("Onboard_date"))/365.25).cast("double"))

(
  df2.select(
      F.avg(F.abs(F.col("Tenure_years_from_onboard") - F.col("Years"))).alias("MAE_Years_vs_Tenure"),
      F.corr("Tenure_years_from_onboard","Years").alias("corr_Years_vs_Tenure")
  ).show()
)


+-------------------+--------------------+
|MAE_Years_vs_Tenure|corr_Years_vs_Tenure|
+-------------------+--------------------+
| 2.9261441630542295| 0.04491650416398004|
+-------------------+--------------------+



In [28]:
# ---------- 6) Leakage check: Account_Manager  ----------
# Tabla de contingencia y proporciones
ct = (df.groupBy("Account_Manager","Churn").count()
        .withColumn("prop", F.col("count")/F.sum("count").over(Window.partitionBy("Account_Manager"))))
ct.orderBy("Account_Manager","Churn").show()

# Chi-cuadrado
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml import Pipeline

# Preparar vector de features categóricas y label
assembler = VectorAssembler(inputCols=["Account_Manager"], outputCol="features")
chi_df = assembler.setHandleInvalid("keep").transform(df.select("Account_Manager","Churn").dropna())
chi = ChiSquareTest.test(chi_df, "features", "Churn").head()
print(f"Chi2 p-value Account_Manager ~ Churn: {chi.pValues[0]}")


+---------------+-----+-----+-------------------+
|Account_Manager|Churn|count|               prop|
+---------------+-----+-----+-------------------+
|              0|    0|  401| 0.8586723768736617|
|              0|    1|   66|0.14132762312633834|
|              1|    0|  349| 0.8060046189376443|
|              1|    1|   84|0.19399538106235567|
+---------------+-----+-----+-------------------+

Chi2 p-value Account_Manager ~ Churn: 0.03414770918874699


In [29]:
# ---------- 7) Correlaciones numéricas ----------
vec_assembler = VectorAssembler(inputCols=num_cols, outputCol="features_num")
num_vec = vec_assembler.setHandleInvalid("skip").transform(df.select(num_cols).dropna())
corr_mat = Correlation.corr(num_vec, "features_num", "pearson").head()[0]  # DenseMatrix
# Imprimir matriz con nombres
def pretty_corr(names, m: DenseMatrix):
    arr = m.toArray().tolist()
    header = "        " + "  ".join([f"{c:>14}" for c in names])
    print(header)
    for i, row in enumerate(arr):
        print(f"{names[i]:>8}  " + "  ".join([f"{v:14.3f}" for v in row]))
pretty_corr(num_cols, corr_mat)


                   Age  Total_Purchase           Years       Num_Sites
     Age           1.000          -0.037           0.006          -0.006
Total_Purchase          -0.037           1.000          -0.006          -0.003
   Years           0.006          -0.006           1.000           0.052
Num_Sites          -0.006          -0.003           0.052           1.000


In [30]:
# ---------- 8) Cardinalidad de categorías ----------
(
  df.agg(
    F.countDistinct("Names").alias("uniq_Names"),
    F.countDistinct("Location").alias("uniq_Location"),
    F.countDistinct("Company").alias("uniq_Company")
  ).show()
)

# Top ubicaciones (si aporta algo regional)
(df.groupBy("Location")
   .agg(F.count("*").alias("n"), F.mean("Churn").alias("churn_rate"))
   .orderBy(F.desc("n"))
   .show(20, truncate=False))



+----------+-------------+------------+
|uniq_Names|uniq_Location|uniq_Company|
+----------+-------------+------------+
|       899|          900|         873|
+----------+-------------+------------+

+---------------------------------------------------------------+---+----------+
|Location                                                       |n  |churn_rate|
+---------------------------------------------------------------+---+----------+
|062 Trevor Falls Suite 665 North Mathewchester, MH 93744       |1  |1.0       |
|066 Jenkins Walks Barbaramouth, LA 76409                       |1  |0.0       |
|45946 Day Springs Mendozastad, NJ 46404                        |1  |1.0       |
|143 Andrea Flat Lake Michael, ID 33149                         |1  |0.0       |
|Unit 2093 Box 1530 DPO AA 53596-7800                           |1  |0.0       |
|399 Herbert Key Port Thomas, PR 14265                          |1  |0.0       |
|104 Ruben Rapid Apt. 107 New Andrea, FM 58602                  |1  |0

### Division del dataset y preparación:


In [31]:
# ancla temporal = fecha máxima observada (consistente con tu EDA)
anchor_date = df.agg(F.max("Onboard_date").alias("maxd")).collect()[0]["maxd"]

# Ingeniería de variables
df_feat = (
    df
    .withColumn(
        "Tenure_from_onboard",
        (F.datediff(F.lit(anchor_date), F.col("Onboard_date"))/F.lit(365.25)).cast("double")
    )
    .withColumn("Sites_per_Year", (F.col("Num_Sites")/(F.col("Years")+F.lit(1e-6))).cast("double"))
    .withColumn("Purchase_per_Site", (F.col("Total_Purchase")/(F.col("Num_Sites")+F.lit(1e-6))).cast("double"))
)

# Columnas finales por variante
features_base = [
    "Age", "Total_Purchase", "Years", "Num_Sites",
    "Tenure_from_onboard", "Sites_per_Year", "Purchase_per_Site"
]
features_with_am = features_base + ["Account_Manager"]


In [32]:
# Añadir columna aleatoria por clase para split estratificado
pos = df_feat.filter(F.col("Churn") == 1).withColumn("rand", F.rand(seed=42))
neg = df_feat.filter(F.col("Churn") == 0).withColumn("rand", F.rand(seed=42))

train = pos.filter("rand <= 0.7").unionByName(neg.filter("rand <= 0.7")).drop("rand")
test  = pos.filter("rand > 0.7").unionByName(neg.filter("rand > 0.7")).drop("rand")

print("Train count:", train.count(), "| Test count:", test.count())
train.groupBy("Churn").count().show()
test.groupBy("Churn").count().show()


Train count: 659 | Test count: 241
+-----+-----+
|Churn|count|
+-----+-----+
|    1|  104|
|    0|  555|
+-----+-----+

+-----+-----+
|Churn|count|
+-----+-----+
|    1|   46|
|    0|  195|
+-----+-----+



In [33]:
# Pesos de clase: mayor peso a la clase positiva (churn=1)
n_train = train.count()
n_pos = train.filter(F.col("Churn") == 1).count()
n_neg = n_train - n_pos
pos_weight = n_neg / float(n_pos)  # ~ proporción negativas/positivas

train_w = train.withColumn(
    "classWeightCol",
    F.when(F.col("Churn") == 1, F.lit(pos_weight)).otherwise(F.lit(1.0))
)

# Assembler por variante
assembler_A = VectorAssembler(inputCols=features_base, outputCol="features", handleInvalid="skip")
assembler_B = VectorAssembler(inputCols=features_with_am, outputCol="features", handleInvalid="skip")


### Modelo Predictivo

In [34]:
lr = LogisticRegression(
    featuresCol="features",
    labelCol="Churn",
    weightCol="classWeightCol",  # usa pesos
    predictionCol="prediction",
    probabilityCol="probability",
    rawPredictionCol="rawPrediction",
    maxIter=200
)

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # Ridge/Lasso/Mix
    .build()
)

evaluator_pr = BinaryClassificationEvaluator(
    labelCol="Churn",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"
)

pipeline = Pipeline(stages=[assembler_A, lr])

tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_pr,
    trainRatio=0.8,
    parallelism=2
)

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="Churn",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

model = tvs.fit(train_w)
preds = model.transform(test)

auc_pr  = evaluator_pr.evaluate(preds)
auc_roc = evaluator_roc.evaluate(preds)
print(f"[LR - CON Account_Manager] PR-AUC: {auc_pr:.4f} | ROC-AUC: {auc_roc:.4f}")


[LR - CON Account_Manager] PR-AUC: 0.7991 | ROC-AUC: 0.9172


In [36]:
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array  # Disponible en Spark 3.0+

# Crea una columna con p(churn=1)
preds_fix = preds.withColumn("prob1", vector_to_array("probability").getItem(1))

def metrics_at_threshold(pred_df, thr, prob_col="prob1", label_col="Churn"):
    scored = pred_df.withColumn("yhat", (F.col(prob_col) >= F.lit(thr)).cast("int"))
    agg = (scored
           .groupBy()
           .agg(
               F.sum(F.when((F.col(label_col)==1) & (F.col("yhat")==1), 1).otherwise(0)).alias("TP"),
               F.sum(F.when((F.col(label_col)==0) & (F.col("yhat")==1), 1).otherwise(0)).alias("FP"),
               F.sum(F.when((F.col(label_col)==0) & (F.col("yhat")==0), 1).otherwise(0)).alias("TN"),
               F.sum(F.when((F.col(label_col)==1) & (F.col("yhat")==0), 1).otherwise(0)).alias("FN"),
           )).collect()[0]
    TP, FP, TN, FN = agg["TP"], agg["FP"], agg["TN"], agg["FN"]
    precision = TP / float(TP+FP) if (TP+FP)>0 else 0.0
    recall    = TP / float(TP+FN) if (TP+FN)>0 else 0.0
    f1        = (2*precision*recall)/float(precision+recall) if (precision+recall)>0 else 0.0
    return TP, FP, TN, FN, precision, recall, f1

thresholds = [x/100.0 for x in range(10, 91, 5)]
rows = [ (t, *metrics_at_threshold(preds_fix, t)) for t in thresholds ]

schema = ["threshold","TP","FP","TN","FN","precision","recall","F1"]
spark.createDataFrame(rows, schema).orderBy(F.desc("F1")).show(truncate=False)

best_row = sorted(rows, key=lambda x: x[-1], reverse=True)[0]
best_thr = best_row[0]
print(f"Mejor umbral por F1: {best_thr:.2f}")


+---------+---+---+---+---+-------------------+--------------------+--------------------+
|threshold|TP |FP |TN |FN |precision          |recall              |F1                  |
+---------+---+---+---+---+-------------------+--------------------+--------------------+
|0.55     |39 |24 |171|7  |0.6190476190476191 |0.8478260869565217  |0.7155963302752294  |
|0.65     |30 |8  |187|16 |0.7894736842105263 |0.6521739130434783  |0.7142857142857143  |
|0.6      |33 |18 |177|13 |0.6470588235294118 |0.717391304347826   |0.6804123711340206  |
|0.5      |40 |36 |159|6  |0.5263157894736842 |0.8695652173913043  |0.6557377049180327  |
|0.45     |41 |48 |147|5  |0.4606741573033708 |0.8913043478260869  |0.6074074074074074  |
|0.7      |20 |3  |192|26 |0.8695652173913043 |0.43478260869565216 |0.5797101449275363  |
|0.4      |43 |68 |127|3  |0.38738738738738737|0.9347826086956522  |0.5477707006369428  |
|0.75     |16 |1  |194|30 |0.9411764705882353 |0.34782608695652173 |0.5079365079365079  |
|0.35     