In [None]:
!pip3 install plotly  # For plotting data
!pip3 install scipy
!pip3 install sklearn # Machine Learning Algorithms (Logistic Regression, Random Forest, Decision Tree)
!pip3 install seaborn # Correlation Matrix

In [None]:
# Download the dataset from Kaggle
# Direct Download link to the dataset: https://www.kaggle.com/andrewmvd/heart-failure-clinical-data/download

In [None]:
# Load the dataset with PySpark
import pyspark
from pyspark.sql import SparkSession

# Reuse the session the driver already provides (e.g. the `pyspark` shell) or start a
# new one, so this cell also runs on a fresh kernel where `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# Relative path: resolved against the directory the kernel was started from.
DATASET_PATH = "Desktop/heart_failure_clinical_records_dataset.csv"

# First row is the header; column types are inferred from the data.
df = spark.read.csv(DATASET_PATH, header=True, inferSchema=True)

In [None]:
df.printSchema()

In [None]:
print(df.columns)

In [None]:
df.show(5)
print('Total records in dataset are',df.count())

In [None]:
df.groupby('DEATH_EVENT').count().toPandas()

In [None]:
## Identify if there are any Nulls or missing values (NaNs)
from pyspark.sql.functions import isnull, when, count, col
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [None]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

In [None]:
!pip3 install pyarrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
heart_data = df.select("*").toPandas()

In [None]:
# First rows of the pandas copy of the dataset
heart_data.head()

In [None]:
# Summary statistics of the columns
heart_data.describe()

In [None]:
# (number of rows, number of columns)
heart_data.shape

In [None]:
# EDA - Exploratory Data Analysis

# One histogram per continuous variable
import matplotlib.pyplot as plt

# Default size for every matplotlib figure created from here on
plt.rcParams['figure.figsize'] = [20, 20]

continuous_columns = [
    'age',
    'creatinine_phosphokinase',
    'ejection_fraction',
    'platelets',
    'serum_creatinine',
    'serum_sodium',
]
heart_data[continuous_columns].hist()

In [None]:
# Distribution plot of patient age
import plotly.figure_factory as ply

age_values = heart_data["age"]
fig1 = ply.create_distplot(hist_data=[age_values], group_labels=["age"])
fig1.update_layout(title_text='Age Distribution plot')
fig1.show()

In [None]:
# Age vs Sex Distribution Plot
import plotly.express as px
fig2 = px.histogram(heart_data,x='age',color='sex',nbins=50,title='Patient Age vs Sex Distribution')
fig2.show()

In [None]:
# Age vs Death Event Distribution Plot 
fig3 = px.histogram(heart_data,x='age',color='DEATH_EVENT',nbins=50,title="Patient's Age vs Death Event Distribution")
fig3.show()

In [None]:
# Age vs Death event box plot
fig4 = px.box (heart_data,x="DEATH_EVENT",y="age",points='all',title="Patient's Age vs Death events Box Plot")
fig4.show()

In [None]:
# Sex vs Death event Pie Chart
fig5 = px.pie(heart_data, values='DEATH_EVENT',names='sex', title="Pie Chart distribution of Death event by Gender",width=500, height=500)
fig5.show()

In [None]:
px.histogram(heart_data,x='creatinine_phosphokinase',color='DEATH_EVENT',nbins=50,title="Survival rate of patients with CPK")

In [None]:
px.pie(heart_data, values='DEATH_EVENT',names='anaemia', title="Survival rate of patients with Anaemia",width=500, height=500)

In [None]:
px.pie(heart_data, values='DEATH_EVENT',names='diabetes', title="Survival rate of patients with Diabetes",width=500, height=500)

In [None]:
px.pie(heart_data, values='DEATH_EVENT',names='high_blood_pressure', title="Survival rate of patients with High Blood pressure",width=500, height=500)

In [None]:
px.histogram(heart_data,x='ejection_fraction',color='DEATH_EVENT',nbins=50,title="Survival rate of patients with Ejection Fraction")

In [None]:
px.histogram(heart_data,x='serum_creatinine',color='DEATH_EVENT',nbins=50,title="Survival rate of patients with serum creatinine in blood")

In [None]:
px.histogram(heart_data,x='serum_sodium',color='DEATH_EVENT',nbins=50,title="Survival rate of patients with serum sodium in blood")

In [None]:
px.histogram(heart_data,x='platelets',color='DEATH_EVENT',nbins=50,title="Survival rate vs patient's platelets count")

In [None]:
px.pie(heart_data, values='DEATH_EVENT',names='smoking', title="Survival rate of patients with smoking traces",width=500, height=500)

