<div style="position: absolute; top: 0; left: 0; font-family: 'Garamond'; font-size: 14px;">
    <a href="https://github.com/patriciaapenat" style="text-decoration: none; color: inherit;">Patricia Peña Torres</a>
</div>

<div align="center" style="font-family: 'Garamond'; font-size: 48px;">
    <strong>Proyecto final, BRFSS-clustering</strong>
</div>

<div align="center" style="font-family: 'Garamond'; font-size: 36px;">
    <strong>Imputación de valores nulos</strong>
</div>

__________________

<div style="font-family: 'Garamond'; font-size: 14px;">
    <normal>Dado que trabajamos con una base de datos extensa, es crucial examinar detenidamente la documentación. En este caso, me basé en las fuentes oficiales de la BBDD para explorar los datos. Comencé revisando el cuestionario (<a href="https://www.cdc.gov/brfss/questionnaires/pdf-ques/2022-BRFSS-Questionnaire-508.pdf" target="_blank">disponible aquí</a>) pero hay mayor concordancia con el codebook (<a href="https://www.cdc.gov/brfss/annual_data/2022/zip/codebook22_llcp-v2-508.zip" target="_blank">disponible aquí</a>), donde se encuentran los códigos asociados a las preguntas y respuestas. Esta revisión es esencial para comprender las preguntas formuladas y facilita la eliminación de secciones no pertinentes.
    En el presente notebook, llevé a cabo una revisión del documento mencionado, junto con el archivo de texto generado que contiene información sobre los valores nulos. La finalidad fue reducir con una limpieza rápida el dataset eliminando encuestas que no se habían completado o columnas que no utilizaremos.</normal>
</div>

<div style="font-family: 'Garamond'; font-size: 24px;">
    <strong>Importación de paquetes</strong>
</div>

In [1]:
import pandas as pd
import findspark
findspark.init()
import pyspark
import random
import os.path
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
from pyspark.sql import DataFrame
import pickle

<div style="font-family: 'Garamond'; font-size: 24px;">
    <strong>Configuración de Spark</strong>
</div>

In [2]:
# Si hay un SparkContext existente, debemos cerrarlo antes de crear uno nuevo
if 'sc' in locals() and sc:
    sc.stop()  # Detener el SparkContext anterior si existe

# Configuración de Spark
conf = (
    SparkConf()
    .setAppName("Proyecto_PatriciaA_Peña")  # Nombre de la aplicación en Spark
    .setMaster("local[1]")  # Modo local con un hilo para ejecución
    .set("spark.driver.host", "127.0.0.1")  # Dirección del host del driver
    .set("spark.executor.heartbeatInterval", "3600s")  # Intervalo de latido del executor
    .set("spark.network.timeout", "7200s")  # Tiempo de espera de la red
    .set("spark.executor.memory", "14g")  # Memoria asignada para cada executor
    .set("spark.driver.memory", "14g")  # Memoria asignada para el driver
)

# Crear un nuevo SparkContext con la configuración especificada
sc = SparkContext(conf=conf)

# Configuración de SparkSession (interfaz de alto nivel para trabajar con datos estructurados en Spark)
spark = (
    SparkSession.builder
    .appName("Proyecto_PatriciaA_Peña")  # Nombre de la aplicación en Spark
    .config("spark.sql.repl.eagerEval.enabled", True)  # Habilitar la evaluación perezosa en Spark SQL REPL
    .config("spark.sql.repl.eagerEval.maxNumRows", 1000)  # Número máximo de filas a mostrar en la evaluación perezosa
    .getOrCreate()  # Obtener la sesión Spark existente o crear una nueva si no existe
) 

<div style="font-family: 'Garamond'; font-size: 16px;">
    <strong>Lectura del archivo</strong>
</div>

In [3]:
# Lee el archivo CSV 
df = spark.read.csv(r"C:\\Users\\patri\\OneDrive - UAB\\Documentos\\GitHub\\BRFSS-clustering\\datos\\BRFSS_Cleaner_2022.csv", header=True, inferSchema=True)

In [4]:
# Convertir todas las columnas a tipo numérico
for column_name in df.columns:
    df = df.withColumn(column_name, col(column_name).cast("double"))

