In [None]:

# Loads necessary supporting packages and creates HiveContext for loading Hive tables into memory
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, array, avg, approx_count_distinct, countDistinct
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer, Imputer
from pyspark.sql import functions as f
import numpy as np
import pandas as pd
import time
from datetime import datetime
from calendar import monthrange
from datetime import datetime
from dateutil.relativedelta import relativedelta
from pyspark.sql.window import Window
import re


# Creates instance of HiveContext necessary for interacting with Hive.
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably it is the
# SparkContext injected by the PySpark kernel - confirm before running elsewhere.
hive_context = HiveContext(sc)

class HaltException(Exception):
    """Error used to return a message when pre-requisites are not met."""

## Read Data of Date1

In [None]:
# Read the feature names selected in the previous step (the "name" column of each CSV).
select_med_list = pd.read_csv('PROD_MED_FS_COLUMNS_FINAL_T.csv')['name'].tolist()
select_rx_list = pd.read_csv('PROD_RX_FS_COLUMNS_FINAL_T.csv')['name'].tolist()

# Full projection used when loading the data: both feature lists, then the target
# column and the member identifier.
select_col_list = select_med_list + select_rx_list + ['tgt_cost', 'member_key']

In [None]:
# prepare data from different source
#
# Reads the three "data1" feature tables and the "data1" member list from Hive, joins the
# feature tables on member_key and keeps only the columns listed in select_col_list.

temp_df1  = hive_context.sql(
    """SELECT * 
    FROM medfeats1_data1 
   
   """
    )

# tgt_cost is dropped here; the joined frame takes it from temp_df1 (temp_df1["*"] below).
temp_df2  = hive_context.sql(
    """SELECT * 
    FROM medfeats2_data1 
   
   """
    ).drop('tgt_cost')

# Same for the rx table, which additionally loses its pgk_rx column.
temp_df3 = hive_context.sql(
    """SELECT * 
    FROM rxfeats_data1 
   
   """
    ).drop('tgt_cost','pgk_rx')

# Member list; the feature-engineering cell reads its "age" column.
mbr_df  = hive_context.sql(
    """SELECT * 
    FROM mbrlist_data1
   
   """
    )

# Inner joins on member_key: every temp_df1 column plus the non-key columns of
# temp_df2 and temp_df3.
temp_df = (temp_df1.join(temp_df2,'member_key').join(temp_df3,'member_key')
           .select([temp_df1["*"]]+[c for c in temp_df2.columns if c not in ['member_key']]
                  +[c for c in temp_df3.columns if c not in ['member_key']]))

# Keep the selected columns, tag every row with the literal "data1" in Process_Month and
# copy tgt_cost into raw_tgt (tgt_cost itself is log-transformed in the next cell).
temp_df = (temp_df.select(select_col_list).withColumn("Process_Month",f.lit("data1"))
           .withColumn("raw_tgt",temp_df.tgt_cost))

In [None]:
# feature engineering
#
# log/sqrt transform for pmpm type variables to make the distribution Gaussian:
#   * columns whose name contains pmpm_/PMPM_ and _cost/_COST, plus tgt_cost, are replaced
#     by their natural log;
#   * the remaining pmpm_/PMPM_ columns are replaced by their square root.
# The comprehension variable is deliberately not named `col`: under Python 2 it leaks out of
# the comprehension and would overwrite the `col` function imported from pyspark.sql.functions.
pmpm_column_list = [c for c in temp_df.columns
                    if ('pmpm_' in c or 'PMPM_' in c) and ('_cost' in c or '_COST' in c)]
pmpm_column_list.append('tgt_cost')
pmpm_cnt_list = [c for c in temp_df.columns
                 if ('pmpm_' in c or 'PMPM_' in c) and (c not in pmpm_column_list)]
pmpm_df = temp_df.select(['member_key']
                         + [f.log(c).alias(c) for c in pmpm_column_list]
                         + [f.sqrt(c).alias(c) for c in pmpm_cnt_list])
# Replace nulls left by the transforms with -4.60517, i.e. ln(0.01).
# NOTE(review): the same fill value is applied to the sqrt-transformed columns - confirm
# that this is intended.
pmpm_df = pmpm_df.na.fill(-4.60517)
pmpm_column_list = pmpm_column_list + pmpm_cnt_list
print(len(pmpm_column_list))

