# Spark_ETL.ipynb

#### This notebook is an experimental trial of PySpark for the data preprocessing steps. It builds a Spark pipeline in order to understand the workflow of a Spark implementation of data preprocessing. The notebooks should be executed in this order: data_preprocessing.ipynb first, then this notebook, then test_train_datasets.ipynb.

### Key components:
- schema enforcement
- windowed de-dup
- quantile-based winsorization
- imputation
- categorical encoding
- clean persistence to Parquet
##### Parquet is an open-source, columnar data storage format commonly used in big data ecosystems like Apache Spark

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T, Window
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [None]:
# Build the Spark session used by every cell below (getOrCreate reuses a running one)
_session_builder = SparkSession.builder.appName("cardiologist-xgb-etl")
_session_builder = _session_builder.config("spark.sql.shuffle.partitions", "200")
spark = _session_builder.getOrCreate()

In [None]:
# Columns
# State; Sex; GeneralHealth; PhysicalHealthDays; MentalHealthDays; LastCheckupTime; PhysicalActivities; SleepHours; RemovedTeeth; HadHeartAttack; HadAngina; HadStroke; HadAsthma; HadSkinCancer; HadCOPD; HadDepressiveDisorder; HadKidneyDisease; HadArthritis; HadDiabetes; DeafOrHardOfHearing; BlindOrVisionDifficulty; DifficultyConcentrating; DifficultyWalking; DifficultyDressingBathing; DifficultyErrands; SmokerStatus; ECigaretteUsage; ChestScan; RaceEthnicityCategory; AgeCategory; HeightInMeters; WeightInKilograms; BMI; AlcoholDrinkers; HIVTesting; FluVaxLast12; PneumoVaxEver; TetanusLast10Tdap; HighRiskLastYear; CovidPos;

In [None]:
# Explicit schema for the raw CSV - edit these if the file layout changes!
#
# With an explicit schema Spark applies the fields to the CSV columns by position
# (the header names are not used for matching), so the fields below follow the
# column list documented in the "Columns" cell above.
# NOTE(review): assumes the CSV columns appear in exactly this order - confirm
# against the file header.
#
# Numeric measures use DoubleType: it parses both "4" and "4.0", whereas
# IntegerType rejects values written with a decimal point and, in PERMISSIVE
# mode, turns them into nulls. The winsorization step compares against float
# quantiles anyway, so these columns end up as doubles downstream regardless.
schema = T.StructType([
    T.StructField("State", T.StringType()),
    T.StructField("Sex", T.StringType()),
    T.StructField("GeneralHealth", T.StringType()),
    T.StructField("PhysicalHealthDays", T.DoubleType()),
    T.StructField("MentalHealthDays", T.DoubleType()),
    T.StructField("LastCheckupTime", T.StringType()),
    T.StructField("PhysicalActivities", T.StringType()),
    T.StructField("SleepHours", T.DoubleType()),
    T.StructField("RemovedTeeth", T.StringType()),
    T.StructField("HadHeartAttack", T.StringType()),
    T.StructField("HadAngina", T.StringType()),
    T.StructField("HadStroke", T.StringType()),
    T.StructField("HadAsthma", T.StringType()),
    T.StructField("HadSkinCancer", T.StringType()),
    T.StructField("HadCOPD", T.StringType()),
    T.StructField("HadDepressiveDisorder", T.StringType()),
    T.StructField("HadKidneyDisease", T.StringType()),
    T.StructField("HadArthritis", T.StringType()),
    T.StructField("HadDiabetes", T.StringType()),
    T.StructField("DeafOrHardOfHearing", T.StringType()),
    T.StructField("BlindOrVisionDifficulty", T.StringType()),
    T.StructField("DifficultyConcentrating", T.StringType()),
    T.StructField("DifficultyWalking", T.StringType()),
    T.StructField("DifficultyDressingBathing", T.StringType()),
    T.StructField("DifficultyErrands", T.StringType()),
    T.StructField("SmokerStatus", T.StringType()),
    T.StructField("ECigaretteUsage", T.StringType()),
    T.StructField("ChestScan", T.StringType()),
    T.StructField("RaceEthnicityCategory", T.StringType()),
    T.StructField("AgeCategory", T.StringType()),
    T.StructField("HeightInMeters", T.DoubleType()),
    T.StructField("WeightInKilograms", T.DoubleType()),
    # BMI is required by the winsorization / imputation cells (num_cols)
    T.StructField("BMI", T.DoubleType()),
    T.StructField("AlcoholDrinkers", T.StringType()),
    T.StructField("HIVTesting", T.StringType()),
    T.StructField("FluVaxLast12", T.StringType()),
    T.StructField("PneumoVaxEver", T.StringType()),
    T.StructField("TetanusLast10Tdap", T.StringType()),
    T.StructField("HighRiskLastYear", T.StringType()),
    T.StructField("CovidPos", T.StringType()),
])

