In [1]:
#DATABRICKS Visualizations and Explorations
import os
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql.functions import col, udf

In [2]:
# Read the yearly CSV files for 2005 through 2015 (eleven files, one per year).
def _read_year_csv(year):
    """Load one year's CSV from /FileStore/tables and cache the DataFrame.

    Parameters
    ----------
    year : int or str
        Four-digit year used to build the path
        '/FileStore/tables/<year>_data.csv'.

    Returns
    -------
    The cached Spark DataFrame read with a header row, inferred schema, and
    leading/trailing whitespace ignored in values.
    """
    return spark.read.csv(
        path='/FileStore/tables/{}_data.csv'.format(year),
        header=True,
        inferSchema=True,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
    ).cache()


# One variable per year is kept because later cells reference these names.
year2005 = _read_year_csv(2005)
year2006 = _read_year_csv(2006)
year2007 = _read_year_csv(2007)
year2008 = _read_year_csv(2008)
year2009 = _read_year_csv(2009)
year2010 = _read_year_csv(2010)
year2011 = _read_year_csv(2011)
year2012 = _read_year_csv(2012)
year2013 = _read_year_csv(2013)
year2014 = _read_year_csv(2014)
year2015 = _read_year_csv(2015)


In [3]:
# Merging all 11 years data into dataframe
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
    """Union any number of DataFrames, left to right, into a single DataFrame.

    Uses ``DataFrame.union`` (the replacement for ``unionAll``, which has been
    deprecated since Spark 2.0). Like ``unionAll``, it matches columns by
    position and keeps duplicate rows.

    Parameters
    ----------
    *dfs : DataFrames to combine; at least one is required.

    Returns
    -------
    The combined DataFrame (the sole input itself when only one is given).

    Raises
    ------
    TypeError
        If called with no DataFrames (raised by ``reduce`` on an empty sequence).
    """
    return reduce(lambda left, right: left.union(right), dfs)

# Stack the eleven yearly DataFrames (2005-2015) into one DataFrame.
MergeData = unionAll(
    year2005, year2006, year2007, year2008,
    year2009, year2010, year2011, year2012,
    year2013, year2014, year2015
)

In [4]:
#Count of Merged Data
# Returns the number of rows in the unioned 2005-2015 DataFrame; it is the
# last expression of the cell, so the value is shown as the cell output.
MergeData.count()

In [5]:
# Expose the merged DataFrame to SQL cells under the name "mergedTable".
from pyspark.sql import Row
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; re-running the cell simply replaces the existing view.
MergeData.createOrReplaceTempView("mergedTable")

In [6]:
# Visualization: male vs female deaths by resident status.
# Counts rows per (resident_status, sex) pair, ordered by resident_status then sex.
# count(sex) skips rows where sex is NULL.
%sql select resident_status,sex, count(sex) from mergedTable group by resident_status,sex order by resident_status,sex

resident_status,sex,count(sex)
1,F,11419032
1,M,11030268
2,F,2027162
2,M,2288859
3,F,398187
3,M,506845
4,F,16495
4,M,33825


In [7]:
# Visualization: male vs female deaths by month of the year.
# The query in the next cell counts rows per (month_of_death, sex) pair, ordered
# by month then sex. count(sex) skips rows where sex is NULL.

In [8]:

%sql select month_of_death,sex, count(sex) from mergedTable group by month_of_death,sex order by month_of_death,sex

month_of_death,sex,count(sex)
1,F,1302416
1,M,1269199
2,F,1177388
2,M,1147291
3,F,1262972
3,M,1231322
4,F,1150376
4,M,1145743
5,F,1134538
5,M,1144327


In [9]:
#Read Disease description CSV (ICD10)

In [10]:
# Load the ICD-10 lookup CSV with the same reader options used elsewhere in
# this notebook (header row, inferred schema, whitespace-trimmed values) and
# cache the resulting DataFrame.
icd10 = spark.read.csv(
    path='/FileStore/tables/ICD10.csv',
    header=True,
    inferSchema=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
).cache()

In [11]:
#Create SQL table of disease descriptions

In [12]:

# Expose the ICD-10 lookup DataFrame to SQL cells as "icd10".
# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0).
icd10.createOrReplaceTempView("icd10")


In [13]:
# Top 20 (cause of death, sex) combinations by number of deaths.
# The query in the next cell inner-joins mergedTable to icd10 on the ICD-10
# code, so deaths whose code has no match in icd10 are left out. Because sex is
# part of the grouping, the same cause can appear once for M and once for F.
# NOTE(review): assumes icd10.code is unique; duplicate codes would inflate the counts - confirm.

