Length of stay prediction

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Build (or reuse) the Spark session for the length-of-stay analysis, asking
# for 4 GB of memory for both the executors and the driver.
spark = (
    SparkSession.builder
    .appName("LengthOfStayPrediction")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Read the engineered CSV: the first line holds the column names and the
# column types are inferred from the data.
data_path = "transformed_engineered_healthcare_dataset.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

# Preview five rows as a sanity check on the load.
df.show(5)

+---+---------------+------+---+------+-------+-------+----------------+--------------+------------------+---------------+----------+---------+---------------+------------------+----------------+-----------+----------------+---------+--------------------+-------------+--------------+-------------------+
|_c0|           race|gender|age|diag_1| diag_2| diag_3|time_in_hospital|num_procedures|num_lab_procedures|num_medications|readmitted|     cost|total_diagnoses|      cost_per_day|total_procedures|  age_group|readmitted_index|high_cost|diagnosis_complexity| race_encoded|gender_encoded|procedure_intensity|
+---+---------------+------+---+------+-------+-------+----------------+--------------+------------------+---------------+----------+---------+---------------+------------------+----------------+-----------+----------------+---------+--------------------+-------------+--------------+-------------------+
|  0|      Caucasian|Female|  5|250.83|Unknown|Unknown|               1|             

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Numeric columns that go into the feature vector unchanged.
# NOTE(review): readmitted_index looks like a post-discharge outcome; confirm it
# is legitimately known when length of stay is being predicted (possible leakage).
numeric_cols = ["age", "total_diagnoses", "total_procedures", "procedure_intensity",
                "num_medications", "readmitted_index"]

# String-typed categorical columns; they are converted to numeric indices and
# then included in the feature vector as "<name>_index".
categorical_cols = ["race_encoded", "gender_encoded"]
indexed_cols = [column + "_index" for column in categorical_cols]

# Final list of model inputs: raw numeric columns followed by the indexed ones.
feature_cols = numeric_cols + indexed_cols

# Make the cell safe to re-run: StringIndexer / VectorAssembler raise if their
# output columns already exist, so discard anything a previous run produced.
# drop() silently ignores column names that are not present.
df = df.drop(*indexed_cols, "features")

# StringIndexer for categorical columns.
# NOTE(review): the resulting indices are treated as ordinal numbers by the
# linear model; consider one-hot encoding race if that ordering is not intended.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index")
            for column in categorical_cols]

# Fit the indexers on the data and append the "<name>_index" columns.
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Assemble all model inputs into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Keep only what the regressor needs: the feature vector and the target
# (time_in_hospital), exposed under the conventional name "label".
model_df = df.select("features", col("time_in_hospital").alias("label"))

# Split the data into training and test sets (70% training, 30% test).
train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=42)

# Show the prepared data
model_df.show(5, truncate=False)

+------------------------------------+-----+
|features                            |label|
+------------------------------------+-----+
|[5.0,1.0,41.0,1.0,1.0,0.0,0.0,0.0]  |1    |
|[15.0,3.0,59.0,1.0,18.0,1.0,0.0,0.0]|3    |
|[25.0,3.0,16.0,1.0,13.0,0.0,1.0,0.0]|2    |
|[35.0,3.0,45.0,1.0,16.0,0.0,0.0,1.0]|2    |
|[45.0,3.0,51.0,1.0,8.0,0.0,0.0,1.0] |1    |
+------------------------------------+-----+
only showing top 5 rows



In [None]:
# Fit a linear regression on the training split and score the held-out split.
lr = LinearRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(train_df)

# transform() appends a "prediction" column to the test rows.
predictions = lr_model.transform(test_df)

# Root mean squared error between the predicted and the actual label.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="label",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

# Spot-check a handful of predictions next to their true values.
predictions.select("prediction", "label").show(5)

Root Mean Squared Error (RMSE) on test data: 2.4639282226148334
+------------------+-----+
|        prediction|label|
+------------------+-----+
|0.6791828830544108|    2|
| 1.608784272824062|    2|
|1.1026659285460632|    1|
|1.1026659285460632|    6|
| 2.337497575656908|    2|
+------------------+-----+
only showing top 5 rows



Performance Comparison of Spark Operations (RDD vs DataFrame vs SQL)

In [None]:
# Benchmark 1/3 - DataFrame API: average time_in_hospital per age_group.
start_time_df = time.time()

avg_los_df = (
    df.groupBy("age_group")
    .agg(avg(col("time_in_hospital")).alias("avg_length_of_stay"))
)
# show() is the action that actually triggers the computation being timed.
avg_los_df.show()

end_time_df = time.time()
df_time = end_time_df - start_time_df
print(f"DataFrame execution time: {df_time} seconds")

+-----------+------------------+
|  age_group|avg_length_of_stay|
+-----------+------------------+
|     Senior| 4.373768956558516|
|Young Adult|3.4644171779141106|
|      Adult|3.8628294782140937|
|      Child|3.0354609929078014|
+-----------+------------------+

DataFrame execution time: 1.6875801086425781 seconds


In [None]:
# Benchmark 2/3 - RDD API: same aggregation written as map / reduceByKey.
rdd = df.rdd


