In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Source").getOrCreate()

In [4]:
from __future__ import print_function

import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from imblearn.over_sampling import SMOTE
import warnings
import math 
import pyspark.sql.functions as F

In [5]:
rawData = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("./healthcare-dataset-stroke-data.csv")
rawData.printSchema()
rawData.count()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



5110

In [6]:
rawData.groupBy('gender').count().orderBy('gender', ascending=False).show()

+------+-----+
|gender|count|
+------+-----+
| Other|    1|
|  Male| 2115|
|Female| 2994|
+------+-----+



In [7]:
rawData.groupBy('hypertension').count().orderBy('hypertension', ascending=False).show()

+------------+-----+
|hypertension|count|
+------------+-----+
|           1|  498|
|           0| 4612|
+------------+-----+



In [8]:
rawData.groupBy('heart_disease').count().orderBy('heart_disease', ascending=False).show()

+-------------+-----+
|heart_disease|count|
+-------------+-----+
|            1|  276|
|            0| 4834|
+-------------+-----+



In [9]:
rawData.groupBy('ever_married').count().orderBy('ever_married', ascending=False).show()

+------------+-----+
|ever_married|count|
+------------+-----+
|         Yes| 3353|
|          No| 1757|
+------------+-----+



In [10]:
rawData.groupBy('work_type').count().orderBy('work_type', ascending=False).show()

+-------------+-----+
|    work_type|count|
+-------------+-----+
|     children|  687|
|Self-employed|  819|
|      Private| 2925|
| Never_worked|   22|
|     Govt_job|  657|
+-------------+-----+



In [11]:
rawData.groupBy('Residence_type').count().orderBy('Residence_type', ascending=False).show()

+--------------+-----+
|Residence_type|count|
+--------------+-----+
|         Urban| 2596|
|         Rural| 2514|
+--------------+-----+



In [12]:
rawData.groupBy('smoking_status').count().orderBy('smoking_status', ascending=False).show()

+---------------+-----+
| smoking_status|count|
+---------------+-----+
|         smokes|  789|
|   never smoked| 1892|
|formerly smoked|  885|
|        Unknown| 1544|
+---------------+-----+



In [13]:
rawData.groupBy('stroke').count().orderBy('stroke', ascending=False).show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  249|
|     0| 4861|
+------+-----+



In [14]:
rawData.select('bmi').filter(rawData['bmi']!= "N/A").toPandas().astype(float).describe()

Unnamed: 0,bmi
count,4909.0
mean,28.893237
std,7.854067
min,10.3
25%,23.5
50%,28.1
75%,33.1
max,97.6


In [15]:
rawData = rawData.filter(rawData['bmi'] != "N/A")
rawData = rawData.filter(rawData['gender'] != "Other")
rawData.count()

4908

In [16]:
rawData = rawData.drop('id')
rawData.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [17]:
Data = rawData.withColumn('ever_married', F.when(rawData['ever_married'] == "Yes", 1)\
                        .otherwise(0))
Data = Data.withColumn('gender', F.when(rawData['gender'] == 'Male', 1)\
                       .otherwise(0))
Data = Data.withColumn('Residence_type', F.when(rawData['Residence_type'] == 'Urban', 1)\
                       .otherwise(0))
Data = Data.withColumn('work_type', F.when(rawData['work_type'] == 'Never_worked', 0)\
                       .when(rawData['work_type'] == 'children', 1)\
                       .when(rawData['work_type'] == 'Private', 2)\
                       .when(rawData['work_type'] == 'Govt_job', 3)\
                       .when(rawData['work_type'] == 'Self-employed', 4))
Data = Data.withColumn('smoking_status', F.when(rawData['smoking_status'] == 'Unknown', 0)\
                       .when(rawData['smoking_status'] == 'never smoked', 1)\
                       .when(rawData['smoking_status'] == 'smokes', 2)\
                       .when(rawData['smoking_status'] == 'formerly smoked', 3))
Data.count()

4908

In [7]:
print ("So lan xuat hien cua stroke la 0: ",Data.filter(Data['stroke'] == 0).count())
print ("So lan xuat hien cua stroke la 1: ",Data.filter(Data['stroke'] == 1).count())

So lan xuat hien cua stroke la 0:  4699
So lan xuat hien cua stroke la 1:  209


In [18]:
X = Data.drop('stroke')
Y = Data.select('stroke')
stk = SMOTE(random_state=42)
X_res,y_res = stk.fit_resample(X.toPandas(),Y.toPandas())
joinDF = pd.concat([X_res, y_res], axis=1, join="inner")
balancedData = spark.createDataFrame(joinDF)
print ("So lan xuat hien cua stroke la 0: ",balancedData.filter(balancedData['stroke'] == 0).count())
print ("So lan xuat hien cua stroke la 1: ",balancedData.filter(balancedData['stroke'] == 1).count())