In [14]:
%sql select  m.sex,m.icd_code_10th_revision,i.description, count(m.sex) from mergedTable m,icd10 i where i.code=m.icd_code_10th_revision group by i.description,m.icd_code_10th_revision,m.sex order by count(m.sex) DESC,m.sex  LIMIT 20

sex,icd_code_10th_revision,description,count(sex)
M,I219,"Acute myocardial infarction, unspecified",765648
F,G309,"Alzheimer's disease, unspecified",626180
F,I219,"Acute myocardial infarction, unspecified",622679
F,J449,"Chronic obstructive pulmonary disease, unspecified",602655
M,J449,"Chronic obstructive pulmonary disease, unspecified",534241
M,C61,Malignant neoplasm of prostate,311877
F,J189,"Pneumonia, unspecified organism Y",276400
M,G309,"Alzheimer's disease, unspecified",270020
M,J189,"Pneumonia, unspecified organism Y",236009
M,C189,"Malignant neoplasm of colon, unspecified",228669


In [15]:
# Top 20 causes of death overall (both sexes combined).
# The query in the next cell inner-joins mergedTable to icd10 on the ICD-10
# code, groups by code and description, and keeps the 20 largest counts.
# Deaths whose code has no match in icd10 are left out.
# NOTE(review): assumes icd10.code is unique; duplicate codes would inflate the counts - confirm.

In [16]:
%sql select  m.icd_code_10th_revision,i.description, count(m.icd_code_10th_revision) 
from mergedTable m,icd10 i where i.code=m.icd_code_10th_revision group by i.description,m.icd_code_10th_revision 
order by count(m.icd_code_10th_revision) DESC LIMIT 20

icd_code_10th_revision,description,count(icd_code_10th_revision)
I219,"Acute myocardial infarction, unspecified",1388327
J449,"Chronic obstructive pulmonary disease, unspecified",1136896
G309,"Alzheimer's disease, unspecified",896200
J189,"Pneumonia, unspecified organism Y",512409
C189,"Malignant neoplasm of colon, unspecified",451882
C259,"Malignant neoplasm of pancreas, unspecified",403164
A419,"Sepsis, unspecified organism",368875
C61,Malignant neoplasm of prostate,311877
I119,Hypertensive heart disease without heart failure,251086
G20,Parkinson's disease,246814


In [17]:
# Method of disposition: number of deaths per year and disposition method.
# The query in the next cell maps the one-letter method_of_disposition codes
# (C, B, D, E, O, R, U) to readable labels. The CASE has no ELSE branch, so any
# other code, or a NULL, is grouped under a NULL label.
# Rows are ordered by year, then by ascending count.

In [18]:
%sql SELECT current_data_year AS Year,CASE method_of_disposition WHEN 'C' THEN 'Cremation' WHEN 'B' THEN 'Burial' WHEN 'D' THEN 'Donation' WHEN 'E' THEN 'Entombment' WHEN 'O' THEN 'Other' WHEN 'R' THEN 'RemovedFromUSA' WHEN 'U' THEN 'Unknown'END AS MethodofDisposition,COUNT(*) AS Count FROM mergedTable GROUP BY 1, 2 ORDER BY 1, 3

Year,MethodofDisposition,Count
2005,Other,2199
2005,Donation,4795
2005,Entombment,21247
2005,RemovedFromUSA,31954
2005,Cremation,350018
2005,Burial,553202
2005,Unknown,1489091
2006,Other,2252
2006,Donation,6883
2006,Entombment,23412


In [19]:
# Manner of death per month: number of deaths per (month_of_death, manner) pair.
# The query in the next cell maps manner_of_death codes 0-7 to readable labels.
# Any other value, including NULL, falls into 'OTHER'.
# Rows are ordered by month, then alphabetically by label.
# NOTE(review): the label 'Could not be determine' looks like a typo for
# 'Could not be determined'; it is left unchanged because it is emitted in the result.

In [20]:
%sql SELECT month_of_death AS Month, CASE manner_of_death WHEN '0' THEN 'Not Specified' WHEN '1' THEN 'Accident' WHEN '2' THEN 'Suicide' WHEN '3' THEN 'Homicide' WHEN '4' THEN 'Pending investigation' WHEN '5' THEN 'Could not be determine' WHEN '6' THEN 'Self-Inflicted' WHEN '7' THEN 'Natural' ELSE 'OTHER' END AS MannerOfDeath,COUNT(*) AS Count FROM mergedTable GROUP BY 1, 2 ORDER BY 1,2
  

