In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.functions import col, sum, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [4]:
# Create the Spark session for this notebook (reuses an existing one if already running).
spark = (
    SparkSession.builder
    .appName("Spark SQL - Trabajo")
    .getOrCreate()
)

In [12]:
# GCS location of the Parquet copy of the historical credit dataset.
ruta_parquet = (
    "gs://mds-grupo06-gcs/"
    "historico_creditos_peru_csv_50m_20/parquet/"
)

In [16]:
# Load the historical credit dataset from the Parquet files at ruta_parquet.
df = spark.read.format("parquet").load(ruta_parquet)

                                                                                

In [None]:
### Preprocessing

In [None]:
# Inspect column names and types of the loaded dataset before any cleaning.
df.printSchema()

In [19]:
# Null audit: one aggregate per column counting rows where that column is null.
# `sum` here is pyspark.sql.functions.sum (it shadows the builtin via the import cell).
conteo_nulos = [
    sum(when(col(nombre).isNull(), 1).otherwise(0)).alias(nombre)
    for nombre in df.columns
]
df.select(*conteo_nulos).show()



+----------+--------+----+---------------+-------------+--------------+---------+---------------+--------+--------------+
|cliente_id|distrito|edad|fecha_solicitud|monto_credito|nivel_ingresos|ocupacion|pagos_atrasados|anio_mes|estado_credito|
+----------+--------+----+---------------+-------------+--------------+---------+---------------+--------+--------------+
|         0|       0| 199|            199|          199|             0|        0|            199|     199|             0|
+----------+--------+----+---------------+-------------+--------------+---------+---------------+--------+--------------+



                                                                                

In [20]:
# Drop every row that has a null in any of the columns the null audit flagged.
# The audit output above shows 199 nulls in each of these columns.
columnas_con_nulos = ['edad', 'fecha_solicitud', 'monto_credito', 'pagos_atrasados', 'anio_mes']
df = df.na.drop(how="any", subset=columnas_con_nulos)

In [21]:
# Schema after dropping null rows: only rows were removed, so columns and types are unchanged.
df.printSchema()

root
 |-- cliente_id: string (nullable = true)
 |-- distrito: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- fecha_solicitud: date (nullable = true)
 |-- monto_credito: integer (nullable = true)
 |-- nivel_ingresos: string (nullable = true)
 |-- ocupacion: string (nullable = true)
 |-- pagos_atrasados: integer (nullable = true)
 |-- anio_mes: string (nullable = true)
 |-- estado_credito: string (nullable = true)



In [25]:
# Preview the first rows of the dataset (long values such as cliente_id are truncated).
# NOTE(review): this cell (In [25]) was executed after the target cell below (In [24]),
# which is why its saved output already contains `target`. On Restart & Run All the
# column will not exist yet at this point — move this cell after the target cell.
df.show(n=20)

+--------------------+--------+----+---------------+-------------+--------------+-------------+---------------+--------+--------------+------+
|          cliente_id|distrito|edad|fecha_solicitud|monto_credito|nivel_ingresos|    ocupacion|pagos_atrasados|anio_mes|estado_credito|target|
+--------------------+--------+----+---------------+-------------+--------------+-------------+---------------+--------+--------------+------+
|062e1ccd-911d-4d3...|Arequipa|  46|     2021-12-28|         6024|          Alto|Independiente|              9| 2021-12|        Pagado|     1|
|e6dfa886-65c1-42d...|    Puno|  43|     2021-12-26|         5821|          Bajo|  Pensionista|             12| 2021-12|        Pagado|     1|
|9111a716-f226-481...|    Lima|  62|     2021-12-15|        34449|          Alto|  Desempleado|             12| 2021-12|        Pagado|     1|
|108cce7d-1445-461...|Chiclayo|  30|     2021-12-07|        19530|          Bajo|  Pensionista|              9| 2021-12|        Pagado|     1|

In [24]:
# Binary label: 1 when the client has at least UMBRAL_ATRASOS late payments, otherwise 0.
# pagos_atrasados must therefore NOT be used as a model feature (it would leak the label).
# NOTE(review): with a threshold of 2, the class count below shows ~85% of rows labelled 1
# (84,616,298 vs 15,383,702) — confirm the threshold matches the intended notion of risk.
UMBRAL_ATRASOS = 2
df = df.withColumn("target", when(df["pagos_atrasados"] >= UMBRAL_ATRASOS, 1).otherwise(0))

In [27]:
# Class balance of the label: number of rows per target value.
conteo_clases = df.groupBy("target").count()
conteo_clases.show()



+------+--------+
|target|   count|
+------+--------+
|     1|84616298|
|     0|15383702|
+------+--------+



                                                                                

In [None]:
### Feature Engineering

In [30]:
# Categorical columns: each is label-indexed into <col>_idx, then one-hot encoded into <col>_vec.
# NOTE(review): estado_credito holds values such as "Pagado"; if this status is recorded
# after the repayment history, it may leak information about the target — confirm.
cat_cols = ['distrito', 'estado_credito', 'nivel_ingresos', 'ocupacion']

indexers = [
    StringIndexer(inputCol=nombre, outputCol=f"{nombre}_idx")
    for nombre in cat_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{nombre}_idx", outputCol=f"{nombre}_vec")
    for nombre in cat_cols
]

In [33]:
# Model inputs: two numeric columns plus the one-hot vector of every categorical column,
# packed into a single "features" vector. pagos_atrasados is not included because the
# target is derived from it.
feature_cols = ['edad', 'monto_credito', *(f"{nombre}_vec" for nombre in cat_cols)]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

In [None]:
# Fit the preprocessing stages (indexers -> encoders -> assembler) and apply them to df.
# NOTE(review): the pipeline is fitted on the full dataset before the train/test split.
# The indexers only learn category frequencies here, but fitting on train_df alone
# would be the stricter choice.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler])
model_pipeline = pipeline.fit(df)
df_prepared = model_pipeline.transform(df)



In [None]:
# 80/20 train/test split with a fixed seed so the split is repeatable.
train_df, test_df = df_prepared.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Random forest classifier trained on the assembled "features" vector.
# The label is the "target" column built from pagos_atrasados; there is no "default"
# column in df_prepared, so labelCol must be "target" or fit() fails.
# NOTE(review): classes are imbalanced (~85% target=1); consider class weights if needed.
rf = RandomForestClassifier(
    labelCol="target",
    featuresCol="features",
    numTrees=100,
    seed=42,  # fixed seed so the forest is reproducible, like the train/test split
)
model = rf.fit(train_df)

In [None]:
# Score the held-out set and report ROC AUC.
# The evaluator must read the same label column the model was trained on ("target");
# a "default" column does not exist in the data.
predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")


In [None]:
# Persist per-client predictions to the MongoDB collection "predicciones_riesgo".
# NOTE(review): "mongo" is the short format name of the 3.x MongoDB Spark connector
# (10.x uses "mongodb"); the target database/URI must come from the Spark config — confirm.
# NOTE(review): `probability` is an ML vector column; verify the connector serializes it
# as expected (converting it to an array first may be needed).
# NOTE(review): mode "overwrite" replaces the existing contents of the collection.
results = predictions.select("cliente_id", "probability", "prediction")
(
    results.write
    .format("mongo")
    .mode("overwrite")
    .option("collection", "predicciones_riesgo")
    .save()
)