In [33]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when

from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import expr


In [34]:
spark = SparkSession.builder.appName(
    "ClasificaciónEnfermedadCardiaca"
).getOrCreate()

# Leer los datos
df = spark.read.option(
    "delimiter", " "
).csv('C:/Users/bebit/Desktop/tareas/csv/heart.svc', inferSchema=True, header=False)
df.show(5)

+----+---+---+-----+-----+---+---+-----+---+---+----+----+----+----+
| _c0|_c1|_c2|  _c3|  _c4|_c5|_c6|  _c7|_c8|_c9|_c10|_c11|_c12|_c13|
+----+---+---+-----+-----+---+---+-----+---+---+----+----+----+----+
|70.0|1.0|4.0|130.0|322.0|0.0|2.0|109.0|0.0|2.4| 2.0| 3.0| 3.0|   2|
|67.0|0.0|3.0|115.0|564.0|0.0|2.0|160.0|0.0|1.6| 2.0| 0.0| 7.0|   1|
|57.0|1.0|2.0|124.0|261.0|0.0|0.0|141.0|0.0|0.3| 1.0| 0.0| 7.0|   2|
|64.0|1.0|4.0|128.0|263.0|0.0|0.0|105.0|1.0|0.2| 2.0| 1.0| 7.0|   1|
|74.0|0.0|2.0|120.0|269.0|0.0|2.0|121.0|1.0|0.2| 1.0| 1.0| 3.0|   1|
+----+---+---+-----+-----+---+---+-----+---+---+----+----+----+----+
only showing top 5 rows



In [35]:
# Renombrar las columnas
new_column_names = [
    'año',
    'sexo',
    'tipoDolor',
    'presionDescanso', 
    'colesterol',
    'azucarSangreL120', 
    'electrocardioDescanso',
    'frecCardMax', 
    'angina',
    'depresionST',
    'pendienteST', 
    'numVasos',
    'thal'
]

for i in range(len(new_column_names)):
    df = df.withColumnRenamed(df.columns[i], new_column_names[i])

df = df.drop(df.columns[-1])
df.show(5)

+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+
| año|sexo|tipoDolor|presionDescanso|colesterol|azucarSangreL120|electrocardioDescanso|frecCardMax|angina|depresionST|pendienteST|numVasos|thal|
+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+
|70.0| 1.0|      4.0|          130.0|     322.0|             0.0|                  2.0|      109.0|   0.0|        2.4|        2.0|     3.0| 3.0|
|67.0| 0.0|      3.0|          115.0|     564.0|             0.0|                  2.0|      160.0|   0.0|        1.6|        2.0|     0.0| 7.0|
|57.0| 1.0|      2.0|          124.0|     261.0|             0.0|                  0.0|      141.0|   0.0|        0.3|        1.0|     0.0| 7.0|
|64.0| 1.0|      4.0|          128.0|     263.0|             0.0|                  0.0|      105.0|   1.0|        0.2|        2.0|

In [36]:
df = df.withColumn(
    'enfermo',
    when((df['thal'] == 3) | (df['thal'] == 6), 0).otherwise(1)
)
df.show(5)

+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+
| año|sexo|tipoDolor|presionDescanso|colesterol|azucarSangreL120|electrocardioDescanso|frecCardMax|angina|depresionST|pendienteST|numVasos|thal|enfermo|
+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+
|70.0| 1.0|      4.0|          130.0|     322.0|             0.0|                  2.0|      109.0|   0.0|        2.4|        2.0|     3.0| 3.0|      0|
|67.0| 0.0|      3.0|          115.0|     564.0|             0.0|                  2.0|      160.0|   0.0|        1.6|        2.0|     0.0| 7.0|      1|
|57.0| 1.0|      2.0|          124.0|     261.0|             0.0|                  0.0|      141.0|   0.0|        0.3|        1.0|     0.0| 7.0|      1|
|64.0| 1.0|      4.0|          128.0|     263.0|             0.0|                 

