In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session used by every cell below (an existing session is reused if present).
spark = SparkSession.builder.getOrCreate()

In [2]:
# Load the preprocessed dataset from Parquet and print its column names and types.
df = spark.read.parquet('./data/processed')
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)
 |-- label: integer (nullable = true)



In [3]:
# 80/20 train/test split; the seed is fixed so the split can be repeated.
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [4]:
# Number of rows that landed in the training split.
train_data.count()

5690

In [5]:
# Number of rows that landed in the held-out test split.
test_data.count()

1342

In [6]:
# Total row count; the two split counts above should add up to this value.
df.count()

7032

In [9]:
# Columns that must never be fed to the model: the row identifier and the
# two representations of the target.
ignore_cols = ['customerID', 'Churn', 'label']

# Categorical features: every string-typed column that is not excluded above.
cat_col = [
    col_name
    for col_name, col_type in df.dtypes
    if col_type == 'string' and col_name not in ignore_cols
]
cat_col

['gender',
 'Partner',
 'Dependents',
 'PhoneService',
 'MultipleLines',
 'InternetService',
 'OnlineSecurity',
 'OnlineBackup',
 'DeviceProtection',
 'TechSupport',
 'StreamingTV',
 'StreamingMovies',
 'Contract',
 'PaperlessBilling',
 'PaymentMethod']

In [10]:
# Numeric features: whatever remains once the categorical and excluded
# columns are removed (the column type itself is not inspected here).
num_col = [
    col_name
    for col_name, _ in df.dtypes
    if col_name not in cat_col and col_name not in ignore_cols
]
num_col

['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges']

In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Stage 1 - map each categorical string column to a numeric index column.
# handleInvalid='keep' is set so that invalid values do not abort the transform.
cat_col_indx = [f'{nombre}_indx' for nombre in cat_col]
indexer = StringIndexer(
    inputCols=cat_col,
    outputCols=cat_col_indx,
    handleInvalid='keep',
)

# Stage 2 - one-hot encode every index column into a vector column.
cat_col_vect = [f'{nombre}_vect' for nombre in cat_col]
encoder = OneHotEncoder(
    inputCols=cat_col_indx,
    outputCols=cat_col_vect,
)

# Stage 3 - concatenate the numeric columns and the encoded vectors into the
# single 'features' column consumed by the classifier.
assembler_inputs = [*num_col, *cat_col_vect]
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol='features',
)

In [14]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest estimator reading the assembled 'features' vector and the
# integer 'label' column; the seed is pinned for repeatable training.
rfc = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
    seed=42,
)

In [15]:
# Pipeline order: index categories -> one-hot encode -> assemble vector -> classify.
stages = [indexer, encoder, assembler, rfc]

In [16]:
# Sanity check: the pipeline is expected to contain four stages.
len(stages)

4

In [17]:
from pyspark.ml import Pipeline

# Wrap the feature stages and the classifier in one Pipeline, which is what gets cross-validated.
pipeline = Pipeline(stages = stages)

In [34]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Hyper-parameter grid: 2 tree counts x 2 maximum depths = 4 candidate settings.
pgb = (
    ParamGridBuilder()
    .addGrid(rfc.numTrees, [20, 50])
    .addGrid(rfc.maxDepth, [5, 10])
    .build()
)

In [35]:
# 3-fold cross-validation over the whole pipeline for every grid candidate.
# The evaluator is created with its default settings (no metric is named here).
# An explicit seed is passed so the assignment of rows to folds - and therefore
# the selected hyper-parameters - can be reproduced after a kernel restart;
# without it the fold split is not guaranteed to match between sessions.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=pgb,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3,
    seed=42,
)

In [38]:
# Run the cross-validated grid search on the training split only; the winning
# pipeline is later read from cv_model.bestModel.
cv_model = cv.fit(train_data)
print("Entrenamiento finalizado. Ya tenemos al campeón.")

Entrenamiento finalizado. Ya tenemos al campeón.


In [39]:
# Score the held-out test split with the cross-validated model.
predictions = cv_model.transform(test_data)

In [40]:
# Evaluate the test-set predictions. No metricName is passed, so the
# evaluator's default metric applies (areaUnderROC per the PySpark docs -
# confirm for the installed version).
bce = BinaryClassificationEvaluator()
eva = bce.evaluate(predictions)
eva

0.8492874951394128

In [41]:
# Show the last stage of the winning pipeline, i.e. the fitted random forest.
print(cv_model.bestModel.stages[-1])

RandomForestClassificationModel: uid=RandomForestClassifier_c5fa19113638, numTrees=50, numClasses=2, numFeatures=45


In [44]:
# Keep a handle on the fitted forest and print its per-feature importance vector.
rf_model = cv_model.bestModel.stages[-1]
print(rf_model.featureImportances)