# Swap the raw pmpm columns for their transformed versions (raw_tgt is not in the list and
# therefore keeps the untransformed target).
temp_df = temp_df.drop(*pmpm_column_list)
print(len(temp_df.columns))
temp_df = (temp_df.join(pmpm_df, 'member_key')
           .select([temp_df["*"]] + [c for c in pmpm_df.columns if c not in ['member_key']]))
print(len(temp_df.columns))

# Append the member's age from the member list.
temp_df = (temp_df.join(mbr_df, 'member_key')
           .select(temp_df["*"], mbr_df["age"])
          )

## Read Data of Date2

In [None]:
# Same loading steps as for data1, applied to the "Data2" tables. The names temp_df1,
# temp_df2, temp_df3 and mbr_df are reused, so the data1 versions are no longer reachable.
temp_df1  = hive_context.sql(
    """SELECT * 
    FROM medfeats1_Data2 
   
   """
    )

# Unlike the data1 load, MEDICARE_ID is dropped here as well as tgt_cost.
temp_df2  = hive_context.sql(
    """SELECT * 
    FROM medfeats2_Data2 
   
   """
    ).drop('MEDICARE_ID','tgt_cost')

temp_df3 = hive_context.sql(
    """SELECT * 
    FROM rxfeats_Data2 
   
   """
    ).drop('MEDICARE_ID','tgt_cost','pgk_rx')

# Member list; the feature-engineering cell reads its "age" column.
mbr_df  = hive_context.sql(
    """SELECT * 
    FROM mbrlist_Data2
   
   """
    )

# Inner joins on member_key: every temp_df1 column plus the non-key columns of
# temp_df2 and temp_df3.
temp_df_02 = (temp_df1.join(temp_df2,'member_key').join(temp_df3,'member_key')
           .select([temp_df1["*"]]+[c for c in temp_df2.columns if c not in ['member_key']]
                  +[c for c in temp_df3.columns if c not in ['member_key']]))

# Keep the selected columns, tag every row with the literal "Data2" in Process_Month and
# copy tgt_cost into raw_tgt (tgt_cost itself is log-transformed in the next cell).
temp_df_02 = (temp_df_02.select(select_col_list).withColumn("Process_Month",f.lit("Data2"))
              .withColumn("raw_tgt",temp_df_02.tgt_cost))


In [None]:
# feature engineering
#
# log/sqrt transform for pmpm type variables to make the distribution Gaussian:
#   * columns whose name contains pmpm_/PMPM_ and _cost/_COST, plus tgt_cost, are replaced
#     by their natural log;
#   * the remaining pmpm_/PMPM_ columns are replaced by their square root.
# The comprehension variable is deliberately not named `col`: under Python 2 it leaks out of
# the comprehension and would overwrite the `col` function imported from pyspark.sql.functions.
pmpm_column_list = [c for c in temp_df_02.columns
                    if ('pmpm_' in c or 'PMPM_' in c) and ('_cost' in c or '_COST' in c)]
pmpm_column_list.append('tgt_cost')
pmpm_cnt_list = [c for c in temp_df_02.columns
                 if ('pmpm_' in c or 'PMPM_' in c) and (c not in pmpm_column_list)]
pmpm_df = temp_df_02.select(['member_key']
                            + [f.log(c).alias(c) for c in pmpm_column_list]
                            + [f.sqrt(c).alias(c) for c in pmpm_cnt_list])
# Replace nulls left by the transforms with -4.60517, i.e. ln(0.01).
# NOTE(review): the same fill value is applied to the sqrt-transformed columns - confirm
# that this is intended.
pmpm_df = pmpm_df.na.fill(-4.60517)
pmpm_column_list = pmpm_column_list + pmpm_cnt_list
print(len(pmpm_column_list))

# Swap the raw pmpm columns for their transformed versions (raw_tgt is not in the list and
# therefore keeps the untransformed target).
temp_df_02 = temp_df_02.drop(*pmpm_column_list)
print(len(temp_df_02.columns))
temp_df_02 = (temp_df_02.join(pmpm_df, 'member_key')
              .select([temp_df_02["*"]] + [c for c in pmpm_df.columns if c not in ['member_key']]))