In [None]:
# Correlation matrix to identify feature relationships and finalize top features

import matplotlib.pyplot as plt
import seaborn as sns

# Pairwise correlation between all columns of the pandas frame
hd_corr_mtrx = heart_data.corr()

# plt.subplots returns (figure, axes) in that order; draw the heatmap on that axes
# explicitly instead of relying on the implicit "current axes".
fig, ax = plt.subplots(figsize=(15, 15))

# Symmetric colour range so that 0 sits at the centre of the diverging colormap
sns.heatmap(hd_corr_mtrx, vmin=-1, vmax=1, cmap='coolwarm', annot=True, ax=ax)
plt.show()

In [None]:
# Correlations with DEATH_EVENT whose absolute value exceeds 0.1
death_event_corr = hd_corr_mtrx['DEATH_EVENT']
death_event_corr[death_event_corr.abs() > 0.1]

In [None]:
# Combine the selected feature columns into one vector column with VectorAssembler
from pyspark.ml.feature import VectorAssembler

# NOTE(review): `time` looks like a follow-up duration; if it is only known after the
# outcome it would leak the label into the features - confirm before relying on it.
main_features = ['age', 'ejection_fraction', 'serum_creatinine', 'serum_sodium', 'time']

VA = VectorAssembler(
    inputCols=main_features,
    outputCol='finalfeatures',
)

# Adds the `finalfeatures` column next to the original columns of `df`
final_df = VA.transform(df)
final_df.show(n=5)

In [None]:
# 60/40 train/test split with a fixed seed
split_weights = [0.6, 0.4]
training_set, test_set = final_df.randomSplit(split_weights, seed=2000)

print(f"Training Dataset Count: {training_set.count()}")
print(f"Test Dataset Count: {test_set.count()}")

In [None]:
# Model Training

# Random Forest Classification Model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

randomforest = RandomForestClassifier(
    labelCol='DEATH_EVENT',
    featuresCol='finalfeatures',
    numTrees=10,
)

# Fit on the training split, then score the held-out split
rcf_mod = randomforest.fit(training_set)
rcf_pred = rcf_mod.transform(test_set)

# Accuracy evaluator; the later model cells reuse `MCE`
MCE = MulticlassClassificationEvaluator(
    labelCol='DEATH_EVENT',
    metricName='accuracy',
)
accuracy_rcf = MCE.evaluate(rcf_pred)

print('Accuracy of Random Forest Model:', accuracy_rcf)
print('Testing error of the Random Forest Model:', 1.0 - accuracy_rcf)

In [None]:
# Decision Tree Classification Model

from pyspark.ml.classification import DecisionTreeClassifier

dectree = DecisionTreeClassifier(labelCol='DEATH_EVENT', featuresCol='finalfeatures', maxDepth=10)

# Fit on the training split, then score the held-out split
dtc_mod = dectree.fit(training_set)
dt_pred = dtc_mod.transform(test_set)

# `MCE` is the accuracy evaluator created in the Random Forest cell
accuracy_dt = MCE.evaluate(dt_pred)

# Labels previously said "Random Forest" (copy-paste slip); report the right model.
print('Accuracy of Decision Tree Model:', accuracy_dt)
print('Testing error of the Decision Tree Model:', (1.0 - accuracy_dt))

In [None]:
# Logistic Regression Classification Model

from pyspark.ml.classification import LogisticRegression

logreg = LogisticRegression(labelCol='DEATH_EVENT', featuresCol='finalfeatures', maxIter=10)

# Fit on the training split, then score the held-out split
lrc_mod = logreg.fit(training_set)
lr_pred = lrc_mod.transform(test_set)

# `MCE` is the accuracy evaluator created in the Random Forest cell
accuracy_lr = MCE.evaluate(lr_pred)

# Labels previously said "Random Forest" (copy-paste slip); report the right model.
print('Accuracy of Logistic Regression Model:', accuracy_lr)
print('Testing error of the Logistic Regression Model:', (1.0 - accuracy_lr))

In [None]:
# Side-by-side test accuracy of the three models
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

MCE = MulticlassClassificationEvaluator(
    labelCol='DEATH_EVENT',
    metricName='accuracy',
)

# (display name, predictions on the test split) for each trained model
model_predictions = (
    ('Random Forest', rcf_pred),
    ('Decision Tree', dt_pred),
    ('Logistic Regression', lr_pred),
)
for model_name, predictions in model_predictions:
    print(f'Accuracy of {model_name} Model:', MCE.evaluate(predictions))