In [0]:
# # #Mount Azure Blob storage to Databricks
# storage_account_key = "accountkey"

# dbutils.fs.mount(
#   source = "wasbs://databricks-practice-container@kepracticestorage.blob.core.windows.net",
#   mount_point = "/mnt/opioid_data",
#   extra_configs = {"fs.azure.account.key.kepracticestorage.blob.core.windows.net": storage_account_key})

# teds1517 = spark.read.csv('dbfs:/mnt/opioid_data/tedsa_puf_2015_2017.csv',header="true", inferSchema="true")
# teds18 = spark.read.csv('dbfs:/mnt/opioid_data/tedsa_puf_2018.csv',header="true", inferSchema="true")

# # Converting Spark DataFrame to Delta Table
# deltaPath = "dbfs:/delta/opioid_data"
# dbutils.fs.rm(deltaPath, True)
# teds1517.write.format("delta").mode("overwrite").save(deltaPath + '/data201517')
# teds18.write.format("delta").mode("overwrite").save(deltaPath + '/data2018')

# display(dbutils.fs.ls(deltaPath))

path,name,size
dbfs:/delta/opioid_data/data201517/,data201517/,0
dbfs:/delta/opioid_data/data2018/,data2018/,0


### Read in data

In [0]:
#### USE BELOW FOR BIG DATA PRACTICE
deltaPath = "dbfs:/delta/opioid_data"
teds1517 = spark.read.format("delta").load(deltaPath + '/data201517')
teds18 = spark.read.format("delta").load(deltaPath + '/data2018')

column_order = teds1517.columns
teds18 = teds18.select(column_order)
teds_all = teds1517.union(teds18)

#### Also run the code below if you want faster run times and/or cheaper cluster
teds_all = teds_all.sample(False, 0.025, seed=0)

In [0]:
# Count number of substances in the person's system when they entered treatment
condition = lambda col: 'FLG' in col
col = teds_all.select(*filter(condition, teds_all.columns)).columns
teds_all = teds_all.withColumn('NUMSUBS', sum(teds_all[col]))

In [0]:
# Subset to columns without a lot of missing data
teds_sm = teds_all.select('CASEID','ADMYR','AGE','GENDER','RACE','ETHNIC','EDUC','EMPLOY','VET','LIVARAG',\
                          'STFIPS','CBSA2010','DIVISION','REGION','SERVICES','PSOURCE','NOPRIOR','ARRESTS','ROUTE1','FRSTUSE1','FREQ1', \
                          'ROUTE2','FRSTUSE2', 'FREQ2','ROUTE3','FRSTUSE3','FREQ3','NUMSUBS','METHUSE','ALCFLG','PSYPROB', \
                          'COKEFLG','MARFLG','HERFLG','METHFLG','OPSYNFLG','PCPFLG','HALLFLG','MTHAMFLG','AMPHFLG','STIMFLG', \
                          'BENZFLG','TRNQFLG','BARBFLG','SEDHPFLG','INHFLG','OTCFLG','OTHERFLG')

### Recoding variables

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, when

def freq_recode(column1, column2, column3):
    if column1 == 1 and column2 == 1 and column3 == 1:
        return 0.0
    if (column1 == 3 or column2 == 3 or column3 == 3):
        return 2.0
    if (column1 == 2 or column2 == 2 or column3 == 2):
        return 1.0
    if (column1 == 1 or column2 == 1 or column3 == 1):
        return 0.0
    else:
        return None
      
freq_recode_udf = udf(freq_recode, DoubleType())

teds_sm = teds_sm.withColumn('FREQ', freq_recode_udf('FREQ1','FREQ2','FREQ3'))

In [0]:
from pyspark.sql.types import IntegerType

def age_recode(column1, column2, column3):
    if column1 == 1 or column2 == 1 or column3 == 1:
        return 1.0
    if column1 == 2 or column2 == 2 or column3 == 2:
        return 2.0
    if column1 == 3 or column2 == 3 or column3 == 3:
        return 3.0
    if column1 == 4 or column2 == 4 or column3 == 4:
        return 4.0
    if column1 == 5 or column2 == 5 or column3 == 5:
        return 5.0
    if column1 == 6 or column2 == 6 or column3 == 6:
        return 6.0
    if column1 == 7 or column2 == 7 or column3 == 7:
        return 7.0
    else:
       return None
      