In [None]:
# Read the data
raw = (spark.read
       .option("header", True)
       .schema(schema)
       .option("mode", "PERMISSIVE")             # keep bad rows instead of failing
       .csv("../Data/heart_2022_no_nans.csv"))   # 81 MB

### Canonicalization - convert multiple variations of data into a single, standard format to ensure consistency

In [None]:
# Quarantine malformed rows
bad = raw.filter(F.col("SleepHours").isNull() & F.col("_corrupt_record").isNotNull()) if "_corrupt_record" in raw.columns else spark.createDataFrame([], schema)

In [None]:
# Canonicalize & light fixes - edit these!
# Sex is trimmed and lower-cased first, so the replacement map must cover the
# lower-cased full words as well as the one-letter codes; otherwise "Male" would
# become "male" while "m" becomes "Male", leaving two spellings of one category.
sex_canonical = {
    "m": "Male",
    "male": "Male",
    "f": "Female",
    "female": "Female",
}
df = (raw.withColumn("Sex", F.trim(F.lower(F.col("Sex"))))
         .replace(sex_canonical, subset=["Sex"]))

In [None]:
# Deduplicate - keep latest per patient
# The windowed de-dup below is a template: it needs `patient_id` and `event_ts`
# columns, and neither appears in the schema / column list of this notebook.
#w = Window.partitionBy("patient_id").orderBy(F.col("event_ts").desc())
#df_latest = (df.withColumn("rn", F.row_number().over(w))                    # row_number window function
#               .filter("rn = 1").drop("rn"))                                # keep latest record

# Until such key columns exist, pass the canonicalized frame through unchanged.
# The following cells all read `df_latest`, so it must be defined here.
df_latest = df

In [None]:
# Outlier winsorization via approxQuantile
num_cols = ["PhysicalHealthDays", "MentalHealthDays", "SleepHours", "BMI"]

# approxQuantile accepts a list of columns and returns one list of quantiles per
# column (same order), so all caps are computed in a single job instead of one
# job per column.
quantiles = df_latest.approxQuantile(num_cols, [0.01, 0.99], 0.01)  # 1%/99% caps

q = {}
for c, caps in zip(num_cols, quantiles):
    # A column containing only nulls/NaNs yields an empty list; there is nothing
    # to cap in that case, so skip it instead of failing on the unpacking.
    if len(caps) != 2:
        continue
    q[c] = (caps[0], caps[1])

for c, (lo, hi) in q.items():
    # Clamp values outside [lo, hi]; nulls fall through `otherwise` untouched and
    # are left for the imputer.
    df_latest = df_latest.withColumn(c, F.when(F.col(c) < lo, lo)
                                             .when(F.col(c) > hi, hi)
                                             .otherwise(F.col(c)))

In [None]:
# Impute numeric nulls
# Every column in num_cols gets a companion output column named "<column>_imp".
imputer = (Imputer()
           .setInputCols(num_cols)
           .setOutputCols([name + "_imp" for name in num_cols]))

In [None]:
# Encode categoricals
cat_cols = ["Sex"]              # add more as needed

# Step 1: string label -> numeric index, written to "<column>_idx"
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_idx", handleInvalid="keep")
    for name in cat_cols
]

# Step 2: numeric index -> one-hot vector, written to "<column>_oh"
encoders = [
    OneHotEncoder(inputCols=[name + "_idx"], outputCols=[name + "_oh"], dropLast=False)
    for name in cat_cols
]

In [None]:
# Create pipeline
# Stage order: imputer first, then every indexer, then every one-hot encoder
# (the encoders consume the "<column>_idx" columns the indexers produce).
pipe = Pipeline().setStages([imputer, *indexers, *encoders])

# Fit on the prepared frame and apply the fitted stages to that same frame
fitted = pipe.fit(df_latest)
silver = fitted.transform(df_latest)

In [None]:
# Optional
# Create domain features; can create bins for numerical values
#silver = silver.withColumn("age_bin", F.when(F.col("age") < 40, "under40")
#                                      .when((F.col("age") >= 40) & (F.col("age") < 55), "40_54")
#                                      .when((F.col("age") >= 55) & (F.col("age") < 70), "55_69")
#                                      .otherwise("70_plus"))

In [None]:
# Persist “silver” layer
# Existing output at the target path is replaced ("overwrite" mode).
_silver_writer = silver.write.mode("overwrite").partitionBy()   # add a partition if you have a date
_silver_writer.parquet("data/silver/patient_snapshot")