In [None]:
%load_ext autoreload
%autoreload 2
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.window import Window
import utils
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, count
from pyspark.sql.types import ArrayType, StringType, BooleanType, IntegerType, DoubleType

LABEL = 'LOS'

In [None]:
spark = SparkSession.builder \
    .appName("Intensive Care Unit Data Analysis") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.memory.fraction", "0.9") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# check spark configs to only errors:

spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.shuffle.partitions", "500")
spark.conf.set("spark.sql.debug.maxToStringFields", "1000")

In [None]:
# Read the datasets
df_admissions = spark.read.csv("datasets/ADMISSIONS.csv", header=True, inferSchema=True).drop("ROW_ID")
df_diagnoses = spark.read.csv("datasets/DIAGNOSES_ICD.csv", header=True, inferSchema=True).drop("ROW_ID")
df_icustays = spark.read.csv("datasets/ICUSTAYS.csv", header=True, inferSchema=True).drop("ROW_ID")
df_patients = spark.read.csv("datasets/PATIENTS.csv", header=True, inferSchema=True).drop("ROW_ID")
# df_chartevents = spark.read.csv("datasets/CHARTEVENTS.csv", header=True, inferSchema=True).drop("ROW_ID")

In [None]:
df = df_patients.join(df_admissions, df_patients["SUBJECT_ID"] == df_admissions["SUBJECT_ID"], how="left").drop(df_admissions["SUBJECT_ID"])
df = df.join(df_icustays, df["HADM_ID"] == df_icustays["HADM_ID"], how="left").drop(df_icustays["SUBJECT_ID"]).drop(df_icustays["HADM_ID"])
df = df.join(df_diagnoses, df["SUBJECT_ID"] == df_diagnoses["SUBJECT_ID"], how="left").drop(df_diagnoses["SUBJECT_ID"]).drop(df_diagnoses["HADM_ID"])

In [None]:
df.show()

Check for missing values in the dataset.

In [None]:
utils.print_missing_value_counts(df)

# Feature Engineering

Let's create a column called 'AGE'

In [None]:
# Ensure 'DOB' and 'ADMITTIME' are in the correct date format if not already
df = df.withColumn('DOB', F.to_date('DOB'))
df = df.withColumn('ADMITTIME', F.to_date('ADMITTIME'))

# Create the 'AGE' column by calculating the difference in years between 'ADMITTIME' and 'DOB'
df = df.withColumn('AGE', F.expr("floor(months_between(ADMITTIME, DOB) / 12)"))

It only matters if the patient is dead or not, not the date of death, so we can drop the date of death columns. And we also drop other death-related columns.

In [None]:
df=df.drop("DOB").drop("DOD").drop("DOD_SSN").drop("EXPIRE_FLAG").drop("DEATHTIME")
df = df.withColumn("DOD_HOSP", F.when(F.col("DOD_HOSP").isNull(), 0).otherwise(1))
df = df.withColumnRenamed("DOD_HOSP", "DIED")
df = df.withColumnRenamed("ICD9_CODE","DISEASES_CODE")

Here we will drop the columns that are not useful for the analysis.

In [None]:
columns_to_remove = [
    "ADMITTIME", "DISCHTIME", "EDREGTIME", "EDOUTTIME", "HOSPITAL_EXPIRE_FLAG",
    "INTIME", "OUTTIME","LANGUAGE","DISCHARGE_LOCATION",
    "ICUSTAY_ID", "SEQ_NUM","HAS_CHARTEVENTS_DATA","DBSOURCE"
]

df = df.drop(*columns_to_remove)

The ethnicity column has too many unique values. We can group them into a few categories.