age_recode_udf = udf(age_recode, DoubleType())

teds_sm = teds_sm.withColumn('FRSTUSE', age_recode_udf('FRSTUSE1','FRSTUSE2','FRSTUSE3'))

In [0]:
def oral_recode(column1, column2, column3):
    if (column1 == 1 and column1 != None) or (column2 == 1 and column2 != None) or (column3 == 1 and column3 != None):
        return 'Admin_Orally'
    elif (column1 > 1 and column1 != None) or (column2 > 1 and column2 != None) or (column3 > 1 and column3 != None):
        return 'Not_Admin_Orally'
    else:
        return None

oral_rc_udf = udf(oral_recode, StringType())
teds_sm = teds_sm.withColumn('ORAL_DRUG_USE', oral_rc_udf('ROUTE1','ROUTE2','ROUTE3'))

def smoking_recode(column1, column2, column3):
    if (column1 == 2 and column1 != None) or (column2 == 2 and column2 != None) or (column3 == 2 and column3 != None):
        return 'Admin_Smoking'
    elif (column1 > 0 and column1 != None) or (column2 > 0 and column2 != None) or (column3 > 0 and column3 != None):
        return 'Not_Admin_Smoking'
    else:
        return None

smoke_rc_udf = udf(smoking_recode, StringType())
teds_sm = teds_sm.withColumn('SMOKING_DRUG_USE', smoke_rc_udf('ROUTE1','ROUTE2','ROUTE3'))

def inhalation_recode(column1, column2, column3):
    if (column1 == 3 and column1 != None) or (column2 == 3 and column2 != None) or (column3 == 3 and column3 != None):
        return 'Admin_Inhale'
    elif (column1 > 0 and column1 != None) or (column2 > 0 and column2 != None) or (column3 > 0 and column3 != None):
        return 'Not_Admin_Inhale'
    else:
        return None

inhale_rc_udf = udf(inhalation_recode, StringType())
teds_sm = teds_sm.withColumn('INHALE_DRUG_USE', inhale_rc_udf('ROUTE1','ROUTE2','ROUTE3'))

def injection_recode(column1, column2, column3):
    if (column1 == 4 and column1 != None) or (column2 == 4 and column2 != None) or (column3 == 4 and column3 != None):
        return 'Admin_Injection'
    elif (column1 > 0 and column1 != None) or (column2 > 0 and column2 != None) or (column3 > 0 and column3 != None):
        return 'Not_Admin_Injection'
    else:
        return None
      
inject_rc_udf = udf(injection_recode, StringType())
teds_sm = teds_sm.withColumn('INJECT_DRUG_USE', inject_rc_udf('ROUTE1','ROUTE2','ROUTE3'))

def dich_flag_recode(column):
    if column == 0:
        return 'Not_Reported'
    if column == 1:
        return 'Reported'
    else:
        return None

flg_columns = ['ALCFLG','COKEFLG','MARFLG','HERFLG','METHFLG','OPSYNFLG','PCPFLG','HALLFLG','MTHAMFLG','AMPHFLG','STIMFLG','BENZFLG','TRNQFLG',
               'BARBFLG','SEDHPFLG','INHFLG','OTCFLG','OTHERFLG']    
      
dich_flag_udf = udf(dich_flag_recode, StringType())

from functools import reduce

teds_sm  = (reduce(
    lambda recode_df, col_name: recode_df.withColumn(col_name, dich_flag_udf(col(col_name))),
    flg_columns,
    teds_sm
))

In [0]:
# Male is zero, female is 1
teds_sm = teds_sm.withColumn("GENDER", when(col("GENDER") == 1, 'Male') \
                               .when(col("GENDER") == 2, 'Female'))

teds_sm = teds_sm.withColumn("METHUSE", when(col("METHUSE") == 1, 'Yes')\
                            .when(col("METHUSE") == 2, 'No'))