print(len(temp_df_02.columns))

# Append the member's age from the member list.
temp_df_02 = (temp_df_02.join(mbr_df, 'member_key')
              .select(temp_df_02["*"], mbr_df["age"])
             )

## Split to Training and Testing

In [None]:
# Member-level split, driven by the Data2 snapshot.
# Training members of Data2: a sample without replacement, fraction 0.6, seed 123; the key is
# renamed to pgk1 so it can be told apart from member_key after the joins below.
train_02_mbr_list = temp_df_02.sample(False, 0.6, seed=123).select('member_key').withColumnRenamed('member_key','pgk1')
# Test members of Data2: left join against the training keys, then keep the rows without a
# match (pgk1 is null), i.e. the members that were not sampled.
test_02_mbr_list = temp_df_02.select('member_key').join(train_02_mbr_list,temp_df_02.member_key==train_02_mbr_list.pgk1,'left')
test_02_mbr_list = test_02_mbr_list.filter(f.col('pgk1').isNull()).drop("pgk1")

# Test members of data1 (temp_df): members that are also Data2 test members (inner join).
test_04_mbr_list = (temp_df.select('member_key').join(test_02_mbr_list,temp_df.member_key==test_02_mbr_list.member_key)
                    .select(temp_df.member_key).withColumnRenamed('member_key','pgk1'))
# Training members of data1: everything else. Unlike test_02_mbr_list, the (all-null) pgk1
# column is not dropped here.
train_04_mbr_list = temp_df.select('member_key').join(test_04_mbr_list,temp_df.member_key==test_04_mbr_list.pgk1,'left')
train_04_mbr_list = train_04_mbr_list.filter(f.col('pgk1').isNull())
# print(test_04_mbr_list.count())
# print(train_04_mbr_list.count())
# print(temp_df.count())

In [None]:
# Materialise the four row sets by inner-joining each snapshot with its member list and
# keeping only the snapshot's own columns ("02" = temp_df_02 / Data2, "04" = temp_df / data1).
train_02_temp = temp_df_02.join(train_02_mbr_list,temp_df_02.member_key == train_02_mbr_list.pgk1).select(temp_df_02["*"])
train_04_temp = temp_df.join(train_04_mbr_list,'member_key').select(temp_df["*"])

test_02_temp = temp_df_02.join(test_02_mbr_list,'member_key').select(temp_df_02["*"])
test_04_temp = temp_df.join(test_04_mbr_list,temp_df.member_key == test_04_mbr_list.pgk1).select(temp_df["*"])

In [None]:
# Stack the two snapshots into one training frame and one test frame, then replace missing
# RACE_CD values with the literal 'Unknown'.
# NOTE(review): DataFrame.union matches columns by position, not by name - this presumably
# relies on temp_df and temp_df_02 having the same column order; confirm.
train_temp_id_df = train_02_temp.union(train_04_temp)
train_temp_id_df = train_temp_id_df.na.fill({'RACE_CD':'Unknown'})
test_temp_id_df = test_02_temp.union(test_04_temp)
test_temp_id_df = test_temp_id_df.na.fill({'RACE_CD':'Unknown'})

## Start a ML pipeline to build different models


In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler,QuantileDiscretizer,StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

# Character Features
char_df_final = train_temp_id_df.select("RACE_CD", "member_key")

# Numeric Features: every column whose Spark dtype is not 'string'
num_df_final = train_temp_id_df.select([item[0] for item in train_temp_id_df.dtypes if (item[1] != 'string')])

# Candidate model inputs: identifiers, the raw target and the snapshot tag are excluded.
# tgt_cost is still in numlist at this point; it is taken out where the label is defined.
numlist = [c for c in num_df_final.columns if c not in ["member_key", "raw_tgt", "Process_Month"]]
charlist = [c for c in char_df_final.columns if c not in ["member_key", "Process_Month"]]

# One StringIndexer per character feature, writing to "intx_<column>".
string_feature_indexers = [StringIndexer(inputCol=x, outputCol="intx_{0}".format(x)) for x in charlist]