In [5]:
def guardar_info_nulos_en_txt(df: DataFrame, archivo_nombre: str):
    # Ruta del archivo de texto
    file_path = f"C:\\Users\\patri\\OneDrive - UAB\\Documentos\\GitHub\\BRFSS-clustering\\tratamiento\\{archivo_nombre}.txt"

    # Verifica si el archivo ya existe
    if os.path.exists(file_path):
        print(f"¡Advertencia! El archivo '{file_path}' ya existe. No se ha sobrescrito. Por favor, elija otro nombre.")
        return
    
    # Abre el archivo en modo de escritura (crea uno nuevo)
    with open(file_path, 'w') as file:
        # Escribe la cantidad de valores nulos por columna en el archivo
        for col in df.columns:
            null_count = df.filter(df[col].isNull() | (df[col] == "")).count()
            file.write(f"{col}: {null_count} valores nulos\n")

    print(f"La información sobre valores nulos del DataFrame '{archivo_nombre}' ha sido guardada en: {file_path}")

<div style="font-family: 'Garamond'; font-size: 14px;">
    <normal>
Trabajamos con un dataset muy extenso así que por ello lo reduciremos en este notebook.</normal>
</div>

In [6]:
guardar_info_nulos_en_txt(df, "nulos_3df")

La información sobre valores nulos del DataFrame 'nulos_3df' ha sido guardada en: C:\Users\patri\OneDrive - UAB\Documentos\GitHub\BRFSS-clustering\tratamiento\nulos_3df.txt


In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id

In [14]:
# Agregar una columna de identificación utilizando el número de índice de las filas
df = df.withColumn("ID", monotonically_increasing_id())

In [29]:
def impute_mode_values_and_join(df_imputed, conditions_mode):
    for col_name, condition in conditions_mode.items():
        mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
        if isinstance(condition, bool):  
            df_imputed = df_imputed.withColumn(col_name + '_imputed', lit(mode_value if condition else 0)) 
        else:
            df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))
    
    df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')
    
    return df_imputed

In [None]:
# Lista de columnas para imputar con Imputer
columns_to_impute = [
    'NUMADULT', 'NUMMEN', 'NUMWOMEN', 'GENHLTH', 'PHYSHLTH', 'MENTHLTH', 'POORHLTH',
    'PRIMINSR', 'PERSDOC3', 'MEDCOST1', 'CHECKUP1', 'EXERANY2', 'SLEPTIM1', 'LASTDEN4',
    'RMVTETH4', 'CVDINFR4', 'CVDCRHD4', 'CVDSTRK3', 'ASTHMA3', 'CHCSCNC1', 'CHCOCNC1',
    'CHCCOPD3', 'ADDEPEV3', 'CHCKDNY2', 'HAVARTH4', 'DIABETE4', 'DIABAGE4', 'MARITAL',
    'EDUCA', 'RENTHOM1', 'VETERAN3', 'EMPLOY1', 'INCOME', 'COPDCOGH', 'COPDFLEM',
    'COPDBRTH', 'COPDBTST', 'COPDSMOK',
    'DEAF', 'BLIND', 'DECIDE', 'DIFFWALK', 'DIFFDRES', 'DIFFALON'
]

# Crear el imputer con estrategia 'mode' para imputar con la moda
imputer = Imputer(
    inputCols=columns_to_impute,
    outputCols=[f"{col}_imputed" for col in columns_to_impute],  # Nombres de las columnas imputadas
    strategy='mode'  # Utilizar la moda como estrategia de imputación
)

# Aplicar el imputer y transformar el DataFrame
imputer_model = imputer.fit(df)
df_imputed = imputer_model.transform(df)

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in columns_to_impute]), 'ID', 'left')

In [None]:
# Imputación con media para la columna CHILDREN
mean_value = df.selectExpr('avg(CHILDREN) as mean_CHILDREN').collect()[0]['mean_CHILDREN']
df_imputed = df_imputed.fillna(mean_value, subset=['CHILDREN']).withColumn('CHILDREN_imputed', col('CHILDREN').cast('double'))

# Realizar el primer join
df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in columns_to_impute]), 'ID', 'left')

