In [11]:
import os
import pandas as pd
import time
import glob
from IPython.display import clear_output


from pyspark.sql import SparkSession
import pyspark.sql.functions as sql_f
from pyspark.sql.types import *
from pyspark.sql.functions import to_date, datediff, floor, col, avg, substring, when, length, lpad, monotonically_increasing_id, expr, unix_timestamp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from concurrent.futures import ThreadPoolExecutor

spark = SparkSession.builder.getOrCreate()





In [3]:
#Function to measure execution time...
def time_execution(task_name, func):
    start_time = time.time()
    result = func()
    end_time = time.time()
    duration = end_time - start_time
    print(f"{task_name} executed in {duration:.2f} seconds")
    return result, duration


## 1) Creating the spark dataframes 

In [1]:
#setting path to HDFS folder
path = '/synthea_output/'


In [4]:
#Patient files
observations = spark.read.csv(path+"observations.csv", header=True)
patient = spark.read.csv(path+"patients.csv", header=True) 
#Medical files
careplans = spark.read.csv(path+"careplans.csv", header=True)
conditions = spark.read.csv(path+"conditions.csv", header=True)
procedures=spark.read.csv(path+"procedures.csv", header=True)
encounters = spark.read.csv(path+"encounters.csv", header=True)
medications = spark.read.csv(path+"medications.csv", header=True)
#Insurance and hospital files
payer_transitions=spark.read.csv(path+"payer_transitions.csv", header=True)
payers=spark.read.csv(path+"payers.csv", header=True)
providers=spark.read.csv(path+"providers.csv", header=True)
organizations=spark.read.csv(path+"organizations.csv", header=True)

                                                                                

## 3) Cleaning dataframes and renaming variables 

In [5]:
#Renaming columns
patient = (
    patient.withColumnRenamed("Id", "patient_id")
           .withColumnRenamed("MARITAL", "patient_marital")
           .withColumnRenamed("RACE", "patient_race")
           .withColumnRenamed("ETHNICITY", "patient_ethnicity")
           .withColumnRenamed("GENDER", "patient_gender")
           .withColumnRenamed("ZIP", "patient_zip")
)
encounters = (
    encounters.withColumnRenamed("PATIENT", "patient_id")
              .withColumnRenamed("Id", "encounter_id")
              .withColumnRenamed("DESCRIPTION", "encounter_discription")
              .withColumnRenamed("CODE", "encounter_code")
              .withColumnRenamed("START", "encounter_start")
              .withColumn("encounter_start", to_date("encounter_start"))
              .withColumnRenamed("STOP", "encounter_stop")
              .withColumn("encounter_stop", to_date("encounter_stop"))
              .withColumn("PATIENT COST", col("TOTAL_CLAIM_COST") - col("PAYER_COVERAGE"))
              .withColumnRenamed("PAYER", "payer_id")
              .withColumnRenamed("ORGANIZATION", "organization_id")
              .withColumnRenamed("PROVIDER", "provider_id")
)
careplans = (
    careplans.withColumnRenamed("PATIENT", "patient_id")
             .withColumnRenamed("Id", "careplan_id")
             .withColumnRenamed("ENCOUNTER", "encounter_id")
             .withColumnRenamed("DESCRIPTION", "careplan_descriptions")
             .withColumnRenamed("CODE", "careplan_code")
)
procedures = (
    procedures.withColumnRenamed("PATIENT", "patient_id")
              .withColumnRenamed("ENCOUNTER", "encounter_id")
              .withColumnRenamed("DESCRIPTION", "procedure_descriptions")
              .withColumnRenamed("CODE", "procedure_code")
              .withColumnRenamed("DATE", "procedure_date")
              .withColumnRenamed("BASE_COST", "procedure_cost")
)
conditions = (
    conditions.withColumnRenamed("PATIENT", "patient_id")
              .withColumnRenamed("ENCOUNTER", "encounter_id")
              .withColumnRenamed("DESCRIPTION", "condition_description")
              .withColumnRenamed("CODE", "condition_code")
              .withColumnRenamed("START", "condition_start")
              .withColumnRenamed("END", "condition_end")
)
observations = (
    observations.withColumnRenamed("PATIENT", "patient_id")
                .withColumnRenamed("ENCOUNTER", "encounter_id")
                .withColumnRenamed("DATE", "observation_date")
                .withColumn("observation_date", to_date("observation_date"))
                .withColumn("obs_value", col("VALUE").cast("double"))
                .withColumnRenamed("CODE", "observation_code")
                .withColumnRenamed("DESCRIPTION", "observation_description")
)
medications = (
    medications.withColumnRenamed("START", "medication_start")
               .withColumn("medication_start", to_date("medication_start"))
               .withColumnRenamed("STOP", "medication_stop")
               .withColumn("medication_stop", to_date("medication_stop"))
               .withColumnRenamed("PATIENT", "patient_id")
               .withColumnRenamed("PAYER", "payer_id")
               .withColumnRenamed("ENCOUNTER", "encounter_id")
               .withColumnRenamed("CODE", "medication_code")
               .withColumnRenamed("DESCRIPTION", "medication_description")
)
payer_transitions = (
    payer_transitions.withColumnRenamed("PATIENT", "patient_id")
                     .withColumnRenamed("PAYER", "payer_id")
)
payers = (
    payers.withColumnRenamed("Id", "payer_id")
          .withColumnRenamed("NAME", "payer_name")
          .withColumnRenamed("OWNERSHIP", "payer_ownership")
)
providers = (
    providers.withColumnRenamed("Id", "provider_id")
             .withColumnRenamed("SPECIALITY", "provider_specialty")
)
organizations = (
    organizations.withColumnRenamed("Id", "organization_id")
                 .withColumnRenamed("NAME", "organization_name")
                 .withColumnRenamed("ZIP", "organization_zip")
)
organizations = organizations.withColumn(
    "organization_zip",
    col("organization_zip").cast("string")
)