teds_sm = teds_sm.withColumn("RACE", when(col("RACE") == 1, "Alaska_Native") \
                             .when(col("RACE") == 2, "American_Indian") \
                             .when(col("RACE") == 3, "Asian_Pacific_Islander") \
                             .when(col("RACE") == 4, "Black") \
                             .when(col("RACE") == 5, "White") \
                             .when(col("RACE") == 6, "Asian") \
                             .when(col("RACE") == 7, "Other_Single_Race") \
                             .when(col("RACE") == 8, "Two_or_More_Races") \
                             .when(col("RACE") == 9, "Hawaiian_Pacific_Islander"))

teds_sm = teds_sm.withColumn("ETHNIC", when(col("ETHNIC") == 4, 'Hispanic') \
                             .when(col("ETHNIC") > 0, 'Not_Hispanic'))   
                               
teds_sm = teds_sm.withColumn("EDUC", when(col("EDUC") > 0, col("EDUC")).otherwise(None))

teds_sm = teds_sm.withColumn("EMPLOY", when(col("EMPLOY") == 1, "Full_Time") \
                             .when(col("EMPLOY") == 2, "Part_Time") \
                             .when(col("EMPLOY") == 3, "Unemployed") \
                             .when(col("EMPLOY") == 4, "Not_in_Labor_Force"))

teds_sm = teds_sm.withColumn("VET", when(col("VET") == 1, 'Yes') \
                             .when(col("VET") == 2, "No"))

teds_sm = teds_sm.withColumn("LIVARAG", when(col("LIVARAG") == 1, "Homeless") \
                             .when(col("LIVARAG") == 2, "Dependent_Living") \
                             .when(col("LIVARAG") == 3, "Independent_Living"))

teds_sm = teds_sm.withColumn("ARRESTS", when(col("ARRESTS") >= 0, col("ARRESTS")).otherwise(None))

teds_sm = teds_sm.withColumn("CBSA2010", when(col("CBSA2010") >= 0, col("CBSA2010")).otherwise(None))

teds_sm = teds_sm.withColumn("SERVICES", when(col("SERVICES") == 1, "Detox_Hospital_Inpatient") \
                             .when(col("SERVICES") == 2, "Detox_Residential") \
                             .when(col("SERVICES") == 3, "Rehab_Resid_Inpatient") \
                             .when(col("SERVICES") == 4, "Rehab_Resid_Short_Term") \
                             .when(col("SERVICES") == 5, "Rehab_Resid_Long_Term") \
                             .when(col("SERVICES") == 6, "Ambulatory_Intensive") \
                             .when(col("SERVICES") == 7, "Ambulatory_NonIntensive") \
                             .when(col("SERVICES") == 8, "Ambulatory_Detox"))

teds_sm = teds_sm.withColumn("PSOURCE", when(col("PSOURCE") == 1, "Self") \
                            .when(col("PSOURCE") == 2, "Alcohol_Drug_Prof") \
                            .when(col("PSOURCE") == 3, "Health_Care_Prof") \
                            .when(col("PSOURCE") == 4, "School") \
                            .when(col("PSOURCE") == 5, "Employer")
                            .when(col("PSOURCE") == 6, "Community") \
                            .when(col("PSOURCE") == 7, "Court"))

teds_sm = teds_sm.withColumn("NOPRIOR", when(col("NOPRIOR") == 0, 0) \
                            .when(col("NOPRIOR") > 0, 1))

teds_sm = teds_sm.withColumn("PSYPROB", when(col("PSYPROB") == 1, 'Yes') \
                            .when(col("PSYPROB") == 2, 'No'))

teds_sm = teds_sm.withColumn("DIVISION", when(col("DIVISION") == 0, "US_Territories") \
                            .when(col("DIVISION") == 1, "New_England") \
                            .when(col("DIVISION") == 2, "Middle_Atlantic") \
                            .when(col("DIVISION") == 3, "East_North_Central") \
                            .when(col("DIVISION") == 4, "West_North_Central") \
                            .when(col("DIVISION") == 5, "South_Atlantic")
                            .when(col("DIVISION") == 6, "East_South_Central") \
                            .when(col("DIVISION") == 7, "West_South_Central") \
                            .when(col("DIVISION") == 8, "Mountain") \
                            .when(col("DIVISION") == 9, "Pacific"))

teds_sm = teds_sm.withColumn("REGION", when(col("REGION") == 0, "US_Territories") \
                            .when(col("REGION") == 1, "Northeast") \
                            .when(col("REGION") == 2, "Midwest") \
                            .when(col("REGION") == 3, "South") \
                            .when(col("REGION") == 4, "West"))