# Fit the indexers once on the training data and apply that same fitted model to both sets
# (the original fitted an identical pipeline twice on train_temp_id_df).
indexer_model = Pipeline(stages=string_feature_indexers).fit(train_temp_id_df)
train_final_df = indexer_model.transform(train_temp_id_df)
test_final_df = indexer_model.transform(test_temp_id_df)


In [None]:
# Name of the label column (the log-transformed cost).
labelname = 'tgt_cost'
# Take the label out of the numeric inputs by rebuilding the list instead of calling
# numlist.remove(): re-running this cell then no longer raises ValueError.
numlist = [c for c in numlist if c != labelname]
all_columns = numlist + ["intx_" + x for x in charlist]

# Assembler: packs all model inputs into the single vector column "features".
# (No comprehension variable named `col` here - under Python 2 it would overwrite the
# pyspark.sql.functions.col import.)
assembler = VectorAssembler(inputCols=list(all_columns), outputCol="features")

# Scale features: writes "scaled_features" next to "features".
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# define evaluator with different metric
# evaluator_mse keeps its historical name but computes RMSE (metricName="rmse").
evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol=labelname, metricName="r2")
evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol=labelname, metricName="rmse")

In [None]:
# extract feature importance from RF. not used here.

def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Return the feature attributes of a vector column ranked by importance.

    featureImp  -- importance scores, indexable by the attribute's ``idx``.
    dataset     -- object whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
                   maps each attribute group to a list of attribute records.
    featuresCol -- name of the vector column to inspect.

    Returns a pandas DataFrame with one row per attribute record, an added
    ``score`` column, sorted by ``score`` in descending order.
    """
    attr_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    # Flatten the per-group record lists into one list, in group iteration order.
    records = [record for group in attr_groups for record in attr_groups[group]]
    ranked = pd.DataFrame(records)
    ranked['score'] = ranked['idx'].apply(lambda position: featureImp[position])
    return ranked.sort_values('score', ascending=False)
# df_med_fs=ExtractFeatureImp(model_fs.stages[-1].featureImportances, prediction_fs, "features") 



## Model 1: RF

### Hyperparameters are not fine-tuned because we are going to ensemble all models

In [None]:
# Random forest #1 (20 trees, depth 16, 20 bins). This is the only model in the notebook
# that reads the scaler output ("scaled_features"); the others read "features".
rf = RandomForestRegressor(labelCol="tgt_cost", featuresCol="scaled_features", numTrees=20, maxDepth=16, maxBins = 20)
pipeline_rf = Pipeline(stages=[assembler, scaler, rf])
model_rf = pipeline_rf.fit(train_final_df)
# Predictions on the test set and on the training set (the latter feeds the ensemble).
prediction_rf=model_rf.transform(test_final_df)
train_pre_rf = model_rf.transform(train_final_df)
# train_pre_rf = model_rf.transform(train_temp_id_df)

In [None]:
# R2 and RMSE of random forest #1 on the test and training sets.
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".

print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_rf))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_rf))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_rf))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_rf))

In [None]:
# Random forest #2 (50 trees, depth 16, 10 bins). It reads "features", so the scaler stage
# in the pipeline runs but its "scaled_features" output is not used by this model.
rf2 = RandomForestRegressor(labelCol="tgt_cost", featuresCol="features", numTrees=50, maxDepth=16, maxBins = 10)
pipeline_rf2 = Pipeline(stages=[assembler, scaler, rf2])
model_rf2 = pipeline_rf2.fit(train_final_df)
# Predictions on the test set and on the training set (the latter feeds the ensemble).
prediction_rf2 =model_rf2.transform(test_final_df)
train_pre_rf2 = model_rf2.transform(train_final_df)

In [None]:
# R2 and RMSE of random forest #2 on the training and test sets.
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_rf2))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_rf2))

print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_rf2))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_rf2))

In [None]:
# rf2 = RandomForestRegressor(labelCol="tgt_cost", featuresCol="features", numTrees=100, maxDepth=16, maxBins = 20)
# pipeline_rf2 = Pipeline(stages=[assembler, scaler, rf2])
# model_rf2 = pipeline_rf2.fit(train_final_df)
# prediction_rf2 =model_rf2.transform(test_final_df)
# train_pre_rf2 = model_rf2.transform(train_final_df)
# print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_rf2))
# print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_rf2))

# print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_rf2))
# print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_rf2))

## Model 2: Regression

In [None]:

# Linear regression #1: elastic net (regParam=0.3, elasticNetParam=0.8), at most 10
# iterations. Reads "features"; the scaler output is not used by this model.
lr = LinearRegression(featuresCol = 'features', labelCol="tgt_cost", maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline_lr = Pipeline(stages=[assembler, scaler, lr])
model_lr = pipeline_lr.fit(train_final_df)
# Predictions on the test set and on the training set (the latter feeds the ensemble).
prediction_lr = model_lr.transform(test_final_df)
train_pre_lr = model_lr.transform(train_final_df)



In [None]:
# R2 and RMSE of linear regression #1 on the test and training sets.
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_lr))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_lr))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_lr))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_lr))

In [None]:
# Linear regression #2: regParam=0.01 with the default elasticNetParam, at most 20
# iterations. Its predictions are evaluated here but are not among the five ensemble
# inputs built further down.
lr2 = LinearRegression(featuresCol = 'features', labelCol="tgt_cost", maxIter=20, regParam=0.01)
pipeline_lr2 = Pipeline(stages=[assembler, scaler, lr2])
model_lr2 = pipeline_lr2.fit(train_final_df)
prediction_lr2 = model_lr2.transform(test_final_df)
train_pre_lr2 = model_lr2.transform(train_final_df)
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_lr2))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_lr2))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_lr2))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_lr2))

## Model 3: DTree

In [None]:
# Single decision tree (depth 10, 10 bins). Reads "features"; the scaler output is not used.
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = "tgt_cost",maxBins=10,maxDepth=10)
pipeline_dt = Pipeline(stages=[assembler, scaler, dt])
model_dt = pipeline_dt.fit(train_final_df)
# Predictions on the test set and on the training set (the latter feeds the ensemble).
prediction_dt = model_dt.transform(test_final_df)
train_pre_dt = model_dt.transform(train_final_df)

In [None]:
# R2 and RMSE of the decision tree on the test and training sets.
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_dt))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_dt))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_dt))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_dt))

## Model 4: GBT

In [None]:
# Gradient-boosted trees (20 iterations, depth 10). Reads "features"; the scaler output is
# not used.
gbt = GBTRegressor(featuresCol = 'features', labelCol = "tgt_cost", maxIter=20, maxDepth=10)
pipeline_gbt = Pipeline(stages=[assembler, scaler, gbt])
model_gbt = pipeline_gbt.fit(train_final_df)
# Predictions on the test set and on the training set (the latter feeds the ensemble).
prediction_gbt = model_gbt.transform(test_final_df)
train_pre_gbt = model_gbt.transform(train_final_df)

In [None]:
# R2 and RMSE of the gradient-boosted trees on the test and training sets.
# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_gbt))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_gbt))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_gbt))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_gbt))

## Final Ensemble Model

In [None]:
# Training-set predictions of the five base models (rf, rf2, lr, dt, gbt), one frame per
# model, with the prediction renamed to p1..p5. m1 keeps member_key and Process_Month under
# their own names; in m2..m5 they are renamed (PGKn / PMn) so the join keys stay distinct.
m1 = train_pre_rf.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('prediction','p1')
m2 = train_pre_rf2.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK2').withColumnRenamed('prediction','p2').withColumnRenamed('Process_Month','PM2')
m3 = train_pre_lr.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK3').withColumnRenamed('prediction','p3').withColumnRenamed('Process_Month','PM3')
m4 = train_pre_dt.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK4').withColumnRenamed('prediction','p4').withColumnRenamed('Process_Month','PM4')
m5 = train_pre_gbt.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK5').withColumnRenamed('prediction','p5').withColumnRenamed('Process_Month','PM5')

In [None]:
# Test-set counterparts of m1..m5. Unlike m1, m1t also renames member_key (to PGK1);
# its Process_Month column keeps its name.
m1t = prediction_rf.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('prediction','p1').withColumnRenamed('member_key','PGK1')
m2t = prediction_rf2.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK2').withColumnRenamed('prediction','p2').withColumnRenamed('Process_Month','PM2')
m3t = prediction_lr.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK3').withColumnRenamed('prediction','p3').withColumnRenamed('Process_Month','PM3')
m4t = prediction_dt.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK4').withColumnRenamed('prediction','p4').withColumnRenamed('Process_Month','PM4')
m5t = prediction_gbt.select('prediction', "tgt_cost",'member_key','Process_Month').withColumnRenamed('member_key','PGK5').withColumnRenamed('prediction','p5').withColumnRenamed('Process_Month','PM5')

In [None]:
# One row per (member_key, Process_Month) with the five training-set predictions side by
# side: inner joins of m2..m5 onto m1, keeping m1's columns plus p2..p5.
prediction_all_df = (m1.join(m2,(m1.member_key==m2.PGK2) & (m1.Process_Month == m2.PM2))
                     .join(m3,(m1.member_key==m3.PGK3) & (m1.Process_Month == m3.PM3))
                     .join(m4,(m1.member_key==m4.PGK4) & (m1.Process_Month == m4.PM4))
                     .join(m5,(m1.member_key==m5.PGK5) & (m1.Process_Month == m5.PM5))
                     .select(m1["*"],m2.p2, m3.p3, m4.p4, m5.p5)
                    )


# Row count of the joined frame; calling count() also forces the joins to execute.
prediction_all_df.count()

In [None]:
# check point save

# hive_context.sql('DROP TABLE IF EXISTS ens_df_1')
# (m1t.write.option("path", "/user/hive/warehouse/db/ens_df_1")
#        .saveAsTable("ens_df_1"))
# hive_context.sql('DROP TABLE IF EXISTS ens_df_2')
# (m2t.write.option("path", "/user/hive/warehouse/db/ens_df_2")
#        .saveAsTable("ens_df_2"))
# hive_context.sql('DROP TABLE IF EXISTS ens_df_3')
# (m3t.write.option("path", "/user/hive/warehouse/db/ens_df_3")
#        .saveAsTable("ens_df_3"))
# hive_context.sql('DROP TABLE IF EXISTS ens_df_4')
# (m4t.write.option("path", "/user/hive/warehouse/db/ens_df_4")
#        .saveAsTable("ens_df_4"))
# hive_context.sql('DROP TABLE IF EXISTS ens_df_5')
# (m5t.write.option("path", "/user/hive/warehouse/db/ens_df_5")
#        .saveAsTable("ens_df_5"))

In [None]:
# load check point

# m1t_r  = hive_context.sql(
#     """SELECT * 
#     FROM ens_df_1
   
