# Readmissions 2: Feature engineering and predictive modeling
## Using Snowpark ML for feature engineering and predictive modeling

### Import packages for our notebook



##### Packages to add to notebook from Packages dropdown in top right:
seaborn \
plotly \
snowflake-ml-python \
shap \


In [None]:
# Import Python packages
import pandas as pd
import numpy as np
from itertools import combinations
import seaborn as sns
import os
import plotly.express as px
import json
import sys

# Import Snowflake modules
from snowflake.snowpark import Session 
import snowflake.snowpark.functions as F 
import snowflake.snowpark.types as T
from snowflake.snowpark import Window
from snowflake.snowpark.functions import col


# Pandas Tools
from snowflake.connector.pandas_tools import write_pandas

# create_temp_table warning suppresion
import warnings; warnings.simplefilter('ignore')

from snowflake.snowpark.context import get_active_session
session = get_active_session()


### Create our Snowpark session



In [None]:
session.use_role('datasci')
session.use_database('analytics')
session.use_warehouse('datasci_wh')
session.use_schema('readmit')
print(session.sql("select current_role(), current_warehouse(), current_database(), current_schema(), current_region(), current_client()").collect())

[Row(CURRENT_ROLE()='DATASCI', CURRENT_WAREHOUSE()='DATASCI_WH', CURRENT_DATABASE()='ANALYTICS', CURRENT_SCHEMA()='READMIT', CURRENT_REGION()='PUBLIC.AWS_US_WEST_2', CURRENT_CLIENT()='PythonSnowpark 1.11.1')]


### Inspect the data in a Snowpark dataframe



In [None]:
readmissions_df=session.table('readmissions_enriched').drop('ADMIT_DATE')
readmissions_df.sample(n=10).show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CITY_LAT"  |"CITY_LONG"  |"HOSPITAL_LAT"  |"HOSPITAL_LONG"  |"HOSPITAL_NAME"                                    |"HOSPITAL_STATE"  |"DIAGNOSIS"  |"PATIENT_NUMBER"  |"DV_READMIT_FLAG" 

### How large is the Snowpark dataframe in the Python runtime?



In [None]:
import sys
snowpark_size = sys.getsizeof(readmissions_df) / (1024*1024)
print(f"Snowpark DataFrame Size (snowpark_df): {snowpark_size:.2f} MB")

Snowpark DataFrame Size (snowpark_df): 0.00 MB


### Explore Snowpark Dataframe API



In [None]:
co_df = readmissions_df.filter(F.col("HOSPITAL_STATE") == 'CO').order_by(["DV_READMIT_FLAG", "DIAGNOSIS"], ascending=[0, 0])
co_df.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM readmissions_enriched WHERE ("HOSPITAL_STATE" = 'NJ') ORDER BY "DV_READMIT_FLAG" DESC NULLS LAST, "DIAGNOSIS" DESC NULLS LAST
Logical Execution Plan:
GlobalStats:
    partitionsTotal=1
    partitionsAssigned=1
    bytesAssigned=5365760
Operations:
1:0     ->Result  READMISSIONS_ENRICHED.CITY_LAT, READMISSIONS_ENRICHED.CITY_LONG, READMISSIONS_ENRICHED.HOSPITAL_LAT, READMISSIONS_ENRICHED.HOSPITAL_LONG, READMISSIONS_ENRICHED.HOSPITAL_NAME, READMISSIONS_ENRICHED.HOSPITAL_STATE, READMISSIONS_ENRICHED.DIAGNOSIS, READMISSIONS_ENRICHED.PATIENT_NUMBER, READMISSIONS_ENRICHED.DV_READMIT_FLAG, READMISSIONS_ENRICHED.ADMIT_DATE, READMISSIONS_ENRICHED.LENGTH_OF_STAY, READMISSIONS_ENRICHED.PRIOR_IP_ADMITS, READMISSIONS_ENRICHED.CHRONIC_CONDITIONS_NUMBER, READMISSIONS_ENRICHED.PATIENT_AGE, READMISSIONS_ENRICHED.ORDER_SET_USED, READMISSIONS_ENRICHED.HOSPITAL_ID, READMISSIONS_ENRICHED.HOSPITAL_ADDRESS, READMISSIONS_ENRICHED.HOSPIT

### Explore columns for modeling



In [None]:
missing_check = readmissions_df.select("PATIENT_AGE", "BMI", "LENGTH_OF_STAY", "ORDER_SET_USED", "CHRONIC_CONDITIONS_NUMBER",  "SBUX_COUNT", "DV_READMIT_FLAG", "HOSPITAL_NAME")

#Inspect columns for missing values and replace null values with 0
column_names=missing_check.columns
for i in column_names:
    print('Column',i,'has',missing_check.filter(col(i).isNull()).count(),'missing values!')

