In [1]:
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark
from pyspark.sql.functions import *

role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

# See the SageMaker Spark Github to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
    
spark

In [110]:
bucket = 'mrinal-ml-sagemaker'
import os
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.mllib.util import MLUtils

In [3]:
cdc = spark.read.csv('output.csv', header = True)

In [4]:
cdc = cdc.drop('detail_age_type','detail_age', 'age_substitution_flag','age_recode_27', 'age_recode_12', 'infant_age_recode_22', 'icd_code_10th_revision')

In [5]:
cdc = cdc.drop('record_condition_20' , 'entity_condition_20', 'entity_condition_19', 'entity_condition_18', 'entity_condition_17', 'record_condition_19', 'record_condition_18', 'record_condition_16', 'record_condition_17','record_condition_15','record_condition_14','record_condition_13','record_condition_12','record_condition_11','record_condition_10','record_condition_9','record_condition_8','record_condition_7' , 'record_condition_6', 'record_condition_5', 'entity_condition_16', 'entity_condition_15','entity_condition_14', 'entity_condition_13', 'entity_condition_12', 'entity_condition_11', 'entity_condition_10', 'entity_condition_9','entity_condition_8', 'entity_condition_7', 'entity_condition_6', 'entity_condition_5')

In [6]:
cdc.printSchema()

root
 |-- resident_status: string (nullable = true)
 |-- education_1989_revision: string (nullable = true)
 |-- education_2003_revision: string (nullable = true)
 |-- education_reporting_flag: string (nullable = true)
 |-- month_of_death: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age_recode_52: string (nullable = true)
 |-- place_of_death_and_decedents_status: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- day_of_week_of_death: string (nullable = true)
 |-- current_data_year: string (nullable = true)
 |-- injury_at_work: string (nullable = true)
 |-- manner_of_death: string (nullable = true)
 |-- method_of_disposition: string (nullable = true)
 |-- autopsy: string (nullable = true)
 |-- activity_code: string (nullable = true)
 |-- place_of_injury_for_causes_w00_y34_except_y06_and_y07_: string (nullable = true)
 |-- 358_cause_recode: string (nullable = true)
 |-- 113_cause_recode: string (nullable = true)
 |-- 130_infant_cause_recode: 

In [7]:
cdc = cdc.fillna({'place_of_injury_for_causes_w00_y34_except_y06_and_y07_' : '10'})
cdc = cdc.fillna({'130_infant_cause_recode' : '000'})
cdc = cdc.fillna({'activity_code' : '10'})
cdc = cdc.fillna({'manner_of_death' : '8'})
cdc = cdc.fillna({'Place_of_death_and_decedents_status': '7'})

In [8]:
cdc = cdc.withColumn('method_of_disposition', regexp_replace('method_of_disposition', 'R' , 'O'))
cdc = cdc.withColumn('method_of_disposition', regexp_replace('method_of_disposition', 'E' , 'O'))
cdc = cdc.withColumn('method_of_disposition', regexp_replace('method_of_disposition', 'D' , 'O'))
cdc = cdc.withColumn('method_of_disposition', regexp_replace('method_of_disposition', 'U' , 'O'))
cdc = cdc.withColumn('Place_of_death_and_decedents_status', regexp_replace('Place_of_death_and_decedents_status', '9' , '7'))

In [9]:
#cdc = cdc.withColumn('Place_of_death_and_decedents_status', 
 #                    when(cdc['Place_of_death_and_decedents_status']== 9 , 7).otherwise(cdc['Place_of_death_and_decedents_status']))

In [10]:
#cdc.select('Place_of_death_and_decedents_status').distinct().show()

In [11]:
#cdc.select('place_of_injury_for_causes_w00_y34_except_y06_and_y07_').distinct().show()


In [12]:
#cdc.select("education_1989_revision").distinct().show()

In [13]:
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '00' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '01' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '02' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '03' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '04' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '05' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '06' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '07' , '1'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '08' , '1'))

cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '09' , '2'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '10' , '2'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '11' , '2'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '12' , '3'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '13' , '4'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '14' , '4'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '15' , '5'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '16' , '6'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '17' , '7'))
cdc = cdc.withColumn('education_1989_revision', regexp_replace('education_1989_revision', '99' , '9'))



In [14]:
cdc = cdc.withColumn("education_2003_revision", coalesce(cdc.education_2003_revision,cdc.education_1989_revision))

In [15]:
cdc = cdc.filter(cdc.education_2003_revision.isNotNull())

In [16]:
cdc.groupBy('education_2003_revision').count().orderBy('count', ascending=False).show()

