In [1]:
import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split


In [2]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/17 06:51:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setup Config

In [3]:
# set up config
model_train_date_str = "2009-01-01"
train_test_period_months = 108
oot_period_months = 12
train_test_ratio = 0.8

config = {}
config["model_train_date_str"] = model_train_date_str
config["train_test_period_months"] = train_test_period_months
config["oot_period_months"] =  oot_period_months
config["model_train_date"] =  datetime.strptime(model_train_date_str, "%Y-%m-%d")
config["oot_end_date"] = config["model_train_date"] - timedelta(days = 1)
config["oot_start_date"] =  config['model_train_date'] - relativedelta(months = oot_period_months)
config["train_test_end_date"] =  config["oot_start_date"] - timedelta(days = 1)
config["train_test_start_date"] =  config["oot_start_date"] - relativedelta(months = train_test_period_months)
config["train_test_ratio"] = train_test_ratio 


pprint.pprint(config)

{'model_train_date': datetime.datetime(2009, 1, 1, 0, 0),
 'model_train_date_str': '2009-01-01',
 'oot_end_date': datetime.datetime(2008, 12, 31, 0, 0),
 'oot_period_months': 12,
 'oot_start_date': datetime.datetime(2008, 1, 1, 0, 0),
 'train_test_end_date': datetime.datetime(2007, 12, 31, 0, 0),
 'train_test_period_months': 108,
 'train_test_ratio': 0.8,
 'train_test_start_date': datetime.datetime(1999, 1, 1, 0, 0)}


Label Store

