In [1]:
import os
import subprocess

java_home = subprocess.check_output(["/usr/libexec/java_home", "-v", "17"]).strip().decode('utf-8')

# Set JAVA_HOME and PATH
os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = os.path.join(java_home, "bin") + ":" + os.environ["PATH"]
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local pyspark-shell"

# Verify JAVA_HOME and Java version
print("JAVA_HOME:", os.environ['JAVA_HOME'])
!java -version

JAVA_HOME: /Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home
java version "17.0.8" 2023-07-18 LTS
Java(TM) SE Runtime Environment (build 17.0.8+9-LTS-211)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.8+9-LTS-211, mixed mode, sharing)


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CBRFSS") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

file_path = "data.parquet/ "
df = spark.read.parquet("data.parquet")

24/11/28 15:32:50 WARN Utils: Your hostname, ChandeMacBook-Air-8.local resolves to a loopback address: 127.0.0.1; using 10.89.238.200 instead (on interface en0)
24/11/28 15:32:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/28 15:32:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
num_columns = len(df.columns)
num_rows = df.count()
print(f"Dimensions of DataFrame: {num_rows} rows, {num_columns} columns")

Dimensions of DataFrame: 433323 rows, 139 columns


In [4]:
df.printSchema()
df.show(5)

root
 |-- _PSU: string (nullable = true)
 |-- SEXVAR: string (nullable = true)
 |-- GENHLTH: string (nullable = true)
 |-- PHYSHLTH: string (nullable = true)
 |-- MENTHLTH: string (nullable = true)
 |-- CHECKUP1: string (nullable = true)
 |-- EXRACT12: string (nullable = true)
 |-- EXEROFT: string (nullable = true)
 |-- EXERHMM: string (nullable = true)
 |-- BPHIGH6: string (nullable = true)
 |-- BPMEDS: string (nullable = true)
 |-- TOLDHI3: string (nullable = true)
 |-- CHOLMED3: string (nullable = true)
 |-- CVDINFR4: string (nullable = true)
 |-- CVDCRHD4: string (nullable = true)
 |-- CVDSTRK3: string (nullable = true)
 |-- ASTHMA3: string (nullable = true)
 |-- CHCSCNC1: string (nullable = true)
 |-- CHCOCNC1: string (nullable = true)
 |-- CHCCOPD3: string (nullable = true)
 |-- ADDEPEV3: string (nullable = true)
 |-- CHCKDNY2: string (nullable = true)
 |-- HAVARTH4: string (nullable = true)
 |-- DIABETE4: string (nullable = true)
 |-- MARITAL: string (nullable = true)
 |-- EDUCA

24/11/28 15:33:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------+------+-------+--------+--------+--------+--------+-------+-------+-------+------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+-----+-------+-------+--------+-------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+------+--------+-------+--------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------+--------+--------+--------+-------+--------+--------+-------+-------

In [5]:
from pyspark.sql.types import DoubleType

# Caste to double format for easier correlation calculations and prediction later on
for col in df.columns:
    df = df.withColumn(col, df[col].cast(DoubleType()))

In [6]:
df.printSchema()

# Show the first few rows of the DataFrame
df.show(5)

root
 |-- _PSU: double (nullable = true)
 |-- SEXVAR: double (nullable = true)
 |-- GENHLTH: double (nullable = true)
 |-- PHYSHLTH: double (nullable = true)
 |-- MENTHLTH: double (nullable = true)
 |-- CHECKUP1: double (nullable = true)
 |-- EXRACT12: double (nullable = true)
 |-- EXEROFT: double (nullable = true)
 |-- EXERHMM: double (nullable = true)
 |-- BPHIGH6: double (nullable = true)
 |-- BPMEDS: double (nullable = true)
 |-- TOLDHI3: double (nullable = true)
 |-- CHOLMED3: double (nullable = true)
 |-- CVDINFR4: double (nullable = true)
 |-- CVDCRHD4: double (nullable = true)
 |-- CVDSTRK3: double (nullable = true)
 |-- ASTHMA3: double (nullable = true)
 |-- CHCSCNC1: double (nullable = true)
 |-- CHCOCNC1: double (nullable = true)
 |-- CHCCOPD3: double (nullable = true)
 |-- ADDEPEV3: double (nullable = true)
 |-- CHCKDNY2: double (nullable = true)
 |-- HAVARTH4: double (nullable = true)
 |-- DIABETE4: double (nullable = true)
 |-- MARITAL: double (nullable = true)
 |-- EDUCA

Since there are many missing values in the dataset, this step aims to calculate the response rate of each feature. Thus, some features with relatively response rate can be retracted to conduct analysis later.

In [7]:
from pyspark.sql.functions import *

num_rows = df.count()