In [None]:
# Define the transformation using multiple chained when conditions
df = df.withColumn("ETHNICITY",
    F.when(F.col("ETHNICITY").isin('AMERICAN INDIAN/ALASKA NATIVE', 'AMERICAN INDIAN/ALASKA NATIVE FEDERALLY RECOGNIZED TRIBE'),
           'American Indian/Alaska Native')
    .when(F.col("ETHNICITY").isin('ASIAN', 'ASIAN - ASIAN INDIAN', 'ASIAN - CAMBODIAN', 'ASIAN - CHINESE', 'ASIAN - FILIPINO', 'ASIAN - JAPANESE', 'ASIAN - KOREAN', 'ASIAN - OTHER', 'ASIAN - THAI', 'ASIAN - VIETNAMESE'),
          'Asian')
    .when(F.col("ETHNICITY").isin('BLACK/AFRICAN', 'BLACK/AFRICAN AMERICAN', 'BLACK/CAPE VERDEAN', 'BLACK/HAITIAN'),
          'Black')
    .when(F.col("ETHNICITY").isin('HISPANIC OR LATINO', 'HISPANIC/LATINO - CENTRAL AMERICAN (OTHER)', 'HISPANIC/LATINO - COLOMBIAN', 'HISPANIC/LATINO - CUBAN', 'HISPANIC/LATINO - DOMINICAN', 'HISPANIC/LATINO - GUATEMALAN', 'HISPANIC/LATINO - HONDURAN', 'HISPANIC/LATINO - MEXICAN', 'HISPANIC/LATINO - PUERTO RICAN', 'HISPANIC/LATINO - SALVADORAN'),
          'Hispanic/Latino')
    .when(F.col("ETHNICITY").isin('MIDDLE EASTERN'),
          'Middle Eastern')
    .when(F.col("ETHNICITY").isin('NATIVE HAWAIIAN OR OTHER PACIFIC ISLANDER'),
          'Pacific Islander')
    .when(F.col("ETHNICITY").isin('WHITE', 'WHITE - BRAZILIAN', 'WHITE - EASTERN EUROPEAN', 'WHITE - OTHER EUROPEAN', 'WHITE - RUSSIAN', 'PORTUGUESE'),
          'White')
    .when(F.col("ETHNICITY").isin('CARIBBEAN ISLAND', 'SOUTH AMERICAN'),
          'Caribbean/South American')
    .when(F.col("ETHNICITY").isin('MULTI RACE ETHNICITY'),
          'Multi-Race')
    .when(F.col("ETHNICITY").isin('OTHER'),
          'Other')
    .otherwise('NO DATA REGISTERED')
)

Now we can aggregate the data by primary keys and collect the remaining columns into lists.

In [None]:
# Define your primary key columns
primary_key_columns = ["SUBJECT_ID", "HADM_ID"]

# Identify the remaining columns to be grouped
remaining_columns = [col for col in df.columns if col not in primary_key_columns]

# Group by the primary key columns and aggregate the remaining columns into lists
df = df.groupBy(primary_key_columns).agg(*(F.collect_list(col).alias(col) for col in remaining_columns))



Now we can transform the columns that contain lists of values. If all values in the list are the same, we can replace the list with a single value. If the list is empty, we can replace it with a default value.

In [None]:
df.show()

In [None]:
replace_empty_list_udf = udf(utils.replace_empty_list, ArrayType(StringType()))
transform_list_udf = udf(utils.transform_list, ArrayType(StringType()))
handle_list_udf = udf(utils.handle_list, StringType())
empty_list_udf = F.udf(lambda col: col == [], BooleanType())

for column in df.columns:    
    if isinstance(df.schema[column].dataType, ArrayType):
        if df.filter(empty_list_udf(F.col(column))).count() > 0:
            df = df.withColumn(column, replace_empty_list_udf(col(column)))
            
        df = df.withColumn(column, transform_list_udf(col(column)))
        if len(set(df.select(column).first()[0])) == 1:
            df = df.withColumn(column, handle_list_udf(df[column]))

Check for missing values again.

In [None]:
utils.print_missing_value_counts(df)

In [None]:
df.show()

Now let's analyze the diseases column. We can explode the column and calculate the mean length of stay for each disease code. Then we can rank the diseases for each patient and keep the top-ranked one.

In [None]:
# Explode the DISEASES_CODE column
exploded_df = df.withColumn("DISEASES_CODE", F.explode(col("DISEASES_CODE")))