#adding leading 0's to zip codes to retain their information
organizations = organizations.withColumn(
    "organization_zip",
    when(length(col("organization_zip")) == 4,  
    lpad(col("organization_zip"), 5, "0")
).otherwise(col("organization_zip")))   

In [6]:
#Merge together dataframes on various id fields that will be used for ML modeling...
encounters = (
    encounters
    .join(payers.select("payer_id", "payer_name", "payer_ownership"), on="payer_id", how="left")
    .join(organizations.select("organization_id", "organization_name", "organization_zip"), on="organization_id", how="left")
    .join(providers.select("provider_id", "provider_specialty"), on="provider_id", how="left")
    .join(procedures.select("encounter_id", "procedure_descriptions", "procedure_code"), on="encounter_id", how="left")
    .join(patient.select("patient_id", "BIRTHDATE", "patient_marital", "patient_race", "patient_ethnicity", "patient_gender", "patient_zip"), on="patient_id", how="left")
    .withColumn("age_at_encounter", floor(datediff(col("encounter_start"), col("BIRTHDATE")) / 365.25))
)


In [7]:
encounters.show(5)

25/05/05 23:39:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 16:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------+--------------+--------------+---------------------+-------------------+----------------+--------------+----------+-----------------+------------+----------+---------------+--------------------+----------------+------------------+----------------------+--------------+----------+---------------+------------+-----------------+--------------+-----------+----------------+
|          patient_id|        encounter_id|         provider_id|     organization_id|            payer_id|encounter_start|encounter_stop|ENCOUNTERCLASS|encounter_code|encounter_discription|BASE_ENCOUNTER_COST|TOTAL_CLAIM_COST|PAYER_COVERAGE|REASONCODE|REASONDESCRIPTION|PATIENT COST|payer_name|payer_ownership|   organization_name|organization_zip|provider_specialty|procedure_descriptions|procedure_code| BIRTHDATE|patient_marital|patient_race|patient_ethnicity|patient_gender|patient_zip|ag

                                                                                

