# Modelling Medical Insurance data using logistic regression

In [0]:
# Data processing
from pyspark.sql.functions import *

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [0]:
write_path = 'dbfs:/tmp/reproducible_ml_uofl/medinsurance_linearreg_class1_7_csv.delta'
medins_chrgs = spark.read.format('delta').load(write_path)

#Show basic summary stats
display(medins_chrgs.summary())

summary,age,bmi,children,region,charges,gender_cd,smoker_cd
count,1338.0,1338.0,1338.0,1338,1338.0,1338.0,1338.0
mean,39.20702541106129,30.663396860986538,1.0949177877429,,13270.422265141257,0.4947683109118087,0.2047832585949177
stddev,14.049960379216149,6.098186911679012,1.205492739781914,,12110.011236693992,0.5001595692843768,0.4036940375456171
min,18.0,15.96,0.0,northeast,1121.8739,0.0,0.0
25%,27.0,26.29,0.0,,4738.2682,0.0,0.0
50%,39.0,30.4,1.0,,9377.9047,0.0,0.0
75%,51.0,34.7,2.0,,16657.71745,1.0,0.0
max,64.0,53.13,5.0,southwest,63770.42801,1.0,1.0


In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f
from pyspark.sql.functions import row_number

#create percentile ranks
medins_chrgs = medins_chrgs.select("age","bmi","children","region","charges","gender_cd","smoker_cd", 
                                   f.row_number().over(Window.partitionBy().orderBy(medins_chrgs['charges'])).alias("row_num"),
                                   f.percent_rank().over(Window.partitionBy().orderBy(medins_chrgs['charges'])).alias("percent_rank"))
#display(medins_chrgs)

#create targets for Top 5%, 10%, and 25%
medins_chrgs_new=(medins_chrgs.withColumn('top05_trgt', f.when(f.col('percent_rank') >=0.95, 1).otherwise(0))
                  .withColumn('top10_trgt', f.when(f.col('percent_rank') >=0.9, 1).otherwise(0))
                  .withColumn('top25_trgt', f.when(f.col('percent_rank') >=0.75, 1).otherwise(0))
)
medins_chrgs_new.show()
medins_chrgs_new.groupBy('top05_trgt').count().show()
medins_chrgs_new.groupBy('top10_trgt').count().show()
medins_chrgs_new.groupBy('top25_trgt').count().show()

In [0]:
# Train test split
trainDF, testDF = medins_chrgs_new.randomSplit([.65, .35], seed=42)
# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')

In [0]:
#One hot encoding of string feature Region
trainDF.groupBy('region').count().show()

##Now we need to modify the categorical variable region into one-hot-encoded version
 For this we will use a pipeline to do this in one step

In [0]:
#You can also create a pipeline and do everything together in one easy fit and transform step
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
categoricalColumns = ["region"]
stages = [] # stages in Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    
# Use OneHotEncoder to convert categorical variables into binary SparseVectors
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder])
 
# Define the pipeline model.
transform_mdl = pipeline.fit(trainDF)
trainDF21=transform_mdl.transform(trainDF)
trainDF21.show()

In [0]:
trainDF21.printSchema()

