# Training a Logistic Regression model to detect patients with diabetes using Apache Spark (PySpark) on Kaggle


Installing the required libraries with the correct dependencies

In [None]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2>&1
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2>&1
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7, 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

Importing the required dependencies

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site  # needed by site.getsitepackages() in the jar-download cell
import wget
import sys
import re
import pandas as pd

Listing the files in the working directory

In [2]:
!ls

 HMP_Dataset				 etl_lab.ipynb
'Registros históricos de diabetes.csv'	 final_project.ipynb
 Untitled.ipynb				 install.log
 claimed				'sparkml-diabetes (2).ipynb'
 claimed_1				 workflow.pipeline
 data					 workflow2.pipeline


In [3]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6, 3.7, 3.8 and 3.9 are supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [4]:
!ls

 HMP_Dataset				 final_project.ipynb
'Registros históricos de diabetes.csv'	 install.log
 Untitled.ipynb				 jpmml-sparkml-executable-1.5.12.jar
 claimed				'sparkml-diabetes (2).ipynb'
 claimed_1				 workflow.pipeline
 data					 workflow2.pipeline
 etl_lab.ipynb


In [5]:
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name

In [6]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [7]:
conf = SparkConf().setMaster(master)
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

21/12/23 17:51:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Creating a pandas DataFrame from the .csv file provided in the requirements email (read here from the local copy)

In [9]:
df = pd.read_csv("Registros históricos de diabetes.csv", encoding = "ISO-8859-1")

Converting to a Spark DataFrame

In [10]:
df = spark.createDataFrame(df)

Showing the first 5 records

In [11]:
df.show(5)


+--------+-------------------+---------------------------+---------------------------+--------------------------+------------------------+-----------+------------------------+----+-----------+
|Paciente|Nmero de embarazos|Nivel de glucosa plasmtica|Presion arterial diastlica|Espesor de pliegue cutneo|Nivel de insulina srica|        IMC|Probabilidad de diabetes|Edad|ÀDiabtico?|
+--------+-------------------+---------------------------+---------------------------+--------------------------+------------------------+-----------+------------------------+----+-----------+
| 1354778|                  0|                        171|                         80|                        34|                      23|43.50972593|             1.213191354|  21|          0|
| 1147438|                  8|                         92|                         93|                        47|                      36|21.24057571|             0.158364981|  23|          0|
| 1640031|                  7|     

                                                                                

In [12]:
df.printSchema()

root
 |-- Paciente: long (nullable = true)
 |-- Nmero de embarazos: long (nullable = true)
 |-- Nivel de glucosa plasmtica: long (nullable = true)
 |-- Presion arterial diastlica: long (nullable = true)
 |-- Espesor de pliegue cutneo: long (nullable = true)
 |-- Nivel de insulina srica: long (nullable = true)
 |-- IMC: double (nullable = true)
 |-- Probabilidad de diabetes: double (nullable = true)
 |-- Edad: long (nullable = true)
 |-- ÀDiabtico?: long (nullable = true)



### Class analysis

Renaming the columns to make the analysis easier

In [13]:
df = df.withColumnRenamed("Paciente", "id_paciente") \
.withColumnRenamed("Nmero de embarazos", "embarazos") \
.withColumnRenamed("Nivel de glucosa plasmtica", "glucosa") \
.withColumnRenamed("Presion arterial diastlica", "diastole") \
.withColumnRenamed("Espesor de pliegue cutneo", "espesor") \
.withColumnRenamed("Nivel de insulina srica", "insulina") \
.withColumnRenamed("IMC", "imc") \
.withColumnRenamed("Probabilidad de diabetes", "prob") \
.withColumnRenamed("Edad", "edad") \
.withColumnRenamed("ÀDiabtico?", "diab")

In [14]:
df.printSchema()

root
 |-- id_paciente: long (nullable = true)
 |-- embarazos: long (nullable = true)
 |-- glucosa: long (nullable = true)
 |-- diastole: long (nullable = true)
 |-- espesor: long (nullable = true)
 |-- insulina: long (nullable = true)
 |-- imc: double (nullable = true)
 |-- prob: double (nullable = true)
 |-- edad: long (nullable = true)
 |-- diab: long (nullable = true)



In [15]:
df.createOrReplaceTempView('diabetes')

In [16]:
positivos = spark.sql("SELECT COUNT(*) as count FROM diabetes WHERE diab = 1").first()['count'] 
print(positivos)



3344


                                                                                

In [17]:
negativos = spark.sql("SELECT COUNT(*) as count FROM diabetes WHERE diab = 0").first()['count'] 
print(negativos)



6656


                                                                                

In [18]:
totales = spark.sql("SELECT COUNT(*) as count FROM diabetes").first()['count'] 
print(totales)



10000


                                                                                

In [19]:
# Percentage of positives
print("Percentage of positives", str(positivos/totales*100))

# Percentage of negatives
print("Percentage of negatives", str(negativos/totales*100))

Percentage of positives 33.44
Percentage of negatives 66.56


The classes are imbalanced, with roughly two negatives for every positive (66.56% versus 33.44%). Several strategies can be used to address this, such as undersampling the negative class, oversampling the positive class, resampling, or modifying the cost function to account for the weight of each class.
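As an illustration of the class-weighting strategy, the sketch below derives one weight per class from the counts obtained above (3344 positives, 6656 negatives). The "balanced" formula and the helper name `balanced_weight` are assumptions made for this example, not something the notebook uses. In Spark ML, weights like these could be stored in a column and passed to `LogisticRegression` through its `weightCol` parameter.

```python
# Class counts taken from the SQL queries above.
positives = 3344
negatives = 6656
total = positives + negatives


def balanced_weight(class_count, total_count, n_classes=2):
    """Hypothetical helper: weight inversely proportional to class frequency."""
    return total_count / (n_classes * class_count)


# One weight per value of the 'diab' column.
weights = {
    1: balanced_weight(positives, total),
    0: balanced_weight(negatives, total),
}

for label, weight in weights.items():
    print(f"diab={label}: weight={weight:.4f}")
```

With these weights both classes contribute the same total weight to the loss, so the minority (positive) class is no longer dominated by the majority class.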

### Model training

How many records are unique with respect to the patient ID?

In [20]:
spark.sql("SELECT COUNT(DISTINCT id_paciente) as count FROM diabetes").first()['count'] 

                                                                                

9959

Some patients have multiple records. These patients should be kept in the same set (training or validation) so that the model's performance metrics are not overestimated. However, since they account for less than 1% of the records, their impact is expected to be minimal.
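A minimal sketch of how records could be kept together per patient: the split is decided by hashing the patient ID, so every record of a given patient lands in the same set. This is not the approach used in this notebook (which relies on `randomSplit`). The helper `assign_split` and the sample records are hypothetical and only illustrate the idea.

```python
import hashlib


def assign_split(patient_id, test_percent=20):
    """Hypothetical helper: deterministically map a patient ID to a split."""
    digest = hashlib.md5(str(patient_id).encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in the range 0..99
    return "test" if bucket < test_percent else "train"


# Hypothetical (id_paciente, glucosa) records; patient 1354778 appears twice.
records = [(1354778, 171), (1147438, 92), (1354778, 165), (1640031, 115)]

splits = {"train": [], "test": []}
for patient_id, glucosa in records:
    splits[assign_split(patient_id)].append((patient_id, glucosa))

train_ids = {pid for pid, _ in splits["train"]}
test_ids = {pid for pid, _ in splits["test"]}
print("Patients in both sets:", train_ids & test_ids)
```

Because the assignment depends only on the ID, the split is reproducible across runs and no patient can appear in both sets. The resulting proportions only approximate 80:20.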

Creating the training and validation sets with an 80:20 ratio

In [21]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Is the data in the "Probabilidad de diabetes" column consistent?

In [22]:
spark.sql("SELECT COUNT(DISTINCT id_paciente) as count FROM diabetes \
WHERE prob > 1 OR prob < 0").show()



+-----+
|count|
+-----+
|  781|
+-----+



                                                                                

As the previous query shows, 781 patients have a prob value outside the interval [0, 1], so this column does not satisfy the definition of a probability.
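The same check can be expressed outside SQL. The sketch below applies it to two prob values taken from the rows displayed earlier. The helper name `is_valid_probability` is an assumption made for this example.

```python
# Two 'prob' values taken from the first rows shown by df.show(5).
prob_values = [1.213191354, 0.158364981]


def is_valid_probability(value):
    """Hypothetical helper: a probability must lie in the interval [0, 1]."""
    return 0.0 <= value <= 1.0


invalid = [v for v in prob_values if not is_valid_probability(v)]
print("Values outside [0, 1]:", invalid)
```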

The features that could be relevant for the predictive model are defined next. The prob and diab columns are excluded from the model inputs because they are target variables, and the id_paciente column is excluded because it is an identifier.

In [23]:
input_columns = os.environ.get('input_columns',
                               '["embarazos", \
                               "glucosa", \
                               "diastole", \
                               "espesor", \
                               "insulina", \
                               "imc", \
                               "edad"]')

In [24]:
indexer = StringIndexer(inputCol="diab", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
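The cells above store the column list as a string and turn it into a list with `eval`. As a hedged alternative (not what this notebook does), the same string can be parsed with `json.loads`, which accepts only data and not arbitrary Python code. The helper name `parse_columns` is an assumption made for this sketch.

```python
import json
import os

# Same default column list as above, written as a JSON array.
default_columns = ('["embarazos", "glucosa", "diastole", "espesor", '
                   '"insulina", "imc", "edad"]')


def parse_columns(raw):
    """Hypothetical helper: parse a JSON array of column names."""
    columns = json.loads(raw)  # raises ValueError on malformed input
    if not isinstance(columns, list) or not all(isinstance(c, str) for c in columns):
        raise ValueError("input_columns must be a JSON array of strings")
    return columns


columns = parse_columns(os.environ.get("input_columns", default_columns))
print(columns)
```

The resulting list could then be passed directly as `inputCols` to `VectorAssembler`.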

In [25]:
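# featuresCol defaults to "features", so the scaled "features_norm" column is not used unless featuresCol is set.
lr = LogisticRegression(maxIter=100, regParam=0.01, elasticNetParam=0.1)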

In [26]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [27]:
model = pipeline.fit(df_train)

21/12/23 17:54:09 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/23 17:54:09 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


Predictions on the training set

In [28]:
prediction = model.transform(df_train)

Predictions on the validation set

In [29]:
prediction2 = model.transform(df_test)

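Showing the class-membership probability vectors for the training set (5 examples). The output is truncated, so only the first component is visible: the probability of label 0.0 (non-diabetic). The probability of the positive class (diabetic) is the second component.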

In [30]:
prediction.select('probability', 'label').show(5)

+--------------------+-----+
|         probability|label|
+--------------------+-----+
|[0.80622990882120...|  0.0|
|[0.87379879015652...|  0.0|
|[0.27934286039219...|  1.0|
|[0.69642477031099...|  0.0|
|[0.80360131480540...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [31]:
prediction2.select('probability', 'label').show(5)

+--------------------+-----+
|         probability|label|
+--------------------+-----+
|[0.62878499798074...|  1.0|
|[0.20552617493825...|  1.0|
|[0.01663430322969...|  1.0|
|[0.94477828351039...|  0.0|
|[0.57213990872888...|  1.0|
+--------------------+-----+
only showing top 5 rows



### Training set metrics

In [32]:
prediction.printSchema()

root
 |-- id_paciente: long (nullable = true)
 |-- embarazos: long (nullable = true)
 |-- glucosa: long (nullable = true)
 |-- diastole: long (nullable = true)
 |-- espesor: long (nullable = true)
 |-- insulina: long (nullable = true)
 |-- imc: double (nullable = true)
 |-- prob: double (nullable = true)
 |-- edad: long (nullable = true)
 |-- diab: long (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_norm: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [33]:
metrics = ['accuracy', 'weightedPrecision', 'weightedRecall', 'f1','areaUnderROC']

In [34]:
for i in metrics:
    if i == 'areaUnderROC':
        binEval = BinaryClassificationEvaluator()
        print(i+": "+str(binEval.evaluate(prediction)))
    else:
        binEval = MulticlassClassificationEvaluator(). \
        setMetricName(i). \
        setPredictionCol("prediction"). \
        setLabelCol("label")
        print(i+": "+str(binEval.evaluate(prediction)))

                                                                                

accuracy: 0.7770219198790628
weightedPrecision: 0.7701009472022863
weightedRecall: 0.7770219198790627
f1: 0.7690588694184151
areaUnderROC: 0.8489522850781758


### Validation set metrics

In [35]:
for i in metrics:
    if i == 'areaUnderROC':
        binEval = BinaryClassificationEvaluator()
        print(i+": "+str(binEval.evaluate(prediction2)))
    else:
        binEval = MulticlassClassificationEvaluator(). \
        setMetricName(i). \
        setPredictionCol("prediction"). \
        setLabelCol("label")
        print(i+": "+str(binEval.evaluate(prediction2)))

                                                                                

accuracy: 0.7832201745877788
weightedPrecision: 0.7805744057365932
weightedRecall: 0.7832201745877789
f1: 0.7750055052781868
areaUnderROC: 0.8552561016546716


The difference in accuracy between the two sets is barely perceptible (about 0.777 on training versus 0.783 on validation), which suggests the model is not overfitting and is, if anything, "slightly" underfitted. The F1 score is the harmonic mean of precision (the probability that an individual is actually sick given a positive test) and sensitivity, also called recall (the probability that the test is positive given that the individual is sick). It is used to gauge how good the test is at finding positives. Likewise, the AUC summarizes the model's performance across different decision thresholds.
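To make the definitions concrete, the sketch below computes precision, recall and F1 for the positive class from confusion-matrix counts. The counts are hypothetical and are not taken from this notebook's predictions. Note also that the f1 reported by `MulticlassClassificationEvaluator` is a weighted average over both classes, so it is not directly comparable to a single-class F1.

```python
def precision_recall_f1(tp, fp, fn):
    """Precision, recall and F1 for one class from confusion-matrix counts."""
    precision = tp / (tp + fp)  # P(sick | test positive)
    recall = tp / (tp + fn)     # P(test positive | sick)
    f1 = 2 * precision * recall / (precision + recall)  # harmonic mean
    return precision, recall, f1


# Hypothetical counts: true positives, false positives, false negatives.
tp, fp, fn = 60, 20, 40
precision, recall, f1 = precision_recall_f1(tp, fp, fn)
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

Since the harmonic mean is pulled toward the smaller of the two values, a model cannot reach a high F1 by maximizing only precision or only recall.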

### Exporting the final model to PMML for deployment

In [36]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(model_target)

'/resources/labs/BD0231EN/model.xml'

In [37]:
help(PMMLBuilder) 

Help on class PMMLBuilder in module pyspark2pmml:

class PMMLBuilder(builtins.object)
 |  PMMLBuilder(sc, df, pipelineModel)
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sc, df, pipelineModel)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  build(self)
 |  
 |  buildByteArray(self)
 |  
 |  buildFile(self, path)
 |  
 |  putOption(self, pipelineStage, key, value)
 |  
 |  verify(self, df, precision=1e-14, zeroThreshold=1e-14)
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)