response_rate = df.select([count(when(col(c).isNotNull(),c)).alias(c) for c in df.columns])

# Divide the null count by total no. of rows for each feature
response_rate = response_rate.select([round(col(c) / num_rows * 100,2).alias(c) for c in response_rate.columns])

# Show the counts of non-null values for each feature
response_rate.show()


[Stage 9:>                                                          (0 + 1) / 1]

+-----+------+-------+--------+--------+--------+--------+-------+-------+-------+------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+-----+-------+-------+--------+-------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+------+--------+-------+--------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+---

                                                                                

In [11]:
from pyspark.sql.window import Window

# Convert to a format that can be sorted and add rank
windowSpec = Window.orderBy(desc("response_rate"))
top_response_rate = response_rate.select(
    explode(
        array(*[
            struct(lit(c).alias("feature"), col(c).alias("response_rate"))
            for c in response_rate.columns
        ])
    )
).select("col.*") \
 .orderBy("response_rate", ascending=False) \
 .withColumn("response_rate_rank", dense_rank().over(windowSpec))


In [12]:
# Show the percentage of valid values for each feature
top_response_rate.show(100)

24/11/28 15:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:38:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:38:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------+-------------+------------------+
| feature|response_rate|response_rate_rank|
+--------+-------------+------------------+
|    _PSU|        100.0|                 1|
|  SEXVAR|        100.0|                 1|
| GENHLTH|        100.0|                 1|
|PHYSHLTH|        100.0|                 1|
|MENTHLTH|        100.0|                 1|
|CHECKUP1|        100.0|                 1|
| BPHIGH6|        100.0|                 1|
|CVDINFR4|        100.0|                 1|
|CVDCRHD4|        100.0|                 1|
|CVDSTRK3|        100.0|                 1|
| ASTHMA3|        100.0|                 1|
|CHCSCNC1|        100.0|                 1|
|CHCOCNC1|        100.0|                 1|
|CHCCOPD3|        100.0|                 1|
|ADDEPEV3|        100.0|                 1|
|CHCKDNY2|        100.0|                 1|
|HAVARTH4|        100.0|                 1|
|DIABETE4|        100.0|                 1|
| MARITAL|        100.0|                 1|
|   EDUCA|        100.0|        

                                                                                

In [13]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def add_column_y_based_on_x(df: DataFrame, column_x: str) -> DataFrame:
    """
    Adds a new column 'y' to the DataFrame based on the values in column 'x'.

    Parameters:
    df (DataFrame): The input PySpark DataFrame.
    column_x (str): The name of the column to evaluate for conditions.

    Returns:
    DataFrame: A new DataFrame with the additional column 'y'.
    """
    # Create a new column 'y' based on conditions applied to column 'x'
    df = df.withColumn(
        "y",
        F.when(df[column_x].isin(2, 3, 4), 0)  # Assign 0 if x is 2, 3, or 4
        .when(df[column_x] == 1, 1)  # Assign 1 if x is 1
        .otherwise(None),  # Assign None for other values
    )

    return df


# Example usage
# Assuming 'df' is your original DataFrame and you want to evaluate column 'x'
df_with_flag = add_column_y_based_on_x(df, "DIABETE4")


In [14]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def calculate_correlations_with_y(df: DataFrame, target_column: str) -> DataFrame:
    """
    Calculate the correlation between each column in the DataFrame and a specified target column,
    handling null values by dropping rows with any nulls in the columns being compared.

    Parameters:
    df (DataFrame): The input PySpark DataFrame.
    target_column (str): The name of the target column to calculate correlations with.

    Returns:
    DataFrame: A new DataFrame containing pairs of columns and their correlation coefficients.
    """

    # Prepare a list to hold correlation results
    correlations = []

    # Calculate correlations between each column and the target column
    for col in df.columns:
        if col == target_column:
            continue  # Skip the target column itself
        correlation_value = df.stat.corr(col, target_column)
        correlations.append((col, target_column, correlation_value))

    # Create a DataFrame from the correlations list
    correlations_df = df.sparkSession.createDataFrame(
        [result for result in correlations if result[2] is not None],
        ["Column", "Target", "Correlation"],
    )

    return correlations_df


# Example usage
correlation_results = calculate_correlations_with_y(df_with_flag, "y")

# Show the correlation results
correlation_results.show()



                                                                                