+-----------------------+--------+
|education_2003_revision|   count|
+-----------------------+--------+
|                      3|11329932|
|                      1| 3805467|
|                      4| 3293354|
|                      2| 3226815|
|                      6| 2480719|
|                      5| 1144604|
|                      7| 1106055|
|                      9| 1024952|
|                      8|  308775|
+-----------------------+--------+



In [17]:
cdc.printSchema()

root
 |-- resident_status: string (nullable = true)
 |-- education_1989_revision: string (nullable = true)
 |-- education_2003_revision: string (nullable = true)
 |-- education_reporting_flag: string (nullable = true)
 |-- month_of_death: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age_recode_52: string (nullable = true)
 |-- Place_of_death_and_decedents_status: string (nullable = false)
 |-- marital_status: string (nullable = true)
 |-- day_of_week_of_death: string (nullable = true)
 |-- current_data_year: string (nullable = true)
 |-- injury_at_work: string (nullable = true)
 |-- manner_of_death: string (nullable = false)
 |-- method_of_disposition: string (nullable = true)
 |-- autopsy: string (nullable = true)
 |-- activity_code: string (nullable = false)
 |-- place_of_injury_for_causes_w00_y34_except_y06_and_y07_: string (nullable = false)
 |-- 358_cause_recode: string (nullable = true)
 |-- 113_cause_recode: string (nullable = true)
 |-- 130_infant_cause_reco

In [18]:
cdc = cdc.drop('education_1989_revision', 'education_reporting_flag','bridged_race_flag', 'race_imputation_flag', 
               'hispanic_origin','entity_condition_1','entity_condition_2','entity_condition_3','entity_condition_4',
              'record_condition_1','record_condition_2','record_condition_3','record_condition_4')

In [114]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

In [21]:
from pyspark.sql.types import DoubleType
cdc = cdc.withColumn("number_of_entity_axis_conditions", cdc["number_of_entity_axis_conditions"].cast(DoubleType()))
cdc = cdc.withColumn("number_of_record_axis_conditions", cdc["number_of_record_axis_conditions"].cast(DoubleType()))