Month,MannerOfDeath,Count
1,Accident,110563
1,Could not be determine,10808
1,Homicide,16172
1,Natural,1859004
1,OTHER,536266
1,Pending investigation,4261
1,Suicide,34541
2,Accident,101932
2,Could not be determine,9667
2,Homicide,13474


In [21]:
#Dropping Columns

In [22]:
MergeData.columns
# Remove the twenty record-axis condition columns
# (record_condition_1 ... record_condition_20).
MergeData = MergeData.drop(
    *['record_condition_{}'.format(i) for i in range(1, 21)]
)
 

In [23]:
# Remove the 113/130/39 cause-recode columns, the entity-axis condition count,
# and entity_condition_1 ... entity_condition_19.
MergeData = MergeData.drop(
    '113_cause_recode',
    '130_infant_cause_recode',
    '39_cause_recode',
    'number_of_entity_axis_conditions',
    *['entity_condition_{}'.format(i) for i in range(1, 20)]
)

In [24]:
# Remove the ICD-10 code, the alternative age encodings, the last
# entity-condition column, and both education columns.
MergeData = MergeData.drop(
    'icd_code_10th_revision',
    'age_recode_27',
    'age_recode_12',
    'detail_age',
    'entity_condition_20',
    'education_2003_revision',
    'education_1989_revision'
)

In [25]:
# Keep only rows whose method of disposition is burial ('B') or cremation ('C');
# every other code, and NULL, is filtered out.
MergeData = MergeData.filter(MergeData.method_of_disposition.isin('B', 'C'))

In [26]:
# Remove the remaining columns that are not used as model inputs.
# 'age_substitution_flag' was listed twice in the original call; it is now
# listed once. The columns removed are the same.
MergeData = MergeData.drop(
    'detail_age_type',
    'age_substitution_flag',
    'infant_age_recode_22',
    'day_of_week_of_death',
    'current_data_year',
    '358_cause_recode',
    'number_of_record_axis_conditions',
    'hispanic_origin',
    'race_recode_5'
)

In [27]:
#Columns of Merge Data

In [28]:
# Show the column names that remain in MergeData at this point.
MergeData.columns

In [29]:
# Columns treated as categorical model inputs. The order is significant:
# the feature vector is assembled from these columns in this order.
categoricalColumns = [
    "resident_status",
    "month_of_death",
    "sex",
    "age_recode_52",
    "place_of_death_and_decedents_status",
    "marital_status",
    "injury_at_work",
    "manner_of_death",
    "autopsy",
    "activity_code",
    "place_of_injury_for_causes_w00_y34_except_y06_and_y07_",
    "race",
    "race_recode_3",
    "hispanic_originrace_recode",
]

In [30]:
#Null Imputations

In [31]:
# Replace NULLs column by column with a fixed placeholder code.
# NOTE(review): the placeholder values (12, 11, 999) are presumably codes that
# do not collide with real category values - confirm against the data dictionary.
MergeData = MergeData.fillna({
    'place_of_injury_for_causes_w00_y34_except_y06_and_y07_': 12,
    'activity_code': 11,
    'manner_of_death': 999,
    'place_of_death_and_decedents_status': 999,
})


In [32]:
#Pipeline- String Indexing, encoding and Vector Assembling

In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

In [34]:
# Collect the pipeline stages. Nothing is fitted here; the stages run together
# when the pipeline is fitted later.
stages = []
for categoricalCol in categoricalColumns:
    # Step 1: map each category value to a numeric index ("<col>Index").
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + "Index",
    )
    # Step 2: one-hot encode that index into a vector column ("<col>classVec").
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages.extend([stringIndexer, encoder])

In [35]:
# Index the target column: method_of_disposition becomes the numeric "label"
# column used for training.
label_stringIdx = StringIndexer(
    inputCol='method_of_disposition',
    outputCol="label",
)
stages.append(label_stringIdx)

In [36]:
# Concatenate every one-hot "<col>classVec" column, in categoricalColumns
# order, into a single "features" vector column.
assemblerInputs = ["{}classVec".format(name) for name in categoricalColumns]
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features",
)
stages.append(assembler)

In [37]:
from pyspark.ml.classification import LogisticRegression
  