def _los_pair(row):
    """Turn a row into (age_group, (time_in_hospital, 1))."""
    return (row["age_group"], (row["time_in_hospital"], 1))


def _add_pairs(left, right):
    """Element-wise sum of two (total, count) pairs."""
    return (left[0] + right[0], left[1] + right[1])


def _pair_mean(total_and_count):
    """Divide the accumulated total by the accumulated count."""
    return total_and_count[0] / total_and_count[1]


start_time_rdd = time.time()

# collect() brings the per-group averages back to the driver as a list.
avg_los_rdd = (
    rdd.map(_los_pair)
    .reduceByKey(_add_pairs)
    .mapValues(_pair_mean)
    .collect()
)

# The printing below is part of the timed region, as is show() in the
# DataFrame and SQL variants.
print("RDD Results (age_group, avg_length_of_stay):")
for age_group, avg_los in avg_los_rdd:
    print(f"{age_group}: {avg_los}")

end_time_rdd = time.time()
rdd_time = end_time_rdd - start_time_rdd
print(f"RDD execution time: {rdd_time} seconds")

RDD Results (age_group, avg_length_of_stay):
Child: 3.0354609929078014
Young Adult: 3.4644171779141106
Adult: 3.8628294782140937
Senior: 4.373768956558516
RDD execution time: 12.335059642791748 seconds


In [None]:
# Benchmark 3/3 - Spark SQL: expose the DataFrame as the view "healthcare"
# and express the same aggregation as a query.
df.createOrReplaceTempView("healthcare")

avg_los_query = """
    SELECT age_group, AVG(time_in_hospital) as avg_length_of_stay
    FROM healthcare
    GROUP BY age_group
"""

start_time_sql = time.time()

avg_los_sql = spark.sql(avg_los_query)
# show() is the action that triggers execution of the query.
avg_los_sql.show()

end_time_sql = time.time()
sql_time = end_time_sql - start_time_sql
print(f"Spark SQL execution time: {sql_time} seconds")

+-----------+------------------+
|  age_group|avg_length_of_stay|
+-----------+------------------+
|     Senior| 4.373768956558516|
|Young Adult|3.4644171779141106|
|      Adult|3.8628294782140937|
|      Child|3.0354609929078014|
+-----------+------------------+

Spark SQL execution time: 2.5841288566589355 seconds


In [None]:
# Print the three wall-clock timings measured by the benchmark cells, one per line.
print("\nPerformance Comparison:")
for approach, elapsed in (
    ("DataFrame", df_time),
    ("RDD", rdd_time),
    ("Spark SQL", sql_time),
):
    print(f"{approach}: {elapsed} seconds")


Performance Comparison:
DataFrame: 1.6875801086425781 seconds
RDD: 12.335059642791748 seconds
Spark SQL: 2.5841288566589355 seconds


In [None]:
# Save the predictions as a CSV file. toPandas() collects the selected
# columns to the driver before pandas writes them out.
predictions_pandas = predictions.select("prediction", "label").toPandas()
predictions_pandas.to_csv("length_of_stay_predictions.csv", index=False)

# Save the performance comparison
with open("performance_comparison_los.txt", "w") as f:
    f.write(f"DataFrame: {df_time} seconds\n")
    f.write(f"RDD: {rdd_time} seconds\n")
    f.write(f"Spark SQL: {sql_time} seconds\n")

# Download the files. google.colab is only importable inside a Colab runtime;
# elsewhere the files above are still written, so skip the download instead of
# failing the cell.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    files.download("length_of_stay_predictions.csv")
    files.download("performance_comparison_los.txt")
