

# Módulo 4 - Big Data
Tarea
Estudiante: Jairo Prado

# Lectura y limpieza

In [None]:
# Cargar el conjunto de datos completo
import findspark
findspark.init('/usr/lib/python3.7/site-packages/pyspark')

from pyspark.sql.types import (StringType, IntegerType, FloatType, 
                               DecimalType, StructField, StructType, DoubleType)

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar") \
    .getOrCreate()

diabetes_df = spark \
    .read \
    .format("csv") \
    .option("path", "diabetes_dataset.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("age", IntegerType()),
                StructField("gender", StringType()),
                StructField("ethnicity", StringType()),
                StructField("education_level", StringType()),
                StructField("income_level", StringType()),
                StructField("employment_status", StringType()),
                StructField("smoking_status", StringType()),
                StructField("alcohol_consumption_per_week",  IntegerType()),
                StructField("physical_activity_minutes_per_week", IntegerType()),
                StructField("diet_score", DoubleType()),
                StructField("sleep_hours_per_day", DoubleType()),
                StructField("screen_time_hours_per_day", DoubleType()),
                StructField("family_history_diabetes", IntegerType()),
                StructField("hypertension_history", IntegerType()),
                StructField("cardiovascular_history", IntegerType()),
                StructField("bmi", DoubleType()),
                StructField("waist_to_hip_ratio", DoubleType()),
                StructField("systolic_bp", IntegerType()),
                StructField("diastolic_bp", IntegerType()),
                StructField("heart_rate", IntegerType()),
                StructField("cholesterol_total", IntegerType()),
                StructField("hdl_cholesterol", IntegerType()),
                StructField("ldl_cholesterol", IntegerType()),
                StructField("triglycerides", IntegerType()),
                StructField("glucose_fasting", IntegerType()),
                StructField("glucose_postprandial", IntegerType()),
                StructField("insulin_level", DoubleType()),
                StructField("hba1c", DoubleType()),
                StructField("diabetes_risk_score", DoubleType()),
                StructField("diabetes_stage", StringType()),
                StructField("diagnosed_diabetes", IntegerType())])) \
    .load()

#diabetes_df.printSchema()
diabetes_df.show()

In [None]:
# Seleccionar un subconjunto de las columnas de interés y el objectivo que es el precio
columns_kept = ['age','gender','ethnicity','education_level','income_level','employment_status','smoking_status','alcohol_consumption_per_week','physical_activity_minutes_per_week','diet_score','sleep_hours_per_day','screen_time_hours_per_day','family_history_diabetes','hypertension_history','cardiovascular_history','bmi','waist_to_hip_ratio','systolic_bp','diastolic_bp','heart_rate','cholesterol_total','hdl_cholesterol','ldl_cholesterol','triglycerides','glucose_fasting','glucose_postprandial','insulin_level','hba1c']

target = ['diagnosed_diabetes']

selected_columns_df = houses_df.select(columns_kept)

selected_columns_df.show()

In [None]:
# Seleccionar un subconjunto de las columnas de interés y el objectivo que es el precio
columns  = ['age','gender','ethnicity','smoking_status','alcohol_consumption_per_week','physical_activity_minutes_per_week','diet_score','sleep_hours_per_day','screen_time_hours_per_day','family_history_diabetes','hypertension_history','cardiovascular_history','bmi','waist_to_hip_ratio','systolic_bp','diastolic_bp','heart_rate','cholesterol_total','hdl_cholesterol','ldl_cholesterol','triglycerides','glucose_fasting','glucose_postprandial','insulin_level','hba1c', 'diagnosed_diabetes']

selected_columns = diabetes_df.select(columns)

selected_features_df.show()

In [None]:
# Analisys de nulos.
from pyspark.sql.functions import col, sum as _sum, when