In [4]:
# connect to label store
folder_path = "datamart/gold/label_store/"
files_list = [folder_path+os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
label_store_sdf = spark.read.option("header", "true").parquet(*files_list)
print("row_count:",label_store_sdf.count())

label_store_sdf.show()

                                                                                

row_count: 101766
+------------+-----+-------------+
|encounter_id|label|snapshot_date|
+------------+-----+-------------+
|     4679262|    0|   2000-08-01|
|     4680186|    0|   2000-08-02|
|     4682388|    0|   2000-08-03|
|     4728330|    0|   2000-08-04|
|     4729674|    0|   2000-08-05|
|     4730190|    0|   2000-08-06|
|     4731546|    0|   2000-08-07|
|     4734798|    0|   2000-08-08|
|     4736166|    0|   2000-08-09|
|     4736214|    0|   2000-08-10|
|     4765758|    0|   2000-08-11|
|     4772286|    0|   2000-08-12|
|     4798818|    0|   2000-08-13|
|     4801326|    0|   2000-08-14|
|     4804620|    1|   2000-08-15|
|     4804968|    0|   2000-08-16|
|     4809204|    0|   2000-08-17|
|     4834632|    0|   2000-08-18|
|     4840590|    0|   2000-08-19|
|     4840836|    0|   2000-08-20|
+------------+-----+-------------+
only showing top 20 rows



In [5]:
# extract label store
labels_sdf = label_store_sdf.filter((col("snapshot_date") >= config["train_test_start_date"]) & (col("snapshot_date") <= config["oot_end_date"]))

print("extracted labels_sdf", labels_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

[Stage 6:>                                                        (0 + 12) / 12]

extracted labels_sdf 101766 1999-01-01 00:00:00 2008-12-31 00:00:00


                                                                                

Get features

In [6]:
folder_path = "datamart/gold/feature_store/"
files_list = [folder_path + os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
features_store_sdf = spark.read.option("header", "true").parquet(*files_list)
print("row_count:",features_store_sdf.count())

features_store_sdf.show()

                                                                                

row_count: 101766
+------------+--------------------+--------------+----------+-------------+---------+------------+------------------------+---------------------------+--------------------+-------------+-----------+-----------+-----------------+--------------------+------+------+------+-------------+
|encounter_id|race_AfricanAmerican|race_Caucasian|race_Asian|race_Hispanic|is_female|age_midpoint|admission_severity_score|admission_source_risk_score|poor_glucose_control|metformin_ord|insulin_ord|diabetesMed|severity_x_visits|  medication_density|diag_1|diag_2|diag_3|snapshot_date|
+------------+--------------------+--------------+----------+-------------+---------+------------+------------------------+---------------------------+--------------------+-------------+-----------+-----------+-----------------+--------------------+------+------+------+-------------+
|    15758256|                   0|             0|         1|            0|        0|          65|                       1|    

In [7]:
# extract label store
features_sdf = features_store_sdf.filter((col("snapshot_date") >= config["train_test_start_date"]) & (col("snapshot_date") <= config["oot_end_date"]))

print("extracted features_sdf", features_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

[Stage 15:>                                                       (0 + 12) / 12]

extracted features_sdf 101766 1999-01-01 00:00:00 2008-12-31 00:00:00


                                                                                

In [8]:
# prepare data for modeling
data_pdf = labels_sdf.join(features_sdf, on=["encounter_id", "snapshot_date"], how="inner").toPandas()
data_pdf

                                                                                

Unnamed: 0,encounter_id,snapshot_date,label,race_AfricanAmerican,race_Caucasian,race_Asian,race_Hispanic,is_female,age_midpoint,admission_severity_score,admission_source_risk_score,poor_glucose_control,metformin_ord,insulin_ord,diabetesMed,severity_x_visits,medication_density,diag_1,diag_2,diag_3
0,15758256,2005-08-01,0,0,0,1,0,0,65,1,1,0,0,1,1,0,0.024145,161,196,453
1,15765702,2005-08-02,1,1,0,0,0,1,25,2,1,0,0,1,1,2,0.007470,250.03,V42,244
2,15774312,2005-08-03,0,0,1,0,0,0,75,2,1,0,0,1,1,0,0.021053,414,411,410
3,15779280,2005-08-04,0,0,1,0,0,0,65,2,1,0,1,1,1,2,0.059341,427,428,396
4,15784284,2005-08-05,0,0,1,0,0,1,55,0,0,0,0,0,1,0,0.025424,584,786,584
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
101761,403830632,1999-02-24,0,1,0,0,0,0,95,2,2,0,0,0,1,2,0.046154,331,780,294
101762,403830932,1999-02-25,0,0,1,0,0,1,25,3,2,0,0,0,0,7,0.007826,996,585,453
101763,403845980,1999-02-26,0,0,1,0,0,1,45,3,2,0,1,0,1,0,0.060606,295,V62,309
101764,403846202,1999-02-27,0,0,1,0,0,0,65,3,2,1,1,2,1,0,0.045455,590,556,496


In [18]:
# split data into train - test - oot
oot_pdf = data_pdf[(data_pdf['snapshot_date'] >= config["oot_start_date"].date()) & (data_pdf['snapshot_date'] <= config["oot_end_date"].date())]
train_test_pdf = data_pdf[(data_pdf['snapshot_date'] >= config["train_test_start_date"].date()) & (data_pdf['snapshot_date'] <= config["train_test_end_date"].date())]

feature_cols = [c for c in train_test_pdf.columns if c not in ["encounter_id", "snapshot_date", "label", "medical_specialty", 'diag_1', 'diag_2', 'diag_3']]

X_oot = oot_pdf[feature_cols]
y_oot = oot_pdf["label"]
X_train, X_test, y_train, y_test = train_test_split(
    train_test_pdf[feature_cols], train_test_pdf["label"], 
    test_size= 1 - config["train_test_ratio"],
    random_state=88,     # Ensures reproducibility
    shuffle=True,        # Shuffle the data before splitting
    stratify=train_test_pdf["label"]           # Stratify based on the label column
)


print('X_train', X_train.shape[0])
print('X_test', X_test.shape[0])
print('X_oot', X_oot.shape[0])
print('y_train', y_train.shape[0], round(y_train.mean(),2))
print('y_test', y_test.shape[0], round(y_test.mean(),2))
print('y_oot', y_oot.shape[0], round(y_oot.mean(),2))

X_train

X_train 73507
X_test 18377
X_oot 9882
y_train 73507 0.11
y_test 18377 0.11
y_oot 9882 0.11


Unnamed: 0,race_AfricanAmerican,race_Caucasian,race_Asian,race_Hispanic,is_female,age_midpoint,admission_severity_score,admission_source_risk_score,poor_glucose_control,metformin_ord,insulin_ord,diabetesMed,severity_x_visits,medication_density
64476,0,1,0,0,1,75,3,2,0,0,1,1,0,0.136364
92020,0,1,0,0,0,75,2,1,0,0,-1,1,0,0.086183
58142,1,0,0,0,1,75,1,1,0,0,1,1,1,0.034091
50284,0,1,0,0,1,65,3,2,0,0,0,0,0,0.048913
101228,1,0,0,0,1,25,3,3,0,0,1,1,2,0.013774
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
34316,0,1,0,0,0,75,3,2,1,0,-1,1,0,0.036364
99464,0,1,0,0,1,75,3,2,0,0,0,0,0,0.150000
37323,0,1,0,0,1,75,3,2,0,0,1,1,0,0.046296
73206,0,1,0,0,0,65,1,1,0,0,0,1,0,0.029412


In [20]:
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer

numeric_cols = [
    'age_midpoint', 'admission_severity_score', 'admission_source_risk_score',
    'metformin_ord', 'insulin_ord', 'severity_x_visits', 'medication_density'
]

scaler = ColumnTransformer(
    transformers=[('num', StandardScaler(), numeric_cols)],
    remainder='passthrough'
)

scaler.fit(X_train)
X_train_processed = scaler.transform(X_train)
X_test_processed = scaler.transform(X_test)
X_oot_processed = scaler.transform(X_oot)

print('X_train_processed', X_train_processed.shape[0])
print('X_test_processed', X_test_processed.shape[0])
print('X_oot_processed', X_oot_processed.shape[0])

pd.DataFrame(X_train_processed)

X_train_processed 73507
X_test_processed 18377
X_oot_processed 9882


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,0.566614,0.815024,0.534312,-0.44979,0.707689,-0.448947,0.780905,0.0,1.0,0.0,0.0,1.0,0.0,1.0
1,0.566614,-0.138228,-0.885839,-0.44979,-1.675385,-0.448947,0.284900,0.0,1.0,0.0,0.0,0.0,0.0,1.0
2,0.566614,-1.091480,-0.885839,-0.44979,0.707689,-0.166065,-0.230004,1.0,0.0,0.0,0.0,1.0,0.0,1.0
3,-0.060911,0.815024,0.534312,-0.44979,-0.483848,-0.448947,-0.083495,0.0,1.0,0.0,0.0,1.0,0.0,0.0
4,-2.571012,0.815024,1.954464,-0.44979,0.707689,0.116817,-0.430824,1.0,0.0,0.0,0.0,1.0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
73502,0.566614,0.815024,0.534312,-0.44979,-1.675385,-0.448947,-0.207539,0.0,1.0,0.0,0.0,0.0,1.0,1.0
73503,0.566614,0.815024,0.534312,-0.44979,-0.483848,-0.448947,0.915693,0.0,1.0,0.0,0.0,1.0,0.0,0.0
73504,0.566614,0.815024,0.534312,-0.44979,0.707689,-0.448947,-0.109361,0.0,1.0,0.0,0.0,1.0,0.0,1.0
73505,-0.060911,-1.091480,-0.885839,-0.44979,-0.483848,-0.448947,-0.276255,0.0,1.0,0.0,0.0,0.0,0.0,1.0


In [28]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import roc_auc_score, make_scorer
import pprint

# --------------------------------------------------------
# Define the Logistic Regression model
# --------------------------------------------------------
log_reg = LogisticRegression(
    solver='liblinear',  # works well for small/medium datasets & L1/L2 penalty
    random_state=88,
    max_iter=1000
)

# --------------------------------------------------------
# Define the hyperparameter search space
# --------------------------------------------------------
param_dist = {
    'penalty': ['l1', 'l2'],
    'C': [0.001, 0.01, 0.1, 1, 10, 100],  # inverse of regularization strength
    'class_weight': [None, 'balanced']
}

# --------------------------------------------------------
# Create a scorer based on AUC
# --------------------------------------------------------
auc_scorer = make_scorer(roc_auc_score)

# --------------------------------------------------------
# Randomized search with cross-validation
# --------------------------------------------------------
random_search = RandomizedSearchCV(
    estimator=log_reg,
    param_distributions=param_dist,
    scoring=auc_scorer,
    n_iter=10,
    cv=3,
    verbose=1,
    random_state=42,
    n_jobs=-1
)

# --------------------------------------------------------
# Fit the model on TRAIN data
# --------------------------------------------------------
random_search.fit(X_train_processed, y_train)

# --------------------------------------------------------
# Show best parameters and cross-val AUC
# --------------------------------------------------------
print("\nBest parameters found:")
pprint.pprint(random_search.best_params_)
print("Best cross-validation AUC score: ", random_search.best_score_)

# --------------------------------------------------------
# Evaluate best model on TRAIN / TEST / OOT
# --------------------------------------------------------
best_model = random_search.best_estimator_

# TRAIN
y_pred_train = best_model.predict_proba(X_train_processed)[:, 1]
train_auc = roc_auc_score(y_train, y_pred_train)

# TEST
y_pred_test = best_model.predict_proba(X_test_processed)[:, 1]
test_auc = roc_auc_score(y_test, y_pred_test)

# OOT
y_pred_oot = best_model.predict_proba(X_oot_processed)[:, 1]
oot_auc = roc_auc_score(y_oot, y_pred_oot)

# --------------------------------------------------------
# Print all AUC + GINI scores
# --------------------------------------------------------
print("\nAUC Scores:")
print(f"Train AUC: {train_auc:.4f}")
print(f"Test  AUC: {test_auc:.4f}")
print(f"OOT   AUC: {oot_auc:.4f}")

print("\nGINI Scores:")
print(f"Train GINI: {2*train_auc - 1:.4f}")
print(f"Test  GINI: {2*test_auc - 1:.4f}")
print(f"OOT   GINI: {2*oot_auc - 1:.4f}")


Fitting 3 folds for each of 10 candidates, totalling 30 fits

Best parameters found:
{'C': 0.1, 'class_weight': 'balanced', 'penalty': 'l2'}
Best cross-validation AUC score:  0.5738350127980354

AUC Scores:
Train AUC: 0.6045
Test  AUC: 0.6136
OOT   AUC: 0.6145

GINI Scores:
Train GINI: 0.2090
Test  GINI: 0.2271
OOT   GINI: 0.2291


In [29]:
feature_importance = pd.DataFrame({
    'Feature': X_train.columns,
    'Coefficient': best_model.coef_[0]
}).sort_values(by='Coefficient', ascending=False)

print(feature_importance)


                        Feature  Coefficient
5                  age_midpoint     0.297509
13           medication_density     0.246242
7   admission_source_risk_score     0.107068
8          poor_glucose_control     0.087057
0          race_AfricanAmerican     0.077901
1                race_Caucasian     0.031053
10                  insulin_ord     0.011789
6      admission_severity_score     0.010012
11                  diabetesMed    -0.006802
2                    race_Asian    -0.008688
4                     is_female    -0.008897
12            severity_x_visits    -0.051569
3                 race_Hispanic    -0.073692
9                 metformin_ord    -0.116906