In [37]:
df = df.transform(
    lambda df: df.withColumn(
        'enfermoNuevo',
        when((df['thal'] == 3) | (df['thal'] == 6), 0).otherwise(1))
)

df.show(5)

+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+------------+
| año|sexo|tipoDolor|presionDescanso|colesterol|azucarSangreL120|electrocardioDescanso|frecCardMax|angina|depresionST|pendienteST|numVasos|thal|enfermo|enfermoNuevo|
+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+------------+
|70.0| 1.0|      4.0|          130.0|     322.0|             0.0|                  2.0|      109.0|   0.0|        2.4|        2.0|     3.0| 3.0|      0|           0|
|67.0| 0.0|      3.0|          115.0|     564.0|             0.0|                  2.0|      160.0|   0.0|        1.6|        2.0|     0.0| 7.0|      1|           1|
|57.0| 1.0|      2.0|          124.0|     261.0|             0.0|                  0.0|      141.0|   0.0|        0.3|        1.0|     0.0| 7.0|      1|           1|
|64.

In [38]:
# Eliminar la columna 'enfermoNuevo'
df = df.drop('enfermoNuevo')
df.show(5)

+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+
| año|sexo|tipoDolor|presionDescanso|colesterol|azucarSangreL120|electrocardioDescanso|frecCardMax|angina|depresionST|pendienteST|numVasos|thal|enfermo|
+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+
|70.0| 1.0|      4.0|          130.0|     322.0|             0.0|                  2.0|      109.0|   0.0|        2.4|        2.0|     3.0| 3.0|      0|
|67.0| 0.0|      3.0|          115.0|     564.0|             0.0|                  2.0|      160.0|   0.0|        1.6|        2.0|     0.0| 7.0|      1|
|57.0| 1.0|      2.0|          124.0|     261.0|             0.0|                  0.0|      141.0|   0.0|        0.3|        1.0|     0.0| 7.0|      1|
|64.0| 1.0|      4.0|          128.0|     263.0|             0.0|                 

In [39]:
# Ensamblar todas las características en un solo vector
assembler = VectorAssembler(
    inputCols=[
        'año', 'sexo', 'tipoDolor', 'presionDescanso',
        'colesterol', 'azucarSangreL120', 'electrocardioDescanso',
        'frecCardMax', 'angina', 'depresionST', 'pendienteST',
        'numVasos'
    ],
    outputCol='caracteristicas'
)

df = assembler.transform(df)
df.show(5)