teds_sm = teds_sm.withColumn("STFIPS", when(col("STFIPS") == 1, "Alabama") \
                            .when(col("STFIPS") == 2, "Alaska") \
                            .when(col("STFIPS") == 4, "Arizona") \
                            .when(col("STFIPS") == 5, "Arkansas")
                            .when(col("STFIPS") == 6, "California") \
                            .when(col("STFIPS") == 8, "Colorado") \
                            .when(col("STFIPS") == 9, "Connecticut") \
                            .when(col("STFIPS") == 10, "Delaware") \
                            .when(col("STFIPS") == 11, "DC") \
                            .when(col("STFIPS") == 12, "Florida") \
                            .when(col("STFIPS") == 15, "Hawaii") \
                            .when(col("STFIPS") == 16, "Idaho") \
                            .when(col("STFIPS") == 17, "Illinois") \
                            .when(col("STFIPS") == 18, "Indiana") \
                            .when(col("STFIPS") == 19, "Iowa") \
                            .when(col("STFIPS") == 20, "Kansas") \
                            .when(col("STFIPS") == 21, "Kentucky") \
                            .when(col("STFIPS") == 22, "Louisiana") \
                            .when(col("STFIPS") == 23, "Maine") \
                            .when(col("STFIPS") == 24, "Maryland") \
                            .when(col("STFIPS") == 25, "Massachusetts") \
                            .when(col("STFIPS") == 26, "Michigan") \
                            .when(col("STFIPS") == 27, "Minnesota") \
                            .when(col("STFIPS") == 28, "Mississippi") \
                            .when(col("STFIPS") == 29, "Missouri") \
                            .when(col("STFIPS") == 30, "Montana") \
                            .when(col("STFIPS") == 31, "Nebraska") \
                            .when(col("STFIPS") == 32, "Nevada") \
                            .when(col("STFIPS") == 33, "New_Hampshire") \
                            .when(col("STFIPS") == 34, "New_Jersey") \
                            .when(col("STFIPS") == 35, "New_Mexico") \
                            .when(col("STFIPS") == 36, "New_York") \
                            .when(col("STFIPS") == 37, "North_Carolina") \
                            .when(col("STFIPS") == 38, "North_Dakota") \
                            .when(col("STFIPS") == 39, "Ohio") \
                            .when(col("STFIPS") == 40, "Oklahoma") \
                            .when(col("STFIPS") == 42, "Pennsylvania") \
                            .when(col("STFIPS") == 44, "Rhode_Island") \
                            .when(col("STFIPS") == 45, "South_Carolina") \
                            .when(col("STFIPS") == 46, "South_Dakota") \
                            .when(col("STFIPS") == 47, "Tennessee") \
                            .when(col("STFIPS") == 48, "Texas") \
                            .when(col("STFIPS") == 49, "Utah") \
                            .when(col("STFIPS") == 50, "Vermont") \
                            .when(col("STFIPS") == 51, "Virginia") \
                            .when(col("STFIPS") == 53, "Washington")
                            .when(col("STFIPS") == 54, "West_Virginia") \
                            .when(col("STFIPS") == 55, "Wisconsin") \
                            .when(col("STFIPS") == 56, "Wyoming") \
                            .when(col("STFIPS") == 72, "Puerto_Rico"))

In [0]:
teds2018 = teds_sm.filter(teds_sm.ADMYR == '2018')

In [0]:
teds_recoded = teds2018.drop('FREQ1','FREQ2','FREQ3','FRSTUSE1','FRSTUSE2','FRSTUSE3','ROUTE1','ROUTE2','ROUTE3','ADMYR','CASEID','CBSA2010','STFIPS','REGION')
teds_recoded = teds_recoded.where(col('NOPRIOR').isNotNull())

In [0]:
from pyspark.sql.types import DoubleType