In [42]:
indexer = StringIndexer(inputCol="resident_status", outputCol="resident_statusIndex").fit(cdc)
new_df = indexer.transform(cdc)
indexer = StringIndexer(inputCol="education_2003_revision", outputCol="education_2003_revisionIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="month_of_death", outputCol="month_of_deathIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="sex", outputCol="sexIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="age_recode_52", outputCol="age_recode_52Index").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="Place_of_death_and_decedents_status", outputCol="Place_of_death_and_decedents_statusIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="marital_status", outputCol="marital_statusIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="day_of_week_of_death", outputCol="day_of_week_of_deathIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="current_data_year", outputCol="current_data_yearIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="injury_at_work", outputCol="injury_at_workIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="manner_of_death", outputCol="manner_of_deathIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="autopsy", outputCol="autopsyIndex").fit(new_df)
new_df = indexer.transform(new_df)

In [47]:
indexer = StringIndexer(inputCol="activity_code", outputCol="activity_codeIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="place_of_injury_for_causes_w00_y34_except_y06_and_y07_", outputCol="place_of_injury_for_causes_w00_y34_except_y06_and_y07_Index").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="358_cause_recode", outputCol="358_cause_recodeIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="113_cause_recode", outputCol="113_cause_recodeIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="130_infant_cause_recode", outputCol="130_infant_cause_recodeIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="39_cause_recode", outputCol="39_cause_recodeIndex").fit(new_df)
new_df = indexer.transform(new_df)
indexer = StringIndexer(inputCol="race", outputCol="raceIndex").fit(new_df)
new_df =indexer.transform(new_df)
indexer = StringIndexer(inputCol="race_recode_3", outputCol="race_recode_3Index").fit(new_df)
new_df =indexer.transform(new_df)

In [48]:
new_df.printSchema()

root
 |-- resident_status: string (nullable = true)
 |-- education_2003_revision: string (nullable = true)
 |-- month_of_death: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age_recode_52: string (nullable = true)
 |-- Place_of_death_and_decedents_status: string (nullable = false)
 |-- marital_status: string (nullable = true)
 |-- day_of_week_of_death: string (nullable = true)
 |-- current_data_year: string (nullable = true)
 |-- injury_at_work: string (nullable = true)
 |-- manner_of_death: string (nullable = false)
 |-- method_of_disposition: string (nullable = true)
 |-- autopsy: string (nullable = true)
 |-- activity_code: string (nullable = false)
 |-- place_of_injury_for_causes_w00_y34_except_y06_and_y07_: string (nullable = false)
 |-- 358_cause_recode: string (nullable = true)
 |-- 113_cause_recode: string (nullable = true)
 |-- 130_infant_cause_recode: string (nullable = false)
 |-- 39_cause_recode: string (nullable = true)
 |-- number_of_entity_axis_conditi

In [51]:
new_df1 = new_df.drop('resident_status','education_2003_revision','month_of_death','sex','age_recode_52','Place_of_death_and_decedents_status',
                     'marital_status','day_of_week_of_death','current_data_year','injury_at_work','manner_of_death','method_of_disposition',
                     'autopsy','activity_code','activity_code','place_of_injury_for_causes_w00_y34_except_y06_and_y07_','358_cause_recode',
                     '113_cause_recode','130_infant_cause_recode','39_cause_recode','race','race_recode_3','race_recode_5','hispanic_originrace_recode')

In [52]:
new_df1.printSchema().

root
 |-- number_of_entity_axis_conditions: double (nullable = true)
 |-- number_of_record_axis_conditions: double (nullable = true)
 |-- resident_statusIndex: double (nullable = true)
 |-- education_2003_revisionIndex: double (nullable = true)
 |-- month_of_deathIndex: double (nullable = true)
 |-- sexIndex: double (nullable = true)
 |-- age_recode_52Index: double (nullable = true)
 |-- Place_of_death_and_decedents_statusIndex: double (nullable = true)
 |-- marital_statusIndex: double (nullable = true)
 |-- day_of_week_of_deathIndex: double (nullable = true)
 |-- current_data_yearIndex: double (nullable = true)
 |-- injury_at_workIndex: double (nullable = true)
 |-- manner_of_deathIndex: double (nullable = true)
 |-- autopsyIndex: double (nullable = true)
 |-- method_of_dispositionIndex: double (nullable = true)
 |-- activity_codeIndex: double (nullable = true)
 |-- place_of_injury_for_causes_w00_y34_except_y06_and_y07_Index: double (nullable = true)
 |-- 358_cause_recodeIndex: double

In [56]:
new_df1 = new_df1.rdd

In [90]:
d = new_df1.map(lambda line: LabeledPoint(line[14],[line[0:13]+line[15:66]]))

In [91]:
d.take(4)

[LabeledPoint(1.0, [1.0,1.0,0.0,3.0,0.0,0.0,10.0,0.0,0.0,2.0,7.0,0.0,0.0,0.0,0.0,62.0,52.0,0.0,4.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,1.0,0.0,2.0,0.0,1.0,6.0,0.0,2.0,0.0,7.0,0.0,0.0,0.0,0.0,57.0,47.0,0.0,5.0,0.0,0.0]),
 LabeledPoint(1.0, [5.0,5.0,0.0,0.0,0.0,0.0,2.0,2.0,2.0,5.0,7.0,0.0,0.0,0.0,0.0,24.0,3.0,0.0,6.0,0.0,0.0]),
 LabeledPoint(1.0, [4.0,4.0,0.0,0.0,0.0,1.0,8.0,0.0,3.0,4.0,7.0,0.0,0.0,0.0,0.0,6.0,8.0,0.0,9.0,0.0,0.0])]

In [66]:
new_df1.take(4)

[Row(number_of_entity_axis_conditions=1.0, number_of_record_axis_conditions=1.0, resident_statusIndex=0.0, education_2003_revisionIndex=3.0, month_of_deathIndex=0.0, sexIndex=0.0, age_recode_52Index=10.0, Place_of_death_and_decedents_statusIndex=0.0, marital_statusIndex=0.0, day_of_week_of_deathIndex=2.0, current_data_yearIndex=7.0, injury_at_workIndex=0.0, manner_of_deathIndex=0.0, autopsyIndex=0.0, method_of_dispositionIndex=1.0, activity_codeIndex=0.0, place_of_injury_for_causes_w00_y34_except_y06_and_y07_Index=0.0, 358_cause_recodeIndex=62.0, 113_cause_recodeIndex=52.0, 130_infant_cause_recodeIndex=0.0, 39_cause_recodeIndex=4.0, raceIndex=0.0, race_recode_3Index=0.0),
 Row(number_of_entity_axis_conditions=1.0, number_of_record_axis_conditions=1.0, resident_statusIndex=0.0, education_2003_revisionIndex=2.0, month_of_deathIndex=0.0, sexIndex=1.0, age_recode_52Index=6.0, Place_of_death_and_decedents_statusIndex=0.0, marital_statusIndex=2.0, day_of_week_of_deathIndex=0.0, current_data_

In [122]:
training, test = d.randomSplit([0.7, 0.3], seed=11)

# Run training algorithm to build the model
model = LogisticRegressionWithLBFGS.train(training, numClasses =3, iterations = 2)

In [123]:
# Compute raw scores on the test set
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

In [125]:
# Instantiate metrics object
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)




Summary Stats
Precision = 0.4487840844840647
Recall = 0.4487840844840647
F1 Score = 0.4487840844840647