columns = ['age','gender','ethnicity','smoking_status','alcohol_consumption_per_week',
        'physical_activity_minutes_per_week','diet_score','sleep_hours_per_day',
        'screen_time_hours_per_day','family_history_diabetes','hypertension_history',
        'cardiovascular_history','bmi','waist_to_hip_ratio','systolic_bp','diastolic_bp',
        'heart_rate','cholesterol_total','hdl_cholesterol','ldl_cholesterol','triglycerides',
        'glucose_fasting','glucose_postprandial','insulin_level','hba1c','diagnosed_diabetes']

# Compute null counts
null_counts = diabetes_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in columns
]).toPandas().T.reset_index()

# Rename columns
null_counts.columns = ['Column', 'Null_Count']

# Sort by count descending
null_counts = null_counts.sort_values(by='Null_Count', ascending=False)

# Pretty print
print("\n🧾 Null counts per column:\n")
print(null_counts.to_string(index=False))

In [None]:
# Almacenar el conjunto de datos limpio en la base de datos
correct_types_df \
    .write \
    .format("jdbc") \
    .mode('overwrite') \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", "houses") \
    .save()

# Inspección de datos
Previo a entrenar el modelo es común que se realice algún tipo de descripción de los datos, para tener una idea del tipo de problema con el que nos enfrentamos. A continuación, algunas operacions útiles que podemos usar.


In [None]:
# Cargar el conjunto de datos. Esta vez desde la base de datos

# Reading single DataFrame in Spark by retrieving all rows from a DB table.
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", "houses") \
    .load()

df.show()

In [None]:
# Información descriptiva del dataframe.
df.describe(['bedrooms', 'floors', 'yr_built']).show()

In [None]:
# Para realizar operaciones más detalladas es necesario expresar las filas originales en vectores
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot',
               'floors', 'condition', 'yr_built'],
    outputCol='features')

vector_df = assembler.transform(df)
vector_df = vector_df.select(['features', 'price'])
vector_df.show()


In [None]:
# Con la representación de vectores podemos calcular correlaciones
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt

pearson_matrix = Correlation.corr(vector_df, 'features').collect()[0][0]

sns.heatmap(pearson_matrix.toArray(), annot=True, fmt=".2f", cmap='viridis')

# Estandarización
Como recordamos de los módulos anteriores es deseable que los datos se encuentren estandarizados o normalizados, para evitar que la magnitud de ciertos atributos dominen el proceso de entrenamiento. El módulo `pyspark.ml.feature` tiene varias alternativas. A continuación se presenta una.

In [None]:
from pyspark.ml.feature import StandardScaler

standard_scaler = StandardScaler(inputCol='features', outputCol='scaled')
scale_model = standard_scaler.fit(vector_df)

scaled_df = scale_model.transform(vector_df)
scaled_df.show()

# Regresión lineal
Como ejemplo de un tipo de modelo a entrenar usaremos una regresión lineal básica para tratar de predecir el precio. Para ellos utilizaremos los datos estandarizados.

In [None]:
# Nótese que no se hace partición de datos de entrenamiento (ejercicio posterior).
from pyspark.ml.regression import LinearRegression

regression = LinearRegression(featuresCol='scaled', labelCol='price')
regression_model = regression.fit(scaled_df)

print('Pesos: {}\n b: {}'.format(regression_model.coefficients, regression_model.intercept))

print('RMSE: {} r2: {}'.format(
    regression_model.summary.rootMeanSquaredError,
    regression_model.summary.r2))

scaled_df.describe().show()

# Si se quiere hacer predicciones nada más se requiere llamar a regression_model.transform(df)
# Esto regresa el dataset con una columna nueva

# Ejercicio
- Dividir los datos leídos desde la base de datos en dos conjuntos utilizando las funcionalidades de muestreo de Spark.
- Entrene el modelo en uno y evalúe en otro. Escriba los resultados evaluados en una tabla en Postgresql.
- Implementar un esquema de k-fold cross validation. Evalúe los resultados según RMSE y r2.
- Pruebe diferentes parámetros a la regresión.
- Pruebe diferentes tipos de regresión.