teds_recoded = teds_recoded.withColumn("AGE",teds_recoded["AGE"].cast(DoubleType()))
teds_recoded = teds_recoded.withColumn("NOPRIOR",teds_recoded["NOPRIOR"].cast(DoubleType()))
teds_recoded = teds_recoded.withColumn("ARRESTS",teds_recoded["ARRESTS"].cast(DoubleType()))
teds_recoded = teds_recoded.withColumn("NUMSUBS",teds_recoded["NUMSUBS"].cast(DoubleType()))
teds_recoded = teds_recoded.withColumn("EDUC",teds_recoded["EDUC"].cast(DoubleType()))

In [0]:
# To cut down run time, subsetting to fewer columns
teds_recoded = teds_recoded.select('RACE','DIVISION','LIVARAG','VET','METHUSE','ALCFLG','HERFLG','COKEFLG','EDUC','EMPLOY','SERVICES','OPSYNFLG',
                                   'FRSTUSE','PSOURCE','FREQ','NOPRIOR')

### Subsetting data into training, validation, and holdout data sets

In [0]:
import pyspark.sql.functions as F

(train_temp, test) = teds_recoded.randomSplit([.7, .3], seed=42)
(train, holdout) = train_temp.randomSplit([.8, .2], seed=42)

In [0]:
mode_impute_cols = [field for (field, dataType) in train.dtypes if ((dataType == "string") & (field != "NOPRIOR"))]

for col_name in mode_impute_cols:
    mode_counts = train.groupBy(col_name).count().toPandas()
    mode_value = list(mode_counts.sort_values('count', ascending=False).loc[:, col_name])[0]
    train = train.withColumn(col_name, when(col(col_name).isNull(), mode_value).otherwise(col(col_name)))
    test = test.withColumn(col_name, when(col(col_name).isNull(), mode_value).otherwise(col(col_name)))
    holdout = holdout.withColumn(col_name, when(col(col_name).isNull(), mode_value).otherwise(col(col_name)))

### Linear Regression Model

In [0]:
import mlflow
import mlflow.spark
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, Imputer
from pyspark.ml import Pipeline

with mlflow.start_run(run_name="lasso_reg_model") as run:
  
    # Getting numeric and categorical column names
    categoricalCols = [field for (field, dataType) in train.dtypes if ((dataType == "string") & (field != "NOPRIOR"))]
    numericCols = [field for (field, dataType) in train.dtypes if ((dataType == "double") & (field != "NOPRIOR"))]

    # Adding label to columns created in each step of the modeling pipeline
    indexOutputCols = [x + "_Index" for x in categoricalCols]
    oheOutputCols = [x + "_OHE" for x in categoricalCols]
    numImputedCols = [x + "_Imputed" for x in numericCols]

    # Creating estimator and transformers for modeling pipeline
    stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
    oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)
    numImputer = Imputer(inputCols=numericCols, outputCols=numImputedCols, strategy='mode')

    # Create vector assembler for all columns, numeric and categorical
    assemblerInputs = oheOutputCols + numImputedCols
    vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

    elasticNetParam=1
    mlflow.log_param("elastic_net_param", elasticNetParam)
    
    # Create model
    lr = LogisticRegression(labelCol="NOPRIOR", featuresCol="features",elasticNetParam=elasticNetParam)

    # Create modeling pipeline steps
    lr_pipeline_steps = [stringIndexer, oheEncoder, numImputer, vecAssembler, lr]
    # Run pipeline steps created above
    lr_pipeline = Pipeline(stages=lr_pipeline_steps)

    # Fit pipeline to training data
    lr_pipeline_model = lr_pipeline.fit(train)

    # Set up path to log model
    lr_pipelinePath = deltaPath + "/models/lr_baseline_pipeline_model"
    lr_pipeline_model.write().overwrite().save(lr_pipelinePath)

    # Use model to make predictions on test dataset
    lr_pipeline_trans = lr_pipeline_model.transform(test)

    # Log model
    mlflow.spark.log_model(lr_pipeline_model, "lr_model")

    # Calculate true/false positives and negatives
    TN = lr_pipeline_trans.filter('prediction = 0 AND NOPRIOR = prediction').count()
    TP = lr_pipeline_trans.filter('prediction = 1 AND NOPRIOR = prediction').count()
    FN = lr_pipeline_trans.filter('prediction = 0 AND NOPRIOR <> prediction').count()
    FP = lr_pipeline_trans.filter('prediction = 1 AND NOPRIOR <> prediction').count()

    # Calculate model evaluation metrics
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F =  2 * (precision*recall) / (precision + recall)

    # Log evaluation metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f-score", F)

    # Extract model portion of pipeline
    lr_model = lr_pipeline_model.stages[-1]
    
    # Extract coefficients into pandas dataframe
    coefficient_df = pd.DataFrame(list(zip(lr_model.coefficients, vecAssembler.getInputCols())), 
                                      columns = ['coefficient','feature']).sort_values(by="coefficient", ascending=False)

    # Log coefficients
    csv_path = "./lr_baseline_coefficients.csv"
    coefficient_df.to_csv(csv_path, index=False)
    mlflow.log_artifact(csv_path)