else:
    print("google.colab is not available; files were saved to the working directory only.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Resource Utilization


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
import time

# Initialize Spark session.
# NOTE(review): getOrCreate() returns the already-running session when one
# exists; in that case the app name and memory settings below presumably have
# no effect - confirm if this section is run after the previous one.
spark = SparkSession.builder \
    .appName("CPUResourceUtilizationAnalysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load the transformed and engineered dataset
data_path = "/content/transformed_engineered_healthcare_dataset.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Cache the DataFrame to ensure fair comparison. cache() is lazy: nothing is
# stored until an action runs, so without the count() below the first timed
# benchmark would also pay for reading the CSV and filling the cache.
df.cache()
cached_row_count = df.count()
print(f"Cached {cached_row_count} rows")

# Display the DataFrame schema as the cell output.
df

DataFrame[_c0: int, race: string, gender: string, age: int, diag_1: string, diag_2: string, diag_3: string, time_in_hospital: int, num_procedures: int, num_lab_procedures: int, num_medications: int, readmitted: string, cost: double, total_diagnoses: int, cost_per_day: double, total_procedures: int, age_group: string, readmitted_index: double, high_cost: int, diagnosis_complexity: int, race_encoded: string, gender_encoded: string, procedure_intensity: int]

In [None]:
# DataFrame approach
# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

start_time_df = time.time()

avg_los_df = df.groupBy("age_group").agg(avg("time_in_hospital").alias("avg_length_of_stay"))
avg_los_df.show()

end_time_df = time.time()
df_time = end_time_df - start_time_df
print(f"DataFrame execution time: {df_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
df_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                    if job_id not in jobs_before)
print(f"DataFrame Job IDs: {df_job_ids}")
for job_id in df_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"DataFrame Job {job_id} Stages: {stages}")

+-----------+------------------+
|  age_group|avg_length_of_stay|
+-----------+------------------+
|     Senior| 4.373768956558516|
|Young Adult|3.4644171779141106|
|      Adult|3.8628294782140937|
|      Child|3.0354609929078014|
+-----------+------------------+

DataFrame execution time: 0.3625023365020752 seconds
DataFrame Job IDs: [42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
DataFrame Job 42 Stages: [I@748c40d9
DataFrame Job 41 Stages: [I@59d3e429
DataFrame Job 40 Stages: [I@6582ff57
DataFrame Job 39 Stages: [I@14f7eb1b
DataFrame Job 38 Stages: [I@36d58297
DataFrame Job 37 Stages: [I@1dd11865
DataFrame Job 36 Stages: [I@40b9f540
DataFrame Job 35 Stages: [I@79f74f9e
DataFrame Job 34 Stages: [I@63acbb0d
DataFrame Job 33 Stages: [I@76ab852d
DataFrame Job 32 Stages: [I@79377d4d
DataFrame Job 31 Stages: [I@3a7f464a
DataFrame Job 30 Stages: [I@3c8df6db
DataFrame Job 29 St

In [None]:
# Convert DataFrame to RDD
rdd = df.rdd

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# RDD approach
start_time_rdd = time.time()

# Map to (age_group, (time_in_hospital, 1)), sum both components per key, then
# divide the total by the count to obtain the per-group average.
avg_los_rdd = rdd.map(lambda row: (row["age_group"], (row["time_in_hospital"], 1))) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / x[1]) \
    .collect()

print("RDD Results (age_group, avg_length_of_stay):")
for age_group, avg_los in avg_los_rdd:
    print(f"{age_group}: {avg_los}")

end_time_rdd = time.time()
rdd_time = end_time_rdd - start_time_rdd
print(f"RDD execution time: {rdd_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
rdd_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"RDD Job IDs: {rdd_job_ids}")
for job_id in rdd_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"RDD Job {job_id} Stages: {stages}")

RDD Results (age_group, avg_length_of_stay):
Child: 3.0354609929078014
Young Adult: 3.4644171779141106
Adult: 3.8628294782140937
Senior: 4.373768956558516
RDD execution time: 3.868530750274658 seconds
RDD Job IDs: [43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
RDD Job 43 Stages: [I@4f8bf835
RDD Job 42 Stages: [I@4bab249f
RDD Job 41 Stages: [I@66fba775
RDD Job 40 Stages: [I@114afd9a
RDD Job 39 Stages: [I@190e5d6
RDD Job 38 Stages: [I@130da6a7
RDD Job 37 Stages: [I@28f67bbc
RDD Job 36 Stages: [I@6e16e515
RDD Job 35 Stages: [I@68ae1354
RDD Job 34 Stages: [I@1668a878
RDD Job 33 Stages: [I@d581a92
RDD Job 32 Stages: [I@543f6861
RDD Job 31 Stages: [I@4c93e501
RDD Job 30 Stages: [I@1febbf83
RDD Job 29 Stages: [I@52aaaa66
RDD Job 28 Stages: [I@7d79bc32
RDD Job 27 Stages: [I@2e91f571
RDD Job 26 Stages: [I@6c445087
RDD Job 25 Stages: [I@6bba3ef3
RDD Job 24 Stages: [I@5b12879a
R

In [None]:
# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("healthcare")

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# Spark SQL approach
start_time_sql = time.time()

avg_los_sql = spark.sql("""
    SELECT age_group, AVG(time_in_hospital) as avg_length_of_stay
    FROM healthcare
    GROUP BY age_group
""")
avg_los_sql.show()

end_time_sql = time.time()
sql_time = end_time_sql - start_time_sql
print(f"Spark SQL execution time: {sql_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
sql_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"Spark SQL Job IDs: {sql_job_ids}")
for job_id in sql_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"Spark SQL Job {job_id} Stages: {stages}")

+-----------+------------------+
|  age_group|avg_length_of_stay|
+-----------+------------------+
|     Senior| 4.373768956558516|
|Young Adult|3.4644171779141106|
|      Adult|3.8628294782140937|
|      Child|3.0354609929078014|
+-----------+------------------+

Spark SQL execution time: 1.0876822471618652 seconds
Spark SQL Job IDs: [45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Spark SQL Job 45 Stages: [I@1184f18f
Spark SQL Job 44 Stages: [I@3e504628
Spark SQL Job 43 Stages: [I@6d215c2c
Spark SQL Job 42 Stages: [I@578c777c
Spark SQL Job 41 Stages: [I@61b2fc8b
Spark SQL Job 40 Stages: [I@5d6f9793
Spark SQL Job 39 Stages: [I@3ac8702e
Spark SQL Job 38 Stages: [I@ecbc79c
Spark SQL Job 37 Stages: [I@786b246a
Spark SQL Job 36 Stages: [I@5919f7fb
Spark SQL Job 35 Stages: [I@20a9a5f6
Spark SQL Job 34 Stages: [I@708e7158
Spark SQL Job 33 Stages: [I@51e75357
Spark SQ

In [None]:
# Summarise, per approach, the measured wall-clock time and the length of the
# job-ID list gathered in the corresponding benchmark cell.
print("\nCPU Resource Utilization Comparison (Inferred):")
for approach, elapsed, job_ids in (
    ("DataFrame", df_time, df_job_ids),
    ("RDD", rdd_time, rdd_job_ids),
    ("Spark SQL", sql_time, sql_job_ids),
):
    print(f"{approach}:")
    print(f"  Execution time: {elapsed} seconds")
    print(f"  Number of Jobs: {len(job_ids)}")


CPU Resource Utilization Comparison (Inferred):
DataFrame:
  Execution time: 0.3625023365020752 seconds
  Number of Jobs: 43
RDD:
  Execution time: 3.868530750274658 seconds
  Number of Jobs: 44
Spark SQL:
  Execution time: 1.0876822471618652 seconds
  Number of Jobs: 46


In [None]:
# Print the three wall-clock timings currently held in df_time, rdd_time and
# sql_time, one per line.
print("\nPerformance Comparison:")
for approach, elapsed in (
    ("DataFrame", df_time),
    ("RDD", rdd_time),
    ("Spark SQL", sql_time),
):
    print(f"{approach}: {elapsed} seconds")


Performance Comparison:
DataFrame: 1.3755507469177246 seconds
RDD: 5.448244094848633 seconds
Spark SQL: 1.1103417873382568 seconds


In [None]:
# Request a session with Spark event logging switched on and written to a
# local directory.
# NOTE(review): getOrCreate() hands back the already-running session if there
# is one, and the spark.eventLog.* settings are presumably only read when the
# SparkContext starts - confirm that event logging is really active after this.
spark = (
    SparkSession.builder
    .appName("CPUResourceUtilizationAnalysis")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:/tmp/spark-events")
    .getOrCreate()
)

In [None]:
import psutil
import time

# Prime psutil's CPU counters. With interval=None the call returns immediately
# and reports utilisation since the previous call; the value returned by this
# first call is meaningless and deliberately ignored.
psutil.cpu_percent(interval=None)

start_time = time.time()
# Run your Spark operation
avg_los_df = df.groupBy("age_group").agg(avg("time_in_hospital").alias("avg_length_of_stay"))
avg_los_df.show()
end_time = time.time()

# Measure CPU usage over the window since the priming call, i.e. while the
# Spark operation was running. Passing the elapsed time as `interval` here
# would instead block and sample the idle period *after* the operation.
cpu_usage = psutil.cpu_percent(interval=None)
print(f"CPU Usage: {cpu_usage}%")

+-----------+------------------+
|  age_group|avg_length_of_stay|
+-----------+------------------+
|     Senior| 4.373768956558516|
|Young Adult|3.4644171779141106|
|      Adult|3.8628294782140937|
|      Child|3.0354609929078014|
+-----------+------------------+

CPU Usage: 34.6%


In [None]:
# Save the CPU resource utilization comparison: one section per approach with
# its wall-clock time and the number of job IDs collected for it.
with open("cpu_resource_utilization_comparison_revised.txt", "w") as f:
    f.write("CPU Resource Utilization Comparison (Inferred):\n")
    f.write("DataFrame:\n")
    f.write(f"  Execution time: {df_time} seconds\n")
    f.write(f"  Number of Jobs: {len(df_job_ids)}\n")
    f.write("RDD:\n")
    f.write(f"  Execution time: {rdd_time} seconds\n")
    f.write(f"  Number of Jobs: {len(rdd_job_ids)}\n")
    f.write("Spark SQL:\n")
    f.write(f"  Execution time: {sql_time} seconds\n")
    f.write(f"  Number of Jobs: {len(sql_job_ids)}\n")

# Download the file. google.colab is only importable inside a Colab runtime;
# elsewhere the file above is still written, so skip the download instead of
# failing the cell.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    files.download("cpu_resource_utilization_comparison_revised.txt")
else:
    print("google.colab is not available; file was saved to the working directory only.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Treatment effectiveness analysis

In [None]:
# Mean of readmitted_index for every (age_group, diagnosis_complexity) pair,
# reported as "readmission_rate".
readmission_rate = avg("readmitted_index").alias("readmission_rate")
readmission_rates_df = (
    df.groupBy("age_group", "diagnosis_complexity")
    .agg(readmission_rate)
)

# Display the grouped rates.
readmission_rates_df.show()

+-----------+--------------------+-------------------+
|  age_group|diagnosis_complexity|   readmission_rate|
+-----------+--------------------+-------------------+
|Young Adult|                   0|              0.375|
|      Adult|                   0|0.21311475409836064|
|      Adult|                   1| 0.4390391806758077|
|     Senior|                   0|0.21568627450980393|
|Young Adult|                   1| 0.4518147684605757|
|     Senior|                   1|  0.473175819690717|
|      Child|                   1|0.37412095639943743|
|      Child|                   0| 0.1925925925925926|
+-----------+--------------------+-------------------+



In [None]:
# DataFrame approach
# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

start_time_df = time.time()

# Mean readmitted_index per (age_group, diagnosis_complexity) pair.
readmission_rates_df = df.groupBy("age_group", "diagnosis_complexity") \
    .agg(avg("readmitted_index").alias("readmission_rate"))
readmission_rates_df.show()

end_time_df = time.time()
df_time = end_time_df - start_time_df
print(f"DataFrame execution time: {df_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
df_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                    if job_id not in jobs_before)
print(f"DataFrame Job IDs: {df_job_ids}")
for job_id in df_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"DataFrame Job {job_id} Stages: {stages}")

+-----------+--------------------+-------------------+
|  age_group|diagnosis_complexity|   readmission_rate|
+-----------+--------------------+-------------------+
|Young Adult|                   0|              0.375|
|      Adult|                   0|0.21311475409836064|
|      Adult|                   1| 0.4390391806758077|
|     Senior|                   0|0.21568627450980393|
|Young Adult|                   1| 0.4518147684605757|
|     Senior|                   1|  0.473175819690717|
|      Child|                   1|0.37412095639943743|
|      Child|                   0| 0.1925925925925926|
+-----------+--------------------+-------------------+

DataFrame execution time: 0.5062446594238281 seconds
DataFrame Job IDs: [51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
DataFrame Job 51 Stages: [I@2112d42
DataFrame Job 50 Stages: [I@33c2

In [None]:
# Convert DataFrame to RDD
rdd = df.rdd

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# RDD approach
start_time_rdd = time.time()

# Map to (key, (readmitted_index, count)) where key is (age_group, diagnosis_complexity)
rdd_mapped = rdd.map(lambda row: (
    (row["age_group"], row["diagnosis_complexity"]),
    (row["readmitted_index"], 1)
))

# Reduce by key to sum the readmitted_index and counts
rdd_reduced = rdd_mapped.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Compute the readmission rate (sum of readmitted_index / count)
rdd_rates = rdd_reduced.mapValues(lambda x: x[0] / x[1])

# Collect and print the results; collect() is the action that runs the job.
print("RDD Results (age_group, diagnosis_complexity, readmission_rate):")
for (age_group, diag_complexity), rate in rdd_rates.collect():
    print(f"({age_group}, {diag_complexity}): {rate}")

end_time_rdd = time.time()
rdd_time = end_time_rdd - start_time_rdd
print(f"RDD execution time: {rdd_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
rdd_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"RDD Job IDs: {rdd_job_ids}")
for job_id in rdd_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"RDD Job {job_id} Stages: {stages}")

RDD Results (age_group, diagnosis_complexity, readmission_rate):
(Child, 1): 0.37412095639943743
(Young Adult, 1): 0.4518147684605757
(Adult, 1): 0.4390391806758077
(Senior, 1): 0.473175819690717
(Child, 0): 0.1925925925925926
(Adult, 0): 0.21311475409836064
(Young Adult, 0): 0.375
(Senior, 0): 0.21568627450980393
RDD execution time: 4.2259907722473145 seconds
RDD Job IDs: [52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
RDD Job 52 Stages: [I@6e8c1c3c
RDD Job 51 Stages: [I@4f2b0f7e
RDD Job 50 Stages: [I@64b4037a
RDD Job 49 Stages: [I@3e5603ae
RDD Job 48 Stages: [I@30b585a4
RDD Job 47 Stages: [I@27722c33
RDD Job 46 Stages: [I@63994474
RDD Job 45 Stages: [I@6ddc60da
RDD Job 44 Stages: [I@71d84060
RDD Job 43 Stages: [I@47fde9dc
RDD Job 42 Stages: [I@6e6ebfb3
RDD Job 41 Stages: [I@60bad814
RDD Job 40 Stages: [I@53d24f23
RDD Job 39 Stages:

In [None]:
# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("healthcare")

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# Spark SQL approach
start_time_sql = time.time()

readmission_rates_sql = spark.sql("""
    SELECT
        age_group,
        diagnosis_complexity,
        AVG(readmitted_index) as readmission_rate
    FROM healthcare
    GROUP BY age_group, diagnosis_complexity
""")
readmission_rates_sql.show()

end_time_sql = time.time()
sql_time = end_time_sql - start_time_sql
print(f"Spark SQL execution time: {sql_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
sql_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"Spark SQL Job IDs: {sql_job_ids}")
for job_id in sql_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"Spark SQL Job {job_id} Stages: {stages}")

+-----------+--------------------+-------------------+
|  age_group|diagnosis_complexity|   readmission_rate|
+-----------+--------------------+-------------------+
|Young Adult|                   0|              0.375|
|      Adult|                   0|0.21311475409836064|
|      Adult|                   1| 0.4390391806758077|
|     Senior|                   0|0.21568627450980393|
|Young Adult|                   1| 0.4518147684605757|
|     Senior|                   1|  0.473175819690717|
|      Child|                   1|0.37412095639943743|
|      Child|                   0| 0.1925925925925926|
+-----------+--------------------+-------------------+

Spark SQL execution time: 0.5531532764434814 seconds
Spark SQL Job IDs: [54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Spark SQL Job 54 Stages: [I@24db3288
Spark SQL Job 53 St

In [None]:
# Summarise, per approach, the measured wall-clock time and the length of the
# job-ID list gathered in the corresponding benchmark cell.
print("\nPerformance and CPU Resource Utilization Comparison (Inferred):")
for approach, elapsed, job_ids in (
    ("DataFrame", df_time, df_job_ids),
    ("RDD", rdd_time, rdd_job_ids),
    ("Spark SQL", sql_time, sql_job_ids),
):
    print(f"{approach}:")
    print(f"  Execution time: {elapsed} seconds")
    print(f"  Number of Jobs: {len(job_ids)}")


Performance and CPU Resource Utilization Comparison (Inferred):
DataFrame:
  Execution time: 0.5062446594238281 seconds
  Number of Jobs: 52
RDD:
  Execution time: 4.2259907722473145 seconds
  Number of Jobs: 53
Spark SQL:
  Execution time: 0.5531532764434814 seconds
  Number of Jobs: 55


In [None]:
# Save the treatment effectiveness results as a CSV file. toPandas() collects
# the grouped rates to the driver before pandas writes them out.
readmission_rates_pandas = readmission_rates_df.toPandas()
readmission_rates_pandas.to_csv("treatment_effectiveness_results.csv", index=False)

# Save the performance and CPU utilization comparison
with open("performance_comparison_treatment_effectiveness.txt", "w") as f:
    f.write("Performance and CPU Resource Utilization Comparison (Inferred):\n")
    f.write("DataFrame:\n")
    f.write(f"  Execution time: {df_time} seconds\n")
    f.write(f"  Number of Jobs: {len(df_job_ids)}\n")
    f.write("RDD:\n")
    f.write(f"  Execution time: {rdd_time} seconds\n")
    f.write(f"  Number of Jobs: {len(rdd_job_ids)}\n")
    f.write("Spark SQL:\n")
    f.write(f"  Execution time: {sql_time} seconds\n")
    f.write(f"  Number of Jobs: {len(sql_job_ids)}\n")

# Download the files. google.colab is only importable inside a Colab runtime;
# elsewhere the files above are still written, so skip the download instead of
# failing the cell.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    files.download("treatment_effectiveness_results.csv")
    files.download("performance_comparison_treatment_effectiveness.txt")
else:
    print("google.colab is not available; files were saved to the working directory only.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Cost analysis

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
import time

# Initialize Spark session.
# NOTE(review): getOrCreate() returns the already-running session when one
# exists; in that case the app name and memory settings below presumably have
# no effect - confirm if this section is run after the previous ones.
spark = SparkSession.builder \
    .appName("CostAnalysisPerDiagnosis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load the transformed and engineered dataset
data_path = "/content/transformed_engineered_healthcare_dataset.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Cache the DataFrame to ensure fair comparison. cache() is lazy and show(5)
# only needs the first rows, so run a full count() to load the whole dataset
# into the cache before any benchmark is timed.
df.cache()
cached_row_count = df.count()
print(f"Cached {cached_row_count} rows")

# Show the first few rows to confirm
df.show(5)

+---+---------------+------+---+------+-------+-------+----------------+--------------+------------------+---------------+----------+---------+---------------+------------------+----------------+-----------+----------------+---------+--------------------+-------------+--------------+-------------------+
|_c0|           race|gender|age|diag_1| diag_2| diag_3|time_in_hospital|num_procedures|num_lab_procedures|num_medications|readmitted|     cost|total_diagnoses|      cost_per_day|total_procedures|  age_group|readmitted_index|high_cost|diagnosis_complexity| race_encoded|gender_encoded|procedure_intensity|
+---+---------------+------+---+------+-------+-------+----------------+--------------+------------------+---------------+----------+---------+---------------+------------------+----------------+-----------+----------------+---------+--------------------+-------------+--------------+-------------------+
|  0|      Caucasian|Female|  5|250.83|Unknown|Unknown|               1|             

In [None]:
# Keep only rows whose primary diagnosis code is not the "Unknown" placeholder.
has_known_diag = df["diag_1"] != "Unknown"
df_filtered = df.where(has_known_diag)

# Per primary diagnosis: mean total cost and mean cost per day.
cost_analysis_df = (
    df_filtered
    .groupBy("diag_1")
    .agg(
        avg("cost").alias("avg_cost"),
        avg("cost_per_day").alias("avg_cost_per_day"),
    )
)

# Display the per-diagnosis averages.
cost_analysis_df.show()

+------+------------------+------------------+
|diag_1|          avg_cost|  avg_cost_per_day|
+------+------------------+------------------+
|   296| 8254.299532094592|1734.4732701104278|
|   691|           9627.95| 2406.987548828125|
|   451| 7186.479292307694|2003.6451126266313|
|   853| 8121.248588235294|1988.0018774892771|
|   800| 7500.823850000001| 2994.236695568971|
|   944|         7445.5077|2390.7143798828124|
|   870|         12886.043|    2147.673828125|
|250.01| 6395.559413114754| 3323.097402237022|
|   447| 7381.865275409836| 4029.495384655848|
|   591|10003.633944444444|3302.7969339037695|
|     7|        10129.7685|1648.6318150111606|
|   574| 9507.487448762111| 2935.114012225516|
|   475| 6787.018499999999|3070.6743873232886|
|   718| 6414.497194117647| 3025.173179476869|
|   307|       9479.350428| 2204.928088902146|
|   577|  8830.82256215686|2484.5096843762667|
|   581| 8641.357894736842| 2985.772487268791|
|   205|     10326.9765625|2529.3746375483465|
|   747|11122

In [None]:
# DataFrame approach
# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

start_time_df = time.time()

# Drop rows with an "Unknown" primary diagnosis, then average cost and
# cost_per_day per diag_1.
df_filtered = df.filter(df.diag_1 != "Unknown")
cost_analysis_df = df_filtered.groupBy("diag_1") \
    .agg(
        avg("cost").alias("avg_cost"),
        avg("cost_per_day").alias("avg_cost_per_day")
    )
cost_analysis_df.show()

end_time_df = time.time()
df_time = end_time_df - start_time_df
print(f"DataFrame execution time: {df_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
df_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                    if job_id not in jobs_before)
print(f"DataFrame Job IDs: {df_job_ids}")
for job_id in df_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"DataFrame Job {job_id} Stages: {stages}")

+------+------------------+------------------+
|diag_1|          avg_cost|  avg_cost_per_day|
+------+------------------+------------------+
|   296| 8254.299532094592|1734.4732701104278|
|   691|           9627.95| 2406.987548828125|
|   451| 7186.479292307694|2003.6451126266313|
|   853| 8121.248588235294|1988.0018774892771|
|   800| 7500.823850000001| 2994.236695568971|
|   944|         7445.5077|2390.7143798828124|
|   870|         12886.043|    2147.673828125|
|250.01| 6395.559413114754| 3323.097402237022|
|   447| 7381.865275409836| 4029.495384655848|
|   591|10003.633944444444|3302.7969339037695|
|     7|        10129.7685|1648.6318150111606|
|   574| 9507.487448762111| 2935.114012225516|
|   475| 6787.018499999999|3070.6743873232886|
|   718| 6414.497194117647| 3025.173179476869|
|   307|       9479.350428| 2204.928088902146|
|   577|  8830.82256215686|2484.5096843762667|
|   581| 8641.357894736842| 2985.772487268791|
|   205|     10326.9765625|2529.3746375483465|
|   747|11122

In [None]:
# Convert DataFrame to RDD
rdd = df.rdd

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# RDD approach
start_time_rdd = time.time()

# Filter out "Unknown" diagnoses and map to (diag_1, (cost, cost_per_day, count))
rdd_filtered = rdd.filter(lambda row: row["diag_1"] != "Unknown")
rdd_mapped = rdd_filtered.map(lambda row: (
    row["diag_1"],
    (row["cost"], row["cost_per_day"], 1)
))

# Reduce by key to sum the costs and counts
rdd_reduced = rdd_mapped.reduceByKey(lambda x, y: (
    x[0] + y[0],  # Sum cost
    x[1] + y[1],  # Sum cost_per_day
    x[2] + y[2]   # Sum count
))

# Compute averages
rdd_averages = rdd_reduced.mapValues(lambda x: (
    x[0] / x[2],  # avg_cost
    x[1] / x[2]   # avg_cost_per_day
))

# Collect and print the results; collect() is the action that runs the job.
print("RDD Results (diag_1, avg_cost, avg_cost_per_day):")
for diag_1, (avg_cost, avg_cost_per_day) in rdd_averages.collect():
    print(f"{diag_1}: ({avg_cost}, {avg_cost_per_day})")

end_time_rdd = time.time()
rdd_time = end_time_rdd - start_time_rdd
print(f"RDD execution time: {rdd_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
rdd_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"RDD Job IDs: {rdd_job_ids}")
for job_id in rdd_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"RDD Job {job_id} Stages: {stages}")

RDD Results (diag_1, avg_cost, avg_cost_per_day):
250.83: (8085.4931129032275, 2764.7453813689913)
276: (7615.07194917421, 2729.8194084773463)
648: (7042.071391929826, 2824.2511173421017)
414: (10008.696631107081, 4969.686801584133)
434: (8407.559350527363, 2223.7483319691232)
250.7: (10212.064025031137, 1964.1593367622277)
157: (10136.946108823528, 2315.953737117396)
518: (9783.864436479844, 2405.2474238527698)
410: (10589.532829746011, 3323.9666264289604)
737: (11231.153375000002, 2288.877144368489)
189: (8697.772595041324, 1957.320505924663)
786: (6717.265535191206, 4019.1886682414947)
427: (7911.089234756989, 3068.9897975817917)
996: (9284.862485851085, 3075.7297324206775)
277: (8297.325863636363, 2030.1748065999952)
462: (7455.557581818182, 3923.99384765625)
411: (7594.234126530612, 4206.257357866328)
486: (8492.819261320217, 2123.214940703334)
511: (9142.58688156863, 2721.462600750631)
432: (9668.3477247191, 2026.010659955754)
626: (6805.593230379747, 3683.126355341014)
295: (811

In [None]:
# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("healthcare")

# Snapshot the job IDs that exist before the benchmark so that only the jobs
# started by this cell are reported. getJobIdsForGroup() with no group returns
# every ungrouped job known to the tracker, i.e. the whole history.
tracker = spark.sparkContext.statusTracker()
jobs_before = set(tracker.getJobIdsForGroup())

# Spark SQL approach
start_time_sql = time.time()

cost_analysis_sql = spark.sql("""
    SELECT
        diag_1,
        AVG(cost) as avg_cost,
        AVG(cost_per_day) as avg_cost_per_day
    FROM healthcare
    WHERE diag_1 != 'Unknown'
    GROUP BY diag_1
""")
cost_analysis_sql.show()

end_time_sql = time.time()
sql_time = end_time_sql - start_time_sql
print(f"Spark SQL execution time: {sql_time} seconds")

# Collect high-level metrics for the jobs triggered by this cell only.
sql_job_ids = sorted(job_id for job_id in tracker.getJobIdsForGroup()
                     if job_id not in jobs_before)
print(f"Spark SQL Job IDs: {sql_job_ids}")
for job_id in sql_job_ids:
    job_info = tracker.getJobInfo(job_id)
    # getJobInfo() returns None when the job info is no longer available;
    # stageIds is a Java int array, so convert it to get a readable printout.
    stages = list(job_info.stageIds) if job_info is not None else []
    print(f"Spark SQL Job {job_id} Stages: {stages}")

+------+------------------+------------------+
|diag_1|          avg_cost|  avg_cost_per_day|
+------+------------------+------------------+
|   296| 8254.299532094592|1734.4732701104278|
|   691|           9627.95| 2406.987548828125|
|   451| 7186.479292307694|2003.6451126266313|
|   853| 8121.248588235294|1988.0018774892771|
|   800| 7500.823850000001| 2994.236695568971|
|   944|         7445.5077|2390.7143798828124|
|   870|         12886.043|    2147.673828125|
|250.01| 6395.559413114754| 3323.097402237022|
|   447| 7381.865275409836| 4029.495384655848|
|   591|10003.633944444444|3302.7969339037695|
|     7|        10129.7685|1648.6318150111606|
|   574| 9507.487448762111| 2935.114012225516|
|   475| 6787.018499999999|3070.6743873232886|
|   718| 6414.497194117647| 3025.173179476869|
|   307|       9479.350428| 2204.928088902146|
|   577|  8830.82256215686|2484.5096843762667|
|   581| 8641.357894736842| 2985.772487268791|
|   205|     10326.9765625|2529.3746375483465|
|   747|11122

In [None]:
# Summarise, per approach, the measured wall-clock time and the length of the
# job-ID list gathered in the corresponding benchmark cell.
print("\nPerformance and CPU Resource Utilization Comparison (Inferred):")
for approach, elapsed, job_ids in (
    ("DataFrame", df_time, df_job_ids),
    ("RDD", rdd_time, rdd_job_ids),
    ("Spark SQL", sql_time, sql_job_ids),
):
    print(f"{approach}:")
    print(f"  Execution time: {elapsed} seconds")
    print(f"  Number of Jobs: {len(job_ids)}")


Performance and CPU Resource Utilization Comparison (Inferred):
DataFrame:
  Execution time: 0.6558525562286377 seconds
  Number of Jobs: 64
RDD:
  Execution time: 2.958327531814575 seconds
  Number of Jobs: 65
Spark SQL:
  Execution time: 0.9662423133850098 seconds
  Number of Jobs: 67


In [None]:
# Save the cost analysis results as a CSV file. toPandas() collects the
# per-diagnosis averages to the driver before pandas writes them out.
cost_analysis_pandas = cost_analysis_df.toPandas()
cost_analysis_pandas.to_csv("cost_analysis_per_diagnosis_results.csv", index=False)

# Save the performance and CPU utilization comparison
with open("performance_comparison_cost_analysis.txt", "w") as f:
    f.write("Performance and CPU Resource Utilization Comparison (Inferred):\n")
    f.write("DataFrame:\n")
    f.write(f"  Execution time: {df_time} seconds\n")
    f.write(f"  Number of Jobs: {len(df_job_ids)}\n")
    f.write("RDD:\n")
    f.write(f"  Execution time: {rdd_time} seconds\n")
    f.write(f"  Number of Jobs: {len(rdd_job_ids)}\n")
    f.write("Spark SQL:\n")
    f.write(f"  Execution time: {sql_time} seconds\n")
    f.write(f"  Number of Jobs: {len(sql_job_ids)}\n")

# Download the files. google.colab is only importable inside a Colab runtime;
# elsewhere the files above are still written, so skip the download instead of
# failing the cell.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    files.download("cost_analysis_per_diagnosis_results.csv")
    files.download("performance_comparison_cost_analysis.txt")
else:
    print("google.colab is not available; files were saved to the working directory only.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Visualizing