So lan xuat hien cua stroke la 0:  4699
So lan xuat hien cua stroke la 1:  4699


In [61]:
balancedData.printSchema()

root
 |-- gender: long (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: long (nullable = true)
 |-- heart_disease: long (nullable = true)
 |-- ever_married: long (nullable = true)
 |-- work_type: long (nullable = true)
 |-- Residence_type: long (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: long (nullable = true)
 |-- stroke: long (nullable = true)



In [51]:
balancedData.filter(balancedData['stroke'] == 1).show(300)

+------+------------------+------------+-------------+------------+---------+--------------+------------------+------------------+--------------+------+
|gender|               age|hypertension|heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|stroke|
+------+------------------+------------+-------------+------------+---------+--------------+------------------+------------------+--------------+------+
|     1|              67.0|           0|            1|           1|        2|             1|            228.69|              36.6|             3|     1|
|     1|              80.0|           0|            1|           1|        2|             0|            105.92|              32.5|             1|     1|
|     0|              49.0|           0|            0|           1|        2|             1|            171.23|              34.4|             2|     1|
|     0|              79.0|           1|            0|           1|        4|     

In [19]:
trainData, testData = balancedData.randomSplit([0.7, 0.3])
print("So dong trong train data:", trainData.count())
print("So dong trong test data:", testData.count())

So dong trong train data: 6660
So dong trong test data: 2738


In [20]:
trainData.show(5)

+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| bmi|smoking_status|stroke|
+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|     0|0.08|           0|            0|           0|        1|             1|           139.67|14.1|             0|     0|
|     0|0.64|           0|            0|           0|        1|             1|            83.82|24.9|             0|     0|
|     0|0.72|           0|            0|           0|        1|             1|            66.36|23.0|             0|     0|
|     0|1.08|           0|            0|           0|        1|             0|            60.53|17.5|             0|     0|
|     0|1.16|           0|            0|           0|        1|             1|            60.98|17.2|             0|     0|
+------+

In [21]:
testData.show(5)

+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| bmi|smoking_status|stroke|
+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|     0|0.72|           0|            0|           0|        1|             0|            62.13|16.8|             0|     0|
|     0| 1.0|           0|            0|           0|        1|             1|           199.83|24.5|             0|     0|
|     0|1.08|           0|            0|           0|        1|             1|           159.39|12.8|             0|     0|
|     0|1.16|           0|            0|           0|        1|             1|            60.98|17.2|             0|     0|
|     0|1.32|           0|            0|           0|        1|             0|            75.22|18.6|             0|     0|
+------+

In [21]:
def mean(dataset, column):
    return (dataset.groupBy().mean(column).collect())[0][0]

In [22]:
def stdev(dataset, column):
    avg = mean(dataset, column)
    lendf = len(dataset.select(column).collect())
    value = dataset.select(column).rdd.flatMap(lambda x:x).collect()
    variance = sum([(x-avg)**2 for x in value]) / float(lendf - 1)
    return math.sqrt(variance)

In [23]:
def summarize_dataset(dataset):
    summaries = [(mean(dataset,column), stdev(dataset,column), dataset.select(column).count())\
                 for column in dataset.columns]
    del(summaries[-1])
    return summaries

In [24]:
def summarize_by_class(dataset):
    summaries = dict()
    lenstroke = len(dataset.select('stroke').rdd.flatMap(lambda x:x).distinct().collect())
    for i in range(0,lenstroke):
        item = dataset.select('stroke').rdd.flatMap(lambda x:x).distinct().collect()[i]
        res = dataset.filter(dataset.stroke == item)
        summaries[item] = summarize_dataset(res)
    return summaries

In [25]:
def calculate_probability(x, mean, stdev):
    exponent = math.exp(-((x-mean)**2 / (2 * stdev**2)))
    return (1 / (math.sqrt(2 * math.pi) * stdev)) * exponent

In [26]:
def calculate_class_probability(summaries, row):
    totalRows = sum([summaries[label][0][2] for label in summaries])
    probabilities = dict()
    for class_value, class_summaries in summaries.items():
        probabilities[class_value] = summaries[class_value][0][2] / float(totalRows)
        for i in range(0, len(class_summaries)):
            mean, stdev, _ = class_summaries[i]
            probabilities[class_value] *= calculate_probability(row[i], mean, stdev)  
    return probabilities

In [27]:
def predict(summaries, row):
    probabilites = calculate_class_probability(summaries, row)
    best_label, best_prob = None, -1
    for class_value, probability in probabilites.items():
        if (best_label is None or probability > best_prob):
            best_prob = probability
            best_label = class_value
    return best_label

In [28]:
def naive_bayes(train, test):
    summerize = summarize_by_class(train)
    predictions = list()
    for row in test.collect():
        output = predict(summerize, row)
        predictions.append(output)
    return predictions

In [29]:
def accuracy_metric(actual, predict):
    correct = 0
    for i in range(len(actual)):
        if (actual[i] == predict[i]):
            correct += 1
    return correct / float(len(actual)) * 100

In [30]:
from sklearn.metrics import confusion_matrix
def evaluate_algorithm(trainData, testData, algorithm):
    predicted = algorithm(trainData,testData)
    actual = testData.select('stroke').rdd.flatMap(lambda x: x).collect()
    accuracy = accuracy_metric(actual, predicted)
    tree_cm = confusion_matrix(predicted, actual)
    return accuracy, tree_cm

In [35]:
# import time
# accuracyList = []
# timeList = []
# for i in range(10):
#     trainData, testData = balancedData.randomSplit([0.7, 0.3])
#     start_time = time.time()
#     accuracy, tree_cm = evaluate_algorithm(trainData, testData, naive_bayes)
#     end_time = time.time()
#     accuracyList.append(accuracy)
#     timeList.append(end_time - start_time)
# trainData, testData = balancedData.randomSplit([0.7, 0.3])
# accuracy, tree_cm = evaluate_algorithm(trainData, testData, naive_bayes)
# print(accuracy)
trainData, testData = balancedData.randomSplit([0.7, 0.3])
# summarize_by_class(trainData)

In [48]:
summarize_by_class(trainData)

KeyboardInterrupt: 

In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [38]:
featureAssembler = VectorAssembler(
    inputCols=["age","gender","hypertension","heart_disease","work_type","Residence_type","avg_glucose_level","ever_married","smoking_status","bmi"],
    outputCol="features")
columns_to_drop = ["age","gender","hypertension","heart_disease","work_type","Residence_type","avg_glucose_level","ever_married","smoking_status","bmi"]
dt = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="stroke")

In [39]:
assembleredTestData = featureAssembler.transform(testData)
assembleredTestData = assembleredTestData.drop(*columns_to_drop)
assembleredTrainData = featureAssembler.transform(trainData)
assembleredTrainData = assembleredTrainData.drop(*columns_to_drop)

In [56]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
labelCol="stroke", predictionCol="prediction", metricName="rmse")
modelNaiveBayes = nb.fit(assembleredTrainData)
predictions = modelNaiveBayes.transform(assembleredTestData)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.5192175792352322