(45,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44],[0.0018462831581923292,0.12913482793560843,0.020787205442573942,0.08312072939584975,0.0007035575864503235,0.0003400934770878377,0.0008367439900958976,0.0024326008857862985,0.0026486604189458387,0.0011714062813353025,0.001025417124242885,0.0003299534992235085,0.00039615249987600375,0.00202114043226492,0.00034639053889378037,0.07815247759417483,0.009831256458606396,0.009991499925083561,0.13049758117204743,0.013472736738289685,0.021910078800060847,0.0022326007705488087,0.007821813699500704,0.023833134271625006,0.004951500208114916,0.0027504083504714433,0.07113733163417048,0.018077925769645984,0.007417977806085347,0.00032629202779822184,0.0012080199968735597,0.005637601374738685,0.0010878052938786012,0.0005030243329163977,0.007543845324744423,0.18582091195412592,0.07460383994086126,0.020001259053851243,0.0017248482287393887,0.0032653513083047148,0.0467547405135215

In [45]:
def ExtractFeatureImp(featureImp, dataset, featuresCol, top_n=10):
    """Print the highest-scoring features by importance.

    Feature names are recovered from the ``ml_attr`` metadata stored on the
    ``featuresCol`` field of ``dataset.schema`` and paired with the score
    found at the same index in ``featureImp``.

    Args:
        featureImp: Index-addressable importance scores, e.g. the fitted
            model's ``featureImportances`` vector.
        dataset: Object exposing ``schema[featuresCol].metadata``; pass the
            DataFrame returned by the fitted pipeline's ``transform`` so the
            metadata is present.
        featuresCol: Name of the assembled feature-vector column.
        top_n: Number of top-ranked features to print. Defaults to 10.

    Returns:
        None. The ranking (or a warning when no metadata is available) is
        written to stdout.
    """
    # Attribute metadata attached to the features column, if any.
    meta = dataset.schema[featuresCol].metadata.get('ml_attr')

    # The metadata groups attributes by type; the 'attrs' entry can be missing
    # even when 'ml_attr' exists, so both cases are treated as "no metadata"
    # instead of failing on None.items().
    attrs = meta.get('attrs') if meta is not None else None
    if not attrs:
        print("No hay metadatos. Asegúrate de usar el DataFrame transformado.")
        return

    # Flatten every attribute group into (name, score) pairs.
    ranked = [
        (attr['name'], featureImp[attr['idx']])
        for attr_list in attrs.values()
        for attr in attr_list
    ]

    # Highest importance first.
    ranked.sort(key=lambda pair: pair[1], reverse=True)

    print(f"=== TOP {top_n} FACTORES DE FUGA ===")
    for position, (name, score) in enumerate(ranked[:top_n], start=1):
        print(f"{position}. {name}: {score:.4f}")

# Rank the feature importances using the fitted model and the scored test set.
# rf_model: the random forest stage taken earlier from bestModel.stages[-1]
# predictions: the DataFrame returned by the transform call
ExtractFeatureImp(rf_model.featureImportances, predictions, "features")

=== TOP 10 FACTORES DE FUGA ===
1. Contract_vect_Month-to-month: 0.1858
2. OnlineSecurity_vect_No: 0.1305
3. tenure: 0.1291
4. TotalCharges: 0.0831
5. InternetService_vect_Fiber optic: 0.0782
6. Contract_vect_Two year: 0.0746
7. TechSupport_vect_No: 0.0711
8. PaymentMethod_vect_Electronic check: 0.0468
9. DeviceProtection_vect_No: 0.0238
10. OnlineBackup_vect_No: 0.0219


In [46]:
# The winning pipeline (feature stages plus the fitted random forest).
final_pipeline = cv_model.bestModel
# Persist through the writer with overwrite enabled: a plain save() raises when
# the target directory already exists, which would break re-running this cell.
final_pipeline.write().overwrite().save("./models/random_forest_pipeline_v1")

In [47]:
# Drop the in-memory reference - presumably so the reload below is known to come from disk.
del final_pipeline

In [51]:
from pyspark.ml import PipelineModel

# Reload the persisted pipeline from disk and score the test split with it.
pipe_loaded = PipelineModel.load("./models/random_forest_pipeline_v1")
pred = pipe_loaded.transform(test_data)

In [57]:
# Preview customer id, true label, predicted class and class probabilities for five rows.
pred.select("customerID", "label", "prediction", "probability").show(5)

+----------+-----+----------+--------------------+
|customerID|label|prediction|         probability|
+----------+-----+----------+--------------------+
|0004-TLHLJ|    1|       1.0|[0.37896984995951...|
|0013-SMEOE|    0|       0.0|[0.89846168055417...|
|0015-UOCOJ|    0|       0.0|[0.65548167712968...|
|0019-EFAEP|    0|       0.0|[0.88076182028689...|
|0023-HGHWL|    1|       1.0|[0.42614515977655...|
+----------+-----+----------+--------------------+
only showing top 5 rows

