# TRAINING - RANDOM FOREST

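The objective of this notebook is to train a Random Forest regression model on the preprocessed training data to predict `gasto_familiar`, apply it to the test data, and export the predictions to a timestamped CSV file.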

## SETUP

In [32]:
!pip install findspark

import findspark
findspark.init()



## LIBRARIES

In [33]:
# Load PySpark and start (or reuse) a local Spark session on all available cores.
# NOTE: the star import below shadows Python builtins such as `sum`, `max`,
# `min`, `abs` and `round` with their Spark column-function counterparts.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("model")
    .getOrCreate()
)

In [34]:
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [35]:
# General-purpose libraries. `time` is used later to timestamp the output file;
# pandas is required by `toPandas()` when the predictions are exported to CSV.
# NOTE(review): numpy, seaborn, matplotlib and math are not referenced in the
# cells shown here — consider dropping them if they remain unused.
import time
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import math
import time  # NOTE(review): duplicate of the first `import time`; redundant

# Silence every Python warning for the rest of the kernel session. This also
# hides deprecation/future warnings raised by pandas and PySpark; consider a
# narrower filter (by category or module) instead.
warnings.filterwarnings('ignore')

## LOAD DATA

In [36]:
# Path definition
path = "output/preprocessing/preprocessing_data"

In [37]:
# Read dataframe
train = spark.read.parquet(path, header= True, inferSchema=True)

In [38]:
# Verify rows
train.count()

9670308

In [39]:
# Cast binary variables
train = (train
         .withColumn("ind_mora_vigente",col("ind_mora_vigente").cast("integer"))
         .withColumn("cartera_castigada",col("cartera_castigada").cast("integer"))
         .withColumn("tenencia_tc",col("tenencia_tc").cast("integer"))
         .withColumn("tiene_consumo",col("tiene_consumo").cast("integer"))
         .withColumn("tiene_crediagil",col("tiene_crediagil").cast("integer"))
         .withColumn("tiene_ctas_activas",col("tiene_ctas_activas").cast("integer"))
         .withColumn("tiene_ctas_embargadas",col("tiene_ctas_embargadas").cast("integer"))
         .withColumn("tiene_cred_hipo_1",col("tiene_cred_hipo_1").cast("integer"))
         .withColumn("tiene_cred_hipo_2",col("tiene_cred_hipo_2").cast("integer"))
         .withColumn("pension_fopep",col("pension_fopep").cast("integer"))
        )

In [41]:
# Set testing path
path = "output/preprocessing/preprocessing_data_test"
# Read dataframe
test = spark.read.parquet(path, header= True, inferSchema=True)
# Verify rows
test.count()

281666

In [42]:
test = (test
         .withColumn("ind_mora_vigente",col("ind_mora_vigente").cast("integer"))
         .withColumn("cartera_castigada",col("cartera_castigada").cast("integer"))
         .withColumn("tenencia_tc",col("tenencia_tc").cast("integer"))
         .withColumn("tiene_consumo",col("tiene_consumo").cast("integer"))
         .withColumn("tiene_crediagil",col("tiene_crediagil").cast("integer"))
         .withColumn("tiene_ctas_activas",col("tiene_ctas_activas").cast("integer"))
         .withColumn("tiene_ctas_embargadas",col("tiene_ctas_embargadas").cast("integer"))
         .withColumn("tiene_cred_hipo_1",col("tiene_cred_hipo_1").cast("integer"))
         .withColumn("tiene_cred_hipo_2",col("tiene_cred_hipo_2").cast("integer"))
         .withColumn("pension_fopep",col("pension_fopep").cast("integer"))
        )

## FEATURE ENGINEERING

In [43]:
train = (train
        .withColumn("gastos"
                    , col('cuota_cred_hipot') 
                    + col('cuota_de_vivienda') 
                    + col('cuota_de_consumo')
                    + col('cuota_rotativos')
                    + col('cuota_tarjeta_de_credito')
                    + col('cuota_de_sector_solidario')
                    + col('cuota_sector_real_comercio')
                    + col('cuota_tc_mdo')
                    + col('cuota_libranza_sf')
                   )
        .withColumn("ingresos" 
                    ,col('ingreso_nompen')
                    + col('ingreso_final')
                    + col('ingreso_nomina')
                    + col('ingreso_segurida_social')
                   )
       )

In [44]:
test = (test
        .withColumn("gastos"
                    , col('cuota_cred_hipot') 
                    + col('cuota_de_vivienda') 
                    + col('cuota_de_consumo')
                    + col('cuota_rotativos')
                    + col('cuota_tarjeta_de_credito')
                    + col('cuota_de_sector_solidario')
                    + col('cuota_sector_real_comercio')
                    + col('cuota_tc_mdo')
                    + col('cuota_libranza_sf')
                   )
        .withColumn("ingresos" 
                    ,col('ingreso_nompen')
                    + col('ingreso_final')
                    + col('ingreso_nomina')
                    + col('ingreso_segurida_social')
                   )
       )

In [45]:
# Model input columns, in the exact order they are packed into the feature
# vector (35 features). `gastos` and `ingresos` are the aggregates derived in
# the feature-engineering step; the split below yields one name per line.
input_features = """
ingreso_final
cuota_cred_hipot
gastos
ingresos
Independiente
EXTERIOR
edad
estado_civil
ctas_activas
tipo_vivienda
ingreso_segurida_social
tiene_cred_hipo_2
ind_mora_vigente
cuota_de_consumo
cant_moras_90_ult_12_meses
cant_moras_30_ult_12_meses
tiene_crediagil
cant_oblig_tot_sf
ANDINA
pension_fopep
pol_centr_ext
cant_cast_ult_12m_sr
cuota_de_sector_solidario
cuota_libranza_sf
saldo_no_rot_mdo
cuota_tc_mdo
saldo_prom3_tdc_mdo
cuota_tarjeta_de_credito
mediana_pen3
tenencia_tc
Pensionado
nivel_academico
Empleado
Estudiante
cat_ingreso
""".split()

## MODELING

In [46]:
# Assembler that concatenates the input columns, in list order, into a single
# vector column called "features".
feat_vector = VectorAssembler(
    outputCol="features",
    inputCols=input_features,
)

In [47]:
# Add the assembled "features" vector to the training rows
transTrain = feat_vector.transform(dataset=train)

In [48]:
# Keep only the row keys, the feature vector and the target for training
train_model = transTrain.select(["id_cli", "periodo", "features", "gasto_familiar"])

In [49]:
# Add the assembled "features" vector to the test rows
transTest = feat_vector.transform(dataset=test)

In [50]:
# Keep only the row keys and the feature vector (the test set has no target)
test_model = transTest.select(["id_cli", "periodo", "features"])

In [51]:
test_model.show(n=5)  # preview 5 assembled test rows

+-------+-------+--------------------+
| id_cli|periodo|            features|
+-------+-------+--------------------+
|1165927| 201908|(35,[0,1,2,3,4,6,...|
|1172919| 201908|(35,[0,2,3,6,7,8,...|
|1538512| 201909|(35,[0,1,2,3,6,7,...|
|3371270| 202004|(35,[0,2,3,6,7,8,...|
|2784853| 202003|(35,[0,2,3,6,7,8,...|
+-------+-------+--------------------+
only showing top 5 rows



In [52]:
# Spark ML regressors read the target from a column named "label" by default
trainingData = train_model.withColumnRenamed(existing="gasto_familiar", new="label")

In [53]:
# The test set carries no target column (test_model holds only id_cli, periodo
# and features), so there is nothing to rename to "label" — withColumnRenamed
# on a missing column would be a silent no-op. Use the frame as-is.
testData = test_model

In [54]:
# Adapted from the Spark ML random-forest regression example:
# https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression
# VectorIndexer marks every feature with at most maxCategories (4) distinct
# values in trainingData as categorical and indexes it; features with more
# distinct values stay continuous. A categorical value unseen in trainingData
# makes transform fail later (handleInvalid defaults to "error").
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(trainingData)

In [55]:
# Random forest regressor trained on the indexed feature vector; remaining
# hyper-parameters (numTrees, maxDepth, ...) are left at Spark's defaults.
# PySpark derives the default seed from Python's hash of the class name, which
# can change between kernel sessions (hash randomisation), so pin it explicitly
# to make the forest's random sampling reproducible across runs.
RF_SEED = 42
rf = RandomForestRegressor(featuresCol="indexedFeatures", seed=RF_SEED)

In [56]:
# Two-stage pipeline: index the features, then fit the forest on them
pipeline = Pipeline().setStages([featureIndexer, rf])

In [57]:
# Fit the pipeline. featureIndexer is already a fitted model, so this step only
# applies its transform before training the random forest.
model = pipeline.fit(dataset=trainingData)

In [58]:
# Score the test rows; the model writes its output to the "prediction" column
predictions = model.transform(dataset=testData)

In [59]:
# testData has no target column, so there is no "label" to rename back; the
# model output lives in "prediction", which is what gets exported below.
assert "prediction" in predictions.columns, "model output column is missing"

In [60]:
# Every test row should receive exactly one prediction.
n_predictions = predictions.count()
assert n_predictions == testData.count(), "prediction count differs from test row count"
n_predictions

281666

In [61]:
# Build the export frame: id_registro = "<id_cli>#<periodo>" and the predicted
# value under the target's name. The key columns are referenced and cast to
# string explicitly rather than relying on implicit coercion inside concat.
df_final = predictions.select(
    concat(
        col("id_cli").cast("string"),
        lit("#"),
        col("periodo").cast("string"),
    ).alias("id_registro"),
    col("prediction").alias("gasto_familiar"),
)
df_final.show()

+--------------+------------------+
|   id_registro|    gasto_familiar|
+--------------+------------------+
|1165927#201908|2059832.5359667938|
|1172919#201908|1001361.6224041565|
|1538512#201909| 589659.1832191929|
|3371270#202004| 575940.4571666874|
|2784853#202003| 699491.6204102885|
|2219310#202001| 554293.8887390872|
|2220638#202001|  728263.084622402|
|1568926#201909| 664262.6429371256|
|1573039#201909| 794636.4739709541|
|2430420#202002| 986137.3669881342|
|1455662#201909|  899511.938925623|
|1461386#201909|1161394.0904536599|
|1463134#201909|1592455.1125530212|
|1477680#201909| 580233.6830473272|
|1361632#201909| 1771518.104507753|
|1362813#201909| 680326.7691991692|
|1379397#201909|1265892.1155196384|
| 834901#201905| 624556.8201242986|
| 839363#201905|1201493.6403914555|
| 842749#201905|2271207.6035069595|
+--------------+------------------+
only showing top 20 rows



In [62]:
# Stamp the output file name with the current local time (YYYYmmddHHMMSS) so
# each run writes a new CSV instead of overwriting the previous one
ts = time.strftime('%Y%m%d%H%M%S')
path = f"output/implementations/model_RF_{ts}.csv"

In [63]:
# Collect the predictions to the driver and write them as a single CSV file.
# pandas' to_csv does not create missing directories, so ensure the folder exists.
import os
os.makedirs(os.path.dirname(path), exist_ok=True)
df_final.toPandas().to_csv(path, index=False)