# Calculate mean LOS for each disease code
disease_mean_los = exploded_df.groupBy("DISEASES_CODE").agg(F.mean("LOS").alias("mean_LOS"))

# Join the mean LOS back to the exploded dataframe
joined_df = exploded_df.join(disease_mean_los, on="DISEASES_CODE", how="left")

# Define window specification
window_spec = Window.partitionBy("SUBJECT_ID", "HADM_ID").orderBy(col("mean_LOS").desc())

# Rank disease codes for each patient and filter to keep the top-ranked one
most_influential_disease_df = joined_df.withColumn("rank", F.row_number().over(window_spec)).filter(col("rank") == 1)

# Select the relevant columns
result_df = most_influential_disease_df.select("SUBJECT_ID", "HADM_ID", "DISEASES_CODE")

# Ensure DISEASES_CODE is a string in both DataFrames
df = df.withColumn("DISEASES_CODE", col("DISEASES_CODE").cast("string"))
result_df = result_df.withColumn("DISEASES_CODE", col("DISEASES_CODE").cast("string"))

# Drop the DISEASES_CODE column from df before joining
df = df.drop("DISEASES_CODE")

# Join df with result_df on SUBJECT_ID and HADM_ID
updated_df = df.join(result_df, on=['SUBJECT_ID', 'HADM_ID'], how='left')

# Add the updated DISEASES_CODE column from result_df
df = updated_df.withColumn(
    "DISEASES_CODE",
    F.coalesce(result_df["DISEASES_CODE"], col("DISEASES_CODE"))
)

df = df.withColumnRenamed("DISEASES_CODE", "MOST_IMPORTANT_DISEASE_CODE")

In [None]:
# Show the updated DataFrame
df.show()

# Graphical Analysis

In [None]:
# Assuming `df` is your PySpark DataFrame
df = df.withColumn("DIED", df["DIED"].cast("integer"))

# Plotting death counts by gender
utils.plot_graph(df, 'GENDER', 'DIED', F.sum, 'Death Counts by Gender', 'Gender', 'Number of Deaths')

## Unique Value Count of marital status

In [None]:
utils.plot_graph(df, 'MARITAL_STATUS', 'MARITAL_STATUS', F.count, 'Count of Marital Status', 'Marital Status', 'Count')

## Unique Value Count of Admission Types

In [None]:
utils.plot_graph(df, 'ADMISSION_TYPE', 'ADMISSION_TYPE', F.count, 'Count of Admission Types', 'Admission Type', 'Count')

## Distribution of Lethality by Religion

The 1º and 2º deadliest religions are not shown in the graph because they are outliers.

In [None]:
# Calculate death rate per religion
death_rate_per_religion = df.groupBy('RELIGION')\
    .agg(
        F.sum('DIED').alias('Deaths'), 
        F.count('DIED').alias('Total')
    )\
    .withColumn('Death Rate', (F.col('Deaths') / F.col('Total')) * 100)\
    .orderBy(F.col('Death Rate').desc())

religions = death_rate_per_religion.rdd.zipWithIndex().filter(lambda x: x[1] in [2,3,4,5,6,7]).map(lambda x: x[0]).toDF()

utils.plot_graph(religions, 'RELIGION', 'Death Rate', F.first, 'Death Rates for Religions Ranked', 'Religion', 'Death Rate (%)')

## Distribution of Lethality by Ethnicity

In [None]:
filtered_df = df.filter(df['ETHNICITY'] != 'NO DATA REGISTERED')

# Calculate death rate per ethnicity
death_rate_per_ethnicity = filtered_df.groupBy('ETHNICITY')\
    .agg(
        F.sum('DIED').alias('Deaths'), 
        F.count('DIED').alias('Total')
    )\
    .withColumn('Death Rate', (F.col('Deaths') / F.col('Total')) * 100)\
    .orderBy(F.col('Death Rate').desc())\
    .limit(5)

utils.plot_graph(death_rate_per_ethnicity, 'ETHNICITY', 'Death Rate', F.first, 'Top 5 Deadliest Ethnicities by Death Rate', 'Ethnicity', 'Death Rate (%)')