In [59]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark import SparkContext
sc = SparkContext.getOrCreate();
sameModel = DecisionTreeModel.load(sc, "/tmp/nb.model")


Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.tree.model.DecisionTreeModel.load.
: java.lang.UnsupportedOperationException: empty collection
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1437)
	at org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:120)
	at org.apache.spark.mllib.tree.model.DecisionTreeModel$.load(DecisionTreeModel.scala:316)
	at org.apache.spark.mllib.tree.model.DecisionTreeModel.load(DecisionTreeModel.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [60]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark import SparkContext
sc = SparkContext.getOrCreate();
modelNaiveBayes.save(sc, "/local/nb.model")

TypeError: save() takes 2 positional arguments but 3 were given

In [43]:
modelDecisionTree = dt.fit(assembleredTrainData)
predictions = modelDecisionTree.transform(assembleredTestData)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.4138795673819817


In [65]:
import time
accuracyMLLIBNaiveBayesList = []
accuracyBuildNaiveBayesList = []
accuracyMLIBDecisionTreeList = []
timeMLLIBNaiveBayesList = []
timeBuildNaiveBayesList = []
timeMLIBDecisionTreeList = []
for i in range(10):
    trainData, testData = balancedData.randomSplit([0.7, 0.3])
#   For mllib 
    assembleredTestData = featureAssembler.transform(testData)
    assembleredTestData = assembleredTestData.drop(*columns_to_drop)
    assembleredTrainData = featureAssembler.transform(trainData)
    assembleredTrainData = assembleredTrainData.drop(*columns_to_drop)
#   Builded Naive Bayes
    start_time = time.time()
    accuracy, tree_cm = evaluate_algorithm(trainData, testData, naive_bayes)
    end_time = time.time()
    accuracyBuildNaiveBayesList.append(accuracy)
    timeBuildNaiveBayesList.append(end_time - start_time)
#   MLLIB NaiveBayes
    start_time = time.time()
    modelNaiveBayes = nb.fit(assembleredTrainData)
    predictions = modelNaiveBayes.transform(assembleredTestData)
    end_time = time.time()
    accuracy = evaluator.evaluate(predictions)
    accuracyMLLIBNaiveBayesList.append(accuracy * 100)
    timeMLLIBNaiveBayesList.append(end_time - start_time)
#   MLLIB Decision Tree
    start_time = time.time()
    modelDecisionTree = dt.fit(assembleredTrainData)
    predictions = modelDecisionTree.transform(assembleredTestData)
    end_time = time.time()
    accuracy = evaluator.evaluate(predictions)
    accuracyMLIBDecisionTreeList.append(accuracy * 100)
    timeMLIBDecisionTreeList.append(end_time - start_time)
     