# Infor722 Iteration 4 

In [27]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, avg, monotonically_increasing_id

In [28]:
spark = SparkSession.builder.master("local[1]").appName("SparkApp")\
    .config("spark.executor.memory", "4g")\
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

## 2. Data Understand

### 2.1. Load Dataset

In [29]:
df_main = spark.read.csv('Datasets/diabetes_012_health_indicators_BRFSS2021.csv', header=True)
df_income = spark.read.csv('Datasets/diabetes_Income.csv', header=True)

### 2.3. Explore Data

In [30]:
df_desc = df_main.describe()
df_desc.show()

[Stage 71:>                                                         (0 + 1) / 1]

+-------+-------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+-----------------+------------------+------+-----------------+------------------+
|summary|       Diabetes_012|            HighBP|           HighChol|          CholCheck|               BMI|             Smoker|             Stroke|HeartDiseaseorAttack|      PhysActivity|             Fruits|            Veggies|  HvyAlcoholConsump|      AnyHealthcare|        NoDocbcCost|           GenHlth|         MentHlth|         PhysHlth|          DiffWalk|   Sex|              Age|         Education|
+-------+-------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+------------------+-

                                                                                

In [31]:
df_main.printSchema()

root
 |-- Diabetes_012: string (nullable = true)
 |-- HighBP: string (nullable = true)
 |-- HighChol: string (nullable = true)
 |-- CholCheck: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- Smoker: string (nullable = true)
 |-- Stroke: string (nullable = true)
 |-- HeartDiseaseorAttack: string (nullable = true)
 |-- PhysActivity: string (nullable = true)
 |-- Fruits: string (nullable = true)
 |-- Veggies: string (nullable = true)
 |-- HvyAlcoholConsump: string (nullable = true)
 |-- AnyHealthcare: string (nullable = true)
 |-- NoDocbcCost: string (nullable = true)
 |-- GenHlth: string (nullable = true)
 |-- MentHlth: string (nullable = true)
 |-- PhysHlth: string (nullable = true)
 |-- DiffWalk: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Education: string (nullable = true)



In [32]:
diabetes_counts = df_main.groupBy("Diabetes_012").count()
diabetes_counts.show()

+------------+------+
|Diabetes_012| count|
+------------+------+
|           0|197191|
|           1|  5619|
|           2| 33568|
+------------+------+



[Stage 74:>                                                         (0 + 1) / 1]                                                                                

### 2.4. Verify the data quality

In [33]:
df_main.printSchema()

root
 |-- Diabetes_012: string (nullable = true)
 |-- HighBP: string (nullable = true)
 |-- HighChol: string (nullable = true)
 |-- CholCheck: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- Smoker: string (nullable = true)
 |-- Stroke: string (nullable = true)
 |-- HeartDiseaseorAttack: string (nullable = true)
 |-- PhysActivity: string (nullable = true)
 |-- Fruits: string (nullable = true)
 |-- Veggies: string (nullable = true)
 |-- HvyAlcoholConsump: string (nullable = true)
 |-- AnyHealthcare: string (nullable = true)
 |-- NoDocbcCost: string (nullable = true)
 |-- GenHlth: string (nullable = true)
 |-- MentHlth: string (nullable = true)
 |-- PhysHlth: string (nullable = true)
 |-- DiffWalk: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Education: string (nullable = true)



In [34]:
# Count the number of missing values per column
missing_values = df_main.select([count(when(col(c).isNull(), c)).alias(c) for c in df_main.columns])
missing_values.show()

[Stage 77:>                                                         (0 + 1) / 1]

+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+
|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex|Age|Education|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+
|           0|     0|       1|        1| 14|     9|     7|                  10|          12|     9|      7|                9|            7|          4|      5|       3|       6|       4|  0|  1|        2|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---

                                                                                

In [35]:
# Percentage of missing values in the dataset
total_rows = df_main.count()
missing_percentage = df_main.select([(count(when(col(c).isNull(), c)) / total_rows * 100).alias(c) for c in df_main.columns])
missing_percentage.show()

[Stage 83:>                                                         (0 + 1) / 1]

+------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+
|Diabetes_012|HighBP|            HighChol|           CholCheck|                 BMI|              Smoker|              Stroke|HeartDiseaseorAttack|        PhysActivity|              Fruits|             Veggies|   HvyAlcoholConsump|       AnyHealthcare|         NoDocbcCost|             GenHlth|            MentHlth|            PhysHlth|            DiffWalk|Sex|                 Age|           Education|
+------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---

                                                                                

