In [3]:
%%configure -f
{"executorMemory": "2g",
"executorCores": 3,
"numExecutors": 2}


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1544305840468_0002,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1544305840468_0002,pyspark,idle,Link,Link,✔


# Predicting Mortality with Simulated Medicare & Medicaid Data

## Data Source
https://www.cms.gov/Research-Statistics-Data-and-Systems/Downloadable-Public-Use-Files/SynPUFs/DESample01.html 

The data description can be found in the GitHub repository https://github.com/jfan3/docker-project.git 


In [34]:
from pyspark.sql import *
from pyspark.sql import DataFrameStatFunctions as statFunc
import sys
from pyspark.sql.window import Window
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk import tokenize
import numpy as np
from pyspark.sql.types import FloatType

from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.feature import PCA, RFormula, VectorIndexer, VectorAssembler

from pyspark.ml import Pipeline,PipelineModel

import numpy as np
import pandas as pd

In [5]:
# Tag the Spark jobs triggered below with the job group id "Read_CSV" (empty description).
sc.setJobGroup("Read_CSV", "")

# All four files are read the same way: first row is the header, column types are inferred.
csv_options = {'header': 'true', 'inferSchema': 'true'}

# Prescription drug events (one file spanning 2008 to 2010).
prescription = spark.read.options(**csv_options).csv(
    's3://final-project-data/DE1_0_2008_to_2010_Prescription_Drug_Events_Sample_1.csv'
)

# Beneficiary summary files, one per year.
bene_2008 = spark.read.options(**csv_options).csv(
    's3://final-project-data/DE1_0_2008_Beneficiary_Summary_File_Sample_1.csv'
)
bene_2009 = spark.read.options(**csv_options).csv(
    's3://final-project-data/DE1_0_2009_Beneficiary_Summary_File_Sample_1.csv'
)
bene_2010 = spark.read.options(**csv_options).csv(
    's3://final-project-data/DE1_0_2010_Beneficiary_Summary_File_Sample_1.csv'
)



In [6]:
# mort = 1 when a death date is present in that year's summary row, otherwise 0.
# The expression refers to the column by name, so it can be reused for every yearly frame.
has_death_date = when(col("BENE_DEATH_DT").isNotNull(), 1).otherwise(0)

bene_2008 = bene_2008.withColumn("mort", has_death_date)
bene_2009 = bene_2009.withColumn("mort", has_death_date)
bene_2010 = bene_2010.withColumn("mort", has_death_date)


### Constructing variable for next year's mortality

In [7]:
# Bring the rows of beneficiaries flagged mort == 1 in 2009 / 2010 back to the driver ...
deceased_rows_2009 = bene_2009.filter(bene_2009["mort"] == 1).select('DESYNPUF_ID').collect()
deceased_rows_2010 = bene_2010.filter(bene_2010["mort"] == 1).select('DESYNPUF_ID').collect()

# ... and keep only their IDs as plain Python lists.
m_2009 = [row.DESYNPUF_ID for row in deceased_rows_2009]
m_2010 = [row.DESYNPUF_ID for row in deceased_rows_2010]

In [8]:
# mort_next_temp: boolean, True when the beneficiary's ID is in the following year's deceased list.
# mort_next: the same information as an integer 1/0 flag.
bene_2008 = (
    bene_2008
    .withColumn("mort_next_temp", col("DESYNPUF_ID").isin(m_2009))
    .withColumn("mort_next", when(col("mort_next_temp") == True, 1).otherwise(0))
)
bene_2009 = (
    bene_2009
    .withColumn("mort_next_temp", col("DESYNPUF_ID").isin(m_2010))
    .withColumn("mort_next", when(col("mort_next_temp") == True, 1).otherwise(0))
)


In [9]:
# Display the column names of the 2008 frame to confirm the derived
# mort / mort_next_temp / mort_next columns were appended.
bene_2008.columns

['DESYNPUF_ID', 'BENE_BIRTH_DT', 'BENE_DEATH_DT', 'BENE_SEX_IDENT_CD', 'BENE_RACE_CD', 'BENE_ESRD_IND', 'SP_STATE_CODE', 'BENE_COUNTY_CD', 'BENE_HI_CVRAGE_TOT_MONS', 'BENE_SMI_CVRAGE_TOT_MONS', 'BENE_HMO_CVRAGE_TOT_MONS', 'PLAN_CVRG_MOS_NUM', 'SP_ALZHDMTA', 'SP_CHF', 'SP_CHRNKIDN', 'SP_CNCR', 'SP_COPD', 'SP_DEPRESSN', 'SP_DIABETES', 'SP_ISCHMCHT', 'SP_OSTEOPRS', 'SP_RA_OA', 'SP_STRKETIA', 'MEDREIMB_IP', 'BENRES_IP', 'PPPYMT_IP', 'MEDREIMB_OP', 'BENRES_OP', 'PPPYMT_OP', 'MEDREIMB_CAR', 'BENRES_CAR', 'PPPYMT_CAR', 'mort', 'mort_next_temp', 'mort_next']

In [10]:
# Total prescription cost per beneficiary.
# `sum` is pyspark.sql.functions.sum (star-imported above), not Python's builtin.
# NOTE(review): no year filter is applied here, although the prescription file name
# spans 2008 to 2010 - confirm whether a per-year total was intended.
total_rx_cost = sum('TOT_RX_CST_AMT').alias('TOTAL_DRUG_EXP')
to_merge = prescription.groupBy('DESYNPUF_ID').agg(total_rx_cost)

In [11]:
# Left join on the beneficiary ID: every 2008 beneficiary row is kept, and
# TOTAL_DRUG_EXP is null for IDs that have no row in to_merge.
bene2008 = bene_2008.join(to_merge, 'DESYNPUF_ID', 'left')

In [12]:
# Row count of the joined 2008 frame.
bene2008.count()

116352

In [13]:
# Left join on the beneficiary ID: every 2009 beneficiary row is kept, and
# TOTAL_DRUG_EXP is null for IDs that have no row in to_merge.
bene2009 = bene_2009.join(to_merge, 'DESYNPUF_ID', 'left')

In [14]:
# Row count of the joined 2009 frame.
bene2009.count()

114538

### Constructing variable for this year's total drug expenditure

In [15]:
# Keep only the label and the three predictors, then replace the null drug
# totals left by the left join with 0.
model_columns = ['mort_next', 'BENE_HMO_CVRAGE_TOT_MONS', 'SP_CNCR', 'TOTAL_DRUG_EXP']
df09 = (
    bene2009
    .select(*model_columns)
    .na.fill({'TOTAL_DRUG_EXP': 0})
)

In [16]:
# Predictor columns, packed into a single vector column named "features".
features = [
    'BENE_HMO_CVRAGE_TOT_MONS',
    'SP_CNCR',
    'TOTAL_DRUG_EXP',
]
assembler = VectorAssembler(outputCol="features", inputCols=features)
df09_new = assembler.transform(df09)

# Preview the assembled vectors next to the target, without truncating the cells.
features_preview = df09_new.select("features", "mort_next")
features_preview.show(truncate=False)


+------------------+---------+
|features          |mort_next|
+------------------+---------+
|[0.0,2.0,0.0]     |1        |
|[0.0,2.0,5140.0]  |0        |
|[0.0,2.0,1110.0]  |0        |
|[5.0,1.0,350.0]   |0        |
|[12.0,2.0,1050.0] |0        |
|[12.0,2.0,0.0]    |0        |
|[0.0,2.0,7350.0]  |0        |
|[12.0,2.0,10250.0]|0        |
|[12.0,2.0,0.0]    |0        |
|[0.0,2.0,0.0]     |0        |
|[12.0,2.0,740.0]  |0        |
|[0.0,2.0,0.0]     |0        |
|[12.0,2.0,9740.0] |0        |
|[0.0,2.0,100.0]   |0        |
|[0.0,2.0,1880.0]  |0        |
|[0.0,2.0,40.0]    |0        |
|[0.0,2.0,350.0]   |0        |
|[0.0,2.0,1480.0]  |0        |
|[0.0,2.0,4260.0]  |0        |
|[12.0,2.0,1230.0] |0        |
+------------------+---------+
only showing top 20 rows

In [17]:
# R-style formula: mort_next is the response, the three columns on the right are
# the predictors. The fitted model writes them to "label" and "features".
formula = RFormula(
    formula="mort_next ~ BENE_HMO_CVRAGE_TOT_MONS + SP_CNCR + TOTAL_DRUG_EXP",
    featuresCol="features",
    labelCol="label",
)

# Fit on df09, the null-filled frame that the pipeline is fit on further down,
# instead of bene2009. bene2009 still carries null TOTAL_DRUG_EXP values and has
# its mort_next column renamed to "label" in the following cell, so fitting on it
# made this cell fail when re-run after that rename.
RForm_model = formula.fit(df09)

In [18]:
# Rename the target column to "label" so the frames expose it under the same
# name that the RFormula above uses for its label column.
df09_new=df09_new.withColumnRenamed("mort_next", "label")
bene2009=bene2009.withColumnRenamed("mort_next", "label")

In [19]:
# Logistic regression with at most 10 iterations and a regularization parameter of 0.001.
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Train directly on df09_new, which carries the assembled "features" vector and the
# renamed "label" column.
lrModel = lr.fit(df09_new)


In [20]:
# Two-stage pipeline: the already-fitted RFormula model builds features/label,
# then the logistic-regression estimator is trained on its output.
pipeline_stages = [RForm_model, lr]
pipeline = Pipeline(stages=pipeline_stages)

# df09 still has the raw mort_next column that the formula stage consumes.
pip_model = pipeline.fit(df09)


In [21]:
# Training summary of the standalone logistic-regression model.
trainingSummary = lrModel.summary
# Print the attributes available on the summary object (exploratory output).
print(dir(trainingSummary))
# Accuracy on the training data; shown as the cell's result.
trainingSummary.accuracy

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_call_java', '_create_from_java_class', '_java_obj', '_new_java_array', '_new_java_obj', 'accuracy', 'areaUnderROC', 'fMeasureByLabel', 'fMeasureByThreshold', 'falsePositiveRateByLabel', 'featuresCol', 'labelCol', 'labels', 'objectiveHistory', 'pr', 'precisionByLabel', 'precisionByThreshold', 'predictionCol', 'predictions', 'probabilityCol', 'recallByLabel', 'recallByThreshold', 'roc', 'totalIterations', 'truePositiveRateByLabel', 'weightedFMeasure', 'weightedFalsePositiveRate', 'weightedPrecision', 'weightedRecall', 'weightedTruePositiveRate']
0.9837346557474376

In [22]:
# Recall reported separately for each label value (one list entry per label).
trainingSummary.recallByLabel

[1.0, 0.0]

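## The logistic regression model has an accuracy of 0.98, but its recall is 1.0 only for label 0 and 0.0 for label 1 (death in the following year), i.e. it never predicts a death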

In [23]:
# A single hand-made beneficiary used to smoke-test the fitted models.
sample_columns = ["BENE_HMO_CVRAGE_TOT_MONS", "SP_CNCR", "TOTAL_DRUG_EXP"]
sample_row = [5.0, 1.0, 35000.0]
data = spark.createDataFrame([sample_row], sample_columns)

# Run the row through the full pipeline and show only the predicted class.
pipeline_prediction = pip_model.transform(data).select("prediction")
pipeline_prediction.show()


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 36916)
----------------------------------------
+----------+
|prediction|
+----------+
|       0.0|
+----------+

Traceback (most recent call last):
  File "/usr/lib64/python3.4/socketserver.py", line 305, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.4/socketserver.py", line 331, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.4/socketserver.py", line 673, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 263, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 238, in poll
    if func():
  File "/usr/lib/spark/p

In [24]:
# The standalone model expects a "features" vector, so assemble it first ...
data_2 = assembler.transform(data)

# ... then score the row and show only the predicted class.
lr_prediction = lrModel.transform(data_2).select("prediction")
lr_prediction.show()


+----------+
|prediction|
+----------+
|       0.0|
+----------+

### Save the models locally and in S3

In [39]:
# Persist the fitted pipeline to a relative path and to S3.
# write().overwrite() replaces a model already stored at the target path, so this
# cell can be re-run; a plain save() raises when the path already exists.
pip_model.write().overwrite().save("models/pipeline_model")
pip_model.write().overwrite().save("s3://final-project-data/models/pipeline_model")

In [42]:
# Persist the standalone logistic-regression model to a cluster path and to S3.
# write().overwrite() replaces a model already stored at the target path, so this
# cell can be re-run; a plain save() raises when the path already exists.
# NOTE(review): this path is absolute ("/models/...") while the pipeline model is
# saved under the relative "models/..." - confirm which location is intended.
lrModel.write().overwrite().save("/models/lrModel")
lrModel.write().overwrite().save("s3://final-project-data/models/lrModel")

In [40]:
# Reload the fitted pipeline from the S3 location it was saved to.
read_pip = PipelineModel.load("s3://final-project-data/models/pipeline_model")

In [41]:
# Score the sample row with the reloaded pipeline to check it behaves like pip_model.
read_pip.transform(data).select("prediction").show()


+----------+
|prediction|
+----------+
|       0.0|
+----------+

In [35]:
# Reload the logistic-regression model from the S3 location it is saved to.
# The previous path "lrModel" is not written by any cell in this notebook (the
# model is saved to "/models/lrModel" and to S3), so the load failed on a fresh run.
read_lrModel = LogisticRegressionModel.load("s3://final-project-data/models/lrModel")

In [36]:
# Score the pre-assembled sample row with the reloaded model to check it behaves like lrModel.
read_lrModel.transform(data_2).select("prediction").show()


+----------+
|prediction|
+----------+
|       0.0|
+----------+