#    """)

# m2t_r  = hive_context.sql(
#     """SELECT * 
#     FROM ens_df_2
   
#    """)

# m3t_r  = hive_context.sql(
#     """SELECT * 
#     FROM ens_df_3
   
#    """)

# m4t_r  = hive_context.sql(
#     """SELECT * 
#     FROM ens_df_4
   
#    """)

# m5t_r  = hive_context.sql(
#     """SELECT * 
#     FROM ens_df_5
   
#    """)


# testing_all_df = (m1t_r.join(m2t_r,(m1t_r.PGK1==m2t_r.PGK2) & (m1t_r.Process_Month == m2t_r.PM2),'left')
#                      .join(m3t_r,(m1t_r.PGK1==m3t_r.PGK3) & (m1t_r.Process_Month == m3t_r.PM3))
#                      .join(m4t_r,(m1t_r.PGK1==m4t_r.PGK4) & (m1t_r.Process_Month == m4t_r.PM4))
#                      .join(m5t_r,(m1t_r.PGK1==m5t_r.PGK5) & (m1t_r.Process_Month == m5t_r.PM5))
#                      .select(m1t_r["*"],m2t_r.p2, m3t_r.p3, m4t_r.p4, m5t_r.p5)
#                     )

In [None]:
# Pack the five base-model predictions into the vector column "features_en" and keep only
# that vector and the label for fitting the ensemble.
vectorAssembler = VectorAssembler(inputCols = ['p1', 'p2', 'p3', 'p4','p5'], outputCol = 'features_en')
prediction_all_vdf = vectorAssembler.transform(prediction_all_df)
prediction_all_vdf = prediction_all_vdf.select(['features_en', "tgt_cost"])