In [36]:
# Value counts for the 'Sex' column
sex_distribution = df_main.groupBy("Sex").count()
sex_distribution.show()

+------+------+
|   Sex| count|
+------+------+
|     F|     3|
|Female|123426|
|female|     2|
|     M|     3|
|  feee|     1|
|  Male|112943|
+------+------+



## 3. Data preparation

### 3.1. Selecte data

In [37]:
main_df = df_main.drop('Education', 'MentHlth')

# Show the DataFrame to confirm columns are dropped
main_df.show()

# Display the first few rows of the DataFrame
main_df.show(5)

+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+------+---+
|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|   Sex|Age|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+------+---+
|           0|     0|       1|        1| 15|     1|     0|                   0|           0|     1|      1|                0|            1|          0|      5|      20|       0|Female| 11|
|           2|     1|       0|        1| 28|     0|     0|                   1|           0|     1|      0|                0|            1|          0|      2|       0|       0|Female| 11|
|           2|     1|       1|        1| 33|     0|    

In [38]:
num_columns = len(main_df.columns)
print(f"The number of columns in the DataFrame is: {num_columns}")

The number of columns in the DataFrame is: 19


### 3.2. Clean Data

In [39]:
# Clean 'main_df' DataFrame
main_df = main_df.withColumn("Age", when((col("Age") < 1) | (col("Age") > 13), None).otherwise(col("Age")))
main_df = main_df.withColumn("Diabetes_012", when(~col("Diabetes_012").isin([0, 1, 2]), None).otherwise(col("Diabetes_012")))
main_df = main_df.withColumn("HighBP", when(~col("HighBP").isin([0, 1]), None).otherwise(col("HighBP")))
main_df = main_df.withColumn("HighChol", when(~col("HighChol").isin([0, 1]), None).otherwise(col("HighChol")))
main_df = main_df.withColumn("CholCheck", when(~col("CholCheck").isin([0, 1]), None).otherwise(col("CholCheck")))
main_df = main_df.withColumn("BMI", when((col("BMI") < 12) | (col("BMI") > 99), None).otherwise(col("BMI")))
main_df = main_df.withColumn("Smoker", when(~col("Smoker").isin([0, 1]), None).otherwise(col("Smoker")))
main_df = main_df.withColumn("Stroke", when(~col("Stroke").isin([0, 1]), None).otherwise(col("Stroke")))
main_df = main_df.withColumn("HeartDiseaseorAttack", when(~col("HeartDiseaseorAttack").isin([0, 1]), None).otherwise(col("HeartDiseaseorAttack")))
main_df = main_df.withColumn("PhysActivity", when(~col("PhysActivity").isin([0, 1]), None).otherwise(col("PhysActivity")))
main_df = main_df.withColumn("Fruits", when(~col("Fruits").isin([0, 1]), None).otherwise(col("Fruits")))
main_df = main_df.withColumn("Veggies", when(~col("Veggies").isin([0, 1]), None).otherwise(col("Veggies")))
main_df = main_df.withColumn("HvyAlcoholConsump", when(~col("HvyAlcoholConsump").isin([0, 1]), None).otherwise(col("HvyAlcoholConsump")))
main_df = main_df.withColumn("AnyHealthcare", when(~col("AnyHealthcare").isin([0, 1]), None).otherwise(col("AnyHealthcare")))
main_df = main_df.withColumn("NoDocbcCost", when(~col("NoDocbcCost").isin([0, 1]), None).otherwise(col("NoDocbcCost")))
main_df = main_df.withColumn("GenHlth", when(~col("GenHlth").isin([1, 2, 3, 4, 5]), None).otherwise(col("GenHlth")))
main_df = main_df.withColumn("PhysHlth", when((col("PhysHlth") < 0) | (col("PhysHlth") > 30), None).otherwise(col("PhysHlth")))
main_df = main_df.withColumn("DiffWalk", when(~col("DiffWalk").isin([0, 1]), None).otherwise(col("DiffWalk")))

# Clean 'second_df' DataFrame
second_df = df_income.withColumn("Income", when((col("Income") < 1) | (col("Income") > 11), None).otherwise(col("Income")))