In [None]:
# Salud ginecológica

In [None]:

conditions_mode1 = {
    "ASTHNOW": col("ASTHMA3").isin([2, 7, 9]), #Esta es relativa a si el asma persiste,
    "HADMAM": col("_SEX") == 1,
    "HOWLONG": col("_SEX") == 1,
    "CERVSCRN": col("_SEX") == 1,
    "CRVCLCNC": col("_SEX") == 1,
    "CRVCLPAP": col("_SEX") == 1,
    "CRVCLHPV": col("_SEX") == 1,
    "HADHYST2": col("_SEX") == 1,
    "PREGNANT": col("_SEX") == 1
}

df_imputed = impute_mode_values_and_join(df_imputed, conditions_mode)


# Condiciones y cálculo de moda para las columnas HADSIGM4, COLNSIGM, LASTSIG4, COLNCNCR, VIRCOLO1, VCLNTES2, SMALSTOL, STOLTEST, STOOLDN2, BLDSTFIT, SDNATES1
conditions_mode2 = {
    'HADSIGM4': col("_AGE80") < 45,
    'COLNSIGM': (col("_AGE80") < 45) | (col("HADSIGM4").isin([1])) | (col("COLNSIGM").isin([1, 7, 9, None])),
    'LASTSIG4': (col("_AGE80") < 45) | (col("HADSIGM4").isin([2, 7, 9, None])) | (col("COLNSIGM").isin([9, None])) | (col("SIGMTES1").isNotNull()),
    'COLNCNCR': col("_AGE80") < 45,
    'VIRCOLO1': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])),
    'VCLNTES2': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])) | (col("VIRCOLO1").isin([2, 7, 9])),
    'SMALSTOL': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])),
    'STOLTEST': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])) | (col("SMALSTOL").isin([2, 7, 9, None])),
    'STOOLDN2': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])),
    'BLDSTFIT': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])) | (col("STOOLDN2").isin([2, 7, 9, None])),
    'SDNATES1': (col("_AGE80") < 45) | (col("COLNCNCR").isin([2, 7, 9, None])) | (col("STOOLDN2").isin([2, 7, 9, None])),
}

df_imputed = impute_mode_values_and_join(df_imputed, conditions_mode)

# Condiciones y cálculo de moda para las columnas SMOKE100, SMOKDAY2, USENOW3, ECIGNOW2, LCSFIRST, LCSLAST, LCSNUMCG, LCSCTSC1, LCSSCNCR, LCSCTWHN
conditions_mode = {
    'SMOKE100': True,
    'SMOKDAY2': col('SMOKE100').isin([2, 7, 9, None]),
    'USENOW3': col('SMOKE100').isin([2, 7, 9, None]),
    'ECIGNOW2': True,
    'LCSFIRST': col('SMOKE100').isin([2, 7, 9, None]) | col('SMOKDAY2').isin([7, 9, None]),
    'LCSLAST': col('SMOKE100').isin([2, 7, 9, None]) | col('SMOKDAY2').isin([7, 9, None]) | col('LCSFIRST').isin([888, None]),
    'LCSNUMCG': col('SMOKE100').isin([2, 7, 9, None]) | col('SMOKDAY2').isin([7, 9, None]) | col('LCSFIRST').isin([888, None]),
    'LCSCTSC1': True,
    'LCSSCNCR': col('LCSCTSC1').isin([2, 7, 9, None]),
    'LCSCTWHN': col('LCSCTSC1').isin([2, 7, 9, None]) | col('LCSSCNCR').isin([2, 7, 9, None]),
}

for col_name, condition in conditions_mode.items():
    mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
    if isinstance(condition, bool):  
        df_imputed = df_imputed.withColumn(col_name + '_imputed', lit(mode_value if condition else 0)) 
    else:
        df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')

# Condiciones y cálculo de moda para las columnas FLUSHOT7, TETANUS1, PNEUVAC4, HIVTST7, HIVRISK5, COVIDPOS
columns_to_impute_mode = ['FLUSHOT7', 'TETANUS1', 'PNEUVAC4', 'HIVTST7', 'HIVRISK5', 'COVIDPOS']