In [8]:
encounters.columns

['patient_id',
 'encounter_id',
 'provider_id',
 'organization_id',
 'payer_id',
 'encounter_start',
 'encounter_stop',
 'ENCOUNTERCLASS',
 'encounter_code',
 'encounter_discription',
 'BASE_ENCOUNTER_COST',
 'TOTAL_CLAIM_COST',
 'PAYER_COVERAGE',
 'REASONCODE',
 'REASONDESCRIPTION',
 'PATIENT COST',
 'payer_name',
 'payer_ownership',
 'organization_name',
 'organization_zip',
 'provider_specialty',
 'procedure_descriptions',
 'procedure_code',
 'BIRTHDATE',
 'patient_marital',
 'patient_race',
 'patient_ethnicity',
 'patient_gender',
 'patient_zip',
 'age_at_encounter']

## Preparing Data

In [9]:
modeling_df = encounters.select(
    col("PATIENT COST").cast("double").alias("label"),
    col("age_at_encounter").cast("double"),
    col("patient_marital"),
    col("patient_race"),
    col("patient_ethnicity"),
    col("patient_gender"),
    col("ENCOUNTERCLASS"),
    col("payer_ownership"),
    col("payer_name"),
    col("organization_zip"),
    col("organization_name"),
    col("procedure_code"),
    #col("encounter_discription"), # don't use for RF, LR and GBT models
    col("encounter_code"),
    #col("REASONDESCRIPTION"), # don't use for RF, LR and GBT models
).na.drop().filter(col("PATIENT COST") != 0)


# Define categorical and numeric columns
categorical_cols = ['patient_marital', 'patient_race', 'patient_ethnicity', 
                   'patient_gender', 'ENCOUNTERCLASS',
                   'payer_ownership',"payer_name","organization_name", "organization_zip", 'procedure_code',"encounter_code"]
numeric_cols = ['age_at_encounter']

target_col = "label"

# Create feature engineering pipeline stages
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") 
            for col in categorical_cols]

encoder = OneHotEncoder(
    inputCols=[col+"_index" for col in categorical_cols],
    outputCols=[col+"_encoded" for col in categorical_cols],
    dropLast=True
)

assembler = VectorAssembler(
    inputCols=numeric_cols + [col+"_encoded" for col in categorical_cols],
    outputCol="features"
)

# Cache after feature engineering
feature_pipeline = Pipeline(stages=indexers + [encoder, assembler])
feature_model = feature_pipeline.fit(modeling_df)
feature_df = feature_model.transform(modeling_df).select("features", target_col).cache()

                                                                                

In [13]:
feature_df.count()

1825840

In [14]:
#Splitting training and test dataframes nd present the number of partitions of the training dataframe
train_df, test_df = time_execution(
    "Data splitting",
    lambda: feature_df.randomSplit([0.8, 0.2], seed=53)
)[0]
train_df.rdd.getNumPartitions()

Data splitting executed in 0.05 seconds


2

In [15]:
# Repartition data to distribute it across the cluster
train_df = train_df.repartition(12)

# Optional: Check how many partitions are in the RDD
print(f"Training Data Partitions: {train_df.rdd.getNumPartitions()}")



Training Data Partitions: 12


## Random Forest Model

In [16]:
# Common evaluator for regression tasks
reg_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=30,  # Reduced for initial testing
    maxDepth=10,
    subsamplingRate=0.7,
    featureSubsetStrategy='sqrt',
    seed=42
)

rf_model, rf_time = time_execution(
    "Random Forest training",
    lambda: rf.fit(train_df)
)

rf_predictions = rf_model.transform(test_df)


print("Random Forest Regression Results:")
print("RMSE:", reg_evaluator.evaluate(rf_predictions, {reg_evaluator.metricName: "rmse"}))
print("R2:", reg_evaluator.evaluate(rf_predictions, {reg_evaluator.metricName: "r2"}))