In [40]:
columns_of_interest = [
    'Age', 'Diabetes_012', 'HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker',
    'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits', 'Veggies',
    'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost', 'GenHlth', 'PhysHlth',
    'DiffWalk'
]

# Calculate missing values for each field in the main DataFrame
missing_values = main_df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns_of_interest])
missing_values.show()

[Stage 91:>                                                         (0 + 1) / 1]

+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+
|Age|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|
+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+
|  2|           0|     0|       1|        1| 29|     9|     7|                  10|          12|     9|      7|                9|            7|          4|      5|       9|       4|
+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+



                                                                                

In [41]:
# Calculate missing values for the 'Income' field in the second DataFrame
missing_values_second = second_df.select([count(when(col('Income').isNull(), 'Income')).alias('Income')])
missing_values_second.show()

+------+
|Income|
+------+
|     0|
+------+



In [42]:
binary_columns = [
    'HighBP', 'HighChol', 'CholCheck', 'Smoker', 'Stroke', 'HeartDiseaseorAttack',
    'PhysActivity', 'Fruits', 'Veggies', 'HvyAlcoholConsump', 'AnyHealthcare',
    'NoDocbcCost', 'DiffWalk'
]

# Replace NaN in binary columns with 1
for col_name in binary_columns:
    main_df = main_df.na.fill({col_name: 1})

In [43]:
# Define non-binary columns
non_binary_columns = [
    'Age', 'Diabetes_012', 'BMI', 'GenHlth', 'PhysHlth'
]

# Replace NaN in non-binary columns with the column's mean
for col_name in non_binary_columns:
    mean_value = main_df.select(avg(col_name)).first()[0]
    main_df = main_df.na.fill({col_name: mean_value})

                                                                                

In [44]:
columns_of_interest = [
    'Age', 'Diabetes_012', 'HighBP', 'HighChol', 'CholCheck', 'BMI', 'Smoker',
    'Stroke', 'HeartDiseaseorAttack', 'PhysActivity', 'Fruits', 'Veggies',
    'HvyAlcoholConsump', 'AnyHealthcare', 'NoDocbcCost', 'GenHlth', 'PhysHlth',
    'DiffWalk'
]

# Calculate missing values for each field in the main DataFrame
missing_values = main_df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns_of_interest])
missing_values.show()

+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+
|Age|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|
+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+
|  0|           0|     0|       0|        0|  0|     0|     0|                   0|           0|     0|      0|                0|            0|          0|      0|       0|       0|
+---+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+



In [45]:
main_df.select("Sex").distinct().show()

+------+
|   Sex|
+------+
|     F|
|Female|
|female|
|     M|
|  feee|
|  Male|
+------+



[Stage 115:>                                                        (0 + 1) / 1]                                                                                

In [46]:
sex_mapping = {
    'female': 0, 'F': 0, 'M': 1, 'feee': 0, 'Female': 0, 'Male': 1
}

# Applying the mapping using when().otherwise() for each condition
# Note: PySpark does not have direct replacement using dictionary, thus using multiple when().otherwise()
main_df = main_df.withColumn("Sex", 
    when(col("Sex") == "female", 0)
    .when(col("Sex") == "F", 0)
    .when(col("Sex") == "M", 1)
    .when(col("Sex") == "feee", 0)
    .when(col("Sex") == "Female", 0)
    .when(col("Sex") == "Male", 1)
    .otherwise(col("Sex")))

# Display the unique values in the 'Sex' column to confirm changes
main_df.select("Sex").distinct().show()

+---+
|Sex|
+---+
|  0|
|  1|
+---+



In [47]:
# Count the values in the 'Sex' column
main_df.groupBy("Sex").count().show()

[Stage 121:>                                                        (0 + 1) / 1]

+---+------+
|Sex| count|
+---+------+
|  0|123432|
|  1|112946|
+---+------+



                                                                                

### 3.3. Construct the data

In [48]:
# Define bins and corresponding labels
bins = [-1, 0, 10, 20, float('inf')]
labels = [0, 1, 2, 3]

# Creating a new column 'physical_health' based on bins
main_df = main_df.withColumn("physical_health", 
    when((col("PhysHlth") > bins[0]) & (col("PhysHlth") <= bins[1]), labels[0])
    .when((col("PhysHlth") > bins[1]) & (col("PhysHlth") <= bins[2]), labels[1])
    .when((col("PhysHlth") > bins[2]) & (col("PhysHlth") <= bins[3]), labels[2])
    .when(col("PhysHlth") > bins[3], labels[3])
    .otherwise(None))

