In [15]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("DataCleaning") \
    .getOrCreate()

# Load the dataset
file_path = r'D:\PROJECT\Indicators of Heart Disease\cleaned\heart_2022_no_nans.csv'

try:
    # Read the CSV file into a DataFrame
    data = spark.read.csv(file_path, header=True, inferSchema=True)
    # Show the first few rows to verify the load
    #data.show()
except Exception as e:
    print(f"An error occurred: {e}")


In [16]:
# Number of rows in DataFrame
data.count()

246022

In [17]:
# Columns and their summary
data.describe()

DataFrame[summary: string, State: string, Sex: string, GeneralHealth: string, PhysicalHealthDays: string, MentalHealthDays: string, LastCheckupTime: string, PhysicalActivities: string, SleepHours: string, RemovedTeeth: string, HadHeartAttack: string, HadAngina: string, HadStroke: string, HadAsthma: string, HadSkinCancer: string, HadCOPD: string, HadDepressiveDisorder: string, HadKidneyDisease: string, HadArthritis: string, HadDiabetes: string, DeafOrHardOfHearing: string, BlindOrVisionDifficulty: string, DifficultyConcentrating: string, DifficultyWalking: string, DifficultyDressingBathing: string, DifficultyErrands: string, SmokerStatus: string, ECigaretteUsage: string, ChestScan: string, RaceEthnicityCategory: string, AgeCategory: string, HeightInMeters: string, WeightInKilograms: string, BMI: string, AlcoholDrinkers: string, HIVTesting: string, FluVaxLast12: string, PneumoVaxEver: string, TetanusLast10Tdap: string, HighRiskLastYear: string, CovidPos: string]

In [18]:
# Listing columns and their data types
data.dtypes