In [0]:
display(coefficient_df)

coefficient,feature
2.3129890471178896,COKEFLG_OHE
0.8981483315997963,HERFLG_OHE
0.8587917929530692,RACE_OHE
0.7969549961145088,DIVISION_OHE
0.7674057300335143,VET_OHE
0.7293353372839962,LIVARAG_OHE
0.6852738088371813,METHUSE_OHE
0.5412070653047074,ALCFLG_OHE
0.5205335932992824,FRSTUSE_Imputed
0.4567172764323824,EDUC_Imputed


In [0]:
print("accuracy is: " + str(accuracy))
print("precision is: " + str(precision))
print("recall is: " + str(recall))
print("f-score is: " + str(F))

### Random Forest Model

In [0]:
from pyspark.ml.classification import RandomForestClassifier

with mlflow.start_run(run_name="rf_base_model") as run:
    
    # Create model
    rf = RandomForestClassifier(labelCol="NOPRIOR", featuresCol="features")
    
    # Create modeling pipeline steps
    rf_pipeline_steps = [stringIndexer, oheEncoder, numImputer, vecAssembler, rf]
    # Run pipeline steps created above
    rf_pipeline = Pipeline(stages=rf_pipeline_steps)
    
    # Fit pipeline to training data
    rf_pipeline_model = rf_pipeline.fit(train)
    # Use model to make predictions on test dataset
    rf_pipeline_trans = rf_pipeline_model.transform(test)
    
    # Log model
    mlflow.spark.log_model(rf_pipeline_model, "rf_model")

    # Calculate true/false positives and negatives
    TN = rf_pipeline_trans.filter('prediction = 0 AND NOPRIOR = prediction').count()
    TP = rf_pipeline_trans.filter('prediction = 1 AND NOPRIOR = prediction').count()
    FN = rf_pipeline_trans.filter('prediction = 0 AND NOPRIOR <> prediction').count()
    FP = rf_pipeline_trans.filter('prediction = 1 AND NOPRIOR <> prediction').count()

    # Calculate model evaluation metrics
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F =  2 * (precision*recall) / (precision + recall)
    
    # Log evaluation metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f-score", F)
    
    # Extract feature importances and put them into a pandas dataframe
    rf_model = rf_pipeline_model.stages[-1]
    feature_imp_rf = pd.DataFrame(list(zip(vecAssembler.getInputCols(), rf_model.featureImportances)), 
                                  columns = ['feature','importance']).sort_values(by="importance", ascending=False)
    
    # Log feature importances
    csv_path = "./rf_feat_importances.csv"
    feature_imp_rf.to_csv(csv_path, index=False)
    mlflow.log_artifact(csv_path)

In [0]:
print("accuracy is: " + str(accuracy))
print("precision is: " + str(precision))
print("recall is: " + str(recall))
print("f-score is: " + str(F))

In [0]:
display(feature_imp_rf)

feature,importance
FREQ_Imputed,0.3577912422126961
SERVICES_OHE,0.0534633159653038
EMPLOY_OHE,0.0403537616734606
EDUC_Imputed,0.0379697201186535
FRSTUSE_Imputed,0.0218532623962476
PSOURCE_OHE,0.0142239734160826
LIVARAG_OHE,0.002138040821726
OPSYNFLG_OHE,0.0010376995191774
METHUSE_OHE,0.0009640623315494608
COKEFLG_OHE,0.0008521588155253216