In [None]:
# Ensemble: a linear regression (at most 25 iterations) fitted on the base models'
# training-set predictions, then applied back to the same training rows.
LREN = LinearRegression(featuresCol = 'features_en', labelCol="tgt_cost",maxIter=25)
LREN_model = LREN.fit(prediction_all_vdf)
train_pre_ensmb = LREN_model.transform(prediction_all_vdf)
# Coefficients are in the assembler's input order: p1..p5.
print("Coefficients: " + str(LREN_model.coefficients))
print("Intercept: " + str(LREN_model.intercept))

In [None]:
# Build the test-set input of the ensemble from the in-memory test predictions m1t..m5t.
# Previously testing_all_df was only defined in the commented-out "load check point" cell,
# so this cell raised NameError on a fresh Restart & Run All. The joins mirror the ones used
# for prediction_all_df on the training side: inner joins on (member key, Process_Month).
testing_all_df = (m1t.join(m2t, (m1t.PGK1 == m2t.PGK2) & (m1t.Process_Month == m2t.PM2))
                  .join(m3t, (m1t.PGK1 == m3t.PGK3) & (m1t.Process_Month == m3t.PM3))
                  .join(m4t, (m1t.PGK1 == m4t.PGK4) & (m1t.Process_Month == m4t.PM4))
                  .join(m5t, (m1t.PGK1 == m5t.PGK5) & (m1t.Process_Month == m5t.PM5))
                  .select(m1t["*"], m2t.p2, m3t.p3, m4t.p4, m5t.p5)
                 )