+--------+------+--------------------+
|  Column|Target|         Correlation|
+--------+------+--------------------+
|    _PSU|     y|-0.03476310477591971|
|  SEXVAR|     y|-0.01835857765920...|
| GENHLTH|     y| 0.24776447914348637|
|PHYSHLTH|     y|-0.07914371693121511|
|MENTHLTH|     y| 0.02670181130559514|
|CHECKUP1|     y|-0.10494091045237935|
|EXRACT12|     y|-0.06052741697272239|
| EXEROFT|     y|-0.06447990615671116|
| EXERHMM|     y|-0.05052216441365793|
| BPHIGH6|     y| -0.2471159110963859|
|  BPMEDS|     y| 0.18774166723378258|
| TOLDHI3|     y|-0.03695206303912695|
|CHOLMED3|     y|-0.08009754666103465|
|CVDINFR4|     y|-0.05543264225305641|
|CVDCRHD4|     y|-0.02345067645981...|
|CVDSTRK3|     y|-0.05334410396006527|
| ASTHMA3|     y|-0.03436481717255345|
|CHCSCNC1|     y|-0.00490675275942705|
|CHCOCNC1|     y|-0.03760772282256236|
|CHCCOPD3|     y|-0.05106953089874152|
+--------+------+--------------------+
only showing top 20 rows



                                                                                

In [15]:
windowSpec = Window.orderBy(desc("AbsCorrelation"))

top_correlations_df = (
    correlation_results.withColumn(
        "AbsCorrelation", F.abs(correlation_results["Correlation"])
    )
    .na.drop()
    .orderBy(F.desc("AbsCorrelation"))
    .withColumn("Correlation_Rank", dense_rank().over(windowSpec))
    .drop("AbsCorrelation")

)

top_correlations_df.show(100)


+--------+------+--------------------+----------------+
|  Column|Target|         Correlation|Correlation_Rank|
+--------+------+--------------------+----------------+
|DIABETE4|     y| -0.9264783789919822|               1|
|FEETSORE|     y|  0.5920567148889624|               2|
|INSULIN1|     y|  0.5710825026238983|               3|
|DIABEDU1|     y|  0.5592829728928967|               4|
|DIABTYPE|     y|   0.509762705240806|               5|
|EYEEXAM1|     y|   0.506132843727578|               6|
|DIABEYE1|     y| 0.49393632918931357|               7|
|PREDIAB2|     y| -0.2960771669241459|               8|
|CHKHEMO3|     y| 0.25233950884563205|               9|
| GENHLTH|     y| 0.24776447914348637|              10|
| BPHIGH6|     y| -0.2471159110963859|              11|
|_AGEG5YR|     y| 0.20367327606214386|              12|
|PDIABTS1|     y| -0.1947705289931606|              13|
|  BPMEDS|     y| 0.18774166723378258|              14|
|_RFHYPE6|     y| 0.18399994785910842|          

24/11/28 15:40:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [17]:
combined_df = top_correlations_df.select(col("Column").alias("Feature"), col("Correlation"), col("Correlation_Rank"))
combined_df = combined_df.join(top_response_rate, combined_df.Feature == top_response_rate.feature, "inner")

combined_df.show(100)

24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:45:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 1

+--------+--------------------+----------------+--------+-------------+------------------+
| Feature|         Correlation|Correlation_Rank| feature|response_rate|response_rate_rank|
+--------+--------------------+----------------+--------+-------------+------------------+
|DIABETE4| -0.9264783789919822|               1|DIABETE4|        100.0|                 1|
|FEETSORE|  0.5920567148889624|               2|FEETSORE|         5.59|                59|
|INSULIN1|  0.5710825026238983|               3|INSULIN1|          5.6|                58|
|DIABEDU1|  0.5592829728928967|               4|DIABEDU1|         5.59|                59|
|DIABTYPE|   0.509762705240806|               5|DIABTYPE|          5.6|                58|
|EYEEXAM1|   0.506132843727578|               6|EYEEXAM1|          5.6|                58|
|DIABEYE1| 0.49393632918931357|               7|DIABEYE1|         5.59|                59|
|PREDIAB2| -0.2960771669241459|               8|PREDIAB2|        38.34|                45|

24/11/28 15:47:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:47:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [18]:
analysis_df = combined_df.withColumn("Sum_of_Rank", col("Correlation_Rank")+col("response_rate_rank")).orderBy("Sum_of_Rank")
analysis_df.show(30)

24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:53:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 1

+--------+--------------------+----------------+--------+-------------+------------------+-----------+
| Feature|         Correlation|Correlation_Rank| feature|response_rate|response_rate_rank|Sum_of_Rank|
+--------+--------------------+----------------+--------+-------------+------------------+-----------+
|DIABETE4| -0.9264783789919822|               1|DIABETE4|        100.0|                 1|          2|
| GENHLTH| 0.24776447914348637|              10| GENHLTH|        100.0|                 1|         11|
| BPHIGH6| -0.2471159110963859|              11| BPHIGH6|        100.0|                 1|         12|
|_AGEG5YR| 0.20367327606214386|              12|_AGEG5YR|        100.0|                 1|         13|
|_RFHYPE6| 0.18399994785910842|              15|_RFHYPE6|        100.0|                 1|         16|
| _RFHLTH| 0.15283068991106716|              19| _RFHLTH|        100.0|                 1|         20|
| EMPLOY1|  0.1783402473732575|              16| EMPLOY1|        99.32|  