+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+--------------------+
| año|sexo|tipoDolor|presionDescanso|colesterol|azucarSangreL120|electrocardioDescanso|frecCardMax|angina|depresionST|pendienteST|numVasos|thal|enfermo|     caracteristicas|
+----+----+---------+---------------+----------+----------------+---------------------+-----------+------+-----------+-----------+--------+----+-------+--------------------+
|70.0| 1.0|      4.0|          130.0|     322.0|             0.0|                  2.0|      109.0|   0.0|        2.4|        2.0|     3.0| 3.0|      0|[70.0,1.0,4.0,130...|
|67.0| 0.0|      3.0|          115.0|     564.0|             0.0|                  2.0|      160.0|   0.0|        1.6|        2.0|     0.0| 7.0|      1|[67.0,0.0,3.0,115...|
|57.0| 1.0|      2.0|          124.0|     261.0|             0.0|                  0.0|      141.0|   0.0|        0.3|        1.0|

In [40]:
# Seleccionar solo las 'caracteristicas' y la columna 'enfermo' para el modelo
datos_modelo = df.select('caracteristicas', 'enfermo')
datos_modelo.show(5)

+--------------------+-------+
|     caracteristicas|enfermo|
+--------------------+-------+
|[70.0,1.0,4.0,130...|      0|
|[67.0,0.0,3.0,115...|      1|
|[57.0,1.0,2.0,124...|      1|
|[64.0,1.0,4.0,128...|      1|
|[74.0,0.0,2.0,120...|      0|
+--------------------+-------+
only showing top 5 rows



In [62]:
# 1. Use Standard Scaler para poner todos los números en la misma escala. Esto toma la observación y resta la media, y luego la divide por la
# desviación estándar.
scaler = StandardScaler(inputCol='caracteristicas', outputCol='caracteristicas_esc')
scaler_model = scaler.fit(datos_modelo)
datos_entrenamiento_esc = scaler_model.transform(datos_modelo)
datos_prueba_esc = scaler_model.transform(datos_prueba)

In [63]:
# Imprima el estado actual de sus datos
datos_entrenamiento_esc.show(5)
datos_prueba_esc.show(5)

+--------------------+-------+--------------------+
|     caracteristicas|enfermo| caracteristicas_esc|
+--------------------+-------+--------------------+
|[70.0,1.0,4.0,130...|      0|[7.68465131046640...|
|[67.0,0.0,3.0,115...|      1|[7.35530911144641...|
|[57.0,1.0,2.0,124...|      1|[6.25750178137978...|
|[64.0,1.0,4.0,128...|      1|[7.02596691242642...|
|[74.0,0.0,2.0,120...|      0|[8.12377424249305...|
+--------------------+-------+--------------------+
only showing top 5 rows

+--------------------+--------+--------------------+
|     caracteristicas|etiqueta| caracteristicas_esc|
+--------------------+--------+--------------------+
|(12,[0,2,3,4,7,10...|       0|(12,[0,2,3,4,7,10...|
|(12,[0,2,3,4,7,10...|       0|(12,[0,2,3,4,7,10...|
|(12,[0,2,3,4,7,10...|       0|(12,[0,2,3,4,7,10...|
|[29.0,1.0,2.0,130...|       0|[3.18364125719322...|
|[34.0,0.0,2.0,118...|       0|[3.73254492222653...|
+--------------------+--------+--------------------+
only showing top 5 rows



In [65]:
# Divida los datos en conjuntos de datos de entrenamiento y prueba. Use 50/50.
train_data, test_data = datos_modelo.randomSplit([0.5, 0.5])


In [47]:
#  Cree un modelo de regresión logística y entrénalo.
rl = LogisticRegression(featuresCol='caracteristicas', labelCol='enfermo')

# Ajustar el modelo a los datos de entrenamiento
modelo_rl = rl.fit(datos_entrenamiento_div)

In [69]:
# Finalmente Use la función F de Spark SQL para crear una nueva columna correcta cuando ENFERMO() es igual a la predicción, lo que significa que
# el resultado predicho es igual a los resultados reales.

lr = LogisticRegression(featuresCol='caracteristicas', labelCol='etiqueta')
modelo_lr = lr.fit(datos_entrenamiento)

# Realizar predicciones en los datos de prueba
predicciones = modelo_lr.transform(datos_prueba)

# Mostrar algunas predicciones
predicciones.show()




+--------------------+--------+--------------------+--------------------+----------+
|     caracteristicas|etiqueta|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|(12,[0,2,3,4,7,10...|       0|[3.33700562000298...|[0.96567673131360...|       0.0|
|(12,[0,2,3,4,7,10...|       0|[3.84525573468476...|[0.97906664171550...|       0.0|
|(12,[0,2,3,4,7,10...|       0|[2.31676665790501...|[0.91025615946216...|       0.0|
|[29.0,1.0,2.0,130...|       0|[1.76436478788113...|[0.85375547670381...|       0.0|
|[34.0,0.0,2.0,118...|       0|[3.23682149295224...|[0.96219666339790...|       0.0|
|[35.0,1.0,4.0,126...|       1|[0.13931292281631...|[0.53477201069284...|       0.0|
|[38.0,1.0,1.0,120...|       1|[-1.1360018561565...|[0.24305518108066...|       1.0|
|[40.0,1.0,4.0,152...|       1|[0.38560443416449...|[0.59522410999911...|       0.0|
|[41.0,0.0,3.0,112...|       0|[2.61976013698929...|[0.9321225318