25/05/05 23:48:27 WARN DAGScheduler: Broadcasting large task binary with size 1035.5 KiB


Random Forest training executed in 184.05 seconds
Random Forest Regression Results:


                                                                                

RMSE: 4626.005034931074




R2: 0.49004260091445506


                                                                                

In [17]:
rf_predictions.show(20)

[Stage 158:>                                                        (0 + 1) / 1]

+--------------------+-----------------+------------------+
|            features|            label|        prediction|
+--------------------+-----------------+------------------+
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|1594.5572449494605|
|(1059,[0,1,5,10,1...|           159.11|

                                                                                

## Linear Regression Model

In [20]:
### Linear Regression
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.01,
    elasticNetParam=0.5
)

lr_model, lr_time = time_execution(
    "Linear Regression training",
    lambda: lr.fit(train_df)
)

lr_predictions = lr_model.transform(test_df)

print("\nLinear Regression Results:")
print("RMSE:", reg_evaluator.evaluate(lr_predictions, {reg_evaluator.metricName: "rmse"}))
print("R2:", reg_evaluator.evaluate(lr_predictions, {reg_evaluator.metricName: "r2"}))


                                                                                

Linear Regression training executed in 10.82 seconds

Linear Regression Results:


                                                                                

RMSE: 4383.5723855771585




R2: 0.5420921803054333


                                                                                

In [21]:
lr_predictions.show(20)

[Stage 182:>                                                        (0 + 1) / 1]

+--------------------+-----------------+------------------+
|            features|            label|        prediction|
+--------------------+-----------------+------------------+
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|20.119481641363564|
|(1059,[0,1,5,10,1...|           159.11|

                                                                                

## Gradient Boosted Tree Model

In [22]:
# Gradient Boosted Trees Regressor
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="label",
    maxIter=20,  # Reduced iterations
    maxDepth=10,
    stepSize=0.1,
    subsamplingRate=0.7,
    seed=42
)

gbt_model, gbt_time = time_execution(
    "GBT training",
    lambda: gbt.fit(train_df)
)

gbt_predictions = gbt_model.transform(test_df)

print("\nGBT Regression Results:")
print("RMSE:", reg_evaluator.evaluate(gbt_predictions, {reg_evaluator.metricName: "rmse"}))
print("R2:", reg_evaluator.evaluate(gbt_predictions, {reg_evaluator.metricName: "r2"}))

                                                                                

GBT training executed in 1316.55 seconds

GBT Regression Results:


                                                                                

RMSE: 2584.553362696322




R2: 0.8408184600023728


                                                                                

In [23]:
gbt_predictions.show(20)

[Stage 794:>                                                        (0 + 1) / 1]

+--------------------+-----------------+-----------------+
|            features|            label|       prediction|
+--------------------+-----------------+-----------------+
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.7950451420482|
|(1059,[0,1,5,10,1...|           159.11|387.795045142048

                                                                                

## Ensemble of ML Models 

In [25]:
# Start total execution timer
total_start = time.time()

# Split the training data into 3 parts for the 3 models
split_start = time.time()
rf_df, lr_df, gbt_df = train_df.randomSplit([1.0, 1.0, 1.0], seed=42)
split_time = time.time() - split_start
print(f"Data splitting completed in {split_time:.2f} seconds")

# Define model training functions
def train_rf():
    start = time.time()
    rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=30, maxDepth=10, seed=42)
    model = rf.fit(rf_df)
    print(f"Random Forest trained in {time.time() - start:.2f} seconds")
    return model

def train_lr():
    start = time.time()
    lr = LinearRegression(featuresCol="features", labelCol="label", regParam=0.01, elasticNetParam=0.5)
    model = lr.fit(lr_df)
    print(f"Linear Regression trained in {time.time() - start:.2f} seconds")
    return model

def train_gbt():
    start = time.time()
    gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=10, maxDepth=5, stepSize=0.1, seed=42)
    model = gbt.fit(gbt_df)
    print(f"GBT trained in {time.time() - start:.2f} seconds")
    return model

# Train models in parallel with timing
train_start = time.time()
with ThreadPoolExecutor() as executor:
    rf_future = executor.submit(train_rf)
    lr_future = executor.submit(train_lr)
    gbt_future = executor.submit(train_gbt)

    rf_model = rf_future.result()
    lr_model = lr_future.result()
    gbt_model = gbt_future.result()
train_time = time.time() - train_start
print(f"Total model training time (parallel): {train_time:.2f} seconds")

# Make predictions and add row indices
pred_start = time.time()
rf_pred = rf_model.transform(test_df).select("prediction", "label") \
    .withColumnRenamed("prediction", "rf_pred") \
    .withColumn("row_idx", monotonically_increasing_id())

lr_pred = lr_model.transform(test_df).select("prediction") \
    .withColumnRenamed("prediction", "lr_pred") \
    .withColumn("row_idx", monotonically_increasing_id())

gbt_pred = gbt_model.transform(test_df).select("prediction") \
    .withColumnRenamed("prediction", "gbt_pred") \
    .withColumn("row_idx", monotonically_increasing_id())
pred_time = time.time() - pred_start
print(f"Prediction generation completed in {pred_time:.2f} seconds")

# Join predictions on row index
join_start = time.time()
ensemble_df = rf_pred \
    .join(lr_pred, on="row_idx") \
    .join(gbt_pred, on="row_idx") \
    .withColumn("ensemble_prediction", expr("(rf_pred + lr_pred + gbt_pred)/3"))
join_time = time.time() - join_start
print(f"Prediction joining completed in {join_time:.2f} seconds")

# Evaluate ensemble
eval_start = time.time()
reg_evaluator = RegressionEvaluator(labelCol="label", predictionCol="ensemble_prediction")

print("\nEnsemble Results:")
print("RMSE:", reg_evaluator.evaluate(ensemble_df, {reg_evaluator.metricName: "rmse"}))
print("R2:", reg_evaluator.evaluate(ensemble_df, {reg_evaluator.metricName: "r2"}))
eval_time = time.time() - eval_start
print(f"Evaluation completed in {eval_time:.2f} seconds")

# Total execution time
total_time = time.time() - total_start
print(f"\nTotal execution time: {total_time:.2f} seconds")

Data splitting completed in 0.03 seconds


                                                                                

Linear Regression trained in 60.96 seconds


25/05/06 00:27:17 WARN DAGScheduler: Broadcasting large task binary with size 1082.3 KiB
25/05/06 00:27:38 WARN DAGScheduler: Broadcasting large task binary with size 1614.4 KiB
25/05/06 00:28:02 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/05/06 00:28:26 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
25/05/06 00:28:53 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
25/05/06 00:29:20 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
                                                                                

Random Forest trained in 284.06 seconds


                                                                                

GBT trained in 324.22 seconds
Total model training time (parallel): 324.22 seconds
Prediction generation completed in 0.35 seconds
Prediction joining completed in 0.32 seconds

Ensemble Results:




RMSE: 3042.811851453095




R2: 0.7793662769717404
Evaluation completed in 16.22 seconds

Total execution time: 341.14 seconds


                                                                                

In [26]:
ensemble_df.show(20)

[Stage 1221:>                                                       (0 + 1) / 1]

+-------+----------------+-----------------+-----------------+-----------------+-------------------+
|row_idx|         rf_pred|            label|          lr_pred|         gbt_pred|ensemble_prediction|
+-------+----------------+-----------------+-----------------+-----------------+-------------------+
|      0|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      1|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      2|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      3|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      4|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      5|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.4269165804801|
|      6|653.410714612847|           159.11|93.15215373262299|646.7178813959704|  464.42691

                                                                                