In [0]:
# Linear regression expect a vector input
vecAssembler = VectorAssembler(inputCols=['age','bmi','children','gender_cd','smoker_cd','regionclassVec'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF21)

# Take a look at the data
#display(vecTrainDF)

In [0]:
testDF21=transform_mdl.transform(testDF) #do the data transformation using saved parameters from training
vecTestDF = vecAssembler.transform(testDF21) #do the feature transformation using vector assembler

In [0]:
# Create linear regression
lr = LogisticRegression(featuresCol="features", labelCol="top10_trgt")
# Fit the linear regresssion model
lrModel = lr.fit(vecTrainDF)
predict_train = lrModel.transform(vecTrainDF)

# Make predictions on testing dataset
predict_test = lrModel.transform(vecTestDF) #make predictions using the trained model

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

eval = BinaryClassificationEvaluator(labelCol = "top10_trgt")

auc_train = eval.evaluate(predict_train)
print(auc_train)

auc_test = eval.evaluate(predict_test)
print(auc_test)


In [0]:
print(lrModel.summary.areaUnderROC)
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

In [0]:
import pandas as pd
import time 
import sys
from pyspark.sql.window import *
from pyspark.sql.types import * 
from pyspark.sql.functions import *
import pyspark.sql.functions as f

def get_model_stats(print_lable,dataset,evaluator,mbr_id,target):
  print(print_lable)
  AUROC=evaluator.evaluate(dataset)
  
  tot_event=dataset.count()
  tot_target=dataset.select(target).filter(f.col(target)==1).count()
  inc_rate=float(tot_target)/float(tot_event)
  print('Incidence rate in dataset: ',inc_rate)
  print('AUROC: ',AUROC)
  

  def mbr_prob1(prob_vec):
    return float(prob_vec[1]) #Get the second value from probability vector of the prediction since that is probability of target 1
  def mbr_prob2(rawp_vec):
    prob_vec = np.exp(rawp_vector)/(1+ np.exp(rawp_vec)) #this is the same as the logit function e**x/1+e**x
    return float(prob_vec[1])
  get_prob_1= udf(mbr_prob1, FloatType())
  get_prob_2= udf(mbr_prob2, FloatType())

  try:
    df_plot_tmp = dataset.withColumn('prob_score', get_prob_1('probability'))
  except:
    df_plot_tmp = dataset.withColumn('prob_score', get_prob_2('rawPrediction'))
  df_reg1=(df_plot_tmp.select(mbr_id,"rawPrediction","probability","prob_score",target))

  #Create Percentiles based on the predicted probability
  df_reg_12=  df_reg1.sort(col("prob_score").desc())
  df_reg_12 = df_reg_12.withColumn("new_column",lit("ABC"))
  w = Window().partitionBy("new_column").orderBy(col("prob_score").desc())  
  df_reg_12 = df_reg_12.withColumn("row_num",row_number().over(w)).drop("new_column")
  df_reg2 = (df_reg_12
             .withColumn("score_pctl", f.percent_rank().over(Window.orderBy(f.col("row_num"))))
             .withColumn("pctl_cat", f.when(f.col("score_pctl")==1, f.lit(100)).otherwise(f.floor(f.col("score_pctl")*100) + 1))
             )
  df_reg22 = (df_reg2
              .groupBy("pctl_cat")
              .agg(f.mean(target).alias("pctl_target_rate"),\
                   (f.sum(target)).alias("pctl_target_total"),\
                   (f.count(mbr_id)).alias("total_count"),\
                   (f.mean("prob_score")).alias("mean_predicted_prob")
                  ,)
              .orderBy(f.col("pctl_cat"))
              )

  #Now let's create the cumulative percentiles
  cum_sum = (df_reg22
              .withColumn('cum_target', f.sum(df_reg22.pctl_target_total).over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)))
              .withColumn('cum_total', f.sum(df_reg22.total_count).over(Window.partitionBy().orderBy().rowsBetween(-sys.maxsize, 0)))
              .withColumn('cum_target_cap_rate',f.col("cum_target")/f.col("cum_total"))
              .withColumn('cum_target_cap_of_total',((f.col("cum_target")/tot_target))*100)
            )
  return cum_sum
  

In [0]:
cump_cap_details = get_model_stats("Logistic Regression",predict_test,eval,"row_num","top10_trgt")
display(cump_cap_details)

pctl_cat,pctl_target_rate,pctl_target_total,total_count,mean_predicted_prob,cum_target,cum_total,cum_target_cap_rate,cum_target_cap_of_total
1,1.0,5,5,0.9949912548065184,5,5,1.0,12.5
2,1.0,4,4,0.9856249392032624,9,9,1.0,22.5
3,1.0,4,4,0.9693315327167512,13,13,1.0,32.5
4,1.0,4,4,0.9497404098510742,17,17,1.0,42.5
5,1.0,4,4,0.8989026248455048,21,21,1.0,52.5
6,1.0,5,5,0.8513014674186706,26,26,1.0,65.0
7,1.0,4,4,0.8082806766033173,30,30,1.0,75.0
8,0.75,3,4,0.6996464878320694,33,34,0.9705882352941176,82.5
9,0.75,3,4,0.5973019450902939,36,38,0.9473684210526316,90.0
10,0.25,1,4,0.5096063390374184,37,42,0.8809523809523809,92.5