# Count the values in the 'physical_health' column
physical_health_distribution = main_df.groupBy("physical_health").count()
physical_health_distribution.show()

[Stage 124:>                                                        (0 + 1) / 1]

+---------------+------+
|physical_health| count|
+---------------+------+
|              1| 51500|
|              3| 18113|
|              2|  9987|
|              0|156778|
+---------------+------+



                                                                                

In [49]:
# Create new column 'Cardiovascular Disease' based on conditions
main_df = main_df.withColumn("Cardiovascular Disease", 
    when((col("HeartDiseaseorAttack") == 1) | (col("Stroke") == 1), 1).otherwise(0))

# Create new column 'Diseases of Metabolic Syndrome' based on conditions
main_df = main_df.withColumn("Diseases of Metabolic Syndrome", 
    when((col("HighBP") == 1) | (col("HighChol") == 1), 1).otherwise(0))

# Show the updated DataFrame to see the changes (optional)
main_df.show()

+------------+------+--------+---------+-----------------+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+
|Diabetes_012|HighBP|HighChol|CholCheck|              BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|Sex|Age|physical_health|Cardiovascular Disease|Diseases of Metabolic Syndrome|
+------------+------+--------+---------+-----------------+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+
|           0|     0|       1|        1|               15|     1|     0|                   0|           0|     1|      1|                0|            1|          0|      5|      20|       0

In [50]:
# Count the values in the 'Cardiovascular Disease' column
cardiovascular_disease_counts = main_df.groupBy("Cardiovascular Disease").count()
cardiovascular_disease_counts.show()

# Count the values in the 'Diseases of Metabolic Syndrome' column
metabolic_syndrome_counts = main_df.groupBy("Diseases of Metabolic Syndrome").count()
metabolic_syndrome_counts.show()

[Stage 128:>                                                        (0 + 1) / 1]                                                                                

+----------------------+------+
|Cardiovascular Disease| count|
+----------------------+------+
|                     1| 26589|
|                     0|209789|
+----------------------+------+

+------------------------------+------+
|Diseases of Metabolic Syndrome| count|
+------------------------------+------+
|                             1|138001|
|                             0| 98377|
+------------------------------+------+



### 3.4. Integrate various data sources

In [53]:
main_df = main_df.withColumn("index", monotonically_increasing_id())
second_df = second_df.withColumn("index", monotonically_increasing_id())

# Perform the inner join on the index column
final_df = main_df.join(second_df, main_df.index == second_df.index, how='inner')
final_df.show()

+------------+------+--------+---------+-----------------+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+-----+------+-----+
|Diabetes_012|HighBP|HighChol|CholCheck|              BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|Sex|Age|physical_health|Cardiovascular Disease|Diseases of Metabolic Syndrome|index|Income|index|
+------------+------+--------+---------+-----------------+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+-----+------+-----+
|           0|     0|       1|        1|               15|     1|     0|                   0|           0|     1|      1|            

In [55]:
num_columns = len(final_df.columns)
print(f"The number of columns in final_df: {num_columns}")
num_rows = final_df.count()
print(f"The number of rows in final_df: {num_rows}")

The number of columns in final_df: 25
The number of rows in final_df: 236378


In [56]:
final_df = final_df.drop('index')
num_columns = len(final_df.columns)
print(f"The number of columns in final_df: {num_columns}")

The number of columns in final_df: 23


### 3.5. Format the data as required

In [57]:
# Calculate the number of missing values per column
final_missing_value = final_df.select([count(when(col(c).isNull(), c)).alias(c) for c in final_df.columns])
final_missing_value.show()

[Stage 149:>                                                        (0 + 1) / 1]

+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+------+
|Diabetes_012|HighBP|HighChol|CholCheck|BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|PhysHlth|DiffWalk|Sex|Age|physical_health|Cardiovascular Disease|Diseases of Metabolic Syndrome|Income|
+------------+------+--------+---------+---+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+---+---+---------------+----------------------+------------------------------+------+
|           0|     0|       0|        0|  0|     0|     0|                   0|           0|     0|      0|                0|            0|          0|      0|       0|       0|  0|  0|              0|          

                                                                                

## 4. 