After suming the rank the response rate and correlation of all features, we extracted the top most features that may be related to diabetes by looking up the questions in the codebook as well.

Based on the analysis on the dataframe and research from the paper, below features are extrcated for conducting further machine learning prediction of diabetes.
- GENHLTH (general health)
- _AGEG5YR (age cat)
- _RFHYPE6 (high BP)
- EMPLOY1 (employment status)
- _MICHD (heart disease)
- _DRDXAR2 (arithris)
- _HCVU653 (has/has not insurance)
- _RFCHOL3 (high cholesterol)
- METVL12_ (activety met value)
- ALCDAY4 (alocol consumption last 30 days)
- _BMI5CAT (BMI category)
- DIFFWALK (difficulty walking)
- _TOTINDA (physical activity)
- EDUCA (education)
- _INCOMG1 (income)
- CHCKDNY2 (kidney diease)
- SMOKE100 (smoking)
- CVDINFR4(heart attack)

In [19]:
df_with_flag.show(2)
columns = [
    "GENHLTH",
    "_AGEG5YR", #9 is valid value
    "_RFHYPE6",
    "EMPLOY1",
    "_MICHD",
    "_DRDXAR2",
    "_HCVU653",
    "_RFCHOL3",
    "METVL12_", #9 is valid value
    "ALCDAY4",
    "_BMI5CAT",
    "DIFFWALK",
    "_TOTINDA",
    "EDUCA",
    "_INCOMG1",
    "CHCKDNY2",
    "FALL12MN", ## 9 is valid value but 99 is not
    "SMOKE100",
    "CVDINFR4",
    "y"
]
new_df = df_with_flag.select(*columns)

# Replacing some useless value to Null
for c in columns:
    if c == "_AGEG5YR" or c == "METVL12_":
        continue
    elif c == "FALL12MN":
        new_df = new_df.replace({99: None}, subset=[c])
    else:
        new_df = new_df.replace({9: None, 99: None, 999: None}, subset=[c])

# Show the updated DataFrame
new_df.show(2)


+-------+------+-------+--------+--------+--------+--------+-------+-------+-------+------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+-----+-------+-------+--------+-------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+--------+-------+--------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+

In [20]:
num_columns = len(new_df.columns)
num_rows = new_df.count()
print(f"Dimensions of DataFrame: {num_rows} rows, {num_columns} columns")

Dimensions of DataFrame: 433323 rows, 20 columns


In [21]:
df_cleaned = new_df.na.drop()
df_cleaned.show(5)

+-------+--------+--------+-------+------+--------+--------+--------+--------+-------+--------+--------+--------+-----+--------+--------+--------+--------+--------+---+
|GENHLTH|_AGEG5YR|_RFHYPE6|EMPLOY1|_MICHD|_DRDXAR2|_HCVU653|_RFCHOL3|METVL12_|ALCDAY4|_BMI5CAT|DIFFWALK|_TOTINDA|EDUCA|_INCOMG1|CHCKDNY2|FALL12MN|SMOKE100|CVDINFR4|  y|
+-------+--------+--------+-------+------+--------+--------+--------+--------+-------+--------+--------+--------+-----+--------+--------+--------+--------+--------+---+
|    2.0|     9.0|     1.0|    2.0|   2.0|     2.0|     1.0|     2.0|   106.0|  220.0|     2.0|     2.0|     1.0|  6.0|     5.0|     2.0|     2.0|     2.0|     2.0|  0|
|    3.0|     8.0|     1.0|    1.0|   2.0|     2.0|     1.0|     1.0|   104.0|  210.0|     4.0|     2.0|     1.0|  6.0|     5.0|     2.0|    88.0|     2.0|     2.0|  0|
|    3.0|     7.0|     2.0|    1.0|   2.0|     2.0|     1.0|     2.0|   103.0|  888.0|     4.0|     1.0|     1.0|  4.0|     2.0|     2.0|    88.0|     1.0|

In [22]:
num_columns = len(df_cleaned.columns)
num_rows = df_cleaned.count()
print(f"Dimensions of DataFrame: {num_rows} rows, {num_columns} columns")

[Stage 463:>                                                        (0 + 1) / 1]

Dimensions of DataFrame: 95574 rows, 20 columns


                                                                                

In [24]:
df_cleaned.write.parquet("output/processedv3.parquet")

                                                                                