In [0]:
train_sm = train.drop('DIVISION_OHE','HERFLG_OHE','RACE_OHE','ALCFLG_OHE','VET_OHE')
test_sm = test.drop('DIVISION_OHE','HERFLG_OHE','RACE_OHE','ALCFLG_OHE','VET_OHE')
holdout_sm = holdout.drop('DIVISION_OHE','HERFLG_OHE','RACE_OHE','ALCFLG_OHE','VET_OHE')

### Perform a grid search with Gradient Boosted Trees

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
import mlflow
import mlflow.spark
import pandas as pd
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline

with mlflow.start_run(run_name="gbt_maxDepth_stepSize_gs") as run:

    # Getting numeric and categorical column names
    categoricalCols_sm = [field for (field, dataType) in train_sm.dtypes if ((dataType == "string") & (field != "NOPRIOR"))]
    numericCols_sm = [field for (field, dataType) in train_sm.dtypes if ((dataType == "double") & (field != "NOPRIOR"))]

    # Adding label to columns created in each step of the modeling pipeline
    indexOutputCols_sm = [x + "_Index" for x in categoricalCols]
    oheOutputCols_sm = [x + "_OHE" for x in categoricalCols]
    numImputedCols_sm = [x + "_Imputed" for x in numericCols]
    
    assemblerInputs_sm = oheOutputCols_sm + numericCols_sm

    # Creating estimator and transformers for modeling pipeline
    stringIndexer_sm = StringIndexer(inputCols=categoricalCols_sm, outputCols=indexOutputCols_sm, handleInvalid="skip")
    oheEncoder_sm = OneHotEncoder(inputCols=indexOutputCols_sm, outputCols=oheOutputCols_sm)
    numImputer_sm = Imputer(inputCols=numericCols_sm, outputCols=numImputedCols_sm, strategy='mode')

    assemblerInputs_sm = oheOutputCols_sm + numImputedCols_sm
    vecAssembler_sm = VectorAssembler(inputCols=assemblerInputs_sm, outputCol="features")

    # Create model
    gbt = GBTClassifier(labelCol="NOPRIOR", featuresCol="features")

    # Create modeling pipeline steps
    gbt_pipeline_steps = [stringIndexer_sm, oheEncoder_sm, numImputer_sm, vecAssembler_sm, gbt]
    # Run pipeline steps created above
    gbt_pipeline = Pipeline(stages=gbt_pipeline_steps)

    # Build grid to perform grid search for hyperparameter tuning
    gbt_paramGrid = ParamGridBuilder()\
      .addGrid(gbt.maxDepth, [5, 10]) \
      .addGrid(gbt.stepSize, [0.1, 0.5])\
      .build()
  
    # Create model evaluator
    bin_eval_gbt = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="NOPRIOR", metricName="areaUnderROC")

    # Perform train test split
    gbt_tvs = TrainValidationSplit(estimator=gbt_pipeline,
                             trainRatio=0.7,
                             estimatorParamMaps=gbt_paramGrid,
                             evaluator=bin_eval_gbt)
                             # 70% of the data will be used for training, 30% for validation

    # Fit the pipeline with the steps created above
    gbt_tvs_fitted = gbt_tvs.fit(teds_recoded)
    gbt_pred = gbt_tvs_fitted.transform(teds_recoded)

    # Evaluate model
    eval_test_metric = bin_eval_gbt.evaluate(gbt_pred)

    # Log model
    mlflow.spark.log_model(gbt_tvs_fitted, "gbt_maxDepth_stepSize")

    # Log model parameters
    mlflow.log_param("maxDepth", gbt.maxDepth)
    mlflow.log_param("stepSize", gbt.stepSize)

    # Calculate model evaluation metrics
    TN = gbt_pred.filter('prediction = 0 AND NOPRIOR = prediction').count()
    TP = gbt_pred.filter('prediction = 1 AND NOPRIOR = prediction').count()
    FN = gbt_pred.filter('prediction = 0 AND NOPRIOR <> prediction').count()
    FP = gbt_pred.filter('prediction = 1 AND NOPRIOR <> prediction').count()

    # Calculate model evaluation metrics
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F =  2 * (precision*recall) / (precision + recall)

    # Log evaluation metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f-score", F)

In [0]:
print("accuracy is: " + str(accuracy))
print("precision is: " + str(precision))
print("recall is: " + str(recall))
print("f-score is: " + str(F))