# Build the preprocessing pipeline from the collected stages, fit it on
# MergeData, and apply the fitted model to the same data to add the "label"
# and "features" columns.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(MergeData)
preppedDataDF = pipelineModel.transform(MergeData)

In [38]:
#Choose the column to be predicted (label) and the column (features) used for predicting

In [39]:
# Keep only the two columns the classifier needs: the target and the
# assembled feature vector.
selectedcols = ["label", "features"]
dataset = preppedDataDF.select(*selectedcols)

In [40]:
#Splitting the dataset into training set and test set

In [41]:
# Split with weights 0.7 (train) and 0.3 (test); the fixed seed fixes the
# random sampling.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

In [42]:
#Logistic Regression model for predicting method-of-disposition (burial vs cremation)

In [43]:
from pyspark.ml.classification import LogisticRegression

# Configure the logistic-regression estimator (at most 10 iterations), reading
# the target from "label" and the inputs from "features".
lr = LogisticRegression(
    maxIter=10,
    featuresCol="features",
    labelCol="label",
)

# Fit the estimator on the training split.
lrModel = lr.fit(trainingData)

In [44]:
# Apply the fitted logistic-regression model to the held-out test split.
predictions = lrModel.transform(testData)

In [45]:
# Comparing the predicted value against actual value

In [46]:
# Narrow the predictions to the actual label, the predicted label, and the
# class probabilities so they can be compared side by side.
# NOTE(review): `selected` is not displayed or referenced again in the visible
# cells - consider display(selected) if the comparison should be shown.
selected = predictions.select("label", "prediction", "probability")

In [47]:
#Model evaluation (area under the curve and accuracy)

In [48]:
#Area under Curve
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test-set predictions with the binary-classification evaluator,
# reading the raw scores from "rawPrediction"; the metric value is the cell output.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
)
evaluator.evaluate(predictions)

In [49]:
#Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compute accuracy of the test-set predictions; the value is the cell output.
# Note that this rebinds `evaluator`, replacing the evaluator from the AUC cell.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    predictionCol="prediction",
)
evaluator.evaluate(predictions)

In [50]:
# Merging all 11 years data into dataframe
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
    """Union any number of DataFrames, left to right, into a single DataFrame.

    Uses ``DataFrame.union`` (the replacement for ``unionAll``, which has been
    deprecated since Spark 2.0). Like ``unionAll``, it matches columns by
    position and keeps duplicate rows.

    Parameters
    ----------
    *dfs : DataFrames to combine; at least one is required.

    Returns
    -------
    The combined DataFrame (the sole input itself when only one is given).

    Raises
    ------
    TypeError
        If called with no DataFrames (raised by ``reduce`` on an empty sequence).
    """
    return reduce(lambda left, right: left.union(right), dfs)

# Rebuild the full 2005-2015 union from the yearly DataFrames under a separate
# name and show it.
JoinedData = unionAll(
    year2005, year2006, year2007, year2008,
    year2009, year2010, year2011, year2012,
    year2013, year2014, year2015
)
display(JoinedData)