Column PATIENT_AGE has 0 missing values!
Column BMI has 0 missing values!
Column LENGTH_OF_STAY has 0 missing values!
Column ORDER_SET_USED has 0 missing values!
Column CHRONIC_CONDITIONS_NUMBER has 0 missing values!
Column SBUX_COUNT has 0 missing values!
Column DV_READMIT_FLAG has 0 missing values!
Column HOSPITAL_NAME has 0 missing values!
Column HOSPITAL_LAT has 949 missing values!
Column HOSPITAL_LONG has 949 missing values!


In [None]:
import streamlit as st

feature_cols = ["PATIENT_AGE",'BMI','LENGTH_OF_STAY','PRIOR_IP_ADMITS','CHRONIC_CONDITIONS_NUMBER', 'SBUX_COUNT'] 
df = readmissions_df[feature_cols].to_pandas()

numeric_cols = df.select_dtypes(include='number').columns

for col in numeric_cols:
    st.subheader(f'Histogram for {col}')
    fig = px.histogram(readmissions_df, x=col, title=f'Histogram of {col}', 
                       nbins=10, opacity=0.75)
    fig.update_layout(width=700, height=500)  
    st.plotly_chart(fig, use_container_width=False)  


In [None]:
import plotly.express as px

graph_data=readmissions_df['DIAGNOSIS', 'DV_READMIT_FLAG'].to_pandas()
grouped_data = graph_data.groupby(['DIAGNOSIS', 'DV_READMIT_FLAG']).size().reset_index(name='count')

# Create the stacked bar chart
fig = px.bar(
    grouped_data,
    x='DIAGNOSIS',
    y='count',
    color='DV_READMIT_FLAG',
    title='Number of Rows for Each Diagnosis, Grouped by Readmissions Flag',
    barmode='stack'
)

st.plotly_chart(fig, use_container_width=True)

### Engineer feature to fix missing values and skewness and encode categorical variables

In [None]:
from snowflake.ml.modeling.impute import SimpleImputer

imputer = SimpleImputer(missing_values=None, strategy='constant', fill_value=0, input_cols='SBUX_COUNT', output_cols='SBUX_COUNT_IMP')
imputed_df = imputer.fit(readmissions_df).transform(readmissions_df)

imputed_df.show(5)

In [None]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import DecimalType


# Normalize the LENGTH_OF_STAY column
snowml_mms = snowml.MinMaxScaler(input_cols=["LENGTH_OF_STAY"], output_cols=["LENGTH_OF_STAY_NORM"])
normalized_df = snowml_mms.fit(imputed_df).transform(imputed_df)

# Reduce the number of decimals
new_col = normalized_df.col("LENGTH_OF_STAY_NORM").cast(DecimalType(7, 6))
normalized_df = normalized_df.with_column("LENGTH_OF_STAY_NORM", new_col)

st.subheader(f'Histogram for LENGTH_OF_STAY_NORM')
fig = px.histogram(normalized_df, x='LENGTH_OF_STAY_NORM', title=f'Histogram of LENGTH_OF_STAY_NORM', 
                nbins=10, opacity=0.75)
fig.update_layout(width=700, height=500)  
st.plotly_chart(fig, use_container_width=False)  

In [None]:
#one hot encoding of categorical variables
import snowflake.ml.modeling.preprocessing as snowml

inputs = ['DIAGNOSIS', 'PATIENT_GENDER', 'MARITAL_STATUS', 'HIGH_NA_AT_DISCHARGE']
outputs = ['DIAGNOSIS_OHE', 'PATIENT_GENDER_OHE', 'MARITAL_STATUS_OHE', 'HIGH_NA_AT_DISCHARGE_OHE']

snowml_ohe = snowml.OneHotEncoder(input_cols=inputs,
                                output_cols=outputs)
ohe_df = snowml_ohe.fit(normalized_df).transform(normalized_df)

np.array(ohe_df.columns)

### Save preprocessing steps as pipeline for future use on new data

In [None]:
# Build the preprocessing pipeline for reuse
from snowflake.ml.modeling.pipeline import Pipeline
import joblib


preprocessing_pipeline = Pipeline(
    steps=[
            (
                "IMP",
                SimpleImputer(
                    missing_values=None, 
                    strategy='constant', 
                    fill_value=0,
                    input_cols="SBUX_COUNT",
                    output_cols="SBUX_COUNT_IMP"
                )
            ),
            ("OHE",
                snowml.OneHotEncoder(
                    input_cols=["DIAGNOSIS", "PATIENT_GENDER", "MARITAL_STATUS", "HIGH_NA_AT_DISCHARGE"],
                    output_cols=["DIAGNOSIS_OHE", "PATIENT_GENDER_OHE", "MARITAL_STATUS_OHE", "HIGH_NA_AT_DISCHARGE_OHE"]
                )
            ),
            (
                "MMS",
                snowml.MinMaxScaler(
                    input_cols=["LENGTH_OF_STAY"], 
                    output_cols=["LENGTH_OF_STAY_NORM"]                
                )
            )        
    ]
)

PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE) # We are just pickling it locally first

transformed_readmissions = preprocessing_pipeline.fit(readmissions_df).transform(readmissions_df)
np.array(transformed_readmissions.columns)

### Save modeling dataframe as a table in Snowflake



In [None]:
transformed_readmissions.write.mode("overwrite").save_as_table("analytics.readmit.readmissions_modeling")
modeling_df = session.table("readmissions_modeling")
modeling_df.show()

### Define and train XG Boost Classifier using SciKitLearn syntax that is 100% pushed down to Snowflake



In [None]:
from snowflake.ml.modeling.xgboost import XGBClassifier

#Train/test split
readmits_train_df, readmits_test_df = session.table("readmissions_modeling").drop('ROW').random_split(weights=[0.8, 0.2], seed=0)

#Specify columns for modeling'
feature_cols = ['PATIENT_AGE','BMI','PRIOR_IP_ADMITS','LENGTH_OF_STAY_NORM','CHRONIC_CONDITIONS_NUMBER', 'SBUX_COUNT_IMP', "DIAGNOSIS_OHE_AMI", "DIAGNOSIS_OHE_CABG", "DIAGNOSIS_OHE_COPD", "DIAGNOSIS_OHE_HF","DIAGNOSIS_OHE_HIPKNEE","DIAGNOSIS_OHE_PN","PATIENT_GENDER_OHE_F", "PATIENT_GENDER_OHE_M", "MARITAL_STATUS_OHE_N", "MARITAL_STATUS_OHE_Y", "HIGH_NA_AT_DISCHARGE_OHE_Y", "HIGH_NA_AT_DISCHARGE_OHE_N"] 
target_col = "DV_READMIT_FLAG"
output_col = "PREDICTED_READMIT_FLAG"

#Create our classifier
classifier = XGBClassifier(
    input_cols=feature_cols,
    label_cols=target_col,
    output_cols=output_col
)

# Train
classifier.fit(readmits_train_df)

# Eval
train_result = classifier.predict(readmits_train_df)
test_result = classifier.predict(readmits_test_df)


# Analyze results
results_df = test_result["DV_READMIT_FLAG", "PREDICTED_READMIT_FLAG"].to_pandas()
results_df.head()

Unnamed: 0,DV_READMIT_FLAG,PREDICTED_READMIT_FLAG
0,0,0
1,0,0
2,0,0
3,1,1
4,1,1


### Inspect our results with a confusion matrix



In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

actuals = results_df["DV_READMIT_FLAG"].astype(str)
predicts = results_df["PREDICTED_READMIT_FLAG"].astype(str)
cm = confusion_matrix(actuals, predicts)

fig, ax = plt.subplots(figsize=(4, 4))  # Adjust the size as needed
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap='Blues', ax=ax)

plt.show()

### Get model statistics

In [None]:
from snowflake.ml.modeling.metrics import roc_auc_score

train_AUC = roc_auc_score(df=train_result, y_true_col_names=["DV_READMIT_FLAG"], y_score_col_names=["PREDICTED_READMIT_FLAG"])
test_AUC = roc_auc_score(df=test_result, y_true_col_names=["DV_READMIT_FLAG"], y_score_col_names=["PREDICTED_READMIT_FLAG"])

print(f"Training ROC: {train_AUC} \nTest ROC: {test_AUC}")

### Hyperparameter optimization with SciKitLearn GridSearch



In [None]:
from snowflake.ml.modeling.model_selection import GridSearchCV

#Specify grid search area
grid_search = GridSearchCV(
    estimator=XGBClassifier(),
    param_grid={
        "n_estimators":[100, 200, 300, 400, 500],
        "learning_rate":[0.01, 0.05, 0.1, 0.25, 0.5],
    },
    n_jobs = -1,
    scoring="roc_auc",
    input_cols=feature_cols,
    label_cols=target_col,
    output_cols=output_col
)

# Train
grid_search.fit(readmits_train_df)

import matplotlib.pyplot as plt
from IPython.display import display

gs_results = grid_search.to_sklearn().cv_results_
n_estimators_val = []
learning_rate_val = []
for param_dict in gs_results["params"]:
    n_estimators_val.append(param_dict["n_estimators"])
    learning_rate_val.append(param_dict["learning_rate"])
ROC_val = gs_results["mean_test_score"]

gs_results_df = pd.DataFrame(data={
    "n_estimators":n_estimators_val,
    "learning_rate":learning_rate_val,
    "ROC":ROC_val})

optimal_model = grid_search.to_sklearn().best_estimator_

