# Group 04 Model Building: Logistic Regression

-------------------------
Amber Curran (akc6be)

Manpreet Dhindsa (mkd8bb)

Quinton Mays (rub9ez)

---------------------------


## Load Data

To begin we create our Spark Session.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("group04model") \
        .getOrCreate()

To reduce errors and runtime with Spark inference of the schema and without hard-coding the column types we use the supplied data dictionary to set the column types.

In [2]:
import pandas as pd

In [3]:
schemadf = pd.read_csv('/project/ds5559/fa21-group04/data/WiDS_Datathon_2020_Dictionary.csv')
schemadf = schemadf[schemadf['Variable Name'] != 'icu_admit_type'] # added to address error in data dictionary

In [4]:
from pyspark.sql.types import *
# define a python dictionary to map the data types from the data dictionary to the column types in spark schema.
data_types = {
    'integer': IntegerType(),
    'binary': IntegerType(),
    'numeric': FloatType(),
    'string': StringType()
}

In [5]:
# define a pyspark schema for the dataframe
schema = StructType(
    [
        StructField(row['Variable Name'] , 
                    data_types[row['Data Type']], 
                    True) for index, row in schemadf.iterrows()
    ]
)

In [6]:
# read in the data using the schema
df = spark.read.format('csv') \
    .schema(schema) \
    .option('header', True) \
    .load('/project/ds5559/fa21-group04/data/training_v2.csv')

After reviewing the data manually, it was identified that `bmi` was type Float, not String as the data dictionary indicated, so this change was manually coded.

In [7]:
df = df.withColumn("bmi",df.bmi.cast(FloatType()))

Next, the columns that contained more than 50% missing data were removed from the dataset. Note that the dataframe was selected from its own columns excluding the columns to drop instead of dropping the columns, as this coerced all columns to type string as a byproduct.