In [0]:
# Create a glm with binomial and logit to mimic logictic regression
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(featuresCol="features", labelCol="top10_trgt",family="binomial", link="logit", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(vecTrainDF)
predict_train_glr = model.transform(vecTrainDF)

eval_glr = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "top10_trgt")

auc_train_glr = eval_glr.evaluate(predict_train_glr)
print(auc_train_glr)
print(model.summary)

# Print the coefficients and intercept for generalized linear regression model
predict_test_glr = model.transform(vecTestDF) #make predictions using the trained model
auc_test_glr = eval_glr.evaluate(predict_test_glr)
print(auc_test_glr)

##This code is used for regularization

In [0]:
# Create linear regression
lr2 = LogisticRegression(featuresCol="features", labelCol="top10_trgt", regParam=0, elasticNetParam=0.0)
# Fit the linear regresssion model
lrModel2 = lr2.fit(vecTrainDF)
predict_train2 = lrModel2.transform(vecTrainDF)

# Make predictions on testing dataset
predict_test2 = lrModel2.transform(vecTestDF) #make predictions using the trained model

In [0]:
eval = BinaryClassificationEvaluator(labelCol = "top10_trgt")

auc_train2 = eval.evaluate(predict_train2)
print(auc_train2)
auc_test2 = eval.evaluate(predict_test2)
print(auc_test2)

print("Training AUROC", lrModel.summary.areaUnderROC)
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

In [0]:
# Create lasso logistic regression
lr_lasso = LogisticRegression(featuresCol="features", labelCol="top10_trgt", regParam=0.1, elasticNetParam=1)
# Fit the linear regresssion model
lrModel_lasso = lr_lasso.fit(vecTrainDF)
predict_train31 = lrModel_lasso.transform(vecTrainDF)

# Make predictions on testing dataset
predict_test31 = lrModel_lasso.transform(vecTestDF) #make predictions using the trained model

auc_train31 = eval.evaluate(predict_train31)
print(auc_train31)
auc_test31 = eval.evaluate(predict_test31)
print(auc_test31)

print("Training AUROC", lrModel_lasso.summary.areaUnderROC)
print("Coefficients: " + str(lrModel_lasso.coefficients))
print("Intercept: " + str(lrModel_lasso.interceptVector))

In [0]:
# Create ridge:L2 logistic regression
lr_ridge = LogisticRegression(featuresCol="features", labelCol="top10_trgt", regParam=0.1, elasticNetParam=0)
# Fit the linear regresssion model
lrModel_ridge = lr_ridge.fit(vecTrainDF)
predict_train32 = lrModel_ridge.transform(vecTrainDF)

# Make predictions on testing dataset
predict_test32 = lrModel_ridge.transform(vecTestDF) #make predictions using the trained model

auc_train32 = eval.evaluate(predict_train32)
print(auc_train32)
auc_test32 = eval.evaluate(predict_test32)
print(auc_test32)

print("Training AUROC", lrModel_ridge.summary.areaUnderROC)
print("Coefficients: " + str(lrModel_ridge.coefficients))
print("Intercept: " + str(lrModel_ridge.interceptVector))

In [0]:
# Create elastic net L1+L2 logistic regression
lr_elastnet = LogisticRegression(featuresCol="features", labelCol="top10_trgt", regParam=0.1, elasticNetParam=0.1)
# Fit the regresssion model
lrModel_elastnet = lr_elastnet.fit(vecTrainDF)
predict_train33 = lrModel_elastnet.transform(vecTrainDF)

# Make predictions on testing dataset
predict_test33 = lrModel_elastnet.transform(vecTestDF) #make predictions using the trained model

auc_train33 = eval.evaluate(predict_train33)
print(auc_train33)
auc_test33 = eval.evaluate(predict_test33)
print(auc_test33)

print("Training AUROC", lrModel_elastnet.summary.areaUnderROC)
print("Coefficients: " + str(lrModel_elastnet.coefficients))
print("Intercept: " + str(lrModel_elastnet.interceptVector))