optimal_n_estimators = grid_search.to_sklearn().best_estimator_.n_estimators
optimal_learning_rate = grid_search.to_sklearn().best_estimator_.learning_rate

optimal_roc = gs_results_df.loc[(gs_results_df['n_estimators']==optimal_n_estimators) &
                                 (gs_results_df['learning_rate']==optimal_learning_rate), 'ROC'].values[0]


display(gs_results_df.sort_values('ROC', ascending=False).head(10))

<snowflake.ml.modeling.model_selection.grid_search_cv.GridSearchCV at 0x7fe7c6fd37c0>

### Analyze variable contributions to predictions



In [None]:
import shap


# Initialize the SHAP explainer
test_df = readmits_test_df[feature_cols].to_pandas()
xgb_classifier=classifier.to_xgboost()
explainer = shap.Explainer(xgb_classifier)

# Calculate SHAP values for the test set
shap_values = explainer(test_df)

# Plot the SHAP values
shap.summary_plot(shap_values, test_df, plot_size=0.3)

### Create the registry where our model will reside in deployment



In [None]:
from snowflake.ml._internal.utils import identifier
from snowflake.ml.registry import Registry


# Get sample input data to pass into the registry logging function
session.sql("drop model if exists readmissions_classifier").collect()

sample_data =readmits_test_df[feature_cols].limit(1000)

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Define model name
model_name = "readmissions_classifier"

# Create a registry and log the model
native_registry = Registry(session=session, database_name=db, schema_name=schema)

# Let's first log the very first model we trained
model_ver = native_registry.log_model(
    model_name=model_name,
    version_name='V0',
    model=classifier,
    sample_input_data=sample_data, # to provide the feature schema
)

# Add evaluation metric
model_ver.set_metric(metric_name='ROC', value=test_AUC)

# Add a description
model_ver.comment = "This is the first iteration of our Readmissions Prediction model. It is used for demo purposes."

# Now, let's log the optimal model from GridSearchCV
model_ver2 = native_registry.log_model(
    model_name=model_name,
    version_name='V1',
    model=optimal_model,
    sample_input_data=sample_data, # to provide the feature schema
)

# Add evaluation metric
model_ver2.set_metric(metric_name='ROC', value=optimal_roc)
# Add a description
model_ver2.comment = "This is the second iteration of our Readmissions Prediction model \
                        where we performed hyperparameter optimization. \
                        It is used for demo purposes."

native_registry.get_model(model_name).show_versions()

### Predict on new data



In [None]:
new_patients_df = session.table("NEW_PATIENTS_ENRICHED").drop("ADMIT_DATE")
new_patients_df.show(n=10)

### Apply preprocessing to new data and make predictions

In [None]:
transformed_new_patients_df = preprocessing_pipeline.fit_transform(new_patients_df)
transformed_new_patients_df.show()

In [None]:
transformed_new_patients_df = preprocessing_pipeline.fit(new_patients_df).transform(new_patients_df)
transformed_new_patients_df.write.mode("overwrite").save_as_table("analytics.readmit.new_patients_transformed_pipeline")
model_ver = native_registry.get_model(model_name).version('v1')
predictions_df = model_ver.run(transformed_new_patients_df, function_name="predict")
predictions_df.show()

In [None]:
df = predictions_df.to_pandas()

outcome_counts = predictions_df.to_pandas()["output_feature_0"].value_counts().sort_index()
st.bar_chart(outcome_counts)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"DV_READMIT_FLAG"  |"HOSPITAL_NAME"       |"HOSPITAL_LAT"  |"HOSPITAL_LONG"  |"PATIENT_AGE"  |"BMI"  |"LENGTH_OF_STAY"  |"ORDER_SET_USED"  |"CHRONIC_CONDITIONS_NUMBER"  |"SBUX_COUNT"  |"PREDICTED_READMIT_FLAG"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0                  |OCEAN MEDICAL CENTER  |NULL            |NULL             |52             |28.4   |3                 |0                 |2                            |0             |0                         |
|0                  |OCEAN MEDICAL CENTER  |NULL            |NULL             |56             |23.2   |3                 |1                 |2  

### Visualize on a map



In [None]:
from snowflake.snowpark import functions as f
pred_map_df = predictions_df.filter(f.col("HOSPITAL_LAT").isNotNull()).filter(f.col('HOSPITAL_LONG').isNotNull()).groupBy("HOSPITAL_NAME", "HOSPITAL_LAT", "HOSPITAL_LONG").agg(F.sum('"output_feature_0"').alias("TOTAL_PRED_READMITS")).to_pandas()


st.map(pred_map_df, latitude="HOSPITAL_LAT", longitude="HOSPITAL_LONG")


In [None]:
alter warehouse datasci_wh suspend;

In [None]:
session.close()