[('State', 'string'),
 ('Sex', 'string'),
 ('GeneralHealth', 'string'),
 ('PhysicalHealthDays', 'double'),
 ('MentalHealthDays', 'double'),
 ('LastCheckupTime', 'string'),
 ('PhysicalActivities', 'string'),
 ('SleepHours', 'double'),
 ('RemovedTeeth', 'string'),
 ('HadHeartAttack', 'string'),
 ('HadAngina', 'string'),
 ('HadStroke', 'string'),
 ('HadAsthma', 'string'),
 ('HadSkinCancer', 'string'),
 ('HadCOPD', 'string'),
 ('HadDepressiveDisorder', 'string'),
 ('HadKidneyDisease', 'string'),
 ('HadArthritis', 'string'),
 ('HadDiabetes', 'string'),
 ('DeafOrHardOfHearing', 'string'),
 ('BlindOrVisionDifficulty', 'string'),
 ('DifficultyConcentrating', 'string'),
 ('DifficultyWalking', 'string'),
 ('DifficultyDressingBathing', 'string'),
 ('DifficultyErrands', 'string'),
 ('SmokerStatus', 'string'),
 ('ECigaretteUsage', 'string'),
 ('ChestScan', 'string'),
 ('RaceEthnicityCategory', 'string'),
 ('AgeCategory', 'string'),
 ('HeightInMeters', 'double'),
 ('WeightInKilograms', 'double'),
 (

In [19]:
#Dropping unnecessary columns
data = data.drop("PhysicalHealthDays","MentalHealthDays","RemovedTeeth","ECigaretteUsage","RaceEthnicityCategory","TetanusLast10Tdap","HighRiskLastYear","CovidPos")

In [20]:
# Printing number of columns and rows
print(f"No of columns: {len(data.dtypes)}")
print(f"No of Rows: {data.count()}")

No of columns: 32
No of Rows: 246022


In [21]:
# Listing distinct values and their count in "HadDiabetes" column
data.groupBy("HadDiabetes").count().collect()

[Row(HadDiabetes='No, pre-diabetes or borderline diabetes', count=5392),
 Row(HadDiabetes='No', count=204834),
 Row(HadDiabetes='Yes', count=33813),
 Row(HadDiabetes='Yes, but only during pregnancy (female)', count=1983)]

In [22]:
# Create a filtered df to filter out unnecessary rows 
filtered_df = data.filter((data.HadDiabetes == "Yes, but only during pregnancy (female)") |
            (data.HadDiabetes == "No, pre-diabetes or borderline diabetes")  )
# Count number of rows to be filtered out
filtered_df.count()

7375

In [23]:
# Remove rows in 'data' that are also present in 'filtered_df'
data = data.exceptAll(filtered_df)

In [24]:
# Counting number of rows after 
data.count()

238647

In [25]:
# Print Schema of data frame
data.printSchema()

root
 |-- State: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- LastCheckupTime: string (nullable = true)
 |-- PhysicalActivities: string (nullable = true)
 |-- SleepHours: double (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- HadAngina: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadSkinCancer: string (nullable = true)
 |-- HadCOPD: string (nullable = true)
 |-- HadDepressiveDisorder: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- HadArthritis: string (nullable = true)
 |-- HadDiabetes: string (nullable = true)
 |-- DeafOrHardOfHearing: string (nullable = true)
 |-- BlindOrVisionDifficulty: string (nullable = true)
 |-- DifficultyConcentrating: string (nullable = true)
 |-- DifficultyWalking: string (nullable = true)
 |-- DifficultyDressingBathing: string (nullable = true)
 |-- DifficultyErrands: string 

In [27]:
from pyspark.sql import functions as F

# Replace the Height column with its values converted to centimeters 
data = data.withColumn("HeightInCentimeters", (F.col("HeightInMeters") * 100))

# Dropping "HeightInMeters" column
data = data.drop("HeightInMeters")
# Show the updated DataFrame to verify the replacement
data.select("HeightInCentimeters").show(5)

+-------------------+
|HeightInCentimeters|
+-------------------+
|              173.0|
|              185.0|
|              188.0|
|              173.0|
|              185.0|
+-------------------+
only showing top 5 rows



In [28]:
# Create list of string columns
categorical_cols = [col for col,dtyp in data.dtypes if dtyp == "string"]

columns_to_encode = []

# Print columns to Encode
for col in categorical_cols:
    if (data.select(col).distinct().count() == 2):
        columns_to_encode.append(col)
columns_to_encode


['Sex',
 'PhysicalActivities',
 'HadHeartAttack',
 'HadAngina',
 'HadStroke',
 'HadAsthma',
 'HadSkinCancer',
 'HadCOPD',
 'HadDepressiveDisorder',
 'HadKidneyDisease',
 'HadArthritis',
 'HadDiabetes',
 'DeafOrHardOfHearing',
 'BlindOrVisionDifficulty',
 'DifficultyConcentrating',
 'DifficultyWalking',
 'DifficultyDressingBathing',
 'DifficultyErrands',
 'ChestScan',
 'AlcoholDrinkers',
 'HIVTesting',
 'FluVaxLast12',
 'PneumoVaxEver']

In [29]:
# Print Number of string columns to encode
len(columns_to_encode)

23

In [30]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

# Create data frame by selecting the columns to encode
data_indexed = data.select(*data.columns)

# Iterate over each column specified for encoding
for column in columns_to_encode:
    indexed_col = column+"Indexed" # Define the name for the new indexed column

    # Initialize StringIndexer to convert string values to numerical indices
    indexer = StringIndexer(
        inputCol=column,  # Column with string values to index
        outputCol=indexed_col,  # Column to store the indexed values
        stringOrderType="alphabetAsc"  # Indexes in alphabetical order
    )

    # Fit the indexer on the data and transform it, adding the new indexed column
    data_indexed = indexer.fit(data_indexed).transform(data_indexed)

    # Cast the new indexed column to integer type
    data_indexed = data_indexed.withColumn(
        indexed_col, 
        col(indexed_col).cast("integer")
    )

    # Display the original and indexed columns for verification
    data_indexed.select([column, indexed_col]).show(5, truncate=False)

+------+----------+
|Sex   |SexIndexed|
+------+----------+
|Male  |1         |
|Male  |1         |
|Male  |1         |
|Female|0         |
|Male  |1         |
+------+----------+
only showing top 5 rows

+------------------+-------------------------+
|PhysicalActivities|PhysicalActivitiesIndexed|
+------------------+-------------------------+
|Yes               |1                        |
|Yes               |1                        |
|Yes               |1                        |
|No                |0                        |
|Yes               |1                        |
+------------------+-------------------------+
only showing top 5 rows

+--------------+---------------------+
|HadHeartAttack|HadHeartAttackIndexed|
+--------------+---------------------+
|No            |0                    |
|No            |0                    |
|No            |0                    |
|No            |0                    |
|No            |0                    |
+--------------+-------------------

+-------------+--------------------+
|PneumoVaxEver|PneumoVaxEverIndexed|
+-------------+--------------------+
|Yes          |1                   |
|Yes          |1                   |
|Yes          |1                   |
|No           |0                   |
|No           |0                   |
+-------------+--------------------+
only showing top 5 rows



In [31]:
# Printing Schema of data_indexed data frame
data_indexed.printSchema()

root
 |-- State: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- LastCheckupTime: string (nullable = true)
 |-- PhysicalActivities: string (nullable = true)
 |-- SleepHours: double (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- HadAngina: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadSkinCancer: string (nullable = true)
 |-- HadCOPD: string (nullable = true)
 |-- HadDepressiveDisorder: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- HadArthritis: string (nullable = true)
 |-- HadDiabetes: string (nullable = true)
 |-- DeafOrHardOfHearing: string (nullable = true)
 |-- BlindOrVisionDifficulty: string (nullable = true)
 |-- DifficultyConcentrating: string (nullable = true)
 |-- DifficultyWalking: string (nullable = true)
 |-- DifficultyDressingBathing: string (nullable = true)
 |-- DifficultyErrands: string 

In [32]:
#Frequency Encoding for State Column

from pyspark.sql.functions import col, count

# Calculate the frequency of each state
state_freq = data_indexed.groupBy("State").agg(count("State").alias("StateFrequency"))

# Join the frequency counts with the original DataFrame
data_encoded_state = data_indexed.join(state_freq, on="State", how="left")

# Display the result
data_encoded_state.select(["State", "StateFrequency"]).show(5, truncate=False)



+-------+--------------+
|State  |StateFrequency|
+-------+--------------+
|Alabama|1875          |
|Alabama|1875          |
|Alabama|1875          |
|Alabama|1875          |
|Alabama|1875          |
+-------+--------------+
only showing top 5 rows



In [33]:
# Printing Schema of data_encoded_state data frame
data_encoded_state.printSchema()

root
 |-- State: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- LastCheckupTime: string (nullable = true)
 |-- PhysicalActivities: string (nullable = true)
 |-- SleepHours: double (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- HadAngina: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadSkinCancer: string (nullable = true)
 |-- HadCOPD: string (nullable = true)
 |-- HadDepressiveDisorder: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- HadArthritis: string (nullable = true)
 |-- HadDiabetes: string (nullable = true)
 |-- DeafOrHardOfHearing: string (nullable = true)
 |-- BlindOrVisionDifficulty: string (nullable = true)
 |-- DifficultyConcentrating: string (nullable = true)
 |-- DifficultyWalking: string (nullable = true)
 |-- DifficultyDressingBathing: string (nullable = true)
 |-- DifficultyErrands: string 

In [36]:
#Ordinal Encoding for GeneralHealth

from pyspark.sql.functions import trim, when, col

# Trim leading and trailing whitespace from GeneralHealth
data_encoded_state = data_encoded_state.withColumn("GeneralHealth", trim(col("GeneralHealth")))

# Apply ordinal encoding with updated conditions
data_encoded_general_health = data_encoded_state.withColumn(
    "GeneralHealthIndex",
    when(col("GeneralHealth") == "Excellent", 5)
    .when(col("GeneralHealth") == "Very good", 4)  # Corrected value
    .when(col("GeneralHealth") == "Good", 3)
    .when(col("GeneralHealth") == "Fair", 2)
    .when(col("GeneralHealth") == "Poor", 1)
    .otherwise(None)
)

# Show the results to verify
data_encoded_general_health.select(["GeneralHealth", "GeneralHealthIndex"]).show(5, truncate=False)


+-------------+------------------+
|GeneralHealth|GeneralHealthIndex|
+-------------+------------------+
|Very good    |4                 |
|Fair         |2                 |
|Fair         |2                 |
|Excellent    |5                 |
|Very good    |4                 |
+-------------+------------------+
only showing top 5 rows



In [None]:
# data_encoded_general_health.printSchema()

In [37]:
# Find distinct values in the LastCheckupTime column
distinct_checkup_times = data_encoded_general_health.select("LastCheckupTime").distinct()

# Show the distinct values
distinct_checkup_times.show(truncate=False)


+-------------------------------------------------------+
|LastCheckupTime                                        |
+-------------------------------------------------------+
|Within past 2 years (1 year but less than 2 years ago) |
|Within past 5 years (2 years but less than 5 years ago)|
|5 or more years ago                                    |
|Within past year (anytime less than 12 months ago)     |
+-------------------------------------------------------+



In [38]:
from pyspark.sql.functions import col, when

# Define the mean values for the intervals using `when` and `otherwise`
data_encoded_checkup = data_encoded_general_health.withColumn(
    "LastCheckupTimeIndex",
    when(col("LastCheckupTime") == "5 or more years ago", 5.0)
    .when(col("LastCheckupTime") == "Within past year (anytime less than 12 months ago)", 0.5)
    .when(col("LastCheckupTime") == "Within past 5 years (2 years but less than 5 years ago)", 3.5)
    .when(col("LastCheckupTime") == "Within past 2 years (1 year but less than 2 years ago)", 1.5)
    .otherwise(None)
)

# Display the result
data_encoded_checkup.select(["LastCheckupTime", "LastCheckupTimeIndex"]).show(20, truncate=False)


+-------------------------------------------------------+--------------------+
|LastCheckupTime                                        |LastCheckupTimeIndex|
+-------------------------------------------------------+--------------------+
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past 2 years (1 year but less than 2 years ago) |1.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago)     |0.5                 |
|Within past year (anytime less than 12 months ago) 

In [41]:
# Show the distinct values of "SmokerStatus" column 
data_encoded_checkup.select("SmokerStatus").distinct().show(truncate=False)

+-------------------------------------+
|SmokerStatus                         |
+-------------------------------------+
|Never smoked                         |
|Current smoker - now smokes every day|
|Former smoker                        |
|Current smoker - now smokes some days|
+-------------------------------------+



In [43]:
from pyspark.sql.functions import col, when

# Define label encoding for SmokerStatus
data_encoded_smoker_status = data_encoded_checkup.withColumn(
    "SmokerStatusIndex",
    when(col("SmokerStatus") == "Never smoked", 0)
    .when(col("SmokerStatus") == "Former smoker", 1)
    .when(col("SmokerStatus") == "Current smoker - now smokes some days", 2)
    .when(col("SmokerStatus") == "Current smoker - now smokes every day", 3)
    .otherwise(None)  # Handle any unexpected values
)

# Display the result
data_encoded_smoker_status.select(["SmokerStatus", "SmokerStatusIndex"]).show(10, truncate=False)


+-------------------------------------+-----------------+
|SmokerStatus                         |SmokerStatusIndex|
+-------------------------------------+-----------------+
|Former smoker                        |1                |
|Former smoker                        |1                |
|Current smoker - now smokes some days|2                |
|Current smoker - now smokes every day|3                |
|Never smoked                         |0                |
|Current smoker - now smokes every day|3                |
|Current smoker - now smokes every day|3                |
|Former smoker                        |1                |
|Former smoker                        |1                |
|Never smoked                         |0                |
+-------------------------------------+-----------------+
only showing top 10 rows



In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# Create a column with the encoded values
data_encoded_age_category = data_encoded_smoker_status.withColumn(
    "AgeCategoryIndex",
    when(col("AgeCategory") == lit('Age 18 to 24'), lit(0))
    .when(col("AgeCategory") == lit('Age 25 to 29'), lit(1))
    .when(col("AgeCategory") == lit('Age 30 to 34'), lit(2))
    .when(col("AgeCategory") == lit('Age 35 to 39'), lit(3))
    .when(col("AgeCategory") == lit('Age 40 to 44'), lit(4))
    .when(col("AgeCategory") == lit('Age 45 to 49'), lit(5))
    .when(col("AgeCategory") == lit('Age 50 to 54'), lit(6))
    .when(col("AgeCategory") == lit('Age 55 to 59'), lit(7))
    .when(col("AgeCategory") == lit('Age 60 to 64'), lit(8))
    .when(col("AgeCategory") == lit('Age 65 to 69'), lit(9))
    .when(col("AgeCategory") == lit('Age 70 to 74'), lit(10))
    .when(col("AgeCategory") == lit('Age 75 to 79'), lit(11))
    .when(col("AgeCategory") == lit('Age 80 or older'), lit(12))
    .otherwise(lit(None))  # handle any unexpected categories
)

# Display the result
data_encoded_age_category.show(10, truncate=False)



+-------+------+-------------+------------------------------------------------------+------------------+----------+--------------+---------+---------+---------+-------------+-------+---------------------+----------------+------------+-----------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------+-------------------------------------+---------+------------+-----------------+-----+---------------+----------+------------+-------------+-------------------+----------+-------------------------+---------------------+----------------+----------------+----------------+--------------------+--------------+----------------------------+-----------------------+-------------------+------------------+--------------------------+------------------------------+------------------------------+------------------------+--------------------------------+------------------------+----------------+----------------------+---------------

In [46]:
# Display the results of 'AgeCategory', 'AgeCategoryIndex' to verify encoding
data_encoded_age_category.select('AgeCategory', 'AgeCategoryIndex').show(5)

+------------+----------------+
| AgeCategory|AgeCategoryIndex|
+------------+----------------+
|Age 75 to 79|              11|
|Age 75 to 79|              11|
|Age 55 to 59|               7|
|Age 35 to 39|               3|
|Age 70 to 74|              10|
+------------+----------------+
only showing top 5 rows



In [45]:
# Print Schema of data_encoded_age_category
data_encoded_age_category.printSchema()

root
 |-- State: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- GeneralHealth: string (nullable = true)
 |-- LastCheckupTime: string (nullable = true)
 |-- PhysicalActivities: string (nullable = true)
 |-- SleepHours: double (nullable = true)
 |-- HadHeartAttack: string (nullable = true)
 |-- HadAngina: string (nullable = true)
 |-- HadStroke: string (nullable = true)
 |-- HadAsthma: string (nullable = true)
 |-- HadSkinCancer: string (nullable = true)
 |-- HadCOPD: string (nullable = true)
 |-- HadDepressiveDisorder: string (nullable = true)
 |-- HadKidneyDisease: string (nullable = true)
 |-- HadArthritis: string (nullable = true)
 |-- HadDiabetes: string (nullable = true)
 |-- DeafOrHardOfHearing: string (nullable = true)
 |-- BlindOrVisionDifficulty: string (nullable = true)
 |-- DifficultyConcentrating: string (nullable = true)
 |-- DifficultyWalking: string (nullable = true)
 |-- DifficultyDressingBathing: string (nullable = true)
 |-- DifficultyErrands: string 

In [None]:
import os
import pandas as pd

# Directory path
directory = 'D:/PROJECT/CSV_Files'

# Ensure the directory exists
if not os.path.exists(directory):
    os.makedirs(directory)

# Save to CSV
file_path = os.path.join(directory, 'encoded_dataset.csv')
pd_save.to_csv(file_path, index=False)

print(f"File saved successfully to {file_path}")

# # Save to Excel (requires openpyxl or xlsxwriter)
# pandas_df.to_excel(output_path, index=False)

# # Save to Parquet
# pandas_df.to_parquet(output_path, index=False)


In [54]:
# Identify columns with data type 'string' in the DataFrame
col_to_remove = [colm for colm, dt in data_encoded_age_category.dtypes if dt == "string"]

# Drop the identified string columns from the DataFrame
final_df = data_encoded_age_category.drop(*col_to_remove)

In [57]:
pd_save = data_encoded_age_category.toPandas()

In [58]:
import os
import pandas as pd

# Directory path
directory = 'D:/PROJECT/CSV_Files'

# Ensure the directory exists
if not os.path.exists(directory):
    os.makedirs(directory)

# Save to CSV
file_path = os.path.join(directory, 'encoded_dataset(complete).csv')
pd_save.to_csv(file_path, index=False)

print(f"File saved successfully to {file_path}")

# # Save to Excel (requires openpyxl or xlsxwriter)
# pandas_df.to_excel(output_path, index=False)

# # Save to Parquet
# pandas_df.to_parquet(output_path, index=False)


File saved successfully to D:/PROJECT/CSV_Files\encoded_dataset(complete).csv