resident_status,education_1989_revision,education_2003_revision,education_reporting_flag,month_of_death,sex,detail_age_type,detail_age,age_substitution_flag,age_recode_52,age_recode_27,age_recode_12,infant_age_recode_22,place_of_death_and_decedents_status,marital_status,day_of_week_of_death,current_data_year,injury_at_work,manner_of_death,method_of_disposition,autopsy,activity_code,place_of_injury_for_causes_w00_y34_except_y06_and_y07_,icd_code_10th_revision,358_cause_recode,113_cause_recode,130_infant_cause_recode,39_cause_recode,number_of_entity_axis_conditions,entity_condition_1,entity_condition_2,entity_condition_3,entity_condition_4,entity_condition_5,entity_condition_6,entity_condition_7,entity_condition_8,entity_condition_9,entity_condition_10,entity_condition_11,entity_condition_12,entity_condition_13,entity_condition_14,entity_condition_15,entity_condition_16,entity_condition_17,entity_condition_18,entity_condition_19,entity_condition_20,number_of_record_axis_conditions,record_condition_1,record_condition_2,record_condition_3,record_condition_4,record_condition_5,record_condition_6,record_condition_7,record_condition_8,record_condition_9,record_condition_10,record_condition_11,record_condition_12,record_condition_13,record_condition_14,record_condition_15,record_condition_16,record_condition_17,record_condition_18,record_condition_19,record_condition_20,race,bridged_race_flag,race_imputation_flag,race_recode_3,race_recode_5,hispanic_origin,hispanic_originrace_recode
1,11,,0,1,F,1,45,,35,15,7,,1,M,2,2005,U,7.0,U,N,,,C439,98,28,,15,1,11C439,,,,,,,,,,,,,,,,,,,,1,C439,,,,,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,13,,0,1,M,1,61,,38,18,8,,1,D,7,2005,U,7.0,U,N,,,J439,266,84,,28,1,11J439,,,,,,,,,,,,,,,,,,,,1,J439,,,,,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,12,,0,1,F,1,79,,41,21,10,,6,D,1,2005,U,7.0,U,N,,,I698,239,70,,24,5,11R628,21I698,61J449,62M199,63R568,,,,,,,,,,,,,,,,5,I698,J449,M199,R568,R628,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,12,,0,1,M,1,50,,36,16,7,,1,S,4,2005,U,7.0,U,N,,,E119,159,46,,16,4,11I469,61E119,62I500,63K862,,,,,,,,,,,,,,,,,4,E119,I469,I500,K862,,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,14,,0,1,F,1,68,,39,19,9,,1,M,2,2005,U,7.0,U,N,,,C349,93,27,,8,3,11C349,61F179,62J449,,,,,,,,,,,,,,,,,,3,C349,F179,J449,,,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,3,,0,1,F,1,89,,43,23,11,,6,W,7,2005,U,7.0,U,N,,,I679,239,70,,24,3,11N19,21I679,61F03,,,,,,,,,,,,,,,,,,3,F03,I679,N19,,,,,,,,,,,,,,,,,,3,,,2,3,100,8
1,12,,0,1,F,1,68,,39,19,9,,1,D,7,2005,U,7.0,U,N,,,J439,266,84,,28,1,11J439,,,,,,,,,,,,,,,,,,,,1,J439,,,,,,,,,,,,,,,,,,,,1,,,1,1,100,6
1,12,,0,1,M,1,61,,38,18,8,,1,S,6,2005,U,7.0,U,N,,,J449,267,86,,28,3,11I279,21J449,61F102,,,,,,,,,,,,,,,,,,3,F102,I279,J449,,,,,,,,,,,,,,,,,,3,,,2,3,100,8
1,14,,0,1,F,1,73,,40,20,9,,1,D,6,2005,U,7.0,U,N,,,J439,266,84,,28,1,11J439,,,,,,,,,,,,,,,,,,,,1,J439,,,,,,,,,,,,,,,,,,,,1,,,1,1,100,6
3,8,,0,1,F,1,85,,43,23,11,,7,W,6,2005,U,7.0,U,N,,,C80,125,43,,15,1,11C80,,,,,,,,,,,,,,,,,,,,1,C80,,,,,,,,,,,,,,,,,,,,1,,,1,1,100,6


In [51]:
#Replacing null values in manner_of_death

In [52]:
# Fill missing manner_of_death values with the placeholder 12, then keep only
# rows whose manner_of_death is 2.
# The original second statement filtered JoinedData again, which silently
# discarded the fillna result; the filter now runs on the filled DataFrame.
# The selected rows are unchanged, because rows filled with 12 never equal 2.
# NOTE(review): an earlier cell fills the same column with 999 rather than 12 -
# confirm which placeholder is intended.
JoinData = JoinedData.fillna({'manner_of_death': 12})
JoinData = JoinData.filter(JoinData.manner_of_death == '2')


In [53]:
# Expose the filtered JoinData DataFrame to SQL cells as "jointTable".
# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0).
JoinData.createOrReplaceTempView("jointTable")

In [54]:
# Total suicides per year, 2005-2015.
# The query in the next cell counts jointTable rows per year. The CASE labels
# manner_of_death '2' as 'Suicide'; there is no ELSE branch, so any other value
# would get a NULL label.
# Rows are ordered by year, then by count.

In [55]:
%sql SELECT current_data_year AS Year,CASE manner_of_death WHEN '2' THEN 'Suicide' END AS TotalSuicidesCommited,COUNT(*) AS Count FROM jointTable GROUP BY 1, 2 ORDER BY 1, 3

Year,TotalSuicidesCommited,Count
2005,Suicide,32934
2006,Suicide,33562
2007,Suicide,34827
2008,Suicide,36251
2009,Suicide,37205
2010,Suicide,38710
2011,Suicide,39878
2012,Suicide,40929
2013,Suicide,41509
2014,Suicide,43139