## Distribution of Length of Stay by Age

# Pre processing

Let's convert the categorical columns to numerical columns. And let's get the categorical and numerical columns names.

In [None]:
all_columns = df.columns
feature_columns = [col for col in all_columns if col not in ['SUBJECT_ID', LABEL]]

# Manually specified categorical and numerical features
categorical_features = ['ADMISSION_TYPE', 'ADMISSION_LOCATION', 'INSURANCE', 'RELIGION',
                        'MARITAL_STATUS', 'ETHNICITY', 'DIAGNOSIS', 'FIRST_CAREUNIT', 'LAST_CAREUNIT', 'MOST_IMPORTANT_DISEASE_CODE']
numerical_features = [col for col in feature_columns if col not in categorical_features and col not in ['SUBJECT_ID', LABEL]]

# Transform the 'M' and 'F' values to 0 and 1 respectively
df = df.withColumn("GENDER", F.when(col("GENDER") == "M", 0).otherwise(1))
# Cast string numerical features to float
df = df.withColumn("AGE", col("AGE").cast("float"))
df = df.withColumn("FIRST_WARDID", col("FIRST_WARDID").cast("float"))
df = df.withColumn("LAST_WARDID", col("LAST_WARDID").cast("float"))
df = df.withColumn("LOS", col("LOS").cast("float"))

df.show()

In [None]:
# Stages in the pipeline
stages = []

# Indexing and encoding categorical features
for categoricalCol in categorical_features:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "ClassVec"])
    stages += [stringIndexer, encoder]

# Assemble all the features along with the encoded categorical features
assemblerInputs = [c + "ClassVec" for c in categorical_features] + numerical_features
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid="keep")
stages += [assembler]

# Pipeline: This will ensure all stages are applied in sequence
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df_transformed = pipelineModel.transform(df)

Here we are making sure that there isn't simultaneously a patient in the test set and train set.

In [None]:
# Assume subject_ids have been collected as before
subject_ids = [row['SUBJECT_ID'] for row in df_transformed.select("SUBJECT_ID").distinct().collect()]
split_index = int(len(subject_ids) * 0.8)

train_ids = set(subject_ids[:split_index])
test_ids = set(subject_ids[split_index:])

# Directly filter the DataFrame using the list
train_df = df_transformed.filter(F.col("SUBJECT_ID").isin(train_ids))
test_df = df_transformed.filter(F.col("SUBJECT_ID").isin(test_ids))

# Ensure that both train and test data have non-null labels
train_df = train_df.filter(train_df[LABEL].isNotNull())
test_df = test_df.filter(test_df[LABEL].isNotNull())

# Split the test_df into features and labels
X_test = test_df.drop(LABEL)
y_test = test_df.select(LABEL)

# Prediction

## Linear Regression

In [None]:
param = {
    'maxIter': 10,
    'regParam': 0.3,
    'elasticNetParam': 0.8
}

# Define and fit the Linear Regression model on the training set
lr = LinearRegression(featuresCol='features', labelCol=LABEL, **param)
lr_model = lr.fit(train_df)

In [None]:
y_pred = lr_model.transform(X_test)
print(f"Linear Regression - Mean Residual: {utils.mean_residuals(y_pred,y_test)}")

## Random Forest Regression

In [None]:
param = {
    'maxDepth': 5,
    'maxBins': 32,
    'minInstancesPerNode': 1,
    'minInfoGain': 0.0,
    'maxMemoryInMB': 256,
    'cacheNodeIds': False,
    'checkpointInterval': 10,
    'impurity': 'variance',
    'featureSubsetStrategy': 'auto',
    'subsamplingRate': 1.0,
    'seed': None,
    'numTrees': 20,
}

# Define and fit the Random Forest model on the training set
rf = RandomForestRegressor(featuresCol='features', labelCol=LABEL, **param)
rf_model = rf.fit(train_df)

In [None]:
y_pred = rf_model.transform(X_test)
print(f"Random Forest - Mean Residual: {utils.mean_residuals(y_pred,y_test)}")