In [8]:
import pyspark.sql.functions as F
null_counts = df.select([((F.count(F.when(F.isnan(c) | F.col(c).isNull() | F.col(c).contains('NA'), c)))*100/df.count()).alias(c)  for c in df.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v > 50]

Based on the data dictionary, the columns labeled in Category "APACHE covariate" are all factors in the variables `apache_4a_icu_death_prob` and `apache_4a_hospital_death_prob` and therefore we can remove.

In [9]:
apachelist = ['albumin_apache', 'apache_2_diagnosis', 'apache_3j_diagnosis', 'apache_post_operative', 'arf_apache', 
                'bilirubin_apache', 'bun_apache', 'creatinine_apache', 'fio2_apache', 'gcs_eyes_apache', 'gcs_motor_apache', 
                'gcs_unable_apache', 'gcs_verbal_apache', 'glucose_apache', 'heart_rate_apache', 'hematocrit_apache', 
                'intubated_apache', 'map_apache', 'paco2_apache', 'paco2_for_ph_apache', 'pao2_apache', 'ph_apache', 
                'resprate_apache', 'sodium_apache', 'temp_apache', 'urineoutput_apache', 'ventilated_apache', 'wbc_apache']
to_drop = to_drop + apachelist

Based on the data dictionary, the columns labeled in Category indicating "noninvasive" are removed as they are included in the equivalent measurement which are measured either invasive or non invasive.

In [10]:
noninvasivelist = ['d1_diasbp_noninvasive_max', 'd1_diasbp_noninvasive_min', 'd1_mbp_noninvasive_max', 'd1_mbp_noninvasive_min', 
                'd1_sysbp_noninvasive_max', 'd1_sysbp_noninvasive_min', 'h1_diasbp_noninvasive_max', 'h1_diasbp_noninvasive_min',
                'h1_mbp_noninvasive_max', 'h1_mbp_noninvasive_min', 'h1_sysbp_noninvasive_max', 'h1_sysbp_noninvasive_min']
to_drop = to_drop + noninvasivelist

Next, we examined if the columns `patient_id` and `encounter_id` provided any information, or simply serve as a row_id

In [11]:
df.select('patient_id').distinct().count()

91713

In [12]:
df.select('encounter_id').distinct().count()

91713

In [13]:
df.count()

91713

As the number of distinct `patient_id` and `encounter_id`'s is equal to the number of rows in the dataset, we drop these columns:

In [14]:
to_drop = to_drop + ['encounter_id', 'patient_id']

In [15]:
df = df.select([col for col in df.columns if col not in to_drop])

Next, all missing values in the string columns are converted to `None` type so that they can be later be imputed or dropped as necessary with ease.

In [55]:
df = df.replace(to_replace='NA',
                value=None)

The dataframe is then persisted for faster processing.

In [56]:
df.persist()

DataFrame[hospital_id: int, hospital_death: int, age: float, bmi: float, elective_surgery: int, ethnicity: string, gender: string, height: float, hospital_admit_source: string, icu_admit_source: string, icu_id: int, icu_stay_type: string, icu_type: string, pre_icu_los_days: float, readmission_status: int, weight: float, d1_diasbp_max: float, d1_diasbp_min: float, d1_heartrate_max: float, d1_heartrate_min: float, d1_mbp_max: float, d1_mbp_min: float, d1_resprate_max: float, d1_resprate_min: float, d1_spo2_max: float, d1_spo2_min: float, d1_sysbp_max: float, d1_sysbp_min: float, d1_temp_max: float, d1_temp_min: float, h1_diasbp_max: float, h1_diasbp_min: float, h1_heartrate_max: float, h1_heartrate_min: float, h1_mbp_max: float, h1_mbp_min: float, h1_resprate_max: float, h1_resprate_min: float, h1_spo2_max: float, h1_spo2_min: float, h1_sysbp_max: float, h1_sysbp_min: float, h1_temp_max: float, h1_temp_min: float, d1_bun_max: float, d1_bun_min: float, d1_calcium_max: float, d1_calcium_mi

## Data Exploration

After selecting the columns to include in the dataset, the data must now be cleaned. Where above we dropped columns with more than 50% missing values, we must now deal with the remaining columns which may still contain NA values.

To begin, it is worthwhile to explore the reamining dataframe columns:

In [17]:
df_summary = df.summary()

In [18]:
# define a dictionary of variable names to original datatype for processing pipleines and data exploration
var_types = dict(zip(schemadf['Variable Name'], schemadf['Data Type']))

In [32]:
chosen_columns = ['summary'] + [col for col in df.columns if var_types[col] in ['numeric', 'integer']][:10] # select only numeric or integer columns and summary col
df_summary.select(chosen_columns).show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|       hospital_id|               age|            height|            icu_id|  pre_icu_los_days|            weight|    d1_diasbp_max|     d1_diasbp_min|  d1_heartrate_max| d1_heartrate_min|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|             91713|             87485|             90379|             91713|             91713|             88993|            91548|             91548|             91568|            91568|
|   mean|105.66926171862222|62.309515917014345|169.64158813704822| 508.3576919302607|0.8357660508174694| 84.02833953940066|88.49187311574256| 50.16131428321755|103.00056788397694|70.32184824392802|
| stddev|6

It appears that some of the variables, including age and height, may be candidates for imputing with the mean or median.

# Pipeline Construction

## Feature Selection

We then select the features that we would like to include in our model and parse them into lists for each datatype:

In [102]:
model_features = ['age','elective_surgery', 'icu_admit_source', 'gender', 'weight'] # select the features we want in model manually

In [103]:
special_vars = [] # any variables that we want to give special treatment (bucketize, etc)
model_int_vars = [col for col in model_features if var_types[col] == 'integer' and col not in special_vars]
model_float_vars = [col for col in model_features if var_types[col] == 'numeric' and col not in special_vars]
model_binary_vars = [col for col in model_features if var_types[col] == 'binary' and col not in special_vars]
model_string_vars = [col for col in model_features if var_types[col] == 'string' and col not in special_vars]

In [104]:
print(model_int_vars)
print(model_float_vars)
print(model_binary_vars)
print(model_string_vars)

[]
['age', 'weight']
['elective_surgery']
['icu_admit_source', 'gender']


### Pipeline Packages

In [105]:
from pyspark.ml.feature import Imputer

In [106]:
from pyspark.ml.feature import *
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline  
from pyspark.ml.classification import LogisticRegression

## Data Split

In [107]:
trainVal, test = df.randomSplit([0.8, 0.2], seed=304)

## Integer Variable Pipeline

In [108]:
# Not needed because only remaining integer variables are hospital_id and hospital_death, which contain no NANs!

## Float Variable Pipeline

Float variables are passed to a Max Absolute Scaler, which will scale the values but does not impact the spread of the values.

In [109]:
imputed_float_vars = ['{}_imputed'.format(var) for var in model_float_vars]
float_imputer = Imputer(inputCols=model_float_vars, outputCols=imputed_float_vars)

float_var_vectorizer = VectorAssembler(inputCols=float_imputer.getOutputCols(), outputCol='floatFeatures', handleInvalid='skip')

float_MaxAbs_Scaler = MaxAbsScaler(inputCol=float_var_vectorizer.getOutputCol(),
                                   outputCol='scaledFloatFeatures')

## Binary Variable Pipeline

Binary variables are passed to a vectorizer for use in the model directly.

In [110]:
binary_var_vectorizer = VectorAssembler(inputCols=model_binary_vars, 
                                        outputCol='binaryFeatures',
                                        handleInvalid='skip')

## String Variable Pipeline

String variables are all one hot encoded by first passing them to a string indexer and then to a one hot encoder. Finally, the one hot encoded variables are passed to a vectorizer.

In [111]:
indexed_string_vars = ['{}_indexed'.format(var) for var in model_string_vars]
string_var_indexer = StringIndexer(inputCols=model_string_vars,
                                   outputCols=indexed_string_vars,
                                   handleInvalid='skip')

In [112]:
ohe_string_vars = ['{}_ohe'.format(var) for var in model_string_vars]
string_var_ohe = OneHotEncoder(inputCols=string_var_indexer.getOutputCols(),
                               outputCols=ohe_string_vars)

In [113]:
string_var_vectorizer = VectorAssembler(inputCols=string_var_ohe.getOutputCols(),
                                        outputCol='oheStringFeatures',
                                        handleInvalid='skip')

## Final Pipeline Assembly

To finish the pipeline, all variable type vectors are passed to a final vectorizer and then the pipeline is constructed.

In [114]:
final_feature_vectorizer = VectorAssembler(inputCols=[binary_var_vectorizer.getOutputCol(),
                                                     float_MaxAbs_Scaler.getOutputCol(),
                                                     string_var_vectorizer.getOutputCol()],
                                          outputCol='features', handleInvalid='skip')

In [115]:
lr = LogisticRegression(labelCol='hospital_death', featuresCol='features', maxIter=10, regParam=0.01)

In [116]:
lr_pipeline = Pipeline(stages=[float_imputer,
                               float_var_vectorizer,
                               binary_var_vectorizer,
                               float_MaxAbs_Scaler,
                               string_var_indexer,
                               string_var_ohe,
                               string_var_vectorizer,
                               final_feature_vectorizer,
                               lr])

## Model K-Fold Evaluation

In [117]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [118]:
lr_paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

In [None]:
# need to set metricName
# model_evaluator = 

In [119]:
crossval = CrossValidator(estimator=lr_pipeline,
                          estimatorParamMaps=lr_paramGrid,
                          evaluator=BinaryClassificationEvaluator().setLabelCol(lr.getLabelCol()),
                          numFolds=5,
                          seed=304)

In [120]:
cvModel = crossval.fit(trainVal)

In [121]:
preds = cvModel.transform(test)

In [122]:
testEvaluator = BinaryClassificationEvaluator(rawPredictionCol='probability',
                                              labelCol='hospital_death',
                                              metricName='areaUnderROC')

In [123]:
testEvaluator.evaluate(preds)

0.6675595877508442

In [124]:
trainValPreds = cvModel.transform(trainVal)

In [125]:
testEvaluator.evaluate(trainValPreds)

0.6656484892015455

In [126]:
trainValPreds.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|73203|
+----------+-----+



In [127]:
preds.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|18373|
+----------+-----+



In [99]:
df.groupBy('hospital_death').count().show()

+--------------+-----+
|hospital_death|count|
+--------------+-----+
|             1| 7915|
|             0|83798|
+--------------+-----+