# Same vector assembly as for the training predictions; the member key and snapshot tag are
# kept so the ensemble predictions can be traced back to members.
testing_all_vdf = vectorAssembler.transform(testing_all_df)
testing_all_vdf = testing_all_vdf.select(['features_en', "tgt_cost", 'PGK1', 'Process_Month'])
prediction_ensmb = LREN_model.transform(testing_all_vdf)

# The printed label says "MSE", but evaluator_mse is configured with metricName="rmse".
print("R Squared (R2) on test data = %g" % evaluator_r2.evaluate(prediction_ensmb))
print("MSE (rmse) on test data = %g" % evaluator_mse.evaluate(prediction_ensmb))
print("R Squared (R2) on training data = %g" % evaluator_r2.evaluate(train_pre_ensmb))
print("MSE (rmse) on training data = %g" % evaluator_mse.evaluate(train_pre_ensmb))

## Put members into different severity buckets

In [None]:
# Pull actual vs predicted target to the driver as a pandas DataFrame.
# The severity analysis below uses random forest #2's test predictions, not the ensemble's;
# the ensemble variant is kept commented out (note that it names the key PGK1, whereas the
# active frame names it member_key).
# stat_df = prediction_ensmb.select('tgt_cost', 'PGK1', 'Process_Month', 'prediction').toPandas()

stat_df = prediction_rf2.select('tgt_cost', 'member_key', 'Process_Month', 'prediction').toPandas()

In [None]:
# Descending rank (1 = highest value) of the actual and the predicted target, and that rank
# as a fraction of the number of rows.
n_rows = stat_df.shape[0]
for value_col, tag in (('tgt_cost', 'actual'), ('prediction', 'pred')):
    stat_df['rank_' + tag] = stat_df[value_col].rank(ascending=False)
    stat_df['perc_' + tag] = stat_df['rank_' + tag] / n_rows
def bucket_customized(p,c1,c2,c3):
    """Return the severity label for percentile ``p`` given three cut-offs.

    'High' when p <= c1, 'Med' when c1 < p <= c2, 'Low' when c2 < p <= c3,
    and 'MNT' for anything else (including values that compare false
    against every cut-off, such as NaN).
    """
    if p <= c1:
        return 'High'
    if c1 < p <= c2:
        return 'Med'
    if c2 < p <= c3:
        return 'Low'
    return 'MNT'

In [None]:
# Severity buckets from the percentile ranks, for both the actual and the predicted target:
# top 15% -> 'High', next 15% -> 'Med', next 15% -> 'Low', everything else -> 'MNT'.
# (The earlier 0.2/0.4/0.6 assignment was immediately overwritten by this one and has been
# removed; the resulting columns are unchanged.)
HIGH_CUT, MED_CUT, LOW_CUT = 0.15, 0.3, 0.45