for col_name in columns_to_impute_mode: 
    mode_value = df.groupBy(col_name).count().orderBy('count', ascending=False).first()[col_name] 
    df_imputed = df_imputed.withColumn(col_name + '_imputed', when(col(col_name).isNull(), mode_value).otherwise(col(col_name)))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in columns_to_impute_mode]), 'ID', 'left')


pero usando esta función

def impute_mode_values_and_join(df_imputed, conditions_mode):
    for col_name, condition in conditions_mode.items():
        mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
        if isinstance(condition, bool):  
            df_imputed = df_imputed.withColumn(col_name + '_imputed', lit(mode_value if condition else 0)) 
        else:
            df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))
    
    df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')
    
    return df_imputed


In [None]:

conditions_mode = {
    'CNCRDIFF': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull(),
    'CSRVTRT3': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull(),
    'CSRVPAIN': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull(),
    'CSRVCTL2': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVPAIN').isin([2, 7, 9]) | col('CSRVPAIN').isNull(),
    'CSRVDOC1': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVSUM': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVRTRN': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVINST': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVINSR': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVDEIN': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CSRVCLIN': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CSRVTRT3').isin([1, 3, 4, 5, 7, 9]) | col('CSRVTRT3').isNull(),
    'CNCRAGE': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CNCRDIFF').isin([7, 9]) | col('CNCRDIFF').isNull(),
    'CNCRTYP2': col('CHCSCNC1').isin([2, 7, 9]) | col('CHCOCNC1').isin([2, 7, 9]) | col('CHCSCNC1').isNull() | col('CHCOCNC1').isNull() | col('CNCRDIFF').isin([7, 9]) | col('CNCRDIFF').isNull()
}

for col_name, condition in conditions_mode.items():
    mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
    df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')
                
conditions_mode = {
    'PSATEST1': (col('_SEX') == 1) | (col('_AGE80') < 40),
    'PSASUGST': (col('_SEX') == 1) | (col('_AGE80') < 40),
    'PCSTALK1': (col('_SEX') == 1) | (col('_AGE80') < 40),
    'PCPSARS2': (col('_SEX') == 1) | (col('_AGE80') < 40) | col('PSATEST1').isin([2, 7, 9]) | col('PSATEST1').isNull(),
}

for col_name, condition in conditions_mode.items():
    mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
    df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')

conditions_mode = {
    'CIMEMLOS': col('_AGE80') < 45,
    'CDHOUSE': (col('_AGE80') < 45) | col('CIMEMLOS').isin([2, 9]),
    'CDASSIST': (col('_AGE80') < 45) | col('CIMEMLOS').isin([2, 9]),
    'CDSOCIAL': (col('_AGE80') < 45) | col('CIMEMLOS').isin([2, 9]),
    'CDDISCUS': (col('_AGE80') < 45) | col('CIMEMLOS').isin([2, 9]),
    'CDHELP': (col('_AGE80') < 45) | col('CIMEMLOS').isin([2, 9]) | col('CDASSIST').isin([4, 5, 7, 9]),
}

for col_name, condition in conditions_mode.items():
    mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
    df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')

# Condiciones y cálculo de moda para las nuevas columnas
conditions_mode = {
    'CAREGIV1': True,
    'CRGVREL4': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVLNG1': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVEXPT': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVPER1': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVHOU1': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVHRS1': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVPRB3': col('CAREGIV1').isin([2, 8, 7, 9]),
    'CRGVALZD': (col('CAREGIV1').isin([2, 8, 7, 9])) | (col('CRGVPRB3') == 5),
}

for col_name, condition in conditions_mode.items():
    mode_value = df.groupBy().agg({col_name: "avg"}).collect()[0][0]
    if isinstance(condition, bool):  
        df_imputed = df_imputed.withColumn(col_name + '_imputed', lit(mode_value if condition else 0)) 
    else:
        df_imputed = df_imputed.withColumn(col_name + '_imputed', when(condition, mode_value).otherwise(0))

df_imputed = df_imputed.join(df_imputed.select('ID', *[f"{col}_imputed" for col in conditions_mode.keys()]), 'ID', 'left')


In [None]:
df_imputed