Copyright (c) Microsoft Corporation.

Licensed under the MIT License.


# DISCLAIMER
By accessing this code, you acknowledge that the code is not designed, intended, or made available: (1) as a medical device(s); (2) for the diagnosis of disease or other conditions, or in the cure, mitigation, treatment or prevention of a disease or other conditions; or (3) as a substitute for professional medical advice, diagnosis, treatment, or judgment. Do not use this code to replace, substitute, or provide professional medical advice, diagnosis, treatment, or judgement. You are solely responsible for ensuring the regulatory, legal, and/or contractual compliance of any use of the code, including obtaining any authorizations or consents, and any solution you choose to build that incorporates this code in whole or in part.


# Clean Data
We are using an open-source diabetes dataset. The first step is to clean the source dataset into a version we can work with:

* Replace missing values (e.g. missing medical specialties become "NaNSpec")
* Feature engineering
* Write the results to the data lake


# Library Imports


In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
import pandas as pd

# Read in Data from Azure Data Lake


In [None]:
# Storage settings used to build every abfss:// URI in the cells below.
# data_lake_account_name is intentionally blank here and must be filled in
# with the ADLS Gen2 account name before running; file_system_name is the
# container (file system) that all reads and writes target.
data_lake_account_name, file_system_name = '', 'raw'

In [None]:
# Set raw data schema: every column is a nullable string.
# NOTE(review): spark.createDataFrame(pandas_df, schema=rawSchema) pairs the
# pandas columns with these fields by position, so keep this order in sync
# with the prepared data's column order.
rawSchema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in [
        "encounter_id", "patient_nbr", "race", "gender", "age", "weight",
        "admission_type_id", "discharge_disposition_id", "admission_source_id",
        "time_in_hospital", "payer_code", "medical_specialty",
        "num_lab_procedures", "num_procedures", "num_medications",
        "number_outpatient", "number_emergency", "number_inpatient",
        "diag_1", "diag_2", "diag_3", "number_diagnoses",
        "max_glu_serum", "A1Cresult",
        "metformin", "repaglinide", "nateglinide", "chlorpropamide",
        "glimepiride", "acetohexamide", "glipizide", "glyburide",
        "tolbutamide", "pioglitazone", "rosiglitazone", "acarbose",
        "miglitol", "troglitazone", "tolazamide", "examide", "citoglipton",
        "insulin", "glyburide-metformin", "glipizide-metformin",
        "glimepiride-pioglitazone", "metformin-rosiglitazone",
        "metformin-pioglitazone",
        "change", "diabetesMed", "readmitted",
        "FirstName", "LastName", "Id",
    ]
])

In [None]:
# set transformed data schema: the surviving raw columns as nullable strings,
# followed by the engineered boolean indicator columns and the outcome.
# NOTE(review): createDataFrame(pandas_df, schema=transformedSchema) matches
# columns to fields by position, so this order must mirror the column order
# of the engineered pandas frame.
transformedSchema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in [
            "race", "gender", "age",
            "admission_type_id", "discharge_disposition_id",
            "admission_source_id", "time_in_hospital", "payer_code",
            "num_lab_procedures", "num_procedures", "num_medications",
            "number_outpatient", "number_emergency", "number_inpatient",
            "number_diagnoses", "max_glu_serum", "A1Cresult",
            "metformin", "repaglinide", "nateglinide", "chlorpropamide",
            "glimepiride", "glipizide", "glyburide", "tolbutamide",
            "pioglitazone", "rosiglitazone", "acarbose", "miglitol",
            "tolazamide", "insulin", "glyburide-metformin",
            "metformin-rosiglitazone", "change", "diabetesMed",
            "FirstName", "LastName", "Id",
        ]
    ]
    + [
        StructField(column_name, BooleanType(), True)
        for column_name in [
            "spec_InternalMedicine", "spec_Emergency/Trauma",
            "spec_Family/GeneralPractice", "spec_Cardiology",
            "spec_Surgery-General",
            "diag_428", "diag_250", "diag_276", "diag_414", "diag_401",
            "diag_427", "diag_599", "diag_496", "diag_403", "diag_486",
            "is_readmitted",
        ]
    ]
)

# Load Raw Data from ADLS


In [None]:
# Encounter records; multiLine lets quoted fields that span lines parse as one record.
df_raw = spark.read.csv(
    f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/diabetic_data.csv",
    header=True,
    multiLine=True,
)
# Lookup table of names used to attach synthetic FirstName/LastName values.
df_names = spark.read.csv(
    f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/Names/Names.csv",
    header=True,
)

In [None]:
def generateNamesMale(df_raw, df_names):
    """Attach a synthetic FirstName/LastName to every male row of ``df_raw``.

    The male rows of ``df_names`` are repeated until there is at least one
    name per male row of ``df_raw``, truncated to exactly that many, shuffled,
    and paired with those rows one-to-one.

    Args:
        df_raw: Spark DataFrame with a ``gender`` column.
        df_names: Spark DataFrame with ``gender``, ``FirstName`` and
            ``LastName`` columns.

    Returns:
        Spark DataFrame holding the rows of ``df_raw`` whose gender is
        exactly "Male", with ``FirstName`` and ``LastName`` appended.

    Raises:
        ValueError: if ``df_names`` has no rows with gender "Male".
    """
    raw_rows = df_raw.filter(col('gender') == "Male").toPandas()
    name_rows = df_names.filter(col('gender') == "Male").toPandas()

    n_rows, n_names = raw_rows.shape[0], name_rows.shape[0]
    if n_names == 0:
        raise ValueError("df_names contains no rows with gender == 'Male'")

    # ndarray.repeat requires an integer count; floor division + 1 always
    # yields at least n_rows names.
    repeats = n_rows // n_names + 1
    name_rows = pd.DataFrame(name_rows.values.repeat(repeats, axis=0),
                             columns=name_rows.columns)
    # DataFrame.join aligns on the index, so the shuffled index must be
    # dropped -- otherwise the join would restore the unshuffled order.
    name_rows = name_rows.head(n_rows).sample(frac=1).reset_index(drop=True)

    df_male = raw_rows.reset_index(drop=True).join(name_rows[['FirstName', 'LastName']])
    return spark.createDataFrame(df_male)

In [None]:
def generateNamesFemale(df_raw, df_names):
    """Attach a synthetic FirstName/LastName to every female row of ``df_raw``.

    The female rows of ``df_names`` are repeated until there is at least one
    name per female row of ``df_raw``, truncated to exactly that many,
    shuffled, and paired with those rows one-to-one.

    Args:
        df_raw: Spark DataFrame with a ``gender`` column.
        df_names: Spark DataFrame with ``gender``, ``FirstName`` and
            ``LastName`` columns.

    Returns:
        Spark DataFrame holding the rows of ``df_raw`` whose gender is
        exactly "Female", with ``FirstName`` and ``LastName`` appended.

    Raises:
        ValueError: if ``df_names`` has no rows with gender "Female".
    """
    raw_rows = df_raw.filter(col('gender') == "Female").toPandas()
    name_rows = df_names.filter(col('gender') == "Female").toPandas()

    n_rows, n_names = raw_rows.shape[0], name_rows.shape[0]
    if n_names == 0:
        raise ValueError("df_names contains no rows with gender == 'Female'")

    # ndarray.repeat requires an integer count; floor division + 1 always
    # yields at least n_rows names.
    repeats = n_rows // n_names + 1
    name_rows = pd.DataFrame(name_rows.values.repeat(repeats, axis=0),
                             columns=name_rows.columns)
    # DataFrame.join aligns on the index, so the shuffled index must be
    # dropped -- otherwise the join would restore the unshuffled order.
    name_rows = name_rows.head(n_rows).sample(frac=1).reset_index(drop=True)

    df_female = raw_rows.reset_index(drop=True).join(name_rows[['FirstName', 'LastName']])
    return spark.createDataFrame(df_female)

In [None]:
# Name each gender separately, then stack the two Spark DataFrames.
# NOTE: the helpers filter on gender == "Male" / "Female", so rows with any
# other gender value do not appear in `df`. union() matches columns by
# position; both halves come from the same helper pipeline, so the orders agree.
df_male, df_female = (
    generateNamesMale(df_raw, df_names),
    generateNamesFemale(df_raw, df_names),
)
df = df_male.union(df_female)

In [None]:
# Collect to pandas and give each distinct patient_nbr a dense integer Id
# (pd.factorize numbers values in order of first appearance), then hand the
# result back to Spark.
df = df.toPandas()
df['Id'] = pd.factorize(df['patient_nbr'])[0]
df_sp = spark.createDataFrame(df)

In [None]:
# Persist the named, Id-tagged dataset with a header row, replacing any previous run.
df_sp.write.csv(
    f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/prepareddata/',
    mode='overwrite',
    header=True,
)

In [None]:
def create_additional_features(df):
    """Turn the prepared diabetes frame into the engineered training table.

    Steps:
      * drop columns excluded from the feature set (several drug columns,
        ``weight``, and the ``patient_nbr`` / ``encounter_id`` identifiers);
      * turn the ``'?'`` placeholder into a missing value;
      * add a boolean ``spec_<name>`` column for each of five medical
        specialties, and a boolean ``diag_<icd9>`` column for each of ten
        ICD-9 codes (True when any of ``diag_1``..``diag_3`` equals it
        exactly), then drop those source columns;
      * reduce ``age`` brackets such as ``"[70-80)"`` to their lower bound
        (``"70"``);
      * replace ``readmitted`` with the boolean ``is_readmitted``;
      * fill every remaining missing value with the sentinel ``"-9999"``.

    The input frame is not modified.

    Args:
        df: pandas DataFrame of prepared records, expected to hold string
            values as read from the prepared CSV. It must include
            ``medical_specialty``, ``diag_1``..``diag_3``, ``age`` and
            ``readmitted``.

    Returns:
        A new pandas DataFrame with a fresh 0..n-1 index. Its columns are the
        surviving input columns (in input order), then the ``spec_*``
        columns, then the ``diag_*`` columns, and finally ``is_readmitted``.
    """
    to_drop = ['acetohexamide', 'troglitazone', 'examide', 'citoglipton',
               'glipizide-metformin', 'glimepiride-pioglitazone',
               'metformin-pioglitazone', 'weight', 'patient_nbr', 'encounter_id']
    # drop() without inplace returns a new frame, so the caller's df is untouched.
    features = df.drop(columns=to_drop, errors='ignore').replace('?', np.nan)

    features['medical_specialty'] = features['medical_specialty'].fillna('NaNSpec')

    # Specialties that get an indicator column, ordered by the descending
    # patient counts recorded in the original analysis.
    top_specialties = ['InternalMedicine', 'Emergency/Trauma',
                       'Family/GeneralPractice', 'Cardiology', 'Surgery-General']
    for spec in top_specialties:
        features['spec_' + spec] = features['medical_specialty'] == spec

    # ICD-9 codes that get an indicator column, same ordering convention.
    top_icd9_codes = ['428', '250', '276', '414', '401',
                      '427', '599', '496', '403', '486']
    diag_columns = ['diag_1', 'diag_2', 'diag_3']
    for icd9 in top_icd9_codes:
        # Exact string match: e.g. '250.01' does not count as '250'.
        features['diag_' + icd9] = features[diag_columns].eq(icd9).any(axis=1)

    out = features.reset_index(drop=True)
    # Raw string avoids the invalid '\d' escape; expand=False returns a Series.
    out['age'] = out['age'].str.extract(r'(\d+)-\d+', expand=False)
    out = out.drop(columns=['medical_specialty', 'diag_1', 'diag_2', 'diag_3'],
                   errors='ignore')

    print(out['readmitted'].value_counts())

    # Outcome: anything other than 'NO' counts as readmitted. Strip first:
    # values read from the source CSV can end in '\r' (the same artifact that
    # produces a 'readmitted\r' header), and 'NO\r' != 'NO' would otherwise
    # mark every row as readmitted.
    out['is_readmitted'] = out['readmitted'].str.strip() != 'NO'
    out = out.drop(columns='readmitted')

    # Out-of-range sentinel for missing values (tree-based models). Use the
    # string form so object columns stay all-str: a Python int in a column
    # bound to a StringType field makes spark.createDataFrame reject the row.
    return out.fillna('-9999')

# Train-Test Split

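Split the data 50/50 into training and test sets. Only the training set goes through feature engineering; the test set is saved with the raw schema.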

In [None]:
# read in raw data
# (the prepared data written earlier: raw columns + FirstName, LastName, Id)
df = spark.read.format("csv").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/prepareddata/",header=True,multiLine=True)
# Materialise on the driver. Everything below is built from this in-memory
# copy, which is why overwriting prepareddata/ further down is safe.
df_raw = df.toPandas()

# The last source column arrives as 'readmitted\r' (presumably CRLF line
# endings in the original CSV). Its values can carry the same trailing '\r'
# ('NO\r', ...), which would make every row look readmitted to a `!= 'NO'`
# test downstream, so strip it from the header and the values.
# rstrip is a no-op on values that are already clean.
df_raw.rename(columns = {'readmitted\r':'readmitted'}, inplace = True)
df_raw['readmitted'] = df_raw['readmitted'].str.rstrip('\r')

#save dataset: the cleaned copy overwrites prepareddata/ and is upserted into Cosmos DB
df_sp = spark.createDataFrame(df_raw,schema=rawSchema)
df_sp.write.option('header', 'true').mode('overwrite').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/prepareddata/')

df_sp.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "patientHubDB")\
    .option("spark.cosmos.container", "Patient")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('overwrite')\
    .save()

from sklearn.model_selection import train_test_split
# 50/50 split; no random_state is given, so the split differs on every run.
train, test = train_test_split(df_raw, test_size=0.5)

# save test df (kept in the raw schema, no feature engineering)
test_sp = spark.createDataFrame(test,schema=rawSchema)
test_sp.write.option('header', 'true').mode('overwrite').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/preparedtestdata/')

df = create_additional_features(train)

# save train df (engineered features, transformedSchema)
train_sp = spark.createDataFrame(df,schema=transformedSchema)
train_sp.write.option('header', 'true').mode('overwrite').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/DatasetDiabetes/preparedtraindata/')

# Save Prepared Data to Spark Table


In [None]:
#load the cleaned data to a spark database
# IF NOT EXISTS keeps the cell re-runnable. The previous bare `except:` also
# swallowed unrelated failures (permissions, connectivity, typos) and
# misreported them as "Database already exists".
spark.sql("CREATE DATABASE IF NOT EXISTS diabetesdb")
train_sp.write.mode("overwrite").saveAsTable("diabetesdb.traindata")

In [None]:
# Save the raw-schema test split as a Spark table.
# IF NOT EXISTS keeps the cell re-runnable without a bare `except:`, which
# would also hide real failures behind a "Database already exists" message.
spark.sql("CREATE DATABASE IF NOT EXISTS diabetesdb")
test_sp.write.mode("overwrite").saveAsTable("diabetesdb.testdata")