# Apply the bucketing to the single column it needs instead of building every row.
stat_df['perct_act_cat'] = stat_df['perc_actual'].apply(
    lambda p: bucket_customized(p, HIGH_CUT, MED_CUT, LOW_CUT))
stat_df['perct_pre_cat'] = stat_df['perc_pred'].apply(
    lambda p: bucket_customized(p, HIGH_CUT, MED_CUT, LOW_CUT))

In [None]:
# Number of members for each (actual bucket, predicted bucket) combination.
stat_df.groupby(["perct_act_cat", "perct_pre_cat"]).size()

In [None]:
# FIXME: this cell was left unfinished and cannot run as written, so it is disabled:
#   * the select() list contains dangling attribute accesses ("test_final_df. ,"), which is a
#     syntax error;
#   * stat_df is a pandas DataFrame (built with toPandas()), so it cannot be joined to the
#     Spark DataFrame test_final_df with Spark column expressions;
#   * stat_df carries a member_key column, not PGK1.
# Original text, kept until the intended test_final_df columns are known:
# stat_df = (stat_df.join(test_final_df, (stat_df.PGK1==test_final_df.member_key) & (stat_df.Process_Month==test_final_df.Process_Month))
#            .select(stat_df["*"],test_final_df. , test_final_df. , test_final_df,)
#           )

## Data visualization

In [None]:
%matplotlib inline
import seaborn as sns
import matplotlib.pyplot as plt
import math

In [None]:
# Collect random forest #2's test-set (actual, predicted) pairs to the driver as a list of
# tuples. Both values are on the log scale (tgt_cost is log-transformed during feature
# engineering).
evaldf_test = prediction_rf2.select(['tgt_cost','prediction'])
edf_list = [(r.tgt_cost,r.prediction) for r in evaldf_test.collect()]

In [None]:
# Split the collected pairs into two parallel lists and pair each with its legend label.
actual = [x[0] for x in edf_list]
predicted = [x[1] for x in edf_list]
act_pred = [actual,predicted]
labels = ['actual','predicted']

In [None]:

# Overlay the kernel density estimates of the actual and the predicted (log-scale) target.
# NOTE(review): sns.distplot is deprecated in recent seaborn releases (kdeplot is the
# suggested replacement) - confirm against the installed seaborn version.
for i in range(len(act_pred)): 
    # Draw the density plot
    sns.distplot(act_pred[i], hist = False, kde = True,
                 kde_kws = {'linewidth': 3},
                 label = labels[i])
    
# Plot formatting
plt.legend(prop={'size': 16})
plt.title('Density Plot of Actual and Predicted values')
plt.xlabel('Log_total_pmpm')
plt.ylabel('Density')

In [None]:
# Same comparison on the original scale: raw_tgt is the untransformed target, and the
# log-scale prediction is mapped back with exp().
evaldf_test2 = prediction_rf2.select(['raw_tgt','prediction'])
edf_list2 = [(r.raw_tgt,r.prediction) for r in evaldf_test2.collect()]
actual2 = [x[0] for x in edf_list2]
predicted2 = [math.exp(x[1]) for x in edf_list2]
act_pred2 = [actual2,predicted2]
# labels is reassigned here but not used by the histogram calls below (no legend is drawn).
labels = ['actual','predicted']
# 50 logarithmically spaced bin edges between 0.1 and 60000; actual first, predicted in red.
plt.hist(act_pred2[0], bins=np.logspace(np.log10(0.1),np.log10(60000.0), 50),alpha=0.5)
plt.hist(act_pred2[1], color='r',bins=np.logspace(np.log10(0.1),np.log10(60000.0), 50),alpha=0.5)

plt.gca().set_xscale("log")
plt.show()

In [None]:
# Persist the six fitted base pipelines under relative paths. The ensemble model
# (LREN_model) is not saved here.
# NOTE(review): save() presumably fails when the target path already exists, which would
# make this cell fail on a second run; write().overwrite().save(path) avoids that - confirm
# whether overwriting is acceptable.
model_rf.save("rf1") 
model_rf2.save("rf2") 
model_lr.save("lr1")
model_lr2.save("lr2")
model_gbt.save("gbt")
model_dt.save("dt")