## Databricks ML End-to-End Demo: From Data Ingestion Through Model Creation to Serving

### Objective: Predicting the cause of death from a range of features with a multi-class classification ML model

#### Public CDC dataset: NCHS - Potentially Excess Deaths from the Five Leading Causes of Death

Source: https://data.cdc.gov/NCHS/NCHS-Potentially-Excess-Deaths-from-the-Five-Leadi/vdpk-qzpr

#### Download data from URL

In [0]:
%scala
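import org.apache.commons.io.IOUtils
import java.net.URL

// Download the CSV as one string, then split it into a Dataset[String] of lines.
// linesIterator sidesteps the name clash with java.lang.String.lines on JDK 11 and later.
val urlfile = new URL("https://data.cdc.gov/api/views/vdpk-qzpr/rows.csv?accessType=DOWNLOAD")
val excess_deaths_csv = IOUtils.toString(urlfile, "UTF-8").linesIterator.toList.toDS()

// Parse the lines as CSV, taking the first line as the header and inferring column types.
val excess_deaths_orig = spark
                .read.option("header", true)
                .option("inferSchema", true)
                .csv(excess_deaths_csv)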

#### Rename columns so the data can be saved as a Databricks table

In [0]:
%scala
val excess_deaths = excess_deaths_orig.withColumnRenamed("Year", "year")
                .withColumnRenamed("Cause of Death", "cause_of_death")
                .withColumnRenamed("State", "state")
                .withColumnRenamed("State FIPS Code", "state_fips_code")
                .withColumnRenamed("HHS Region", "hhs_region")
                .withColumnRenamed("Age Range", "age_range")
                .withColumnRenamed("Benchmark", "benchmark")
                .withColumnRenamed("Locality", "locality")
                .withColumnRenamed("Observed Deaths", "observed_deaths")
                .withColumnRenamed("Population", "population")
                .withColumnRenamed("Expected Deaths", "expected_deaths")
                .withColumnRenamed("Potentially Excess Deaths", "potentially_excess_deaths")
                .withColumnRenamed("Percent Potentially Excess Deaths", "percent_potentially_excess_deaths")
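
The renames above all follow one rule: lower-case the name and replace spaces with underscores. As a minimal sketch of that rule (in Python rather than the Scala used in the cell, and with a hypothetical helper name `to_snake_case` that is not part of the notebook), the mapping could be derived instead of written out by hand:

```python
import re


def to_snake_case(column_name: str) -> str:
    """Lower-case a column name and replace runs of non-alphanumerics with '_'."""
    collapsed = re.sub(r"[^0-9a-zA-Z]+", "_", column_name.strip())
    return collapsed.strip("_").lower()


# A few of the original CDC column names from the cell above.
original_columns = [
    "Year",
    "Cause of Death",
    "State FIPS Code",
    "Percent Potentially Excess Deaths",
]

renamed = {name: to_snake_case(name) for name in original_columns}
print(renamed)
```

The resulting dictionary pairs each original name with the name used in the rest of this notebook, so it could drive a loop of rename calls.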

In [0]:
%scala
excess_deaths.printSchema()

In [0]:
%scala
excess_deaths.head(n=5)

#### Split into training and holdout sets

In [0]:
%scala
val Array(training, holdout) = excess_deaths.randomSplit(Array(0.95, 0.05), seed = 12345)

In [0]:
%scala
println(training.count)
println(holdout.count)

#### Write tables to Databricks default DB

In [0]:
%scala
excess_deaths.write.mode("overwrite").format("parquet").saveAsTable("default.excess_deaths_nchs_all")
training.write.mode("overwrite").format("parquet").saveAsTable("default.excess_deaths_nchs_training")
holdout.write.mode("overwrite").format("parquet").saveAsTable("default.excess_deaths_nchs_holdout")

#### Resume where autogenerated experiment code left off

In [0]:
import mlflow
import databricks.automl_runtime

target_col = "cause_of_death"

#### Load training data into a pandas DataFrame

In [0]:
from mlflow.tracking import MlflowClient
import os
import uuid
import shutil
import pandas as pd

df_loaded = spark.sql("select * from hive_metastore.default.excess_deaths_nchs_training")
df_loaded = df_loaded.toPandas()

# Preview data
df_loaded.head(5)



| | year | cause_of_death | state | state_fips_code | hhs_region | age_range | benchmark | locality | observed_deaths | population | expected_deaths | potentially_excess_deaths | percent_potentially_excess_deaths |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2011 | Unintentional Injury | Louisiana | LA | 6 | 0-74 | Floating | Nonmetropolitan | 335.0 | 726181.0 | 136.0 | 199.0 | 59.4 |
| 1 | 2011 | Unintentional Injury | Louisiana | LA | 6 | 0-79 | 2005 Fixed | All | 1856.0 | 4428395.0 | 818.0 | 1038.0 | 55.9 |
| 2 | 2011 | Unintentional Injury | Louisiana | LA | 6 | 0-79 | 2005 Fixed | Metropolitan | 1504.0 | 3681862.0 | 679.0 | 825.0 | 54.9 |
| 3 | 2011 | Unintentional Injury | Louisiana | LA | 6 | 0-79 | 2005 Fixed | Nonmetropolitan | 352.0 | 746533.0 | 139.0 | 213.0 | 60.5 |
| 4 | 2011 | Unintentional Injury | Louisiana | LA | 6 | 0-79 | 2010 Fixed | All | 1856.0 | 4428395.0 | 860.0 | 996.0 | 53.7 |


#### Select supported columns
Select only the columns that are supported. This lets the trained model predict on datasets that contain extra columns not used in training.
The list of dropped columns, `[]`, is empty here, so no columns are dropped in the pipelines. See the Alerts tab of the AutoML Experiment page for details on why columns may be dropped.
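
To illustrate why selecting columns first makes the model tolerant of extra input columns, here is a minimal plain-Python sketch. It is not the `ColumnSelector` implementation from `databricks.automl_runtime`; the helper `select_columns`, the shortened column list, and the sample row are assumptions made for illustration.

```python
# Shortened, illustrative subset of the supported columns.
supported_cols = ["year", "state", "population"]


def select_columns(rows, columns):
    """Keep only `columns` from each row; fail loudly if a required column is absent."""
    selected = []
    for row in rows:
        missing = [c for c in columns if c not in row]
        if missing:
            raise KeyError(f"missing columns: {missing}")
        selected.append({c: row[c] for c in columns})
    return selected


# Hypothetical input with one column the model was never trained on.
rows = [
    {"year": 2011, "state": "Louisiana", "population": 726181.0, "notes": "extra column"},
]

selected = select_columns(rows, supported_cols)
print(selected)
```

The extra `notes` key is silently discarded, while a row lacking a supported column raises an error rather than producing a malformed feature vector.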

In [0]:
from databricks.automl_runtime.sklearn.column_selector import ColumnSelector
supported_cols = ["locality", "year", "hhs_region", "percent_potentially_excess_deaths", "expected_deaths", "population", "state", "observed_deaths", "age_range", "state_fips_code", "benchmark", "potentially_excess_deaths"]
col_selector = ColumnSelector(supported_cols)

#### Preprocessors

#### Numerical columns

Missing values in numerical columns are imputed with the column mean by default.
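
As a rough illustration of what mean imputation followed by standardization does to one column, the sketch below uses only the standard library. It is a simplified stand-in for scikit-learn's `SimpleImputer` and `StandardScaler`, not their implementation; the sample values and the use of `None` to mark a missing entry are assumptions.

```python
import math
from statistics import fmean, pstdev


def impute_mean(values):
    """Replace None/NaN entries with the mean of the observed entries."""
    observed = [v for v in values if v is not None and not math.isnan(v)]
    mean = fmean(observed)
    return [mean if v is None or math.isnan(v) else v for v in values]


def standardize(values):
    """Scale to zero mean and unit standard deviation (population formula)."""
    mean, std = fmean(values), pstdev(values)
    return [(v - mean) / std for v in values]


# Hypothetical observed_deaths column with one missing value.
observed_deaths = [335.0, None, 1504.0, 352.0]

imputed = impute_mean(observed_deaths)
scaled = standardize(imputed)
print(imputed)
print(scaled)
```

The missing entry takes the mean of the three observed values, and the scaled column ends up centered on zero, which keeps features with very different ranges (such as `population` and `percent_potentially_excess_deaths`) on a comparable scale.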

In [0]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler

num_imputers = []
num_imputers.append(("impute_mean", SimpleImputer(), ["expected_deaths", "observed_deaths", "percent_potentially_excess_deaths", "population", "potentially_excess_deaths"]))

numerical_pipeline = Pipeline(steps=[
    ("converter", FunctionTransformer(lambda df: df.apply(pd.to_numeric, errors="coerce"))),
    ("imputers", ColumnTransformer(num_imputers)),
    ("standardizer", StandardScaler()),
])

numerical_transformers = [("numerical", numerical_pipeline, ["percent_potentially_excess_deaths", "expected_deaths", "population", "observed_deaths", "potentially_excess_deaths"])]

#### Categorical columns

#### Low-cardinality categoricals
Convert each low-cardinality categorical column into multiple binary columns through one-hot encoding.
For each input categorical column (string or numeric), the number of output columns is equal to the number of unique values in the input column.
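
The following plain-Python sketch shows the idea on a single column. It is a simplified illustration, not scikit-learn's `OneHotEncoder`; the helper names and sample values are assumptions. Mapping an unseen category to all zeros mirrors the `handle_unknown="ignore"` setting used in this notebook.

```python
def fit_categories(values):
    """Collect the sorted unique values seen during fitting."""
    return sorted(set(values))


def one_hot(value, categories):
    """Return one binary indicator per known category; unknown values give all zeros."""
    return [1 if value == category else 0 for category in categories]


# Hypothetical locality column with three distinct values.
localities = ["All", "Metropolitan", "Nonmetropolitan", "Metropolitan"]

categories = fit_categories(localities)
encoded = [one_hot(value, categories) for value in localities]
unknown = one_hot("Rural", categories)  # category never seen during fitting

print(categories)
print(encoded)
print(unknown)
```

Three unique input values produce three output columns, and each encoded row contains at most a single 1.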

In [0]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

one_hot_imputers = []

one_hot_pipeline = Pipeline(steps=[
    ("imputers", ColumnTransformer(one_hot_imputers, remainder="passthrough")),
    ("one_hot_encoder", OneHotEncoder(handle_unknown="ignore")),
])

categorical_one_hot_transformers = [("onehot", one_hot_pipeline, ["age_range", "benchmark", "hhs_region", "locality", "state", "state_fips_code", "year"])]

In [0]:
from sklearn.compose import ColumnTransformer

transformers = numerical_transformers + categorical_one_hot_transformers

preprocessor = ColumnTransformer(transformers, remainder="passthrough", sparse_threshold=1)

#### Train - Validation - Test Split
Split the input data into 3 sets:
- Train (90% of the dataset used to train the model)
- Validation (9% of the dataset used to tune the hyperparameters of the model)
- Test (1% of the dataset used to report the true performance of the model on an unseen dataset)
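
The share of data that lands in each set follows from the `train_size` of the first `train_test_split` call and the `test_size` of the second call, which applies only to the remainder. The sketch below works through that arithmetic with exact fractions; the helper `two_stage_split` is a hypothetical name introduced for illustration.

```python
from fractions import Fraction


def two_stage_split(train_size, test_share_of_remainder):
    """Return (train, validation, test) as fractions of the full dataset."""
    train = Fraction(train_size)
    remainder = 1 - train
    test = remainder * Fraction(test_share_of_remainder)
    validation = remainder - test
    return train, validation, test


# train_size=0.9 followed by test_size=0.1 on the remainder
splits_as_coded = two_stage_split("0.9", "0.1")

# train_size=0.9 followed by test_size=0.5 on the remainder (an even split)
equal_split = two_stage_split("0.9", "0.5")

print([str(f) for f in splits_as_coded])
print([str(f) for f in equal_split])
```

A second-stage `test_size` of 0.1 yields 90% / 9% / 1%, whereas 0.5 would divide the remaining 10% evenly into 5% / 5%.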

In [0]:
from sklearn.model_selection import train_test_split

split_X = df_loaded.drop([target_col], axis=1)
split_y = df_loaded[target_col]

# Split out train data
X_train, split_X_rem, y_train, split_y_rem = train_test_split(split_X, split_y, train_size=0.9, random_state=130405918, stratify=split_y)

# Split the remaining 10% into validation (90% of it) and test (10% of it, per test_size=0.1)
X_val, X_test, y_val, y_test = train_test_split(split_X_rem, split_y_rem, test_size=0.1, random_state=130405918, stratify=split_y_rem)

#### Train classification model

In [0]:
from xgboost import XGBClassifier

In [0]:
import mlflow
import sklearn
from sklearn import set_config
from sklearn.pipeline import Pipeline

set_config(display="diagram")

xgbc_classifier = XGBClassifier(
  colsample_bytree=0.5799125409931767,
  learning_rate=0.2674909487961949,
  max_depth=7,
  min_child_weight=5,
  n_estimators=333,
  n_jobs=100,
  subsample=0.28173904550981416,
  verbosity=0,
  random_state=130405918,
)

model = Pipeline([
    ("column_selector", col_selector),
    ("preprocessor", preprocessor),
    ("classifier", xgbc_classifier),
])

# Create a separate pipeline to transform the validation dataset. This is used for early stopping.
pipeline = Pipeline([
    ("column_selector", col_selector),
    ("preprocessor", preprocessor),
])

mlflow.sklearn.autolog(disable=True)
pipeline.fit(X_train, y_train)
X_val_processed = pipeline.transform(X_val)

model
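
The validation set is transformed separately because early stopping evaluates the classifier on already-preprocessed validation data after each boosting round. The sketch below shows the general patience rule behind a setting such as `early_stopping_rounds=5`. It is a simplified illustration, not XGBoost's implementation; the function name and the loss values are made up.

```python
def best_round_with_patience(validation_losses, patience):
    """Return the index of the best round, stopping once `patience`
    consecutive rounds pass without a new best validation loss."""
    best_loss = float("inf")
    best_round = -1
    for round_index, loss in enumerate(validation_losses):
        if loss < best_loss:
            best_loss, best_round = loss, round_index
        elif round_index - best_round >= patience:
            break  # no improvement for `patience` rounds
    return best_round


# Hypothetical per-round validation losses.
losses = [0.90, 0.60, 0.45, 0.40, 0.41, 0.42, 0.40, 0.43, 0.44, 0.39]

stopped_at = best_round_with_patience(losses, patience=5)
print(stopped_at)
```

In this made-up sequence, training halts before the final round is ever evaluated, which illustrates the trade-off: fewer rounds and less overfitting, at the risk of missing a later improvement.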

In [0]:
# Enable automatic logging of input samples, metrics, parameters, and models
mlflow.sklearn.autolog(log_input_examples=True, silent=True)

with mlflow.start_run(experiment_id="2252121153813935", run_name="xgboost") as mlflow_run:
    model.fit(X_train, y_train, classifier__early_stopping_rounds=5, classifier__eval_set=[(X_val_processed,y_val)], classifier__verbose=False)
    
    # Training metrics are logged by MLflow autologging
    # Log metrics for the validation set
    xgbc_val_metrics = mlflow.sklearn.eval_and_log_metrics(model, X_val, y_val, prefix="val_")

    # Log metrics for the test set
    xgbc_test_metrics = mlflow.sklearn.eval_and_log_metrics(model, X_test, y_test, prefix="test_")

    # Display the logged metrics
    xgbc_val_metrics = {k.replace("val_", ""): v for k, v in xgbc_val_metrics.items()}
    xgbc_test_metrics = {k.replace("test_", ""): v for k, v in xgbc_test_metrics.items()}
    display(pd.DataFrame([xgbc_val_metrics, xgbc_test_metrics], index=["validation", "test"]))



| | precision_score | recall_score | f1_score | accuracy_score | log_loss | roc_auc_score | score |
|---|---|---|---|---|---|---|---|
| validation | 0.9195314613819964 | 0.919857325976704 | 0.9196035438922732 | 0.919857325976704 | 0.1982849679176193 | 0.9957265137927068 | 0.919857325976704 |
| test | 0.923796053648028 | 0.924272818455366 | 0.923942958892292 | 0.924272818455366 | 0.1909763496887563 | 0.9960241921815772 | 0.924272818455366 |


In [0]:
# model_uri for the generated model
print(f"runs:/{ mlflow_run.info.run_id }/model")

runs:/96cd19d155d448fe97618fd665273c44/model


#### Define input and output for batch inference
The table named by `input_table_name` is used for batch inference, and `output_table_path` is the location intended for the predictions.

In [0]:
# Redefine key variables here because %pip and %conda restart the Python interpreter
input_table_name = "default.excess_deaths_nchs_holdout"
output_table_path = "/FileStore/batch-inference/excess_deaths_nchs"

In [0]:
# load table as a Spark DataFrame
table = spark.table(input_table_name)

#### Load model and run inference

In [0]:
import mlflow
from pyspark.sql.functions import struct

model_uri = f"runs:/{ mlflow_run.info.run_id }/model"

# create spark user-defined function for model prediction
predict = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="string")



In [0]:
output_df = table.withColumn("prediction", predict(struct(*table.columns)))

In [0]:
output_df.select('year', 'cause_of_death', 'state', 'prediction').show()

+----+--------------+--------------+----------+
|year|cause_of_death|         state|prediction|
+----+--------------+--------------+----------+
|2009|        Cancer|      New York|    Cancer|
|2009|        Cancer|North Carolina|    Cancer|
|2009|        Cancer|North Carolina|    Cancer|
|2009|        Cancer|North Carolina|    Cancer|
|2009|        Cancer|  North Dakota|    Cancer|
|2009|        Cancer|  North Dakota|    Cancer|
|2009|        Cancer|  North Dakota|    Cancer|
|2009|        Cancer|  North Dakota|    Cancer|
|2009|        Cancer|  North Dakota|    Cancer|
|2009|        Cancer|          Ohio|    Cancer|
|2009|        Cancer|          Ohio|    Cancer|
|2009|        Cancer|          Ohio|    Cancer|
|2009|        Cancer|          Ohio|    Cancer|
|2009|        Cancer|          Ohio|    Cancer|
|2009|        Cancer|      Oklahoma|    Cancer|
|2009|        Cancer|        Oregon|    Cancer|
|2009|        Cancer|        Oregon|    Cancer|
|2009|        Cancer|  Pennsylvania|    

In [0]:
all_pred = output_df.select('year', 'cause_of_death', 'state', 'prediction').count()
all_pred

Out[17]: 10517

In [0]:
correct_pred = output_df.select('year', 'cause_of_death', 'state', 'prediction').filter('cause_of_death = prediction').count()
correct_pred

Out[18]: 9665

In [0]:
percentage_correct_preds = correct_pred/all_pred*100

print("Percentage correct predictions: ", percentage_correct_preds)

Percentage correct predictions:  91.89883046496149
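
An overall percentage can hide classes that the model predicts poorly, which matters for a multi-class target. As a minimal sketch, the share of correct predictions can also be broken down per actual class. The `(actual, predicted)` pairs below are hypothetical and are not taken from the holdout table.

```python
from collections import Counter


def per_class_accuracy(pairs):
    """Return the fraction of correct predictions for each actual class."""
    totals, correct = Counter(), Counter()
    for actual, predicted in pairs:
        totals[actual] += 1
        if actual == predicted:
            correct[actual] += 1
    return {label: correct[label] / totals[label] for label in totals}


# Hypothetical (cause_of_death, prediction) rows.
pairs = [
    ("Cancer", "Cancer"),
    ("Cancer", "Cancer"),
    ("Stroke", "Heart Disease"),
    ("Stroke", "Stroke"),
    ("Heart Disease", "Heart Disease"),
]

by_class = per_class_accuracy(pairs)
overall = sum(actual == predicted for actual, predicted in pairs) / len(pairs)

print(by_class)
print(overall)
```

Here one class is right only half the time even though the overall figure looks healthy, which is the kind of gap a single percentage would not reveal.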


#### If the percentage of correct predictions on the holdout set exceeds 90%, register the model in the Model Registry

In [0]:
success_threshold = 90

if percentage_correct_preds > success_threshold:

    model_name = "excess_deaths_nchs"
    model_uri = f"runs:/{ mlflow_run.info.run_id }/model"
    registered_model_version = mlflow.register_model(model_uri, model_name)

Registered model 'excess_deaths_nchs' already exists. Creating a new version of this model...
2022/10/11 16:23:50 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: excess_deaths_nchs, version 13
Created version '13' of model 'excess_deaths_nchs'.


#### Update the production `excess_deaths_nchs` model in MLflow Model Registry

In [0]:
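from mlflow.exceptions import MlflowException
from mlflow.tracking import MlflowClient

client = MlflowClient()

if percentage_correct_preds > success_threshold:

    # Archive the previous version of the model (if there is one)
    try:
        client.transition_model_version_stage(
          name=model_name,
          version=int(registered_model_version.version)-1,
          stage="Archived"
        )
    except MlflowException as e:
        # For example, no previous version exists; report it instead of silently ignoring it
        print(f"Could not archive previous version: {e}")

    # Promote the new model version to Production
    try:
        client.transition_model_version_stage(
          name=model_name,
          version=registered_model_version.version,
          stage="Production"
        )
    except MlflowException as e:
        print(f"Could not promote version {registered_model_version.version}: {e}")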
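
Once a version is in the Production stage, downstream code can refer to it by stage rather than by run ID. The sketch below builds a `models:/<name>/<stage>` URI and shows how it might be passed to `mlflow.pyfunc.load_model`. The helper names are hypothetical, and it assumes a workspace that still uses stage-based registry URIs.

```python
def production_model_uri(model_name: str, stage: str = "Production") -> str:
    """Build a stage-based Model Registry URI of the form models:/<name>/<stage>."""
    return f"models:/{model_name}/{stage}"


def load_production_model(model_name: str):
    """Load the model currently in Production as a generic pyfunc model."""
    # Imported lazily so building the URI does not require MLflow to be installed.
    import mlflow.pyfunc

    return mlflow.pyfunc.load_model(production_model_uri(model_name))


uri = production_model_uri("excess_deaths_nchs")
print(uri)
```

Because the URI names a stage instead of a fixed version, promoting a new version changes what `load_production_model` returns without any change to the calling code.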